Skip to content

Commit

Permalink
Support v2 volume live upgrade
Browse files Browse the repository at this point in the history
Longhorn 6001

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed May 21, 2024
1 parent a77fc13 commit d7b98cc
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 4 deletions.
90 changes: 90 additions & 0 deletions pkg/client/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"net/url"

rpc "github.com/longhorn/types/pkg/generated/imrpc"
"github.com/pkg/errors"
Expand Down Expand Up @@ -88,6 +89,7 @@ func NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc
type EngineCreateRequest struct {
ReplicaAddressMap map[string]string
Frontend string
UpgradeRequired bool
}

type ReplicaCreateRequest struct {
Expand Down Expand Up @@ -115,6 +117,17 @@ type InstanceCreateRequest struct {
BackendStoreDriver string
}

type InstanceSuspendRequest struct {
DataEngine string
Name string
InstanceType string
VolumeName string

Engine EngineCreateRequest
Replica ReplicaCreateRequest
}

// InstanceCreate creates an instance.
func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) {
if req.Name == "" || req.InstanceType == "" {
return nil, fmt.Errorf("failed to create instance: missing required parameter")
Expand Down Expand Up @@ -156,6 +169,11 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api
}
}

uri, err := url.Parse(c.serviceURL)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse service URL from %v for instance create", c.serviceURL)
}

p, err := client.InstanceCreate(ctx, &rpc.InstanceCreateRequest{
Spec: &rpc.InstanceSpec{
// nolint:all replaced with DataEngine
Expand All @@ -169,6 +187,9 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api

ProcessInstanceSpec: processInstanceSpec,
SpdkInstanceSpec: spdkInstanceSpec,

UpgradeRequired: req.Engine.UpgradeRequired,
TargetAddress: uri.Hostname(),
},
})
if err != nil {
Expand All @@ -178,6 +199,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api
return api.RPCToInstance(p), nil
}

// InstanceDelete deletes the instance by name.
func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to delete instance: missing required parameter name")
Expand Down Expand Up @@ -207,6 +229,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d
return api.RPCToInstance(p), nil
}

// InstanceGet returns the instance by name.
func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand Down Expand Up @@ -246,6 +269,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error)
return api.RPCToInstanceList(instances), nil
}

// InstanceLog returns the log stream of an instance.
func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand All @@ -270,6 +294,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam
return api.NewLogStream(stream), nil
}

// InstanceWatch watches for instance updates.
func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) {
client := c.getControllerServiceClient()
stream, err := client.InstanceWatch(ctx, &emptypb.Empty{})
Expand All @@ -280,6 +305,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc
return api.NewInstanceStream(stream), nil
}

// InstanceReplace replaces an instance with a new one.
func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) {
if name == "" || binary == "" {
return nil, fmt.Errorf("failed to replace instance: missing required parameter")
Expand Down Expand Up @@ -320,6 +346,66 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType,
return api.RPCToInstance(p), nil
}

// InstanceSuspend suspends an instance.
func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to suspend instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to suspend instance %v", name)
}

return nil
}

// InstanceSwitchOver switches over the target for an instance.
func (c *InstanceServiceClient) InstanceSwitchOver(dataEngine, name, instanceType, targetAddress string) error {
if name == "" {
return fmt.Errorf("failed to switch over target for instance: missing required parameter name")
}

if targetAddress == "" {
return fmt.Errorf("failed to switch over target for instance: missing required parameter target address")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to switch over target instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSwitchOver(ctx, &rpc.InstanceSwitchOverRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
TargetAddress: targetAddress,
})
if err != nil {
return errors.Wrapf(err, "failed to switch over target instance %v", name)
}

return nil
}

// InstanceResume resumes an instance.
func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -343,6 +429,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
}, nil
}

// LogSetLevel sets the log level of the service.
func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -360,6 +447,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e
return err
}

// LogSetFlags sets the log flags of the service.x
func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -377,6 +465,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e
return err
}

// LogGetLevel returns the log level of the service.
func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -396,6 +485,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string,
return resp.Level, nil
}

// LogGetFlags returns the log flags of the service.
func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand Down
91 changes: 87 additions & 4 deletions pkg/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type InstanceOps interface {
InstanceList(map[string]*rpc.InstanceResponse) error
InstanceReplace(*rpc.InstanceReplaceRequest) (*rpc.InstanceResponse, error)
InstanceLog(*rpc.InstanceLogRequest, rpc.InstanceService_InstanceLogServer) error
InstanceSuspend(*rpc.InstanceSuspendRequest) (*emptypb.Empty, error)
InstanceSwitchOver(*rpc.InstanceSwitchOverRequest) (*emptypb.Empty, error)

LogSetLevel(context.Context, *rpc.LogSetLevelRequest) (*emptypb.Empty, error)
LogSetFlags(context.Context, *rpc.LogSetFlagsRequest) (*emptypb.Empty, error)
Expand Down Expand Up @@ -107,9 +109,10 @@ func (s *Server) VersionGet(ctx context.Context, req *emptypb.Empty) (*rpc.Versi

func (s *Server) InstanceCreate(ctx context.Context, req *rpc.InstanceCreateRequest) (*rpc.InstanceResponse, error) {
logrus.WithFields(logrus.Fields{
"name": req.Spec.Name,
"type": req.Spec.Type,
"dataEngine": req.Spec.DataEngine,
"name": req.Spec.Name,
"type": req.Spec.Type,
"dataEngine": req.Spec.DataEngine,
"upgradeRequired": req.Spec.UpgradeRequired,
}).Info("Creating instance")

ops, ok := s.ops[req.Spec.DataEngine]
Expand Down Expand Up @@ -148,7 +151,8 @@ func (ops V2DataEngineInstanceOps) InstanceCreate(req *rpc.InstanceCreateRequest

switch req.Spec.Type {
case types.InstanceTypeEngine:
engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount)
engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap,
req.Spec.PortCount, req.Spec.UpgradeRequired, req.Spec.TargetAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -710,3 +714,82 @@ func engineResponseToInstanceResponse(e *spdkapi.Engine) *rpc.InstanceResponse {
},
}
}

func (s *Server) InstanceSuspend(ctx context.Context, req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) {
logrus.WithFields(logrus.Fields{
"name": req.Name,
"type": req.Type,
"dataEngine": req.DataEngine,
}).Info("Suspending instance")

ops, ok := s.ops[req.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine)
}
return ops.InstanceSuspend(req)
}

func (ops V1DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) {
return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance suspend is not supported")
}

func (ops V2DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) {
c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress)
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error())
}
defer c.Close()

switch req.Type {
case types.InstanceTypeEngine:
err := c.EngineSuspend(req.Name)
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to suspend engine %v", req.Name).Error())
}
return &emptypb.Empty{}, nil
case types.InstanceTypeReplica:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "suspend is not supported for instance type %v", req.Type)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type)
}
}

func (s *Server) InstanceSwitchOver(ctx context.Context, req *rpc.InstanceSwitchOverRequest) (*emptypb.Empty, error) {
logrus.WithFields(logrus.Fields{
"name": req.Name,
"type": req.Type,
"dataEngine": req.DataEngine,
"targetAddress": req.TargetAddress,
}).Info("Suspending instance")

ops, ok := s.ops[req.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine)
}
return ops.InstanceSwitchOver(req)
}

func (ops V1DataEngineInstanceOps) InstanceSwitchOver(req *rpc.InstanceSwitchOverRequest) (*emptypb.Empty, error) {
return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance target switch over is not supported")
}

func (ops V2DataEngineInstanceOps) InstanceSwitchOver(req *rpc.InstanceSwitchOverRequest) (*emptypb.Empty, error) {
c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress)
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error())
}
defer c.Close()

switch req.Type {
case types.InstanceTypeEngine:
err := c.EngineSwitchOver(req.Name, req.TargetAddress)
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to switch over target for engine %v", req.Name).Error())
}
return &emptypb.Empty{}, nil
case types.InstanceTypeReplica:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "switchover is not supported for instance type %v", req.Type)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type)
}
}

0 comments on commit d7b98cc

Please sign in to comment.