diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/Buffer.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/Buffer.java new file mode 100644 index 0000000000..c6f6018a4f --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/Buffer.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import java.io.IOException; + +/** + * A buffer can hold data before flushing it any Sink. + */ +public interface Buffer { + + /** + * Gets the current size of the buffer. This should be the number of bytes. + * @return buffer size. + */ + long getSize(); + int getEventCount(); + long getDuration(); + + byte[] getSinkBufferData() throws IOException; + void writeEvent(byte[] bytes) throws IOException; +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferFactory.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferFactory.java new file mode 100644 index 0000000000..2a99d63aaf --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferFactory.java @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +public interface BufferFactory { + Buffer getBuffer(); +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferTypeOptions.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferTypeOptions.java new file mode 100644 index 0000000000..e2a2ad7158 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/BufferTypeOptions.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Defines all the buffer types enumerations. + */ +public enum BufferTypeOptions { + + INMEMORY("in_memory", new InMemoryBufferFactory()), + LOCALFILE("local_file", new LocalFileBufferFactory()); + + private final String option; + private final BufferFactory bufferType; + private static final Map OPTIONS_MAP = Arrays.stream(BufferTypeOptions.values()) + .collect(Collectors.toMap(value -> value.option, value -> value)); + + BufferTypeOptions(final String option, final BufferFactory bufferType) { + this.option = option.toLowerCase(); + this.bufferType = bufferType; + } + + public BufferFactory getBufferType() { + return bufferType; + } + + @JsonCreator + static BufferTypeOptions fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBuffer.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBuffer.java new file mode 100644 index 0000000000..e583e54260 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBuffer.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.apache.commons.lang3.time.StopWatch; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * A buffer can hold in memory data and flushing it to any Sink. + */ +public class InMemoryBuffer implements Buffer { + + private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + private int eventCount; + private final StopWatch watch; + + InMemoryBuffer() { + byteArrayOutputStream.reset(); + eventCount = 0; + watch = new StopWatch(); + watch.start(); + } + + @Override + public long getSize() { + return byteArrayOutputStream.size(); + } + + @Override + public int getEventCount() { + return eventCount; + } + + public long getDuration() { + return watch.getTime(TimeUnit.SECONDS); + } + + /** + * collect current buffer data. + * @throws IOException while collecting current buffer data. + */ + @Override + public byte[] getSinkBufferData() throws IOException { + return byteArrayOutputStream.toByteArray(); + } + + /** + * write byte array to output stream. + * + * @param bytes byte array. + * @throws IOException while writing to output stream fails. + */ + @Override + public void writeEvent(byte[] bytes) throws IOException { + byteArrayOutputStream.write(bytes); + byteArrayOutputStream.write(System.lineSeparator().getBytes()); + eventCount++; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferFactory.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferFactory.java new file mode 100644 index 0000000000..ef6ec5daf9 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferFactory.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +public class InMemoryBufferFactory implements BufferFactory { + @Override + public Buffer getBuffer() { + return new InMemoryBuffer(); + } +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBuffer.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBuffer.java new file mode 100644 index 0000000000..9f9b4a3aac --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBuffer.java @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.apache.commons.lang3.time.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.OutputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +/** + * A buffer can hold local file data and flushing it to any Sink. + */ +public class LocalFileBuffer implements Buffer { + + private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class); + private final OutputStream outputStream; + private int eventCount; + private final StopWatch watch; + private final File localFile; + + LocalFileBuffer(File tempFile) throws FileNotFoundException { + localFile = tempFile; + outputStream = new BufferedOutputStream(new FileOutputStream(tempFile)); + eventCount = 0; + watch = new StopWatch(); + watch.start(); + } + + @Override + public long getSize() { + try { + outputStream.flush(); + } catch (IOException e) { + LOG.error("An exception occurred while flushing data to buffered output stream :", e); + } + return localFile.length(); + } + + @Override + public int getEventCount() { + return eventCount; + } + + @Override + public long getDuration(){ + return watch.getTime(TimeUnit.SECONDS); + } + + /** + * collect current buffer data. + * @throws IOException while collecting current buffer data. + */ + @Override + public byte[] getSinkBufferData() throws IOException { + final byte[] fileData = Files.readAllBytes(localFile.toPath()); + removeTemporaryFile(); + return fileData; + } + + /** + * write byte array to output stream. + * @param bytes byte array. + * @throws IOException while writing to output stream fails. + */ + @Override + public void writeEvent(byte[] bytes) throws IOException { + outputStream.write(bytes); + outputStream.write(System.lineSeparator().getBytes()); + eventCount++; + } + + /** + * Flushing the buffered data into the output stream. + */ + protected void flushAndCloseStream(){ + try { + outputStream.flush(); + outputStream.close(); + } catch (IOException e) { + LOG.error("An exception occurred while flushing data to buffered output stream :", e); + } + } + + /** + * Remove the local temp file after flushing data to Sink. + */ + protected void removeTemporaryFile() { + if (localFile != null) { + try { + Files.deleteIfExists(Paths.get(localFile.toString())); + } catch (IOException e) { + LOG.error("Unable to delete Local file {}", localFile, e); + } + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferFactory.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferFactory.java new file mode 100644 index 0000000000..cf11ba2a39 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferFactory.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class LocalFileBufferFactory implements BufferFactory { + + private static final Logger LOG = LoggerFactory.getLogger(LocalFileBufferFactory.class); + public static final String PREFIX = "local"; + public static final String SUFFIX = ".log"; + @Override + public Buffer getBuffer() { + File tempFile = null; + Buffer localfileBuffer = null; + try { + tempFile = File.createTempFile(PREFIX, SUFFIX); + localfileBuffer = new LocalFileBuffer(tempFile); + } catch (IOException e) { + LOG.error("Unable to create temp file ", e); + } + return localfileBuffer; + } +} diff --git a/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/ThresholdCheck.java b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/ThresholdCheck.java new file mode 100644 index 0000000000..1792aea483 --- /dev/null +++ b/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/sink/ThresholdCheck.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.accumulator.Buffer; + +/** + * Check threshold limits. + */ +public class ThresholdCheck { + + private ThresholdCheck() { + } + + /** + * Check threshold exceeds. + * @param currentBuffer current buffer. + * @param maxEvents maximum event provided by user as threshold. + * @param maxBytes maximum bytes provided by user as threshold. + * @param maxCollectionDuration maximum event collection duration provided by user as threshold. + * @return boolean value whether the threshold are met. + */ + public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final long maxCollectionDuration) { + if (maxEvents > 0) { + return currentBuffer.getEventCount() + 1 > maxEvents || + currentBuffer.getDuration() > maxCollectionDuration || + currentBuffer.getSize() > maxBytes.getBytes(); + } else { + return currentBuffer.getDuration() > maxCollectionDuration || + currentBuffer.getSize() > maxBytes.getBytes(); + } + } +} diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/BufferTypeOptionsTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/BufferTypeOptionsTest.java new file mode 100644 index 0000000000..2211085b33 --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/BufferTypeOptionsTest.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@ExtendWith(MockitoExtension.class) +class BufferTypeOptionsTest { + + @Test + void notNull_test() { + assertNotNull(BufferTypeOptions.INMEMORY); + } + + @Test + void get_buffer_type_test() { + assertNotNull(BufferTypeOptions.INMEMORY.getBufferType()); + } + + @Test + void fromOptionValue_test() { + BufferTypeOptions bufferTypeOptions = BufferTypeOptions.fromOptionValue("in_memory"); + assertNotNull(bufferTypeOptions); + assertThat(bufferTypeOptions.toString(), equalTo("INMEMORY")); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferFactoryTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferFactoryTest.java new file mode 100644 index 0000000000..ab533f4015 --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferFactoryTest.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +class InMemoryBufferFactoryTest { + + @Test + void test_inMemoryBufferFactory_notNull(){ + InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); + Assertions.assertNotNull(inMemoryBufferFactory); + } + + @Test + void test_buffer_notNull(){ + InMemoryBufferFactory inMemoryBufferFactory = new InMemoryBufferFactory(); + Assertions.assertNotNull(inMemoryBufferFactory); + Buffer buffer = inMemoryBufferFactory.getBuffer(); + Assertions.assertNotNull(buffer); + assertThat(buffer, instanceOf(Buffer.class)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferTest.java new file mode 100644 index 0000000000..ad07cc4011 --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBufferTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +@ExtendWith(MockitoExtension.class) +class InMemoryBufferTest { + + public static final int MAX_EVENTS = 55; + + private InMemoryBuffer inMemoryBuffer; + + @Test + void test_with_write_event_into_buffer() throws IOException { + inMemoryBuffer = new InMemoryBuffer(); + + while (inMemoryBuffer.getEventCount() < MAX_EVENTS) { + inMemoryBuffer.writeEvent(generateByteArray()); + } + assertThat(inMemoryBuffer.getSize(), greaterThanOrEqualTo(54110L)); + assertThat(inMemoryBuffer.getEventCount(), equalTo(MAX_EVENTS)); + assertThat(inMemoryBuffer.getDuration(), greaterThanOrEqualTo(0L)); + + } + + @Test + void test_getSinkData_success() { + inMemoryBuffer = new InMemoryBuffer(); + Assertions.assertNotNull(inMemoryBuffer); + assertDoesNotThrow(() -> { + inMemoryBuffer.getSinkBufferData(); + }); + } + + private byte[] generateByteArray() { + byte[] bytes = new byte[1000]; + for (int i = 0; i < 1000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferFactoryTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferFactoryTest.java new file mode 100644 index 0000000000..def5990028 --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferFactoryTest.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.accumulator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +class LocalFileBufferFactoryTest { + + @Test + void test_localFileBufferFactory_notNull() { + LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); + Assertions.assertNotNull(localFileBufferFactory); + } + + @Test + void test_buffer_notNull() { + LocalFileBufferFactory localFileBufferFactory = new LocalFileBufferFactory(); + Assertions.assertNotNull(localFileBufferFactory); + Buffer buffer = localFileBufferFactory.getBuffer(); + Assertions.assertNotNull(buffer); + assertThat(buffer, instanceOf(LocalFileBuffer.class)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferTest.java new file mode 100644 index 0000000000..53c556e75c --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/accumulator/LocalFileBufferTest.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.accumulator; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.equalTo; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertFalse; + +@ExtendWith(MockitoExtension.class) +class LocalFileBufferTest { + + public static final String KEY = UUID.randomUUID().toString() + ".log"; + public static final String PREFIX = "local"; + public static final String SUFFIX = ".log"; + + private LocalFileBuffer localFileBuffer; + private File tempFile; + + @BeforeEach + void setUp() throws IOException { + tempFile = File.createTempFile(PREFIX, SUFFIX); + localFileBuffer = new LocalFileBuffer(tempFile); + } + + @Test + void test_with_write_events_into_buffer() throws IOException { + while (localFileBuffer.getEventCount() < 55) { + localFileBuffer.writeEvent(generateByteArray()); + } + assertThat(localFileBuffer.getSize(), greaterThan(1l)); + assertThat(localFileBuffer.getEventCount(), equalTo(55)); + assertThat(localFileBuffer.getDuration(), equalTo(0L)); + localFileBuffer.flushAndCloseStream(); + localFileBuffer.removeTemporaryFile(); + assertFalse(tempFile.exists(), "The temp file has not been deleted."); + } + + @Test + void test_without_write_events_into_buffer() { + assertThat(localFileBuffer.getSize(), equalTo(0L)); + assertThat(localFileBuffer.getEventCount(), equalTo(0)); + assertThat(localFileBuffer.getDuration(), equalTo(0L)); + localFileBuffer.flushAndCloseStream(); + localFileBuffer.removeTemporaryFile(); + assertFalse(tempFile.exists(), "The temp file has not been deleted."); + } + + @Test + void test_getSinkData_success() throws IOException{ + Assertions.assertNotNull(localFileBuffer); + assertDoesNotThrow(() -> { + localFileBuffer.getSinkBufferData(); + }); + } + + @AfterEach + void cleanup() { + tempFile.deleteOnExit(); + } + + private byte[] generateByteArray() { + byte[] bytes = new byte[1000]; + for (int i = 0; i < 1000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/ThresholdCheckTest.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/ThresholdCheckTest.java new file mode 100644 index 0000000000..23494ecb3d --- /dev/null +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/sink/ThresholdCheckTest.java @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.accumulator.Buffer; +import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ThresholdCheckTest { + + private Buffer inMemoryBuffer; + + @BeforeEach + void setUp() throws IOException { + inMemoryBuffer = new InMemoryBufferFactory().getBuffer(); + + while (inMemoryBuffer.getEventCount() < 100) { + inMemoryBuffer.writeEvent(generateByteArray()); + } + } + + @Test + void test_exceedThreshold_true_dueTo_maxEvents_is_less_than_buffered_event_count() { + final int maxEvents = 95; + final ByteCount maxBytes = ByteCount.parse("50kb"); + final long maxCollectionDuration = 15; + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, + maxBytes, maxCollectionDuration); + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxEvents_is_greater_than_buffered_event_count() { + final int maxEvents = 105; + final ByteCount maxBytes = ByteCount.parse("50mb"); + final long maxCollectionDuration = 50; + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes, + maxCollectionDuration); + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_ture_dueTo_maxBytes_is_less_than_buffered_byte_count() { + final int maxEvents = 500; + final ByteCount maxBytes = ByteCount.parse("1b"); + final long maxCollectionDuration = 15; + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, maxBytes, + maxCollectionDuration); + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void test_exceedThreshold_false_dueTo_maxBytes_is_greater_than_buffered_byte_count() { + final int maxEvents = 500; + final ByteCount maxBytes = ByteCount.parse("8mb"); + final long maxCollectionDuration = 15; + boolean isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, + maxBytes, maxCollectionDuration); + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + @Test + void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_less_than_buffered_event_collection_duration() + throws IOException, InterruptedException { + final int maxEvents = 500; + final ByteCount maxBytes = ByteCount.parse("500mb"); + final long maxCollectionDuration = 10; + + inMemoryBuffer = new InMemoryBufferFactory().getBuffer(); + boolean isThresholdExceed = Boolean.FALSE; + synchronized (this) { + while (inMemoryBuffer.getEventCount() < 100) { + inMemoryBuffer.writeEvent(generateByteArray()); + isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, maxEvents, + maxBytes, maxCollectionDuration); + if (isThresholdExceed) { + break; + } + wait(5000); + } + } + assertTrue(isThresholdExceed, "Threshold not exceeded"); + } + + @Test + void test_exceedThreshold_ture_dueTo_maxCollectionDuration_is_greater_than_buffered_event_collection_duration() + throws IOException, InterruptedException { + final int maxEvents = 500; + final ByteCount maxBytes = ByteCount.parse("500mb"); + final long maxCollectionDuration = 240; + + inMemoryBuffer = new InMemoryBufferFactory().getBuffer(); + + boolean isThresholdExceed = Boolean.FALSE; + synchronized (this) { + while (inMemoryBuffer.getEventCount() < 100) { + inMemoryBuffer.writeEvent(generateByteArray()); + isThresholdExceed = ThresholdCheck.checkThresholdExceed(inMemoryBuffer, + maxEvents, maxBytes, maxCollectionDuration); + if (isThresholdExceed) { + break; + } + wait(50); + } + } + assertFalse(isThresholdExceed, "Threshold exceeded"); + } + + private byte[] generateByteArray() { + byte[] bytes = new byte[10000]; + for (int i = 0; i < 10000; i++) { + bytes[i] = (byte) i; + } + return bytes; + } +} diff --git a/data-prepper-plugins/http-sink/build.gradle b/data-prepper-plugins/http-sink/build.gradle index da49c79c4b..21e678348f 100644 --- a/data-prepper-plugins/http-sink/build.gradle +++ b/data-prepper-plugins/http-sink/build.gradle @@ -2,11 +2,14 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' + implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2' + implementation 'org.apache.commons:commons-lang3:3.12.0' } test { diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java new file mode 100644 index 0000000000..fdb3dcaaa0 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/ClientFactory.java @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +/** + * AWS factory class required to create AWS Http Endpoint client for this plugin. + */ +public final class ClientFactory { + private ClientFactory() {} + +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java index 2c32235287..06d057b625 100644 --- a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HTTPSink.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.sink; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -15,10 +16,13 @@ import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.configuration.UrlConfigurationOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.List; +import java.util.Optional; @DataPrepperPlugin(name = "http", pluginType = Sink.class, pluginConfigurationType = HttpSinkConfiguration.class) public class HTTPSink extends AbstractSink> { @@ -73,4 +77,21 @@ public void doOutput(final Collection> records) { } //TODO: call Service call method } + + + public Optional getAuthHandlerByConfig(final HttpSinkConfiguration sinkConfiguration){ + //TODO: AWS Sigv4 - check + // TODO: call Auth Handlers based on auth Type + + return null; + } + + public List getClassicHttpRequestList(final List urlConfigurationOption){ + // logic for create auth handler for each url based on provided configuration - getAuthHandlerByConfig() + // logic for request preparation for each url + // logic for worker is not there in url level then verify the global workers if global workers also not defined then default 1 + // logic for get the Proxy object if url level proxy enabled else look the global proxy. + // Aws SageMaker headers if headers found in the configuration + return null; + } } \ No newline at end of file diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java new file mode 100644 index 0000000000..61b20993d8 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/HttpAuthOptions.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink; + +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.core5.http.ClassicHttpRequest; + +public class HttpAuthOptions { + private String url; + + private CloseableHttpClient closeableHttpClient; + + private ClassicHttpRequest classicHttpRequest; + + private int workers; + + private String proxy; + + public CloseableHttpClient getCloseableHttpClient() { + return closeableHttpClient; + } + + public HttpAuthOptions setCloseableHttpClient(CloseableHttpClient closeableHttpClient) { + this.closeableHttpClient = closeableHttpClient; + return this; + } + + public ClassicHttpRequest getClassicHttpRequest() { + return classicHttpRequest; + } + + public HttpAuthOptions setClassicHttpRequest(ClassicHttpRequest classicHttpRequest) { + this.classicHttpRequest = classicHttpRequest; + return this; + } + + public int getWorkers() { + return workers; + } + + public HttpAuthOptions setWorkers(int workers) { + this.workers = workers; + return this; + } + + public String getUrl() { + return url; + } + + public HttpAuthOptions setUrl(String url) { + this.url = url; + return this; + } + + public String getProxy() { + return proxy; + } + + public HttpAuthOptions setProxy(String proxy) { + this.proxy = proxy; + return this; + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/dlq/FailedDlqData.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/dlq/FailedDlqData.java new file mode 100644 index 0000000000..62a85cb709 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/dlq/FailedDlqData.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.dlq; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.opensearch.dataprepper.model.event.EventHandle; + +import java.util.Objects; + +public class FailedDlqData { + + private final int status; + + private final String message; + + @JsonIgnore + private final EventHandle eventHandle; + + private FailedDlqData(final int status, + final String message, + final EventHandle eventHandle) { + this.status = status; + Objects.requireNonNull(message); + this.message = message; + this.eventHandle = eventHandle; + } + + public int getStatus() { + return status; + } + + public String getMessage() { + return message; + } + public EventHandle getEventHandle() { + return eventHandle; + } + + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private EventHandle eventHandle; + + private int status = 0; + + private String message; + + public FailedDlqData build() { + return new FailedDlqData(status, message, eventHandle); + } + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java new file mode 100644 index 0000000000..c84841c31e --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BasicAuthHttpSinkHandler.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.handler; + +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; + +import java.util.Optional; + +public class BasicAuthHttpSinkHandler implements MultiAuthHttpSinkHandler { + @Override + public Optional authenticate(HttpSinkConfiguration sinkConfiguration) { + // if ssl enabled then set connection manager + return null; + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java new file mode 100644 index 0000000000..fec474e6b8 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/BearerTokenAuthHttpSinkHandler.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.handler; + +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; + +import java.util.Optional; + +public class BearerTokenAuthHttpSinkHandler implements MultiAuthHttpSinkHandler { + @Override + public Optional authenticate(HttpSinkConfiguration sinkConfiguration) { + // if ssl enabled then set connection manager + return null; + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java new file mode 100644 index 0000000000..7ff5810b77 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/HttpAuthOptions.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.handler; + +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.core5.http.ClassicHttpRequest; + +public class HttpAuthOptions { + private String url; + private CloseableHttpClient closeableHttpClient; + private ClassicHttpRequest classicHttpRequest; + private int workers; + private String proxy; + + public CloseableHttpClient getCloseableHttpClient() { + return closeableHttpClient; + } + + public HttpAuthOptions setCloseableHttpClient(CloseableHttpClient closeableHttpClient) { + this.closeableHttpClient = closeableHttpClient; + return this; + } + + public ClassicHttpRequest getClassicHttpRequest() { + return classicHttpRequest; + } + + public HttpAuthOptions setClassicHttpRequest(ClassicHttpRequest classicHttpRequest) { + this.classicHttpRequest = classicHttpRequest; + return this; + } + + public int getWorkers() { + return workers; + } + + public HttpAuthOptions setWorkers(int workers) { + this.workers = workers; + return this; + } + + public String getUrl() { + return url; + } + + public HttpAuthOptions setUrl(String url) { + this.url = url; + return this; + } + + public String getProxy() { + return proxy; + } + + public HttpAuthOptions setProxy(String proxy) { + this.proxy = proxy; + return this; + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java new file mode 100644 index 0000000000..e0db436915 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/MultiAuthHttpSinkHandler.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.handler; + +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; + +import java.util.Optional; + +public interface MultiAuthHttpSinkHandler { + Optional authenticate(final HttpSinkConfiguration sinkConfiguration); + +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/SecuredAuthHttpSinkHandler.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/SecuredAuthHttpSinkHandler.java new file mode 100644 index 0000000000..9fb58fe223 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/handler/SecuredAuthHttpSinkHandler.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.handler; + +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; + +import java.util.Optional; + +public class SecuredAuthHttpSinkHandler implements MultiAuthHttpSinkHandler { + @Override + public Optional authenticate(HttpSinkConfiguration sinkConfiguration) { + // logic here to read the certs from ACM/S3/local + // SSL Sigv4 validation and verification and make connection + return null; + } +} diff --git a/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java new file mode 100644 index 0000000000..8f97dbb5b0 --- /dev/null +++ b/data-prepper-plugins/http-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/service/HttpSinkService.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.service; + +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.accumulator.BufferFactory; +import org.opensearch.dataprepper.plugins.sink.configuration.HttpSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.handler.HttpAuthOptions; +import org.opensearch.dataprepper.model.codec.OutputCodec; + +import java.util.Collection; +import java.util.List; +public class HttpSinkService { + + private final HttpSinkConfiguration httpSinkConf; + + private final BufferFactory bufferFactory; + + private final List httpAuthOptions; + private OutputCodec codec; + + public HttpSinkService(final OutputCodec codec, + final HttpSinkConfiguration httpSinkConf, + final BufferFactory bufferFactory, + final List httpAuthOptions){ + this.codec= codec; + this.httpSinkConf = httpSinkConf; + this.bufferFactory = bufferFactory; + this.httpAuthOptions = httpAuthOptions; + } + + public void processRecords(Collection> records) { + records.forEach(record -> { + try{ + // logic to fetch the records in batch as per threshold limit - checkThresholdExceed(); + // apply the codec + // push to http end point + }catch(Exception e){ + // In case of any exception, need to write the exception in dlq - logFailureForDlqObjects(); + // In case of any exception, need to push the web hook url- logFailureForWebHook(); + } + //TODO: implement end to end acknowledgement + }); + } + + public static boolean checkThresholdExceed(final Buffer currentBuffer, + final int maxEvents, + final ByteCount maxBytes, + final long maxCollectionDuration) { + // logic for checking the threshold + return true; + } + +}