sidecar 注入原理

// initSidecarInjector builds the sidecar injection webhook and registers the
// start functions that run it.
//
// The injection configuration is taken from local files under the injection
// directory when a "config" file is present there; otherwise it is read from
// the injector ConfigMap in args.Namespace. The function returns (nil, nil)
// when injection is disabled, when no injection directory is configured, or
// when the ConfigMap does not exist. Any other failure is returned as an
// error; a webhook construction failure is wrapped so callers can unwrap it.
func (s *Server) initSidecarInjector(args *PilotArgs) (*inject.Webhook, error) {
    // Currently the constant "./var/lib/istio/inject".
    injectPath := args.InjectionOptions.InjectionDirectory
    if injectPath == "" || !injectionEnabled.Get() {
        log.Infof("Skipping sidecar injector, injection path is missing or disabled.")
        return nil, nil
    }

    // Injection is started when either a local or a remote configuration exists.
    var watcher inject.Watcher
    configFile := filepath.Join(injectPath, "config")
    // Check for a local configuration first. Any Stat result other than
    // "not exist" selects the file watcher, which reports its own errors.
    if _, err := os.Stat(configFile); !os.IsNotExist(err) {
        valuesFile := filepath.Join(injectPath, "values")
        watcher, err = inject.NewFileWatcher(configFile, valuesFile)
        if err != nil {
            return nil, err
        }
    } else {
        // Fall back to the ConfigMap (the constant istio-sidecar-injector).
        configMapName := getInjectorConfigMapName(args.Revision)
        cms := s.kubeClient.CoreV1().ConfigMaps(args.Namespace)
        if _, err := cms.Get(context.TODO(), configMapName, metav1.GetOptions{}); err != nil {
            if errors.IsNotFound(err) {
                log.Infof("Skipping sidecar injector, template not found")
                return nil, nil
            }
            return nil, err
        }
        watcher = inject.NewConfigMapWatcher(s.kubeClient, args.Namespace, configMapName, "config", "values")
    }

    log.Info("initializing sidecar injector")

    parameters := inject.WebhookParameters{
        Watcher: watcher,
        Env:     s.environment,
        // Disable monitoring. The injection metrics will be picked up by Pilots metrics exporter already
        MonitoringPort: -1,
        Mux:            s.httpsMux,
        Revision:       args.Revision,
    }

    wh, err := inject.NewWebhook(parameters)
    if err != nil {
        // Wrap with %w so the underlying cause stays reachable via errors.Is/As.
        return nil, fmt.Errorf("failed to create injection webhook: %w", err)
    }
    // Patch cert if a webhook config name is provided.
    // This requires RBAC permissions - a low-priv Istiod should not attempt to patch but rely on
    // operator or CI/CD
    if features.InjectionWebhookConfigName.Get() != "" {
        s.addStartFunc(func(stop <-chan struct{}) error {
            // No leader election - different istiod revisions will patch their own cert.
            caBundlePath := s.caBundlePath
            if hasCustomTLSCerts(args.ServerOptions.TLSOptions) {
                caBundlePath = args.ServerOptions.TLSOptions.CaCertFile
            }
            webhooks.PatchCertLoop(features.InjectionWebhookConfigName.Get(), webhookName, caBundlePath, s.kubeClient, stop)
            return nil
        })
    }
    // Run the webhook in its own goroutine; it is handed the stop channel.
    s.addStartFunc(func(stop <-chan struct{}) error {
        go wh.Run(stop)
        return nil
    })
    return wh, nil
}
// NewWebhook constructs the injection webhook from the given parameters.
// The excerpt below elides parts of the body ("..."); the visible portion
// registers wh.serveInject on the "/inject" and "/inject/" paths of p.Mux and
// adds a mesh handler that refreshes wh.meshConfig from p.Env.Mesh().
func NewWebhook(p WebhookParameters) (*Webhook, error) {
    ...
    // The concrete request handlers for the injection endpoint.
    p.Mux.HandleFunc("/inject", wh.serveInject)
    p.Mux.HandleFunc("/inject/", wh.serveInject)

    // Refresh the stored mesh config whenever the mesh handler fires; the
    // write is guarded by wh.mu.
    p.Env.Watcher.AddMeshHandler(func() {
        wh.mu.Lock()
        wh.meshConfig = p.Env.Mesh()
        wh.mu.Unlock()
    })

    ...
}
// serveInject is the HTTP handler for the injection endpoint. It decodes the
// request body into an AdmissionReview, runs wh.inject on it, and writes the
// resulting AdmissionReview back as JSON. Decode or conversion failures are
// reported via handleError and answered with toAdmissionResponse(err); the
// injector is never invoked with a review that failed conversion.
// The excerpt elides the request-reading preamble ("...").
func (wh *Webhook) serveInject(w http.ResponseWriter, r *http.Request) {
    totalInjections.Increment()
    ...

    var reviewResponse *kube.AdmissionResponse
    var obj runtime.Object
    var ar *kube.AdmissionReview
    // The webhook receives an AdmissionReview; the AdmissionRequest is read
    // from it and an AdmissionResponse is written back.
    if out, _, err := deserializer.Decode(body, nil, obj); err != nil {
        handleError(fmt.Sprintf("Could not decode body: %v", err))
        // Return the error information to the caller.
        reviewResponse = toAdmissionResponse(err)
    } else {
        log.Debugf("AdmissionRequest for path=%s\n", path)
        ar, err = kube.AdmissionReviewKubeToAdapter(out)
        if err != nil {
            handleError(fmt.Sprintf("Could not decode object: %v", err))
            // Do not hand an unconverted review to inject; answer with the error.
            reviewResponse = toAdmissionResponse(err)
        } else {
            // Perform the injection.
            reviewResponse = wh.inject(ar, path)
        }
    }

    response := kube.AdmissionReview{}
    response.Response = reviewResponse
    var responseKube runtime.Object
    var apiVersion string
    if ar != nil {
        apiVersion = ar.APIVersion
        response.TypeMeta = ar.TypeMeta
        // Echo the request UID back on the response when both are present.
        if response.Response != nil && ar.Request != nil {
            response.Response.UID = ar.Request.UID
        }
    }
    // Convert the response according to the request's API version.
    responseKube = kube.AdmissionReviewAdapterToKube(&response, apiVersion)
    resp, err := json.Marshal(responseKube)
    if err != nil {
        log.Errorf("Could not encode response: %v", err)
        http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
        // Nothing valid to send; stop instead of writing a nil body after the error.
        return
    }
    if _, err := w.Write(resp); err != nil {
        log.Errorf("Could not write response: %v", err)
        http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
    }
}

```go func (wh Webhook) inject(ar kube.AdmissionReview, path string) *kube.AdmissionResponse { ... // 生成patch patchBytes, err := injectPod(params) if err != nil { handleError(fmt.Sprintf("Pod injection failed: %v", err)) return toAdmissionResponse(err) }

// 返回resp AdmissionResponse
reviewResponse := kube.AdmissionResponse{
    Allowed: true,
    Patch:   patchBytes,
    PatchType: func() *string {
        pt := "JSONPatch"
        return &pt
    }(),
}
totalSuccessfulInjections.Increment()
return &reviewResponse

}

Last updated

Was this helpful?