diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java index 12b092ac36..7eea063bd0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java @@ -27,12 +27,13 @@ * @since 1.2 */ public class PipelinesDataFlowModel { + public static final String EXTENSION_PLUGIN_TYPE = "extension"; @JsonInclude(JsonInclude.Include.NON_NULL) private DataPrepperVersion version; @JsonAlias("pipeline_configurations") - @JsonProperty("extension") + @JsonProperty(EXTENSION_PLUGIN_TYPE) @JsonInclude(JsonInclude.Include.NON_NULL) @JsonSetter(nulls = Nulls.SKIP) private PipelineExtensions pipelineExtensions; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index ea0654563a..89ab07d11d 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.TestObjectPlugin; import org.opensearch.dataprepper.plugins.test.TestPlugin; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.HashMap; @@ -61,6 +62,7 @@ private DefaultPluginFactory createObjectUnderTest() { coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName()); coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); coreContext.register(PluginBeanFactoryProvider.class); + coreContext.registerBean(PluginErrorCollector.class, PluginErrorCollector::new); coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration); coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel); coreContext.refresh(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java index 176e11c4b2..1c4dd2f967 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java @@ -31,6 +31,7 @@ import org.opensearch.dataprepper.plugin.TestPluggableInterface; import org.opensearch.dataprepper.plugins.test.TestExtension; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.ArrayList; @@ -68,6 +69,7 @@ public class ExtensionsIT { private AnnotationConfigApplicationContext publicContext; private AnnotationConfigApplicationContext coreContext; private PluginFactory pluginFactory; + private PluginErrorCollector pluginErrorCollector; private String pluginName; private String pipelineName; @@ -75,6 +77,7 @@ public class ExtensionsIT { void setUp() { pluginName = "test_plugin_using_extension"; pipelineName = UUID.randomUUID().toString(); + pluginErrorCollector = new PluginErrorCollector(); publicContext = new AnnotationConfigApplicationContext(); publicContext.refresh(); @@ -104,6 +107,7 @@ void setUp() { coreContext.registerBean(ObjectMapperConfiguration.class, ObjectMapperConfiguration::new); coreContext.registerBean(ObjectMapper.class, () -> new ObjectMapper(new YAMLFactory())); coreContext.register(PipelineParserConfiguration.class); + coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector); coreContext.refresh(); pluginFactory = coreContext.getBean(DefaultPluginFactory.class); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index 0f96717c6c..a01103c03a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -8,9 +8,11 @@ import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.sink.Sink; @@ -30,10 +32,13 @@ import org.opensearch.dataprepper.pipeline.router.Router; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; +import org.opensearch.dataprepper.validation.PluginError; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -59,6 +64,7 @@ public class PipelineTransformer { private final EventFactory eventFactory; private final AcknowledgementSetManager acknowledgementSetManager; private final SourceCoordinatorFactory sourceCoordinatorFactory; + private final PluginErrorCollector pluginErrorCollector; public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, final PluginFactory pluginFactory, @@ -68,7 +74,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, final CircuitBreakerManager circuitBreakerManager, final EventFactory eventFactory, final AcknowledgementSetManager acknowledgementSetManager, - final SourceCoordinatorFactory sourceCoordinatorFactory) { + final SourceCoordinatorFactory sourceCoordinatorFactory, + final PluginErrorCollector pluginErrorCollector) { this.pipelinesDataFlowModel = pipelinesDataFlowModel; this.pluginFactory = Objects.requireNonNull(pluginFactory); this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider); @@ -78,6 +85,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, this.eventFactory = eventFactory; this.acknowledgementSetManager = acknowledgementSetManager; this.sourceCoordinatorFactory = sourceCoordinatorFactory; + this.pluginErrorCollector = pluginErrorCollector; } public Map transformConfiguration() { @@ -112,11 +120,34 @@ private void buildPipelineFromConfiguration( final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting(); final Optional pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting, pipelineMap, pipelineConfigurationMap); - final Source source = pipelineSource.orElseGet(() -> - pluginFactory.loadPlugin(Source.class, sourceSetting)); + final Source source = pipelineSource.orElseGet(() -> { + try { + return pluginFactory.loadPlugin(Source.class, sourceSetting); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName(sourceSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } + }); LOG.info("Building buffer for the pipeline [{}]", pipelineName); - final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder()); + Buffer pipelineDefinedBuffer = null; + final PluginSetting bufferPluginSetting = pipelineConfiguration.getBufferPluginSetting(); + try { + pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, bufferPluginSetting, source.getDecoder()); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.BUFFER_PLUGIN_TYPE) + .pipelineName(pipelineName) + .pluginName(bufferPluginSetting.getName()) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + } LOG.info("Building processors for the pipeline [{}]", pipelineName); final int processorThreads = pipelineConfiguration.getWorkers(); @@ -125,6 +156,20 @@ private void buildPipelineFromConfiguration( .map(this::newProcessor) .collect(Collectors.toList()); + LOG.info("Building sinks for the pipeline [{}]", pipelineName); + final List> sinks = pipelineConfiguration.getSinkPluginSettings().stream() + .map(this::buildRoutedSinkOrConnector) + .collect(Collectors.toList()); + + final List subPipelinePluginErrors = pluginErrorCollector.getPluginErrors() + .stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName())) + .collect(Collectors.toList()); + if (!subPipelinePluginErrors.isEmpty()) { + throw new InvalidPluginConfigurationException( + String.format("One or more plugins are not configured correctly in the pipeline: %s.\n", + pipelineName) + pluginErrorCollector.getConsolidatedErrorMessage()); + } + final List> decoratedProcessorSets = processorSets.stream() .map(processorComponentList -> { final List processors = processorComponentList.stream().map(IdentifiedComponent::getComponent).collect(Collectors.toList()); @@ -138,11 +183,6 @@ private void buildPipelineFromConfiguration( final int readBatchDelay = pipelineConfiguration.getReadBatchDelay(); - LOG.info("Building sinks for the pipeline [{}]", pipelineName); - final List> sinks = pipelineConfiguration.getSinkPluginSettings().stream() - .map(this::buildRoutedSinkOrConnector) - .collect(Collectors.toList()); - final List secondaryBuffers = getSecondaryBuffers(); LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName); final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers); @@ -167,16 +207,27 @@ private void buildPipelineFromConfiguration( } private List> newProcessor(final PluginSetting pluginSetting) { - final List processors = pluginFactory.loadPlugins( - Processor.class, - pluginSetting, - actualClass -> actualClass.isAnnotationPresent(SingleThread.class) ? - pluginSetting.getNumberOfProcessWorkers() : - 1); - - return processors.stream() - .map(processor -> new IdentifiedComponent<>(processor, pluginSetting.getName())) - .collect(Collectors.toList()); + try { + final List processors = pluginFactory.loadPlugins( + Processor.class, + pluginSetting, + actualClass -> actualClass.isAnnotationPresent(SingleThread.class) ? + pluginSetting.getNumberOfProcessWorkers() : + 1); + + return processors.stream() + .map(processor -> new IdentifiedComponent<>(processor, pluginSetting.getName())) + .collect(Collectors.toList()); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.PROCESSOR_PLUGIN_TYPE) + .pipelineName(pluginSetting.getPipelineName()) + .pluginName(pluginSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return Collections.emptyList(); + } } private Optional getSourceIfPipelineType( @@ -213,9 +264,20 @@ private Optional getSourceIfPipelineType( } private DataFlowComponent buildRoutedSinkOrConnector(final SinkContextPluginSetting pluginSetting) { - final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext()); - - return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes()); + try { + final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext()); + + return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes()); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SINK_PLUGIN_TYPE) + .pipelineName(pluginSetting.getPipelineName()) + .pluginName(pluginSetting.getName()) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } } private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkContext sinkContext) { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index f4f586c5b6..3fbeedf5af 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -21,6 +21,7 @@ import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineConfigurationTransformer; import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -44,8 +45,9 @@ public PipelineTransformer pipelineParser( final CircuitBreakerManager circuitBreakerManager, final EventFactory eventFactory, final AcknowledgementSetManager acknowledgementSetManager, - final SourceCoordinatorFactory sourceCoordinatorFactory - ) { + final SourceCoordinatorFactory sourceCoordinatorFactory, + final PluginErrorCollector pluginErrorCollector + ) { return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, @@ -54,7 +56,8 @@ public PipelineTransformer pipelineParser( circuitBreakerManager, eventFactory, acknowledgementSetManager, - sourceCoordinatorFactory); + sourceCoordinatorFactory, + pluginErrorCollector); } @Bean diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java index b415b7cf6b..aa72bbf207 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java @@ -16,7 +16,8 @@ public class TestDataProvider { public static final String VALID_OFF_HEAP_FILE_WITH_ACKS = "src/test/resources/multiple_pipeline_valid_off_heap_buffer_with_acks.yml"; public static final String DISCONNECTED_VALID_OFF_HEAP_FILE_WITH_ACKS = "src/test/resources/multiple_disconnected_pipeline_valid_off_heap_buffer_with_acks.yml"; public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml"; - public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml"; + public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT_DUE_TO_SINK = "src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_sink.yml"; + public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT_DUE_TO_PROCESSOR = "src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_processor.yml"; public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml"; public static final String INCORRECT_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/incorrect_source_multiple_pipeline_configuration.yml"; public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml"; diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index 07817f8ee0..112bb8e93f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -13,6 +13,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.TestDataProvider; @@ -36,6 +37,8 @@ import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.plugin.DefaultPluginFactory; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; +import org.opensearch.dataprepper.validation.PluginError; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.time.Duration; @@ -82,6 +85,8 @@ class PipelineTransformerTests { @Mock private CircuitBreakerManager circuitBreakerManager; + private PluginErrorCollector pluginErrorCollector; + private EventFactory eventFactory; private DefaultAcknowledgementSetManager acknowledgementSetManager; @@ -93,6 +98,7 @@ void setUp() { peerForwarderProvider = mock(PeerForwarderProvider.class); eventFactory = mock(EventFactory.class); acknowledgementSetManager = mock(DefaultAcknowledgementSetManager.class); + pluginErrorCollector = new PluginErrorCollector(); final AnnotationConfigApplicationContext publicContext = new AnnotationConfigApplicationContext(); publicContext.refresh(); @@ -104,6 +110,7 @@ void setUp() { coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName()); coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); + coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector); coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration); coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel); coreContext.refresh(); @@ -121,7 +128,7 @@ private PipelineTransformer createObjectUnderTest(final String pipelineConfigura new PipelineConfigurationFileReader(pipelineConfigurationFileLocation)).parseConfiguration(); return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, - acknowledgementSetManager, sourceCoordinatorFactory); + acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector); } @Test @@ -193,17 +200,31 @@ void parseConfiguration_with_invalid_root_pipeline_creates_empty_pipelinesMap() final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); verify(dataPrepperConfiguration).getPipelineExtensions(); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); + final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); + assertThat(pluginError.getPipelineName(), equalTo("test-pipeline-1")); + assertThat(pluginError.getPluginName(), equalTo("file")); + assertThat(pluginError.getException(), notNullValue()); } - @Test - void parseConfiguration_with_incorrect_child_pipeline_returns_empty_pipelinesMap() { + @ParameterizedTest + @ValueSource(strings = { + TestDataProvider.CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT_DUE_TO_SINK, + TestDataProvider.CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT_DUE_TO_PROCESSOR + }) + void parseConfiguration_with_incorrect_child_pipeline_returns_empty_pipelinesMap( + final String pipelineConfigurationFileLocation) { mockDataPrepperConfigurationAccesses(); - final PipelineTransformer pipelineTransformer = - createObjectUnderTest(TestDataProvider.CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT); + final PipelineTransformer pipelineTransformer = createObjectUnderTest(pipelineConfigurationFileLocation); final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); verifyDataPrepperConfigurationAccesses(); verify(dataPrepperConfiguration).getPipelineExtensions(); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); + final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); + assertThat(pluginError.getPipelineName(), equalTo("test-pipeline-2")); + assertThat(pluginError.getPluginName(), notNullValue()); + assertThat(pluginError.getException(), notNullValue()); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java index 3037b6a68d..9b027c6c75 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -55,11 +56,15 @@ class PipelineParserConfigurationTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private PluginErrorCollector pluginErrorCollector; + @Test void pipelineParser() { final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser( pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, - dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, sourceCoordinatorFactory); + dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, + sourceCoordinatorFactory, pluginErrorCollector); assertThat(pipelineTransformer, is(notNullValue())); } diff --git a/data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_processor.yml b/data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_processor.yml new file mode 100644 index 0000000000..95a821042f --- /dev/null +++ b/data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_processor.yml @@ -0,0 +1,26 @@ +# this configuration file is solely for testing formatting +test-pipeline-1: + source: + file: + path: "/tmp/file-source.tmp" + buffer: + bounded_blocking: + sink: + - pipeline: + name: "test-pipeline-2" +test-pipeline-2: + source: + pipeline: + name: "test-pipeline-1" + processor: + - invalid_processor: # this will fail plugin creation + sink: + - pipeline: + name: "test-pipeline-3" +test-pipeline-3: + source: + pipeline: + name: "test-pipeline-2" + sink: + - file: + path: "/tmp/todelete.txt" \ No newline at end of file diff --git a/data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline.yml b/data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_sink.yml similarity index 100% rename from data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline.yml rename to data-prepper-core/src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_sink.yml diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java new file mode 100644 index 0000000000..50a130c4f4 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java @@ -0,0 +1,47 @@ +package org.opensearch.dataprepper.validation; + +import lombok.Builder; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@Builder +public class PluginError { + static final String PIPELINE_DELIMITER = ":"; + static final String CAUSED_BY_DELIMITER = " caused by: "; + private final String pipelineName; + private final String componentType; + @NonNull + private final String pluginName; + @NonNull + private final Exception exception; + + public String getErrorMessage() { + final StringBuilder message = new StringBuilder(); + if (pipelineName != null) { + message.append(pipelineName); + message.append(PIPELINE_DELIMITER); + } + if (componentType != null) { + message.append(componentType); + message.append(PIPELINE_DELIMITER); + } + message.append(pluginName); + message.append(PIPELINE_DELIMITER); + message.append(getFlattenedExceptionMessage(CAUSED_BY_DELIMITER)); + return message.toString(); + } + + private String getFlattenedExceptionMessage(final String delimiter) { + final StringBuilder message = new StringBuilder(); + Throwable throwable = exception; + + while (throwable != null) { + message.append(delimiter); + message.append(throwable.getMessage()); + throwable = throwable.getCause(); + } + + return message.toString(); + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java new file mode 100644 index 0000000000..0be5539019 --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java @@ -0,0 +1,31 @@ +package org.opensearch.dataprepper.validation; + +import lombok.Getter; + +import javax.inject.Named; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Named +@Getter +public class PluginErrorCollector { + private final List pluginErrors = new ArrayList<>(); + + public void collectPluginError(final PluginError pluginError) { + pluginErrors.add(pluginError); + } + + public List getAllErrorMessages() { + return pluginErrors.stream().map(PluginError::getErrorMessage).collect(Collectors.toList()); + } + + public String getConsolidatedErrorMessage() { + final List allErrorMessages = getAllErrorMessages(); + + return IntStream.range(0, allErrorMessages.size()) + .mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i)) + .collect(Collectors.joining("\n")); + } +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/DataPrepperValidateTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/DataPrepperValidateTest.java new file mode 100644 index 0000000000..447fb21838 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/DataPrepperValidateTest.java @@ -0,0 +1,3 @@ +package org.opensearch.dataprepper.validation; +class DataPrepperValidateTest { +} \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java new file mode 100644 index 0000000000..324a202fb8 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java @@ -0,0 +1,30 @@ +package org.opensearch.dataprepper.validation; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PluginErrorCollectorTest { + + @Test + void testWithPluginErrors() { + final PluginErrorCollector objectUnderTest = new PluginErrorCollector(); + final String testErrorMessage1 = "test error message 1"; + final String testErrorMessage2 = "test error message 2"; + final PluginError pluginError1 = mock(PluginError.class); + when(pluginError1.getErrorMessage()).thenReturn(testErrorMessage1); + final PluginError pluginError2 = mock(PluginError.class); + when(pluginError2.getErrorMessage()).thenReturn(testErrorMessage2); + objectUnderTest.collectPluginError(pluginError1); + objectUnderTest.collectPluginError(pluginError2); + assertThat(objectUnderTest.getPluginErrors().size(), equalTo(2)); + assertThat(objectUnderTest.getAllErrorMessages().size(), equalTo(2)); + assertThat(objectUnderTest.getAllErrorMessages(), contains(testErrorMessage1, testErrorMessage2)); + assertThat(objectUnderTest.getConsolidatedErrorMessage(), equalTo( + String.format("1. %s\n2. %s", testErrorMessage1, testErrorMessage2))); + } +} \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java new file mode 100644 index 0000000000..c02dde376c --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java @@ -0,0 +1,58 @@ +package org.opensearch.dataprepper.validation; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PluginErrorTest { + private static final String TEST_PIPELINE_NAME = "test-pipeline"; + private static final String TEST_COMPONENT_TYPE = "test-plugin-type"; + private static final String TEST_PLUGIN_NAME = "test-plugin"; + private static final String TEST_ERROR_MESSAGE = "test error message"; + private static final String TEST_CAUSE_ERROR_MESSAGE = "test cause error message"; + + @Test + void testGetErrorMessageWithPipelineName() { + final Exception exception = mock(Exception.class); + when(exception.getMessage()).thenReturn(TEST_ERROR_MESSAGE); + final PluginError pluginError = PluginError.builder() + .pipelineName(TEST_PIPELINE_NAME) + .componentType(TEST_COMPONENT_TYPE) + .pluginName(TEST_PLUGIN_NAME) + .exception(exception) + .build(); + assertThat(pluginError.getErrorMessage(), equalTo( + "test-pipeline:test-plugin-type:test-plugin: caused by: test error message")); + } + + @Test + void testGetErrorMessageWithoutPipelineName() { + final Exception exception = mock(Exception.class); + when(exception.getMessage()).thenReturn(TEST_ERROR_MESSAGE); + final PluginError pluginError = PluginError.builder() + .pluginName(TEST_PLUGIN_NAME) + .componentType(TEST_COMPONENT_TYPE) + .exception(exception) + .build(); + assertThat(pluginError.getErrorMessage(), equalTo( + "test-plugin-type:test-plugin: caused by: test error message")); + } + + @Test + void testGetErrorMessageWithCause() { + final Exception exception = mock(Exception.class); + final Exception cause = mock(Exception.class); + when(exception.getMessage()).thenReturn(TEST_ERROR_MESSAGE); + when(cause.getMessage()).thenReturn(TEST_CAUSE_ERROR_MESSAGE); + when(exception.getCause()).thenReturn(cause); + final PluginError pluginError = PluginError.builder() + .pluginName(TEST_PLUGIN_NAME) + .exception(exception) + .build(); + assertThat(pluginError.getErrorMessage(), equalTo( + "test-plugin: caused by: test error message caused by: test cause error message")); + } +} \ No newline at end of file diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java index 16706448e9..f527d835a4 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java @@ -6,8 +6,12 @@ package org.opensearch.dataprepper.plugin; import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; +import org.opensearch.dataprepper.validation.PluginError; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import javax.inject.Inject; import javax.inject.Named; @@ -21,26 +25,47 @@ public class ExtensionLoader { private final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter; private final ExtensionClassProvider extensionClassProvider; private final PluginCreator extensionPluginCreator; + private final PluginErrorCollector pluginErrorCollector; @Inject ExtensionLoader( final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter, final ExtensionClassProvider extensionClassProvider, - @Named("extensionPluginCreator") final PluginCreator extensionPluginCreator) { + @Named("extensionPluginCreator") final PluginCreator extensionPluginCreator, + final PluginErrorCollector pluginErrorCollector) { this.extensionPluginConfigurationConverter = extensionPluginConfigurationConverter; this.extensionClassProvider = extensionClassProvider; this.extensionPluginCreator = extensionPluginCreator; + this.pluginErrorCollector = pluginErrorCollector; } - List loadExtensions() { - return extensionClassProvider.loadExtensionPluginClasses() + public List loadExtensions() { + final List result = extensionClassProvider.loadExtensionPluginClasses() .stream() .map(extensionClass -> { - final PluginArgumentsContext pluginArgumentsContext = getConstructionContext(extensionClass); - return extensionPluginCreator.newPluginInstance( - extensionClass, pluginArgumentsContext, convertClassToName(extensionClass)); + final String pluginName = convertClassToName(extensionClass); + try { + final PluginArgumentsContext pluginArgumentsContext = getConstructionContext(extensionClass); + return extensionPluginCreator.newPluginInstance( + extensionClass, pluginArgumentsContext, pluginName); + } catch (Exception e) { + final PluginError pluginError = PluginError.builder() + .componentType(PipelinesDataFlowModel.EXTENSION_PLUGIN_TYPE) + .pluginName(pluginName) + .exception(e) + .build(); + pluginErrorCollector.collectPluginError(pluginError); + return null; + } }) .collect(Collectors.toList()); + if (!pluginErrorCollector.getPluginErrors().isEmpty()) { + throw new InvalidPluginConfigurationException( + "One or more extension plugins are not configured correctly.\n" + + pluginErrorCollector.getConsolidatedErrorMessage()); + } else { + return result; + } } private PluginArgumentsContext getConstructionContext(final Class extensionPluginClass) { diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java index 176f3e0702..dc47a4f698 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugin; import org.apache.commons.lang3.stream.Streams; +import org.hamcrest.CoreMatchers; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -17,9 +19,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; import org.opensearch.dataprepper.plugins.test.TestExtensionConfig; import org.opensearch.dataprepper.plugins.test.TestExtensionWithConfig; +import org.opensearch.dataprepper.validation.PluginError; +import org.opensearch.dataprepper.validation.PluginErrorCollector; import java.util.ArrayList; import java.util.Collection; @@ -31,7 +36,9 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -52,9 +59,16 @@ class ExtensionLoaderTest { private PluginCreator extensionPluginCreator; @Captor private ArgumentCaptor pluginArgumentsContextArgumentCaptor; + private PluginErrorCollector pluginErrorCollector; private ExtensionLoader createObjectUnderTest() { - return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider, extensionPluginCreator); + return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider, + extensionPluginCreator, pluginErrorCollector); + } + + @BeforeEach + void setUp() { + pluginErrorCollector = new PluginErrorCollector(); } @Test @@ -65,6 +79,7 @@ void loadExtensions_returns_empty_list_when_no_plugin_classes() { assertThat(extensionPlugins, notNullValue()); assertThat(extensionPlugins.size(), equalTo(0)); + assertThat(pluginErrorCollector.getPluginErrors().isEmpty(), is(true)); } @Test @@ -85,6 +100,27 @@ void loadExtensions_returns_single_extension_for_single_plugin_class() { assertThat(extensionPlugins, notNullValue()); assertThat(extensionPlugins.size(), equalTo(1)); assertThat(extensionPlugins.get(0), equalTo(expectedPlugin)); + assertThat(pluginErrorCollector.getPluginErrors().isEmpty(), is(true)); + } + + @Test + void loadExtensions_throws_InvalidPluginConfigurationException_when_invoking_newPluginInstance_throws_exception() { + final Class pluginClass = (Class) mock(ExtensionPlugin.class).getClass(); + + when(extensionClassProvider.loadExtensionPluginClasses()).thenReturn(Collections.singleton(pluginClass)); + + when(extensionPluginCreator.newPluginInstance( + eq(pluginClass), + any(PluginArgumentsContext.class), + startsWith("extension_plugin"))) + .thenThrow(TestException.class); + + assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest().loadExtensions()); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); + final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); + assertThat(pluginError.getPipelineName(), nullValue()); + assertThat(pluginError.getPluginName(), CoreMatchers.startsWith("extension_plugin")); + assertThat(pluginError.getException(), instanceOf(TestException.class)); } @ParameterizedTest @@ -113,6 +149,24 @@ void loadExtensions_returns_single_extension_with_config_for_single_plugin_class assertThat(extensionPlugins, notNullValue()); assertThat(extensionPlugins.size(), equalTo(1)); assertThat(extensionPlugins.get(0), equalTo(expectedPlugin)); + assertThat(pluginErrorCollector.getPluginErrors().isEmpty(), is(true)); + } + + @Test + void loadExtensions_throws_InvalidPluginConfigurationException_when_extensionPluginConfigurationConverter_throws_exception() { + when(extensionClassProvider.loadExtensionPluginClasses()) + .thenReturn(Collections.singleton(TestExtensionWithConfig.class)); + when(extensionClassProvider.loadExtensionPluginClasses()).thenReturn( + Collections.singleton(TestExtensionWithConfig.class)); + when(extensionPluginConfigurationConverter.convert(eq(true), eq(TestExtensionConfig.class), + eq("/test_extension"))).thenThrow(TestException.class); + + assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest().loadExtensions()); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); + final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); + assertThat(pluginError.getPipelineName(), nullValue()); + assertThat(pluginError.getPluginName(), CoreMatchers.startsWith("test_extension_with_config")); + assertThat(pluginError.getException(), instanceOf(TestException.class)); } @Test @@ -148,6 +202,7 @@ void loadExtensions_returns_multiple_extensions_for_multiple_plugin_classes() { for (ExtensionPlugin expectedPlugin : actualPlugins) { assertThat(actualPlugins, hasItem(expectedPlugin)); } + assertThat(pluginErrorCollector.getPluginErrors().isEmpty(), is(true)); } @Test @@ -175,6 +230,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_not_su final Class[] inputClasses = {String.class}; assertThrows(InvalidPluginDefinitionException.class, () -> actualPluginArgumentsContext.createArguments(inputClasses)); + assertThat(pluginErrorCollector.getPluginErrors().isEmpty(), is(true)); } @Test @@ -203,6 +259,7 @@ void loadExtensions_invokes_newPluginInstance_with_PluginArgumentsContext_which_ final Object[] arguments = actualPluginArgumentsContext.createArguments(new Class[]{}); assertThat(arguments, notNullValue()); assertThat(arguments.length, equalTo(0)); + assertThat(pluginErrorCollector.getPluginErrors().isEmpty(), is(true)); } @ParameterizedTest @@ -230,4 +287,8 @@ private interface TestExtension2 extends ExtensionPlugin { } private interface TestExtension3 extends ExtensionPlugin { } + + private static class TestException extends RuntimeException { + + } }