Merge pull request #31 from trocco-io/added_merge_mode
Added merge mode
NamedPython authored Aug 5, 2024
2 parents e109e0c + fbd2cce commit 61937a9
Showing 5 changed files with 173 additions and 6 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -61,6 +61,8 @@ OAuth flow for installed applications.
| auto_create_gcs_bucket | boolean | optional | false | See [GCS Bucket](#gcs-bucket) |
| progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled when nil (the default). NOTE: This option may be removed in the future because a filter plugin can achieve the same goal |
| description | string | optional | nil | description of table |
| merge_keys | array | optional | | key column names used to match records in merge mode (string array; required in merge mode if the table has no primary key) |
| merge_rule | array | optional | | list of column assignments applied to matched records in merge mode, for example `foo = T.foo + S.foo`, where `T` is the target table and `S` the source table (string array; default: overwrite all columns with the new values). See the sketch below |
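
For illustration, here is how the two options might be set together, written as the Ruby config hash style used by this PR's tests (`least_config` is the test helper; the key `id` and the assignment are hypothetical examples, not defaults):

```ruby
# Hypothetical merge-mode settings, in the style of test_configure.rb.
config = least_config.merge(
  'mode'       => 'merge',
  'merge_keys' => ['id'],                  # match rows on the id column
  'merge_rule' => ['foo = T.foo + S.foo'], # accumulate foo instead of overwriting it
)
```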

Client or request options

@@ -174,6 +176,11 @@ NOTE: BigQuery does not support replacing (actually, copying into) a non-partiti
1. Delete destination table (or partition), if it exists.
2. Load to destination table (or partition).

##### merge

1. Load to temporary table (Create and WRITE_APPEND in parallel).
2. Merge the temporary table into the destination table (or partition), using a query job instead of a copy job; see the sketch below.
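
As an illustration, for a hypothetical table `my_table` with merge key `id` and one other column `foo`, the statement issued in step 2 would look roughly like the following (a sketch of what `BigqueryClient#merge` in this PR builds; the dataset and temporary table names are placeholders):

```ruby
# Sketch of the generated statement. T is the destination table, S the
# temporary load table; by default every column is overwritten on match.
query = <<~EOD
  MERGE `my_dataset`.`my_table` T
  USING `my_dataset`.`LOAD_TEMP_xxxx_my_table` S
  ON T.`id` = S.`id`
  WHEN MATCHED THEN
    UPDATE SET T.`id` = S.`id`, T.`foo` = S.`foo`
  WHEN NOT MATCHED THEN
    INSERT (`id`, `foo`)
    VALUES (`id`, `foo`)
EOD
```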

### Authentication

There are four authentication methods
20 changes: 14 additions & 6 deletions lib/embulk/output/bigquery.rb
@@ -91,6 +91,8 @@ def self.configure(config, schema, task_count)
'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
'merge_keys' => config.param('merge_keys', :array, :default => []),
'merge_rule' => config.param('merge_rule', :array, :default => []),

'temporary_table_expiration' => config.param('temporary_table_expiration', :integer, :default => nil),

@@ -103,11 +105,11 @@ def self.configure(config, schema, task_count)
now = Time.now

task['mode'] = task['mode'].downcase
-unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
-  raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
+unless %w[append append_direct replace delete_in_advance replace_backup merge].include?(task['mode'])
+  raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup, merge"
end

-if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
+if %w[append replace delete_in_advance replace_backup merge].include?(task['mode']) and !task['auto_create_table']
raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
end

@@ -209,7 +211,7 @@ def self.configure(config, schema, task_count)

unique_name = SecureRandom.uuid.gsub('-', '_')

-if %w[replace replace_backup append].include?(task['mode'])
+if %w[replace replace_backup append merge].include?(task['mode'])
task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
else
task['temp_table'] = nil
@@ -317,6 +319,9 @@ def self.auto_create(task, bigquery)
when 'append'
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
when 'merge'
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
when 'replace_backup'
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
bigquery.create_table_if_not_exists(task['table'])
@@ -401,7 +406,10 @@ def self.transaction(config, schema, task_count, &control)
end

if task['temp_table']
-if task['mode'] == 'append'
+case task['mode']
+when 'merge'
+  bigquery.merge(task['temp_table'], task['table'], task['merge_keys'], task['merge_rule'])
+when 'append'
bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_APPEND')
else # replace or replace_backup
bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -413,7 +421,7 @@ def self.transaction(config, schema, task_count, &control)

ensure
begin
-if task['temp_table'] # append or replace or replace_backup
+if task['temp_table'] # append or replace or replace_backup or merge
bigquery.delete_table(task['temp_table'])
end
ensure
119 changes: 119 additions & 0 deletions lib/embulk/output/bigquery/bigquery_client.rb
@@ -544,6 +544,125 @@ def patch_description(fields, column_options)
with_network_retry { client.patch_table(@project, @dataset, table_id, table) }
end
end

def merge(source_table, target_table, merge_keys, merge_rule)
columns = @schema.map { |column| column[:name] }
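# Build a single standard-SQL MERGE statement: rows are matched on the
# configured merge_keys (falling back to the target table's primary key,
# looked up by merge_keys below); matched rows are updated per merge_rule
# (default: overwrite every column), unmatched rows are inserted as-is.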
query = <<~EOD
MERGE `#{@dataset}`.`#{target_table}` T
USING `#{@dataset}`.`#{source_table}` S
ON #{join_merge_keys(merge_keys.empty? ? merge_keys(target_table) : merge_keys)}
WHEN MATCHED THEN
UPDATE SET #{join_merge_rule_or_columns(merge_rule, columns)}
WHEN NOT MATCHED THEN
INSERT (#{join_columns(columns)})
VALUES (#{join_columns(columns)})
EOD
Embulk.logger.info { "embulk-output-bigquery: Execute query... #{query.gsub(/\s+/, ' ')}" }
execute_query(query)
end

def merge_keys(table)
query = <<~EOD
SELECT
KCU.COLUMN_NAME
FROM
`#{@dataset}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
JOIN
`#{@dataset}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
ON
KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND
KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND
KCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME AND
KCU.TABLE_CATALOG = TC.TABLE_CATALOG AND
KCU.TABLE_SCHEMA = TC.TABLE_SCHEMA AND
KCU.TABLE_NAME = TC.TABLE_NAME
WHERE
TC.TABLE_NAME = '#{table}' AND
TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
ORDER BY
KCU.ORDINAL_POSITION
EOD
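# The query returns one row per primary-key column, in key order. BigQuery
# encodes each row as {f: [{v: value}, ...]} cells, so e.g.
# [{f: [{v: 'id'}]}, {f: [{v: 'created_at'}]}] flattens to ['id', 'created_at'].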
rows = []
run_query(query) { |response| rows.concat(response[:rows] || []) }
rows.flat_map { |row| row[:f] }.map { |cell| cell[:v] }
end

def run_query(query, &block)
response = execute_query(query)
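# Page through the results: query_results yields each page to the given
# block and returns the response only while another page_token remains.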
response = query_results(response, &block) while response
end

def query_results(response)
with_job_retry do
begin
job_id = response[:job_reference][:job_id]
page_token = response[:page_token].to_s unless response[:page_token].to_s.empty?
response = with_network_retry { client.get_job_query_results(@project, job_id, page_token: page_token) }.to_h
yield response
response unless response[:page_token].to_s.empty?
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: get_job_query_results(#{@project}, #{job_id}), response:#{response}"
}
raise Error, "failed to get query results, response:#{response}"
end
end
end

def join_merge_keys(merge_keys)
raise "merge key or primary key is required" if merge_keys.empty?

merge_keys.map { |merge_key| "T.`#{merge_key}` = S.`#{merge_key}`" }.join(" AND ")
end

def join_merge_rule_or_columns(merge_rule, columns)
merge_rule_or_columns = merge_rule.empty? ? columns.map { |column| "T.`#{column}` = S.`#{column}`" } : merge_rule
merge_rule_or_columns.join(", ")
end

def join_columns(columns)
columns.map { |column| "`#{column}`" }.join(", ")
end

def execute_query(query)
with_job_retry do
begin
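# A client-supplied job_id makes retried insert_job calls idempotent:
# re-inserting the same job_id cannot start a duplicate query job.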
job_id = "embulk_query_job_#{SecureRandom.uuid}"

Embulk.logger.info {
"embulk-output-bigquery: Query job starting... job_id:[#{job_id}]"
}

body = {
job_reference: {
project_id: @project,
job_id: job_id,
},
configuration: {
query: {
query: query,
use_legacy_sql: false,
}
}
}

if @location
body[:job_reference][:location] = @location
end

opts = {}
response = with_network_retry { client.insert_job(@project, body, **opts) }
wait_load('Query', response).to_h
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
}
raise Error, "failed to query, response:#{response}"
end
end
end
end
end
end
3 changes: 3 additions & 0 deletions test/test_configure.rb
@@ -100,6 +100,9 @@ def test_mode

config = least_config.merge('mode' => 'replace_backup')
assert_raise { Bigquery.configure(config, schema, processor_count) }

config = least_config.merge('mode' => 'merge')
assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
end

def test_location
30 changes: 30 additions & 0 deletions test/test_transaction.rb
@@ -230,6 +230,36 @@ def test_append_with_partitioning
Bigquery.transaction(config, schema, processor_count, &control)
end
end

sub_test_case "merge" do
def test_merge
config = least_config.merge('mode' => 'merge')
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
mock(obj).delete_table(config['temp_table'])
mock(obj).patch_table
end
Bigquery.transaction(config, schema, processor_count, &control)
end

def test_merge_with_partitioning
config = least_config.merge('mode' => 'merge', 'table' => 'table$20160929', 'auto_create_table' => true)
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
mock(obj).delete_table(config['temp_table'])
mock(obj).patch_table
end
Bigquery.transaction(config, schema, processor_count, &control)
end
end
end
end
end
