Commit f4a49b5

feat(small): Set 'summary' level metrics for DataSourceExec with parquet source (#18196)
## Which issue does this PR close?

Part of #18116.

## Rationale for this change

The configuration below restricts `EXPLAIN ANALYZE` output to the important high-level insights:

```
set datafusion.explain.analyze_level = summary;
```

This PR sets `summary` level metrics for the parquet data source.

### `summary` level metrics for `DataSourceExec` with `Parquet` source

- File-level pruning metrics
- Row-group-level pruning metrics
- Bytes scanned
- Metadata load time

These metrics are defined in https://github.com/apache/datafusion/blob/155b56e521d75186776a65f1634ee03058899a79/datafusion/datasource-parquet/src/metrics.rs#L29

The remaining metrics are kept at the `dev` level. I'm not sure whether the page-level pruning metrics should also be included in the `summary` level; I'm open to suggestions on this, or on any other metrics that should be included.

While implementing this, I came up with a few ideas to further improve metrics tracking in the Parquet scanner. I've documented them in #18195.

## What changes are included in this PR?

Set the above metrics to the `summary` analyze level.

## Are these changes tested?

Unit tests.

## Are there any user-facing changes?

No.
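For anyone setting the level programmatically rather than through SQL, the first diff below removes exactly this pattern from the old test; here is a minimal standalone sketch of it (the `ExplainAnalyzeLevel` import path is an assumption, not confirmed by this page):

```rust
use datafusion::config::ExplainAnalyzeLevel; // assumed re-export path
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Equivalent to `set datafusion.explain.analyze_level = summary;`
    let mut config = SessionConfig::new();
    config.options_mut().explain.analyze_level = ExplainAnalyzeLevel::Summary;
    let ctx = SessionContext::new_with_config(config);

    // EXPLAIN ANALYZE now reports only summary-level metrics.
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM generate_series(10)")
        .await?;
    df.show().await?;
    Ok(())
}
```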
1 parent 987f333 commit f4a49b5


2 files changed (+87 −31 lines)


datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 53 additions & 12 deletions
```diff
@@ -161,30 +161,71 @@ fn nanos_from_timestamp(ts: &Timestamp) -> i64 {
 }
 
 // Test different detail level for config `datafusion.explain.analyze_level`
+
+async fn collect_plan_with_context(
+    sql_str: &str,
+    ctx: &SessionContext,
+    level: ExplainAnalyzeLevel,
+) -> String {
+    {
+        let state = ctx.state_ref();
+        let mut state = state.write();
+        state.config_mut().options_mut().explain.analyze_level = level;
+    }
+    let dataframe = ctx.sql(sql_str).await.unwrap();
+    let batches = dataframe.collect().await.unwrap();
+    arrow::util::pretty::pretty_format_batches(&batches)
+        .unwrap()
+        .to_string()
+}
+
+async fn collect_plan(sql_str: &str, level: ExplainAnalyzeLevel) -> String {
+    let ctx = SessionContext::new();
+    collect_plan_with_context(sql_str, &ctx, level).await
+}
+
 #[tokio::test]
 async fn explain_analyze_level() {
-    async fn collect_plan(level: ExplainAnalyzeLevel) -> String {
-        let mut config = SessionConfig::new();
-        config.options_mut().explain.analyze_level = level;
-        let ctx = SessionContext::new_with_config(config);
-        let sql = "EXPLAIN ANALYZE \
+    let sql = "EXPLAIN ANALYZE \
         SELECT * \
         FROM generate_series(10) as t1(v1) \
         ORDER BY v1 DESC";
-        let dataframe = ctx.sql(sql).await.unwrap();
-        let batches = dataframe.collect().await.unwrap();
-        arrow::util::pretty::pretty_format_batches(&batches)
-            .unwrap()
-            .to_string()
-    }
 
     for (level, needle, should_contain) in [
         (ExplainAnalyzeLevel::Summary, "spill_count", false),
         (ExplainAnalyzeLevel::Summary, "output_rows", true),
         (ExplainAnalyzeLevel::Dev, "spill_count", true),
         (ExplainAnalyzeLevel::Dev, "output_rows", true),
     ] {
-        let plan = collect_plan(level).await;
+        let plan = collect_plan(sql, level).await;
+        assert_eq!(
+            plan.contains(needle),
+            should_contain,
+            "plan for level {level:?} unexpected content: {plan}"
+        );
+    }
+}
+
+#[tokio::test]
+async fn explain_analyze_level_datasource_parquet() {
+    let table_name = "tpch_lineitem_small";
+    let parquet_path = "tests/data/tpch_lineitem_small.parquet";
+    let sql = format!("EXPLAIN ANALYZE SELECT * FROM {table_name}");
+
+    // Register test parquet file into context
+    let ctx = SessionContext::new();
+    ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
+        .await
+        .expect("register parquet table for explain analyze test");
+
+    for (level, needle, should_contain) in [
+        (ExplainAnalyzeLevel::Summary, "metadata_load_time", true),
+        (ExplainAnalyzeLevel::Summary, "page_index_eval_time", false),
+        (ExplainAnalyzeLevel::Dev, "metadata_load_time", true),
+        (ExplainAnalyzeLevel::Dev, "page_index_eval_time", true),
+    ] {
+        let plan = collect_plan_with_context(&sql, &ctx, level).await;
+
         assert_eq!(
             plan.contains(needle),
             should_contain,
```
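For orientation, the summary-level plan for the parquet test above should surface the scan's high-level counters while hiding dev-only ones such as `page_index_eval_time`. The line below is illustrative only — the values and surrounding node details are invented, not captured from a run:

```
DataSourceExec: file_groups={...}, file_type=parquet, metrics=[output_rows=..., metadata_load_time=..., bytes_scanned=..., files_ranges_pruned_statistics=..., row_groups_pruned_statistics=...]
```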

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 34 additions & 19 deletions
```diff
@@ -16,7 +16,7 @@
 // under the License.
 
 use datafusion_physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
+    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
 };
 
 /// Stores metrics about the parquet execution for a particular parquet file.
@@ -88,30 +88,59 @@ impl ParquetFileMetrics {
         filename: &str,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Self {
-        let predicate_evaluation_errors = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("predicate_evaluation_errors", partition);
-
+        // -----------------------
+        // 'summary' level metrics
+        // -----------------------
         let row_groups_matched_bloom_filter = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_matched_bloom_filter", partition);
 
         let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_pruned_bloom_filter", partition);
 
         let row_groups_matched_statistics = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_matched_statistics", partition);
 
         let row_groups_pruned_statistics = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("row_groups_pruned_statistics", partition);
 
+        let page_index_rows_pruned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .counter("page_index_rows_pruned", partition);
+        let page_index_rows_matched = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .counter("page_index_rows_matched", partition);
+
         let bytes_scanned = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
             .counter("bytes_scanned", partition);
 
+        let metadata_load_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .subset_time("metadata_load_time", partition);
+
+        let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
+            .with_type(MetricType::SUMMARY)
+            .counter("files_ranges_pruned_statistics", partition);
+
+        // -----------------------
+        // 'dev' level metrics
+        // -----------------------
+        let predicate_evaluation_errors = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("predicate_evaluation_errors", partition);
+
         let pushdown_rows_pruned = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .counter("pushdown_rows_pruned", partition);
@@ -129,24 +158,10 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .subset_time("bloom_filter_eval_time", partition);
 
-        let page_index_rows_pruned = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("page_index_rows_pruned", partition);
-        let page_index_rows_matched = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("page_index_rows_matched", partition);
-
         let page_index_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("page_index_eval_time", partition);
 
-        let metadata_load_time = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .subset_time("metadata_load_time", partition);
-
-        let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
-            .counter("files_ranges_pruned_statistics", partition);
-
         let predicate_cache_inner_records = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .counter("predicate_cache_inner_records", partition);
```
