Delete tmp stage even on error and Add delete_stage_on_error option #72

Merged
merged 7 commits into from
Mar 11, 2024

Conversation

maeken
Contributor

@maeken maeken commented Feb 26, 2024

What to do

  • Modifying policies
    • Both runDropStage and runCreateStage must be executed at the same level of the job process.
    • Because runCreateStage was originally called in the multi-threaded startup part of a split job, the CREATE STAGE IF NOT EXISTS clause was executed once per thread. With runDropStage, however, if a preceding thread had already deleted the Snowflake temp stage, the subsequent threads would fail. So create and drop should both be completed in the OutputPlugin's transaction method, which encompasses the job's entire multi-threaded processing.
  • Remove the existing, now unnecessary create and drop processing for the Snowflake temp stage.
  • Add a delete_stage_on_error option. Both the delete_stage and delete_stage_on_error options are effective.
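
The policy above can be sketched roughly as follows. This is a minimal illustration and not the plugin's actual code: `RecordingConnection` is a hypothetical stand-in that records SQL instead of talking to Snowflake, and the sketch assumes that delete_stage governs the success path while delete_stage_on_error governs the failure path. The merged implementation may combine the two options differently.

```java
import java.util.ArrayList;
import java.util.List;

final class StageLifecycleSketch {
    /** Hypothetical stand-in for the plugin's connection; it records SQL instead of running it. */
    static final class RecordingConnection {
        final List<String> executed = new ArrayList<>();

        void runCreateStage(String stage) {
            executed.add("CREATE STAGE IF NOT EXISTS " + stage);
        }

        void runDropStage(String stage) {
            executed.add("DROP STAGE IF EXISTS " + stage);
        }
    }

    /**
     * Creates the stage once, runs every task, then drops the stage at most once.
     * allTasks stands for the whole multi-threaded part of the job.
     */
    static void transaction(RecordingConnection con, String stage, boolean deleteStage,
                            boolean deleteStageOnError, Runnable allTasks) {
        con.runCreateStage(stage);
        try {
            allTasks.run();
        } catch (RuntimeException e) {
            // Failure path (assumption: controlled only by delete_stage_on_error).
            if (deleteStageOnError) {
                con.runDropStage(stage);
            }
            throw e;
        }
        // Success path (assumption: controlled only by delete_stage).
        if (deleteStage) {
            con.runDropStage(stage);
        }
    }

    public static void main(String[] args) {
        RecordingConnection con = new RecordingConnection();
        try {
            transaction(con, "db.schema.tmp_stage", true, true, () -> {
                throw new RuntimeException("Invalid record");
            });
        } catch (RuntimeException e) {
            System.out.println("job failed: " + e.getMessage());
        }
        con.executed.forEach(System.out::println);
    }
}
```

Because the drop happens only after all tasks have returned or thrown, no task thread can find the stage already deleted by another thread.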

Issue / Reference

@maeken maeken self-assigned this Feb 26, 2024
@maeken maeken requested review from kekekenta and d-hrs February 26, 2024 06:03
@maeken maeken changed the title #19356 Fix the issue where the temporary STAGE is left behind when a transfer job to a Snowflake destination fails Delete snowflake temp stage even when jobs got error Feb 26, 2024
Contributor

@d-hrs d-hrs left a comment

For backward compatibility, please make this an option like delete_stage, with a default of false (do not delete the stage).

I think the presence of the stage is helpful for investigating errors.
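
A backward-compatible default could look roughly like the sketch below. It is only an illustration under stated assumptions: the `out` section is modeled as a plain map instead of Embulk's config classes, `flag` and `shouldDropStage` are hypothetical helper names, and both options are assumed to be treated as false when absent.

```java
import java.util.HashMap;
import java.util.Map;

final class DeleteStageDefaultsSketch {
    /** Reads a boolean option; a missing or non-boolean value is treated as false. */
    static boolean flag(Map<String, Object> out, String key) {
        Object value = out.get(key);
        return value instanceof Boolean && (Boolean) value;
    }

    /** Decides whether to drop the temp stage (assumption: each option covers one path). */
    static boolean shouldDropStage(Map<String, Object> out, boolean jobFailed) {
        return jobFailed ? flag(out, "delete_stage_on_error") : flag(out, "delete_stage");
    }

    public static void main(String[] args) {
        // An existing config that sets only delete_stage keeps its old behavior on error.
        Map<String, Object> existing = new HashMap<>();
        existing.put("delete_stage", true);
        System.out.println("drop on success: " + shouldDropStage(existing, false));
        System.out.println("drop on error:   " + shouldDropStage(existing, true));
    }
}
```

With this default, the stage survives a failed job unless the user opts in, so it stays available for investigating the error.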

@maeken maeken changed the title Delete snowflake temp stage even when jobs got error Delete tmp stage even on error and Add delete_stage_on_error option Feb 28, 2024
@maeken maeken requested a review from d-hrs February 28, 2024 02:21
Member

kekekenta commented Feb 29, 2024

@maeken Would you share an example config that reproduces the error, along with the resulting log?

Contributor Author

maeken commented Feb 29, 2024

@kekekenta
Here is a sample config YAML that triggers an error, the contents of the sample CSV, and the resulting log.

in:
  type: file
  path_prefix: ./test.csv
  parser:
    type: csv
    delimiter: ","
    skip_header_lines: 0
    null_string: ""
    columns:
      - { name: col1, type: long }
      - { name: col2, type: string }
    stop_on_invalid_record: true

out:
  type: snowflake
  host: <YOUR_SNOWFLAKE_HOST>
  user: <YOUR_SNOWFLAKE_USER>
  password: <YOUR_SNOWFLAKE_PASSWORD>
  warehouse: <YOUR_SNOWFLAKE_WAREHOUSE>
  database: <YOUR_SNOWFLAKE_DATABASE>
  schema: <YOUR_SNOWFLAKE_SCHEMA>
  table: <YOUR_SNOWFLAKE_TABLE>
  delete_stage: true
  delete_stage_on_error: true
  retry_limit: 1
  retry_wait: 1000
  max_retry_wait: 1800000
  mode: replace
  default_timezone: UTC

and the contents of test.csv

"hogehogefugafuga","90123456789"
"hogehgoe21928398","B01234567890123456789"
"hogeho8dyf9e","C01234567890123456789012345678901234567890123456789"

and the resulting log

2024-02-29 09:36:07.947 +0000: Embulk v0.9.26
2024-02-29 09:36:08.567 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2024-02-29 09:36:09.941 +0000 [INFO] (main): BUNDLE_GEMFILE is being set: "/work/embulk_bundle/Gemfile"
2024-02-29 09:36:09.941 +0000 [INFO] (main): Gem's home and path are being cleared.
You must use Bundler 2 or greater with this lockfile.
2024-02-29 09:36:11.071 +0000 [INFO] (main): Started Embulk v0.9.26
2024-02-29 09:36:11.135 +0000 [INFO] (0001:transaction): Loaded plugin embulk/output/snowflake from a load path
2024-02-29 09:36:11.167 +0000 [INFO] (0001:transaction): Listing local files at directory '.' filtering filename by prefix 'test.csv'
2024-02-29 09:36:11.168 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2024-02-29 09:36:11.171 +0000 [INFO] (0001:transaction): Loading files [./test.csv]
2024-02-29 09:36:11.191 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=16 / output tasks 8 = input tasks 1 * 8
2024-02-29 09:36:11.208 +0000 [WARN] (0001:transaction): "UTC" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:11.219 +0000 [INFO] (0001:transaction): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:12.064 +0000 [INFO] (0001:transaction): TransactionIsolation=unknown
2024-02-29 09:36:12.325 +0000 [INFO] (0001:transaction): SQL: CREATE STAGE IF NOT EXISTS TEST_BY_MAEKEN.TEST_SCHEMA.embulk_snowflake_20240229ARpVxc;
2024-02-29 09:36:12.327 +0000 [WARN] (0001:transaction): "UTC" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:12.329 +0000 [INFO] (0001:transaction): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:12.756 +0000 [INFO] (0001:transaction): TransactionIsolation=unknown
2024-02-29 09:36:12.756 +0000 [INFO] (0001:transaction): Using JDBC Driver 3.13.26
2024-02-29 09:36:12.756 +0000 [INFO] (0001:transaction): Using replace mode
2024-02-29 09:36:12.901 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE "STAGE_DELETE_TEST_0000018df438b265_embulk" ("col1" BIGINT, "col2" VARCHAR(65535))
2024-02-29 09:36:13.231 +0000 [INFO] (0001:transaction): > 0.33 seconds
2024-02-29 09:36:13.674 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2024-02-29 09:36:13.679 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:13.680 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:13.689 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:13.846 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:13.849 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:13.849 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:13.850 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:13.987 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:13.987 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:13.988 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:13.989 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:14.170 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:14.171 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:14.171 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:14.173 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:14.322 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:14.323 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:14.323 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:14.324 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:14.522 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:14.523 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:14.523 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:14.525 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:14.654 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:14.655 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:14.655 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:14.656 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:14.800 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:14.800 +0000 [WARN] (0016:task-0000): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:14.801 +0000 [WARN] (0016:task-0000): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:14.802 +0000 [INFO] (0016:task-0000): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:14.976 +0000 [INFO] (0016:task-0000): TransactionIsolation=unknown
2024-02-29 09:36:15.394 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2024-02-29 09:36:15.514 +0000 [INFO] (0001:transaction): SQL: DROP STAGE IF EXISTS TEST_BY_MAEKEN.TEST_SCHEMA.embulk_snowflake_20240229ARpVxc;
2024-02-29 09:36:15.520 +0000 [WARN] (0001:cleanup): Z is deprecated as a military time zone name. Use UTC instead.
2024-02-29 09:36:15.520 +0000 [WARN] (0001:cleanup): "Z" is recognized as "Z" to be compatible with the legacy style.
2024-02-29 09:36:15.521 +0000 [INFO] (0001:cleanup): Connecting to jdbc:snowflake://nz47967.ap-northeast-1.aws.snowflakecomputing.com options {user=pn, warehouse=COMPUTE_WH, db=TEST_BY_MAEKEN, password=***, schema=TEST_SCHEMA, MULTI_STATEMENT_COUNT=0, CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true}
2024-02-29 09:36:15.675 +0000 [INFO] (0001:cleanup): TransactionIsolation=unknown
2024-02-29 09:36:15.675 +0000 [INFO] (0001:cleanup): SQL: DROP TABLE IF EXISTS "STAGE_DELETE_TEST_0000018df438b265_embulk"
2024-02-29 09:36:15.814 +0000 [INFO] (0001:cleanup): > 0.14 seconds
org.embulk.exec.PartialExecutionException: java.lang.RuntimeException: org.embulk.spi.DataException: Invalid record at /src/test/./test.csv:1: "hogehogefugafuga","90123456789"
	at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(BulkLoader.java:340)
	at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:566)
	at org.embulk.exec.BulkLoader.access$000(BulkLoader.java:35)
	at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:353)
	at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:350)
	at org.embulk.spi.Exec.doWith(Exec.java:22)
	at org.embulk.exec.BulkLoader.run(BulkLoader.java:350)
	at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:242)
	at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:290)
	at org.embulk.EmbulkRunner.run(EmbulkRunner.java:155)
	at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:431)
	at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:90)
	at org.embulk.cli.Main.main(Main.java:64)
Caused by: java.lang.RuntimeException: org.embulk.spi.DataException: Invalid record at /src/test/./test.csv:1: "hogehogefugafuga","90123456789"
	at org.embulk.output.SnowflakeOutputPlugin.transaction(SnowflakeOutputPlugin.java:161)
	at org.embulk.exec.BulkLoader$4$1$1.transaction(BulkLoader.java:521)
	at org.embulk.exec.LocalExecutorPlugin.transaction(LocalExecutorPlugin.java:50)
	at org.embulk.exec.BulkLoader$4$1.run(BulkLoader.java:516)
	at org.embulk.spi.util.Filters$RecursiveControl.transaction(Filters.java:84)
	at org.embulk.spi.util.Filters.transaction(Filters.java:42)
	at org.embulk.exec.BulkLoader$4.run(BulkLoader.java:511)
	at org.embulk.spi.FileInputRunner$RunnerControl$1$1.run(FileInputRunner.java:112)
	at org.embulk.standards.CsvParserPlugin.transaction(CsvParserPlugin.java:237)
	at org.embulk.spi.FileInputRunner$RunnerControl$1.run(FileInputRunner.java:107)
	at org.embulk.spi.util.Decoders$RecursiveControl.transaction(Decoders.java:68)
	at org.embulk.spi.util.Decoders.transaction(Decoders.java:29)
	at org.embulk.spi.FileInputRunner$RunnerControl.run(FileInputRunner.java:105)
	at org.embulk.standards.LocalFileInputPlugin.resume(LocalFileInputPlugin.java:79)
	at org.embulk.standards.LocalFileInputPlugin.transaction(LocalFileInputPlugin.java:72)
	at org.embulk.spi.FileInputRunner.transaction(FileInputRunner.java:62)
	at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:507)
	... 11 more
Caused by: org.embulk.spi.DataException: Invalid record at /src/test/./test.csv:1: "hogehogefugafuga","90123456789"
	at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:374)
	at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:140)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:269)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$100(LocalExecutorPlugin.java:194)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:233)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:230)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "hogehogefugafuga"
	at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:291)
	at org.embulk.spi.Column.visit(Column.java:48)
	at org.embulk.spi.Schema.visitColumns(Schema.java:68)
	at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:272)
	... 9 more
Caused by: java.lang.NumberFormatException: For input string: "hogehogefugafuga"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:288)
	... 12 more

Error: java.lang.RuntimeException: org.embulk.spi.DataException: Invalid record at /src/test/./test.csv:1: "hogehogefugafuga","90123456789"

if (this.stageIdentifier == null) {
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
}
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(t);
Member

@kekekenta kekekenta Feb 29, 2024

It seems that we don't have to define stageIdentifier as a field (as this.stageIdentifier).
stageIdentifier can be a local variable.
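
A rough sketch of the local-variable form suggested above. The helper `getStageIdentifier` here is a hypothetical stand-in for StageIdentifierHolder.getStageIdentifier (its real signature takes the plugin task), and `statementsFor` is an invented name used only for illustration.

```java
final class LocalStageIdentifierSketch {
    /** Hypothetical stand-in for StageIdentifierHolder.getStageIdentifier(task). */
    static String getStageIdentifier(String database, String schema, String stageName) {
        return database + "." + schema + "." + stageName;
    }

    /** The identifier lives only for this call, so no instance field is needed. */
    static String[] statementsFor(String database, String schema, String stageName) {
        // Local variable instead of this.stageIdentifier.
        String stageIdentifier = getStageIdentifier(database, schema, stageName);
        return new String[] {
            "CREATE STAGE IF NOT EXISTS " + stageIdentifier + ";",
            "DROP STAGE IF EXISTS " + stageIdentifier + ";"
        };
    }

    public static void main(String[] args) {
        for (String sql : statementsFor("TEST_DB", "TEST_SCHEMA", "embulk_snowflake_tmp")) {
            System.out.println(sql);
        }
    }
}
```

Keeping the value local means the create and drop calls in one method share it without leaving mutable state on the plugin instance.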

SnowflakePluginTask pluginTask = (SnowflakePluginTask) task;
this.stageIdentifier = StageIdentifierHolder.getStageIdentifier(pluginTask);
Member

Can this.stageIdentifier be a local variable?

Member

@kekekenta kekekenta left a comment

commented

@maeken maeken requested a review from kekekenta March 1, 2024 01:11
Member

@kekekenta kekekenta left a comment

LGTM

Contributor

@d-hrs d-hrs left a comment

LGTM

@d-hrs d-hrs merged commit 9cfaa02 into main Mar 11, 2024
1 check passed
@d-hrs d-hrs deleted the 19356-drop-snowflake-stage branch March 11, 2024 08:22