Skip to content

Commit

Permalink
ENH: support plugin loading in conifg (opensearch-project#4974)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 authored Oct 2, 2024
1 parent 99da08a commit 7bac644
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 36 deletions.
2 changes: 1 addition & 1 deletion data-prepper-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ dependencies {
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation libs.parquet.common
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation libs.commons.lang3
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
testImplementation 'org.skyscreamer:jsonassert:1.5.3'
testImplementation libs.commons.io
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.opensearch.dataprepper.model.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotates a field that uses Data Prepper plugin config as its value.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface UsesDataPrepperPlugin {
/**
* The class type for this plugin.
*
* @return The Java class
* @since 1.2
*/
Class<?> pluginType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.dataprepper.model.annotations.DataPrepperPlugin.DEFAULT_ALTERNATE_NAME;
import static org.opensearch.dataprepper.model.annotations.DataPrepperPlugin.DEFAULT_DEPRECATED_NAME;
Expand Down Expand Up @@ -60,6 +61,20 @@ public <T> Optional<Class<? extends T>> findPluginClass(final Class<T> pluginTyp
return Optional.ofNullable((Class<? extends T>) supportedTypesMap.get(pluginType));
}

@Override
public <T> Set<Class<? extends T>> findPluginClasses(Class<T> pluginType) {
if (nameToSupportedTypeToPluginType == null) {
nameToSupportedTypeToPluginType = scanForPlugins();
}

return nameToSupportedTypeToPluginType.values().stream()
.flatMap(supportedTypeToPluginType ->
supportedTypeToPluginType.entrySet().stream()
.filter(entry -> pluginType.equals(entry.getKey()))
.flatMap(entry -> Stream.of((Class<? extends T>) entry.getValue())))
.collect(Collectors.toSet());
}

private Map<String, Map<Class<?>, Class<?>>> scanForPlugins() {
final Set<Class<?>> dataPrepperPluginClasses =
reflections.getTypesAnnotatedWith(DataPrepperPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugin;

import java.util.Collection;
import java.util.Optional;

/**
Expand All @@ -27,4 +28,15 @@ public interface PluginProvider {
* @since 1.2
*/
<T> Optional<Class<? extends T>> findPluginClass(Class<T> pluginType, String pluginName);

/**
* Finds the Java classes for a specific pluginType.
*
* @param pluginType The type of plugin which is being supported.
* e.g. {@link org.opensearch.dataprepper.model.sink.Sink}.
* @param <T> The type
* @return An {@link Collection} of Java classes for plugins
* @since 1.2
*/
<T> Collection<Class<? extends T>> findPluginClasses(Class<T> pluginType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.test.TestSink;
import org.opensearch.dataprepper.plugins.test.TestSource;
import org.reflections.Reflections;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.BDDMockito.given;
Expand Down Expand Up @@ -61,6 +65,22 @@ void findPlugin_should_scan_for_plugins() {
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugins_should_scan_for_plugins() {
final ClasspathPluginProvider objectUnderTest = createObjectUnderTest();

then(reflections).shouldHaveNoInteractions();

given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(Collections.emptySet());

objectUnderTest.findPluginClasses(Sink.class);

then(reflections)
.should()
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugin_should_scan_for_plugins_only_once() {
final ClasspathPluginProvider objectUnderTest = createObjectUnderTest();
Expand All @@ -76,6 +96,21 @@ void findPlugin_should_scan_for_plugins_only_once() {
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugins_should_scan_for_plugins_only_once() {
final ClasspathPluginProvider objectUnderTest = createObjectUnderTest();

given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(Collections.emptySet());

for (int i = 0; i < 10; i++)
objectUnderTest.findPluginClasses(Sink.class);

then(reflections)
.should()
.getTypesAnnotatedWith(DataPrepperPlugin.class);
}

@Test
void findPlugin_should_return_empty_if_no_plugins_found() {
given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
Expand Down Expand Up @@ -130,6 +165,17 @@ void findPlugin_should_return_plugin_if_found_for_alternate_name_and_type_using_
assertThat(optionalPlugin.get(), equalTo(TestSource.class));
}

@Test
void findPlugins_should_return_empty_if_no_plugins_found() {
given(reflections.getTypesAnnotatedWith(DataPrepperPlugin.class))
.willReturn(Collections.emptySet());

final Collection<Class<? extends PluginSetting>> foundPlugins = createObjectUnderTest().findPluginClasses(
PluginSetting.class);
assertThat(foundPlugins, notNullValue());
assertThat(foundPlugins.isEmpty(), is(true));
}

@Nested
class WithPredefinedPlugins {

Expand Down Expand Up @@ -161,5 +207,13 @@ void findPlugin_should_return_plugin_if_found_for_name_and_type_using_pluginType
assertThat(optionalPlugin.isPresent(), equalTo(true));
assertThat(optionalPlugin.get(), equalTo(TestSink.class));
}

@Test
void findPlugins_should_return_plugins_if_plugin_found_for_specified_type() {
final Set<Class<? extends Source>> foundPlugins = createObjectUnderTest().findPluginClasses(Source.class);
assertThat(foundPlugins, notNullValue());
assertThat(foundPlugins.size(), equalTo(1));
assertThat(foundPlugins.stream().iterator().next(), equalTo(TestSource.class));
}
}
}
1 change: 1 addition & 0 deletions data-prepper-plugin-schema-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ application {

dependencies {
implementation project(':data-prepper-plugins')
implementation project(':data-prepper-plugin-framework')
implementation project(':data-prepper-plugin-schema')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.reflections:reflections:0.10.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import com.github.victools.jsonschema.generator.SchemaVersion;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
import org.opensearch.dataprepper.plugin.ClasspathPluginProvider;
import org.opensearch.dataprepper.plugin.PluginProvider;
import org.opensearch.dataprepper.schemas.module.CustomJacksonModule;
import org.reflections.Reflections;
import org.reflections.scanners.Scanners;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
Expand Down Expand Up @@ -58,11 +56,9 @@ public void run() {
new JakartaValidationModule(JakartaValidationOption.NOT_NULLABLE_FIELD_IS_REQUIRED,
JakartaValidationOption.INCLUDE_PATTERN_EXPRESSIONS)
);
final Reflections reflections = new Reflections(new ConfigurationBuilder()
.setUrls(ClasspathHelper.forPackage(DEFAULT_PLUGINS_CLASSPATH))
.setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
final PluginProvider pluginProvider = new ClasspathPluginProvider();
final PluginConfigsJsonSchemaConverter pluginConfigsJsonSchemaConverter = new PluginConfigsJsonSchemaConverter(
reflections, new JsonSchemaConverter(modules), siteUrl, siteBaseUrl);
pluginProvider, new JsonSchemaConverter(modules, pluginProvider), siteUrl, siteBaseUrl);
final Class<?> pluginType = pluginConfigsJsonSchemaConverter.pluginTypeNameToPluginType(pluginTypeName);
final Map<String, String> pluginNameToJsonSchemaMap = pluginConfigsJsonSchemaConverter.convertPluginConfigsIntoJsonSchemas(
SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, pluginType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@
import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
import com.github.victools.jsonschema.generator.SchemaGeneratorConfigPart;
import com.github.victools.jsonschema.generator.SchemaGeneratorGeneralConfigPart;
import com.github.victools.jsonschema.generator.SchemaVersion;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.UsesDataPrepperPlugin;
import org.opensearch.dataprepper.plugin.PluginProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class JsonSchemaConverter {
private static final Logger LOG = LoggerFactory.getLogger(JsonSchemaConverter.class);
static final String DEPRECATED_SINCE_KEY = "deprecated";
private final List<Module> jsonSchemaGeneratorModules;
private final PluginProvider pluginProvider;

public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules) {
public JsonSchemaConverter(final List<Module> jsonSchemaGeneratorModules, final PluginProvider pluginProvider) {
this.jsonSchemaGeneratorModules = jsonSchemaGeneratorModules;
this.pluginProvider = pluginProvider;
}

public ObjectNode convertIntoJsonSchema(
Expand All @@ -30,7 +41,9 @@ public ObjectNode convertIntoJsonSchema(
loadJsonSchemaGeneratorModules(configBuilder);
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart = configBuilder.forFields();
overrideInstanceAttributeWithDeprecated(scopeSchemaGeneratorConfigPart);
overrideTargetTypeWithUsesDataPrepperPlugin(scopeSchemaGeneratorConfigPart);
resolveDefaultValueFromJsonProperty(scopeSchemaGeneratorConfigPart);
overrideDataPrepperPluginTypeAttribute(configBuilder.forTypesInGeneral(), schemaVersion, optionPreset);

final SchemaGeneratorConfig config = configBuilder.build();
final SchemaGenerator generator = new SchemaGenerator(config);
Expand All @@ -52,6 +65,37 @@ private void overrideInstanceAttributeWithDeprecated(
});
}

private void overrideTargetTypeWithUsesDataPrepperPlugin(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withTargetTypeOverridesResolver(field -> Optional
.ofNullable(field.getAnnotationConsideringFieldAndGetterIfSupported(UsesDataPrepperPlugin.class))
.map(usesDataPrepperPlugin ->
pluginProvider.findPluginClasses(usesDataPrepperPlugin.pluginType()).stream())
.map(stream -> stream.map(specificSubtype -> field.getContext().resolve(specificSubtype)))
.map(stream -> stream.collect(Collectors.toList()))
.orElse(null));
}

private void overrideDataPrepperPluginTypeAttribute(
final SchemaGeneratorGeneralConfigPart schemaGeneratorGeneralConfigPart,
final SchemaVersion schemaVersion, final OptionPreset optionPreset) {
schemaGeneratorGeneralConfigPart.withTypeAttributeOverride((node, scope, context) -> {
final DataPrepperPlugin dataPrepperPlugin = scope.getType().getErasedType()
.getAnnotation(DataPrepperPlugin.class);
if (dataPrepperPlugin != null) {
final ObjectNode propertiesNode = node.putObject("properties");
try {
final ObjectNode schemaNode = this.convertIntoJsonSchema(
schemaVersion, optionPreset, dataPrepperPlugin.pluginConfigurationType());
propertiesNode.set(dataPrepperPlugin.name(), schemaNode);
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", dataPrepperPlugin.name(), e);
throw new RuntimeException(e);
}
}
});
}

private void resolveDefaultValueFromJsonProperty(
final SchemaGeneratorConfigPart<FieldScope> scopeSchemaGeneratorConfigPart) {
scopeSchemaGeneratorConfigPart.withDefaultResolver(field -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.opensearch.dataprepper.schemas;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.victools.jsonschema.generator.OptionPreset;
import com.github.victools.jsonschema.generator.SchemaVersion;
Expand All @@ -10,7 +9,7 @@
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.source.Source;
import org.reflections.Reflections;
import org.opensearch.dataprepper.plugin.PluginProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,15 +50,15 @@ public class PluginConfigsJsonSchemaConverter {

private final String siteUrl;
private final String siteBaseUrl;
private final Reflections reflections;
private final PluginProvider pluginProvider;
private final JsonSchemaConverter jsonSchemaConverter;

public PluginConfigsJsonSchemaConverter(
final Reflections reflections,
final PluginProvider pluginProvider,
final JsonSchemaConverter jsonSchemaConverter,
final String siteUrl,
final String siteBaseUrl) {
this.reflections = reflections;
this.pluginProvider = pluginProvider;
this.jsonSchemaConverter = jsonSchemaConverter;
this.siteUrl = siteUrl == null ? SITE_URL_PLACEHOLDER : siteUrl;
this.siteBaseUrl = siteBaseUrl == null ? SITE_BASE_URL_PLACEHOLDER : siteBaseUrl;
Expand Down Expand Up @@ -90,8 +89,8 @@ public Map<String, String> convertPluginConfigsIntoJsonSchemas(
addPluginName(jsonSchemaNode, pluginName);
addDocumentationLink(jsonSchemaNode, pluginName, pluginType);
value = jsonSchemaNode.toPrettyString();
} catch (JsonProcessingException e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName);
} catch (final Exception e) {
LOG.error("Encountered error retrieving JSON schema for {}", pluginName, e);
return Stream.empty();
}
return Stream.of(Map.entry(entry.getKey(), value));
Expand All @@ -107,7 +106,7 @@ private Map<String, Class<?>> scanForPluginConfigs(final Class<?> pluginType) {
if (ConditionalRoute.class.equals(pluginType)) {
return Map.of(CONDITIONAL_ROUTE_PROCESSOR_NAME, ConditionalRoute.class);
}
return reflections.getTypesAnnotatedWith(DataPrepperPlugin.class).stream()
return pluginProvider.findPluginClasses(pluginType).stream()
.map(clazz -> clazz.getAnnotation(DataPrepperPlugin.class))
.filter(dataPrepperPlugin -> pluginType.equals(dataPrepperPlugin.pluginType()))
.collect(Collectors.toMap(
Expand Down
Loading

0 comments on commit 7bac644

Please sign in to comment.