Skip to content

Commit

Permalink
Merge branch 'main' into enh/4838-support-plugin-loading-in-conifg
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Sep 24, 2024
2 parents 80ef670 + a1c11e5 commit a4d4a08
Show file tree
Hide file tree
Showing 87 changed files with 5,155 additions and 300 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
public @interface DataPrepperPlugin {
String DEFAULT_DEPRECATED_NAME = "";

String DEFAULT_ALTERNATE_NAME = "";

/**
*
* @return Name of the plugin which should be unique for the type
Expand All @@ -46,6 +48,12 @@
*/
String deprecatedName() default DEFAULT_DEPRECATED_NAME;

/**
*
* @return Alternate name of the plugin which should be unique for the type
*/
String[] alternateNames() default {};

/**
* The class type for this plugin.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,21 @@ public void shutdown() {
shutdownServers();
}

private void shutdownPipelines() {
shutdownPipelines(DataPrepperShutdownOptions.defaultOptions());
}

/**
* Triggers the shutdown of all configured valid pipelines.
*/
public void shutdownPipelines() {
public void shutdownPipelines(final DataPrepperShutdownOptions shutdownOptions) {
transformationPipelines.forEach((name, pipeline) -> {
pipeline.removeShutdownObserver(pipelinesObserver);
});

for (final Pipeline pipeline : transformationPipelines.values()) {
LOG.info("Shutting down pipeline: {}", pipeline.getName());
pipeline.shutdown();
pipeline.shutdown(shutdownOptions);
}
}

Expand All @@ -127,11 +131,12 @@ public void shutdownServers() {
*
* @param pipeline name of the pipeline
*/
public void shutdownPipelines(final String pipeline) {
public void shutdownPipeline(final String pipeline) {
if (transformationPipelines.containsKey(pipeline)) {
transformationPipelines.get(pipeline).shutdown();
}
}

public PluginFactory getPluginFactory() {
return pluginFactory;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import java.time.Duration;

public class DataPrepperShutdownOptions {
private final Duration bufferReadTimeout;
private final Duration bufferDrainTimeout;

public static DataPrepperShutdownOptions defaultOptions() {
return new DataPrepperShutdownOptions(builder());
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private Duration bufferReadTimeout;
private Duration bufferDrainTimeout;

private Builder() {
}

public Builder withBufferReadTimeout(final Duration bufferReadTimeout) {
this.bufferReadTimeout = bufferReadTimeout;
return this;
}

public Builder withBufferDrainTimeout(final Duration bufferDrainTimeout) {
this.bufferDrainTimeout = bufferDrainTimeout;
return this;
}

public DataPrepperShutdownOptions build() {
return new DataPrepperShutdownOptions(this);
}
}

private DataPrepperShutdownOptions(final Builder builder) {
this.bufferReadTimeout = builder.bufferReadTimeout;
this.bufferDrainTimeout = builder.bufferDrainTimeout;

if(bufferReadTimeout != null && bufferDrainTimeout != null) {
if (bufferReadTimeout.compareTo(bufferDrainTimeout) > 0) {
throw new IllegalArgumentException("Buffer read timeout cannot be greater than buffer drain timeout");
}
}
}

public Duration getBufferReadTimeout() {
return bufferReadTimeout;
}

public Duration getBufferDrainTimeout() {
return bufferDrainTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.pipeline;

import com.google.common.base.Preconditions;
import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
Expand Down Expand Up @@ -41,7 +42,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -55,7 +55,7 @@
public class Pipeline {
private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis();
private volatile AtomicBoolean stopRequested;
private final PipelineShutdown pipelineShutdown;

private final String name;
private final Source source;
Expand Down Expand Up @@ -137,7 +137,7 @@ public Pipeline(
this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads,
new PipelineThreadFactory(format("%s-sink-worker", name)), this);

stopRequested = new AtomicBoolean(false);
this.pipelineShutdown = new PipelineShutdown(buffer);
}

AcknowledgementSetManager getAcknowledgementSetManager() {
Expand Down Expand Up @@ -176,7 +176,11 @@ public Collection<Sink> getSinks() {
}

public boolean isStopRequested() {
return stopRequested.get();
return pipelineShutdown.isStopRequested();
}

public boolean isForceStopReadingBuffers() {
return pipelineShutdown.isForceStopReadingBuffers();
}

public Duration getPeerForwarderDrainTimeout() {
Expand Down Expand Up @@ -267,6 +271,10 @@ public void execute() {
}
}

public synchronized void shutdown() {
shutdown(DataPrepperShutdownOptions.defaultOptions());
}

/**
* Initiates shutdown of the pipeline by:
* 1. Stopping the source to prevent new items from being consumed
Expand All @@ -276,19 +284,20 @@ public void execute() {
* 5. Shutting down processors and sinks
* 6. Stopping the sink ExecutorService
*/
public synchronized void shutdown() {
public synchronized void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) {
LOG.info("Pipeline [{}] - Received shutdown signal with buffer drain timeout {}, processor shutdown timeout {}, " +
"and sink shutdown timeout {}. Initiating the shutdown process",
name, buffer.getDrainTimeout(), processorShutdownTimeout, sinkShutdownTimeout);
try {
source.stop();
stopRequested.set(true);
} catch (Exception ex) {
LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " +
"proceeding with termination of process workers", name, ex);
}

shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor");
pipelineShutdown.shutdown(dataPrepperShutdownOptions);

shutdownExecutorService(processorExecutorService, pipelineShutdown.getBufferDrainTimeout().plus(processorShutdownTimeout), "processor");

processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
buffer.shutdown();
Expand All @@ -297,7 +306,7 @@ public synchronized void shutdown() {
.map(DataFlowComponent::getComponent)
.forEach(Sink::shutdown);

shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout.toMillis(), "sink");
shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout, "sink");

LOG.info("Pipeline [{}] - Pipeline fully shutdown.", name);

Expand All @@ -312,13 +321,13 @@ public void removeShutdownObserver(final PipelineObserver pipelineObserver) {
observers.remove(pipelineObserver);
}

private void shutdownExecutorService(final ExecutorService executorService, final long timeoutForTerminationInMillis, final String workerName) {
private void shutdownExecutorService(final ExecutorService executorService, final Duration timeoutForTermination, final String workerName) {
LOG.info("Pipeline [{}] - Shutting down {} process workers.", name, workerName);

executorService.shutdown();
try {
if (!executorService.awaitTermination(timeoutForTerminationInMillis, TimeUnit.MILLISECONDS)) {
LOG.warn("Pipeline [{}] - Workers did not terminate in time, forcing termination of {} workers.", name, workerName);
if (!executorService.awaitTermination(timeoutForTermination.toMillis(), TimeUnit.MILLISECONDS)) {
LOG.warn("Pipeline [{}] - Workers did not terminate in {}, forcing termination of {} workers.", name, timeoutForTermination, workerName);
executorService.shutdownNow();
}
} catch (InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline;

import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.model.buffer.Buffer;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

class PipelineShutdown {
private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final Duration bufferDrainTimeout;
private final Clock clock;
private Instant shutdownRequestedAt;
private Instant forceStopReadingBuffersAt;
private Duration bufferDrainTimeoutOverride;

PipelineShutdown(final Buffer<?> buffer) {
this(buffer, Clock.systemDefaultZone());
}

PipelineShutdown(final Buffer<?> buffer, final Clock clock) {
bufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout());
this.clock = clock;
}

public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) {
final boolean stopPreviouslyRequested = stopRequested.get();
if(stopPreviouslyRequested) {
return;
}

stopRequested.set(true);
shutdownRequestedAt = now();

final Duration bufferReadTimeout = dataPrepperShutdownOptions.getBufferReadTimeout();
if(bufferReadTimeout != null) {
forceStopReadingBuffersAt = shutdownRequestedAt.plus(bufferReadTimeout);
}

final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout();
if(bufferDrainTimeoutOverride != null) {
this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride;
}
}

boolean isStopRequested() {
return stopRequested.get();
}

boolean isForceStopReadingBuffers() {
return forceStopReadingBuffersAt != null && now().isAfter(forceStopReadingBuffersAt);
}

public Duration getBufferDrainTimeout() {
return bufferDrainTimeoutOverride != null ?
bufferDrainTimeoutOverride : bufferDrainTimeout;
}

private Instant now() {
return Instant.ofEpochMilli(clock.millis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,37 +60,41 @@ public void run() {
while (!pipeline.isStopRequested()) {
doRun();
}
LOG.info("Processor shutdown phase 1 complete.");
executeShutdownProcess();
} catch (final Exception e) {
LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e);
}
}

// Phase 2 - execute until buffers are empty
LOG.info("Beginning processor shutdown phase 2, iterating until buffers empty.");
while (!readBuffer.isEmpty()) {
doRun();
}
LOG.info("Processor shutdown phase 2 complete.");
private void executeShutdownProcess() {
LOG.info("Processor shutdown phase 1 complete.");

// Phase 3 - execute until peer forwarder drain period expires (best effort to process all peer forwarder data)
final long drainTimeoutExpiration = System.currentTimeMillis() + pipeline.getPeerForwarderDrainTimeout().toMillis();
LOG.info("Beginning processor shutdown phase 3, iterating until {}.", drainTimeoutExpiration);
while (System.currentTimeMillis() < drainTimeoutExpiration) {
doRun();
}
LOG.info("Processor shutdown phase 3 complete.");
// Phase 2 - execute until buffers are empty
LOG.info("Beginning processor shutdown phase 2, iterating until buffers empty.");
while (!isBufferReadyForShutdown()) {
doRun();
}
LOG.info("Processor shutdown phase 2 complete.");

// Phase 4 - prepare processors for shutdown
LOG.info("Beginning processor shutdown phase 4, preparing processors for shutdown.");
processors.forEach(Processor::prepareForShutdown);
LOG.info("Processor shutdown phase 4 complete.");
// Phase 3 - execute until peer forwarder drain period expires (best effort to process all peer forwarder data)
final long drainTimeoutExpiration = System.currentTimeMillis() + pipeline.getPeerForwarderDrainTimeout().toMillis();
LOG.info("Beginning processor shutdown phase 3, iterating until {}.", drainTimeoutExpiration);
while (System.currentTimeMillis() < drainTimeoutExpiration) {
doRun();
}
LOG.info("Processor shutdown phase 3 complete.");

// Phase 5 - execute until processors are ready to shutdown
LOG.info("Beginning processor shutdown phase 5, iterating until processors are ready to shutdown.");
while (!areComponentsReadyForShutdown()) {
doRun();
}
LOG.info("Processor shutdown phase 5 complete.");
} catch (final Exception e) {
LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e);
// Phase 4 - prepare processors for shutdown
LOG.info("Beginning processor shutdown phase 4, preparing processors for shutdown.");
processors.forEach(Processor::prepareForShutdown);
LOG.info("Processor shutdown phase 4 complete.");

// Phase 5 - execute until processors are ready to shutdown
LOG.info("Beginning processor shutdown phase 5, iterating until processors are ready to shutdown.");
while (!areComponentsReadyForShutdown()) {
doRun();
}
LOG.info("Processor shutdown phase 5 complete.");
}

private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) {
Expand Down Expand Up @@ -153,11 +157,19 @@ private void doRun() {
}

private boolean areComponentsReadyForShutdown() {
return readBuffer.isEmpty() && processors.stream()
return isBufferReadyForShutdown() && processors.stream()
.map(Processor::isReadyForShutdown)
.allMatch(result -> result == true);
}

private boolean isBufferReadyForShutdown() {
final boolean isBufferEmpty = readBuffer.isEmpty();
final boolean forceStopReadingBuffers = pipeline.isForceStopReadingBuffers();
final boolean isBufferReadyForShutdown = isBufferEmpty || forceStopReadingBuffers;
LOG.debug("isBufferReadyForShutdown={}, isBufferEmpty={}, forceStopReadingBuffers={}", isBufferReadyForShutdown, isBufferEmpty, forceStopReadingBuffers);
return isBufferReadyForShutdown;
}

/**
* TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern]
* Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to
Expand Down
Loading

0 comments on commit a4d4a08

Please sign in to comment.