Skip to content

Commit

Permalink
Merge pull request #83 from trocco-io/add-rety-for-copy-task
Browse files Browse the repository at this point in the history
Add retry logic for COPY operation in SnowflakeCopyBatchInsert with configurable max retries
  • Loading branch information
kentoyoshida authored Jan 14, 2025
2 parents b7ab45d + 7a3f515 commit f612567
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 24 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Snowflake output plugin for Embulk loads records to Snowflake.
- **retry_limit**: max retry count for database operations (integer, default: 12). When the intermediate table to be created has already been created by another process, this plugin retries with another table name to avoid a collision.
- **retry_wait**: initial retry wait time in milliseconds (integer, default: 1000 (1 second))
- **max_retry_wait**: upper limit of retry wait, which will be doubled at every retry (integer, default: 1800000 (30 minutes))
- **max_upload_retries**: maximum number of retries for file upload to Snowflake (integer, default: 3).
- **max_copy_retries**: maximum number of retries for COPY operations in Snowflake (integer, default: 3). Retries occur when transient errors such as communication failures happen during the COPY process.
- **mode**: "insert", "insert_direct", "truncate_insert", "replace" or "merge". See below. (string, required)
- **merge_keys**: key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key)
- **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values)
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public interface SnowflakePluginTask extends PluginTask {
@ConfigDefault("3")
public int getMaxUploadRetries();

/**
 * Returns the value of the {@code max_copy_retries} option, which defaults to 3 when it is not
 * set in the configuration. The value is handed to the batch insert as its COPY retry limit.
 */
@Config("max_copy_retries")
@ConfigDefault("3")
public int getMaxCopyRetries();

/**
 * Returns the value of the {@code empty_field_as_null} option, which defaults to {@code true}
 * when it is not set in the configuration.
 *
 * <p>NOTE(review): the method name misspells "Empty" as "Emtpy"; it is left unchanged here
 * because existing callers reference this exact name.
 */
@Config("empty_field_as_null")
@ConfigDefault("true")
public boolean getEmtpyFieldAsNull();
Expand Down Expand Up @@ -321,6 +325,7 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
pluginTask.getCopyIntoCSVColumnNumbers(),
false,
pluginTask.getMaxUploadRetries(),
pluginTask.getMaxCopyRetries(),
pluginTask.getEmtpyFieldAsNull());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
protected static final String newLineString = "\n";
protected static final String delimiterString = "\t";
private final int maxUploadRetries;
private final int maxCopyRetries;

private SnowflakeOutputConnection connection = null;
private TableIdentifier tableIdentifier = null;
Expand All @@ -51,6 +52,7 @@ public SnowflakeCopyBatchInsert(
int[] copyIntoCSVColumnNumbers,
boolean deleteStageFile,
int maxUploadRetries,
int maxCopyRetries,
boolean emptyFieldAsNull)
throws IOException {
this.index = 0;
Expand All @@ -63,6 +65,7 @@ public SnowflakeCopyBatchInsert(
this.deleteStageFile = deleteStageFile;
this.uploadAndCopyFutures = new ArrayList();
this.maxUploadRetries = maxUploadRetries;
this.maxCopyRetries = maxCopyRetries;
this.emptyFieldAsNull = emptyFieldAsNull;
}

Expand Down Expand Up @@ -270,7 +273,8 @@ public void flush() throws IOException, SQLException {
Future<Void> uploadFuture = executorService.submit(uploadTask);
uploadAndCopyFutures.add(uploadFuture);

CopyTask copyTask = new CopyTask(uploadFuture, snowflakeStageFileName, emptyFieldAsNull);
CopyTask copyTask =
new CopyTask(uploadFuture, snowflakeStageFileName, emptyFieldAsNull, maxCopyRetries);
uploadAndCopyFutures.add(executorService.submit(copyTask));

fileCount++;
Expand Down Expand Up @@ -404,40 +408,60 @@ private class CopyTask implements Callable<Void> {
private final Future<Void> uploadFuture;
private final String snowflakeStageFileName;
private final boolean emptyFieldAsNull;
private final int maxCopyRetries;

public CopyTask(
Future<Void> uploadFuture, String snowflakeStageFileName, boolean emptyFieldAsNull) {
Future<Void> uploadFuture,
String snowflakeStageFileName,
boolean emptyFieldAsNull,
int maxCopyRetries) {
this.uploadFuture = uploadFuture;
this.snowflakeStageFileName = snowflakeStageFileName;
this.emptyFieldAsNull = emptyFieldAsNull;
this.maxCopyRetries = maxCopyRetries;
}

public Void call() throws SQLException, InterruptedException, ExecutionException {
try {
uploadFuture.get();

SnowflakeOutputConnection con = (SnowflakeOutputConnection) connector.connect(true);
try {
logger.info("Running COPY from file {}", snowflakeStageFileName);

long startTime = System.currentTimeMillis();
con.runCopy(
tableIdentifier,
stageIdentifier,
snowflakeStageFileName,
copyIntoTableColumnNames,
copyIntoCSVColumnNumbers,
delimiterString,
emptyFieldAsNull);

double seconds = (System.currentTimeMillis() - startTime) / 1000.0;

logger.info(
String.format(
"Loaded file %s (%.2f seconds for COPY)", snowflakeStageFileName, seconds));

} finally {
con.close();
int retries = 0;
while (true) {
try (SnowflakeOutputConnection con =
(SnowflakeOutputConnection) connector.connect(true)) {
logger.info("Running COPY from file {}", snowflakeStageFileName);

long startTime = System.currentTimeMillis();
con.runCopy(
tableIdentifier,
stageIdentifier,
snowflakeStageFileName,
copyIntoTableColumnNames,
copyIntoCSVColumnNumbers,
delimiterString,
emptyFieldAsNull);

double seconds = (System.currentTimeMillis() - startTime) / 1000.0;

logger.info(
String.format(
"Loaded file %s (%.2f seconds for COPY)", snowflakeStageFileName, seconds));

break;
} catch (SQLException e) {
if (!isRetryableForCopyTask(e)) {
throw e;
}

retries++;
if (retries > this.maxCopyRetries) {
throw e;
}
logger.warn(
String.format(
"Copy error %s file %s retries: %d", e, snowflakeStageFileName, retries));
Thread.sleep(retries * retries * 1000);
}
}
} finally {
if (deleteStageFile) {
Expand All @@ -447,5 +471,10 @@ public Void call() throws SQLException, InterruptedException, ExecutionException

return null;
}

/**
 * Decides whether a failed COPY attempt is worth retrying.
 *
 * <p>The decision is based solely on the exception's own message: the failure is treated as
 * retryable only when that message contains the communication-error marker text checked below.
 *
 * @param e the exception raised by the COPY attempt
 * @return {@code true} if the message is present and contains the marker text, {@code false}
 *     otherwise (including when the exception carries no message)
 */
private boolean isRetryableForCopyTask(SQLException e) {
  final String detail = e.getMessage();
  if (detail == null) {
    // No message to inspect, so the error cannot be classified as retryable.
    return false;
  }
  return detail.contains("JDBC driver encountered communication error");
}
}
}

0 comments on commit f612567

Please sign in to comment.