SDS Server

多集群证书管理

初始化SDS Server

s.initSDSServer(args)
// initSDSServer registers the SDS (secret) generator on the XDS server.
//
// It is a no-op when no Kubernetes client is configured. When
// features.EnableXDSIdentityCheck is false it only logs a warning and returns
// without registering anything.
func (s *Server) initSDSServer(args *PilotArgs) {
    if s.kubeClient == nil {
        return
    }
    if !features.EnableXDSIdentityCheck {
        // Make sure we have security
        log.Warnf("skipping Kubernetes credential reader; PILOT_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
        return
    }

    // Multicluster credential reader.
    // NOTE(review): the stop channel is created inline and not retained here,
    // so nothing in this function can close it — confirm this is intended.
    secretsMC := kubesecrets.NewMulticluster(
        s.kubeClient,
        s.clusterID,
        args.RegistryOptions.ClusterRegistriesNamespace,
        make(chan struct{}),
    )

    // On every secret change, request a non-full XDS push scoped to that secret.
    secretsMC.AddEventHandler(func(name, namespace string) {
        changed := model.ConfigKey{
            Kind:      gvk.Secret,
            Name:      name,
            Namespace: namespace,
        }
        s.XDSServer.ConfigUpdate(&model.PushRequest{
            Full:           false,
            ConfigsUpdated: map[model.ConfigKey]struct{}{changed: {}},
            Reason:         []model.TriggerReason{model.SecretTrigger},
        })
    })

    // Register the SDS generator for the secret resource type.
    s.XDSServer.Generators[v3.SecretType] = xds.NewSecretGen(secretsMC, s.XDSServer.Cache)
}
// NewMulticluster creates a Multicluster, registers the local cluster with it,
// and starts a secret controller in secretNamespace whose add/update/delete
// callbacks keep the set of member clusters in sync.
//
// The callbacks always report success (nil) to the secret controller.
func NewMulticluster(client kube.Client, localCluster, secretNamespace string, stop chan struct{}) *Multicluster {
    mc := &Multicluster{
        remoteKubeControllers: map[string]*SecretsController{},
        localCluster:          localCluster,
        stop:                  stop,
    }

    // The local cluster is registered directly, before the secret controller starts.
    mc.addMemberCluster(client, localCluster)

    onAdd := func(c kube.Client, k string) error {
        mc.addMemberCluster(c, k)
        return nil
    }
    onUpdate := func(c kube.Client, k string) error {
        mc.updateMemberCluster(c, k)
        return nil
    }
    onDelete := func(k string) error {
        mc.deleteMemberCluster(k)
        return nil
    }

    // Start the secret controller and keep a reference to it.
    mc.secretController = secretcontroller.StartSecretController(
        client,
        onAdd,
        onUpdate,
        onDelete,
        secretNamespace,
        time.Millisecond*100,
        stop,
    )
    return mc
}
// NewController builds a Controller that watches Secrets in namespace and
// feeds their keys into a rate-limited work queue.
//
// Only Secrets matching the label selector MultiClusterSecretLabel+"=true" are
// listed and watched. Add, update and delete events all enqueue the object's
// namespace/name key; the callbacks are stored on the Controller and are not
// invoked here.
func NewController(
    kubeclientset kubernetes.Interface,
    namespace string,
    cs *ClusterStore,
    addCallback addSecretCallback,
    updateCallback updateSecretCallback,
    removeCallback removeSecretCallback) *Controller {

    // List/watch restricted to secrets carrying the multicluster label.
    listWatch := &cache.ListWatch{
        ListFunc: func(opts meta_v1.ListOptions) (runtime.Object, error) {
            opts.LabelSelector = MultiClusterSecretLabel + "=true"
            return kubeclientset.CoreV1().Secrets(namespace).List(context.TODO(), opts)
        },
        WatchFunc: func(opts meta_v1.ListOptions) (watch.Interface, error) {
            opts.LabelSelector = MultiClusterSecretLabel + "=true"
            return kubeclientset.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
        },
    }
    secretsInformer := cache.NewSharedIndexInformer(listWatch, &corev1.Secret{}, 0, cache.Indexers{})

    // Work queue that receives the key of every changed secret.
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    controller := &Controller{
        kubeclientset:  kubeclientset,
        namespace:      namespace,
        cs:             cs,
        informer:       secretsInformer,
        queue:          queue,
        addCallback:    addCallback,
        updateCallback: updateCallback,
        removeCallback: removeCallback,
    }

    log.Info("Setting up event handlers")

    // Every event type ends up as a key on the queue; keys that cannot be
    // derived are dropped after the log line.
    handlers := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            log.Infof("Processing add: %s", key)
            if err != nil {
                return
            }
            queue.Add(key)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            // Ignore updates that did not change the object.
            if oldObj == newObj || reflect.DeepEqual(oldObj, newObj) {
                return
            }

            key, err := cache.MetaNamespaceKeyFunc(newObj)
            log.Infof("Processing update: %s", key)
            if err != nil {
                return
            }
            queue.Add(key)
        },
        DeleteFunc: func(obj interface{}) {
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            log.Infof("Processing delete: %s", key)
            if err != nil {
                return
            }
            queue.Add(key)
        },
    }
    secretsInformer.AddEventHandler(handlers)

    return controller
}

事件处理

// processNextItem takes one key off the work queue and processes it.
//
// It returns false once the queue reports shutdown and true otherwise. A
// failed item is re-queued with rate limiting until it has been requeued
// maxRetries times; after that it is dropped and the error is handed to
// utilruntime.HandleError.
func (c *Controller) processNextItem() bool {
    item, shutdown := c.queue.Get()
    if shutdown {
        return false
    }
    defer c.queue.Done(item)

    err := c.processItem(item.(string))
    switch {
    case err == nil:
        // No error, reset the ratelimit counters
        c.queue.Forget(item)
    case c.queue.NumRequeues(item) < maxRetries:
        log.Errorf("Error processing %s (will retry): %v", item, err)
        c.queue.AddRateLimited(item)
    default:
        log.Errorf("Error processing %s (giving up): %v", item, err)
        c.queue.Forget(item)
        utilruntime.HandleError(err)
    }

    return true
}
// processItem handles a single queue key.
//
// The special key initialSyncSignal only marks the initial sync as done. For
// any other key the secret is looked up in the informer's indexer: if it is
// present its clusters are added/updated, otherwise every cluster configured
// by that secret is removed. An error is returned only when the indexer
// lookup itself fails.
func (c *Controller) processItem(secretName string) error {
    if secretName == initialSyncSignal {
        c.initialSync.Store(true)
        return nil
    }

    obj, exists, err := c.informer.GetIndexer().GetByKey(secretName)
    if err != nil {
        return fmt.Errorf("error fetching object %s error: %v", secretName, err)
    }

    if !exists {
        // The secret is gone: drop the clusters it configured.
        c.deleteMemberCluster(secretName)
        return nil
    }

    // The secret exists: add or refresh the clusters it describes.
    c.addMemberCluster(secretName, obj.(*corev1.Secret))
    return nil
}
// addMemberCluster registers or refreshes every cluster described by secret s.
//
// Each entry of s.Data maps a cluster ID to a kubeconfig. For a cluster ID
// that is not yet known, a remote cluster is created, stored, and addCallback
// is invoked. For a known cluster ID the entry is skipped if it was registered
// by a different secret, left untouched if the kubeconfig hash is unchanged,
// and otherwise rebuilt, stored, and passed to updateCallback.
//
// Failures are logged and do not stop processing of the remaining entries.
// Only entries present in s.Data are visited; cluster IDs that are no longer
// in the secret are not removed here.
func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
    for clusterID, kubeConfig := range s.Data {
        prev, known := c.cs.remoteClusters[clusterID]
        if !known {
            log.Infof("Adding cluster_id=%v from secret=%v", clusterID, secretName)
            remoteCluster, err := createRemoteCluster(kubeConfig, secretName)
            if err != nil {
                log.Errorf("Failed to add remote cluster from secret=%v for cluster_id=%v: %v",
                    secretName, clusterID, err)
                continue
            }
            // Record the cluster before notifying; it stays recorded even if
            // the callback reports an error.
            c.cs.remoteClusters[clusterID] = remoteCluster
            if err := c.addCallback(remoteCluster.clients, clusterID); err != nil {
                log.Errorf("Error creating cluster_id=%s from secret %v: %v",
                    clusterID, secretName, err)
            }
            continue
        }

        // A cluster ID may only be owned by one secret.
        if prev.secretName != secretName {
            log.Errorf("ClusterID reused in two different secrets: %v and %v. ClusterID "+
                "must be unique across all secrets", prev.secretName, secretName)
            continue
        }

        // Skip the rebuild when the kubeconfig content has not changed.
        kubeConfigSha := sha256.Sum256(kubeConfig)
        if bytes.Equal(kubeConfigSha[:], prev.kubeConfigSha[:]) {
            log.Infof("Updating cluster_id=%v from secret=%v: (kubeconfig are identical)", clusterID, secretName)
            continue
        }

        log.Infof("Updating cluster %v from secret %v", clusterID, secretName)
        remoteCluster, err := createRemoteCluster(kubeConfig, secretName)
        if err != nil {
            log.Errorf("Error updating cluster_id=%v from secret=%v: %v",
                clusterID, secretName, err)
            continue
        }
        c.cs.remoteClusters[clusterID] = remoteCluster
        if err := c.updateCallback(remoteCluster.clients, clusterID); err != nil {
            // Format verbs now line up with (clusterID, secretName, err); the
            // previous message printed the cluster ID in the secret slot.
            log.Errorf("Error updating cluster_id=%v from secret=%v: %v",
                clusterID, secretName, err)
        }
    }

    log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters))
}
// deleteMemberCluster removes every remote cluster that was configured by the
// given secret. removeCallback is invoked for each one; a callback error is
// logged and the cluster is still removed from the store.
func (c *Controller) deleteMemberCluster(secretName string) {
    for id, rc := range c.cs.remoteClusters {
        if rc.secretName != secretName {
            continue
        }
        log.Infof("Deleting cluster_id=%v configured by secret=%v", id, secretName)
        if err := c.removeCallback(id); err != nil {
            log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
                id, secretName, err)
        }
        delete(c.cs.remoteClusters, id)
    }
    log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters))
}

callbacks

  • add callback

// addMemberCluster creates a secrets controller for the cluster identified by
// key, records it in remoteKubeControllers, and then calls
// clients.RunAndWait with the Multicluster's stop channel.
func (m *Multicluster) addMemberCluster(clients kube.Client, key string) {
    log.Infof("initializing Kubernetes credential reader for cluster %v", key)
    controller := NewSecretsController(clients, key)

    // The lock covers only the map write, not the RunAndWait call below.
    m.m.Lock()
    m.remoteKubeControllers[key] = controller
    m.m.Unlock()

    clients.RunAndWait(m.stop)
}
  • update callback

// updateMemberCluster replaces the controller registered under key: the
// existing entry is dropped first, then a new one is created from clients.
func (m *Multicluster) updateMemberCluster(clients kube.Client, key string) {
    // Remove-then-add; the two steps take the lock separately.
    m.deleteMemberCluster(key)
    m.addMemberCluster(clients, key)
}
  • delete callback

// deleteMemberCluster removes the secrets controller registered under key.
// Deleting a key that is not present is a no-op.
func (m *Multicluster) deleteMemberCluster(key string) {
    m.m.Lock()
    defer m.m.Unlock()
    delete(m.remoteKubeControllers, key)
}

Generate

// Generate builds the SDS resources for the secrets a proxy is watching.
//
// It returns nil when the proxy has no verified identity, when no secrets
// controller is available for the proxy's cluster, when the proxy's identity
// fails authorization, or when req is nil or does not require an update for
// this proxy. Otherwise it returns one resource per watched name that could be
// parsed, authorized and resolved; names that fail any of those steps are
// logged and skipped. For non-full pushes only secrets related to the updated
// configs are regenerated. Resolved resources are served from and added to
// s.cache.
func (s *SecretGen) Generate(proxy *model.Proxy, _ *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if proxy.VerifiedIdentity == nil {
        adsLog.Warnf("proxy %v is not authorized to receive secrets. Ensure you are connecting over TLS port and are authenticated.", proxy.ID)
        return nil
    }

    // Pick the secrets controller for the cluster the proxy belongs to.
    secrets, err := s.secrets.ForCluster(proxy.Metadata.ClusterID)
    if err != nil {
        adsLog.Warnf("proxy %v is from an unknown cluster, cannot retrieve certificates: %v", proxy.ID, err)
        return nil
    }

    // Check the proxy's verified identity against that controller.
    identity := proxy.VerifiedIdentity
    if err := secrets.Authorize(identity.ServiceAccount, identity.Namespace); err != nil {
        adsLog.Warnf("proxy %v is not authorized to receive secrets: %v", proxy.ID, err)
        return nil
    }

    if req == nil || !needsUpdate(proxy, req.ConfigsUpdated) {
        return nil
    }

    // For non-full pushes, restrict work to the secrets that actually changed.
    var updatedSecrets map[model.ConfigKey]struct{}
    if !req.Full {
        updatedSecrets = model.ConfigsOfKind(req.ConfigsUpdated, gvk.Secret)
    }

    results := model.Resources{}
    for _, resourceName := range w.ResourceNames {
        sr, err := parseResourceName(resourceName, proxy.ConfigNamespace)
        if err != nil {
            adsLog.Warnf("error parsing resource name: %v", err)
            continue
        }

        // This is an incremental update, filter out secrets that are not updated.
        if updatedSecrets != nil &&
            !containsAny(updatedSecrets, relatedConfigs(model.ConfigKey{Kind: gvk.Secret, Name: sr.Name, Namespace: sr.Namespace})) {
            continue
        }

        // Per-secret access check.
        // NOTE(review): original comment says the secret must be in the
        // proxy's namespace — confirm against proxyAuthorizedForSecret.
        if err := s.proxyAuthorizedForSecret(proxy, sr); err != nil {
            adsLog.Warnf("requested secret %v not accessible for proxy %v: %v", sr.ResourceName, proxy.ID, err)
            continue
        }

        // If it is in the Cache, add it and continue
        if hit, found := s.cache.Get(sr); found {
            results = append(results, hit)
            continue
        }

        // Names ending in GatewaySdsCaSuffix are served as CA-certificate-only secrets.
        if strings.HasSuffix(sr.Name, GatewaySdsCaSuffix) {
            caCert := secrets.GetCaCert(sr.Name, sr.Namespace)
            if caCert == nil {
                adsLog.Warnf("failed to fetch ca certificate for %v", sr.ResourceName)
                continue
            }
            res := toEnvoyCaSecret(sr.ResourceName, caCert)
            results = append(results, res)
            s.cache.Add(sr, res)
            continue
        }

        // Everything else is a private key plus certificate pair.
        key, cert := secrets.GetKeyAndCert(sr.Name, sr.Namespace)
        if key == nil || cert == nil {
            adsLog.Warnf("failed to fetch key and certificate for %v", sr.ResourceName)
            continue
        }
        res := toEnvoyKeyCertSecret(sr.ResourceName, key, cert)
        results = append(results, res)
        s.cache.Add(sr, res)
    }
    return results
}

Last updated