Skip to content

Commit

Permalink
ENH: collect plugin config and loading errors during data prepper launch (#4816)
Browse files Browse the repository at this point in the history

* ENH: collect plugin errors within core application context

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Aug 21, 2024
1 parent b10d465 commit 0387808
Show file tree
Hide file tree
Showing 17 changed files with 420 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
* @since 1.2
*/
public class PipelinesDataFlowModel {
public static final String EXTENSION_PLUGIN_TYPE = "extension";

@JsonInclude(JsonInclude.Include.NON_NULL)
private DataPrepperVersion version;

@JsonAlias("pipeline_configurations")
@JsonProperty("extension")
@JsonProperty(EXTENSION_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
private PipelineExtensions pipelineExtensions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.HashMap;
Expand Down Expand Up @@ -61,6 +62,7 @@ private DefaultPluginFactory createObjectUnderTest() {
coreContext.scan(DefaultAcknowledgementSetManager.class.getPackage().getName());
coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.register(PluginBeanFactoryProvider.class);
coreContext.registerBean(PluginErrorCollector.class, PluginErrorCollector::new);
coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.dataprepper.plugin.TestPluggableInterface;
import org.opensearch.dataprepper.plugins.test.TestExtension;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.ArrayList;
Expand Down Expand Up @@ -68,13 +69,15 @@ public class ExtensionsIT {
private AnnotationConfigApplicationContext publicContext;
private AnnotationConfigApplicationContext coreContext;
private PluginFactory pluginFactory;
private PluginErrorCollector pluginErrorCollector;
private String pluginName;
private String pipelineName;

@BeforeEach
void setUp() {
pluginName = "test_plugin_using_extension";
pipelineName = UUID.randomUUID().toString();
pluginErrorCollector = new PluginErrorCollector();
publicContext = new AnnotationConfigApplicationContext();
publicContext.refresh();

Expand Down Expand Up @@ -104,6 +107,7 @@ void setUp() {
coreContext.registerBean(ObjectMapperConfiguration.class, ObjectMapperConfiguration::new);
coreContext.registerBean(ObjectMapper.class, () -> new ObjectMapper(new YAMLFactory()));
coreContext.register(PipelineParserConfiguration.class);
coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector);
coreContext.refresh();

pluginFactory = coreContext.getBean(DefaultPluginFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import org.opensearch.dataprepper.breaker.CircuitBreakerManager;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.peerforwarder.RequiresPeerForwarding;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.sink.Sink;
Expand All @@ -30,10 +32,13 @@
import org.opensearch.dataprepper.pipeline.router.Router;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -59,6 +64,7 @@ public class PipelineTransformer {
private final EventFactory eventFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private final SourceCoordinatorFactory sourceCoordinatorFactory;
private final PluginErrorCollector pluginErrorCollector;

public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final PluginFactory pluginFactory,
Expand All @@ -68,7 +74,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final CircuitBreakerManager circuitBreakerManager,
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory) {
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector) {
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
this.pluginFactory = Objects.requireNonNull(pluginFactory);
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
Expand All @@ -78,6 +85,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
this.eventFactory = eventFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
this.pluginErrorCollector = pluginErrorCollector;
}

public Map<String, Pipeline> transformConfiguration() {
Expand Down Expand Up @@ -112,11 +120,34 @@ private void buildPipelineFromConfiguration(
final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting();
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
pipelineMap, pipelineConfigurationMap);
final Source source = pipelineSource.orElseGet(() ->
pluginFactory.loadPlugin(Source.class, sourceSetting));
final Source source = pipelineSource.orElseGet(() -> {
try {
return pluginFactory.loadPlugin(Source.class, sourceSetting);
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.SOURCE_PLUGIN_TYPE)
.pipelineName(pipelineName)
.pluginName(sourceSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return null;
}
});

LOG.info("Building buffer for the pipeline [{}]", pipelineName);
final Buffer pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting(), source.getDecoder());
Buffer pipelineDefinedBuffer = null;
final PluginSetting bufferPluginSetting = pipelineConfiguration.getBufferPluginSetting();
try {
pipelineDefinedBuffer = pluginFactory.loadPlugin(Buffer.class, bufferPluginSetting, source.getDecoder());
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.BUFFER_PLUGIN_TYPE)
.pipelineName(pipelineName)
.pluginName(bufferPluginSetting.getName())
.build();
pluginErrorCollector.collectPluginError(pluginError);
}

LOG.info("Building processors for the pipeline [{}]", pipelineName);
final int processorThreads = pipelineConfiguration.getWorkers();
Expand All @@ -125,6 +156,20 @@ private void buildPipelineFromConfiguration(
.map(this::newProcessor)
.collect(Collectors.toList());

LOG.info("Building sinks for the pipeline [{}]", pipelineName);
final List<DataFlowComponent<Sink>> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildRoutedSinkOrConnector)
.collect(Collectors.toList());

final List<PluginError> subPipelinePluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> pipelineName.equals(pluginError.getPipelineName()))
.collect(Collectors.toList());
if (!subPipelinePluginErrors.isEmpty()) {
throw new InvalidPluginConfigurationException(
String.format("One or more plugins are not configured correctly in the pipeline: %s.\n",
pipelineName) + pluginErrorCollector.getConsolidatedErrorMessage());
}

final List<List<Processor>> decoratedProcessorSets = processorSets.stream()
.map(processorComponentList -> {
final List<Processor> processors = processorComponentList.stream().map(IdentifiedComponent::getComponent).collect(Collectors.toList());
Expand All @@ -138,11 +183,6 @@ private void buildPipelineFromConfiguration(

final int readBatchDelay = pipelineConfiguration.getReadBatchDelay();

LOG.info("Building sinks for the pipeline [{}]", pipelineName);
final List<DataFlowComponent<Sink>> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildRoutedSinkOrConnector)
.collect(Collectors.toList());

final List<Buffer> secondaryBuffers = getSecondaryBuffers();
LOG.info("Constructing MultiBufferDecorator with [{}] secondary buffers for pipeline [{}]", secondaryBuffers.size(), pipelineName);
final MultiBufferDecorator multiBufferDecorator = new MultiBufferDecorator(pipelineDefinedBuffer, secondaryBuffers);
Expand All @@ -167,16 +207,27 @@ private void buildPipelineFromConfiguration(
}

private List<IdentifiedComponent<Processor>> newProcessor(final PluginSetting pluginSetting) {
final List<Processor> processors = pluginFactory.loadPlugins(
Processor.class,
pluginSetting,
actualClass -> actualClass.isAnnotationPresent(SingleThread.class) ?
pluginSetting.getNumberOfProcessWorkers() :
1);

return processors.stream()
.map(processor -> new IdentifiedComponent<>(processor, pluginSetting.getName()))
.collect(Collectors.toList());
try {
final List<Processor> processors = pluginFactory.loadPlugins(
Processor.class,
pluginSetting,
actualClass -> actualClass.isAnnotationPresent(SingleThread.class) ?
pluginSetting.getNumberOfProcessWorkers() :
1);

return processors.stream()
.map(processor -> new IdentifiedComponent<>(processor, pluginSetting.getName()))
.collect(Collectors.toList());
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.PROCESSOR_PLUGIN_TYPE)
.pipelineName(pluginSetting.getPipelineName())
.pluginName(pluginSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return Collections.emptyList();
}
}

private Optional<Source> getSourceIfPipelineType(
Expand Down Expand Up @@ -213,9 +264,20 @@ private Optional<Source> getSourceIfPipelineType(
}

private DataFlowComponent<Sink> buildRoutedSinkOrConnector(final SinkContextPluginSetting pluginSetting) {
final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext());

return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes());
try {
final Sink sink = buildSinkOrConnector(pluginSetting, pluginSetting.getSinkContext());

return new DataFlowComponent<>(sink, pluginSetting.getSinkContext().getRoutes());
} catch (Exception e) {
final PluginError pluginError = PluginError.builder()
.componentType(PipelineModel.SINK_PLUGIN_TYPE)
.pipelineName(pluginSetting.getPipelineName())
.pluginName(pluginSetting.getName())
.exception(e)
.build();
pluginErrorCollector.collectPluginError(pluginError);
return null;
}
}

private Sink buildSinkOrConnector(final PluginSetting pluginSetting, final SinkContext sinkContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.dataprepper.pipeline.parser.transformer.PipelineConfigurationTransformer;
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand All @@ -44,8 +45,9 @@ public PipelineTransformer pipelineParser(
final CircuitBreakerManager circuitBreakerManager,
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory
) {
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector
) {
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
peerForwarderProvider,
Expand All @@ -54,7 +56,8 @@ public PipelineTransformer pipelineParser(
circuitBreakerManager,
eventFactory,
acknowledgementSetManager,
sourceCoordinatorFactory);
sourceCoordinatorFactory,
pluginErrorCollector);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public class TestDataProvider {
public static final String VALID_OFF_HEAP_FILE_WITH_ACKS = "src/test/resources/multiple_pipeline_valid_off_heap_buffer_with_acks.yml";
public static final String DISCONNECTED_VALID_OFF_HEAP_FILE_WITH_ACKS = "src/test/resources/multiple_disconnected_pipeline_valid_off_heap_buffer_with_acks.yml";
public static final String CONNECTED_PIPELINE_ROOT_SOURCE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_root_source.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT = "src/test/resources/connected_pipeline_incorrect_child_pipeline.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT_DUE_TO_SINK = "src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_sink.yml";
public static final String CONNECTED_PIPELINE_CHILD_PIPELINE_INCORRECT_DUE_TO_PROCESSOR = "src/test/resources/connected_pipeline_incorrect_child_pipeline_due_to_invalid_processor.yml";
public static final String CYCLE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/cyclic_multiple_pipeline_configuration.yml";
public static final String INCORRECT_SOURCE_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/incorrect_source_multiple_pipeline_configuration.yml";
public static final String MISSING_NAME_MULTIPLE_PIPELINE_CONFIG_FILE = "src/test/resources/missing_name_multiple_pipeline_configuration.yml";
Expand Down
Loading

0 comments on commit 0387808

Please sign in to comment.