func (s *ServiceEntryStore) workloadEntryHandler(old, curr config.Config, event model.Event) {
	var oldWle *networking.WorkloadEntry
	if old.Spec != nil {
		oldWle = old.Spec.(*networking.WorkloadEntry)
	}
	wle := curr.Spec.(*networking.WorkloadEntry)
	key := configKey{
		kind:      workloadEntryConfigType,
		name:      curr.Name,
		namespace: curr.Namespace,
	}
	// an unhealthy workload entry is treated as a delete
	if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
		event = model.EventDelete
	}
	// fire off the k8s handlers
	if len(s.workloadHandlers) > 0 {
		si := convertWorkloadEntryToWorkloadInstance(curr)
		if si != nil {
			for _, h := range s.workloadHandlers {
				h(si, event)
			}
		}
	}
	s.storeMutex.RLock()
	// fetch the selector-based service entries in the same namespace
	entries := s.seWithSelectorByNamespace[curr.Namespace]
	s.storeMutex.RUnlock()
	// if there are no service entries, return now to avoid taking unnecessary locks
	if len(entries) == 0 {
		return
	}
	log.Debugf("Handle event %s for workload entry %s in namespace %s", event, curr.Name, curr.Namespace)
	instancesUpdated := []*model.ServiceInstance{}
	instancesDeleted := []*model.ServiceInstance{}
	workloadLabels := labels.Collection{wle.Labels}
	fullPush := false
	configsUpdated := map[model.ConfigKey]struct{}{}
	for _, se := range entries {
		selected := false
		// the workload entry does not match this service entry's label selector
		if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {
			// this is an update
			if oldWle != nil {
				oldWorkloadLabels := labels.Collection{oldWle.Labels}
				// if the old workload entry matched, its existing endpoints must be removed
				if oldWorkloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {
					selected = true
					instance := convertWorkloadEntryToServiceInstances(oldWle, se.services, se.entry, &key)
					instancesDeleted = append(instancesDeleted, instance...)
				}
			}
		} else {
			// the label selector matches; record the new instances
			selected = true
			instance := convertWorkloadEntryToServiceInstances(wle, se.services, se.entry, &key)
			instancesUpdated = append(instancesUpdated, instance...)
		}
		if selected {
			// the service entry uses DNS resolution, so a full push is required
			if se.entry.Resolution == networking.ServiceEntry_DNS {
				fullPush = true
				for key, value := range getUpdatedConfigs(se.services) {
					configsUpdated[key] = value
				}
			}
		}
	}
	if len(instancesDeleted) > 0 {
		s.deleteExistingInstances(key, instancesDeleted)
	}
	if event != model.EventDelete {
		s.updateExistingInstances(key, instancesUpdated)
	} else {
		s.deleteExistingInstances(key, instancesUpdated)
	}
	// not a full push: only update EDS
	if !fullPush {
		s.edsUpdate(append(instancesUpdated, instancesDeleted...), true)
		// trigger full xds push to the related sidecar proxy
		if event == model.EventAdd {
			s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
		}
		return
	}
	// update the EDS cache
	s.edsUpdate(append(instancesUpdated, instancesDeleted...), false)
	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         []model.TriggerReason{model.EndpointUpdate},
	}
	// full push over ADS
	s.XdsUpdater.ConfigUpdate(pushReq)
}
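The selection decision above reduces to a superset check: a WorkloadEntry is selected by a ServiceEntry only if its labels contain every key/value pair in the entry's workloadSelector. A minimal sketch of that check, with made-up label values and the labels types already used in the handler, might look like this:

	// Sketch only: the label values are hypothetical; the types are the same
	// labels.Collection / labels.Instance used by workloadEntryHandler above.
	wleLabels := labels.Collection{{"app": "ratings", "version": "v1"}}
	selector := labels.Instance{"app": "ratings"}
	if wleLabels.IsSupersetOf(selector) {
		// this workload entry would be converted into service instances
		// for the selecting service entry
	}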
func (s *ServiceEntryStore) serviceEntryHandler(old, curr config.Config, event model.Event) {
	// convert the service entry into a list of services
	cs := convertServices(curr)
	configsUpdated := map[model.ConfigKey]struct{}{}
	// For add/delete events, always do a full push. For update events, do a full
	// push only if the services themselves changed; otherwise an endpoint update
	// is enough.
	var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service
	switch event {
	case model.EventUpdate:
		// convert the old config into services as well
		os := convertServices(old)
		// compare the old and new workloadSelector; if it changed, all services are updated
		if selectorChanged(old, curr) {
			// mark every service as updated
			mark := make(map[host.Name]*model.Service, len(cs))
			for _, svc := range cs {
				mark[svc.Hostname] = svc
				updatedSvcs = append(updatedSvcs, svc)
			}
			for _, svc := range os {
				if _, f := mark[svc.Hostname]; !f {
					updatedSvcs = append(updatedSvcs, svc)
				}
			}
		} else {
			// otherwise diff the old and new services
			addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(os, cs)
		}
	case model.EventDelete:
		deletedSvcs = cs
	case model.EventAdd:
		addedSvcs = cs
	default:
		unchangedSvcs = cs
	}
	for _, svc := range addedSvcs {
		s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)
		configsUpdated[makeConfigKey(svc)] = struct{}{}
	}
	for _, svc := range updatedSvcs {
		s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate)
		configsUpdated[makeConfigKey(svc)] = struct{}{}
	}
	// clean up the endpoint shards for deleted services
	for _, svc := range deletedSvcs {
		s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)
		configsUpdated[makeConfigKey(svc)] = struct{}{}
	}
	if len(unchangedSvcs) > 0 {
		currentServiceEntry := curr.Spec.(*networking.ServiceEntry)
		oldServiceEntry := old.Spec.(*networking.ServiceEntry)
		// for DNS resolution, changed endpoints also require a full push
		if currentServiceEntry.Resolution == networking.ServiceEntry_DNS {
			if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) {
				// fqdn endpoints have changed. Need full push
				for _, svc := range unchangedSvcs {
					configsUpdated[makeConfigKey(svc)] = struct{}{}
				}
			}
		}
	}
	fullPush := len(configsUpdated) > 0
	// no service changed: this is an update event where only endpoints may have moved
	if !fullPush {
		// The IP endpoints of a STATIC service entry have changed, so an EDS update
		// is needed. If a full push were happening, the edsUpdate would be left to
		// it; here edsUpdate is done for all unchanged services.
		instances := convertServiceEntryToInstances(curr, unchangedSvcs)
		key := configKey{
			kind:      serviceEntryConfigType,
			name:      curr.Name,
			namespace: curr.Namespace,
		}
		// if the backend instances changed, refresh the index for the changed instances
		s.updateExistingInstances(key, instances)
		s.edsUpdate(instances, true)
		return
	}
	// Recomputing the indexes here is too expensive; build them lazily when needed.
	// Indexes are only recomputed if the services changed.
	s.refreshIndexes.Store(true)
	// When doing a full push, non-DNS added, updated and unchanged services also
	// trigger an EDS update so that the endpoint shards are refreshed.
	allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
	nonDNSServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))
	allServices = append(allServices, addedSvcs...)
	allServices = append(allServices, updatedSvcs...)
	allServices = append(allServices, unchangedSvcs...)
	for _, svc := range allServices {
		if svc.Resolution != model.DNSLB {
			nonDNSServices = append(nonDNSServices, svc)
		}
	}
	// collect EDS keys for the non-DNS services
	keys := map[instancesKey]struct{}{}
	for _, svc := range nonDNSServices {
		keys[instancesKey{hostname: svc.Hostname, namespace: curr.Namespace}] = struct{}{}
	}
	// update the EDS endpoint shards
	s.edsUpdateByKeys(keys, false)
	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         []model.TriggerReason{model.ServiceUpdate},
	}
	// full push over ADS
	s.XdsUpdater.ConfigUpdate(pushReq)
}
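The update branch leans on servicesDiff to classify services as added, deleted, updated or unchanged. A simplified, hostname-keyed sketch of that kind of diff is shown below; it is only illustrative, since the real servicesDiff compares individual service attributes rather than doing a blanket deep-equality check.

	// diffByHostname is a simplified stand-in for servicesDiff: it groups services
	// by hostname and treats any deep inequality as an update.
	func diffByHostname(os, ns []*model.Service) (added, deleted, updated, unchanged []*model.Service) {
		oldByHost := make(map[host.Name]*model.Service, len(os))
		for _, s := range os {
			oldByHost[s.Hostname] = s
		}
		for _, s := range ns {
			switch old, found := oldByHost[s.Hostname]; {
			case !found:
				added = append(added, s)
			case !reflect.DeepEqual(old, s):
				updated = append(updated, s)
			default:
				unchanged = append(unchanged, s)
			}
			delete(oldByHost, s.Hostname)
		}
		// anything left in the old set no longer exists in the new config
		for _, s := range oldByHost {
			deleted = append(deleted, s)
		}
		return
	}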
// WorkloadInstanceHandler defines the handler for service instances generated by other registries
func (s *ServiceEntryStore) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {
	key := configKey{
		kind:      workloadInstanceConfigType,
		name:      wi.Name,
		namespace: wi.Namespace,
	}
	// Indicates that this event came from a pod->workloadentry conversion and can be
	// ignored because nothing relevant changed in the workload entry.
	redundantEventForPod := false
	var addressToDelete string
	s.storeMutex.Lock()
	// This came from a pod. Store it in a separate map so that refreshIndexes can
	// use it alongside the objects from the store.
	k := wi.Namespace + "/" + wi.Name
	switch event {
	case model.EventDelete:
		if _, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; !exists {
			// multiple delete events for the same pod (succeeded/failed/unknown status repeating)
			redundantEventForPod = true
		} else {
			delete(s.workloadInstancesByIP, wi.Endpoint.Address)
			delete(s.workloadInstancesIPsByName, k)
		}
	default: // add or update
		// check whether the workload entry changed; if it did, clear the old entry
		existing := s.workloadInstancesIPsByName[k]
		if existing != "" && existing != wi.Endpoint.Address {
			delete(s.workloadInstancesByIP, existing)
			addressToDelete = existing
		}
		if old, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; exists {
			// If multiple k8s services select the same pod, or a service has multiple ports,
			// we may receive multiple events; ignore them, since only the endpoint IP matters.
			if model.WorkloadInstancesEqual(old, wi) {
				redundantEventForPod = true
			}
		}
		s.workloadInstancesByIP[wi.Endpoint.Address] = wi
		s.workloadInstancesIPsByName[k] = wi.Endpoint.Address
	}
	// only service entries in the same namespace are considered
	entries := s.seWithSelectorByNamespace[wi.Namespace]
	s.storeMutex.Unlock()
	if len(entries) == 0 || redundantEventForPod {
		return
	}
	instances := []*model.ServiceInstance{}
	instancesDeleted := []*model.ServiceInstance{}
	for _, se := range entries {
		workloadLabels := labels.Collection{wi.Endpoint.Labels}
		// skip entries whose selector does not match
		if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {
			continue
		}
		// convert and collect the new instances
		instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, se.services, se.entry)
		instances = append(instances, instance...)
		// the old address needs its instances deleted
		if addressToDelete != "" {
			for _, i := range instance {
				di := i.DeepCopy()
				di.Endpoint.Address = addressToDelete
				instancesDeleted = append(instancesDeleted, di)
			}
		}
	}
	// update the instance caches
	if len(instancesDeleted) > 0 {
		s.deleteExistingInstances(key, instancesDeleted)
	}
	if event != model.EventDelete {
		s.updateExistingInstances(key, instances)
	} else {
		s.deleteExistingInstances(key, instances)
	}
	// trigger an EDS update
	s.edsUpdate(instances, true)
}
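The handler above maintains a pair of indexes: instances keyed by IP plus a name→IP map, which is what makes it possible to evict the stale entry when a pod keeps its name but comes back with a new address (the kubernetes Controller handler shown further below keeps the same pair). A stripped-down sketch of that bookkeeping, with the value type simplified to a string where the real maps store *model.WorkloadInstance:

	// instanceIndex is a simplified version of the workloadInstancesByIP /
	// workloadInstancesIPsByName pair used above.
	type instanceIndex struct {
		byIP      map[string]string // IP -> instance payload (simplified)
		ipsByName map[string]string // "namespace/name" -> current IP
	}

	func (idx *instanceIndex) upsert(name, ip, payload string) {
		// If the named workload moved to a new IP, drop the entry stored under the
		// old IP so stale endpoints are not kept around.
		if old, ok := idx.ipsByName[name]; ok && old != ip {
			delete(idx.byIP, old)
		}
		idx.byIP[ip] = payload
		idx.ipsByName[name] = ip
	}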
if localCluster {
	// TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
	if m.serviceEntryStore != nil && features.EnableK8SServiceSelectWorkloadEntries {
		// Add an instance handler in the service entry store to notify kubernetes about workload entry events
		m.serviceEntryStore.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
	}
}
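This is where the cross-registry wiring happens: the kube registry's WorkloadInstanceHandler (shown next) is appended to the ServiceEntry store's handler list, and workloadEntryHandler fans events out to it via s.workloadHandlers. A minimal sketch of that append/fan-out pattern, using the handler signature visible in the calls above (the registry type and locking here are illustrative):

	// workloadHandler matches the signature used by h(si, event) in workloadEntryHandler.
	type workloadHandler func(*model.WorkloadInstance, model.Event)

	type handlerRegistry struct {
		mu       sync.Mutex
		handlers []workloadHandler
	}

	// AppendWorkloadHandler registers an observer for workload instance events.
	func (r *handlerRegistry) AppendWorkloadHandler(h workloadHandler) {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.handlers = append(r.handlers, h)
	}

	// notify fans one event out to every registered handler.
	func (r *handlerRegistry) notify(si *model.WorkloadInstance, event model.Event) {
		r.mu.Lock()
		hs := append([]workloadHandler(nil), r.handlers...)
		r.mu.Unlock()
		for _, h := range hs {
			h(si, event)
		}
	}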
// WorkloadInstanceHandler defines the handler for service instances generated by other registries
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {
	// ignore workload instances without a namespace or labels, as they cannot be selected
	if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {
		return
	}
	// update the caches
	c.Lock()
	switch event {
	case model.EventDelete:
		delete(c.workloadInstancesByIP, si.Endpoint.Address)
	default: // add or update
		k := si.Namespace + "/" + si.Name
		existing := c.workloadInstancesIPsByName[k]
		if existing != si.Endpoint.Address {
			delete(c.workloadInstancesByIP, existing)
		}
		c.workloadInstancesByIP[si.Endpoint.Address] = si
		c.workloadInstancesIPsByName[k] = si.Endpoint.Address
	}
	c.Unlock()
	// find the workload entry's services via the k8s API rather than scanning the internal map of model.Service
	dummyPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},
	}
	// find the services that map to this workload entry and fire off EDS updates if the service uses client-side load balancing
	if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 {
		for _, k8sSvc := range k8sServices {
			var service *model.Service
			c.RLock()
			service = c.servicesMap[kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.domainSuffix)]
			c.RUnlock()
			// this cannot be an external service because k8s external services do not have label selectors
			if service == nil || service.Resolution != model.ClientSideLB {
				// may be a headless service
				continue
			}
			// Get the updated list of endpoints, including both k8s pods and the workload
			// entries for this service, then notify the EDS server that this service's
			// endpoints changed. One endpoint object is needed per service port.
			endpoints := make([]*model.IstioEndpoint, 0)
			for _, port := range service.Ports {
				if port.Protocol == protocol.UDP {
					continue
				}
				instances := c.InstancesByPort(service, port.Port, labels.Collection{})
				for _, inst := range instances {
					endpoints = append(endpoints, inst.Endpoint)
				}
			}
			// fire off the EDS update
			c.xdsUpdater.EDSUpdate(c.clusterID, string(service.Hostname), service.Attributes.Namespace, endpoints)
		}
	}
}
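The dummyPod trick above exists so that getPodServices can reuse ordinary Kubernetes label-selector matching against the lister's services instead of Istio's internal service map. Conceptually, the per-service match it performs looks like the following sketch (using the standard apimachinery labels package; the lister plumbing is omitted and matchesService is a made-up helper name):

	import (
		v1 "k8s.io/api/core/v1"
		klabels "k8s.io/apimachinery/pkg/labels"
	)

	// matchesService reports whether a pod's labels satisfy a Service's selector.
	// Selector-less services (e.g. ExternalName services) never match, which is
	// why external services cannot select workload entries.
	func matchesService(svc *v1.Service, pod *v1.Pod) bool {
		if len(svc.Spec.Selector) == 0 {
			return false
		}
		sel := klabels.SelectorFromSet(svc.Spec.Selector)
		return sel.Matches(klabels.Set(pod.Labels))
	}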