From b3691fccbd1763308dfbd9348f218f07045af958 Mon Sep 17 00:00:00 2001
From: Oleksii Smotrov
Date: Wed, 26 Nov 2025 19:19:54 +0200
Subject: [PATCH] feat: add compression level configuration for JSON/CSV writers

Adds a `compression_level` option to `JsonOptions` and `CsvOptions`,
allowing users to specify the compression level for the ZSTD, GZIP,
BZIP2, and XZ algorithms.

- Add compression_level field to JsonOptions and CsvOptions in config.rs
- Add convert_async_writer_with_level method (non-breaking, extends API)
- Keep original convert_async_writer for backward compatibility
- Update JsonWriterOptions and CsvWriterOptions with compression_level
- Update ObjectWriterBuilder to support compression level
- Update JSON and CSV sinks to pass compression level through
- Update proto definitions and conversions for serialization

Closes #18947
---
 datafusion/common/src/config.rs               | 25 ++++++++++
 .../common/src/file_options/csv_writer.rs     | 17 +++++++
 .../common/src/file_options/json_writer.rs    | 18 ++++++-
 datafusion/datasource-csv/src/file_format.rs  |  1 +
 datafusion/datasource-json/src/file_format.rs |  1 +
 .../datasource/src/file_compression_type.rs   | 50 ++++++++++++++++---
 datafusion/datasource/src/write/mod.rs        | 23 ++++++++-
 .../datasource/src/write/orchestration.rs     |  2 +
 datafusion/expr/src/udf.rs                    |  7 ++-
 .../proto/datafusion_common.proto             |  2 +
 datafusion/proto-common/src/from_proto/mod.rs |  2 +
 .../proto-common/src/generated/pbjson.rs      | 40 +++++++++++++++
 .../proto-common/src/generated/prost.rs       |  6 +++
 datafusion/proto-common/src/to_proto/mod.rs   |  2 +
 .../src/generated/datafusion_proto_common.rs  |  6 +++
 .../proto/src/logical_plan/file_formats.rs    |  4 ++
 16 files changed, 194 insertions(+), 12 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 9ba2550c3941..1c52e252cc6b 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1661,6 +1661,7 @@ config_field!(bool, value => default_config_transform(value.to_lowercase().as_st
 config_field!(usize);
 config_field!(f64);
 config_field!(u64);
+config_field!(u32);
 
 impl ConfigField for u8 {
     fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
@@ -2786,6 +2787,14 @@ config_namespace! {
     /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
     pub newlines_in_values: Option<bool>, default = None
     pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
+    /// Compression level for the output file. The valid range depends on the
+    /// compression algorithm:
+    /// - ZSTD: 1 to 22 (default: 3)
+    /// - GZIP: 0 to 10 (default: varies by implementation)
+    /// - BZIP2: 0 to 9 (default: 6)
+    /// - XZ: 0 to 9 (default: 6)
+    /// If not specified, the default level for the compression algorithm is used.
+    pub compression_level: Option<u32>, default = None
     pub schema_infer_max_rec: Option<usize>, default = None
     pub date_format: Option<String>, default = None
     pub datetime_format: Option<String>, default = None
@@ -2908,6 +2917,14 @@ impl CsvOptions {
         self
     }
 
+    /// Set the compression level for the output file.
+    /// The valid range depends on the compression algorithm.
+    /// If not specified, the default level for the algorithm is used.
+    pub fn with_compression_level(mut self, level: u32) -> Self {
+        self.compression_level = Some(level);
+        self
+    }
+
     /// The delimiter character.
     pub fn delimiter(&self) -> u8 {
         self.delimiter
     }
@@ -2933,6 +2950,14 @@ config_namespace!
{ /// Options controlling JSON format pub struct JsonOptions { pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED + /// Compression level for the output file. The valid range depends on the + /// compression algorithm: + /// - ZSTD: 1 to 22 (default: 3) + /// - GZIP: 0 to 10 (default: varies by implementation) + /// - BZIP2: 0 to 9 (default: 6) + /// - XZ: 0 to 9 (default: 6) + /// If not specified, the default level for the compression algorithm is used. + pub compression_level: Option, default = None pub schema_infer_max_rec: Option, default = None } } diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs index 943288af9164..8f9f82086e7f 100644 --- a/datafusion/common/src/file_options/csv_writer.rs +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -31,6 +31,8 @@ pub struct CsvWriterOptions { /// Compression to apply after ArrowWriter serializes RecordBatches. /// This compression is applied by DataFusion not the ArrowWriter itself. pub compression: CompressionTypeVariant, + /// Compression level for the output file. + pub compression_level: Option, } impl CsvWriterOptions { @@ -41,6 +43,20 @@ impl CsvWriterOptions { Self { writer_options, compression, + compression_level: None, + } + } + + /// Create a new `CsvWriterOptions` with the specified compression level. + pub fn new_with_level( + writer_options: WriterBuilder, + compression: CompressionTypeVariant, + compression_level: Option, + ) -> Self { + Self { + writer_options, + compression, + compression_level, } } } @@ -81,6 +97,7 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions { Ok(CsvWriterOptions { writer_options: builder, compression: value.compression, + compression_level: value.compression_level, }) } } diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs index 750d2972329b..3e4c673086c5 100644 --- a/datafusion/common/src/file_options/json_writer.rs +++ b/datafusion/common/src/file_options/json_writer.rs @@ -27,11 +27,26 @@ use crate::{ #[derive(Clone, Debug)] pub struct JsonWriterOptions { pub compression: CompressionTypeVariant, + pub compression_level: Option, } impl JsonWriterOptions { pub fn new(compression: CompressionTypeVariant) -> Self { - Self { compression } + Self { + compression, + compression_level: None, + } + } + + /// Create a new `JsonWriterOptions` with the specified compression and level. 
+ pub fn new_with_level( + compression: CompressionTypeVariant, + compression_level: Option, + ) -> Self { + Self { + compression, + compression_level, + } } } @@ -41,6 +56,7 @@ impl TryFrom<&JsonOptions> for JsonWriterOptions { fn try_from(value: &JsonOptions) -> Result { Ok(JsonWriterOptions { compression: value.compression, + compression_level: value.compression_level, }) } } diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 6b27687a56f7..cc660c4f8ea3 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -780,6 +780,7 @@ impl FileSink for CsvSink { context, serializer, self.writer_options.compression.into(), + self.writer_options.compression_level, object_store, demux_task, file_stream_rx, diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index afb12e526271..eb48293e3629 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -379,6 +379,7 @@ impl FileSink for JsonSink { context, serializer, self.writer_options.compression.into(), + self.writer_options.compression_level, object_store, demux_task, file_stream_rx, diff --git a/datafusion/datasource/src/file_compression_type.rs b/datafusion/datasource/src/file_compression_type.rs index 9ca5d8763b74..82d9aea253cf 100644 --- a/datafusion/datasource/src/file_compression_type.rs +++ b/datafusion/datasource/src/file_compression_type.rs @@ -155,25 +155,63 @@ impl FileCompressionType { } /// Wrap the given `BufWriter` so that it performs compressed writes - /// according to this `FileCompressionType`. + /// according to this `FileCompressionType` using the default compression level. pub fn convert_async_writer( &self, w: BufWriter, ) -> Result> { + self.convert_async_writer_with_level(w, None) + } + + /// Wrap the given `BufWriter` so that it performs compressed writes + /// according to this `FileCompressionType`. + /// + /// If `compression_level` is `Some`, the encoder will use the specified + /// compression level. If `None`, the default level for each algorithm is used. 
+    pub fn convert_async_writer_with_level(
+        &self,
+        w: BufWriter,
+        compression_level: Option<u32>,
+    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+        #[cfg(feature = "compression")]
+        use async_compression::Level;
+
         Ok(match self.variant {
             #[cfg(feature = "compression")]
-            GZIP => Box::new(GzipEncoder::new(w)),
+            GZIP => match compression_level {
+                Some(level) => {
+                    Box::new(GzipEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(GzipEncoder::new(w)),
+            },
             #[cfg(feature = "compression")]
-            BZIP2 => Box::new(BzEncoder::new(w)),
+            BZIP2 => match compression_level {
+                Some(level) => {
+                    Box::new(BzEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(BzEncoder::new(w)),
+            },
             #[cfg(feature = "compression")]
-            XZ => Box::new(XzEncoder::new(w)),
+            XZ => match compression_level {
+                Some(level) => {
+                    Box::new(XzEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(XzEncoder::new(w)),
+            },
             #[cfg(feature = "compression")]
-            ZSTD => Box::new(ZstdEncoder::new(w)),
+            ZSTD => match compression_level {
+                Some(level) => {
+                    Box::new(ZstdEncoder::with_quality(w, Level::Precise(level as i32)))
+                }
+                None => Box::new(ZstdEncoder::new(w)),
+            },
             #[cfg(not(feature = "compression"))]
             GZIP | BZIP2 | XZ | ZSTD => {
+                // compression_level is not used when compression feature is disabled
+                let _ = compression_level;
                 return Err(DataFusionError::NotImplemented(
                     "Compression feature is not enabled".to_owned(),
-                ))
+                ));
             }
             UNCOMPRESSED => Box::new(w),
         })
diff --git a/datafusion/datasource/src/write/mod.rs b/datafusion/datasource/src/write/mod.rs
index 85832f81bc18..a53060586b57 100644
--- a/datafusion/datasource/src/write/mod.rs
+++ b/datafusion/datasource/src/write/mod.rs
@@ -131,6 +131,8 @@ pub struct ObjectWriterBuilder {
     object_store: Arc<dyn ObjectStore>,
     /// The size of the buffer for the object writer.
     buffer_size: Option<usize>,
+    /// The compression level for the object writer.
+    compression_level: Option<u32>,
 }
 
 impl ObjectWriterBuilder {
@@ -145,6 +147,7 @@ impl ObjectWriterBuilder {
             location: location.clone(),
             object_store,
             buffer_size: None,
+            compression_level: None,
         }
     }
 
@@ -202,6 +205,22 @@ impl ObjectWriterBuilder {
         self.buffer_size
     }
 
+    /// Set compression level for object writer.
+    pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
+        self.compression_level = compression_level;
+    }
+
+    /// Set compression level for object writer, returning the builder.
+    pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
+        self.compression_level = compression_level;
+        self
+    }
+
+    /// Currently specified compression level.
+    pub fn get_compression_level(&self) -> Option<u32> {
+        self.compression_level
+    }
+
     /// Return a writer object that writes to the object store location.
/// /// If a buffer size has not been set, the default buffer buffer size will @@ -215,6 +234,7 @@ impl ObjectWriterBuilder { location, object_store, buffer_size, + compression_level, } = self; let buf_writer = match buffer_size { @@ -222,6 +242,7 @@ impl ObjectWriterBuilder { None => BufWriter::new(object_store, location), }; - file_compression_type.convert_async_writer(buf_writer) + file_compression_type + .convert_async_writer_with_level(buf_writer, compression_level) } } diff --git a/datafusion/datasource/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs index ab836b7b7f38..7e131d46808f 100644 --- a/datafusion/datasource/src/write/orchestration.rs +++ b/datafusion/datasource/src/write/orchestration.rs @@ -240,6 +240,7 @@ pub async fn spawn_writer_tasks_and_join( context: &Arc, serializer: Arc, compression: FileCompressionType, + compression_level: Option, object_store: Arc, demux_task: SpawnedTask>, mut file_stream_rx: DemuxedStreamReceiver, @@ -265,6 +266,7 @@ pub async fn spawn_writer_tasks_and_join( .execution .objectstore_writer_buffer_size, )) + .with_compression_level(compression_level) .build()?; if tx_file_bundle diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 89d8569a261b..adb2ca28eebd 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -25,10 +25,9 @@ use crate::udf_eq::UdfEq; use crate::{ColumnarValue, Documentation, Expr, Signature}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{ - assert_or_internal_err, not_impl_err, DataFusionError, ExprSchema, Result, - ScalarValue, -}; +#[cfg(debug_assertions)] +use datafusion_common::{assert_or_internal_err, DataFusionError}; +use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue}; use datafusion_expr_common::dyn_eq::{DynEq, DynHash}; use datafusion_expr_common::interval_arithmetic::Interval; use std::any::Any; diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 267953556b16..a2634365b961 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -461,12 +461,14 @@ message CsvOptions { bytes newlines_in_values = 16; // Indicates if newlines are supported in values bytes terminator = 17; // Optional terminator character as a byte bytes truncated_rows = 18; // Indicates if truncated rows are allowed + optional uint32 compression_level = 19; // Optional compression level } // Options controlling CSV format message JsonOptions { CompressionTypeVariant compression = 1; // Compression type optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference + optional uint32 compression_level = 3; // Optional compression level } message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 4ede5b970eae..b42ff98b7e92 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -903,6 +903,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { double_quote: proto_opts.has_header.first().map(|h| *h != 0), newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), compression: proto_opts.compression().into(), + compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), date_format: 
(!proto_opts.date_format.is_empty()) .then(|| proto_opts.date_format.clone()), @@ -1091,6 +1092,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { let compression: protobuf::CompressionTypeVariant = proto_opts.compression(); Ok(JsonOptions { compression: compression.into(), + compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), }) } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e63f345459b8..8c2afbc7a985 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1666,6 +1666,9 @@ impl serde::Serialize for CsvOptions { if !self.truncated_rows.is_empty() { len += 1; } + if self.compression_level.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1743,6 +1746,9 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("truncatedRows", pbjson::private::base64::encode(&self.truncated_rows).as_str())?; } + if let Some(v) = self.compression_level.as_ref() { + struct_ser.serialize_field("compressionLevel", v)?; + } struct_ser.end() } } @@ -1783,6 +1789,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "terminator", "truncated_rows", "truncatedRows", + "compression_level", + "compressionLevel", ]; #[allow(clippy::enum_variant_names)] @@ -1805,6 +1813,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { NewlinesInValues, Terminator, TruncatedRows, + CompressionLevel, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1844,6 +1853,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), "terminator" => Ok(GeneratedField::Terminator), "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), + "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1881,6 +1891,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut newlines_in_values__ = None; let mut terminator__ = None; let mut truncated_rows__ = None; + let mut compression_level__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::HasHeader => { @@ -2011,6 +2022,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::CompressionLevel => { + if compression_level__.is_some() { + return Err(serde::de::Error::duplicate_field("compressionLevel")); + } + compression_level__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } } } Ok(CsvOptions { @@ -2032,6 +2051,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { newlines_in_values: newlines_in_values__.unwrap_or_default(), terminator: terminator__.unwrap_or_default(), truncated_rows: truncated_rows__.unwrap_or_default(), + compression_level: compression_level__, }) } } @@ -4548,6 +4568,9 @@ impl serde::Serialize for JsonOptions { if self.schema_infer_max_rec.is_some() { len += 1; } + if self.compression_level.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; if self.compression != 0 { let v = CompressionTypeVariant::try_from(self.compression) @@ -4559,6 +4582,9 @@ impl serde::Serialize for JsonOptions { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("schemaInferMaxRec", ToString::to_string(&v).as_str())?; } + if let Some(v) = self.compression_level.as_ref() { + struct_ser.serialize_field("compressionLevel", v)?; + } struct_ser.end() } } @@ -4572,12 +4598,15 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression", "schema_infer_max_rec", "schemaInferMaxRec", + "compression_level", + "compressionLevel", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Compression, SchemaInferMaxRec, + CompressionLevel, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4601,6 +4630,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { match value { "compression" => Ok(GeneratedField::Compression), "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), + "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4622,6 +4652,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { { let mut compression__ = None; let mut schema_infer_max_rec__ = None; + let mut compression_level__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Compression => { @@ -4638,11 +4669,20 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::CompressionLevel => { + if compression_level__.is_some() { + return Err(serde::de::Error::duplicate_field("compressionLevel")); + } + compression_level__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } } } Ok(JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, + compression_level: compression_level__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa7c3d51a9d6..d0ca751925f7 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -649,6 +649,9 @@ pub struct CsvOptions { /// Indicates if truncated rows are allowed #[prost(bytes = "vec", tag = "18")] pub truncated_rows: ::prost::alloc::vec::Vec, + /// Optional compression level + #[prost(uint32, optional, tag = "19")] + pub compression_level: ::core::option::Option, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] @@ -659,6 +662,9 @@ pub struct JsonOptions { /// Optional max records for schema inference #[prost(uint64, optional, tag = "2")] pub schema_infer_max_rec: ::core::option::Option, + /// Optional compression level + #[prost(uint32, optional, tag = "3")] + pub compression_level: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index e9de1d9e9a9e..bac51cd5947a 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -974,6 +974,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { null_regex: opts.null_regex.clone().unwrap_or_default(), comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]), + compression_level: opts.compression_level, }) } } @@ -986,6 +987,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { Ok(protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), + compression_level: opts.compression_level, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index aa7c3d51a9d6..d0ca751925f7 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -649,6 +649,9 @@ pub struct CsvOptions { /// Indicates if truncated rows are allowed #[prost(bytes = "vec", tag = "18")] pub truncated_rows: ::prost::alloc::vec::Vec, + /// Optional compression level + #[prost(uint32, optional, tag = "19")] + pub compression_level: ::core::option::Option, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] @@ -659,6 +662,9 @@ pub struct JsonOptions { /// Optional max records for schema inference #[prost(uint64, optional, tag = "2")] pub schema_infer_max_rec: ::core::option::Option, + /// Optional compression level + #[prost(uint32, optional, tag = "3")] + pub compression_level: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] 
 pub struct TableParquetOptions {
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index d32bfb22ffdd..ffa81f15e062 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -62,6 +62,7 @@ impl CsvOptionsProto {
                     .newlines_in_values
                     .map_or(vec![], |v| vec![v as u8]),
                 truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]),
+                compression_level: options.compression_level,
             }
         } else {
             CsvOptionsProto::default()
@@ -152,6 +153,7 @@ impl From<&CsvOptionsProto> for CsvOptions {
             } else {
                 Some(proto.truncated_rows[0] != 0)
             },
+            compression_level: proto.compression_level,
         }
     }
 }
@@ -238,6 +240,7 @@ impl JsonOptionsProto {
             JsonOptionsProto {
                 compression: options.compression as i32,
                 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64),
+                compression_level: options.compression_level,
             }
         } else {
             JsonOptionsProto::default()
@@ -256,6 +259,7 @@ impl From<&JsonOptionsProto> for JsonOptions {
                 _ => CompressionTypeVariant::UNCOMPRESSED,
             },
             schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize),
+            compression_level: proto.compression_level,
         }
     }
 }
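
Reviewer note (not part of the patch): a minimal usage sketch of the new
option, assuming only what the diff above adds (the `compression_level`
field and `with_compression_level` builder) plus existing public items
(`CsvOptions` in `datafusion_common::config`, `CompressionTypeVariant` in
`datafusion_common::parsers`):

    use datafusion_common::config::CsvOptions;
    use datafusion_common::parsers::CompressionTypeVariant;

    // Write CSV output as ZSTD at level 10 instead of the encoder default (3).
    fn zstd_csv_options() -> CsvOptions {
        CsvOptions {
            compression: CompressionTypeVariant::ZSTD,
            ..Default::default()
        }
        .with_compression_level(10)
    }

The level is forwarded to `async_compression::Level::Precise`, so any
range validation is left to the underlying encoder rather than performed
up front by these options.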