EventHandlers事件处理器
configcontroller
根据ConfigStores生成configstorecache
注册configstore
ingress类型
// Register the ingress-backed config store. The controller is constructed from
// the Kubernetes client, s.environment.Watcher and the registry's Kubernetes
// options, then appended to the server's list of config stores.
s.ConfigStores = append(s.ConfigStores,
ingress.NewController(s.kubeClient, s.environment.Watcher, args.RegistryOptions.KubeOptions))
文件类型
// Back the file-based config source with an in-memory store created for the
// collections.Pilot schemas, wrapped in a memory controller.
store := memory.Make(collections.Pilot)
configController := memory.NewController(store)
// Set up the file monitor for args.RegistryOptions.FileDir against this
// controller; any failure aborts before the controller is registered.
// NOTE(review): presumably the monitor loads files from FileDir into the
// store — confirm in makeFileMonitor.
err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
if err != nil {
return err
}
// Register the controller as one of the server's config stores.
s.ConfigStores = append(s.ConfigStores, configController)
CRD类型
// known is looked up below by "<plural>.<group>" CRD name; only schemas whose
// CRD name is present in it get a cache handler.
known := knownCRDs(client.Ext())
// Walk every schema configured on out and create a cache handler for each one
// whose CRD is known; the others are skipped with a warning.
for _, s := range out.schemas.All() {
// From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
name := fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group())
if _, f := known[name]; f {
var i informers.GenericInformer
var err error
// Resources in the "networking.x-k8s.io" group are served by the
// service-apis informer; every other resource uses the Istio informer.
if s.Resource().Group() == "networking.x-k8s.io" {
i, err = client.ServiceApisInformer().ForResource(s.Resource().GroupVersionResource())
} else {
i, err = client.IstioInformer().ForResource(s.Resource().GroupVersionResource())
}
if err != nil {
return nil, err
}
// Index the handler by the schema's GroupVersionKind.
out.kinds[s.Resource().GroupVersionKind()] = createCacheHandler(out, s, i)
} else {
// A missing CRD is not an error: the schema is skipped and a warning is logged.
scope.Warnf("Skipping CRD %v as it is not present", s.Resource().GroupVersionKind())
}
}
XDS类型
// Back the XDS config source with an in-memory store created for the
// collections.Pilot schemas, wrapped in a memory controller.
store := memory.Make(collections.Pilot)
configController := memory.NewController(store)
// Give xdsMCP its store before running it.
xdsMCP.Store = model.MakeIstioStore(configController)
// err is assigned, not declared, here: it is declared earlier in the
// enclosing function (outside this excerpt).
err = xdsMCP.Run()
if err != nil {
return fmt.Errorf("MCP: failed running %v", err)
}
// Register the controller only after xdsMCP.Run returned without error.
s.ConfigStores = append(s.ConfigStores, configController)
log.Warna("Started XDS config ", s.ConfigStores)
k8s类型
// Without a Kubernetes client there is no Kubernetes config store to register;
// this returns nil rather than an error.
if s.kubeClient == nil {
return nil
}
// Build the Kubernetes config controller from args and register it as one of
// the server's config stores.
configController, err := s.makeKubeConfigController(args)
if err != nil {
return err
}
s.ConfigStores = append(s.ConfigStores, configController)
EventHandler
EventHandler 即事件处理器,用于接收配置类型的更新事件;initRegistryEventHandlers 负责注册这些 handler。根据类型不同,Istio 中包含以下 handler:
serviceHandler - ServiceController
configHandler - configController
workloadEntryHandler - configController
serviceEntryHandler - configController
serviceHandler
serviceHandler函数如下,当收到服务变化时进行全量更新
// serviceHandler reacts to a service event by asking the XDS server for a full
// push. The event type itself is ignored; the changed service is reported as a
// ServiceEntry-kind config key built from its hostname and namespace.
serviceHandler := func(svc *model.Service, _ model.Event) {
	updated := model.ConfigKey{
		Kind:      gvk.ServiceEntry,
		Name:      string(svc.Hostname),
		Namespace: svc.Attributes.Namespace,
	}
	s.XDSServer.ConfigUpdate(&model.PushRequest{
		Full:           true,
		ConfigsUpdated: map[model.ConfigKey]struct{}{updated: {}},
		Reason:         []model.TriggerReason{model.ServiceUpdate},
	})
}
configHandler
configHandler 收到新的事件后,会调用 XDSServer.ConfigUpdate 将推送请求加入 push channel,等待推送更新到 client。
// configHandler reacts to a config event by asking the XDS server for a full
// push that names the current config (kind, name, namespace) as updated, and
// then updates the status reporter: a deleted config is removed from the
// in-progress resources, any other event adds it. The previous config value
// (first argument) is not used.
configHandler := func(_, curr config.Config, event model.Event) {
	updated := model.ConfigKey{
		Kind:      curr.GroupVersionKind,
		Name:      curr.Name,
		Namespace: curr.Namespace,
	}
	s.XDSServer.ConfigUpdate(&model.PushRequest{
		Full:           true,
		ConfigsUpdated: map[model.ConfigKey]struct{}{updated: {}},
		Reason:         []model.TriggerReason{model.ConfigUpdate},
	})

	// Status tracking happens after the push has been requested.
	if event == model.EventDelete {
		s.statusReporter.DeleteInProgressResource(curr)
		return
	}
	s.statusReporter.AddInProgressResource(curr)
}
对于以下类型的资源更新将触发configHandler
// Pilot is the schema collection assembled from the schemas added below:
// the networking v1alpha3 schemas first, followed by the security v1beta1
// schemas, in exactly this order.
Pilot = collection.NewSchemasBuilder().
MustAdd(IstioNetworkingV1Alpha3Destinationrules).
MustAdd(IstioNetworkingV1Alpha3Envoyfilters).
MustAdd(IstioNetworkingV1Alpha3Gateways).
MustAdd(IstioNetworkingV1Alpha3Serviceentries).
MustAdd(IstioNetworkingV1Alpha3Sidecars).
MustAdd(IstioNetworkingV1Alpha3Virtualservices).
MustAdd(IstioNetworkingV1Alpha3Workloadentries).
MustAdd(IstioNetworkingV1Alpha3Workloadgroups).
MustAdd(IstioSecurityV1Beta1Authorizationpolicies).
MustAdd(IstioSecurityV1Beta1Peerauthentications).
MustAdd(IstioSecurityV1Beta1Requestauthentications).
Build()
变化则推送一次XDS
以ingress为例
// onEvent handles one ingress notification by invoking every registered
// VirtualService handler and then every registered Gateway handler.
//
// obj is expected to be an *ingress.Ingress; event is forwarded unchanged to
// the handlers. The handlers receive an empty previous config and a current
// config that carries only the GroupVersionKind, so they learn that something
// of that kind changed, not what changed.
//
// It returns an error while the controller has not finished syncing, or when
// shouldProcessIngress fails. Objects of any other type (or a nil ingress) and
// ingresses that shouldProcessIngress rejects are ignored and nil is returned.
func (c *controller) onEvent(obj interface{}, event model.Event) error {
	if !c.HasSynced() {
		return errors.New("waiting till full synchronization")
	}

	// Validate the type assertion before using its result: when it fails, ing
	// is a nil pointer and must not be passed to shouldProcessIngress or
	// dereferenced by the log statement below.
	ing, ok := obj.(*ingress.Ingress)
	if !ok || ing == nil {
		return nil
	}

	process, err := c.shouldProcessIngress(c.meshWatcher.Mesh(), ing)
	if err != nil {
		return err
	}
	if !process {
		return nil
	}

	log.Infof("ingress event %s for %s/%s", event, ing.Namespace, ing.Name)

	// Trigger updates for Gateway and VirtualService
	// TODO: we could be smarter here and only trigger when real changes were found
	for _, f := range c.virtualServiceHandlers {
		f(config.Config{}, config.Config{
			Meta: config.Meta{
				GroupVersionKind: gvk.VirtualService,
			},
		}, event)
	}
	for _, f := range c.gatewayHandlers {
		f(config.Config{}, config.Config{
			Meta: config.Meta{
				GroupVersionKind: gvk.Gateway,
			},
		}, event)
	}

	return nil
}
service handler
将发生变化的服务以 ServiceEntry 类型的 ConfigKey 传入 PushRequest 的 ConfigsUpdated,供后续决定是否推送
// serviceHandler reacts to a service event by asking the XDS server for a full
// push. The event type itself is ignored; the changed service is reported as a
// ServiceEntry-kind config key built from its hostname and namespace.
serviceHandler := func(svc *model.Service, _ model.Event) {
	updated := model.ConfigKey{
		Kind:      gvk.ServiceEntry,
		Name:      string(svc.Hostname),
		Namespace: svc.Attributes.Namespace,
	}
	s.XDSServer.ConfigUpdate(&model.PushRequest{
		Full:           true,
		ConfigsUpdated: map[model.ConfigKey]struct{}{updated: {}},
		Reason:         []model.TriggerReason{model.ServiceUpdate},
	})
}

// Register the handler with the service controller; a registration failure is
// returned to the caller with added context.
if err := s.ServiceController().AppendServiceHandler(serviceHandler); err != nil {
	return fmt.Errorf("append service handler failed: %v", err)
}
Last updated