SDS Server
多集群证书管理
初始化SDS Server
s.initSDSServer(args)
// initSDSServer registers the secret (SDS) generator on the XDS server.
//
// It does nothing when the server has no Kubernetes client, and only logs a
// warning when features.EnableXDSIdentityCheck is disabled. Otherwise it
// creates a multicluster secrets controller, hooks secret change events up to
// XDS pushes, and installs the generator for v3.SecretType.
func (s *Server) initSDSServer(args *PilotArgs) {
	if s.kubeClient == nil {
		return
	}
	// Secrets are only served when client identity checking is enabled.
	if !features.EnableXDSIdentityCheck {
		// Make sure we have security
		log.Warnf("skipping Kubernetes credential reader; PILOT_ENABLE_XDS_IDENTITY_CHECK must be set to true for this feature.")
		return
	}

	// Multicluster initialisation.
	// NOTE(review): this stop channel is created here and never closed in this
	// function — confirm whether the server's own stop channel should be used.
	stop := make(chan struct{})
	sc := kubesecrets.NewMulticluster(s.kubeClient, s.clusterID, args.RegistryOptions.ClusterRegistriesNamespace, stop)

	// Whenever a secret changes, request a non-full XDS push for that secret.
	sc.AddEventHandler(func(name, namespace string) {
		changed := model.ConfigKey{
			Kind:      gvk.Secret,
			Name:      name,
			Namespace: namespace,
		}
		s.XDSServer.ConfigUpdate(&model.PushRequest{
			Full:           false,
			ConfigsUpdated: map[model.ConfigKey]struct{}{changed: {}},
			Reason:         []model.TriggerReason{model.SecretTrigger},
		})
	})

	// Install the SDS generator.
	s.XDSServer.Generators[v3.SecretType] = xds.NewSecretGen(sc, s.XDSServer.Cache)
}
// NewMulticluster creates a Multicluster, registers the local cluster with it,
// and starts the secret controller whose add/update/delete callbacks keep the
// set of member clusters in sync.
//
// The stop channel is stored on the Multicluster and also handed to the secret
// controller.
func NewMulticluster(client kube.Client, localCluster, secretNamespace string, stop chan struct{}) *Multicluster {
	mc := &Multicluster{
		remoteKubeControllers: map[string]*SecretsController{},
		localCluster:          localCluster,
		stop:                  stop,
	}

	// The local cluster is always a member.
	mc.addMemberCluster(client, localCluster)

	// The member-cluster operations return nothing, so each callback reports success.
	onAdd := func(c kube.Client, k string) error {
		mc.addMemberCluster(c, k)
		return nil
	}
	onUpdate := func(c kube.Client, k string) error {
		mc.updateMemberCluster(c, k)
		return nil
	}
	onDelete := func(k string) error {
		mc.deleteMemberCluster(k)
		return nil
	}

	// Start the informer-backed secret controller.
	mc.secretController = secretcontroller.StartSecretController(
		client,
		onAdd,
		onUpdate,
		onDelete,
		secretNamespace,
		time.Millisecond*100,
		stop)
	return mc
}
// NewController builds a Controller that watches secrets in namespace carrying
// the label MultiClusterSecretLabel=true.
//
// Every add, update and delete event observed by the informer is turned into a
// namespace/name key and placed on a rate-limited work queue. The supplied
// callbacks are stored on the Controller. The informer is not started here.
func NewController(
	kubeclientset kubernetes.Interface,
	namespace string,
	cs *ClusterStore,
	addCallback addSecretCallback,
	updateCallback updateSecretCallback,
	removeCallback removeSecretCallback) *Controller {
	// List and watch only the secrets labelled as multicluster secrets.
	listSecrets := func(opts meta_v1.ListOptions) (runtime.Object, error) {
		opts.LabelSelector = MultiClusterSecretLabel + "=true"
		return kubeclientset.CoreV1().Secrets(namespace).List(context.TODO(), opts)
	}
	watchSecrets := func(opts meta_v1.ListOptions) (watch.Interface, error) {
		opts.LabelSelector = MultiClusterSecretLabel + "=true"
		return kubeclientset.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
	}
	secretsInformer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc:  listSecrets,
			WatchFunc: watchSecrets,
		},
		&corev1.Secret{}, 0, cache.Indexers{},
	)

	// Work queue that receives the key of every changed secret.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	controller := &Controller{
		kubeclientset:  kubeclientset,
		namespace:      namespace,
		cs:             cs,
		informer:       secretsInformer,
		queue:          queue,
		addCallback:    addCallback,
		updateCallback: updateCallback,
		removeCallback: removeCallback,
	}

	log.Info("Setting up event handlers")
	// All three handlers only enqueue the key; events whose key cannot be
	// computed are dropped after being logged.
	secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			log.Infof("Processing add: %s", key)
			if err != nil {
				return
			}
			queue.Add(key)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			// Ignore updates that did not change the object.
			if oldObj == newObj || reflect.DeepEqual(oldObj, newObj) {
				return
			}
			key, err := cache.MetaNamespaceKeyFunc(newObj)
			log.Infof("Processing update: %s", key)
			if err != nil {
				return
			}
			queue.Add(key)
		},
		DeleteFunc: func(obj interface{}) {
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			log.Infof("Processing delete: %s", key)
			if err != nil {
				return
			}
			queue.Add(key)
		},
	})

	return controller
}
事件处理
// processNextItem takes one key off the work queue and processes it.
//
// It returns false once the queue reports that it is shutting down and true
// otherwise. A failed item is re-queued with rate limiting while its requeue
// count is below maxRetries; after that it is dropped and the error is passed
// to utilruntime.HandleError.
func (c *Controller) processNextItem() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)

	err := c.processItem(item.(string))
	switch {
	case err == nil:
		// No error, reset the ratelimit counters
		c.queue.Forget(item)
	case c.queue.NumRequeues(item) < maxRetries:
		log.Errorf("Error processing %s (will retry): %v", item, err)
		c.queue.AddRateLimited(item)
	default:
		log.Errorf("Error processing %s (giving up): %v", item, err)
		c.queue.Forget(item)
		utilruntime.HandleError(err)
	}
	return true
}
// processItem handles a single work-queue key.
//
// The special key initialSyncSignal only marks the initial sync as complete.
// For any other key the secret is looked up in the informer's indexer: if it
// exists its clusters are added or updated, otherwise every cluster that was
// configured by that secret is removed.
//
// It returns an error when the indexer lookup fails or when the indexed object
// is not a *corev1.Secret. The lookup error is wrapped so callers can inspect
// it with errors.Is / errors.As.
func (c *Controller) processItem(secretName string) error {
	if secretName == initialSyncSignal {
		c.initialSync.Store(true)
		return nil
	}

	obj, exists, err := c.informer.GetIndexer().GetByKey(secretName)
	if err != nil {
		return fmt.Errorf("error fetching object %s error: %w", secretName, err)
	}
	if !exists {
		// The secret is gone: remove the clusters it configured.
		c.deleteMemberCluster(secretName)
		return nil
	}

	// Checked assertion: report an unexpected object as an error instead of
	// panicking the worker.
	secret, ok := obj.(*corev1.Secret)
	if !ok {
		return fmt.Errorf("unexpected object type %T for key %s", obj, secretName)
	}
	c.addMemberCluster(secretName, secret)
	return nil
}
// addMemberCluster registers or refreshes the remote clusters described by
// secret s, whose Data maps a cluster ID to a kubeconfig.
//
// For a cluster ID not yet in the cluster store, a remote cluster is created,
// stored and passed to addCallback. For a known cluster ID, the entry is
// replaced and updateCallback is invoked only when the kubeconfig's SHA-256
// differs from the stored one. A cluster ID that is already owned by a
// different secret is rejected. Failures are logged and the remaining entries
// are still processed; nothing is returned to the caller.
func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
	for clusterID, kubeConfig := range s.Data {
		prev, known := c.cs.remoteClusters[clusterID]
		if !known {
			log.Infof("Adding cluster_id=%v from secret=%v", clusterID, secretName)
			// Create the remote cluster object.
			remoteCluster, err := createRemoteCluster(kubeConfig, secretName)
			if err != nil {
				log.Errorf("Failed to add remote cluster from secret=%v for cluster_id=%v: %v",
					secretName, clusterID, err)
				continue
			}
			// Record it before notifying; it stays recorded even if the callback fails.
			c.cs.remoteClusters[clusterID] = remoteCluster
			if err := c.addCallback(remoteCluster.clients, clusterID); err != nil {
				log.Errorf("Error creating cluster_id=%s from secret %v: %v",
					clusterID, secretName, err)
			}
			continue
		}

		// A cluster ID must not be claimed by two different secrets.
		if prev.secretName != secretName {
			log.Errorf("ClusterID reused in two different secrets: %v and %v. ClusterID "+
				"must be unique across all secrets", prev.secretName, secretName)
			continue
		}

		// Skip the rebuild when the kubeconfig content has not changed.
		kubeConfigSha := sha256.Sum256(kubeConfig)
		if bytes.Equal(kubeConfigSha[:], prev.kubeConfigSha[:]) {
			log.Infof("Updating cluster_id=%v from secret=%v: (kubeconfig are identical)", clusterID, secretName)
			continue
		}

		log.Infof("Updating cluster %v from secret %v", clusterID, secretName)
		remoteCluster, err := createRemoteCluster(kubeConfig, secretName)
		if err != nil {
			log.Errorf("Error updating cluster_id=%v from secret=%v: %v",
				clusterID, secretName, err)
			continue
		}
		c.cs.remoteClusters[clusterID] = remoteCluster
		if err := c.updateCallback(remoteCluster.clients, clusterID); err != nil {
			// The format string previously did not line up with its arguments,
			// so the cluster ID was printed as the secret name.
			log.Errorf("Error updating cluster_id=%v from secret=%v: %v",
				clusterID, secretName, err)
		}
	}
	log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters))
}
// deleteMemberCluster removes every remote cluster that was configured by the
// secret named secretName.
//
// removeCallback is invoked for each such cluster; a callback error is logged
// and the cluster is removed from the store regardless.
func (c *Controller) deleteMemberCluster(secretName string) {
	for clusterID, cluster := range c.cs.remoteClusters {
		if cluster.secretName != secretName {
			continue
		}
		log.Infof("Deleting cluster_id=%v configured by secret=%v", clusterID, secretName)
		// Notify the owner before dropping the entry.
		if err := c.removeCallback(clusterID); err != nil {
			log.Errorf("Error removing cluster_id=%v configured by secret=%v: %v",
				clusterID, secretName, err)
		}
		delete(c.cs.remoteClusters, clusterID)
	}
	log.Infof("Number of remote clusters: %d", len(c.cs.remoteClusters))
}
callbacks
add callback
// addMemberCluster creates a secrets controller for the cluster identified by
// key, records it under that key (replacing any previous entry), and then
// calls RunAndWait on the cluster's client with the Multicluster stop channel.
func (m *Multicluster) addMemberCluster(clients kube.Client, key string) {
	log.Infof("initializing Kubernetes credential reader for cluster %v", key)
	controller := NewSecretsController(clients, key)

	// Hold the lock only for the map write.
	func() {
		m.m.Lock()
		defer m.m.Unlock()
		m.remoteKubeControllers[key] = controller
	}()

	clients.RunAndWait(m.stop)
}
update callback
// updateMemberCluster replaces the secrets controller registered under key:
// the existing entry is removed and a new one is created from clients.
func (m *Multicluster) updateMemberCluster(clients kube.Client, key string) {
// Remove first, then add; the two steps take the lock separately.
m.deleteMemberCluster(key)
m.addMemberCluster(clients, key)
}
delete callback
// deleteMemberCluster drops the secrets controller registered under key. It is
// a no-op when no controller is registered for that key.
func (m *Multicluster) deleteMemberCluster(key string) {
	m.m.Lock()
	defer m.m.Unlock()
	delete(m.remoteKubeControllers, key)
}
Generate
// Generate builds the secret resources listed in w.ResourceNames for proxy.
//
// It returns nil when the proxy has no verified identity, when no secrets
// controller is known for the proxy's cluster, when the controller refuses to
// authorize the proxy's identity, or when req is nil or needs no update.
// Otherwise it returns a (possibly empty) list of resources. When req is not a
// full push, only secrets related to the updated configs are considered.
// Resources that cannot be parsed, authorized or fetched are logged and
// skipped. Newly built resources are added to the cache.
func (s *SecretGen) Generate(proxy *model.Proxy, _ *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
	identity := proxy.VerifiedIdentity
	if identity == nil {
		adsLog.Warnf("proxy %v is not authorized to receive secrets. Ensure you are connecting over TLS port and are authenticated.", proxy.ID)
		return nil
	}

	// Look up the secrets controller for the proxy's cluster.
	clusterSecrets, err := s.secrets.ForCluster(proxy.Metadata.ClusterID)
	if err != nil {
		adsLog.Warnf("proxy %v is from an unknown cluster, cannot retrieve certificates: %v", proxy.ID, err)
		return nil
	}

	// Authorize the client's identity.
	if authErr := clusterSecrets.Authorize(identity.ServiceAccount, identity.Namespace); authErr != nil {
		adsLog.Warnf("proxy %v is not authorized to receive secrets: %v", proxy.ID, authErr)
		return nil
	}

	if req == nil || !needsUpdate(proxy, req.ConfigsUpdated) {
		return nil
	}

	// Left nil for a full push, which disables the per-secret filter below.
	var changed map[model.ConfigKey]struct{}
	if !req.Full {
		changed = model.ConfigsOfKind(req.ConfigsUpdated, gvk.Secret)
	}

	out := model.Resources{}
	for _, name := range w.ResourceNames {
		// Resolve the watched name into a secret reference.
		ref, err := parseResourceName(name, proxy.ConfigNamespace)
		if err != nil {
			adsLog.Warnf("error parsing resource name: %v", err)
			continue
		}

		// This is an incremental update, filter out secrets that are not updated.
		if changed != nil &&
			!containsAny(changed, relatedConfigs(model.ConfigKey{Kind: gvk.Secret, Name: ref.Name, Namespace: ref.Namespace})) {
			continue
		}

		// Access check for this specific secret. The original note says the
		// requested resource must be in the proxy's namespace — confirm in
		// proxyAuthorizedForSecret.
		if err := s.proxyAuthorizedForSecret(proxy, ref); err != nil {
			adsLog.Warnf("requested secret %v not accessible for proxy %v: %v", ref.ResourceName, proxy.ID, err)
			continue
		}

		// If it is in the Cache, add it and continue
		if hit, ok := s.cache.Get(ref); ok {
			out = append(out, hit)
			continue
		}

		// Names ending in GatewaySdsCaSuffix are CA-certificate-only secrets.
		if strings.HasSuffix(ref.Name, GatewaySdsCaSuffix) {
			caCert := clusterSecrets.GetCaCert(ref.Name, ref.Namespace)
			if caCert == nil {
				adsLog.Warnf("failed to fetch ca certificate for %v", ref.ResourceName)
				continue
			}
			res := toEnvoyCaSecret(ref.ResourceName, caCert)
			out = append(out, res)
			s.cache.Add(ref, res)
			continue
		}

		// Everything else is a private key plus certificate.
		key, cert := clusterSecrets.GetKeyAndCert(ref.Name, ref.Namespace)
		if key == nil || cert == nil {
			adsLog.Warnf("failed to fetch key and certificate for %v", ref.ResourceName)
			continue
		}
		res := toEnvoyKeyCertSecret(ref.ResourceName, key, cert)
		out = append(out, res)
		s.cache.Add(ref, res)
	}
	return out
}
Last updated
Was this helpful?