pilot-agent如何转发xds到istiod

判断是否由agent提供XDS服务

    if sa.cfg.ProxyXDSViaAgent {
        // 启动XDS
        sa.xdsProxy, err = initXdsProxy(sa)
        if err != nil {
            return nil, fmt.Errorf("failed to start xds proxy: %v", err)
        }
    }
    // 初始化XDS Server
    if err = proxy.initDownstreamServer(); err != nil {
        return nil, err
    }

    // 初始化连接上游客户端参数
    if proxy.istiodDialOptions, err = proxy.buildUpstreamClientDialOpts(ia); err != nil {
        return nil, err
    }

    //启动XDS Server
    go func() {
        if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
            log.Errorf("failed to accept downstream gRPC connection %v", err)
        }
    }()

处理下游连接

func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    proxyLog.Infof("Envoy ADS stream established")

    con := &ProxyConnection{
        upstreamError:   make(chan error),
        downstreamError: make(chan error),
        requestsChan:    make(chan *discovery.DiscoveryRequest, 10),
        responsesChan:   make(chan *discovery.DiscoveryResponse, 10),
        stopChan:        make(chan struct{}),
        downstream:      downstream,
    }

    p.RegisterStream(con)

    // Handle downstream xds
    firstNDSSent := false
    //从下游获取请求,发送给requestsChan
    go func() {
        for {
            // From Envoy
            req, err := downstream.Recv()
            if err != nil {
                con.downstreamError <- err
                return
            }
            // forward to istiod
            con.requestsChan <- req
            if p.localDNSServer != nil && !firstNDSSent && req.TypeUrl == v3.ListenerType {
                // fire off an initial NDS request
                con.requestsChan <- &discovery.DiscoveryRequest{
                    TypeUrl: v3.NameTableType,
                }
                firstNDSSent = true
            }
        }
    }()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    // 新建一个上游连接
    upstreamConn, err := grpc.DialContext(ctx, p.istiodAddress, p.istiodDialOptions...)
    if err != nil {
        proxyLog.Errorf("failed to connect to upstream %s: %v", p.istiodAddress, err)
        metrics.IstiodConnectionFailures.Increment()
        return err
    }
    defer upstreamConn.Close()

    xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
    ctx = metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
    if p.agent.cfg.XDSHeaders != nil {
        for k, v := range p.agent.cfg.XDSHeaders {
            ctx = metadata.AppendToOutgoingContext(ctx, k, v)
        }
    }
    // We must propagate upstream termination to Envoy. This ensures that we resume the full XDS sequence on new connection
    return p.HandleUpstream(ctx, con, xds)
}

处理上游返回,发送给下游

func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
    proxyLog.Infof("connecting to upstream XDS server: %s", p.istiodAddress)
    defer proxyLog.Infof("disconnected from XDS server: %s", p.istiodAddress)
    upstream, err := xds.StreamAggregatedResources(ctx,
        grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
    if err != nil {
        proxyLog.Errorf("failed to create upstream grpc client: %v", err)
        return err
    }

    // Handle upstream xds
    // 上游的返回写入channel
    go func() {
        for {
            // from istiod
            resp, err := upstream.Recv()
            if err != nil {
                con.upstreamError <- err
                return
            }
            con.responsesChan <- resp
        }
    }()

    for {
        select {
        case err := <-con.upstreamError:
            // error from upstream Istiod.
            if isExpectedGRPCError(err) {
                proxyLog.Debugf("upstream terminated with status %v", err)
                metrics.IstiodConnectionCancellations.Increment()
            } else {
                proxyLog.Warnf("upstream terminated with unexpected error %v", err)
                metrics.IstiodConnectionErrors.Increment()
            }
            _ = upstream.CloseSend()
            return nil
        case err := <-con.downstreamError:
            // error from downstream Envoy.
            if isExpectedGRPCError(err) {
                proxyLog.Debugf("downstream terminated with status %v", err)
                metrics.EnvoyConnectionCancellations.Increment()
            } else {
                proxyLog.Warnf("downstream terminated with unexpected error %v", err)
                metrics.EnvoyConnectionErrors.Increment()
            }
            // On downstream error, we will return. This propagates the error to downstream envoy which will trigger reconnect
            return err
        //转发请求
        case req, ok := <-con.requestsChan:
            if !ok {
                return nil
            }
            proxyLog.Debugf("request for type url %s", req.TypeUrl)
            metrics.XdsProxyRequests.Increment()
            if err = sendUpstreamWithTimeout(ctx, upstream, req); err != nil {
                proxyLog.Errorf("upstream send error for type url %s: %v", req.TypeUrl, err)
                return err
            }
        // 返回给下游
        case resp, ok := <-con.responsesChan:
            if !ok {
                return nil
            }
            proxyLog.Debugf("response for type url %s", resp.TypeUrl)
            metrics.XdsProxyResponses.Increment()
            switch resp.TypeUrl {
            case v3.NameTableType:
                // intercept. This is for the dns server
                if p.localDNSServer != nil && len(resp.Resources) > 0 {
                    var nt nds.NameTable
                    // TODO we should probably send ACK and not update nametable here
                    if err = ptypes.UnmarshalAny(resp.Resources[0], &nt); err != nil {
                        log.Errorf("failed to unmarshall name table: %v", err)
                    }
                    p.localDNSServer.UpdateLookupTable(&nt)
                }

                // Send ACK
                con.requestsChan <- &discovery.DiscoveryRequest{
                    VersionInfo:   resp.VersionInfo,
                    TypeUrl:       v3.NameTableType,
                    ResponseNonce: resp.Nonce,
                }
            default:
                // TODO: Validate the known type urls before forwarding them to Envoy.
                if err := con.downstream.Send(resp); err != nil {
                    proxyLog.Errorf("downstream send error: %v", err)
                    // we cannot return partial error and hope to restart just the downstream
                    // as we are blindly proxying req/responses. For now, the best course of action
                    // is to terminate upstream connection as well and restart afresh.
                    return err
                }
            }
        case <-con.stopChan:
            _ = upstream.CloseSend()
            return nil
        }
    }
}

Last updated

Was this helpful?