Skip to content

Commit

Permalink
Support encrypted volumes in block mode
Browse files Browse the repository at this point in the history
Use the staging path as a parent folder and bind mount device mapper mapped
encrypted volume device into a file inside of that folder.

This allows us to store additional metadata and give us more control over
our tear down for example since we are not sharing control with kubernetes
as is the case for the staging path itself.

Ref: #1613

Longhorn 4883

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit authored and David Ko committed Sep 19, 2023
1 parent cb2d7f0 commit a66539f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 53 deletions.
133 changes: 80 additions & 53 deletions csi/node_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type NodeServer struct {
apiClient *longhornclient.RancherClient
nodeID string
caps []*csi.NodeServiceCapability
log *logrus.Entry
}

func NewNodeServer(apiClient *longhornclient.RancherClient, nodeID string) *NodeServer {
Expand All @@ -68,17 +69,13 @@ func NewNodeServer(apiClient *longhornclient.RancherClient, nodeID string) *Node
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
}),
log: logrus.StandardLogger().WithField("component", "csi-node-server"),
}
}

func getLoggerForCSINodeServer() *logrus.Entry {
return logrus.StandardLogger().WithField("component", "csi-node-server")
}

// NodePublishVolume will mount the volume /dev/longhorn/<volume_name> to target_path
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
log := getLoggerForCSINodeServer()
log = log.WithFields(logrus.Fields{"function": "NodePublishVolume"})
log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"})

targetPath := req.GetTargetPath()
if targetPath == "" {
Expand Down Expand Up @@ -140,7 +137,17 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
}

if volumeCapability.GetBlock() != nil {
devicePath := volume.Controllers[0].Endpoint
devicePath := getStageBlockVolumePath(stagingTargetPath, volumeID)
_, err := os.Stat(devicePath)
if err != nil {
if !os.IsNotExist(err) {
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to stat device %s", devicePath).Error())
}
// Fall back to the controller endpoint if the device path under the stagingTargetPath doesn't exist
log.Infof("Device path %s doesn't exist, falling back to controller endpoint %s", devicePath, volume.Controllers[0].Endpoint)
devicePath = volume.Controllers[0].Endpoint
}

if err := ns.nodePublishBlockVolume(volumeID, devicePath, targetPath, mounter); err != nil {
log.WithError(err).Errorf("Failed to publish BlockVolume %s", volumeID)
return nil, err
Expand All @@ -152,7 +159,7 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis

// we validate the staging path to make sure the global mount is still valid
if isMnt, err := ensureMountPoint(stagingTargetPath, mounter); err != nil || !isMnt {
msg := fmt.Sprintf("Staging path %v is no longer valid for volume %v", stagingTargetPath, volumeID)
msg := fmt.Sprintf("Staging target path %v is no longer valid for volume %v", stagingTargetPath, volumeID)
log.WithError(err).Error(msg)

// HACK: normally when we return FailedPrecondition below kubelet should call NodeStageVolume again
Expand Down Expand Up @@ -249,12 +256,11 @@ func (ns *NodeServer) nodeStageSharedVolume(volumeID, shareEndpoint, targetPath
}

func (ns *NodeServer) nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType string, mountFlags []string, mounter *mount.SafeFormatAndMount) error {
log := getLoggerForCSINodeServer()
log = log.WithFields(logrus.Fields{"function": "NodePublishVolume"})
log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"})

isMnt, err := ensureMountPoint(stagingTargetPath, mounter)
if err != nil {
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point for volume %v", volumeID).Error())
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point %v for volume %v", stagingTargetPath, volumeID).Error())
}
if isMnt {
return nil
Expand All @@ -267,30 +273,38 @@ func (ns *NodeServer) nodeStageMountVolume(volumeID, devicePath, stagingTargetPa
return nil
}

// nodeStageBlockVolume utilizes the stagingTargetPath to create a volumeID file to bind mount the devicePath
// this is valid since the csi plugin is in control of the staging path
func (ns *NodeServer) nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath string, mounter mount.Interface) error {
path := getStageBlockVolumePath(stagingTargetPath, volumeID)
return ns.nodePublishBlockVolume(volumeID, devicePath, path, mounter)
}

func (ns *NodeServer) nodePublishBlockVolume(volumeID, devicePath, targetPath string, mounter mount.Interface) error {
log := ns.log.WithFields(logrus.Fields{"function": "nodePublishBlockVolume"})

// we ensure the parent directory exists and is valid
if _, err := ensureMountPoint(filepath.Dir(stagingTargetPath), mounter); err != nil {
if _, err := ensureMountPoint(filepath.Dir(targetPath), mounter); err != nil {
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to prepare mount point for block device %v", devicePath).Error())
}

// create file where we can bind mount the device to
if err := makeFile(stagingTargetPath); err != nil {
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to create file %v", stagingTargetPath).Error())
if err := makeFile(targetPath); err != nil {
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to create file %v", targetPath).Error())
}

log.Infof("Bind mounting device %v at %v", devicePath, stagingTargetPath)
if err := mounter.Mount(devicePath, stagingTargetPath, "", []string{"bind"}); err != nil {
if removeErr := os.Remove(stagingTargetPath); removeErr != nil {
return status.Errorf(codes.Internal, errors.Wrapf(removeErr, "failed to remove mount target %q", stagingTargetPath).Error())
log.Infof("Bind mounting device %v at %v", devicePath, targetPath)
if err := mounter.Mount(devicePath, targetPath, "", []string{"bind"}); err != nil {
if removeErr := os.Remove(targetPath); removeErr != nil {
return status.Errorf(codes.Internal, errors.Wrapf(removeErr, "failed to remove mount target %q", targetPath).Error())
}
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to bind mount %q at %q", devicePath, stagingTargetPath).Error())
return status.Errorf(codes.Internal, errors.Wrapf(err, "failed to bind mount %q at %q", devicePath, targetPath).Error())
}
return nil
}

func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
log := getLoggerForCSINodeServer()
log = log.WithFields(logrus.Fields{"function": "NodeUnpublishVolume"})
log := ns.log.WithFields(logrus.Fields{"function": "NodeUnpublishVolume"})

targetPath := req.GetTargetPath()
if targetPath == "" {
Expand All @@ -311,8 +325,7 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}

func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
log := getLoggerForCSINodeServer()
log = log.WithFields(logrus.Fields{"function": "NodeStageVolume"})
log := ns.log.WithFields(logrus.Fields{"function": "NodeStageVolume"})

stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
Expand Down Expand Up @@ -359,7 +372,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
if volume.State != string(longhorn.VolumeStateAttached) || volume.Controllers[0].Endpoint == "" {
log.Infof("Volume %v hasn't been attached yet, unmounting potential mount point %v", volumeID, stagingTargetPath)
if err := unmount(stagingTargetPath, mounter); err != nil {
log.WithError(err).Warn("Failed to unmount stagingTargetPath")
log.WithError(err).Warnf("Failed to unmount stagingTargetPath %v", stagingTargetPath)
}
return nil, status.Errorf(codes.InvalidArgument, "volume %s hasn't been attached yet", volumeID)
}
Expand All @@ -368,13 +381,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Errorf(codes.Aborted, "volume %s is not ready for workloads", volumeID)
}

devicePath := volume.Controllers[0].Endpoint

// do nothing for block devices, since they are handled by publish
if volumeCapability.GetBlock() != nil {
return &csi.NodeStageVolumeResponse{}, nil
}

if requiresSharedAccess(volume, volumeCapability) && !volume.Migratable {
if volume.AccessMode != string(longhorn.AccessModeReadWriteMany) {
return nil, status.Errorf(codes.FailedPrecondition, "volume %s requires shared access but is not marked for shared use", volumeID)
Expand All @@ -399,18 +405,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}

options := volumeCapability.GetMount().GetMountFlags()
fsType := volumeCapability.GetMount().GetFsType()
if fsType == "" {
fsType = defaultFsType
}

formatMounter, ok := mounter.(*mount.SafeFormatAndMount)
if !ok {
return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType)
}

diskFormat, err := formatMounter.GetDiskFormat(devicePath)
devicePath := volume.Controllers[0].Endpoint
diskFormat, err := getDiskFormat(devicePath)
if err != nil {
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to evaluate device filesystem %v format", devicePath).Error())
}
Expand Down Expand Up @@ -453,7 +449,27 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
devicePath = cryptoDevice
}

if err := ns.nodeStageMountVolume(volumeID, devicePath, targetPath, fsType, options, formatMounter); err != nil {
if volumeCapability.GetBlock() != nil {
if err := ns.nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath, mounter); err != nil {
return nil, err
}

logrus.Infof("Volume %v device %v available for usage as block device", volumeID, devicePath)
return &csi.NodeStageVolumeResponse{}, nil
}

options := volumeCapability.GetMount().GetMountFlags()
fsType := volumeCapability.GetMount().GetFsType()
if fsType == "" {
fsType = defaultFsType
}

formatMounter, ok := mounter.(*mount.SafeFormatAndMount)
if !ok {
return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType)
}

if err := ns.nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType, options, formatMounter); err != nil {
return nil, err
}

Expand Down Expand Up @@ -484,8 +500,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}

func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
log := getLoggerForCSINodeServer()
log = log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"})
log := ns.log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"})

stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
Expand All @@ -497,16 +512,29 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}

mounter := mount.New("")

// CO owns the staging_path so we only unmount but not remove the path
if err := unmount(targetPath, mount.New("")); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to unmount volume %s mount point %v error %v", volumeID, targetPath, err))
if err := unmount(stagingTargetPath, mounter); err != nil {
return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to unmount volume %s mount point %v", volumeID, stagingTargetPath).Error())
}

// For block mode we use the staging path as parent, so we have to do additional cleanup of the subfolder/files
// we should transition the regular fs mounts to also use the same sub folder, this allows us to store additional
// metadata as well as do more forcefully removals since we no longer share the control of the staging_path with kubernetes
//
// The unmount of the parent is a no op for block mode, this is also important for backwards compatibility of the existing block devices.
deviceFilePath := getStageBlockVolumePath(stagingTargetPath, volumeID)
if err := cleanupMountPoint(deviceFilePath, mounter); err != nil {
return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to clean up volume %s device mount point %v", volumeID, deviceFilePath).Error())
}

// optionally try to retrieve the volume and check if it's an RWX volume
// if it is we let the share-manager clean up the crypto device
volume, _ := ns.apiClient.Volume.ById(volumeID)
cleanupCryptoDevice := !requiresSharedAccess(volume, nil)

// Currently, only "RWO volumes" and "block device with volume.Migratable is true" supports encryption.
cleanupCryptoDevice := !requiresSharedAccess(volume, nil)
if cleanupCryptoDevice {
cryptoDevice := crypto.VolumeMapper(volumeID)
if isOpen, err := crypto.IsDeviceOpen(cryptoDevice); err != nil {
Expand Down Expand Up @@ -555,7 +583,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo
if isBlockVolume {
volCapacity, err := strconv.ParseInt(existVol.Size, 10, 64)
if err != nil {
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to convert volume size %v", existVol.Size).Error())
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to convert volume size %v for volume %v", existVol.Size, volumeID).Error())
}
return &csi.NodeGetVolumeStatsResponse{
Usage: []*csi.VolumeUsage{
Expand All @@ -574,7 +602,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo
if errors.Is(err, unix.ENOENT) {
return nil, status.Errorf(codes.NotFound, "volume %v is not mounted on path %v", volumeID, volumePath)
}
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to retrieve capacity statistics for volume path %v", volumePath).Error())
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to retrieve capacity statistics for volume path %v for volume %v", volumePath, volumeID).Error())
}

return &csi.NodeGetVolumeStatsResponse{
Expand All @@ -597,8 +625,7 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo

// NodeExpandVolume is designed to expand the file system for ONLINE expansion,
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
log := getLoggerForCSINodeServer()
log = log.WithFields(logrus.Fields{"function": "NodeExpandVolume"})
log := ns.log.WithFields(logrus.Fields{"function": "NodeExpandVolume"})

if req.CapacityRange == nil {
return nil, status.Error(codes.InvalidArgument, "capacity range missing in request")
Expand Down
10 changes: 10 additions & 0 deletions csi/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -372,6 +373,11 @@ func isBlockDevice(volumePath string) (bool, error) {
return false, nil
}

func getDiskFormat(devicePath string) (string, error) {
m := mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
return m.GetDiskFormat(devicePath)
}

func getFilesystemStatistics(volumePath string) (*volumeFilesystemStatistics, error) {
var statfs unix.Statfs_t
// See http://man7.org/linux/man-pages/man2/statfs.2.html for details.
Expand Down Expand Up @@ -428,3 +434,7 @@ func requiresSharedAccess(vol *longhornclient.Volume, cap *csi.VolumeCapability)
mode == csi.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER ||
mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}

func getStageBlockVolumePath(stagingTargetPath, volumeID string) string {
return filepath.Join(stagingTargetPath, volumeID)
}

0 comments on commit a66539f

Please sign in to comment.