
adding block mode to uploader/provider interfaces
Signed-off-by: Shawn Hurley <shawn@hurley.page>
shawn-hurley committed Aug 9, 2023
1 parent b21aef6 commit d58a9fb
Showing 14 changed files with 127 additions and 214 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6608-shawn-hurley
@@ -0,0 +1 @@
Design update to handle block devices during the kopia data mover
200 changes: 5 additions & 195 deletions design/volume-snapshot-data-movement/volume-snapshot-data-movement.md
@@ -31,7 +31,7 @@ Moreover, we would like to create a general workflow to variations during the da

## Non-Goals

- The current support for block level access is through file system uploader, so it is not aimed to deliver features of an ultimate block level backup. Block level backup will be included in a future design
- The implementation details of block level access will be included in a future design
- Most of the components are generic, but the Exposer is snapshot type specific or snapshot access specific. The current design covers the implementation details for exposing CSI snapshot to host path access only, for other types or accesses, we may need a separate design
- The current workflow focuses on snapshot-based data movements. For some application/SaaS level data sources, snapshots may not be taken explicitly. We don’t take them into consideration, though we believe that some workflows or components may still be reusable.

@@ -67,7 +67,7 @@ DMs take the responsibility to handle DUCR/DDCRs, Velero provides a built-in DM
**Node-Agent**: Node-Agent is an existing Velero module that will be used to host VBDM.
**Exposer**: Exposer exposes the snapshot/target volume as a path, device name, or endpoint that is recognizable by the Velero generic data path. For different snapshot types/snapshot accesses, the Exposer may be different. This isolation guarantees that when we want to support other snapshot types/snapshot accesses, we only need to swap in a new Exposer and keep the other components as they are.
**Velero Generic Data Path (VGDP)**: VGDP is the collective of modules introduced in the [Unified Repository design][1]. Velero uses these modules to finish data transmission for various purposes. It includes the uploaders and the backup repository.
**Uploader**: Uploader is the module in VGDP that reads data from the source and writes it to the backup repository for backup, and reads data from the backup repository and writes it to the restore target for restore. At present, only the file system uploader is supported. In the future, a block level uploader will be added. For the file system and basic block uploaders, only the Kopia uploader will be used; Restic will not be integrated with VBDM.
**Uploader**: Uploader is the module in VGDP that reads data from the source and writes it to the backup repository for backup, and reads data from the backup repository and writes it to the restore target for restore. At present, only the file system uploader is supported. In the future, a block level uploader will be added. For the file system uploader, only the Kopia uploader will be used; Restic will not be integrated with VBDM.

## Replacement
3rd parties could integrate their own data movement into Velero by replacing VBDM with their own DMs. The DMs should process DUCR/DDCRs appropriately and finally put them into one of the terminal states as shown in the DataUpload CRD and DataDownload CRD sections.
@@ -82,8 +82,8 @@ Below are the data movement actions and sequences during backup:
Below are actions from Velero and DMP:

**BIA Execute**
This is the existing logic in Velero. For a source PVC/PV, Velero delivers it to the corresponding BackupItemAction plugin; the plugin then takes the related actions to back it up.
For example, the existing CSI plugin takes a CSI snapshot of the volume represented by the PVC and then returns additional items (i.e., VolumeSnapshot, VolumeSnapshotContent and VolumeSnapshotClass) for Velero to back up further.
To support data movement, we will use BIA V2, which supports asynchronous operation management. Here is the Execute method from BIA V2:
```
Execute(item runtime.Unstructured, backup *api.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, string, []velero.ResourceIdentifier, error)
```
@@ -115,7 +115,7 @@ Progress(operationID string, backup *api.Backup) (velero.OperationProgress, erro
On the call of this method, DMP will query the DUCR’s status. Some critical progress data is transferred from DUCR to the ```OperationProgress``` which is the return value of BIA V2’s Progress method. For example, NCompleted indicates the size/number of data that have been completed and NTotal indicates the total size/number of data.
When the async operation completes, the Progress method returns an OperationProgress with ```Completed``` set as true. Then Velero will persist DUCR as well as any other items returned by DUP as ```itemToUpdate```.
Finally, the backup is marked as ```Completed```.
To help BIA Progress find the corresponding DUCR, the ```operationID``` is saved along with the DUCR as a label ```velero.io/async-operation-id```.
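
For illustration, here is a minimal sketch of how a DMP's Progress method might map the DUCR's status onto the returned ```OperationProgress```. The DUCR status field names and the lookup helper are assumptions for illustration, not the final API:
```go
func (p *dataUploadPlugin) Progress(operationID string, backup *api.Backup) (velero.OperationProgress, error) {
    // Look up the DUCR via the velero.io/async-operation-id label (hypothetical helper).
    ducr, err := p.getDataUploadForOperation(operationID)
    if err != nil {
        return velero.OperationProgress{}, err
    }

    return velero.OperationProgress{
        // Completed tells Velero the async operation has finished.
        Completed: ducr.Status.Phase == "Completed",
        // NCompleted/NTotal carry the amount of data moved so far and in total.
        NCompleted: ducr.Status.Progress.BytesDone,
        NTotal:     ducr.Status.Progress.TotalBytes,
    }, nil
}
```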

DUCRs are handled by the data movers, so how they are handled is entirely decided by each data mover. Below are the details for VBDM; plugged-in data movers should define their own actions and workflows.

@@ -663,196 +663,6 @@ If DM restarts, Velero has no way to detect this, DM is expected to:

At present, VBDM doesn't support recovery, so it will follow the second rule.

## Kopia For Block Device
To work with block devices, VGDP will be updated. Today, when Kopia attempts to create a snapshot of a block device, it fails because Kopia does not support this file type. Kopia does, however, provide a useful set of interfaces that can be extended.

To obtain the information needed to determine the type of volume being processed, we will pass the volume mode through the provider interface.

```go
type PersistentVolumeMode string

const (
// PersistentVolumeBlock means the volume will not be formatted with a filesystem and will remain a raw block device.
PersistentVolumeBlock PersistentVolumeMode = "Block"
// PersistentVolumeFilesystem means the volume will be or is formatted with a filesystem.
PersistentVolumeFilesystem PersistentVolumeMode = "Filesystem"
)

// Provider which is designed for one pod volume to do the backup or restore
type Provider interface {
    // RunBackup which will do backup for one specific volume and return snapshotID, isSnapshotEmpty, error
    // updater is used for updating backup progress, which is implemented by the third party
    RunBackup(
        ctx context.Context,
        path string,
        volMode PersistentVolumeMode,
        realSource string,
        tags map[string]string,
        forceFull bool,
        parentSnapshot string,
        updater uploader.ProgressUpdater) (string, bool, error)
    // other methods (e.g. RunRestore) omitted
}
```
In this case, we will extend the default Kopia uploader: when a given volume is in block mode and is mapped as a device, we will use a new type that conforms to the [File](https://pkg.go.dev/github.com/kopia/kopia/fs@v0.13.0#File) interface. This new type will handle streaming the block device to the rest of the Kopia process.
For this, we will create new `deviceEntry` and `deviceReader` structs that conform to the [fs.Reader](https://pkg.go.dev/github.com/kopia/kopia/fs@v0.13.0#Reader) and [File](https://pkg.go.dev/github.com/kopia/kopia/fs@v0.13.0#File) interfaces.
```go
type deviceEntry struct {
    path       string
    size       int64
    kopiaEntry fs.Entry
    log        logrus.FieldLogger
}

type deviceReader struct {
    e    *deviceEntry
    file *os.File
}

var _ fs.Reader = deviceReader{}
var _ fs.File = deviceEntry{}
```
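
A minimal sketch of how these types could satisfy the two interfaces. The exact method set is an assumption; the remaining `fs.Entry` methods (name, mode, timestamps, and the reader's `Entry()`) would delegate to the wrapped `kopiaEntry` and are omitted here:
```go
// Open re-opens the block device and returns a reader positioned at offset 0.
func (e deviceEntry) Open(ctx context.Context) (fs.Reader, error) {
    f, err := os.Open(e.path)
    if err != nil {
        return nil, errors.Wrapf(err, "unable to open the device %s", e.path)
    }
    return deviceReader{e: &e, file: f}, nil
}

// Size reports the device size discovered when the entry was built.
func (e deviceEntry) Size() int64 {
    return e.size
}

// Read streams raw bytes from the device into kopia's upload pipeline.
func (r deviceReader) Read(p []byte) (int, error) {
    return r.file.Read(p)
}

// Seek lets kopia reposition within the device, e.g. when splitting content.
func (r deviceReader) Seek(offset int64, whence int) (int64, error) {
    return r.file.Seek(offset, whence)
}

// Close releases the device handle.
func (r deviceReader) Close() error {
    return r.file.Close()
}
```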
Then we would create a helper to build the `fs.Entry` that wraps this `deviceEntry`:
```go
func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) {
    path := kopiaEntry.LocalFilesystemPath()

    fileInfo, err := os.Lstat(path)
    if err != nil {
        return nil, errors.Wrapf(err, "Unable to get the source device information %s", path)
    }

    if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK {
        return nil, errors.Errorf("Source path %s is not a block device", path)
    }

    device, err := os.Open(path)
    if err != nil {
        if os.IsPermission(err) || err.Error() == ErrNotPermitted {
            return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path)
        }
        return nil, errors.Wrapf(err, "Unable to open the source device %s", path)
    }
    defer device.Close()

    // Seek to the end of the block device to discover its size.
    size, err := device.Seek(0, io.SeekEnd)
    if err != nil {
        return nil, errors.Wrapf(err, "Unable to get the source device size %s", path)
    }

    return fs.File(&deviceEntry{
        path:       path,
        size:       size,
        kopiaEntry: kopiaEntry,
        log:        log,
    }), nil
}
```
In `pkg/uploader/kopia/snapshot.go`, this is used in the Backup call like:
```go
if volMode == uploader.PersistentVolumeFilesystem {
    // to be consistent with restic: backing up an empty dir returns one error for upper-level logic to handle
    dirs, err := os.ReadDir(source)
    if err != nil {
        return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", source)
    } else if len(dirs) == 0 {
        return nil, true, nil
    }
}

source = filepath.Clean(source)
...

sourceEntry, err := getLocalFSEntry(source)
if err != nil {
    return nil, false, errors.Wrap(err, "Unable to get local filesystem entry")
}

if volMode == uploader.PersistentVolumeBlock {
    sourceEntry, err = getLocalBlockEntry(sourceEntry, log)
    if err != nil {
        return nil, false, errors.Wrap(err, "Unable to get local block device entry")
    }
}

...
snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, log, "Kopia Uploader")

```
While restoring, we are able to extend the [Output](https://pkg.go.dev/github.com/kopia/kopia@v0.13.0/snapshot/restore#Output) interface. This will allow us to handle the case where we are writing to the device.
We will create a new block device Output struct that encapsulates the file system output.
```go
type deviceOutput struct {
    localOutput *restore.FilesystemOutput
}

// For all functions besides WriteFile, delegate to the underlying localOutput.

func (d *deviceOutput) WriteFile(ctx context.Context, relativePath string, e fs.File) error {
    // ...handle writing to the device here
}

```
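
A hedged sketch of what `WriteFile` could look like, assuming the restore target is the raw device file at the output's `TargetPath`; the open flags and the straight `io.Copy` are illustrative, not final:
```go
func (d *deviceOutput) WriteFile(ctx context.Context, relativePath string, e fs.File) error {
    // Open the snapshotted content for reading.
    reader, err := e.Open(ctx)
    if err != nil {
        return errors.Wrap(err, "unable to open snapshot file")
    }
    defer reader.Close()

    // Open the target block device for writing.
    device, err := os.OpenFile(d.localOutput.TargetPath, os.O_WRONLY, 0)
    if err != nil {
        return errors.Wrapf(err, "unable to open the target device %s", d.localOutput.TargetPath)
    }
    defer device.Close()

    // Stream the backed-up bytes directly onto the raw device.
    _, err = io.Copy(device, reader)
    return err
}
```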
In `pkg/uploader/kopia/snapshot.go`, this is used in the Restore call like:
```go
fsOutput := &restore.FilesystemOutput{
    TargetPath:             path,
    OverwriteDirectories:   true,
    OverwriteFiles:         true,
    OverwriteSymlinks:      true,
    IgnorePermissionErrors: true,
}

// Wrap the filesystem output for block volumes.
var output restore.Output = fsOutput
if volMode == uploader.PersistentVolumeBlock {
    output = &deviceOutput{
        localOutput: fsOutput,
    }
}

err = output.Init(ctx)

...
stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{
    Parallel:               runtime.NumCPU(),
    RestoreDirEntryAtDepth: math.MaxInt32,
    Cancel:                 cancelCh,
    ProgressCallback: func(ctx context.Context, stats restore.Stats) {
        progress.ProgressBytes(stats.RestoredTotalFileSize, stats.EnqueuedTotalFileSize)
    },
})
```
While extending these interfaces, we will use the defaults except when handling a block device.
Of note, we do need to give the node-agent daemonset root (privileged) access so it can reach the new device mounts.
```yaml
...
- mountPath: /var/lib/kubelet/plugins
  mountPropagation: HostToContainer
  name: host-plugins

...
- hostPath:
    path: /var/lib/kubelet/plugins
  name: host-plugins
```

and, in the node-agent container spec:

```go
SecurityContext: &corev1.SecurityContext{
    Privileged: &c.privilegedAgent,
},
```

## Plugin Data Movers
There should be only one DM handling a specific DUCR/DDCR in all cases. If more than one DM processes the same DUCR/DDCR at the same time, the result will be disastrous.
6 changes: 5 additions & 1 deletion pkg/datapath/file_system.go
@@ -133,9 +133,13 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := uploader.PersistentVolumeFilesystem
if source.ByBlock != "" {
volMode = uploader.PersistentVolumeBlock
}

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
2 changes: 1 addition & 1 deletion pkg/datapath/file_system_test.go
@@ -95,7 +95,7 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
3 changes: 2 additions & 1 deletion pkg/datapath/types.go
@@ -52,7 +52,8 @@ type Callbacks struct {

// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
ByPath string
ByBlock string
}

// AsyncBR is the interface for asynchronous data path methods
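
For block volumes, the intent is that the exposer fills in `ByBlock` with a device path instead of `ByPath`. A hypothetical illustration (not part of this commit):
```go
// Hypothetical: how an exposer might build the access point for each volume mode.
func accessPointFor(mountPath, devicePath string, isBlock bool) AccessPoint {
    if isBlock {
        // e.g. a raw device exposed under /var/lib/kubelet/plugins
        return AccessPoint{ByBlock: devicePath}
    }
    return AccessPoint{ByPath: mountPath}
}
```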
7 changes: 6 additions & 1 deletion pkg/uploader/kopia/snapshot.go
@@ -88,10 +88,15 @@ func setupDefaultPolicy() *policy.Tree {

// Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string,
forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
}

if volMode == uploader.PersistentVolumeBlock {
return nil, false, errors.New("unable to handle block storage")
}

dir, err := filepath.Abs(sourcePath)
if err != nil {
return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
15 changes: 13 additions & 2 deletions pkg/uploader/kopia/snapshot_test.go
@@ -564,6 +564,7 @@ func TestBackup(t *testing.T) {
isSnapshotSourceError bool
expectedError error
expectedEmpty bool
volMode uploader.PersistentVolumeMode
}
manifest := &snapshot.Manifest{
ID: "test",
@@ -590,10 +591,20 @@
tags: nil,
expectedError: errors.New("Unable to read dir"),
},
{
name: "Unable to handle block mode",
sourcePath: "/",
tags: nil,
volMode: uploader.PersistentVolumeBlock,
expectedError: errors.New("unable to handle block storage"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
s := injectSnapshotFuncs()
args := []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
@@ -616,9 +627,9 @@
var snapshotInfo *uploader.SnapshotInfo
var err error
if tc.isEmptyUploader {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
} else {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
}
// Check if the returned error matches the expected error
if tc.expectedError != nil {
14 changes: 13 additions & 1 deletion pkg/uploader/provider/kopia.go
@@ -118,6 +118,7 @@ func (kp *kopiaProvider) RunBackup(
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
@@ -127,6 +128,11 @@
return "", false, errors.New("path is empty")
}

// For now, error on block mode
if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to currently support block mode")
}

log := kp.log.WithFields(logrus.Fields{
"path": path,
"realSource": realSource,
@@ -153,7 +159,7 @@
tags[uploader.SnapshotRequesterTag] = kp.requestorType
tags[uploader.SnapshotUploaderTag] = uploader.KopiaType

snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log)
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, tags, log)
if err != nil {
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")
@@ -197,11 +203,17 @@ func (kp *kopiaProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
})

if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to currently support block mode")
}

repoWriter := kopia.NewShimRepo(kp.bkRepo)
progress := new(kopia.Progress)
progress.InitThrottle(restoreProgressCheckInterval)
