
Commit

Merge pull request #30 from trocco-io/merge_origin
Merge origin
d-hrs authored Jul 29, 2024
2 parents 9f4febc + a6bc143 commit e109e0c
Showing 12 changed files with 125 additions and 26 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/check.yml
@@ -0,0 +1,32 @@
name: Check
on: [ pull_request, push ]
jobs:
check:
runs-on: ubuntu-latest
# push: always run.
# pull_request: run only when the PR is submitted from a forked repository, not within this repository.
if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
strategy:
matrix:
jruby_version:
- 9.1.17.0
fail-fast: false
steps:
- uses: actions/checkout@v4
- name: Set up OpenJDK 8
uses: actions/setup-java@v4
with:
java-version: 8
distribution: "temurin"
- uses: ruby/setup-ruby@v1
with:
ruby-version: 'jruby-${{ matrix.jruby_version }}'
bundler-cache: false
- name: install embulk.jar
run: "curl -L -o embulk.jar https://github.com/embulk/embulk/releases/download/v0.9.25/embulk-0.9.25.jar"
- name: chmod embulk.jar
run: "chmod a+x embulk.jar"
- name: bundle install
run: "./embulk.jar bundle install --path vendor/bundle"
- name: rake test
run: 'bundle exec env RUBYOPT="-r ./embulk.jar -r embulk -r embulk/java/bootstrap" rake test'
42 changes: 42 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,42 @@
name: Publish
on:
push:
tags:
- "v0.*"
jobs:
publish:
runs-on: ubuntu-latest
environment: maven-central-and-ruby-gems
strategy:
fail-fast: true
steps:
- uses: actions/checkout@v4
- name: Set up Ruby
uses: ruby/setup-ruby@v1
with:
ruby-version: 3.3.0
# get the tag name via ${{ github.ref_name }}
#
# References:
# * https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
# * https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
- name: extract gem version from tag
id: vars
run: echo version=${{ github.ref_name }} | sed -e 's/v0/0/' >> $GITHUB_OUTPUT
#
# From the gem push documentation.
#
# The push command will use ~/.gem/credentials to authenticate to a server,
# but you can use the RubyGems environment variable GEM_HOST_API_KEY
# to set the api key to authenticate.
#
# https://guides.rubygems.org/command-reference/#gem-push
#
- name: Publish
run: |
if [ -z "${GEM_HOST_API_KEY}" ]; then exit 0; fi
rake build
gem push pkg/${EMBULK_PLUGIN_NAME}-${{ steps.vars.outputs.version }}.gem
env:
EMBULK_PLUGIN_NAME: embulk-output-bigquery
GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_API_KEY}}"
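For example, pushing the tag `v0.7.2` makes `steps.vars.outputs.version` resolve to `0.7.2`, so the job pushes `pkg/embulk-output-bigquery-0.7.2.gem`.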
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
## 0.7.2 - 2024-07-21
* [maintenance] Fix GitHub Actions #166
* [maintenance] Fix gcs_client in order to load data using the gcs_bucket parameter (Thanks to kashira202111) #164
* [maintenance] Prevent creating unnecessary tables. (Thanks to kashira202111) #148

## 0.7.1 - 2024-03-04
* [enhancement] Support description of columns and tables (Thanks to @kyoshidajp and @fagai) #142
* [maintenance] Add missing GitHub Actions environment setting. #160
* [maintenance] Replace google-api-client with specific Google APIs (Thanks to @Nozomuts) #161
* [maintenance] Update GitHub Actions use checkout@v4 and setup-java@v4 #162

## 0.7.0 - 2024-02-01
* [enhancement] Add support for Embulk 0.11.x

## 0.6.9 - 2023-03-16
* [enhancement] Add SSLException to retry job (thanks to @mzumi)

1 change: 1 addition & 0 deletions Gemfile
@@ -8,3 +8,4 @@ gem 'embulk-parser-jsonl'
gem 'pry-nav'
gem 'test-unit'
gem 'test-unit-rr'
gem 'rake', '10.4.2'
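(The `rake` pin to 10.4.2 is presumably to keep the development dependencies resolvable under the JRuby 9.1 environment exercised in CI.)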
11 changes: 9 additions & 2 deletions README.md
@@ -1,7 +1,5 @@
# embulk-output-bigquery

[![Build Status](https://secure.travis-ci.org/embulk/embulk-output-bigquery.png?branch=master)](http://travis-ci.org/embulk/embulk-output-bigquery)

[Embulk](https://github.com/embulk/embulk/) output plugin to load/insert data into [Google BigQuery](https://cloud.google.com/bigquery/) using [direct insert](https://cloud.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest)

## Overview
@@ -14,6 +12,13 @@ https://developers.google.com/bigquery/loading-data-into-bigquery
* **Cleanup supported**: no
* **Dynamic table creating**: yes

### Supported Embulk

| gem version | Embulk version |
|------------------|--------------------|
| 0.7.0 and higher | v0.11.0 and higher |
| 0.6.9 and lower | v0.9.X and lower |

### NOT IMPLEMENTED
* insert data over streaming inserts
* for continuous real-time insertions
@@ -55,6 +60,7 @@ OAuth flow for installed applications.
| gcs_bucket | string | optional | nil | See [GCS Bucket](#gcs-bucket) |
| auto_create_gcs_bucket | boolean | optional | false | See [GCS Bucket](#gcs-bucket) |
| progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled when nil (default). NOTE: This option may be removed in the future because a filter plugin can achieve the same goal |
| description | string | optional | nil | Description of the table (see the example below) |
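For illustration, a minimal output config using the new table-level `description` option might look like the following (the project, dataset, table, and keyfile values are placeholders):

```yaml
out:
  type: bigquery
  mode: replace
  json_keyfile: /path/to/json_keyfile.json
  project: your-project-000
  dataset: your_dataset_name
  table: your_table_name
  description: "Table loaded by embulk-output-bigquery"
```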

Client or request options

@@ -325,6 +331,7 @@ Column options are used to aid guessing BigQuery schema, or to define conversion
- numeric: `STRING`
- **mode**: BigQuery mode such as `NULLABLE`, `REQUIRED`, and `REPEATED` (string, default: `NULLABLE`)
- **fields**: Describes the nested schema fields if the type property is set to RECORD. Please note that this is **required** for `RECORD` column.
- **description**: description of the column (string, default is `nil`); see the sketch below.
- **timestamp_format**: timestamp format to convert into/from `timestamp` (string, default is `default_timestamp_format`)
- **timezone**: timezone to convert into/from `timestamp`, `date` (string, default is `default_timezone`).
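A sketch of `column_options` with per-column descriptions (column names and formats are illustrative):

```yaml
column_options:
  - {name: id, type: INTEGER, description: "primary key"}
  - {name: created_at, type: TIMESTAMP, timestamp_format: "%Y-%m-%d %H:%M:%S", description: "record creation time"}
```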
2 changes: 1 addition & 1 deletion embulk-output-bigquery.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |spec|
spec.name = "embulk-output-bigquery"
spec.version = "0.6.9.trocco.0.0.3"
spec.version = "0.7.2"
spec.authors = ["Satoshi Akama", "Naotoshi Seo"]
spec.summary = "Google BigQuery output plugin for Embulk"
spec.description = "Embulk plugin that inserts records into Google BigQuery."
2 changes: 2 additions & 0 deletions lib/embulk/output/bigquery.rb
@@ -63,6 +63,8 @@ def self.configure(config, schema, task_count)
'payload_column' => config.param('payload_column', :string, :default => nil),
'payload_column_index' => config.param('payload_column_index', :integer, :default => nil),

'description' => config.param('description', :string, :default => nil),

'open_timeout_sec' => config.param('open_timeout_sec', :integer, :default => nil),
'timeout_sec' => config.param('timeout_sec', :integer, :default => nil), # google-api-ruby-client < v0.11.0
'send_timeout_sec' => config.param('send_timeout_sec', :integer, :default => nil), # google-api-ruby-client >= v0.11.0
15 changes: 8 additions & 7 deletions lib/embulk/output/bigquery/bigquery_client.rb
@@ -121,7 +121,7 @@ def load_from_gcs(object_uris, table)
opts = {}

Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, opts) }
response = with_network_retry { client.insert_job(@project, body, **opts) }
unless @task['is_skip_job_result_check']
response = wait_load('Load', response)
end
@@ -222,7 +222,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
# },
}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, opts) }
response = with_network_retry { client.insert_job(@project, body, **opts) }
if @task['is_skip_job_result_check']
response
else
@@ -278,7 +278,7 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo

opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts})" }
response = with_network_retry { client.insert_job(@project, body, opts) }
response = with_network_retry { client.insert_job(@project, body, **opts) }
wait_load('Copy', response)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
@@ -372,7 +372,7 @@ def create_dataset(dataset = nil, reference: nil)
end
opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_dataset(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
with_network_retry { client.insert_dataset(@project, body, opts) }
with_network_retry { client.insert_dataset(@project, body, **opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
@@ -420,6 +420,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
table_reference: {
table_id: table,
},
description: @task['description'],
schema: {
fields: fields,
}
@@ -446,8 +447,8 @@ end
end

opts = {}
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
with_network_retry { client.insert_table(@project, dataset, body, opts) }
Embulk.logger.debug { "embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts})" }
with_network_retry { client.insert_table(@destination_project, dataset, body, **opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /Already Exists:/ =~ e.message
# ignore 'Already Exists' error
@@ -456,7 +457,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)

response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
"embulk-output-bigquery: insert_table(#{@destination_project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
}
raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
end
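A note on the recurring `opts` → `**opts` change in this file (and in `gcs_client.rb` below): the generated Google API client methods take their options as keyword arguments, and the newer JRuby releases used with Embulk 0.11 target Ruby 3.x, where a trailing Hash is no longer implicitly converted to keywords, so the hash must be splatted explicitly. A minimal sketch of the failure mode, using an illustrative stand-in rather than the real client method:

```ruby
# Stand-in with the same calling convention as the generated API client
# methods (illustrative signature, not the real one):
def insert_job(project, body, fields: nil, options: nil)
  [project, body, fields, options]
end

opts = {}
# insert_job('my-project', {}, opts)  # Ruby >= 3.0: ArgumentError (given 3, expected 2)
insert_job('my-project', {}, **opts)  # OK: opts is expanded into keyword arguments
```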
6 changes: 3 additions & 3 deletions lib/embulk/output/bigquery/gcs_client.rb
@@ -48,7 +48,7 @@ def insert_temporary_bucket(bucket = nil)
opts = {}

Embulk.logger.debug { "embulk-output-bigquery: insert_temporary_bucket(#{@project}, #{body}, #{opts})" }
with_network_retry { client.insert_bucket(@project, body, opts) }
with_network_retry { client.insert_bucket(@project, body, **opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 409 && /conflict:/ =~ e.message
# ignore 'Already Exists' error
@@ -81,7 +81,7 @@ def insert_object(path, object: nil, bucket: nil)

Embulk.logger.debug { "embulk-output-bigquery: insert_object(#{bucket}, #{body}, #{opts})" }
# memo: gcs is strongly consistent for insert (read-after-write). ref: https://cloud.google.com/storage/docs/consistency
with_network_retry { client.insert_object(bucket, body, opts) }
with_network_retry { client.insert_object(bucket, body, **opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
@@ -114,7 +114,7 @@ def delete_object(object, bucket: nil)
opts = {}

Embulk.logger.debug { "embulk-output-bigquery: delete_object(#{bucket}, #{object}, #{opts})" }
response = with_network_retry { client.delete_object(bucket, object, opts) }
response = with_network_retry { client.delete_object(bucket, object, **opts) }
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
if e.status_code == 404 # ignore 'notFound' error
return nil
8 changes: 4 additions & 4 deletions lib/embulk/output/bigquery/helper.rb
@@ -46,10 +46,10 @@ def self.fields_from_embulk_schema(task, schema)
embulk_type = column[:type]
column_option = column_options_map[column_name] || {}
{}.tap do |field|
field[:name] = column_name
field[:type] = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
field[:mode] = column_option['mode'] if column_option['mode']
field[:fields] = deep_symbolize_keys(column_option['fields']) if column_option['fields']
field[:name] = column_name
field[:type] = (column_option['type'] || bq_type_from_embulk_type(embulk_type)).upcase
field[:mode] = column_option['mode'] if column_option['mode']
field[:fields] = deep_symbolize_keys(column_option['fields']) if column_option['fields']
field[:description] = column_option['description'] if column_option['description']
end
end
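With this change, a `description` set in `column_options` is carried through to the generated BigQuery schema field, as exercised by the test below.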
4 changes: 2 additions & 2 deletions test/test_helper.rb
@@ -68,7 +68,7 @@ def test_fields_from_embulk_schema_with_column_options
])
task = {
'column_options' => [
{'name' => 'boolean', 'type' => 'STRING', 'mode' => 'REQUIRED'},
{'name' => 'boolean', 'type' => 'STRING', 'mode' => 'REQUIRED', 'description' => 'hoge'},
{'name' => 'long', 'type' => 'STRING'},
{'name' => 'double', 'type' => 'STRING'},
{'name' => 'string', 'type' => 'INTEGER', 'description' => 'memo'},
@@ -81,7 +81,7 @@ def test_fields_from_embulk_schema_with_column_options
],
}
expected = [
{name: 'boolean', type: 'STRING', mode: 'REQUIRED'},
{name: 'boolean', type: 'STRING', mode: 'REQUIRED', description: 'hoge'},
{name: 'long', type: 'STRING'},
{name: 'double', type: 'STRING'},
{name: 'string', type: 'INTEGER', description: 'memo'},
14 changes: 7 additions & 7 deletions test/test_transaction.rb
@@ -115,7 +115,7 @@ def test_replace
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
mock(obj).delete_table(config['temp_table'])
@@ -129,7 +129,7 @@ def test_replace_with_partitioning
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_TRUNCATE')
mock(obj).delete_table(config['temp_table'])
@@ -146,7 +146,7 @@ def test_replace_backup
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).get_dataset(config['dataset_old'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])

@@ -167,7 +167,7 @@ def test_replace_backup_auto_create_dataset
mock(obj).create_dataset(config['dataset'])
mock(obj).create_dataset(config['dataset_old'], reference: config['dataset'])
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])

mock(obj).get_table_or_partition(config['table'])
@@ -186,7 +186,7 @@ def test_replace_backup_with_partitioning
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).get_dataset(config['dataset_old'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).create_table_if_not_exists(config['table_old'], dataset: config['dataset_old'])

@@ -207,7 +207,7 @@ def test_append
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
mock(obj).delete_table(config['temp_table'])
@@ -221,7 +221,7 @@ def test_append_with_partitioning
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).copy(config['temp_table'], config['table'], write_disposition: 'WRITE_APPEND')
mock(obj).delete_table(config['temp_table'])
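The updated expectations reflect that the transaction now always passes an `options` hash (containing `expiration_time`, `nil` unless configured) when creating the temporary table.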
