From f07751bed42a804bdff3ea52009b7bf9dfe4e9e0 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 26 Nov 2025 15:31:50 +0800 Subject: [PATCH 01/25] wip --- .../storages/common/blocks/src/parquet_rs.rs | 13 +++++++++++++ .../storages/fuse/src/io/write/block_writer.rs | 17 +++++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 6da54c70d985d..33cb2b2e7093e 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -26,6 +26,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::properties::WriterPropertiesBuilder; use parquet::file::properties::WriterVersion; use parquet::format::FileMetaData; +use databend_storages_common_table_meta::meta::ColumnStatistics; /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( @@ -35,6 +36,18 @@ pub fn blocks_to_parquet( compression: TableCompression, enable_dictionary: bool, metadata: Option>, +) -> Result { + blocks_to_parquet_with_stats(table_schema, blocks, write_buffer, compression, enable_dictionary, metadata, None) +} + +pub fn blocks_to_parquet_with_stats( + table_schema: &TableSchema, + blocks: Vec, + write_buffer: &mut Vec, + compression: TableCompression, + enable_dictionary: bool, + metadata: Option>, + column_stats: Option<&ColumnStatistics>, ) -> Result { assert!(!blocks.is_empty()); let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata); diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index 531f70a17c5b5..0de19bb54c5bf 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -46,7 +46,7 @@ use databend_common_metrics::storage::metrics_inc_block_write_nums; use databend_common_native::write::NativeWriter; use databend_storages_common_blocks::blocks_to_parquet; use databend_storages_common_index::NgramArgs; -use databend_storages_common_table_meta::meta::encode_column_hll; +use databend_storages_common_table_meta::meta::{encode_column_hll, StatisticsOfColumns}; use databend_storages_common_table_meta::meta::BlockHLLState; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ClusterStatistics; @@ -76,6 +76,18 @@ pub fn serialize_block( schema: &TableSchemaRef, block: DataBlock, buf: &mut Vec, +) -> Result> { + serialize_block_with_column_stats( + write_settings,schema, None, block, buf + ) +} + +pub fn serialize_block_with_column_stats( + write_settings: &WriteSettings, + schema: &TableSchemaRef, + column_stats: Option<&StatisticsOfColumns>, + block: DataBlock, + buf: &mut Vec, ) -> Result> { let schema = Arc::new(schema.remove_virtual_computed_fields()); match write_settings.storage_format { @@ -248,9 +260,10 @@ impl BlockBuilder { let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); let block_size = data_block.estimate_block_size() as u64; - let col_metas = serialize_block( + let col_metas = serialize_block_with_column_stats( &self.write_settings, &self.source_schema, + Some(&col_stats), data_block, &mut buffer, )?; From 45a858c843a17cf040f4c3024f178581f5b239d4 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 26 Nov 2025 23:09:38 +0800 Subject: [PATCH 02/25] enable dictionary page heuristic rules (not for streeam write yet) --- 
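Note on the heuristic introduced here: dictionary encoding only pays off for low-cardinality columns, so the parquet writer properties are tuned per column from the block's estimated NDV, keeping the dictionary only where the distinct-values/rows ratio stays around or below 0.1. The sketch below illustrates that rule against the public parquet-rs `WriterProperties` API; it is not the code in this patch, and the helper name `dictionary_props` plus the example columns ("city", "uuid") and their counts are made up for the demo.

use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

// Illustrative only: enable dictionary encoding globally, then switch it off
// for columns whose estimated NDV is high relative to the row count
// (ratio above 0.1, the threshold this patch series uses).
fn dictionary_props(col_ndv: &[(&str, u64)], num_rows: u64) -> WriterProperties {
    let mut builder = WriterProperties::builder().set_dictionary_enabled(true);
    for (name, ndv) in col_ndv {
        let high_cardinality = *ndv as f64 / num_rows as f64 > 0.1;
        if high_cardinality {
            builder = builder.set_column_dictionary_enabled(ColumnPath::from(*name), false);
        }
    }
    builder.build()
}

fn main() {
    // Hypothetical stats: "city" repeats heavily, "uuid" is nearly unique.
    let props = dictionary_props(&[("city", 300), ("uuid", 980_000)], 1_000_000);
    assert!(props.dictionary_enabled(&ColumnPath::from("city")));
    assert!(!props.dictionary_enabled(&ColumnPath::from("uuid")));
}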
.../storages/common/blocks/src/parquet_rs.rs | 58 ++++++++++++++++++- .../fuse/src/io/write/block_writer.rs | 12 ++-- .../fuse/src/io/write/stream/block_builder.rs | 1 + 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 33cb2b2e7093e..eb264385cd4e6 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use parquet::arrow::ArrowWriter; use parquet::basic::Encoding; @@ -26,7 +28,7 @@ use parquet::file::properties::WriterProperties; use parquet::file::properties::WriterPropertiesBuilder; use parquet::file::properties::WriterVersion; use parquet::format::FileMetaData; -use databend_storages_common_table_meta::meta::ColumnStatistics; +use parquet::schema::types::ColumnPath; /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( @@ -37,7 +39,15 @@ pub fn blocks_to_parquet( enable_dictionary: bool, metadata: Option>, ) -> Result { - blocks_to_parquet_with_stats(table_schema, blocks, write_buffer, compression, enable_dictionary, metadata, None) + blocks_to_parquet_with_stats( + table_schema, + blocks, + write_buffer, + compression, + enable_dictionary, + metadata, + None, + ) } pub fn blocks_to_parquet_with_stats( @@ -47,10 +57,22 @@ pub fn blocks_to_parquet_with_stats( compression: TableCompression, enable_dictionary: bool, metadata: Option>, - column_stats: Option<&ColumnStatistics>, + column_stats: Option<&StatisticsOfColumns>, ) -> Result { assert!(!blocks.is_empty()); let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata); + let builder = if let Some(cols_stats) = column_stats { + let num_rows = blocks[0].num_rows(); + adjust_writer_properties_by_col_stats( + builder, + enable_dictionary, + cols_stats, + num_rows, + table_schema, + ) + } else { + builder + }; let props = builder.build(); let batches = blocks @@ -88,3 +110,33 @@ pub fn parquet_writer_properties_builder( builder.set_dictionary_enabled(false) } } + +pub fn adjust_writer_properties_by_col_stats( + builder: WriterPropertiesBuilder, + enable_dictionary: bool, + cols_stats: &StatisticsOfColumns, + num_rows: usize, + table_schema: &TableSchema, +) -> WriterPropertiesBuilder { + if !enable_dictionary { + return builder; + }; + + let mut builder = builder.set_dictionary_enabled(false); + + let colum_names: HashMap<_, _> = table_schema + .fields + .iter() + .map(|f| (f.column_id, f.name.as_str())) + .collect(); + for (col_id, stats) in cols_stats.iter() { + if let Some(ndv) = stats.distinct_of_values { + if (ndv as f64 / num_rows as f64) < 0.1 { + // DOC safe to unwrap + let col_name = colum_names.get(col_id).unwrap(); + builder = builder.set_column_dictionary_enabled(ColumnPath::from(*col_name), true); + } + } + } + builder +} diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index 0de19bb54c5bf..5a03e64978c34 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ 
b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -44,14 +44,15 @@ use databend_common_metrics::storage::metrics_inc_block_virtual_column_write_num use databend_common_metrics::storage::metrics_inc_block_write_milliseconds; use databend_common_metrics::storage::metrics_inc_block_write_nums; use databend_common_native::write::NativeWriter; -use databend_storages_common_blocks::blocks_to_parquet; +use databend_storages_common_blocks::blocks_to_parquet_with_stats; use databend_storages_common_index::NgramArgs; -use databend_storages_common_table_meta::meta::{encode_column_hll, StatisticsOfColumns}; +use databend_storages_common_table_meta::meta::encode_column_hll; use databend_storages_common_table_meta::meta::BlockHLLState; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ClusterStatistics; use databend_storages_common_table_meta::meta::ColumnMeta; use databend_storages_common_table_meta::meta::ExtendedBlockMeta; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::table::TableCompression; use opendal::Operator; @@ -77,9 +78,7 @@ pub fn serialize_block( block: DataBlock, buf: &mut Vec, ) -> Result> { - serialize_block_with_column_stats( - write_settings,schema, None, block, buf - ) + serialize_block_with_column_stats(write_settings, schema, None, block, buf) } pub fn serialize_block_with_column_stats( @@ -92,13 +91,14 @@ pub fn serialize_block_with_column_stats( let schema = Arc::new(schema.remove_virtual_computed_fields()); match write_settings.storage_format { FuseStorageFormat::Parquet => { - let result = blocks_to_parquet( + let result = blocks_to_parquet_with_stats( &schema, vec![block], buf, write_settings.table_compression, write_settings.enable_parquet_dictionary, None, + column_stats, )?; let meta = column_parquet_metas(&result, &schema)?; Ok(meta) diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 4de89f7e0a6fb..c74d3b08cd033 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -171,6 +171,7 @@ impl StreamBlockBuilder { let block_writer = match properties.write_settings.storage_format { FuseStorageFormat::Parquet => { let write_settings = &properties.write_settings; + // TODO NDV for heuristic rule let props = parquet_writer_properties_builder( write_settings.table_compression, write_settings.enable_parquet_dictionary, From f75023c408f4940bf9438fa72067dc188dd8c07f Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 26 Nov 2025 23:24:10 +0800 Subject: [PATCH 03/25] fix typos --- src/query/storages/common/blocks/src/parquet_rs.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index eb264385cd4e6..26cab8230e8b8 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -61,12 +61,12 @@ pub fn blocks_to_parquet_with_stats( ) -> Result { assert!(!blocks.is_empty()); let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata); - let builder = if let Some(cols_stats) = column_stats { + let builder = if let Some(stats) = column_stats { let num_rows = blocks[0].num_rows(); 
adjust_writer_properties_by_col_stats( builder, enable_dictionary, - cols_stats, + stats, num_rows, table_schema, ) @@ -124,7 +124,7 @@ pub fn adjust_writer_properties_by_col_stats( let mut builder = builder.set_dictionary_enabled(false); - let colum_names: HashMap<_, _> = table_schema + let column_names: HashMap<_, _> = table_schema .fields .iter() .map(|f| (f.column_id, f.name.as_str())) @@ -133,8 +133,8 @@ pub fn adjust_writer_properties_by_col_stats( if let Some(ndv) = stats.distinct_of_values { if (ndv as f64 / num_rows as f64) < 0.1 { // DOC safe to unwrap - let col_name = colum_names.get(col_id).unwrap(); - builder = builder.set_column_dictionary_enabled(ColumnPath::from(*col_name), true); + let name = column_names.get(col_id).unwrap(); + builder = builder.set_column_dictionary_enabled(ColumnPath::from(*name), true); } } } From a9d0845f28b7039fc89289b77dff6dca1081b95a Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 27 Nov 2025 17:54:43 +0800 Subject: [PATCH 04/25] wip --- .../storages/fuse/src/io/write/stream/block_builder.rs | 1 + .../fuse/src/io/write/stream/column_ndv_estimator.rs | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index c74d3b08cd033..6c4f3d6c303e8 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -284,6 +284,7 @@ impl StreamBlockBuilder { } self.row_count += block.num_rows(); self.block_size += block.estimate_block_size(); + self.block_writer .write(block, &self.properties.source_schema)?; Ok(()) diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs index 84ea04b690330..8784094357e78 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs @@ -47,6 +47,8 @@ use enum_dispatch::enum_dispatch; pub trait ColumnNDVEstimatorOps: Send + Sync { fn update_column(&mut self, column: &Column); fn update_scalar(&mut self, scalar: &ScalarRef); + + fn peek(&self) -> usize; fn finalize(&self) -> usize; fn hll(self) -> MetaHLL; } @@ -182,6 +184,10 @@ where self.hll.add_object(&val); } + fn peek(&self) -> usize { + self.hll.count() + } + fn finalize(&self) -> usize { self.hll.count() } From 25b4ac89cda5cfbd143754300c3f94b83cffdbeb Mon Sep 17 00:00:00 2001 From: dantengsky Date: Mon, 1 Dec 2025 15:06:15 +0800 Subject: [PATCH 05/25] wip --- .../fuse/src/io/write/stream/block_builder.rs | 38 +++++++++++++++---- .../write/stream/column_statistics_state.rs | 6 +++ 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 6c4f3d6c303e8..3de0244fc8604 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::mem; use std::sync::Arc; +use arrow_schema::Schema; use chrono::Utc; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; @@ -69,6 +70,26 @@ use crate::operations::column_parquet_metas; use crate::FuseStorageFormat; use crate::FuseTable; +struct UninitializedArrowWriter { + write_settings: WriteSettings, + arrow_schema: Arc, +} + +impl 
UninitializedArrowWriter { + fn init(self, cols_ndv: HashMap) -> Result>> { + let write_settings = &self.write_settings; + let props = parquet_writer_properties_builder( + write_settings.table_compression, + write_settings.enable_parquet_dictionary, + None, + ) + .build(); + let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); + let writer = ArrowWriter::try_new(buffer, self.arrow_schema, Some(props))?; + Ok(writer) + } +} + pub enum BlockWriterImpl { Arrow(ArrowWriter>), // Native format doesnot support stream write. @@ -76,7 +97,7 @@ pub enum BlockWriterImpl { } pub trait BlockWriter { - fn start(&mut self) -> Result<()>; + fn start(&mut self, cols_ndv: HashMap) -> Result<()>; fn write(&mut self, block: DataBlock, schema: &TableSchema) -> Result<()>; @@ -88,10 +109,10 @@ pub trait BlockWriter { } impl BlockWriter for BlockWriterImpl { - fn start(&mut self) -> Result<()> { + fn start(&mut self, cols_ndv: HashMap) -> Result<()> { match self { - BlockWriterImpl::Arrow(_) => Ok(()), - BlockWriterImpl::Native(writer) => Ok(writer.start()?), + BlockWriterImpl::Arrow(arrow_writer) => Ok(()), + BlockWriterImpl::Native(native_writer) => Ok(native_writer.start()?), } } @@ -264,10 +285,6 @@ impl StreamBlockBuilder { return Ok(()); } - if self.row_count == 0 { - self.block_writer.start()?; - } - let block = self.cluster_stats_state.add_block(block)?; self.column_stats_state .add_block(&self.properties.source_schema, &block)?; @@ -285,6 +302,11 @@ impl StreamBlockBuilder { self.row_count += block.num_rows(); self.block_size += block.estimate_block_size(); + if self.row_count == 0 { + let cols_ndv = self.column_stats_state.peek_cols_ndv(); + self.block_writer.start(cols_ndv)?; + } + self.block_writer .write(block, &self.properties.source_schema)?; Ok(()) diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs index ac65378b20d22..8263dc522f885 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -86,6 +86,12 @@ impl ColumnStatisticsState { Ok(()) } + pub fn peek_cols_ndv(&self) -> HashMap { + self.distinct_columns.iter().map(|(column_id, ndv_estimator)| { + (*column_id, ndv_estimator.peek()) + }).collect() + } + pub fn finalize( self, mut column_distinct_count: HashMap, From caa45b766de7968a46177f84e18cfccc6c57f007 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Mon, 1 Dec 2025 17:01:02 +0800 Subject: [PATCH 06/25] wip --- .../storages/common/blocks/src/parquet_rs.rs | 28 +++++++++++-------- .../fuse/src/io/write/stream/block_builder.rs | 1 + 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 26cab8230e8b8..4f42a96072bd1 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use databend_common_exception::Result; -use databend_common_expression::DataBlock; +use databend_common_expression::{ColumnId, DataBlock}; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; @@ -111,6 +111,16 @@ pub fn parquet_writer_properties_builder( } } +trait NdvProvider { + fn column_ndv(&self, column_id: &ColumnId) -> Option; +} + 
+impl NdvProvider for &StatisticsOfColumns { + fn column_ndv(&self, column_id: &ColumnId) -> Option { + self.get(column_id).and_then(|item| item.distinct_of_values) + } +} + pub fn adjust_writer_properties_by_col_stats( builder: WriterPropertiesBuilder, enable_dictionary: bool, @@ -124,19 +134,15 @@ pub fn adjust_writer_properties_by_col_stats( let mut builder = builder.set_dictionary_enabled(false); - let column_names: HashMap<_, _> = table_schema - .fields - .iter() - .map(|f| (f.column_id, f.name.as_str())) - .collect(); - for (col_id, stats) in cols_stats.iter() { - if let Some(ndv) = stats.distinct_of_values { + for field in table_schema.fields().iter() { + let col_id = field.column_id(); + if let Some(ndv) = cols_stats.column_ndv(&col_id) { if (ndv as f64 / num_rows as f64) < 0.1 { - // DOC safe to unwrap - let name = column_names.get(col_id).unwrap(); - builder = builder.set_column_dictionary_enabled(ColumnPath::from(*name), true); + let name = field.name().as_str(); + builder = builder.set_column_dictionary_enabled(ColumnPath::from(name), true); } } } + builder } diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 3de0244fc8604..cb7ca9f5963ba 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -73,6 +73,7 @@ use crate::FuseTable; struct UninitializedArrowWriter { write_settings: WriteSettings, arrow_schema: Arc, + table_schema: TableSchemaRef, } impl UninitializedArrowWriter { From fa0ee9a81b4d6de0bded988f399c0937b6f3583c Mon Sep 17 00:00:00 2001 From: dantengsky Date: Mon, 1 Dec 2025 22:02:52 +0800 Subject: [PATCH 07/25] apply heuristic ruule to stream write --- .../storages/common/blocks/src/parquet_rs.rs | 12 +- .../fuse/src/io/write/stream/block_builder.rs | 131 ++++++++++++++---- .../write/stream/column_statistics_state.rs | 7 +- 3 files changed, 115 insertions(+), 35 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 4f42a96072bd1..a204e7ec237b0 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashMap; use std::sync::Arc; use databend_common_exception::Result; -use databend_common_expression::{ColumnId, DataBlock}; +use databend_common_expression::ColumnId; +use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; @@ -111,8 +111,8 @@ pub fn parquet_writer_properties_builder( } } -trait NdvProvider { - fn column_ndv(&self, column_id: &ColumnId) -> Option; +pub trait NdvProvider { + fn column_ndv(&self, column_id: &ColumnId) -> Option; } impl NdvProvider for &StatisticsOfColumns { @@ -124,7 +124,7 @@ impl NdvProvider for &StatisticsOfColumns { pub fn adjust_writer_properties_by_col_stats( builder: WriterPropertiesBuilder, enable_dictionary: bool, - cols_stats: &StatisticsOfColumns, + cols_stats: impl NdvProvider, num_rows: usize, table_schema: &TableSchema, ) -> WriterPropertiesBuilder { @@ -135,7 +135,7 @@ pub fn adjust_writer_properties_by_col_stats( let mut builder = builder.set_dictionary_enabled(false); for field in table_schema.fields().iter() { - let col_id = field.column_id(); + let col_id = field.column_id(); if let Some(ndv) = cols_stats.column_ndv(&col_id) { if (ndv as f64 / num_rows as f64) < 0.1 { let name = field.name().as_str(); diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index cb7ca9f5963ba..ceae47fcd60aa 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::mem; use std::sync::Arc; +use arrow_array::RecordBatch; use arrow_schema::Schema; use chrono::Utc; use databend_common_catalog::table::Table; @@ -39,7 +40,9 @@ use databend_common_meta_app::schema::TableIndex; use databend_common_native::write::NativeWriter; use databend_common_native::write::WriteOptions; use databend_common_sql::executor::physical_plans::MutationKind; +use databend_storages_common_blocks::adjust_writer_properties_by_col_stats; use databend_storages_common_blocks::parquet_writer_properties_builder; +use databend_storages_common_blocks::NdvProvider; use databend_storages_common_index::BloomIndex; use databend_storages_common_index::BloomIndexBuilder; use databend_storages_common_index::Index; @@ -51,8 +54,10 @@ use databend_storages_common_table_meta::meta::ColumnMeta; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::table::TableCompression; use parquet::arrow::ArrowWriter; +use parquet::format::FileMetaData; use crate::io::create_inverted_index_builders; +use crate::io::write::stream::block_builder::ArrowParquetWriter::Initialized; use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder; use crate::io::write::stream::cluster_statistics::ClusterStatisticsState; use crate::io::write::stream::ColumnStatisticsState; @@ -70,29 +75,90 @@ use crate::operations::column_parquet_metas; use crate::FuseStorageFormat; use crate::FuseTable; -struct UninitializedArrowWriter { +pub struct UninitializedArrowWriter { write_settings: WriteSettings, arrow_schema: Arc, table_schema: TableSchemaRef, } - impl UninitializedArrowWriter { - fn init(self, cols_ndv: HashMap) -> Result>> { + fn init(&self, cols_ndv: HashMap) -> Result>> { let write_settings = &self.write_settings; - let props = 
parquet_writer_properties_builder( + let builder = parquet_writer_properties_builder( write_settings.table_compression, write_settings.enable_parquet_dictionary, None, - ) - .build(); + ); + + let builder = adjust_writer_properties_by_col_stats( + builder, + self.write_settings.enable_parquet_dictionary, + ColumnsNdv(cols_ndv), + 0, + self.table_schema.as_ref(), + ); let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - let writer = ArrowWriter::try_new(buffer, self.arrow_schema, Some(props))?; + let writer = + ArrowWriter::try_new(buffer, self.arrow_schema.clone(), Some(builder.build()))?; Ok(writer) } } +pub struct InitializedArrowWriter { + inner: ArrowWriter>, +} +pub enum ArrowParquetWriter { + Uninitialized(UninitializedArrowWriter), + Initialized(InitializedArrowWriter), +} +impl ArrowParquetWriter { + fn new_uninitialized(write_settings: WriteSettings, table_schema: TableSchemaRef) -> Self { + let arrow_schema = Arc::new(table_schema.as_ref().into()); + ArrowParquetWriter::Uninitialized(UninitializedArrowWriter { + write_settings, + arrow_schema, + table_schema, + }) + } + fn write(&mut self, batch: &RecordBatch) -> Result<()> { + let Initialized(writer) = self else { + unreachable!("ArrowParquetWriter::write called before initialization"); + }; + writer.inner.write(batch)?; + Ok(()) + } + + fn finish(&mut self) -> Result { + let Initialized(writer) = self else { + unreachable!("ArrowParquetWriter::finish called before initialization"); + }; + let file_meta = writer.inner.finish()?; + Ok(file_meta) + } + + fn inner_mut(&mut self) -> &mut Vec { + let Initialized(writer) = self else { + unreachable!("ArrowParquetWriter::inner_mut called before initialization"); + }; + writer.inner.inner_mut() + } + + fn in_progress_size(&self) -> usize { + let Initialized(writer) = self else { + unreachable!("ArrowParquetWriter::in_progress_size called before initialization"); + }; + writer.inner.in_progress_size() + } +} + +struct ColumnsNdv(HashMap); +impl NdvProvider for ColumnsNdv { + fn column_ndv(&self, column_id: &ColumnId) -> Option { + self.0.get(column_id).map(|v| *v as u64) + } +} + pub enum BlockWriterImpl { - Arrow(ArrowWriter>), + Parquet(ArrowParquetWriter), // Native format doesnot support stream write. Native(NativeWriter>), } @@ -112,16 +178,26 @@ pub trait BlockWriter { impl BlockWriter for BlockWriterImpl { fn start(&mut self, cols_ndv: HashMap) -> Result<()> { match self { - BlockWriterImpl::Arrow(arrow_writer) => Ok(()), + BlockWriterImpl::Parquet(arrow_writer) => { + let ArrowParquetWriter::Uninitialized(uninitialized) = arrow_writer else { + unreachable!( + "Unexpected writer state: ArrowWriterImpl::Parquet has been initialized" + ); + }; + + let inner = uninitialized.init(cols_ndv)?; + *arrow_writer = ArrowParquetWriter::Initialized(InitializedArrowWriter { inner }); + Ok(()) + } BlockWriterImpl::Native(native_writer) => Ok(native_writer.start()?), } } fn write(&mut self, block: DataBlock, schema: &TableSchema) -> Result<()> { match self { - BlockWriterImpl::Arrow(writer) => { + BlockWriterImpl::Parquet(writer) => { let batch = block.to_record_batch(schema)?; - writer.write(&batch)?; + writer.write(&batch)? 
} BlockWriterImpl::Native(writer) => { let block = block.consume_convert_to_full(); @@ -138,7 +214,7 @@ impl BlockWriter for BlockWriterImpl { fn finish(&mut self, schema: &TableSchemaRef) -> Result> { match self { - BlockWriterImpl::Arrow(writer) => { + BlockWriterImpl::Parquet(writer) => { let file_meta = writer.finish()?; column_parquet_metas(&file_meta, schema) } @@ -158,14 +234,14 @@ impl BlockWriter for BlockWriterImpl { fn inner_mut(&mut self) -> &mut Vec { match self { - BlockWriterImpl::Arrow(writer) => writer.inner_mut(), + BlockWriterImpl::Parquet(writer) => writer.inner_mut(), BlockWriterImpl::Native(writer) => writer.inner_mut(), } } fn compressed_size(&self) -> usize { match self { - BlockWriterImpl::Arrow(writer) => writer.in_progress_size(), + BlockWriterImpl::Parquet(writer) => writer.in_progress_size(), BlockWriterImpl::Native(writer) => writer.total_size(), } } @@ -192,18 +268,21 @@ impl StreamBlockBuilder { let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); let block_writer = match properties.write_settings.storage_format { FuseStorageFormat::Parquet => { - let write_settings = &properties.write_settings; - // TODO NDV for heuristic rule - let props = parquet_writer_properties_builder( - write_settings.table_compression, - write_settings.enable_parquet_dictionary, - None, - ) - .build(); - - let arrow_schema = Arc::new(properties.source_schema.as_ref().into()); - let writer = ArrowWriter::try_new(buffer, arrow_schema, Some(props))?; - BlockWriterImpl::Arrow(writer) + // let write_settings = &properties.write_settings; + // // TODO NDV for heuristic rule + // let props = parquet_writer_properties_builder( + // write_settings.table_compression, + // write_settings.enable_parquet_dictionary, + // None, + // ) + // .build(); + // + // let arrow_schema = Arc::new(properties.source_schema.as_ref().into()); + // let writer = ArrowWriter::try_new(buffer, arrow_schema, Some(props))?; + BlockWriterImpl::Parquet(ArrowParquetWriter::new_uninitialized( + properties.write_settings.clone(), + properties.source_schema.clone(), + )) } FuseStorageFormat::Native => { let mut default_compress_ratio = Some(2.10f64); diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs index 8263dc522f885..df5304e4e5afc 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -87,9 +87,10 @@ impl ColumnStatisticsState { } pub fn peek_cols_ndv(&self) -> HashMap { - self.distinct_columns.iter().map(|(column_id, ndv_estimator)| { - (*column_id, ndv_estimator.peek()) - }).collect() + self.distinct_columns + .iter() + .map(|(column_id, ndv_estimator)| (*column_id, ndv_estimator.peek())) + .collect() } pub fn finalize( From 8fd5fc5242e5de6d5f143586e05a7965de5fa15a Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 2 Dec 2025 09:06:05 +0800 Subject: [PATCH 08/25] fix: parquet writer not inited on time --- src/query/storages/fuse/src/io/write/stream/block_builder.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index ceae47fcd60aa..439657dfaefd0 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -365,6 +365,8 @@ impl StreamBlockBuilder { return Ok(()); } + let 
had_existing_rows = self.row_count > 0; + let block = self.cluster_stats_state.add_block(block)?; self.column_stats_state .add_block(&self.properties.source_schema, &block)?; @@ -382,7 +384,7 @@ impl StreamBlockBuilder { self.row_count += block.num_rows(); self.block_size += block.estimate_block_size(); - if self.row_count == 0 { + if !had_existing_rows { let cols_ndv = self.column_stats_state.peek_cols_ndv(); self.block_writer.start(cols_ndv)?; } From 24bf10c2ddeb571506425bab5c9fa832e92c1aa1 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 2 Dec 2025 10:08:00 +0800 Subject: [PATCH 09/25] use correct num_rows to init arrow writer --- .../fuse/src/io/write/stream/block_builder.rs | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 439657dfaefd0..c4961ec29d83d 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -81,7 +81,7 @@ pub struct UninitializedArrowWriter { table_schema: TableSchemaRef, } impl UninitializedArrowWriter { - fn init(&self, cols_ndv: HashMap) -> Result>> { + fn init(&self, cols_ndv_info: ColumnsNdvInfo) -> Result>> { let write_settings = &self.write_settings; let builder = parquet_writer_properties_builder( write_settings.table_compression, @@ -89,11 +89,12 @@ impl UninitializedArrowWriter { None, ); + let num_rows = cols_ndv_info.num_rows; let builder = adjust_writer_properties_by_col_stats( builder, self.write_settings.enable_parquet_dictionary, - ColumnsNdv(cols_ndv), - 0, + cols_ndv_info, + num_rows, self.table_schema.as_ref(), ); let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); @@ -150,10 +151,20 @@ impl ArrowParquetWriter { } } -struct ColumnsNdv(HashMap); -impl NdvProvider for ColumnsNdv { +pub struct ColumnsNdvInfo { + cols_ndv: HashMap, + num_rows: usize, +} + +impl ColumnsNdvInfo { + fn new(num_rows: usize, cols_ndv: HashMap) -> Self { + Self { cols_ndv, num_rows } + } +} + +impl NdvProvider for ColumnsNdvInfo { fn column_ndv(&self, column_id: &ColumnId) -> Option { - self.0.get(column_id).map(|v| *v as u64) + self.cols_ndv.get(column_id).map(|v| *v as u64) } } @@ -164,7 +175,7 @@ pub enum BlockWriterImpl { } pub trait BlockWriter { - fn start(&mut self, cols_ndv: HashMap) -> Result<()>; + fn start(&mut self, cols_ndv: ColumnsNdvInfo) -> Result<()>; fn write(&mut self, block: DataBlock, schema: &TableSchema) -> Result<()>; @@ -176,7 +187,7 @@ pub trait BlockWriter { } impl BlockWriter for BlockWriterImpl { - fn start(&mut self, cols_ndv: HashMap) -> Result<()> { + fn start(&mut self, cols_ndv_info: ColumnsNdvInfo) -> Result<()> { match self { BlockWriterImpl::Parquet(arrow_writer) => { let ArrowParquetWriter::Uninitialized(uninitialized) = arrow_writer else { @@ -185,7 +196,7 @@ impl BlockWriter for BlockWriterImpl { ); }; - let inner = uninitialized.init(cols_ndv)?; + let inner = uninitialized.init(cols_ndv_info)?; *arrow_writer = ArrowParquetWriter::Initialized(InitializedArrowWriter { inner }); Ok(()) } @@ -386,7 +397,8 @@ impl StreamBlockBuilder { if !had_existing_rows { let cols_ndv = self.column_stats_state.peek_cols_ndv(); - self.block_writer.start(cols_ndv)?; + self.block_writer + .start(ColumnsNdvInfo::new(block.num_rows(), cols_ndv))?; } self.block_writer From 51d2f9d550463b3ae16cb030fc825580ecc325a3 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 2 Dec 2025 11:54:04 +0800 
Subject: [PATCH 10/25] combine ndv stats before init writer --- src/query/storages/common/blocks/src/parquet_rs.rs | 12 ++++-------- .../fuse/src/io/write/block_statistics_writer.rs | 11 +++++++++-- .../fuse/src/io/write/stream/block_builder.rs | 4 +++- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index a204e7ec237b0..fbb1019714ba0 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -122,7 +122,7 @@ impl NdvProvider for &StatisticsOfColumns { } pub fn adjust_writer_properties_by_col_stats( - builder: WriterPropertiesBuilder, + mut builder: WriterPropertiesBuilder, enable_dictionary: bool, cols_stats: impl NdvProvider, num_rows: usize, @@ -131,16 +131,12 @@ pub fn adjust_writer_properties_by_col_stats( if !enable_dictionary { return builder; }; - - let mut builder = builder.set_dictionary_enabled(false); - for field in table_schema.fields().iter() { let col_id = field.column_id(); if let Some(ndv) = cols_stats.column_ndv(&col_id) { - if (ndv as f64 / num_rows as f64) < 0.1 { - let name = field.name().as_str(); - builder = builder.set_column_dictionary_enabled(ColumnPath::from(name), true); - } + let enable_dictionary = (ndv as f64 / num_rows as f64) < 0.1; + let name = field.name().as_str(); + builder = builder.set_column_dictionary_enabled(ColumnPath::from(name), enable_dictionary); } } diff --git a/src/query/storages/fuse/src/io/write/block_statistics_writer.rs b/src/query/storages/fuse/src/io/write/block_statistics_writer.rs index 3257863962f40..cee078706f765 100644 --- a/src/query/storages/fuse/src/io/write/block_statistics_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_statistics_writer.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use databend_common_exception::Result; -use databend_common_expression::BlockEntry; +use databend_common_expression::{BlockEntry, ColumnId}; use databend_common_expression::DataBlock; use databend_common_expression::FieldIndex; use databend_common_expression::TableField; @@ -42,7 +42,7 @@ pub struct BlockStatsBuilder { pub struct ColumnNDVBuilder { index: FieldIndex, field: TableField, - builder: ColumnNDVEstimator, + pub builder: ColumnNDVEstimator, } impl BlockStatsBuilder { @@ -85,6 +85,13 @@ impl BlockStatsBuilder { Ok(()) } + pub fn peek_cols_ndv(&self) -> HashMap { + self.builders + .iter() + .map(|item| (item.field.column_id(), item.builder.peek())) + .collect() + } + pub fn finalize(self) -> Result> { if self.builders.is_empty() { return Ok(None); diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index c4961ec29d83d..1ca4fb97fcbea 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -396,7 +396,9 @@ impl StreamBlockBuilder { self.block_size += block.estimate_block_size(); if !had_existing_rows { - let cols_ndv = self.column_stats_state.peek_cols_ndv(); + // Initialize the writer with columns ndv + let mut cols_ndv = self.column_stats_state.peek_cols_ndv(); + cols_ndv.extend(self.block_stats_builder.peek_cols_ndv()); self.block_writer .start(ColumnsNdvInfo::new(block.num_rows(), cols_ndv))?; } From 47ac662fc53358239513af5ca4f261a0ac7a18a0 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 2 Dec 2025 15:19:27 +0800 Subject: [PATCH 
11/25] minor refactor --- .../storages/common/blocks/src/parquet_rs.rs | 67 ++++++++++++++----- .../src/io/write/block_statistics_writer.rs | 3 +- .../fuse/src/io/write/stream/block_builder.rs | 18 ++--- 3 files changed, 60 insertions(+), 28 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index fbb1019714ba0..53fb4960b99f9 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -60,21 +60,16 @@ pub fn blocks_to_parquet_with_stats( column_stats: Option<&StatisticsOfColumns>, ) -> Result { assert!(!blocks.is_empty()); - let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata); - let builder = if let Some(stats) = column_stats { - let num_rows = blocks[0].num_rows(); - adjust_writer_properties_by_col_stats( - builder, - enable_dictionary, - stats, - num_rows, - table_schema, - ) - } else { - builder - }; + let num_rows = blocks[0].num_rows(); + let props = build_parquet_writer_properties( + compression, + enable_dictionary, + column_stats, + metadata, + num_rows, + table_schema, + ); - let props = builder.build(); let batches = blocks .into_iter() .map(|block| block.to_record_batch(table_schema)) @@ -110,6 +105,47 @@ pub fn parquet_writer_properties_builder( builder.set_dictionary_enabled(false) } } +pub fn build_parquet_writer_properties( + compression: TableCompression, + enable_dictionary: bool, + cols_stats: Option, + metadata: Option>, + num_rows: usize, + table_schema: &TableSchema, +) -> WriterProperties { + let mut builder = WriterProperties::builder() + .set_compression(compression.into()) + // use `usize::MAX` to effectively limit the number of row groups to 1 + .set_max_row_group_size(usize::MAX) + .set_encoding(Encoding::PLAIN) + .set_statistics_enabled(EnabledStatistics::None) + .set_bloom_filter_enabled(false) + .set_key_value_metadata(metadata); + + if enable_dictionary { + // Enable dictionary for all columns + builder = builder + .set_writer_version(WriterVersion::PARQUET_2_0) + .set_dictionary_enabled(true); + if let Some(cols_stats) = cols_stats { + // Disable dictionary of columns that hava high cardinality + for field in table_schema.fields().iter() { + let col_id = field.column_id(); + if let Some(ndv) = cols_stats.column_ndv(&col_id) { + let high_cardinality = (ndv as f64 / num_rows as f64) > 0.1; + if high_cardinality { + let name = field.name().as_str(); + builder = + builder.set_column_dictionary_enabled(ColumnPath::from(name), false); + } + } + } + } + builder.build() + } else { + builder.set_dictionary_enabled(false).build() + } +} pub trait NdvProvider { fn column_ndv(&self, column_id: &ColumnId) -> Option; @@ -136,7 +172,8 @@ pub fn adjust_writer_properties_by_col_stats( if let Some(ndv) = cols_stats.column_ndv(&col_id) { let enable_dictionary = (ndv as f64 / num_rows as f64) < 0.1; let name = field.name().as_str(); - builder = builder.set_column_dictionary_enabled(ColumnPath::from(name), enable_dictionary); + builder = + builder.set_column_dictionary_enabled(ColumnPath::from(name), enable_dictionary); } } diff --git a/src/query/storages/fuse/src/io/write/block_statistics_writer.rs b/src/query/storages/fuse/src/io/write/block_statistics_writer.rs index cee078706f765..eb1b68dc2cef0 100644 --- a/src/query/storages/fuse/src/io/write/block_statistics_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_statistics_writer.rs @@ -16,7 +16,8 @@ use std::collections::BTreeMap; use 
std::collections::HashMap; use databend_common_exception::Result; -use databend_common_expression::{BlockEntry, ColumnId}; +use databend_common_expression::BlockEntry; +use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; use databend_common_expression::FieldIndex; use databend_common_expression::TableField; diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 1ca4fb97fcbea..379dc39f685b9 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -40,8 +40,7 @@ use databend_common_meta_app::schema::TableIndex; use databend_common_native::write::NativeWriter; use databend_common_native::write::WriteOptions; use databend_common_sql::executor::physical_plans::MutationKind; -use databend_storages_common_blocks::adjust_writer_properties_by_col_stats; -use databend_storages_common_blocks::parquet_writer_properties_builder; +use databend_storages_common_blocks::build_parquet_writer_properties; use databend_storages_common_blocks::NdvProvider; use databend_storages_common_index::BloomIndex; use databend_storages_common_index::BloomIndexBuilder; @@ -83,23 +82,19 @@ pub struct UninitializedArrowWriter { impl UninitializedArrowWriter { fn init(&self, cols_ndv_info: ColumnsNdvInfo) -> Result>> { let write_settings = &self.write_settings; - let builder = parquet_writer_properties_builder( + let num_rows = cols_ndv_info.num_rows; + + let writer_properties = build_parquet_writer_properties( write_settings.table_compression, write_settings.enable_parquet_dictionary, + Some(cols_ndv_info), None, - ); - - let num_rows = cols_ndv_info.num_rows; - let builder = adjust_writer_properties_by_col_stats( - builder, - self.write_settings.enable_parquet_dictionary, - cols_ndv_info, num_rows, self.table_schema.as_ref(), ); let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); let writer = - ArrowWriter::try_new(buffer, self.arrow_schema.clone(), Some(builder.build()))?; + ArrowWriter::try_new(buffer, self.arrow_schema.clone(), Some(writer_properties))?; Ok(writer) } } @@ -161,7 +156,6 @@ impl ColumnsNdvInfo { Self { cols_ndv, num_rows } } } - impl NdvProvider for ColumnsNdvInfo { fn column_ndv(&self, column_id: &ColumnId) -> Option { self.cols_ndv.get(column_id).map(|v| *v as u64) From e7e5f10d290f9802de77c7098d0ed5c362eedc42 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 2 Dec 2025 17:13:42 +0800 Subject: [PATCH 12/25] refine --- .../storages/common/blocks/src/parquet_rs.rs | 34 ++++--------------- 1 file changed, 6 insertions(+), 28 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 53fb4960b99f9..bae3455cb12a9 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -128,15 +128,16 @@ pub fn build_parquet_writer_properties( .set_writer_version(WriterVersion::PARQUET_2_0) .set_dictionary_enabled(true); if let Some(cols_stats) = cols_stats { - // Disable dictionary of columns that hava high cardinality - for field in table_schema.fields().iter() { + // Disable dictionary of columns that have high cardinality + for field in table_schema.leaf_fields() { let col_id = field.column_id(); if let Some(ndv) = cols_stats.column_ndv(&col_id) { let high_cardinality = (ndv as f64 / num_rows as f64) > 0.1; if high_cardinality { - let name = field.name().as_str(); - 
builder = - builder.set_column_dictionary_enabled(ColumnPath::from(name), false); + builder = builder.set_column_dictionary_enabled( + ColumnPath::from(field.name().as_str()), + false, + ); } } } @@ -156,26 +157,3 @@ impl NdvProvider for &StatisticsOfColumns { self.get(column_id).and_then(|item| item.distinct_of_values) } } - -pub fn adjust_writer_properties_by_col_stats( - mut builder: WriterPropertiesBuilder, - enable_dictionary: bool, - cols_stats: impl NdvProvider, - num_rows: usize, - table_schema: &TableSchema, -) -> WriterPropertiesBuilder { - if !enable_dictionary { - return builder; - }; - for field in table_schema.fields().iter() { - let col_id = field.column_id(); - if let Some(ndv) = cols_stats.column_ndv(&col_id) { - let enable_dictionary = (ndv as f64 / num_rows as f64) < 0.1; - let name = field.name().as_str(); - builder = - builder.set_column_dictionary_enabled(ColumnPath::from(name), enable_dictionary); - } - } - - builder -} From 1c9de50c47cf3b2ca8315f1b22f258a2a0b5cef5 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 2 Dec 2025 18:05:06 +0800 Subject: [PATCH 13/25] add unit tests --- .../storages/common/blocks/src/parquet_rs.rs | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index bae3455cb12a9..7fe1abf9b14fa 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -157,3 +157,104 @@ impl NdvProvider for &StatisticsOfColumns { self.get(column_id).and_then(|item| item.distinct_of_values) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use databend_common_expression::types::number::NumberDataType; + use databend_common_expression::TableDataType; + use databend_common_expression::TableField; + + use super::*; + + struct TestNdvProvider { + ndv: HashMap, + } + + impl NdvProvider for TestNdvProvider { + fn column_ndv(&self, column_id: &ColumnId) -> Option { + self.ndv.get(column_id).copied() + } + } + + fn sample_schema() -> TableSchema { + TableSchema::new(vec![ + TableField::new("simple", TableDataType::Number(NumberDataType::Int32)), + TableField::new("nested", TableDataType::Tuple { + fields_name: vec!["leaf".to_string(), "arr".to_string()], + fields_type: vec![ + TableDataType::Number(NumberDataType::Int64), + TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::UInt64))), + ], + }), + TableField::new("no_stats", TableDataType::String), + ]) + } + + fn column_id(schema: &TableSchema, name: &str) -> ColumnId { + schema + .leaf_fields() + .into_iter() + .find(|field| field.name() == name) + .unwrap_or_else(|| panic!("missing field {}", name)) + .column_id() + } + + #[test] + fn test_build_parquet_writer_properties_handles_nested_leaves() { + let schema = sample_schema(); + + let mut ndv = HashMap::new(); + ndv.insert(column_id(&schema, "simple"), 500); + ndv.insert(column_id(&schema, "nested:leaf"), 50); + ndv.insert(column_id(&schema, "nested:arr:0"), 400); + + let props = build_parquet_writer_properties( + TableCompression::Zstd, + true, + Some(TestNdvProvider { ndv }), + None, + 1000, + &schema, + ); + + assert!( + !props.dictionary_enabled(&ColumnPath::from("simple")), + "high cardinality top-level column should disable dictionary" + ); + assert!( + props.dictionary_enabled(&ColumnPath::from("nested:leaf")), + "low cardinality nested column should keep dictionary" + ); + assert!( + 
!props.dictionary_enabled(&ColumnPath::from("nested:arr:0")), + "high cardinality nested array element should disable dictionary" + ); + assert!( + props.dictionary_enabled(&ColumnPath::from("no_stats")), + "columns without NDV stats keep the default dictionary behavior" + ); + } + + #[test] + fn test_build_parquet_writer_properties_disabled_globally() { + let schema = sample_schema(); + + let props = build_parquet_writer_properties( + TableCompression::Zstd, + false, + None::, + None, + 1000, + &schema, + ); + + for field in schema.leaf_fields() { + assert!( + !props.dictionary_enabled(&ColumnPath::from(field.name().as_str())), + "dictionary must remain disabled when enable_dictionary is false", + ); + } + } +} From 5e99562589ae04019f2c73e06a35be61d66b0ebb Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 3 Dec 2025 20:43:47 +0800 Subject: [PATCH 14/25] fix ArrowParquetWriter::in_progress_size Returns zero if the writer is not initialized, rather than panicking. --- .../storages/fuse/src/io/write/stream/block_builder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 379dc39f685b9..8fc20418b970d 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -139,10 +139,10 @@ impl ArrowParquetWriter { } fn in_progress_size(&self) -> usize { - let Initialized(writer) = self else { - unreachable!("ArrowParquetWriter::in_progress_size called before initialization"); - }; - writer.inner.in_progress_size() + match self { + ArrowParquetWriter::Uninitialized(_) => 0, + Initialized(writer) => writer.inner.in_progress_size(), + } } } From 1f634f9052786db7a20a35b5423df431f2e5ced6 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 3 Dec 2025 22:21:33 +0800 Subject: [PATCH 15/25] remove unnecessary schema conversions --- src/query/expression/src/converts/arrow/to.rs | 30 ++++++++++++++++++- .../storages/common/blocks/src/parquet_rs.rs | 5 ++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index 73437acce117c..4d8ec2d35cef6 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -264,6 +264,30 @@ impl DataBlock { self.to_record_batch(&table_schema) } + pub fn to_record_batch_with_arrow_schema( + self, + arrow_schema: &Arc, + ) -> Result { + let num_fields = arrow_schema.fields.len(); + if self.columns().len() != num_fields { + return Err(ErrorCode::Internal(format!( + "The number of columns in the data block does not match the number of fields in the table schema, block_columns: {}, table_schema_fields: {}", + self.columns().len(), + num_fields, + ))); + } + + if num_fields == 0 { + return Ok(RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &RecordBatchOptions::default().with_row_count(Some(self.num_rows())), + )?); + } + + self.build_record_batch(arrow_schema.clone()) + } + pub fn to_record_batch(self, table_schema: &TableSchema) -> Result { if self.columns().len() != table_schema.num_fields() { return Err(ErrorCode::Internal(format!( @@ -282,6 +306,10 @@ impl DataBlock { } let arrow_schema = Schema::from(table_schema); + self.build_record_batch(Arc::new(arrow_schema)) + } + + fn build_record_batch(self, arrow_schema: Arc) -> Result { let mut arrays = 
Vec::with_capacity(self.columns().len()); for (entry, arrow_field) in self.take_columns().into_iter().zip(arrow_schema.fields()) { let array = entry.to_column().maybe_gc().into_arrow_rs(); @@ -289,7 +317,7 @@ impl DataBlock { // Adjust struct array names arrays.push(Self::adjust_nested_array(array, arrow_field.as_ref())); } - Ok(RecordBatch::try_new(Arc::new(arrow_schema), arrays)?) + Ok(RecordBatch::try_new(arrow_schema, arrays)?) } fn adjust_nested_array(array: Arc, arrow_field: &Field) -> Arc { diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 7fe1abf9b14fa..1ca2561b8da07 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -70,11 +70,12 @@ pub fn blocks_to_parquet_with_stats( table_schema, ); + let arrow_schema = Arc::new(table_schema.into()); let batches = blocks .into_iter() - .map(|block| block.to_record_batch(table_schema)) + .map(|block| block.to_record_batch_with_arrow_schema(&arrow_schema)) .collect::>>()?; - let arrow_schema = Arc::new(table_schema.into()); + let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?; for batch in batches { writer.write(&batch)?; From 807537638758a1e5cfdba30c9ad2c4dfa472bb10 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 4 Dec 2025 13:33:50 +0800 Subject: [PATCH 16/25] cleanup --- src/query/expression/src/converts/arrow/to.rs | 1 - .../storages/common/blocks/src/parquet_rs.rs | 27 +++---------------- .../fuse/src/io/write/stream/block_builder.rs | 11 -------- 3 files changed, 3 insertions(+), 36 deletions(-) diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index 4d8ec2d35cef6..ac2af27742693 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -93,7 +93,6 @@ impl From<&DataType> for ArrowDataType { impl From<&TableField> for Field { fn from(f: &TableField) -> Self { let mut metadata = HashMap::new(); - let ty = match &f.data_type { TableDataType::Null => ArrowDataType::Null, TableDataType::EmptyArray => { diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 1ca2561b8da07..c733bec7362d8 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -25,7 +25,6 @@ use parquet::basic::Encoding; use parquet::file::metadata::KeyValue; use parquet::file::properties::EnabledStatistics; use parquet::file::properties::WriterProperties; -use parquet::file::properties::WriterPropertiesBuilder; use parquet::file::properties::WriterVersion; use parquet::format::FileMetaData; use parquet::schema::types::ColumnPath; @@ -60,7 +59,10 @@ pub fn blocks_to_parquet_with_stats( column_stats: Option<&StatisticsOfColumns>, ) -> Result { assert!(!blocks.is_empty()); + let num_rows = blocks[0].num_rows(); + let arrow_schema = Arc::new(table_schema.into()); + let props = build_parquet_writer_properties( compression, enable_dictionary, @@ -70,7 +72,6 @@ pub fn blocks_to_parquet_with_stats( table_schema, ); - let arrow_schema = Arc::new(table_schema.into()); let batches = blocks .into_iter() .map(|block| block.to_record_batch_with_arrow_schema(&arrow_schema)) @@ -84,28 +85,6 @@ pub fn blocks_to_parquet_with_stats( Ok(file_meta) } -pub fn parquet_writer_properties_builder( - compression: TableCompression, - enable_dictionary: bool, - metadata: Option>, -) -> 
WriterPropertiesBuilder { - let builder = WriterProperties::builder() - .set_compression(compression.into()) - // use `usize::MAX` to effectively limit the number of row groups to 1 - .set_max_row_group_size(usize::MAX) - .set_encoding(Encoding::PLAIN) - .set_statistics_enabled(EnabledStatistics::None) - .set_bloom_filter_enabled(false) - .set_key_value_metadata(metadata); - - if enable_dictionary { - builder - .set_writer_version(WriterVersion::PARQUET_2_0) - .set_dictionary_enabled(true) - } else { - builder.set_dictionary_enabled(false) - } -} pub fn build_parquet_writer_properties( compression: TableCompression, enable_dictionary: bool, diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 8fc20418b970d..d798b8768ae48 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -273,17 +273,6 @@ impl StreamBlockBuilder { let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); let block_writer = match properties.write_settings.storage_format { FuseStorageFormat::Parquet => { - // let write_settings = &properties.write_settings; - // // TODO NDV for heuristic rule - // let props = parquet_writer_properties_builder( - // write_settings.table_compression, - // write_settings.enable_parquet_dictionary, - // None, - // ) - // .build(); - // - // let arrow_schema = Arc::new(properties.source_schema.as_ref().into()); - // let writer = ArrowWriter::try_new(buffer, arrow_schema, Some(props))?; BlockWriterImpl::Parquet(ArrowParquetWriter::new_uninitialized( properties.write_settings.clone(), properties.source_schema.clone(), From b1c1e3771ce26461bc1c7ea835ef9b3705b79d87 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 4 Dec 2025 14:15:54 +0800 Subject: [PATCH 17/25] fix ColumnPath --- .../storages/common/blocks/src/parquet_rs.rs | 79 ++++++++++++++++--- 1 file changed, 67 insertions(+), 12 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index c733bec7362d8..26033d37e72e0 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; +use databend_common_expression::TableDataType; use databend_common_expression::TableSchema; +use databend_common_expression::is_internal_column_id; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use parquet::arrow::ArrowWriter; @@ -109,15 +111,11 @@ pub fn build_parquet_writer_properties( .set_dictionary_enabled(true); if let Some(cols_stats) = cols_stats { // Disable dictionary of columns that have high cardinality - for field in table_schema.leaf_fields() { - let col_id = field.column_id(); - if let Some(ndv) = cols_stats.column_ndv(&col_id) { + for (column_id, column_path) in leaf_column_paths(table_schema) { + if let Some(ndv) = cols_stats.column_ndv(&column_id) { let high_cardinality = (ndv as f64 / num_rows as f64) > 0.1; if high_cardinality { - builder = builder.set_column_dictionary_enabled( - ColumnPath::from(field.name().as_str()), - false, - ); + builder = builder.set_column_dictionary_enabled(column_path, false); } } } @@ -128,6 +126,57 @@ pub fn build_parquet_writer_properties( } } +fn 
leaf_column_paths(table_schema: &TableSchema) -> Vec<(ColumnId, ColumnPath)> { + let mut paths = Vec::new(); + for field in table_schema.fields() { + if is_internal_column_id(field.column_id()) { + continue; + } + + let mut next_column_id = field.column_id(); + let mut current_path = vec![field.name().clone()]; + collect_leaf_column_paths(field.data_type(), &mut current_path, &mut next_column_id, &mut paths); + } + paths +} + +fn collect_leaf_column_paths( + data_type: &TableDataType, + current_path: &mut Vec, + next_column_id: &mut ColumnId, + paths: &mut Vec<(ColumnId, ColumnPath)>, +) { + match data_type { + TableDataType::Nullable(inner) => { + collect_leaf_column_paths(inner.as_ref(), current_path, next_column_id, paths); + } + TableDataType::Tuple { + fields_name, + fields_type, + } => { + for (name, ty) in fields_name.iter().zip(fields_type) { + current_path.push(name.clone()); + collect_leaf_column_paths(ty, current_path, next_column_id, paths); + current_path.pop(); + } + } + TableDataType::Array(inner) => { + current_path.push("_array".to_string()); + collect_leaf_column_paths(inner.as_ref(), current_path, next_column_id, paths); + current_path.pop(); + } + TableDataType::Map(inner) => { + current_path.push("entries".to_string()); + collect_leaf_column_paths(inner.as_ref(), current_path, next_column_id, paths); + current_path.pop(); + } + _ => { + paths.push((*next_column_id, ColumnPath::from(current_path.clone()))); + *next_column_id += 1; + } + } +} + pub trait NdvProvider { fn column_ndv(&self, column_id: &ColumnId) -> Option; } @@ -190,6 +239,9 @@ mod tests { ndv.insert(column_id(&schema, "nested:leaf"), 50); ndv.insert(column_id(&schema, "nested:arr:0"), 400); + let column_paths: HashMap = + leaf_column_paths(&schema).into_iter().collect(); + let props = build_parquet_writer_properties( TableCompression::Zstd, true, @@ -200,19 +252,19 @@ mod tests { ); assert!( - !props.dictionary_enabled(&ColumnPath::from("simple")), + !props.dictionary_enabled(&column_paths[&column_id(&schema, "simple")]), "high cardinality top-level column should disable dictionary" ); assert!( - props.dictionary_enabled(&ColumnPath::from("nested:leaf")), + props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:leaf")]), "low cardinality nested column should keep dictionary" ); assert!( - !props.dictionary_enabled(&ColumnPath::from("nested:arr:0")), + !props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:arr:0")]), "high cardinality nested array element should disable dictionary" ); assert!( - props.dictionary_enabled(&ColumnPath::from("no_stats")), + props.dictionary_enabled(&column_paths[&column_id(&schema, "no_stats")]), "columns without NDV stats keep the default dictionary behavior" ); } @@ -221,6 +273,9 @@ mod tests { fn test_build_parquet_writer_properties_disabled_globally() { let schema = sample_schema(); + let column_paths: HashMap = + leaf_column_paths(&schema).into_iter().collect(); + let props = build_parquet_writer_properties( TableCompression::Zstd, false, @@ -232,7 +287,7 @@ mod tests { for field in schema.leaf_fields() { assert!( - !props.dictionary_enabled(&ColumnPath::from(field.name().as_str())), + !props.dictionary_enabled(&column_paths[&field.column_id()]), "dictionary must remain disabled when enable_dictionary is false", ); } From 9482a339d1bc44c3c8543fb8dd4db51c1f38bdd7 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 4 Dec 2025 16:17:26 +0800 Subject: [PATCH 18/25] refactor column path extraction --- Cargo.lock | 1 + 
src/query/storages/common/blocks/Cargo.toml | 1 + .../storages/common/blocks/src/parquet_rs.rs | 85 +++++++++++++++---- 3 files changed, 72 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 758356029b890..33fc1cf4e91f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5685,6 +5685,7 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ + "arrow-schema 56.2.0", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index 931136b27b318..de4c15f3ee7e2 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -7,6 +7,7 @@ publish = { workspace = true } edition = { workspace = true } [dependencies] +arrow-schema = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-storages-common-table-meta = { workspace = true } diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 26033d37e72e0..22bdedea01578 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -14,12 +14,14 @@ use std::sync::Arc; +use arrow_schema::DataType as ArrowDataType; +use arrow_schema::Field as ArrowField; use databend_common_exception::Result; +use databend_common_expression::is_internal_column_id; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; use databend_common_expression::TableDataType; use databend_common_expression::TableSchema; -use databend_common_expression::is_internal_column_id; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use parquet::arrow::ArrowWriter; @@ -133,41 +135,94 @@ fn leaf_column_paths(table_schema: &TableSchema) -> Vec<(ColumnId, ColumnPath)> continue; } + let arrow_field = ArrowField::from(field); let mut next_column_id = field.column_id(); - let mut current_path = vec![field.name().clone()]; - collect_leaf_column_paths(field.data_type(), &mut current_path, &mut next_column_id, &mut paths); + let mut current_path = vec![arrow_field.name().clone()]; + collect_leaf_column_paths( + field.data_type(), + &arrow_field, + &mut current_path, + &mut next_column_id, + &mut paths, + ); } paths } fn collect_leaf_column_paths( data_type: &TableDataType, + arrow_field: &ArrowField, current_path: &mut Vec, next_column_id: &mut ColumnId, paths: &mut Vec<(ColumnId, ColumnPath)>, ) { match data_type { TableDataType::Nullable(inner) => { - collect_leaf_column_paths(inner.as_ref(), current_path, next_column_id, paths); + collect_leaf_column_paths( + inner.as_ref(), + arrow_field, + current_path, + next_column_id, + paths, + ); } - TableDataType::Tuple { - fields_name, - fields_type, - } => { - for (name, ty) in fields_name.iter().zip(fields_type) { - current_path.push(name.clone()); - collect_leaf_column_paths(ty, current_path, next_column_id, paths); + TableDataType::Tuple { fields_type, .. 
} => { + let children = match arrow_field.data_type() { + ArrowDataType::Struct(fields) => fields, + other => { + debug_assert!(false, "unexpected Arrow type for tuple: {:?}", other); + return; + } + }; + debug_assert_eq!(children.len(), fields_type.len()); + for (child_field, ty) in children.iter().zip(fields_type) { + current_path.push(child_field.name().clone()); + collect_leaf_column_paths( + ty, + child_field.as_ref(), + current_path, + next_column_id, + paths, + ); current_path.pop(); } } TableDataType::Array(inner) => { - current_path.push("_array".to_string()); - collect_leaf_column_paths(inner.as_ref(), current_path, next_column_id, paths); + let child_field = match arrow_field.data_type() { + ArrowDataType::LargeList(child) + | ArrowDataType::List(child) + | ArrowDataType::FixedSizeList(child, _) => child, + other => { + debug_assert!(false, "unexpected Arrow type for array: {:?}", other); + return; + } + }; + current_path.push(child_field.name().clone()); + collect_leaf_column_paths( + inner.as_ref(), + child_field.as_ref(), + current_path, + next_column_id, + paths, + ); current_path.pop(); } TableDataType::Map(inner) => { - current_path.push("entries".to_string()); - collect_leaf_column_paths(inner.as_ref(), current_path, next_column_id, paths); + let entry_field = match arrow_field.data_type() { + ArrowDataType::Map(child, _) => child, + other => { + debug_assert!(false, "unexpected Arrow type for map: {:?}", other); + return; + } + }; + current_path.push(entry_field.name().clone()); + collect_leaf_column_paths( + inner.as_ref(), + entry_field.as_ref(), + current_path, + next_column_id, + paths, + ); current_path.pop(); } _ => { From 2ebafa11404783d1bf4da6d9dfb698b701ce71e4 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 4 Dec 2025 17:59:26 +0800 Subject: [PATCH 19/25] refine collect_leaf_column_paths --- .../storages/common/blocks/src/parquet_rs.rs | 108 +++++------------- 1 file changed, 27 insertions(+), 81 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 22bdedea01578..b63bccb6e3309 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -20,7 +20,6 @@ use databend_common_exception::Result; use databend_common_expression::is_internal_column_id; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; -use databend_common_expression::TableDataType; use databend_common_expression::TableSchema; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; @@ -129,106 +128,53 @@ pub fn build_parquet_writer_properties( } fn leaf_column_paths(table_schema: &TableSchema) -> Vec<(ColumnId, ColumnPath)> { - let mut paths = Vec::new(); + let mut arrow_paths = Vec::new(); for field in table_schema.fields() { if is_internal_column_id(field.column_id()) { continue; } let arrow_field = ArrowField::from(field); - let mut next_column_id = field.column_id(); let mut current_path = vec![arrow_field.name().clone()]; - collect_leaf_column_paths( - field.data_type(), - &arrow_field, - &mut current_path, - &mut next_column_id, - &mut paths, - ); + collect_arrow_leaf_paths(&arrow_field, &mut current_path, &mut arrow_paths); } - paths + + let leaf_fields = table_schema.leaf_fields(); + debug_assert_eq!(leaf_fields.len(), arrow_paths.len()); + + leaf_fields + .into_iter() + .zip(arrow_paths.into_iter()) + .map(|(field, path)| 
(field.column_id(), ColumnPath::from(path))) + .collect() } -fn collect_leaf_column_paths( - data_type: &TableDataType, +fn collect_arrow_leaf_paths( arrow_field: &ArrowField, current_path: &mut Vec, - next_column_id: &mut ColumnId, - paths: &mut Vec<(ColumnId, ColumnPath)>, + paths: &mut Vec>, ) { - match data_type { - TableDataType::Nullable(inner) => { - collect_leaf_column_paths( - inner.as_ref(), - arrow_field, - current_path, - next_column_id, - paths, - ); - } - TableDataType::Tuple { fields_type, .. } => { - let children = match arrow_field.data_type() { - ArrowDataType::Struct(fields) => fields, - other => { - debug_assert!(false, "unexpected Arrow type for tuple: {:?}", other); - return; - } - }; - debug_assert_eq!(children.len(), fields_type.len()); - for (child_field, ty) in children.iter().zip(fields_type) { - current_path.push(child_field.name().clone()); - collect_leaf_column_paths( - ty, - child_field.as_ref(), - current_path, - next_column_id, - paths, - ); + match arrow_field.data_type() { + ArrowDataType::Struct(children) => { + for child in children { + current_path.push(child.name().clone()); + collect_arrow_leaf_paths(child.as_ref(), current_path, paths); current_path.pop(); } } - TableDataType::Array(inner) => { - let child_field = match arrow_field.data_type() { - ArrowDataType::LargeList(child) - | ArrowDataType::List(child) - | ArrowDataType::FixedSizeList(child, _) => child, - other => { - debug_assert!(false, "unexpected Arrow type for array: {:?}", other); - return; - } - }; - current_path.push(child_field.name().clone()); - collect_leaf_column_paths( - inner.as_ref(), - child_field.as_ref(), - current_path, - next_column_id, - paths, - ); + ArrowDataType::Map(child, _) => { + current_path.push(child.name().clone()); + collect_arrow_leaf_paths(child.as_ref(), current_path, paths); current_path.pop(); } - TableDataType::Map(inner) => { - let entry_field = match arrow_field.data_type() { - ArrowDataType::Map(child, _) => child, - other => { - debug_assert!(false, "unexpected Arrow type for map: {:?}", other); - return; - } - }; - current_path.push(entry_field.name().clone()); - collect_leaf_column_paths( - inner.as_ref(), - entry_field.as_ref(), - current_path, - next_column_id, - paths, - ); + ArrowDataType::LargeList(child) + | ArrowDataType::List(child) + | ArrowDataType::FixedSizeList(child, _) => { + current_path.push(child.name().clone()); + collect_arrow_leaf_paths(child.as_ref(), current_path, paths); current_path.pop(); } - _ => { - paths.push((*next_column_id, ColumnPath::from(current_path.clone()))); - *next_column_id += 1; - } + _ => paths.push(current_path.clone()), } } From 1470f3fa340ed6619e0a3ea67b25c1e44549b77c Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 4 Dec 2025 18:17:06 +0800 Subject: [PATCH 20/25] minor refactor --- Cargo.lock | 1 - .../expression/src/converts/arrow/mod.rs | 1 + src/query/expression/src/converts/arrow/to.rs | 53 ++++++++++++++ src/query/storages/common/blocks/Cargo.toml | 1 - .../storages/common/blocks/src/parquet_rs.rs | 72 ++++--------------- 5 files changed, 66 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33fc1cf4e91f3..758356029b890 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5685,7 +5685,6 @@ dependencies = [ name = "databend-storages-common-blocks" version = "0.1.0" dependencies = [ - "arrow-schema 56.2.0", "databend-common-exception", "databend-common-expression", "databend-storages-common-table-meta", diff --git a/src/query/expression/src/converts/arrow/mod.rs 
b/src/query/expression/src/converts/arrow/mod.rs index f1429cac4d7c2..c5678ffaa8252 100644 --- a/src/query/expression/src/converts/arrow/mod.rs +++ b/src/query/expression/src/converts/arrow/mod.rs @@ -14,6 +14,7 @@ mod from; mod to; +pub use self::to::table_schema_arrow_leaf_paths; pub const EXTENSION_KEY: &str = "Extension"; pub const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray"; diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index ac2af27742693..42ba82fc8c166 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -41,6 +41,7 @@ use super::ARROW_EXT_TYPE_VARIANT; use super::ARROW_EXT_TYPE_VECTOR; use super::EXTENSION_KEY; use crate::infer_table_schema; +use crate::schema::is_internal_column_id; use crate::types::DataType; use crate::types::DecimalColumn; use crate::types::DecimalDataType; @@ -50,6 +51,7 @@ use crate::types::VectorColumn; use crate::types::VectorDataType; use crate::with_number_type; use crate::Column; +use crate::ColumnId; use crate::DataBlock; use crate::DataField; use crate::DataSchema; @@ -82,6 +84,57 @@ impl From<&TableSchema> for Schema { } } +pub fn table_schema_arrow_leaf_paths(table_schema: &TableSchema) -> Vec<(ColumnId, Vec)> { + let mut arrow_paths = Vec::new(); + for field in table_schema.fields() { + if is_internal_column_id(field.column_id()) { + continue; + } + + let arrow_field = Field::from(field); + let mut current_path = vec![arrow_field.name().clone()]; + collect_arrow_leaf_paths(&arrow_field, &mut current_path, &mut arrow_paths); + } + + let leaf_fields = table_schema.leaf_fields(); + debug_assert_eq!(leaf_fields.len(), arrow_paths.len()); + + leaf_fields + .into_iter() + .zip(arrow_paths.into_iter()) + .map(|(field, path)| (field.column_id(), path)) + .collect() +} + +fn collect_arrow_leaf_paths( + arrow_field: &Field, + current_path: &mut Vec, + paths: &mut Vec>, +) { + match arrow_field.data_type() { + ArrowDataType::Struct(children) => { + for child in children { + current_path.push(child.name().clone()); + collect_arrow_leaf_paths(child.as_ref(), current_path, paths); + current_path.pop(); + } + } + ArrowDataType::Map(child, _) => { + current_path.push(child.name().clone()); + collect_arrow_leaf_paths(child.as_ref(), current_path, paths); + current_path.pop(); + } + ArrowDataType::LargeList(child) + | ArrowDataType::List(child) + | ArrowDataType::FixedSizeList(child, _) => { + current_path.push(child.name().clone()); + collect_arrow_leaf_paths(child.as_ref(), current_path, paths); + current_path.pop(); + } + _ => paths.push(current_path.clone()), + } +} + impl From<&DataType> for ArrowDataType { fn from(ty: &DataType) -> Self { let fields = DataField::new("dummy", ty.clone()); diff --git a/src/query/storages/common/blocks/Cargo.toml b/src/query/storages/common/blocks/Cargo.toml index de4c15f3ee7e2..931136b27b318 100644 --- a/src/query/storages/common/blocks/Cargo.toml +++ b/src/query/storages/common/blocks/Cargo.toml @@ -7,7 +7,6 @@ publish = { workspace = true } edition = { workspace = true } [dependencies] -arrow-schema = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-storages-common-table-meta = { workspace = true } diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index b63bccb6e3309..c2f821038b8b0 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ 
b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -14,10 +14,8 @@ use std::sync::Arc; -use arrow_schema::DataType as ArrowDataType; -use arrow_schema::Field as ArrowField; use databend_common_exception::Result; -use databend_common_expression::is_internal_column_id; +use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; use databend_common_expression::TableSchema; @@ -112,11 +110,12 @@ pub fn build_parquet_writer_properties( .set_dictionary_enabled(true); if let Some(cols_stats) = cols_stats { // Disable dictionary of columns that have high cardinality - for (column_id, column_path) in leaf_column_paths(table_schema) { + for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) { if let Some(ndv) = cols_stats.column_ndv(&column_id) { let high_cardinality = (ndv as f64 / num_rows as f64) > 0.1; if high_cardinality { - builder = builder.set_column_dictionary_enabled(column_path, false); + builder = builder + .set_column_dictionary_enabled(ColumnPath::from(components), false); } } } @@ -127,57 +126,6 @@ pub fn build_parquet_writer_properties( } } -fn leaf_column_paths(table_schema: &TableSchema) -> Vec<(ColumnId, ColumnPath)> { - let mut arrow_paths = Vec::new(); - for field in table_schema.fields() { - if is_internal_column_id(field.column_id()) { - continue; - } - - let arrow_field = ArrowField::from(field); - let mut current_path = vec![arrow_field.name().clone()]; - collect_arrow_leaf_paths(&arrow_field, &mut current_path, &mut arrow_paths); - } - - let leaf_fields = table_schema.leaf_fields(); - debug_assert_eq!(leaf_fields.len(), arrow_paths.len()); - - leaf_fields - .into_iter() - .zip(arrow_paths.into_iter()) - .map(|(field, path)| (field.column_id(), ColumnPath::from(path))) - .collect() -} - -fn collect_arrow_leaf_paths( - arrow_field: &ArrowField, - current_path: &mut Vec, - paths: &mut Vec>, -) { - match arrow_field.data_type() { - ArrowDataType::Struct(children) => { - for child in children { - current_path.push(child.name().clone()); - collect_arrow_leaf_paths(child.as_ref(), current_path, paths); - current_path.pop(); - } - } - ArrowDataType::Map(child, _) => { - current_path.push(child.name().clone()); - collect_arrow_leaf_paths(child.as_ref(), current_path, paths); - current_path.pop(); - } - ArrowDataType::LargeList(child) - | ArrowDataType::List(child) - | ArrowDataType::FixedSizeList(child, _) => { - current_path.push(child.name().clone()); - collect_arrow_leaf_paths(child.as_ref(), current_path, paths); - current_path.pop(); - } - _ => paths.push(current_path.clone()), - } -} - pub trait NdvProvider { fn column_ndv(&self, column_id: &ColumnId) -> Option; } @@ -240,8 +188,10 @@ mod tests { ndv.insert(column_id(&schema, "nested:leaf"), 50); ndv.insert(column_id(&schema, "nested:arr:0"), 400); - let column_paths: HashMap = - leaf_column_paths(&schema).into_iter().collect(); + let column_paths: HashMap = table_schema_arrow_leaf_paths(&schema) + .into_iter() + .map(|(id, path)| (id, ColumnPath::from(path))) + .collect(); let props = build_parquet_writer_properties( TableCompression::Zstd, @@ -274,8 +224,10 @@ mod tests { fn test_build_parquet_writer_properties_disabled_globally() { let schema = sample_schema(); - let column_paths: HashMap = - leaf_column_paths(&schema).into_iter().collect(); + let column_paths: HashMap = table_schema_arrow_leaf_paths(&schema) + .into_iter() + .map(|(id, path)| (id, ColumnPath::from(path))) + .collect(); let 
props = build_parquet_writer_properties( TableCompression::Zstd, From 489e98958ad512a6b1112f52de07b0a885219e17 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 4 Dec 2025 18:21:02 +0800 Subject: [PATCH 21/25] cleanup --- src/query/expression/src/converts/arrow/to.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs index 42ba82fc8c166..d6b8644e251be 100644 --- a/src/query/expression/src/converts/arrow/to.rs +++ b/src/query/expression/src/converts/arrow/to.rs @@ -101,7 +101,7 @@ pub fn table_schema_arrow_leaf_paths(table_schema: &TableSchema) -> Vec<(ColumnI leaf_fields .into_iter() - .zip(arrow_paths.into_iter()) + .zip(arrow_paths) .map(|(field, path)| (field.column_id(), path)) .collect() } From dbe474e0998078c843562cf8a777cd72c147b757 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 5 Dec 2025 15:39:55 +0800 Subject: [PATCH 22/25] enable dictionary page by default --- src/query/storages/fuse/src/fuse_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index c2792920d6a98..dc8c9ea11d75b 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -323,7 +323,7 @@ impl FuseTable { self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); let enable_parquet_dictionary_encoding = - self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, false); + self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, true); WriteSettings { storage_format: self.storage_format, From e92f1e9b04d03d31c9d3526c3a141f4dbf4ca590 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 5 Dec 2025 17:06:21 +0800 Subject: [PATCH 23/25] refine doc --- .../storages/common/blocks/src/parquet_rs.rs | 22 +++++++++++++++++-- .../fuse/src/io/write/stream/block_builder.rs | 3 ++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index c2f821038b8b0..579a2e441dd18 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -30,6 +30,9 @@ use parquet::file::properties::WriterVersion; use parquet::format::FileMetaData; use parquet::schema::types::ColumnPath; +/// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold. +const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1; + /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( table_schema: &TableSchema, @@ -50,6 +53,16 @@ pub fn blocks_to_parquet( ) } +/// Serialize blocks while optionally tuning dictionary behavior via NDV statistics. +/// +/// * `table_schema` - Logical schema used to build Arrow batches. +/// * `blocks` - In-memory blocks that will be serialized into a single Parquet file. +/// * `write_buffer` - Destination buffer that receives the serialized Parquet bytes. +/// * `compression` - Compression algorithm specified by table-level settings. +/// * `enable_dictionary` - Enables dictionary encoding globally before per-column overrides. +/// * `metadata` - Additional user metadata embedded into the Parquet footer. +/// * `column_stats` - Optional NDV stats from the first block, used to configure writer properties +/// before ArrowWriter instantiation disables further changes. 
pub fn blocks_to_parquet_with_stats( table_schema: &TableSchema, blocks: Vec, @@ -61,6 +74,8 @@ pub fn blocks_to_parquet_with_stats( ) -> Result { assert!(!blocks.is_empty()); + // Writer properties cannot be tweaked after ArrowWriter creation, so we mirror the behavior of + // the streaming writer and only rely on the first block's NDV (and row count) snapshot. let num_rows = blocks[0].num_rows(); let arrow_schema = Arc::new(table_schema.into()); @@ -86,6 +101,7 @@ pub fn blocks_to_parquet_with_stats( Ok(file_meta) } +/// Create writer properties, optionally disabling dictionaries for high-cardinality columns. pub fn build_parquet_writer_properties( compression: TableCompression, enable_dictionary: bool, @@ -112,8 +128,9 @@ pub fn build_parquet_writer_properties( // Disable dictionary of columns that have high cardinality for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) { if let Some(ndv) = cols_stats.column_ndv(&column_id) { - let high_cardinality = (ndv as f64 / num_rows as f64) > 0.1; - if high_cardinality { + if num_rows > 0 + && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD + { builder = builder .set_column_dictionary_enabled(ColumnPath::from(components), false); } @@ -126,6 +143,7 @@ pub fn build_parquet_writer_properties( } } +/// Provides per column NDV statistics pub trait NdvProvider { fn column_ndv(&self, column_id: &ColumnId) -> Option; } diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index d798b8768ae48..36369c41a031b 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -379,7 +379,8 @@ impl StreamBlockBuilder { self.block_size += block.estimate_block_size(); if !had_existing_rows { - // Initialize the writer with columns ndv + // Writer properties must be fixed before the ArrowWriter starts, so we rely on the first + // block's NDV stats to heuristically configure the parquet writer. 
let mut cols_ndv = self.column_stats_state.peek_cols_ndv(); cols_ndv.extend(self.block_stats_builder.peek_cols_ndv()); self.block_writer From 6f50265c3743065c4bef29c6f9d06d02e4de69e9 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Fri, 5 Dec 2025 18:15:35 +0800 Subject: [PATCH 24/25] tweak logic tests --- .../fuse/src/io/write/virtual_column_builder.rs | 5 +++-- .../09_fuse_engine/09_0046_parquet_encoding.test | 8 +++++--- .../ee/01_ee_system/01_0002_virtual_column.test | 14 +++++++------- .../suites/mode/cluster/filter_nulls.test | 14 +++++++------- .../suites/mode/cluster/lazy_read.test | 2 +- .../suites/mode/standalone/explain/fold_agg.test | 4 ++-- .../explain/index/explain_ngram_index.test | 4 ++-- .../standalone/explain/selectivity/modulo.test | 4 ++-- 8 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs index a79b7a0b29b97..9b1325f6a3785 100644 --- a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs +++ b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs @@ -50,7 +50,7 @@ use databend_common_hashtable::StackHashMap; use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE; use databend_common_license::license::Feature; use databend_common_license::license_manager::LicenseManagerSwitch; -use databend_storages_common_blocks::blocks_to_parquet; +use databend_storages_common_blocks::blocks_to_parquet_with_stats; use databend_storages_common_table_meta::meta::DraftVirtualBlockMeta; use databend_storages_common_table_meta::meta::DraftVirtualColumnMeta; use databend_storages_common_table_meta::meta::Location; @@ -506,13 +506,14 @@ impl VirtualColumnBuilder { gen_columns_statistics(&virtual_block, None, &virtual_block_schema)?; let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); - let file_meta = blocks_to_parquet( + let file_meta = blocks_to_parquet_with_stats( virtual_block_schema.as_ref(), vec![virtual_block], &mut data, write_settings.table_compression, write_settings.enable_parquet_dictionary, None, + Some(&columns_statistics), )?; let draft_virtual_column_metas = self.file_meta_to_virtual_column_metas( diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test index 92c5d76960ba7..0a45d5be70d28 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test @@ -8,11 +8,13 @@ use test_tbl_opt_parquet_encoding; # Create table with parquet encoding option # ############################################# + +# by default, enable_parquet_dictionary = 'true' statement ok -create or replace table t_encoded (c int, s string) enable_parquet_dictionary = 'true' storage_format = 'parquet'; +create or replace table t_encoded (c int, s string) storage_format = 'parquet'; statement ok -create or replace table t(c int, s string) storage_format = 'parquet'; +create or replace table t(c int, s string) enable_parquet_dictionary = 'false' storage_format = 'parquet'; statement ok insert into t_encoded(c, s) select 1 as c, to_string(1) as s from numbers(1000000); @@ -43,7 +45,7 @@ with # 1. 
prepare plain encoded data and keep the file size statement ok -create or replace table tbl (c int, s string) storage_format = 'parquet'; +create or replace table tbl (c int, s string) storage_format = 'parquet' enable_parquet_dictionary = 'false'; statement ok insert into tbl(c, s) select 1 as c, to_string(1) as s from numbers(1000000); diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test index 62715a081c148..816021553ec89 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test @@ -143,29 +143,29 @@ test_virtual_column t2 val 3000000002 ['c'] String query II select row_count, virtual_column_size from fuse_block('test_virtual_column', 't2') ---- -3 833 +3 809 query III select block_count, row_count, virtual_column_size from fuse_segment('test_virtual_column', 't2'); ---- -1 3 833 +1 3 809 query III select block_count, row_count, virtual_column_size from fuse_snapshot('test_virtual_column', 't2'); ---- -1 3 833 +1 3 809 query IITTIII select virtual_block_size, row_count, column_name, column_type, column_id, block_offset, bytes_compressed from fuse_virtual_column('test_virtual_column', 't2') ---- -833 3 val['a'] UInt64 NULL 3000000000 4 48 -833 3 val['b'] UInt64 NULL 3000000001 52 48 -833 3 val['c'] String NULL 3000000002 100 48 +809 3 val['a'] UInt64 NULL 3000000000 4 40 +809 3 val['b'] UInt64 NULL 3000000001 44 40 +809 3 val['c'] String NULL 3000000002 84 40 query IIIIII select block_count, row_count, bytes_uncompressed, bytes_compressed, index_size, virtual_block_count from fuse_segment('test_virtual_column', 't2') ---- -1 3 134 712 1271 1 +1 3 134 734 1247 1 statement ok insert into t2 values(4, '{"a":44,"b":4,"c":"value"}'), (5, '{"a":55,"b":5,"c":"bend"}'), (6, '6') diff --git a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test index ff54fe042adaa..2c55f1fd914bb 100644 --- a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test +++ b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test @@ -60,7 +60,7 @@ Exchange │ ├── scan id: 1 │ ├── output columns: [value (#1)] │ ├── read rows: 1000 - │ ├── read size: 2.30 KiB + │ ├── read size: 2.31 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -75,7 +75,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] @@ -124,7 +124,7 @@ Exchange │ │ ├── scan id: 1 │ │ ├── output columns: [value (#1)] │ │ ├── read rows: 1000 - │ │ ├── read size: 2.30 KiB + │ │ ├── read size: 2.31 KiB │ │ ├── partitions total: 6 │ │ ├── partitions scanned: 3 │ │ ├── pruning stats: [segments: >, blocks: >] @@ -139,7 +139,7 @@ Exchange │ ├── scan id: 2 │ ├── output columns: [value (#2)] │ ├── read rows: 2000 - │ ├── read size: 3.94 KiB + │ ├── read size: 3.95 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -155,7 +155,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] @@ -190,7 +190,7 @@ Exchange │ ├── scan id: 1 │ ├── output columns: 
[value (#1)] │ ├── read rows: 1000 - │ ├── read size: 2.30 KiB + │ ├── read size: 2.31 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -205,7 +205,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] diff --git a/tests/sqllogictests/suites/mode/cluster/lazy_read.test b/tests/sqllogictests/suites/mode/cluster/lazy_read.test index fbd5e1208151f..80bbbb252a41e 100644 --- a/tests/sqllogictests/suites/mode/cluster/lazy_read.test +++ b/tests/sqllogictests/suites/mode/cluster/lazy_read.test @@ -119,7 +119,7 @@ Limit ├── scan id: 0 ├── output columns: [a (#0), b (#1), c (#2), d (#3), e (#6)] ├── read rows: 200 - ├── read size: 1.12 KiB + ├── read size: 1.26 KiB ├── partitions total: 3 ├── partitions scanned: 2 ├── pruning stats: [segments: >, blocks: , topn pruning: 3 to 2 cost: >] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test b/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test index 9e55dd5f0540c..65939ba5af0a7 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test @@ -86,7 +86,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 3000 - ├── read size: 5.89 KiB + ├── read size: 5.90 KiB ├── partitions total: 2 ├── partitions scanned: 2 ├── pruning stats: [segments: >, blocks: >] @@ -110,7 +110,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 3000 - ├── read size: 5.89 KiB + ├── read size: 5.90 KiB ├── partitions total: 2 ├── partitions scanned: 2 ├── pruning stats: [segments: >, blocks: >] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test b/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test index 6637fdb6872e6..fcb2720dd8774 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test @@ -47,7 +47,7 @@ Filter ├── scan id: 0 ├── output columns: [id (#0), content (#1)] ├── read rows: 16 - ├── read size: 1.03 KiB + ├── read size: < 1 KiB ├── partitions total: 8 ├── partitions scanned: 8 ├── pruning stats: [segments: >, blocks: , bloom pruning: 8 to 8 cost: >] @@ -104,7 +104,7 @@ Filter ├── scan id: 0 ├── output columns: [id (#0), content (#1)] ├── read rows: 16 - ├── read size: 1.03 KiB + ├── read size: < 1 KiB ├── partitions total: 8 ├── partitions scanned: 8 ├── pruning stats: [segments: >, blocks: , bloom pruning: 8 to 8 cost: >] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test b/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test index 4c30b49fb5f4c..aa2340bbed87e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test @@ -23,7 +23,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 1000 - ├── read size: 1.40 KiB + ├── read size: 1.41 KiB ├── partitions total: 1 ├── partitions scanned: 1 ├── pruning stats: [segments: >, blocks: >] @@ -52,7 +52,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 1000 - ├── read size: 1.40 KiB + ├── read 
size: 1.41 KiB ├── partitions total: 1 ├── partitions scanned: 1 ├── pruning stats: [segments: >, blocks: >] From c3c533f6b5c47af34b4391a39699f1a52953c2df Mon Sep 17 00:00:00 2001 From: dantengsky Date: Sat, 6 Dec 2025 16:27:00 +0800 Subject: [PATCH 25/25] fix filter_nulls.test --- tests/sqllogictests/suites/mode/cluster/filter_nulls.test | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test index 2c55f1fd914bb..29a426f6c1e31 100644 --- a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test +++ b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test @@ -240,7 +240,7 @@ Exchange │ ├── scan id: 1 │ ├── output columns: [value (#1)] │ ├── read rows: 1000 - │ ├── read size: 2.30 KiB + │ ├── read size: 2.31 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -258,7 +258,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >]
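
Editor's note (illustrative addendum, not part of the patch series): the rule these patches introduce — keep Parquet dictionary encoding enabled by default, but switch it off per column once the first block's NDV-to-row-count ratio exceeds 0.1 — can be exercised in isolation with the parquet crate's WriterProperties builder, which is the same API the series uses. The sketch below is a minimal, self-contained approximation of that heuristic under stated assumptions: the ColumnNdv struct, the tune_dictionary helper, and the column names are invented for illustration and are not identifiers from the patches.

use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder, WriterVersion};
use parquet::schema::types::ColumnPath;

/// NDV snapshot for one leaf column, keyed by its Parquet column path components.
struct ColumnNdv {
    path: Vec<String>,
    ndv: u64,
}

/// Enable dictionary encoding globally, then disable it for columns whose
/// NDV / row-count ratio marks them as high cardinality.
fn tune_dictionary(
    mut builder: WriterPropertiesBuilder,
    stats: &[ColumnNdv],
    num_rows: usize,
) -> WriterPropertiesBuilder {
    const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
    builder = builder
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(true);
    for col in stats {
        if num_rows > 0
            && (col.ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
        {
            // Per-column override: high-cardinality columns gain little from a dictionary page.
            builder = builder
                .set_column_dictionary_enabled(ColumnPath::from(col.path.clone()), false);
        }
    }
    builder
}

fn main() {
    let stats = vec![
        // Nearly unique values (ratio 0.99): dictionary disabled for this column.
        ColumnNdv { path: vec!["id".to_string()], ndv: 990 },
        // Low NDV nested leaf (ratio 0.05): dictionary stays enabled.
        ColumnNdv { path: vec!["nested".to_string(), "leaf".to_string()], ndv: 50 },
    ];
    let props = tune_dictionary(WriterProperties::builder(), &stats, 1000).build();
    assert!(!props.dictionary_enabled(&ColumnPath::from(vec!["id".to_string()])));
    assert!(props.dictionary_enabled(&ColumnPath::from(vec![
        "nested".to_string(),
        "leaf".to_string()
    ])));
}

The ColumnPath components here play the role of the leaf paths that table_schema_arrow_leaf_paths derives in PATCH 20, so nested columns are addressed by their Arrow field names rather than by the flattened Databend leaf names.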