Skip to content

Commit

Permalink
ENH: Plugin errors consolidator
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 committed Aug 22, 2024
1 parent 0387808 commit d587b54
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.HashMap;
Expand Down Expand Up @@ -63,6 +64,7 @@ private DefaultPluginFactory createObjectUnderTest() {
coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.register(PluginBeanFactoryProvider.class);
coreContext.registerBean(PluginErrorCollector.class, PluginErrorCollector::new);
coreContext.registerBean(PluginErrorsConsolidator.class, PluginErrorsConsolidator::new);
coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.dataprepper.plugins.test.TestExtension;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.ArrayList;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class ExtensionsIT {
private AnnotationConfigApplicationContext coreContext;
private PluginFactory pluginFactory;
private PluginErrorCollector pluginErrorCollector;
private PluginErrorsConsolidator pluginErrorsConsolidator;
private String pluginName;
private String pipelineName;

Expand All @@ -78,6 +80,7 @@ void setUp() {
pluginName = "test_plugin_using_extension";
pipelineName = UUID.randomUUID().toString();
pluginErrorCollector = new PluginErrorCollector();
pluginErrorsConsolidator = new PluginErrorsConsolidator();
publicContext = new AnnotationConfigApplicationContext();
publicContext.refresh();

Expand Down Expand Up @@ -108,6 +111,7 @@ void setUp() {
coreContext.registerBean(ObjectMapper.class, () -> new ObjectMapper(new YAMLFactory()));
coreContext.register(PipelineParserConfiguration.class);
coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector);
coreContext.registerBean(PluginErrorsConsolidator.class, () -> pluginErrorsConsolidator);
coreContext.refresh();

pluginFactory = coreContext.getBean(DefaultPluginFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,6 +66,7 @@ public class PipelineTransformer {
private final AcknowledgementSetManager acknowledgementSetManager;
private final SourceCoordinatorFactory sourceCoordinatorFactory;
private final PluginErrorCollector pluginErrorCollector;
private final PluginErrorsConsolidator pluginErrorsConsolidator;

public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final PluginFactory pluginFactory,
Expand All @@ -75,7 +77,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector) {
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsConsolidator pluginErrorsConsolidator) {
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
this.pluginFactory = Objects.requireNonNull(pluginFactory);
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
Expand All @@ -86,6 +89,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
this.pluginErrorCollector = pluginErrorCollector;
this.pluginErrorsConsolidator = pluginErrorsConsolidator;
}

public Map<String, Pipeline> transformConfiguration() {
Expand Down Expand Up @@ -167,7 +171,8 @@ private void buildPipelineFromConfiguration(
if (!subPipelinePluginErrors.isEmpty()) {
throw new InvalidPluginConfigurationException(
String.format("One or more plugins are not configured correctly in the pipeline: %s.\n",
pipelineName) + pluginErrorCollector.getConsolidatedErrorMessage());
pipelineName) + pluginErrorsConsolidator.consolidatedErrorMessage(
subPipelinePluginErrors));
}

final List<List<Processor>> decoratedProcessorSets = processorSets.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand All @@ -46,7 +47,8 @@ public PipelineTransformer pipelineParser(
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsConsolidator pluginErrorsConsolidator
) {
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
Expand All @@ -57,7 +59,8 @@ public PipelineTransformer pipelineParser(
eventFactory,
acknowledgementSetManager,
sourceCoordinatorFactory,
pluginErrorCollector);
pluginErrorCollector,
pluginErrorsConsolidator);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.time.Duration;
Expand Down Expand Up @@ -87,6 +88,8 @@ class PipelineTransformerTests {

private PluginErrorCollector pluginErrorCollector;

private PluginErrorsConsolidator pluginErrorsConsolidator;

private EventFactory eventFactory;

private DefaultAcknowledgementSetManager acknowledgementSetManager;
Expand All @@ -99,6 +102,7 @@ void setUp() {
eventFactory = mock(EventFactory.class);
acknowledgementSetManager = mock(DefaultAcknowledgementSetManager.class);
pluginErrorCollector = new PluginErrorCollector();
pluginErrorsConsolidator = new PluginErrorsConsolidator();
final AnnotationConfigApplicationContext publicContext = new AnnotationConfigApplicationContext();
publicContext.refresh();

Expand Down Expand Up @@ -126,9 +130,11 @@ private PipelineTransformer createObjectUnderTest(final String pipelineConfigura

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataflowModelParser(
new PipelineConfigurationFileReader(pipelineConfigurationFileLocation)).parseConfiguration();
return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider,
routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory,
acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector);
return new PipelineTransformer(
pipelinesDataFlowModel, pluginFactory, peerForwarderProvider,
routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory,
acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector,
pluginErrorsConsolidator);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -59,12 +60,15 @@ class PipelineParserConfigurationTest {
@Mock
private PluginErrorCollector pluginErrorCollector;

@Mock
private PluginErrorsConsolidator pluginErrorsConsolidator;

@Test
void pipelineParser() {
final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser(
pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory,
dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager,
sourceCoordinatorFactory, pluginErrorCollector);
sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsConsolidator);

assertThat(pipelineTransformer, is(notNullValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import javax.inject.Named;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Named
@Getter
Expand All @@ -16,16 +14,4 @@ public class PluginErrorCollector {
public void collectPluginError(final PluginError pluginError) {
pluginErrors.add(pluginError);
}

public List<String> getAllErrorMessages() {
return pluginErrors.stream().map(PluginError::getErrorMessage).collect(Collectors.toList());
}

public String getConsolidatedErrorMessage() {
final List<String> allErrorMessages = getAllErrorMessages();

return IntStream.range(0, allErrorMessages.size())
.mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i))
.collect(Collectors.joining("\n"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.validation;

import javax.inject.Named;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Named
public class PluginErrorsConsolidator {
public String consolidatedErrorMessage(final List<PluginError> pluginErrors) {
final List<String> allErrorMessages = pluginErrors.stream()
.map(PluginError::getErrorMessage)
.collect(Collectors.toList());
return IntStream.range(0, allErrorMessages.size())
.mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i))
.collect(Collectors.joining("\n"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,17 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PluginErrorCollectorTest {

@Test
void testWithPluginErrors() {
final PluginErrorCollector objectUnderTest = new PluginErrorCollector();
final String testErrorMessage1 = "test error message 1";
final String testErrorMessage2 = "test error message 2";
final PluginError pluginError1 = mock(PluginError.class);
when(pluginError1.getErrorMessage()).thenReturn(testErrorMessage1);
final PluginError pluginError2 = mock(PluginError.class);
when(pluginError2.getErrorMessage()).thenReturn(testErrorMessage2);
objectUnderTest.collectPluginError(pluginError1);
objectUnderTest.collectPluginError(pluginError2);
assertThat(objectUnderTest.getPluginErrors().size(), equalTo(2));
assertThat(objectUnderTest.getAllErrorMessages().size(), equalTo(2));
assertThat(objectUnderTest.getAllErrorMessages(), contains(testErrorMessage1, testErrorMessage2));
assertThat(objectUnderTest.getConsolidatedErrorMessage(), equalTo(
String.format("1. %s\n2. %s", testErrorMessage1, testErrorMessage2)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.dataprepper.validation;

import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PluginErrorsConsolidatorTest {

@Test
void testWithPluginErrors() {
final PluginErrorsConsolidator objectUnderTest = new PluginErrorsConsolidator();
final String testErrorMessage1 = "test error message 1";
final String testErrorMessage2 = "test error message 2";
final PluginError pluginError1 = mock(PluginError.class);
when(pluginError1.getErrorMessage()).thenReturn(testErrorMessage1);
final PluginError pluginError2 = mock(PluginError.class);
when(pluginError2.getErrorMessage()).thenReturn(testErrorMessage2);
final String consolidatedErrorMessage = objectUnderTest.consolidatedErrorMessage(
List.of(pluginError1, pluginError2));
assertThat(consolidatedErrorMessage, equalTo(
String.format("1. %s\n2. %s", testErrorMessage1, testErrorMessage2)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;

import javax.inject.Inject;
import javax.inject.Named;
Expand All @@ -26,17 +27,20 @@ public class ExtensionLoader {
private final ExtensionClassProvider extensionClassProvider;
private final PluginCreator extensionPluginCreator;
private final PluginErrorCollector pluginErrorCollector;
private final PluginErrorsConsolidator pluginErrorsConsolidator;

@Inject
ExtensionLoader(
final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter,
final ExtensionClassProvider extensionClassProvider,
@Named("extensionPluginCreator") final PluginCreator extensionPluginCreator,
final PluginErrorCollector pluginErrorCollector) {
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsConsolidator pluginErrorsConsolidator) {
this.extensionPluginConfigurationConverter = extensionPluginConfigurationConverter;
this.extensionClassProvider = extensionClassProvider;
this.extensionPluginCreator = extensionPluginCreator;
this.pluginErrorCollector = pluginErrorCollector;
this.pluginErrorsConsolidator = pluginErrorsConsolidator;
}

public List<? extends ExtensionPlugin> loadExtensions() {
Expand All @@ -59,10 +63,14 @@ public List<? extends ExtensionPlugin> loadExtensions() {
}
})
.collect(Collectors.toList());
if (!pluginErrorCollector.getPluginErrors().isEmpty()) {
final List<PluginError> extensionPluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> PipelinesDataFlowModel.EXTENSION_PLUGIN_TYPE
.equals(pluginError.getComponentType()))
.collect(Collectors.toList());
if (!extensionPluginErrors.isEmpty()) {
throw new InvalidPluginConfigurationException(
"One or more extension plugins are not configured correctly.\n"
+ pluginErrorCollector.getConsolidatedErrorMessage());
+ pluginErrorsConsolidator.consolidatedErrorMessage(extensionPluginErrors));
} else {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.dataprepper.plugins.test.TestExtensionWithConfig;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -60,15 +61,17 @@ class ExtensionLoaderTest {
@Captor
private ArgumentCaptor<PluginArgumentsContext> pluginArgumentsContextArgumentCaptor;
private PluginErrorCollector pluginErrorCollector;
private PluginErrorsConsolidator pluginErrorsConsolidator;

private ExtensionLoader createObjectUnderTest() {
return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider,
extensionPluginCreator, pluginErrorCollector);
extensionPluginCreator, pluginErrorCollector, pluginErrorsConsolidator);
}

@BeforeEach
void setUp() {
pluginErrorCollector = new PluginErrorCollector();
pluginErrorsConsolidator = new PluginErrorsConsolidator();
}

@Test
Expand Down

0 comments on commit d587b54

Please sign in to comment.