From 093a7569549fa839eb095805d3e22d460c10c35c Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 2 Jan 2024 16:02:26 +0100 Subject: [PATCH] feat(python): expose custom metadata to writers (#1994) # Description - exposes the custom_metadata to the pyarrow and rust writers - addresses a bug in the create operation: we were not passing the app_metadata to the actual commit # Related Issue(s) - closes https://github.com/delta-io/delta-rs/issues/1990 --- .../deltalake-core/src/operations/create.rs | 14 ++++++++----- python/deltalake/_internal.pyi | 3 +++ python/deltalake/writer.py | 7 +++++++ python/src/lib.rs | 21 ++++++++++++++++++- 4 files changed, 39 insertions(+), 6 deletions(-) diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 0dca038f4a..0e44fe215f 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use futures::future::BoxFuture; -use serde_json::{Map, Value}; +use serde_json::Value; use super::transaction::{commit, PROTOCOL}; use crate::errors::{DeltaResult, DeltaTableError}; @@ -56,7 +56,7 @@ pub struct CreateBuilder { actions: Vec, log_store: Option, configuration: HashMap>, - metadata: Option>, + metadata: Option>, } impl Default for CreateBuilder { @@ -181,8 +181,11 @@ impl CreateBuilder { /// /// This might include provenance information such as an id of the /// user that made the commit or the program that created it. 
- pub fn with_metadata(mut self, metadata: Map) -> Self { - self.metadata = Some(metadata); + pub fn with_metadata( + mut self, + metadata: impl IntoIterator, + ) -> Self { + self.metadata = Some(HashMap::from_iter(metadata)); self } @@ -286,6 +289,7 @@ impl std::future::IntoFuture for CreateBuilder { let this = self; Box::pin(async move { let mode = this.mode.clone(); + let app_metadata = this.metadata.clone(); let (mut table, actions, operation) = this.into_table_and_actions()?; let log_store = table.log_store(); let table_state = if log_store.is_delta_table_location().await? { @@ -310,7 +314,7 @@ impl std::future::IntoFuture for CreateBuilder { &actions, operation, table_state, - None, + app_metadata, ) .await?; table.load_version(version).await?; diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index b4d0ca8c3d..b893fc065b 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -135,6 +135,7 @@ class RawDeltaTable: partition_by: List[str], schema: pyarrow.Schema, partitions_filters: Optional[FilterType], + custom_metadata: Optional[Dict[str, str]], ) -> None: ... def cleanup_metadata(self) -> None: ... @@ -149,6 +150,7 @@ def write_new_deltalake( description: Optional[str], configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], + custom_metadata: Optional[Dict[str, str]], ) -> None: ... def write_to_deltalake( table_uri: str, @@ -163,6 +165,7 @@ def write_to_deltalake( configuration: Optional[Mapping[str, Optional[str]]], storage_options: Optional[Dict[str, str]], writer_properties: Optional[Dict[str, Optional[str]]], + custom_metadata: Optional[Dict[str, str]], ) -> None: ... 
def convert_to_deltalake( uri: str, diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 609a6487c6..7306a5705c 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -100,6 +100,7 @@ def write_deltalake( partition_filters: Optional[List[Tuple[str, str, Any]]] = ..., large_dtypes: bool = ..., engine: Literal["pyarrow"] = ..., + custom_metadata: Optional[Dict[str, str]] = ..., ) -> None: ... @@ -128,6 +129,7 @@ def write_deltalake( large_dtypes: bool = ..., engine: Literal["rust"], writer_properties: WriterProperties = ..., + custom_metadata: Optional[Dict[str, str]] = ..., ) -> None: ... @@ -163,6 +165,7 @@ def write_deltalake( large_dtypes: bool = False, engine: Literal["pyarrow", "rust"] = "pyarrow", writer_properties: Optional[WriterProperties] = None, + custom_metadata: Optional[Dict[str, str]] = None, ) -> None: """Write to a Delta Lake table @@ -236,6 +239,7 @@ def write_deltalake( engine: writer engine to write the delta table. `Rust` engine is still experimental but you may see up to 4x performance improvements over pyarrow. writer_properties: Pass writer properties to the Rust parquet writer. + custom_metadata: Custom metadata to add to the commitInfo. 
""" table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options) if table is not None: @@ -300,6 +304,7 @@ def write_deltalake( writer_properties=writer_properties._to_dict() if writer_properties else None, + custom_metadata=custom_metadata, ) if table: table.update_incremental() @@ -492,6 +497,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: description, configuration, storage_options, + custom_metadata, ) else: table._table.create_write_transaction( @@ -500,6 +506,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch: partition_by or [], schema, partition_filters, + custom_metadata, ) table.update_incremental() else: diff --git a/python/src/lib.rs b/python/src/lib.rs index 55a7442281..4f921f21cf 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -741,6 +741,7 @@ impl RawDeltaTable { partition_by: Vec, schema: PyArrowType, partitions_filters: Option>, + custom_metadata: Option>, ) -> PyResult<()> { let mode = mode.parse().map_err(PythonError::from)?; @@ -803,6 +804,10 @@ impl RawDeltaTable { partition_by: Some(partition_by), predicate: None, }; + + let app_metadata = + custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect()); + let store = self._table.log_store(); rt()? @@ -811,7 +816,7 @@ impl RawDeltaTable { &actions, operation, self._table.get_state(), - None, + app_metadata, )) .map_err(PythonError::from)?; @@ -1173,6 +1178,7 @@ fn write_to_deltalake( configuration: Option>>, storage_options: Option>, writer_properties: Option>>, + custom_metadata: Option>, ) -> PyResult<()> { let batches = data.0.map(|batch| batch.unwrap()).collect::>(); let save_mode = mode.parse().map_err(PythonError::from)?; @@ -1216,6 +1222,12 @@ fn write_to_deltalake( builder = builder.with_configuration(config); }; + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + builder = builder.with_metadata(json_metadata); + }; + rt()? 
.block_on(builder.into_future()) .map_err(PythonError::from)?; @@ -1280,6 +1292,7 @@ fn write_new_deltalake( description: Option, configuration: Option>>, storage_options: Option>, + custom_metadata: Option>, ) -> PyResult<()> { let table = DeltaTableBuilder::from_uri(table_uri) .with_storage_options(storage_options.unwrap_or_default()) @@ -1306,6 +1319,12 @@ fn write_new_deltalake( builder = builder.with_configuration(config); }; + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + builder = builder.with_metadata(json_metadata); + }; + rt()? .block_on(builder.into_future()) .map_err(PythonError::from)?;