Ceph CSI source code analysis (4) - RBD driver controller server analysis
When the Ceph CSI component is started with the driver type set to rbd, the rbd driver related services are started. Then, according to the controllerserver and nodeserver parameter configuration, it decides whether to start the ControllerServer together with the IdentityServer, or the NodeServer together with the IdentityServer.
Based on tag v3.0.0
https://github.com/ceph/ceph-csi/releases/tag/v3.0.0
The rbd driver analysis is divided into four parts: service entry analysis, ControllerServer analysis, NodeServer analysis and IdentityServer analysis.
This section analyzes the ControllerServer. The ControllerServer mainly implements the CreateVolume (create storage), DeleteVolume (delete storage), ControllerExpandVolume (expand storage), CreateSnapshot (create storage snapshot) and DeleteSnapshot (delete storage snapshot) operations. Here, CreateVolume, DeleteVolume and ControllerExpandVolume are analyzed.
Controller server analysis
(1)CreateVolume
Brief introduction
Calls the Ceph storage backend to create storage (an rbd image).
CreateVolume creates the volume in the backend.
General steps:
(1) Check whether the driver supports creating storage; if not, return an error directly;
(2) Build the Ceph request credentials and generate the volume ID;
(3) Call the Ceph storage backend to create the storage.
Three sources of creation (see the sketch after this list for how the request distinguishes them):
(1) Clone from an existing image;
(2) Create an image from a snapshot;
(3) Create a new image directly.
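To make the dispatch concrete, here is a minimal sketch of how the three cases show up in the CSI CreateVolumeRequest. The helper name inspectContentSource is invented for this illustration; in ceph-csi the equivalent check is done by checkContentSource in the code below.

// Illustrative only: shows how a CSI CreateVolumeRequest encodes the three
// creation sources. inspectContentSource is a hypothetical helper; ceph-csi
// implements this dispatch in checkContentSource.
package main

import (
    "fmt"

    "github.com/container-storage-interface/spec/lib/go/csi"
)

func inspectContentSource(req *csi.CreateVolumeRequest) string {
    src := req.GetVolumeContentSource()
    switch {
    case src.GetSnapshot() != nil:
        // (2) create an image from a snapshot
        return "from snapshot " + src.GetSnapshot().GetSnapshotId()
    case src.GetVolume() != nil:
        // (1) clone from an existing image (volume)
        return "clone of volume " + src.GetVolume().GetVolumeId()
    default:
        // (3) create a new image directly
        return "new empty image"
    }
}

func main() {
    req := &csi.CreateVolumeRequest{
        Name: "pvc-example",
        VolumeContentSource: &csi.VolumeContentSource{
            Type: &csi.VolumeContentSource_Snapshot{
                Snapshot: &csi.VolumeContentSource_SnapshotSource{SnapshotId: "snap-id"},
            },
        },
    }
    fmt.Println(inspectContentSource(req)) // prints: from snapshot snap-id
}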
CreateVolume
Main process:
(1) Check whether the driver supports creating storage; if not, return an error directly;
(2) Build the Ceph request credentials from the secret (the secret is passed in by the external-provisioner component);
(3) Convert the request parameters into an rbdVolume structure;
(4) Create a connection to the Ceph cluster;
(5) Generate the RbdImageName and ReservedID, and generate the volume ID;
(6) Call createBackingImage to create the image.
(The analysis of cloning from an existing image and creating an image from a snapshot is skipped for now.)
//ceph-csi/internal/rbd/controllerserver.go
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    // (1) Check request parameters
    if err := cs.validateVolumeReq(ctx, req); err != nil {
        return nil, err
    }

    // (2) Build ceph request credentials from the secret
    cr, err := util.NewUserCredentials(req.GetSecrets())
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    defer cr.DeleteCredentials()

    // (3) Convert the request parameters into an rbdVolume structure
    rbdVol, err := cs.parseVolCreateRequest(ctx, req)
    if err != nil {
        return nil, err
    }
    defer rbdVol.Destroy()

    // Existence and conflict checks
    if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
        klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), req.GetName())
        return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
    }
    defer cs.VolumeLocks.Release(req.GetName())

    // (4) Create a connection with the Ceph cluster
    err = rbdVol.Connect(cr)
    if err != nil {
        klog.Errorf(util.Log(ctx, "failed to connect to volume %v: %v"), rbdVol.RbdImageName, err)
        return nil, status.Error(codes.Internal, err.Error())
    }

    parentVol, rbdSnap, err := checkContentSource(ctx, req, cr)
    if err != nil {
        return nil, err
    }

    found, err := rbdVol.Exists(ctx, parentVol)
    if err != nil {
        return nil, getGRPCErrorForCreateVolume(err)
    }
    if found {
        if rbdSnap != nil {
            // check if image depth is reached limit and requires flatten
            err = checkFlatten(ctx, rbdVol, cr)
            if err != nil {
                return nil, err
            }
        }
        return buildCreateVolumeResponse(ctx, req, rbdVol)
    }

    err = validateRequestedVolumeSize(rbdVol, parentVol, rbdSnap, cr)
    if err != nil {
        return nil, err
    }

    err = flattenParentImage(ctx, parentVol, cr)
    if err != nil {
        return nil, err
    }

    // (5) Generate rbd image name and volume ID
    err = reserveVol(ctx, rbdVol, rbdSnap, cr)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }

    defer func() {
        if err != nil {
            if !errors.Is(err, ErrFlattenInProgress) {
                errDefer := undoVolReservation(ctx, rbdVol, cr)
                if errDefer != nil {
                    klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), req.GetName(), errDefer)
                }
            }
        }
    }()

    // (6) Call createBackingImage to create the image
    err = cs.createBackingImage(ctx, cr, rbdVol, parentVol, rbdSnap)
    if err != nil {
        if errors.Is(err, ErrFlattenInProgress) {
            return nil, status.Error(codes.Aborted, err.Error())
        }
        return nil, err
    }

    volumeContext := req.GetParameters()
    volumeContext["pool"] = rbdVol.Pool
    volumeContext["journalPool"] = rbdVol.JournalPool
    volumeContext["imageName"] = rbdVol.RbdImageName
    volume := &csi.Volume{
        VolumeId:      rbdVol.VolID,
        CapacityBytes: rbdVol.VolSize,
        VolumeContext: volumeContext,
        ContentSource: req.GetVolumeContentSource(),
    }
    if rbdVol.Topology != nil {
        volume.AccessibleTopology = []*csi.Topology{
            {
                Segments: rbdVol.Topology,
            },
        }
    }
    return &csi.CreateVolumeResponse{Volume: volume}, nil
}
1 reserveVol
reserveVol is used to generate the RbdImageName and ReservedID, and to generate the volume ID. It calls j.ReserveName to obtain the image name, and util.GenerateVolID to generate the volume ID.
//ceph-csi/internal/rbd/rbd_journal.go
// reserveVol is a helper routine to request a rbdVolume name reservation and generate the
// volume ID for the generated name.
func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
    var (
        err error
    )

    err = updateTopologyConstraints(rbdVol, rbdSnap)
    if err != nil {
        return err
    }

    journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
    if err != nil {
        return err
    }

    kmsID := ""
    if rbdVol.Encrypted {
        kmsID = rbdVol.KMS.GetID()
    }

    j, err := volJournal.Connect(rbdVol.Monitors, cr)
    if err != nil {
        return err
    }
    defer j.Destroy()

    // Generate rbd image name
    rbdVol.ReservedID, rbdVol.RbdImageName, err = j.ReserveName(
        ctx, rbdVol.JournalPool, journalPoolID, rbdVol.Pool, imagePoolID,
        rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
    if err != nil {
        return err
    }

    // Generate volume ID
    rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
        rbdVol.ClusterID, rbdVol.ReservedID, volIDVersion)
    if err != nil {
        return err
    }

    util.DebugLog(ctx, "generated Volume ID (%s) and image name (%s) for request name (%s)",
        rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)

    return nil
}
1.1 GenerateVolID
Calls vi.ComposeCSIID() to generate the volume ID.
//ceph-csi/internal/util/util.go
func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, locationID int64, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
    var err error

    if locationID == InvalidPoolID {
        locationID, err = GetPoolID(monitors, cr, pool)
        if err != nil {
            return "", err
        }
    }

    // generate the volume ID to return to the CO system
    vi := CSIIdentifier{
        LocationID:      locationID,
        EncodingVersion: volIDVersion,
        ClusterID:       clusterID,
        ObjectUUID:      objUUID,
    }

    volID, err := vi.ComposeCSIID()

    return volID, err
}
vi.ComposeCSIID() is the method that generates the volume ID. The ID is composed of csi_id_version, the length of the clusterID, the clusterID, the poolID and the ObjectUUID, joined with '-'; the constant-length fields add up to 64 bytes.
//ceph-csi/internal/util/volid.go
/*
ComposeCSIID composes a CSI ID from passed in parameters.
Version 1 of the encoding scheme is as follows,
    [csi_id_version=1:4byte] + [-:1byte]
    [length of clusterID=1:4byte] + [-:1byte]
    [clusterID:36bytes (MAX)] + [-:1byte]
    [poolID:16bytes] + [-:1byte]
    [ObjectUUID:36bytes]

    Total of constant field lengths, including '-' field separators would hence be,
    4+1+4+1+1+16+1+36 = 64
*/
func (ci CSIIdentifier) ComposeCSIID() (string, error) {
    buf16 := make([]byte, 2)
    buf64 := make([]byte, 8)

    if (knownFieldSize + len(ci.ClusterID)) > maxVolIDLen {
        return "", errors.New("CSI ID encoding length overflow")
    }

    if len(ci.ObjectUUID) != uuidSize {
        return "", errors.New("CSI ID invalid object uuid")
    }

    binary.BigEndian.PutUint16(buf16, ci.EncodingVersion)
    versionEncodedHex := hex.EncodeToString(buf16)

    binary.BigEndian.PutUint16(buf16, uint16(len(ci.ClusterID)))
    clusterIDLength := hex.EncodeToString(buf16)

    binary.BigEndian.PutUint64(buf64, uint64(ci.LocationID))
    poolIDEncodedHex := hex.EncodeToString(buf64)

    return strings.Join([]string{versionEncodedHex, clusterIDLength, ci.ClusterID,
        poolIDEncodedHex, ci.ObjectUUID}, "-"), nil
}
2 createBackingImage
Main process:
(1) Call createImage to create an image;
(2) Call j.StoreImageID to store image ID and other information into omap.
//ceph-csi/internal/rbd/controllerserver.go
func (cs *ControllerServer) createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol, parentVol *rbdVolume, rbdSnap *rbdSnapshot) error {
    var err error

    var j = &journal.Connection{}
    j, err = volJournal.Connect(rbdVol.Monitors, cr)
    if err != nil {
        return status.Error(codes.Internal, err.Error())
    }
    defer j.Destroy()

    // nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
    if rbdSnap != nil {
        if err = cs.OperationLocks.GetRestoreLock(rbdSnap.SnapID); err != nil {
            klog.Error(util.Log(ctx, err.Error()))
            return status.Error(codes.Aborted, err.Error())
        }
        defer cs.OperationLocks.ReleaseRestoreLock(rbdSnap.SnapID)

        err = cs.createVolumeFromSnapshot(ctx, cr, rbdVol, rbdSnap.SnapID)
        if err != nil {
            return err
        }
        util.DebugLog(ctx, "created volume %s from snapshot %s", rbdVol.RequestName, rbdSnap.RbdSnapName)
    } else if parentVol != nil {
        if err = cs.OperationLocks.GetCloneLock(parentVol.VolID); err != nil {
            klog.Error(util.Log(ctx, err.Error()))
            return status.Error(codes.Aborted, err.Error())
        }
        defer cs.OperationLocks.ReleaseCloneLock(parentVol.VolID)
        return rbdVol.createCloneFromImage(ctx, parentVol)
    } else {
        err = createImage(ctx, rbdVol, cr)
        if err != nil {
            klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err)
            return status.Error(codes.Internal, err.Error())
        }
    }

    util.DebugLog(ctx, "created volume %s backed by image %s", rbdVol.RequestName, rbdVol.RbdImageName)

    defer func() {
        if err != nil {
            if !errors.Is(err, ErrFlattenInProgress) {
                if deleteErr := deleteImage(ctx, rbdVol, cr); deleteErr != nil {
                    klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), rbdVol, deleteErr)
                }
            }
        }
    }()
    err = rbdVol.getImageID()
    if err != nil {
        klog.Errorf(util.Log(ctx, "failed to get volume id %s: %v"), rbdVol, err)
        return status.Error(codes.Internal, err.Error())
    }

    err = j.StoreImageID(ctx, rbdVol.JournalPool, rbdVol.ReservedID, rbdVol.ImageID, cr)
    if err != nil {
        klog.Errorf(util.Log(ctx, "failed to reserve volume %s: %v"), rbdVol, err)
        return status.Error(codes.Internal, err.Error())
    }

    if rbdSnap != nil {
        err = rbdVol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
        if err != nil {
            klog.Errorf(util.Log(ctx, "failed to flatten image %s: %v"), rbdVol, err)
            return err
        }
    }
    if rbdVol.Encrypted {
        err = rbdVol.ensureEncryptionMetadataSet(rbdImageRequiresEncryption)
        if err != nil {
            klog.Errorf(util.Log(ctx, "failed to save encryption status, deleting image %s: %s"), rbdVol, err)
            return status.Error(codes.Internal, err.Error())
        }
    }
    return nil
}
2.1 StoreImageID
Calls setOMapKeys to store the ImageID in the omap object named after the ReservedID (csi.volume.<ReservedID>).
//ceph-csi/internal/journal/voljournal.go
// StoreImageID stores the image ID in omap.
func (conn *Connection) StoreImageID(ctx context.Context, pool, reservedUUID, imageID string, cr *util.Credentials) error {
    err := setOMapKeys(ctx, conn, pool, conn.config.namespace, conn.config.cephUUIDDirectoryPrefix+reservedUUID,
        map[string]string{conn.config.csiImageIDKey: imageID})
    if err != nil {
        return err
    }
    return nil
}
//ceph-csi/internal/journal/omap.go
func setOMapKeys(
    ctx context.Context,
    conn *Connection,
    poolName, namespace, oid string,
    pairs map[string]string) error {
    // fetch and configure the rados ioctx
    ioctx, err := conn.conn.GetIoctx(poolName)
    if err != nil {
        return omapPoolError(err)
    }
    defer ioctx.Destroy()

    if namespace != "" {
        ioctx.SetNamespace(namespace)
    }

    bpairs := make(map[string][]byte, len(pairs))
    for k, v := range pairs {
        bpairs[k] = []byte(v)
    }
    err = ioctx.SetOmap(oid, bpairs)
    if err != nil {
        klog.Errorf(
            util.Log(ctx, "failed setting omap keys (pool=%q, namespace=%q, name=%q, pairs=%+v): %v"),
            poolName, namespace, oid, pairs, err)
        return err
    }
    util.DebugLog(ctx, "set omap keys (pool=%q, namespace=%q, name=%q): %+v)",
        poolName, namespace, oid, pairs)
    return nil
}
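As a side note, the stored keys can be read back with a small standalone program based on the go-ceph bindings (which ceph-csi itself uses). This is only a sketch: the pool name, ceph user and object name are example values taken from the log later in this article, and the GetOmapValues call is assumed to be available in the go-ceph version in use.

// Sketch: read back the omap keys that ceph-csi stored for a volume.
// Pool, user and object name below are example values, not something ceph-csi mandates.
package main

import (
    "fmt"

    "github.com/ceph/go-ceph/rados"
)

func main() {
    conn, err := rados.NewConnWithUser("admin") // example ceph user
    if err != nil {
        panic(err)
    }
    if err := conn.ReadDefaultConfigFile(); err != nil { // reads /etc/ceph/ceph.conf
        panic(err)
    }
    if err := conn.Connect(); err != nil {
        panic(err)
    }
    defer conn.Shutdown()

    ioctx, err := conn.OpenIOContext("kubernetes") // example pool
    if err != nil {
        panic(err)
    }
    defer ioctx.Destroy()

    // example object name: csi.volume.<ReservedID>
    oid := "csi.volume.670cc2a2-e836-11ea-8e79-246e96907f74"
    vals, err := ioctx.GetOmapValues(oid, "", "", 10)
    if err != nil {
        panic(err)
    }
    for k, v := range vals {
        // expected keys per the log example: csi.volname, csi.imagename, csi.imageid
        fmt.Printf("%s = %s\n", k, string(v))
    }
}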
imageName/volUUID (reserved ID) / VolumeID example
apiVersion: v1
kind: PersistentVolume
metadata:
  annotations:
    pv.kubernetes.io/provisioned-by: rbd.csi.ceph.com
  creationTimestamp: "2020-08-31T08:23:14Z"
  finalizers:
  - kubernetes.io/pv-protection
  name: pvc-c61bb423-609b-49c0-bbd7-e3db1ad7ba8d
  resourceVersion: "66339437"
  selfLink: /api/v1/persistentvolumes/pvc-c61bb423-609b-49c0-bbd7-e3db1ad7ba8d
  uid: 37f1cbde-deaa-4daf-a8f0-4e61bef47a61
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 1Gi
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    name: zjl-test2
    namespace: test
    resourceVersion: "66339424"
    uid: c61bb423-609b-49c0-bbd7-e3db1ad7ba8d
  csi:
    controllerExpandSecretRef:
      name: csi-rbd-secret
      namespace: test
    driver: rbd.csi.ceph.com
    fsType: ext4
    nodeStageSecretRef:
      name: csi-rbd-secret
      namespace: test
    volumeAttributes:
      clusterID: 0bba3be9-0a1c-41db-a619-26ffea20161e
      imageFeatures: layering
      imageName: csi-vol-36eaa70e-eb63-11ea-8e79-246e96907f74
      journalPool: kubernetes
      pool: kubernetes
      storage.kubernetes.io/csiProvisionerIdentity: 1598236777786-8081-rbd.csi.ceph.com
    volumeHandle: 0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-36eaa70e-eb63-11ea-8e79-246e96907f74
  mountOptions:
  - discard
  persistentVolumeReclaimPolicy: Delete
  storageClassName: csi-rbd-sc
  volumeMode: Filesystem
status:
  phase: Bound
imageName: csi-vol-36eaa70e-eb63-11ea-8e79-246e96907f74
volUUID/ReservedID: 36eaa70e-eb63-11ea-8e79-246e96907f74
VolumeID: 0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-36eaa70e-eb63-11ea-8e79-246e96907f74
setOMap stores the ImageID under the omap object keyed by the ReservedID.
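To make the encoding tangible, the VolumeID above can be split back into its fields. The following throwaway decoder is written just for this article (ceph-csi has its own decoding routine in internal/util/volid.go); it prints encoding version 1, the clusterID, pool ID 4 and the ObjectUUID (the ReservedID).

// Decodes the example VolumeID from the PV above into its fields.
// Standalone illustration; not ceph-csi code.
package main

import (
    "fmt"
    "strconv"
    "strings"
)

func main() {
    volID := "0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-36eaa70e-eb63-11ea-8e79-246e96907f74"

    parts := strings.SplitN(volID, "-", 3)
    version, _ := strconv.ParseUint(parts[0], 16, 16)      // 0001 -> encoding version 1
    clusterIDLen, _ := strconv.ParseUint(parts[1], 16, 16) // 0024 -> 36, length of the clusterID

    rest := parts[2]
    clusterID := rest[:clusterIDLen] // 0bba3be9-0a1c-41db-a619-26ffea20161e
    rest = rest[clusterIDLen+1:]     // skip the '-' separator
    poolIDHex := rest[:16]           // 0000000000000004
    poolID, _ := strconv.ParseUint(poolIDHex, 16, 64)
    objectUUID := rest[17:] // 36eaa70e-eb63-11ea-8e79-246e96907f74 (the ReservedID)

    fmt.Println("encoding version:", version)
    fmt.Println("clusterID:       ", clusterID)
    fmt.Println("poolID:          ", poolID)
    fmt.Println("objectUUID:      ", objectUUID)
}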
Example of Ceph CSI component log
Action: CreateVolume
Source: deployment csi-rbdplugin-provisioner, container csi-rbdplugin
I0827 07:24:54.293449 1 utils.go:159] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 GRPC call: /csi.v1.Controller/CreateVolume
I0827 07:24:54.296490 1 utils.go:160] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 GRPC request: {"capacity_range":{"required_bytes":1073741824},"name":"pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423","parameters":{"clusterID":"0bba3be9-0a1c-41db-a619-26ffea20161e","imageFeatures":"layering","pool":"kubernetes"},"secrets":"***stripped***","volume_capabilities":[{"AccessType":{"Mount":{"fs_type":"ext4","mount_flags":["discard"]}},"access_mode":{"mode":1}}]}
I0827 07:24:54.296666 1 rbd_util.go:722] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 setting disableInUseChecks on rbd volume to: false
I0827 07:24:54.298803 1 omap.go:74] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 got omap values: (pool="kubernetes", namespace="", name="csi.volumes.default"): map[]
I0827 07:24:54.332458 1 omap.go:140] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 set omap keys (pool="kubernetes", namespace="", name="csi.volumes.default"): map[csi.volume.pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423:670cc2a2-e836-11ea-8e79-246e96907f74])
I0827 07:24:54.342465 1 omap.go:140] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 set omap keys (pool="kubernetes", namespace="", name="csi.volume.670cc2a2-e836-11ea-8e79-246e96907f74"): map[csi.imagename:csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74 csi.volname:pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423])
I0827 07:24:54.342511 1 rbd_journal.go:445] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 generated Volume ID (0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-670cc2a2-e836-11ea-8e79-246e96907f74) and image name (csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74) for request name (pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423)
I0827 07:24:54.342594 1 rbd_util.go:191] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 rbd: create kubernetes/csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74 size 1024M (features: [layering]) using mon 10.248.32.13:6789,10.248.32.14:6789,10.248.32.15:6789
I0827 07:24:54.395928 1 controllerserver.go:446] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 created volume pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 backed by image csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74
I0827 07:24:54.479605 1 omap.go:140] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 set omap keys (pool="kubernetes", namespace="", name="csi.volume.670cc2a2-e836-11ea-8e79-246e96907f74"): map[csi.imageid:e5832a10d20c])
I0827 07:24:54.481188 1 utils.go:165] ID: 5013 Req-ID: pvc-6eb10d8d-7c2c-4792-9ed6-df17996b1423 GRPC response: {"volume":{"capacity_bytes":1073741824,"volume_context":{"clusterID":"0bba3be9-0a1c-41db-a619-26ffea20161e","imageFeatures":"layering","imageName":"csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74","journalPool":"kubernetes","pool":"kubernetes"},"volume_id":"0001-0024-0bba3be9-0a1c-41db-a619-26ffea20161e-0000000000000004-670cc2a2-e836-11ea-8e79-246e96907f74"}}
(2)DeleteVolume
Brief introduction
Calls the Ceph storage backend to delete storage (the rbd image).
DeleteVolume deletes the volume in the backend and removes the volume metadata from the store.
General steps:
(1) Check whether the driver supports deleting storage; if not, return an error directly;
(2) Check whether the image is still in use (whether it has watchers);
(3) If it is not in use, call the Ceph storage backend to delete the image.
DeleteVolume
Main process:
(1) Check whether the driver supports deleting storage; if not, return an error directly;
(2) Build the Ceph request credentials from the secret (the secret is passed in by the external-provisioner component);
(3) Obtain the volumeID from the request parameters;
(4) Convert the request parameters into an rbdVolume structure;
(5) Check whether the rbd image is still in use; if it is, return an error;
(6) Delete the temporary rbd image (the intermediate image created by the clone operation);
(7) Delete the rbd image.
//ceph-csi/internal/rbd/controllerserver.go
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    // (1) Check request parameters
    if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
        klog.Errorf(util.Log(ctx, "invalid delete volume req: %v"), protosanitizer.StripSecrets(req))
        return nil, err
    }

    // (2) Build ceph request credentials from the secret
    cr, err := util.NewUserCredentials(req.GetSecrets())
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    defer cr.DeleteCredentials()

    // (3) Obtain the volumeID from the request parameters
    // For now the image get unconditionally deleted, but here retention policy can be checked
    volumeID := req.GetVolumeId()
    if volumeID == "" {
        return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
    }

    if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
        klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
        return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
    }
    defer cs.VolumeLocks.Release(volumeID)

    // lock out volumeID for clone and expand operation
    if err = cs.OperationLocks.GetDeleteLock(volumeID); err != nil {
        klog.Error(util.Log(ctx, err.Error()))
        return nil, status.Error(codes.Aborted, err.Error())
    }
    defer cs.OperationLocks.ReleaseDeleteLock(volumeID)

    rbdVol := &rbdVolume{}
    defer rbdVol.Destroy()

    // (4) Convert the request parameters into an rbdVolume structure
    rbdVol, err = genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
    if err != nil {
        if errors.Is(err, util.ErrPoolNotFound) {
            klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), volumeID, err)
            return &csi.DeleteVolumeResponse{}, nil
        }

        // if error is ErrKeyNotFound, then a previous attempt at deletion was complete
        // or partially complete (image and imageOMap are garbage collected already), hence return
        // success as deletion is complete
        if errors.Is(err, util.ErrKeyNotFound) {
            klog.Warningf(util.Log(ctx, "Failed to volume options for %s: %v"), volumeID, err)
            return &csi.DeleteVolumeResponse{}, nil
        }

        // All errors other than ErrImageNotFound should return an error back to the caller
        if !errors.Is(err, ErrImageNotFound) {
            return nil, status.Error(codes.Internal, err.Error())
        }

        // If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
        // to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
        // unreserve for the same
        if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
            klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
            return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
        }
        defer cs.VolumeLocks.Release(rbdVol.RequestName)

        if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
            return nil, status.Error(codes.Internal, err.Error())
        }
        return &csi.DeleteVolumeResponse{}, nil
    }
    defer rbdVol.Destroy()

    // lock out parallel create requests against the same volume name as we
    // cleanup the image and associated omaps for the same
    if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
        klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
        return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
    }
    defer cs.VolumeLocks.Release(rbdVol.RequestName)

    // (5) Check whether the rbd image is still in use; if it is, return an error
    inUse, err := rbdVol.isInUse()
    if err != nil {
        klog.Errorf(util.Log(ctx, "failed getting information for image (%s): (%s)"), rbdVol, err)
        return nil, status.Error(codes.Internal, err.Error())
    }
    if inUse {
        klog.Errorf(util.Log(ctx, "rbd %s is still being used"), rbdVol)
        return nil, status.Errorf(codes.Internal, "rbd %s is still being used", rbdVol.RbdImageName)
    }

    // (6) Delete the temporary rbd image created as part of volume clone during create volume
    tempClone := rbdVol.generateTempClone()
    err = deleteImage(ctx, tempClone, cr)
    if err != nil {
        // return error if it is not ErrImageNotFound
        if !errors.Is(err, ErrImageNotFound) {
            klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), tempClone, err)
            return nil, status.Error(codes.Internal, err.Error())
        }
    }

    // (7) Delete the rbd image
    util.DebugLog(ctx, "deleting image %s", rbdVol.RbdImageName)
    if err = deleteImage(ctx, rbdVol, cr); err != nil {
        klog.Errorf(util.Log(ctx, "failed to delete rbd image: %s with error: %v"), rbdVol, err)
        return nil, status.Error(codes.Internal, err.Error())
    }

    if err = undoVolReservation(ctx, rbdVol, cr); err != nil {
        klog.Errorf(util.Log(ctx, "failed to remove reservation for volume (%s) with backing image (%s) (%s)"),
            rbdVol.RequestName, rbdVol.RbdImageName, err)
        return nil, status.Error(codes.Internal, err.Error())
    }

    if rbdVol.Encrypted {
        if err = rbdVol.KMS.DeletePassphrase(rbdVol.VolID); err != nil {
            klog.Warningf(util.Log(ctx, "failed to clean the passphrase for volume %s: %s"), rbdVol.VolID, err)
        }
    }

    return &csi.DeleteVolumeResponse{}, nil
}
isInUse
isInUse determines whether the image is in use by listing its watchers. Because opening the image registers a watcher of its own, it returns false when the image has exactly one watcher (the caller itself), and true when there are additional watchers.
//ceph-csi/internal/rbd/rbd_util.go
func (rv *rbdVolume) isInUse() (bool, error) {
    image, err := rv.open()
    if err != nil {
        if errors.Is(err, ErrImageNotFound) || errors.Is(err, util.ErrPoolNotFound) {
            return false, err
        }
        // any error should assume something else is using the image
        return true, err
    }
    defer image.Close()

    watchers, err := image.ListWatchers()
    if err != nil {
        return false, err
    }

    // because we opened the image, there is at least one watcher
    return len(watchers) != 1, nil
}
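For manual troubleshooting, the same watcher check can be reproduced with a small go-ceph program. This is a sketch under the assumption that the go-ceph rados/rbd bindings are available; the ceph user, pool and image name are example values.

// Sketch: check whether an rbd image has external watchers, mirroring the
// isInUse logic above. Pool, image name and ceph user are example values.
package main

import (
    "fmt"

    "github.com/ceph/go-ceph/rados"
    "github.com/ceph/go-ceph/rbd"
)

func main() {
    conn, err := rados.NewConnWithUser("admin") // example ceph user
    if err != nil {
        panic(err)
    }
    if err := conn.ReadDefaultConfigFile(); err != nil {
        panic(err)
    }
    if err := conn.Connect(); err != nil {
        panic(err)
    }
    defer conn.Shutdown()

    ioctx, err := conn.OpenIOContext("kubernetes") // example pool
    if err != nil {
        panic(err)
    }
    defer ioctx.Destroy()

    img, err := rbd.OpenImage(ioctx, "csi-vol-670cc2a2-e836-11ea-8e79-246e96907f74", "") // "" = no snapshot
    if err != nil {
        panic(err)
    }
    defer img.Close()

    watchers, err := img.ListWatchers()
    if err != nil {
        panic(err)
    }
    // opening the image registers one watcher of our own; anything beyond
    // that means some client (e.g. a mapped rbd/nbd device) is using it
    fmt.Printf("watchers: %d, in use by others: %v\n", len(watchers), len(watchers) > 1)
}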
(3)ControllerExpandVolume
Brief introduction
Calls the Ceph storage backend to expand storage (the rbd image).
ControllerExpandVolume expands RBD volumes on demand based on the resizer request.
General steps:
(1) Check whether the driver supports storage expansion; if not, return an error directly;
(2) Call the Ceph storage backend to expand the image.
In fact, storage expansion happens in two steps. In the first step, the external-resizer component calls ControllerExpandVolume on the rbd-type Ceph CSI controller server, which expands the underlying storage. In the second step, kubelet calls NodeExpandVolume on the rbd-type Ceph CSI node server, which propagates the new size of the underlying rbd image to the rbd/nbd device and grows the xfs/ext filesystem (when the volumeMode is block, node-side expansion is not required).
ControllerExpandVolume
Main process:
(1) Check whether the driver supports storage expansion; if not, return an error directly;
(2) Build the Ceph request credentials from the secret (the secret is passed in by the external-resizer component);
(3) Convert the request parameters into an rbdVolume structure;
(4) Call rbdVol.resize to expand the image.
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
    if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
        klog.Errorf(util.Log(ctx, "invalid expand volume req: %v"), protosanitizer.StripSecrets(req))
        return nil, err
    }

    volID := req.GetVolumeId()
    if volID == "" {
        return nil, status.Error(codes.InvalidArgument, "volume ID cannot be empty")
    }

    capRange := req.GetCapacityRange()
    if capRange == nil {
        return nil, status.Error(codes.InvalidArgument, "capacityRange cannot be empty")
    }

    // lock out parallel requests against the same volume ID
    if acquired := cs.VolumeLocks.TryAcquire(volID); !acquired {
        klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
        return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
    }
    defer cs.VolumeLocks.Release(volID)

    // lock out volumeID for clone and delete operation
    if err := cs.OperationLocks.GetExpandLock(volID); err != nil {
        klog.Error(util.Log(ctx, err.Error()))
        return nil, status.Error(codes.Aborted, err.Error())
    }
    defer cs.OperationLocks.ReleaseExpandLock(volID)

    cr, err := util.NewUserCredentials(req.GetSecrets())
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    defer cr.DeleteCredentials()

    rbdVol := &rbdVolume{}
    defer rbdVol.Destroy()

    rbdVol, err = genVolFromVolID(ctx, volID, cr, req.GetSecrets())
    if err != nil {
        // nolint:gocritic // this ifElseChain can not be rewritten to a switch statement
        if errors.Is(err, ErrImageNotFound) {
            err = status.Errorf(codes.NotFound, "volume ID %s not found", volID)
        } else if errors.Is(err, util.ErrPoolNotFound) {
            klog.Errorf(util.Log(ctx, "failed to get backend volume for %s: %v"), volID, err)
            err = status.Errorf(codes.NotFound, err.Error())
        } else {
            err = status.Errorf(codes.Internal, err.Error())
        }
        return nil, err
    }
    defer rbdVol.Destroy()

    if rbdVol.Encrypted {
        return nil, status.Errorf(codes.InvalidArgument, "encrypted volumes do not support resize (%s)", rbdVol)
    }

    // always round up the request size in bytes to the nearest MiB/GiB
    volSize := util.RoundOffBytes(req.GetCapacityRange().GetRequiredBytes())

    // resize volume if required
    nodeExpansion := false
    if rbdVol.VolSize < volSize {
        util.DebugLog(ctx, "rbd volume %s size is %v,resizing to %v", rbdVol, rbdVol.VolSize, volSize)
        rbdVol.VolSize = volSize
        nodeExpansion = true
        err = rbdVol.resize(ctx, cr)
        if err != nil {
            klog.Errorf(util.Log(ctx, "failed to resize rbd image: %s with error: %v"), rbdVol, err)
            return nil, status.Error(codes.Internal, err.Error())
        }
    }

    return &csi.ControllerExpandVolumeResponse{
        CapacityBytes:         rbdVol.VolSize,
        NodeExpansionRequired: nodeExpansion,
    }, nil
}
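Note that the requested size is first rounded up by util.RoundOffBytes. Below is a simplified sketch of that rounding policy, assuming it means "round up to a whole MiB below 1 GiB, otherwise to a whole GiB"; it is an illustration, not the actual util implementation.

// Simplified illustration of the "round up to the nearest MiB/GiB" policy
// applied to requested sizes (assumed behavior; see util.RoundOffBytes in
// ceph-csi for the real implementation).
package main

import "fmt"

const (
    mib = int64(1) << 20
    gib = int64(1) << 30
)

func roundOffBytes(sizeBytes int64) int64 {
    if sizeBytes >= gib {
        // round up to a whole number of GiB
        return ((sizeBytes + gib - 1) / gib) * gib
    }
    // round up to a whole number of MiB
    return ((sizeBytes + mib - 1) / mib) * mib
}

func main() {
    fmt.Println(roundOffBytes(1073741824))  // exactly 1 GiB stays 1 GiB
    fmt.Println(roundOffBytes(1073741825))  // just over 1 GiB -> 2 GiB
    fmt.Println(roundOffBytes(1500 * 1000)) // 1.5 MB -> 2 MiB (2097152 bytes)
}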
rbdVol.resize
Assembles the rbd resize command line and executes it.
func (rv *rbdVolume) resize(ctx context.Context, cr *util.Credentials) error {
    mon := rv.Monitors
    volSzMiB := fmt.Sprintf("%dM", util.RoundOffVolSize(rv.VolSize))

    args := []string{"resize", rv.String(), "--size", volSzMiB, "--id", cr.ID, "-m", mon, "--keyfile=" + cr.KeyFile}
    _, stderr, err := util.ExecCommand(ctx, "rbd", args...)
    if err != nil {
        return fmt.Errorf("failed to resize rbd image (%w), command output: %s", err, stderr)
    }

    return nil
}
This completes the analysis of the RBD driver controller server. A summary follows.
RBD driver controller server analysis summary
(1) The functions of CreateVolume, DeleteVolume and ControllerExpandVolume are as follows:
CreateVolume: call ceph storage backend to create storage (rbd image).
DeleteVolume: call ceph storage backend to delete storage (rbd image).
ControllerExpandVolume: call ceph storage backend to expand storage (rbd image).
(2) Storage expansion happens in two steps. In the first step, the external-resizer component calls ControllerExpandVolume on the rbd-type Ceph CSI controller server, which expands the underlying storage. In the second step, kubelet calls NodeExpandVolume on the rbd-type Ceph CSI node server, which propagates the new size of the underlying rbd image to the rbd/nbd device and grows the xfs/ext filesystem (when the volumeMode is block, node-side expansion is not required).