Skip to content

Commit

Permalink
Support plugins defining the EventKey in the plugin configuration cla…
Browse files Browse the repository at this point in the history
…sses. Data Prepper will deserialize the EventKey from the pipeline configuration and validate @notempty validations. Builds on the opensearch-project#1916. (opensearch-project#4635)

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Jun 19, 2024
1 parent 4e947c2 commit 7d16ea1
Show file tree
Hide file tree
Showing 15 changed files with 431 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.event;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* An annotation for an {@link EventKey} used in a Data Prepper pipeline configuration.
* <p>
* Unless you need all actions on a configuration, you should use this annotation to
* provide the most appropriate validation.
*
* @since 2.9
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface EventKeyConfiguration {
/**
* Defines the {@link EventKeyFactory.EventAction}s to use when creating the {@link EventKey}
* for the configuration.
*
* @return The desired event actions.
* @since 2.9
*/
EventKeyFactory.EventAction[] value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void run_with_single_record() {
assertThat(records.get(0).getData(), notNullValue());
assertThat(records.get(0).getData().get("message", String.class), equalTo(messageValue));
assertThat(records.get(0).getData().get("test1", String.class), equalTo("knownPrefix10"));
assertThat(records.get(0).getData().get("test1_copy", String.class), equalTo("knownPrefix10"));
}

@Test
Expand Down Expand Up @@ -113,6 +114,8 @@ void pipeline_with_single_batch_of_records() {
equalTo(inputRecord.getData().get("message", String.class)));
assertThat(recordData.get("test1", String.class),
equalTo("knownPrefix1" + i));
assertThat(recordData.get("test1_copy", String.class),
equalTo("knownPrefix1" + i));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Collection;

@DataPrepperPlugin(name = "simple_copy_test", pluginType = Processor.class, pluginConfigurationType = SimpleCopyProcessorConfig.class)
public class SimpleCopyProcessor implements Processor<Record<Event>, Record<Event>> {
private final SimpleCopyProcessorConfig simpleCopyProcessorConfig;
int count = 0;

@DataPrepperPluginConstructor
public SimpleCopyProcessor(final SimpleCopyProcessorConfig simpleCopyProcessorConfig) {
this.simpleCopyProcessorConfig = simpleCopyProcessorConfig;
}

@Override
public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Object value = record.getData().get(simpleCopyProcessorConfig.getSource(), Object.class);
record.getData().put(simpleCopyProcessorConfig.getTarget(), value);
count++;
}

return records;
}

@Override
public void prepareForShutdown() {

}

@Override
public boolean isReadyForShutdown() {
return false;
}

@Override
public void shutdown() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyConfiguration;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

public class SimpleCopyProcessorConfig {
@EventKeyConfiguration(EventKeyFactory.EventAction.GET)
private EventKey source;
private EventKey target;

public EventKey getSource() {
return source;
}

public EventKey getTarget() {
return target;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;

Expand All @@ -22,10 +21,8 @@ public class SimpleProcessor implements Processor<Record<Event>, Record<Event>>
int count = 0;

@DataPrepperPluginConstructor
public SimpleProcessor(
final SimpleProcessorConfig simpleProcessorConfig,
final EventKeyFactory eventKeyFactory) {
eventKey1 = eventKeyFactory.createEventKey(simpleProcessorConfig.getKey1());
public SimpleProcessor(final SimpleProcessorConfig simpleProcessorConfig) {
eventKey1 = simpleProcessorConfig.getKey1();
valuePrefix1 = simpleProcessorConfig.getValuePrefix1();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@

package org.opensearch.dataprepper.plugins;

import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyConfiguration;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

public class SimpleProcessorConfig {
private String key1;
@EventKeyConfiguration(EventKeyFactory.EventAction.PUT)
private EventKey key1;
private String valuePrefix1;

public String getKey1() {
public EventKey getKey1() {
return key1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ processor-pipeline:
- simple_test:
key1: /test1
value_prefix1: knownPrefix1
- simple_copy_test:
source: /test1
target: /test1_copy

sink:
- in_memory:
Expand Down
1 change: 1 addition & 0 deletions data-prepper-pipeline-parser/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'org.projectlombok:lombok:1.18.22'
implementation 'com.jayway.jsonpath:json-path:2.6.0'
implementation 'javax.inject:javax.inject:1'
implementation 'javax.annotation:javax.annotation-api:1.3.2'
implementation(libs.spring.core) {
exclude group: 'commons-logging', module: 'commons-logging'
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.validators;

import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
import org.opensearch.dataprepper.model.event.EventKey;

import jakarta.validation.constraints.NotEmpty;

public class NotEmptyValidatorForEventKey implements ConstraintValidator<NotEmpty, EventKey> {
@Override
public boolean isValid(final EventKey eventKey, final ConstraintValidatorContext constraintValidatorContext) {
if(eventKey == null) {
return false;
}
return !eventKey.getKey().isEmpty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.pipeline.parser;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyConfiguration;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.io.IOException;

public class EventKeyDeserializer extends StdDeserializer<EventKey> implements ContextualDeserializer {
private final EventKeyFactory eventKeyFactory;
private final EventKeyFactory.EventAction[] eventAction;

/**
* Constructs a new {@link EventKeyDeserializer} from an {@link EventKeyFactory}.
*
* @param eventKeyFactory The factory for creating {@link EventKey} objects.
*/
public EventKeyDeserializer(final EventKeyFactory eventKeyFactory) {
this(eventKeyFactory, new EventKeyFactory.EventAction[] {EventKeyFactory.EventAction.ALL});
}

private EventKeyDeserializer(final EventKeyFactory eventKeyFactory, final EventKeyFactory.EventAction[] eventAction) {
super(EventKey.class);
this.eventKeyFactory = eventKeyFactory;
this.eventAction = eventAction;
}

@Override
public EventKey deserialize(final JsonParser parser, final DeserializationContext ctxt) throws IOException {
final String eventKeyString = parser.getValueAsString();

return eventKeyFactory.createEventKey(eventKeyString, eventAction);
}

@Override
public JsonDeserializer<?> createContextual(final DeserializationContext deserializationContext, final BeanProperty property) {
if(property == null)
return this;

final EventKeyConfiguration eventKeyConfiguration = property.getAnnotation(EventKeyConfiguration.class);

if(eventKeyConfiguration == null)
return this;

final EventKeyFactory.EventAction[] eventAction = eventKeyConfiguration.value();

return new EventKeyDeserializer(eventKeyFactory, eventAction);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#

org.opensearch.dataprepper.core.validators.NotEmptyValidatorForEventKey
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.validators;

import jakarta.validation.ConstraintValidatorContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.EventKey;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class NotEmptyValidatorForEventKeyTest {
@Mock
private EventKey eventKey;

@Mock
private ConstraintValidatorContext context;

private NotEmptyValidatorForEventKey createObjectUnderTest() {
return new NotEmptyValidatorForEventKey();
}

@Test
void isValid_returns_false_if_EventKey_is_empty() {
assertThat(createObjectUnderTest().isValid(null, context), equalTo(false));
}

@Test
void isValid_returns_false_if_EventKey_getKey_is_empty() {
when(eventKey.getKey()).thenReturn("");
assertThat(createObjectUnderTest().isValid(eventKey, context), equalTo(false));
}

@ParameterizedTest
@ValueSource(strings = {"/", "a", "/abcdefghijklmnopqrstuvwxyz"})
void isValid_returns_true_if_EventKey_getKey_is_not_empty(final String key) {
when(eventKey.getKey()).thenReturn(key);
assertThat(createObjectUnderTest().isValid(eventKey, context), equalTo(true));
}
}
Loading

0 comments on commit 7d16ea1

Please sign in to comment.