Skip to content

Commit

Permalink
Add UUID to traceId in sink
Browse files Browse the repository at this point in the history
  • Loading branch information
jayehwhyehentee committed Jan 13, 2025
1 parent 299e842 commit dfa5b66
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServicesImpl;
import com.google.cloud.flink.bigquery.sink.client.BigQueryClientWithErrorHandling;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
import com.google.cloud.flink.bigquery.sink.serializer.BigQuerySchemaProvider;
Expand Down Expand Up @@ -54,6 +55,7 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
final Long partitionExpirationMillis;
final List<String> clusteredFields;
final String region;
final String traceId;

BigQueryBaseSink(BigQuerySinkConfig sinkConfig) {
validateSinkConfig(sinkConfig);
Expand All @@ -76,6 +78,7 @@ abstract class BigQueryBaseSink<IN> implements Sink<IN> {
partitionExpirationMillis = sinkConfig.getPartitionExpirationMillis();
clusteredFields = sinkConfig.getClusteredFields();
region = sinkConfig.getRegion();
traceId = BigQueryServicesImpl.generateTraceId();
}

private void validateSinkConfig(BigQuerySinkConfig sinkConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public SinkWriter createWriter(InitContext context) {
schemaProvider,
serializer,
createTableOptions(),
traceId,
context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
schemaProvider,
serializer,
createTableOptions(),
traceId,
context);
}

Expand All @@ -81,6 +82,7 @@ public class BigQueryExactlyOnceSink<IN> extends BigQueryBaseSink<IN>
schemaProvider,
serializer,
createTableOptions(),
traceId,
context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
private boolean firstWritePostConstructor = true;

final CreateTableOptions createTableOptions;
final String traceId;
final Queue<AppendInfo> appendResponseFuturesQueue;
// Initialization of writeClient has been deferred to first append call. BigQuery's best
// practices suggest that client connections should be opened when needed.
Expand All @@ -122,13 +123,15 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
BigQueryConnectOptions connectOptions,
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions) {
CreateTableOptions createTableOptions,
String traceId) {
this.subtaskId = subtaskId;
this.tablePath = tablePath;
this.connectOptions = connectOptions;
this.schemaProvider = schemaProvider;
this.serializer = serializer;
this.createTableOptions = createTableOptions;
this.traceId = traceId;
appendRequestSizeBytes = 0L;
appendResponseFuturesQueue = new LinkedList<>();
protoRowsBuilder = ProtoRows.newBuilder();
Expand Down Expand Up @@ -250,7 +253,8 @@ void createStreamWriter(boolean enableConnectionPool) {
streamName,
subtaskId);
streamWriter =
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
writeClient.createStreamWriter(
streamName, protoSchema, enableConnectionPool, traceId);
} catch (IOException e) {
logger.error(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
String traceId,
InitContext context) {
this(
"",
Expand All @@ -118,6 +119,7 @@ public BigQueryBufferedWriter(
schemaProvider,
serializer,
createTableOptions,
traceId,
context);
}

Expand All @@ -132,14 +134,16 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
String traceId,
InitContext context) {
super(
context.getSubtaskId(),
tablePath,
connectOptions,
schemaProvider,
serializer,
createTableOptions);
createTableOptions,
traceId);
this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
this.streamName = this.streamNameInState;
this.streamOffsetInState = streamOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@ public BigQueryDefaultWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer,
CreateTableOptions createTableOptions,
String traceId,
InitContext context) {
super(
context.getSubtaskId(),
tablePath,
connectOptions,
schemaProvider,
serializer,
createTableOptions);
createTableOptions,
traceId);
streamName = String.format("%s/streams/_default", tablePath);
totalRecordsSeen = 0L;
totalRecordsWritten = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,10 @@ public FakeBigQueryStorageWriteClient(

@Override
public StreamWriter createStreamWriter(
String streamName, ProtoSchema protoSchema, boolean enableConnectionPool) {
String streamName,
ProtoSchema protoSchema,
boolean enableConnectionPool,
String traceId) {
return mockedWriter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@ private BigQueryBufferedWriter createBufferedWriter(
TestBigQuerySchemas.getSimpleRecordSchema(),
mockSerializer,
null,
"traceId",
context);
}

Expand Down Expand Up @@ -1188,6 +1189,7 @@ private BigQueryBufferedWriter createBufferedWriter(
schemaProvider,
mockSerializer,
createTableOptions,
"traceId",
context);
}

Expand Down Expand Up @@ -1222,6 +1224,7 @@ private BigQueryBufferedWriter createBufferedWriter(
TestBigQuerySchemas.getSimpleRecordSchema(),
mockSerializer,
null,
"traceId",
context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ private BigQueryDefaultWriter createDefaultWriter(
schemaProvider,
mockSerializer,
createTableOptions,
"traceId",
mockInitContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,15 @@ interface StorageWriteClient extends AutoCloseable {
* @param streamName the write stream to be used by this writer.
* @param protoSchema the schema of the serialized protocol buffer data rows.
* @param enableConnectionPool enable BigQuery client multiplexing for this writer.
* @param traceId job identifier.
* @return A StreamWriter for a BigQuery storage write stream.
* @throws IOException
*/
StreamWriter createStreamWriter(
String streamName, ProtoSchema protoSchema, boolean enableConnectionPool)
String streamName,
ProtoSchema protoSchema,
boolean enableConnectionPool,
String traceId)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -84,14 +85,15 @@ public class BigQueryServicesImpl implements BigQueryServices {

public static final int ALREADY_EXISTS_ERROR_CODE = 409;

public static final String TRACE_ID_FORMAT = "Flink:%s_%s";

private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);

private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
FixedHeaderProvider.create(
"User-Agent", "flink-bigquery-connector/" + FlinkVersion.current().toString());

public static final String TRACE_ID =
String.format("Flink:%s", FlinkVersion.current().toString());
private static final String FLINK_VERSION = FlinkVersion.current().toString();

@Override
public StorageReadClient createStorageReadClient(CredentialsOptions credentialsOptions)
Expand Down Expand Up @@ -253,7 +255,10 @@ private StorageWriteClientImpl(CredentialsOptions options) throws IOException {

@Override
public StreamWriter createStreamWriter(
String streamName, ProtoSchema protoSchema, boolean enableConnectionPool)
String streamName,
ProtoSchema protoSchema,
boolean enableConnectionPool,
String traceId)
throws IOException {
/**
* Enable client lib automatic retries on request level errors.
Expand All @@ -278,7 +283,7 @@ public StreamWriter createStreamWriter(
.build();
return StreamWriter.newBuilder(streamName, client)
.setEnableConnectionPool(enableConnectionPool)
.setTraceId(TRACE_ID)
.setTraceId(traceId)
.setRetrySettings(retrySettings)
.setWriterSchema(protoSchema)
.build();
Expand Down Expand Up @@ -558,4 +563,8 @@ static List<String> processErrorMessages(List<ErrorProto> errors) {
.collect(Collectors.toList());
}
}

/**
 * Creates a new trace id by filling {@code TRACE_ID_FORMAT} with the Flink version string and a
 * freshly generated random UUID.
 *
 * @return the formatted trace id; every call embeds a newly generated UUID.
 */
public static final String generateTraceId() {
    // A new random UUID per call is what distinguishes one generated id from another.
    final String uniqueSuffix = UUID.randomUUID().toString();
    return String.format(TRACE_ID_FORMAT, FLINK_VERSION, uniqueSuffix);
}
}

0 comments on commit dfa5b66

Please sign in to comment.