Skip to content

Commit

Permalink
feat(upgrade): support v2 volume live upgrade
Browse files Browse the repository at this point in the history
Longhorn 6001

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed Jul 7, 2024
1 parent 0c8227c commit d93a24b
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 76 deletions.
57 changes: 1 addition & 56 deletions app/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"os"
"os/signal"
"path/filepath"
"regexp"
"strconv"
"strings"
"syscall"
Expand All @@ -26,10 +25,6 @@ import (
"google.golang.org/grpc/reflection"
"k8s.io/mount-utils"

commonTypes "github.com/longhorn/go-common-libs/types"
helpernvme "github.com/longhorn/go-spdk-helper/pkg/nvme"
helpertypes "github.com/longhorn/go-spdk-helper/pkg/types"
helperutil "github.com/longhorn/go-spdk-helper/pkg/util"
engineutil "github.com/longhorn/longhorn-engine/pkg/util"
spdk "github.com/longhorn/longhorn-spdk-engine/pkg/spdk"
spdkutil "github.com/longhorn/longhorn-spdk-engine/pkg/util"
Expand Down Expand Up @@ -115,52 +110,6 @@ func cleanup(pm *process.Manager) {
logrus.Errorf("Failed to clean up all processes for %s graceful shutdown", types.ProcessManagerGrpcService)
}

func getVolumeNameFromNQN(input string) (string, error) {
// compile a regular expression that matches ${name} between : and -e-
re, err := regexp.Compile(`:(.*)-e-`)
if err != nil {
return "", err
}
// find the first submatch of the input string
submatch := re.FindStringSubmatch(input)
if len(submatch) < 2 {
return "", fmt.Errorf("no name found in input")
}
// return the second element of the submatch, which is ${name}
return submatch[1], nil
}

func cleanupStaledNvmeAndDmDevices() error {
executor, err := helperutil.NewExecutor(commonTypes.ProcDirectory)
if err != nil {
return errors.Wrapf(err, "failed to create executor for cleaning up staled NVMe and dm devices")
}

subsystems, err := helpernvme.GetSubsystems(executor)
if err != nil {
return errors.Wrapf(err, "failed to get NVMe subsystems")
}
for _, sys := range subsystems {
logrus.Infof("Found NVMe subsystem %+v", sys)
if strings.HasPrefix(sys.NQN, helpertypes.NQNPrefix) {
dmDeviceName, err := getVolumeNameFromNQN(sys.NQN)
if err != nil {
return errors.Wrapf(err, "failed to get volume name from NQN %v", sys.NQN)
}
logrus.Infof("Removing dm device %v", dmDeviceName)
if err := helperutil.DmsetupRemove(dmDeviceName, false, false, executor); err != nil {
logrus.WithError(err).Warnf("Failed to remove dm device %v, will continue the cleanup", dmDeviceName)
}

logrus.Infof("Cleaning up NVMe subsystem %v: NQN %v", sys.Name, sys.NQN)
if err := helpernvme.DisconnectTarget(sys.NQN, executor); err != nil {
logrus.WithError(err).Warnf("Failed to disconnect NVMe subsystem %v: NQN %v, will continue the cleanup", sys.Name, sys.NQN)
}
}
}
return nil
}

func unfreezeFilesystems() error {
// We do not need to switch to the host mount namespace to get mount points here. Usually, longhorn-engine runs in a
// container that has / bind mounted to /host with at least HostToContainer (rslave) propagation.
Expand Down Expand Up @@ -209,11 +158,7 @@ func start(c *cli.Context) (err error) {
return err
}

if spdkEnabled {
if err := cleanupStaledNvmeAndDmDevices(); err != nil {
return err
}
} else {
if !spdkEnabled {
if err := unfreezeFilesystems(); err != nil {
return err
}
Expand Down
24 changes: 14 additions & 10 deletions pkg/api/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,24 @@ func RPCToInstanceList(obj *rpc.InstanceListResponse) map[string]*Instance {
}

type InstanceStatus struct {
State string `json:"state"`
ErrorMsg string `json:"errorMsg"`
Conditions map[string]bool `json:"conditions"`
PortStart int32 `json:"portStart"`
PortEnd int32 `json:"portEnd"`
State string `json:"state"`
ErrorMsg string `json:"errorMsg"`
Conditions map[string]bool `json:"conditions"`
PortStart int32 `json:"portStart"`
PortEnd int32 `json:"portEnd"`
TargetPortStart int32 `json:"targetPortStart"`
TargetPortEnd int32 `json:"targetPortEnd"`
}

func RPCToInstanceStatus(obj *rpc.InstanceStatus) InstanceStatus {
return InstanceStatus{
State: obj.State,
ErrorMsg: obj.ErrorMsg,
Conditions: obj.Conditions,
PortStart: obj.PortStart,
PortEnd: obj.PortEnd,
State: obj.State,
ErrorMsg: obj.ErrorMsg,
Conditions: obj.Conditions,
PortStart: obj.PortStart,
PortEnd: obj.PortEnd,
TargetPortStart: obj.TargetPortStart,
TargetPortEnd: obj.TargetPortEnd,
}
}

Expand Down
141 changes: 141 additions & 0 deletions pkg/client/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc
type EngineCreateRequest struct {
ReplicaAddressMap map[string]string
Frontend string
InitiatorAddress string
TargetAddress string
UpgradeRequired bool
}

type ReplicaCreateRequest struct {
Expand Down Expand Up @@ -115,6 +118,17 @@ type InstanceCreateRequest struct {
BackendStoreDriver string
}

type InstanceSuspendRequest struct {
DataEngine string
Name string
InstanceType string
VolumeName string

Engine EngineCreateRequest
Replica ReplicaCreateRequest
}

// InstanceCreate creates an instance.
func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) {
if req.Name == "" || req.InstanceType == "" {
return nil, fmt.Errorf("failed to create instance: missing required parameter")
Expand Down Expand Up @@ -169,6 +183,10 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api

ProcessInstanceSpec: processInstanceSpec,
SpdkInstanceSpec: spdkInstanceSpec,

UpgradeRequired: req.Engine.UpgradeRequired,
InitiatorAddress: req.Engine.InitiatorAddress,
TargetAddress: req.Engine.TargetAddress,
},
})
if err != nil {
Expand All @@ -178,6 +196,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api
return api.RPCToInstance(p), nil
}

// InstanceDelete deletes the instance by name.
func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to delete instance: missing required parameter name")
Expand Down Expand Up @@ -207,6 +226,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d
return api.RPCToInstance(p), nil
}

// InstanceGet returns the instance by name.
func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand Down Expand Up @@ -246,6 +266,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error)
return api.RPCToInstanceList(instances), nil
}

// InstanceLog returns the log stream of an instance.
func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand All @@ -270,6 +291,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam
return api.NewLogStream(stream), nil
}

// InstanceWatch watches for instance updates.
func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) {
client := c.getControllerServiceClient()
stream, err := client.InstanceWatch(ctx, &emptypb.Empty{})
Expand All @@ -280,6 +302,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc
return api.NewInstanceStream(stream), nil
}

// InstanceReplace replaces an instance with a new one.
func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) {
if name == "" || binary == "" {
return nil, fmt.Errorf("failed to replace instance: missing required parameter")
Expand Down Expand Up @@ -320,6 +343,120 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType,
return api.RPCToInstance(p), nil
}

// InstanceSuspend suspends an instance.
func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to suspend instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to suspend instance %v", name)
}

return nil
}

// InstanceResume suspends an instance.
func (c *InstanceServiceClient) InstanceResume(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to resume instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to resume instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceResume(ctx, &rpc.InstanceResumeRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to resume instance %v", name)
}

return nil
}

// InstanceSwitchOverTarget switches over the target for an instance.
func (c *InstanceServiceClient) InstanceSwitchOverTarget(dataEngine, name, instanceType, targetAddress string) error {
if name == "" {
return fmt.Errorf("failed to switch over target for instance: missing required parameter name")
}

if targetAddress == "" {
return fmt.Errorf("failed to switch over target for instance: missing required parameter target address")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to switch over target instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSwitchOverTarget(ctx, &rpc.InstanceSwitchOverTargetRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
TargetAddress: targetAddress,
})
if err != nil {
return errors.Wrapf(err, "failed to switch over target for instance %v", name)
}

return nil
}

// InstanceDeleteTarget delete target for an instance.
func (c *InstanceServiceClient) InstanceDeleteTarget(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to delete target for instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to delete target instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceDeleteTarget(ctx, &rpc.InstanceDeleteTargetRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to delete target for instance %v", name)
}

return nil
}

// InstanceResume resumes an instance.
func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -343,6 +480,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
}, nil
}

// LogSetLevel sets the log level of the service.
func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -360,6 +498,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e
return err
}

// LogSetFlags sets the log flags of the service.x
func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -377,6 +516,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e
return err
}

// LogGetLevel returns the log level of the service.
func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -396,6 +536,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string,
return resp.Level, nil
}

// LogGetFlags returns the log flags of the service.
func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand Down
Loading

0 comments on commit d93a24b

Please sign in to comment.