Skip to content

Commit ff53c14

Browse files
google-genai-bot authored and copybara-github committed
feat: Implement basic version of BigQuery Agent Analytics Plugin
This change introduces a new plugin for the Agent Development Kit (ADK) that logs agent execution events to BigQuery. It includes:

- `BigQueryAgentAnalyticsPlugin`: A plugin that captures various agent lifecycle events (user messages, tool calls, model invocations) and sends them to BigQuery.
- `BigQueryLoggerConfig`: Configuration options for the plugin, including project/dataset/table IDs, batching, and retry settings.
- `BigQuerySchema`: Defines the BigQuery and Arrow schemas used for the event table.
- `BatchProcessor`: Handles batching of events and writing them to BigQuery using the Storage Write API with Arrow format.
- `JsonFormatter`: Utility for safely formatting JSON content for BigQuery.

PiperOrigin-RevId: 877497234
1 parent 005121b commit ff53c14

File tree

9 files changed

+1668
-0
lines changed

9 files changed

+1668
-0
lines changed

core/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,25 @@
189189
<artifactId>opentelemetry-sdk-testing</artifactId>
190190
<scope>test</scope>
191191
</dependency>
192+
<dependency>
193+
<groupId>com.google.cloud</groupId>
194+
<artifactId>google-cloud-bigquery</artifactId>
195+
</dependency>
196+
<dependency>
197+
<groupId>com.google.cloud</groupId>
198+
<artifactId>google-cloud-bigquerystorage</artifactId>
199+
</dependency>
200+
<dependency>
201+
<groupId>org.apache.arrow</groupId>
202+
<artifactId>arrow-vector</artifactId>
203+
<version>18.1.0</version>
204+
</dependency>
205+
<dependency>
206+
<groupId>org.apache.arrow</groupId>
207+
<artifactId>arrow-memory-netty</artifactId>
208+
<version>18.1.0</version>
209+
<scope>runtime</scope>
210+
</dependency>
192211
</dependencies>
193212
<build>
194213
<resources>
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.plugins.agentanalytics;
18+
19+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
20+
import static java.nio.charset.StandardCharsets.UTF_8;
21+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
22+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
23+
import static java.util.concurrent.TimeUnit.SECONDS;
24+
25+
import com.fasterxml.jackson.databind.JsonNode;
26+
import com.fasterxml.jackson.databind.node.ArrayNode;
27+
import com.fasterxml.jackson.databind.node.ObjectNode;
28+
import com.google.api.core.ApiFuture;
29+
import com.google.api.core.ApiFutureCallback;
30+
import com.google.api.core.ApiFutures;
31+
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
32+
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
33+
import com.google.cloud.bigquery.storage.v1.StreamWriter;
34+
import com.google.common.annotations.VisibleForTesting;
35+
import java.time.Duration;
36+
import java.time.Instant;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.concurrent.BlockingQueue;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.ScheduledExecutorService;
43+
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.logging.Level;
45+
import java.util.logging.Logger;
46+
import org.apache.arrow.memory.BufferAllocator;
47+
import org.apache.arrow.memory.RootAllocator;
48+
import org.apache.arrow.vector.BigIntVector;
49+
import org.apache.arrow.vector.BitVector;
50+
import org.apache.arrow.vector.FieldVector;
51+
import org.apache.arrow.vector.TimeStampVector;
52+
import org.apache.arrow.vector.VarCharVector;
53+
import org.apache.arrow.vector.VectorSchemaRoot;
54+
import org.apache.arrow.vector.VectorUnloader;
55+
import org.apache.arrow.vector.complex.ListVector;
56+
import org.apache.arrow.vector.complex.StructVector;
57+
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
58+
import org.apache.arrow.vector.types.pojo.Field;
59+
import org.apache.arrow.vector.types.pojo.Schema;
60+
61+
/** Handles asynchronous batching and writing of events to BigQuery. */
62+
class BatchProcessor implements AutoCloseable {
63+
private static final Logger logger = Logger.getLogger(BatchProcessor.class.getName());
64+
65+
private final StreamWriter writer;
66+
private final int batchSize;
67+
private final Duration flushInterval;
68+
private final BlockingQueue<Map<String, Object>> queue;
69+
private final ScheduledExecutorService executor;
70+
@VisibleForTesting final BufferAllocator allocator;
71+
final AtomicBoolean flushLock = new AtomicBoolean(false);
72+
private final Schema arrowSchema;
73+
74+
public BatchProcessor(
75+
StreamWriter writer,
76+
int batchSize,
77+
Duration flushInterval,
78+
int queueMaxSize,
79+
ScheduledExecutorService executor) {
80+
this.writer = writer;
81+
this.batchSize = batchSize;
82+
this.flushInterval = flushInterval;
83+
this.queue = new LinkedBlockingQueue<>(queueMaxSize);
84+
this.executor = executor;
85+
this.allocator = new RootAllocator(Long.MAX_VALUE);
86+
this.arrowSchema = BigQuerySchema.getArrowSchema();
87+
}
88+
89+
public void start() {
90+
@SuppressWarnings("unused")
91+
var unused =
92+
executor.scheduleWithFixedDelay(
93+
() -> {
94+
try {
95+
flush();
96+
} catch (RuntimeException e) {
97+
logger.log(Level.SEVERE, "Error in background flush", e);
98+
}
99+
},
100+
flushInterval.toMillis(),
101+
flushInterval.toMillis(),
102+
MILLISECONDS);
103+
}
104+
105+
public void append(Map<String, Object> row) {
106+
if (!queue.offer(row)) {
107+
logger.warning("BigQuery event queue is full, dropping event.");
108+
return;
109+
}
110+
if (queue.size() >= batchSize && !flushLock.get()) {
111+
executor.execute(this::flush);
112+
}
113+
}
114+
115+
public void flush() {
116+
// Acquire the flushLock. If another flush is already in progress, return immediately.
117+
if (!flushLock.compareAndSet(false, true)) {
118+
return;
119+
}
120+
121+
try {
122+
if (queue.isEmpty()) {
123+
return;
124+
}
125+
126+
List<Map<String, Object>> batch = new ArrayList<>();
127+
queue.drainTo(batch, batchSize);
128+
129+
if (batch.isEmpty()) {
130+
return;
131+
}
132+
try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
133+
root.allocateNew();
134+
int rowCount = batch.size();
135+
136+
for (int i = 0; i < rowCount; i++) {
137+
Map<String, Object> row = batch.get(i);
138+
for (Field field : arrowSchema.getFields()) {
139+
populateVector(root.getVector(field.getName()), i, row.get(field.getName()));
140+
}
141+
}
142+
root.setRowCount(rowCount);
143+
144+
try (ArrowRecordBatch recordBatch = new VectorUnloader(root).getRecordBatch()) {
145+
ApiFuture<AppendRowsResponse> future = writer.append(recordBatch);
146+
ApiFutures.addCallback(
147+
future,
148+
new ApiFutureCallback<AppendRowsResponse>() {
149+
@Override
150+
public void onFailure(Throwable t) {
151+
logger.log(Level.SEVERE, "Failed to write batch to BigQuery", t);
152+
if (t instanceof AppendSerializationError ase) {
153+
Map<Integer, String> rowIndexToErrorMessage = ase.getRowIndexToErrorMessage();
154+
155+
if (rowIndexToErrorMessage != null && !rowIndexToErrorMessage.isEmpty()) {
156+
logger.severe("Row-level errors found:");
157+
for (Map.Entry<Integer, String> entry : rowIndexToErrorMessage.entrySet()) {
158+
logger.severe(
159+
String.format(
160+
"Row error at index %d: %s", entry.getKey(), entry.getValue()));
161+
}
162+
} else {
163+
logger.severe(
164+
"AppendSerializationError occurred, but no row-specific errors were"
165+
+ " provided.");
166+
}
167+
}
168+
}
169+
170+
@Override
171+
public void onSuccess(AppendRowsResponse result) {
172+
if (result.hasError()) {
173+
logger.severe("BigQuery append error: " + result.getError().getMessage());
174+
for (var error : result.getRowErrorsList()) {
175+
logger.severe(
176+
String.format(
177+
"Row error at index %d: %s", error.getIndex(), error.getMessage()));
178+
}
179+
} else {
180+
logger.fine("Successfully wrote " + batch.size() + " rows to BigQuery.");
181+
}
182+
}
183+
},
184+
directExecutor());
185+
}
186+
} catch (RuntimeException e) {
187+
logger.log(Level.SEVERE, "Failed to append rows to StreamWriter", e);
188+
}
189+
} finally {
190+
flushLock.set(false);
191+
if (queue.size() >= batchSize && !flushLock.get()) {
192+
executor.execute(this::flush);
193+
}
194+
}
195+
}
196+
197+
private void populateVector(FieldVector vector, int index, Object value) {
198+
if (value == null || (value instanceof JsonNode jsonNode && jsonNode.isNull())) {
199+
vector.setNull(index);
200+
return;
201+
}
202+
203+
if (vector instanceof VarCharVector varCharVector) {
204+
String strValue = (value instanceof JsonNode jsonNode) ? jsonNode.asText() : value.toString();
205+
varCharVector.setSafe(index, strValue.getBytes(UTF_8));
206+
} else if (vector instanceof BigIntVector bigIntVector) {
207+
long longValue;
208+
if (value instanceof JsonNode jsonNode) {
209+
longValue = jsonNode.asLong();
210+
} else if (value instanceof Number number) {
211+
longValue = number.longValue();
212+
} else {
213+
longValue = Long.parseLong(value.toString());
214+
}
215+
bigIntVector.setSafe(index, longValue);
216+
} else if (vector instanceof BitVector bitVector) {
217+
boolean boolValue =
218+
(value instanceof JsonNode jsonNode) ? jsonNode.asBoolean() : (Boolean) value;
219+
bitVector.setSafe(index, boolValue ? 1 : 0);
220+
} else if (vector instanceof TimeStampVector timeStampVector) {
221+
if (value instanceof Instant instant) {
222+
long micros =
223+
SECONDS.toMicros(instant.getEpochSecond()) + NANOSECONDS.toMicros(instant.getNano());
224+
timeStampVector.setSafe(index, micros);
225+
} else if (value instanceof JsonNode jsonNode) {
226+
timeStampVector.setSafe(index, jsonNode.asLong());
227+
} else if (value instanceof Long longValue) {
228+
timeStampVector.setSafe(index, longValue);
229+
}
230+
} else if (vector instanceof ListVector listVector) {
231+
int start = listVector.startNewValue(index);
232+
if (value instanceof ArrayNode arrayNode) {
233+
for (int i = 0; i < arrayNode.size(); i++) {
234+
populateVector(listVector.getDataVector(), start + i, arrayNode.get(i));
235+
}
236+
listVector.endValue(index, arrayNode.size());
237+
} else if (value instanceof List) {
238+
List<?> list = (List<?>) value;
239+
for (int i = 0; i < list.size(); i++) {
240+
populateVector(listVector.getDataVector(), start + i, list.get(i));
241+
}
242+
listVector.endValue(index, list.size());
243+
}
244+
} else if (vector instanceof StructVector structVector) {
245+
structVector.setIndexDefined(index);
246+
if (value instanceof ObjectNode objectNode) {
247+
for (FieldVector child : structVector.getChildrenFromFields()) {
248+
populateVector(child, index, objectNode.get(child.getName()));
249+
}
250+
} else if (value instanceof Map) {
251+
Map<?, ?> map = (Map<?, ?>) value;
252+
for (FieldVector child : structVector.getChildrenFromFields()) {
253+
populateVector(child, index, map.get(child.getName()));
254+
}
255+
}
256+
}
257+
}
258+
259+
@Override
260+
public void close() {
261+
flush();
262+
if (writer != null) {
263+
writer.close();
264+
}
265+
if (allocator != null) {
266+
allocator.close();
267+
}
268+
}
269+
}

0 commit comments

Comments
 (0)