Added BigQueryToSpannerDelete template #13

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
metadata/BigQueryToSpannerDelete_metadata (61 additions, 0 deletions)
@@ -0,0 +1,61 @@
{
"name": "BigQueryToSpannerDelete",
"description": "Pipeline to insert records to Spanner table from BigQuery query results.",
"parameters": [
{
"name": "query",
"label": "SQL Query text",
"help_text": "SQL Query text to read records from BigQuery."
},
{
"name": "projectId",
"label": "Output Spanner Project ID (Google Cloud Project ID)",
"help_text": "Project ID Cloud Spanner to store is belong to."
},
{
"name": "instanceId",
"label": "Output Spanner Instance ID",
"help_text": "Cloud Spanner instance ID to store."
},
{
"name": "databaseId",
"label": "Output Database ID",
"help_text": "Cloud Spanner database ID to store."
},
{
"name": "table",
"label": "Output Spanner Table Name",
"help_text": "Cloud Spanner target table name to store."
},
{
"name": "outputError",
"label": "GCS path to output error record as avro files.",
"help_text": "Path and filename prefix for writing output avro files. ex: gs://MyBucket/xxx",
"regexes": ["^gs:\/\/[^\n\r]+$"]
},
{
"name": "mutationOp",
"label": "Spanner insert policy. `INSERT` or `UPDATE` or `REPLACE` or `INSERT_OR_UPDATE`",
"help_text": "Detail: https://googleapis.github.io/google-cloud-java/google-cloud-clients/apidocs/com/google/cloud/spanner/Mutation.Op.html"
},
{
"name": "primaryKeyFields",
"label": "Key field on destination Spanner table. (Required table auto generation)",
"help_text": "If destination Spanner table not exists, template will create the table using this field as key.",
"is_optional": true
},
{
"name": "outputNotify",
"label": "OutputNotify",
"help_text": "GCS path to put notification file that contains output file paths.",
"regexes": ["^gs:\/\/[^\n\r]+$"],
"is_optional": true
},
{
"name": "splitField",
"label": "SplitField",
"help_text": "Field name to split output destination by the value",
"is_optional": true
}
]
}
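For reference, a minimal sketch of how these parameters could be supplied when the pipeline is run directly rather than as a staged template. The option names follow the BigQueryToSpannerDeletePipelineOption interface in the Java source below (which exposes keyFields); the runner, project, and bucket values, the query text, and the wrapper class name are all placeholders, and --runner/--project/--tempLocation are standard Beam/Dataflow options rather than part of this template.

// Sketch only: runs the pipeline with placeholder argument values.
public class RunBigQueryToSpannerDeleteExample {
    public static void main(String[] args) {
        String[] pipelineArgs = new String[] {
            "--runner=DataflowRunner",                       // or DirectRunner for local tests
            "--project=my-gcp-project",                      // placeholder GCP project
            "--tempLocation=gs://my-bucket/tmp",             // placeholder staging location
            "--query=SELECT Id FROM mydataset.expired_ids",  // keys of the rows to delete
            "--projectId=my-spanner-project",
            "--instanceId=my-instance",
            "--databaseId=my-database",
            "--table=MyTable",
            "--keyFields=Id"                                 // comma-separated for composite keys
        };
        com.mercari.solution.templates.BigQueryToSpannerDelete.main(pipelineArgs);
    }
}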
src/main/java/com/mercari/solution/templates/BigQueryToSpannerDelete.java (104 additions, 0 deletions)
@@ -0,0 +1,104 @@
/*
* Copyright (c) Mercari, Inc.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
package com.mercari.solution.templates;

import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Struct;
import com.mercari.solution.transforms.dofns.StructToMutationDoFn;
import com.mercari.solution.util.converter.RecordToStructConverter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.ParDo;

/**
* <p>BigQueryToSpannerDelete template deletes records from the specified Cloud Spanner table, using keys read from BigQuery query results.</p>
* <p>
* Template parameters are as follows.
*
* <table summary="summary" border="1" cellpadding="3" cellspacing="0">
* <tr><th>Parameter</th><th>Type</th><th>Description</th></tr>
* <tr><td>query</td><td>String</td><td>SQL query to read records from BigQuery</td></tr>
* <tr><td>projectId</td><td>String</td><td>Project ID that the target Spanner instance belongs to.</td></tr>
* <tr><td>instanceId</td><td>String</td><td>Spanner instance ID of the target database.</td></tr>
* <tr><td>databaseId</td><td>String</td><td>Spanner database ID of the target table.</td></tr>
* <tr><td>table</td><td>String</td><td>Spanner table name to delete records from.</td></tr>
* <tr><td>keyFields</td><td>String</td><td>Key fields in the query results. For a composite key, set comma-separated fields in key order.</td></tr>
* </table>
*/
public class BigQueryToSpannerDelete {
private BigQueryToSpannerDelete() {
}

public interface BigQueryToSpannerDeletePipelineOption extends PipelineOptions {
@Description("SQL query to extract records from spanner")
ValueProvider<String> getQuery();

void setQuery(ValueProvider<String> query);

@Description("Project id spanner for store belong to")
ValueProvider<String> getProjectId();

void setProjectId(ValueProvider<String> projectId);

@Description("Spanner instance id for store")
ValueProvider<String> getInstanceId();

void setInstanceId(ValueProvider<String> instanceId);

@Description("Spanner Database id for store")
ValueProvider<String> getDatabaseId();

void setDatabaseId(ValueProvider<String> databaseId);

@Description("Spanner table name to store query result")
ValueProvider<String> getTable();

void setTable(ValueProvider<String> table);

@Description("Key fields in query results. If composite key case, set comma-separated fields in key sequence.")
ValueProvider<String> getKeyFields();

void setKeyFields(ValueProvider<String> keyFields);
}

public static void main(final String[] args) {
final BigQueryToSpannerDeletePipelineOption options = PipelineOptionsFactory
.fromArgs(args)
.as(BigQueryToSpannerDeletePipelineOption.class);
final Pipeline pipeline = Pipeline.create(options);
// The BigQuery Storage API is distinct from the existing BigQuery API and must be
// enabled for your Google Cloud Platform project.
// TODO: Dynamic work re-balancing is not currently supported, so reads might be
// less efficient in the presence of stragglers.
// https://issues.apache.org/jira/browse/BEAM-7495
pipeline
.apply("QueryBigQuery", BigQueryIO.read(RecordToStructConverter::convert)
.fromQuery(options.getQuery())
.usingStandardSql()
.withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
.withQueryPriority(BigQueryIO.TypedRead.QueryPriority.INTERACTIVE)
.withTemplateCompatibility()
.withoutValidation()
.withCoder(SerializableCoder.of(Struct.class)))
.apply("ConvertToMutation", ParDo.of(new StructToMutationDoFn(
options.getTable(),
ValueProvider.StaticValueProvider.of(Mutation.Op.DELETE.name()),
options.getKeyFields())))
.apply("DeleteMutation", SpannerIO.write()
.withProjectId(options.getProjectId())
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId()));
pipeline.run();
}
}
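For context, the ConvertToMutation step is configured with Mutation.Op.DELETE, so each query result row is expected to become a key-based delete mutation. StructToMutationDoFn itself is not part of this diff, so the following is only a minimal sketch of the Spanner client API involved; the table name and key field are placeholders.

import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Struct;

public class DeleteMutationSketch {
    public static void main(String[] args) {
        // Placeholder query-result row carrying the primary key of the row to delete.
        Struct row = Struct.newBuilder().set("Id").to(123L).build();

        // With the mutation op fixed to DELETE, the key field value is turned into
        // a key-based delete against the target table.
        Mutation mutation = Mutation.delete("MyTable", Key.of(row.getLong("Id")));
        System.out.println(mutation);
    }
}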