DiscoveryServer
DiscoveryServer
DiscoveryServer是envoy xds api的 pilot grpc 实现,一遍负责监听grpc接口,另一边接收update事件,将数据返回给客户端
DiscoveryServer实现了 ads接口
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {}StreamAggregatedResources
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// 检查服务器是否准备好接受客户端并处理新请求。 当前准备就绪意味着缓存已同步,因此可以正确构建集群。 如果不进行此检查,则下面的InitContext()调用将使用空配置初始化,从而导致重新连接的Envoy失去配置。 除了添加caches之外,这是一项额外的安全检查。已将逻辑同步到就绪探针以处理kube-proxy ip表更新延迟的情况。
// See https://github.com/istio/istio/issues/25495.
if !s.IsServerReady() {
return errors.New("server is not ready to serve discovery information")
}
// 获取上下文信息
ctx := stream.Context()
peerAddr := "0.0.0.0"
// 获取客户端信息
if peerInfo, ok := peer.FromContext(ctx); ok {
peerAddr = peerInfo.Addr.String()
}
// 验证客户端身份
ids, err := s.authenticate(ctx)
if err != nil {
return err
}
if ids != nil {
adsLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
} else {
adsLog.Debug("Unauthenticated XDS: ", peerAddr)
}
// 初始化pushcontent
if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
// Error accessing the data - log and close, maybe a different pilot replica
// has more luck
adsLog.Warnf("Error reading config %v", err)
return err
}
con := newConnection(peerAddr, stream)
con.Identities = ids
// 从流中读取是阻塞操作。 每个连接都需要读取发现请求并等待配置更改时的推送命令,因此我们添加了go协程。 如果go grpc为流添加了gochannel支持,则不需要。 这也会检测到关闭。
var receiveError error
reqChannel := make(chan *discovery.DiscoveryRequest, 1)
go s.receive(con, reqChannel, &receiveError)
for {
// 阻塞直到接收到请求或触发推送。 我们需要2条go例程,因为Recv()中的"read"块。
// 为了避免2个协程,我们尝试在StreamAggregateResource中使用Recv() -并在推送发生时开始对不同的短暂go 协程的推送。 由于推动被限制,这将减少长时间运行的go协程的1/2。 主要问题是关闭-当前的gRPC库不允许关闭流。
select {
case req, ok := <-reqChannel:
if !ok {
// 远程链接已经关闭或者在处理时发生错误.
return receiveError
}
// processRequest调用pushXXX,并通过pushConnection访问常见的结构。 如果我们要保存1/2个线程,添加同步是要解决的第二个问题。
err := s.processRequest(req, con)
if err != nil {
return err
}
case pushEv := <-con.pushChannel:
// 在LDS和RDS之间获取初始配置时,该推送将丢失受监视的"路由"。 CDS/EDS间隔相同。 由于该协议,处理起来非常棘手-但是定期推送会从中恢复。
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return err
}
case <-con.stop:
return nil
}
}
}processRequest
pushconnection
pushxds
Last updated
Was this helpful?