Implement the repo maintenance Job configuration design.
TODO:
1. Add a comment about the rule for the JobConfig key.
2. Add a UT for the new function getMaintenanceJobConfig.
3. Figure out the log level and log format for the maintenance Job.

Signed-off-by: Xun Jiang <xun.jiang@broadcom.com>
blackpiglet committed Aug 26, 2024
1 parent f5671c7 commit 9f62b81
Showing 15 changed files with 500 additions and 237 deletions.
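For context on the TODOs above: getMaintenanceJobConfig is the new lookup behind the --repo-maintenance-job-config flag introduced below. The following is a minimal sketch of how such a lookup could work, assuming the ConfigMap stores one JSON document per repository plus a "global" fallback key; the JobConfigs shape, the key scheme, and the signature are illustrative assumptions, not necessarily the committed implementation.

package repository

import (
	"context"
	"encoding/json"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// JobConfigs is an assumed shape for one repository's maintenance Job
// settings; the fields mirror the CLI flags this commit removes.
type JobConfigs struct {
	KeepLatestMaintenanceJobs int    `json:"keepLatestMaintenanceJobs,omitempty"`
	CPURequest                string `json:"cpuRequest,omitempty"`
	MemRequest                string `json:"memRequest,omitempty"`
	CPULimit                  string `json:"cpuLimit,omitempty"`
	MemLimit                  string `json:"memLimit,omitempty"`
}

// getMaintenanceJobConfig resolves per-repository maintenance Job settings
// from the ConfigMap named by --repo-maintenance-job-config. The repoKey
// scheme and the "global" fallback key are assumptions for illustration.
func getMaintenanceJobConfig(
	ctx context.Context,
	cli client.Client,
	log logrus.FieldLogger,
	veleroNamespace string,
	configName string,
	repoKey string, // assumed per-repository key, e.g. "<BSL name>-<repository type>"
) (*JobConfigs, error) {
	if configName == "" {
		return nil, nil // no ConfigMap configured; callers fall back to defaults
	}

	cm := &corev1.ConfigMap{}
	if err := cli.Get(ctx, client.ObjectKey{Namespace: veleroNamespace, Name: configName}, cm); err != nil {
		if apierrors.IsNotFound(err) {
			log.Warnf("ConfigMap %s not found, using default maintenance Job settings", configName)
			return nil, nil
		}
		return nil, errors.Wrapf(err, "fail to get ConfigMap %s", configName)
	}

	// Prefer the per-repository entry, then the assumed "global" fallback.
	data, found := cm.Data[repoKey]
	if !found {
		data, found = cm.Data["global"]
	}
	if !found {
		return nil, nil
	}

	result := &JobConfigs{}
	if err := json.Unmarshal([]byte(data), result); err != nil {
		return nil, errors.Wrapf(err, "fail to unmarshal config %q from ConfigMap %s", repoKey, configName)
	}
	return result, nil
}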
18 changes: 16 additions & 2 deletions pkg/cmd/cli/nodeagent/server.go
@@ -59,6 +59,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotypes "github.com/vmware-tanzu/velero/pkg/types"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

@@ -292,7 +293,7 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

var loadAffinity *nodeagent.LoadAffinity
var loadAffinity *velerotypes.LoadAffinity
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
@@ -302,7 +303,20 @@ func (s *nodeAgentServer) run() {
backupPVCConfig = s.dataPathConfigs.BackupPVCConfig
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataUploadReconciler := controller.NewDataUploadReconciler(
s.mgr.GetClient(),
s.mgr,
s.kubeClient,
s.csiSnapshotClient.SnapshotV1(),
s.dataPathMgr,
loadAffinity,
backupPVCConfig,
clock.RealClock{},
s.nodeName,
s.config.dataMoverPrepareTimeout,
s.logger,
s.metrics,
)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}
49 changes: 33 additions & 16 deletions pkg/cmd/server/server.go
@@ -139,8 +139,8 @@ type serverConfig struct {
defaultSnapshotMoveData bool
disableInformerCache bool
scheduleSkipImmediately bool
maintenanceCfg repository.MaintenanceConfig
backukpRepoConfig string
backupRepoConfig string
repoMaintenanceJobConfig string
}

func NewCommand(f client.Factory) *cobra.Command {
@@ -172,9 +172,6 @@ func NewCommand(f client.Factory) *cobra.Command {
defaultSnapshotMoveData: false,
disableInformerCache: defaultDisableInformerCache,
scheduleSkipImmediately: false,
maintenanceCfg: repository.MaintenanceConfig{
KeepLatestMaitenanceJobs: repository.DefaultKeepLatestMaitenanceJobs,
},
}
)

@@ -248,17 +245,20 @@ func NewCommand(f client.Factory) *cobra.Command {
command.Flags().BoolVar(&config.defaultSnapshotMoveData, "default-snapshot-move-data", config.defaultSnapshotMoveData, "Move data by default for all snapshots supporting data movement.")
command.Flags().BoolVar(&config.disableInformerCache, "disable-informer-cache", config.disableInformerCache, "Disable informer cache for Get calls on restore. With this enabled, it will speed up restore in cases where there are backup resources which already exist in the cluster, but for very large clusters this will increase velero memory usage. Default is false (don't disable).")
command.Flags().BoolVar(&config.scheduleSkipImmediately, "schedule-skip-immediately", config.scheduleSkipImmediately, "Skip the first scheduled backup immediately after creating a schedule. Default is false (don't skip).")
command.Flags().IntVar(&config.maintenanceCfg.KeepLatestMaitenanceJobs, "keep-latest-maintenance-jobs", config.maintenanceCfg.KeepLatestMaitenanceJobs, "Number of latest maintenance jobs to keep each repository. Optional.")
command.Flags().StringVar(&config.maintenanceCfg.CPURequest, "maintenance-job-cpu-request", config.maintenanceCfg.CPURequest, "CPU request for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.MemRequest, "maintenance-job-mem-request", config.maintenanceCfg.MemRequest, "Memory request for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.CPULimit, "maintenance-job-cpu-limit", config.maintenanceCfg.CPULimit, "CPU limit for maintenance job. Default is no limit.")
command.Flags().StringVar(&config.maintenanceCfg.MemLimit, "maintenance-job-mem-limit", config.maintenanceCfg.MemLimit, "Memory limit for maintenance job. Default is no limit.")

command.Flags().StringVar(&config.backukpRepoConfig, "backup-repository-config", config.backukpRepoConfig, "The name of configMap containing backup repository configurations.")
command.Flags().StringVar(
&config.backupRepoConfig,
"backup-repository-config",
config.backupRepoConfig,
"The name of configMap containing backup repository configurations.",
)
command.Flags().StringVar(
&config.repoMaintenanceJobConfig,
"repo-maintenance-job-config",
config.repoMaintenanceJobConfig,
"The name of ConfigMap containing repository maintenance Job configurations.",
)

// maintenance job log setting inherited from velero server
config.maintenanceCfg.FormatFlag = config.formatFlag
config.maintenanceCfg.LogLevelFlag = logLevelFlag
return command
}

@@ -667,7 +667,17 @@ func (s *server) initRepoManager() error {
s.repoLocker = repository.NewRepoLocker()
s.repoEnsurer = repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout)

s.repoManager = repository.NewManager(s.namespace, s.mgr.GetClient(), s.repoLocker, s.repoEnsurer, s.credentialFileStore, s.credentialSecretStore, s.config.maintenanceCfg, s.logger)
s.repoManager = repository.NewManager(
s.namespace,
s.mgr.GetClient(),
s.crClient,
s.repoLocker,
s.repoEnsurer,
s.credentialFileStore,
s.credentialSecretStore,
s.config.repoMaintenanceJobConfig,
s.logger,
)

return nil
}
@@ -881,7 +891,14 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
}

if _, ok := enabledRuntimeControllers[controller.BackupRepo]; ok {
if err := controller.NewBackupRepoReconciler(s.namespace, s.logger, s.mgr.GetClient(), s.config.repoMaintenanceFrequency, s.config.backukpRepoConfig, s.repoManager).SetupWithManager(s.mgr); err != nil {
if err := controller.NewBackupRepoReconciler(
s.namespace,
s.logger,
s.mgr.GetClient(),
s.config.repoMaintenanceFrequency,
s.config.backupRepoConfig,
s.repoManager,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.BackupRepo)
}
}
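The per-job maintenance flags removed above (--keep-latest-maintenance-jobs and the --maintenance-job-cpu/mem-request/limit family) are superseded by the ConfigMap named via --repo-maintenance-job-config, and backup repository settings likewise come from the ConfigMap named via --backup-repository-config. A hedged example of creating these ConfigMaps; the data keys and JSON fields are assumptions that mirror the removed flags, not a documented format.

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Illustrative only: the "global" key and field names are assumptions.
var repoMaintenanceJobCM = &corev1.ConfigMap{
	ObjectMeta: metav1.ObjectMeta{
		Namespace: "velero",
		Name:      "repo-maintenance-job-config", // passed to --repo-maintenance-job-config
	},
	Data: map[string]string{
		"global": `{"keepLatestMaintenanceJobs":3,"cpuRequest":"100m","memRequest":"128Mi","cpuLimit":"500m","memLimit":"512Mi"}`,
	},
}

// Illustrative only: keying by repository type and the cacheLimitMB field
// are assumptions; the committed key scheme may differ.
var backupRepositoryCM = &corev1.ConfigMap{
	ObjectMeta: metav1.ObjectMeta{
		Namespace: "velero",
		Name:      "backup-repository-config", // passed to --backup-repository-config
	},
	Data: map[string]string{
		"kopia": `{"cacheLimitMB":2048}`,
	},
}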
10 changes: 5 additions & 5 deletions pkg/controller/backup_repository_controller.go
@@ -55,19 +55,19 @@ type BackupRepoReconciler struct {
logger logrus.FieldLogger
clock clocks.WithTickerAndDelayedExecution
maintenanceFrequency time.Duration
backukpRepoConfig string
backupRepoConfig string
repositoryManager repository.Manager
}

func NewBackupRepoReconciler(namespace string, logger logrus.FieldLogger, client client.Client,
maintenanceFrequency time.Duration, backukpRepoConfig string, repositoryManager repository.Manager) *BackupRepoReconciler {
maintenanceFrequency time.Duration, backupRepoConfig string, repositoryManager repository.Manager) *BackupRepoReconciler {
c := &BackupRepoReconciler{
client,
namespace,
logger,
clocks.RealClock{},
maintenanceFrequency,
backukpRepoConfig,
backupRepoConfig,
repositoryManager,
}

@@ -229,7 +229,7 @@ func (r *BackupRepoReconciler) getIdentiferByBSL(ctx context.Context, req *veler
}

func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1api.BackupRepository, log logrus.FieldLogger) error {
log.WithField("repoConfig", r.backukpRepoConfig).Info("Initializing backup repository")
log.WithField("repoConfig", r.backupRepoConfig).Info("Initializing backup repository")

// confirm the repo's BackupStorageLocation is valid
repoIdentifier, err := r.getIdentiferByBSL(ctx, req)
@@ -244,7 +244,7 @@ func (r *BackupRepoReconciler) initializeRepo(ctx context.Context, req *velerov1
})
}

config, err := getBackupRepositoryConfig(ctx, r, r.backukpRepoConfig, r.namespace, req.Name, req.Spec.RepositoryType, log)
config, err := getBackupRepositoryConfig(ctx, r, r.backupRepoConfig, r.namespace, req.Name, req.Spec.RepositoryType, log)
if err != nil {
log.WithError(err).Warn("Failed to get repo config, repo config is ignored")
} else if config != nil {
23 changes: 17 additions & 6 deletions pkg/controller/data_upload_controller.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@@ -39,8 +40,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1"

"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
@@ -49,6 +48,7 @@ import (
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotypes "github.com/vmware-tanzu/velero/pkg/types"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)
@@ -71,15 +71,26 @@ type DataUploadReconciler struct {
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
loadAffinity *nodeagent.LoadAffinity
loadAffinity *velerotypes.LoadAffinity
backupPVCConfig map[string]nodeagent.BackupPVC
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface,
dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, clock clocks.WithTickerAndDelayedExecution,
nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler {
func NewDataUploadReconciler(
client client.Client,
mgr manager.Manager,
kubeClient kubernetes.Interface,
csiSnapshotClient snapshotter.SnapshotV1Interface,
dataPathMgr *datapath.Manager,
loadAffinity *velerotypes.LoadAffinity,
backupPVCConfig map[string]nodeagent.BackupPVC,
clock clocks.WithTickerAndDelayedExecution,
nodeName string,
preparingTimeout time.Duration,
log logrus.FieldLogger,
metrics *metrics.ServerMetrics,
) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
mgr: mgr,
32 changes: 30 additions & 2 deletions pkg/controller/data_upload_controller_test.go
@@ -231,8 +231,36 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj)
fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet)

return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{},
testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataUploadReconciler(
fakeClient,
nil,
fakeKubeClient,
fakeSnapshotClient.SnapshotV1(),
dataPathMgr,
nil,
map[string]nodeagent.BackupPVC{},
testclocks.NewFakeClock(now),
"test-node",
time.Minute*5,
velerotest.NewLogger(),
metrics.NewServerMetrics(),

/*
client client.Client
kubeClient kubernetes.Interface
csiSnapshotClient snapshotter.SnapshotV1Interface
mgr manager.Manager
Clock clocks.WithTickerAndDelayedExecution
nodeName string
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
loadAffinity *velerotypes.LoadAffinity
backupPVCConfig map[string]nodeagent.BackupPVC
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
*/
), nil
}

func dataUploadBuilder() *builder.DataUploadBuilder {
67 changes: 23 additions & 44 deletions pkg/exposer/csi_snapshot.go
@@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/pkg/nodeagent"
velerotypes "github.com/vmware-tanzu/velero/pkg/types"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
"github.com/vmware-tanzu/velero/pkg/util/csi"
"github.com/vmware-tanzu/velero/pkg/util/kube"
@@ -66,7 +67,7 @@ type CSISnapshotExposeParam struct {
VolumeSize resource.Quantity

// Affinity specifies the node affinity of the backup pod
Affinity *nodeagent.LoadAffinity
Affinity *velerotypes.LoadAffinity

// BackupPVCConfig is the config for backupPVC (intermediate PVC) of snapshot data movement
BackupPVCConfig map[string]nodeagent.BackupPVC
@@ -191,7 +192,14 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
}
}()

backupPod, err := e.createBackupPod(ctx, ownerObject, backupPVC, csiExposeParam.OperationTimeout, csiExposeParam.HostingPodLabels, csiExposeParam.Affinity)
backupPod, err := e.createBackupPod(
ctx,
ownerObject,
backupPVC,
csiExposeParam.OperationTimeout,
csiExposeParam.HostingPodLabels,
csiExposeParam.Affinity,
)
if err != nil {
return errors.Wrap(err, "error to create backup pod")
}
@@ -422,8 +430,14 @@ func (e *csiSnapshotExposer) createBackupPVC(ctx context.Context, ownerObject co
return created, err
}

func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject corev1.ObjectReference, backupPVC *corev1.PersistentVolumeClaim, operationTimeout time.Duration,
label map[string]string, affinity *nodeagent.LoadAffinity) (*corev1.Pod, error) {
func (e *csiSnapshotExposer) createBackupPod(
ctx context.Context,
ownerObject corev1.ObjectReference,
backupPVC *corev1.PersistentVolumeClaim,
operationTimeout time.Duration,
label map[string]string,
affinity *velerotypes.LoadAffinity,
) (*corev1.Pod, error) {
podName := ownerObject.Name

containerName := string(ownerObject.UID)
Expand Down Expand Up @@ -498,7 +512,11 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co
},
},
},
Affinity: toSystemAffinity(affinity),
Affinity: csi.ToSystemAffinity(
[]*velerotypes.LoadAffinity{
affinity,
},
),
Containers: []corev1.Container{
{
Name: containerName,
@@ -527,42 +545,3 @@ func (e *csiSnapshotExposer) createBackupPod(ctx context.Context, ownerObject co

return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}

func toSystemAffinity(loadAffinity *nodeagent.LoadAffinity) *corev1.Affinity {
if loadAffinity == nil {
return nil
}

requirements := []corev1.NodeSelectorRequirement{}
for k, v := range loadAffinity.NodeSelector.MatchLabels {
requirements = append(requirements, corev1.NodeSelectorRequirement{
Key: k,
Values: []string{v},
Operator: corev1.NodeSelectorOpIn,
})
}

for _, exp := range loadAffinity.NodeSelector.MatchExpressions {
requirements = append(requirements, corev1.NodeSelectorRequirement{
Key: exp.Key,
Values: exp.Values,
Operator: corev1.NodeSelectorOperator(exp.Operator),
})
}

if len(requirements) == 0 {
return nil
}

return &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: requirements,
},
},
},
},
}
}
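The package-local toSystemAffinity helper deleted above moves behind the shared csi.ToSystemAffinity, which accepts a slice of affinities (see the createBackupPod change earlier in this file). A minimal usage sketch, assuming the shared helper preserves the deleted function's semantics; the label keys and values are placeholders.

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	velerotypes "github.com/vmware-tanzu/velero/pkg/types"
	"github.com/vmware-tanzu/velero/pkg/util/csi"
)

// backupPodAffinityExample builds a pod affinity from one LoadAffinity entry,
// as createBackupPod now does. Field names follow the deleted helper above.
func backupPodAffinityExample() *corev1.Affinity {
	loadAffinity := &velerotypes.LoadAffinity{
		NodeSelector: metav1.LabelSelector{
			MatchLabels: map[string]string{"kubernetes.io/os": "linux"}, // placeholder label
			MatchExpressions: []metav1.LabelSelectorRequirement{
				{
					Key:      "topology.kubernetes.io/zone", // placeholder expression
					Operator: metav1.LabelSelectorOpIn,
					Values:   []string{"us-east-1a"},
				},
			},
		},
	}

	return csi.ToSystemAffinity([]*velerotypes.LoadAffinity{loadAffinity})
}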