diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java index 510ffbed..8c10aaa5 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java @@ -38,6 +38,6 @@ class BigQueryDefaultSink extends BigQueryBaseSink { public SinkWriter createWriter(InitContext context) { checkParallelism(context.getNumberOfParallelSubtasks()); return new BigQueryDefaultWriter( - context.getSubtaskId(), connectOptions, schemaProvider, serializer, tablePath); + context.getSubtaskId(), tablePath, connectOptions, schemaProvider, serializer); } } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java index 27918a8a..e62ffc94 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java @@ -31,7 +31,7 @@ *

Depending on the checkpointing mode, this writer offers the following consistency guarantees: *

  • {@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency. *
  • {@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency. - *
  • Checkpointing disabled: no consistency guarantee. + *
  • Checkpointing disabled (NOT RECOMMENDED!): no consistency guarantee. * * @param Type of records written to BigQuery */ diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottler.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottler.java new file mode 100644 index 00000000..41b7a79b --- /dev/null +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottler.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.flink.bigquery.sink.throttle; + +import com.google.cloud.flink.bigquery.sink.BigQueryExactlyOnceSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Throttler implementation for BigQuery write stream creation. + * + *

    Each {@link BigQueryBufferedWriter} will invoke BigQuery's CreateWriteStream API before its + * initial write to a BigQuery table. This API, however, should be invoked at a low QPS (~3) for best + * performance in steady state, since write stream creation is an expensive operation for the BigQuery + * storage backend. Hence, this throttler is responsible for distributing writers into buckets that + * correspond to a specific "wait" duration before calling the CreateWriteStream API. + * + *
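For illustration (not part of this patch): the bucket assignment reduces to a modulo over the writer's subtask id, with MAX_BUCKETS = MAX_SINK_PARALLELISM / 3 = 128 / 3 = 42.

    // Sketch of the wait-bucket computation performed by throttle().
    int maxBuckets = 128 / 3; // 42
    for (int writerId : new int[] {0, 1, 41, 42, 84, 127}) {
        System.out.println("writer " + writerId + " waits " + (writerId % maxBuckets) + "s");
    }
    // writer 0 waits 0s, writer 1 waits 1s, writer 41 waits 41s,
    // writer 42 waits 0s, writer 84 waits 0s, writer 127 waits 1s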

    Note that actual separation between CreateWriteStream invocations across all writers will not + * ensure exact QPS of 3, because neither all writers are initialized at the same instant, nor do + * they all identify the need to create a write stream after some uniform fixed duration. Given + * these uncontrollable factors, this throttler aims to achieve 3 QPS on a best effort basis. + */ +public class WriteStreamCreationThrottler implements Throttler { + + // MAX_SINK_PARALLELISM is set as 128. + public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3; + private static final Logger LOG = LoggerFactory.getLogger(WriteStreamCreationThrottler.class); + private final int writerId; + + public WriteStreamCreationThrottler(int writerId) { + this.writerId = writerId; + } + + public void throttle() { + int waitSeconds = writerId % MAX_BUCKETS; + LOG.debug("Throttling writer {} for {} second", writerId, waitSeconds); + try { + // Sleep does nothing if input is 0 or less. + TimeUnit.SECONDS.sleep(waitSeconds); + } catch (InterruptedException e) { + LOG.warn("Throttle attempt interrupted in subtask {}", writerId); + Thread.currentThread().interrupt(); + } + } +} diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java index 3d15e717..5c1af36a 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java @@ -25,6 +25,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoSchema; import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; import com.google.cloud.flink.bigquery.services.BigQueryServices; import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory; @@ -70,23 +71,34 @@ abstract class BaseWriter implements SinkWriter { // Number of bytes to be sent in the next append request. private long appendRequestSizeBytes; - private BigQueryServices.StorageWriteClient writeClient; protected final int subtaskId; + private final String tablePath; private final BigQueryConnectOptions connectOptions; private final ProtoSchema protoSchema; private final BigQueryProtoSerializer serializer; - private final Queue appendResponseFuturesQueue; private final ProtoRows.Builder protoRowsBuilder; + final Queue appendResponseFuturesQueue; + // Initialization of writeClient has been deferred to first append call. BigQuery's best + // practices suggest that client connections should be opened when needed. + BigQueryServices.StorageWriteClient writeClient; StreamWriter streamWriter; String streamName; + long totalRecordsSeen; + // In exactly-once mode, "totalRecordsWritten" actually represents records appended to a + // write stream by this writer. Only at a checkpoint, when sink's commit is invoked, will + // the records in a stream get committed to the table. Hence, records written to BigQuery + // table is equal to this "totalRecordsWritten" only upon checkpoint completion. 
+ long totalRecordsWritten; BaseWriter( int subtaskId, + String tablePath, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, BigQueryProtoSerializer serializer) { this.subtaskId = subtaskId; + this.tablePath = tablePath; this.connectOptions = connectOptions; this.protoSchema = getProtoSchema(schemaProvider); this.serializer = serializer; @@ -102,7 +114,7 @@ public void flush(boolean endOfInput) { if (appendRequestSizeBytes > 0) { append(); } - logger.debug("Validating all pending append responses in subtask {}", subtaskId); + logger.info("Validating all pending append responses in subtask {}", subtaskId); validateAppendResponses(true); } @@ -125,10 +137,10 @@ public void close() { } /** Invoke BigQuery storage API for appending data to a table. */ - abstract ApiFuture sendAppendRequest(ProtoRows protoRows); + abstract void sendAppendRequest(ProtoRows protoRows); /** Checks append response for errors. */ - abstract void validateAppendResponse(ApiFuture appendResponseFuture); + abstract void validateAppendResponse(AppendInfo appendInfo); /** Add serialized record to append request. */ void addToAppendRequest(ByteString protoRow) { @@ -138,21 +150,45 @@ void addToAppendRequest(ByteString protoRow) { /** Send append request to BigQuery storage and prepare for next append request. */ void append() { - ApiFuture responseFuture = sendAppendRequest(protoRowsBuilder.build()); - appendResponseFuturesQueue.add(responseFuture); + sendAppendRequest(protoRowsBuilder.build()); protoRowsBuilder.clear(); appendRequestSizeBytes = 0L; } /** Creates a StreamWriter for appending to BigQuery table. */ - StreamWriter createStreamWriter(boolean enableConnectionPool) { - logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId); + void createStreamWriter(boolean enableConnectionPool) { try { - writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); - return writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool); + if (writeClient == null) { + writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); + } + logger.info( + "Creating BigQuery StreamWriter for write stream {} in subtask {}", + streamName, + subtaskId); + streamWriter = + writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool); + } catch (IOException e) { + logger.error( + String.format( + "Unable to create StreamWriter for stream %s in subtask %d", + streamName, subtaskId), + e); + throw new BigQueryConnectorException("Unable to connect to BigQuery", e); + } + } + + /** Creates a write stream for appending to BigQuery table. */ + void createWriteStream(WriteStream.Type streamType) { + try { + if (writeClient == null) { + writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite(); + } + logger.info("Creating BigQuery write stream in subtask {}", subtaskId); + streamName = writeClient.createWriteStream(tablePath, streamType).getName(); } catch (IOException e) { - logger.error("Unable to create StreamWriter for stream {}", streamName); - throw new BigQueryConnectorException("Unable to create StreamWriter", e); + logger.error( + String.format("Unable to create write stream in subtask %d", subtaskId), e); + throw new BigQueryConnectorException("Unable to connect to BigQuery", e); } } @@ -196,17 +232,55 @@ private int getProtoRowBytes(ByteString protoRow) { * order, we proceed to check the next response only after the previous one has arrived. 
*/ void validateAppendResponses(boolean waitForResponse) { - ApiFuture appendResponseFuture; - while ((appendResponseFuture = appendResponseFuturesQueue.peek()) != null) { - if (waitForResponse || appendResponseFuture.isDone()) { + while (!appendResponseFuturesQueue.isEmpty()) { + AppendInfo appendInfo = appendResponseFuturesQueue.peek(); + if (waitForResponse || appendInfo.getFuture().isDone()) { appendResponseFuturesQueue.poll(); - validateAppendResponse(appendResponseFuture); + validateAppendResponse(appendInfo); } else { break; } } } + void logAndThrowFatalException(Throwable error) { + logger.error(String.format("AppendRows request failed in subtask %d", subtaskId), error); + throw new BigQueryConnectorException("Error while writing to BigQuery", error); + } + + void logAndThrowFatalException(String errorMessage) { + logger.error( + String.format( + "AppendRows request failed in subtask %d\n%s", subtaskId, errorMessage)); + throw new BigQueryConnectorException( + String.format("Error while writing to BigQuery\n%s", errorMessage)); + } + + static class AppendInfo { + private final ApiFuture future; + private final long expectedOffset; + private final long recordsAppended; + + AppendInfo( + ApiFuture future, long expectedOffset, long recordsAppended) { + this.future = future; + this.expectedOffset = expectedOffset; + this.recordsAppended = recordsAppended; + } + + public ApiFuture getFuture() { + return future; + } + + public long getExpectedOffset() { + return expectedOffset; + } + + public long getRecordsAppended() { + return recordsAppended; + } + } + /** * Following "getters" expose some internal fields required for testing. * diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java index 85852f14..edb82e6a 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java @@ -16,22 +16,41 @@ package com.google.cloud.flink.bigquery.sink.writer; +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; +import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamNotFound; import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; import com.google.cloud.flink.bigquery.sink.TwoPhaseCommittingStatefulSink; import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable; +import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException; +import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException; import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer; import 
com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider; +import com.google.cloud.flink.bigquery.sink.throttle.Throttler; +import com.google.cloud.flink.bigquery.sink.throttle.WriteStreamCreationThrottler; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; /** * Writer implementation for {@link BigQueryBufferedSink}. * + *

    Each {@link BigQueryBufferedWriter} will write to an exclusive write stream, implying the same + * number of active write streams as writers at any given point in time. + * *

    This writer appends records to the BigQuery table's buffered write stream. This means that * records are buffered in the stream until flushed (BigQuery write API, different from sink * writer's flush). Records will be written to the destination table after the BigQuery flush API is @@ -45,7 +64,7 @@ *

    Depending on the checkpointing mode, this writer offers the following consistency guarantees: *

  • {@link CheckpointingMode#EXACTLY_ONCE}: exactly-once write consistency. *
  • {@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency. - *
  • {Checkpointing disabled}: no write consistency. + *
  • Checkpointing disabled: no write consistency. * * @param Type of records to be written to BigQuery. */ @@ -53,37 +72,257 @@ public class BigQueryBufferedWriter extends BaseWriter implements TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter< IN, BigQueryWriterState, BigQueryCommittable> { + // Write stream creation must be throttled to ensure proper client usage. + private final Throttler writeStreamCreationThrottler; + + // Write stream name stored in writer's state. In case of a new writer, this will be an empty + // string until first checkpoint. + private String streamNameInState; + + // Offset position where next append should occur in current stream. + private long streamOffset; + + // Write stream offset stored in writer's state. In case of a new writer, this will be an 0 + // until first checkpoint. + private long streamOffsetInState; + + // Number of rows appended by this writer to current stream. + private long appendRequestRowCount; + public BigQueryBufferedWriter( int subtaskId, + String streamName, + long streamOffset, + String tablePath, + long totalRecordsSeen, + long totalRecordsWritten, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, BigQueryProtoSerializer serializer) { - super(subtaskId, connectOptions, schemaProvider, serializer); + super(subtaskId, tablePath, connectOptions, schemaProvider, serializer); + this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName; + this.streamName = this.streamNameInState; + this.streamOffsetInState = streamOffset; + this.streamOffset = streamOffset; + this.totalRecordsSeen = totalRecordsSeen; + this.totalRecordsWritten = totalRecordsWritten; + writeStreamCreationThrottler = new WriteStreamCreationThrottler(subtaskId); + appendRequestRowCount = 0L; } + /** + * Accept record for writing to BigQuery table. + * + * @param element Record to write + * @param context {@link Context} for input record + */ @Override - public void write(IN element, Context context) throws IOException, InterruptedException { - throw new UnsupportedOperationException("write not implemented"); + public void write(IN element, Context context) { + totalRecordsSeen++; + try { + ByteString protoRow = getProtoRow(element); + if (!fitsInAppendRequest(protoRow)) { + validateAppendResponses(false); + append(); + } + addToAppendRequest(protoRow); + appendRequestRowCount++; + } catch (BigQuerySerializationException e) { + logger.error(String.format("Unable to serialize record %s. Dropping it!", element), e); + } } + /** + * Asynchronously append to BigQuery table's buffered stream. + * + *

    If a writer has been initialized for the very first time, then it will not have an + * associated write stream and must create one before appending data to it. + * + *
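As a sketch (mirroring the code later in this hunk, using this patch's own methods and fields), a brand-new writer's first append roughly does:

    // Sketch: first append from a writer with no restored write stream.
    if (StringUtils.isNullOrWhitespaceOnly(streamName)) {
        writeStreamCreationThrottler.throttle();       // spread CreateWriteStream calls (~3 QPS)
        createWriteStream(WriteStream.Type.BUFFERED);  // exclusive buffered stream for this writer
        createStreamWriter(false);                     // connection pool applies only to the default stream
    }
    streamWriter.append(protoRows, streamOffset);      // offset-based append enables exactly-once validation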

    If a writer has been restored after failure recovery, then it already has an associated + * stream. Before appending data to it again, the writer needs to check if this stream is still + * usable. The stream may be corrupt due to several reasons (listed below in code), in which + * case it must be discarded and the writer will create a new write stream. If the stream was + * not corrupt and is indeed usable, then the writer will continue appending to it. + */ @Override - ApiFuture sendAppendRequest(ProtoRows protoRows) { - throw new UnsupportedOperationException("sendAppendRequest not implemented"); + void sendAppendRequest(ProtoRows protoRows) { + long rowCount = protoRows.getSerializedRowsCount(); + if (streamOffset == streamOffsetInState + && streamName.equals(streamNameInState) + && !StringUtils.isNullOrWhitespaceOnly(streamName)) { + // Writer has an associated write stream and is invoking append for the first + // time since re-initialization. + performFirstAppendOnRestoredStream(protoRows, rowCount); + return; + } + if (StringUtils.isNullOrWhitespaceOnly(streamName)) { + // Throttle stream creation to ensure proper usage of BigQuery createWriteStream API. + logger.info("Throttling creation of BigQuery write stream in subtask {}", subtaskId); + writeStreamCreationThrottler.throttle(); + createWriteStream(WriteStream.Type.BUFFERED); + createStreamWriter(false); + } + ApiFuture future = streamWriter.append(protoRows, streamOffset); + postAppendOps(future, rowCount); } + /** Throws a RuntimeException if an error is found with append response. */ @Override - void validateAppendResponse(ApiFuture appendResponseFuture) { - throw new UnsupportedOperationException("validateAppendResponse not implemented"); + void validateAppendResponse(AppendInfo appendInfo) { + ApiFuture appendResponseFuture = appendInfo.getFuture(); + long expectedOffset = appendInfo.getExpectedOffset(); + long recordsAppended = appendInfo.getRecordsAppended(); + AppendRowsResponse response; + try { + response = appendResponseFuture.get(); + if (response.hasError()) { + logAndThrowFatalException(response.getError().getMessage()); + } + long offset = response.getAppendResult().getOffset().getValue(); + if (offset != expectedOffset) { + logAndThrowFatalException( + String.format( + "Inconsistent offset in BigQuery API response. Found %d, expected %d", + offset, expectedOffset)); + } + totalRecordsWritten += recordsAppended; + } catch (ExecutionException | InterruptedException e) { + if (e.getCause().getClass() == OffsetAlreadyExists.class) { + logger.info( + "Ignoring OffsetAlreadyExists error in subtask {} as this can be due to faulty retries", + subtaskId); + return; + } + logAndThrowFatalException(e); + } } @Override public Collection prepareCommit() throws IOException, InterruptedException { - throw new UnsupportedOperationException("prepareCommit not implemented"); + logger.info("Preparing commit in subtask {}", subtaskId); + if (streamOffset == 0 + || streamNameInState.equals(streamName) && streamOffset == streamOffsetInState) { + logger.info("No new data appended in subtask {}. 
Nothing to commit.", subtaskId); + return Collections.EMPTY_LIST; + } + return Collections.singletonList( + new BigQueryCommittable(subtaskId, streamName, streamOffset)); } @Override public List snapshotState(long checkpointId) throws IOException { - throw new UnsupportedOperationException("snapshotState not implemented"); + logger.info("Snapshotting state in subtask {} for checkpoint {}", subtaskId, checkpointId); + streamNameInState = streamName; + streamOffsetInState = streamOffset; + return Collections.singletonList( + // Note that it's possible to store the associated checkpointId in writer's state. + // For now, we're not leveraging this due to absence of a use case. + new BigQueryWriterState( + streamName, streamOffset, totalRecordsSeen, totalRecordsWritten)); + } + + @Override + public void close() { + if (!streamNameInState.equals(streamName) || streamOffsetInState != streamOffset) { + // Either new stream was created which will not be stored in any state, or something was + // appended to the existing stream which will not be committed. In both scenarios, the + // stream is not usable and must be finalized, i.e. "closed". + finalizeStream(); + } + super.close(); + } + + private void performFirstAppendOnRestoredStream(ProtoRows protoRows, long rowCount) { + try { + // Connection pool (method parameter below) can be enabled only for default stream. + createStreamWriter(false); + } catch (BigQueryConnectorException e) { + // If StreamWriter could not be created for this write stream, then discard it. + discardStreamAndResendAppendRequest(e, protoRows); + return; + } + ApiFuture future = streamWriter.append(protoRows, streamOffset); + AppendRowsResponse response; + try { + // Get this future immediately to check whether append worked or not, inferring stream + // is usable or not. + response = future.get(); + postAppendOps(ApiFutures.immediateFuture(response), rowCount); + } catch (ExecutionException | InterruptedException e) { + if (e.getCause().getClass() == OffsetAlreadyExists.class + || e.getCause().getClass() == OffsetOutOfRange.class + || e.getCause().getClass() == StreamFinalizedException.class + || e.getCause().getClass() == StreamNotFound.class) { + discardStreamAndResendAppendRequest(e, protoRows); + return; + } + // Append failed for some unexpected reason. This "might be" fatal and the job owner + // should intervene. + logAndThrowFatalException(e); + } + } + + private void discardStreamAndResendAppendRequest(Exception e, ProtoRows protoRows) { + discardStream(e); + sendAppendRequest(protoRows); + } + + private void discardStream(Exception e) { + logger.info( + String.format( + "Writer %d cannot use stream %s. Discarding this stream.", + subtaskId, streamName), + e); + finalizeStream(); + // Empty streamName will prompt following sendAppendRequest invocation to create anew write + // stream. + streamName = ""; + // Also discard the offset. + streamOffset = 0L; + } + + private void finalizeStream() { + logger.debug("Finalizing write stream {} in subtask {}", streamName, subtaskId); + try { + writeClient.finalizeWriteStream(streamName); + } catch (Exception innerException) { + // Do not fret! + // This is not fatal. 
+ logger.debug( + String.format( + "Failed while finalizing write stream %s in subtask %d", + streamName, subtaskId), + innerException); + } + } + + private void postAppendOps(ApiFuture future, long rowCount) { + appendResponseFuturesQueue.add(new AppendInfo(future, streamOffset, rowCount)); + streamOffset += appendRequestRowCount; + appendRequestRowCount = 0L; + } + + /** + * Following "getters" expose some internal fields required for testing. + * + *

    In addition to keeping these methods package private, ensure that the exposed fields cannot be + * changed in a way that alters the class instance's state. + * + *

    Do NOT use these methods outside tests! + */ + @Internal + long getStreamOffset() { + return streamOffset; + } + + @Internal + long getStreamOffsetInState() { + return streamOffsetInState; + } + + @Internal + String getStreamNameInState() { + return streamNameInState; } } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java index 648a097e..5cdbc2dc 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java @@ -20,7 +20,6 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions; -import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException; import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException; import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer; import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider; @@ -43,7 +42,7 @@ *

    Depending on the checkpointing mode, this writer offers the following consistency guarantees: *

  • {@link CheckpointingMode#EXACTLY_ONCE}: at-least-once write consistency. *
  • {@link CheckpointingMode#AT_LEAST_ONCE}: at-least-once write consistency. - *
  • {Checkpointing disabled}: no write consistency. + *
  • Checkpointing disabled: no write consistency. * * @param Type of records to be written to BigQuery. */ @@ -51,17 +50,25 @@ public class BigQueryDefaultWriter extends BaseWriter { public BigQueryDefaultWriter( int subtaskId, + String tablePath, BigQueryConnectOptions connectOptions, BigQuerySchemaProvider schemaProvider, - BigQueryProtoSerializer serializer, - String tablePath) { - super(subtaskId, connectOptions, schemaProvider, serializer); + BigQueryProtoSerializer serializer) { + super(subtaskId, tablePath, connectOptions, schemaProvider, serializer); streamName = String.format("%s/streams/_default", tablePath); + totalRecordsSeen = 0L; + totalRecordsWritten = 0L; } - /** Accept record for writing to BigQuery table. */ + /** + * Accept record for writing to BigQuery table. + * + * @param element Record to write + * @param context {@link Context} for input record + */ @Override public void write(IN element, Context context) { + totalRecordsSeen++; try { ByteString protoRow = getProtoRow(element); if (!fitsInAppendRequest(protoRow)) { @@ -76,37 +83,30 @@ public void write(IN element, Context context) { /** Asynchronously append to BigQuery table's default stream. */ @Override - ApiFuture sendAppendRequest(ProtoRows protoRows) { + void sendAppendRequest(ProtoRows protoRows) { if (streamWriter == null) { - streamWriter = createStreamWriter(true); + createStreamWriter(true); } - return streamWriter.append(protoRows); + ApiFuture response = streamWriter.append(protoRows); + appendResponseFuturesQueue.add( + new AppendInfo(response, -1L, Long.valueOf(protoRows.getSerializedRowsCount()))); } /** Throws a RuntimeException if an error is found with append response. */ @Override - void validateAppendResponse(ApiFuture appendResponseFuture) { + void validateAppendResponse(AppendInfo appendInfo) { + // Offset has no relevance when appending to the default write stream. 
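For contrast, a sketch of the two append flavors used in this patch: the default writer appends without an offset (at-least-once), while the buffered writer appends at a tracked offset that is later validated against the response.

    // Default stream: offset is irrelevant.
    ApiFuture<AppendRowsResponse> defaultFuture = streamWriter.append(protoRows);
    // Buffered stream (BigQueryBufferedWriter): append at the expected offset.
    ApiFuture<AppendRowsResponse> bufferedFuture = streamWriter.append(protoRows, streamOffset);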
+ ApiFuture appendResponseFuture = appendInfo.getFuture(); + long recordsAppended = appendInfo.getRecordsAppended(); AppendRowsResponse response; try { response = appendResponseFuture.get(); + if (response.hasError()) { + logAndThrowFatalException(response.getError().getMessage()); + } + totalRecordsWritten += recordsAppended; } catch (ExecutionException | InterruptedException e) { - logger.error( - String.format( - "Exception while retrieving AppendRowsResponse in subtask %s", - subtaskId), - e); - throw new BigQueryConnectorException( - "Error getting response for BigQuery write API", e); - } - if (response.hasError()) { - logger.error( - String.format( - "Request to AppendRows failed in subtask %s with error %s", - subtaskId, response.getError().getMessage())); - throw new BigQueryConnectorException( - String.format( - "Exception while writing to BigQuery table: %s", - response.getError().getMessage())); + logAndThrowFatalException(e); } } } diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryWriterState.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryWriterState.java index 5dbbcbbd..f2e287f7 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryWriterState.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryWriterState.java @@ -22,8 +22,8 @@ public class BigQueryWriterState extends BigQueryStreamState { // Used for Flink metrics. - private long totalRecordsSeen; - private long totalRecordsWritten; + private final long totalRecordsSeen; + private final long totalRecordsWritten; public BigQueryWriterState( String streamName, long streamOffset, long totalRecordsSeen, long totalRecordsWritten) { diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java index 1e8b03e9..4f9ddac4 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java @@ -19,6 +19,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SerializableFunction; +import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatistics; @@ -58,6 +59,7 @@ import org.apache.avro.io.EncoderFactory; import org.apache.avro.util.RandomData; import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -313,11 +315,40 @@ public void close() {} public static class FakeBigQueryStorageWriteClient implements StorageWriteClient { private final StreamWriter mockedWriter; + private final WriteStream writeStream; + private final FlushRowsResponse flushResponse; + private final FinalizeWriteStreamResponse finalizeResponse; + + private int createWriteStreamInvocations; + private int finalizeWriteStreamInvocations; public FakeBigQueryStorageWriteClient(AppendRowsResponse 
appendResponse) { mockedWriter = Mockito.mock(StreamWriter.class); Mockito.when(mockedWriter.append(Mockito.any())) .thenReturn(ApiFutures.immediateFuture(appendResponse)); + writeStream = null; + flushResponse = null; + finalizeResponse = null; + } + + public FakeBigQueryStorageWriteClient( + ApiFuture[] appendResponseFutures, + WriteStream writeStream, + FlushRowsResponse flushResponse, + FinalizeWriteStreamResponse finalizeResponse) { + mockedWriter = Mockito.mock(StreamWriter.class); + // Mockito cannot unbox "any()" for primitive types, throwing the dreaded + // NullPointerException. Hence, use primitive variants for argument matching. + OngoingStubbing stubbing = + Mockito.when(mockedWriter.append(Mockito.any(), Mockito.anyLong())); + for (ApiFuture future : appendResponseFutures) { + stubbing = stubbing.thenReturn(future); + } + this.writeStream = writeStream; + this.flushResponse = flushResponse; + this.finalizeResponse = finalizeResponse; + createWriteStreamInvocations = 0; + finalizeWriteStreamInvocations = 0; } @Override @@ -328,23 +359,48 @@ public StreamWriter createStreamWriter( @Override public WriteStream createWriteStream(String tablePath, WriteStream.Type streamType) { - throw new UnsupportedOperationException("fake createWriteStream not supported"); + createWriteStreamInvocations++; + assert streamType == WriteStream.Type.BUFFERED; + if (writeStream == null) { + throw new RuntimeException("testing error scenario"); + } + return writeStream; } @Override public FlushRowsResponse flushRows(String streamName, long offset) { - throw new UnsupportedOperationException("fake flushRows not supported"); + if (flushResponse == null) { + throw new RuntimeException("testing error scenario"); + } + return flushResponse; } @Override public FinalizeWriteStreamResponse finalizeWriteStream(String streamName) { - throw new UnsupportedOperationException("fake finalizeWriteStream not supported"); + finalizeWriteStreamInvocations++; + if (finalizeResponse == null) { + throw new RuntimeException("testing error scenario"); + } + return finalizeResponse; } @Override public void close() { Mockito.when(mockedWriter.isUserClosed()).thenReturn(true); } + + public int getCreateWriteStreamInvocations() { + return createWriteStreamInvocations; + } + + public int getFinalizeWriteStreamInvocations() { + return finalizeWriteStreamInvocations; + } + + public void verifytAppendWithOffsetInvocations(int expectedInvocations) { + Mockito.verify(mockedWriter, Mockito.times(expectedInvocations)) + .append(Mockito.any(), Mockito.anyLong()); + } } } @@ -717,6 +773,31 @@ public static BigQueryConnectOptions createConnectOptionsForWrite( .build(); } + public static BigQueryConnectOptions createConnectOptionsForWrite( + ApiFuture[] appendResponseFutures, + WriteStream writeStream, + FlushRowsResponse flushResponse, + FinalizeWriteStreamResponse finalizeResponse) + throws IOException { + return BigQueryConnectOptions.builder() + .setDataset("dataset") + .setProjectId("project") + .setTable("table") + .setCredentialsOptions(null) + .setTestingBigQueryServices( + () -> { + return FakeBigQueryServices.getInstance( + null, + new StorageClientFaker.FakeBigQueryServices + .FakeBigQueryStorageWriteClient( + appendResponseFutures, + writeStream, + flushResponse, + finalizeResponse)); + }) + .build(); + } + public static final TableSchema SIMPLE_BQ_TABLE_SCHEMA_TABLE = new TableSchema() .setFields( diff --git 
a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java index 9ec68daa..cd7c2381 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSinkTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Google Inc. + * Copyright (C) 2024 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java index e72cd297..3a45ec44 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/BigQuerySinkTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Google Inc. + * Copyright (C) 2024 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/AvroToProtoSerializerTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/AvroToProtoSerializerTest.java index b19d85b5..0eae8eaf 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/AvroToProtoSerializerTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/AvroToProtoSerializerTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Google Inc. + * Copyright (C) 2024 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderTest.java index f472683c..f7fdd98f 100644 --- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Google Inc. + * Copyright (C) 2024 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottlerTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottlerTest.java new file mode 100644 index 00000000..7abebf21 --- /dev/null +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/throttle/WriteStreamCreationThrottlerTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.flink.bigquery.sink.throttle; + +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; + +import static org.junit.Assert.assertTrue; + +/** Tests for {@link WriteStreamCreationThrottler}. */ +public class WriteStreamCreationThrottlerTest { + + @Test + public void testThrottle() { + Duration duration = invokeThrottle(3); + assertTrue(duration.toMillis() >= 3000L); + } + + @Test + public void testThrottle_withInterruptedException() { + // Force interruption + Thread.currentThread().interrupt(); + Duration duration = invokeThrottle(3); + assertTrue(duration.toMillis() < 3000L); + } + + @Test + public void testThrottle_withInvalidWriterId_expectNoThrottling() { + Duration duration = invokeThrottle(-1); + long waitSeconds = duration.toMillis() / 1000; + assertTrue(waitSeconds == 0); + } + + private Duration invokeThrottle(int writerId) { + WriteStreamCreationThrottler throttler = new WriteStreamCreationThrottler(writerId); + Instant start = Instant.now(); + throttler.throttle(); + Instant end = Instant.now(); + return Duration.between(start, end); + } +} diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java new file mode 100644 index 00000000..4fc4ccb0 --- /dev/null +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java @@ -0,0 +1,831 @@ +/* + * Copyright 2024 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.flink.bigquery.sink.writer; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse.AppendResult; +import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists; +import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange; +import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException; +import com.google.cloud.bigquery.storage.v1.Exceptions.StreamNotFound; +import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse; +import com.google.cloud.bigquery.storage.v1.ProtoRows; +import com.google.cloud.bigquery.storage.v1.StreamWriter; +import com.google.cloud.bigquery.storage.v1.WriteStream; +import com.google.cloud.flink.bigquery.fakes.StorageClientFaker; +import com.google.cloud.flink.bigquery.fakes.StorageClientFaker.FakeBigQueryServices.FakeBigQueryStorageWriteClient; +import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable; +import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException; +import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException; +import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer; +import com.google.cloud.flink.bigquery.sink.serializer.FakeBigQuerySerializer; +import com.google.cloud.flink.bigquery.sink.serializer.TestBigQuerySchemas; +import com.google.protobuf.ByteString; +import com.google.protobuf.Int64Value; +import com.google.rpc.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** Tests for {@link BigQueryBufferedWriter}. 
*/ +public class BigQueryBufferedWriterTest { + + MockedStatic streamWriterStaticMock; + + @Before + public void setUp() { + streamWriterStaticMock = Mockito.mockStatic(StreamWriter.class); + streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); + } + + @After + public void tearDown() throws Exception { + streamWriterStaticMock.close(); + } + + @Test + public void testConstructor_withNewWriter() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter(null, 0L, 0L, 0L, FakeBigQuerySerializer.getEmptySerializer()); + assertNotNull(bufferedWriter); + checkStreamlessWriterAttributes(bufferedWriter); + assertEquals(0, bufferedWriter.totalRecordsSeen); + assertEquals(0, bufferedWriter.totalRecordsWritten); + assertEquals(0, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getProtoRows().getSerializedRowsList().isEmpty()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + } + + @Test + public void testConstructor_withRestoredWriter() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + "foo", 100L, 210L, 200L, FakeBigQuerySerializer.getEmptySerializer()); + assertNotNull(bufferedWriter); + assertNull(bufferedWriter.streamWriter); + assertEquals("foo", bufferedWriter.streamName); + assertEquals(100, bufferedWriter.getStreamOffset()); + assertEquals(100, bufferedWriter.getStreamOffsetInState()); + assertEquals(210, bufferedWriter.totalRecordsSeen); + assertEquals(200, bufferedWriter.totalRecordsWritten); + assertEquals(0, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getProtoRows().getSerializedRowsList().isEmpty()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + } + + @Test + public void testWrite_withoutAppend() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("hi"))); + // ByteString for "hi" will be 2 bytes in size, and serialization overhead of 2 will be + // added. 
+ bufferedWriter.write(new Object(), null); + checkStreamlessWriterAttributes(bufferedWriter); + assertEquals(1, bufferedWriter.totalRecordsSeen); + assertEquals(0, bufferedWriter.totalRecordsWritten); + assertEquals(4, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + + bufferedWriter.write(new Object(), null); + checkStreamlessWriterAttributes(bufferedWriter); + assertEquals(2, bufferedWriter.totalRecordsSeen); + assertEquals(0, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(2, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + + ProtoRows protoRows = bufferedWriter.getProtoRows(); + assertEquals(ByteString.copyFromUtf8("hi"), protoRows.getSerializedRowsList().get(0)); + assertEquals(ByteString.copyFromUtf8("hi"), protoRows.getSerializedRowsList().get(1)); + } + + @Test + public void testWrite_withAppend_withNewStream() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + // First append at offset 0. + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()), + // Second append at offset 1. This second offset wont be actually tested + // here. Being pedantic to help the reader understand. + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(1)) + .build()) + .build()), + }, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + checkStreamlessWriterAttributes(bufferedWriter); + assertEquals(1, bufferedWriter.totalRecordsSeen); + assertEquals(0, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + + // Second element will exceed append request's size, so append will be called with + // first element in request. + // Observe the first append behavior when writer does not have an existing stream. + bufferedWriter.write(new Object(), null); + assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getCreateWriteStreamInvocations()); + assertNotNull(bufferedWriter.streamWriter); + assertEquals("new_stream", bufferedWriter.streamName); + assertEquals(1, bufferedWriter.getStreamOffset()); + assertEquals(2, bufferedWriter.totalRecordsSeen); + // The totalRecordsWritten attribute is incremented after response validation. + assertEquals(0, bufferedWriter.totalRecordsWritten); + // Second element was added to new request. + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + + // Third write, second append, first response validation. + bufferedWriter.write(new Object(), null); + assertEquals(2, bufferedWriter.getStreamOffset()); + assertEquals(3, bufferedWriter.totalRecordsSeen); + // Upon successful response validation, totalRecordsWritten is incremented. 
+ assertEquals(1, bufferedWriter.totalRecordsWritten); + // One future was added by latest append, and one was removed for validation. + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .verifytAppendWithOffsetInvocations(2); + + // Ensure new write stream was not created again. + assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getCreateWriteStreamInvocations()); + } + + @Test + public void testWrite_withAppend_withUsableRestoredStream() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + "restored_stream", + 100L, + 210L, + 200L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(100)) + .build()) + .build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(101)) + .build()) + .build()) + }, + null, + null); + bufferedWriter.write(new Object(), null); + assertNull(bufferedWriter.streamWriter); + assertEquals("restored_stream", bufferedWriter.streamName); + assertEquals(100, bufferedWriter.getStreamOffset()); + assertEquals(211, bufferedWriter.totalRecordsSeen); + assertEquals(200, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + + // Second element will exceed append request's size, so append will be called with + // first element in request. + // Observe the first append behavior when writer has an existing stream. + bufferedWriter.write(new Object(), null); + assertNotNull(bufferedWriter.streamWriter); + assertEquals(101, bufferedWriter.getStreamOffset()); + assertEquals(212, bufferedWriter.totalRecordsSeen); + assertEquals(200, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + + // Third write, second append, first response validation. + bufferedWriter.write(new Object(), null); + assertEquals(102, bufferedWriter.getStreamOffset()); + assertEquals(213, bufferedWriter.totalRecordsSeen); + assertEquals(201, bufferedWriter.totalRecordsWritten); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .verifytAppendWithOffsetInvocations(2); + + // Existing stream was used, so finalize should not be called. + assertEquals( + 0, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getFinalizeWriteStreamInvocations()); + // New stream was not created. + assertEquals( + 0, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getCreateWriteStreamInvocations()); + } + + @Test + public void testFirstAppend_withUnusableRestoredStream() throws IOException { + // This is a rare test where parameterization is needed. However, we are not using standard + // JUnit parameterization techniques to avoid importing a new dependency. Motivation is to + // keep the connector artifact as small as possible. 
+ for (StorageException storageException : + Arrays.asList( + Mockito.mock(OffsetAlreadyExists.class), + Mockito.mock(OffsetOutOfRange.class), + Mockito.mock(StreamFinalizedException.class), + Mockito.mock(StreamNotFound.class))) { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + "restored_stream", + 100L, + 210L, + 200L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFailedFuture(storageException), + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(1)) + .build())) + }, + WriteStream.newBuilder().setName("new_stream").build(), + FinalizeWriteStreamResponse.getDefaultInstance()); + bufferedWriter.write(new Object(), null); + assertEquals("restored_stream", bufferedWriter.streamName); + assertEquals(100, bufferedWriter.getStreamOffset()); + assertEquals(211, bufferedWriter.totalRecordsSeen); + assertEquals(200, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + + bufferedWriter.write(new Object(), null); + // Existing stream was finalized. + assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getFinalizeWriteStreamInvocations()); + // New stream was created. + assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getCreateWriteStreamInvocations()); + assertEquals("new_stream", bufferedWriter.streamName); + assertEquals(1, bufferedWriter.getStreamOffset()); + assertEquals(212, bufferedWriter.totalRecordsSeen); + assertEquals(200, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + + bufferedWriter.write(new Object(), null); + assertEquals(2, bufferedWriter.getStreamOffset()); + assertEquals(213, bufferedWriter.totalRecordsSeen); + assertEquals(201, bufferedWriter.totalRecordsWritten); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + + // First invocation on existing stream, which failed. + // Second invocation on appending same request to new stream. + // Third invocation for appending second request on new stream. + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .verifytAppendWithOffsetInvocations(3); + + // Ensure finalize or new stream creation were not invoked again. 
+ assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getFinalizeWriteStreamInvocations()); + assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getCreateWriteStreamInvocations()); + } + } + + @Test(expected = BigQueryConnectorException.class) + public void testFirstAppend_withUnusableRestoredStream_withUnexpectedError() + throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + "restored_stream", + 100L, + 210L, + 200L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] {ApiFutures.immediateFailedFuture(new RuntimeException())}, + WriteStream.newBuilder().setName("new_stream").build(), + FinalizeWriteStreamResponse.getDefaultInstance()); + bufferedWriter.write(new Object(), null); + assertNull(bufferedWriter.streamWriter); + assertEquals("restored_stream", bufferedWriter.streamName); + assertEquals(100, bufferedWriter.getStreamOffset()); + assertEquals(211, bufferedWriter.totalRecordsSeen); + assertEquals(200, bufferedWriter.totalRecordsWritten); + assertEquals(8, bufferedWriter.getAppendRequestSizeBytes()); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + bufferedWriter.write(new Object(), null); + } + + @Test + public void testValidateAppendResponse_withOffsetAlreadyExists() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, 0L, 10L, 0L, FakeBigQuerySerializer.getEmptySerializer()); + bufferedWriter.validateAppendResponse( + new BigQueryDefaultWriter.AppendInfo( + ApiFutures.immediateFailedFuture(mock(OffsetAlreadyExists.class)), 0L, 0L)); + // OffsetAlreadyExists is ignored and validation ends successfully. 
+ } + + @Test(expected = BigQueryConnectorException.class) + public void testValidateAppendResponse_withResponseError() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, 0L, 10L, 0L, FakeBigQuerySerializer.getEmptySerializer()); + bufferedWriter.validateAppendResponse( + new BigQueryDefaultWriter.AppendInfo( + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setError(Status.newBuilder().setCode(4).build()) + .build()), + 0L, + 10L)); + } + + @Test(expected = BigQueryConnectorException.class) + public void testValidateAppendResponse_withOffsetMismatch() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, 0L, 10L, 0L, FakeBigQuerySerializer.getEmptySerializer()); + bufferedWriter.validateAppendResponse( + new BigQueryDefaultWriter.AppendInfo( + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(10)) + .build()) + .build()), + 0L, + 10L)); + } + + @Test(expected = BigQueryConnectorException.class) + public void testValidateAppendResponse_withUnexpectedError() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, 0L, 10L, 0L, FakeBigQuerySerializer.getEmptySerializer()); + bufferedWriter.validateAppendResponse( + new BigQueryDefaultWriter.AppendInfo( + ApiFutures.immediateFailedFuture(mock(OffsetOutOfRange.class)), 0L, 0L)); + } + + @Test + public void testFlush() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(1)) + .build()) + .build()), + }, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + assertEquals(1, bufferedWriter.totalRecordsSeen); + assertEquals(0, bufferedWriter.totalRecordsWritten); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + bufferedWriter.write(new Object(), null); + // AppendRows invoked, response future stored. + assertEquals(2, bufferedWriter.totalRecordsSeen); + assertEquals(0, bufferedWriter.totalRecordsWritten); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + // Flush will send append request for pending records, and validate all pending append + // responses. 
+ bufferedWriter.flush(false); + assertEquals(2, bufferedWriter.totalRecordsSeen); + assertEquals(2, bufferedWriter.totalRecordsWritten); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .verifytAppendWithOffsetInvocations(2); + } + + @Test + public void testPrepareCommit_withAppends() throws IOException, InterruptedException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(1)) + .build()) + .build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(2)) + .build()) + .build()), + }, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.flush(false); + Collection committables = bufferedWriter.prepareCommit(); + assertEquals(1, committables.size()); + BigQueryCommittable committable = (BigQueryCommittable) committables.toArray()[0]; + assertEquals(1, committable.getProducerId()); + assertEquals("new_stream", committable.getStreamName()); + assertEquals(3, committable.getStreamOffset()); + } + + @Test + public void testPrepareCommit_withoutAppends() throws IOException, InterruptedException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] {}, + null, + null); + // No writes. 
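+        // Without any writes there are no pending records, so flush is a no-op and
+        // prepareCommit should produce no committables.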
+ bufferedWriter.flush(false); + Collection committables = bufferedWriter.prepareCommit(); + assertTrue(committables.isEmpty()); + } + + @Test + public void testSnapshotState_withNewWriter() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(1)) + .build()) + .build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(2)) + .build()) + .build()) + }, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.flush(false); + assertEquals("", bufferedWriter.getStreamNameInState()); + assertEquals(0, bufferedWriter.getStreamOffsetInState()); + Collection writerStates = bufferedWriter.snapshotState(1); + BigQueryWriterState writerState = (BigQueryWriterState) writerStates.toArray()[0]; + assertEquals(1, writerStates.size()); + assertEquals("new_stream", writerState.getStreamName()); + assertEquals(3, writerState.getTotalRecordsSeen()); + assertEquals(3, writerState.getTotalRecordsWritten()); + assertEquals("new_stream", bufferedWriter.getStreamNameInState()); + assertEquals(3, bufferedWriter.getStreamOffsetInState()); + } + + @Test + public void testSnapshotState_withRestoredWriter_withUsableStream() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + "restored_stream", + 100L, + 210L, + 200L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(100)) + .build()) + .build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(101)) + .build()) + .build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(102)) + .build()) + .build()) + }, + null, + null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.flush(false); + assertEquals("restored_stream", bufferedWriter.getStreamNameInState()); + assertEquals(100, bufferedWriter.getStreamOffsetInState()); + Collection writerStates = bufferedWriter.snapshotState(1); + BigQueryWriterState writerState = (BigQueryWriterState) writerStates.toArray()[0]; + assertEquals(1, writerStates.size()); + assertEquals("restored_stream", writerState.getStreamName()); + assertEquals(213, writerState.getTotalRecordsSeen()); + assertEquals(203, writerState.getTotalRecordsWritten()); + assertEquals("restored_stream", bufferedWriter.getStreamNameInState()); + assertEquals(103, bufferedWriter.getStreamOffsetInState()); + } + + @Test + public void testSnapshotState_withRestoredWriter_withUnusableStream() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + "restored_stream", + 100L, + 210L, + 200L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new 
ApiFuture[] { + ApiFutures.immediateFailedFuture(mock(StreamFinalizedException.class)), + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(1)) + .build()) + .build()), + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendResult.newBuilder() + .setOffset(Int64Value.of(2)) + .build()) + .build()) + }, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.write(new Object(), null); + bufferedWriter.flush(false); + assertEquals("restored_stream", bufferedWriter.getStreamNameInState()); + assertEquals(100, bufferedWriter.getStreamOffsetInState()); + Collection writerStates = bufferedWriter.snapshotState(1); + BigQueryWriterState writerState = (BigQueryWriterState) writerStates.toArray()[0]; + assertEquals(1, writerStates.size()); + assertEquals("new_stream", writerState.getStreamName()); + assertEquals(213, writerState.getTotalRecordsSeen()); + assertEquals(203, writerState.getTotalRecordsWritten()); + assertEquals("new_stream", bufferedWriter.getStreamNameInState()); + assertEquals(3, bufferedWriter.getStreamOffsetInState()); + } + + @Test + public void testClose_withStreamFinalize() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] { + ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()) + }, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + assertNull(bufferedWriter.streamWriter); + bufferedWriter.write(new Object(), null); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, bufferedWriter.getAppendResponseFuturesQueue().size()); + assertFalse(bufferedWriter.streamWriter.isUserClosed()); + bufferedWriter.close(); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + assertTrue(bufferedWriter.streamWriter.isUserClosed()); + assertEquals( + 1, + ((FakeBigQueryStorageWriteClient) bufferedWriter.writeClient) + .getFinalizeWriteStreamInvocations()); + } + + @Test + public void testClose_withoutStreamFinalize() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 0L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + new ApiFuture[] {}, + WriteStream.newBuilder().setName("new_stream").build(), + null); + bufferedWriter.write(new Object(), null); + assertEquals(1, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + assertNull(bufferedWriter.streamWriter); + bufferedWriter.close(); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + assertNull(bufferedWriter.streamWriter); + assertNull(bufferedWriter.writeClient); + } + + @Test + public void testWrite_withSerializationException() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, 0L, 10L, 0L, FakeBigQuerySerializer.getErringSerializer()); + assertEquals(0, 
bufferedWriter.getProtoRows().getSerializedRowsCount()); + // If write experiences a serialization exception, then the element is ignored and no + // action is taken. + bufferedWriter.write(new Object(), null); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + } + + @Test(expected = BigQuerySerializationException.class) + public void testGetProtoRow_withMaxAppendRequestSizeViolation() + throws IOException, BigQuerySerializationException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 10L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobarbazqux"))); + // The serializer.serialize method will return ByteString with 14 bytes, exceeding the + // maximum request size, causing getProtoRow method to throw + // BigQuerySerializationException. + bufferedWriter.getProtoRow(new Object()); + } + + @Test + public void testWrite_withLargeElement() throws IOException { + BigQueryBufferedWriter bufferedWriter = + createBufferedWriter( + null, + 0L, + 10L, + 0L, + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobarbazqux"))); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + // This will add 14 bytes to append request, which exceeds the maximum request size, + // leading to the element being ignored. + bufferedWriter.write(new Object(), null); + assertEquals(0, bufferedWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(bufferedWriter.getAppendResponseFuturesQueue().isEmpty()); + } + + private BigQueryBufferedWriter createBufferedWriter( + String streamName, + long streamOffset, + long totalRecordsSeen, + long totalRecordsWritten, + BigQueryProtoSerializer mockSerializer) + throws IOException { + return new BigQueryBufferedWriter( + 1, + streamName, + streamOffset, + "/projects/project/datasets/dataset/tables/table", + totalRecordsSeen, + totalRecordsWritten, + StorageClientFaker.createConnectOptionsForWrite(null), + TestBigQuerySchemas.getSimpleRecordSchema(), + mockSerializer); + } + + private BigQueryBufferedWriter createBufferedWriter( + String streamName, + long streamOffset, + long totalRecordsSeen, + long totalRecordsWritten, + BigQueryProtoSerializer mockSerializer, + ApiFuture[] appendResponseFutures, + WriteStream writeStream, + FinalizeWriteStreamResponse finalizeResponse) + throws IOException { + return new BigQueryBufferedWriter( + 1, + streamName, + streamOffset, + "/projects/project/datasets/dataset/tables/table", + totalRecordsSeen, + totalRecordsWritten, + StorageClientFaker.createConnectOptionsForWrite( + appendResponseFutures, writeStream, null, finalizeResponse), + TestBigQuerySchemas.getSimpleRecordSchema(), + mockSerializer); + } + + private void checkStreamlessWriterAttributes(BigQueryBufferedWriter bufferedWriter) { + assertNull(bufferedWriter.streamWriter); + assertEquals("", bufferedWriter.streamName); + assertEquals("", bufferedWriter.getStreamNameInState()); + assertEquals(0, bufferedWriter.getStreamOffset()); + assertEquals(0, bufferedWriter.getStreamOffsetInState()); + } +} diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java index 3774d23a..b67cc4a4 100644 --- 
a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java +++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2023 Google Inc. + * Copyright (C) 2024 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -27,6 +27,8 @@ import com.google.cloud.flink.bigquery.sink.serializer.TestBigQuerySchemas; import com.google.protobuf.ByteString; import com.google.rpc.Status; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -42,6 +44,19 @@ /** Tests for {@link BigQueryDefaultWriter}. */ public class BigQueryDefaultWriterTest { + MockedStatic streamWriterStaticMock; + + @Before + public void setUp() { + streamWriterStaticMock = Mockito.mockStatic(StreamWriter.class); + streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); + } + + @After + public void tearDown() throws Exception { + streamWriterStaticMock.close(); + } + @Test public void testConstructor() throws IOException { BigQueryDefaultWriter defaultWriter = @@ -51,6 +66,8 @@ public void testConstructor() throws IOException { assertEquals( "/projects/project/datasets/dataset/tables/table/streams/_default", defaultWriter.streamName); + assertEquals(0, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); assertEquals(0, defaultWriter.getAppendRequestSizeBytes()); assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); assertTrue(defaultWriter.getProtoRows().getSerializedRowsList().isEmpty()); @@ -58,13 +75,15 @@ public void testConstructor() throws IOException { } @Test - public void testWriteWithoutAppend() throws IOException { + public void testWrite_withoutAppend() throws IOException { BigQueryDefaultWriter defaultWriter = createDefaultWriter( new FakeBigQuerySerializer(ByteString.copyFromUtf8("foo")), null); // ByteString for "foo" will be 3 bytes in size, and serialization overhead of 2 will be // added. defaultWriter.write(new Object(), null); + assertEquals(1, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); assertEquals(5, defaultWriter.getAppendRequestSizeBytes()); assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); assertEquals( @@ -75,93 +94,100 @@ public void testWriteWithoutAppend() throws IOException { @Test public void testAppend() throws IOException { - try (MockedStatic streamWriterStaticMock = - Mockito.mockStatic(StreamWriter.class)) { - streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); - BigQueryDefaultWriter defaultWriter = - createDefaultWriter( - new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), - AppendRowsResponse.newBuilder().build()); - // First element will be added to append request. - defaultWriter.write(new Object(), null); - assertEquals(8, defaultWriter.getAppendRequestSizeBytes()); - assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - // Invoke append and verify request reset. 
- defaultWriter.append(); - assertEquals(0, defaultWriter.getAppendRequestSizeBytes()); - assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); - } + BigQueryDefaultWriter defaultWriter = + createDefaultWriter( + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + AppendRowsResponse.newBuilder().build()); + // First element will be added to append request. + defaultWriter.write(new Object(), null); + assertEquals(1, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); + assertEquals(8, defaultWriter.getAppendRequestSizeBytes()); + assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); + // Invoke append and verify request reset. + defaultWriter.append(); + assertEquals(1, defaultWriter.totalRecordsSeen); + // The totalRecordsWritten attribute is incremented after response validation. + assertEquals(0, defaultWriter.totalRecordsWritten); + assertEquals(0, defaultWriter.getAppendRequestSizeBytes()); + assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); } @Test - public void testWriteWithAppend() throws IOException { - try (MockedStatic streamWriterStaticMock = - Mockito.mockStatic(StreamWriter.class)) { - streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); - BigQueryDefaultWriter defaultWriter = - createDefaultWriter( - new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), - AppendRowsResponse.newBuilder().build()); - defaultWriter.write(new Object(), null); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - // Second element will exceed append request's size, so append will be called with - // first element in request. - defaultWriter.write(new Object(), null); - assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); - } + public void testWrite_withAppend() throws IOException { + BigQueryDefaultWriter defaultWriter = + createDefaultWriter( + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + AppendRowsResponse.newBuilder().build()); + defaultWriter.write(new Object(), null); + assertEquals(1, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); + // Second element will exceed append request's size, so append will be called with + // first element in request. + defaultWriter.write(new Object(), null); + assertEquals(2, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); + assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); + // Third element will again exceed append request's size, so append will be called with + // second element in request. Response future from first AppendRows request will be + // validated, incrementing totalRecordsWritten. 
+ defaultWriter.write(new Object(), null); + assertEquals(3, defaultWriter.totalRecordsSeen); + assertEquals(1, defaultWriter.totalRecordsWritten); + assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); } @Test public void testFlush() throws IOException { - try (MockedStatic streamWriterStaticMock = - Mockito.mockStatic(StreamWriter.class)) { - streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); - BigQueryDefaultWriter defaultWriter = - createDefaultWriter( - new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), - AppendRowsResponse.newBuilder().build()); - defaultWriter.write(new Object(), null); - assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - defaultWriter.write(new Object(), null); - assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); - // Flush will send append request for pending records, and validate all pending append - // responses. - defaultWriter.flush(false); - assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - } + BigQueryDefaultWriter defaultWriter = + createDefaultWriter( + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + AppendRowsResponse.newBuilder().build()); + defaultWriter.write(new Object(), null); + assertEquals(1, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); + assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); + defaultWriter.write(new Object(), null); + // AppendRows invoked, response future stored. + assertEquals(2, defaultWriter.totalRecordsSeen); + assertEquals(0, defaultWriter.totalRecordsWritten); + assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); + // Flush will send append request for pending records, and validate all pending append + // responses. 
+ defaultWriter.flush(false); + assertEquals(2, defaultWriter.totalRecordsSeen); + assertEquals(2, defaultWriter.totalRecordsWritten); + assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); } @Test public void testClose() throws IOException { - try (MockedStatic streamWriterStaticMock = - Mockito.mockStatic(StreamWriter.class)) { - streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); - BigQueryDefaultWriter defaultWriter = - createDefaultWriter( - new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), - AppendRowsResponse.newBuilder().build()); - defaultWriter.write(new Object(), null); - assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - assertNull(defaultWriter.streamWriter); - defaultWriter.write(new Object(), null); - assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); - assertFalse(defaultWriter.streamWriter.isUserClosed()); - defaultWriter.close(); - assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - assertTrue(defaultWriter.streamWriter.isUserClosed()); - } + BigQueryDefaultWriter defaultWriter = + createDefaultWriter( + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobar")), + AppendRowsResponse.newBuilder().build()); + defaultWriter.write(new Object(), null); + assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); + assertNull(defaultWriter.streamWriter); + defaultWriter.write(new Object(), null); + assertEquals(1, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertEquals(1, defaultWriter.getAppendResponseFuturesQueue().size()); + assertFalse(defaultWriter.streamWriter.isUserClosed()); + defaultWriter.close(); + assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); + assertTrue(defaultWriter.streamWriter.isUserClosed()); } @Test - public void testWriteWithSerializationException() throws IOException { + public void testWrite_withSerializationException() throws IOException { BigQueryDefaultWriter defaultWriter = createDefaultWriter(FakeBigQuerySerializer.getErringSerializer(), null); assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); @@ -173,46 +199,51 @@ public void testWriteWithSerializationException() throws IOException { } @Test(expected = BigQuerySerializationException.class) - public void testMaxAppendRequestSizeViolation() + public void testGetProtoRow_withMaxAppendRequestSizeViolation() throws IOException, BigQuerySerializationException { - try (MockedStatic streamWriterStaticMock = - Mockito.mockStatic(StreamWriter.class)) { - streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); - BigQueryDefaultWriter defaultWriter = - createDefaultWriter( - new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobarbazqux")), - AppendRowsResponse.newBuilder().build()); - defaultWriter.getProtoRow(new Object()); - } + BigQueryDefaultWriter defaultWriter = + createDefaultWriter( + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobarbazqux")), null); + // The serializer.serialize method will return ByteString with 14 bytes, exceeding the + // 
maximum request size, causing getProtoRow method to throw + // BigQuerySerializationException. + defaultWriter.getProtoRow(new Object()); } @Test - public void testWriteWithLargeElement() throws IOException { - try (MockedStatic streamWriterStaticMock = - Mockito.mockStatic(StreamWriter.class)) { - streamWriterStaticMock.when(StreamWriter::getApiMaxRequestBytes).thenReturn(10L); - BigQueryDefaultWriter defaultWriter = - createDefaultWriter( - new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobarbazqux")), - AppendRowsResponse.newBuilder().build()); - assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); - // This will add 14 bytes to append request but maximum request size is 5, leading to - // the element being ignored. - defaultWriter.write(new Object(), null); - assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); - assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); - } + public void testWrite_withLargeElement() throws IOException { + BigQueryDefaultWriter defaultWriter = + createDefaultWriter( + new FakeBigQuerySerializer(ByteString.copyFromUtf8("foobarbazqux")), null); + assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); + // This will add 14 bytes to append request, which exceeds the maximum request size, + // leading to the element being ignored. + defaultWriter.write(new Object(), null); + assertEquals(0, defaultWriter.getProtoRows().getSerializedRowsCount()); + assertTrue(defaultWriter.getAppendResponseFuturesQueue().isEmpty()); + } + + @Test(expected = BigQueryConnectorException.class) + public void testValidateAppendResponse_withResponseError() throws IOException { + BigQueryDefaultWriter defaultWriter = + createDefaultWriter(FakeBigQuerySerializer.getEmptySerializer(), null); + defaultWriter.validateAppendResponse( + new BigQueryDefaultWriter.AppendInfo( + ApiFutures.immediateFuture( + AppendRowsResponse.newBuilder() + .setError(Status.newBuilder().setCode(4).build()) + .build()), + -1L, + 10L)); } @Test(expected = BigQueryConnectorException.class) - public void testResponseValidationError() throws IOException { + public void testValidateAppendResponse_withExecutionException() throws IOException { BigQueryDefaultWriter defaultWriter = createDefaultWriter(FakeBigQuerySerializer.getEmptySerializer(), null); defaultWriter.validateAppendResponse( - ApiFutures.immediateFuture( - AppendRowsResponse.newBuilder() - .setError(Status.newBuilder().setCode(4).build()) - .build())); + new BigQueryDefaultWriter.AppendInfo( + ApiFutures.immediateFailedFuture(new RuntimeException("foo")), -1L, 10L)); } private BigQueryDefaultWriter createDefaultWriter( @@ -220,9 +251,9 @@ private BigQueryDefaultWriter createDefaultWriter( throws IOException { return new BigQueryDefaultWriter( 0, + "/projects/project/datasets/dataset/tables/table", StorageClientFaker.createConnectOptionsForWrite(appendResponse), TestBigQuerySchemas.getSimpleRecordSchema(), - mockSerializer, - "/projects/project/datasets/dataset/tables/table"); + mockSerializer); } } diff --git a/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java b/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java index d1e4822a..b1f2b32c 100644 --- a/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java +++ 
b/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/services/BigQueryServicesTest.java @@ -26,7 +26,7 @@ import static com.google.common.truth.Truth.assertThat; -/** */ +/** Tests for {@link BigQueryServices}. */ public class BigQueryServicesTest { @Test public void testFactoryWithTestServices() throws IOException {