KubeBilder 的背后 - controller-runtime之manager的实现

源代码 2024-10-9 02:15:59 273 0 来自 中国
先容

在controller-runtime中利用一个 Manager 的接口来管理 Controller,除了控制器着实还可以管理Admission Webhook,也包罗访问资源对象的client、cache、scheme等,如下图所示:
Manager 如何利用

起首我们来看看controller-runtime中的Manager 是如何利用的,检察controller-runtime代码堆栈中的示例,示例中关于Manager的利用步调如下:

  • 实例化 manager,参数 config
  • 向 manager 添加 scheme
  • 向 manager 添加 controller, 该 controller 包罗一个 reconciler 布局体,我们必要在 reconciler 布局体实现逻辑处理惩罚
  • 向 manager 添加 webhook,同样必要实现逻辑处理惩罚
  • 启动 manager.start()
代码如下所示:
func main() {    ctrl.SetLogger(zap.New())    // 根据 config 实例化 Manager    // config.GetConfigOrDie() 利用默认的设置~/.kube/config    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})    if err != nil {        setupLog.Error(err, "unable to start manager")        os.Exit(1)    }    // in a real controller, we'd create a new scheme for this    // 将 api 注册到 Scheme,Scheme 提供了 GVK 到 go type 的映射    // 如果多个 crd, 必要多次调用 AddToScheme    err = api.AddToScheme(mgr.GetScheme())    if err != nil {        setupLog.Error(err, "unable to add scheme")        os.Exit(1)    }    // 注册 Controller 到 Manager    // For: 监控的资源,相当于调用 Watches(&source.Kind{Type:apiType},&handler.EnqueueRequestFOrObject{})    // Owns:拥有的下属资源,如果 corev1.Pod{} 资源属于 api.ChaosPod{},也将会被监控,相当于调用 Watches(&source.Kind{Type: <ForType-apiType>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})    // reconciler 布局体:继续 Reconciler,必要实现该布局体和 Reconcile 方法    // mgr.GetClient()、mgr.GetScheme() 是 Client 和 Scheme,前面的 manager.New 初始化了    err = ctrl.NewControllerManagedBy(mgr).        For(&api.ChaosPod{}).        Owns(&corev1.Pod{}).        Complete(&reconciler{            Client: mgr.GetClient(),            scheme: mgr.GetScheme(),        })    if err != nil {        setupLog.Error(err, "unable to create controller")        os.Exit(1)    }    // 构建 webhook    err = ctrl.NewWebhookManagedBy(mgr).        For(&api.ChaosPod{}).        Complete()    if err != nil {        setupLog.Error(err, "unable to create webhook")        os.Exit(1)    }    // 启动 manager,实际上是启动 controller    setupLog.Info("starting manager")    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {        setupLog.Error(err, "problem running manager")        os.Exit(1)    }}Manager是一个用于初始化共享依靠关系的布局,接口界说如下所示:// pkg/manager/manager.go// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.// A Manager is required to create Controllers.// Manager 初始化共享的依靠关系,好比Caches 和 Client,并将它们提供给 Runnablestype Manager interface {    // Cluster holds a variety of methods to interact with a cluster.    // Cluster 拥有多种与集群交互的方法    cluster.Cluster    // Add will set requested dependencies on the component, and cause the component to be    // started when Start is called.  Add will inject any dependencies for which the argument    // implements the inject interface - e.g. inject.Client.    // Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either    // non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).    // Add 将在组建上设置所需的依靠关系,并在调用 Start 时启动组件    // Add 将注入接口的依靠关系,好比:注入inject.Client    // 根据 Runnable 是否实现了 LeaderElectionRunnable 接口判定    // Runnable 可以在非 LeaderElection 模式(始终运行)或 LeaderElection 模式(如果启用了 LeaderElection,则由 LeaderElection 管理)下运行    Add(Runnable) error    // Elected is closed when this manager is elected leader of a group of    // managers, either because it won a leader election or because no leader    // election was configured.    // leader 推选    // 当赢得推选大概为设置推选则关闭    Elected() <-chan struct{}    // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.    // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be    // sensitive and shouldn't be exposed publicly.    // If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as    // Runnable to the manager via Add method.    AddMetricsExtraHandler(path string, handler http.Handler) error    // AddHealthzCheck allows you to add Healthz checker    AddHealthzCheck(name string, check healthz.Checker) error    // AddReadyzCheck allows you to add Readyz checker    AddReadyzCheck(name string, check healthz.Checker) error    // Start starts all registered Controllers and blocks until the context is cancelled.    // Returns an error if there is an error starting any controller.    //    // If LeaderElection is used, the binary must be exited immediately after this returns,    // otherwise components that need leader election might continue to run after the leader    // lock was lost.    // Start 启动所有已注册的控制器,并不绝运行,直到制止通道关闭    // 如果启动任何控制器都堕落,则返回错误    // 如果利用了 LeaderElection,则必须在此返回后立刻退出二进制文件    // 否则必要 Leader 推选的组件可能会在 Leader 锁丢失后继续运行    Start(ctx context.Context) error    // GetWebhookServer returns a webhook.Server    GetWebhookServer() *webhook.Server    // GetLogger returns this manager's logger.    GetLogger() logr.Logger    // GetControllerOptions returns controller global configuration options.    // GetControllerOptions 控制器全局设置选项    GetControllerOptions() v1alpha1.ControllerConfigurationSpec}Manager 可以关闭 Runnable 的生命周期(添加/启动),如果不通过 Manager 启动(必要处理惩罚各种常见的依靠关系)。
Manager 还保持共同的依靠性:client、cache、scheme等。

  • 提供了 getter(比方GetClient)
  • 简单的注入机制(runtime/inject)
  • 别的还支持领导人推选,只需用选项指定即可,还提供了一个用于优雅关闭的信号处理惩罚步调。
Manager 实例化

检察 Manager 的实例化 New 函数的实现:
// pkg/manager/manager.go
// New returns a new Manager for creating Controllers.
// New 返回用于创建控制器的新 Manager
func New(config *rest.Config, options Options) (Manager, error) {
// Set default values for options fields
// 设置选项字段的默认值
options = setOptionsDefaults(options)
// 构造集群cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {    clusterOptions.Scheme = options.Scheme    clusterOptions.MapperProvider = options.MapperProvider    clusterOptions.Logger = options.Logger    clusterOptions.SyncPeriod = options.SyncPeriod    clusterOptions.Namespace = options.Namespace    clusterOptions.NewCache = options.NewCache    clusterOptions.NewClient = options.NewClient    clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor    clusterOptions.DryRunClient = options.DryRunClient    clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck})if err != nil {    return nil, err}// Create the recorder provider to inject event recorders for the components.// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific// to the particular controller that it's being injected into, rather than a generic one like is here.recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)if err != nil {    return nil, err}// Create the resource lock to enable leader electionvar leaderConfig *rest.Configvar leaderRecorderProvider *intrec.Providerif options.LeaderElectionConfig == nil {    leaderConfig = rest.CopyConfig(config)    leaderRecorderProvider = recorderProvider} else {    leaderConfig = rest.CopyConfig(options.LeaderElectionConfig)    leaderRecorderProvider, err = options.newRecorderProvider(leaderConfig, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)    if err != nil {        return nil, err    }}resourceLock, err := options.newResourceLock(leaderConfig, leaderRecorderProvider, leaderelection.Options{    LeaderElection:             options.LeaderElection,    LeaderElectionResourceLock: options.LeaderElectionResourceLock,    LeaderElectionID:           options.LeaderElectionID,    LeaderElectionNamespace:    options.LeaderElectionNamespace,})if err != nil {    return nil, err}// Create the metrics listener. This will throw an error if the metrics bind// address is invalid or already in use.metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)if err != nil {    return nil, err}// By default we have no extra endpoints to expose on metrics http server.metricsExtraHandlers := make(map[string]http.Handler)// Create health probes listener. This will throw an error if the bind// address is invalid or already in use.healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)if err != nil {    return nil, err}errChan := make(chan error)runnables := newRunnables(options.BaseContext, errChan)return &controllerManager{    stopProcedureEngaged:          pointer.Int64(0),    cluster:                       cluster,    runnables:                     runnables,    errChan:                       errChan,    recorderProvider:              recorderProvider,    resourceLock:                  resourceLock,    metricsListener:               metricsListener,    metricsExtraHandlers:          metricsExtraHandlers,    controllerOptions:             options.Controller,    logger:                        options.Logger,    elected:                       make(chan struct{}),    port:                          options.Port,    host:                          options.Host,    certDir:                       options.CertDir,    webhookServer:                 options.WebhookServer,    leaseDuration:                 *options.LeaseDuration,    renewDeadline:                 *options.RenewDeadline,    retryPeriod:                   *options.RetryPeriod,    healthProbeListener:           healthProbeListener,    readinessEndpointName:         options.ReadinessEndpointName,    livenessEndpointName:          options.LivenessEndpointName,    gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,    internalProceduresStop:        make(chan struct{}),    leaderElectionStopped:         make(chan struct{}),    leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,}, nil}
New 函数中就是为 Manager 实行初始化工作,末了返回的是一个 controllerManager 的实例,这是因为该布局体是 Manager 接口的一个实现,所以 Manager 的真正操纵都是这个布局体去实现的。
接下来最告急的是注册 Controller 到 Manager 的过程:
err = ctrl.NewControllerManagedBy(mgr).        For(&api.ChaosPod{}).        Owns(&corev1.Pod{}).        Complete(&reconciler{            Client: mgr.GetClient(),            scheme: mgr.GetScheme(),        })builder.ControllerManagedBy 函数返回一个新的控制器构造器 Builder 对象,天生的控制器将由所提供的管理器 Manager 启动,函数实现很简单:// pkg/builder/controller.go// Builder builds a Controller.// Builder 构造一个控制器type Builder struct {    forInput         ForInput    ownsInput        []OwnsInput    watchesInput     []WatchesInput    mgr              manager.Manager    globalPredicates []predicate.Predicate    ctrl             controller.Controller    ctrlOptions      controller.Options    name             string}// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.func ControllerManagedBy(m manager.Manager) *Builder {    return &Builder{mgr: m}}可以看到controller-runtime封装了一个Builder的布局体用来天生Controller,将Manager转达给这个构造器,然后就是调用构造器的For函数了:// pkg/builder/controller.go// ForInput represents the information set by For method.// ForInput 标识 For 方法设置的信息type ForInput struct {    object           client.Object    predicates       []predicate.Predicate    objectProjection objectProjection    err              error}// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /// update events by *reconciling the object*.// This is the equivalent of calling// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).// For 函数界说了被调谐的对象范例// 并设置 ControllerManagerBy 通过调谐对象来相应 create/delete/update 变乱// 调用 For 函数相当于调用:// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {    if blder.forInput.object != nil {        blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")        return blder    }    input := ForInput{object: object}    for _, opt := range opts {        opt.ApplyToFor(&input)    }    blder.forInput = input    return blder}For 函数就是用来界说我们要处理惩罚的对象范例的,接着调用了 Owns 函数:
// pkg/builder/controller.go// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to// create / delete / update events by *reconciling the owner object*.  This is the equivalent of calling// Watches(&source.Kind{Type: <ForType-forInput>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true}).// Owns 界说了 ControllerManagerBy 天生的对象范例// 并设置 ControllerManagerBy 通过调协所有者对象来相应 create/delete/update 变乱// 这相当于调用:// Watches(&source.Kind{Type: <ForType-forInput>}, &handler.EnqueueRequestForOwner{OwnerType: apiType, IsController: true})func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {    input := OwnsInput{object: object}    for _, opt := range opts {        opt.ApplyToOwns(&input)    }    blder.ownsInput = append(blder.ownsInput, input)    return blder}Owns 函数就是来设置我们监听的资源对象的子资源,如果想要和谐资源则必要调用 Owns 函数举行设置,然后就是最告急的 Complete 函数了:// pkg/builder/controller.go// Build builds the Application Controller and returns the Controller it created.// Build 构建应用步调 ControllerManagedBy 并返回它创建的 Controllerfunc (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {    if r == nil {        return nil, fmt.Errorf("must provide a non-nil Reconciler")    }    if blder.mgr == nil {        return nil, fmt.Errorf("must provide a non-nil Manager")    }    if blder.forInput.err != nil {        return nil, blder.forInput.err    }    // Checking the reconcile type exist or not    if blder.forInput.object == nil {        return nil, fmt.Errorf("must provide an object for reconciliation")    }    // Set the ControllerManagedBy    // 设置 ControllerManagedBy    if err := blder.doController(r); err != nil {        return nil, err    }    // Set the Watch    // 设置 Watch    if err := blder.doWatch(); err != nil {        return nil, err    }    return blder.ctrl, nil}Complete 函数通过调用 Build 函数来构建 Controller,此中比力告急的就是 doController 和 doWatch 两个函数,doController 就是去真正实例化 Controller 的函数:
// pkg/builder/controller.gofunc (blder *Builder) doController(r reconcile.Reconciler) error {    globalOpts := blder.mgr.GetControllerOptions()    ctrlOptions := blder.ctrlOptions    if ctrlOptions.Reconciler == nil {        ctrlOptions.Reconciler = r    }    // Retrieve the GVK from the object we're reconciling    // to prepopulate logger information, and to optionally generate a default name.    // 从我们正在调协的对象中检索GVK    gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())    if err != nil {        return err    }    // Setup concurrency.    // 设置并发    if ctrlOptions.MaxConcurrentReconciles == 0 {        groupKind := gvk.GroupKind().String()        if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {            ctrlOptions.MaxConcurrentReconciles = concurrency        }    }    // Setup cache sync timeout.    // 设置缓存同步超市时间    if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {        ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout    }    // 根据GVK获取控制器名    controllerName := blder.getControllerName(gvk)    // Setup the logger.    // 设置日志 Logger    if ctrlOptions.LogConstructor == nil {        log = blder.mgr.GetLogger().WithValues(            "controller", controllerName,            "controllerGroup", gvk.Group,            "controllerKind", gvk.Kind,        )        lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]        ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {            log := log            if req != nil {                log = log.WithValues(                    lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),                    "namespace", req.Namespace, "name", req.Name,                )            }            return log        }    }    // Build the controller and return.    // 构造 Controller    // var newController = controller.New    blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)    return err}上面的函数通过获取资源对象的 GVK 来获取 Controller 的名称,末了通过一个 newController 函数(controller.New 的别名)来实例化一个真正的 Controller:
// pkg/builder/controller.go// New returns a new Controller registered with the Manager.  The Manager will ensure that shared Caches have// been synced before the Controller is Started.// New 返回一个在 Manager 注册的 Controller// Manager 将确保共享缓存在控制器启动前已经同步func New(name string, mgr manager.Manager, options Options) (Controller, error) {    c, err := NewUnmanaged(name, mgr, options)    if err != nil {        return nil, err    }    // Add the controller as a Manager components    // 将 controller 作为 manager 的组件    return c, mgr.Add(c)}// NewUnmanaged returns a new controller without adding it to the manager. The// caller is responsible for starting the returned controller.// NewUnmanaged 返回一个新的控制器,而不将其添加到 manager 中// 调用者负责启动返回的控制器func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {    if options.Reconciler == nil {        return nil, fmt.Errorf("must specify Reconciler")    }    if len(name) == 0 {        return nil, fmt.Errorf("must specify Name for Controller")    }    if options.LogConstructor == nil {        log := mgr.GetLogger().WithValues(            "controller", name,        )        options.LogConstructor = func(req *reconcile.Request) logr.Logger {            log := log            if req != nil {                log = log.WithValues(                    "object", klog.KRef(req.Namespace, req.Name),                    "namespace", req.Namespace, "name", req.Name,                )            }            return log        }    }    if options.MaxConcurrentReconciles <= 0 {        options.MaxConcurrentReconciles = 1    }    if options.CacheSyncTimeout == 0 {        options.CacheSyncTimeout = 2 * time.Minute    }    if options.RateLimiter == nil {        options.RateLimiter = workqueue.DefaultControllerRateLimiter()    }    // Inject dependencies into Reconciler    // 在 Reconciler 中注入依靠关系    if err := mgr.SetFields(options.Reconciler); err != nil {        return nil, err    }    // Create controller with dependencies set    // 创建 Controller 并设置依靠关系    return &controller.Controller{        Do: options.Reconciler,        MakeQueue: func() workqueue.RateLimitingInterface {            return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)        },        MaxConcurrentReconciles: options.MaxConcurrentReconciles,        CacheSyncTimeout:        options.CacheSyncTimeout,        SetFields:               mgr.SetFields,        Name:                    name,        LogConstructor:          options.LogConstructor,        RecoverPanic:            options.RecoverPanic,    }, nil}可以看到NewUnmanaged函数才是真正实例化 Controller 的地方,终于和前文的 Controller 联系起来来,Controller 实例化完成后,又通过 mgr.Add(c) 函数将控制器添加到 Manager 中去举行管理,所以我们还必要去检察下 Manager 的 Add 函数的实现,固然是看 controllerManager 中的具体实现:
// pkg/manager/manager.go// Runnable allows a component to be started.// It's very important that Start blocks until// it's done running.// Runnable 允许一个组件被启动type Runnable interface {    // Start starts running the component.  The component will stop running    // when the context is closed. Start blocks until the context is closed or    // an error occurs.    Start(context.Context) error}//  pkg/manager/internal.go// Add sets dependencies on i, and adds it to the list of Runnables to start.// Add 设置i的依靠,并将其他添加到 Runnables 列表启动func (cm *controllerManager) Add(r Runnable) error {    cm.Lock()    defer cm.Unlock()    return cm.add(r)}func (cm *controllerManager) add(r Runnable) error {    // Set dependencies on the object    // 设置对象的依靠    if err := cm.SetFields(r); err != nil {        return err    }    return cm.runnables.Add(r)}// pkg/manager/runnable_group.go// Add adds a runnable to closest group of runnable that they belong to.//// Add should be able to be called before and after Start, but not after StopAndWait.// Add should return an error when called during StopAndWait.// The runnables added before Start are started when Start is called.// The runnables added after Start are started directly.// Add将runnable添加到它们所属的迩来的runnable组。// Add应该可以或许在Start之前和之后调用,但不能在StopAndWait之后调用。// 在StopAndWait期间调用Add时应返回错误。// 调用Start时,启动在Start之前添加的可运行项。// 启动后添加的可运行项直接启动。func (r *runnables) Add(fn Runnable) error {    switch runnable := fn.(type) {    case hasCache:        return r.Caches.Add(fn, func(ctx context.Context) bool {            return runnable.GetCache().WaitForCacheSync(ctx)        })    case *webhook.Server:        return r.Webhooks.Add(fn, nil)    case LeaderElectionRunnable:        if !runnable.NeedLeaderElection() {            return r.Others.Add(fn, nil)        }        return r.LeaderElection.Add(fn, nil)    default:        return r.LeaderElection.Add(fn, nil)    }}controllerManager 的 Add 函数转达的是一个 Runnable 参数,Runnable 是一个接口,用来表现可以启动的一个组件,而恰好 Controller 实际上就实现了这个接口的 Start 函数,所以可以通过 Add 函数来添加 Controller 实例,在 Add 函数中除了依靠注入之外,还根据 Runnable 来判定组件是否支持推选功能,支持则将组件加入到 leaderElectionRunnables 列表中,否则加入到 nonLeaderElectionRunnables 列表中,这点非常告急,涉及到后面控制器的启动方式。
启动过Manager

如果 Manager 已经启动了,如今调用 Add 函数来添加 Runnable,则必要立刻调用 startRunnable 函数启动控制器,startRunnable 函数就是在一个 goroutine 中去调用 Runnable 的 Start 函数,这里就相当于调用 Controller 的 Start 函数来启动控制器了。
到这里就实例化 Controller 完成了,回到前面 Builder 的 build 函数中,doController 函数调用完成,接着是 doWatch 函数的实现:
// pkg/builder/controller.gofunc (blder *Builder) doWatch() error {    // Reconcile type    // 调协范例    typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)    if err != nil {        return err    }    src := &source.Kind{Type: typeForSrc}    hdler := &handler.EnqueueRequestForObject{}    allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)    if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {        return err    }    // Watches the managed types    // Watches 管理的范例(子范例)    for _, own := range blder.ownsInput {        typeForSrc, err := blder.project(own.object, own.objectProjection)        if err != nil {            return err        }        src := &source.Kind{Type: typeForSrc}        hdler := &handler.EnqueueRequestForOwner{            OwnerType:    blder.forInput.object,            IsController: true,        }        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)        allPredicates = append(allPredicates, own.predicates...)        if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {            return err        }    }    // Do the watch requests    // 实行 watch 哀求    for _, w := range blder.watchesInput {        allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)        allPredicates = append(allPredicates, w.predicates...)        // If the source of this watch is of type *source.Kind, project it.        if srckind, ok := w.src.(*source.Kind); ok {            typeForSrc, err := blder.project(srckind.Type, w.objectProjection)            if err != nil {                return err            }            srckind.Type = typeForSrc        }        if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {            return err        }    }    return nil}上面的 doWatch 函数就是去将我们必要调谐的资源对象放到 Controller 中举行 Watch 操纵,包罗资源对象管理的子范例,都必要去实行 Watch 操纵,这就又回到了前面 Controller 的 Watch 操纵了,着实就是去注册 Informer 的变乱监听器,将数据添加到工作队列中去。如许到这里我们就将 Controller 初始化完成,并为我们调谐的资源对象实行了 Watch 操纵。
末了是调用 Manager 的 Start 函数来启动 Manager,由于上面我们已经把 Controller 添加到了 Manager 中,所以这里启动着实是启动关联的 Controller,启动函数实现如下所示:
// pkg/manager/internal.go// Start starts the manager and waits indefinitely.// There is only two ways to have start return:// An error has occurred during in one of the internal operations,// such as leader election, cache start, webhooks, and so on.// Or, the context is cancelled.// Start 启动管理器并无穷期等候// 只有两种环境让Start 返回:// 在此中一个内部操纵中发生错误// 比方领导人推选、cache start、webhooks等等。// 大概 context 取消func (cm *controllerManager) Start(ctx context.Context) (err error) {    // 判定是否启动,如果已经启动,则直接返回    cm.Lock()    if cm.started {        cm.Unlock()        return errors.New("manager already started")    }    var ready bool    defer func() {        // Only unlock the manager if we haven't reached        // the internal readiness condition.        if !ready {            cm.Unlock()        }    }()    // Initialize the internal context.    // 初始化内部的 context    cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)    // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request    // 此chan表现制止已完成,换句话说,所有可运行步调都已返回或在制止哀求时超时    stopComplete := make(chan struct{})    defer close(stopComplete)    // This must be deferred after closing stopComplete, otherwise we deadlock.    // stopComplete 关闭后必须在 defer 实行下面的操纵,否则会出现死锁    defer func() {        // https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg        stopErr := cm.engageStopProcedure(stopComplete)        if stopErr != nil {            if err != nil {                // Utilerrors.Aggregate allows to use errors.Is for all contained errors                // whereas fmt.Errorf allows wrapping at most one error which means the                // other one can not be found anymore.                err = kerrors.NewAggregate([]error{err, stopErr})            } else {                err = stopErr            }        }    }()    // Add the cluster runnable.    // 添加集群 runnable    if err := cm.add(cm.cluster); err != nil {        return fmt.Errorf("failed to add cluster to runnables: %w", err)    }    // Metrics should be served whether the controller is leader or not.    // (If we don't serve metrics for non-leaders, prometheus will still scrape    // the pod but will get a connection refused).    // Metrics 服务    if cm.metricsListener != nil {        cm.serveMetrics()    }    // Serve health probes.    // 康健查抄    if cm.healthProbeListener != nil {        cm.serveHealthProbes()    }    // First start any webhook servers, which includes conversion, validation, and defaulting    // webhooks that are registered.    //    // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition    // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks    // to never start because no cache can be populated.    if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {        if !errors.Is(err, wait.ErrWaitTimeout) {            return err        }    }    // Start and wait for caches.    // 启动并等候缓存同步    if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {        if !errors.Is(err, wait.ErrWaitTimeout) {            return err        }    }    // Start the non-leaderelection Runnables after the cache has synced.    if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {        if !errors.Is(err, wait.ErrWaitTimeout) {            return err        }    }    // Start the leader election and all required runnables.    {        ctx, cancel := context.WithCancel(context.Background())        cm.leaderElectionCancel = cancel        go func() {            if cm.resourceLock != nil {                if err := cm.startLeaderElection(ctx); err != nil {                    cm.errChan <- err                }            } else {                // Treat not having leader election enabled the same as being elected.                if err := cm.startLeaderElectionRunnables(); err != nil {                    cm.errChan <- err                }                close(cm.elected)            }        }()    }    ready = true    cm.Unlock()    select {    case <-ctx.Done():        // We are done        return nil    case err := <-cm.errChan:        // Error starting or running a runnable        return err    }}上面的启动函数着实就是去启动前面我们加入到 Manager 中的 Runnable(Controller),非 LeaderElection 的列表与 LeaderElection 的列表都分别在一个 goroutine 中启动:
// pkg/manager/runnable_group.go// Start starts the group and waits for all// initially registered runnables to start.// It can only be called once, subsequent calls have no effect.// Start启动组并等候所有最初注册的可运行步调启动。// 只能调用一次,后续调用无效。func (r *runnableGroup) Start(ctx context.Context) error {    var retErr error    r.startOnce.Do(func() {        defer close(r.startReadyCh)        // Start the internal reconciler.        go r.reconcile()        // Start the group and queue up all        // the runnables that were added prior.        r.start.Lock()        r.started = true        for _, rn := range r.startQueue {            rn.signalReady = true            r.ch <- rn        }        r.start.Unlock()        // If we don't have any queue, return.        if len(r.startQueue) == 0 {            return        }        // Wait for all runnables to signal.        for {            select {            case <-ctx.Done():                if err := ctx.Err(); !errors.Is(err, context.Canceled) {                    retErr = err                }            case rn := <-r.startReadyCh:                for i, existing := range r.startQueue {                    if existing == rn {                        // Remove the item from the start queue.                        r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)                        break                    }                }                // We're done waiting if the queue is empty, return.                if len(r.startQueue) == 0 {                    return                }            }        }    })    return retErr}可以看到终极还是去调用的 Runnable 的 Start 函数来启动,这里着实也就是 Controller 的 Start 函数,这个函数相当于启动一个控制循环不绝从工作队列中消费数据,然后给到一个 Reconciler 接口举行处理惩罚,也就是我们要去实现的 Reconcile(Request) (Result, error) 这个业务逻辑函数。
到这里我们就完成了 Manager 的整个启动过程,包罗 Manager 是如何初始化,如何和 Controller 举行关联以及如何启动 Controller 的。
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2025-1-19 10:30, Processed in 0.210059 second(s), 32 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表