Skip to content

Commit

Permalink
chore: add some comments and minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Jan 30, 2024
1 parent 5770958 commit 7da4b26
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/query/storages/common/blocks/src/parquet_rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn blocks_to_parquet(
assert!(!blocks.is_empty());
let props = WriterProperties::builder()
.set_compression(compression.into())
// use `usize::MAX` to effectively limit the number of row groups to 1
.set_max_row_group_size(usize::MAX)
.set_encoding(Encoding::PLAIN)
.set_dictionary_enabled(false)
Expand Down
43 changes: 31 additions & 12 deletions src/query/storages/fuse/src/io/read/block/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ impl BlockReader {
)?;
let mut columns = Vec::with_capacity(self.projected_schema.fields.len());
let name_paths = column_name_paths(&self.projection, &self.original_schema);

let array_cache = if self.put_cache {
CacheManager::instance().get_table_data_array_cache()
} else {
None
};

for ((i, field), column_node) in self
.projected_schema
.fields
Expand All @@ -67,23 +74,29 @@ impl BlockReader {
.zip(self.project_column_nodes.iter())
{
let data_type = field.data_type().into();
if column_node.is_nested
&& column_node
.leaf_column_ids
.iter()
.any(|id| matches!(column_chunks.get(id), Some(DataItem::ColumnArray(_))))
{
return Err(ErrorCode::StorageOther(
"unexpected nested field: nested leaf field hits cached",
));
}

// NOTE, there is something tricky here:
// - `column_chunks` always contains data of leaf columns
// - here we may processing a nested type field
// - But, even if the field being processed is a filed with multiple leaf columns
// `column_chunks.get(&field.column_id)` will still return Some(DataItem::_)[^1],
// even if we are getting data from `column_chunks` using a non-leaf
// `column_id` of `projected_schema.fields`
//
// [^1]: Except in the current block, there is no data stored for the
// corresponding field, and a default value has been declared for
// the corresponding field.
//
// Yes, it is too obscure, we need to polish it later.

let value = match column_chunks.get(&field.column_id) {
Some(DataItem::RawData(data)) => {
// get the deserialized arrow array, which may be a nested array
let arrow_array = column_by_name(&record_batch, &name_paths[i]);
let arrow2_array: Box<dyn databend_common_arrow::arrow::array::Array> =
arrow_array.into();
if self.put_cache && !column_node.is_nested {
if let Some(cache) = CacheManager::instance().get_table_data_array_cache() {
if !column_node.is_nested {
if let Some(cache) = &array_cache {
let meta = column_metas.get(&field.column_id).unwrap();
let (offset, len) = meta.offset_length();
let key =
Expand All @@ -94,6 +107,12 @@ impl BlockReader {
Value::Column(Column::from_arrow(arrow2_array.as_ref(), &data_type)?)
}
Some(DataItem::ColumnArray(cached)) => {
if column_node.is_nested {
// a defensive check, should never happen
return Err(ErrorCode::StorageOther(
"unexpected nested field: nested leaf field hits cached",
));
}
Value::Column(Column::from_arrow(cached.0.as_ref(), &data_type)?)
}
None => Value::Scalar(self.default_vals[i].clone()),
Expand Down

0 comments on commit 7da4b26

Please sign in to comment.