diff --git a/README.md b/README.md index fbd5e9e..436e15c 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ in the Kubernetes documentation. ## Develop You can follow the [csi spec](https://github.com/container-storage-interface/spec/blob/master/spec.md) -and [curve cli](docs/Curve%20Interface.md). +and [curve interface](docs/curve-interface). ## Setup @@ -33,7 +33,7 @@ and [curve cli](docs/Curve%20Interface.md). 2. Choose a way to deploy the plugin: - Using the helm chart: [helm installation](charts/curve-csi/README.md) -- Using the kubernetes manifests: refer to the specific version in `deploy/kubernetes` +- Using the kubernetes manifests: refer to the specific version in `deploy/manifests` ## Test and User Guide diff --git a/charts/curve-csi/templates/csi-daemonset.yaml b/charts/curve-csi/templates/csi-daemonset.yaml index 869c0ce..e9e970b 100644 --- a/charts/curve-csi/templates/csi-daemonset.yaml +++ b/charts/curve-csi/templates/csi-daemonset.yaml @@ -52,9 +52,6 @@ spec: image: "{{ .Values.nodeplugin.plugin.image }}" args: - --endpoint=$(CSI_ENDPOINT) -{{- if .Values.common.curveVolumeNamingPrefix }} - - "--curve-volume-prefix={{ .Values.common.curveVolumeNamingPrefix }}" -{{- end }} - --drivername=curve.csi.netease.com - --nodeid=$(NODE_ID) {{- if .Values.nodeplugin.debug.enabled }} diff --git a/charts/curve-csi/templates/csi-deployment.yaml b/charts/curve-csi/templates/csi-deployment.yaml index 5b1f563..ed1a387 100644 --- a/charts/curve-csi/templates/csi-deployment.yaml +++ b/charts/curve-csi/templates/csi-deployment.yaml @@ -114,11 +114,9 @@ spec: image: "{{ .Values.controllerplugin.plugin.image }}" args: - --endpoint=$(CSI_ENDPOINT) -{{- if .Values.common.curveVolumeNamingPrefix }} - - "--curve-volume-prefix={{ .Values.common.curveVolumeNamingPrefix }}" -{{- end }} - --drivername=curve.csi.netease.com - --nodeid=$(NODE_ID) + - --snapshot-server={{ .Values.controllerplugin.snapshotServer }} {{- if .Values.controllerplugin.debug.enabled }} - 
"--debug-port={{ .Values.controllerplugin.debug.port }}" {{- end }} diff --git a/charts/curve-csi/values.yaml b/charts/curve-csi/values.yaml index 425474a..fab2ab6 100644 --- a/charts/curve-csi/values.yaml +++ b/charts/curve-csi/values.yaml @@ -28,6 +28,8 @@ nodeplugin: controllerplugin: replicas: 2 + snapshotServer: http://127.0.0.1:5555 + debug: enabled: true port: 9696 @@ -66,7 +68,3 @@ controllerplugin: tolerations: [] affinity: {} - -# common variables -common: - curveVolumeNamingPrefix: "" diff --git a/cmd/curve-csi.go b/cmd/curve-csi.go index b3f6cb3..6d5df8c 100644 --- a/cmd/curve-csi.go +++ b/cmd/curve-csi.go @@ -48,8 +48,8 @@ func init() { flag.BoolVar(&curveConf.IsNodeServer, "node-server", false, "start curve-csi node server") flag.BoolVar(&curveConf.IsControllerServer, "controller-server", false, "start curve-csi controller server") - // curve volume name prefix - flag.StringVar(&curveConf.CurveVolumePrefix, "curve-volume-prefix", "csi-vol-", "curve volume name prefix") + // curve snapshot/clone server + flag.StringVar(&curveConf.SnapshotServer, "snapshot-server", "http://127.0.0.1:5555", "curve snapshot/clone http server address, set empty to disable snapshot") // debug flag.IntVar(&curveConf.DebugPort, "debug-port", 0, "debug port, set 0 to disable") diff --git a/cmd/options/options.go b/cmd/options/options.go index 1f27086..46f1234 100644 --- a/cmd/options/options.go +++ b/cmd/options/options.go @@ -26,8 +26,8 @@ type CurveConf struct { IsControllerServer bool IsNodeServer bool - // curve volume namd prefix - CurveVolumePrefix string + // curve flags + SnapshotServer string // debugs DebugPort int diff --git a/deploy/manifests/provisioner-deploy.yaml b/deploy/manifests/provisioner-deploy.yaml index 8f57982..ea569a2 100644 --- a/deploy/manifests/provisioner-deploy.yaml +++ b/deploy/manifests/provisioner-deploy.yaml @@ -105,6 +105,7 @@ spec: - --endpoint=$(CSI_ENDPOINT) - --drivername=curve.csi.netease.com - --nodeid=$(NODE_ID) + - 
"--snapshot-server=http://127.0.0.1:5555" - --controller-server=true - --debug-port=9696 - --logtostderr=false diff --git a/docs/README.md b/docs/README.md index 972a586..d50c583 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,6 +2,21 @@ This document provides more detail about curve-csi driver. +- [Deploy](#deploy) + - [Requirements](#requirements) + - [Using the helm chart](#using-the-helm-chart) + - [Using the kubernetes manifests](#using-the-kubernetes-manifests) +- [Debug](#debug) +- [Examples](#examples) + - [Create StorageClass](#create-storageclass) + - [Create PersistentVolumeClaim](#create-persistentvolumeclaim) + - [Create Test Pod](#create-test-pod) + - [Test block volume](#test-block-volume) + - [Test volume expanding](#test-volume-expanding) + - [Test snapshot](#test-snapshot) + - [Test volume clone](#test-volume-clone) +- [Test Using CSC Tool](#test-using-csc-tool) + ## Deploy #### Requirements @@ -36,7 +51,7 @@ and call: curl -XPUT http://127.0.0.1:/debug/flags/v -d '5' ``` -## Example +## Examples #### Create StorageClass @@ -93,8 +108,8 @@ spec: ``` ## create the pvc and pod: -kubectl create -f ../examples/block/pvc.yaml -kubectl create -f ../examples/block/pod.yaml +kubectl create -f ../examples/pvc-block.yaml +kubectl create -f ../examples/pod-block.yaml ## waiting for the pod running kubectl exec -it csi-curve-test-block bash @@ -111,8 +126,23 @@ kubectl exec -it csi-curve-test-block bash #### Test snapshot +Prerequisite: [install snapshot-controller](https://kubernetes-csi.github.io/docs/snapshot-controller.html) + + +Create snapshot: + +``` +kubectl create -f ../examples/snapshotclass.yaml +kubectl create -f ../examples/snapshot.yaml +``` + #### Test volume clone +``` +kubectl create -f ../examples/pvc.yaml +kubectl create -f ../examples/pvc-clone.yaml +kubectl create -f ../examples/pvc-restore.yaml +``` ## Test Using CSC Tool @@ -164,6 +194,8 @@ $ csc controller create --endpoint tcp://127.0.0.1:10000 \ 
"0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46" 10737418240 "user"="k8s" ``` +If the volume is block type, set: `--cap 5,1` + Check: ```text @@ -190,6 +222,8 @@ $ csc node stage --endpoint tcp://127.0.0.1:10000 \ 0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 ``` +If the volume is block type, set: `--cap 5,1` + Check: ```text @@ -197,9 +231,9 @@ $ sudo curve-nbd list-mapped id image device 97297 cbd:k8s//k8s/pvc-ce482926-91d8-11ea-bf6e-fa163e23ce53_k8s_ /dev/nbd0 -$ sudo findmnt /mnt/test-csi/volume-globalmount +$ sudo findmnt /mnt/test-csi/volume-globalmount/0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 TARGET SOURCE FSTYPE OPTIONS -/mnt/test-csi/volume-globalmount /dev/nbd0 ext4 rw,relatime,data=ordered +/mnt/test-csi/volume-globalmount/0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 /dev/nbd0 ext4 rw,relatime,data=ordered ``` #### NodePublish a volume @@ -253,8 +287,8 @@ $ csc controller expand --endpoint tcp://127.0.0.1:10000 \ $ # nodeExpand: $ csc node expand --endpoint tcp://127.0.0.1:10000 \ - /mnt/test-csi/test-pod \ - 0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 + 0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 \ + /mnt/test-csi/test-pod 0 ``` @@ -317,3 +351,18 @@ $ sudo curve stat --user k8s --filename /k8s/csi-vol-volume-fa0c04c9-2e93-487e-8 E 2020-08-24T13:57:55.946636+0800 58360 mds_client.cpp:395] GetFileInfo: filename = /k8s/volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46, owner = k8s, errocde = 6, error msg = kFileNotExists, log id = 1 stat fail, ret = -6 ``` + +#### Snapshot + +``` +## create a snapshot +$ csc controller create-snapshot --endpoint tcp://127.0.0.1:10000 \ + --source-volume 0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 \ + snapshot-215d24ff-c04c-4b08-a1fb-692c94627c63 +"0024-9ea1a8fc-160d-47ef-b2ef-f0e09677b066-0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46" 23622320128 
0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 seconds:1639297566 nanos:780828000 true + +## delete a snapshot +$ csc controller delete-snapshot --endpoint tcp://127.0.0.1:10000 \ + 0024-9ea1a8fc-160d-47ef-b2ef-f0e09677b066-0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 +0024-9ea1a8fc-160d-47ef-b2ef-f0e09677b066-0003-k8s-csi-vol-volume-fa0c04c9-2e93-487e-8986-1e1625fd8c46 +``` diff --git a/docs/clone.md b/docs/clone.md new file mode 100644 index 0000000..fef4dbe --- /dev/null +++ b/docs/clone.md @@ -0,0 +1,21 @@ +### Verify if PVC is in Bound state + +```bash +$ kubectl get pvc +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-test-pvc Bound pvc-b2789e09-9854-4aa1-b556-d9b0e0569f87 30Gi RWO curve 35m +``` + +### Create a new cloned PVC + +``` +kubectl create -f ../examples/pvc-clone.yaml +``` + +### Get PVC + +```bash +$ kubectl get pvc curve-pvc-clone +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-pvc-clone Bound pvc-755cfb47-9b03-41b5-bdf9-0772a1ae41ef 40Gi RWO curve 3s +``` diff --git a/docs/Curve Interface.md b/docs/curve-interface/curve-cli.md similarity index 100% rename from docs/Curve Interface.md rename to docs/curve-interface/curve-cli.md diff --git a/docs/Curve-nbd.md b/docs/curve-interface/curve-nbd.md similarity index 100% rename from docs/Curve-nbd.md rename to docs/curve-interface/curve-nbd.md diff --git a/docs/Curve SnapshotClone Interface.md b/docs/curve-interface/snapshot-clone-service.md similarity index 88% rename from docs/Curve SnapshotClone Interface.md rename to docs/curve-interface/snapshot-clone-service.md index 341b8d1..2593558 100644 --- a/docs/Curve SnapshotClone Interface.md +++ b/docs/curve-interface/snapshot-clone-service.md @@ -39,10 +39,10 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx", -​ "UUID" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx", + "UUID" : "xxx" } ``` @@ -84,9 
+84,9 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx" } ``` @@ -128,9 +128,9 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx" } ``` @@ -191,21 +191,21 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx", -​ "TotalCount": 1, -​ "Snapshots":[{ -​ "File" : "/zjm/test1", -​ "FileLength" : 10737418240, -​ "Name" : "snap1", -​ "Progress" : 30, -​ "SeqNum" : 1, -​ "Status" : 1, -​ "Time" : 1564391913582677, -​ "UUID" : "de06df66-b9e4-44df-ba3d-ac94ddee0b28", -​ "User" : "zjm" - }] + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx", + "TotalCount": 1, + "Snapshots":[{ + "File" : "/zjm/test1", + "FileLength" : 10737418240, + "Name" : "snap1", + "Progress" : 30, + "SeqNum" : 1, + "Status" : 1, + "Time" : 1564391913582677, + "UUID" : "de06df66-b9e4-44df-ba3d-ac94ddee0b28", + "User" : "zjm" + }] } ``` @@ -249,10 +249,10 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx", -​ "UUID" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx", + "UUID" : "xxx" } ``` @@ -296,10 +296,10 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx", -​ "UUID" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx", + "UUID" : "xxx" } ``` @@ -340,9 +340,9 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx" } ``` @@ -385,7 +385,11 @@ TaskInfo struct: | Name | Type | Description | | --- | --- | --- | -|TaskType |enum| task type:
(0:clone, 1:recover)| +|TaskType |enum| task type:
(0:clone, 1:recover)| +|FileType |enum| task file type:
(0:SrcFile 1:SrcSnapshot) | +|IsLazy | bool |task is lazy| +|Progress|uint8| task progress percent | +|Src | string | task source snapshot or file | |User |string |the user of the volume | |File |string|volume name which the clone/recover task belongs to| |Time |uint64| created time stamp | @@ -405,17 +409,17 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code": "0", -​ "Message": "Exec success.", -​ "RequestId": "xxx", -​ "TotalCount": 1, -​ "TaskInfos":[{ - ​ "File" : "/zjm/clone1", - ​ "UUID" : "78e83875-2b50-438f-8f25-36715380f4f5", - ​ "TaskStatus" : 5, - ​ "TaskType" : 0, - ​ "Time" : 0, - ​ "User" : "zjm" + "Code": "0", + "Message": "Exec success.", + "RequestId": "xxx", + "TotalCount": 1, + "TaskInfos":[{ + "File" : "/zjm/clone1", + "UUID" : "78e83875-2b50-438f-8f25-36715380f4f5", + "TaskStatus" : 5, + "TaskType" : 0, + "Time" : 0, + "User" : "zjm" }] } ``` @@ -459,9 +463,9 @@ HTTP/1.1 200 OK Content-Length: xxx { -​ "Code" : "0", -​ "Message" : "Exec success.", -​ "RequestId" : "xxx" + "Code" : "0", + "Message" : "Exec success.", + "RequestId" : "xxx" } ``` diff --git a/docs/e2e.md b/docs/e2e.md new file mode 100644 index 0000000..95cadce --- /dev/null +++ b/docs/e2e.md @@ -0,0 +1,42 @@ +## Start the curve-csi + +``` +curve-csi --nodeid testnode \ + --endpoint tcp://127.0.0.1:10000 \ + --snapshot-server http://127.0.0.1:5556 \ + -v 5 +``` + +## Install csi-sanity + +Starting with csi-test v4.3.0, you can build the csi-sanity command with `go get github.com/kubernetes-csi/csi-test/cmd/csi-sanity` and you'll find the compiled binary in `$GOPATH/bin/csi-sanity`. + +go get always builds the latest revision from the master branch. To build a certain release, [get the source code](https://github.com/kubernetes-csi/csi-test/releases) and run `make -C cmd/csi-sanity`. This produces `cmd/csi-sanity/csi-sanity`. + +## Run e2e test + +Skip 2 cases: + +- As the plugin limit: `len(user+volume)<=80`, so can not support volume with maximum-length(128). 
+- The plugin does not implement: `should fail when requesting to create a snapshot with already existing name and different source volume ID` + +``` +cat > config.yaml <= v1.15 and for block volume expand support the kubernetes version should be >=1.16. Also, `ExpandCSIVolumes` feature gate has to be enabled for the volume expand functionality to work. + +- The controlling StorageClass must have `allowVolumeExpansion` set to `true`. + +## Expand Curve Filesystem PVC + +Create PVC: + +```yaml +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: curve-test-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 20Gi + storageClassName: curve +``` + +Wait PVC bounded: + +```bash +$ kubectl get pvc +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-test-pvc Bound pvc-b2789e09-9854-4aa1-b556-d9b0e0569f87 20Gi RWO curve 38s +``` + +Create the pod using this PVC, check the size: + +```bash +$ kubectl exec -it csi-curve-test bash +root@csi-curve-test:/# df -h /var/lib/www/html +Filesystem Size Used Avail Use% Mounted on +/dev/nbd0 20G 45M 20G 1% /var/lib/www/html +``` + +Now expand the PVC by editing the PVC (pvc.spec.resource.requests.storage), then get it: + +```bash +$ kubectl get pvc curve-test-pvc +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-test-pvc Bound pvc-b2789e09-9854-4aa1-b556-d9b0e0569f87 30Gi RWO curve 7m3s +``` + +Check the directory size inside the pod where PVC is mounted: + +```bash +$ kubectl exec -it csi-curve-test bash +root@csi-curve-test:/# df -h /var/lib/www/html +Filesystem Size Used Avail Use% Mounted on +/dev/nbd0 30G 44M 30G 1% /var/lib/www/html +``` + +## Expand Curve Block PVC + +Create block PVC: + +```yaml +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: curve-test-pvc-block +spec: + accessModes: + - ReadWriteMany + volumeMode: Block + storageClassName: curve + resources: + requests: + storage: 20Gi +``` + +Wait the PVC bounded: + +```bash +$ kubectl get 
pvc curve-test-pvc-block +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-test-pvc-block Bound pvc-6fd55b8f-5a26-422c-b4d9-e9613e5724b5 20Gi RWX curve 14s +``` + +Create the pod using this PVC: + +```yaml +apiVersion: v1 +kind: Pod +metadata: + annotations: + container.apparmor.security.beta.kubernetes.io/my-container: unconfined + name: csi-curve-test-block +spec: + containers: + - name: my-container + image: debian + command: + - sleep + - "3600" + securityContext: + capabilities: + add: ["SYS_ADMIN"] + volumeDevices: + - devicePath: /dev/block + name: my-volume + volumes: + - name: my-volume + persistentVolumeClaim: + claimName: curve-test-pvc-block +``` + +Check the size: + +```bash +$ kubectl exec -it csi-curve-test-block bash +root@csi-curve-test-block:/# blockdev --getsize64 /dev/block +21474836480 +``` + +Now expand the PVC by editing the PVC (pvc.spec.resource.requests.storage), then get it: + +```bash +$ kubectl get pvc curve-test-pvc-block +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-test-pvc-block Bound pvc-6fd55b8f-5a26-422c-b4d9-e9613e5724b5 30Gi RWX curve 6m45s +``` + +Check the block size inside the pod: + +```bash +$ kubectl exec -it csi-curve-test-block bash +root@csi-curve-test-block:/# blockdev --getsize64 /dev/block +32212254720 +``` diff --git a/docs/snapshot.md b/docs/snapshot.md new file mode 100644 index 0000000..cbdab13 --- /dev/null +++ b/docs/snapshot.md @@ -0,0 +1,105 @@ +- [Prerequisite](#prerequisite) +- [Create SnapshotClass](#create-snapshotclass) +- [Create Snapshot](#create-snapshot) +- [Restore Snapshot to a new PVC](#restore-snapshot-to-a-new-pvc) + +## Prerequisite + +For snapshot functionality to be supported for your Kubernetes cluster, the Kubernetes version running in your cluster should be `>= v1.17`. We also need the snapshot controller deployed in your Kubernetes cluster along with csi-snapshotter sidecar container. 
+ +**Git Repository:** https://github.com/kubernetes-csi/external-snapshotter + +**Supported Versions** + +|Latest stable release |Min CSI Version |Max CSI Version |Container Image |Min K8s Version |Max K8s Version |Recommended K8s Version| +| --- | --- | --- | --- | --- |--- |---| +|v4.2.1 | v1.0.0 |- |k8s.gcr.io/sig-storage/snapshot-controller:v4.2.1| v1.20 |- |v1.22| +| v4.1.1| v1.0.0 |- |k8s.gcr.io/sig-storage/snapshot-controller:v4.1.1| v1.20| - |v1.20| +| v3.0.3 (beta)| v1.0.0| -| k8s.gcr.io/sig-storage/snapshot-controller:v3.0.3 |v1.17 |-| v1.17| + + +**Install Snapshot Beta CRDs:** + +```bash +kubectl create -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/release-3.0/client/config/crd/snapshot.storage.k8s.io_volumesnapshotclasses.yaml + +kubectl create -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/release-3.0/client/config/crd/snapshot.storage.k8s.io_volumesnapshotcontents.yaml + +kubectl create -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/release-3.0/client/config/crd/snapshot.storage.k8s.io_volumesnapshots.yaml +``` + +**Install Snapshot Controller:** + +```bash +kubectl apply -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/release-3.0/deploy/kubernetes/snapshot-controller/rbac-snapshot-controller.yaml + +kubectl apply -f https://raw.githubusercontent.com/kubernetes-csi/external-snapshotter/release-3.0/deploy/kubernetes/snapshot-controller/setup-snapshot-controller.yaml +``` + +## Create SnapshotClass + +```bash +kubectl create -f ../examples/snapshotclass.yaml +``` + +## Create Snapshot + +- Verify if PVC is in Bound state + +```bash +$ kubectl get pvc +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-test-pvc Bound pvc-b2789e09-9854-4aa1-b556-d9b0e0569f87 30Gi RWO curve 35m +``` + +- Create snapshot of the bound PVC + +```bash +kubectl create -f ../examples/snapshot.yaml +``` + +- Wait the snapshot ready + +```bash +$ kubectl get 
volumesnapshot curve-snapshot-test -o yaml +apiVersion: snapshot.storage.k8s.io/v1beta1 +kind: VolumeSnapshot +metadata: + creationTimestamp: "2021-12-14T06:53:41Z" + finalizers: + - snapshot.storage.kubernetes.io/volumesnapshot-as-source-protection + - snapshot.storage.kubernetes.io/volumesnapshot-bound-protection + generation: 1 + name: curve-snapshot-test + namespace: default + resourceVersion: "319004898" + selfLink: /apis/snapshot.storage.k8s.io/v1beta1/namespaces/default/volumesnapshots/curve-snapshot-test + uid: 9ed2b88c-e816-438f-996e-8819980f0159 +spec: + source: + persistentVolumeClaimName: curve-test-pvc + volumeSnapshotClassName: curve-snapclass +status: + boundVolumeSnapshotContentName: snapcontent-9ed2b88c-e816-438f-996e-8819980f0159 + creationTime: "2021-12-14T06:53:41Z" + readyToUse: true + restoreSize: 30Gi + +$ kubectl get volumesnapshotcontent +NAME READYTOUSE RESTORESIZE DELETIONPOLICY DRIVER VOLUMESNAPSHOTCLASS VOLUMESNAPSHOT AGE +snapcontent-9ed2b88c-e816-438f-996e-8819980f0159 true 32212254720 Delete curve.csi.netease.com curve-snapclass curve-snapshot-test 5m20s +``` + +## Restore Snapshot to a new PVC + +```bash +kubectl create -f ../examples/pvc-restore.yaml +``` + +Get the pvc: + +```bash +$ kubectl get pvc curve-pvc-restore +NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +curve-pvc-restore Bound pvc-60cdcc88-61d1-48de-b860-0ecc0ff2dd0e 40Gi RWO curve 4s +``` diff --git a/examples/block/pod.yaml b/examples/pod-block.yaml similarity index 100% rename from examples/block/pod.yaml rename to examples/pod-block.yaml diff --git a/examples/block/pvc.yaml b/examples/pvc-block.yaml similarity index 100% rename from examples/block/pvc.yaml rename to examples/pvc-block.yaml diff --git a/examples/pvc-clone.yaml b/examples/pvc-clone.yaml new file mode 100644 index 0000000..6c45d7a --- /dev/null +++ b/examples/pvc-clone.yaml @@ -0,0 +1,15 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: curve-pvc-clone +spec: + 
storageClassName: curve + dataSource: + name: curve-test-pvc + kind: PersistentVolumeClaim + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 40Gi \ No newline at end of file diff --git a/examples/pvc-restore.yaml b/examples/pvc-restore.yaml new file mode 100644 index 0000000..a9937b8 --- /dev/null +++ b/examples/pvc-restore.yaml @@ -0,0 +1,16 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: curve-pvc-restore +spec: + storageClassName: curve + dataSource: + name: curve-snapshot-test + kind: VolumeSnapshot + apiGroup: snapshot.storage.k8s.io + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 40Gi \ No newline at end of file diff --git a/examples/snapshot.yaml b/examples/snapshot.yaml new file mode 100644 index 0000000..096e0e1 --- /dev/null +++ b/examples/snapshot.yaml @@ -0,0 +1,8 @@ +apiVersion: snapshot.storage.k8s.io/v1beta1 +kind: VolumeSnapshot +metadata: + name: curve-snapshot-test +spec: + volumeSnapshotClassName: curve-snapclass + source: + persistentVolumeClaimName: curve-test-pvc diff --git a/examples/snapshotclass.yaml b/examples/snapshotclass.yaml new file mode 100644 index 0000000..b0fccd3 --- /dev/null +++ b/examples/snapshotclass.yaml @@ -0,0 +1,6 @@ +apiVersion: snapshot.storage.k8s.io/v1beta1 +kind: VolumeSnapshotClass +metadata: + name: curve-snapclass +driver: curve.csi.netease.com +deletionPolicy: Delete diff --git a/examples/storageclass.yaml b/examples/storageclass.yaml index aff2981..892f8cf 100644 --- a/examples/storageclass.yaml +++ b/examples/storageclass.yaml @@ -5,6 +5,7 @@ metadata: name: curve parameters: user: k8s + cloneLazy: "true" provisioner: curve.csi.netease.com reclaimPolicy: Delete volumeBindingMode: Immediate diff --git a/go.mod b/go.mod index 9d2a365..eaf4e10 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/stretchr/testify v1.7.0 golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 google.golang.org/grpc v1.38.0 + google.golang.org/protobuf 
v1.26.0 k8s.io/apimachinery v0.21.7 k8s.io/cloud-provider v0.21.7 k8s.io/klog/v2 v2.9.0 diff --git a/pkg/curve/controllerserver.go b/pkg/curve/controllerserver.go index fdef883..9955dc5 100644 --- a/pkg/curve/controllerserver.go +++ b/pkg/curve/controllerserver.go @@ -18,10 +18,13 @@ package curve import ( "context" + "fmt" + "time" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" volumehelpers "k8s.io/cloud-provider/volume/helpers" csicommon "github.com/opencurve/curve-csi/pkg/csi-common" @@ -33,12 +36,20 @@ import ( type controllerServer struct { *csicommon.DefaultControllerServer - volumeLocks *util.VolumeLocks - curveVolumePrefix string + // A map storing all volumes with ongoing operations so that additional operations + // for that same volume (as defined by VolumeID/volume name) return an Aborted error + volumeLocks *util.VolumeLocks + // A map storing all snapshots with ongoing operations so that additional operations + // for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error + snapshotLocks *util.VolumeLocks + + snapshotServer string } // CreateVolume creates the volume in backend, if it is not already present -func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { +func (cs *controllerServer) CreateVolume( + ctx context.Context, + req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { if err := cs.validateCreateVolumeRequest(req); err != nil { ctxlog.Errorf(ctx, err.Error()) return nil, err @@ -53,36 +64,48 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol ctxlog.Infof(ctx, "starting creating volume requestNamed %s", reqName) // get volume options - volOptions, err := newVolumeOptions(req, cs.curveVolumePrefix) + volOptions, err := newVolumeOptions(req) if err != nil { 
ctxlog.ErrorS(ctx, err, "failed to new volume options") return nil, status.Error(codes.InvalidArgument, err.Error()) } - - // compose csi volume id - csiVolumeId, err := composeCSIID(volOptions.user, volOptions.volName) - if err != nil { - ctxlog.ErrorS(ctx, err, "failed to composeCSIID") - return nil, status.Error(codes.Internal, err.Error()) - } - ctxlog.V(5).Infof(ctx, "build volumeOptions: %+v with csiVolumeId: %v", volOptions, csiVolumeId) - - // TODO: support volume clone and snapshot restore + ctxlog.V(5).Infof(ctx, "build volumeOptions: %+v", volOptions) // verify the volume already exists curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, volOptions.sizeGiB) volDetail, err := curveVol.Stat(ctx) - if err != nil { + if err == nil { + ctxlog.V(4).Infof(ctx, "the volume %v already created, status: %v", volOptions.volName, volDetail.FileStatus) + if volDetail.LengthGiB != volOptions.sizeGiB { + return nil, status.Errorf(codes.AlreadyExists, "request size %vGiB not equal with existing %vGiB", volOptions.sizeGiB, volDetail.LengthGiB) + } + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: volOptions.volId, + CapacityBytes: int64(volOptions.sizeGiB * volumehelpers.GiB), + VolumeContext: req.GetParameters(), + }, + }, nil + } + if !util.IsNotFoundErr(err) { ctxlog.ErrorS(ctx, err, "failed to get volDetail") return nil, status.Error(codes.Internal, err.Error()) } - if volDetail.FileStatus == curveservice.CurveVolumeStatusCreated { - ctxlog.V(4).Infof(ctx, "the volume %s already created", reqName) + + // create volume from contentSource: snapshot or clone from an existing volume + volSource, err := cs.createVolFromContentSource(ctx, req, volOptions, curveVol) + if err != nil { + return nil, err + } + if len(volSource) > 0 { + volContext := req.GetParameters() + volContext["volSource"] = volSource return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: csiVolumeId, + VolumeId: volOptions.volId, CapacityBytes: 
int64(volOptions.sizeGiB * volumehelpers.GiB), - VolumeContext: req.GetParameters(), + ContentSource: req.GetVolumeContentSource(), + VolumeContext: volContext, }, }, nil } @@ -96,7 +119,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol ctxlog.Infof(ctx, "successfully created volume named %s for request name %s", curveVol.FileName, reqName) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: csiVolumeId, + VolumeId: volOptions.volId, CapacityBytes: int64(volOptions.sizeGiB * volumehelpers.GiB), VolumeContext: req.GetParameters(), }, @@ -104,7 +127,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } // DeleteVolume deletes the volume in backend and its reservation -func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { +func (cs *controllerServer) DeleteVolume( + ctx context.Context, + req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { if err := cs.validateDeleteVolumeRequest(req); err != nil { ctxlog.ErrorS(ctx, err, "DeleteVolumeRequest validation failed") return nil, err @@ -120,11 +145,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol ctxlog.Infof(ctx, "starting deleting volume id %s", volumeId) - volOptions, err := newVolumeOptionsFromVolID(volumeId, cs.curveVolumePrefix) + volOptions, err := newVolumeOptionsFromVolID(volumeId) if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + ctxlog.Warningf(ctx, "failed to new volOptions from volume id %v", volumeId) + return &csi.DeleteVolumeResponse{}, nil } - // lock out parallel delete and create requests against the same volume name if acquired := cs.volumeLocks.TryAcquire(volOptions.reqName); !acquired { ctxlog.Infof(ctx, util.VolumeOperationAlreadyExistsFmt, volumeId) @@ -132,17 +157,51 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } defer 
cs.volumeLocks.Release(volOptions.reqName) + if cs.snapshotServer == "" { + // delete volume + curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, volOptions.sizeGiB) + if err := curveVol.Delete(ctx); err != nil { + ctxlog.ErrorS(ctx, err, "failed to delete volume", "volumeId", volumeId) + return nil, status.Error(codes.Internal, err.Error()) + } + ctxlog.Infof(ctx, "successfully deleted volume %s", volumeId) + return &csi.DeleteVolumeResponse{}, nil + } + + // ensure all the tasks created from this volume status done. + snapServer := curveservice.NewSnapshotServer(cs.snapshotServer, volOptions.user, volOptions.volName) + if err = snapServer.EnsureTaskFromSourceDone(ctx, volOptions.genVolumePath()); err != nil { + ctxlog.Errorf(ctx, "failed to ensure tasks from %v status done: %v", volumeId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + // delete volume curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, volOptions.sizeGiB) if err := curveVol.Delete(ctx); err != nil { ctxlog.ErrorS(ctx, err, "failed to delete volume", "volumeId", volumeId) return nil, status.Error(codes.Internal, err.Error()) } - ctxlog.Infof(ctx, "successfully deleted volume %s", volumeId) + + // clean cloneTask if the volume is cloned + taskInfo, err := snapServer.GetCloneTaskOfDestination(ctx, volOptions.genVolumePath()) + if err != nil { + if util.IsNotFoundErr(err) { + ctxlog.Infof(ctx, "the volume is not cloned, need not clean tasks") + } else { + ctxlog.Warningf(ctx, "can not get taskInfo of path %v", volOptions.genVolumePath()) + } + return &csi.DeleteVolumeResponse{}, nil + } + if err = snapServer.CleanCloneTask(ctx, taskInfo.UUID); err != nil { + ctxlog.Warningf(ctx, "can not clean task %v", taskInfo.UUID) + } return &csi.DeleteVolumeResponse{}, nil } -func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { +func 
(cs *controllerServer) ControllerExpandVolume( + ctx context.Context, + req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { if err := cs.validateExpandVolumeRequest(req); err != nil { ctxlog.ErrorS(ctx, err, "ExpandVolumeRequest validation failed") return nil, err @@ -161,7 +220,7 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi } defer cs.volumeLocks.Release(volumeId) - volOptions, err := newVolumeOptionsFromVolID(volumeId, cs.curveVolumePrefix) + volOptions, err := newVolumeOptionsFromVolID(volumeId) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -174,39 +233,159 @@ func (cs *controllerServer) ControllerExpandVolume(ctx context.Context, req *csi defer cs.volumeLocks.Release(volOptions.reqName) curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, reqSizeGiB) - // get volume information + sizeGiB, resizeRequired, err := expandVolume(ctx, curveVol, reqSizeGiB) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to expandVolume") + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.ControllerExpandVolumeResponse{ + CapacityBytes: int64(sizeGiB * volumehelpers.GiB), + NodeExpansionRequired: resizeRequired, + }, nil +} + +// CreateSnapshot creates the snapshot in backend. 
+func (cs *controllerServer) CreateSnapshot( + ctx context.Context, + req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + if cs.snapshotServer == "" { + return nil, status.Error(codes.Unimplemented, "") + } + if err := cs.validateSnapshotReq(req); err != nil { + ctxlog.ErrorS(ctx, err, "CreateSnapshotRequest validation failed") + return nil, err + } + + snapshotName := req.GetName() + // lock out parallel snapshot operations + if acquired := cs.snapshotLocks.TryAcquire(snapshotName); !acquired { + ctxlog.Infof(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotName) + return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, snapshotName) + } + defer cs.snapshotLocks.Release(snapshotName) + + // build source volume options from volume id + sourceVolId := req.GetSourceVolumeId() + volOptions, err := newVolumeOptionsFromVolID(sourceVolId) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to new volume options from id", "volumeId", sourceVolId) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + ctxlog.V(5).Infof(ctx, "build volOptions: %+v", volOptions) + + // lock out parallel delete/create/snapshot requests against the same volume + if acquired := cs.volumeLocks.TryAcquire(volOptions.reqName); !acquired { + ctxlog.Infof(ctx, util.VolumeOperationAlreadyExistsFmt, volOptions.reqName) + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volOptions.reqName) + } + defer cs.volumeLocks.Release(volOptions.reqName) + + snapServer := curveservice.NewSnapshotServer(cs.snapshotServer, volOptions.user, volOptions.volName) + // verify the snapshot already exists + curveSnapshot, err := snapServer.GetFileSnapshotOfName(ctx, snapshotName) + if err == nil { + ctxlog.V(4).Infof(ctx, "snapshot (name %v) already exists, check status...", snapshotName) + return waitSnapshotDone(ctx, snapServer, curveSnapshot, sourceVolId) + } + if !util.IsNotFoundErr(err) { + ctxlog.ErrorS(ctx, err, "failed to 
get snapshot by name", "snapshotName", snapshotName) + return nil, status.Error(codes.Internal, err.Error()) + } + + // check source volume status + curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, volOptions.sizeGiB) volDetail, err := curveVol.Stat(ctx) if err != nil { + ctxlog.ErrorS(ctx, err, "failed to stat source volume", "volumeId", sourceVolId) return nil, status.Error(codes.Internal, err.Error()) } - if volDetail.FileStatus == curveservice.CurveVolumeStatusNotExist { - return nil, status.Errorf(codes.Internal, "the curve volume %s not exists", volOptions.volName) + if volDetail.FileStatus == curveservice.CurveVolumeStatusBeingCloned { + ctxlog.Warningf(ctx, "the source volume %v status is BeingCloned, flatten it", sourceVolId) + if err = snapServer.EnsureTaskFromSourceDone(ctx, volOptions.genVolumePath()); err != nil { + ctxlog.ErrorS(ctx, err, "failed to flatten all tasks sourced volume", "volumeId", sourceVolId) + return nil, status.Error(codes.Internal, err.Error()) + } } - ctxlog.Infof(ctx, "volume %s(status %s) size is %dGiB, reqSize is round up to %dGiB", - volDetail.FileName, volDetail.FileStatus, volDetail.LengthGiB, reqSizeGiB) - if reqSizeGiB <= volDetail.LengthGiB { - return &csi.ControllerExpandVolumeResponse{ - CapacityBytes: int64(volDetail.LengthGiB * volumehelpers.GiB), - NodeExpansionRequired: false, - }, nil + // do snapshot + snapCurveUUID, err := snapServer.CreateSnapshot(ctx, snapshotName) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to create snapshot of name", "snapshotName", snapshotName) + return nil, status.Error(codes.Internal, err.Error()) + } + curveSnapshot, err = snapServer.GetFileSnapshotOfId(ctx, snapCurveUUID) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to get snapshot by id", "snapCurveUUID", snapCurveUUID) + return nil, status.Error(codes.Internal, err.Error()) } + return waitSnapshotDone(ctx, snapServer, curveSnapshot, sourceVolId) +} - if err := curveVol.Extend(ctx, 
reqSizeGiB); err != nil { - ctxlog.ErrorS(ctx, err, "failed to delete volume", "volumeId", volumeId) +// DeleteSnapshot deletes the snapshot in backend. +func (cs *controllerServer) DeleteSnapshot( + ctx context.Context, + req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + if cs.snapshotServer == "" { + return nil, status.Error(codes.Unimplemented, "") + } + if err := cs.validateDeleteSnapshotReq(req); err != nil { + ctxlog.ErrorS(ctx, err, "DeleteSnapshotRequest validation failed") + return nil, err + } + + snapshotId := req.GetSnapshotId() + // lock out parallel snapshot + if acquired := cs.snapshotLocks.TryAcquire(snapshotId); !acquired { + ctxlog.Errorf(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotId) + return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, snapshotId) + } + defer cs.snapshotLocks.Release(snapshotId) + + snapCurveUUID, volOptions, err := parseSnapshotID(snapshotId) + if err != nil { + ctxlog.Warningf(ctx, "failed to parse snapshot id: %v", snapshotId) + return &csi.DeleteSnapshotResponse{}, nil + } + + snapServer := curveservice.NewSnapshotServer(cs.snapshotServer, volOptions.user, volOptions.volName) + // get snapshot + curveSnapshot, err := snapServer.GetFileSnapshotOfId(ctx, snapCurveUUID) + if err != nil { + if util.IsNotFoundErr(err, snapCurveUUID) { + ctxlog.Infof(ctx, "snapshot %v not found, maybe already deleted.", snapshotId) + return &csi.DeleteSnapshotResponse{}, nil + } + ctxlog.ErrorS(ctx, err, "failed to get snapshot", "snapCurveUUID", snapCurveUUID) return nil, status.Error(codes.Internal, err.Error()) } - ctxlog.Infof(ctx, "successfully extend volume %s size to %dGiB", volDetail.FileName, reqSizeGiB) - return &csi.ControllerExpandVolumeResponse{ - CapacityBytes: int64(reqSizeGiB * volumehelpers.GiB), - NodeExpansionRequired: true, - }, nil + // lock out parallel snapshot + if acquired := cs.snapshotLocks.TryAcquire(curveSnapshot.Name); !acquired { + ctxlog.Errorf(ctx, 
util.SnapshotOperationAlreadyExistsFmt, curveSnapshot.Name) + return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, curveSnapshot.Name) + } + defer cs.snapshotLocks.Release(curveSnapshot.Name) + + // ensure all the tasks created from this snapshot status done. + if err = snapServer.EnsureTaskFromSourceDone(ctx, snapCurveUUID); err != nil { + ctxlog.Errorf(ctx, "failed to ensure tasks from %v status done: %v", snapCurveUUID, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + // do delete + if err = snapServer.DeleteSnapshot(ctx, snapCurveUUID); err != nil { + ctxlog.ErrorS(ctx, err, "failed to delete snapshot", "snapCurveUUID", snapCurveUUID) + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.DeleteSnapshotResponse{}, nil } // ValidateVolumeCapabilities checks whether the volume capabilities requested are supported. -func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { - if req.GetVolumeId() == "" { +func (cs *controllerServer) ValidateVolumeCapabilities( + ctx context.Context, + req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + volumeId := req.GetVolumeId() + if volumeId == "" { return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") } @@ -214,9 +393,228 @@ func (cs *controllerServer) ValidateVolumeCapabilities(ctx context.Context, req return nil, status.Error(codes.InvalidArgument, "empty volume capabilities in request") } + volOptions, err := newVolumeOptionsFromVolID(volumeId) + if err != nil { + return nil, status.Error(codes.NotFound, err.Error()) + } + curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, volOptions.sizeGiB) + volDetail, err := curveVol.Stat(ctx) + if err != nil || volDetail.FileStatus == curveservice.CurveVolumeStatusNotExist { + return nil, 
status.Error(codes.NotFound, err.Error()) + } + return &csi.ValidateVolumeCapabilitiesResponse{ Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{ VolumeCapabilities: req.VolumeCapabilities, }, }, nil } + +// createVolFromContentSource creates a volume from the request contentSource +// return non-empty volSource if clone successfully. +func (cs *controllerServer) createVolFromContentSource( + ctx context.Context, + req *csi.CreateVolumeRequest, + destVolOptions *volumeOptions, + curveVol *curveservice.CurveVolume) (volSource string, err error) { + if cs.snapshotServer == "" { + return "", status.Error(codes.Unimplemented, "") + } + + if req.VolumeContentSource == nil { + return "", nil + } + + volDestination := destVolOptions.genVolumePath() + // check contentSource + switch req.VolumeContentSource.Type.(type) { + case *csi.VolumeContentSource_Snapshot: + snapshotId := req.VolumeContentSource.GetSnapshot().GetSnapshotId() + // lock out parallel snapshot + if acquired := cs.snapshotLocks.TryAcquire(snapshotId); !acquired { + ctxlog.Errorf(ctx, util.SnapshotOperationAlreadyExistsFmt, snapshotId) + return "", status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, snapshotId) + } + defer cs.snapshotLocks.Release(snapshotId) + // ensure the source snapshot exists, + // and get the snapshot UUID as the source to create a new volume + volSource, err = ensureSnapshotExists(ctx, cs.snapshotServer, snapshotId) + case *csi.VolumeContentSource_Volume: + volumeId := req.VolumeContentSource.GetVolume().GetVolumeId() + // lock out parallel source volume + if acquired := cs.volumeLocks.TryAcquire(volumeId); !acquired { + ctxlog.Errorf(ctx, util.VolumeOperationAlreadyExistsFmt, volumeId) + return "", status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeId) + } + defer cs.volumeLocks.Release(volumeId) + // ensure the source volume exists, + // and get the volume path as the source to create a new volume + volSource, err = 
ensureVolumeExists(ctx, cs.snapshotServer, volumeId) + default: + err = status.Errorf(codes.InvalidArgument, "not a proper volume source %v", req.VolumeContentSource) + } + if err != nil { + return "", err + } + + ctxlog.V(4).Infof(ctx, "clone/snapshot volume from %v to %v", volSource, volDestination) + snapServer := curveservice.NewSnapshotServer(cs.snapshotServer, destVolOptions.user, destVolOptions.volName) + var taskUUID string + taskUUID, err = cloneVolume(ctx, snapServer, volSource, volDestination, destVolOptions.cloneLazy) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to clone volume") + return "", status.Error(codes.Internal, err.Error()) + } + ctxlog.V(4).Infof(ctx, "clone %v status done", taskUUID) + + // fix size if the cloned volume size less than requested size. + _, _, err = expandVolume(ctx, curveVol, destVolOptions.sizeGiB) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to expand volume") + return "", status.Error(codes.Internal, err.Error()) + } + + return volSource, nil +} + +// Expand volume if the existing size is less than reqSizeGiB +func expandVolume( + ctx context.Context, + curveVol *curveservice.CurveVolume, + reqSizeGiB int) (sizeGiB int, resizeRequired bool, err error) { + volDetail, err := curveVol.Stat(ctx) + if err != nil { + return 0, false, err + } + if volDetail.FileStatus == curveservice.CurveVolumeStatusNotExist { + return 0, false, fmt.Errorf("the curve volume not exists") + } + ctxlog.Infof(ctx, "volume %s(status %s) size is %dGiB, reqSize is round up to %dGiB", + volDetail.FileName, volDetail.FileStatus, volDetail.LengthGiB, reqSizeGiB) + if reqSizeGiB <= volDetail.LengthGiB { + return volDetail.LengthGiB, false, nil + } + + if err := curveVol.Extend(ctx, reqSizeGiB); err != nil { + return 0, false, fmt.Errorf("failed to extend volume") + } + ctxlog.Infof(ctx, "successfully extend volume %s size to %dGiB", volDetail.FileName, reqSizeGiB) + return reqSizeGiB, true, nil +} + +// Waits the snapshot status Done. 
+// generate a snapshotId from the UUID in curve and the source volume id, then return the response. +func waitSnapshotDone( + ctx context.Context, + snapServer *curveservice.SnapshotServer, + curveSnapshot curveservice.Snapshot, + sourceVolId string) (*csi.CreateSnapshotResponse, error) { + // wait snapshot status done + if curveSnapshot.Status != curveservice.SnapshotStatusDone { + var err error + curveSnapshot, err = snapServer.WaitForSnapshotDone(ctx, curveSnapshot.UUID) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to wait snapshot status Done", "snapName", curveSnapshot.Name, "UUID", curveSnapshot.UUID) + return nil, status.Error(codes.Internal, err.Error()) + } + } + + snapshotId, err := composeSnapshotID(curveSnapshot.UUID, sourceVolId) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to compose snapshot id", "snapId", curveSnapshot.UUID, "sourceVolId", sourceVolId) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + createTime := time.Unix(0, curveSnapshot.Time*1000) + ctxlog.Infof(ctx, "Snapshot(name %v csiId %v) status Done", curveSnapshot.Name, snapshotId) + return &csi.CreateSnapshotResponse{ + Snapshot: &csi.Snapshot{ + SizeBytes: int64(curveSnapshot.FileLength), + SnapshotId: snapshotId, + SourceVolumeId: sourceVolId, + CreationTime: timestamppb.New(createTime), + ReadyToUse: true, + }, + }, nil +} + +// Clone volume from volSource to volDestination and wait for the clone task ready to use. 
+func cloneVolume( + ctx context.Context, + snapServer *curveservice.SnapshotServer, + volSource, volDestination string, + cloneLazy bool) (string, error) { + taskInfo, err := snapServer.GetCloneTaskOfDestination(ctx, volDestination) + if err == nil { + ctxlog.V(4).Infof(ctx, "get existing task when clone: %v", taskInfo) + if taskInfo.TaskStatus != curveservice.TaskStatusDone { + return taskInfo.UUID, snapServer.WaitForCloneTaskReady(ctx, volDestination) + } + return taskInfo.UUID, nil + } + if !util.IsNotFoundErr(err) { + return "", err + } + + taskUUID, err := snapServer.Clone(ctx, volSource, volDestination, cloneLazy) + if err != nil { + return taskUUID, err + } + return taskUUID, snapServer.WaitForCloneTaskReady(ctx, volDestination) +} + +// Ensure the snapshot exists. +func ensureSnapshotExists(ctx context.Context, snapshotServer, snapshotId string) (string, error) { + snapCurveUUID, volOptions, err := parseSnapshotID(snapshotId) + if err != nil { + return "", status.Errorf(codes.NotFound, "snapshot id %v not found", snapshotId) + } + snapServer := curveservice.NewSnapshotServer(snapshotServer, volOptions.user, volOptions.volName) + if _, err := snapServer.GetFileSnapshotOfId(ctx, snapCurveUUID); err != nil { + if util.IsNotFoundErr(err, snapCurveUUID) { + return "", status.Errorf(codes.NotFound, "the source snapshot(UUID %v) not found", snapCurveUUID) + } + return "", status.Error(codes.Internal, err.Error()) + } + return snapCurveUUID, nil +} + +// Ensure the volume exists. +// If the volume was cloned, ensure the clone task done. 
+func ensureVolumeExists(ctx context.Context, snapshotServer, volumeId string) (string, error) { + volOptions, err := newVolumeOptionsFromVolID(volumeId) + if err != nil { + return "", status.Errorf(codes.NotFound, "volume id %v not found", volumeId) + } + curveVol := curveservice.NewCurveVolume(volOptions.user, volOptions.volName, volOptions.sizeGiB) + if _, err := curveVol.Stat(ctx); err != nil { + if util.IsNotFoundErr(err) { + return "", status.Errorf(codes.NotFound, "the source volume (%v) not found", volOptions) + } + return "", status.Error(codes.Internal, err.Error()) + } + // flatten the volume if it was cloned by other lazy + snapServer := curveservice.NewSnapshotServer(snapshotServer, volOptions.user, volOptions.volName) + volPath := volOptions.genVolumePath() + taskInfo, err := snapServer.GetCloneTaskOfDestination(ctx, volPath) + if err != nil { + if util.IsNotFoundErr(err) { + return volPath, nil + } + return "", status.Error(codes.Internal, err.Error()) + } + if taskInfo.TaskStatus == curveservice.TaskStatusDone { + return volPath, nil + } + if taskInfo.TaskStatus == curveservice.TaskStatusMetaInstalled { + if err = snapServer.Flatten(ctx, taskInfo.UUID); err != nil { + return "", status.Error(codes.Internal, err.Error()) + } + } + + // wait done + if err = snapServer.WaitForCloneTaskDone(ctx, volPath); err != nil { + return "", status.Error(codes.Internal, err.Error()) + } + ctxlog.V(4).Infof(ctx, "the clone task of destination %v done", volPath) + return volPath, nil +} diff --git a/pkg/curve/curve.go b/pkg/curve/curve.go index f6b3567..aff0eec 100644 --- a/pkg/curve/curve.go +++ b/pkg/curve/curve.go @@ -50,22 +50,22 @@ func NewIdentityServer(d *csicommon.CSIDriver) *identityServer { } } -func NewControllerServer(d *csicommon.CSIDriver, curveVolumePrefix string) *controllerServer { +func NewControllerServer(d *csicommon.CSIDriver, snapshotServer string) *controllerServer { return &controllerServer{ DefaultControllerServer: 
csicommon.NewDefaultControllerServer(d), volumeLocks: util.NewVolumeLocks(), - curveVolumePrefix: curveVolumePrefix, + snapshotLocks: util.NewVolumeLocks(), + snapshotServer: snapshotServer, } } -func NewNodeServer(d *csicommon.CSIDriver, curveVolumePrefix string) *nodeServer { +func NewNodeServer(d *csicommon.CSIDriver) *nodeServer { curveservice.InitCurveNbd() mounter := mount.New("") return &nodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(d), mounter: mounter, volumeLocks: util.NewVolumeLocks(), - curveVolumePrefix: curveVolumePrefix, } } @@ -102,15 +102,15 @@ func (c *curveDriver) Run(curveConf options.CurveConf) { c.ids = NewIdentityServer(c.driver) if curveConf.IsControllerServer { - c.cs = NewControllerServer(c.driver, curveConf.CurveVolumePrefix) + c.cs = NewControllerServer(c.driver, curveConf.SnapshotServer) } if curveConf.IsNodeServer { - c.ns = NewNodeServer(c.driver, curveConf.CurveVolumePrefix) + c.ns = NewNodeServer(c.driver) } if !curveConf.IsControllerServer && !curveConf.IsNodeServer { - c.cs = NewControllerServer(c.driver, curveConf.CurveVolumePrefix) - c.ns = NewNodeServer(c.driver, curveConf.CurveVolumePrefix) + c.cs = NewControllerServer(c.driver, curveConf.SnapshotServer) + c.ns = NewNodeServer(c.driver) } s := csicommon.NewNonBlockingGRPCServer() diff --git a/pkg/curve/identityserver.go b/pkg/curve/identityserver.go index 520588a..d56196c 100644 --- a/pkg/curve/identityserver.go +++ b/pkg/curve/identityserver.go @@ -50,13 +50,6 @@ func (is *identityServer) GetPluginCapabilities( }, }, }, - { - Type: &csi.PluginCapability_Service_{ - Service: &csi.PluginCapability_Service{ - Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, - }, - }, - }, }, }, nil } diff --git a/pkg/curve/nodeserver.go b/pkg/curve/nodeserver.go index 1200f98..a5f4969 100644 --- a/pkg/curve/nodeserver.go +++ b/pkg/curve/nodeserver.go @@ -37,9 +37,8 @@ import ( type nodeServer struct { *csicommon.DefaultNodeServer - mounter mount.Interface - 
volumeLocks *util.VolumeLocks - curveVolumePrefix string + mounter mount.Interface + volumeLocks *util.VolumeLocks } func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { @@ -107,7 +106,7 @@ func (ns *nodeServer) attachDevice(ctx context.Context, req *csi.NodeStageVolume } } - volOptions, err := newVolumeOptionsFromVolID(req.GetVolumeId(), ns.curveVolumePrefix) + volOptions, err := newVolumeOptionsFromVolID(req.GetVolumeId()) if err != nil { return "", status.Error(codes.Internal, err.Error()) } @@ -175,6 +174,12 @@ func (ns *nodeServer) mountVolumeToStagePath(ctx context.Context, req *csi.NodeS err = diskMounter.Mount(devicePath, stagingPath, fsType, opt) } else { err = diskMounter.FormatAndMount(devicePath, stagingPath, fsType, opt) + // resize2fs + resizer := util.NewResizeFs(diskMounter) + ok, err := resizer.Resize(ctx, devicePath, stagingPath) + if !ok { + ctxlog.Warningf(ctx, "resize failed on device %v path %v, err: %v", devicePath, stagingPath, err) + } } if err != nil { ctxlog.ErrorS(ctx, err, "failed to mount device to staging path", "devicePath", devicePath, "stagingPath", stagingPath, "volumeId", req.GetVolumeId()) @@ -344,7 +349,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag } // unmap - volOptions, err := newVolumeOptionsFromVolID(volumeId, ns.curveVolumePrefix) + volOptions, err := newVolumeOptionsFromVolID(volumeId) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -380,14 +385,24 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV // having the staging_target_path information volumePath = req.GetVolumePath() } - if volumePath == "" { - return nil, status.Error(codes.InvalidArgument, "volume path must be provided") + + // check path exists + if _, err := os.Stat(volumePath); err != nil { + if os.IsNotExist(err) { + return nil, status.Errorf(codes.NotFound, "path %v not exists", 
volumePath) + } + return nil, status.Errorf(codes.Internal, "can not stat path %v", volumePath) + } + + if req.GetVolumeCapability().GetBlock() != nil { + return &csi.NodeExpandVolumeResponse{}, nil } // get device path + volumePath += "/" + volumeId devicePath, _, err := mount.GetDeviceNameFromMount(ns.mounter, volumePath) if err != nil { - return nil, fmt.Errorf("can not get device from mount, err: %v", err) + return nil, status.Errorf(codes.Internal, "can not get device from mount, err: %v", err) } if devicePath == "" { ctxlog.V(4).Infof(ctx, "the path %s is not mounted, ignore resizing", volumePath) @@ -407,11 +422,11 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV // NodeGetVolumeStats returns volume stats func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { - volumePath := req.GetVolumePath() - if volumePath == "" { - return nil, status.Errorf(codes.InvalidArgument, "volumePath %v is empty", volumePath) + if err := ns.validateNodeGetVolumeStatsRequest(req); err != nil { + return nil, err } + volumePath := req.GetVolumePath() exists, err := utilpath.Exists(utilpath.CheckFollowSymlink, volumePath) if err != nil { return nil, status.Errorf(codes.Internal, "failed to check whether volumePath exists: %s", err) diff --git a/pkg/curve/validate.go b/pkg/curve/validate.go index cc7d859..c8116f3 100644 --- a/pkg/curve/validate.go +++ b/pkg/curve/validate.go @@ -44,6 +44,10 @@ func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeReq return err } + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume Id cannot be empty") + } + return nil } @@ -52,14 +56,45 @@ func (cs *controllerServer) validateExpandVolumeRequest(req *csi.ControllerExpan return err } - capRange := req.GetCapacityRange() - if capRange == nil { + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume Id 
cannot be empty") + } + + if req.GetCapacityRange() == nil { return status.Error(codes.InvalidArgument, "capacityRange cannot be empty") } return nil } +func (cs *controllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { + return err + } + + // Check sanity of request Snapshot Name, Source Volume Id + if req.Name == "" { + return status.Error(codes.InvalidArgument, "snapshot Name cannot be empty") + } + if req.SourceVolumeId == "" { + return status.Error(codes.InvalidArgument, "source Volume ID cannot be empty") + } + + return nil +} + +func (cs *controllerServer) validateDeleteSnapshotReq(req *csi.DeleteSnapshotRequest) error { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { + return err + } + + if req.GetSnapshotId() == "" { + return status.Error(codes.InvalidArgument, "snapshot ID cannot be empty") + } + + return nil +} + func (ns *nodeServer) validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error { if req.GetVolumeCapability() == nil { return status.Error(codes.InvalidArgument, "volume capability missing in request") @@ -131,6 +166,20 @@ func (ns *nodeServer) validateNodeExpandVolumeRequest(req *csi.NodeExpandVolumeR return status.Error(codes.InvalidArgument, "volume ID missing in request") } + if req.GetStagingTargetPath() == "" && req.GetVolumePath() == "" { + return status.Error(codes.InvalidArgument, "volume path must be provided") + } + + return nil +} + +func (ns *nodeServer) validateNodeGetVolumeStatsRequest(req *csi.NodeGetVolumeStatsRequest) error { + if req.GetVolumeId() == "" { + return status.Error(codes.InvalidArgument, "volume ID missing in request") + } + if req.GetVolumePath() == "" { + return status.Error(codes.InvalidArgument, "volume Path missing in request") + } return nil } diff --git 
a/pkg/curve/volid.go b/pkg/curve/volid.go index 289cd95..8a52efb 100644 --- a/pkg/curve/volid.go +++ b/pkg/curve/volid.go @@ -24,7 +24,7 @@ import ( ) const ( - maxVolIDLen = 128 + maxCSIIDLen = 128 ) /* @@ -36,7 +36,7 @@ ComposeCSIID composes a CSI ID from passed in parameters. func composeCSIID(user, volName string) (string, error) { buf16 := make([]byte, 2) - if (4 + 1 + len(user) + 1 + len(volName)) > maxVolIDLen { + if (4 + 1 + len(user) + 1 + len(volName)) > maxCSIIDLen { return "", fmt.Errorf("CSI ID encoding length overflow") } @@ -47,6 +47,9 @@ func composeCSIID(user, volName string) (string, error) { } func decomposeCSIID(composedCSIID string) (user string, volName string, err error) { + if len(composedCSIID) < 8 { + return "", "", fmt.Errorf("%q can not less than 8", composedCSIID) + } buf16, err := hex.DecodeString(composedCSIID[0:4]) if err != nil { return "", "", err @@ -56,3 +59,34 @@ func decomposeCSIID(composedCSIID string) (user string, volName string, err erro volName = composedCSIID[6+userLength:] return user, volName, nil } + +/* +ComposeSnapshotID composes a Snapshot ID from passed in parameters. 
+ [length of snapCurveUUID=1:4byte] + [-:1byte] + [snapCurveUUID] + [-:1byte] + + composeCSIID(user, volName string) +*/ +func composeSnapshotID(snapCurveUUID, volId string) (string, error) { + buf16 := make([]byte, 2) + if (4 + 1 + len(snapCurveUUID) + 1 + len(volId)) > maxCSIIDLen { + return "", fmt.Errorf("CSI Snapshot ID encoding length overflow") + } + + binary.BigEndian.PutUint16(buf16, uint16(len(snapCurveUUID))) + snapCurveUUIDLength := hex.EncodeToString(buf16) + return strings.Join([]string{snapCurveUUIDLength, snapCurveUUID, volId}, "-"), nil +} + +func decomposeSnapshotID(composedSnapID string) (snapCurveUUID string, volId string, err error) { + if len(composedSnapID) < 8 { + return "", "", fmt.Errorf("%q can not less than 8", composedSnapID) + } + buf16, err := hex.DecodeString(composedSnapID[0:4]) + if err != nil { + return "", "", err + } + snapCurveUUIDLength := binary.BigEndian.Uint16(buf16) + snapCurveUUID = composedSnapID[5 : 5+snapCurveUUIDLength] + volId = composedSnapID[6+snapCurveUUIDLength:] + return snapCurveUUID, volId, nil +} diff --git a/pkg/curve/volid_test.go b/pkg/curve/volid_test.go index 6f8750e..1ab98e3 100644 --- a/pkg/curve/volid_test.go +++ b/pkg/curve/volid_test.go @@ -24,7 +24,7 @@ import ( func TestComposeCSIID(t *testing.T) { user := "k8s" - volName := csiDefaultVolNamingPrefix + "pvc-eeafeeb3-7a35-11ea-934a-fa163e28f309" + volName := csiVolNamingPrefix + "pvc-eeafeeb3-7a35-11ea-934a-fa163e28f309" id, err := composeCSIID(user, volName) assert.NoError(t, err) diff --git a/pkg/curve/volumeoptions.go b/pkg/curve/volumeoptions.go index 85341b0..00b2810 100644 --- a/pkg/curve/volumeoptions.go +++ b/pkg/curve/volumeoptions.go @@ -26,36 +26,52 @@ import ( ) const ( - csiDefaultVolNamingPrefix = "csi-vol-" + csiVolNamingPrefix = "csi-vol-" + + // max length of curve volume user + curveUserMaxLen = 30 + // clone lazy + curveCloneDefaultLazy = true ) type volumeOptions struct { - reqName string - volName string - sizeGiB int - user 
string + reqName string + volName string + volId string + sizeGiB int + user string + cloneLazy bool +} + +func (vo *volumeOptions) genVolumePath() string { + return "/" + vo.user + "/" + vo.volName } -func newVolumeOptions(req *csi.CreateVolumeRequest, curveVolumePrefix string) (*volumeOptions, error) { +func newVolumeOptions(req *csi.CreateVolumeRequest) (*volumeOptions, error) { var ( ok bool err error ) opts := &volumeOptions{ reqName: req.GetName(), + volName: csiVolNamingPrefix + req.GetName(), } - if curveVolumePrefix != "" { - opts.volName = curveVolumePrefix + opts.reqName - } else { - opts.volName = csiDefaultVolNamingPrefix + opts.reqName - } - - volOptions := req.GetParameters() - opts.user, ok = volOptions["user"] + parameters := req.GetParameters() + opts.user, ok = parameters["user"] if !ok { return nil, fmt.Errorf("missing required field: user") } + if len(opts.user) == 0 || len(opts.user) > curveUserMaxLen { + return nil, fmt.Errorf("length of field user must be 1~%v", curveUserMaxLen) + } + + cloneLazy, ok := parameters["cloneLazy"] + if ok { + opts.cloneLazy = cloneLazy == "true" + } else { + opts.cloneLazy = curveCloneDefaultLazy + } // volume size - default is 10GiB opts.sizeGiB = 10 @@ -66,26 +82,26 @@ func newVolumeOptions(req *csi.CreateVolumeRequest, curveVolumePrefix string) (* } } + opts.volId, err = composeCSIID(opts.user, opts.volName) + if err != nil { + return nil, fmt.Errorf("failed to composeCSIID: %v", err) + } + return opts, nil } -func newVolumeOptionsFromVolID(volumeId string, curveVolumePrefix string) (*volumeOptions, error) { - var ( - volOptions volumeOptions - err error - ) - +func newVolumeOptionsFromVolID(volumeId string) (*volumeOptions, error) { + var err error + volOptions := &volumeOptions{ + volId: volumeId, + } volOptions.user, volOptions.volName, err = decomposeCSIID(volumeId) if err != nil { return nil, err } - volNamingPrefix := curveVolumePrefix - if volNamingPrefix == "" { - volNamingPrefix = 
csiDefaultVolNamingPrefix - } - volOptions.reqName = strings.TrimPrefix(volOptions.volName, volNamingPrefix) + volOptions.reqName = strings.TrimPrefix(volOptions.volName, csiVolNamingPrefix) - return &volOptions, nil + return volOptions, nil } func roundUpToGiBInt(sizeBytes int64) (int, error) { @@ -103,3 +119,13 @@ func roundUpToGiBInt(sizeBytes int64) (int, error) { } return sizeGiB, nil } + +func parseSnapshotID(snapshotId string) (string, *volumeOptions, error) { + snapCurveUUID, volId, err := decomposeSnapshotID(snapshotId) + if err != nil { + return "", nil, err + } + volOptions, err := newVolumeOptionsFromVolID(volId) + + return snapCurveUUID, volOptions, err +} diff --git a/pkg/curveservice/curveservice.go b/pkg/curveservice/curveservice.go index 675efa9..8f7e5ff 100644 --- a/pkg/curveservice/curveservice.go +++ b/pkg/curveservice/curveservice.go @@ -40,6 +40,8 @@ const ( CurveVolumeStatusExist CurveVolumeStatus = "kFileExists" CurveVolumeStatusCreated CurveVolumeStatus = "Created" CurveVolumeStatusOwnerAuthFail CurveVolumeStatus = "kOwnerAuthFail" + CurveVolumeStatusClonedLazy CurveVolumeStatus = "CloneMetaInstalled" + CurveVolumeStatusBeingCloned CurveVolumeStatus = "BeingCloned" CurveVolumeStatusUnknown CurveVolumeStatus = "Unknown" ) @@ -74,9 +76,7 @@ func (cv *CurveVolume) Stat(ctx context.Context) (*CurveVolumeDetail, error) { ctxlog.Warningf(ctx, "[curve] failed to stat the file %s, err: %v, output: %v", cv.FilePath, err, outputStr) if strings.Contains(outputStr, fmt.Sprintf(retFailFormat, retNotExist)) { - return &CurveVolumeDetail{ - FileStatus: CurveVolumeStatusNotExist, - }, nil + return nil, util.NewNotFoundErr() } return nil, fmt.Errorf("can not run curve %v, err: %v, output: %v", args, err, outputStr) diff --git a/pkg/curveservice/snapshot.go b/pkg/curveservice/snapshot.go new file mode 100644 index 0000000..00b4bc8 --- /dev/null +++ b/pkg/curveservice/snapshot.go @@ -0,0 +1,545 @@ +/* +Copyright 2021 The Netease Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package curveservice + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/opencurve/curve-csi/pkg/util" + "github.com/opencurve/curve-csi/pkg/util/ctxlog" +) + +const ( + // The following three values are used for 30 seconds timeout + // while waiting for snapshot Watcher to expire. + snapshotWatcherInitDelay = 1 * time.Second + snapshotWatcherFactor = 1.4 + snapshotWatcherSteps = 10 +) + +type SnapshotServer struct { + URL string `json:"server"` + User string `json:"user"` + FilePath string `json:"filepath"` +} + +func NewSnapshotServer(server, user, volName string) *SnapshotServer { + return &SnapshotServer{ + URL: server + "/SnapshotCloneService", + User: user, + FilePath: "/" + user + "/" + volName, + } +} + +// GetSnapshotByName gets the snapshot with specific name +func (cs *SnapshotServer) GetFileSnapshotOfName(ctx context.Context, snapName string) (Snapshot, error) { + var snap Snapshot + limit, offset, total := 20, 0, 1 + for offset < total { + snapshotResp, err := cs.getFileSnapshots(ctx, "", limit, offset) + if err != nil { + return snap, err + } + for _, oneSnap := range snapshotResp.Snapshots { + if oneSnap.Name == snapName { + ctxlog.V(4).Infof(ctx, "[curve snapshot] get snapshot (%+v) by name: %v", oneSnap, snapName) + return oneSnap, nil + } + } + + total = snapshotResp.TotalCount + offset += limit + } + + return snap, 
util.NewNotFoundErr() +} + +// GetSnapshotById gets the snapshot with specific uuid +func (cs *SnapshotServer) GetFileSnapshotOfId(ctx context.Context, uuid string) (Snapshot, error) { + var snap Snapshot + snapshotResp, err := cs.getFileSnapshots(ctx, uuid, 0, 0) + if err != nil { + ctxlog.ErrorS(ctx, err, "failed to get snapshot by id", "uuid", uuid) + return snap, err + } + + // check snapshot field + if len(snapshotResp.Snapshots) > 1 { + return snap, fmt.Errorf("found multi snapshots with uuid %v", uuid) + } + if snapshotResp.Snapshots[0].UUID != uuid { + return snap, fmt.Errorf("the snapshot(%+v) in response not matched with uuid: %v", snapshotResp.Snapshots[0], uuid) + } + + ctxlog.V(4).Infof(ctx, "[curve snapshot] get snapshot %v successfully.", snapshotResp.Snapshots[0]) + return snapshotResp.Snapshots[0], nil +} + +// getFileSnapshots get snapshots list +func (cs *SnapshotServer) getFileSnapshots(ctx context.Context, uuid string, limit, offset int) (GetSnapshotResp, error) { + var resp GetSnapshotResp + queryMap := map[string]string{ + "Action": "GetFileSnapshotInfo", + "Version": "0.0.6", + "User": cs.User, + "File": cs.FilePath, + } + if limit > 0 { + queryMap["Limit"] = strconv.Itoa(limit) + } + if offset > 0 { + queryMap["Offset"] = strconv.Itoa(offset) + } + if uuid != "" { + queryMap["UUID"] = uuid + } + + ctxlog.V(4).Infof(ctx, "starting to get snapshots: %v", queryMap) + statusCode, data, err := util.HttpGet(cs.URL, queryMap) + if err != nil { + return resp, fmt.Errorf("failed to get snapshot, err: %v", err) + } + + if err = json.Unmarshal(data, &resp); err != nil { + return resp, fmt.Errorf("unmarshal failed when get snapshot. 
statusCode: %v, data: %v, err: %v", statusCode, string(data), err)
+	}
+
+	if resp.Code == FileNotExists || len(resp.Snapshots) == 0 {
+		ctxlog.V(4).Infof(ctx, "not found, resp: %+v", resp)
+		if uuid != "" {
+			return resp, util.NewNotFoundErr(uuid)
+		}
+		return resp, util.NewNotFoundErr()
+	}
+
+	if resp.Code != ExecSuccess {
+		return resp, fmt.Errorf("failed to get snapshot, resp: %+v", resp)
+	}
+
+	ctxlog.V(5).Infof(ctx, "[curve snapshot] get snapshots: %+v", resp)
+	return resp, nil
+}
+
+// CreateSnapshot creates a snapshot and returns the uuid
+func (cs *SnapshotServer) CreateSnapshot(ctx context.Context, snapName string) (string, error) {
+	queryMap := map[string]string{
+		"Action":  "CreateSnapshot",
+		"Version": "0.0.6",
+		"User":    cs.User,
+		"File":    cs.FilePath,
+		"Name":    snapName,
+	}
+
+	ctxlog.V(4).Infof(ctx, "starting to create snapshot: %v", queryMap)
+	statusCode, data, err := util.HttpGet(cs.URL, queryMap)
+	if err != nil {
+		return "", fmt.Errorf("failed to create snapshot, err: %v", err)
+	}
+	if statusCode != http.StatusOK {
+		return "", fmt.Errorf("failed to create snapshot, statusCode: %v, response data: %v", statusCode, string(data))
+	}
+
+	var resp CreateSnapshotResp
+	if err = json.Unmarshal(data, &resp); err != nil {
+		return "", fmt.Errorf("failed to unmarshal data: %v, err: %v", string(data), err)
+	}
+
+	if resp.Code != ExecSuccess {
+		return "", fmt.Errorf("failed to create snapshot, resp: %+v", resp)
+	}
+
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] create snapshot successfully with uuid: %v", resp.UUID)
+	return resp.UUID, nil
+}
+
+// DeleteSnapshot deletes a snapshot
+func (cs *SnapshotServer) DeleteSnapshot(ctx context.Context, uuid string) error {
+	queryMap := map[string]string{
+		"Action":  "DeleteSnapshot",
+		"Version": "0.0.6",
+		"User":    cs.User,
+		"File":    cs.FilePath,
+		"UUID":    uuid,
+	}
+
+	ctxlog.V(4).Infof(ctx, "starting to delete snapshot: %v", queryMap)
+	statusCode, data, err := util.HttpGet(cs.URL, queryMap)
+	if err != 
nil {
+		return fmt.Errorf("failed to delete snapshot, err: %v", err)
+	}
+	if statusCode != http.StatusOK {
+		return fmt.Errorf("failed to delete snapshot, statusCode: %v, response data: %v", statusCode, string(data))
+	}
+
+	var resp DeleteSnapshotResp
+	if err = json.Unmarshal(data, &resp); err != nil {
+		return fmt.Errorf("failed to unmarshal data: %v, err: %v", string(data), err)
+	}
+
+	if resp.Code != ExecSuccess {
+		return fmt.Errorf("failed to delete snapshot, resp: %+v", resp)
+	}
+
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] delete snapshot successfully with uuid: %v", uuid)
+	return nil
+}
+
+// CancelSnapshot cancels a snapshot
+func (cs *SnapshotServer) CancelSnapshot(ctx context.Context, uuid string) error {
+	queryMap := map[string]string{
+		"Action":  "CancelSnapshot",
+		"Version": "0.0.6",
+		"User":    cs.User,
+		"File":    cs.FilePath,
+		"UUID":    uuid,
+	}
+
+	ctxlog.V(4).Infof(ctx, "starting to cancel snapshot: %v", queryMap)
+	statusCode, data, err := util.HttpGet(cs.URL, queryMap)
+	if err != nil {
+		return fmt.Errorf("failed to cancel snapshot, err: %v", err)
+	}
+
+	var resp CancelSnapshotResp
+	if err = json.Unmarshal(data, &resp); err != nil {
+		return fmt.Errorf("unmarshal failed when cancel snapshot. 
statusCode: %v, data: %v, err: %v", statusCode, string(data), err)
+	}
+
+	if resp.Code == FileNotExists {
+		return util.NewNotFoundErr(uuid)
+	}
+	if resp.Code != ExecSuccess {
+		return fmt.Errorf("failed to cancel snapshot, resp: %+v", resp)
+	}
+
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] cancel snapshot successfully with uuid: %v", uuid)
+	return nil
+}
+
+// Wait for the snapshot ready
+func (cs *SnapshotServer) WaitForSnapshotDone(ctx context.Context, uuid string) (Snapshot, error) {
+	var (
+		snap Snapshot
+		err  error
+	)
+	backoff := wait.Backoff{
+		Duration: snapshotWatcherInitDelay,
+		Factor:   snapshotWatcherFactor,
+		Steps:    snapshotWatcherSteps,
+	}
+
+	waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		snap, err = cs.GetFileSnapshotOfId(ctx, uuid)
+		if err != nil {
+			return false, fmt.Errorf("failed to get snapshot for uuid %v, err: %v", uuid, err)
+		}
+		ctxlog.V(4).Infof(ctx, "the snapshot (name: %v uuid: %v) progress %v%%", snap.Name, uuid, snap.Progress)
+		return snap.Status == SnapshotStatusDone, nil
+	})
+	// return error if err has not become available for the specified timeout
+	if waitErr == wait.ErrWaitTimeout {
+		return snap, fmt.Errorf("timeout to wait, snapshot (uuid %v) is still not done", uuid)
+	}
+	// return error if any other errors were encountered during waiting for the snapshot to become done
+	return snap, waitErr
+}
+
+// Get task with specific destination
+func (cs *SnapshotServer) GetCloneTaskOfDestination(ctx context.Context, destination string) (TaskInfo, error) {
+	var taskInfo TaskInfo
+	resp, err := cs.getCloneTask(ctx, "", destination, 0, 0)
+	if err != nil {
+		return taskInfo, err
+	}
+	if len(resp.TaskInfos) > 1 {
+		return taskInfo, fmt.Errorf("found multi task of destination: %v", destination)
+	}
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] get clone task successfully: %+v", resp.TaskInfos[0])
+	return resp.TaskInfos[0], nil
+}
+
+// Get task with specific uuid
+func (cs *SnapshotServer) 
GetCloneTaskOfId(ctx context.Context, uuid string) (TaskInfo, error) { + var taskInfo TaskInfo + resp, err := cs.getCloneTask(ctx, uuid, "", 0, 0) + if err != nil { + return taskInfo, err + } + if len(resp.TaskInfos) > 1 { + return taskInfo, fmt.Errorf("found multi task of uuid: %v", uuid) + } + ctxlog.V(4).Infof(ctx, "[curve snapshot] get clone task successfully: %+v", resp.TaskInfos[0]) + return resp.TaskInfos[0], nil +} + +// Get tasks +func (cs *SnapshotServer) getCloneTask(ctx context.Context, uuid, destination string, limit, offset int) (GetCloneTaskResp, error) { + var resp GetCloneTaskResp + queryMap := map[string]string{ + "Action": "GetCloneTasks", + "Version": "0.0.6", + "User": cs.User, + } + if uuid != "" { + queryMap["UUID"] = uuid + } + if destination != "" { + queryMap["File"] = destination + } + if limit != 0 { + queryMap["Limit"] = strconv.Itoa(limit) + } + if offset != 0 { + queryMap["Offset"] = strconv.Itoa(offset) + } + + ctxlog.V(4).Infof(ctx, "starting to get clone task: %v", queryMap) + statusCode, data, err := util.HttpGet(cs.URL, queryMap) + if err != nil { + return resp, fmt.Errorf("failed to get clone task, err: %v", err) + } + + if err = json.Unmarshal(data, &resp); err != nil { + return resp, fmt.Errorf("unmarshal failed when get task. 
statusCode: %v, data: %v, err: %v", statusCode, string(data), err)
+	}
+
+	if resp.Code == FileNotExists || len(resp.TaskInfos) == 0 {
+		return resp, util.NewNotFoundErr()
+	}
+	if resp.Code != ExecSuccess {
+		return resp, fmt.Errorf("failed to get task, resp: %+v", resp)
+	}
+	return resp, nil
+}
+
+// Clone a volume from source to destination
+func (cs *SnapshotServer) Clone(ctx context.Context, source, destination string, lazy bool) (string, error) {
+	queryMap := map[string]string{
+		"Action":      "Clone",
+		"Version":     "0.0.6",
+		"User":        cs.User,
+		"Source":      source,
+		"Destination": destination,
+		"Lazy":        strconv.FormatBool(lazy),
+	}
+
+	ctxlog.V(4).Infof(ctx, "starting to clone snapshot: %v", queryMap)
+	statusCode, data, err := util.HttpGet(cs.URL, queryMap)
+	if err != nil {
+		return "", fmt.Errorf("failed to clone snapshot, err: %v", err)
+	}
+
+	var resp CloneResp
+	if err = json.Unmarshal(data, &resp); err != nil {
+		return "", fmt.Errorf("unmarshal failed when clone. statusCode: %v, data: %v, err: %v", statusCode, string(data), err)
+	}
+
+	if resp.Code == FileNotExists {
+		return "", util.NewNotFoundErr()
+	}
+	if resp.Code != ExecSuccess {
+		return "", fmt.Errorf("failed to clone snapshot, resp: %+v", resp)
+	}
+
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] clone %v to %v successfully with task id: %v", source, destination, resp.UUID)
+	return resp.UUID, nil
+}
+
+// Clean a clone task, flatten if it is unfinished. 
+func (cs *SnapshotServer) CleanCloneTask(ctx context.Context, uuid string) error { + ctxlog.V(4).Infof(ctx, "get task status %v before clean it", uuid) + taskInfo, err := cs.GetCloneTaskOfId(ctx, uuid) + if err != nil { + if util.IsNotFoundErr(err) { + return nil + } + return err + } + if taskInfo.TaskStatus == TaskStatusMetaInstalled { + if err = cs.Flatten(ctx, uuid); err != nil { + return err + } + } + if err = cs.waitForCloneTaskStatus(ctx, taskInfo.File, TaskStatusDone, TaskStatusError); err != nil { + if util.IsNotFoundErr(err) { + return nil + } + return err + } + + return cs.cleanCloneTask(ctx, uuid) +} + +func (cs *SnapshotServer) cleanCloneTask(ctx context.Context, uuid string) error { + queryMap := map[string]string{ + "Action": "CleanCloneTask", + "Version": "0.0.6", + "User": cs.User, + "UUID": uuid, + } + + ctxlog.V(4).Infof(ctx, "starting to clean cloneTask: %v", queryMap) + statusCode, data, err := util.HttpGet(cs.URL, queryMap) + if err != nil { + return fmt.Errorf("failed to clean cloneTask, err: %v", err) + } + + var resp CleanCloneTaskResp + if err = json.Unmarshal(data, &resp); err != nil { + return fmt.Errorf("unmarshal failed when clean cloneTask. 
statusCode: %v, data: %v, err: %v", statusCode, string(data), err)
+	}
+
+	if resp.Code == FileNotExists {
+		return nil
+	}
+	if resp.Code != ExecSuccess {
+		return fmt.Errorf("failed to clean cloneTask, resp: %+v", resp)
+	}
+
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] successfully clean cloneTask: %v", uuid)
+	return nil
+}
+
+func (cs *SnapshotServer) Flatten(ctx context.Context, uuid string) error {
+	queryMap := map[string]string{
+		"Action":  "Flatten",
+		"Version": "0.0.6",
+		"User":    cs.User,
+		"UUID":    uuid,
+	}
+
+	ctxlog.V(4).Infof(ctx, "starting to flatten task: %v", queryMap)
+	statusCode, data, err := util.HttpGet(cs.URL, queryMap)
+	if err != nil {
+		return fmt.Errorf("failed to flatten task, err: %v", err)
+	}
+
+	var resp FlattenResp
+	if err = json.Unmarshal(data, &resp); err != nil {
+		return fmt.Errorf("unmarshal failed when flatten task. statusCode: %v, data: %v, err: %v", statusCode, string(data), err)
+	}
+
+	if resp.Code == FileNotExists {
+		return util.NewNotFoundErr()
+	}
+	if resp.Code != ExecSuccess {
+		return fmt.Errorf("failed to flatten task, resp: %+v", resp)
+	}
+
+	ctxlog.V(4).Infof(ctx, "[curve snapshot] successfully flatten task: %v", uuid)
+	return nil
+}
+
+func (cs *SnapshotServer) waitForCloneTaskStatus(ctx context.Context, destination string, taskStatus ...TaskStatus) error {
+	if len(taskStatus) == 0 {
+		return nil
+	}
+
+	backoff := wait.Backoff{
+		Duration: snapshotWatcherInitDelay,
+		Factor:   snapshotWatcherFactor,
+		Steps:    snapshotWatcherSteps,
+	}
+
+	waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		taskInfo, err := cs.GetCloneTaskOfDestination(ctx, destination)
+		if err != nil {
+			ctxlog.ErrorS(ctx, err, "failed to get clone task", "destination", destination)
+			return false, err
+		}
+		for _, status := range taskStatus {
+			if taskInfo.TaskStatus == status {
+				return true, nil
+			}
+		}
+		return false, nil
+	})
+	// return error if err has not become available for the specified timeout
+	if waitErr == 
wait.ErrWaitTimeout { + return fmt.Errorf("timeout to wait, task is still not %v", taskStatus) + } + // return error if any other errors were encountered during waiting for the task to become done + return waitErr +} + +// Wait for the task ready: Done or MetaInstalled +func (cs *SnapshotServer) WaitForCloneTaskReady(ctx context.Context, destination string) error { + ctxlog.V(4).Infof(ctx, "wait for task of destination %q status ready", destination) + return cs.waitForCloneTaskStatus(ctx, destination, TaskStatusDone, TaskStatusMetaInstalled) +} + +func (cs *SnapshotServer) WaitForCloneTaskDone(ctx context.Context, destination string) error { + ctxlog.V(4).Infof(ctx, "wait for task of destination %q status done", destination) + return cs.waitForCloneTaskStatus(ctx, destination, TaskStatusDone) +} + +func (cs *SnapshotServer) EnsureTaskFromSourceDone(ctx context.Context, source string) error { + ctxlog.V(4).Infof(ctx, "ensure task created from %v status done", source) + + needFlatten := make([]TaskInfo, 0) + limit, offset, total := 20, 0, 1 + for offset < total { + taskResp, err := cs.getCloneTask(ctx, "", "", limit, offset) + if err != nil { + if util.IsNotFoundErr(err) { + break + } + return err + } + for _, oneTask := range taskResp.TaskInfos { + if oneTask.Src == source && oneTask.TaskStatus == TaskStatusMetaInstalled { + needFlatten = append(needFlatten, oneTask) + } + } + total = taskResp.TotalCount + offset += limit + } + + ctxlog.V(4).Infof(ctx, "need flatten tasks: %v", needFlatten) + for _, t := range needFlatten { + if err := cs.Flatten(ctx, t.UUID); err != nil { + return err + } + } + for _, t := range needFlatten { + taskInfo, err := cs.GetCloneTaskOfId(ctx, t.UUID) + if err != nil { + if util.IsNotFoundErr(err) { + continue + } + return err + } + if taskInfo.TaskStatus == TaskStatusError { + ctxlog.Warningf(ctx, "%v status err, just clean it", t) + if err = cs.cleanCloneTask(ctx, taskInfo.UUID); err != nil { + return err + } + } + if err := 
cs.waitForCloneTaskStatus(ctx, t.File, TaskStatusDone); err != nil { + return err + } + } + ctxlog.V(4).Infof(ctx, "[curve snapshot] all tasks from %v done", source) + return nil +} diff --git a/pkg/curveservice/snapshotdetail.go b/pkg/curveservice/snapshotdetail.go new file mode 100644 index 0000000..f776562 --- /dev/null +++ b/pkg/curveservice/snapshotdetail.go @@ -0,0 +1,135 @@ +/* +Copyright 2021 The Netease Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package curveservice + +type SnapshotStatus uint8 + +type RespCode string + +const ( + ExecSuccess RespCode = "0" + FileNotExists RespCode = "-8" +) + +// SnapshotStatus: +// (0:done, 1:pending, 2:deleteing, 3:errorDeleting, 4:canceling, 5:error) +const ( + SnapshotStatusDone SnapshotStatus = iota + SnapshotStatusPending + SnapshotStatusDeleteing + SnapshotStatusErrorDeleting + SnapshotStatusCanceling + SnapshotStatusError +) + +type TaskType uint8 + +// TaskType: (0:clone, 1:recover) +const ( + TaskTypeClone TaskType = iota + TaskTypeRecover +) + +type TaskFileType uint8 + +// TaskFileType: (0:SrcFile 1:SrcSnapshot) +const ( + TaskFileTypeSrcFile TaskFileType = iota + TaskFileTypeSrcSnapshot +) + +type TaskStatus uint8 + +// TaskStatus +// (0:done, 1:cloning, 2:recovering, 3:cleaning, 4:errorCleaning, 5:error,6:retrying, 7:metaInstalled) +const ( + TaskStatusDone TaskStatus = iota + TaskStatusCloning + TaskStatusRecovering + TaskStatusCleaning + TaskStatusErrorCleaning + TaskStatusError + 
TaskStatusRetrying + TaskStatusMetaInstalled +) + +type SnapshotCommonResp struct { + Code RespCode `json:"Code"` + Message string `json:"Message"` + RequestId string `json:"RequestId"` +} + +type CreateSnapshotResp struct { + SnapshotCommonResp + UUID string `json:"UUID,omitempty"` +} + +type DeleteSnapshotResp SnapshotCommonResp + +type CancelSnapshotResp SnapshotCommonResp + +type GetSnapshotResp struct { + SnapshotCommonResp + TotalCount int `json:"TotalCount,omitempty"` + Snapshots []Snapshot `json:"Snapshots,omitempty"` +} + +type Snapshot struct { + UUID string `json:"UUID"` + User string `json:"User"` + File string `json:"File"` + SeqNum uint32 `json:"SeqNum"` + Name string `json:"Name"` + Time int64 `json:"Time"` + FileLength uint64 `json:"FileLength"` //unit Byte + Status SnapshotStatus `json:"Status"` + Progress uint8 `json:"Progress"` +} + +type CloneResp struct { + SnapshotCommonResp + UUID string `json:"UUID,omitempty"` +} + +type RecoverResp struct { + SnapshotCommonResp + UUID string `json:"UUID,omitempty"` +} + +type FlattenResp SnapshotCommonResp + +// also use as recover task +type GetCloneTaskResp struct { + SnapshotCommonResp + TotalCount int `json:"TotalCount,omitempty"` + TaskInfos []TaskInfo `json:"TaskInfos,omitempty"` +} + +type TaskInfo struct { + File string `json:"File"` + TaskFileType TaskFileType `json:"FileType,omitempty"` + IsLazy bool `json:"IsLazy,omitempty"` + Progress uint8 `json:"Progress,omitempty"` + Src string `json:"Src,omitempty"` + TaskStatus TaskStatus `json:"TaskStatus"` + TaskType TaskType `json:"TaskType"` + Time int64 `json:"Time"` + UUID string `json:"UUID"` + User string `json:"User"` +} + +type CleanCloneTaskResp SnapshotCommonResp diff --git a/pkg/util/devicestats.go b/pkg/util/devicestats.go index 90f9fcf..8dab42f 100644 --- a/pkg/util/devicestats.go +++ b/pkg/util/devicestats.go @@ -55,17 +55,18 @@ func GetDeviceStats(path string) (*DeviceStats, error) { return nil, fmt.Errorf("failed to statfs() %q: %s", path, 
err) } - return &DeviceStats{ + deviceStats := &DeviceStats{ Block: false, - AvailableBytes: int64(statfs.Bavail) * statfs.Bsize, - TotalBytes: int64(statfs.Blocks) * statfs.Bsize, - UsedBytes: (int64(statfs.Blocks) - int64(statfs.Bfree)) * statfs.Bsize, + AvailableBytes: int64(statfs.Bavail) * int64(statfs.Bsize), + TotalBytes: int64(statfs.Blocks) * int64(statfs.Bsize), AvailableInodes: int64(statfs.Ffree), TotalInodes: int64(statfs.Files), - UsedInodes: int64(statfs.Files) - int64(statfs.Ffree), - }, nil + } + deviceStats.UsedBytes = deviceStats.TotalBytes - deviceStats.AvailableBytes + deviceStats.UsedInodes = deviceStats.TotalInodes - deviceStats.AvailableInodes + return deviceStats, nil } func isBlockDevice(path string) (bool, error) { diff --git a/pkg/util/httphandler.go b/pkg/util/httphandler.go index 16ba919..1e01e12 100644 --- a/pkg/util/httphandler.go +++ b/pkg/util/httphandler.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/pprof" runtime_pprof "runtime/pprof" + "strings" "k8s.io/klog/v2" ) @@ -79,3 +80,47 @@ func addPath(name string, handler http.Handler) { http.Handle(name, handler) klog.V(4).Infof("DEBUG: registered profiling handler on /debug/pprof/%s", name) } + +func HttpGet(reqURL string, queryMap map[string]string) (int, []byte, error) { + req, err := http.NewRequest(http.MethodGet, reqURL, nil) + if err != nil { + return 0, nil, err + } + + // set http header + header := http.Header{} + header.Add("Content-Type", "application/json") + header.Add("Accept", "application/json") + req.Header = header + + // TODO: now the snapshot server can not recognize URL encode + req.URL.RawQuery = buildRawQuery(queryMap) + + klog.V(6).Infof("Request: %+v", req) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return -1, nil, err + } + klog.V(6).Infof("Response: %+v", *resp) + defer resp.Body.Close() + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return resp.StatusCode, nil, err + } + klog.V(7).Infof("Response data: %+v", 
string(data))
+	return resp.StatusCode, data, nil
+}
+
+func buildRawQuery(queryMap map[string]string) string {
+	var buf strings.Builder
+	for k, v := range queryMap {
+		if buf.Len() > 0 {
+			buf.WriteByte('&')
+		}
+		buf.WriteString(k)
+		buf.WriteByte('=')
+		buf.WriteString(v)
+	}
+	return buf.String()
+}
diff --git a/pkg/util/idlocker.go b/pkg/util/idlocker.go
index b07060f..58f89cd 100644
--- a/pkg/util/idlocker.go
+++ b/pkg/util/idlocker.go
@@ -25,6 +25,9 @@ import (
 const (
 	// VolumeOperationAlreadyExistsFmt string format to return for concerrent operation
 	VolumeOperationAlreadyExistsFmt = "an operation with the given Volume ID %s already exists"
+
+	// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation
+	SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"
 )
 
 // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 8968441..f6f8437 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -84,3 +84,26 @@ func SystemMapOnHost(ctx context.Context, serviceName string, mapCommands []stri
 	ctxlog.Infof(ctx, "map successfully, running as %s.service", serviceName)
 	return nil
 }
+
+type NotFoundErr struct {
+	Id string
+}
+
+func NewNotFoundErr(id ...string) *NotFoundErr {
+	if len(id) != 0 {
+		return &NotFoundErr{Id: strings.Join(id, ",")}
+	}
+	return &NotFoundErr{}
+}
+
+func (e *NotFoundErr) Error() string {
+	if e.Id == "" {
+		return "Not found"
+	}
+	return fmt.Sprintf("Object with Id(%v) not found", e.Id)
+}
+
+func IsNotFoundErr(err error, id ...string) bool {
+	notFoundErr := NewNotFoundErr(id...) 
+ return err.Error() == notFoundErr.Error() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4065bba..061a312 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -116,6 +116,7 @@ google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap # google.golang.org/protobuf v1.26.0 +## explicit google.golang.org/protobuf/encoding/prototext google.golang.org/protobuf/encoding/protowire google.golang.org/protobuf/internal/descfmt