From bce6d8a07504ec826e0a1adf41eb6edf9fe03188 Mon Sep 17 00:00:00 2001
From: Jayant Jain <141257304+jayehwhyehentee@users.noreply.github.com>
Date: Mon, 27 Jan 2025 22:29:24 +0530
Subject: [PATCH] Allow higher parallelism in multi-regions (#203)

Allow max parallelism of 512 in US and EU, while maintaining 128 in remaining
regions.
---
 README.md                                     |  22 ++--
 .../flink/bigquery/sink/BigQueryBaseSink.java |  61 ++++++++--
 .../bigquery/sink/BigQueryDefaultSink.java    |   1 +
 .../sink/BigQueryExactlyOnceSink.java         |   2 +
 .../BigQueryClientWithErrorHandling.java      |  16 +++
 .../throttle/BigQueryWriterThrottler.java     |   9 +-
 .../bigquery/sink/writer/BaseWriter.java      |   3 +-
 .../sink/writer/BigQueryBufferedWriter.java   |   4 +
 .../sink/writer/BigQueryDefaultWriter.java    |   2 +
 .../bigquery/fakes/StorageClientFaker.java    |  29 +++++
 .../sink/BigQueryDefaultSinkTest.java         | 114 ++++++++++++++++--
 .../sink/BigQueryExactlyOnceSinkTest.java     |  23 ++--
 .../throttle/BigQueryWriterThrottlerTest.java |  20 ++-
 .../writer/BigQueryBufferedWriterTest.java    |   3 +
 .../writer/BigQueryDefaultWriterTest.java     |   1 +
 .../bigquery/services/BigQueryServices.java   |  10 ++
 .../services/BigQueryServicesImpl.java        |   8 ++
 17 files changed, 274 insertions(+), 54 deletions(-)

diff --git a/README.md b/README.md
index 16276dc8..c278cc04 100644
--- a/README.md
+++ b/README.md
@@ -245,7 +245,7 @@ BigQuerySinkTableConfig sinkTableConfig = BigQuerySinkTableConfig.newBuilder()
         .project(...) // REQUIRED
         .dataset(...) // REQUIRED
         .streamExecutionEnvironment(env) // REQUIRED if deliveryGuarantee is EXACTLY_ONCE
-        .sinkParallelism(...) // OPTIONAL; Should be atmost 128
+        .sinkParallelism(...) // OPTIONAL; At most 512 in US/EU multi-regions, 128 elsewhere
         .deliveryGuarantee(...) // OPTIONAL; Default is AT_LEAST_ONCE
         .enableTableCreation(...) // OPTIONAL
         .partitionField(...) // OPTIONAL
@@ -303,11 +303,16 @@ Check out BigQuery SQL's [DDL](https://cloud.google.com/bigquery/docs/reference/
 ### Sink Details [MUST READ]
 
 * BigQuery sinks require that checkpoint is enabled.
-* When using a BigQuery sink, checkpoint timeout should be liberal (1+ minutes). This is because sink writers are
-  [throttled](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/05fe7b14f2dc688bc808f553c3f863ba2380e317/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java#L26)
-  before they start sending data to BigQuery. Depending on the sink's parallelism, this throttling can be as high as 40 seconds.
-  Throttling is necessary to gracefully handle BigQuery's rate limiting on certain APIs used by the sink writers. Note that this only
-  affects the first checkpoint.
+* The maximum parallelism of BigQuery sinks has been capped at **512** for the US and EU multi-regions, and **128** for all other regions.
+  This is to respect BigQuery storage [write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while keeping
+  [throughput](https://cloud.google.com/bigquery/docs/write-api#connections) and
+  [best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices) in mind. Users should either set
+  [sink level parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/execution/parallel/#operator-level)
+  explicitly, or ensure that default job level parallelism is under the region-specific maximum (512 or 128).
+* When using a BigQuery sink, checkpoint timeout should be liberal. This is because sink writers are
+  [throttled](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java)
+  before they start sending data to BigQuery. Depending on the destination dataset's region, this throttling can last roughly 3 minutes for the US and EU multi-regions, and roughly 45 seconds for other regions.
+  Throttling is necessary to gracefully handle BigQuery's rate limiting on certain APIs used by the sink writers. Note that this throttling happens once per writer, before the first checkpoint it encounters after accepting data.
 * Delivery guarantee can be [at-least-once](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE) or
   [exactly-once](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#EXACTLY_ONCE).
 * The at-least-once sink enables BigQuery client [multiplexing](https://cloud.google.com/bigquery/docs/write-api-streaming#use_multiplexing)
@@ -321,11 +326,6 @@ Check out BigQuery SQL's [DDL](https://cloud.google.com/bigquery/docs/reference/
   when maintaining records in avro format. Check Flink's [blog on non-trivial serialization](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE).
   Note that avro schema of an existing BigQuery table can be obtained from
   [BigQuerySchemaProviderImpl](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderImpl.java).
-* The maximum parallelism of BigQuery sinks has been capped at **128**. This is to respect BigQuery storage
-  [write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while adhering to
-  [best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices). Users should either set
-  [sink level parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/execution/parallel/#operator-level)
-  explicitly, or ensure that default job level parallelism is under 128.
 * BigQuerySinkConfig requires the StreamExecutionEnvironment if delivery guarantee is exactly-once.
   **Restart strategy must be explicitly set in the StreamExecutionEnvironment**.
   This is to [validate](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/92db3690c741fb2cdb99e28c575e19affb5c8b69/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkConfig.java#L185)
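To make the README's parallelism guidance concrete, here is a minimal usage sketch. It is illustrative and not part of this patch: `stream` is assumed to be an existing `DataStream`, and `sink` a BigQuery sink built from the config shown earlier in the README; the point being demonstrated is only the operator-level `setParallelism` call.

```java
// Minimal sketch, assuming an existing DataStream `stream` and a BigQuery
// sink `sink` as in the README example above (names are illustrative).
// Pinning parallelism at the sink operator keeps the job within the regional cap.
stream.sinkTo(sink)
        // At most 512 if the destination dataset is in the US or EU
        // multi-region; at most 128 in any other region.
        .setParallelism(128);
```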
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
index fb891403..aabe0a3d 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
@@ -17,7 +17,9 @@
 package com.google.cloud.flink.bigquery.sink;
 
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.util.StringUtils;
 
+import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.services.BigQueryServicesImpl;
@@ -29,6 +31,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 
 /** Base class for developing a BigQuery sink. */
@@ -36,14 +39,17 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
 
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
-    // BigQuery write streams can offer over 10 MBps throughput, and per project throughput quotas
-    // are in the order of single digit GBps. With each sink writer maintaining a single and unique
-    // write connection to BigQuery, maximum parallelism for sink is intentionally restricted to
-    // 128 for initial releases of this connector. This is also the default max parallelism of
-    // Flink applications.
-    // Based on performance observations and user feedback, this number can be increased in the
-    // future.
-    public static final int MAX_SINK_PARALLELISM = 128;
+    // Generally, a single write connection supports at least 1 MBps of throughput.
+    // Check https://cloud.google.com/bigquery/docs/write-api#connections
+    // Project-level BQ storage write API throughput in multi-regions (us and eu) is capped at
+    // 3 GBps. Other regions allow 300 MBps.
+    // Check https://cloud.google.com/bigquery/quotas#write-api-limits
+    // Keeping these factors in mind, the BigQuery sink's parallelism is capped according to the
+    // destination table's region.
+    public static final int DEFAULT_MAX_SINK_PARALLELISM = 128;
+    public static final int MULTI_REGION_MAX_SINK_PARALLELISM = 512;
+    public static final List<String> BQ_MULTI_REGIONS = Arrays.asList("us", "eu");
+    private static final String DEFAULT_REGION = "us";
 
     final BigQueryConnectOptions connectOptions;
     final BigQuerySchemaProvider schemaProvider;
@@ -55,6 +61,7 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
     final Long partitionExpirationMillis;
     final List<String> clusteredFields;
     final String region;
+    final int maxParallelism;
     final String traceId;
 
     BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
@@ -77,7 +84,8 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
         partitionType = sinkConfig.getPartitionType();
         partitionExpirationMillis = sinkConfig.getPartitionExpirationMillis();
         clusteredFields = sinkConfig.getClusteredFields();
-        region = sinkConfig.getRegion();
+        region = getRegion(sinkConfig.getRegion());
+        maxParallelism = getMaxParallelism();
         traceId = BigQueryServicesImpl.generateTraceId();
     }
 
@@ -101,10 +109,10 @@ private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {
 
     /** Ensures Sink's parallelism does not exceed the allowed maximum when scaling Flink job. */
     void checkParallelism(int numberOfParallelSubtasks) {
-        if (numberOfParallelSubtasks > MAX_SINK_PARALLELISM) {
+        if (numberOfParallelSubtasks > maxParallelism) {
             logger.error(
                     "Maximum allowed parallelism for Sink is {}, but attempting to create Writer number {}",
-                    MAX_SINK_PARALLELISM,
+                    maxParallelism,
                     numberOfParallelSubtasks);
             throw new IllegalStateException("Attempting to create more Sink Writers than allowed");
         }
@@ -119,4 +127,35 @@ CreateTableOptions createTableOptions() {
                 clusteredFields,
                 region);
     }
+
+    private String getRegion(String userProvidedRegion) {
+        Dataset dataset = BigQueryClientWithErrorHandling.getDataset(connectOptions);
+        if (dataset == null || !dataset.exists()) {
+            if (StringUtils.isNullOrWhitespaceOnly(userProvidedRegion)) {
+                logger.info(
+                        "No destination BQ region provided by user. Using default us multi-region.");
+                return DEFAULT_REGION;
+            }
+            return userProvidedRegion;
+        }
+        String datasetLocation = dataset.getLocation();
+        if (!datasetLocation.equalsIgnoreCase(userProvidedRegion)) {
+            logger.warn(
+                    "Provided sink dataset region {} will be overridden by dataset's existing location {}",
+                    userProvidedRegion,
+                    datasetLocation);
+        }
+        return dataset.getLocation();
+    }
+
+    // Max sink parallelism is deduced using destination dataset's region.
+    // Ensure instance variable 'region' is assigned before invoking this method.
+    private int getMaxParallelism() {
+        for (String multiRegion : BQ_MULTI_REGIONS) {
+            if (multiRegion.equalsIgnoreCase(region)) {
+                return MULTI_REGION_MAX_SINK_PARALLELISM;
+            }
+        }
+        return DEFAULT_MAX_SINK_PARALLELISM;
+    }
 }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java
index e364f9e4..f19cad4a 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java
@@ -43,6 +43,7 @@ public SinkWriter createWriter(InitContext context) {
                 schemaProvider,
                 serializer,
                 createTableOptions(),
+                maxParallelism,
                 traceId,
                 context);
     }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java
index 3f42d795..663246b8 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java
@@ -56,6 +56,7 @@ public class BigQueryExactlyOnceSink extends BigQueryBaseSink
                 schemaProvider,
                 serializer,
                 createTableOptions(),
+                maxParallelism,
                 traceId,
                 context);
     }
@@ -82,6 +83,7 @@ public class BigQueryExactlyOnceSink extends BigQueryBaseSink
                 schemaProvider,
                 serializer,
                 createTableOptions(),
+                maxParallelism,
                 traceId,
                 context);
     }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/client/BigQueryClientWithErrorHandling.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/client/BigQueryClientWithErrorHandling.java
index 91deae4a..2b5282de 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/client/BigQueryClientWithErrorHandling.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/client/BigQueryClientWithErrorHandling.java
@@ -18,6 +18,7 @@
 
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
@@ -135,4 +136,19 @@ public static void createTable(
                     e);
         }
     }
+
+    public static Dataset getDataset(BigQueryConnectOptions connectOptions) {
+        try {
+            BigQueryServices.QueryDataClient queryDataClient =
+                    BigQueryServicesFactory.instance(connectOptions).queryClient();
+            return queryDataClient.getDataset(
+                    connectOptions.getProjectId(), connectOptions.getDataset());
+        } catch (BigQueryException e) {
+            throw new BigQueryConnectorException(
+                    String.format(
+                            "Unable to check existence of BigQuery dataset %s.%s",
+                            connectOptions.getProjectId(), connectOptions.getDataset()),
+                    e);
+        }
+    }
 }
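The region handling added to BigQueryBaseSink above boils down to a small case analysis: the us/eu multi-regions get the higher cap, everything else stays at 128. A standalone restatement of `getMaxParallelism()` under the patch's own constants (illustrative sketch, not connector code):

```java
import java.util.Arrays;
import java.util.List;

final class RegionCapSketch {
    static final List<String> BQ_MULTI_REGIONS = Arrays.asList("us", "eu");

    // Mirrors BigQueryBaseSink.getMaxParallelism(): 512 for the us/eu
    // multi-regions, 128 for every other region. Comparison is case-insensitive,
    // so "US" and "us" behave identically.
    static int maxParallelismFor(String region) {
        for (String multiRegion : BQ_MULTI_REGIONS) {
            if (multiRegion.equalsIgnoreCase(region)) {
                return 512;
            }
        }
        return 128;
    }

    public static void main(String[] args) {
        System.out.println(maxParallelismFor("US"));          // 512
        System.out.println(maxParallelismFor("eu"));          // 512
        System.out.println(maxParallelismFor("us-central1")); // 128
    }
}
```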
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java
index a9313632..c686b550 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java
@@ -16,7 +16,6 @@
 
 package com.google.cloud.flink.bigquery.sink.throttle;
 
-import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,18 +39,18 @@
  */
 public class BigQueryWriterThrottler implements Throttler {
 
-    // MAX_SINK_PARALLELISM is set as 128.
-    public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3;
     private static final Logger LOG = LoggerFactory.getLogger(BigQueryWriterThrottler.class);
     private final int writerId;
+    private final int maxBuckets;
 
-    public BigQueryWriterThrottler(int writerId) {
+    public BigQueryWriterThrottler(int writerId, int maxParallelism) {
         this.writerId = writerId;
+        this.maxBuckets = maxParallelism / 3;
     }
 
     @Override
     public void throttle() {
-        int waitSeconds = writerId % MAX_BUCKETS;
+        int waitSeconds = writerId % maxBuckets;
         LOG.debug("Throttling writer {} for {} second", writerId, waitSeconds);
         try {
             // Sleep does nothing if input is 0 or less.
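The throttler change explains the README's revised wait estimates: each writer sleeps `writerId % (maxParallelism / 3)` seconds, so the worst case grows with the regional cap. A quick arithmetic check (illustrative sketch, not connector code):

```java
final class ThrottleWaitSketch {
    // Worst-case wait is (maxParallelism / 3) - 1 seconds: the largest value
    // that writerId % maxBuckets can take over all writer ids.
    static int worstCaseWaitSeconds(int maxParallelism) {
        int maxBuckets = maxParallelism / 3;
        return maxBuckets - 1;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseWaitSeconds(512)); // 169s, i.e. the README's "roughly 3 minutes"
        System.out.println(worstCaseWaitSeconds(128)); // 41s, i.e. the README's "roughly 45 seconds"
    }
}
```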
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java
index 6005e319..8085a524 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java
@@ -124,6 +124,7 @@ abstract class BaseWriter implements SinkWriter {
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            int maxParallelism,
             String traceId) {
         this.subtaskId = subtaskId;
         this.tablePath = tablePath;
@@ -135,7 +136,7 @@ abstract class BaseWriter implements SinkWriter {
         appendRequestSizeBytes = 0L;
         appendResponseFuturesQueue = new LinkedList<>();
         protoRowsBuilder = ProtoRows.newBuilder();
-        throttler = new BigQueryWriterThrottler(subtaskId);
+        throttler = new BigQueryWriterThrottler(subtaskId, maxParallelism);
     }
 
     /** Append pending records and validate all remaining append responses. */
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java
index 65a8ce22..5fa0a68c 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java
@@ -106,6 +106,7 @@ public BigQueryBufferedWriter(
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            int maxParallelism,
             String traceId,
             InitContext context) {
         this(
@@ -119,6 +120,7 @@ public BigQueryBufferedWriter(
                 schemaProvider,
                 serializer,
                 createTableOptions,
+                maxParallelism,
                 traceId,
                 context);
     }
@@ -134,6 +136,7 @@ public BigQueryBufferedWriter(
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            int maxParallelism,
             String traceId,
             InitContext context) {
         super(
@@ -143,6 +146,7 @@ public BigQueryBufferedWriter(
                 schemaProvider,
                 serializer,
                 createTableOptions,
+                maxParallelism,
                 traceId);
         this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
         this.streamName = this.streamNameInState;
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java
index 11961d80..33840a3c 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java
@@ -63,6 +63,7 @@ public BigQueryDefaultWriter(
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            int maxParallelism,
             String taceId,
             InitContext context) {
         super(
@@ -72,6 +73,7 @@ public BigQueryDefaultWriter(
                 schemaProvider,
                 serializer,
                 createTableOptions,
+                maxParallelism,
                 taceId);
         streamName = String.format("%s/streams/_default", tablePath);
         totalRecordsSeen = 0L;
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
index efaa9981..ec3d2fcf 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
@@ -28,6 +28,7 @@
 import com.google.api.services.bigquery.model.JobStatistics2;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -146,6 +147,8 @@ public static class FakeQueryDataClient implements QueryDataClient {
 
     private final RuntimeException createDatasetError;
     private final RuntimeException createTableError;
+    private boolean datasetExists;
+    private String datasetRegion;
     private int tableExistsInvocations;
     private int createDatasetInvocations;
     private int createTableInvocations;
@@ -162,6 +165,17 @@ public FakeQueryDataClient(
             tableExistsInvocations = 0;
             createDatasetInvocations = 0;
             createTableInvocations = 0;
+            datasetExists = true;
+            datasetRegion = "us-central1";
+        }
+
+        public FakeQueryDataClient(boolean datasetExists, String datasetRegion) {
+            this.datasetExists = datasetExists;
+            this.datasetRegion = datasetRegion;
+            tableExists = false;
+            tableExistsError = null;
+            createDatasetError = null;
+            createTableError = null;
         }
 
         static FakeQueryDataClient defaultInstance =
@@ -199,6 +213,14 @@ public TableSchema getTableSchema(String project, String dataset, String table)
             return SIMPLE_BQ_TABLE_SCHEMA;
         }
 
+        @Override
+        public Dataset getDataset(String project, String dataset) {
+            Dataset mockedDataset = Mockito.mock(Dataset.class);
+            Mockito.when(mockedDataset.exists()).thenReturn(datasetExists);
+            Mockito.when(mockedDataset.getLocation()).thenReturn(datasetRegion);
+            return mockedDataset;
+        }
+
         @Override
         public void createDataset(String project, String dataset, String region) {
             createDatasetInvocations++;
@@ -882,6 +904,13 @@ public static BigQueryConnectOptions createConnectOptionsForQuery(
         return createConnectOptions(null, null, queryClient);
     }
 
+    public static BigQueryConnectOptions createConnectOptionsForQuery(
+            boolean datasetExists, String datasetRegion) {
+        FakeBigQueryServices.FakeQueryDataClient queryClient =
+                new FakeBigQueryServices.FakeQueryDataClient(datasetExists, datasetRegion);
+        return createConnectOptions(null, null, queryClient);
+    }
+
     public static BigQueryConnectOptions createConnectOptions(
             FakeBigQueryServices.FakeBigQueryStorageReadClient readClient,
             FakeBigQueryServices.FakeBigQueryStorageWriteClient writeClient,
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java
index cbfd899c..2a00541f 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java
@@ -22,7 +22,9 @@
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.fakes.StorageClientFaker;
+import com.google.cloud.flink.bigquery.sink.serializer.AvroToProtoSerializer;
 import com.google.cloud.flink.bigquery.sink.serializer.FakeBigQuerySerializer;
 import com.google.cloud.flink.bigquery.sink.serializer.TestBigQuerySchemas;
 import com.google.protobuf.ByteString;
@@ -62,7 +64,7 @@ public void testConstructor() {
         env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -98,7 +100,7 @@ public void testConstructor_withoutSerializer() {
         env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(null)
                         .streamExecutionEnvironment(env)
@@ -114,9 +116,7 @@ public void testConstructor_withCreateTable_withoutExistingTable() {
         env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(
-                                StorageClientFaker.createConnectOptionsForQuery(
-                                        false, null, null, null))
+                        .connectOptions(getConnectOptions(false))
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
                         .enableTableCreation(true)
@@ -132,9 +132,7 @@ public void testConstructor_withCreateTable_withExistingTable() {
         env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(
-                                StorageClientFaker.createConnectOptionsForQuery(
-                                        true, null, null, null))
+                        .connectOptions(getConnectOptions(true))
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
                         .enableTableCreation(true)
@@ -150,9 +148,7 @@ public void testConstructor_withoutCreateTable_withoutExistingTable() {
         env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(
-                                StorageClientFaker.createConnectOptionsForQuery(
-                                        false, null, null, null))
+                        .connectOptions(getConnectOptions(false))
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
                         .build();
@@ -174,7 +170,7 @@ public void testCreateWriter() {
                 .thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -191,7 +187,7 @@ public void testCreateWriter_withMoreWritersThanAllowed() {
         Mockito.when(mockedContext.getNumberOfParallelSubtasks()).thenReturn(129);
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -204,4 +200,96 @@ public void testCreateWriter_withMoreWritersThanAllowed() {
                 .hasMessageThat()
                 .contains("Attempting to create more Sink Writers than allowed");
     }
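The `testRegionAndParallelism_*` cases that follow pin down the region-resolution rules from `BigQueryBaseSink.getRegion()`: an existing dataset's location always wins; otherwise the user-provided region is used, falling back to the `us` multi-region. Restated as a standalone helper (illustrative sketch only, not connector code):

```java
final class RegionResolutionSketch {
    // Resolution order exercised by the tests below: an existing dataset's
    // location overrides user input; otherwise the user-provided region;
    // otherwise the "us" multi-region default.
    static String effectiveRegion(boolean datasetExists, String datasetLocation, String userRegion) {
        if (datasetExists) {
            return datasetLocation;
        }
        if (userRegion == null || userRegion.trim().isEmpty()) {
            return "us";
        }
        return userRegion;
    }
}
```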
+
+    @Test
+    public void testRegionAndParallelism_datasetExists_providedRegionOverridden() {
+        env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
+        BigQuerySinkConfig sinkConfig =
+                BigQuerySinkConfig.newBuilder()
+                        .connectOptions(
+                                StorageClientFaker.createConnectOptionsForQuery(
+                                        true, "us-central1"))
+                        .serializer(new AvroToProtoSerializer())
+                        .enableTableCreation(true)
+                        .streamExecutionEnvironment(env)
+                        .region("us")
+                        .build();
+        BigQueryDefaultSink sink = new BigQueryDefaultSink(sinkConfig);
+        assertEquals("us-central1", sink.region);
+        assertEquals(128, sink.maxParallelism);
+    }
+
+    @Test
+    public void testRegionAndParallelism_datasetExistsMultiRegion_providedRegionOverridden() {
+        env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
+        BigQuerySinkConfig sinkConfig =
+                BigQuerySinkConfig.newBuilder()
+                        .connectOptions(StorageClientFaker.createConnectOptionsForQuery(true, "us"))
+                        .serializer(new AvroToProtoSerializer())
+                        .enableTableCreation(true)
+                        .streamExecutionEnvironment(env)
+                        .region("us-central1")
+                        .build();
+        BigQueryDefaultSink sink = new BigQueryDefaultSink(sinkConfig);
+        assertEquals("us", sink.region);
+        assertEquals(512, sink.maxParallelism);
+    }
+
+    @Test
+    public void testRegionAndParallelism_useProvidedRegion() {
+        env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
+        BigQuerySinkConfig sinkConfig =
+                BigQuerySinkConfig.newBuilder()
+                        .connectOptions(
+                                StorageClientFaker.createConnectOptionsForQuery(false, null))
+                        .serializer(new AvroToProtoSerializer())
+                        .enableTableCreation(true)
+                        .streamExecutionEnvironment(env)
+                        .region("us-central1")
+                        .build();
+        BigQueryDefaultSink sink = new BigQueryDefaultSink(sinkConfig);
+        assertEquals("us-central1", sink.region);
+        assertEquals(128, sink.maxParallelism);
+    }
+
+    @Test
+    public void testRegionAndParallelism_useProvidedMultiRegion() {
+        env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
+        BigQuerySinkConfig sinkConfig =
+                BigQuerySinkConfig.newBuilder()
+                        .connectOptions(
+                                StorageClientFaker.createConnectOptionsForQuery(false, null))
+                        .serializer(new AvroToProtoSerializer())
+                        .enableTableCreation(true)
+                        .streamExecutionEnvironment(env)
+                        .region("US")
+                        .build();
+        BigQueryDefaultSink sink = new BigQueryDefaultSink(sinkConfig);
+        assertEquals("US", sink.region);
+        assertEquals(512, sink.maxParallelism);
+    }
+
+    @Test
+    public void testRegionAndParallelism_useDefaultRegion() {
+        env.setRestartStrategy(FIXED_DELAY_RESTART_STRATEGY);
+        BigQuerySinkConfig sinkConfig =
+                BigQuerySinkConfig.newBuilder()
+                        .connectOptions(
+                                StorageClientFaker.createConnectOptionsForQuery(false, null))
+                        .serializer(new AvroToProtoSerializer())
+                        .enableTableCreation(true)
+                        .streamExecutionEnvironment(env)
+                        .build();
+        BigQueryDefaultSink sink = new BigQueryDefaultSink(sinkConfig);
+        assertEquals("us", sink.region);
+        assertEquals(512, sink.maxParallelism);
+    }
+
+    private static BigQueryConnectOptions getConnectOptions(boolean tableExists) {
+        return StorageClientFaker.createConnectOptions(
+                null,
+                new StorageClientFaker.FakeBigQueryServices.FakeBigQueryStorageWriteClient(null),
+                new StorageClientFaker.FakeBigQueryServices.FakeQueryDataClient(
+                        tableExists, null, null, null));
+    }
 }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSinkTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSinkTest.java
index 8cbbc923..da501250 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSinkTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSinkTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.fakes.StorageClientFaker;
 import com.google.cloud.flink.bigquery.sink.serializer.FakeBigQuerySerializer;
 import com.google.cloud.flink.bigquery.sink.serializer.TestBigQuerySchemas;
@@ -60,7 +61,7 @@ public void tearDown() throws Exception {
     public void testConstructor() {
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -95,7 +96,7 @@ public void testCreateWriter() {
                 .thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -113,7 +114,7 @@ public void testCreate_withMoreWritersThanAllowed() {
                 .thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -136,7 +137,7 @@ public void testRestoreWriter() {
                 .thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -161,7 +162,7 @@ public void testRestoreWriter() {
     public void testCreateCommitter() {
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -174,7 +175,7 @@ public void testGetCommittableSerializer() {
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -187,7 +188,7 @@ public void testGetWriterStateSerializer() {
         BigQuerySinkConfig sinkConfig =
                 BigQuerySinkConfig.newBuilder()
-                        .connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
+                        .connectOptions(getConnectOptions(true))
                         .schemaProvider(TestBigQuerySchemas.getSimpleRecordSchema())
                         .serializer(new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")))
                         .streamExecutionEnvironment(env)
@@ -195,4 +196,12 @@ public void testGetWriterStateSerializer() {
         BigQueryExactlyOnceSink exactlyOnceSink = new BigQueryExactlyOnceSink(sinkConfig);
         assertNotNull(exactlyOnceSink.getWriterStateSerializer());
     }
+
+    private static BigQueryConnectOptions getConnectOptions(boolean tableExists) {
+        return StorageClientFaker.createConnectOptions(
+                null,
+                new StorageClientFaker.FakeBigQueryServices.FakeBigQueryStorageWriteClient(null),
+                new StorageClientFaker.FakeBigQueryServices.FakeQueryDataClient(
+                        tableExists, null, null, null));
+    }
 }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottlerTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottlerTest.java
index 16d5e880..80039bd6 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottlerTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottlerTest.java
@@ -27,28 +27,36 @@ public class BigQueryWriterThrottlerTest {
 
     @Test
-    public void testThrottle() {
-        Duration duration = invokeThrottle(3);
+    public void testThrottle_defaultMaxParallelism() {
+        Duration duration = invokeThrottle(3, 128);
         assertTrue(duration.toMillis() >= 3000L);
+        assertTrue(duration.toMillis() < 4000L);
+    }
+
+    @Test
+    public void testThrottle_multiRegionMaxParallelism() {
+        Duration duration = invokeThrottle(171, 512);
+        assertTrue(duration.toMillis() >= 1000L);
+        assertTrue(duration.toMillis() < 2000L);
     }
 
     @Test
     public void testThrottle_withInterruptedException() {
         // Force interruption
         Thread.currentThread().interrupt();
-        Duration duration = invokeThrottle(3);
+        Duration duration = invokeThrottle(3, 128);
         assertTrue(duration.toMillis() < 3000L);
     }
 
     @Test
     public void testThrottle_withInvalidWriterId_expectNoThrottling() {
-        Duration duration = invokeThrottle(-1);
+        Duration duration = invokeThrottle(-1, 128);
         long waitSeconds = duration.toMillis() / 1000;
         assertTrue(waitSeconds == 0);
     }
 
-    private Duration invokeThrottle(int writerId) {
-        BigQueryWriterThrottler throttler = new BigQueryWriterThrottler(writerId);
+    private Duration invokeThrottle(int writerId, int maxParallelism) {
+        BigQueryWriterThrottler throttler = new BigQueryWriterThrottler(writerId, maxParallelism);
         Instant start = Instant.now();
         throttler.throttle();
         Instant end = Instant.now();
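The expected bounds in `testThrottle_multiRegionMaxParallelism` follow directly from the throttler's bucket arithmetic, worked out here for clarity (this is explanatory arithmetic, not test code):

```java
// With maxParallelism = 512 the throttler has 512 / 3 = 170 buckets, so
// writer 171 waits 171 % 170 = 1 second, i.e. between 1000 ms and 2000 ms.
int maxBuckets = 512 / 3;           // 170
int waitSeconds = 171 % maxBuckets; // 1
```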
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java
index 6b0fd6cc..edc69a0d 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java
@@ -1149,6 +1149,7 @@ private BigQueryBufferedWriter createBufferedWriter(
                 TestBigQuerySchemas.getSimpleRecordSchema(),
                 mockSerializer,
                 null,
+                128,
                 "traceId",
                 context);
     }
@@ -1189,6 +1190,7 @@ private BigQueryBufferedWriter createBufferedWriter(
                 schemaProvider,
                 mockSerializer,
                 createTableOptions,
+                128,
                 "traceId",
                 context);
     }
@@ -1224,6 +1226,7 @@ private BigQueryBufferedWriter createBufferedWriter(
                 TestBigQuerySchemas.getSimpleRecordSchema(),
                 mockSerializer,
                 null,
+                128,
                 "traceId",
                 context);
     }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java
index 54a25150..32e3fa7a 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java
@@ -398,6 +398,7 @@ private BigQueryDefaultWriter createDefaultWriter(
                 schemaProvider,
                 mockSerializer,
                 createTableOptions,
+                128,
                 "traceId",
                 mockInitContext);
     }
diff --git a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java
index 963f9365..638a9391 100644
--- a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java
+++ b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java
@@ -20,6 +20,7 @@
 
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.TableDefinition;
 import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
 import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
@@ -218,6 +219,15 @@ List retrievePartitionsStatus(
          */
         TableSchema getTableSchema(String project, String dataset, String table);
 
+        /**
+         * Get BigQuery dataset.
+         *
+         * @param project The GCP project.
+         * @param dataset The BigQuery dataset.
+         * @return The BigQuery {@link Dataset}.
+         */
+        Dataset getDataset(String project, String dataset);
+
         /**
          * Create BigQuery dataset.
         *
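The new `getDataset` service method is what lets the sink discover the destination region at construction time. A hedged usage sketch against the `com.google.cloud.bigquery` API; the `queryDataClient` variable is assumed to hold an implementation of `BigQueryServices.QueryDataClient`, and the project and dataset names are illustrative:

```java
// Illustrative caller: read the dataset location, falling back to the "us"
// multi-region when the dataset does not exist yet.
Dataset dataset = queryDataClient.getDataset("my-project", "my_dataset");
String region =
        (dataset != null && dataset.exists()) ? dataset.getLocation() : "us";
```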
diff --git a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
index 31b5af84..cb884180 100644
--- a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
+++ b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
@@ -35,6 +35,8 @@
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.bigquery.DatasetInfo;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.StandardSQLTypeName;
@@ -459,6 +461,12 @@ public TableSchema getTableSchema(String project, String dataset, String table)
             return BigQueryTableInfo.getSchema(bigQuery, project, dataset, table);
         }
 
+        @Override
+        public Dataset getDataset(String project, String dataset) {
+            DatasetId datasetId = DatasetId.of(project, dataset);
+            return bigQuery.getDataset(datasetId);
+        }
+
         @Override
         public void createDataset(String project, String dataset, String region) {
             DatasetInfo.Builder datasetInfo = DatasetInfo.newBuilder(project, dataset);