fix(encrypt): close encrypted volume if it is opened #3140

Merged
10 changes: 10 additions & 0 deletions csi/crypto/crypto.go
@@ -145,6 +145,16 @@ func ResizeEncryptoDevice(volume, passphrase string) error {
return err
}

// IsDeviceMappedToNullPath determines whether the encrypted device is already open but mapped to a null path. In this state, 'cryptsetup status [crypted_device]' shows "device: (null)"
func IsDeviceMappedToNullPath(device string) (bool, error) {
devPath, mappedFile, err := DeviceEncryptionStatus(device)
if err != nil {
return false, err
}

return mappedFile != "" && devPath == "(null)", nil
}

// IsDeviceOpen determines whether the encrypted device is already open.
func IsDeviceOpen(device string) (bool, error) {
_, mappedFile, err := DeviceEncryptionStatus(device)
…
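For context, a minimal, hypothetical sketch of how the "device: (null)" state can be detected from `cryptsetup status` output. parseCryptsetupStatusDevice is an illustrative helper, not the DeviceEncryptionStatus implementation this package actually uses:

package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseCryptsetupStatusDevice extracts the backing-device path from the output
// of `cryptsetup status <name>`. A result of "(null)" means the mapping is
// still active but has lost its backing device, which is the condition
// IsDeviceMappedToNullPath checks for.
func parseCryptsetupStatusDevice(output string) string {
	scanner := bufio.NewScanner(strings.NewReader(output))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if strings.HasPrefix(line, "device:") {
			return strings.TrimSpace(strings.TrimPrefix(line, "device:"))
		}
	}
	return ""
}

func main() {
	// Assumed output shape for a mapping whose backing device disappeared.
	out := "/dev/mapper/pvc-123 is active.\n  type:    LUKS2\n  device:  (null)\n"
	fmt.Println(parseCryptsetupStatusDevice(out)) // prints "(null)"
}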
12 changes: 12 additions & 0 deletions csi/node_server.go
@@ -70,154 +70,154 @@
}

// NodePublishVolume will mount the volume /dev/longhorn/<volume_name> to target_path
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
log := ns.log.WithFields(logrus.Fields{"function": "NodePublishVolume"})

log.Infof("NodePublishVolume is called with req %+v", req)

targetPath := req.GetTargetPath()
if targetPath == "" {
return nil, status.Error(codes.InvalidArgument, "target path missing in request")
}

stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "staging target path missing in request")
}

volumeCapability := req.GetVolumeCapability()
if volumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "volume capability missing in request")
}

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}

volume, err := ns.apiClient.Volume.ById(volumeID)
if err != nil {
return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to get volume %s for publishing volume", volumeID).Error())
}
if volume == nil {
return nil, status.Errorf(codes.NotFound, "volume %s not found", volumeID)
}

mounter, err := ns.getMounter(volume, volumeCapability, req.VolumeContext)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// For mounting volumes, we don't want multiple controllers for a volume, since the filesystem could get messed up
if len(volume.Controllers) == 0 || (len(volume.Controllers) > 1 && volumeCapability.GetBlock() == nil) {
return nil, status.Errorf(codes.InvalidArgument, "volume %s has invalid controller count %v", volumeID, len(volume.Controllers))
}

if volume.DisableFrontend {
return nil, status.Errorf(codes.InvalidArgument, "volume %s frontend is disabled", volumeID)
}

if volume.Frontend != string(longhorn.VolumeFrontendBlockDev) {
return nil, status.Errorf(codes.InvalidArgument, "volume %s has invalid frontend type %v", volumeID, volume.Frontend)
}

// Check volume attachment status
if types.IsDataEngineV1(longhorn.DataEngineType(volume.DataEngine)) {
if volume.State != string(longhorn.VolumeStateAttached) || volume.Controllers[0].Endpoint == "" {
log.WithField("state", volume.State).Infof("Volume %v hasn't been attached yet, unmounting potential mount point %v", volumeID, targetPath)
if err := unmount(targetPath, mounter); err != nil {
log.WithError(err).Warnf("Failed to unmount targetPath %v", targetPath)
}
return nil, status.Errorf(codes.InvalidArgument, "volume %s hasn't been attached yet", volumeID)
}
}

if !volume.Ready {
return nil, status.Errorf(codes.Aborted, "volume %s is not ready for workloads", volumeID)
}

podsStatus := ns.collectWorkloadPodsStatus(volume, log)
if len(podsStatus[corev1.PodPending]) == 0 && len(podsStatus[corev1.PodRunning]) != len(volume.KubernetesStatus.WorkloadsStatus) {
return nil, status.Errorf(codes.Aborted, "no %v workload pods for volume %v to be mounted: %+v", corev1.PodPending, volumeID, podsStatus)
}

// It may be necessary to restage the volume before we can publish it. For example, sometimes kubelet calls
// NodePublishVolume without calling NodeStageVolume. According to the CSI spec, we should be able to respond with
// FailedPrecondition and expect kubelet to call NodeStageVolume again, but as of Kubernetes v1.27 it does not.
isBlock := volumeCapability.GetBlock() != nil

storageNetworkSetting, err := ns.apiClient.Setting.ById(string(types.SettingNameStorageNetwork))
if err != nil {
log.WithError(err).Warn("Skipping restaging condition check for storage network setting")
}
// Guard against a nil setting when the lookup above failed rather than dereferencing it unconditionally.
storageNetworkConfigured := storageNetworkSetting != nil && storageNetworkSetting.Value != ""

restageRequired, err := restageRequired(volume, volumeID, stagingTargetPath, mounter, isBlock, storageNetworkConfigured)
if restageRequired {
msg := fmt.Sprintf("Staging target path %v is no longer valid for volume %v", stagingTargetPath, volumeID)
log.WithError(err).Warn(msg)

log.Warnf("Calling NodeUnstageVolume for volume %v", volumeID)
_, _ = ns.NodeUnstageVolume(ctx, &csi.NodeUnstageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: stagingTargetPath,
})

log.Warnf("Calling NodeStageVolume for volume %v", volumeID)
_, err := ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
VolumeId: volumeID,
PublishContext: req.PublishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: volumeCapability,
Secrets: req.Secrets,
VolumeContext: req.VolumeContext,
})
if err != nil {
log.WithError(err).Errorf("Failed NodeStageVolume staging path is still in a bad state for volume %v", volumeID)
return nil, status.Error(codes.FailedPrecondition, msg)
}
}

if isBlock {
devicePath := getStageBlockVolumePath(stagingTargetPath, volumeID)
_, err := os.Stat(devicePath)
if err != nil {
if !os.IsNotExist(err) {
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to stat device %s", devicePath).Error())
}
}

if err := ns.nodePublishBlockVolume(volumeID, devicePath, targetPath, mounter); err != nil {
log.WithError(err).Errorf("Failed to publish BlockVolume %s", volumeID)
return nil, err
}

log.Infof("Published BlockVolume %s", volumeID)
return &csi.NodePublishVolumeResponse{}, nil
}

isMnt, err := ensureMountPoint(targetPath, mounter)
if err != nil {
msg := fmt.Sprintf("Failed to prepare mount point for volume %v error %v", volumeID, err)
log.WithError(err).Error(msg)
return nil, status.Error(codes.Internal, msg)
}
if isMnt {
return &csi.NodePublishVolumeResponse{}, nil
}

mountOptions := []string{"bind"}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
}
mountOptions = append(mountOptions, volumeCapability.GetMount().GetMountFlags()...)

if err := mounter.Mount(stagingTargetPath, targetPath, "", mountOptions); err != nil {
return nil, status.Errorf(codes.Internal, "failed to bind mount volume %v", volumeID)
}

return &csi.NodePublishVolumeResponse{}, nil
}
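The restaging branch above hinges on whether the staging path is still usable. The actual restageRequired helper is not part of this diff; the in-package sketch below only illustrates, under that assumption, the kind of check it could perform: for block volumes the staged device file must still exist, and for filesystem volumes the staging path must still be a live mount point.

// stagingPathStillValid is a hypothetical illustration, not the restageRequired
// helper called above (whose body is not shown in this diff).
func stagingPathStillValid(stagingTargetPath, volumeID string, isBlock bool, mounter mount.Interface) (bool, error) {
	if isBlock {
		// getStageBlockVolumePath is the same helper NodePublishVolume uses above.
		if _, err := os.Stat(getStageBlockVolumePath(stagingTargetPath, volumeID)); err != nil {
			if os.IsNotExist(err) {
				return false, nil
			}
			return false, err
		}
		return true, nil
	}
	// For filesystem volumes, the staging path must still be mounted.
	notMnt, err := mounter.IsLikelyNotMountPoint(stagingTargetPath)
	if err != nil {
		return false, err
	}
	return !notMnt, nil
}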

CodeFactor notice (csi/node_server.go#L73-L220): Complex Method
func (ns *NodeServer) collectWorkloadPodsStatus(volume *longhornclient.Volume, log *logrus.Entry) map[corev1.PodPhase][]string {
podsStatus := map[corev1.PodPhase][]string{}

@@ -368,188 +368,200 @@
return &csi.NodeUnpublishVolumeResponse{}, nil
}

func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
log := ns.log.WithFields(logrus.Fields{"function": "NodeStageVolume"})

log.Infof("NodeStageVolume is called with req %+v", req)

stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "staging target path missing in request")
}

volumeCapability := req.GetVolumeCapability()
if volumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "volume capability missing in request")
}

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}

volume, err := ns.apiClient.Volume.ById(volumeID)
if err != nil {
return nil, status.Error(codes.Internal, errors.Wrapf(err, "failed to get volume %s for staging volume", volumeID).Error())
}
if volume == nil {
return nil, status.Errorf(codes.NotFound, "volume %s not found", volumeID)
}

mounter, err := ns.getMounter(volume, volumeCapability, req.VolumeContext)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// For mounting volumes, we don't want multiple controllers for a volume, since the filesystem could get messed up
if len(volume.Controllers) == 0 || (len(volume.Controllers) > 1 && volumeCapability.GetBlock() == nil) {
return nil, status.Errorf(codes.InvalidArgument, "volume %s has invalid controller count %v", volumeID, len(volume.Controllers))
}

if volume.DisableFrontend {
return nil, status.Errorf(codes.InvalidArgument, "volume %s frontend is disabled", volumeID)
}

if volume.Frontend != string(longhorn.VolumeFrontendBlockDev) {
return nil, status.Errorf(codes.InvalidArgument, "volume %s has invalid frontend type %v", volumeID, volume.Frontend)
}

// Check volume attachment status
if volume.State != string(longhorn.VolumeStateAttached) || volume.Controllers[0].Endpoint == "" {
log.Infof("Volume %v hasn't been attached yet, unmounting potential mount point %v", volumeID, stagingTargetPath)
if err := unmount(stagingTargetPath, mounter); err != nil {
log.WithError(err).Warnf("Failed to unmount stagingTargetPath %v", stagingTargetPath)
}
return nil, status.Errorf(codes.InvalidArgument, "volume %s hasn't been attached yet", volumeID)
}

if !volume.Ready {
return nil, status.Errorf(codes.Aborted, "volume %s is not ready for workloads", volumeID)
}

if requiresSharedAccess(volume, volumeCapability) && !volume.Migratable {
if volume.AccessMode != string(longhorn.AccessModeReadWriteMany) {
return nil, status.Errorf(codes.FailedPrecondition, "volume %s requires shared access but is not marked for shared use", volumeID)
}

if !isVolumeShareAvailable(volume) {
return nil, status.Errorf(codes.Aborted, "volume %s share not yet available", volumeID)
}

// undocumented field to allow testing different nfs mount options
// this can be used to enable the default host (ubuntu) client async mode
var mountOptions []string
if len(req.VolumeContext["nfsOptions"]) > 0 {
mountOptions = strings.Split(req.VolumeContext["nfsOptions"], ",")
}

if err := ns.nodeStageSharedVolume(volumeID, volume.ShareEndpoint, stagingTargetPath, mounter, mountOptions); err != nil {
return nil, err
}

log.Infof("Mounted shared volume %v on node %v via share endpoint %v", volumeID, ns.nodeID, volume.ShareEndpoint)
return &csi.NodeStageVolumeResponse{}, nil
}

devicePath := volume.Controllers[0].Endpoint
diskFormat, err := getDiskFormat(devicePath)
if err != nil {
return nil, status.Errorf(codes.Internal, errors.Wrapf(err, "failed to evaluate device filesystem %v format", devicePath).Error())
}

log.Infof("Volume %v device %v contains filesystem of format %v", volumeID, devicePath, diskFormat)

if volume.Encrypted {
secrets := req.GetSecrets()
keyProvider := secrets[types.CryptoKeyProvider]
passphrase := secrets[types.CryptoKeyValue]
if keyProvider != "" && keyProvider != "secret" {
return nil, status.Errorf(codes.InvalidArgument, "unsupported key provider %v for encrypted volume %v", keyProvider, volumeID)
}

if len(passphrase) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "missing passphrase for encrypted volume %v", volumeID)
}

if diskFormat != "" && diskFormat != "crypto_LUKS" {
return nil, status.Errorf(codes.InvalidArgument, "unsupported disk encryption format %v", diskFormat)
}

cryptoParams := crypto.NewEncryptParams(keyProvider, secrets[types.CryptoKeyCipher], secrets[types.CryptoKeyHash], secrets[types.CryptoKeySize], secrets[types.CryptoPBKDF])

// initial setup of longhorn device for crypto
if diskFormat == "" {
if err := crypto.EncryptVolume(devicePath, passphrase, cryptoParams); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

cryptoDevice := crypto.VolumeMapper(volumeID)
log.Infof("Volume %s requires crypto device %s", volumeID, cryptoDevice)

// check if the crypto device is open at the null path.
// this happens when the crypto device was not closed properly and a new attach request is made on the same node.
// reference issue: https://github.com/longhorn/longhorn/issues/9385
if mappedToNullPath, err := crypto.IsDeviceMappedToNullPath(cryptoDevice); err != nil {
return nil, status.Errorf(codes.Internal, "failed to check if the crypto device %s for volume %s is mapped to the null path: %v", cryptoDevice, volumeID, err.Error())
} else if mappedToNullPath {
log.Warnf("Closing active crypto device %s for volume %s since the volume is not closed properly before", cryptoDevice, volumeID)
if err := crypto.CloseVolume(volumeID); err != nil {
return nil, status.Errorf(codes.Internal, "failed to close active crypto device %s for volume %s: %v ", cryptoDevice, volumeID, err.Error())
}
}

if err := crypto.OpenVolume(volumeID, devicePath, passphrase); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// update the device path to point to the new crypto device
devicePath = cryptoDevice
}

if volumeCapability.GetBlock() != nil {
if err := ns.nodeStageBlockVolume(volumeID, devicePath, stagingTargetPath, mounter); err != nil {
return nil, err
}

logrus.Infof("Volume %v device %v available for usage as block device", volumeID, devicePath)
return &csi.NodeStageVolumeResponse{}, nil
}

options := volumeCapability.GetMount().GetMountFlags()
fsType := volumeCapability.GetMount().GetFsType()
if fsType == "" {
fsType = defaultFsType
}
if fsType == "xfs" {
// By default, xfs does not allow mounting of two volumes with the same filesystem uuid.
// Force ignore this uuid to be able to mount volume + its clone / restored snapshot on the same node.
options = append(options, "nouuid")
}

formatMounter, ok := mounter.(*mount.SafeFormatAndMount)
if !ok {
return nil, status.Errorf(codes.Internal, "volume %v cannot get format mounter that support filesystem %v creation", volumeID, fsType)
}

if err := ns.nodeStageMountVolume(volumeID, devicePath, stagingTargetPath, fsType, options, formatMounter); err != nil {
return nil, err
}

// check if we need to resize the fs
// this is important since cloned volumes of bigger size don't trigger NodeExpandVolume
// therefore NodeExpandVolume is kind of redundant since we have to do this anyway
// some refs below for more details
// https://github.com/kubernetes/kubernetes/issues/94929
// https://github.com/kubernetes-sigs/aws-ebs-csi-driver/pull/753
resizer := mount.NewResizeFs(utilexec.New())
if needsResize, err := resizer.NeedResize(devicePath, stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if needsResize {
if resized, err := resizer.Resize(devicePath, stagingTargetPath); err != nil {
log.WithError(err).Errorf("Mounted volume %v on node %v failed required filesystem resize", volumeID, ns.nodeID)
return nil, status.Error(codes.Internal, err.Error())
} else if resized {
log.Infof("Mounted volume %v on node %v successfully resized filesystem after mount", volumeID, ns.nodeID)
} else {
log.Infof("Mounted volume %v on node %v already has correct filesystem size", volumeID, ns.nodeID)
}
} else {
log.Infof("Mounted volume %v on node %v does not require filesystem resize", volumeID, ns.nodeID)
}

log.Infof("Mounted volume %v on node %v via device %v", volumeID, ns.nodeID, devicePath)
return &csi.NodeStageVolumeResponse{}, nil
}
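Condensed, the recovery path this PR adds to NodeStageVolume is: detect a mapping whose backing device is gone, close it, then reopen. The following is a rough restatement for illustration only (error wrapping and logging trimmed), using the same crypto package helpers called above:

func ensureCryptoDeviceOpen(volumeID, devicePath, passphrase string) (string, error) {
	cryptoDevice := crypto.VolumeMapper(volumeID)
	if mappedToNullPath, err := crypto.IsDeviceMappedToNullPath(cryptoDevice); err != nil {
		return "", err
	} else if mappedToNullPath {
		// Stale mapping left by an unclean detach (longhorn/longhorn#9385):
		// close it first, otherwise the reopened volume would reuse the dead mapping.
		if err := crypto.CloseVolume(volumeID); err != nil {
			return "", err
		}
	}
	if err := crypto.OpenVolume(volumeID, devicePath, passphrase); err != nil {
		return "", err
	}
	return cryptoDevice, nil
}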

CodeFactor notice (csi/node_server.go#L371-L564): Complex Method
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
log := ns.log.WithFields(logrus.Fields{"function": "NodeUnstageVolume"})

@@ -679,110 +691,110 @@
}

// NodeExpandVolume is designed to expand the file system for ONLINE expansion.
func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
log := ns.log.WithFields(logrus.Fields{"function": "NodeExpandVolume"})

log.Infof("NodeNodeExpandVolume is called with req %+v", req)

if req.CapacityRange == nil {
return nil, status.Error(codes.InvalidArgument, "capacity range missing in request")
}
requestedSize := req.CapacityRange.GetRequiredBytes()

volumeCapability := req.GetVolumeCapability()
if volumeCapability == nil {
return nil, status.Error(codes.InvalidArgument, "volume capability missing in request")
}

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id missing in request")
}

if req.VolumeCapability.GetBlock() != nil {
log.Infof("Volume %v on node %v does not require filesystem resize/node expansion since it is access mode Block", volumeID, ns.nodeID)
return &csi.NodeExpandVolumeResponse{}, nil
}

volume, err := ns.apiClient.Volume.ById(volumeID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if volume == nil {
return nil, status.Errorf(codes.NotFound, "volume %s missing", volumeID)
}
if len(volume.Controllers) != 1 {
return nil, status.Errorf(codes.InvalidArgument, "invalid controller count %v for volume %v node expansion", len(volume.Controllers), volumeID)
}
if volume.State != string(longhorn.VolumeStateAttached) {
return nil, status.Errorf(codes.FailedPrecondition, "invalid state %v for volume %v node expansion", volume.State, volumeID)
}
devicePath := volume.Controllers[0].Endpoint

mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
diskFormat, err := mounter.GetDiskFormat(devicePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to evaluate device filesystem format for volume %v node expansion", volumeID)
}
if diskFormat == "" {
return nil, fmt.Errorf("unknown filesystem type for volume %v node expansion", volumeID)
}

devicePath, err = func() (string, error) {
if !volume.Encrypted {
return devicePath, nil
}
if diskFormat != "crypto_LUKS" {
return "", status.Errorf(codes.InvalidArgument, "unsupported disk encryption format %v", diskFormat)
}
devicePath = crypto.VolumeMapper(volumeID)

// Need to enable feature gate in v1.25:
// https://github.com/kubernetes/enhancements/issues/3107
// https://kubernetes.io/blog/2022/09/21/kubernetes-1-25-use-secrets-while-expanding-csi-volumes-on-node-alpha/
secrets := req.GetSecrets()
if len(secrets) == 0 {
log.Infof("Skip encrypto device resizing for volume %v node expansion since the secret empty, maybe the related feature gate is not enabled", volumeID)
return devicePath, nil
}
keyProvider := secrets[types.CryptoKeyProvider]
passphrase := secrets[types.CryptoKeyValue]
if keyProvider != "" && keyProvider != "secret" {
return "", status.Errorf(codes.InvalidArgument, "unsupported key provider %v for encrypted volume %v", keyProvider, volumeID)
}
if len(passphrase) == 0 {
return "", status.Errorf(codes.InvalidArgument, "missing passphrase for encrypted volume %v", volumeID)
}

// blindly resize the encrypted device
if err := crypto.ResizeEncryptoDevice(volumeID, passphrase); err != nil {
return "", status.Errorf(codes.InvalidArgument, errors.Wrapf(err, "failed to resize crypto device %v for volume %v node expansion", devicePath, volumeID).Error())
}

return devicePath, nil
}()
if err != nil {
return nil, err
}

resizer := mount.NewResizeFs(utilexec.New())
if needsResize, err := resizer.NeedResize(devicePath, req.StagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if needsResize {
if resized, err := resizer.Resize(devicePath, req.StagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if resized {
log.Infof("Volume %v on node %v successfully resized filesystem after mount", volumeID, ns.nodeID)
} else {
log.Infof("Volume %v on node %v already has correct filesystem size", volumeID, ns.nodeID)
}
} else {
log.Infof("Volume %v on node %v does not require filesystem resize", volumeID, ns.nodeID)
}

return &csi.NodeExpandVolumeResponse{CapacityBytes: requestedSize}, nil
}
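The encrypted expansion above is a two-step, ordered operation: grow the dm-crypt mapping first (cryptsetup resize via ResizeEncryptoDevice), then grow the filesystem sitting on the mapper device. A trimmed sketch of just that ordering, using the same helpers as the function above, with error handling reduced for brevity:

func expandEncryptedVolume(volumeID, passphrase, stagingTargetPath string) error {
	// Step 1: extend the dm-crypt mapping to the new size of the underlying device.
	if err := crypto.ResizeEncryptoDevice(volumeID, passphrase); err != nil {
		return err
	}
	// Step 2: extend the filesystem on top of the mapper device, if needed.
	devicePath := crypto.VolumeMapper(volumeID)
	resizer := mount.NewResizeFs(utilexec.New())
	needsResize, err := resizer.NeedResize(devicePath, stagingTargetPath)
	if err != nil || !needsResize {
		return err
	}
	_, err = resizer.Resize(devicePath, stagingTargetPath)
	return err
}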

CodeFactor notice (csi/node_server.go#L694-L797): Complex Method
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: ns.nodeID,
…