Skip to content

Commit 2d54ddc

Browse files
authored
chore(engine): adjustments to log task short circuiting (#20121)
1 parent 7156991 commit 2d54ddc

File tree

8 files changed

+86
-26
lines changed

8 files changed

+86
-26
lines changed

pkg/engine/internal/executor/pipeline.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,28 @@ type Pipeline interface {
2424
Close()
2525
}
2626

27+
// WrappedPipeline represents a pipeline that wraps another pipeline.
28+
type WrappedPipeline interface {
29+
Pipeline
30+
31+
// Unwrap returns the inner pipeline. Implementations must always return the
32+
// same non-nil value representing the inner pipeline.
33+
Unwrap() Pipeline
34+
}
35+
36+
// Unwrap recursively unwraps the provided pipeline. [WrappedPipeline.Unwrap] is
37+
// invoked for each wrapped pipeline until the first non-wrapped pipeline is
38+
// reached.
39+
func Unwrap(p Pipeline) Pipeline {
40+
for {
41+
wrapped, ok := p.(WrappedPipeline)
42+
if !ok {
43+
return p
44+
}
45+
p = wrapped.Unwrap()
46+
}
47+
}
48+
2749
// RegionProvider is an optional interface that pipelines can implement
2850
// to expose their associated xcap region for statistics collection.
2951
type RegionProvider interface {
@@ -353,6 +375,11 @@ func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error)
353375
return rec, err
354376
}
355377

378+
// Unwrap returns the underlying pipeline.
379+
func (p *observedPipeline) Unwrap() Pipeline {
380+
return p.inner
381+
}
382+
356383
// Close implements Pipeline.
357384
func (p *observedPipeline) Close() {
358385
p.inner.Close()

pkg/engine/internal/scheduler/scheduler.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -752,9 +752,9 @@ func (s *Scheduler) Start(ctx context.Context, tasks ...*workflow.Task) error {
752752

753753
// We set markPending *after* enqueueTasks to give tasks an opportunity to
754754
// immediately transition into running (lowering state transition noise).
755-
err = s.enqueueTasks(trackedTasks)
755+
s.enqueueTasks(trackedTasks)
756756
s.markPending(ctx, trackedTasks)
757-
return err
757+
return nil
758758
}
759759

760760
// findTasks gets a list of [task] from workflow tasks. Returns an error if any
@@ -781,17 +781,15 @@ func (s *Scheduler) findTasks(tasks []*workflow.Task) ([]*task, error) {
781781
return res, nil
782782
}
783783

784-
func (s *Scheduler) enqueueTasks(tasks []*task) error {
784+
func (s *Scheduler) enqueueTasks(tasks []*task) {
785785
s.assignMut.Lock()
786786
defer s.assignMut.Unlock()
787787

788-
var errs []error
789-
790788
for _, task := range tasks {
791-
// Only allow to enqueue tasks in the initial state (created). This
792-
// prevents tasks from accidentally being run multiple times.
789+
// Ignore tasks that aren't in the initial state (created). This
790+
// prevents us from rejecting tasks which were preemptively canceled by
791+
// callers.
793792
if got, want := task.status.State, workflow.TaskStateCreated; got != want {
794-
errs = append(errs, fmt.Errorf("task %s is in state %s, not %s", task.inner.ULID, got, want))
795793
continue
796794
}
797795

@@ -802,11 +800,6 @@ func (s *Scheduler) enqueueTasks(tasks []*task) error {
802800
if len(s.readyWorkers) > 0 && len(s.taskQueue) > 0 {
803801
nudgeSemaphore(s.assignSema)
804802
}
805-
806-
if len(errs) > 0 {
807-
return errors.Join(errs...)
808-
}
809-
return nil
810803
}
811804

812805
func (s *Scheduler) markPending(ctx context.Context, tasks []*task) {

pkg/engine/internal/scheduler/scheduler_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ func TestScheduler_Start(t *testing.T) {
367367
require.Equal(t, workflow.TaskStatePending, taskStatus.State, "Started tasks should move to pending state")
368368
})
369369

370-
t.Run("Fails with existing task", func(t *testing.T) {
370+
t.Run("Ignores already started tasks", func(t *testing.T) {
371371
sched := newTestScheduler(t)
372372

373373
var (
@@ -380,8 +380,7 @@ func TestScheduler_Start(t *testing.T) {
380380
)
381381
require.NoError(t, sched.RegisterManifest(t.Context(), manifest), "Scheduler should accept valid manifest")
382382
require.NoError(t, sched.Start(t.Context(), exampleTask), "Scheduler should start registered task")
383-
384-
require.Error(t, sched.Start(t.Context(), exampleTask), "Scheduler should reject already started tasks")
383+
require.NoError(t, sched.Start(t.Context(), exampleTask), "Scheduler should ignore already started tasks")
385384
})
386385
}
387386

pkg/engine/internal/scheduler/task.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,26 @@ var validTaskTransitions = map[workflow.TaskState][]workflow.TaskState{
4444
func (t *task) setState(m *metrics, newStatus workflow.TaskStatus) (bool, error) {
4545
oldState, newState := t.status.State, newStatus.State
4646

47-
if newState == oldState {
47+
switch {
48+
case newStatus != t.status && newState == oldState:
49+
// State is the same (so we don't have to validate transitions), but
50+
// there's a new payload about the status, so we should store it.
51+
t.status = newStatus
52+
return true, nil
53+
54+
case newState == oldState:
55+
// Status is the exact same, no need to update.
4856
return false, nil
49-
}
5057

51-
validStates := validTaskTransitions[oldState]
52-
if !slices.Contains(validStates, newState) {
53-
return false, fmt.Errorf("invalid state transition from %s to %s", oldState, newState)
58+
default:
59+
validStates := validTaskTransitions[oldState]
60+
if !slices.Contains(validStates, newState) {
61+
return false, fmt.Errorf("invalid state transition from %s to %s", oldState, newState)
62+
}
63+
64+
t.status = newStatus
65+
m.tasksTotal.WithLabelValues(newState.String()).Inc()
66+
return true, nil
5467
}
5568

56-
t.status = newStatus
57-
m.tasksTotal.WithLabelValues(newState.String()).Inc()
58-
return true, nil
5969
}

pkg/engine/internal/scheduler/wire/codec.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,13 @@ func (c *protobufCodec) taskStatusFromPbTaskStatus(ts *wirepb.TaskStatus) (workf
316316
status.Capture = capture
317317
}
318318

319+
if ts.ContributingTimeRange != nil {
320+
status.ContributingTimeRange = workflow.ContributingTimeRange{
321+
Timestamp: ts.ContributingTimeRange.Timestamp,
322+
LessThan: ts.ContributingTimeRange.LessThan,
323+
}
324+
}
325+
319326
return status, nil
320327
}
321328

@@ -571,6 +578,10 @@ func (c *protobufCodec) taskToPbTask(from *workflow.Task) (*wirepb.Task, error)
571578
func (c *protobufCodec) taskStatusToPbTaskStatus(from workflow.TaskStatus) (*wirepb.TaskStatus, error) {
572579
ts := &wirepb.TaskStatus{
573580
State: c.taskStateToPbTaskState(from.State),
581+
ContributingTimeRange: &wirepb.ContributingTimeRange{
582+
Timestamp: from.ContributingTimeRange.Timestamp,
583+
LessThan: from.ContributingTimeRange.LessThan,
584+
},
574585
}
575586

576587
if from.Error != nil {

pkg/engine/internal/scheduler/wire/codec_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net"
66
"net/netip"
77
"testing"
8+
"time"
89

910
"github.com/apache/arrow-go/v18/arrow"
1011
"github.com/apache/arrow-go/v18/arrow/array"
@@ -126,6 +127,18 @@ func TestProtobufCodec_Messages(t *testing.T) {
126127
},
127128
},
128129
},
130+
"TaskStatusMessage with Running state and ContributingTimeRange": {
131+
message: TaskStatusMessage{
132+
ID: taskULID,
133+
Status: workflow.TaskStatus{
134+
State: workflow.TaskStateRunning,
135+
ContributingTimeRange: workflow.ContributingTimeRange{
136+
Timestamp: time.Now().Add(-time.Minute),
137+
LessThan: true,
138+
},
139+
},
140+
},
141+
},
129142
"TaskStatusMessage with Completed state": {
130143
message: TaskStatusMessage{
131144
ID: taskULID,

pkg/engine/internal/worker/thread.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {
167167
// If the root pipeline can notify about changes to its contributing time
168168
// range, subscribe to those changes.
169169
// TODO(spiridonov): find a way to subscribe to changes on non-root pipelines.
170-
notifier, ok := pipeline.(executor.ContributingTimeRangeChangedNotifier)
170+
notifier, ok := executor.Unwrap(pipeline).(executor.ContributingTimeRangeChangedNotifier)
171171
if ok {
172172
notifier.SubscribeToTimeRangeChanges(func(ts time.Time, lessThan bool) {
173173
// Send a Running task status update with the current time range

pkg/engine/internal/workflow/workflow.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/go-kit/log"
1212
"github.com/go-kit/log/level"
1313
"github.com/oklog/ulid/v2"
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
1416

1517
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
1618
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
@@ -19,6 +21,11 @@ import (
1921
"github.com/grafana/loki/v3/pkg/xcap"
2022
)
2123

24+
var shortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
25+
Name: "loki_engine_v2_task_short_circuits_total",
26+
Help: "Total number of tasks preemptively canceled by short circuiting.",
27+
})
28+
2229
// Options configures a [Workflow].
2330
type Options struct {
2431
// MaxRunningScanTasks specifies the maximum number of scan tasks that may
@@ -379,8 +386,8 @@ func (wf *Workflow) handleNonTerminalStateChange(ctx context.Context, task *Task
379386
// TODO(spiridonov): We do not check parents here right now because there is only 1 parent,
380387
// but in general a task can be canceled only if all its parents are in terminal states OR
381388
// have non-intersecting contributing time ranges.
382-
383389
tasksToCancel = append(tasksToCancel, child)
390+
shortCircuitsTotal.Inc()
384391
}
385392
}
386393
wf.tasksMut.RUnlock()

0 commit comments

Comments
 (0)