Skip to content

Commit

Permalink
support migrate etcd, mds and snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: caoxianfei1 <caoxianfei@corp.netease.com>
  • Loading branch information
caoxianfei1 committed Dec 25, 2023
1 parent b1a3a4e commit 8097a71
Show file tree
Hide file tree
Showing 14 changed files with 466 additions and 10 deletions.
29 changes: 24 additions & 5 deletions cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,24 @@ import (
"github.com/spf13/cobra"
)

// Name and value of the etcd "initial-cluster-state" option.
// NOTE(review): presumably written into the etcd service config of a member
// that joins an already running cluster during migration - the usage is not
// visible in this file chunk, confirm against the amend-etcd-config task.
const (
// ETCD_INIT_CLUSTER_STATE is the option key "initial-cluster-state".
ETCD_INIT_CLUSTER_STATE = "initial-cluster-state"
// ETCD_STATE_EXSITING is the option value "existing".
// NOTE(review): the identifier misspells "EXISTING"; it is kept as-is
// because other files may already reference this name.
ETCD_STATE_EXSITING = "existing"
)

var (
MIGRATE_ETCD_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.ADD_ETCD_MEMBER,
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.AMEND_ETCD_CONFIG,
playbook.START_ETCD,
playbook.UPDATE_TOPOLOGY,
playbook.AMEND_MDS_CONFIG,
playbook.RESTART_SERVICE, // restart all mds
// playbook.STOP_SERVICE,
// playbook.CLEAN_SERVICE, // only container
// playbook.UPDATE_TOPOLOGY,
}

// mds
Expand Down Expand Up @@ -157,7 +166,7 @@ func checkMigrateTopology(curveadm *cli.CurveAdm, data string) error {
} else if len(dcs2add) < len(dcs2del) {
return errno.ERR_DELETE_SERVICE_WHILE_MIGRATING_IS_DENIED
}
// len(dcs2add) == len(dcs2del)

if len(dcs2add) == 0 {
return errno.ERR_NO_SERVICES_FOR_MIGRATING
}
Expand Down Expand Up @@ -199,6 +208,7 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
migrates := getMigrates(curveadm, data)
role := migrates[0].From.GetRole()
steps := MIGRATE_ROLE_STEPS[role]
etcdDCs := curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)

// post clean
if options.clean {
Expand All @@ -221,10 +231,14 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
config := dcs2add
switch step {
case playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE:
playbook.CLEAN_SERVICE,
playbook.ADD_ETCD_MEMBER:
config = dcs2del
case playbook.BACKUP_ETCD_DATA:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
case playbook.AMEND_MDS_CONFIG,
playbook.RESTART_SERVICE:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)
case
playbook.CREATE_PHYSICAL_POOL,
playbook.CREATE_LOGICAL_POOL,
Expand All @@ -251,6 +265,11 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.UPDATE_TOPOLOGY:
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
case playbook.ADD_ETCD_MEMBER,
playbook.AMEND_ETCD_CONFIG,
playbook.AMEND_MDS_CONFIG:
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_CLUSTER_DCS] = etcdDCs
}

pb.AddStep(&playbook.PlaybookStep{
Expand Down
Binary file added curve
Binary file not shown.
1 change: 1 addition & 0 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
// migrate
KEY_MIGRATE_STATUS = "MIGRATE_STATUS"
KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"
KEY_CLUSTER_DCS = "CLUSTER_DCS"

// check
KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK"
Expand Down
6 changes: 5 additions & 1 deletion internal/configure/topology/dc_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ func (dc *DeployConfig) GetInstances() int { return dc.instanc
func (dc *DeployConfig) GetHostSequence() int { return dc.hostSequence }
func (dc *DeployConfig) GetInstancesSequence() int { return dc.instancesSequence }
func (dc *DeployConfig) GetServiceConfig() map[string]string { return dc.serviceConfig }
func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables }
// SetServiceConfig sets (or overwrites) one key of the service configuration
// map that GetServiceConfig returns.
//
// The map is created on first use: assigning into a nil map panics in Go, so
// a DeployConfig whose serviceConfig was never populated would otherwise
// crash the caller.
func (dc *DeployConfig) SetServiceConfig(key, value string) {
	if dc.serviceConfig == nil {
		dc.serviceConfig = make(map[string]string)
	}
	dc.serviceConfig[key] = value
}

func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables }

// (2): config item
func (dc *DeployConfig) GetPrefix() string { return dc.getString(CONFIG_PREFIX) }
Expand Down
4 changes: 2 additions & 2 deletions internal/configure/topology/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ var (
{name: "cluster_mds_dummy_addr"},
{name: "cluster_mds_dummy_port"},
{name: "cluster_chunkserver_addr", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_addr", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_addr"},
{name: "cluster_snapshotclone_proxy_addr", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_dummy_port", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshotclone_dummy_port"},
{name: "cluster_snapshotclone_nginx_upstream", kind: []string{KIND_CURVEBS}},
{name: "cluster_snapshot_addr"}, // tools-v2: compatible with some old version image
{name: "cluster_snapshot_dummy_addr"}, // tools-v2
Expand Down
2 changes: 2 additions & 0 deletions internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ var (
ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset")
ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2")
ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed")
ERR_GET_CLUSTER_ETCD_ADDR = EC(410028, "failed to get cluster_etcd_addr variable")
ERR_ADD_ETCD_MEMEBER = EC(410029, "failed to add member to existing etcd cluster")
// 420: common (curvebs client)
ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped")
ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed")
Expand Down
9 changes: 9 additions & 0 deletions internal/playbook/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ const (
INSTALL_CLIENT
UNINSTALL_CLIENT
ATTACH_LEADER_OR_RANDOM_CONTAINER
ADD_ETCD_MEMBER
AMEND_ETCD_CONFIG
AMEND_MDS_CONFIG

// bs
FORMAT_CHUNKFILE_POOL
Expand Down Expand Up @@ -251,6 +254,12 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
t, err = comm.NewInstallClientTask(curveadm, config.GetCC(i))
case UNINSTALL_CLIENT:
t, err = comm.NewUninstallClientTask(curveadm, nil)
case ADD_ETCD_MEMBER:
t, err = comm.NewAddEtcdMemberTask(curveadm, config.GetDC(i))
case AMEND_ETCD_CONFIG:
t, err = comm.NewAmendEtcdConfigTask(curveadm, config.GetDC(i))
case AMEND_MDS_CONFIG:
t, err = comm.NewAmendMdsConfigTask(curveadm, config.GetDC(i))
// bs
case FORMAT_CHUNKFILE_POOL:
t, err = bs.NewFormatChunkfilePoolTask(curveadm, config.GetFC(i))
Expand Down
4 changes: 2 additions & 2 deletions internal/task/scripts/enable_etcd_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/

/*
* Project: Curveadm
* Created Date: 2023-08-02
* Author: wanghai (SeanHai)
*/
*/

package scripts

Expand Down
2 changes: 2 additions & 0 deletions internal/task/scripts/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
WAIT string
//go:embed shell/report.sh
REPORT string
//go:embed shell/add_etcd.sh
ADD_ETCD string

// CurveBS

Expand Down
43 changes: 43 additions & 0 deletions internal/task/scripts/shell/add_etcd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env bash

# Usage: add_etcd.sh ETCDCTL ENDPOINTS OLD_NAME NEW_NAME NEW_PEER_URL
# Example: add_etcd.sh ./etcdctl 10.0.0.1:2379,10.0.0.2:2379 etcd00 etcd30 http://10.0.0.4:2380
# Created Date: 2023-12-15
# Author: Caoxianfei
#
# Adds member NEW_NAME with peer URL NEW_PEER_URL to the etcd cluster reachable
# through ENDPOINTS. The command output is stored in add_etcd.log.
#
# Exit status:
#   0  the member was added, or etcd reported that the peer URL is already
#      registered (which makes re-running the script harmless)
#   1  wrong number of arguments or any other etcdctl failure; the etcdctl
#      output is printed so the caller can report the reason

if [ $# -lt 5 ]; then
    echo "usage: $0 ETCDCTL ENDPOINTS OLD_NAME NEW_NAME NEW_PEER_URL"
    exit 1
fi

etcdctl=$1
endpoints=$2
old_name=$3 # only referenced by the disabled member-removal logic below
new_name=$4
new_peer_url=$5

log_file=add_etcd.log

"${etcdctl}" --endpoints="${endpoints}" member add "${new_name}" --peer-urls "${new_peer_url}" > "${log_file}" 2>&1
if [ $? -ne 0 ]; then
    if grep -q "Peer URLs already exists" "${log_file}"; then
        exit 0
    fi
    # Surface the failure reason: without this the caller only sees an empty
    # output because everything was redirected into the log file.
    cat "${log_file}"
    exit 1
fi

# Disabled: removal of the old member (kept for reference).
# output=$(${etcdctl} --endpoints=${endpoints} member list)
# if [ $? -ne 0 ]; then
# echo "failed to list all etcd members"
# exit 1
# fi

# id=$(echo "$output" | awk -v name="$old_name" -F ', ' '$3 == name {print $1}')
# if [ -z "${id}" ]; then
# echo "failed to get id of member ${old_name}"
# exit 1
# fi

# ${etcdctl} --endpoints=${endpoints} member remove ${id}
# if [ $? -ne 0 ]; then
# echo "failed to remove member ${old_name}"
# exit 1
# fi




116 changes: 116 additions & 0 deletions internal/task/task/common/add_etcd_mem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Project: CurveAdm
* Created Date: 2023-12-20
* Author: Caoxianfei
*/

package common

import (
"fmt"
"strconv"

"github.com/opencurve/curveadm/cli/cli"
comm "github.com/opencurve/curveadm/internal/common"
"github.com/opencurve/curveadm/internal/configure"
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/task/context"
"github.com/opencurve/curveadm/internal/task/scripts"
"github.com/opencurve/curveadm/internal/task/step"
"github.com/opencurve/curveadm/internal/task/task"
tui "github.com/opencurve/curveadm/internal/tui/common"
)

// checkAddEtcdMemberStatus returns a lambda step that inspects the result of
// the preceding container exec. Both pointers are dereferenced only when the
// lambda runs, so they may be filled in by an earlier step of the same task.
// The lambda yields ERR_ADD_ETCD_MEMEBER (carrying *out as its detail) when
// *success is false and nil otherwise.
func checkAddEtcdMemberStatus(success *bool, out *string) step.LambdaType {
	verify := func(_ *context.Context) error {
		if *success {
			return nil
		}
		return errno.ERR_ADD_ETCD_MEMEBER.S(*out)
	}
	return verify
}

// NewAddEtcdMemberTask builds the task that registers a migration target as a
// new member of the etcd cluster. The task runs inside the container of the
// service described by dc and performs these steps:
//
//  1. verify that the container exists,
//  2. install the embedded add_etcd.sh script into the container,
//  3. execute the script with the etcdctl path, the cluster endpoints, the
//     old and new member names and the peer URL of the migration target,
//  4. fail with ERR_ADD_ETCD_MEMEBER when the script did not succeed.
//
// It returns (nil, nil) when dc is skipped, ERR_GET_CLUSTER_ETCD_ADDR when the
// cluster_etcd_addr variable cannot be resolved, and
// ERR_NO_SERVICES_FOR_MIGRATING when no migration pair is available in the
// memory storage (previously this case panicked on migrates[0]).
func NewAddEtcdMemberTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) {
	serviceId := curveadm.GetServiceId(dc.GetId())
	containerId, err := curveadm.GetContainerId(serviceId)
	if curveadm.IsSkip(dc) {
		// The skip check deliberately wins over a container lookup error.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	hc, err := curveadm.GetHost(dc.GetHost())
	if err != nil {
		return nil, err
	}

	host, role := dc.GetHost(), dc.GetRole()
	subname := fmt.Sprintf("host=%s role=%s containerId=%s",
		host, role, tui.TrimContainerId(containerId))
	t := task.NewTask("Add Etcd Member", subname, hc.GetSSHConfig())

	script := scripts.ADD_ETCD
	layout := dc.GetProjectLayout()
	scriptPath := fmt.Sprintf("%s/add_etcd.sh", layout.ServiceBinDir)
	etcdctlPath := layout.ServiceBinDir + "/etcdctl"
	endpoints, err := dc.GetVariables().Get("cluster_etcd_addr")
	if err != nil {
		return nil, errno.ERR_GET_CLUSTER_ETCD_ADDR
	}

	hostSeq, instSeq := dc.GetHostSequence(), dc.GetInstancesSequence()
	oldName := "etcd" + strconv.Itoa(hostSeq) + strconv.Itoa(instSeq)
	// NOTE(review): the "+3" offset is hard-coded; presumably it keeps the new
	// member name distinct from the names of a 3-host cluster - confirm that
	// it matches the name written by the amend-etcd-config step.
	newName := "etcd" + strconv.Itoa(hostSeq+3) + strconv.Itoa(instSeq)

	// The comma-ok assertion covers both a missing key and an unexpected type.
	migrates, _ := curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer)
	if len(migrates) == 0 || migrates[0] == nil {
		return nil, errno.ERR_NO_SERVICES_FOR_MIGRATING
	}
	// Prefer the pair whose source is the service handled by this task; fall
	// back to the first pair, which is what was always used before.
	toService := migrates[0].To
	for _, m := range migrates {
		if m != nil && m.From != nil && m.From.GetId() == dc.GetId() {
			toService = m.To
			break
		}
	}
	if toService == nil {
		return nil, errno.ERR_NO_SERVICES_FOR_MIGRATING
	}
	peerUrl := fmt.Sprintf("http://%s:%d", toService.GetListenIp(), toService.GetListenPort())
	addMemberCmd := fmt.Sprintf("/bin/bash %s %s %s %s %s %s",
		scriptPath, etcdctlPath, endpoints, oldName, newName, peerUrl)

	var success bool
	var out string
	t.AddStep(&step.ListContainers{
		ShowAll:     true,
		Format:      `"{{.ID}}"`,
		Filter:      fmt.Sprintf("id=%s", containerId),
		Out:         &out,
		ExecOptions: curveadm.ExecOptions(),
	})
	t.AddStep(&step.Lambda{
		Lambda: CheckContainerExist(host, role, containerId, &out),
	})
	t.AddStep(&step.InstallFile{
		ContainerId:       &containerId,
		ContainerDestPath: scriptPath,
		Content:           &script,
		ExecOptions:       curveadm.ExecOptions(),
	})
	t.AddStep(&step.ContainerExec{
		ContainerId: &containerId,
		Success:     &success,
		Out:         &out,
		Command:     addMemberCmd,
		ExecOptions: curveadm.ExecOptions(),
	})
	t.AddStep(&step.Lambda{
		Lambda: checkAddEtcdMemberStatus(&success, &out),
	})

	return t, nil
}
Loading

0 comments on commit 8097a71

Please sign in to comment.