Skip to content

Commit

Permalink
Support large files in file source by using a thread (#4256)
Browse files Browse the repository at this point in the history
Run the file source in its own thread so that Data Prepper can read large files.

Also adds stop calls to RandomStringSourceTests because the sources started by those tests kept running after each test finished and were using up memory.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Mar 12, 2024
1 parent 7cbf824 commit 87d93a0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
public class FileSource implements Source<Record<Object>> {
static final String MESSAGE_KEY = "message";
private static final Logger LOG = LoggerFactory.getLogger(FileSource.class);
// Declared exactly once; a second declaration of the same constant does not compile.
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() { };

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// Upper bound, in milliseconds, that stop() waits for the read thread to finish.
private static final long STOP_WAIT_MILLIS = 200;
private final FileSourceConfig fileSourceConfig;
private final FileStrategy fileStrategy;
private final EventFactory eventFactory;

// Thread that performs the file read; assigned in start() and joined in stop().
private Thread readThread;

// Set by stop() on the caller's thread while the read runs on readThread, so the
// write must be visible across threads. NOTE(review): the code that reads this flag
// is outside the visible part of the file - confirm it is consulted on the read thread.
private volatile boolean isStopRequested;
private final int writeTimeout;

Expand All @@ -72,12 +75,26 @@ public FileSource(
/**
 * Starts reading the configured file on a dedicated, non-daemon thread named
 * {@code file-source} and returns without waiting for the read to finish.
 *
 * <p>Because the read runs on its own thread, any exception thrown by the file
 * strategy is raised on that thread and is not propagated to the caller of this method.
 *
 * @param buffer the buffer handed to the file strategy; validated with {@code checkNotNull}
 */
@Override
public void start(final Buffer<Record<Object>> buffer) {
    checkNotNull(buffer, "Buffer cannot be null for file source to start");

    LOG.info("Starting file source with {} path.", fileSourceConfig.getFilePathToRead());

    // The strategy is invoked only from the read thread. Calling it here as well would
    // block the caller for the whole read and then process the file a second time.
    readThread = new Thread(() -> {
        fileStrategy.start(buffer);
        LOG.info("Completed reading file.");
    }, "file-source");
    readThread.setDaemon(false);
    readThread.start();
}

/**
 * Requests that reading stop and waits up to {@code STOP_WAIT_MILLIS} milliseconds
 * for the read thread to finish.
 *
 * <p>Safe to call before {@link #start}: with no read thread there is nothing to wait for.
 * If the calling thread is interrupted while waiting, the read thread is interrupted
 * and the caller's interrupt status is restored.
 */
@Override
public void stop() {
    isStopRequested = true;

    // Snapshot the field so the null check and the join act on the same thread object.
    final Thread reader = readThread;
    if (reader == null) {
        // stop() was called without a prior start(); no thread exists to join.
        return;
    }

    try {
        reader.join(STOP_WAIT_MILLIS);
    } catch (final InterruptedException e) {
        reader.interrupt();
        // join() cleared the caller's interrupt flag; restore it so callers can observe it.
        Thread.currentThread().interrupt();
    }
}

private interface FileStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ void testPutRecord() {
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.untilAsserted(() -> verify(buffer).write(any(), anyInt()));

randomStringSource.stop();
}

@Test
Expand All @@ -55,6 +57,8 @@ void source_continues_to_write_if_a_write_to_buffer_fails() throws TimeoutExcept
await().atMost(3, TimeUnit.SECONDS)
.pollDelay(200, TimeUnit.MILLISECONDS)
.untilAsserted(() -> verify(buffer, atLeast(2)).write(any(), anyInt()));

randomStringSource.stop();
}

@Test
Expand All @@ -81,5 +85,7 @@ void multiple_calls_to_start_throws() {
objectUnderTest.start(buffer);

assertThrows(IllegalStateException.class, () -> objectUnderTest.start(buffer));

objectUnderTest.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand All @@ -49,6 +51,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -146,17 +149,23 @@ private BlockingBuffer<Record<Object>> getBuffer() {
}

@Test
public void testFileSourceWithEmptyFilePathThrowsRuntimeException() {
public void testFileSourceWithEmptyFilePathDoesNotWriteToBuffer() throws InterruptedException {
buffer = mock(Buffer.class);
pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, "");
fileSource = createObjectUnderTest();
assertThrows(RuntimeException.class, () -> fileSource.start(buffer));
fileSource.start(buffer);
Thread.sleep(500);
verifyNoInteractions(buffer);
}

@Test
public void testFileSourceWithNonexistentFilePathThrowsRuntimeException() {
public void testFileSourceWithNonexistentFilePathDoesNotWriteToBuffer() throws InterruptedException {
buffer = mock(Buffer.class);
pluginSettings.put(FileSourceConfig.ATTRIBUTE_PATH, FILE_DOES_NOT_EXIST);
fileSource = createObjectUnderTest();
assertThrows(RuntimeException.class, () -> fileSource.start(buffer));
fileSource.start(buffer);
Thread.sleep(500);
verifyNoInteractions(buffer);
}

@Test
Expand Down Expand Up @@ -285,6 +294,8 @@ void start_will_parse_codec_with_correct_inputStream() throws IOException {

final ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);

await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> verify(inputCodec).parse(any(InputStream.class), any(Consumer.class)));
verify(inputCodec).parse(inputStreamArgumentCaptor.capture(), any(Consumer.class));

final InputStream actualInputStream = inputStreamArgumentCaptor.getValue();
Expand All @@ -302,6 +313,9 @@ void start_will_parse_codec_with_a_Consumer_that_writes_to_the_buffer() throws I

final ArgumentCaptor<Consumer> consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);

await().atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> verify(inputCodec).parse(any(InputStream.class), any(Consumer.class)));

verify(inputCodec).parse(any(InputStream.class), consumerArgumentCaptor.capture());

final Consumer<Record<Event>> actualConsumer = consumerArgumentCaptor.getValue();
Expand All @@ -313,17 +327,19 @@ void start_will_parse_codec_with_a_Consumer_that_writes_to_the_buffer() throws I
}

@Test
void start_will_throw_exception_if_codec_throws() throws IOException, TimeoutException {
void start_will_throw_exception_if_codec_throws() throws IOException, TimeoutException, InterruptedException {

final IOException mockedException = mock(IOException.class);
doThrow(mockedException)
.when(inputCodec).parse(any(InputStream.class), any(Consumer.class));

FileSource objectUnderTest = createObjectUnderTest();

RuntimeException actualException = assertThrows(RuntimeException.class, () -> objectUnderTest.start(buffer));
objectUnderTest.start(buffer);

Thread.sleep(2_000);

assertThat(actualException.getCause(), equalTo(mockedException));
verifyNoInteractions(buffer);
}

}
Expand Down

0 comments on commit 87d93a0

Please sign in to comment.