Skip to content

Commit c5cedc5

Browse files
committed
[RayCluster] fast-fail in RayClusterScaleExpectation's IsSatisfied
1 parent 58c2aad commit c5cedc5

File tree

2 files changed

+144
-26
lines changed

2 files changed

+144
-26
lines changed

ray-operator/controllers/ray/expectations/scale_expectations.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,44 +70,47 @@ func (r *rayClusterScaleExpectationImpl) ExpectScalePod(namespace, rayClusterNam
7070
}
7171
}
7272

73+
func (r *rayClusterScaleExpectationImpl) isPodScaled(ctx context.Context, rp *rayPod, namespace string) bool {
74+
pod := &corev1.Pod{}
75+
switch rp.action {
76+
case Create:
77+
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
78+
return true
79+
}
80+
// Tolerating extreme case:
81+
// The first reconciliation created a Pod. If the Pod was quickly deleted from etcd by another component
82+
// before the second reconciliation. This would lead to never satisfying the expected condition.
83+
// Avoid this by setting a timeout.
84+
return rp.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now())
85+
case Delete:
86+
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
87+
return errors.IsNotFound(err)
88+
}
89+
}
90+
return false
91+
}
92+
7393
func (r *rayClusterScaleExpectationImpl) IsSatisfied(ctx context.Context, namespace, rayClusterName, group string) (isSatisfied bool) {
7494
items, err := r.itemsCache.ByIndex(GroupIndex, fmt.Sprintf("%s/%s/%s", namespace, rayClusterName, group))
7595
if err != nil {
7696
// An error occurs when there is no corresponding IndexFunc for GroupIndex. This should be a fatal error.
7797
panic(err)
7898
}
79-
isSatisfied = true
8099
for i := range items {
81100
rp := items[i].(*rayPod)
82-
pod := &corev1.Pod{}
83-
isPodSatisfied := false
84-
switch rp.action {
85-
case Create:
86-
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
87-
isPodSatisfied = true
88-
} else {
89-
// Tolerating extreme case:
90-
// The first reconciliation created a Pod. If the Pod was quickly deleted from etcd by another component
91-
// before the second reconciliation. This would lead to never satisfying the expected condition.
92-
// Avoid this by setting a timeout.
93-
isPodSatisfied = rp.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now())
94-
}
95-
case Delete:
96-
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
97-
isPodSatisfied = errors.IsNotFound(err)
98-
}
101+
isPodSatisfied := r.isPodScaled(ctx, rp, namespace)
102+
103+
if !isPodSatisfied {
104+
return false
99105
}
106+
100107
// delete satisfied item in cache
101-
if isPodSatisfied {
102-
if err := r.itemsCache.Delete(items[i]); err != nil {
103-
// Fatal error in KeyFunc.
104-
panic(err)
105-
}
106-
} else {
107-
isSatisfied = false
108+
if err := r.itemsCache.Delete(items[i]); err != nil {
109+
// Fatal error in KeyFunc.
110+
panic(err)
108111
}
109112
}
110-
return isSatisfied
113+
return true
111114
}
112115

113116
func (r *rayClusterScaleExpectationImpl) Delete(rayClusterName, namespace string) {

ray-operator/controllers/ray/expectations/scale_expectations_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010
corev1 "k8s.io/api/core/v1"
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
1213
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
1314
)
1415

@@ -166,3 +167,117 @@ func getTestPod() []corev1.Pod {
166167
},
167168
}
168169
}
170+
171+
func TestIsPodScaled(t *testing.T) {
172+
ctx := context.Background()
173+
174+
tests := []struct {
175+
name string
176+
action ScaleAction
177+
podExists bool
178+
expectedResult bool
179+
setupFunc func(*testing.T, client.Client, *corev1.Pod)
180+
}{
181+
{
182+
name: "Create action - pod exists",
183+
action: Create,
184+
podExists: true,
185+
expectedResult: true,
186+
setupFunc: func(t *testing.T, client client.Client, pod *corev1.Pod) {
187+
err := client.Create(ctx, pod)
188+
require.NoError(t, err)
189+
},
190+
},
191+
{
192+
name: "Create action - pod does not exist",
193+
action: Create,
194+
podExists: false,
195+
expectedResult: false,
196+
setupFunc: func(t *testing.T, client client.Client, pod *corev1.Pod) {},
197+
},
198+
{
199+
name: "Delete action - pod exists",
200+
action: Delete,
201+
podExists: true,
202+
expectedResult: false,
203+
setupFunc: func(t *testing.T, client client.Client, pod *corev1.Pod) {
204+
err := client.Create(ctx, pod)
205+
require.NoError(t, err)
206+
},
207+
},
208+
{
209+
name: "Delete action - pod does not exist",
210+
action: Delete,
211+
podExists: false,
212+
expectedResult: true,
213+
setupFunc: func(t *testing.T, client client.Client, pod *corev1.Pod) {},
214+
},
215+
}
216+
217+
for _, tt := range tests {
218+
t.Run(tt.name, func(t *testing.T) {
219+
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
220+
exp := &rayClusterScaleExpectationImpl{
221+
Client: fakeClient,
222+
itemsCache: nil, // Not used in isPodScaled
223+
}
224+
225+
testPod := &corev1.Pod{
226+
ObjectMeta: metav1.ObjectMeta{
227+
Name: "test-pod",
228+
Namespace: "default",
229+
},
230+
}
231+
232+
rp := &rayPod{
233+
name: testPod.Name,
234+
namespace: testPod.Namespace,
235+
action: tt.action,
236+
recordTimestamp: time.Now(),
237+
}
238+
239+
tt.setupFunc(t, fakeClient, testPod)
240+
241+
result := exp.isPodScaled(ctx, rp, testPod.Namespace)
242+
assert.Equal(t, tt.expectedResult, result)
243+
})
244+
}
245+
}
246+
247+
func TestIsPodScaledTimeout(t *testing.T) {
248+
ctx := context.Background()
249+
250+
// Save original timeout and restore after test
251+
originalTimeout := ExpectationsTimeout
252+
ExpectationsTimeout = 20 * time.Millisecond
253+
defer func() { ExpectationsTimeout = originalTimeout }()
254+
255+
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
256+
exp := &rayClusterScaleExpectationImpl{
257+
Client: fakeClient,
258+
itemsCache: nil,
259+
}
260+
261+
testPod := &corev1.Pod{
262+
ObjectMeta: metav1.ObjectMeta{
263+
Name: "test-pod",
264+
Namespace: "default",
265+
},
266+
}
267+
268+
rp := &rayPod{
269+
name: testPod.Name,
270+
namespace: testPod.Namespace,
271+
action: Create,
272+
recordTimestamp: time.Now(),
273+
}
274+
275+
// Initially should return false (pod doesn't exist)
276+
result := exp.isPodScaled(ctx, rp, testPod.Namespace)
277+
assert.False(t, result)
278+
279+
// After timeout, should return true even though pod doesn't exist
280+
time.Sleep(ExpectationsTimeout + 10*time.Millisecond)
281+
result = exp.isPodScaled(ctx, rp, testPod.Namespace)
282+
assert.True(t, result)
283+
}

0 commit comments

Comments
 (0)