Skip to content

Commit ff555dd

Browse files
authored
Merge branch 'mergeinto-runtime-filter' into main
2 parents 8ce9622 + a226452 commit ff555dd

23 files changed

+871
-333
lines changed

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ use databend_common_exception::Result;
2323
use databend_common_expression::types::UInt32Type;
2424
use databend_common_expression::ConstantFolder;
2525
use databend_common_expression::DataBlock;
26+
use databend_common_expression::DataField;
2627
use databend_common_expression::DataSchema;
2728
use databend_common_expression::DataSchemaRef;
2829
use databend_common_expression::FieldIndex;
2930
use databend_common_expression::FromData;
3031
use databend_common_expression::RemoteExpr;
3132
use databend_common_expression::SendableDataBlockStream;
33+
use databend_common_expression::ROW_ID_COL_NAME;
3234
use databend_common_expression::ROW_NUMBER_COL_NAME;
3335
use databend_common_functions::BUILTIN_FUNCTIONS;
3436
use databend_common_meta_app::schema::TableInfo;
@@ -162,6 +164,7 @@ impl MergeIntoInterpreter {
162164
field_index_map,
163165
merge_type,
164166
distributed,
167+
change_join_order,
165168
..
166169
} = &self.plan;
167170

@@ -181,20 +184,18 @@ impl MergeIntoInterpreter {
181184
let table_name = table_name.clone();
182185
let input = input.clone();
183186

184-
let input = if let RelOperator::Exchange(_) = input.plan() {
185-
Box::new(input.child(0)?.clone())
187+
// we need to extract join plan, but we need to give this exchange
188+
// back at the end.
189+
let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() {
190+
(Box::new(input.child(0)?.clone()), true)
186191
} else {
187-
input
192+
(input, false)
188193
};
189194

190-
let optimized_input =
191-
Self::build_static_filter(&input, meta_data, self.ctx.clone(), check_table).await?;
192195
let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);
193-
194196
// build source for MergeInto
195-
let join_input = builder
196-
.build(&optimized_input, *columns_set.clone())
197-
.await?;
197+
let join_input = builder.build(&input, *columns_set.clone()).await?;
198+
198199

199200
// find row_id column index
200201
let join_output_schema = join_input.output_schema()?;
@@ -227,7 +228,7 @@ impl MergeIntoInterpreter {
227228
}
228229
}
229230

230-
if *distributed {
231+
if *distributed && !*change_join_order {
231232
row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
232233
}
233234

@@ -238,7 +239,7 @@ impl MergeIntoInterpreter {
238239
));
239240
}
240241

241-
if *distributed && row_number_idx.is_none() {
242+
if *distributed && row_number_idx.is_none() && !*change_join_order {
242243
return Err(ErrorCode::InvalidRowIdIndex(
243244
"can't get internal row_number_idx when running merge into",
244245
));
@@ -258,11 +259,28 @@ impl MergeIntoInterpreter {
258259

259260
// merge_into_source is used to recv join's datablocks and split them into matched and not matched
260261
// datablocks.
261-
let merge_into_source = PhysicalPlan::MergeIntoSource(MergeIntoSource {
262-
input: Box::new(join_input),
263-
row_id_idx: row_id_idx as u32,
264-
merge_type: merge_type.clone(),
265-
});
262+
let merge_into_source = if !*distributed && extract_exchange {
263+
// if we don't support distributed merge into, we should give the exchange merge back.
264+
let rollback_join_input = PhysicalPlan::Exchange(Exchange {
265+
plan_id: 0,
266+
input: Box::new(join_input),
267+
kind: FragmentKind::Merge,
268+
keys: vec![],
269+
allow_adjust_parallelism: true,
270+
ignore_exchange: false,
271+
});
272+
PhysicalPlan::MergeIntoSource(MergeIntoSource {
273+
input: Box::new(rollback_join_input),
274+
row_id_idx: row_id_idx as u32,
275+
merge_type: merge_type.clone(),
276+
})
277+
} else {
278+
PhysicalPlan::MergeIntoSource(MergeIntoSource {
279+
input: Box::new(join_input),
280+
row_id_idx: row_id_idx as u32,
281+
merge_type: merge_type.clone(),
282+
})
283+
};
266284

267285
// transform unmatched for insert
268286
// reference to func `build_eval_scalar`
@@ -399,6 +417,7 @@ impl MergeIntoInterpreter {
399417
distributed: false,
400418
output_schema: DataSchemaRef::default(),
401419
merge_type: merge_type.clone(),
420+
change_join_order: *change_join_order,
402421
}))
403422
} else {
404423
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
@@ -409,14 +428,30 @@ impl MergeIntoInterpreter {
409428
matched,
410429
field_index_of_input_schema,
411430
row_id_idx,
412-
segments,
431+
segments: segments.clone(),
413432
distributed: true,
414-
output_schema: DataSchemaRef::new(DataSchema::new(vec![
415-
join_output_schema.fields[row_number_idx.unwrap()].clone(),
416-
])),
433+
output_schema: match *change_join_order {
434+
false => DataSchemaRef::new(DataSchema::new(vec![
435+
join_output_schema.fields[row_number_idx.unwrap()].clone(),
436+
])),
437+
true => DataSchemaRef::new(DataSchema::new(vec![DataField::new(
438+
ROW_ID_COL_NAME,
439+
databend_common_expression::types::DataType::Number(
440+
databend_common_expression::types::NumberDataType::UInt64,
441+
),
442+
)])),
443+
},
417444
merge_type: merge_type.clone(),
445+
change_join_order: *change_join_order,
418446
}));
419-
447+
// if change_join_order = true, it means the target is build side,
448+
// in this way, we will do matched operation and not matched operation
449+
// locally on every node, and the main node just receives row ids to apply.
450+
let segments = if *change_join_order {
451+
segments.clone()
452+
} else {
453+
vec![]
454+
};
420455
PhysicalPlan::MergeIntoAppendNotMatched(Box::new(MergeIntoAppendNotMatched {
421456
input: Box::new(PhysicalPlan::Exchange(Exchange {
422457
plan_id: 0,
@@ -431,6 +466,8 @@ impl MergeIntoInterpreter {
431466
unmatched: unmatched.clone(),
432467
input_schema: merge_into_source.output_schema()?,
433468
merge_type: merge_type.clone(),
469+
change_join_order: *change_join_order,
470+
segments,
434471
}))
435472
};
436473

src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,15 @@ use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter;
5656
use crate::interpreters::InterpreterFactory;
5757
use crate::sessions::QueryContext;
5858

59+
#[allow(dead_code)]
5960
struct MergeStyleJoin<'a> {
60-
source_conditions: &'a [ScalarExpr],
61-
target_conditions: &'a [ScalarExpr],
62-
source_sexpr: &'a SExpr,
63-
target_sexpr: &'a SExpr,
61+
build_conditions: &'a [ScalarExpr],
62+
probe_conditions: &'a [ScalarExpr],
63+
build_sexpr: &'a SExpr,
64+
probe_sexpr: &'a SExpr,
6465
}
6566

67+
#[allow(dead_code)]
6668
impl MergeStyleJoin<'_> {
6769
pub fn new(join: &SExpr) -> MergeStyleJoin {
6870
let join_op = match join.plan() {
@@ -73,25 +75,27 @@ impl MergeStyleJoin<'_> {
7375
join_op.join_type == JoinType::Right
7476
|| join_op.join_type == JoinType::RightAnti
7577
|| join_op.join_type == JoinType::Inner
78+
|| join_op.join_type == JoinType::Left
79+
|| join_op.join_type == JoinType::LeftAnti
7680
);
77-
let source_conditions = &join_op.right_conditions;
78-
let target_conditions = &join_op.left_conditions;
79-
let source_sexpr = join.child(1).unwrap();
80-
let target_sexpr = join.child(0).unwrap();
81+
let build_conditions = &join_op.right_conditions;
82+
let probe_conditions = &join_op.left_conditions;
83+
let build_sexpr = join.child(1).unwrap();
84+
let probe_sexpr = join.child(0).unwrap();
8185
MergeStyleJoin {
82-
source_conditions,
83-
target_conditions,
84-
source_sexpr,
85-
target_sexpr,
86+
build_conditions,
87+
probe_conditions,
88+
build_sexpr,
89+
probe_sexpr,
8690
}
8791
}
8892

8993
pub fn collect_column_map(&self) -> HashMap<String, ColumnBinding> {
9094
let mut column_map = HashMap::new();
9195
for (t, s) in self
92-
.target_conditions
96+
.probe_conditions
9397
.iter()
94-
.zip(self.source_conditions.iter())
98+
.zip(self.build_conditions.iter())
9599
{
96100
if let (ScalarExpr::BoundColumnRef(t_col), ScalarExpr::BoundColumnRef(s_col)) = (t, s) {
97101
column_map.insert(t_col.column.column_name.clone(), s_col.column.clone());
@@ -101,6 +105,7 @@ impl MergeStyleJoin<'_> {
101105
}
102106
}
103107

108+
#[allow(dead_code)]
104109
impl MergeIntoInterpreter {
105110
pub async fn build_static_filter(
106111
join: &SExpr,
@@ -119,7 +124,7 @@ impl MergeIntoInterpreter {
119124
// \
120125
// SourcePlan
121126
let m_join = MergeStyleJoin::new(join);
122-
if m_join.source_conditions.is_empty() {
127+
if m_join.build_conditions.is_empty() {
123128
return Ok(Box::new(join.clone()));
124129
}
125130
let column_map = m_join.collect_column_map();
@@ -181,9 +186,9 @@ impl MergeIntoInterpreter {
181186

182187
// 2. build filter and push down to target side
183188
ctx.set_status_info("building pushdown filters");
184-
let mut filters = Vec::with_capacity(m_join.target_conditions.len());
189+
let mut filters = Vec::with_capacity(m_join.probe_conditions.len());
185190

186-
for (i, target_side_expr) in m_join.target_conditions.iter().enumerate() {
191+
for (i, target_side_expr) in m_join.probe_conditions.iter().enumerate() {
187192
let mut filter_parts = vec![];
188193
for block in blocks.iter() {
189194
let block = block.convert_to_full();
@@ -225,11 +230,11 @@ impl MergeIntoInterpreter {
225230
}
226231
filters.extend(Self::combine_filter_parts(&filter_parts).into_iter());
227232
}
228-
let mut target_plan = m_join.target_sexpr.clone();
229-
Self::push_down_filters(&mut target_plan, &filters)?;
230-
let source_plan = m_join.source_sexpr;
233+
let mut probe_plan = m_join.probe_sexpr.clone();
234+
Self::push_down_filters(&mut probe_plan, &filters)?;
235+
let build_plan = m_join.build_sexpr;
231236
let new_sexpr =
232-
join.replace_children(vec![Arc::new(target_plan), Arc::new(source_plan.clone())]);
237+
join.replace_children(vec![Arc::new(probe_plan), Arc::new(build_plan.clone())]);
233238

234239
ctx.set_status_info("join expression replaced");
235240
Ok(Box::new(new_sexpr))
@@ -381,9 +386,9 @@ impl MergeIntoInterpreter {
381386
metadata: &MetadataRef,
382387
group_expr: ScalarExpr,
383388
) -> Result<Plan> {
384-
let mut eval_scalar_items = Vec::with_capacity(m_join.source_conditions.len());
385-
let mut min_max_binding = Vec::with_capacity(m_join.source_conditions.len() * 2);
386-
let mut min_max_scalar_items = Vec::with_capacity(m_join.source_conditions.len() * 2);
389+
let mut eval_scalar_items = Vec::with_capacity(m_join.build_conditions.len());
390+
let mut min_max_binding = Vec::with_capacity(m_join.build_conditions.len() * 2);
391+
let mut min_max_scalar_items = Vec::with_capacity(m_join.build_conditions.len() * 2);
387392
let mut group_items = vec![];
388393

389394
let index = metadata
@@ -407,46 +412,46 @@ impl MergeIntoInterpreter {
407412
scalar: evaled,
408413
index,
409414
});
410-
for source_side_expr in m_join.source_conditions {
415+
for build_side_expr in m_join.build_conditions {
411416
// eval source side join expr
412417
let index = metadata
413418
.write()
414-
.add_derived_column("".to_string(), source_side_expr.data_type()?);
419+
.add_derived_column("".to_string(), build_side_expr.data_type()?);
415420
let evaled = ScalarExpr::BoundColumnRef(BoundColumnRef {
416421
span: None,
417422
column: ColumnBindingBuilder::new(
418423
"".to_string(),
419424
index,
420-
Box::new(source_side_expr.data_type()?),
425+
Box::new(build_side_expr.data_type()?),
421426
Visibility::Visible,
422427
)
423428
.build(),
424429
});
425430
eval_scalar_items.push(ScalarItem {
426-
scalar: source_side_expr.clone(),
431+
scalar: build_side_expr.clone(),
427432
index,
428433
});
429434

430435
// eval min/max of source side join expr
431-
let min_display_name = format!("min({:?})", source_side_expr);
432-
let max_display_name = format!("max({:?})", source_side_expr);
436+
let min_display_name = format!("min({:?})", build_side_expr);
437+
let max_display_name = format!("max({:?})", build_side_expr);
433438
let min_index = metadata
434439
.write()
435-
.add_derived_column(min_display_name.clone(), source_side_expr.data_type()?);
440+
.add_derived_column(min_display_name.clone(), build_side_expr.data_type()?);
436441
let max_index = metadata
437442
.write()
438-
.add_derived_column(max_display_name.clone(), source_side_expr.data_type()?);
443+
.add_derived_column(max_display_name.clone(), build_side_expr.data_type()?);
439444
let min_binding = ColumnBindingBuilder::new(
440445
min_display_name.clone(),
441446
min_index,
442-
Box::new(source_side_expr.data_type()?),
447+
Box::new(build_side_expr.data_type()?),
443448
Visibility::Visible,
444449
)
445450
.build();
446451
let max_binding = ColumnBindingBuilder::new(
447452
max_display_name.clone(),
448453
max_index,
449-
Box::new(source_side_expr.data_type()?),
454+
Box::new(build_side_expr.data_type()?),
450455
Visibility::Visible,
451456
)
452457
.build();
@@ -458,7 +463,7 @@ impl MergeIntoInterpreter {
458463
distinct: false,
459464
params: vec![],
460465
args: vec![evaled.clone()],
461-
return_type: Box::new(source_side_expr.data_type()?),
466+
return_type: Box::new(build_side_expr.data_type()?),
462467
display_name: min_display_name.clone(),
463468
}),
464469
index: min_index,
@@ -469,7 +474,7 @@ impl MergeIntoInterpreter {
469474
distinct: false,
470475
params: vec![],
471476
args: vec![evaled],
472-
return_type: Box::new(source_side_expr.data_type()?),
477+
return_type: Box::new(build_side_expr.data_type()?),
473478
display_name: max_display_name.clone(),
474479
}),
475480
index: max_index,
@@ -478,21 +483,26 @@ impl MergeIntoInterpreter {
478483
min_max_scalar_items.push(max);
479484
}
480485

481-
let eval_source_side_join_expr_op = EvalScalar {
486+
let eval_build_side_join_expr_op = EvalScalar {
482487
items: eval_scalar_items,
483488
};
484-
let source_plan = m_join.source_sexpr;
485-
let eval_target_side_condition_sexpr = if let RelOperator::Exchange(_) = source_plan.plan()
486-
{
489+
let build_plan = m_join.build_sexpr;
490+
let eval_probe_side_condition_sexpr = if let RelOperator::Exchange(_) = build_plan.plan() {
487491
// there is another row_number operator here
488492
SExpr::create_unary(
489-
Arc::new(eval_source_side_join_expr_op.into()),
490-
Arc::new(source_plan.child(0)?.child(0)?.clone()),
493+
Arc::new(eval_build_side_join_expr_op.into()),
494+
Arc::new(SExpr::create_unary(
495+
// merge data here
496+
Arc::new(RelOperator::Exchange(
497+
databend_common_sql::plans::Exchange::Merge,
498+
)),
499+
Arc::new(build_plan.child(0)?.child(0)?.clone()),
500+
)),
491501
)
492502
} else {
493503
SExpr::create_unary(
494-
Arc::new(eval_source_side_join_expr_op.into()),
495-
Arc::new(source_plan.clone()),
504+
Arc::new(eval_build_side_join_expr_op.into()),
505+
Arc::new(build_plan.clone()),
496506
)
497507
};
498508

@@ -509,7 +519,7 @@ impl MergeIntoInterpreter {
509519
};
510520
let agg_partial_sexpr = SExpr::create_unary(
511521
Arc::new(agg_partial_op.into()),
512-
Arc::new(eval_target_side_condition_sexpr),
522+
Arc::new(eval_probe_side_condition_sexpr),
513523
);
514524
let agg_final_op = Aggregate {
515525
mode: AggregateMode::Final,

src/query/service/src/interpreters/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ mod interpreter_index_refresh;
4848
mod interpreter_insert;
4949
mod interpreter_kill;
5050
mod interpreter_merge_into;
51-
mod interpreter_merge_into_static_filter;
5251
mod interpreter_metrics;
5352
mod interpreter_network_policies_show;
5453
mod interpreter_network_policy_alter;

0 commit comments

Comments
 (0)