diff --git a/changelogs/unreleased/6897-dzaninovic b/changelogs/unreleased/6897-dzaninovic new file mode 100644 index 0000000000..b4d735d145 --- /dev/null +++ b/changelogs/unreleased/6897-dzaninovic @@ -0,0 +1 @@ +Add support for block volumes with Kopia \ No newline at end of file diff --git a/design/CLI/PoC/overlays/plugins/node-agent.yaml b/design/CLI/PoC/overlays/plugins/node-agent.yaml index dbb4ce18db..0db26e2c54 100644 --- a/design/CLI/PoC/overlays/plugins/node-agent.yaml +++ b/design/CLI/PoC/overlays/plugins/node-agent.yaml @@ -49,6 +49,9 @@ spec: - mountPath: /host_pods mountPropagation: HostToContainer name: host-pods + - mountPath: /var/lib/kubelet/plugins + mountPropagation: HostToContainer + name: host-plugins - mountPath: /scratch name: scratch - mountPath: /credentials @@ -60,6 +63,9 @@ spec: - hostPath: path: /var/lib/kubelet/pods name: host-pods + - hostPath: + path: /var/lib/kubelet/plugins + name: host-plugins - emptyDir: {} name: scratch - name: cloud-credentials diff --git a/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md b/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md index d006b29e21..6a88b8b48e 100644 --- a/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md +++ b/design/volume-snapshot-data-movement/volume-snapshot-data-movement.md @@ -703,33 +703,38 @@ type Provider interface { In this case, we will extend the default kopia uploader: when a given volume is in block mode and is mapped as a device, we will use the [StreamingFile](https://pkg.go.dev/github.com/kopia/kopia@v0.13.0/fs#StreamingFile) to stream the device and back it up to the kopia repository. ```go -func getLocalBlockEntry(kopiaEntry fs.Entry, log logrus.FieldLogger) (fs.Entry, error) { - path := kopiaEntry.LocalFilesystemPath() +func getLocalBlockEntry(sourcePath string) (fs.Entry, error) { + source, err := resolveSymlink(sourcePath) + if err != nil { + return nil, errors.Wrap(err, "resolveSymlink") + } - fileInfo, err := os.Lstat(path) + fileInfo, err := os.Lstat(source) if err != nil { - return nil, errors.Wrapf(err, "Unable to get the source device information %s", path) + return nil, errors.Wrapf(err, "unable to get the source device information %s", source) } if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK { - return nil, errors.Errorf("Source path %s is not a block device", path) + return nil, errors.Errorf("source path %s is not a block device", source) } - device, err := os.Open(path) + + device, err := os.Open(source) if err != nil { if os.IsPermission(err) || err.Error() == ErrNotPermitted { - return nil, errors.Wrapf(err, "No permission to open the source device %s, make sure that node agent is running in privileged mode", path) + return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source) } - return nil, errors.Wrapf(err, "Unable to open the source device %s", path) + return nil, errors.Wrapf(err, "unable to open the source device %s", source) } - return virtualfs.StreamingFileFromReader(kopiaEntry.Name(), device), nil + sf := virtualfs.StreamingFileFromReader(source, device) + return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil } ``` In `pkg/uploader/kopia/snapshot.go` this is used in the `Backup` call as follows: ```go - if volMode == PersistentVolumeFilesystem { + if volMode == uploader.PersistentVolumeFilesystem { // to be consistent with restic when backup empty dir returns one error
for upper logic handle dirs, err := os.ReadDir(source) if err != nil { @@ -742,15 +747,17 @@ In `pkg/uploader/kopia/snapshot.go` this is used in the `Backup` call as follows: source = filepath.Clean(source) ... - sourceEntry, err := getLocalFSEntry(source) - if err != nil { - return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") - } + var sourceEntry fs.Entry - if volMode == PersistentVolumeBlock { - sourceEntry, err = getLocalBlockEntry(sourceEntry, log) + if volMode == uploader.PersistentVolumeBlock { + sourceEntry, err = getLocalBlockEntry(source) + if err != nil { + return nil, false, errors.Wrap(err, "unable to get local block device entry") + } + } else { + sourceEntry, err = getLocalFSEntry(source) if err != nil { - return nil, false, errors.Wrap(err, "Unable to get local block device entry") + return nil, false, errors.Wrap(err, "unable to get local filesystem entry") } } @@ -766,6 +773,8 @@ We only need to extend two functions; the rest will be passed through. ```go type BlockOutput struct { *restore.FilesystemOutput + + targetFileName string } var _ restore.Output = &BlockOutput{} @@ -773,30 +782,15 @@ var _ restore.Output = &BlockOutput{} const bufferSize = 128 * 1024 func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error { - - targetFileName, err := filepath.EvalSymlinks(o.TargetPath) - if err != nil { - return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName) - } - - fileInfo, err := os.Lstat(targetFileName) - if err != nil { - return errors.Wrapf(err, "Unable to get the target device information for %s", targetFileName) - } - - if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK { - return errors.Errorf("Target file %s is not a block device", targetFileName) - } - remoteReader, err := remoteFile.Open(ctx) if err != nil { - return errors.Wrapf(err, "Failed to open remote file %s", remoteFile.Name()) + return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name()) } defer remoteReader.Close() - targetFile, err := os.Create(targetFileName) + targetFile, err := os.Create(o.targetFileName) if err != nil { - return errors.Wrapf(err, "Failed to open file %s", targetFileName) + return errors.Wrapf(err, "failed to open file %s", o.targetFileName) } defer targetFile.Close() @@ -807,7 +801,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote bytesToWrite, err := remoteReader.Read(buffer) if err != nil { if err != io.EOF { - return errors.Wrapf(err, "Failed to read data from remote file %s", targetFileName) + return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName) } readData = false } @@ -819,7 +813,7 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote bytesToWrite -= bytesWritten offset += bytesWritten } else { - return errors.Wrapf(err, "Failed to write data to file %s", targetFileName) + return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName) } } } @@ -829,42 +823,43 @@ func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remote } func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error { - targetFileName, err := filepath.EvalSymlinks(o.TargetPath) + var err error + o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath) if err != nil { - return errors.Wrapf(err, "Unable to evaluate symlinks for %s", targetFileName) + return errors.Wrapf(err, "unable to evaluate symlinks for %s",
o.targetFileName) } - fileInfo, err := os.Lstat(targetFileName) + fileInfo, err := os.Lstat(o.targetFileName) if err != nil { - return errors.Wrapf(err, "Unable to get the target device information for %s", o.TargetPath) + return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath) } if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK { - return errors.Errorf("Target file %s is not a block device", o.TargetPath) + return errors.Errorf("target file %s is not a block device", o.TargetPath) } return nil } ``` -Of note, we do need to add root access to the daemon set node agent to access the new mount. +An additional mount is required in the node-agent specification so that symlinks to the block devices can be resolved from the /host_pods/POD_ID/volumeDevices/kubernetes.io~csi directory. ```yaml -... - mountPath: /var/lib/kubelet/plugins mountPropagation: HostToContainer name: host-plugins - .... - hostPath: path: /var/lib/kubelet/plugins name: host-plugins +``` + +Privileged mode is required to access the block devices in the /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish directory, as confirmed by testing on EKS and Minikube. -... +```yaml SecurityContext: &corev1.SecurityContext{ - Privileged: &c.privilegedAgent, + Privileged: &c.privilegedNodeAgent, }, - ``` ## Plugin Data Movers diff --git a/pkg/builder/persistent_volume_builder.go b/pkg/builder/persistent_volume_builder.go index 5fee88c196..4cf2e47f20 100644 --- a/pkg/builder/persistent_volume_builder.go +++ b/pkg/builder/persistent_volume_builder.go @@ -95,6 +95,12 @@ func (b *PersistentVolumeBuilder) StorageClass(name string) *PersistentVolumeBui return b } +// VolumeMode sets the PersistentVolume's volume mode. +func (b *PersistentVolumeBuilder) VolumeMode(volMode corev1api.PersistentVolumeMode) *PersistentVolumeBuilder { + b.object.Spec.VolumeMode = &volMode + return b +} + // NodeAffinityRequired sets the PersistentVolume's NodeAffinity Requirement. func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelector) *PersistentVolumeBuilder { b.object.Spec.NodeAffinity = &corev1api.VolumeNodeAffinity{ diff --git a/pkg/cmd/cli/install/install.go b/pkg/cmd/cli/install/install.go index 95f3297936..43698af45c 100644 --- a/pkg/cmd/cli/install/install.go +++ b/pkg/cmd/cli/install/install.go @@ -66,6 +66,7 @@ type Options struct { BackupStorageConfig flag.Map VolumeSnapshotConfig flag.Map UseNodeAgent bool + PrivilegedNodeAgent bool //TODO remove UseRestic when migration test out of using it UseRestic bool Wait bool @@ -109,6 +110,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) { flags.BoolVar(&o.RestoreOnly, "restore-only", o.RestoreOnly, "Run the server in restore-only mode. Optional.") flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.") flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).") + flags.BoolVar(&o.PrivilegedNodeAgent, "privileged-node-agent", o.PrivilegedNodeAgent, "Use privileged mode for the node agent. Optional. Required to back up block devices.") flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready.
Optional.") flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.") flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)") @@ -195,6 +197,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) { SecretData: secretData, RestoreOnly: o.RestoreOnly, UseNodeAgent: o.UseNodeAgent, + PrivilegedNodeAgent: o.PrivilegedNodeAgent, UseVolumeSnapshots: o.UseVolumeSnapshots, BSLConfig: o.BackupStorageConfig.Data(), VSLConfig: o.VolumeSnapshotConfig.Data(), diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 18f7e09f95..8bc650f5f9 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -177,7 +177,10 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - exposeParam := r.setupExposeParam(du) + exposeParam, err := r.setupExposeParam(du) + if err != nil { + return r.errorOut(ctx, du, err, "failed to set exposer parameters", log) + } // Expose() will trigger to create one pod whose volume is restored by a given volume snapshot, // but the pod maybe is not in the same node of the current controller, so we need to return it here. @@ -735,18 +738,33 @@ func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string) r.dataPathMgr.RemoveAsyncBR(duName) } -func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) interface{} { +func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload) (interface{}, error) { if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { + pvc := &corev1.PersistentVolumeClaim{} + err := r.client.Get(context.Background(), types.NamespacedName{ + Namespace: du.Spec.SourceNamespace, + Name: du.Spec.SourcePVC, + }, pvc) + + if err != nil { + return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC) + } + + accessMode := exposer.AccessModeFileSystem + if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock { + accessMode = exposer.AccessModeBlock + } + return &exposer.CSISnapshotExposeParam{ SnapshotName: du.Spec.CSISnapshot.VolumeSnapshot, SourceNamespace: du.Spec.SourceNamespace, StorageClass: du.Spec.CSISnapshot.StorageClass, HostingPodLabels: map[string]string{velerov1api.DataUploadLabel: du.Name}, - AccessMode: exposer.AccessModeFileSystem, + AccessMode: accessMode, Timeout: du.Spec.OperationTimeout.Duration, - } + }, nil } - return nil + return nil, nil } func (r *DataUploadReconciler) setupWaitExposePara(du *velerov2alpha1api.DataUpload) interface{} { diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 270a084ad9..34bc4a6aae 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -306,6 +306,7 @@ func TestReconcile(t *testing.T) { name string du *velerov2alpha1api.DataUpload pod *corev1.Pod + pvc *corev1.PersistentVolumeClaim snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataMgr *datapath.Manager expectedProcessed bool @@ -345,11 +346,21 @@ func TestReconcile(t *testing.T) { }, { name: "Dataupload should be accepted", du: dataUploadBuilder().Result(), - 
pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), expectedProcessed: false, expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), expectedRequeue: ctrl.Result{}, }, + { + name: "Dataupload should fail to get PVC information", + du: dataUploadBuilder().Result(), + pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "wrong-pvc"}).Result(), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(), + expectedRequeue: ctrl.Result{}, + expectedErrMsg: "failed to get PVC", + }, { name: "Dataupload should be prepared", du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), @@ -448,6 +459,11 @@ func TestReconcile(t *testing.T) { require.NoError(t, err) } + if test.pvc != nil { + err = r.client.Create(ctx, test.pvc) + require.NoError(t, err) + } + if test.dataMgr != nil { r.dataPathMgr = test.dataMgr } else { diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 741f6ae086..fba9eac7b1 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -133,10 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren if !fs.initialized { return errors.New("file system data path is not initialized") } - volMode := getPersistentVolumeMode(source) go func() { - snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs) + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, + parentSnapshot, source.VolMode, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) @@ -155,10 +155,8 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro return errors.New("file system data path is not initialized") } - volMode := getPersistentVolumeMode(target) - go func() { - err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs) + err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, fs) if err == provider.ErrorCanceled { fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) @@ -172,13 +170,6 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro return nil } -func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode { - if source.ByBlock != "" { - return uploader.PersistentVolumeBlock - } - return uploader.PersistentVolumeFilesystem -} - // UpdateProgress which implement ProgressUpdater interface to update progress status func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) { if fs.callbacks.OnProgress != nil { diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index 431fccc5dc..e26cf94823 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -53,7 +53,7 @@ type Callbacks struct { // AccessPoint represents an access point that has been exposed to a data path instance type AccessPoint struct { ByPath string - ByBlock string + VolMode uploader.PersistentVolumeMode } // AsyncBR is the interface for asynchronous data path methods diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index 
e0842f5f13..a0492d5233 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -233,9 +233,12 @@ func (e *csiSnapshotExposer) CleanUp(ctx context.Context, ownerObject corev1.Obj } func getVolumeModeByAccessMode(accessMode string) (corev1.PersistentVolumeMode, error) { - if accessMode == AccessModeFileSystem { + switch accessMode { + case AccessModeFileSystem: return corev1.PersistentVolumeFilesystem, nil - } else { + case AccessModeBlock: + return corev1.PersistentVolumeBlock, nil + default: return "", errors.Errorf("unsupported access mode %s", accessMode) } } @@ -356,6 +359,7 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co } var gracePeriod int64 = 0 + volumeMounts, volumeDevices := kube.MakePodPVCAttachment(volumeName, backupPVC.Spec.VolumeMode) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -379,10 +383,8 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co Image: podInfo.image, ImagePullPolicy: corev1.PullNever, Command: []string{"/velero-helper", "pause"}, - VolumeMounts: []corev1.VolumeMount{{ - Name: volumeName, - MountPath: "/" + volumeName, - }}, + VolumeMounts: volumeMounts, + VolumeDevices: volumeDevices, }, }, ServiceAccountName: podInfo.serviceAccount, diff --git a/pkg/exposer/generic_restore.go b/pkg/exposer/generic_restore.go index ca5cd68a3f..0868aba475 100644 --- a/pkg/exposer/generic_restore.go +++ b/pkg/exposer/generic_restore.go @@ -82,7 +82,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName) } - restorePod, err := e.createRestorePod(ctx, ownerObject, hostingPodLabels, selectedNode) + restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, hostingPodLabels, selectedNode) if err != nil { return errors.Wrapf(err, "error to create restore pod") } @@ -247,7 +247,8 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co return nil } -func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, label map[string]string, selectedNode string) (*corev1.Pod, error) { +func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim, + label map[string]string, selectedNode string) (*corev1.Pod, error) { restorePodName := ownerObject.Name restorePVCName := ownerObject.Name @@ -260,6 +261,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec } var gracePeriod int64 = 0 + volumeMounts, volumeDevices := kube.MakePodPVCAttachment(volumeName, targetPVC.Spec.VolumeMode) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -283,10 +285,8 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec Image: podInfo.image, ImagePullPolicy: corev1.PullNever, Command: []string{"/velero-helper", "pause"}, - VolumeMounts: []corev1.VolumeMount{{ - Name: volumeName, - MountPath: "/" + volumeName, - }}, + VolumeMounts: volumeMounts, + VolumeDevices: volumeDevices, }, }, ServiceAccountName: podInfo.serviceAccount, diff --git a/pkg/exposer/host_path.go b/pkg/exposer/host_path.go index 458667d923..94dc4503c3 100644 --- a/pkg/exposer/host_path.go +++ b/pkg/exposer/host_path.go @@ -26,11 +26,13 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/uploader" 
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) var getVolumeDirectory = kube.GetVolumeDirectory +var getVolumeMode = kube.GetVolumeMode var singlePathMatch = kube.SinglePathMatch // GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod @@ -45,7 +47,17 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin logger.WithField("volDir", volDir).Info("Got volume dir") - pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pod.GetUID()), volDir) + volMode, err := getVolumeMode(ctx, logger, pod, volumeName, cli) + if err != nil { + return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume mode for volume %s in pod %s", volumeName, pod.Name) + } + + volSubDir := "volumes" + if volMode == uploader.PersistentVolumeBlock { + volSubDir = "volumeDevices" + } + + pathGlob := fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), volSubDir, volDir) logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") path, err := singlePathMatch(pathGlob, fs, logger) @@ -56,6 +68,7 @@ func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, volumeName strin logger.WithField("path", path).Info("Found path matching glob") return datapath.AccessPoint{ - ByPath: path, + ByPath: path, + VolMode: volMode, }, nil } diff --git a/pkg/exposer/host_path_test.go b/pkg/exposer/host_path_test.go index f71518d2d7..1022dffd30 100644 --- a/pkg/exposer/host_path_test.go +++ b/pkg/exposer/host_path_test.go @@ -29,17 +29,19 @@ import ( velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) func TestGetPodVolumeHostPath(t *testing.T) { tests := []struct { - name string - getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) - pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error) - pod *corev1.Pod - pvc string - err string + name string + getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) + getVolumeModeFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) + pathMatchFunc func(string, filesystem.Interface, logrus.FieldLogger) (string, error) + pod *corev1.Pod + pvc string + err string }{ { name: "get volume dir fail", @@ -55,6 +57,9 @@ func TestGetPodVolumeHostPath(t *testing.T) { getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) { return "", nil }, + getVolumeModeFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (uploader.PersistentVolumeMode, error) { + return uploader.PersistentVolumeFilesystem, nil + }, pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) { return "", errors.New("fake-error-2") }, @@ -62,6 +67,18 @@ func TestGetPodVolumeHostPath(t *testing.T) { pvc: "fake-pvc-1", err: "error identifying unique volume path on host for volume fake-pvc-1 in pod fake-pod-2: fake-error-2", }, + { + name: "get block volume dir success", + getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) ( + string, error) { + return "fake-pvc-1", nil + }, + 
pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) { + return "/host_pods/fake-pod-1-id/volumeDevices/kubernetes.io~csi/fake-pvc-1-id", nil + }, + pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(), + pvc: "fake-pvc-1", + }, } for _, test := range tests { @@ -70,12 +87,18 @@ func TestGetPodVolumeHostPath(t *testing.T) { getVolumeDirectory = test.getVolumeDirFunc } + if test.getVolumeModeFunc != nil { + getVolumeMode = test.getVolumeModeFunc + } + if test.pathMatchFunc != nil { singlePathMatch = test.pathMatchFunc } _, err := GetPodVolumeHostPath(context.Background(), test.pod, test.pvc, nil, nil, velerotest.NewLogger()) - assert.EqualError(t, err, test.err) + if test.err != "" || err != nil { + assert.EqualError(t, err, test.err) + } }) } } diff --git a/pkg/exposer/types.go b/pkg/exposer/types.go index 253256eb97..21c473366d 100644 --- a/pkg/exposer/types.go +++ b/pkg/exposer/types.go @@ -22,6 +22,7 @@ import ( const ( AccessModeFileSystem = "by-file-system" + AccessModeBlock = "by-block-device" ) // ExposeResult defines the result of expose. diff --git a/pkg/install/daemonset.go b/pkg/install/daemonset.go index b139f81242..8e74e16da1 100644 --- a/pkg/install/daemonset.go +++ b/pkg/install/daemonset.go @@ -86,6 +86,14 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1.DaemonSet { }, }, }, + { + Name: "host-plugins", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/var/lib/kubelet/plugins", + }, + }, + }, { Name: "scratch", VolumeSource: corev1.VolumeSource{ @@ -102,13 +110,20 @@ func DaemonSet(namespace string, opts ...podTemplateOption) *appsv1.DaemonSet { "/velero", }, Args: daemonSetArgs, - + SecurityContext: &corev1.SecurityContext{ + Privileged: &c.privilegedNodeAgent, + }, VolumeMounts: []corev1.VolumeMount{ { Name: "host-pods", MountPath: "/host_pods", MountPropagation: &mountPropagationMode, }, + { + Name: "host-plugins", + MountPath: "/var/lib/kubelet/plugins", + MountPropagation: &mountPropagationMode, + }, { Name: "scratch", MountPath: "/scratch", diff --git a/pkg/install/daemonset_test.go b/pkg/install/daemonset_test.go index 762e95b16b..017d5004d2 100644 --- a/pkg/install/daemonset_test.go +++ b/pkg/install/daemonset_test.go @@ -35,7 +35,7 @@ func TestDaemonSet(t *testing.T) { ds = DaemonSet("velero", WithSecret(true)) assert.Equal(t, 7, len(ds.Spec.Template.Spec.Containers[0].Env)) - assert.Equal(t, 3, len(ds.Spec.Template.Spec.Volumes)) + assert.Equal(t, 4, len(ds.Spec.Template.Spec.Volumes)) ds = DaemonSet("velero", WithFeatures([]string{"foo,bar,baz"})) assert.Len(t, ds.Spec.Template.Spec.Containers[0].Args, 3) diff --git a/pkg/install/deployment.go b/pkg/install/deployment.go index 22e2e5a4dd..0d076e3b45 100644 --- a/pkg/install/deployment.go +++ b/pkg/install/deployment.go @@ -46,6 +46,7 @@ type podTemplateConfig struct { defaultVolumesToFsBackup bool serviceAccountName string uploaderType string + privilegedNodeAgent bool } func WithImage(image string) podTemplateOption { @@ -142,6 +143,12 @@ func WithServiceAccountName(sa string) podTemplateOption { } } +func WithPrivilegedNodeAgent() podTemplateOption { + return func(c *podTemplateConfig) { + c.privilegedNodeAgent = true + } +} + func Deployment(namespace string, opts ...podTemplateOption) *appsv1.Deployment { // TODO: Add support for server args c := &podTemplateConfig{ diff --git a/pkg/install/resources.go b/pkg/install/resources.go index b7b3407eb3..9cf7b901c8 100644 --- a/pkg/install/resources.go 
+++ b/pkg/install/resources.go @@ -240,6 +240,7 @@ type VeleroOptions struct { SecretData []byte RestoreOnly bool UseNodeAgent bool + PrivilegedNodeAgent bool UseVolumeSnapshots bool BSLConfig map[string]string VSLConfig map[string]string @@ -369,6 +370,9 @@ func AllResources(o *VeleroOptions) *unstructured.UnstructuredList { if len(o.Features) > 0 { dsOpts = append(dsOpts, WithFeatures(o.Features)) } + if o.PrivilegedNodeAgent { + dsOpts = append(dsOpts, WithPrivilegedNodeAgent()) + } ds := DaemonSet(o.Namespace, dsOpts...) if err := appendUnstructured(resources, ds); err != nil { fmt.Printf("error appending DaemonSet %s: %s\n", ds.GetName(), err.Error()) diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go index 57ab0c030f..2955b06b50 100644 --- a/pkg/podvolume/backupper.go +++ b/pkg/podvolume/backupper.go @@ -200,10 +200,11 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. b.resultsLock.Unlock() var ( - errs []error - podVolumeBackups []*velerov1api.PodVolumeBackup - podVolumes = make(map[string]corev1api.Volume) - mountedPodVolumes = sets.String{} + errs []error + podVolumeBackups []*velerov1api.PodVolumeBackup + podVolumes = make(map[string]corev1api.Volume) + mountedPodVolumes = sets.String{} + attachedPodDevices = sets.String{} ) pvcSummary := NewPVCBackupSummary() @@ -233,6 +234,9 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. for _, volumeMount := range container.VolumeMounts { mountedPodVolumes.Insert(volumeMount.Name) } + for _, volumeDevice := range container.VolumeDevices { + attachedPodDevices.Insert(volumeDevice.Name) + } } var numVolumeSnapshots int @@ -263,6 +267,15 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api. continue } + // check if volume is a block volume + if attachedPodDevices.Has(volumeName) { + msg := fmt.Sprintf("volume %s declared in pod %s/%s is a block volume. Block volumes are not supported for fs backup, skipping", + volumeName, pod.Namespace, pod.Name) + log.Warn(msg) + pvcSummary.addSkipped(volumeName, msg) + continue + } + // volumes that are not mounted by any container should not be backed up, because // its directory is not created if !mountedPodVolumes.Has(volumeName) { diff --git a/pkg/uploader/kopia/block_backup.go b/pkg/uploader/kopia/block_backup.go new file mode 100644 index 0000000000..a637925a49 --- /dev/null +++ b/pkg/uploader/kopia/block_backup.go @@ -0,0 +1,55 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kopia + +import ( + "os" + "syscall" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" + "github.com/pkg/errors" +) + +const ErrNotPermitted = "operation not permitted" + +func getLocalBlockEntry(sourcePath string) (fs.Entry, error) { + source, err := resolveSymlink(sourcePath) + if err != nil { + return nil, errors.Wrap(err, "resolveSymlink") + } + + fileInfo, err := os.Lstat(source) + if err != nil { + return nil, errors.Wrapf(err, "unable to get the source device information %s", source) + } + + if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK { + return nil, errors.Errorf("source path %s is not a block device", source) + } + + device, err := os.Open(source) + if err != nil { + if os.IsPermission(err) || err.Error() == ErrNotPermitted { + return nil, errors.Wrapf(err, "no permission to open the source device %s, make sure that node agent is running in privileged mode", source) + } + return nil, errors.Wrapf(err, "unable to open the source device %s", source) + } + + sf := virtualfs.StreamingFileFromReader(source, device) + return virtualfs.NewStaticDirectory(source, []fs.Entry{sf}), nil +} diff --git a/pkg/uploader/kopia/block_restore.go b/pkg/uploader/kopia/block_restore.go new file mode 100644 index 0000000000..25d11ee24e --- /dev/null +++ b/pkg/uploader/kopia/block_restore.go @@ -0,0 +1,99 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kopia + +import ( + "context" + "io" + "os" + "path/filepath" + "syscall" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/snapshot/restore" + "github.com/pkg/errors" +) + +type BlockOutput struct { + *restore.FilesystemOutput + + targetFileName string +} + +var _ restore.Output = &BlockOutput{} + +const bufferSize = 128 * 1024 + +func (o *BlockOutput) WriteFile(ctx context.Context, relativePath string, remoteFile fs.File) error { + remoteReader, err := remoteFile.Open(ctx) + if err != nil { + return errors.Wrapf(err, "failed to open remote file %s", remoteFile.Name()) + } + defer remoteReader.Close() + + targetFile, err := os.Create(o.targetFileName) + if err != nil { + return errors.Wrapf(err, "failed to open file %s", o.targetFileName) + } + defer targetFile.Close() + + buffer := make([]byte, bufferSize) + + readData := true + for readData { + bytesToWrite, err := remoteReader.Read(buffer) + if err != nil { + if err != io.EOF { + return errors.Wrapf(err, "failed to read data from remote file %s", o.targetFileName) + } + readData = false + } + + if bytesToWrite > 0 { + offset := 0 + for bytesToWrite > 0 { + if bytesWritten, err := targetFile.Write(buffer[offset:bytesToWrite]); err == nil { + bytesToWrite -= bytesWritten + offset += bytesWritten + } else { + return errors.Wrapf(err, "failed to write data to file %s", o.targetFileName) + } + } + } + } + + return nil +} + +func (o *BlockOutput) BeginDirectory(ctx context.Context, relativePath string, e fs.Directory) error { + var err error + o.targetFileName, err = filepath.EvalSymlinks(o.TargetPath) + if err != nil { + return errors.Wrapf(err, "unable to evaluate symlinks for %s", o.targetFileName) + } + + fileInfo, err := os.Lstat(o.targetFileName) + if err != nil { + return errors.Wrapf(err, "unable to get the target device information for %s", o.TargetPath) + } + + if (fileInfo.Sys().(*syscall.Stat_t).Mode & syscall.S_IFMT) != syscall.S_IFBLK { + return errors.Errorf("target file %s is not a block device", o.TargetPath) + } + + return nil +} diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go index 02129c9f0b..27ac842532 100644 --- a/pkg/uploader/kopia/snapshot.go +++ b/pkg/uploader/kopia/snapshot.go @@ -131,25 +131,22 @@ func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.Re if fsUploader == nil { return nil, false, errors.New("get empty kopia uploader") } - - if volMode == uploader.PersistentVolumeBlock { - return nil, false, errors.New("unable to handle block storage") - } - - dir, err := filepath.Abs(sourcePath) + source, err := filepath.Abs(sourcePath) if err != nil { return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath) } - // to be consistent with restic when backup empty dir returns one error for upper logic handle - dirs, err := os.ReadDir(dir) - if err != nil { - return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", dir) - } else if len(dirs) == 0 { - return nil, true, nil + if volMode == uploader.PersistentVolumeFilesystem { + // to be consistent with restic when backup empty dir returns one error for upper logic handle + dirs, err := os.ReadDir(source) + if err != nil { + return nil, false, errors.Wrapf(err, "Unable to read dir in path %s", source) + } else if len(dirs) == 0 { + return nil, true, nil + } } - dir = filepath.Clean(dir) + source = filepath.Clean(source) sourceInfo := snapshot.SourceInfo{ UserName: udmrepo.GetRepoUser(), @@ -157,16 +154,25 @@ func Backup(ctx context.Context, fsUploader 
SnapshotUploader, repoWriter repo.Re Path: filepath.Clean(realSource), } if realSource == "" { - sourceInfo.Path = dir + sourceInfo.Path = source } - rootDir, err := getLocalFSEntry(dir) - if err != nil { - return nil, false, errors.Wrap(err, "Unable to get local filesystem entry") + var sourceEntry fs.Entry + + if volMode == uploader.PersistentVolumeBlock { + sourceEntry, err = getLocalBlockEntry(source) + if err != nil { + return nil, false, errors.Wrap(err, "unable to get local block device entry") + } + } else { + sourceEntry, err = getLocalFSEntry(source) + if err != nil { + return nil, false, errors.Wrap(err, "unable to get local filesystem entry") + } } kopiaCtx := kopia.SetupKopiaLog(ctx, log) - snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, forceFull, parentSnapshot, tags, log, "Kopia Uploader") + snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, sourceEntry, forceFull, parentSnapshot, tags, log, "Kopia Uploader") if err != nil { return nil, false, err } @@ -348,7 +354,8 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour } // Restore restore specific sourcePath with given snapshotID and update progress -func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { +func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, + log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { log.Info("Start to restore...") kopiaCtx := kopia.SetupKopiaLog(ctx, log) @@ -370,7 +377,7 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, return 0, 0, errors.Wrapf(err, "Unable to resolve path %v", dest) } - output := &restore.FilesystemOutput{ + fsOutput := &restore.FilesystemOutput{ TargetPath: path, OverwriteDirectories: true, OverwriteFiles: true, @@ -378,11 +385,18 @@ func Restore(ctx context.Context, rep repo.RepositoryWriter, progress *Progress, IgnorePermissionErrors: true, } - err = output.Init(ctx) + err = fsOutput.Init(ctx) if err != nil { return 0, 0, errors.Wrap(err, "error to init output") } + var output restore.Output = fsOutput + if volMode == uploader.PersistentVolumeBlock { + output = &BlockOutput{ + FilesystemOutput: fsOutput, + } + } + stat, err := restoreEntryFunc(kopiaCtx, rep, output, rootEntry, restore.Options{ Parallel: runtime.NumCPU(), RestoreDirEntryAtDepth: math.MaxInt32, diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 232ea92c4d..65ec22136a 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" @@ -594,11 +595,11 @@ func TestBackup(t *testing.T) { expectedError: errors.New("Unable to read dir"), }, { - name: "Unable to handle block mode", + name: "Source path is not a block device", sourcePath: "/", tags: nil, volMode: uploader.PersistentVolumeBlock, - expectedError: errors.New("unable to handle block storage"), + expectedError: errors.New("source path / is not a block device"), }, } @@ -660,6 +661,7 @@ func TestRestore(t *testing.T) { expectedBytes int64 expectedCount int32 expectedError error + volMode 
uploader.PersistentVolumeMode } // Define test cases @@ -697,6 +699,46 @@ func TestRestore(t *testing.T) { snapshotID: "snapshot-123", expectedError: nil, }, + { + name: "Expect block volume successful", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + return restore.Stats{}, nil + }, + snapshotID: "snapshot-123", + expectedError: nil, + volMode: uploader.PersistentVolumeBlock, + }, + { + name: "Unable to evaluate symlinks for block volume", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + err := output.BeginDirectory(ctx, "fake-dir", virtualfs.NewStaticDirectory("fake-dir", nil)) + return restore.Stats{}, err + }, + snapshotID: "snapshot-123", + expectedError: errors.New("unable to evaluate symlinks for"), + volMode: uploader.PersistentVolumeBlock, + dest: "/wrong-dest", + }, + { + name: "Target file is not a block device", + filesystemEntryFunc: func(ctx context.Context, rep repo.Repository, rootID string, consistentAttributes bool) (fs.Entry, error) { + return snapshotfs.EntryFromDirEntry(rep, &snapshot.DirEntry{Type: snapshot.EntryTypeFile}), nil + }, + restoreEntryFunc: func(ctx context.Context, rep repo.Repository, output restore.Output, rootEntry fs.Entry, options restore.Options) (restore.Stats, error) { + err := output.BeginDirectory(ctx, "fake-dir", virtualfs.NewStaticDirectory("fake-dir", nil)) + return restore.Stats{}, err + }, + snapshotID: "snapshot-123", + expectedError: errors.New("target file /tmp is not a block device"), + volMode: uploader.PersistentVolumeBlock, + dest: "/tmp", + }, } em := &manifest.EntryMetadata{ @@ -706,6 +748,10 @@ func TestRestore(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } + if tc.invalidManifestType { em.Labels[manifest.TypeLabelKey] = "" } else { @@ -725,7 +771,7 @@ func TestRestore(t *testing.T) { repoWriterMock.On("OpenObject", mock.Anything, mock.Anything).Return(em, nil) progress := new(Progress) - bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, logrus.New(), nil) + bytesRestored, fileCount, err := Restore(context.Background(), repoWriterMock, progress, tc.snapshotID, tc.dest, tc.volMode, logrus.New(), nil) // Check if the returned error matches the expected error if tc.expectedError != nil { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index dd32173b82..706393362d 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -128,11 +128,6 @@ func (kp *kopiaProvider) RunBackup( return "", false, errors.New("path is empty") } - // For now, error on block mode - if volMode == uploader.PersistentVolumeBlock { - return "", false, errors.New("unable to currently support block mode") - } - log := kp.log.WithFields(logrus.Fields{ 
"path": path, "realSource": realSource, @@ -214,10 +209,6 @@ func (kp *kopiaProvider) RunRestore( "volumePath": volumePath, }) - if volMode == uploader.PersistentVolumeBlock { - return errors.New("unable to currently support block mode") - } - repoWriter := kopia.NewShimRepo(kp.bkRepo) progress := new(kopia.Progress) progress.InitThrottle(restoreProgressCheckInterval) @@ -235,7 +226,7 @@ func (kp *kopiaProvider) RunRestore( // We use the cancel channel to control the restore cancel, so don't pass a context with cancel to Kopia restore. // Otherwise, Kopia restore will not response to the cancel control but return an arbitrary error. // Kopia restore cancel is not designed as well as Kopia backup which uses the context to control backup cancel all the way. - size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, log, restoreCancel) + size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, volMode, log, restoreCancel) if err != nil { return errors.Wrapf(err, "Failed to run kopia restore") diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go index 944cdbcceb..c38d370ce3 100644 --- a/pkg/uploader/provider/kopia_test.go +++ b/pkg/uploader/provider/kopia_test.go @@ -94,12 +94,12 @@ func TestRunBackup(t *testing.T) { notError: false, }, { - name: "error on vol mode", + name: "success to backup block mode volume", hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { - return nil, true, nil + return &uploader.SnapshotInfo{}, false, nil }, volMode: uploader.PersistentVolumeBlock, - notError: false, + notError: true, }, } for _, tc := range testCases { @@ -125,31 +125,31 @@ func TestRunRestore(t *testing.T) { testCases := []struct { name string - hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) + hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) notError bool volMode uploader.PersistentVolumeMode }{ { name: "normal restore", - hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { return 0, 0, nil }, notError: true, }, { name: "failed to restore", - hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { return 0, 0, errors.New("failed to restore") }, notError: false, }, { 
- name: "failed to restore block mode", - hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { - return 0, 0, errors.New("failed to restore") + name: "normal block mode restore", + hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, volMode uploader.PersistentVolumeMode, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) { + return 0, 0, nil }, volMode: uploader.PersistentVolumeBlock, - notError: false, + notError: true, }, } diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go index f2203d7bdd..62f44d04f3 100644 --- a/pkg/uploader/provider/restic_test.go +++ b/pkg/uploader/provider/restic_test.go @@ -212,6 +212,9 @@ func TestResticRunRestore(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.volMode == "" { + tc.volMode = uploader.PersistentVolumeFilesystem + } resticRestoreCMDFunc = tc.hookResticRestoreFunc if tc.volMode == "" { tc.volMode = uploader.PersistentVolumeFilesystem diff --git a/pkg/util/kube/pvc_pv.go b/pkg/util/kube/pvc_pv.go index 2af818d909..1a393b3ab2 100644 --- a/pkg/util/kube/pvc_pv.go +++ b/pkg/util/kube/pvc_pv.go @@ -316,3 +316,23 @@ func WaitPVBound(ctx context.Context, pvGetter corev1client.CoreV1Interface, pvN func IsPVCBound(pvc *corev1api.PersistentVolumeClaim) bool { return pvc.Spec.VolumeName != "" } + +// MakePodPVCAttachment returns the volume mounts and devices for a pod needed to attach a PVC +func MakePodPVCAttachment(volumeName string, volumeMode *corev1api.PersistentVolumeMode) ([]corev1api.VolumeMount, []corev1api.VolumeDevice) { + var volumeMounts []corev1api.VolumeMount = nil + var volumeDevices []corev1api.VolumeDevice = nil + + if volumeMode != nil && *volumeMode == corev1api.PersistentVolumeBlock { + volumeDevices = []corev1api.VolumeDevice{{ + Name: volumeName, + DevicePath: "/" + volumeName, + }} + } else { + volumeMounts = []corev1api.VolumeMount{{ + Name: volumeName, + MountPath: "/" + volumeName, + }} + } + + return volumeMounts, volumeDevices +} diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go index 3d8d4e3ef1..e1ed48dba2 100644 --- a/pkg/util/kube/utils.go +++ b/pkg/util/kube/utils.go @@ -35,6 +35,7 @@ import ( corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) @@ -50,6 +51,8 @@ const ( KubeAnnSelectedNode = "volume.kubernetes.io/selected-node" ) +var ErrorPodVolumeIsNotPVC = errors.New("pod volume is not a PVC") + // NamespaceAndName returns a string in the format / func NamespaceAndName(objMeta metav1.Object) string { if objMeta.GetNamespace() == "" { @@ -122,6 +125,57 @@ func EnsureNamespaceExistsAndIsReady(namespace *corev1api.Namespace, client core // where the specified volume lives. // For volumes with a CSIVolumeSource, append "/mount" to the directory name. func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) (string, error) { + pvc, pv, volume, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli) + if err != nil { + // This case implies the administrator created the PV and attached it directly, without PVC. 
+ // Note that only one VolumeSource can be populated per Volume on a pod + if err == ErrorPodVolumeIsNotPVC { + if volume.VolumeSource.CSI != nil { + return volume.Name + "/mount", nil + } + return volume.Name, nil + } + return "", errors.WithStack(err) + } + + // Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source. + // PV's been created with a CSI source. + isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli) + if err != nil { + return "", errors.WithStack(err) + } + if isProvisionedByCSI { + if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == corev1api.PersistentVolumeBlock { + return pvc.Spec.VolumeName, nil + } + return pvc.Spec.VolumeName + "/mount", nil + } + + return pvc.Spec.VolumeName, nil +} + +// GetVolumeMode gets the uploader.PersistentVolumeMode of the volume. +func GetVolumeMode(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) ( + uploader.PersistentVolumeMode, error) { + _, pv, _, err := GetPodPVCVolume(ctx, log, pod, volumeName, cli) + + if err != nil { + if err == ErrorPodVolumeIsNotPVC { + return uploader.PersistentVolumeFilesystem, nil + } + return "", errors.WithStack(err) + } + + if pv.Spec.VolumeMode != nil && *pv.Spec.VolumeMode == corev1api.PersistentVolumeBlock { + return uploader.PersistentVolumeBlock, nil + } + return uploader.PersistentVolumeFilesystem, nil +} + +// GetPodPVCVolume gets the PVC, PV and volume for a pod volume name. +// Returns pod volume in case of ErrorPodVolumeIsNotPVC error +func GetPodPVCVolume(ctx context.Context, log logrus.FieldLogger, pod *corev1api.Pod, volumeName string, cli client.Client) ( + *corev1api.PersistentVolumeClaim, *corev1api.PersistentVolume, *corev1api.Volume, error) { var volume *corev1api.Volume for i := range pod.Spec.Volumes { @@ -132,41 +186,26 @@ func GetVolumeDirectory(ctx context.Context, log logrus.FieldLogger, pod *corev1 } if volume == nil { - return "", errors.New("volume not found in pod") + return nil, nil, nil, errors.New("volume not found in pod") } - // This case implies the administrator created the PV and attached it directly, without PVC. - // Note that only one VolumeSource can be populated per Volume on a pod if volume.VolumeSource.PersistentVolumeClaim == nil { - if volume.VolumeSource.CSI != nil { - return volume.Name + "/mount", nil - } - return volume.Name, nil + return nil, nil, volume, ErrorPodVolumeIsNotPVC // There is a pod volume but it is not a PVC } - // Most common case is that we have a PVC VolumeSource, and we need to check the PV it points to for a CSI source. pvc := &corev1api.PersistentVolumeClaim{} err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: volume.VolumeSource.PersistentVolumeClaim.ClaimName}, pvc) if err != nil { - return "", errors.WithStack(err) + return nil, nil, nil, errors.WithStack(err) } pv := &corev1api.PersistentVolume{} err = cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv) if err != nil { - return "", errors.WithStack(err) + return nil, nil, nil, errors.WithStack(err) } - // PV's been created with a CSI source. - isProvisionedByCSI, err := isProvisionedByCSI(log, pv, cli) - if err != nil { - return "", errors.WithStack(err) - } - if isProvisionedByCSI { - return pvc.Spec.VolumeName + "/mount", nil - } - - return pvc.Spec.VolumeName, nil + return pvc, pv, volume, nil } // isProvisionedByCSI function checks whether this is a CSI PV by annotation. 
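As a reviewer aside, a minimal sketch of how the new `GetVolumeMode` helper is meant to be consumed may help: the caller below branches on the volume mode the same way `GetPodVolumeHostPath` in `pkg/exposer/host_path.go` does. The package name `example` and both function names are hypothetical and not part of this change; error handling is trimmed to the essentials.

```go
package example // hypothetical caller, not part of this PR

import (
	"context"
	"fmt"

	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/pkg/uploader"
	"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// kubeletSubDir resolves a pod volume to the kubelet subdirectory it lives
// under: filesystem volumes are mounted under .../volumes/, while block
// volumes are mapped as devices under .../volumeDevices/. Pod volumes that
// are not PVC-backed default to filesystem mode inside GetVolumeMode.
func kubeletSubDir(ctx context.Context, log logrus.FieldLogger, pod *corev1.Pod, volumeName string, cli ctrlclient.Client) (string, error) {
	mode, err := kube.GetVolumeMode(ctx, log, pod, volumeName, cli)
	if err != nil {
		return "", err
	}
	if mode == uploader.PersistentVolumeBlock {
		return "volumeDevices", nil
	}
	return "volumes", nil
}

// pathGlobFor mirrors the glob built in pkg/exposer/host_path.go.
func pathGlobFor(pod *corev1.Pod, subDir, volDir string) string {
	return fmt.Sprintf("/host_pods/%s/%s/*/%s", string(pod.GetUID()), subDir, volDir)
}
```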
diff --git a/pkg/util/kube/utils_test.go b/pkg/util/kube/utils_test.go index 6edf084351..31110dc8c0 100644 --- a/pkg/util/kube/utils_test.go +++ b/pkg/util/kube/utils_test.go @@ -38,6 +38,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/builder" velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/uploader" ) func TestNamespaceAndName(t *testing.T) { @@ -164,6 +165,13 @@ func TestGetVolumeDirectorySuccess(t *testing.T) { pv: builder.ForPersistentVolume("a-pv").CSI("csi.test.com", "provider-volume-id").Result(), want: "a-pv/mount", }, + { + name: "Block CSI volume with a PVC/PV does not append '/mount' to the volume name", + pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(), + pvc: builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(), + pv: builder.ForPersistentVolume("a-pv").CSI("csi.test.com", "provider-volume-id").VolumeMode(corev1.PersistentVolumeBlock).Result(), + want: "a-pv", + }, { name: "CSI volume mounted without a PVC appends '/mount' to the volume name", pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").CSISource("csi.test.com").Result()).Result(), @@ -211,6 +219,54 @@ func TestGetVolumeDirectorySuccess(t *testing.T) { } } +// TestGetVolumeModeSuccess tests the GetVolumeMode function +func TestGetVolumeModeSuccess(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + pvc *corev1.PersistentVolumeClaim + pv *corev1.PersistentVolume + want uploader.PersistentVolumeMode + }{ + { + name: "Filesystem PVC volume", + pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(), + pvc: builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(), + pv: builder.ForPersistentVolume("a-pv").VolumeMode(corev1.PersistentVolumeFilesystem).Result(), + want: uploader.PersistentVolumeFilesystem, + }, + { + name: "Block PVC volume", + pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").PersistentVolumeClaimSource("my-pvc").Result()).Result(), + pvc: builder.ForPersistentVolumeClaim("ns-1", "my-pvc").VolumeName("a-pv").Result(), + pv: builder.ForPersistentVolume("a-pv").VolumeMode(corev1.PersistentVolumeBlock).Result(), + want: uploader.PersistentVolumeBlock, + }, + { + name: "Pod volume without a PVC", + pod: builder.ForPod("ns-1", "my-pod").Volumes(builder.ForVolume("my-vol").Result()).Result(), + want: uploader.PersistentVolumeFilesystem, + }, + } + + for _, tc := range tests { + clientBuilder := fake.NewClientBuilder() + + if tc.pvc != nil { + clientBuilder = clientBuilder.WithObjects(tc.pvc) + } + if tc.pv != nil { + clientBuilder = clientBuilder.WithObjects(tc.pv) + } + + // Function under test + mode, err := GetVolumeMode(context.Background(), logrus.StandardLogger(), tc.pod, tc.pod.Spec.Volumes[0].Name, clientBuilder.Build()) + + require.NoError(t, err) + assert.Equal(t, tc.want, mode) + } +} + func TestIsV1Beta1CRDReady(t *testing.T) { tests := []struct { name string diff --git a/site/content/docs/main/csi-snapshot-data-movement.md b/site/content/docs/main/csi-snapshot-data-movement.md index 667be0a48c..b694da9f3a 100644 --- a/site/content/docs/main/csi-snapshot-data-movement.md +++ b/site/content/docs/main/csi-snapshot-data-movement.md @@ -75,24 +75,10 @@ To mount the correct hostpath to pods volumes, run the node-agent pod in `privil oc adm policy add-scc-to-user privileged -z velero -n velero 
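# the command above grants the "privileged" SCC to the "velero" service account in the "velero" namespace, which the privileged node-agent pods require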
``` -2. Modify the DaemonSet yaml to request a privileged mode: - - ```diff - @@ -67,3 +67,5 @@ spec: - value: /credentials/cloud - - name: VELERO_SCRATCH_DIR - value: /scratch - + securityContext: - + privileged: true +2. Install Velero with the `--privileged-node-agent` option to request privileged mode: + ``` - - or - - ```shell - oc patch ds/node-agent \ - --namespace velero \ - --type json \ - -p '[{"op":"add","path":"/spec/template/spec/containers/0/securityContext","value": { "privileged": true}}]' + velero install --use-node-agent --privileged-node-agent ``` If node-agent is not running in privileged mode, it will not be able to access snapshot volumes within the mounted diff --git a/site/content/docs/main/customize-installation.md b/site/content/docs/main/customize-installation.md index 0d8621685d..a4e2299c8a 100644 --- a/site/content/docs/main/customize-installation.md +++ b/site/content/docs/main/customize-installation.md @@ -23,6 +23,14 @@ By default, `velero install` does not install Velero's [File System Backup][3]. If you've already run `velero install` without the `--use-node-agent` flag, you can run the same command again, including the `--use-node-agent` flag, to add the file system backup to your existing install. +## CSI Snapshot Data Movement + +The Velero node-agent is required for CSI snapshot data movement when the Velero built-in data mover is used. By default, `velero install` does not install Velero's node-agent. To enable it, specify the `--use-node-agent` flag. + +For some use cases, the Velero node-agent needs to run in privileged mode. For example, backing up block volumes requires the node-agent to access the block device directly. To enable this, set the `velero install` flag `--privileged-node-agent`. + +If you've already run `velero install` without the `--use-node-agent` or `--privileged-node-agent` flags, you can run the same command again, including those flags, to add CSI snapshot data movement to your existing install. + ## Default Pod Volume backup to file system backup By default, `velero install` does not enable the use of File System Backup (FSB) to take backups of all pod volumes. You must apply an [annotation](file-system-backup.md/#using-opt-in-pod-volume-backup) to every pod which contains volumes for Velero to use FSB for the backup. diff --git a/site/content/docs/main/file-system-backup.md b/site/content/docs/main/file-system-backup.md index 108daede44..30021806be 100644 --- a/site/content/docs/main/file-system-backup.md +++ b/site/content/docs/main/file-system-backup.md @@ -111,24 +111,10 @@ To mount the correct hostpath to pods volumes, run the node-agent pod in `privil oc adm policy add-scc-to-user privileged -z velero -n velero ``` -2. Modify the DaemonSet yaml to request a privileged mode: - - ```diff - @@ -67,3 +67,5 @@ spec: - value: /credentials/cloud - - name: VELERO_SCRATCH_DIR - value: /scratch - + securityContext: - + privileged: true +2.
Install Velero with the `--privileged-node-agent` option to request privileged mode: + ``` - - or - - ```shell - oc patch ds/node-agent \ - --namespace velero \ - --type json \ - -p '[{"op":"add","path":"/spec/template/spec/containers/0/securityContext","value": { "privileged": true}}]' + velero install --use-node-agent --privileged-node-agent ``` diff --git a/tilt-resources/examples/node-agent.yaml b/tilt-resources/examples/node-agent.yaml index d5c10fc47e..835ba297ff 100644 --- a/tilt-resources/examples/node-agent.yaml +++ b/tilt-resources/examples/node-agent.yaml @@ -49,6 +49,9 @@ spec: - mountPath: /host_pods mountPropagation: HostToContainer name: host-pods + - mountPath: /var/lib/kubelet/plugins + mountPropagation: HostToContainer + name: host-plugins - mountPath: /scratch name: scratch - mountPath: /credentials @@ -60,6 +63,9 @@ spec: - hostPath: path: /var/lib/kubelet/pods name: host-pods + - hostPath: + path: /var/lib/kubelet/plugins + name: host-plugins - emptyDir: {} name: scratch - name: cloud-credentials
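To close the loop on the install-time changes, a test in the style of the existing `daemonset_test.go` could assert the new DaemonSet wiring end to end. The sketch below assumes it lives in the `install` package next to `DaemonSet` and `WithPrivilegedNodeAgent`; the test name is hypothetical and the test is not part of this change.

```go
package install

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// Sketch: WithPrivilegedNodeAgent should flip the container's
// securityContext, and the host-plugins mount should always be rendered.
func TestDaemonSetPrivilegedNodeAgent(t *testing.T) {
	ds := DaemonSet("velero", WithPrivilegedNodeAgent())

	container := ds.Spec.Template.Spec.Containers[0]
	require.NotNil(t, container.SecurityContext)
	require.NotNil(t, container.SecurityContext.Privileged)
	assert.True(t, *container.SecurityContext.Privileged)

	foundMount := false
	for _, m := range container.VolumeMounts {
		if m.Name == "host-plugins" {
			foundMount = true
			assert.Equal(t, "/var/lib/kubelet/plugins", m.MountPath)
		}
	}
	assert.True(t, foundMount, "expected a host-plugins mount at /var/lib/kubelet/plugins")
}
```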