Ceph CSI source code analysis: RBD driver controller server analysis

Ceph CSI source code analysis (4): RBD driver controller server analysis

When the ceph-csi component is started with the driver type rbd, the rbd driver services are started. Then, depending on the controllerserver and nodeserver parameters, it starts either the ControllerServer and IdentityServer, or the NodeServer and IdentityServer.

Based on tag v3.0.0

https://github.com/ceph/ceph-csi/releases/tag/v3.0.0

The rbd driver analysis is divided into four parts: service entry analysis, controller server analysis, node server analysis, and identity server analysis.

This part analyzes the controller server. The controller server mainly implements the CreateVolume (create storage), DeleteVolume (delete storage), ControllerExpandVolume (expand storage), CreateSnapshot (create a storage snapshot), and DeleteSnapshot (delete a storage snapshot) operations. The analysis here focuses on CreateVolume, DeleteVolume, and ControllerExpandVolume.

Controller server analysis

(1) CreateVolume

Brief introduction

Calls the Ceph storage backend to create storage (an rbd image).

CreateVolume creates the volume in backend.

General steps:
(1) Check whether the driver supports creating storage. If not, return an error directly;
(2) Build the Ceph credentials and generate the volume ID;
(3) Call the Ceph storage backend to create the storage.

Three sources of creation:
(1) Copy from existing image;
(2) Create an image based on the snapshot;
(3) Create a new image directly.
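
The choice between these three sources can be sketched as a small dispatch on the request's content source. The type and function names below are hypothetical; they only mirror the order of the checks that createBackingImage performs (snapshot first, then parent image, then a plain create) and are not ceph-csi APIs.

```go
package main

import "fmt"

// contentSource is a hypothetical stand-in for the CSI volume content source.
// At most one of the two fields is expected to be set.
type contentSource struct {
	SnapshotID     string // set when the volume is restored from a snapshot
	ParentVolumeID string // set when the volume is cloned from an existing volume
}

// creationPath reports which of the three creation paths a request would take.
func creationPath(src contentSource) string {
	switch {
	case src.SnapshotID != "":
		return "create from snapshot"
	case src.ParentVolumeID != "":
		return "clone from existing image"
	default:
		return "create new image"
	}
}

func main() {
	fmt.Println(creationPath(contentSource{SnapshotID: "snap-1"}))
	fmt.Println(creationPath(contentSource{ParentVolumeID: "vol-1"}))
	fmt.Println(creationPath(contentSource{}))
}
```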

CreateVolume

Main process:
(1) Check whether the driver supports creating storage. If not, return an error directly;
(2) Build the Ceph credentials from the secret (the secret is passed in by the external-provisioner component);
(3) Convert the request parameters into an rbdVol structure;
(4) Connect to the Ceph cluster;
(5) Generate RbdImageName and ReservedID, and generate the volume ID;
(6) Call createBackingImage to create the image.
(The analysis of cloning from an existing image and of creating an image from a snapshot is skipped for now.)
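
Besides the numbered steps, CreateVolume takes a per-name lock through cs.VolumeLocks.TryAcquire and returns an Aborted error when another operation on the same name is already in flight. The following is a minimal sketch of such a non-blocking name lock using only the standard library. The nameLocks type is hypothetical and is not the ceph-csi implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// nameLocks is a hypothetical stand-in for the VolumeLocks helper:
// a set of names that currently have an operation in flight.
type nameLocks struct {
	mu   sync.Mutex
	held map[string]struct{}
}

func newNameLocks() *nameLocks {
	return &nameLocks{held: make(map[string]struct{})}
}

// TryAcquire returns false immediately, without blocking, if the name is already locked.
func (l *nameLocks) TryAcquire(name string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, busy := l.held[name]; busy {
		return false
	}
	l.held[name] = struct{}{}
	return true
}

// Release frees the name so that a retried request can proceed.
func (l *nameLocks) Release(name string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.held, name)
}

func main() {
	locks := newNameLocks()
	fmt.Println(locks.TryAcquire("pvc-a")) // first request wins
	fmt.Println(locks.TryAcquire("pvc-a")) // concurrent duplicate is rejected
	locks.Release("pvc-a")
	fmt.Println(locks.TryAcquire("pvc-a")) // a retry after release succeeds
}
```

Rejecting the duplicate instead of blocking lets the caller retry later, which fits the retry behaviour expected of CSI sidecars.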

//ceph-csi/internal/rbd/controllerserver.go

func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
	// (1) Check request parameters;
	if err := cs.validateVolumeReq(ctx, req); err != nil {
		return nil, err
	}

	// (2) Build the Ceph credentials from the secret;
	cr, err := util.NewUserCredentials(req.GetSecrets())
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	defer cr.DeleteCredentials()
    
	// (3) Convert the request parameters into an rbdVol structure;
	rbdVol, err := cs.parseVolCreateRequest(ctx, req)
	if err != nil {
		return nil, err
	}
	defer rbdVol.Destroy()
	// Existence and conflict checks
	if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
		klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), req.GetName())
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
	}
	defer cs.VolumeLocks.Release(req.GetName())
    
	// (4) Connect to the Ceph cluster;
	err = rbdVol.Connect(cr)
	if err != nil {
		klog.Errorf(util.Log(ctx, "failed to connect to volume %v: %v"), rbdVol.RbdImageName, err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	parentVol, rbdSnap, err := checkContentSource(ctx, req, cr)
	if err != nil {
		return nil, err
	}

	found, err := rbdVol.Exists(ctx, parentVol)
	if err != nil {
		return nil, getGRPCErrorForCreateVolume(err)
	}
	if found {
		if rbdSnap != nil {
			// check if image depth is reached limit and requires flatten
			err = checkFlatten(ctx, rbdVol, cr)
			if err != nil {
				return nil, err
			}
		}
		return buildCreateVolumeResponse(ctx, req, rbdVol)
	}

	err = validateRequestedVolumeSize(rbdVol, parentVol, rbdSnap, cr)
	if err != nil {
		return nil, err
	}

	err = flattenParentImage(ctx, parentVol, cr)
	if err != nil {
		return nil, err
	}
    
	// (5) Generate the rbd image name and the volume ID;
	err = reserveVol(ctx, rbdVol, rbdSnap, cr)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	defer func() {
		if err != nil {
			if !errors.Is(err, ErrFlattenInProgress) {
				errDefer := undoVolReservation(ctx, rbdVol, cr)
				if errDefer != nil {
					klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer)
				}
			}
		}
	}()
    
	// (6) Call createBackingImage to create the image.
	err = cs.createBackingImage(ctx, cr, rbdVol, parentVol, rbdSnap)
	if err != nil {
		if errors.Is(err, ErrFlattenInProgress) {
			return nil, status.Error(codes.Aborted, err.Error())
		}
		return nil, err
	}

	volumeContext := req.GetParameters()
	volumeContext["pool"] = rbdVol.Pool
	volumeContext["journalPool"] = rbdVol.JournalPool
	volumeContext["imageName"] = rbdVol.RbdImageName
	volume := &csi.Volume{
		VolumeId:      rbdVol.VolID,
		CapacityBytes: rbdVol.VolSize,
		VolumeContext: volumeContext,
		ContentSource: req.GetVolumeContentSource(),
	}
	if rbdVol.Topology != nil {
		volume.AccessibleTopology =
			[]*csi.Topology{
				{
					Segments: rbdVol.Topology,
				},
			}
	}
	return &csi.CreateVolumeResponse{Volume: volume}, nil
}

1 reserveVol

Generates RbdImageName and ReservedID, then generates the volume ID. It calls j.ReserveName to obtain the image name and util.GenerateVolID to generate the volume ID.

//ceph-csi/internal/rbd/rbd_journal.go

// reserveVol is a helper routine to request a rbdVolume name reservation and generate the
// volume ID for the generated name.
func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
	var (
		err error
	)

	err = updateTopologyConstraints(rbdVol, rbdSnap)
	if err != nil {
		return err
	}

	journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
	if err != nil {
		return err
	}

	kmsID := ""
	if rbdVol.Encrypted {
		kmsID = rbdVol.KMS.GetID()
	}

	j, err := volJournal.Connect(rbdVol.Monitors, cr)
	if err != nil {
		return err
	}
	defer j.Destroy()
    
	// Generate the rbd image name
	rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
		ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID,
		rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
	if err != nil {
		return err
	}
    
	// Generate the volume ID
	rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
		rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
	if err != nil {
		return err
	}

	util.DebugLog(ctx, "generated Volume ID (%s) and image name (%s) for request name (%s)",
		rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)

	return nil
}

1.1 GenerateVolID

Calls vi.ComposeCSIID() to generate the volume ID.

//ceph-csi/internal/util/util.go

func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, locationID int64, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
	var err error

	if locationID == InvalidPoolID {
		locationID, err = GetPoolID(monitors, cr, pool)
		if err != nil {
			return "", err
		}
	}

	// generate the volume ID to return to the CO system
	vi := CSIIdentifier{
		LocationID:      locationID,
		EncodingVersion: volIDVersion,
		ClusterID:       clusterID,
		ObjectUUID:      objUUID,
	}

	volID, err := vi.ComposeCSIID()

	return volID, err
}

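vi.ComposeCSIID() generates the volume ID by joining csi_id_version, the length of the clusterID, the clusterID, the poolID, and the ObjectUUID with '-' separators. The fixed-width fields and separators account for 64 bytes; the clusterID adds its own length on top of that (36 bytes for a UUID-style clusterID, giving 100 bytes in total).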

//ceph-csi/internal/util/volid.go

/*
ComposeCSIID composes a CSI ID from passed in parameters.
Version 1 of the encoding scheme is as follows,
	[csi_id_version=1:4byte] + [-:1byte]
	[length of clusterID=1:4byte] + [-:1byte]
	[clusterID:36bytes (MAX)] + [-:1byte]
	[poolID:16bytes] + [-:1byte]
	[ObjectUUID:36bytes]

	Total of constant field lengths, including '-' field separators would hence be,
	4+1+4+1+1+16+1+36 = 64
*/
func (ci CSIIdentifier) ComposeCSIID() (string, error) {
	buf16 := make([]byte, 2)
	buf64 := make([]byte, 8)

	if (knownFieldSize + len(ci.ClusterID)) > maxVolIDLen {
		return "", errors.New("CSI ID encoding length overflow")
	}

	if len(ci.ObjectUUID) != uuidSize {
		return "", errors.New("CSI ID invalid object uuid")
	}

	binary.BigEndian.PutUint16(buf16, ci.EncodingVersion)
	versionEncodedHex := hex.EncodeToString(buf16)

	binary.BigEndian.PutUint16(buf16, uint16(len(ci.ClusterID)))
	clusterIDLength := hex.EncodeToString(buf16)

	binary.BigEndian.PutUint64(buf64, uint64(ci.LocationID))
	poolIDEncodedHex := hex.EncodeToString(buf64)

	return strings.Join([]string{versionEncodedHex, clusterIDLength, ci.ClusterID,
		poolIDEncodedHex, ci.ObjectUUID}, "-"), nil
}

2 createBackingImage

Main process:
(1) Call createImage to create the image;
(2) Call j.StoreImageID to store the image ID in the omap.
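
createBackingImage also registers a deferred cleanup: if a step after the image creation fails, the image is deleted again, except when the error is ErrFlattenInProgress. The sketch below illustrates that defer-based rollback idiom in isolation. The function and variable names are hypothetical, and the sketch uses a named return value for clarity.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels for illustration. errFlattenInProgress mirrors
// ErrFlattenInProgress: the image must be kept while a flatten is running.
var (
	errFlattenInProgress = errors.New("flatten in progress")
	errStoreFailed       = errors.New("omap write failed")
)

// createWithRollback runs create and then store. If store fails, the deferred
// function calls cleanup, unless the error says a flatten is in progress.
func createWithRollback(create, store, cleanup func() error) (err error) {
	if err = create(); err != nil {
		return err // nothing was created, so there is nothing to roll back
	}
	defer func() {
		if err != nil && !errors.Is(err, errFlattenInProgress) {
			if cerr := cleanup(); cerr != nil {
				fmt.Println("cleanup failed:", cerr)
			}
		}
	}()
	err = store()
	return err
}

func main() {
	ok := func() error { return nil }
	fail := func() error { return errStoreFailed }
	cleaned := false
	cleanup := func() error { cleaned = true; return nil }

	err := createWithRollback(ok, fail, cleanup)
	fmt.Println("error:", err, "rolled back:", cleaned)
}
```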

//ceph-csi/internal/rbd/controllerserver.go

func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnapshot) error {
	var err error

	var j = &journal.Connection{}
	j, err = volJournal.Connect(rbdVol.Monitors, cr)
	if err != nil {
		return status.Error(codes.Internal, err.Error())
	}
	defer j.Destroy()

	// nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
	if rbdSnap != nil {
		if err = cs.OperationLocks.GetRestoreLock(rbdSnap.SnapID); err != nil {
			klog.Error(util.Log(ctx, err.Error()))
			return status.Error(codes.Aborted, err.Error())
		}
		defer cs.OperationLocks.ReleaseRestoreLock(rbdSnap.SnapID)

		err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID)
		if err != nil {
			return err
		}
		util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName)
	} else if parentVol != nil {
		if err = cs.OperationLocks.GetCloneLock(parentVol.VolID); err != nil {
			klog.Error(util.Log(ctx, err.Error()))
			return status.Error(codes.Aborted, err.Error())
		}
		defer cs.OperationLocks.ReleaseCloneLock(parentVol.VolID)
		return rbdVol.createCloneFromImage(ctx, parentVol)
	} else {
		err = createImage(ctx, rbdVol, cr)
		if err != nil {
			klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err)
			return status.Error(codes.Internal, err.Error())
		}
	}

	util.DebugLog(ctx, "created volume %s backed by image %s", rbdVol.RequestName, rbdVol.RbdImageName)

	defer func() {
		if err != nil {
			if !errors.Is(err, ErrFlattenInProgress) {
				if deleteErr := deleteImage(ctx, rbdVol, cr); deleteErr != nil {
					klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), rbdVol, deleteErr)
				}
			}
		}
	}()
	err = rbdVol.getImageID()
	if err != nil {
		klog.Errorf(util.Log(ctx, "failed to get volume id %s: %v"), rbdVol, err)
		return status.Error(codes.Internal, err.Error())
	}

	err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID, cr)
	if err != nil {
		klog.Errorf(util.Log(ctx, "failed to reserve volume %s: %v"), rbdVol, err)
		return status.Error(codes.Internal, err.Error())
	}

	if rbdSnap != nil {
		err = rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
		if err != nil {
			klog.Errorf(util.Log(ctx, "failed to flatten image %s: %v"), rbdVol, err)
			return err
		}
	}
	if rbdVol.Encrypted {
		err = rbdVol.ensureEncryptionMetadataSet(rbdImageRequiresEncryption)
		if err != nil {
			klog.Errorf(util.Log(ctx, "failed to save encryption status, deleting image %s: %s"),
				rbdVol, err)
			return status.Error(codes.Internal, err.Error())
		}
	}
	return nil
}

2.1 StoreImageID

Calls setOMapKeys to store the ImageID in the omap of the object named after the ReservedID.

//ceph-csi/internal/journal/voljournal.go

// StoreImageID stores the image ID in omap.
func (conn *Connection) StoreImageID(ctx context.Context, pool, reservedUUID, imageID string, cr *util.Credentials) error {
	err := setOMapKeys(ctx, conn, pool, conn.config.namespace, conn.config.cephUUIDDirectoryPrefix+reservedUUID,
		map[string]string{conn.config.csiImageIDKey: imageID})
	if err != nil {
		return err
	}
	return nil
}

//ceph-csi/internal/journal/omap.go

func setOMapKeys(
	ctx context.Context,
	conn *Connection,
	poolName, namespace, oid string, pairs map[string]string) error {
	// fetch and configure the rados ioctx
	ioctx, err := conn.conn.GetIoctx(poolName)
	if err != nil {
		return omapPoolError(err)
	}
	defer ioctx.Destroy()

	if namespace != "" {
		ioctx.SetNamespace(namespace)
	}

	bpairs := make(map[string][]byte, len(pairs))
	for k, v := range pairs {
		bpairs[k] = []byte(v)
	}
	err = ioctx.SetOmap(oid, bpairs)
	if err != nil {
		klog.Errorf(
			util.Log(ctx, "failed setting omap keys (pool=%q, namespace=%q, name=%q, pairs=%+v): %v"),
			poolName, namespace, oid, pairs, err)
		return err
	}
	util.DebugLog(ctx, "set omap keys (pool=%q, namespace=%q, name=%q): %+v)",
		poolName, namespace, oid, pairs)
	return nil
}

imageName / volUUID (ReservedID) / VolumeID example

apiVersion: v1
kind: PersistentVolume
metadata:
  annotations:
    pv.kubernetes.io/provisioned-by: rbd.csi.ceph.com
  creationTimestamp: "2020-08-31T08:23:14Z"
  finalizers:
  - kubernetes.io/pv-protection
  name: pvc-c61bb423-609b-49c0-bbd7-e3db1ad7ba8d
  resourceVersion: "66339437"
  selfLink: /api/v1/persistentvolumes/pvc-c61bb423-609b-49c0-bbd7-e3db1ad7ba8d
  uid: 37f1cbde-deaa-4daf-a8f0-4e61bef47a61
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 1Gi
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    name: zjl-test2
    namespace: test
    resourceVersion: "66339424"
    uid: c61bb423-609b-49c0-bbd7-e3db1ad7ba8d
  csi:
    controllerExpandSecretRef:
      name: csi-rbd-secret
      namespace: test
    driver: rbd.csi.ceph.com
    fsType: ext4
    nodeStageSecretRef:
      name: csi-rbd-secret
      namespace: test
    volumeAttributes:
      clusterID: 0bba3be9-0a1c-41db-a619-26ffea20161e
      imageFeatures: layering
      imageName: csi-vol-36eaa70e-eb63-11ea-8e79-246e96907f74
      journalPool: kubernetes
      pool: kubernetes
      storage.kubernetes.io/csiProvisionerIdentity: 1598236777786-8081-rbd.csi.ceph.com
    volumeHandle: 0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-36eaa70e-eb63-11ea-8e79-246e96907f74
  mountOptions:
  - discard
  persistentVolumeReclaimPolicy: Delete
  storageClassName: csi-rbd-sc
  volumeMode: Filesystem
status:
  phase: Bound

imageName: csi-vol-36eaa70e-eb63-11ea-8e79-246e96907f74
volUUID/ReservedID: 36eaa70e-eb63-11ea-8e79-246e96907f74
VolumeID: 0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-36eaa70e-eb63-11ea-8e79-246e96907f74
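
To make the relationship between these three values concrete, the following sketch decodes a version-1 volume ID back into its fields, relying on the fixed field widths documented in ComposeCSIID. The csiID type and decodeCSIID function are hypothetical helpers written for this illustration; they are not the decoder used by ceph-csi.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// csiID holds the fields of a version-1 volume ID (hypothetical type).
type csiID struct {
	Version    uint16
	ClusterID  string
	PoolID     int64
	ObjectUUID string
}

// decodeCSIID splits "<version>-<clusterID length>-<clusterID>-<poolID>-<uuid>".
func decodeCSIID(id string) (csiID, error) {
	var out csiID
	// Fixed prefix: 4 hex digits, '-', 4 hex digits, '-'.
	if len(id) < 10 || id[4] != '-' || id[9] != '-' {
		return out, errors.New("malformed volume ID prefix")
	}
	version, err := strconv.ParseUint(id[0:4], 16, 16)
	if err != nil {
		return out, err
	}
	clusterLen, err := strconv.ParseUint(id[5:9], 16, 16)
	if err != nil {
		return out, err
	}
	n := int(clusterLen)
	// Remaining layout: clusterID, '-', 16 hex digits, '-', 36-byte UUID.
	if len(id) != 10+n+1+16+1+36 {
		return out, errors.New("volume ID length does not match encoded clusterID length")
	}
	poolStart := 10 + n + 1
	pool, err := strconv.ParseUint(id[poolStart:poolStart+16], 16, 64)
	if err != nil {
		return out, err
	}
	out = csiID{
		Version:    uint16(version),
		ClusterID:  id[10 : 10+n],
		PoolID:     int64(pool),
		ObjectUUID: id[poolStart+17:],
	}
	return out, nil
}

func main() {
	id := "0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-36eaa70e-eb63-11ea-8e79-246e96907f74"
	parsed, err := decodeCSIID(id)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", parsed)
}
```

For the volumeHandle above, 0024 is the hexadecimal length (36) of the clusterID, 0000000000000004 is pool ID 4, and the trailing UUID is the ReservedID that also appears in the image name after the csi-vol- prefix.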

setOMap(ReservedID/ImageID)

Example ceph-csi component log

Action: CreateVolume
Source: deployment csi-rbdplugin-provisioner, container csi-rbdplugin

I0827 07:24:54.293449       1 utils.go:159] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 GRPC call: /csi.v1.Controller/CreateVolume
I0827 07:24:54.296490       1 utils.go:160] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 GRPC request: {"capacity_range":{"required_bytes":1073741824},"name":"pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423","parameters":{"clusterID":"0bba3be9-0a1c-41db-a619-26ffea20161e","imageFeatures":"layering","pool":"kubernetes"},"secrets":"***stripped***","volume_capabilities":[{"AccessType":{"Mount":{"fs_type":"ext4","mount_flags":["discard"]}},"access_mode":{"mode":1}}]}
I0827 07:24:54.296666       1 rbd_util.go:722] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 setting disableInUseChecks on rbd volume to: false
I0827 07:24:54.298803       1 omap.go:74] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 got omap values: (pool="kubernetes", namespace="", name="csi.volumes.default"): map[]
I0827 07:24:54.332458       1 omap.go:140] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 set omap keys (pool="kubernetes", namespace="", name="csi.volumes.default"): map[csi.volume.pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423:670cc2a2-e836-11ea-8e79-246e96907f74])
I0827 07:24:54.342465       1 omap.go:140] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 set omap keys (pool="kubernetes", namespace="", name="csi.volume.670cc2a2-e836-11ea-8e79-246e96907f74"): map[csi.imagename:csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74 csi.volname:pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423])
I0827 07:24:54.342511       1 rbd_journal.go:445] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 generated Volume ID (0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-670cc2a2-e836-11ea-8e79-246e96907f74) and image name (csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74) for request name (pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423)
I0827 07:24:54.342594       1 rbd_util.go:191] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 rbd: create kubernetes/csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74 size 1024M (features: [layering]) using mon 10.248.32.13:6789,10.248.32.14:6789,10.248.32.15:6789
I0827 07:24:54.395928       1 controllerserver.go:446] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 created volume pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 backed by image csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74
I0827 07:24:54.479605       1 omap.go:140] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 set omap keys (pool="kubernetes", namespace="", name="csi.volume.670cc2a2-e836-11ea-8e79-246e96907f74"): map[csi.imageid:e5832a10d20c])
I0827 07:24:54.481188       1 utils.go:165] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 GRPC response: {"volume":{"capacity_bytes":1073741824,"volume_context":{"clusterID":"0bba3be9-0a1c-41db-a619-26ffea20161e","imageFeatures":"layering","imageName":"csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74","journalPool":"kubernetes","pool":"kubernetes"},"volume_id":"0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-670cc2a2-e836-11ea-8e79-246e96907f74"}}
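
The omap writes visible in this log can be modeled with nested maps: a directory object (csi.volumes.default) maps the request name to the reserved UUID, and a per-volume object (csi.volume.<UUID>) holds the image name, the request name and, once the image exists, the image ID. The sketch below is an in-memory model only. The omapStore type and its methods are hypothetical, and returning the existing UUID for a repeated request name is an assumption based on the lookup that precedes the first write in the log.

```go
package main

import "fmt"

// omapStore is a hypothetical in-memory model: object name -> key -> value.
type omapStore map[string]map[string]string

// set merges pairs into the omap of the given object, creating it if needed.
func (s omapStore) set(oid string, pairs map[string]string) {
	if s[oid] == nil {
		s[oid] = make(map[string]string)
	}
	for k, v := range pairs {
		s[oid][k] = v
	}
}

const (
	directory = "csi.volumes.default" // maps request names to reserved UUIDs
	volPrefix = "csi.volume."         // prefix of directory keys and per-volume objects
)

// reserve records requestName -> uuid plus the per-volume metadata, unless the
// name is already reserved, in which case the existing UUID is returned.
func (s omapStore) reserve(requestName, uuid string) string {
	if existing, ok := s[directory][volPrefix+requestName]; ok {
		return existing
	}
	s.set(directory, map[string]string{volPrefix + requestName: uuid})
	s.set(volPrefix+uuid, map[string]string{
		"csi.imagename": "csi-vol-" + uuid,
		"csi.volname":   requestName,
	})
	return uuid
}

// storeImageID adds the image ID once the rbd image exists.
func (s omapStore) storeImageID(uuid, imageID string) {
	s.set(volPrefix+uuid, map[string]string{"csi.imageid": imageID})
}

func main() {
	store := omapStore{}
	uuid := store.reserve("pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423", "670cc2a2-e836-11ea-8e79-246e96907f74")
	store.storeImageID(uuid, "e5832a10d20c")
	fmt.Println(store[directory])
	fmt.Println(store[volPrefix+uuid])
}
```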

(2) DeleteVolume

Brief introduction

Calls the Ceph storage backend to delete storage (an rbd image).
DeleteVolume deletes the volume in backend and removes the volume metadata from store.

General steps:
(1) Check whether the driver supports deleting storage. If not, return an error directly;
(2) Determine whether the image is still in use (whether it has watchers);
(3) If it is not in use, call the Ceph storage backend to delete the image.

DeleteVolume

Main process:
(1) Check whether the driver supports deleting storage. If not, return an error directly;
(2) Build the Ceph credentials from the secret (the secret is passed in by the external-provisioner component);
(3) Obtain the volumeID from the request parameters;
(4) Convert the request parameters into an rbdVol structure;
(5) Determine whether the rbd image is still in use. If it is, return an error;
(6) Delete the temporary rbd image (the temporary image left by a clone operation);
(7) Delete the rbd image.
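
Step (4) is also where DeleteVolume handles repeated or partially completed deletions: depending on which lookup error genVolFromVolID returns, the call reports success, cleans up the leftover omap reservation, or fails. The sketch below summarizes that classification. The sentinel errors and the deleteOutcome type are hypothetical stand-ins for util.ErrPoolNotFound, util.ErrKeyNotFound and ErrImageNotFound.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinels mirroring the errors checked in DeleteVolume.
var (
	errPoolNotFound  = errors.New("pool not found")
	errKeyNotFound   = errors.New("key not found")
	errImageNotFound = errors.New("image not found")
	errBackend       = errors.New("connection refused") // any other failure
)

// deleteOutcome describes how a lookup failure is handled.
type deleteOutcome int

const (
	alreadyDeleted deleteOutcome = iota // report success, nothing is left to do
	cleanJournal                        // image is gone, remove the leftover omap reservation
	failInternal                        // unexpected error, return it to the caller
)

// classifyLookupError maps a lookup error to the action DeleteVolume takes.
func classifyLookupError(err error) deleteOutcome {
	switch {
	case errors.Is(err, errPoolNotFound), errors.Is(err, errKeyNotFound):
		return alreadyDeleted
	case errors.Is(err, errImageNotFound):
		return cleanJournal
	default:
		return failInternal
	}
}

func main() {
	// errors.Is also matches sentinels that were wrapped with %w.
	wrapped := fmt.Errorf("looking up volume: %w", errImageNotFound)
	fmt.Println(classifyLookupError(wrapped) == cleanJournal)
	fmt.Println(classifyLookupError(errBackend) == failInternal)
}
```

Treating "already gone" as success is what makes a retried DeleteVolume call safe.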

//ceph-csi/internal/rbd/controllerserver.go

func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
	// (1) Check request parameters;
	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
		klog.Errorf(util.Log(ctx, "invalid delete volume req: %v"), protosanitizer.StripSecrets(req))
		return nil, err
	}
    
	// (2) Build the Ceph credentials from the secret;
	cr, err := util.NewUserCredentials(req.GetSecrets())
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	defer cr.DeleteCredentials()
    
	// (3) Obtain the volumeID from the request parameters;
	// For now the image get unconditionally deleted, but here retention policy can be checked
	volumeID := req.GetVolumeId()
	if volumeID == "" {
		return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
	}

	if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
		klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
	}
	defer cs.VolumeLocks.Release(volumeID)

	// lock out volumeID for clone and expand operation
	if err = cs.OperationLocks.GetDeleteLock(volumeID); err != nil {
		klog.Error(util.Log(ctx, err.Error()))
		return nil, status.Error(codes.Aborted, err.Error())
	}
	defer cs.OperationLocks.ReleaseDeleteLock(volumeID)

	rbdVol := &rbdVolume{}
	defer rbdVol.Destroy()
    
	// (4) Convert the request parameters into an rbdVol structure;
	rbdVol, err = genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
	if err != nil {
		if errors.Is(err, util.ErrPoolNotFound) {
			klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err)
			return &csi.DeleteVolumeResponse{}, nil
		}

		// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
		// or partially complete (image and imageOMap are garbage collected already), hence return
		// success as deletion is complete
		if errors.Is(err, util.ErrKeyNotFound) {
			klog.Warningf(util.Log(ctx, "Failed to volume options for %s: %v"), volumeID, err)
			return &csi.DeleteVolumeResponse{}, nil
		}

		// All errors other than ErrImageNotFound should return an error back to the caller
		if !errors.Is(err, ErrImageNotFound) {
			return nil, status.Error(codes.Internal, err.Error())
		}

		// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
		// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
		// unreserve for the same
		if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
			klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
			return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
		}
		defer cs.VolumeLocks.Release(rbdVol.RequestName)

		if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
			return nil, status.Error(codes.Internal, err.Error())
		}
		return &csi.DeleteVolumeResponse{}, nil
	}
	defer rbdVol.Destroy()

	// lock out parallel create requests against the same volume name as we
	// cleanup the image and associated omaps for the same
	if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
		klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
	}
	defer cs.VolumeLocks.Release(rbdVol.RequestName)
    
	// (5) Determine whether the rbd image is still in use. If it is, return an error;
	inUse, err := rbdVol.isInUse()
	if err != nil {
		klog.Errorf(util.Log(ctx, "failed getting information for image (%s): (%s)"), rbdVol, err)
		return nil, status.Error(codes.Internal, err.Error())
	}
	if inUse {
		klog.Errorf(util.Log(ctx, "rbd %s is still being used"), rbdVol)
		return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName)
	}
    
	// (6) Delete the temporary rbd image;
	// delete the temporary rbd image created as part of volume clone during
	// create volume
	tempClone := rbdVol.generateTempClone()
	err = deleteImage(ctx, tempClone, cr)
	if err != nil {
		// return error if it is not ErrImageNotFound
		if !errors.Is(err, ErrImageNotFound) {
			klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"),
				tempClone, err)
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	// (7) Delete the rbd image
	// Deleting rbd image
	util.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName)
	if err = deleteImage(ctx, rbdVol, cr); err != nil {
		klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"),
			rbdVol, err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
		klog.Errorf(util.Log(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)"),
			rbdVol.RequestName, rbdVol.RbdImageName, err)
		return nil, status.Error(codes.Internal, err.Error())
	}

	if rbdVol.Encrypted {
		if err = rbdVol.KMS.DeletePassphrase(rbdVol.VolID); err != nil {
			klog.Warningf(util.Log(ctx, "failed to clean the passphrase for volume %s: %s"), rbdVol.VolID, err)
		}
	}

	return &csi.DeleteVolumeResponse{}, nil
}

isInUse

Determines whether the image is in use by listing its watchers. Because isInUse opens the image itself, one watcher is always present; it returns false when that is the only watcher and true otherwise.

//ceph-csi/internal/rbd/rbd_util.go

func (rv *rbdVolume) isInUse() (bool, error) {
	image, err := rv.open()
	if err != nil {
		if errors.Is(err, ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
			return false, err
		}
		// any error should assume something else is using the image
		return true, err
	}
	defer image.Close()

	watchers, err := image.ListWatchers()
	if err != nil {
		return false, err
	}

	// because we opened the image, there is at least one watcher
	return len(watchers) != 1, nil
}
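
The watcher rule can be isolated in a few lines. This is a sketch only; the inUse function and the watcher names are made up for illustration, and the real code obtains the list from image.ListWatchers().

```go
package main

import "fmt"

// inUse applies the watcher rule: the caller that opened the image to list
// watchers is itself one watcher, so exactly one watcher means "not in use".
func inUse(watchers []string) bool {
	return len(watchers) != 1
}

func main() {
	fmt.Println(inUse([]string{"self"}))           // false: only our own handle
	fmt.Println(inUse([]string{"self", "node-a"})) // true: another client still has the image open
}
```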

(3) ControllerExpandVolume

Brief introduction

Calls the Ceph storage backend to expand storage (an rbd image).

ControllerExpandVolume expand RBD Volumes on demand based on resizer request.

General steps:
(1) Check whether the driver supports storage expansion. If not, return an error directly;
(2) Call the Ceph storage backend to expand the image.

Storage expansion actually takes two steps. First, the external-resizer component calls the ceph-csi instance running as the rbd controllerserver to perform ControllerExpandVolume, which expands the underlying storage. Second, kubelet calls the ceph-csi instance running as the rbd nodeserver to perform NodeExpandVolume, which propagates the new size of the underlying rbd image to the rbd/nbd device and grows the xfs/ext file system (when the volumeMode is Block, no node-side expansion is required).

ControllerExpandVolume

Main process:
(1) Check whether the driver supports storage expansion. If not, return an error directly;
(2) Build the Ceph credentials from the secret (the secret is passed in by the external-resizer component);
(3) Convert the request parameters into an rbdVol structure;
(4) Call rbdVol.resize to expand the image.
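
The decision made before step (4) can be sketched as follows: the requested size is rounded up, and the image is resized only if the rounded size is larger than the current one. The rounding rule below (whole MiB below 1 GiB, whole GiB otherwise) is an assumption made for this sketch. The code comment only says the request is rounded up "to the nearest MiB/GiB", so util.RoundOffBytes may behave differently in detail. All function names are hypothetical.

```go
package main

import "fmt"

const (
	miB int64 = 1 << 20
	giB int64 = 1 << 30
)

// roundUp rounds n up to a multiple of unit.
func roundUp(n, unit int64) int64 {
	return (n + unit - 1) / unit * unit
}

// roundRequest is an assumed rounding rule: MiB granularity below 1 GiB,
// GiB granularity otherwise.
func roundRequest(bytes int64) int64 {
	if bytes < giB {
		return roundUp(bytes, miB)
	}
	return roundUp(bytes, giB)
}

// planExpand returns the size to report and whether a backend resize (and
// therefore a node-side expansion) is needed. Shrinking is never attempted.
func planExpand(currentSize, requested int64) (newSize int64, resize bool) {
	target := roundRequest(requested)
	if currentSize < target {
		return target, true
	}
	return currentSize, false
}

func main() {
	size, resize := planExpand(1*giB, 2*giB)
	fmt.Println(size, resize)

	size, resize = planExpand(2*giB, 1*giB)
	fmt.Println(size, resize)
}
```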

func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
	if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
		klog.Errorf(util.Log(ctx, "invalid expand volume req: %v"), protosanitizer.StripSecrets(req))
		return nil, err
	}

	volID := req.GetVolumeId()
	if volID == "" {
		return nil, status.Error(codes.InvalidArgument, "volume ID cannot be empty")
	}

	capRange := req.GetCapacityRange()
	if capRange == nil {
		return nil, status.Error(codes.InvalidArgument, "capacityRange cannot be empty")
	}

	// lock out parallel requests against the same volume ID
	if acquired := cs.VolumeLocks.TryAcquire(volID); !acquired {
		klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
	}
	defer cs.VolumeLocks.Release(volID)

	// lock out volumeID for clone and delete operation
	if err := cs.OperationLocks.GetExpandLock(volID); err != nil {
		klog.Error(util.Log(ctx, err.Error()))
		return nil, status.Error(codes.Aborted, err.Error())
	}
	defer cs.OperationLocks.ReleaseExpandLock(volID)

	cr, err := util.NewUserCredentials(req.GetSecrets())
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}
	defer cr.DeleteCredentials()

	rbdVol := &rbdVolume{}
	defer rbdVol.Destroy()

	rbdVol, err = genVolFromVolID(ctx, volID, cr, req.GetSecrets())
	if err != nil {
		// nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
		if errors.Is(err, ErrImageNotFound) {
			err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
		} else if errors.Is(err, util.ErrPoolNotFound) {
			klog.Errorf(util.Log(ctx, "failed to get backend volume for %s: %v"), volID, err)
			err = status.Errorf(codes.NotFound, err.Error())
		} else {
			err = status.Errorf(codes.Internal, err.Error())
		}
		return nil, err
	}
	defer rbdVol.Destroy()

	if rbdVol.Encrypted {
		return nil, status.Errorf(codes.InvalidArgument, "encrypted volumes do not support resize (%s)",
			rbdVol)
	}

	// always round up the request size in bytes to the nearest MiB/GiB
	volSize := util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes())

	// resize volume if required
	nodeExpansion := false
	if rbdVol.VolSize < volSize {
		util.DebugLog(ctx, "rbd volume %s size is %v,resizing to %v", rbdVol, rbdVol.VolSize, volSize)
		rbdVol.VolSize = volSize
		nodeExpansion = true
		err = rbdVol.resize(ctx, cr)
		if err != nil {
			klog.Errorf(util.Log(ctx, "failed to resize rbd image: %s with error: %v"), rbdVol, err)
			return nil, status.Error(codes.Internal, err.Error())
		}
	}

	return &csi.ControllerExpandVolumeResponse{
		CapacityBytes:         rbdVol.VolSize,
		NodeExpansionRequired: nodeExpansion,
	}, nil
}

rbdVol.resize

Assembles the rbd resize command and executes it.

func (rv *rbdVolume) resize(ctx context.Context, cr *util.Credentials) error {
	mon := rv.Monitors
	volSzMiB := fmt.Sprintf("%dM", util.RoundOffVolSize(rv.VolSize))

	args := []string{"resize", rv.String(), "--size", volSzMiB, "--id", cr.ID, "-m", mon, "--keyfile=" + cr.KeyFile}
	_, stderr, err := util.ExecCommand(ctx, "rbd", args...)

	if err != nil {
		return fmt.Errorf("failed to resize rbd image (%w), command output: %s", err, stderr)
	}

	return nil
}
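
The argument list built by resize can be reproduced without running the rbd binary. In the sketch below, resizeArgs is a hypothetical helper, the image spec, user ID, monitor address and key file path are placeholder values, and rounding the size up to whole MiB is an assumption about what util.RoundOffVolSize does.

```go
package main

import (
	"fmt"
	"strings"
)

const miB int64 = 1 << 20

// resizeArgs builds the argument list for "rbd resize". The size is rounded up
// to whole MiB here, which is an assumption about util.RoundOffVolSize.
func resizeArgs(image string, sizeBytes int64, id, monitors, keyFile string) []string {
	sizeMiB := (sizeBytes + miB - 1) / miB
	return []string{
		"resize", image,
		"--size", fmt.Sprintf("%dM", sizeMiB),
		"--id", id,
		"-m", monitors,
		"--keyfile=" + keyFile,
	}
}

func main() {
	// Placeholder values; nothing is executed, the command line is only printed.
	args := resizeArgs("kubernetes/csi-vol-example", 2<<30, "admin", "10.0.0.1:6789", "/tmp/keyfile")
	fmt.Println("rbd " + strings.Join(args, " "))
}
```

Passing the arguments as a slice rather than as one shell string means no shell quoting is involved when the command is eventually executed.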

This completes the analysis of the RBD driver controller server. A summary follows.

RBD driver controller server analysis summary

(1) The functions of CreateVolume, DeleteVolume, and ControllerExpandVolume are as follows:

CreateVolume: calls the Ceph storage backend to create storage (an rbd image).
DeleteVolume: calls the Ceph storage backend to delete storage (an rbd image).
ControllerExpandVolume: calls the Ceph storage backend to expand storage (an rbd image).

(2) Storage expansion takes two steps. First, the external-resizer component calls the ceph-csi instance running as the rbd controllerserver to perform ControllerExpandVolume, which expands the underlying storage. Second, kubelet calls the ceph-csi instance running as the rbd nodeserver to perform NodeExpandVolume, which propagates the new size of the underlying rbd image to the rbd/nbd device and grows the xfs/ext file system (when the volumeMode is Block, no node-side expansion is required).

Keywords: Ceph Kubernetes source code source code analysis

Added by Sharif on Sun, 20 Feb 2022 01:44:50 +0200