Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix snapshot metrics #182

Merged
merged 24 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
300194b
Populate User Agent Header and Trace ID to track Read and Write API u…
prashastia Sep 23, 2024
c11c8ee
Populate User Agent Header and Trace ID to track Read and Write API u…
prashastia Sep 23, 2024
4e75e1e
Populate User Agent Header and Trace ID to track Read and Write API u…
prashastia Sep 23, 2024
f30c973
Remove the commented out `.setTraceId()` for
prashastia Sep 24, 2024
8549563
Remove User Agent Header from the transport channel.
prashastia Sep 24, 2024
7f08506
throw BigQueryConnector Exception in case of schema mismatch.
prashastia Sep 30, 2024
e4c3f49
throw BigQueryConnector Exception in case of schema mismatch.
prashastia Sep 30, 2024
72d0db2
Fix No edge case time type.
prashastia Sep 30, 2024
a652677
Merge remote-tracking branch 'dataproc/main' into improve-debuggability
prashastia Sep 30, 2024
df4741f
Remove log messages.
prashastia Sep 30, 2024
64c38c8
Fix day, month and year partition reads along with tests.
prashastia Oct 24, 2024
cefd02f
Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide
prashastia Oct 27, 2024
80e8470
Add TROUBLESHOOT.md
prashastia Oct 27, 2024
99903c6
Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide
prashastia Oct 29, 2024
e7fd9ea
Address review changes for code.
prashastia Oct 29, 2024
5fddee2
Address review changes for code.
prashastia Oct 29, 2024
39c2ea0
Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide
prashastia Oct 29, 2024
250240c
Address review changes for code.
prashastia Nov 6, 2024
2d179c4
Remove extra space.
prashastia Nov 6, 2024
6acf4d1
Address Review Comments - elaborate BigQueryConnector Exception.
prashastia Nov 6, 2024
5aa3b10
Reset Since Checkpoint variables in SnapshotState rather than first w…
prashastia Nov 12, 2024
c4cb4f2
Merge remote-tracking branch 'dataproc/main' into fix-snapshot-metrics
prashastia Nov 12, 2024
aeb25d7
Reset Since Checkpoint variables in SnapshotState rather than first w…
prashastia Nov 12, 2024
83541df
Address review comments.
prashastia Dec 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ private void preWriteOpsAfterCommit() {
long numberOfRecordsWrittenInLastCommit = totalRecordsWritten - totalRecordsCommitted;
totalRecordsCommitted = totalRecordsWritten;
numberOfRecordsWrittenToBigQuery.inc(numberOfRecordsWrittenInLastCommit);
// Reset the "Since Checkpoint" values to 0.
numberOfRecordsBufferedByBigQuerySinceCheckpoint.dec(
numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
numberOfRecordsSeenByWriterSinceCheckpoint.dec(
numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
}

/**
Expand Down Expand Up @@ -272,6 +267,11 @@ public List<BigQueryWriterState> snapshotState(long checkpointId) {
isFirstWriteAfterCheckpoint = true;
streamNameInState = streamName;
streamOffsetInState = streamOffset;
// Reset the "Since Checkpoint" values to 0.
numberOfRecordsBufferedByBigQuerySinceCheckpoint.dec(
numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
numberOfRecordsSeenByWriterSinceCheckpoint.dec(
numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
return Collections.singletonList(
// Note that it's possible to store the associated checkpointId in writer's state.
// For now, we're not leveraging this due to absence of a use case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,9 @@ public void testSnapshotState_withNewWriter() {
assertEquals(3, bufferedWriter.getStreamOffsetInState());
// Test Flink Metrics
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsWrittenToBigQuery.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
}

@Test
Expand Down Expand Up @@ -767,10 +767,10 @@ public void testSnapshotState_withNewWriter_metrics() {
BigQueryWriterState writerState = (BigQueryWriterState) writerStates.toArray()[0];
// Test Flink Metrics
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
// Updated at first write after checkpoint.
assertEquals(0, bufferedWriter.numberOfRecordsWrittenToBigQuery.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
bufferedWriter.write(new Object(), null);
// Test Flink Metrics
assertEquals(4, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
Expand Down Expand Up @@ -833,9 +833,9 @@ public void testSnapshotState_withRestoredWriter_withUsableStream() {
assertEquals(103, bufferedWriter.getStreamOffsetInState());
// Test Flink Metrics
assertEquals(213, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(200, bufferedWriter.numberOfRecordsWrittenToBigQuery.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
}

@Test
Expand Down Expand Up @@ -880,9 +880,9 @@ public void testSnapshotState_withRestoredWriter_withUsableStream_testMetrics()
Collection<BigQueryWriterState> writerStates = bufferedWriter.snapshotState(1);
// Test Flink Metrics
assertEquals(213, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(200, bufferedWriter.numberOfRecordsWrittenToBigQuery.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
bufferedWriter.write(new Object(), null);
bufferedWriter.write(new Object(), null);
assertEquals(215, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
Expand Down Expand Up @@ -939,9 +939,9 @@ public void testSnapshotState_withRestoredWriter_withUnusableStream() {
assertEquals(3, bufferedWriter.getStreamOffsetInState());
// Test Flink Metrics
assertEquals(213, bufferedWriter.numberOfRecordsSeenByWriter.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsSeenByWriterSinceCheckpoint.getCount());
assertEquals(200, bufferedWriter.numberOfRecordsWrittenToBigQuery.getCount());
assertEquals(3, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
assertEquals(0, bufferedWriter.numberOfRecordsBufferedByBigQuerySinceCheckpoint.getCount());
}

@Test
Expand Down
Loading