Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Plugin errors consolidator #4863

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.plugins.TestObjectPlugin;
import org.opensearch.dataprepper.plugins.test.TestPlugin;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.HashMap;
Expand Down Expand Up @@ -63,6 +64,7 @@ private DefaultPluginFactory createObjectUnderTest() {
coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.register(PluginBeanFactoryProvider.class);
coreContext.registerBean(PluginErrorCollector.class, PluginErrorCollector::new);
coreContext.registerBean(PluginErrorsConsolidator.class, PluginErrorsConsolidator::new);
coreContext.registerBean(ExtensionsConfiguration.class, () -> extensionsConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.dataprepper.plugins.test.TestExtension;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.ArrayList;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class ExtensionsIT {
private AnnotationConfigApplicationContext coreContext;
private PluginFactory pluginFactory;
private PluginErrorCollector pluginErrorCollector;
private PluginErrorsConsolidator pluginErrorsConsolidator;
private String pluginName;
private String pipelineName;

Expand All @@ -78,6 +80,7 @@ void setUp() {
pluginName = "test_plugin_using_extension";
pipelineName = UUID.randomUUID().toString();
pluginErrorCollector = new PluginErrorCollector();
pluginErrorsConsolidator = new PluginErrorsConsolidator();
publicContext = new AnnotationConfigApplicationContext();
publicContext.refresh();

Expand Down Expand Up @@ -108,6 +111,7 @@ void setUp() {
coreContext.registerBean(ObjectMapper.class, () -> new ObjectMapper(new YAMLFactory()));
coreContext.register(PipelineParserConfiguration.class);
coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector);
coreContext.registerBean(PluginErrorsConsolidator.class, () -> pluginErrorsConsolidator);
coreContext.refresh();

pluginFactory = coreContext.getBean(DefaultPluginFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,6 +66,7 @@ public class PipelineTransformer {
private final AcknowledgementSetManager acknowledgementSetManager;
private final SourceCoordinatorFactory sourceCoordinatorFactory;
private final PluginErrorCollector pluginErrorCollector;
private final PluginErrorsConsolidator pluginErrorsConsolidator;

public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final PluginFactory pluginFactory,
Expand All @@ -75,7 +77,8 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector) {
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsConsolidator pluginErrorsConsolidator) {
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
this.pluginFactory = Objects.requireNonNull(pluginFactory);
this.peerForwarderProvider = Objects.requireNonNull(peerForwarderProvider);
Expand All @@ -86,6 +89,7 @@ public PipelineTransformer(final PipelinesDataFlowModel pipelinesDataFlowModel,
this.acknowledgementSetManager = acknowledgementSetManager;
this.sourceCoordinatorFactory = sourceCoordinatorFactory;
this.pluginErrorCollector = pluginErrorCollector;
this.pluginErrorsConsolidator = pluginErrorsConsolidator;
}

public Map<String, Pipeline> transformConfiguration() {
Expand Down Expand Up @@ -167,7 +171,8 @@ private void buildPipelineFromConfiguration(
if (!subPipelinePluginErrors.isEmpty()) {
throw new InvalidPluginConfigurationException(
String.format("One or more plugins are not configured correctly in the pipeline: %s.\n",
pipelineName) + pluginErrorCollector.getConsolidatedErrorMessage());
pipelineName) + pluginErrorsConsolidator.consolidatedErrorMessage(
subPipelinePluginErrors));
}

final List<List<Processor>> decoratedProcessorSets = processorSets.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.pipeline.router.RouterFactory;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand All @@ -46,7 +47,8 @@ public PipelineTransformer pipelineParser(
final EventFactory eventFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final SourceCoordinatorFactory sourceCoordinatorFactory,
final PluginErrorCollector pluginErrorCollector
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsConsolidator pluginErrorsConsolidator
) {
return new PipelineTransformer(pipelinesDataFlowModel,
pluginFactory,
Expand All @@ -57,7 +59,8 @@ public PipelineTransformer pipelineParser(
eventFactory,
acknowledgementSetManager,
sourceCoordinatorFactory,
pluginErrorCollector);
pluginErrorCollector,
pluginErrorsConsolidator);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.time.Duration;
Expand Down Expand Up @@ -87,6 +88,8 @@ class PipelineTransformerTests {

private PluginErrorCollector pluginErrorCollector;

private PluginErrorsConsolidator pluginErrorsConsolidator;

private EventFactory eventFactory;

private DefaultAcknowledgementSetManager acknowledgementSetManager;
Expand All @@ -99,6 +102,7 @@ void setUp() {
eventFactory = mock(EventFactory.class);
acknowledgementSetManager = mock(DefaultAcknowledgementSetManager.class);
pluginErrorCollector = new PluginErrorCollector();
pluginErrorsConsolidator = new PluginErrorsConsolidator();
final AnnotationConfigApplicationContext publicContext = new AnnotationConfigApplicationContext();
publicContext.refresh();

Expand All @@ -111,6 +115,7 @@ void setUp() {

coreContext.scan(DefaultPluginFactory.class.getPackage().getName());
coreContext.registerBean(PluginErrorCollector.class, () -> pluginErrorCollector);
coreContext.registerBean(PluginErrorsConsolidator.class, () -> pluginErrorsConsolidator);
coreContext.registerBean(DataPrepperConfiguration.class, () -> dataPrepperConfiguration);
coreContext.registerBean(PipelinesDataFlowModel.class, () -> pipelinesDataFlowModel);
coreContext.refresh();
Expand All @@ -126,9 +131,11 @@ private PipelineTransformer createObjectUnderTest(final String pipelineConfigura

final PipelinesDataFlowModel pipelinesDataFlowModel = new PipelinesDataflowModelParser(
new PipelineConfigurationFileReader(pipelineConfigurationFileLocation)).parseConfiguration();
return new PipelineTransformer(pipelinesDataFlowModel, pluginFactory, peerForwarderProvider,
routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory,
acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector);
return new PipelineTransformer(
pipelinesDataFlowModel, pluginFactory, peerForwarderProvider,
routerFactory, dataPrepperConfiguration, circuitBreakerManager, eventFactory,
acknowledgementSetManager, sourceCoordinatorFactory, pluginErrorCollector,
pluginErrorsConsolidator);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -59,12 +60,15 @@ class PipelineParserConfigurationTest {
@Mock
private PluginErrorCollector pluginErrorCollector;

@Mock
private PluginErrorsConsolidator pluginErrorsConsolidator;

@Test
void pipelineParser() {
final PipelineTransformer pipelineTransformer = pipelineParserConfiguration.pipelineParser(
pipelinesDataFlowModel, pluginFactory, peerForwarderProvider, routerFactory,
dataPrepperConfiguration, circuitBreakerManager, eventFactory, acknowledgementSetManager,
sourceCoordinatorFactory, pluginErrorCollector);
sourceCoordinatorFactory, pluginErrorCollector, pluginErrorsConsolidator);

assertThat(pipelineTransformer, is(notNullValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import javax.inject.Named;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Named
@Getter
Expand All @@ -16,16 +14,4 @@ public class PluginErrorCollector {
public void collectPluginError(final PluginError pluginError) {
pluginErrors.add(pluginError);
}

public List<String> getAllErrorMessages() {
return pluginErrors.stream().map(PluginError::getErrorMessage).collect(Collectors.toList());
}

public String getConsolidatedErrorMessage() {
final List<String> allErrorMessages = getAllErrorMessages();

return IntStream.range(0, allErrorMessages.size())
.mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i))
.collect(Collectors.joining("\n"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.opensearch.dataprepper.validation;

import javax.inject.Named;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Named
public class PluginErrorsConsolidator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think rather than make this a consolidator that creates a string, a more flexible approach would be to make a PluginErrorsHandler. Let it decide how to handle multiple errors. Maybe it handles it by creating a structured object. Maybe it handles it by logging. Maybe by stdout.

public interface PluginErrorsHandler {
  public void handleErrors(final Collection<PluginError> pluginErrors);
}

And then we can have a default which logs:

@Named
public class LoggingPluginErrorsHandler implements PluginErrorsHandler {
  public void handleErrors(final Collection<PluginError> pluginErrors) {
    // build string
    // log to Slf4j
  }
}

public String consolidatedErrorMessage(final List<PluginError> pluginErrors) {
final List<String> allErrorMessages = pluginErrors.stream()
.map(PluginError::getErrorMessage)
.collect(Collectors.toList());
return IntStream.range(0, allErrorMessages.size())
.mapToObj(i -> (i + 1) + ". " + allErrorMessages.get(i))
.collect(Collectors.joining("\n"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,17 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PluginErrorCollectorTest {

@Test
void testWithPluginErrors() {
final PluginErrorCollector objectUnderTest = new PluginErrorCollector();
final String testErrorMessage1 = "test error message 1";
final String testErrorMessage2 = "test error message 2";
final PluginError pluginError1 = mock(PluginError.class);
when(pluginError1.getErrorMessage()).thenReturn(testErrorMessage1);
final PluginError pluginError2 = mock(PluginError.class);
when(pluginError2.getErrorMessage()).thenReturn(testErrorMessage2);
objectUnderTest.collectPluginError(pluginError1);
objectUnderTest.collectPluginError(pluginError2);
assertThat(objectUnderTest.getPluginErrors().size(), equalTo(2));
assertThat(objectUnderTest.getAllErrorMessages().size(), equalTo(2));
assertThat(objectUnderTest.getAllErrorMessages(), contains(testErrorMessage1, testErrorMessage2));
assertThat(objectUnderTest.getConsolidatedErrorMessage(), equalTo(
String.format("1. %s\n2. %s", testErrorMessage1, testErrorMessage2)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.dataprepper.validation;

import org.junit.jupiter.api.Test;

import java.util.List;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PluginErrorsConsolidatorTest {

@Test
void testWithPluginErrors() {
final PluginErrorsConsolidator objectUnderTest = new PluginErrorsConsolidator();
final String testErrorMessage1 = "test error message 1";
final String testErrorMessage2 = "test error message 2";
final PluginError pluginError1 = mock(PluginError.class);
when(pluginError1.getErrorMessage()).thenReturn(testErrorMessage1);
final PluginError pluginError2 = mock(PluginError.class);
when(pluginError2.getErrorMessage()).thenReturn(testErrorMessage2);
final String consolidatedErrorMessage = objectUnderTest.consolidatedErrorMessage(
List.of(pluginError1, pluginError2));
assertThat(consolidatedErrorMessage, equalTo(
String.format("1. %s\n2. %s", testErrorMessage1, testErrorMessage2)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.plugin.InvalidPluginDefinitionException;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;

import javax.inject.Inject;
import javax.inject.Named;
Expand All @@ -26,17 +27,20 @@ public class ExtensionLoader {
private final ExtensionClassProvider extensionClassProvider;
private final PluginCreator extensionPluginCreator;
private final PluginErrorCollector pluginErrorCollector;
private final PluginErrorsConsolidator pluginErrorsConsolidator;

@Inject
ExtensionLoader(
final ExtensionPluginConfigurationConverter extensionPluginConfigurationConverter,
final ExtensionClassProvider extensionClassProvider,
@Named("extensionPluginCreator") final PluginCreator extensionPluginCreator,
final PluginErrorCollector pluginErrorCollector) {
final PluginErrorCollector pluginErrorCollector,
final PluginErrorsConsolidator pluginErrorsConsolidator) {
this.extensionPluginConfigurationConverter = extensionPluginConfigurationConverter;
this.extensionClassProvider = extensionClassProvider;
this.extensionPluginCreator = extensionPluginCreator;
this.pluginErrorCollector = pluginErrorCollector;
this.pluginErrorsConsolidator = pluginErrorsConsolidator;
}

public List<? extends ExtensionPlugin> loadExtensions() {
Expand All @@ -59,10 +63,14 @@ public List<? extends ExtensionPlugin> loadExtensions() {
}
})
.collect(Collectors.toList());
if (!pluginErrorCollector.getPluginErrors().isEmpty()) {
final List<PluginError> extensionPluginErrors = pluginErrorCollector.getPluginErrors()
.stream().filter(pluginError -> PipelinesDataFlowModel.EXTENSION_PLUGIN_TYPE
.equals(pluginError.getComponentType()))
.collect(Collectors.toList());
if (!extensionPluginErrors.isEmpty()) {
Comment on lines +66 to +70
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have one of the unit tests cover this filtering? to verify that if there are pluginErrors with type other than "extension", they will be filtered out.

throw new InvalidPluginConfigurationException(
"One or more extension plugins are not configured correctly.\n"
+ pluginErrorCollector.getConsolidatedErrorMessage());
+ pluginErrorsConsolidator.consolidatedErrorMessage(extensionPluginErrors));
} else {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.dataprepper.plugins.test.TestExtensionWithConfig;
import org.opensearch.dataprepper.validation.PluginError;
import org.opensearch.dataprepper.validation.PluginErrorCollector;
import org.opensearch.dataprepper.validation.PluginErrorsConsolidator;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -60,15 +61,17 @@ class ExtensionLoaderTest {
@Captor
private ArgumentCaptor<PluginArgumentsContext> pluginArgumentsContextArgumentCaptor;
private PluginErrorCollector pluginErrorCollector;
private PluginErrorsConsolidator pluginErrorsConsolidator;

private ExtensionLoader createObjectUnderTest() {
return new ExtensionLoader(extensionPluginConfigurationConverter, extensionClassProvider,
extensionPluginCreator, pluginErrorCollector);
extensionPluginCreator, pluginErrorCollector, pluginErrorsConsolidator);
}

@BeforeEach
void setUp() {
pluginErrorCollector = new PluginErrorCollector();
pluginErrorsConsolidator = new PluginErrorsConsolidator();
}

@Test
Expand Down
Loading