From a66539f419ff3d2c68c04586b2d8df7211fca278 Mon Sep 17 00:00:00 2001 From: Derek Su Date: Wed, 5 Jul 2023 14:39:26 +0800 Subject: [PATCH] Support encrypted volumes in block mode Use the staging path as a parent folder and bind mount device mapper mapped encrypted volume device into a file inside of that folder. This allows us to store additional metadata and give us more control over our tear down for example since we are not sharing control with kubernetes as is the case for the staging path itself. Ref: https://github.com/longhorn/longhorn-manager/pull/1613 Longhorn 4883 Signed-off-by: Derek Su --- csi/node_server.go | 133 +++++++++++++++++++++++++++------------------ csi/util.go | 10 ++++ 2 files changed, 90 insertions(+), 53 deletions(-) diff --git a/csi/node_server.go b/csi/node_server.go index e58c492f35..13faeba868 100644 --- a/csi/node_server.go +++ b/csi/node_server.go @@ -56,6 +56,7 @@ type NodeServer struct { apiClient *longhornclient.RancherClient nodeID string caps []*csi.NodeServiceCapability + log *logrus.Entry } func NewNodeServer(apiClient *longhornclient.RancherClient, nodeID string) *NodeServer { @@ -68,17 +69,13 @@ func NewNodeServer(apiClient *longhornclient.RancherClient, nodeID string) *Node csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }), + log: logrus.StandardLogger().WithField("component", "csi-node-server"), } } -func getLoggerForCSINodeServer() *logrus.Entry { - return logrus.StandardLogger().WithField("component", "csi-node-server") -} - // NodePublishVolume will mount the volume /dev/longhorn/ to target_path func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) targetPath := req.GetTargetPath() if targetPath == "" { @@ -140,7 +137,17 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } if volumeCapability.GetBlock() != nil { - devicePath := volume.Controllers[0].Endpoint + devicePath := getStageBlockVolumePath(stagingTargetPath, volumeID) + _, err := os.Stat(devicePath) + if err != nil { + if !os.IsNotExist(err) { + return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to stat device %s", devicePath).Error()) + } + // Fall back to the controller endpoint if the device path under the stagingTargetPath doesn't exist + log.Infof("Device path %s doesn't exist, falling back to controller endpoint %s", devicePath, volume.Controllers[0].Endpoint) + devicePath = volume.Controllers[0].Endpoint + } + if err := ns.nodePublishBlockVolume(volumeID, devicePath, targetPath, mounter); err != nil { log.WithError(err).Errorf("Failed to publish BlockVolume %s", volumeID) return nil, err @@ -152,7 +159,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // we validate the staging path to make sure the global mount is still valid if isMnt, err := ensureMountPoint(stagingTargetPath, mounter); err != nil || !isMnt { - msg := fmt.Sprintf("Staging path %v is no longer valid for volume %v", stagingTargetPath, volumeID) + msg := fmt.Sprintf("Staging target path %v is no longer valid for volume %v", stagingTargetPath, volumeID) log.WithError(err).Error(msg) // HACK: normally when we return FailedPrecondition below kubelet should call NodeStageVolume again @@ -249,12 +256,11 @@ func (ns *NodeServer) nodeStageSharedVolume(volumeID, shareEndpoint, targetPath } func (ns *NodeServer) nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType string, mountFlags []string, mounter *mount.SafeFormatAndMount) error { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"}) isMnt, err := ensureMountPoint(stagingTargetPath, mounter) if err != nil { - return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point for volume %v", volumeID).Error()) + return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point %v for volume %v", stagingTargetPath, volumeID).Error()) } if isMnt { return nil @@ -267,30 +273,38 @@ func (ns *NodeServer) nodeStageMountVolume(volumeID, devicePath, stagingTargetPa return nil } +// nodeStageBlockVolume utilizes the stagingTargetPath to create a volumeID file to bind mount the devicePath +// this is valid since the csi plugin is in control of the staging path +func (ns *NodeServer) nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath string, mounter mount.Interface) error { + path := getStageBlockVolumePath(stagingTargetPath, volumeID) + return ns.nodePublishBlockVolume(volumeID, devicePath, path, mounter) +} + func (ns *NodeServer) nodePublishBlockVolume(volumeID, devicePath, targetPath string, mounter mount.Interface) error { + log := ns.log.WithFields(logrus.Fields{"function": "nodePublishBlockVolume"}) + // we ensure the parent directory exists and is valid - if _, err := ensureMountPoint(filepath.Dir(stagingTargetPath), mounter); err != nil { + if _, err := ensureMountPoint(filepath.Dir(targetPath), mounter); err != nil { return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point for block device %v", devicePath).Error()) } // create file where we can bind mount the device to - if err := makeFile(stagingTargetPath); err != nil { - return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to create file %v", stagingTargetPath).Error()) + if err := makeFile(targetPath); err != nil { + return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to create file %v", targetPath).Error()) } - log.Infof("Bind mounting device %v at %v", devicePath, stagingTargetPath) - if err := mounter.Mount(devicePath, stagingTargetPath, "", []string{"bind"}); err != nil { - if removeErr := os.Remove(stagingTargetPath); removeErr != nil { - return status.Errorf(codes.Internal, errors.Wrapf(removeErr, "failed to remove mount target %q", stagingTargetPath).Error()) + log.Infof("Bind mounting device %v at %v", devicePath, targetPath) + if err := mounter.Mount(devicePath, targetPath, "", []string{"bind"}); err != nil { + if removeErr := os.Remove(targetPath); removeErr != nil { + return status.Errorf(codes.Internal, errors.Wrapf(removeErr, "failed to remove mount target %q", targetPath).Error()) } - return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to bind mount %q at %q", devicePath, stagingTargetPath).Error()) + return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to bind mount %q at %q", devicePath, targetPath).Error()) } return nil } func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeUnpublishVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeUnpublishVolume"}) targetPath := req.GetTargetPath() if targetPath == "" { @@ -311,8 +325,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeStageVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeStageVolume"}) stagingTargetPath := req.GetStagingTargetPath() if stagingTargetPath == "" { @@ -359,7 +372,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if volume.State != string(longhorn.VolumeStateAttached) || volume.Controllers[0].Endpoint == "" { log.Infof("Volume %v hasn't been attached yet, unmounting potential mount point %v", volumeID, stagingTargetPath) if err := unmount(stagingTargetPath, mounter); err != nil { - log.WithError(err).Warn("Failed to unmount stagingTargetPath") + log.WithError(err).Warnf("Failed to unmount stagingTargetPath %v", stagingTargetPath) } return nil, status.Errorf(codes.InvalidArgument, "volume %s hasn't been attached yet", volumeID) } @@ -368,13 +381,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Errorf(codes.Aborted, "volume %s is not ready for workloads", volumeID) } - devicePath := volume.Controllers[0].Endpoint - - // do nothing for block devices, since they are handled by publish - if volumeCapability.GetBlock() != nil { - return &csi.NodeStageVolumeResponse{}, nil - } - if requiresSharedAccess(volume, volumeCapability) && !volume.Migratable { if volume.AccessMode != string(longhorn.AccessModeReadWriteMany) { return nil, status.Errorf(codes.FailedPrecondition, "volume %s requires shared access but is not marked for shared use", volumeID) @@ -399,18 +405,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return &csi.NodeStageVolumeResponse{}, nil } - options := volumeCapability.GetMount().GetMountFlags() - fsType := volumeCapability.GetMount().GetFsType() - if fsType == "" { - fsType = defaultFsType - } - - formatMounter, ok := mounter.(*mount.SafeFormatAndMount) - if !ok { - return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType) - } - - diskFormat, err := formatMounter.GetDiskFormat(devicePath) + devicePath := volume.Controllers[0].Endpoint + diskFormat, err := getDiskFormat(devicePath) if err != nil { return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to evaluate device filesystem %v format", devicePath).Error()) } @@ -453,7 +449,27 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol devicePath = cryptoDevice } - if err := ns.nodeStageMountVolume(volumeID, devicePath, targetPath, fsType, options, formatMounter); err != nil { + if volumeCapability.GetBlock() != nil { + if err := ns.nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath, mounter); err != nil { + return nil, err + } + + logrus.Infof("Volume %v device %v available for usage as block device", volumeID, devicePath) + return &csi.NodeStageVolumeResponse{}, nil + } + + options := volumeCapability.GetMount().GetMountFlags() + fsType := volumeCapability.GetMount().GetFsType() + if fsType == "" { + fsType = defaultFsType + } + + formatMounter, ok := mounter.(*mount.SafeFormatAndMount) + if !ok { + return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType) + } + + if err := ns.nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType, options, formatMounter); err != nil { return nil, err } @@ -484,8 +500,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"}) stagingTargetPath := req.GetStagingTargetPath() if stagingTargetPath == "" { @@ -497,16 +512,29 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, "volume id missing in request") } + mounter := mount.New("") + // CO owns the staging_path so we only unmount but not remove the path - if err := unmount(targetPath, mount.New("")); err != nil { - return nil, status.Error(codes.Internal, fmt.Sprintf("failed to unmount volume %s mount point %v error %v", volumeID, targetPath, err)) + if err := unmount(stagingTargetPath, mounter); err != nil { + return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to unmount volume %s mount point %v", volumeID, stagingTargetPath).Error()) + } + + // For block mode we use the staging path as parent, so we have to do additional cleanup of the subfolder/files + // we should transition the regular fs mounts to also use the same sub folder, this allows us to store additional + // metadata as well as do more forcefully removals since we no longer share the control of the staging_path with kubernetes + // + // The unmount of the parent is a no op for block mode, this is also important for backwards compatibility of the existing block devices. + deviceFilePath := getStageBlockVolumePath(stagingTargetPath, volumeID) + if err := cleanupMountPoint(deviceFilePath, mounter); err != nil { + return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to clean up volume %s device mount point %v", volumeID, deviceFilePath).Error()) } // optionally try to retrieve the volume and check if it's an RWX volume // if it is we let the share-manager clean up the crypto device volume, _ := ns.apiClient.Volume.ById(volumeID) - cleanupCryptoDevice := !requiresSharedAccess(volume, nil) + // Currently, only "RWO volumes" and "block device with volume.Migratable is true" supports encryption. + cleanupCryptoDevice := !requiresSharedAccess(volume, nil) if cleanupCryptoDevice { cryptoDevice := crypto.VolumeMapper(volumeID) if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil { @@ -555,7 +583,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo if isBlockVolume { volCapacity, err := strconv.ParseInt(existVol.Size, 10, 64) if err != nil { - return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to convert volume size %v", existVol.Size).Error()) + return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to convert volume size %v for volume %v", existVol.Size, volumeID).Error()) } return &csi.NodeGetVolumeStatsResponse{ Usage: []*csi.VolumeUsage{ @@ -574,7 +602,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo if errors.Is(err, unix.ENOENT) { return nil, status.Errorf(codes.NotFound, "volume %v is not mounted on path %v", volumeID, volumePath) } - return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to retrieve capacity statistics for volume path %v", volumePath).Error()) + return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to retrieve capacity statistics for volume path %v for volume %v", volumePath, volumeID).Error()) } return &csi.NodeGetVolumeStatsResponse{ @@ -597,8 +625,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo // NodeExpandVolume is designed to expand the file system for ONLINE expansion, func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { - log := getLoggerForCSINodeServer() - log = log.WithFields(logrus.Fields{"function": "NodeExpandVolume"}) + log := ns.log.WithFields(logrus.Fields{"function": "NodeExpandVolume"}) if req.CapacityRange == nil { return nil, status.Error(codes.InvalidArgument, "capacity range missing in request") diff --git a/csi/util.go b/csi/util.go index 4edc631efb..85253c1ce2 100644 --- a/csi/util.go +++ b/csi/util.go @@ -7,6 +7,7 @@ import ( "io" "os" "path" + "path/filepath" "strconv" "strings" "time" @@ -372,6 +373,11 @@ func isBlockDevice(volumePath string) (bool, error) { return false, nil } +func getDiskFormat(devicePath string) (string, error) { + m := mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()} + return m.GetDiskFormat(devicePath) +} + func getFilesystemStatistics(volumePath string) (*volumeFilesystemStatistics, error) { var statfs unix.Statfs_t // See http://man7.org/linux/man-pages/man2/statfs.2.html for details. @@ -428,3 +434,7 @@ func requiresSharedAccess(vol *longhornclient.Volume, cap *csi.VolumeCapability) mode == csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER || mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER } + +func getStageBlockVolumePath(stagingTargetPath, volumeID string) string { + return filepath.Join(stagingTargetPath, volumeID) +}