From 2cef47a9b7cfd1852f91088951a9da0de37243b4 Mon Sep 17 00:00:00 2001
From: parth-gr
Subject: [PATCH] Mirroring configuration of CephBlockPoolRadosNamespace
@@ -3224,6 +3238,20 @@ string
 the CephBlockPool CR.

New row in the CephBlockPoolRadosNamespaceSpec field table:

Field | Description
---|---
mirroring (RadosNamespaceMirroring) | (Optional) Mirroring configuration of CephBlockPoolRadosNamespace

New section: RadosNamespaceMirroring
(Appears on: CephBlockPoolRadosNamespaceSpec)

RadosNamespaceMirroring represents the mirroring configuration of CephBlockPoolRadosNamespace

Field | Description
---|---
remoteNamespace (string) | (Optional) RemoteNamespace is the name of the CephBlockPoolRadosNamespace on the secondary cluster CephBlockPool
mode (RadosNamespaceMirroringMode) | Mode is the mirroring mode; either pool or image
snapshotSchedules ([]SnapshotScheduleSpec) | (Optional) SnapshotSchedules is the scheduling of snapshot for mirrored images

New section: RadosNamespaceMirroringMode (string alias)
(Appears on: RadosNamespaceMirroring)

RadosNamespaceMirroringMode represents the mode of the RadosNamespace

Value | Description
---|---
"image" | RadosNamespaceMirroringModeImage represents the image mode
"pool" | RadosNamespaceMirroringModePool represents the pool mode
@@ -12152,7 +12260,7 @@ string
-(Appears on: FSMirroringSpec, MirroringSpec)
+(Appears on: FSMirroringSpec, MirroringSpec, RadosNamespaceMirroring)
SnapshotScheduleSpec represents the snapshot scheduling settings of a mirrored pool
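Taken together, the new fields look like this in a CR spec. This is a minimal sketch assembled from the tables above; the pool and namespace names are illustrative, and the full example file ships below under deploy/examples:

```yaml
spec:
  blockPoolName: replicapool
  mirroring:
    # optional; when omitted, the operator enables mirroring without --remote-namespace
    remoteNamespace: namespace-a
    # required; one of "", "pool", "image"
    mode: image
    # optional list of SnapshotScheduleSpec
    snapshotSchedules:
      - interval: 24h
        startTime: 14:00:00-05:00
```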
diff --git a/PendingReleaseNotes.md b/PendingReleaseNotes.md
index 779cd62017d5..e624b80e49c6 100644
--- a/PendingReleaseNotes.md
+++ b/PendingReleaseNotes.md
@@ -4,3 +4,5 @@
 ## Features
+
+- Enable mirroring for CephBlockPoolRadosNamespaces (see [#14701](https://github.com/rook/rook/pull/14701)).
diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml
index 0092ad799fad..dd46ce6dd046 100644
--- a/deploy/charts/rook-ceph/templates/resources.yaml
+++ b/deploy/charts/rook-ceph/templates/resources.yaml
@@ -60,6 +60,38 @@ spec:
                   x-kubernetes-validations:
                     - message: blockPoolName is immutable
                       rule: self == oldSelf
+                mirroring:
+                  description: Mirroring configuration of CephBlockPoolRadosNamespace
+                  properties:
+                    mode:
+                      description: Mode is the mirroring mode; either pool or image
+                      enum:
+                        - ""
+                        - pool
+                        - image
+                      type: string
+                    remoteNamespace:
+                      description: RemoteNamespace is the name of the CephBlockPoolRadosNamespace on the secondary cluster CephBlockPool
+                      type: string
+                    snapshotSchedules:
+                      description: SnapshotSchedules is the scheduling of snapshot for mirrored images
+                      items:
+                        description: SnapshotScheduleSpec represents the snapshot scheduling settings of a mirrored pool
+                        properties:
+                          interval:
+                            description: Interval represent the periodicity of the snapshot.
+                            type: string
+                          path:
+                            description: Path is the path to snapshot, only valid for CephFS
+                            type: string
+                          startTime:
+                            description: StartTime indicates when to start the snapshot
+                            type: string
+                        type: object
+                      type: array
+                  required:
+                    - mode
+                  type: object
                 name:
                   description: The name of the CephBlockPoolRadosNamespaceSpec namespace. If not set, the default is the name of the CR.
                   type: string
diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml
index aa8759c17784..6b5ea0f6974d 100644
--- a/deploy/examples/crds.yaml
+++ b/deploy/examples/crds.yaml
@@ -63,6 +63,38 @@ spec:
                   x-kubernetes-validations:
                     - message: blockPoolName is immutable
                       rule: self == oldSelf
+                mirroring:
+                  description: Mirroring configuration of CephBlockPoolRadosNamespace
+                  properties:
+                    mode:
+                      description: Mode is the mirroring mode; either pool or image
+                      enum:
+                        - ""
+                        - pool
+                        - image
+                      type: string
+                    remoteNamespace:
+                      description: RemoteNamespace is the name of the CephBlockPoolRadosNamespace on the secondary cluster CephBlockPool
+                      type: string
+                    snapshotSchedules:
+                      description: SnapshotSchedules is the scheduling of snapshot for mirrored images
+                      items:
+                        description: SnapshotScheduleSpec represents the snapshot scheduling settings of a mirrored pool
+                        properties:
+                          interval:
+                            description: Interval represent the periodicity of the snapshot.
+                            type: string
+                          path:
+                            description: Path is the path to snapshot, only valid for CephFS
+                            type: string
+                          startTime:
+                            description: StartTime indicates when to start the snapshot
+                            type: string
+                        type: object
+                      type: array
+                  required:
+                    - mode
+                  type: object
                 name:
                   description: The name of the CephBlockPoolRadosNamespaceSpec namespace. If not set, the default is the name of the CR.
                   type: string
diff --git a/deploy/examples/radosnamespace-mirrored.yaml b/deploy/examples/radosnamespace-mirrored.yaml
new file mode 100644
index 000000000000..57d4900939eb
--- /dev/null
+++ b/deploy/examples/radosnamespace-mirrored.yaml
@@ -0,0 +1,20 @@
+---
+apiVersion: ceph.rook.io/v1
+kind: CephBlockPoolRadosNamespace
+metadata:
+  name: namespace-a
+  namespace: rook-ceph # namespace:cluster
+spec:
+  # The name of the RADOS namespace. If not set, the default is the name of the CR.
+  # name: namespace-a
+  # blockPoolName is the name of the CephBlockPool CR where the namespace will be created.
+  blockPoolName: replicapool
+  mirroring:
+    remoteNamespace: namespace-a
+    # mirroring mode: pool level or per image
+    # for more details see: https://docs.ceph.com/docs/master/rbd/rbd-mirroring/#enable-mirroring
+    mode: image
+    # specify the schedule(s) on which snapshots should be taken
+    snapshotSchedules:
+      - interval: 24h # daily snapshots
+        startTime: 14:00:00-05:00
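After applying an example like the one above, the namespace-level mirroring state can be inspected from the toolbox with the stock rbd CLI. A sketch, using the pool/namespace names from the example (the operator issues the corresponding enable command itself, as the mirror.go changes below show):

```console
rbd mirror pool info replicapool/namespace-a    # shows the mirroring mode for the namespace
rbd mirror pool status replicapool/namespace-a  # summarizes image health and peers
```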
diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go
index d06c20081f95..19da6a60712e 100755
--- a/pkg/apis/ceph.rook.io/v1/types.go
+++ b/pkg/apis/ceph.rook.io/v1/types.go
@@ -3339,6 +3339,29 @@ type CephBlockPoolRadosNamespaceList struct {
 	Items           []CephBlockPoolRadosNamespace `json:"items"`
 }
 
+// RadosNamespaceMirroring represents the mirroring configuration of CephBlockPoolRadosNamespace
+type RadosNamespaceMirroring struct {
+	// RemoteNamespace is the name of the CephBlockPoolRadosNamespace on the secondary cluster CephBlockPool
+	// +optional
+	RemoteNamespace *string `json:"remoteNamespace"`
+	// Mode is the mirroring mode; either pool or image
+	// +kubebuilder:validation:Enum="";pool;image
+	Mode RadosNamespaceMirroringMode `json:"mode"`
+	// SnapshotSchedules is the scheduling of snapshot for mirrored images
+	// +optional
+	SnapshotSchedules []SnapshotScheduleSpec `json:"snapshotSchedules,omitempty"`
+}
+
+// RadosNamespaceMirroringMode represents the mode of the RadosNamespace
+type RadosNamespaceMirroringMode string
+
+const (
+	// RadosNamespaceMirroringModePool represents the pool mode
+	RadosNamespaceMirroringModePool RadosNamespaceMirroringMode = "pool"
+	// RadosNamespaceMirroringModeImage represents the image mode
+	RadosNamespaceMirroringModeImage RadosNamespaceMirroringMode = "image"
+)
+
 // CephBlockPoolRadosNamespaceSpec represents the specification of a CephBlockPool Rados Namespace
 type CephBlockPoolRadosNamespaceSpec struct {
 	// The name of the CephBlockPoolRadosNamespaceSpec namespace. If not set, the default is the name of the CR.
@@ -3349,6 +3372,9 @@ type CephBlockPoolRadosNamespaceSpec struct {
 	// the CephBlockPool CR.
 	// +kubebuilder:validation:XValidation:message="blockPoolName is immutable",rule="self == oldSelf"
 	BlockPoolName string `json:"blockPoolName"`
+	// Mirroring configuration of CephBlockPoolRadosNamespace
+	// +optional
+	Mirroring *RadosNamespaceMirroring `json:"mirroring,omitempty"`
 }
 
 // CephBlockPoolRadosNamespaceStatus represents the Status of Ceph BlockPool
diff --git a/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go b/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go
index 6ac445966598..e653d3869f9b 100644
--- a/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go
@@ -391,7 +391,7 @@ func (in *CephBlockPoolRadosNamespace) DeepCopyInto(out *CephBlockPoolRadosNames
 	*out = *in
 	out.TypeMeta = in.TypeMeta
 	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
-	out.Spec = in.Spec
+	in.Spec.DeepCopyInto(&out.Spec)
 	if in.Status != nil {
 		in, out := &in.Status, &out.Status
 		*out = new(CephBlockPoolRadosNamespaceStatus)
@@ -454,6 +454,11 @@ func (in *CephBlockPoolRadosNamespaceList) DeepCopyObject() runtime.Object {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *CephBlockPoolRadosNamespaceSpec) DeepCopyInto(out *CephBlockPoolRadosNamespaceSpec) {
 	*out = *in
+	if in.Mirroring != nil {
+		in, out := &in.Mirroring, &out.Mirroring
+		*out = new(RadosNamespaceMirroring)
+		(*in).DeepCopyInto(*out)
+	}
 	return
 }
 
@@ -1011,6 +1016,22 @@ func (in *CephDaemonsVersions) DeepCopy() *CephDaemonsVersions {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *CephExporterSpec) DeepCopyInto(out *CephExporterSpec) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CephExporterSpec.
+func (in *CephExporterSpec) DeepCopy() *CephExporterSpec {
+	if in == nil {
+		return nil
+	}
+	out := new(CephExporterSpec)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *CephFilesystem) DeepCopyInto(out *CephFilesystem) {
 	*out = *in
@@ -3249,6 +3270,11 @@ func (in *MonitoringSpec) DeepCopyInto(out *MonitoringSpec) {
 		*out = new(metav1.Duration)
 		**out = **in
 	}
+	if in.Exporter != nil {
+		in, out := &in.Exporter, &out.Exporter
+		*out = new(CephExporterSpec)
+		**out = **in
+	}
 	return
 }
 
@@ -4313,6 +4339,32 @@ func (in *RGWServiceSpec) DeepCopy() *RGWServiceSpec {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *RadosNamespaceMirroring) DeepCopyInto(out *RadosNamespaceMirroring) {
+	*out = *in
+	if in.RemoteNamespace != nil {
+		in, out := &in.RemoteNamespace, &out.RemoteNamespace
+		*out = new(string)
+		**out = **in
+	}
+	if in.SnapshotSchedules != nil {
+		in, out := &in.SnapshotSchedules, &out.SnapshotSchedules
+		*out = make([]SnapshotScheduleSpec, len(*in))
+		copy(*out, *in)
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RadosNamespaceMirroring.
+func (in *RadosNamespaceMirroring) DeepCopy() *RadosNamespaceMirroring {
+	if in == nil {
+		return nil
+	}
+	out := new(RadosNamespaceMirroring)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ReadAffinitySpec) DeepCopyInto(out *ReadAffinitySpec) {
 	*out = *in
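For programmatic consumers of the new API, here is a minimal sketch of building the CR with the generated types. It assumes only the types shown above; the object would then be created through a rook clientset or a controller-runtime client, and the names mirror the shipped YAML example:

```go
package example

import (
	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newMirroredRadosNamespace builds the same object as the example file above.
func newMirroredRadosNamespace() *cephv1.CephBlockPoolRadosNamespace {
	remote := "namespace-a" // peer namespace on the secondary cluster (illustrative)
	return &cephv1.CephBlockPoolRadosNamespace{
		ObjectMeta: metav1.ObjectMeta{Name: "namespace-a", Namespace: "rook-ceph"},
		Spec: cephv1.CephBlockPoolRadosNamespaceSpec{
			BlockPoolName: "replicapool",
			Mirroring: &cephv1.RadosNamespaceMirroring{
				RemoteNamespace: &remote,
				Mode:            cephv1.RadosNamespaceMirroringModeImage,
				SnapshotSchedules: []cephv1.SnapshotScheduleSpec{
					{Interval: "24h", StartTime: "14:00:00-05:00"},
				},
			},
		},
	}
}
```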
diff --git a/pkg/daemon/ceph/client/mirror.go b/pkg/daemon/ceph/client/mirror.go
index 438f03d3db68..30b4f3c2486f 100644
--- a/pkg/daemon/ceph/client/mirror.go
+++ b/pkg/daemon/ceph/client/mirror.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pkg/errors"
 	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
 	"github.com/rook/rook/pkg/clusterd"
+	cephver "github.com/rook/rook/pkg/operator/ceph/version"
 	"k8s.io/apimachinery/pkg/util/sets"
 )
@@ -50,8 +51,9 @@ type Images struct {
 }
 
 var (
-	rbdMirrorPeerCaps      = []string{"mon", "profile rbd-mirror-peer", "osd", "profile rbd"}
-	rbdMirrorPeerKeyringID = "rbd-mirror-peer"
+	rbdMirrorPeerCaps                     = []string{"mon", "profile rbd-mirror-peer", "osd", "profile rbd"}
+	rbdMirrorPeerKeyringID                = "rbd-mirror-peer"
+	radosNamespaceMirroringMinimumVersion = cephver.CephVersion{Major: 20, Minor: 0, Extra: 0}
 )
 
 // ImportRBDMirrorBootstrapPeer add a mirror peer in the rbd-mirror configuration
@@ -278,8 +280,8 @@ func removeSnapshotSchedule(context *clusterd.Context, clusterInfo *ClusterInfo,
 	return nil
 }
 
-func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error {
-	logger.Info("resetting current snapshot schedules")
+func EnableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool string, snapshotSchedules []cephv1.SnapshotScheduleSpec) error {
+	logger.Infof("resetting current snapshot schedules in cluster namespace %q", clusterInfo.Namespace)
 	// Reset any existing schedules
 	err := removeSnapshotSchedules(context, clusterInfo, pool)
 	if err != nil {
@@ -287,8 +289,8 @@ func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo
 	}
 
 	// Enable all the snap schedules
-	for _, snapSchedule := range pool.Mirroring.SnapshotSchedules {
-		err := enableSnapshotSchedule(context, clusterInfo, snapSchedule, pool.Name)
+	for _, snapSchedule := range snapshotSchedules {
+		err := enableSnapshotSchedule(context, clusterInfo, snapSchedule, pool)
 		if err != nil {
 			return errors.Wrap(err, "failed to enable snapshot schedule")
 		}
@@ -298,16 +300,16 @@ func enableSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo
 }
 
 // removeSnapshotSchedules removes all the existing snapshot schedules
-func removeSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool cephv1.NamedPoolSpec) error {
+func removeSnapshotSchedules(context *clusterd.Context, clusterInfo *ClusterInfo, pool string) error {
 	// Get the list of existing snapshot schedule
-	existingSnapshotSchedules, err := listSnapshotSchedules(context, clusterInfo, pool.Name)
+	existingSnapshotSchedules, err := listSnapshotSchedules(context, clusterInfo, pool)
 	if err != nil {
 		return errors.Wrap(err, "failed to list snapshot schedule(s)")
 	}
 
 	// Remove each schedule
 	for _, existingSnapshotSchedule := range existingSnapshotSchedules {
-		err := removeSnapshotSchedule(context, clusterInfo, existingSnapshotSchedule, pool.Name)
+		err := removeSnapshotSchedule(context, clusterInfo, existingSnapshotSchedule, pool)
 		if err != nil {
 			return errors.Wrapf(err, "failed to remove snapshot schedule %v", existingSnapshotSchedule)
 		}
@@ -362,6 +364,45 @@ func ListSnapshotSchedulesRecursively(context *clusterd.Context, clusterInfo *Cl
 	return snapshotSchedulesRecursive, nil
 }
 
+// EnableRBDRadosNamespaceMirroring enables rbd mirroring on a rados namespace.
+func EnableRBDRadosNamespaceMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, poolAndRadosNamespaceName string, remoteNamespace *string, mode string) error {
+	logger.Infof("enable mirroring in rados namespace %s in k8s namespace %q", poolAndRadosNamespaceName, clusterInfo.Namespace)
+
+	// remove the check when the min supported version is 20.0.0
+	if !clusterInfo.CephVersion.IsAtLeast(radosNamespaceMirroringMinimumVersion) {
+		return errors.Errorf("ceph version %q does not support mirroring in rados namespace %q with the --remote-namespace flag; supported versions are v20 and above", clusterInfo.CephVersion.String(), poolAndRadosNamespaceName)
+	}
+
+	args := []string{"mirror", "pool", "enable", poolAndRadosNamespaceName, mode}
+	if remoteNamespace != nil {
+		args = []string{"mirror", "pool", "enable", poolAndRadosNamespaceName, mode, "--remote-namespace", *remoteNamespace}
+	}
+
+	cmd := NewRBDCommand(context, clusterInfo, args)
+	cmd.JsonOutput = false
+	output, err := cmd.Run()
+	if err != nil {
+		return errors.Wrapf(err, "failed to enable mirroring in rados namespace %s with mode %s. %s", poolAndRadosNamespaceName, mode, output)
+	}
+
+	logger.Infof("successfully enabled mirroring in rados namespace %s in k8s namespace %q", poolAndRadosNamespaceName, clusterInfo.Namespace)
+	return nil
+}
+
+// DisableRBDRadosNamespaceMirroring disables rbd mirroring on a rados namespace.
+func DisableRBDRadosNamespaceMirroring(context *clusterd.Context, clusterInfo *ClusterInfo, poolAndRadosNamespaceName string) error {
+	logger.Infof("disable mirroring in rados namespace %s in k8s namespace %q", poolAndRadosNamespaceName, clusterInfo.Namespace)
+	args := []string{"mirror", "pool", "disable", poolAndRadosNamespaceName}
+	cmd := NewRBDCommand(context, clusterInfo, args)
+	cmd.JsonOutput = false
+	output, err := cmd.Run()
+	if err != nil {
+		return errors.Wrapf(err, "failed to disable mirroring in rados namespace %s. %s", poolAndRadosNamespaceName, output)
+	}
+
+	logger.Infof("successfully disabled mirroring in rados namespace %s in k8s namespace %q", poolAndRadosNamespaceName, clusterInfo.Namespace)
+	return nil
+}
+
 /*
 CreateRBDMirrorBootstrapPeerWithoutPool creates a bootstrap peer for the current cluster
diff --git a/pkg/daemon/ceph/client/mirror_test.go b/pkg/daemon/ceph/client/mirror_test.go
index 22580cf7dfeb..2e856fd8fa20 100644
--- a/pkg/daemon/ceph/client/mirror_test.go
+++ b/pkg/daemon/ceph/client/mirror_test.go
@@ -331,7 +331,7 @@ func TestRemoveSnapshotSchedules(t *testing.T) {
 			},
 		},
 	}
-	err := removeSnapshotSchedules(context, AdminTestClusterInfo("mycluster"), pool)
+	err := removeSnapshotSchedules(context, AdminTestClusterInfo("mycluster"), pool.Name)
 	assert.NoError(t, err)
 }
diff --git a/pkg/daemon/ceph/client/pool.go b/pkg/daemon/ceph/client/pool.go
index 82eae5e77eb9..c0dbf4dc6c3e 100644
--- a/pkg/daemon/ceph/client/pool.go
+++ b/pkg/daemon/ceph/client/pool.go
@@ -327,7 +327,7 @@ func setCommonPoolProperties(context *clusterd.Context, clusterInfo *ClusterInfo
 
 	// Schedule snapshots
 	if pool.Mirroring.SnapshotSchedulesEnabled() {
-		err = enableSnapshotSchedules(context, clusterInfo, pool)
+		err = EnableSnapshotSchedules(context, clusterInfo, pool.Name, pool.Mirroring.SnapshotSchedules)
 		if err != nil {
 			return errors.Wrapf(err, "failed to enable snapshot scheduling for pool %q", pool.Name)
 		}
%s", poolName, string(buf)) + } + + type radosNamespace struct { + Name string `json:"name,omitempty"` + } + + // Unmarshal JSON into Go struct + var namespaces []radosNamespace + + if err := json.Unmarshal([]byte(buf), &namespaces); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal rados namespaces list response") + } + + logger.Debugf("successfully listed rados namespaces for pool %q running in ceph cluster namespace %q", poolName, clusterInfo.Namespace) + + var namespacesList []string + for _, namespace := range namespaces { + namespacesList = append(namespacesList, namespace.Name) + } + return namespacesList, nil +} diff --git a/pkg/operator/ceph/pool/controller.go b/pkg/operator/ceph/pool/controller.go index 24d080ebf644..d4c5ba37c662 100644 --- a/pkg/operator/ceph/pool/controller.go +++ b/pkg/operator/ceph/pool/controller.go @@ -341,9 +341,9 @@ func (r *ReconcileCephBlockPool) reconcile(request reconcile.Request) (reconcile // If not mirrored there is no Status Info field to fulfil } else { // disable mirroring - err := r.disableMirroring(poolSpec.Name) + err = r.disableMirroring(poolSpec.Name) if err != nil { - logger.Warningf("failed to disable mirroring on pool %q. %v", poolSpec.Name, err) + logger.Warningf("failed to disable mirroring on pool %q running in ceph cluster namespace %q. %v", poolSpec.Name, r.clusterInfo.Namespace, err) } // update ObservedGeneration in status at the end of reconcile // Set Ready status, we are done reconciling @@ -508,6 +508,18 @@ func (r *ReconcileCephBlockPool) disableMirroring(pool string) error { if err != nil { return errors.Wrapf(err, "failed to get mirroring info for the pool %q", pool) } + if mirrorInfo.Mode == "disabled" { + return nil + } + + mirroringEnabled, err := r.isAnyRadosNamespaceMirrored(pool) + if err != nil { + return errors.Wrap(err, "failed to check if any rados namespace is mirrored") + } + if mirroringEnabled { + logger.Debugf("disabling mirroring on pool %q is not possible. 
There are mirrored rados namespaces in the pool running in ceph cluster namespace %q", pool, r.clusterInfo.Namespace) + return errors.New("mirroring must be disabled in all radosnamespaces in the pool before disabling mirroring in the pool") + } if mirrorInfo.Mode == "image" { mirroredPools, err := cephclient.GetMirroredPoolImages(r.context, r.clusterInfo, pool) @@ -542,3 +554,27 @@ func (r *ReconcileCephBlockPool) disableMirroring(pool string) error { return nil } + +func (r *ReconcileCephBlockPool) isAnyRadosNamespaceMirrored(poolName string) (bool, error) { + logger.Debugf("list rados namespace in pool %q running in ceph cluster namespace %q", poolName, r.clusterInfo.Namespace) + + list, err := cephclient.ListRadosNamespacesInPool(r.context, r.clusterInfo, poolName) + if err != nil { + return false, errors.Wrapf(err, "failed to list rados namespace in pool %q", poolName) + } + logger.Debugf("rados namespace list %v in pool %q running in ceph cluster namespace %q", list, poolName, r.clusterInfo.Namespace) + + for _, namespace := range list { + poolAndRadosNamespaceName := fmt.Sprintf("%s/%s", poolName, namespace) + mirrorInfo, err := cephclient.GetPoolMirroringInfo(r.context, r.clusterInfo, poolAndRadosNamespaceName) + if err != nil { + return false, errors.Wrapf(err, "failed to get mirroring info for the rados namespace %q", poolAndRadosNamespaceName) + } + logger.Debugf("mirroring info for the rados namespace %q running in ceph cluster namespace %q: %v", poolAndRadosNamespaceName, r.clusterInfo.Namespace, mirrorInfo) + if mirrorInfo.Mode != "disabled" { + return true, nil + } + } + + return false, nil +} diff --git a/pkg/operator/ceph/pool/controller_test.go b/pkg/operator/ceph/pool/controller_test.go index 29a277370917..db18b0c1930d 100644 --- a/pkg/operator/ceph/pool/controller_test.go +++ b/pkg/operator/ceph/pool/controller_test.go @@ -545,6 +545,66 @@ func TestCephBlockPoolController(t *testing.T) { }) } +func TestIsAnyRadosNamespaceMirrored(t *testing.T) { + pool := "test" + object := []runtime.Object{} + // Register operator types with the runtime scheme. + s := scheme.Scheme + cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() + executor := &exectest.MockExecutor{} + c := &clusterd.Context{ + Executor: executor, + Clientset: testop.New(t, 1), + RookClientset: rookclient.NewSimpleClientset(), + } + // Create a ReconcileCephBlockPool object with the scheme and fake client. 
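The new pool-level guard works off two read-only rbd calls. Sketching their shapes with a hypothetical pool that still has one mirrored namespace (the JSON shapes match the test fixtures below):

```console
rbd namespace list replicapool --format json
# [{"name":"namespace-a"}]
rbd mirror pool info replicapool/namespace-a --format json
# {"mode":"image"}  -> isAnyRadosNamespaceMirrored returns true; pool-level disable is refused
```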
diff --git a/pkg/operator/ceph/pool/controller_test.go b/pkg/operator/ceph/pool/controller_test.go
index 29a277370917..db18b0c1930d 100644
--- a/pkg/operator/ceph/pool/controller_test.go
+++ b/pkg/operator/ceph/pool/controller_test.go
@@ -545,6 +545,66 @@ func TestCephBlockPoolController(t *testing.T) {
 	})
 }
 
+func TestIsAnyRadosNamespaceMirrored(t *testing.T) {
+	pool := "test"
+	object := []runtime.Object{}
+	// Register operator types with the runtime scheme.
+	s := scheme.Scheme
+	cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
+	executor := &exectest.MockExecutor{}
+	c := &clusterd.Context{
+		Executor:      executor,
+		Clientset:     testop.New(t, 1),
+		RookClientset: rookclient.NewSimpleClientset(),
+	}
+	// Create a ReconcileCephBlockPool object with the scheme and fake client.
+	r := &ReconcileCephBlockPool{
+		client:            cl,
+		scheme:            s,
+		context:           c,
+		blockPoolContexts: make(map[string]*blockPoolHealth),
+		opManagerContext:  context.TODO(),
+		recorder:          record.NewFakeRecorder(5),
+		clusterInfo:       cephclient.AdminTestClusterInfo("mycluster"),
+	}
+
+	t.Run("rados namespace mirroring enabled", func(t *testing.T) {
+		r.context.Executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" {
+					assert.Equal(t, pool, args[2])
+					return `[{"name":"abc"},{"name":"abc1"},{"name":"abc3"}]`, nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
+					return "{}", nil
+				}
+				return "", nil
+			},
+		}
+		enabled, err := r.isAnyRadosNamespaceMirrored(pool)
+		assert.NoError(t, err)
+		assert.True(t, enabled)
+	})
+
+	t.Run("rados namespace mirroring disabled", func(t *testing.T) {
+		r.context.Executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" {
+					assert.Equal(t, pool, args[2])
+					return `[]`, nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
+					return "{}", nil
+				}
+				return "", nil
+			},
+		}
+		enabled, err := r.isAnyRadosNamespaceMirrored(pool)
+		assert.NoError(t, err)
+		assert.False(t, enabled)
+	})
+}
+
 func TestConfigureRBDStats(t *testing.T) {
 	var (
 		s = runtime.NewScheme()
diff --git a/pkg/operator/ceph/pool/radosnamespace/controller.go b/pkg/operator/ceph/pool/radosnamespace/controller.go
index cd8e6fe7d5cc..7bae0b595ba9 100644
--- a/pkg/operator/ceph/pool/radosnamespace/controller.go
+++ b/pkg/operator/ceph/pool/radosnamespace/controller.go
@@ -57,6 +57,8 @@ var logger = capnslog.NewPackageLogger("github.com/rook/rook", controllerName)
 
 var poolNamespace = reflect.TypeOf(cephv1.CephBlockPoolRadosNamespace{}).Name()
 
+var detectCephVersion = opcontroller.DetectCephVersion
+
 // Sets the type meta for the controller main object
 var controllerTypeMeta = metav1.TypeMeta{
 	Kind: poolNamespace,
@@ -181,7 +183,21 @@ func (r *ReconcileCephBlockPoolRadosNamespace) reconcile(request reconcile.Reque
 		return reconcile.Result{}, errors.Wrap(err, "failed to populate cluster info")
 	}
 	r.clusterInfo.Context = r.opManagerContext
-
+	cephversion, err := detectCephVersion(
+		r.opManagerContext,
+		r.opConfig.Image,
+		cephBlockPoolRadosNamespace.Namespace,
+		controllerName,
+		k8sutil.NewOwnerInfo(cephBlockPoolRadosNamespace, r.scheme),
+		r.context.Clientset,
+		&cephCluster.Spec,
+	)
+	if err != nil {
+		return reconcile.Result{}, errors.Wrap(err, "failed to detect ceph version")
+	}
+	if cephversion != nil {
+		r.clusterInfo.CephVersion = *cephversion
+	}
 	// DELETE: the CR was deleted
 	if !cephBlockPoolRadosNamespace.GetDeletionTimestamp().IsZero() {
 		logger.Debugf("delete cephBlockPoolRadosNamespace %q", namespacedName)
@@ -230,11 +246,12 @@ func (r *ReconcileCephBlockPoolRadosNamespace) reconcile(request reconcile.Reque
 	// Build the NamespacedName to fetch the CephBlockPool and make sure it exists, if not we cannot
 	// create the rados namespace
 	cephBlockPool := &cephv1.CephBlockPool{}
-	cephBlockPoolRadosNamespacedName := types.NamespacedName{Name: cephBlockPoolRadosNamespace.Spec.BlockPoolName, Namespace: request.Namespace}
-	err = r.client.Get(r.opManagerContext, cephBlockPoolRadosNamespacedName, cephBlockPool)
+	pool := cephBlockPoolRadosNamespace.Spec.BlockPoolName
+	cephBlockPoolNamespacedName := types.NamespacedName{Name: pool, Namespace: request.Namespace}
+	err = r.client.Get(r.opManagerContext, cephBlockPoolNamespacedName, cephBlockPool)
 	if err != nil {
 		if kerrors.IsNotFound(err) {
-			return reconcile.Result{}, errors.Wrapf(err, "failed to fetch ceph blockpool %q, cannot create rados namespace %q", cephBlockPoolRadosNamespace.Spec.BlockPoolName, cephBlockPoolRadosNamespace.Name)
+			return reconcile.Result{}, errors.Wrapf(err, "failed to fetch ceph blockpool %q, cannot create rados namespace %q", pool, cephBlockPoolRadosNamespace.Name)
 		}
 		// Error reading the object - requeue the request.
 		return reconcile.Result{}, errors.Wrap(err, "failed to get cephBlockPoolRadosNamespace")
@@ -243,7 +260,7 @@ func (r *ReconcileCephBlockPoolRadosNamespace) reconcile(request reconcile.Reque
 	// If the cephBlockPool is not ready to accept commands, we should wait for it to be ready
 	if cephBlockPool.Status.Phase != cephv1.ConditionReady {
 		// We know the CR is present so it should be a matter of seconds for it to become ready
-		return reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}, errors.Wrapf(err, "failed to fetch ceph blockpool %q, cannot create rados namespace %q", cephBlockPoolRadosNamespace.Spec.BlockPoolName, cephBlockPoolRadosNamespace.Name)
+		return reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second}, errors.Wrapf(err, "failed to fetch ceph blockpool %q, cannot create rados namespace %q", pool, cephBlockPoolRadosNamespace.Name)
 	}
 	// Create or Update rados namespace
 	err = r.createOrUpdateRadosNamespace(cephBlockPoolRadosNamespace)
@@ -261,10 +278,15 @@ func (r *ReconcileCephBlockPoolRadosNamespace) reconcile(request reconcile.Reque
 		return reconcile.Result{}, errors.Wrap(err, "failed to save cluster config")
 	}
 
+	err = r.reconcileMirroring(cephBlockPoolRadosNamespace, cephBlockPool)
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+
 	r.updateStatus(r.client, namespacedName, cephv1.ConditionReady)
 
 	if csi.EnableCSIOperator() {
-		err = csi.CreateUpdateClientProfileRadosNamespace(r.clusterInfo.Context, r.client, r.clusterInfo, cephBlockPoolRadosNamespacedName, buildClusterID(cephBlockPoolRadosNamespace), cephCluster.Name)
+		err = csi.CreateUpdateClientProfileRadosNamespace(r.clusterInfo.Context, r.client, r.clusterInfo, cephBlockPoolNamespacedName, buildClusterID(cephBlockPoolRadosNamespace), cephCluster.Name)
 		if err != nil {
 			return reconcile.Result{}, errors.Wrap(err, "failed to create ceph csi-op config CR for RadosNamespace")
 		}
@@ -378,7 +400,7 @@ func buildClusterID(cephBlockPoolRadosNamespace *cephv1.CephBlockPoolRadosNamesp
 }
 
 func (r *ReconcileCephBlockPoolRadosNamespace) cleanup(radosNamespace *cephv1.CephBlockPoolRadosNamespace, cephCluster *cephv1.CephCluster) error {
-	logger.Infof("starting cleanup of the ceph resources for radosNamesapce %q in namespace %q", radosNamespace.Name, radosNamespace.Namespace)
+	logger.Infof("starting cleanup of the ceph resources for radosNamespace %q in namespace %q", radosNamespace.Name, radosNamespace.Namespace)
 	cleanupConfig := map[string]string{
 		opcontroller.CephBlockPoolNameEnv:           radosNamespace.Spec.BlockPoolName,
 		opcontroller.CephBlockPoolRadosNamespaceEnv: getRadosNamespaceName(radosNamespace),
@@ -387,7 +409,56 @@ func (r *ReconcileCephBlockPoolRadosNamespace) cleanup(radosNamespace *cephv1.Ce
 	jobName := k8sutil.TruncateNodeNameForJob("cleanup-radosnamespace-%s", fmt.Sprintf("%s-%s", radosNamespace.Spec.BlockPoolName, radosNamespace.Name))
 	err := cleanup.StartJob(r.clusterInfo.Context, r.context.Clientset, jobName)
 	if err != nil {
-		return errors.Wrapf(err, "failed to run clean up job to clean the ceph resources in radosNamesapce %q", radosNamespace.Name)
+		return errors.Wrapf(err, "failed to run clean up job to clean the ceph resources in radosNamespace %q", radosNamespace.Name)
 	}
 	return nil
 }
+
+// checkBlockPoolMirroring returns true when mirroring is NOT enabled on the parent CephBlockPool.
+func checkBlockPoolMirroring(cephBlockPool *cephv1.CephBlockPool) bool {
+	return !(cephBlockPool.Spec.Mirroring.Enabled)
+}
+
+func (r *ReconcileCephBlockPoolRadosNamespace) reconcileMirroring(cephBlockPoolRadosNamespace *cephv1.CephBlockPoolRadosNamespace, cephBlockPool *cephv1.CephBlockPool) error {
+	poolAndRadosNamespaceName := fmt.Sprintf("%s/%s", cephBlockPool.Name, getRadosNamespaceName(cephBlockPoolRadosNamespace))
+	mirrorInfo, err := cephclient.GetPoolMirroringInfo(r.context, r.clusterInfo, poolAndRadosNamespaceName)
+	if err != nil {
+		return errors.Wrapf(err, "failed to get mirroring info for the radosnamespace %q", poolAndRadosNamespaceName)
+	}
+
+	if cephBlockPoolRadosNamespace.Spec.Mirroring != nil {
+		mirroringDisabled := checkBlockPoolMirroring(cephBlockPool)
+		if mirroringDisabled {
+			return errors.Errorf("mirroring is disabled for block pool %q, cannot enable mirroring for radosnamespace %q", cephBlockPool.Name, poolAndRadosNamespaceName)
+		}
+
+		err = cephclient.EnableRBDRadosNamespaceMirroring(r.context, r.clusterInfo, poolAndRadosNamespaceName, cephBlockPoolRadosNamespace.Spec.Mirroring.RemoteNamespace, string(cephBlockPoolRadosNamespace.Spec.Mirroring.Mode))
+		if err != nil {
+			return errors.Wrap(err, "failed to enable rbd rados namespace mirroring")
+		}
+
+		// Schedule snapshots
+		err = cephclient.EnableSnapshotSchedules(r.context, r.clusterInfo, poolAndRadosNamespaceName, cephBlockPoolRadosNamespace.Spec.Mirroring.SnapshotSchedules)
+		if err != nil {
+			return errors.Wrapf(err, "failed to enable snapshot scheduling for rbd rados namespace %q", poolAndRadosNamespaceName)
+		}
+	}
+
+	if cephBlockPoolRadosNamespace.Spec.Mirroring == nil && mirrorInfo.Mode != "disabled" {
+		if mirrorInfo.Mode == "image" {
+			mirroredPools, err := cephclient.GetMirroredPoolImages(r.context, r.clusterInfo, poolAndRadosNamespaceName)
+			if err != nil {
+				return errors.Wrapf(err, "failed to list mirrored images for radosnamespace %q", poolAndRadosNamespaceName)
+			}
+
+			if len(*mirroredPools.Images) > 0 {
+				return errors.Errorf("there are images in the radosnamespace %q. Please manually disable mirroring for each image", poolAndRadosNamespaceName)
+			}
+		}
+
+		err = cephclient.DisableRBDRadosNamespaceMirroring(r.context, r.clusterInfo, poolAndRadosNamespaceName)
+		if err != nil {
+			return errors.Wrap(err, "failed to disable rbd rados namespace mirroring")
+		}
+	}
+	return nil
+}
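Removing spec.mirroring from the CR drives the disable branch of reconcileMirroring. Provided no mirrored images remain in the namespace, the effective call is simply (a sketch, reusing the example names):

```console
rbd mirror pool disable replicapool/namespace-a
```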
diff --git a/pkg/operator/ceph/pool/radosnamespace/controller_test.go b/pkg/operator/ceph/pool/radosnamespace/controller_test.go
index 50f6a4616ba6..1446c94a56f5 100644
--- a/pkg/operator/ceph/pool/radosnamespace/controller_test.go
+++ b/pkg/operator/ceph/pool/radosnamespace/controller_test.go
@@ -25,7 +25,10 @@ import (
 	rookclient "github.com/rook/rook/pkg/client/clientset/versioned/fake"
 	"github.com/rook/rook/pkg/client/clientset/versioned/scheme"
 	"github.com/rook/rook/pkg/clusterd"
+	opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
 	"github.com/rook/rook/pkg/operator/ceph/csi"
+	"github.com/rook/rook/pkg/operator/ceph/version"
+	cephver "github.com/rook/rook/pkg/operator/ceph/version"
 	"github.com/rook/rook/pkg/operator/k8sutil"
 	testop "github.com/rook/rook/pkg/operator/test"
 	exectest "github.com/rook/rook/pkg/util/exec/test"
@@ -36,6 +39,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
@@ -99,6 +103,7 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 		scheme:           s,
 		context:          c,
 		opManagerContext: ctx,
+		opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
 	}
 
 	// Mock request to simulate Reconcile() being called on an event for a
@@ -115,6 +120,12 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 			Name:      namespace,
 			Namespace: namespace,
 		},
+		Spec: cephv1.ClusterSpec{
+			CephVersion: cephv1.CephVersionSpec{
+				Image:           "ceph/ceph:v14.2.9",
+				ImagePullPolicy: v1.PullIfNotPresent,
+			},
+		},
 		Status: cephv1.ClusterStatus{
 			Phase: "",
 			CephVersion: &cephv1.ClusterVersion{
@@ -138,7 +149,7 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 		// Create a fake client to mock API calls.
 		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build()
 		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
-		r = &ReconcileCephBlockPoolRadosNamespace{client: cl, scheme: s, context: c, opManagerContext: context.TODO()}
+		r = &ReconcileCephBlockPoolRadosNamespace{client: cl, scheme: s, context: c, opManagerContext: context.TODO(), opConfig: opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"}}
 		res, err := r.Reconcile(ctx, req)
 		assert.NoError(t, err)
 		assert.True(t, res.Requeue)
@@ -195,6 +206,9 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 			if args[0] == "namespace" && args[1] == "create" {
 				return "", nil
 			}
+			if args[0] == "mirror" && args[1] == "pool" {
+				return `{"mode":"disabled"}`, nil
+			}
 			return "", nil
 		},
@@ -208,6 +222,10 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 			scheme:           s,
 			context:          c,
 			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &version.Reef, nil
 		}
 
 		// Enable CSI
@@ -254,6 +272,10 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 			scheme:           s,
 			context:          c,
 			opManagerContext: ctx,
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &version.Reef, nil
 		}
 
 		// Enable CSI
@@ -280,6 +302,320 @@ func TestCephBlockPoolRadosNamespaceController(t *testing.T) {
 		assert.NotEmpty(t, cm.Data[csi.ConfigKey])
 		assert.Contains(t, cm.Data[csi.ConfigKey], "clusterID")
 		assert.Contains(t, cm.Data[csi.ConfigKey], name)
+		cephCluster.Spec.External.Enable = false
+	})
+
+	t.Run("test rbd rados namespace mirroring enabled and blockpool mirroring disabled", func(t *testing.T) {
+		remoteNamespace := ""
+		cephBlockPoolRadosNamespace.Spec.Mirroring = &cephv1.RadosNamespaceMirroring{
+			RemoteNamespace: &remoteNamespace,
+			Mode:            "image",
+		}
+		objects := []runtime.Object{
+			cephBlockPoolRadosNamespace,
+			cephCluster,
+			cephBlockPool,
+		}
+		// Create a fake client to mock API calls.
+		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
+		c.Client = cl
+
+		executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" && args[1] == "create" {
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" {
+					return `{"mode":""}`, nil
+				}
+				return "", nil
+			},
+		}
+		c.Executor = executor
+
+		s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephBlockPoolList{})
+		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
+		r = &ReconcileCephBlockPoolRadosNamespace{
+			client:           cl,
+			scheme:           s,
+			context:          c,
+			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &version.Reef, nil
+		}
+
+		res, err := r.Reconcile(ctx, req)
+		assert.Error(t, err)
+		assert.False(t, res.Requeue)
+	})
+
+	t.Run("test rbd rados namespace mirroring enabled and blockpool mirroring is also enabled but empty rados namespace", func(t *testing.T) {
+		remoteNamespace := ""
+		cephBlockPoolRadosNamespace.Spec.Mirroring = &cephv1.RadosNamespaceMirroring{
+			RemoteNamespace: &remoteNamespace,
+			Mode:            "image",
+		}
+		cephBlockPool.Spec.Mirroring.Enabled = true
+		objects := []runtime.Object{
+			cephBlockPoolRadosNamespace,
+			cephCluster,
+			cephBlockPool,
+		}
+		// Create a fake client to mock API calls.
+		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
+		c.Client = cl
+
+		executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" && args[1] == "create" {
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" {
+					return `{"mode":""}`, nil
+				}
+				return "", nil
+			},
+		}
+		c.Executor = executor
+
+		s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephBlockPoolList{})
+		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
+		r = &ReconcileCephBlockPoolRadosNamespace{
+			client:           cl,
+			scheme:           s,
+			context:          c,
+			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &version.Reef, nil
+		}
+
+		res, err := r.Reconcile(ctx, req)
+		assert.Error(t, err)
+		assert.False(t, res.Requeue)
+	})
+
+	t.Run("test rbd rados namespace mirroring enabled and blockpool mirroring is also enabled and non empty rados namespace but less ceph version", func(t *testing.T) {
+		remoteNamespace := "test-1"
+		cephBlockPoolRadosNamespace.Spec.Mirroring = &cephv1.RadosNamespaceMirroring{
+			RemoteNamespace: &remoteNamespace,
+			Mode:            "image",
+		}
+		cephBlockPool.Spec.Mirroring.Enabled = true
+		objects := []runtime.Object{
+			cephBlockPoolRadosNamespace,
+			cephCluster,
+			cephBlockPool,
+		}
+		// Create a fake client to mock API calls.
+		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
+		c.Client = cl
+
+		executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" && args[1] == "create" {
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "enable" {
+					assert.Equal(t, *cephBlockPoolRadosNamespace.Spec.Mirroring.RemoteNamespace, args[6])
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
+					return `{}`, nil
+				}
+				return "", nil
+			},
+		}
+		c.Executor = executor
+
+		s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephBlockPoolList{})
+		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
+		r = &ReconcileCephBlockPoolRadosNamespace{
+			client:           cl,
+			scheme:           s,
+			context:          c,
+			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &version.Reef, nil
+		}
+
+		res, err := r.Reconcile(ctx, req)
+		assert.Error(t, err)
+		assert.False(t, res.Requeue)
+	})
+
+	t.Run("test rbd rados namespace mirroring enabled and blockpool mirroring is also enabled and non empty rados namespace and correct ceph version", func(t *testing.T) {
+		remoteNamespace := "test-1"
+		cephBlockPoolRadosNamespace.Spec.Mirroring = &cephv1.RadosNamespaceMirroring{
+			RemoteNamespace: &remoteNamespace,
+			Mode:            "image",
+		}
+		cephBlockPool.Spec.Mirroring.Enabled = true
+		objects := []runtime.Object{
+			cephBlockPoolRadosNamespace,
+			cephCluster,
+			cephBlockPool,
+		}
+		// Create a fake client to mock API calls.
+		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
+		c.Client = cl
+
+		executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" && args[1] == "create" {
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "enable" {
+					assert.Equal(t, *cephBlockPoolRadosNamespace.Spec.Mirroring.RemoteNamespace, args[6])
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
+					return `{}`, nil
+				}
+				return "", nil
+			},
+		}
+		c.Executor = executor
+
+		s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephBlockPoolList{})
+		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
+		r = &ReconcileCephBlockPoolRadosNamespace{
+			client:           cl,
+			scheme:           s,
+			context:          c,
+			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &cephver.CephVersion{Major: 20, Minor: 0, Extra: 0}, nil
+		}
+
+		res, err := r.Reconcile(ctx, req)
+		assert.NoError(t, err)
+		assert.False(t, res.Requeue)
+
+		err = r.client.Get(ctx, req.NamespacedName, cephBlockPoolRadosNamespace)
+		assert.NoError(t, err)
+		assert.Equal(t, cephv1.ConditionReady, cephBlockPoolRadosNamespace.Status.Phase)
+		assert.NotEmpty(t, cephBlockPoolRadosNamespace.Status.Info["clusterID"])
+	})
+
+	t.Run("test rbd rados namespace mirroring enabled and blockpool mirroring is also enabled and no remote rados namespace and correct ceph version", func(t *testing.T) {
+		cephBlockPoolRadosNamespace.Spec.Mirroring = &cephv1.RadosNamespaceMirroring{
+			Mode: "image",
+		}
+		cephBlockPool.Spec.Mirroring.Enabled = true
+		objects := []runtime.Object{
+			cephBlockPoolRadosNamespace,
+			cephCluster,
+			cephBlockPool,
+		}
+		// Create a fake client to mock API calls.
+		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
+		c.Client = cl
+
+		executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" && args[1] == "create" {
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "enable" {
+					assert.Equal(t, string(cephBlockPoolRadosNamespace.Spec.Mirroring.Mode), args[4])
+					return "", nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
+					return `{}`, nil
+				}
+				return "", nil
+			},
+		}
+		c.Executor = executor
+
+		s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephBlockPoolList{})
+		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
+		r = &ReconcileCephBlockPoolRadosNamespace{
+			client:           cl,
+			scheme:           s,
+			context:          c,
+			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &cephver.CephVersion{Major: 20, Minor: 0, Extra: 0}, nil
+		}
+
+		res, err := r.Reconcile(ctx, req)
+		assert.NoError(t, err)
+		assert.False(t, res.Requeue)
+
+		err = r.client.Get(ctx, req.NamespacedName, cephBlockPoolRadosNamespace)
+		assert.NoError(t, err)
+		assert.Equal(t, cephv1.ConditionReady, cephBlockPoolRadosNamespace.Status.Phase)
+		assert.NotEmpty(t, cephBlockPoolRadosNamespace.Status.Info["clusterID"])
+	})
+
+	t.Run("test rbd rados namespace mirroring disabled", func(t *testing.T) {
+		cephBlockPoolRadosNamespace.Spec.Mirroring = nil
+
+		objects := []runtime.Object{
+			cephBlockPoolRadosNamespace,
+			cephCluster,
+			cephBlockPool,
+		}
+		// Create a fake client to mock API calls.
+		cl = fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
+		c.Client = cl
+		executor = &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				if args[0] == "namespace" && args[1] == "create" {
+					return "", nil
+				}
+				// set mode = image as it was enabled earlier
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "info" {
+					return `{"mode":"image"}`, nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "status" {
+					return `{"images":[]}`, nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "peer" {
+					return `{}`, nil
+				}
+				if args[0] == "mirror" && args[1] == "pool" && args[2] == "disable" {
+					assert.Equal(t, cephBlockPool.Name+"/"+cephBlockPoolRadosNamespace.Name, args[3])
+					return `{}`, nil
+				}
+				return "", nil
+			},
+		}
+		c.Executor = executor
+
+		s.AddKnownTypes(cephv1.SchemeGroupVersion, &cephv1.CephBlockPoolList{})
+		// Create a ReconcileCephBlockPoolRadosNamespace object with the scheme and fake client.
+		r = &ReconcileCephBlockPoolRadosNamespace{
+			client:           cl,
+			scheme:           s,
+			context:          c,
+			opManagerContext: context.TODO(),
+			opConfig:         opcontroller.OperatorConfig{Image: "ceph/ceph:v14.2.9"},
+		}
+		detectCephVersion = func(ctx context.Context, rookImage, namespace, jobName string, ownerInfo *k8sutil.OwnerInfo, clientset kubernetes.Interface, cephClusterSpec *cephv1.ClusterSpec) (*cephver.CephVersion, error) {
+			return &version.Reef, nil
+		}
+
+		res, err := r.Reconcile(ctx, req)
+		assert.NoError(t, err)
+		assert.False(t, res.Requeue)
+
+		err = r.client.Get(context.TODO(), req.NamespacedName, cephBlockPoolRadosNamespace)
+		assert.NoError(t, err)
+		assert.Equal(t, cephv1.ConditionReady, cephBlockPoolRadosNamespace.Status.Phase)
+	})
}
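To run just the touched tests locally, the standard Go tooling should suffice (package paths as in the diff):

```console
go test ./pkg/operator/ceph/pool/ -run TestIsAnyRadosNamespaceMirrored -v
go test ./pkg/operator/ceph/pool/radosnamespace/ -run TestCephBlockPoolRadosNamespaceController -v
```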