pilot-agent 如何将 XDS 请求转发到 istiod
判断是否由agent提供XDS服务
// The XDS proxy is only set up when the agent configuration enables
// ProxyXDSViaAgent.
if sa.cfg.ProxyXDSViaAgent {
// Initialize the XDS proxy and keep it on the agent; a failure aborts the
// enclosing function with an error that includes the underlying cause.
sa.xdsProxy, err = initXdsProxy(sa)
if err != nil {
return nil, fmt.Errorf("failed to start xds proxy: %v", err)
}
}
// Initialize the downstream XDS server.
if err = proxy.initDownstreamServer(); err != nil {
return nil, err
}
// Build the dial options used by the client that connects to the upstream.
if proxy.istiodDialOptions, err = proxy.buildUpstreamClientDialOpts(ia); err != nil {
return nil, err
}
// Start the XDS server. Serve runs in its own goroutine so this code does not
// wait on it; an error returned by Serve is only logged, not returned.
go func() {
if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
log.Errorf("failed to accept downstream gRPC connection %v", err)
}
}()
处理下游连接
// StreamAggregatedResources handles one downstream ADS stream by proxying it
// to the upstream XDS server at p.istiodAddress.
//
// It registers a ProxyConnection for the stream, starts a goroutine that reads
// requests from downstream and queues them on con.requestsChan, dials the
// upstream, and then blocks in HandleUpstream until the proxied stream ends.
// It returns the dial error if the upstream cannot be reached, otherwise
// whatever HandleUpstream returns.
func (p *XdsProxy) StreamAggregatedResources(downstream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
	proxyLog.Infof("Envoy ADS stream established")

	con := &ProxyConnection{
		upstreamError:   make(chan error),
		downstreamError: make(chan error),
		requestsChan:    make(chan *discovery.DiscoveryRequest, 10),
		responsesChan:   make(chan *discovery.DiscoveryResponse, 10),
		stopChan:        make(chan struct{}),
		downstream:      downstream,
	}
	p.RegisterStream(con)

	// done is closed when this handler returns. After that point nothing
	// receives from con.downstreamError or con.requestsChan any more, so the
	// reader goroutine below must not block on them; it watches done instead.
	// While the handler is still running, done is open and the sends behave
	// exactly like plain channel sends.
	done := make(chan struct{})
	defer close(done)

	// Read requests from downstream and queue them for forwarding upstream.
	go func() {
		// enqueue queues req for the upstream; it reports false when the
		// handler has already returned and the goroutine should exit.
		enqueue := func(req *discovery.DiscoveryRequest) bool {
			select {
			case con.requestsChan <- req:
				return true
			case <-done:
				return false
			}
		}

		firstNDSSent := false
		for {
			// From Envoy.
			req, err := downstream.Recv()
			if err != nil {
				select {
				case con.downstreamError <- err:
				case <-done:
				}
				return
			}
			// Forward to istiod.
			if !enqueue(req) {
				return
			}
			if p.localDNSServer != nil && !firstNDSSent && req.TypeUrl == v3.ListenerType {
				// Fire off an initial NDS request right after the first
				// listener request, once per stream.
				if !enqueue(&discovery.DiscoveryRequest{TypeUrl: v3.NameTableType}) {
					return
				}
				firstNDSSent = true
			}
		}
	}()

	// The timeout only bounds the dial; the upstream stream itself uses a
	// separate context built below.
	dialCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Open a new upstream connection for this downstream stream.
	upstreamConn, err := grpc.DialContext(dialCtx, p.istiodAddress, p.istiodDialOptions...)
	if err != nil {
		proxyLog.Errorf("failed to connect to upstream %s: %v", p.istiodAddress, err)
		metrics.IstiodConnectionFailures.Increment()
		return err
	}
	defer upstreamConn.Close()

	xds := discovery.NewAggregatedDiscoveryServiceClient(upstreamConn)

	// Outgoing metadata: the cluster ID plus any configured extra headers.
	// Ranging over a nil map is a no-op, so no nil check is needed.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "ClusterID", p.clusterID)
	for k, v := range p.agent.cfg.XDSHeaders {
		ctx = metadata.AppendToOutgoingContext(ctx, k, v)
	}

	// We must propagate upstream termination to Envoy. This ensures that we
	// resume the full XDS sequence on new connection.
	return p.HandleUpstream(ctx, con, xds)
}
处理上游返回,发送给下游
// HandleUpstream opens an ADS stream on xds and shuttles messages between it
// and con until one side terminates.
//
// Requests taken from con.requestsChan are sent upstream. Responses read from
// the upstream are sent to con.downstream, except NameTable responses, which
// are consumed locally (to update the local DNS server, when one is
// configured) and acknowledged upstream.
//
// It returns nil when the upstream terminates, when con.stopChan fires, or
// when one of the queues is closed. It returns the error when the stream
// cannot be created, when downstream reports an error, or when a send in
// either direction fails.
func (p *XdsProxy) HandleUpstream(ctx context.Context, con *ProxyConnection, xds discovery.AggregatedDiscoveryServiceClient) error {
	proxyLog.Infof("connecting to upstream XDS server: %s", p.istiodAddress)
	defer proxyLog.Infof("disconnected from XDS server: %s", p.istiodAddress)

	// Cancelling the stream context on return releases the client stream even
	// if the caller does not close the underlying connection.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	upstream, err := xds.StreamAggregatedResources(ctx,
		grpc.MaxCallRecvMsgSize(defaultClientMaxReceiveMessageSize))
	if err != nil {
		proxyLog.Errorf("failed to create upstream grpc client: %v", err)
		return err
	}

	// done is closed when this function returns. After that nothing receives
	// from con.upstreamError or con.responsesChan, so the reader goroutine
	// must not block on them. While the loop below is running, done is open
	// and the sends behave exactly like plain channel sends.
	done := make(chan struct{})
	defer close(done)

	// Read responses from the upstream and queue them for the loop below.
	go func() {
		for {
			// From istiod.
			resp, err := upstream.Recv()
			if err != nil {
				select {
				case con.upstreamError <- err:
				case <-done:
				}
				return
			}
			select {
			case con.responsesChan <- resp:
			case <-done:
				return
			}
		}
	}()

	// forward sends one request upstream, with the same logging and metrics
	// for queued requests and for locally generated ACKs.
	forward := func(req *discovery.DiscoveryRequest) error {
		proxyLog.Debugf("request for type url %s", req.TypeUrl)
		metrics.XdsProxyRequests.Increment()
		if err := sendUpstreamWithTimeout(ctx, upstream, req); err != nil {
			proxyLog.Errorf("upstream send error for type url %s: %v", req.TypeUrl, err)
			return err
		}
		return nil
	}

	for {
		select {
		case err := <-con.upstreamError:
			// Error from upstream Istiod.
			if isExpectedGRPCError(err) {
				proxyLog.Debugf("upstream terminated with status %v", err)
				metrics.IstiodConnectionCancellations.Increment()
			} else {
				proxyLog.Warnf("upstream terminated with unexpected error %v", err)
				metrics.IstiodConnectionErrors.Increment()
			}
			// Best-effort close of the send side; the stream is already
			// finished, so the result is not actionable.
			_ = upstream.CloseSend()
			return nil

		case err := <-con.downstreamError:
			// Error from downstream Envoy.
			if isExpectedGRPCError(err) {
				proxyLog.Debugf("downstream terminated with status %v", err)
				metrics.EnvoyConnectionCancellations.Increment()
			} else {
				proxyLog.Warnf("downstream terminated with unexpected error %v", err)
				metrics.EnvoyConnectionErrors.Increment()
			}
			// On downstream error, we will return. This propagates the error
			// to downstream envoy which will trigger reconnect.
			return err

		// Forward a queued request to the upstream.
		case req, ok := <-con.requestsChan:
			if !ok {
				return nil
			}
			if err := forward(req); err != nil {
				return err
			}

		// Deliver an upstream response to downstream.
		case resp, ok := <-con.responsesChan:
			if !ok {
				return nil
			}
			proxyLog.Debugf("response for type url %s", resp.TypeUrl)
			metrics.XdsProxyResponses.Increment()
			switch resp.TypeUrl {
			case v3.NameTableType:
				// Intercept. This is for the dns server.
				if p.localDNSServer != nil && len(resp.Resources) > 0 {
					var nt nds.NameTable
					if err := ptypes.UnmarshalAny(resp.Resources[0], &nt); err != nil {
						// Keep the current lookup table rather than handing
						// the DNS server a table that failed to decode.
						log.Errorf("failed to unmarshall name table: %v", err)
					} else {
						p.localDNSServer.UpdateLookupTable(&nt)
					}
				}
				// Send the ACK straight upstream. This loop is the only
				// consumer of con.requestsChan, so queueing the ACK there
				// would block forever whenever that buffer is full.
				// NOTE(review): a response that failed to decode is still
				// ACKed, as before; consider sending a NACK instead.
				ack := &discovery.DiscoveryRequest{
					VersionInfo:   resp.VersionInfo,
					TypeUrl:       v3.NameTableType,
					ResponseNonce: resp.Nonce,
				}
				if err := forward(ack); err != nil {
					return err
				}
			default:
				// TODO: Validate the known type urls before forwarding them to Envoy.
				if err := con.downstream.Send(resp); err != nil {
					proxyLog.Errorf("downstream send error: %v", err)
					// We cannot return partial error and hope to restart just
					// the downstream as we are blindly proxying req/responses.
					// For now, the best course of action is to terminate the
					// upstream connection as well and restart afresh.
					return err
				}
			}

		case <-con.stopChan:
			// Stop requested for this connection; best-effort close of the
			// send side.
			_ = upstream.CloseSend()
			return nil
		}
	}
}
Last updated