112 changes: 104 additions & 8 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -60,6 +60,7 @@ mod tests {
use futures::stream::BoxStream;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{
@@ -104,6 +105,14 @@ mod tests {
}

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
}

async fn get_opts(
&self,
location: &Path,
_opts: GetOptions,
) -> object_store::Result<GetResult> {
let bytes = self.bytes_to_repeat.clone();
let len = bytes.len() as u64;
let range = 0..len * self.max_iterations;
@@ -130,14 +139,6 @@
})
}

async fn get_opts(
&self,
_location: &Path,
_opts: GetOptions,
) -> object_store::Result<GetResult> {
unimplemented!()
}

async fn get_ranges(
&self,
_location: &Path,
@@ -470,6 +471,59 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_infer_schema_stream_null_chunks() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();

// a stream where each line is read as a separate chunk;
// the data type of each chunk is inferred separately.
Contributor: thank you for these comments
// +----+-----+----+
// | c1 | c2 | c3 |
// +----+-----+----+
// | 1 | 1.0 | | type: Int64, Float64, Null
// | | | | type: Null, Null, Null
// +----+-----+----+
let chunked_object_store = Arc::new(ChunkedStore::new(
Arc::new(VariableStream::new(
Bytes::from(
r#"c1,c2,c3
1,1.0,
,,
"#,
),
1,
)),
1,
));
let object_meta = ObjectMeta {
location: Path::parse("/")?,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

let csv_format = CsvFormat::default().with_has_header(true);
let inferred_schema = csv_format
.infer_schema(
&state,
&(chunked_object_store as Arc<dyn ObjectStore>),
&[object_meta],
)
.await?;

let actual_fields: Vec<_> = inferred_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();

// ensure null chunks don't skew type inference
assert_eq!(vec!["c1: Int64", "c2: Float64", "c3: Null"], actual_fields);
Ok(())
}

#[rstest(
file_compression_type,
case(FileCompressionType::UNCOMPRESSED),
@@ -822,6 +876,48 @@ mod tests {
Ok(())
}

/// Read multiple csv files (some are empty) with header
///
/// some_empty_with_header
/// ├── a_empty.csv
/// ├── b.csv
/// └── c_nulls_column.csv
///
/// a_empty.csv:
/// c1,c2,c3
///
/// b.csv:
/// c1,c2,c3
/// 1,1,1
/// 2,2,2
///
/// c_nulls_column.csv:
/// c1,c2,c3
/// 3,3,
#[tokio::test]
async fn test_csv_some_empty_with_header() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv(
"some_empty_with_header",
"tests/data/empty_files/some_empty_with_header",
CsvReadOptions::new().has_header(true),
)
.await?;

let query = "select sum(c3) from some_empty_with_header;";
let query_result = ctx.sql(query).await?.collect().await?;

assert_snapshot!(batches_to_string(&query_result),@r"
+--------------------------------+
| sum(some_empty_with_header.c3) |
+--------------------------------+
| 3 |
+--------------------------------+
");

Ok(())
}

#[tokio::test]
async fn test_csv_extension_compressed() -> Result<()> {
// Write compressed CSV files
1 change: 1 addition & 0 deletions datafusion/core/tests/data/empty_files/some_empty_with_header/a_empty.csv
@@ -0,0 +1 @@
c1,c2,c3
3 changes: 3 additions & 0 deletions datafusion/core/tests/data/empty_files/some_empty_with_header/b.csv
@@ -0,0 +1,3 @@
c1,c2,c3
1,1,1
2,2,2
2 changes: 2 additions & 0 deletions datafusion/core/tests/data/empty_files/some_empty_with_header/c_nulls_column.csv
@@ -0,0 +1,2 @@
c1,c2,c3
3,3,
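
As a side note on why the folder read above works: each fixture is inferred on its own, so a_empty.csv yields all-Null fields, b.csv yields Int64 fields, and c_nulls_column.csv yields Int64, Int64, Null. The sketch below is not part of the patch; it shows how arrow's Schema::try_merge could resolve those per-file schemas, assuming Null-typed fields simply adopt the concrete type seen in the other files (the exact merge entry point inside DataFusion's listing code may differ).

use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // per-file schemas as the new inference would produce them (illustrative)
    let a_empty = Schema::new(vec![
        Field::new("c1", DataType::Null, true),
        Field::new("c2", DataType::Null, true),
        Field::new("c3", DataType::Null, true),
    ]);
    let b = Schema::new(vec![
        Field::new("c1", DataType::Int64, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c3", DataType::Int64, true),
    ]);
    let c_nulls_column = Schema::new(vec![
        Field::new("c1", DataType::Int64, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c3", DataType::Null, true),
    ]);

    // Null-typed fields take the concrete type seen elsewhere, so the merged
    // table schema has an Int64 `c3` and `sum(c3)` can be planned normally.
    let merged = Schema::try_merge(vec![a_empty, b, c_nulls_column])?;
    assert_eq!(merged.field_with_name("c3")?.data_type(), &DataType::Int64);
    Ok(())
}
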
14 changes: 11 additions & 3 deletions datafusion/datasource-csv/src/file_format.rs
@@ -582,20 +582,28 @@ impl CsvFormat {
}
}

let schema = build_schema_helper(column_names, &column_type_possibilities);
let schema = build_schema_helper(column_names, column_type_possibilities);
Ok((schema, total_records_read))
}
}

fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
fn build_schema_helper(names: Vec<String>, types: Vec<HashSet<DataType>>) -> Schema {
let fields = names
.into_iter()
.zip(types)
.map(|(field_name, data_type_possibilities)| {
.map(|(field_name, mut data_type_possibilities)| {
// ripped from arrow::csv::reader::infer_reader_schema_with_csv_options
// determine data type based on possible types
// if there are incompatible types, use DataType::Utf8

// ignore nulls, to avoid conflicting datatypes (e.g. [nulls, int]) being inferred as Utf8.
data_type_possibilities.remove(&DataType::Null);

match data_type_possibilities.len() {
// Return Null for columns with only nulls / empty files
// This allows schema merging to work when reading folders
// containing such files along with normal files.
0 => Field::new(field_name, DataType::Null, true),
1 => Field::new(
field_name,
data_type_possibilities.iter().next().unwrap().clone(),
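
For readers skimming the hunk above, here is a hypothetical, self-contained version of the per-column resolution step that build_schema_helper now performs. The Null removal and the Utf8 fallback come directly from the comments in the diff; the Int64/Float64 widening mirrors the arrow CSV inference the comment refers to, so treat the exact set of branches as an approximation rather than the patch itself.

use std::collections::HashSet;

use arrow::datatypes::DataType;

// `resolve_column_type` is a hypothetical name for illustration; the patch
// keeps this logic inline inside build_schema_helper.
fn resolve_column_type(mut candidates: HashSet<DataType>) -> DataType {
    // drop Null so an all-null chunk or file no longer forces the Utf8 fallback
    candidates.remove(&DataType::Null);
    match candidates.len() {
        // nothing but nulls (or an empty file) was seen
        0 => DataType::Null,
        // exactly one concrete type wins
        1 => candidates.into_iter().next().unwrap(),
        // ints mixed with floats widen to Float64, as in arrow's CSV inference
        2 if candidates.contains(&DataType::Int64)
            && candidates.contains(&DataType::Float64) =>
        {
            DataType::Float64
        }
        // anything else is treated as incompatible and read as Utf8
        _ => DataType::Utf8,
    }
}

fn main() {
    assert_eq!(
        resolve_column_type(HashSet::from([DataType::Null, DataType::Int64])),
        DataType::Int64
    );
    assert_eq!(
        resolve_column_type(HashSet::from([DataType::Null])),
        DataType::Null
    );
}
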
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/ddl.slt
@@ -707,9 +707,9 @@ CREATE EXTERNAL TABLE empty STORED AS CSV LOCATION '../core/tests/data/empty.csv
query TTI
select column_name, data_type, ordinal_position from information_schema.columns where table_name='empty';;
Contributor: this makes sense to me

----
c1 Utf8 0
c2 Utf8 1
c3 Utf8 2
c1 Null 0
c2 Null 1
c3 Null 2


## should allow any type of exprs as values
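
The updated ddl.slt expectation can also be checked from Rust with a sketch along these lines. It is illustrative only: it assumes information_schema is enabled on the SessionContext, uses a repo-root-relative path to empty.csv (adjust for your working directory), and relies on the current SessionContext/CsvReadOptions API surface.

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema must be enabled to query information_schema.columns
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // hypothetical path resolution: adjust relative to your working directory
    ctx.register_csv(
        "empty",
        "datafusion/core/tests/data/empty.csv",
        CsvReadOptions::new().has_header(true),
    )
    .await?;

    // with this change, a header-only file reports Null (not Utf8) for every column
    ctx.sql("select column_name, data_type, ordinal_position from information_schema.columns where table_name = 'empty'")
        .await?
        .show()
        .await?;

    Ok(())
}
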