DiscoveryServer
DiscoveryServer
DiscoveryServer是envoy xds api的 pilot grpc 实现,一遍负责监听grpc接口,另一边接收update事件,将数据返回给客户端
DiscoveryServer实现了 ads接口
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {}
StreamAggregatedResources
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// 检查服务器是否准备好接受客户端并处理新请求。 当前准备就绪意味着缓存已同步,因此可以正确构建集群。 如果不进行此检查,则下面的InitContext()调用将使用空配置初始化,从而导致重新连接的Envoy失去配置。 除了添加caches之外,这是一项额外的安全检查。已将逻辑同步到就绪探针以处理kube-proxy ip表更新延迟的情况。
// See https://github.com/istio/istio/issues/25495.
if !s.IsServerReady() {
return errors.New("server is not ready to serve discovery information")
}
// 获取上下文信息
ctx := stream.Context()
peerAddr := "0.0.0.0"
// 获取客户端信息
if peerInfo, ok := peer.FromContext(ctx); ok {
peerAddr = peerInfo.Addr.String()
}
// 验证客户端身份
ids, err := s.authenticate(ctx)
if err != nil {
return err
}
if ids != nil {
adsLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
} else {
adsLog.Debug("Unauthenticated XDS: ", peerAddr)
}
// 初始化pushcontent
if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
// Error accessing the data - log and close, maybe a different pilot replica
// has more luck
adsLog.Warnf("Error reading config %v", err)
return err
}
con := newConnection(peerAddr, stream)
con.Identities = ids
// 从流中读取是阻塞操作。 每个连接都需要读取发现请求并等待配置更改时的推送命令,因此我们添加了go协程。 如果go grpc为流添加了gochannel支持,则不需要。 这也会检测到关闭。
var receiveError error
reqChannel := make(chan *discovery.DiscoveryRequest, 1)
go s.receive(con, reqChannel, &receiveError)
for {
// 阻塞直到接收到请求或触发推送。 我们需要2条go例程,因为Recv()中的"read"块。
// 为了避免2个协程,我们尝试在StreamAggregateResource中使用Recv() -并在推送发生时开始对不同的短暂go 协程的推送。 由于推动被限制,这将减少长时间运行的go协程的1/2。 主要问题是关闭-当前的gRPC库不允许关闭流。
select {
case req, ok := <-reqChannel:
if !ok {
// 远程链接已经关闭或者在处理时发生错误.
return receiveError
}
// processRequest调用pushXXX,并通过pushConnection访问常见的结构。 如果我们要保存1/2个线程,添加同步是要解决的第二个问题。
err := s.processRequest(req, con)
if err != nil {
return err
}
case pushEv := <-con.pushChannel:
// 在LDS和RDS之间获取初始配置时,该推送将丢失受监视的"路由"。 CDS/EDS间隔相同。 由于该协议,处理起来非常棘手-但是定期推送会从中恢复。
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return err
}
case <-con.stop:
return nil
}
}
}
processRequest
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
// 预处理请求。 返回是否继续。
if !s.preProcessRequest(con.proxy, req) {
return nil
}
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
}
// 请求是否需要被响应
shouldRespond := s.shouldRespond(con, req)
// 检查我们是否阻止了推送。 如果这是一个ACK,我们将发送它。 无论哪种方式,我们都将删除阻止的推送,因为我们将发送推送。
con.proxy.Lock()
request, haveBlockedPush := con.blockedPushes[req.TypeUrl]
delete(con.blockedPushes, req.TypeUrl)
con.proxy.Unlock()
if shouldRespond {
// 这是一个请求,触发此类型的完全推送覆盖阻止的推送(如果存在),因为可以保证此完全推送是我们从阻止的推送中可以推送的内容的超集。
request = &model.PushRequest{Full: true}
} else if !haveBlockedPush {
// 这是一个ACK,没有延迟的推送立即返回,无需采取任何措施
return nil
} else {
// 我们有一个被阻止的推送,我们将使用
adsLog.Debugf("%s: DEQUEUE for node:%s", v3.GetShortType(req.TypeUrl), con.proxy.ID)
}
push := s.globalPushContext()
return s.pushXds(con, push, versionInfo(), con.Watched(req.TypeUrl), request)
}
pushconnection
// Compute and send the new configuration for a connection. This is blocking and may be slow
// for large configs. The method will hold a lock on con.pushMutex.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
pushRequest := pushEv.pushRequest
if pushRequest.Full {
// Update Proxy with current information.
s.updateProxy(con.proxy, pushRequest.Push)
}
if !ProxyNeedsPush(con.proxy, pushEv) {
adsLog.Debugf("Skipping push to %v, no updates required", con.ConID)
if pushRequest.Full {
// Only report for full versions, incremental pushes do not have a new version
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.Version, nil)
}
return nil
}
currentVersion := versionInfo()
// Send pushes to all generators
// Each Generator is responsible for determining if the push event requires a push
for _, w := range getWatchedResources(con.proxy.WatchedResources) {
if !features.EnableFlowControl {
// Always send the push if flow control disabled
if err := s.pushXds(con, pushRequest.Push, currentVersion, w, pushRequest); err != nil {
return err
}
continue
}
// If flow control is enabled, we will only push if we got an ACK for the previous response
synced, timeout := con.Synced(w.TypeUrl)
if !synced && timeout {
// We are not synced, but we have been stuck for too long. We will trigger the push anyways to
// avoid any scenario where this may deadlock.
// This can possibly be removed in the future if we find this never causes issues
totalDelayedPushes.With(typeTag.Value(v3.GetMetricType(w.TypeUrl))).Increment()
adsLog.Warnf("%s: QUEUE TIMEOUT for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
}
if synced || timeout {
// Send the push now
if err := s.pushXds(con, pushRequest.Push, currentVersion, w, pushRequest); err != nil {
return err
}
} else {
// The type is not yet synced. Instead of pushing now, which may overload Envoy,
// we will wait until the last push is ACKed and trigger the push. See
// https://github.com/istio/istio/issues/25685 for details on the performance
// impact of sending pushes before Envoy ACKs.
totalDelayedPushes.With(typeTag.Value(v3.GetMetricType(w.TypeUrl))).Increment()
adsLog.Debugf("%s: QUEUE for node:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID)
con.proxy.Lock()
con.blockedPushes[w.TypeUrl] = con.blockedPushes[w.TypeUrl].Merge(pushEv.pushRequest)
con.proxy.Unlock()
}
}
if pushRequest.Full {
// Report all events for unwatched resources. Watched resources will be reported in pushXds or on ack.
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.Version, con.proxy.WatchedResources)
}
proxiesConvergeDelay.Record(time.Since(pushRequest.Start).Seconds())
return nil
}
pushxds
func (s *DiscoveryServer) pushXds(con *Connection, push *model.PushContext,
currentVersion string, w *model.WatchedResource, req *model.PushRequest) error {
if w == nil {
return nil
}
gen := s.findGenerator(w.TypeUrl, con)
if gen == nil {
return nil
}
t0 := time.Now()
cl := gen.Generate(con.proxy, push, w, req)
if cl == nil {
// If we have nothing to send, report that we got an ACK for this version.
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.ConID, w.TypeUrl, push.Version)
}
return nil // No push needed.
}
defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()
resp := &discovery.DiscoveryResponse{
TypeUrl: w.TypeUrl,
VersionInfo: currentVersion,
Nonce: nonce(push.Version),
Resources: cl,
}
// Approximate size by looking at the Any marshaled size. This avoids high cost
// proto.Size, at the expense of slightly under counting.
size := 0
for _, r := range cl {
size += len(r.Value)
}
err := con.send(resp)
if err != nil {
recordSendError(w.TypeUrl, con.ConID, err)
return err
}
// Some types handle logs inside Generate, skip them here
if _, f := SkipLogTypes[w.TypeUrl]; !f {
adsLog.Infof("%s: PUSH for node:%s resources:%d size:%s", v3.GetShortType(w.TypeUrl), con.proxy.ID, len(cl), util.ByteCount(size))
}
return nil
}
Last updated