# XDS Server

## XDS Server

istiod 中的 xDS Server 分为 secure 和 insecure 两种。

## initSecureDiscoveryService

对于启用 TLS 的 DiscoveryService,需要先初始化 SPIFFE 证书验证器,对应的方法为 `setPeerCertVerifier`。

### setPeerCertVerifier

```go
// setPeerCertVerifier creates s.peerCertVerifier and loads root certificates into it.
//
// When no CA cert file, no CA and no SPIFFE bundle endpoints are configured it
// returns nil without creating a verifier. It returns an error when the CA cert
// file cannot be read, when adding the root certs to the verifier fails, or when
// the SPIFFE bundle root certs cannot be retrieved.
func (s *Server) setPeerCertVerifier(tlsOptions TLSOptions) error {
    if tlsOptions.CaCertFile == "" && s.CA == nil && features.SpiffeBundleEndpoints == "" {
        // Running locally without configured certs - no TLS mode
        return nil
    }
    s.peerCertVerifier = spiffe.NewPeerCertVerifier()
    var rootCertBytes []byte
    var err error
    // An explicitly specified CA cert file is used instead of the RA/CA root certs.
    if tlsOptions.CaCertFile != "" {
        if rootCertBytes, err = ioutil.ReadFile(tlsOptions.CaCertFile); err != nil {
            return err
        }
    } else {
        // Load the RA root cert, if an RA is configured.
        if s.RA != nil {
            rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
        }
        // Load the CA root cert, if a CA is configured.
        if s.CA != nil {
            rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
        }
    }

    if len(rootCertBytes) != 0 {
        // Add the CA certs to the verifier's cert pools (certPools/generalCertPool),
        // keyed by the current trust domain.
        err := s.peerCertVerifier.AddMappingFromPEM(spiffe.GetTrustDomain(), rootCertBytes)
        if err != nil {
            log.Errorf("Add Root CAs into peerCertVerifier failed: %v", err)
            return fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err)
        }
    }

    // Additionally add the root certs retrieved from the configured SPIFFE bundle endpoints.
    if features.SpiffeBundleEndpoints != "" {
        certMap, err := spiffe.RetrieveSpiffeBundleRootCertsFromStringInput(
            features.SpiffeBundleEndpoints, []*x509.Certificate{})
        if err != nil {
            return err
        }
        s.peerCertVerifier.AddMappings(certMap)
    }

    return nil
}
```

`initSecureDiscoveryService` 在初始化 gRPC server 时,通过该验证器验证客户端身份:

```go
    // TLS configuration built for the secure gRPC server. A client certificate is
    // optional (tls.VerifyClientCertIfGiven); when one is presented it is checked
    // against the verifier's general cert pool and then by VerifyPeerCert. A
    // verification failure is logged and returned.
    cfg := &tls.Config{
        GetCertificate: s.getIstiodCertificate,
        ClientAuth:     tls.VerifyClientCertIfGiven,
        ClientCAs:      s.peerCertVerifier.GetGeneralCertPool(),
        VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
            err := s.peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
            if err != nil {
                log.Infof("Could not verify certificate: %v", err)
            }
            return err
        },
    }
```

验证客户端证书的具体逻辑如下:

```go
// VerifyPeerCert verifies the peer certificate against the root cert pool of the
// trust domain named in the certificate's URI SAN.
//
// rawCerts[0] is treated as the peer certificate; any remaining entries are used
// as intermediates. The second parameter (the verified chains) is ignored.
// It returns nil when no certificate is presented, and an error when a cert
// cannot be parsed, the peer cert does not carry exactly one URI SAN, the trust
// domain cannot be extracted or has no cert pool, or chain verification fails.
func (v *PeerCertVerifier) VerifyPeerCert(rawCerts [][]byte, _ [][]*x509.Certificate) error {
    if len(rawCerts) == 0 {
        // Peer doesn't present a certificate. Just skip. Other authn methods may be used.
        return nil
    }
    var peerCert *x509.Certificate
    intCertPool := x509.NewCertPool()
    // The first cert is the peer cert; every other cert goes into the intermediate pool.
    for id, rawCert := range rawCerts {
        cert, err := x509.ParseCertificate(rawCert)
        if err != nil {
            return err
        }
        if id == 0 {
            peerCert = cert
        } else {
            intCertPool.AddCert(cert)
        }
    }
    if len(peerCert.URIs) != 1 {
        return fmt.Errorf("peer certificate does not contain 1 URI type SAN, detected %d", len(peerCert.URIs))
    }

    // Derive the trust domain from the certificate's URI SAN.
    trustDomain, err := GetTrustDomainFromURISAN(peerCert.URIs[0].String())
    if err != nil {
        return err
    }
    // Look up the root cert pool registered for that trust domain.
    rootCertPool, ok := v.certPools[trustDomain]
    if !ok {
        return fmt.Errorf("no cert pool found for trust domain %s", trustDomain)
    }
    // Verify the peer certificate against those roots and the presented intermediates.
    _, err = peerCert.Verify(x509.VerifyOptions{
        Roots:         rootCertPool,
        Intermediates: intCertPool,
    })
    return err
}
```

注册 handler:`s.XDSServer.Register(s.secureGrpcServer)`

`StreamAggregatedResources` 实现了 Envoy 的 ADS 接口:

```go
// StreamAggregatedResources serves one ADS stream.
//
// It rejects the stream while the server is not ready, authenticates the peer,
// initializes the global push context, and then loops: requests read by the
// receive goroutine are handled by processRequest, push events from
// con.pushChannel are handled by pushConnection. The loop ends with the first
// error, with receiveError once the request channel is closed, or with nil when
// con.stop fires.
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // Check whether the server is ready to accept clients and process new requests.
    // Currently, ready means the caches have been synced, so clusters can be built
    // correctly. Without this check, the InitContext() call below would initialize
    // with empty config, causing reconnecting Envoys to lose their configuration.
    // This is an additional safety check on top of adding the caches; the same logic
    // is synced to the readiness probe to handle delayed kube-proxy iptables updates.
    if !s.IsServerReady() {
        return errors.New("server is not ready to serve discovery information")
    }

    ctx := stream.Context()
    peerAddr := "0.0.0.0"
    // Use the peer's address when peer information is present in the context.
    if peerInfo, ok := peer.FromContext(ctx); ok {
        peerAddr = peerInfo.Addr.String()
    }

    // Authenticate the peer.
    ids, err := s.authenticate(ctx)
    if err != nil {
        return err
    }
    if ids != nil {
        adsLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
    } else {
        adsLog.Debuga("Unauthenticated XDS: ", peerAddr)
    }

    // Initialize the push context here: the insecure server is started as well,
    // so it is not known which of the two receives a request first.
    if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
        adsLog.Warnf("Error reading config %v", err)
        return err
    }
    con := newConnection(peerAddr, stream)
    con.Identities = ids

    // Receive requests from the client in a separate goroutine.
    var receiveError error
    reqChannel := make(chan *discovery.DiscoveryRequest, 1)
    go s.receive(con, reqChannel, &receiveError)

    for {
        select {
        // A request was read from the client; a closed channel ends the stream
        // with whatever error the receive goroutine recorded.
        case req, ok := <-reqChannel:
            if !ok {
                return receiveError
            }
            // Send the response back to the client.
            err := s.processRequest(req, con)
            if err != nil {
                return err
            }

        // A push event arrived on the push channel. Per the original note, the event
        // handler controllers send to pushChannel when resources change.
        case pushEv := <-con.pushChannel:
            err := s.pushConnection(con, pushEv)
            pushEv.done()
            if err != nil {
                return err
            }
        case <-con.stop:
            return nil
        }
    }
}
```

客户端身份认证

```go
// authenticate returns the identities of the peer behind ctx.
//
// It returns (nil, nil) when features.XDSAuth is disabled or the connection is
// not TLS, and an error when ctx carries no peer information. The remainder of
// the function is elided in this excerpt.
func (s *DiscoveryServer) authenticate(ctx context.Context) ([]string, error) {
    if !features.XDSAuth {
        return nil, nil
    }

    // Currently this only checks that the request has a certificate signed by our key.
    // Protected by a flag to avoid breaking upgrades - it should be enabled in
    // multi-cluster/mesh expansion setups where XDS is exposed.
    peerInfo, ok := peer.FromContext(ctx)
    if !ok {
        return nil, errors.New("invalid context")
    }
    // Not a TLS connection, so no authentication is performed.
    if _, ok := peerInfo.AuthInfo.(credentials.TLSInfo); !ok {
        return nil, nil
    }
    authFailMsgs := []string{}
    // Try each authenticator; the first one that succeeds authenticates the request.
    for _, authn := range s.Authenticators {
        u, err := authn.Authenticate(ctx)
        if u != nil && u.Identities != nil && err == nil {
            return u.Identities, nil
        }
    ....
}
```

具体逻辑参见[18节](/deep-understanding-of-istio/6/18.md)

处理连接上收到的请求:

```go
// processRequest handles a single discovery request received on con.
//
// The request is registered with the status reporter when one is set. If
// shouldRespond reports false nothing is sent and nil is returned; otherwise the
// resources watched for req.TypeUrl are pushed as a full push and the result of
// pushXds is returned.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
    if s.StatusReporter != nil {
        s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
    }

    if !s.shouldRespond(con, req) {
        return nil
    }

    push := s.globalPushContext()

    // Push the resources of the requested type to the client.
    return s.pushXds(con, push, versionInfo(), con.Watched(req.TypeUrl), &model.PushRequest{Full: true})
}
```

## 主动推送

```go
// pushConnection handles one push event for a connection.
//
// On a full push the proxy is updated first. If ProxyNeedsPush reports false the
// push is skipped; otherwise pushXds is called for every resource returned by
// getPushResources, and the first error is returned.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
    pushRequest := pushEv.pushRequest

    // On a full push, refresh the proxy with current information.
    if pushRequest.Full {
        // Update Proxy with current information.
        s.updateProxy(con.proxy, pushRequest.Push)
    }
    // Decide whether this proxy needs the push at all.
    if !ProxyNeedsPush(con.proxy, pushEv) {
        adsLog.Debugf("Skipping push to %v, no updates required", con.ConID)
        // Only report for full pushes; incremental pushes do not update the version.
        if pushRequest.Full {
            reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.Version, nil)
        }
        return nil
    }

    currentVersion := versionInfo()

    // Send the push to all generators; each generator is responsible for deciding
    // whether the push event requires a push.
    for _, w := range getPushResources(con.proxy.WatchedResources) {
        err := s.pushXds(con, pushRequest.Push, currentVersion, w, pushRequest)
        if err != nil {
            return err
        }
    }
    if pushRequest.Full {
        // Report all events for unwatched resources. Watched resources are reported
        // via pushXds or on ACK.
        reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.Version, con.proxy.WatchedResources)
    }

    proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
    return nil
}
```

```go
// pushXds generates the resources for one watched resource type and sends them
// to the connection.
//
// It returns nil without sending anything when w is nil, when no generator is
// found for w.TypeUrl, or when the generator returns nil. Otherwise it returns
// the error from con.send, if any.
func (s *DiscoveryServer) pushXds(con *Connection, push *model.PushContext,
    currentVersion string, w *model.WatchedResource, req *model.PushRequest) error {
    if w == nil {
        return nil
    }
    // Pick the generator based on the requested type URL.
    gen := s.findGenerator(w.TypeUrl, con)
    if gen == nil {
        return nil
    }

    t0 := time.Now()
    // Generate the resources.
    cl := gen.Generate(con.proxy, push, w, req)
    if cl == nil {
        // Nothing to push: report an ACK for this version to the status reporter, if any.
        if s.StatusReporter != nil {
            s.StatusReporter.RegisterEvent(con.ConID, w.TypeUrl, push.Version)
        }
        return nil 
    }
    // Push time is recorded only when there is something to send.
    defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()

    resp := &discovery.DiscoveryResponse{
        TypeUrl:     w.TypeUrl,
        VersionInfo: currentVersion,
        Nonce:       nonce(push.Version),
        Resources:   cl,
    }

    // Send the response to the client.
    err := con.send(resp)
    if err != nil {
        recordSendError(w.TypeUrl, con.ConID, err)
        return err
    }

    // Types listed in SkipLogTypes are not logged here.
    if _, f := SkipLogTypes[w.TypeUrl]; !f {
        adsLog.Infof("%s: PUSH for node:%s resources:%d", v3.GetShortType(w.TypeUrl), con.proxy.ID, len(cl))
    }
    return nil
}
```

## cds

```go
// Generate builds the clusters for proxy and returns them converted with
// util.MessageToAny. It returns nil when cdsNeedsPush reports that no push is
// needed for this request and proxy.
func (c CdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !cdsNeedsPush(req, proxy) {
        return nil
    }
    rawClusters := c.Server.ConfigGenerator.BuildClusters(proxy, push)
    resources := model.Resources{}
    // Note: the loop variable c shadows the receiver inside the loop.
    for _, c := range rawClusters {
        resources = append(resources, util.MessageToAny(c))
    }
    return resources
}
```

## eds

```go
// Generate returns the endpoint resources for the cluster names in
// w.ResourceNames, or nil when edsNeedsPush reports that no push is needed.
//
// Each cluster's resource is taken from the server cache when present;
// otherwise it is generated, converted with util.MessageToAny and added to the
// cache. For a non-full push, clusters whose hostname is not in
// edsUpdatedServices are skipped.
func (eds *EdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !edsNeedsPush(req.ConfigsUpdated) {
        return nil
    }
    // Stays nil for a full push, which disables the per-cluster filter below.
    var edsUpdatedServices map[string]struct{}
    if !req.Full {
        edsUpdatedServices = model.ConfigNamesOfKind(req.ConfigsUpdated, gvk.ServiceEntry)
    }
    resources := make([]*any.Any, 0)
    // Counters used only for the log line at the end.
    empty := 0

    cached := 0
    regenerated := 0
    for _, clusterName := range w.ResourceNames {
        if edsUpdatedServices != nil {
            _, _, hostname, _ := model.ParseSubsetKey(clusterName)
            if _, ok := edsUpdatedServices[string(hostname)]; !ok {
                // Cluster was not updated, skip recomputing. This happens when we get an incremental update for a
                // specific Hostname. On connect or for full push edsUpdatedServices will be empty.
                continue
            }
        }
        builder := NewEndpointBuilder(clusterName, proxy, push)
        // Reuse the cached resource when there is one; otherwise generate and cache it.
        if marshalledEndpoint, f := eds.Server.Cache.Get(builder); f {
            resources = append(resources, marshalledEndpoint)
            cached++
        } else {
            l := eds.Server.generateEndpoints(builder)
            if l == nil {
                continue
            }
            regenerated++

            if len(l.Endpoints) == 0 {
                empty++
            }
            resource := util.MessageToAny(l)
            resources = append(resources, resource)
            eds.Server.Cache.Add(builder, resource)
        }
    }
    // Logged at info level when no updated services were selected, at debug level otherwise.
    if len(edsUpdatedServices) == 0 {
        adsLog.Infof("EDS: PUSH for node:%s resources:%d empty:%v cached:%v/%v",
            proxy.ID, len(resources), empty, cached, cached+regenerated)
    } else {
        adsLog.Debugf("EDS: PUSH INC for node:%s clusters:%d empty:%v cached:%v/%v",
            proxy.ID, len(resources), empty, cached, cached+regenerated)
    }
    return resources
}
```

## lds

```go
// Generate builds the listeners for proxy and returns them converted with
// util.MessageToAny. It returns nil when ldsNeedsPush reports that no push is
// needed for this request.
func (l LdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !ldsNeedsPush(req) {
        return nil
    }
    listeners := l.Server.ConfigGenerator.BuildListeners(proxy, push)
    resources := model.Resources{}
    for _, c := range listeners {
        resources = append(resources, util.MessageToAny(c))
    }
    return resources
}
```

## nds

```go
// Generate builds the name table for proxy and returns it as a single resource
// converted with util.MessageToAny. It returns nil when ndsNeedsPush reports
// that no push is needed, or when BuildNameTable returns nil.
func (n NdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !ndsNeedsPush(req) {
        return nil
    }
    nt := n.Server.ConfigGenerator.BuildNameTable(proxy, push)
    if nt == nil {
        return nil
    }
    resources := model.Resources{util.MessageToAny(nt)}
    return resources
}
```

## rds

```go
// Generate builds the HTTP routes for the route names in w.ResourceNames and
// returns them converted with util.MessageToAny. It returns nil when
// rdsNeedsPush reports that no push is needed for this request.
func (c RdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !rdsNeedsPush(req) {
        return nil
    }
    rawRoutes := c.Server.ConfigGenerator.BuildHTTPRoutes(proxy, push, w.ResourceNames)
    resources := model.Resources{}
    // Note: the loop variable c shadows the receiver inside the loop.
    for _, c := range rawRoutes {
        resources = append(resources, util.MessageToAny(c))
    }
    return resources
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://rocdu.gitbook.io/deep-understanding-of-istio/6/15.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
