EventHandlers事件处理器

configcontroller

根据ConfigStores生成configstorecache

注册configstore

ingress类型

s.ConfigStores = append(s.ConfigStores,
            ingress.NewController(s.kubeClient, s.environment.Watcher, args.RegistryOptions.KubeOptions))

文件类型

store := memory.Make(collections.Pilot)
configController := memory.NewController(store)

err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
if err != nil {
    return err
}
s.ConfigStores = append(s.ConfigStores, configController)

CRD类型

        known := knownCRDs(client.Ext())
        for _, s := range out.schemas.All() {
            // From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
            name := fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group())
            if _, f := known[name]; f {
                var i informers.GenericInformer
                var err error
                if s.Resource().Group() == "networking.x-k8s.io" {
                    i, err = client.ServiceApisInformer().ForResource(s.Resource().GroupVersionResource())
                } else {
                    i, err = client.IstioInformer().ForResource(s.Resource().GroupVersionResource())
                }
                if err != nil {
                    return nil, err
                }
                out.kinds[s.Resource().GroupVersionKind()] = createCacheHandler(out, s, i)
            } else {
                scope.Warnf("Skipping CRD %v as it is not present", s.Resource().GroupVersionKind())
            }
        }

XDS类型

store := memory.Make(collections.Pilot)
            configController := memory.NewController(store)
            xdsMCP.Store = model.MakeIstioStore(configController)
            err = xdsMCP.Run()
            if err != nil {
                return fmt.Errorf("MCP: failed running %v", err)
            }
            s.ConfigStores = append(s.ConfigStores, configController)
            log.Warna("Started XDS config ", s.ConfigStores)

k8s类型

if s.kubeClient == nil {
    return nil
}
configController, err := s.makeKubeConfigController(args)
if err != nil {
    return err
}
s.ConfigStores = append(s.ConfigStores, configController)

EventHandler

即事件处理器用于接收配置类型的配置更新事件,initRegistryEventHandlers 用于handler的注册,根据不同类型istio中包含以下不同的handler

  • serviceHandler - ServiceController

  • configHandler - configController

  • workloadEntryHandler - configController

  • serviceEntryHandler - configController

serviceHandler

serviceHandler函数如下,当收到服务变化时进行全量更新

serviceHandler := func(svc *model.Service, _ model.Event) {
    pushReq := &model.PushRequest{
        Full: true,
        ConfigsUpdated: map[model.ConfigKey]struct{}{{
            Kind:      gvk.ServiceEntry,
            Name:      string(svc.Hostname),
            Namespace: svc.Attributes.Namespace,
        }: {}},
        Reason: []model.TriggerReason{model.ServiceUpdate},
    }
    s.XDSServer.ConfigUpdate(pushReq)
}

configHandler

configHandler 收到新的事件后会将事件加入pushchannel以待推送更新到client

configHandler := func(_, curr config.Config, event model.Event) {
    pushReq := &model.PushRequest{
        Full: true,
        ConfigsUpdated: map[model.ConfigKey]struct{}{{
            Kind:      curr.GroupVersionKind,
            Name:      curr.Name,
            Namespace: curr.Namespace,
        }: {}},
        Reason: []model.TriggerReason{model.ConfigUpdate},
    }
    s.XDSServer.ConfigUpdate(pushReq)
    if event != model.EventDelete {
        s.statusReporter.AddInProgressResource(curr)
    } else {
        s.statusReporter.DeleteInProgressResource(curr)
    }
}

对于以下类型的资源更新将触发configHandler

    Pilot = collection.NewSchemasBuilder().
        MustAdd(IstioNetworkingV1Alpha3Destinationrules).
        MustAdd(IstioNetworkingV1Alpha3Envoyfilters).
        MustAdd(IstioNetworkingV1Alpha3Gateways).
        MustAdd(IstioNetworkingV1Alpha3Serviceentries).
        MustAdd(IstioNetworkingV1Alpha3Sidecars).
        MustAdd(IstioNetworkingV1Alpha3Virtualservices).
        MustAdd(IstioNetworkingV1Alpha3Workloadentries).
        MustAdd(IstioNetworkingV1Alpha3Workloadgroups).
        MustAdd(IstioSecurityV1Beta1Authorizationpolicies).
        MustAdd(IstioSecurityV1Beta1Peerauthentications).
        MustAdd(IstioSecurityV1Beta1Requestauthentications).
        Build()

变化则推送一次XDS

以ingress为例

func (c *controller) onEvent(obj interface{}, event model.Event) error {
    if !c.HasSynced() {
        return errors.New("waiting till full synchronization")
    }

    ing, ok := obj.(*ingress.Ingress)
    process, err := c.shouldProcessIngress(c.meshWatcher.Mesh(), ing)
    if err != nil {
        return err
    }
    if !ok || !process {
        return nil
    }
    log.Infof("ingress event %s for %s/%s", event, ing.Namespace, ing.Name)

    // Trigger updates for Gateway and VirtualService
    // TODO: we could be smarter here and only trigger when real changes were found
    for _, f := range c.virtualServiceHandlers {
        f(config.Config{}, config.Config{
            Meta: config.Meta{
                GroupVersionKind: gvk.VirtualService,
            },
        }, event)
    }
    for _, f := range c.gatewayHandlers {
        f(config.Config{}, config.Config{
            Meta: config.Meta{
                GroupVersionKind: gvk.Gateway,
            },
        }, event)
    }

    return nil
}

service handler

传入serviceentry来决定是否推送

    serviceHandler := func(svc *model.Service, _ model.Event) {
        pushReq := &model.PushRequest{
            Full: true,
            ConfigsUpdated: map[model.ConfigKey]struct{}{{
                Kind:      gvk.ServiceEntry,
                Name:      string(svc.Hostname),
                Namespace: svc.Attributes.Namespace,
            }: {}},
            Reason: []model.TriggerReason{model.ServiceUpdate},
        }
        s.XDSServer.ConfigUpdate(pushReq)
    }
    if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {
        return fmt.Errorf("append service handler failed: %v", err)
    }

Last updated