Skip to content

Commit

Permalink
Track Read and Write API usage (#158)
Browse files Browse the repository at this point in the history
Populate User Agent Header and Trace ID.

/gcbrun
  • Loading branch information
prashastia authored Sep 30, 2024
1 parent 6a40c09 commit 6aceee8
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@
public class BigQueryServicesImpl implements BigQueryServices {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);

private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
FixedHeaderProvider.create(
"User-Agent", "flink-bigquery-connector/" + FlinkVersion.current().toString());

public static final String TRACE_ID =
String.format("Flink:%s", FlinkVersion.current().toString());

@Override
public StorageReadClient createStorageReadClient(CredentialsOptions credentialsOptions)
throws IOException {
Expand Down Expand Up @@ -122,20 +129,16 @@ public void cancel() {

/** Implementation of a BigQuery read client wrapper. */
public static class StorageReadClientImpl implements StorageReadClient {
private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
FixedHeaderProvider.create(
"user-agent", "Apache_Flink_Java/" + FlinkVersion.current().toString());

private final BigQueryReadClient client;

private StorageReadClientImpl(CredentialsOptions options) throws IOException {
BigQueryReadSettings.Builder settingsBuilder =
BigQueryReadSettings.newBuilder()
.setCredentialsProvider(
FixedCredentialsProvider.create(options.getCredentials()))
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.setTransportChannelProvider(
BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build());

UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
Expand Down Expand Up @@ -185,20 +188,16 @@ public void close() {

/** Implementation of a BigQuery write client wrapper. */
public static class StorageWriteClientImpl implements StorageWriteClient {
private static final HeaderProvider USER_AGENT_HEADER_PROVIDER =
FixedHeaderProvider.create(
"user-agent", "Apache_Flink_Java/" + FlinkVersion.current().toString());

private final BigQueryWriteClient client;

private StorageWriteClientImpl(CredentialsOptions options) throws IOException {
BigQueryWriteSettings.Builder settingsBuilder =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(
FixedCredentialsProvider.create(options.getCredentials()))
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.setTransportChannelProvider(
BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build());

UnaryCallSettings.Builder<CreateWriteStreamRequest, WriteStream>
Expand Down Expand Up @@ -261,6 +260,7 @@ public StreamWriter createStreamWriter(
.build();
return StreamWriter.newBuilder(streamName, client)
.setEnableConnectionPool(enableConnectionPool)
.setTraceId(TRACE_ID)
.setRetrySettings(retrySettings)
.setWriterSchema(protoSchema)
.build();
Expand Down Expand Up @@ -304,6 +304,7 @@ public QueryDataClientImpl(CredentialsOptions options) {
bigQuery =
BigQueryOptions.newBuilder()
.setCredentials(options.getCredentials())
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build()
.getService();
bigquery = BigQueryUtils.newBigqueryBuilder(options).build();
Expand Down

0 comments on commit 6aceee8

Please sign in to comment.