Skip to content

Commit

Permalink
Merge branch 'master' into issue2997
Browse files Browse the repository at this point in the history
  • Loading branch information
innobead authored Aug 8, 2023
2 parents 27f34ed + 460b23b commit babace2
Show file tree
Hide file tree
Showing 33 changed files with 879 additions and 530 deletions.
104 changes: 58 additions & 46 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"

"golang.org/x/sync/errgroup"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -71,15 +72,14 @@ func RecurringJobCmd() cli.Command {
},
Action: func(c *cli.Context) {
if err := recurringJob(c); err != nil {
logrus.WithError(err).Fatal("Failed to snapshot")
logrus.WithError(err).Fatal("Failed to do a recurring job")
}
},
}
}

func recurringJob(c *cli.Context) error {
func recurringJob(c *cli.Context) (err error) {
logger := logrus.StandardLogger()
var err error

var managerURL string = c.String(FlagManagerURL)
if managerURL == "" {
Expand Down Expand Up @@ -108,6 +108,7 @@ func recurringJob(c *cli.Context) error {
var jobGroups []string = recurringJob.Spec.Groups
var jobRetain int = recurringJob.Spec.Retain
var jobConcurrent int = recurringJob.Spec.Concurrency
jobTask := recurringJob.Spec.Task

jobLabelMap := map[string]string{}
if recurringJob.Spec.Labels != nil {
Expand Down Expand Up @@ -142,51 +143,62 @@ func recurringJob(c *cli.Context) error {
logger.Infof("Found %v volumes with recurring job %v", len(filteredVolumes), jobName)

concurrentLimiter := make(chan struct{}, jobConcurrent)
var wg sync.WaitGroup
defer wg.Wait()
ewg := &errgroup.Group{}
defer func() {
if wgError := ewg.Wait(); wgError != nil {
err = wgError
}
}()
for _, volumeName := range filteredVolumes {
wg.Add(1)
go func(volumeName string) {
concurrentLimiter <- struct{}{}
defer func() {
<-concurrentLimiter
wg.Done()
}()

log := logger.WithFields(logrus.Fields{
"job": jobName,
"volume": volumeName,
"task": recurringJob.Spec.Task,
"retain": jobRetain,
"concurrent": jobConcurrent,
"groups": strings.Join(jobGroups, ","),
"labels": string(labelJSON),
})
log.Info("Creating job")

snapshotName := sliceStringSafely(types.GetCronJobNameForRecurringJob(jobName), 0, 8) + "-" + util.UUID()
job, err := NewJob(
logger,
managerURL,
volumeName,
snapshotName,
jobLabelMap,
jobRetain,
recurringJob.Spec.Task)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return
}
err = job.run()
if err != nil {
log.WithError(err).Errorf("Failed to run job for volume")
return
}
startJobVolumeName := volumeName
ewg.Go(func() error {
return startVolumeJob(startJobVolumeName, logger, concurrentLimiter, managerURL, jobName, jobTask, jobRetain, jobConcurrent, jobGroups, jobLabelMap, labelJSON)
})
}

log.Info("Created job")
}(volumeName)
return err
}

// startVolumeJob creates and runs one recurring job for a single volume.
//
// The function first sends on concurrentLimiter and receives from it again on
// return, so the number of jobs running past that point is bounded by the
// capacity of the channel supplied by the caller.
//
// The snapshot name is the cron job name for jobName, sliced to [0, 8) by
// sliceStringSafely, followed by "-" and a UUID.
//
// jobConcurrent, jobGroups and labelJSON are used only as log fields.
//
// Every failure is logged here with the per-volume fields, and is also
// returned wrapped with the job and volume names so that an aggregated error
// seen by the caller still identifies which volume failed. The wrapping uses
// %w, so errors.Is / errors.As continue to reach the underlying error.
func startVolumeJob(
	volumeName string, logger *logrus.Logger, concurrentLimiter chan struct{}, managerURL string,
	jobName string, jobTask longhorn.RecurringJobType, jobRetain int, jobConcurrent int, jobGroups []string, jobLabelMap map[string]string, labelJSON []byte) error {

	// Take a slot in the limiter; blocks while the channel is full.
	concurrentLimiter <- struct{}{}
	defer func() {
		<-concurrentLimiter
	}()

	log := logger.WithFields(logrus.Fields{
		"job":        jobName,
		"volume":     volumeName,
		"task":       jobTask,
		"retain":     jobRetain,
		"concurrent": jobConcurrent,
		"groups":     strings.Join(jobGroups, ","),
		"labels":     string(labelJSON),
	})
	log.Info("Creating job")

	snapshotName := sliceStringSafely(types.GetCronJobNameForRecurringJob(jobName), 0, 8) + "-" + util.UUID()
	job, err := newJob(
		logger,
		managerURL,
		volumeName,
		snapshotName,
		jobLabelMap,
		jobRetain,
		jobTask)
	if err != nil {
		log.WithError(err).Error("Failed to create new job for volume")
		return fmt.Errorf("creating recurring job %v for volume %v: %w", jobName, volumeName, err)
	}

	if err := job.run(); err != nil {
		log.WithError(err).Error("Failed to run job for volume")
		return fmt.Errorf("running recurring job %v for volume %v: %w", jobName, volumeName, err)
	}

	log.Info("Created job")
	return nil
}

Expand All @@ -200,7 +212,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func NewJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand Down
12 changes: 7 additions & 5 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,8 @@ func cloneSnapshot(engine *longhorn.Engine, engineClientProxy engineapi.EngineCl
}

sourceEngineControllerURL := imutil.GetURL(sourceEngine.Status.StorageIP, sourceEngine.Status.Port)
if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL, fileSyncHTTPClientTimeout); err != nil {
if err := engineClientProxy.SnapshotClone(engine, snapshotName, sourceEngineControllerURL,
sourceEngine.Spec.VolumeName, sourceEngine.Name, fileSyncHTTPClientTimeout); err != nil {
// There is only 1 replica during volume cloning,
// so if the cloning failed, it must be that the replica failed to clone.
for _, status := range engine.Status.CloneStatus {
Expand Down Expand Up @@ -1609,10 +1610,11 @@ func GetBinaryClientForEngine(e *longhorn.Engine, engines engineapi.EngineClient
}

client, err = engines.NewEngineClient(&engineapi.EngineClientRequest{
VolumeName: e.Spec.VolumeName,
EngineImage: image,
IP: e.Status.IP,
Port: e.Status.Port,
VolumeName: e.Spec.VolumeName,
EngineImage: image,
IP: e.Status.IP,
Port: e.Status.Port,
InstanceName: e.Name,
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion engineapi/backup_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (m *BackupMonitor) syncBackupStatusFromEngineReplica() (currentBackupStatus
m.backupStatus.DeepCopyInto(&currentBackupStatus)
m.backupStatusLock.RUnlock()

engineBackupStatus, err = m.engineClientProxy.SnapshotBackupStatus(m.engine, m.backupName, m.replicaAddress)
engineBackupStatus, err = m.engineClientProxy.SnapshotBackupStatus(m.engine, m.backupName, m.replicaAddress, "")
if err != nil {
return currentBackupStatus, err
}
Expand Down
18 changes: 15 additions & 3 deletions engineapi/backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,10 @@ func (e *EngineBinary) SnapshotBackup(engine *longhorn.Engine, snapName, backupN
// TODO: update when replacing this function
snap, err := e.SnapshotGet(nil, snapName)
if err != nil {
return "", "", errors.Wrapf(err, "error getting snapshot '%s', volume '%s'", snapName, e.name)
return "", "", errors.Wrapf(err, "error getting snapshot '%s', volume '%s'", snapName, e.volumeName)
}
if snap == nil {
return "", "", errors.Errorf("could not find snapshot '%s' to backup, volume '%s'", snapName, e.name)
return "", "", errors.Errorf("could not find snapshot '%s' to backup, volume '%s'", snapName, e.volumeName)
}
version, err := e.VersionGet(nil, true)
if err != nil {
Expand Down Expand Up @@ -365,12 +365,24 @@ func (e *EngineBinary) SnapshotBackup(engine *longhorn.Engine, snapName, backupN

// SnapshotBackupStatus calls engine binary
// TODO: Deprecated, replaced by gRPC proxy
func (e *EngineBinary) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress string) (*longhorn.EngineBackupStatus, error) {
func (e *EngineBinary) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress,
replicaName string) (*longhorn.EngineBackupStatus, error) {
args := []string{"backup", "status", backupName}
if replicaAddress != "" {
args = append(args, "--replica", replicaAddress)
}

// For now, we likely don't know the replica name here. Don't bother checking the binary version if we don't.
if replicaName != "" {
version, err := e.VersionGet(engine, true)
if err != nil {
return nil, err
}
if version.ClientVersion.CLIAPIVersion >= 9 {
args = append(args, "--replica-instance-name", replicaName)
}
}

output, err := e.ExecuteEngineBinary(args...)
if err != nil {
return nil, err
Expand Down
64 changes: 47 additions & 17 deletions engineapi/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ const (
// EngineCollection is an empty, stateless type; its NewEngineClient method
// builds an EngineBinary client from an EngineClientRequest.
type EngineCollection struct{}

type EngineBinary struct {
name string
image string
ip string
port int
cURL string
volumeName string
image string
ip string
port int
cURL string
instanceName string
}

func (c *EngineCollection) NewEngineClient(request *EngineClientRequest) (*EngineBinary, error) {
Expand All @@ -46,34 +47,44 @@ func (c *EngineCollection) NewEngineClient(request *EngineClientRequest) (*Engin
}

return &EngineBinary{
name: request.VolumeName,
image: request.EngineImage,
ip: request.IP,
port: request.Port,
cURL: imutil.GetURL(request.IP, request.Port),
volumeName: request.VolumeName,
image: request.EngineImage,
ip: request.IP,
port: request.Port,
cURL: imutil.GetURL(request.IP, request.Port),
instanceName: request.InstanceName,
}, nil
}

func (e *EngineBinary) Name() string {
return e.name
return e.volumeName
}

// LonghornEngineBinary returns the path of the "longhorn" executable located
// in the directory that types.GetEngineBinaryDirectoryOnHostForImage resolves
// for this client's engine image.
func (e *EngineBinary) LonghornEngineBinary() string {
	binaryDir := types.GetEngineBinaryDirectoryOnHostForImage(e.image)
	return filepath.Join(binaryDir, "longhorn")
}

func (e *EngineBinary) ExecuteEngineBinary(args ...string) (string, error) {
args = append([]string{"--url", e.cURL}, args...)
args, err := e.addFlags(args)
if err != nil {
return "", err
}
return util.Execute([]string{}, e.LonghornEngineBinary(), args...)
}

func (e *EngineBinary) ExecuteEngineBinaryWithTimeout(timeout time.Duration, args ...string) (string, error) {
args = append([]string{"--url", e.cURL}, args...)
args, err := e.addFlags(args)
if err != nil {
return "", err
}
return util.ExecuteWithTimeout(timeout, []string{}, e.LonghornEngineBinary(), args...)
}

func (e *EngineBinary) ExecuteEngineBinaryWithoutTimeout(envs []string, args ...string) (string, error) {
args = append([]string{"--url", e.cURL}, args...)
args, err := e.addFlags(args)
if err != nil {
return "", err
}
return util.ExecuteWithoutTimeout(envs, e.LonghornEngineBinary(), args...)
}

Expand All @@ -98,7 +109,7 @@ func parseReplica(s string) (*Replica, error) {
func (e *EngineBinary) ReplicaList(*longhorn.Engine) (map[string]*Replica, error) {
output, err := e.ExecuteEngineBinary("ls")
if err != nil {
return nil, errors.Wrapf(err, "failed to list replicas from controller '%s'", e.name)
return nil, errors.Wrapf(err, "failed to list replicas from controller '%s'", e.volumeName)
}
replicas := make(map[string]*Replica)
lines := strings.Split(output, "\n")
Expand Down Expand Up @@ -148,8 +159,12 @@ func (e *EngineBinary) ReplicaAdd(engine *longhorn.Engine, replicaName, url stri
}
}

if version.ClientVersion.CLIAPIVersion >= 9 {
cmd = append(cmd, "--replica-instance-name", replicaName)
}

if _, err := e.ExecuteEngineBinaryWithoutTimeout([]string{}, cmd...); err != nil {
return errors.Wrapf(err, "failed to add replica address='%s' to controller '%s'", url, e.name)
return errors.Wrapf(err, "failed to add replica address='%s' to controller '%s'", url, e.volumeName)
}
return nil
}
Expand All @@ -161,7 +176,7 @@ func (e *EngineBinary) ReplicaRemove(engine *longhorn.Engine, url string) error
return err
}
if _, err := e.ExecuteEngineBinary("rm", url); err != nil {
return errors.Wrapf(err, "failed to rm replica address='%s' from controller '%s'", url, e.name)
return errors.Wrapf(err, "failed to rm replica address='%s' from controller '%s'", url, e.volumeName)
}
return nil
}
Expand Down Expand Up @@ -304,3 +319,18 @@ func (e *EngineBinary) ReplicaModeUpdate(engine *longhorn.Engine, url, mode stri
// MetricsGet is not supported by EngineBinary. It ignores its engine argument
// and always returns a nil *Metrics together with an error built from
// ErrNotImplement.
func (e *EngineBinary) MetricsGet(*longhorn.Engine) (*Metrics, error) {
return nil, fmt.Errorf(ErrNotImplement)
}

// addFlags returns args prefixed with the global flags passed on every engine
// binary invocation. "--url" with the controller URL is always prepended. When
// the engine CLI reports API version 9 or newer, "--volume-name" and
// "--engine-instance-name" are prepended as well so the engine identity can be
// validated.
//
// If the CLI version cannot be obtained, args is returned unchanged together
// with a wrapped error.
func (e *EngineBinary) addFlags(args []string) ([]string, error) {
	// Minimum CLI API version at which the identity flags are passed.
	const identityFlagsMinCLIAPIVersion = 9

	cliVersion, err := e.VersionGet(nil, true)
	if err != nil {
		return args, errors.Wrap(err, "failed to get engine CLI version while adding identity flags")
	}

	prefix := []string{"--url", e.cURL}
	if cliVersion.ClientVersion.CLIAPIVersion < identityFlagsMinCLIAPIVersion {
		// Older CLI: only the required flags go in front of args.
		return append(prefix, args...), nil
	}

	prefix = append(prefix,
		"--volume-name", e.volumeName,
		"--engine-instance-name", e.instanceName,
	)
	return append(prefix, args...), nil
}
9 changes: 5 additions & 4 deletions engineapi/engine_binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func GetEngineBinaryClient(ds *datastore.DataStore, volumeName, nodeID string) (

engineCollection := &EngineCollection{}
return engineCollection.NewEngineClient(&EngineClientRequest{
VolumeName: e.Spec.VolumeName,
EngineImage: e.Status.CurrentImage,
IP: e.Status.IP,
Port: e.Status.Port,
VolumeName: e.Spec.VolumeName,
EngineImage: e.Status.CurrentImage,
IP: e.Status.IP,
Port: e.Status.Port,
InstanceName: e.Name,
})
}
6 changes: 4 additions & 2 deletions engineapi/enginesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func (e *EngineSimulator) SnapshotBackup(engine *longhorn.Engine, backupName, sn
return "", "", fmt.Errorf(ErrNotImplement)
}

func (e *EngineSimulator) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress string) (*longhorn.EngineBackupStatus, error) {
func (e *EngineSimulator) SnapshotBackupStatus(engine *longhorn.Engine, backupName, replicaAddress,
replicaName string) (*longhorn.EngineBackupStatus, error) {
return nil, fmt.Errorf(ErrNotImplement)
}

Expand All @@ -216,7 +217,8 @@ func (e *EngineSimulator) BackupRestore(engine *longhorn.Engine, backupTarget, b
return fmt.Errorf(ErrNotImplement)
}

func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromControllerAddress string, fileSyncHTTPClientTimeout int64) error {
func (e *EngineSimulator) SnapshotClone(engine *longhorn.Engine, snapshotName, fromEngineAddress, fromVolumeName,
fromEngineName string, fileSyncHTTPClientTimeout int64) error {
return fmt.Errorf(ErrNotImplement)
}

Expand Down
Loading

0 comments on commit babace2

Please sign in to comment.