Skip to content

Commit

Permalink
Boiler plate code of HttpSink for #874. (#2916)
Browse files Browse the repository at this point in the history
* Boiler plate code of HttpSink for #874.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>

* Added copyright on classes of HttpSink for #874.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>

* Moved Accumulator package to common for #874.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>

* Test cases added for accumulator related classes for #874.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>

* Added HttpSink related methos in accumulator for #874.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>

* Removed plugin specific methods from common for #874.
Signed-off-by: mallikagogoi7 <mallikagogoi7@gmail.com>
  • Loading branch information
mallikagogoi7 committed Jul 5, 2023
1 parent 4aa35a3 commit c78de95
Show file tree
Hide file tree
Showing 25 changed files with 1,035 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import java.io.IOException;

/**
* A buffer can hold data before flushing it any Sink.
*/
public interface Buffer {

/**
* Gets the current size of the buffer. This should be the number of bytes.
* @return buffer size.
*/
long getSize();
int getEventCount();
long getDuration();

byte[] getSinkBufferData() throws IOException;
void writeEvent(byte[] bytes) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

public interface BufferFactory {
Buffer getBuffer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Defines all the buffer types enumerations.
*/
public enum BufferTypeOptions {

INMEMORY("in_memory", new InMemoryBufferFactory()),
LOCALFILE("local_file", new LocalFileBufferFactory());

private final String option;
private final BufferFactory bufferType;
private static final Map<String, BufferTypeOptions> OPTIONS_MAP = Arrays.stream(BufferTypeOptions.values())
.collect(Collectors.toMap(value -> value.option, value -> value));

BufferTypeOptions(final String option, final BufferFactory bufferType) {
this.option = option.toLowerCase();
this.bufferType = bufferType;
}

public BufferFactory getBufferType() {
return bufferType;
}

@JsonCreator
static BufferTypeOptions fromOptionValue(final String option) {
return OPTIONS_MAP.get(option);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* A buffer can hold in memory data and flushing it to any Sink.
*/
public class InMemoryBuffer implements Buffer {

private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private int eventCount;
private final StopWatch watch;

InMemoryBuffer() {
byteArrayOutputStream.reset();
eventCount = 0;
watch = new StopWatch();
watch.start();
}

@Override
public long getSize() {
return byteArrayOutputStream.size();
}

@Override
public int getEventCount() {
return eventCount;
}

public long getDuration() {
return watch.getTime(TimeUnit.SECONDS);
}

/**
* collect current buffer data.
* @throws IOException while collecting current buffer data.
*/
@Override
public byte[] getSinkBufferData() throws IOException {
return byteArrayOutputStream.toByteArray();
}

/**
* write byte array to output stream.
*
* @param bytes byte array.
* @throws IOException while writing to output stream fails.
*/
@Override
public void writeEvent(byte[] bytes) throws IOException {
byteArrayOutputStream.write(bytes);
byteArrayOutputStream.write(System.lineSeparator().getBytes());
eventCount++;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

public class InMemoryBufferFactory implements BufferFactory {
@Override
public Buffer getBuffer() {
return new InMemoryBuffer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;

/**
* A buffer can hold local file data and flushing it to any Sink.
*/
public class LocalFileBuffer implements Buffer {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileBuffer.class);
private final OutputStream outputStream;
private int eventCount;
private final StopWatch watch;
private final File localFile;

LocalFileBuffer(File tempFile) throws FileNotFoundException {
localFile = tempFile;
outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
eventCount = 0;
watch = new StopWatch();
watch.start();
}

@Override
public long getSize() {
try {
outputStream.flush();
} catch (IOException e) {
LOG.error("An exception occurred while flushing data to buffered output stream :", e);
}
return localFile.length();
}

@Override
public int getEventCount() {
return eventCount;
}

@Override
public long getDuration(){
return watch.getTime(TimeUnit.SECONDS);
}

/**
* collect current buffer data.
* @throws IOException while collecting current buffer data.
*/
@Override
public byte[] getSinkBufferData() throws IOException {
final byte[] fileData = Files.readAllBytes(localFile.toPath());
removeTemporaryFile();
return fileData;
}

/**
* write byte array to output stream.
* @param bytes byte array.
* @throws IOException while writing to output stream fails.
*/
@Override
public void writeEvent(byte[] bytes) throws IOException {
outputStream.write(bytes);
outputStream.write(System.lineSeparator().getBytes());
eventCount++;
}

/**
* Flushing the buffered data into the output stream.
*/
protected void flushAndCloseStream(){
try {
outputStream.flush();
outputStream.close();
} catch (IOException e) {
LOG.error("An exception occurred while flushing data to buffered output stream :", e);
}
}

/**
* Remove the local temp file after flushing data to Sink.
*/
protected void removeTemporaryFile() {
if (localFile != null) {
try {
Files.deleteIfExists(Paths.get(localFile.toString()));
} catch (IOException e) {
LOG.error("Unable to delete Local file {}", localFile, e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

public class LocalFileBufferFactory implements BufferFactory {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileBufferFactory.class);
public static final String PREFIX = "local";
public static final String SUFFIX = ".log";
@Override
public Buffer getBuffer() {
File tempFile = null;
Buffer localfileBuffer = null;
try {
tempFile = File.createTempFile(PREFIX, SUFFIX);
localfileBuffer = new LocalFileBuffer(tempFile);
} catch (IOException e) {
LOG.error("Unable to create temp file ", e);
}
return localfileBuffer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.accumulator.Buffer;

/**
* Check threshold limits.
*/
public class ThresholdCheck {

private ThresholdCheck() {
}

/**
* Check threshold exceeds.
* @param currentBuffer current buffer.
* @param maxEvents maximum event provided by user as threshold.
* @param maxBytes maximum bytes provided by user as threshold.
* @param maxCollectionDuration maximum event collection duration provided by user as threshold.
* @return boolean value whether the threshold are met.
*/
public static boolean checkThresholdExceed(final Buffer currentBuffer, final int maxEvents, final ByteCount maxBytes, final long maxCollectionDuration) {
if (maxEvents > 0) {
return currentBuffer.getEventCount() + 1 > maxEvents ||
currentBuffer.getDuration() > maxCollectionDuration ||
currentBuffer.getSize() > maxBytes.getBytes();
} else {
return currentBuffer.getDuration() > maxCollectionDuration ||
currentBuffer.getSize() > maxBytes.getBytes();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.accumulator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@ExtendWith(MockitoExtension.class)
class BufferTypeOptionsTest {

@Test
void notNull_test() {
assertNotNull(BufferTypeOptions.INMEMORY);
}

@Test
void get_buffer_type_test() {
assertNotNull(BufferTypeOptions.INMEMORY.getBufferType());
}

@Test
void fromOptionValue_test() {
BufferTypeOptions bufferTypeOptions = BufferTypeOptions.fromOptionValue("in_memory");
assertNotNull(bufferTypeOptions);
assertThat(bufferTypeOptions.toString(), equalTo("INMEMORY"));
}
}
Loading

0 comments on commit c78de95

Please sign in to comment.