Skip to content

Commit

Permalink
Use new identity validation flags and fields
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Weber <eric.weber@suse.com>
  • Loading branch information
ejweber committed Jul 7, 2023
1 parent 68646ee commit 33c028b
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 35 deletions.
3 changes: 2 additions & 1 deletion controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,8 @@ func cloneSnapshot(engine *longhorn.Engine, engineClientProxy engineapi.EngineCl
}

sourceEngineControllerURL := imutil.GetURL(sourceEngine.Status.StorageIP, sourceEngine.Status.Port)
if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL, fileSyncHTTPClientTimeout); err != nil {
if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL, sourceEngine.Name,
fileSyncHTTPClientTimeout); err != nil {
// There is only 1 replica during volume cloning,
// so if the cloning failed, it must be that the replica failed to clone.
for _, status := range engine.Status.CloneStatus {
Expand Down
3 changes: 2 additions & 1 deletion engineapi/enginesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ func (e *EngineSimulator) BackupRestore(engine *longhorn.Engine, backupTarget, b
return fmt.Errorf(ErrNotImplement)
}

func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error {
func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress,
fromEngineName string, fileSyncHTTPClientTimeout int64) error {
return fmt.Errorf(ErrNotImplement)
}

Expand Down
14 changes: 13 additions & 1 deletion engineapi/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ func getBinaryAndArgsForEngineProcessCreation(e *longhorn.Engine,
}
}

if engineCLIAPIVersion >= 9 {
args = append([]string{"--engine-instance-name", e.Name}, args...)
}

for _, addr := range e.Status.CurrentReplicaAddressMap {
args = append(args, "--replica", GetBackendReplicaURL(addr))
}
Expand All @@ -358,7 +362,10 @@ func getBinaryAndArgsForReplicaProcessCreation(r *longhorn.Replica,
args = append(args, "--disableRevCounter")
}
if engineCLIAPIVersion >= 7 {
args = append(args, "--volume-name", r.Spec.VolumeName)
if engineCLIAPIVersion < 9 {
// Replaced by the global --volume-name flag when engineCLIAPIVersion == 9.
args = append(args, "--volume-name", r.Spec.VolumeName)
}

if dataLocality == longhorn.DataLocalityStrictLocal {
args = append(args, "--data-server-protocol", "unix")
Expand All @@ -369,6 +376,11 @@ func getBinaryAndArgsForReplicaProcessCreation(r *longhorn.Replica,
}
}

if engineCLIAPIVersion >= 9 {
args = append(args, "--replica-instance-name", r.Name)
args = append([]string{"--volume-name", r.Spec.VolumeName}, args...)
}

// 3 ports are already used by replica server, data server and syncagent server
syncAgentPortCount := portCount - 3
args = append(args, "--sync-agent-port-count", strconv.Itoa(syncAgentPortCount))
Expand Down
15 changes: 9 additions & 6 deletions engineapi/proxy_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac
return "", "", err
}

backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e),
backupName, snapshotName, backupTarget, backingImageName, backingImageChecksum,
compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name,
e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
)
if err != nil {
return "", "", err
Expand All @@ -49,7 +49,8 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac
}

func (p *Proxy) SnapshotBackupStatus(e *longhorn.Engine, backupName, replicaAddress string) (status *longhorn.EngineBackupStatus, err error) {
recv, err := p.grpcClient.SnapshotBackupStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), backupName, replicaAddress)
recv, err := p.grpcClient.SnapshotBackupStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e), backupName, replicaAddress, "")
if err != nil {
return nil, err
}
Expand All @@ -67,11 +68,13 @@ func (p *Proxy) BackupRestore(e *longhorn.Engine, backupTarget, backupName, back
return err
}

return p.grpcClient.BackupRestore(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), backupURL, backupTarget, backupVolumeName, envs, concurrentLimit)
return p.grpcClient.BackupRestore(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
backupURL, backupTarget, backupVolumeName, envs, concurrentLimit)
}

func (p *Proxy) BackupRestoreStatus(e *longhorn.Engine) (status map[string]*longhorn.RestoreStatus, err error) {
recv, err := p.grpcClient.BackupRestoreStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.BackupRestoreStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion engineapi/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

func (p *Proxy) MetricsGet(e *longhorn.Engine) (*Metrics, error) {
metrics, err := p.grpcClient.MetricsGet(p.DirectToURL(e))
metrics, err := p.grpcClient.MetricsGet(e.Name, e.Spec.VolumeName, p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand Down
13 changes: 8 additions & 5 deletions engineapi/proxy_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import (
)

func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, replicaFileSyncHTTPClientTimeout int64) (err error) {
return p.grpcClient.ReplicaAdd(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize, int(replicaFileSyncHTTPClientTimeout), fastSync)
return p.grpcClient.ReplicaAdd(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize,
int(replicaFileSyncHTTPClientTimeout), fastSync)
}

func (p *Proxy) ReplicaRemove(e *longhorn.Engine, address string) (err error) {
return p.grpcClient.ReplicaRemove(string(e.Spec.BackendStoreDriver), p.DirectToURL(e), e.Name, address, "")
}

func (p *Proxy) ReplicaList(e *longhorn.Engine) (replicas map[string]*Replica, err error) {
resp, err := p.grpcClient.ReplicaList(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
resp, err := p.grpcClient.ReplicaList(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -33,7 +36,8 @@ func (p *Proxy) ReplicaList(e *longhorn.Engine) (replicas map[string]*Replica, e
}

func (p *Proxy) ReplicaRebuildStatus(e *longhorn.Engine) (status map[string]*longhorn.RebuildStatus, err error) {
recv, err := p.grpcClient.ReplicaRebuildingStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.ReplicaRebuildingStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -49,8 +53,7 @@ func (p *Proxy) ReplicaRebuildVerify(e *longhorn.Engine, url string) (err error)
if err := ValidateReplicaURL(url); err != nil {
return err
}

return p.grpcClient.ReplicaVerifyRebuild(string(e.Spec.BackendStoreDriver), p.DirectToURL(e), url)
return p.grpcClient.ReplicaVerifyRebuild(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), url, "")
}

func (p *Proxy) ReplicaModeUpdate(e *longhorn.Engine, url, mode string) (err error) {
Expand Down
33 changes: 22 additions & 11 deletions engineapi/proxy_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
)

func (p *Proxy) SnapshotCreate(e *longhorn.Engine, name string, labels map[string]string) (string, error) {
return p.grpcClient.VolumeSnapshot(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), name, labels)
return p.grpcClient.VolumeSnapshot(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
name, labels)
}

func (p *Proxy) SnapshotList(e *longhorn.Engine) (snapshots map[string]*longhorn.SnapshotInfo, err error) {
recv, err := p.grpcClient.SnapshotList(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.SnapshotList(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -30,12 +32,15 @@ func (p *Proxy) SnapshotGet(e *longhorn.Engine, name string) (snapshot *longhorn
return recv[name], nil
}

func (p *Proxy) SnapshotClone(e *longhorn.Engine, snapshotName, fromController string, fileSyncHTTPClientTimeout int64) (err error) {
return p.grpcClient.SnapshotClone(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName, fromController, int(fileSyncHTTPClientTimeout))
func (p *Proxy) SnapshotClone(e *longhorn.Engine, snapshotName, fromEngineAddress, fromEngineName string,
fileSyncHTTPClientTimeout int64) (err error) {
return p.grpcClient.SnapshotClone(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
snapshotName, fromEngineAddress, fromEngineName, int(fileSyncHTTPClientTimeout))
}

func (p *Proxy) SnapshotCloneStatus(e *longhorn.Engine) (status map[string]*longhorn.SnapshotCloneStatus, err error) {
recv, err := p.grpcClient.SnapshotCloneStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.SnapshotCloneStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -48,15 +53,18 @@ func (p *Proxy) SnapshotCloneStatus(e *longhorn.Engine) (status map[string]*long
}

func (p *Proxy) SnapshotRevert(e *longhorn.Engine, snapshotName string) (err error) {
return p.grpcClient.SnapshotRevert(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName)
return p.grpcClient.SnapshotRevert(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
snapshotName)
}

func (p *Proxy) SnapshotPurge(e *longhorn.Engine) (err error) {
return p.grpcClient.SnapshotPurge(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), true)
return p.grpcClient.SnapshotPurge(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
true)
}

func (p *Proxy) SnapshotPurgeStatus(e *longhorn.Engine) (status map[string]*longhorn.PurgeStatus, err error) {
recv, err := p.grpcClient.SnapshotPurgeStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.SnapshotPurgeStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -69,15 +77,18 @@ func (p *Proxy) SnapshotPurgeStatus(e *longhorn.Engine) (status map[string]*long
}

func (p *Proxy) SnapshotDelete(e *longhorn.Engine, name string) (err error) {
return p.grpcClient.SnapshotRemove(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), []string{name})
return p.grpcClient.SnapshotRemove(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
[]string{name})
}

func (p *Proxy) SnapshotHash(e *longhorn.Engine, snapshotName string, rehash bool) error {
return p.grpcClient.SnapshotHash(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName, rehash)
return p.grpcClient.SnapshotHash(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
snapshotName, rehash)
}

func (p *Proxy) SnapshotHashStatus(e *longhorn.Engine, snapshotName string) (status map[string]*longhorn.HashStatus, err error) {
recv, err := p.grpcClient.SnapshotHashStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), snapshotName)
recv, err := p.grpcClient.SnapshotHashStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e), snapshotName)
if err != nil {
return nil, err
}
Expand Down
14 changes: 9 additions & 5 deletions engineapi/proxy_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func (p *Proxy) VolumeGet(e *longhorn.Engine) (volume *Volume, err error) {
recv, err := p.grpcClient.VolumeGet(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.VolumeGet(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -16,7 +16,8 @@ func (p *Proxy) VolumeGet(e *longhorn.Engine) (volume *Volume, err error) {
}

func (p *Proxy) VolumeExpand(e *longhorn.Engine) (err error) {
return p.grpcClient.VolumeExpand(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), e.Spec.VolumeSize)
return p.grpcClient.VolumeExpand(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
e.Spec.VolumeSize)
}

func (p *Proxy) VolumeFrontendStart(e *longhorn.Engine) (err error) {
Expand All @@ -29,13 +30,16 @@ func (p *Proxy) VolumeFrontendStart(e *longhorn.Engine) (err error) {
return fmt.Errorf("cannot start empty frontend")
}

return p.grpcClient.VolumeFrontendStart(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), frontendName)
return p.grpcClient.VolumeFrontendStart(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e), frontendName)
}

func (p *Proxy) VolumeFrontendShutdown(e *longhorn.Engine) (err error) {
return p.grpcClient.VolumeFrontendShutdown(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
return p.grpcClient.VolumeFrontendShutdown(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
}

func (p *Proxy) VolumeUnmapMarkSnapChainRemovedSet(e *longhorn.Engine) error {
return p.grpcClient.VolumeUnmapMarkSnapChainRemovedSet(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), e.Spec.UnmapMarkSnapChainRemovedEnabled)
return p.grpcClient.VolumeUnmapMarkSnapChainRemovedSet(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e), e.Spec.UnmapMarkSnapChainRemovedEnabled)
}
8 changes: 5 additions & 3 deletions engineapi/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,14 @@ func (e *EngineBinary) SnapshotPurgeStatus(*longhorn.Engine) (map[string]*longho

// SnapshotClone calls engine binary
// TODO: Deprecated, replaced by gRPC proxy
func (e *EngineBinary) SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error {
args := []string{"snapshot", "clone", "--snapshot-name", snapshotName, "--from-controller-address", fromControllerAddress}
func (e *EngineBinary) SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromEngineName string,
fileSyncHTTPClientTimeout int64) error {
args := []string{"snapshot", "clone", "--snapshot-name", snapshotName, "--from-controller-address",
fromEngineAddress, "--from-controller-instance-name", fromEngineName}
if _, err := e.ExecuteEngineBinaryWithoutTimeout([]string{}, args...); err != nil {
return errors.Wrapf(err, "error starting snapshot clone")
}
logrus.Debugf("Cloned snapshot %v from volume %v to volume %v", snapshotName, fromControllerAddress, e.cURL)
logrus.Debugf("Cloned snapshot %v from volume %v to volume %v", snapshotName, fromEngineAddress, e.cURL)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engineapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type EngineClient interface {
SnapshotBackup(engine *longhorn.Engine, backupName, snapName, backupTarget, backingImageName, backingImageChecksum, compressionMethod string, concurrentLimit int, storageClassName string, labels, credential map[string]string) (string, string, error)
SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress string) (*longhorn.EngineBackupStatus, error)
SnapshotCloneStatus(engine *longhorn.Engine) (map[string]*longhorn.SnapshotCloneStatus, error)
SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error
SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromEngineName string, fileSyncHTTPClientTimeout int64) error
SnapshotHash(engine *longhorn.Engine, snapshotName string, rehash bool) error
SnapshotHashStatus(engine *longhorn.Engine, snapshotName string) (map[string]*longhorn.HashStatus, error)

Expand Down

0 comments on commit 33c028b

Please sign in to comment.