diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
index 7959cbee..fb891403 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryBaseSink.java
@@ -20,6 +20,7 @@
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
+import com.google.cloud.flink.bigquery.services.BigQueryServicesImpl;
 import com.google.cloud.flink.bigquery.sink.client.BigQueryClientWithErrorHandling;
 import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
 import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
@@ -54,6 +55,7 @@ abstract class BigQueryBaseSink implements Sink {
     final Long partitionExpirationMillis;
     final List clusteredFields;
     final String region;
+    final String traceId;
 
     BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
         validateSinkConfig(sinkConfig);
@@ -76,6 +78,7 @@ abstract class BigQueryBaseSink implements Sink {
         partitionExpirationMillis = sinkConfig.getPartitionExpirationMillis();
         clusteredFields = sinkConfig.getClusteredFields();
         region = sinkConfig.getRegion();
+        traceId = BigQueryServicesImpl.generateTraceId();
     }
 
     private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java
index a4afa24f..e364f9e4 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryDefaultSink.java
@@ -43,6 +43,7 @@ public SinkWriter createWriter(InitContext context) {
                 schemaProvider,
                 serializer,
                 createTableOptions(),
+                traceId,
                 context);
     }
 }
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java
index 09e4c912..3f42d795 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/BigQueryExactlyOnceSink.java
@@ -56,6 +56,7 @@ public class BigQueryExactlyOnceSink extends BigQueryBaseSink
                 schemaProvider,
                 serializer,
                 createTableOptions(),
+                traceId,
                 context);
     }
 
@@ -81,6 +82,7 @@ public class BigQueryExactlyOnceSink extends BigQueryBaseSink
                 schemaProvider,
                 serializer,
                 createTableOptions(),
+                traceId,
                 context);
     }
 
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java
index dd059ef8..6005e319 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BaseWriter.java
@@ -98,6 +98,7 @@ abstract class BaseWriter implements SinkWriter {
     private boolean firstWritePostConstructor = true;
     final CreateTableOptions createTableOptions;
+    final String traceId;
     final Queue appendResponseFuturesQueue;
     // Initialization of writeClient has been deferred to first append call. BigQuery's best
     // practices suggest that client connections should be opened when needed.
@@ -122,13 +123,15 @@ abstract class BaseWriter implements SinkWriter {
             BigQueryConnectOptions connectOptions,
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
-            CreateTableOptions createTableOptions) {
+            CreateTableOptions createTableOptions,
+            String traceId) {
         this.subtaskId = subtaskId;
         this.tablePath = tablePath;
         this.connectOptions = connectOptions;
         this.schemaProvider = schemaProvider;
         this.serializer = serializer;
         this.createTableOptions = createTableOptions;
+        this.traceId = traceId;
         appendRequestSizeBytes = 0L;
         appendResponseFuturesQueue = new LinkedList<>();
         protoRowsBuilder = ProtoRows.newBuilder();
@@ -250,7 +253,8 @@ void createStreamWriter(boolean enableConnectionPool) {
                     streamName,
                     subtaskId);
             streamWriter =
-                    writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
+                    writeClient.createStreamWriter(
+                            streamName, protoSchema, enableConnectionPool, traceId);
         } catch (IOException e) {
             logger.error(
                     String.format(
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java
index 7790c1ae..65a8ce22 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriter.java
@@ -106,6 +106,7 @@ public BigQueryBufferedWriter(
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            String traceId,
             InitContext context) {
         this(
                 "",
@@ -118,6 +119,7 @@ public BigQueryBufferedWriter(
                 schemaProvider,
                 serializer,
                 createTableOptions,
+                traceId,
                 context);
     }
 
@@ -132,6 +134,7 @@ public BigQueryBufferedWriter(
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            String traceId,
             InitContext context) {
         super(
                 context.getSubtaskId(),
@@ -139,7 +142,8 @@ public BigQueryBufferedWriter(
                 connectOptions,
                 schemaProvider,
                 serializer,
-                createTableOptions);
+                createTableOptions,
+                traceId);
         this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
         this.streamName = this.streamNameInState;
         this.streamOffsetInState = streamOffset;
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java
index 0b6ba6a6..11961d80 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriter.java
@@ -63,6 +63,7 @@ public BigQueryDefaultWriter(
             BigQuerySchemaProvider schemaProvider,
             BigQueryProtoSerializer serializer,
             CreateTableOptions createTableOptions,
+            String traceId,
             InitContext context) {
         super(
                 context.getSubtaskId(),
@@ -70,7 +71,8 @@ public BigQueryDefaultWriter(
                 connectOptions,
                 schemaProvider,
                 serializer,
-                createTableOptions);
+                createTableOptions,
+                traceId);
         streamName = String.format("%s/streams/_default", tablePath);
         totalRecordsSeen = 0L;
         totalRecordsWritten = 0L;
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
index 5df1c5da..efaa9981 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/fakes/StorageClientFaker.java
@@ -440,7 +440,10 @@ public FakeBigQueryStorageWriteClient(
 
         @Override
         public StreamWriter createStreamWriter(
-                String streamName, ProtoSchema protoSchema, boolean enableConnectionPool) {
+                String streamName,
+                ProtoSchema protoSchema,
+                boolean enableConnectionPool,
+                String traceId) {
             return mockedWriter;
         }
 
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java
index 22006b0b..6b0fd6cc 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryBufferedWriterTest.java
@@ -1149,6 +1149,7 @@ private BigQueryBufferedWriter createBufferedWriter(
                 TestBigQuerySchemas.getSimpleRecordSchema(),
                 mockSerializer,
                 null,
+                "traceId",
                 context);
     }
 
@@ -1188,6 +1189,7 @@ private BigQueryBufferedWriter createBufferedWriter(
                 schemaProvider,
                 mockSerializer,
                 createTableOptions,
+                "traceId",
                 context);
     }
 
@@ -1222,6 +1224,7 @@ private BigQueryBufferedWriter createBufferedWriter(
                 TestBigQuerySchemas.getSimpleRecordSchema(),
                 mockSerializer,
                 null,
+                "traceId",
                 context);
     }
 
diff --git a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java
index 3f3ba853..54a25150 100644
--- a/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java
+++ b/flink-1.17-connector-bigquery/flink-connector-bigquery/src/test/java/com/google/cloud/flink/bigquery/sink/writer/BigQueryDefaultWriterTest.java
@@ -398,6 +398,7 @@ private BigQueryDefaultWriter createDefaultWriter(
                 schemaProvider,
                 mockSerializer,
                 createTableOptions,
+                "traceId",
                 mockInitContext);
     }
 
diff --git a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java
index a50e0aa7..963f9365 100644
--- a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java
+++ b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServices.java
@@ -123,11 +123,15 @@ interface StorageWriteClient extends AutoCloseable {
          * @param streamName the write stream to be used by this writer.
          * @param protoSchema the schema of the serialized protocol buffer data rows.
          * @param enableConnectionPool enable BigQuery client multiplexing for this writer.
+         * @param traceId job identifier attached as the trace id on write requests.
          * @return A StreamWriter for a BigQuery storage write stream.
          * @throws IOException
          */
        StreamWriter createStreamWriter(
-                String streamName, ProtoSchema protoSchema, boolean enableConnectionPool)
+                String streamName,
+                ProtoSchema protoSchema,
+                boolean enableConnectionPool,
+                String traceId)
                throws IOException;
 
        /**
diff --git a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
index 3c43e1ba..31b5af84 100644
--- a/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
+++ b/flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java
@@ -75,6 +75,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -84,14 +85,15 @@ public class BigQueryServicesImpl implements BigQueryServices {
 
     public static final int ALREADY_EXISTS_ERROR_CODE = 409;
 
+    public static final String TRACE_ID_FORMAT = "Flink:%s_%s";
+
     private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
 
     private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
             FixedHeaderProvider.create(
                     "User-Agent", "flink-bigquery-connector/" + FlinkVersion.current().toString());
 
-    public static final String TRACE_ID =
-            String.format("Flink:%s", FlinkVersion.current().toString());
+    private static final String FLINK_VERSION = FlinkVersion.current().toString();
 
     @Override
     public StorageReadClient createStorageReadClient(CredentialsOptions credentialsOptions)
@@ -253,7 +255,10 @@ private StorageWriteClientImpl(CredentialsOptions options) throws IOException {
 
         @Override
         public StreamWriter createStreamWriter(
-                String streamName, ProtoSchema protoSchema, boolean enableConnectionPool)
+                String streamName,
+                ProtoSchema protoSchema,
+                boolean enableConnectionPool,
+                String traceId)
                 throws IOException {
             /**
              * Enable client lib automatic retries on request level errors.
@@ -278,7 +283,7 @@ public StreamWriter createStreamWriter(
                             .build();
             return StreamWriter.newBuilder(streamName, client)
                     .setEnableConnectionPool(enableConnectionPool)
-                    .setTraceId(TRACE_ID)
+                    .setTraceId(traceId)
                     .setRetrySettings(retrySettings)
                     .setWriterSchema(protoSchema)
                     .build();
@@ -558,4 +563,8 @@ static List processErrorMessages(List errors) {
                     .collect(Collectors.toList());
         }
     }
+
+    public static final String generateTraceId() {
+        return String.format(TRACE_ID_FORMAT, FLINK_VERSION, UUID.randomUUID().toString());
+    }
 }
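
For reference, a minimal self-contained sketch of the trace-id scheme this change introduces, based only on what the diff shows (the "Flink:%s_%s" format and a random UUID suffix). The TraceIdSketch class name and the hard-coded version string are illustrative stand-ins; in the connector the version comes from FlinkVersion.current() and the value is threaded through BaseWriter into StreamWriter.Builder#setTraceId.

import java.util.UUID;

// Illustrative stand-in; not part of the connector code above.
public class TraceIdSketch {

    // Mirrors BigQueryServicesImpl.TRACE_ID_FORMAT from the diff.
    private static final String TRACE_ID_FORMAT = "Flink:%s_%s";

    // Stand-in for FlinkVersion.current().toString().
    private static final String FLINK_VERSION = "1.17";

    // Equivalent of BigQueryServicesImpl.generateTraceId(): Flink version plus a random UUID.
    // The sink generates this once in its constructor, so every writer it creates shares one id.
    static String generateTraceId() {
        return String.format(TRACE_ID_FORMAT, FLINK_VERSION, UUID.randomUUID());
    }

    public static void main(String[] args) {
        // Prints something like: Flink:1.17_3f1c9c2e-8b7a-4f55-9a3e-0c2d7e7d9b41
        System.out.println(generateTraceId());
    }
}

The writers pass this value to StreamWriter.newBuilder(...).setTraceId(traceId), replacing the previous static TRACE_ID constant, so individual jobs can be told apart in BigQuery-side diagnostics rather than only by connector version.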