mcp-over-xds

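In May of this year the community added an MCP-OVER-XDS implementation. In the current master branch the code for the original MCP protocol has been removed and everything has been converted to MCP-OVER-XDS, which means that Istio 1.9 will no longer support the original MCP protocol. For details, see the XDS-OVER-MCP design document.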

initConfigSources

When the configured ConfigSource is of the XDS type, an XDS client is created to issue the requests:

// Create an ADS client
xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
    Meta: model.NodeMetadata{
        Generator: "api",
    }.ToStruct(),
    InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
})
if err != nil {
    return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
}
// Create a config store
store := memory.Make(collections.Pilot)
// Create the config controller
configController := memory.NewController(store)
// Wrap the controller as the Istio config store
xdsMCP.Store = model.MakeIstioStore(configController)
// Start the client
err = xdsMCP.Run()
if err != nil {
    return fmt.Errorf("MCP: failed running %v", err)
}
s.ConfigStores = append(s.ConfigStores, configController)
log.Warn("Started XDS config ", s.ConfigStores)

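InitialDiscoveryRequests holds the initial requests that are sent once the connection is established; they represent the resources that Istio needs to watch: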

out = append(out, &discovery.DiscoveryRequest{
    // MeshConfig type
    TypeUrl: collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String(),
})
for _, sch := range collections.Pilot.All() {
    out = append(out, &discovery.DiscoveryRequest{
        TypeUrl: sch.Resource().GroupVersionKind().String(),
    })
}
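To make the shape of these requests concrete, here is a minimal, self-contained sketch of how one TypeUrl per schema could be assembled. The `gvk` and `discoveryRequest` types are hypothetical stand-ins for Istio's `config.GroupVersionKind` and `discovery.DiscoveryRequest`, and the `group/version/kind` format is assumed from the TypeUrl used later in this article.

```go
package main

import "fmt"

// gvk is a hypothetical stand-in for Istio's config.GroupVersionKind.
type gvk struct {
	Group, Version, Kind string
}

// typeURL joins the three parts as group/version/kind (assumed format).
func (g gvk) typeURL() string {
	return g.Group + "/" + g.Version + "/" + g.Kind
}

// discoveryRequest is a hypothetical stand-in for discovery.DiscoveryRequest.
type discoveryRequest struct {
	TypeUrl string
}

// initialRequests builds one request per schema, in the order given.
func initialRequests(schemas []gvk) []discoveryRequest {
	out := make([]discoveryRequest, 0, len(schemas))
	for _, s := range schemas {
		out = append(out, discoveryRequest{TypeUrl: s.typeURL()})
	}
	return out
}

func main() {
	schemas := []gvk{
		{"networking.istio.io", "v1alpha3", "VirtualService"},
		{"security.istio.io", "v1beta1", "PeerAuthentication"},
	}
	for _, r := range initialRequests(schemas) {
		fmt.Println(r.TypeUrl)
	}
}
```

Each entry becomes one subscription on the ADS stream, so the server learns which Istio resource kinds the client wants.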

All resources covered by Pilot:

    Pilot = collection.NewSchemasBuilder().
        MustAdd(IstioNetworkingV1Alpha3Destinationrules).
        MustAdd(IstioNetworkingV1Alpha3Envoyfilters).
        MustAdd(IstioNetworkingV1Alpha3Gateways).
        MustAdd(IstioNetworkingV1Alpha3Serviceentries).
        MustAdd(IstioNetworkingV1Alpha3Sidecars).
        MustAdd(IstioNetworkingV1Alpha3Virtualservices).
        MustAdd(IstioNetworkingV1Alpha3Workloadentries).
        MustAdd(IstioNetworkingV1Alpha3Workloadgroups).
        MustAdd(IstioSecurityV1Beta1Authorizationpolicies).
        MustAdd(IstioSecurityV1Beta1Peerauthentications).
        MustAdd(IstioSecurityV1Beta1Requestauthentications).
        Build()

ads run

The ADS client's Run method mainly sends the initial discovery requests and then receives the data that comes back:

func (a *ADSC) Run() error {
    var err error
    a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
    a.stream, err = a.client.StreamAggregatedResources(context.Background())
    if err != nil {
        return err
    }
    a.sendNodeMeta = true
    a.InitialLoad = 0
    // Send the initial requests
    for _, r := range a.cfg.InitialDiscoveryRequests {
        if r.TypeUrl == v3.ClusterType {
            a.watchTime = time.Now()
        }
        _ = a.Send(r)
    }
    a.RecvWg.Add(1)
    // Receive the data returned by the ADS server
    go a.handleRecv()
    return nil
}
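The send-then-receive pattern of Run can be illustrated with a small sketch that uses only the standard library. The `fakeStream` and `client` types are hypothetical stand-ins for the gRPC stream and for ADSC; unlike the original, this sketch returns the error from Send rather than discarding it.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeStream is a hypothetical stand-in for the bidirectional gRPC stream.
type fakeStream struct {
	sent      []string
	responses chan string
}

func (s *fakeStream) Send(typeURL string) error {
	s.sent = append(s.sent, typeURL)
	return nil
}

// Recv blocks until a response arrives; ok is false once the stream is closed.
func (s *fakeStream) Recv() (string, bool) {
	r, ok := <-s.responses
	return r, ok
}

// client is a hypothetical, much reduced stand-in for ADSC.
type client struct {
	stream   *fakeStream
	initial  []string
	wg       sync.WaitGroup
	received []string
}

// run sends every initial request, then starts the receive loop in the background.
func (c *client) run() error {
	for _, t := range c.initial {
		if err := c.stream.Send(t); err != nil {
			return err
		}
	}
	c.wg.Add(1)
	go c.handleRecv()
	return nil
}

// handleRecv collects responses until the stream is closed.
func (c *client) handleRecv() {
	defer c.wg.Done()
	for {
		r, ok := c.stream.Recv()
		if !ok {
			return
		}
		c.received = append(c.received, r)
	}
}

func main() {
	s := &fakeStream{responses: make(chan string, 1)}
	c := &client{stream: s, initial: []string{"security.istio.io/v1beta1/PeerAuthentication"}}
	if err := c.run(); err != nil {
		fmt.Println("run failed:", err)
		return
	}
	s.responses <- "response-1"
	close(s.responses)
	c.wg.Wait() // wait for the receive loop to drain the stream
	fmt.Println(len(s.sent), len(c.received))
}
```

Starting the receive loop only after the initial requests have been sent keeps the writes on a single goroutine, which is why run can return immediately.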

handleMCP

Because adsc is a general-purpose ADS client, we can skip the rest of its logic and focus on handleMCP.

Obtain the GVK of the requested resource type:

groupVersionKind := config.GroupVersionKind{Group: gvk[0], Version: gvk[1], Kind: gvk[2]}
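As a rough illustration of where the three `gvk` elements come from, the sketch below splits a TypeUrl on `/`. It assumes the `group/version/kind` layout used elsewhere in this article; `groupVersionKind` and `parseTypeURL` are hypothetical names, not Istio's API.

```go
package main

import (
	"fmt"
	"strings"
)

// groupVersionKind is a hypothetical stand-in for config.GroupVersionKind.
type groupVersionKind struct {
	Group, Version, Kind string
}

// parseTypeURL splits "group/version/kind" into its parts.
// ok is false when the URL does not have exactly three segments,
// which is the case for native Envoy type URLs.
func parseTypeURL(typeURL string) (groupVersionKind, bool) {
	parts := strings.SplitN(typeURL, "/", 3)
	if len(parts) != 3 {
		return groupVersionKind{}, false
	}
	return groupVersionKind{Group: parts[0], Version: parts[1], Kind: parts[2]}, true
}

func main() {
	gvk, ok := parseTypeURL("security.istio.io/v1beta1/PeerAuthentication")
	fmt.Println(gvk.Group, gvk.Version, gvk.Kind, ok)

	_, ok = parseTypeURL("type.googleapis.com/envoy.config.cluster.v3.Cluster")
	fmt.Println(ok)
}
```

A native Envoy type URL has only two segments, so a length check like this one is enough to tell Istio config resources apart from ordinary xDS resources.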

Check whether the cache already holds the object; update it if it does, otherwise create it:

cfg := a.Store.Get(val.GroupVersionKind, val.Name, val.Namespace)
if cfg == nil {
    _, err = a.Store.Create(*val)
    if err != nil {
        adscLog.Warnf("Error adding a new resource to the store %v", err)
        continue
    }
} else {
    _, err = a.Store.Update(*val)
    if err != nil {
        adscLog.Warnf("Error updating an existing resource in the store %v", err)
        continue
    }
}
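The create-or-update decision can be shown in isolation with a map-backed store. This is only a sketch: `resource`, `key`, and `store` are hypothetical types, far simpler than Istio's config store.

```go
package main

import "fmt"

// key identifies a resource the same way the Get call does: kind, name, namespace.
type key struct {
	Kind, Name, Namespace string
}

// resource is a hypothetical, simplified config object.
type resource struct {
	Kind, Name, Namespace string
	Spec                  string
}

// store is a hypothetical in-memory cache keyed by kind/name/namespace.
type store struct {
	items map[key]resource
}

func newStore() *store {
	return &store{items: make(map[key]resource)}
}

// upsert creates the resource when it is missing and updates it otherwise.
// It reports whether a new entry was created.
func (s *store) upsert(r resource) (created bool) {
	k := key{Kind: r.Kind, Name: r.Name, Namespace: r.Namespace}
	_, exists := s.items[k]
	s.items[k] = r
	return !exists
}

func main() {
	s := newStore()
	pa := resource{Kind: "PeerAuthentication", Name: "default", Namespace: "istio-system", Spec: "STRICT"}
	fmt.Println(s.upsert(pa)) // first call creates the entry
	pa.Spec = "PERMISSIVE"
	fmt.Println(s.upsert(pa)) // second call updates it
}
```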

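When Envoy connects, the configuration list is read from the config store and delivered to the client. Once the connection is established, every Create or Update event triggers the corresponding handler, here the config handler, which performs the push.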
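The relationship between store events and the push can be sketched as a controller that notifies registered handlers on every change. All names here (`controller`, `registerHandler`, `apply`, `list`) are hypothetical and only mirror the idea, not Istio's config controller API.

```go
package main

import (
	"fmt"
	"sort"
)

// event distinguishes a newly created object from an updated one.
type event string

const (
	eventAdd    event = "add"
	eventUpdate event = "update"
)

// controller is a hypothetical config controller: a store plus change handlers.
type controller struct {
	items    map[string]string
	handlers []func(name string, ev event)
}

func newController() *controller {
	return &controller{items: make(map[string]string)}
}

// registerHandler adds a callback that runs on every add or update.
func (c *controller) registerHandler(h func(name string, ev event)) {
	c.handlers = append(c.handlers, h)
}

// apply stores the object and notifies every handler with the matching event.
func (c *controller) apply(name, spec string) {
	ev := eventAdd
	if _, ok := c.items[name]; ok {
		ev = eventUpdate
	}
	c.items[name] = spec
	for _, h := range c.handlers {
		h(name, ev)
	}
}

// list returns the stored names, i.e. what a newly connected proxy would be sent.
func (c *controller) list() []string {
	names := make([]string, 0, len(c.items))
	for n := range c.items {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	c := newController()
	// In this sketch the handler only prints; a real handler would trigger a push.
	c.registerHandler(func(name string, ev event) {
		fmt.Println("push triggered by", ev, "of", name)
	})
	c.apply("default", "STRICT")
	c.apply("default", "PERMISSIVE")
	fmt.Println(c.list())
}
```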

A simple MCP-OVER-XDS example

Deploying Istio

istioctl manifest generate --set profile=demo > demo.yaml

Then edit the istio ConfigMap (namespace istio-system) and add the following content.

Add the XDS config source:

    configSources:
      - address: xds://172.16.233.1:1109

Deploy Istio:

kubectl apply -f demo.yaml

Implementing the ADS server

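With the stock envoy-control-plane, the usual way to build an ADS server is xds.NewServer backed by a SnapshotCache. That approach does not work for Istio, because envoy-control-plane can only manage native Envoy xDS resources, whereas MCP-OVER-XDS carries Istio CRD resources.

An ADS server needs to implement the AggregatedDiscoveryServiceServer interface: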

type AggregatedDiscoveryServiceServer interface {
    // This is a gRPC-only API.
    StreamAggregatedResources(AggregatedDiscoveryService_StreamAggregatedResourcesServer) error
    DeltaAggregatedResources(AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error
}

func (m myserver) StreamAggregatedResources(stream d3.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    // Log the peer (here istiod) that opened the stream.
    if peerInfo, ok := peer.FromContext(stream.Context()); ok {
        log.Println(peerInfo)
    }
    // Push everything once on connect, then again on every signal.
    pushall(stream)
    for {
        select {
        case <-m.psuhc:
            pushall(stream)
        case <-stream.Context().Done():
            // The client went away; end the stream instead of blocking forever.
            return stream.Context().Err()
        }
    }
}

pushall contains the logic that pushes the data; Istio uses the TypeUrl to decode the resources back into the corresponding type:

err = stream.Send(&d3.DiscoveryResponse{
    TypeUrl:     "security.istio.io/v1beta1/PeerAuthentication",
    VersionInfo: "1",
    Nonce:       "",
    Resources:   resp,
})

Here we push the same PeerAuthentication policy to every client, that is, to Istio:

    pa := v1beta1.PeerAuthentication{
        TypeMeta: v1.TypeMeta{
            APIVersion: "security.istio.io/v1beta1",
            Kind:       "PeerAuthentication",
        },
        ObjectMeta: v1.ObjectMeta{
            Name:      "default",
            Namespace: "istio-system",
        },
        Spec: securityv1beta1.PeerAuthentication{
            Mtls: &securityv1beta1.PeerAuthentication_MutualTLS{
                Mode: securityv1beta1.PeerAuthentication_MutualTLS_STRICT,
            },
        },
    }

Verification

  • Check whether the configuration has taken effect

    Querying istiod's debug endpoint shows that the policy has been received:

curl 172.17.116.27:8080/debug/configz
[
    {
      "kind": "PeerAuthentication",
      "apiVersion": "security.istio.io/v1beta1",
      "metadata": {
        "name": "default",
        "namespace": "istio-system",
        "resourceVersion": "2020-12-15 06:17:28.774277383 +0000 UTC m=+782.333181911",
        "creationTimestamp": null
      },
      "spec": {
        "mtls": {}
      }
    }
  ]
  • Access the service

Deploy the test workloads:

kubectl create ns foo
kubectl apply -f <(istioctl kube-inject -f samples/httpbin/httpbin.yaml) -n foo
kubectl apply -f <(istioctl kube-inject -f samples/sleep/sleep.yaml) -n foo
kubectl create ns bar
kubectl apply -f <(istioctl kube-inject -f samples/httpbin/httpbin.yaml) -n bar
kubectl apply -f <(istioctl kube-inject -f samples/sleep/sleep.yaml) -n bar
kubectl create ns legacy
kubectl apply -f samples/httpbin/httpbin.yaml -n legacy
kubectl apply -f samples/sleep/sleep.yaml -n legacy

The service can no longer be reached:

curl httpbin.foo:8000/ip
curl: (56) Recv failure: Connection reset by peer
