65 changes: 53 additions & 12 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -161,30 +161,71 @@ fn nanos_from_timestamp(ts: &Timestamp) -> i64 {
 }
 
 // Test different detail level for config `datafusion.explain.analyze_level`
+
+async fn collect_plan_with_context(
+    sql_str: &str,
+    ctx: &SessionContext,
+    level: ExplainAnalyzeLevel,
+) -> String {
+    {
+        let state = ctx.state_ref();
+        let mut state = state.write();
+        state.config_mut().options_mut().explain.analyze_level = level;
+    }
+    let dataframe = ctx.sql(sql_str).await.unwrap();
+    let batches = dataframe.collect().await.unwrap();
+    arrow::util::pretty::pretty_format_batches(&batches)
+        .unwrap()
+        .to_string()
+}
+
+async fn collect_plan(sql_str: &str, level: ExplainAnalyzeLevel) -> String {
+    let ctx = SessionContext::new();
+    collect_plan_with_context(sql_str, &ctx, level).await
+}
+
 #[tokio::test]
 async fn explain_analyze_level() {
-    async fn collect_plan(level: ExplainAnalyzeLevel) -> String {
-        let mut config = SessionConfig::new();
-        config.options_mut().explain.analyze_level = level;
-        let ctx = SessionContext::new_with_config(config);
-        let sql = "EXPLAIN ANALYZE \
+    let sql = "EXPLAIN ANALYZE \
         SELECT * \
         FROM generate_series(10) as t1(v1) \
         ORDER BY v1 DESC";
-        let dataframe = ctx.sql(sql).await.unwrap();
-        let batches = dataframe.collect().await.unwrap();
-        arrow::util::pretty::pretty_format_batches(&batches)
-            .unwrap()
-            .to_string()
-    }
 
     for (level, needle, should_contain) in [
         (ExplainAnalyzeLevel::Summary, "spill_count", false),
         (ExplainAnalyzeLevel::Summary, "output_rows", true),
         (ExplainAnalyzeLevel::Dev, "spill_count", true),
         (ExplainAnalyzeLevel::Dev, "output_rows", true),
     ] {
-        let plan = collect_plan(level).await;
+        let plan = collect_plan(sql, level).await;
         assert_eq!(
             plan.contains(needle),
             should_contain,
             "plan for level {level:?} unexpected content: {plan}"
         );
     }
 }
+
+#[tokio::test]
+async fn explain_analyze_level_datasource_parquet() {
+    let table_name = "tpch_lineitem_small";
+    let parquet_path = "tests/data/tpch_lineitem_small.parquet";
+    let sql = format!("EXPLAIN ANALYZE SELECT * FROM {table_name}");
+
+    // Register test parquet file into context
+    let ctx = SessionContext::new();
+    ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
+        .await
+        .expect("register parquet table for explain analyze test");
+
+    for (level, needle, should_contain) in [
+        (ExplainAnalyzeLevel::Summary, "metadata_load_time", true),
+        (ExplainAnalyzeLevel::Summary, "page_index_eval_time", false),
+        (ExplainAnalyzeLevel::Dev, "metadata_load_time", true),
+        (ExplainAnalyzeLevel::Dev, "page_index_eval_time", true),
+    ] {
+        let plan = collect_plan_with_context(&sql, &ctx, level).await;
+
+        assert_eq!(
+            plan.contains(needle),
+            should_contain,
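Aside: the refactored helpers flip `datafusion.explain.analyze_level` on a live `SessionContext` by writing through `state_ref()`, which is what lets the new parquet test above reuse one registered context across levels. Outside of tests, the option can also be fixed up front via `SessionConfig` (the old test's approach) or changed per session with a SQL `SET` statement. A minimal sketch, assuming `ExplainAnalyzeLevel` is re-exported under `datafusion::config` and that the option accepts the lowercase variant names 'summary' and 'dev':

use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::config::ExplainAnalyzeLevel; // assumed export path
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Bake the level into the config before building the context
    // (what the old test did with SessionContext::new_with_config).
    let mut config = SessionConfig::new();
    config.options_mut().explain.analyze_level = ExplainAnalyzeLevel::Dev;
    let ctx = SessionContext::new_with_config(config);

    // Alternative: change it later via SQL (assumes the option parses 'summary'/'dev').
    let _ = ctx.sql("SET datafusion.explain.analyze_level = 'summary'").await?;

    let batches = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM generate_series(10) AS t1(v1) ORDER BY v1 DESC")
        .await?
        .collect()
        .await?;
    println!("{}", pretty_format_batches(&batches)?);
    Ok(())
}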
53 changes: 34 additions & 19 deletions datafusion/datasource-parquet/src/metrics.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use datafusion_physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
+    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
 };
 
 /// Stores metrics about the parquet execution for a particular parquet file.
@@ -88,30 +88,59 @@ impl ParquetFileMetrics {
         filename: &str,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Self {
-        let predicate_evaluation_errors = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("predicate_evaluation_errors", partition);
-
+        // -----------------------
+        // 'summary' level metrics
+        // -----------------------
         let row_groups_matched_bloom_filter = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_matched_bloom_filter", partition);
 
         let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_pruned_bloom_filter", partition);
 
         let row_groups_matched_statistics = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_matched_statistics", partition);
 
         let row_groups_pruned_statistics = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_pruned_statistics", partition);
 
+        let page_index_rows_pruned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .counter("page_index_rows_pruned", partition);
+        let page_index_rows_matched = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .counter("page_index_rows_matched", partition);
+
         let bytes_scanned = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("bytes_scanned", partition);
 
+        let metadata_load_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .subset_time("metadata_load_time", partition);
+
+        let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
+            .with_type(MetricType::SUMMARY)
+            .counter("files_ranges_pruned_statistics", partition);
+
+        // -----------------------
+        // 'dev' level metrics
+        // -----------------------
+        let predicate_evaluation_errors = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("predicate_evaluation_errors", partition);
+
         let pushdown_rows_pruned = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .counter("pushdown_rows_pruned", partition);
@@ -129,24 +158,10 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .subset_time("bloom_filter_eval_time", partition);
 
-        let page_index_rows_pruned = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("page_index_rows_pruned", partition);
-        let page_index_rows_matched = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("page_index_rows_matched", partition);
-
         let page_index_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("page_index_eval_time", partition);
 
-        let metadata_load_time = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .subset_time("metadata_load_time", partition);
-
-        let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
-            .counter("files_ranges_pruned_statistics", partition);
-
         let predicate_cache_inner_records = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .counter("predicate_cache_inner_records", partition);
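Taken together, the constructor now makes the analyze level of every parquet file metric explicit: builders chained with `.with_type(MetricType::SUMMARY)` surface at both levels, while untagged builders remain dev-only detail (assuming an untagged `MetricBuilder` defaults to the dev metric type). A custom operator can follow the same convention; a minimal sketch with hypothetical metric names:

use datafusion_physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
};

/// Metrics for a hypothetical custom operator, split by analyze level.
pub struct MyOperatorMetrics {
    /// High-signal counter, tagged so it shows up at the 'summary' level.
    pub rows_scanned: Count,
    /// Fine-grained timing left untagged, so it only appears at the 'dev'
    /// level (assuming untagged builders default to the dev metric type).
    pub eval_time: Time,
}

impl MyOperatorMetrics {
    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        let rows_scanned = MetricBuilder::new(metrics)
            .with_type(MetricType::SUMMARY)
            .counter("rows_scanned", partition);

        let eval_time = MetricBuilder::new(metrics)
            .subset_time("eval_time", partition);

        Self { rows_scanned, eval_time }
    }
}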