diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/concurrent/BackgroundThreadFactory.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/concurrent/BackgroundThreadFactory.java new file mode 100644 index 0000000000..cd214c323e --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/concurrent/BackgroundThreadFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.common.concurrent; + +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link ThreadFactory} that names threads with a prefix and + * sets as daemon threads so that they do not interrupt shutdown. + *

+ * The thread name will be namePrefix-threadNumber. + */ +public class BackgroundThreadFactory implements ThreadFactory { + private final String namePrefix; + private final ThreadFactory delegateThreadFactory; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + BackgroundThreadFactory(final String namePrefix, final ThreadFactory delegateThreadFactory) { + this.namePrefix = Objects.requireNonNull(namePrefix); + if(namePrefix.isEmpty()) { + throw new IllegalArgumentException("The thread factory was given an empty namePrefix. It must be provided."); + } + this.delegateThreadFactory = Objects.requireNonNull(delegateThreadFactory); + } + + /** + * Creates a new instance of {@link BackgroundThreadFactory} with a specified namePrefix + * using {@link Executors#defaultThreadFactory()} as the delegate {@link ThreadFactory}. + * + * @param namePrefix The prefix for the thread name. + * @return A new instance. + */ + public static BackgroundThreadFactory defaultExecutorThreadFactory(final String namePrefix) { + return new BackgroundThreadFactory(namePrefix, Executors.defaultThreadFactory()); + } + + @Override + public Thread newThread(final Runnable runnable) { + final Thread thread = delegateThreadFactory.newThread(runnable); + thread.setName(namePrefix + "-" + threadNumber.getAndIncrement()); + thread.setDaemon(false); + + return thread; + } +} diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/common/concurrent/BackgroundThreadFactoryTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/common/concurrent/BackgroundThreadFactoryTest.java new file mode 100644 index 0000000000..c2dbe5766a --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/common/concurrent/BackgroundThreadFactoryTest.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.common.concurrent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.UUID; +import java.util.concurrent.ThreadFactory; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class BackgroundThreadFactoryTest { + @Mock + private ThreadFactory delegateThreadFactory; + + @Mock + private Runnable runnable; + private String namePrefix; + + @BeforeEach + void setUp() { + namePrefix = UUID.randomUUID().toString(); + } + + private BackgroundThreadFactory createObjectUnderTest() { + return new BackgroundThreadFactory(namePrefix, delegateThreadFactory); + } + + @Test + void constructor_throws_with_null_name() { + namePrefix = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_with_empty_name() { + namePrefix = ""; + assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); + } + + @Test + void constructor_throws_with_null_delegate() { + delegateThreadFactory = null; + assertThrows(NullPointerException.class, this::createObjectUnderTest); + } + + @Nested + class WithNewThread { + + @Mock + private Thread threadFromDelegate; + + @BeforeEach + void setUp() { + when(delegateThreadFactory.newThread(runnable)) + .thenReturn(threadFromDelegate); + } + + @Test + void newThread_returns_thread_from_inner() { + assertThat(createObjectUnderTest().newThread(runnable), + equalTo(threadFromDelegate)); + } + + @Test + void newThread_assigns_name() { + createObjectUnderTest().newThread(runnable); + verify(threadFromDelegate).setName(namePrefix + "-1"); + } + + @Test + void newThread_sets_daemon_to_false() { + createObjectUnderTest().newThread(runnable); + verify(threadFromDelegate).setDaemon(false); + } + } + + @Test + void newThread_called_multiple_times_uses_new_thread_name() { + when(delegateThreadFactory.newThread(runnable)) + .thenAnswer(a -> mock(Thread.class)); + + final BackgroundThreadFactory objectUnderTest = createObjectUnderTest(); + + final Thread thread1 = objectUnderTest.newThread(runnable); + final Thread thread2 = objectUnderTest.newThread(runnable); + final Thread thread3 = objectUnderTest.newThread(runnable); + + verify(thread1).setName(namePrefix + "-1"); + verify(thread2).setName(namePrefix + "-2"); + verify(thread3).setName(namePrefix + "-3"); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsService.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsService.java index 1ae2e09662..b05d2806d4 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsService.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsService.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.s3; import com.linecorp.armeria.client.retry.Backoff; +import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.slf4j.Logger; @@ -32,7 +33,7 @@ public class SqsService { private final SqsClient sqsClient; private final PluginMetrics pluginMetrics; private final AcknowledgementSetManager acknowledgementSetManager; - private ExecutorService executorService; + private final ExecutorService executorService; public SqsService(final AcknowledgementSetManager acknowledgementSetManager, final S3SourceConfig s3SourceConfig, @@ -44,7 +45,7 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager, this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.sqsClient = createSqsClient(credentialsProvider); - executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers()); + executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-sqs")); } public void start() {