From 33c028bef846f95f113bb7da67d44c8489936502 Mon Sep 17 00:00:00 2001 From: Eric Weber Date: Mon, 26 Jun 2023 15:57:05 -0500 Subject: [PATCH] Use new identity validation flags and fields Signed-off-by: Eric Weber --- controller/engine_controller.go | 3 ++- engineapi/enginesim.go | 3 ++- engineapi/instance_manager.go | 14 +++++++++++++- engineapi/proxy_backup.go | 15 +++++++++------ engineapi/proxy_metrics.go | 2 +- engineapi/proxy_replica.go | 13 ++++++++----- engineapi/proxy_snapshot.go | 33 ++++++++++++++++++++++----------- engineapi/proxy_volume.go | 14 +++++++++----- engineapi/snapshot.go | 8 +++++--- engineapi/types.go | 2 +- 10 files changed, 72 insertions(+), 35 deletions(-) diff --git a/controller/engine_controller.go b/controller/engine_controller.go index b7da4ede8b..a0e5e45108 100644 --- a/controller/engine_controller.go +++ b/controller/engine_controller.go @@ -1565,7 +1565,8 @@ func cloneSnapshot(engine *longhorn.Engine, engineClientProxy engineapi.EngineCl } sourceEngineControllerURL := imutil.GetURL(sourceEngine.Status.StorageIP, sourceEngine.Status.Port) - if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL, fileSyncHTTPClientTimeout); err != nil { + if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL, sourceEngine.Name, + fileSyncHTTPClientTimeout); err != nil { // There is only 1 replica during volume cloning, // so if the cloning failed, it must be that the replica failed to clone. for _, status := range engine.Status.CloneStatus { diff --git a/engineapi/enginesim.go b/engineapi/enginesim.go index 33516480c1..c9e63162a8 100644 --- a/engineapi/enginesim.go +++ b/engineapi/enginesim.go @@ -216,7 +216,8 @@ func (e *EngineSimulator) BackupRestore(engine *longhorn.Engine, backupTarget, b return fmt.Errorf(ErrNotImplement) } -func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error { +func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, + fromEngineName string, fileSyncHTTPClientTimeout int64) error { return fmt.Errorf(ErrNotImplement) } diff --git a/engineapi/instance_manager.go b/engineapi/instance_manager.go index 0a647671b8..a1a3917f76 100644 --- a/engineapi/instance_manager.go +++ b/engineapi/instance_manager.go @@ -336,6 +336,10 @@ func getBinaryAndArgsForEngineProcessCreation(e *longhorn.Engine, } } + if engineCLIAPIVersion >= 9 { + args = append([]string{"--engine-instance-name", e.Name}, args...) + } + for _, addr := range e.Status.CurrentReplicaAddressMap { args = append(args, "--replica", GetBackendReplicaURL(addr)) } @@ -358,7 +362,10 @@ func getBinaryAndArgsForReplicaProcessCreation(r *longhorn.Replica, args = append(args, "--disableRevCounter") } if engineCLIAPIVersion >= 7 { - args = append(args, "--volume-name", r.Spec.VolumeName) + if engineCLIAPIVersion < 9 { + // Replaced by the global --volume-name flag when engineCLIAPIVersion == 9. + args = append(args, "--volume-name", r.Spec.VolumeName) + } if dataLocality == longhorn.DataLocalityStrictLocal { args = append(args, "--data-server-protocol", "unix") @@ -369,6 +376,11 @@ func getBinaryAndArgsForReplicaProcessCreation(r *longhorn.Replica, } } + if engineCLIAPIVersion >= 9 { + args = append(args, "--replica-instance-name", r.Name) + args = append([]string{"--volume-name", r.Spec.VolumeName}, args...) + } + // 3 ports are already used by replica server, data server and syncagent server syncAgentPortCount := portCount - 3 args = append(args, "--sync-agent-port-count", strconv.Itoa(syncAgentPortCount)) diff --git a/engineapi/proxy_backup.go b/engineapi/proxy_backup.go index 194ad3072a..b82c6f0b68 100644 --- a/engineapi/proxy_backup.go +++ b/engineapi/proxy_backup.go @@ -37,9 +37,9 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac return "", "", err } - backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), - backupName, snapshotName, backupTarget, backingImageName, backingImageChecksum, - compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv, + backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name, + e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName, + backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv, ) if err != nil { return "", "", err @@ -49,7 +49,8 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac } func (p *Proxy) SnapshotBackupStatus(e *longhorn.Engine, backupName, replicaAddress string) (status *longhorn.EngineBackupStatus, err error) { - recv, err := p.grpcClient.SnapshotBackupStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), backupName, replicaAddress) + recv, err := p.grpcClient.SnapshotBackupStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e), backupName, replicaAddress, "") if err != nil { return nil, err } @@ -67,11 +68,13 @@ func (p *Proxy) BackupRestore(e *longhorn.Engine, backupTarget, backupName, back return err } - return p.grpcClient.BackupRestore(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), backupURL, backupTarget, backupVolumeName, envs, concurrentLimit) + return p.grpcClient.BackupRestore(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + backupURL, backupTarget, backupVolumeName, envs, concurrentLimit) } func (p *Proxy) BackupRestoreStatus(e *longhorn.Engine) (status map[string]*longhorn.RestoreStatus, err error) { - recv, err := p.grpcClient.BackupRestoreStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + recv, err := p.grpcClient.BackupRestoreStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) if err != nil { return nil, err } diff --git a/engineapi/proxy_metrics.go b/engineapi/proxy_metrics.go index 7fdc1f5851..56ff4b0db9 100644 --- a/engineapi/proxy_metrics.go +++ b/engineapi/proxy_metrics.go @@ -5,7 +5,7 @@ import ( ) func (p *Proxy) MetricsGet(e *longhorn.Engine) (*Metrics, error) { - metrics, err := p.grpcClient.MetricsGet(p.DirectToURL(e)) + metrics, err := p.grpcClient.MetricsGet(e.Name, e.Spec.VolumeName, p.DirectToURL(e)) if err != nil { return nil, err } diff --git a/engineapi/proxy_replica.go b/engineapi/proxy_replica.go index cd4fa4f07a..c48f7009be 100644 --- a/engineapi/proxy_replica.go +++ b/engineapi/proxy_replica.go @@ -5,7 +5,9 @@ import ( ) func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, replicaFileSyncHTTPClientTimeout int64) (err error) { - return p.grpcClient.ReplicaAdd(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize, int(replicaFileSyncHTTPClientTimeout), fastSync) + return p.grpcClient.ReplicaAdd(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize, + int(replicaFileSyncHTTPClientTimeout), fastSync) } func (p *Proxy) ReplicaRemove(e *longhorn.Engine, address string) (err error) { @@ -13,7 +15,8 @@ func (p *Proxy) ReplicaRemove(e *longhorn.Engine, address string) (err error) { } func (p *Proxy) ReplicaList(e *longhorn.Engine) (replicas map[string]*Replica, err error) { - resp, err := p.grpcClient.ReplicaList(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + resp, err := p.grpcClient.ReplicaList(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) if err != nil { return nil, err } @@ -33,7 +36,8 @@ func (p *Proxy) ReplicaList(e *longhorn.Engine) (replicas map[string]*Replica, e } func (p *Proxy) ReplicaRebuildStatus(e *longhorn.Engine) (status map[string]*longhorn.RebuildStatus, err error) { - recv, err := p.grpcClient.ReplicaRebuildingStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + recv, err := p.grpcClient.ReplicaRebuildingStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) if err != nil { return nil, err } @@ -49,8 +53,7 @@ func (p *Proxy) ReplicaRebuildVerify(e *longhorn.Engine, url string) (err error) if err := ValidateReplicaURL(url); err != nil { return err } - - return p.grpcClient.ReplicaVerifyRebuild(string(e.Spec.BackendStoreDriver), p.DirectToURL(e), url) + return p.grpcClient.ReplicaVerifyRebuild(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), url, "") } func (p *Proxy) ReplicaModeUpdate(e *longhorn.Engine, url, mode string) (err error) { diff --git a/engineapi/proxy_snapshot.go b/engineapi/proxy_snapshot.go index e28d742a61..4d65e09283 100644 --- a/engineapi/proxy_snapshot.go +++ b/engineapi/proxy_snapshot.go @@ -5,11 +5,13 @@ import ( ) func (p *Proxy) SnapshotCreate(e *longhorn.Engine, name string, labels map[string]string) (string, error) { - return p.grpcClient.VolumeSnapshot(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), name, labels) + return p.grpcClient.VolumeSnapshot(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + name, labels) } func (p *Proxy) SnapshotList(e *longhorn.Engine) (snapshots map[string]*longhorn.SnapshotInfo, err error) { - recv, err := p.grpcClient.SnapshotList(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + recv, err := p.grpcClient.SnapshotList(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) if err != nil { return nil, err } @@ -30,12 +32,15 @@ func (p *Proxy) SnapshotGet(e *longhorn.Engine, name string) (snapshot *longhorn return recv[name], nil } -func (p *Proxy) SnapshotClone(e *longhorn.Engine, snapshotName, fromController string, fileSyncHTTPClientTimeout int64) (err error) { - return p.grpcClient.SnapshotClone(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName, fromController, int(fileSyncHTTPClientTimeout)) +func (p *Proxy) SnapshotClone(e *longhorn.Engine, snapshotName, fromEngineAddress, fromEngineName string, + fileSyncHTTPClientTimeout int64) (err error) { + return p.grpcClient.SnapshotClone(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + snapshotName, fromEngineAddress, fromEngineName, int(fileSyncHTTPClientTimeout)) } func (p *Proxy) SnapshotCloneStatus(e *longhorn.Engine) (status map[string]*longhorn.SnapshotCloneStatus, err error) { - recv, err := p.grpcClient.SnapshotCloneStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + recv, err := p.grpcClient.SnapshotCloneStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) if err != nil { return nil, err } @@ -48,15 +53,18 @@ func (p *Proxy) SnapshotCloneStatus(e *longhorn.Engine) (status map[string]*long } func (p *Proxy) SnapshotRevert(e *longhorn.Engine, snapshotName string) (err error) { - return p.grpcClient.SnapshotRevert(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName) + return p.grpcClient.SnapshotRevert(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + snapshotName) } func (p *Proxy) SnapshotPurge(e *longhorn.Engine) (err error) { - return p.grpcClient.SnapshotPurge(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), true) + return p.grpcClient.SnapshotPurge(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + true) } func (p *Proxy) SnapshotPurgeStatus(e *longhorn.Engine) (status map[string]*longhorn.PurgeStatus, err error) { - recv, err := p.grpcClient.SnapshotPurgeStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + recv, err := p.grpcClient.SnapshotPurgeStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) if err != nil { return nil, err } @@ -69,15 +77,18 @@ func (p *Proxy) SnapshotPurgeStatus(e *longhorn.Engine) (status map[string]*long } func (p *Proxy) SnapshotDelete(e *longhorn.Engine, name string) (err error) { - return p.grpcClient.SnapshotRemove(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), []string{name}) + return p.grpcClient.SnapshotRemove(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + []string{name}) } func (p *Proxy) SnapshotHash(e *longhorn.Engine, snapshotName string, rehash bool) error { - return p.grpcClient.SnapshotHash(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName, rehash) + return p.grpcClient.SnapshotHash(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + snapshotName, rehash) } func (p *Proxy) SnapshotHashStatus(e *longhorn.Engine, snapshotName string) (status map[string]*longhorn.HashStatus, err error) { - recv, err := p.grpcClient.SnapshotHashStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName) + recv, err := p.grpcClient.SnapshotHashStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e), snapshotName) if err != nil { return nil, err } diff --git a/engineapi/proxy_volume.go b/engineapi/proxy_volume.go index a5bd1fee83..cb477effd2 100644 --- a/engineapi/proxy_volume.go +++ b/engineapi/proxy_volume.go @@ -7,7 +7,7 @@ import ( ) func (p *Proxy) VolumeGet(e *longhorn.Engine) (volume *Volume, err error) { - recv, err := p.grpcClient.VolumeGet(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + recv, err := p.grpcClient.VolumeGet(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e)) if err != nil { return nil, err } @@ -16,7 +16,8 @@ func (p *Proxy) VolumeGet(e *longhorn.Engine) (volume *Volume, err error) { } func (p *Proxy) VolumeExpand(e *longhorn.Engine) (err error) { - return p.grpcClient.VolumeExpand(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), e.Spec.VolumeSize) + return p.grpcClient.VolumeExpand(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), + e.Spec.VolumeSize) } func (p *Proxy) VolumeFrontendStart(e *longhorn.Engine) (err error) { @@ -29,13 +30,16 @@ func (p *Proxy) VolumeFrontendStart(e *longhorn.Engine) (err error) { return fmt.Errorf("cannot start empty frontend") } - return p.grpcClient.VolumeFrontendStart(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), frontendName) + return p.grpcClient.VolumeFrontendStart(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e), frontendName) } func (p *Proxy) VolumeFrontendShutdown(e *longhorn.Engine) (err error) { - return p.grpcClient.VolumeFrontendShutdown(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e)) + return p.grpcClient.VolumeFrontendShutdown(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e)) } func (p *Proxy) VolumeUnmapMarkSnapChainRemovedSet(e *longhorn.Engine) error { - return p.grpcClient.VolumeUnmapMarkSnapChainRemovedSet(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), e.Spec.UnmapMarkSnapChainRemovedEnabled) + return p.grpcClient.VolumeUnmapMarkSnapChainRemovedSet(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, + p.DirectToURL(e), e.Spec.UnmapMarkSnapChainRemovedEnabled) } diff --git a/engineapi/snapshot.go b/engineapi/snapshot.go index b32bd3c19b..ca8beee2d6 100644 --- a/engineapi/snapshot.go +++ b/engineapi/snapshot.go @@ -105,12 +105,14 @@ func (e *EngineBinary) SnapshotPurgeStatus(*longhorn.Engine) (map[string]*longho // SnapshotClone calls engine binary // TODO: Deprecated, replaced by gRPC proxy -func (e *EngineBinary) SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error { - args := []string{"snapshot", "clone", "--snapshot-name", snapshotName, "--from-controller-address", fromControllerAddress} +func (e *EngineBinary) SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromEngineName string, + fileSyncHTTPClientTimeout int64) error { + args := []string{"snapshot", "clone", "--snapshot-name", snapshotName, "--from-controller-address", + fromEngineAddress, "--from-controller-instance-name", fromEngineName} if _, err := e.ExecuteEngineBinaryWithoutTimeout([]string{}, args...); err != nil { return errors.Wrapf(err, "error starting snapshot clone") } - logrus.Debugf("Cloned snapshot %v from volume %v to volume %v", snapshotName, fromControllerAddress, e.cURL) + logrus.Debugf("Cloned snapshot %v from volume %v to volume %v", snapshotName, fromEngineAddress, e.cURL) return nil } diff --git a/engineapi/types.go b/engineapi/types.go index a1592095c1..422ced9b1c 100644 --- a/engineapi/types.go +++ b/engineapi/types.go @@ -96,7 +96,7 @@ type EngineClient interface { SnapshotBackup(engine *longhorn.Engine, backupName, snapName, backupTarget, backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string, labels, credential map[string]string) (string, string, error) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress string) (*longhorn.EngineBackupStatus, error) SnapshotCloneStatus(engine *longhorn.Engine) (map[string]*longhorn.SnapshotCloneStatus, error) - SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error + SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromEngineName string, fileSyncHTTPClientTimeout int64) error SnapshotHash(engine *longhorn.Engine, snapshotName string, rehash bool) error SnapshotHashStatus(engine *longhorn.Engine, snapshotName string) (map[string]*longhorn.HashStatus, error)