Skip to content

Commit

Permalink
Add jitter window
Browse files Browse the repository at this point in the history
Before this PR, if the scheduled time was every hour and a Cleaner
instance was reconciled at 1:00:01, that Cleaner instance was
not processed, because reconciliation happened a second later than
the scheduled time.

If multiple Cleaner instances are all scheduled to be processed at
the same time, this was causing problems, with some Cleaner instances
never being processed.

This PR introduces a window around the scheduled time. By default this
window is set to 15 seconds (though it is configurable using the arg `jitter`).
If reconciliation happens within the jitter window, a Cleaner instance is still
processed.

To test this PR I created 8 Cleaner instances all scheduled at the same time:

```bash
kubectl get cleaner -A
NAME                            AGE
completed-jobs                  6m47s
completed-pods                  6m47s
stale-persistent-volume-claim   6m47s
unbound-peristent-volumes       6m47s
unused-configmaps               6m47s
unused-roles                    6m47s
unused-secrets                  6m47s
unused-service-accounts         6m46s
```

A Report instance was created for each one of them:

```bash
kubectl get report -A
NAME                            AGE
completed-jobs                  4m57s
completed-pods                  4m58s
stale-persistent-volume-claim   4m58s
unbound-peristent-volumes       4m58s
unused-configmaps               4m57s
unused-roles                    4m57s
unused-secrets                  4m58s
unused-service-accounts         4m58s
```
  • Loading branch information
mgianluc committed Jun 12, 2024
1 parent c161a9a commit d5eaeb7
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
32 changes: 19 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ import (
)

var (
setupLog = ctrl.Log.WithName("setup")
metricsAddr string
probeAddr string
workers int
restConfigQPS float32
restConfigBurst int
webhookPort int
concurrentReconciles int
syncPeriod time.Duration
setupLog = ctrl.Log.WithName("setup")
metricsAddr string
probeAddr string
workers int
restConfigQPS float32
restConfigBurst int
webhookPort int
concurrentReconciles int
syncPeriod time.Duration
jitterWindowInSeconds int
)

func main() {
Expand Down Expand Up @@ -105,9 +106,10 @@ func main() {
}

if err = (&controller.CleanerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConcurrentReconciles: concurrentReconciles,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConcurrentReconciles: concurrentReconciles,
JitterWindowInSeconds: jitterWindowInSeconds,
}).SetupWithManager(ctx, mgr, workers, ctrl.Log.WithName("worker")); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cleaner")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +141,11 @@ func initFlags(fs *pflag.FlagSet) {

const defaultWorkers = 5
fs.IntVar(&workers, "worker-number", defaultWorkers,
"Number of worker. Workers are used to process cleaner instances in backgroun")
"Number of worker. Workers are used to process cleaner instances in background")

const defaultJitterWindow = 15
fs.IntVar(&jitterWindowInSeconds, "jitter", defaultJitterWindow,
"The predefined time interval around a scheduled execution time.")

const defaultReconcilers = 10
fs.IntVar(&concurrentReconciles, "concurrent-reconciles", defaultReconcilers,
Expand Down
18 changes: 11 additions & 7 deletions internal/controller/cleaner_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ import (
// CleanerReconciler reconciles a Cleaner object
type CleanerReconciler struct {
client.Client
Scheme *runtime.Scheme
ConcurrentReconciles int
Scheme *runtime.Scheme
ConcurrentReconciles int
JitterWindowInSeconds int
}

//+kubebuilder:rbac:groups=apps.projectsveltos.io,resources=cleaners,verbs=get;list;watch;patch
Expand Down Expand Up @@ -151,7 +152,7 @@ func (r *CleanerReconciler) reconcileNormal(ctx context.Context, cleanerScope *s
}

now := time.Now()
nextRun, err := schedule(ctx, cleanerScope, logger)
nextRun, err := schedule(ctx, cleanerScope, r.JitterWindowInSeconds, logger)
if err != nil {
logger.Info("failed to get next run. Err: %v", err)
msg := err.Error()
Expand Down Expand Up @@ -216,7 +217,9 @@ func (r *CleanerReconciler) removeReport(ctx context.Context,
return fmt.Errorf("report instance still present")
}

func schedule(ctx context.Context, cleanerScope *scope.CleanerScope, logger logr.Logger) (*time.Time, error) {
func schedule(ctx context.Context, cleanerScope *scope.CleanerScope, jitterWindowInSeconds int,
logger logr.Logger) (*time.Time, error) {

newLastRunTime := cleanerScope.Cleaner.Status.LastRunTime

now := time.Now()
Expand All @@ -231,7 +234,7 @@ func schedule(ctx context.Context, cleanerScope *scope.CleanerScope, logger logr
logger.Info("set NextScheduleTime")
newNextScheduleTime = &metav1.Time{Time: *nextRun}
} else {
if shouldSchedule(cleanerScope.Cleaner, logger) {
if shouldSchedule(cleanerScope.Cleaner, jitterWindowInSeconds, logger) {
logger.Info("queuing job")
executorClient := executor.GetClient()
executorClient.Process(ctx, cleanerScope.Cleaner.Name)
Expand Down Expand Up @@ -285,8 +288,9 @@ func getNextScheduleTime(cleaner *appsv1alpha1.Cleaner, now time.Time) (*time.Ti
return &next, nil
}

func shouldSchedule(cleaner *appsv1alpha1.Cleaner, logger logr.Logger) bool {
now := time.Now()
func shouldSchedule(cleaner *appsv1alpha1.Cleaner, jitterWindowInSeconds int, logger logr.Logger) bool {
// if reconciliation is happening within jitterWindowInSeconds from scheduled time still process it
now := time.Now().Add(time.Duration(jitterWindowInSeconds) * time.Second)
logger.Info(fmt.Sprintf("currently next schedule is %s", cleaner.Status.NextScheduleTime.Time))

if now.Before(cleaner.Status.NextScheduleTime.Time) {
Expand Down

0 comments on commit d5eaeb7

Please sign in to comment.