// AddMemberCluster is passed to the secret controller as a callback to be called
// when a remote cluster is added. This function needs to set up all the handlers
// to watch for resources being added, deleted or changed on remote clusters.
func (m *Multicluster) AddMemberCluster(client kubelib.Client, clusterID string) error {
// stopCh stops the controllers created here when the cluster is removed.
stopCh := make(chan struct{})
m.m.Lock()
options := m.opts
options.ClusterID = clusterID
log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
kubeRegistry := NewController(client, options)
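// Register the new registry with the aggregate service controller and track it so it can be
// stopped and cleaned up when the cluster is removed.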
m.serviceController.AddRegistry(kubeRegistry)
m.remoteKubeControllers[clusterID] = &kubeController{
Controller: kubeRegistry,
stopCh: stopCh,
}
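// localCluster indicates whether the cluster being added is the one this istiod instance is running in.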
localCluster := m.opts.ClusterID == clusterID
m.m.Unlock()
// Only a service handler needs to be added for the Kubernetes registry (as in `initRegistryEventHandlers`);
// endpoint updates already trigger `XDSUpdater.EDSUpdate`, so no endpoint handler is needed here.
kubeRegistry.AppendServiceHandler(func(svc *model.Service, ev model.Event) { m.updateHandler(svc) })
// TODO move instance cache out of registries
if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {
// Add an instance handler in the kubernetes registry to notify service entry store about pod events
kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)
}
if localCluster {
// TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
if m.serviceEntryStore != nil && features.EnableK8SServiceSelectWorkloadEntries {
// Add an instance handler in the service entry store to notify kubernetes about workload entry events
m.serviceEntryStore.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
}
}
// TODO only create namespace controller and cert patch for remote clusters (no way to tell currently)
if m.serviceController.Running() {
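// The aggregate service controller is already running, so start the new registry here;
// registries added before it starts are run by it instead.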
go kubeRegistry.Run(stopCh)
}
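// Run the namespace controller (which distributes the CA root cert to namespaces) under
// leader election, so only one istiod instance manages it at a time.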
if m.fetchCaRoot != nil && m.fetchCaRoot() != nil && (features.ExternalIstioD || features.CentralIstioD || localCluster) {
log.Infof("joining leader-election for %s in %s", leaderelection.NamespaceController, options.SystemNamespace)
go leaderelection.
NewLeaderElection(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, client.Kube()).
AddRunFunction(func(leaderStop <-chan struct{}) {
log.Infof("starting namespace controller for cluster %s", clusterID)
nc := NewNamespaceController(m.fetchCaRoot, client)
// Start informers again. This fixes the case where informers for namespace do not start,
// since we create them only after acquiring the leader lock.
// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
// basically lazy-loading the informer; if we stopped it when we lost the lock, we would never
// recreate it again.
client.RunAndWait(stopCh)
nc.Run(leaderStop)
}).Run(stopCh)
}
// Patch cert if a webhook config name is provided.
// This requires RBAC permissions - a low-priv Istiod should not attempt to patch but rely on
// operator or CI/CD
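// Derive the validation webhook config name from its template by substituting in the namespace.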
webhookConfigName := strings.ReplaceAll(validationWebhookConfigNameTemplate, validationWebhookConfigNameTemplateVar, m.secretNamespace)
if features.InjectionWebhookConfigName.Get() != "" && m.caBundlePath != "" && !localCluster && (features.ExternalIstioD || features.CentralIstioD) {
// TODO remove the patch loop init from initSidecarInjector (does this need leader elect? how well does it work with multi-primary?)
log.Infof("initializing webhook cert patch for cluster %s", clusterID)
patcher, err := webhooks.NewWebhookCertPatcher(client.Kube(), m.revision, webhookName, m.caBundlePath)
if err != nil {
log.Errorf("could not initialize webhook cert patcher")
} else {
go patcher.Run(stopCh)
}
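// Start the controller that keeps the validation webhook configuration (its CA bundle) up to date on this cluster.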
validationWebhookController := webhooks.CreateValidationWebhookController(client, webhookConfigName,
m.secretNamespace, m.caBundlePath, true)
if validationWebhookController != nil {
go validationWebhookController.Start(stopCh)
}
}
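// Start this cluster's informers and wait for their caches to sync.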
client.RunAndWait(stopCh)
return nil
}
// DeleteMemberCluster is passed to the secret controller as a callback to be called
// when a remote cluster is deleted. It must also clear the cache so that remote resources
// are removed.
func (m *Multicluster) DeleteMemberCluster(clusterID string) error {
m.m.Lock()
defer m.m.Unlock()
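// Remove the cluster's registry from the aggregate controller so its services are no longer served.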
m.serviceController.DeleteRegistry(clusterID)
kc, ok := m.remoteKubeControllers[clusterID]
if !ok {
log.Infof("cluster %s does not exist, maybe caused by invalid kubeconfig", clusterID)
return nil
}
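// Clean up the services this cluster contributed before stopping its controller.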
if err := kc.Cleanup(); err != nil {
log.Warnf("failed cleaning up services in %s: %v", clusterID, err)
}
close(kc.stopCh)
delete(m.remoteKubeControllers, clusterID)
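// Trigger a full push so clients stop receiving configuration and endpoints from the removed cluster.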
if m.XDSUpdater != nil {
m.XDSUpdater.ConfigUpdate(&model.PushRequest{Full: true})
}
return nil
}