diff --git a/src/query/expression/src/converts/arrow/mod.rs b/src/query/expression/src/converts/arrow/mod.rs
index f1429cac4d7c2..c5678ffaa8252 100644
--- a/src/query/expression/src/converts/arrow/mod.rs
+++ b/src/query/expression/src/converts/arrow/mod.rs
@@ -14,6 +14,7 @@ mod from;
 mod to;
 
+pub use self::to::table_schema_arrow_leaf_paths;
 pub const EXTENSION_KEY: &str = "Extension";
 pub const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray";
diff --git a/src/query/expression/src/converts/arrow/to.rs b/src/query/expression/src/converts/arrow/to.rs
index 73437acce117c..d6b8644e251be 100644
--- a/src/query/expression/src/converts/arrow/to.rs
+++ b/src/query/expression/src/converts/arrow/to.rs
@@ -41,6 +41,7 @@ use super::ARROW_EXT_TYPE_VARIANT;
 use super::ARROW_EXT_TYPE_VECTOR;
 use super::EXTENSION_KEY;
 use crate::infer_table_schema;
+use crate::schema::is_internal_column_id;
 use crate::types::DataType;
 use crate::types::DecimalColumn;
 use crate::types::DecimalDataType;
@@ -50,6 +51,7 @@ use crate::types::VectorColumn;
 use crate::types::VectorDataType;
 use crate::with_number_type;
 use crate::Column;
+use crate::ColumnId;
 use crate::DataBlock;
 use crate::DataField;
 use crate::DataSchema;
@@ -82,6 +84,57 @@ impl From<&TableSchema> for Schema {
     }
 }
 
+pub fn table_schema_arrow_leaf_paths(table_schema: &TableSchema) -> Vec<(ColumnId, Vec<String>)> {
+    let mut arrow_paths = Vec::new();
+    for field in table_schema.fields() {
+        if is_internal_column_id(field.column_id()) {
+            continue;
+        }
+
+        let arrow_field = Field::from(field);
+        let mut current_path = vec![arrow_field.name().clone()];
+        collect_arrow_leaf_paths(&arrow_field, &mut current_path, &mut arrow_paths);
+    }
+
+    let leaf_fields = table_schema.leaf_fields();
+    debug_assert_eq!(leaf_fields.len(), arrow_paths.len());
+
+    leaf_fields
+        .into_iter()
+        .zip(arrow_paths)
+        .map(|(field, path)| (field.column_id(), path))
+        .collect()
+}
+
+fn collect_arrow_leaf_paths(
+    arrow_field: &Field,
+    current_path: &mut Vec<String>,
+    paths: &mut Vec<Vec<String>>,
+) {
+    match arrow_field.data_type() {
+        ArrowDataType::Struct(children) => {
+            for child in children {
+                current_path.push(child.name().clone());
+                collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
+                current_path.pop();
+            }
+        }
+        ArrowDataType::Map(child, _) => {
+            current_path.push(child.name().clone());
+            collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
+            current_path.pop();
+        }
+        ArrowDataType::LargeList(child)
+        | ArrowDataType::List(child)
+        | ArrowDataType::FixedSizeList(child, _) => {
+            current_path.push(child.name().clone());
+            collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
+            current_path.pop();
+        }
+        _ => paths.push(current_path.clone()),
+    }
+}
+
 impl From<&DataType> for ArrowDataType {
     fn from(ty: &DataType) -> Self {
         let fields = DataField::new("dummy", ty.clone());
@@ -93,7 +146,6 @@ impl From<&DataType> for ArrowDataType {
 impl From<&TableField> for Field {
     fn from(f: &TableField) -> Self {
         let mut metadata = HashMap::new();
-
         let ty = match &f.data_type {
             TableDataType::Null => ArrowDataType::Null,
             TableDataType::EmptyArray => {
@@ -264,6 +316,30 @@ impl DataBlock {
         self.to_record_batch(&table_schema)
     }
 
+    pub fn to_record_batch_with_arrow_schema(
+        self,
+        arrow_schema: &Arc<Schema>,
+    ) -> Result<RecordBatch> {
+        let num_fields = arrow_schema.fields.len();
+        if self.columns().len() != num_fields {
+            return Err(ErrorCode::Internal(format!(
+                "The number of columns in the data block does not match the number of fields in the table schema, block_columns: {}, table_schema_fields: {}",
+                self.columns().len(),
+                num_fields,
+            )));
+        }
+
+        if num_fields == 0 {
+            return Ok(RecordBatch::try_new_with_options(
+                Arc::new(Schema::empty()),
+                vec![],
+                &RecordBatchOptions::default().with_row_count(Some(self.num_rows())),
+            )?);
+        }
+
+        self.build_record_batch(arrow_schema.clone())
+    }
+
     pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> {
         if self.columns().len() != table_schema.num_fields() {
             return Err(ErrorCode::Internal(format!(
@@ -282,6 +358,10 @@ impl DataBlock {
         }
 
         let arrow_schema = Schema::from(table_schema);
+        self.build_record_batch(Arc::new(arrow_schema))
+    }
+
+    fn build_record_batch(self, arrow_schema: Arc<Schema>) -> Result<RecordBatch> {
         let mut arrays = Vec::with_capacity(self.columns().len());
         for (entry, arrow_field) in self.take_columns().into_iter().zip(arrow_schema.fields()) {
             let array = entry.to_column().maybe_gc().into_arrow_rs();
@@ -289,7 +369,7 @@ impl DataBlock {
             // Adjust struct array names
             arrays.push(Self::adjust_nested_array(array, arrow_field.as_ref()));
         }
-        Ok(RecordBatch::try_new(Arc::new(arrow_schema), arrays)?)
+        Ok(RecordBatch::try_new(arrow_schema, arrays)?)
     }
 
     fn adjust_nested_array(array: Arc<dyn Array>, arrow_field: &Field) -> Arc<dyn Array> {
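The helper added above pairs each leaf `ColumnId` with the chain of Arrow field names from the root field down to that leaf; parquet-rs addresses leaf columns by exactly such a name chain (a `ColumnPath`). Below is a minimal, self-contained sketch of the same recursive walk written directly against the `arrow-schema` crate; the field names and the list child being called `item` are illustrative assumptions, not Databend's exact naming.

```rust
// Sketch of a leaf-path walk over an Arrow field, assuming the `arrow-schema` crate.
use std::sync::Arc;

use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Fields;

fn leaf_paths(field: &Field, path: &mut Vec<String>, out: &mut Vec<Vec<String>>) {
    match field.data_type() {
        DataType::Struct(children) => {
            for child in children {
                path.push(child.name().clone());
                leaf_paths(child.as_ref(), path, out);
                path.pop();
            }
        }
        DataType::List(child) | DataType::LargeList(child) => {
            path.push(child.name().clone());
            leaf_paths(child.as_ref(), path, out);
            path.pop();
        }
        // Scalar leaves terminate the recursion: record the full name chain.
        _ => out.push(path.clone()),
    }
}

fn main() {
    // nested: Struct { leaf: Int64, arr: List<UInt64> }
    let arr_item = Arc::new(Field::new("item", DataType::UInt64, true));
    let nested = Field::new(
        "nested",
        DataType::Struct(Fields::from(vec![
            Field::new("leaf", DataType::Int64, true),
            Field::new("arr", DataType::List(arr_item), true),
        ])),
        true,
    );

    let mut out = Vec::new();
    leaf_paths(&nested, &mut vec![nested.name().clone()], &mut out);
    // Prints [["nested", "leaf"], ["nested", "arr", "item"]]
    println!("{out:?}");
}
```

Each collected path can then be turned into a `parquet::schema::types::ColumnPath`, which is how the per-column dictionary overrides in the next file are applied.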
diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs
index 6da54c70d985d..579a2e441dd18 100644
--- a/src/query/storages/common/blocks/src/parquet_rs.rs
+++ b/src/query/storages/common/blocks/src/parquet_rs.rs
@@ -15,17 +15,23 @@
 use std::sync::Arc;
 
 use databend_common_exception::Result;
+use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
+use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchema;
+use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Encoding;
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
-use parquet::file::properties::WriterPropertiesBuilder;
 use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;
+use parquet::schema::types::ColumnPath;
+
+/// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
+const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
 
 /// Serialize data blocks to parquet format.
 pub fn blocks_to_parquet(
@@ -35,16 +41,58 @@ pub fn blocks_to_parquet(
     table_schema: &TableSchema,
     blocks: Vec<DataBlock>,
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
     enable_dictionary: bool,
     metadata: Option<Vec<KeyValue>>,
+) -> Result<FileMetaData> {
+    blocks_to_parquet_with_stats(
+        table_schema,
+        blocks,
+        write_buffer,
+        compression,
+        enable_dictionary,
+        metadata,
+        None,
+    )
+}
+
+/// Serialize blocks while optionally tuning dictionary behavior via NDV statistics.
+///
+/// * `table_schema` - Logical schema used to build Arrow batches.
+/// * `blocks` - In-memory blocks that will be serialized into a single Parquet file.
+/// * `write_buffer` - Destination buffer that receives the serialized Parquet bytes.
+/// * `compression` - Compression algorithm specified by table-level settings.
+/// * `enable_dictionary` - Enables dictionary encoding globally before per-column overrides.
+/// * `metadata` - Additional user metadata embedded into the Parquet footer.
+/// * `column_stats` - Optional NDV stats from the first block, used to configure writer properties
+///   up front, since they can no longer be changed once the `ArrowWriter` is created.
+pub fn blocks_to_parquet_with_stats(
+    table_schema: &TableSchema,
+    blocks: Vec<DataBlock>,
+    write_buffer: &mut Vec<u8>,
+    compression: TableCompression,
+    enable_dictionary: bool,
+    metadata: Option<Vec<KeyValue>>,
+    column_stats: Option<&StatisticsOfColumns>,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
 
-    let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata);
-    let props = builder.build();
+    // Writer properties cannot be tweaked after ArrowWriter creation, so we mirror the behavior of
+    // the streaming writer and only rely on the first block's NDV (and row count) snapshot.
+    let num_rows = blocks[0].num_rows();
+    let arrow_schema = Arc::new(table_schema.into());
+
+    let props = build_parquet_writer_properties(
+        compression,
+        enable_dictionary,
+        column_stats,
+        metadata,
+        num_rows,
+        table_schema,
+    );
+
     let batches = blocks
         .into_iter()
-        .map(|block| block.to_record_batch(table_schema))
+        .map(|block| block.to_record_batch_with_arrow_schema(&arrow_schema))
        .collect::<Result<Vec<_>>>()?;
-    let arrow_schema = Arc::new(table_schema.into());
+
     let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?;
     for batch in batches {
         writer.write(&batch)?;
@@ -53,12 +101,16 @@
     Ok(file_meta)
 }
 
-pub fn parquet_writer_properties_builder(
+/// Create writer properties, optionally disabling dictionaries for high-cardinality columns.
+pub fn build_parquet_writer_properties(
     compression: TableCompression,
     enable_dictionary: bool,
+    cols_stats: Option<impl NdvProvider>,
     metadata: Option<Vec<KeyValue>>,
-) -> WriterPropertiesBuilder {
-    let builder = WriterProperties::builder()
+    num_rows: usize,
+    table_schema: &TableSchema,
+) -> WriterProperties {
+    let mut builder = WriterProperties::builder()
         .set_compression(compression.into())
         // use `usize::MAX` to effectively limit the number of row groups to 1
         .set_max_row_group_size(usize::MAX)
@@ -68,10 +120,147 @@
         .set_key_value_metadata(metadata);
 
     if enable_dictionary {
-        builder
+        // Enable dictionary for all columns
+        builder = builder
             .set_writer_version(WriterVersion::PARQUET_2_0)
-            .set_dictionary_enabled(true)
+            .set_dictionary_enabled(true);
+        if let Some(cols_stats) = cols_stats {
+            // Disable dictionary encoding for columns with high cardinality
+            for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) {
+                if let Some(ndv) = cols_stats.column_ndv(&column_id) {
+                    if num_rows > 0
+                        && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
+                    {
+                        builder = builder
+                            .set_column_dictionary_enabled(ColumnPath::from(components), false);
+                    }
+                }
+            }
+        }
+        builder.build()
     } else {
-        builder.set_dictionary_enabled(false)
+        builder.set_dictionary_enabled(false).build()
+    }
+}
+
+/// Provides per-column NDV statistics.
+pub trait NdvProvider {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64>;
+}
+
+impl NdvProvider for &StatisticsOfColumns {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+        self.get(column_id).and_then(|item| item.distinct_of_values)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use databend_common_expression::types::number::NumberDataType;
+    use databend_common_expression::TableDataType;
+    use databend_common_expression::TableField;
+
+    use super::*;
+
+    struct TestNdvProvider {
+        ndv: HashMap<ColumnId, u64>,
+    }
+
+    impl NdvProvider for TestNdvProvider {
+        fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+            self.ndv.get(column_id).copied()
+        }
+    }
+
+    fn sample_schema() -> TableSchema {
+        TableSchema::new(vec![
+            TableField::new("simple", TableDataType::Number(NumberDataType::Int32)),
+            TableField::new("nested", TableDataType::Tuple {
+                fields_name: vec!["leaf".to_string(), "arr".to_string()],
+                fields_type: vec![
+                    TableDataType::Number(NumberDataType::Int64),
+                    TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::UInt64))),
+                ],
+            }),
+            TableField::new("no_stats", TableDataType::String),
+        ])
+    }
+
+    fn column_id(schema: &TableSchema, name: &str) -> ColumnId {
+        schema
+            .leaf_fields()
+            .into_iter()
+            .find(|field| field.name() == name)
+            .unwrap_or_else(|| panic!("missing field {}", name))
+            .column_id()
+    }
+
+    #[test]
+    fn test_build_parquet_writer_properties_handles_nested_leaves() {
+        let schema = sample_schema();
+
+        let mut ndv = HashMap::new();
+        ndv.insert(column_id(&schema, "simple"), 500);
+        ndv.insert(column_id(&schema, "nested:leaf"), 50);
+        ndv.insert(column_id(&schema, "nested:arr:0"), 400);
+
+        let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(&schema)
+            .into_iter()
+            .map(|(id, path)| (id, ColumnPath::from(path)))
+            .collect();
+
+        let props = build_parquet_writer_properties(
+            TableCompression::Zstd,
+            true,
+            Some(TestNdvProvider { ndv }),
+            None,
+            1000,
+            &schema,
+        );
+
+        assert!(
+            !props.dictionary_enabled(&column_paths[&column_id(&schema, "simple")]),
+            "high cardinality top-level column should disable dictionary"
+        );
+        assert!(
+            props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:leaf")]),
+            "low cardinality nested column should keep dictionary"
+        );
+        assert!(
+            !props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:arr:0")]),
+            "high cardinality nested array element should disable dictionary"
+        );
+        assert!(
+            props.dictionary_enabled(&column_paths[&column_id(&schema, "no_stats")]),
+            "columns without NDV stats keep the default dictionary behavior"
+        );
+    }
+
+    #[test]
+    fn test_build_parquet_writer_properties_disabled_globally() {
+        let schema = sample_schema();
+
+        let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(&schema)
+            .into_iter()
+            .map(|(id, path)| (id, ColumnPath::from(path)))
+            .collect();
+
+        let props = build_parquet_writer_properties(
+            TableCompression::Zstd,
+            false,
+            None::<TestNdvProvider>,
+            None,
+            1000,
+            &schema,
+        );
+
+        for field in schema.leaf_fields() {
+            assert!(
+                !props.dictionary_enabled(&column_paths[&field.column_id()]),
+                "dictionary must remain disabled when enable_dictionary is false",
+            );
+        }
+    }
 }
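To make the ratio heuristic concrete: with 1,000 rows, a column whose estimated NDV is 950 has a ratio of 0.95 > 0.1, so its dictionary page is switched off, while an NDV of 40 (ratio 0.04) keeps the default dictionary encoding. The following is a hedged, standalone sketch of that decision against the `parquet` crate's builder API; the column paths and NDV figures are made-up examples, not values from this PR.

```rust
// Sketch of per-column dictionary overrides driven by an NDV/row-count ratio,
// assuming only the public `parquet` crate API.
use parquet::file::properties::WriterProperties;
use parquet::file::properties::WriterVersion;
use parquet::schema::types::ColumnPath;

const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;

fn main() {
    let num_rows = 1000usize;
    // (leaf column path, NDV estimated from the first block)
    let leaf_ndv = vec![
        (vec!["id".to_string()], 950u64),                        // 0.95 > 0.1  -> disable
        (vec!["nested".to_string(), "leaf".to_string()], 40u64), // 0.04 <= 0.1 -> keep
    ];

    // Dictionary encoding is enabled globally, then switched off per high-cardinality column.
    let mut builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(true);

    for (path, ndv) in leaf_ndv {
        if num_rows > 0 && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD {
            builder = builder.set_column_dictionary_enabled(ColumnPath::from(path), false);
        }
    }

    let props = builder.build();
    assert!(!props.dictionary_enabled(&ColumnPath::from(vec!["id".to_string()])));
    assert!(props.dictionary_enabled(&ColumnPath::from(vec![
        "nested".to_string(),
        "leaf".to_string(),
    ])));
}
```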
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index c2792920d6a98..dc8c9ea11d75b 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -323,7 +323,7 @@ impl FuseTable {
             self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
 
         let enable_parquet_dictionary_encoding =
-            self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, false);
+            self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_DICTIONARY, true);
 
         WriteSettings {
             storage_format: self.storage_format,
diff --git a/src/query/storages/fuse/src/io/write/block_statistics_writer.rs b/src/query/storages/fuse/src/io/write/block_statistics_writer.rs
index 3257863962f40..eb1b68dc2cef0 100644
--- a/src/query/storages/fuse/src/io/write/block_statistics_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_statistics_writer.rs
@@ -17,6 +17,7 @@ use std::collections::HashMap;
 
 use databend_common_exception::Result;
 use databend_common_expression::BlockEntry;
+use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
 use databend_common_expression::FieldIndex;
 use databend_common_expression::TableField;
@@ -42,7 +43,7 @@ pub struct BlockStatsBuilder {
 pub struct ColumnNDVBuilder {
     index: FieldIndex,
     field: TableField,
-    builder: ColumnNDVEstimator,
+    pub builder: ColumnNDVEstimator,
 }
 
 impl BlockStatsBuilder {
@@ -85,6 +86,13 @@ impl BlockStatsBuilder {
         Ok(())
     }
 
+    pub fn peek_cols_ndv(&self) -> HashMap<ColumnId, usize> {
+        self.builders
+            .iter()
+            .map(|item| (item.field.column_id(), item.builder.peek()))
+            .collect()
+    }
+
     pub fn finalize(self) -> Result<Option<BlockHLL>> {
         if self.builders.is_empty() {
             return Ok(None);
diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs
index 531f70a17c5b5..5a03e64978c34 100644
--- a/src/query/storages/fuse/src/io/write/block_writer.rs
+++ b/src/query/storages/fuse/src/io/write/block_writer.rs
@@ -44,7 +44,7 @@ use databend_common_metrics::storage::metrics_inc_block_virtual_column_write_num
 use databend_common_metrics::storage::metrics_inc_block_write_milliseconds;
 use databend_common_metrics::storage::metrics_inc_block_write_nums;
 use databend_common_native::write::NativeWriter;
-use databend_storages_common_blocks::blocks_to_parquet;
+use databend_storages_common_blocks::blocks_to_parquet_with_stats;
 use databend_storages_common_index::NgramArgs;
 use databend_storages_common_table_meta::meta::encode_column_hll;
 use databend_storages_common_table_meta::meta::BlockHLLState;
@@ -52,6 +52,7 @@ use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ClusterStatistics;
 use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::ExtendedBlockMeta;
+use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::table::TableCompression;
 use opendal::Operator;
@@ -76,17 +77,28 @@ pub fn serialize_block(
     schema: &TableSchemaRef,
     block: DataBlock,
     buf: &mut Vec<u8>,
+) -> Result<HashMap<ColumnId, ColumnMeta>> {
+    serialize_block_with_column_stats(write_settings, schema, None, block, buf)
+}
+
+pub fn serialize_block_with_column_stats(
+    write_settings: &WriteSettings,
+    schema: &TableSchemaRef,
+    column_stats: Option<&StatisticsOfColumns>,
+    block: DataBlock,
+    buf: &mut Vec<u8>,
 ) -> Result<HashMap<ColumnId, ColumnMeta>> {
     let schema = Arc::new(schema.remove_virtual_computed_fields());
     match write_settings.storage_format {
         FuseStorageFormat::Parquet => {
-            let result = blocks_to_parquet(
+            let result = blocks_to_parquet_with_stats(
                 &schema,
                 vec![block],
                 buf,
                 write_settings.table_compression,
                 write_settings.enable_parquet_dictionary,
                 None,
+                column_stats,
             )?;
             let meta = column_parquet_metas(&result, &schema)?;
             Ok(meta)
@@ -248,9 +260,10 @@ impl BlockBuilder {
         let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
         let block_size = data_block.estimate_block_size() as u64;
 
-        let col_metas = serialize_block(
+        let col_metas = serialize_block_with_column_stats(
            &self.write_settings,
            &self.source_schema,
+           Some(&col_stats),
            data_block,
            &mut buffer,
        )?;
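The stream path needs a distinct-count snapshot before any row is written (to decide the writer properties), while the final count is only produced when a block is sealed; that is why `peek_cols_ndv` above, and the `peek()` method added to the estimator further down in this diff, are non-consuming, unlike `finalize()`. A simplified stand-in that uses an exact `HashSet` instead of Databend's `MetaHLL`, only to show the shape of that API:

```rust
// Simplified, exact-count stand-in for a streaming NDV estimator with a peek()/finalize() split.
use std::collections::HashSet;

struct ExactNdvEstimator {
    seen: HashSet<String>,
}

impl ExactNdvEstimator {
    fn new() -> Self {
        Self { seen: HashSet::new() }
    }

    fn update(&mut self, value: &str) {
        self.seen.insert(value.to_string());
    }

    /// Non-consuming snapshot, cheap enough to call right after the first block
    /// when the parquet writer properties have to be decided.
    fn peek(&self) -> usize {
        self.seen.len()
    }

    /// Consuming final count, recorded once the block is sealed.
    fn finalize(self) -> usize {
        self.seen.len()
    }
}

fn main() {
    let mut est = ExactNdvEstimator::new();
    for v in ["a", "b", "a", "c"] {
        est.update(v);
    }
    assert_eq!(est.peek(), 3); // snapshot used to size writer properties
    assert_eq!(est.finalize(), 3); // final value kept in block statistics
}
```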
diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs
index 4de89f7e0a6fb..36369c41a031b 100644
--- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs
+++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs
@@ -19,6 +19,8 @@ use std::collections::HashSet;
 use std::mem;
 use std::sync::Arc;
 
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
 use chrono::Utc;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
@@ -38,7 +40,8 @@ use databend_common_meta_app::schema::TableIndex;
 use databend_common_native::write::NativeWriter;
 use databend_common_native::write::WriteOptions;
 use databend_common_sql::executor::physical_plans::MutationKind;
-use databend_storages_common_blocks::parquet_writer_properties_builder;
+use databend_storages_common_blocks::build_parquet_writer_properties;
+use databend_storages_common_blocks::NdvProvider;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_index::BloomIndexBuilder;
 use databend_storages_common_index::Index;
@@ -50,8 +53,10 @@ use databend_storages_common_table_meta::meta::ColumnMeta;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
+use parquet::format::FileMetaData;
 
 use crate::io::create_inverted_index_builders;
+use crate::io::write::stream::block_builder::ArrowParquetWriter::Initialized;
 use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
 use crate::io::write::stream::cluster_statistics::ClusterStatisticsState;
 use crate::io::write::stream::ColumnStatisticsState;
@@ -69,14 +74,102 @@ use crate::operations::column_parquet_metas;
 use crate::FuseStorageFormat;
 use crate::FuseTable;
 
+pub struct UninitializedArrowWriter {
+    write_settings: WriteSettings,
+    arrow_schema: Arc<Schema>,
+    table_schema: TableSchemaRef,
+}
+impl UninitializedArrowWriter {
+    fn init(&self, cols_ndv_info: ColumnsNdvInfo) -> Result<ArrowWriter<Vec<u8>>> {
+        let write_settings = &self.write_settings;
+        let num_rows = cols_ndv_info.num_rows;
+
+        let writer_properties = build_parquet_writer_properties(
+            write_settings.table_compression,
+            write_settings.enable_parquet_dictionary,
+            Some(cols_ndv_info),
+            None,
+            num_rows,
+            self.table_schema.as_ref(),
+        );
+        let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
+        let writer =
+            ArrowWriter::try_new(buffer, self.arrow_schema.clone(), Some(writer_properties))?;
+        Ok(writer)
+    }
+}
+
+pub struct InitializedArrowWriter {
+    inner: ArrowWriter<Vec<u8>>,
+}
+pub enum ArrowParquetWriter {
+    Uninitialized(UninitializedArrowWriter),
+    Initialized(InitializedArrowWriter),
+}
+impl ArrowParquetWriter {
+    fn new_uninitialized(write_settings: WriteSettings, table_schema: TableSchemaRef) -> Self {
+        let arrow_schema = Arc::new(table_schema.as_ref().into());
+        ArrowParquetWriter::Uninitialized(UninitializedArrowWriter {
+            write_settings,
+            arrow_schema,
+            table_schema,
+        })
+    }
+    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        let Initialized(writer) = self else {
+            unreachable!("ArrowParquetWriter::write called before initialization");
+        };
+        writer.inner.write(batch)?;
+        Ok(())
+    }
+
+    fn finish(&mut self) -> Result<FileMetaData> {
+        let Initialized(writer) = self else {
+            unreachable!("ArrowParquetWriter::finish called before initialization");
+        };
+        let file_meta = writer.inner.finish()?;
+        Ok(file_meta)
+    }
+
+    fn inner_mut(&mut self) -> &mut Vec<u8> {
+        let Initialized(writer) = self else {
+            unreachable!("ArrowParquetWriter::inner_mut called before initialization");
+        };
+        writer.inner.inner_mut()
+    }
+
+    fn in_progress_size(&self) -> usize {
+        match self {
+            ArrowParquetWriter::Uninitialized(_) => 0,
+            Initialized(writer) => writer.inner.in_progress_size(),
+        }
+    }
+}
+
+pub struct ColumnsNdvInfo {
+    cols_ndv: HashMap<ColumnId, usize>,
+    num_rows: usize,
+}
+
+impl ColumnsNdvInfo {
+    fn new(num_rows: usize, cols_ndv: HashMap<ColumnId, usize>) -> Self {
+        Self { cols_ndv, num_rows }
+    }
+}
+impl NdvProvider for ColumnsNdvInfo {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+        self.cols_ndv.get(column_id).map(|v| *v as u64)
+    }
+}
+
 pub enum BlockWriterImpl {
-    Arrow(ArrowWriter<Vec<u8>>),
+    Parquet(ArrowParquetWriter),
     // Native format doesnot support stream write.
     Native(NativeWriter<Vec<u8>>),
 }
 
 pub trait BlockWriter {
-    fn start(&mut self) -> Result<()>;
+    fn start(&mut self, cols_ndv: ColumnsNdvInfo) -> Result<()>;
 
     fn write(&mut self, block: DataBlock, schema: &TableSchema) -> Result<()>;
 
@@ -88,18 +181,28 @@
 }
 
 impl BlockWriter for BlockWriterImpl {
-    fn start(&mut self) -> Result<()> {
+    fn start(&mut self, cols_ndv_info: ColumnsNdvInfo) -> Result<()> {
         match self {
-            BlockWriterImpl::Arrow(_) => Ok(()),
-            BlockWriterImpl::Native(writer) => Ok(writer.start()?),
+            BlockWriterImpl::Parquet(arrow_writer) => {
+                let ArrowParquetWriter::Uninitialized(uninitialized) = arrow_writer else {
+                    unreachable!(
+                        "Unexpected writer state: BlockWriterImpl::Parquet has already been initialized"
+                    );
+                };
+
+                let inner = uninitialized.init(cols_ndv_info)?;
+                *arrow_writer = ArrowParquetWriter::Initialized(InitializedArrowWriter { inner });
+                Ok(())
+            }
+            BlockWriterImpl::Native(native_writer) => Ok(native_writer.start()?),
         }
     }
 
     fn write(&mut self, block: DataBlock, schema: &TableSchema) -> Result<()> {
         match self {
-            BlockWriterImpl::Arrow(writer) => {
+            BlockWriterImpl::Parquet(writer) => {
                 let batch = block.to_record_batch(schema)?;
-                writer.write(&batch)?;
+                writer.write(&batch)?
             }
             BlockWriterImpl::Native(writer) => {
                 let block = block.consume_convert_to_full();
@@ -116,7 +219,7 @@ impl BlockWriter for BlockWriterImpl {
 
     fn finish(&mut self, schema: &TableSchemaRef) -> Result<HashMap<ColumnId, ColumnMeta>> {
         match self {
-            BlockWriterImpl::Arrow(writer) => {
+            BlockWriterImpl::Parquet(writer) => {
                 let file_meta = writer.finish()?;
                 column_parquet_metas(&file_meta, schema)
             }
@@ -136,14 +239,14 @@ impl BlockWriter for BlockWriterImpl {
 
     fn inner_mut(&mut self) -> &mut Vec<u8> {
         match self {
-            BlockWriterImpl::Arrow(writer) => writer.inner_mut(),
+            BlockWriterImpl::Parquet(writer) => writer.inner_mut(),
             BlockWriterImpl::Native(writer) => writer.inner_mut(),
         }
     }
 
     fn compressed_size(&self) -> usize {
         match self {
-            BlockWriterImpl::Arrow(writer) => writer.in_progress_size(),
+            BlockWriterImpl::Parquet(writer) => writer.in_progress_size(),
             BlockWriterImpl::Native(writer) => writer.total_size(),
         }
     }
@@ -170,17 +273,10 @@ impl StreamBlockBuilder {
         let buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
         let block_writer = match properties.write_settings.storage_format {
             FuseStorageFormat::Parquet => {
-                let write_settings = &properties.write_settings;
-                let props = parquet_writer_properties_builder(
-                    write_settings.table_compression,
-                    write_settings.enable_parquet_dictionary,
-                    None,
-                )
-                .build();
-
-                let arrow_schema = Arc::new(properties.source_schema.as_ref().into());
-                let writer = ArrowWriter::try_new(buffer, arrow_schema, Some(props))?;
-                BlockWriterImpl::Arrow(writer)
+                BlockWriterImpl::Parquet(ArrowParquetWriter::new_uninitialized(
+                    properties.write_settings.clone(),
+                    properties.source_schema.clone(),
+                ))
             }
             FuseStorageFormat::Native => {
                 let mut default_compress_ratio = Some(2.10f64);
@@ -263,9 +359,7 @@ impl StreamBlockBuilder {
             return Ok(());
         }
 
-        if self.row_count == 0 {
-            self.block_writer.start()?;
-        }
+        let had_existing_rows = self.row_count > 0;
 
         let block = self.cluster_stats_state.add_block(block)?;
         self.column_stats_state
@@ -283,6 +377,16 @@
         }
         self.row_count += block.num_rows();
         self.block_size += block.estimate_block_size();
+
+        if !had_existing_rows {
+            // Writer properties must be fixed before the ArrowWriter starts, so we rely on the first
+            // block's NDV stats to heuristically configure the parquet writer.
+            let mut cols_ndv = self.column_stats_state.peek_cols_ndv();
+            cols_ndv.extend(self.block_stats_builder.peek_cols_ndv());
+            self.block_writer
+                .start(ColumnsNdvInfo::new(block.num_rows(), cols_ndv))?;
+        }
+
         self.block_writer
             .write(block, &self.properties.source_schema)?;
         Ok(())
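Because parquet writer properties are frozen once the `ArrowWriter` exists, the stream builder above defers creating it until the first block (and its NDV snapshot) arrives, modeling the writer as a two-state enum. The following is a generic sketch of that deferred-initialization pattern; the `Config`/`Writer` types are illustrative stand-ins, not the Databend types.

```rust
// Sketch of a writer that is only constructed when the first chunk arrives,
// so its configuration can be derived from that chunk's statistics.
struct Config {
    dictionary: bool,
}

struct Writer {
    cfg: Config,
    rows: usize,
}

enum LazyWriter {
    Uninitialized(Config),
    Initialized(Writer),
}

impl LazyWriter {
    fn write(&mut self, first_chunk_ndv_ratio: f64, rows: usize) {
        // Decide the configuration exactly once, from the first chunk's statistics.
        let decided = match self {
            LazyWriter::Uninitialized(cfg) => {
                Some(cfg.dictionary && first_chunk_ndv_ratio <= 0.1)
            }
            LazyWriter::Initialized(_) => None,
        };
        if let Some(dictionary) = decided {
            *self = LazyWriter::Initialized(Writer {
                cfg: Config { dictionary },
                rows: 0,
            });
        }

        let LazyWriter::Initialized(w) = self else { unreachable!() };
        w.rows += rows;
    }
}

fn main() {
    let mut w = LazyWriter::Uninitialized(Config { dictionary: true });
    w.write(0.5, 100); // a high-cardinality first chunk disables the dictionary
    w.write(0.01, 100); // later chunks cannot change that decision anymore
    let LazyWriter::Initialized(inner) = &w else { unreachable!() };
    assert!(!inner.cfg.dictionary);
    assert_eq!(inner.rows, 200);
}
```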
diff --git a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs
index 84ea04b690330..8784094357e78 100644
--- a/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs
+++ b/src/query/storages/fuse/src/io/write/stream/column_ndv_estimator.rs
@@ -47,6 +47,8 @@ use enum_dispatch::enum_dispatch;
 pub trait ColumnNDVEstimatorOps: Send + Sync {
     fn update_column(&mut self, column: &Column);
     fn update_scalar(&mut self, scalar: &ScalarRef);
+
+    fn peek(&self) -> usize;
     fn finalize(&self) -> usize;
     fn hll(self) -> MetaHLL;
 }
@@ -182,6 +184,10 @@ where
         self.hll.add_object(&val);
     }
 
+    fn peek(&self) -> usize {
+        self.hll.count()
+    }
+
     fn finalize(&self) -> usize {
         self.hll.count()
     }
diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
index ac65378b20d22..df5304e4e5afc 100644
--- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
+++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs
@@ -86,6 +86,13 @@ impl ColumnStatisticsState {
         Ok(())
     }
 
+    pub fn peek_cols_ndv(&self) -> HashMap<ColumnId, usize> {
+        self.distinct_columns
+            .iter()
+            .map(|(column_id, ndv_estimator)| (*column_id, ndv_estimator.peek()))
+            .collect()
+    }
+
     pub fn finalize(
         self,
         mut column_distinct_count: HashMap<ColumnId, usize>,
diff --git a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
index a79b7a0b29b97..9b1325f6a3785 100644
--- a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
+++ b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs
@@ -50,7 +50,7 @@ use databend_common_hashtable::StackHashMap;
 use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE;
 use databend_common_license::license::Feature;
 use databend_common_license::license_manager::LicenseManagerSwitch;
-use databend_storages_common_blocks::blocks_to_parquet;
+use databend_storages_common_blocks::blocks_to_parquet_with_stats;
 use databend_storages_common_table_meta::meta::DraftVirtualBlockMeta;
 use databend_storages_common_table_meta::meta::DraftVirtualColumnMeta;
 use databend_storages_common_table_meta::meta::Location;
@@ -506,13 +506,14 @@ impl VirtualColumnBuilder {
             gen_columns_statistics(&virtual_block, None, &virtual_block_schema)?;
 
         let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
-        let file_meta = blocks_to_parquet(
+        let file_meta = blocks_to_parquet_with_stats(
             virtual_block_schema.as_ref(),
             vec![virtual_block],
             &mut data,
             write_settings.table_compression,
             write_settings.enable_parquet_dictionary,
             None,
+            Some(&columns_statistics),
         )?;
 
         let draft_virtual_column_metas = self.file_meta_to_virtual_column_metas(
diff --git
a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test index 92c5d76960ba7..0a45d5be70d28 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0046_parquet_encoding.test @@ -8,11 +8,13 @@ use test_tbl_opt_parquet_encoding; # Create table with parquet encoding option # ############################################# + +# by default, enable_parquet_dictionary = 'true' statement ok -create or replace table t_encoded (c int, s string) enable_parquet_dictionary = 'true' storage_format = 'parquet'; +create or replace table t_encoded (c int, s string) storage_format = 'parquet'; statement ok -create or replace table t(c int, s string) storage_format = 'parquet'; +create or replace table t(c int, s string) enable_parquet_dictionary = 'false' storage_format = 'parquet'; statement ok insert into t_encoded(c, s) select 1 as c, to_string(1) as s from numbers(1000000); @@ -43,7 +45,7 @@ with # 1. prepare plain encoded data and keep the file size statement ok -create or replace table tbl (c int, s string) storage_format = 'parquet'; +create or replace table tbl (c int, s string) storage_format = 'parquet' enable_parquet_dictionary = 'false'; statement ok insert into tbl(c, s) select 1 as c, to_string(1) as s from numbers(1000000); diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test index 62715a081c148..816021553ec89 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test @@ -143,29 +143,29 @@ test_virtual_column t2 val 3000000002 ['c'] String query II select row_count, virtual_column_size from fuse_block('test_virtual_column', 't2') ---- -3 833 +3 809 query III select block_count, row_count, virtual_column_size from fuse_segment('test_virtual_column', 't2'); ---- -1 3 833 +1 3 809 query III select block_count, row_count, virtual_column_size from fuse_snapshot('test_virtual_column', 't2'); ---- -1 3 833 +1 3 809 query IITTIII select virtual_block_size, row_count, column_name, column_type, column_id, block_offset, bytes_compressed from fuse_virtual_column('test_virtual_column', 't2') ---- -833 3 val['a'] UInt64 NULL 3000000000 4 48 -833 3 val['b'] UInt64 NULL 3000000001 52 48 -833 3 val['c'] String NULL 3000000002 100 48 +809 3 val['a'] UInt64 NULL 3000000000 4 40 +809 3 val['b'] UInt64 NULL 3000000001 44 40 +809 3 val['c'] String NULL 3000000002 84 40 query IIIIII select block_count, row_count, bytes_uncompressed, bytes_compressed, index_size, virtual_block_count from fuse_segment('test_virtual_column', 't2') ---- -1 3 134 712 1271 1 +1 3 134 734 1247 1 statement ok insert into t2 values(4, '{"a":44,"b":4,"c":"value"}'), (5, '{"a":55,"b":5,"c":"bend"}'), (6, '6') diff --git a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test index ff54fe042adaa..29a426f6c1e31 100644 --- a/tests/sqllogictests/suites/mode/cluster/filter_nulls.test +++ b/tests/sqllogictests/suites/mode/cluster/filter_nulls.test @@ -60,7 +60,7 @@ Exchange │ ├── scan id: 1 │ ├── output columns: [value (#1)] │ ├── read rows: 1000 - │ ├── read size: 2.30 KiB + │ ├── read size: 2.31 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: 
>] @@ -75,7 +75,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] @@ -124,7 +124,7 @@ Exchange │ │ ├── scan id: 1 │ │ ├── output columns: [value (#1)] │ │ ├── read rows: 1000 - │ │ ├── read size: 2.30 KiB + │ │ ├── read size: 2.31 KiB │ │ ├── partitions total: 6 │ │ ├── partitions scanned: 3 │ │ ├── pruning stats: [segments: >, blocks: >] @@ -139,7 +139,7 @@ Exchange │ ├── scan id: 2 │ ├── output columns: [value (#2)] │ ├── read rows: 2000 - │ ├── read size: 3.94 KiB + │ ├── read size: 3.95 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -155,7 +155,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] @@ -190,7 +190,7 @@ Exchange │ ├── scan id: 1 │ ├── output columns: [value (#1)] │ ├── read rows: 1000 - │ ├── read size: 2.30 KiB + │ ├── read size: 2.31 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -205,7 +205,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] @@ -240,7 +240,7 @@ Exchange │ ├── scan id: 1 │ ├── output columns: [value (#1)] │ ├── read rows: 1000 - │ ├── read size: 2.30 KiB + │ ├── read size: 2.31 KiB │ ├── partitions total: 6 │ ├── partitions scanned: 3 │ ├── pruning stats: [segments: >, blocks: >] @@ -258,7 +258,7 @@ Exchange ├── scan id: 0 ├── output columns: [value (#0)] ├── read rows: 2000 - ├── read size: 3.94 KiB + ├── read size: 3.95 KiB ├── partitions total: 6 ├── partitions scanned: 3 ├── pruning stats: [segments: >, blocks: >] diff --git a/tests/sqllogictests/suites/mode/cluster/lazy_read.test b/tests/sqllogictests/suites/mode/cluster/lazy_read.test index fbd5e1208151f..80bbbb252a41e 100644 --- a/tests/sqllogictests/suites/mode/cluster/lazy_read.test +++ b/tests/sqllogictests/suites/mode/cluster/lazy_read.test @@ -119,7 +119,7 @@ Limit ├── scan id: 0 ├── output columns: [a (#0), b (#1), c (#2), d (#3), e (#6)] ├── read rows: 200 - ├── read size: 1.12 KiB + ├── read size: 1.26 KiB ├── partitions total: 3 ├── partitions scanned: 2 ├── pruning stats: [segments: >, blocks: , topn pruning: 3 to 2 cost: >] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test b/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test index 9e55dd5f0540c..65939ba5af0a7 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/fold_agg.test @@ -86,7 +86,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 3000 - ├── read size: 5.89 KiB + ├── read size: 5.90 KiB ├── partitions total: 2 ├── partitions scanned: 2 ├── pruning stats: [segments: >, blocks: >] @@ -110,7 +110,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 3000 - ├── read size: 5.89 KiB + ├── read size: 5.90 KiB ├── partitions total: 2 ├── partitions scanned: 2 ├── pruning stats: [segments: >, blocks: >] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test 
b/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test index 6637fdb6872e6..fcb2720dd8774 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/index/explain_ngram_index.test @@ -47,7 +47,7 @@ Filter ├── scan id: 0 ├── output columns: [id (#0), content (#1)] ├── read rows: 16 - ├── read size: 1.03 KiB + ├── read size: < 1 KiB ├── partitions total: 8 ├── partitions scanned: 8 ├── pruning stats: [segments: >, blocks: , bloom pruning: 8 to 8 cost: >] @@ -104,7 +104,7 @@ Filter ├── scan id: 0 ├── output columns: [id (#0), content (#1)] ├── read rows: 16 - ├── read size: 1.03 KiB + ├── read size: < 1 KiB ├── partitions total: 8 ├── partitions scanned: 8 ├── pruning stats: [segments: >, blocks: , bloom pruning: 8 to 8 cost: >] diff --git a/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test b/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test index 4c30b49fb5f4c..aa2340bbed87e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/selectivity/modulo.test @@ -23,7 +23,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 1000 - ├── read size: 1.40 KiB + ├── read size: 1.41 KiB ├── partitions total: 1 ├── partitions scanned: 1 ├── pruning stats: [segments: >, blocks: >] @@ -52,7 +52,7 @@ AggregateFinal ├── scan id: 0 ├── output columns: [number (#0)] ├── read rows: 1000 - ├── read size: 1.40 KiB + ├── read size: 1.41 KiB ├── partitions total: 1 ├── partitions scanned: 1 ├── pruning stats: [segments: >, blocks: >]