From f4ccf40e4dc6db61fff6017a6fe3927d163901cf Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Fri, 4 Aug 2023 12:49:28 -0400 Subject: [PATCH] adding block mode to uploader/provider interfaces Signed-off-by: Shawn Hurley --- changelogs/unreleased/6608-shawn-hurley | 1 + .../volume-snapshot-data-movement.md | 200 +----------------- pkg/datapath/file_system.go | 14 +- pkg/datapath/file_system_test.go | 4 +- pkg/datapath/types.go | 3 +- pkg/uploader/kopia/snapshot.go | 7 +- pkg/uploader/kopia/snapshot_test.go | 15 +- pkg/uploader/provider/kopia.go | 14 +- pkg/uploader/provider/kopia_test.go | 36 +++- pkg/uploader/provider/mocks/Provider.go | 8 +- pkg/uploader/provider/provider.go | 2 + pkg/uploader/provider/restic.go | 10 + pkg/uploader/provider/restic_test.go | 32 ++- pkg/uploader/types.go | 9 + 14 files changed, 137 insertions(+), 218 deletions(-) create mode 100644 changelogs/unreleased/6608-shawn-hurley diff --git a/changelogs/unreleased/6608-shawn-hurley b/changelogs/unreleased/6608-shawn-hurley new file mode 100644 index 0000000000..3caeca55ae --- /dev/null +++ b/changelogs/unreleased/6608-shawn-hurley @@ -0,0 +1 @@ +Add API support for volMode block, only error for now. \ No newline at end of file diff --git a/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md b/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md index 88207b5133..030e8ba7af 100644 --- a/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md +++ b/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md @@ -31,7 +31,7 @@ Moreover, we would like to create a general workflow to variations during the da ## Non-Goals -- The current support for block level access is through file system uploader, so it is not aimed to deliver features of an ultimate block level backup. Block level backup will be included in a future design +- The implementation details of block level access will be included in a future design - Most of the components are generic, but the Exposer is snapshot type specific or snapshot access specific. The current design covers the implementation details for exposing CSI snapshot to host path access only, for other types or accesses, we may need a separate design - The current workflow focuses on snapshot-based data movements. For some application/SaaS level data sources, snapshots may not be taken explicitly. We don’t take them into consideration, though we believe that some workflows or components may still be reusable. @@ -67,7 +67,7 @@ DMs take the responsibility to handle DUCR/DDCRs, Velero provides a built-in DM **Node-Agent**: Node-Agent is an existing Velero module that will be used to host VBDM. **Exposer**: Exposer is to expose the snapshot/target volume as a path/device name/endpoint that are recognizable by Velero generic data path. For different snapshot types/snapshot accesses, the Exposer may be different. This isolation guarantees that when we want to support other snapshot types/snapshot accesses, we only need to replace with a new Exposer and keep other components as is. **Velero Generic Data Path (VGDP)**: VGDP is the collective of modules that is introduced in [Unified Repository design][1]. Velero uses these modules to finish data transmission for various purposes. In includes uploaders and the backup repository. -**Uploader**: Uploader is the module in VGDP that reads data from the source and writes to backup repository for backup; while read data from backup repository and write to the restore target for restore. At present, only file system uploader is supported. In future, the block level uploader will be added. For file system and basic block uploader, only Kopia uploader will be used, Restic will not be integrated with VBDM. +**Uploader**: Uploader is the module in VGDP that reads data from the source and writes to backup repository for backup; while read data from backup repository and write to the restore target for restore. At present, only file system uploader is supported. In future, the block level uploader will be added. For file system uploader, only Kopia uploader will be used, Restic will not be integrated with VBDM. ## Replacement 3rd parties could integrate their own data movement into Velero by replacing VBDM with their own DMs. The DMs should process DUCR/DDCRs appropriately and finally put them into one of the terminal states as shown in the DataUpload CRD and DataDownload CRD sections. @@ -82,8 +82,8 @@ Below are the data movement actions and sequences during backup: Below are actions from Velero and DMP: **BIA Execute** -This the existing logic in Velero. For a source PVC/PV, Velero delivers it to the corresponding BackupItemAction plugin, the plugin then takes the related actions to back it up. -For example, the existing CSI plugin takes a CSI snapshot to the volume represented by the PVC and then returns additional items (i.e., VolumeSnapshot, VolumeSnapshotContent and VolumeSnapshotClass) for Velero to further backup. +This the existing logic in Velero. For a source PVC/PV, Velero delivers it to the corresponding BackupItemAction plugin, the plugin then takes the related actions to back it up. +For example, the existing CSI plugin takes a CSI snapshot to the volume represented by the PVC and then returns additional items (i.e., VolumeSnapshot, VolumeSnapshotContent and VolumeSnapshotClass) for Velero to further backup. To support data movement, we will use BIA V2 which supports asynchronized operation management. Here is the Execute method from BIA V2: ``` Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, []velero.ResourceIdentifier, error) @@ -115,7 +115,7 @@ Progress(operationID string, backup *api.Backup) (velero.OperationProgress, erro On the call of this method, DMP will query the DUCR’s status. Some critical progress data is transferred from DUCR to the ```OperationProgress``` which is the return value of BIA V2’s Progress method. For example, NCompleted indicates the size/number of data that have been completed and NTotal indicates the total size/number of data. When the async operation completes, the Progress method returns an OperationProgress with ```Completed``` set as true. Then Velero will persist DUCR as well as any other items returned by DUP as ```itemToUpdate```. Finally, then backup is as ```Completed```. -To help BIA Progress find the corresponding DUCR, the ```operationID``` is saved along with the DUCR as a label ```velero.io/async-operation-id```. +To help BIA Progress find the corresponding DUCR, the ```operationID``` is saved along with the DUCR as a label ```velero.io/async-operation-id```. DUCRs are handled by the data movers, so how to handle them are totally decided by the data movers. Below covers the details of VBDM, plugging data movers should have their own actions and workflows. @@ -663,196 +663,6 @@ If DM restarts, Velero has no way to detect this, DM is expected to: At present, VBDM doesn't support recovery, so it will follow the second rule. -## Kopia For Block Device -To work with block devices, VGDP will be updated. Today, when Kopia attempts to create a snapshot of the block device, it will error because kopia does not support this file type. Kopia does have a nice set of interfaces that are able to be extended though. - -To achieve the necessary information to determine the type of volume that is being used, we will need to pass in the volume mode in provider interface. - -```go -type PersistentVolumeMode string - -const ( - // PersistentVolumeBlock means the volume will not be formatted with a filesystem and will remain a raw block device. - PersistentVolumeBlock PersistentVolumeMode = "Block" - // PersistentVolumeFilesystem means the volume will be or is formatted with a filesystem. - PersistentVolumeFilesystem PersistentVolumeMode = "Filesystem" -) - -// Provider which is designed for one pod volume to do the backup or restore -type Provider interface { - // RunBackup which will do backup for one specific volume and return snapshotID, isSnapshotEmpty, error - // updater is used for updating backup progress which implement by third-party - RunBackup( - ctx context.Context, - path string, - volMode PersistentVolumeMode, - realSource string, - tags map[string]string, - forceFull bool, - parentSnapshot string, - updater uploader.ProgressUpdater) (string, bool, error) -``` - -In this case, we will extend the default kopia uploader to add the ability, when a given volume is for a block mode and is mapped as a device, we will use a new specific type that conforms to the [File](https://pkg.go.dev/github.com/kopia/kopia/fs@v0.13.0#File) interface. This new type, will handle streaming the block device to the rest of the kopia process. - -For this particular one, we will create a new `deviceEntry` and `deviceReader` structs that conform to the [fs.Reader](https://pkg.go.dev/github.com/kopia/kopia/fs@v0.13.0#Reader) and [File](https://pkg.go.dev/github.com/kopia/kopia/fs@v0.13.0#File) interfaces. - -```go -type deviceEntry struct { - path string - size int64 - kopiaEntry fs.Entry - log logrus.FieldLogger -} - -type deviceReader struct { - e *deviceEntry - file *os.File -} - -var _ fs.Reader = deviceReader{} -var _ fs.File = deviceEntry{} -``` - -Then we would create a helper, to make the `fs.Entry` that is now this deviceEntry - -```go -func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) { - path := kopiaEntry.LocalFilesystemPath() - - fileInfo, err := os.Lstat(path) - if err != nil { - return nil, errors.Wrapf(err, "Unable to get the source device information %s", path) - } - - if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK { - return nil, errors.Errorf("Source path %s is not a block device", path) - } - - device, err := os.Open(path) - if err != nil { - if os.IsPermission(err) || err.Error() == ErrNotPermitted { - return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path) - } - return nil, errors.Wrapf(err, "Unable to open the source device %s", path) - } - defer device.Close() - - size, err := device.Seek(0, io.SeekEnd) // seek to the end of block device to discover its size - if err != nil { - return nil, errors.Wrapf(err, "Unable to get the source device size %s", path) - } - - return fs.File(&deviceEntry{ - path: path, - size: size, - kopiaEntry: kopiaEntry, - }), nil -} -``` - -In the `pkg/uploader/kopia/snapshot.go` this is used in the Backup call like - -```go - if volMode == PersistentVolumeFilesystem { - // to be consistent with restic when backup empty dir returns one error for upper logic handle - dirs, err := os.ReadDir(source) - if err != nil { - return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", source) - } else if len(dirs) == 0 { - return nil, true, nil - } - } - - source = filepath.Clean(source) - ... - - sourceEntry, err := getLocalFSEntry(source) - if err != nil { - return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") - } - - if volMode == PersistentVolumeBlock { - sourceEntry, err = getLocalBlockEntry(sourceEntry, log) - if err != nil { - return nil, false, errors.Wrap(err, "Unable to get local block device entry") - } - } - - ... - snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, log, "Kopia Uploader") - -``` - -While restoring, we are able extend the [Output](https://pkg.go.dev/github.com/kopia/kopia@v0.13.0/snapshot/restore#Output) interface. This will allow us to handle the case where we are writing to the device - -We will create a new block device Output struct that encapsulates the output. - -```go -type deviceOutput struct { - localOuput restore.FileSystemOutput -} - -.... For all functions besides write, call the underlying localOutput - -func (d *deviceOutput) WriteFile(ctx context.Context, relativePath string, e fs.File) error { - ... handle writing to device driver -} - -``` - -In the `pkg/uploader/kopia/snapshot.go` this is used in the Restore call like - -```go -output := &restore.FilesystemOutput{ - TargetPath: path, - OverwriteDirectories: true, - OverwriteFiles: true, - OverwriteSymlinks: true, - IgnorePermissionErrors: true, - } - - if volMode == PersistentVolumeBlock { - output = deviceOutput{ - localOutput: output, - } - } - -err = output.Init(ctx) - -... - stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ - Parallel: runtime.NumCPU(), - RestoreDirEntryAtDepth: math.MaxInt32, - Cancel: cancleCh, - ProgressCallback: func(ctx context.Context, stats restore.Stats) { - progress.ProgressBytes(stats.RestoredTotalFileSize, stats.EnqueuedTotalFileSize) - }, - }) -``` - - -While extending these interfaces, we will use the defaults except for when it is for a block device. - -Of note, we do need to add root access to the daemon set node agent to access the new mount. - -```yaml -... - - mountPath: /var/lib/kubelet/plugins - mountPropagation: HostToContainer - name: host-plugins - -.... - - hostPath: - path: /var/lib/kubelet/plugins - name: host-plugins - -... - SecurityContext: &corev1.SecurityContext{ - Privileged: &c.privilegedAgent, - }, - -``` ## Plugin Data Movers There should be only one DM to handle a specific DUCR/DDCR in all cases. If more than one DMs process a DUCR/DDCR at the same time, there will be a disaster. diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 4e4b16aee0..741f6ae086 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -133,9 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren if !fs.initialized { return errors.New("file system data path is not initialized") } + volMode := getPersistentVolumeMode(source) go func() { - snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs) + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) @@ -154,8 +155,10 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro return errors.New("file system data path is not initialized") } + volMode := getPersistentVolumeMode(target) + go func() { - err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs) + err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) @@ -169,6 +172,13 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro return nil } +func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode { + if source.ByBlock != "" { + return uploader.PersistentVolumeBlock + } + return uploader.PersistentVolumeFilesystem +} + // UpdateProgress which implement ProgressUpdater interface to update progress status func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) { if fs.callbacks.OnProgress != nil { diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index 072f1996c6..c8e4b881cd 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -95,7 +95,7 @@ func TestAsyncBackup(t *testing.T) { t.Run(test.name, func(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) - mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks @@ -178,7 +178,7 @@ func TestAsyncRestore(t *testing.T) { t.Run(test.name, func(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) - mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) + mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index 4f85fceecb..431fccc5dc 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -52,7 +52,8 @@ type Callbacks struct { // AccessPoint represents an access point that has been exposed to a data path instance type AccessPoint struct { - ByPath string + ByPath string + ByBlock string } // AsyncBR is the interface for asynchronous data path methods diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index 85000ad00b..32ec54ccd0 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -88,10 +88,15 @@ func setupDefaultPolicy() *policy.Tree { // Backup backup specific sourcePath and update progress func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, - forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { if fsUploader == nil { return nil, false, errors.New("get empty kopia uploader") } + + if volMode == uploader.PersistentVolumeBlock { + return nil, false, errors.New("unable to handle block storage") + } + dir, err := filepath.Abs(sourcePath) if err != nil { return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 0d352ab6d3..2093b55d57 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -564,6 +564,7 @@ func TestBackup(t *testing.T) { isSnapshotSourceError bool expectedError error expectedEmpty bool + volMode uploader.PersistentVolumeMode } manifest := &snapshot.Manifest{ ID: "test", @@ -590,10 +591,20 @@ func TestBackup(t *testing.T) { tags: nil, expectedError: errors.New("Unable to read dir"), }, + { + name: "Unable to handle block mode", + sourcePath: "/", + tags: nil, + volMode: uploader.PersistentVolumeBlock, + expectedError: errors.New("unable to handle block storage"), + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } s := injectSnapshotFuncs() args := []mockArgs{ {methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}}, @@ -616,9 +627,9 @@ func TestBackup(t *testing.T) { var snapshotInfo *uploader.SnapshotInfo var err error if tc.isEmptyUploader { - snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{}) + snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{}) } else { - snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{}) + snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{}) } // Check if the returned error matches the expected error if tc.expectedError != nil { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 0a241c4738..37882859ec 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -118,6 +118,7 @@ func (kp *kopiaProvider) RunBackup( tags map[string]string, forceFull bool, parentSnapshot string, + volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) (string, bool, error) { if updater == nil { return "", false, errors.New("Need to initial backup progress updater first") @@ -127,6 +128,11 @@ func (kp *kopiaProvider) RunBackup( return "", false, errors.New("path is empty") } + // For now, error on block mode + if volMode == uploader.PersistentVolumeBlock { + return "", false, errors.New("unable to currently support block mode") + } + log := kp.log.WithFields(logrus.Fields{ "path": path, "realSource": realSource, @@ -153,7 +159,7 @@ func (kp *kopiaProvider) RunBackup( tags[uploader.SnapshotRequesterTag] = kp.requestorType tags[uploader.SnapshotUploaderTag] = uploader.KopiaType - snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log) + snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, tags, log) if err != nil { if kpUploader.IsCanceled() { log.Error("Kopia backup is canceled") @@ -197,11 +203,17 @@ func (kp *kopiaProvider) RunRestore( ctx context.Context, snapshotID string, volumePath string, + volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) error { log := kp.log.WithFields(logrus.Fields{ "snapshotID": snapshotID, "volumePath": volumePath, }) + + if volMode == uploader.PersistentVolumeBlock { + return errors.New("unable to currently support block mode") + } + repoWriter := kopia.NewShimRepo(kp.bkRepo) progress := new(kopia.Progress) progress.InitThrottle(restoreProgressCheckInterval) diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index a1105cbe8e..944cdbcceb 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -68,35 +68,47 @@ func TestRunBackup(t *testing.T) { testCases := []struct { name string - hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) + hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) + volMode uploader.PersistentVolumeMode notError bool }{ { name: "success to backup", - hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return &uploader.SnapshotInfo{}, false, nil }, notError: true, }, { name: "get error to backup", - hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return &uploader.SnapshotInfo{}, false, errors.New("failed to backup") }, notError: false, }, { name: "got empty snapshot", - hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return nil, true, errors.New("snapshot is empty") }, notError: false, }, + { + name: "error on vol mode", + hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + return nil, true, nil + }, + volMode: uploader.PersistentVolumeBlock, + notError: false, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } BackupFunc = tc.hookBackupFunc - _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", &updater) + _, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, &updater) if tc.notError { assert.NoError(t, err) } else { @@ -115,6 +127,7 @@ func TestRunRestore(t *testing.T) { name string hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) notError bool + volMode uploader.PersistentVolumeMode }{ { name: "normal restore", @@ -130,12 +143,23 @@ func TestRunRestore(t *testing.T) { }, notError: false, }, + { + name: "failed to restore block mode", + hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + return 0, 0, errors.New("failed to restore") + }, + volMode: uploader.PersistentVolumeBlock, + notError: false, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } RestoreFunc = tc.hookRestoreFunc - err := kp.RunRestore(context.Background(), "", "/var", &updater) + err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, &updater) if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/mocks/Provider.go b/pkg/uploader/provider/mocks/Provider.go index 54a461ea14..a15bd940ff 100644 --- a/pkg/uploader/provider/mocks/Provider.go +++ b/pkg/uploader/provider/mocks/Provider.go @@ -30,8 +30,8 @@ func (_m *Provider) Close(ctx context.Context) error { } // RunBackup provides a mock function with given fields: ctx, path, realSource, tags, forceFull, parentSnapshot, updater -func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) { - ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, updater) +func (_m *Provider) RunBackup(ctx context.Context, path string, realSource string, tags map[string]string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) (string, bool, error) { + ret := _m.Called(ctx, path, realSource, tags, forceFull, parentSnapshot, volMode, updater) var r0 string var r1 bool @@ -61,8 +61,8 @@ func (_m *Provider) RunBackup(ctx context.Context, path string, realSource strin } // RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, updater -func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, updater uploader.ProgressUpdater) error { - ret := _m.Called(ctx, snapshotID, volumePath, updater) +func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) error { + ret := _m.Called(ctx, snapshotID, volumePath, volMode, updater) var r0 error if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.ProgressUpdater) error); ok { diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index e3373936d5..09ff3f162e 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -48,6 +48,7 @@ type Provider interface { tags map[string]string, forceFull bool, parentSnapshot string, + volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) (string, bool, error) // RunRestore which will do restore for one specific volume with given snapshot id and return error // updater is used for updating backup progress which implement by third-party @@ -55,6 +56,7 @@ type Provider interface { ctx context.Context, snapshotID string, volumePath string, + volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) error // Close which will close related repository Close(ctx context.Context) error diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go index 22e0f5b40a..243597a545 100644 --- a/pkg/uploader/provider/restic.go +++ b/pkg/uploader/provider/restic.go @@ -121,6 +121,7 @@ func (rp *resticProvider) RunBackup( tags map[string]string, forceFull bool, parentSnapshot string, + volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) (string, bool, error) { if updater == nil { return "", false, errors.New("Need to initial backup progress updater first") @@ -134,6 +135,10 @@ func (rp *resticProvider) RunBackup( return "", false, errors.New("real source is not empty, this is not supported by restic uploader") } + if volMode == uploader.PersistentVolumeBlock { + return "", false, errors.New("unable to currently support block mode") + } + log := rp.log.WithFields(logrus.Fields{ "path": path, "parentSnapshot": parentSnapshot, @@ -179,6 +184,7 @@ func (rp *resticProvider) RunRestore( ctx context.Context, snapshotID string, volumePath string, + volMode uploader.PersistentVolumeMode, updater uploader.ProgressUpdater) error { if updater == nil { return errors.New("Need to initial backup progress updater first") @@ -188,6 +194,10 @@ func (rp *resticProvider) RunRestore( "volumePath": volumePath, }) + if volMode == uploader.PersistentVolumeBlock { + return errors.New("unable to support block mode") + } + restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath) restoreCmd.Env = rp.cmdEnv restoreCmd.CACertFile = rp.caCertFile diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go index eaf0273ff5..b0408c544b 100644 --- a/pkg/uploader/provider/restic_test.go +++ b/pkg/uploader/provider/restic_test.go @@ -45,6 +45,7 @@ func TestResticRunBackup(t *testing.T) { nilUpdater bool parentSnapshot string rp *resticProvider + volMode uploader.PersistentVolumeMode hookBackupFunc func(string, string, string, map[string]string) *restic.Command hookResticBackupFunc func(*restic.Command, logrus.FieldLogger, uploader.ProgressUpdater) (string, string, error) hookResticGetSnapshotFunc func(string, string, map[string]string) *restic.Command @@ -117,6 +118,14 @@ func TestResticRunBackup(t *testing.T) { return strings.Contains(err.Error(), "failed to get snapshot id") }, }, + { + name: "failed to use block mode", + rp: &resticProvider{log: logrus.New(), extraFlags: []string{"testFlags"}}, + volMode: uploader.PersistentVolumeBlock, + errorHandleFunc: func(err error) bool { + return strings.Contains(err.Error(), "unable to currently support block mode") + }, + }, } for _, tc := range testCases { @@ -135,11 +144,14 @@ func TestResticRunBackup(t *testing.T) { if tc.hookResticGetSnapshotIDFunc != nil { resticGetSnapshotIDFunc = tc.hookResticGetSnapshotIDFunc } + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } if !tc.nilUpdater { updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()} - _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, &updater) + _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, &updater) } else { - _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, nil) + _, _, err = tc.rp.RunBackup(context.Background(), "var", "", map[string]string{}, false, parentSnapshot, tc.volMode, nil) } tc.rp.log.Infof("test name %v error %v", tc.name, err) @@ -158,6 +170,7 @@ func TestResticRunRestore(t *testing.T) { nilUpdater bool hookResticRestoreFunc func(repoIdentifier, passwordFile, snapshotID, target string) *restic.Command errorHandleFunc func(err error) bool + volMode uploader.PersistentVolumeMode }{ { name: "wrong restic execute command", @@ -187,17 +200,28 @@ func TestResticRunRestore(t *testing.T) { return strings.Contains(err.Error(), "error running command") }, }, + { + name: "error block volume mode", + rp: &resticProvider{log: logrus.New()}, + errorHandleFunc: func(err error) bool { + return strings.Contains(err.Error(), "unable to currently support block mode") + }, + volMode: uploader.PersistentVolumeBlock, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { resticRestoreCMDFunc = tc.hookResticRestoreFunc + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } var err error if !tc.nilUpdater { updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: tc.rp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()} - err = tc.rp.RunRestore(context.Background(), "", "var", &updater) + err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, &updater) } else { - err = tc.rp.RunRestore(context.Background(), "", "var", nil) + err = tc.rp.RunRestore(context.Background(), "", "var", tc.volMode, nil) } tc.rp.log.Infof("test name %v error %v", tc.name, err) diff --git a/pkg/uploader/types.go b/pkg/uploader/types.go index 8df77fb3d5..02106e266e 100644 --- a/pkg/uploader/types.go +++ b/pkg/uploader/types.go @@ -28,6 +28,15 @@ const ( SnapshotUploaderTag = "snapshot-uploader" ) +type PersistentVolumeMode string + +const ( + // PersistentVolumeBlock means the volume will not be formatted with a filesystem and will remain a raw block device. + PersistentVolumeBlock PersistentVolumeMode = "Block" + // PersistentVolumeFilesystem means the volume will be or is formatted with a filesystem. + PersistentVolumeFilesystem PersistentVolumeMode = "Filesystem" +) + // ValidateUploaderType validates if the input param is a valid uploader type. // It will return an error if it's invalid. func ValidateUploaderType(t string) error {