Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine identity validation #1897

Merged
merged 6 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,8 @@ func cloneSnapshot(engine *longhorn.Engine, engineClientProxy engineapi.EngineCl
}

sourceEngineControllerURL := imutil.GetURL(sourceEngine.Status.StorageIP, sourceEngine.Status.Port)
if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL, fileSyncHTTPClientTimeout); err != nil {
if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL,
sourceEngine.Spec.VolumeName, sourceEngine.Name, fileSyncHTTPClientTimeout); err != nil {
// There is only 1 replica during volume cloning,
// so if the cloning failed, it must be that the replica failed to clone.
for _, status := range engine.Status.CloneStatus {
Expand Down Expand Up @@ -1609,10 +1610,11 @@ func GetBinaryClientForEngine(e *longhorn.Engine, engines engineapi.EngineClient
}

client, err = engines.NewEngineClient(&engineapi.EngineClientRequest{
VolumeName: e.Spec.VolumeName,
EngineImage: image,
IP: e.Status.IP,
Port: e.Status.Port,
VolumeName: e.Spec.VolumeName,
EngineImage: image,
IP: e.Status.IP,
Port: e.Status.Port,
InstanceName: e.Name,
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion engineapi/backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (m *BackupMonitor) syncBackupStatusFromEngineReplica() (currentBackupStatus
m.backupStatus.DeepCopyInto(&currentBackupStatus)
m.backupStatusLock.RUnlock()

engineBackupStatus, err = m.engineClientProxy.SnapshotBackupStatus(m.engine, m.backupName, m.replicaAddress)
engineBackupStatus, err = m.engineClientProxy.SnapshotBackupStatus(m.engine, m.backupName, m.replicaAddress, "")
if err != nil {
return currentBackupStatus, err
}
Expand Down
18 changes: 15 additions & 3 deletions engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,10 @@ func (e *EngineBinary) SnapshotBackup(engine *longhorn.Engine, snapName, backupN
// TODO: update when replacing this function
snap, err := e.SnapshotGet(nil, snapName)
if err != nil {
return "", "", errors.Wrapf(err, "error getting snapshot '%s', volume '%s'", snapName, e.name)
return "", "", errors.Wrapf(err, "error getting snapshot '%s', volume '%s'", snapName, e.volumeName)
}
if snap == nil {
return "", "", errors.Errorf("could not find snapshot '%s' to backup, volume '%s'", snapName, e.name)
return "", "", errors.Errorf("could not find snapshot '%s' to backup, volume '%s'", snapName, e.volumeName)
}
version, err := e.VersionGet(nil, true)
if err != nil {
Expand Down Expand Up @@ -365,12 +365,24 @@ func (e *EngineBinary) SnapshotBackup(engine *longhorn.Engine, snapName, backupN

// SnapshotBackupStatus calls engine binary
// TODO: Deprecated, replaced by gRPC proxy
func (e *EngineBinary) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress string) (*longhorn.EngineBackupStatus, error) {
func (e *EngineBinary) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress,
replicaName string) (*longhorn.EngineBackupStatus, error) {
args := []string{"backup", "status", backupName}
if replicaAddress != "" {
args = append(args, "--replica", replicaAddress)
}

// For now, we likely don't know the replica name here. Don't bother checking the binary version if we don't.
if replicaName != "" {
version, err := e.VersionGet(engine, true)
if err != nil {
return nil, err
}
if version.ClientVersion.CLIAPIVersion >= 9 {
args = append(args, "--replica-instance-name", replicaName)
}
}

output, err := e.ExecuteEngineBinary(args...)
if err != nil {
return nil, err
Expand Down
64 changes: 47 additions & 17 deletions engineapi/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ const (
type EngineCollection struct{}

type EngineBinary struct {
name string
image string
ip string
port int
cURL string
volumeName string
image string
ip string
port int
cURL string
instanceName string
}

func (c *EngineCollection) NewEngineClient(request *EngineClientRequest) (*EngineBinary, error) {
Expand All @@ -46,34 +47,44 @@ func (c *EngineCollection) NewEngineClient(request *EngineClientRequest) (*Engin
}

return &EngineBinary{
name: request.VolumeName,
image: request.EngineImage,
ip: request.IP,
port: request.Port,
cURL: imutil.GetURL(request.IP, request.Port),
volumeName: request.VolumeName,
image: request.EngineImage,
ip: request.IP,
port: request.Port,
cURL: imutil.GetURL(request.IP, request.Port),
instanceName: request.InstanceName,
}, nil
}

func (e *EngineBinary) Name() string {
return e.name
return e.volumeName
}

func (e *EngineBinary) LonghornEngineBinary() string {
return filepath.Join(types.GetEngineBinaryDirectoryOnHostForImage(e.image), "longhorn")
}

func (e *EngineBinary) ExecuteEngineBinary(args ...string) (string, error) {
args = append([]string{"--url", e.cURL}, args...)
args, err := e.addFlags(args)
if err != nil {
return "", err
}
return util.Execute([]string{}, e.LonghornEngineBinary(), args...)
}

func (e *EngineBinary) ExecuteEngineBinaryWithTimeout(timeout time.Duration, args ...string) (string, error) {
args = append([]string{"--url", e.cURL}, args...)
args, err := e.addFlags(args)
if err != nil {
return "", err
}
return util.ExecuteWithTimeout(timeout, []string{}, e.LonghornEngineBinary(), args...)
}

func (e *EngineBinary) ExecuteEngineBinaryWithoutTimeout(envs []string, args ...string) (string, error) {
args = append([]string{"--url", e.cURL}, args...)
args, err := e.addFlags(args)
if err != nil {
return "", err
}
return util.ExecuteWithoutTimeout(envs, e.LonghornEngineBinary(), args...)
}

Expand All @@ -98,7 +109,7 @@ func parseReplica(s string) (*Replica, error) {
func (e *EngineBinary) ReplicaList(*longhorn.Engine) (map[string]*Replica, error) {
output, err := e.ExecuteEngineBinary("ls")
if err != nil {
return nil, errors.Wrapf(err, "failed to list replicas from controller '%s'", e.name)
return nil, errors.Wrapf(err, "failed to list replicas from controller '%s'", e.volumeName)
}
replicas := make(map[string]*Replica)
lines := strings.Split(output, "\n")
Expand Down Expand Up @@ -148,8 +159,12 @@ func (e *EngineBinary) ReplicaAdd(engine *longhorn.Engine, replicaName, url stri
}
}

if version.ClientVersion.CLIAPIVersion >= 9 {
shuo-wu marked this conversation as resolved.
Show resolved Hide resolved
cmd = append(cmd, "--replica-instance-name", replicaName)
}

if _, err := e.ExecuteEngineBinaryWithoutTimeout([]string{}, cmd...); err != nil {
return errors.Wrapf(err, "failed to add replica address='%s' to controller '%s'", url, e.name)
return errors.Wrapf(err, "failed to add replica address='%s' to controller '%s'", url, e.volumeName)
}
return nil
}
Expand All @@ -161,7 +176,7 @@ func (e *EngineBinary) ReplicaRemove(engine *longhorn.Engine, url string) error
return err
}
if _, err := e.ExecuteEngineBinary("rm", url); err != nil {
return errors.Wrapf(err, "failed to rm replica address='%s' from controller '%s'", url, e.name)
return errors.Wrapf(err, "failed to rm replica address='%s' from controller '%s'", url, e.volumeName)
}
return nil
}
Expand Down Expand Up @@ -304,3 +319,18 @@ func (e *EngineBinary) ReplicaModeUpdate(engine *longhorn.Engine, url, mode stri
func (e *EngineBinary) MetricsGet(*longhorn.Engine) (*Metrics, error) {
return nil, fmt.Errorf(ErrNotImplement)
}

// addFlags always adds required flags to args. In addition, if the engine version is high enough, it adds additional
// engine identity validation flags.
func (e *EngineBinary) addFlags(args []string) ([]string, error) {
version, err := e.VersionGet(nil, true)
if err != nil {
return args, errors.Wrap(err, "failed to get engine CLI version while adding identity flags")
}

argsToAdd := []string{"--url", e.cURL}
if version.ClientVersion.CLIAPIVersion >= 9 {
argsToAdd = append(argsToAdd, "--volume-name", e.volumeName, "--engine-instance-name", e.instanceName)
}
return append(argsToAdd, args...), nil
}
9 changes: 5 additions & 4 deletions engineapi/engine_binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func GetEngineBinaryClient(ds *datastore.DataStore, volumeName, nodeID string) (

engineCollection := &EngineCollection{}
return engineCollection.NewEngineClient(&EngineClientRequest{
VolumeName: e.Spec.VolumeName,
EngineImage: e.Status.CurrentImage,
IP: e.Status.IP,
Port: e.Status.Port,
VolumeName: e.Spec.VolumeName,
EngineImage: e.Status.CurrentImage,
IP: e.Status.IP,
Port: e.Status.Port,
InstanceName: e.Name,
})
}
6 changes: 4 additions & 2 deletions engineapi/enginesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func (e *EngineSimulator) SnapshotBackup(engine *longhorn.Engine, backupName, sn
return "", "", fmt.Errorf(ErrNotImplement)
}

func (e *EngineSimulator) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress string) (*longhorn.EngineBackupStatus, error) {
func (e *EngineSimulator) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress,
replicaName string) (*longhorn.EngineBackupStatus, error) {
return nil, fmt.Errorf(ErrNotImplement)
}

Expand All @@ -216,7 +217,8 @@ func (e *EngineSimulator) BackupRestore(engine *longhorn.Engine, backupTarget, b
return fmt.Errorf(ErrNotImplement)
}

func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error {
func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromVolumeName,
fromEngineName string, fileSyncHTTPClientTimeout int64) error {
return fmt.Errorf(ErrNotImplement)
}

Expand Down
20 changes: 18 additions & 2 deletions engineapi/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

const (
CurrentInstanceManagerAPIVersion = 4
CurrentInstanceManagerAPIVersion = 5
MinInstanceManagerAPIVersion = 1
UnknownInstanceManagerAPIVersion = 0

Expand Down Expand Up @@ -336,6 +336,10 @@ func getBinaryAndArgsForEngineProcessCreation(e *longhorn.Engine,
}
}

if engineCLIAPIVersion >= 9 {
args = append([]string{"--engine-instance-name", e.Name}, args...)
}

PhanLe1010 marked this conversation as resolved.
Show resolved Hide resolved
for _, addr := range e.Status.CurrentReplicaAddressMap {
args = append(args, "--replica", GetBackendReplicaURL(addr))
}
Expand All @@ -358,7 +362,10 @@ func getBinaryAndArgsForReplicaProcessCreation(r *longhorn.Replica,
args = append(args, "--disableRevCounter")
}
if engineCLIAPIVersion >= 7 {
args = append(args, "--volume-name", r.Spec.VolumeName)
if engineCLIAPIVersion < 9 {
// Replaced by the global --volume-name flag when engineCLIAPIVersion == 9.
args = append(args, "--volume-name", r.Spec.VolumeName)
}

if dataLocality == longhorn.DataLocalityStrictLocal {
args = append(args, "--data-server-protocol", "unix")
Expand All @@ -369,6 +376,11 @@ func getBinaryAndArgsForReplicaProcessCreation(r *longhorn.Replica,
}
}

if engineCLIAPIVersion >= 9 {
args = append(args, "--replica-instance-name", r.Name)
args = append([]string{"--volume-name", r.Spec.VolumeName}, args...)
PhanLe1010 marked this conversation as resolved.
Show resolved Hide resolved
}

// 3 ports are already used by replica server, data server and syncagent server
syncAgentPortCount := portCount - 3
args = append(args, "--sync-agent-port-count", strconv.Itoa(syncAgentPortCount))
Expand Down Expand Up @@ -687,6 +699,10 @@ func (c *InstanceManagerClient) engineInstanceUpgrade(req *EngineInstanceUpgrade
}
}

if req.EngineCLIAPIVersion >= 9 {
args = append([]string{"--engine-instance-name", req.Engine.Name}, args...)
}

binary := filepath.Join(types.GetEngineBinaryDirectoryForEngineManagerContainer(req.Engine.Spec.EngineImage), types.EngineBinaryName)

if c.GetAPIVersion() < 4 {
Expand Down
18 changes: 11 additions & 7 deletions engineapi/proxy_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac
return "", "", err
}

backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e),
backupName, snapshotName, backupTarget, backingImageName, backingImageChecksum,
compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
backupID, replicaAddress, err := p.grpcClient.SnapshotBackup(string(e.Spec.BackendStoreDriver), e.Name,
e.Spec.VolumeName, p.DirectToURL(e), backupName, snapshotName, backupTarget, backingImageName,
backingImageChecksum, compressionMethod, concurrentLimit, storageClassName, labels, credentialEnv,
)
if err != nil {
return "", "", err
Expand All @@ -48,8 +48,10 @@ func (p *Proxy) SnapshotBackup(e *longhorn.Engine, snapshotName, backupName, bac
return backupID, replicaAddress, nil
}

func (p *Proxy) SnapshotBackupStatus(e *longhorn.Engine, backupName, replicaAddress string) (status *longhorn.EngineBackupStatus, err error) {
recv, err := p.grpcClient.SnapshotBackupStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), backupName, replicaAddress)
func (p *Proxy) SnapshotBackupStatus(e *longhorn.Engine, backupName, replicaAddress,
replicaName string) (status *longhorn.EngineBackupStatus, err error) {
recv, err := p.grpcClient.SnapshotBackupStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e), backupName, replicaAddress, replicaName)
if err != nil {
return nil, err
}
Expand All @@ -67,11 +69,13 @@ func (p *Proxy) BackupRestore(e *longhorn.Engine, backupTarget, backupName, back
return err
}

return p.grpcClient.BackupRestore(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), backupURL, backupTarget, backupVolumeName, envs, concurrentLimit)
return p.grpcClient.BackupRestore(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
backupURL, backupTarget, backupVolumeName, envs, concurrentLimit)
}

func (p *Proxy) BackupRestoreStatus(e *longhorn.Engine) (status map[string]*longhorn.RestoreStatus, err error) {
recv, err := p.grpcClient.BackupRestoreStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.BackupRestoreStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion engineapi/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

func (p *Proxy) MetricsGet(e *longhorn.Engine) (*Metrics, error) {
metrics, err := p.grpcClient.MetricsGet(p.DirectToURL(e))
metrics, err := p.grpcClient.MetricsGet(e.Name, e.Spec.VolumeName, p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand Down
13 changes: 8 additions & 5 deletions engineapi/proxy_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import (
)

func (p *Proxy) ReplicaAdd(e *longhorn.Engine, replicaName, replicaAddress string, restore, fastSync bool, replicaFileSyncHTTPClientTimeout int64) (err error) {
return p.grpcClient.ReplicaAdd(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e), replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize, int(replicaFileSyncHTTPClientTimeout), fastSync)
return p.grpcClient.ReplicaAdd(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e),
replicaName, replicaAddress, restore, e.Spec.VolumeSize, e.Status.CurrentSize,
int(replicaFileSyncHTTPClientTimeout), fastSync)
}

func (p *Proxy) ReplicaRemove(e *longhorn.Engine, address string) (err error) {
return p.grpcClient.ReplicaRemove(string(e.Spec.BackendStoreDriver), p.DirectToURL(e), e.Name, address, "")
}

func (p *Proxy) ReplicaList(e *longhorn.Engine) (replicas map[string]*Replica, err error) {
resp, err := p.grpcClient.ReplicaList(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
resp, err := p.grpcClient.ReplicaList(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -33,7 +36,8 @@ func (p *Proxy) ReplicaList(e *longhorn.Engine) (replicas map[string]*Replica, e
}

func (p *Proxy) ReplicaRebuildStatus(e *longhorn.Engine) (status map[string]*longhorn.RebuildStatus, err error) {
recv, err := p.grpcClient.ReplicaRebuildingStatus(string(e.Spec.BackendStoreDriver), e.Name, p.DirectToURL(e))
recv, err := p.grpcClient.ReplicaRebuildingStatus(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName,
p.DirectToURL(e))
if err != nil {
return nil, err
}
Expand All @@ -49,8 +53,7 @@ func (p *Proxy) ReplicaRebuildVerify(e *longhorn.Engine, url string) (err error)
if err := ValidateReplicaURL(url); err != nil {
return err
}

return p.grpcClient.ReplicaVerifyRebuild(string(e.Spec.BackendStoreDriver), p.DirectToURL(e), url)
return p.grpcClient.ReplicaVerifyRebuild(string(e.Spec.BackendStoreDriver), e.Name, e.Spec.VolumeName, p.DirectToURL(e), url, "")
}

func (p *Proxy) ReplicaModeUpdate(e *longhorn.Engine, url, mode string) (err error) {
Expand Down
Loading