refactor: some CDC table's code (#19255)
st1page authored Nov 8, 2024
1 parent 99703d1 commit c658340
Showing 5 changed files with 154 additions and 192 deletions.
88 changes: 43 additions & 45 deletions src/frontend/src/handler/alter_table_column.rs
@@ -30,7 +30,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnDef, ColumnOption, DataType as AstDataType, Encode,
FormatEncodeOptions, ObjectName, Statement, StructField,
FormatEncodeOptions, Ident, ObjectName, Statement, StructField, TableConstraint,
};
use risingwave_sqlparser::parser::Parser;

@@ -43,34 +43,10 @@ use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::handler::create_table::bind_table_constraints;
use crate::session::SessionImpl;
use crate::{Binder, TableCatalog, WithOptions};

pub async fn replace_table_with_definition(
session: &Arc<SessionImpl>,
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
format_encode: Option<FormatEncodeOptions>,
) -> Result<()> {
let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
session,
table_name,
definition,
original_catalog,
format_encode,
None,
)
.await?;

let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;
Ok(())
}

/// Used in the auto schema change process.
pub async fn get_new_table_definition_for_cdc_table(
session: &Arc<SessionImpl>,
@@ -84,9 +60,11 @@ pub async fn get_new_table_definition_for_cdc_table(
.context("unable to parse original table definition")?
.try_into()
.unwrap();

let Statement::CreateTable {
columns: original_columns,
format_encode,
constraints,
..
} = &mut definition
else {
@@ -98,6 +76,22 @@
"source schema should be None for CDC table"
);

if bind_table_constraints(constraints)?.is_empty() {
// For a table created by `create table t (*)`, the constraint list is empty; we need to
// retrieve the primary key names from the original table catalog, if available.
let pk_names: Vec<_> = original_catalog
.pk
.iter()
.map(|x| original_catalog.columns[x.column_index].name().to_string())
.collect();

constraints.push(TableConstraint::Unique {
name: None,
columns: pk_names.iter().map(Ident::new_unchecked).collect(),
is_primary: true,
});
}

let orig_column_catalog: HashMap<String, ColumnCatalog> = HashMap::from_iter(
original_catalog
.columns()
@@ -163,9 +157,8 @@ fn to_ast_data_type(ty: &DataType) -> Result<AstDataType> {
pub async fn get_replace_table_plan(
session: &Arc<SessionImpl>,
table_name: ObjectName,
definition: Statement,
original_catalog: &Arc<TableCatalog>,
format_encode: Option<FormatEncodeOptions>,
new_definition: Statement,
old_catalog: &Arc<TableCatalog>,
new_version_columns: Option<Vec<ColumnCatalog>>, // only provided in auto schema change
) -> Result<(
Option<Source>,
@@ -175,8 +168,8 @@
TableJobType,
)> {
// Create handler args as if we're creating a new table with the altered definition.
let handler_args = HandlerArgs::new(session.clone(), &definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(original_catalog);
let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
let Statement::CreateTable {
columns,
constraints,
@@ -186,16 +179,21 @@
with_version_column,
wildcard_idx,
cdc_table_info,
format_encode,
..
} = definition
} = new_definition
else {
panic!("unexpected statement type: {:?}", definition);
panic!("unexpected statement type: {:?}", new_definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());

let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
session,
table_name,
original_catalog,
old_catalog,
format_encode,
handler_args.clone(),
col_id_gen,
@@ -213,7 +211,7 @@

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::new(
original_catalog
old_catalog
.columns()
.iter()
.map(|old_c| {
@@ -225,7 +223,7 @@
table.columns.len(),
);

let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect();
let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();

let target_columns = table
.columns
@@ -245,7 +243,7 @@
// Set some fields ourselves so that the meta service does not need to maintain them.
let mut table = table;
table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf();
table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();

Ok((source, table, graph, col_index_mapping, job_type))
}
@@ -332,6 +330,7 @@ pub async fn handle_alter_table_column(
else {
panic!("unexpected statement: {:?}", definition);
};

let format_encode = format_encode
.clone()
.map(|format_encode| format_encode.into_v2_with_warning());
@@ -455,15 +454,14 @@
_ => unreachable!(),
};

replace_table_with_definition(
&session,
table_name,
definition,
&original_catalog,
format_encode,
)
.await?;
let (source, table, graph, col_index_mapping, job_type) =
get_replace_table_plan(&session, table_name, definition, &original_catalog, None).await?;

let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;
Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

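A note on the primary-key backfill introduced above: when a CDC table is declared with `create table t (*)`, the DDL carries no PRIMARY KEY constraint, so `get_new_table_definition_for_cdc_table` now rebuilds one from the table catalog before re-planning. A minimal sketch of that reconstruction, using only the `risingwave_sqlparser` AST types already imported in this diff (`TableConstraint`, `Ident`); `pk_constraint_from_catalog` is a hypothetical helper name, not part of the commit:

    use risingwave_sqlparser::ast::{Ident, TableConstraint};

    /// Rebuild the `PRIMARY KEY (...)` constraint that `create table t (*)` omits,
    /// from primary-key column names recovered from the original table catalog.
    fn pk_constraint_from_catalog(pk_names: &[String]) -> TableConstraint {
        // Equivalent to writing `PRIMARY KEY (col1, col2, ...)` in the DDL.
        TableConstraint::Unique {
            name: None,
            columns: pk_names.iter().map(Ident::new_unchecked).collect(),
            is_primary: true,
        }
    }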
64 changes: 34 additions & 30 deletions src/frontend/src/handler/alter_table_with_sr.rs
@@ -14,18 +14,16 @@

use anyhow::{anyhow, Context};
use fancy_regex::Regex;
use pgwire::pg_response::StatementType;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

use super::alter_source_with_sr::alter_definition_format_encode;
use super::alter_table_column::{
fetch_table_catalog_for_alter, replace_table_with_definition, schema_has_schema_registry,
};
use super::alter_table_column::{fetch_table_catalog_for_alter, schema_has_schema_registry};
use super::util::SourceSchemaCompatExt;
use super::{HandlerArgs, RwPgResponse};
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::TableCatalog;

@@ -66,6 +64,7 @@ pub async fn handle_refresh_schema(
format_encode.unwrap()
};

// NOTE(st1page): since altering a table's format/encode has not been implemented yet, this call is effectively unused.
let definition = alter_definition_format_encode(
&original_table.definition,
format_encode.row_options.clone(),
@@ -76,31 +75,36 @@
.try_into()
.unwrap();

let result = replace_table_with_definition(
&session,
table_name,
definition,
&original_table,
Some(format_encode),
)
.await;

match result {
Ok(_) => Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE)),
Err(e) => {
let report = e.to_report_string();
// This is a workaround for reporting errors when a column to drop is referenced by a generated column.
// Finding the actual columns to drop requires generating `PbSource` from the SQL definition
// and fetching the schema from the schema registry, which would cause a lot of unnecessary refactoring.
// Here we match the error message yielded when binding a generated column expression fails.
let re = Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
let captures = re.captures(&report).map_err(anyhow::Error::from)?;
if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
gen_col_name.as_str())).into())
} else {
Err(e)
let (source, table, graph, col_index_mapping, job_type) = {
let result =
get_replace_table_plan(&session, table_name, definition, &original_table, None).await;
match result {
Ok((source, table, graph, col_index_mapping, job_type)) => {
Ok((source, table, graph, col_index_mapping, job_type))
}
Err(e) => {
let report = e.to_report_string();
// NOTE(yuhao): This is a workaround for reporting errors when a column to drop is referenced by a generated column.
// Finding the actual columns to drop requires generating `PbSource` from the SQL definition
// and fetching the schema from the schema registry, which would cause a lot of unnecessary refactoring.
// Here we match the error message yielded when binding a generated column expression fails.
let re =
Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
let captures = re.captures(&report).map_err(anyhow::Error::from)?;
if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
gen_col_name.as_str())).into())
} else {
Err(e)
}
}
}
}
}?;
let catalog_writer = session.catalog_writer()?;

catalog_writer
.replace_table(source, table, graph, col_index_mapping, job_type)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}
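The error-matching workaround above parses the rendered error report rather than inspecting the error type. A self-contained sketch of just that matching step, using the same `fancy_regex` pattern as the handler; the report string and column name `v_doubled` are invented for illustration:

    use fancy_regex::Regex;

    /// Extract the generated-column name from a binder error report, if the
    /// failure came from a dropped column that a generated column references.
    fn generated_column_in_error(report: &str) -> Option<String> {
        let re = Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
        // `fancy_regex::Regex::captures` returns `Result<Option<Captures>>`,
        // so both layers are flattened before reading capture group 1.
        re.captures(report)
            .ok()
            .flatten()
            .and_then(|caps| caps.get(1))
            .map(|m| m.as_str().to_owned())
    }

    fn main() {
        let report = r#"fail to bind expression in generated column "v_doubled""#;
        assert_eq!(generated_column_in_error(report).as_deref(), Some("v_doubled"));
    }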
5 changes: 2 additions & 3 deletions src/frontend/src/handler/create_source.rs
@@ -78,7 +78,7 @@ use crate::error::{Result, RwError};
use crate::expr::Expr;
use crate::handler::create_table::{
bind_pk_and_row_id_on_relation, bind_sql_column_constraints, bind_sql_columns,
bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator,
bind_sql_pk_names, bind_table_constraints, ColumnIdGenerator,
};
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;
@@ -1523,8 +1523,7 @@ pub async fn bind_create_source_or_table_with_connector(
}
}

ensure_table_constraints_supported(&constraints)?;
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, &constraints)?;
let sql_pk_names = bind_sql_pk_names(sql_columns_defs, bind_table_constraints(&constraints)?)?;

let columns_from_sql = bind_sql_columns(sql_columns_defs)?;

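The last hunk folds the old `ensure_table_constraints_supported` check into `bind_table_constraints`, so constraint validation and primary-key extraction happen in one pass, and the result can feed both the emptiness check in `alter_table_column.rs` and `bind_sql_pk_names` here. The real helper lives in `create_table.rs` and is not shown in this diff; the following is only a plausible sketch of its contract, inferred from the two call sites (the local `TableConstraint` enum is a stand-in for the sqlparser type):

    use anyhow::{bail, Result};

    // Stand-in for `risingwave_sqlparser::ast::TableConstraint` at the call sites.
    enum TableConstraint {
        Unique { columns: Vec<String>, is_primary: bool },
    }

    /// Validate the table constraints and return the declared primary-key column
    /// names (empty if the DDL declared none, as with `create table t (*)`).
    fn bind_table_constraints(constraints: &[TableConstraint]) -> Result<Vec<String>> {
        let mut pk_names: Vec<String> = Vec::new();
        for constraint in constraints {
            match constraint {
                TableConstraint::Unique { columns, is_primary: true } => {
                    if !pk_names.is_empty() {
                        bail!("multiple primary keys are not allowed");
                    }
                    pk_names = columns.clone();
                }
                _ => bail!("only PRIMARY KEY table constraints are supported"),
            }
        }
        Ok(pk_names)
    }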