Skip to content

Commit

Permalink
feat: support background DDL for MV on shared source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Oct 15, 2024
1 parent 1ba3525 commit 651dd45
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 7 deletions.
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::catalog::table_catalog::{TableCatalog, TableType, TableVersion};
use crate::error::Result;
use crate::optimizer::plan_node::derive::derive_pk;
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -88,7 +88,7 @@ impl StreamMaterialize {

let create_type = if matches!(table_type, TableType::MaterializedView)
&& input.ctx().session_ctx().config().background_ddl()
&& plan_has_backfill_leaf_nodes(&input)
&& plan_can_use_backgronud_ddl(&input)
{
CreateType::Background
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, StreamNode, StreamProjec
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::plan_has_backfill_leaf_nodes;
use crate::optimizer::plan_node::utils::plan_can_use_backgronud_ddl;
use crate::optimizer::plan_node::PlanTreeNodeUnary;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
Expand Down Expand Up @@ -380,7 +380,7 @@ impl StreamSink {
let input = required_dist.enforce_if_not_satisfies(input, &Order::any())?;
let distribution_key = input.distribution().dist_column_indices().to_vec();
let create_type = if input.ctx().session_ctx().config().background_ddl()
&& plan_has_backfill_leaf_nodes(&input)
&& plan_can_use_backgronud_ddl(&input)
{
CreateType::Background
} else {
Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,21 @@ pub fn infer_kv_log_store_table_catalog_inner(

/// Check that all leaf nodes must be stream table scan,
/// since that plan node maps to `backfill` executor, which supports recovery.
pub(crate) fn plan_has_backfill_leaf_nodes(plan: &PlanRef) -> bool {
/// Some other leaf nodes like `StreamValues` do not support recovery, and they
/// cannot use background ddl.
pub(crate) fn plan_can_use_backgronud_ddl(plan: &PlanRef) -> bool {
if plan.inputs().is_empty() {
if let Some(scan) = plan.as_stream_table_scan() {
if plan.as_stream_source_scan().is_some() {
true
} else if let Some(scan) = plan.as_stream_table_scan() {
scan.stream_scan_type() == StreamScanType::Backfill
|| scan.stream_scan_type() == StreamScanType::ArrangementBackfill
} else {
false
}
} else {
assert!(!plan.inputs().is_empty());
plan.inputs().iter().all(plan_has_backfill_leaf_nodes)
plan.inputs().iter().all(plan_can_use_backgronud_ddl)
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl DdlController {
tracing::debug!(
id = job_id,
definition = streaming_job.definition(),
create_type = streaming_job.create_type().as_str_name(),
"starting streaming job",
);
let _permit = self
Expand Down

0 comments on commit 651dd45

Please sign in to comment.