Support table creation in sink
- Use Avro schema in BigQuerySchemaProvider to derive the BigQuery table schema (see the sketch below)
- Use Flink's built-in logical-type-to-Avro-schema converter for the Table API
- Rely on the BigQuery client's default retries when creating the BigQuery table

Also, remove the duplicate connector exception class from the sink.
jayehwhyehentee committed Jan 5, 2025
1 parent 6938190 commit d0e7a62
Showing 36 changed files with 1,209 additions and 302 deletions.
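
For context on the first bullet of the commit message: the sink now derives the destination table's schema from the Avro schema held by BigQuerySchemaProvider. Below is a minimal sketch of such a derivation, assuming the google-cloud-bigquery client API and covering only a few primitive Avro types; the class name and the avroToBigQueryType helper are hypothetical, not connector code.

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import org.apache.avro.Schema;

import java.util.ArrayList;
import java.util.List;

public class AvroToBigQuerySchemaSketch {

    // Builds a BigQuery table definition whose columns mirror the Avro record's fields.
    static StandardTableDefinition toTableDefinition(Schema avroSchema) {
        List<Field> fields = new ArrayList<>();
        for (Schema.Field field : avroSchema.getFields()) {
            fields.add(
                    Field.newBuilder(field.name(), avroToBigQueryType(field.schema()))
                            .setMode(Field.Mode.NULLABLE)
                            .build());
        }
        return StandardTableDefinition.newBuilder()
                .setSchema(com.google.cloud.bigquery.Schema.of(fields))
                .build();
    }

    // Hypothetical helper: maps a handful of Avro primitive types to BigQuery types.
    static StandardSQLTypeName avroToBigQueryType(Schema schema) {
        switch (schema.getType()) {
            case STRING:
                return StandardSQLTypeName.STRING;
            case INT:
            case LONG:
                return StandardSQLTypeName.INT64;
            case FLOAT:
            case DOUBLE:
                return StandardSQLTypeName.FLOAT64;
            case BOOLEAN:
                return StandardSQLTypeName.BOOL;
            default:
                throw new UnsupportedOperationException(
                        "Avro type not covered in this sketch: " + schema.getType());
        }
    }
}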
@@ -18,13 +18,18 @@

import org.apache.flink.api.connector.sink2.Sink;

import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.sink.client.BigQueryClientWithErrorHandling;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProviderImpl;
import com.google.cloud.flink.bigquery.sink.writer.CreateTableOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Base class for developing a BigQuery sink. */
abstract class BigQueryBaseSink<IN> implements Sink<IN> {

@@ -43,6 +48,12 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
final BigQuerySchemaProvider schemaProvider;
final BigQueryProtoSerializer serializer;
final String tablePath;
final boolean enableTableCreation;
final String partitionField;
final TimePartitioning.Type partitionType;
final Long partitionExpirationMillis;
final List<String> clusteredFields;
final String region;

BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
validateSinkConfig(sinkConfig);
@@ -59,14 +70,29 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable());
enableTableCreation = sinkConfig.enableTableCreation();
partitionField = sinkConfig.getPartitionField();
partitionType = sinkConfig.getPartitionType();
partitionExpirationMillis = sinkConfig.getPartitionExpirationMillis();
clusteredFields = sinkConfig.getClusteredFields();
region = sinkConfig.getRegion();
}

private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {
-        if (sinkConfig.getConnectOptions() == null) {
-            throw new IllegalArgumentException("BigQuery connect options cannot be null");
+        // Do not use class attribute!
+        // This method is invoked before any assignments.
+        BigQueryConnectOptions options = sinkConfig.getConnectOptions();
+        if (options == null) {
+            throw new IllegalArgumentException(
+                    "BigQuery connect options in sink config cannot be null");
        }
        if (sinkConfig.getSerializer() == null) {
-            throw new IllegalArgumentException("BigQuery serializer cannot be null");
+            throw new IllegalArgumentException("BigQuery serializer in sink config cannot be null");
        }
+        if (!BigQueryClientWithErrorHandling.tableExists(options)
+                && !sinkConfig.enableTableCreation()) {
+            throw new IllegalStateException(
+                    "Destination BigQuery table does not exist and table creation is not enabled in sink.");
+        }
}

@@ -80,4 +106,14 @@ void checkParallelism(int numberOfParallelSubtasks) {
throw new IllegalStateException("Attempting to create more Sink Writers than allowed");
}
}

CreateTableOptions createTableOptions() {
return new CreateTableOptions(
enableTableCreation,
partitionField,
partitionType,
partitionExpirationMillis,
clusteredFields,
region);
}
}
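
The CreateTableOptions assembled above carry the sink's partitioning and clustering settings to the writer. A hedged sketch of how those fields could map onto a BigQuery table definition, assuming the google-cloud-bigquery client API; the wrapper method and schema argument are illustrative, not connector code.

import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TimePartitioning;

import java.util.List;

public class TableDefinitionSketch {

    static StandardTableDefinition build(
            Schema schema,
            String partitionField,
            TimePartitioning.Type partitionType,
            Long partitionExpirationMillis,
            List<String> clusteredFields) {
        StandardTableDefinition.Builder builder =
                StandardTableDefinition.newBuilder().setSchema(schema);
        if (partitionType != null) {
            // Time partitioning on the given field; a null field means
            // ingestion-time partitioning.
            TimePartitioning.Builder partitioning = TimePartitioning.newBuilder(partitionType);
            if (partitionField != null) {
                partitioning.setField(partitionField);
            }
            if (partitionExpirationMillis != null) {
                partitioning.setExpirationMs(partitionExpirationMillis);
            }
            builder.setTimePartitioning(partitioning.build());
        }
        if (clusteredFields != null && !clusteredFields.isEmpty()) {
            builder.setClustering(Clustering.newBuilder().setFields(clusteredFields).build());
        }
        return builder.build();
    }
}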
@@ -38,6 +38,11 @@ class BigQueryDefaultSink extends BigQueryBaseSink {
public SinkWriter createWriter(InitContext context) {
checkParallelism(context.getNumberOfParallelSubtasks());
return new BigQueryDefaultWriter(
-                tablePath, connectOptions, schemaProvider, serializer, context);
+                tablePath,
+                connectOptions,
+                schemaProvider,
+                serializer,
+                createTableOptions(),
+                context);
}
}
@@ -51,7 +51,12 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
createWriter(InitContext context) {
checkParallelism(context.getNumberOfParallelSubtasks());
return new BigQueryBufferedWriter(
-                tablePath, connectOptions, schemaProvider, serializer, context);
+                tablePath,
+                connectOptions,
+                schemaProvider,
+                serializer,
+                createTableOptions(),
+                context);
}

@Override
@@ -75,6 +80,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
connectOptions,
schemaProvider,
serializer,
createTableOptions(),
context);
}

@@ -277,7 +277,7 @@ public static BigQuerySinkConfig forTable(
deliveryGuarantee,
new BigQuerySchemaProviderImpl(
BigQueryTableSchemaProvider.getAvroSchemaFromLogicalSchema(logicalType)),
-                new RowDataToProtoSerializer(),
+                new RowDataToProtoSerializer(logicalType),
enableTableCreation,
partitionField,
partitionType,
@@ -0,0 +1,138 @@
/*
* Copyright 2025 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.flink.bigquery.sink.client;

import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.cloud.flink.bigquery.services.BigQueryServicesImpl.ALREADY_EXISTS_ERROR_CODE;

/** Wrapper around {@link BigQueryServices} with sink specific error handling. */
public class BigQueryClientWithErrorHandling {

private static final Logger LOG =
LoggerFactory.getLogger(BigQueryClientWithErrorHandling.class);

private BigQueryClientWithErrorHandling() {}

public static boolean tableExists(BigQueryConnectOptions connectOptions)
throws BigQueryConnectorException {
try {
BigQueryServices.QueryDataClient queryDataClient =
BigQueryServicesFactory.instance(connectOptions).queryClient();
return queryDataClient.tableExists(
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable());
} catch (Exception e) {
throw new BigQueryConnectorException(
String.format(
"Unable to check existence of BigQuery table %s.%s.%s",
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable()),
e);
}
}

public static TableSchema getTableSchema(BigQueryConnectOptions connectOptions)
throws BigQueryConnectorException {
try {
BigQueryServices.QueryDataClient queryDataClient =
BigQueryServicesFactory.instance(connectOptions).queryClient();
return queryDataClient.getTableSchema(
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable());
} catch (Exception e) {
throw new BigQueryConnectorException(
String.format(
"Unable to get schema of BigQuery table %s.%s.%s",
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable()),
e);
}
}

public static void createDataset(BigQueryConnectOptions connectOptions, String region) {
try {
BigQueryServices.QueryDataClient queryDataClient =
BigQueryServicesFactory.instance(connectOptions).queryClient();
queryDataClient.createDataset(
connectOptions.getProjectId(), connectOptions.getDataset(), region);
LOG.info(
"Created BigQuery dataset {}.{}",
connectOptions.getProjectId(),
connectOptions.getDataset());
} catch (BigQueryException e) {
if (e.getCode() == ALREADY_EXISTS_ERROR_CODE) {
LOG.warn(
"Attempted creation of BigQuery dataset {}.{} failed, since it already exists",
connectOptions.getProjectId(),
connectOptions.getDataset());
return;
}
throw new BigQueryConnectorException(
String.format(
"Unable to create BigQuery dataset %s.%s",
connectOptions.getProjectId(), connectOptions.getDataset()),
e);
}
}

public static void createTable(
BigQueryConnectOptions connectOptions, TableDefinition tableDefinition) {
try {
BigQueryServices.QueryDataClient queryDataClient =
BigQueryServicesFactory.instance(connectOptions).queryClient();
queryDataClient.createTable(
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable(),
tableDefinition);
LOG.info(
"Created BigQuery table {}.{}.{}",
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable());
} catch (BigQueryException e) {
if (e.getCode() == ALREADY_EXISTS_ERROR_CODE) {
LOG.warn(
"Attempted creation of BigQuery table {}.{}.{} failed, since it already exists",
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable());
return;
}
throw new BigQueryConnectorException(
String.format(
"Unable to create BigQuery table %s.%s.%s",
connectOptions.getProjectId(),
connectOptions.getDataset(),
connectOptions.getTable()),
e);
}
}
}
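
A hedged usage sketch of how a sink writer might combine these helpers when the destination table is missing; the ensureTable wrapper and the tableDefinition argument are assumptions, and only the static helper calls come from this commit.

import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.sink.client.BigQueryClientWithErrorHandling;

public class EnsureTableSketch {

    static void ensureTable(
            BigQueryConnectOptions connectOptions, String region, TableDefinition tableDefinition) {
        if (!BigQueryClientWithErrorHandling.tableExists(connectOptions)) {
            // Create the dataset first; an existing dataset is tolerated.
            BigQueryClientWithErrorHandling.createDataset(connectOptions, region);
            BigQueryClientWithErrorHandling.createTable(connectOptions, tableDefinition);
        }
    }
}

Note the design choice: because multiple parallel sink writers can race to create the same dataset or table, the ALREADY_EXISTS branches above log a warning and return instead of failing the job, and table creation relies on the BigQuery client's default retries rather than a custom retry loop.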
@@ -21,9 +21,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
+import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
-import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


This file was deleted (the duplicate BigQueryConnectorException class under the sink's exceptions package).

@@ -85,6 +85,11 @@ public void init(BigQuerySchemaProvider bigQuerySchemaProvider) {
this.descriptor = derivedDescriptor;
}

@Override
public Schema getAvroSchema(GenericRecord record) {
return record.getSchema();
}

@Override
public ByteString serialize(GenericRecord record) throws BigQuerySerializationException {
try {
@@ -18,6 +18,7 @@

import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.protobuf.ByteString;
import org.apache.avro.Schema;

import java.io.Serializable;

@@ -46,4 +47,13 @@ public abstract class BigQueryProtoSerializer<IN> implements Serializable {
* @param schemaProvider BigQuery table's schema information.
*/
public void init(BigQuerySchemaProvider schemaProvider) {}

/**
* Derives the Avro {@link Schema} describing the data record. This is primarily used by the
* sink to infer the schema for creating the destination BigQuery table if it doesn't already
* exist.
*
* @param record Record to derive the schema from.
* @return Avro schema of the record.
*/
public abstract Schema getAvroSchema(IN record);
}
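
The Avro serializer's override (shown earlier) simply returns record.getSchema(). For the Table API path, here is a hedged sketch of how RowDataToProtoSerializer, now constructed with the table's LogicalType, might satisfy this contract; the class body, field name, and import paths are assumptions, not the connector's actual implementation.

import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryTableSchemaProvider;
import com.google.protobuf.ByteString;
import org.apache.avro.Schema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

public class RowDataSerializerSketch extends BigQueryProtoSerializer<RowData> {

    private final LogicalType logicalType;

    public RowDataSerializerSketch(LogicalType logicalType) {
        this.logicalType = logicalType;
    }

    @Override
    public Schema getAvroSchema(RowData record) {
        // RowData carries no schema of its own, so derive it from the logical type
        // captured at construction, via Flink's logical-type-to-Avro converter.
        return BigQueryTableSchemaProvider.getAvroSchemaFromLogicalSchema(logicalType);
    }

    @Override
    public ByteString serialize(RowData record) {
        throw new UnsupportedOperationException("Serialization omitted in this sketch");
    }
}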
@@ -50,4 +50,12 @@ public interface BigQuerySchemaProvider extends Serializable {
* @return AvroSchema
*/
Schema getAvroSchema();

/**
* Returns true if the BigQuery table's schema is unknown, else false. The schema can be
* unknown if, for example, the table does not exist before the Flink job starts.
*
* @return boolean
*/
boolean schemaUnknown();
}
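
A hedged sketch of how a writer might act on schemaUnknown(), late-binding the schema from the first incoming record via BigQueryProtoSerializer#getAvroSchema; the class and the ensureTable helper are hypothetical, not connector code.

import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
import org.apache.avro.Schema;

public class LateSchemaSketch<IN> {

    void onFirstRecord(
            BigQuerySchemaProvider schemaProvider,
            BigQueryProtoSerializer<IN> serializer,
            IN firstRecord) {
        if (schemaProvider.schemaUnknown()) {
            // The destination table did not exist when the sink was built, so take
            // the schema from the data itself and create the table before writing.
            Schema avroSchema = serializer.getAvroSchema(firstRecord);
            ensureTable(avroSchema);
        }
    }

    // Hypothetical helper: convert avroSchema to a TableDefinition and call
    // BigQueryClientWithErrorHandling.createTable(...) using the sink's CreateTableOptions.
    private void ensureTable(Schema avroSchema) {}
}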