7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -1333,6 +1333,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("fuse_parquet_read_batch_size", DefaultSettingValue {
value: UserSettingValue::UInt64(8192),
desc: "The batch size while deserializing fuse table with parquet storage format",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(1..=1_000_000)),
}),
]);

Ok(Arc::new(DefaultSettings {
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -983,4 +983,8 @@ impl Settings {
pub fn get_max_aggregate_restore_worker(&self) -> Result<u64> {
self.try_get_u64("max_aggregate_restore_worker")
}

pub fn get_fuse_parquet_read_batch_size(&self) -> Result<usize> {
Ok(self.try_get_u64("fuse_parquet_read_batch_size")? as usize)
}
}
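For context (not part of the diff): a minimal, self-contained sketch of the semantics this pair of changes gives the new setting — `try_get_u64` falls back to the declared default of 8192 unless the user has overridden it, and the new getter surfaces the value as a `usize`. The types below are local stand-ins, not Databend's actual `Settings` implementation; downstream the value is passed as the `batch_size_hint: Option<usize>` argument of `deserialize_parquet_to_blocks` introduced later in this PR.

```rust
use std::collections::HashMap;

// Local stand-in for the settings store; only the behaviour relevant to
// `fuse_parquet_read_batch_size` is modelled here.
struct Settings {
    overrides: HashMap<String, u64>,
}

impl Settings {
    // Falls back to the declared default (8192) when no override is present.
    fn try_get_u64(&self, key: &str) -> u64 {
        let default = match key {
            "fuse_parquet_read_batch_size" => 8192,
            _ => unreachable!("only one setting is modelled in this sketch"),
        };
        self.overrides.get(key).copied().unwrap_or(default)
    }

    // Mirrors the new getter: expose the u64 setting as a usize batch size.
    fn get_fuse_parquet_read_batch_size(&self) -> usize {
        self.try_get_u64("fuse_parquet_read_batch_size") as usize
    }
}

fn main() {
    let mut settings = Settings { overrides: HashMap::new() };
    assert_eq!(settings.get_fuse_parquet_read_batch_size(), 8192);

    settings
        .overrides
        .insert("fuse_parquet_read_batch_size".to_string(), 4096);
    assert_eq!(settings.get_fuse_parquet_read_batch_size(), 4096);
}
```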
@@ -36,7 +36,7 @@ struct CacheItem {
value: Bytes,
}

#[derive(Clone)]
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct TableDataCacheKey {
cache_key: String,
}
@@ -97,7 +97,7 @@ impl AggIndexReader {
self.index_id
}

pub(super) fn apply_agg_info(&self, block: DataBlock) -> Result<DataBlock> {
pub(super) fn apply_agg_info_to_block(&self, block: DataBlock) -> Result<DataBlock> {
let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);

// 1. Filter the block if there is a filter.
@@ -145,4 +145,11 @@ impl AggIndexReader {
)),
))
}

pub(super) fn apply_agg_info(&self, block: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
block
.into_iter()
.map(|block| self.apply_agg_info_to_block(block))
.collect::<Result<_>>()
}
}
@@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::vec;

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
@@ -167,6 +168,6 @@ impl AggIndexReader {
blocks.push(block);
}
let block = DataBlock::concat(&blocks)?;
self.apply_agg_info(block)
self.apply_agg_info_to_block(block)
}
}
@@ -113,17 +113,19 @@ impl AggIndexReader {
&self,
part: PartInfoPtr,
data: BlockReadResult,
) -> Result<DataBlock> {
batch_size_hint: Option<usize>,
) -> Result<Vec<DataBlock>> {
let columns_chunks = data.columns_chunks()?;
let part = FuseBlockPartInfo::from_part(&part)?;
let block = self.reader.deserialize_parquet_chunks(
let blocks = self.reader.deserialize_parquet_to_blocks(
part.nums_rows,
&part.columns_meta,
columns_chunks,
&part.compression,
&part.location,
batch_size_hint,
)?;

self.apply_agg_info(block)
self.apply_agg_info(blocks)
}
}
20 changes: 14 additions & 6 deletions src/query/storages/fuse/src/io/read/block/parquet/deserialize.rs
@@ -19,6 +19,7 @@ use arrow_schema::Schema;
use databend_common_expression::ColumnId;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::Compression;
use itertools::Itertools;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::parquet_to_arrow_field_levels;
use parquet::arrow::ArrowSchemaConverter;
@@ -34,7 +35,8 @@ pub fn column_chunks_to_record_batch(
num_rows: usize,
column_chunks: &HashMap<ColumnId, DataItem>,
compression: &Compression,
) -> databend_common_exception::Result<RecordBatch> {
batch_size: Option<usize>,
) -> databend_common_exception::Result<Vec<RecordBatch>> {
let arrow_schema = Schema::from(original_schema);
let parquet_schema = ArrowSchemaConverter::new().convert(&arrow_schema)?;

@@ -66,13 +68,19 @@
ProjectionMask::leaves(&parquet_schema, projection_mask),
Some(arrow_schema.fields()),
)?;
let mut record_reader = ParquetRecordBatchReader::try_new_with_row_groups(

let batch_size = batch_size.unwrap_or(num_rows);
let record_reader = ParquetRecordBatchReader::try_new_with_row_groups(
&field_levels,
row_group.as_ref(),
num_rows,
batch_size,
None,
)?;
let record = record_reader.next().unwrap()?;
assert!(record_reader.next().is_none());
Ok(record)

let records: Vec<_> = record_reader.try_collect()?;
assert_eq!(
num_rows,
records.iter().map(|r| r.num_rows()).sum::<usize>()
);
Ok(records)
}
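A hedged, self-contained sketch of the batching contract introduced here (plain Rust, no parquet involved): with a hint, the reader is expected to yield ceil(num_rows / batch_size) record batches whose row counts sum back to `num_rows` — which is exactly what the `assert_eq!` above checks — and with no hint `batch_size` defaults to `num_rows`, so a single batch comes out. The helper below is illustrative only.

```rust
// Illustrative only: mirrors the `batch_size.unwrap_or(num_rows)` logic and the
// assertion above, without touching parquet. Splits `num_rows` the way the
// record-batch reader is expected to.
fn split_rows(num_rows: usize, batch_size_hint: Option<usize>) -> Vec<usize> {
    let batch_size = batch_size_hint.unwrap_or(num_rows);
    let mut sizes = Vec::new();
    let mut remaining = num_rows;
    while remaining > 0 {
        let n = remaining.min(batch_size);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}

fn main() {
    // No hint: the whole block is deserialized as a single record batch.
    assert_eq!(split_rows(10_000, None), vec![10_000]);

    // With a hint of 8192: ceil(10_000 / 8192) = 2 batches, rows still sum to 10_000.
    let sizes = split_rows(10_000, Some(8192));
    assert_eq!(sizes, vec![8192, 1808]);
    assert_eq!(sizes.iter().sum::<usize>(), 10_000);
}
```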
151 changes: 104 additions & 47 deletions src/query/storages/fuse/src/io/read/block/parquet/mod.rs
@@ -14,6 +14,7 @@

use std::collections::HashMap;

use arrow_array::Array;
use arrow_array::ArrayRef;
use arrow_array::RecordBatch;
use arrow_array::StructArray;
@@ -35,6 +36,7 @@ mod adapter;
mod deserialize;

pub use adapter::RowGroupImplBuilder;
use databend_common_exception::Result;
pub use deserialize::column_chunks_to_record_batch;

use crate::io::read::block::block_reader_merge_io::DataItem;
@@ -48,17 +50,41 @@ impl BlockReader {
column_chunks: HashMap<ColumnId, DataItem>,
compression: &Compression,
block_path: &str,
) -> databend_common_exception::Result<DataBlock> {
) -> Result<DataBlock> {
let mut blocks = self.deserialize_parquet_to_blocks(
num_rows,
column_metas,
column_chunks,
compression,
block_path,
None,
)?;
// Defensive check: with `num_rows` used as the batch size, exactly one block is expected.
assert_eq!(blocks.len(), 1);
Ok(blocks.pop().unwrap())
}

pub(crate) fn deserialize_parquet_to_blocks(
&self,
num_rows: usize,
column_metas: &HashMap<ColumnId, ColumnMeta>,
column_chunks: HashMap<ColumnId, DataItem>,
compression: &Compression,
block_path: &str,
batch_size_hint: Option<usize>,
) -> Result<Vec<DataBlock>> {
if column_chunks.is_empty() {
return self.build_default_values_block(num_rows);
return Ok(vec![self.build_default_values_block(num_rows)?]);
}
let record_batch = column_chunks_to_record_batch(

let record_batches = column_chunks_to_record_batch(
&self.original_schema,
num_rows,
&column_chunks,
compression,
batch_size_hint,
)?;
let mut columns = Vec::with_capacity(self.projected_schema.fields.len());

let name_paths = column_name_paths(&self.projection, &self.original_schema);

let array_cache = if self.put_cache {
@@ -67,58 +93,89 @@ impl BlockReader {
None
};

for ((i, field), column_node) in self
.projected_schema
.fields
.iter()
.enumerate()
.zip(self.project_column_nodes.iter())
{
let data_type = field.data_type().into();

// NOTE, there is something tricky here:
// - `column_chunks` always contains data of leaf columns
// - here we may processing a nested type field
// - But, even if the field being processed is a field with multiple leaf columns
// `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
// even if we are getting data from `column_chunks` using a non-leaf
// `column_id` of `projected_schema.fields`
//
// [^1]: Except in the current block, there is no data stored for the
// corresponding field, and a default value has been declared for
// the corresponding field.
//
// Yes, it is too obscure, we need to polish it later.

let value = match column_chunks.get(&field.column_id) {
Some(DataItem::RawData(data)) => {
// get the deserialized arrow array, which may be a nested array
let arrow_array = column_by_name(&record_batch, &name_paths[i]);
if !column_node.is_nested {
if let Some(cache) = &array_cache {
let mut blocks = Vec::with_capacity(record_batches.len());
let mut array_cache_buffer = HashMap::with_capacity(record_batches.len());

let mut offset = 0;
for record_batch in record_batches {
let num_rows_record_batch = record_batch.num_rows();
let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
for ((i, field), column_node) in self
.projected_schema
.fields
.iter()
.enumerate()
.zip(self.project_column_nodes.iter())
{
let data_type = field.data_type().into();

// NOTE, there is something tricky here:
// - `column_chunks` always contains data of leaf columns
// - here we may be processing a nested type field
// - But, even if the field being processed has multiple leaf columns,
// `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
// even though we are getting data from `column_chunks` using a non-leaf
// `column_id` of `projected_schema.fields`
//
// [^1]: Except when the current block stores no data for the
// corresponding field and a default value has been declared for it.
//
// It is too confusing, we need to polish it SOON.

let value = match column_chunks.get(&field.column_id) {
Some(DataItem::RawData(data)) => {
// get the deserialized arrow array, which may be a nested array
let arrow_array = column_by_name(&record_batch, &name_paths[i]);
if !column_node.is_nested && array_cache.is_some() {
let meta = column_metas.get(&field.column_id).unwrap();
let (offset, len) = meta.offset_length();
let key =
TableDataCacheKey::new(block_path, field.column_id, offset, len);
cache.insert(key.into(), (arrow_array.clone(), data.len()));
array_cache_buffer
.entry(key)
.and_modify(|v: &mut Vec<_>| {
v.push((arrow_array.clone(), data.len()))
})
.or_insert(vec![(arrow_array.clone(), data.len())]);
}
Value::from_arrow_rs(arrow_array, &data_type)?
}
Value::from_arrow_rs(arrow_array, &data_type)?
}
Some(DataItem::ColumnArray(cached)) => {
if column_node.is_nested {
// a defensive check, should never happen
return Err(ErrorCode::StorageOther(
"unexpected nested field: nested leaf field hits cached",
));
Some(DataItem::ColumnArray(cached)) => {
if column_node.is_nested {
// a defensive check, should never happen
return Err(ErrorCode::StorageOther(
"unexpected nested field: nested leaf field hits cached",
));
}
let array = cached.0.slice(offset, record_batch.num_rows());
Value::from_arrow_rs(array, &data_type)?
}
Value::from_arrow_rs(cached.0.clone(), &data_type)?
None => Value::Scalar(self.default_vals[i].clone()),
};
columns.push(BlockEntry::new(data_type, value));
}

offset += record_batch.num_rows();
blocks.push(DataBlock::new(columns, num_rows_record_batch));
}

// Populate the array cache. A column may have been deserialized into several
// arrays (one per record batch), so concatenate the buffered per-batch arrays
// back into a single column array before inserting it into the cache.
if let Some(array_cache) = &array_cache {
for (key, items) in array_cache_buffer {
let mut arrays = Vec::with_capacity(items.len());
let mut len = 0;
for (array, size) in &items {
arrays.push(array.as_ref());
len += size;
}
None => Value::Scalar(self.default_vals[i].clone()),
};
columns.push(BlockEntry::new(data_type, value));
use arrow::compute::concat;
let result = concat(&arrays)?;
array_cache.insert(key.into(), (result, len));
}
}
Ok(DataBlock::new(columns, num_rows))

Ok(blocks)
}
}
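To illustrate the caching scheme above in isolation: reads served from the cache slice the full column array per batch (`cached.0.slice(offset, ...)`), while cache population concatenates the per-batch arrays back into one array. Below is a minimal arrow-rs round trip, assuming only the `arrow` crate already used by this file, showing that slice followed by concat reconstructs the original column.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::concat;

fn main() {
    // A "full column" as it would sit in the table data cache.
    let full: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));

    // Per-batch slices, as produced while deserializing with a batch size of 2.
    let parts = vec![full.slice(0, 2), full.slice(2, 2), full.slice(4, 1)];

    // Concatenate the slices back into a single array before caching,
    // as the loop above does with the buffered (array, size) pairs.
    let refs: Vec<&dyn Array> = parts.iter().map(|a| a.as_ref()).collect();
    let rebuilt = concat(&refs).unwrap();

    // The rebuilt column is logically identical to the original.
    assert_eq!(rebuilt.to_data(), full.to_data());
}
```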
