As a component of the Kubernetes master, the controller manager is responsible for starting and stopping many controllers. These controllers watch various resources in k8s and reconcile them so that their actual state continuously approaches the desired state. They include the service controller, node controller, deployment controller, and so on.
A CRD also needs its own controller, and that controller likewise needs to be started and stopped by a controller manager. The whole process is long, so I will split it into several articles. The first article in this series introduces how to define a controller manager that starts the controllers under its jurisdiction, and how to implement the simplest controller. The second implements a more standard controller and introduces the structure of the informer. The last one introduces how to implement a CRD controller without scaffolding.
The structure of the whole project is as follows
```
controller-demo
|---api                    //structs defining the various attributes of the CRD
    |---v1
|---client
    |---versiond
        |----scheme        //stores the scheme of the CRD
        |----typed         //stores the client corresponding to the CRD
|---controller             //stores all controllers
|---informers              //stores informers, one per apiGroup and version, plus a factory
    |---ecsbind/v1         //the informer for the only apiGroup and version here
    |---internalinterfaces //interfaces of the informers
|---listers                //stores listers, one per apiGroup and version
    |---ecsbind/v1         //the lister for the only apiGroup and version here
```
controller-manager
The controller manager has two functions. One is called by the main function to start the controller manager and serves as its entry point; the other starts all the controllers it manages.
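For completeness, here is a minimal sketch of how a main function might wire up the stop channel and call Run. Only the Run signature comes from this article; the package layout and signal handling are assumptions.

```go
package main

import (
	"os"
	"os/signal"
	"syscall"

	"github.com/golang/glog"
)

// Hypothetical entry point: builds a stop channel that is closed on
// SIGINT/SIGTERM and hands it to the Run function described below.
func main() {
	stopCh := make(chan struct{})

	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-c           // wait for a termination signal
		close(stopCh) // broadcast the stop signal to every controller
	}()

	if err := Run(stopCh); err != nil {
		glog.Fatalf("error running controller manager: %v", err)
	}
}
```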
The Run function called by the main function is defined as follows
```go
func Run(stopCh <-chan struct{}) error {
	run := func(stopCh <-chan struct{}) {
		err := StartController(stopCh)
		if err != nil {
			glog.Fatalf("error running service controllers: %v", err)
		}
		select {}
	}
	// Ignore the logic related to leader election
	......
	run(stopCh)
	panic("unreachable")
}
```
The function above receives a channel that is used to pass a termination signal to each controller. Inside it, a run closure is defined to call StartController. The reason a run closure is needed is this: although such components usually run multiple replicas for high availability, only one replica actually works, while the others stand by. The working replica is called the leader, and the process by which an ordinary replica competes for that role is called leader election; only when the leader dies is one of the remaining replicas elected as the new leader. The component can also run without leader election. So the function has to branch on whether leader election is enabled: if it is, the leader-election logic runs and the run closure is executed only once this replica becomes the leader; if not, run is called directly. That logic is omitted here.
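For reference, a minimal sketch of what the omitted leader-election branch could look like, using client-go's leaderelection and resourcelock packages (k8s.io/client-go/tools/leaderelection). It assumes a recent client-go, a kubeClient built the same way as in StartController, and an id string identifying this replica; the lock name, namespace, and timing values are illustrative, not part of the original code.

```go
// runWithLeaderElection is a hypothetical helper: the replica that wins the
// Lease lock calls run; a replica that loses its lease exits.
func runWithLeaderElection(kubeClient kubernetes.Interface, id string, run func(<-chan struct{}), stopCh <-chan struct{}) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      "controller-demo", // assumed lock name
			Namespace: "kube-system",     // assumed namespace
		},
		Client:     kubeClient.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				run(stopCh) // only the elected leader starts the controllers
			},
			OnStoppedLeading: func() {
				glog.Fatalf("leader election lost")
			},
		},
	})
}
```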
The other function of the controller manager is to actually start each controller. StartController also receives the channel passed down from Run, which is ultimately handed to each controller to deliver the stop signal. Each controller is constructed inside this function and started by spawning a goroutine that calls the controller's Run method. The code is as follows
```go
func StartController(stopCh <-chan struct{}) error {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		glog.Fatalf("error building kubernetes config:%s", err.Error())
	}
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		glog.Fatalf("error building kubernetes client:%s", err.Error())
	}
	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	podInformer := factory.Core().V1().Pods()
	pc := controller.NewPodController(kubeClient, podInformer, "k8s-cluster")
	go pc.Run(stopCh)
	factory.Start(stopCh)
	return nil
}
```
controller.NewPodController constructs a PodController; it needs a kubeClient and an informer, which must be built beforehand. For k8s built-in resources, the informer can be obtained from a SharedInformerFactory. After starting the controller with go pc.Run(stopCh), factory.Start(stopCh) still has to be called. factory.Start should only be executed after each controller is running; otherwise the corresponding controller will not take effect.
A thin Controller
The function of this controller is to count all the pods in the cluster, write the total into a label on the master node, and record an event on each counted pod indicating that it has been counted.
The fields of the PodController struct are listed below
```go
type PodController struct {
	kubeClient      kubernetes.Interface    // used to label the master node
	clusterName     string
	podLister       corelisters.PodLister   // used to get the watched pod resources
	podListerSynced cache.InformerSynced    // used to sync the cache
	broadcaster     record.EventBroadcaster // used to broadcast events
	recorder        record.EventRecorder    // used to record pod events
}
```
The constructor of Controller is as follows
```go
func NewPodController(kubeClient kubernetes.Interface, podInformer coreinformers.PodInformer, clusterName string) *PodController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "pod_controller"})

	rc := &PodController{
		kubeClient:      kubeClient,
		clusterName:     clusterName,
		podLister:       podInformer.Lister(),
		podListerSynced: podInformer.Informer().HasSynced,
		broadcaster:     eventBroadcaster,
		recorder:        recorder,
	}
	return rc
}
```
Among the controller's fields, only the broadcaster and recorder are constructed here; the rest are passed in as parameters. The controller needs to record events, and the recorder provides that capability. However, when an event is triggered it also has to be delivered to subscribers, which is what the broadcaster is for. This touches on the k8s event mechanism, which is not described in detail here. In the constructor the broadcaster is bound to glog.Infof, so when the recorder triggers an event, the event information is written to the log. The broadcaster is used again later when the controller is run.
The startup method of the whole Controller is as follows
```go
func (p *PodController) Run(stopCh <-chan struct{}) {
	glog.Info("Starting pod controller\n")
	defer glog.Info("Shutting down pod controller\n")

	if !controller.WaitForCacheSync("pod", stopCh, p.podListerSynced) {
		return
	}

	if p.broadcaster != nil {
		p.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(p.kubeClient.CoreV1().RESTClient()).Events("")})
	}

	go wait.NonSlidingUntil(func() {
		if err := p.reconcilePods(); err != nil {
			glog.Errorf("Couldn't reconcile pod: %v", err)
		}
	}, metav1.Duration{Duration: 10 * time.Second}.Duration, stopCh)

	<-stopCh
}
```
WaitForCacheSync synchronizes the cache of the specified resource, which will be used later. If synchronization fails, the controller does not run.
StartRecordingToSink broadcasts the events to the apiserver. If it is not called here, then even though the recorder triggers events, the apiserver never receives them, the event information is never saved to the corresponding pod, and we will not see the event records when running kubectl describe po.
A goroutine is started to execute the reconcilePods() method periodically: the NonSlidingUntil function runs the func passed to it immediately, and then repeats it every 10 seconds until it receives the termination signal from the stopCh channel.
Finally, the current goroutine blocks waiting for a signal from stopCh, which keeps this function from returning.
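The WaitForCacheSync used above comes from a controller utility package; a roughly equivalent gate can be written with client-go's cache.WaitForCacheSync, as in this sketch (the log message wording is my own):

```go
// Sketch: the same cache-sync gate expressed with k8s.io/client-go/tools/cache.
// cache.WaitForCacheSync blocks until every InformerSynced returns true,
// or returns false if stopCh is closed first.
if !cache.WaitForCacheSync(stopCh, p.podListerSynced) {
	glog.Errorf("unable to sync pod cache")
	return
}
```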
The logic of the reconcilePods method is as described earlier: it gets all the pods from the lister, traverses them and prints each pod's namespace and name, finds the master node in the cluster through a label selector, writes the pod count into a label named hopegi/pod-count on the master, and finally adds a record of the form "pod count is n" (where n is the total number of pods) to the events of each pod.
```go
func (p *PodController) reconcilePods() error {
	glog.Infof("reconcilePods ")
	pods, err := p.podLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("error listing pods: %v", err)
	}
	return p.reconcile(pods)
}

func (p *PodController) reconcile(pods []*v1.Pod) error {
	glog.Infof("reconcile pods")
	for _, pod := range pods {
		fmt.Printf("pod name is %s.%s \n", (*pod).Namespace, (*pod).Name)
	}

	// find the master node(s) by label selector and write the pod count into a label
	nodes, err := p.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/master"})
	if err != nil {
		glog.Infof("get master error %v\n", err)
		return err
	}
	for _, n := range nodes.Items {
		n.Labels["hopegi/pod-count"] = fmt.Sprintf("%d", len(pods))
		_, err = p.kubeClient.CoreV1().Nodes().Update(&n)
		if err != nil {
			glog.Infof("label node error:%v ", err)
		}
	}

	// record an event on every counted pod
	if p.recorder != nil {
		msg := fmt.Sprintf("pod count is %d", len(pods))
		for _, pod := range pods {
			p.recorder.Eventf(&v1.ObjectReference{
				Kind:      "Pod",
				Name:      pod.Name,
				UID:       pod.UID,
				Namespace: pod.Namespace,
			}, v1.EventTypeNormal, "SuccessCalculatePod", msg)
		}
	}
	return nil
}
```
A lister, rather than the kubeClient, is used here to obtain all the pods in the cluster. The difference is that the lister comes from the informer of a k8s resource (pods in this example), which keeps a cache of that resource; the cache is kept consistent with the pod data stored in the cluster through the listAndWatch mechanism, which will be introduced in the next article. The kubeClient, by contrast, sends a request to the apiserver on every access; if many controllers communicate with the apiserver that frequently, the apiserver gets overwhelmed, a lot of network resources are consumed, and retrieval is also slower.
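To make the contrast concrete, here is a small sketch of the two ways of listing pods inside this controller. It assumes the same client-go version as the article's code (List without a context argument); the log messages are my own.

```go
// Read from the informer's local cache: no request to the apiserver.
cachedPods, err := p.podLister.List(labels.Everything())
if err != nil {
	glog.Errorf("listing pods from cache: %v", err)
}

// Read through the client: every call is an HTTP request to the apiserver.
livePods, err := p.kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
	glog.Errorf("listing pods from apiserver: %v", err)
}

glog.Infof("cache: %d pods, apiserver: %d pods", len(cachedPods), len(livePods.Items))
```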