1 change: 1 addition & 0 deletions src/query/expression/src/converts/arrow/mod.rs
@@ -14,6 +14,7 @@

mod from;
mod to;
pub use self::to::table_schema_arrow_leaf_paths;

pub const EXTENSION_KEY: &str = "Extension";
pub const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray";
84 changes: 82 additions & 2 deletions src/query/expression/src/converts/arrow/to.rs
@@ -41,6 +41,7 @@ use super::ARROW_EXT_TYPE_VARIANT;
use super::ARROW_EXT_TYPE_VECTOR;
use super::EXTENSION_KEY;
use crate::infer_table_schema;
use crate::schema::is_internal_column_id;
use crate::types::DataType;
use crate::types::DecimalColumn;
use crate::types::DecimalDataType;
@@ -50,6 +51,7 @@ use crate::types::VectorColumn;
use crate::types::VectorDataType;
use crate::with_number_type;
use crate::Column;
use crate::ColumnId;
use crate::DataBlock;
use crate::DataField;
use crate::DataSchema;
@@ -82,6 +84,57 @@ impl From<&TableSchema> for Schema {
}
}

pub fn table_schema_arrow_leaf_paths(table_schema: &TableSchema) -> Vec<(ColumnId, Vec<String>)> {
let mut arrow_paths = Vec::new();
for field in table_schema.fields() {
if is_internal_column_id(field.column_id()) {
continue;
}

let arrow_field = Field::from(field);
let mut current_path = vec![arrow_field.name().clone()];
collect_arrow_leaf_paths(&arrow_field, &mut current_path, &mut arrow_paths);
}

let leaf_fields = table_schema.leaf_fields();
debug_assert_eq!(leaf_fields.len(), arrow_paths.len());

leaf_fields
.into_iter()
.zip(arrow_paths)
.map(|(field, path)| (field.column_id(), path))
.collect()
}

fn collect_arrow_leaf_paths(
arrow_field: &Field,
current_path: &mut Vec<String>,
paths: &mut Vec<Vec<String>>,
) {
match arrow_field.data_type() {
ArrowDataType::Struct(children) => {
for child in children {
current_path.push(child.name().clone());
collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
current_path.pop();
}
}
ArrowDataType::Map(child, _) => {
current_path.push(child.name().clone());
collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
current_path.pop();
}
ArrowDataType::LargeList(child)
| ArrowDataType::List(child)
| ArrowDataType::FixedSizeList(child, _) => {
current_path.push(child.name().clone());
collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
current_path.pop();
}
_ => paths.push(current_path.clone()),
}
}

impl From<&DataType> for ArrowDataType {
fn from(ty: &DataType) -> Self {
let fields = DataField::new("dummy", ty.clone());
@@ -93,7 +146,6 @@ impl From<&DataType> for ArrowDataType {
impl From<&TableField> for Field {
fn from(f: &TableField) -> Self {
let mut metadata = HashMap::new();

let ty = match &f.data_type {
TableDataType::Null => ArrowDataType::Null,
TableDataType::EmptyArray => {
@@ -264,6 +316,30 @@ impl DataBlock {
self.to_record_batch(&table_schema)
}

pub fn to_record_batch_with_arrow_schema(
self,
arrow_schema: &Arc<arrow_schema::Schema>,
) -> Result<RecordBatch> {
let num_fields = arrow_schema.fields.len();
if self.columns().len() != num_fields {
return Err(ErrorCode::Internal(format!(
"The number of columns in the data block does not match the number of fields in the table schema, block_columns: {}, table_schema_fields: {}",
self.columns().len(),
num_fields,
)));
}

if num_fields == 0 {
return Ok(RecordBatch::try_new_with_options(
Arc::new(Schema::empty()),
vec![],
&RecordBatchOptions::default().with_row_count(Some(self.num_rows())),
)?);
}

self.build_record_batch(arrow_schema.clone())
}

pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> {
if self.columns().len() != table_schema.num_fields() {
return Err(ErrorCode::Internal(format!(
@@ -282,14 +358,18 @@
}

let arrow_schema = Schema::from(table_schema);
self.build_record_batch(Arc::new(arrow_schema))
}

fn build_record_batch(self, arrow_schema: Arc<arrow_schema::Schema>) -> Result<RecordBatch> {
let mut arrays = Vec::with_capacity(self.columns().len());
for (entry, arrow_field) in self.take_columns().into_iter().zip(arrow_schema.fields()) {
let array = entry.to_column().maybe_gc().into_arrow_rs();

// Adjust struct array names
arrays.push(Self::adjust_nested_array(array, arrow_field.as_ref()));
}
Ok(RecordBatch::try_new(Arc::new(arrow_schema), arrays)?)
Ok(RecordBatch::try_new(arrow_schema, arrays)?)
}

fn adjust_nested_array(array: Arc<dyn Array>, arrow_field: &Field) -> Arc<dyn Array> {
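To show how the two new helpers in to.rs are meant to be used together, here is a minimal caller-side sketch (not part of the patch): it assumes a `TableSchema` and some `DataBlock`s are already in scope, builds the Arrow schema once, and reuses it for every block instead of re-deriving it per call. The `arrow_array::RecordBatch` import path is an assumption; the other imports mirror the ones visible in this pull request.

use std::sync::Arc;

use arrow_array::RecordBatch; // assumed import path for the batch type
use databend_common_exception::Result;
use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;

fn blocks_to_batches(schema: &TableSchema, blocks: Vec<DataBlock>) -> Result<Vec<RecordBatch>> {
    // Build the Arrow schema once and share it across all blocks.
    let arrow_schema = Arc::new(arrow_schema::Schema::from(schema));
    // Each leaf column id paired with its Arrow path, e.g. ["nested", "leaf"].
    let _leaf_paths = table_schema_arrow_leaf_paths(schema);
    blocks
        .into_iter()
        .map(|block| block.to_record_batch_with_arrow_schema(&arrow_schema))
        .collect()
}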
211 changes: 200 additions & 11 deletions src/query/storages/common/blocks/src/parquet_rs.rs
@@ -15,17 +15,23 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
use databend_common_expression::ColumnId;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
use databend_storages_common_table_meta::table::TableCompression;
use parquet::arrow::ArrowWriter;
use parquet::basic::Encoding;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::EnabledStatistics;
use parquet::file::properties::WriterProperties;
use parquet::file::properties::WriterPropertiesBuilder;
use parquet::file::properties::WriterVersion;
use parquet::format::FileMetaData;
use parquet::schema::types::ColumnPath;

/// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;

/// Serialize data blocks to parquet format.
pub fn blocks_to_parquet(
@@ -35,16 +41,58 @@ pub fn blocks_to_parquet(
compression: TableCompression,
enable_dictionary: bool,
metadata: Option<Vec<KeyValue>>,
) -> Result<FileMetaData> {
blocks_to_parquet_with_stats(
table_schema,
blocks,
write_buffer,
compression,
enable_dictionary,
metadata,
None,
)
}

/// Serialize blocks while optionally tuning dictionary behavior via NDV statistics.
///
/// * `table_schema` - Logical schema used to build Arrow batches.
/// * `blocks` - In-memory blocks that will be serialized into a single Parquet file.
/// * `write_buffer` - Destination buffer that receives the serialized Parquet bytes.
/// * `compression` - Compression algorithm specified by table-level settings.
/// * `enable_dictionary` - Enables dictionary encoding globally before per-column overrides.
/// * `metadata` - Additional user metadata embedded into the Parquet footer.
/// * `column_stats` - Optional NDV stats from the first block, used to configure writer properties
///   before the `ArrowWriter` is created, after which they can no longer be changed.
pub fn blocks_to_parquet_with_stats(
table_schema: &TableSchema,
blocks: Vec<DataBlock>,
write_buffer: &mut Vec<u8>,
compression: TableCompression,
enable_dictionary: bool,
metadata: Option<Vec<KeyValue>>,
column_stats: Option<&StatisticsOfColumns>,
) -> Result<FileMetaData> {
assert!(!blocks.is_empty());
let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata);

let props = builder.build();
// Writer properties cannot be tweaked after ArrowWriter creation, so we mirror the behavior of
// the streaming writer and only rely on the first block's NDV (and row count) snapshot.
let num_rows = blocks[0].num_rows();
let arrow_schema = Arc::new(table_schema.into());

let props = build_parquet_writer_properties(
compression,
enable_dictionary,
column_stats,
metadata,
num_rows,
table_schema,
);

let batches = blocks
.into_iter()
.map(|block| block.to_record_batch(table_schema))
.map(|block| block.to_record_batch_with_arrow_schema(&arrow_schema))
.collect::<Result<Vec<_>>>()?;
let arrow_schema = Arc::new(table_schema.into());

let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?;
for batch in batches {
writer.write(&batch)?;
@@ -53,12 +101,16 @@
Ok(file_meta)
}
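
A caller-side sketch of the new entry point (hypothetical function name and bindings, not taken from the patch): the NDV snapshot gathered for the first block is passed in so the writer properties can be tuned before the `ArrowWriter` is created.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
use databend_storages_common_table_meta::table::TableCompression;

// Hypothetical caller; `blocks_to_parquet_with_stats` is assumed to be in scope
// (it is defined in this crate's parquet_rs module).
fn write_blocks_to_parquet(
    schema: &TableSchema,
    blocks: Vec<DataBlock>,
    first_block_stats: &StatisticsOfColumns,
) -> Result<Vec<u8>> {
    let mut buffer = Vec::new();
    blocks_to_parquet_with_stats(
        schema,
        blocks,
        &mut buffer,
        TableCompression::Zstd,
        true,                     // dictionary encoding enabled globally ...
        None,                     // no extra footer key-value metadata
        Some(first_block_stats),  // ... high-cardinality leaves opt out via NDV stats
    )?;
    Ok(buffer)
}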

pub fn parquet_writer_properties_builder(
/// Create writer properties, optionally disabling dictionaries for high-cardinality columns.
pub fn build_parquet_writer_properties(
compression: TableCompression,
enable_dictionary: bool,
cols_stats: Option<impl NdvProvider>,
metadata: Option<Vec<KeyValue>>,
) -> WriterPropertiesBuilder {
let builder = WriterProperties::builder()
num_rows: usize,
table_schema: &TableSchema,
) -> WriterProperties {
let mut builder = WriterProperties::builder()
.set_compression(compression.into())
// use `usize::MAX` to effectively limit the number of row groups to 1
.set_max_row_group_size(usize::MAX)
@@ -68,10 +120,147 @@
.set_key_value_metadata(metadata);

if enable_dictionary {
builder
// Enable dictionary for all columns
builder = builder
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_dictionary_enabled(true)
.set_dictionary_enabled(true);
if let Some(cols_stats) = cols_stats {
// Disable dictionary encoding for columns with high cardinality
for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) {
if let Some(ndv) = cols_stats.column_ndv(&column_id) {
if num_rows > 0
&& (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
{
builder = builder
.set_column_dictionary_enabled(ColumnPath::from(components), false);
}
}
}
}
builder.build()
} else {
builder.set_dictionary_enabled(false)
builder.set_dictionary_enabled(false).build()
}
}

/// Provides per-column NDV statistics.
pub trait NdvProvider {
fn column_ndv(&self, column_id: &ColumnId) -> Option<u64>;
}

impl NdvProvider for &StatisticsOfColumns {
fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
self.get(column_id).and_then(|item| item.distinct_of_values)
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use databend_common_expression::types::number::NumberDataType;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;

use super::*;

struct TestNdvProvider {
ndv: HashMap<ColumnId, u64>,
}

impl NdvProvider for TestNdvProvider {
fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
self.ndv.get(column_id).copied()
}
}

fn sample_schema() -> TableSchema {
TableSchema::new(vec![
TableField::new("simple", TableDataType::Number(NumberDataType::Int32)),
TableField::new("nested", TableDataType::Tuple {
fields_name: vec!["leaf".to_string(), "arr".to_string()],
fields_type: vec![
TableDataType::Number(NumberDataType::Int64),
TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::UInt64))),
],
}),
TableField::new("no_stats", TableDataType::String),
])
}

fn column_id(schema: &TableSchema, name: &str) -> ColumnId {
schema
.leaf_fields()
.into_iter()
.find(|field| field.name() == name)
.unwrap_or_else(|| panic!("missing field {}", name))
.column_id()
}

#[test]
fn test_build_parquet_writer_properties_handles_nested_leaves() {
let schema = sample_schema();

let mut ndv = HashMap::new();
ndv.insert(column_id(&schema, "simple"), 500);
ndv.insert(column_id(&schema, "nested:leaf"), 50);
ndv.insert(column_id(&schema, "nested:arr:0"), 400);

let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(&schema)
.into_iter()
.map(|(id, path)| (id, ColumnPath::from(path)))
.collect();

let props = build_parquet_writer_properties(
TableCompression::Zstd,
true,
Some(TestNdvProvider { ndv }),
None,
1000,
&schema,
);

assert!(
!props.dictionary_enabled(&column_paths[&column_id(&schema, "simple")]),
"high cardinality top-level column should disable dictionary"
);
assert!(
props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:leaf")]),
"low cardinality nested column should keep dictionary"
);
assert!(
!props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:arr:0")]),
"high cardinality nested array element should disable dictionary"
);
assert!(
props.dictionary_enabled(&column_paths[&column_id(&schema, "no_stats")]),
"columns without NDV stats keep the default dictionary behavior"
);
}

#[test]
fn test_build_parquet_writer_properties_disabled_globally() {
let schema = sample_schema();

let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(&schema)
.into_iter()
.map(|(id, path)| (id, ColumnPath::from(path)))
.collect();

let props = build_parquet_writer_properties(
TableCompression::Zstd,
false,
None::<TestNdvProvider>,
None,
1000,
&schema,
);

for field in schema.leaf_fields() {
assert!(
!props.dictionary_enabled(&column_paths[&field.column_id()]),
"dictionary must remain disabled when enable_dictionary is false",
);
}
}
}
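
For intuition about HIGH_CARDINALITY_RATIO_THRESHOLD, here is a standalone sketch of the same ratio test the builder applies to each leaf column (the helper name is hypothetical; the constant and the comparison mirror the code above):

/// Hypothetical standalone helper mirroring the check inside `build_parquet_writer_properties`:
/// a leaf column skips dictionary encoding once ndv / num_rows exceeds 0.1.
fn should_disable_dictionary(ndv: u64, num_rows: usize) -> bool {
    const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
    num_rows > 0 && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
}

fn main() {
    // With 1000 rows: 500 distinct values (ratio 0.5) disables the dictionary,
    // while 50 distinct values (ratio 0.05) keeps it, matching the tests above.
    assert!(should_disable_dictionary(500, 1000));
    assert!(!should_disable_dictionary(50, 1000));
}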