Skip to content

Commit

Permalink
Add feature to force a Pod sync to speed up Secret change propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
mtneug committed Dec 16, 2024
1 parent 6755b59 commit 9fe30bb
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 1 deletion.
6 changes: 6 additions & 0 deletions api/v1alpha1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ const (
JobProtectionFinalizer = "engine.nagare.media/job-protection"
)

const (
	// LastConfigChangePodAnnotation is the Pod annotation key that records when the configuration last changed.
	// A missing annotation may simply mean that no configuration change has happened yet. It exists mainly to
	// force a Pod sync, so users should not rely on it.
	LastConfigChangePodAnnotation = "engine.nagare.media/last-config-change"
)

// Specification of a Task.
type TaskSpec struct {
// Reference to a TaskTemplate or ClusterTaskTemplate. Only references to these two kinds are allowed. A Task can only
Expand Down
10 changes: 10 additions & 0 deletions config/rbac/role-workflow-manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ rules:
verbs:
- create
- patch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- batch
resources:
Expand Down
54 changes: 53 additions & 1 deletion internal/workflow-manager/controllers/task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type TaskReconciler struct {
}

// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -801,6 +802,8 @@ func (r *TaskReconciler) ensureJobServiceExists(ctx context.Context, task *engin
}

func (r *TaskReconciler) ensureWorkflowManagerHelperDataSecretExists(ctx context.Context, task *enginev1.Task) (Result, error) {
log := logf.FromContext(ctx)

// get MPE client for this task
c, err := r.getMPEClientForTask(task)
if err != nil {
Expand Down Expand Up @@ -912,7 +915,14 @@ func (r *TaskReconciler) ensureWorkflowManagerHelperDataSecretExists(ctx context
}

if exists {
_, err := utils.Patch(ctx, c, secret, secretOld)
// Assign to the function-level err instead of declaring a shadowed copy with ":=", so that a failed patch is not
// lost when this branch ends. The Create branch below assigns the same variable.
var changed bool
changed, err = utils.Patch(ctx, c, secret, secretOld)
if changed {
	// It takes a sync loop interval to propagate Secret changes to Pods. This can be faster if we update a Pod
	// annotation and thereby force a reconciliation.
	if syncErr := r.tryForcedJobPodsSync(ctx, task); syncErr != nil {
		// Best effort only: the Secret itself was patched, so log the sync failure and carry on.
		log.Error(syncErr, "failed to force a Pod sync to speedup Secret propagation")
	}
}
} else {
err = c.Create(ctx, secret)
}
Expand Down Expand Up @@ -1034,6 +1044,48 @@ func (r *TaskReconciler) setCommonLabels(labels map[string]string, task *enginev
labels[enginev1.TaskNameLabel] = task.Name
}

// tryForcedJobPodsSync sets the LastConfigChangePodAnnotation annotation to the current UTC time (RFC 3339) on
// every running Pod selected by the Job of the given task. Callers use this to trigger a Pod sync so that Secret
// changes propagate faster.
//
// Errors from resolving the Job, obtaining the MPE client, building the selector or listing the Pods are returned
// immediately. Patch errors are collected per Pod, so one failing Pod does not prevent the remaining Pods from
// being patched; they are returned as a single aggregate, which is nil when every patch succeeded.
func (r *TaskReconciler) tryForcedJobPodsSync(ctx context.Context, task *enginev1.Task) error {
	job, err := r.resolveJobRef(ctx, task)
	if err != nil {
		return err
	}

	c, err := r.getMPEClientForTask(task)
	if err != nil {
		return err
	}

	sel, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
	if err != nil {
		return err
	}

	// Pods created by a Job always live in the Job's namespace, so restrict the list to it instead of matching the
	// selector against every namespace.
	pods := &corev1.PodList{}
	if err := c.List(ctx, pods, client.InNamespace(job.Namespace), client.MatchingLabelsSelector{Selector: sel}); err != nil {
		return err
	}

	// One timestamp for all Pods: they are all reacting to the same config change.
	stamp := time.Now().UTC().Format(time.RFC3339)

	var errs []error

	// Iterate by index to avoid copying each Pod and to patch the list element itself rather than a loop variable.
	for i := range pods.Items {
		pod := &pods.Items[i]
		if pod.Status.Phase != corev1.PodRunning {
			continue
		}

		oldPod := pod.DeepCopy()
		if pod.Annotations == nil {
			pod.Annotations = make(map[string]string)
		}
		pod.Annotations[enginev1.LastConfigChangePodAnnotation] = stamp

		if _, err := utils.Patch(ctx, c, pod, oldPod); err != nil {
			errs = append(errs, err)
		}
	}

	return kerrors.NewAggregate(errs)
}

func (r *TaskReconciler) resolveWorkflowRef(ctx context.Context, task *enginev1.Task) (*enginev1.Workflow, error) {
wf := &enginev1.Workflow{}
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Spec.WorkflowRef.Name}, wf); err != nil {
Expand Down

0 comments on commit 9fe30bb

Please sign in to comment.