diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index a5f64137c6..7e1e8eebab 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -256,6 +256,14 @@ rules: - patch - update - watch +- apiGroups: + - apps.kruise.io + resources: + - ephemeraljobs/finalizers + verbs: + - get + - patch + - update - apiGroups: - apps.kruise.io resources: diff --git a/pkg/controller/ephemeraljob/econtainer/api.go b/pkg/controller/ephemeraljob/econtainer/api.go index 838681f975..e7d84abdd8 100644 --- a/pkg/controller/ephemeraljob/econtainer/api.go +++ b/pkg/controller/ephemeraljob/econtainer/api.go @@ -1,21 +1,28 @@ package econtainer import ( + "time" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" v1 "k8s.io/api/core/v1" ) type EphemeralContainerInterface interface { - // GetEphemeralContainers will return all ephemeral container status. + // GetEphemeralContainersStatus will return all ephemeral container status. // Maybe they are not created by current ephemeral jobs. GetEphemeralContainersStatus(target *v1.Pod) []v1.ContainerStatus // GetEphemeralContainers return all ephemeral containers which have been created in target pods. // Maybe they are not created by current ephemeral jobs. GetEphemeralContainers(target *v1.Pod) []v1.EphemeralContainer + // ContainsEphemeralContainer reports whether the target pod contains (1st return value) and owns (2nd return value) + // an ephemeral container with the same name as one the ephemeralJob wants to inject. + // An ephemeral container is owned by an ephemeralJob when its KRUISE_EJOB_ID env + // equals this ephemeralJob's uid. 
+ ContainsEphemeralContainer(target *v1.Pod) (bool, bool) UpdateEphemeralContainer(target *v1.Pod) error CreateEphemeralContainer(target *v1.Pod) error - RemoveEphemeralContainer(target *v1.Pod) error + RemoveEphemeralContainer(target *v1.Pod) (*time.Duration, error) } func New(job *appsv1alpha1.EphemeralJob) EphemeralContainerInterface { diff --git a/pkg/controller/ephemeraljob/econtainer/econtainer_k8s.go b/pkg/controller/ephemeraljob/econtainer/econtainer_k8s.go index 302ede9be0..41b28a88a4 100644 --- a/pkg/controller/ephemeraljob/econtainer/econtainer_k8s.go +++ b/pkg/controller/ephemeraljob/econtainer/econtainer_k8s.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kubeclient "github.com/openkruise/kruise/pkg/client" @@ -211,9 +212,9 @@ func (k *k8sControl) createEphemeralContainerLegacy(targetPod *v1.Pod, eContaine } // RemoveEphemeralContainer is not support before kubernetes v1.23 -func (k *k8sControl) RemoveEphemeralContainer(target *v1.Pod) error { +func (k *k8sControl) RemoveEphemeralContainer(target *v1.Pod) (*time.Duration, error) { klog.Warning("RemoveEphemeralContainer is not support before kubernetes v1.23") - return nil + return nil, nil } // UpdateEphemeralContainer is not support before kubernetes v1.23 @@ -221,3 +222,22 @@ func (k *k8sControl) UpdateEphemeralContainer(target *v1.Pod) error { klog.Warning("UpdateEphemeralContainer is not support before kubernetes v1.23") return nil } + +func (k *k8sControl) ContainsEphemeralContainer(target *v1.Pod) (exists, owned bool) { + ephemeralContainersMaps, _ := getEphemeralContainersMaps(k.GetEphemeralContainers(target)) + for _, e := range k.Spec.Template.EphemeralContainers { + if targetEC, ok := ephemeralContainersMaps[e.Name]; ok { + return true, isCreatedByEJob(string(k.UID), targetEC) + } + } + return false, false +} + +func isCreatedByEJob(jobUID string, container v1.EphemeralContainer) bool { + for _, env := range 
container.Env { + if env.Name == appsv1alpha1.EphemeralContainerEnvKey && env.Value == jobUID { + return true + } + } + return false +} diff --git a/pkg/controller/ephemeraljob/ephemeraljob_controller.go b/pkg/controller/ephemeraljob/ephemeraljob_controller.go index 029f32fa6c..8feb4a5dee 100644 --- a/pkg/controller/ephemeraljob/ephemeraljob_controller.go +++ b/pkg/controller/ephemeraljob/ephemeraljob_controller.go @@ -34,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" kubecontroller "k8s.io/kubernetes/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/client" @@ -65,8 +66,9 @@ func Add(mgr manager.Manager) error { // newReconciler returns a new reconcile.Reconciler func newReconciler(mgr manager.Manager) *ReconcileEphemeralJob { return &ReconcileEphemeralJob{ - Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"), - scheme: mgr.GetScheme(), + Client: utilclient.NewClientFromManager(mgr, "ephemeraljob-controller"), + scheme: mgr.GetScheme(), + recorder: mgr.GetEventRecorderFor("ephemeraljob-controller"), } } @@ -98,11 +100,13 @@ var _ reconcile.Reconciler = &ReconcileEphemeralJob{} // ReconcileEphemeralJob reconciles a ImagePullJob object type ReconcileEphemeralJob struct { client.Client - scheme *runtime.Scheme + scheme *runtime.Scheme + recorder record.EventRecorder } // +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs,verbs=get;list;watch;update;patch;delete // +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps.kruise.io,resources=ephemeraljobs/finalizers,verbs=get;update;patch // +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods/ephemeralcontainers,verbs=get;update;patch @@ -123,7 +127,6 @@ func (r *ReconcileEphemeralJob) 
Reconcile(context context.Context, request recon job := &appsv1alpha1.EphemeralJob{} err = r.Get(context, request.NamespacedName, job) - if err != nil { if errors.IsNotFound(err) { // Object not found, return. Created objects are automatically garbage collected. @@ -137,9 +140,14 @@ func (r *ReconcileEphemeralJob) Reconcile(context context.Context, request recon } if job.DeletionTimestamp != nil { - if err := r.removeEphemeralContainers(job); err != nil { + retryAfter, err := r.removeEphemeralContainers(job) + if err != nil { return reconcile.Result{}, err } + if retryAfter != nil { + return reconcile.Result{RequeueAfter: *retryAfter}, nil + } + job.Finalizers = deleteEphemeralContainerFinalizer(job.Finalizers, EphemeralContainerFinalizer) return reconcile.Result{}, r.Update(context, job) } @@ -281,6 +289,7 @@ func (r *ReconcileEphemeralJob) filterInjectedPods(job *appsv1alpha1.EphemeralJo return nil, err } + control := econtainer.New(job) // Ignore inactive pods var targetPods []*v1.Pod for i := range podList.Items { @@ -288,7 +297,7 @@ func (r *ReconcileEphemeralJob) filterInjectedPods(job *appsv1alpha1.EphemeralJo if !kubecontroller.IsPodActive(pod) { continue } - if exists, owned := existEphemeralContainer(job, pod); exists { + if exists, owned := control.ContainsEphemeralContainer(pod); exists { if owned { targetPods = append(targetPods, pod) } else { @@ -335,7 +344,7 @@ func (r *ReconcileEphemeralJob) syncTargetPods(job *appsv1alpha1.EphemeralJob, t _, err := clonesetutils.DoItSlowly(len(toCreatePods), kubecontroller.SlowStartInitialBatchSize, func() error { pod := <-podsCreationChan - if exists, _ := existEphemeralContainer(job, pod); exists { + if exists, _ := control.ContainsEphemeralContainer(pod); exists { return nil } @@ -348,11 +357,14 @@ func (r *ReconcileEphemeralJob) syncTargetPods(job *appsv1alpha1.EphemeralJob, t for _, podEphemeralContainerName := range getPodEphemeralContainers(pod, job) { scaleExpectations.ObserveScale(key, expectations.Create, 
podEphemeralContainerName) } - return err + return fmt.Errorf("failed to create ephemeral container in pod %s/%s: %w", pod.Namespace, pod.Name, err) } return nil }) + if err != nil { + r.recorder.Event(job, v1.EventTypeWarning, "CreateFailed", err.Error()) + } return err } @@ -433,19 +445,22 @@ func (r *ReconcileEphemeralJob) updateJobStatus(job *appsv1alpha1.EphemeralJob) return r.Status().Update(context.TODO(), job) } -func (r *ReconcileEphemeralJob) removeEphemeralContainers(job *appsv1alpha1.EphemeralJob) error { +func (r *ReconcileEphemeralJob) removeEphemeralContainers(job *appsv1alpha1.EphemeralJob) (*time.Duration, error) { targetPods, err := r.filterInjectedPods(job) if err != nil { klog.Errorf("Failed to get ephemeral job %s/%s related target pods: %v", job.Namespace, job.Name, err) - return err + return nil, err } - var errors error + control := econtainer.New(job) + var retryAfter *time.Duration for _, pod := range targetPods { - if e := econtainer.New(job).RemoveEphemeralContainer(pod); e != nil { - errors = e + if duration, removeErr := control.RemoveEphemeralContainer(pod); removeErr != nil { + err = fmt.Errorf("failed to remove ephemeral containers for pod %s/%s: %w", pod.Namespace, pod.Name, removeErr) + r.recorder.Event(job, v1.EventTypeWarning, "RemoveFailed", err.Error()) + } else if duration != nil && (retryAfter == nil || *retryAfter > *duration) { + retryAfter = duration + } } - - return errors + return retryAfter, err } diff --git a/pkg/controller/ephemeraljob/ephemeraljob_utils.go b/pkg/controller/ephemeraljob/ephemeraljob_utils.go index 43b111bc46..988b60b3a5 100644 --- a/pkg/controller/ephemeraljob/ephemeraljob_utils.go +++ b/pkg/controller/ephemeraljob/ephemeraljob_utils.go @@ -92,17 +92,6 @@ func getPodEphemeralContainers(pod *v1.Pod, ejob *appsv1alpha1.EphemeralJob) []s return podEphemeralNames } -func existEphemeralContainer(job *appsv1alpha1.EphemeralJob, targetPod *v1.Pod) (exists, owned bool) { - 
ephemeralContainersMaps, _ := getEphemeralContainersMaps(econtainer.New(job).GetEphemeralContainers(targetPod)) - for _, e := range job.Spec.Template.EphemeralContainers { - if targetEC, ok := ephemeralContainersMaps[e.Name]; ok { - return true, isCreatedByEJob(string(job.UID), targetEC) - } - } - - return false, false -} - func existDuplicatedEphemeralContainer(job *appsv1alpha1.EphemeralJob, targetPod *v1.Pod) bool { ephemeralContainersMaps, _ := getEphemeralContainersMaps(econtainer.New(job).GetEphemeralContainers(targetPod)) for _, e := range job.Spec.Template.EphemeralContainers {