Skip to content

Commit

Permalink
Handle FileAlreadyExistsException in S3DynamoDBLogStore
Browse files Browse the repository at this point in the history
## Description

Resolves #1734

It is possible that more than one concurrent reader/writers will try to fix the same incomplete entry in DynamoDB. This could result in some seeing a `FileAlreadyExistsException` when trying to copy the temp file to the delta log. We should not propagate this error to the end user since the recovery operation was successful and we can proceed. See #1734 for more details.

Note, we attempt to copy a temp file in two places:
1. As part of writing N.json [here](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java#L249)
2. In `fixDeltaLog` when performing a recovery for N-1.json as part of either a write or listFrom

We only need to catch the exception in scenario (2). In scenario (1) we already catch _all_ seen errors.

This is hard to test without manipulating the FS + external store a lot. We could manipulate `FailingFileSystem` to throw a `FileAlreadyExistsException`.

Closes #1776

Signed-off-by: Allison Portis <allison.portis@databricks.com>
GitOrigin-RevId: fcce5d5577d79dff4d071ebdd63b3ee837e5b645
(cherry picked from commit 5ad6443)
  • Loading branch information
allisonport-db committed May 24, 2023
1 parent 5ee2d1a commit 306efc3
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ protected void fixDeltaLogPutCompleteDbEntry(ExternalCommitEntry entry) throws I
/**
* Method for assuring consistency on filesystem according to the external cache.
* Method tries to rewrite TransactionLog entry from temporary path if it does not exist.
*
* Should never throw a FileAlreadyExistsException.
* - If we see one when copying the temp file, we can assume the target file N.json already
* exists and a concurrent writer has already copied the contents of T(N).
* - We will never see one when writing to the external cache since overwrite=true.
*/
private void fixDeltaLog(FileSystem fs, ExternalCommitEntry entry) throws IOException {
if (entry.complete) {
Expand All @@ -396,7 +401,13 @@ private void fixDeltaLog(FileSystem fs, ExternalCommitEntry entry) throws IOExce
fixDeltaLogPutCompleteDbEntry(entry);
LOG.info("fixed file {}", entry.fileName);
return;
} catch(Throwable e) {
} catch (java.nio.file.FileAlreadyExistsException e) {
LOG.info("file {} already copied: {}:",
entry.fileName, e.getClass().getSimpleName(), e);
copied = true;
// Don't return since we still need to mark the DB entry as complete. This will
// happen when we execute the main try block on the next while loop iteration
} catch (Throwable e) {
LOG.info("{}:", e.getClass().getSimpleName(), e);
if (retry >= 3) {
throw e;
Expand Down

0 comments on commit 306efc3

Please sign in to comment.