Multicluster
// initServiceControllers registers the configured service registries (plus the
// ServiceEntry store) with the aggregate service controller, and defers running
// it until the server starts.
func (s *Server) initServiceControllers(args *PilotArgs) error {
	serviceControllers := s.ServiceController()
	s.serviceEntryStore = serviceentry.NewServiceDiscovery(s.configController, s.environment.IstioConfigStore, s.XDSServer)
	serviceControllers.AddRegistry(s.serviceEntryStore)

	registered := make(map[serviceregistry.ProviderID]bool)
	for _, r := range args.RegistryOptions.Registries {
		serviceRegistry := serviceregistry.ProviderID(r)
		if _, exists := registered[serviceRegistry]; exists {
			log.Warnf("%s registry specified multiple times.", r)
			continue
		}
		registered[serviceRegistry] = true
		log.Infof("Adding %s registry adapter", serviceRegistry)
		switch serviceRegistry {
		case serviceregistry.Kubernetes:
			if err := s.initKubeRegistry(args); err != nil {
				return err
			}
		case serviceregistry.Mock:
			s.initMockRegistry()
		default:
			return fmt.Errorf("service registry %s is not supported", r)
		}
	}

	// Defer running the service controllers until the server starts.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go serviceControllers.Run(stop)
		return nil
	})
	return nil
}
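
The aggregate service controller used above is essentially a fan-out: AddRegistry collects per-provider registries and Run starts each of them against a shared stop channel. A minimal sketch of that shape, with illustrative types rather than Istio's actual definitions:

	package main

	import (
		"fmt"
		"time"
	)

	// Registry is the minimal surface an aggregate controller needs.
	type Registry interface {
		Provider() string
		Run(stop <-chan struct{})
	}

	// Aggregate fans Run out to every registry added before Run is called.
	type Aggregate struct {
		registries []Registry
	}

	func (a *Aggregate) AddRegistry(r Registry) {
		a.registries = append(a.registries, r)
	}

	func (a *Aggregate) Run(stop <-chan struct{}) {
		for _, r := range a.registries {
			go r.Run(stop) // every registry shares the same stop channel
		}
		<-stop
	}

	type mockRegistry struct{ id string }

	func (m mockRegistry) Provider() string { return m.id }

	func (m mockRegistry) Run(stop <-chan struct{}) {
		fmt.Println("running registry", m.id)
		<-stop
	}

	func main() {
		agg := &Aggregate{}
		agg.AddRegistry(mockRegistry{id: "Kubernetes"})
		agg.AddRegistry(mockRegistry{id: "Mock"})

		stop := make(chan struct{})
		go agg.Run(stop)
		time.Sleep(100 * time.Millisecond)
		close(stop)
	}

Istio's real aggregate controller (pilot/pkg/serviceregistry/aggregate) additionally supports removing registries, which DeleteMemberCluster relies on further below.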
// initKubeRegistry fills in the Kubernetes registry options, creates the
// Multicluster controller, registers the local ("config") cluster, and defers
// starting the secret controller that discovers remote clusters.
func (s *Server) initKubeRegistry(args *PilotArgs) (err error) {
	args.RegistryOptions.KubeOptions.ClusterID = s.clusterID
	args.RegistryOptions.KubeOptions.Metrics = s.environment
	args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer
	args.RegistryOptions.KubeOptions.NetworksWatcher = s.environment.NetworksWatcher
	args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace

	caBundlePath := s.caBundlePath
	if hasCustomTLSCerts(args.ServerOptions.TLSOptions) {
		caBundlePath = args.ServerOptions.TLSOptions.CaCertFile
	}

	mc := kubecontroller.NewMulticluster(args.PodName,
		s.kubeClient,
		args.RegistryOptions.ClusterRegistriesNamespace,
		args.RegistryOptions.KubeOptions,
		s.ServiceController(),
		s.serviceEntryStore,
		caBundlePath,
		args.Revision,
		s.fetchCARoot,
		s.environment)

	// Initialize the "main" cluster registry before starting controllers for remote clusters.
	if err := mc.AddMemberCluster(s.kubeClient, args.RegistryOptions.KubeOptions.ClusterID); err != nil {
		log.Errorf("failed initializing registry for %s: %v", args.RegistryOptions.KubeOptions.ClusterID, err)
		return err
	}

	// Start remote cluster controllers.
	s.addStartFunc(func(stop <-chan struct{}) error {
		mc.InitSecretController(stop)
		return nil
	})
	s.multicluster = mc
	return
}
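
Note the two-phase pattern: the local cluster registry is added synchronously, while the secret controller is wrapped in addStartFunc and only runs once the server starts. A sketch of that deferral pattern, assuming a simplified server type (startFunc mirrors the closure signature above):

	package main

	// startFunc mirrors the closure signature passed to addStartFunc above.
	type startFunc func(stop <-chan struct{}) error

	type server struct {
		startFuncs []startFunc
	}

	// addStartFunc queues work to run when the server starts.
	func (s *server) addStartFunc(fn startFunc) {
		s.startFuncs = append(s.startFuncs, fn)
	}

	// start drains the queue once, handing every function the shared stop channel.
	func (s *server) start(stop <-chan struct{}) error {
		for _, fn := range s.startFuncs {
			if err := fn(stop); err != nil {
				return err
			}
		}
		return nil
	}

	func main() {
		s := &server{}
		s.addStartFunc(func(stop <-chan struct{}) error {
			go func() { <-stop }() // long-running work goes in its own goroutine
			return nil
		})

		stop := make(chan struct{})
		if err := s.start(stop); err != nil {
			panic(err)
		}
		close(stop)
	}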
// InitSecretController starts the controller that watches remote-cluster
// kubeconfig secrets and drives the Add/Update/Delete callbacks below.
func (m *Multicluster) InitSecretController(stop <-chan struct{}) {
	m.secretController = secretcontroller.StartSecretController(
		m.client, m.AddMemberCluster, m.UpdateMemberCluster, m.DeleteMemberCluster,
		m.secretNamespace, m.syncInterval, stop)
}
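
StartSecretController watches Secrets in m.secretNamespace (the cluster-registries namespace, istio-system by default) carrying the istio/multiCluster=true label. By convention each data key is a cluster ID and each value a kubeconfig for that cluster; istioctl x create-remote-secret generates secrets in this shape. A sketch of that shape using client-go types, assuming the defaults (the name prefix and placeholder kubeconfig bytes are illustrative):

	package main

	import (
		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// remoteClusterSecret builds a secret in the multicluster convention the
	// secret controller watches for. The kubeconfig bytes are a placeholder.
	func remoteClusterSecret(clusterID string, kubeconfig []byte) *corev1.Secret {
		return &corev1.Secret{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "istio-remote-secret-" + clusterID, // istioctl's naming convention
				Namespace: "istio-system",                     // the cluster-registries namespace
				Labels:    map[string]string{"istio/multiCluster": "true"},
			},
			Data: map[string][]byte{clusterID: kubeconfig},
		}
	}

Adding, updating, or deleting such a secret is what triggers the corresponding callback below.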
// AddMemberCluster is passed to the secret controller as a callback, invoked
// when a remote cluster is added. It sets up all the handlers needed to watch
// for resources being added, deleted, or changed on that cluster.
func (m *Multicluster) AddMemberCluster(client kubelib.Client, clusterID string) error {
	// stopCh is closed when the cluster is removed, stopping everything created here.
	stopCh := make(chan struct{})
	m.m.Lock()
	options := m.opts
	options.ClusterID = clusterID
	log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
	kubeRegistry := NewController(client, options)
	m.serviceController.AddRegistry(kubeRegistry)
	m.remoteKubeControllers[clusterID] = &kubeController{
		Controller: kubeRegistry,
		stopCh:     stopCh,
	}
	localCluster := m.opts.ClusterID == clusterID
	m.m.Unlock()

	// Only a service handler is needed for the Kubernetes registry (mirroring
	// `initRegistryEventHandlers`); endpoint updates already trigger
	// `XDSUpdater.EDSUpdate`.
	kubeRegistry.AppendServiceHandler(func(svc *model.Service, ev model.Event) { m.updateHandler(svc) })

	// TODO move instance cache out of registries
	if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {
		// Add a workload handler in the Kubernetes registry to notify the
		// service entry store about pod events.
		kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)
	}
	if localCluster {
		// TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
		if m.serviceEntryStore != nil && features.EnableK8SServiceSelectWorkloadEntries {
			// Add a workload handler in the service entry store to notify the
			// Kubernetes registry about workload entry events.
			m.serviceEntryStore.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
		}
	}

	// TODO only create namespace controller and cert patch for remote clusters (no way to tell currently)
	if m.serviceController.Running() {
		go kubeRegistry.Run(stopCh)
	}
	if m.fetchCaRoot != nil && m.fetchCaRoot() != nil && (features.ExternalIstioD || features.CentralIstioD || localCluster) {
		log.Infof("joining leader-election for %s in %s", leaderelection.NamespaceController, options.SystemNamespace)
		go leaderelection.
			NewLeaderElection(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, client.Kube()).
			AddRunFunction(func(leaderStop <-chan struct{}) {
				log.Infof("starting namespace controller for cluster %s", clusterID)
				nc := NewNamespaceController(m.fetchCaRoot, client)
				// Start informers again. This fixes the case where the namespace
				// informers never start, since we create them only after acquiring
				// the leader lock.
				// Note: stop here should be the overall pilot stop, NOT the leader
				// election stop. We are lazily loading the informer; if we stopped
				// it on losing the lock, we would never recreate it.
				client.RunAndWait(stopCh)
				nc.Run(leaderStop)
			}).Run(stopCh)
	}

	// Patch the cert if a webhook config name is provided. This requires RBAC
	// permissions; a low-privilege istiod should not attempt to patch and should
	// rely on the operator or CI/CD instead.
	webhookConfigName := strings.ReplaceAll(validationWebhookConfigNameTemplate, validationWebhookConfigNameTemplateVar, m.secretNamespace)
	if features.InjectionWebhookConfigName.Get() != "" && m.caBundlePath != "" && !localCluster && (features.ExternalIstioD || features.CentralIstioD) {
		// TODO remove the patch loop init from initSidecarInjector (does this need leader elect? how well does it work with multi-primary?)
		log.Infof("initializing webhook cert patch for cluster %s", clusterID)
		patcher, err := webhooks.NewWebhookCertPatcher(client.Kube(), m.revision, webhookName, m.caBundlePath)
		if err != nil {
			log.Errorf("could not initialize webhook cert patcher: %v", err)
		} else {
			go patcher.Run(stopCh) // Run blocks, so it must run in its own goroutine
		}
		validationWebhookController := webhooks.CreateValidationWebhookController(client, webhookConfigName,
			m.secretNamespace, m.caBundlePath, true)
		if validationWebhookController != nil {
			go validationWebhookController.Start(stopCh)
		}
	}

	client.RunAndWait(stopCh)
	return nil
}
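
Everything started for a member cluster selects on the same per-cluster stopCh: the registry's informers, the leader-election loops, and the cert patcher. Tearing a cluster down is then a single close of that channel. A stripped-down sketch of the pattern (names here are illustrative):

	package main

	import (
		"fmt"
		"time"
	)

	// kubeController pairs a cluster's controllers with their dedicated stop
	// channel, mirroring the remoteKubeControllers map above.
	type kubeController struct {
		stopCh chan struct{}
	}

	func addCluster(id string) *kubeController {
		kc := &kubeController{stopCh: make(chan struct{})}
		// Every goroutine started for this cluster watches the same channel.
		go func() {
			fmt.Println("registry running for", id)
			<-kc.stopCh
			fmt.Println("registry stopped for", id)
		}()
		return kc
	}

	func main() {
		clusters := map[string]*kubeController{}
		clusters["remote0"] = addCluster("remote0")
		time.Sleep(50 * time.Millisecond)

		// Removing a cluster is a single close; everything selecting on
		// stopCh unwinds, as DeleteMemberCluster relies on below.
		close(clusters["remote0"].stopCh)
		delete(clusters, "remote0")
		time.Sleep(50 * time.Millisecond)
	}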
// UpdateMemberCluster is passed to the secret controller as a callback, invoked
// when a remote cluster's secret changes; it rebuilds the cluster's registry by
// deleting and re-adding it.
func (m *Multicluster) UpdateMemberCluster(clients kubelib.Client, clusterID string) error {
	if err := m.DeleteMemberCluster(clusterID); err != nil {
		return err
	}
	return m.AddMemberCluster(clients, clusterID)
}
// DeleteMemberCluster is passed to the secret controller as a callback to be
// called when a remote cluster is deleted. It must also clear the cache so
// remote resources are removed.
func (m *Multicluster) DeleteMemberCluster(clusterID string) error {
	m.m.Lock()
	defer m.m.Unlock()
	m.serviceController.DeleteRegistry(clusterID)
	kc, ok := m.remoteKubeControllers[clusterID]
	if !ok {
		log.Infof("cluster %s does not exist, maybe caused by invalid kubeconfig", clusterID)
		return nil
	}
	if err := kc.Cleanup(); err != nil {
		log.Warnf("failed cleaning up services in %s: %v", clusterID, err)
	}
	close(kc.stopCh)
	delete(m.remoteKubeControllers, clusterID)
	if m.XDSUpdater != nil {
		m.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true})
	}
	return nil
}
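
Taken together, the teardown order matters: DeleteMemberCluster first detaches the registry from the aggregate controller, then cleans up the cluster's services, closes the per-cluster stop channel so its informers and controllers exit, and finally triggers a full xDS push so proxies stop seeing endpoints from the removed cluster. UpdateMemberCluster stays deliberately simple by replaying delete-then-add with the new client rather than patching controllers in place.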