Skip to content

Commit 6514ec7

Browse files
authored
Adds Partitioned CSV test to object store access tests (#18370)
## Which issue does this PR close?

N/A -- This PR is a supporting effort to:
- #18146
- #17211

## Rationale for this change

Adding these tests not only improves test coverage/expected output validation, but also gives us a common way to test and talk about object store access for specific query scenarios.

## What changes are included in this PR?

- Adds a new test to the object store access integration tests that selects all rows from a set of CSV files under a hive partitioned directory structure
- Adds a new test harness method to build a partitioned ListingTable backed by CSV data
- Adds a new helper method to build partitioned CSV data and register the table

## Are these changes tested?

The changes are tests!

## Are there any user-facing changes?

No

cc @alamb
1 parent 9435513 commit 6514ec7

File tree

1 file changed

+208
-3
lines changed

1 file changed

+208
-3
lines changed

datafusion/core/tests/datasource/object_store_access.rs

Lines changed: 208 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch};
2828
use async_trait::async_trait;
2929
use bytes::Bytes;
3030
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
31+
use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
32+
use datafusion_datasource::ListingTableUrl;
33+
use datafusion_datasource_csv::CsvFormat;
3134
use futures::stream::BoxStream;
3235
use insta::assert_snapshot;
3336
use object_store::memory::InMemory;
@@ -123,6 +126,163 @@ async fn query_multi_csv_file() {
123126
);
124127
}
125128

129+
/// Verifies the exact object store requests (LISTs and GETs) DataFusion
/// issues when querying a hive-partitioned CSV table.
///
/// Each snapshot captures both the query result and a summary of the object
/// store requests, so a regression in partition pruning (extra LIST or GET
/// calls) shows up as a snapshot diff:
/// - no predicate: every partition directory is listed and every file read
/// - equality predicates on a leading prefix of the partition columns
///   (`a`, `a AND b`) prune the directory listing to the matching prefixes
/// - predicates that don't form a leading equality prefix (`b`, `c`,
///   `a<2 AND ...`) still trigger a full listing, but only the matching
///   files are actually read (GET)
#[tokio::test]
async fn query_partitioned_csv_file() {
    let test = Test::new().with_partitioned_csv().await;

    // No predicate: all three partition levels are listed for every
    // partition value, and all three files are fetched.
    assert_snapshot!(
        test.query("select * from csv_table_partitioned").await,
        @r"
    ------- Query Output (6 rows) -------
    +---------+-------+-------+---+----+-----+
    | d1      | d2    | d3    | a | b  | c   |
    +---------+-------+-------+---+----+-----+
    | 0.00001 | 1e-12 | true  | 1 | 10 | 100 |
    | 0.00003 | 5e-12 | false | 1 | 10 | 100 |
    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
    | 0.00003 | 3e-12 | true  | 3 | 30 | 300 |
    | 0.00003 | 5e-12 | false | 3 | 30 | 300 |
    +---------+-------+-------+---+----+-----+
    ------- Object Store Request Summary -------
    RequestCountingObjectStore()
    Total Requests: 13
    - LIST (with delimiter) prefix=data
    - LIST (with delimiter) prefix=data/a=1
    - LIST (with delimiter) prefix=data/a=2
    - LIST (with delimiter) prefix=data/a=3
    - LIST (with delimiter) prefix=data/a=1/b=10
    - LIST (with delimiter) prefix=data/a=2/b=20
    - LIST (with delimiter) prefix=data/a=3/b=30
    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
    - GET (opts) path=data/a=1/b=10/c=100/file_1.csv
    - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
    - GET (opts) path=data/a=3/b=30/c=300/file_3.csv
    "
    );

    // Equality on the first partition column: listing starts directly at
    // data/a=2, skipping the other partition subtrees entirely.
    assert_snapshot!(
        test.query("select * from csv_table_partitioned WHERE a=2").await,
        @r"
    ------- Query Output (2 rows) -------
    +---------+-------+-------+---+----+-----+
    | d1      | d2    | d3    | a | b  | c   |
    +---------+-------+-------+---+----+-----+
    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
    +---------+-------+-------+---+----+-----+
    ------- Object Store Request Summary -------
    RequestCountingObjectStore()
    Total Requests: 4
    - LIST (with delimiter) prefix=data/a=2
    - LIST (with delimiter) prefix=data/a=2/b=20
    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
    - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
    "
    );

    // Equality on a non-leading partition column (`b`): the listing is not
    // pruned (all 10 LISTs happen), but only the matching file is read.
    assert_snapshot!(
        test.query("select * from csv_table_partitioned WHERE b=20").await,
        @r"
    ------- Query Output (2 rows) -------
    +---------+-------+-------+---+----+-----+
    | d1      | d2    | d3    | a | b  | c   |
    +---------+-------+-------+---+----+-----+
    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
    +---------+-------+-------+---+----+-----+
    ------- Object Store Request Summary -------
    RequestCountingObjectStore()
    Total Requests: 11
    - LIST (with delimiter) prefix=data
    - LIST (with delimiter) prefix=data/a=1
    - LIST (with delimiter) prefix=data/a=2
    - LIST (with delimiter) prefix=data/a=3
    - LIST (with delimiter) prefix=data/a=1/b=10
    - LIST (with delimiter) prefix=data/a=2/b=20
    - LIST (with delimiter) prefix=data/a=3/b=30
    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
    - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
    "
    );

    // Equality on the last partition column (`c`): same shape as the `b`
    // predicate — full listing, single GET.
    assert_snapshot!(
        test.query("select * from csv_table_partitioned WHERE c=200").await,
        @r"
    ------- Query Output (2 rows) -------
    +---------+-------+-------+---+----+-----+
    | d1      | d2    | d3    | a | b  | c   |
    +---------+-------+-------+---+----+-----+
    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
    +---------+-------+-------+---+----+-----+
    ------- Object Store Request Summary -------
    RequestCountingObjectStore()
    Total Requests: 11
    - LIST (with delimiter) prefix=data
    - LIST (with delimiter) prefix=data/a=1
    - LIST (with delimiter) prefix=data/a=2
    - LIST (with delimiter) prefix=data/a=3
    - LIST (with delimiter) prefix=data/a=1/b=10
    - LIST (with delimiter) prefix=data/a=2/b=20
    - LIST (with delimiter) prefix=data/a=3/b=30
    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
    - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
    "
    );

    // Equality on a leading prefix of partition columns (`a` and `b`):
    // listing starts directly at data/a=2/b=20 — only 3 requests total.
    assert_snapshot!(
        test.query("select * from csv_table_partitioned WHERE a=2 AND b=20").await,
        @r"
    ------- Query Output (2 rows) -------
    +---------+-------+-------+---+----+-----+
    | d1      | d2    | d3    | a | b  | c   |
    +---------+-------+-------+---+----+-----+
    | 0.00002 | 2e-12 | true  | 2 | 20 | 200 |
    | 0.00003 | 5e-12 | false | 2 | 20 | 200 |
    +---------+-------+-------+---+----+-----+
    ------- Object Store Request Summary -------
    RequestCountingObjectStore()
    Total Requests: 3
    - LIST (with delimiter) prefix=data/a=2/b=20
    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
    - GET (opts) path=data/a=2/b=20/c=200/file_2.csv
    "
    );

    // Non-equality predicate on `a` (`a<2`) defeats prefix pruning even
    // though `b` and `c` use equality: full listing, single GET.
    assert_snapshot!(
        test.query("select * from csv_table_partitioned WHERE a<2 AND b=10 AND c=100").await,
        @r"
    ------- Query Output (2 rows) -------
    +---------+-------+-------+---+----+-----+
    | d1      | d2    | d3    | a | b  | c   |
    +---------+-------+-------+---+----+-----+
    | 0.00001 | 1e-12 | true  | 1 | 10 | 100 |
    | 0.00003 | 5e-12 | false | 1 | 10 | 100 |
    +---------+-------+-------+---+----+-----+
    ------- Object Store Request Summary -------
    RequestCountingObjectStore()
    Total Requests: 11
    - LIST (with delimiter) prefix=data
    - LIST (with delimiter) prefix=data/a=1
    - LIST (with delimiter) prefix=data/a=2
    - LIST (with delimiter) prefix=data/a=3
    - LIST (with delimiter) prefix=data/a=1/b=10
    - LIST (with delimiter) prefix=data/a=2/b=20
    - LIST (with delimiter) prefix=data/a=3/b=30
    - LIST (with delimiter) prefix=data/a=1/b=10/c=100
    - LIST (with delimiter) prefix=data/a=2/b=20/c=200
    - LIST (with delimiter) prefix=data/a=3/b=30/c=300
    - GET (opts) path=data/a=1/b=10/c=100/file_1.csv
    "
    );
}
285+
126286
#[tokio::test]
127287
async fn create_single_parquet_file_default() {
128288
// The default metadata size hint is 512KB
@@ -363,7 +523,7 @@ impl Test {
363523
self
364524
}
365525

366-
/// Register a CSV file at the given path relative to the [`datafusion_test_data`] directory
526+
/// Register a CSV file at the given path
367527
async fn register_csv(self, table_name: &str, path: &str) -> Self {
368528
let mut options = CsvReadOptions::new();
369529
options.has_header = true;
@@ -375,8 +535,30 @@ impl Test {
375535
self
376536
}
377537

378-
/// Register a Parquet file at the given path relative to the
379-
/// [`datafusion_test_data`] directory
538+
/// Register a partitioned CSV table at the given path
539+
async fn register_partitioned_csv(self, table_name: &str, path: &str) -> Self {
540+
let file_format = Arc::new(CsvFormat::default().with_has_header(true));
541+
let options = ListingOptions::new(file_format);
542+
543+
let url = format!("mem://{path}").parse().unwrap();
544+
let table_url = ListingTableUrl::try_new(url, None).unwrap();
545+
546+
let session_state = self.session_context.state();
547+
let mut config = ListingTableConfig::new(table_url).with_listing_options(options);
548+
config = config
549+
.infer_partitions_from_path(&session_state)
550+
.await
551+
.unwrap();
552+
config = config.infer_schema(&session_state).await.unwrap();
553+
554+
let table = Arc::new(ListingTable::try_new(config).unwrap());
555+
self.session_context
556+
.register_table(table_name, table)
557+
.unwrap();
558+
self
559+
}
560+
561+
/// Register a Parquet file at the given path
380562
async fn register_parquet(self, table_name: &str, path: &str) -> Self {
381563
let path = format!("mem://{path}");
382564
let mut options: ParquetReadOptions<'_> = ParquetReadOptions::new();
@@ -425,6 +607,29 @@ impl Test {
425607
self.register_csv("csv_table", "/data/").await
426608
}
427609

610+
/// Register three CSV files in a partitioned directory structure, called
611+
/// `csv_table_partitioned`
612+
async fn with_partitioned_csv(mut self) -> Test {
613+
for i in 1..4 {
614+
// upload CSV data to object store
615+
let csv_data1 = format!(
616+
r#"d1,d2,d3
617+
0.0000{i},{i}e-12,true
618+
0.00003,5e-12,false
619+
"#
620+
);
621+
self = self
622+
.with_bytes(
623+
&format!("/data/a={i}/b={}/c={}/file_{i}.csv", i * 10, i * 100,),
624+
csv_data1,
625+
)
626+
.await;
627+
}
628+
// register table
629+
self.register_partitioned_csv("csv_table_partitioned", "/data/")
630+
.await
631+
}
632+
428633
/// Add a single parquet file that has two columns and two row groups named `parquet_table`
429634
///
430635
/// Column "a": Int32 with values 0-100] in row group 1

0 commit comments

Comments
 (0)