XDS Server
XDS Server
istiod 中的 XDS Server 分为 secure 和 insecure 两种。
// setPeerCertVerifier creates s.peerCertVerifier and registers root
// certificates with it under the trust domain returned by
// spiffe.GetTrustDomain(). The roots come from tlsOptions.CaCertFile when it
// is set, otherwise from the RA and/or CA key-cert bundles. When no CA cert
// file, CA, or SPIFFE bundle endpoint is configured it returns nil up front.
// NOTE(review): this excerpt has no closing braces and looks abridged (certMap
// is fetched but never used below) - compare with the upstream source.
func (s *Server) setPeerCertVerifier(tlsOptions TLSOptions) error {
if tlsOptions.CaCertFile == "" && s.CA == nil && features.SpiffeBundleEndpoints == "" {
// Running locally without configured certs - no TLS mode
return nil
s.peerCertVerifier = spiffe.NewPeerCertVerifier()
var rootCertBytes []byte
var err error
// Check whether a CA cert file was specified explicitly.
if tlsOptions.CaCertFile != "" {
if rootCertBytes, err = ioutil.ReadFile(tlsOptions.CaCertFile); err != nil {
return err
} else {
// Load the RA root cert.
if s.RA != nil {
rootCertBytes = append(rootCertBytes, s.RA.GetCAKeyCertBundle().GetRootCertPem()...)
// Load the CA root cert.
if s.CA != nil {
rootCertBytes = append(rootCertBytes, s.CA.GetCAKeyCertBundle().GetRootCertPem()...)
if len(rootCertBytes) != 0 {
// Add the CA certs to the verifier's certPools/generalCertPool, keyed by trust domain.
err := s.peerCertVerifier.AddMappingFromPEM(spiffe.GetTrustDomain(), rootCertBytes)
if err != nil {
log.Errorf("Add Root CAs into peerCertVerifier failed: %v", err)
return fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err)
// Additionally fetch root certs from the configured SPIFFE bundle endpoints.
if features.SpiffeBundleEndpoints != "" {
certMap, err := spiffe.RetrieveSpiffeBundleRootCertsFromStringInput(
features.SpiffeBundleEndpoints, []*x509.Certificate{})
if err != nil {
return err
return nil
在 initSecureDiscoveryService 初始化 gRPC server 时，通过该验证器验证客户端身份：
// Excerpt of the TLS configuration that wires in the peer cert verifier:
// a client certificate is checked only when one is presented
// (tls.VerifyClientCertIfGiven), the client CA pool is the verifier's general
// cert pool, and peer verification is delegated to
// s.peerCertVerifier.VerifyPeerCert.
// NOTE(review): closing braces are missing from this excerpt.
cfg := &tls.Config{
GetCertificate: s.getIstiodCertificate,
ClientAuth: tls.VerifyClientCertIfGiven,
ClientCAs: s.peerCertVerifier.GetGeneralCertPool(),
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
err := s.peerCertVerifier.VerifyPeerCert(rawCerts, verifiedChains)
if err != nil {
// Log verification failures at info level.
log.Infof("Could not verify certificate: %v", err)
return err
// VerifyPeerCert parses the raw peer certificates, takes the first one as the
// peer certificate, derives the trust domain from its single URI SAN, and
// verifies it against the root pool registered for that trust domain.
// It returns nil when the peer presented no certificate. The second parameter
// (the chains verified by the TLS stack) is ignored.
// NOTE(review): closing braces are missing and the else branch below is empty
// in this excerpt; presumably the remaining certs are added to intCertPool -
// confirm against the upstream source.
func (v *PeerCertVerifier) VerifyPeerCert(rawCerts [][]byte, _ [][]*x509.Certificate) error {
if len(rawCerts) == 0 {
// Peer doesn't present a certificate. Just skip. Other authn methods may be used.
return nil
var peerCert *x509.Certificate
intCertPool := x509.NewCertPool()
for id, rawCert := range rawCerts {
cert, err := x509.ParseCertificate(rawCert)
if err != nil {
return err
// The first certificate is the peer's own certificate.
if id == 0 {
peerCert = cert
} else {
// Exactly one URI SAN is required; it carries the trust domain.
if len(peerCert.URIs) != 1 {
return fmt.Errorf("peer certificate does not contain 1 URI type SAN, detected %d", len(peerCert.URIs))
trustDomain, err := GetTrustDomainFromURISAN(peerCert.URIs[0].String())
if err != nil {
return err
// Look up the root cert pool for this trust domain.
rootCertPool, ok := v.certPools[trustDomain]
if !ok {
return fmt.Errorf("no cert pool found for trust domain %s", trustDomain)
// Verify the client certificate.
_, err = peerCert.Verify(x509.VerifyOptions{
Roots: rootCertPool,
Intermediates: intCertPool,
return err
注册 handler：s.XDSServer.Register(s.secureGrpcServer)
StreamAggregatedResources 实现了 Envoy 的 ADS 接口：
// StreamAggregatedResources handles one ADS stream: it rejects the stream
// while the server is not ready, authenticates the peer, initializes the
// global push context, then loops serving client requests (reqChannel) and
// server-initiated pushes (con.pushChannel) until an error occurs, the request
// channel is closed, or con.stop fires.
// NOTE(review): closing braces are missing from this excerpt.
func (s *DiscoveryServer) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// Check whether the server is ready to accept clients and process new requests. Currently, ready means the caches have synced, so clusters can be built correctly. Without this check, the InitContext() call below would initialize with empty config, causing reconnecting Envoys to lose their configuration. This is an extra safety check in addition to adding the cache-sync logic to the readiness probe, which handles delayed kube-proxy iptables updates.
if !s.IsServerReady() {
return errors.New("server is not ready to serve discovery information")
ctx := stream.Context()
peerAddr := ""
// Get the peer (node) address from the stream context.
if peerInfo, ok := peer.FromContext(ctx); ok {
peerAddr = peerInfo.Addr.String()
// Authenticate the node's identity.
ids, err := s.authenticate(ctx)
if err != nil {
return err
if ids != nil {
adsLog.Debugf("Authenticated XDS: %v with identity %v", peerAddr, ids)
} else {
adsLog.Debuga("Unauthenticated XDS: ", peerAddr)
// Initialize the push context here: the insecure server is started as well, so it is not known which one receives a request first.
if err = s.globalPushContext().InitContext(s.Env, nil, nil); err != nil {
adsLog.Warnf("Error reading config %v", err)
return err
con := newConnection(peerAddr, stream)
con.Identities = ids
// Receive requests from the client in a separate goroutine.
var receiveError error
reqChannel := make(chan *discovery.DiscoveryRequest, 1)
go s.receive(con, reqChannel, &receiveError)
for {
select {
// Read a request.
case req, ok := <-reqChannel:
if !ok {
return receiveError
// Respond to the client.
err := s.processRequest(req, con)
if err != nil {
return err
// Read from the push channel; the event-handler controllers send data to pushChannel when resources change.
case pushEv := <-con.pushChannel:
err := s.pushConnection(con, pushEv)
if err != nil {
return err
case <-con.stop:
return nil
// authenticate returns the identities of the peer behind ctx. It returns
// (nil, nil) when features.XDSAuth is disabled or the connection is not TLS,
// and an error when ctx carries no peer information. Otherwise it tries each
// of s.Authenticators and returns the identities from the first success.
// NOTE(review): the excerpt ends before the failure path (authFailMsgs is
// never used here) and closing braces are missing - see the upstream source.
func (s *DiscoveryServer) authenticate(ctx context.Context) ([]string, error) {
if !features.XDSAuth {
return nil, nil
// Currently this only checks that the request has a certificate signed by our key. Guarded by a flag to avoid breaking upgrades - it should be enabled for multi-cluster/mesh expansion where XDS is exposed.
peerInfo, ok := peer.FromContext(ctx)
if !ok {
return nil, errors.New("invalid context")
// Not a TLS connection, so no authentication is performed.
if _, ok := peerInfo.AuthInfo.(credentials.TLSInfo); !ok {
return nil, nil
authFailMsgs := []string{}
// Iterate over the authenticators; if any one of them succeeds, the request is authenticated.
for _, authn := range s.Authenticators {
u, err := authn.Authenticate(ctx)
if u != nil && u.Identities != nil && err == nil {
return u.Identities, nil
// processRequest handles one discovery request from a connection: it records
// the request with the status reporter (if one is set), returns nil when
// shouldRespond reports no response is needed, and otherwise performs a full
// push of the requested type from the global push context.
// NOTE(review): closing braces are missing from this excerpt.
func (s *DiscoveryServer) processRequest(req *discovery.DiscoveryRequest, con *Connection) error {
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.ConID, req.TypeUrl, req.ResponseNonce)
if !s.shouldRespond(con, req) {
return nil
push := s.globalPushContext()
// Push the corresponding data to the client.
return s.pushXds(con, push, versionInfo(), con.Watched(req.TypeUrl), &model.PushRequest{Full: true})
// pushConnection handles a push event for one connection: on a full push it
// refreshes the proxy state, skips the push when ProxyNeedsPush reports no
// updates are required, and otherwise calls pushXds for each resource returned
// by getPushResources, returning the first error.
// NOTE(review): closing braces are missing from this excerpt.
func (s *DiscoveryServer) pushConnection(con *Connection, pushEv *Event) error {
pushRequest := pushEv.pushRequest
// On a full push, refresh the node's current information.
if pushRequest.Full {
// Update Proxy with current information.
s.updateProxy(con.proxy, pushRequest.Push)
// Decide whether a push is needed.
if !ProxyNeedsPush(con.proxy, pushEv) {
adsLog.Debugf("Skipping push to %v, no updates required", con.ConID)
// Only full pushes bump the version; incremental pushes do not update it.
if pushRequest.Full {
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.Version, nil)
return nil
currentVersion := versionInfo()
// Send the push to every generator; each generator is responsible for deciding whether the push event requires a push.
for _, w := range getPushResources(con.proxy.WatchedResources) {
err := s.pushXds(con, pushRequest.Push, currentVersion, w, pushRequest)
if err != nil {
return err
if pushRequest.Full {
// Report all events for unwatched resources. Watched resources are reported via pushXds or on ack.
reportAllEvents(s.StatusReporter, con.ConID, pushRequest.Push.Version, con.proxy.WatchedResources)
return nil
// pushXds generates the resources for one watched resource type and sends
// them to the connection as a DiscoveryResponse. It returns nil without
// sending when w is nil, when no generator is found for w.TypeUrl, or when the
// generator returns nil; it returns the send error if con.send fails.
// NOTE(review): closing braces are missing from this excerpt.
func (s *DiscoveryServer) pushXds(con *Connection, push *model.PushContext,
currentVersion string, w *model.WatchedResource, req *model.PushRequest) error {
if w == nil {
return nil
// Pick the generator based on the requested type URL.
gen := s.findGenerator(w.TypeUrl, con)
if gen == nil {
return nil
t0 := time.Now()
// Generate the resources.
cl := gen.Generate(con.proxy, push, w, req)
if cl == nil {
// Nothing was generated; record the event with the status reporter, if one is set.
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.ConID, w.TypeUrl, push.Version)
return nil
// Record the time elapsed since t0 when the function returns.
defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()
resp := &discovery.DiscoveryResponse{
TypeUrl: w.TypeUrl,
VersionInfo: currentVersion,
Nonce: nonce(push.Version),
Resources: cl,
// Send the response.
err := con.send(resp)
if err != nil {
recordSendError(w.TypeUrl, con.ConID, err)
return err
// Log the push unless this type URL is listed in SkipLogTypes.
if _, f := SkipLogTypes[w.TypeUrl]; !f {
adsLog.Infof("%s: PUSH for node:%s resources:%d", v3.GetShortType(w.TypeUrl), con.proxy.ID, len(cl))
return nil
// Generate returns the clusters built by ConfigGenerator.BuildClusters for
// the proxy, each converted with util.MessageToAny. It returns nil when
// cdsNeedsPush reports that no push is needed.
// NOTE(review): closing braces are missing from this excerpt.
func (c CdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
if !cdsNeedsPush(req, proxy) {
return nil
rawClusters := c.Server.ConfigGenerator.BuildClusters(proxy, push)
resources := model.Resources{}
// Note: the loop variable c shadows the receiver c inside the loop.
for _, c := range rawClusters {
resources = append(resources, util.MessageToAny(c))
return resources
// Generate builds the EDS resources for the cluster names in w.ResourceNames,
// reusing marshalled endpoints from eds.Server.Cache when present and caching
// newly generated ones. It returns nil when edsNeedsPush(req.ConfigsUpdated)
// is false. For non-full pushes, edsUpdatedServices holds the updated
// ServiceEntry config names taken from req.ConfigsUpdated.
// NOTE(review): this excerpt has lost lines - closing braces are missing, the
// empty/cached/regenerated counters are never incremented, and the
// "not updated", "l == nil" and "no endpoints" branches have no bodies.
// Compare with the upstream source before relying on it.
func (eds *EdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
if !edsNeedsPush(req.ConfigsUpdated) {
return nil
var edsUpdatedServices map[string]struct{}
if !req.Full {
edsUpdatedServices = model.ConfigNamesOfKind(req.ConfigsUpdated, gvk.ServiceEntry)
resources := make([]*any.Any, 0)
empty := 0
cached := 0
regenerated := 0
for _, clusterName := range w.ResourceNames {
if edsUpdatedServices != nil {
_, _, hostname, _ := model.ParseSubsetKey(clusterName)
if _, ok := edsUpdatedServices[string(hostname)]; !ok {
// Cluster was not updated, skip recomputing. This happens when we get an incremental update for a
// specific Hostname. On connect or for full push edsUpdatedServices will be empty.
builder := NewEndpointBuilder(clusterName, proxy, push)
// Prefer the cached marshalled endpoints; otherwise generate and cache them.
if marshalledEndpoint, f := eds.Server.Cache.Get(builder); f {
resources = append(resources, marshalledEndpoint)
} else {
l := eds.Server.generateEndpoints(builder)
if l == nil {
if len(l.Endpoints) == 0 {
resource := util.MessageToAny(l)
resources = append(resources, resource)
eds.Server.Cache.Add(builder, resource)
// Log at info level when there are no updated services, at debug level otherwise.
if len(edsUpdatedServices) == 0 {
adsLog.Infof("EDS: PUSH for node:%s resources:%d empty:%v cached:%v/%v",
proxy.ID, len(resources), empty, cached, cached+regenerated)
} else {
adsLog.Debugf("EDS: PUSH INC for node:%s clusters:%d empty:%v cached:%v/%v",
proxy.ID, len(resources), empty, cached, cached+regenerated)
return resources
// Generate returns the listeners built by ConfigGenerator.BuildListeners for
// the proxy, each converted with util.MessageToAny. It returns nil when
// ldsNeedsPush reports that no push is needed.
// NOTE(review): closing braces are missing from this excerpt.
func (l LdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
if !ldsNeedsPush(req) {
return nil
listeners := l.Server.ConfigGenerator.BuildListeners(proxy, push)
resources := model.Resources{}
for _, c := range listeners {
resources = append(resources, util.MessageToAny(c))
return resources
// Generate returns a single resource holding the name table built by
// ConfigGenerator.BuildNameTable, converted with util.MessageToAny. It returns
// nil when ndsNeedsPush reports that no push is needed or when no name table
// was built.
// NOTE(review): closing braces are missing from this excerpt.
func (n NdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
if !ndsNeedsPush(req) {
return nil
nt := n.Server.ConfigGenerator.BuildNameTable(proxy, push)
if nt == nil {
return nil
resources := model.Resources{util.MessageToAny(nt)}
return resources
// Generate returns the HTTP routes built by ConfigGenerator.BuildHTTPRoutes
// for the route names in w.ResourceNames, each converted with
// util.MessageToAny. It returns nil when rdsNeedsPush reports that no push is
// needed.
// NOTE(review): closing braces are missing from this excerpt.
func (c RdsGenerator) Generate(proxy *model.Proxy, push *model.PushContext, w *model.WatchedResource, req *model.PushRequest) model.Resources {
if !rdsNeedsPush(req) {
return nil
rawRoutes := c.Server.ConfigGenerator.BuildHTTPRoutes(proxy, push, w.ResourceNames)
resources := model.Resources{}
// Note: the loop variable c shadows the receiver c inside the loop.
for _, c := range rawRoutes {
resources = append(resources, util.MessageToAny(c))
return resources
Last updated
Was this helpful?