diff --git a/.github/workflows/e2e-1.18.yaml b/.github/workflows/e2e-1.18.yaml index b691404f19..66f11ae918 100644 --- a/.github/workflows/e2e-1.18.yaml +++ b/.github/workflows/e2e-1.18.yaml @@ -557,6 +557,8 @@ jobs: - name: Install Kruise run: | set -ex + kubectl create ns kruise-system + kubectl apply -f test/kruise-e2e-config.yaml kubectl cluster-info IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh NODES=$(kubectl get node | wc -l) @@ -588,6 +590,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e + kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/refs/tags/v1.3.0/manifests/base/crds/kubeflow.org_tfjobs.yaml ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (AppStatefulSetStorage|StatefulSet|PullImage|PullImages|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') diff --git a/.github/workflows/e2e-1.24.yaml b/.github/workflows/e2e-1.24.yaml index 4a4448afd2..c46320b329 100644 --- a/.github/workflows/e2e-1.24.yaml +++ b/.github/workflows/e2e-1.24.yaml @@ -618,6 +618,8 @@ jobs: - name: Install Kruise run: | set -ex + kubectl create ns kruise-system + kubectl apply -f test/kruise-e2e-config.yaml kubectl cluster-info IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh NODES=$(kubectl get node | wc -l) @@ -649,6 +651,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e + kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/refs/heads/v1.8-branch/manifests/base/crds/kubeflow.org_tfjobs.yaml ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (AppStatefulSetStorage|StatefulSet|PullImage|PullImages|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') diff --git a/.github/workflows/e2e-1.26.yaml b/.github/workflows/e2e-1.26.yaml index 8ae7e413cf..adf76c717a 100644 --- a/.github/workflows/e2e-1.26.yaml +++ b/.github/workflows/e2e-1.26.yaml @@ -617,6 +617,8 @@ jobs: - name: Install Kruise run: | set -ex + kubectl create ns kruise-system + kubectl apply -f test/kruise-e2e-config.yaml kubectl cluster-info IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh NODES=$(kubectl get node | wc -l) @@ -648,6 +650,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e + kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/refs/heads/v1.8-branch/manifests/base/crds/kubeflow.org_tfjobs.yaml ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (AppStatefulSetStorage|StatefulSet|PullImage|PullImages|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e retVal=$? 
restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') diff --git a/.github/workflows/e2e-1.28.yaml b/.github/workflows/e2e-1.28.yaml index 1bc8478df5..b7e8ce8a27 100644 --- a/.github/workflows/e2e-1.28.yaml +++ b/.github/workflows/e2e-1.28.yaml @@ -700,6 +700,8 @@ jobs: - name: Install Kruise run: | set -ex + kubectl create ns kruise-system + kubectl apply -f test/kruise-e2e-config.yaml kubectl cluster-info IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh NODES=$(kubectl get node | wc -l) @@ -731,6 +733,7 @@ jobs: export KUBECONFIG=/home/runner/.kube/config make ginkgo set +e + kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/refs/heads/v1.8-branch/manifests/base/crds/kubeflow.org_tfjobs.yaml ./bin/ginkgo -timeout 90m -v --skip='\[apps\] (InplaceVPA|AppStatefulSetStorage|StatefulSet|PullImage|PullImages|ContainerRecreateRequest|DaemonSet|SidecarSet|EphemeralJob)' --skip='\[policy\] PodUnavailableBudget' test/e2e retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') diff --git a/Makefile b/Makefile index 8b7a10be2e..80fff843de 100644 --- a/Makefile +++ b/Makefile @@ -175,6 +175,8 @@ kube-load-image: $(tools/kind) # install-kruise install kruise with local build image to kube cluster. .PHONY: install-kruise install-kruise: + kubectl create ns kruise-system + kubectl apply -f test/kruise-e2e-config.yaml tools/hack/install-kruise.sh $(IMG) # run-kruise-e2e-test starts to run kruise e2e tests. diff --git a/apis/apps/v1alpha1/workloadspread_types.go b/apis/apps/v1alpha1/workloadspread_types.go index 5b8784dc01..8955ed50b5 100644 --- a/apis/apps/v1alpha1/workloadspread_types.go +++ b/apis/apps/v1alpha1/workloadspread_types.go @@ -28,6 +28,11 @@ type WorkloadSpreadSpec struct { // TargetReference is the target workload that WorkloadSpread want to control. TargetReference *TargetReference `json:"targetRef"` + // TargetFilter allows WorkloadSpread to manage only a portion of the Pods in the TargetReference: + // by specifying the criteria for the Pods to be managed through a label selector, + // and by specifying how to obtain the total number of these selected Pods from the workload using replicasPaths. + TargetFilter *TargetFilter `json:"targetFilter,omitempty"` + // Subsets describes the pods distribution details between each of subsets. // +patchMergeKey=name // +patchStrategy=merge @@ -48,6 +53,58 @@ type TargetReference struct { Name string `json:"name"` } +/* +TargetFilter is an optional parameter that allows WorkloadSpread to manage only a subset of the Pods generated by the target workload. + +For example, suppose a WorkloadSpread points to the following Kubeflow TFJob resource: + + ```yaml + apiVersion: kubeflow.org/v1 + kind: TFJob + spec: + tfReplicaSpecs: + PS: + replicas: 1 + ... + MASTER: + replicas: 1 + ... + Worker: + replicas: 2 + ... + ``` + +If you want to manage only the 2 Worker Pods that are generated, you need to configure the TargetFilter as follows: + + ```yaml + targetFilter: + selector: + matchLabels: + role: worker + replicasPathList: + - spec.tfReplicaSpecs.Worker.replicas + ``` + +With this configuration, the PS Pods and Master Pods generated by the TFJob will not be managed by WorkloadSpread and will not be +counted toward the total number of replicas. +*/ +type TargetFilter struct { + // Selector is used to filter the Pods to be managed. 
+ // + //+optional + Selector *metav1.LabelSelector `json:"selector,omitempty"` + + // ReplicasPathList is a list of resource paths used to specify how to determine the total number of replicas of + // the target workload after filtering. If this list is not empty, WorkloadSpread will look for the corresponding + // values in the target resource according to each path, and treat the sum of these values as the total number of replicas after filtering. + // + // The replicas path is a dot-separated path, similar to "spec.replicas". If there are arrays, you can use numbers to denote indexes, like "subsets.1.replicas". + // The real values of these paths must be integers. + // + // +optional + ReplicasPathList []string `json:"replicasPathList,omitempty"` +} + // WorkloadSpreadScheduleStrategyType is a string enumeration type that enumerates // all possible schedule strategies for the WorkloadSpread controller. // +kubebuilder:validation:Enum=Adaptive;Fixed;"" diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index cd443d3fc1..7eb70819c8 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -3268,6 +3268,31 @@ func (in *SyncStatus) DeepCopy() *SyncStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TargetFilter) DeepCopyInto(out *TargetFilter) { + *out = *in + if in.Selector != nil { + in, out := &in.Selector, &out.Selector + *out = new(metav1.LabelSelector) + (*in).DeepCopyInto(*out) + } + if in.ReplicasPathList != nil { + in, out := &in.ReplicasPathList, &out.ReplicasPathList + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetFilter. +func (in *TargetFilter) DeepCopy() *TargetFilter { + if in == nil { + return nil + } + out := new(TargetFilter) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TargetReference) DeepCopyInto(out *TargetReference) { *out = *in @@ -3726,6 +3751,11 @@ func (in *WorkloadSpreadSpec) DeepCopyInto(out *WorkloadSpreadSpec) { *out = new(TargetReference) **out = **in } + if in.TargetFilter != nil { + in, out := &in.TargetFilter, &out.TargetFilter + *out = new(TargetFilter) + (*in).DeepCopyInto(*out) + } if in.Subsets != nil { in, out := &in.Subsets, &out.Subsets *out = make([]WorkloadSpreadSubset, len(*in)) diff --git a/config/crd/bases/apps.kruise.io_workloadspreads.yaml b/config/crd/bases/apps.kruise.io_workloadspreads.yaml index 7d6fd2c20c..7bf83b8db7 100644 --- a/config/crd/bases/apps.kruise.io_workloadspreads.yaml +++ b/config/crd/bases/apps.kruise.io_workloadspreads.yaml @@ -310,6 +310,69 @@ spec: - name type: object type: array + targetFilter: + description: |- + TargetFilter allows WorkloadSpread to manage only a portion of the Pods in the TargetReference: + by specifying the criteria for the Pods to be managed through a label selector, + and by specifying how to obtain the total number of these selected Pods from the workload using replicasPaths. + properties: + replicasPathList: + description: |- + ReplicasPathList is a list of resource paths used to specify how to determine the total number of replicas of + the target workload after filtering. 
If this list is not empty, WorkloadSpread will look for the corresponding + values in the target resource according to each path, and treat the sum of these values as the total number of replicas after filtering. + + + The replicas path is a dot-separated path, similar to "spec.replicas". If there are arrays, you can use numbers to denote indexes, like "subsets.1.replicas". + The real values of these paths must be integers. + items: + type: string + type: array + selector: + description: Selector is used to filter the Pods to be managed. + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: |- + A label selector requirement is a selector that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: |- + operator represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: |- + values is an array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced during a strategic + merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: |- + matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, whose key field is "key", the + operator is "In", and the values array contains only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + type: object targetRef: description: TargetReference is the target workload that WorkloadSpread want to control. 
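For reference, a complete WorkloadSpread manifest using the new `targetFilter` field might look like the sketch below. It mirrors the TFJob example from the API comment above; the object names and the `role: worker` label are illustrative, and only the fields relevant to target filtering are shown.

```yaml
apiVersion: apps.kruise.io/v1alpha1
kind: WorkloadSpread
metadata:
  name: tfjob-spread            # hypothetical name
spec:
  targetRef:
    apiVersion: kubeflow.org/v1
    kind: TFJob
    name: tfjob-demo            # hypothetical name
  # Manage only the Worker Pods; count replicas from spec.tfReplicaSpecs.Worker.replicas.
  targetFilter:
    selector:
      matchLabels:
        role: worker            # illustrative label, as in the API comment above
    replicasPathList:
      - spec.tfReplicaSpecs.Worker.replicas
  subsets:
    - name: zone-a
      maxReplicas: 50%          # percentages are resolved against the filtered replica count
    - name: zone-b
```

With this configuration the PS and Master Pods are ignored, and the `50%` in `maxReplicas` is computed against the filtered total (the Worker replicas), which is what the controller's `filterWorkload` and the webhook's `getWorkloadReplicas` changes below implement.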
diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go index 4662e09b76..951839ca7f 100644 --- a/pkg/controller/workloadspread/workloadspread_controller.go +++ b/pkg/controller/workloadspread/workloadspread_controller.go @@ -34,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" @@ -237,7 +238,7 @@ func (r *ReconcileWorkloadSpread) Reconcile(_ context.Context, req reconcile.Req func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, namespace string) ([]*corev1.Pod, int32, error) { ok, err := wsutil.VerifyGroupKind(ref, controllerKindJob.Kind, []string{controllerKindJob.Group}) if err != nil || !ok { - return nil, -1, err + return nil, 0, err } job := &batchv1.Job{} @@ -248,13 +249,13 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n klog.V(3).InfoS("Could not find Job", "job", klog.KRef(namespace, ref.Name)) return nil, 0, nil } - return nil, -1, err + return nil, 0, err } labelSelector, err := util.ValidatedLabelSelectorAsSelector(job.Spec.Selector) if err != nil { klog.ErrorS(err, "Failed to get labelSelector") - return nil, -1, nil + return nil, 0, err } podList := &corev1.PodList{} @@ -265,7 +266,7 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n } err = r.List(context.TODO(), podList, listOption) if err != nil { - return nil, -1, err + return nil, 0, err } matchedPods := make([]*corev1.Pod, 0, len(podList.Items)) @@ -275,15 +276,39 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n return matchedPods, *(job.Spec.Parallelism), nil } +func (r *ReconcileWorkloadSpread) getReplicasPathList(ws *appsv1alpha1.WorkloadSpread) ([]string, error) { + if ws.Spec.TargetReference == nil { + return nil, nil + } + if ws.Spec.TargetFilter != nil && len(ws.Spec.TargetFilter.ReplicasPathList) > 0 { + return ws.Spec.TargetFilter.ReplicasPathList, nil + } + whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(r.Client) + if err != nil { + return nil, err + } + gv, err := schema.ParseGroupVersion(ws.Spec.TargetReference.APIVersion) + if err != nil { + return nil, err + } + for _, wl := range whiteList.Workloads { + if wl.GroupVersion() != gv || wl.GroupVersionKind.Kind != ws.Spec.TargetReference.Kind { + continue + } + klog.V(5).InfoS("found replicas path in whitelist", "path", wl.ReplicasPath, "workloadSpread", klog.KObj(ws)) + return []string{wl.ReplicasPath}, nil + } + return nil, nil +} + // getPodsForWorkloadSpread returns Pods managed by the WorkloadSpread object. // return two parameters // 1. podList for workloadSpread // 2. 
workloadReplicas func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.WorkloadSpread) ([]*corev1.Pod, int32, error) { if ws.Spec.TargetReference == nil { - return nil, -1, nil + return nil, 0, nil } - var pods []*corev1.Pod var workloadReplicas int32 var err error @@ -295,15 +320,68 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.Work default: pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false) } - if err != nil { klog.ErrorS(err, "WorkloadSpread handled targetReference failed", "workloadSpread", klog.KObj(ws)) - return nil, -1, err + return nil, 0, err } + workloadReplicas, pods, err = r.filterWorkload(ws, pods, workloadReplicas) + if err != nil { + klog.ErrorS(err, "Filter workload failed", "workloadSpread", klog.KObj(ws)) + return nil, 0, err + } return pods, workloadReplicas, err } +func (r *ReconcileWorkloadSpread) filterWorkload(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (int32, []*corev1.Pod, error) { + klog.V(5).InfoS("before workload filtering", "pods", len(pods), "replicas", replicas, "workloadSpread", klog.KObj(ws)) + replicasPathList, err := r.getReplicasPathList(ws) + if err != nil { + return replicas, pods, err + } + var filteredReplicas int32 + if len(replicasPathList) > 0 { + // replicas path list configured in someplace, should overwrite replicas value + targetRef := ws.Spec.TargetReference + wl, err := r.controllerFinder.GetControllerAsUnstructured(controllerfinder.ControllerReference{ + APIVersion: targetRef.APIVersion, + Kind: targetRef.Kind, + Name: targetRef.Name, + }, ws.Namespace) + if err != nil { + return replicas, pods, client.IgnoreNotFound(err) + } + for _, replicasPath := range replicasPathList { + n, err := wsutil.GetReplicasFromObject(wl, replicasPath) + if err != nil { + return replicas, pods, err + } + filteredReplicas += n + } + klog.V(4).InfoS("replicas after filtering", "replicas", filteredReplicas, + "replicasPathList", replicasPathList, "workloadSpread", klog.KObj(ws)) + } else { + filteredReplicas = replicas + klog.V(4).InfoS("replicas not filtered", "workloadSpread", klog.KObj(ws)) + } + var filteredPods []*corev1.Pod + if ws.Spec.TargetFilter != nil && ws.Spec.TargetFilter.Selector != nil { + for _, pod := range pods { + selected, err := wsutil.IsPodSelected(ws.Spec.TargetFilter, pod.Labels) + if err != nil { + return replicas, pods, err + } + if selected { + filteredPods = append(filteredPods, pod) + } + } + klog.V(4).InfoS("pods after filtering", "pods", len(filteredPods), "selector", ws.Spec.TargetFilter.Selector) + } else { + filteredPods = pods + } + return filteredReplicas, filteredPods, nil +} + // syncWorkloadSpread is the main logic of the WorkloadSpread controller. Firstly, we get Pods from workload managed by // WorkloadSpread and then classify these Pods to each corresponding subset. Secondly, we set Pod deletion-cost annotation // value by compare the number of subset's Pods with the subset's maxReplicas, and then we consider rescheduling failed Pods. @@ -311,11 +389,13 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.Work // to maintain WorkloadSpread status together. The controller is responsible for calculating the real status, and the webhook // mainly counts missingReplicas and records the creation or deletion entry of Pod into map. 
func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSpread) error { + if ws.Spec.TargetReference == nil { + klog.InfoS("WorkloadSpread has no target reference", "workloadSpread", klog.KObj(ws)) + return nil + } pods, workloadReplicas, err := r.getPodsForWorkloadSpread(ws) - if err != nil || workloadReplicas == -1 { - if err != nil { - klog.ErrorS(err, "WorkloadSpread got matched pods failed", "workloadSpread", klog.KObj(ws)) - } + if err != nil { + klog.ErrorS(err, "WorkloadSpread got matched pods failed", "workloadSpread", klog.KObj(ws)) return err } if len(pods) == 0 { @@ -398,7 +478,7 @@ func (r *ReconcileWorkloadSpread) groupPodBySubset(ws *appsv1alpha1.WorkloadSpre for _, subset := range ws.Spec.Subsets { podMap[subset.Name] = []*corev1.Pod{} subsetMissingReplicas[subset.Name], _ = intstr.GetScaledValueFromIntOrPercent( - intstr.ValueOrDefault(subset.MaxReplicas, intstr.FromInt(math.MaxInt32)), int(replicas), true) + intstr.ValueOrDefault(subset.MaxReplicas, intstr.FromInt32(math.MaxInt32)), int(replicas), true) } // count managed pods for each subset @@ -649,7 +729,7 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ws *appsv1 // MaxReplicas is nil, which means there is no limit for subset replicas, using -1 to represent it. subsetMaxReplicas = -1 } else { - subsetMaxReplicas, err = intstr.GetValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true) + subsetMaxReplicas, err = intstr.GetScaledValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true) if err != nil || subsetMaxReplicas < 0 { klog.ErrorS(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws)) return nil diff --git a/pkg/controller/workloadspread/workloadspread_controller_test.go b/pkg/controller/workloadspread/workloadspread_controller_test.go index de3670459e..16879fcced 100644 --- a/pkg/controller/workloadspread/workloadspread_controller_test.go +++ b/pkg/controller/workloadspread/workloadspread_controller_test.go @@ -25,16 +25,21 @@ import ( "testing" "time" + "github.com/openkruise/kruise/pkg/util" + "github.com/openkruise/kruise/pkg/util/configuration" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/record" utilpointer "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -71,9 +76,9 @@ var ( APIVersion: "apps.kruise.io/v1alpha1", Kind: "CloneSet", Name: "cloneset-test", - Controller: utilpointer.BoolPtr(true), + Controller: ptr.To(true), UID: types.UID("a03eb001-27eb-4713-b634-7c46f6861758"), - BlockOwnerDeletion: utilpointer.BoolPtr(true), + BlockOwnerDeletion: ptr.To(true), }, }, }, @@ -100,7 +105,7 @@ var ( UID: types.UID("a03eb001-27eb-4713-b634-7c46f6861758"), }, Spec: appsv1alpha1.CloneSetSpec{ - Replicas: utilpointer.Int32Ptr(10), + Replicas: ptr.To(int32(10)), Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app": "nginx", @@ -153,6 +158,21 @@ var ( Name: "subset-a", MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 5}, } + + 
getWhiteListDemoCopy = func() configuration.WSCustomWorkloadWhiteList { + return configuration.WSCustomWorkloadWhiteList{ + Workloads: []configuration.CustomWorkload{ + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "mock.kruise.io", + Version: "v1", + Kind: "GameServerSet", + }, + ReplicasPath: "spec.replicas", + }, + }, + } + } ) func init() { @@ -731,11 +751,73 @@ func TestSubsetPodDeletionCost(t *testing.T) { } func TestWorkloadSpreadReconcile(t *testing.T) { + getTwoPodsWithDifferentLabels := func() []*corev1.Pod { + pod1 := podDemo.DeepCopy() + pod1.Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + pod1.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "mock.kruise.io/v1", + Kind: "GameServerSet", + Name: "workload", + UID: "12345", + }, + } + pod1.Labels["selected"] = "true" + pod2 := pod1.DeepCopy() + pod2.Name = "another" + pod2.Labels["selected"] = "false" + pod2.Labels["app"] = "not-nginx" // preventing being selected by func getLatestPods + return []*corev1.Pod{pod1, pod2} + } + + getWorkloadSpreadWithPercentSubsetB := func() *appsv1alpha1.WorkloadSpread { + workloadSpread := workloadSpreadDemo.DeepCopy() + workloadSpread.Spec.Subsets = append(workloadSpread.Spec.Subsets, appsv1alpha1.WorkloadSpreadSubset{ + Name: "subset-b", + MaxReplicas: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, + }) + workloadSpread.Spec.TargetReference = &appsv1alpha1.TargetReference{ + APIVersion: "mock.kruise.io/v1", + Kind: "GameServerSet", + Name: "workload", + } + workloadSpread.Spec.TargetFilter = &appsv1alpha1.TargetFilter{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "selected": "true", + }, + }, + } + return workloadSpread + } + expectWorkloadSpreadWithPercentSubsetB := func() *appsv1alpha1.WorkloadSpread { + workloadSpread := workloadSpreadDemo.DeepCopy() + workloadSpread.Spec.TargetReference = &appsv1alpha1.TargetReference{ + APIVersion: "mock.kruise.io/v1", + Kind: "GameServerSet", + Name: "workload", + } + workloadSpread.Status.SubsetStatuses = append(workloadSpread.Status.SubsetStatuses, appsv1alpha1.WorkloadSpreadSubsetStatus{}) + workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 4 + workloadSpread.Status.SubsetStatuses[0].Replicas = 1 + workloadSpread.Status.SubsetStatuses[0].CreatingPods = map[string]metav1.Time{} + workloadSpread.Status.SubsetStatuses[0].DeletingPods = map[string]metav1.Time{} + workloadSpread.Status.SubsetStatuses[1].Name = "subset-b" + workloadSpread.Status.SubsetStatuses[1].MissingReplicas = 0 + workloadSpread.Status.SubsetStatuses[1].Replicas = 0 + workloadSpread.Status.SubsetStatuses[1].CreatingPods = map[string]metav1.Time{} + workloadSpread.Status.SubsetStatuses[1].DeletingPods = map[string]metav1.Time{} + return workloadSpread + } cases := []struct { name string getPods func() []*corev1.Pod getWorkloadSpread func() *appsv1alpha1.WorkloadSpread getCloneSet func() *appsv1alpha1.CloneSet + getWorkloads func() []client.Object + getWhiteList func() configuration.WSCustomWorkloadWhiteList expectPods func() []*corev1.Pod expectWorkloadSpread func() *appsv1alpha1.WorkloadSpread }{ @@ -747,9 +829,6 @@ func TestWorkloadSpreadReconcile(t *testing.T) { getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { return workloadSpreadDemo.DeepCopy() }, - getCloneSet: func() *appsv1alpha1.CloneSet { - return nil - }, expectPods: func() []*corev1.Pod { return []*corev1.Pod{} }, @@ -905,7 +984,7 @@ func 
TestWorkloadSpreadReconcile(t *testing.T) { }, getCloneSet: func() *appsv1alpha1.CloneSet { cloneSet := cloneSetDemo.DeepCopy() - cloneSet.Spec.Replicas = utilpointer.Int32Ptr(5) + cloneSet.Spec.Replicas = ptr.To(int32(5)) return cloneSet }, expectPods: func() []*corev1.Pod { @@ -942,7 +1021,7 @@ func TestWorkloadSpreadReconcile(t *testing.T) { }, getCloneSet: func() *appsv1alpha1.CloneSet { cloneSet := cloneSetDemo.DeepCopy() - cloneSet.Spec.Replicas = utilpointer.Int32Ptr(5) + cloneSet.Spec.Replicas = ptr.To(int32(5)) return cloneSet }, expectPods: func() []*corev1.Pod { @@ -1523,8 +1602,128 @@ func TestWorkloadSpreadReconcile(t *testing.T) { return workloadSpread }, }, + { + name: "custom workload with replica path in whitelist", + getWorkloads: func() []client.Object { + clone := cloneSetDemo.DeepCopy() + clone.Name = "workload" + clone.Kind = "GameServerSet" + clone.APIVersion = "mock.kruise.io/v1" + clone.UID = "12345" + clone.Spec.Replicas = utilpointer.Int32(14) + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(clone) + if err != nil { + panic("err when convert to unstructured object") + } + return []client.Object{&unstructured.Unstructured{Object: unstructuredMap}} + }, + getPods: getTwoPodsWithDifferentLabels, + getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { + workloadSpread := getWorkloadSpreadWithPercentSubsetB() + workloadSpread.Spec.TargetFilter = nil + return workloadSpread + }, + getWhiteList: getWhiteListDemoCopy, + expectPods: func() []*corev1.Pod { + pod := podDemo.DeepCopy() + pod.Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + return []*corev1.Pod{pod} + }, + expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { + workloadSpread := expectWorkloadSpreadWithPercentSubsetB() + workloadSpread.Status.SubsetStatuses[0].Replicas = 2 + workloadSpread.Status.SubsetStatuses[0].MissingReplicas = 3 + workloadSpread.Status.SubsetStatuses[1].Replicas = 0 + workloadSpread.Status.SubsetStatuses[1].MissingReplicas = 7 + return workloadSpread + }, + }, + { + name: "custom workload with target filter", + getWorkloads: func() []client.Object { + clone := cloneSetDemo.DeepCopy() + clone.Name = "workload" + clone.Kind = "GameServerSet" + clone.APIVersion = "mock.kruise.io/v1" + clone.UID = "12345" + clone.Spec.RevisionHistoryLimit = utilpointer.Int32(18) // as replicas + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(clone) + if err != nil { + panic("err when convert to unstructured object") + } + return []client.Object{&unstructured.Unstructured{Object: unstructuredMap}} + }, + getPods: func() []*corev1.Pod { + pod1 := podDemo.DeepCopy() + pod1.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "mock.kruise.io/v1", + Kind: "GameServerSet", + Name: "workload", + UID: "12345", + }, + } + pod1.Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + pod2 := pod1.DeepCopy() + pod1.Labels["selected"] = "true" + pod2.Labels["selected"] = "false" + return []*corev1.Pod{pod1} + }, + getWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { + workloadSpread := getWorkloadSpreadWithPercentSubsetB() + workloadSpread.Spec.TargetFilter.ReplicasPathList = []string{"spec.revisionHistoryLimit"} + return workloadSpread + }, + getWhiteList: getWhiteListDemoCopy, + expectPods: func() []*corev1.Pod { + pod := podDemo.DeepCopy() + pod.Annotations = 
map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + return []*corev1.Pod{pod} + }, + expectWorkloadSpread: func() *appsv1alpha1.WorkloadSpread { + workloadSpread := expectWorkloadSpreadWithPercentSubsetB() + workloadSpread.Status.SubsetStatuses[1].MissingReplicas = 9 + return workloadSpread + }, + }, + { + name: "custom workload without replicas", + getWorkloads: func() []client.Object { + clone := cloneSetDemo.DeepCopy() + clone.Name = "workload" + clone.Kind = "GameServerSet" + clone.APIVersion = "mock.kruise.io/v1" + clone.UID = "12345" + // with no any replicas + unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(clone) + if err != nil { + panic("err when convert to unstructured object") + } + return []client.Object{&unstructured.Unstructured{Object: unstructuredMap}} + }, + getPods: getTwoPodsWithDifferentLabels, + getWorkloadSpread: getWorkloadSpreadWithPercentSubsetB, + getWhiteList: func() configuration.WSCustomWorkloadWhiteList { + whiteList := getWhiteListDemoCopy() + whiteList.Workloads[0].ReplicasPath = "" // not configured + return whiteList + }, + expectPods: func() []*corev1.Pod { + pod := podDemo.DeepCopy() + pod.Annotations = map[string]string{ + wsutil.MatchedWorkloadSpreadSubsetAnnotations: `{"Name":"test-workloadSpread","Subset":"subset-a"}`, + } + return []*corev1.Pod{pod} + }, + expectWorkloadSpread: expectWorkloadSpreadWithPercentSubsetB, + }, } - if !wsutil.EnabledWorkloadSetForVersionedStatus.Has("cloneset") { wsutil.EnabledWorkloadSetForVersionedStatus.Insert("cloneset") defer wsutil.EnabledWorkloadSetForVersionedStatus.Delete("cloneset") @@ -1542,9 +1741,25 @@ func TestWorkloadSpreadReconcile(t *testing.T) { } return owners }).WithStatusSubresource(&appsv1alpha1.WorkloadSpread{}) - if cs.getCloneSet() != nil { + if cs.getCloneSet != nil { builder.WithObjects(cs.getCloneSet()) } + if cs.getWorkloads != nil { + builder.WithObjects(cs.getWorkloads()...) 
+ } + if cs.getWhiteList != nil { + whiteList := cs.getWhiteList() + marshaled, _ := json.Marshal(whiteList) + builder.WithObjects(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: configuration.KruiseConfigurationName, + Namespace: util.GetKruiseNamespace(), + }, + Data: map[string]string{ + configuration.WSWatchCustomWorkloadWhiteList: string(marshaled), + }, + }) + } for _, pod := range cs.getPods() { podIn := pod.DeepCopy() builder.WithObjects(podIn) @@ -1935,7 +2150,7 @@ func TestManagerExistingPods(t *testing.T) { }, getCloneSet: func() *appsv1alpha1.CloneSet { cloneSet := cloneSetDemo.DeepCopy() - cloneSet.Spec.Replicas = utilpointer.Int32Ptr(10) + cloneSet.Spec.Replicas = ptr.To(int32(10)) return cloneSet }, getNodes: func() []*corev1.Node { @@ -2108,7 +2323,7 @@ func TestManagerExistingPods(t *testing.T) { }, getCloneSet: func() *appsv1alpha1.CloneSet { cloneSet := cloneSetDemo.DeepCopy() - cloneSet.Spec.Replicas = utilpointer.Int32Ptr(3) + cloneSet.Spec.Replicas = ptr.To(int32(3)) return cloneSet }, getNodes: func() []*corev1.Node { diff --git a/pkg/util/controllerfinder/controller_finder.go b/pkg/util/controllerfinder/controller_finder.go index f2dd7eec76..6553a28875 100644 --- a/pkg/util/controllerfinder/controller_finder.go +++ b/pkg/util/controllerfinder/controller_finder.go @@ -43,6 +43,8 @@ import ( var Finder *ControllerFinder +const ReplicasUnknown int32 = -1 + func InitControllerFinder(mgr manager.Manager) error { Finder = &ControllerFinder{ Client: mgr.GetClient(), @@ -72,7 +74,7 @@ func InitControllerFinder(mgr manager.Manager) error { // controller finder functions. type ScaleAndSelector struct { ControllerReference - // controller.spec.Replicas + // controller.spec.Replicas; the value -1 means it is uncertain currently Scale int32 // kruise statefulSet.spec.ReserveOrdinals ReserveOrdinals []int @@ -155,7 +157,7 @@ func (r *ControllerFinder) GetScaleAndSelectorForRef(apiVersion, kind, ns, name func (r *ControllerFinder) Finders() []PodControllerFinder { return []PodControllerFinder{r.getPodReplicationController, r.getPodDeployment, r.getPodReplicaSet, - r.getPodStatefulSet, r.getPodKruiseCloneSet, r.getPodKruiseStatefulSet, r.getPodStatefulSetLike, r.getScaleController} + r.getPodStatefulSet, r.getPodKruiseCloneSet, r.getPodKruiseStatefulSet, r.getPodStatefulSetLike, r.getScaleController, r.getRefUID} } var ( @@ -465,7 +467,9 @@ func (r *ControllerFinder) getScaleController(ref ControllerReference, namespace Group: gv.Group, Kind: ref.Kind, } - + if r.mapper == nil { + return nil, nil // only happens in test scenarios, preventing panic + } mapping, err := r.mapper.RESTMapping(gk, gv.Version) if err != nil { return nil, err @@ -499,6 +503,38 @@ func (r *ControllerFinder) getScaleController(ref ControllerReference, namespace }, nil } +func (r *ControllerFinder) GetControllerAsUnstructured(ref ControllerReference, namespace string) (*unstructured.Unstructured, error) { + un := unstructured.Unstructured{} + un.SetAPIVersion(ref.APIVersion) + un.SetKind(ref.Kind) + return &un, r.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: ref.Name}, &un) +} + +func (r *ControllerFinder) getRefUID(ref ControllerReference, namespace string) (*ScaleAndSelector, error) { + un, err := r.GetControllerAsUnstructured(ref, namespace) + if err != nil { + return nil, client.IgnoreNotFound(err) + } + return &ScaleAndSelector{ + Scale: ReplicasUnknown, + ControllerReference: ControllerReference{ + APIVersion: ref.APIVersion, + Kind: ref.Kind, + Name: ref.Name, + 
UID: un.GetUID(), + }, + Metadata: metav1.ObjectMeta{ + Namespace: un.GetNamespace(), + Name: un.GetName(), + Annotations: un.GetAnnotations(), + UID: un.GetUID(), + OwnerReferences: un.GetOwnerReferences(), + Labels: un.GetLabels(), + ResourceVersion: un.GetResourceVersion(), + }, + }, nil +} + func verifyGroupKind(apiVersion, kind string, gvk schema.GroupVersionKind) (bool, error) { gv, err := schema.ParseGroupVersion(apiVersion) if err != nil { diff --git a/pkg/util/controllerfinder/pods_finder.go b/pkg/util/controllerfinder/pods_finder.go index 16bec7c20d..fc41c4a690 100644 --- a/pkg/util/controllerfinder/pods_finder.go +++ b/pkg/util/controllerfinder/pods_finder.go @@ -18,6 +18,7 @@ package controllerfinder import ( "context" + "fmt" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" @@ -91,6 +92,8 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti labelSelector = obj.Selector workloadUIDs = append(workloadUIDs, obj.UID) } + klog.V(5).InfoS("find pods and replicas result", "target", fmt.Sprintf("%s/%s", ns, name), "kind", kind, + "workloadReplicas", workloadReplicas, "workloadUIDs", workloadUIDs, "labelSelector", labelSelector) if workloadReplicas == 0 { return nil, workloadReplicas, nil } @@ -120,6 +123,8 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(uid)}), } pods, err := listPods(&listOption) + klog.V(5).InfoS("result of list pods with owner ref uid", + "target", fmt.Sprintf("%s/%s", ns, name), "kind", kind, "pods", len(pods), "err", err, "refUid", uid) if err != nil { return nil, -1, err } diff --git a/pkg/util/selector.go b/pkg/util/selector.go index 4c1ef9808d..331df4e20b 100644 --- a/pkg/util/selector.go +++ b/pkg/util/selector.go @@ -121,6 +121,8 @@ func sliceContains(a, b []string) bool { return true } +// ValidatedLabelSelectorAsSelector is faster than native `metav1.LabelSelectorAsSelector` for the newRequirement function +// performs no validation. MAKE SURE the `ps` param is validated with `metav1.LabelSelectorAsSelector` before. func ValidatedLabelSelectorAsSelector(ps *metav1.LabelSelector) (labels.Selector, error) { if ps == nil { return labels.Nothing(), nil diff --git a/pkg/util/workloadspread/utils.go b/pkg/util/workloadspread/utils.go new file mode 100644 index 0000000000..c13d5353a9 --- /dev/null +++ b/pkg/util/workloadspread/utils.go @@ -0,0 +1,89 @@ +/* +Copyright 2024 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package workloadspread + +import ( + "errors" + "fmt" + "strconv" + "strings" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util" + "k8s.io/apimachinery/pkg/labels" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" +) + +func hasPercentSubset(ws *appsv1alpha1.WorkloadSpread) (has bool) { + if ws == nil { + return false + } + for _, subset := range ws.Spec.Subsets { + if subset.MaxReplicas != nil && subset.MaxReplicas.Type == intstrutil.String && + strings.HasSuffix(subset.MaxReplicas.StrVal, "%") { + return true + } + } + return false +} + +func NestedField[T any](obj any, paths ...string) (T, bool, error) { + if len(paths) == 0 { + val, ok := obj.(T) + if !ok { + return *new(T), false, errors.New("object type error") + } + return val, true, nil + } + if o, ok := obj.(map[string]any); ok { + return nestedMap[T](o, paths...) + } + if o, ok := obj.([]any); ok { + return nestedSlice[T](o, paths...) + } + return *new(T), false, errors.New("object is not deep enough") +} + +func nestedSlice[T any](obj []any, paths ...string) (T, bool, error) { + idx, err := strconv.Atoi(paths[0]) + if err != nil { + return *new(T), false, err + } + if len(obj) < idx+1 { + return *new(T), false, fmt.Errorf("index %d out of range", idx) + } + return NestedField[T](obj[idx], paths[1:]...) +} + +func nestedMap[T any](obj map[string]any, paths ...string) (T, bool, error) { + if val, ok := obj[paths[0]]; ok { + return NestedField[T](val, paths[1:]...) + } else { + return *new(T), false, fmt.Errorf("path \"%s\" not exists", paths[0]) + } +} + +func IsPodSelected(filter *appsv1alpha1.TargetFilter, podLabels map[string]string) (bool, error) { + if filter == nil { + return true, nil + } + selector, err := util.ValidatedLabelSelectorAsSelector(filter.Selector) + if err != nil { + return false, err + } + return selector.Matches(labels.Set(podLabels)), nil +} diff --git a/pkg/util/workloadspread/utils_test.go b/pkg/util/workloadspread/utils_test.go new file mode 100644 index 0000000000..5ba5c733e4 --- /dev/null +++ b/pkg/util/workloadspread/utils_test.go @@ -0,0 +1,218 @@ +package workloadspread + +import ( + "reflect" + "testing" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" +) + +func TestNestedField(t *testing.T) { + type args struct { + obj map[string]any + paths []string + } + type testCase[T any] struct { + name string + args args + want T + exists bool + wantErr bool + } + ud := &appsv1alpha1.UnitedDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ud", + Namespace: "ns", + }, + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Replicas: ptr.To(int32(5)), + Topology: appsv1alpha1.Topology{ + Subsets: []appsv1alpha1.Subset{ + { + Name: "subset1", + Replicas: &intstrutil.IntOrString{ + Type: intstrutil.Int, + IntVal: 5, + }, + }, + }, + }, + }, + } + un, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ud) + if err != nil { + t.Fatal(err) + } + tests := []testCase[int64]{ + { + name: "exists", + args: args{ + obj: map[string]any{ + "m": []any{int64(1)}, + }, + paths: []string{"m", "0"}, + }, + want: 1, + exists: true, + }, + { + name: "no key", + args: args{ + obj: map[string]any{ + "m": []any{1}, + }, + paths: []string{"n", "0"}, + }, + want: 0, + exists: false, + wantErr: true, + }, + { + name: "bad type", + args: args{ + obj: map[string]any{ + "m": []any{"1"}, + }, + 
paths: []string{"m", "0"}, + }, + want: 0, + exists: false, + wantErr: true, + }, + { + name: "not deep enough", + args: args{ + obj: map[string]any{ + "m": []any{1}, + }, + paths: []string{"m", "0", "n"}, + }, + want: 0, + exists: false, + wantErr: true, + }, + { + name: "bad slice index", + args: args{ + obj: map[string]any{ + "m": []any{1}, + }, + paths: []string{"m", "a"}, + }, + want: 0, + exists: false, + wantErr: true, + }, + { + name: "slice out of range", + args: args{ + obj: map[string]any{ + "m": []any{1}, + }, + paths: []string{"m", "10"}, + }, + want: 0, + exists: false, + wantErr: true, + }, + { + name: "real", + args: args{ + obj: un, + paths: []string{"spec", "topology", "subsets", "0", "replicas"}, + }, + want: 5, + exists: true, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, exists, err := NestedField[int64](tt.args.obj, tt.args.paths...) + if (err != nil) != tt.wantErr { + t.Errorf("NestedField() error = %s, wantErr %v", err.Error(), tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NestedField() got = %v, want %v", got, tt.want) + } + if exists != tt.exists { + t.Errorf("NestedField() exists = %v, want %v", exists, tt.exists) + } + }) + } +} + +func TestIsPodSelected(t *testing.T) { + commonFilter := &appsv1alpha1.TargetFilter{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "selected", + }, + }, + } + cases := []struct { + name string + filter *appsv1alpha1.TargetFilter + labels map[string]string + selected bool + wantErr bool + }{ + { + name: "selected", + filter: commonFilter, + labels: map[string]string{ + "app": "selected", + }, + selected: true, + }, + { + name: "not selected", + filter: commonFilter, + labels: map[string]string{ + "app": "not-selected", + }, + selected: false, + }, + { + name: "selector is nil", + filter: nil, + labels: map[string]string{ + "app": "selected", + }, + selected: true, + }, + { + name: "selector is invalid", + filter: &appsv1alpha1.TargetFilter{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "selected", + }, + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: "Invalid", + Values: []string{"selected"}, + }, + }, + }, + }, + selected: false, + wantErr: true, + }, + } + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + selected, err := IsPodSelected(cs.filter, cs.labels) + if selected != cs.selected || (err != nil) != cs.wantErr { + t.Fatalf("got unexpected result, actual: [selected=%v,err=%v] expected: [selected=%v,wantErr=%v]", + selected, err, cs.selected, cs.wantErr) + } + }) + } +} diff --git a/pkg/util/workloadspread/workloadspread.go b/pkg/util/workloadspread/workloadspread.go index 84c46e3e17..ff5cf0d3f3 100644 --- a/pkg/util/workloadspread/workloadspread.go +++ b/pkg/util/workloadspread/workloadspread.go @@ -224,16 +224,25 @@ func (h *Handler) HandlePodCreation(pod *corev1.Pod) (skip bool, err error) { continue } // determine if the reference of workloadSpread and pod is equal - if ok, err := h.isReferenceEqual(ws.Spec.TargetReference, ref, pod.Namespace); ok { - matchedWS = &ws - // pod has at most one matched workloadSpread - break - } else if err != nil { + referenceEqual, err := h.isReferenceEqual(ws.Spec.TargetReference, ref, pod.GetNamespace()) + if err != nil { klog.ErrorS(err, "failed to determine whether workloadspread refers pod's owner", "pod", klog.KObj(pod), "workloadspread", klog.KObj(&ws)) if errors.IsNotFound(err) { return 
true, err } + continue + } + selected, err := IsPodSelected(ws.Spec.TargetFilter, pod.GetLabels()) + if err != nil { + klog.ErrorS(err, "failed to determine whether workloadspread selects pod", + "pod", klog.KObj(pod), "workloadspread", klog.KObj(&ws)) + continue + } + if referenceEqual && selected { + matchedWS = &ws + // pod has at most one matched workloadSpread + break } } // not found matched workloadSpread @@ -512,8 +521,8 @@ func (h *Handler) updateSubsetForPod(ws *appsv1alpha1.WorkloadSpread, suitableSubset = h.getSuitableSubset(subsetStatuses) if suitableSubset == nil { - klog.InfoS("WorkloadSpread don't have a suitable subset for Pod when creating", - "namespace", ws.Namespace, "wsName", ws.Name, "podName", pod.Name) + klog.InfoS("WorkloadSpread doesn't have a suitable subset for Pod when creating", + "namespace", ws.Namespace, "wsName", ws.Name, "podName", pod.GetGenerateName()) return false, nil, "", nil } // no need to update WorkloadSpread status if MaxReplicas == nil @@ -791,11 +800,13 @@ func initializeWorkloadsInWhiteList(c client.Client) { }) } } + klog.InfoS("initialized workload list", "workloads", workloads) workloadsInWhiteListInitialized = true } func (h *Handler) initializedSubsetStatuses(ws *appsv1alpha1.WorkloadSpread) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, error) { replicas, err := h.getWorkloadReplicas(ws) + klog.V(5).InfoS("get workload replicas", "replicas", replicas, "err", err, "workloadSpread", klog.KObj(ws)) if err != nil { return nil, err } @@ -815,7 +826,7 @@ func (h *Handler) initializedSubsetStatuses(ws *appsv1alpha1.WorkloadSpread) ([] } func (h *Handler) getWorkloadReplicas(ws *appsv1alpha1.WorkloadSpread) (int32, error) { - if ws.Spec.TargetReference == nil { + if ws.Spec.TargetReference == nil || !hasPercentSubset(ws) { return 0, nil } gvk := schema.FromAPIVersionAndKind(ws.Spec.TargetReference.APIVersion, ws.Spec.TargetReference.Kind) @@ -828,6 +839,10 @@ func (h *Handler) getWorkloadReplicas(ws *appsv1alpha1.WorkloadSpread) (int32, e return 0, client.IgnoreNotFound(err) } + if ws.Spec.TargetFilter != nil && len(ws.Spec.TargetFilter.ReplicasPathList) > 0 { + return GetReplicasFromWorkloadWithTargetFilter(object, ws.Spec.TargetFilter) + } + switch o := object.(type) { case *appsv1.Deployment: return *o.Spec.Replicas, nil @@ -871,6 +886,21 @@ func GenerateEmptyWorkloadObject(gvk schema.GroupVersionKind, key types.Namespac return } +func GetReplicasFromObject(object *unstructured.Unstructured, replicasPath string) (int32, error) { + if replicasPath == "" { + return 0, nil + } + var exists bool + var replicas int64 + var err error + path := strings.Split(replicasPath, ".") + replicas, exists, err = NestedField[int64](object.Object, path...) + if err != nil || !exists { + return 0, err + } + return int32(replicas), nil +} + func GetReplicasFromCustomWorkload(reader client.Reader, object *unstructured.Unstructured) int32 { if object == nil { return 0 @@ -886,25 +916,32 @@ func GetReplicasFromCustomWorkload(reader client.Reader, object *unstructured.Un if wl.GroupVersionKind.GroupKind() != gvk.GroupKind() { continue } - var exists bool - var replicas int64 - path := strings.Split(wl.ReplicasPath, ".") - if len(path) > 0 { - replicas, exists, err = unstructured.NestedInt64(object.Object, path...) 
- if err != nil || !exists { - klog.ErrorS(err, "Failed to get replicas", "from", gvk, "replicasPath", wl.ReplicasPath) - } - } else { - replicas, exists, err = unstructured.NestedInt64(object.Object, "spec", "replicas") - if err != nil || !exists { - klog.ErrorS(err, "Failed to get replicas", "from", gvk, "replicasPath", wl.ReplicasPath) - } + replicas, err := GetReplicasFromObject(object, wl.ReplicasPath) + if err != nil { + klog.ErrorS(err, "Failed to get replicas from custom workload", "gvk", gvk, "object", klog.KObj(object), "replicasPath", wl.ReplicasPath) } - return int32(replicas) + return replicas } return 0 } +func GetReplicasFromWorkloadWithTargetFilter(object client.Object, targetFilter *appsv1alpha1.TargetFilter) (int32, error) { + objMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object) + if err != nil { + return 0, err + } + obj := &unstructured.Unstructured{Object: objMap} + var replicas int32 = 0 + for _, path := range targetFilter.ReplicasPathList { + r, err := GetReplicasFromObject(obj, path) + if err != nil { + return 0, err + } + replicas += r + } + return replicas, nil +} + func GetPodVersion(pod *corev1.Pod) string { if !enableVersionedStatus(pod) { return VersionIgnored diff --git a/pkg/util/workloadspread/workloadspread_test.go b/pkg/util/workloadspread/workloadspread_test.go index 12762838f9..81399693a3 100644 --- a/pkg/util/workloadspread/workloadspread_test.go +++ b/pkg/util/workloadspread/workloadspread_test.go @@ -26,7 +26,9 @@ import ( "testing" "time" + "github.com/openkruise/kruise/pkg/util/configuration" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -440,6 +442,7 @@ func init() { utilruntime.Must(appsv1beta1.AddToScheme(scheme)) utilruntime.Must(appsv1.AddToScheme(scheme)) utilruntime.Must(corev1.AddToScheme(scheme)) + utilruntime.Must(batchv1.AddToScheme(scheme)) } func TestWorkloadSpreadCreatePodWithoutFullName(t *testing.T) { @@ -1167,6 +1170,139 @@ func TestWorkloadSpreadMutatingPod(t *testing.T) { } } +func TestGetWorkloadReplicas(t *testing.T) { + cases := []struct { + name string + targetReference *appsv1alpha1.TargetReference + targetFilter *appsv1alpha1.TargetFilter + replicas int32 + wantErr bool + }{ + { + name: "without target reference", + }, + { + name: "deployment", + targetReference: &appsv1alpha1.TargetReference{ + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "test", + }, + replicas: 5, + }, + { + name: "Advanced StatefulSet", + targetReference: &appsv1alpha1.TargetReference{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "StatefulSet", + Name: "test", + }, + replicas: 5, + }, + { + name: "custom workload", + targetReference: &appsv1alpha1.TargetReference{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "DaemonSet", + Name: "test", + }, + replicas: 1, + }, + { + name: "filter assigned replicas path", + targetReference: &appsv1alpha1.TargetReference{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "DaemonSet", + Name: "test", + }, + targetFilter: &appsv1alpha1.TargetFilter{ + ReplicasPathList: []string{"spec.revisionHistoryLimit"}, + }, + replicas: 2, + }, + { + name: "filter default path", + targetReference: &appsv1alpha1.TargetReference{ + APIVersion: "apps.kruise.io/v1alpha1", + Kind: "DaemonSet", + Name: "test", + }, + targetFilter: &appsv1alpha1.TargetFilter{Selector: &metav1.LabelSelector{MatchLabels: map[string]string{ + "foo": "bar", + }}}, + replicas: 1, // default path value is 1, 
even no pods selected + }, + { + name: "job", + targetReference: &appsv1alpha1.TargetReference{ + APIVersion: "batch/v1", + Kind: "Job", + Name: "test", + }, + replicas: 3, + }, + } + whiteList := &configuration.WSCustomWorkloadWhiteList{ + Workloads: []configuration.CustomWorkload{ + { + GroupVersionKind: schema.GroupVersionKind{ + Group: "apps.kruise.io", + Version: "v1alpha1", + Kind: "DaemonSet", + }, + ReplicasPath: "spec.minReadySeconds", + }, + }, + } + whiteListJson, _ := json.Marshal(whiteList) + h := Handler{fake.NewClientBuilder().WithScheme(scheme). + WithObjects( + &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: appsv1.DeploymentSpec{Replicas: ptr.To(int32(5))}, + }, + &appsv1beta1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: appsv1beta1.StatefulSetSpec{Replicas: ptr.To(int32(5))}, + }, + &appsv1alpha1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: appsv1alpha1.DaemonSetSpec{MinReadySeconds: 1, RevisionHistoryLimit: ptr.To(int32(2))}, + }, + &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "test"}, + Spec: batchv1.JobSpec{Parallelism: ptr.To(int32(3))}, + }, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: configuration.KruiseConfigurationName, + Namespace: util.GetKruiseNamespace(), + }, + Data: map[string]string{ + configuration.WSWatchCustomWorkloadWhiteList: string(whiteListJson), + }, + }, + ).Build()} + percent := intstr.FromString("30%") + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + ws := workloadSpreadDemo.DeepCopy() + ws.Namespace = "test" + ws.Spec.TargetFilter = cs.targetFilter + ws.Spec.TargetReference = cs.targetReference + ws.Spec.Subsets = append(ws.Spec.Subsets, appsv1alpha1.WorkloadSpreadSubset{ + MaxReplicas: &percent, + }) + replicas, err := h.getWorkloadReplicas(ws) + if cs.wantErr != (err != nil) { + t.Fatalf("wantErr: %v, but got: %v", cs.wantErr, err) + } + if replicas != cs.replicas { + t.Fatalf("want replicas: %v, but got: %v", cs.replicas, replicas) + } + }) + } +} func compareVersionedSubsetStatuses(actual, expect map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus) bool { if len(actual) != len(expect) { return false @@ -2086,3 +2222,115 @@ func getLatestWorkloadSpread(client client.Client, workloadSpread *appsv1alpha1. err := client.Get(context.TODO(), Key, newWS) return newWS, err } + +func TestGetReplicasFromObject(t *testing.T) { + object := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "replicas": int64(5), + "replicaSlice": []any{int64(1), int64(2)}, + "stringField": "5", + }, + }, + } + tests := []struct { + name string + replicasPath string + want int32 + wantErr string + }{ + { + name: "empty path", + replicasPath: "", + want: 0, + }, + { + name: "not exist", + replicasPath: "spec.not.exist", + want: 0, + wantErr: "path \"not\" not exists", + }, + { + name: "error e.g. 
string field", + replicasPath: "spec.stringField", + want: 0, + wantErr: "object type error", + }, + { + name: "success", + replicasPath: "spec.replicas", + want: 5, + }, + { + name: "success in slice", + replicasPath: "spec.replicaSlice.1", + want: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetReplicasFromObject(object, tt.replicasPath) + if err != nil && err.Error() != tt.wantErr { + t.Errorf("GetReplicasFromObject() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetReplicasFromObject() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetReplicasFromWorkloadWithTargetFilter(t *testing.T) { + object := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "replicas": int64(5), + "replicaSlice": []any{int64(1), int64(2)}, + }, + }, + } + tests := []struct { + name string + targetFilter *appsv1alpha1.TargetFilter + want int32 + wantErr bool + }{ + { + name: "empty filter", + targetFilter: &appsv1alpha1.TargetFilter{}, + }, + { + name: "all", + targetFilter: &appsv1alpha1.TargetFilter{ + ReplicasPathList: []string{ + "spec.replicas", + "spec.replicaSlice.0", + "spec.replicaSlice.1", + }, + }, + want: 8, + }, + { + name: "with error", + targetFilter: &appsv1alpha1.TargetFilter{ + ReplicasPathList: []string{ + "spec.not.exist", + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetReplicasFromWorkloadWithTargetFilter(object, tt.targetFilter) + if (err != nil) != tt.wantErr { + t.Errorf("GetReplicasFromWorkloadWithTargetFilter() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetReplicasFromWorkloadWithTargetFilter() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/webhook/workloadspread/validating/workloadspread_validation.go b/pkg/webhook/workloadspread/validating/workloadspread_validation.go index ac10c648f0..374901e814 100644 --- a/pkg/webhook/workloadspread/validating/workloadspread_validation.go +++ b/pkg/webhook/workloadspread/validating/workloadspread_validation.go @@ -23,6 +23,7 @@ import ( "math" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/strategicpatch" webhookutil "github.com/openkruise/kruise/pkg/webhook/util" @@ -161,6 +162,7 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(h.Client) if err != nil { allErrs = append(allErrs, field.InternalError(fldPath.Child("targetRef"), err)) + break } matched := false for _, wl := range whiteList.Workloads { @@ -212,6 +214,13 @@ func validateWorkloadSpreadSpec(h *WorkloadSpreadCreateUpdateHandler, obj *appsv } } + // validate targetFilter + if spec.TargetFilter != nil { + if _, err := metav1.LabelSelectorAsSelector(spec.TargetFilter.Selector); err != nil { + allErrs = append(allErrs, field.Invalid(fldPath.Child("targetFilter"), spec.TargetFilter, err.Error())) + } + } + return allErrs } diff --git a/test/e2e/apps/workloadspread.go b/test/e2e/apps/workloadspread.go index 09358687e5..290100f356 100644 --- a/test/e2e/apps/workloadspread.go +++ b/test/e2e/apps/workloadspread.go @@ -71,7 +71,7 @@ var _ = SIGDescribe("workloadspread", func() { ns = f.Namespace.Name c = f.ClientSet kc = f.KruiseClientSet - tester = framework.NewWorkloadSpreadTester(c, kc) + tester = framework.NewWorkloadSpreadTester(c, kc, f.DynamicClient) // label nodes 
nodeList, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) @@ -1551,205 +1551,84 @@ var _ = SIGDescribe("workloadspread", func() { ginkgo.By("manage statefulset pods only with patch, done") }) - //ginkgo.It("deploy in two zone, maxReplicas=50%", func() { - // cloneSet := tester.NewBaseCloneSet(ns) - // // create workloadSpread - // targetRef := appsv1alpha1.TargetReference{ - // APIVersion: KruiseKindCloneSet.GroupVersion().String(), - // Kind: KruiseKindCloneSet.Kind, - // Name: cloneSet.Name, - // } - // subset1 := appsv1alpha1.WorkloadSpreadSubset{ - // Name: "zone-a", - // RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ - // MatchExpressions: []corev1.NodeSelectorRequirement{ - // { - // Key: WorkloadSpreadFakeZoneKey, - // Operator: corev1.NodeSelectorOpIn, - // Values: []string{"zone-a"}, - // }, - // }, - // }, - // MaxReplicas: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, - // Patch: runtime.RawExtension{ - // Raw: []byte(`{"metadata":{"annotations":{"subset":"zone-a"}}}`), - // }, - // } - // subset2 := appsv1alpha1.WorkloadSpreadSubset{ - // Name: "zone-b", - // RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ - // MatchExpressions: []corev1.NodeSelectorRequirement{ - // { - // Key: WorkloadSpreadFakeZoneKey, - // Operator: corev1.NodeSelectorOpIn, - // Values: []string{"zone-b"}, - // }, - // }, - // }, - // MaxReplicas: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, - // Patch: runtime.RawExtension{ - // Raw: []byte(`{"metadata":{"annotations":{"subset":"zone-b"}}}`), - // }, - // } - // workloadSpread := tester.NewWorkloadSpread(ns, workloadSpreadName, &targetRef, []appsv1alpha1.WorkloadSpreadSubset{subset1, subset2}) - // workloadSpread = tester.CreateWorkloadSpread(workloadSpread) - // - // // create cloneset, replicas = 2 - // cloneSet = tester.CreateCloneSet(cloneSet) - // tester.WaitForCloneSetRunning(cloneSet) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get cloneSet(%s/%s) pods, and check workloadSpread(%s/%s) status", cloneSet.Namespace, cloneSet.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err := tester.GetSelectorPods(cloneSet.Namespace, cloneSet.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(2)) - // subset1Pods := 0 - // subset2Pods := 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // 
gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(1)) - // gomega.Expect(subset2Pods).To(gomega.Equal(1)) - // - // // update cloneset image - // ginkgo.By(fmt.Sprintf("update cloneSet(%s/%s) image=%s", cloneSet.Namespace, cloneSet.Name, NewWebserverImage)) - // cloneSet.Spec.Template.Spec.Containers[0].Image = NewWebserverImage - // tester.UpdateCloneSet(cloneSet) - // tester.WaitForCloneSetRunning(cloneSet) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get cloneSet(%s/%s) pods, and check workloadSpread(%s/%s) status", cloneSet.Namespace, cloneSet.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err = tester.GetSelectorPods(cloneSet.Namespace, cloneSet.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(2)) - // subset1Pods = 0 - // subset2Pods = 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(1)) - // gomega.Expect(subset2Pods).To(gomega.Equal(1)) - // - // //scale up cloneSet.replicas = 6 - // ginkgo.By(fmt.Sprintf("scale up cloneSet(%s/%s) replicas=6", cloneSet.Namespace, cloneSet.Name)) - // cloneSet.Spec.Replicas = ptr.To(int32(6)) - // tester.UpdateCloneSet(cloneSet) - // tester.WaitForCloneSetRunning(cloneSet) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get cloneSet(%s/%s) pods, and check workloadSpread(%s/%s) status", cloneSet.Namespace, cloneSet.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err = tester.GetSelectorPods(cloneSet.Namespace, cloneSet.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(6)) - // subset1Pods = 0 - // subset2Pods = 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var 
injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset == subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(3)) - // gomega.Expect(subset2Pods).To(gomega.Equal(3)) - // - // workloadSpread, err = kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(workloadSpread.Name, metav1.GetOptions{}) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // - // gomega.Expect(workloadSpread.Status.SubsetStatuses[0].Name).To(gomega.Equal(workloadSpread.Spec.Subsets[0].Name)) - // gomega.Expect(workloadSpread.Status.SubsetStatuses[0].MissingReplicas).To(gomega.Equal(int32(0))) - // gomega.Expect(len(workloadSpread.Status.SubsetStatuses[0].CreatingPods)).To(gomega.Equal(0)) - // gomega.Expect(len(workloadSpread.Status.SubsetStatuses[0].DeletingPods)).To(gomega.Equal(0)) - // - // //scale down cloneSet.replicas = 2 - // ginkgo.By(fmt.Sprintf("scale down cloneSet(%s/%s) replicas=2", cloneSet.Namespace, cloneSet.Name)) - // cloneSet.Spec.Replicas = ptr.To(int32(2)) - // tester.UpdateCloneSet(cloneSet) - // tester.WaitForCloneSetRunning(cloneSet) - // - // // get pods, and check workloadSpread - // ginkgo.By(fmt.Sprintf("get cloneSet(%s/%s) pods, and check workloadSpread(%s/%s) status", cloneSet.Namespace, cloneSet.Name, workloadSpread.Namespace, workloadSpread.Name)) - // pods, err = tester.GetSelectorPods(cloneSet.Namespace, cloneSet.Spec.Selector) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // gomega.Expect(pods).To(gomega.HaveLen(2)) - // subset1Pods = 0 - // subset2Pods = 0 - // for _, pod := range pods { - // if str, ok := pod.Annotations[workloadspread.MatchedWorkloadSpreadSubsetAnnotations]; ok { - // var injectWorkloadSpread *workloadspread.InjectWorkloadSpread - // err := json.Unmarshal([]byte(str), &injectWorkloadSpread) - // gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // if injectWorkloadSpread.Subset == subset1.Name { - // subset1Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset1.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("200")) - // } else if injectWorkloadSpread.Subset 
== subset2.Name { - // subset2Pods++ - // gomega.Expect(injectWorkloadSpread.Name).To(gomega.Equal(workloadSpread.Name)) - // gomega.Expect(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(gomega.Equal(subset2.RequiredNodeSelectorTerm.MatchExpressions)) - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("100")) - // } - // } else { - // // others PodDeletionCostAnnotation not set - // gomega.Expect(pod.Annotations[workloadspread.PodDeletionCostAnnotation]).To(gomega.Equal("")) - // } - // } - // gomega.Expect(subset1Pods).To(gomega.Equal(1)) - // gomega.Expect(subset2Pods).To(gomega.Equal(1)) - // - // ginkgo.By("deploy in two zone, maxReplicas=50%, done") - //}) + framework.ConformanceIt("job-like custom workload", func() { + newTargetReference := func(name string, apiVersion, kind string) *appsv1alpha1.TargetReference { + return &appsv1alpha1.TargetReference{ + APIVersion: apiVersion, + Kind: kind, + Name: name, + } + } + newSubsets := func(aMax, bMax intstr.IntOrString) []appsv1alpha1.WorkloadSpreadSubset { + return []appsv1alpha1.WorkloadSpreadSubset{ + { + Name: "subset-a", + MaxReplicas: &aMax, + }, + { + Name: "subset-b", + MaxReplicas: &bMax, + }, + } + } + checkWorkloadSpread := func(ws *appsv1alpha1.WorkloadSpread, replicasA, missA, replicasB, missB int) func(gomega.Gomega) { + return func(g gomega.Gomega) { + ws, err := tester.GetWorkloadSpread(ws.Namespace, ws.Name) + g.Expect(err).NotTo(gomega.HaveOccurred()) + statuses := ws.Status.SubsetStatuses + g.Expect(len(statuses)).To(gomega.BeEquivalentTo(2)) + g.Expect(statuses[0].Replicas).To(gomega.BeEquivalentTo(replicasA)) + g.Expect(statuses[0].MissingReplicas).To(gomega.BeEquivalentTo(missA)) + g.Expect(statuses[1].Replicas).To(gomega.BeEquivalentTo(replicasB)) + g.Expect(statuses[1].MissingReplicas).To(gomega.BeEquivalentTo(missB)) + } + } + ginkgo.By("invalid targetFilter") + ws := tester.NewWorkloadSpread(ns, "ws-invalid-target-filter", newTargetReference("invalid-target-filter", "apps.kruise.io/v1alpha1", "DaemonSet"), newSubsets(intstr.FromInt32(2), intstr.FromInt32(5))) + ws.Spec.TargetFilter = &appsv1alpha1.TargetFilter{ + Selector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "key", + Operator: metav1.LabelSelectorOpIn, + }, + }, + }, + } + _, err := tester.KC.AppsV1alpha1().WorkloadSpreads(ws.Namespace).Create(context.Background(), ws, metav1.CreateOptions{}) + gomega.Expect(err).To(gomega.HaveOccurred()) + + ginkgo.By("custom workload with no replicas: replicas 5, 5 => 2/2, 1/5") + ws = tester.NewWorkloadSpread(ns, "ws-no-replicas", newTargetReference("no-replicas", "apps.kruise.io/v1alpha1", "DaemonSet"), newSubsets(intstr.FromInt32(2), intstr.FromInt32(5))) + tester.CreateWorkloadSpread(ws) + ads := tester.NewBaseDaemonSet("no-replicas", ns) + tester.CreateDaemonSet(ads) + gomega.Eventually(checkWorkloadSpread(ws, 2, 0, 1, 4)).WithTimeout(time.Minute).WithPolling(time.Second).Should(gomega.Succeed()) + + ginkgo.By("custom workload with replicas path in whitelist (which is MASTER), want replicas 5 and 1 ps + 3 master created (pods not filtered), subset replicas 20%, 80% => 1/1, 3/4") + ws = tester.NewWorkloadSpread(ns, "ws-replicas-whitelist", newTargetReference("replicas-whitelist", "kubeflow.org/v1", "TFJob"), newSubsets(intstr.FromString("20%"), intstr.FromString("80%"))) + tfjob := tester.NewTFJob("replicas-whitelist", ns, 1, 5, 0) + 
tester.CreateWorkloadSpread(ws) + tester.CreateTFJob(tfjob, 1, 3, 0) + gomega.Eventually(checkWorkloadSpread(ws, 1, 0, 3, 1)).WithTimeout(time.Minute).WithPolling(time.Second).Should(gomega.Succeed()) + + ginkgo.By("custom workload with target filter (which is worker), want replicas 4 and 1 ps + 1 master + 2 worker created (pods filtered), subset replicas 25%, 75% => 1/1, 1/3") + ws = tester.NewWorkloadSpread(ns, "ws-with-filter", newTargetReference("with-filter", "kubeflow.org/v1", "TFJob"), newSubsets(intstr.FromString("25%"), intstr.FromString("75%"))) + ws.Spec.TargetFilter = &appsv1alpha1.TargetFilter{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "role": "worker", + }, + }, + ReplicasPathList: []string{ + "spec.tfReplicaSpecs.Worker.replicas", + }, + } + tester.CreateWorkloadSpread(ws) + tfjob = tester.NewTFJob("with-filter", ns, 1, 1, 4) + tester.CreateTFJob(tfjob, 1, 1, 2) + gomega.Eventually(checkWorkloadSpread(ws, 1, 0, 1, 2)).WithTimeout(time.Minute).WithPolling(time.Second).Should(gomega.Succeed()) + }) //test k8s cluster version >= 1.21 ginkgo.It("elastic deploy for deployment, zone-a=2, zone-b=nil", func() { diff --git a/test/e2e/framework/workloadspread_util.go b/test/e2e/framework/workloadspread_util.go index 962b5d5338..e72d5a772e 100644 --- a/test/e2e/framework/workloadspread_util.go +++ b/test/e2e/framework/workloadspread_util.go @@ -18,36 +18,41 @@ package framework import ( "context" + "fmt" "time" "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/util" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" kubecontroller "k8s.io/kubernetes/pkg/controller" imageutils "k8s.io/kubernetes/test/utils/image" "k8s.io/utils/ptr" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" - "github.com/openkruise/kruise/pkg/util" ) type WorkloadSpreadTester struct { C clientset.Interface - kc kruiseclientset.Interface + KC kruiseclientset.Interface + dc dynamic.Interface } -func NewWorkloadSpreadTester(c clientset.Interface, kc kruiseclientset.Interface) *WorkloadSpreadTester { +func NewWorkloadSpreadTester(c clientset.Interface, kc kruiseclientset.Interface, dc dynamic.Interface) *WorkloadSpreadTester { return &WorkloadSpreadTester{ C: c, - kc: kc, + KC: kc, + dc: dc, } } @@ -231,6 +236,64 @@ func (t *WorkloadSpreadTester) NewBaseDeployment(namespace string) *appsv1.Deplo } } +func (t *WorkloadSpreadTester) NewTFJob(name, namespace string, ps, master, worker int64) *unstructured.Unstructured { + un := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "spec": map[string]interface{}{ + "tfReplicaSpecs": map[string]interface{}{ + "PS": map[string]interface{}{ + "replicas": ps, + }, + "MASTER": map[string]interface{}{ + "replicas": master, + }, + "Worker": map[string]interface{}{ + "replicas": worker, + }, + }, + }, + }, + } + 
un.SetAPIVersion("kubeflow.org/v1") + un.SetKind("TFJob") + un.SetNamespace(namespace) + un.SetName(name) + return un +} + +func (t *WorkloadSpreadTester) NewBaseDaemonSet(name, namespace string) *appsv1alpha1.DaemonSet { + return &appsv1alpha1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1alpha1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": name, + }, + }, + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: ptr.To(int64(0)), + Containers: []corev1.Container{ + { + Name: "main", + Image: imageutils.GetE2EImage(imageutils.Httpd), + Command: []string{"/bin/sh", "-c", "sleep 10000000"}, + }, + }, + }, + }, + }, + } +} + func (t *WorkloadSpreadTester) SetNodeLabel(c clientset.Interface, node *corev1.Node, key, value string) { labels := node.GetLabels() if labels == nil { @@ -251,29 +314,31 @@ func (t *WorkloadSpreadTester) SetNodeLabel(c clientset.Interface, node *corev1. } func (t *WorkloadSpreadTester) CreateWorkloadSpread(workloadSpread *appsv1alpha1.WorkloadSpread) *appsv1alpha1.WorkloadSpread { - Logf("create WorkloadSpread (%s/%s)", workloadSpread.Namespace, workloadSpread.Name) - _, err := t.kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Create(context.TODO(), workloadSpread, metav1.CreateOptions{}) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - t.WaitForWorkloadSpreadRunning(workloadSpread) - Logf("create workloadSpread (%s/%s) success", workloadSpread.Namespace, workloadSpread.Name) - workloadSpread, _ = t.kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) + gomega.Eventually(func(g gomega.Gomega) { + Logf("create WorkloadSpread (%s/%s)", workloadSpread.Namespace, workloadSpread.Name) + _, err := t.KC.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Create(context.TODO(), workloadSpread, metav1.CreateOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + t.WaitForWorkloadSpreadRunning(workloadSpread) + Logf("create workloadSpread (%s/%s) success", workloadSpread.Namespace, workloadSpread.Name) + workloadSpread, _ = t.KC.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) + }).WithTimeout(20 * time.Second).WithPolling(time.Second).Should(gomega.Succeed()) return workloadSpread } func (t *WorkloadSpreadTester) GetWorkloadSpread(namespace, name string) (*appsv1alpha1.WorkloadSpread, error) { Logf("Get WorkloadSpread (%s/%s)", namespace, name) - return t.kc.AppsV1alpha1().WorkloadSpreads(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + return t.KC.AppsV1alpha1().WorkloadSpreads(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } func (t *WorkloadSpreadTester) GetCloneSet(namespace, name string) (*appsv1alpha1.CloneSet, error) { Logf("Get CloneSet (%s/%s)", namespace, name) - return t.kc.AppsV1alpha1().CloneSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + return t.KC.AppsV1alpha1().CloneSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } func (t *WorkloadSpreadTester) WaitForWorkloadSpreadRunning(ws *appsv1alpha1.WorkloadSpread) { pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute*5, true, func(ctx context.Context) (bool, error) { - inner, err := t.kc.AppsV1alpha1().WorkloadSpreads(ws.Namespace).Get(context.TODO(), 
ws.Name, metav1.GetOptions{}) + inner, err := t.KC.AppsV1alpha1().WorkloadSpreads(ws.Namespace).Get(context.TODO(), ws.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -289,19 +354,19 @@ func (t *WorkloadSpreadTester) WaitForWorkloadSpreadRunning(ws *appsv1alpha1.Wor func (t *WorkloadSpreadTester) CreateCloneSet(cloneSet *appsv1alpha1.CloneSet) *appsv1alpha1.CloneSet { Logf("create CloneSet (%s/%s)", cloneSet.Namespace, cloneSet.Name) - _, err := t.kc.AppsV1alpha1().CloneSets(cloneSet.Namespace).Create(context.TODO(), cloneSet, metav1.CreateOptions{}) + _, err := t.KC.AppsV1alpha1().CloneSets(cloneSet.Namespace).Create(context.TODO(), cloneSet, metav1.CreateOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) Logf("create cloneSet (%s/%s) success", cloneSet.Namespace, cloneSet.Name) - cloneSet, _ = t.kc.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) + cloneSet, _ = t.KC.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) return cloneSet } func (t *WorkloadSpreadTester) CreateStatefulSet(statefulSet *appsv1alpha1.StatefulSet) *appsv1beta1.StatefulSet { Logf("create statefulSet (%s/%s)", statefulSet.Namespace, statefulSet.Name) - _, err := t.kc.AppsV1alpha1().StatefulSets(statefulSet.Namespace).Create(context.TODO(), statefulSet, metav1.CreateOptions{}) + _, err := t.KC.AppsV1alpha1().StatefulSets(statefulSet.Namespace).Create(context.TODO(), statefulSet, metav1.CreateOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) Logf("create statefulSet (%s/%s) success", statefulSet.Namespace, statefulSet.Name) - asts, _ := t.kc.AppsV1beta1().StatefulSets(statefulSet.Namespace).Get(context.TODO(), statefulSet.Name, metav1.GetOptions{}) + asts, _ := t.KC.AppsV1beta1().StatefulSets(statefulSet.Namespace).Get(context.TODO(), statefulSet.Name, metav1.GetOptions{}) return asts } @@ -329,10 +394,75 @@ func (t *WorkloadSpreadTester) CreateJob(job *batchv1.Job) *batchv1.Job { return job } +func (t *WorkloadSpreadTester) CreateTFJob(obj *unstructured.Unstructured, ps, master, worker int) { + Logf("create TFJob (%s/%s)", obj.GetNamespace(), obj.GetName()) + // create unstructured with dynamic client + obj, err := t.dc.Resource(schema.GroupVersionResource{ + Group: "kubeflow.org", + Version: "v1", + Resource: "tfjobs", + }).Namespace(obj.GetNamespace()).Create(context.Background(), obj, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + // ger PS, MASTER and Worker replicas from obj + Logf("creating fake pods: PS %d, MASTER %d, Worker: %d", ps, master, worker) + createPod := func(name string, labels map[string]string) { + fakePod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + Name: name, + Namespace: obj.GetNamespace(), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "kubeflow.org/v1", + BlockOwnerDeletion: ptr.To(true), + Controller: ptr.To(true), + Kind: "TFJob", + Name: obj.GetName(), + UID: obj.GetUID(), + }, + }, + }, + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: ptr.To(int64(0)), + Containers: []corev1.Container{ + { + Name: "main", + Image: imageutils.GetE2EImage(imageutils.Httpd), + Command: []string{"/bin/sh", "-c", "sleep 10000000"}, + }, + }, + }, + } + _, err = t.C.CoreV1().Pods(obj.GetNamespace()).Create(context.Background(), fakePod, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + for i := 0; i < ps; i++ { + createPod(fmt.Sprintf("tfjob-%s-ps-%d", obj.GetName(), i), 
map[string]string{"app": "tfjob", "role": "ps"}) + } + for i := 0; i < master; i++ { + createPod(fmt.Sprintf("tfjob-%s-master-%d", obj.GetName(), i), map[string]string{"app": "tfjob", "role": "master"}) + } + for i := 0; i < worker; i++ { + createPod(fmt.Sprintf("fake-tfjob-worker-%d", i), map[string]string{"app": "tfjob", "role": "worker"}) + } +} + +func (t *WorkloadSpreadTester) CreateDaemonSet(ads *appsv1alpha1.DaemonSet) *appsv1alpha1.DaemonSet { + Logf("create DaemonSet (%s/%s)", ads.Namespace, ads.Name) + _, err := t.KC.AppsV1alpha1().DaemonSets(ads.Namespace).Create(context.Background(), ads, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func(g gomega.Gomega) { + ads, err = t.KC.AppsV1alpha1().DaemonSets(ads.Namespace).Get(context.Background(), ads.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(ads.Status.NumberReady).To(gomega.BeEquivalentTo(3)) + }).WithTimeout(time.Minute).WithPolling(time.Second).Should(gomega.Succeed()) + return ads +} + func (t *WorkloadSpreadTester) WaitForCloneSetRunning(cloneSet *appsv1alpha1.CloneSet) { pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute*10, true, func(ctx context.Context) (bool, error) { - inner, err := t.kc.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) + inner, err := t.KC.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -351,7 +481,7 @@ func (t *WorkloadSpreadTester) WaitForCloneSetRunning(cloneSet *appsv1alpha1.Clo func (t *WorkloadSpreadTester) WaitForStatefulSetRunning(statefulSet *appsv1beta1.StatefulSet) { pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute*10, true, func(ctx context.Context) (bool, error) { - inner, err := t.kc.AppsV1beta1().StatefulSets(statefulSet.Namespace).Get(context.TODO(), statefulSet.Name, metav1.GetOptions{}) + inner, err := t.KC.AppsV1beta1().StatefulSets(statefulSet.Namespace).Get(context.TODO(), statefulSet.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -370,7 +500,7 @@ func (t *WorkloadSpreadTester) WaitForStatefulSetRunning(statefulSet *appsv1beta func (t *WorkloadSpreadTester) WaitForCloneSetRunReplicas(cloneSet *appsv1alpha1.CloneSet, replicas int32) { pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute*5, true, func(ctx context.Context) (bool, error) { - inner, err := t.kc.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) + inner, err := t.KC.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -442,10 +572,10 @@ func (t *WorkloadSpreadTester) GetSelectorPods(namespace string, selector *metav func (t *WorkloadSpreadTester) UpdateCloneSet(cloneSet *appsv1alpha1.CloneSet) { //goland:noinspection SqlNoDataSourceInspection Logf("update cloneSet (%s/%s)", cloneSet.Namespace, cloneSet.Name) - clone, _ := t.kc.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) + clone, _ := t.KC.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { clone.Spec = cloneSet.Spec - _, updateErr := t.kc.AppsV1alpha1().CloneSets(clone.Namespace).Update(context.TODO(), clone, 
metav1.UpdateOptions{}) + _, updateErr := t.KC.AppsV1alpha1().CloneSets(clone.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) if updateErr == nil { return nil } @@ -472,7 +602,7 @@ func (t *WorkloadSpreadTester) UpdateDeployment(deployment *appsv1.Deployment) { func (t *WorkloadSpreadTester) WaiteCloneSetUpdate(cloneSet *appsv1alpha1.CloneSet) { pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute*5, true, func(ctx context.Context) (bool, error) { - inner, err := t.kc.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) + inner, err := t.KC.AppsV1alpha1().CloneSets(cloneSet.Namespace).Get(context.TODO(), cloneSet.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -513,19 +643,19 @@ func (t *WorkloadSpreadTester) WaiteDeploymentUpdate(deployment *appsv1.Deployme func (t *WorkloadSpreadTester) UpdateWorkloadSpread(workloadSpread *appsv1alpha1.WorkloadSpread) { Logf("update workloadSpread (%s/%s)", workloadSpread.Namespace, workloadSpread.Name) - clone, _ := t.kc.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) + clone, _ := t.KC.AppsV1alpha1().WorkloadSpreads(workloadSpread.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { clone.Spec = workloadSpread.Spec - _, updateErr := t.kc.AppsV1alpha1().WorkloadSpreads(clone.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) + _, updateErr := t.KC.AppsV1alpha1().WorkloadSpreads(clone.Namespace).Update(context.TODO(), clone, metav1.UpdateOptions{}) if updateErr == nil { return nil } - clone, _ = t.kc.AppsV1alpha1().WorkloadSpreads(clone.Namespace).Get(context.TODO(), clone.Name, metav1.GetOptions{}) + clone, _ = t.KC.AppsV1alpha1().WorkloadSpreads(clone.Namespace).Get(context.TODO(), clone.Name, metav1.GetOptions{}) return updateErr }) gomega.Expect(err).NotTo(gomega.HaveOccurred()) pollErr := wait.PollUntilContextTimeout(context.Background(), time.Second, time.Minute*2, true, func(ctx context.Context) (bool, error) { - clone, err = t.kc.AppsV1alpha1().WorkloadSpreads(clone.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) + clone, err = t.KC.AppsV1alpha1().WorkloadSpreads(clone.Namespace).Get(context.TODO(), workloadSpread.Name, metav1.GetOptions{}) if clone.Generation == clone.Status.ObservedGeneration { return true, nil } diff --git a/test/kruise-e2e-config.yaml b/test/kruise-e2e-config.yaml new file mode 100644 index 0000000000..6197e36bab --- /dev/null +++ b/test/kruise-e2e-config.yaml @@ -0,0 +1,43 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: kruise-configuration + namespace: kruise-system +data: + "WorkloadSpread_Watch_Custom_Workload_WhiteList": | + { + "workloads": [ + { + "Group": "kubeflow.org", + "Version": "v1", + "Kind": "TFJob", + "ReplicasPath": "spec.tfReplicaSpecs.MASTER.replicas" + }, + { + "Group": "apps.kruise.io", + "Kind": "DaemonSet" + } + ] + } +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kruise-e2e-access +rules: + - apiGroups: [ "kubeflow.org" ] + resources: [ "tfjobs" ] + verbs: [ "get" ] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kruise-e2e-access-binding +subjects: + - kind: ServiceAccount + name: kruise-manager + namespace: kruise-system +roleRef: + kind: ClusterRole + name: kruise-e2e-access + apiGroup: 
rbac.authorization.k8s.io \ No newline at end of file diff --git a/tools/hack/run-kruise-e2e-test.sh b/tools/hack/run-kruise-e2e-test.sh index 0c7811d2f8..1baeafa8ff 100755 --- a/tools/hack/run-kruise-e2e-test.sh +++ b/tools/hack/run-kruise-e2e-test.sh @@ -18,11 +18,9 @@ set -ex export KUBECONFIG=${HOME}/.kube/config make ginkgo set +e -#./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] InplaceVPA' test/e2e -#./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] CloneSet' test/e2e +echo "installing tfjobs crds" +kubectl apply -f https://raw.githubusercontent.com/kubeflow/training-operator/refs/heads/v1.8-branch/manifests/base/crds/kubeflow.org_tfjobs.yaml ./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] StatefulSet' test/e2e -#./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] (CloneSet|InplaceVPA)' test/e2e - retVal=$? restartCount=$(kubectl get pod -n kruise-system -l control-plane=controller-manager --no-headers | awk '{print $4}') if [ "${restartCount}" -eq "0" ];then
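For reference on the subset expectations used in the TFJob e2e cases above (e.g. "20%, 80% => 1/1, 3/4" against the 5 replicas resolved from spec.tfReplicaSpecs.MASTER.replicas), the percentage maxReplicas values scale against that resolved total. A quick standalone check of the arithmetic, using the stock intstr helper rather than anything introduced by this change:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// Total replicas as resolved from spec.tfReplicaSpecs.MASTER.replicas in the
	// whitelist case of the e2e test above.
	total := 5
	for _, pct := range []string{"20%", "80%"} {
		max := intstr.FromString(pct)
		// Both percentages divide 5 exactly, so round-up vs round-down is moot here.
		quota, err := intstr.GetScaledValueFromIntOrPercent(&max, total, true)
		if err != nil {
			panic(err)
		}
		fmt.Printf("maxReplicas %s of %d => quota %d\n", pct, total, quota)
	}
	// Prints quotas 1 and 4, matching the 1/1 and 3/1 (Replicas/MissingReplicas)
	// values asserted via checkWorkloadSpread(ws, 1, 0, 3, 1) in the test.
}
```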