controller-runtime是通过informer监控apiserver来获取资源的变动。
接下来我会使用kubebuilder构建一个demo,根据这个demo进行深入解析controller-runtime
Demo创建kubebuilder官网文档 (具体操作还请参考官网文档,我只写出几个我认为需要说明一下的地方)
安装kubebuilder 一定要给kubebuilder 运行权限
创建并初始化项目
kubebuilder init --domain my.domain --repo my.domain/guestbook# 这里官网有注意事项说明,请移步官网# 这里一般不需要配置东西直接下一步即可
创建API
kubebuilder create api --group webapp --version v1 --kind Guestbook# kind 后面 第一个字母要大写否则报错# 调用这一步后,除了crd YAMl文件没有生成以外其余基本上都已生成,这时候如果想自定义crd,那么仅修改api/v1/ 下面的 *_type.go文件以及相关联的go文件即可
生成CRD
make manifests
将CRD安装到集群
make install
运行生成的控制器
make run# 如果想实现控制器的功能,需要修改controllers/*_controller.go 文件里面的 Reconcile 方法即可# 如果想在方法里面操控别的资源需要添加权限,在该文件里面添加注释即可# 比如//+kubebuilder:rbac:groups=webapp.my.domain,resources=guestbooks,verbs=get;list;watch;create;update;patch;delete 就是对guestbooks 资源查询创建等权限
启动步骤那么接下来让我们讲解一下控制器run后运行吧
controller-runtime启动步骤大体如下
创建Manager
创建controller
如果想使用webhook,则创建webhook。
添加心跳检查 (这里可以可以实现自己的函数)
添加就绪检查
开始启动manager
注意:这里有一个类非常重要 Scheme,该类提供了GVK到TYPE,TYPE到GVK的功能,可以调用runtime.NewScheme()进行获取(可以理解成,序列化GVK与反序列化)
Manager 的创建调用NewManager
ctrl.NewManager(config *rest.Config, options Options)// 第一参数记录了client所需要的配置信息比如Host,Username等。读取--kubeconfig 上的配置,如果没有会尝试集群里的kubconfig(Home/.kube/config),都没有会读取默认配置// 第二个参数是创建Manager的配置信息// 第二参数 在看生成的代码的时候我们会发现有这么一个配置LeaderElectionID: "ecaf1259.my.domain" 这个配置就是竞选时的资源锁名称,leader竞争时默认使用configmap资源作为资源锁,那么我们调用kubectl get configmap -n guestbook-system 时就可以看到这个ecaf1259.my.domain 资源
2.进入NewManager
func New(config *rest.Config, options Options) (Manager, error) {// 对options进行填充工作,没有配置的会添加默认值options = setOptionsDefaults(options) // 创建cluser 并且创建 cache 绑定informer // 由于内容过长该函数参数不展示cluster, err := cluster.New()// 获取leader配置文件 如果没有就使用默认配置 可以通过启动时配置leaderConfig := options.LeaderElectionConfigif leaderConfig == nil {leaderConfig = rest.CopyConfig(config)} // 竞选锁 resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{ //是否启用leader选举LeaderElection: options.LeaderElection, // 资源锁 类型 默认为 configmapsleasesLeaderElectionResourceLock: options.LeaderElectionResourceLock, //决定使用那个资源锁进行选举LeaderElectionID: options.LeaderElectionID, // 资源命名空间LeaderElectionNamespace: options.LeaderElectionNamespace,})if err != nil {return nil, err}// 创建MetricsListenermetricsListener, err := options.newMetricsListener(options.MetricsBindAddress)if err != nil {return nil, err}// 当访问Metrics时调用的handlersmetricsExtraHandlers := make(map[string]http.Handler)// 同理上healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)if err != nil {return nil, err}errChan := make(chan error)runnables := newRunnables(errChan)return &controllerManager}
Controller创建首先创建GuestbookReconciler ,里面两个参数在上面都已经创建完成,client是创建manager时创建,而Scheme,是定义的全局变量
调用SetupWithManager ,使用了Builder模式,所以让我们深入研究一下吧
// SetupWithManager sets up the controller with the Manager.func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {return ctrl.NewControllerManagedBy(mgr).For(&webappv1.Guestbook{}).Complete(r)}
调用NewControllerManagedBy 创建一个Builder
调用FOR 初始化 Builder
开始Complete进行Controller创建
该方法里我们只需要关注两个方法即可doController,doWatch。从字面意思行我们就能知道一个是控制器,一个是监视器。
func (blder *Builder) Complete(r reconcile.Reconciler) error { _, err := blder.Build(r) return err}func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { if err := blder.doController(r); err != nil { return nil, err } if err := blder.doWatch(); err != nil { return nil, err } return blder.ctrl, nil}
创建Controller,首先获取配置(配置信息在下面,这里不再讲解)然后将配置进行初始化操作如果配置没有进行设置的话。
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) { *** return &controller.Controller{ //此处 do的函数就是我们配置的ReconcilerDo: options.Reconciler, // 监听CRD资源的CUD事件缓冲队列MakeQueue: func() workqueue.RateLimitingInterface {return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)},MaxConcurrentReconciles: options.MaxConcurrentReconciles,CacheSyncTimeout: options.CacheSyncTimeout,SetFields: mgr.SetFields,Name: name,Log: options.Log.WithName("controller").WithName(name),RecoverPanic: options.RecoverPanic,}, nil}
doWatch添加CRD资源监视以及与Controller相关的监视。
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}// Watch 里面就是使用informer为资源添加监控事件// allPredicates过滤器
Manager启动在控制器创建完成与manager启动之前中间还有一些调用,主要是添加就绪handler与健康检查handler 有默认值
启动serveMetrics
启动serveHealthProbes
如果配置了Webhooks就启动
Controller启动,轮询事件队列,读取调用Reconcile
for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { defer wg.Done() // Run a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the reconcileHandler is never invoked concurrently with the same object. for c.processNextWorkItem(ctx) { } }()}func (c *Controller) processNextWorkItem(ctx context.Context) bool {obj, shutdown := c.Queue.Get()if shutdown {// Stop workingreturn false}// We call Done here so the workqueue knows we have finished// processing this item、We also must remember to call Forget if we// do not want this work item being re-queued、For example, we do// not call Forget if a transient error occurs, instead the item is// put back on the workqueue and attempted again after a back-off// period.defer c.Queue.Done(obj)ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)c.reconcileHandler(ctx, obj)return true}
watch启动
//func (m *InformersMap) Start(ctx context.Context) error {go m.structured.Start(ctx)go m.unstructured.Start(ctx)go m.metadata.Start(ctx)<-ctx.Done()return nil}func (ip *specificInformersMap) Start(ctx context.Context) {func() {ip.mu.Lock()defer ip.mu.Unlock()// Set the stop channel so it can be passed to informers that are added laterip.stop = ctx.Done()// 开始循环启动informerfor _, informer := range ip.informersByGVK {go informer.Informer.Run(ctx.Done())}ip.started = trueclose(ip.startWait)}()<-ctx.Done()}// 运行informerfunc (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()if s.HasStarted() {klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")return} // 创建fifofifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects: s.indexer,EmitDeltaTypeReplaced: true,}) // 创建配置cfg := &Config{Queue: fifo, // list,watchListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync, // 接受fifo队列进行处理的方法Process: s.HandleDeltas,WatchErrorHandler: s.watchErrorHandler,}func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stopwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)wg.StartWithChannel(processorStopCh, s.processor.run)defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}() // 开始运行CacheControllers.controller.Run(stopCh)}func (c *controller) Run(stopCh <-chan struct{}) {// 创建NewReflectorr := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,) // 开始list watch 并将事件添加到fifo中 注意这个是上面创建的那个fifowg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh)wg.Wait()}//循环取出fifo中的值并调用PopProcessFuncfunc (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) }}// PopProcessFunc 实际上调用的是shared_informer.HandleDeltasfunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Replaced, Added, Updated:s.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { // 对indexer 的操作if err := s.indexer.Update(d.Object); err != nil {return err}isSync := falseswitch {case d.Type == Sync:// Sync events are only propagated to listeners that requested resyncisSync = truecase d.Type == Replaced:if accessor, err := meta.Accessor(d.Object); err == nil {if oldAccessor, err := meta.Accessor(old); err == nil {// Replaced events that didn't change resourceVersion are treated as resync events// and only propagated to listeners that requested resyncisSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()}}} // 通知监视者 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, false)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err}s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil}
leader选举操作
对于Even重试
ctx = logf.IntoContext(ctx, log)// 这里就是开始调用自定义Reconcile方法result, err := c.Reconcile(ctx, req)switch {case err != nil: // 这里开始进行重试,将请求重新添加到队列中,但是由于限速等机制,会放入一个等待队列,当等待时间结束会放进正式队列c.Queue.AddRateLimited(req)}// 重试时间间隔计算方法是 0.05s * 2 ^ exp 其中exp是重试次数backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2,float64(exp))// 这里就是将重试时间与当前时间相加后与当前请求封装传入等待队列q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}
配置介绍 Manager启动时的配置文件ctrl.Optionstype Options struct {//将gvk转runtime.Object 使用 runtime.NewScheme() 可以获取Scheme *runtime.Scheme// MapperProvider provides the rest mapper used to map go types to Kubernetes APIsMapperProvider func(c *rest.Config) (meta.RESTMapper, error)//资源轮询的最小时间间隔SyncPeriod *time.Duration//日志 有默认Logger logr.Logger//是否启用leader选举LeaderElection bool//决定使用那个资源锁进行选举LeaderElectionID string//自定义选举配置LeaderElectionConfig *rest.Config//是否自愿变为从节点LeaderElectionReleaseOnCancel bool// 租约时间间隔LeaseDuration *time.Duration// leader持有锁时间RenewDeadline *time.Duration// 其它副本重试(竞争leader)时间间隔RetryPeriod *time.Duration// 命名空间限制管理器缓存观察的对象Namespace string// 配置Metrics 地址MetricsBindAddress string// 配置HealthProbe 地址HealthProbeBindAddress string// 就绪探测端点名称,默认为“readyz”ReadinessEndpointName string// Liveness 探测端点名称,默认为“healthz”LivenessEndpointName string// webhook 服务器服务的端口Port int// webhook 服务器绑定到的主机名Host string// CertDir 是包含服务器密钥和证书的目录CertDir string// WebhookServer 是一个外部配置的 webhook.Server // 该配置优先于上面几个配置WebhookServer *webhook.Server// 创建缓存的函数 有默认值NewCache cache.NewCacheFunc// 创建client 的函数NewClient cluster.NewClientFunc// 对于一些资源关闭缓存ClientDisableCacheFor []client.Object// 指定客户端是否应配置为强制执行DryRunClient bool// 关闭前的缓冲时间 有默认值GracefulShutdownTimeout *time.Duration// Controller 全局配置Controller v1alpha1.ControllerConfigurationSpec { //等待同步缓存的时间限制。 默认2分钟 可选 CacheSyncTimeout *time.Duration }}
创建Controller时配置// 可以运行的最大并发协调数,默认为1MaxConcurrentReconciles int// 资源修改触发函数,也是在这个里面进行逻辑编写Reconciler reconcile.Reconciler// 限制请求的频率RateLimiter ratelimiter.RateLimiter// 日志Log logr.Logger// 同步缓存等待时间 默认2分钟CacheSyncTimeout time.Duration// 是否恢复由异常引起的panic(应该是发生异常后是否继续)RecoverPanic bool