Skip to content

Commit 760b752

Browse files
jembishopJem Bishop
andauthored
Raise concurrency errors properly for glue tables (#1875)
## Which issue does this PR close? Closes #1868. ## What changes are included in this PR? As described in the issue, optimistic concurrency control was not working correctly for Glue tables: the update has to be conditioned on the Glue table version not having been incremented since the table was loaded, so that a concurrent modification is reported back as an error. This PR changes `update_table` to handle that case properly. It also moves the body of `load_table` into a new `load_table_with_version_id`, which additionally returns the Glue version id; `load_table` now simply delegates to it. <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? I have tested them in my own setup, with concurrent writers to Iceberg tables in AWS Glue, but not generically. Writing an automated test case does not seem feasible, because it requires a real Glue table. <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> --------- Co-authored-by: Jem Bishop <jem@lo.tech>
1 parent a667539 commit 760b752

File tree

1 file changed

+69
-39
lines changed

1 file changed

+69
-39
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,62 @@ impl GlueCatalog {
196196
pub fn file_io(&self) -> FileIO {
197197
self.file_io.clone()
198198
}
199+
200+
/// Loads a table from the Glue Catalog together with the `version_id` reported by Glue.
201+
///
202+
/// # Returns
203+
/// A `Result` wrapping a tuple of (`Table`, `Option<String>`). The `Option<String>` is the
/// `version_id` field of the Glue table object, copied as-is (so it is `None` when Glue's
204+
/// response carries none); callers can use it for optimistic concurrency control on update.
205+
///
206+
/// # Errors
207+
/// This function may return an error in several scenarios, including:
208+
/// - Failure to validate the namespace.
209+
/// - Failure to retrieve the table from the Glue Catalog, or a response that contains no
///   table object (`ErrorKind::TableNotFound`).
210+
/// - Absence of metadata location information in the table's properties.
211+
/// - Issues reading or deserializing the table's metadata file, or building the `Table`.
212+
async fn load_table_with_version_id(
213+
&self,
214+
table: &TableIdent,
215+
) -> Result<(Table, Option<String>)> {
216+
let db_name = validate_namespace(table.namespace())?;
217+
let table_name = table.name();
218+
219+
let builder = self
220+
.client
221+
.0
222+
.get_table()
223+
.database_name(&db_name)
224+
.name(table_name);
225+
let builder = with_catalog_id!(builder, self.config);
226+
227+
let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
228+
229+
let glue_table = glue_table_output.table().ok_or_else(|| {
230+
Error::new(
231+
ErrorKind::TableNotFound,
232+
format!(
233+
"Table object for database: {db_name} and table: {table_name} does not exist"
234+
),
235+
)
236+
})?;
237+
238+
// Read the version id from the same `get_table` response as the metadata location below,
// so that the returned pair describes one and the same Glue table object.
let version_id = glue_table.version_id.clone();
239+
let metadata_location = get_metadata_location(&glue_table.parameters)?;
240+
241+
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
242+
243+
let table = Table::builder()
244+
.file_io(self.file_io())
245+
.metadata_location(metadata_location)
246+
.metadata(metadata)
247+
// The identifier is rebuilt from the validated database name and the table name.
.identifier(TableIdent::new(
248+
NamespaceIdent::new(db_name),
249+
table_name.to_owned(),
250+
))
251+
.build()?;
252+
253+
Ok((table, version_id))
254+
}
199255
}
200256

201257
#[async_trait]
@@ -514,42 +570,8 @@ impl Catalog for GlueCatalog {
514570
/// - Absence of metadata location information in the table's properties.
515571
/// - Issues reading or deserializing the table's metadata file.
516572
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
517-
let db_name = validate_namespace(table.namespace())?;
518-
let table_name = table.name();
519-
520-
let builder = self
521-
.client
522-
.0
523-
.get_table()
524-
.database_name(&db_name)
525-
.name(table_name);
526-
let builder = with_catalog_id!(builder, self.config);
527-
528-
let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?;
529-
530-
match glue_table_output.table() {
531-
None => Err(Error::new(
532-
ErrorKind::TableNotFound,
533-
format!(
534-
"Table object for database: {db_name} and table: {table_name} does not exist"
535-
),
536-
)),
537-
Some(table) => {
538-
let metadata_location = get_metadata_location(&table.parameters)?;
539-
540-
let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
541-
542-
Table::builder()
543-
.file_io(self.file_io())
544-
.metadata_location(metadata_location)
545-
.metadata(metadata)
546-
.identifier(TableIdent::new(
547-
NamespaceIdent::new(db_name),
548-
table_name.to_owned(),
549-
))
550-
.build()
551-
}
552-
}
573+
let (table, _) = self.load_table_with_version_id(table).await?;
574+
Ok(table)
553575
}
554576

555577
/// Asynchronously drops a table from the database.
@@ -761,7 +783,9 @@ impl Catalog for GlueCatalog {
761783
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
762784
let table_ident = commit.identifier().clone();
763785
let table_namespace = validate_namespace(table_ident.namespace())?;
764-
let current_table = self.load_table(&table_ident).await?;
786+
787+
let (current_table, current_version_id) =
788+
self.load_table_with_version_id(&table_ident).await?;
765789
let current_metadata_location = current_table.metadata_location_result()?.to_string();
766790

767791
let staged_table = commit.apply(current_table)?;
@@ -773,8 +797,8 @@ impl Catalog for GlueCatalog {
773797
.write_to(staged_table.file_io(), staged_metadata_location)
774798
.await?;
775799

776-
// Persist staged table to Glue
777-
let builder = self
800+
// Persist staged table to Glue with optimistic locking
801+
let mut builder = self
778802
.client
779803
.0
780804
.update_table()
@@ -787,6 +811,12 @@ impl Catalog for GlueCatalog {
787811
staged_table.metadata().properties(),
788812
Some(current_metadata_location),
789813
)?);
814+
815+
// Add VersionId for optimistic locking
816+
if let Some(version_id) = current_version_id {
817+
builder = builder.version_id(version_id);
818+
}
819+
790820
let builder = with_catalog_id!(builder, self.config);
791821
let _ = builder.send().await.map_err(|e| {
792822
let error = e.into_service_error();

0 commit comments

Comments
 (0)