external-provisioner source code analysis - main processing logic analysis

summary

Next, we will analyze the source code of the external-provisioner component.

Based on tag v1.6.0:

https://github.com/kubernetes-csi/external-provisioner/releases/tag/v1.6.0

The analysis is based on RBD usage in combination with ceph-csi.

Introduction to the role of external-provisioner

(1) When a pvc is created, external-provisioner participates in creating the storage resource and the pv object. After the external-provisioner component observes the pvc creation event, it assembles the request and calls the CreateVolume method of the ceph-csi component to create the storage; after the storage is created successfully, it creates the pv object;
(2) When a pvc is deleted, external-provisioner participates in deleting the storage resource and the pv object. After the pvc is deleted, the pv controller updates the status of the corresponding pv object from bound to released. After observing the pv update event, external-provisioner calls the DeleteVolume method of ceph-csi to delete the storage and then deletes the pv object.

external-provisioner source code analysis (1) - main processing logic analysis

In the external-provisioner component, the main business logic lives in the provisionController, so the analysis of the external-provisioner component starts there.

The provisionController is mainly responsible for two things: handling the claimQueue (i.e. the add and update events of pvc objects), calling the CreateVolume method of the ceph-csi component to create storage and creating pv objects as needed; and handling the volumeQueue (i.e. the add and update events of pv objects), deciding according to the status and reclaim policy of the pv whether to call the DeleteVolume method of the ceph-csi component to delete the storage and then delete the pv object.
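
How do pvc and pv events end up in these two queues? NewProvisionController registers informer event handlers that push object keys into claimQueue and volumeQueue. The snippet below is a simplified, hypothetical sketch of that wiring using plain client-go primitives, not the actual NewProvisionController code (the real controller keys claims by UID and filters events before enqueueing):

package main

import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// wireInformers is a simplified sketch: enqueue pvc add/update events into claimQueue
// and pv add/update events into volumeQueue, using namespace/name keys.
func wireInformers(claimInformer, volumeInformer cache.SharedInformer,
	claimQueue, volumeQueue workqueue.RateLimitingInterface) {

	enqueue := func(queue workqueue.RateLimitingInterface, obj interface{}) {
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
		if err != nil {
			utilruntime.HandleError(err)
			return
		}
		queue.Add(key)
	}

	claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { enqueue(claimQueue, obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { enqueue(claimQueue, newObj) },
	})
	volumeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { enqueue(volumeQueue, obj) },
		UpdateFunc: func(oldObj, newObj interface{}) { enqueue(volumeQueue, newObj) },
	})
}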

provisionController.Run(wait.NeverStop) is called in the main method, so we take it as the entry point for analyzing the provisionController.
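
For context, the call site in main looks roughly like the sketch below. This is a simplified assumption, not the exact v1.6.0 code: the real main also builds the csiProvisioner around the gRPC connection to the driver and passes many controller options (threadiness, leader election, finalizer, etc.):

package main

import (
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller"
)

// startController is a rough sketch of how the provision controller is constructed
// and started; argument wiring is simplified, and p is the csiProvisioner analyzed below.
func startController(config *rest.Config, provisionerName, kubeVersion string, p ctrl.Provisioner) {
	clientset := kubernetes.NewForConfigOrDie(config)
	provisionController := ctrl.NewProvisionController(
		clientset,       // kubernetes client
		provisionerName, // CSI driver name, e.g. "rbd.csi.ceph.com"
		p,               // implements Provision/Delete
		kubeVersion,     // used for the version checks seen later in this article
	)
	// Run blocks forever; its argument is unused in this version (see the TODO in Run below).
	provisionController.Run(wait.NeverStop)
}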

provisionController.Run()

The Run method defines a run closure and executes it. Within run, focus mainly on ctrl.runClaimWorker and ctrl.runVolumeWorker; these two methods carry the main processing logic.

// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(_ <-chan struct{}) {
	// TODO: arg is as of 1.12 unused. Nothing can ever be cancelled. Should
	// accept a context instead and use it instead of context.TODO(), but would
	// break API. Not urgent: realistically, users are simply passing in
	// wait.NeverStop() anyway.

	run := func(ctx context.Context) {
		glog.Infof("Starting provisioner controller %s!", ctrl.component)
		defer utilruntime.HandleCrash()
		defer ctrl.claimQueue.ShutDown()
		defer ctrl.volumeQueue.ShutDown()

		ctrl.hasRunLock.Lock()
		ctrl.hasRun = true
		ctrl.hasRunLock.Unlock()
		if ctrl.metricsPort > 0 {
			prometheus.MustRegister([]prometheus.Collector{
				metrics.PersistentVolumeClaimProvisionTotal,
				metrics.PersistentVolumeClaimProvisionFailedTotal,
				metrics.PersistentVolumeClaimProvisionDurationSeconds,
				metrics.PersistentVolumeDeleteTotal,
				metrics.PersistentVolumeDeleteFailedTotal,
				metrics.PersistentVolumeDeleteDurationSeconds,
			}...)
			http.Handle(ctrl.metricsPath, promhttp.Handler())
			address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
			glog.Infof("Starting metrics server at %s\n", address)
			go wait.Forever(func() {
				err := http.ListenAndServe(address, nil)
				if err != nil {
					glog.Errorf("Failed to listen on %s: %v", address, err)
				}
			}, 5*time.Second)
		}

		// If a external SharedInformer has been passed in, this controller
		// should not call Run again
		if !ctrl.customClaimInformer {
			go ctrl.claimInformer.Run(ctx.Done())
		}
		if !ctrl.customVolumeInformer {
			go ctrl.volumeInformer.Run(ctx.Done())
		}
		if !ctrl.customClassInformer {
			go ctrl.classInformer.Run(ctx.Done())
		}

		if !cache.WaitForCacheSync(ctx.Done(), ctrl.claimInformer.HasSynced, ctrl.volumeInformer.HasSynced, ctrl.classInformer.HasSynced) {
			return
		}
        
        // Start multiple goroutines for the two workers
		for i := 0; i < ctrl.threadiness; i++ {
			go wait.Until(ctrl.runClaimWorker, time.Second, context.TODO().Done())
			go wait.Until(ctrl.runVolumeWorker, time.Second, context.TODO().Done())
		}

		glog.Infof("Started provisioner controller %s!", ctrl.component)

		select {}
	}

	go ctrl.volumeStore.Run(context.TODO(), DefaultThreadiness)
    
    // Leader election related logic
	if ctrl.leaderElection {
		rl, err := resourcelock.New("endpoints",
			ctrl.leaderElectionNamespace,
			strings.Replace(ctrl.provisionerName, "/", "-", -1),
			ctrl.client.CoreV1(),
			nil,
			resourcelock.ResourceLockConfig{
				Identity:      ctrl.id,
				EventRecorder: ctrl.eventRecorder,
			})
		if err != nil {
			glog.Fatalf("Error creating lock: %v", err)
		}

		leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
			Lock:          rl,
			LeaseDuration: ctrl.leaseDuration,
			RenewDeadline: ctrl.renewDeadline,
			RetryPeriod:   ctrl.retryPeriod,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: run,
				OnStoppedLeading: func() {
					glog.Fatalf("leaderelection lost")
				},
			},
		})
		panic("unreachable")
	} else {
		run(context.TODO())
	}
}

Next, we analyze ctrl.runClaimWorker and ctrl.runVolumeWorker, which are started in the run closure.

1.ctrl.runClaimWorker

A number of goroutines equal to ctrl.threadiness is started, each running ctrl.runClaimWorker.

It is mainly responsible for handling the claimQueue, i.e. the add and update events of pvc objects, calling the CreateVolume method of the csi component to create storage and creating pv objects as needed.

        for i := 0; i < ctrl.threadiness; i++ {
			go wait.Until(ctrl.runClaimWorker, time.Second, context.TODO().Done())
			go wait.Until(ctrl.runVolumeWorker, time.Second, context.TODO().Done())
		}
// vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller/controller.go

func (ctrl *ProvisionController) runClaimWorker() {
    // Loop calling processNextClaimWorkItem until it returns false
	for ctrl.processNextClaimWorkItem() {
	}
}

Call chain: main() --> provisionController.Run() --> ctrl.runClaimWorker() --> ctrl.processNextClaimWorkItem() --> ctrl.syncClaimHandler() --> ctrl.syncClaim() --> ctrl.provisionClaimOperation() --> ctrl.provisioner.Provision()

1.1 ctrl.processNextClaimWorkItem

Main logic:
(1) Get a pvc key from the claimQueue;
(2) Call ctrl.syncClaimHandler for further processing;
(3) After successful processing, reset the rateLimiter for the pvc and remove the pvc from claimsInProgress;
(4) If processing fails, retry a certain number of times, i.e. re-add the pvc through the rateLimiter;
(5) Finally, regardless of whether ctrl.syncClaimHandler succeeds, mark the pvc as done in the claimQueue.

// Map UID -> *PVC with all claims that may be provisioned in the background.
claimsInProgress sync.Map

// processNextClaimWorkItem processes items from claimQueue
func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
    // Get pvc from claimQueue
	obj, shutdown := ctrl.claimQueue.Get()

	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
	    // Regardless of whether ctrl.syncClaimHandler succeeds, mark the pvc as done in the claimQueue
		defer ctrl.claimQueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			ctrl.claimQueue.Forget(obj)
			return fmt.Errorf("expected string in workqueue but got %#v", obj)
		}
        
        // Call ctrl.syncClaimHandler for further processing
		if err := ctrl.syncClaimHandler(key); err != nil {
		    // If processing fails, retry up to the threshold by re-adding the pvc through the rateLimiter
			if ctrl.failedProvisionThreshold == 0 {
				glog.Warningf("Retrying syncing claim %q, failure %v", key, ctrl.claimQueue.NumRequeues(obj))
				ctrl.claimQueue.AddRateLimited(obj)
			} else if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
				glog.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
				ctrl.claimQueue.AddRateLimited(obj)
			} else {
				glog.Errorf("Giving up syncing claim %q because failures %v >= threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
				glog.V(2).Infof("Removing PVC %s from claims in progress", key)
				ctrl.claimsInProgress.Delete(key) // This can leak a volume that's being provisioned in the background!
				// Done but do not Forget: it will not be in the queue but NumRequeues
				// will be saved until the obj is deleted from kubernetes
			}
			return fmt.Errorf("error syncing claim %q: %s", key, err.Error())
		}
        
        // After successful processing, clean up the rateLimiter of the pvc and remove the pvc from claimsInProgress
		ctrl.claimQueue.Forget(obj)
		// Silently remove the PVC from list of volumes in progress. The provisioning either succeeded
		// or the PVC was ignored by this provisioner.
		ctrl.claimsInProgress.Delete(key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}

Let's first analyze the relevant methods of claimQueue:
(1) Done: mark the item as finished processing; if it was marked dirty again while being processed, it is re-queued

// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

(2) Forget: clean up rateLimiter only

func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}

(3) AddRateLimited: rejoin the claimQueue after the rate limiter indicates that it is OK

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
	// don't add if we're already shutting down
	if q.ShuttingDown() {
		return
	}

	q.metrics.retry()

	// immediately add things with no delay
	if duration <= 0 {
		q.Add(item)
		return
	}

	select {
	case <-q.stopCh:
		// unblock if ShutDown() is called
	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
	}
}
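
Putting the three methods together, the standard client-go workqueue consumption pattern looks like the minimal, generic sketch below. This is illustrative only; processNextClaimWorkItem above follows the same shape, with the failedProvisionThreshold logic and claimsInProgress bookkeeping added:

package main

import "k8s.io/client-go/util/workqueue"

// worker is a generic sketch of the Get/Done/Forget/AddRateLimited pattern; sync is
// the per-key handler (the counterpart of syncClaimHandler / syncVolumeHandler).
func worker(queue workqueue.RateLimitingInterface, sync func(key string) error) {
	for {
		obj, shutdown := queue.Get()
		if shutdown {
			return
		}
		key, ok := obj.(string)
		if !ok {
			// Unexpected item type: drop its rate-limiting history and mark it done.
			queue.Forget(obj)
			queue.Done(obj)
			continue
		}
		if err := sync(key); err != nil {
			// Failure: re-add through the rate limiter so the key is retried with backoff.
			queue.AddRateLimited(key)
		} else {
			// Success: reset the rate limiter history for this key.
			queue.Forget(key)
		}
		// Always mark the item as done so a duplicate queued meanwhile can be processed.
		queue.Done(key)
	}
}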

1.1.1 ctrl.syncClaimHandler

Its most important job is to call ctrl.syncClaim.

// syncClaimHandler gets the claim from informer's cache then calls syncClaim. A non-nil error triggers requeuing of the claim.
func (ctrl *ProvisionController) syncClaimHandler(key string) error {
	objs, err := ctrl.claimsIndexer.ByIndex(uidIndex, key)
	if err != nil {
		return err
	}
	var claimObj interface{}
	if len(objs) > 0 {
		claimObj = objs[0]
	} else {
		obj, found := ctrl.claimsInProgress.Load(key)
		if !found {
			utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
			return nil
		}
		claimObj = obj
	}
	return ctrl.syncClaim(claimObj)
}
1.1.1.1 ctrl.syncClaim

Main logic:
(1) First call ctrl.shouldProvision to determine whether a provision operation is required;
(2) If so, call ctrl.provisionClaimOperation for the actual provisioning.

// syncClaim checks if the claim should have a volume provisioned for it and
// provisions one if so. Returns an error if the claim is to be requeued.
func (ctrl *ProvisionController) syncClaim(obj interface{}) error {
	claim, ok := obj.(*v1.PersistentVolumeClaim)
	if !ok {
		return fmt.Errorf("expected claim but got %+v", obj)
	}

	should, err := ctrl.shouldProvision(claim)
	if err != nil {
		ctrl.updateProvisionStats(claim, err, time.Time{})
		return err
	} else if should {
		startTime := time.Now()

		status, err := ctrl.provisionClaimOperation(claim)
		ctrl.updateProvisionStats(claim, err, startTime)
		if err == nil || status == ProvisioningFinished {
			// Provisioning is 100% finished / not in progress.
			switch err {
			case nil:
				glog.V(5).Infof("Claim processing succeeded, removing PVC %s from claims in progress", claim.UID)
			case errStopProvision:
				glog.V(5).Infof("Stop provisioning, removing PVC %s from claims in progress", claim.UID)
				// Our caller would requeue if we pass on this special error; return nil instead.
				err = nil
			default:
				glog.V(2).Infof("Final error received, removing PVC %s from claims in progress", claim.UID)
			}
			ctrl.claimsInProgress.Delete(string(claim.UID))
			return err
		}
		if status == ProvisioningInBackground {
			// Provisioning is in progress in background.
			glog.V(2).Infof("Temporary error received, adding PVC %s to claims in progress", claim.UID)
			ctrl.claimsInProgress.Store(string(claim.UID), claim)
		} else {
			// status == ProvisioningNoChange.
			// Don't change claimsInProgress:
			// - the claim is already there if previous status was ProvisioningInBackground.
			// - the claim is not there if if previous status was ProvisioningFinished.
		}
		return err
	}
	return nil
}
ctrl.shouldProvision

This method determines whether a pvc object needs a provision operation. The main logic is as follows:
(1) When claim.Spec.VolumeName is not empty, no provision operation is required and false is returned;
(2) Call qualifier.ShouldProvision to determine whether the storage driver supports the provision operation;
(3) On kubernetes 1.5 or above, obtain the provisioner name from the pvc annotations and verify it; for storage classes with VolumeBindingWaitForFirstConsumer, a selected-node annotation is also required (illustrated below).
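
To make these conditions concrete, the hypothetical pvc below (driver name, node name and all other values are examples) would pass shouldProvision for a WaitForFirstConsumer storage class: Spec.VolumeName is empty, the storage-provisioner annotation names this provisioner, and the scheduler has already set a selected node:

package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleClaim builds a hypothetical pvc (illustrative values only) that satisfies
// the shouldProvision checks for a delay-binding (WaitForFirstConsumer) class.
func exampleClaim() *v1.PersistentVolumeClaim {
	className := "csi-rbd" // assumed StorageClass name
	return &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "data-pvc",
			Namespace: "default",
			Annotations: map[string]string{
				// annStorageProvisioner: set by the pv controller to the expected provisioner
				"volume.beta.kubernetes.io/storage-provisioner": "rbd.csi.ceph.com",
				// annSelectedNode: set by the scheduler for delay-binding classes
				"volume.kubernetes.io/selected-node": "node-1",
			},
		},
		Spec: v1.PersistentVolumeClaimSpec{
			// Spec.VolumeName left empty: the claim is not yet bound
			StorageClassName: &className,
			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceStorage: resource.MustParse("10Gi"),
				},
			},
		},
	}
}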

// shouldProvision returns whether a claim should have a volume provisioned for
// it, i.e. whether a Provision is "desired"
func (ctrl *ProvisionController) shouldProvision(claim *v1.PersistentVolumeClaim) (bool, error) {
	if claim.Spec.VolumeName != "" {
		return false, nil
	}

	if qualifier, ok := ctrl.provisioner.(Qualifier); ok {
		if !qualifier.ShouldProvision(claim) {
			return false, nil
		}
	}

	// Kubernetes 1.5 provisioning with annStorageProvisioner
	if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.5.0")) {
		if provisioner, found := claim.Annotations[annStorageProvisioner]; found {
			if ctrl.knownProvisioner(provisioner) {
				claimClass := util.GetPersistentVolumeClaimClass(claim)
				class, err := ctrl.getStorageClass(claimClass)
				if err != nil {
					return false, err
				}
				if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
					// When claim is in delay binding mode, annSelectedNode is
					// required to provision volume.
					// Though PV controller set annStorageProvisioner only when
					// annSelectedNode is set, but provisioner may remove
					// annSelectedNode to notify scheduler to reschedule again.
					if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
						return true, nil
					}
					return false, nil
				}
				return true, nil
			}
		}
	} else {
		// Kubernetes 1.4 provisioning, evaluating class.Provisioner
		claimClass := util.GetPersistentVolumeClaimClass(claim)
		class, err := ctrl.getStorageClass(claimClass)
		if err != nil {
			glog.Errorf("Error getting claim %q's StorageClass's fields: %v", claimToClaimKey(claim), err)
			return false, err
		}
		if class.Provisioner != ctrl.provisionerName {
			return false, nil
		}

		return true, nil
	}

	return false, nil
}
ctrl.provisionClaimOperation

Main logic:
(1) Get the name of the storageclass from the pvc object;
(2) Check whether the pv already exists;
(3) Obtain information from the pvc object to construct the claimRef;
(4) Check whether dynamic storage creation is supported;
(5) Get the storageclass object;
(6) Check whether the provisioner named in the storageclass object has been registered;
(7) Construct the options structure;
(8) Call the Provision method, which returns the pv object structure;
(9) Add additional information to the pv object structure;
(10) Create the pv in the apiserver.

// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns nil error only when the volume was provisioned (in which case it also returns ProvisioningFinished),
// a normal error when the volume was not provisioned and provisioning should be retried (requeue the claim),
// or the special errStopProvision when provisioning was impossible and no further attempts to provision should be tried.
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
	// Most code here is identical to that found in controller.go of kube's PV controller...
	
	// Get the name of the storageclass from the pvc object
	claimClass := util.GetPersistentVolumeClaimClass(claim)
	operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
	glog.Info(logOperation(operation, "started"))

	//  Check if pv already exists
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
	if err == nil && volume != nil {
		// Volume has been already provisioned, nothing to do.
		glog.Info(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
		return ProvisioningFinished, errStopProvision
	}

	// Get information from pvc object to construct claimRef
	claimRef, err := ref.GetReference(scheme.Scheme, claim)
	if err != nil {
		glog.Error(logOperation(operation, "unexpected error getting claim reference: %v", err))
		return ProvisioningNoChange, err
	}

	// Check whether dynamic creation of storage is supported
	if err = ctrl.canProvision(claim); err != nil {
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
		glog.Error(logOperation(operation, "failed to provision volume: %v", err))
		return ProvisioningFinished, errStopProvision
	}

	// Get storageclass object
	class, err := ctrl.getStorageClass(claimClass)
	if err != nil {
		glog.Error(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
		return ProvisioningFinished, err
	}
	
	// Check whether the provisioner in the storageclass object is registered
	if !ctrl.knownProvisioner(class.Provisioner) {
		// class.Provisioner has either changed since shouldProvision() or
		// annDynamicallyProvisioned contains different provisioner than
		// class.Provisioner.
		glog.Error(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", class.Provisioner))
		return ProvisioningFinished, errStopProvision
	}
    
    // Selected node related handling
	var selectedNode *v1.Node
	if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.11.0")) {
		// Get SelectedNode
		if nodeName, ok := getString(claim.Annotations, annSelectedNode, annAlphaSelectedNode); ok {
			selectedNode, err = ctrl.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) // TODO (verult) cache Nodes
			if err != nil {
				err = fmt.Errorf("failed to get target node: %v", err)
				ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
				return ProvisioningNoChange, err
			}
		}
	}
    
    // Construct options structure
	options := ProvisionOptions{
		StorageClass: class,
		PVName:       pvName,
		PVC:          claim,
		SelectedNode: selectedNode,
	}

	ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
    
    // Call the Provision method (ProvisionExt when supported), which returns the pv object structure
	result := ProvisioningFinished
	if p, ok := ctrl.provisioner.(ProvisionerExt); ok {
		volume, result, err = p.ProvisionExt(options)
	} else {
		volume, err = ctrl.provisioner.Provision(options)
	}
	if err != nil {
		if ierr, ok := err.(*IgnoredError); ok {
			// Provision ignored, do nothing and hope another provisioner will provision it.
			glog.Info(logOperation(operation, "volume provision ignored: %v", ierr))
			return ProvisioningFinished, errStopProvision
		}
		err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
		if _, ok := claim.Annotations[annSelectedNode]; ok && result == ProvisioningReschedule {
			// For dynamic PV provisioning with delayed binding, the provisioner may fail
			// because the node is wrong (permanent error) or currently unusable (not enough
			// capacity). If the provisioner wants to give up scheduling with the currently
			// selected node, then it can ask for that by returning ProvisioningReschedule
			// as state.
			//
			// `selectedNode` must be removed to notify scheduler to schedule again.
			if errLabel := ctrl.rescheduleProvisioning(claim); errLabel != nil {
				glog.Info(logOperation(operation, "volume rescheduling failed: %v", errLabel))
				// If unsetting that label fails in ctrl.rescheduleProvisioning, we
				// keep the volume in the work queue as if the provisioner had
				// returned ProvisioningFinished and simply try again later.
				return ProvisioningFinished, err
			}
			// Label was removed, stop working on the volume.
			glog.Info(logOperation(operation, "volume rescheduled because: %v", err))
			return ProvisioningFinished, errStopProvision
		}

		// ProvisioningReschedule shouldn't have been returned for volumes without selected node,
		// but if we get it anyway, then treat it like ProvisioningFinished because we cannot
		// reschedule.
		if result == ProvisioningReschedule {
			result = ProvisioningFinished
		}
		return result, err
	}

	glog.Info(logOperation(operation, "volume %q provisioned", volume.Name))
    
    // Add additional information to the pv object structure
	// Set ClaimRef and the PV controller will bind and set annBoundByController for us
	volume.Spec.ClaimRef = claimRef

	// Add external provisioner finalizer if it doesn't already have it
	if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) {
		volume.ObjectMeta.Finalizers = append(volume.ObjectMeta.Finalizers, finalizerPV)
	}

	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, ctrl.provisionerName)
	if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
		volume.Spec.StorageClassName = claimClass
	} else {
		metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annClass, claimClass)
	}

	glog.Info(logOperation(operation, "succeeded"))
    
    // Hand the pv to the volumeStore, which creates it in the apiserver
	if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {
		return ProvisioningFinished, err
	}
	return ProvisioningFinished, nil
}

ctrl.provisioner.Provision:

The Provision method mainly calls the CreateVolume method of the csi driver to create the storage, and returns the pv object structure.

Main logic:
(1) Call p.checkDriverCapabilities to check the capabilities provided by the driver;
(2) Build the pv object name;
(3) Get the fstype from storageclass.Parameters and validate it;
(4) Obtain the requested storage size from the pvc;
(5) Build the CreateVolumeRequest structure;
(6) Obtain the name and namespace of the secrets used for the provisioner, controllerPublish, nodeStage, nodePublish and controllerExpand operations (an example parameter map is shown after this list);
(7) Fetch the secret object corresponding to the provisioner from the apiserver and store its credentials in the CreateVolumeRequest structure;
(8) Strip the prefixed keys from storageclass.Parameters and store the remaining parameters in the CreateVolumeRequest structure;
(9) Call p.csiClient.CreateVolume (that is, the CreateVolume method of ceph-csi) to create the ceph storage;
(10) After the storage is created successfully, verify that the created size meets the request;
(11) Build the pv object structure and return it.
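
Before walking through the code, the hypothetical ceph-csi RBD StorageClass parameter map below (all names, namespaces and values are examples) illustrates steps (3), (6) and (8): the csi.storage.k8s.io/-prefixed keys are consumed by external-provisioner itself (fstype and secret references) and stripped by removePrefixedParameters, while the remaining keys are forwarded to the driver in CreateVolumeRequest.Parameters:

package main

// exampleRBDParameters is a hypothetical StorageClass.Parameters map for a ceph-csi
// RBD class; values are illustrative only.
var exampleRBDParameters = map[string]string{
	// forwarded to ceph-csi in CreateVolumeRequest.Parameters
	"clusterID": "ceph-cluster-1",
	"pool":      "kube-rbd",

	// consumed by external-provisioner, stripped before calling CreateVolume
	"csi.storage.k8s.io/fstype":                             "ext4",
	"csi.storage.k8s.io/provisioner-secret-name":            "csi-rbd-secret",
	"csi.storage.k8s.io/provisioner-secret-namespace":       "ceph-csi",
	"csi.storage.k8s.io/node-stage-secret-name":             "csi-rbd-secret",
	"csi.storage.k8s.io/node-stage-secret-namespace":        "ceph-csi",
	"csi.storage.k8s.io/controller-expand-secret-name":      "csi-rbd-secret",
	"csi.storage.k8s.io/controller-expand-secret-namespace": "ceph-csi",
}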

func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.PersistentVolume, error) {
	// The controller should call ProvisionExt() instead, but just in case...
	pv, _, err := p.ProvisionExt(options)
	return pv, err
}

func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
	if options.StorageClass == nil {
		return nil, controller.ProvisioningFinished, errors.New("storage class was nil")
	}

	if options.PVC.Annotations[annStorageProvisioner] != p.driverName && options.PVC.Annotations[annMigratedTo] != p.driverName {
		// The storage provisioner annotation may not equal driver name but the
		// PVC could have annotation "migrated-to" which is the new way to
		// signal a PVC is migrated (k8s v1.17+)
		return nil, controller.ProvisioningFinished, &controller.IgnoredError{
			Reason: fmt.Sprintf("PVC annotated with external-provisioner name %s does not match provisioner driver name %s. This could mean the PVC is not migrated",
				options.PVC.Annotations[annStorageProvisioner],
				p.driverName),
		}

	}

	migratedVolume := false
	if p.supportsMigrationFromInTreePluginName != "" {
		// NOTE: we cannot depend on PVC.Annotations[volume.beta.kubernetes.io/storage-provisioner] to get
		// the in-tree provisioner name in case of CSI migration scenarios. The annotation will be
		// set to the CSI provisioner name by PV controller for migration scenarios
		// so that external provisioner can correctly pick up the PVC pointing to an in-tree plugin
		if options.StorageClass.Provisioner == p.supportsMigrationFromInTreePluginName {
			klog.V(2).Infof("translating storage class for in-tree plugin %s to CSI", options.StorageClass.Provisioner)
			storageClass, err := p.translator.TranslateInTreeStorageClassToCSI(p.supportsMigrationFromInTreePluginName, options.StorageClass)
			if err != nil {
				return nil, controller.ProvisioningFinished, fmt.Errorf("failed to translate storage class: %v", err)
			}
			options.StorageClass = storageClass
			migratedVolume = true
		} else {
			klog.V(4).Infof("skip translation of storage class for plugin: %s", options.StorageClass.Provisioner)
		}
	}

	// Make sure the plugin is capable of fulfilling the requested options
	rc := &requiredCapabilities{}
	if options.PVC.Spec.DataSource != nil {
		// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
		if options.PVC.Spec.DataSource.Name == "" {
			return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
		}

		switch options.PVC.Spec.DataSource.Kind {
		case snapshotKind:
			if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
				return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
			}
			rc.snapshot = true
		case pvcKind:
			rc.clone = true
		default:
			klog.Infof("Unsupported DataSource specified (%s), the provisioner won't act on this request", options.PVC.Spec.DataSource.Kind)
		}
	}
	if err := p.checkDriverCapabilities(rc); err != nil {
		return nil, controller.ProvisioningFinished, err
	}

	if options.PVC.Spec.Selector != nil {
		return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported")
	}

	pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
	if err != nil {
		return nil, controller.ProvisioningFinished, err
	}

	fsTypesFound := 0
	fsType := ""
	for k, v := range options.StorageClass.Parameters {
		if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey {
			fsType = v
			fsTypesFound++
		}
		if strings.ToLower(k) == "fstype" {
			klog.Warningf(deprecationWarning("fstype", prefixedFsTypeKey, ""))
		}
	}
	if fsTypesFound > 1 {
		return nil, controller.ProvisioningFinished, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey)
	}
	if len(fsType) == 0 {
		fsType = defaultFSType
	}

	capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
	volSizeBytes := capacity.Value()

	// add by zhongjialiang, if it is rbd request and fstype is xfs, we will check volume request size, it need to be equal or larger than 1G
	// because if the request size is less than 1G, it may occur an error when kubelet call NodeStageVolume(ceph-csi)
	// failed to run mkfs error: exit status 1, output: agsize (xxxx blocks) too small, need at least 4096 blocks
	//
	// issue: https://git-sa.nie.netease.com/venice/ceph-csi/issues/105
	klog.Infof("request volume size is %s (volSizeBytes: %d)", capacity.String(), volSizeBytes)
	if isRbdRequest(options.StorageClass.Parameters) && fsType == xfsFSType && volSizeBytes < oneGi {
		return nil, controller.ProvisioningFinished, fmt.Errorf("fstype xfs volume size request must be equal or larger than 1Gi, but your volume size request is %s ", capacity.String())
	}

	// Get access mode
	volumeCaps := make([]*csi.VolumeCapability, 0)
	for _, pvcAccessMode := range options.PVC.Spec.AccessModes {
		volumeCaps = append(volumeCaps, getVolumeCapability(options, pvcAccessMode, fsType))
	}

	// Create a CSI CreateVolumeRequest and Response
	req := csi.CreateVolumeRequest{
		Name:               pvName,
		Parameters:         options.StorageClass.Parameters,
		VolumeCapabilities: volumeCaps,
		CapacityRange: &csi.CapacityRange{
			RequiredBytes: int64(volSizeBytes),
		},
	}

	if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
		volumeContentSource, err := p.getVolumeContentSource(options)
		if err != nil {
			return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
		}
		req.VolumeContentSource = volumeContentSource
	}

	if options.PVC.Spec.DataSource != nil && rc.clone {
		err = p.setCloneFinalizer(options.PVC)
		if err != nil {
			return nil, controller.ProvisioningNoChange, err
		}
	}

	if p.supportsTopology() {
		requirements, err := GenerateAccessibilityRequirements(
			p.client,
			p.driverName,
			options.PVC.Name,
			options.StorageClass.AllowedTopologies,
			options.SelectedNode,
			p.strictTopology,
			p.csiNodeLister,
			p.nodeLister)
		if err != nil {
			return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
		}
		req.AccessibilityRequirements = requirements
	}

	klog.V(5).Infof("CreateVolumeRequest %+v", req)

	rep := &csi.CreateVolumeResponse{}

	// Resolve provision secret credentials.
	provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
	if err != nil {
		return nil, controller.ProvisioningNoChange, err
	}
	provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef)
	if err != nil {
		return nil, controller.ProvisioningNoChange, err
	}
	req.Secrets = provisionerCredentials

	// Resolve controller publish, node stage, node publish secret references
	controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
	if err != nil {
		return nil, controller.ProvisioningNoChange, err
	}
	nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
	if err != nil {
		return nil, controller.ProvisioningNoChange, err
	}
	nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
	if err != nil {
		return nil, controller.ProvisioningNoChange, err
	}
	controllerExpandSecretRef, err := getSecretReference(controllerExpandSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
	if err != nil {
		return nil, controller.ProvisioningNoChange, err
	}

	req.Parameters, err = removePrefixedParameters(options.StorageClass.Parameters)
	if err != nil {
		return nil, controller.ProvisioningFinished, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
	}

	if p.extraCreateMetadata {
		// add pvc and pv metadata to request for use by the plugin
		req.Parameters[pvcNameKey] = options.PVC.GetName()
		req.Parameters[pvcNamespaceKey] = options.PVC.GetNamespace()
		req.Parameters[pvNameKey] = pvName
	}

	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()
	rep, err = p.csiClient.CreateVolume(ctx, &req)

	if err != nil {
		// Giving up after an error and telling the pod scheduler to retry with a different node
		// only makes sense if:
		// - The CSI driver supports topology: without that, the next CreateVolume call after
		//   rescheduling will be exactly the same.
		// - We are working on a volume with late binding: only in that case will
		//   provisioning be retried if we give up for now.
		// - The error is one where rescheduling is
		//   a) allowed (i.e. we don't have to keep calling CreateVolume because the operation might be running) and
		//   b) it makes sense (typically local resource exhausted).
		//   isFinalError is going to check this.
		//
		// We do this regardless whether the driver has asked for strict topology because
		// even drivers which did not ask for it explicitly might still only look at the first
		// topology entry and thus succeed after rescheduling.
		mayReschedule := p.supportsTopology() &&
			options.SelectedNode != nil
		state := checkError(err, mayReschedule)
		klog.V(5).Infof("CreateVolume failed, supports topology = %v, node selected %v => may reschedule = %v => state = %v: %v",
			p.supportsTopology(),
			options.SelectedNode != nil,
			mayReschedule,
			state,
			err)
		return nil, state, err
	}

	if rep.Volume != nil {
		klog.V(3).Infof("create volume rep: %+v", *rep.Volume)
	}
	volumeAttributes := map[string]string{provisionerIDKey: p.identity}
	for k, v := range rep.Volume.VolumeContext {
		volumeAttributes[k] = v
	}
	respCap := rep.GetVolume().GetCapacityBytes()

	//According to CSI spec CreateVolume should be able to return capacity = 0, which means it is unknown. for example NFS/FTP
	if respCap == 0 {
		respCap = volSizeBytes
		klog.V(3).Infof("csiClient response volume with size 0, which is not supported by apiServer, will use claim size:%d", respCap)
	} else if respCap < volSizeBytes {
		capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
		delReq := &csi.DeleteVolumeRequest{
			VolumeId: rep.GetVolume().GetVolumeId(),
		}
		err = cleanupVolume(p, delReq, provisionerCredentials)
		if err != nil {
			capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
		}
		// use InBackground to retry the call, hoping the volume is deleted correctly next time.
		return nil, controller.ProvisioningInBackground, capErr
	}

	if options.PVC.Spec.DataSource != nil {
		contentSource := rep.GetVolume().ContentSource
		if contentSource == nil {
			sourceErr := fmt.Errorf("volume content source missing")
			delReq := &csi.DeleteVolumeRequest{
				VolumeId: rep.GetVolume().GetVolumeId(),
			}
			err = cleanupVolume(p, delReq, provisionerCredentials)
			if err != nil {
				sourceErr = fmt.Errorf("%v. cleanup of volume %s failed, volume is orphaned: %v", sourceErr, pvName, err)
			}
			return nil, controller.ProvisioningInBackground, sourceErr
		}
	}

	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: pvName,
		},
		Spec: v1.PersistentVolumeSpec{
			AccessModes:  options.PVC.Spec.AccessModes,
			MountOptions: options.StorageClass.MountOptions,
			Capacity: v1.ResourceList{
				v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
			},
			// TODO wait for CSI VolumeSource API
			PersistentVolumeSource: v1.PersistentVolumeSource{
				CSI: &v1.CSIPersistentVolumeSource{
					Driver:                     p.driverName,
					VolumeHandle:               p.volumeIdToHandle(rep.Volume.VolumeId),
					VolumeAttributes:           volumeAttributes,
					ControllerPublishSecretRef: controllerPublishSecretRef,
					NodeStageSecretRef:         nodeStageSecretRef,
					NodePublishSecretRef:       nodePublishSecretRef,
					ControllerExpandSecretRef:  controllerExpandSecretRef,
				},
			},
		},
	}

	if options.StorageClass.ReclaimPolicy != nil {
		pv.Spec.PersistentVolumeReclaimPolicy = *options.StorageClass.ReclaimPolicy
	}

	if p.supportsTopology() {
		pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
	}

	// Set VolumeMode to PV if it is passed via PVC spec when Block feature is enabled
	if options.PVC.Spec.VolumeMode != nil {
		pv.Spec.VolumeMode = options.PVC.Spec.VolumeMode
	}
	// Set FSType if PV is not Block Volume
	if !util.CheckPersistentVolumeClaimModeBlock(options.PVC) {
		pv.Spec.PersistentVolumeSource.CSI.FSType = fsType
	}

	klog.V(2).Infof("successfully created PV %v for PVC %v and csi volume name %v", pv.Name, options.PVC.Name, pv.Spec.CSI.VolumeHandle)

	if migratedVolume {
		pv, err = p.translator.TranslateCSIPVToInTree(pv)
		if err != nil {
			klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err)
			deleteErr := p.Delete(pv)
			if deleteErr != nil {
				klog.Warningf("failed to delete partly provisioned PV: %v", deleteErr)
				// Retry the call again to clean up the orphan
				return nil, controller.ProvisioningInBackground, err
			}
			return nil, controller.ProvisioningFinished, err
		}
	}

	klog.V(5).Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)
	return pv, controller.ProvisioningFinished, nil
}

2.ctrl.runVolumeWorker

A number of goroutines equal to ctrl.threadiness is started, each running ctrl.runVolumeWorker.

It is mainly responsible for handling the volumeQueue, i.e. the add and update events of pv objects, and calling the DeleteVolume method of the csi component to delete storage and delete pv objects as needed.

        for i := 0; i < ctrl.threadiness; i++ {
			go wait.Until(ctrl.runClaimWorker, time.Second, context.TODO().Done())
			go wait.Until(ctrl.runVolumeWorker, time.Second, context.TODO().Done())
		}
// vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller/controller.go

func (ctrl *ProvisionController) runVolumeWorker() {
    // Loop calling processNextVolumeWorkItem until it returns false
	for ctrl.processNextVolumeWorkItem() {
	}
}

Call chain: main() --> provisionController.Run() --> ctrl.runVolumeWorker() --> ctrl.processNextVolumeWorkItem() --> ctrl.syncVolumeHandler() --> ctrl.syncVolume() --> ctrl.deleteVolumeOperation() --> ctrl.provisioner.Delete()

2.1 ctrl.processNextVolumeWorkItem

Main logic:
(1) Get a pv key from the volumeQueue;
(2) Call ctrl.syncVolumeHandler for further processing;
(3) After successful processing, reset the rateLimiter for the pv;
(4) If processing fails, retry a certain number of times, i.e. re-add the pv through the rateLimiter;
(5) Finally, regardless of whether ctrl.syncVolumeHandler succeeds, mark the pv as done in the volumeQueue.

The key step is the call to ctrl.syncVolumeHandler.

// processNextVolumeWorkItem processes items from volumeQueue
func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {
    // Get pv from volumeQueue
	obj, shutdown := ctrl.volumeQueue.Get()

	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
	    // Regardless of whether ctrl.syncVolumeHandler succeeds, mark the pv as done in the volumeQueue
		defer ctrl.volumeQueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			ctrl.volumeQueue.Forget(obj)
			return fmt.Errorf("expected string in workqueue but got %#v", obj)
		}
        
        // Call ctrl.syncVolumeHandler for further processing
		if err := ctrl.syncVolumeHandler(key); err != nil {
		    // If processing fails, retry up to the threshold by re-adding the pv through the rateLimiter
			if ctrl.failedDeleteThreshold == 0 {
				glog.Warningf("Retrying syncing volume %q, failure %v", key, ctrl.volumeQueue.NumRequeues(obj))
				ctrl.volumeQueue.AddRateLimited(obj)
			} else if ctrl.volumeQueue.NumRequeues(obj) < ctrl.failedDeleteThreshold {
				glog.Warningf("Retrying syncing volume %q because failures %v < threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
				ctrl.volumeQueue.AddRateLimited(obj)
			} else {
				glog.Errorf("Giving up syncing volume %q because failures %v >= threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
				// Done but do not Forget: it will not be in the queue but NumRequeues
				// will be saved until the obj is deleted from kubernetes
			}
			return fmt.Errorf("error syncing volume %q: %s", key, err.Error())
		}
        
        // After successful processing, clean up the rateLimiter of the pv
		ctrl.volumeQueue.Forget(obj)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}

2.1.1 ctrl.syncVolumeHandler

It mainly calls ctrl.syncVolume.

// syncVolumeHandler gets the volume from informer's cache then calls syncVolume
func (ctrl *ProvisionController) syncVolumeHandler(key string) error {
	volumeObj, exists, err := ctrl.volumes.GetByKey(key)
	if err != nil {
		return err
	}
	if !exists {
		utilruntime.HandleError(fmt.Errorf("volume %q in work queue no longer exists", key))
		return nil
	}

	return ctrl.syncVolume(volumeObj)
}
2.1.1.1 ctrl.syncVolume

Main logic:
(1) Call ctrl.shouldDelete to judge whether the pv should be deleted;
(2) If it should be deleted, call ctrl.deleteVolumeOperation.

// syncVolume checks if the volume should be deleted and deletes if so
func (ctrl *ProvisionController) syncVolume(obj interface{}) error {
	volume, ok := obj.(*v1.PersistentVolume)
	if !ok {
		return fmt.Errorf("expected volume but got %+v", obj)
	}

	if ctrl.shouldDelete(volume) {
		startTime := time.Now()
		err := ctrl.deleteVolumeOperation(volume)
		ctrl.updateDeleteStats(volume, err, startTime)
		return err
	}
	return nil
}

ctrl.shouldDelete main logic:
(1) Judge whether the provisioner allows the deletion action;
(2) On kubernetes 1.9+, perform PV-protection related checks (return false when the DeletionTimestamp of the pv object is not empty, which indicates that deletion of the underlying storage and of the pv object has already been triggered);
(3) On kubernetes 1.5+, return false when the pv is not in the Released state (on 1.4, Failed is also accepted);
(4) Return false if the reclaim policy of the pv is not Delete;
(5) Return false if the pv does not carry the dynamically-provisioned annotation pointing at this provisioner (a pv that passes all these checks is illustrated below).
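
For illustration, a pv shaped like the hypothetical object below (all values are examples) passes these checks: no DeletionTimestamp, phase Released, reclaim policy Delete, and the provisioned-by annotation naming this provisioner:

package main

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// examplePV is a hypothetical pv (illustrative values only) that shouldDelete accepts.
var examplePV = &v1.PersistentVolume{
	ObjectMeta: metav1.ObjectMeta{
		Name: "pvc-2f0ab0c6-example",
		Annotations: map[string]string{
			// annDynamicallyProvisioned must match this provisioner's name
			"pv.kubernetes.io/provisioned-by": "rbd.csi.ceph.com",
		},
		// DeletionTimestamp is nil: deletion has not been triggered yet
	},
	Spec: v1.PersistentVolumeSpec{
		PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
	},
	Status: v1.PersistentVolumeStatus{
		Phase: v1.VolumeReleased,
	},
}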

// shouldDelete returns whether a volume should have its backing volume
// deleted, i.e. whether a Delete is "desired"
func (ctrl *ProvisionController) shouldDelete(volume *v1.PersistentVolume) bool {
    // Check whether the provisioner allows the deletion
	if deletionGuard, ok := ctrl.provisioner.(DeletionGuard); ok {
		if !deletionGuard.ShouldDelete(volume) {
			return false
		}
	}

	// In 1.9+ PV protection means the object will exist briefly with a
	// deletion timestamp even after our successful Delete. Ignore it.
	if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.9.0")) {
		if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) && volume.ObjectMeta.DeletionTimestamp != nil {
			return false
		} else if volume.ObjectMeta.DeletionTimestamp != nil {
			return false
		}
	}

	// In 1.5+ we delete only if the volume is in state Released. In 1.4 we must
	// delete if the volume is in state Failed too.
	if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.5.0")) {
		if volume.Status.Phase != v1.VolumeReleased {
			return false
		}
	} else {
		if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
			return false
		}
	}
    
    // Return false if the reclaim policy of the pv is not Delete
	if volume.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete {
		return false
	}

	if !metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) {
		return false
	}

	ann := volume.Annotations[annDynamicallyProvisioned]
	migratedTo := volume.Annotations[annMigratedTo]
	if ann != ctrl.provisionerName && migratedTo != ctrl.provisionerName {
		return false
	}

	return true
}
2.1.1.2 ctrl.deleteVolumeOperation

Main logic:
(1) Call ctrl.provisioner.Delete to delete the underlying storage (for ceph-csi RBD, the rbd image);
(2) Delete the pv object from the apiserver;
(3) Handle the external-provisioner finalizer.

// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
	operation := fmt.Sprintf("delete %q", volume.Name)
	glog.Info(logOperation(operation, "started"))

	// This method may have been waiting for a volume lock for some time.
	// Our check does not have to be as sophisticated as PV controller's, we can
	// trust that the PV controller has set the PV to Released/Failed and it's
	// ours to delete
	newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
	if err != nil {
		return nil
	}
	if !ctrl.shouldDelete(newVolume) {
		glog.Info(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
		return nil
	}

	err = ctrl.provisioner.Delete(volume)
	if err != nil {
		if ierr, ok := err.(*IgnoredError); ok {
			// Delete ignored, do nothing and hope another provisioner will delete it.
			glog.Info(logOperation(operation, "volume deletion ignored: %v", ierr))
			return nil
		}
		// Delete failed, emit an event.
		glog.Error(logOperation(operation, "volume deletion failed: %v", err))
		ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
		return err
	}

	glog.Info(logOperation(operation, "volume deleted"))

	// Delete the volume
	if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
		// Oops, could not delete the volume and therefore the controller will
		// try to delete the volume again on next update.
		glog.Info(logOperation(operation, "failed to delete persistentvolume: %v", err))
		return err
	}

	if ctrl.addFinalizer {
		if len(newVolume.ObjectMeta.Finalizers) > 0 {
			// Remove external-provisioner finalizer

			// need to get the pv again because the delete has updated the object with a deletion timestamp
			newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
			if err != nil {
				// If the volume is not found return, otherwise error
				if !apierrs.IsNotFound(err) {
					glog.Info(logOperation(operation, "failed to get persistentvolume to update finalizer: %v", err))
					return err
				}
				return nil
			}
			finalizers := make([]string, 0)
			for _, finalizer := range newVolume.ObjectMeta.Finalizers {
				if finalizer != finalizerPV {
					finalizers = append(finalizers, finalizer)
				}
			}

			// Only update the finalizers if we actually removed something
			if len(finalizers) != len(newVolume.ObjectMeta.Finalizers) {
				newVolume.ObjectMeta.Finalizers = finalizers
				if _, err = ctrl.client.CoreV1().PersistentVolumes().Update(newVolume); err != nil {
					if !apierrs.IsNotFound(err) {
						// Couldn't remove finalizer and the object still exists, the controller may
						// try to remove the finalizer again on the next update
						glog.Info(logOperation(operation, "failed to remove finalizer for persistentvolume: %v", err))
						return err
					}
				}
			}
		}
	}

	glog.Info(logOperation(operation, "persistentvolume deleted"))

	glog.Info(logOperation(operation, "succeeded"))
	return nil
}

ctrl.provisioner.Delete deletes the storage by calling the DeleteVolume method of the ceph-csi component.

Main logic:
(1) Get the volumeId;
(2) Build the DeleteVolumeRequest structure;
(3) Obtain the secret object required for deleting the storage from the apiserver and store its credentials in the DeleteVolumeRequest structure;
(4) Call p.csiClient.DeleteVolume (that is, the DeleteVolume method of ceph-csi) to delete the storage.

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
	if volume == nil {
		return fmt.Errorf("invalid CSI PV")
	}

	var err error
	if p.translator.IsPVMigratable(volume) {
		// we end up here only if CSI migration is enabled in-tree (both overall
		// and for the specific plugin that is migratable) causing in-tree PV
		// controller to yield deletion of PVs with in-tree source to external provisioner
		// based on AnnDynamicallyProvisioned annotation.
		volume, err = p.translator.TranslateInTreePVToCSI(volume)
		if err != nil {
			return err
		}
	}

	if volume.Spec.CSI == nil {
		return fmt.Errorf("invalid CSI PV")
	}

	volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

	rc := &requiredCapabilities{}
	if err := p.checkDriverCapabilities(rc); err != nil {
		return err
	}

	req := csi.DeleteVolumeRequest{
		VolumeId: volumeId,
	}

	// get secrets if StorageClass specifies it
	storageClassName := util.GetPersistentVolumeClass(volume)
	if len(storageClassName) != 0 {
		if storageClass, err := p.scLister.Get(storageClassName); err == nil {

			// edit by zhongjialiang
			// issue: https://git-sa.nie.netease.com/venice/ceph-csi/issues/85
			secretParams := provisionerSecretParams

			if isRbdRequest(storageClass.Parameters) {
				secretParams = deleteSecretParams
			}

			// Resolve provision secret credentials.
			provisionerSecretRef, err := getSecretReference(secretParams, storageClass.Parameters, volume.Name, &v1.PersistentVolumeClaim{
				ObjectMeta: metav1.ObjectMeta{
					Name:      volume.Spec.ClaimRef.Name,
					Namespace: volume.Spec.ClaimRef.Namespace,
				},
			})
			if err != nil {
				return fmt.Errorf("failed to get secretreference for volume %s: %v", volume.Name, err)
			}

			credentials, err := getCredentials(p.client, provisionerSecretRef)
			if err != nil {
				// Continue with deletion, as the secret may have already been deleted.
				klog.Errorf("Failed to get credentials for volume %s: %s", volume.Name, err.Error())
			}
			req.Secrets = credentials
		} else {
			klog.Warningf("failed to get storageclass: %s, proceeding to delete without secrets. %v", storageClassName, err)
		}
	}
	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
	defer cancel()

	_, err = p.csiClient.DeleteVolume(ctx, &req)

	return err
}

This concludes the analysis of the provisionController in external-provisioner. A brief summary follows.

summary

The provisionController is mainly responsible for two things: handling the claimQueue (i.e. the add and update events of pvc objects), calling the CreateVolume method of the ceph-csi component to create storage and creating pv objects as needed; and handling the volumeQueue (i.e. the add and update events of pv objects), deciding according to the status and reclaim policy of the pv whether to call the DeleteVolume method of the ceph-csi component to delete the storage and then delete the pv object.
