Skip to content

Commit d02e55c

Browse files
authored
chore: remove limit physical plan node (#19682)
1 parent d280b20 commit d02e55c

File tree

8 files changed

+205
-84
lines changed

8 files changed

+205
-84
lines changed

pkg/engine/internal/planner/logical/builder.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,20 @@ func (b *Builder) Sort(column ColumnRef, ascending, nullsFirst bool) *Builder {
8484
}
8585
}
8686

87+
// TopK applies a [TopK] operation to the Builder.
88+
func (b *Builder) TopK(sortBy *ColumnRef, K int, ascending, nullsFirst bool) *Builder {
89+
return &Builder{
90+
val: &TopK{
91+
Table: b.val,
92+
93+
SortBy: sortBy,
94+
Ascending: ascending,
95+
NullsFirst: nullsFirst,
96+
K: K,
97+
},
98+
}
99+
}
100+
87101
// BinOpRight adds a binary arithmetic operation with a given right value
88102
func (b *Builder) BinOpRight(op types.BinaryOp, right Value) *Builder {
89103
return &Builder{

pkg/engine/internal/planner/logical/builder_convert.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ func (b *ssaBuilder) process(value Value) (Value, error) {
4141
return b.processLimitPlan(value)
4242
case *Sort:
4343
return b.processSortPlan(value)
44+
case *TopK:
45+
return b.processTopKPlan(value)
4446
case *Projection:
4547
return b.processProjection(value)
4648
case *RangeAggregation:
@@ -150,6 +152,19 @@ func (b *ssaBuilder) processSortPlan(plan *Sort) (Value, error) {
150152
return plan, nil
151153
}
152154

155+
func (b *ssaBuilder) processTopKPlan(plan *TopK) (Value, error) {
156+
if _, err := b.process(plan.Table); err != nil {
157+
return nil, err
158+
}
159+
160+
// Only append the first time we see this.
161+
if plan.id == "" {
162+
plan.id = fmt.Sprintf("%%%d", b.getID())
163+
b.instructions = append(b.instructions, plan)
164+
}
165+
return plan, nil
166+
}
167+
153168
func (b *ssaBuilder) processUnaryOp(value *UnaryOp) (Value, error) {
154169
if _, err := b.process(value.Value); err != nil {
155170
return nil, err

pkg/engine/internal/planner/logical/format_tree.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ func (t *treeFormatter) convert(value Value) *tree.Node {
2929
return t.convertLimit(value)
3030
case *Sort:
3131
return t.convertSort(value)
32+
case *TopK:
33+
return t.convertTopK(value)
3234
case *RangeAggregation:
3335
return t.convertRangeAggregation(value)
3436
case *VectorAggregation:
@@ -101,6 +103,29 @@ func (t *treeFormatter) convertSort(ast *Sort) *tree.Node {
101103
return node
102104
}
103105

106+
func (t *treeFormatter) convertTopK(ast *TopK) *tree.Node {
107+
direction := "asc"
108+
if !ast.Ascending {
109+
direction = "desc"
110+
}
111+
112+
nullsPosition := "last"
113+
if ast.NullsFirst {
114+
nullsPosition = "first"
115+
}
116+
117+
node := tree.NewNode("TOPK", ast.Name(),
118+
tree.NewProperty("table", false, ast.Table.Name()),
119+
tree.NewProperty("sort_by", false, ast.SortBy.Name()),
120+
tree.NewProperty("k", false, ast.K),
121+
tree.NewProperty("direction", false, direction),
122+
tree.NewProperty("nulls", false, nullsPosition),
123+
)
124+
node.Comments = append(node.Comments, t.convert(ast.SortBy))
125+
node.Children = append(node.Children, t.convert(ast.Table))
126+
return node
127+
}
128+
104129
func (t *treeFormatter) convertUnaryOp(expr *UnaryOp) *tree.Node {
105130
node := tree.NewNode("UnaryOp", expr.Name(),
106131
tree.NewProperty("op", false, expr.Op.String()),
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package logical
2+
3+
import (
4+
"fmt"
5+
)
6+
7+
// TopK represents a plan node that performs topK operation.
8+
// Topk only identifies which rows belong in the top K, but does not
9+
// guarantee any specific ordering of those rows in the compacted output. Callers
10+
// should sort the result if a specific order is required.
11+
12+
// The TopK instruction find the top K rows from a table relation. TopK implements both
13+
// [Instruction] and [Value].
14+
type TopK struct {
15+
id string
16+
17+
Table Value // The table relation to sort.
18+
19+
SortBy *ColumnRef // The column to sort by.
20+
Ascending bool // Whether to sort in ascending order.
21+
NullsFirst bool // Controls whether NULLs appear first (true) or last (false).
22+
K int // Number of top rows to return.
23+
}
24+
25+
var (
26+
_ Value = (*TopK)(nil)
27+
_ Instruction = (*TopK)(nil)
28+
)
29+
30+
// Name returns an identifier for the TopK operation.
31+
func (s *TopK) Name() string {
32+
if s.id != "" {
33+
return s.id
34+
}
35+
return fmt.Sprintf("%p", s)
36+
}
37+
38+
// String returns the disassembled SSA form of the TopK instruction.
39+
func (s *TopK) String() string {
40+
return fmt.Sprintf(
41+
"TOPK %s [sort_by=%s, k=%d, asc=%t, nulls_first=%t]",
42+
s.Table.Name(),
43+
s.SortBy.String(),
44+
s.K,
45+
s.Ascending,
46+
s.NullsFirst,
47+
)
48+
}
49+
50+
func (s *TopK) isInstruction() {}
51+
func (s *TopK) isValue() {}

pkg/engine/internal/planner/logical/planner.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,9 @@ func buildPlanForLogQuery(
203203

204204
// Metric queries do not apply a limit.
205205
if !isMetricQuery {
206-
// SORT -> SortMerge
207206
// We always sort DESC. ASC timestamp sorting is not supported for logs
208207
// queries, and metric queries do not need sorting.
209-
builder = builder.Sort(*timestampColumnRef(), false, false)
210-
211-
// LIMIT -> Limit
212-
limit := params.Limit()
213-
builder = builder.Limit(0, limit)
208+
builder = builder.TopK(timestampColumnRef(), int(params.Limit()), false, false)
214209
}
215210

216211
return builder.Value(), nil

pkg/engine/internal/planner/logical/planner_test.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,9 @@ func TestConvertAST_Success(t *testing.T) {
108108
%16 = SELECT %14 [predicate=%15]
109109
%17 = SELECT %16 [predicate=%6]
110110
%18 = SELECT %17 [predicate=%11]
111-
%19 = SORT %18 [column=builtin.timestamp, asc=false, nulls_first=false]
112-
%20 = LIMIT %19 [skip=0, fetch=1000]
113-
%21 = LOGQL_COMPAT %20
114-
RETURN %21
111+
%19 = TOPK %18 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
112+
%20 = LOGQL_COMPAT %19
113+
RETURN %20
115114
`
116115

117116
require.Equal(t, expected, logicalPlan.String())
@@ -471,10 +470,9 @@ RETURN %12
471470
%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)]
472471
%8 = EQ ambiguous.level "error"
473472
%9 = SELECT %7 [predicate=%8]
474-
%10 = SORT %9 [column=builtin.timestamp, asc=false, nulls_first=false]
475-
%11 = LIMIT %10 [skip=0, fetch=1000]
476-
%12 = LOGQL_COMPAT %11
477-
RETURN %12
473+
%10 = TOPK %9 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
474+
%11 = LOGQL_COMPAT %10
475+
RETURN %11
478476
`
479477
require.Equal(t, expected, plan.String())
480478
})
@@ -532,10 +530,9 @@ RETURN %12
532530
%7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message, [], false, false)]
533531
%8 = EQ ambiguous.level "error"
534532
%9 = SELECT %7 [predicate=%8]
535-
%10 = SORT %9 [column=builtin.timestamp, asc=false, nulls_first=false]
536-
%11 = LIMIT %10 [skip=0, fetch=1000]
537-
%12 = LOGQL_COMPAT %11
538-
RETURN %12
533+
%10 = TOPK %9 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
534+
%11 = LOGQL_COMPAT %10
535+
RETURN %11
539536
`
540537
require.Equal(t, expected, plan.String())
541538
})
@@ -571,10 +568,9 @@ RETURN %12
571568
%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)]
572569
%12 = EQ ambiguous.level "debug"
573570
%13 = SELECT %11 [predicate=%12]
574-
%14 = SORT %13 [column=builtin.timestamp, asc=false, nulls_first=false]
575-
%15 = LIMIT %14 [skip=0, fetch=1000]
576-
%16 = LOGQL_COMPAT %15
577-
RETURN %16
571+
%14 = TOPK %13 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
572+
%15 = LOGQL_COMPAT %14
573+
RETURN %15
578574
`
579575

580576
require.Equal(t, expected, plan.String(), "Operations should be in the correct order: LineFilter before Parse, LabelFilter after Parse")
@@ -642,10 +638,9 @@ func TestPlannerCreatesProjection(t *testing.T) {
642638
%5 = LT builtin.timestamp 1970-01-01T01:00:00Z
643639
%6 = SELECT %4 [predicate=%5]
644640
%7 = PROJECT %6 [mode=*D, expr=ambiguous.level, expr=ambiguous.detected_level]
645-
%8 = SORT %7 [column=builtin.timestamp, asc=false, nulls_first=false]
646-
%9 = LIMIT %8 [skip=0, fetch=0]
647-
%10 = LOGQL_COMPAT %9
648-
RETURN %10
641+
%8 = TOPK %7 [sort_by=builtin.timestamp, k=0, asc=false, nulls_first=false]
642+
%9 = LOGQL_COMPAT %8
643+
RETURN %9
649644
`
650645
require.Equal(t, expected, plan.String())
651646
})

pkg/engine/internal/planner/physical/planner.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ func (p *Planner) process(inst logical.Value, ctx *Context) (Node, error) {
158158
return p.processSort(inst, ctx)
159159
case *logical.Limit:
160160
return p.processLimit(inst, ctx)
161+
case *logical.TopK:
162+
return p.processTopK(inst, ctx)
161163
case *logical.RangeAggregation:
162164
return p.processRangeAggregation(inst, ctx)
163165
case *logical.VectorAggregation:
@@ -301,6 +303,35 @@ func (p *Planner) processSort(lp *logical.Sort, ctx *Context) (Node, error) {
301303
return node, nil
302304
}
303305

306+
// processTopK processes a [logical.TopK] node.
307+
func (p *Planner) processTopK(lp *logical.TopK, ctx *Context) (Node, error) {
308+
order := DESC
309+
if lp.Ascending {
310+
order = ASC
311+
}
312+
313+
node := &TopK{
314+
NodeID: ulid.Make(),
315+
316+
SortBy: &ColumnExpr{Ref: lp.SortBy.Ref},
317+
Ascending: order == ASC,
318+
NullsFirst: lp.NullsFirst,
319+
K: lp.K,
320+
}
321+
322+
p.plan.graph.Add(node)
323+
324+
child, err := p.process(lp.Table, ctx.WithDirection(order))
325+
if err != nil {
326+
return nil, err
327+
}
328+
329+
if err := p.plan.graph.AddEdge(dag.Edge[Node]{Parent: node, Child: child}); err != nil {
330+
return nil, err
331+
}
332+
return node, nil
333+
}
334+
304335
// Converts a [logical.Projection] into a physical [Projection] node.
305336
func (p *Planner) processProjection(lp *logical.Projection, ctx *Context) (Node, error) {
306337
expressions := make([]Expression, len(lp.Expressions))

0 commit comments

Comments
 (0)