Skip to content

Commit

Permalink
Update last-updated-ms for DDL operations (#956)
Browse files Browse the repository at this point in the history
* Update last-updated-ms for DDL operations

* Update last-updated-ms if there are valid changes

* Update unit tests
  • Loading branch information
soumya-ghosh committed Jul 25, 2024
1 parent 055938d commit dd8d76d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,9 @@ def is_added_sort_order(self, sort_order_id: int) -> bool:
update.sort_order.order_id == sort_order_id for update in self._updates if isinstance(update, AddSortOrderUpdate)
)

def has_changes(self) -> bool:
    """Report whether ``self._updates`` holds at least one entry.

    Returns:
        True when one or more updates are present in ``self._updates``,
        False when it is empty.
    """
    # Truthiness of the container is the idiomatic emptiness check.
    return bool(self._updates)


@singledispatch
def _apply_table_update(update: TableUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
Expand Down Expand Up @@ -1185,6 +1188,10 @@ def update_table_metadata(
for update in updates:
new_metadata = _apply_table_update(update, new_metadata, context)

# If updates were applied but none of them changed last_updated_ms, set it to the current time
if context.has_changes() and base_metadata.last_updated_ms == new_metadata.last_updated_ms:
new_metadata = new_metadata.model_copy(update={"last_updated_ms": datetime_to_millis(datetime.now().astimezone())})

if enforce_validation:
return TableMetadataUtil.parse_obj(new_metadata.model_dump())
else:
Expand Down
2 changes: 2 additions & 0 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ def test_create_table_transaction(
partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")),
properties={"format-version": format_version},
) as txn:
last_updated_metadata = txn.table_metadata.last_updated_ms
with txn.update_schema() as update_schema:
update_schema.add_column(path="b", field_type=IntegerType())

Expand All @@ -887,6 +888,7 @@ def test_create_table_transaction(
assert table.spec().fields_by_source_id(2)[0].name == "bar"
assert table.spec().fields_by_source_id(2)[0].field_id == 1001
assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform()
assert table.metadata.last_updated_ms > last_updated_metadata


@mock_aws
Expand Down
2 changes: 2 additions & 0 deletions tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,7 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_id
namespace = Catalog.namespace_from(table_identifier_nocatalog)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
last_updated_ms = table.metadata.last_updated_ms

assert catalog._parse_metadata_version(table.metadata_location) == 0
assert table.metadata.current_schema_id == 0
Expand All @@ -1289,6 +1290,7 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_id
assert new_schema
assert new_schema == update._apply()
assert new_schema.find_field("b").field_type == IntegerType()
assert updated_table_metadata.last_updated_ms > last_updated_ms


@pytest.mark.parametrize(
Expand Down
4 changes: 3 additions & 1 deletion tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ def test_apply_set_properties_update(table_v2: Table) -> None:
"test_b": "test_b",
"test_c": "test_c",
}
assert new_metadata_add_only.last_updated_ms > base_metadata.last_updated_ms


def test_apply_remove_properties_update(table_v2: Table) -> None:
Expand Down Expand Up @@ -689,7 +690,7 @@ def test_update_metadata_add_snapshot(table_v2: Table) -> None:
snapshot_id=25,
parent_snapshot_id=19,
sequence_number=200,
timestamp_ms=1602638573590,
timestamp_ms=1602638593590,
manifest_list="s3:/a/b/c.avro",
summary=Summary(Operation.APPEND),
schema_id=3,
Expand Down Expand Up @@ -759,6 +760,7 @@ def test_update_metadata_add_update_sort_order(table_v2: Table) -> None:
assert len(new_metadata.sort_orders) == 2
assert new_metadata.sort_orders[-1] == new_sort_order
assert new_metadata.default_sort_order_id == new_sort_order.order_id
assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms


def test_update_metadata_update_sort_order_invalid(table_v2: Table) -> None:
Expand Down

0 comments on commit dd8d76d

Please sign in to comment.