@@ -4,9 +4,11 @@ import (
4
4
"context"
5
5
"fmt"
6
6
"testing"
7
+ "time"
7
8
8
9
"github.com/stretchr/testify/assert"
9
10
"github.com/stretchr/testify/mock"
11
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10
12
"k8s.io/apimachinery/pkg/types"
11
13
12
14
idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
@@ -184,9 +186,15 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
184
186
nCtx .OnNodeStateWriter ().Return (nodeStateWriter )
185
187
186
188
// NodeStatus
189
+ nowMinus := time .Now ().Add (time .Duration (- 5 ) * time .Second )
190
+ metav1NowMinus := metav1.Time {
191
+ Time : nowMinus ,
192
+ }
187
193
nCtx .OnNodeStatus ().Return (& v1alpha1.NodeStatus {
188
194
DataDir : storage .DataReference ("s3://bucket/data" ),
189
195
OutputDir : storage .DataReference ("s3://bucket/output" ),
196
+ LastAttemptStartedAt : & metav1NowMinus ,
197
+ StartedAt : & metav1NowMinus ,
190
198
})
191
199
192
200
return nCtx
@@ -508,25 +516,27 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
508
516
}
509
517
510
518
tests := []struct {
511
- name string
512
- parallelism * uint32
513
- minSuccessRatio * float32
514
- subNodePhases []v1alpha1.NodePhase
515
- subNodeTaskPhases []core.Phase
516
- subNodeTransitions []handler.Transition
517
- expectedArrayNodePhase v1alpha1.ArrayNodePhase
518
- expectedArrayNodeSubPhases []v1alpha1.NodePhase
519
- expectedTransitionPhase handler.EPhase
520
- expectedExternalResourcePhases []idlcore.TaskExecution_Phase
521
- currentWfParallelism uint32
522
- maxWfParallelism uint32
523
- incrementParallelismCount uint32
524
- useFakeEventRecorder bool
525
- eventRecorderFailures uint32
526
- eventRecorderError error
527
- expectedTaskPhaseVersion uint32
528
- expectHandleError bool
529
- expectedEventingCalls int
519
+ name string
520
+ parallelism * uint32
521
+ minSuccessRatio * float32
522
+ subNodePhases []v1alpha1.NodePhase
523
+ subNodeTaskPhases []core.Phase
524
+ subNodeDeltaTimestamps []uint64
525
+ subNodeTransitions []handler.Transition
526
+ expectedArrayNodePhase v1alpha1.ArrayNodePhase
527
+ expectedArrayNodeSubPhases []v1alpha1.NodePhase
528
+ expectedDiffArrayNodeSubDeltaTimestamps []bool
529
+ expectedTransitionPhase handler.EPhase
530
+ expectedExternalResourcePhases []idlcore.TaskExecution_Phase
531
+ currentWfParallelism uint32
532
+ maxWfParallelism uint32
533
+ incrementParallelismCount uint32
534
+ useFakeEventRecorder bool
535
+ eventRecorderFailures uint32
536
+ eventRecorderError error
537
+ expectedTaskPhaseVersion uint32
538
+ expectHandleError bool
539
+ expectedEventingCalls int
530
540
}{
531
541
{
532
542
name : "StartAllSubNodes" ,
@@ -829,6 +839,31 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
829
839
expectHandleError : true ,
830
840
expectedEventingCalls : 1 ,
831
841
},
842
+ {
843
+ name : "DeltaTimestampUpdates" ,
844
+ parallelism : uint32Ptr (0 ),
845
+ subNodePhases : []v1alpha1.NodePhase {
846
+ v1alpha1 .NodePhaseQueued ,
847
+ v1alpha1 .NodePhaseRunning ,
848
+ },
849
+ subNodeTaskPhases : []core.Phase {
850
+ core .PhaseUndefined ,
851
+ core .PhaseUndefined ,
852
+ },
853
+ subNodeTransitions : []handler.Transition {
854
+ handler .DoTransition (handler .TransitionTypeEphemeral , handler .PhaseInfoRunning (& handler.ExecutionInfo {})),
855
+ handler .DoTransition (handler .TransitionTypeEphemeral , handler .PhaseInfoRetryableFailure (idlcore .ExecutionError_SYSTEM , "" , "" , & handler.ExecutionInfo {})),
856
+ },
857
+ expectedArrayNodePhase : v1alpha1 .ArrayNodePhaseExecuting ,
858
+ expectedArrayNodeSubPhases : []v1alpha1.NodePhase {
859
+ v1alpha1 .NodePhaseRunning ,
860
+ v1alpha1 .NodePhaseRetryableFailure ,
861
+ },
862
+ expectedTaskPhaseVersion : 1 ,
863
+ expectedTransitionPhase : handler .EPhaseRunning ,
864
+ expectedExternalResourcePhases : []idlcore.TaskExecution_Phase {idlcore .TaskExecution_RUNNING , idlcore .TaskExecution_FAILED },
865
+ incrementParallelismCount : 1 ,
866
+ },
832
867
}
833
868
834
869
for _ , test := range tests {
@@ -870,6 +905,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
870
905
arrayNodeState .SubNodePhases .SetItem (i , bitarray .Item (nodePhase )) // #nosec G115
871
906
}
872
907
908
+ for i , deltaTimestmap := range test .subNodeDeltaTimestamps {
909
+ arrayNodeState .SubNodeDeltaTimestamps .SetItem (i , bitarray .Item (deltaTimestmap )) // #nosec G115
910
+ }
911
+
873
912
nodeSpec := arrayNodeSpec
874
913
nodeSpec .ArrayNode .Parallelism = test .parallelism
875
914
nodeSpec .ArrayNode .MinSuccessRatio = test .minSuccessRatio
@@ -925,6 +964,14 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
925
964
assert .Equal (t , expectedPhase , v1alpha1 .NodePhase (arrayNodeState .SubNodePhases .GetItem (i ))) // #nosec G115
926
965
}
927
966
967
+ for i , expectedDiffDeltaTimestamps := range test .expectedDiffArrayNodeSubDeltaTimestamps {
968
+ if expectedDiffDeltaTimestamps {
969
+ assert .NotEqual (t , arrayNodeState .SubNodeDeltaTimestamps .GetItem (i ), test .subNodeDeltaTimestamps [i ])
970
+ } else {
971
+ assert .Equal (t , arrayNodeState .SubNodeDeltaTimestamps .GetItem (i ), test .subNodeDeltaTimestamps [i ])
972
+ }
973
+ }
974
+
928
975
bufferedEventRecorder , ok := eventRecorder .(* bufferedEventRecorder )
929
976
if ok {
930
977
if len (test .expectedExternalResourcePhases ) > 0 {
0 commit comments