Skip to content

Commit

Permalink
Corrects the JSON output codec to write Events as provided rather tha…
Browse files Browse the repository at this point in the history
…n convert to string. Also fixes the include/exclude keys. Adds a boolean check in OutputCodecContext so that this can be used by other codecs. (opensearch-project#3195)

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Aug 18, 2023
1 parent 3fcf5c1 commit f9a3a60
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

/**
* Data Prepper Output Codec Context class.
Expand All @@ -18,6 +19,7 @@ public class OutputCodecContext {

private final List<String> includeKeys;
private final List<String> excludeKeys;
private final Predicate<String> inclusionPredicate;

public OutputCodecContext() {
this(null, Collections.emptyList(), Collections.emptyList());
Expand All @@ -28,6 +30,14 @@ public OutputCodecContext(String tagsTargetKey, List<String> includeKeys, List<S
this.tagsTargetKey = tagsTargetKey;
this.includeKeys = includeKeys;
this.excludeKeys = excludeKeys;

if (includeKeys != null && !includeKeys.isEmpty()) {
inclusionPredicate = k -> includeKeys.contains(k);
} else if (excludeKeys != null && !excludeKeys.isEmpty()) {
inclusionPredicate = k -> !excludeKeys.contains(k);
} else {
inclusionPredicate = k -> true;
}
}


Expand All @@ -49,4 +59,8 @@ public List<String> getIncludeKeys() {
public List<String> getExcludeKeys() {
return excludeKeys;
}

public boolean shouldIncludeKey(String key) {
return inclusionPredicate.test(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertNull;

public class OutputCodecContextTest {


@Test
public void testOutputCodecContextBasic() {
final String testTagsTargetKey = RandomStringUtils.randomAlphabetic(6);
Expand All @@ -32,8 +31,6 @@ public void testOutputCodecContextBasic() {
assertNull(emptyContext.getTagsTargetKey());
assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys));
assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys));


}

@Test
Expand All @@ -53,7 +50,43 @@ public void testOutputCodecContextAdapter() {
assertNull(emptyContext.getTagsTargetKey());
assertThat(emptyContext.getIncludeKeys(), equalTo(testIncludeKeys));
assertThat(emptyContext.getExcludeKeys(), equalTo(testExcludeKeys));
}

@Test
void shouldIncludeKey_returns_expected_when_no_include_exclude() {
OutputCodecContext objectUnderTest = new OutputCodecContext(null, null, null);
assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
}

@Test
void shouldIncludeKey_returns_expected_when_empty_lists_for_include_exclude() {
OutputCodecContext objectUnderTest = new OutputCodecContext(null, Collections.emptyList(), Collections.emptyList());
assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
}

@Test
void shouldIncludeKey_returns_expected_when_includeKey() {
String includeKey1 = UUID.randomUUID().toString();
String includeKey2 = UUID.randomUUID().toString();
final List<String> includeKeys = List.of(includeKey1, includeKey2);

OutputCodecContext objectUnderTest = new OutputCodecContext(null, includeKeys, null);

assertThat(objectUnderTest.shouldIncludeKey(includeKey1), equalTo(true));
assertThat(objectUnderTest.shouldIncludeKey(includeKey2), equalTo(true));
assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(false));
}

@Test
void shouldIncludeKey_returns_expected_when_excludeKey() {
String excludeKey1 = UUID.randomUUID().toString();
String excludeKey2 = UUID.randomUUID().toString();
final List<String> excludeKeys = List.of(excludeKey1, excludeKey2);

OutputCodecContext objectUnderTest = new OutputCodecContext(null, null, excludeKeys);

assertThat(objectUnderTest.shouldIncludeKey(excludeKey1), equalTo(false));
assertThat(objectUnderTest.shouldIncludeKey(excludeKey2), equalTo(false));
assertThat(objectUnderTest.shouldIncludeKey(UUID.randomUUID().toString()), equalTo(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.codec.OutputCodec;
Expand All @@ -15,18 +17,21 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* An implementation of {@link OutputCodec} which deserializes Data-Prepper events
* and writes them to Output Stream as JSON Data
*/
@DataPrepperPlugin(name = "json", pluginType = OutputCodec.class, pluginConfigurationType = JsonOutputCodecConfig.class)
public class JsonOutputCodec implements OutputCodec {

private final ObjectMapper objectMapper = new ObjectMapper();
private static final String JSON = "json";
private static final JsonFactory factory = new JsonFactory();
JsonOutputCodecConfig config;
private final JsonOutputCodecConfig config;
private JsonGenerator generator;
private OutputCodecContext codecContext;

Expand All @@ -36,6 +41,11 @@ public JsonOutputCodec(final JsonOutputCodecConfig config) {
this.config = config;
}

@Override
public String getExtension() {
return JSON;
}

@Override
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
Objects.requireNonNull(outputStream);
Expand All @@ -59,27 +69,30 @@ public void complete(final OutputStream outputStream) throws IOException {
@Override
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
Objects.requireNonNull(event);
Map<String, Object> dataMap = getDataMapToSerialize(event);
objectMapper.writeValue(generator, dataMap);
generator.flush();
}

private Map<String, Object> getDataMapToSerialize(Event event) throws JsonProcessingException {
final Event modifiedEvent;
if (codecContext.getTagsTargetKey() != null) {
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
} else {
modifiedEvent = event;
}
generator.writeStartObject();
final boolean isExcludeKeyAvailable = !codecContext.getExcludeKeys().isEmpty();
for (final String key : modifiedEvent.toMap().keySet()) {
if (isExcludeKeyAvailable && codecContext.getExcludeKeys().contains(key)) {
continue;
}
generator.writeStringField(key, modifiedEvent.toMap().get(key).toString());
}
generator.writeEndObject();
generator.flush();
}
Map<String, Object> dataMap = modifiedEvent.toMap();

@Override
public String getExtension() {
return JSON;
if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {

Map<String, Object> finalDataMap = dataMap;
dataMap = dataMap.keySet()
.stream()
.filter(codecContext::shouldIncludeKey)
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
}
return dataMap;
}
}

Expand Down
Loading

0 comments on commit f9a3a60

Please sign in to comment.