Allow higher parallelism in multi-regions
jayehwhyehentee committed Jan 27, 2025
1 parent b932699 commit ce6e5b0
Showing 17 changed files with 267 additions and 48 deletions.
11 changes: 6 additions & 5 deletions README.md
@@ -245,7 +245,7 @@ BigQuerySinkTableConfig sinkTableConfig = BigQuerySinkTableConfig.newBuilder()
.project(...) // REQUIRED
.dataset(...) // REQUIRED
.streamExecutionEnvironment(env) // REQUIRED if deliveryGuarantee is EXACTLY_ONCE
.sinkParallelism(...) // OPTIONAL; Should be atmost 128
.sinkParallelism(...) // OPTIONAL;
.deliveryGuarantee(...) // OPTIONAL; Default is AT_LEAST_ONCE
.enableTableCreation(...) // OPTIONAL
.partitionField(...) // OPTIONAL
@@ -321,11 +321,12 @@ Check out BigQuery SQL's [DDL](https://cloud.google.com/bigquery/docs/reference/
when maintaining records in avro format. Check Flink's [blog on non-trivial serialization](https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/connector/base/DeliveryGuarantee.html#AT_LEAST_ONCE).
Note that avro schema of an existing BigQuery table can be obtained from
[BigQuerySchemaProviderImpl](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderImpl.java).
* The maximum parallelism of BigQuery sinks has been capped at **128**. This is to respect BigQuery storage
[write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while adhering to
[best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices). Users should either set
* The maximum parallelism of BigQuery sinks has been capped at **512** for the US and EU multi-regions, and **128** for all other regions.
This is to respect BigQuery storage [write quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) while keeping
[throughput](https://cloud.google.com/bigquery/docs/write-api#connections) and
[best usage practices](https://cloud.google.com/bigquery/docs/write-api-best-practices) in mind. Users should either set
[sink level parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/execution/parallel/#operator-level)
explicitly, or ensure that default job level parallelism is under 128.
explicitly, or ensure that default job level parallelism is under the region-specific maximum (512 or 128).
* BigQuerySinkConfig requires the StreamExecutionEnvironment if delivery guarantee is exactly-once.
**Restart strategy must be explicitly set in the StreamExecutionEnvironment**.
This is to [validate](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/92db3690c741fb2cdb99e28c575e19affb5c8b69/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkConfig.java#L185)
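For illustration (not part of this commit), a minimal sketch of pinning an explicit sink parallelism with the builder shown in the README snippet above; the project, dataset, and parallelism values are placeholders, and the terminal `build()` call is assumed:

```java
// Sketch only: all identifiers and values below are placeholders, not from this commit.
BigQuerySinkTableConfig sinkTableConfig =
        BigQuerySinkTableConfig.newBuilder()
                .project("my-project")        // placeholder project id
                .dataset("my_eu_dataset")     // placeholder; an EU multi-region dataset allows up to 512 writers
                .sinkParallelism(256)         // explicit sink parallelism, within the 512 multi-region cap
                .deliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();                     // assumed terminal call on the builder
```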
@@ -17,7 +17,9 @@
package com.google.cloud.flink.bigquery.sink;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.util.StringUtils;

import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServicesImpl;
@@ -29,21 +31,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/** Base class for developing a BigQuery sink. */
abstract class BigQueryBaseSink<IN> implements Sink<IN> {

protected final Logger logger = LoggerFactory.getLogger(getClass());

// BigQuery write streams can offer over 10 MBps throughput, and per project throughput quotas
// are in the order of single digit GBps. With each sink writer maintaining a single and unique
// write connection to BigQuery, maximum parallelism for sink is intentionally restricted to
// 128 for initial releases of this connector. This is also the default max parallelism of
// Flink applications.
// Based on performance observations and user feedback, this number can be increased in the
// future.
public static final int MAX_SINK_PARALLELISM = 128;
// Generally, a single write connection supports at least 1 MBps of throughput.
// Check https://cloud.google.com/bigquery/docs/write-api#connections
// Project-level BQ storage write API throughput in multi-regions (us and eu) is capped at
// 3 GBps. Other regions allow 300 MBps.
// Check https://cloud.google.com/bigquery/quotas#write-api-limits
// Keeping these factors in mind, the BigQuery sink's parallelism is capped according to the
// destination table's region.
public static final int DEFAULT_MAX_SINK_PARALLELISM = 128;
public static final int MULTI_REGION_MAX_SINK_PARALLELISM = 512;
public static final List<String> BQ_MULTI_REGIONS = Arrays.asList("us", "eu");
private static final String DEFAULT_REGION = "us";

final BigQueryConnectOptions connectOptions;
final BigQuerySchemaProvider schemaProvider;
@@ -55,6 +61,7 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
final Long partitionExpirationMillis;
final List<String> clusteredFields;
final String region;
final int maxParallelism;
final String traceId;

BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
@@ -77,7 +84,8 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
partitionType = sinkConfig.getPartitionType();
partitionExpirationMillis = sinkConfig.getPartitionExpirationMillis();
clusteredFields = sinkConfig.getClusteredFields();
region = sinkConfig.getRegion();
region = getRegion(sinkConfig.getRegion());
maxParallelism = getMaxParallelism();
traceId = BigQueryServicesImpl.generateTraceId();
}

@@ -101,10 +109,10 @@ private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {

/** Ensures Sink's parallelism does not exceed the allowed maximum when scaling Flink job. */
void checkParallelism(int numberOfParallelSubtasks) {
if (numberOfParallelSubtasks > MAX_SINK_PARALLELISM) {
if (numberOfParallelSubtasks > maxParallelism) {
logger.error(
"Maximum allowed parallelism for Sink is {}, but attempting to create Writer number {}",
MAX_SINK_PARALLELISM,
maxParallelism,
numberOfParallelSubtasks);
throw new IllegalStateException("Attempting to create more Sink Writers than allowed");
}
@@ -119,4 +127,33 @@ CreateTableOptions createTableOptions() {
clusteredFields,
region);
}

private String getRegion(String userProvidedRegion) {
Dataset dataset = BigQueryClientWithErrorHandling.getDataset(connectOptions);
if (dataset == null || !dataset.exists()) {
if (StringUtils.isNullOrWhitespaceOnly(userProvidedRegion)) {
logger.info(
"No destination BQ region provided by user. Using default us multi-region.");
return DEFAULT_REGION;
}
return userProvidedRegion;
}
String datasetLocation = dataset.getLocation();
if (!datasetLocation.equalsIgnoreCase(userProvidedRegion)) {
logger.warn(
"Provided sink dataset region {} will be overridden by dataset's existing location {}",
userProvidedRegion,
datasetLocation);
}
return dataset.getLocation();
}

// Max sink parallelism is deduced using destination dataset's region.
// Ensure instance variable 'region' is assigned before invoking this method.
private int getMaxParallelism() {
if (BQ_MULTI_REGIONS.contains(region)) {
return MULTI_REGION_MAX_SINK_PARALLELISM;
}
return DEFAULT_MAX_SINK_PARALLELISM;
}
}
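As a back-of-the-envelope check of the two caps (a standalone sketch; the quota figures are taken from the BigQuery documentation cited in the comment above and are assumptions, not guarantees):

```java
/** Standalone sketch: per-writer quota headroom implied by the caps above. */
public class ParallelismCapSketch {
    public static void main(String[] args) {
        double multiRegionQuotaMBps = 3_000.0; // ~3 GBps project-level quota in us/eu multi-regions
        double regionalQuotaMBps = 300.0;      // ~300 MBps project-level quota elsewhere
        // Each sink writer holds one write connection, which sustains at least ~1 MBps.
        System.out.printf("multi-region: ~%.1f MBps of quota per writer at 512 writers%n",
                multiRegionQuotaMBps / 512); // ~5.9 MBps
        System.out.printf("single region: ~%.1f MBps of quota per writer at 128 writers%n",
                regionalQuotaMBps / 128);    // ~2.3 MBps
    }
}
```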
@@ -43,6 +43,7 @@ public SinkWriter createWriter(InitContext context) {
schemaProvider,
serializer,
createTableOptions(),
maxParallelism,
traceId,
context);
}
@@ -56,6 +56,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
schemaProvider,
serializer,
createTableOptions(),
maxParallelism,
traceId,
context);
}
@@ -82,6 +83,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
schemaProvider,
serializer,
createTableOptions(),
maxParallelism,
traceId,
context);
}
@@ -18,6 +18,7 @@

import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
@@ -135,4 +136,19 @@ public static void createTable(
e);
}
}

public static Dataset getDataset(BigQueryConnectOptions connectOptions) {
try {
BigQueryServices.QueryDataClient queryDataClient =
BigQueryServicesFactory.instance(connectOptions).queryClient();
return queryDataClient.getDataset(
connectOptions.getProjectId(), connectOptions.getDataset());
} catch (BigQueryException e) {
throw new BigQueryConnectorException(
String.format(
"Unable to check existence of BigQuery dataset %s.%s",
connectOptions.getProjectId(), connectOptions.getDataset()),
e);
}
}
}
@@ -16,7 +16,6 @@

package com.google.cloud.flink.bigquery.sink.throttle;

import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,18 +39,18 @@
*/
public class BigQueryWriterThrottler implements Throttler {

// MAX_SINK_PARALLELISM is set as 128.
public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3;
private static final Logger LOG = LoggerFactory.getLogger(BigQueryWriterThrottler.class);
private final int writerId;
private final int maxBuckets;

public BigQueryWriterThrottler(int writerId) {
public BigQueryWriterThrottler(int writerId, int maxParallelism) {
this.writerId = writerId;
this.maxBuckets = maxParallelism / 3;
}

@Override
public void throttle() {
int waitSeconds = writerId % MAX_BUCKETS;
int waitSeconds = writerId % maxBuckets;
LOG.debug("Throttling writer {} for {} second", writerId, waitSeconds);
try {
// Sleep does nothing if input is 0 or less.
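To illustrate the change above (a sketch, not part of this commit): the throttler now derives its bucket count from the region-specific cap, so writers are staggered across maxParallelism / 3 one-second slots.

```java
// Sketch: how writer ids map to throttle delays under the two regional caps.
public class ThrottleBucketSketch {
    public static void main(String[] args) {
        for (int maxParallelism : new int[] {128, 512}) {
            int maxBuckets = maxParallelism / 3; // 42 buckets at 128, 170 buckets at 512
            // Writer i sleeps (i % maxBuckets) seconds before opening its write connection,
            // spreading connection creation over roughly maxBuckets seconds.
            System.out.printf("maxParallelism=%d -> buckets=%d, longest wait=%ds%n",
                    maxParallelism, maxBuckets, maxBuckets - 1);
        }
    }
}
```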
@@ -124,6 +124,7 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String traceId) {
this.subtaskId = subtaskId;
this.tablePath = tablePath;
@@ -135,7 +136,7 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
appendRequestSizeBytes = 0L;
appendResponseFuturesQueue = new LinkedList<>();
protoRowsBuilder = ProtoRows.newBuilder();
throttler = new BigQueryWriterThrottler(subtaskId);
throttler = new BigQueryWriterThrottler(subtaskId, maxParallelism);
}

/** Append pending records and validate all remaining append responses. */
@@ -106,6 +106,7 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String traceId,
InitContext context) {
this(
@@ -119,6 +120,7 @@ public BigQueryBufferedWriter(
schemaProvider,
serializer,
createTableOptions,
maxParallelism,
traceId,
context);
}
@@ -134,6 +136,7 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String traceId,
InitContext context) {
super(
@@ -143,6 +146,7 @@ public BigQueryBufferedWriter(
schemaProvider,
serializer,
createTableOptions,
maxParallelism,
traceId);
this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
this.streamName = this.streamNameInState;
@@ -63,6 +63,7 @@ public BigQueryDefaultWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
int maxParallelism,
String taceId,
InitContext context) {
super(
@@ -72,6 +73,7 @@ public BigQueryDefaultWriter(
schemaProvider,
serializer,
createTableOptions,
maxParallelism,
taceId);
streamName = String.format("%s/streams/_default", tablePath);
totalRecordsSeen = 0L;
@@ -28,6 +28,7 @@
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -146,6 +147,8 @@ public static class FakeQueryDataClient implements QueryDataClient {
private final RuntimeException createDatasetError;
private final RuntimeException createTableError;

private boolean datasetExists;
private String datasetRegion;
private int tableExistsInvocations;
private int createDatasetInvocations;
private int createTableInvocations;
@@ -162,6 +165,17 @@ public FakeQueryDataClient(
tableExistsInvocations = 0;
createDatasetInvocations = 0;
createTableInvocations = 0;
datasetExists = true;
datasetRegion = "us-central1";
}

public FakeQueryDataClient(boolean datasetExists, String datasetRegion) {
this.datasetExists = datasetExists;
this.datasetRegion = datasetRegion;
tableExists = false;
tableExistsError = null;
createDatasetError = null;
createTableError = null;
}

static FakeQueryDataClient defaultInstance =
@@ -199,6 +213,14 @@ public TableSchema getTableSchema(String project, String dataset, String table)
return SIMPLE_BQ_TABLE_SCHEMA;
}

@Override
public Dataset getDataset(String project, String dataset) {
Dataset mockedDataset = Mockito.mock(Dataset.class);
Mockito.when(mockedDataset.exists()).thenReturn(datasetExists);
Mockito.when(mockedDataset.getLocation()).thenReturn(datasetRegion);
return mockedDataset;
}

@Override
public void createDataset(String project, String dataset, String region) {
createDatasetInvocations++;
@@ -882,6 +904,13 @@ public static BigQueryConnectOptions createConnectOptionsForQuery(
return createConnectOptions(null, null, queryClient);
}

public static BigQueryConnectOptions createConnectOptionsForQuery(
boolean datasetExists, String datasetRegion) {
FakeBigQueryServices.FakeQueryDataClient queryClient =
new FakeBigQueryServices.FakeQueryDataClient(datasetExists, datasetRegion);
return createConnectOptions(null, null, queryClient);
}

public static BigQueryConnectOptions createConnectOptions(
FakeBigQueryServices.FakeBigQueryStorageReadClient readClient,
FakeBigQueryServices.FakeBigQueryStorageWriteClient writeClient,
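A brief sketch of how the new fake might be exercised (hedged; the surrounding test scaffolding is assumed, not part of this diff):

```java
// Sketch: the two-argument fake constructor added above backs a mocked Dataset
// whose exists() and getLocation() simply echo the constructor arguments.
FakeBigQueryServices.FakeQueryDataClient euClient =
        new FakeBigQueryServices.FakeQueryDataClient(true, "eu");
Dataset dataset = euClient.getDataset("any-project", "any_dataset"); // arguments are ignored by the fake
assert dataset.exists();                                  // true, as configured
assert "eu".equalsIgnoreCase(dataset.getLocation());      // resolves to the 512 cap in BigQueryBaseSink
```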