From 816b25c00122003e14e3d214d9de7a6e58b3323e Mon Sep 17 00:00:00 2001 From: Vyzaldy Sanchez Date: Mon, 7 Oct 2024 15:13:45 -0400 Subject: [PATCH] Refactors workflows engine loop (#14375) * Refactors workflows engine loop * Adds changeset + fixes lint * Adds mutex * Fixes tests - WIP * Improves locking handling for goroutines * Fixes lint * Fixes tests * Adds support for multi-branch steps workflows * Fixes lint issue * Improves error * Improves workflow processed check logic * Properly propagates states to step dependants * Generates proper workflow status on workflow being processed * Validates for workflow timeout before validating processed status * Fixes tests nil-pointer deference * Fixes merge update issue * Addresses review comments * Addresses review comments * Fixes status propagation to step dependents * Adds defensive lock-check --- .changeset/khaki-pants-melt.md | 5 + core/services/workflows/engine.go | 372 ++++++++++++------ core/services/workflows/engine_test.go | 152 +++++-- core/services/workflows/models.go | 3 +- core/services/workflows/store/store.go | 2 +- core/services/workflows/store/store_db.go | 36 +- .../services/workflows/store/store_db_test.go | 19 +- 7 files changed, 426 insertions(+), 163 deletions(-) create mode 100644 .changeset/khaki-pants-melt.md diff --git a/.changeset/khaki-pants-melt.md b/.changeset/khaki-pants-melt.md new file mode 100644 index 00000000000..133fd480f56 --- /dev/null +++ b/.changeset/khaki-pants-melt.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#updated Workflows Engine loop refactored diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index cf57826cf6f..ed584ba5aec 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -31,6 +31,57 @@ type stepRequest struct { state store.WorkflowExecution } +type stepUpdateChannel struct { + executionID string + ch chan store.WorkflowExecutionStep +} + +type stepUpdateManager struct { + mu sync.RWMutex + m map[string]stepUpdateChannel +} + +func (sucm *stepUpdateManager) add(executionID string, ch stepUpdateChannel) (added bool) { + sucm.mu.RLock() + _, ok := sucm.m[executionID] + sucm.mu.RUnlock() + if ok { + return false + } + sucm.mu.Lock() + defer sucm.mu.Unlock() + if _, ok = sucm.m[executionID]; ok { + return false + } + sucm.m[executionID] = ch + return true +} + +func (sucm *stepUpdateManager) remove(executionID string) { + sucm.mu.Lock() + defer sucm.mu.Unlock() + if _, ok := sucm.m[executionID]; ok { + close(sucm.m[executionID].ch) + delete(sucm.m, executionID) + } +} + +func (sucm *stepUpdateManager) send(ctx context.Context, executionID string, stepUpdate store.WorkflowExecutionStep) error { + sucm.mu.RLock() + stepUpdateCh, ok := sucm.m[executionID] + sucm.mu.RUnlock() + if !ok { + return fmt.Errorf("step update channel not found for execution %s, dropping step update", executionID) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("context canceled before step update could be issued: %w", context.Cause(ctx)) + case stepUpdateCh.ch <- stepUpdate: + return nil + } +} + // Engine handles the lifecycle of a single workflow and its executions. 
type Engine struct { services.StateMachine @@ -42,7 +93,7 @@ type Engine struct { executionStates store.Store pendingStepRequests chan stepRequest triggerEvents chan capabilities.TriggerResponse - stepUpdateCh chan store.WorkflowExecutionStep + stepUpdatesChMap stepUpdateManager wg sync.WaitGroup stopCh services.StopChan newWorkerTimeout time.Duration @@ -64,7 +115,7 @@ type Engine struct { clock clockwork.Clock } -func (e *Engine) Start(ctx context.Context) error { +func (e *Engine) Start(_ context.Context) error { return e.StartOnce("Engine", func() error { // create a new context, since the one passed in via Start is short-lived. ctx, _ := e.stopCh.NewCtx() @@ -74,9 +125,8 @@ func (e *Engine) Start(ctx context.Context) error { go e.worker(ctx) } - e.wg.Add(2) + e.wg.Add(1) go e.init(ctx) - go e.loop(ctx) return nil }) @@ -320,6 +370,16 @@ func (e *Engine) resumeInProgressExecutions(ctx context.Context) error { } for _, sd := range sds { + ch := make(chan store.WorkflowExecutionStep) + added := e.stepUpdatesChMap.add(execution.ExecutionID, stepUpdateChannel{ + ch: ch, + executionID: execution.ExecutionID, + }) + if added { + // We trigger the `stepUpdateLoop` for this execution, since the loop is not running atm. + e.wg.Add(1) + go e.stepUpdateLoop(ctx, execution.ExecutionID, ch, execution.CreatedAt) + } e.queueIfReady(execution, sd) } } @@ -397,62 +457,34 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig return nil } -// loop is the synchronization goroutine for the engine, and is responsible for: -// - dispatching new workers up to the limit specified (default = 100) -// - starting a new execution when a trigger emits a message on `triggerEvents` -// - updating the `executionState` with the outcome of a `step`. +// stepUpdateLoop is a singleton goroutine per `Execution`, and it updates the `executionState` with the outcome of a `step`. // // Note: `executionState` is only mutated by this loop directly. // // This is important to avoid data races, and any accesses of `executionState` by any other // goroutine should happen via a `stepRequest` message containing a copy of the latest // `executionState`. -// -// This works because a worker thread for a given step will only -// be spun up once all dependent steps have completed (guaranteeing that the state associated -// with those dependent steps will no longer change). Therefore as long this worker thread only -// accesses data from dependent states, the data will never be stale. 
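// Aside (illustrative sketch, not part of the patch): the per-execution lifecycle that the new
// stepUpdateManager enables. add takes a read-lock fast path and then re-checks under the write
// lock, so two goroutines racing on the same executionID spawn exactly one stepUpdateLoop.
// Names such as `createdAt` and `stepUpdate` below are stand-ins for execution.CreatedAt /
// dbWex.CreatedAt and a store.WorkflowExecutionStep produced by a worker.
//
//	ch := make(chan store.WorkflowExecutionStep)
//	if e.stepUpdatesChMap.add(executionID, stepUpdateChannel{executionID: executionID, ch: ch}) {
//		e.wg.Add(1)
//		go e.stepUpdateLoop(ctx, executionID, ch, createdAt) // one singleton loop per execution
//	}
//	// workers report results through send, which honours ctx cancellation ...
//	_ = e.stepUpdatesChMap.send(ctx, executionID, stepUpdate)
//	// ... and finishExecution tears the loop down by closing the channel.
//	e.stepUpdatesChMap.remove(executionID)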
-func (e *Engine) loop(ctx context.Context) { +func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpdateCh chan store.WorkflowExecutionStep, workflowCreatedAt *time.Time) { defer e.wg.Done() + lggr := e.logger.With(eIDKey, executionID) + e.logger.Debugf("running stepUpdateLoop for execution %s", executionID) for { select { case <-ctx.Done(): - e.logger.Debug("shutting down loop") + lggr.Debug("shutting down stepUpdateLoop") return - case resp, isOpen := <-e.triggerEvents: - if !isOpen { - e.logger.Error("trigger events channel is no longer open, skipping") - continue - } - - if resp.Err != nil { - e.logger.Errorf("trigger event was an error %v; not executing", resp.Err) - continue - } - - te := resp.Event - - if te.ID == "" { - e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing") - continue - } - - executionID, err := generateExecutionID(e.workflow.id, te.ID) - if err != nil { - e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err) - continue - } - - err = e.startExecution(ctx, executionID, resp.Event.Outputs) - if err != nil { - e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err) + case stepUpdate, open := <-stepUpdateCh: + if !open { + lggr.Debug("stepUpdate channel closed, shutting down stepUpdateLoop") + return } - case stepUpdate := <-e.stepUpdateCh: // Executed synchronously to ensure we correctly schedule subsequent tasks. - err := e.handleStepUpdate(ctx, stepUpdate) + e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID), + eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) + err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt) if err != nil { - e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref). - Errorf("failed to update step state: %+v, %s", stepUpdate, err) + e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err), + eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) } } } @@ -492,7 +524,7 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event * Status: store.StatusStarted, } - err := e.executionStates.Add(ctx, ec) + dbWex, err := e.executionStates.Add(ctx, ec) if err != nil { return err } @@ -505,6 +537,19 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event * return err } + ch := make(chan store.WorkflowExecutionStep) + added := e.stepUpdatesChMap.add(executionID, stepUpdateChannel{ + ch: ch, + executionID: executionID, + }) + if !added { + // skip this execution since there's already a stepUpdateLoop running for the execution ID + e.logger.With(eIDKey, executionID).Debugf("won't start execution for execution %s, execution was already started", executionID) + return nil + } + e.wg.Add(1) + go e.stepUpdateLoop(ctx, executionID, ch, dbWex.CreatedAt) + for _, td := range triggerDependents { e.queueIfReady(*ec, td) } @@ -512,79 +557,51 @@ func (e *Engine) startExecution(ctx context.Context, executionID string, event * return nil } -func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep) error { +func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.WorkflowExecutionStep, workflowCreatedAt *time.Time) error { + l := e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref) + + // If we've been executing for too long, let's time the workflow step out and continue. 
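// Note: compared to the removed loop, which only timed an execution out after its completion
// check, the timeout is now applied to the incoming step update itself before it is persisted,
// so isWorkflowFullyProcessed (below) can propagate StatusTimeout to the step's dependents.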
+ if workflowCreatedAt != nil && e.clock.Since(*workflowCreatedAt) > e.maxExecutionDuration { + l.Info("execution timed out; setting step status to timeout") + stepUpdate.Status = store.StatusTimeout + } + state, err := e.executionStates.UpsertStep(ctx, &stepUpdate) if err != nil { return err } - l := e.logger.With(eIDKey, state.ExecutionID, sRKey, stepUpdate.Ref) - - switch stepUpdate.Status { - case store.StatusCompleted: - stepDependents, err := e.workflow.dependents(stepUpdate.Ref) - if err != nil { - return err - } - - // There are no steps left to process in the current path, so let's check if - // we've completed the workflow. - if len(stepDependents) == 0 { - workflowCompleted := true - err := e.workflow.walkDo(workflows.KeywordTrigger, func(s *step) error { - step, ok := state.Steps[s.Ref] - // The step is missing from the state, - // which means it hasn't been processed yet. - // Let's mark `workflowCompleted` = false, and - // continue. - if !ok { - workflowCompleted = false - return nil - } - switch step.Status { - case store.StatusCompleted, store.StatusErrored, store.StatusCompletedEarlyExit: - default: - workflowCompleted = false - } - return nil - }) - if err != nil { - return err - } - - if workflowCompleted { - return e.finishExecution(ctx, state.ExecutionID, store.StatusCompleted) - } - } + workflowIsFullyProcessed, status, err := e.isWorkflowFullyProcessed(state) + if err != nil { + return err + } - // We haven't completed the workflow, but should we continue? - // If we've been executing for too long, let's time the workflow out and stop here. - if state.CreatedAt != nil && e.clock.Since(*state.CreatedAt) > e.maxExecutionDuration { + if workflowIsFullyProcessed { + switch status { + case store.StatusTimeout: l.Info("execution timed out") - return e.finishExecution(ctx, state.ExecutionID, store.StatusTimeout) + case store.StatusCompleted: + l.Info("workflow finished") + case store.StatusErrored: + l.Info("execution errored") + case store.StatusCompletedEarlyExit: + l.Info("execution terminated early") + // NOTE: even though this marks the workflow as completed, any branches of the DAG + // that don't depend on the step that signaled for an early exit will still complete. + // This is to ensure that any side effects are executed consistently, since otherwise + // the async nature of the workflow engine would provide no guarantees. } + return e.finishExecution(ctx, state.ExecutionID, status) + } - // Finally, since the workflow hasn't timed out or completed, let's - // check for any dependents that are ready to process. - for _, sd := range stepDependents { - e.queueIfReady(state, sd) - } - case store.StatusCompletedEarlyExit: - l.Info("execution terminated early") - // NOTE: even though this marks the workflow as completed, any branches of the DAG - // that don't depend on the step that signaled for an early exit will still complete. - // This is to ensure that any side effects are executed consistently, since otherwise - // the async nature of the workflow engine would provide no guarantees. - err := e.finishExecution(ctx, state.ExecutionID, store.StatusCompletedEarlyExit) - if err != nil { - return err - } - case store.StatusErrored: - l.Info("execution errored") - err := e.finishExecution(ctx, state.ExecutionID, store.StatusErrored) - if err != nil { - return err - } + // Finally, since the workflow hasn't timed out or completed, let's + // check for any dependents that are ready to process. 
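// dependents returns only the steps that depend directly on the updated step (see the doc
// comment added in models.go below); queueIfReady (not shown in this diff) then decides
// whether each of them is ready to be queued for a worker.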
+ stepDependents, err := e.workflow.dependents(stepUpdate.Ref) + if err != nil { + return err + } + for _, sd := range stepDependents { + e.queueIfReady(state, sd) } return nil @@ -628,10 +645,14 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status return err } + e.stepUpdatesChMap.remove(executionID) e.onExecutionFinished(executionID) return nil } +// worker is responsible for: +// - handling a `pendingStepRequests` +// - starting a new execution when a trigger emits a message on `triggerEvents` func (e *Engine) worker(ctx context.Context) { defer e.wg.Done() @@ -639,6 +660,34 @@ func (e *Engine) worker(ctx context.Context) { select { case pendingStepRequest := <-e.pendingStepRequests: e.workerForStepRequest(ctx, pendingStepRequest) + case resp, isOpen := <-e.triggerEvents: + if !isOpen { + e.logger.Error("trigger events channel is no longer open, skipping") + continue + } + + if resp.Err != nil { + e.logger.Errorf("trigger event was an error %v; not executing", resp.Err) + continue + } + + te := resp.Event + + if te.ID == "" { + e.logger.With(tIDKey, te.TriggerType).Error("trigger event ID is empty; not executing") + continue + } + + executionID, err := generateExecutionID(e.workflow.id, te.ID) + if err != nil { + e.logger.With(tIDKey, te.ID).Errorf("could not generate execution ID: %v", err) + continue + } + + err = e.startExecution(ctx, executionID, resp.Event.Outputs) + if err != nil { + e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err) + } case <-ctx.Done(): return } @@ -682,11 +731,13 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { // receiving loop may not pick up any messages we emit. // Note: When full persistence support is added, any hanging steps // like this one will get picked up again and will be reprocessed. - select { - case <-ctx.Done(): - l.Errorf("context canceled before step update could be issued; error %v", err) - case e.stepUpdateCh <- *stepState: + l.Debugf("trying to send step state update for execution %s with status %s", stepState.ExecutionID, stepStatus) + err = e.stepUpdatesChMap.send(ctx, stepState.ExecutionID, *stepState) + if err != nil { + l.Errorf("failed to issue step state update; error %v", err) + return } + l.Debugf("sent step state update for execution %s with status %s", stepState.ExecutionID, stepStatus) } func merge(baseConfig *values.Map, overrideConfig *values.Map) *values.Map { @@ -807,6 +858,95 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr return nil } +func (e *Engine) isWorkflowFullyProcessed(state store.WorkflowExecution) (bool, string, error) { + statuses := map[string]string{} + // we need to first propagate the status of the errored status if it exists... + err := e.workflow.walkDo(workflows.KeywordTrigger, func(s *step) error { + stateStep, ok := state.Steps[s.Ref] + if !ok { + // The step not existing on the state means that it has not been processed yet. + // So ignore it. + return nil + } + statuses[s.Ref] = stateStep.Status + switch stateStep.Status { + // For each step with any of the following statuses, propagate the statuses to its dependants + // since they will not be executed. + case store.StatusErrored, store.StatusCompletedEarlyExit, store.StatusTimeout: + // Let's properly propagate the status to all dependents, not just direct dependents. 
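// For example, in a branch A -> B -> C where A errored, B never executes, so C would
// otherwise never show up in `statuses` and the workflow would never be considered fully
// processed; the breadth-first walk below marks both B and C with A's terminal status.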
+ queue := []string{s.Ref} + for len(queue) > 0 { + current := queue[0] // Grab the current step reference + queue = queue[1:] // Remove it from the queue + + // Find the dependents for the current step reference + dependents, err := e.workflow.dependents(current) + if err != nil { + return err + } + + // Propagate the status to all direct dependents + // With no direct dependents, it will go to the next step reference in the queue. + for _, sd := range dependents { + if _, dependentProcessed := statuses[sd.Ref]; !dependentProcessed { + statuses[sd.Ref] = stateStep.Status + // Queue the dependent for to be processed later + queue = append(queue, sd.Ref) + } + } + } + } + return nil + }) + if err != nil { + return false, "", err + } + + workflowProcessed := true + // Let's validate whether the workflow has been fully processed. + err = e.workflow.walkDo(workflows.KeywordTrigger, func(s *step) error { + // If the step is not part of the state, it is a pending step + // so we should consider the workflow as not fully processed. + if _, ok := statuses[s.Ref]; !ok { + workflowProcessed = false + } + return nil + }) + if err != nil { + return false, "", err + } + + if !workflowProcessed { + return workflowProcessed, "", nil + } + + var hasErrored, hasTimedOut, hasCompletedEarlyExit bool + // Let's determine the status of the workflow. + for _, status := range statuses { + switch status { + case store.StatusErrored: + hasErrored = true + case store.StatusTimeout: + hasTimedOut = true + case store.StatusCompletedEarlyExit: + hasCompletedEarlyExit = true + } + } + + // The `errored` status has precedence over the other statuses to be returned, based on occurrence. + // Status precedence: `errored` -> `timed_out` -> `completed_early_exit` -> `completed`. + if hasErrored { + return workflowProcessed, store.StatusErrored, nil + } + if hasTimedOut { + return workflowProcessed, store.StatusTimeout, nil + } + if hasCompletedEarlyExit { + return workflowProcessed, store.StatusCompletedEarlyExit, nil + } + return workflowProcessed, store.StatusCompleted, nil +} + func (e *Engine) Close() error { return e.StopOnce("Engine", func() error { e.logger.Info("shutting down engine") @@ -963,7 +1103,7 @@ func NewEngine(cfg Config) (engine *Engine, err error) { }, executionStates: cfg.Store, pendingStepRequests: make(chan stepRequest, cfg.QueueSize), - stepUpdateCh: make(chan store.WorkflowExecutionStep), + stepUpdatesChMap: stepUpdateManager{m: map[string]stepUpdateChannel{}}, triggerEvents: make(chan capabilities.TriggerResponse), stopCh: make(chan struct{}), newWorkerTimeout: cfg.NewWorkerTimeout, diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index f12680a062f..048c353c747 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -258,8 +258,8 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { trigger, cr := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) - target1 := mockTarget() + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + target1 := mockTarget("") require.NoError(t, reg.Add(ctx, target1)) target2 := newMockCapability( @@ -392,10 +392,13 @@ func mockFailingConsensus() *mockCapability { ) } -func mockConsensusWithEarlyTermination() *mockCapability { +func mockConsensusWithEarlyTermination(id string) *mockCapability { + if len(id) == 0 { + id = "offchain_reporting@1.0.0" + } return newMockCapability( capabilities.MustNewCapabilityInfo( - 
"offchain_reporting@1.0.0", + id, capabilities.CapabilityTypeConsensus, "an ocr3 consensus capability", ), @@ -407,10 +410,13 @@ func mockConsensusWithEarlyTermination() *mockCapability { ) } -func mockConsensus() *mockCapability { +func mockConsensus(id string) *mockCapability { + if len(id) == 0 { + id = "offchain_reporting@1.0.0" + } return newMockCapability( capabilities.MustNewCapabilityInfo( - "offchain_reporting@1.0.0", + id, capabilities.CapabilityTypeConsensus, "an ocr3 consensus capability", ), @@ -432,10 +438,13 @@ func mockConsensus() *mockCapability { ) } -func mockTarget() *mockCapability { +func mockTarget(id string) *mockCapability { + if len(id) == 0 { + id = "write_polygon-testnet-mumbai@1.0.0" + } return newMockCapability( capabilities.MustNewCapabilityInfo( - "write_polygon-testnet-mumbai@1.0.0", + id, capabilities.CapabilityTypeTarget, "a write capability targeting polygon mumbai testnet", ), @@ -457,7 +466,7 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { require.NoError(t, reg.Add(ctx, trigger)) require.NoError(t, reg.Add(ctx, mockFailingConsensus())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockTarget("write_polygon-testnet-mumbai@1.0.0"))) eng, hooks := newTestEngineWithYAMLSpec(t, reg, simpleWorkflow) @@ -480,8 +489,8 @@ func TestEngine_GracefulEarlyTermination(t *testing.T) { trigger, _ := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensusWithEarlyTermination())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockConsensusWithEarlyTermination(""))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) eng, hooks := newTestEngineWithYAMLSpec(t, reg, simpleWorkflow) servicetest.Run(t, eng) @@ -571,8 +580,8 @@ func TestEngine_MultiStepDependencies(t *testing.T) { trigger, tr := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) action, out := mockAction(t) require.NoError(t, reg.Add(ctx, action)) @@ -619,8 +628,8 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) { require.NoError(t, err) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) action, _ := mockAction(t) require.NoError(t, reg.Add(ctx, action)) @@ -640,7 +649,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) { ExecutionID: "", Status: store.StatusStarted, } - err = dbstore.Add(ctx, ec) + _, err = dbstore.Add(ctx, ec) require.NoError(t, err) eng, hooks := newTestEngineWithYAMLSpec( @@ -671,8 +680,8 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) { require.NoError(t, err) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) action, _ := mockAction(t) require.NoError(t, reg.Add(ctx, action)) @@ -694,7 +703,7 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) { ExecutionID: "", Status: store.StatusStarted, } - err = dbstore.Add(ctx, ec) + _, err = dbstore.Add(ctx, ec) require.NoError(t, err) eng, hooks := newTestEngineWithYAMLSpec( @@ -768,8 
+777,8 @@ func TestEngine_WrapsTargets(t *testing.T) { trigger, _ := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) clock := clockwork.NewFakeClock() dbstore := newTestDBStore(t, clock) @@ -814,8 +823,8 @@ func TestEngine_GetsNodeInfoDuringInitialization(t *testing.T) { trigger, _ := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) - require.NoError(t, reg.Add(ctx, mockTarget())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) clock := clockwork.NewFakeClock() dbstore := newTestDBStore(t, clock) @@ -906,7 +915,7 @@ func TestEngine_PassthroughInterpolation(t *testing.T) { trigger, _ := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) writeID := "write_ethereum-testnet-sepolia@1.0.0" target := newMockCapability( capabilities.MustNewCapabilityInfo( @@ -1021,7 +1030,7 @@ func TestEngine_MergesWorkflowConfigAndCRConfig(t *testing.T) { trigger, _ := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) writeID := "write_polygon-testnet-mumbai@1.0.0" gotConfig := values.EmptyMap() @@ -1093,7 +1102,7 @@ func TestEngine_HandlesNilConfigOnchain(t *testing.T) { trigger, _ := mockTrigger(t) require.NoError(t, reg.Add(ctx, trigger)) - require.NoError(t, reg.Add(ctx, mockConsensus())) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) writeID := "write_polygon-testnet-mumbai@1.0.0" gotConfig := values.EmptyMap() @@ -1137,6 +1146,97 @@ func TestEngine_HandlesNilConfigOnchain(t *testing.T) { assert.Len(t, m.(map[string]any), 3) } +func TestEngine_MultiBranchExecution(t *testing.T) { + // This workflow describes 2 branches in the workflow graph. 
+ // A -> B -> C + // A -> D -> E + workflowSpec := ` +triggers: + - id: "mercury-trigger@1.0.0" + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + +consensus: + - id: "offchain_reporting@1.0.0" + ref: "evm_median" + inputs: + observations: + - "$(trigger.outputs)" + config: + aggregation_method: "data_feeds_2_0" + aggregation_config: + "0x1111111111111111111100000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + "0x2222222222222222222200000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + "0x3333333333333333333300000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + encoder: "EVM" + encoder_config: + abi: "mercury_reports bytes[]" + - id: "early_exit_offchain_reporting@1.0.0" + ref: "evm_median_early_exit" + inputs: + observations: + - "$(trigger.outputs)" + config: + aggregation_method: "data_feeds_2_0" + aggregation_config: + "0x1111111111111111111100000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + "0x2222222222222222222200000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + "0x3333333333333333333300000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + encoder: "EVM" + encoder_config: + abi: "mercury_reports bytes[]" + +targets: + - id: "write_polygon-testnet-mumbai@1.0.0" + inputs: + report: "$(evm_median.outputs.report)" + config: + address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" + params: ["$(report)"] + abi: "receive(report bytes)" + - id: "write_polygon-testnet-early-exit@1.0.0" + inputs: + report: "$(evm_median_early_exit.outputs.report)" + config: + address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" + params: ["$(report)"] + abi: "receive(report bytes)" +` + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + require.NoError(t, reg.Add(ctx, trigger)) + require.NoError(t, reg.Add(ctx, mockConsensus(""))) + require.NoError(t, reg.Add(ctx, mockConsensusWithEarlyTermination("early_exit_offchain_reporting@1.0.0"))) + require.NoError(t, reg.Add(ctx, mockTarget(""))) + require.NoError(t, reg.Add(ctx, mockTarget("write_polygon-testnet-early-exit@1.0.0"))) + + eng, hooks := newTestEngineWithYAMLSpec(t, reg, workflowSpec) + servicetest.Run(t, eng) + + eid := getExecutionId(t, eng, hooks) + state, err := eng.executionStates.Get(ctx, eid) + require.NoError(t, err) + + assert.Equal(t, store.StatusCompletedEarlyExit, state.Status) +} + func basicTestTrigger(t *testing.T) *mockTriggerCapability { mt := &mockTriggerCapability{ CapabilityInfo: capabilities.MustNewCapabilityInfo( diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index cf1fffa9a4c..bf75fe432ee 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -53,8 +53,9 @@ func (w *workflow) walkDo(start string, do func(s *step) error) error { return outerErr } +// dependents returns all steps that directly depend on the step with the given ref func (w *workflow) dependents(start string) ([]*step, error) { - steps := []*step{} + var steps []*step m, err := w.Graph.AdjacencyMap() if err != nil { return nil, err diff --git a/core/services/workflows/store/store.go 
b/core/services/workflows/store/store.go index 72045ea062c..9f77cf3380e 100644 --- a/core/services/workflows/store/store.go +++ b/core/services/workflows/store/store.go @@ -5,7 +5,7 @@ import ( ) type Store interface { - Add(ctx context.Context, state *WorkflowExecution) error + Add(ctx context.Context, state *WorkflowExecution) (WorkflowExecution, error) UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error) UpdateStatus(ctx context.Context, executionID string, status string) error Get(ctx context.Context, executionID string) (WorkflowExecution, error) diff --git a/core/services/workflows/store/store_db.go b/core/services/workflows/store/store_db.go index 80ecfbb2d6e..929fb5f377e 100644 --- a/core/services/workflows/store/store_db.go +++ b/core/services/workflows/store/store_db.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/values" valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -202,11 +203,12 @@ func stateToStep(state *WorkflowExecutionStep) (workflowStepRow, error) { return wsr, nil } -// `Add` creates the relevant workflow_execution and workflow_step entries +// Add creates the relevant workflow_execution and workflow_step entries // to persist the passed in ExecutionState. -func (d *DBStore) Add(ctx context.Context, state *WorkflowExecution) error { +func (d *DBStore) Add(ctx context.Context, state *WorkflowExecution) (WorkflowExecution, error) { l := d.lggr.With("executionID", state.ExecutionID, "workflowID", state.WorkflowID, "status", state.Status) - return d.transact(ctx, func(db *DBStore) error { + var workflowExecution WorkflowExecution + err := d.transact(ctx, func(db *DBStore) error { var wid *string if state.WorkflowID != "" { wid = &state.WorkflowID @@ -219,12 +221,23 @@ func (d *DBStore) Add(ctx context.Context, state *WorkflowExecution) error { } l.Debug("Adding workflow execution") - err := db.insertWorkflowExecution(ctx, wex) + dbWex, err := db.insertWorkflowExecution(ctx, wex) if err != nil { return fmt.Errorf("could not insert workflow execution %s: %w", state.ExecutionID, err) } - - ws := []workflowStepRow{} + workflowExecution = WorkflowExecution{ + ExecutionID: dbWex.ID, + Status: dbWex.Status, + Steps: state.Steps, + CreatedAt: dbWex.CreatedAt, + UpdatedAt: dbWex.UpdatedAt, + FinishedAt: dbWex.FinishedAt, + } + // Tests are not passing the ID, so to avoid a nil-pointer dereference, we added this check. 
+ if wid != nil { + workflowExecution.WorkflowID = *wid + } + var ws []workflowStepRow for _, step := range state.Steps { step, err := stateToStep(step) if err != nil { @@ -238,6 +251,8 @@ func (d *DBStore) Add(ctx context.Context, state *WorkflowExecution) error { } return nil }) + + return workflowExecution, err } func (d *DBStore) upsertSteps(ctx context.Context, steps []workflowStepRow) error { @@ -269,14 +284,15 @@ func (d *DBStore) upsertSteps(ctx context.Context, steps []workflowStepRow) erro return err } -func (d *DBStore) insertWorkflowExecution(ctx context.Context, execution *workflowExecutionRow) error { +func (d *DBStore) insertWorkflowExecution(ctx context.Context, execution *workflowExecutionRow) (*workflowExecutionRow, error) { sql := ` INSERT INTO workflow_executions(id, workflow_id, status, created_at) - VALUES ($1, $2, $3, $4) + VALUES ($1, $2, $3, $4) RETURNING * ` - _, err := d.db.ExecContext(ctx, sql, execution.ID, execution.WorkflowID, execution.Status, d.clock.Now()) - return err + wex := &workflowExecutionRow{} + err := d.db.GetContext(ctx, wex, sql, execution.ID, execution.WorkflowID, execution.Status, d.clock.Now()) + return wex, err } func (d *DBStore) transact(ctx context.Context, fn func(*DBStore) error) error { diff --git a/core/services/workflows/store/store_db_test.go b/core/services/workflows/store/store_db_test.go index 9a98db3056f..1b58f816e59 100644 --- a/core/services/workflows/store/store_db_test.go +++ b/core/services/workflows/store/store_db_test.go @@ -12,6 +12,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -51,7 +52,7 @@ func Test_StoreDB(t *testing.T) { Status: StatusStarted, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) gotEs, err := store.Get(tests.Context(t), es.ExecutionID) @@ -83,10 +84,10 @@ func Test_StoreDB_DuplicateEntry(t *testing.T) { Status: StatusStarted, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) - err = store.Add(tests.Context(t), &es) + _, err = store.Add(tests.Context(t), &es) assert.ErrorContains(t, err, "duplicate key value violates") } @@ -111,7 +112,7 @@ func Test_StoreDB_UpdateStatus(t *testing.T) { Status: StatusStarted, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) completedStatus := StatusCompleted @@ -147,7 +148,7 @@ func Test_StoreDB_UpdateStep(t *testing.T) { Status: StatusStarted, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) stepOne.Status = StatusCompleted @@ -195,7 +196,7 @@ func Test_StoreDB_WorkflowStatus(t *testing.T) { Status: s, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) } } @@ -223,7 +224,7 @@ func Test_StoreDB_WorkflowStepStatus(t *testing.T) { Status: StatusStarted, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) for s := range ValidStatuses { @@ -256,7 +257,7 @@ func Test_StoreDB_GetUnfinishedSteps(t *testing.T) { Status: StatusStarted, } - err := store.Add(tests.Context(t), &es) + _, err := store.Add(tests.Context(t), &es) require.NoError(t, err) id = randomID() @@ -265,7 +266,7 @@ func 
Test_StoreDB_GetUnfinishedSteps(t *testing.T) { Status: StatusCompleted, Steps: map[string]*WorkflowExecutionStep{}, } - err = store.Add(tests.Context(t), &esTwo) + _, err = store.Add(tests.Context(t), &esTwo) require.NoError(t, err) states, err := store.GetUnfinished(tests.Context(t), 0, 100)
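// Aside (illustrative only, not part of the patch): the status-propagation and precedence rules
// used by the new isWorkflowFullyProcessed, distilled onto a plain adjacency map so the traversal
// can be read in isolation. All names below (graph, statuses, the status constants, propagate,
// overall) are hypothetical stand-ins for the engine's workflow graph and store package.
package main

import "fmt"

const (
	statusCompleted          = "completed"
	statusErrored            = "errored"
	statusTimeout            = "timed_out"
	statusCompletedEarlyExit = "completed_early_exit"
)

// propagate copies a terminal status breadth-first to every transitive dependent that has not
// produced a status of its own, mirroring the queue-based walk in isWorkflowFullyProcessed.
func propagate(graph map[string][]string, statuses map[string]string, ref, status string) {
	queue := []string{ref}
	for len(queue) > 0 {
		current := queue[0]
		queue = queue[1:]
		for _, dependent := range graph[current] {
			if _, processed := statuses[dependent]; !processed {
				statuses[dependent] = status
				queue = append(queue, dependent)
			}
		}
	}
}

// overall picks the workflow status by precedence: errored > timed_out > completed_early_exit > completed.
func overall(statuses map[string]string) string {
	var errored, timedOut, earlyExit bool
	for _, s := range statuses {
		switch s {
		case statusErrored:
			errored = true
		case statusTimeout:
			timedOut = true
		case statusCompletedEarlyExit:
			earlyExit = true
		}
	}
	switch {
	case errored:
		return statusErrored
	case timedOut:
		return statusTimeout
	case earlyExit:
		return statusCompletedEarlyExit
	default:
		return statusCompleted
	}
}

func main() {
	// Two branches off one trigger, as in TestEngine_MultiBranchExecution: A -> B -> C and A -> D -> E.
	graph := map[string][]string{"A": {"B", "D"}, "B": {"C"}, "D": {"E"}}
	statuses := map[string]string{
		"A": statusCompleted,
		"B": statusCompleted,
		"C": statusCompleted,
		"D": statusCompletedEarlyExit, // D signalled an early exit, so E will never run
	}
	propagate(graph, statuses, "D", statusCompletedEarlyExit)
	fmt.Println(statuses["E"], overall(statuses)) // completed_early_exit completed_early_exit
}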