From d93a24b426ce2312c41d1caa1862b3d8e355068d Mon Sep 17 00:00:00 2001 From: Derek Su Date: Thu, 27 Jun 2024 07:12:43 +0000 Subject: [PATCH] feat(upgrade): support v2 volume live upgrade Longhorn 6001 Signed-off-by: Derek Su --- app/cmd/start.go | 57 +----------- pkg/api/instance.go | 24 ++--- pkg/client/instance.go | 141 +++++++++++++++++++++++++++++ pkg/instance/instance.go | 189 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 335 insertions(+), 76 deletions(-) diff --git a/app/cmd/start.go b/app/cmd/start.go index 7c33f89d2..94dfe0127 100644 --- a/app/cmd/start.go +++ b/app/cmd/start.go @@ -10,7 +10,6 @@ import ( "os" "os/signal" "path/filepath" - "regexp" "strconv" "strings" "syscall" @@ -26,10 +25,6 @@ import ( "google.golang.org/grpc/reflection" "k8s.io/mount-utils" - commonTypes "github.com/longhorn/go-common-libs/types" - helpernvme "github.com/longhorn/go-spdk-helper/pkg/nvme" - helpertypes "github.com/longhorn/go-spdk-helper/pkg/types" - helperutil "github.com/longhorn/go-spdk-helper/pkg/util" engineutil "github.com/longhorn/longhorn-engine/pkg/util" spdk "github.com/longhorn/longhorn-spdk-engine/pkg/spdk" spdkutil "github.com/longhorn/longhorn-spdk-engine/pkg/util" @@ -115,52 +110,6 @@ func cleanup(pm *process.Manager) { logrus.Errorf("Failed to clean up all processes for %s graceful shutdown", types.ProcessManagerGrpcService) } -func getVolumeNameFromNQN(input string) (string, error) { - // compile a regular expression that matches ${name} between : and -e- - re, err := regexp.Compile(`:(.*)-e-`) - if err != nil { - return "", err - } - // find the first submatch of the input string - submatch := re.FindStringSubmatch(input) - if len(submatch) < 2 { - return "", fmt.Errorf("no name found in input") - } - // return the second element of the submatch, which is ${name} - return submatch[1], nil -} - -func cleanupStaledNvmeAndDmDevices() error { - executor, err := helperutil.NewExecutor(commonTypes.ProcDirectory) - if err != nil { - return errors.Wrapf(err, "failed to create executor for cleaning up staled NVMe and dm devices") - } - - subsystems, err := helpernvme.GetSubsystems(executor) - if err != nil { - return errors.Wrapf(err, "failed to get NVMe subsystems") - } - for _, sys := range subsystems { - logrus.Infof("Found NVMe subsystem %+v", sys) - if strings.HasPrefix(sys.NQN, helpertypes.NQNPrefix) { - dmDeviceName, err := getVolumeNameFromNQN(sys.NQN) - if err != nil { - return errors.Wrapf(err, "failed to get volume name from NQN %v", sys.NQN) - } - logrus.Infof("Removing dm device %v", dmDeviceName) - if err := helperutil.DmsetupRemove(dmDeviceName, false, false, executor); err != nil { - logrus.WithError(err).Warnf("Failed to remove dm device %v, will continue the cleanup", dmDeviceName) - } - - logrus.Infof("Cleaning up NVMe subsystem %v: NQN %v", sys.Name, sys.NQN) - if err := helpernvme.DisconnectTarget(sys.NQN, executor); err != nil { - logrus.WithError(err).Warnf("Failed to disconnect NVMe subsystem %v: NQN %v, will continue the cleanup", sys.Name, sys.NQN) - } - } - } - return nil -} - func unfreezeFilesystems() error { // We do not need to switch to the host mount namespace to get mount points here. Usually, longhorn-engine runs in a // container that has / bind mounted to /host with at least HostToContainer (rslave) propagation. @@ -209,11 +158,7 @@ func start(c *cli.Context) (err error) { return err } - if spdkEnabled { - if err := cleanupStaledNvmeAndDmDevices(); err != nil { - return err - } - } else { + if !spdkEnabled { if err := unfreezeFilesystems(); err != nil { return err } diff --git a/pkg/api/instance.go b/pkg/api/instance.go index c151db5eb..74752823a 100644 --- a/pkg/api/instance.go +++ b/pkg/api/instance.go @@ -64,20 +64,24 @@ func RPCToInstanceList(obj *rpc.InstanceListResponse) map[string]*Instance { } type InstanceStatus struct { - State string `json:"state"` - ErrorMsg string `json:"errorMsg"` - Conditions map[string]bool `json:"conditions"` - PortStart int32 `json:"portStart"` - PortEnd int32 `json:"portEnd"` + State string `json:"state"` + ErrorMsg string `json:"errorMsg"` + Conditions map[string]bool `json:"conditions"` + PortStart int32 `json:"portStart"` + PortEnd int32 `json:"portEnd"` + TargetPortStart int32 `json:"targetPortStart"` + TargetPortEnd int32 `json:"targetPortEnd"` } func RPCToInstanceStatus(obj *rpc.InstanceStatus) InstanceStatus { return InstanceStatus{ - State: obj.State, - ErrorMsg: obj.ErrorMsg, - Conditions: obj.Conditions, - PortStart: obj.PortStart, - PortEnd: obj.PortEnd, + State: obj.State, + ErrorMsg: obj.ErrorMsg, + Conditions: obj.Conditions, + PortStart: obj.PortStart, + PortEnd: obj.PortEnd, + TargetPortStart: obj.TargetPortStart, + TargetPortEnd: obj.TargetPortEnd, } } diff --git a/pkg/client/instance.go b/pkg/client/instance.go index 5367a566e..b7c6c6915 100644 --- a/pkg/client/instance.go +++ b/pkg/client/instance.go @@ -88,6 +88,9 @@ func NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc type EngineCreateRequest struct { ReplicaAddressMap map[string]string Frontend string + InitiatorAddress string + TargetAddress string + UpgradeRequired bool } type ReplicaCreateRequest struct { @@ -115,6 +118,17 @@ type InstanceCreateRequest struct { BackendStoreDriver string } +type InstanceSuspendRequest struct { + DataEngine string + Name string + InstanceType string + VolumeName string + + Engine EngineCreateRequest + Replica ReplicaCreateRequest +} + +// InstanceCreate creates an instance. func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) { if req.Name == "" || req.InstanceType == "" { return nil, fmt.Errorf("failed to create instance: missing required parameter") @@ -169,6 +183,10 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api ProcessInstanceSpec: processInstanceSpec, SpdkInstanceSpec: spdkInstanceSpec, + + UpgradeRequired: req.Engine.UpgradeRequired, + InitiatorAddress: req.Engine.InitiatorAddress, + TargetAddress: req.Engine.TargetAddress, }, }) if err != nil { @@ -178,6 +196,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api return api.RPCToInstance(p), nil } +// InstanceDelete deletes the instance by name. func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) { if name == "" { return nil, fmt.Errorf("failed to delete instance: missing required parameter name") @@ -207,6 +226,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d return api.RPCToInstance(p), nil } +// InstanceGet returns the instance by name. func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) { if name == "" { return nil, fmt.Errorf("failed to get instance: missing required parameter name") @@ -246,6 +266,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error) return api.RPCToInstanceList(instances), nil } +// InstanceLog returns the log stream of an instance. func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) { if name == "" { return nil, fmt.Errorf("failed to get instance: missing required parameter name") @@ -270,6 +291,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam return api.NewLogStream(stream), nil } +// InstanceWatch watches for instance updates. func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) { client := c.getControllerServiceClient() stream, err := client.InstanceWatch(ctx, &emptypb.Empty{}) @@ -280,6 +302,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc return api.NewInstanceStream(stream), nil } +// InstanceReplace replaces an instance with a new one. func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) { if name == "" || binary == "" { return nil, fmt.Errorf("failed to replace instance: missing required parameter") @@ -320,6 +343,120 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, return api.RPCToInstance(p), nil } +// InstanceSuspend suspends an instance. +func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to suspend instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to suspend instance %v", name) + } + + return nil +} + +// InstanceResume suspends an instance. +func (c *InstanceServiceClient) InstanceResume(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to resume instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to resume instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceResume(ctx, &rpc.InstanceResumeRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to resume instance %v", name) + } + + return nil +} + +// InstanceSwitchOverTarget switches over the target for an instance. +func (c *InstanceServiceClient) InstanceSwitchOverTarget(dataEngine, name, instanceType, targetAddress string) error { + if name == "" { + return fmt.Errorf("failed to switch over target for instance: missing required parameter name") + } + + if targetAddress == "" { + return fmt.Errorf("failed to switch over target for instance: missing required parameter target address") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to switch over target instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceSwitchOverTarget(ctx, &rpc.InstanceSwitchOverTargetRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + TargetAddress: targetAddress, + }) + if err != nil { + return errors.Wrapf(err, "failed to switch over target for instance %v", name) + } + + return nil +} + +// InstanceDeleteTarget delete target for an instance. +func (c *InstanceServiceClient) InstanceDeleteTarget(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to delete target for instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to delete target instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceDeleteTarget(ctx, &rpc.InstanceDeleteTargetRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to delete target for instance %v", name) + } + + return nil +} + +// InstanceResume resumes an instance. func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -343,6 +480,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) { }, nil } +// LogSetLevel sets the log level of the service. func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -360,6 +498,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e return err } +// LogSetFlags sets the log flags of the service.x func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -377,6 +516,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e return err } +// LogGetLevel returns the log level of the service. func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -396,6 +536,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, return resp.Level, nil } +// LogGetFlags returns the log flags of the service. func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index e5cf2a4ea..8dab107a8 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -35,6 +35,10 @@ type InstanceOps interface { InstanceList(map[string]*rpc.InstanceResponse) error InstanceReplace(*rpc.InstanceReplaceRequest) (*rpc.InstanceResponse, error) InstanceLog(*rpc.InstanceLogRequest, rpc.InstanceService_InstanceLogServer) error + InstanceSuspend(*rpc.InstanceSuspendRequest) (*emptypb.Empty, error) + InstanceResume(*rpc.InstanceResumeRequest) (*emptypb.Empty, error) + InstanceSwitchOverTarget(*rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) + InstanceDeleteTarget(*rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) LogSetLevel(context.Context, *rpc.LogSetLevelRequest) (*emptypb.Empty, error) LogSetFlags(context.Context, *rpc.LogSetFlagsRequest) (*emptypb.Empty, error) @@ -107,9 +111,10 @@ func (s *Server) VersionGet(ctx context.Context, req *emptypb.Empty) (*rpc.Versi func (s *Server) InstanceCreate(ctx context.Context, req *rpc.InstanceCreateRequest) (*rpc.InstanceResponse, error) { logrus.WithFields(logrus.Fields{ - "name": req.Spec.Name, - "type": req.Spec.Type, - "dataEngine": req.Spec.DataEngine, + "name": req.Spec.Name, + "type": req.Spec.Type, + "dataEngine": req.Spec.DataEngine, + "upgradeRequired": req.Spec.UpgradeRequired, }).Info("Creating instance") ops, ok := s.ops[req.Spec.DataEngine] @@ -148,7 +153,8 @@ func (ops V2DataEngineInstanceOps) InstanceCreate(req *rpc.InstanceCreateRequest switch req.Spec.Type { case types.InstanceTypeEngine: - engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount) + engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, + req.Spec.PortCount, req.Spec.InitiatorAddress, req.Spec.TargetAddress, req.Spec.UpgradeRequired) if err != nil { return nil, err } @@ -511,7 +517,11 @@ func (s *Server) InstanceWatch(req *emptypb.Empty, srv rpc.InstanceService_Insta // Close the clients for closing streams and unblocking notifier Recv() with error. done <- struct{}{} }() - return s.handleNotify(ctx, notifyChan, srv) + err := s.handleNotify(ctx, notifyChan, srv) + if err != nil { + logrus.WithError(err).Error("Failed to handle notify") + } + return err }) g.Go(func() error { @@ -702,11 +712,170 @@ func engineResponseToInstanceResponse(e *spdkapi.Engine) *rpc.InstanceResponse { DataEngine: rpc.DataEngine_DATA_ENGINE_V2, }, Status: &rpc.InstanceStatus{ - State: e.State, - ErrorMsg: e.ErrorMsg, - PortStart: e.Port, - PortEnd: e.Port, - Conditions: make(map[string]bool), + State: e.State, + ErrorMsg: e.ErrorMsg, + PortStart: e.Port, + PortEnd: e.Port, + TargetPortStart: e.TargetPort, + TargetPortEnd: e.TargetPort, + Conditions: make(map[string]bool), }, } } + +func (s *Server) InstanceSuspend(ctx context.Context, req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Suspending instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceSuspend(req) +} + +func (ops V1DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance suspend is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineSuspend(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to suspend engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "suspend is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} + +func (s *Server) InstanceResume(ctx context.Context, req *rpc.InstanceResumeRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Resuming instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceResume(req) +} + +func (ops V1DataEngineInstanceOps) InstanceResume(req *rpc.InstanceResumeRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance resume is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceResume(req *rpc.InstanceResumeRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineResume(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to resume engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "resume is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} + +func (s *Server) InstanceSwitchOverTarget(ctx context.Context, req *rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + "targetAddress": req.TargetAddress, + }).Info("Switching over target instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceSwitchOverTarget(req) +} + +func (ops V1DataEngineInstanceOps) InstanceSwitchOverTarget(req *rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance target switch over is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceSwitchOverTarget(req *rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineSwitchOverTarget(req.Name, req.TargetAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to switch over target for engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "target switchover is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} + +func (s *Server) InstanceDeleteTarget(ctx context.Context, req *rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Delete target") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceDeleteTarget(req) +} + +func (ops V1DataEngineInstanceOps) InstanceDeleteTarget(req *rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance target delete is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceDeleteTarget(req *rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineDeleteTarget(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to delete target for engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "target deletion is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +}