diff --git a/pkg/client/instance.go b/pkg/client/instance.go index 5367a566e..77db7fe14 100644 --- a/pkg/client/instance.go +++ b/pkg/client/instance.go @@ -88,6 +88,7 @@ func NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc type EngineCreateRequest struct { ReplicaAddressMap map[string]string Frontend string + UpgradeRequired bool } type ReplicaCreateRequest struct { @@ -115,6 +116,17 @@ type InstanceCreateRequest struct { BackendStoreDriver string } +type InstanceSuspendRequest struct { + DataEngine string + Name string + InstanceType string + VolumeName string + + Engine EngineCreateRequest + Replica ReplicaCreateRequest +} + +// InstanceCreate creates an instance. func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) { if req.Name == "" || req.InstanceType == "" { return nil, fmt.Errorf("failed to create instance: missing required parameter") @@ -169,6 +181,8 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api ProcessInstanceSpec: processInstanceSpec, SpdkInstanceSpec: spdkInstanceSpec, + + UpgradeRequired: req.Engine.UpgradeRequired, }, }) if err != nil { @@ -178,6 +192,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api return api.RPCToInstance(p), nil } +// InstanceDelete deletes the instance by name. func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) { if name == "" { return nil, fmt.Errorf("failed to delete instance: missing required parameter name") @@ -207,6 +222,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d return api.RPCToInstance(p), nil } +// InstanceGet returns the instance by name. func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) { if name == "" { return nil, fmt.Errorf("failed to get instance: missing required parameter name") @@ -246,6 +262,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error) return api.RPCToInstanceList(instances), nil } +// InstanceLog returns the log stream of an instance. func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) { if name == "" { return nil, fmt.Errorf("failed to get instance: missing required parameter name") @@ -270,6 +287,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam return api.NewLogStream(stream), nil } +// InstanceWatch watches for instance updates. func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) { client := c.getControllerServiceClient() stream, err := client.InstanceWatch(ctx, &emptypb.Empty{}) @@ -280,6 +298,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc return api.NewInstanceStream(stream), nil } +// InstanceReplace replaces an instance with a new one. func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) { if name == "" || binary == "" { return nil, fmt.Errorf("failed to replace instance: missing required parameter") @@ -320,6 +339,34 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, return api.RPCToInstance(p), nil } +// InstanceSuspend suspends an instance. +func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to suspend instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to suspend instance %v", name) + } + + return nil +} + +// InstanceResume resumes an instance. func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -343,6 +390,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) { }, nil } +// LogSetLevel sets the log level of the service. func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -360,6 +408,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e return err } +// LogSetFlags sets the log flags of the service.x func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -377,6 +426,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e return err } +// LogGetLevel returns the log level of the service. func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -396,6 +446,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, return resp.Level, nil } +// LogGetFlags returns the log flags of the service. func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 18f4edb44..0e6f03691 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -35,6 +35,7 @@ type InstanceOps interface { InstanceList(map[string]*rpc.InstanceResponse) error InstanceReplace(*rpc.InstanceReplaceRequest) (*rpc.InstanceResponse, error) InstanceLog(*rpc.InstanceLogRequest, rpc.InstanceService_InstanceLogServer) error + InstanceSuspend(*rpc.InstanceSuspendRequest) (*emptypb.Empty, error) LogSetLevel(context.Context, *rpc.LogSetLevelRequest) (*emptypb.Empty, error) LogSetFlags(context.Context, *rpc.LogSetFlagsRequest) (*emptypb.Empty, error) @@ -113,9 +114,10 @@ func (s *Server) VersionGet(ctx context.Context, req *emptypb.Empty) (*rpc.Versi func (s *Server) InstanceCreate(ctx context.Context, req *rpc.InstanceCreateRequest) (*rpc.InstanceResponse, error) { logrus.WithFields(logrus.Fields{ - "name": req.Spec.Name, - "type": req.Spec.Type, - "dataEngine": req.Spec.DataEngine, + "name": req.Spec.Name, + "type": req.Spec.Type, + "dataEngine": req.Spec.DataEngine, + "upgradeRequired": req.Spec.UpgradeRequired, }).Info("Creating instance") ops, ok := s.ops[req.Spec.DataEngine] @@ -154,7 +156,9 @@ func (ops V2DataEngineInstanceOps) InstanceCreate(req *rpc.InstanceCreateRequest switch req.Spec.Type { case types.InstanceTypeEngine: - engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount) + engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, + req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount, + req.Spec.UpgradeRequired) if err != nil { return nil, err } @@ -716,3 +720,42 @@ func engineResponseToInstanceResponse(e *spdkapi.Engine) *rpc.InstanceResponse { }, } } + +func (s *Server) InstanceSuspend(ctx context.Context, req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Suspending instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceSuspend(req) +} + +func (ops V1DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance suspend is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineSuspend(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to suspend engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "suspend is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +}