Skip to content

[Dataflow Streaming] Add a pipeline option to skip input elements that cannot be decoded successfully#37762

Open
scwhittle wants to merge 1 commit intoapache:masterfrom
scwhittle:parse_error_option
Open

[Dataflow Streaming] Add a pipeline option to skip input elements that cannot be decoded successfully#37762
scwhittle wants to merge 1 commit intoapache:masterfrom
scwhittle:parse_error_option

Conversation

@scwhittle
Copy link
Contributor

Such messages have the exception logged but are otherwise discarded. This defaults to off but can be enabled during pipeline update as a possible mitigation for a pipeline that is stuck due to corrupted inputs.

Fix some suppressions while modifying.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the robustness of Dataflow streaming pipelines by introducing a new configuration option that allows workers to gracefully handle and skip input elements that cannot be successfully decoded. This prevents pipeline failures caused by corrupted data, ensuring continuous processing by logging errors and moving past malformed inputs.

Highlights

  • New Pipeline Option: Introduced a new Dataflow streaming pipeline option, skipInputElementsWithDecodingExceptions, which allows the pipeline to log and skip input elements that fail to decode successfully from the streaming backend. This option defaults to off but can be enabled during pipeline updates.
  • Error Handling in Readers: Modified PubsubReader, UngroupedWindmillReader, and WindmillReaderIteratorBase to incorporate the new skipDecodingErrors flag. When enabled, these readers will catch IOException during decoding, log the error, and skip the problematic element instead of throwing an exception and potentially halting the pipeline.
  • Keyed Work Item Processing: Updated WindmillKeyedWorkItem to accept a skipExceptions flag. The element parsing logic was refactored into a dedicated parseElem method, which now logs decoding errors and returns null for corrupted elements if skipExceptions is true, allowing the iterable to filter them out.
  • Improved Robustness for PaneInfo: Added validation to PaneInfo.Encoding.fromTag to throw a CoderException for invalid pane encoding tags, enhancing error detection for corrupted metadata.
  • Testing and Suppressions: Added comprehensive unit tests in StreamingDataflowWorkerTest, WindmillKeyedWorkItemTest, and WindmillReaderIteratorBaseTest to verify the functionality of skipping corrupted input elements. Several @SuppressWarnings annotations were also removed or updated, and @Internal was added to WindmillKeyedWorkItem.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
    • Added getSkipInputElementsWithDecodingExceptions() and setSkipInputElementsWithDecodingExceptions() methods to introduce a new pipeline option.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
    • Imported DataflowStreamingPipelineOptions.
    • Added skipDecodingErrors field and updated constructors to accept this new boolean flag.
    • Modified the Factory class to retrieve the skipInputElementsWithDecodingExceptions option from pipeline options and pass it to the PubsubReader constructor.
    • Updated the PubsubReaderIterator constructor to pass the skipDecodingErrors flag to its superclass.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
    • Imported DataflowStreamingPipelineOptions and checkNotNull.
    • Added skipDecodingErrors field and updated constructors to accept this new boolean flag.
    • Modified the Factory class to retrieve the skipInputElementsWithDecodingExceptions option from pipeline options and pass it to the UngroupedWindmillReader constructor.
    • Updated the UngroupedWindmillReaderIterator constructor to pass the skipDecodingErrors flag to its superclass.
    • Changed context.getSerializedKey().newInput() to checkNotNull(context.getSerializedKey()).newInput() for null safety.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
    • Imported Internal, Logger, and LoggerFactory.
    • Added @Internal annotation to the class and removed old @SuppressWarnings.
    • Added LOG static field for logging.
    • Added skipExceptions field and an overloaded constructor to accept this flag.
    • Refactored element decoding logic into a new private method parseElem which handles IOException by logging and returning null if skipExceptions is true.
    • Modified elementsIterable() to use parseElem and filter out null results.
    • Added @SuppressWarnings({"unchecked"}) to FakeKeyedWorkItemCoder constructor.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
    • Imported Logger and LoggerFactory.
    • Removed @SuppressWarnings annotation.
    • Added skipDecodingErrors field and LOG static field for logging.
    • Updated constructor to accept the skipDecodingErrors flag.
    • Modified the advance() method to catch IOException during message decoding; if skipDecodingErrors is true, the error is logged, and the element is skipped.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
    • Imported Objects, DataflowStreamingPipelineOptions, and ValueProvider.
    • Added skipDecodingExceptions field and updated constructors to accept this new boolean flag.
    • Modified the Factory class to retrieve the skipInputElementsWithDecodingExceptions option from pipeline options and pass it to the WindowingWindmillReader.create method.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
    • Added testSkipInputElementsWithDecodingExceptions test case to verify that corrupted input elements are skipped when the new option is enabled.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
    • Imported Iterables and assertTrue.
    • Added testElementIterationWithSkipEnabled, testElementIterationSkips, and testElementIterationAllSkips test cases to validate the skipping of corrupted elements in WindmillKeyedWorkItem.
    • Added addCorruptedElement helper method for tests.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
    • Imported ThreadLocalRandom and CoderException.
    • Modified TestWindmillReaderIterator.decodeMessage to throw CoderException for negative timestamps to simulate decoding errors.
    • Added testSkipErrors to verify the behavior of skipping errors in the iterator.
    • Updated testForMessageBundleCounts to accept a skipErrors parameter and inject corrupted messages randomly.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
    • Added validation in Encoding.fromTag to check for valid index range and throw CoderException for invalid pane encodings.
Activity
  • The pull request introduces a new pipeline option to handle decoding exceptions in streaming Dataflow jobs.
  • The core logic for skipping corrupted input elements has been implemented across several reader and work item classes.
  • Extensive unit tests have been added to ensure the new functionality works as expected, covering scenarios with valid, corrupted, and entirely corrupted input streams.
  • The author has also addressed minor code quality improvements, such as updating @SuppressWarnings annotations and adding null checks.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions github-actions bot added the direct label Mar 4, 2026
@scwhittle scwhittle force-pushed the parse_error_option branch from e19ab8f to 5d760aa Compare March 4, 2026 12:52
@github-actions github-actions bot removed the direct label Mar 4, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Mar 4, 2026

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@scwhittle scwhittle force-pushed the parse_error_option branch 2 times, most recently from d02c734 to e7c825f Compare March 6, 2026 15:28
… input elements. Such messages will log an error but are otherwise discarded.
@scwhittle scwhittle force-pushed the parse_error_option branch from e7c825f to 736fc0e Compare March 9, 2026 10:08
@scwhittle
Copy link
Contributor Author

R: @arunpandianp

@github-actions
Copy link
Contributor

github-actions bot commented Mar 9, 2026

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Copy link
Contributor

@arunpandianp arunpandianp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of questions around how ValueProviders work and a couple of nits.

Looks good otherwise.


@Description(
"If true, log and skip input elements that are unable to successfully decode from the streaming backend.")
ValueProvider<Boolean> getSkipInputElementsWithDecodingExceptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not clear why this needs to be a ValueProvider and not a Boolean. It seems like all the places where this is used are instantiated during the runtime and the pipeline options will be available there.

I'm not familiar with how ValueProviders work, does this need to be a ValueProvider?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that ValueProvider was needed for all templates but it is just classic templates. In the classic template case, the graph and pipeline serialization happens during template creation and the valueprovider is a placeholder that allows injecting the value later when the template is instantiated. Flex templates on the other hand rerun the graph and pipeline serialization logic each invocation and don't require them. In particular the source/reader is a serialized part of the graph. Given that this is more of an emergency option and a customer using a classic template may wish to use it, it seems worthwhile keeping it as a valueprovider.

context.getWindmillTagEncoding(),
context.getDrainMode());
context.getDrainMode(),
Boolean.TRUE.equals(skipDecodingExceptions.get()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a skipDecodingExceptions.isAccessible() check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, that seems safer and lets me cleanup the test where otherwise it was required to set it. I think if using a real runner it always initializes all the value providers during options deserialization.

current = checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
return true;
} catch (RuntimeException | IOException e) {
if (Boolean.TRUE.equals(skipUndecodableElements.get())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a skipDecodingExceptions.isAccessible() check?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants