Add coerce int96 option for Parquet to support different TimeUnits, test int96_from_spark.parquet from parquet-testing #15537
Conversation
apache/parquet-testing#73 merged so I updated the parquet-testing dependency. Now waiting on an arrow-rs release and DF bumping to that version.
# Conflicts:
#   Cargo.lock
#   Cargo.toml
#   datafusion/datasource-parquet/src/opener.rs
I believe all dependencies are updated, marking this as ready for review.
LGTM although I am not familiar with the serde part. Thanks @mbutrovich.
lgtm.
(couple of minor nits, you may ignore)
/// which stores microsecond resolution timestamps in an int96 allowing it
/// to write values with a larger date range than 64-bit timestamps with
/// nanosecond resolution.
pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
I wonder if there is any usecase for int96 other than timestamps.
Specifically, maybe we can simply always change the behavior and coerce int96 --> microseconds
At the very least default the option to be enabled perhaps
I wonder if there is any usecase for int96 other than timestamps.
Not as far as I know, but I don't think the (deprecated) int96 spec said that it had to represent a timestamp. It's just where Spark, Hive, Impala, etc. ended up.
Specifically, maybe we can simply always change the behavior and coerce int96 --> microseconds
At the very least default the option to be enabled perhaps
It's not clear to me if we should assume that an int96 originated from a system that treated the originating timestamp as microseconds. While it's very likely that it originated from one of those systems, I don't know how to treat the default in this case. Snowflake, for example, seems to use microseconds for its timestamps when dealing with Iceberg:
https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types#supported-data-types-for-iceberg-tables
I'm hesitant to mess with defaults, but am open to hearing more from the community. @parthchandra
Given how dominant spark is and how rarely used int96 is outside the spark ecosystem, I was thinking that basically if anyone had such a file it is likely we should treat the values as microseconds.
I don't have a strong preference, I was just trying to come up with a way to keep the code less complicated.
IIRC, the use of int96 originated from Impala/Parquet-cpp where it was used to store nanoseconds (The C++ implementation came from the Impala team). I think the Java implementation ended up with int96 in order to be compatible. Spark came along with its own variant and well, here we are.
(https://issues.apache.org/jira/browse/PARQUET-323)
The Parquet community assumed that this was the only usage of int96 before it was deprecated, so I feel it is safe for us to assume the same.
It can be done as a follow up, though, I feel.
Thanks @mbutrovich -- I am wondering what this option changes in practice. I checked that the data seems to come out ok with datafusion 46. Can you remind me what the difference would be with this option (is it that the timestamp type is different)?
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion/parquet-testing$ ~/Software/datafusion-cli/datafusion-cli-46.0.0 -c "select * from 'data/int96_from_spark.parquet'";
DataFusion CLI v46.0.0
+-------------------------------+
| a |
+-------------------------------+
| 2024-01-01T20:34:56.123456 |
| 2024-01-01T01:00:00 |
| 1816-03-29T08:56:08.066277376 |
| 2024-12-30T23:00:00 |
| NULL |
| 2147-08-27T00:35:19.850745856 |
+-------------------------------+
6 row(s) fetched.
Elapsed 0.007 seconds.
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion/parquet-testing$ ~/Software/datafusion-cli/datafusion-cli-46.0.0 -c "describe 'data/int96_from_spark.parquet'";
DataFusion CLI v46.0.0
+-------------+-----------------------------+-------------+
| column_name | data_type | is_nullable |
+-------------+-----------------------------+-------------+
| a | Timestamp(Nanosecond, None) | YES |
+-------------+-----------------------------+-------------+
1 row(s) fetched.
Elapsed 0.001 seconds.
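If I understand the intent, the user-visible difference would be in the reported type. A sketch of what I would expect with the option enabled (I have not actually run this -- the 'us' value and the exact output are assumptions on my part):

> set datafusion.execution.parquet.coerce_int96 = 'us';
0 row(s) fetched.
> describe 'data/int96_from_spark.parquet';
+-------------+------------------------------+-------------+
| column_name | data_type                    | is_nullable |
+-------------+------------------------------+-------------+
| a           | Timestamp(Microsecond, None) | YES         |
+-------------+------------------------------+-------------+
1 row(s) fetched.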
@@ -58,6 +58,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query |
I recommend we add a test to datafusion/sqllogictest/test_files/parquet.slt (lines 174 to 190 at df0d966):
# Setup alltypes_plain table:
statement ok
CREATE EXTERNAL TABLE alltypes_plain (
  id INT NOT NULL,
  bool_col BOOLEAN NOT NULL,
  tinyint_col TINYINT NOT NULL,
  smallint_col SMALLINT NOT NULL,
  int_col INT NOT NULL,
  bigint_col BIGINT NOT NULL,
  float_col FLOAT NOT NULL,
  double_col DOUBLE NOT NULL,
  date_string_col BYTEA NOT NULL,
  string_col VARCHAR NOT NULL,
  timestamp_col TIMESTAMP NOT NULL,
)
STORED AS PARQUET
LOCATION '../../parquet-testing/data/alltypes_plain.parquet';
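Something along these lines might be a starting point (a sketch only -- the 'us' value, relying on schema inference for the new table, and the expected type string are assumptions, not a tested case):

statement ok
set datafusion.execution.parquet.coerce_int96 = 'us';

statement ok
CREATE EXTERNAL TABLE int96_from_spark
STORED AS PARQUET
LOCATION '../../parquet-testing/data/int96_from_spark.parquet';

query T
SELECT arrow_typeof(a) FROM int96_from_spark LIMIT 1;
----
Timestamp(Microsecond, None)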
Without coercion:
With coercion:
Frustratingly, those two new nulls aren't really nulls. They're the challenging values that we want to be able to read back in Comet. We can't print them with current chrono behavior, which is why I didn't test at the SQL layer, but the real values are in there and we'll be able to do what we need to do in Comet at the SchemaAdapter level with this change.
Let's go ahead and merge this. The new config is optional, so won't affect existing users, and it meets the requirements for Comet to switch over to using DataFusion's ParquetExec by default. @alamb It seems that we cannot add SQL tests until apache/arrow-rs#7287 is resolved, which we won't be able to do in time for the DataFusion 47.0.0 release.
@@ -296,6 +297,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f
datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
datafusion.execution.parquet.bloom_filter_on_read true (writing) Use any available bloom filters when reading parquet files
datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution.
While reviewing this PR again, I think this text is not quite right -- the option isn't true/false; instead it takes a string value (ms, ns, us, etc.) for the timestamp resolution:
External error: task 17 panicked with message "called `Result::unwrap()` on an `Err` value: Configuration(\"Unknown or unsupported parquet coerce_int96: true. Valid values are: ns, us, ms, and s.\")"
I will file a ticket
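For reference, one of the values that error message lists as valid would be set like this (a sketch only; I have not re-run it here):

> set datafusion.execution.parquet.coerce_int96 = 'us';
0 row(s) fetched.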
Strangely, setting it to true seems to work in datafusion-cli 🤔
DataFusion CLI v46.0.1
> set datafusion.execution.parquet.coerce_int96 = true;
0 row(s) fetched.
Elapsed 0.000 seconds.
> show all;
...
| datafusion.execution.parquet.coerce_int96 | true |
I created a PR with some SLT tests for this feature
Which issue does this PR close?
Rationale for this change
We are adding Spark-compatible int96 support to DataFusion Comet when using arrow-rs's Parquet reader. To achieve this, we first added support for arrow-rs to read int96 at resolutions other than nanosecond; previously it would generate nulls for non-null values. Next, we will add support to DataFusion to generate the necessary schema for arrow-rs to read int96 at the resolution that Spark expects. Finally, we will connect everything together in DataFusion Comet for accelerated Parquet reading with int96 values.
What changes are included in this PR?
A new option in ParquetOptions to coerce int96 resolution, with serialization support (I think I did this correctly).
Are these changes tested?
Added a new test that relies on the new int96_from_spark.parquet file in parquet-testing.
Are there any user-facing changes?
There is a new field in ParquetOptions. There is an API change to a pub(crate) test function to accept a provided table schema.