Skip to content

Commit 736fc0e

Browse files
committed
[Dataflow Streaming] Add a pipeline option to skip input elements that fail to decode. Such elements are logged as errors and otherwise discarded.
1 parent 676c998 commit 736fc0e

File tree

11 files changed

+414
-116
lines changed

11 files changed

+414
-116
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.options.ExperimentalOptions;
2424
import org.apache.beam.sdk.options.Hidden;
2525
import org.apache.beam.sdk.options.PipelineOptions;
26+
import org.apache.beam.sdk.options.ValueProvider;
2627
import org.joda.time.Duration;
2728

2829
/** [Internal] Options for configuring StreamingDataflowWorker. */
@@ -226,6 +227,12 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
226227

227228
void setWindmillServiceStreamMaxBackoffMillis(int value);
228229

230+
@Description(
231+
"If true, log and skip input elements that are unable to successfully decode from the streaming backend.")
232+
ValueProvider<Boolean> getSkipInputElementsWithDecodingExceptions();
233+
234+
void setSkipInputElementsWithDecodingExceptions(ValueProvider<Boolean> value);
235+
229236
@Description("Enables direct path mode for streaming engine.")
230237
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
231238
boolean getIsWindmillServiceDirectPathEnabled();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java

Lines changed: 27 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import java.io.IOException;
2525
import java.io.InputStream;
2626
import java.util.Map;
27+
import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
2728
import org.apache.beam.runners.dataflow.util.CloudObject;
2829
import org.apache.beam.runners.dataflow.util.PropertyNames;
2930
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
@@ -32,6 +33,7 @@
3233
import org.apache.beam.sdk.coders.Coder;
3334
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
3435
import org.apache.beam.sdk.options.PipelineOptions;
36+
import org.apache.beam.sdk.options.ValueProvider;
3537
import org.apache.beam.sdk.transforms.SimpleFunction;
3638
import org.apache.beam.sdk.util.SerializableUtils;
3739
import org.apache.beam.sdk.values.WindowedValue;
@@ -41,26 +43,24 @@
4143
import org.checkerframework.checker.nullness.qual.Nullable;
4244

4345
/** A Reader that receives elements from Pubsub, via a Windmill server. */
44-
@SuppressWarnings({
45-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
46-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
47-
})
4846
class PubsubReader<T> extends NativeReader<WindowedValue<T>> {
4947
private final Coder<T> coder;
5048
private final StreamingModeExecutionContext context;
5149
// Function used to parse Windmill data.
5250
// If non-null, data from Windmill is expected to be a PubsubMessage protobuf.
53-
private final SimpleFunction<PubsubMessage, T> parseFn;
51+
private final @Nullable SimpleFunction<PubsubMessage, T> parseFn;
52+
private final ValueProvider<Boolean> skipUndecodableElements;
5453

5554
PubsubReader(
5655
Coder<WindowedValue<T>> coder,
5756
StreamingModeExecutionContext context,
58-
SimpleFunction<PubsubMessage, T> parseFn) {
59-
@SuppressWarnings({"unchecked", "rawtypes"})
60-
WindowedValueCoder<T> windowedCoder = (WindowedValueCoder) coder;
57+
@Nullable SimpleFunction<PubsubMessage, T> parseFn,
58+
ValueProvider<Boolean> skipUndecodableElements) {
59+
WindowedValueCoder<T> windowedCoder = (WindowedValueCoder<T>) coder;
6160
this.coder = windowedCoder.getValueCoder();
6261
this.context = context;
6362
this.parseFn = parseFn;
63+
this.skipUndecodableElements = skipUndecodableElements;
6464
}
6565

6666
/** A {@link ReaderFactory.Registrar} for pubsub sources. */
@@ -75,19 +75,19 @@ public Map<String, ReaderFactory> factories() {
7575
}
7676
}
7777

78+
@SuppressWarnings({"unchecked", "rawtypes"})
7879
static class Factory implements ReaderFactory {
7980
@Override
8081
public NativeReader<?> create(
8182
CloudObject cloudSourceSpec,
82-
Coder<?> coder,
83+
@Nullable Coder<?> coder,
8384
@Nullable PipelineOptions options,
8485
@Nullable DataflowExecutionContext executionContext,
8586
DataflowOperationContext operationContext)
8687
throws Exception {
87-
coder = checkArgumentNotNull(coder);
88-
@SuppressWarnings("unchecked")
89-
Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>) coder;
90-
SimpleFunction<PubsubMessage, Object> parseFn = null;
88+
Coder<WindowedValue<Object>> typedCoder =
89+
(Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
90+
@Nullable SimpleFunction<PubsubMessage, Object> parseFn = null;
9191
byte[] attributesFnBytes =
9292
getBytes(cloudSourceSpec, PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, null);
9393
// If attributesFnBytes is set, Pubsub data will be in PubsubMessage protobuf format. The
@@ -98,8 +98,20 @@ public NativeReader<?> create(
9898
(SimpleFunction<PubsubMessage, Object>)
9999
SerializableUtils.deserializeFromByteArray(attributesFnBytes, "serialized fn info");
100100
}
101+
@Nullable
102+
ValueProvider<Boolean> skipUndecodableElements =
103+
(options != null)
104+
? options
105+
.as(DataflowStreamingPipelineOptions.class)
106+
.getSkipInputElementsWithDecodingExceptions()
107+
: null;
101108
return new PubsubReader<>(
102-
typedCoder, (StreamingModeExecutionContext) executionContext, parseFn);
109+
typedCoder,
110+
(StreamingModeExecutionContext) checkArgumentNotNull(executionContext),
111+
parseFn,
112+
skipUndecodableElements != null
113+
? skipUndecodableElements
114+
: ValueProvider.StaticValueProvider.of(false));
103115
}
104116
}
105117

@@ -110,7 +122,7 @@ public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
110122

111123
class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
112124
protected PubsubReaderIterator(Windmill.WorkItem work) {
113-
super(work);
125+
super(work, skipUndecodableElements);
114126
}
115127

116128
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 28 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -18,19 +18,22 @@
1818
package org.apache.beam.runners.dataflow.worker;
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2122

2223
import com.google.auto.service.AutoService;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.util.Collection;
2627
import java.util.Map;
2728
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
29+
import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
2830
import org.apache.beam.runners.dataflow.util.CloudObject;
2931
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
3032
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
3133
import org.apache.beam.sdk.coders.Coder;
3234
import org.apache.beam.sdk.coders.KvCoder;
3335
import org.apache.beam.sdk.options.PipelineOptions;
36+
import org.apache.beam.sdk.options.ValueProvider;
3437
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
3538
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3639
import org.apache.beam.sdk.values.CausedByDrain;
@@ -45,20 +48,21 @@
4548
/**
4649
* A Reader that receives input data from a Windmill server, and returns it as individual elements.
4750
*/
48-
@SuppressWarnings({
49-
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
50-
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
51-
})
5251
class UngroupedWindmillReader<T> extends NativeReader<WindowedValue<T>> {
5352
private final Coder<T> valueCoder;
5453
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
55-
private StreamingModeExecutionContext context;
54+
private final StreamingModeExecutionContext context;
55+
private final ValueProvider<Boolean> skipUndecodableElements;
5656

57-
UngroupedWindmillReader(Coder<WindowedValue<T>> coder, StreamingModeExecutionContext context) {
57+
UngroupedWindmillReader(
58+
Coder<WindowedValue<T>> coder,
59+
StreamingModeExecutionContext context,
60+
ValueProvider<Boolean> skipUndecodableElements) {
5861
FullWindowedValueCoder<T> inputCoder = (FullWindowedValueCoder<T>) coder;
5962
this.valueCoder = inputCoder.getValueCoder();
6063
this.windowsCoder = inputCoder.getWindowsCoder();
6164
this.context = context;
65+
this.skipUndecodableElements = skipUndecodableElements;
6266
}
6367

6468
/** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */
@@ -75,6 +79,7 @@ public Map<String, ReaderFactory> factories() {
7579
}
7680
}
7781

82+
@SuppressWarnings({"unchecked", "rawtypes"})
7883
static class Factory implements ReaderFactory {
7984
@Override
8085
public NativeReader<?> create(
@@ -84,11 +89,21 @@ public NativeReader<?> create(
8489
@Nullable DataflowExecutionContext executionContext,
8590
DataflowOperationContext operationContext)
8691
throws Exception {
87-
coder = checkArgumentNotNull(coder);
88-
@SuppressWarnings("unchecked")
89-
Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>) coder;
92+
Coder<WindowedValue<Object>> typedCoder =
93+
(Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
94+
@Nullable
95+
ValueProvider<Boolean> skipUndecodableElements =
96+
options != null
97+
? options
98+
.as(DataflowStreamingPipelineOptions.class)
99+
.getSkipInputElementsWithDecodingExceptions()
100+
: null;
90101
return new UngroupedWindmillReader<>(
91-
typedCoder, (StreamingModeExecutionContext) executionContext);
102+
typedCoder,
103+
(StreamingModeExecutionContext) checkArgumentNotNull(executionContext),
104+
skipUndecodableElements != null
105+
? skipUndecodableElements
106+
: ValueProvider.StaticValueProvider.of(false));
92107
}
93108
}
94109

@@ -97,9 +112,9 @@ public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
97112
return new UngroupedWindmillReaderIterator(context.getWorkItem());
98113
}
99114

100-
class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase {
115+
class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
101116
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
102-
super(work);
117+
super(work, skipUndecodableElements);
103118
}
104119

105120
@Override
@@ -134,7 +149,7 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
134149
}
135150
if (valueCoder instanceof KvCoder) {
136151
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
137-
InputStream key = context.getSerializedKey().newInput();
152+
InputStream key = checkNotNull(context.getSerializedKey()).newInput();
138153
notifyElementRead(key.available() + data.available() + metadata.available());
139154

140155
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)