Skip to content

Commit 62451fd

Browse files
committed
support clone
Signed-off-by: Vicente Cheng <vicente.cheng@suse.com>
1 parent 721a1a4 commit 62451fd

File tree

4 files changed

+218
-30
lines changed

4 files changed

+218
-30
lines changed

cmd/provisioner/clonelv.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"syscall"
7+
"time"
8+
9+
"github.com/urfave/cli/v2"
10+
"k8s.io/klog/v2"
11+
12+
lvm "github.com/harvester/csi-driver-lvm/pkg/lvm"
13+
)
14+
15+
func cloneLVCmd() *cli.Command {
16+
return &cli.Command{
17+
Name: "clonelv",
18+
Flags: []cli.Flag{
19+
&cli.StringFlag{
20+
Name: flagSrcDev,
21+
Usage: "Required. Source device name.",
22+
},
23+
&cli.StringFlag{
24+
Name: flagLVName,
25+
Usage: "Required. Target LV",
26+
},
27+
&cli.StringFlag{
28+
Name: flagVGName,
29+
Usage: "Required. the name of the volumegroup",
30+
},
31+
&cli.StringFlag{
32+
Name: flagLVMType,
33+
Usage: "Required. type of lvs, can be either striped or dm-thin",
34+
},
35+
&cli.Uint64Flag{
36+
Name: flagLVSize,
37+
Usage: "Required. The size of the lv in MiB",
38+
},
39+
},
40+
Action: func(c *cli.Context) error {
41+
if err := clonelv(c); err != nil {
42+
klog.Fatalf("Error creating lv: %v", err)
43+
return err
44+
}
45+
return nil
46+
},
47+
}
48+
}
49+
50+
func clonelv(c *cli.Context) error {
51+
srcDev := c.String(flagSrcDev)
52+
if srcDev == "" {
53+
return fmt.Errorf("invalid empty flag %v", flagSrcDev)
54+
}
55+
dstLV := c.String(flagLVName)
56+
if dstLV == "" {
57+
return fmt.Errorf("invalid empty flag %v", flagLVName)
58+
}
59+
dstVGName := c.String(flagVGName)
60+
if dstVGName == "" {
61+
return fmt.Errorf("invalid empty flag %v", flagVGName)
62+
}
63+
dstLVType := c.String(flagLVMType)
64+
if dstLVType == "" {
65+
return fmt.Errorf("invalid empty flag %v", flagLVMType)
66+
}
67+
dstSize := c.Uint64(flagLVSize)
68+
if dstSize == 0 {
69+
return fmt.Errorf("invalid empty flag %v", flagLVSize)
70+
}
71+
72+
klog.Infof("Clone from src:%s, to dst: %s/%s", srcDev, dstVGName, dstLV)
73+
74+
// check source dev
75+
src, err := os.OpenFile(srcDev, syscall.O_RDONLY|syscall.O_DIRECT, 0)
76+
if err != nil {
77+
return fmt.Errorf("unable to open source device: %w", err)
78+
}
79+
defer src.Close()
80+
81+
if !lvm.VgExists(dstVGName) {
82+
lvm.VgActivate()
83+
time.Sleep(1 * time.Second) // jitter
84+
if !lvm.VgExists(dstVGName) {
85+
return fmt.Errorf("vg %s does not exist, please check the corresponding VG is created", dstVGName)
86+
}
87+
}
88+
89+
output, err := lvm.CreateLVS(dstVGName, dstLV, dstSize, dstLVType)
90+
if err != nil {
91+
return fmt.Errorf("unable to create lv: %w output:%s", err, output)
92+
}
93+
klog.Infof("lv: %s created, vg:%s size:%d type:%s", dstLV, dstVGName, dstSize, dstLVType)
94+
95+
dst, err := os.OpenFile(fmt.Sprintf("/dev/%s/%s", dstVGName, dstLV), syscall.O_WRONLY|syscall.O_DIRECT, 0)
96+
if err != nil {
97+
return fmt.Errorf("unable to open target device: %w", err)
98+
}
99+
defer dst.Close()
100+
101+
// Clone the source device to the target device
102+
if err := lvm.CloneDevice(src, dst); err != nil {
103+
return fmt.Errorf("unable to clone device: %w", err)
104+
}
105+
if err := dst.Sync(); err != nil {
106+
return fmt.Errorf("unable to sync target device: %w", err)
107+
}
108+
109+
klog.Infof("lv: %s/%s cloned from %s", dstVGName, dstLV, srcDev)
110+
111+
return nil
112+
}

cmd/provisioner/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
flagDevicesPattern = "devices"
1616
flagLVMType = "lvmtype"
1717
flagSnapName = "snapname"
18+
flagSrcDev = "srcdev"
1819
)
1920

2021
func cmdNotFound(c *cli.Context, command string) {
@@ -33,6 +34,7 @@ func main() {
3334
deleteLVCmd(),
3435
createSnapCmd(),
3536
deleteSnapCmd(),
37+
cloneLVCmd(),
3638
}
3739
p.CommandNotFound = cmdNotFound
3840
p.OnUsageError = onUsageError

pkg/lvm/controllerserver.go

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package lvm
1818

1919
import (
20+
"fmt"
2021
"strconv"
2122
"time"
2223

@@ -66,6 +67,7 @@ func newControllerServer(nodeID string, hostWritePath string, namespace string,
6667
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
6768
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
6869
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
70+
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
6971
// TODO
7072
// csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
7173
// csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
@@ -134,22 +136,41 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
134136
}}
135137
klog.Infof("creating volume %s on node: %s", req.GetName(), node)
136138

137-
va := volumeAction{
138-
action: actionTypeCreate,
139-
name: req.GetName(),
140-
nodeName: node,
141-
size: req.GetCapacityRange().GetRequiredBytes(),
142-
lvmType: lvmType,
143-
pullPolicy: cs.pullPolicy,
144-
provisionerImage: cs.provisionerImage,
145-
kubeClient: cs.kubeClient,
146-
namespace: cs.namespace,
147-
vgName: vgName,
148-
hostWritePath: cs.hostWritePath,
149-
}
150-
if err := createProvisionerPod(ctx, va); err != nil {
151-
klog.Errorf("error creating provisioner pod :%v", err)
152-
return nil, err
139+
if req.GetVolumeContentSource() != nil {
140+
volumeSource := req.VolumeContentSource
141+
switch volumeSource.Type.(type) {
142+
case *csi.VolumeContentSource_Snapshot:
143+
return nil, status.Error(codes.NotFound, "snapshot source not supported")
144+
case *csi.VolumeContentSource_Volume:
145+
srcVolID := volumeSource.GetVolume().GetVolumeId()
146+
srcVolume, err := cs.kubeClient.CoreV1().PersistentVolumes().Get(ctx, srcVolID, metav1.GetOptions{})
147+
if err != nil {
148+
return nil, status.Errorf(codes.Unavailable, "source volume %s not found", srcVolID)
149+
}
150+
if err := cs.cloneFromVolume(ctx, srcVolume, req.GetName(), node, lvmType, vgName, req.GetCapacityRange().GetRequiredBytes()); err != nil {
151+
return nil, err
152+
}
153+
default:
154+
return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource)
155+
}
156+
} else {
157+
va := volumeAction{
158+
action: actionTypeCreate,
159+
name: req.GetName(),
160+
nodeName: node,
161+
size: req.GetCapacityRange().GetRequiredBytes(),
162+
lvmType: lvmType,
163+
pullPolicy: cs.pullPolicy,
164+
provisionerImage: cs.provisionerImage,
165+
kubeClient: cs.kubeClient,
166+
namespace: cs.namespace,
167+
vgName: vgName,
168+
hostWritePath: cs.hostWritePath,
169+
}
170+
if err := createProvisionerPod(ctx, va); err != nil {
171+
klog.Errorf("error creating provisioner pod :%v", err)
172+
return nil, err
173+
}
153174
}
154175

155176
return &csi.CreateVolumeResponse{
@@ -163,6 +184,48 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
163184
}, nil
164185
}
165186

187+
func (cs *controllerServer) cloneFromVolume(ctx context.Context, srcVol *v1.PersistentVolume, dstName, dstNode, dstLVType, dstVGName string, dstSize int64) error {
188+
ns := srcVol.Spec.NodeAffinity.Required.NodeSelectorTerms
189+
srcNode := ns[0].MatchExpressions[0].Values[0]
190+
srcVgName := srcVol.Spec.CSI.VolumeAttributes["vgName"]
191+
srcSizeStr := srcVol.Spec.CSI.VolumeAttributes["RequiredBytes"]
192+
srcSize, err := strconv.ParseInt(srcSizeStr, 10, 64)
193+
if err != nil {
194+
klog.Errorf("error parsing srcSize: %v", err)
195+
return err
196+
}
197+
srcDev := fmt.Sprintf("/dev/%s/%s", srcVgName, srcVol.GetName())
198+
klog.V(4).Infof("cloning volume from %s ", srcDev)
199+
200+
if srcSize > dstSize {
201+
return status.Error(codes.InvalidArgument, "source volume size is larger than destination volume size")
202+
}
203+
if srcNode != dstNode {
204+
return status.Errorf(codes.InvalidArgument, "source (%s) and destination (%s) nodes are different (not supported)", srcNode, dstNode)
205+
}
206+
207+
va := volumeAction{
208+
action: actionTypeClone,
209+
name: dstName,
210+
nodeName: dstNode,
211+
size: dstSize,
212+
lvmType: dstLVType,
213+
pullPolicy: cs.pullPolicy,
214+
provisionerImage: cs.provisionerImage,
215+
kubeClient: cs.kubeClient,
216+
namespace: cs.namespace,
217+
vgName: dstVGName,
218+
hostWritePath: cs.hostWritePath,
219+
srcDev: srcDev,
220+
}
221+
if err := createProvisionerPod(ctx, va); err != nil {
222+
klog.Errorf("error creating provisioner pod :%v", err)
223+
return err
224+
}
225+
226+
return nil
227+
}
228+
166229
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
167230
// Check arguments
168231
if len(req.GetVolumeId()) == 0 {

pkg/lvm/lvm.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ import (
2727
"time"
2828

2929
cmd "github.com/harvester/go-common/command"
30+
ioutil "github.com/harvester/go-common/io"
3031
"google.golang.org/grpc/codes"
3132
"google.golang.org/grpc/status"
3233
v1 "k8s.io/api/core/v1"
3334
k8serror "k8s.io/apimachinery/pkg/api/errors"
34-
"k8s.io/apimachinery/pkg/api/resource"
3535
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3636
"k8s.io/client-go/kubernetes"
3737
"k8s.io/klog/v2"
@@ -72,6 +72,7 @@ type volumeAction struct {
7272
namespace string
7373
vgName string
7474
hostWritePath string
75+
srcDev string
7576
}
7677

7778
type snapshotAction struct {
@@ -94,8 +95,10 @@ const (
9495
mirrorType = "mirror"
9596
actionTypeCreate = "create"
9697
actionTypeDelete = "delete"
98+
actionTypeClone = "clone"
9799
pullIfNotPresent = "ifnotpresent"
98100
fsTypeRegexpString = `TYPE="(\w+)"`
101+
DefaultChunkSize = 4 * 1024 * 1024
99102
)
100103

101104
var (
@@ -337,11 +340,15 @@ func createProvisionerPod(ctx context.Context, va volumeAction) (err error) {
337340
}
338341

339342
args := []string{}
340-
if va.action == actionTypeCreate {
343+
switch va.action {
344+
case actionTypeCreate:
341345
args = append(args, "createlv", "--lvsize", fmt.Sprintf("%d", va.size), "--lvmtype", va.lvmType, "--vgname", va.vgName)
342-
}
343-
if va.action == actionTypeDelete {
346+
case actionTypeDelete:
344347
args = append(args, "deletelv")
348+
case actionTypeClone:
349+
args = append(args, "clonelv", "--srcdev", va.srcDev, "--lvsize", fmt.Sprintf("%d", va.size), "--vgname", va.vgName, "--lvmtype", va.lvmType)
350+
default:
351+
return fmt.Errorf("invalid action %v", va.action)
345352
}
346353
args = append(args, "--lvname", va.name)
347354

@@ -568,6 +575,10 @@ func DeleteSnapshot(snapshotName, vgName string) (string, error) {
568575
return out, err
569576
}
570577

578+
func CloneDevice(src, dst *os.File) error {
579+
return ioutil.Copy(src, dst, DefaultChunkSize)
580+
}
581+
571582
func pvCount(vgname string) (int, error) {
572583
executor := cmd.NewExecutor()
573584
out, err := executor.Execute("vgs", []string{vgname, "--noheadings", "-o", "pv_count"})
@@ -712,16 +723,16 @@ func genProvisionerPodContent(action, name, targetNode, hostWritePath, provision
712723
SecurityContext: &v1.SecurityContext{
713724
Privileged: &privileged,
714725
},
715-
Resources: v1.ResourceRequirements{
716-
Requests: v1.ResourceList{
717-
"cpu": resource.MustParse("50m"),
718-
"memory": resource.MustParse("50Mi"),
719-
},
720-
Limits: v1.ResourceList{
721-
"cpu": resource.MustParse("100m"),
722-
"memory": resource.MustParse("100Mi"),
723-
},
724-
},
726+
//Resources: v1.ResourceRequirements{
727+
// Requests: v1.ResourceList{
728+
// "cpu": resource.MustParse("50m"),
729+
// "memory": resource.MustParse("50Mi"),
730+
// },
731+
// Limits: v1.ResourceList{
732+
// "cpu": resource.MustParse("100m"),
733+
// "memory": resource.MustParse("100Mi"),
734+
// },
735+
//},
725736
},
726737
},
727738
Volumes: []v1.Volume{

0 commit comments

Comments
 (0)