feat(python): expose custom metadata to writers (#1994)
# Description
- Exposes `custom_metadata` to the pyarrow and rust writers (see the usage sketch below).
- Fixes a bug in the create operation: the `app_metadata` was not being passed to the actual commit.

# Related Issue(s)
- closes #1990
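
A minimal usage sketch of the new parameter from Python, assuming the `custom_metadata` keyword lands as shown in the diff below; the table path, data, and metadata keys are purely illustrative:

```python
import pyarrow as pa
from deltalake import write_deltalake

data = pa.table({"id": [1, 2, 3]})

# Arbitrary string key/value pairs that should be recorded on the commitInfo
# of the resulting Delta log entry (path and keys are illustrative only).
write_deltalake(
    "/tmp/example_table",
    data,
    mode="append",
    custom_metadata={"author": "ci-pipeline", "run_id": "1234"},
)
```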
ion-elgreco committed Jan 2, 2024
1 parent f54bb28 commit 093a756
Showing 4 changed files with 39 additions and 6 deletions.
14 changes: 9 additions & 5 deletions crates/deltalake-core/src/operations/create.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use futures::future::BoxFuture;
use serde_json::{Map, Value};
use serde_json::Value;

use super::transaction::{commit, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
@@ -56,7 +56,7 @@ pub struct CreateBuilder {
actions: Vec<Action>,
log_store: Option<LogStoreRef>,
configuration: HashMap<String, Option<String>>,
metadata: Option<Map<String, Value>>,
metadata: Option<HashMap<String, Value>>,
}

impl Default for CreateBuilder {
@@ -181,8 +181,11 @@ impl CreateBuilder {
///
/// This might include provenance information such as an id of the
/// user that made the commit or the program that created it.
pub fn with_metadata(mut self, metadata: Map<String, Value>) -> Self {
self.metadata = Some(metadata);
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.metadata = Some(HashMap::from_iter(metadata));
self
}

@@ -286,6 +289,7 @@ impl std::future::IntoFuture for CreateBuilder {
let this = self;
Box::pin(async move {
let mode = this.mode.clone();
let app_metadata = this.metadata.clone();
let (mut table, actions, operation) = this.into_table_and_actions()?;
let log_store = table.log_store();
let table_state = if log_store.is_delta_table_location().await? {
@@ -310,7 +314,7 @@
&actions,
operation,
table_state,
None,
app_metadata,
)
.await?;
table.load_version(version).await?;
3 changes: 3 additions & 0 deletions python/deltalake/_internal.pyi
@@ -135,6 +135,7 @@ class RawDeltaTable:
partition_by: List[str],
schema: pyarrow.Schema,
partitions_filters: Optional[FilterType],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def cleanup_metadata(self) -> None: ...

@@ -149,6 +150,7 @@ def write_new_deltalake(
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def write_to_deltalake(
table_uri: str,
@@ -163,6 +165,7 @@ def write_to_deltalake(
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def convert_to_deltalake(
uri: str,
7 changes: 7 additions & 0 deletions python/deltalake/writer.py
@@ -100,6 +100,7 @@ def write_deltalake(
partition_filters: Optional[List[Tuple[str, str, Any]]] = ...,
large_dtypes: bool = ...,
engine: Literal["pyarrow"] = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
) -> None:
...

@@ -128,6 +129,7 @@ def write_deltalake(
large_dtypes: bool = ...,
engine: Literal["rust"],
writer_properties: WriterProperties = ...,
custom_metadata: Optional[Dict[str, str]] = ...,
) -> None:
...

@@ -163,6 +165,7 @@ def write_deltalake(
large_dtypes: bool = False,
engine: Literal["pyarrow", "rust"] = "pyarrow",
writer_properties: Optional[WriterProperties] = None,
custom_metadata: Optional[Dict[str, str]] = None,
) -> None:
"""Write to a Delta Lake table
@@ -236,6 +239,7 @@ def write_deltalake(
engine: writer engine to write the delta table. `Rust` engine is still experimental but you may
see up to 4x performance improvements over pyarrow.
writer_properties: Pass writer properties to the Rust parquet writer.
custom_metadata: Custom metadata to add to the commitInfo.
"""
table, table_uri = try_get_table_and_table_uri(table_or_uri, storage_options)
if table is not None:
@@ -300,6 +304,7 @@ def write_deltalake(
writer_properties=writer_properties._to_dict()
if writer_properties
else None,
custom_metadata=custom_metadata,
)
if table:
table.update_incremental()
@@ -492,6 +497,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
description,
configuration,
storage_options,
custom_metadata,
)
else:
table._table.create_write_transaction(
@@ -500,6 +506,7 @@
partition_by or [],
schema,
partition_filters,
custom_metadata,
)
table.update_incremental()
else:
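
The docstring above describes `custom_metadata` as extra keys added to the commitInfo, so one way to sanity-check a write is to read the commit history back. A hedged sketch, assuming the custom keys are surfaced in the entries returned by `DeltaTable.history()`:

```python
from deltalake import DeltaTable

dt = DeltaTable("/tmp/example_table")  # path from the write sketch above

# history() returns one dict of commitInfo fields per commit, newest first;
# the keys passed via custom_metadata are expected to appear alongside the
# standard fields (an assumption, not verified against this change).
last_commit = dt.history(limit=1)[0]
print(last_commit.get("author"), last_commit.get("run_id"))
```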
21 changes: 20 additions & 1 deletion python/src/lib.rs
@@ -741,6 +741,7 @@ impl RawDeltaTable {
partition_by: Vec<String>,
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let mode = mode.parse().map_err(PythonError::from)?;

@@ -803,6 +804,10 @@ impl RawDeltaTable {
partition_by: Some(partition_by),
predicate: None,
};

let app_metadata =
custom_metadata.map(|md| md.into_iter().map(|(k, v)| (k, v.into())).collect());

let store = self._table.log_store();

rt()?
@@ -811,7 +816,7 @@
&actions,
operation,
self._table.get_state(),
None,
app_metadata,
))
.map_err(PythonError::from)?;

@@ -1173,6 +1178,7 @@ fn write_to_deltalake(
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
writer_properties: Option<HashMap<String, Option<String>>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
let save_mode = mode.parse().map_err(PythonError::from)?;
@@ -1216,6 +1222,12 @@
builder = builder.with_configuration(config);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
@@ -1280,6 +1292,7 @@ fn write_new_deltalake(
description: Option<String>,
configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<()> {
let table = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(storage_options.unwrap_or_default())
@@ -1306,6 +1319,12 @@
builder = builder.with_configuration(config);
};

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
builder = builder.with_metadata(json_metadata);
};

rt()?
.block_on(builder.into_future())
.map_err(PythonError::from)?;
