[Dataflow Java Runner] Add support for sending logs directly to Cloud Logging #37662
scwhittle merged 3 commits into apache:master
Summary of Changes (from Gemini Code Assist)

This pull request enhances Dataflow worker logging by enabling direct transmission of logs to Google Cloud Logging. The change aims to reduce log latency and improve reliability by allowing logs to bypass the traditional disk-based ingestion mechanism. A new configuration option controls which log levels are uploaded directly, giving users more flexibility in managing their logging behavior.
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## master #37662 +/- ##
=========================================
Coverage 40.08% 40.08%
Complexity 3416 3416
=========================================
Files 1178 1177 -1
Lines 187433 187443 +10
Branches 3589 3592 +3
=========================================
+ Hits 75130 75144 +14
+ Misses 108912 108905 -7
- Partials 3391 3394 +3
Assigning reviewers: R: @kennknowles for label java.

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control.
          }
          checkNotNull(testDirectLogInterceptor).accept(logEntry);
        } else {
          checkNotNull(directLogging).write(ImmutableList.of(logEntry), directWriteOptions);
Did you consider calling com.google.cloud.logging.LoggingHandler to do direct logging? It seems LoggingHandler already has most of the logic. We could set up an ErrorManager to collect errors and fall back to disk logging.

Reposting here since the email reply didn't land in the thread:

Yes, I was using that at first, but it has some downsides:
- It formats the log as a string instead of a payload.
- It adds labels we don't have in existing logs.
- We need to modify the MonitoredResource based on the log, as it includes the step name, and the only way to do that was to override the protected logEntryFor method instead of using enhancers etc.
- We want to ensure we don't double log, which is tricky if there are two separate handlers.

I was originally delegating to an instance of LoggingHandler, but it actually wasn't saving much code and was doing extra things we didn't want.

On Fri, Mar 6, 2026, 8:14 AM, Arun Pandian commented on this pull request, in
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:

    if (isDirectLog) {
      if (directThrottler.shouldAttemptDirectLog()) {
        try {
          LogEntry logEntry = constructDirectLogEntry(record, executionState);
          if (testDirectLogInterceptor != null) {
            // The default labels are generally applied by the write options, but we merge them in
            // here so they are visible to the test.
            HashMap<String, String> mergedLabels = new HashMap<>(defaultLabels);
            mergedLabels.putAll(logEntry.getLabels());
            logEntry = logEntry.toBuilder().setLabels(mergedLabels).build();
            if (logEntry.getResource() == null) {
              logEntry = logEntry.toBuilder().setResource(steplessMonitoredResource).build();
            }
            checkNotNull(testDirectLogInterceptor).accept(logEntry);
          } else {
            checkNotNull(directLogging).write(ImmutableList.of(logEntry), directWriteOptions);

        .build();
    }

    private static String middleCrop(String value, int maxSize) {
Can we use org.apache.commons.lang3.StringUtils.abbreviateMiddle instead?
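
For context, the body of middleCrop is not shown in this thread. A minimal sketch of what a middle-cropping helper typically does, assuming a "..." marker and a head-biased split (both choices are hypothetical, not taken from the PR):

```java
/** Hypothetical stand-in for the PR's private middleCrop helper, whose body is not shown here. */
final class MiddleCropSketch {
  // Assumed marker; the PR may use a different one.
  private static final String ELLIPSIS = "...";

  /** Keeps the head and tail of value, replacing the middle so the result has at most maxSize chars. */
  static String middleCrop(String value, int maxSize) {
    if (value.length() <= maxSize) {
      return value;
    }
    if (maxSize <= ELLIPSIS.length()) {
      // Not enough room for a marker plus context, so fall back to a plain prefix.
      return value.substring(0, Math.max(maxSize, 0));
    }
    int keep = maxSize - ELLIPSIS.length();
    int head = (keep + 1) / 2; // The head gets the extra character when keep is odd.
    int tail = keep - head;
    return value.substring(0, head) + ELLIPSIS + value.substring(value.length() - tail);
  }
}
```

StringUtils.abbreviateMiddle(str, middle, length) from commons-lang3 covers the same ground, though its handling of edge cases (very small lengths, null input) may differ from a hand-written helper, so the two are not guaranteed to be drop-in equivalents.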

    @LazyInit private ImmutableMap<String, String> defaultLabels = ImmutableMap.of();
    @LazyInit private @Nullable Consumer<LogEntry> testDirectLogInterceptor;

    private static final String LOG_TYPE = "dataflow.googleapis.com%2Fworker";
Should this be "/" instead of "%2F"?

This matches the existing Dataflow logs.
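
The thread does not say why the existing logs use "%2F". One likely reason, stated here as an assumption, is that Cloud Logging log names have the form projects/[PROJECT_ID]/logs/[LOG_ID] with the log ID URL-encoded, so a "/" inside the ID has to be escaped. The sketch below only shows that the constant is the URL-encoded form of dataflow.googleapis.com/worker; the class and method names are hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration; not code from the PR. */
final class LogIdEncodingSketch {
  /** URL-encodes a log ID so that it can occupy a single path segment of a log name. */
  static String encodeLogId(String logId) {
    // Requires Java 10+ for the Charset overload of URLEncoder.encode.
    return URLEncoder.encode(logId, StandardCharsets.UTF_8);
  }
}
```

Calling encodeLogId("dataflow.googleapis.com/worker") yields the value of LOG_TYPE quoted above.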

    // Either we were throttled or encountered an error enqueuing the log.
    if (!fallbackDirectErrorsToDisk) {
      return;
It would be useful to add a throttled disk log here saying that we are dropping direct logs.
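
A rough sketch of how such a throttled notice could be gated, assuming a simple "at most one message per interval" policy. ThrottledNotice and its methods are hypothetical names; the PR may implement this differently or reuse an existing throttler.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: permits at most one notice per interval so the disk log is not flooded. */
final class ThrottledNotice {
  private final long intervalMillis;
  private final AtomicLong nextAllowedMillis = new AtomicLong(0);

  ThrottledNotice(long intervalMillis) {
    this.intervalMillis = intervalMillis;
  }

  /** Returns true if the caller should emit the notice now; nowMillis is a wall-clock timestamp. */
  boolean shouldEmit(long nowMillis) {
    long next = nextAllowedMillis.get();
    // Only the thread that wins the compare-and-set emits the notice for this interval.
    return nowMillis >= next && nextAllowedMillis.compareAndSet(next, nowMillis + intervalMillis);
  }
}
```

In the early-return branch above, a caller could check shouldEmit(System.currentTimeMillis()) and, when it returns true, write a single disk log line noting that direct logs are being dropped.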

        MonitoredResource.newBuilder(RESOURCE_TYPE).build();

    @LazyInit private ImmutableMap<String, String> defaultLabels = ImmutableMap.of();
    @LazyInit private @Nullable Consumer<LogEntry> testDirectLogInterceptor;
These fields (except logCustomMdc) are set in enableDirectLogging, which is synchronized, but they are accessed in publish without synchronization. What makes the threads calling publish see updated, consistent values?

I was assuming it would be safe because enableDirectLogging is called on the main thread, but background threads could already exist at that point and try to log, so I agree we should make it thread-safe.

    // are logically final.
    @LazyInit private @Nullable Logging directLogging = null;
    @LazyInit private boolean fallbackDirectErrorsToDisk = false;
    @LazyInit private Level defaultNonDirectLogLevel = Level.ALL;
Can we group the directLogging-related @LazyInit fields into a single class, DirectLoggingOptions, and have a single @LazyInit DirectLoggingOptions field? Grouping them will make it clear that updates to the grouped fields become visible consistently (all or nothing).

Changed. I went with an AtomicReference to keep things obviously safe. It is fine whether or not a thread observes the reference to direct logging, but we need the fields of the object itself to be fully published, and I'm not sure that is guaranteed. The atomic should be low overhead compared to the rest of the logging work, such as JSON formatting.
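
The pattern discussed here can be sketched as an immutable snapshot object published through an AtomicReference. This is an illustration only: the two field names are borrowed from the diff context above, while DirectLoggingStateSketch, DirectLoggingConfig, and describeRouting are hypothetical and not the merged code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

/** Hypothetical illustration of publishing grouped settings atomically; not the PR's code. */
final class DirectLoggingStateSketch {
  /** Immutable snapshot, so readers see either all of the settings or none of them. */
  static final class DirectLoggingConfig {
    final boolean fallbackDirectErrorsToDisk;
    final Level defaultNonDirectLogLevel;

    DirectLoggingConfig(boolean fallbackDirectErrorsToDisk, Level defaultNonDirectLogLevel) {
      this.fallbackDirectErrorsToDisk = fallbackDirectErrorsToDisk;
      this.defaultNonDirectLogLevel = defaultNonDirectLogLevel;
    }
  }

  // AtomicReference.set/get have volatile semantics, so a reader that sees the
  // reference also sees the fully constructed object behind it.
  private final AtomicReference<DirectLoggingConfig> config = new AtomicReference<>();

  void enableDirectLogging(boolean fallbackDirectErrorsToDisk, Level defaultNonDirectLogLevel) {
    config.set(new DirectLoggingConfig(fallbackDirectErrorsToDisk, defaultNonDirectLogLevel));
  }

  /** Stand-in for publish(): reads the reference once and works from that snapshot. */
  String describeRouting() {
    DirectLoggingConfig snapshot = config.get();
    if (snapshot == null) {
      return "disk-only";
    }
    return "direct, fallback=" + snapshot.fallbackDirectErrorsToDisk
        + ", nonDirectLevel=" + snapshot.defaultNonDirectLogLevel.getName();
  }
}
```

Reading the reference once per call matters: a handler that called config.get() separately for each setting could mix values from two different snapshots.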

The test failure is an unrelated Flink test error.

Thanks! LGTM

By default, Dataflow pipeline logs are written to disk and then uploaded to Cloud Logging by an agent on the VM. This path has a throttling limit beyond which logs are dropped. The limit is somewhat arbitrary, but it prevents excessive logging bills for customers that have per-element logs enabled, and the current implementation has its own costs and scaling limits. This PR adds options for sending logs directly to Cloud Logging, with fallback to disk-based logging on errors.
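
The control flow described above (attempt the direct path, then optionally fall back to disk) can be sketched roughly as follows. This is a simplified illustration under stated assumptions: the sinks are plain Consumer<String> stand-ins, the throttler is a BooleanSupplier standing in for directThrottler.shouldAttemptDirectLog(), and DirectLogRouterSketch is a hypothetical name; the real handler works with LogRecord and LogEntry objects.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical illustration of direct logging with optional disk fallback; not the PR's code. */
final class DirectLogRouterSketch {
  private final BooleanSupplier throttler; // stand-in for directThrottler.shouldAttemptDirectLog()
  private final Consumer<String> directSink; // stand-in for the Cloud Logging write
  private final Consumer<String> diskSink; // stand-in for the existing disk-based path
  private final boolean fallbackDirectErrorsToDisk;

  DirectLogRouterSketch(
      BooleanSupplier throttler,
      Consumer<String> directSink,
      Consumer<String> diskSink,
      boolean fallbackDirectErrorsToDisk) {
    this.throttler = throttler;
    this.directSink = directSink;
    this.diskSink = diskSink;
    this.fallbackDirectErrorsToDisk = fallbackDirectErrorsToDisk;
  }

  void publish(String message) {
    if (throttler.getAsBoolean()) {
      try {
        directSink.accept(message);
        return; // Sent directly, so do not also write to disk (avoids double logging).
      } catch (RuntimeException e) {
        // Fall through to the fallback decision below.
      }
    }
    // Either we were throttled or the direct write failed.
    if (!fallbackDirectErrorsToDisk) {
      return; // The message is dropped.
    }
    diskSink.accept(message);
  }
}
```

Returning immediately after a successful direct write is what keeps a record from being logged twice, which is one of the concerns raised in the review thread.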