Allow higher parallelism in multi-regions (#203)
Allow a maximum parallelism of 512 in the US and EU multi-regions, while maintaining 128 in the
remaining regions.
jayehwhyehentee authored Jan 27, 2025
1 parent 04c811e commit bce6d8a
Showing 17 changed files with 274 additions and 54 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -245,7 +245,7 @@ BigQuerySinkTableConfig sinkTableConfig = BigQuerySinkTableConfig.newBuilder()
.project(...) // REQUIRED
.dataset(...) // REQUIRED
.streamExecutionEnvironment(env) // REQUIRED if deliveryGuarantee is EXACTLY_ONCE
.sinkParallelism(...) // OPTIONAL; Should be atmost 128
.sinkParallelism(...) // OPTIONAL;
.deliveryGuarantee(...) // OPTIONAL; Default is AT_LEAST_ONCE
.enableTableCreation(...) // OPTIONAL
.partitionField(...) // OPTIONAL
@@ -303,11 +303,16 @@ Check out BigQuery SQL's [DDL](https://cloud.google.com/bigquery/docs/reference/
### Sink Details [MUST READ]

* BigQuery sinks require that checkpoint is enabled.
* When using a BigQuery sink, checkpoint timeout should be liberal (1+ minutes). This is because sink writers are
[throttled](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/05fe7b14f2dc688bc808f553c3f863ba2380e317/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java#L26)
before they start sending data to BigQuery. Depending on the sink's parallelism, this throttling can be as high as 40 seconds.
Throttling is necessary to gracefully handle BigQuery's rate limiting on certain APIs used by the sink writers. Note that this only
affects the first checkpoint.
* The maximum parallelism of BigQuery sinks is capped at **512** for the US and EU multi-regions, and **128** for all other regions.
This respects BigQuery storage [write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while keeping
[throughput](https://cloud.google.com/bigquery/docs/write-api#connections) and
[best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices) in mind. Users should either set
[sink level parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/execution/parallel/#operator-level)
explicitly (see the usage sketch after this README diff), or ensure that the default job level parallelism does not exceed the region-specific maximum (512 or 128).
* When using a BigQuery sink, the checkpoint timeout should be liberal (several minutes). This is because sink writers are
[throttled](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/BigQueryWriterThrottler.java)
before they start sending data to BigQuery. Depending on the destination dataset's region, this throttling can last up to roughly 3 minutes for the US and EU multi-regions, and up to roughly 45 seconds for other regions.
Throttling is necessary to gracefully handle BigQuery's rate limiting on certain APIs used by the sink writers. Note that this throttling happens once per writer, before the first checkpoint it encounters after accepting data.
* Delivery guarantee can be [at-least-once](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE) or
[exactly-once](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#EXACTLY_ONCE).
* The at-least-once sink enables BigQuery client [multiplexing](https://cloud.google.com/bigquery/docs/write-api-streaming#use_multiplexing)
@@ -321,11 +326,6 @@ Check out BigQuery SQL's [DDL](https://cloud.google.com/bigquery/docs/reference/
when maintaining records in avro format. Check Flink's [blog on non-trivial serialization](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE).
Note that avro schema of an existing BigQuery table can be obtained from
[BigQuerySchemaProviderImpl](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderImpl.java).
* The maximum parallelism of BigQuery sinks has been capped at **128**. This is to respect BigQuery storage
[write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while adhering to
[best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices). Users should either set
[sink level parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/execution/parallel/#operator-level)
explicitly, or ensure that default job level parallelism is under 128.
* BigQuerySinkConfig requires the StreamExecutionEnvironment if delivery guarantee is exactly-once.
**Restart strategy must be explicitly set in the StreamExecutionEnvironment**.
This is to [validate](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/92db3690c741fb2cdb99e28c575e19affb5c8b69/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkConfig.java#L185)
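To make the parallelism guidance above concrete, here is a minimal sketch of pinning the sink operator's parallelism from a Flink job. The pipeline shape and the `buildBigQuerySink` placeholder are illustrative assumptions, not part of this commit; only `setParallelism` (standard Flink operator-level parallelism) and the regional caps come from the README text above.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkParallelismSketch {

    // Placeholder: construct the BigQuery sink however your pipeline already does,
    // e.g. via the BigQuerySinkTableConfig builder shown in the README diff above.
    static Sink<String> buildBigQuerySink() {
        throw new UnsupportedOperationException("illustrative placeholder");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // BigQuery sinks require checkpointing

        DataStream<String> records = env.fromElements("r1", "r2", "r3");

        // Operator-level parallelism: must not exceed 512 for US/EU multi-regions,
        // or 128 for all other regions.
        records.sinkTo(buildBigQuerySink())
                .setParallelism(128);

        env.execute("bigquery-sink-parallelism-sketch");
    }
}
```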
Original file line number Diff line number Diff line change
@@ -17,7 +17,9 @@
package com.google.cloud.flink.bigquery.sink;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.util.StringUtils;

import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServicesImpl;
@@ -29,21 +31,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/** Base class for developing a BigQuery sink. */
abstract class BigQueryBaseSink<IN> implements Sink<IN> {

protected final Logger logger = LoggerFactory.getLogger(getClass());

// BigQuery write streams can offer over 10 MBps throughput, and per project throughput quotas
// are in the order of single digit GBps. With each sink writer maintaining a single and unique
// write connection to BigQuery, maximum parallelism for sink is intentionally restricted to
// 128 for initial releases of this connector. This is also the default max parallelism of
// Flink applications.
// Based on performance observations and user feedback, this number can be increased in the
// future.
public static final int MAX_SINK_PARALLELISM = 128;
// Generally, a single write connection supports at least 1 MBps of throughput.
// Check https://cloud.google.com/bigquery/docs/write-api#connections
// Project-level BQ storage write API throughput in multi-regions (us and eu) is capped at
// 3 GBps. Other regions allow 300 MBps.
// Check https://cloud.google.com/bigquery/quotas#write-api-limits
// Keeping these factors in mind, the BigQuery sink's parallelism is capped according to the
// destination table's region.
public static final int DEFAULT_MAX_SINK_PARALLELISM = 128;
public static final int MULTI_REGION_MAX_SINK_PARALLELISM = 512;
public static final List<String> BQ_MULTI_REGIONS = Arrays.asList("us", "eu");
private static final String DEFAULT_REGION = "us";

final BigQueryConnectOptions connectOptions;
final BigQuerySchemaProvider schemaProvider;
@@ -55,6 +61,7 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
final Long partitionExpirationMillis;
final List<String> clusteredFields;
final String region;
final int maxParallelism;
final String traceId;

BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
@@ -77,7 +84,8 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
partitionType = sinkConfig.getPartitionType();
partitionExpirationMillis = sinkConfig.getPartitionExpirationMillis();
clusteredFields = sinkConfig.getClusteredFields();
region = sinkConfig.getRegion();
region = getRegion(sinkConfig.getRegion());
maxParallelism = getMaxParallelism();
traceId = BigQueryServicesImpl.generateTraceId();
}

@@ -101,10 +109,10 @@ private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {

/** Ensures Sink's parallelism does not exceed the allowed maximum when scaling Flink job. */
void checkParallelism(int numberOfParallelSubtasks) {
if (numberOfParallelSubtasks > MAX_SINK_PARALLELISM) {
if (numberOfParallelSubtasks > maxParallelism) {
logger.error(
"Maximum allowed parallelism for Sink is {}, but attempting to create Writer number {}",
MAX_SINK_PARALLELISM,
maxParallelism,
numberOfParallelSubtasks);
throw new IllegalStateException("Attempting to create more Sink Writers than allowed");
}
@@ -119,4 +127,35 @@ CreateTableOptions createTableOptions() {
clusteredFields,
region);
}

private String getRegion(String userProvidedRegion) {
Dataset dataset = BigQueryClientWithErrorHandling.getDataset(connectOptions);
if (dataset == null || !dataset.exists()) {
if (StringUtils.isNullOrWhitespaceOnly(userProvidedRegion)) {
logger.info(
"No destination BQ region provided by user. Using default us multi-region.");
return DEFAULT_REGION;
}
return userProvidedRegion;
}
String datasetLocation = dataset.getLocation();
if (!datasetLocation.equalsIgnoreCase(userProvidedRegion)) {
logger.warn(
"Provided sink dataset region {} will be overridden by dataset's existing location {}",
userProvidedRegion,
datasetLocation);
}
return dataset.getLocation();
}

// Max sink parallelism is deduced using destination dataset's region.
// Ensure instance variable 'region' is assigned before invoking this method.
private int getMaxParallelism() {
for (String multiRegion : BQ_MULTI_REGIONS) {
if (multiRegion.equalsIgnoreCase(region)) {
return MULTI_REGION_MAX_SINK_PARALLELISM;
}
}
return DEFAULT_MAX_SINK_PARALLELISM;
}
}
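A quick sanity check of how the new constants resolve: per getRegion and getMaxParallelism above, only the literal multi-region strings "us" and "eu" (case-insensitive) receive the higher cap, while regional locations such as "us-central1" fall back to 128. The helper below is an illustrative stand-in, not part of the connector:

```java
import java.util.Arrays;
import java.util.List;

public class RegionCapSketch {

    // Constants mirror BigQueryBaseSink in this diff.
    static final List<String> BQ_MULTI_REGIONS = Arrays.asList("us", "eu");

    static int maxParallelismFor(String region) {
        for (String multiRegion : BQ_MULTI_REGIONS) {
            if (multiRegion.equalsIgnoreCase(region)) {
                return 512; // MULTI_REGION_MAX_SINK_PARALLELISM
            }
        }
        return 128; // DEFAULT_MAX_SINK_PARALLELISM
    }

    public static void main(String[] args) {
        System.out.println(maxParallelismFor("US"));          // 512, multi-region (case-insensitive)
        System.out.println(maxParallelismFor("eu"));          // 512, multi-region
        System.out.println(maxParallelismFor("us-central1")); // 128, regional location
        System.out.println(maxParallelismFor("asia-south1")); // 128, regional location
    }
}
```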
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ public SinkWriter createWriter(InitContext context) {
schemaProvider,
serializer,
createTableOptions(),
maxParallelism,
traceId,
context);
}
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
schemaProvider,
serializer,
createTableOptions(),
maxParallelism,
traceId,
context);
}
@@ -82,6 +83,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
schemaProvider,
serializer,
createTableOptions(),
maxParallelism,
traceId,
context);
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
@@ -135,4 +136,19 @@ public static void createTable(
e);
}
}

public static Dataset getDataset(BigQueryConnectOptions connectOptions) {
try {
BigQueryServices.QueryDataClient queryDataClient =
BigQueryServicesFactory.instance(connectOptions).queryClient();
return queryDataClient.getDataset(
connectOptions.getProjectId(), connectOptions.getDataset());
} catch (BigQueryException e) {
throw new BigQueryConnectorException(
String.format(
"Unable to check existence of BigQuery dataset %s.%s",
connectOptions.getProjectId(), connectOptions.getDataset()),
e);
}
}
}
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@

package com.google.cloud.flink.bigquery.sink.throttle;

import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,18 +39,18 @@
*/
public class BigQueryWriterThrottler implements Throttler {

// MAX_SINK_PARALLELISM is set as 128.
public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3;
private static final Logger LOG = LoggerFactory.getLogger(BigQueryWriterThrottler.class);
private final int writerId;
private final int maxBuckets;

public BigQueryWriterThrottler(int writerId) {
public BigQueryWriterThrottler(int writerId, int maxParallelism) {
this.writerId = writerId;
this.maxBuckets = maxParallelism / 3;
}

@Override
public void throttle() {
int waitSeconds = writerId % MAX_BUCKETS;
int waitSeconds = writerId % maxBuckets;
LOG.debug("Throttling writer {} for {} second", writerId, waitSeconds);
try {
// Sleep does nothing if input is 0 or less.
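To connect this to the checkpoint-timeout guidance in the README above, here is a small sketch of the per-writer delay implied by `writerId % (maxParallelism / 3)`. The worst-case values line up with the documented estimates of roughly 3 minutes (US/EU multi-regions) and roughly 45 seconds (other regions). The class below is illustrative only:

```java
public class ThrottleDelaySketch {

    // Mirrors the wait computation in BigQueryWriterThrottler above.
    static int waitSeconds(int writerId, int maxParallelism) {
        int maxBuckets = maxParallelism / 3;
        return writerId % maxBuckets;
    }

    public static void main(String[] args) {
        // Worst case for the 512 cap (US/EU multi-regions): 512 / 3 = 170 buckets,
        // so the longest wait is 169 seconds, i.e. just under 3 minutes.
        System.out.println(waitSeconds(169, 512)); // prints 169

        // Worst case for the 128 cap (other regions): 128 / 3 = 42 buckets,
        // so the longest wait is 41 seconds, i.e. roughly 45 seconds.
        System.out.println(waitSeconds(41, 128));  // prints 41
    }
}
```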
Original file line number Diff line number Diff line change
@@ -124,6 +124,7 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String traceId) {
this.subtaskId = subtaskId;
this.tablePath = tablePath;
@@ -135,7 +136,7 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
appendRequestSizeBytes = 0L;
appendResponseFuturesQueue = new LinkedList<>();
protoRowsBuilder = ProtoRows.newBuilder();
throttler = new BigQueryWriterThrottler(subtaskId);
throttler = new BigQueryWriterThrottler(subtaskId, maxParallelism);
}

/** Append pending records and validate all remaining append responses. */
Original file line number Diff line number Diff line change
@@ -106,6 +106,7 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String traceId,
InitContext context) {
this(
@@ -119,6 +120,7 @@ public BigQueryBufferedWriter(
schemaProvider,
serializer,
createTableOptions,
maxParallelism,
traceId,
context);
}
@@ -134,6 +136,7 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String traceId,
InitContext context) {
super(
Expand All @@ -143,6 +146,7 @@ public BigQueryBufferedWriter(
schemaProvider,
serializer,
createTableOptions,
maxParallelism,
traceId);
this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
this.streamName = this.streamNameInState;
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ public BigQueryDefaultWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String taceId,
InitContext context) {
super(
Expand All @@ -72,6 +73,7 @@ public BigQueryDefaultWriter(
schemaProvider,
serializer,
createTableOptions,
maxParallelism,
taceId);
streamName = String.format("%s/streams/_default", tablePath);
totalRecordsSeen = 0L;
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -146,6 +147,8 @@ public static class FakeQueryDataClient implements QueryDataClient {
private final RuntimeException createDatasetError;
private final RuntimeException createTableError;

private boolean datasetExists;
private String datasetRegion;
private int tableExistsInvocations;
private int createDatasetInvocations;
private int createTableInvocations;
@@ -162,6 +165,17 @@ public FakeQueryDataClient(
tableExistsInvocations = 0;
createDatasetInvocations = 0;
createTableInvocations = 0;
datasetExists = true;
datasetRegion = "us-central1";
}

public FakeQueryDataClient(boolean datasetExists, String datasetRegion) {
this.datasetExists = datasetExists;
this.datasetRegion = datasetRegion;
tableExists = false;
tableExistsError = null;
createDatasetError = null;
createTableError = null;
}

static FakeQueryDataClient defaultInstance =
@@ -199,6 +213,14 @@ public TableSchema getTableSchema(String project, String dataset, String table)
return SIMPLE_BQ_TABLE_SCHEMA;
}

@Override
public Dataset getDataset(String project, String dataset) {
Dataset mockedDataset = Mockito.mock(Dataset.class);
Mockito.when(mockedDataset.exists()).thenReturn(datasetExists);
Mockito.when(mockedDataset.getLocation()).thenReturn(datasetRegion);
return mockedDataset;
}

@Override
public void createDataset(String project, String dataset, String region) {
createDatasetInvocations++;
@@ -882,6 +904,13 @@ public static BigQueryConnectOptions createConnectOptionsForQuery(
return createConnectOptions(null, null, queryClient);
}

public static BigQueryConnectOptions createConnectOptionsForQuery(
boolean datasetExists, String datasetRegion) {
FakeBigQueryServices.FakeQueryDataClient queryClient =
new FakeBigQueryServices.FakeQueryDataClient(datasetExists, datasetRegion);
return createConnectOptions(null, null, queryClient);
}

public static BigQueryConnectOptions createConnectOptions(
FakeBigQueryServices.FakeBigQueryStorageReadClient readClient,
FakeBigQueryServices.FakeBigQueryStorageWriteClient writeClient,