diff --git a/README.md b/README.md
index a5d2cb9..a5109f1 100644
--- a/README.md
+++ b/README.md
@@ -61,6 +61,8 @@ OAuth flow for installed applications.
 | auto_create_gcs_bucket | boolean | optional | false | See [GCS Bucket](#gcs-bucket) |
 | progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled by nil (default). NOTE: This option may be removed in a future because a filter plugin can achieve the same goal |
 | description | string | optional | nil | description of table |
+| merge_keys | array | optional | | key column names used to match records in merge mode (string array; required in merge mode if the table does not have a primary key) |
+| merge_rule | array | optional | | list of column assignments applied to existing records in merge mode, for example `foo = T.foo + S.foo` (`T` is the target table and `S` is the source table) (string array; default: overwrite every column with the new value) |
 
 Client or request options
@@ -174,6 +176,11 @@ NOTE: BigQuery does not support replacing (actually, copying into) a non-partiti
 1. Delete destination table (or partition), if it exists.
 2. Load to destination table (or partition).
 
+##### merge
+
+1. Load to temporary table (Create and WRITE_APPEND in parallel)
+2. Merge temporary table into destination table (or partition). (Uses a query job instead of a copy job.)
+
 ### Authentication
 
 There are four authentication methods
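To make the two new options concrete, here is a minimal sketch of a merge-mode configuration as the plugin receives it; the `id` and `count` column names are hypothetical, not part of this change:

```ruby
# Minimal merge-mode configuration sketch, expressed as the Ruby hash that
# Embulk hands to the plugin. The `id` and `count` columns are hypothetical.
config = {
  'mode'       => 'merge',
  'merge_keys' => ['id'],                          # becomes ON T.`id` = S.`id`
  'merge_rule' => ['count = T.count + S.count'],   # accumulate rather than overwrite
}
```

When `merge_keys` is omitted, the plugin falls back to the target table's primary key, looked up via `INFORMATION_SCHEMA` (see `merge_keys(table)` in `bigquery_client.rb` below).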
diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb
index 240e0aa..8ada6ab 100644
--- a/lib/embulk/output/bigquery.rb
+++ b/lib/embulk/output/bigquery.rb
@@ -91,6 +91,8 @@ def self.configure(config, schema, task_count)
         'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
         'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
         'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
+        'merge_keys' => config.param('merge_keys', :array, :default => []),
+        'merge_rule' => config.param('merge_rule', :array, :default => []),
 
         'temporary_table_expiration' => config.param('temporary_table_expiration', :integer, :default => nil),
@@ -103,11 +105,11 @@ def self.configure(config, schema, task_count)
       now = Time.now
 
       task['mode'] = task['mode'].downcase
-      unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
-        raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
+      unless %w[append append_direct replace delete_in_advance replace_backup merge].include?(task['mode'])
+        raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup, merge"
       end
 
-      if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
+      if %w[append replace delete_in_advance replace_backup merge].include?(task['mode']) and !task['auto_create_table']
         raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
       end
@@ -209,7 +211,7 @@ def self.configure(config, schema, task_count)
       unique_name = SecureRandom.uuid.gsub('-', '_')
 
-      if %w[replace replace_backup append].include?(task['mode'])
+      if %w[replace replace_backup append merge].include?(task['mode'])
        task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
       else
        task['temp_table'] = nil
@@ -317,6 +319,9 @@ def self.auto_create(task, bigquery)
       when 'append'
        bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
        bigquery.create_table_if_not_exists(task['table']) # needs for when task['table'] is a partition
+      when 'merge'
+       bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
+       bigquery.create_table_if_not_exists(task['table']) # needs for when task['table'] is a partition
       when 'replace_backup'
        bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
        bigquery.create_table_if_not_exists(task['table'])
@@ -401,7 +406,10 @@ def self.transaction(config, schema, task_count, &control)
        end
 
        if task['temp_table']
-          if task['mode'] == 'append'
+          case task['mode']
+          when 'merge'
+            bigquery.merge(task['temp_table'], task['table'], task['merge_keys'], task['merge_rule'])
+          when 'append'
            bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_APPEND')
          else # replace or replace_backup
            bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -413,7 +421,7 @@
     ensure
       begin
-        if task['temp_table'] # append or replace or replace_backup
+        if task['temp_table'] # append or replace or replace_backup or merge
          bigquery.delete_table(task['temp_table'])
        end
       ensure
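Condensed, merge mode's path through `auto_create` and `transaction` is the same as `append` except for the final step. A sketch, paraphrasing the diff above rather than adding new code:

```ruby
# Merge-mode lifecycle, paraphrased from the diff above.
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
bigquery.create_table_if_not_exists(task['table'])  # no-op when the target already exists
# ... all tasks load into task['temp_table'] with WRITE_APPEND, in parallel ...
bigquery.merge(task['temp_table'], task['table'], task['merge_keys'], task['merge_rule'])
bigquery.delete_table(task['temp_table'])           # cleanup happens in the ensure block
```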
diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb
index 14a2990..b42f9b9 100644
--- a/lib/embulk/output/bigquery/bigquery_client.rb
+++ b/lib/embulk/output/bigquery/bigquery_client.rb
@@ -544,6 +544,125 @@ def patch_description(fields, column_options)
          with_network_retry { client.patch_table(@project, @dataset, table_id, table) }
        end
       end
+
+      def merge(source_table, target_table, merge_keys, merge_rule)
+        columns = @schema.map { |column| column[:name] }
+        query = <<~EOD
+          MERGE `#{@dataset}`.`#{target_table}` T
+          USING `#{@dataset}`.`#{source_table}` S
+          ON #{join_merge_keys(merge_keys.empty? ? merge_keys(target_table) : merge_keys)}
+          WHEN MATCHED THEN
+            UPDATE SET #{join_merge_rule_or_columns(merge_rule, columns)}
+          WHEN NOT MATCHED THEN
+            INSERT (#{join_columns(columns)})
+            VALUES (#{join_columns(columns)})
+        EOD
+        Embulk.logger.info { "embulk-output-bigquery: Execute query... #{query.gsub(/\s+/, ' ')}" }
+        execute_query(query)
+      end
+
+      def merge_keys(table)
+        query = <<~EOD
+          SELECT
+            KCU.COLUMN_NAME
+          FROM
+            `#{@dataset}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
+          JOIN
+            `#{@dataset}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
+          ON
+            KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND
+            KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND
+            KCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME AND
+            KCU.TABLE_CATALOG = TC.TABLE_CATALOG AND
+            KCU.TABLE_SCHEMA = TC.TABLE_SCHEMA AND
+            KCU.TABLE_NAME = TC.TABLE_NAME
+          WHERE
+            TC.TABLE_NAME = '#{table}' AND
+            TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
+          ORDER BY
+            KCU.ORDINAL_POSITION
+        EOD
+        rows = []
+        run_query(query) { |response| rows.concat(response[:rows] || []) }
+        rows.flat_map { |row| row[:f] }.map { |cell| cell[:v] }
+      end
+
+      def run_query(query, &block)
+        response = execute_query(query)
+        response = query_results(response, &block) while response
+      end
+
+      def query_results(response)
+        with_job_retry do
+          begin
+            job_id = response[:job_reference][:job_id]
+            page_token = response[:page_token].to_s unless response[:page_token].to_s.empty?
+            response = with_network_retry { client.get_job_query_results(@project, job_id, page_token: page_token) }.to_h
+            yield response
+            response unless response[:page_token].to_s.empty?
+          rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+            response = {status_code: e.status_code, message: e.message, error_class: e.class}
+            Embulk.logger.error {
+              "embulk-output-bigquery: get_job_query_results(#{@project}, #{job_id}), response:#{response}"
+            }
+            raise Error, "failed to get query results, response:#{response}"
+          end
+        end
+      end
+
+      def join_merge_keys(merge_keys)
+        raise "merge key or primary key is required" if merge_keys.empty?
+
+        merge_keys.map { |merge_key| "T.`#{merge_key}` = S.`#{merge_key}`" }.join(" AND ")
+      end
+
+      def join_merge_rule_or_columns(merge_rule, columns)
+        merge_rule_or_columns = merge_rule.empty? ? columns.map { |column| "T.`#{column}` = S.`#{column}`" } : merge_rule
+        merge_rule_or_columns.join(", ")
+      end
+
+      def join_columns(columns)
+        columns.map { |column| "`#{column}`" }.join(", ")
+      end
+
+      def execute_query(query)
+        with_job_retry do
+          begin
+            job_id = "embulk_query_job_#{SecureRandom.uuid}"
+
+            Embulk.logger.info {
+              "embulk-output-bigquery: Query job starting... job_id:[#{job_id}]"
+            }
+
+            body = {
+              job_reference: {
+                project_id: @project,
+                job_id: job_id,
+              },
+              configuration: {
+                query: {
+                  query: query,
+                  use_legacy_sql: false,
+                }
+              }
+            }
+
+            if @location
+              body[:job_reference][:location] = @location
+            end
+
+            opts = {}
+            response = with_network_retry { client.insert_job(@project, body, **opts) }
+            wait_load('Query', response).to_h
+          rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
+            response = {status_code: e.status_code, message: e.message, error_class: e.class}
+            Embulk.logger.error {
+              "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
+            }
+            raise Error, "failed to query, response:#{response}"
+          end
+        end
+      end
     end
   end
 end
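To see what the helpers above produce, here is a worked expansion for a hypothetical two-column schema (`id`, `name`) with `merge_keys = ['id']` and an empty `merge_rule`; the table and dataset names are illustrative only:

```ruby
# Worked expansion of the helpers above for a hypothetical schema [id, name]
# with merge_keys = ['id'] and merge_rule = [] (the default).
join_merge_keys(['id'])                      #=> "T.`id` = S.`id`"
join_merge_rule_or_columns([], %w[id name])  #=> "T.`id` = S.`id`, T.`name` = S.`name`"
join_columns(%w[id name])                    #=> "`id`, `name`"

# So merge('LOAD_TEMP_x_t', 't', ['id'], []) builds roughly this statement
# (`my_dataset` stands in for the configured dataset):
#
#   MERGE `my_dataset`.`t` T
#   USING `my_dataset`.`LOAD_TEMP_x_t` S
#   ON T.`id` = S.`id`
#   WHEN MATCHED THEN
#     UPDATE SET T.`id` = S.`id`, T.`name` = S.`name`
#   WHEN NOT MATCHED THEN
#     INSERT (`id`, `name`)
#     VALUES (`id`, `name`)
```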
diff --git a/test/test_configure.rb b/test/test_configure.rb
index c4f16aa..b1e76ac 100644
--- a/test/test_configure.rb
+++ b/test/test_configure.rb
@@ -100,6 +100,9 @@ def test_mode
       config = least_config.merge('mode' => 'replace_backup')
       assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      config = least_config.merge('mode' => 'merge')
+      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
     end
 
     def test_location
diff --git a/test/test_transaction.rb b/test/test_transaction.rb
index a00fbb4..bc59510 100644
--- a/test/test_transaction.rb
+++ b/test/test_transaction.rb
@@ -230,6 +230,36 @@ def test_append_with_partitioning
        Bigquery.transaction(config, schema, processor_count, &control)
       end
     end
+
+    sub_test_case "merge" do
+      def test_merge
+        config = least_config.merge('mode' => 'merge')
+        task = Bigquery.configure(config, schema, processor_count)
+        any_instance_of(BigqueryClient) do |obj|
+          mock(obj).get_dataset(config['dataset'])
+          mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
+          mock(obj).create_table_if_not_exists(config['table'])
+          mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
+          mock(obj).delete_table(config['temp_table'])
+          mock(obj).patch_table
+        end
+        Bigquery.transaction(config, schema, processor_count, &control)
+      end
+
+      def test_merge_with_partitioning
+        config = least_config.merge('mode' => 'merge', 'table' => 'table$20160929', 'auto_create_table' => true)
+        task = Bigquery.configure(config, schema, processor_count)
+        any_instance_of(BigqueryClient) do |obj|
+          mock(obj).get_dataset(config['dataset'])
+          mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
+          mock(obj).create_table_if_not_exists(config['table'])
+          mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
+          mock(obj).delete_table(config['temp_table'])
+          mock(obj).patch_table
+        end
+        Bigquery.transaction(config, schema, processor_count, &control)
+      end
+    end
   end
 end
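Both tests pass `task['merge_keys']` and `task['merge_rule']` straight through, relying on the `:default => []` added in `configure`. A small sketch of the values involved (the UUID suffix is illustrative), plus the row shape that the `merge_keys(table)` fallback parses when both arrays stay empty:

```ruby
# Defaults that test_merge exercises (UUID suffix is hypothetical).
task = Bigquery.configure(least_config.merge('mode' => 'merge'), schema, processor_count)
task['merge_keys']   #=> []
task['merge_rule']   #=> []
task['temp_table']   #=> "LOAD_TEMP_<uuid>_<table>"

# When merge_keys stays empty, BigqueryClient#merge_keys queries
# INFORMATION_SCHEMA; GetQueryResults rows arrive as {f: [{v: value}]} cells:
rows = [{ f: [{ v: 'id' }] }, { f: [{ v: 'sub_id' }] }]   # hypothetical response
rows.flat_map { |row| row[:f] }.map { |cell| cell[:v] }   #=> ["id", "sub_id"]
```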