diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java index 89ab07d11d..4c52c614d4 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugin/DefaultPluginFactoryIT.java @@ -17,7 +17,9 @@ import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.TestObjectPlugin; import org.opensearch.dataprepper.plugins.test.TestPlugin; +import org.opensearch.dataprepper.validation.LoggingPluginErrorsHandler; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.HashMap; @@ -63,6 +65,7 @@ private DefaultPluginFactory createObjectUnderTest() { coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); coreContext.register(PluginBeanFactoryProvider.class); coreContext.registerBean(PluginErrorCollector.class, PluginErrorCollector::new); + coreContext.registerBean(PluginErrorsHandler.class, LoggingPluginErrorsHandler::new); coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration); coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel); coreContext.refresh(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java index 1c4dd2f967..53dee72e55 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/ExtensionsIT.java @@ -31,7 +31,9 @@ import org.opensearch.dataprepper.plugin.TestPluggableInterface; import org.opensearch.dataprepper.plugins.test.TestExtension; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; +import org.opensearch.dataprepper.validation.LoggingPluginErrorsHandler; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.ArrayList; @@ -70,6 +72,7 @@ public class ExtensionsIT { private AnnotationConfigApplicationContext coreContext; private PluginFactory pluginFactory; private PluginErrorCollector pluginErrorCollector; + private PluginErrorsHandler pluginErrorsHandler; private String pluginName; private String pipelineName; @@ -78,6 +81,7 @@ void setUp() { pluginName = "test_plugin_using_extension"; pipelineName = UUID.randomUUID().toString(); pluginErrorCollector = new PluginErrorCollector(); + pluginErrorsHandler = new LoggingPluginErrorsHandler(); publicContext = new AnnotationConfigApplicationContext(); publicContext.refresh(); @@ -108,6 +112,7 @@ void setUp() { coreContext.registerBean(ObjectMapper.class, () -> new ObjectMapper(new YAMLFactory())); coreContext.register(PipelineParserConfiguration.class); coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector); + coreContext.registerBean(PluginErrorsHandler.class, () -> pluginErrorsHandler); coreContext.refresh(); pluginFactory = coreContext.getBean(DefaultPluginFactory.class); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index a01103c03a..b3f4aede00 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -34,6 +34,7 @@ import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.validation.PluginError; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,7 @@ public class PipelineTransformer { private final AcknowledgementSetManager acknowledgementSetManager; private final SourceCoordinatorFactory sourceCoordinatorFactory; private final PluginErrorCollector pluginErrorCollector; + private final PluginErrorsHandler pluginErrorsHandler; public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, final PluginFactory pluginFactory, @@ -75,7 +77,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, final EventFactory eventFactory, final AcknowledgementSetManager acknowledgementSetManager, final SourceCoordinatorFactory sourceCoordinatorFactory, - final PluginErrorCollector pluginErrorCollector) { + final PluginErrorCollector pluginErrorCollector, + final PluginErrorsHandler pluginErrorsHandler) { this.pipelinesDataFlowModel = pipelinesDataFlowModel; this.pluginFactory = Objects.requireNonNull(pluginFactory); this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider); @@ -86,6 +89,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel, this.acknowledgementSetManager = acknowledgementSetManager; this.sourceCoordinatorFactory = sourceCoordinatorFactory; this.pluginErrorCollector = pluginErrorCollector; + this.pluginErrorsHandler = pluginErrorsHandler; } public Map transformConfiguration() { @@ -165,9 +169,10 @@ private void buildPipelineFromConfiguration( .stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName())) .collect(Collectors.toList()); if (!subPipelinePluginErrors.isEmpty()) { + pluginErrorsHandler.handleErrors(subPipelinePluginErrors); throw new InvalidPluginConfigurationException( String.format("One or more plugins are not configured correctly in the pipeline: %s.\n", - pipelineName) + pluginErrorCollector.getConsolidatedErrorMessage()); + pipelineName)); } final List> decoratedProcessorSets = processorSets.stream() diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java index 3fbeedf5af..440e618f3b 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/config/PipelineParserConfiguration.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.pipeline.router.RouterFactory; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -46,7 +47,8 @@ public PipelineTransformer pipelineParser( final EventFactory eventFactory, final AcknowledgementSetManager acknowledgementSetManager, final SourceCoordinatorFactory sourceCoordinatorFactory, - final PluginErrorCollector pluginErrorCollector + final PluginErrorCollector pluginErrorCollector, + final PluginErrorsHandler pluginErrorsHandler ) { return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, @@ -57,7 +59,8 @@ public PipelineTransformer pipelineParser( eventFactory, acknowledgementSetManager, sourceCoordinatorFactory, - pluginErrorCollector); + pluginErrorCollector, + pluginErrorsHandler); } @Bean diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java index 112bb8e93f..13b30965a6 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelineTransformerTests.java @@ -14,6 +14,8 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.TestDataProvider; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.core.event.EventFactoryApplicationContextMarker; import org.opensearch.dataprepper.model.breaker.CircuitBreaker; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventFactory; @@ -39,14 +42,18 @@ import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.validation.PluginError; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.stream.Stream; @@ -84,6 +91,10 @@ class PipelineTransformerTests { private SourceCoordinatorFactory sourceCoordinatorFactory; @Mock private CircuitBreakerManager circuitBreakerManager; + @Mock + private PluginErrorsHandler pluginErrorsHandler; + @Captor + private ArgumentCaptor> pluginErrorsArgumentCaptor; private PluginErrorCollector pluginErrorCollector; @@ -111,6 +122,7 @@ void setUp() { coreContext.scan(DefaultPluginFactory.class.getPackage().getName()); coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector); + coreContext.registerBean(PluginErrorsHandler.class, () -> pluginErrorsHandler); coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration); coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel); coreContext.refresh(); @@ -126,9 +138,11 @@ private PipelineTransformer createObjectUnderTest(final String pipelineConfigura final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataflowModelParser( new PipelineConfigurationFileReader(pipelineConfigurationFileLocation)).parseConfiguration(); - return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, - routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, - acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector); + return new PipelineTransformer( + pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, + routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, + acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector, + pluginErrorsHandler); } @Test @@ -214,17 +228,29 @@ void parseConfiguration_with_invalid_root_pipeline_creates_empty_pipelinesMap() }) void parseConfiguration_with_incorrect_child_pipeline_returns_empty_pipelinesMap( final String pipelineConfigurationFileLocation) { + pluginErrorCollector.collectPluginError( + PluginError.builder() + .componentType("non-pipeline-plugin") + .pluginName("preexisting-plugin") + .exception(new RuntimeException()) + .build()); mockDataPrepperConfigurationAccesses(); final PipelineTransformer pipelineTransformer = createObjectUnderTest(pipelineConfigurationFileLocation); final Map connectedPipelines = pipelineTransformer.transformConfiguration(); assertThat(connectedPipelines.size(), equalTo(0)); verifyDataPrepperConfigurationAccesses(); verify(dataPrepperConfiguration).getPipelineExtensions(); - assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); - final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(2)); + final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(1); assertThat(pluginError.getPipelineName(), equalTo("test-pipeline-2")); assertThat(pluginError.getPluginName(), notNullValue()); assertThat(pluginError.getException(), notNullValue()); + verify(pluginErrorsHandler).handleErrors(pluginErrorsArgumentCaptor.capture()); + final Collection pluginErrorCollection = pluginErrorsArgumentCaptor.getValue(); + assertThat(pluginErrorCollection.size(), equalTo(1)); + final PluginError capturedPluginError = new ArrayList<>(pluginErrorCollection).get(0); + assertThat(Set.of(PipelineModel.PROCESSOR_PLUGIN_TYPE, PipelineModel.SINK_PLUGIN_TYPE) + .contains(capturedPluginError.getComponentType()), is(true)); } @Test diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java index 9b027c6c75..ab657dfc94 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/config/PipelineParserConfigurationTest.java @@ -20,6 +20,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -59,12 +60,15 @@ class PipelineParserConfigurationTest { @Mock private PluginErrorCollector pluginErrorCollector; + @Mock + private PluginErrorsHandler pluginErrorsHandler; + @Test void pipelineParser() { final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser( pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager, - sourceCoordinatorFactory, pluginErrorCollector); + sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsHandler); assertThat(pipelineTransformer, is(notNullValue())); } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/LoggingPluginErrorsHandler.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/LoggingPluginErrorsHandler.java new file mode 100644 index 0000000000..879f18823e --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/LoggingPluginErrorsHandler.java @@ -0,0 +1,26 @@ +package org.opensearch.dataprepper.validation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Named; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Named +public class LoggingPluginErrorsHandler implements PluginErrorsHandler { + private static final Logger LOG = LoggerFactory.getLogger(LoggingPluginErrorsHandler.class); + + @Override + public void handleErrors(final Collection pluginErrors) { + final List allErrorMessages = pluginErrors.stream() + .map(PluginError::getErrorMessage) + .collect(Collectors.toList()); + final String consolidatedErrorMessage = IntStream.range(0, allErrorMessages.size()) + .mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i)) + .collect(Collectors.joining("\n")); + LOG.error(consolidatedErrorMessage); + } +} diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java index 0be5539019..62b6654c0b 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorCollector.java @@ -5,8 +5,6 @@ import javax.inject.Named; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; @Named @Getter @@ -16,16 +14,4 @@ public class PluginErrorCollector { public void collectPluginError(final PluginError pluginError) { pluginErrors.add(pluginError); } - - public List getAllErrorMessages() { - return pluginErrors.stream().map(PluginError::getErrorMessage).collect(Collectors.toList()); - } - - public String getConsolidatedErrorMessage() { - final List allErrorMessages = getAllErrorMessages(); - - return IntStream.range(0, allErrorMessages.size()) - .mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i)) - .collect(Collectors.joining("\n")); - } } diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorsHandler.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorsHandler.java new file mode 100644 index 0000000000..3417ff5bfa --- /dev/null +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginErrorsHandler.java @@ -0,0 +1,8 @@ +package org.opensearch.dataprepper.validation; + +import java.util.Collection; + +public interface PluginErrorsHandler { + + public void handleErrors(final Collection pluginErrors); +} diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/DataPrepperValidateTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/DataPrepperValidateTest.java deleted file mode 100644 index 447fb21838..0000000000 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/DataPrepperValidateTest.java +++ /dev/null @@ -1,3 +0,0 @@ -package org.opensearch.dataprepper.validation; -class DataPrepperValidateTest { -} \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/LoggingPluginErrorsHandlerTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/LoggingPluginErrorsHandlerTest.java new file mode 100644 index 0000000000..f4b8fbbec5 --- /dev/null +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/LoggingPluginErrorsHandlerTest.java @@ -0,0 +1,35 @@ +package org.opensearch.dataprepper.validation; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class LoggingPluginErrorsHandlerTest { + @Test + void testHandleErrors() { + final Logger mockLogger = mock(Logger.class); + + try (final MockedStatic mockedLoggerFactory = mockStatic(LoggerFactory.class)) { + mockedLoggerFactory.when(() -> LoggerFactory.getLogger(LoggingPluginErrorsHandler.class)) + .thenReturn(mockLogger); + final LoggingPluginErrorsHandler loggingPluginErrorsHandler = new LoggingPluginErrorsHandler(); + final PluginError error1 = mock(PluginError.class); + final PluginError error2 = mock(PluginError.class); + when(error1.getErrorMessage()).thenReturn("Error 1 occurred"); + when(error2.getErrorMessage()).thenReturn("Error 2 occurred"); + final Collection pluginErrors = Arrays.asList(error1, error2); + loggingPluginErrorsHandler.handleErrors(pluginErrors); + final String expectedMessage = "1. Error 1 occurred\n2. Error 2 occurred"; + verify(mockLogger).error(expectedMessage); + } + } +} \ No newline at end of file diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java index 324a202fb8..d0b5fbd8b2 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorCollectorTest.java @@ -4,27 +4,17 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; class PluginErrorCollectorTest { @Test void testWithPluginErrors() { final PluginErrorCollector objectUnderTest = new PluginErrorCollector(); - final String testErrorMessage1 = "test error message 1"; - final String testErrorMessage2 = "test error message 2"; final PluginError pluginError1 = mock(PluginError.class); - when(pluginError1.getErrorMessage()).thenReturn(testErrorMessage1); final PluginError pluginError2 = mock(PluginError.class); - when(pluginError2.getErrorMessage()).thenReturn(testErrorMessage2); objectUnderTest.collectPluginError(pluginError1); objectUnderTest.collectPluginError(pluginError2); assertThat(objectUnderTest.getPluginErrors().size(), equalTo(2)); - assertThat(objectUnderTest.getAllErrorMessages().size(), equalTo(2)); - assertThat(objectUnderTest.getAllErrorMessages(), contains(testErrorMessage1, testErrorMessage2)); - assertThat(objectUnderTest.getConsolidatedErrorMessage(), equalTo( - String.format("1. %s\n2. %s", testErrorMessage1, testErrorMessage2))); } } \ No newline at end of file diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java index f527d835a4..bd895f6e51 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; import org.opensearch.dataprepper.validation.PluginError; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import javax.inject.Inject; import javax.inject.Named; @@ -26,17 +27,20 @@ public class ExtensionLoader { private final ExtensionClassProvider extensionClassProvider; private final PluginCreator extensionPluginCreator; private final PluginErrorCollector pluginErrorCollector; + private final PluginErrorsHandler pluginErrorsHandler; @Inject ExtensionLoader( final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter, final ExtensionClassProvider extensionClassProvider, @Named("extensionPluginCreator") final PluginCreator extensionPluginCreator, - final PluginErrorCollector pluginErrorCollector) { + final PluginErrorCollector pluginErrorCollector, + final PluginErrorsHandler pluginErrorsHandler) { this.extensionPluginConfigurationConverter = extensionPluginConfigurationConverter; this.extensionClassProvider = extensionClassProvider; this.extensionPluginCreator = extensionPluginCreator; this.pluginErrorCollector = pluginErrorCollector; + this.pluginErrorsHandler = pluginErrorsHandler; } public List loadExtensions() { @@ -59,10 +63,14 @@ public List loadExtensions() { } }) .collect(Collectors.toList()); - if (!pluginErrorCollector.getPluginErrors().isEmpty()) { + final List extensionPluginErrors = pluginErrorCollector.getPluginErrors() + .stream().filter(pluginError -> PipelinesDataFlowModel.EXTENSION_PLUGIN_TYPE + .equals(pluginError.getComponentType())) + .collect(Collectors.toList()); + if (!extensionPluginErrors.isEmpty()) { + pluginErrorsHandler.handleErrors(extensionPluginErrors); throw new InvalidPluginConfigurationException( - "One or more extension plugins are not configured correctly.\n" - + pluginErrorCollector.getConsolidatedErrorMessage()); + "One or more extension plugins are not configured correctly."); } else { return result; } diff --git a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java index dc47a4f698..bf46579377 100644 --- a/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java +++ b/data-prepper-plugin-framework/src/test/java/org/opensearch/dataprepper/plugin/ExtensionLoaderTest.java @@ -18,6 +18,7 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; @@ -25,6 +26,7 @@ import org.opensearch.dataprepper.plugins.test.TestExtensionWithConfig; import org.opensearch.dataprepper.validation.PluginError; import org.opensearch.dataprepper.validation.PluginErrorCollector; +import org.opensearch.dataprepper.validation.PluginErrorsHandler; import java.util.ArrayList; import java.util.Collection; @@ -57,13 +59,17 @@ class ExtensionLoaderTest { private ExtensionClassProvider extensionClassProvider; @Mock private PluginCreator extensionPluginCreator; + @Mock + private PluginErrorsHandler pluginErrorsHandler; @Captor private ArgumentCaptor pluginArgumentsContextArgumentCaptor; + @Captor + private ArgumentCaptor> pluginErrorsArgumentCaptor; private PluginErrorCollector pluginErrorCollector; private ExtensionLoader createObjectUnderTest() { return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider, - extensionPluginCreator, pluginErrorCollector); + extensionPluginCreator, pluginErrorCollector, pluginErrorsHandler); } @BeforeEach @@ -105,6 +111,12 @@ void loadExtensions_returns_single_extension_for_single_plugin_class() { @Test void loadExtensions_throws_InvalidPluginConfigurationException_when_invoking_newPluginInstance_throws_exception() { + pluginErrorCollector.collectPluginError( + PluginError.builder() + .componentType("non-extension") + .pluginName("preexisting-plugin") + .exception(new RuntimeException()) + .build()); final Class pluginClass = (Class) mock(ExtensionPlugin.class).getClass(); when(extensionClassProvider.loadExtensionPluginClasses()).thenReturn(Collections.singleton(pluginClass)); @@ -116,11 +128,16 @@ void loadExtensions_throws_InvalidPluginConfigurationException_when_invoking_new .thenThrow(TestException.class); assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest().loadExtensions()); - assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(1)); - final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(0); + assertThat(pluginErrorCollector.getPluginErrors().size(), equalTo(2)); + final PluginError pluginError = pluginErrorCollector.getPluginErrors().get(1); assertThat(pluginError.getPipelineName(), nullValue()); assertThat(pluginError.getPluginName(), CoreMatchers.startsWith("extension_plugin")); assertThat(pluginError.getException(), instanceOf(TestException.class)); + verify(pluginErrorsHandler).handleErrors(pluginErrorsArgumentCaptor.capture()); + final Collection pluginErrorCollection = pluginErrorsArgumentCaptor.getValue(); + assertThat(pluginErrorCollection.size(), equalTo(1)); + final PluginError capturedPluginError = new ArrayList<>(pluginErrorCollection).get(0); + assertThat(capturedPluginError.getComponentType(), equalTo(PipelinesDataFlowModel.EXTENSION_PLUGIN_TYPE)); } @ParameterizedTest