Source Code Analysis of the Kubernetes Garbage Collector

kubernetes version: 1.13.2

Background

A Redis cluster created by our operator was abnormally deleted (both the redis-exporter StatefulSet and the redis StatefulSet) after the kubernetes apiserver was restarted. After the deletion the operator rebuilt the cluster, but the instance IPs changed (for middleware containerization we developed fixed IPs; when a StatefulSet is deleted its IPs are reclaimed), so recreating the cluster failed and the cluster ended up unavailable.

After reproducing the problem repeatedly, the redis operator log showed no active deletion of the Redis cluster (redis StatefulSet) or the monitoring instance (redis-exporter) after the apiserver restarts. We then checked the kube-controller-manager log, raised its log level to --v=5, and kept reproducing. Finally, the following entries turned up in the kube-controller-manager log:

They show that the garbage collector triggered the deletion. The problem does not occur while the apiserver is running normally. To get to the bottom of it, we need to look at the logic of the garbage collector, a controller built into kube-controller-manager.

Since the content is long, it is divided into several parts:
1. The monitors, acting as producers, put changed resources into the graphChanges queue; meanwhile, the restMapper periodically detects the resource types present in the cluster and refreshes the monitors.
2. runProcessGraphChanges takes changed items from the graphChanges queue and, depending on the situation, puts them into the attemptToDelete queue; runAttemptToDeleteWorker takes out the garbage resources and processes them.
3. runProcessGraphChanges takes changed items from the graphChanges queue and, depending on the situation, puts them into the attemptToOrphan queue; runAttemptToOrphanWorker takes out the to-be-orphaned resources and processes them.
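
All three parts revolve around the same producer/consumer pattern built on client-go's rate-limited workqueues. Here is a minimal sketch of that pattern, not the actual GC code; the queue name and item are made up for illustration:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // GC creates its graphChanges, attemptToDelete and attemptToOrphan queues
    // with this same constructor (see NewGarbageCollector below)
    q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo_queue")

    // producer side: the monitors add changed objects
    q.Add("uid-of-changed-object")

    // consumer side: a worker takes items off the queue
    item, shutdown := q.Get()
    if shutdown {
        return
    }
    defer q.Done(item)
    fmt.Println("processing", item)
}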

Main text

To enable GC, set --enable-garbage-collector=true in the startup parameters of both kube-apiserver and kube-controller-manager; in version 1.13.2 GC is enabled by default.

Note: the value of this parameter must be kept in sync between the two components.
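
For illustration, the relevant flags look like this (values shown are the 1.13.2 defaults):

kube-apiserver ... --enable-garbage-collector=true
kube-controller-manager ... --enable-garbage-collector=true --concurrent-gc-syncs=20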

The kube-controller-manager startup entry loads the default controller-manager startup parameters in app.NewControllerManagerCommand() and creates a *cobra.Command object:

func main() {
        rand.Seed(time.Now().UnixNano())
        //Load the default controller-manager startup parameters and create the *cobra.Command object
        command := app.NewControllerManagerCommand()
        //...omitted...
        //Execute the cobra.Command and start the controller manager
        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
}

kube-controller-manager is started from this command. Inside NewKubeControllerManagerOptions(), NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort) loads the configuration of each controller:

//NewKubeControllerManagerOptions creates a new KubeControllerManagerOptions with the default configuration
func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
    //Load default configuration for each controller
    componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
    if err != nil {
        return nil, err
    }

    s := KubeControllerManagerOptions{
        Generic:         cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
        //...omitted...
        GarbageCollectorController: &GarbageCollectorControllerOptions{
            ConcurrentGCSyncs:      componentConfig.GarbageCollectorController.ConcurrentGCSyncs,
            EnableGarbageCollector: componentConfig.GarbageCollectorController.EnableGarbageCollector,
        },
        //...omitted...
    }
    //List of resource objects ignored by gc
    gcIgnoredResources := make([]kubectrlmgrconfig.GroupResource, 0, len(garbagecollector.DefaultIgnoredResources()))
    for r := range garbagecollector.DefaultIgnoredResources() {
        gcIgnoredResources = append(gcIgnoredResources, kubectrlmgrconfig.GroupResource{Group: r.Group, Resource: r.Resource})
    }
    s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources
    return &s, nil
}
// NewDefaultComponentConfig returns the Kube controller manager configuration object
func NewDefaultComponentConfig(insecurePort int32) (kubectrlmgrconfig.KubeControllerManagerConfiguration, error) {
    scheme := runtime.NewScheme()
    if err := kubectrlmgrschemev1alpha1.AddToScheme(scheme); err != nil {
        return kubectrlmgrconfig.KubeControllerManagerConfiguration{}, err
    }
    if err := kubectrlmgrconfig.AddToScheme(scheme); err != nil {
        return kubectrlmgrconfig.KubeControllerManagerConfiguration{}, err
    }

    versioned := kubectrlmgrconfigv1alpha1.KubeControllerManagerConfiguration{}
    //Load default parameters
    scheme.Default(&versioned)

    internal := kubectrlmgrconfig.KubeControllerManagerConfiguration{}
    if err := scheme.Convert(&versioned, &internal, nil); err != nil {
        return internal, err
    }
    internal.Generic.Port = insecurePort
    return internal, nil
}
// Default applies the registered defaulting function for the object's pointer type
func (s *Scheme) Default(src Object) {
    if fn, ok := s.defaulterFuncs[reflect.TypeOf(src)]; ok {
        fn(src)
    }
}

The type of s.defaulterFuncs is map[reflect.Type]func(interface{}); it looks up the defaulting function by the object's pointer type. Where does the data in this map come from?

The code is located in src\k8s.io\kubernetes\pkg\controller\apis\config\v1alpha1\zz_generated.defaults.go
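
That generated file registers the defaulting functions into the scheme (kubectrlmgrschemev1alpha1.AddToScheme(scheme) in NewDefaultComponentConfig ends up calling it). A trimmed sketch of what the generated code looks like (abbreviated; see the file itself for the full version):

//RegisterDefaults adds defaulting functions for the configuration types to the given scheme (generated code, trimmed)
func RegisterDefaults(scheme *runtime.Scheme) error {
    scheme.AddTypeDefaultingFunc(&KubeControllerManagerConfiguration{}, func(obj interface{}) {
        SetObjectDefaults_KubeControllerManagerConfiguration(obj.(*KubeControllerManagerConfiguration))
    })
    return nil
}

scheme.AddTypeDefaultingFunc stores the function in s.defaulterFuncs keyed by reflect.TypeOf(srcType), which is exactly the map that Default consults above; SetObjectDefaults_KubeControllerManagerConfiguration in turn calls the per-controller SetDefaults_* functions, including the one below.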

You can see from the defaults that GC is enabled (EnableGarbageCollector defaults to true) and the concurrency is 20 (ConcurrentGCSyncs):

func SetDefaults_GarbageCollectorControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.GarbageCollectorControllerConfiguration) {
    if obj.EnableGarbageCollector == nil {
        obj.EnableGarbageCollector = utilpointer.BoolPtr(true)
    }
    if obj.ConcurrentGCSyncs == 0 {
        obj.ConcurrentGCSyncs = 20
    }
}

Back in the Run function, NewControllerInitializers is called to start all controllers:

We focus on the startGarbageCollectorController function, which starts the garbage collector:

func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
    //In k8s 1.13.2 this defaults to true. You can add --enable-garbage-collector=false to the startup parameters of kube-apiserver and kube-controller-manager;
    //the values must be kept consistent between the two components.
    if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
        return nil, false, nil
    }

    //clientset for the k8s built-in resource objects (built by SimpleControllerClientBuilder under the default startup parameters)
    gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
    discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())

    //Generate rest config
    config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        return nil, true, err
    }

    // Get an initial set of deletable resources to prime the garbage collector.
    deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
    ignoredResources := make(map[schema.GroupResource]struct{})

    //Ignore gc resource types
    for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
        ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
    }
    garbageCollector, err := garbagecollector.NewGarbageCollector(
        dynamicClient,
        ctx.RESTMapper,
        deletableResources,
        ignoredResources,
        ctx.InformerFactory,
        ctx.InformersStarted,
    )
    if err != nil {
        return nil, true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
    }

    // Start the garbage collector.
    //20 by default (the ConcurrentGCSyncs startup parameter)
    workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
    //Start the monitors, deleteWorkers and orphanWorkers
    go garbageCollector.Run(workers, ctx.Stop)

    // Periodically refresh the RESTMapper with new discovery information and sync
    // the garbage collector.
    go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)

    //register gc's debug endpoint, which dumps the dependency graph in dot format
    return garbagecollector.NewDebugHandler(garbageCollector), true, nil
}

The main things this function does are:
1. deletableResources := garbagecollector.GetDeletableResources(discoveryClient) gets all the resource objects in the cluster that can be deleted, excluding the ignored resource types;
2. construct the GarbageCollector struct;
3. garbageCollector.Run(workers, ctx.Stop) starts the monitors that watch resource object changes (feeding the runProcessGraphChanges loop), plus, by default, 20 deleteWorkers that process objects to be deleted and 20 orphanWorkers that process objects to be orphaned;
4. garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) periodically checks whether new resource types have appeared in the cluster and refreshes the monitors to watch them;
5. garbagecollector.NewDebugHandler(garbageCollector) registers the debug handler, which serves the dependency graph in dot format:

curl http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12

Using the dot.exe tool shipped with graphviz, you can convert the output into an SVG image and view it in Chrome:
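
For example (assuming kube-controller-manager's insecure port 10252 is reachable and graphviz is installed; the uid is the hypothetical one from above):

curl -s "http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12" -o graph.dot
dot graph.dot -T svg -o graph.svg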

// curl http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12
func (h *debugHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    if req.URL.Path != "/graph" {
        http.Error(w, "", http.StatusNotFound)
        return
    }

    var graph graph.Directed
    if uidStrings := req.URL.Query()["uid"]; len(uidStrings) > 0 {
        uids := []types.UID{}
        for _, uidString := range uidStrings {
            uids = append(uids, types.UID(uidString))
        }
        graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraphForObj(uids...)

    } else {
        graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraph()
    }

    //Generate dot flow chart data, and use the dot.exe tool in graphviz tool to convert it into svg chart (opened by google browser) or png chart
    //API reference: https://godoc.org/gonum.org/v1/gonum/graph
    //Download address of graphviz: https://graphviz.gitlab.io/_pages/Download/Download_windows.html
    //dot.exe test.dot -T svg -o test.svg
    data, err := dot.Marshal(graph, "full", "", "  ", false)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Write(data)
    w.WriteHeader(http.StatusOK)
}


Through the restMapper, the garbage collector periodically re-detects the deletable resource types and updates the monitors in the GraphBuilder. The monitors register change-notification callbacks for every resource type and add changed resource objects to the GraphBuilder's graphChanges queue. The GraphBuilder's runProcessGraphChanges() continuously takes changes off that queue and maintains an in-memory graph caching the dependency (owner/dependent) relationships between objects. Triggered by graph changes, it enqueues objects that may be garbage-collectable to the attemptToDelete queue and objects whose dependents need to be orphaned to the attemptToOrphan queue. The GarbageCollector's runAttemptToDeleteWorker and runAttemptToOrphanWorker workers consume these two queues, taking items from attemptToDelete and attemptToOrphan respectively and sending delete/update requests to the API server accordingly.
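
The owner/dependent relationships in the graph come from metadata.ownerReferences on each object. A minimal sketch of a dependent declaring its owner (the names and UID are made up for illustration):

package main

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func boolPtr(b bool) *bool { return &b }

func main() {
    // a pod owned by a StatefulSet: when the StatefulSet is deleted,
    // GC either deletes this pod (cascading) or strips this ownerReference
    // from it (orphaning), depending on the deletion propagation policy
    _ = corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name: "redis-0",
            OwnerReferences: []metav1.OwnerReference{{
                APIVersion:         "apps/v1",
                Kind:               "StatefulSet",
                Name:               "redis",
                UID:                "6f0c6f55-hypothetical-uid",
                Controller:         boolPtr(true),
                BlockOwnerDeletion: boolPtr(true),
            }},
        },
    }
}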

// The garbage collector runs reflectors to watch for changes of managed API objects and funnels the results to a single-threaded dependencyGraphBuilder,
// which builds a graph caching the dependencies between cached objects. Triggered by graph changes, the dependencyGraphBuilder enqueues objects that can
// potentially be garbage-collected to the attemptToDelete queue, and enqueues objects whose dependents need to be orphaned to the attemptToOrphan queue.
// The GarbageCollector has workers that consume these two queues and send requests to the API server to delete/update the objects accordingly.
// Note that having the dependencyGraphBuilder notify the garbage collector ensures the garbage collector operates on a graph at least as up-to-date as the notification.
type GarbageCollector struct {
    // Resettable RESTMapper is a RESTMapper that can reset itself when the discovery resource type is found
    restMapper    resettableRESTMapper
    // Dynamic client provides interface methods for operating all resource objects in the cluster, including k8s built-in and CRD generated custom resources.
    dynamicClient dynamic.Interface
    //the garbage collector attempts to delete the items in the attemptToDelete queue when the time is ripe
    attemptToDelete workqueue.RateLimitingInterface
    //the garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items
    attemptToOrphan        workqueue.RateLimitingInterface
    dependencyGraphBuilder *GraphBuilder
    // absentOwnerCache caches owners that are known not to exist, according to the API server.
    absentOwnerCache *UIDCache
    sharedInformers  informers.SharedInformerFactory

    workerLock sync.RWMutex
}
// GraphBuilder, based on the events supplied by the informers, updates
// uidToNode, a graph that caches the dependencies as we know them, and
// enqueues items to the attemptToDelete and attemptToOrphan queues
type GraphBuilder struct {
    restMapper meta.RESTMapper

    //each monitor lists/watches one resource type; the results are funneled into the dependencyGraphBuilder
    monitors    monitors
    monitorLock sync.RWMutex
    // informersStarted is closed after all of the controllers have been initialized and are running.
    // After that it is safe to start them here, before that it is not.
    informersStarted <-chan struct{}

    // stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
    // This channel is also protected by monitorLock.
    stopCh <-chan struct{}

    // running tracks whether Run() has been called.
    // it is protected by monitorLock.
    running bool

    dynamicClient dynamic.Interface
    // monitors are the producer of the graphChanges queue, graphBuilder alters
    // the in-memory graph according to the changes.
    graphChanges workqueue.RateLimitingInterface

    // uidToNode doesn't require a lock to protect, because only the
    // single-threaded GraphBuilder.processGraphChanges() reads/writes it.
    uidToNode *concurrentUIDToNode

    // GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
    attemptToDelete workqueue.RateLimitingInterface
    attemptToOrphan workqueue.RateLimitingInterface

    // GraphBuilder and GC share the absentOwnerCache. Objects that are known to
    // be non-existent are added to the cache.
    absentOwnerCache *UIDCache

    //shared informer factory covering all k8s resource objects
    sharedInformers  informers.SharedInformerFactory

    //resource types ignored by the monitors
    ignoredResources map[schema.GroupResource]struct{}
}
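
uidToNode is the dependency graph itself. A simplified sketch of its types (trimmed from pkg/controller/garbagecollector/graph.go; several locks and flags are omitted here; types.UID is from k8s.io/apimachinery/pkg/types):

type concurrentUIDToNode struct {
    uidToNodeLock sync.RWMutex
    uidToNode     map[types.UID]*node
}

type node struct {
    identity objectReference
    // dependents are the nodes that have node.identity as one of their
    // metadata.ownerReferences
    dependentsLock sync.RWMutex
    dependents     map[*node]struct{}
    // set if the object's deletionTimestamp is non-nil
    beingDeleted bool
    // the ownerReferences recorded for this object, compared against the
    // updated ownerReferences when an Update event is processed
    owners []metav1.OwnerReference
}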

NewGarbageCollector constructs the GarbageCollector struct:

func NewGarbageCollector(
    dynamicClient dynamic.Interface,
    mapper resettableRESTMapper,
    deletableResources map[schema.GroupVersionResource]struct{},
    ignoredResources map[schema.GroupResource]struct{},
    sharedInformers informers.SharedInformerFactory,
    informersStarted <-chan struct{},
) (*GarbageCollector, error) {
    attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
    attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
    absentOwnerCache := NewUIDCache(500)
    gc := &GarbageCollector{
        dynamicClient:    dynamicClient,
        restMapper:       mapper,
        attemptToDelete:  attemptToDelete,
        attemptToOrphan:  attemptToOrphan,
        absentOwnerCache: absentOwnerCache,
    }
    gb := &GraphBuilder{
        dynamicClient:    dynamicClient,
        informersStarted: informersStarted,
        restMapper:       mapper,
        graphChanges:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
        uidToNode: &concurrentUIDToNode{
            uidToNode: make(map[types.UID]*node),
        },
        attemptToDelete:  attemptToDelete,
        attemptToOrphan:  attemptToOrphan,
        absentOwnerCache: absentOwnerCache,
        sharedInformers:  sharedInformers,
        ignoredResources: ignoredResources,
    }
    //Initialize a monitor for each resource type; each monitor's event handlers fire on changes and add events to the graphChanges queue (the monitors themselves are started later by gb.Run)
    if err := gb.syncMonitors(deletableResources); err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
    }
    gc.dependencyGraphBuilder = gb

    return gc, nil
}

Main functions:
1. Construct the GarbageCollector struct;
2. Construct the GraphBuilder, which maintains the dependency graph and shares the attemptToDelete and attemptToOrphan queues with the GarbageCollector; the GraphBuilder, as the producer, puts the appropriate resources into the attemptToDelete or attemptToOrphan queue for the workers in the GarbageCollector to consume;
3. Initialize the monitors for each resource type; when a resource changes, the registered callback adds an event to the graphChanges queue.

The most important call inside gb.syncMonitors(deletableResources) is c, s, err := gb.controllerFor(resource, kind):

func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
    handlers := cache.ResourceEventHandlerFuncs{
        // add the event to the dependencyGraphBuilder's graphChanges.
        AddFunc: func(obj interface{}) {
            event := &event{
                eventType: addEvent,
                obj:       obj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            // TODO: check if there are differences in the ownerRefs,
            // finalizers, and DeletionTimestamp; if not, ignore the update.
            event := &event{
                eventType: updateEvent,
                obj:       newObj,
                oldObj:    oldObj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
        DeleteFunc: func(obj interface{}) {
            // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
            if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                obj = deletedFinalStateUnknown.Obj
            }
            event := &event{
                eventType: deleteEvent,
                obj:       obj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
    }
    shared, err := gb.sharedInformers.ForResource(resource)
    if err == nil {
        klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
        // need to clone because it's from a shared cache
        shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
        return shared.Informer().GetController(), shared.Informer().GetStore(), nil
    } else {
        //Resources for which no shared informer could be obtained end up here: non-built-in CRD types (e.g. RedisCluster, clusterbases, clusters, esclusters, volumeproviders, stsmasters, apps, mysqlclusters, brokerclusters, clustertemplates)
        //as well as some built-in ones (networkPolicies, apiServices, customResourceDefinitions)
        klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
    }

    // TODO: consider store in one storage.
    klog.V(5).Infof("create storage for resource %s", resource)
    //fall back to a dedicated store and controller for the resources that failed above
    store, monitor := cache.NewInformer(
        listWatcher(gb.dynamicClient, resource),
        nil,
        ResourceResyncTime,
        // don't need to clone because it's not from shared cache
        handlers,
    )
    return monitor, store, nil
}

The main functions of this method are:
1. Wrap added, updated and deleted resource objects into event structs and put them into the GraphBuilder's graphChanges queue, to be consumed eventually by the runProcessGraphChanges worker;
2. Use the sharedInformerFactory for most built-in resources, and fall back to cache.NewInformer for the ones where that fails (CRD-defined objects and a few built-in ones); see the sketch below.
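
The fallback path builds its list/watch from the dynamic client. A sketch of what listWatcher(gb.dynamicClient, resource) amounts to in 1.13 (simplified; consult pkg/controller/garbagecollector/graph_builder.go for the real version):

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/cache"
)

//listWatcher returns a ListerWatcher for an arbitrary resource backed by the dynamic client
func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch {
    return &cache.ListWatch{
        ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
            // not passing a namespace lists the resource across all namespaces
            return client.Resource(resource).List(options)
        },
        WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
            return client.Resource(resource).Watch(options)
        },
    }
}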

The code then returns to startGarbageCollectorController in k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go. Look at the garbageCollector.Run(workers, ctx.Stop) method:

func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer gc.attemptToDelete.ShutDown()
    defer gc.attemptToOrphan.ShutDown()
    defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

    klog.Infof("Starting garbage collector controller")
    defer klog.Infof("Shutting down garbage collector controller")

    //start the producer: the dependencyGraphBuilder's monitors
    go gc.dependencyGraphBuilder.Run(stopCh)

    //wait for the dependencyGraphBuilder caches to sync
    if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
        return
    }

    klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")

    // gc workers
    //start the consumers: deleteWorkers and orphanWorkers
    for i := 0; i < workers; i++ {
        //20 concurrent attemptToDelete workers by default
        go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
        //20 concurrent attemptToOrphan workers by default
        go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
    }

    <-stopCh
}

gc.dependencyGraphBuilder.Run(stopCh) mainly does two things:
1. gb.startMonitors() starts the informers that listen for resource changes;
2. wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh) starts the worker that consumes from the GraphBuilder.graphChanges queue.

Run then starts 20 runAttemptToDeleteWorker and 20 runAttemptToOrphanWorker goroutines; each follows the pattern sketched below.
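
A generic sketch of the standard workqueue worker loop that these workers follow (not the exact GC code; process stands in for attemptToDeleteItem / attemptToOrphanItem):

//worker takes one item off the queue, processes it, retries with rate-limited
//backoff on error, and clears the backoff history on success; it returns false
//once the queue has been shut down
func worker(q workqueue.RateLimitingInterface, process func(item interface{}) error) bool {
    item, shutdown := q.Get()
    if shutdown {
        return false
    }
    defer q.Done(item)

    if err := process(item); err != nil {
        q.AddRateLimited(item) //requeue with backoff
        return true
    }
    q.Forget(item) //forget the rate-limit history for this item
    return true
}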

References:

Kubernetes official documentation on garbage collection:
https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/

gonum graph library API documentation (used for the dot dependency graph):
https://godoc.org/gonum.org/v1/gonum/graph

graphviz download:
https://graphviz.gitlab.io/_pages/Download/Download_windows.html
