diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java index 12b092ac36..7eea063bd0 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelinesDataFlowModel.java @@ -27,12 +27,13 @@ * @since 1.2 */ public class PipelinesDataFlowModel { + public static final String EXTENSION_PLUGIN_TYPE = "extension"; @JsonInclude(JsonInclude.Include.NON_NULL) private DataPrepperVersion version; @JsonAlias("pipeline_configurations") - @JsonProperty("extension") + @JsonProperty(EXTENSION_PLUGIN_TYPE) @JsonInclude(JsonInclude.Include.NON_NULL) @JsonSetter(nulls = Nulls.SKIP) private PipelineExtensions pipelineExtensions; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java index 9bd7725b80..a01103c03a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelineTransformer.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.breaker.CircuitBreakerManager; import org.opensearch.dataprepper.model.annotations.SingleThread; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding; @@ -124,6 +125,7 @@ private void buildPipelineFromConfiguration( return pluginFactory.loadPlugin(Source.class, sourceSetting); } catch (Exception e) { final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SOURCE_PLUGIN_TYPE) .pipelineName(pipelineName) .pluginName(sourceSetting.getName()) .exception(e) @@ -140,6 +142,7 @@ private void buildPipelineFromConfiguration( pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, bufferPluginSetting, source.getDecoder()); } catch (Exception e) { final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.BUFFER_PLUGIN_TYPE) .pipelineName(pipelineName) .pluginName(bufferPluginSetting.getName()) .build(); @@ -217,6 +220,7 @@ private List> newProcessor(final PluginSetting pl .collect(Collectors.toList()); } catch (Exception e) { final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.PROCESSOR_PLUGIN_TYPE) .pipelineName(pluginSetting.getPipelineName()) .pluginName(pluginSetting.getName()) .exception(e) @@ -266,6 +270,7 @@ private DataFlowComponent buildRoutedSinkOrConnector(final SinkContextPlug return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes()); } catch (Exception e) { final PluginError pluginError = PluginError.builder() + .componentType(PipelineModel.SINK_PLUGIN_TYPE) .pipelineName(pluginSetting.getPipelineName()) .pluginName(pluginSetting.getName()) .exception(e) diff --git a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java index 6c374e04a7..50a130c4f4 100644 --- a/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java +++ b/data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/validation/PluginError.java @@ -10,6 +10,7 @@ public class PluginError { static final String PIPELINE_DELIMITER = ":"; static final String CAUSED_BY_DELIMITER = " caused by: "; private final String pipelineName; + private final String componentType; @NonNull private final String pluginName; @NonNull @@ -21,6 +22,10 @@ public String getErrorMessage() { message.append(pipelineName); message.append(PIPELINE_DELIMITER); } + if (componentType != null) { + message.append(componentType); + message.append(PIPELINE_DELIMITER); + } message.append(pluginName); message.append(PIPELINE_DELIMITER); message.append(getFlattenedExceptionMessage(CAUSED_BY_DELIMITER)); diff --git a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java index 6b44f24924..c02dde376c 100644 --- a/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java +++ b/data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/validation/PluginErrorTest.java @@ -9,6 +9,7 @@ class PluginErrorTest { private static final String TEST_PIPELINE_NAME = "test-pipeline"; + private static final String TEST_COMPONENT_TYPE = "test-plugin-type"; private static final String TEST_PLUGIN_NAME = "test-plugin"; private static final String TEST_ERROR_MESSAGE = "test error message"; private static final String TEST_CAUSE_ERROR_MESSAGE = "test cause error message"; @@ -19,11 +20,12 @@ void testGetErrorMessageWithPipelineName() { when(exception.getMessage()).thenReturn(TEST_ERROR_MESSAGE); final PluginError pluginError = PluginError.builder() .pipelineName(TEST_PIPELINE_NAME) + .componentType(TEST_COMPONENT_TYPE) .pluginName(TEST_PLUGIN_NAME) .exception(exception) .build(); assertThat(pluginError.getErrorMessage(), equalTo( - "test-pipeline:test-plugin: caused by: test error message")); + "test-pipeline:test-plugin-type:test-plugin: caused by: test error message")); } @Test @@ -32,10 +34,11 @@ void testGetErrorMessageWithoutPipelineName() { when(exception.getMessage()).thenReturn(TEST_ERROR_MESSAGE); final PluginError pluginError = PluginError.builder() .pluginName(TEST_PLUGIN_NAME) + .componentType(TEST_COMPONENT_TYPE) .exception(exception) .build(); assertThat(pluginError.getErrorMessage(), equalTo( - "test-plugin: caused by: test error message")); + "test-plugin-type:test-plugin: caused by: test error message")); } @Test diff --git a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java index 279fe94c6b..513da3f863 100644 --- a/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java +++ b/data-prepper-plugin-framework/src/main/java/org/opensearch/dataprepper/plugin/ExtensionLoader.java @@ -6,6 +6,8 @@ package org.opensearch.dataprepper.plugin; import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin; +import org.opensearch.dataprepper.model.configuration.PipelineModel; +import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.opensearch.dataprepper.model.plugin.ExtensionPlugin; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException; @@ -49,6 +51,7 @@ public List loadExtensions() { extensionClass, pluginArgumentsContext, pluginName); } catch (Exception e) { final PluginError pluginError = PluginError.builder() + .componentType(PipelinesDataFlowModel.EXTENSION_PLUGIN_TYPE) .pluginName(pluginName) .exception(e) .build();