diff --git a/Documentation/CRDs/specification.md b/Documentation/CRDs/specification.md index c35787ba3b62..d30907261be1 100644 --- a/Documentation/CRDs/specification.md +++ b/Documentation/CRDs/specification.md @@ -3612,6 +3612,18 @@ string + + +osd
+ + +OSDStatus + + + + + +

CephVersionSpec @@ -8181,6 +8193,35 @@ string +

OSDStatus +

+

+(Appears on:CephStorage) +

+
+

OSDStatus represents the OSD status of the Ceph cluster

+
+ + + + + + + + + + + + + +
FieldDescription
+storeType
+ +map[string]int + +
+

StoreType is a mapping between the OSD backend stores and the number of OSDs using these stores

+
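As an aside, here is a minimal Go sketch of how the new store-type breakdown could be consumed once the OSD health monitor populates it; the package and function names are illustrative only, and only the `cephv1` types added in this change are assumed.

```go
package example

import (
	"fmt"

	cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
)

// summarizeOSDStores walks the storeType map introduced by this change.
// The map lives under the CephCluster storage status and is filled in by the
// OSD health monitor; keys are backend store names and values are OSD counts,
// e.g. {"bluestore": 2, "bluestore-rdr": 1}.
func summarizeOSDStores(storage cephv1.CephStorage) {
	for store, count := range storage.OSD.StoreType {
		fmt.Printf("%d OSD(s) use the %q backend store\n", count, store)
	}
}
```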

OSDStore

@@ -8209,6 +8250,19 @@ string

Type of backend storage to be used while creating OSDs. If empty, then bluestore will be used

+ + +updateStore
+ +string + + + +(Optional) +

UpdateStore updates the backend store for existing OSDs. It destroys each OSD one at a time, cleans up the backing disk +and prepares the same OSD on that disk

+ +
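For reference, a minimal sketch of a storage spec that opts into the migration, mirroring the `cephv1.ClusterSpec` literals used by the unit tests in this change; the helper function name is illustrative only.

```go
package example

import cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"

// migratingStorageSpec mirrors the spec literals used by the unit tests in this
// change: the desired backend store is bluestore-rdr, and migration of existing
// OSDs only proceeds when updateStore carries the exact confirmation string
// (any other non-empty value is rejected by the CRD pattern
// ^$|^yes-really-update-store$).
func migratingStorageSpec() cephv1.StorageScopeSpec {
	return cephv1.StorageScopeSpec{
		Store: cephv1.OSDStore{
			Type:        "bluestore-rdr",
			UpdateStore: "yes-really-update-store",
		},
	}
}
```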

ObjectEndpoints diff --git a/cmd/rook/ceph/osd.go b/cmd/rook/ceph/osd.go index e55eef7c1b8b..5c2873ac3a7e 100644 --- a/cmd/rook/ceph/osd.go +++ b/cmd/rook/ceph/osd.go @@ -19,6 +19,7 @@ package ceph import ( "context" "encoding/json" + "fmt" "os" "path" "strconv" @@ -29,8 +30,12 @@ import ( "github.com/pkg/errors" "github.com/rook/rook/cmd/rook/rook" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/rook/rook/pkg/clusterd" + cleanup "github.com/rook/rook/pkg/daemon/ceph/cleanup" + "github.com/rook/rook/pkg/daemon/ceph/client" osddaemon "github.com/rook/rook/pkg/daemon/ceph/osd" "github.com/rook/rook/pkg/operator/ceph/cluster/mon" + "github.com/rook/rook/pkg/operator/ceph/cluster/osd" oposd "github.com/rook/rook/pkg/operator/ceph/cluster/osd" osdcfg "github.com/rook/rook/pkg/operator/ceph/cluster/osd/config" opcontroller "github.com/rook/rook/pkg/operator/ceph/controller" @@ -66,6 +71,7 @@ var ( ownerRefID string clusterName string osdID int + replaceOSDID int osdStoreType string osdStringID string osdUUID string @@ -88,6 +94,7 @@ func addOSDFlags(command *cobra.Command) { addOSDConfigFlags(provisionCmd) // flags specific to provisioning + provisionCmd.Flags().IntVar(&replaceOSDID, "replace-osd", -1, "osd to be destroyed") provisionCmd.Flags().StringVar(&cfg.devices, "data-devices", "", "comma separated list of devices to use for storage") provisionCmd.Flags().StringVar(&osdDataDeviceFilter, "data-device-filter", "", "a regex filter for the device names to use, or \"all\"") provisionCmd.Flags().StringVar(&osdDataDevicePathFilter, "data-device-path-filter", "", "a regex filter for the device path names to use") @@ -196,6 +203,7 @@ func writeOSDConfig(cmd *cobra.Command, args []string) error { // Provision a device or directory for an OSD func prepareOSD(cmd *cobra.Command, args []string) error { + if err := verifyConfigFlags(provisionCmd); err != nil { return err } @@ -251,8 +259,19 @@ func prepareOSD(cmd *cobra.Command, args []string) error { clusterInfo.OwnerInfo = ownerInfo clusterInfo.Context = cmd.Context() kv := k8sutil.NewConfigMapKVStore(clusterInfo.Namespace, context.Clientset, ownerInfo) + + // destroy the OSD using the OSD ID + var replaceOSD *osd.OSDReplaceInfo + if replaceOSDID != -1 { + osdInfo, err := destroyOSD(context, &clusterInfo, replaceOSDID) + if err != nil { + rook.TerminateFatal(errors.Wrapf(err, "failed to destroy OSD %d.", osdInfo.ID)) + } + replaceOSD = &oposd.OSDReplaceInfo{ID: osdInfo.ID, Path: osdInfo.BlockPath} + } + agent := osddaemon.NewAgent(context, dataDevices, cfg.metadataDevice, forceFormat, - cfg.storeConfig, &clusterInfo, cfg.nodeName, kv, cfg.pvcBacked) + cfg.storeConfig, &clusterInfo, cfg.nodeName, kv, replaceOSD, cfg.pvcBacked) if cfg.metadataDevice != "" { metaDevice = cfg.metadataDevice @@ -398,3 +417,36 @@ func readCephSecret(path string) error { } return nil } + +func destroyOSD(context *clusterd.Context, clusterInfo *client.ClusterInfo, osdID int) (*oposd.OSDInfo, error) { + osdInfo, err := osddaemon.GetOSDInfoById(context, clusterInfo, osdID) + if err != nil { + return nil, errors.Wrapf(err, "failed to get OSD info for OSD.%d", osdID) + } + + // destroy the osd + logger.Infof("destroying OSD %d on path %q in %q mode", osdInfo.ID, osdInfo.BlockPath, osdInfo.CVMode) + destroyOSDArgs := []string{"osd", "destroy", fmt.Sprintf("osd.%d", osdInfo.ID), "--yes-i-really-mean-it"} + _, err = client.NewCephCommand(context, clusterInfo, destroyOSDArgs).Run() + if err != nil { + return nil, errors.Wrapf(err, "failed to destroy osd.%d.", 
osdInfo.ID) + } + + // Sanitize OSD disk + s := cleanup.NewDiskSanitizer(context, clusterInfo, + &cephv1.SanitizeDisksSpec{ + Method: cephv1.SanitizeMethodProperty(cephv1.SanitizeMethodComplete), + DataSource: cephv1.SanitizeDataSourceProperty(cephv1.SanitizeDataSourceZero), + Iteration: 1, + }, + ) + + // TODO: handle disk sanitization errors + if osdInfo.CVMode == "raw" { + s.SanitizeRawDisk([]oposd.OSDInfo{osdInfo}) + } else if osdInfo.CVMode == "lvm" { + s.SanitizeLVMDisk([]oposd.OSDInfo{osdInfo}) + } + + return &osdInfo, nil +} diff --git a/deploy/charts/rook-ceph/templates/resources.yaml b/deploy/charts/rook-ceph/templates/resources.yaml index b5098dcd2262..b1f0298b053f 100644 --- a/deploy/charts/rook-ceph/templates/resources.yaml +++ b/deploy/charts/rook-ceph/templates/resources.yaml @@ -4385,6 +4385,10 @@ spec: - bluestore - bluestore-rdr type: string + updateStore: + description: UpdateStore updates the backend store for existing OSDs. It destroys each OSD one at a time, cleans up the backing disk and prepares same OSD on that disk + pattern: ^$|^yes-really-update-store$ + type: string type: object useAllDevices: description: Whether to consume all the storage devices found on a machine @@ -4753,6 +4757,15 @@ spec: type: string type: object type: array + osd: + description: OSDStatus represents OSD status of the ceph Cluster + properties: + storeType: + additionalProperties: + type: integer + description: StoreType is a mapping between the OSD backend stores and number of OSDs using these stores + type: object + type: object type: object version: description: ClusterVersion represents the version of a Ceph Cluster diff --git a/deploy/examples/crds.yaml b/deploy/examples/crds.yaml index ae33cd51d856..a720bc1405ff 100644 --- a/deploy/examples/crds.yaml +++ b/deploy/examples/crds.yaml @@ -4383,6 +4383,10 @@ spec: - bluestore - bluestore-rdr type: string + updateStore: + description: UpdateStore updates the backend store for existing OSDs. 
It destroys each OSD one at a time, cleans up the backing disk and prepares same OSD on that disk + pattern: ^$|^yes-really-update-store$ + type: string type: object useAllDevices: description: Whether to consume all the storage devices found on a machine @@ -4751,6 +4755,15 @@ spec: type: string type: object type: array + osd: + description: OSDStatus represents OSD status of the ceph Cluster + properties: + storeType: + additionalProperties: + type: integer + description: StoreType is a mapping between the OSD backend stores and number of OSDs using these stores + type: object + type: object type: object version: description: ClusterVersion represents the version of a Ceph Cluster diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index c6a496309067..b2f428b661b1 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -420,6 +420,7 @@ type Capacity struct { // CephStorage represents flavors of Ceph Cluster Storage type CephStorage struct { DeviceClasses []DeviceClasses `json:"deviceClasses,omitempty"` + OSD OSDStatus `json:"osd,omitempty"` } // DeviceClasses represents device classes of a Ceph Cluster @@ -427,6 +428,12 @@ type DeviceClasses struct { Name string `json:"name,omitempty"` } +// OSDStatus represents OSD status of the ceph Cluster +type OSDStatus struct { + // StoreType is a mapping between the OSD backend stores and number of OSDs using these stores + StoreType map[string]int `json:"storeType,omitempty"` +} + // ClusterVersion represents the version of a Ceph Cluster type ClusterVersion struct { Image string `json:"image,omitempty"` @@ -2597,6 +2604,11 @@ type OSDStore struct { // +optional // +kubebuilder:validation:Enum=bluestore;bluestore-rdr; Type string `json:"type,omitempty"` + // UpdateStore updates the backend store for existing OSDs. It destroys each OSD one at a time, cleans up the backing disk + // and prepares same OSD on that disk + // +optional + // +kubebuilder:validation:Pattern=`^$|^yes-really-update-store$` + UpdateStore string `json:"updateStore,omitempty"` } // Node is a storage nodes diff --git a/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go b/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go index 7be6f7f05980..d2fc33cd05e5 100644 --- a/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go +++ b/pkg/apis/ceph.rook.io/v1/zz_generated.deepcopy.go @@ -1649,6 +1649,7 @@ func (in *CephStorage) DeepCopyInto(out *CephStorage) { *out = make([]DeviceClasses, len(*in)) copy(*out, *in) } + in.OSD.DeepCopyInto(&out.OSD) return } @@ -3197,6 +3198,29 @@ func (in *NotificationKeyFilterRule) DeepCopy() *NotificationKeyFilterRule { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OSDStatus) DeepCopyInto(out *OSDStatus) { + *out = *in + if in.StoreType != nil { + in, out := &in.StoreType, &out.StoreType + *out = make(map[string]int, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OSDStatus. +func (in *OSDStatus) DeepCopy() *OSDStatus { + if in == nil { + return nil + } + out := new(OSDStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *OSDStore) DeepCopyInto(out *OSDStore) { *out = *in diff --git a/pkg/daemon/ceph/cleanup/disk.go b/pkg/daemon/ceph/cleanup/disk.go index 59c6b4173d5e..484c48bf9557 100644 --- a/pkg/daemon/ceph/cleanup/disk.go +++ b/pkg/daemon/ceph/cleanup/disk.go @@ -63,7 +63,7 @@ func (s *DiskSanitizer) StartSanitizeDisks() { logger.Errorf("failed to list lvm osd(s). %v", err) } else { // Start the sanitizing sequence - s.sanitizeLVMDisk(osdLVMList) + s.SanitizeLVMDisk(osdLVMList) } // Raw based OSDs @@ -72,11 +72,11 @@ func (s *DiskSanitizer) StartSanitizeDisks() { logger.Errorf("failed to list raw osd(s). %v", err) } else { // Start the sanitizing sequence - s.sanitizeRawDisk(osdRawList) + s.SanitizeRawDisk(osdRawList) } } -func (s *DiskSanitizer) sanitizeRawDisk(osdRawList []oposd.OSDInfo) { +func (s *DiskSanitizer) SanitizeRawDisk(osdRawList []oposd.OSDInfo) { // Initialize work group to wait for completion of all the go routine var wg sync.WaitGroup @@ -93,7 +93,7 @@ func (s *DiskSanitizer) sanitizeRawDisk(osdRawList []oposd.OSDInfo) { wg.Wait() } -func (s *DiskSanitizer) sanitizeLVMDisk(osdLVMList []oposd.OSDInfo) { +func (s *DiskSanitizer) SanitizeLVMDisk(osdLVMList []oposd.OSDInfo) { // Initialize work group to wait for completion of all the go routine var wg sync.WaitGroup pvs := []string{} @@ -112,7 +112,7 @@ func (s *DiskSanitizer) sanitizeLVMDisk(osdLVMList []oposd.OSDInfo) { wg.Wait() var wg2 sync.WaitGroup - // // purge remaining LVM2 metadata from PV + // purge remaining LVM2 metadata from PV for _, pv := range pvs { wg2.Add(1) go s.executeSanitizeCommand(oposd.OSDInfo{BlockPath: pv}, &wg2) diff --git a/pkg/daemon/ceph/osd/agent.go b/pkg/daemon/ceph/osd/agent.go index eee03661d4f5..8dcd1195c97d 100644 --- a/pkg/daemon/ceph/osd/agent.go +++ b/pkg/daemon/ceph/osd/agent.go @@ -19,6 +19,7 @@ package osd import ( "github.com/rook/rook/pkg/clusterd" cephclient "github.com/rook/rook/pkg/daemon/ceph/client" + oposd "github.com/rook/rook/pkg/operator/ceph/cluster/osd" "github.com/rook/rook/pkg/operator/ceph/cluster/osd/config" "github.com/rook/rook/pkg/operator/k8sutil" ) @@ -37,11 +38,13 @@ type OsdAgent struct { storeConfig config.StoreConfig kv *k8sutil.ConfigMapKVStore pvcBacked bool + replaceOSD *oposd.OSDReplaceInfo } // NewAgent is the instantiation of the OSD agent func NewAgent(context *clusterd.Context, devices []DesiredDevice, metadataDevice string, forceFormat bool, - storeConfig config.StoreConfig, clusterInfo *cephclient.ClusterInfo, nodeName string, kv *k8sutil.ConfigMapKVStore, pvcBacked bool) *OsdAgent { + storeConfig config.StoreConfig, clusterInfo *cephclient.ClusterInfo, nodeName string, kv *k8sutil.ConfigMapKVStore, + replaceOSD *oposd.OSDReplaceInfo, pvcBacked bool) *OsdAgent { return &OsdAgent{ devices: devices, @@ -52,6 +55,7 @@ func NewAgent(context *clusterd.Context, devices []DesiredDevice, metadataDevice nodeName: nodeName, kv: kv, pvcBacked: pvcBacked, + replaceOSD: replaceOSD, } } @@ -64,3 +68,12 @@ func getDeviceLVPath(context *clusterd.Context, deviceName string) string { logger.Debugf("logical volume path for device %q is %q", deviceName, output) return output } + +// GetReplaceOSDId returns the OSD ID based on the device name +func (a *OsdAgent) GetReplaceOSDId(device string) int { + if device == a.replaceOSD.Path { + return a.replaceOSD.ID + } + + return -1 +} diff --git a/pkg/daemon/ceph/osd/daemon.go b/pkg/daemon/ceph/osd/daemon.go index b98c80819a04..5514119f0866 100644 --- a/pkg/daemon/ceph/osd/daemon.go +++ b/pkg/daemon/ceph/osd/daemon.go @@ -601,3 
+601,32 @@ func getVolumeGroupName(lvPath string) string { return vgSlice[2] } + +// GetOSDInfoByID returns the OSDInfo using the ceph volume list +func GetOSDInfoById(context *clusterd.Context, clusterInfo *client.ClusterInfo, osdID int) (oposd.OSDInfo, error) { + // LVM mode OSDs + osdLVMList, err := GetCephVolumeLVMOSDs(context, clusterInfo, clusterInfo.FSID, "", false, false) + if err != nil { + return oposd.OSDInfo{}, errors.Wrap(err, "failed to list lvm osd(s)") + } + + for _, osdInfo := range osdLVMList { + if osdInfo.ID == osdID { + return osdInfo, nil + } + } + + // Raw mode OSDs + osdRawList, err := GetCephVolumeRawOSDs(context, clusterInfo, clusterInfo.FSID, "", "", "", false, true) + if err != nil { + return oposd.OSDInfo{}, errors.Wrap(err, "failed to list raw osd(s)") + } + + for _, osdInfo := range osdRawList { + if osdInfo.ID == osdID { + return osdInfo, nil + } + } + + return oposd.OSDInfo{}, fmt.Errorf("failed to get details for OSD %d using ceph-volume list", osdID) +} diff --git a/pkg/daemon/ceph/osd/device.go b/pkg/daemon/ceph/osd/device.go index 3c4676762098..3e4bd57f624c 100644 --- a/pkg/daemon/ceph/osd/device.go +++ b/pkg/daemon/ceph/osd/device.go @@ -63,6 +63,7 @@ type DeviceOsdIDEntry struct { Config DesiredDevice // Device specific config options PersistentDevicePaths []string DeviceInfo *sys.LocalDisk // low-level info about the device + RestoreOSD bool // Restore OSD by reparing it with with OSD ID } func (m *DeviceOsdMapping) String() string { diff --git a/pkg/daemon/ceph/osd/volume.go b/pkg/daemon/ceph/osd/volume.go index 440115bfa3b5..f5afac586b7f 100644 --- a/pkg/daemon/ceph/osd/volume.go +++ b/pkg/daemon/ceph/osd/volume.go @@ -282,6 +282,16 @@ func (a *OsdAgent) initializeBlockPVC(context *clusterd.Context, devices *Device deviceArg, }...) + if a.replaceOSD != nil { + replaceOSDID := a.GetReplaceOSDId(device.DeviceInfo.RealPath) + if replaceOSDID != -1 { + immediateExecuteArgs = append(immediateExecuteArgs, []string{ + "--osd-id", + fmt.Sprintf("%d", replaceOSDID), + }...) + } + } + crushDeviceClass := os.Getenv(oposd.CrushDeviceClassVarName) if crushDeviceClass != "" { immediateExecuteArgs = append(immediateExecuteArgs, []string{crushDeviceClassFlag, crushDeviceClass}...) @@ -526,6 +536,16 @@ func (a *OsdAgent) initializeDevicesRawMode(context *clusterd.Context, devices * deviceArg, }...) + if a.replaceOSD != nil { + restoreOSDID := a.GetReplaceOSDId(deviceArg) + if restoreOSDID != -1 { + immediateExecuteArgs = append(immediateExecuteArgs, []string{ + "--osd-id", + fmt.Sprintf("%d", restoreOSDID), + }...) 
+ } + } + // assign the device class specific to the device immediateExecuteArgs = a.appendDeviceClassArg(device, immediateExecuteArgs) @@ -860,6 +880,7 @@ func GetCephVolumeLVMOSDs(context *clusterd.Context, clusterInfo *client.Cluster lvPath = lv } + // TODO: Don't read osd store type from env variable osdStore := os.Getenv(oposd.OSDStoreTypeVarName) if osdStore == "" { osdStore = string(cephv1.StoreTypeBlueStore) @@ -1053,10 +1074,7 @@ func GetCephVolumeRawOSDs(context *clusterd.Context, clusterInfo *client.Cluster blockPath = block } - osdStore := os.Getenv(oposd.OSDStoreTypeVarName) - if osdStore == "" { - osdStore = string(cephv1.StoreTypeBlueStore) - } + osdStore := osdInfo.Type osd := oposd.OSDInfo{ ID: osdID, diff --git a/pkg/daemon/ceph/osd/volume_test.go b/pkg/daemon/ceph/osd/volume_test.go index 03fe7b47f357..ee814ce70aa1 100644 --- a/pkg/daemon/ceph/osd/volume_test.go +++ b/pkg/daemon/ceph/osd/volume_test.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/rook/rook/pkg/clusterd" cephclient "github.com/rook/rook/pkg/daemon/ceph/client" + "github.com/rook/rook/pkg/operator/ceph/cluster/osd" oposd "github.com/rook/rook/pkg/operator/ceph/cluster/osd" "github.com/rook/rook/pkg/operator/ceph/cluster/osd/config" cephver "github.com/rook/rook/pkg/operator/ceph/version" @@ -1189,6 +1190,48 @@ func TestInitializeBlockPVC(t *testing.T) { assert.Equal(t, "", blockPath) assert.Equal(t, "", metadataBlockPath) assert.Equal(t, "", walBlockPath) + + // Test for condition when osd is prepared with existing osd ID + a = &OsdAgent{clusterInfo: clusterInfo, nodeName: "node1", storeConfig: config.StoreConfig{StoreType: "bluestore"}, replaceOSD: &osd.OSDReplaceInfo{ID: 3, Path: "/dev/sda"}} + devices = &DeviceOsdMapping{ + Entries: map[string]*DeviceOsdIDEntry{ + "data": {Data: -1, Metadata: nil, Config: DesiredDevice{Name: "/mnt/set1-data-0-rpf2k"}, DeviceInfo: &sys.LocalDisk{RealPath: "/dev/sda"}}, + }, + } + executor.MockExecuteCommandWithCombinedOutput = func(command string, args ...string) (string, error) { + logger.Infof("%s %v", command, args) + if args[1] == "ceph-volume" && args[2] == "raw" && args[3] == "prepare" && args[4] == "--bluestore" && args[7] == "--osd-id" && args[8] == "3" { + return initializeBlockPVCTestResult, nil + } + + return "", errors.Errorf("unknown command %s %s", command, args) + } + blockPath, metadataBlockPath, walBlockPath, err = a.initializeBlockPVC(context, devices, false) + assert.Nil(t, err) + assert.Equal(t, "/mnt/set1-data-0-rpf2k", blockPath) + assert.Equal(t, "", metadataBlockPath) + assert.Equal(t, "", walBlockPath) + + // Test for condition that --osd-id is not passed for the devices that don't match the OSD to be replaced. 
+ a = &OsdAgent{clusterInfo: clusterInfo, nodeName: "node1", storeConfig: config.StoreConfig{StoreType: "bluestore"}, replaceOSD: &osd.OSDReplaceInfo{ID: 3, Path: "/dev/sda"}} + devices = &DeviceOsdMapping{ + Entries: map[string]*DeviceOsdIDEntry{ + "data": {Data: -1, Metadata: nil, Config: DesiredDevice{Name: "/mnt/set1-data-0-rpf2k"}, DeviceInfo: &sys.LocalDisk{RealPath: "/dev/sdb"}}, + }, + } + executor.MockExecuteCommandWithCombinedOutput = func(command string, args ...string) (string, error) { + logger.Infof("%s %v", command, args) + if args[1] == "ceph-volume" && args[2] == "raw" && args[3] == "prepare" && args[4] == "--bluestore" && args[7] != "--osd-id" && args[8] != "3" { + return initializeBlockPVCTestResult, nil + } + + return "", errors.Errorf("unknown command %s %s", command, args) + } + blockPath, metadataBlockPath, walBlockPath, err = a.initializeBlockPVC(context, devices, false) + assert.Nil(t, err) + assert.Equal(t, "/mnt/set1-data-0-rpf2k", blockPath) + assert.Equal(t, "", metadataBlockPath) + assert.Equal(t, "", walBlockPath) } func TestInitializeBlockPVCWithMetadata(t *testing.T) { diff --git a/pkg/operator/ceph/cluster/osd/create.go b/pkg/operator/ceph/cluster/osd/create.go index 9d5d97d3e00e..ea3d5d2d2fdc 100644 --- a/pkg/operator/ceph/cluster/osd/create.go +++ b/pkg/operator/ceph/cluster/osd/create.go @@ -18,6 +18,7 @@ package osd import ( "fmt" + "strings" "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" @@ -180,7 +181,20 @@ func (c *Cluster) startProvisioningOverPVCs(config *provisionConfig, errs *provi // Skip OSD prepare if deployment already exists for the PVC // Also skip the encryption work part to avoid overriding the existing encryption key + skipPreparePod := false if existingDeployments.Has(dataSource.ClaimName) { + skipPreparePod = true + } + + // Allow updating OSD prepare pod if the OSD needs migration + if c.replaceOSD != nil { + if strings.Contains(c.replaceOSD.Path, dataSource.ClaimName) { + logger.Infof("updating OSD prepare pod to replace OSD.%d", c.replaceOSD.ID) + skipPreparePod = false + } + } + + if skipPreparePod { logger.Infof("skipping OSD prepare job creation for PVC %q because OSD daemon using the PVC already exists", osdProps.crushHostname) continue } diff --git a/pkg/operator/ceph/cluster/osd/envs.go b/pkg/operator/ceph/cluster/osd/envs.go index 2ffc1ca10825..074c9e29ca3c 100644 --- a/pkg/operator/ceph/cluster/osd/envs.go +++ b/pkg/operator/ceph/cluster/osd/envs.go @@ -49,6 +49,7 @@ const ( CrushDeviceClassVarName = "ROOK_OSD_CRUSH_DEVICE_CLASS" CrushInitialWeightVarName = "ROOK_OSD_CRUSH_INITIAL_WEIGHT" OSDStoreTypeVarName = "ROOK_OSD_STORE_TYPE" + ReplaceOSDIDVarName = "ROOK_REPLACE_OSD" CrushRootVarName = "ROOK_CRUSHMAP_ROOT" tcmallocMaxTotalThreadCacheBytesEnv = "TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES" ) @@ -175,6 +176,10 @@ func osdStoreTypeEnvVar(storeType string) v1.EnvVar { return v1.EnvVar{Name: OSDStoreTypeVarName, Value: storeType} } +func replaceOSDIDEnvVar(id string) v1.EnvVar { + return v1.EnvVar{Name: ReplaceOSDIDVarName, Value: id} +} + func crushInitialWeightEnvVar(crushInitialWeight string) v1.EnvVar { return v1.EnvVar{Name: CrushInitialWeightVarName, Value: crushInitialWeight} } diff --git a/pkg/operator/ceph/cluster/osd/health.go b/pkg/operator/ceph/cluster/osd/health.go index ed18a3427379..00febe89c20f 100644 --- a/pkg/operator/ceph/cluster/osd/health.go +++ b/pkg/operator/ceph/cluster/osd/health.go @@ -82,6 +82,7 @@ func (m *OSDHealthMonitor) Start(monitoringRoutines 
map[string]*opcontroller.Clu case <-time.After(*m.interval): logger.Debug("checking osd processes status.") m.checkOSDHealth() + m.updateCephStorageStatus() case <-monitoringRoutines[daemon].InternalCtx.Done(): logger.Infof("stopping monitoring of OSDs in namespace %q", m.clusterInfo.Namespace) @@ -102,23 +103,6 @@ func (m *OSDHealthMonitor) checkOSDHealth() { if err != nil { logger.Debugf("failed to check OSD Dump. %v", err) } - err = m.checkDeviceClasses() - if err != nil { - logger.Debugf("failed to check device classes. %v", err) - } -} - -func (m *OSDHealthMonitor) checkDeviceClasses() error { - devices, err := client.GetDeviceClasses(m.context, m.clusterInfo) - if err != nil { - return errors.Wrap(err, "failed to get osd device classes") - } - - if len(devices) > 0 { - m.updateCephStatus(devices) - } - - return nil } func (m *OSDHealthMonitor) checkOSDDump() error { @@ -191,15 +175,30 @@ func (m *OSDHealthMonitor) removeOSDDeploymentIfSafeToDestroy(outOSDid int) erro return nil } -// updateCephStorage updates the CR with deviceclass details -func (m *OSDHealthMonitor) updateCephStatus(devices []string) { +// updateCephStorageStatus updates the Storage details in the ceph cluster CR +func (m *OSDHealthMonitor) updateCephStorageStatus() { cephCluster := cephv1.CephCluster{} cephClusterStorage := cephv1.CephStorage{} - for _, device := range devices { - cephClusterStorage.DeviceClasses = append(cephClusterStorage.DeviceClasses, cephv1.DeviceClasses{Name: device}) + deviceClasses, err := client.GetDeviceClasses(m.context, m.clusterInfo) + if err != nil { + logger.Errorf("failed to get osd device classes. %v", err) + return + } + + for _, deviceClass := range deviceClasses { + cephClusterStorage.DeviceClasses = append(cephClusterStorage.DeviceClasses, cephv1.DeviceClasses{Name: deviceClass}) + } + + osdStore, err := m.getOSDStoreStatus() + if err != nil { + logger.Errorf("failed to get osd store status. %v", err) + return } - err := m.context.Client.Get(m.clusterInfo.Context, m.clusterInfo.NamespacedName(), &cephCluster) + + cephClusterStorage.OSD = *osdStore + + err = m.context.Client.Get(m.clusterInfo.Context, m.clusterInfo.NamespacedName(), &cephCluster) if err != nil { if kerrors.IsNotFound(err) { logger.Debug("CephCluster resource not found. 
Ignoring since object must be deleted.") @@ -216,3 +215,25 @@ func (m *OSDHealthMonitor) updateCephStatus(devices []string) { } } } + +func (m *OSDHealthMonitor) getOSDStoreStatus() (*cephv1.OSDStatus, error) { + label := fmt.Sprintf("%s=%s", k8sutil.AppAttr, AppName) + osdDeployments, err := k8sutil.GetDeployments(m.clusterInfo.Context, m.context.Clientset, m.clusterInfo.Namespace, label) + if err != nil { + if kerrors.IsNotFound(err) { + return nil, nil + } + return nil, errors.Wrap(err, "failed to get osd deployments") + } + + storeType := map[string]int{} + for i := range osdDeployments.Items { + if osdStore, ok := osdDeployments.Items[i].Labels[osdStore]; ok { + storeType[osdStore]++ + } + } + + return &cephv1.OSDStatus{ + StoreType: storeType, + }, nil +} diff --git a/pkg/operator/ceph/cluster/osd/health_test.go b/pkg/operator/ceph/cluster/osd/health_test.go index b260e773f6c1..fe01891078fd 100644 --- a/pkg/operator/ceph/cluster/osd/health_test.go +++ b/pkg/operator/ceph/cluster/osd/health_test.go @@ -141,46 +141,103 @@ func TestNewOSDHealthMonitor(t *testing.T) { } } -func TestDeviceClasses(t *testing.T) { +func TestUpdateCephStorageStatus(t *testing.T) { + ctx := context.TODO() clusterInfo := client.AdminTestClusterInfo("fake") - clusterInfo.SetName("rook-ceph") - - var execCount = 0 executor := &exectest.MockExecutor{ MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { - return "{\"key\":\"mysecurekey\", \"osdid\":3.0}", nil + logger.Infof("ExecuteCommandWithOutputFile: %s %v", command, args) + if args[1] == "crush" && args[2] == "class" && args[3] == "ls" { + // Mock executor for OSD crush class list command, returning ssd as available device class + return `["ssd"]`, nil + } + return "", nil }, } - executor.MockExecuteCommandWithOutput = func(command string, args ...string) (string, error) { - logger.Infof("ExecuteCommandWithOutputFile: %s %v", command, args) - execCount++ - if args[1] == "crush" && args[2] == "class" && args[3] == "ls" { - // Mock executor for OSD crush class list command, returning ssd as available device class - return `["ssd"]`, nil - } - return "", nil - } - cephCluster := &cephv1.CephCluster{} + cephCluster := &cephv1.CephCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testing", + Namespace: "fake", + }, + } // Objects to track in the fake client. object := []runtime.Object{ cephCluster, } s := scheme.Scheme // Create a fake client to mock API calls. 
- cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() + c := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(object...).Build() context := &clusterd.Context{ - Executor: executor, - Client: cl, + Executor: executor, + Client: c, + Clientset: testexec.New(t, 2), } // Initializing an OSD monitoring - osdMon := NewOSDHealthMonitor(context, clusterInfo, true, cephv1.CephClusterHealthCheckSpec{}) + osdHealthMonitor := NewOSDHealthMonitor(context, clusterInfo, true, cephv1.CephClusterHealthCheckSpec{}) + + t.Run("verify ssd device class added to storage status", func(t *testing.T) { + osdHealthMonitor.updateCephStorageStatus() + err := context.Client.Get(clusterInfo.Context, clusterInfo.NamespacedName(), cephCluster) + assert.NoError(t, err) + assert.Equal(t, 1, len(cephCluster.Status.CephStorage.DeviceClasses)) + assert.Equal(t, "ssd", cephCluster.Status.CephStorage.DeviceClasses[0].Name) + }) + + t.Run("verify bluestore OSD count in storage status", func(t *testing.T) { + labels := map[string]string{ + k8sutil.AppAttr: AppName, + k8sutil.ClusterAttr: clusterInfo.Namespace, + OsdIdLabelKey: "0", + osdStore: "bluestore", + } - // Run OSD monitoring routine - err := osdMon.checkDeviceClasses() - assert.Nil(t, err) - // checkDeviceClasses has 1 mocked cmd for fetching the device classes - assert.Equal(t, 1, execCount) + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "osd0", + Namespace: clusterInfo.Namespace, + Labels: labels, + }, + } + if _, err := context.Clientset.AppsV1().Deployments(clusterInfo.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { + logger.Errorf("Error creating fake deployment: %v", err) + } + osdHealthMonitor.updateCephStorageStatus() + err := context.Client.Get(clusterInfo.Context, clusterInfo.NamespacedName(), cephCluster) + assert.NoError(t, err) + assert.Equal(t, 1, len(cephCluster.Status.CephStorage.DeviceClasses)) + assert.Equal(t, "ssd", cephCluster.Status.CephStorage.DeviceClasses[0].Name) + assert.Equal(t, 1, cephCluster.Status.CephStorage.OSD.StoreType["bluestore"]) + assert.Equal(t, 0, cephCluster.Status.CephStorage.OSD.StoreType["bluestore-rdr"]) + + }) + + t.Run("verify bluestoreRDR OSD count in storage status", func(t *testing.T) { + labels := map[string]string{ + k8sutil.AppAttr: AppName, + k8sutil.ClusterAttr: clusterInfo.Namespace, + OsdIdLabelKey: "1", + osdStore: "bluestore-rdr", + } + + deployment := &apps.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "osd1", + Namespace: clusterInfo.Namespace, + Labels: labels, + }, + } + if _, err := context.Clientset.AppsV1().Deployments(clusterInfo.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { + logger.Errorf("Error creating fake deployment: %v", err) + } + osdHealthMonitor.updateCephStorageStatus() + err := context.Client.Get(clusterInfo.Context, clusterInfo.NamespacedName(), cephCluster) + assert.NoError(t, err) + assert.Equal(t, 1, len(cephCluster.Status.CephStorage.DeviceClasses)) + assert.Equal(t, "ssd", cephCluster.Status.CephStorage.DeviceClasses[0].Name) + assert.Equal(t, 1, cephCluster.Status.CephStorage.OSD.StoreType["bluestore-rdr"]) + assert.Equal(t, 1, cephCluster.Status.CephStorage.OSD.StoreType["bluestore"]) + }) } diff --git a/pkg/operator/ceph/cluster/osd/labels.go b/pkg/operator/ceph/cluster/osd/labels.go index b3e7e6471f83..f663311f0082 100644 --- a/pkg/operator/ceph/cluster/osd/labels.go +++ b/pkg/operator/ceph/cluster/osd/labels.go @@ -59,7 +59,7 @@ func (c *Cluster) 
getOSDLabels(osd OSDInfo, failureDomainValue string, portable labels[FailureDomainKey] = failureDomainValue labels[portableKey] = strconv.FormatBool(portable) labels[deviceClass] = osd.DeviceClass - labels[osdStore] = c.spec.Storage.GetOSDStore() + labels[osdStore] = osd.Store for k, v := range getOSDTopologyLocationLabels(osd.Location) { labels[k] = v diff --git a/pkg/operator/ceph/cluster/osd/osd.go b/pkg/operator/ceph/cluster/osd/osd.go index bd8a62ade919..7df6b01695c0 100644 --- a/pkg/operator/ceph/cluster/osd/osd.go +++ b/pkg/operator/ceph/cluster/osd/osd.go @@ -34,6 +34,7 @@ import ( "github.com/rook/rook/pkg/operator/ceph/controller" cephver "github.com/rook/rook/pkg/operator/ceph/version" "github.com/rook/rook/pkg/operator/k8sutil" + "github.com/rook/rook/pkg/util" "github.com/coreos/pkg/capnslog" "github.com/pkg/errors" @@ -46,7 +47,9 @@ import ( ) var ( - logger = capnslog.NewPackageLogger("github.com/rook/rook", "op-osd") + logger = capnslog.NewPackageLogger("github.com/rook/rook", "op-osd") + waitForHealthyPGInterval = 10 * time.Second + waitForHealthyPGTimeout = 15 * time.Minute ) const ( @@ -79,6 +82,7 @@ type Cluster struct { ValidStorage cephv1.StorageScopeSpec // valid subset of `Storage`, computed at runtime kv *k8sutil.ConfigMapKVStore deviceSets []deviceSet + replaceOSD *OSDReplaceInfo } // New creates an instance of the OSD manager @@ -112,6 +116,7 @@ type OSDInfo struct { TopologyAffinity string `json:"topologyAffinity"` Encrypted bool `json:"encrypted"` ExportService bool `json:"exportService"` + NodeName string `json:"nodeName"` } // OrchestrationStatus represents the status of an OSD orchestration @@ -193,16 +198,24 @@ func (c *Cluster) Start() error { } logger.Infof("wait timeout for healthy OSDs during upgrade or restart is %q", c.clusterInfo.OsdUpgradeTimeout) - // prepare for updating existing OSDs - updateQueue, deployments, err := c.getOSDUpdateInfo(errs) + // identify OSDs that need migration to a new backend store + err := c.replaceOSDForNewStore() if err != nil { - return errors.Wrapf(err, "failed to get information about currently-running OSD Deployments in namespace %q", namespace) + return errors.Wrapf(err, "failed to replace an OSD that needs migration to new backend store in namespace %q", namespace) } + osdsToSkipReconcile, err := c.getOSDsToSkipReconcile() if err != nil { logger.Warningf("failed to get osds to skip reconcile. 
%v", err) } - logger.Debugf("%d of %d OSD Deployments need updated", updateQueue.Len(), deployments.Len()) + + // prepare for updating existing OSDs + updateQueue, deployments, err := c.getOSDUpdateInfo(errs) + if err != nil { + return errors.Wrapf(err, "failed to get information about currently-running OSD Deployments in namespace %q", namespace) + } + + logger.Debugf("%d of %d OSD Deployments need update", updateQueue.Len(), deployments.Len()) updateConfig := c.newUpdateConfig(config, updateQueue, deployments, osdsToSkipReconcile) // prepare for creating new OSDs @@ -247,6 +260,22 @@ func (c *Cluster) Start() error { return errors.Wrapf(err, "failed to reconcile key rotation cron jobs") } + if c.replaceOSD != nil { + delOpts := &k8sutil.DeleteOptions{MustDelete: true, WaitOptions: k8sutil.WaitOptions{Wait: true}} + err := k8sutil.DeleteConfigMap(c.clusterInfo.Context, c.context.Clientset, OSDReplaceConfigName, namespace, delOpts) + if err != nil { + return errors.Wrapf(err, "failed to delete the %q configmap", OSDReplaceConfigName) + } + + // Wait for PGs to be healthy before continuing the reconcile + _, err = c.waitForHealthyPGs() + if err != nil { + return errors.Wrapf(err, "failed to wait for pgs to be healhty") + } + + return errors.New("reconcile operator to replace OSDs that are pending migration") + } + logger.Infof("finished running OSDs in namespace %q", namespace) return nil } @@ -496,6 +525,9 @@ func (c *Cluster) getOSDInfo(d *appsv1.Deployment) (OSDInfo, error) { isPVC := false for _, envVar := range d.Spec.Template.Spec.Containers[0].Env { + if envVar.Name == "ROOK_NODE_NAME" { + osd.NodeName = envVar.Value + } if envVar.Name == "ROOK_OSD_UUID" { osd.UUID = envVar.Value } @@ -577,6 +609,8 @@ func (c *Cluster) getOSDInfo(d *appsv1.Deployment) (OSDInfo, error) { return OSDInfo{}, errors.Errorf("failed to get required osdInfo. %+v", osd) } + osd.Store = d.Labels[osdStore] + return osd, nil } @@ -779,3 +813,61 @@ func (c *Cluster) applyUpgradeOSDFunctionality() { } } } + +// replaceOSDForNewStore deletes an existing OSD deployment that does not not match the expected OSD backend store provided in the storage spec +func (c *Cluster) replaceOSDForNewStore() error { + if c.spec.Storage.Store.UpdateStore != OSDStoreUpdateConfirmation { + logger.Debugf("no OSD migration to a new backend store is requested") + return nil + } + + osdToReplace, err := c.getOSDReplaceInfo() + if err != nil { + return errors.Wrapf(err, "failed to get OSD replace info") + } + + if osdToReplace == nil { + logger.Info("no osd to replace") + return nil + } + + logger.Infof("starting migration of the OSD.%d", osdToReplace.ID) + + // Delete the OSD deployment + deploymentName := fmt.Sprintf("rook-ceph-osd-%d", osdToReplace.ID) + logger.Infof("removing the OSD deployment %q", deploymentName) + if err := k8sutil.DeleteDeployment(c.clusterInfo.Context, c.context.Clientset, c.clusterInfo.Namespace, deploymentName); err != nil { + if err != nil { + if kerrors.IsNotFound(err) { + logger.Debugf("osd deployment %q not found. 
Ignoring since object must be deleted.", deploymentName) + } else { + return errors.Wrapf(err, "failed to delete OSD deployment %q.", deploymentName) + } + } + } + + c.replaceOSD = osdToReplace + + return osdToReplace.saveAsConfig(c.context, c.clusterInfo) +} + +func (c *Cluster) waitForHealthyPGs() (bool, error) { + waitFunc := func() (done bool, err error) { + pgHealthMsg, pgClean, err := cephclient.IsClusterClean(c.context, c.clusterInfo) + if err != nil { + return false, errors.Wrap(err, "failed to check pg are healthy") + } + if pgClean { + return true, nil + } + logger.Infof("waiting for PGs to be healthy after replacing an OSD, status: %q", pgHealthMsg) + return false, nil + } + + err := util.RetryWithTimeout(waitFunc, waitForHealthyPGInterval, waitForHealthyPGTimeout, "pgs to be healthy") + if err != nil { + return false, err + } + + return true, nil +} diff --git a/pkg/operator/ceph/cluster/osd/osd_test.go b/pkg/operator/ceph/cluster/osd/osd_test.go index 2e8bedf166e8..d9d84a0eff9c 100644 --- a/pkg/operator/ceph/cluster/osd/osd_test.go +++ b/pkg/operator/ceph/cluster/osd/osd_test.go @@ -18,6 +18,8 @@ package osd import ( "context" + "encoding/json" + "fmt" "testing" "github.com/pkg/errors" @@ -45,6 +47,11 @@ import ( k8stesting "k8s.io/client-go/testing" ) +const ( + healthyCephStatus = `{"fsid":"877a47e0-7f6c-435e-891a-76983ab8c509","health":{"checks":{},"status":"HEALTH_OK"},"election_epoch":12,"quorum":[0,1,2],"quorum_names":["a","b","c"],"monmap":{"epoch":3,"fsid":"877a47e0-7f6c-435e-891a-76983ab8c509","modified":"2020-11-02 09:58:23.015313","created":"2020-11-02 09:57:37.719235","min_mon_release":14,"min_mon_release_name":"nautilus","features":{"persistent":["kraken","luminous","mimic","osdmap-prune","nautilus"],"optional":[]},"mons":[{"rank":0,"name":"a","public_addrs":{"addrvec":[{"type":"v2","addr":"172.30.74.42:3300","nonce":0},{"type":"v1","addr":"172.30.74.42:6789","nonce":0}]},"addr":"172.30.74.42:6789/0","public_addr":"172.30.74.42:6789/0"},{"rank":1,"name":"b","public_addrs":{"addrvec":[{"type":"v2","addr":"172.30.101.61:3300","nonce":0},{"type":"v1","addr":"172.30.101.61:6789","nonce":0}]},"addr":"172.30.101.61:6789/0","public_addr":"172.30.101.61:6789/0"},{"rank":2,"name":"c","public_addrs":{"addrvec":[{"type":"v2","addr":"172.30.250.55:3300","nonce":0},{"type":"v1","addr":"172.30.250.55:6789","nonce":0}]},"addr":"172.30.250.55:6789/0","public_addr":"172.30.250.55:6789/0"}]},"osdmap":{"osdmap":{"epoch":19,"num_osds":3,"num_up_osds":3,"num_in_osds":3,"num_remapped_pgs":0}},"pgmap":{"pgs_by_state":[{"state_name":"active+clean","count":96}],"num_pgs":96,"num_pools":3,"num_objects":79,"data_bytes":81553681,"bytes_used":3255447552,"bytes_avail":1646011994112,"bytes_total":1649267441664,"read_bytes_sec":853,"write_bytes_sec":5118,"read_op_per_sec":1,"write_op_per_sec":0},"fsmap":{"epoch":9,"id":1,"up":1,"in":1,"max":1,"by_rank":[{"filesystem_id":1,"rank":0,"name":"ocs-storagecluster-cephfilesystem-b","status":"up:active","gid":14161},{"filesystem_id":1,"rank":0,"name":"ocs-storagecluster-cephfilesystem-a","status":"up:standby-replay","gid":24146}],"up:standby":0},"mgrmap":{"epoch":10,"active_gid":14122,"active_name":"a","active_addrs":{"addrvec":[{"type":"v2","addr":"10.131.0.28:6800","nonce":1},{"type":"v1","addr":"10.131.0.28:6801","nonce":1}]}}}` + unHealthyCephStatus = `{"fsid":"613975f3-3025-4802-9de1-a2280b950e75","health":{"checks":{"OSD_DOWN":{"severity":"HEALTH_WARN","summary":{"message":"1 osds 
down"}},"OSD_HOST_DOWN":{"severity":"HEALTH_WARN","summary":{"message":"1 host (1 osds) down"}},"PG_AVAILABILITY":{"severity":"HEALTH_WARN","summary":{"message":"Reduced data availability: 101 pgs stale"}},"POOL_APP_NOT_ENABLED":{"severity":"HEALTH_WARN","summary":{"message":"application not enabled on 1 pool(s)"}}},"status":"HEALTH_WARN","overall_status":"HEALTH_WARN"},"election_epoch":12,"quorum":[0,1,2],"quorum_names":["rook-ceph-mon0","rook-ceph-mon2","rook-ceph-mon1"],"monmap":{"epoch":3,"fsid":"613975f3-3025-4802-9de1-a2280b950e75","modified":"2017-08-11 20:13:02.075679","created":"2017-08-11 20:12:35.314510","features":{"persistent":["kraken","luminous"],"optional":[]},"mons":[{"rank":0,"name":"rook-ceph-mon0","addr":"10.3.0.45:6789/0","public_addr":"10.3.0.45:6789/0"},{"rank":1,"name":"rook-ceph-mon2","addr":"10.3.0.249:6789/0","public_addr":"10.3.0.249:6789/0"},{"rank":2,"name":"rook-ceph-mon1","addr":"10.3.0.252:6789/0","public_addr":"10.3.0.252:6789/0"}]},"osdmap":{"osdmap":{"epoch":17,"num_osds":2,"num_up_osds":1,"num_in_osds":2,"full":false,"nearfull":true,"num_remapped_pgs":0}},"pgmap":{"pgs_by_state":[{"state_name":"stale+active+clean","count":101},{"state_name":"active+clean","count":99}],"num_pgs":200,"num_pools":2,"num_objects":243,"data_bytes":976793635,"bytes_used":13611479040,"bytes_avail":19825307648,"bytes_total":33436786688},"fsmap":{"epoch":1,"by_rank":[]},"mgrmap":{"epoch":3,"active_gid":14111,"active_name":"rook-ceph-mgr0","active_addr":"10.2.73.6:6800/9","available":true,"standbys":[],"modules":["restful","status"],"available_modules":["dashboard","prometheus","restful","status","zabbix"]},"servicemap":{"epoch":1,"modified":"0.000000","services":{}}}` +) + func TestOSDProperties(t *testing.T) { osdProps := []osdProperties{ {pvc: corev1.PersistentVolumeClaimVolumeSource{ClaimName: "claim"}, @@ -640,3 +647,169 @@ func TestGetOSDInfoWithCustomRoot(t *testing.T) { _, err := c.getOSDInfo(d3) assert.Error(t, err) } + +func TestReplaceOSDForNewStore(t *testing.T) { + clusterInfo := &cephclient.ClusterInfo{Namespace: "ns", Context: context.TODO()} + clusterInfo.SetName("test") + clusterInfo.OwnerInfo = cephclient.NewMinimumOwnerInfo(t) + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "status" { + return healthyCephStatus, nil + } + return "", errors.Errorf("unexpected ceph command '%v'", args) + }, + } + clientset := fake.NewSimpleClientset() + context := &clusterd.Context{ + Clientset: clientset, + Executor: executor, + } + + t.Run("no osd migration is requested in the cephcluster spec", func(t *testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{}, + }, + } + c := New(context, clusterInfo, spec, "myversion") + err := c.replaceOSDForNewStore() + assert.NoError(t, err) + assert.Nil(t, c.replaceOSD) + }) + + t.Run("migration is requested but no osd pods are running", func(t *testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + UpdateStore: "yes-really-update-store", + }, + }, + } + c := New(context, clusterInfo, spec, "myversion") + err := c.replaceOSDForNewStore() + assert.NoError(t, err) + assert.Nil(t, c.replaceOSD) + }) + + t.Run("migration is requested but all OSDs are running on expected backed store", func(t *testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: 
cephv1.OSDStore{ + Type: "bluestore-rdr", + UpdateStore: "yes-really-update-store", + }, + }, + } + c := New(context, clusterInfo, spec, "myversion") + d := getDummyDeploymentOnNode(clientset, c, "node2", 0) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + err := c.replaceOSDForNewStore() + assert.NoError(t, err) + assert.Nil(t, c.replaceOSD) + }) + + t.Run("migration is requested and one OSD on node is running legacy backend store", func(t *testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + UpdateStore: "yes-really-update-store", + }, + }, + } + c := New(context, clusterInfo, spec, "myversion") + // create osd deployment with `bluestore` backend store + d := getDummyDeploymentOnNode(clientset, c, "node2", 1) + createDeploymentOrPanic(clientset, d) + err := c.replaceOSDForNewStore() + assert.NoError(t, err) + assert.NotNil(t, c.replaceOSD) + assert.Equal(t, 1, c.replaceOSD.ID) + assert.Equal(t, "node2", c.replaceOSD.Node) + + // assert that OSD.1 deployment got deleted + _, err = clientset.AppsV1().Deployments(clusterInfo.Namespace).Get(clusterInfo.Context, deploymentName(1), metav1.GetOptions{}) + assert.Equal(t, true, k8serrors.IsNotFound(err)) + + // validate the osd replace config map + actualCM, err := clientset.CoreV1().ConfigMaps(clusterInfo.Namespace).Get(clusterInfo.Context, OSDReplaceConfigName, metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, actualCM) + expectedOSDInfo := OSDReplaceInfo{} + err = json.Unmarshal([]byte(actualCM.Data[OSDReplaceConfigKey]), &expectedOSDInfo) + assert.NoError(t, err) + assert.Equal(t, 1, expectedOSDInfo.ID) + assert.Equal(t, "node2", expectedOSDInfo.Node) + + // delete configmap + err = k8sutil.DeleteConfigMap(clusterInfo.Context, clientset, OSDReplaceConfigName, clusterInfo.Namespace, &k8sutil.DeleteOptions{}) + assert.NoError(t, err) + }) + + t.Run("migration is requested and one osd on pvc is running on legacy backend store", func(t *testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + UpdateStore: "yes-really-update-store", + }, + }, + } + c := New(context, clusterInfo, spec, "myversion") + d := getDummyDeploymentOnPVC(clientset, c, "pvc1", 2) + createDeploymentOrPanic(clientset, d) + err := c.replaceOSDForNewStore() + assert.NoError(t, err) + fmt.Printf("%+v", c.replaceOSD) + assert.NotNil(t, c.replaceOSD) + assert.Equal(t, 2, c.replaceOSD.ID) + assert.Equal(t, "/some/path", c.replaceOSD.Path) + + // assert that OSD.2 deployment got deleted + _, err = clientset.AppsV1().Deployments(clusterInfo.Namespace).Get(clusterInfo.Context, deploymentName(2), metav1.GetOptions{}) + assert.Equal(t, true, k8serrors.IsNotFound(err)) + + // validate the osd replace config map + actualCM, err := clientset.CoreV1().ConfigMaps(clusterInfo.Namespace).Get(clusterInfo.Context, OSDReplaceConfigName, metav1.GetOptions{}) + assert.NoError(t, err) + assert.NotNil(t, actualCM) + expectedOSDInfo := OSDReplaceInfo{} + err = json.Unmarshal([]byte(actualCM.Data[OSDReplaceConfigKey]), &expectedOSDInfo) + assert.NoError(t, err) + assert.Equal(t, 2, expectedOSDInfo.ID) + assert.Equal(t, "/some/path", c.replaceOSD.Path) + + // delete configmap + err = k8sutil.DeleteConfigMap(clusterInfo.Context, clientset, OSDReplaceConfigName, clusterInfo.Namespace, &k8sutil.DeleteOptions{}) + assert.NoError(t, err) + }) + + t.Run("migration is requested but pgs are not clean", func(t 
*testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + UpdateStore: "yes-really-update-store", + }, + }, + } + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "status" { + return unHealthyCephStatus, nil + } + return "", errors.Errorf("unexpected ceph command '%v'", args) + }, + } + context.Executor = executor + c := New(context, clusterInfo, spec, "myversion") + err := c.replaceOSDForNewStore() + assert.NoError(t, err) + assert.Nil(t, c.replaceOSD) + }) +} diff --git a/pkg/operator/ceph/cluster/osd/provision_spec.go b/pkg/operator/ceph/cluster/osd/provision_spec.go index 4f586e4cf94e..9e3673dfbb0b 100644 --- a/pkg/operator/ceph/cluster/osd/provision_spec.go +++ b/pkg/operator/ceph/cluster/osd/provision_spec.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "path" + "strings" "github.com/pkg/errors" cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" @@ -303,6 +304,22 @@ func (c *Cluster) provisionOSDContainer(osdProps osdProperties, copyBinariesMoun volumeMounts = append(volumeMounts, v1.VolumeMount{Name: "rootfs", MountPath: "/rootfs", ReadOnly: true}) } + // Add OSD ID as environment variables. + // When this env is set, prepare pod job will destroy this OSD. + if c.replaceOSD != nil { + // Compare pvc claim name in case of OSDs on PVC + if osdProps.onPVC() { + if strings.Contains(c.replaceOSD.Path, osdProps.pvc.ClaimName) { + envVars = append(envVars, replaceOSDIDEnvVar(fmt.Sprint(c.replaceOSD.ID))) + } + } else { + // Compare the node name in case of OSDs on disk + if c.replaceOSD.Node == osdProps.crushHostname { + envVars = append(envVars, replaceOSDIDEnvVar(fmt.Sprint(c.replaceOSD.ID))) + } + } + } + // run privileged always since we always mount /dev privileged := true runAsUser := int64(0) diff --git a/pkg/operator/ceph/cluster/osd/replace.go b/pkg/operator/ceph/cluster/osd/replace.go new file mode 100644 index 000000000000..9fe6bc913533 --- /dev/null +++ b/pkg/operator/ceph/cluster/osd/replace.go @@ -0,0 +1,167 @@ +/* +Copyright 2023 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package osd for the Ceph OSDs. 
+package osd + +import ( + "encoding/json" + "fmt" + + "github.com/pkg/errors" + "github.com/rook/rook/pkg/clusterd" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" + "github.com/rook/rook/pkg/operator/k8sutil" + v1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + OSDReplaceConfigName = "osd-replace-config" + OSDReplaceConfigKey = "config" + OSDStoreUpdateConfirmation = "yes-really-update-store" +) + +// OSDReplaceInfo represents an OSD that needs to replaced +type OSDReplaceInfo struct { + ID int `json:"id"` + Path string `json:"path"` + Node string `json:"node"` +} + +func (o *OSDReplaceInfo) saveAsConfig(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo) error { + configStr, err := o.string() + if err != nil { + return errors.Wrapf(err, "failed to convert osd replace config to string") + } + + newConfigMap := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: OSDReplaceConfigName, + Namespace: clusterInfo.Namespace, + }, + Data: map[string]string{ + OSDReplaceConfigKey: configStr, + }, + } + + err = clusterInfo.OwnerInfo.SetControllerReference(newConfigMap) + if err != nil { + return errors.Wrapf(err, "failed to set owner reference on %q configMap", newConfigMap.Name) + } + + _, err = k8sutil.CreateOrUpdateConfigMap(clusterInfo.Context, context.Clientset, newConfigMap) + if err != nil { + return errors.Wrapf(err, "failed to create or update %q configMap", newConfigMap.Name) + } + + return nil +} + +func (o *OSDReplaceInfo) string() (string, error) { + configInBytes, err := json.Marshal(o) + if err != nil { + return "", errors.Wrap(err, "failed to marshal osd replace config") + } + + return string(configInBytes), nil +} + +// getOSDReplaceInfo returns an existing OSD that needs to be replaced for a new backend store +func (c *Cluster) getOSDReplaceInfo() (*OSDReplaceInfo, error) { + osdReplaceInfo, err := GetOSDReplaceConfigMap(c.context, c.clusterInfo) + if err != nil { + return nil, errors.Wrap(err, "failed to get any existing OSD in replace configmap") + } + + if osdReplaceInfo != nil { + return osdReplaceInfo, nil + } + + pgHealthMsg, pgClean, err := cephclient.IsClusterClean(c.context, c.clusterInfo) + if err != nil { + return nil, errors.Wrapf(err, "failed to check if the pgs are clean before replacing OSDs") + } + if !pgClean { + logger.Warningf("skipping OSD replacement because pgs are not healthy. 
PG status: %q", pgHealthMsg) + return nil, nil + } + + logger.Infof("placement group status: %q", pgHealthMsg) + + osdsToReplace, err := c.getOSDWithNonMatchingStore() + if err != nil { + return nil, errors.Wrap(err, "failed to list out OSDs with non matching backend store") + } + + if len(osdsToReplace) == 0 { + logger.Infof("all osds have already been migrated to backend store %q", c.spec.Storage.Store.Type) + return nil, nil + } + + logger.Infof("%d osd(s) require migration to new backend store %q.", len(osdsToReplace), c.spec.Storage.Store.Type) + + return &osdsToReplace[0], nil +} + +// getOSDWithNonMatchingStore returns OSDs with osd-store label different from expected store in cephCluster spec +func (c *Cluster) getOSDWithNonMatchingStore() ([]OSDReplaceInfo, error) { + listOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", k8sutil.AppAttr, AppName)} + deployments, err := c.context.Clientset.AppsV1().Deployments(c.clusterInfo.Namespace).List(c.clusterInfo.Context, listOpts) + if err != nil { + return nil, errors.Wrap(err, "failed to query OSDs to skip reconcile") + } + + osdReplaceList := []OSDReplaceInfo{} + for i := range deployments.Items { + if osdStore, ok := deployments.Items[i].Labels[osdStore]; ok { + if osdStore != string(c.spec.Storage.Store.Type) { + osdInfo, err := c.getOSDInfo(&deployments.Items[i]) + if err != nil { + return nil, errors.Wrapf(err, "failed to details about the OSD %q", deployments.Items[i].Name) + } + osdReplaceList = append(osdReplaceList, OSDReplaceInfo{ID: osdInfo.ID, Path: osdInfo.BlockPath, Node: osdInfo.NodeName}) + } + } + } + + return osdReplaceList, nil +} + +// GetOSDReplaceConfigMap returns the OSD replace config map +func GetOSDReplaceConfigMap(context *clusterd.Context, clusterInfo *cephclient.ClusterInfo) (*OSDReplaceInfo, error) { + cm, err := context.Clientset.CoreV1().ConfigMaps(clusterInfo.Namespace).Get(clusterInfo.Context, OSDReplaceConfigName, metav1.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return nil, nil + } + } + + configStr, ok := cm.Data[OSDReplaceConfigKey] + if !ok || configStr == "" { + logger.Debugf("empty config map %q", OSDReplaceConfigName) + return nil, nil + } + + config := &OSDReplaceInfo{} + err = json.Unmarshal([]byte(configStr), config) + if err != nil { + return nil, errors.Wrapf(err, "failed to JSON unmarshal osd replace status from the (%q)", configStr) + } + + return config, nil +} diff --git a/pkg/operator/ceph/cluster/osd/replace_test.go b/pkg/operator/ceph/cluster/osd/replace_test.go new file mode 100644 index 000000000000..e47e8e12a75e --- /dev/null +++ b/pkg/operator/ceph/cluster/osd/replace_test.go @@ -0,0 +1,287 @@ +/* +Copyright 2023 The Rook Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package osd for the Ceph OSDs. 
+package osd + +import ( + "context" + "testing" + + "github.com/pkg/errors" + cephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1" + "github.com/rook/rook/pkg/clusterd" + cephclient "github.com/rook/rook/pkg/daemon/ceph/client" + "github.com/rook/rook/pkg/operator/k8sutil" + exectest "github.com/rook/rook/pkg/util/exec/test" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestGetOSDWithNonMatchingStoreOnNodes(t *testing.T) { + namespace := "rook-ceph" + namespace2 := "rook-ceph2" + clientset := fake.NewSimpleClientset() + ctx := &clusterd.Context{ + Clientset: clientset, + } + clusterInfo := &cephclient.ClusterInfo{ + Namespace: namespace, + Context: context.TODO(), + } + clusterInfo.SetName("mycluster") + clusterInfo.OwnerInfo = cephclient.NewMinimumOwnerInfo(t) + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + }, + }, + } + c := New(ctx, clusterInfo, spec, "rook/rook:master") + + var d *appsv1.Deployment + + t.Run("all osd deployments are running on bluestore-rdr osd store", func(t *testing.T) { + d = getDummyDeploymentOnNode(clientset, c, "node2", 0) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnNode(clientset, c, "node3", 1) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnNode(clientset, c, "node4", 2) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + osdList, err := c.getOSDWithNonMatchingStore() + assert.NoError(t, err) + assert.Equal(t, 0, len(osdList)) + }) + + t.Run("all osd deployments are not running on bluestore-rdr store", func(t *testing.T) { + c.clusterInfo.Namespace = namespace2 + + // osd.0 is still using bluestore + d = getDummyDeploymentOnNode(clientset, c, "node2", 0) + createDeploymentOrPanic(clientset, d) + + // osd.1 and osd.2 are using `bluestore-rdr` + d = getDummyDeploymentOnNode(clientset, c, "node3", 1) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnNode(clientset, c, "node4", 2) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + osdList, err := c.getOSDWithNonMatchingStore() + assert.NoError(t, err) + assert.Equal(t, 1, len(osdList)) + assert.Equal(t, 0, osdList[0].ID) + assert.Equal(t, "node2", osdList[0].Node) + assert.Equal(t, "/dev/vda", osdList[0].Path) + }) +} + +func TestGetOSDWithNonMatchingStoreOnPVCs(t *testing.T) { + namespace := "rook-ceph" + namespace2 := "rook-ceph2" + clientset := fake.NewSimpleClientset() + ctx := &clusterd.Context{ + Clientset: clientset, + } + clusterInfo := &cephclient.ClusterInfo{ + Namespace: namespace, + Context: context.TODO(), + } + clusterInfo.SetName("mycluster") + clusterInfo.OwnerInfo = cephclient.NewMinimumOwnerInfo(t) + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + }, + }, + } + c := New(ctx, clusterInfo, spec, "rook/rook:master") + + var d *appsv1.Deployment + + t.Run("all osd deployments are running on bluestore-rdr osd store", func(t *testing.T) { + d = getDummyDeploymentOnPVC(clientset, c, "pvc0", 0) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnPVC(clientset, c, "pvc1", 1) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnPVC(clientset, c, 
"pvc2", 2) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + osdList, err := c.getOSDWithNonMatchingStore() + assert.NoError(t, err) + assert.Equal(t, 0, len(osdList)) + }) + + t.Run("all osd deployments are not running on bluestore-rdr store", func(t *testing.T) { + c.clusterInfo.Namespace = namespace2 + + // osd.0 is still using `bluestore` + d = getDummyDeploymentOnPVC(clientset, c, "pvc0", 0) + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnPVC(clientset, c, "pvc1", 1) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + d = getDummyDeploymentOnPVC(clientset, c, "pvc2", 2) + d.Labels[osdStore] = "bluestore-rdr" + createDeploymentOrPanic(clientset, d) + + osdList, err := c.getOSDWithNonMatchingStore() + assert.NoError(t, err) + assert.Equal(t, 1, len(osdList)) + assert.Equal(t, 0, osdList[0].ID) + assert.Equal(t, "/some/path", osdList[0].Path) + }) +} + +func TestGetOSDReplaceInfo(t *testing.T) { + namespace := "rook-ceph" + clientset := fake.NewSimpleClientset() + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "status" { + return healthyCephStatus, nil + } + return "", errors.Errorf("unexpected ceph command '%v'", args) + }, + } + ctx := &clusterd.Context{ + Clientset: clientset, + Executor: executor, + } + clusterInfo := &cephclient.ClusterInfo{ + Namespace: namespace, + Context: context.TODO(), + } + clusterInfo.SetName("mycluster") + clusterInfo.OwnerInfo = cephclient.NewMinimumOwnerInfo(t) + + spec := cephv1.ClusterSpec{} + c := New(ctx, clusterInfo, spec, "rook/rook:master") + + t.Run("no OSD replace info available", func(t *testing.T) { + actualOSDInfo, err := c.getOSDReplaceInfo() + assert.NoError(t, err) + assert.Nil(t, actualOSDInfo) + }) + + t.Run("ensure no OSD to replace if pgs are not healthy", func(t *testing.T) { + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "status" { + return unHealthyCephStatus, nil + } + return "", errors.Errorf("unexpected ceph command '%v'", args) + }, + } + + ctx.Executor = executor + c := New(ctx, clusterInfo, spec, "rook/rook:master") + actualOSDInfo, err := c.getOSDReplaceInfo() + assert.NoError(t, err) + assert.Nil(t, actualOSDInfo) + }) + + t.Run("read OSD 0 replace info from config map", func(t *testing.T) { + actualOSDInfo := &OSDReplaceInfo{ID: 0, Path: "/dev/sda", Node: "node1"} + err := actualOSDInfo.saveAsConfig(c.context, c.clusterInfo) + assert.NoError(t, err) + expectedOSDInfo, err := c.getOSDReplaceInfo() + assert.NoError(t, err) + assert.NotNil(t, expectedOSDInfo) + assert.Equal(t, 0, expectedOSDInfo.ID) + err = k8sutil.DeleteConfigMap(clusterInfo.Context, ctx.Clientset, OSDReplaceConfigName, namespace, &k8sutil.DeleteOptions{}) + assert.NoError(t, err) + }) + + t.Run("ensure no OSD replace info if all OSDs using expected OSD store", func(t *testing.T) { + spec := cephv1.ClusterSpec{ + Storage: cephv1.StorageScopeSpec{ + Store: cephv1.OSDStore{ + Type: "bluestore-rdr", + UpdateStore: "yes-really-update-store", + }, + }, + } + executor := &exectest.MockExecutor{ + MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) { + logger.Infof("Command: %s %v", command, args) + if args[0] == "status" { + return healthyCephStatus, nil + } + return "", 
+			},
+		}
+
+		ctx.Executor = executor
+		c := New(ctx, clusterInfo, spec, "myversion")
+		d := getDummyDeploymentOnNode(clientset, c, "node2", 1)
+		d.Labels[osdStore] = "bluestore-rdr"
+		createDeploymentOrPanic(clientset, d)
+		expectedOSDInfo, err := c.getOSDReplaceInfo()
+		assert.NoError(t, err)
+		assert.Nil(t, expectedOSDInfo)
+	})
+
+	t.Run("ensure OSD replace info if OSD store is not up to date", func(t *testing.T) {
+		spec := cephv1.ClusterSpec{
+			Storage: cephv1.StorageScopeSpec{
+				Store: cephv1.OSDStore{
+					Type:        "bluestore-rdr",
+					UpdateStore: "yes-really-update-store",
+				},
+			},
+		}
+		executor := &exectest.MockExecutor{
+			MockExecuteCommandWithOutput: func(command string, args ...string) (string, error) {
+				logger.Infof("Command: %s %v", command, args)
+				if args[0] == "status" {
+					return healthyCephStatus, nil
+				}
+				return "", errors.Errorf("unexpected ceph command '%v'", args)
+			},
+		}
+
+		ctx.Executor = executor
+		c := New(ctx, clusterInfo, spec, "myversion")
+		d := getDummyDeploymentOnNode(clientset, c, "node3", 2)
+		createDeploymentOrPanic(clientset, d)
+		expectedOSDInfo, err := c.getOSDReplaceInfo()
+		assert.NoError(t, err)
+		assert.NotNil(t, expectedOSDInfo)
+		assert.Equal(t, 2, expectedOSDInfo.ID)
+		assert.Equal(t, "node3", expectedOSDInfo.Node)
+	})
+}
diff --git a/pkg/operator/ceph/cluster/osd/spec_test.go b/pkg/operator/ceph/cluster/osd/spec_test.go
index c566554db0ff..2361c673dd30 100644
--- a/pkg/operator/ceph/cluster/osd/spec_test.go
+++ b/pkg/operator/ceph/cluster/osd/spec_test.go
@@ -718,6 +718,7 @@ func getDummyDeploymentOnPVC(clientset *fake.Clientset, c *Cluster, pvcName stri
 		UUID:      "some-uuid",
 		BlockPath: "/some/path",
 		CVMode:    "raw",
+		Store:     "bluestore",
 	}
 	c.deviceSets = append(c.deviceSets, deviceSet{
 		Name: pvcName,
@@ -741,6 +742,7 @@ func getDummyDeploymentOnNode(clientset *fake.Clientset, c *Cluster, nodeName st
 		UUID:      "some-uuid",
 		BlockPath: "/dev/vda",
 		CVMode:    "raw",
+		Store:     "bluestore",
 	}
 	c.ValidStorage.Nodes = append(c.ValidStorage.Nodes, cephv1.Node{Name: nodeName})
 	config := c.newProvisionConfig()
diff --git a/pkg/operator/k8sutil/configmap.go b/pkg/operator/k8sutil/configmap.go
index c7754510bb8d..f12ca5f418c2 100644
--- a/pkg/operator/k8sutil/configmap.go
+++ b/pkg/operator/k8sutil/configmap.go
@@ -23,7 +23,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 )
@@ -43,6 +46,31 @@ func DeleteConfigMap(ctx context.Context, clientset kubernetes.Interface, cmName
 	return DeleteResource(delete, verify, resource, opts, defaultWaitOptions)
 }
 
+func CreateOrUpdateConfigMap(ctx context.Context, clientset kubernetes.Interface, cm *v1.ConfigMap) (*v1.ConfigMap, error) {
+	name := cm.GetName()
+	namespace := cm.GetNamespace()
+	existingCm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		if kerrors.IsNotFound(err) {
+			cm, err := clientset.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{})
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to create %q configmap", name)
+			}
+			return cm, nil
+		}
+
+		return nil, errors.Wrapf(err, "failed to retrieve %q configmap.", name)
+	}
+
+	existingCm.Data = cm.Data
+	existingCm.OwnerReferences = cm.OwnerReferences
+	if existingCm, err := clientset.CoreV1().ConfigMaps(namespace).Update(ctx, existingCm, metav1.UpdateOptions{}); err != nil {
+		return nil, errors.Wrapf(err, "failed to update existing %q configmap", existingCm.Name)
+	}
+
+	return existingCm, nil
+}
+
 // GetOperatorSetting gets the operator setting from ConfigMap or Env Var
 // returns defaultValue if setting is not found
 func GetOperatorSetting(context context.Context, clientset kubernetes.Interface, configMapName, settingName, defaultValue string) (string, error) {
diff --git a/pkg/operator/k8sutil/configmap_test.go b/pkg/operator/k8sutil/configmap_test.go
index b8db353f3325..cc03f1fcc691 100644
--- a/pkg/operator/k8sutil/configmap_test.go
+++ b/pkg/operator/k8sutil/configmap_test.go
@@ -137,3 +137,35 @@ func TestLogChangedSettings(t *testing.T) {
 	ok = logChangedSettings(key, value)
 	assert.True(t, true, ok)
 }
+
+func TestCreateOrUpdateConfigMap(t *testing.T) {
+	k8s := fake.NewSimpleClientset()
+	ctx := context.TODO()
+
+	cm := &v1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-configmap",
+			Namespace: "test-namespace",
+		},
+		Data: map[string]string{
+			"test": "data",
+		},
+	}
+
+	_, err := CreateOrUpdateConfigMap(ctx, k8s, cm)
+	assert.NoError(t, err)
+
+	actualCM, err := k8s.CoreV1().ConfigMaps("test-namespace").Get(ctx, "test-configmap", metav1.GetOptions{})
+	assert.NoError(t, err)
+	assert.NotNil(t, actualCM)
+	assert.Equal(t, "data", actualCM.Data["test"])
+
+	// update config map
+	cm.Data["test"] = "updatedData"
+	_, err = CreateOrUpdateConfigMap(ctx, k8s, cm)
+	assert.NoError(t, err)
+	actualCM, err = k8s.CoreV1().ConfigMaps("test-namespace").Get(ctx, "test-configmap", metav1.GetOptions{})
+	assert.NoError(t, err)
+	assert.NotNil(t, actualCM)
+	assert.Equal(t, "updatedData", actualCM.Data["test"])
+}