DiscoveryServer

DiscoveryServer

DiscoveryServer是envoy xds api的 pilot grpc 实现,一遍负责监听grpc接口,另一边接收update事件,将数据返回给客户端

DiscoveryServer实现了 ads接口

func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {}

StreamAggregatedResources

// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // 检查服务器是否准备好接受客户端并处理新请求。 当前准备就绪意味着缓存已同步,因此可以正确构建集群。 如果不进行此检查,则下面的InitContext()调用将使用空配置初始化,从而导致重新连接的Envoy失去配置。 除了添加caches之外,这是一项额外的安全检查。已将逻辑同步到就绪探针以处理kube-proxy ip表更新延迟的情况。
    // See https://github.com/istio/istio/issues/25495.
    if !s.IsServerReady() {
        return errors.New("server is not ready to serve discovery information")
    }
    // 获取上下文信息
    ctx := stream.Context()
    peerAddr := "0.0.0.0"
    // 获取客户端信息
    if peerInfo, ok := peer.FromContext(ctx); ok {
        peerAddr = peerInfo.Addr.String()
    }

    // 验证客户端身份
    ids, err := s.authenticate(ctx)
    if err != nil {
        return err
    }
    if ids != nil {
        adsLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
    } else {
        adsLog.Debug("Unauthenticated XDS: ", peerAddr)
    }

    // 初始化pushcontent
    if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
        // Error accessing the data - log and close, maybe a different pilot replica
        // has more luck
        adsLog.Warnf("Error reading config %v", err)
        return err
    }
    con := newConnection(peerAddr, stream)
    con.Identities = ids

    // 从流中读取是阻塞操作。 每个连接都需要读取发现请求并等待配置更改时的推送命令,因此我们添加了go协程。 如果go grpc为流添加了gochannel支持,则不需要。 这也会检测到关闭。
    var receiveError error
    reqChannel := make(chan *discovery.DiscoveryRequest, 1)
    go s.receive(con, reqChannel, &receiveError)

    for {
        // 阻塞直到接收到请求或触发推送。 我们需要2条go例程,因为Recv()中的"read"块。
        // 为了避免2个协程,我们尝试在StreamAggregateResource中使用Recv() -并在推送发生时开始对不同的短暂go 协程的推送。 由于推动被限制,这将减少长时间运行的go协程的1/2。 主要问题是关闭-当前的gRPC库不允许关闭流。
        select {
        case req, ok := <-reqChannel:
            if !ok {
                // 远程链接已经关闭或者在处理时发生错误.
                return receiveError
            }
            // processRequest调用pushXXX,并通过pushConnection访问常见的结构。 如果我们要保存1/2个线程,添加同步是要解决的第二个问题。
            err := s.processRequest(req, con)
            if err != nil {
                return err
            }

        case pushEv := <-con.pushChannel:
            // 在LDS和RDS之间获取初始配置时,该推送将丢失受监视的"路由"。 CDS/EDS间隔相同。 由于该协议,处理起来非常棘手-但是定期推送会从中恢复。
            err := s.pushConnection(con, pushEv)
            pushEv.done()
            if err != nil {
                return err
            }
        case <-con.stop:
            return nil
        }
    }
}

processRequest

pushconnection

pushxds

Last updated

Was this helpful?