Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
Expand Down Expand Up @@ -119,6 +120,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.initialization.qual.UnderInitialization;
import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -129,9 +132,6 @@
*
* <p>Implements a Streaming Dataflow worker.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@Internal
public final class StreamingDataflowWorker {

Expand Down Expand Up @@ -189,7 +189,7 @@ public final class StreamingDataflowWorker {
private final StreamingWorkerStatusReporter workerStatusReporter;
private final int numCommitThreads;
private final Supplier<Instant> clock;
private final GrpcDispatcherClient dispatcherClient;
private final @Nullable GrpcDispatcherClient dispatcherClient;
private final ExecutorService harnessSwitchExecutor;
private final long clientId;
private final WindmillServerStub windmillServer;
Expand Down Expand Up @@ -271,7 +271,7 @@ private StreamingDataflowWorker(
streamingWorkScheduler,
getDataMetricTracker,
memoryMonitor,
this.dispatcherClient);
checkNotNull(this.dispatcherClient));
} else {
harnessFactoryOutput =
createSingleSourceWorkerHarness(
Expand Down Expand Up @@ -330,6 +330,8 @@ private StreamingDataflowWorker(
}

private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
@UnderInitialization()
StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
WindmillServerStub windmillServer,
Expand All @@ -345,6 +347,7 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(

GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
HeartbeatSender heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
@SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingApplianceWorkCommitter.create(windmillServer::commitWork, this::onCompleteCommit);
GetWorkSender getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request));
Expand All @@ -355,7 +358,7 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
.setComputationStateFetcher(this.computationStateCache::get)
.setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setGetWorkSender(getWorkSender)
Expand All @@ -368,6 +371,8 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
}

private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHarness(
@UnknownInitialization()
StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
GrpcWindmillStreamFactory windmillStreamFactory,
Expand All @@ -376,7 +381,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
MemoryMonitor memoryMonitor,
GrpcDispatcherClient dispatcherClient) {
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
ChannelCache channelCache = createChannelCache(options, configFetcher);
ChannelCache channelCache = createChannelCache(options, checkNotNull(configFetcher));
@SuppressWarnings("methodref.receiver.bound")
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
createJobHeader(options, clientId),
Expand All @@ -391,7 +397,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
processingContext,
drainMode,
getWorkStreamLatencies) ->
computationStateCache
checkNotNull(computationStateCache)
.get(processingContext.computationId())
.ifPresent(
computationState -> {
Expand All @@ -407,7 +413,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
}),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
GetWorkBudgetDistributors.distributeEvenly(),
Preconditions.checkNotNull(dispatcherClient),
checkNotNull(dispatcherClient),
commitWorkStream ->
StreamingEngineWorkCommitter.builder()
// Share the commitByteSemaphore across all created workCommitters.
Expand All @@ -433,6 +439,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
}

private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
@UnknownInitialization()
StreamingDataflowWorker this, // Use receiver parameter syntax to allow annotation.
long clientId,
DataflowWorkerHarnessOptions options,
WindmillServerStub windmillServer,
Expand All @@ -454,7 +462,11 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
HeartbeatSender heartbeatSender =
createStreamingEngineHeartbeatSender(
options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
options,
windmillServer,
getDataStreamPool,
checkNotNull(configFetcher).getGlobalConfigHandle());
@SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingEngineWorkCommitter.builder()
.setCommitWorkStreamFactory(
Expand All @@ -476,7 +488,7 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
.setComputationStateFetcher(this.computationStateCache::get)
.setComputationStateFetcher(checkNotNull(this.computationStateCache)::get)
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setGetWorkSender(getWorkSender)
Expand All @@ -489,17 +501,20 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
}

private void switchStreamingWorkerHarness(ConnectivityType connectivityType) {
if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH
if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
return;
}
boolean directPath = connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH;
if ((directPath
&& this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)
|| (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH
&& streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) {
|| (!directPath && streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) {
return;
}
// Stop the current status pages before switching the harness.
this.statusPages.get().stop();
LOG.debug("Stopped StreamingWorkerStatusPages before switching connectivity type.");
StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null;
if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput;
if (directPath) {
// If dataflow experiment `enable_windmill_service_direct_path` is not set for
// the job, do not switch to FanOutStreamingEngineWorkerHarness. This is because
// `enable_windmill_service_direct_path` is tied to SDK version and is only
Expand All @@ -524,11 +539,11 @@ private void switchStreamingWorkerHarness(ConnectivityType connectivityType) {
this.streamingWorkScheduler,
this.getDataMetricTracker,
this.memoryMonitor.memoryMonitor(),
this.dispatcherClient);
checkNotNull(this.dispatcherClient));
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
streamingWorkerHarness.get().start();
LOG.debug("Started FanOutStreamingEngineWorkerHarness");
} else if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
} else {
LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH");
LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness");
streamingWorkerHarness.get().shutdown();
Expand Down
Loading