feat: add parquet page filter (#664)
## Rationale
Part of #589

## Detailed Changes
- Introduce `PagePruningPredicate` when building `ParquetRecordBatchStream` (sketched below)
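
In outline: the reader conjoins the pushed-down predicate expressions, lowers them to a DataFusion physical expression, and asks `PagePruningPredicate` to convert parquet page statistics into a `RowSelection` that skips pages no matching row can live in. A condensed sketch of that flow (module paths and signatures follow the forked DataFusion revision pinned in `Cargo.toml` below; the function name and simplified error plumbing are illustrative, not part of this commit):

```rust
use arrow::datatypes::SchemaRef;
use datafusion::{
    common::ToDFSchema,
    error::Result,
    logical_expr::Expr,
    optimizer::utils::conjunction,
    physical_expr::{create_physical_expr, execution_props::ExecutionProps},
    physical_plan::{
        file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
        metrics::ExecutionPlanMetricsSet,
    },
};
use parquet::{arrow::arrow_reader::RowSelection, file::metadata::ParquetMetaData};

/// Illustrative only: mirrors `Reader::build_row_selection` in the diff,
/// with the engine's error wrapping reduced to DataFusion's `Result`.
fn page_level_selection(
    predicate_exprs: Vec<Expr>,
    schema: SchemaRef,
    row_groups: &[usize],
    metadata: &ParquetMetaData,
) -> Result<Option<RowSelection>> {
    // No predicate means nothing can be pruned at page granularity.
    let Some(expr) = conjunction(predicate_exprs) else {
        return Ok(None);
    };
    // Lower the logical expression to a physical one, then build the
    // page-level pruning predicate over the file's schema.
    let df_schema = schema.clone().to_dfschema()?;
    let physical = create_physical_expr(&expr, &df_schema, &schema, &ExecutionProps::new())?;
    let pruner = PagePruningPredicate::try_new(&physical, schema)?;
    // Metrics registry; the real reader keeps one per `Reader` and logs it on drop.
    let metrics_set = ExecutionPlanMetricsSet::new();
    let metrics = ParquetFileMetrics::new(0, "example.sst", &metrics_set);
    pruner.prune(row_groups, metadata, &metrics)
}
```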

## Test Plan
jiacai2050 authored Jun 5, 2023
1 parent 96fc8ab commit d5593b7
Showing 5 changed files with 95 additions and 31 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
 target
 .DS_Store
 .idea/
-.vscode
+.vscode
+.dir-locals.el
38 changes: 19 additions & 19 deletions Cargo.lock

(Generated file; diff not rendered.)

12 changes: 6 additions & 6 deletions Cargo.toml
@@ -76,8 +76,8 @@ cluster = { path = "cluster" }
 criterion = "0.3"
 common_types = { path = "common_types" }
 common_util = { path = "common_util" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" }
+datafusion = { git = "https://github.com/jiacai2050/arrow-datafusion.git", rev = "13314c37020b90246db9b80f8294370c06e61018" }
+datafusion-proto = { git = "https://github.com/jiacai2050/arrow-datafusion.git", rev = "13314c37020b90246db9b80f8294370c06e61018" }
 df_operator = { path = "df_operator" }
 etcd-client = "0.10.3"
 env_logger = "0.6"
@@ -89,10 +89,10 @@ lazy_static = "1.4.0"
 log = "0.4"
 logger = { path = "components/logger" }
 lru = "0.7.6"
-influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", package = "iox_query_influxql" }
-influxql-parser = { git = "https://github.com/CeresDB/influxql", package = "influxdb_influxql_parser" }
-influxql-query = { git = "https://github.com/CeresDB/influxql", package = "iox_query" }
-influxql-schema = { git = "https://github.com/CeresDB/influxql", package = "schema" }
+influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "iox_query_influxql" }
+influxql-parser = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "influxdb_influxql_parser" }
+influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "iox_query" }
+influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "schema" }
 interpreters = { path = "interpreters" }
 itertools = "0.10.5"
 meta_client = { path = "meta_client" }
70 changes: 65 additions & 5 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -22,11 +22,22 @@ use common_util::{
     runtime::{AbortOnDropMany, JoinHandle, Runtime},
     time::InstantExt,
 };
+use datafusion::{
+    common::ToDFSchema,
+    physical_expr::{create_physical_expr, execution_props::ExecutionProps},
+    physical_plan::{
+        file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics},
+        metrics::ExecutionPlanMetricsSet,
+    },
+};
 use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask},
+    arrow::{
+        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
+        ProjectionMask,
+    },
     file::metadata::RowGroupMetaData,
 };
 use parquet_ext::meta_data::ChunkReader;
@@ -71,6 +82,7 @@ pub struct Reader<'a> {
 
     /// Options for `read_parallelly`
     metrics: Metrics,
+    df_plan_metrics: ExecutionPlanMetricsSet,
 }
 
 #[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
@@ -94,7 +106,7 @@ impl<'a> Reader<'a> {
         metrics_collector: Option<MetricsCollector>,
     ) -> Self {
         let store = store_picker.pick_by_freq(options.frequency);
-
+        let df_plan_metrics = ExecutionPlanMetricsSet::new();
         let metrics = Metrics {
             metrics_collector,
             ..Default::default()
@@ -112,6 +124,7 @@ impl<'a> Reader<'a> {
             meta_data: None,
             row_projector: None,
             metrics,
+            df_plan_metrics,
         }
     }

@@ -182,6 +195,36 @@ impl<'a> Reader<'a> {
         suggested.min(num_row_groups).max(1)
     }
 
+    fn build_row_selection(
+        &self,
+        arrow_schema: SchemaRef,
+        row_groups: &[usize],
+        file_metadata: &parquet_ext::ParquetMetaData,
+    ) -> Result<Option<RowSelection>> {
+        // TODO: remove fixed partition
+        let partition = 0;
+        let exprs = datafusion::optimizer::utils::conjunction(self.predicate.exprs().to_vec());
+        let exprs = match exprs {
+            Some(exprs) => exprs,
+            None => return Ok(None),
+        };
+
+        let df_schema = arrow_schema
+            .clone()
+            .to_dfschema()
+            .context(DataFusionError)?;
+        let physical_expr =
+            create_physical_expr(&exprs, &df_schema, &arrow_schema, &ExecutionProps::new())
+                .context(DataFusionError)?;
+        let page_predicate = PagePruningPredicate::try_new(&physical_expr, arrow_schema.clone())
+            .context(DataFusionError)?;
+
+        let metrics = ParquetFileMetrics::new(partition, self.path.as_ref(), &self.df_plan_metrics);
+        page_predicate
+            .prune(row_groups, file_metadata, &metrics)
+            .context(DataFusionError)
+    }
+
     async fn fetch_record_batch_streams(
         &mut self,
         suggested_parallelism: usize,
@@ -190,10 +233,10 @@
 
         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
-
+        let arrow_schema = meta_data.custom().schema.to_arrow_schema_ref();
         // Get target row groups.
         let target_row_groups = self.prune_row_groups(
-            meta_data.custom().schema.to_arrow_schema_ref(),
+            arrow_schema.clone(),
             meta_data.parquet().row_groups(),
             meta_data.custom().parquet_filter.as_ref(),
         )?;
@@ -226,6 +269,7 @@
             target_row_group_chunks[chunk_idx].push(row_group);
         }
 
+        let parquet_metadata = meta_data.parquet();
         let proj_mask = ProjectionMask::leaves(
             meta_data.parquet().file_metadata().schema_descr(),
             row_projector.existed_source_projection().iter().copied(),
@@ -239,9 +283,15 @@
         for chunk in target_row_group_chunks {
             let object_store_reader =
                 ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
-            let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
+            let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
+            let row_selection =
+                self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
+            if let Some(selection) = row_selection {
+                builder = builder.with_row_selection(selection);
+            };
+
             let stream = builder
                 .with_batch_size(self.num_rows_per_row_group)
                 .with_row_groups(chunk)
@@ -353,6 +403,16 @@ impl<'a> Reader<'a> {
     }
 }
 
+impl<'a> Drop for Reader<'a> {
+    fn drop(&mut self) {
+        debug!(
+            "Parquet reader dropped, path:{:?}, df_plan_metrics:{}",
+            self.path,
+            self.df_plan_metrics.clone_inner().to_string()
+        );
+    }
+}
+
 #[derive(Clone)]
 struct ObjectStoreReader {
     storage: ObjectStoreRef,
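For intuition, the `RowSelection` passed to `with_row_selection` above is a run-length encoding of keep/skip ranges over the rows of the selected row groups. A minimal sketch of applying one by hand (the selector counts and generic reader are hypothetical, not taken from this commit):

```rust
use parquet::arrow::{
    arrow_reader::{RowSelection, RowSelector},
    async_reader::AsyncFileReader,
    ParquetRecordBatchStreamBuilder,
};

/// Hypothetical shape of a pruned scan: keep the first 100 rows, skip the
/// next 900 (their pages were ruled out by page statistics), keep 50 more.
async fn open_with_selection<R>(reader: R) -> parquet::errors::Result<()>
where
    R: AsyncFileReader + Unpin + Send + 'static,
{
    let selection = RowSelection::from(vec![
        RowSelector::select(100),
        RowSelector::skip(900),
        RowSelector::select(50),
    ]);
    // Same call shape as in fetch_record_batch_streams above.
    let _stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .with_row_selection(selection)
        .build()?;
    Ok(())
}
```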
3 changes: 3 additions & 0 deletions analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -51,6 +51,9 @@ pub struct RowGroupPruner<'a> {
 }
 
 impl<'a> RowGroupPruner<'a> {
+    // TODO: DataFusion has already changed predicates to PhysicalExpr; we should
+    // keep up with upstream.
+    // https://github.com/apache/arrow-datafusion/issues/4695
     pub fn try_new(
         schema: &'a SchemaRef,
         row_groups: &'a [RowGroupMetaData],
