Multicluster

Multicluster

// initServiceControllers sets up the service registries requested in args.
//
// The ServiceEntry store is created and added to the server's service
// controller first. Each entry of args.RegistryOptions.Registries is then
// initialized at most once; repeated entries are logged and skipped. An
// unsupported registry name, or a failure initializing the Kubernetes
// registry, is returned as an error. Running the service controller itself is
// deferred to a start function.
func (s *Server) initServiceControllers(args *PilotArgs) error {
    controllers := s.ServiceController()

    s.serviceEntryStore = serviceentry.NewServiceDiscovery(s.configController, s.environment.IstioConfigStore, s.XDSServer)
    controllers.AddRegistry(s.serviceEntryStore)

    // Tracks which providers were already handled so duplicates are ignored.
    seen := map[serviceregistry.ProviderID]bool{}
    for _, name := range args.RegistryOptions.Registries {
        provider := serviceregistry.ProviderID(name)
        if seen[provider] {
            log.Warnf("%s registry specified multiple times.", name)
            continue
        }
        seen[provider] = true
        log.Infof("Adding %s registry adapter", provider)

        if provider == serviceregistry.Kubernetes {
            if err := s.initKubeRegistry(args); err != nil {
                return err
            }
            continue
        }
        if provider == serviceregistry.Mock {
            s.initMockRegistry()
            continue
        }
        return fmt.Errorf("service registry %s is not supported", name)
    }

    // The controllers are not run here; they start together with the server.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go controllers.Run(stop)
        return nil
    })

    return nil
}
// initKubeRegistry fills in the Kubernetes registry options from the server's
// state, builds the multicluster controller and registers the server's own
// cluster with it before any remote cluster is handled. Starting the secret
// controller (which drives remote clusters) is deferred to a start function.
//
// It returns the error from registering the local cluster, or nil on success.
func (s *Server) initKubeRegistry(args *PilotArgs) (err error) {
    args.RegistryOptions.KubeOptions.ClusterID = s.clusterID
    args.RegistryOptions.KubeOptions.Metrics = s.environment
    args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer
    args.RegistryOptions.KubeOptions.NetworksWatcher = s.environment.NetworksWatcher
    args.RegistryOptions.KubeOptions.SystemNamespace = args.Namespace

    // A custom TLS configuration overrides the default CA bundle location.
    bundlePath := s.caBundlePath
    if tlsOpts := args.ServerOptions.TLSOptions; hasCustomTLSCerts(tlsOpts) {
        bundlePath = tlsOpts.CaCertFile
    }

    clusters := kubecontroller.NewMulticluster(
        args.PodName,
        s.kubeClient,
        args.RegistryOptions.ClusterRegistriesNamespace,
        args.RegistryOptions.KubeOptions,
        s.ServiceController(),
        s.serviceEntryStore,
        bundlePath,
        args.Revision,
        s.fetchCARoot,
        s.environment,
    )

    // initialize the "main" cluster registry before starting controllers for remote clusters
    err = clusters.AddMemberCluster(s.kubeClient, args.RegistryOptions.KubeOptions.ClusterID)
    if err != nil {
        log.Errorf("failed initializing registry for %s: %v", args.RegistryOptions.KubeOptions.ClusterID, err)
        return err
    }

    // start remote cluster controllers
    s.addStartFunc(func(stop <-chan struct{}) error {
        clusters.InitSecretController(stop)
        return nil
    })

    s.multicluster = clusters
    return nil
}
// InitSecretController starts the secret controller via
// secretcontroller.StartSecretController and stores the result on m. The
// add/update/delete member-cluster methods of m are handed over as callbacks,
// along with the secret namespace, sync interval and stop channel.
func (m *Multicluster) InitSecretController(stop <-chan struct{}) {
    controller := secretcontroller.StartSecretController(
        m.client,
        m.AddMemberCluster,
        m.UpdateMemberCluster,
        m.DeleteMemberCluster,
        m.secretNamespace,
        m.syncInterval,
        stop,
    )
    m.secretController = controller
}
// AddMemberCluster is passed to the secret controller as a callback to be called
// when a remote cluster is added.  This function needs to set up all the handlers
// to watch for resources being added, deleted or changed on remote clusters.
//
// It creates a Kubernetes registry for clusterID, adds it to the service
// controller and records it (with its own stop channel) in
// m.remoteKubeControllers. Depending on feature flags and on whether clusterID
// is the local cluster, it additionally wires workload handlers to and from
// the service entry store, joins leader election for the namespace controller,
// and starts webhook cert patching. Finally it calls client.RunAndWait.
//
// The function currently always returns nil; a failure to create the webhook
// cert patcher is logged rather than returned.
func (m *Multicluster) AddMemberCluster(client kubelib.Client, clusterID string) error {
    // stopCh to stop controller created here when cluster removed.
    stopCh := make(chan struct{})
    m.m.Lock()
    // Work on a copy of the shared options, specialised for this cluster.
    options := m.opts
    options.ClusterID = clusterID

    log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
    kubeRegistry := NewController(client, options)
    m.serviceController.AddRegistry(kubeRegistry)
    m.remoteKubeControllers[clusterID] = &kubeController{
        Controller: kubeRegistry,
        stopCh:     stopCh,
    }
    localCluster := m.opts.ClusterID == clusterID

    // Released explicitly (not deferred) so the lock is not held for the rest
    // of the setup below.
    m.m.Unlock()

    // Only need to add service handler for kubernetes registry as `initRegistryEventHandlers`,
    // because when endpoints update `XDSUpdater.EDSUpdate` has already been called.
    kubeRegistry.AppendServiceHandler(func(svc *model.Service, ev model.Event) { m.updateHandler(svc) })

    // TODO move instance cache out of registries
    if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {
        // Add an instance handler in the kubernetes registry to notify service entry store about pod events
        kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)
    }

    if localCluster {
        // TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
        if m.serviceEntryStore != nil && features.EnableK8SServiceSelectWorkloadEntries {
            // Add an instance handler in the service entry store to notify kubernetes about workload entry events
            m.serviceEntryStore.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
        }
    }

    // TODO only create namespace controller and cert patch for remote clusters (no way to tell currently)
    // The registry is only run here when the service controller is already running.
    if m.serviceController.Running() {
        go kubeRegistry.Run(stopCh)
    }
    if m.fetchCaRoot != nil && m.fetchCaRoot() != nil && (features.ExternalIstioD || features.CentralIstioD || localCluster) {
        log.Infof("joining leader-election for %s in %s", leaderelection.NamespaceController, options.SystemNamespace)
        go leaderelection.
            NewLeaderElection(options.SystemNamespace, m.serverID, leaderelection.NamespaceController, client.Kube()).
            AddRunFunction(func(leaderStop <-chan struct{}) {
                log.Infof("starting namespace controller for cluster %s", clusterID)
                nc := NewNamespaceController(m.fetchCaRoot, client)
                // Start informers again. This fixes the case where informers for namespace do not start,
                // as we create them only after acquiring the leader lock
                // Note: the stop channel passed here is this cluster's stopCh, NOT the leader election
                // stop. We are basically lazy loading the informer, if we stop it when we lose the lock
                // we will never recreate it again.
                client.RunAndWait(stopCh)
                nc.Run(leaderStop)
            }).Run(stopCh)
    }

    // Patch cert if a webhook config name is provided.
    // This requires RBAC permissions - a low-priv Istiod should not attempt to patch but rely on
    // operator or CI/CD
    if features.InjectionWebhookConfigName.Get() != "" && m.caBundlePath != "" && !localCluster && (features.ExternalIstioD || features.CentralIstioD) {
        // TODO remove the patch loop init from initSidecarInjector (does this need leader elect? how well does it work with multi-primary?)
        log.Infof("initializing webhook cert patch for cluster %s", clusterID)
        patcher, err := webhooks.NewWebhookCertPatcher(client.Kube(), m.revision, webhookName, m.caBundlePath)
        if err != nil {
            // Include the underlying error and cluster so the failure can be diagnosed.
            log.Errorf("could not initialize webhook cert patcher for cluster %s: %v", clusterID, err)
        } else {
            patcher.Run(stopCh)
        }
        // The validation webhook config name is derived from the template by
        // substituting the secret namespace; it is only needed in this branch.
        webhookConfigName := strings.ReplaceAll(validationWebhookConfigNameTemplate, validationWebhookConfigNameTemplateVar, m.secretNamespace)
        validationWebhookController := webhooks.CreateValidationWebhookController(client, webhookConfigName,
            m.secretNamespace, m.caBundlePath, true)
        if validationWebhookController != nil {
            go validationWebhookController.Start(stopCh)
        }
    }

    client.RunAndWait(stopCh)
    return nil
}
// UpdateMemberCluster replaces the registration of clusterID: it first removes
// the existing member cluster and, only if that succeeds, adds it again using
// the supplied clients. The first error encountered is returned.
func (m *Multicluster) UpdateMemberCluster(clients kubelib.Client, clusterID string) error {
    err := m.DeleteMemberCluster(clusterID)
    if err == nil {
        err = m.AddMemberCluster(clients, clusterID)
    }
    return err
}
// DeleteMemberCluster is the secret controller's callback for a removed remote
// cluster. While holding m.m it deletes the cluster's registry from the service
// controller, cleans up and stops the cluster's kube controller, forgets it,
// and requests a full config push when an XDSUpdater is set.
//
// An unknown clusterID is logged and treated as success, and a cleanup failure
// is only logged, so this function always returns nil.
func (m *Multicluster) DeleteMemberCluster(clusterID string) error {
    m.m.Lock()
    defer m.m.Unlock()

    // The registry is removed even when no controller is tracked for the cluster.
    m.serviceController.DeleteRegistry(clusterID)

    remote, found := m.remoteKubeControllers[clusterID]
    if !found {
        log.Infof("cluster %s does not exist, maybe caused by invalid kubeconfig", clusterID)
        return nil
    }

    if cleanupErr := remote.Cleanup(); cleanupErr != nil {
        log.Warnf("failed cleaning up services in %s: %v", clusterID, cleanupErr)
    }

    // Closing stopCh signals everything started for this cluster with that channel.
    close(remote.stopCh)
    delete(m.remoteKubeControllers, clusterID)

    if updater := m.XDSUpdater; updater != nil {
        updater.ConfigUpdate(&model.PushRequest{Full: true})
    }

    return nil
}

Last updated

Was this helpful?