Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for block volumes #6897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/6897-dzaninovic
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for block volumes with Kopia
6 changes: 6 additions & 0 deletions design/CLI/PoC/overlays/plugins/node-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ spec:
- mountPath: /host_pods
mountPropagation: HostToContainer
name: host-pods
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins
- mountPath: /scratch
name: scratch
- mountPath: /credentials
Expand All @@ -60,6 +63,9 @@ spec:
- hostPath:
path: /var/lib/kubelet/pods
name: host-pods
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
- emptyDir: {}
name: scratch
- name: cloud-credentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,33 +703,38 @@ type Provider interface {
In this case, we will extend the default kopia uploader to add the ability, when a given volume is for a block mode and is mapped as a device, we will use the [StreamingFile](https://pkg.go.dev/github.com/kopia/kopia@v0.13.0/fs#StreamingFile) to stream the device and backup to the kopia repository.

```go
func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) {
path := kopiaEntry.LocalFilesystemPath()
func getLocalBlockEntry(sourcePath string) (fs.Entry, error) {
source, err := resolveSymlink(sourcePath)
if err != nil {
return nil, errors.Wrap(err, "resolveSymlink")
}

fileInfo, err := os.Lstat(path)
fileInfo, err := os.Lstat(source)
if err != nil {
return nil, errors.Wrapf(err, "Unable to get the source device information %s", path)
return nil, errors.Wrapf(err, "unable to get the source device information %s", source)
}

if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return nil, errors.Errorf("Source path %s is not a block device", path)
return nil, errors.Errorf("source path %s is not a block device", source)
}
device, err := os.Open(path)

device, err := os.Open(source)
if err != nil {
if os.IsPermission(err) || err.Error() == ErrNotPermitted {
return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path)
return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source)
}
return nil, errors.Wrapf(err, "Unable to open the source device %s", path)
return nil, errors.Wrapf(err, "unable to open the source device %s", source)
}
return virtualfs.StreamingFileFromReader(kopiaEntry.Name(), device), nil

sf := virtualfs.StreamingFileFromReader(source, device)
return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil
}
```

In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like

```go
if volMode == PersistentVolumeFilesystem {
if volMode == uploader.PersistentVolumeFilesystem {
// to be consistent with restic when backup empty dir returns one error for upper logic handle
dirs, err := os.ReadDir(source)
if err != nil {
Expand All @@ -742,15 +747,17 @@ In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like
source = filepath.Clean(source)
...

sourceEntry, err := getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}
var sourceEntry fs.Entry

if volMode == PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(sourceEntry, log)
if volMode == uploader.PersistentVolumeBlock {
sourceEntry, err = getLocalBlockEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "unable to get local block device entry")
}
} else {
sourceEntry, err = getLocalFSEntry(source)
if err != nil {
return nil, false, errors.Wrap(err, "Unable to get local block device entry")
return nil, false, errors.Wrap(err, "unable to get local filesystem entry")
}
}

Expand All @@ -766,37 +773,24 @@ We only need to extend two functions the rest will be passed through.
```go
type BlockOutput struct {
*restore.FilesystemOutput

targetFileName string
}

var _ restore.Output = &BlockOutput{}

const bufferSize = 128 * 1024

func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error {

targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
}

fileInfo, err := os.Lstat(targetFileName)
if err != nil {
return errors.Wrapf(err, "Unable to get the target device information for %s", targetFileName)
}

if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("Target file %s is not a block device", targetFileName)
}

remoteReader, err := remoteFile.Open(ctx)
if err != nil {
return errors.Wrapf(err, "Failed to open remote file %s", remoteFile.Name())
return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name())
}
defer remoteReader.Close()

targetFile, err := os.Create(targetFileName)
targetFile, err := os.Create(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "Failed to open file %s", targetFileName)
return errors.Wrapf(err, "failed to open file %s", o.targetFileName)
}
defer targetFile.Close()

Expand All @@ -807,7 +801,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
bytesToWrite, err := remoteReader.Read(buffer)
if err != nil {
if err != io.EOF {
return errors.Wrapf(err, "Failed to read data from remote file %s", targetFileName)
return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName)
}
readData = false
}
Expand All @@ -819,7 +813,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
bytesToWrite -= bytesWritten
offset += bytesWritten
} else {
return errors.Wrapf(err, "Failed to write data to file %s", targetFileName)
return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName)
}
}
}
Expand All @@ -829,42 +823,43 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote
}

func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error {
targetFileName, err := filepath.EvalSymlinks(o.TargetPath)
var err error
o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath)
if err != nil {
return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName)
return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName)
}

fileInfo, err := os.Lstat(targetFileName)
fileInfo, err := os.Lstat(o.targetFileName)
if err != nil {
return errors.Wrapf(err, "Unable to get the target device information for %s", o.TargetPath)
return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath)
}

if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
return errors.Errorf("Target file %s is not a block device", o.TargetPath)
return errors.Errorf("target file %s is not a block device", o.TargetPath)
}

return nil
}
```

Of note, we do need to add root access to the daemon set node agent to access the new mount.
Additional mount is required in the node-agent specification to resolve symlinks to the block devices from /host_pods/POD_ID/volumeDevices/kubernetes.io~csi directory.

```yaml
...
- mountPath: /var/lib/kubelet/plugins
mountPropagation: HostToContainer
name: host-plugins

....
- hostPath:
path: /var/lib/kubelet/plugins
name: host-plugins
```

Privileged mode is required to access the block devices in /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish directory as confirmed by testing on EKS and Minikube.

...
```yaml
SecurityContext: &corev1.SecurityContext{
Privileged: &c.privilegedAgent,
Privileged: &c.privilegedNodeAgent,
},

```

## Plugin Data Movers
Expand Down
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func (b *PersistentVolumeBuilder) StorageClass(name string) *PersistentVolumeBui
return b
}

// VolumeMode sets the PersistentVolume's volume mode.
func (b *PersistentVolumeBuilder) VolumeMode(volMode corev1api.PersistentVolumeMode) *PersistentVolumeBuilder {
b.object.Spec.VolumeMode = &volMode
return b
}

// NodeAffinityRequired sets the PersistentVolume's NodeAffinity Requirement.
func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelector) *PersistentVolumeBuilder {
b.object.Spec.NodeAffinity = &corev1api.VolumeNodeAffinity{
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/cli/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Options struct {
BackupStorageConfig flag.Map
VolumeSnapshotConfig flag.Map
UseNodeAgent bool
PrivilegedNodeAgent bool
//TODO remove UseRestic when migration test out of using it
UseRestic bool
Wait bool
Expand Down Expand Up @@ -109,6 +110,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.RestoreOnly, "restore-only", o.RestoreOnly, "Run the server in restore-only mode. Optional.")
flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.")
flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).")
flags.BoolVar(&o.PrivilegedNodeAgent, "privileged-node-agent", o.PrivilegedNodeAgent, "Use privileged mode for the node agent. Optional. Required to backup block devices.")
flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready. Optional.")
flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.")
flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)")
Expand Down Expand Up @@ -195,6 +197,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
SecretData: secretData,
RestoreOnly: o.RestoreOnly,
UseNodeAgent: o.UseNodeAgent,
PrivilegedNodeAgent: o.PrivilegedNodeAgent,
UseVolumeSnapshots: o.UseVolumeSnapshots,
BSLConfig: o.BackupStorageConfig.Data(),
VSLConfig: o.VolumeSnapshotConfig.Data(),
Expand Down
28 changes: 23 additions & 5 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@
return ctrl.Result{}, nil
}

exposeParam := r.setupExposeParam(du)
exposeParam, err := r.setupExposeParam(du)
if err != nil {
return r.errorOut(ctx, du, err, "failed to set exposer parameters", log)
}

// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
Expand Down Expand Up @@ -735,18 +738,33 @@
r.dataPathMgr.RemoveAsyncBR(duName)
}

func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} {
func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) {
if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI {
pvc := &corev1.PersistentVolumeClaim{}
err := r.client.Get(context.Background(), types.NamespacedName{
Namespace: du.Spec.SourceNamespace,
Name: du.Spec.SourcePVC,
}, pvc)

if err != nil {
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}

accessMode := exposer.AccessModeFileSystem
if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock {
accessMode = exposer.AccessModeBlock
}

Check warning on line 756 in pkg/controller/data_upload_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_upload_controller.go#L755-L756

Added lines #L755 - L756 were not covered by tests

return &exposer.CSISnapshotExposeParam{
SnapshotName: du.Spec.CSISnapshot.VolumeSnapshot,
SourceNamespace: du.Spec.SourceNamespace,
StorageClass: du.Spec.CSISnapshot.StorageClass,
HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name},
AccessMode: exposer.AccessModeFileSystem,
AccessMode: accessMode,
Timeout: du.Spec.OperationTimeout.Duration,
}
}, nil
}
return nil
return nil, nil
}

func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} {
Expand Down
18 changes: 17 additions & 1 deletion pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func TestReconcile(t *testing.T) {
name string
du *velerov2alpha1api.DataUpload
pod *corev1.Pod
pvc *corev1.PersistentVolumeClaim
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataMgr *datapath.Manager
expectedProcessed bool
Expand Down Expand Up @@ -345,11 +346,21 @@ func TestReconcile(t *testing.T) {
}, {
name: "Dataupload should be accepted",
du: dataUploadBuilder().Result(),
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(),
pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(),
expectedProcessed: false,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(),
expectedRequeue: ctrl.Result{},
},
{
name: "Dataupload should fail to get PVC information",
du: dataUploadBuilder().Result(),
pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "wrong-pvc"}).Result(),
expectedProcessed: true,
expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
expectedRequeue: ctrl.Result{},
expectedErrMsg: "failed to get PVC",
},
{
name: "Dataupload should be prepared",
du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(),
Expand Down Expand Up @@ -448,6 +459,11 @@ func TestReconcile(t *testing.T) {
require.NoError(t, err)
}

if test.pvc != nil {
err = r.client.Create(ctx, test.pvc)
require.NoError(t, err)
}

if test.dataMgr != nil {
r.dataPathMgr = test.dataMgr
} else {
Expand Down
15 changes: 3 additions & 12 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(source)

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -155,10 +155,8 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return errors.New("file system data path is not initialized")
}

volMode := getPersistentVolumeMode(target)

go func() {
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -172,13 +170,6 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return nil
}

func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
if source.ByBlock != "" {
return uploader.PersistentVolumeBlock
}
return uploader.PersistentVolumeFilesystem
}

// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Callbacks struct {
// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
ByBlock string
VolMode uploader.PersistentVolumeMode
}

// AsyncBR is the interface for asynchronous data path methods
Expand Down
Loading
Loading