From a1c11e5ba5784de03eb4321b10f4bcefa4b22506 Mon Sep 17 00:00:00 2001 From: David Venable Date: Mon, 23 Sep 2024 18:03:21 -0500 Subject: [PATCH] Variable drain timeouts when shutting down over HTTP shutdown. (#4970) Variable drain timeouts when shutting down over HTTP shutdown. Adds two new parameters to the shutdown API. The first is bufferReadTimeout which controls the amount of time to wait for the buffer to be empty. The second is bufferDrainTimeout which controls the overall wait time for the process worker threads to complete. To support Data Prepper durations in HTTP query parameters, I extracted the parsing logic for durations out of DataPrepperDurationDeserializer and into a new DataPrepperDurationParser class. Resolves #4966. Signed-off-by: David Venable --- .../opensearch/dataprepper/DataPrepper.java | 11 +- .../DataPrepperShutdownOptions.java | 62 ++++++ .../dataprepper/pipeline/Pipeline.java | 31 +-- .../pipeline/PipelineShutdown.java | 70 +++++++ .../dataprepper/pipeline/ProcessWorker.java | 66 ++++--- .../pipeline/server/ShutdownHandler.java | 36 +++- .../DataPrepperShutdownOptionsTest.java | 82 ++++++++ .../dataprepper/DataPrepperTests.java | 9 +- .../pipeline/PipelineShutdownTest.java | 186 ++++++++++++++++++ .../dataprepper/pipeline/PipelineTests.java | 36 +++- .../pipeline/server/ShutdownHandlerTest.java | 122 +++++++++--- .../DataPrepperDurationDeserializer.java | 46 +---- .../parser/DataPrepperDurationParser.java | 59 ++++++ .../parser/DataPrepperDurationParserTest.java | 56 ++++++ 14 files changed, 751 insertions(+), 121 deletions(-) create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java create mode 100644 data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java create mode 100644 data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java create mode 100644 data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java index 19c0822ce9..b2dd7a5541 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepper.java @@ -97,17 +97,21 @@ public void shutdown() { shutdownServers(); } + private void shutdownPipelines() { + shutdownPipelines(DataPrepperShutdownOptions.defaultOptions()); + } + /** * Triggers the shutdown of all configured valid pipelines. */ - public void shutdownPipelines() { + public void shutdownPipelines(final DataPrepperShutdownOptions shutdownOptions) { transformationPipelines.forEach((name, pipeline) -> { pipeline.removeShutdownObserver(pipelinesObserver); }); for (final Pipeline pipeline : transformationPipelines.values()) { LOG.info("Shutting down pipeline: {}", pipeline.getName()); - pipeline.shutdown(); + pipeline.shutdown(shutdownOptions); } } @@ -127,11 +131,12 @@ public void shutdownServers() { * * @param pipeline name of the pipeline */ - public void shutdownPipelines(final String pipeline) { + public void shutdownPipeline(final String pipeline) { if (transformationPipelines.containsKey(pipeline)) { transformationPipelines.get(pipeline).shutdown(); } } + public PluginFactory getPluginFactory() { return pluginFactory; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java new file mode 100644 index 0000000000..ea3edbf4f5 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/DataPrepperShutdownOptions.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper; + +import java.time.Duration; + +public class DataPrepperShutdownOptions { + private final Duration bufferReadTimeout; + private final Duration bufferDrainTimeout; + + public static DataPrepperShutdownOptions defaultOptions() { + return new DataPrepperShutdownOptions(builder()); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Duration bufferReadTimeout; + private Duration bufferDrainTimeout; + + private Builder() { + } + + public Builder withBufferReadTimeout(final Duration bufferReadTimeout) { + this.bufferReadTimeout = bufferReadTimeout; + return this; + } + + public Builder withBufferDrainTimeout(final Duration bufferDrainTimeout) { + this.bufferDrainTimeout = bufferDrainTimeout; + return this; + } + + public DataPrepperShutdownOptions build() { + return new DataPrepperShutdownOptions(this); + } + } + + private DataPrepperShutdownOptions(final Builder builder) { + this.bufferReadTimeout = builder.bufferReadTimeout; + this.bufferDrainTimeout = builder.bufferDrainTimeout; + + if(bufferReadTimeout != null && bufferDrainTimeout != null) { + if (bufferReadTimeout.compareTo(bufferDrainTimeout) > 0) { + throw new IllegalArgumentException("Buffer read timeout cannot be greater than buffer drain timeout"); + } + } + } + + public Duration getBufferReadTimeout() { + return bufferReadTimeout; + } + + public Duration getBufferDrainTimeout() { + return bufferDrainTimeout; + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index 29bb69db46..de22876041 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.pipeline; import com.google.common.base.Preconditions; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -41,7 +42,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -55,7 +55,7 @@ public class Pipeline { private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); private static final int SINK_LOGGING_FREQUENCY = (int) Duration.ofSeconds(60).toMillis(); - private volatile AtomicBoolean stopRequested; + private final PipelineShutdown pipelineShutdown; private final String name; private final Source source; @@ -137,7 +137,7 @@ public Pipeline( this.sinkExecutorService = PipelineThreadPoolExecutor.newFixedThreadPool(processorThreads, new PipelineThreadFactory(format("%s-sink-worker", name)), this); - stopRequested = new AtomicBoolean(false); + this.pipelineShutdown = new PipelineShutdown(buffer); } AcknowledgementSetManager getAcknowledgementSetManager() { @@ -176,7 +176,11 @@ public Collection getSinks() { } public boolean isStopRequested() { - return stopRequested.get(); + return pipelineShutdown.isStopRequested(); + } + + public boolean isForceStopReadingBuffers() { + return pipelineShutdown.isForceStopReadingBuffers(); } public Duration getPeerForwarderDrainTimeout() { @@ -267,6 +271,10 @@ public void execute() { } } + public synchronized void shutdown() { + shutdown(DataPrepperShutdownOptions.defaultOptions()); + } + /** * Initiates shutdown of the pipeline by: * 1. Stopping the source to prevent new items from being consumed @@ -276,19 +284,20 @@ public void execute() { * 5. Shutting down processors and sinks * 6. Stopping the sink ExecutorService */ - public synchronized void shutdown() { + public synchronized void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) { LOG.info("Pipeline [{}] - Received shutdown signal with buffer drain timeout {}, processor shutdown timeout {}, " + "and sink shutdown timeout {}. Initiating the shutdown process", name, buffer.getDrainTimeout(), processorShutdownTimeout, sinkShutdownTimeout); try { source.stop(); - stopRequested.set(true); } catch (Exception ex) { LOG.error("Pipeline [{}] - Encountered exception while stopping the source, " + "proceeding with termination of process workers", name, ex); } - shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor"); + pipelineShutdown.shutdown(dataPrepperShutdownOptions); + + shutdownExecutorService(processorExecutorService, pipelineShutdown.getBufferDrainTimeout().plus(processorShutdownTimeout), "processor"); processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown)); buffer.shutdown(); @@ -297,7 +306,7 @@ public synchronized void shutdown() { .map(DataFlowComponent::getComponent) .forEach(Sink::shutdown); - shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout.toMillis(), "sink"); + shutdownExecutorService(sinkExecutorService, sinkShutdownTimeout, "sink"); LOG.info("Pipeline [{}] - Pipeline fully shutdown.", name); @@ -312,13 +321,13 @@ public void removeShutdownObserver(final PipelineObserver pipelineObserver) { observers.remove(pipelineObserver); } - private void shutdownExecutorService(final ExecutorService executorService, final long timeoutForTerminationInMillis, final String workerName) { + private void shutdownExecutorService(final ExecutorService executorService, final Duration timeoutForTermination, final String workerName) { LOG.info("Pipeline [{}] - Shutting down {} process workers.", name, workerName); executorService.shutdown(); try { - if (!executorService.awaitTermination(timeoutForTerminationInMillis, TimeUnit.MILLISECONDS)) { - LOG.warn("Pipeline [{}] - Workers did not terminate in time, forcing termination of {} workers.", name, workerName); + if (!executorService.awaitTermination(timeoutForTermination.toMillis(), TimeUnit.MILLISECONDS)) { + LOG.warn("Pipeline [{}] - Workers did not terminate in {}, forcing termination of {} workers.", name, timeoutForTermination, workerName); executorService.shutdownNow(); } } catch (InterruptedException ex) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java new file mode 100644 index 0000000000..f3731e9d67 --- /dev/null +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/PipelineShutdown.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline; + +import org.opensearch.dataprepper.DataPrepperShutdownOptions; +import org.opensearch.dataprepper.model.buffer.Buffer; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +class PipelineShutdown { + private final AtomicBoolean stopRequested = new AtomicBoolean(false); + private final Duration bufferDrainTimeout; + private final Clock clock; + private Instant shutdownRequestedAt; + private Instant forceStopReadingBuffersAt; + private Duration bufferDrainTimeoutOverride; + + PipelineShutdown(final Buffer buffer) { + this(buffer, Clock.systemDefaultZone()); + } + + PipelineShutdown(final Buffer buffer, final Clock clock) { + bufferDrainTimeout = Objects.requireNonNull(buffer.getDrainTimeout()); + this.clock = clock; + } + + public void shutdown(final DataPrepperShutdownOptions dataPrepperShutdownOptions) { + final boolean stopPreviouslyRequested = stopRequested.get(); + if(stopPreviouslyRequested) { + return; + } + + stopRequested.set(true); + shutdownRequestedAt = now(); + + final Duration bufferReadTimeout = dataPrepperShutdownOptions.getBufferReadTimeout(); + if(bufferReadTimeout != null) { + forceStopReadingBuffersAt = shutdownRequestedAt.plus(bufferReadTimeout); + } + + final Duration bufferDrainTimeoutOverride = dataPrepperShutdownOptions.getBufferDrainTimeout(); + if(bufferDrainTimeoutOverride != null) { + this.bufferDrainTimeoutOverride = bufferDrainTimeoutOverride; + } + } + + boolean isStopRequested() { + return stopRequested.get(); + } + + boolean isForceStopReadingBuffers() { + return forceStopReadingBuffersAt != null && now().isAfter(forceStopReadingBuffersAt); + } + + public Duration getBufferDrainTimeout() { + return bufferDrainTimeoutOverride != null ? + bufferDrainTimeoutOverride : bufferDrainTimeout; + } + + private Instant now() { + return Instant.ofEpochMilli(clock.millis()); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index b5538dfe73..8117848f9a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -60,37 +60,41 @@ public void run() { while (!pipeline.isStopRequested()) { doRun(); } - LOG.info("Processor shutdown phase 1 complete."); + executeShutdownProcess(); + } catch (final Exception e) { + LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e); + } + } - // Phase 2 - execute until buffers are empty - LOG.info("Beginning processor shutdown phase 2, iterating until buffers empty."); - while (!readBuffer.isEmpty()) { - doRun(); - } - LOG.info("Processor shutdown phase 2 complete."); + private void executeShutdownProcess() { + LOG.info("Processor shutdown phase 1 complete."); - // Phase 3 - execute until peer forwarder drain period expires (best effort to process all peer forwarder data) - final long drainTimeoutExpiration = System.currentTimeMillis() + pipeline.getPeerForwarderDrainTimeout().toMillis(); - LOG.info("Beginning processor shutdown phase 3, iterating until {}.", drainTimeoutExpiration); - while (System.currentTimeMillis() < drainTimeoutExpiration) { - doRun(); - } - LOG.info("Processor shutdown phase 3 complete."); + // Phase 2 - execute until buffers are empty + LOG.info("Beginning processor shutdown phase 2, iterating until buffers empty."); + while (!isBufferReadyForShutdown()) { + doRun(); + } + LOG.info("Processor shutdown phase 2 complete."); - // Phase 4 - prepare processors for shutdown - LOG.info("Beginning processor shutdown phase 4, preparing processors for shutdown."); - processors.forEach(Processor::prepareForShutdown); - LOG.info("Processor shutdown phase 4 complete."); + // Phase 3 - execute until peer forwarder drain period expires (best effort to process all peer forwarder data) + final long drainTimeoutExpiration = System.currentTimeMillis() + pipeline.getPeerForwarderDrainTimeout().toMillis(); + LOG.info("Beginning processor shutdown phase 3, iterating until {}.", drainTimeoutExpiration); + while (System.currentTimeMillis() < drainTimeoutExpiration) { + doRun(); + } + LOG.info("Processor shutdown phase 3 complete."); - // Phase 5 - execute until processors are ready to shutdown - LOG.info("Beginning processor shutdown phase 5, iterating until processors are ready to shutdown."); - while (!areComponentsReadyForShutdown()) { - doRun(); - } - LOG.info("Processor shutdown phase 5 complete."); - } catch (final Exception e) { - LOG.error("Encountered exception during pipeline {} processing", pipeline.getName(), e); + // Phase 4 - prepare processors for shutdown + LOG.info("Beginning processor shutdown phase 4, preparing processors for shutdown."); + processors.forEach(Processor::prepareForShutdown); + LOG.info("Processor shutdown phase 4 complete."); + + // Phase 5 - execute until processors are ready to shutdown + LOG.info("Beginning processor shutdown phase 5, iterating until processors are ready to shutdown."); + while (!areComponentsReadyForShutdown()) { + doRun(); } + LOG.info("Processor shutdown phase 5 complete."); } private void processAcknowledgements(List inputEvents, Collection> outputRecords) { @@ -153,11 +157,19 @@ private void doRun() { } private boolean areComponentsReadyForShutdown() { - return readBuffer.isEmpty() && processors.stream() + return isBufferReadyForShutdown() && processors.stream() .map(Processor::isReadyForShutdown) .allMatch(result -> result == true); } + private boolean isBufferReadyForShutdown() { + final boolean isBufferEmpty = readBuffer.isEmpty(); + final boolean forceStopReadingBuffers = pipeline.isForceStopReadingBuffers(); + final boolean isBufferReadyForShutdown = isBufferEmpty || forceStopReadingBuffers; + LOG.debug("isBufferReadyForShutdown={}, isBufferEmpty={}, forceStopReadingBuffers={}", isBufferReadyForShutdown, isBufferEmpty, forceStopReadingBuffers); + return isBufferReadyForShutdown; + } + /** * TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern] * Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java index 08449e0b21..e3da8fc51d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandler.java @@ -7,16 +7,23 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; import org.opensearch.dataprepper.DataPrepper; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; +import org.opensearch.dataprepper.pipeline.parser.DataPrepperDurationParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.ws.rs.HttpMethod; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.charset.Charset; +import java.util.List; /** - * HttpHandler to handle requests to shut down the data prepper instance + * HttpHandler to handle requests to shut down the Data Prepper instance */ public class ShutdownHandler implements HttpHandler { private final DataPrepper dataPrepper; @@ -40,7 +47,8 @@ public void handle(final HttpExchange exchange) throws IOException { LOG.info("Received HTTP shutdown request to shutdown Data Prepper. Shutdown pipelines and server. User-Agent='{}'", exchange.getRequestHeaders().getFirst("User-Agent")); } - dataPrepper.shutdownPipelines(); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = mapShutdownOptions(exchange.getRequestURI()); + dataPrepper.shutdownPipelines(dataPrepperShutdownOptions); exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0); } catch (final Exception e) { LOG.error("Caught exception shutting down data prepper", e); @@ -50,4 +58,28 @@ public void handle(final HttpExchange exchange) throws IOException { dataPrepper.shutdownServers(); } } + + private DataPrepperShutdownOptions mapShutdownOptions(final URI requestURI) { + final List queryParams = URLEncodedUtils.parse(requestURI, Charset.defaultCharset()); + + DataPrepperShutdownOptions.Builder shutdownOptionsBuilder + = DataPrepperShutdownOptions.builder(); + + for (final NameValuePair queryParam : queryParams) { + final String value = queryParam.getValue(); + switch(queryParam.getName()) { + case "bufferReadTimeout": + shutdownOptionsBuilder = + shutdownOptionsBuilder.withBufferReadTimeout(DataPrepperDurationParser.parse(value)); + break; + case "bufferDrainTimeout": + shutdownOptionsBuilder = + shutdownOptionsBuilder.withBufferDrainTimeout(DataPrepperDurationParser.parse(value)); + break; + default: + LOG.warn("Unrecognized query parameter '{}'", queryParam.getName()); + } + } + return shutdownOptionsBuilder.build(); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java new file mode 100644 index 0000000000..42ea27e97b --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperShutdownOptionsTest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DataPrepperShutdownOptionsTest { + private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + } + + @Test + void defaultOptions_returns_correct_defaults() { + final DataPrepperShutdownOptions options = DataPrepperShutdownOptions.defaultOptions(); + + assertThat(options.getBufferDrainTimeout(), nullValue()); + assertThat(options.getBufferReadTimeout(), nullValue()); + } + + @Test + void builder_returns_valid_builder() { + final DataPrepperShutdownOptions.Builder builder = DataPrepperShutdownOptions.builder(); + + assertThat(builder, notNullValue()); + } + + @Test + void build_throws_if_bufferReadTimeout_is_greater_than_bufferDrainTimeout() { + final Duration bufferDrainTimeout = Duration.ofSeconds(random.nextInt(20)); + final Duration bufferReadTimeout = bufferDrainTimeout.plus(1, ChronoUnit.MILLIS); + final DataPrepperShutdownOptions.Builder builder = DataPrepperShutdownOptions.builder() + .withBufferDrainTimeout(bufferDrainTimeout) + .withBufferReadTimeout(bufferReadTimeout); + assertThrows(IllegalArgumentException.class, builder::build); + } + + @Test + void build_creates_new_options_with_bufferReadTimeout_equal_to_bufferDrainTimeout() { + final Duration timeout = Duration.ofSeconds(random.nextInt(20)); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = DataPrepperShutdownOptions.builder() + .withBufferDrainTimeout(timeout) + .withBufferReadTimeout(timeout) + .build(); + + + assertThat(dataPrepperShutdownOptions, notNullValue()); + assertThat(dataPrepperShutdownOptions.getBufferReadTimeout(), equalTo(timeout)); + assertThat(dataPrepperShutdownOptions.getBufferDrainTimeout(), equalTo(timeout)); + } + + @Test + void build_creates_new_options_with_bufferReadTimeout_less_than_bufferDrainTimeout() { + final Duration bufferReadTimeout = Duration.ofSeconds(random.nextInt(20)); + final Duration bufferDrainTimeout = Duration.ofSeconds(random.nextInt(20)).plus(bufferReadTimeout); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = DataPrepperShutdownOptions.builder() + .withBufferDrainTimeout(bufferDrainTimeout) + .withBufferReadTimeout(bufferReadTimeout) + .build(); + + + assertThat(dataPrepperShutdownOptions, notNullValue()); + assertThat(dataPrepperShutdownOptions.getBufferReadTimeout(), equalTo(bufferReadTimeout)); + assertThat(dataPrepperShutdownOptions.getBufferDrainTimeout(), equalTo(bufferDrainTimeout)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java index 670d9664c6..3332be605f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/DataPrepperTests.java @@ -111,8 +111,9 @@ public void testGivenValidPipelineParserWhenExecuteThenAllPipelinesExecuteAndSer @Test public void testDataPrepperShutdown() throws NoSuchFieldException, IllegalAccessException { - createObjectUnderTest().shutdownPipelines(); - verify(pipeline).shutdown(); + final DataPrepperShutdownOptions dataPrepperShutdownOptions = mock(DataPrepperShutdownOptions.class); + createObjectUnderTest().shutdownPipelines(dataPrepperShutdownOptions); + verify(pipeline).shutdown(dataPrepperShutdownOptions); } @Test @@ -120,14 +121,14 @@ public void testDataPrepperShutdownPipeline() throws NoSuchFieldException, Illeg final Pipeline randomPipeline = mock(Pipeline.class); lenient().when(randomPipeline.isReady()).thenReturn(true); parseConfigurationFixture.put("Random Pipeline", randomPipeline); - createObjectUnderTest().shutdownPipelines("Random Pipeline"); + createObjectUnderTest().shutdownPipeline("Random Pipeline"); verify(randomPipeline).shutdown(); } @Test public void testDataPrepperShutdownNonExistentPipelineWithoutException() throws NoSuchFieldException, IllegalAccessException { - createObjectUnderTest().shutdownPipelines("Missing Pipeline"); + createObjectUnderTest().shutdownPipeline("Missing Pipeline"); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java new file mode 100644 index 0000000000..36ac4aa3d1 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineShutdownTest.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; +import org.opensearch.dataprepper.model.buffer.Buffer; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class PipelineShutdownTest { + @Mock + private Buffer buffer; + + @Mock + private Clock clock; + + @Mock + private DataPrepperShutdownOptions dataPrepperShutdownOptions; + + private Duration bufferDrainTimeout; + private Random random; + + @BeforeEach + void setUp() { + random = new Random(); + bufferDrainTimeout = Duration.ofSeconds(random.nextInt(100) + 1_000); + + when(buffer.getDrainTimeout()).thenReturn(bufferDrainTimeout); + } + + private PipelineShutdown createObjectUnderTest() { + return new PipelineShutdown(buffer, clock); + } + + @Test + void constructor_throws_if_drainTimeout_is_null() { + reset(buffer); + when(buffer.getDrainTimeout()).thenReturn(null); + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void isStopRequested_returns_false() { + assertThat(createObjectUnderTest().isStopRequested(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_false() { + assertThat(createObjectUnderTest().isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isStopRequested_returns_true_after_shutdown() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + when(clock.millis()).thenReturn(Clock.systemUTC().millis()); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.isStopRequested(), equalTo(true)); + } + + @Test + void isStopRequested_returns_true_after_multiple_shutdown_calls() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + when(clock.millis()).thenReturn(Clock.systemUTC().millis()); + for (int i = 0; i < 10; i++) { + objectUnderTest.shutdown(dataPrepperShutdownOptions); + } + assertThat(objectUnderTest.isStopRequested(), equalTo(true)); + } + + @Test + void isForceStopReadingBuffers_returns_false_after_shutdown_if_getBufferReadTimeout_is_null() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()).thenReturn(null); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_false_after_shutdown_if_time_is_before_shutdown_plus_getBufferReadTimeout() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()).thenReturn(Duration.ofSeconds(1)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_true_after_shutdown_if_time_is_after_shutdown_plus_getBufferReadTimeout() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()).thenReturn(Duration.ofSeconds(1)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()) + .thenReturn(baseTime.plusSeconds(2).toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + } + + @Test + void isForceStopReadingBuffers_returns_true_if_shutdown_is_called_multiple_times() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()) + .thenReturn(Duration.ofSeconds(1)) + .thenReturn(Duration.ofSeconds(5)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()) + .thenReturn(baseTime.plusSeconds(2).toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + } + + @Test + void isForceStopReadingBuffers_returns_true_if_shutdown_is_called_in_between_isForceStopReadingBuffers_calls() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + + when(dataPrepperShutdownOptions.getBufferReadTimeout()) + .thenReturn(Duration.ofSeconds(1)) + .thenReturn(Duration.ofSeconds(5)); + final Instant baseTime = Instant.now(); + when(clock.millis()) + .thenReturn(baseTime.toEpochMilli()) + .thenReturn(baseTime.plusSeconds(2).toEpochMilli()); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.isForceStopReadingBuffers(), equalTo(true)); + } + + + @Test + void getBufferDrainTimeout_returns_buffer_getDrainTimeout_if_shutdown_not_called() { + assertThat(createObjectUnderTest().getBufferDrainTimeout(), equalTo(bufferDrainTimeout)); + } + + @Test + void getBufferDrainTimeout_returns_buffer_getDrainTimeout_if_shutdown_called_without_bufferDrainTimeout() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + when(dataPrepperShutdownOptions.getBufferDrainTimeout()).thenReturn(null); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.getBufferDrainTimeout(), equalTo(bufferDrainTimeout)); + } + + @Test + void getBufferDrainTimeout_returns_buffer_shutdownOptions_bufferDrainTimeout_if_provided() { + final PipelineShutdown objectUnderTest = createObjectUnderTest(); + Duration bufferDrainTimeoutFromOptions = Duration.ofSeconds(random.nextInt(100) + 100); + when(dataPrepperShutdownOptions.getBufferDrainTimeout()).thenReturn(bufferDrainTimeoutFromOptions); + objectUnderTest.shutdown(dataPrepperShutdownOptions); + assertThat(objectUnderTest.getBufferDrainTimeout(), equalTo(bufferDrainTimeoutFromOptions)); + } +} \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index c2e0ad769f..66300969c0 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; @@ -93,9 +94,9 @@ void setup() { eventFactory = mock(EventFactory.class); acknowledgementSetManager = mock(AcknowledgementSetManager.class); sourceCoordinatorFactory = mock(SourceCoordinatorFactory.class); - processorShutdownTimeout = Duration.ofSeconds(Math.abs(new Random().nextInt(10))); - sinkShutdownTimeout = Duration.ofSeconds(Math.abs(new Random().nextInt(10))); - peerForwarderDrainTimeout = Duration.ofSeconds(Math.abs(new Random().nextInt(10))); + processorShutdownTimeout = Duration.ofMillis(Math.abs(new Random().nextInt(10))); + sinkShutdownTimeout = Duration.ofMillis(Math.abs(new Random().nextInt(10))); + peerForwarderDrainTimeout = Duration.ofMillis(Math.abs(new Random().nextInt(10))); } @AfterEach @@ -620,4 +621,33 @@ void shutdown_does_not_call_removed_PipelineObservers() { testPipeline.shutdown(); verifyNoInteractions(pipelineObserver); } + + @Test + void isForceStopReadingBuffers_returns_false_if_not_in_shutdown() { + final Source> testSource = new TestSource(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + final TestSink testSink = new TestSink(); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + assertThat(testPipeline.isForceStopReadingBuffers(), equalTo(false)); + } + + @Test + void isForceStopReadingBuffers_returns_true_if_bufferReadTimeout_is_exceeded() throws InterruptedException { + final Source> testSource = new TestSource(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + final TestSink testSink = new TestSink(); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, + eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + + testPipeline.shutdown(DataPrepperShutdownOptions.builder().withBufferReadTimeout(Duration.ofMillis(1)).build()); + Thread.sleep(2); + assertThat(testPipeline.isForceStopReadingBuffers(), is(true)); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java index 19f1e839e1..0d36d05b1d 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/server/ShutdownHandlerTest.java @@ -8,20 +8,29 @@ import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.DataPrepper; +import org.opensearch.dataprepper.DataPrepperShutdownOptions; import javax.ws.rs.HttpMethod; import java.io.IOException; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.URI; +import java.time.Duration; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.lenient; @@ -51,23 +60,6 @@ public void beforeEach() { .thenReturn(new Headers()); } - @Test - public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOException { - when(exchange.getRequestMethod()) - .thenReturn(HttpMethod.POST); - - shutdownHandler.handle(exchange); - - verify(dataPrepper, times(1)) - .shutdownPipelines(); - verify(exchange, times(1)) - .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), eq(0L)); - verify(responseBody, times(1)) - .close(); - verify(dataPrepper, times(1)) - .shutdownServers(); - } - @ParameterizedTest @ValueSource(strings = { HttpMethod.DELETE, HttpMethod.GET, HttpMethod.PATCH, HttpMethod.PUT }) public void testWhenShutdownWithProhibitedHttpMethodThenErrorResponseWritten(String httpMethod) throws IOException { @@ -82,17 +74,91 @@ public void testWhenShutdownWithProhibitedHttpMethodThenErrorResponseWritten(Str .close(); } - @Test - public void testHandleException() throws IOException { - when(exchange.getRequestMethod()) - .thenReturn(HttpMethod.POST); - doThrow(RuntimeException.class).when(dataPrepper).shutdownPipelines(); - - shutdownHandler.handle(exchange); + @Nested + class WithoutQueryParameters { + @BeforeEach + void setUp() { + when(exchange.getRequestURI()).thenReturn(URI.create("/shutdown")); + } + + @Test + public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + + shutdownHandler.handle(exchange); + + ArgumentCaptor shutdownOptionsArgumentCaptor = ArgumentCaptor.forClass(DataPrepperShutdownOptions.class); + verify(dataPrepper, times(1)) + .shutdownPipelines(shutdownOptionsArgumentCaptor.capture()); + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), eq(0L)); + verify(responseBody, times(1)) + .close(); + verify(dataPrepper, times(1)) + .shutdownServers(); + + DataPrepperShutdownOptions actualShutdownOptions = shutdownOptionsArgumentCaptor.getValue(); + assertThat(actualShutdownOptions.getBufferDrainTimeout(), nullValue()); + assertThat(actualShutdownOptions.getBufferReadTimeout(), nullValue()); + } + + @Test + public void testHandleException() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + doThrow(RuntimeException.class).when(dataPrepper).shutdownPipelines(any(DataPrepperShutdownOptions.class)); + + shutdownHandler.handle(exchange); + + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); + verify(responseBody, times(1)) + .close(); + } + } - verify(exchange, times(1)) - .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); - verify(responseBody, times(1)) - .close(); + @Nested + class WithoutShutdownQueryParameters { + @BeforeEach + void setUp() { + when(exchange.getRequestURI()).thenReturn(URI.create("/shutdown?bufferReadTimeout=1500ms&bufferDrainTimeout=20s")); + } + + @Test + public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + + shutdownHandler.handle(exchange); + + final ArgumentCaptor shutdownOptionsArgumentCaptor = ArgumentCaptor.forClass(DataPrepperShutdownOptions.class); + verify(dataPrepper, times(1)) + .shutdownPipelines(shutdownOptionsArgumentCaptor.capture()); + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_OK), eq(0L)); + verify(responseBody, times(1)) + .close(); + verify(dataPrepper, times(1)) + .shutdownServers(); + + final DataPrepperShutdownOptions actualShutdownOptions = shutdownOptionsArgumentCaptor.getValue(); + assertThat(actualShutdownOptions.getBufferDrainTimeout(), equalTo(Duration.ofSeconds(20))); + assertThat(actualShutdownOptions.getBufferReadTimeout(), equalTo(Duration.ofMillis(1500))); + } + + @Test + public void testHandleException() throws IOException { + when(exchange.getRequestMethod()) + .thenReturn(HttpMethod.POST); + doThrow(RuntimeException.class).when(dataPrepper).shutdownPipelines(any(DataPrepperShutdownOptions.class)); + + shutdownHandler.handle(exchange); + + verify(exchange, times(1)) + .sendResponseHeaders(eq(HttpURLConnection.HTTP_INTERNAL_ERROR), eq(0L)); + verify(responseBody, times(1)) + .close(); + } } } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java index 5005eb9f96..d6ae65e2b0 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationDeserializer.java @@ -11,9 +11,6 @@ import java.io.IOException; import java.time.Duration; -import java.time.format.DateTimeParseException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * This deserializer is used for configurations that use a {@link Duration} type when deserialized by Jackson @@ -24,54 +21,17 @@ */ public class DataPrepperDurationDeserializer extends StdDeserializer { - private static final String SIMPLE_DURATION_REGEX = "^(0|[1-9]\\d*)(s|ms)$"; - private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(SIMPLE_DURATION_REGEX); - public DataPrepperDurationDeserializer() { this(null); } - protected DataPrepperDurationDeserializer(Class vc) { + protected DataPrepperDurationDeserializer(final Class vc) { super(vc); } @Override - public Duration deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + public Duration deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { final String durationString = p.getValueAsString(); - Duration duration; - - try { - duration = Duration.parse(durationString); - } catch (final DateTimeParseException e) { - duration = parseSimpleDuration(durationString); - if (duration == null) { - throw new IllegalArgumentException("Durations must use either ISO 8601 notation or simple notations for seconds (60s) or milliseconds (100ms). Whitespace is ignored."); - } - } - - return duration; - } - - private Duration parseSimpleDuration(final String durationString) throws IllegalArgumentException { - final String durationStringNoSpaces = durationString.replaceAll("\\s", ""); - final Matcher matcher = SIMPLE_DURATION_PATTERN.matcher(durationStringNoSpaces); - if (!matcher.find()) { - return null; - } - - final long durationNumber = Long.parseLong(matcher.group(1)); - final String durationUnit = matcher.group(2); - - return getDurationFromUnitAndNumber(durationNumber, durationUnit); - } - - private Duration getDurationFromUnitAndNumber(final long durationNumber, final String durationUnit) { - switch (durationUnit) { - case "s": - return Duration.ofSeconds(durationNumber); - case "ms": - return Duration.ofMillis(durationNumber); - } - return null; + return DataPrepperDurationParser.parse(durationString); } } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java new file mode 100644 index 0000000000..e758278cf7 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParser.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline.parser; + +import java.time.Duration; +import java.time.format.DateTimeParseException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Parses strings into {@link Duration} supporting the Data Prepper duration format. + * It supports ISO 8601 notation ("PT20.345S", "PT15M", etc.) and simple durations for + * seconds (60s) and milliseconds (100ms). It does not support combining the units for simple durations ("60s 100ms" is not allowed). + * Whitespace is ignored and leading zeroes are not allowed. + * @since 2.10 + */ +public class DataPrepperDurationParser { + private static final String SIMPLE_DURATION_REGEX = "^(0|[1-9]\\d*)(s|ms)$"; + private static final Pattern SIMPLE_DURATION_PATTERN = Pattern.compile(SIMPLE_DURATION_REGEX); + + public static Duration parse(final String durationString) { + try { + return Duration.parse(durationString); + } catch (final DateTimeParseException e) { + final Duration duration = parseSimpleDuration(durationString); + if (duration == null) { + throw new IllegalArgumentException("Durations must use either ISO 8601 notation or simple notations for seconds (60s) or milliseconds (100ms). Whitespace is ignored."); + } + return duration; + } + } + + private static Duration parseSimpleDuration(final String durationString) throws IllegalArgumentException { + final String durationStringNoSpaces = durationString.replaceAll("\\s", ""); + final Matcher matcher = SIMPLE_DURATION_PATTERN.matcher(durationStringNoSpaces); + if (!matcher.find()) { + return null; + } + + final long durationNumber = Long.parseLong(matcher.group(1)); + final String durationUnit = matcher.group(2); + + return getDurationFromUnitAndNumber(durationNumber, durationUnit); + } + + private static Duration getDurationFromUnitAndNumber(final long durationNumber, final String durationUnit) { + switch (durationUnit) { + case "s": + return Duration.ofSeconds(durationNumber); + case "ms": + return Duration.ofMillis(durationNumber); + } + return null; + } + +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java new file mode 100644 index 0000000000..4913e9c545 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/DataPrepperDurationParserTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.pipeline.parser; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DataPrepperDurationParserTest { + + @ParameterizedTest + @ValueSource(strings = {"6s1s", "60ms 100s", "20.345s", "-1s", "06s", "100m", "100sm", "100"}) + void invalidDurationStringsThrowIllegalArgumentException(final String durationString) { + assertThrows(IllegalArgumentException.class, () -> DataPrepperDurationParser.parse(durationString)); + } + + @Test + void ISO_8601_duration_string_returns_correct_duration() { + final String durationString = "PT15M"; + final Duration result = DataPrepperDurationParser.parse(durationString); + assertThat(result, equalTo(Duration.ofMinutes(15))); + } + + @ParameterizedTest + @ValueSource(strings = {"0s", "0ms"}) + void simple_duration_strings_of_0_return_correct_duration(final String durationString) { + final Duration result = DataPrepperDurationParser.parse(durationString); + + assertThat(result, equalTo(Duration.ofSeconds(0))); + } + + @ParameterizedTest + @ValueSource(strings = {"60s", "60000ms", "60 s", "60000 ms", " 60 s "}) + void simple_duration_strings_of_60_seconds_return_correct_duration(final String durationString) { + final Duration result = DataPrepperDurationParser.parse(durationString); + + assertThat(result, equalTo(Duration.ofSeconds(60))); + } + + @ParameterizedTest + @ValueSource(strings = {"5s", "5000ms", "5 s", "5000 ms", " 5 s "}) + void simple_duration_strings_of_5_seconds_return_correct_duration(final String durationString) { + final Duration result = DataPrepperDurationParser.parse(durationString); + + assertThat(result, equalTo(Duration.ofSeconds(5))); + } +} \ No newline at end of file