Skip to content

Commit

Permalink
Refactor http source to http source common. (#2)
Browse files Browse the repository at this point in the history
Asynchronous implementation of opensearch api pipeline with pipeline transformation.

Synchronous Opensearch API source implementation

Synchronous implementation changes

Response action implementation

Query metrics support.
  • Loading branch information
sb2k16 authored May 15, 2024
1 parent 3ee920b commit fa17066
Show file tree
Hide file tree
Showing 71 changed files with 3,675 additions and 557 deletions.
2 changes: 2 additions & 0 deletions config/data-prepper-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ssl: false
metric_registries: [CloudWatch]
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
@JsonDeserialize(using = SinkModel.SinkModelDeserializer.class)
public class SinkModel extends PluginModel {

public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings));
public SinkModel(final String pluginName, final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings, final List<PluginModel> response_actions) {
this(pluginName, new SinkInternalJsonModel(routes, tagsTargetKey, includeKeys, excludeKeys, pluginSettings, response_actions));
}

private SinkModel(final String pluginName, final SinkInternalJsonModel sinkInnerModel) {
Expand All @@ -55,6 +55,10 @@ public List<String> getExcludeKeys() {
return this.<SinkInternalJsonModel>getInternalJsonModel().excludeKeys;
}

public List<PluginModel> getResponseActions() {
return this.<SinkInternalJsonModel>getInternalJsonModel().response_actions;
}


/**
* Gets the tags target key associated with this Sink.
Expand All @@ -75,16 +79,19 @@ public static class SinkModelBuilder {
private final List<String> includeKeys;
private final List<String> excludeKeys;

private final List<PluginModel> response_actions;

private SinkModelBuilder(final PluginModel pluginModel) {
this.pluginModel = pluginModel;
this.routes = Collections.emptyList();
this.tagsTargetKey = null;
this.includeKeys = Collections.emptyList();
this.excludeKeys = Collections.emptyList();
this.response_actions = Collections.emptyList();
}

public SinkModel build() {
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings());
return new SinkModel(pluginModel.getPluginName(), routes, tagsTargetKey, includeKeys, excludeKeys, pluginModel.getPluginSettings(), response_actions);
}
}

Expand All @@ -110,17 +117,23 @@ private static class SinkInternalJsonModel extends InternalJsonModel {
@JsonProperty("exclude_keys")
private final List<String> excludeKeys;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
@JsonProperty("response_actions")
private final List<PluginModel> response_actions;

@JsonCreator
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey, @JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys) {
this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>());
private SinkInternalJsonModel(@JsonProperty("routes") final List<String> routes, @JsonProperty("tags_target_key") final String tagsTargetKey,
@JsonProperty("include_keys") final List<String> includeKeys, @JsonProperty("exclude_keys") final List<String> excludeKeys, @JsonProperty("response_actions") final List<PluginModel> responseActions) {
this(routes, tagsTargetKey, includeKeys, excludeKeys, new HashMap<>(), responseActions);
}

private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings) {
private SinkInternalJsonModel(final List<String> routes, final String tagsTargetKey, final List<String> includeKeys, final List<String> excludeKeys, final Map<String, Object> pluginSettings, List<PluginModel> responseActions) {
super(pluginSettings);
this.routes = routes != null ? routes : Collections.emptyList();
this.includeKeys = includeKeys != null ? includeKeys : Collections.emptyList();
this.excludeKeys = excludeKeys != null ? excludeKeys : Collections.emptyList();
this.tagsTargetKey = tagsTargetKey;
response_actions = responseActions;
validateConfiguration();
validateKeys();
}
Expand Down Expand Up @@ -149,7 +162,7 @@ private void validateKeys() {

static class SinkModelDeserializer extends AbstractPluginModelDeserializer<SinkModel, SinkInternalJsonModel> {
SinkModelDeserializer() {
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null));
super(SinkModel.class, SinkInternalJsonModel.class, SinkModel::new, () -> new SinkInternalJsonModel(null, null, null, null, null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
protected static final int DEFAULT_MAX_RETRIES = 600;
protected static final int DEFAULT_WAIT_TIME_MS = 1000;
protected final PluginMetrics pluginMetrics;
protected final PluginSetting pluginSetting;
private final Counter recordsInCounter;
private final SinkLatencyMetrics latencyMetrics;
private final Timer timeElapsedTimer;
Expand All @@ -37,6 +38,7 @@ public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitT
retryThread = null;
this.maxRetries = numRetries;
this.waitTimeMs = waitTimeMs;
this.pluginSetting = pluginSetting;
}

public AbstractSink(final PluginSetting pluginSetting) {
Expand Down Expand Up @@ -67,12 +69,22 @@ public void output(Collection<T> records) {
timeElapsedTimer.record(() -> doOutput(records));
}

@Override
public Object outputSync(Collection<T> records, boolean isQuery) {
recordsInCounter.increment(records.size()*1.0);
return timeElapsedTimer.record(() -> doOutputSync(records, isQuery));
}

/**
* This method should implement the output logic
* @param records Records to be output
*/
public abstract void doOutput(Collection<T> records);

public Object doOutputSync(Collection<T> records, boolean isQuery) {
return "";
}

@Override
public void shutdown() {
if (retryThread != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public interface Sink<T extends Record<?>> {
*/
void output(Collection<T> records);

Object outputSync(Collection<T> records, boolean isQuery);

/**
* Prepare sink for shutdown, by cleaning up resources and threads.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.model.sink;

import org.opensearch.dataprepper.model.configuration.PluginModel;

import java.util.Collection;
import java.util.List;

Expand All @@ -18,17 +20,19 @@ public class SinkContext {

private final List<String> includeKeys;
private final List<String> excludeKeys;
private final List<PluginModel> responseActions;


public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys) {
public SinkContext(String tagsTargetKey, Collection<String> routes, List<String> includeKeys, List<String> excludeKeys, final List<PluginModel> response_actions) {
this.tagsTargetKey = tagsTargetKey;
this.routes = routes;
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;
responseActions = response_actions;
}

public SinkContext(String tagsTargetKey) {
this(tagsTargetKey, null, null, null);
this(tagsTargetKey, null, null, null, null);
}

/**
Expand Down Expand Up @@ -56,5 +60,9 @@ public List<String> getIncludeKeys() {
public List<String> getExcludeKeys() {
return excludeKeys;
}

public List<PluginModel> getResponseActions() {
return responseActions;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null, Collections.emptyList()));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -74,7 +74,7 @@ void testSerializing_PipelinesDataFlowModel_with_Version() throws IOException {
final DataPrepperVersion version = DataPrepperVersion.parse("2.0");
final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null, Collections.emptyList()));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(version, Collections.singletonMap(pipelineName, pipelineModel));
Expand All @@ -95,7 +95,7 @@ void testSerializing_PipelinesDataFlowModel_skip_null_pipelineExtensions() throw

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> processors = Collections.singletonList(new PluginModel("testProcessor", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.emptyList(), null, Collections.emptyList(), Collections.emptyList(), null, Collections.emptyList()));
final PipelineModel pipelineModel = new PipelineModel(source, null, processors, null, sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(
Expand All @@ -117,7 +117,7 @@ void testSerializing_PipelinesDataFlowModel_empty_Plugins_with_nonEmpty_delay_an

final PluginModel source = new PluginModel("testSource", (Map<String, Object>) null);
final List<PluginModel> preppers = Collections.singletonList(new PluginModel("testPrepper", (Map<String, Object>) null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, Collections.emptyList(), Collections.emptyList(), null));
final List<SinkModel> sinks = Collections.singletonList(new SinkModel("testSink", Collections.singletonList("my-route"), null, Collections.emptyList(), Collections.emptyList(), null, Collections.emptyList()));
final PipelineModel pipelineModel = new PipelineModel(source, null, preppers, Collections.singletonList(new ConditionalRoute("my-route", "/a==b")), sinks, 8, 50);

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataFlowModel(Collections.singletonMap(pipelineName, pipelineModel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void serialize_into_known_SinkModel() throws IOException {
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
final String tagsTargetKey = "tags";
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, Collections.emptyList(), Collections.emptyList(), pluginSettings);
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), tagsTargetKey, Collections.emptyList(), Collections.emptyList(), pluginSettings, Collections.emptyList());

final String actualJson = objectMapper.writeValueAsString(sinkModel);

Expand Down Expand Up @@ -134,7 +134,7 @@ void serialize_with_just_pluginModel() throws IOException {
pluginSettings.put("key1", "value1");
pluginSettings.put("key2", "value2");
pluginSettings.put("key3", "value3");
final SinkModel sinkModel = new SinkModel("customPlugin", null, null, Collections.emptyList(), Collections.emptyList(), pluginSettings);
final SinkModel sinkModel = new SinkModel("customPlugin", null, null, Collections.emptyList(), Collections.emptyList(), pluginSettings, Collections.emptyList());

final String actualJson = objectMapper.writeValueAsString(sinkModel);

Expand All @@ -146,7 +146,7 @@ void serialize_with_just_pluginModel() throws IOException {
@Test
void sinkModel_with_include_keys() {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "abc", "efg"), null, pluginSettings);
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, Arrays.asList("bcd", "abc", "efg"), null, pluginSettings, Collections.emptyList());

assertThat(sinkModel.getExcludeKeys(), equalTo(new ArrayList<String>()));
assertThat(sinkModel.getIncludeKeys(), equalTo(Arrays.asList("bcd", "abc", "efg")));
Expand All @@ -156,13 +156,13 @@ void sinkModel_with_include_keys() {
@Test
void sinkModel_with_invalid_include_keys() {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/bcd"), List.of(), pluginSettings));
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("/bcd"), List.of(), pluginSettings, Collections.emptyList()));
}

@Test
void sinkModel_with_exclude_keys() {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), pluginSettings);
final SinkModel sinkModel = new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), Arrays.asList("abc", "bcd", "efg"), pluginSettings, Collections.emptyList());

assertThat(sinkModel.getIncludeKeys(), equalTo(new ArrayList<String>()));
assertThat(sinkModel.getExcludeKeys(), equalTo(Arrays.asList("abc", "bcd", "efg")));
Expand All @@ -172,15 +172,15 @@ void sinkModel_with_exclude_keys() {
@Test
void sinkModel_with_invalid_exclude_keys() {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), List.of("/bcd"), pluginSettings));
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of(), List.of("/bcd"), pluginSettings, Collections.emptyList()));
}



@Test
void sinkModel_with_both_include_and_exclude_keys() {
final Map<String, Object> pluginSettings = new LinkedHashMap<>();
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("abc"), List.of("bcd"), pluginSettings));
assertThrows(InvalidPluginConfigurationException.class, () -> new SinkModel("customSinkPlugin", Arrays.asList("routeA", "routeB"), null, List.of("abc"), List.of("bcd"), pluginSettings, Collections.emptyList()));
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.configuration.PluginModel;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -40,8 +41,9 @@ public void testOutputCodecContextAdapter() {
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
final List<String> testIncludeKeys = Collections.emptyList();
final List<String> testExcludeKeys = Collections.emptyList();
final List<PluginModel> testResponseActions = Collections.emptyList();

SinkContext sinkContext = new SinkContext(testTagsTargetKey, null, testIncludeKeys, testExcludeKeys);
SinkContext sinkContext = new SinkContext(testTagsTargetKey, null, testIncludeKeys, testExcludeKeys, testResponseActions);

OutputCodecContext codecContext = OutputCodecContext.fromSinkContext(sinkContext);
assertThat(codecContext.getTagsTargetKey(), equalTo(testTagsTargetKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.configuration.PluginModel;

import java.util.Collections;
import java.util.List;
Expand All @@ -24,7 +25,8 @@ public void testSinkContextBasic() {
final List<String> testRoutes = Collections.emptyList();
final List<String> testIncludeKeys = Collections.emptyList();
final List<String> testExcludeKeys = Collections.emptyList();
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys);
final List<PluginModel> testResponseActions = Collections.emptyList();
sinkContext = new SinkContext(testTagsTargetKey, testRoutes, testIncludeKeys, testExcludeKeys, testResponseActions);
assertThat(sinkContext.getTagsTargetKey(), equalTo(testTagsTargetKey));
assertThat(sinkContext.getRoutes(), equalTo(testRoutes));
assertThat(sinkContext.getIncludeKeys(), equalTo(testIncludeKeys));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public void initialize() {
@Override
public void output(Collection<Record<?>> records) {
}


@Override
public Object outputSync(Collection<Record<?>> records, boolean isQuery) {
return null;
}
};

SinkTestClass sink;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ public void output(final Collection<Record<Event>> records) {
});
}

@Override
public Object outputSync(final Collection<Record<Event>> records, boolean isQuery) {
inMemorySinkAccessor.addEvents(testingKey, records);
return null;
}

@Override
public void shutdown() {

Expand Down
Loading

0 comments on commit fa17066

Please sign in to comment.