diff --git a/app/cmd/start.go b/app/cmd/start.go index 7c33f89d2..94dfe0127 100644 --- a/app/cmd/start.go +++ b/app/cmd/start.go @@ -10,7 +10,6 @@ import ( "os" "os/signal" "path/filepath" - "regexp" "strconv" "strings" "syscall" @@ -26,10 +25,6 @@ import ( "google.golang.org/grpc/reflection" "k8s.io/mount-utils" - commonTypes "github.com/longhorn/go-common-libs/types" - helpernvme "github.com/longhorn/go-spdk-helper/pkg/nvme" - helpertypes "github.com/longhorn/go-spdk-helper/pkg/types" - helperutil "github.com/longhorn/go-spdk-helper/pkg/util" engineutil "github.com/longhorn/longhorn-engine/pkg/util" spdk "github.com/longhorn/longhorn-spdk-engine/pkg/spdk" spdkutil "github.com/longhorn/longhorn-spdk-engine/pkg/util" @@ -115,52 +110,6 @@ func cleanup(pm *process.Manager) { logrus.Errorf("Failed to clean up all processes for %s graceful shutdown", types.ProcessManagerGrpcService) } -func getVolumeNameFromNQN(input string) (string, error) { - // compile a regular expression that matches ${name} between : and -e- - re, err := regexp.Compile(`:(.*)-e-`) - if err != nil { - return "", err - } - // find the first submatch of the input string - submatch := re.FindStringSubmatch(input) - if len(submatch) < 2 { - return "", fmt.Errorf("no name found in input") - } - // return the second element of the submatch, which is ${name} - return submatch[1], nil -} - -func cleanupStaledNvmeAndDmDevices() error { - executor, err := helperutil.NewExecutor(commonTypes.ProcDirectory) - if err != nil { - return errors.Wrapf(err, "failed to create executor for cleaning up staled NVMe and dm devices") - } - - subsystems, err := helpernvme.GetSubsystems(executor) - if err != nil { - return errors.Wrapf(err, "failed to get NVMe subsystems") - } - for _, sys := range subsystems { - logrus.Infof("Found NVMe subsystem %+v", sys) - if strings.HasPrefix(sys.NQN, helpertypes.NQNPrefix) { - dmDeviceName, err := getVolumeNameFromNQN(sys.NQN) - if err != nil { - return errors.Wrapf(err, "failed to get volume name from NQN %v", sys.NQN) - } - logrus.Infof("Removing dm device %v", dmDeviceName) - if err := helperutil.DmsetupRemove(dmDeviceName, false, false, executor); err != nil { - logrus.WithError(err).Warnf("Failed to remove dm device %v, will continue the cleanup", dmDeviceName) - } - - logrus.Infof("Cleaning up NVMe subsystem %v: NQN %v", sys.Name, sys.NQN) - if err := helpernvme.DisconnectTarget(sys.NQN, executor); err != nil { - logrus.WithError(err).Warnf("Failed to disconnect NVMe subsystem %v: NQN %v, will continue the cleanup", sys.Name, sys.NQN) - } - } - } - return nil -} - func unfreezeFilesystems() error { // We do not need to switch to the host mount namespace to get mount points here. Usually, longhorn-engine runs in a // container that has / bind mounted to /host with at least HostToContainer (rslave) propagation. 
@@ -209,11 +158,7 @@ func start(c *cli.Context) (err error) { return err } - if spdkEnabled { - if err := cleanupStaledNvmeAndDmDevices(); err != nil { - return err - } - } else { + if !spdkEnabled { if err := unfreezeFilesystems(); err != nil { return err } diff --git a/go.mod b/go.mod index 9f735c516..7af7c79bb 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,9 @@ require ( github.com/google/uuid v1.6.0 github.com/longhorn/backupstore v0.0.0-20240706152841-78e2c8892f4e github.com/longhorn/go-common-libs v0.0.0-20240707062002-b9354601827e - github.com/longhorn/go-spdk-helper v0.0.0-20240702033003-35e00d17218f + github.com/longhorn/go-spdk-helper v0.0.0-20240708060539-de887e9cc6db github.com/longhorn/longhorn-engine v1.7.0-dev.0.20240707085442-0bfac42c4aff - github.com/longhorn/longhorn-spdk-engine v0.0.0-20240704075534-f807c4a293c6 + github.com/longhorn/longhorn-spdk-engine v0.0.0-20240708130427-1b50687fc568 github.com/longhorn/types v0.0.0-20240706151541-33cb010c3544 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 08c826f39..cb7d25b78 100644 --- a/go.sum +++ b/go.sum @@ -97,12 +97,12 @@ github.com/longhorn/go-common-libs v0.0.0-20240707062002-b9354601827e h1:0SiyvTu github.com/longhorn/go-common-libs v0.0.0-20240707062002-b9354601827e/go.mod h1:vX53A9KF4RHC1UTbEGouZHsZO6bwT3zk63l1hvwF5T8= github.com/longhorn/go-iscsi-helper v0.0.0-20240706152726-9dbb9c7bdf30 h1:tTHysoIMIUt53dRqLtIo7AH0RRMr2P5Jy8DAMxDISO8= github.com/longhorn/go-iscsi-helper v0.0.0-20240706152726-9dbb9c7bdf30/go.mod h1:Y4SlLQTYuOOMWlJNdFhTkM3MsXTmY/jkWj8jbuU7kOE= -github.com/longhorn/go-spdk-helper v0.0.0-20240702033003-35e00d17218f h1:CVv/UA0Jiq/w5p2Eb8d6KgwQr9TRMnpqkHZHQuZ0tIU= -github.com/longhorn/go-spdk-helper v0.0.0-20240702033003-35e00d17218f/go.mod h1:5dAfJz0UmWUqHgUtmAd/lFNA4yVbpLDgaMfin8UBv3w= +github.com/longhorn/go-spdk-helper v0.0.0-20240708060539-de887e9cc6db h1:wQRJNpZadsOD2w57Y3e0M0SnSp5IPIWFEF+1yoRzmrE= +github.com/longhorn/go-spdk-helper v0.0.0-20240708060539-de887e9cc6db/go.mod h1:BMhlxcEnrn0/jyO+9cMV1gWz2jkmeGFZLnGIqsH3lv0= github.com/longhorn/longhorn-engine v1.7.0-dev.0.20240707085442-0bfac42c4aff h1:w8z+IYOHtvWEW7jZniVDX1vJGcGr4DIt2UmHeSxzXKU= github.com/longhorn/longhorn-engine v1.7.0-dev.0.20240707085442-0bfac42c4aff/go.mod h1:u0TZ1221YusDYA+ExdVLjLid1Ps6JuJXgh9185l5D9Y= -github.com/longhorn/longhorn-spdk-engine v0.0.0-20240704075534-f807c4a293c6 h1:Q7wTQeHyraqMxyFQ36jB1a8z5pddakh/7LbsUr5qpW8= -github.com/longhorn/longhorn-spdk-engine v0.0.0-20240704075534-f807c4a293c6/go.mod h1:rGr1eYUryyGLxXGMFexCcj0VIFKetXr+0Balszp+raI= +github.com/longhorn/longhorn-spdk-engine v0.0.0-20240708130427-1b50687fc568 h1:FioK5fYJ2yXpVSaHWq13TvC6BMFcgaYWAYIjjuSYZ1c= +github.com/longhorn/longhorn-spdk-engine v0.0.0-20240708130427-1b50687fc568/go.mod h1:XFF4wVEdM10/7fStU+mM0tx8zXEFurmIWHZjFvQyv/A= github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003 h1:Jw9uANsGcHTxp6HcC++/vN17LfeuDmozHI2j6DoZf5E= github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003/go.mod h1:0CLeXlf59Lg6C0kjLSDf47ft73Dh37CwymYRKWwAn04= github.com/longhorn/sparse-tools v0.0.0-20240703010727-92451e38077a h1:+o63c0oh7ZNKeQdc0Hawfzz5vRa4LiDvLOtJYjegtnk= diff --git a/package/Dockerfile b/package/Dockerfile index 1f1c1f3d2..29ed6e2af 100644 --- a/package/Dockerfile +++ b/package/Dockerfile @@ -35,6 +35,13 @@ FROM registry.suse.com/bci/bci-base:15.6 AS cbuilder ARG ARCH=amd64 +ENV LIBLONGHORN_COMMIT_ID 53d1c063b95efc8d949b095bd4bf04637230265f +ENV TGT_COMMIT_ID 
3a8bc4823b5390e046f7aa8231ed262c0365c42c +ENV SPDK_COMMIT_ID a6478cde7e0cff2fb09992868308a7387aa5202a +ENV LIBJSONC_COMMIT_ID b4c371fa0cbc4dcbaccc359ce9e957a22988fb34 +# nvme-cli 2.9.1 +ENV NVME_CLI_COMMIT_ID b340fd7dcf1aef76f8d46ab28bef3c170d310887 + RUN zypper -n addrepo --refresh https://download.opensuse.org/repositories/system:/snappy/SLE_15/system:snappy.repo && \ zypper -n addrepo --refresh https://download.opensuse.org/repositories/devel:libraries:c_c++/15.6/devel:libraries:c_c++.repo && \ zypper -n addrepo --refresh https://download.opensuse.org/repositories/devel:languages:python:Factory/15.6/devel:languages:python:Factory.repo && \ @@ -46,7 +53,6 @@ RUN zypper -n addrepo --refresh https://download.opensuse.org/repositories/syste RUN zypper -n install cmake gcc xsltproc docbook-xsl-stylesheets git python311 python311-pip patchelf fuse3 libfuse3-3 # Build liblonghorn -ENV LIBLONGHORN_COMMIT_ID 53d1c063b95efc8d949b095bd4bf04637230265f RUN cd /usr/src && \ git clone https://github.com/rancher/liblonghorn.git && \ cd liblonghorn && \ @@ -55,7 +61,6 @@ RUN cd /usr/src && \ make install # Build TGT -ENV TGT_COMMIT_ID 3a8bc4823b5390e046f7aa8231ed262c0365c42c RUN cd /usr/src && \ git clone https://github.com/rancher/tgt.git && \ cd tgt && \ @@ -63,9 +68,8 @@ RUN cd /usr/src && \ make; \ make install -# Build SPDK +# Build spdk ENV SPDK_DIR /usr/src/spdk -ENV SPDK_COMMIT_ID a6478cde7e0cff2fb09992868308a7387aa5202a RUN git clone https://github.com/longhorn/spdk.git ${SPDK_DIR} --recursive && \ cd ${SPDK_DIR} && \ git checkout ${SPDK_COMMIT_ID} && \ @@ -88,7 +92,6 @@ RUN git clone https://github.com/longhorn/spdk.git ${SPDK_DIR} --recursive && \ fi # Build libjson-c-devel -ENV LIBJSONC_COMMIT_ID b4c371fa0cbc4dcbaccc359ce9e957a22988fb34 RUN cd /usr/src && \ git clone https://github.com/json-c/json-c.git && \ cd json-c && \ @@ -99,9 +102,8 @@ RUN cd /usr/src && \ make && \ make install -# Build nvme-cli 2.9.1 +# Build nvme-cli ENV NVME_CLI_DIR /usr/src/nvme-cli -ENV NVME_CLI_COMMIT_ID b340fd7dcf1aef76f8d46ab28bef3c170d310887 RUN git clone https://github.com/linux-nvme/nvme-cli.git ${NVME_CLI_DIR} && \ cd ${NVME_CLI_DIR} && \ git checkout ${NVME_CLI_COMMIT_ID} && \ diff --git a/pkg/api/instance.go b/pkg/api/instance.go index c151db5eb..74752823a 100644 --- a/pkg/api/instance.go +++ b/pkg/api/instance.go @@ -64,20 +64,24 @@ func RPCToInstanceList(obj *rpc.InstanceListResponse) map[string]*Instance { } type InstanceStatus struct { - State string `json:"state"` - ErrorMsg string `json:"errorMsg"` - Conditions map[string]bool `json:"conditions"` - PortStart int32 `json:"portStart"` - PortEnd int32 `json:"portEnd"` + State string `json:"state"` + ErrorMsg string `json:"errorMsg"` + Conditions map[string]bool `json:"conditions"` + PortStart int32 `json:"portStart"` + PortEnd int32 `json:"portEnd"` + TargetPortStart int32 `json:"targetPortStart"` + TargetPortEnd int32 `json:"targetPortEnd"` } func RPCToInstanceStatus(obj *rpc.InstanceStatus) InstanceStatus { return InstanceStatus{ - State: obj.State, - ErrorMsg: obj.ErrorMsg, - Conditions: obj.Conditions, - PortStart: obj.PortStart, - PortEnd: obj.PortEnd, + State: obj.State, + ErrorMsg: obj.ErrorMsg, + Conditions: obj.Conditions, + PortStart: obj.PortStart, + PortEnd: obj.PortEnd, + TargetPortStart: obj.TargetPortStart, + TargetPortEnd: obj.TargetPortEnd, } } diff --git a/pkg/client/instance.go b/pkg/client/instance.go index 5367a566e..6b4f67083 100644 --- a/pkg/client/instance.go +++ b/pkg/client/instance.go @@ -88,6 +88,9 @@ func 
NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc type EngineCreateRequest struct { ReplicaAddressMap map[string]string Frontend string + InitiatorAddress string + TargetAddress string + UpgradeRequired bool } type ReplicaCreateRequest struct { @@ -115,6 +118,7 @@ type InstanceCreateRequest struct { BackendStoreDriver string } +// InstanceCreate creates an instance. func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) { if req.Name == "" || req.InstanceType == "" { return nil, fmt.Errorf("failed to create instance: missing required parameter") @@ -169,6 +173,10 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api ProcessInstanceSpec: processInstanceSpec, SpdkInstanceSpec: spdkInstanceSpec, + + UpgradeRequired: req.Engine.UpgradeRequired, + InitiatorAddress: req.Engine.InitiatorAddress, + TargetAddress: req.Engine.TargetAddress, }, }) if err != nil { @@ -178,6 +186,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api return api.RPCToInstance(p), nil } +// InstanceDelete deletes the instance by name. func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) { if name == "" { return nil, fmt.Errorf("failed to delete instance: missing required parameter name") @@ -207,6 +216,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d return api.RPCToInstance(p), nil } +// InstanceGet returns the instance by name. func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) { if name == "" { return nil, fmt.Errorf("failed to get instance: missing required parameter name") @@ -246,6 +256,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error) return api.RPCToInstanceList(instances), nil } +// InstanceLog returns the log stream of an instance. func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) { if name == "" { return nil, fmt.Errorf("failed to get instance: missing required parameter name") @@ -270,6 +281,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam return api.NewLogStream(stream), nil } +// InstanceWatch watches for instance updates. func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) { client := c.getControllerServiceClient() stream, err := client.InstanceWatch(ctx, &emptypb.Empty{}) @@ -280,6 +292,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc return api.NewInstanceStream(stream), nil } +// InstanceReplace replaces an instance with a new one. func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) { if name == "" || binary == "" { return nil, fmt.Errorf("failed to replace instance: missing required parameter") @@ -320,6 +333,120 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, return api.RPCToInstance(p), nil } +// InstanceSuspend suspends an instance. 
+func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to suspend instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to suspend instance %v", name) + } + + return nil +} + +// InstanceResume resumes an instance. +func (c *InstanceServiceClient) InstanceResume(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to resume instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to resume instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceResume(ctx, &rpc.InstanceResumeRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to resume instance %v", name) + } + + return nil +} + +// InstanceSwitchOverTarget switches over the target for an instance. +func (c *InstanceServiceClient) InstanceSwitchOverTarget(dataEngine, name, instanceType, targetAddress string) error { + if name == "" { + return fmt.Errorf("failed to switch over target for instance: missing required parameter name") + } + + if targetAddress == "" { + return fmt.Errorf("failed to switch over target for instance: missing required parameter target address") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to switch over target for instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceSwitchOverTarget(ctx, &rpc.InstanceSwitchOverTargetRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + TargetAddress: targetAddress, + }) + if err != nil { + return errors.Wrapf(err, "failed to switch over target for instance %v", name) + } + + return nil +} + +// InstanceDeleteTarget deletes the target for an instance.
+func (c *InstanceServiceClient) InstanceDeleteTarget(dataEngine, name, instanceType string) error { + if name == "" { + return fmt.Errorf("failed to delete target for instance: missing required parameter name") + } + + driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)] + if !ok { + return fmt.Errorf("failed to delete target for instance: invalid data engine %v", dataEngine) + } + + client := c.getControllerServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) + defer cancel() + + _, err := client.InstanceDeleteTarget(ctx, &rpc.InstanceDeleteTargetRequest{ + Name: name, + Type: instanceType, + DataEngine: rpc.DataEngine(driver), + }) + if err != nil { + return errors.Wrapf(err, "failed to delete target for instance %v", name) + } + + return nil +} + +// VersionGet returns the version information of the service. func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -343,6 +470,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) { }, nil } +// LogSetLevel sets the log level of the service. func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -360,6 +488,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e return err } +// LogSetFlags sets the log flags of the service. func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -377,6 +506,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e return err } +// LogGetLevel returns the log level of the service. func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) @@ -396,6 +526,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, return resp.Level, nil } +// LogGetFlags returns the log flags of the service.
func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) { client := c.getControllerServiceClient() ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout) diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index e5cf2a4ea..d467a91d8 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -35,6 +35,10 @@ type InstanceOps interface { InstanceList(map[string]*rpc.InstanceResponse) error InstanceReplace(*rpc.InstanceReplaceRequest) (*rpc.InstanceResponse, error) InstanceLog(*rpc.InstanceLogRequest, rpc.InstanceService_InstanceLogServer) error + InstanceSuspend(*rpc.InstanceSuspendRequest) (*emptypb.Empty, error) + InstanceResume(*rpc.InstanceResumeRequest) (*emptypb.Empty, error) + InstanceSwitchOverTarget(*rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) + InstanceDeleteTarget(*rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) LogSetLevel(context.Context, *rpc.LogSetLevelRequest) (*emptypb.Empty, error) LogSetFlags(context.Context, *rpc.LogSetFlagsRequest) (*emptypb.Empty, error) @@ -107,9 +111,10 @@ func (s *Server) VersionGet(ctx context.Context, req *emptypb.Empty) (*rpc.Versi func (s *Server) InstanceCreate(ctx context.Context, req *rpc.InstanceCreateRequest) (*rpc.InstanceResponse, error) { logrus.WithFields(logrus.Fields{ - "name": req.Spec.Name, - "type": req.Spec.Type, - "dataEngine": req.Spec.DataEngine, + "name": req.Spec.Name, + "type": req.Spec.Type, + "dataEngine": req.Spec.DataEngine, + "upgradeRequired": req.Spec.UpgradeRequired, }).Info("Creating instance") ops, ok := s.ops[req.Spec.DataEngine] @@ -148,7 +153,8 @@ func (ops V2DataEngineInstanceOps) InstanceCreate(req *rpc.InstanceCreateRequest switch req.Spec.Type { case types.InstanceTypeEngine: - engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, req.Spec.PortCount) + engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, + req.Spec.PortCount, req.Spec.InitiatorAddress, req.Spec.TargetAddress, req.Spec.UpgradeRequired) if err != nil { return nil, err } @@ -511,7 +517,11 @@ func (s *Server) InstanceWatch(req *emptypb.Empty, srv rpc.InstanceService_Insta // Close the clients for closing streams and unblocking notifier Recv() with error. done <- struct{}{} }() - return s.handleNotify(ctx, notifyChan, srv) + err := s.handleNotify(ctx, notifyChan, srv) + if err != nil { + logrus.WithError(err).Error("Failed to handle notify") + } + return err }) g.Go(func() error { @@ -648,6 +658,14 @@ func (s *Server) watchProcess(ctx context.Context, req *emptypb.Empty, client *c } func processResponseToInstanceResponse(p *rpc.ProcessResponse, processType string) *rpc.InstanceResponse { + // v1 data engine doesn't support the separation of initiator and target, so + // initiator and target are always on the same node. 
+ targetPortStart := int32(0) + targetPortEnd := int32(0) + if processType == types.InstanceTypeEngine { + targetPortStart = p.Status.PortStart + targetPortEnd = p.Status.PortEnd + } return &rpc.InstanceResponse{ Spec: &rpc.InstanceSpec{ Name: p.Spec.Name, @@ -663,11 +681,13 @@ func processResponseToInstanceResponse(p *rpc.ProcessResponse, processType strin PortArgs: p.Spec.PortArgs, }, Status: &rpc.InstanceStatus{ - State: p.Status.State, - PortStart: p.Status.PortStart, - PortEnd: p.Status.PortEnd, - ErrorMsg: p.Status.ErrorMsg, - Conditions: p.Status.Conditions, + State: p.Status.State, + PortStart: p.Status.PortStart, + PortEnd: p.Status.PortEnd, + TargetPortStart: targetPortStart, + TargetPortEnd: targetPortEnd, + ErrorMsg: p.Status.ErrorMsg, + Conditions: p.Status.Conditions, }, Deleted: p.Deleted, } @@ -702,11 +722,170 @@ func engineResponseToInstanceResponse(e *spdkapi.Engine) *rpc.InstanceResponse { DataEngine: rpc.DataEngine_DATA_ENGINE_V2, }, Status: &rpc.InstanceStatus{ - State: e.State, - ErrorMsg: e.ErrorMsg, - PortStart: e.Port, - PortEnd: e.Port, - Conditions: make(map[string]bool), + State: e.State, + ErrorMsg: e.ErrorMsg, + PortStart: e.Port, + PortEnd: e.Port, + TargetPortStart: e.TargetPort, + TargetPortEnd: e.TargetPort, + Conditions: make(map[string]bool), }, } } + +func (s *Server) InstanceSuspend(ctx context.Context, req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Suspending instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceSuspend(req) +} + +func (ops V1DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance suspend is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceSuspend(req *rpc.InstanceSuspendRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineSuspend(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to suspend engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "suspend is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} + +func (s *Server) InstanceResume(ctx context.Context, req *rpc.InstanceResumeRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Resuming instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceResume(req) +} + +func (ops V1DataEngineInstanceOps) InstanceResume(req *rpc.InstanceResumeRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance resume is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceResume(req 
*rpc.InstanceResumeRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineResume(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to resume engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "resume is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} + +func (s *Server) InstanceSwitchOverTarget(ctx context.Context, req *rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + "targetAddress": req.TargetAddress, + }).Info("Switching over target instance") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceSwitchOverTarget(req) +} + +func (ops V1DataEngineInstanceOps) InstanceSwitchOverTarget(req *rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance target switch over is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceSwitchOverTarget(req *rpc.InstanceSwitchOverTargetRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case types.InstanceTypeEngine: + err := c.EngineSwitchOverTarget(req.Name, req.TargetAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to switch over target for engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "target switch over is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} + +func (s *Server) InstanceDeleteTarget(ctx context.Context, req *rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) { + logrus.WithFields(logrus.Fields{ + "name": req.Name, + "type": req.Type, + "dataEngine": req.DataEngine, + }).Info("Deleting target") + + ops, ok := s.ops[req.DataEngine] + if !ok { + return nil, grpcstatus.Errorf(grpccodes.Unimplemented, "unsupported data engine %v", req.DataEngine) + } + return ops.InstanceDeleteTarget(req) +} + +func (ops V1DataEngineInstanceOps) InstanceDeleteTarget(req *rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "v1 data engine instance target delete is not supported") +} + +func (ops V2DataEngineInstanceOps) InstanceDeleteTarget(req *rpc.InstanceDeleteTargetRequest) (*emptypb.Empty, error) { + c, err := spdkclient.NewSPDKClient(ops.spdkServiceAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to create SPDK client").Error()) + } + defer c.Close() + + switch req.Type { + case 
types.InstanceTypeEngine: + err := c.EngineDeleteTarget(req.Name) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to delete target for engine %v", req.Name).Error()) + } + return &emptypb.Empty{}, nil + case types.InstanceTypeReplica: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "target deletion is not supported for instance type %v", req.Type) + default: + return nil, grpcstatus.Errorf(grpccodes.InvalidArgument, "unknown instance type %v", req.Type) + } +} diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go index f387e93c8..75ab7d135 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/initiator.go @@ -22,10 +22,11 @@ const ( LockFile = "/var/run/longhorn-spdk.lock" LockTimeout = 120 * time.Second - RetryCounts = 5 - RetryInterval = 3 * time.Second + maxNumRetries = 15 + retryInterval = 1 * time.Second - waitDeviceTimeout = 60 * time.Second + maxNumWaitDeviceRetries = 60 + waitDeviceInterval = 1 * time.Second HostProc = "/host/proc" @@ -42,7 +43,7 @@ type Initiator struct { Endpoint string ControllerName string NamespaceName string - dev *util.KernelDevice + dev *util.LonghornBlockDevice isUp bool hostProc string @@ -51,6 +52,7 @@ type Initiator struct { logger logrus.FieldLogger } +// NewInitiator creates a new NVMe initiator func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) { if name == "" || subsystemNQN == "" { return nil, fmt.Errorf("empty name or subsystem for initiator creation") @@ -78,6 +80,87 @@ func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) { }, nil } +// DiscoverTarget discovers a target +func (i *Initiator) DiscoverTarget(ip, port string) (string, error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + return DiscoverTarget(ip, port, i.executor) +} + +// ConnectTarget connects to a target +func (i *Initiator) ConnectTarget(ip, port, nqn string) (string, error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + return ConnectTarget(ip, port, nqn, i.executor) +} + +// DisconnectTarget disconnects a target +func (i *Initiator) DisconnectTarget() error { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + return DisconnectTarget(i.SubsystemNQN, i.executor) +} + +// WaitForConnect waits for the NVMe initiator to connect +func (i *Initiator) WaitForConnect(maxNumRetries int, retryInterval time.Duration) (err error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + for r := 0; r < maxNumRetries; r++ 
{ + err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) + if err == nil { + return nil + } + time.Sleep(retryInterval) + } + + return err +} + +// WaitForDisconnect waits for the NVMe initiator to disconnect +func (i *Initiator) WaitForDisconnect(maxNumRetries int, retryInterval time.Duration) (err error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + for r := 0; r < maxNumRetries; r++ { + err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) + if IsValidNvmeDeviceNotFound(err) { + return nil + } + time.Sleep(retryInterval) + } + + return err +} + // Suspend suspends the device mapper device for the NVMe initiator func (i *Initiator) Suspend(noflush, nolockfs bool) error { if i.hostProc != "" { @@ -88,16 +171,53 @@ func (i *Initiator) Suspend(noflush, nolockfs bool) error { defer lock.Unlock() } - if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil { - return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name) + suspended, err := i.IsSuspended() + if err != nil { + return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name) + } + + if !suspended { + if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil { + return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name) + } + } + + return nil +} + +// Resume resumes the device mapper device for the NVMe initiator +func (i *Initiator) Resume() error { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + if err := i.resumeLinearDmDevice(); err != nil { + return errors.Wrapf(err, "failed to resume device mapper device for NVMe initiator %s", i.Name) } return nil } +func (i *Initiator) resumeLinearDmDevice() error { + logrus.Infof("Resuming linear dm device %s", i.Name) + + return util.DmsetupResume(i.Name, i.executor) +} + func (i *Initiator) replaceDmDeviceTarget() error { - if err := i.suspendLinearDmDevice(true, false); err != nil { - return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name) + suspended, err := i.IsSuspended() + if err != nil { + return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name) + } + + if !suspended { + if err := i.suspendLinearDmDevice(true, false); err != nil { + return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name) + } } if err := i.reloadLinearDmDevice(); err != nil { @@ -111,13 +231,21 @@ func (i *Initiator) replaceDmDeviceTarget() error { } // Start starts the NVMe initiator with the given transportAddress and transportServiceID -func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDeviceCleanup bool) (dmDeviceBusy bool, err error) { +func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceCleanupRequired bool) (dmDeviceBusy bool, err error) { defer func() { if err != nil { err = errors.Wrapf(err, "failed to start NVMe initiator %s", i.Name) } }() + 
i.logger = i.logger.WithFields(logrus.Fields{ + "transportAddress": transportAddress, + "transportServiceID": transportServiceID, + "dmDeviceCleanupRequired": dmDeviceCleanupRequired, + }) + + i.logger.Info("Starting initiator") + if transportAddress == "" || transportServiceID == "" { return false, fmt.Errorf("invalid TransportAddress %s and TransportServiceID %s for initiator %s start", transportAddress, transportServiceID, i.Name) } @@ -131,12 +259,8 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev } // Check if the initiator/NVMe device is already launched and matches the params - if err := i.loadNVMeDeviceInfoWithoutLock(); err == nil { + if err := i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN); err == nil { if i.TransportAddress == transportAddress && i.TransportServiceID == transportServiceID { - i.logger.WithFields(logrus.Fields{ - "transportAddress": transportAddress, - "transportServiceID": transportServiceID, - }) if err = i.LoadEndpoint(false); err == nil { i.logger.Info("NVMe initiator is already launched with correct params") return false, nil } @@ -149,7 +273,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev } i.logger.Infof("Stopping NVMe initiator blindly before starting") - dmDeviceBusy, err = i.stopWithoutLock(needDmDeviceCleanup, false, false) + dmDeviceBusy, err = i.stopWithoutLock(dmDeviceCleanupRequired, false, false) if err != nil { return dmDeviceBusy, errors.Wrapf(err, "failed to stop the mismatching NVMe initiator %s before starting", i.Name) } @@ -161,42 +285,48 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev i.logger.Infof("Launching NVMe initiator") // Setup initiator - for counter := 0; counter < RetryCounts; counter++ { + for r := 0; r < maxNumRetries; r++ { // Rerun this API for a discovered target should be fine - if i.SubsystemNQN, err = DiscoverTarget(transportAddress, transportServiceID, i.executor); err != nil { - i.logger.WithError(err).Warnf("Failed to discover") - time.Sleep(RetryInterval) + subsystemNQN, err := DiscoverTarget(transportAddress, transportServiceID, i.executor) + if err != nil { + i.logger.WithError(err).Warn("Failed to discover target") + time.Sleep(retryInterval) continue } - if i.ControllerName, err = ConnectTarget(transportAddress, transportServiceID, i.SubsystemNQN, i.executor); err != nil { - i.logger.WithError(err).Warnf("Failed to connect target") - time.Sleep(RetryInterval) + controllerName, err := ConnectTarget(transportAddress, transportServiceID, subsystemNQN, i.executor) + if err != nil { + i.logger.WithError(err).Warn("Failed to connect target") + time.Sleep(retryInterval) continue } + + i.SubsystemNQN = subsystemNQN + i.ControllerName = controllerName break } if i.ControllerName == "" { - return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, RetryCounts, RetryInterval.Seconds()) + return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, maxNumRetries, retryInterval.Seconds()) } - for t := 0; t < int(waitDeviceTimeout.Seconds()); t++ { - if err = i.loadNVMeDeviceInfoWithoutLock(); err == nil { + for r := 0; r < maxNumWaitDeviceRetries; r++ { + err = i.loadNVMeDeviceInfoWithoutLock(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN) + if err == nil { break } - time.Sleep(time.Second) + time.Sleep(waitDeviceInterval) } if err != nil { return dmDeviceBusy, errors.Wrapf(err, "failed to
load device info after starting NVMe initiator %s", i.Name) } needMakeEndpoint := true - if needDmDeviceCleanup { + if dmDeviceCleanupRequired { if dmDeviceBusy { // Endpoint is already created, just replace the target device needMakeEndpoint = false - i.logger.Infof("Linear dm device %s is busy, trying the best to replace the target device for NVMe initiator %s", i.Name, i.Name) + i.logger.Infof("Linear dm device is busy, trying the best to replace the target device for NVMe initiator %s", i.Name) if err := i.replaceDmDeviceTarget(); err != nil { i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe initiator %s", i.Name) } else { @@ -206,7 +336,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev } else { i.logger.Infof("Creating linear dm device for NVMe initiator %s", i.Name) if err := i.createLinearDmDevice(); err != nil { - return dmDeviceBusy, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name) + return false, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name) } } } else { @@ -226,7 +356,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev return dmDeviceBusy, nil } -func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { +func (i *Initiator) Stop(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { if i.hostProc != "" { lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) if err := lock.Lock(); err != nil { @@ -235,7 +365,7 @@ func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmD defer lock.Unlock() } - return i.stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice) + return i.stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice) } func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { @@ -254,9 +384,9 @@ func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmD return false, nil } -func (i *Initiator) stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { +func (i *Initiator) stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { dmDeviceBusy := false - if needDmDeviceCleanup { + if dmDeviceCleanupRequired { var err error dmDeviceBusy, err = i.removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice) if err != nil { @@ -299,7 +429,7 @@ func (i *Initiator) GetEndpoint() string { return "" } -func (i *Initiator) LoadNVMeDeviceInfo() (err error) { +func (i *Initiator) LoadNVMeDeviceInfo(transportAddress, transportServiceID, subsystemNQN string) (err error) { if i.hostProc != "" { lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) if err := lock.Lock(); err != nil { @@ -308,11 +438,11 @@ func (i *Initiator) LoadNVMeDeviceInfo() (err error) { defer lock.Unlock() } - return i.loadNVMeDeviceInfoWithoutLock() + return i.loadNVMeDeviceInfoWithoutLock(transportAddress, transportServiceID, subsystemNQN) } -func (i *Initiator) loadNVMeDeviceInfoWithoutLock() error { - nvmeDevices, err := GetDevices(i.TransportAddress, i.TransportServiceID, i.SubsystemNQN, i.executor) +func (i *Initiator) loadNVMeDeviceInfoWithoutLock(transportAddress, transportServiceID, subsystemNQN string) error { + nvmeDevices, err := 
GetDevices(transportAddress, transportServiceID, subsystemNQN, i.executor) if err != nil { return err } @@ -341,7 +471,9 @@ func (i *Initiator) loadNVMeDeviceInfoWithoutLock() error { return errors.Wrapf(err, "cannot find the device for NVMe initiator %s with namespace name %s", i.Name, i.NamespaceName) } - i.dev = dev + i.dev = &util.LonghornBlockDevice{ + Nvme: *dev, + } return nil } @@ -368,20 +500,22 @@ func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error { return err } - depDevices, err := i.findDependentDevices(dev.Nvme.Name) + depDevices, err := i.findDependentDevices(dev.Name) if err != nil { return err } if dmDeviceBusy { - i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %s due to device busy", i.Endpoint, i.Name) + i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %v due to device busy", i.Endpoint, i.Name) } else { if i.NamespaceName != "" && !i.isNamespaceExist(depDevices) { - return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Nvme.Name, i.Endpoint, i.Name) + return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Name, i.Endpoint, i.Name) } } - i.dev = dev + i.dev = &util.LonghornBlockDevice{ + Export: *dev, + } i.isUp = true return nil } @@ -472,10 +606,32 @@ func (i *Initiator) suspendLinearDmDevice(noflush, nolockfs bool) error { return util.DmsetupSuspend(i.Name, noflush, nolockfs, i.executor) } -func (i *Initiator) resumeLinearDmDevice() error { - logrus.Infof("Resuming linear dm device %s", i.Name) +// ReloadDmDevice reloads the linear dm device +func (i *Initiator) ReloadDmDevice() (err error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for NVMe initiator %s", i.Name) + } + defer lock.Unlock() + } - return util.DmsetupResume(i.Name, i.executor) + return i.reloadLinearDmDevice() +} + +// IsSuspended checks if the linear dm device is suspended +func (i *Initiator) IsSuspended() (bool, error) { + devices, err := util.DmsetupInfo(i.Name, i.executor) + if err != nil { + return false, err + } + + for _, device := range devices { + if device.Name == i.Name { + return device.Suspended, nil + } + } + return false, fmt.Errorf("failed to find linear dm device %s", i.Name) } func (i *Initiator) reloadLinearDmDevice() error { @@ -496,7 +652,7 @@ func (i *Initiator) reloadLinearDmDevice() error { table := fmt.Sprintf("0 %v linear %v 0", sectors, devPath) - logrus.Infof("Reloading linear dm device %s with table %s", i.Name, table) + logrus.Infof("Reloading linear dm device %s with table '%s'", i.Name, table) return util.DmsetupReload(i.Name, table, i.executor) } @@ -504,3 +660,7 @@ func (i *Initiator) reloadLinearDmDevice() error { func getDmDevicePath(name string) string { return fmt.Sprintf("/dev/mapper/%s", name) } + +func IsValidNvmeDeviceNotFound(err error) bool { + return err != nil && strings.Contains(err.Error(), ErrorMessageCannotFindValidNvmeDevice) +} diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go index 6f9fd0681..88ed9c7e8 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvme.go @@ -10,6 +10,10 @@ import ( commonNs "github.com/longhorn/go-common-libs/ns" ) +const ( + ErrorMessageCannotFindValidNvmeDevice = "cannot find a valid 
nvme device" +) + // DiscoverTarget discovers a target func DiscoverTarget(ip, port string, executor *commonNs.Executor) (subnqn string, err error) { hostID, err := getHostID(executor) @@ -162,7 +166,7 @@ func GetDevices(ip, port, nqn string, executor *commonNs.Executor) (devices []De } } - return nil, fmt.Errorf("cannot find a valid nvme device with subsystem NQN %s and address %s:%s", nqn, ip, port) + return nil, fmt.Errorf(ErrorMessageCannotFindValidNvmeDevice+" with subsystem NQN %s and address %s:%s", nqn, ip, port) } return res, nil } @@ -171,3 +175,8 @@ func GetDevices(ip, port, nqn string, executor *commonNs.Executor) (devices []De func GetSubsystems(executor *commonNs.Executor) (subsystems []Subsystem, err error) { return listSubsystems("", executor) } + +// Flush commits data and metadata associated with the specified namespace(s) to nonvolatile media. +func Flush(device, namespaceID string, executor *commonNs.Executor) (output string, err error) { + return flush(device, namespaceID, executor) +} diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go index 2477a0d20..91a6300d6 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/nvme/nvmecli.go @@ -17,7 +17,9 @@ const ( DefaultTransportType = "tcp" // Set short ctrlLossTimeoutSec for quick response to the controller loss. - defaultCtrlLossTimeoutSec = 30 + defaultCtrlLossTmo = 30 + defaultKeepAliveTmo = 5 + defaultReconnectDelay = 10 ) type Device struct { @@ -306,7 +308,9 @@ func connect(hostID, hostNQN, nqn, transpotType, ip, port string, executor *comm "connect", "-t", transpotType, "--nqn", nqn, - "--ctrl-loss-tmo", strconv.Itoa(defaultCtrlLossTimeoutSec), + "--ctrl-loss-tmo", strconv.Itoa(defaultCtrlLossTmo), + "--keep-alive-tmo", strconv.Itoa(defaultKeepAliveTmo), + "--reconnect-delay", strconv.Itoa(defaultReconnectDelay), "-o", "json", } @@ -409,3 +413,18 @@ func GetIPAndPortFromControllerAddress(address string) (string, string) { return traddr, trsvcid } + +func flush(devicePath, namespaceID string, executor *commonNs.Executor) (string, error) { + + opts := []string{ + "flush", + devicePath, + "-o", "json", + } + + if namespaceID != "" { + opts = append(opts, "-n", namespaceID) + } + + return executor.Execute(nil, nvmeBinary, opts, types.ExecuteTimeout) +} diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go index f717cd9c7..291dd6d8b 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/types/types.go @@ -18,6 +18,10 @@ const ( FrontendSPDKTCPNvmf = "spdk-tcp-nvmf" FrontendSPDKTCPBlockdev = "spdk-tcp-blockdev" + ShallowCopyStateInProgress = "in progress" + ShallowCopyStateComplete = "complete" + ShallowCopyStateError = "error" + DefaultCtrlrLossTimeoutSec = 30 // DefaultReconnectDelaySec can't be more than DefaultFastIoFailTimeoutSec, same for non-default values. 
DefaultReconnectDelaySec = 5 diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go index c3beb0a69..f55f4fd85 100644 --- a/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/util/device.go @@ -35,7 +35,7 @@ type BlockDevices struct { Devices []BlockDevice `json:"blockdevices"` } -type KernelDevice struct { +type LonghornBlockDevice struct { Nvme BlockDevice Export BlockDevice } @@ -51,19 +51,18 @@ func RemoveDevice(dev string) error { } // GetKnownDevices returns the path of the device with the given major and minor numbers -func GetKnownDevices(executor *commonNs.Executor) (map[string]*KernelDevice, error) { - knownDevices := make(map[string]*KernelDevice) - - /* Example command output - $ lsblk -l -n -o NAME,MAJ:MIN - sda 8:0 - sdb 8:16 - sdc 8:32 - nvme0n1 259:0 - nvme0n1p1 259:1 - nvme0n1p128 259:2 - nvme1n1 259:3 - */ +func GetKnownDevices(executor *commonNs.Executor) (map[string]*LonghornBlockDevice, error) { + knownDevices := make(map[string]*LonghornBlockDevice) + + // Example command output + // $ lsblk -l -n -o NAME,MAJ:MIN + // sda 8:0 + // sdb 8:16 + // sdc 8:32 + // nvme0n1 259:0 + // nvme0n1p1 259:1 + // nvme0n1p128 259:2 + // nvme1n1 259:3 opts := []string{ "-l", "-n", "-o", "NAME,MAJ:MIN", @@ -79,7 +78,7 @@ func GetKnownDevices(executor *commonNs.Executor) (map[string]*KernelDevice, err line := scanner.Text() f := strings.Fields(line) if len(f) == 2 { - dev := &KernelDevice{ + dev := &LonghornBlockDevice{ Nvme: BlockDevice{ Name: f[0], }, @@ -95,11 +94,10 @@ func GetKnownDevices(executor *commonNs.Executor) (map[string]*KernelDevice, err } // DetectDevice detects the device with the given path -func DetectDevice(path string, executor *commonNs.Executor) (*KernelDevice, error) { - /* Example command output - $ lsblk -l -n -o NAME,MAJ:MIN - nvme1n1 259:3 - */ +func DetectDevice(path string, executor *commonNs.Executor) (*BlockDevice, error) { + // Example command output + // $ lsblk -l -n -o NAME,MAJ:MIN + // nvme1n1 259:3 opts := []string{ path, "-n", "-o", "NAME,MAJ:MIN", "--nodeps", @@ -110,19 +108,18 @@ func DetectDevice(path string, executor *commonNs.Executor) (*KernelDevice, erro return nil, err } - var dev *KernelDevice + var dev *BlockDevice scanner := bufio.NewScanner(strings.NewReader(output)) for scanner.Scan() { line := scanner.Text() f := strings.Fields(line) if len(f) == 2 { - dev = &KernelDevice{ - Nvme: BlockDevice{ - Name: f[0], - }, + dev = &BlockDevice{ + Name: f[0], } - if _, err := fmt.Sscanf(f[1], "%d:%d", &dev.Nvme.Major, &dev.Nvme.Minor); err != nil { - return nil, fmt.Errorf("invalid major:minor %s for device %s with path %s", dev.Nvme.Name, f[1], path) + _, err = fmt.Sscanf(f[1], "%d:%d", &dev.Major, &dev.Minor) + if err != nil { + return nil, fmt.Errorf("invalid major:minor %s for device %s with path %s", dev.Name, f[1], path) } } break // nolint:staticcheck @@ -210,7 +207,7 @@ func GetDeviceNumbers(devPath string, executor *commonNs.Executor) (int, int, er } // DuplicateDevice creates a device node for the given device -func DuplicateDevice(dev *KernelDevice, dest string) error { +func DuplicateDevice(dev *LonghornBlockDevice, dest string) error { if dev == nil { return fmt.Errorf("found nil device for device duplication") } diff --git a/vendor/github.com/longhorn/go-spdk-helper/pkg/util/dmsetup.go b/vendor/github.com/longhorn/go-spdk-helper/pkg/util/dmsetup.go index c761ca0c2..591012c89 100644 --- 
a/vendor/github.com/longhorn/go-spdk-helper/pkg/util/dmsetup.go +++ b/vendor/github.com/longhorn/go-spdk-helper/pkg/util/dmsetup.go @@ -1,7 +1,9 @@ package util import ( + "fmt" "regexp" + "strings" commonNs "github.com/longhorn/go-common-libs/ns" @@ -98,3 +100,76 @@ func parseDependentDevicesFromString(str string) []string { return devices } + +type DeviceInfo struct { + Name string + BlockDeviceName string + TableLive bool + TableInactive bool + Suspended bool + ReadOnly bool + Major uint32 + Minor uint32 + OpenCount uint32 // Open reference count + TargetCount uint32 // Number of targets in the live table + EventNumber uint32 // Last event sequence number (used by wait) +} + +// DmsetupInfo returns the information of the device mapper device with the given name +func DmsetupInfo(dmDeviceName string, executor *commonNs.Executor) ([]*DeviceInfo, error) { + opts := []string{ + "info", + "--columns", + "--noheadings", + "-o", + "name,blkdevname,attr,major,minor,open,segments,events", + "--separator", + " ", + dmDeviceName, + } + + outputStr, err := executor.Execute(nil, dmsetupBinary, opts, types.ExecuteTimeout) + if err != nil { + return nil, err + } + + var ( + lines = strings.Split(outputStr, "\n") + devices = []*DeviceInfo{} + ) + + for _, line := range lines { + var ( + attr = "" + info = &DeviceInfo{} + ) + + // Break, if the line is empty or EOF + if line == "" { + break + } + + _, err := fmt.Sscan(line, + &info.Name, + &info.BlockDeviceName, + &attr, + &info.Major, + &info.Minor, + &info.OpenCount, + &info.TargetCount, + &info.EventNumber) + if err != nil { + continue + } + + // Parse attributes (see "man 8 dmsetup" for details) + info.Suspended = strings.Contains(attr, "s") + info.ReadOnly = strings.Contains(attr, "r") + info.TableLive = strings.Contains(attr, "L") + info.TableInactive = strings.Contains(attr, "I") + + devices = append(devices, info) + } + + return devices, nil +} diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/api/types.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/api/types.go index 3ea04627b..19e634c28 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/api/types.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/api/types.go @@ -127,6 +127,8 @@ type Engine struct { ActualSize uint64 `json:"actual_size"` IP string `json:"ip"` Port int32 `json:"port"` + TargetIP string `json:"target_ip"` + TargetPort int32 `json:"target_port"` ReplicaAddressMap map[string]string `json:"replica_address_map"` ReplicaModeMap map[string]types.Mode `json:"replica_mode_map"` Head *Lvol `json:"head"` @@ -145,6 +147,8 @@ func ProtoEngineToEngine(e *spdkrpc.Engine) *Engine { ActualSize: e.ActualSize, IP: e.Ip, Port: e.Port, + TargetIP: e.TargetIp, + TargetPort: e.TargetPort, ReplicaAddressMap: e.ReplicaAddressMap, ReplicaModeMap: map[string]types.Mode{}, Head: ProtoLvolToLvol(e.Head), diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go index b1d68d1f2..9ef67706c 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go @@ -359,7 +359,8 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin return errors.Wrapf(err, "failed to revert dst SPDK replica %s rebuilding snapshot %s", name, snapshotName) } -func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap 
map[string]string, portCount int32) (*api.Engine, error) { +func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, + initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -375,6 +376,9 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui ReplicaAddressMap: replicaAddressMap, Frontend: frontend, PortCount: portCount, + UpgradeRequired: upgradeRequired, + TargetAddress: targetAddress, + InitiatorAddress: initiatorAddress, }) if err != nil { return nil, errors.Wrap(err, "failed to start SPDK engine") @@ -416,6 +420,79 @@ func (c *SPDKClient) EngineGet(name string) (*api.Engine, error) { return api.ProtoEngineToEngine(resp), nil } +func (c *SPDKClient) EngineSuspend(name string) error { + if name == "" { + return fmt.Errorf("failed to suspend engine: missing required parameter") + } + + client := c.getSPDKServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout) + defer cancel() + + _, err := client.EngineSuspend(ctx, &spdkrpc.EngineSuspendRequest{ + Name: name, + }) + if err != nil { + return errors.Wrapf(err, "failed to suspend engine %v", name) + } + return nil +} + +func (c *SPDKClient) EngineResume(name string) error { + if name == "" { + return fmt.Errorf("failed to resume engine: missing required parameter") + } + + client := c.getSPDKServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout) + defer cancel() + + _, err := client.EngineResume(ctx, &spdkrpc.EngineResumeRequest{ + Name: name, + }) + if err != nil { + return errors.Wrapf(err, "failed to resume engine %v", name) + } + return nil +} + +func (c *SPDKClient) EngineSwitchOverTarget(name, targetAddress string) error { + if name == "" { + return fmt.Errorf("failed to switch over target for engine: missing required parameter") + } + + client := c.getSPDKServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout) + defer cancel() + + _, err := client.EngineSwitchOverTarget(ctx, &spdkrpc.EngineSwitchOverTargetRequest{ + Name: name, + TargetAddress: targetAddress, + }) + if err != nil { + return errors.Wrapf(err, "failed to switch over target for engine %v", name) + } + return nil +} + +func (c *SPDKClient) EngineDeleteTarget(name string) error { + if name == "" { + return fmt.Errorf("failed to delete target for engine: missing required parameter") + } + + client := c.getSPDKServiceClient() + ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout) + defer cancel() + + _, err := client.EngineDeleteTarget(ctx, &spdkrpc.EngineDeleteTargetRequest{ + Name: name, + }) + if err != nil { + return errors.Wrapf(err, "failed to delete target for engine %v", name) + } + return nil +} + func (c *SPDKClient) EngineList() (map[string]*api.Engine, error) { client := c.getSPDKServiceClient() ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout) diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/disk.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/disk.go index 75eb8c045..c4a21c73c 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/disk.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/disk.go @@ -193,10 +193,10 @@ func getDiskID(filename 
string) (string, error) {
 	dev, err := spdkutil.DetectDevice(filename, executor)
 	if err != nil {
-		return "", errors.Wrap(err, "failed to detect disk device")
+		return "", errors.Wrapf(err, "failed to detect disk device %v", filename)
 	}
 
-	return fmt.Sprintf("%d-%d", dev.Nvme.Major, dev.Nvme.Minor), nil
+	return fmt.Sprintf("%d-%d", dev.Major, dev.Minor), nil
 }
 
 func validateAioDiskCreation(spdkClient *spdkclient.Client, diskPath string, diskDriver commonTypes.DiskDriver) error {
diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go
index cf0039a15..5c16f3eca 100644
--- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go
+++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go
@@ -42,6 +42,8 @@ type Engine struct {
 	ReplicaModeMap map[string]types.Mode
 	IP             string
 	Port           int32
+	TargetIP       string
+	TargetPort     int32
 	Frontend       string
 	Endpoint       string
 	Nqn            string
@@ -95,8 +97,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU
 	}
 }
 
-func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *util.Bitmap) (ret *spdkrpc.Engine, err error) {
-	e.log.Infof("Creating engine with replicas %+v", replicaAddressMap)
+func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *util.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) {
+	logrus.WithFields(logrus.Fields{
+		"portCount":         portCount,
+		"upgradeRequired":   upgradeRequired,
+		"replicaAddressMap": replicaAddressMap,
+		"initiatorAddress":  initiatorAddress,
+		"targetAddress":     targetAddress,
+	}).Info("Creating engine")
 
 	requireUpdate := true
 
@@ -109,9 +117,26 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
 		}
 	}()
 
+	podIP, err := commonNet.GetIPForPod()
+	if err != nil {
+		return nil, err
+	}
+
+	initiatorIP, _, err := splitHostPort(initiatorAddress)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress)
+	}
+	targetIP, _, err := splitHostPort(targetAddress)
+	if err != nil {
+		return nil, errors.Wrapf(err, "failed to split target address %v", targetAddress)
+	}
+
 	if e.State != types.InstanceStatePending {
-		requireUpdate = false
-		return nil, fmt.Errorf("invalid state %s for engine %s creation", e.State, e.Name)
+		switchingOverBack := e.State == types.InstanceStateRunning && initiatorIP == targetIP
+		if !switchingOverBack {
+			requireUpdate = false
+			return nil, fmt.Errorf("invalid state %s for engine %s creation", e.State, e.Name)
+		}
 	}
 
 	defer func() {
@@ -131,39 +156,99 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str
 		}
 	}()
 
-	podIP, err := commonNet.GetIPForPod()
-	if err != nil {
-		return nil, err
-	}
-	e.IP = podIP
-	e.log = e.log.WithField("ip", podIP)
+	e.Nqn = helpertypes.GetNQN(e.Name)
 
 	replicaBdevList := []string{}
-	for replicaName, replicaAddr := range replicaAddressMap {
-		bdevName, err := e.connectReplica(spdkClient, replicaName, replicaAddr)
+
+	initiatorCreationRequired := true
+	if !upgradeRequired {
+		if e.IP == "" {
+			if initiatorIP != targetIP {
+				// For creating a target on another node
+				initiatorCreationRequired = false
+				e.log.Info("Creating a target engine")
+				e.TargetIP = podIP
+			} else {
+				// For creating a new engine
+				e.log.Info("Creating a new engine")
+				e.IP = podIP
+				e.TargetIP = podIP
+			}
+
+			e.log = e.log.WithField("ip", e.IP)
+		} else {
+			if initiatorIP != targetIP {
+				return nil, errors.Errorf("unsupported operation: engine ip=%v, initiator address=%v, target address=%v", e.IP, initiatorAddress, targetAddress)
+			}
+
+			// For creating a target on the attached node
+			initiatorCreationRequired = false
+		}
+
+		for replicaName, replicaAddr := range replicaAddressMap {
+			bdevName, err := e.connectReplica(spdkClient, replicaName, replicaAddr)
+			if err != nil {
+				e.log.WithError(err).Errorf("Failed to get bdev from replica %s with address %s, will skip it and continue", replicaName, replicaAddr)
+				e.ReplicaModeMap[replicaName] = types.ModeERR
+				e.ReplicaBdevNameMap[replicaName] = ""
+				continue
+			}
+			// TODO: Check if a replica is really a RW replica rather than a rebuilding failed replica
+			e.ReplicaModeMap[replicaName] = types.ModeRW
+			e.ReplicaBdevNameMap[replicaName] = bdevName
+			replicaBdevList = append(replicaBdevList, bdevName)
+		}
+		e.ReplicaAddressMap = replicaAddressMap
+		e.log = e.log.WithField("replicaAddressMap", replicaAddressMap)
+
+		e.CheckAndUpdateInfoFromReplica()
+
+		e.log.Info("Launching raid during engine creation")
+		if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil {
+			return nil, err
+		}
+	} else {
+		// For reconstructing the engine after its target was switched over to another node
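+		// Instead of connecting the replicas again, the replica mode and bdev
+		// name maps are copied from the engine state on the node that now hosts
+		// the target (queried through its SPDK service when that is a remote node).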
+		initiatorCreationRequired = false
+
+		e.IP = targetIP
+
+		// Get ReplicaModeMap and ReplicaBdevNameMap
+		targetSPDKClient, err := GetServiceClient(net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)))
 		if err != nil {
-			e.log.WithError(err).Errorf("Failed to get bdev from replica %s with address %s, will skip it and continue", replicaName, replicaAddr)
-			e.ReplicaModeMap[replicaName] = types.ModeERR
-			e.ReplicaBdevNameMap[replicaName] = ""
-			continue
+			return nil, err
 		}
-		// TODO: Check if a replica is really a RW replica rather than a rebuilding failed replica
-		e.ReplicaModeMap[replicaName] = types.ModeRW
-		e.ReplicaBdevNameMap[replicaName] = bdevName
-		replicaBdevList = append(replicaBdevList, bdevName)
-	}
-	e.ReplicaAddressMap = replicaAddressMap
-	e.log = e.log.WithField("replicaAddressMap", replicaAddressMap)
 
-	e.CheckAndUpdateInfoFromReplica()
+		var engineWithTarget *api.Engine
+		if initiatorIP != targetIP {
+			engineWithTarget, err = targetSPDKClient.EngineGet(e.Name)
+			if err != nil {
+				return nil, errors.Wrapf(err, "failed to get engine %v from %v", e.Name, targetAddress)
+			}
+		} else {
+			engineWithTarget = api.ProtoEngineToEngine(e.getWithoutLock())
+		}
 
-	e.log.Info("Launching RAID during engine creation")
-	if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil {
-		return nil, err
+		for replicaName, replicaAddr := range replicaAddressMap {
+			_, ok := engineWithTarget.ReplicaAddressMap[replicaName]
+			if !ok {
+				e.log.Errorf("Failed to find replica %s with address %s in the engine that holds the target, will skip it and continue", replicaName, replicaAddr)
+				e.ReplicaModeMap[replicaName] = types.ModeERR
+				e.ReplicaBdevNameMap[replicaName] = ""
+				continue
+			}
+
+			e.ReplicaModeMap[replicaName] = types.ModeRW
+			e.ReplicaBdevNameMap[replicaName] = replicaName
+			replicaBdevList = append(replicaBdevList, replicaName)
+		}
+
+		e.ReplicaAddressMap = replicaAddressMap
+		e.log = e.log.WithField("replicaAddressMap", replicaAddressMap)
 	}
 
-	e.log.Info("Launching Frontend during engine creation")
-	if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator); err != nil {
+	e.log.Info("Launching frontend during engine creation")
+	if err := e.handleFrontend(spdkClient, portCount, superiorPortAllocator, initiatorCreationRequired, upgradeRequired, initiatorAddress, targetAddress); err != nil {
 		return nil, err
 	}
 
@@ -195,8 +280,8 @@ func (e *Engine) connectReplica(spdkClient *spdkclient.Client, replicaName, repl
 	return nvmeBdevNameList[0], nil
 }
 
-func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *util.Bitmap) error {
-	if e.Frontend != types.FrontendEmpty && e.Frontend != types.FrontendSPDKTCPNvmf && e.Frontend != types.FrontendSPDKTCPBlockdev {
+func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *util.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) {
+	if !types.IsFrontendSupported(e.Frontend) {
 		return fmt.Errorf("unknown frontend type %s", e.Frontend)
 	}
 
@@ -205,42 +290,96 @@ func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32,
 		return nil
 	}
 
-	e.Nqn = helpertypes.GetNQN(e.Name)
-	e.Nguid = commonUtils.RandomID(nvmeNguidLength)
-
-	e.log.Info("Blindly stopping expose bdev for engine")
-	if err := spdkClient.StopExposeBdev(e.Nqn); err != nil {
-		return errors.Wrap(err, "failed to stop expose bdev for engine")
+	initiatorIP, _, err := splitHostPort(initiatorAddress)
+	if err != nil {
+		return errors.Wrapf(err, "failed to split initiator address %v", initiatorAddress)
 	}
 
-	port, _, err := superiorPortAllocator.AllocateRange(portCount)
+	targetIP, targetPort, err := splitHostPort(targetAddress)
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "failed to split target address %v", targetAddress)
 	}
 
-	portStr := strconv.Itoa(int(port))
-	if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, e.IP, portStr); err != nil {
-		return err
+	e.Nqn = helpertypes.GetNQN(e.Name)
+
+	var port int32
+	if !upgradeRequired {
+		e.Nguid = commonUtils.RandomID(nvmeNguidLength)
+
+		e.log.Info("Blindly stopping expose bdev for engine")
+		if err := spdkClient.StopExposeBdev(e.Nqn); err != nil {
+			return errors.Wrapf(err, "failed to blindly stop expose bdev for engine %v", e.Name)
+		}
+
+		port, _, err = superiorPortAllocator.AllocateRange(portCount)
+		if err != nil {
+			return err
+		}
+
+		e.log.Infof("Allocated port %v", port)
+		if err := spdkClient.StartExposeBdev(e.Nqn, e.Name, e.Nguid, targetIP, strconv.Itoa(int(port))); err != nil {
+			return err
+		}
+
+		if initiatorCreationRequired {
+			e.Port = port
+			e.TargetPort = port
+		} else {
+			e.TargetPort = port
+		}
+	} else {
+		e.Port = targetPort
 	}
-	e.Port = port
-	e.log = e.log.WithField("port", port)
 
 	if e.Frontend == types.FrontendSPDKTCPNvmf {
-		e.Endpoint = GetNvmfEndpoint(e.Nqn, e.IP, e.Port)
+		e.Endpoint = GetNvmfEndpoint(e.Nqn, targetIP, port)
+		return nil
+	}
+
+	if initiatorIP != targetIP && !upgradeRequired {
+		e.log.Infof("Initiator IP %v is different from target IP %v, will not start initiator for engine", initiatorIP, targetIP)
 		return nil
 	}
 
 	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "failed to create initiator for engine %v", e.Name)
 	}
-	dmDeviceBusy, err := initiator.Start(e.IP, portStr, true)
-	if err != nil {
-		return err
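+
+	// From this point an initiator is needed on this node: either start a
+	// fresh one (initiatorCreationRequired), or re-load the device state of
+	// the initiator that already exists, e.g. while the engine is being
+	// reconstructed during an upgrade.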
+	dmDeviceBusy := false
+	if initiatorCreationRequired {
+		e.log.Info("Starting initiator for engine")
+		dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(port)), true)
+		if err != nil {
+			return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name)
+		}
+	} else {
+		e.log.Info("Loading NVMe device info for engine")
+		err = initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN)
+		if err != nil {
+			if nvme.IsValidNvmeDeviceNotFound(err) {
+				dmDeviceBusy, err = initiator.Start(targetIP, strconv.Itoa(int(targetPort)), true)
+				if err != nil {
+					return errors.Wrapf(err, "failed to start initiator for engine %v", e.Name)
+				}
+			} else {
+				return errors.Wrapf(err, "failed to load NVMe device info for engine %v", e.Name)
+			}
+		}
+		err = initiator.LoadEndpoint(false)
+		if err != nil {
+			return errors.Wrapf(err, "failed to load endpoint for engine %v", e.Name)
+		}
 	}
+
 	e.dmDeviceBusy = dmDeviceBusy
 	e.Endpoint = initiator.GetEndpoint()
-	e.log = e.log.WithField("endpoint", e.Endpoint)
+
+	e.log = e.log.WithFields(logrus.Fields{
+		"endpoint": e.Endpoint,
+		"port":     port,
+	})
 
 	return nil
 }
@@ -296,10 +435,15 @@ func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *ut
 		requireUpdate = true
 	}
 
-	if e.Port != 0 {
-		if err := superiorPortAllocator.ReleaseRange(e.Port, e.Port); err != nil {
+	if e.TargetPort != 0 || e.Port != 0 {
+		port := e.TargetPort
+		if port == 0 {
+			port = e.Port
+		}
+		if err := superiorPortAllocator.ReleaseRange(port, port); err != nil {
 			return err
 		}
+		e.TargetPort = 0
 		e.Port = 0
 		requireUpdate = true
 	}
@@ -356,6 +500,8 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) {
 		ReplicaModeMap: map[string]spdkrpc.ReplicaMode{},
 		Ip:             e.IP,
 		Port:           e.Port,
+		TargetIp:       e.TargetIP,
+		TargetPort:     e.TargetPort,
 		Snapshots:      map[string]*spdkrpc.Lvol{},
 		Frontend:       e.Frontend,
 		Endpoint:       e.Endpoint,
@@ -426,7 +572,11 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) {
 		return err
 	}
 	if e.IP != podIP {
-		return fmt.Errorf("found mismatching between engine IP %s and pod IP %s for engine %s", e.IP, podIP, e.Name)
+		// Skip the validation if the engine is being upgraded
+		if engineOnlyContainsInitiator(e) || engineOnlyContainsTarget(e) {
+			return nil
+		}
+		return fmt.Errorf("found mismatch between engine IP %s and pod IP %s for engine %v", e.IP, podIP, e.Name)
 	}
 
 	if err := e.validateAndUpdateFrontend(subsystemMap); err != nil {
@@ -486,7 +636,7 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) {
 		mode, err := e.validateAndUpdateReplicaMode(replicaName, bdevMap[bdevName])
 		if err != nil {
 			if e.ReplicaModeMap[replicaName] != types.ModeERR {
-				e.log.WithError(err).Errorf("Replica %s is invalid, will update the mode from %s to %s", replicaName, e.ReplicaModeMap[replicaName], types.ModeERR)
+				e.log.WithError(err).Errorf("Replica %v is invalid, will update the mode from %s to %s", replicaName, e.ReplicaModeMap[replicaName], types.ModeERR)
 				e.ReplicaModeMap[replicaName] = types.ModeERR
 				updateRequired = true
 			}
@@ -654,7 +804,7 @@ func (e *Engine) validateAndUpdateFrontend(subsystemMap map[string]*spdktypes.Nv
 	if err != nil {
 		return err
 	}
-	if err := initiator.LoadNVMeDeviceInfo(); err != nil {
+	if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil {
 		if strings.Contains(err.Error(), "connecting state") ||
 			strings.Contains(err.Error(), "resetting state") {
 			e.log.WithError(err).Warnf("Ignored to validate and update engine %v, because the device is still in a transient state", e.Name)
@@ -1535,3 +1685,329 @@ func (e *Engine) RestoreStatus() (*spdkrpc.RestoreStatusResponse, error) {
 	return resp, nil
 }
+
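+// The functions below implement engine suspend/resume and target switchover.
+// They are driven through the corresponding gRPC handlers in server.go when an
+// engine is upgraded or its target is moved to another node.
+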
+// Suspend suspends the engine. IO operations will be suspended.
+func (e *Engine) Suspend(spdkClient *spdkclient.Client) (err error) {
+	e.Lock()
+	defer func() {
+		e.Unlock()
+
+		if err != nil {
+			if e.State != types.InstanceStateError {
+				e.State = types.InstanceStateError
+				e.log.WithError(err).Warn("Failed to suspend engine, will mark the engine as error")
+			}
+			e.ErrorMsg = err.Error()
+		} else {
+			e.State = types.InstanceStateSuspended
+			e.ErrorMsg = ""
+
+			e.log.Info("Suspended engine")
+		}
+
+		e.UpdateCh <- nil
+	}()
+
+	e.log.Info("Creating initiator for suspending engine")
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return errors.Wrapf(err, "failed to create initiator for suspending engine %s", e.Name)
+	}
+
+	e.log.Info("Suspending engine")
+	return initiator.Suspend(false, false)
+}
+
+// Resume resumes the engine. IO operations will be resumed.
+func (e *Engine) Resume(spdkClient *spdkclient.Client) (err error) {
+	e.Lock()
+	defer func() {
+		e.Unlock()
+
+		if err != nil {
+			if e.State != types.InstanceStateError {
+				e.State = types.InstanceStateError
+				e.log.WithError(err).Warn("Failed to resume engine, will mark the engine as error")
+			}
+			e.ErrorMsg = err.Error()
+		} else {
+			e.State = types.InstanceStateRunning
+			e.ErrorMsg = ""
+
+			e.log.Info("Resumed engine")
+		}
+
+		e.UpdateCh <- nil
+	}()
+
+	if e.State == types.InstanceStateRunning {
+		return nil
+	}
+
+	e.log.Info("Creating initiator for resuming engine")
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return errors.Wrapf(err, "failed to create initiator for resuming engine %s", e.Name)
+	}
+
+	e.log.Info("Resuming engine")
+	return initiator.Resume()
+}
+
+// SwitchOverTarget switches the engine over to the target at the given address.
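+// The engine must already be suspended. A typical sequence, as driven by the
+// gRPC handlers in server.go (the address below is illustrative only), is:
+//
+//	engine.Suspend(spdkClient)
+//	engine.SwitchOverTarget(spdkClient, "10.42.0.2:20001")
+//	engine.Resume(spdkClient)
+//
+// followed by DeleteTarget on the node that hosted the old target.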
+func (e *Engine) SwitchOverTarget(spdkClient *spdkclient.Client, targetAddress string) (err error) {
+	e.log.Infof("Switching over engine to target address %s", targetAddress)
+
+	currentTargetAddress := ""
+
+	e.Lock()
+	defer func() {
+		e.Unlock()
+
+		if err != nil {
+			e.log.WithError(err).Warnf("Failed to switch over engine to target address %s", targetAddress)
+
+			if disconnected, errCheck := e.IsTargetDisconnected(); errCheck != nil {
+				e.log.WithError(errCheck).Warnf("Failed to check if target %s is disconnected", targetAddress)
+			} else if disconnected {
+				if errConnect := e.connectTarget(currentTargetAddress); errConnect != nil {
+					e.log.WithError(errConnect).Warnf("Failed to connect target back to %s", currentTargetAddress)
+				} else {
+					e.log.Infof("Connected target back to %s", currentTargetAddress)
+
+					if errReload := e.reloadDevice(); errReload != nil {
+						e.log.WithError(errReload).Warnf("Failed to reload device mapper")
+					} else {
+						e.log.Infof("Reloaded device mapper for connecting target back to %s", currentTargetAddress)
+					}
+				}
+			}
+		} else {
+			e.ErrorMsg = ""
+
+			e.log.Infof("Switched over target to %s", targetAddress)
+		}
+
+		e.UpdateCh <- nil
+	}()
+
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return errors.Wrapf(err, "failed to create initiator for engine %s target switchover", e.Name)
+	}
+
+	suspended, err := initiator.IsSuspended()
+	if err != nil {
+		return errors.Wrapf(err, "failed to check if engine %s is suspended", e.Name)
+	}
+	if !suspended {
+		return fmt.Errorf("engine %s must be suspended before target switchover", e.Name)
+	}
+
+	if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil {
+		if !nvme.IsValidNvmeDeviceNotFound(err) {
+			return errors.Wrapf(err, "failed to load NVMe device info for engine %s target switchover", e.Name)
+		}
+	}
+
+	currentTargetAddress = net.JoinHostPort(initiator.TransportAddress, initiator.TransportServiceID)
+	if e.isSwitchOverTargetRequired(currentTargetAddress, targetAddress) {
+		if currentTargetAddress != "" {
+			if err := e.disconnectTarget(currentTargetAddress); err != nil {
+				return err
+			}
+		}
+
+		if err := e.connectTarget(targetAddress); err != nil {
+			return err
+		}
+	}
+
+	// Replace IP and Port with the new target address.
+	// No need to update TargetIP and TargetPort, because the target has not been deleted yet.
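+	// After the switchover, IP/Port track the address the initiator is
+	// connected to, while TargetIP/TargetPort keep pointing at the local
+	// target until DeleteTarget tears it down and releases its port.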
+	targetIP, targetPort, err := splitHostPort(targetAddress)
+	if err != nil {
+		return errors.Wrapf(err, "failed to split target address %s", targetAddress)
+	}
+
+	e.IP = targetIP
+	e.Port = targetPort
+
+	e.log.Info("Reloading device mapper after target switchover")
+	if err := e.reloadDevice(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (e *Engine) IsTargetDisconnected() (bool, error) {
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return false, errors.Wrapf(err, "failed to create initiator for checking engine %s target disconnected", e.Name)
+	}
+
+	suspended, err := initiator.IsSuspended()
+	if err != nil {
+		return false, errors.Wrapf(err, "failed to check if engine %s is suspended", e.Name)
+	}
+	if !suspended {
+		return false, fmt.Errorf("engine %s must be suspended before checking target disconnected", e.Name)
+	}
+
+	if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil {
+		if !nvme.IsValidNvmeDeviceNotFound(err) {
+			return false, errors.Wrapf(err, "failed to load NVMe device info for checking engine %s target disconnected", e.Name)
+		}
+	}
+
+	return initiator.TransportAddress == "" && initiator.TransportServiceID == "", nil
+}
+
+func (e *Engine) reloadDevice() error {
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return errors.Wrapf(err, "failed to recreate initiator after engine %s target switchover", e.Name)
+	}
+
+	if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil {
+		return errors.Wrapf(err, "failed to load NVMe device info after engine %s target switchover", e.Name)
+	}
+
+	if err := initiator.ReloadDmDevice(); err != nil {
+		return errors.Wrapf(err, "failed to reload device mapper after engine %s target switchover", e.Name)
+	}
+
+	return nil
+}
+
+func (e *Engine) disconnectTarget(targetAddress string) error {
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return errors.Wrapf(err, "failed to create initiator for engine %s disconnect target %v", e.Name, targetAddress)
+	}
+
+	if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil {
+		if !nvme.IsValidNvmeDeviceNotFound(err) {
+			return errors.Wrapf(err, "failed to load NVMe device info for engine %s disconnect target %v", e.Name, targetAddress)
+		}
+	}
+
+	e.log.Infof("Disconnecting from old target %s before target switchover", targetAddress)
+	if err := initiator.DisconnectTarget(); err != nil {
+		return errors.Wrapf(err, "failed to disconnect from old target %s for engine %s", targetAddress, e.Name)
+	}
+
+	// Make sure the old target is disconnected before connecting to the new target
+	if err := initiator.WaitForDisconnect(maxNumRetries, retryInterval); err != nil {
+		return errors.Wrapf(err, "failed to wait for disconnect from old target %s for engine %s", targetAddress, e.Name)
+	}
+
+	e.log.Infof("Disconnected from old target %s before target switchover", targetAddress)
+
+	return nil
+}
+
+func (e *Engine) connectTarget(targetAddress string) error {
+	if targetAddress == "" {
+		return fmt.Errorf("failed to connect target for engine %s: missing required parameter target address", e.Name)
+	}
+
+	targetIP, targetPort, err := splitHostPort(targetAddress)
+	if err != nil {
+		return errors.Wrapf(err, "failed to split target address %s", targetAddress)
+	}
+
+	initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc)
+	if err != nil {
+		return errors.Wrapf(err, "failed to create initiator for engine %s connect target %v:%v", e.Name, targetIP, targetPort)
+	}
+
+	if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil {
+		if !nvme.IsValidNvmeDeviceNotFound(err) {
+			return errors.Wrapf(err, "failed to load NVMe device info for engine %s connect target %v:%v", e.Name, targetIP, targetPort)
+		}
+	}
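+
+	// Discovery and connect are retried because the new target may still be
+	// starting up when the switchover begins; the loop gives up after
+	// maxNumRetries attempts spaced retryInterval apart.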
address %s", targetAddress) + } + + initiator, err := nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + if err != nil { + return errors.Wrapf(err, "failed to create initiator for engine %s connect target %v:%v", e.Name, targetIP, targetPort) + } + + if err := initiator.LoadNVMeDeviceInfo(initiator.TransportAddress, initiator.TransportServiceID, initiator.SubsystemNQN); err != nil { + if !nvme.IsValidNvmeDeviceNotFound(err) { + return errors.Wrapf(err, "failed to load NVMe device info for engine %s connect target %v:%v", e.Name, targetIP, targetPort) + } + } + + for r := 0; r < maxNumRetries; r++ { + e.log.Infof("Discovering target %v:%v before target switchover", targetIP, targetPort) + subsystemNQN, err := initiator.DiscoverTarget(targetIP, strconv.Itoa(int(targetPort))) + if err != nil { + e.log.WithError(err).Warnf("Failed to discover target %v:%v for target switchover", targetIP, targetPort) + time.Sleep(retryInterval) + continue + } + initiator.SubsystemNQN = subsystemNQN + + e.log.Infof("Connecting to target %v:%v before target switchover", targetIP, targetPort) + controllerName, err := initiator.ConnectTarget(targetIP, strconv.Itoa(int(targetPort)), e.Nqn) + if err != nil { + e.log.WithError(err).Warnf("Failed to connect to target %v:%v for target switchover", targetIP, targetPort) + time.Sleep(retryInterval) + continue + } + initiator.ControllerName = controllerName + break + } + + if initiator.SubsystemNQN == "" || initiator.ControllerName == "" { + return fmt.Errorf("failed to connect to target %v:%v for engine %v target switchover", targetIP, targetPort, e.Name) + } + + // Target is switched over, to avoid the error "failed to wait for connect to target", + // create a new initiator and wait for connect + initiator, err = nvme.NewInitiator(e.VolumeName, e.Nqn, nvme.HostProc) + if err != nil { + return errors.Wrapf(err, "failed to create initiator for engine %s wait for connect target %v:%v", e.Name, targetIP, targetPort) + } + + if err := initiator.WaitForConnect(maxNumRetries, retryInterval); err == nil { + return errors.Wrapf(err, "failed to wait for connect to target %v:%v for engine %v target switchover", targetIP, targetPort, e.Name) + } + + return nil +} + +// DeleteTarget deletes the target +func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocator *util.Bitmap) (err error) { + e.log.Infof("Deleting target") + + if err := spdkClient.StopExposeBdev(e.Nqn); err != nil { + return errors.Wrapf(err, "failed to stop expose bdev after engine %s target switchover", e.Name) + } + + if e.TargetPort != 0 { + if err := superiorPortAllocator.ReleaseRange(e.TargetPort, e.TargetPort); err != nil { + return err + } + e.TargetPort = 0 + } + + e.log.Infof("Deleting raid bdev %s before target switchover", e.Name) + if _, err := spdkClient.BdevRaidDelete(e.Name); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return errors.Wrapf(err, "failed to delete raid bdev after engine %s target switchover", e.Name) + } + + for replicaName := range e.ReplicaAddressMap { + e.log.Infof("Disconnecting replica %s after target switchover", replicaName) + if err := e.disconnectReplica(spdkClient, replicaName); err != nil { + e.log.WithError(err).Warnf("Failed to disconnect replica %s after target switchover", replicaName) + if e.ReplicaModeMap[replicaName] != types.ModeERR { + e.ReplicaModeMap[replicaName] = types.ModeERR + } + } + } + return nil +} + +func (e *Engine) isSwitchOverTargetRequired(oldTargetAddress, newTargetAddress string) bool { + return 
oldTargetAddress != newTargetAddress +} + +func engineOnlyContainsInitiator(e *Engine) bool { + return e.Port != 0 && e.TargetPort == 0 +} + +func engineOnlyContainsTarget(e *Engine) bool { + return e.Port == 0 && e.TargetPort != 0 +} diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go index b83c1794d..33e36bd0e 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go @@ -1672,6 +1672,7 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh defer func() { if err != nil { // nolint:staticcheck // TODO: Support snapshot revert for incremental restore + r.log.WithError(err).Error("Failed to start backup restore") } }() @@ -1686,8 +1687,8 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh } go func() { - if completeErr := r.completeBackupRestore(spdkClient); completeErr != nil { - logrus.WithError(completeErr).Warn("Failed to complete backup restore") + if err := r.completeBackupRestore(spdkClient); err != nil { + logrus.WithError(err).Warn("Failed to complete backup restore") } }() diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go index 1a4c3b572..32c8a120a 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go @@ -717,17 +717,29 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ } s.Lock() - if _, ok := s.engineMap[req.Name]; ok { - s.Unlock() - return nil, grpcstatus.Errorf(grpccodes.AlreadyExists, "engine %v already exists", req.Name) + e, ok := s.engineMap[req.Name] + if ok { + // Check if the engine already exists. + // If the engine exists and the initiator address is the same as the target address, return AlreadyExists error. 
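+		// Otherwise the creation is treated as re-entrant: during an upgrade the
+		// engine object may exist with only an initiator or only a target, and
+		// Engine.Create is called again to reconstruct the missing part.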
+ if localTargetExists(e) && req.InitiatorAddress == req.TargetAddress { + s.Unlock() + return nil, grpcstatus.Errorf(grpccodes.AlreadyExists, "engine %v already exists", req.Name) + } + } + + if e == nil { + s.engineMap[req.Name] = NewEngine(req.Name, req.VolumeName, req.Frontend, req.SpecSize, s.updateChs[types.InstanceTypeEngine]) + e = s.engineMap[req.Name] } - s.engineMap[req.Name] = NewEngine(req.Name, req.VolumeName, req.Frontend, req.SpecSize, s.updateChs[types.InstanceTypeEngine]) - e := s.engineMap[req.Name] spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired) +} + +func localTargetExists(e *Engine) bool { + return e.Port != 0 && e.TargetPort != 0 } func (s *Server) getLocalReplicaLvsNameMap(replicaMap map[string]string) (replicaLvsNameMap map[string]string) { @@ -766,6 +778,102 @@ func (s *Server) EngineDelete(ctx context.Context, req *spdkrpc.EngineDeleteRequ return &emptypb.Empty{}, nil } +func (s *Server) EngineSuspend(ctx context.Context, req *spdkrpc.EngineSuspendRequest) (ret *emptypb.Empty, err error) { + if req.Name == "" { + return nil, grpcstatus.Error(grpccodes.InvalidArgument, "engine name is required") + } + + s.RLock() + e := s.engineMap[req.Name] + s.RUnlock() + + if e == nil { + return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %v for suspension", req.Name) + } + + err = e.Suspend(s.spdkClient) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to suspend engine %v", req.Name).Error()) + } + + return &emptypb.Empty{}, nil +} + +func (s *Server) EngineResume(ctx context.Context, req *spdkrpc.EngineResumeRequest) (ret *emptypb.Empty, err error) { + if req.Name == "" { + return nil, grpcstatus.Error(grpccodes.InvalidArgument, "engine name is required") + } + + s.RLock() + e := s.engineMap[req.Name] + s.RUnlock() + + if e == nil { + return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %v for resumption", req.Name) + } + + err = e.Resume(s.spdkClient) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to resume engine %v", req.Name).Error()) + } + + return &emptypb.Empty{}, nil +} + +func (s *Server) EngineSwitchOverTarget(ctx context.Context, req *spdkrpc.EngineSwitchOverTargetRequest) (ret *emptypb.Empty, err error) { + if req.Name == "" || req.TargetAddress == "" { + return nil, grpcstatus.Error(grpccodes.InvalidArgument, "engine name and target address are required") + } + + s.RLock() + e := s.engineMap[req.Name] + s.RUnlock() + + if e == nil { + return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %v for target switchover", req.Name) + } + + err = e.SwitchOverTarget(s.spdkClient, req.TargetAddress) + if err != nil { + return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to switch over target for engine %v", req.Name).Error()) + } + + return &emptypb.Empty{}, nil +} + +func (s *Server) EngineDeleteTarget(ctx context.Context, req *spdkrpc.EngineDeleteTargetRequest) (ret *emptypb.Empty, err error) { + if req.Name == "" { + return nil, grpcstatus.Error(grpccodes.InvalidArgument, "engine name is required") + } + + s.RLock() + e := s.engineMap[req.Name] + s.RUnlock() + + if e == nil { + return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %s for target deletion", req.Name) + } + 
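+	// The engine entry is removed from the map only after DeleteTarget
+	// succeeds, and only once neither the initiator (e.Port) nor the target
+	// (e.TargetPort) remains on this node.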
+	defer func() {
+		if err == nil {
+			s.Lock()
+			// Only delete the engine if neither the initiator (e.Port) nor the target (e.TargetPort) exists anymore.
+			if e.Port == 0 && e.TargetPort == 0 {
+				e.log.Infof("Deleting engine %s", req.Name)
+				delete(s.engineMap, req.Name)
+			}
+			s.Unlock()
+		}
+	}()
+
+	err = e.DeleteTarget(s.spdkClient, s.portAllocator)
+	if err != nil {
+		return nil, grpcstatus.Error(grpccodes.Internal, errors.Wrapf(err, "failed to delete target for engine %v", req.Name).Error())
+	}
+
+	return &emptypb.Empty{}, nil
+}
+
 func (s *Server) EngineGet(ctx context.Context, req *spdkrpc.EngineGetRequest) (ret *spdkrpc.Engine, err error) {
 	s.RLock()
 	e := s.engineMap[req.Name]
diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/types.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/types.go
index 138b4311d..c608be929 100644
--- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/types.go
+++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/types.go
@@ -15,8 +15,6 @@ import (
 	"github.com/longhorn/longhorn-spdk-engine/pkg/util"
 )
 
-type ShallowCopyState string
-
 const (
 	DiskTypeFilesystem = "filesystem"
 	DiskTypeBlock      = "block"
@@ -28,6 +26,13 @@ const (
 
 	nvmeNguidLength = 32
 
+	maxNumRetries = 15
+	retryInterval = 1 * time.Second
+)
+
+type ShallowCopyState string
+
+const (
 	ShallowCopyStateError    = ShallowCopyState("error")
 	ShallowCopyStateComplete = ShallowCopyState("complete")
 )
diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/util.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/util.go
index 0b51f15d6..f62ffed0a 100644
--- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/util.go
+++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/util.go
@@ -1,7 +1,9 @@
 package spdk
 
 import (
+	"net"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/pkg/errors"
@@ -32,20 +34,40 @@ func exposeSnapshotLvolBdev(spdkClient *spdkclient.Client, lvsName, lvolName, ip
 		return "", "", errors.Wrapf(err, "failed to expose snapshot lvol bdev %v", lvolName)
 	}
 
-	for r := 0; r < nvme.RetryCounts; r++ {
+	for r := 0; r < maxNumRetries; r++ {
 		subsystemNQN, err = nvme.DiscoverTarget(ip, portStr, executor)
 		if err != nil {
 			logrus.WithError(err).Errorf("Failed to discover target for snapshot lvol bdev %v", lvolName)
-			time.Sleep(nvme.RetryInterval)
+			time.Sleep(retryInterval)
 			continue
 		}
 
 		controllerName, err = nvme.ConnectTarget(ip, portStr, subsystemNQN, executor)
 		if err != nil {
 			logrus.WithError(err).Errorf("Failed to connect target for snapshot lvol bdev %v", lvolName)
-			time.Sleep(nvme.RetryInterval)
+			time.Sleep(retryInterval)
 			continue
 		}
 	}
 
 	return subsystemNQN, controllerName, nil
 }
+
+func splitHostPort(address string) (string, int32, error) {
+	if strings.Contains(address, ":") {
+		host, port, err := net.SplitHostPort(address)
+		if err != nil {
+			return "", 0, errors.Wrapf(err, "failed to split host and port from address %v", address)
+		}
+
+		portAsInt := 0
+		if port != "" {
+			portAsInt, err = strconv.Atoi(port)
+			if err != nil {
+				return "", 0, errors.Wrapf(err, "failed to parse port %v", port)
+			}
+		}
+		return host, int32(portAsInt), nil
+	}
+
+	return address, 0, nil
+}
diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/types/types.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/types/types.go
index e7dc21d9a..32ab96d41 100644
--- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/types/types.go
+++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/types/types.go
@@ -24,6 +24,7 @@ const (
InstanceStateRunning = "running" InstanceStateTerminating = "terminating" InstanceStateError = "error" + InstanceStateSuspended = "suspended" ) type InstanceType string @@ -60,3 +61,7 @@ func GRPCReplicaModeToReplicaMode(replicaMode spdkrpc.ReplicaMode) Mode { } return ModeERR } + +func IsFrontendSupported(frontend string) bool { + return frontend == FrontendEmpty || frontend == FrontendSPDKTCPNvmf || frontend == FrontendSPDKTCPBlockdev +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a9840df79..cd2c5b7ab 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -208,7 +208,7 @@ github.com/longhorn/go-iscsi-helper/iscsidev github.com/longhorn/go-iscsi-helper/longhorndev github.com/longhorn/go-iscsi-helper/types github.com/longhorn/go-iscsi-helper/util -# github.com/longhorn/go-spdk-helper v0.0.0-20240702033003-35e00d17218f +# github.com/longhorn/go-spdk-helper v0.0.0-20240708060539-de887e9cc6db ## explicit; go 1.22.0 github.com/longhorn/go-spdk-helper/pkg/jsonrpc github.com/longhorn/go-spdk-helper/pkg/nvme @@ -230,7 +230,7 @@ github.com/longhorn/longhorn-engine/pkg/sync github.com/longhorn/longhorn-engine/pkg/types github.com/longhorn/longhorn-engine/pkg/util github.com/longhorn/longhorn-engine/pkg/util/disk -# github.com/longhorn/longhorn-spdk-engine v0.0.0-20240704075534-f807c4a293c6 +# github.com/longhorn/longhorn-spdk-engine v0.0.0-20240708130427-1b50687fc568 ## explicit; go 1.22.0 github.com/longhorn/longhorn-spdk-engine/pkg/api github.com/longhorn/longhorn-spdk-engine/pkg/client