Skip to content

Commit

Permalink
Add retry logic for COPY operation in SnowflakeCopyBatchInsert with c…
Browse files Browse the repository at this point in the history
…onfigurable max retries

- Introduced a new configuration parameter max_copy_retries to specify the maximum number of retries for the COPY operation in Snowflake.
- Updated SnowflakeOutputPlugin to include max_copy_retries configuration.
- Modified SnowflakeCopyBatchInsert to handle retries for the COPY operation using the newly added parameter.
- Added a method isRetryableForCopyTask to determine whether a COPY error is retryable.
- Adjusted constructor and method signatures to include maxCopyRetries where necessary.
- Ensured proper resource cleanup and retry handling for the COPY task.
- Improved logging for retries to provide more context during errors.
  • Loading branch information
kentoyoshida committed Jan 10, 2025
1 parent b7ab45d commit 76dc706
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 23 deletions.
5 changes: 5 additions & 0 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public interface SnowflakePluginTask extends PluginTask {
@ConfigDefault("3")
public int getMaxUploadRetries();

/**
 * Maximum number of retries for the Snowflake COPY operation.
 *
 * <p>Read from the {@code max_copy_retries} configuration key; defaults to 3 when the key is
 * not set.
 *
 * @return the configured retry limit for COPY
 */
@Config("max_copy_retries")
@ConfigDefault("3")
public int getMaxCopyRetries();

/**
 * Value of the {@code empty_field_as_null} configuration key; defaults to {@code true} when the
 * key is not set.
 *
 * <p>NOTE(review): presumably controls whether empty fields are loaded as NULL by COPY — confirm
 * against the COPY statement builder. The getter name is misspelled ("Emtpy"); callers use that
 * exact name, so it cannot be corrected here without updating them as well.
 *
 * @return the configured flag
 */
@Config("empty_field_as_null")
@ConfigDefault("true")
public boolean getEmtpyFieldAsNull();
Expand Down Expand Up @@ -321,6 +325,7 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional<MergeConfig> merg
pluginTask.getCopyIntoCSVColumnNumbers(),
false,
pluginTask.getMaxUploadRetries(),
pluginTask.getMaxCopyRetries(),
pluginTask.getEmtpyFieldAsNull());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class SnowflakeCopyBatchInsert implements BatchInsert {
protected static final String newLineString = "\n";
protected static final String delimiterString = "\t";
private final int maxUploadRetries;
private final int maxCopyRetries;

private SnowflakeOutputConnection connection = null;
private TableIdentifier tableIdentifier = null;
Expand All @@ -51,6 +52,7 @@ public SnowflakeCopyBatchInsert(
int[] copyIntoCSVColumnNumbers,
boolean deleteStageFile,
int maxUploadRetries,
int maxCopyRetries,
boolean emptyFieldAsNull)
throws IOException {
this.index = 0;
Expand All @@ -63,6 +65,7 @@ public SnowflakeCopyBatchInsert(
this.deleteStageFile = deleteStageFile;
this.uploadAndCopyFutures = new ArrayList();
this.maxUploadRetries = maxUploadRetries;
this.maxCopyRetries = maxCopyRetries;
this.emptyFieldAsNull = emptyFieldAsNull;
}

Expand Down Expand Up @@ -270,7 +273,8 @@ public void flush() throws IOException, SQLException {
Future<Void> uploadFuture = executorService.submit(uploadTask);
uploadAndCopyFutures.add(uploadFuture);

CopyTask copyTask = new CopyTask(uploadFuture, snowflakeStageFileName, emptyFieldAsNull);
CopyTask copyTask =
new CopyTask(uploadFuture, snowflakeStageFileName, emptyFieldAsNull, maxCopyRetries);
uploadAndCopyFutures.add(executorService.submit(copyTask));

fileCount++;
Expand Down Expand Up @@ -404,40 +408,62 @@ private class CopyTask implements Callable<Void> {
private final Future<Void> uploadFuture;
private final String snowflakeStageFileName;
private final boolean emptyFieldAsNull;
private final int maxCopyRetries;

public CopyTask(
Future<Void> uploadFuture, String snowflakeStageFileName, boolean emptyFieldAsNull) {
Future<Void> uploadFuture,
String snowflakeStageFileName,
boolean emptyFieldAsNull,
int maxCopyRetries) {
this.uploadFuture = uploadFuture;
this.snowflakeStageFileName = snowflakeStageFileName;
this.emptyFieldAsNull = emptyFieldAsNull;
this.maxCopyRetries = maxCopyRetries;
}

public Void call() throws SQLException, InterruptedException, ExecutionException {
try {
uploadFuture.get();

SnowflakeOutputConnection con = (SnowflakeOutputConnection) connector.connect(true);
try {
logger.info("Running COPY from file {}", snowflakeStageFileName);

long startTime = System.currentTimeMillis();
con.runCopy(
tableIdentifier,
stageIdentifier,
snowflakeStageFileName,
copyIntoTableColumnNames,
copyIntoCSVColumnNumbers,
delimiterString,
emptyFieldAsNull);

double seconds = (System.currentTimeMillis() - startTime) / 1000.0;

logger.info(
String.format(
"Loaded file %s (%.2f seconds for COPY)", snowflakeStageFileName, seconds));

} finally {
con.close();
int retries = 0;
while (true) {
try {
logger.info("Running COPY from file {}", snowflakeStageFileName);

long startTime = System.currentTimeMillis();
con.runCopy(
tableIdentifier,
stageIdentifier,
snowflakeStageFileName,
copyIntoTableColumnNames,
copyIntoCSVColumnNumbers,
delimiterString,
emptyFieldAsNull);

double seconds = (System.currentTimeMillis() - startTime) / 1000.0;

logger.info(
String.format(
"Loaded file %s (%.2f seconds for COPY)", snowflakeStageFileName, seconds));

break;
} catch (SQLException e) {
if (!isRetryableForCopyTask(e)) {
throw e;
}

retries++;
if (retries > this.maxCopyRetries) {
throw e;
}
logger.warn(
String.format(
"Copy error %s file %s retries: %d", e, snowflakeStageFileName, retries));
Thread.sleep(retries * retries * 1000);
} finally {
con.close();
}
}
} finally {
if (deleteStageFile) {
Expand All @@ -447,5 +473,10 @@ public Void call() throws SQLException, InterruptedException, ExecutionException

return null;
}

/**
 * Decides whether a failed COPY attempt may be tried again.
 *
 * <p>An error is treated as retryable only when its message contains the text
 * "JDBC driver encountered communication error". An exception without a message is never
 * retryable.
 *
 * @param e the exception raised by the COPY attempt
 * @return {@code true} if the message contains the marker text, {@code false} otherwise
 */
private boolean isRetryableForCopyTask(SQLException e) {
  final String detail = e.getMessage();
  if (detail == null) {
    // Nothing to inspect, so the error cannot be classified as a communication failure.
    return false;
  }
  return detail.indexOf("JDBC driver encountered communication error") >= 0;
}
}
}

0 comments on commit 76dc706

Please sign in to comment.