diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 7f34019..e379fa9 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -7,6 +7,7 @@ import java.sql.Types; import java.util.*; import java.util.function.BiFunction; +import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException; import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException; import org.embulk.config.ConfigDiff; @@ -124,6 +125,16 @@ public static MatchByColumnName fromString(String value) { } } + // error codes which need reauthenticate + // ref: + // https://github.com/snowflakedb/snowflake-jdbc/blob/v3.13.26/src/main/java/net/snowflake/client/jdbc/SnowflakeUtil.java#L42 + private static final int ID_TOKEN_EXPIRED_GS_CODE = 390110; + private static final int SESSION_NOT_EXIST_GS_CODE = 390111; + private static final int MASTER_TOKEN_NOTFOUND = 390113; + private static final int MASTER_EXPIRED_GS_CODE = 390114; + private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115; + private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195; + @Override protected Class getTaskClass() { return SnowflakePluginTask.class; @@ -204,12 +215,12 @@ public ConfigDiff transaction( snowflakeCon.runCreateStage(stageIdentifier); configDiff = super.transaction(config, schema, taskCount, control); if (t.getDeleteStage()) { - snowflakeCon.runDropStage(stageIdentifier); + runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); } } catch (Exception e) { if (t.getDeleteStage() && t.getDeleteStageOnError()) { try { - snowflakeCon.runDropStage(stageIdentifier); + runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); } catch (SQLException ex) { throw new RuntimeException(ex); } @@ -220,6 +231,35 @@ public ConfigDiff transaction( return configDiff; } + private void runDropStageWithRecovery( + SnowflakeOutputConnection snowflakeCon, StageIdentifier stageIdentifier, PluginTask task) + throws SQLException { + try { + snowflakeCon.runDropStage(stageIdentifier); + } catch (SnowflakeSQLException ex) { + // INFO: Don't handle only SnowflakeReauthenticationRequest here + // because SnowflakeSQLException with following error codes may be thrown in some cases. + + logger.info("SnowflakeSQLException was caught: ({}) {}", ex.getErrorCode(), ex.getMessage()); + + switch (ex.getErrorCode()) { + case ID_TOKEN_EXPIRED_GS_CODE: + case SESSION_NOT_EXIST_GS_CODE: + case MASTER_TOKEN_NOTFOUND: + case MASTER_EXPIRED_GS_CODE: + case MASTER_TOKEN_INVALID_GS_CODE: + case ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE: + // INFO: If runCreateStage consumed a lot of time, authentication might be expired. + // In this case, retry to drop stage. + snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); + snowflakeCon.runDropStage(stageIdentifier); + break; + default: + throw ex; + } + } + } + @Override public ConfigDiff resume( TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {