From 736fc0e00ecd797f33da64455cc66b4acb8bfaf4 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 20 Feb 2026 11:17:57 +0100 Subject: [PATCH 1/3] [Dataflow Streaming] Add a pipeline option to skip decoding errors of input elements. Such messages will log an error but are otherwise discarded. --- .../DataflowStreamingPipelineOptions.java | 7 ++ .../runners/dataflow/worker/PubsubReader.java | 42 +++++--- .../worker/UngroupedWindmillReader.java | 41 ++++--- .../worker/WindmillKeyedWorkItem.java | 101 ++++++++++++------ .../worker/WindmillReaderIteratorBase.java | 54 ++++++---- .../runners/dataflow/worker/WindmillSink.java | 12 ++- .../worker/WindowingWindmillReader.java | 54 ++++++---- .../worker/StreamingDataflowWorkerTest.java | 74 ++++++++++++- .../worker/WindmillKeyedWorkItemTest.java | 97 +++++++++++++++++ .../WindmillReaderIteratorBaseTest.java | 42 +++++++- .../sdk/transforms/windowing/PaneInfo.java | 6 +- 11 files changed, 414 insertions(+), 116 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index ffb2e27e55b2..9cc98276f2d9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.joda.time.Duration; /** [Internal] Options for configuring StreamingDataflowWorker. */ @@ -226,6 +227,12 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillServiceStreamMaxBackoffMillis(int value); + @Description( + "If true, log and skip input elements that are unable to successfully decode from the streaming backend.") + ValueProvider getSkipInputElementsWithDecodingExceptions(); + + void setSkipInputElementsWithDecodingExceptions(ValueProvider value); + @Description("Enables direct path mode for streaming engine.") @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class) boolean getIsWindmillServiceDirectPathEnabled(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java index 024b790e8ca9..b60cb84415ff 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Map; +import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; @@ -32,6 +33,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.WindowedValue; @@ -41,26 +43,24 @@ import org.checkerframework.checker.nullness.qual.Nullable; /** A Reader that receives elements from Pubsub, via a Windmill server. */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class PubsubReader extends NativeReader> { private final Coder coder; private final StreamingModeExecutionContext context; // Function used to parse Windmill data. // If non-null, data from Windmill is expected to be a PubsubMessage protobuf. - private final SimpleFunction parseFn; + private final @Nullable SimpleFunction parseFn; + private final ValueProvider skipUndecodableElements; PubsubReader( Coder> coder, StreamingModeExecutionContext context, - SimpleFunction parseFn) { - @SuppressWarnings({"unchecked", "rawtypes"}) - WindowedValueCoder windowedCoder = (WindowedValueCoder) coder; + @Nullable SimpleFunction parseFn, + ValueProvider skipUndecodableElements) { + WindowedValueCoder windowedCoder = (WindowedValueCoder) coder; this.coder = windowedCoder.getValueCoder(); this.context = context; this.parseFn = parseFn; + this.skipUndecodableElements = skipUndecodableElements; } /** A {@link ReaderFactory.Registrar} for pubsub sources. */ @@ -75,19 +75,19 @@ public Map factories() { } } + @SuppressWarnings({"unchecked", "rawtypes"}) static class Factory implements ReaderFactory { @Override public NativeReader create( CloudObject cloudSourceSpec, - Coder coder, + @Nullable Coder coder, @Nullable PipelineOptions options, @Nullable DataflowExecutionContext executionContext, DataflowOperationContext operationContext) throws Exception { - coder = checkArgumentNotNull(coder); - @SuppressWarnings("unchecked") - Coder> typedCoder = (Coder>) coder; - SimpleFunction parseFn = null; + Coder> typedCoder = + (Coder>) checkArgumentNotNull(coder); + @Nullable SimpleFunction parseFn = null; byte[] attributesFnBytes = getBytes(cloudSourceSpec, PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, null); // If attributesFnBytes is set, Pubsub data will be in PubsubMessage protobuf format. The @@ -98,8 +98,20 @@ public NativeReader create( (SimpleFunction) SerializableUtils.deserializeFromByteArray(attributesFnBytes, "serialized fn info"); } + @Nullable + ValueProvider skipUndecodableElements = + (options != null) + ? options + .as(DataflowStreamingPipelineOptions.class) + .getSkipInputElementsWithDecodingExceptions() + : null; return new PubsubReader<>( - typedCoder, (StreamingModeExecutionContext) executionContext, parseFn); + typedCoder, + (StreamingModeExecutionContext) checkArgumentNotNull(executionContext), + parseFn, + skipUndecodableElements != null + ? skipUndecodableElements + : ValueProvider.StaticValueProvider.of(false)); } } @@ -110,7 +122,7 @@ public NativeReaderIterator> iterator() throws IOException { class PubsubReaderIterator extends WindmillReaderIteratorBase { protected PubsubReaderIterator(Windmill.WorkItem work) { - super(work); + super(work, skipUndecodableElements); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java index 625dc590d24b..2347529cf4a5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.service.AutoService; import java.io.IOException; @@ -25,12 +26,14 @@ import java.util.Collection; import java.util.Map; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.CausedByDrain; @@ -45,20 +48,21 @@ /** * A Reader that receives input data from a Windmill server, and returns it as individual elements. */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class UngroupedWindmillReader extends NativeReader> { private final Coder valueCoder; private final Coder> windowsCoder; - private StreamingModeExecutionContext context; + private final StreamingModeExecutionContext context; + private final ValueProvider skipUndecodableElements; - UngroupedWindmillReader(Coder> coder, StreamingModeExecutionContext context) { + UngroupedWindmillReader( + Coder> coder, + StreamingModeExecutionContext context, + ValueProvider skipUndecodableElements) { FullWindowedValueCoder inputCoder = (FullWindowedValueCoder) coder; this.valueCoder = inputCoder.getValueCoder(); this.windowsCoder = inputCoder.getWindowsCoder(); this.context = context; + this.skipUndecodableElements = skipUndecodableElements; } /** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */ @@ -75,6 +79,7 @@ public Map factories() { } } + @SuppressWarnings({"unchecked", "rawtypes"}) static class Factory implements ReaderFactory { @Override public NativeReader create( @@ -84,11 +89,21 @@ public NativeReader create( @Nullable DataflowExecutionContext executionContext, DataflowOperationContext operationContext) throws Exception { - coder = checkArgumentNotNull(coder); - @SuppressWarnings("unchecked") - Coder> typedCoder = (Coder>) coder; + Coder> typedCoder = + (Coder>) checkArgumentNotNull(coder); + @Nullable + ValueProvider skipUndecodableElements = + options != null + ? options + .as(DataflowStreamingPipelineOptions.class) + .getSkipInputElementsWithDecodingExceptions() + : null; return new UngroupedWindmillReader<>( - typedCoder, (StreamingModeExecutionContext) executionContext); + typedCoder, + (StreamingModeExecutionContext) checkArgumentNotNull(executionContext), + skipUndecodableElements != null + ? skipUndecodableElements + : ValueProvider.StaticValueProvider.of(false)); } } @@ -97,9 +112,9 @@ public NativeReaderIterator> iterator() throws IOException { return new UngroupedWindmillReaderIterator(context.getWorkItem()); } - class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase { + class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase { UngroupedWindmillReaderIterator(Windmill.WorkItem work) { - super(work); + super(work, skipUndecodableElements); } @Override @@ -134,7 +149,7 @@ protected WindowedValue decodeMessage(Windmill.Message message) throws IOExce } if (valueCoder instanceof KvCoder) { KvCoder kvCoder = (KvCoder) valueCoder; - InputStream key = context.getSerializedKey().newInput(); + InputStream key = checkNotNull(context.getSerializedKey()).newInput(); notifyElementRead(key.available() + data.available() + metadata.available()); @SuppressWarnings("unchecked") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 59489babf0bd..4dd1f1fa0b44 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -49,6 +50,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of {@link KeyedWorkItem} that wraps around a {@code Windmill.WorkItem}. @@ -56,14 +59,13 @@ * @param the key type * @param the element type */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) +@Internal public class WindmillKeyedWorkItem implements KeyedWorkItem { private static final Predicate IS_WATERMARK = input -> input.getType() == Timer.Type.WATERMARK; + private static final Logger LOG = LoggerFactory.getLogger(WindmillKeyedWorkItem.class); + private final Windmill.WorkItem workItem; private final K key; // used to inform that timer was caused by drain @@ -73,6 +75,7 @@ public class WindmillKeyedWorkItem implements KeyedWorkItem private final transient Coder> windowsCoder; private final transient Coder valueCoder; private final WindmillTagEncoding windmillTagEncoding; + private final boolean skipUndecodableElements; public WindmillKeyedWorkItem( K key, @@ -82,6 +85,26 @@ public WindmillKeyedWorkItem( Coder valueCoder, WindmillTagEncoding windmillTagEncoding, boolean drainMode) { + this( + key, + workItem, + windowCoder, + windowsCoder, + valueCoder, + windmillTagEncoding, + drainMode, + false); + } + + public WindmillKeyedWorkItem( + K key, + Windmill.WorkItem workItem, + Coder windowCoder, + Coder> windowsCoder, + Coder valueCoder, + WindmillTagEncoding windmillTagEncoding, + boolean drainMode, + boolean skipUndecodableElements) { this.key = key; this.workItem = workItem; this.windowCoder = windowCoder; @@ -89,6 +112,7 @@ public WindmillKeyedWorkItem( this.valueCoder = valueCoder; this.windmillTagEncoding = windmillTagEncoding; this.drainMode = drainMode; + this.skipUndecodableElements = skipUndecodableElements; } @Override @@ -113,39 +137,45 @@ public Iterable timersIterable() { }); } + private @Nullable WindowedValue parseElem(Windmill.Message message) { + try { + Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp()); + Collection windows = + WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); + PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); + /** + * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by + * drain happened upstream + */ + CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; + if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { + BeamFnApi.Elements.ElementMetadata elementMetadata = + WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); + drainingValueFromUpstream = + elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING + ? CausedByDrain.CAUSED_BY_DRAIN + : CausedByDrain.NORMAL; + } + InputStream inputStream = message.getData().newInput(); + ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER); + return WindowedValues.of( + value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream); + } catch (RuntimeException | IOException e) { + if (!skipUndecodableElements) { + throw new RuntimeException(e); + } + LOG.error("Skipping input element due to decoding error", e); + return null; + } + } + @Override + @SuppressWarnings("nullness") public Iterable> elementsIterable() { return FluentIterable.from(workItem.getMessageBundlesList()) .transformAndConcat(Windmill.InputMessageBundle::getMessagesList) - .transform( - message -> { - try { - Instant timestamp = - WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp()); - Collection windows = - WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata()); - PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata()); - /** - * https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry - * induced by drain happened upstream - */ - CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL; - if (WindowedValues.WindowedValueCoder.isMetadataSupported()) { - BeamFnApi.Elements.ElementMetadata elementMetadata = - WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata()); - drainingValueFromUpstream = - elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING - ? CausedByDrain.CAUSED_BY_DRAIN - : CausedByDrain.NORMAL; - } - InputStream inputStream = message.getData().newInput(); - ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER); - return WindowedValues.of( - value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + .transform(this::parseElem) + .filter(Objects::nonNull); } @Override @@ -237,12 +267,13 @@ public Coder getElementCoder() { return kvCoder.getValueCoder(); } + @SuppressWarnings("unchecked") protected FakeKeyedWorkItemCoder(Coder elemCoder) { if (elemCoder instanceof KeyedWorkItemCoder) { - KeyedWorkItemCoder kwiCoder = (KeyedWorkItemCoder) elemCoder; + KeyedWorkItemCoder kwiCoder = (KeyedWorkItemCoder) elemCoder; this.kvCoder = KvCoder.of(kwiCoder.getKeyCoder(), kwiCoder.getElementCoder()); } else if (elemCoder instanceof KvCoder) { - this.kvCoder = ((KvCoder) elemCoder); + this.kvCoder = (KvCoder) elemCoder; } else { throw new IllegalArgumentException( "FakeKeyedWorkItemCoder only works with KeyedWorkItemCoder or KvCoder; was: " diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java index 5e68641bf661..4a47f2604087 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java @@ -17,27 +17,33 @@ */ package org.apache.beam.runners.dataflow.worker; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + import java.io.IOException; import java.util.NoSuchElementException; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.values.WindowedValue; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Base class for iterators that decode messages from bundles inside a {@link Windmill.WorkItem}. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public abstract class WindmillReaderIteratorBase extends NativeReader.NativeReaderIterator> { - private Windmill.WorkItem work; + private final Windmill.WorkItem work; private int bundleIndex = 0; private int messageIndex = -1; - private Optional> current; + private @Nullable WindowedValue current = null; + private final ValueProvider skipUndecodableElements; + private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class); - protected WindmillReaderIteratorBase(Windmill.WorkItem work) { + protected WindmillReaderIteratorBase( + Windmill.WorkItem work, ValueProvider skipUndecodableElements) { + this.skipUndecodableElements = skipUndecodableElements; this.work = work; } @@ -48,30 +54,38 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { - if (bundleIndex == work.getMessageBundlesCount() - || messageIndex == work.getMessageBundles(bundleIndex).getMessagesCount()) { - current = Optional.absent(); - return false; - } - ++messageIndex; - for (; bundleIndex < work.getMessageBundlesCount(); ++bundleIndex, messageIndex = 0) { + while (true) { + if (bundleIndex >= work.getMessageBundlesCount()) { + current = null; + return false; + } Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex); - if (messageIndex < bundle.getMessagesCount()) { - current = Optional.of(decodeMessage(bundle.getMessages(messageIndex))); + ++messageIndex; + if (messageIndex >= bundle.getMessagesCount()) { + messageIndex = -1; + ++bundleIndex; + continue; + } + try { + current = checkNotNull(decodeMessage(bundle.getMessages(messageIndex))); return true; + } catch (RuntimeException | IOException e) { + if (Boolean.TRUE.equals(skipUndecodableElements.get())) { + LOG.error("Skipping input element due to decoding error", e); + continue; + } + throw e; } } - current = Optional.absent(); - return false; } protected abstract WindowedValue decodeMessage(Windmill.Message message) throws IOException; @Override public WindowedValue getCurrent() throws NoSuchElementException { - if (!current.isPresent()) { + if (current == null) { throw new NoSuchElementException(); } - return current.get(); + return current; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java index 2ed29125bd40..abee9a33df2d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java @@ -214,7 +214,7 @@ private ByteString encode(Coder coder, EncodeT object) throws } @Override - @SuppressWarnings({"rawtypes", "NestedInstanceOfConditions"}) + @SuppressWarnings("rawtypes") public long add(WindowedValue data) throws IOException { ByteString key, value; ByteString id = ByteString.EMPTY; @@ -228,16 +228,18 @@ public long add(WindowedValue data) throws IOException { KvCoder kvCoder = (KvCoder) valueCoder; KV kv = checkNotNull((KV) data.getValue()); key = encode(kvCoder.getKeyCoder(), kv.getKey()); - Coder valueCoder = kvCoder.getValueCoder(); + Coder nestedValueCoder = kvCoder.getValueCoder(); // If ids are explicitly provided, use that instead of the windmill-generated id. // This is used when reading an UnboundedSource to deduplicate records. - if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) { + if (nestedValueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) { ValueWithRecordId valueAndId = checkNotNull((ValueWithRecordId) kv.getValue()); value = - encode(((ValueWithRecordIdCoder) valueCoder).getValueCoder(), valueAndId.getValue()); + encode( + ((ValueWithRecordIdCoder) nestedValueCoder).getValueCoder(), + valueAndId.getValue()); id = ByteString.copyFrom(valueAndId.getId()); } else { - value = encode(valueCoder, kv.getValue()); + value = encode(nestedValueCoder, kv.getValue()); } } else { key = checkNotNull(context.getSerializedKey()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index 7dd55d91211d..2d9b27b8e34b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.service.AutoService; import java.io.IOException; @@ -25,12 +26,15 @@ import java.util.Map; import java.util.NoSuchElementException; import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; @@ -42,10 +46,7 @@ * A Reader that receives input data from a Windmill server, and returns a singleton iterable * containing the work item. */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) +@Internal class WindowingWindmillReader extends NativeReader>> { private final Coder keyCoder; @@ -53,19 +54,22 @@ class WindowingWindmillReader extends NativeReader windowCoder; private final Coder> windowsCoder; private StreamingModeExecutionContext context; + private final ValueProvider skipDecodingExceptions; WindowingWindmillReader( - Coder>> coder, StreamingModeExecutionContext context) { + Coder>> coder, + StreamingModeExecutionContext context, + ValueProvider skipDecodingExceptions) { FullWindowedValueCoder> inputCoder = (FullWindowedValueCoder>) coder; this.windowsCoder = inputCoder.getWindowsCoder(); this.windowCoder = inputCoder.getWindowCoder(); - @SuppressWarnings("unchecked") WindmillKeyedWorkItem.FakeKeyedWorkItemCoder keyedWorkItemCoder = (WindmillKeyedWorkItem.FakeKeyedWorkItemCoder) inputCoder.getValueCoder(); this.keyCoder = keyedWorkItemCoder.getKeyCoder(); this.valueCoder = keyedWorkItemCoder.getElementCoder(); this.context = context; + this.skipDecodingExceptions = skipDecodingExceptions; } /** A {@link ReaderFactory.Registrar} for grouping windmill sources. */ @@ -87,6 +91,7 @@ public Map factories() { static class Factory implements ReaderFactory { @Override + @SuppressWarnings("rawtypes") public NativeReader create( CloudObject spec, @Nullable Coder coder, @@ -94,14 +99,22 @@ public NativeReader create( @Nullable DataflowExecutionContext context, DataflowOperationContext operationContext) throws Exception { - coder = checkArgumentNotNull(coder); - @SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "unchecked" - }) + @SuppressWarnings("unchecked") Coder>> typedCoder = - (Coder>>) coder; - return WindowingWindmillReader.create(typedCoder, (StreamingModeExecutionContext) context); + (Coder>>) checkArgumentNotNull(coder); + @Nullable + ValueProvider skipUndecodableElements = + (options != null) + ? options + .as(DataflowStreamingPipelineOptions.class) + .getSkipInputElementsWithDecodingExceptions() + : null; + return WindowingWindmillReader.create( + typedCoder, + (StreamingModeExecutionContext) checkArgumentNotNull(context), + skipUndecodableElements != null + ? skipUndecodableElements + : ValueProvider.StaticValueProvider.of(false)); } } @@ -110,13 +123,17 @@ public NativeReader create( * StreamingModeExecutionContext}. */ public static WindowingWindmillReader create( - Coder>> coder, StreamingModeExecutionContext context) { - return new WindowingWindmillReader(coder, context); + Coder>> coder, + StreamingModeExecutionContext context, + ValueProvider skipUndecodableElements) { + return new WindowingWindmillReader<>(coder, context, skipUndecodableElements); } @Override public NativeReaderIterator>> iterator() throws IOException { - final K key = keyCoder.decode(context.getSerializedKey().newInput(), Coder.Context.OUTER); + final K key = + keyCoder.decode( + checkStateNotNull(context.getSerializedKey()).newInput(), Coder.Context.OUTER); final WorkItem workItem = context.getWorkItem(); KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( @@ -126,7 +143,8 @@ public NativeReaderIterator>> iterator() throw windowsCoder, valueCoder, context.getWindmillTagEncoding(), - context.getDrainMode()); + context.getDrainMode(), + Boolean.TRUE.equals(skipDecodingExceptions.get())); final boolean isEmptyWorkItem = (Iterables.isEmpty(keyedWorkItem.timersIterable()) && Iterables.isEmpty(keyedWorkItem.elementsIterable())); @@ -152,7 +170,7 @@ public WindowedValue> getCurrent() { }; } else { return new NativeReaderIterator>>() { - private WindowedValue> current; + private @Nullable WindowedValue> current = null; @Override public boolean start() throws IOException { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d11c6c374333..fb618812197d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -149,6 +149,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -224,9 +225,7 @@ /** Unit tests for {@link StreamingDataflowWorker}. */ @RunWith(Parameterized.class) -// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is -// released (2.11.0) -@SuppressWarnings({"unused", "deprecation"}) +@SuppressWarnings("deprecation") public class StreamingDataflowWorkerTest { private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorkerTest.class); @@ -857,6 +856,13 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args if (streamingEngine) { argsList.add("--experiments=enable_streaming_engine"); } + // We need to set the ValueProvider in all cases because we examine it and it is a + // RuntimeValueParameter. + if (argsList.stream() + .noneMatch(s -> s.startsWith("--skipInputElementsWithDecodingExceptions"))) { + argsList.add("--skipInputElementsWithDecodingExceptions=false"); + } + LOG.info("Running with args {}", argsList); DataflowWorkerHarnessOptions options = PipelineOptionsFactory.fromArgs(argsList.toArray(new String[0])) .as(DataflowWorkerHarnessOptions.class); @@ -870,7 +876,6 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args if (options.getActiveWorkRefreshPeriodMillis() == 10000) { options.setActiveWorkRefreshPeriodMillis(0); } - return options; } @@ -4314,6 +4319,67 @@ public void testDefaultNumCommitThreads() { } } + @Test + public void testSkipInputElementsWithDecodingExceptions() throws Exception { + KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), TextualIntegerCoder.of()); + List instructions = + Arrays.asList(makeSourceInstruction(kvCoder), makeSinkInstruction(kvCoder, 0)); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams("--skipInputElementsWithDecodingExceptions=true") + .setInstructions(instructions) + .publishCounters() + .build()); + worker.start(); + + // Create a work item with one valid message and one corrupted message. + Windmill.GetWorkResponse.Builder builder = Windmill.GetWorkResponse.newBuilder(); + Windmill.ComputationWorkItems.Builder computationBuilder = + builder.addWorkBuilder().setComputationId(DEFAULT_COMPUTATION_ID).setInputDataWatermark(1); + Windmill.WorkItem.Builder workItemBuilder = + computationBuilder + .addWorkBuilder() + .setKey(DEFAULT_KEY_BYTES) + .setShardingKey(DEFAULT_SHARDING_KEY) + .setWorkToken(1) + .setCacheToken(2); + + Windmill.InputMessageBundle.Builder bundleBuilder = + workItemBuilder + .addMessageBundlesBuilder() + .setSourceComputationId(DEFAULT_SOURCE_COMPUTATION_ID); + + // Valid message + bundleBuilder + .addMessagesBuilder() + .setTimestamp(0) + .setData(ByteString.copyFromUtf8("12345")) + .setMetadata(addPaneTag(PaneInfo.NO_FIRING, intervalWindowBytes(DEFAULT_WINDOW))); + + // Corrupted message (invalid data for kvCoder) + bundleBuilder + .addMessagesBuilder() + .setTimestamp(1000) + .setData(ByteString.copyFromUtf8("54321corrupted data")) + .setMetadata(addPaneTag(PaneInfo.NO_FIRING, intervalWindowBytes(DEFAULT_WINDOW))); + + server.whenGetWorkCalled().thenReturn(builder.build()); + + Map result = server.waitForAndGetCommits(1); + worker.stop(); + + assertTrue(result.containsKey(1L)); + Windmill.WorkItemCommitRequest commit = result.get(1L); + + // Verify that only the valid message was processed and output. + assertEquals(1, commit.getOutputMessagesCount()); + assertEquals(1, commit.getOutputMessages(0).getBundles(0).getMessagesCount()); + assertEquals("key", commit.getOutputMessages(0).getBundles(0).getKey().toStringUtf8()); + assertEquals( + "12345", commit.getOutputMessages(0).getBundles(0).getMessages(0).getData().toStringUtf8()); + } + static class BlockingFn extends DoFn implements TestRule { public static AtomicReference blocker = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 69aa4d0d69af..c1568058435b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.dataflow.worker; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterables; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -118,6 +120,93 @@ public void testElementIteration() throws Exception { WindowedValues.of("earth", new Instant(6), WINDOW_1, paneInfo(1)))); } + @Test + public void testElementIterationWithSkipEnabled() throws Exception { + Windmill.WorkItem.Builder workItem = + Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17); + Windmill.InputMessageBundle.Builder chunk1 = workItem.addMessageBundlesBuilder(); + chunk1.setSourceComputationId("computation"); + addElement(chunk1, 5, "hello", WINDOW_1, paneInfo(0)); + addElement(chunk1, 7, "world", WINDOW_2, paneInfo(2)); + Windmill.InputMessageBundle.Builder chunk2 = workItem.addMessageBundlesBuilder(); + chunk2.setSourceComputationId("computation"); + addElement(chunk2, 6, "earth", WINDOW_1, paneInfo(1)); + + KeyedWorkItem keyedWorkItem = + new WindmillKeyedWorkItem<>( + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + false, + true); + + assertThat( + keyedWorkItem.elementsIterable(), + Matchers.contains( + WindowedValues.of("hello", new Instant(5), WINDOW_1, paneInfo(0)), + WindowedValues.of("world", new Instant(7), WINDOW_2, paneInfo(2)), + WindowedValues.of("earth", new Instant(6), WINDOW_1, paneInfo(1)))); + } + + @Test + public void testElementIterationSkips() throws Exception { + Windmill.WorkItem.Builder workItem = + Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17); + Windmill.InputMessageBundle.Builder chunk1 = workItem.addMessageBundlesBuilder(); + chunk1.setSourceComputationId("computation"); + addElement(chunk1, 5, "hello", WINDOW_1, paneInfo(0)); + addCorruptedElement(chunk1); + Windmill.InputMessageBundle.Builder chunk2 = workItem.addMessageBundlesBuilder(); + chunk2.setSourceComputationId("computation"); + addElement(chunk2, 6, "earth", WINDOW_1, paneInfo(1)); + + KeyedWorkItem keyedWorkItem = + new WindmillKeyedWorkItem<>( + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + false, + true); + + assertThat( + keyedWorkItem.elementsIterable(), + Matchers.contains( + WindowedValues.of("hello", new Instant(5), WINDOW_1, paneInfo(0)), + WindowedValues.of("earth", new Instant(6), WINDOW_1, paneInfo(1)))); + } + + @Test + public void testElementIterationAllSkips() throws Exception { + Windmill.WorkItem.Builder workItem = + Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17); + Windmill.InputMessageBundle.Builder chunk1 = workItem.addMessageBundlesBuilder(); + chunk1.setSourceComputationId("computation"); + addCorruptedElement(chunk1); + addCorruptedElement(chunk1); + Windmill.InputMessageBundle.Builder chunk2 = workItem.addMessageBundlesBuilder(); + chunk2.setSourceComputationId("computation"); + addCorruptedElement(chunk2); + + KeyedWorkItem keyedWorkItem = + new WindmillKeyedWorkItem<>( + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + false, + true); + + assertTrue(Iterables.isEmpty(keyedWorkItem.elementsIterable())); + } + private void addElement( Windmill.InputMessageBundle.Builder chunk, long timestamp, @@ -156,6 +245,14 @@ private void addElementWithMetadata( .setMetadata(encodedMetadata); } + private void addCorruptedElement(Windmill.InputMessageBundle.Builder chunk) { + chunk + .addMessagesBuilder() + .setTimestamp(1) + .setData(ByteString.copyFromUtf8("bad data")) + .setMetadata(ByteString.copyFromUtf8("bad metadata")); + } + private PaneInfo paneInfo(int index) { return PaneInfo.createPane(false, false, Timing.EARLY, index, -1); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java index b6a4cb86c686..61e2f4250d06 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java @@ -24,7 +24,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; @@ -36,12 +39,16 @@ @RunWith(JUnit4.class) public class WindmillReaderIteratorBaseTest { private static class TestWindmillReaderIterator extends WindmillReaderIteratorBase { - protected TestWindmillReaderIterator(Windmill.WorkItem work) { - super(work); + protected TestWindmillReaderIterator( + Windmill.WorkItem work, ValueProvider skipUndecodableElements) { + super(work, skipUndecodableElements); } @Override - protected WindowedValue decodeMessage(Windmill.Message message) { + protected WindowedValue decodeMessage(Windmill.Message message) throws CoderException { + if (message.getTimestamp() < 0) { + throw new CoderException("Injected decoding error to test skipping."); + } return WindowedValues.valueInGlobalWindow(message.getTimestamp()); } } @@ -60,7 +67,26 @@ public void testBasic() throws IOException { testForMessageBundleCounts(0, 0, 1, 3, 0, 1, 0, 0, 0, 0); } + @Test + public void testSkipErrors() throws IOException { + testForMessageBundleCounts(true); + testForMessageBundleCounts(true, 0); + testForMessageBundleCounts(true, 0, 0); + testForMessageBundleCounts(true, 1); + testForMessageBundleCounts(true, 2); + testForMessageBundleCounts(true, 1, 1); + testForMessageBundleCounts(true, 0, 1); + testForMessageBundleCounts(true, 1, 0); + testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 1); + testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 0); + } + private void testForMessageBundleCounts(int... messageBundleCounts) throws IOException { + testForMessageBundleCounts(false, messageBundleCounts); + } + + private void testForMessageBundleCounts(boolean skipErrors, int... messageBundleCounts) + throws IOException { List bundles = new ArrayList<>(); long numTotalMessages = 0; for (int count : messageBundleCounts) { @@ -73,6 +99,10 @@ private void testForMessageBundleCounts(int... messageBundleCounts) throws IOExc .setData(ByteString.EMPTY) .build()); } + if (skipErrors && ThreadLocalRandom.current().nextBoolean()) { + bundle.addMessages( + Windmill.Message.newBuilder().setTimestamp(-10).setData(ByteString.EMPTY).build()); + } bundles.add(bundle.build()); } Windmill.WorkItem workItem = @@ -81,7 +111,9 @@ private void testForMessageBundleCounts(int... messageBundleCounts) throws IOExc .setWorkToken(0L) .addAllMessageBundles(bundles) .build(); - try (TestWindmillReaderIterator iter = new TestWindmillReaderIterator(workItem)) { + try (TestWindmillReaderIterator iter = + new TestWindmillReaderIterator( + workItem, ValueProvider.StaticValueProvider.of(skipErrors))) { List actual = ReaderTestUtils.windowedValuesToValues( ReaderUtils.readRemainingFromIterator(iter, false)); @@ -90,7 +122,7 @@ private void testForMessageBundleCounts(int... messageBundleCounts) throws IOExc for (int i = 0; i < numTotalMessages; ++i) { expected.add((long) i); } - assertEquals(Arrays.toString(messageBundleCounts), expected, actual); + assertEquals(Arrays.toString(messageBundleCounts) + skipErrors, expected, actual); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index f253d1794837..c8a21f8d9c12 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -341,7 +341,11 @@ private enum Encoding { tag = (byte) (ordinal() << 4); } - public static Encoding fromTag(byte b) { + public static Encoding fromTag(byte b) throws CoderException { + int index = b >> 4; + if (index < 0 || index > values().length) { + throw new CoderException("Invalid pane encoding " + index); + } return Encoding.values()[b >> 4]; } } From ded91d6728b076e2f6d3735ec1b0ee3b40126623 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Tue, 10 Mar 2026 12:47:51 +0100 Subject: [PATCH 2/3] address comments --- .../runners/dataflow/worker/WindmillKeyedWorkItem.java | 6 +++++- .../dataflow/worker/WindmillReaderIteratorBase.java | 9 +++++++-- .../runners/dataflow/worker/WindowingWindmillReader.java | 9 +++++---- .../dataflow/worker/StreamingDataflowWorkerTest.java | 6 ------ 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 4dd1f1fa0b44..c328719bfb50 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -164,7 +164,11 @@ public Iterable timersIterable() { if (!skipUndecodableElements) { throw new RuntimeException(e); } - LOG.error("Skipping input element due to decoding error", e); + LOG.error( + "Skipping input element for work token {} on sharding key {} due to decoding error", + workItem.getWorkToken(), + workItem.getShardingKey(), + e); return null; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java index 4a47f2604087..7e6508a4788c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java @@ -70,8 +70,13 @@ public boolean advance() throws IOException { current = checkNotNull(decodeMessage(bundle.getMessages(messageIndex))); return true; } catch (RuntimeException | IOException e) { - if (Boolean.TRUE.equals(skipUndecodableElements.get())) { - LOG.error("Skipping input element due to decoding error", e); + if (skipUndecodableElements.isAccessible() + && Boolean.TRUE.equals(skipUndecodableElements.get())) { + LOG.error( + "Skipping input element for work token {} on sharding key {} due to decoding error", + work.getWorkToken(), + work.getShardingKey(), + e); continue; } throw e; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index 2d9b27b8e34b..173b254f6395 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -54,12 +54,12 @@ class WindowingWindmillReader extends NativeReader windowCoder; private final Coder> windowsCoder; private StreamingModeExecutionContext context; - private final ValueProvider skipDecodingExceptions; + private final ValueProvider skipUndecodableElements; WindowingWindmillReader( Coder>> coder, StreamingModeExecutionContext context, - ValueProvider skipDecodingExceptions) { + ValueProvider skipUndecodableElements) { FullWindowedValueCoder> inputCoder = (FullWindowedValueCoder>) coder; this.windowsCoder = inputCoder.getWindowsCoder(); @@ -69,7 +69,7 @@ class WindowingWindmillReader extends NativeReader>> iterator() throw valueCoder, context.getWindmillTagEncoding(), context.getDrainMode(), - Boolean.TRUE.equals(skipDecodingExceptions.get())); + skipUndecodableElements.isAccessible() + && Boolean.TRUE.equals(skipUndecodableElements.get())); final boolean isEmptyWorkItem = (Iterables.isEmpty(keyedWorkItem.timersIterable()) && Iterables.isEmpty(keyedWorkItem.elementsIterable())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index fb618812197d..5e4e3b276880 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -856,12 +856,6 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args if (streamingEngine) { argsList.add("--experiments=enable_streaming_engine"); } - // We need to set the ValueProvider in all cases because we examine it and it is a - // RuntimeValueParameter. - if (argsList.stream() - .noneMatch(s -> s.startsWith("--skipInputElementsWithDecodingExceptions"))) { - argsList.add("--skipInputElementsWithDecodingExceptions=false"); - } LOG.info("Running with args {}", argsList); DataflowWorkerHarnessOptions options = PipelineOptionsFactory.fromArgs(argsList.toArray(new String[0])) From 4e40b48ef8498dc7436a086bd53d0ee167e9cd5f Mon Sep 17 00:00:00 2001 From: scwhittle Date: Tue, 10 Mar 2026 12:49:40 +0100 Subject: [PATCH 3/3] Update sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java Co-authored-by: Arun Pandian --- .../java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index c8a21f8d9c12..4805122035a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -343,7 +343,7 @@ private enum Encoding { public static Encoding fromTag(byte b) throws CoderException { int index = b >> 4; - if (index < 0 || index > values().length) { + if (index < 0 || index >= values().length) { throw new CoderException("Invalid pane encoding " + index); } return Encoding.values()[b >> 4];