Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,6 @@ class BeamModulePlugin implements Plugin<Project> {
"TimeUnitConversionChecker",
"UndefinedEquals",
"UnescapedEntity",
"UnnecessaryLambda",
"UnnecessaryMethodReference",
"UnnecessaryParentheses",
"UnrecognisedJavadocTag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,25 +261,26 @@ public class StreamingDataflowWorkerTest {
private static final String DEFAULT_DATA_STRING = "data";
private static final String DEFAULT_DESTINATION_STREAM_ID = "out";
private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final Function<GetDataRequest, GetDataResponse> EMPTY_DATA_RESPONDER =
(GetDataRequest request) -> {
GetDataResponse.Builder builder = GetDataResponse.newBuilder();
for (ComputationGetDataRequest compRequest : request.getRequestsList()) {
ComputationGetDataResponse.Builder compBuilder =
builder.addDataBuilder().setComputationId(compRequest.getComputationId());
for (KeyedGetDataRequest keyRequest : compRequest.getRequestsList()) {
KeyedGetDataResponse.Builder keyBuilder =
compBuilder
.addDataBuilder()
.setKey(keyRequest.getKey())
.setShardingKey(keyRequest.getShardingKey());
keyBuilder.addAllValues(keyRequest.getValuesToFetchList());
keyBuilder.addAllBags(keyRequest.getBagsToFetchList());
keyBuilder.addAllWatermarkHolds(keyRequest.getWatermarkHoldsToFetchList());
}
}
return builder.build();
};

private static GetDataResponse emptyDataResponder(GetDataRequest request) {
GetDataResponse.Builder builder = GetDataResponse.newBuilder();
for (ComputationGetDataRequest compRequest : request.getRequestsList()) {
ComputationGetDataResponse.Builder compBuilder =
builder.addDataBuilder().setComputationId(compRequest.getComputationId());
for (KeyedGetDataRequest keyRequest : compRequest.getRequestsList()) {
KeyedGetDataResponse.Builder keyBuilder =
compBuilder
.addDataBuilder()
.setKey(keyRequest.getKey())
.setShardingKey(keyRequest.getShardingKey());
keyBuilder.addAllValues(keyRequest.getValuesToFetchList());
keyBuilder.addAllBags(keyRequest.getBagsToFetchList());
keyBuilder.addAllWatermarkHolds(keyRequest.getWatermarkHoldsToFetchList());
}
}
return builder.build();
}

private final boolean streamingEngine;
private final Supplier<Long> idGenerator =
new Supplier<Long>() {
Expand Down Expand Up @@ -2315,7 +2316,7 @@ private void runMergeSessionsActions(List<Action> actions) throws Exception {
worker.start();

// Respond to any GetData requests with empty state.
server.whenGetDataCalled().answerByDefault(EMPTY_DATA_RESPONDER);
server.whenGetDataCalled().answerByDefault(StreamingDataflowWorkerTest::emptyDataResponder);

for (int i = 0; i < actions.size(); ++i) {
Action action = actions.get(i);
Expand Down Expand Up @@ -3686,7 +3687,8 @@ public void testLatencyAttributionToQueuedState() throws Exception {
.build());
worker.start();

ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink(EMPTY_DATA_RESPONDER);
ActiveWorkRefreshSink awrSink =
new ActiveWorkRefreshSink(StreamingDataflowWorkerTest::emptyDataResponder);
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
server
.whenGetWorkCalled()
Expand Down Expand Up @@ -3723,7 +3725,8 @@ public void testLatencyAttributionToActiveState() throws Exception {
.build());
worker.start();

ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink(EMPTY_DATA_RESPONDER);
ActiveWorkRefreshSink awrSink =
new ActiveWorkRefreshSink(StreamingDataflowWorkerTest::emptyDataResponder);
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
server.whenGetWorkCalled().thenReturn(makeInput(workToken, 0 /* timestamp */));
server.waitForAndGetCommits(1);
Expand Down Expand Up @@ -3760,7 +3763,7 @@ public void testLatencyAttributionToReadingState() throws Exception {
new ActiveWorkRefreshSink(
(request) -> {
clock.sleep(Duration.millis(1000));
return EMPTY_DATA_RESPONDER.apply(request);
return emptyDataResponder(request);
});
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
server.whenGetWorkCalled().thenReturn(makeInput(workToken, 0 /* timestamp */));
Expand Down Expand Up @@ -3801,7 +3804,8 @@ public void testLatencyAttributionToCommittingState() throws Exception {
.build());
worker.start();

ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink(EMPTY_DATA_RESPONDER);
ActiveWorkRefreshSink awrSink =
new ActiveWorkRefreshSink(StreamingDataflowWorkerTest::emptyDataResponder);
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
server.whenGetWorkCalled().thenReturn(makeInput(workToken, TimeUnit.MILLISECONDS.toMicros(0)));
server.waitForAndGetCommits(1);
Expand Down Expand Up @@ -3834,7 +3838,8 @@ public void testLatencyAttributionPopulatedInCommitRequest() throws Exception {
.build());
worker.start();

ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink(EMPTY_DATA_RESPONDER);
ActiveWorkRefreshSink awrSink =
new ActiveWorkRefreshSink(StreamingDataflowWorkerTest::emptyDataResponder);
server.whenGetDataCalled().answerByDefault(awrSink::getData).delayEachResponseBy(Duration.ZERO);
server.whenGetWorkCalled().thenReturn(makeInput(workToken, 1 /* timestamp */));
Map<Long, WorkItemCommitRequest> workItemCommitRequest = server.waitForAndGetCommits(1);
Expand Down Expand Up @@ -4818,7 +4823,7 @@ GetDataResponse getData(GetDataRequest request) {
}
}
}
return EMPTY_DATA_RESPONDER.apply(request);
return emptyDataResponder(request);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;

import java.util.function.BooleanSupplier;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
Expand All @@ -39,13 +38,17 @@ public class WorkerStatusPagesTest {
private final Server server = new Server();
private final LocalConnector connector = new LocalConnector(server);
@Mock private MemoryMonitor mockMemoryMonitor;
private final BooleanSupplier mockHealthyIndicator = () -> true;

private boolean mockHealthyIndicator() {
return true;
}

private WorkerStatusPages wsp;

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
wsp = new WorkerStatusPages(server, mockMemoryMonitor, mockHealthyIndicator);
wsp = new WorkerStatusPages(server, mockMemoryMonitor, this::mockHealthyIndicator);
server.addConnector(connector);
wsp.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
Expand All @@ -44,9 +43,13 @@ public class SingleSourceWorkerHarnessTest {
private final WorkCommitter workCommitter = mock(WorkCommitter.class);
private final GetDataClient getDataClient = mock(GetDataClient.class);
private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);
private final Runnable waitForResources = () -> {};
private final Function<String, Optional<ComputationState>> computationStateFetcher =
ignored -> Optional.empty();

private void waitForResources() {}

private Optional<ComputationState> computationStateFetcher(String ignored) {
return Optional.empty();
}

private final StreamingWorkScheduler streamingWorkScheduler = mock(StreamingWorkScheduler.class);

private SingleSourceWorkerHarness createWorkerHarness(
Expand All @@ -57,9 +60,9 @@ private SingleSourceWorkerHarness createWorkerHarness(
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
.setHeartbeatSender(heartbeatSender)
.setWaitForResources(waitForResources)
.setWaitForResources(this::waitForResources)
.setStreamingWorkScheduler(streamingWorkScheduler)
.setComputationStateFetcher(computationStateFetcher)
.setComputationStateFetcher(this::computationStateFetcher)
.setGetWorkSender(getWorkSender)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@

@RunWith(JUnit4.class)
public class ActiveWorkRefresherTest {
private static final Supplier<Instant> A_LONG_TIME_AGO =
() -> Instant.parse("1998-09-04T00:00:00Z");
private static Instant aLongTimeAgo() {
return Instant.parse("1998-09-04T00:00:00Z");
}

private static final String COMPUTATION_ID_PREFIX = "ComputationId-";
private final HeartbeatSender heartbeatSender = mock(HeartbeatSender.class);

Expand Down Expand Up @@ -134,7 +136,7 @@ private ExecutableWork createOldWork(
Work.createProcessingContext(
"computationId", new FakeGetDataClient(), ignored -> {}, heartbeatSender),
false,
A_LONG_TIME_AGO),
ActiveWorkRefresherTest::aLongTimeAgo),
processWork);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -114,42 +113,42 @@ public class EncoderHelpers {
private static final Map<Class<?>, Encoder<?>> DEFAULT_ENCODERS = new ConcurrentHashMap<>();

// Factory for default encoders by class
private static final Function<Class<?>, @Nullable Encoder<?>> ENCODER_FACTORY =
cls -> {
if (cls.equals(PaneInfo.class)) {
return paneInfoEncoder();
} else if (cls.equals(GlobalWindow.class)) {
return binaryEncoder(GlobalWindow.Coder.INSTANCE, false);
} else if (cls.equals(IntervalWindow.class)) {
return binaryEncoder(IntervalWindowCoder.of(), false);
} else if (cls.equals(Instant.class)) {
return instantEncoder();
} else if (cls.equals(String.class)) {
return Encoders.STRING();
} else if (cls.equals(Boolean.class)) {
return Encoders.BOOLEAN();
} else if (cls.equals(Integer.class)) {
return Encoders.INT();
} else if (cls.equals(Long.class)) {
return Encoders.LONG();
} else if (cls.equals(Float.class)) {
return Encoders.FLOAT();
} else if (cls.equals(Double.class)) {
return Encoders.DOUBLE();
} else if (cls.equals(BigDecimal.class)) {
return Encoders.DECIMAL();
} else if (cls.equals(byte[].class)) {
return Encoders.BINARY();
} else if (cls.equals(Byte.class)) {
return Encoders.BYTE();
} else if (cls.equals(Short.class)) {
return Encoders.SHORT();
}
return null;
};
private static @Nullable Encoder<?> encoderFactory(Class<?> cls) {
if (cls.equals(PaneInfo.class)) {
return paneInfoEncoder();
} else if (cls.equals(GlobalWindow.class)) {
return binaryEncoder(GlobalWindow.Coder.INSTANCE, false);
} else if (cls.equals(IntervalWindow.class)) {
return binaryEncoder(IntervalWindowCoder.of(), false);
} else if (cls.equals(Instant.class)) {
return instantEncoder();
} else if (cls.equals(String.class)) {
return Encoders.STRING();
} else if (cls.equals(Boolean.class)) {
return Encoders.BOOLEAN();
} else if (cls.equals(Integer.class)) {
return Encoders.INT();
} else if (cls.equals(Long.class)) {
return Encoders.LONG();
} else if (cls.equals(Float.class)) {
return Encoders.FLOAT();
} else if (cls.equals(Double.class)) {
return Encoders.DOUBLE();
} else if (cls.equals(BigDecimal.class)) {
return Encoders.DECIMAL();
} else if (cls.equals(byte[].class)) {
return Encoders.BINARY();
} else if (cls.equals(Byte.class)) {
return Encoders.BYTE();
} else if (cls.equals(Short.class)) {
return Encoders.SHORT();
}
return null;
Comment on lines +117 to +146
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and maintainability, you could refactor this long if-else if chain into a switch statement on the canonical name of the class. This makes the code cleaner and easier to extend.

    String canonicalName = cls.getCanonicalName();
    if (canonicalName == null) {
      return null;
    }
    switch (canonicalName) {
      case "org.apache.beam.sdk.transforms.windowing.PaneInfo":
        return paneInfoEncoder();
      case "org.apache.beam.sdk.transforms.windowing.GlobalWindow":
        return binaryEncoder(GlobalWindow.Coder.INSTANCE, false);
      case "org.apache.beam.sdk.transforms.windowing.IntervalWindow":
        return binaryEncoder(IntervalWindowCoder.of(), false);
      case "org.joda.time.Instant":
        return instantEncoder();
      case "java.lang.String":
        return Encoders.STRING();
      case "java.lang.Boolean":
        return Encoders.BOOLEAN();
      case "java.lang.Integer":
        return Encoders.INT();
      case "java.lang.Long":
        return Encoders.LONG();
      case "java.lang.Float":
        return Encoders.FLOAT();
      case "java.lang.Double":
        return Encoders.DOUBLE();
      case "java.math.BigDecimal":
        return Encoders.DECIMAL();
      case "byte[]":
        return Encoders.BINARY();
      case "java.lang.Byte":
        return Encoders.BYTE();
      case "java.lang.Short":
        return Encoders.SHORT();
      default:
        return null;
    }

}

@SuppressWarnings({"nullness", "methodref.return"}) // computeIfAbsent allows null returns
private static <T> @Nullable Encoder<T> getOrCreateDefaultEncoder(Class<? super T> cls) {
return (Encoder<T>) DEFAULT_ENCODERS.computeIfAbsent(cls, ENCODER_FACTORY);
return (Encoder<T>) DEFAULT_ENCODERS.computeIfAbsent(cls, EncoderHelpers::encoderFactory);
}

/** Gets or creates a default {@link Encoder} for {@link T}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public static <T> Builder<T> withOnNext(Consumer<T> onNext) {
return new Builder<>(
new ForwardingCallStreamObserver<>(
onNext,
TestStreams.throwingErrorHandler(),
TestStreams.noopRunnable(),
TestStreams.alwaysTrueSupplier()));
TestStreams::throwingErrorHandler,
TestStreams::noopRunnable,
TestStreams::alwaysTrueSupplier));
}

/** A builder for a test {@link CallStreamObserver} that performs various callbacks. */
Expand Down Expand Up @@ -90,18 +90,14 @@ public CallStreamObserver<T> build() {
}
}

private static Consumer<Throwable> throwingErrorHandler() {
return item -> {
throw new RuntimeException(item);
};
private static void throwingErrorHandler(Throwable item) {
throw new RuntimeException(item);
}

private static Runnable noopRunnable() {
return () -> {};
}
private static void noopRunnable() {}

private static Supplier<Boolean> alwaysTrueSupplier() {
return () -> true;
private static boolean alwaysTrueSupplier() {
return true;
}

/** A {@link CallStreamObserver} which executes the supplied callbacks. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -56,15 +55,17 @@ public static List<String> detectClassPathResourcesToStage(
List<String> detectedResources =
artifactsRelatedOptions.getPipelineResourcesDetector().detect(classLoader);

return detectedResources.stream().filter(isStageable()).collect(Collectors.toList());
return detectedResources.stream()
.filter(PipelineResources::isStageable)
.collect(Collectors.toList());
}

/**
* Returns a predicate for filtering all resources that are impossible to stage (like gradle
* wrapper jars).
*/
private static Predicate<String> isStageable() {
return resourcePath -> !resourcePath.contains("gradle/wrapper");
private static boolean isStageable(String resourcePath) {
return !resourcePath.contains("gradle/wrapper");
}

/**
Expand Down
Loading
Loading