Added merge mode #31

Merged · 1 commit · Aug 5, 2024
README.md (7 additions, 0 deletions)
@@ -61,6 +61,8 @@ OAuth flow for installed applications.
| auto_create_gcs_bucket | boolean | optional | false | See [GCS Bucket](#gcs-bucket) |
| progress_log_interval | float | optional | nil (Disabled) | Progress log interval. The progress log is disabled when nil (default). NOTE: This option may be removed in the future because a filter plugin can achieve the same goal |
| description | string | optional | nil | description of table |
| merge_keys | array | optional | | key column names used to match records in merge mode (string array; required in merge mode if the table has no primary key) |
| merge_rule | array | optional | | list of column assignments for updating matched records in merge mode, e.g. `foo = T.foo + S.foo` where `T` is the target table and `S` the source table (string array; default: overwrite all columns with the new values) |

Client or request options

@@ -174,6 +176,11 @@ NOTE: BigQuery does not support replacing (actually, copying into) a non-partiti
1. Delete destination table (or partition), if it exists.
2. Load to destination table (or partition).

##### merge

1. Load into a temporary table (create it, then run WRITE_APPEND load jobs in parallel).
2. Merge the temporary table into the destination table (or partition), using a query job instead of a copy job.
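
For illustration, a minimal embulk configuration using this mode might look like the sketch below. The project, dataset, table, and column names are hypothetical; `merge_keys` and `merge_rule` are the options added by this PR, and `auto_create_table: true` is required because merge mode loads through a temporary table.

```yaml
out:
  type: bigquery
  mode: merge
  auth_method: service_account
  json_keyfile: /path/to/keyfile.json
  project: my-project
  dataset: my_dataset
  table: target_table
  auto_create_table: true      # merge mode requires this
  merge_keys: [id]             # omit to fall back to the table's primary key
  merge_rule:                  # omit to overwrite every column with the new value
    - "amount = T.amount + S.amount"
    - "updated_at = S.updated_at"
```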

### Authentication

There are four authentication methods
lib/embulk/output/bigquery.rb (14 additions, 6 deletions)
@@ -91,6 +91,8 @@ def self.configure(config, schema, task_count)
'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
+ 'merge_keys' => config.param('merge_keys', :array, :default => []),
+ 'merge_rule' => config.param('merge_rule', :array, :default => []),

'temporary_table_expiration' => config.param('temporary_table_expiration', :integer, :default => nil),

@@ -103,11 +105,11 @@ def self.configure(config, schema, task_count)
now = Time.now

task['mode'] = task['mode'].downcase
- unless %w[append append_direct replace delete_in_advance replace_backup].include?(task['mode'])
-   raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup"
+ unless %w[append append_direct replace delete_in_advance replace_backup merge].include?(task['mode'])
+   raise ConfigError.new "`mode` must be one of append, append_direct, replace, delete_in_advance, replace_backup, merge"
end

- if %w[append replace delete_in_advance replace_backup].include?(task['mode']) and !task['auto_create_table']
+ if %w[append replace delete_in_advance replace_backup merge].include?(task['mode']) and !task['auto_create_table']
raise ConfigError.new "`mode: #{task['mode']}` requires `auto_create_table: true`"
end

@@ -209,7 +211,7 @@ def self.configure(config, schema, task_count)

unique_name = SecureRandom.uuid.gsub('-', '_')

- if %w[replace replace_backup append].include?(task['mode'])
+ if %w[replace replace_backup append merge].include?(task['mode'])
task['temp_table'] ||= "LOAD_TEMP_#{unique_name}_#{task['table']}"
else
task['temp_table'] = nil
@@ -317,6 +319,9 @@ def self.auto_create(task, bigquery)
when 'append'
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
+ when 'merge'
+   bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
+   bigquery.create_table_if_not_exists(task['table']) # needed when task['table'] is a partition
when 'replace_backup'
bigquery.create_table_if_not_exists(task['temp_table'], options: temp_options)
bigquery.create_table_if_not_exists(task['table'])
@@ -401,7 +406,10 @@ def self.transaction(config, schema, task_count, &control)
end

if task['temp_table']
- if task['mode'] == 'append'
+ case task['mode']
+ when 'merge'
+   bigquery.merge(task['temp_table'], task['table'], task['merge_keys'], task['merge_rule'])
+ when 'append'
bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_APPEND')
else # replace or replace_backup
bigquery.copy(task['temp_table'], task['table'], write_disposition: 'WRITE_TRUNCATE')
@@ -413,7 +421,7 @@ def self.transaction(config, schema, task_count, &control)

ensure
begin
- if task['temp_table'] # append or replace or replace_backup
+ if task['temp_table'] # append or replace or replace_backup or merge
bigquery.delete_table(task['temp_table'])
end
ensure
lib/embulk/output/bigquery/bigquery_client.rb (119 additions, 0 deletions)
@@ -544,6 +544,125 @@ def patch_description(fields, column_options)
with_network_retry { client.patch_table(@project, @dataset, table_id, table) }
end
end

def merge(source_table, target_table, merge_keys, merge_rule)
columns = @schema.map { |column| column[:name] }
query = <<~EOD
MERGE `#{@dataset}`.`#{target_table}` T
USING `#{@dataset}`.`#{source_table}` S
ON #{join_merge_keys(merge_keys.empty? ? merge_keys(target_table) : merge_keys)}
WHEN MATCHED THEN
UPDATE SET #{join_merge_rule_or_columns(merge_rule, columns)}
WHEN NOT MATCHED THEN
INSERT (#{join_columns(columns)})
VALUES (#{join_columns(columns)})
EOD
Embulk.logger.info { "embulk-output-bigquery: Execute query... #{query.gsub(/\s+/, ' ')}" }
execute_query(query)
end
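
As a concrete sketch of the output: for a schema with columns `id` and `amount`, `merge_keys: [id]`, and an empty `merge_rule`, the method above generates a statement roughly like this (dataset and table names are hypothetical, temp table name abbreviated):

```sql
MERGE `my_dataset`.`target_table` T
USING `my_dataset`.`LOAD_TEMP_xxx_target_table` S
ON T.`id` = S.`id`
WHEN MATCHED THEN
  UPDATE SET T.`id` = S.`id`, T.`amount` = S.`amount`
WHEN NOT MATCHED THEN
  INSERT (`id`, `amount`)
  VALUES (`id`, `amount`)
```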

def merge_keys(table)
query = <<~EOD
SELECT
KCU.COLUMN_NAME
FROM
`#{@dataset}`.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
JOIN
`#{@dataset}`.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
ON
KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND
KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND
KCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME AND
KCU.TABLE_CATALOG = TC.TABLE_CATALOG AND
KCU.TABLE_SCHEMA = TC.TABLE_SCHEMA AND
KCU.TABLE_NAME = TC.TABLE_NAME
WHERE
TC.TABLE_NAME = '#{table}' AND
TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
ORDER BY
KCU.ORDINAL_POSITION
EOD
rows = []
run_query(query) { |response| rows.concat(response[:rows] || []) }
rows.flat_map { |row| row[:f] }.map { |cell| cell[:v] }
end
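
This fallback can only find keys if the target table declares a primary key, which in BigQuery is unenforced metadata. A hypothetical table for which the query above would return `['id']`:

```sql
CREATE TABLE `my_dataset`.`target_table` (
  id INT64,
  amount FLOAT64,
  PRIMARY KEY (id) NOT ENFORCED
);
```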

def run_query(query, &block)
response = execute_query(query)
response = query_results(response, &block) while response
end

def query_results(response)
with_job_retry do
begin
job_id = response[:job_reference][:job_id]
page_token = response[:page_token].to_s unless response[:page_token].to_s.empty?
response = with_network_retry { client.get_job_query_results(@project, job_id, page_token: page_token) }.to_h
yield response
response unless response[:page_token].to_s.empty?
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: get_job_query_results(#{@project}, #{job_id}), response:#{response}"
}
raise Error, "failed to get query results, response:#{response}"
end
end
end

def join_merge_keys(merge_keys)
raise "merge key or primary key is required" if merge_keys.empty?

merge_keys.map { |merge_key| "T.`#{merge_key}` = S.`#{merge_key}`" }.join(" AND ")
end

def join_merge_rule_or_columns(merge_rule, columns)
merge_rule_or_columns = merge_rule.empty? ? columns.map { |column| "T.`#{column}` = S.`#{column}`" } : merge_rule
merge_rule_or_columns.join(", ")
end

def join_columns(columns)
columns.map { |column| "`#{column}`" }.join(", ")
end
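
To make the string building concrete, here is what the three helpers above return for some hypothetical inputs:

```ruby
join_merge_keys(['id', 'region'])
# => "T.`id` = S.`id` AND T.`region` = S.`region`"

join_merge_rule_or_columns([], ['id', 'amount'])  # empty rule: overwrite every column
# => "T.`id` = S.`id`, T.`amount` = S.`amount`"

join_merge_rule_or_columns(['amount = T.amount + S.amount'], ['id', 'amount'])
# => "amount = T.amount + S.amount"

join_columns(['id', 'amount'])
# => "`id`, `amount`"
```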

def execute_query(query)
with_job_retry do
begin
job_id = "embulk_query_job_#{SecureRandom.uuid}"

Embulk.logger.info {
"embulk-output-bigquery: Query job starting... job_id:[#{job_id}]"
}

body = {
job_reference: {
project_id: @project,
job_id: job_id,
},
configuration: {
query: {
query: query,
use_legacy_sql: false,
}
}
}

if @location
body[:job_reference][:location] = @location
end

opts = {}
response = with_network_retry { client.insert_job(@project, body, **opts) }
wait_load('Query', response).to_h
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
response = {status_code: e.status_code, message: e.message, error_class: e.class}
Embulk.logger.error {
"embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
}
raise Error, "failed to query, response:#{response}"
end
end
end
end
end
end
test/test_configure.rb (3 additions, 0 deletions)
@@ -100,6 +100,9 @@ def test_mode

config = least_config.merge('mode' => 'replace_backup')
assert_raise { Bigquery.configure(config, schema, processor_count) }

config = least_config.merge('mode' => 'merge')
assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
end

def test_location
test/test_transaction.rb (30 additions, 0 deletions)
@@ -230,6 +230,36 @@ def test_append_with_partitioning
Bigquery.transaction(config, schema, processor_count, &control)
end
end

sub_test_case "merge" do
def test_merge
config = least_config.merge('mode' => 'merge')
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
mock(obj).delete_table(config['temp_table'])
mock(obj).patch_table
end
Bigquery.transaction(config, schema, processor_count, &control)
end

def test_merge_with_partitioning
config = least_config.merge('mode' => 'merge', 'table' => 'table$20160929', 'auto_create_table' => true)
task = Bigquery.configure(config, schema, processor_count)
any_instance_of(BigqueryClient) do |obj|
mock(obj).get_dataset(config['dataset'])
mock(obj).create_table_if_not_exists(config['temp_table'], options: {"expiration_time"=>nil})
mock(obj).create_table_if_not_exists(config['table'])
mock(obj).merge(config['temp_table'], config['table'], task['merge_keys'], task['merge_rule'])
mock(obj).delete_table(config['temp_table'])
mock(obj).patch_table
end
Bigquery.transaction(config, schema, processor_count, &control)
end
end
end
end
end