XDS Server

XDS Server

istiod 中的 xDS Server 分为 secure(启用 TLS)和 insecure(不启用 TLS)两种。

initSecureDiscoveryService

对于启用tls的DiscoveryService需要先初始化spiffe验证器,对应的方法为setPeerCertVerifier

setPeerCertVerifier

// setPeerCertVerifier creates the SPIFFE peer certificate verifier stored in
// s.peerCertVerifier and loads root certificates into it.
//
// Root certificates are read from tlsOptions.CaCertFile when it is set;
// otherwise the root certificate PEMs of s.RA and s.CA (whichever are non-nil)
// are concatenated. A non-empty bundle is added under the trust domain returned
// by spiffe.GetTrustDomain(). If features.SpiffeBundleEndpoints is set, the
// bundles retrieved from those endpoints are added as well.
//
// It returns nil without creating a verifier when no CA cert file, no CA and no
// SPIFFE bundle endpoints are configured. It returns a non-nil error when the
// cert file cannot be read, the PEM bundle cannot be added, or the bundle
// endpoints cannot be retrieved.
func (s *Server) setPeerCertVerifier(tlsOptions TLSOptions) error {
    // NOTE(review): this guard does not look at s.RA, so an RA-only setup with
    // no CA cert file and no bundle endpoints returns before a verifier is
    // created — confirm this is intended.
    if tlsOptions.CaCertFile == "" && s.CA == nil && features.SpiffeBundleEndpoints == "" {
        // Running locally without configured certs - no TLS mode
        return nil
    }
    s.peerCertVerifier = spiffe.NewPeerCertVerifier()
    var rootCertBytes []byte
    var err error
    // An explicitly configured CA cert file takes precedence over RA/CA roots.
    if tlsOptions.CaCertFile != "" {
        if rootCertBytes, err = ioutil.ReadFile(tlsOptions.CaCertFile); err != nil {
            return err
        }
    } else {
        // Load the RA root cert.
        if s.RA != nil {
            rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
        }
        // Load the CA root cert.
        if s.CA != nil {
            rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
        }
    }

    if len(rootCertBytes) != 0 {
        // Add the CA certificates to the verifier, keyed by trust domain.
        if err = s.peerCertVerifier.AddMappingFromPEM(spiffe.GetTrustDomain(), rootCertBytes); err != nil {
            log.Errorf("Add Root CAs into peerCertVerifier failed: %v", err)
            // Wrap with %w so callers can still inspect the cause with
            // errors.Is / errors.As; the message text is unchanged.
            return fmt.Errorf("add root CAs into peerCertVerifier failed: %w", err)
        }
    }

    if features.SpiffeBundleEndpoints != "" {
        certMap, err := spiffe.RetrieveSpiffeBundleRootCertsFromStringInput(
            features.SpiffeBundleEndpoints, []*x509.Certificate{})
        if err != nil {
            return err
        }
        s.peerCertVerifier.AddMappings(certMap)
    }

    return nil
}

在 initSecureDiscoveryService 初始化 gRPC server 时,通过该验证器验证客户端身份:

    // TLS settings for the secure discovery gRPC server.
    cfg := &tls.Config{
        // Serving certificate is supplied by s.getIstiodCertificate.
        GetCertificate: s.getIstiodCertificate,
        // A client certificate is optional, but is verified when one is given.
        ClientAuth:     tls.VerifyClientCertIfGiven,
        // Pool used by crypto/tls to verify a presented client certificate.
        ClientCAs:      s.peerCertVerifier.GetGeneralCertPool(),
        // Extra verification hook: delegates to the SPIFFE peer cert verifier.
        // A failure is logged at info level and returned to crypto/tls, which
        // aborts the handshake on a non-nil error.
        VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
            err := s.peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
            if err != nil {
                log.Infof("Could not verify certificate: %v", err)
            }
            return err
        },
    }

具体验证客户端cert的逻辑如下

// VerifyPeerCert validates the raw certificate chain presented by a peer.
//
// The first entry of rawCerts is treated as the peer (leaf) certificate and all
// remaining entries as intermediates. The leaf must carry exactly one URI SAN;
// the trust domain derived from that URI selects the root pool in v.certPools
// against which the leaf is verified. The second parameter is ignored.
//
// It returns nil when no certificate is presented (other authentication
// methods may still apply) or when verification succeeds, and an error when a
// certificate cannot be parsed, the URI SAN count is not one, the trust domain
// cannot be derived, no pool exists for the trust domain, or chain
// verification fails.
func (v *PeerCertVerifier) VerifyPeerCert(rawCerts [][]byte, _ [][]*x509.Certificate) error {
    if len(rawCerts) == 0 {
        // Peer doesn't present a certificate. Just skip. Other authn methods may be used.
        return nil
    }

    // Parse the leaf first, then the rest of the chain in order as intermediates.
    leaf, err := x509.ParseCertificate(rawCerts[0])
    if err != nil {
        return err
    }
    intermediates := x509.NewCertPool()
    for _, raw := range rawCerts[1:] {
        parsed, parseErr := x509.ParseCertificate(raw)
        if parseErr != nil {
            return parseErr
        }
        intermediates.AddCert(parsed)
    }

    if uriCount := len(leaf.URIs); uriCount != 1 {
        return fmt.Errorf("peer certificate does not contain 1 URI type SAN, detected %d", uriCount)
    }

    // Derive the trust domain from the certificate's URI SAN.
    domain, err := GetTrustDomainFromURISAN(leaf.URIs[0].String())
    if err != nil {
        return err
    }

    // Look up the root certificates registered for that trust domain.
    roots, found := v.certPools[domain]
    if !found {
        return fmt.Errorf("no cert pool found for trust domain %s", domain)
    }

    // Verify the client certificate against those roots.
    opts := x509.VerifyOptions{
        Roots:         roots,
        Intermediates: intermediates,
    }
    _, err = leaf.Verify(opts)
    return err
}

注册 handler:s.XDSServer.Register(s.secureGrpcServer)

StreamAggregatedResources 实现了envoy ADS接口

// StreamAggregatedResources serves one aggregated discovery stream.
//
// It rejects the stream if the server is not ready, authenticates the peer,
// initializes the global push context, creates a connection for the stream and
// then loops: requests arriving on the request channel are handled by
// processRequest, push events arriving on the connection's push channel are
// handled by pushConnection. The loop ends with the first error from either
// handler, with receiveError once the request channel is closed, or with nil
// when con.stop fires.
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // Check whether the server is ready to accept clients and process new
    // requests. Currently ready means the caches have been synced, so clusters
    // can be built correctly. Without this check, the InitContext() call below
    // would initialize with empty config, causing reconnecting Envoys to lose
    // their configuration. This is an additional safety check on top of the
    // cache-sync logic in the readiness probe, which handles cases where
    // kube-proxy iptables updates are delayed.
    if !s.IsServerReady() {
        return errors.New("server is not ready to serve discovery information")
    }

    ctx := stream.Context()
    peerAddr := "0.0.0.0"
    // Use the peer's address when the context carries peer information.
    if peerInfo, ok := peer.FromContext(ctx); ok {
        peerAddr = peerInfo.Addr.String()
    }

    // Authenticate the peer's identity.
    ids, err := s.authenticate(ctx)
    if err != nil {
        return err
    }
    if ids != nil {
        adsLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
    } else {
        adsLog.Debuga("Unauthenticated XDS: ", peerAddr)
    }

    // Initialize the push context here: the insecure server is started as
    // well, so it is not known which one receives a request first.
    if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
        adsLog.Warnf("Error reading config %v", err)
        return err
    }
    con := newConnection(peerAddr, stream)
    con.Identities = ids

    // Receive messages from the client in a separate goroutine.
    // NOTE(review): receive presumably reads from the stream, forwards
    // requests on reqChannel, sets receiveError and closes the channel when
    // the stream ends — confirm against its definition.
    var receiveError error
    reqChannel := make(chan *discovery.DiscoveryRequest, 1)
    go s.receive(con, reqChannel, &receiveError)

    for {
        select {
        // Read a request; a closed channel ends the stream with receiveError.
        case req, ok := <-reqChannel:
            if !ok {
                return receiveError
            }
            // Send the response back to the client.
            err := s.processRequest(req, con)
            if err != nil {
                return err
            }

        // Read from the push channel; the event handler controllers send data
        // to pushChannel when resources change.
        case pushEv := <-con.pushChannel:
            err := s.pushConnection(con, pushEv)
            // done is called whether or not the push succeeded.
            pushEv.done()
            if err != nil {
                return err
            }
        case <-con.stop:
            return nil
        }
    }
}

客户端身份认证

// authenticate determines the identities of the peer associated with ctx.
//
// It returns (nil, nil) when features.XDSAuth is disabled or when the
// connection is not TLS, and an error when ctx carries no peer information.
// Otherwise the configured authenticators are tried in order and the
// identities of the first one that succeeds are returned.
func (s *DiscoveryServer) authenticate(ctx context.Context) ([]string, error) {
    if !features.XDSAuth {
        return nil, nil
    }

    // Currently this only checks that the request has a certificate signed by
    // our key. Protected by a flag to avoid breaking upgrades - should be
    // enabled in multi-cluster/mesh expansion setups where XDS is exposed.
    peerInfo, ok := peer.FromContext(ctx)
    if !ok {
        return nil, errors.New("invalid context")
    }
    // Not a TLS connection: no authentication is performed.
    if _, ok := peerInfo.AuthInfo.(credentials.TLSInfo); !ok {
        return nil, nil
    }
    authFailMsgs := []string{}
    // Iterate over the authenticators; passing any one of them is sufficient.
    for _, authn := range s.Authenticators {
        u, err := authn.Authenticate(ctx)
        if u != nil && u.Identities != nil && err == nil {
            return u.Identities, nil
        }
    // (the remainder of the function is elided in this excerpt)
    ....
}

具体逻辑参见18节

处理连接上收到的请求

// processRequest handles one discovery request received on con.
//
// The request is first registered with the status reporter, if one is set.
// When shouldRespond reports that no response is needed, nil is returned.
// Otherwise the watched resource for the request's type URL is pushed to the
// client as a full push, and the result of that push is returned.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
    if s.StatusReporter != nil {
        s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
    }

    if !s.shouldRespond(con, req) {
        return nil
    }

    // Gather the push inputs, then send the matching data to the client.
    pushCtx := s.globalPushContext()
    version := versionInfo()
    watched := con.Watched(req.TypeUrl)
    fullPush := &model.PushRequest{Full: true}
    return s.pushXds(con, pushCtx, version, watched, fullPush)
}

主动推送

// pushConnection handles a server-initiated push event for one connection.
//
// For a full push the proxy is first updated with current information. If
// ProxyNeedsPush reports that the proxy does not need this push, nothing is
// sent (for a full push all events are still reported). Otherwise every
// resource returned by getPushResources is pushed via pushXds, stopping at the
// first error. It returns that error, or nil on success.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
    req := pushEv.pushRequest

    if req.Full {
        // Update Proxy with current information.
        s.updateProxy(con.proxy, req.Push)
    }

    // Decide whether this proxy needs the push at all.
    if !ProxyNeedsPush(con.proxy, pushEv) {
        adsLog.Debugf("Skipping push to %v, no updates required", con.ConID)
        // Only full pushes advance the version; incremental pushes do not.
        if req.Full {
            reportAllEvents(s.StatusReporter, con.ConID, req.Push.Version, nil)
        }
        return nil
    }

    version := versionInfo()

    // Send the push to every generator; each generator is responsible for
    // deciding whether the push event requires a push.
    for _, watched := range getPushResources(con.proxy.WatchedResources) {
        if err := s.pushXds(con, req.Push, version, watched, req); err != nil {
            return err
        }
    }

    if req.Full {
        // Report all events for unwatched resources. Watched resources will be
        // reported by pushXds or on ack.
        reportAllEvents(s.StatusReporter, con.ConID, req.Push.Version, con.proxy.WatchedResources)
    }

    elapsed := time.Since(req.Start)
    proxiesConvergeDelay.Record(elapsed.Seconds())
    return nil
}
// pushXds generates the resources for one watched resource type and sends them
// to the client on con.
//
// It returns nil without sending anything when w is nil, when no generator is
// found for w.TypeUrl, or when the generator produces nil (in which case the
// event is registered with the status reporter, if one is set). Otherwise a
// DiscoveryResponse is sent; a send error is recorded and returned.
func (s *DiscoveryServer) pushXds(con *Connection, push *model.PushContext,
    currentVersion string, w *model.WatchedResource, req *model.PushRequest) error {
    if w == nil {
        return nil
    }
    // Pick the generator from the requested type URL.
    generator := s.findGenerator(w.TypeUrl, con)
    if generator == nil {
        return nil
    }

    started := time.Now()
    // Generate the resources.
    generated := generator.Generate(con.proxy, push, w, req)
    if generated == nil {
        // Nothing to push: report the event as if it had been acked.
        if s.StatusReporter != nil {
            s.StatusReporter.RegisterEvent(con.ConID, w.TypeUrl, push.Version)
        }
        return nil
    }
    // Push time is only recorded once there is something to send.
    defer func() { recordPushTime(w.TypeUrl, time.Since(started)) }()

    // Send the response to the client.
    if err := con.send(&discovery.DiscoveryResponse{
        TypeUrl:     w.TypeUrl,
        VersionInfo: currentVersion,
        Nonce:       nonce(push.Version),
        Resources:   generated,
    }); err != nil {
        recordSendError(w.TypeUrl, con.ConID, err)
        return err
    }

    if _, skipLog := SkipLogTypes[w.TypeUrl]; !skipLog {
        adsLog.Infof("%s: PUSH for node:%s resources:%d", v3.GetShortType(w.TypeUrl), con.proxy.ID, len(generated))
    }
    return nil
}

cds

// Generate returns the CDS resources for proxy.
//
// It returns nil when cdsNeedsPush reports that no push is needed. Otherwise
// it builds the clusters for the proxy and returns each one converted with
// util.MessageToAny; the result is non-nil even when no clusters are built.
func (c CdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !cdsNeedsPush(req, proxy) {
        return nil
    }
    clusters := c.Server.ConfigGenerator.BuildClusters(proxy, push)
    out := make(model.Resources, 0, len(clusters))
    for _, cluster := range clusters {
        out = append(out, util.MessageToAny(cluster))
    }
    return out
}

eds

// Generate returns the EDS resources for the cluster names in w.ResourceNames.
//
// It returns nil when edsNeedsPush reports that no push is needed. For a
// non-full push only clusters whose hostname appears among the updated
// ServiceEntry config names are recomputed; all others are skipped. Each
// remaining cluster is served from eds.Server.Cache when present, otherwise it
// is generated, converted with util.MessageToAny and added to the cache.
// Clusters for which generateEndpoints returns nil are omitted.
func (eds *EdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !edsNeedsPush(req.ConfigsUpdated) {
        return nil
    }

    var updatedServices map[string]struct{}
    if !req.Full {
        updatedServices = model.ConfigNamesOfKind(req.ConfigsUpdated, gvk.ServiceEntry)
    }

    // Counters used only for the log line at the end.
    var (
        emptyCount  = 0
        cacheHits   = 0
        rebuiltNum  = 0
        endpointSet = make([]*any.Any, 0)
    )

    for _, name := range w.ResourceNames {
        if updatedServices != nil {
            _, _, host, _ := model.ParseSubsetKey(name)
            if _, updated := updatedServices[string(host)]; !updated {
                // Cluster was not updated, skip recomputing. This happens when we get an incremental update for a
                // specific Hostname. On connect or for full push edsUpdatedServices will be empty.
                continue
            }
        }

        epBuilder := NewEndpointBuilder(name, proxy, push)
        if hit, found := eds.Server.Cache.Get(epBuilder); found {
            endpointSet = append(endpointSet, hit)
            cacheHits++
            continue
        }

        // Cache miss: generate, then store the marshalled form for next time.
        assignment := eds.Server.generateEndpoints(epBuilder)
        if assignment == nil {
            continue
        }
        rebuiltNum++
        if len(assignment.Endpoints) == 0 {
            emptyCount++
        }
        marshalled := util.MessageToAny(assignment)
        endpointSet = append(endpointSet, marshalled)
        eds.Server.Cache.Add(epBuilder, marshalled)
    }

    switch {
    case len(updatedServices) == 0:
        adsLog.Infof("EDS: PUSH for node:%s resources:%d empty:%v cached:%v/%v",
            proxy.ID, len(endpointSet), emptyCount, cacheHits, cacheHits+rebuiltNum)
    default:
        adsLog.Debugf("EDS: PUSH INC for node:%s clusters:%d empty:%v cached:%v/%v",
            proxy.ID, len(endpointSet), emptyCount, cacheHits, cacheHits+rebuiltNum)
    }
    return endpointSet
}

lds

// Generate returns the LDS resources for proxy.
//
// It returns nil when ldsNeedsPush reports that no push is needed. Otherwise
// it builds the listeners for the proxy and returns each one converted with
// util.MessageToAny; the result is non-nil even when no listeners are built.
func (l LdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !ldsNeedsPush(req) {
        return nil
    }
    built := l.Server.ConfigGenerator.BuildListeners(proxy, push)
    out := make(model.Resources, 0, len(built))
    for _, listener := range built {
        out = append(out, util.MessageToAny(listener))
    }
    return out
}

nds

// Generate returns the NDS resource for proxy.
//
// It returns nil when ndsNeedsPush reports that no push is needed or when
// BuildNameTable returns nil. Otherwise it returns a single-element result
// holding the name table converted with util.MessageToAny.
func (n NdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !ndsNeedsPush(req) {
        return nil
    }
    table := n.Server.ConfigGenerator.BuildNameTable(proxy, push)
    if table == nil {
        return nil
    }
    return model.Resources{util.MessageToAny(table)}
}

rds

// Generate returns the RDS resources for the route names in w.ResourceNames.
//
// It returns nil when rdsNeedsPush reports that no push is needed. Otherwise
// it builds the HTTP routes for the proxy and returns each one converted with
// util.MessageToAny; the result is non-nil even when no routes are built.
func (c RdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
    if !rdsNeedsPush(req) {
        return nil
    }
    routes := c.Server.ConfigGenerator.BuildHTTPRoutes(proxy, push, w.ResourceNames)
    out := make(model.Resources, 0, len(routes))
    for _, route := range routes {
        out = append(out, util.MessageToAny(route))
    }
    return out
}

Last updated

Was this helpful?