From 89c977cd53fdfe64ce3394e7489c10d540fdd561 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 1 Nov 2024 12:06:08 +0800
Subject: [PATCH 1/7] save

---
 src/frontend/src/handler/util.rs              | 50 ++++++++++----
 .../optimizer/plan_node/generic/log_scan.rs   | 29 +-------
 src/frontend/src/session/cursor_manager.rs    | 67 ++++++++++++-------
 3 files changed, 83 insertions(+), 63 deletions(-)

diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 9ff2cc92b552..2253405a091b 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -234,20 +234,44 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query {
     }
 }
 
+// Build a query like `SELECT *, pk FROM table ORDER BY pk`.
 pub fn gen_query_from_table_name_order_by(from_name: ObjectName, pk_names: Vec<String>) -> Query {
-    let mut query = gen_query_from_table_name(from_name);
-    query.order_by = pk_names
-        .into_iter()
-        .map(|pk| {
-            let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk));
-            OrderByExpr {
-                expr,
-                asc: None,
-                nulls_first: None,
-            }
-        })
-        .collect();
-    query
+    let table_factor = TableFactor::Table {
+        name: from_name,
+        alias: None,
+        as_of: None,
+    };
+    let from = vec![TableWithJoins {
+        relation: table_factor,
+        joins: vec![],
+    }];
+    let mut projection = vec![SelectItem::Wildcard(None)];
+    projection.extend(pk_names.iter().map(|name| {
+        SelectItem::UnnamedExpr(Expr::Identifier(Ident::new_unchecked(name.clone())))
+    }));
+    let select = Select {
+        from,
+        projection,
+        ..Default::default()
+    };
+    let body = SetExpr::Select(Box::new(select));
+    let order_by = pk_names
+        .into_iter()
+        .map(|pk| {
+            let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk));
+            OrderByExpr {
+                expr,
+                asc: None,
+                nulls_first: None,
+            }
+        })
+        .collect();
+    Query {
+        with: None,
+        body,
+        order_by,
+        limit: None,
+        offset: None,
+        fetch: None,
+    }
 }
 
 pub fn convert_unix_millis_to_logstore_u64(unix_millis: u64) -> u64 {
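// Illustration (not part of the patch): a minimal usage sketch of the helper
// rewritten above, assuming a table `t` with a single pk column `id`. The pk
// is projected explicitly so the cursor layer can read it back later, and the
// built AST corresponds roughly to `SELECT *, id FROM t ORDER BY "id"`.
//
//     let query = gen_query_from_table_name_order_by(
//         ObjectName(vec![Ident::new_unchecked("t")]),
//         vec!["id".to_string()],
//     );
//     println!("{}", query); // roughly: SELECT *, id FROM t ORDER BY "id"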
diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
index a57ba79242d1..d38b5b3e72d5 100644
--- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
@@ -17,7 +17,6 @@ use std::rc::Rc;
 
 use educe::Educe;
 use fixedbitset::FixedBitSet;
-use itertools::Itertools;
 use pretty_xmlish::Pretty;
 use risingwave_common::catalog::{Field, Schema, TableDesc};
 use risingwave_common::types::DataType;
@@ -34,9 +33,7 @@ const OP_TYPE: DataType = DataType::Varchar;
 #[educe(PartialEq, Eq, Hash)]
 pub struct LogScan {
     pub table_name: String,
-    /// Include `output_col_idx_with_out_hidden` and `op_column`
-    pub output_col_idx_with_out_hidden: Vec<usize>,
-    /// Include `output_col_idx_with_out_hidden` and `op_column` and hidden pk
+    /// Include `output_col_idx` and `op_column`
     pub output_col_idx: Vec<usize>,
     /// Descriptor of the table
     pub table_desc: Rc<TableDesc>,
@@ -85,16 +82,6 @@ impl LogScan {
         out_column_names
     }
 
-    pub(crate) fn column_names_without_hidden(&self) -> Vec<String> {
-        let mut out_column_names: Vec<_> = self
-            .output_col_idx_with_out_hidden
-            .iter()
-            .map(|&i| self.table_desc.columns[i].name.clone())
-            .collect();
-        out_column_names.push(OP_NAME.to_string());
-        out_column_names
-    }
-
     pub fn distribution_key(&self) -> Option<Vec<usize>> {
         let tb_idx_to_op_idx = self
             .output_col_idx
@@ -112,7 +99,6 @@ impl LogScan {
     /// Create a logical scan node for log table scan
     pub(crate) fn new(
         table_name: String,
-        output_col_idx_with_out_hidden: Vec<usize>,
         output_col_idx: Vec<usize>,
         table_desc: Rc<TableDesc>,
         ctx: OptimizerContextRef,
@@ -122,7 +108,6 @@ impl LogScan {
     ) -> Self {
         Self {
             table_name,
-            output_col_idx_with_out_hidden,
            output_col_idx,
             table_desc,
             chunk_size: None,
@@ -163,17 +148,7 @@ impl LogScan {
 
     pub(crate) fn out_fields(&self) -> FixedBitSet {
         let mut out_fields_vec = self
-            .output_col_idx
-            .iter()
-            .enumerate()
-            .filter_map(|(index, idx)| {
-                if self.output_col_idx_with_out_hidden.contains(idx) {
-                    Some(index)
-                } else {
-                    None
-                }
-            })
-            .collect_vec();
+            .output_col_idx.clone();
         // add op column
         out_fields_vec.push(self.output_col_idx.len());
         FixedBitSet::from_iter(out_fields_vec)
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index f8464120a531..02bf9918716c 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -23,6 +23,7 @@ use std::time::Instant;
 use anyhow::anyhow;
 use bytes::Bytes;
 use futures::StreamExt;
+use itertools::Itertools;
 use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::StatementType;
 use pgwire::types::{Format, Row};
@@ -30,6 +31,7 @@ use risingwave_common::catalog::Field;
 use risingwave_common::error::BoxedError;
 use risingwave_common::session_config::QueryMode;
 use risingwave_common::types::DataType;
+use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
@@ -37,7 +39,7 @@ use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
 use super::SessionImpl;
 use crate::catalog::subscription_catalog::SubscriptionCatalog;
 use crate::catalog::TableId;
-use crate::error::{ErrorCode, Result, RwError};
+use crate::error::{ErrorCode, Result};
 use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
 use crate::handler::query::{
     gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult,
@@ -267,6 +269,7 @@ pub struct SubscriptionCursor {
     fields: Vec<Field>,
     cursor_metrics: Arc<CursorMetrics>,
     last_fetch: Instant,
+    pk_column_names: HashMap<String, bool>,
 }
 
 impl SubscriptionCursor {
@@ -278,7 +281,7 @@ impl SubscriptionCursor {
         handler_args: &HandlerArgs,
         cursor_metrics: Arc<CursorMetrics>,
     ) -> Result<Self> {
-        let (state, fields) = if let Some(start_timestamp) = start_timestamp {
+        let (state, fields,pk_column_names) = if let Some(start_timestamp) = start_timestamp {
             let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
             let fields = table_catalog
                 .columns
@@ -286,6 +289,7 @@ impl SubscriptionCursor {
                 .filter(|c| !c.is_hidden)
                 .map(|c| Field::with_name(c.data_type().clone(), c.name()))
                 .collect();
+            let pk_column_names = get_pk_names(table_catalog.pk(), &table_catalog);
             let fields = Self::build_desc(fields, true);
             (
                 State::InitLogStoreQuery {
@@ -293,13 +297,14 @@ impl SubscriptionCursor {
                     expected_timestamp: None,
                 },
                 fields,
+                pk_column_names,
             )
         } else {
             // The query stream needs to initiated on cursor creation to make sure
            // future fetch on the cursor starts from the snapshot when the cursor is declared.
            //
            // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch?
-            let (chunk_stream, fields, init_query_timer) =
+            let (chunk_stream, fields, init_query_timer,pk_column_names) =
                 Self::initiate_query(None, &dependent_table_id, handler_args.clone()).await?;
             let pinned_epoch = handler_args
                 .session
@@ -324,6 +329,7 @@ impl SubscriptionCursor {
                     init_query_timer,
                 },
                 fields,
+                pk_column_names,
             )
         };
 
@@ -338,6 +344,7 @@ impl SubscriptionCursor {
             fields,
             cursor_metrics,
             last_fetch: Instant::now(),
+            pk_column_names
         })
     }
 
@@ -363,7 +370,7 @@ impl SubscriptionCursor {
                     &self.subscription,
                 ) {
                     Ok((Some(rw_timestamp), expected_timestamp)) => {
-                        let (mut chunk_stream, fields, init_query_timer) =
+                        let (mut chunk_stream, fields, init_query_timer, pk_column_names) =
                             Self::initiate_query(
                                 Some(rw_timestamp),
                                 &self.dependent_table_id,
@@ -392,8 +399,9 @@ impl SubscriptionCursor {
                             expected_timestamp,
                             init_query_timer,
                         };
-                        if self.fields.ne(&fields) {
+                        if self.fields.ne(&fields) || self.pk_column_names.ne(&pk_column_names) {
                             self.fields = fields;
+                            self.pk_column_names = pk_column_names;
                             return Ok(None);
                         }
                     }
@@ -546,6 +554,7 @@ impl SubscriptionCursor {
             }
         }
         self.last_fetch = Instant::now();
+        Self::process_output_desc_row(descs, row, pk_column_names)
         let desc = self.fields.iter().map(to_pg_field).collect();
 
         Ok((ans, desc))
@@ -653,21 +662,14 @@ impl SubscriptionCursor {
             let pk_names = pks
                 .iter()
                 .map(|f| {
-                    Ok::<String, RwError>(
-                        table_catalog
-                            .columns
-                            .get(f.column_index)
-                            .ok_or_else(|| {
-                                anyhow!(
-                                    "columns not find in table schema, index is {:?}",
-                                    f.column_index
-                                )
-                            })?
-                            .name()
-                            .to_string(),
-                    )
+                    table_catalog
+                        .columns
+                        .get(f.column_index)
+                        .unwrap()
+                        .name()
+                        .to_string()
                 })
-                .collect::<Result<Vec<String>>>()?;
+                .collect_vec();
             let query_stmt = Statement::Query(Box::new(gen_query_from_table_name_order_by(
                 subscription_from_table_name,
                 pk_names,
@@ -680,8 +682,12 @@ impl SubscriptionCursor {
         rw_timestamp: Option<u64>,
         dependent_table_id: &TableId,
         handler_args: HandlerArgs,
-    ) -> Result<(CursorDataChunkStream, Vec<Field>, Instant)> {
+    ) -> Result<(CursorDataChunkStream, Vec<Field>, Instant, HashMap<String, bool>)> {
         let init_query_timer = Instant::now();
+        let session = handler_args.clone().session;
+        let table_catalog = session.get_table_by_id(dependent_table_id)?;
+        let pks = table_catalog.pk();
+        let pk_column_names = get_pk_names(pks, &table_catalog);
         let plan_result = Self::init_batch_plan_for_subscription_cursor(
             rw_timestamp,
             dependent_table_id,
@@ -694,6 +700,7 @@ impl SubscriptionCursor {
             chunk_stream,
             Self::build_desc(fields, rw_timestamp.is_none()),
             init_query_timer,
+            pk_column_names,
         ))
     }
 
@@ -740,6 +747,12 @@ impl SubscriptionCursor {
         Ok(row)
     }
 
+    pub fn process_output_desc_row(descs: Vec<Field>, row: Vec<Row>,pk_column_names: &HashSet<String>) -> (Vec<Field>,Vec<Row>) {
+        descs.into_iter().enumerate().filter_map(|(index, field)| {
+
+        })
+    }
+
     pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
         if from_snapshot {
             descs.push(Field::with_name(DataType::Varchar, "op"));
@@ -770,14 +783,8 @@ impl SubscriptionCursor {
                 }
             })
             .collect::<Vec<_>>();
-        let output_col_idx_with_out_hidden = output_col_idx
-            .iter()
-            .filter(|index| !table_catalog.columns[**index].is_hidden)
-            .cloned()
-            .collect::<Vec<_>>();
         let core = generic::LogScan::new(
             table_catalog.name.clone(),
-            output_col_idx_with_out_hidden,
             output_col_idx,
             Rc::new(table_catalog.table_desc()),
             context,
@@ -789,7 +796,7 @@ impl SubscriptionCursor {
 
         let batch_log_seq_scan = BatchLogSeqScan::new(core);
 
         let out_fields = batch_log_seq_scan.core().out_fields();
-        let out_names = batch_log_seq_scan.core().column_names_without_hidden();
+        let out_names = batch_log_seq_scan.core().column_names();
 
         // order by pk, so don't need to sort
        let order = Order::new(pks.to_vec());
@@ -1043,3 +1050,17 @@ impl CursorManager {
         }
     }
 }
+
+fn get_pk_names(pks: &[ColumnOrder], table_catalog: &TableCatalog) -> HashMap<String, bool> {
+    pks
+        .iter()
+        .map(|f| {
+            let column =
+                table_catalog
+                    .columns
+                    .get(f.column_index)
+                    .unwrap();
+            (column.name().to_string(),column.is_hidden)
+        })
+        .collect()
+}
\ No newline at end of file

From c071111fb20e0da10b08576e5c38d65123cd693b Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 1 Nov 2024 14:05:31 +0800
Subject: [PATCH 2/7] fix all

---
 src/frontend/src/session/cursor_manager.rs | 18 ++++++++++++++----
 src/utils/pgwire/src/types.rs              |  2 +-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index 02bf9918716c..4203cb02f561 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -747,10 +747,20 @@ impl SubscriptionCursor {
         Ok(row)
     }
 
-    pub fn process_output_desc_row(descs: Vec<Field>, row: Vec<Row>,pk_column_names: &HashSet<String>) -> (Vec<Field>,Vec<Row>) {
-        descs.into_iter().enumerate().filter_map(|(index, field)| {
-
-        })
+    pub fn process_output_desc_row(descs: Vec<Field>, mut row: Vec<Row>,pk_column_names: &HashMap<String, bool>) -> (Vec<Field>,Vec<Row>) {
+        let iter= descs.iter().map(|field| {
+            if let Some(is_hidden) = pk_column_names.get(&field.name) && *is_hidden{
+                (false,field)
+            } else {
+                (true,field)
+            }
+        });
+        let pk_fields = iter.filter(|(is_hidden,_)| *is_hidden).map(|(_,field)| field).cloned().collect();
+        let mut pk_keep = iter.map(|(is_hidden,_)| is_hidden);
+        row.iter_mut().for_each(|row| {
+            row.0.retain(|x| pk_keep.next().unwrap());
+        });
+        (pk_fields,row)
     }
 
 diff --git a/src/utils/pgwire/src/types.rs b/src/utils/pgwire/src/types.rs
index c76aa20aac4c..95638d4f6c02 100644
--- a/src/utils/pgwire/src/types.rs
+++ b/src/utils/pgwire/src/types.rs
@@ -23,7 +23,7 @@ use crate::error::{PsqlError, PsqlResult};
 /// A row of data returned from the database by a query.
 #[derive(Debug, Clone)]
 // NOTE: Since we only support simple query protocol, the values are represented as strings.
-pub struct Row(Vec<Option<Bytes>>);
+pub struct Row(pub Vec<Option<Bytes>>);
 
 impl Row {
    /// Create a row from values.
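// Illustration (not part of the patches): a self-contained sketch of the
// keep-mask idea in `process_output_desc_row` above. Hidden pk columns are
// dropped from the field descriptors, and the same mask is applied, in order,
// to every row before it is returned to the client.
//
//     fn filter_hidden(keep: &[bool], rows: &mut Vec<Vec<Option<String>>>) {
//         for row in rows.iter_mut() {
//             let mut it = keep.iter();
//             row.retain(|_| *it.next().unwrap());
//         }
//     }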
From 0ab16687347119e1c41b6896d272e6187838669d Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 1 Nov 2024 15:47:44 +0800
Subject: [PATCH 3/7] save

---
 src/frontend/src/session/cursor_manager.rs | 36 +++++++++++++++-------
 1 file changed, 25 insertions(+), 11 deletions(-)

diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index 4203cb02f561..df83909e58f6 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -270,6 +270,7 @@ pub struct SubscriptionCursor {
     cursor_metrics: Arc<CursorMetrics>,
     last_fetch: Instant,
     pk_column_names: HashMap<String, bool>,
+    seek_pk_row: Option<Vec<Option<Bytes>>>,
 }
 
 impl SubscriptionCursor {
@@ -344,7 +345,8 @@ impl SubscriptionCursor {
             fields,
             cursor_metrics,
             last_fetch: Instant::now(),
-            pk_column_names
+            pk_column_names,
+            seek_pk_row: None,
         })
     }
 
@@ -554,10 +556,13 @@ impl SubscriptionCursor {
             }
         }
         self.last_fetch = Instant::now();
-        Self::process_output_desc_row(descs, row, pk_column_names)
-        let desc = self.fields.iter().map(to_pg_field).collect();
+        let (fields,rows,seek_pk_row) = Self::process_output_desc_row(&self.fields, ans, &self.pk_column_names);
+        if let Some(seek_pk_row) = seek_pk_row{
+            self.seek_pk_row = Some(seek_pk_row);
+        }
+        let desc = fields.iter().map(to_pg_field).collect();
 
-        Ok((ans, desc))
+        Ok((rows, desc))
     }
 
     fn get_next_rw_timestamp(
@@ -747,20 +752,29 @@ impl SubscriptionCursor {
         Ok(row)
     }
 
-    pub fn process_output_desc_row(descs: Vec<Field>, mut row: Vec<Row>,pk_column_names: &HashMap<String, bool>) -> (Vec<Field>,Vec<Row>) {
+    pub fn process_output_desc_row(descs: &Vec<Field>, mut rows: Vec<Row>,pk_column_names: &HashMap<String, bool>) -> (Vec<Field>,Vec<Row>,Option<Vec<Option<Bytes>>>) {
+        let last_row = rows.last_mut().map(|row|{
+            row.0.iter().zip_eq_fast(descs.iter()).filter_map(|(data,field)|{
+                if pk_column_names.contains_key(&field.name){
+                    Some(data.clone())
+                } else {
+                    None
+                }
+            }).collect_vec()
+        });
         let iter= descs.iter().map(|field| {
             if let Some(is_hidden) = pk_column_names.get(&field.name) && *is_hidden{
                 (false,field)
             } else {
                 (true,field)
             }
-        });
-        let pk_fields = iter.filter(|(is_hidden,_)| *is_hidden).map(|(_,field)| field).cloned().collect();
-        let mut pk_keep = iter.map(|(is_hidden,_)| is_hidden);
-        row.iter_mut().for_each(|row| {
-            row.0.retain(|x| pk_keep.next().unwrap());
+        }).collect_vec();
+        let pk_fields = iter.iter().filter(|(is_hidden,_)| *is_hidden).map(|(_,field)| (*field).clone()).collect();
+        let mut pk_keep = iter.iter().map(|(is_hidden,_)| *is_hidden);
+        rows.iter_mut().for_each(|row| {
+            row.0.retain(|_| pk_keep.next().unwrap());
         });
-        (pk_fields,row)
+        (pk_fields,rows,last_row)
    }
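// Illustration (not part of the patches): the cursor state added in patch 3.
// After each fetch, the pk values of the last returned row are remembered in
// `seek_pk_row`, so the next batch query can resume strictly after that key
// instead of rescanning the log from the beginning. Sketch with simplified
// types:
//
//     struct CursorState {
//         seek_pk_row: Option<Vec<Option<String>>>, // pk of last emitted row
//     }
//     impl CursorState {
//         fn on_fetch(&mut self, last_pk: Option<Vec<Option<String>>>) {
//             if let Some(row) = last_pk {
//                 self.seek_pk_row = Some(row); // resume point for next fetch
//             }
//         }
//     }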
From aa47c6e454347dc7399f61ee0fdf8792a5f2446e Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 1 Nov 2024 17:15:24 +0800
Subject: [PATCH 4/7] save

save

support all data

support
---
 proto/batch_plan.proto                                 |   1 +
 src/batch/src/executor/log_row_seq_scan.rs             |  71 +++--
 src/batch/src/executor/row_seq_scan.rs                 | 114 +-------
 src/batch/src/executor/utils.rs                        | 125 +++++++++
 src/frontend/src/expr/function_call.rs                 |   2 +-
 src/frontend/src/handler/util.rs                       |  91 +++++--
 .../optimizer/plan_node/batch_log_seq_scan.rs          |  37 ++-
 src/frontend/src/optimizer/plan_node/batch_seq_scan.rs |  70 +----
 .../optimizer/plan_node/batch_sys_seq_scan.rs          |  66 +----
 .../optimizer/plan_node/generic/log_scan.rs            |  25 +-
 src/frontend/src/optimizer/plan_node/utils.rs          |  58 +++++
 src/frontend/src/session/cursor_manager.rs             | 246 ++++++++++++++----
 .../src/table/batch_table/storage_table.rs             |   7 +-
 .../executor/backfill/snapshot_backfill.rs             |   3 +
 14 files changed, 582 insertions(+), 334 deletions(-)

diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index f881f6546fae..dd531ed5773a 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -123,6 +123,7 @@ message LogRowSeqScanNode {
   common.BatchQueryEpoch old_epoch = 4;
   common.BatchQueryEpoch new_epoch = 5;
   bool ordered = 6;
+  repeated ScanRange scan_ranges = 7;
 }
 
 message InsertNode {
diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs
index 6f40f42fbba8..cde3ae5ee863 100644
--- a/src/batch/src/executor/log_row_seq_scan.rs
+++ b/src/batch/src/executor/log_row_seq_scan.rs
@@ -18,13 +18,15 @@ use std::sync::Arc;
 use futures::prelude::stream::StreamExt;
 use futures_async_stream::try_stream;
 use futures_util::pin_mut;
+use iceberg::scan;
 use prometheus::Histogram;
 use risingwave_common::array::{DataChunk, Op};
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnId, Field, Schema};
 use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::row::{Row, RowExt};
-use risingwave_common::types::ScalarImpl;
+use risingwave_common::types::{DataType, ScalarImpl};
+use risingwave_common::util::scan_range;
 use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
@@ -33,7 +35,9 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::table::collect_data_chunk;
 use risingwave_storage::{dispatch_state_store, StateStore};
 
-use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
+use super::{
+    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
+};
 use crate::error::{BatchError, Result};
 use crate::monitor::BatchMetrics;
 use crate::task::BatchTaskContext;
@@ -53,6 +57,7 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
     new_epoch: u64,
     version_id: HummockVersionId,
     ordered: bool,
+    scan_ranges: Vec<ScanRange>,
 }
 
 impl<S: StateStore> LogRowSeqScanExecutor<S> {
@@ -65,6 +70,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
         identity: String,
         metrics: Option<BatchMetrics>,
         ordered: bool,
+        scan_ranges: Vec<ScanRange>,
     ) -> Self {
         let mut schema = table.schema().clone();
         schema.fields.push(Field::with_name(
@@ -81,6 +87,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
             new_epoch,
             version_id,
             ordered,
+            scan_ranges,
         }
     }
 }
@@ -139,6 +146,28 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
         let old_epoch = old_epoch.epoch;
         let new_epoch = new_epoch.epoch;
 
+        let scan_ranges = {
+            let scan_ranges = &log_store_seq_scan_node.scan_ranges;
+            if scan_ranges.is_empty() {
+                vec![ScanRange::full()]
+            } else {
+                scan_ranges
+                    .iter()
+                    .map(|scan_range| {
+                        let pk_types = table_desc.pk.iter().map(|order| {
+                            DataType::from(
+                                table_desc.columns[order.column_index as usize]
+                                    .column_type
+                                    .as_ref()
+                                    .unwrap(),
+                            )
+                        });
+                        ScanRange::new(scan_range.clone(), pk_types)
+                    })
+                    .try_collect()?
+            }
+        };
+
         dispatch_state_store!(source.context().state_store(), state_store, {
             let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
             Ok(Box::new(LogRowSeqScanExecutor::new(
@@ -150,6 +179,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
                 source.plan_node().get_identity().clone(),
                 metrics,
                 log_store_seq_scan_node.ordered,
+                scan_ranges,
             )))
         })
     }
@@ -180,6 +210,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
             version_id,
             schema,
             ordered,
+            scan_ranges,
            ..
        } = *self;
         let table = std::sync::Arc::new(table);
@@ -191,20 +222,23 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
         // Range Scan
         // WARN: DO NOT use `select` to execute range scans concurrently
         // it can consume too much memory if there're too many ranges.
-        let stream = Self::execute_range(
-            table.clone(),
-            old_epoch,
-            new_epoch,
-            version_id,
-            chunk_size,
-            histogram,
-            Arc::new(schema.clone()),
-            ordered,
-        );
-        #[for_await]
-        for chunk in stream {
-            let chunk = chunk?;
-            yield chunk;
+        for range in scan_ranges {
+            let stream = Self::execute_range(
+                table.clone(),
+                old_epoch,
+                new_epoch,
+                version_id,
+                chunk_size,
+                histogram,
+                Arc::new(schema.clone()),
+                ordered,
+                range,
+            );
+            #[for_await]
+            for chunk in stream {
+                let chunk = chunk?;
+                yield chunk;
+            }
         }
     }
 
@@ -218,13 +252,18 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
         histogram: Option<Arc<Histogram>>,
         schema: Arc<Schema>,
         ordered: bool,
+        scan_range: ScanRange,
     ) {
+        let pk_prefix = scan_range.pk_prefix.clone();
+        let range_bounds = scan_range.convert_to_range_bounds(table.clone());
         // Range Scan.
         let iter = table
             .batch_iter_log_with_pk_bounds(
                 old_epoch,
                 HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
                 ordered,
+                range_bounds,
+                pk_prefix,
             )
             .await?
             .flat_map(|r| {
diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index b65f4bf8939b..303ddf4270aa 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -35,6 +35,7 @@ use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::{dispatch_state_store, StateStore};
 
+use super::ScanRange;
 use crate::error::{BatchError, Result};
 use crate::executor::{
     BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
@@ -59,15 +60,6 @@ pub struct RowSeqScanExecutor<S: StateStore> {
     as_of: Option<AsOf>,
 }
 
-/// Range for batch scan.
-pub struct ScanRange {
-    /// The prefix of the primary key.
-    pub pk_prefix: OwnedRow,
-
-    /// The range bounds of the next column.
-    pub next_col_bounds: (Bound<Datum>, Bound<Datum>),
-}
-
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct AsOf {
     pub timestamp: i64,
@@ -98,64 +90,6 @@ impl From<&AsOf> for PbAsOf {
     }
 }
 
-impl ScanRange {
-    /// Create a scan range from the prost representation.
-    pub fn new(
-        scan_range: PbScanRange,
-        mut pk_types: impl Iterator<Item = DataType>,
-    ) -> Result<Self> {
-        let pk_prefix = OwnedRow::new(
-            scan_range
-                .eq_conds
-                .iter()
-                .map(|v| {
-                    let ty = pk_types.next().unwrap();
-                    deserialize_datum(v.as_slice(), &ty)
-                })
-                .try_collect()?,
-        );
-        if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
-            return Ok(Self {
-                pk_prefix,
-                ..Self::full()
-            });
-        }
-
-        let bound_ty = pk_types.next().unwrap();
-        let build_bound = |bound: &scan_range::Bound| -> Bound<Datum> {
-            let datum = deserialize_datum(bound.value.as_slice(), &bound_ty).unwrap();
-            if bound.inclusive {
-                Bound::Included(datum)
-            } else {
-                Bound::Excluded(datum)
-            }
-        };
-
-        let next_col_bounds: (Bound<Datum>, Bound<Datum>) = match (
-            scan_range.lower_bound.as_ref(),
-            scan_range.upper_bound.as_ref(),
-        ) {
-            (Some(lb), Some(ub)) => (build_bound(lb), build_bound(ub)),
-            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub)),
-            (Some(lb), None) => (build_bound(lb), Bound::Unbounded),
-            (None, None) => unreachable!(),
-        };
-
-        Ok(Self {
-            pk_prefix,
-            next_col_bounds,
-        })
-    }
-
-    /// Create a scan range for full table scan.
-    pub fn full() -> Self {
-        Self {
-            pk_prefix: OwnedRow::default(),
-            next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
-        }
-    }
-}
-
 impl<S: StateStore> RowSeqScanExecutor<S> {
     pub fn new(
         table: StorageTable<S>,
@@ -419,55 +353,15 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
         limit: Option<u64>,
         histogram: Option<Arc<Histogram>>,
     ) {
-        let ScanRange {
-            pk_prefix,
-            next_col_bounds,
-        } = scan_range;
-
-        let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
-        let (start_bound, end_bound) = if order_type.is_ascending() {
-            (next_col_bounds.0, next_col_bounds.1)
-        } else {
-            (next_col_bounds.1, next_col_bounds.0)
-        };
-
-        let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
-        let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
-
+        let pk_prefix = scan_range.pk_prefix.clone();
+        let range_bounds = scan_range.convert_to_range_bounds(table.clone());
         // Range Scan.
         assert!(pk_prefix.len() < table.pk_indices().len());
         let iter = table
             .batch_chunk_iter_with_pk_bounds(
                 epoch.into(),
                 &pk_prefix,
-                (
-                    match start_bound {
-                        Bound::Unbounded => {
-                            if end_bound_is_bounded && order_type.nulls_are_first() {
-                                // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
-                                Bound::Excluded(OwnedRow::new(vec![None]))
-                            } else {
-                                // Both start and end are unbounded, so we need to select all rows.
-                                Bound::Unbounded
-                            }
-                        }
-                        Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
-                        Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
-                    },
-                    match end_bound {
-                        Bound::Unbounded => {
-                            if start_bound_is_bounded && order_type.nulls_are_last() {
-                                // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
-                                Bound::Excluded(OwnedRow::new(vec![None]))
-                            } else {
-                                // Both start and end are unbounded, so we need to select all rows.
-                                Bound::Unbounded
-                            }
-                        }
-                        Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
-                        Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
-                    },
-                ),
+                range_bounds,
                 ordered,
                 chunk_size,
                 PrefetchOptions::new(limit.is_none(), true),
diff --git a/src/batch/src/executor/utils.rs b/src/batch/src/executor/utils.rs
index 4f724ec5416c..acbd45159a32 100644
--- a/src/batch/src/executor/utils.rs
+++ b/src/batch/src/executor/utils.rs
@@ -12,11 +12,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::ops::{Bound, RangeBounds};
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use futures_async_stream::try_stream;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::Schema;
+use risingwave_common::row::{OwnedRow, Row};
+use risingwave_common::types::{DataType, Datum};
+use risingwave_common::util::value_encoding::deserialize_datum;
+use risingwave_pb::batch_plan::{scan_range, PbScanRange};
+use risingwave_storage::table::batch_table::storage_table::StorageTable;
+use risingwave_storage::StateStore;
 
 use crate::error::{BatchError, Result};
 use crate::executor::{BoxedDataChunkStream, Executor};
@@ -124,3 +133,119 @@ impl Executor for WrapStreamExecutor {
         self.stream
     }
 }
+
+/// Range for batch scan.
+pub struct ScanRange {
+    /// The prefix of the primary key.
+    pub pk_prefix: OwnedRow,
+
+    /// The range bounds of the next column.
+    pub next_col_bounds: (Bound<Datum>, Bound<Datum>),
+}
+
+impl ScanRange {
+    /// Create a scan range from the prost representation.
+    pub fn new(
+        scan_range: PbScanRange,
+        mut pk_types: impl Iterator<Item = DataType>,
+    ) -> Result<Self> {
+        let pk_prefix = OwnedRow::new(
+            scan_range
+                .eq_conds
+                .iter()
+                .map(|v| {
+                    let ty = pk_types.next().unwrap();
+                    deserialize_datum(v.as_slice(), &ty)
+                })
+                .try_collect()?,
+        );
+        if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
+            return Ok(Self {
+                pk_prefix,
+                ..Self::full()
+            });
+        }
+
+        let bound_ty = pk_types.next().unwrap();
+        let build_bound = |bound: &scan_range::Bound| -> Bound<Datum> {
+            let datum = deserialize_datum(bound.value.as_slice(), &bound_ty).unwrap();
+            if bound.inclusive {
+                Bound::Included(datum)
+            } else {
+                Bound::Excluded(datum)
+            }
+        };
+
+        let next_col_bounds: (Bound<Datum>, Bound<Datum>) = match (
+            scan_range.lower_bound.as_ref(),
+            scan_range.upper_bound.as_ref(),
+        ) {
+            (Some(lb), Some(ub)) => (build_bound(lb), build_bound(ub)),
+            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub)),
+            (Some(lb), None) => (build_bound(lb), Bound::Unbounded),
+            (None, None) => unreachable!(),
+        };
+
+        Ok(Self {
+            pk_prefix,
+            next_col_bounds,
+        })
+    }
+
+    /// Create a scan range for full table scan.
+    pub fn full() -> Self {
+        Self {
+            pk_prefix: OwnedRow::default(),
+            next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
+        }
+    }
+
+    pub fn convert_to_range_bounds<S: StateStore>(
+        self,
+        table: Arc<StorageTable<S>>,
+    ) -> impl RangeBounds<OwnedRow> {
+        let ScanRange {
+            pk_prefix,
+            next_col_bounds,
+        } = self;
+
+        let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
+        let (start_bound, end_bound) = if order_type.is_ascending() {
+            (next_col_bounds.0, next_col_bounds.1)
+        } else {
+            (next_col_bounds.1, next_col_bounds.0)
+        };
+
+        let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
+        let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
+
+        (
+            match start_bound {
+                Bound::Unbounded => {
+                    if end_bound_is_bounded && order_type.nulls_are_first() {
+                        // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
+                        Bound::Excluded(OwnedRow::new(vec![None]))
+                    } else {
+                        // Both start and end are unbounded, so we need to select all rows.
+                        Bound::Unbounded
+                    }
+                }
+                Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
+                Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
+            },
+            match end_bound {
+                Bound::Unbounded => {
+                    if start_bound_is_bounded && order_type.nulls_are_last() {
+                        // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
+                        Bound::Excluded(OwnedRow::new(vec![None]))
+                    } else {
+                        // Both start and end are unbounded, so we need to select all rows.
+                        Bound::Unbounded
+                    }
+                }
+                Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
+                Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
+            },
+        )
+    }
+}
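// Illustration (not part of the patch): the bound-swap rule implemented by
// `convert_to_range_bounds` above. When the next pk column is stored in
// descending order, the storage key order is the reverse of the value order,
// so the user-facing (lower, upper) bounds must be flipped. The rule in
// isolation:
//
//     use std::ops::Bound;
//     fn storage_bounds<T>(asc: bool, lo: Bound<T>, hi: Bound<T>) -> (Bound<T>, Bound<T>) {
//         if asc { (lo, hi) } else { (hi, lo) } // descending keys reverse the range
//     }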
diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs
index af1f84b321eb..4d9e73004974 100644
--- a/src/frontend/src/expr/function_call.rs
+++ b/src/frontend/src/expr/function_call.rs
@@ -25,7 +25,7 @@ use crate::expr::{ExprDisplay, ExprType, ExprVisitor, ImpureAnalyzer};
 
 #[derive(Clone, Eq, PartialEq, Hash)]
 pub struct FunctionCall {
-    pub(super) func_type: ExprType,
+    pub func_type: ExprType,
     pub(super) return_type: DataType,
     pub(super) inputs: Vec<ExprImpl>,
 }
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 2253405a091b..4fd38efb7b99 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -35,8 +35,8 @@ use risingwave_common::types::{
 use risingwave_common::util::epoch::Epoch;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_sqlparser::ast::{
-    CompatibleSourceSchema, ConnectorSchema, Expr, Ident, ObjectName, OrderByExpr, Query, Select,
-    SelectItem, SetExpr, TableFactor, TableWithJoins,
+    BinaryOperator, CompatibleSourceSchema, ConnectorSchema, Expr, Ident, ObjectName, OrderByExpr,
+    Query, Select, SelectItem, SetExpr, TableFactor, TableWithJoins, Value,
 };
 use thiserror_ext::AsReport;
 
@@ -235,7 +235,25 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query {
 }
 
 // Build a query like `SELECT *, pk FROM table ORDER BY pk`.
-pub fn gen_query_from_table_name_order_by(from_name: ObjectName, pk_names: Vec<String>) -> Query {
+pub fn gen_query_from_table_name_order_by(
+    from_name: ObjectName,
+    pks: Vec<(String, bool)>,
+    seek_pk_rows: Option<Vec<Option<Bytes>>>,
+) -> Query {
+    let select_pks = pks
+        .iter()
+        .filter_map(
+            |(name, is_hidden)| {
+                if *is_hidden {
+                    Some(name.clone())
+                } else {
+                    None
+                }
+            },
+        )
+        .collect_vec();
+    let order_pks = pks.iter().map(|(name, _)| name).collect_vec();
+
     let table_factor = TableFactor::Table {
         name: from_name,
         alias: None,
@@ -246,24 +264,67 @@ pub fn gen_query_from_table_name_order_by(
     let mut projection = vec![SelectItem::Wildcard(None)];
-    projection.extend(pk_names.iter().map(|name| {
+    projection.extend(select_pks.iter().map(|name| {
         SelectItem::UnnamedExpr(Expr::Identifier(Ident::new_unchecked(name.clone())))
     }));
+    let selection = if let Some(seek_pk_rows) = seek_pk_rows {
+        let mut pk_rows = vec![];
+        let mut values = vec![];
+        for ((name, _), seek_pk) in pks.iter().zip_eq_fast(seek_pk_rows.iter()) {
+            if let Some(seek_pk) = seek_pk {
+                pk_rows.push(Expr::Identifier(Ident::with_quote_unchecked(
+                    '"',
+                    name.clone(),
+                )));
+                values.push(String::from_utf8(seek_pk.clone().into()).unwrap());
+            }
+        }
+        if pk_rows.is_empty() {
+            None
+        } else if pk_rows.len() == 1 {
+            let left = pk_rows.pop().unwrap();
+            let right = Expr::Value(Value::SingleQuotedString(values.pop().unwrap()));
+            Some(Expr::BinaryOp {
+                left: Box::new(left),
+                op: BinaryOperator::Eq,
+                right: Box::new(right),
+            })
+        }else{
+            let left = Expr::Row(pk_rows);
+            let values = values.join(",");
+            let right = Expr::Value(Value::SingleQuotedString(format!("({})", values)));
+            Some(Expr::BinaryOp {
+                left: Box::new(left),
+                op: BinaryOperator::Gt,
+                right: Box::new(right),
+            })
+        }
+    } else{
+        None
+    };
     let select = Select {
         from,
         projection,
+        selection,
         ..Default::default()
     };
     let body = SetExpr::Select(Box::new(select));
-    let order_by = pk_names
+    let order_by = order_pks
         .into_iter()
         .map(|pk| {
-            let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk));
+            let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk.clone()));
             OrderByExpr {
                 expr,
                 asc: None,
                 nulls_first: None,
             }
         })
         .collect();
     Query {
         with: None,
         body,
         order_by,
         limit: None,
         offset: None,
         fetch: None,
     }
 }
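// Illustration (not part of the patch): the predicate the rewritten helper
// builds from `seek_pk_rows` so a query resumes after the last seen key. A
// single pk column becomes a comparison against its serialized value; a
// composite pk becomes a row comparison via `Expr::Row` and
// `BinaryOperator::Gt` (patch 5 below also switches the single-pk operator
// from `Eq` to `Gt`). Names and values here are illustrative:
//
//     // single-pk case, roughly: SELECT *, id FROM t WHERE id > '42' ORDER BY "id"
//     Expr::BinaryOp {
//         left: Box::new(Expr::Identifier(Ident::with_quote_unchecked('"', "id".to_string()))),
//         op: BinaryOperator::Gt,
//         right: Box::new(Expr::Value(Value::SingleQuotedString("42".into()))),
//     }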
diff --git a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
--- a/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
@@ -32,21 +37,26 @@ pub struct BatchLogSeqScan {
     pub base: PlanBase<Batch>,
     core: generic::LogScan,
+    scan_ranges: Vec<ScanRange>,
 }
 
 impl BatchLogSeqScan {
-    fn new_inner(core: generic::LogScan, dist: Distribution) -> Self {
-        let order = Order::new(core.table_desc.pk.clone());
+    fn new_inner(core: generic::LogScan, dist: Distribution, scan_ranges: Vec<ScanRange>) -> Self {
+        let order = if scan_ranges.len() > 1 {
+            Order::any()
+        } else {
+            Order::new(core.table_desc.pk.clone())
+        };
         let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);
 
-        Self { base, core }
+        Self {
+            base,
+            core,
+            scan_ranges,
+        }
     }
 
-    pub fn new(core: generic::LogScan) -> Self {
+    pub fn new(core: generic::LogScan, scan_ranges: Vec<ScanRange>) -> Self {
         // Use `Single` by default, will be updated later with `clone_with_dist`.
-        Self::new_inner(core, Distribution::Single)
+        Self::new_inner(core, Distribution::Single, scan_ranges)
     }
 
     fn clone_with_dist(&self) -> Self {
@@ -62,6 +72,7 @@ impl BatchLogSeqScan {
                     }
                 }
             },
+            self.scan_ranges.clone(),
         )
     }
 
@@ -91,6 +102,17 @@ impl Distill for BatchLogSeqScan {
         vec.push(("old_epoch", Pretty::from(self.core.old_epoch.to_string())));
         vec.push(("new_epoch", Pretty::from(self.core.new_epoch.to_string())));
         vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));
+        if !self.scan_ranges.is_empty() {
+            let order_names = match verbose {
+                true => self.core.order_names_with_table_prefix(),
+                false => self.core.order_names(),
+            };
+            let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
+            vec.push((
+                "scan_ranges",
+                Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
+            ));
+        }
 
         childless_record("BatchLogSeqScan", vec)
     }
@@ -131,6 +153,7 @@ impl TryToBatchPb for BatchLogSeqScan {
             }),
             // It's currently true.
             ordered: !self.order().is_any(),
+            scan_ranges: self.scan_ranges.iter().map(|r| r.to_protobuf()).collect(),
         }))
     }
 }
@@ -144,7 +167,7 @@ impl ToLocalBatch for BatchLogSeqScan {
         } else {
             Distribution::SomeShard
         };
-        Ok(Self::new_inner(self.core.clone(), dist).into())
+        Ok(Self::new_inner(self.core.clone(), dist, self.scan_ranges.clone()).into())
     }
 }
 
diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
index 576793f4dd45..addd83087322 100644
--- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -12,18 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Bound;
-
-use itertools::Itertools;
 use pretty_xmlish::{Pretty, XmlNode};
-use risingwave_common::types::ScalarImpl;
 use risingwave_common::util::scan_range::{is_full_range, ScanRange};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::RowSeqScanNode;
 use risingwave_sqlparser::ast::AsOf;
 
 use super::batch::prelude::*;
-use super::utils::{childless_record, to_pb_time_travel_as_of, Distill};
+use super::utils::{childless_record, scan_ranges_as_strs, to_pb_time_travel_as_of, Distill};
 use super::{generic, ExprRewritable, PlanBase, PlanRef, ToDistributedBatch};
 use crate::catalog::ColumnId;
 use crate::error::Result;
@@ -135,37 +131,6 @@ impl BatchSeqScan {
         &self.scan_ranges
     }
 
-    fn scan_ranges_as_strs(&self, verbose: bool) -> Vec<String> {
-        let order_names = match verbose {
-            true => self.core.order_names_with_table_prefix(),
-            false => self.core.order_names(),
-        };
-        let mut range_strs = vec![];
-
-        let explain_max_range = 20;
-        for scan_range in self.scan_ranges.iter().take(explain_max_range) {
-            #[expect(clippy::disallowed_methods)]
-            let mut range_str = scan_range
-                .eq_conds
-                .iter()
-                .zip(order_names.iter())
-                .map(|(v, name)| match v {
-                    Some(v) => format!("{} = {:?}", name, v),
-                    None => format!("{} IS NULL", name),
-                })
-                .collect_vec();
-            if !is_full_range(&scan_range.range) {
-                let i = scan_range.eq_conds.len();
-                range_str.push(range_to_string(&order_names[i], &scan_range.range))
-            }
-            range_strs.push(range_str.join(" AND "));
-        }
-        if self.scan_ranges.len() > explain_max_range {
-            range_strs.push("...".to_string());
-        }
-        range_strs
-    }
-
     pub fn limit(&self) -> &Option<u64> {
         &self.limit
     }
 }
 impl_plan_tree_node_for_leaf! { BatchSeqScan }
 
-fn lb_to_string(name: &str, lb: &Bound<ScalarImpl>) -> String {
-    let (op, v) = match lb {
-        Bound::Included(v) => (">=", v),
-        Bound::Excluded(v) => (">", v),
-        Bound::Unbounded => unreachable!(),
-    };
-    format!("{} {} {:?}", name, op, v)
-}
-fn ub_to_string(name: &str, ub: &Bound<ScalarImpl>) -> String {
-    let (op, v) = match ub {
-        Bound::Included(v) => ("<=", v),
-        Bound::Excluded(v) => ("<", v),
-        Bound::Unbounded => unreachable!(),
-    };
-    format!("{} {} {:?}", name, op, v)
-}
-fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -> String {
-    match (&range.0, &range.1) {
-        (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
-        (Bound::Unbounded, ub) => ub_to_string(name, ub),
-        (lb, Bound::Unbounded) => lb_to_string(name, lb),
-        (lb, ub) => {
-            format!("{} AND {}", lb_to_string(name, lb), ub_to_string(name, ub))
-        }
-    }
-}
-
 impl Distill for BatchSeqScan {
     fn distill<'a>(&self) -> XmlNode<'a> {
         let verbose = self.base.ctx().is_explain_verbose();
@@ -208,7 +146,11 @@ impl Distill for BatchSeqScan {
         vec.push(("columns", self.core.columns_pretty(verbose)));
 
         if !self.scan_ranges.is_empty() {
-            let range_strs = self.scan_ranges_as_strs(verbose);
+            let order_names = match verbose {
+                true => self.core.order_names_with_table_prefix(),
+                false => self.core.order_names(),
+            };
+            let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
             vec.push((
                 "scan_ranges",
                 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
diff --git a/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
index 6068c1131626..007e983e67f3 100644
--- a/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
@@ -23,7 +23,7 @@ use risingwave_pb::batch_plan::SysRowSeqScanNode;
 use risingwave_pb::plan_common::PbColumnDesc;
 
 use super::batch::prelude::*;
-use super::utils::{childless_record, Distill};
+use super::utils::{childless_record, range_to_string, scan_ranges_as_strs, Distill};
 use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
 use crate::error::Result;
 use crate::expr::{ExprRewriter, ExprVisitor};
@@ -91,68 +91,10 @@ impl BatchSysSeqScan {
     pub fn scan_ranges(&self) -> &[ScanRange] {
         &self.scan_ranges
     }
-
-    fn scan_ranges_as_strs(&self, verbose: bool) -> Vec<String> {
-        let order_names = match verbose {
-            true => self.core.order_names_with_table_prefix(),
-            false => self.core.order_names(),
-        };
-        let mut range_strs = vec![];
-
-        let explain_max_range = 20;
-        for scan_range in self.scan_ranges.iter().take(explain_max_range) {
-            #[expect(clippy::disallowed_methods)]
-            let mut range_str = scan_range
-                .eq_conds
-                .iter()
-                .zip(order_names.iter())
-                .map(|(v, name)| match v {
-                    Some(v) => format!("{} = {:?}", name, v),
-                    None => format!("{} IS NULL", name),
-                })
-                .collect_vec();
-            if !is_full_range(&scan_range.range) {
-                let i = scan_range.eq_conds.len();
-                range_str.push(range_to_string(&order_names[i], &scan_range.range))
-            }
-            range_strs.push(range_str.join(" AND "));
-        }
-        if self.scan_ranges.len() > explain_max_range {
-            range_strs.push("...".to_string());
-        }
-        range_strs
-    }
 }
 impl_plan_tree_node_for_leaf! { BatchSysSeqScan }
 
-fn lb_to_string(name: &str, lb: &Bound<ScalarImpl>) -> String {
-    let (op, v) = match lb {
-        Bound::Included(v) => (">=", v),
-        Bound::Excluded(v) => (">", v),
-        Bound::Unbounded => unreachable!(),
-    };
-    format!("{} {} {:?}", name, op, v)
-}
-fn ub_to_string(name: &str, ub: &Bound<ScalarImpl>) -> String {
-    let (op, v) = match ub {
-        Bound::Included(v) => ("<=", v),
-        Bound::Excluded(v) => ("<", v),
-        Bound::Unbounded => unreachable!(),
-    };
-    format!("{} {} {:?}", name, op, v)
-}
-fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -> String {
-    match (&range.0, &range.1) {
-        (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
-        (Bound::Unbounded, ub) => ub_to_string(name, ub),
-        (lb, Bound::Unbounded) => lb_to_string(name, lb),
-        (lb, ub) => {
-            format!("{} AND {}", lb_to_string(name, lb), ub_to_string(name, ub))
-        }
-    }
-}
-
 impl Distill for BatchSysSeqScan {
     fn distill<'a>(&self) -> XmlNode<'a> {
         let verbose = self.base.ctx().is_explain_verbose();
@@ -161,7 +103,11 @@ impl Distill for BatchSysSeqScan {
         vec.push(("columns", self.core.columns_pretty(verbose)));
 
         if !self.scan_ranges.is_empty() {
-            let range_strs = self.scan_ranges_as_strs(verbose);
+            let order_names = match verbose {
+                true => self.core.order_names_with_table_prefix(),
+                false => self.core.order_names(),
+            };
+            let range_strs = scan_ranges_as_strs(order_names, &self.scan_ranges);
             vec.push((
                 "scan_ranges",
                 Pretty::Array(range_strs.into_iter().map(Pretty::from).collect()),
diff --git a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
index d38b5b3e72d5..5e384dd3dc84 100644
--- a/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/log_scan.rs
@@ -18,7 +18,7 @@ use std::rc::Rc;
 use educe::Educe;
 use fixedbitset::FixedBitSet;
 use pretty_xmlish::Pretty;
-use risingwave_common::catalog::{Field, Schema, TableDesc};
+use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
 use risingwave_common::types::DataType;
 use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_hummock_sdk::HummockVersionId;
@@ -147,8 +147,7 @@ impl LogScan {
     }
 
     pub(crate) fn out_fields(&self) -> FixedBitSet {
-        let mut out_fields_vec = self
-            .output_col_idx.clone();
+        let mut out_fields_vec = self.output_col_idx.clone();
         // add op column
         out_fields_vec.push(self.output_col_idx.len());
         FixedBitSet::from_iter(out_fields_vec)
@@ -157,4 +156,24 @@ impl LogScan {
     pub(crate) fn ctx(&self) -> OptimizerContextRef {
         self.ctx.clone()
     }
+
+    pub fn get_table_columns(&self) -> &[ColumnDesc] {
+        &self.table_desc.columns
+    }
+
+    pub(crate) fn order_names(&self) -> Vec<String> {
+        self.table_desc
+            .order_column_indices()
+            .iter()
+            .map(|&i| self.get_table_columns()[i].name.clone())
+            .collect()
+    }
+
+    pub(crate) fn order_names_with_table_prefix(&self) -> Vec<String> {
+        self.table_desc
+            .order_column_indices()
+            .iter()
+            .map(|&i| format!("{}.{}", self.table_name, self.get_table_columns()[i].name))
+            .collect()
+    }
 }
diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs
index 2433a659bad0..22f28cb0d973 100644
--- a/src/frontend/src/optimizer/plan_node/utils.rs
+++ b/src/frontend/src/optimizer/plan_node/utils.rs
@@ -14,6 +14,7 @@
 
 use std::collections::HashMap;
 use std::default::Default;
+use std::ops::Bound;
 use std::vec;
 
 use anyhow::anyhow;
@@ -28,6 +29,8 @@ use risingwave_common::constants::log_store::v2::{
     KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX,
};
 use risingwave_common::hash::VnodeCount;
+use risingwave_common::types::ScalarImpl;
+use risingwave_common::util::scan_range::{is_full_range, ScanRange};
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
 
 use crate::catalog::table_catalog::TableType;
@@ -462,3 +465,58 @@ pub fn to_pb_time_travel_as_of(a: &Option<AsOf>) -> Result<Option<PbAsOf>> {
         as_of_type: Some(as_of_type),
     }))
 }
+
+pub fn scan_ranges_as_strs(order_names: Vec<String>, scan_ranges: &Vec<ScanRange>) -> Vec<String> {
+    let mut range_strs = vec![];
+
+    let explain_max_range = 20;
+    for scan_range in scan_ranges.iter().take(explain_max_range) {
+        #[expect(clippy::disallowed_methods)]
+        let mut range_str = scan_range
+            .eq_conds
+            .iter()
+            .zip(order_names.iter())
+            .map(|(v, name)| match v {
+                Some(v) => format!("{} = {:?}", name, v),
+                None => format!("{} IS NULL", name),
+            })
+            .collect_vec();
+        if !is_full_range(&scan_range.range) {
+            let i = scan_range.eq_conds.len();
+            range_str.push(range_to_string(&order_names[i], &scan_range.range))
+        }
+        range_strs.push(range_str.join(" AND "));
+    }
+    if scan_ranges.len() > explain_max_range {
+        range_strs.push("...".to_string());
+    }
+    range_strs
+}
+
+pub fn range_to_string(name: &str, range: &(Bound<ScalarImpl>, Bound<ScalarImpl>)) -> String {
+    match (&range.0, &range.1) {
+        (Bound::Unbounded, Bound::Unbounded) => unreachable!(),
+        (Bound::Unbounded, ub) => ub_to_string(name, ub),
+        (lb, Bound::Unbounded) => lb_to_string(name, lb),
+        (lb, ub) => {
+            format!("{} AND {}", lb_to_string(name, lb), ub_to_string(name, ub))
+        }
+    }
+}
+
+fn lb_to_string(name: &str, lb: &Bound<ScalarImpl>) -> String {
+    let (op, v) = match lb {
+        Bound::Included(v) => (">=", v),
+        Bound::Excluded(v) => (">", v),
+        Bound::Unbounded => unreachable!(),
+    };
+    format!("{} {} {:?}", name, op, v)
+}
+fn ub_to_string(name: &str, ub: &Bound<ScalarImpl>) -> String {
+    let (op, v) = match ub {
+        Bound::Included(v) => ("<=", v),
+        Bound::Excluded(v) => ("<", v),
+        Bound::Unbounded => unreachable!(),
+    };
+    format!("{} {} {:?}", name, op, v)
+}
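// Illustration (not part of the patch): the EXPLAIN output produced by
// `scan_ranges_as_strs` above. Equality conditions come first, then the range
// on the next pk column, and at most 20 ranges are rendered before "..." is
// appended. Expected shape of the rendered strings (values illustrative):
//
//     ["id = Int32(1)", "ts >= Int64(100) AND ts < Int64(200)", "..."]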
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index df83909e58f6..638acfeb872f 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -27,19 +27,23 @@ use itertools::Itertools;
 use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::StatementType;
 use pgwire::types::{Format, Row};
+use prost::Message;
 use risingwave_common::catalog::Field;
 use risingwave_common::error::BoxedError;
 use risingwave_common::session_config::QueryMode;
-use risingwave_common::types::DataType;
+use risingwave_common::types::{DataType, ScalarImpl};
 use risingwave_common::util::iter_util::ZipEqFast;
+use risingwave_common::util::scan_range;
 use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_hummock_sdk::HummockVersionId;
+use risingwave_pb::expr::expr_node::Type;
 use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
 
 use super::SessionImpl;
 use crate::catalog::subscription_catalog::SubscriptionCatalog;
 use crate::catalog::TableId;
 use crate::error::{ErrorCode, Result};
+use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal};
 use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
 use crate::handler::query::{
     gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult,
@@ -50,10 +54,11 @@ use crate::handler::util::{
 };
 use crate::handler::HandlerArgs;
 use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
-use crate::optimizer::plan_node::{generic, BatchLogSeqScan};
+use crate::optimizer::plan_node::{generic, BatchFilter, BatchLogSeqScan};
 use crate::optimizer::property::{Order, RequiredDist};
 use crate::optimizer::PlanRoot;
 use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
+use crate::utils::Condition;
 use crate::{
     Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog,
 };
@@ -269,7 +274,7 @@ pub struct SubscriptionCursor {
     fields: Vec<Field>,
     cursor_metrics: Arc<CursorMetrics>,
     last_fetch: Instant,
-    pk_column_names: HashMap<String, bool>,
+    pk_column_names: HashMap<String, bool>,
     seek_pk_row: Option<Vec<Option<Bytes>>>,
 }
 
@@ -282,7 +287,7 @@ impl SubscriptionCursor {
         handler_args: &HandlerArgs,
         cursor_metrics: Arc<CursorMetrics>,
     ) -> Result<Self> {
-        let (state, fields,pk_column_names) = if let Some(start_timestamp) = start_timestamp {
+        let (state, fields, pk_column_names) = if let Some(start_timestamp) = start_timestamp {
             let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
@@ -305,8 +310,8 @@ impl SubscriptionCursor {
             // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch?
-            let (chunk_stream, fields, init_query_timer,pk_column_names) =
-                Self::initiate_query(None, &dependent_table_id, handler_args.clone()).await?;
+            let (chunk_stream, fields, init_query_timer, pk_column_names) =
+                Self::initiate_query(None, &dependent_table_id, handler_args.clone(), None).await?;
             let pinned_epoch = handler_args
                 .session
@@ -377,6 +382,7 @@ impl SubscriptionCursor {
                         Some(rw_timestamp),
                         &self.dependent_table_id,
                         handler_args.clone(),
+                        None,
                     )
                     .await?;
                     Self::init_row_stream(
                         &mut chunk_stream,
                         formats,
                         &from_snapshot,
                         &fields,
                         handler_args.session.clone(),
                     );
+                    {
+                        let (mut chunk_stream, fields, init_query_timer, pk_column_names) =
+                            Self::initiate_query(
+                                Some(rw_timestamp),
+                                &self.dependent_table_id,
+                                handler_args.clone(),
+                                self.seek_pk_row.clone(),
+                            )
+                            .await?;
+                        Self::init_row_stream(
+                            &mut chunk_stream,
+                            formats,
+                            &from_snapshot,
+                            &fields,
+                            handler_args.session.clone(),
+                        );
+                        while let Some(a) = chunk_stream.next().await? {
+                            println!("testtest {:?}", a);
+                        }
+                    }
                     self.cursor_need_drop_time = Instant::now()
                         + Duration::from_secs(self.subscription.retention_seconds);
@@ -427,7 +455,8 @@ impl SubscriptionCursor {
                         expected_timestamp,
                         init_query_timer,
                     };
-                    if self.fields.ne(&fields) || self.pk_column_names.ne(&pk_column_names) {
+                    if self.fields.ne(&fields) || self.pk_column_names.ne(&pk_column_names)
+                    {
                         self.fields = fields;
                         self.pk_column_names = pk_column_names;
                         return Ok(None);
                     }
@@ -583,10 +612,11 @@ impl SubscriptionCursor {
         self.last_fetch = Instant::now();
-        let (fields,rows,seek_pk_row) = Self::process_output_desc_row(&self.fields, ans, &self.pk_column_names);
-        if let Some(seek_pk_row) = seek_pk_row{
+        let (fields, rows, seek_pk_row) =
+            Self::process_output_desc_row(&self.fields, ans, &self.pk_column_names);
+        if let Some(seek_pk_row) = seek_pk_row {
             self.seek_pk_row = Some(seek_pk_row);
         }
         let desc = fields.iter().map(to_pg_field).collect();
@@ -603,6 +631,7 @@ impl SubscriptionCursor {
                 Some(0),
                 &self.dependent_table_id,
                 handler_args,
+                self.seek_pk_row.clone(),
             ),
             State::Fetch {
                 from_snapshot,
@@ -614,12 +643,14 @@ impl SubscriptionCursor {
                         None,
                         &self.dependent_table_id,
                         handler_args,
+                        self.seek_pk_row.clone(),
                     )
                 } else {
                     Self::init_batch_plan_for_subscription_cursor(
                         Some(rw_timestamp),
                         &self.dependent_table_id,
                         handler_args,
+                        self.seek_pk_row.clone(),
                    )
                 }
             }
@@ -634,10 +665,10 @@ impl SubscriptionCursor {
         rw_timestamp: Option<u64>,
         dependent_table_id: &TableId,
         handler_args: HandlerArgs,
+        seek_pk_row: Option<Vec<Option<Bytes>>>,
     ) -> Result<BatchQueryPlanResult> {
         let session = handler_args.clone().session;
         let table_catalog = session.get_table_by_id(dependent_table_id)?;
-        let pks = table_catalog.pk();
         let context = OptimizerContext::from_handler_args(handler_args.clone());
         if let Some(rw_timestamp) = rw_timestamp {
             let version_id = {
@@ -659,25 +690,23 @@ impl SubscriptionCursor {
                 rw_timestamp,
                 rw_timestamp,
                 version_id,
-                pks,
+                seek_pk_row,
             )
         } else {
-            let subscription_from_table_name =
-                ObjectName(vec![Ident::from(table_catalog.name.as_ref())]);
-            let pk_names = pks
+            let pks = table_catalog.pk();
+            let pks = pks
                 .iter()
                 .map(|f| {
-                    table_catalog
-                        .columns
-                        .get(f.column_index)
-                        .unwrap()
-                        .name()
-                        .to_string()
+                    let pk = table_catalog.columns.get(f.column_index).unwrap();
+                    (pk.name().to_string(), pk.is_hidden)
                 })
                 .collect_vec();
+            let subscription_from_table_name =
+                ObjectName(vec![Ident::from(table_catalog.name.as_ref())]);
             let query_stmt = Statement::Query(Box::new(gen_query_from_table_name_order_by(
                 subscription_from_table_name,
-                pk_names,
+                pks,
+                seek_pk_row,
             )));
             gen_batch_plan_by_statement(&session, context.into(), query_stmt)
         }
@@ -687,7 +716,13 @@ impl SubscriptionCursor {
         rw_timestamp: Option<u64>,
         dependent_table_id: &TableId,
         handler_args: HandlerArgs,
-    ) -> Result<(CursorDataChunkStream, Vec<Field>, Instant, HashMap<String, bool>)> {
+        seek_pk_row: Option<Vec<Option<Bytes>>>,
+    ) -> Result<(
+        CursorDataChunkStream,
+        Vec<Field>,
+        Instant,
+        HashMap<String, bool>,
+    )> {
         let init_query_timer = Instant::now();
         let session = handler_args.clone().session;
         let table_catalog = session.get_table_by_id(dependent_table_id)?;
         let pks = table_catalog.pk();
         let pk_column_names = get_pk_names(pks, &table_catalog);
         let plan_result = Self::init_batch_plan_for_subscription_cursor(
             rw_timestamp,
             dependent_table_id,
             handler_args.clone(),
+            seek_pk_row,
         )?;
@@ -752,29 +788,46 @@ impl SubscriptionCursor {
         Ok(row)
     }
 
-    pub fn process_output_desc_row(descs: &Vec<Field>, mut rows: Vec<Row>,pk_column_names: &HashMap<String, bool>) -> (Vec<Field>,Vec<Row>,Option<Vec<Option<Bytes>>>) {
-        let last_row = rows.last_mut().map(|row|{
+    pub fn process_output_desc_row(
+        descs: &Vec<Field>,
+        mut rows: Vec<Row>,
+        pk_column_names: &HashMap<String, bool>,
+    ) -> (Vec<Field>, Vec<Row>, Option<Vec<Option<Bytes>>>) {
+        let last_row = rows.last_mut().map(|row| {
-            row.0.iter().zip_eq_fast(descs.iter()).filter_map(|(data,field)|{
-                if pk_column_names.contains_key(&field.name){
-                    Some(data.clone())
-                } else {
-                    None
-                }
-            }).collect_vec()
+            row.0
+                .iter()
+                .zip_eq_fast(descs.iter())
+                .filter_map(|(data, field)| {
+                    if pk_column_names.contains_key(&field.name) {
+                        Some(data.clone())
+                    } else {
+                        None
+                    }
+                })
+                .collect_vec()
         });
-        let iter= descs.iter().map(|field| {
-            if let Some(is_hidden) = pk_column_names.get(&field.name) && *is_hidden{
-                (false,field)
-            } else {
-                (true,field)
-            }
-        }).collect_vec();
-        let pk_fields = iter.iter().filter(|(is_hidden,_)| *is_hidden).map(|(_,field)| (*field).clone()).collect();
-        let mut pk_keep = iter.iter().map(|(is_hidden,_)| *is_hidden);
+        let iter = descs
+            .iter()
+            .map(|field| {
+                if let Some(is_hidden) = pk_column_names.get(&field.name)
+                    && *is_hidden
+                {
+                    (false, field)
+                } else {
+                    (true, field)
+                }
+            })
+            .collect_vec();
+        let pk_fields = iter
+            .iter()
+            .filter(|(is_hidden, _)| *is_hidden)
+            .map(|(_, field)| (*field).clone())
+            .collect();
+        let mut pk_keep = iter.iter().map(|(is_hidden, _)| *is_hidden);
         rows.iter_mut().for_each(|row| {
             row.0.retain(|_| pk_keep.next().unwrap());
         });
-        (pk_fields,rows,last_row)
+        (pk_fields, rows, last_row)
     }
 
     pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
         if from_snapshot {
             descs.push(Field::with_name(DataType::Varchar, "op"));
@@ -845,7 +898,7 @@ impl SubscriptionCursor {
         old_epoch: u64,
         new_epoch: u64,
         version_id: HummockVersionId,
-        pks: &[ColumnOrder],
+        seek_pk_row: Option<Vec<Option<Bytes>>>,
     ) -> Result<BatchQueryPlanResult> {
         // pk + all column without hidden
         let output_col_idx = table_catalog
@@ -860,6 +913,7 @@ impl SubscriptionCursor {
                 }
             })
             .collect::<Vec<_>>();
+        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
         let core = generic::LogScan::new(
             table_catalog.name.clone(),
             output_col_idx,
             Rc::new(table_catalog.table_desc()),
             context,
             old_epoch,
             new_epoch,
             version_id,
         );
+        let pks = table_catalog.pk();
+        let pks = pks
+            .iter()
+            .map(|f| {
+                let pk = table_catalog.columns.get(f.column_index).unwrap();
+                (pk.name().to_string(), pk.data_type(), f.column_index)
+            })
+            .collect_vec();
+        // let selection = if let Some(seek_pk_rows) = seek_pk_rows {
+        //     let mut pk_rows = vec![];
+        //     let mut values = vec![];
+        //     for ((name, _), seek_pk) in pks.iter().zip_eq_fast(seek_pk_rows.iter()) {
+        //         if let Some(seek_pk) = seek_pk {
+        //             pk_rows.push(
+        //                 Expr::Identifier(Ident::with_quote_unchecked(
+        //                     '"',
+        //                     name.clone(),
+        //                 ))
+        //             );
+        //             values.push(String::from_utf8(seek_pk.clone().into()).unwrap());
+        //         }
+        //     }
+        //     if pk_rows.is_empty() {
+        //         None
+        //     } else if pk_rows.len() == 1 {
+        //         let left = pk_rows.pop().unwrap();
+        //         let right = Expr::Value(Value::SingleQuotedString(values.pop().unwrap()));
+        //         Some(Expr::BinaryOp {
+        //             left: Box::new(left),
+        //             op: BinaryOperator::Eq,
+        //             right: Box::new(right),
+        //         })
+        //     }else{
+        //         let left = Expr::Row(pk_rows);
+        //         let values = values.join(",");
+        //         let right = Expr::Value(Value::SingleQuotedString(format!("({})", values)));
+        //         Some(Expr::BinaryOp {
+        //             left: Box::new(left),
+        //             op: BinaryOperator::Gt,
+        //             right: Box::new(right),
+        //         })
+        //     }
+        // } else{
+        //     None
+        // };
+        let (scan, predicate) = match seek_pk_row {
+            Some(seek_pk_row) => {
+                let seek_pk_row = seek_pk_row
+                    .into_iter()
+                    .zip_eq_fast(pks.into_iter())
+                    .filter_map(|(pk, (name, data_type, column_index))| {
+                        if let Some(seek_pk) = pk {
+                            let column = InputRef {
+                                index: column_index,
+                                data_type: data_type.clone(),
+                            };
+                            let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
+                            let value_data =
+                                ScalarImpl::from_text(&value_string, data_type).unwrap();
+                            let value = Literal::new(Some(value_data), data_type.clone());
+                            Some(
+                                FunctionCall::new(
+                                    ExprType::LessThan,
+                                    vec![column.into(), value.into()],
+                                )
+                                .unwrap()
+                                .into(),
+                            )
+                        } else {
+                            None
+                        }
+                    })
+                    .collect_vec();
+                let (scan, predicate) = Condition {
+                    conjunctions: seek_pk_row,
+                }
+                .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)
+                .unwrap();
+                (scan, Some(predicate))
+            }
+            None => (vec![], None),
+        };
 
-        let batch_log_seq_scan = BatchLogSeqScan::new(core);
-
+        let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
         let out_fields = batch_log_seq_scan.core().out_fields();
         let out_names = batch_log_seq_scan.core().column_names();
 
+        let plan = if let Some(predicate) = predicate {
+            BatchFilter::new(generic::Filter::new(predicate, batch_log_seq_scan.into())).into()
+        } else {
+            batch_log_seq_scan.into()
+        };
+
         // order by pk, so don't need to sort
-        let order = Order::new(pks.to_vec());
+        let order = Order::new(table_catalog.pk().to_vec());
 
         // Here we just need a plan_root to call the method, only out_fields and out_names will be used
         let plan_root = PlanRoot::new_with_batch_plan(
-            PlanRef::from(batch_log_seq_scan.clone()),
+            plan,
             RequiredDist::single(),
             order,
             out_fields,
@@ -1216,11 +1286,11 @@ impl CursorManager {
     }
 }
 
-fn get_pk_names(pks: &[ColumnOrder], table_catalog: &TableCatalog) -> HashMap<String, bool> {
-    pks
-        .iter()
-        .map(|f| {
-            let column =
-                table_catalog
-                    .columns
-                    .get(f.column_index)
-                    .unwrap();
-            (column.name().to_string(),column.is_hidden)
-        })
-        .collect()
-}
\ No newline at end of file
+fn get_pk_names(pks: &[ColumnOrder], table_catalog: &TableCatalog) -> HashMap<String, bool> {
+    pks.iter()
+        .map(|f| {
+            let column = table_catalog.columns.get(f.column_index).unwrap();
+            (column.name().to_string(), column.is_hidden)
+        })
+        .collect()
+}
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index a665a37be68e..e00a76286932 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -756,10 +756,11 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
         start_epoch: u64,
         end_epoch: HummockReadEpoch,
         ordered: bool,
+        range_bounds: impl RangeBounds<OwnedRow>,
+        pk_prefix: impl Row,
     ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static> {
-        let pk_prefix = OwnedRow::default();
-        let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true);
-        let end_key = self.serialize_pk_bound(&pk_prefix, Unbounded, false);
+        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
+        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
         assert!(pk_prefix.len() <= self.pk_indices.len());
 
        let table_key_ranges = {
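// Illustration (not part of the patch): `batch_iter_log_with_pk_bounds` now
// takes explicit bounds, so callers that want the old whole-table behavior
// pass an unbounded range and an empty pk prefix, exactly as the snapshot
// backfill change below does:
//
//     table.batch_iter_log_with_pk_bounds(
//         start_epoch,
//         end_epoch,
//         ordered,
//         (Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
//         OwnedRow::default(),
//     )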
+use core::ops::Bound;
 use std::cmp::min;
 use std::collections::VecDeque;
 use std::future::{pending, Future};
@@ -199,6 +200,8 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
                     barrier_epoch.prev,
                     HummockReadEpoch::Committed(barrier_epoch.prev),
                     false,
+                    (Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
+                    OwnedRow::default(),
                 ))
                 .await?;
             let data_types = self.upstream_table.schema().data_types();

From dae04dbcf873db49038356ae9d9b3b59853487f3 Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Wed, 6 Nov 2024 19:45:21 +0800
Subject: [PATCH 5/7] save

---
 src/expr/impl/src/scalar/cast.rs           |   8 ++
 src/frontend/src/handler/mod.rs            |   2 +-
 src/frontend/src/handler/util.rs           |   2 +-
 src/frontend/src/session/cursor_manager.rs | 122 +++++++++------------
 4 files changed, 60 insertions(+), 74 deletions(-)

diff --git a/src/expr/impl/src/scalar/cast.rs b/src/expr/impl/src/scalar/cast.rs
index 41c51d95445e..1b065efed985 100644
--- a/src/expr/impl/src/scalar/cast.rs
+++ b/src/expr/impl/src/scalar/cast.rs
@@ -47,6 +47,14 @@ where
     })
 }
 
+#[function("cast(varchar) -> struct", type_infer = "unreachable")]
+pub fn str_parse_struct(elem: &str, ctx: &Context) -> Result<StructValue>{
+    match &ctx.return_type {
+        risingwave_common::types::DataType::Struct(s) => Ok(StructValue::from_str(elem, s).map_err(|e| ExprError::Parse(format!("error: {:?}",e.as_report()).into()))?),
+        _ => return Err(ExprError::Parse("unsupported type".into())),
+    }
+}
+
 // TODO: introduce `FromBinary` and support all types
 #[function("pgwire_recv(bytea) -> int8")]
 pub fn pgwire_recv(elem: &[u8]) -> Result<i64> {
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index e0bd5a5efae2..c9ae3749cb34 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -540,7 +540,7 @@ pub async fn handle(
         Statement::Query(_)
         | Statement::Insert { .. }
         | Statement::Delete { .. }
-        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
+        | Statement::Update { .. } => Ok(query::handle_query(handler_args, stmt, formats).await.unwrap()),
         Statement::CreateView {
             materialized,
             if_not_exists,
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 4fd38efb7b99..4d412a09c628 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -290,7 +290,7 @@ pub fn gen_query_from_table_name_order_by(
                 let right = Expr::Value(Value::SingleQuotedString(values.pop().unwrap()));
                 Some(Expr::BinaryOp {
                     left: Box::new(left),
-                    op: BinaryOperator::Eq,
+                    op: BinaryOperator::Gt,
                     right: Box::new(right),
                 })
             }else{
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index 638acfeb872f..bccafc750790 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -31,13 +31,13 @@ use prost::Message;
 use risingwave_common::catalog::Field;
 use risingwave_common::error::BoxedError;
 use risingwave_common::session_config::QueryMode;
-use risingwave_common::types::{DataType, ScalarImpl};
+use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common::util::scan_range;
 use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_hummock_sdk::HummockVersionId;
 use risingwave_pb::expr::expr_node::Type;
-use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
+use risingwave_sqlparser::ast::{BinaryOperator, Expr, Ident, ObjectName, Statement, Value};
 
 use super::SessionImpl;
 use crate::catalog::subscription_catalog::SubscriptionCatalog;
 use crate::catalog::TableId;
 use crate::error::{ErrorCode, Result};
@@ -845,7 +845,7 @@ impl SubscriptionCursor {
         old_epoch: u64,
         new_epoch: u64,
         version_id: HummockVersionId,
-        seek_pk_row: Option<Vec<Option<Bytes>>>,
+        seek_pk_rows: Option<Vec<Option<Bytes>>>,
    ) -> Result<BatchQueryPlanResult> {
         // pk + all column without hidden
         let output_col_idx = table_catalog
@@ -878,79 +878,57 @@ impl SubscriptionCursor {
                 (pk.name().to_string(), pk.data_type(), f.column_index)
             })
             .collect_vec();
-        // let selection = if let Some(seek_pk_rows) = seek_pk_rows {
-        //     let mut pk_rows = vec![];
-        //     let mut values = vec![];
-        //     for ((name, _), seek_pk) in pks.iter().zip_eq_fast(seek_pk_rows.iter()) {
-        //         if let Some(seek_pk) = seek_pk {
-        //             pk_rows.push(
-        //                 Expr::Identifier(Ident::with_quote_unchecked(
-        //                     '"',
-        //                     name.clone(),
-        //                 ))
-        //             );
-        //             values.push(String::from_utf8(seek_pk.clone().into()).unwrap());
-        //         }
-        //     }
-        //     if pk_rows.is_empty() {
-        //         None
-        //     } else if pk_rows.len() == 1 {
-        //         let left = pk_rows.pop().unwrap();
-        //         let right = Expr::Value(Value::SingleQuotedString(values.pop().unwrap()));
-        //         Some(Expr::BinaryOp {
-        //             left: Box::new(left),
-        //             op: BinaryOperator::Eq,
-        //             right: Box::new(right),
-        //         })
-        //     }else{
-        //         let left = Expr::Row(pk_rows);
-        //         let values = values.join(",");
-        //         let right = Expr::Value(Value::SingleQuotedString(format!("({})", values)));
-        //         Some(Expr::BinaryOp {
-        //             left: Box::new(left),
-        //             op: BinaryOperator::Gt,
-        //             right: Box::new(right),
-        //         })
-        //     }
-        // } else{
-        //     None
-        // };
-        let (scan, predicate) = match seek_pk_row {
-            Some(seek_pk_row) => {
-                let seek_pk_row = seek_pk_row
-                    .into_iter()
-                    .zip_eq_fast(pks.into_iter())
-                    .filter_map(|(pk, (name, data_type, column_index))| {
-                        if let Some(seek_pk) = pk {
-                            let column = InputRef {
-                                index: column_index,
-                                data_type: data_type.clone(),
-                            };
-                            let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
-                            let value_data =
-                                ScalarImpl::from_text(&value_string, data_type).unwrap();
-                            let value = Literal::new(Some(value_data), data_type.clone());
-                            Some(
-                                FunctionCall::new(
-                                    ExprType::LessThan,
-                                    vec![column.into(), value.into()],
-                                )
-                                .unwrap()
-                                .into(),
-                            )
-                        } else {
-                            None
+        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
+            let mut pk_rows = vec![];
+            let mut values = vec![];
+            for (seek_pk, (name, data_type, column_index)) in seek_pk_rows
+                .into_iter()
+                .zip_eq_fast(pks.into_iter()) {
+                if let Some(seek_pk) = seek_pk {
+                    pk_rows.push(
+                        InputRef {
+                            index: column_index,
+                            data_type: data_type.clone(),
                         }
-                    })
-                    .collect_vec();
-                let (scan, predicate) = Condition {
-                    conjunctions: seek_pk_row,
+                    );
+                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
+                    let value_data =
+                        ScalarImpl::from_text(&value_string, data_type).unwrap();
+                    values.push((Some(value_data),data_type.clone()));
                 }
-                .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)
-                .unwrap();
+            }
+            if pk_rows.is_empty() {
+                (vec![], None)
+            } else if pk_rows.len() == 1 {
+                let left = pk_rows.pop().unwrap();
+                let (right_data,right_type) = values.pop().unwrap();
+                let (scan, predicate) = Condition {
+                    conjunctions: vec![FunctionCall::new(
+                        ExprType::GreaterThan,
+                        vec![left.into(), Literal::new(right_data, right_type).into()],
+                    )?.into()],
+                }.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
                 (scan, Some(predicate))
+            }else{
+                let (right_datas,right_types):(Vec<_>,Vec<_>) = values.into_iter().unzip();
+                let right_data = ScalarImpl::Struct(StructValue::new(right_datas));
+                let right_type = DataType::Struct(StructType::unnamed(right_types));
+                let left = FunctionCall::new_unchecked(
+                    ExprType::Row,
+                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
+                    right_type.clone(),
+                );
+                let right = Literal::new(Some(right_data), right_type);
+                let (scan, predicate) = Condition {
+                    conjunctions: vec![FunctionCall::new(
+                            ExprType::GreaterThan,
+                            vec![left.into(), right.into()],
+                        )?.into()],
+                    }.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
+                (scan, Some(predicate))
             }
-            None => (vec![], None),
+        } else{
+            (vec![], None)
         };

From e4770afd7554ab157d1217ff5ab64c6384c6857d Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Thu, 7 Nov 2024 14:47:16 +0800
Subject: [PATCH 6/7] support

---
 src/batch/src/executor/log_row_seq_scan.rs    |  2 -
 src/batch/src/executor/row_seq_scan.rs        |  6 +-
 src/common/src/array/struct_array.rs          | 91 ++++++++++++++++++-
 src/expr/impl/src/scalar/cast.rs              |  7 +-
 src/frontend/src/handler/mod.rs               |  4 +-
 src/frontend/src/handler/util.rs              | 16 ++--
 .../optimizer/plan_node/batch_sys_seq_scan.rs |  6 +-
 src/frontend/src/session/cursor_manager.rs    | 80 ++++++----------
 8 files changed, 132 insertions(+), 80 deletions(-)

diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs
index cde3ae5ee863..7ca89871c026 100644
--- a/src/batch/src/executor/log_row_seq_scan.rs
+++ b/src/batch/src/executor/log_row_seq_scan.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use futures::prelude::stream::StreamExt;
 use futures_async_stream::try_stream;
 use futures_util::pin_mut;
-use iceberg::scan;
 use prometheus::Histogram;
 use risingwave_common::array::{DataChunk, Op};
 use risingwave_common::bitmap::Bitmap;
@@ -26,7 +25,6 @@ use risingwave_common::catalog::{ColumnId, Field, Schema};
 use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::row::{Row, RowExt};
 use risingwave_common::types::{DataType, ScalarImpl};
-use risingwave_common::util::scan_range;
 use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index 303ddf4270aa..df0a060d6b32 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -11,7 +11,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::ops::{Bound, Deref};
+use std::ops::Deref;
 use std::sync::Arc;
 
 use futures::{pin_mut, StreamExt};
@@ -23,11 +23,9 @@ use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{ColumnId, Schema};
 use risingwave_common::hash::VnodeCountCompat;
 use risingwave_common::row::{OwnedRow, Row};
-use risingwave_common::types::{DataType, Datum};
+use risingwave_common::types::DataType;
 use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
-use risingwave_common::util::value_encoding::deserialize_datum;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
-use risingwave_pb::batch_plan::{scan_range, PbScanRange};
 use risingwave_pb::common::BatchQueryEpoch;
 use risingwave_pb::plan_common::as_of::AsOfType;
 use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs
index 10ded3a64d66..37841d978b97 100644
--- a/src/common/src/array/struct_array.rs
+++ b/src/common/src/array/struct_array.rs
@@ -361,11 +361,27 @@ impl StructValue {
         if !s.ends_with(')') {
             return Err("Missing right parenthesis".into());
         }
-        let mut fields = Vec::with_capacity(s.len());
-        for (s, ty) in s[1..s.len() - 1].split(',').zip_eq_debug(ty.types()) {
-            let datum = match s.trim() {
+        let s = &s[1..s.len() - 1];
+        let mut split_str = Vec::with_capacity(ty.len());
+        let mut left_parenthesis_num = 0;
+        let mut start = 0;
+        for (i, c) in s.char_indices() {
+            match c {
+                '(' => left_parenthesis_num += 1,
+                ')' => left_parenthesis_num -= 1,
+                ',' if left_parenthesis_num == 0 => {
+                    split_str.push(&s[start..i]);
+                    start = i + 1;
+                }
+                _ => {}
+            }
+        }
+        split_str.push(&s[start..=(s.len() - 1)]);
+        let mut fields = Vec::with_capacity(ty.len());
+        for (str, ty) in split_str.iter().zip_eq_debug(ty.types()) {
+            let datum = match str.trim() {
                 "" => None,
-                s => Some(ScalarImpl::from_text(s, ty)?),
+                s => Some(ScalarImpl::from_text(s, ty).unwrap()),
             };
             fields.push(datum);
         }
@@ -832,4 +848,71 @@ mod tests {
         test("{1,2}", r#""{1,2}""#);
         test(r#"{"f": 1}"#, r#""{""f"": 1}""#);
     }
+
+    #[test]
+    fn test_from_str_nested_struct() {
+        let struct_str = "(1,sad ,(3, 4.0),(1,( 2,(3,(4,(5, 6))) )) )";
+        let struct_type = StructType::unnamed(vec![
+            DataType::Int32,
+            DataType::Varchar,
+            DataType::new_unnamed_struct(vec![DataType::Int32, DataType::Float64]),
+            DataType::new_unnamed_struct(vec![
+                DataType::Int32,
+                DataType::new_unnamed_struct(vec![
+                    DataType::Int32,
+                    DataType::new_unnamed_struct(vec![
+                        DataType::Int32,
+                        DataType::new_unnamed_struct(vec![
+                            DataType::Int32,
+                            DataType::new_unnamed_struct(vec![DataType::Int32, DataType::Int32]),
+                        ]),
+                    ]),
+                ]),
+            ]),
+        ]);
+        let struct_value = StructValue::from_str(struct_str, &struct_type).unwrap();
+        let expected = StructValue::new(vec![
+            Some(1.to_scalar_value()),
+            Some("sad".into()),
+            Some(
+                StructValue::new(vec![
+                    Some(3.to_scalar_value()),
+                    Some(ScalarImpl::Float64(4.0.into())),
+                ])
+                .to_scalar_value(),
+            ),
+            Some(
+                StructValue::new(vec![
+                    Some(1.to_scalar_value()),
+                    Some(
+                        StructValue::new(vec![
+                            Some(2.to_scalar_value()),
+                            Some(
+                                StructValue::new(vec![
+                                    Some(3.to_scalar_value()),
+                                    Some(
+                                        StructValue::new(vec![
+                                            Some(4.to_scalar_value()),
+                                            Some(
+                                                StructValue::new(vec![
+                                                    Some(5.to_scalar_value()),
+                                                    Some(6.to_scalar_value()),
+                                                ])
+                                                .to_scalar_value(),
+                                            ),
+                                        ])
+                                        .to_scalar_value(),
+                                    ),
+                                ])
+                                .to_scalar_value(),
+                            ),
+                        ])
+                        .to_scalar_value(),
+                    ),
+                ])
+                .to_scalar_value(),
+            ),
+        ]);
+        assert_eq!(struct_value, expected);
+    }
 }
diff --git a/src/expr/impl/src/scalar/cast.rs b/src/expr/impl/src/scalar/cast.rs
index 1b065efed985..63348df481e2 100644
--- a/src/expr/impl/src/scalar/cast.rs
+++ b/src/expr/impl/src/scalar/cast.rs
@@ -48,10 +48,11 @@ where
 }
 
 #[function("cast(varchar) -> struct", type_infer = "unreachable")]
-pub fn str_parse_struct(elem: &str, ctx: &Context) -> Result<StructValue>{
+pub fn str_parse_struct(elem: &str, ctx: &Context) -> Result<StructValue> {
     match &ctx.return_type {
-        risingwave_common::types::DataType::Struct(s) => Ok(StructValue::from_str(elem, s).map_err(|e| ExprError::Parse(format!("error: {:?}",e.as_report()).into()))?),
-        _ => return Err(ExprError::Parse("unsupported type".into())),
+        risingwave_common::types::DataType::Struct(s) => Ok(StructValue::from_str(elem, s)
+            .map_err(|e| ExprError::Parse(format!("error: {:?}", e.as_report()).into()))?),
+        _ => Err(ExprError::Parse("unsupported type".into())),
     }
 }
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index c9ae3749cb34..cb0d19bab395 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -540,7 +540,9 @@ pub async fn handle(
         Statement::Query(_)
         | Statement::Insert { .. }
         | Statement::Delete { .. }
-        | Statement::Update { .. } => Ok(query::handle_query(handler_args, stmt, formats).await.unwrap()),
+        | Statement::Update { .. } => Ok(query::handle_query(handler_args, stmt, formats)
+            .await
+            .unwrap()),
         Statement::CreateView {
             materialized,
             if_not_exists,
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 4d412a09c628..e82bcd51900f 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -274,12 +274,10 @@ pub fn gen_query_from_table_name_order_by(
         let mut values = vec![];
         for ((name, _), seek_pk) in pks.iter().zip_eq_fast(seek_pk_rows.iter()) {
             if let Some(seek_pk) = seek_pk {
-                pk_rows.push(
-                    Expr::Identifier(Ident::with_quote_unchecked(
-                        '"',
-                        name.clone(),
-                    ))
-                );
+                pk_rows.push(Expr::Identifier(Ident::with_quote_unchecked(
+                    '"',
+                    name.clone(),
+                )));
                 values.push(String::from_utf8(seek_pk.clone().into()).unwrap());
             }
         }
@@ -293,7 +291,7 @@ pub fn gen_query_from_table_name_order_by(
                     op: BinaryOperator::Gt,
                     right: Box::new(right),
                 })
-            }else{
+            } else {
                 let left = Expr::Row(pk_rows);
                 let values = values.join(",");
                 let right = Expr::Value(Value::SingleQuotedString(format!("({})", values)));
@@ -303,13 +301,13 @@ pub fn gen_query_from_table_name_order_by(
                     right: Box::new(right),
                 })
             }
-        } else{
+        } else {
             None
         };
 
         let select = Select {
-            from,
             projection,
+            from,
             selection,
             ..Default::default()
         };
diff --git a/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
index 007e983e67f3..a7133e47a218 100644
--- a/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
@@ -12,18 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Bound;
-
-use itertools::Itertools;
 use pretty_xmlish::{Pretty, XmlNode};
-use risingwave_common::types::ScalarImpl;
 use risingwave_common::util::scan_range::{is_full_range, ScanRange};
 use risingwave_pb::batch_plan::plan_node::NodeBody;
 use risingwave_pb::batch_plan::SysRowSeqScanNode;
 use risingwave_pb::plan_common::PbColumnDesc;
 
 use super::batch::prelude::*;
-use super::utils::{childless_record, range_to_string, scan_ranges_as_strs, Distill};
+use super::utils::{childless_record, scan_ranges_as_strs, Distill};
 use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
 use crate::error::Result;
 use crate::expr::{ExprRewriter, ExprVisitor};
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index bccafc750790..c0669378197b 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -27,23 +27,20 @@ use itertools::Itertools;
 use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::StatementType;
 use pgwire::types::{Format, Row};
-use prost::Message;
 use risingwave_common::catalog::Field;
 use risingwave_common::error::BoxedError;
 use risingwave_common::session_config::QueryMode;
 use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_common::util::scan_range;
 use risingwave_common::util::sort_util::ColumnOrder;
 use risingwave_hummock_sdk::HummockVersionId;
-use risingwave_pb::expr::expr_node::Type;
-use risingwave_sqlparser::ast::{BinaryOperator, Expr, Ident, ObjectName, Statement, Value};
+use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
 
 use super::SessionImpl;
 use crate::catalog::subscription_catalog::SubscriptionCatalog;
 use crate::catalog::TableId;
 use crate::error::{ErrorCode, Result};
-use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal};
+use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
 use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
 use crate::handler::query::{
     gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult,
@@ -59,9 +56,7 @@ use crate::optimizer::property::{Order, RequiredDist};
 use crate::optimizer::PlanRoot;
 use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
 use crate::utils::Condition;
-use crate::{
-    Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog,
-};
+use crate::{Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};
 
 pub enum CursorDataChunkStream {
     LocalDataChunk(Option<LocalQueryStream>),
@@ -392,26 +387,6 @@ impl SubscriptionCursor {
                 &fields,
                 handler_args.session.clone(),
             );
-            {
-                let (mut chunk_stream, fields, init_query_timer, pk_column_names) =
-                    Self::initiate_query(
-                        Some(rw_timestamp),
-                        &self.dependent_table_id,
-                        handler_args.clone(),
-                        self.seek_pk_row.clone(),
-                    )
-                    .await?;
-                Self::init_row_stream(
-                    &mut chunk_stream,
-                    formats,
-                    &from_snapshot,
-                    &fields,
-                    handler_args.session.clone(),
-                );
-                while let Some(a) = chunk_stream.next().await? {
-                    println!("testtest {:?}", a);
-                }
-            }
 
             self.cursor_need_drop_time =
                 Instant::now() + Duration::from_secs(self.subscription.retention_seconds);
@@ -875,43 +850,42 @@ impl SubscriptionCursor {
             .iter()
             .map(|f| {
                 let pk = table_catalog.columns.get(f.column_index).unwrap();
-                (pk.name().to_string(), pk.data_type(), f.column_index)
+                (pk.data_type(), f.column_index)
             })
             .collect_vec();
         let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
             let mut pk_rows = vec![];
             let mut values = vec![];
-            for (seek_pk, (name, data_type, column_index)) in seek_pk_rows
-                .into_iter()
-                .zip_eq_fast(pks.into_iter()) {
+            for (seek_pk, (data_type, column_index)) in
+                seek_pk_rows.into_iter().zip_eq_fast(pks.into_iter())
+            {
                 if let Some(seek_pk) = seek_pk {
-                    pk_rows.push(
-                        InputRef {
-                            index: column_index,
-                            data_type: data_type.clone(),
-                        }
-                    );
+                    pk_rows.push(InputRef {
+                        index: column_index,
+                        data_type: data_type.clone(),
+                    });
                     let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
-                    let value_data =
-                        ScalarImpl::from_text(&value_string, data_type).unwrap();
-                    values.push((Some(value_data),data_type.clone()));
+                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
+                    values.push((Some(value_data), data_type.clone()));
                 }
             }
             if pk_rows.is_empty() {
                 (vec![], None)
             } else if pk_rows.len() == 1 {
                 let left = pk_rows.pop().unwrap();
-                let (right_data,right_type) = values.pop().unwrap();
+                let (right_data, right_type) = values.pop().unwrap();
                 let (scan, predicate) = Condition {
                     conjunctions: vec![FunctionCall::new(
                         ExprType::GreaterThan,
                         vec![left.into(), Literal::new(right_data, right_type).into()],
-                    )?.into()],
-                }.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
+                    )?
+                    .into()],
+                }
+                .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
                 (scan, Some(predicate))
-            }else{
-                let (right_datas,right_types):(Vec<_>,Vec<_>) = values.into_iter().unzip();
-                let right_data = ScalarImpl::Struct(StructValue::new(right_datas));
+            } else {
+                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
+                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                 let right_type = DataType::Struct(StructType::unnamed(right_types));
                 let left = FunctionCall::new_unchecked(
                     ExprType::Row,
                     pk_rows.into_iter().map(|pk| pk.into()).collect(),
                     right_type.clone(),
                 );
                 let right = Literal::new(Some(right_data), right_type);
                 let (scan, predicate) = Condition {
                     conjunctions: vec![FunctionCall::new(
-                            ExprType::GreaterThan,
-                            vec![left.into(), right.into()],
-                        )?.into()],
-                    }.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
+                        ExprType::GreaterThan,
+                        vec![left.into(), right.into()],
+                    )?
+                    .into()],
+                }
+                .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
                 (scan, Some(predicate))
             }
-        } else{
+        } else {
             (vec![], None)
         };

From 7f4e0cd2fcfbb6ad4117e28e75ac21c1c25bb82d Mon Sep 17 00:00:00 2001
From: xxhZs <1060434431@qq.com>
Date: Fri, 8 Nov 2024 16:42:20 +0800
Subject: [PATCH 7/7] fix cursor panic

---
 src/common/src/array/struct_array.rs       |  2 +-
 src/frontend/src/handler/mod.rs            |  4 +---
 src/frontend/src/handler/util.rs           | 13 +++++++++----
 src/frontend/src/session/cursor_manager.rs |  2 +-
 4 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/common/src/array/struct_array.rs b/src/common/src/array/struct_array.rs
index 37841d978b97..91cd28345f38 100644
--- a/src/common/src/array/struct_array.rs
+++ b/src/common/src/array/struct_array.rs
@@ -381,7 +381,7 @@ impl StructValue {
         for (str, ty) in split_str.iter().zip_eq_debug(ty.types()) {
             let datum = match str.trim() {
                 "" => None,
-                s => Some(ScalarImpl::from_text(s, ty).unwrap()),
+                s => Some(ScalarImpl::from_text(s, ty)?),
             };
             fields.push(datum);
         }
diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs
index 9d5a235a658c..9cf94a37c65b 100644
--- a/src/frontend/src/handler/mod.rs
+++ b/src/frontend/src/handler/mod.rs
@@ -541,9 +541,7 @@ pub async fn handle(
         Statement::Query(_)
         | Statement::Insert { .. }
         | Statement::Delete { .. }
-        | Statement::Update { .. } => Ok(query::handle_query(handler_args, stmt, formats)
-            .await
-            .unwrap()),
+        | Statement::Update { .. } => query::handle_query(handler_args, stmt, formats).await,
         Statement::CreateView {
             materialized,
             if_not_exists,
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 7509d7dd1199..6e043c4f0ac1 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -239,7 +239,7 @@ pub fn gen_query_from_table_name_order_by(
     from_name: ObjectName,
     pks: Vec<(String, bool)>,
     seek_pk_rows: Option<Vec<Option<Bytes>>>,
-) -> Query {
+) -> RwResult<Query> {
     let select_pks = pks
         .iter()
         .filter_map(
@@ -278,7 +278,12 @@ pub fn gen_query_from_table_name_order_by(
                     '"',
                     name.clone(),
                 )));
-                values.push(String::from_utf8(seek_pk.clone().into()).unwrap());
+                values.push(String::from_utf8(seek_pk.clone().into()).map_err(|e| {
+                    ErrorCode::InternalError(format!(
+                        "Convert cursor seek_pk to string error: {:?}",
+                        e.as_report()
+                    ))
+                })?);
             }
         }
         if pk_rows.is_empty() {
@@ -323,14 +328,14 @@ pub fn gen_query_from_table_name_order_by(
             }
         })
         .collect();
-    Query {
+    Ok(Query {
         with: None,
         body,
        order_by,
         limit: None,
         offset: None,
         fetch: None,
-    }
+    })
 }
 
 pub fn convert_unix_millis_to_logstore_u64(unix_millis: u64) -> u64 {
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index c0669378197b..92252d064b94 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -682,7 +682,7 @@ impl SubscriptionCursor {
             subscription_from_table_name,
             pks,
             seek_pk_row,
-        )));
+        )?));
         gen_batch_plan_by_statement(&session, context.into(), query_stmt)
     }
 }
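A note on why the cursor seek predicate in the patches above uses `ExprType::GreaterThan` over a `ROW(...)` expression: a FETCH resumes a scan that is ordered by pk, so the next batch is exactly the rows whose pk tuple compares lexicographically greater than the pk of the last row already returned. A minimal model of that ordering, with plain `i64` slices standing in for the planner's `Row`/`Struct` values (illustrative only, not RisingWave's expression types):

// Lexicographic comparison of pk tuples, the same ordering SQL gives
// `ROW(a, b) > ROW(x, y)`: compare the first column, break ties on the next.
fn is_after_seek(row_pk: &[i64], seek_pk: &[i64]) -> bool {
    row_pk > seek_pk // slice comparison in Rust is lexicographic
}

fn main() {
    assert!(is_after_seek(&[1, 5], &[1, 4])); // tie on pk1, pk2 decides
    assert!(is_after_seek(&[2, 0], &[1, 9])); // pk1 decides regardless of pk2
    assert!(!is_after_seek(&[1, 4], &[1, 4])); // the last-seen row itself is excluded
}

Because the comparison is strict, the last-seen row is excluded rather than re-sent, and the single-column case in PATCH 6 can compare against a plain `Literal` without the struct wrapping.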
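The `StructValue::from_str` change in PATCH 6 swaps a naive `split(',')` for a parenthesis-depth scanner, so commas inside nested struct literals no longer break field parsing. A standalone sketch of the same technique (`split_top_level` is an illustrative name, not the actual RisingWave helper):

// Split a struct literal body on top-level commas only, tracking parenthesis
// depth so nested structs like "(1,(2,3))" keep their inner commas intact.
fn split_top_level(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut depth = 0i32;
    let mut start = 0;
    for (i, c) in s.char_indices() {
        match c {
            '(' => depth += 1,
            ')' => depth -= 1,
            ',' if depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }
    // Push the trailing segment after the last top-level comma.
    parts.push(&s[start..]);
    parts
}

fn main() {
    assert_eq!(
        split_top_level("1,sad ,(3, 4.0),(1,( 2,(3,(4,(5, 6))) ))"),
        vec!["1", "sad ", "(3, 4.0)", "(1,( 2,(3,(4,(5, 6))) ))"]
    );
}

Pushing `&s[start..]` for the trailing segment is equivalent to the patch's `&s[start..=(s.len() - 1)]` on non-empty input, and it also stays in bounds when the body between the outer parentheses is empty.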
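On the `batch_iter_log_with_pk_bounds` signature change: threading `range_bounds` and `pk_prefix` through lets the cursor narrow a log scan to its seek position, while existing callers keep the old full-scan behavior by passing unbounded bounds and an empty prefix, exactly as the snapshot-backfill hunk does. A small sketch of that calling convention over `std::ops::RangeBounds` (simplified, with `i64` standing in for `OwnedRow`):

use std::ops::{Bound, RangeBounds};

// A caller either passes (Unbounded, Unbounded) for a full scan, as the
// snapshot-backfill executor does, or an Excluded lower bound to resume
// a scan just past the last key it has already read.
fn describe(range: impl RangeBounds<i64>) -> &'static str {
    match (range.start_bound(), range.end_bound()) {
        (Bound::Unbounded, Bound::Unbounded) => "full scan",
        _ => "bounded scan",
    }
}

fn main() {
    assert_eq!(
        describe((Bound::<i64>::Unbounded, Bound::<i64>::Unbounded)),
        "full scan"
    );
    assert_eq!(describe((Bound::Excluded(10), Bound::Unbounded)), "bounded scan");
}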