diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/build.gradle b/galasa-extensions-parent/dev.galasa.events.kafka/build.gradle index 0acfdd57..8237ce71 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/build.gradle +++ b/galasa-extensions-parent/dev.galasa.events.kafka/build.gradle @@ -5,11 +5,11 @@ plugins { description = 'Galasa Events Plug-In - Kafka' -version = '0.34.0' +version = '0.35.0' dependencies { implementation 'dev.galasa:kafka.clients:3.7.0' - implementation 'dev.galasa:dev.galasa.framework:0.34.0' + implementation 'dev.galasa:dev.galasa.framework:0.35.0' testImplementation(project(':dev.galasa.extensions.mocks')) } diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/IEventProducerFactory.java b/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/IEventProducerFactory.java index df0ad29d..d14cfa53 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/IEventProducerFactory.java +++ b/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/IEventProducerFactory.java @@ -9,10 +9,11 @@ import dev.galasa.framework.spi.EventsException; import dev.galasa.framework.spi.IConfigurationPropertyStoreService; +import dev.galasa.framework.spi.IEventProducer; public interface IEventProducerFactory { - KafkaEventProducer createProducer(Properties properties, String topic) throws EventsException; + IEventProducer createProducer(Properties properties, String topic) throws EventsException; Properties createProducerConfig(IConfigurationPropertyStoreService cps, String topic) throws KafkaException; diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java b/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java index 5080dd27..35b63c68 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java +++ b/galasa-extensions-parent/dev.galasa.events.kafka/src/main/java/dev/galasa/events/kafka/internal/KafkaEventsService.java @@ -7,10 +7,12 @@ import dev.galasa.framework.spi.EventsException; import dev.galasa.framework.spi.IConfigurationPropertyStoreService; +import dev.galasa.framework.spi.IEventProducer; import dev.galasa.framework.spi.IEventsService; import dev.galasa.framework.spi.events.IEvent; import java.util.Map; +import java.util.HashMap; import java.util.Properties; public class KafkaEventsService implements IEventsService { @@ -21,8 +23,8 @@ public class KafkaEventsService implements IEventsService { // The EventProducers are cached so they can be reused for performance // Keyed on the name of the topic as one EventProducer is made for each topic - // Note: Protected so unit tests can access this directly. - protected Map producers; + // Note: Private but getter method is so unit tests can access this. + private Map producers = new HashMap(); public KafkaEventsService(IConfigurationPropertyStoreService cps, IEventProducerFactory producerFactory) { this.cps = cps; @@ -36,9 +38,7 @@ public void produceEvent(String topic, IEvent event) throws EventsException { throw new KafkaException("Topic is empty"); } - KafkaEventProducer producer; - - producer = producers.get(topic); + IEventProducer producer = producers.get(topic); if (producer == null) { @@ -63,9 +63,14 @@ public void produceEvent(String topic, IEvent event) throws EventsException { @Override public void shutdown() { // Shut down all cached EventProducers - for (Map.Entry entry : producers.entrySet()) { + for (Map.Entry entry : producers.entrySet()) { entry.getValue().close(); } + producers.clear(); + } + + public Map getProducers() { + return producers; } } diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/TestKafkaEventsService.java b/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/TestKafkaEventsService.java index cd9dbcbf..9ed38879 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/TestKafkaEventsService.java +++ b/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/TestKafkaEventsService.java @@ -5,64 +5,169 @@ */ package dev.galasa.events.kafka; +import static org.assertj.core.api.Assertions.*; + import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.Test; import dev.galasa.events.kafka.internal.KafkaEventsService; +import dev.galasa.events.kafka.internal.KafkaException; +import dev.galasa.events.kafka.mocks.MockEventProducer; import dev.galasa.events.kafka.mocks.MockEventProducerFactory; import dev.galasa.extensions.mocks.MockEnvironment; import dev.galasa.extensions.mocks.cps.MockConfigurationPropertyStoreService; +import dev.galasa.extensions.mocks.events.MockEvent; import dev.galasa.framework.spi.ConfigurationPropertyStoreException; +import dev.galasa.framework.spi.EventsException; +import dev.galasa.framework.spi.events.IEvent; public class TestKafkaEventsService { + private KafkaEventsService createKafkaEventsService() { + Map props = new HashMap(); + props.put("bootstrap.servers", "broker1,broker2"); + MockConfigurationPropertyStoreService mockCps = new MockConfigurationPropertyStoreService(props); + MockEnvironment mockEnv = new MockEnvironment(); + MockEventProducerFactory mockFactory = new MockEventProducerFactory(mockEnv); + KafkaEventsService kafkaEventsService = new KafkaEventsService(mockCps, mockFactory); + return kafkaEventsService; + } + @Test public void TestCanCreateAKafkaEventsService() throws ConfigurationPropertyStoreException { + createKafkaEventsService(); + } + + @Test + public void TestCanProduceAnEventWithValidTopicAndValidEvent() throws Exception { // Given... - Map props = new HashMap(); - props.put("bootstrap.servers", "broker1,broker2"); + KafkaEventsService kafkaEventsService = createKafkaEventsService(); - MockConfigurationPropertyStoreService mockCps = new MockConfigurationPropertyStoreService(props); + String topic = "Topic.MyTopic"; + MockEvent mockEvent = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!"); - MockEnvironment mockEnv = new MockEnvironment(); + int numProducersBefore = kafkaEventsService.getProducers().size(); + assertThat(numProducersBefore).isEqualTo(0); - MockEventProducerFactory mockFactory = new MockEventProducerFactory(mockEnv); + // When... + kafkaEventsService.produceEvent(topic, mockEvent); // Then... - new KafkaEventsService(mockCps, mockFactory); + int numProducersAfter = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfter).isEqualTo(1); + + MockEventProducer mockProducer = (MockEventProducer) kafkaEventsService.getProducers().get(topic); + assertThat(mockProducer).isNotNull(); + + List events = mockProducer.getEvents(); + assertThat(events).contains(mockEvent); } @Test - public void TestCanProduceAnEventWithValidTopicAndValidEvent() throws Exception { + public void TestProduceTwoEventsWithSameTopicUsesCachedProducer() throws EventsException { // Given... - Map props = new HashMap(); - props.put("bootstrap.servers", "broker1,broker2"); + KafkaEventsService kafkaEventsService = createKafkaEventsService(); - MockConfigurationPropertyStoreService mockCps = new MockConfigurationPropertyStoreService(props); + String topic = "Topic.MyTopic"; + MockEvent mockEvent1 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!"); + MockEvent mockEvent2 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is another mock event!"); - MockEnvironment mockEnv = new MockEnvironment(); + int numProducersBefore = kafkaEventsService.getProducers().size(); + assertThat(numProducersBefore).isEqualTo(0); - MockEventProducerFactory mockFactory = new MockEventProducerFactory(mockEnv); + // When... + kafkaEventsService.produceEvent(topic, mockEvent1); - KafkaEventsService kafkaEventsService = new KafkaEventsService(mockCps, mockFactory); + // Then... + int numProducersAfterEvent1 = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfterEvent1).isEqualTo(1); // When... - kafkaEventsService.produceEvent(null, null); + kafkaEventsService.produceEvent(topic, mockEvent2); // Then... + int numProducersAfterEvent2 = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfterEvent2).isEqualTo(1); + } + @Test + public void TestProduceTwoEventsWithDifferentTopicsUseDifferentProducers() throws EventsException { + // Given... + KafkaEventsService kafkaEventsService = createKafkaEventsService(); + + String topic1 = "Topic.MyTopic1"; + MockEvent mockEvent1 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event about one topic!"); + + String topic2 = "Topic.MyTopic2"; + MockEvent mockEvent2 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event about a different topic!"); + + int numProducersBefore = kafkaEventsService.getProducers().size(); + assertThat(numProducersBefore).isEqualTo(0); + + // When... + kafkaEventsService.produceEvent(topic1, mockEvent1); + + // Then... + int numProducersAfterEvent1 = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfterEvent1).isEqualTo(1); + + // When... + kafkaEventsService.produceEvent(topic2, mockEvent2); + + // Then... + int numProducersAfterEvent2 = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfterEvent2).isEqualTo(2); } @Test - public void TestProduceEventWithInvalidTopicAndInvalidEventReturnsError() { + public void TestProduceEventWithEmptyTopicReturnsError() throws EventsException { + // Given... + KafkaEventsService kafkaEventsService = createKafkaEventsService(); + + String topic = ""; + MockEvent mockEvent = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!"); + int numProducersBefore = kafkaEventsService.getProducers().size(); + assertThat(numProducersBefore).isEqualTo(0); + + // When... + KafkaException thrown = catchThrowableOfType(() -> kafkaEventsService.produceEvent(topic, mockEvent), KafkaException.class); + + // Then... + assertThat(thrown).isNotNull(); + assertThat(thrown.getMessage()).contains("Topic is empty"); + + int numProducersAfter = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfter).isEqualTo(0); } @Test - public void TestCanShutdown() { + public void TestCanShutdown() throws EventsException { + // Given... + KafkaEventsService kafkaEventsService = createKafkaEventsService(); + + String topic = "Topic.MyTopic"; + MockEvent mockEvent = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!"); + + int numProducersBefore = kafkaEventsService.getProducers().size(); + assertThat(numProducersBefore).isEqualTo(0); + // When... + kafkaEventsService.produceEvent(topic, mockEvent); + + // Then... + int numProducersAfterEvent = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfterEvent).isEqualTo(1); + + // When... + kafkaEventsService.shutdown(); + + // Then... + int numProducersAfterShutdown = kafkaEventsService.getProducers().size(); + assertThat(numProducersAfterShutdown).isEqualTo(0); } } diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducer.java b/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducer.java new file mode 100644 index 00000000..7a7bb826 --- /dev/null +++ b/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducer.java @@ -0,0 +1,35 @@ +/* + * Copyright contributors to the Galasa project + * + * SPDX-License-Identifier: EPL-2.0 + */ +package dev.galasa.events.kafka.mocks; + +import java.util.Properties; +import java.util.ArrayList; +import java.util.List; + +import dev.galasa.framework.spi.IEventProducer; +import dev.galasa.framework.spi.events.IEvent; + +public class MockEventProducer implements IEventProducer { + + public List events = new ArrayList(); + + public MockEventProducer(Properties properties, String topic) { + } + + @Override + public void sendEvent(IEvent event) { + events.add(event); + } + + @Override + public void close() { + } + + public List getEvents() { + return events; + } + +} diff --git a/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducerFactory.java b/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducerFactory.java index 75b42ac3..47e5ba16 100644 --- a/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducerFactory.java +++ b/galasa-extensions-parent/dev.galasa.events.kafka/src/test/java/dev/galasa/events/kafka/mocks/MockEventProducerFactory.java @@ -8,7 +8,6 @@ import java.util.Properties; import dev.galasa.events.kafka.internal.IEventProducerFactory; -import dev.galasa.events.kafka.internal.KafkaEventProducer; import dev.galasa.events.kafka.internal.KafkaException; import dev.galasa.extensions.mocks.MockEnvironment; import dev.galasa.framework.spi.EventsException; @@ -23,15 +22,17 @@ public MockEventProducerFactory(MockEnvironment env) { } @Override - public KafkaEventProducer createProducer(Properties properties, String topic) throws EventsException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'createProducer'"); + public MockEventProducer createProducer(Properties properties, String topic) throws EventsException { + MockEventProducer producer = new MockEventProducer(properties, topic); + return producer; } @Override public Properties createProducerConfig(IConfigurationPropertyStoreService cps, String topic) throws KafkaException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'createProducerConfig'"); + Properties properties = new Properties(); + properties.put("topic", topic); + return properties; + } } diff --git a/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFramework.java b/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFramework.java index 9ab6f0cc..56d67b4c 100644 --- a/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFramework.java +++ b/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFramework.java @@ -8,9 +8,12 @@ import java.net.URL; import java.util.Properties; import java.util.Random; +import java.util.Map; +import java.util.HashMap; import javax.validation.constraints.NotNull; +import dev.galasa.extensions.mocks.cps.MockConfigurationPropertyStoreService; import dev.galasa.framework.spi.Api; import dev.galasa.framework.spi.ConfigurationPropertyStoreException; import dev.galasa.framework.spi.DynamicStatusStoreException; @@ -46,7 +49,8 @@ public boolean isInitialised() { @Override public @NotNull IConfigurationPropertyStoreService getConfigurationPropertyService(@NotNull String namespace) throws ConfigurationPropertyStoreException { - throw new UnsupportedOperationException("Unimplemented method 'getConfigurationPropertyService'"); + Map props = new HashMap(); + return new MockConfigurationPropertyStoreService(props); } @Override diff --git a/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFrameworkInitialisation.java b/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFrameworkInitialisation.java index 23af24a4..bedd2cb5 100644 --- a/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFrameworkInitialisation.java +++ b/galasa-extensions-parent/dev.galasa.extensions.mocks/src/main/java/dev/galasa/extensions/mocks/MockFrameworkInitialisation.java @@ -123,7 +123,7 @@ public void registerCredentialsStore(@NotNull ICredentialsStore credentialsStore @Override public @NotNull IFramework getFramework() { - return this.framework; + return new MockFramework(); } @Override diff --git a/release.yaml b/release.yaml index 41dfe1a8..81647eee 100644 --- a/release.yaml +++ b/release.yaml @@ -44,7 +44,7 @@ framework: codecoverage: true - artifact: dev.galasa.events.kafka - version: 0.34.0 + version: 0.35.0 obr: true isolated: true codecoverage: true \ No newline at end of file