
Commit ce587bc

Implement volume health monitoring feature to detect abnormal volumes
1 parent 74c3e9f commit ce587bc

23 files changed (+1054 -60 lines)

cmd/directpv/node-server.go (+17 -1)

```diff
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"os"
+	"time"

 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/minio/directpv/pkg/consts"
@@ -33,7 +34,11 @@ import (
 	"k8s.io/klog/v2"
 )

-var metricsPort = consts.MetricsPort
+var (
+	metricsPort                 = consts.MetricsPort
+	volumeHealthMonitorInterval = 10 * time.Minute
+	enableVolumeHealthMonitor   bool
+)

 var nodeServerCmd = &cobra.Command{
 	Use: consts.NodeServerName,
@@ -56,6 +61,8 @@ var nodeServerCmd = &cobra.Command{

 func init() {
 	nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data")
+	nodeServerCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitor", enableVolumeHealthMonitor, "Enable volume health monitoring")
+	nodeServerCmd.PersistentFlags().DurationVar(&volumeHealthMonitorInterval, "volume-health-monitor-interval", volumeHealthMonitorInterval, "Interval for volume health monitoring in duration. Example: '20m','1h'")
 }

 func startNodeServer(ctx context.Context) error {
@@ -114,6 +121,15 @@ func startNodeServer(ctx context.Context) error {
 		}
 	}()

+	if enableVolumeHealthMonitor {
+		go func() {
+			if err := volume.StartHealthMonitor(ctx, nodeID, volumeHealthMonitorInterval); err != nil {
+				klog.ErrorS(err, "unable to start volume health monitor")
+				errCh <- err
+			}
+		}()
+	}
+
 	return <-errCh
 }
```
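The node server only wires the new flags into `volume.StartHealthMonitor`; the monitor itself is added elsewhere in this commit and is not shown in this excerpt. For orientation, here is a minimal sketch of a periodic monitor that would satisfy the call site above. The `checkVolumesHealth` helper, the `string` type for `nodeID`, and the mount-check behavior are assumptions, not the commit's actual code.

```go
// Hypothetical sketch only; the real volume.StartHealthMonitor lives
// elsewhere in this commit.
package volume

import (
	"context"
	"time"
)

// StartHealthMonitor re-checks the node's volumes at every interval tick
// until the context is cancelled.
func StartHealthMonitor(ctx context.Context, nodeID string, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Assumed behavior: list the volumes served by nodeID and
			// verify their staging/target paths are still mounted on the
			// host, setting or clearing the "Error" condition accordingly.
			checkVolumesHealth(ctx, nodeID)
		}
	}
}

// checkVolumesHealth is a hypothetical placeholder for the mount checks.
func checkVolumesHealth(_ context.Context, _ string) {}
```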

cmd/kubectl-directpv/install.go (+23 -17)

```diff
@@ -40,21 +40,22 @@ import (
 )

 var (
-	image            = consts.AppName + ":" + Version
-	registry         = "quay.io"
-	org              = "minio"
-	nodeSelectorArgs = []string{}
-	tolerationArgs   = []string{}
-	seccompProfile   = ""
-	apparmorProfile  = ""
-	imagePullSecrets = []string{}
-	nodeSelector     map[string]string
-	tolerations      []corev1.Toleration
-	k8sVersion       = "1.27.0"
-	kubeVersion      *version.Version
-	legacyFlag       bool
-	declarativeFlag  bool
-	openshiftFlag    bool
+	image                     = consts.AppName + ":" + Version
+	registry                  = "quay.io"
+	org                       = "minio"
+	nodeSelectorArgs          = []string{}
+	tolerationArgs            = []string{}
+	seccompProfile            = ""
+	apparmorProfile           = ""
+	imagePullSecrets          = []string{}
+	nodeSelector              map[string]string
+	tolerations               []corev1.Toleration
+	k8sVersion                = "1.27.0"
+	kubeVersion               *version.Version
+	legacyFlag                bool
+	declarativeFlag           bool
+	openshiftFlag             bool
+	enableVolumeHealthMonitor bool
 )

 var installCmd = &cobra.Command{
@@ -82,7 +83,10 @@ var installCmd = &cobra.Command{
 	$ kubectl {PLUGIN_NAME} install --apparmor-profile directpv

 7. Install DirectPV with seccomp profile
-	$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json`,
+	$ kubectl {PLUGIN_NAME} install --seccomp-profile profiles/seccomp.json
+
+8. Install DirectPV with volume health monitoring enabled
+	$ kubectl {PLUGIN_NAME} install --enable-volume-health-monitoring`,
 		`{PLUGIN_NAME}`,
 		consts.AppName,
 	),
@@ -123,6 +127,7 @@ func init() {
 	installCmd.PersistentFlags().BoolVar(&declarativeFlag, "declarative", declarativeFlag, "Output YAML for declarative installation")
 	installCmd.PersistentFlags().MarkHidden("declarative")
 	installCmd.PersistentFlags().BoolVar(&openshiftFlag, "openshift", openshiftFlag, "Use OpenShift specific installation")
+	installCmd.PersistentFlags().BoolVar(&enableVolumeHealthMonitor, "enable-volume-health-monitoring", enableVolumeHealthMonitor, "Enable volume health monitoring")
 }

 func validateNodeSelectorArgs() error {
@@ -305,8 +310,9 @@ func installMain(ctx context.Context) {
 			}
 		}
 	}
-	args.Declarative = declarativeFlag
 	args.Openshift = openshiftFlag
+	args.Declarative = declarativeFlag
+	args.EnableVolumeHealthMonitor = enableVolumeHealthMonitor

 	var failed bool
 	var installedComponents []installer.Component
```

cmd/kubectl-directpv/list_volumes.go (+2 -0)

```diff
@@ -241,6 +241,8 @@ func listVolumesMain(ctx context.Context) {
 		status = "Released"
 	case volume.IsDriveLost():
 		status = "Lost"
+	case volume.HasError():
+		status = "Error"
 	case volume.IsPublished():
 		status = "Bounded"
 	}
```

docs/monitoring.md (+18 -0)

````diff
@@ -79,3 +79,21 @@ scrape_configs:
     action: replace
     target_label: kubernetes_name
 ```
+
+# Volume health monitoring
+
+Volume health monitoring is a [CSI feature](https://kubernetes.io/docs/concepts/storage/volume-health-monitoring/), introduced as Alpha in Kubernetes v1.19 with a second Alpha in v1.21, that detects "abnormal" volume conditions and reports them as events on PVCs and Pods. A DirectPV volume is considered "abnormal" if its volume mounts are not present on the host.
+
+For node-side monitoring, the feature gate `CSIVolumeHealth` must be enabled. DirectPV also installs the external health monitor controller, which monitors volumes and reports health events on PVCs.
+
+To enable volume health monitoring, install DirectPV with the `--enable-volume-health-monitoring` flag:
+
+```sh
+kubectl directpv install --enable-volume-health-monitoring
+```
+
+For private registries, note that the following image is required to enable volume health monitoring:
+
+```
+quay.io/minio/csi-external-health-monitor-controller:v0.10.0
+```
````
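Since the docs above state that abnormal conditions are reported as events on PVCs and Pods, a quick way to verify the feature after installation is to describe a claim backed by a DirectPV volume; the claim name and namespace below are placeholders:

```sh
# Abnormal volume conditions reported by the health monitor show up in
# the Events section of the output.
kubectl describe pvc <pvc-name> -n <namespace>
```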

pkg/apis/directpv.min.io/types/types.go (+8 -3)

```diff
@@ -120,23 +120,28 @@ type VolumeConditionType string

 // Enum value of VolumeConditionType type.
 const (
-	VolumeConditionTypeLost VolumeConditionType = "Lost"
+	VolumeConditionTypeLost  VolumeConditionType = "Lost"
+	VolumeConditionTypeError VolumeConditionType = "Error"
 )

 // VolumeConditionReason denotes volume reason. Allows maximum upto 1024 chars.
 type VolumeConditionReason string

 // Enum values of VolumeConditionReason type.
 const (
-	VolumeConditionReasonDriveLost VolumeConditionReason = "DriveLost"
+	VolumeConditionReasonDriveLost  VolumeConditionReason = "DriveLost"
+	VolumeConditionReasonNotMounted VolumeConditionReason = "NotMounted"
+	VolumeConditionReasonNoError    VolumeConditionReason = "NoError"
 )

 // VolumeConditionMessage denotes drive message. Allows maximum upto 32768 chars.
 type VolumeConditionMessage string

 // Enum values of VolumeConditionMessage type.
 const (
-	VolumeConditionMessageDriveLost VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
+	VolumeConditionMessageDriveLost             VolumeConditionMessage = "Associated drive was removed. Refer https://github.com/minio/directpv/blob/master/docs/troubleshooting.md"
+	VolumeConditionMessageStagingPathNotMounted VolumeConditionMessage = "Staging path is umounted from the host. Please restart the workload"
+	VolumeConditionMessageTargetPathNotMounted  VolumeConditionMessage = "Target path is umounted from the host. Please restart the workload"
 )

 // DriveConditionType denotes drive condition. Allows maximum upto 316 chars.
```

pkg/apis/directpv.min.io/v1beta1/volume.go (+25 -0)

```diff
@@ -21,6 +21,7 @@ import (

 	"github.com/minio/directpv/pkg/apis/directpv.min.io/types"
 	"github.com/minio/directpv/pkg/consts"
+	"github.com/minio/directpv/pkg/k8s"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

@@ -123,6 +124,12 @@ func (volume DirectPVVolume) IsDriveLost() bool {
 	return false
 }

+// HasError returns if the volume is in error state.
+func (volume DirectPVVolume) HasError() bool {
+	condition := k8s.GetConditionByType(volume.Status.Conditions, string(types.VolumeConditionTypeError))
+	return condition != nil && k8s.ConditionStatusToBool(condition.Status)
+}
+
 // SetDriveLost sets associated drive is lost.
 func (volume *DirectPVVolume) SetDriveLost() {
 	c := metav1.Condition{
@@ -316,6 +323,24 @@ func (volume *DirectPVVolume) Resume() bool {
 	return volume.RemoveLabel(types.SuspendLabelKey)
 }

+// ResetStageMountErrorCondition resets the stage volume mount error condition.
+func (volume *DirectPVVolume) ResetStageMountErrorCondition() {
+	k8s.ResetConditionIfMatches(volume.Status.Conditions,
+		string(types.VolumeConditionTypeError),
+		string(types.VolumeConditionReasonNotMounted),
+		string(types.VolumeConditionMessageStagingPathNotMounted),
+		string(types.VolumeConditionReasonNoError))
+}
+
+// ResetTargetMountErrorCondition resets the target volume mount error condition.
+func (volume *DirectPVVolume) ResetTargetMountErrorCondition() {
+	k8s.ResetConditionIfMatches(volume.Status.Conditions,
+		string(types.VolumeConditionTypeError),
+		string(types.VolumeConditionReasonNotMounted),
+		string(types.VolumeConditionMessageTargetPathNotMounted),
+		string(types.VolumeConditionReasonNoError))
+}
+
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

 // DirectPVVolumeList denotes list of volumes.
```
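Both `Reset*MountErrorCondition` helpers call `k8s.ResetConditionIfMatches`, which this commit adds under `pkg/k8s` but which does not appear in this excerpt. Below is a minimal sketch consistent with the call sites above, assuming the helper flips a matching condition back to `False` under the reset reason; the signature and exact semantics are inferred, not confirmed.

```go
// Hypothetical sketch of pkg/k8s.ResetConditionIfMatches; the actual
// implementation is elsewhere in this commit.
package k8s

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ResetConditionIfMatches clears the condition of the given type when its
// reason and message both match, recording the "no error" reason instead.
func ResetConditionIfMatches(conditions []metav1.Condition, condType, reason, message, resetReason string) {
	for i := range conditions {
		c := &conditions[i]
		if c.Type == condType && c.Reason == reason && c.Message == message {
			c.Status = metav1.ConditionFalse
			c.Reason = resetReason
			c.Message = ""
			c.LastTransitionTime = metav1.Now()
			return
		}
	}
}
```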

pkg/consts/consts.go (+3 -0)

```diff
@@ -97,4 +97,7 @@ const (

 	// TmpFS mount
 	TmpMountDir = AppRootDir + "/tmp"
+
+	// Volume Health Monitor
+	VolumeHealthMonitorIntervalInDuration = "10m"
 )
```

pkg/consts/consts.go.in (+3 -0)

```diff
@@ -95,4 +95,7 @@ const (

 	// TmpFS mount
 	TmpMountDir = AppRootDir + "/tmp"
+
+	// Volume Health Monitor
+	VolumeHealthMonitorIntervalInDuration = "10m"
 )
```

pkg/csi/controller/server.go (+79 -4)

```diff
@@ -25,6 +25,7 @@ import (
 	"github.com/dustin/go-humanize"
 	directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
 	"github.com/minio/directpv/pkg/client"
+	csiutils "github.com/minio/directpv/pkg/csi/utils"
 	"github.com/minio/directpv/pkg/types"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -97,6 +98,21 @@ func (c *Server) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerG
 				Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME},
 			},
 		},
+		{
+			Type: &csi.ControllerServiceCapability_Rpc{
+				Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES},
+			},
+		},
+		{
+			Type: &csi.ControllerServiceCapability_Rpc{
+				Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_GET_VOLUME},
+			},
+		},
+		{
+			Type: &csi.ControllerServiceCapability_Rpc{
+				Rpc: &csi.ControllerServiceCapability_RPC{Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION},
+			},
+		},
 		},
 	}, nil
 }
@@ -359,8 +375,52 @@ func (c *Server) ControllerExpandVolume(ctx context.Context, req *csi.Controller

 // ListVolumes implements ListVolumes controller RPC
 // reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#listvolumes
-func (c *Server) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "unimplemented")
+func (c *Server) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
+	result, err := client.VolumeClient().List(ctx, metav1.ListOptions{
+		Limit:    int64(req.GetMaxEntries()),
+		Continue: req.GetStartingToken(),
+	})
+	if err != nil {
+		if req.GetStartingToken() != "" {
+			return nil, status.Errorf(codes.Aborted, "unable to list volumes: %v", err)
+		}
+		return nil, status.Errorf(codes.Internal, "unable to list volumes: %v", err)
+	}
+	var volumeEntries []*csi.ListVolumesResponse_Entry
+	for _, volume := range result.Items {
+		csiVolume, err := getCSIVolume(ctx, &volume)
+		if err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+		volumeEntries = append(volumeEntries, &csi.ListVolumesResponse_Entry{
+			Volume: csiVolume,
+			Status: &csi.ListVolumesResponse_VolumeStatus{
+				VolumeCondition: csiutils.GetCSIVolumeCondition(&volume),
+			},
+		})
+	}
+	return &csi.ListVolumesResponse{
+		Entries:   volumeEntries,
+		NextToken: result.Continue,
+	}, nil
+}
+
+func getCSIVolume(ctx context.Context, volume *types.Volume) (*csi.Volume, error) {
+	drive, err := client.DriveClient().Get(ctx, string(volume.GetDriveID()), metav1.GetOptions{
+		TypeMeta: types.NewDriveTypeMeta(),
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &csi.Volume{
+		CapacityBytes: volume.Status.TotalCapacity,
+		VolumeId:      volume.Name,
+		AccessibleTopology: []*csi.Topology{
+			{
+				Segments: drive.Status.Topology,
+			},
+		},
+	}, nil
 }

 // ControllerPublishVolume - controller RPC to publish volumes
@@ -377,8 +437,23 @@ func (c *Server) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerU

 // ControllerGetVolume - controller RPC for get volume
 // reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#controllergetvolume
-func (c *Server) ControllerGetVolume(_ context.Context, _ *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "unimplemented")
+func (c *Server) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
+	volume, err := client.VolumeClient().Get(
+		ctx, req.GetVolumeId(), metav1.GetOptions{TypeMeta: types.NewVolumeTypeMeta()},
+	)
+	if err != nil {
+		return nil, status.Error(codes.NotFound, err.Error())
+	}
+	csiVolume, err := getCSIVolume(ctx, volume)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+	return &csi.ControllerGetVolumeResponse{
+		Volume: csiVolume,
+		Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
+			VolumeCondition: csiutils.GetCSIVolumeCondition(volume),
+		},
+	}, nil
 }

 // ListSnapshots - controller RPC for listing snapshots
```
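Both new RPCs attach `csiutils.GetCSIVolumeCondition(volume)` to their responses; that helper is added under `pkg/csi/utils` in this commit but is not shown here. Below is a plausible sketch, assuming it maps the volume's `Error` condition onto the CSI spec's `VolumeCondition` (Abnormal/Message) pair; the message strings are illustrative, not the commit's actual text.

```go
// Hypothetical sketch of pkg/csi/utils.GetCSIVolumeCondition; the actual
// implementation is elsewhere in this commit.
package utils

import (
	"github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/minio/directpv/pkg/types"
)

// GetCSIVolumeCondition translates the DirectPV "Error" condition into the
// CSI VolumeCondition reported by ListVolumes and ControllerGetVolume.
func GetCSIVolumeCondition(volume *types.Volume) *csi.VolumeCondition {
	if volume.HasError() {
		return &csi.VolumeCondition{
			Abnormal: true,
			// Assumed: the stored condition message (e.g. the staging or
			// target path not-mounted text) would be surfaced here.
			Message: "volume is in error state",
		}
	}
	return &csi.VolumeCondition{
		Abnormal: false,
		Message:  "volume is healthy",
	}
}
```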
