Skip to content

Commit

Permalink
Additional logging when shutting down the pipeline.
Browse files Browse the repository at this point in the history
Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed Sep 30, 2024
1 parent 5ffc738 commit 4152b5a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public Pipeline(
this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads,
new PipelineThreadFactory(format("%s-sink-worker", name)), this);

this.pipelineShutdown = new PipelineShutdown(buffer);
this.pipelineShutdown = new PipelineShutdown(name, buffer);
}

AcknowledgementSetManager getAcknowledgementSetManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.opensearch.dataprepper.DataPrepperShutdownOptions;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.time.Duration;
Expand All @@ -15,19 +17,25 @@
import java.util.concurrent.atomic.AtomicBoolean;

class PipelineShutdown {
private static final Logger LOG = LoggerFactory.getLogger(PipelineShutdown.class);

private final AtomicBoolean stopRequested = new AtomicBoolean(false);
private final Duration bufferDrainTimeout;
private final Duration pipelineConfiguredBufferDrainTimeout;
private final Clock clock;
private final String pipelineName;
private Instant shutdownRequestedAt;
private Instant forceStopReadingBuffersAt;
private Duration bufferDrainTimeoutOverride;
private Duration bufferDrainTimeout;

PipelineShutdown(final Buffer<?> buffer) {
this(buffer, Clock.systemDefaultZone());
PipelineShutdown(final String pipelineName, final Buffer<?> buffer) {
this(pipelineName, buffer, Clock.systemDefaultZone());
}

PipelineShutdown(final Buffer<?> buffer, final Clock clock) {
bufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout());
PipelineShutdown(String pipelineName, final Buffer<?> buffer, final Clock clock) {
this.pipelineName = pipelineName;
pipelineConfiguredBufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout());
bufferDrainTimeout = pipelineConfiguredBufferDrainTimeout;
this.clock = clock;
}

Expand All @@ -48,7 +56,11 @@ public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions
final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout();
if(bufferDrainTimeoutOverride != null) {
this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride;
bufferDrainTimeout = bufferDrainTimeoutOverride;
}

LOG.info("Started shutdown for pipeline {}. Requested at {}. Force stop reading buffers at {}. The buffer drain timeout to use is {}",
pipelineName, shutdownRequestedAt, forceStopReadingBuffersAt, bufferDrainTimeout);
}

boolean isStopRequested() {
Expand All @@ -60,8 +72,7 @@ boolean isForceStopReadingBuffers() {
}

public Duration getBufferDrainTimeout() {
return bufferDrainTimeoutOverride != null ?
bufferDrainTimeoutOverride : bufferDrainTimeout;
return bufferDrainTimeout;
}

private Instant now() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -37,17 +38,19 @@ class PipelineShutdownTest {

private Duration bufferDrainTimeout;
private Random random;
private String pipelineName;

@BeforeEach
void setUp() {
random = new Random();
pipelineName = UUID.randomUUID().toString();
bufferDrainTimeout = Duration.ofSeconds(random.nextInt(100) + 1_000);

when(buffer.getDrainTimeout()).thenReturn(bufferDrainTimeout);
}

private PipelineShutdown createObjectUnderTest() {
return new PipelineShutdown(buffer, clock);
return new PipelineShutdown(pipelineName, buffer, clock);
}

@Test
Expand Down

0 comments on commit 4152b5a

Please sign in to comment.