Skip to content

Commit

Permalink
Name the S3 source's SQS worker threads (#4279)
Browse files Browse the repository at this point in the history
Creates a new common class for creating background threads. Name the threads for the S3 source's SQS worker threads.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Mar 15, 2024
1 parent a5defe7 commit 19b18a1
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.concurrent;

import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A {@link ThreadFactory} that names threads with a prefix and
* sets as daemon threads so that they do not interrupt shutdown.
* <p>
* The thread name will be <i>namePrefix</i>-<i>threadNumber</i>.
*/
public class BackgroundThreadFactory implements ThreadFactory {
private final String namePrefix;
private final ThreadFactory delegateThreadFactory;
private final AtomicInteger threadNumber = new AtomicInteger(1);

BackgroundThreadFactory(final String namePrefix, final ThreadFactory delegateThreadFactory) {
this.namePrefix = Objects.requireNonNull(namePrefix);
if(namePrefix.isEmpty()) {
throw new IllegalArgumentException("The thread factory was given an empty namePrefix. It must be provided.");
}
this.delegateThreadFactory = Objects.requireNonNull(delegateThreadFactory);
}

/**
* Creates a new instance of {@link BackgroundThreadFactory} with a specified namePrefix
* using {@link Executors#defaultThreadFactory()} as the delegate {@link ThreadFactory}.
*
* @param namePrefix The prefix for the thread name.
* @return A new instance.
*/
public static BackgroundThreadFactory defaultExecutorThreadFactory(final String namePrefix) {
return new BackgroundThreadFactory(namePrefix, Executors.defaultThreadFactory());
}

@Override
public Thread newThread(final Runnable runnable) {
final Thread thread = delegateThreadFactory.newThread(runnable);
thread.setName(namePrefix + "-" + threadNumber.getAndIncrement());
thread.setDaemon(false);

return thread;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.concurrent;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.UUID;
import java.util.concurrent.ThreadFactory;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class BackgroundThreadFactoryTest {
@Mock
private ThreadFactory delegateThreadFactory;

@Mock
private Runnable runnable;
private String namePrefix;

@BeforeEach
void setUp() {
namePrefix = UUID.randomUUID().toString();
}

private BackgroundThreadFactory createObjectUnderTest() {
return new BackgroundThreadFactory(namePrefix, delegateThreadFactory);
}

@Test
void constructor_throws_with_null_name() {
namePrefix = null;
assertThrows(NullPointerException.class, this::createObjectUnderTest);
}

@Test
void constructor_throws_with_empty_name() {
namePrefix = "";
assertThrows(IllegalArgumentException.class, this::createObjectUnderTest);
}

@Test
void constructor_throws_with_null_delegate() {
delegateThreadFactory = null;
assertThrows(NullPointerException.class, this::createObjectUnderTest);
}

@Nested
class WithNewThread {

@Mock
private Thread threadFromDelegate;

@BeforeEach
void setUp() {
when(delegateThreadFactory.newThread(runnable))
.thenReturn(threadFromDelegate);
}

@Test
void newThread_returns_thread_from_inner() {
assertThat(createObjectUnderTest().newThread(runnable),
equalTo(threadFromDelegate));
}

@Test
void newThread_assigns_name() {
createObjectUnderTest().newThread(runnable);
verify(threadFromDelegate).setName(namePrefix + "-1");
}

@Test
void newThread_sets_daemon_to_false() {
createObjectUnderTest().newThread(runnable);
verify(threadFromDelegate).setDaemon(false);
}
}

@Test
void newThread_called_multiple_times_uses_new_thread_name() {
when(delegateThreadFactory.newThread(runnable))
.thenAnswer(a -> mock(Thread.class));

final BackgroundThreadFactory objectUnderTest = createObjectUnderTest();

final Thread thread1 = objectUnderTest.newThread(runnable);
final Thread thread2 = objectUnderTest.newThread(runnable);
final Thread thread3 = objectUnderTest.newThread(runnable);

verify(thread1).setName(namePrefix + "-1");
verify(thread2).setName(namePrefix + "-2");
verify(thread3).setName(namePrefix + "-3");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.s3;

import com.linecorp.armeria.client.retry.Backoff;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.slf4j.Logger;
Expand All @@ -32,7 +33,7 @@ public class SqsService {
private final SqsClient sqsClient;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private ExecutorService executorService;
private final ExecutorService executorService;

public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
final S3SourceConfig s3SourceConfig,
Expand All @@ -44,7 +45,7 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sqsClient = createSqsClient(credentialsProvider);
executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers());
executorService = Executors.newFixedThreadPool(s3SourceConfig.getNumWorkers(), BackgroundThreadFactory.defaultExecutorThreadFactory("s3-source-sqs"));
}

public void start() {
Expand Down

0 comments on commit 19b18a1

Please sign in to comment.