Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.joda.time.Duration;

/** [Internal] Options for configuring StreamingDataflowWorker. */
Expand Down Expand Up @@ -226,6 +227,12 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillServiceStreamMaxBackoffMillis(int value);

@Description(
"If true, log and skip input elements that are unable to successfully decode from the streaming backend.")
ValueProvider<Boolean> getSkipInputElementsWithDecodingExceptions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not clear why this needs to be a ValueProvider and not a Boolean. It seems like all the places where this is used are instantiated during the runtime and the pipeline options will be available there.

I'm not familiar with how ValueProviders work, does this need to be a ValueProvider?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that ValueProvider was needed for all templates but it is just classic templates. In the classic template case, the graph and pipeline serialization happens during template creation and the valueprovider is a placeholder that allows injecting the value later when the template is instantiated. Flex templates on the other hand rerun the graph and pipeline serialization logic each invocation and don't require them. In particular the source/reader is a serialized part of the graph. Given that this is more of an emergency option and a customer using a classic template may wish to use it, it seems worthwhile keeping it as a valueprovider.


void setSkipInputElementsWithDecodingExceptions(ValueProvider<Boolean> value);

@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
Expand All @@ -32,6 +33,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.WindowedValue;
Expand All @@ -41,26 +43,24 @@
import org.checkerframework.checker.nullness.qual.Nullable;

/** A Reader that receives elements from Pubsub, via a Windmill server. */
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class PubsubReader<T> extends NativeReader<WindowedValue<T>> {
private final Coder<T> coder;
private final StreamingModeExecutionContext context;
// Function used to parse Windmill data.
// If non-null, data from Windmill is expected to be a PubsubMessage protobuf.
private final SimpleFunction<PubsubMessage, T> parseFn;
private final @Nullable SimpleFunction<PubsubMessage, T> parseFn;
private final ValueProvider<Boolean> skipUndecodableElements;

PubsubReader(
Coder<WindowedValue<T>> coder,
StreamingModeExecutionContext context,
SimpleFunction<PubsubMessage, T> parseFn) {
@SuppressWarnings({"unchecked", "rawtypes"})
WindowedValueCoder<T> windowedCoder = (WindowedValueCoder) coder;
@Nullable SimpleFunction<PubsubMessage, T> parseFn,
ValueProvider<Boolean> skipUndecodableElements) {
WindowedValueCoder<T> windowedCoder = (WindowedValueCoder<T>) coder;
this.coder = windowedCoder.getValueCoder();
this.context = context;
this.parseFn = parseFn;
this.skipUndecodableElements = skipUndecodableElements;
}

/** A {@link ReaderFactory.Registrar} for pubsub sources. */
Expand All @@ -75,19 +75,19 @@ public Map<String, ReaderFactory> factories() {
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
static class Factory implements ReaderFactory {
@Override
public NativeReader<?> create(
CloudObject cloudSourceSpec,
Coder<?> coder,
@Nullable Coder<?> coder,
@Nullable PipelineOptions options,
@Nullable DataflowExecutionContext executionContext,
DataflowOperationContext operationContext)
throws Exception {
coder = checkArgumentNotNull(coder);
@SuppressWarnings("unchecked")
Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>) coder;
SimpleFunction<PubsubMessage, Object> parseFn = null;
Coder<WindowedValue<Object>> typedCoder =
(Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
@Nullable SimpleFunction<PubsubMessage, Object> parseFn = null;
byte[] attributesFnBytes =
getBytes(cloudSourceSpec, PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, null);
// If attributesFnBytes is set, Pubsub data will be in PubsubMessage protobuf format. The
Expand All @@ -98,8 +98,20 @@ public NativeReader<?> create(
(SimpleFunction<PubsubMessage, Object>)
SerializableUtils.deserializeFromByteArray(attributesFnBytes, "serialized fn info");
}
@Nullable
ValueProvider<Boolean> skipUndecodableElements =
(options != null)
? options
.as(DataflowStreamingPipelineOptions.class)
.getSkipInputElementsWithDecodingExceptions()
: null;
return new PubsubReader<>(
typedCoder, (StreamingModeExecutionContext) executionContext, parseFn);
typedCoder,
(StreamingModeExecutionContext) checkArgumentNotNull(executionContext),
parseFn,
skipUndecodableElements != null
? skipUndecodableElements
: ValueProvider.StaticValueProvider.of(false));
}
}

Expand All @@ -110,7 +122,7 @@ public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {

class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
protected PubsubReaderIterator(Windmill.WorkItem work) {
super(work);
super(work, skipUndecodableElements);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
Expand All @@ -45,20 +48,21 @@
/**
* A Reader that receives input data from a Windmill server, and returns it as individual elements.
*/
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class UngroupedWindmillReader<T> extends NativeReader<WindowedValue<T>> {
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
private StreamingModeExecutionContext context;
private final StreamingModeExecutionContext context;
private final ValueProvider<Boolean> skipUndecodableElements;

UngroupedWindmillReader(Coder<WindowedValue<T>> coder, StreamingModeExecutionContext context) {
UngroupedWindmillReader(
Coder<WindowedValue<T>> coder,
StreamingModeExecutionContext context,
ValueProvider<Boolean> skipUndecodableElements) {
FullWindowedValueCoder<T> inputCoder = (FullWindowedValueCoder<T>) coder;
this.valueCoder = inputCoder.getValueCoder();
this.windowsCoder = inputCoder.getWindowsCoder();
this.context = context;
this.skipUndecodableElements = skipUndecodableElements;
}

/** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */
Expand All @@ -75,6 +79,7 @@ public Map<String, ReaderFactory> factories() {
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
static class Factory implements ReaderFactory {
@Override
public NativeReader<?> create(
Expand All @@ -84,11 +89,21 @@ public NativeReader<?> create(
@Nullable DataflowExecutionContext executionContext,
DataflowOperationContext operationContext)
throws Exception {
coder = checkArgumentNotNull(coder);
@SuppressWarnings("unchecked")
Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>) coder;
Coder<WindowedValue<Object>> typedCoder =
(Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
@Nullable
ValueProvider<Boolean> skipUndecodableElements =
options != null
? options
.as(DataflowStreamingPipelineOptions.class)
.getSkipInputElementsWithDecodingExceptions()
: null;
return new UngroupedWindmillReader<>(
typedCoder, (StreamingModeExecutionContext) executionContext);
typedCoder,
(StreamingModeExecutionContext) checkArgumentNotNull(executionContext),
skipUndecodableElements != null
? skipUndecodableElements
: ValueProvider.StaticValueProvider.of(false));
}
}

Expand All @@ -97,9 +112,9 @@ public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new UngroupedWindmillReaderIterator(context.getWorkItem());
}

class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase {
class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
super(work);
super(work, skipUndecodableElements);
}

@Override
Expand Down Expand Up @@ -134,7 +149,7 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
}
if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
InputStream key = context.getSerializedKey().newInput();
InputStream key = checkNotNull(context.getSerializedKey()).newInput();
notifyElementRead(key.available() + data.available() + metadata.available());

@SuppressWarnings("unchecked")
Expand Down
Loading
Loading