Skip to content

Commit

Permalink
Merge pull request #82 from trocco-io/24500-reconnect
Browse files Browse the repository at this point in the history
fix: reconnect when auth error occurs in runDropStage method
  • Loading branch information
NamedPython authored Nov 7, 2024
2 parents 5fd4e49 + ed4f320 commit b7ab45d
Showing 1 changed file with 42 additions and 2 deletions.
44 changes: 42 additions & 2 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.sql.Types;
import java.util.*;
import java.util.function.BiFunction;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException;
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
import org.embulk.config.ConfigDiff;
Expand Down Expand Up @@ -124,6 +125,16 @@ public static MatchByColumnName fromString(String value) {
}
}

// error codes which need reauthenticate
// ref:
// https://github.com/snowflakedb/snowflake-jdbc/blob/v3.13.26/src/main/java/net/snowflake/client/jdbc/SnowflakeUtil.java#L42
private static final int ID_TOKEN_EXPIRED_GS_CODE = 390110;
private static final int SESSION_NOT_EXIST_GS_CODE = 390111;
private static final int MASTER_TOKEN_NOTFOUND = 390113;
private static final int MASTER_EXPIRED_GS_CODE = 390114;
private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115;
private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195;

@Override
protected Class<? extends PluginTask> getTaskClass() {
return SnowflakePluginTask.class;
Expand Down Expand Up @@ -204,12 +215,12 @@ public ConfigDiff transaction(
snowflakeCon.runCreateStage(stageIdentifier);
configDiff = super.transaction(config, schema, taskCount, control);
if (t.getDeleteStage()) {
snowflakeCon.runDropStage(stageIdentifier);
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
}
} catch (Exception e) {
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
try {
snowflakeCon.runDropStage(stageIdentifier);
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
Expand All @@ -220,6 +231,35 @@ public ConfigDiff transaction(
return configDiff;
}

private void runDropStageWithRecovery(
SnowflakeOutputConnection snowflakeCon, StageIdentifier stageIdentifier, PluginTask task)
throws SQLException {
try {
snowflakeCon.runDropStage(stageIdentifier);
} catch (SnowflakeSQLException ex) {
// INFO: Don't handle only SnowflakeReauthenticationRequest here
// because SnowflakeSQLException with following error codes may be thrown in some cases.

logger.info("SnowflakeSQLException was caught: ({}) {}", ex.getErrorCode(), ex.getMessage());

switch (ex.getErrorCode()) {
case ID_TOKEN_EXPIRED_GS_CODE:
case SESSION_NOT_EXIST_GS_CODE:
case MASTER_TOKEN_NOTFOUND:
case MASTER_EXPIRED_GS_CODE:
case MASTER_TOKEN_INVALID_GS_CODE:
case ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE:
// INFO: If runCreateStage consumed a lot of time, authentication might be expired.
// In this case, retry to drop stage.
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
snowflakeCon.runDropStage(stageIdentifier);
break;
default:
throw ex;
}
}
}

@Override
public ConfigDiff resume(
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
Expand Down

0 comments on commit b7ab45d

Please sign in to comment.