Skip to content

Commit

Permalink
instance: implement InstanceSuspend and InstanceCreate with UpgradeRe…
Browse files Browse the repository at this point in the history
…quired

Longhorn 6001

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed Apr 23, 2024
1 parent 134afdf commit f35f9c2
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 4 deletions.
51 changes: 51 additions & 0 deletions pkg/client/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc
type EngineCreateRequest struct {
ReplicaAddressMap map[string]string
Frontend string
UpgradeRequired bool
}

type ReplicaCreateRequest struct {
Expand Down Expand Up @@ -115,6 +116,17 @@ type InstanceCreateRequest struct {
BackendStoreDriver string
}

type InstanceSuspendRequest struct {
DataEngine string
Name string
InstanceType string
VolumeName string

Engine EngineCreateRequest
Replica ReplicaCreateRequest
}

// InstanceCreate creates an instance.
func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) {
if req.Name == "" || req.InstanceType == "" {
return nil, fmt.Errorf("failed to create instance: missing required parameter")
Expand Down Expand Up @@ -169,6 +181,8 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api

ProcessInstanceSpec: processInstanceSpec,
SpdkInstanceSpec: spdkInstanceSpec,

UpgradeRequired: req.Engine.UpgradeRequired,
},
})
if err != nil {
Expand All @@ -178,6 +192,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api
return api.RPCToInstance(p), nil
}

// InstanceDelete deletes the instance by name.
func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to delete instance: missing required parameter name")
Expand Down Expand Up @@ -207,6 +222,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d
return api.RPCToInstance(p), nil
}

// InstanceGet returns the instance by name.
func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand Down Expand Up @@ -246,6 +262,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error)
return api.RPCToInstanceList(instances), nil
}

// InstanceLog returns the log stream of an instance.
func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand All @@ -270,6 +287,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam
return api.NewLogStream(stream), nil
}

// InstanceWatch watches for instance updates.
func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) {
client := c.getControllerServiceClient()
stream, err := client.InstanceWatch(ctx, &emptypb.Empty{})
Expand All @@ -280,6 +298,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc
return api.NewInstanceStream(stream), nil
}

// InstanceReplace replaces an instance with a new one.
func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) {
if name == "" || binary == "" {
return nil, fmt.Errorf("failed to replace instance: missing required parameter")
Expand Down Expand Up @@ -320,6 +339,34 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType,
return api.RPCToInstance(p), nil
}

// InstanceSuspend suspends an instance.
func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to suspend instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to suspend instance %v", name)
}

return nil
}

// InstanceResume resumes an instance.
func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -343,6 +390,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
}, nil
}

// LogSetLevel sets the log level of the service.
func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -360,6 +408,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e
return err
}

// LogSetFlags sets the log flags of the service.x
func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -377,6 +426,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e
return err
}

// LogGetLevel returns the log level of the service.
func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -396,6 +446,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string,
return resp.Level, nil
}

// LogGetFlags returns the log flags of the service.
func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand Down
51 changes: 47 additions & 4 deletions pkg/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type InstanceOps interface {
InstanceList(map[string]*rpc.InstanceResponse) error
InstanceReplace(*rpc.InstanceReplaceRequest) (*rpc.InstanceResponse, error)
InstanceLog(*rpc.InstanceLogRequest, rpc.InstanceService_InstanceLogServer) error
InstanceSuspend(*rpc.InstanceSuspendRequest) (*emptypb.Empty, error)

LogSetLevel(context.Context, *rpc.LogSetLevelRequest) (*emptypb.Empty, error)
LogSetFlags(context.Context, *rpc.LogSetFlagsRequest) (*emptypb.Empty, error)
Expand Down Expand Up @@ -113,9 +114,10 @@ func (s *Server) VersionGet(ctx context.Context, req *emptypb.Empty) (*rpc.Versi

func (s *Server) InstanceCreate(ctx context.Context, req *rpc.InstanceCreateRequest) (*rpc.InstanceResponse, error) {
logrus.WithFields(logrus.Fields{
"name": req.Spec.Name,
"type": req.Spec.Type,
"dataEngine": req.Spec.DataEngine,
"name": req.Spec.Name,
"type": req.Spec.Type,
"dataEngine": req.Spec.DataEngine,
"upgradeRequired": req.Spec.UpgradeRequired,
}).Info("Creating instance")

ops, ok := s.ops[req.Spec.DataEngine]
Expand Down Expand Up @@ -154,7 +156,9 @@ func (ops V2DataEngineInstanceOps) InstanceCreate(req *rpc.InstanceCreateRequest

switch req.Spec.Type {
case types.InstanceTypeEngine:
engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount)
engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend,
req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount,
req.Spec.UpgradeRequired)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -716,3 +720,42 @@ func engineResponseToInstanceResponse(e *spdkapi.Engine) *rpc.InstanceResponse {
},
}
}

func (s *Server) InstanceSuspend(ctx context.Context, req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) {
logrus.WithFields(logrus.Fields{
"name": req.Name,
"type": req.Type,
"dataEngine": req.DataEngine,
}).Info("Suspending instance")

ops, ok := s.ops[req.DataEngine]
if !ok {
return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine)
}
return ops.InstanceSuspend(req)
}

func (ops V1DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) {
return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance suspend is not supported")
}

func (ops V2DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) {
c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress)
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error())
}
defer c.Close()

switch req.Type {
case types.InstanceTypeEngine:
err := c.EngineSuspend(req.Name)
if err != nil {
return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to suspend engine %v", req.Name).Error())
}
return &emptypb.Empty{}, nil
case types.InstanceTypeReplica:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "suspend is not supported for instance type %v", req.Type)
default:
return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type)
}
}

0 comments on commit f35f9c2

Please sign in to comment.