diff --git a/cli/command/deploy.go b/cli/command/deploy.go index a193d88f3..be38b5cdc 100644 --- a/cli/command/deploy.go +++ b/cli/command/deploy.go @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm, Name: options.poolset, Type: options.poolsetDiskType, } - diskType := options.poolsetDiskType pb := playbook.NewPlaybook(curveadm) for _, step := range steps { @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm, options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs) } else if step == CREATE_LOGICAL_POOL { options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = diskType + options[comm.KEY_POOLSET] = poolset options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs) } diff --git a/cli/command/migrate.go b/cli/command/migrate.go index 310b29cee..33efba765 100644 --- a/cli/command/migrate.go +++ b/cli/command/migrate.go @@ -30,7 +30,10 @@ import ( "github.com/opencurve/curveadm/internal/configure/topology" "github.com/opencurve/curveadm/internal/errno" "github.com/opencurve/curveadm/internal/playbook" - tui "github.com/opencurve/curveadm/internal/tui/common" + "github.com/opencurve/curveadm/internal/task/task/common" + tui "github.com/opencurve/curveadm/internal/tui" + tuicomm "github.com/opencurve/curveadm/internal/tui/common" + cliutil "github.com/opencurve/curveadm/internal/utils" "github.com/spf13/cobra" ) @@ -71,14 +74,12 @@ var ( // chunkserevr (curvebs) MIGRATE_CHUNKSERVER_STEPS = []int{ playbook.BACKUP_ETCD_DATA, - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container + playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, - playbook.CREATE_PHYSICAL_POOL, playbook.START_CHUNKSERVER, - playbook.CREATE_LOGICAL_POOL, + playbook.MARK_SERVER_PENGDDING, } // metaserver (curvefs) @@ -100,12 +101,25 @@ var ( topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS, topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS, } + + MIGRATE_POST_CLEAN_STEPS = []int{ + playbook.STOP_SERVICE, + playbook.CLEAN_SERVICE, // only container + playbook.CREATE_PHYSICAL_POOL, // remove machine that migrate from, only for chunkserver or metaserver + playbook.UPDATE_TOPOLOGY, + } + + GET_MIGRATE_STATUS = []int{ + playbook.GET_MIGRATE_STATUS, + } ) type migrateOptions struct { filename string poolset string poolsetDiskType string + showStatus bool + clean bool } func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command { @@ -125,7 +139,8 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command { flags := cmd.Flags() flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset") flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool") - + flags.BoolVar(&options.showStatus, "status", false, "Show copyset transferring status") + flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver") return cmd } @@ -191,8 +206,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, migrates := getMigrates(curveadm, data) role := migrates[0].From.GetRole() steps := MIGRATE_ROLE_STEPS[role] - poolset := options.poolset - poolsetDiskType := options.poolsetDiskType + + // show status + if options.showStatus { + steps = GET_MIGRATE_STATUS + } + + // post clean + if options.clean { + steps = MIGRATE_POST_CLEAN_STEPS + } + + poolset := configure.Poolset{ + Name: options.poolset, + Type: options.poolsetDiskType, + } pb := playbook.NewPlaybook(curveadm) for _, step := range steps { @@ -204,36 +232,40 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, config = dcs2del case playbook.BACKUP_ETCD_DATA: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) - case CREATE_PHYSICAL_POOL, - CREATE_LOGICAL_POOL: + case + playbook.CREATE_PHYSICAL_POOL, + playbook.CREATE_LOGICAL_POOL, + playbook.MARK_SERVER_PENGDDING, + playbook.GET_MIGRATE_STATUS: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1] } // options - options := map[string]interface{}{} + optionsKV := map[string]interface{}{} switch step { case playbook.CLEAN_SERVICE: - options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER} - options[comm.KEY_CLEAN_BY_RECYCLE] = true + optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER} + optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true + optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true case playbook.CREATE_PHYSICAL_POOL: - options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL - options[comm.KEY_MIGRATE_SERVERS] = migrates - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = poolsetDiskType + optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_POOLSET] = poolset case playbook.CREATE_LOGICAL_POOL: - options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL - options[comm.KEY_MIGRATE_SERVERS] = migrates - options[comm.KEY_NEW_TOPOLOGY_DATA] = data - options[comm.POOLSET] = poolset - options[comm.POOLSET_DISK_TYPE] = poolsetDiskType + optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data + optionsKV[comm.KEY_POOLSET] = poolset case playbook.UPDATE_TOPOLOGY: - options[comm.KEY_NEW_TOPOLOGY_DATA] = data + optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data + case playbook.GET_MIGRATE_STATUS: + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates } pb.AddStep(&playbook.PlaybookStep{ Type: step, Configs: config, - Options: options, + Options: optionsKV, ExecOptions: playbook.ExecOptions{ SilentSubBar: step == playbook.UPDATE_TOPOLOGY, }, @@ -252,6 +284,23 @@ func displayMigrateTitle(curveadm *cli.CurveAdm, data string) { curveadm.WriteOutln(color.YellowString(" - Migrate host: from %s to %s", from.GetHost(), to.GetHost())) } +func displayMigrateStatus(curveadm *cli.CurveAdm) { + var output string + statuses := []common.MigrateStatus{} + v := curveadm.MemStorage().Get(comm.KEY_MIGRATE_STATUS) + if v != nil { + m := v.(map[string]common.MigrateStatus) + for _, status := range m { + statuses = append(statuses, status) + } + } + + output = tui.FormatMigrateStatus(statuses) + + curveadm.WriteOutln("") + curveadm.WriteOut("%s", output) +} + func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { // TODO(P0): added prechek for target host // 1) parse cluster topology @@ -261,7 +310,11 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { } // 2) read topology from file - data, err := readTopology(curveadm, options.filename) + data, err := readTopology(curveadm, + options.filename, + options.showStatus, + options.clean, + ) if err != nil { return err } @@ -272,13 +325,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { return err } - // 4) display title - displayMigrateTitle(curveadm, data) + if !options.showStatus && !options.clean { + // 4) display title + displayMigrateTitle(curveadm, data) - // 5) confirm by user - if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass { - curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service")) - return errno.ERR_CANCEL_OPERATION + // 5) confirm by user + if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass { + curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service")) + return errno.ERR_CANCEL_OPERATION + } } // 6) generate migrate playbook @@ -294,6 +349,13 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error { } // 9) print success prompt + if options.showStatus { + displayMigrateStatus(curveadm) + return nil + } + if options.clean { + return nil + } curveadm.WriteOutln("") curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^.")) // TODO(P1): warning iff there is changed configs diff --git a/cli/command/scale_out.go b/cli/command/scale_out.go index 5e703b056..807e80c8a 100644 --- a/cli/command/scale_out.go +++ b/cli/command/scale_out.go @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command { return cmd } -func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) { +func readTopology(curveadm *cli.CurveAdm, filename string, showStatus bool, clean bool) (string, error) { if !utils.PathExist(filename) { return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND. F("%s: no such file", utils.AbsPath(filename)) @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) { } oldData := curveadm.ClusterTopologyData() - curveadm.WriteOut("%s", utils.Diff(oldData, data)) + if !showStatus && !clean { + curveadm.WriteOut("%s", utils.Diff(oldData, data)) + } return data, nil } @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error { } // 2) read topology from file - data, err := readTopology(curveadm, options.filename) + data, err := readTopology(curveadm, options.filename, false, false) if err != nil { return err } diff --git a/internal/common/common.go b/internal/common/common.go index 8e67c6485..cc9bfb344 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -53,6 +53,10 @@ const ( // format KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS" + // migrate + KEY_MIGRATE_STATUS = "MIGRATE_STATUS" + KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS" + // check KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK" KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME" @@ -71,12 +75,13 @@ const ( SERVICE_STATUS_UNKNOWN = "Unknown" // clean - KEY_CLEAN_ITEMS = "CLEAN_ITEMS" - KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE" - CLEAN_ITEM_LOG = "log" - CLEAN_ITEM_DATA = "data" - CLEAN_ITEM_CONTAINER = "container" - CLEANED_CONTAINER_ID = "-" + KEY_CLEAN_ITEMS = "CLEAN_ITEMS" + KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE" + CLEAN_ITEM_LOG = "log" + CLEAN_ITEM_DATA = "data" + CLEAN_ITEM_CONTAINER = "container" + CLEANED_CONTAINER_ID = "-" + KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER" // client KEY_CLIENT_HOST = "CLIENT_HOST" diff --git a/internal/configure/pool.go b/internal/configure/pool.go index b38274ba8..2fced1dfe 100644 --- a/internal/configure/pool.go +++ b/internal/configure/pool.go @@ -263,26 +263,39 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po old.NPools = old.NPools + 1 } -func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) { +func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) { m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig for _, migrate := range migrates { m[formatName(migrate.From)] = migrate.To } - for i, server := range old.Servers { - dc, ok := m[server.Name] - if !ok { - continue + // add server that will migrate to + for fromName, toDc := range m { + server := Server{} + server.InternalIp = toDc.GetListenIp() + server.ExternalIp = toDc.GetListenExternalIp() + server.InternalPort = toDc.GetListenPort() + server.ExternalPort = toDc.GetListenExternalPort() + server.Name = formatName(toDc) + + for _, oldServer := range old.Servers { + if oldServer.Name == fromName { + server.PhysicalPool = oldServer.PhysicalPool + server.Poolset = oldServer.Poolset + server.Zone = oldServer.Zone + } } + old.Servers = append(old.Servers, server) + } - server.InternalIp = dc.GetListenIp() - server.ExternalIp = dc.GetListenExternalIp() - server.Name = formatName(dc) - if server.InternalPort != 0 && server.ExternalPort != 0 { - server.InternalPort = dc.GetListenPort() - server.ExternalPort = dc.GetListenExternalPort() + // remove server that has migrated + if removeMigratedServer { + for i := 0; i < len(old.Servers); i++ { + _, ok := m[old.Servers[i].Name] + if ok { + old.Servers = append(old.Servers[:i], old.Servers[i+1:]...) + } } - old.Servers[i] = server } } diff --git a/internal/errno/errno.go b/internal/errno/errno.go index 46b8228c5..e421dbed6 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -398,7 +398,11 @@ var ( ERR_ENCRYPT_FILE_FAILED = EC(410021, "encrypt file failed") ERR_CLIENT_ID_NOT_FOUND = EC(410022, "client id not found") ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed") - + ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate") + RRR_GET_CLUSTER_MDSADDR = EC(410025, "failed to get cluster mds addr") + ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset") + ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2") + ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed") // 420: common (curvebs client) ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped") ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed") diff --git a/internal/playbook/factory.go b/internal/playbook/factory.go index f62a7b3e5..2689d42e7 100644 --- a/internal/playbook/factory.go +++ b/internal/playbook/factory.go @@ -83,6 +83,7 @@ const ( GET_CLIENT_STATUS INSTALL_CLIENT UNINSTALL_CLIENT + GET_MIGRATE_STATUS // bs FORMAT_CHUNKFILE_POOL @@ -93,6 +94,7 @@ const ( CREATE_VOLUME MAP_IMAGE UNMAP_IMAGE + MARK_SERVER_PENGDDING // monitor PULL_MONITOR_IMAGE @@ -247,6 +249,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = comm.NewInstallClientTask(curveadm, config.GetCC(i)) case UNINSTALL_CLIENT: t, err = comm.NewUninstallClientTask(curveadm, nil) + case GET_MIGRATE_STATUS: + t, err = comm.NewGetMigrateStatusTask(curveadm, config.GetDC(i)) // bs case FORMAT_CHUNKFILE_POOL: t, err = bs.NewFormatChunkfilePoolTask(curveadm, config.GetFC(i)) @@ -275,6 +279,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = bs.NewDeleteTargetTask(curveadm, nil) case LIST_TARGETS: t, err = bs.NewListTargetsTask(curveadm, nil) + case MARK_SERVER_PENGDDING: + t, err = bs.NewMarkServerPendding(curveadm, config.GetDC(i)) // fs case CHECK_CLIENT_S3: t, err = checker.NewClientS3ConfigureTask(curveadm, config.GetCC(i)) diff --git a/internal/task/scripts/script.go b/internal/task/scripts/script.go index 51bcc9813..3edd5f061 100644 --- a/internal/task/scripts/script.go +++ b/internal/task/scripts/script.go @@ -61,4 +61,10 @@ var ( //go:embed shell/create_fs.sh CREATE_FS string + + //go:embed shell/mark_server_pendding.sh + MARK_SERVER_PENDDING string + + //go:embed shell/get_copyset_status.sh + GET_COPYSET_STATUS string ) diff --git a/internal/task/scripts/shell/get_copyset_status.sh b/internal/task/scripts/shell/get_copyset_status.sh new file mode 100644 index 000000000..34fa8ad13 --- /dev/null +++ b/internal/task/scripts/shell/get_copyset_status.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash + +# Usage: create_volume USER VOLUME SIZE +# Example: create_volume curve test 10 +# Created Date: 2023-12-04 +# Author: Caoxianfei(caoxianfei1) + +clusterMdsAddr=$1 +fromChunkserverMdsAddr=$2 +toChunkserverMdsAddr=$3 + + +from_total_copyset=$(curve_ops_tool check-chunkserver -mdsAddr="$clusterMdsAddr" -chunkserverAddr="$fromChunkserverMdsAddr" | awk -v ip="$clusterMdsAddr" -v port="$fromChunkserverMdsAddr" ' +BEGIN { FS = ", "; OFS = "\n" } +$0 ~ /total/ { + split($1, arr1, ": ") + print arr1[2] + exit +}') + +if [ -z $"from_total_copyset" ]; then + echo "get $fromChunkserverMdsAddr copyset info failed" + exit1 +fi + +to_total_copyset=$(curve_ops_tool check-chunkserver -mdsAddr="$clusterMdsAddr" -chunkserverAddr="$toChunkserverMdsAddr" | awk -v ip="$clusterMdsAddr" -v port="$toChunkserverMdsAddr" ' +BEGIN { FS = ", "; OFS = "\n" } +$0 ~ /total/ { + split($1, arr1, ": ") + print arr1[2] + exit +}') + +if [ -z $"from_total_copyset" ]; then + echo "get $toChunkserverMdsAddr copyset info failed" + exit1 +fi + +echo "$from_total_copyset":"$to_total_copyset" + +exit 0 + diff --git a/internal/task/scripts/shell/mark_server_pendding.sh b/internal/task/scripts/shell/mark_server_pendding.sh new file mode 100644 index 000000000..9f1460077 --- /dev/null +++ b/internal/task/scripts/shell/mark_server_pendding.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# Usage: create_volume USER VOLUME SIZE +# Example: create_volume curve test 10 +# Created Date: 2023-12-04 +# Author: Caoxianfei(caoxianfei1) + +IP=$1 +PORT=$2 + +CHUNKSERVER_ID=$(curve_ops_tool chunkserver-list | awk -v ip="$IP" -v port="$PORT" ' +BEGIN { FS = ", "; OFS = "\n" } +$0 ~ /chunkServerID/ { + split($3, arr1, " = ") + split($4, arr2, " = ") + if (arr1[2] == ip && arr2[2] == port) { + split($1, arr3, " = ") + print arr3[2] + exit + } +}') + +if [ -z "$CHUNKSERVER_ID" ]; then + echo "chunkserver $IP:$PORT not found" + exit 1 +fi + +/curvebs/tools/sbin/curvebs-tool -op=set_chunkserver -chunkserver_id=$CHUNKSERVER_ID -chunkserver_status=pendding +if [ $? -ne 0 ]; then + echo "failed to set chunkserver $IP:$PORT pendding status" + exit 1 +fi \ No newline at end of file diff --git a/internal/task/task/bs/mark_server_pendding.go b/internal/task/task/bs/mark_server_pendding.go new file mode 100644 index 000000000..06b350bec --- /dev/null +++ b/internal/task/task/bs/mark_server_pendding.go @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-11-30 + * Author: Xianfei Cao (caoxianfei1) + */ + +package bs + +import ( + "fmt" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/task/context" + "github.com/opencurve/curveadm/internal/task/scripts" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func CheckContainerExist(host, role, containerId string, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if len(*out) == 0 { + return errno.ERR_CONTAINER_ALREADT_REMOVED. + F("host=%s role=%s containerId=%s", + host, role, tui.TrimContainerId(containerId)) + } + return nil + } +} + +func checkMarkStatus(success *bool, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if !*success { + return errno.ERR_MARK_CHUNKSERVER_PENDDING.S(*out) + } + return nil + } +} + +func NewMarkServerPendding(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + // new task + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Mark Chunkserver Pendding", subname, hc.GetSSHConfig()) + + var out string + var success bool + host, role := dc.GetHost(), dc.GetRole() + layout := dc.GetProjectLayout() + markCSPenddingScript := scripts.MARK_SERVER_PENDDING + scriptPath := layout.ToolsBinDir + "/mark_server_pendding.sh" + + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + + t.AddStep(&step.ListContainers{ + ShowAll: true, + Format: `"{{.ID}}"`, + Filter: fmt.Sprintf("id=%s", containerId), + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckContainerExist(host, role, containerId, &out), + }) + t.AddStep(&step.InstallFile{ // install /curvebs/tools/sbin/mark_chunkserver_pendding.sh + ContainerId: &containerId, + ContainerDestPath: scriptPath, + Content: &markCSPenddingScript, + ExecOptions: curveadm.ExecOptions(), + }) + for _, migrate := range migrates { + hostip := migrate.From.GetListenIp() + hostport := migrate.From.GetListenPort() + t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Command: fmt.Sprintf("/bin/bash %s %s %d", scriptPath, hostip, hostport), + Success: &success, + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: checkMarkStatus(&success, &out), + }) + } + + return t, nil +} diff --git a/internal/task/task/common/create_pool.go b/internal/task/task/common/create_pool.go index 3adbbc200..bc957c6b8 100644 --- a/internal/task/task/common/create_pool.go +++ b/internal/task/task/common/create_pool.go @@ -110,7 +110,14 @@ func prepare(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (clusterPoolJson configure.ScaleOutClusterPool(&clusterPool, dcs, poolset) } else if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { // migrate servers migrates := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) - configure.MigrateClusterServer(&clusterPool, migrates) + var removeMigratedServer bool + if curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER) == nil { + removeMigratedServer = false + } else if curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER) != nil && + curveadm.MemStorage().Get(comm.KEY_REMOVE_MIGRATED_SERVER).(bool) == true { + removeMigratedServer = true + } + configure.MigrateClusterServer(&clusterPool, migrates, removeMigratedServer) } // 3. encode cluster pool to json string diff --git a/internal/task/task/common/migrate_status.go b/internal/task/task/common/migrate_status.go new file mode 100644 index 000000000..bae73a4de --- /dev/null +++ b/internal/task/task/common/migrate_status.go @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-11-30 + * Author: Xianfei Cao (caoxianfei1) + */ + +package common + +import ( + "fmt" + "strconv" + "strings" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/task/context" + "github.com/opencurve/curveadm/internal/task/scripts" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" + "github.com/opencurve/curveadm/internal/utils" +) + +const ( + ContainerDown = "Down" + ContainerUp = "Up" +) + +type MigrateStatus struct { + FromChunkserver string + FromCopyset string + ToChunkserver string + ToCopyset string + Status string +} + +type step2FormatMigrateStatus struct { + config *topology.DeployConfig + fromChunkserver string + toChunkserver string + copysetInfo *string + memStorage *utils.SafeMap +} + +func CheckGetCopySet(success *bool, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if !*success { + return errno.ERR_GET_CHUNKSERVER_COPYSET.S(*out) + } + return nil + } +} + +func (s *step2FormatMigrateStatus) Execute(ctx *context.Context) error { + config := s.config + copysets := strings.Split(*s.copysetInfo, ":") + if len(copysets) != 2 { + return errno.ERR_GET_MIGRATE_COPYSET + } + fromCsCopySet := copysets[0] + toCsCopySet := copysets[1] + + fromCsCopySetInt, toCsCopySetInt, err := convertCopySet(fromCsCopySet, toCsCopySet) + if err != nil { + return err + } + + totalCopyset := config.GetCopysets() + status := "Finished" + if toCsCopySetInt == 0 { + status = "Not Start" + } else if (toCsCopySetInt > 0 && toCsCopySetInt < totalCopyset) || fromCsCopySetInt > 0 { + status = "Transferring" + } else if fromCsCopySetInt == 0 && toCsCopySetInt == totalCopyset { + status = "Finished" + } + + migrateStatus := MigrateStatus{ + FromChunkserver: s.fromChunkserver, + FromCopyset: fromCsCopySet, + ToChunkserver: s.toChunkserver, + ToCopyset: toCsCopySet, + Status: status, + } + + id := s.fromChunkserver + s.memStorage.TX(func(kv *utils.SafeMap) error { + m := map[string]MigrateStatus{} + v := kv.Get(comm.KEY_MIGRATE_STATUS) + if v != nil { + m = v.(map[string]MigrateStatus) + } + m[id] = migrateStatus + kv.Set(comm.KEY_MIGRATE_STATUS, m) + return nil + }) + + return nil +} + +func convertCopySet(fromCopyset, toCopyset string) (int, int, error) { + fromCsCopySetInt, err := strconv.Atoi(fromCopyset) + if err != nil { + return 0, 0, err + } + toCsCopySetInt, err := strconv.Atoi(toCopyset) + if err != nil { + return 0, 0, err + } + return fromCsCopySetInt, toCsCopySetInt, nil +} + +func NewGetMigrateStatusTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + // new task + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Get Migrate progress", subname, hc.GetSSHConfig()) + + var out string + var success bool + host, role := dc.GetHost(), dc.GetRole() + layout := dc.GetProjectLayout() + clusterMdsAddr, err := dc.GetVariables().Get("cluster_mds_addr") + if err != nil { + return nil, errno.RRR_GET_CLUSTER_MDSADDR + } + getCopySetStatusScript := scripts.GET_COPYSET_STATUS + scriptPath := layout.ToolsBinDir + "/get_copyset_status.sh" + + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + migrateRole := migrates[0].From.GetRole() + + t.AddStep(&step.ListContainers{ + ShowAll: true, + Format: `"{{.ID}}"`, + Filter: fmt.Sprintf("id=%s", containerId), + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckContainerExist(host, role, containerId, &out), + }) + t.AddStep(&step.InstallFile{ // install /curvebs/tools/sbin/get_copyset_status.sh + ContainerId: &containerId, + ContainerDestPath: scriptPath, + Content: &getCopySetStatusScript, + ExecOptions: curveadm.ExecOptions(), + }) + + if migrateRole == topology.ROLE_CHUNKSERVER { + for _, migrate := range migrates { // migrate chunkserver + fromHostip := migrate.From.GetListenIp() + fromHostport := migrate.From.GetListenPort() + fromChunkserverAddr := fmt.Sprint(fromHostip, ":", fromHostport) + toHostip := migrate.To.GetListenIp() + toHostport := migrate.To.GetListenPort() + toChunkserverAddr := fmt.Sprint(toHostip, ":", toHostport) + t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Command: fmt.Sprintf("/bin/bash %s %s %s %s", + scriptPath, clusterMdsAddr, fromChunkserverAddr, toChunkserverAddr), + Success: &success, + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckGetCopySet(&success, &out), + }) + t.AddStep(&step2FormatMigrateStatus{ + config: migrate.From, + fromChunkserver: fromChunkserverAddr, + toChunkserver: toChunkserverAddr, + copysetInfo: &out, + memStorage: curveadm.MemStorage(), + }) + } + } + + return t, nil +} diff --git a/internal/tui/migrate.go b/internal/tui/migrate.go new file mode 100644 index 000000000..6f0dd8e11 --- /dev/null +++ b/internal/tui/migrate.go @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-12-5 + * Author: Caoxianfei(caoxianfei1) + */ + +package tui + +import ( + "github.com/opencurve/curveadm/internal/task/task/common" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func FormatMigrateStatus(statuses []common.MigrateStatus) string { + lines := [][]interface{}{} + + // title + title := []string{"Migrate From", "Copysets", "Migrate To", "Copysets", "Status"} + first, second := tui.FormatTitle(title) + lines = append(lines, first) + lines = append(lines, second) + + // status + for _, status := range statuses { + line := []interface{}{ + status.FromChunkserver, + status.FromCopyset, + status.ToChunkserver, + status.ToCopyset, + status.Status, + } + lines = append(lines, line) + } + + output := tui.FixedFormat(lines, 2) + return output +}