From ea5b4c51061762e32006f8d4a25127d13392b0bb Mon Sep 17 00:00:00 2001 From: Askar Atahujaev Date: Wed, 14 Feb 2024 18:10:23 +0500 Subject: [PATCH] Added BigQueryToSpannerDelete template --- metadata/BigQueryToSpannerDelete_metadata | 61 ++++++++++ .../templates/BigQueryToSpannerDelete.java | 104 ++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 metadata/BigQueryToSpannerDelete_metadata create mode 100644 src/main/java/com/mercari/solution/templates/BigQueryToSpannerDelete.java diff --git a/metadata/BigQueryToSpannerDelete_metadata b/metadata/BigQueryToSpannerDelete_metadata new file mode 100644 index 0000000..3f9745f --- /dev/null +++ b/metadata/BigQueryToSpannerDelete_metadata @@ -0,0 +1,61 @@ +{ + "name": "BigQueryToSpannerDelete", + "description": "Pipeline to delete records from a Spanner table using BigQuery query results as delete keys.", + "parameters": [ + { + "name": "query", + "label": "SQL Query text", + "help_text": "SQL Query text to read records from BigQuery." + }, + { + "name": "projectId", + "label": "Output Spanner Project ID (Google Cloud Project ID)", + "help_text": "Project ID that the target Cloud Spanner instance belongs to." + }, + { + "name": "instanceId", + "label": "Output Spanner Instance ID", + "help_text": "Cloud Spanner instance ID of the target database." + }, + { + "name": "databaseId", + "label": "Output Database ID", + "help_text": "Cloud Spanner database ID of the target table." + }, + { + "name": "table", + "label": "Output Spanner Table Name", + "help_text": "Cloud Spanner target table name to delete records from." + }, + { + "name": "outputError", + "label": "GCS path to output error record as avro files.", + "help_text": "Path and filename prefix for writing output avro files. ex: gs://MyBucket/xxx", + "regexes": ["^gs:\/\/[^\n\r]+$"] + }, + { + "name": "mutationOp", + "label": "Spanner insert policy. 
`INSERT` or `UPDATE` or `REPLACE` or `INSERT_OR_UPDATE`", + "help_text": "Detail: https://googleapis.github.io/google-cloud-java/google-cloud-clients/apidocs/com/google/cloud/spanner/Mutation.Op.html" + }, + { + "name": "keyFields", + "label": "Key fields in BigQuery query results used as the Spanner primary key for deletion.", + "help_text": "Key fields in query results. If composite key case, set comma-separated fields in key sequence." + }, + { + "name": "outputNotify", + "label": "OutputNotify", + "help_text": "GCS path to put notification file that contains output file paths.", + "regexes": ["^gs:\/\/[^\n\r]+$"], + "is_optional": true + }, + { + "name": "splitField", + "label": "SplitField", + "help_text": "Field name to split output destination by the value", + "is_optional": true + } + ] +} \ No newline at end of file diff --git a/src/main/java/com/mercari/solution/templates/BigQueryToSpannerDelete.java b/src/main/java/com/mercari/solution/templates/BigQueryToSpannerDelete.java new file mode 100644 index 0000000..b6d10db --- /dev/null +++ b/src/main/java/com/mercari/solution/templates/BigQueryToSpannerDelete.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) Mercari, Inc. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. 
+ */ +package com.mercari.solution.templates; + +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Struct; +import com.mercari.solution.transforms.dofns.StructToMutationDoFn; +import com.mercari.solution.util.converter.RecordToStructConverter; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.ParDo; + +/** + *

BigQueryToSpannerDelete template deletes records from a specified Cloud Spanner table, using BigQuery query results as the delete keys.

+ *

+ * Template parameters are as follows. + * + * + * + * + * + * + * + * + * + *
ParameterTypeDescription
queryStringSQL query to read records from BigQuery
projectIdStringProject ID that the target Cloud Spanner instance belongs to.
instanceIdStringSpanner instance ID of the target database.
databaseIdStringSpanner database ID of the target table.
tableStringSpanner table name to delete records from.
keyFieldsStringKey fields in query results. If composite key case, set comma-separated fields in key sequence.
+ */ +public class BigQueryToSpannerDelete { + private BigQueryToSpannerDelete() { + } + + public interface BigQueryToSpannerDeletePipelineOption extends PipelineOptions { + @Description("SQL query to extract records from spanner") + ValueProvider getQuery(); + + void setQuery(ValueProvider query); + + @Description("Project id spanner for store belong to") + ValueProvider getProjectId(); + + void setProjectId(ValueProvider projectId); + + @Description("Spanner instance id for store") + ValueProvider getInstanceId(); + + void setInstanceId(ValueProvider instanceId); + + @Description("Spanner Database id for store") + ValueProvider getDatabaseId(); + + void setDatabaseId(ValueProvider databaseId); + + @Description("Spanner table name to store query result") + ValueProvider getTable(); + + void setTable(ValueProvider table); + + @Description("Key fields in query results. If composite key case, set comma-separated fields in key sequence.") + ValueProvider getKeyFields(); + + void setKeyFields(ValueProvider keyFields); + } + + public static void main(final String[] args) { + final BigQueryToSpannerDeletePipelineOption options = PipelineOptionsFactory + .fromArgs(args) + .as(BigQueryToSpannerDeletePipelineOption.class); + final Pipeline pipeline = Pipeline.create(options); + // The BigQuery Storage API is distinct from the existing BigQuery API. + // You must enable the BigQuery Storage API for your Google Cloud Platform project. + // TODO + // Dynamic work re-balancing is not currently supported. + // As a result, reads might be less efficient in the presence of stragglers. 
+ // https://issues.apache.org/jira/browse/BEAM-7495 + pipeline + .apply("QueryBigQuery", BigQueryIO.read(RecordToStructConverter::convert) + .fromQuery(options.getQuery()) + .usingStandardSql() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withQueryPriority(BigQueryIO.TypedRead.QueryPriority.INTERACTIVE) + .withTemplateCompatibility() + .withoutValidation() + .withCoder(SerializableCoder.of(Struct.class))) + .apply("ConvertToMutation", ParDo.of(new StructToMutationDoFn( + options.getTable(), + ValueProvider.StaticValueProvider.of(Mutation.Op.DELETE.name()), + options.getKeyFields()))) + .apply("DeleteMutation", SpannerIO.write() + .withProjectId(options.getProjectId()) + .withInstanceId(options.getInstanceId()) + .withDatabaseId(options.getDatabaseId())); + pipeline.run(); + } +}