Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,188 @@
package dataflowaffinity

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"reflect"
"testing"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func TestDataOpJobReconciler_injectPodNodeLabelsToJob(t *testing.T) {
type args struct {
job *batchv1.Job
pods *v1.Pod
node *v1.Node
}
tests := []struct {
name string
args args
wantAnnotations map[string]string
wantErr bool
}{
{
name: "job with succeed pods",
args: args{
job: &batchv1.Job{
var _ = Describe("DataOpJobReconciler", func() {
var testScheme *runtime.Scheme

BeforeEach(func() {
testScheme = runtime.NewScheme()
Expect(v1.AddToScheme(testScheme)).To(Succeed())
Expect(batchv1.AddToScheme(testScheme)).To(Succeed())
Expect(datav1alpha1.AddToScheme(testScheme)).To(Succeed())
})

Describe("ControllerName", func() {
It("returns the controller name constant", func() {
f := &DataOpJobReconciler{Log: fake.NullLogger()}
Expect(f.ControllerName()).To(Equal(DataOpJobControllerName))
})
})

Describe("ManagedResource", func() {
It("returns a batchv1.Job object", func() {
f := &DataOpJobReconciler{Log: fake.NullLogger()}
obj := f.ManagedResource()
Expect(obj).To(BeAssignableToTypeOf(&batchv1.Job{}))
})
})

Describe("NewDataOpJobReconciler", func() {
It("constructs a reconciler with the given client, logger, and recorder", func() {
c := fake.NewFakeClientWithScheme(testScheme)
logger := fake.NullLogger()
r := NewDataOpJobReconciler(c, logger, nil)
Expect(r).NotTo(BeNil())
Expect(r.Client).To(Equal(c))
})
})

Describe("Reconcile", func() {
Context("when the job does not exist", func() {
It("returns an error (not-found propagates)", func() {
c := fake.NewFakeClientWithScheme(testScheme)
f := &DataOpJobReconciler{Client: c, Log: fake.NullLogger()}
_, err := f.Reconcile(context.TODO(), reconcile.Request{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

It is a better practice to use context.Background() instead of context.TODO() in tests when a background, non-cancellable context is needed. context.TODO() is intended as a temporary placeholder. Please replace all instances of context.TODO() with context.Background() in this file for consistency and clarity.

Suggested change
_, err := f.Reconcile(context.TODO(), reconcile.Request{
_, err := f.Reconcile(context.Background(), reconcile.Request{

NamespacedName: types.NamespacedName{Name: "missing-job", Namespace: "default"},
})
Expect(err).To(HaveOccurred())
})
})

Context("when job should not be in queue (cronjob label)", func() {
It("returns no-requeue without error", func() {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "cron-job",
Namespace: "default",
Labels: map[string]string{
common.LabelAnnotationManagedBy: common.Fluid,
"cronjob": "something",
},
},
}
c := fake.NewFakeClientWithScheme(testScheme, job)
f := &DataOpJobReconciler{Client: c, Log: fake.NullLogger()}
result, err := f.Reconcile(context.TODO(), reconcile.Request{
NamespacedName: types.NamespacedName{Name: "cron-job", Namespace: "default"},
})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())
})
})

Context("when job is a valid fluid job without affinity annotation", func() {
It("injects the dataflow affinity annotation and returns no-requeue", func() {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",

Check failure on line 109 in pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "test-job" 4 times.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZzTffkFNQvIVGdB_p2j&open=AZzTffkFNQvIVGdB_p2j&pullRequest=5685
Namespace: "default",
Labels: map[string]string{
common.LabelAnnotationManagedBy: common.Fluid,
},
},
}
c := fake.NewFakeClientWithScheme(testScheme, job)
f := &DataOpJobReconciler{Client: c, Log: fake.NullLogger()}
result, err := f.Reconcile(context.TODO(), reconcile.Request{
NamespacedName: types.NamespacedName{Name: "test-job", Namespace: "default"},
})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

updatedJob := &batchv1.Job{}
Expect(c.Get(context.TODO(), types.NamespacedName{Name: "test-job", Namespace: "default"}, updatedJob)).To(Succeed())
Expect(updatedJob.Annotations).To(HaveKeyWithValue(common.AnnotationDataFlowAffinityInject, "true"))
})
})

Context("when job is complete and has a succeeded pod", func() {
It("injects node labels and returns no-requeue", func() {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "complete-job",

Check failure on line 134 in pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "complete-job" 3 times.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZzTffkFNQvIVGdB_p2k&open=AZzTffkFNQvIVGdB_p2k&pullRequest=5685
Namespace: "default",
Labels: map[string]string{
common.LabelAnnotationManagedBy: common.Fluid,
},
Annotations: map[string]string{
common.AnnotationDataFlowAffinityInject: "true",
},
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"controller-uid": "abc-123",

Check failure on line 146 in pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/dataflowaffinity_controller_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "controller-uid" 6 times.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZzTffkFNQvIVGdB_p2l&open=AZzTffkFNQvIVGdB_p2l&pullRequest=5685
},
},
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{
{Type: batchv1.JobComplete},
},
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "complete-pod",
Namespace: "default",
Labels: map[string]string{
"controller-uid": "abc-123",
},
},
Spec: v1.PodSpec{
NodeName: "node01",
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node01",
Labels: map[string]string{
common.K8sNodeNameLabelKey: "node01",
common.K8sRegionLabelKey: "region01",
common.K8sZoneLabelKey: "zone01",
},
},
}
c := fake.NewFakeClientWithScheme(testScheme, job, pod, node)
f := &DataOpJobReconciler{Client: c, Log: fake.NullLogger()}
result, err := f.Reconcile(context.TODO(), reconcile.Request{
NamespacedName: types.NamespacedName{Name: "complete-job", Namespace: "default"},
})
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())

updatedJob := &batchv1.Job{}
Expect(c.Get(context.TODO(), types.NamespacedName{Name: "complete-job", Namespace: "default"}, updatedJob)).To(Succeed())
Expect(updatedJob.Annotations).To(HaveKeyWithValue(common.AnnotationDataFlowCustomizedAffinityPrefix+common.K8sNodeNameLabelKey, "node01"))
Expect(updatedJob.Annotations).To(HaveKeyWithValue(common.AnnotationDataFlowCustomizedAffinityPrefix+common.K8sRegionLabelKey, "region01"))
Expect(updatedJob.Annotations).To(HaveKeyWithValue(common.AnnotationDataFlowCustomizedAffinityPrefix+common.K8sZoneLabelKey, "zone01"))
})
})
})

Describe("injectPodNodeLabelsToJob", func() {
Context("when job has a succeeded pod", func() {
It("should inject node labels as annotations onto the job", func() {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Labels: map[string]string{
Expand All @@ -57,8 +212,8 @@
},
},
},
},
pods: &v1.Pod{
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Labels: map[string]string{
Expand Down Expand Up @@ -91,8 +246,8 @@
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
},
node: &v1.Node{
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node01",
Labels: map[string]string{
Expand All @@ -102,29 +257,42 @@
"k8s.gpu": "true",
},
},
},
},
wantAnnotations: map[string]string{
common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sNodeNameLabelKey: "node01",
common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sRegionLabelKey: "region01",
common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sZoneLabelKey: "zone01",
common.AnnotationDataFlowCustomizedAffinityPrefix + "k8s.gpu": "true",
},
wantErr: false,
},
{
name: "job with failed pods",
args: args{
job: &batchv1.Job{
}

c := fake.NewFakeClientWithScheme(testScheme, job, pod, node)
f := &DataOpJobReconciler{
Client: c,
Log: fake.NullLogger(),
}

err := f.injectPodNodeLabelsToJob(job)
Expect(err).NotTo(HaveOccurred())

wantAnnotations := map[string]string{
common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sNodeNameLabelKey: "node01",
common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sRegionLabelKey: "region01",
common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sZoneLabelKey: "zone01",
common.AnnotationDataFlowCustomizedAffinityPrefix + "k8s.gpu": "true",
}
Expect(job.Annotations).To(Equal(wantAnnotations))
})
})

Context("when job has only a failed pod", func() {
It("should return an error", func() {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job-failed",
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"controller-uid": "455afc34-93b1-4e75-a6fa-8e13d2c6ca06",
},
},
},
},
pods: &v1.Pod{
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Labels: map[string]string{
Expand All @@ -134,41 +302,25 @@
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
},
node: &v1.Node{
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node01",
Labels: map[string]string{
common.K8sNodeNameLabelKey: "node01",
common.K8sRegionLabelKey: "region01",
common.K8sZoneLabelKey: "zone01",
"k8s.gpu": "true",
},
},
},
},
wantErr: true,
},
}
testScheme := runtime.NewScheme()
_ = v1.AddToScheme(testScheme)
_ = batchv1.AddToScheme(testScheme)
_ = datav1alpha1.AddToScheme(testScheme)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var c = fake.NewFakeClientWithScheme(testScheme, tt.args.job, tt.args.pods, tt.args.node)

f := &DataOpJobReconciler{
Client: c,
Log: fake.NullLogger(),
}
err := f.injectPodNodeLabelsToJob(tt.args.job)
if (err != nil) != tt.wantErr {
t.Errorf("injectPodNodeLabelsToJob() error = %v, wantErr %v", err, tt.wantErr)
}
if err == nil && !reflect.DeepEqual(tt.args.job.Annotations, tt.wantAnnotations) {
t.Errorf("injectPodNodeLabelsToJob() got = %v, want %v", tt.args.job.Labels, tt.wantAnnotations)
}
}

c := fake.NewFakeClientWithScheme(testScheme, job, pod, node)
f := &DataOpJobReconciler{
Client: c,
Log: fake.NullLogger(),
}

err := f.injectPodNodeLabelsToJob(job)
Expect(err).To(HaveOccurred())
})
})
}
}
})
})
29 changes: 29 additions & 0 deletions pkg/controllers/v1alpha1/fluidapp/dataflowaffinity/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2024 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dataflowaffinity

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestDataflowaffinity(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Dataflowaffinity Suite")
}
6 changes: 3 additions & 3 deletions test/gha-e2e/jindo/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ function panic() {

function setup_minio() {
kubectl create -f test/gha-e2e/jindo/minio.yaml
minio_pod=$(kubectl get pod -oname | grep minio)
kubectl wait --for=condition=Ready $minio_pod
kubectl wait --for=condition=Ready --timeout=180s -l app=minio pod

kubectl exec -it $minio_pod -- /bin/bash -c 'mc alias set myminio http://127.0.0.1:9000 minioadmin minioadmin && mc mb myminio/mybucket && echo "helloworld" > testfile && mc mv testfile myminio/mybucket/subpath/testfile && mc cat myminio/mybucket/subpath/testfile'
minio_pod=$(kubectl get pod -l app=minio -oname)
kubectl exec $minio_pod -- /bin/bash -c 'mc alias set myminio http://127.0.0.1:9000 minioadmin minioadmin && mc mb myminio/mybucket && echo "helloworld" > testfile && mc mv testfile myminio/mybucket/subpath/testfile && mc cat myminio/mybucket/subpath/testfile'
}

function create_dataset() {
Expand Down
Loading