# pilot-agent如何转发xds到istiod

判断是否由agent提供XDS服务

```go
    if sa.cfg.ProxyXDSViaAgent {
        // 启动XDS
        sa.xdsProxy, err = initXdsProxy(sa)
        if err != nil {
            return nil, fmt.Errorf("failed to start xds proxy: %v", err)
        }
    }
```

```go
    // 初始化XDS Server
    if err = proxy.initDownstreamServer(); err != nil {
        return nil, err
    }

    // 初始化连接上游客户端参数
    if proxy.istiodDialOptions, err = proxy.buildUpstreamClientDialOpts(ia); err != nil {
        return nil, err
    }

    //启动XDS Server
    go func() {
        if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
            log.Errorf("failed to accept downstream gRPC connection %v", err)
        }
    }()
```

处理下游连接

```go
// StreamAggregatedResources serves one ADS stream opened by the downstream Envoy.
//
// It registers a ProxyConnection for the stream, starts a goroutine that reads
// requests from Envoy into con.requestsChan, dials the upstream XDS server at
// p.istiodAddress, and then runs HandleUpstream until it returns.
//
// It returns the dial error if the upstream connection cannot be created,
// otherwise whatever HandleUpstream returns.
func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    proxyLog.Infof("Envoy ADS stream established")

    con := &ProxyConnection{
        upstreamError:   make(chan error),
        downstreamError: make(chan error),
        requestsChan:    make(chan *discovery.DiscoveryRequest, 10),
        responsesChan:   make(chan *discovery.DiscoveryResponse, 10),
        stopChan:        make(chan struct{}),
        downstream:      downstream,
    }

    p.RegisterStream(con)

    // done is closed when this handler returns. Every channel send made by the
    // receive goroutine below also selects on it, so the goroutine cannot stay
    // blocked on a channel that nobody drains any more (for example when the
    // upstream dial fails, or HandleUpstream returns for an upstream reason).
    done := make(chan struct{})
    defer close(done)

    // Read requests from the downstream Envoy and queue them for the upstream.
    go func() {
        firstNDSSent := false

        // enqueue queues req for forwarding; it reports false once the handler
        // has returned and the request can no longer be delivered.
        enqueue := func(req *discovery.DiscoveryRequest) bool {
            select {
            case con.requestsChan <- req:
                return true
            case <-done:
                return false
            }
        }

        for {
            // From Envoy
            req, err := downstream.Recv()
            if err != nil {
                select {
                case con.downstreamError <- err:
                case <-done:
                }
                return
            }
            // forward to istiod
            if !enqueue(req) {
                return
            }
            if p.localDNSServer != nil && !firstNDSSent && req.TypeUrl == v3.ListenerType {
                // fire off an initial NDS request
                if !enqueue(&discovery.DiscoveryRequest{TypeUrl: v3.NameTableType}) {
                    return
                }
                firstNDSSent = true
            }
        }
    }()

    // The timeout bounds only the dial; the stream itself uses a separate context.
    dialCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()

    // Create a new upstream connection.
    upstreamConn, err := grpc.DialContext(dialCtx, p.istiodAddress, p.istiodDialOptions...)
    if err != nil {
        proxyLog.Errorf("failed to connect to upstream %s: %v", p.istiodAddress, err)
        metrics.IstiodConnectionFailures.Increment()
        return err
    }
    defer upstreamConn.Close()

    xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)
    ctx := metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
    // Ranging over a nil map is a no-op, so no explicit nil check is needed.
    for k, v := range p.agent.cfg.XDSHeaders {
        ctx = metadata.AppendToOutgoingContext(ctx, k, v)
    }
    // We must propagate upstream termination to Envoy. This ensures that we resume the full XDS sequence on new connection
    return p.HandleUpstream(ctx, con, xds)
}
```

处理上游返回,发送给下游

```go
// HandleUpstream opens an ADS stream to the upstream XDS server and relays
// traffic between it and the downstream connection held in con.
//
// Requests read from con.requestsChan are sent upstream; responses received
// from upstream are sent to con.downstream, except NameTable responses, which
// are applied to p.localDNSServer (when configured) and acknowledged here
// without being forwarded.
//
// It returns nil when the upstream stream terminates, when con.stopChan fires,
// or when a request/response channel is closed. It returns the error when the
// upstream stream cannot be created, when the downstream reports an error, or
// when a send in either direction fails.
func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
    proxyLog.Infof("connecting to upstream XDS server: %s", p.istiodAddress)
    defer proxyLog.Infof("disconnected from XDS server: %s", p.istiodAddress)
    upstream, err := xds.StreamAggregatedResources(ctx,
        grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
    if err != nil {
        proxyLog.Errorf("failed to create upstream grpc client: %v", err)
        return err
    }

    // done is closed when this function returns so that the receive goroutine
    // below never stays blocked on a channel this loop has stopped draining.
    done := make(chan struct{})
    defer close(done)

    // Handle upstream xds: push everything received from upstream onto the
    // response channel.
    go func() {
        for {
            // from istiod
            resp, err := upstream.Recv()
            if err != nil {
                select {
                case con.upstreamError <- err:
                case <-done:
                }
                return
            }
            select {
            case con.responsesChan <- resp:
            case <-done:
                return
            }
        }
    }()

    for {
        select {
        case err := <-con.upstreamError:
            // error from upstream Istiod.
            if isExpectedGRPCError(err) {
                proxyLog.Debugf("upstream terminated with status %v", err)
                metrics.IstiodConnectionCancellations.Increment()
            } else {
                proxyLog.Warnf("upstream terminated with unexpected error %v", err)
                metrics.IstiodConnectionErrors.Increment()
            }
            _ = upstream.CloseSend()
            return nil
        case err := <-con.downstreamError:
            // error from downstream Envoy.
            if isExpectedGRPCError(err) {
                proxyLog.Debugf("downstream terminated with status %v", err)
                metrics.EnvoyConnectionCancellations.Increment()
            } else {
                proxyLog.Warnf("downstream terminated with unexpected error %v", err)
                metrics.EnvoyConnectionErrors.Increment()
            }
            // On downstream error, we will return. This propagates the error to downstream envoy which will trigger reconnect
            return err
        // Forward a request to the upstream.
        case req, ok := <-con.requestsChan:
            if !ok {
                return nil
            }
            proxyLog.Debugf("request for type url %s", req.TypeUrl)
            metrics.XdsProxyRequests.Increment()
            if err := sendUpstreamWithTimeout(ctx, upstream, req); err != nil {
                proxyLog.Errorf("upstream send error for type url %s: %v", req.TypeUrl, err)
                return err
            }
        // Deliver a response to the downstream.
        case resp, ok := <-con.responsesChan:
            if !ok {
                return nil
            }
            proxyLog.Debugf("response for type url %s", resp.TypeUrl)
            metrics.XdsProxyResponses.Increment()
            switch resp.TypeUrl {
            case v3.NameTableType:
                // intercept. This is for the dns server
                if p.localDNSServer != nil && len(resp.Resources) > 0 {
                    var nt nds.NameTable
                    // TODO we should probably send ACK and not update nametable here
                    if err := ptypes.UnmarshalAny(resp.Resources[0], &nt); err != nil {
                        // Do not hand a table that failed to decode to the DNS
                        // server; keep whatever it already has.
                        log.Errorf("failed to unmarshall name table: %v", err)
                    } else {
                        p.localDNSServer.UpdateLookupTable(&nt)
                    }
                }

                // Send ACK directly upstream. Queueing it on con.requestsChan
                // would block forever when that buffer is full, because this
                // loop is the goroutine that drains it.
                ack := &discovery.DiscoveryRequest{
                    VersionInfo:   resp.VersionInfo,
                    TypeUrl:       v3.NameTableType,
                    ResponseNonce: resp.Nonce,
                }
                proxyLog.Debugf("request for type url %s", ack.TypeUrl)
                metrics.XdsProxyRequests.Increment()
                if err := sendUpstreamWithTimeout(ctx, upstream, ack); err != nil {
                    proxyLog.Errorf("upstream send error for type url %s: %v", ack.TypeUrl, err)
                    return err
                }
            default:
                // TODO: Validate the known type urls before forwarding them to Envoy.
                if err := con.downstream.Send(resp); err != nil {
                    proxyLog.Errorf("downstream send error: %v", err)
                    // we cannot return partial error and hope to restart just the downstream
                    // as we are blindly proxying req/responses. For now, the best course of action
                    // is to terminate upstream connection as well and restart afresh.
                    return err
                }
            }
        case <-con.stopChan:
            _ = upstream.CloseSend()
            return nil
        }
    }
}
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://rocdu.gitbook.io/deep-understanding-of-istio/6/5.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
