diff --git a/go.mod b/go.mod index 19c1a9ce5..133b3c532 100644 --- a/go.mod +++ b/go.mod @@ -97,3 +97,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) + +replace github.com/longhorn/longhorn-spdk-engine v0.0.0-20240811140223-087cddee60f1 => github.com/derekbit/longhorn-spdk-engine v0.0.0-20240812162349-6c3834620dfa diff --git a/go.sum b/go.sum index 86c28c129..1f2387b49 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/derekbit/longhorn-spdk-engine v0.0.0-20240812162349-6c3834620dfa h1:uTt92hQmHjrtL5tOjXrlfi+AUng9qy4BANzRldx3QtM= +github.com/derekbit/longhorn-spdk-engine v0.0.0-20240812162349-6c3834620dfa/go.mod h1:AEYaufJlkiw6WAzYGdWIZUYdnGXKCk/yluLbdn+5gcc= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= @@ -101,8 +103,6 @@ github.com/longhorn/go-spdk-helper v0.0.0-20240811121608-9383fa59dd7c h1:ztjrkxi github.com/longhorn/go-spdk-helper v0.0.0-20240811121608-9383fa59dd7c/go.mod h1:BrtXoVnIZ97+uZ+cMcaICc5KnuJkh8j3+G1NjKxh+8Q= github.com/longhorn/longhorn-engine v1.7.0-rc3 h1:YTt++OeSrEOlifz++8VAOH/aJ4lGShD2TaJP1ZaQ3Uw= github.com/longhorn/longhorn-engine v1.7.0-rc3/go.mod h1:2Hq/3QzW4fF2yUg+kauiAT3ps5WCKLMkrwXW2Wyfj9o= -github.com/longhorn/longhorn-spdk-engine v0.0.0-20240811140223-087cddee60f1 h1:2hRz+mJySsxJ2D4aAYZcCMnu/8pZoTvNKL088yKaEW8= -github.com/longhorn/longhorn-spdk-engine v0.0.0-20240811140223-087cddee60f1/go.mod h1:AEYaufJlkiw6WAzYGdWIZUYdnGXKCk/yluLbdn+5gcc= github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003 h1:Jw9uANsGcHTxp6HcC++/vN17LfeuDmozHI2j6DoZf5E= github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003/go.mod h1:0CLeXlf59Lg6C0kjLSDf47ft73Dh37CwymYRKWwAn04= github.com/longhorn/sparse-tools v0.0.0-20240703010727-92451e38077a h1:+o63c0oh7ZNKeQdc0Hawfzz5vRa4LiDvLOtJYjegtnk= diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go index de581ac64..af07bb25a 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go @@ -215,11 +215,16 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str e.IP = targetIP // Get ReplicaModeMap and ReplicaBdevNameMap - targetSPDKClient, err := GetServiceClient(net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort))) + targetSPDKServiceAddress := net.JoinHostPort(e.IP, strconv.Itoa(types.SPDKServicePort)) + targetSPDKClient, err := GetServiceClient(targetSPDKServiceAddress) if err != nil { return nil, err } - defer targetSPDKClient.Close() + defer func() { + if errClose := targetSPDKClient.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close target spdk client with address %s", targetSPDKServiceAddress) + } + }() var engineWithTarget *api.Engine if initiatorIP != targetIP { @@ -642,6 +647,7 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() { replicaAncestorMap := map[string]*api.Lvol{} // hasBackingImage := false hasSnapshot := false + for replicaName, address := range e.ReplicaAddressMap { if e.ReplicaModeMap[replicaName] != types.ModeRW { continue @@ -651,58 +657,66 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() { e.log.WithError(err).Warnf("failed to get service client for replica %s with address %s, will skip this replica and continue info update from replica", replicaName, address) continue } - defer replicaServiceCli.Close() - replica, err := replicaServiceCli.ReplicaGet(replicaName) - if err != nil { - e.log.WithError(err).Warnf("Failed to get replica %s with address %s, mark the mode from %v to ERR", replicaName, address, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR - continue - } - if e.ReplicaModeMap[replicaName] == types.ModeWO { - shallowCopyStatus, err := replicaServiceCli.ReplicaRebuildingDstShallowCopyCheck(replicaName) + // Ensure the replica is not rebuilding + func() { + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s", replicaName, address) + } + }() + + replica, err := replicaServiceCli.ReplicaGet(replicaName) if err != nil { - e.log.WithError(err).Warnf("failed to get rebuilding replica %s shallow copy info, will skip this replica and continue info update from replica", replicaName) - continue - } - if shallowCopyStatus.TotalState == helpertypes.ShallowCopyStateError || shallowCopyStatus.Error != "" { - e.log.Errorf("Engine found rebuilding replica %s error %v during info update from replica, will mark the mode from WO to ERR and continue info update from replica", replicaName, shallowCopyStatus.Error) + e.log.WithError(err).Warnf("Failed to get replica %s with address %s, mark the mode from %v to ERR", replicaName, address, e.ReplicaModeMap[replicaName]) e.ReplicaModeMap[replicaName] = types.ModeERR + return } - // No need to do anything if `shallowCopyStatus.TotalState == helpertypes.ShallowCopyStateComplete`, engine should leave the rebuilding logic to update its mode - continue - } - - // The ancestor check sequence: the backing image, then the oldest snapshot, finally head - // TODO: Check the backing image first - - // if replica.BackingImage != nil { - // hasBackingImage = true - // replicaAncestorMap[replicaName] = replica.BackingImage - // } else - if len(replica.Snapshots) != 0 { - // if hasBackingImage { - // e.log.Warnf("Found replica %s does not have a backing image while other replicas have during info update from replica", replicaName) - // } else {} - hasSnapshot = true - for snapshotName, snapApiLvol := range replica.Snapshots { - if snapApiLvol.Parent == "" { - replicaAncestorMap[replicaName] = replica.Snapshots[snapshotName] - break + if e.ReplicaModeMap[replicaName] == types.ModeWO { + shallowCopyStatus, err := replicaServiceCli.ReplicaRebuildingDstShallowCopyCheck(replicaName) + if err != nil { + e.log.WithError(err).Warnf("failed to get rebuilding replica %s shallow copy info, will skip this replica and continue info update from replica", replicaName) + return + } + if shallowCopyStatus.TotalState == helpertypes.ShallowCopyStateError || shallowCopyStatus.Error != "" { + e.log.Errorf("Engine found rebuilding replica %s error %v during info update from replica, will mark the mode from WO to ERR and continue info update from replica", replicaName, shallowCopyStatus.Error) + e.ReplicaModeMap[replicaName] = types.ModeERR } + // No need to do anything if `shallowCopyStatus.TotalState == helpertypes.ShallowCopyStateComplete`, engine should leave the rebuilding logic to update its mode + return } - } else { - if hasSnapshot { - e.log.Warnf("Found replica %s does not have a snapshot while other replicas have during info update from replica", replicaName) + + // The ancestor check sequence: the backing image, then the oldest snapshot, finally head + // TODO: Check the backing image first + + // if replica.BackingImage != nil { + // hasBackingImage = true + // replicaAncestorMap[replicaName] = replica.BackingImage + // } else + if len(replica.Snapshots) != 0 { + // if hasBackingImage { + // e.log.Warnf("Found replica %s does not have a backing image while other replicas have during info update from replica", replicaName) + // } else {} + hasSnapshot = true + for snapshotName, snapApiLvol := range replica.Snapshots { + if snapApiLvol.Parent == "" { + replicaAncestorMap[replicaName] = replica.Snapshots[snapshotName] + break + } + } } else { - replicaAncestorMap[replicaName] = replica.Head + if hasSnapshot { + e.log.Warnf("Found replica %s does not have a snapshot while other replicas have during info update from replica", replicaName) + } else { + replicaAncestorMap[replicaName] = replica.Head + } } - } - if replicaAncestorMap[replicaName] == nil { - e.log.Warnf("Cannot find replica %s ancestor, will skip this replica and continue info update from replica", replicaName) - continue - } - replicaMap[replicaName] = replica + if replicaAncestorMap[replicaName] == nil { + e.log.Warnf("Cannot find replica %s ancestor, will skip this replica and continue info update from replica", replicaName) + return + } + replicaMap[replicaName] = replica + }() } // If there are multiple candidates, the priority is: @@ -922,32 +936,23 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe if err != nil { return err } - srcReplicaServiceCli, err := GetServiceClient(srcReplicaAddress) - if err != nil { - return err - } - defer func() { - if err != nil { - srcReplicaServiceCli.Close() - } - }() - dstReplicaServiceCli, err := GetServiceClient(dstReplicaAddress) + + srcReplicaServiceCli, dstReplicaServiceCli, err := e.getSrcAndDstReplicaClients(srcReplicaName, srcReplicaAddress, dstReplicaName, dstReplicaAddress) if err != nil { return err } - defer func() { - if err != nil { - dstReplicaServiceCli.Close() - } - }() var rebuildingSnapshotList []*api.Lvol // Need to make sure the replica clients available before set this deferred goroutine defer func() { go func() { defer func() { - srcReplicaServiceCli.Close() - dstReplicaServiceCli.Close() + if errClose := srcReplicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close source replica %s (%s) client", srcReplicaName, srcReplicaAddress) + } + if errClose := dstReplicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close dest replica %s (%s) client", dstReplicaName, dstReplicaAddress) + } }() if err == nil && engineErr == nil { @@ -1026,6 +1031,32 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe return nil } +func (e *Engine) getSrcAndDstReplicaClients(srcReplicaName, srcReplicaAddress, dstReplicaName, dstReplicaAddress string) (srcReplicaServiceCli, dstReplicaServiceCli *client.SPDKClient, err error) { + defer func() { + if err != nil { + if srcReplicaServiceCli != nil { + if errClose := srcReplicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close source replica %s client with address %s", srcReplicaName, srcReplicaAddress) + } + } + if dstReplicaServiceCli != nil { + if errClose := dstReplicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close dest replica %s client with address %s", dstReplicaName, dstReplicaAddress) + } + } + srcReplicaServiceCli = nil + dstReplicaServiceCli = nil + } + }() + + srcReplicaServiceCli, err = GetServiceClient(srcReplicaAddress) + if err != nil { + return + } + dstReplicaServiceCli, err = GetServiceClient(dstReplicaAddress) + return +} + func (e *Engine) replicaShallowCopy(srcReplicaServiceCli, dstReplicaServiceCli *client.SPDKClient, srcReplicaName, dstReplicaName string, rebuildingSnapshotList []*api.Lvol) (err error) { updateRequired := false defer func() { @@ -1406,7 +1437,11 @@ func (e *Engine) getReplicaClients() (replicaClients map[string]*client.SPDKClie func (e *Engine) closeRplicaClients(replicaClients map[string]*client.SPDKClient) { for replicaName := range replicaClients { - replicaClients[replicaName].Close() + if replicaClients[replicaName] != nil { + if errClose := replicaClients[replicaName].Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client", replicaName) + } + } } } @@ -1545,15 +1580,22 @@ func (e *Engine) ReplicaList(spdkClient *spdkclient.Client) (ret map[string]*api e.log.WithError(err).Errorf("Failed to get service client for replica %s with address %s", name, address) continue } - defer replicaServiceCli.Close() - replica, err := replicaServiceCli.ReplicaGet(name) - if err != nil { - e.log.WithError(err).Errorf("Failed to get replica %s with address %s", name, address) - continue - } + func() { + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s", name, address) + } + }() - replicas[name] = replica + replica, err := replicaServiceCli.ReplicaGet(name) + if err != nil { + e.log.WithError(err).Errorf("Failed to get replica %s with address %s", name, address) + return + } + + replicas[name] = replica + }() } return replicas, nil @@ -1600,7 +1642,11 @@ func (e *Engine) BackupCreate(backupName, volumeName, engineName, snapshotName, if err != nil { return nil, grpcstatus.Errorf(grpccodes.Internal, err.Error()) } - defer replicaServiceCli.Close() + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s", replicaName, replicaAddress) + } + }() recv, err := replicaServiceCli.ReplicaBackupCreate(&client.BackupCreateRequest{ BackupName: backupName, @@ -1650,7 +1696,11 @@ func (e *Engine) BackupStatus(backupName, replicaAddress string) (*spdkrpc.Backu if err != nil { return nil, grpcstatus.Errorf(grpccodes.Internal, err.Error()) } - defer replicaServiceCli.Close() + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica client with address %s", replicaAddress) + } + }() return replicaServiceCli.ReplicaBackupStatus(backupName) } @@ -1694,23 +1744,30 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN replicaServiceCli, err := GetServiceClient(replicaAddress) if err != nil { - e.log.WithError(err).Errorf("Failed to restore backup on replica %s address %s", replicaName, replicaAddress) + e.log.WithError(err).Errorf("Failed to restore backup on replica %s with address %s", replicaName, replicaAddress) resp.Errors[replicaAddress] = err.Error() continue } - defer replicaServiceCli.Close() - err = replicaServiceCli.ReplicaBackupRestore(&client.BackupRestoreRequest{ - BackupUrl: backupUrl, - ReplicaName: replicaName, - SnapshotName: snapshotName, - Credential: credential, - ConcurrentLimit: concurrentLimit, - }) - if err != nil { - e.log.WithError(err).Errorf("Failed to restore backup on replica %s address %s", replicaName, replicaAddress) - resp.Errors[replicaAddress] = err.Error() - } + func() { + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s", replicaName, replicaAddress) + } + }() + + err = replicaServiceCli.ReplicaBackupRestore(&client.BackupRestoreRequest{ + BackupUrl: backupUrl, + ReplicaName: replicaName, + SnapshotName: snapshotName, + Credential: credential, + ConcurrentLimit: concurrentLimit, + }) + if err != nil { + e.log.WithError(err).Errorf("Failed to restore backup on replica %s address %s", replicaName, replicaAddress) + resp.Errors[replicaAddress] = err.Error() + } + }() } return resp, nil @@ -1762,13 +1819,7 @@ func (e *Engine) RestoreStatus() (*spdkrpc.RestoreStatusResponse, error) { continue } - replicaServiceCli, err := GetServiceClient(replicaAddress) - if err != nil { - return nil, err - } - defer replicaServiceCli.Close() - - status, err := replicaServiceCli.ReplicaRestoreStatus(replicaName) + status, err := e.getReplicaRestoreStatus(replicaName, replicaAddress) if err != nil { return nil, err } @@ -1778,6 +1829,25 @@ func (e *Engine) RestoreStatus() (*spdkrpc.RestoreStatusResponse, error) { return resp, nil } +func (e *Engine) getReplicaRestoreStatus(replicaName, replicaAddress string) (*spdkrpc.ReplicaRestoreStatusResponse, error) { + replicaServiceCli, err := GetServiceClient(replicaAddress) + if err != nil { + return nil, err + } + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica client with address %s", replicaAddress) + } + }() + + status, err := replicaServiceCli.ReplicaRestoreStatus(replicaName) + if err != nil { + return nil, err + } + + return status, nil +} + // Suspend suspends the engine. IO operations will be suspended. func (e *Engine) Suspend(spdkClient *spdkclient.Client) (err error) { e.Lock() diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go index 4e6d8b163..6080563d1 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go @@ -1563,7 +1563,11 @@ func (r *Replica) RebuildingDstShallowCopyStart(spdkClient *spdkclient.Client, s if err != nil { return err } - defer srcReplicaServiceCli.Close() + defer func() { + if errClose := srcReplicaServiceCli.Close(); errClose != nil { + r.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s", r.rebuildingDstCache.srcReplicaName, r.rebuildingDstCache.srcReplicaAddress) + } + }() r.rebuildingDstCache.processingOpID, err = srcReplicaServiceCli.ReplicaRebuildingSrcShallowCopyStart(r.rebuildingDstCache.srcReplicaName, snapshotName) return err @@ -1619,7 +1623,11 @@ func (r *Replica) RebuildingDstShallowCopyCheck(spdkClient *spdkclient.Client) ( if err != nil { return nil, err } - defer srcReplicaServiceCli.Close() + defer func() { + if errClose := srcReplicaServiceCli.Close(); errClose != nil { + r.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s", r.rebuildingDstCache.srcReplicaName, r.rebuildingDstCache.srcReplicaAddress) + } + }() state, copiedClusters, totalClusters, errorMsg, err := srcReplicaServiceCli.ReplicaRebuildingSrcShallowCopyCheck(r.rebuildingDstCache.srcReplicaName, r.Name, r.rebuildingDstCache.processingSnapshotName, r.rebuildingDstCache.processingOpID) if err != nil { diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go index 0f2820976..0ed65eb8b 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go @@ -248,20 +248,23 @@ func (s *Server) verify() (err error) { replicaMapForSync[lvolName] = replicaMap[lvolName] } for replicaName, r := range replicaMap { - if r.ChainLength < 2 { - delete(replicaMap, replicaName) - continue - } - headSvcLvol := r.ActiveChain[r.ChainLength-1] - if headSvcLvol == nil { - delete(replicaMap, replicaName) - continue - } - // TODO: How to handle a broken replica without a head lvol - if bdevLvolMap[headSvcLvol.Name] == nil { - delete(replicaMap, replicaName) - continue - } + func() { + r.RLock() + defer r.RUnlock() + if r.ChainLength < 2 { + delete(replicaMap, replicaName) + return + } + headSvcLvol := r.ActiveChain[r.ChainLength-1] + if headSvcLvol == nil { + delete(replicaMap, replicaName) + return + } + // TODO: How to handle a broken replica without a head lvol + if bdevLvolMap[headSvcLvol.Name] == nil { + delete(replicaMap, replicaName) + } + }() } s.replicaMap = replicaMap s.Unlock() @@ -596,11 +599,14 @@ func (s *Server) ReplicaRebuildingSrcAttach(ctx context.Context, req *spdkrpc.Re return nil, grpcstatus.Error(grpccodes.InvalidArgument, "dst replica name and dst rebuilding lvol address are required") } + logrus.Infof("Debug =====> ReplicaRebuildingSrcAttach") s.RLock() r := s.replicaMap[req.Name] spdkClient := s.spdkClient s.RUnlock() + logrus.Infof("Debug =====> ReplicaRebuildingSrcAttach lock acquired") + if r == nil { return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find replica %s during rebuilding src attach", req.Name) } diff --git a/vendor/modules.txt b/vendor/modules.txt index afe0856c4..3717c2d17 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -230,7 +230,7 @@ github.com/longhorn/longhorn-engine/pkg/sync github.com/longhorn/longhorn-engine/pkg/types github.com/longhorn/longhorn-engine/pkg/util github.com/longhorn/longhorn-engine/pkg/util/disk -# github.com/longhorn/longhorn-spdk-engine v0.0.0-20240811140223-087cddee60f1 +# github.com/longhorn/longhorn-spdk-engine v0.0.0-20240811140223-087cddee60f1 => github.com/derekbit/longhorn-spdk-engine v0.0.0-20240812162349-6c3834620dfa ## explicit; go 1.22.0 github.com/longhorn/longhorn-spdk-engine/pkg/api github.com/longhorn/longhorn-spdk-engine/pkg/client