Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reconnect when auth error occurs in runDropStage method #77

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.sql.Types;
import java.util.*;
import java.util.function.BiFunction;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException;
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
import org.embulk.config.ConfigDiff;
Expand Down Expand Up @@ -124,6 +125,16 @@ public static MatchByColumnName fromString(String value) {
}
}

// error codes which need reauthenticate
// ref:
// https://github.com/snowflakedb/snowflake-jdbc/blob/v3.13.26/src/main/java/net/snowflake/client/jdbc/SnowflakeUtil.java#L42
private static final int ID_TOKEN_EXPIRED_GS_CODE = 390110;
private static final int SESSION_NOT_EXIST_GS_CODE = 390111;
private static final int MASTER_TOKEN_NOTFOUND = 390113;
private static final int MASTER_EXPIRED_GS_CODE = 390114;
private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115;
private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195;

@Override
protected Class<? extends PluginTask> getTaskClass() {
return SnowflakePluginTask.class;
Expand Down Expand Up @@ -204,12 +215,12 @@ public ConfigDiff transaction(
snowflakeCon.runCreateStage(stageIdentifier);
configDiff = super.transaction(config, schema, taskCount, control);
if (t.getDeleteStage()) {
snowflakeCon.runDropStage(stageIdentifier);
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
}
} catch (Exception e) {
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
try {
snowflakeCon.runDropStage(stageIdentifier);
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
Expand All @@ -220,6 +231,35 @@ public ConfigDiff transaction(
return configDiff;
}

private void runDropStageWithRecovery(
SnowflakeOutputConnection snowflakeCon, StageIdentifier stageIdentifier, PluginTask task)
throws SQLException {
try {
snowflakeCon.runDropStage(stageIdentifier);
} catch (SnowflakeSQLException ex) {
// INFO: Don't handle only SnowflakeReauthenticationRequest here
// because SnowflakeSQLException with following error codes may be thrown in some cases.

logger.info("SnowflakeSQLException was caught: ({}) {}", ex.getErrorCode(), ex.getMessage());

switch (ex.getErrorCode()) {
Pasukaru1996 marked this conversation as resolved.
Show resolved Hide resolved
case ID_TOKEN_EXPIRED_GS_CODE:
case SESSION_NOT_EXIST_GS_CODE:
case MASTER_TOKEN_NOTFOUND:
case MASTER_EXPIRED_GS_CODE:
case MASTER_TOKEN_INVALID_GS_CODE:
case ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE:
// INFO: If runCreateStage consumed a lot of time, authentication might be expired.
// In this case, retry to drop stage.
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
snowflakeCon.runDropStage(stageIdentifier);
break;
default:
throw ex;
}
}
}

@Override
public ConfigDiff resume(
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {
Expand Down
Loading