From 8e8161d78cb44addd6b627a6eae9fd7313453afd Mon Sep 17 00:00:00 2001 From: Khanh Duong Date: Fri, 26 Sep 2025 07:52:17 +0900 Subject: [PATCH 1/7] fix: ignore `DataType::Null` in possible types during csv type inference --- .../core/src/datasource/file_format/csv.rs | 58 ++++++++++++++++--- datafusion/datasource-csv/src/file_format.rs | 20 +++++-- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index edbbea97a10e..2c0d05f5bb63 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -60,6 +60,7 @@ mod tests { use futures::stream::BoxStream; use futures::StreamExt; use insta::assert_snapshot; + use object_store::chunked::ChunkedStore; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::{ @@ -104,6 +105,14 @@ mod tests { } async fn get(&self, location: &Path) -> object_store::Result { + self.get_opts(location, GetOptions::default()).await + } + + async fn get_opts( + &self, + location: &Path, + _opts: GetOptions, + ) -> object_store::Result { let bytes = self.bytes_to_repeat.clone(); let len = bytes.len() as u64; let range = 0..len * self.max_iterations; @@ -130,14 +139,6 @@ mod tests { }) } - async fn get_opts( - &self, - _location: &Path, - _opts: GetOptions, - ) -> object_store::Result { - unimplemented!() - } - async fn get_ranges( &self, _location: &Path, @@ -470,6 +471,47 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_infer_schema_stream_separated_chunks_with_nulls() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let variable_object_store = Arc::new(VariableStream::new( + Bytes::from( + r#"c1,c2 +1,1.0 +, +"#, + ), + 1, + )); + let chunked_object_store = Arc::new(ChunkedStore::new(variable_object_store, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + 
last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + let csv_format = CsvFormat::default().with_has_header(true); + let inferred_schema = csv_format + .infer_schema( + &state, + &(chunked_object_store as Arc), + &[object_meta], + ) + .await?; + + let actual_fields: Vec<_> = inferred_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect(); + + assert_eq!(vec!["c1: Int64", "c2: Float64"], actual_fields); + Ok(()) + } + #[rstest( file_compression_type, case(FileCompressionType::UNCOMPRESSED), diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index e09ac3af7c66..96e0fe481805 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -593,15 +593,23 @@ fn build_schema_helper(names: Vec, types: &[HashSet]) -> Schem .zip(types) .map(|(field_name, data_type_possibilities)| { // ripped from arrow::csv::reader::infer_reader_schema_with_csv_options - // determine data type based on possible types - // if there are incompatible types, use DataType::Utf8 - match data_type_possibilities.len() { - 1 => Field::new( + // determine data type based on possible types, ignoring DataType::Null, + // if there are incompatible types, use DataType::Utf8. 
+ match ( + data_type_possibilities.contains(&DataType::Null), + data_type_possibilities.len(), + ) { + (true, 1) => Field::new(field_name, DataType::Null, true), + (false, 1) | (true, 2) => Field::new( field_name, - data_type_possibilities.iter().next().unwrap().clone(), + data_type_possibilities + .iter() + .find(|&d| d != &DataType::Null) + .unwrap() + .clone(), true, ), - 2 => { + (false, 2) | (true, 3) => { if data_type_possibilities.contains(&DataType::Int64) && data_type_possibilities.contains(&DataType::Float64) { From 0f0af727d9770260462607663687350ddd959824 Mon Sep 17 00:00:00 2001 From: Khanh Duong Date: Sat, 27 Sep 2025 07:58:29 +0900 Subject: [PATCH 2/7] refactor: apply suggestion, simplify inferring csv data types --- datafusion/datasource-csv/src/file_format.rs | 31 +++++++++----------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 96e0fe481805..c2fe93e19d21 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -582,34 +582,31 @@ impl CsvFormat { } } - let schema = build_schema_helper(column_names, &column_type_possibilities); + let schema = build_schema_helper(column_names, column_type_possibilities); Ok((schema, total_records_read)) } } -fn build_schema_helper(names: Vec, types: &[HashSet]) -> Schema { +fn build_schema_helper(names: Vec, types: Vec>) -> Schema { let fields = names .into_iter() .zip(types) - .map(|(field_name, data_type_possibilities)| { + .map(|(field_name, mut data_type_possibilities)| { // ripped from arrow::csv::reader::infer_reader_schema_with_csv_options - // determine data type based on possible types, ignoring DataType::Null, - // if there are incompatible types, use DataType::Utf8. 
- match ( - data_type_possibilities.contains(&DataType::Null), - data_type_possibilities.len(), - ) { - (true, 1) => Field::new(field_name, DataType::Null, true), - (false, 1) | (true, 2) => Field::new( + // determine data type based on possible types + // if there are incompatible types, use DataType::Utf8 + + // ignore nulls, to avoid conflicting datatypes (e.g. [nulls, int]) being inferred as Utf8. + data_type_possibilities.remove(&DataType::Null); + + match data_type_possibilities.len() { + 0 => Field::new(field_name, DataType::Null, true), + 1 => Field::new( field_name, - data_type_possibilities - .iter() - .find(|&d| d != &DataType::Null) - .unwrap() - .clone(), + data_type_possibilities.iter().next().unwrap().clone(), true, ), - (false, 2) | (true, 3) => { + 2 => { if data_type_possibilities.contains(&DataType::Int64) && data_type_possibilities.contains(&DataType::Float64) { From d6047901382fd31ca78be2cd12db5629c7d6139e Mon Sep 17 00:00:00 2001 From: Khanh Duong Date: Sat, 27 Sep 2025 08:18:49 +0900 Subject: [PATCH 3/7] docs: add comment to testcase --- .../core/src/datasource/file_format/csv.rs | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 2c0d05f5bb63..36de98f3e017 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -472,19 +472,30 @@ mod tests { } #[tokio::test] - async fn test_infer_schema_stream_separated_chunks_with_nulls() -> Result<()> { + async fn test_infer_schema_stream_null_chunks() -> Result<()> { let session_ctx = SessionContext::new(); let state = session_ctx.state(); - let variable_object_store = Arc::new(VariableStream::new( - Bytes::from( - r#"c1,c2 -1,1.0 -, + + // a stream where each line is read as a separate chunk, + // data type for each chunk is inferred separately. 
+ // +----+-----+----+ + // | c1 | c2 | c3 | + // +----+-----+----+ + // | 1 | 1.0 | | type: Int64, Float64, Null + // | | | | type: Null, Null, Null + // +----+-----+----+ + let chunked_object_store = Arc::new(ChunkedStore::new( + Arc::new(VariableStream::new( + Bytes::from( + r#"c1,c2,c3 +1,1.0, +,, "#, - ), + ), + 1, + )), 1, )); - let chunked_object_store = Arc::new(ChunkedStore::new(variable_object_store, 1)); let object_meta = ObjectMeta { location: Path::parse("/")?, last_modified: DateTime::default(), @@ -508,7 +519,8 @@ mod tests { .map(|f| format!("{}: {:?}", f.name(), f.data_type())) .collect(); - assert_eq!(vec!["c1: Int64", "c2: Float64"], actual_fields); + // ensure null chunks don't skew type inference + assert_eq!(vec!["c1: Int64", "c2: Float64", "c3: Null"], actual_fields); Ok(()) } From 9523450d0b8dafdf2184f8cda53acca8c40147c8 Mon Sep 17 00:00:00 2001 From: Khanh Duong Date: Mon, 29 Sep 2025 21:48:43 +0900 Subject: [PATCH 4/7] chore: update test data type for empty table --- datafusion/sqllogictest/test_files/ddl.slt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index f755ab3f356c..03ef08e1a5f8 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -707,9 +707,9 @@ CREATE EXTERNAL TABLE empty STORED AS CSV LOCATION '../core/tests/data/empty.csv query TTI select column_name, data_type, ordinal_position from information_schema.columns where table_name='empty';; ---- -c1 Utf8 0 -c2 Utf8 1 -c3 Utf8 2 +c1 Null 0 +c2 Null 1 +c3 Null 2 ## should allow any type of exprs as values From 45e10279e740bc6b5cdbdadff35909c6cbc093ce Mon Sep 17 00:00:00 2001 From: Khanh Duong Date: Mon, 29 Sep 2025 21:51:35 +0900 Subject: [PATCH 5/7] test: folder contains empty csv files (with header) and normal files --- .../core/src/datasource/file_format/csv.rs | 42 +++++++++++++++++++ 
.../some_empty_with_header/a_empty.csv | 1 + .../empty_files/some_empty_with_header/b.csv | 3 ++ .../some_empty_with_header/c_nulls_column.csv | 2 + 4 files changed, 48 insertions(+) create mode 100644 datafusion/core/tests/data/empty_files/some_empty_with_header/a_empty.csv create mode 100644 datafusion/core/tests/data/empty_files/some_empty_with_header/b.csv create mode 100644 datafusion/core/tests/data/empty_files/some_empty_with_header/c_nulls_column.csv diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 36de98f3e017..52fb8ae904eb 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -876,6 +876,48 @@ mod tests { Ok(()) } + /// Read multiple csv files (some are empty) with header + /// + /// some_empty_with_header + /// ├── a_empty.csv + /// ├── b.csv + /// └── c_nulls_column.csv + /// + /// a_empty.csv: + /// c1,c2,c3 + /// + /// b.csv: + /// c1,c2,c3 + /// 1,1,1 + /// 2,2,2 + /// + /// c_nulls_column.csv: + /// c1,c2,c3 + /// 3,3, + #[tokio::test] + async fn test_csv_some_empty_with_header() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_csv( + "some_empty_with_header", + "tests/data/empty_files/some_empty_with_header", + CsvReadOptions::new().has_header(true), + ) + .await?; + + let query = "select sum(c3) from some_empty_with_header;"; + let query_result = ctx.sql(query).await?.collect().await?; + + assert_snapshot!(batches_to_string(&query_result),@r" + +--------------------------------+ + | sum(some_empty_with_header.c3) | + +--------------------------------+ + | 3 | + +--------------------------------+ + "); + + Ok(()) + } + #[tokio::test] async fn test_csv_extension_compressed() -> Result<()> { // Write compressed CSV files diff --git a/datafusion/core/tests/data/empty_files/some_empty_with_header/a_empty.csv b/datafusion/core/tests/data/empty_files/some_empty_with_header/a_empty.csv new file mode 
100644 index 000000000000..f1968a0906d0 --- /dev/null +++ b/datafusion/core/tests/data/empty_files/some_empty_with_header/a_empty.csv @@ -0,0 +1 @@ +c1,c2,c3 diff --git a/datafusion/core/tests/data/empty_files/some_empty_with_header/b.csv b/datafusion/core/tests/data/empty_files/some_empty_with_header/b.csv new file mode 100644 index 000000000000..ff596071444c --- /dev/null +++ b/datafusion/core/tests/data/empty_files/some_empty_with_header/b.csv @@ -0,0 +1,3 @@ +c1,c2,c3 +1,1,1 +2,2,2 diff --git a/datafusion/core/tests/data/empty_files/some_empty_with_header/c_nulls_column.csv b/datafusion/core/tests/data/empty_files/some_empty_with_header/c_nulls_column.csv new file mode 100644 index 000000000000..bf86844cb029 --- /dev/null +++ b/datafusion/core/tests/data/empty_files/some_empty_with_header/c_nulls_column.csv @@ -0,0 +1,2 @@ +c1,c2,c3 +3,3, From fb55dd4da4f39ce37c7359e82eb4018539e8a44a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Sep 2025 10:01:14 -0400 Subject: [PATCH 6/7] Update datafusion/datasource-csv/src/file_format.rs --- datafusion/datasource-csv/src/file_format.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index c2fe93e19d21..c65003bd7a49 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -600,6 +600,9 @@ fn build_schema_helper(names: Vec, types: Vec>) -> Sch data_type_possibilities.remove(&DataType::Null); match data_type_possibilities.len() { + // Return Null for columns with only nulls / empty files + // This allows schema merging to work when reading folders + // such files along with normal files. 
0 => Field::new(field_name, DataType::Null, true), 1 => Field::new( field_name, From fa1dfcb1c45af8acdc1328800d9c210591f4f640 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Sep 2025 10:01:29 -0400 Subject: [PATCH 7/7] fmt --- datafusion/datasource-csv/src/file_format.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index c65003bd7a49..21065f918a3c 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -601,7 +601,7 @@ fn build_schema_helper(names: Vec, types: Vec>) -> Sch match data_type_possibilities.len() { // Return Null for columns with only nulls / empty files - // This allows schema merging to work when reading folders + // This allows schema merging to work when reading folders // such files along with normal files. 0 => Field::new(field_name, DataType::Null, true), 1 => Field::new(