From 168a68e6387e74311d7475dd90137e2be8f5fbb8 Mon Sep 17 00:00:00 2001 From: maeken Date: Mon, 26 Feb 2024 04:39:17 +0000 Subject: [PATCH 1/7] =?UTF-8?q?transaction=E3=83=A1=E3=82=BD=E3=83=83?= =?UTF-8?q?=E3=83=89=E3=82=92override?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../embulk/output/SnowflakeOutputPlugin.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index a7e201e..ce30524 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -6,6 +6,7 @@ import java.util.*; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; import org.embulk.config.TaskSource; import org.embulk.output.jdbc.*; import org.embulk.output.snowflake.PrivateKeyReader; @@ -129,6 +130,36 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet return new SnowflakeOutputConnector(url, props, t.getTransactionIsolation()); } + @Override + public ConfigDiff transaction(ConfigSource config, + Schema schema, int taskCount, + OutputPlugin.Control control) + { + PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass()); + SnowflakePluginTask t = (SnowflakePluginTask) task; + this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); + ConfigDiff configDiff; + SnowflakeOutputConnection snowflakeCon = null; + + try { + snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); + snowflakeCon.runCreateStage(this.stageIdentifier); + configDiff = super.transaction(config, schema, taskCount, control); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } finally { + if (t.getDeleteStage()) { + try { + snowflakeCon.runDropStage(this.stageIdentifier); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } + } + } + + return configDiff; + } + @Override public ConfigDiff resume( TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) { From 081c74a2ee6382713f03195c51b53d13954787fd Mon Sep 17 00:00:00 2001 From: maeken Date: Mon, 26 Feb 2024 05:45:56 +0000 Subject: [PATCH 2/7] =?UTF-8?q?SnowflakeStage=E9=96=A2=E9=80=A3=E3=81=AE?= =?UTF-8?q?=E6=97=A2=E5=AD=98=E8=A8=98=E8=BF=B0=E3=82=92=E5=89=8A=E9=99=A4?= =?UTF-8?q?=E3=83=BB=E6=95=B4=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../embulk/output/SnowflakeOutputPlugin.java | 26 +------------------ .../snowflake/SnowflakeCopyBatchInsert.java | 1 - 2 files changed, 1 insertion(+), 26 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index ce30524..face4cc 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -166,22 +166,6 @@ public ConfigDiff resume( throw new UnsupportedOperationException("snowflake output plugin does not support resuming"); } - @Override - protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount) - throws SQLException { - super.doCommit(con, task, taskCount); - SnowflakeOutputConnection snowflakeCon = (SnowflakeOutputConnection) con; - - SnowflakePluginTask t = (SnowflakePluginTask) task; - if (this.stageIdentifier == null) { - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); - } - - if (t.getDeleteStage()) { - snowflakeCon.runDropStage(this.stageIdentifier); - } - } - @Override protected void doBegin( JdbcOutputConnection con, PluginTask task, final Schema schema, int taskCount) @@ -196,16 +180,8 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional merg throw new UnsupportedOperationException( "Snowflake output plugin doesn't support 'merge_direct' mode."); } - - SnowflakePluginTask t = (SnowflakePluginTask) task; - // TODO: put some where executes once - if (this.stageIdentifier == null) { - SnowflakeOutputConnection snowflakeCon = - (SnowflakeOutputConnection) getConnector(task, true).connect(true); - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); - snowflakeCon.runCreateStage(this.stageIdentifier); - } SnowflakePluginTask pluginTask = (SnowflakePluginTask) task; + this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(pluginTask); return new SnowflakeCopyBatchInsert( getConnector(task, true), diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java b/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java index 18f8155..a1ebd8c 100644 --- a/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java +++ b/src/main/java/org/embulk/output/snowflake/SnowflakeCopyBatchInsert.java @@ -61,7 +61,6 @@ public SnowflakeCopyBatchInsert( @Override public void prepare(TableIdentifier loadTable, JdbcSchema insertSchema) throws SQLException { this.connection = (SnowflakeOutputConnection) connector.connect(true); - this.connection.runCreateStage(stageIdentifier); this.tableIdentifier = loadTable; } From 0ed819534f961b1ab79aead15c8154cf005a15f0 Mon Sep 17 00:00:00 2001 From: maeken Date: Mon, 26 Feb 2024 06:06:43 +0000 Subject: [PATCH 3/7] ./gradlew :spotlessApply --- .../embulk/output/SnowflakeOutputPlugin.java | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index face4cc..c8413a3 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -131,33 +131,31 @@ protected JdbcOutputConnector getConnector(PluginTask task, boolean retryableMet } @Override - public ConfigDiff transaction(ConfigSource config, - Schema schema, int taskCount, - OutputPlugin.Control control) - { - PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass()); - SnowflakePluginTask t = (SnowflakePluginTask) task; - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); - ConfigDiff configDiff; - SnowflakeOutputConnection snowflakeCon = null; - - try { - snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); - snowflakeCon.runCreateStage(this.stageIdentifier); - configDiff = super.transaction(config, schema, taskCount, control); - } catch (SQLException ex) { - throw new RuntimeException(ex); - } finally { - if (t.getDeleteStage()) { - try { - snowflakeCon.runDropStage(this.stageIdentifier); - } catch (SQLException ex) { - throw new RuntimeException(ex); - } + public ConfigDiff transaction( + ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) { + PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass()); + SnowflakePluginTask t = (SnowflakePluginTask) task; + this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); + ConfigDiff configDiff; + SnowflakeOutputConnection snowflakeCon = null; + + try { + snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); + snowflakeCon.runCreateStage(this.stageIdentifier); + configDiff = super.transaction(config, schema, taskCount, control); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } finally { + if (t.getDeleteStage()) { + try { + snowflakeCon.runDropStage(this.stageIdentifier); + } catch (SQLException ex) { + throw new RuntimeException(ex); } } + } - return configDiff; + return configDiff; } @Override From 6c9633b793aebe846e3b72d275bc3d7796ec9773 Mon Sep 17 00:00:00 2001 From: maeken Date: Mon, 26 Feb 2024 10:03:34 +0000 Subject: [PATCH 4/7] add logging DDL for STAGE --- .../embulk/output/snowflake/SnowflakeOutputConnection.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java b/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java index 5bc161b..1168629 100644 --- a/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java +++ b/src/main/java/org/embulk/output/snowflake/SnowflakeOutputConnection.java @@ -11,8 +11,12 @@ import org.embulk.output.jdbc.JdbcSchema; import org.embulk.output.jdbc.MergeConfig; import org.embulk.output.jdbc.TableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SnowflakeOutputConnection extends JdbcOutputConnection { + private final Logger logger = LoggerFactory.getLogger(SnowflakeOutputConnection.class); + public SnowflakeOutputConnection(Connection connection) throws SQLException { super(connection, null); } @@ -32,11 +36,13 @@ public void runCopy( public void runCreateStage(StageIdentifier stageIdentifier) throws SQLException { String sql = buildCreateStageSQL(stageIdentifier); runUpdate(sql); + logger.info("SQL: {}", sql); } public void runDropStage(StageIdentifier stageIdentifier) throws SQLException { String sql = buildDropStageSQL(stageIdentifier); runUpdate(sql); + logger.info("SQL: {}", sql); } public void runUploadFile( From d0fad24ea109553bb2d7e894d06bfd606dd8b971 Mon Sep 17 00:00:00 2001 From: maeken Date: Wed, 28 Feb 2024 01:47:32 +0000 Subject: [PATCH 5/7] add delete_stage_on_error option --- .../org/embulk/output/SnowflakeOutputPlugin.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index c8413a3..8c2112b 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -66,6 +66,10 @@ public interface SnowflakePluginTask extends PluginTask { @Config("empty_field_as_null") @ConfigDefault("true") public boolean getEmtpyFieldAsNull(); + + @Config("delete_stage_on_error") + @ConfigDefault("false") + public boolean getDeleteStageOnError(); } @Override @@ -143,16 +147,16 @@ public ConfigDiff transaction( snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); snowflakeCon.runCreateStage(this.stageIdentifier); configDiff = super.transaction(config, schema, taskCount, control); - } catch (SQLException ex) { - throw new RuntimeException(ex); - } finally { - if (t.getDeleteStage()) { + snowflakeCon.runDropStage(this.stageIdentifier); + } catch (Exception e) { + if (t.getDeleteStageOnError()) { try { snowflakeCon.runDropStage(this.stageIdentifier); } catch (SQLException ex) { throw new RuntimeException(ex); } } + throw new RuntimeException(e); } return configDiff; From 62415056ae360815adb10d723cb7df64d750bf95 Mon Sep 17 00:00:00 2001 From: maeken Date: Wed, 28 Feb 2024 02:09:03 +0000 Subject: [PATCH 6/7] consider delete_stage option to delete stage --- src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 8c2112b..e3e2363 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -147,9 +147,11 @@ public ConfigDiff transaction( snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); snowflakeCon.runCreateStage(this.stageIdentifier); configDiff = super.transaction(config, schema, taskCount, control); - snowflakeCon.runDropStage(this.stageIdentifier); + if (t.getDeleteStage()) { + snowflakeCon.runDropStage(this.stageIdentifier); + } } catch (Exception e) { - if (t.getDeleteStageOnError()) { + if (t.getDeleteStage() && t.getDeleteStageOnError()) { try { snowflakeCon.runDropStage(this.stageIdentifier); } catch (SQLException ex) { From dd8d24a8100243370eae047613221cefc565ff90 Mon Sep 17 00:00:00 2001 From: maeken Date: Fri, 1 Mar 2024 01:03:50 +0000 Subject: [PATCH 7/7] remove unnecessary instance variable --- .../org/embulk/output/SnowflakeOutputPlugin.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index e3e2363..bddc9cc 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -23,8 +23,6 @@ import org.embulk.util.config.ConfigDefault; public class SnowflakeOutputPlugin extends AbstractJdbcOutputPlugin { - private StageIdentifier stageIdentifier; - public interface SnowflakePluginTask extends PluginTask { @Config("driver_path") @ConfigDefault("null") @@ -139,21 +137,21 @@ public ConfigDiff transaction( ConfigSource config, Schema schema, int taskCount, OutputPlugin.Control control) { PluginTask task = CONFIG_MAPPER.map(config, this.getTaskClass()); SnowflakePluginTask t = (SnowflakePluginTask) task; - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); + StageIdentifier stageIdentifier = StageIdentifierHolder.getStageIdentifier(t); ConfigDiff configDiff; SnowflakeOutputConnection snowflakeCon = null; try { snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); - snowflakeCon.runCreateStage(this.stageIdentifier); + snowflakeCon.runCreateStage(stageIdentifier); configDiff = super.transaction(config, schema, taskCount, control); if (t.getDeleteStage()) { - snowflakeCon.runDropStage(this.stageIdentifier); + snowflakeCon.runDropStage(stageIdentifier); } } catch (Exception e) { if (t.getDeleteStage() && t.getDeleteStageOnError()) { try { - snowflakeCon.runDropStage(this.stageIdentifier); + snowflakeCon.runDropStage(stageIdentifier); } catch (SQLException ex) { throw new RuntimeException(ex); } @@ -185,11 +183,10 @@ protected BatchInsert newBatchInsert(PluginTask task, Optional merg "Snowflake output plugin doesn't support 'merge_direct' mode."); } SnowflakePluginTask pluginTask = (SnowflakePluginTask) task; - this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(pluginTask); return new SnowflakeCopyBatchInsert( getConnector(task, true), - this.stageIdentifier, + StageIdentifierHolder.getStageIdentifier(pluginTask), false, pluginTask.getMaxUploadRetries(), pluginTask.getEmtpyFieldAsNull());