diff --git a/Cargo.lock b/Cargo.lock index 2cfdd82851d67..facd4440be05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2156,8 +2156,9 @@ dependencies = [ "datafusion-session", "futures", "object_store", - "serde_json", "tokio", + "tokio-stream", + "tokio-util", ] [[package]] @@ -6469,6 +6470,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -6479,6 +6481,7 @@ checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 36198430e40b1..f57ec6fc7dc16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,6 +181,8 @@ tempfile = "3" testcontainers = { version = "0.25.2", features = ["default"] } testcontainers-modules = { version = "0.13" } tokio = { version = "1.48", features = ["macros", "rt", "sync"] } +tokio-stream = "0.1" +tokio-util = "0.7" url = "2.5.7" [workspace.lints.clippy] diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index e7b809c82ed3d..8a24c2c6e1495 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -121,7 +121,7 @@ async fn json_opener() -> Result<()> { projected, FileCompressionType::UNCOMPRESSED, Arc::new(object_store), - false, + true, ); let scan_config = FileScanConfigBuilder::new( diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 68022c2f06fe9..19254e9436814 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2813,23 +2813,22 @@ config_namespace! { pub struct JsonOptions { pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED pub schema_infer_max_rec: Option, default = None - pub compression_level: Option, default = None - /// The format of JSON input files. - /// - /// When `false` (default), expects newline-delimited JSON (NDJSON): - /// ```text - /// {"key1": 1, "key2": "val"} - /// {"key1": 2, "key2": "vals"} - /// ``` - /// - /// When `true`, expects JSON array format: - /// ```text - /// [ - /// {"key1": 1, "key2": "val"}, - /// {"key1": 2, "key2": "vals"} - /// ] - /// ``` - pub format_array: bool, default = false + /// The JSON format to use when reading files. 
+        ///
+        /// When `true` (default), expects newline-delimited JSON (NDJSON):
+        /// ```text
+        /// {"key1": 1, "key2": "val"}
+        /// {"key1": 2, "key2": "vals"}
+        /// ```
+        ///
+        /// When `false`, expects JSON array format:
+        /// ```text
+        /// [
+        /// {"key1": 1, "key2": "val"},
+        /// {"key1": 2, "key2": "vals"}
+        /// ]
+        /// ```
+        pub newline_delimited: bool, default = true
     }
 }
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index aa378d42622d6..a81a74fffa999 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -479,7 +479,7 @@ impl DataFrame {
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
-    /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+    /// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
     /// // expand into multiple columns if it's json array, flatten field name if it's nested structure
     /// let df = df.unnest_columns(&["b","c","d"])?;
     /// let expected = vec![
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index cce8ead46cd78..b53fdad5db7e9 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -25,7 +25,7 @@ mod tests {
     use super::*;

     use crate::datasource::file_format::test_util::scan_format;
-    use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
+    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::RecordBatch;
     use arrow_schema::Schema;
@@ -46,6 +46,7 @@ mod tests {
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
+    use crate::execution::options::JsonReadOptions;
     use datafusion_common::Result;
     use datafusion_datasource::file_compression_type::FileCompressionType;
     use futures::StreamExt;
@@ -53,6 +54,46 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use regex::Regex;
     use rstest::rstest;
+    // ==================== Test Helpers ====================
+
+    /// Create a temporary JSON file and return (TempDir, path)
+    fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, content).unwrap();
+        (tmp_dir, path)
+    }
+
+    /// Infer schema from JSON array format file
+    async fn infer_json_array_schema(
+        content: &str,
+    ) -> Result<Arc<Schema>> {
+        let (_tmp_dir, path) = create_temp_json(content);
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let format = JsonFormat::default().with_newline_delimited(false);
+        format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+    }
+
+    /// Register a JSON array table and run a query
+    async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
+        let (_tmp_dir, path) = create_temp_json(content);
+        let ctx = SessionContext::new();
+        let options = JsonReadOptions::default().newline_delimited(false);
+        ctx.register_json("test_table", &path, options).await?;
+        ctx.sql(query).await?.collect().await
+    }
+
+    /// Register a JSON array table and run a query, return formatted string
+    async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
+        let result = query_json_array(content, query).await?;
+        Ok(batches_to_string(&result))
+    }
+
+
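+    // A minimal end-to-end sketch of the user-facing API exercised by the
+    // helpers above: read a JSON array file straight into a `DataFrame` with
+    // `SessionContext::read_json` and the `newline_delimited(false)` builder,
+    // instead of going through `register_json` + SQL. Only the
+    // `JsonReadOptions` API introduced in this change is assumed.
+    #[tokio::test]
+    async fn test_json_array_read_json_dataframe() -> Result<()> {
+        let (_tmp_dir, path) = create_temp_json(r#"[{"a": 1}, {"a": 2}]"#);
+        let ctx = SessionContext::new();
+        let options = JsonReadOptions::default().newline_delimited(false);
+        // `read_json` infers the schema and returns a DataFrame backed by the file
+        let df = ctx.read_json(path.as_str(), options).await?;
+        let batches = df.collect().await?;
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(2, total_rows);
+        Ok(())
+    }
+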
// ==================== Existing Tests ==================== #[tokio::test] async fn read_small_batches() -> Result<()> { @@ -209,7 +250,7 @@ mod tests { let ctx = SessionContext::new_with_config(config); let table_path = "tests/data/1.json"; - let options = NdJsonReadOptions::default(); + let options = JsonReadOptions::default(); ctx.register_json("json_parallel", table_path, options) .await?; @@ -241,7 +282,7 @@ mod tests { let ctx = SessionContext::new_with_config(config); let table_path = "tests/data/empty.json"; - let options = NdJsonReadOptions::default(); + let options = JsonReadOptions::default(); ctx.register_json("json_parallel_empty", table_path, options) .await?; @@ -315,7 +356,6 @@ mod tests { .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into()); let mut all_batches = RecordBatch::new_empty(schema.clone()); - // We get RequiresMoreData after 2 batches because of how json::Decoder works for _ in 0..2 { let output = deserializer.next()?; let DeserializerOutput::RecordBatch(batch) = output else { @@ -359,7 +399,6 @@ mod tests { let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; - // Expected the file to exist and be empty assert!(std::path::Path::new(&path).exists()); let metadata = std::fs::metadata(&path)?; assert_eq!(metadata.len(), 0); @@ -386,280 +425,210 @@ mod tests { let df = ctx.read_batch(empty_batch.clone())?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; - // Expected the file to exist and be empty assert!(std::path::Path::new(&path).exists()); let metadata = std::fs::metadata(&path)?; assert_eq!(metadata.len(), 0); Ok(()) } - #[tokio::test] - async fn test_json_array_format() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let store = Arc::new(LocalFileSystem::new()) as _; - - // Create a temporary file with JSON array format - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1, "b": 2.0, "c": true}, - {"a": 2, "b": 3.5, "c": false}, - {"a": 3, "b": 4.0, "c": true} - ]"#, - )?; + // ==================== JSON Array Format Tests ==================== - // Test with format_array = true - let format = JsonFormat::default().with_format_array(true); - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await - .expect("Schema inference"); + #[tokio::test] + async fn test_json_array_schema_inference() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#, + ) + .await?; - let fields = file_schema + let fields: Vec<_> = schema .fields() .iter() .map(|f| format!("{}: {:?}", f.name(), f.data_type())) - .collect::>(); + .collect(); assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields); - Ok(()) } #[tokio::test] - async fn test_json_array_format_empty() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy()); - std::fs::write(&path, "[]")?; - - let format = JsonFormat::default().with_format_array(true); - let result = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await; - - assert!(result.is_err()); - assert!(result - 
.unwrap_err() - .to_string() - .contains("JSON array is empty")); - + async fn test_json_array_empty() -> Result<()> { + let schema = infer_json_array_schema("[]").await?; + assert_eq!(schema.fields().len(), 0); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_limit() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1}, - {"a": 2, "b": "extra"} - ]"#, - )?; - - // Only infer from first record - let format = JsonFormat::default() - .with_format_array(true) - .with_schema_infer_max_rec(1); - - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await - .expect("Schema inference"); - - // Should only have field "a" since we limited to 1 record - let fields = file_schema - .fields() - .iter() - .map(|f| format!("{}: {:?}", f.name(), f.data_type())) - .collect::>(); - assert_eq!(vec!["a: Int64"], fields); + async fn test_json_array_nested_struct() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#, + ) + .await?; + let info_field = schema.field_with_name("info").unwrap(); + assert!(matches!(info_field.data_type(), DataType::Struct(_))); Ok(()) } #[tokio::test] - async fn test_json_array_format_read_data() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let task_ctx = ctx.task_ctx(); - let store = Arc::new(LocalFileSystem::new()) as _; - - // Create a temporary file with JSON array format - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1, "b": 2.0, "c": true}, - {"a": 2, "b": 3.5, "c": false}, - {"a": 3, "b": 4.0, "c": true} - ]"#, - )?; + async fn test_json_array_list_type() -> Result<()> { + let schema = + infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?; - let format = JsonFormat::default().with_format_array(true); - - // Infer schema - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await?; + let tags_field = schema.field_with_name("tags").unwrap(); + assert!(matches!(tags_field.data_type(), DataType::List(_))); + Ok(()) + } - // Scan and read data - let exec = scan_format( - &ctx, - &format, - Some(file_schema), - tmp_dir.path().to_str().unwrap(), - "array.json", - None, - None, + #[tokio::test] + async fn test_json_array_basic_query() -> Result<()> { + let result = query_json_array_str( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#, + "SELECT a, b FROM test_table ORDER BY a", ) .await?; - let batches = collect(exec, task_ctx).await?; - - assert_eq!(1, batches.len()); - assert_eq!(3, batches[0].num_columns()); - assert_eq!(3, batches[0].num_rows()); - - // Verify data - let array_a = as_int64_array(batches[0].column(0))?; - assert_eq!( - vec![1, 2, 3], - (0..3).map(|i| array_a.value(i)).collect::>() - ); + assert_snapshot!(result, @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + | 3 | test | + +---+-------+ + "); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_projection() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let task_ctx = ctx.task_ctx(); - let store = 
Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?; - - let format = JsonFormat::default().with_format_array(true); - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + async fn test_json_array_with_nulls() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#, + "SELECT id, name FROM test_table ORDER BY id", + ) .await?; - // Project only column "a" - let exec = scan_format( - &ctx, - &format, - Some(file_schema), - tmp_dir.path().to_str().unwrap(), - "array.json", - Some(vec![0]), - None, + assert_snapshot!(result, @r" + +----+---------+ + | id | name | + +----+---------+ + | 1 | Alice | + | 2 | | + | 3 | Charlie | + +----+---------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_unnest() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#, + "SELECT id, unnest(values) as value FROM test_table ORDER BY id, value", ) .await?; - let batches = collect(exec, task_ctx).await?; - - assert_eq!(1, batches.len()); - assert_eq!(1, batches[0].num_columns()); // Only 1 column projected - assert_eq!(2, batches[0].num_rows()); + assert_snapshot!(result, @r" + +----+-------+ + | id | value | + +----+-------+ + | 1 | 10 | + | 1 | 20 | + | 1 | 30 | + | 2 | 40 | + | 2 | 50 | + +----+-------+ + "); Ok(()) } #[tokio::test] - async fn test_ndjson_read_options_format_array() -> Result<()> { - let ctx = SessionContext::new(); - - // Create a temporary file with JSON array format - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1, "b": "hello"}, - {"a": 2, "b": "world"}, - {"a": 3, "b": "test"} - ]"#, - )?; - - // Use NdJsonReadOptions with format_array = true - let options = NdJsonReadOptions::default().format_array(true); - - ctx.register_json("json_array_table", &path, options) + async fn test_json_array_unnest_struct() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#, + "SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product", + ) .await?; - let result = ctx - .sql("SELECT a, b FROM json_array_table ORDER BY a") - .await? 
- .collect() - .await?; + assert_snapshot!(result, @r" + +----+---------+-----+ + | id | product | qty | + +----+---------+-----+ + | 1 | A | 2 | + | 1 | B | 3 | + | 2 | C | 1 | + +----+---------+-----+ + "); + Ok(()) + } - assert_snapshot!(batches_to_string(&result), @r" - +---+-------+ - | a | b | - +---+-------+ - | 1 | hello | - | 2 | world | - | 3 | test | - +---+-------+ - "); + #[tokio::test] + async fn test_json_array_nested_struct_access() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#, + "SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id", + ) + .await?; + assert_snapshot!(result, @r" + +----+-------------+-------+ + | id | dept_name | head | + +----+-------------+-------+ + | 1 | Engineering | Alice | + | 2 | Sales | Bob | + +----+-------------+-------+ + "); Ok(()) } #[tokio::test] - async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> { + async fn test_json_array_with_compression() -> Result<()> { use flate2::write::GzEncoder; use flate2::Compression; use std::io::Write; - let ctx = SessionContext::new(); - - // Create a temporary gzip compressed JSON array file let tmp_dir = tempfile::TempDir::new()?; let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); - let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#; let file = std::fs::File::create(&path)?; let mut encoder = GzEncoder::new(file, Compression::default()); - encoder.write_all(json_content.as_bytes())?; + encoder.write_all( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(), + )?; encoder.finish()?; - // Use NdJsonReadOptions with format_array and GZIP compression - let options = NdJsonReadOptions::default() - .format_array(true) + let ctx = SessionContext::new(); + let options = JsonReadOptions::default() + .newline_delimited(false) .file_compression_type(FileCompressionType::GZIP) .file_extension(".json.gz"); - ctx.register_json("json_array_gzip", &path, options).await?; - + ctx.register_json("test_table", &path, options).await?; let result = ctx - .sql("SELECT a, b FROM json_array_gzip ORDER BY a") + .sql("SELECT a, b FROM test_table ORDER BY a") .await? .collect() .await?; assert_snapshot!(batches_to_string(&result), @r" - +---+-------+ - | a | b | - +---+-------+ - | 1 | hello | - | 2 | world | - +---+-------+ - "); + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + +---+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_list_of_structs() -> Result<()> { + let batches = query_json_array( + r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#, + "SELECT id, items FROM test_table ORDER BY id", + ) + .await?; + assert_eq!(1, batches.len()); + assert_eq!(2, batches[0].num_rows()); Ok(()) } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index fb707ed3d6c57..7c9777d01488f 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -442,14 +442,23 @@ impl<'a> AvroReadOptions<'a> { } } -/// Options that control the reading of Line-delimited JSON files (NDJson) +#[deprecated( + since = "53.0.0", + note = "Use `JsonReadOptions` instead. This alias will be removed in a future version." 
+)] +#[doc = "Deprecated: Use [`JsonReadOptions`] instead."] +pub type NdJsonReadOptions<'a> = JsonReadOptions<'a>; + +/// Options that control the reading of JSON files. +/// +/// Supports both newline-delimited JSON (NDJSON) and JSON array formats. /// /// Note this structure is supplied when a datasource is created and -/// can not not vary from statement to statement. For settings that +/// can not vary from statement to statement. For settings that /// can vary statement to statement see /// [`ConfigOptions`](crate::config::ConfigOptions). #[derive(Clone)] -pub struct NdJsonReadOptions<'a> { +pub struct JsonReadOptions<'a> { /// The data source schema. pub schema: Option<&'a Schema>, /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`. @@ -465,12 +474,25 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Whether the JSON file is in array format `[{...}, {...}]` instead of - /// line-delimited format. Defaults to `false`. - pub format_array: bool, + /// Whether to read as newline-delimited JSON (default: true). + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub newline_delimited: bool, } -impl Default for NdJsonReadOptions<'_> { +impl Default for JsonReadOptions<'_> { fn default() -> Self { Self { schema: None, @@ -480,12 +502,12 @@ impl Default for NdJsonReadOptions<'_> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - format_array: false, + newline_delimited: true, } } } -impl<'a> NdJsonReadOptions<'a> { +impl<'a> JsonReadOptions<'a> { /// Specify table_partition_cols for partition pruning pub fn table_partition_cols( mut self, @@ -534,10 +556,23 @@ impl<'a> NdJsonReadOptions<'a> { self } - /// Specify whether the JSON file is in array format `[{...}, {...}]` - /// instead of line-delimited format. - pub fn format_array(mut self, format_array: bool) -> Self { - self.format_array = format_array; + /// Set whether to read as newline-delimited JSON. 
+ /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub fn newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; self } } @@ -665,7 +700,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { } #[async_trait] -impl ReadOptions<'_> for NdJsonReadOptions<'_> { +impl ReadOptions<'_> for JsonReadOptions<'_> { fn to_listing_options( &self, config: &SessionConfig, @@ -675,7 +710,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_options(table_options.json) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()) - .with_format_array(self.format_array); + .with_newline_delimited(self.newline_delimited); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 3333b70676203..06af40f8217b8 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -110,6 +110,7 @@ mod tests { #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::listing::table::ListingTableConfigExt; + use crate::execution::options::JsonReadOptions; use crate::prelude::*; use crate::{ datasource::{ @@ -806,7 +807,7 @@ mod tests { .register_json( "t", tmp_dir.path().to_str().unwrap(), - NdJsonReadOptions::default() + JsonReadOptions::default() .schema(schema.as_ref()) .file_compression_type(file_compression_type), ) diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index f7d5c710bf48a..6701bab332df9 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -32,7 +32,7 @@ mod tests { use crate::dataframe::DataFrameWriteOptions; use crate::execution::SessionState; - use crate::prelude::{CsvReadOptions, NdJsonReadOptions, SessionContext}; + use crate::prelude::{CsvReadOptions, JsonReadOptions, SessionContext}; use crate::test::partitioned_file_groups; use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array}; use datafusion_common::test_util::batches_to_string; @@ -132,7 +132,7 @@ mod tests { .get_ext_with_compression(&file_compression_type) .unwrap(); - let read_options = NdJsonReadOptions::default() + let read_options = JsonReadOptions::default() .file_extension(ext.as_str()) .file_compression_type(file_compression_type.to_owned()); let frame = ctx.read_json(path, read_options).await.unwrap(); @@ -384,7 +384,7 @@ mod tests { let path = format!("{TEST_DATA_BASE}/1.json"); // register json file with the execution context - ctx.register_json("test", path.as_str(), NdJsonReadOptions::default()) + ctx.register_json("test", path.as_str(), JsonReadOptions::default()) .await?; // register a local file system object store for /tmp directory @@ -426,7 +426,7 @@ mod tests { } // register each partition as well as the top level dir - let json_read_option = NdJsonReadOptions::default(); + let json_read_option = JsonReadOptions::default(); ctx.register_json( "part0", &format!("{out_dir}/{part_0_name}"), @@ -503,7 +503,7 @@ mod 
tests { async fn read_test_data(schema_infer_max_records: usize) -> Result { let ctx = SessionContext::new(); - let options = NdJsonReadOptions { + let options = JsonReadOptions { schema_infer_max_records, ..Default::default() }; @@ -579,7 +579,7 @@ mod tests { .get_ext_with_compression(&file_compression_type) .unwrap(); - let read_option = NdJsonReadOptions::default() + let read_option = JsonReadOptions::default() .file_compression_type(file_compression_type) .file_extension(ext.as_str()); diff --git a/datafusion/core/src/execution/context/json.rs b/datafusion/core/src/execution/context/json.rs index e9d799400863d..f7df2ad7a1cd6 100644 --- a/datafusion/core/src/execution/context/json.rs +++ b/datafusion/core/src/execution/context/json.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. +use super::super::options::ReadOptions; +use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext}; +use crate::execution::options::JsonReadOptions; use datafusion_common::TableReference; use datafusion_datasource_json::source::plan_to_json; use std::sync::Arc; -use super::super::options::{NdJsonReadOptions, ReadOptions}; -use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext}; - impl SessionContext { /// Creates a [`DataFrame`] for reading an JSON data source. /// @@ -32,7 +32,7 @@ impl SessionContext { pub async fn read_json( &self, table_paths: P, - options: NdJsonReadOptions<'_>, + options: JsonReadOptions<'_>, ) -> Result { self._read_type(table_paths, options).await } @@ -43,7 +43,7 @@ impl SessionContext { &self, table_ref: impl Into, table_path: impl AsRef, - options: NdJsonReadOptions<'_>, + options: JsonReadOptions<'_>, ) -> Result<()> { let listing_options = options .to_listing_options(&self.copied_config(), self.copied_table_options()); diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index d723620d32323..a94716aed7831 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -29,7 +29,7 @@ pub use crate::dataframe; pub use crate::dataframe::DataFrame; pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ - AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, + AvroReadOptions, CsvReadOptions, JsonReadOptions, ParquetReadOptions, }; pub use datafusion_common::Column; diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 4d52345a2adc5..e4d8f10d719af 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -56,9 +56,7 @@ use datafusion::error::Result; use datafusion::execution::context::SessionContext; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::{ColumnarValue, Volatility}; -use datafusion::prelude::{ - CsvReadOptions, JoinType, NdJsonReadOptions, ParquetReadOptions, -}; +use datafusion::prelude::{CsvReadOptions, JoinType, ParquetReadOptions}; use datafusion::test_util::{ parquet_test_data, populate_csv_partitions, register_aggregate_csv, test_table, test_table_with_name, @@ -93,6 +91,7 @@ use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties}; use datafusion::error::Result as DataFusionResult; +use datafusion::execution::options::JsonReadOptions; use datafusion_functions_window::expr_fn::lag; // Get string representation of the plan @@ -2767,7 +2766,7 @@ async 
fn write_json_with_order() -> Result<()> { ctx.register_json( "data", test_path.to_str().unwrap(), - NdJsonReadOptions::default().schema(&schema), + JsonReadOptions::default().schema(&schema), ) .await?; @@ -6240,7 +6239,7 @@ async fn register_non_json_file() { .register_json( "data", "tests/data/test_binary.parquet", - NdJsonReadOptions::default(), + JsonReadOptions::default(), ) .await; assert_contains!( diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 168ae8880eee7..657c40b6b46ee 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -44,8 +44,9 @@ datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } object_store = { workspace = true } -serde_json = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true, features = ["sync"] } +tokio-util = { workspace = true, features = ["io", "io-util", "compat"] } # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index 589ad6bea9c8b..b2a3dca9f230a 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -31,6 +31,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::json; use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; +use bytes::{Buf, Bytes}; use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions}; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::{ @@ -47,7 +48,6 @@ use datafusion_datasource::file_format::{ use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; - use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join; @@ -58,8 +58,8 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; +use crate::utils::JsonArrayToNdjsonReader; use async_trait::async_trait; -use bytes::{Buf, Bytes}; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -136,19 +136,22 @@ impl Debug for JsonFormatFactory { /// /// # Supported Formats /// -/// ## Line-Delimited JSON (default) +/// ## Line-Delimited JSON (default, `newline_delimited = true`) /// ```text /// {"key1": 1, "key2": "val"} /// {"key1": 2, "key2": "vals"} /// ``` /// -/// ## JSON Array Format (when `format_array` option is true) +/// ## JSON Array Format (`newline_delimited = false`) /// ```text /// [ /// {"key1": 1, "key2": "val"}, /// {"key1": 2, "key2": "vals"} /// ] /// ``` +/// +/// Note: JSON array format is processed using streaming conversion, +/// which is memory-efficient even for large files. #[derive(Debug, Default)] pub struct JsonFormat { options: JsonOptions, @@ -183,48 +186,51 @@ impl JsonFormat { self } - /// Set whether to expect JSON array format instead of line-delimited format. + /// Set whether to read as newline-delimited JSON (NDJSON). 
/// - /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]` - /// When `false` (default), expects input like: + /// When `true` (default), expects newline-delimited format: /// ```text /// {"a": 1} /// {"a": 2} /// ``` - pub fn with_format_array(mut self, format_array: bool) -> Self { - self.options.format_array = format_array; + /// + /// When `false`, expects JSON array format: + /// ```text + /// [{"a": 1}, {"a": 2}] + /// ``` + pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.options.newline_delimited = newline_delimited; self } } -/// Infer schema from a JSON array format file. +/// Infer schema from JSON array format using streaming conversion. +/// +/// This function converts JSON array format to NDJSON on-the-fly and uses +/// arrow-json's schema inference. It properly tracks the number of records +/// processed for correct `records_to_read` management. /// -/// This function reads JSON data in array format `[{...}, {...}]` and infers -/// the Arrow schema from the contained objects. -fn infer_json_schema_from_json_array( - reader: &mut R, +/// # Returns +/// A tuple of (Schema, records_consumed) where records_consumed is the +/// number of records that were processed for schema inference. +fn infer_schema_from_json_array( + reader: R, max_records: usize, -) -> std::result::Result { - let mut content = String::new(); - reader.read_to_string(&mut content).map_err(|e| { - ArrowError::JsonError(format!("Failed to read JSON content: {e}")) - })?; - - // Parse as JSON array using serde_json - let values: Vec = serde_json::from_str(&content) - .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?; +) -> Result<(Schema, usize)> { + let ndjson_reader = JsonArrayToNdjsonReader::new(reader); - // Take only max_records for schema inference - let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect(); + let iter = ValueIter::new(ndjson_reader, None); + let mut count = 0; - if values_to_infer.is_empty() { - return Err(ArrowError::JsonError( - "JSON array is empty, cannot infer schema".to_string(), - )); - } + let schema = infer_json_schema_from_iterator(iter.take_while(|_| { + let should_take = count < max_records; + if should_take { + count += 1; + } + should_take + }))?; - // Use arrow's schema inference on the parsed values - infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok)) + Ok((schema, count)) } #[async_trait] @@ -261,53 +267,67 @@ impl FileFormat for JsonFormat { .schema_infer_max_rec .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD); let file_compression_type = FileCompressionType::from(self.options.compression); - let is_array_format = self.options.format_array; + let newline_delimited = self.options.newline_delimited; for object in objects { - let mut take_while = || { - let should_take = records_to_read > 0; - if should_take { - records_to_read -= 1; - } - should_take - }; + // Early exit if we've read enough records + if records_to_read == 0 { + break; + } let r = store.as_ref().get(&object.location).await?; - let schema = match r.payload { + + let (schema, records_consumed) = match r.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; - let mut reader = BufReader::new(decoder); - - if is_array_format { - infer_json_schema_from_json_array(&mut reader, records_to_read)? 
+ let reader = BufReader::new(decoder); + + if newline_delimited { + // NDJSON: use ValueIter directly + let iter = ValueIter::new(reader, None); + let mut count = 0; + let schema = + infer_json_schema_from_iterator(iter.take_while(|_| { + let should_take = count < records_to_read; + if should_take { + count += 1; + } + should_take + }))?; + (schema, count) } else { - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator( - iter.take_while(|_| take_while()), - )? + // JSON array format: use streaming converter + infer_schema_from_json_array(reader, records_to_read)? } } GetResultPayload::Stream(_) => { let data = r.bytes().await?; let decoder = file_compression_type.convert_read(data.reader())?; - let mut reader = BufReader::new(decoder); - - if is_array_format { - infer_json_schema_from_json_array(&mut reader, records_to_read)? + let reader = BufReader::new(decoder); + + if newline_delimited { + let iter = ValueIter::new(reader, None); + let mut count = 0; + let schema = + infer_json_schema_from_iterator(iter.take_while(|_| { + let should_take = count < records_to_read; + if should_take { + count += 1; + } + should_take + }))?; + (schema, count) } else { - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator( - iter.take_while(|_| take_while()), - )? + // JSON array format: use streaming converter + infer_schema_from_json_array(reader, records_to_read)? } } }; schemas.push(schema); - if records_to_read == 0 { - break; - } + // Correctly decrement records_to_read + records_to_read = records_to_read.saturating_sub(records_consumed); } let schema = Schema::try_merge(schemas)?; @@ -329,8 +349,9 @@ impl FileFormat for JsonFormat { _state: &dyn Session, conf: FileScanConfig, ) -> Result> { - let source = - Arc::new(JsonSource::new().with_format_array(self.options.format_array)); + let source = Arc::new( + JsonSource::new().with_newline_delimited(self.options.newline_delimited), + ); let conf = FileScanConfigBuilder::from(conf) .with_file_compression_type(FileCompressionType::from( self.options.compression, @@ -359,7 +380,7 @@ impl FileFormat for JsonFormat { } fn file_source(&self) -> Arc { - Arc::new(JsonSource::new().with_format_array(self.options.format_array)) + Arc::new(JsonSource::new().with_newline_delimited(self.options.newline_delimited)) } } diff --git a/datafusion/datasource-json/src/mod.rs b/datafusion/datasource-json/src/mod.rs index 18bb8792c3ffe..549393bb397be 100644 --- a/datafusion/datasource-json/src/mod.rs +++ b/datafusion/datasource-json/src/mod.rs @@ -21,5 +21,6 @@ pub mod file_format; pub mod source; +pub mod utils; pub use file_format::*; diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index d1107b0b97751..e3855846520b8 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Execution plan for reading line-delimited JSON files +//! 
Execution plan for reading JSON files (line-delimited and array formats) use std::any::Any; use std::io::{BufReader, Read, Seek, SeekFrom}; @@ -23,9 +23,10 @@ use std::sync::Arc; use std::task::Poll; use crate::file_format::JsonDecoder; +use crate::utils::JsonArrayToNdjsonReader; use datafusion_common::error::{DataFusionError, Result}; -use datafusion_common_runtime::JoinSet; +use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; @@ -36,7 +37,6 @@ use datafusion_datasource::{ }; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; -use arrow::array::RecordBatch; use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; use datafusion_common::Statistics; @@ -49,6 +49,12 @@ use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::io::{StreamReader, SyncIoBridge}; + +// ============================================================================ +// JsonOpener and JsonSource +// ============================================================================ /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] pub struct JsonOpener { @@ -56,24 +62,26 @@ pub struct JsonOpener { projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, - format_array: bool, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. + newline_delimited: bool, } impl JsonOpener { - /// Returns a [`JsonOpener`] + /// Returns a [`JsonOpener`] pub fn new( batch_size: usize, projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, - format_array: bool, + newline_delimited: bool, ) -> Self { Self { batch_size, projected_schema, file_compression_type, object_store, - format_array, + newline_delimited, } } } @@ -85,18 +93,27 @@ pub struct JsonSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, schema_adapter_factory: Option>, - format_array: bool, + newline_delimited: bool, } impl JsonSource { - /// Initialize a JsonSource with default values + /// Initialize a JsonSource with the provided schema pub fn new() -> Self { - Self::default() + Self { + batch_size: None, + metrics: ExecutionPlanMetricsSet::new(), + projected_statistics: None, + schema_adapter_factory: None, + newline_delimited: true, + } } - /// Set whether to expect JSON array format - pub fn with_format_array(mut self, format_array: bool) -> Self { - self.format_array = format_array; + /// Set whether to read as newline-delimited JSON. + /// + /// When `true` (default), expects newline-delimited format. + /// When `false`, expects JSON array format `[{...}, {...}]`. 
+ pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; self } } @@ -121,8 +138,8 @@ impl FileSource for JsonSource { projected_schema: base_config.projected_file_schema(), file_compression_type: base_config.file_compression_type, object_store, - format_array: self.format_array, - }) + newline_delimited: self.newline_delimited, + }) as Arc } fn as_any(&self) -> &dyn Any { @@ -179,7 +196,7 @@ impl FileSource for JsonSource { } impl FileOpener for JsonOpener { - /// Open a partitioned NDJSON file. + /// Open a partitioned JSON file. /// /// If `file_meta.range` is `None`, the entire file is opened. /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file. @@ -188,18 +205,20 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. + /// + /// Note: JSON array format does not support range-based scanning. fn open(&self, partitioned_file: PartitionedFile) -> Result { let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); - let format_array = self.format_array; + let newline_delimited = self.newline_delimited; // JSON array format requires reading the complete file - if format_array && partitioned_file.range.is_some() { + if !newline_delimited && partitioned_file.range.is_some() { return Err(DataFusionError::NotImplemented( "JSON array format does not support range-based file scanning. \ - Disable repartition_file_scans or use line-delimited JSON format." + Disable repartition_file_scans or use newline-delimited JSON format." 
.to_string(), )); } @@ -239,42 +258,32 @@ impl FileOpener for JsonOpener { } }; - if format_array { - // Handle JSON array format - let batches = read_json_array_to_batches( - BufReader::new(bytes), - schema, - batch_size, - )?; - Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed()) + if newline_delimited { + // NDJSON: use BufReader directly + let reader = BufReader::new(bytes); + let arrow_reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(reader)?; + + Ok(futures::stream::iter(arrow_reader) + .map(|r| r.map_err(Into::into)) + .boxed()) } else { - let reader = ReaderBuilder::new(schema) + // JSON array format: wrap with streaming converter + // JsonArrayToNdjsonReader implements BufRead + let ndjson_reader = JsonArrayToNdjsonReader::new(bytes); + let arrow_reader = ReaderBuilder::new(schema) .with_batch_size(batch_size) - .build(BufReader::new(bytes))?; - Ok(futures::stream::iter(reader) + .build(ndjson_reader)?; + + Ok(futures::stream::iter(arrow_reader) .map(|r| r.map_err(Into::into)) .boxed()) } } GetResultPayload::Stream(s) => { - if format_array { - // For streaming, we need to collect all bytes first - let bytes = s - .map_err(DataFusionError::from) - .try_fold(Vec::new(), |mut acc, chunk| async move { - acc.extend_from_slice(&chunk); - Ok(acc) - }) - .await?; - let decompressed = file_compression_type - .convert_read(std::io::Cursor::new(bytes))?; - let batches = read_json_array_to_batches( - BufReader::new(decompressed), - schema, - batch_size, - )?; - Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed()) - } else { + if newline_delimited { + // Newline-delimited JSON (NDJSON) streaming reader let s = s.map_err(DataFusionError::from); let decoder = ReaderBuilder::new(schema) .with_batch_size(batch_size) @@ -286,6 +295,38 @@ impl FileOpener for JsonOpener { DecoderDeserializer::new(JsonDecoder::new(decoder)), ); Ok(stream.map_err(Into::into).boxed()) + } else { + // JSON array format: streaming conversion without loading entire file + let s = s.map_err(DataFusionError::from); + let decompressed_stream = + file_compression_type.convert_stream(s.boxed())?; + + // Convert async stream to sync reader for JsonArrayToNdjsonReader + let stream_reader = StreamReader::new( + decompressed_stream.map_err(DataFusionError::from), + ); + let sync_reader = SyncIoBridge::new(stream_reader); + + // Use streaming converter - processes data in chunks without loading entire file + let ndjson_reader = JsonArrayToNdjsonReader::new(sync_reader); + + let arrow_reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(ndjson_reader)?; + + // Process arrow reader in blocking task to avoid blocking async executor + let (tx, rx) = tokio::sync::mpsc::channel(2); + SpawnedTask::spawn_blocking(move || { + for batch_result in arrow_reader { + if tx.blocking_send(batch_result).is_err() { + break; // Receiver dropped + } + } + }); + + Ok(ReceiverStream::new(rx) + .map(|r| r.map_err(Into::into)) + .boxed()) } } } @@ -293,40 +334,6 @@ impl FileOpener for JsonOpener { } } -/// Read JSON array format and convert to RecordBatches -fn read_json_array_to_batches( - mut reader: R, - schema: SchemaRef, - batch_size: usize, -) -> Result> { - use arrow::json::ReaderBuilder; - - let mut content = String::new(); - reader.read_to_string(&mut content)?; - - // Parse JSON array - let values: Vec = serde_json::from_str(&content) - .map_err(|e| DataFusionError::External(Box::new(e)))?; - - if values.is_empty() { - return Ok(vec![RecordBatch::new_empty(schema)]); - } - - 
// Convert to NDJSON string for arrow-json reader - let ndjson: String = values - .iter() - .map(|v| v.to_string()) - .collect::>() - .join("\n"); - - let cursor = std::io::Cursor::new(ndjson); - let reader = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build(cursor)?; - - reader.collect::, _>>().map_err(Into::into) -} - pub async fn plan_to_json( task_ctx: Arc, plan: Arc, diff --git a/datafusion/datasource-json/src/utils.rs b/datafusion/datasource-json/src/utils.rs new file mode 100644 index 0000000000000..10342506f9fa8 --- /dev/null +++ b/datafusion/datasource-json/src/utils.rs @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility types for JSON processing + +use std::io::{BufRead, Read}; + +// ============================================================================ +// JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter +// ============================================================================ +// +// Architecture: +// +// ```text +// ┌─────────────────────────────────────────────────────────────┐ +// │ JSON Array File (potentially very large, e.g. 1GB) │ +// │ [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}] │ +// └─────────────────────────────────────────────────────────────┘ +// │ +// ▼ read small chunks at a time (e.g. 64KB) +// ┌───────────────────┐ +// │ JsonArrayToNdjson │ ← character substitution only: +// │ Reader │ '[' skip, ',' → '\n', ']' stop +// └───────────────────┘ +// │ +// ▼ outputs NDJSON format +// ┌───────────────────┐ +// │ Arrow Reader │ ← internal buffer, batch parsing +// │ batch_size=1024 │ +// └───────────────────┘ +// │ +// ▼ outputs RecordBatch for every batch_size rows +// ┌───────────────────┐ +// │ RecordBatch │ +// │ (1024 rows) │ +// └───────────────────┘ +// ``` +// +// Memory Efficiency: +// +// | Approach | Memory for 1GB file | Parse count | +// |---------------------------------------|---------------------|-------------| +// | Load entire file + serde_json | ~5GB | 3x | +// | Streaming with JsonArrayToNdjsonReader| ~few MB | 1x | +// + +/// Default buffer size for JsonArrayToNdjsonReader (64KB) +const DEFAULT_BUF_SIZE: usize = 64 * 1024; + +/// Parser state for JSON array streaming +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum JsonArrayState { + /// Initial state, looking for opening '[' + Start, + /// Inside the JSON array, processing objects + InArray, + /// Reached the closing ']', finished + Done, +} + +/// A streaming reader that converts JSON array format to NDJSON format. +/// +/// This reader wraps an underlying reader containing JSON array data +/// `[{...}, {...}, ...]` and transforms it on-the-fly to newline-delimited +/// JSON format that Arrow's JSON reader can process. 
+///
+/// Implements both `Read` and `BufRead` traits for compatibility with Arrow's
+/// `ReaderBuilder::build()` which requires `BufRead`.
+///
+/// # Transformation Rules
+///
+/// - Skip leading `[` and whitespace before it
+/// - Convert top-level `,` (between objects) to `\n`
+/// - Skip whitespace at top level (between objects)
+/// - Stop at trailing `]`
+/// - Preserve everything inside objects (including nested `[`, `]`, `,`)
+/// - Properly handle strings (ignore special chars inside quotes)
+///
+/// # Example
+///
+/// ```text
+/// Input: [{"a":1}, {"b":[1,2]}, {"c":"x,y"}]
+/// Output: {"a":1}
+///         {"b":[1,2]}
+///         {"c":"x,y"}
+/// ```
+pub struct JsonArrayToNdjsonReader<R> {
+    inner: R,
+    state: JsonArrayState,
+    /// Tracks nesting depth of `{` and `[` to identify top-level commas
+    depth: i32,
+    /// Whether we're currently inside a JSON string
+    in_string: bool,
+    /// Whether the next character is escaped (after `\`)
+    escape_next: bool,
+    /// Internal buffer for BufRead implementation
+    buffer: Vec<u8>,
+    /// Current position in the buffer
+    pos: usize,
+    /// Number of valid bytes in the buffer
+    filled: usize,
+    /// Whether trailing non-whitespace content was detected after ']'
+    has_trailing_content: bool,
+}
+
+impl<R: Read> JsonArrayToNdjsonReader<R> {
+    /// Create a new streaming reader that converts JSON array to NDJSON.
+    pub fn new(reader: R) -> Self {
+        Self {
+            inner: reader,
+            state: JsonArrayState::Start,
+            depth: 0,
+            in_string: false,
+            escape_next: false,
+            buffer: vec![0; DEFAULT_BUF_SIZE],
+            pos: 0,
+            filled: 0,
+            has_trailing_content: false,
+        }
+    }
+
+    /// Check if the JSON array was properly terminated.
+    ///
+    /// This should be called after all data has been read.
+    ///
+    /// Returns an error if:
+    /// - Unbalanced braces/brackets (depth != 0)
+    /// - Unterminated string
+    /// - Missing closing `]`
+    /// - Unexpected trailing content after `]`
+    pub fn validate_complete(&self) -> std::io::Result<()> {
+        if self.depth != 0 {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON array: unbalanced braces or brackets",
+            ));
+        }
+        if self.in_string {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON array: unterminated string",
+            ));
+        }
+        if self.state != JsonArrayState::Done {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Incomplete JSON array: expected closing bracket ']'",
+            ));
+        }
+        if self.has_trailing_content {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON: unexpected trailing content after ']'",
+            ));
+        }
+        Ok(())
+    }
+
+    /// Process a single byte and return the transformed byte (if any)
+    fn process_byte(&mut self, byte: u8) -> Option<u8> {
+        match self.state {
+            JsonArrayState::Start => {
+                // Looking for the opening '[', skip whitespace
+                if byte == b'[' {
+                    self.state = JsonArrayState::InArray;
+                }
+                // Skip whitespace and the '[' itself
+                None
+            }
+            JsonArrayState::InArray => {
+                // Handle escape sequences in strings
+                if self.escape_next {
+                    self.escape_next = false;
+                    return Some(byte);
+                }
+
+                if self.in_string {
+                    // Inside a string: handle escape and closing quote
+                    match byte {
+                        b'\\' => self.escape_next = true,
+                        b'"' => self.in_string = false,
+                        _ => {}
+                    }
+                    Some(byte)
+                } else {
+                    // Outside strings: track depth and transform
+                    match byte {
+                        b'"' => {
+                            self.in_string = true;
+                            Some(byte)
+                        }
+                        b'{' | b'[' => {
+                            self.depth += 1;
+                            Some(byte)
+                        }
+                        b'}' => {
+                            self.depth -= 1;
+                            Some(byte)
+                        }
+                        b']' => {
+                            if self.depth == 0 {
+                                // Top-level ']' means end of array
+                                self.state = JsonArrayState::Done;
+                                None
+                            } else {
+                                // Nested ']' inside an object
+                                self.depth -= 1;
+                                Some(byte)
+                            }
+                        }
+                        b',' if self.depth == 0 => {
+                            // Top-level comma between objects → newline
+                            Some(b'\n')
+                        }
+                        _ => {
+                            // At depth 0, skip whitespace between objects
+                            if self.depth == 0 && byte.is_ascii_whitespace() {
+                                None
+                            } else {
+                                Some(byte)
+                            }
+                        }
+                    }
+                }
+            }
+            JsonArrayState::Done => {
+                // After ']', check for non-whitespace trailing content
+                if !byte.is_ascii_whitespace() {
+                    self.has_trailing_content = true;
+                }
+                None
+            }
+        }
+    }
+
+    /// Fill the internal buffer with transformed data
+    fn fill_internal_buffer(&mut self) -> std::io::Result<()> {
+        // Read raw data from inner reader
+        let mut raw_buf = vec![0u8; DEFAULT_BUF_SIZE];
+        let mut write_pos = 0;
+
+        loop {
+            let bytes_read = self.inner.read(&mut raw_buf)?;
+            if bytes_read == 0 {
+                break; // EOF
+            }
+
+            for &byte in &raw_buf[..bytes_read] {
+                if let Some(transformed) = self.process_byte(byte) {
+                    if write_pos < self.buffer.len() {
+                        self.buffer[write_pos] = transformed;
+                        write_pos += 1;
+                    }
+                }
+                // Note: process_byte is called for all bytes to track state,
+                // even when buffer is full or in Done state
+            }
+
+            // Only stop if buffer is full
+            if write_pos >= self.buffer.len() {
+                break;
+            }
+        }
+
+        self.pos = 0;
+        self.filled = write_pos;
+        Ok(())
+    }
+}
+
+impl<R: Read> Read for JsonArrayToNdjsonReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // If buffer is empty, fill it
+        if self.pos >= self.filled {
+            self.fill_internal_buffer()?;
+            if self.filled == 0 {
+                return Ok(0); // EOF
+            }
+        }
+
+        // Copy from internal buffer to output
+        let available = self.filled - self.pos;
+        let to_copy = std::cmp::min(available, buf.len());
+        buf[..to_copy].copy_from_slice(&self.buffer[self.pos..self.pos + to_copy]);
+        self.pos += to_copy;
+        Ok(to_copy)
+    }
+}
+
+impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
+    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
+        if self.pos >= self.filled {
+            self.fill_internal_buffer()?;
+        }
+        Ok(&self.buffer[self.pos..self.filled])
+    }
+
+    fn consume(&mut self, amt: usize) {
+        self.pos = std::cmp::min(self.pos + amt, self.filled);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_json_array_to_ndjson_simple() {
+        let input = r#"[{"a":1}, {"a":2}, {"a":3}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":1}\n{\"a\":2}\n{\"a\":3}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_nested() {
+        let input = r#"[{"a":{"b":1}}, {"c":[1,2,3]}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":{\"b\":1}}\n{\"c\":[1,2,3]}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_strings_with_special_chars() {
+        let input = r#"[{"a":"[1,2]"}, {"b":"x,y"}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":\"[1,2]\"}\n{\"b\":\"x,y\"}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_escaped_quotes() {
+        let input = r#"[{"a":"say \"hello\""}, {"b":1}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":\"say 
\\\"hello\\\"\"}\n{\"b\":1}"); + } + + #[test] + fn test_json_array_to_ndjson_empty() { + let input = r#"[]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, ""); + } + + #[test] + fn test_json_array_to_ndjson_single_element() { + let input = r#"[{"a":1}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + assert_eq!(output, "{\"a\":1}"); + } + + #[test] + fn test_json_array_to_ndjson_bufread() { + let input = r#"[{"a":1}, {"a":2}]"#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + + let buf = reader.fill_buf().unwrap(); + assert!(!buf.is_empty()); + + let first_len = buf.len(); + reader.consume(first_len); + + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + } + + #[test] + fn test_json_array_to_ndjson_whitespace() { + let input = r#" [ {"a":1} , {"a":2} ] "#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + // Top-level whitespace is skipped, internal whitespace preserved + assert_eq!(output, "{\"a\":1}\n{\"a\":2}"); + } + + #[test] + fn test_validate_complete_valid_json() { + let valid_json = r#"[{"a":1},{"a":2}]"#; + let mut reader = JsonArrayToNdjsonReader::new(valid_json.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + reader.validate_complete().unwrap(); + } + + #[test] + fn test_json_array_with_trailing_junk() { + let input = r#" [ {"a":1} , {"a":2} ] some { junk [ here ] "#; + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + // Should extract the valid array content + assert_eq!(output, "{\"a\":1}\n{\"a\":2}"); + + // But validation should catch the trailing junk + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("trailing content") + || err_msg.contains("Unexpected trailing"), + "Expected trailing content error, got: {err_msg}" + ); + } + + #[test] + fn test_validate_complete_incomplete_array() { + let invalid_json = r#"[{"a":1},{"a":2}"#; // Missing closing ] + let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("expected closing bracket") + || err_msg.contains("missing closing"), + "Expected missing bracket error, got: {err_msg}" + ); + } + + #[test] + fn test_validate_complete_unbalanced_braces() { + let invalid_json = r#"[{"a":1},{"a":2]"#; // Wrong closing bracket + let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + let result = reader.validate_complete(); + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("unbalanced") + || err_msg.contains("expected closing bracket"), + "Expected unbalanced or missing bracket error, got: {err_msg}" + ); + } + + #[test] + fn test_validate_complete_valid_with_trailing_whitespace() { + let input = r#"[{"a":1},{"a":2}] + "#; 
// Trailing whitespace is OK + let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes()); + let mut output = String::new(); + reader.read_to_string(&mut output).unwrap(); + + // Whitespace after ] should be allowed + reader.validate_complete().unwrap(); + } +} diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index c598b8dcb644a..5df9ff9faf519 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -467,8 +467,7 @@ message CsvOptions { message JsonOptions { CompressionTypeVariant compression = 1; // Compression type optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference - optional uint32 compression_level = 3; // Optional compression level - bool format_array = 4; // Whether the JSON is in array format [{},...] (default false = line-delimited) + optional bool newline_delimited = 3; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...] } message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 71f8cc3e78dc5..ba8ae52202b3d 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1092,8 +1092,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { Ok(JsonOptions { compression: compression.into(), schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), - compression_level: proto_opts.compression_level.map(|h| h as usize), - format_array: proto_opts.format_array, + newline_delimited: proto_opts.newline_delimited.unwrap_or(true), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index cc646e48b02d8..b9dd232c180ac 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4548,10 +4545,7 @@ impl serde::Serialize for JsonOptions { if self.schema_infer_max_rec.is_some() { len += 1; } - if self.compression_level.is_some() { - len += 1; - } - if self.format_array { + if self.newline_delimited.is_some() { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; @@ -4565,11 +4562,8 @@ impl serde::Serialize for JsonOptions { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?; } - if let Some(v) = self.compression_level.as_ref() { - struct_ser.serialize_field("compressionLevel", v)?; - } - if self.format_array { - struct_ser.serialize_field("formatArray", &self.format_array)?; + if let Some(v) = self.newline_delimited.as_ref() { + struct_ser.serialize_field("newlineDelimited", v)?; } struct_ser.end() } @@ -4584,18 +4578,15 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression", "schema_infer_max_rec", "schemaInferMaxRec", - "compression_level", - "compressionLevel", - "format_array", - "formatArray", + "newline_delimited", + "newlineDelimited", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Compression, SchemaInferMaxRec, - CompressionLevel, - FormatArray, + NewlineDelimited, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error> @@ -4619,8 +4610,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { match value { "compression" => Ok(GeneratedField::Compression),
"schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), - "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), - "formatArray" | "format_array" => Ok(GeneratedField::FormatArray), + "newlineDelimited" | "newline_delimited" => Ok(GeneratedField::NewlineDelimited), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4642,8 +4632,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { { let mut compression__ = None; let mut schema_infer_max_rec__ = None; - let mut compression_level__ = None; - let mut format_array__ = None; + let mut newline_delimited__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Compression => { @@ -4660,27 +4649,18 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } - GeneratedField::CompressionLevel => { - if compression_level__.is_some() { - return Err(serde::de::Error::duplicate_field("compressionLevel")); - } - compression_level__ = - map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) - ; - } - GeneratedField::FormatArray => { - if format_array__.is_some() { - return Err(serde::de::Error::duplicate_field("formatArray")); + GeneratedField::NewlineDelimited => { + if newline_delimited__.is_some() { + return Err(serde::de::Error::duplicate_field("newlineDelimited")); } - format_array__ = Some(map_.next_value()?); + newline_delimited__ = map_.next_value()?; } } } Ok(JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, - compression_level: compression_level__, - format_array: format_array__.unwrap_or_default(), + newline_delimited: newline_delimited__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 24d6600e4d21f..6e8c0368fbe5f 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -659,12 +659,9 @@ pub struct JsonOptions { /// Optional max records for schema inference #[prost(uint64, optional, tag = "2")] pub schema_infer_max_rec: ::core::option::Option<u64>, - /// Optional compression level - #[prost(uint32, optional, tag = "3")] - pub compression_level: ::core::option::Option<u32>, - /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) - #[prost(bool, tag = "4")] - pub format_array: bool, + /// Whether to read as newline-delimited JSON (default true). 
When false, expects JSON array format \[{},...\] + #[prost(bool, optional, tag = "3")] + pub newline_delimited: ::core::option::Option<bool>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 69f3c269c78a4..33e38826de008 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -986,8 +986,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { Ok(protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), - compression_level: opts.compression_level.map(|h| h as u32), - format_array: opts.format_array, + newline_delimited: Some(opts.newline_delimited), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 24d6600e4d21f..6e8c0368fbe5f 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -659,12 +659,9 @@ pub struct JsonOptions { /// Optional max records for schema inference #[prost(uint64, optional, tag = "2")] pub schema_infer_max_rec: ::core::option::Option<u64>, - /// Optional compression level - #[prost(uint32, optional, tag = "3")] - pub compression_level: ::core::option::Option<u32>, - /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) - #[prost(bool, tag = "4")] - pub format_array: bool, + /// Whether to read as newline-delimited JSON (default true). When false, expects JSON array format \[{},...\] + #[prost(bool, optional, tag = "3")] + pub newline_delimited: ::core::option::Option<bool>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 3685e615420fe..0891e6aeb510f 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -238,8 +238,7 @@ impl JsonOptionsProto { JsonOptionsProto { compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), - compression_level: options.compression_level.map(|v| v as u32), - format_array: options.format_array, + newline_delimited: Some(options.newline_delimited), } } else { JsonOptionsProto::default() @@ -258,8 +257,7 @@ impl From<&JsonOptionsProto> for JsonOptions { _ => CompressionTypeVariant::UNCOMPRESSED, }, schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize), - compression_level: proto.compression_level.map(|v| v as usize), - format_array: proto.format_array, + newline_delimited: proto.newline_delimited.unwrap_or(true), } } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 9c63f8078ead1..2ff3393178336 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -28,7 +28,7 @@ use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory}; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; -use datafusion::execution::options::ArrowReadOptions; +use datafusion::execution::options::{ArrowReadOptions, JsonReadOptions}; use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion; use 
datafusion::optimizer::Optimizer; use datafusion_common::parsers::CompressionTypeVariant; @@ -749,7 +749,7 @@ async fn create_json_scan(ctx: &SessionContext) -> Result