diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
new file mode 100644
index 0000000..535d915
--- /dev/null
+++ b/.github/workflows/check.yml
@@ -0,0 +1,32 @@
+name: Check
+on: [ pull_request, push ]
+jobs:
+  check:
+    runs-on: ubuntu-latest
+    # push: always run.
+    # pull_request: run only when the PR comes from a forked repository, not from a branch within this repository.
+    if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
+    strategy:
+      matrix:
+        jruby_version:
+          - 9.1.17.0
+      fail-fast: false
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up OpenJDK 8
+        uses: actions/setup-java@v4
+        with:
+          java-version: 8
+          distribution: "temurin"
+      - uses: ruby/setup-ruby@v1
+        with:
+          ruby-version: 'jruby-${{ matrix.jruby_version }}'
+          bundler-cache: false
+      - name: install embulk.jar
+        run: "curl -L -o embulk.jar https://github.com/embulk/embulk/releases/download/v0.9.25/embulk-0.9.25.jar"
+      - name: chmod embulk.jar
+        run: "chmod a+x embulk.jar"
+      - name: bundle install
+        run: "./embulk.jar bundle install --path vendor/bundle"
+      - name: rake test
+        run: 'bundle exec env RUBYOPT="-r ./embulk.jar -r embulk -r embulk/java/bootstrap" rake test'
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
new file mode 100644
index 0000000..00374cf
--- /dev/null
+++ b/.github/workflows/publish.yml
@@ -0,0 +1,42 @@
+name: Publish
+on:
+  push:
+    tags:
+      - "v0.*"
+jobs:
+  publish:
+    runs-on: ubuntu-latest
+    environment: maven-central-and-ruby-gems
+    strategy:
+      fail-fast: true
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up Ruby
+        uses: ruby/setup-ruby@v1
+        with:
+          ruby-version: 3.3.0
+      # Get the tag name via ${{ github.ref_name }}.
+      #
+      # References:
+      # * https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
+      # * https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
+      - name: extract gem version from tag
+        id: vars
+        run: echo version=${{ github.ref_name }} | sed -e 's/v0/0/' >> $GITHUB_OUTPUT
+      #
+      # From the `gem push` documentation:
+      #
+      # The push command will use ~/.gem/credentials to authenticate to a server,
+      # but you can use the RubyGems environment variable GEM_HOST_API_KEY
+      # to set the API key for authentication.
+      #
+      # https://guides.rubygems.org/command-reference/#gem-push
+      #
+      - name: Publish
+        run: |
+          if [ -z "${GEM_HOST_API_KEY}" ]; then exit 0; fi
+          rake build
+          gem push pkg/${EMBULK_PLUGIN_NAME}-${{ steps.vars.outputs.version }}.gem
+        env:
+          EMBULK_PLUGIN_NAME: embulk-output-bigquery
+          GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_API_KEY}}"
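A note on the `extract gem version from tag` step above: the `sed -e 's/v0/0/'` simply strips the leading `v` from a tag like `v0.7.2`. The equivalent logic in Ruby, for illustration only (the workflow does this in shell):

```ruby
# What the publish workflow's sed expression computes, shown in Ruby.
# `tag` stands in for ${{ github.ref_name }}; values are examples.
tag = 'v0.7.2'
version = tag.sub('v0', '0')  # first occurrence, like sed 's/v0/0/' => "0.7.2"
raise "unexpected tag: #{tag}" unless version.match?(/\A0(\.\d+)+\z/)
puts version                  # becomes ${{ steps.vars.outputs.version }}
```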
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f879f7e..e589949 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,17 @@
+## 0.7.2 - 2024-07-21
+* [maintenance] Fix GitHub Actions #166
+* [maintenance] Fix gcs_client so that data can be loaded using the gcs_bucket parameter (Thanks to kashira202111) #164
+* [maintenance] Prevent creating unnecessary tables. (Thanks to kashira202111) #148
+
+## 0.7.1 - 2024-03-04
+* [enhancement] Support description of columns and tables (Thanks to @kyoshidajp and @fagai) #142
+* [maintenance] Add missing GitHub Actions environment setting. #160
+* [maintenance] Replace google-api-client with specific Google APIs (Thanks to @Nozomuts) #161
+* [maintenance] Update GitHub Actions to use checkout@v4 and setup-java@v4 #162
+
+## 0.7.0 - 2024-02-01
+* [enhancement] Add support for Embulk 0.11.x
+
 ## 0.6.9 - 2023-03-16
 * [enhancement] Add SSLException to retry job (thanks to @mzumi)
 
diff --git a/Gemfile b/Gemfile
index 7328ea6..cf3727f 100644
--- a/Gemfile
+++ b/Gemfile
@@ -8,3 +8,4 @@ gem 'embulk-parser-jsonl'
 gem 'pry-nav'
 gem 'test-unit'
 gem 'test-unit-rr'
+gem 'rake', '10.4.2'
diff --git a/README.md b/README.md
index caf747b..a5d2cb9 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,5 @@
 # embulk-output-bigquery
 
-[![Build Status](https://secure.travis-ci.org/embulk/embulk-output-bigquery.png?branch=master)](http://travis-ci.org/embulk/embulk-output-bigquery)
-
 [Embulk](https://github.com/embulk/embulk/) output plugin to load/insert data into [Google BigQuery](https://cloud.google.com/bigquery/) using [direct insert](https://cloud.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest)
 
 ## Overview
@@ -14,6 +12,13 @@ https://developers.google.com/bigquery/loading-data-into-bigquery
 * **Cleanup supported**: no
 * **Dynamic table creating**: yes
 
+### Supported Embulk
+
+| gem version      | Embulk version     |
+|------------------|--------------------|
+| 0.7.0 and higher | v0.11.0 and higher |
+| 0.6.9 and lower  | v0.9.X and lower   |
+
 ### NOT IMPLEMENTED
 * insert data over streaming inserts
   * for continuous real-time insertions
@@ -55,6 +60,7 @@ OAuth flow for installed applications.
 | gcs_bucket | string | optional | nil | See [GCS Bucket](#gcs-bucket) |
 | auto_create_gcs_bucket | boolean | optional | false | See [GCS Bucket](#gcs-bucket) |
 | progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled by nil (default). NOTE: This option may be removed in a future because a filter plugin can achieve the same goal |
+| description | string | optional | nil | Description of the table |
 
 Client or request options
 
@@ -325,6 +331,6 @@ Column options are used to aid guessing BigQuery schema, or to define conversion
     - numeric: `STRING`
   - **mode**: BigQuery mode such as `NULLABLE`, `REQUIRED`, and `REPEATED` (string, default: `NULLABLE`)
   - **fields**: Describes the nested schema fields if the type property is set to RECORD. Please note that this is **required** for `RECORD` column.
+  - **description**: description of the column (string, default is `None`).
   - **timestamp_format**: timestamp format to convert into/from `timestamp` (string, default is `default_timestamp_format`)
   - **timezone**: timezone to convert into/from `timestamp`, `date` (string, default is `default_timezone`).
-  - **description**: description for the column.
diff --git a/embulk-output-bigquery.gemspec b/embulk-output-bigquery.gemspec
index 46c2f14..380bca1 100644
--- a/embulk-output-bigquery.gemspec
+++ b/embulk-output-bigquery.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |spec|
   spec.name          = "embulk-output-bigquery"
-  spec.version       = "0.6.9.trocco.0.0.3"
+  spec.version       = "0.7.2"
   spec.authors       = ["Satoshi Akama", "Naotoshi Seo"]
   spec.summary       = "Google BigQuery output plugin for Embulk"
   spec.description   = "Embulk plugin that insert records to Google BigQuery."
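For context on the new `description` option documented in the README above: it is read from the `out:` section of the Embulk config. A minimal sketch of such a config, shown as the Ruby hash `Bigquery.configure` receives (keys follow the README; values are placeholders):

```ruby
# Hypothetical out: section using the new table- and column-level descriptions.
config = {
  'type'           => 'bigquery',
  'mode'           => 'replace',
  'dataset'        => 'my_dataset',
  'table'          => 'my_table',
  'description'    => 'Loaded by embulk-output-bigquery',  # table description
  'column_options' => [
    {'name' => 'id', 'type' => 'INTEGER', 'description' => 'primary key'},
  ],
}
```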
diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb
index 8c28840..240e0aa 100644
--- a/lib/embulk/output/bigquery.rb
+++ b/lib/embulk/output/bigquery.rb
@@ -63,6 +63,8 @@ def self.configure(config, schema, task_count)
           'payload_column'       => config.param('payload_column',       :string,  :default => nil),
           'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),
 
+          'description'          => config.param('description',          :string,  :default => nil),
+
           'open_timeout_sec'     => config.param('open_timeout_sec',     :integer, :default => nil),
           'timeout_sec'          => config.param('timeout_sec',          :integer, :default => nil), # google-api-ruby-client < v0.11.0
           'send_timeout_sec'     => config.param('send_timeout_sec',     :integer, :default => nil), # google-api-ruby-client >= v0.11.0
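The repeated `opts` → `**opts` changes below are the Ruby 3 keyword-argument fix: the generated google-api-client methods accept keyword arguments, and Ruby 3 no longer converts a trailing positional `Hash` into keywords. A self-contained sketch of the failure mode (the signature is illustrative, not the real API):

```ruby
# Illustrative stand-in for a google-api-client generated method that takes
# keyword arguments such as fields: and options:.
def insert_job(project_id, job_object = nil, fields: nil, options: nil)
  [project_id, job_object, fields, options]
end

opts = {}
insert_job('my-project', {}, **opts)   # OK: splats into keyword arguments
begin
  insert_job('my-project', {}, opts)   # Ruby 3: ArgumentError (extra positional arg)
rescue ArgumentError => e
  puts e.message                       # wrong number of arguments (given 3, expected 1..2)
end
```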
#{opts})" } + with_network_retry { client.insert_table(@destination_project, dataset, body, **opts) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 409 && /Already Exists:/ =~ e.message # ignore 'Already Exists' error @@ -456,7 +457,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil) response = {status_code: e.status_code, message: e.message, error_class: e.class} Embulk.logger.error { - "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}" + "embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}" } raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}" end diff --git a/lib/embulk/output/bigquery/gcs_client.rb b/lib/embulk/output/bigquery/gcs_client.rb index 0e0b57f..c2bf162 100644 --- a/lib/embulk/output/bigquery/gcs_client.rb +++ b/lib/embulk/output/bigquery/gcs_client.rb @@ -48,7 +48,7 @@ def insert_temporary_bucket(bucket = nil) opts = {} Embulk.logger.debug { "embulk-output-bigquery: insert_temporary_bucket(#{@project}, #{body}, #{opts})" } - with_network_retry { client.insert_bucket(@project, body, opts) } + with_network_retry { client.insert_bucket(@project, body, **opts) } rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e if e.status_code == 409 && /conflict:/ =~ e.message # ignore 'Already Exists' error @@ -81,7 +81,7 @@ def insert_object(path, object: nil, bucket: nil) Embulk.logger.debug { "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts})" } # memo: gcs is strongly consistent for insert (read-after-write). 
diff --git a/lib/embulk/output/bigquery/helper.rb b/lib/embulk/output/bigquery/helper.rb
index f4e4c1d..37cac69 100644
--- a/lib/embulk/output/bigquery/helper.rb
+++ b/lib/embulk/output/bigquery/helper.rb
@@ -46,10 +46,10 @@ def self.fields_from_embulk_schema(task, schema)
         embulk_type = column[:type]
         column_option = column_options_map[column_name] || {}
         {}.tap do |field|
-          field[:name] = column_name
-          field[:type] = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
-          field[:mode] = column_option['mode'] if column_option['mode']
-          field[:fields] = deep_symbolize_keys(column_option['fields']) if column_option['fields']
+          field[:name]        = column_name
+          field[:type]        = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
+          field[:mode]        = column_option['mode'] if column_option['mode']
+          field[:fields]      = deep_symbolize_keys(column_option['fields']) if column_option['fields']
           field[:description] = column_option['description'] if column_option['description']
         end
       end
diff --git a/test/test_helper.rb b/test/test_helper.rb
index 309cb84..7cfad76 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -68,7 +68,7 @@ def test_fields_from_embulk_schema_with_column_options
     ])
     task = {
       'column_options' => [
-        {'name' => 'boolean', 'type' => 'STRING', 'mode' => 'REQUIRED'},
+        {'name' => 'boolean', 'type' => 'STRING', 'mode' => 'REQUIRED', 'description' => 'hoge'},
         {'name' => 'long', 'type' => 'STRING'},
         {'name' => 'double', 'type' => 'STRING'},
         {'name' => 'string', 'type' => 'INTEGER', 'description' => 'memo'},
@@ -81,7 +81,7 @@ def test_fields_from_embulk_schema_with_column_options
       ],
     }
     expected = [
-      {name: 'boolean', type: 'STRING', mode: 'REQUIRED'},
+      {name: 'boolean', type: 'STRING', mode: 'REQUIRED', description: 'hoge'},
       {name: 'long', type: 'STRING'},
       {name: 'double', type: 'STRING'},
       {name: 'string', type: 'INTEGER', description: 'memo'},
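The helper change exercised above maps a column option's `description` into the generated schema field. A condensed, runnable restatement of that per-column mapping (simplified from `fields_from_embulk_schema`; `bq_type_from_embulk_type` is stubbed for the sketch):

```ruby
# Stub of the real type mapping, just enough for the example.
def bq_type_from_embulk_type(embulk_type)
  { boolean: 'BOOLEAN', long: 'INTEGER', string: 'STRING' }.fetch(embulk_type)
end

# Condensed per-column mapping, matching the expectations in the test above.
def field_for(name, embulk_type, column_option)
  {}.tap do |field|
    field[:name] = name
    field[:type] = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
    field[:mode] = column_option['mode'] if column_option['mode']
    field[:description] = column_option['description'] if column_option['description']
  end
end

p field_for('boolean', :boolean, 'type' => 'STRING', 'mode' => 'REQUIRED', 'description' => 'hoge')
# => {:name=>"boolean", :type=>"STRING", :mode=>"REQUIRED", :description=>"hoge"}
```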
diff --git a/test/test_transaction.rb b/test/test_transaction.rb
index 0cb5c72..a00fbb4 100644
--- a/test/test_transaction.rb
+++ b/test/test_transaction.rb
@@ -115,7 +115,7 @@ def test_replace
     task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
       mock(obj).delete_table(config['temp_table'])
@@ -129,7 +129,7 @@ def test_replace_with_partitioning
     task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
       mock(obj).delete_table(config['temp_table'])
@@ -146,7 +146,7 @@ def test_replace_backup
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).get_dataset(config['dataset_old'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
 
@@ -167,7 +167,7 @@ def test_replace_backup_auto_create_dataset
       mock(obj).create_dataset(config['dataset'])
       mock(obj).create_dataset(config['dataset_old'], reference: config['dataset'])
       mock(obj).create_table_if_not_exists(config['table'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
 
       mock(obj).get_table_or_partition(config['table'])
@@ -186,7 +186,7 @@ def test_replace_backup_with_partitioning
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
       mock(obj).get_dataset(config['dataset_old'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])
 
@@ -207,7 +207,7 @@ def test_append
     task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
       mock(obj).delete_table(config['temp_table'])
@@ -221,7 +221,7 @@ def test_append_with_partitioning
     task = Bigquery.configure(config, schema, processor_count)
     any_instance_of(BigqueryClient) do |obj|
       mock(obj).get_dataset(config['dataset'])
-      mock(obj).create_table_if_not_exists(config['temp_table'])
+      mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
       mock(obj).create_table_if_not_exists(config['table'])
       mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
       mock(obj).delete_table(config['temp_table'])
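All of the updated expectations above assert one new behavior: the transaction now passes an explicit `options` hash when creating the temp table, with `"expiration_time"` defaulting to `nil`. For readers unfamiliar with the mocking style, `any_instance_of`/`mock` come from rr (pulled in via the `test-unit-rr` gem in the Gemfile). A minimal sketch of the pattern, using a hypothetical `FakeClient` and placeholder table name:

```ruby
require 'test/unit'
require 'test/unit/rr'

class TempTableExpectationTest < Test::Unit::TestCase
  # Hypothetical stand-in for BigqueryClient, just for the sketch.
  class FakeClient
    def create_table_if_not_exists(table, options: nil); end
  end

  def test_temp_table_gets_an_options_hash
    any_instance_of(FakeClient) do |obj|
      # Mirrors the updated expectations: an options hash is now always passed.
      mock(obj).create_table_if_not_exists('TEMP_my_table', options: { 'expiration_time' => nil })
    end
    FakeClient.new.create_table_if_not_exists('TEMP_my_table', options: { 'expiration_time' => nil })
  end
end
```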