Skip to content

Commit

Permalink
JobSidecarTerminator support ignore exit code capability via env
Browse files Browse the repository at this point in the history
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
  • Loading branch information
zmberg committed Mar 10, 2025
1 parent 29258d3 commit 2d1cda7
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 130 deletions.
3 changes: 3 additions & 0 deletions apis/apps/v1alpha1/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ const (
// using in-place update strategy to kill sidecar. This image must be given if you want to use in-place update
// strategy to terminate sidecar containers.
KruiseTerminateSidecarWithImageEnv = "KRUISE_TERMINATE_SIDECAR_WHEN_JOB_EXIT_WITH_IMAGE"

// KruiseIgnoreContainerExitCodeEnv is an env name, which represents a switch to ignore the exit code of sidecar container.
KruiseIgnoreContainerExitCodeEnv = "KRUISE_TERMINATE_SIDECAR_IGNORE_EXIT_CODE"
)
12 changes: 8 additions & 4 deletions pkg/controller/sidecarterminator/inplace_update_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"k8s.io/klog/v2"
)

func (r *ReconcileSidecarTerminator) executeInPlaceUpdateAction(originalPod *corev1.Pod, sidecars sets.String) error {
func (r *ReconcileSidecarTerminator) executeInPlaceUpdateAction(originalPod *corev1.Pod, sidecars sets.Set[string]) error {
uncompletedSidecars := filterUncompletedSidecars(originalPod, sidecars)
if uncompletedSidecars.Len() == 0 {
return nil
Expand Down Expand Up @@ -60,22 +60,26 @@ func (r *ReconcileSidecarTerminator) executeInPlaceUpdateAction(originalPod *cor
klog.V(3).InfoS("SidecarTerminator -- InPlace update pod successfully", "pod", klog.KObj(originalPod))

r.recorder.Eventf(originalPod, corev1.EventTypeNormal, "SidecarTerminator",
"Kruise SidecarTerminator is trying to terminate sidecar %v using in-place update", uncompletedSidecars.List())
"Kruise SidecarTerminator is trying to terminate sidecar %v using in-place update", uncompletedSidecars.UnsortedList())
}

return err
}

// updateSidecarsForInPlaceUpdate replace sidecar image with KruiseTerminateSidecarWithImageEnv
func updateSidecarsForInPlaceUpdate(pod *corev1.Pod, sidecars sets.String) (bool, *corev1.Pod) {
func updateSidecarsForInPlaceUpdate(pod *corev1.Pod, sidecars sets.Set[string]) (bool, *corev1.Pod) {
changed := false
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
if !sidecars.Has(container.Name) {
continue
}
newImage := getImageFromEnv(container)
if container.Image == newImage {
continue

Check warning on line 79 in pkg/controller/sidecarterminator/inplace_update_action.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/sidecarterminator/inplace_update_action.go#L79

Added line #L79 was not covered by tests
}
changed = true
container.Image = getImageFromEnv(container)
container.Image = newImage
}
return changed, pod
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/sidecarterminator/kill_container_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (r *ReconcileSidecarTerminator) executeKillContainerAction(pod *corev1.Pod, sidecars sets.String) error {
func (r *ReconcileSidecarTerminator) executeKillContainerAction(pod *corev1.Pod, sidecars sets.Set[string]) error {
uncompletedSidecars := filterUncompletedSidecars(pod, sidecars)
if uncompletedSidecars.Len() == 0 {
return nil
Expand All @@ -47,7 +47,7 @@ func (r *ReconcileSidecarTerminator) executeKillContainerAction(pod *corev1.Pod,
}

var sidecarContainers []appsv1alpha1.ContainerRecreateRequestContainer
for _, name := range uncompletedSidecars.List() {
for _, name := range uncompletedSidecars.UnsortedList() {
sidecarContainers = append(sidecarContainers, appsv1alpha1.ContainerRecreateRequestContainer{
Name: name,
})
Expand Down Expand Up @@ -78,14 +78,14 @@ func (r *ReconcileSidecarTerminator) executeKillContainerAction(pod *corev1.Pod,
klog.V(3).InfoS("SidecarTerminator -- Creating CRR successfully", "containerRecreateRequest", klog.KObj(crr))

r.recorder.Eventf(pod, corev1.EventTypeNormal, "SidecarTerminator",
"Kruise SidecarTerminator is trying to terminate sidecar %v using crr", uncompletedSidecars.List())
"Kruise SidecarTerminator is trying to terminate sidecar %v using crr", uncompletedSidecars.UnsortedList())
}

return err
}

func filterUncompletedSidecars(pod *corev1.Pod, sidecars sets.String) sets.String {
uncompletedSidecars := sets.NewString(sidecars.List()...)
func filterUncompletedSidecars(pod *corev1.Pod, sidecars sets.Set[string]) sets.Set[string] {
uncompletedSidecars := sets.New[string](sidecars.UnsortedList()...)
for i := range pod.Status.ContainerStatuses {
status := &pod.Status.ContainerStatuses[i]
if sidecars.Has(status.Name) && status.State.Terminated != nil {
Expand Down
53 changes: 15 additions & 38 deletions pkg/controller/sidecarterminator/sidecar_terminator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"flag"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -38,7 +37,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
Expand Down Expand Up @@ -136,22 +134,25 @@ func (r *ReconcileSidecarTerminator) doReconcile(pod *corev1.Pod) (reconcile.Res
if !isInterestingPod(pod) {
return reconcile.Result{}, nil
}

sidecarNeedToExecuteKillContainer, sidecarNeedToExecuteInPlaceUpdate, err := r.groupSidecars(pod)

vk, err := IsPodRunningOnVirtualKubelet(pod, r.Client)
if err != nil {
klog.ErrorS(err, "SidecarTerminator -- Error occurred when try to check if pod is running on virtual-kubelet", "pod", klog.KObj(pod))

Check warning on line 139 in pkg/controller/sidecarterminator/sidecar_terminator_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/sidecarterminator/sidecar_terminator_controller.go#L139

Added line #L139 was not covered by tests
return reconcile.Result{}, err
}

if err := r.executeInPlaceUpdateAction(pod, sidecarNeedToExecuteInPlaceUpdate); err != nil {
normalSidecarNames, ignoreExitCodeSidecarNames, inplaceUpdateSidecarNames := getSidecarContainerNames(pod, vk)
if err = r.executeInPlaceUpdateAction(pod, inplaceUpdateSidecarNames); err != nil {
return reconcile.Result{}, err
}

if err = r.executeKillContainerAction(pod, normalSidecarNames); err != nil {
return reconcile.Result{}, err
}

Check warning on line 148 in pkg/controller/sidecarterminator/sidecar_terminator_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/sidecarterminator/sidecar_terminator_controller.go#L147-L148

Added lines #L147 - L148 were not covered by tests
if ignoreExitCodeSidecarNames.Len() == 0 || !containersCompleted(pod, normalSidecarNames) || !containersCompleted(pod, inplaceUpdateSidecarNames) {
return reconcile.Result{}, nil
}
if err := r.markJobPodTerminated(pod); err != nil {
return reconcile.Result{}, err
}

if err := r.executeKillContainerAction(pod, sidecarNeedToExecuteKillContainer); err != nil {
if err := r.executeKillContainerAction(pod, ignoreExitCodeSidecarNames); err != nil {
return reconcile.Result{}, err
}

Expand Down Expand Up @@ -184,8 +185,9 @@ func (r *ReconcileSidecarTerminator) markJobPodTerminated(pod *corev1.Pod) error
},
}

mainContainers, _ := groupMainSidecarContainers(pod)
// patch pod phase
if containersSucceeded(pod, getMain(pod)) {
if containersSucceeded(pod, mainContainers) {
status.Phase = corev1.PodSucceeded
} else {
status.Phase = corev1.PodFailed
Expand All @@ -203,32 +205,7 @@ func (r *ReconcileSidecarTerminator) markJobPodTerminated(pod *corev1.Pod) error
return nil
}

func (r *ReconcileSidecarTerminator) groupSidecars(pod *corev1.Pod) (sets.String, sets.String, error) {
runningOnVK, err := IsPodRunningOnVirtualKubelet(pod, r.Client)
if err != nil {
return nil, nil, client.IgnoreNotFound(err)
}

inPlaceUpdate := sets.NewString()
killContainer := sets.NewString()
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
for j := range container.Env {
if !runningOnVK && container.Env[j].Name == appsv1alpha1.KruiseTerminateSidecarEnv &&
strings.EqualFold(container.Env[j].Value, "true") {
killContainer.Insert(container.Name)
break
}
if container.Env[j].Name == appsv1alpha1.KruiseTerminateSidecarWithImageEnv &&
container.Env[j].Value != "" {
inPlaceUpdate.Insert(container.Name)
}
}
}
return killContainer, inPlaceUpdate, nil
}

func containersCompleted(pod *corev1.Pod, containers sets.String) bool {
func containersCompleted(pod *corev1.Pod, containers sets.Set[string]) bool {
if len(pod.Spec.Containers) != len(pod.Status.ContainerStatuses) {
return false
}
Expand All @@ -242,7 +219,7 @@ func containersCompleted(pod *corev1.Pod, containers sets.String) bool {
return true
}

func containersSucceeded(pod *corev1.Pod, containers sets.String) bool {
func containersSucceeded(pod *corev1.Pod, containers sets.Set[string]) bool {
if len(pod.Spec.Containers) != len(pod.Status.ContainerStatuses) {
return false
}
Expand Down
125 changes: 98 additions & 27 deletions pkg/controller/sidecarterminator/sidecar_terminator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (
"context"
"encoding/json"
"reflect"
"sort"
"testing"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

const (
Expand Down Expand Up @@ -250,24 +249,6 @@ func TestKruiseDaemonStrategy(t *testing.T) {
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodFailed
return pod
},
},
{
name: "normal pod with sidecar, restartPolicy=Never, main containers failed and sidecar running",
getIn: func() *corev1.Pod {
podIn := podDemo.DeepCopy()
podIn.Status.ContainerStatuses[0] = failedMainContainerStatus
podIn.Status.ContainerStatuses[1] = runningSidecarContainerStatus
return podIn
},
getCRR: func() *appsv1alpha1.ContainerRecreateRequest {
return crrDemo.DeepCopy()
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodFailed
return pod
},
},
Expand Down Expand Up @@ -302,7 +283,6 @@ func TestKruiseDaemonStrategy(t *testing.T) {
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodSucceeded
return pod
},
},
Expand Down Expand Up @@ -387,7 +367,6 @@ func TestKruiseDaemonStrategy(t *testing.T) {
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodSucceeded
return pod
},
},
Expand Down Expand Up @@ -419,7 +398,6 @@ func TestKruiseDaemonStrategy(t *testing.T) {
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodSucceeded
return pod
},
},
Expand Down Expand Up @@ -451,7 +429,6 @@ func TestKruiseDaemonStrategy(t *testing.T) {
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodSucceeded
return pod
},
},
Expand Down Expand Up @@ -482,6 +459,82 @@ func TestKruiseDaemonStrategy(t *testing.T) {
return pod
},
},
{
name: "normal pod with sidecar, restartPolicy=OnFailure, 2 succeeded main containers, 3 sidecars uncompleted",
getIn: func() *corev1.Pod {
podIn := podDemo.DeepCopy()
podIn.Spec.Containers = []corev1.Container{
mainContainerFactory("main-1"),
mainContainerFactory("main-2"),
sidecarContainerFactory("sidecar-1", "true"),
sidecarContainerFactory("sidecar-2", "true"),
sidecarContainerFactory("sidecar-3", "true"),
}
podIn.Spec.Containers[4].Env = append(podIn.Spec.Containers[4].Env, corev1.EnvVar{
Name: appsv1alpha1.KruiseIgnoreContainerExitCodeEnv,
Value: "true",
})
podIn.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
podIn.Status.ContainerStatuses = []corev1.ContainerStatus{
rename(succeededMainContainerStatus.DeepCopy(), "main-1"),
rename(succeededMainContainerStatus.DeepCopy(), "main-2"),
rename(uncompletedSidecarContainerStatus.DeepCopy(), "sidecar-1"),
rename(uncompletedSidecarContainerStatus.DeepCopy(), "sidecar-2"),
rename(uncompletedSidecarContainerStatus.DeepCopy(), "sidecar-3"),
}
return podIn
},
getCRR: func() *appsv1alpha1.ContainerRecreateRequest {
crr := crrDemo.DeepCopy()
crr.Spec.Containers = []appsv1alpha1.ContainerRecreateRequestContainer{
{Name: "sidecar-2"},
{Name: "sidecar-1"},
}
return crr
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
return pod
},
},
{
name: "normal pod with sidecar, restartPolicy=OnFailure, 2 succeeded main containers, 2 sidecars completed, 1 uncompleted",
getIn: func() *corev1.Pod {
podIn := podDemo.DeepCopy()
podIn.Spec.Containers = []corev1.Container{
mainContainerFactory("main-1"),
mainContainerFactory("main-2"),
sidecarContainerFactory("sidecar-1", "true"),
sidecarContainerFactory("sidecar-2", "true"),
sidecarContainerFactory("sidecar-3", "true"),
}
podIn.Spec.Containers[4].Env = append(podIn.Spec.Containers[4].Env, corev1.EnvVar{
Name: appsv1alpha1.KruiseIgnoreContainerExitCodeEnv,
Value: "true",
})
podIn.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
podIn.Status.ContainerStatuses = []corev1.ContainerStatus{
rename(succeededMainContainerStatus.DeepCopy(), "main-1"),
rename(succeededMainContainerStatus.DeepCopy(), "main-2"),
rename(completedSidecarContainerStatus.DeepCopy(), "sidecar-1"),
rename(completedSidecarContainerStatus.DeepCopy(), "sidecar-2"),
rename(uncompletedSidecarContainerStatus.DeepCopy(), "sidecar-3"),
}
return podIn
},
getCRR: func() *appsv1alpha1.ContainerRecreateRequest {
crr := crrDemo.DeepCopy()
crr.Spec.Containers = []appsv1alpha1.ContainerRecreateRequestContainer{
{Name: "sidecar-3"},
}
return crr
},
expectedPod: func() *corev1.Pod {
pod := podDemo.DeepCopy()
pod.Status.Phase = corev1.PodSucceeded
return pod
},
},
}

for _, cs := range cases {
Expand Down Expand Up @@ -514,9 +567,14 @@ func TestKruiseDaemonStrategy(t *testing.T) {
realCRR.TypeMeta.APIVersion = appsv1alpha1.SchemeGroupVersion.String()
realCRR.TypeMeta.Kind = "ContainerRecreateRequest"

if realCRR != nil {
sort.Sort(SortContainers(realCRR.Spec.Containers))
}
if expectCRR != nil {
sort.Sort(SortContainers(expectCRR.Spec.Containers))
}
realBy, _ := json.Marshal(realCRR)
expectBy, _ := json.Marshal(expectCRR)

if !(expectCRR == nil && errors.IsNotFound(err) || reflect.DeepEqual(realBy, expectBy)) {
t.Fatal("Get unexpected CRR")
}
Expand Down Expand Up @@ -559,6 +617,7 @@ func TestInPlaceUpdateStrategy(t *testing.T) {
podIn.Spec.Containers[1].Env = []corev1.EnvVar{
{Name: appsv1alpha1.KruiseTerminateSidecarWithImageEnv, Value: ExitQuicklyImage},
}
podIn.Spec.NodeName = vkNode.Name
return podIn
},
expectedNumber: 1,
Expand Down Expand Up @@ -735,3 +794,15 @@ func rename(status *corev1.ContainerStatus, name string) corev1.ContainerStatus
status.Name = name
return *status
}

type SortContainers []appsv1alpha1.ContainerRecreateRequestContainer

func (s SortContainers) Len() int {
return len(s)
}
func (s SortContainers) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s SortContainers) Less(i, j int) bool {
return s[i].Name < s[j].Name
}
Loading

0 comments on commit 2d1cda7

Please sign in to comment.