Skip to content

Commit 4b118fb

Browse files
authored
chore: Admission lanes for workflow tasks (#19584)
This PR introduces "admission lanes", controlled by token buckets, for dispatching tasks to the runner (scheduler), so that the runner is not overwhelmed by a large number of tasks. Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent d02e55c commit 4b118fb

File tree

12 files changed

+340
-16
lines changed

12 files changed

+340
-16
lines changed

docs/sources/shared/configuration.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4707,6 +4707,12 @@ otlp_config:
47074707
# override is set, the encryption context will not be provided to S3. Ignored if
47084708
# the SSE type override is not set.
47094709
[s3_sse_kms_encryption_context: <string> | default = ""]
4710+
4711+
# Experimental: Controls the number of scan tasks that can be running in
4712+
# parallel in the new query engine. The default of 0 means unlimited parallelism
4713+
# and all tasks will be scheduled at once.
4714+
# CLI flag: -limits.max-scan-task-parallelism
4715+
[max_scan_task_parallelism: <int> | default = 0]
47104716
```
47114717
47124718
### local_storage_config

pkg/engine/engine.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,11 @@ func (e *Engine) buildWorkflow(ctx context.Context, logger log.Logger, physicalP
323323
span := trace.SpanFromContext(ctx)
324324
timer := prometheus.NewTimer(e.metrics.workflowPlanning)
325325

326-
wf, err := workflow.New(logger, tenantID, e.scheduler.inner, physicalPlan)
326+
opts := workflow.Options{
327+
MaxRunningScanTasks: e.limits.MaxScanTaskParallelism(tenantID),
328+
MaxRunningOtherTasks: 0,
329+
}
330+
wf, err := workflow.New(opts, logger, tenantID, e.scheduler.inner, physicalPlan)
327331
if err != nil {
328332
level.Warn(logger).Log("msg", "failed to create workflow", "err", err)
329333
span.RecordError(err)

pkg/engine/internal/worker/worker_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,11 @@ func buildWorkflow(ctx context.Context, t *testing.T, logger log.Logger, loc obj
158158
fmt.Fprintln(os.Stderr, physical.PrintAsTree(plan))
159159
}
160160

161-
wf, err := workflow.New(logger, objtest.Tenant, sched, plan)
161+
opts := workflow.Options{
162+
MaxRunningScanTasks: 32,
163+
MaxRunningOtherTasks: 0, // unlimited
164+
}
165+
wf, err := workflow.New(opts, logger, objtest.Tenant, sched, plan)
162166
require.NoError(t, err)
163167

164168
if testing.Verbose() {
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package workflow
2+
3+
import (
4+
"math"
5+
6+
"golang.org/x/sync/semaphore"
7+
8+
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
9+
)
10+
11+
// taskType names the category of a task for the purpose of admission control.
type taskType string

// Task categories that have their own admission lane.
const (
	taskTypeScan  = taskType("scan")
	taskTypeOther = taskType("other")
)
17+
18+
type admissionLane struct {
19+
*semaphore.Weighted
20+
capacity int64
21+
lane taskType
22+
}
23+
24+
func newAdmissionLane(lane taskType, capacity int64) *admissionLane {
25+
return &admissionLane{
26+
Weighted: semaphore.NewWeighted(capacity),
27+
capacity: capacity,
28+
lane: lane,
29+
}
30+
}
31+
32+
// admissionControl is a control structure to lookup "admission lanes" for different types of tasks.
33+
// It is a lightweight wrapper around a mapping of task type to admission lane.
34+
type admissionControl struct {
35+
mapping map[taskType]*admissionLane
36+
}
37+
38+
func newAdmissionControl(maxScanTasks, maxOtherTasks int64) *admissionControl {
39+
if maxScanTasks < 1 {
40+
maxScanTasks = math.MaxInt64
41+
}
42+
if maxOtherTasks < 1 {
43+
maxOtherTasks = math.MaxInt64
44+
}
45+
46+
return &admissionControl{
47+
mapping: map[taskType]*admissionLane{
48+
taskTypeScan: newAdmissionLane(taskTypeScan, maxScanTasks),
49+
taskTypeOther: newAdmissionLane(taskTypeOther, maxOtherTasks),
50+
},
51+
}
52+
}
53+
54+
// groupByBucket categorizes a slice of tasks into groups based on their characteristics (scan, other, ...).
55+
func (ac *admissionControl) groupByType(tasks []*Task) map[taskType][]*Task {
56+
groups := map[taskType][]*Task{
57+
taskTypeScan: make([]*Task, 0, len(tasks)),
58+
taskTypeOther: make([]*Task, 0, len(tasks)),
59+
}
60+
61+
for _, t := range tasks {
62+
ty := ac.typeFor(t)
63+
groups[ty] = append(groups[ty], t)
64+
}
65+
66+
return groups
67+
}
68+
69+
func (ac *admissionControl) typeFor(task *Task) taskType {
70+
if isScanTask(task) {
71+
return taskTypeScan
72+
}
73+
return taskTypeOther
74+
}
75+
76+
func (ac *admissionControl) laneFor(task *Task) *admissionLane {
77+
return ac.mapping[ac.typeFor(task)]
78+
}
79+
80+
func (ac *admissionControl) get(ty taskType) *admissionLane {
81+
return ac.mapping[ty]
82+
}
83+
84+
func isScanTask(task *Task) bool {
85+
for node := range task.Fragment.Graph().Nodes() {
86+
if node.Type() == physical.NodeTypeDataObjScan {
87+
return true
88+
}
89+
}
90+
return false
91+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package workflow
2+
3+
import (
4+
"math"
5+
"testing"
6+
7+
"github.com/oklog/ulid/v2"
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
11+
"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
12+
)
13+
14+
func TestAdmissionControl_getBucket(t *testing.T) {
15+
ac := newAdmissionControl(32, math.MaxInt64)
16+
17+
t.Run("Task without a DataObjScan node is considered an 'other' task", func(t *testing.T) {
18+
fragment := dag.Graph[physical.Node]{}
19+
task := &Task{
20+
ULID: ulid.Make(),
21+
Fragment: physical.FromGraph(fragment),
22+
}
23+
bucket := ac.typeFor(task)
24+
require.Equal(t, taskTypeOther, bucket)
25+
})
26+
27+
t.Run("Task with a DataObjScan node is considered an 'scan' task", func(t *testing.T) {
28+
fragment := dag.Graph[physical.Node]{}
29+
fragment.Add(&physical.DataObjScan{})
30+
31+
task := &Task{
32+
ULID: ulid.Make(),
33+
Fragment: physical.FromGraph(fragment),
34+
}
35+
ty := ac.typeFor(task)
36+
require.Equal(t, taskTypeScan, ty)
37+
})
38+
}

pkg/engine/internal/workflow/workflow.go

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package workflow
44

55
import (
66
"context"
7+
"fmt"
78
"sync"
89

910
"github.com/apache/arrow-go/v18/arrow"
@@ -17,9 +18,15 @@ import (
1718
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
1819
)
1920

21+
// Options configures how a Workflow admits tasks to its Runner.
type Options struct {
	// MaxRunningScanTasks is the capacity of the admission lane for scan
	// tasks. Values below 1 mean the lane is unlimited.
	MaxRunningScanTasks int

	// MaxRunningOtherTasks is the capacity of the admission lane for all
	// non-scan tasks. Values below 1 mean the lane is unlimited.
	MaxRunningOtherTasks int
}
25+
2026
// Workflow represents a physical plan that has been partitioned into
2127
// parallelizable tasks.
2228
type Workflow struct {
29+
opts Options
2330
logger log.Logger
2431
runner Runner
2532
graph dag.Graph[*Task]
@@ -33,14 +40,16 @@ type Workflow struct {
3340

3441
streamsMut sync.RWMutex
3542
streamStates map[*Stream]StreamState
43+
44+
admissionControl *admissionControl
3645
}
3746

3847
// New creates a new Workflow from a physical plan. New returns an error if the
3948
// physical plan does not have exactly one root node, or if the physical plan
4049
// cannot be partitioned into a Workflow.
4150
//
4251
// The provided Runner will be used for Workflow execution.
43-
func New(logger log.Logger, tenantID string, runner Runner, plan *physical.Plan) (*Workflow, error) {
52+
func New(opts Options, logger log.Logger, tenantID string, runner Runner, plan *physical.Plan) (*Workflow, error) {
4453
graph, err := planWorkflow(tenantID, plan)
4554
if err != nil {
4655
return nil, err
@@ -53,6 +62,7 @@ func New(logger log.Logger, tenantID string, runner Runner, plan *physical.Plan)
5362
}
5463

5564
return &Workflow{
65+
opts: opts,
5666
logger: logger,
5767
runner: runner,
5868
graph: graph,
@@ -133,19 +143,54 @@ func (wf *Workflow) Run(ctx context.Context) (pipeline executor.Pipeline, err er
133143
}
134144
}
135145

136-
// TODO(rfratto): For logs queries, we want a system to limit how many scan
137-
// tasks get sent to the runner at once.
138-
//
139-
// This will limit unnecessary resource consumption of workflows when
140-
// there's a lot of compute capacity.
141-
if err := wf.runner.Start(ctx, wrappedHandler, tasks...); err != nil {
142-
pipeline.Close()
143-
return nil, err
144-
}
146+
// Start dispatching in background goroutine
147+
go func() {
148+
err := wf.dispatchTasks(ctx, wrappedHandler, tasks)
149+
if err != nil {
150+
wrapped.SetError(err)
151+
wrapped.Close()
152+
}
153+
}()
145154

146155
return wrapped, nil
147156
}
148157

158+
// dispatchTasks groups the slice of tasks by their associated "admission lane" (token bucket)
159+
// and dispatches them to the runner.
160+
// Tasks from different admission lanes are dispatched concurrently.
161+
// The caller needs to wait on the returned error group.
162+
func (wf *Workflow) dispatchTasks(ctx context.Context, handler TaskEventHandler, tasks []*Task) error {
163+
wf.admissionControl = newAdmissionControl(
164+
int64(wf.opts.MaxRunningScanTasks),
165+
int64(wf.opts.MaxRunningOtherTasks),
166+
)
167+
168+
groups := wf.admissionControl.groupByType(tasks)
169+
for _, taskType := range []taskType{
170+
taskTypeOther,
171+
taskTypeScan,
172+
} {
173+
lane := wf.admissionControl.get(taskType)
174+
tasks := groups[taskType]
175+
176+
var offset int64
177+
total := int64(len(tasks))
178+
maxBatchSize := min(total, lane.capacity)
179+
180+
for ; offset < total; offset += maxBatchSize {
181+
batchSize := min(maxBatchSize, total-offset)
182+
if err := lane.Acquire(ctx, batchSize); err != nil {
183+
return fmt.Errorf("failed to acquire tokens from admission lane %s: %w", taskType, err)
184+
}
185+
if err := wf.runner.Start(ctx, handler, tasks[offset:offset+batchSize]...); err != nil {
186+
return fmt.Errorf("failed to start tasks: %w", err)
187+
}
188+
}
189+
}
190+
191+
return nil
192+
}
193+
149194
func (wf *Workflow) allStreams() []*Stream {
150195
var (
151196
result []*Stream
@@ -211,13 +256,25 @@ func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newStatus Task
211256
level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State)
212257

213258
wf.tasksMut.Lock()
259+
oldState := wf.taskStates[task]
214260
wf.taskStates[task] = newStatus.State
215261
wf.tasksMut.Unlock()
216262

263+
if oldState == newStatus.State {
264+
return
265+
}
266+
217267
if !newStatus.State.Terminal() {
218268
return
219269
}
220270

271+
if wf.admissionControl == nil {
272+
level.Warn(wf.logger).Log("msg", "admission control was not initialised")
273+
} else if oldState == TaskStatePending || oldState == TaskStateRunning {
274+
// Release tokens only if the task was already enqueued and therefore either pending or running.
275+
defer wf.admissionControl.laneFor(task).Release(1)
276+
}
277+
221278
if newStatus.Statistics != nil {
222279
wf.mergeResults(*newStatus.Statistics)
223280
}

0 commit comments

Comments
 (0)