From 9fe30bb3bc0520ca4b525d728eddf7ec3af7e952 Mon Sep 17 00:00:00 2001 From: Matthias Neugebauer Date: Mon, 16 Dec 2024 16:48:08 +0100 Subject: [PATCH] Add feature to force a Pod sync to speed up Secret change propagation --- api/v1alpha1/task_types.go | 6 +++ config/rbac/role-workflow-manager.yaml | 10 ++++ .../controllers/task_controller.go | 54 ++++++++++++++++++- 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/task_types.go b/api/v1alpha1/task_types.go index c1eec29..90f9839 100644 --- a/api/v1alpha1/task_types.go +++ b/api/v1alpha1/task_types.go @@ -41,6 +41,12 @@ const ( JobProtectionFinalizer = "engine.nagare.media/job-protection" ) +const ( + // Annotation that indicates the last config change. If this annotation does not exist, there may have been no config + // changes. Users should not rely on this annotation. This is mainly set to force a Pod sync. + LastConfigChangePodAnnotation = "engine.nagare.media/last-config-change" +) + // Specification of a Task. type TaskSpec struct { // Reference to a TaskTemplate or ClusterTaskTemplate. Only references to these two kinds are allowed. 
A Task can only diff --git a/config/rbac/role-workflow-manager.yaml b/config/rbac/role-workflow-manager.yaml index ef6b7da..6d9812d 100644 --- a/config/rbac/role-workflow-manager.yaml +++ b/config/rbac/role-workflow-manager.yaml @@ -25,6 +25,16 @@ rules: verbs: - create - patch +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - patch + - update + - watch - apiGroups: - batch resources: diff --git a/internal/workflow-manager/controllers/task_controller.go b/internal/workflow-manager/controllers/task_controller.go index 98d007f..f9b457a 100644 --- a/internal/workflow-manager/controllers/task_controller.go +++ b/internal/workflow-manager/controllers/task_controller.go @@ -89,6 +89,7 @@ type TaskReconciler struct { } // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get +// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete @@ -801,6 +802,8 @@ func (r *TaskReconciler) ensureJobServiceExists(ctx context.Context, task *engin } func (r *TaskReconciler) ensureWorkflowManagerHelperDataSecretExists(ctx context.Context, task *enginev1.Task) (Result, error) { + log := logf.FromContext(ctx) + // get MPE client for this task c, err := r.getMPEClientForTask(task) if err != nil { @@ -912,7 +915,14 @@ func (r *TaskReconciler) ensureWorkflowManagerHelperDataSecretExists(ctx context } if exists { - _, err := utils.Patch(ctx, c, secret, secretOld) + changed, err := utils.Patch(ctx, c, secret, secretOld) + if changed { + // It takes a sync loop interval to propagate Secret changes to Pods. This can be faster if we update the Pod + // container and force a reconciliation. 
+ if err2 := r.tryForcedJobPodsSync(ctx, task); err2 != nil { + log.Error(err2, "failed to force a Pod sync to speedup Secret propagation") + } + } } else { err = c.Create(ctx, secret) } @@ -1034,6 +1044,48 @@ func (r *TaskReconciler) setCommonLabels(labels map[string]string, task *enginev labels[enginev1.TaskNameLabel] = task.Name } +func (r *TaskReconciler) tryForcedJobPodsSync(ctx context.Context, task *enginev1.Task) error { + job, err := r.resolveJobRef(ctx, task) + if err != nil { + return err + } + + c, err := r.getMPEClientForTask(task) + if err != nil { + return err + } + + sel, err := metav1.LabelSelectorAsSelector(job.Spec.Selector) + if err != nil { + return err + } + + pods := &corev1.PodList{} + err = c.List(ctx, pods, client.InNamespace(job.Namespace), client.MatchingLabelsSelector{Selector: sel}) + if err != nil { + return err + } + + var errs []error + + for _, pod := range pods.Items { + if pod.Status.Phase == corev1.PodRunning { + oldPod := pod.DeepCopy() + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[enginev1.LastConfigChangePodAnnotation] = time.Now().UTC().Format(time.RFC3339) + + _, err = utils.Patch(ctx, c, &pod, oldPod) + if err != nil { + errs = append(errs, err) + } + } + } + + return kerrors.NewAggregate(errs) +} + func (r *TaskReconciler) resolveWorkflowRef(ctx context.Context, task *enginev1.Task) (*enginev1.Workflow, error) { wf := &enginev1.Workflow{} if err := r.Client.Get(ctx, client.ObjectKey{Namespace: task.Namespace, Name: task.Spec.WorkflowRef.Name}, wf); err != nil {