Skip to content
This repository has been archived by the owner on Nov 4, 2024. It is now read-only.

Commit

Permalink
Unit tests completed for the Kafka plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Jade Carino <carino_jade@yahoo.co.uk>
  • Loading branch information
jadecarino committed Jun 16, 2024
1 parent 6c5e50f commit 286eb1a
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 34 deletions.
4 changes: 2 additions & 2 deletions galasa-extensions-parent/dev.galasa.events.kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ plugins {

description = 'Galasa Events Plug-In - Kafka'

version = '0.34.0'
version = '0.35.0'

dependencies {
implementation 'dev.galasa:kafka.clients:3.7.0'
implementation 'dev.galasa:dev.galasa.framework:0.34.0'
implementation 'dev.galasa:dev.galasa.framework:0.35.0'

testImplementation(project(':dev.galasa.extensions.mocks'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.IConfigurationPropertyStoreService;
import dev.galasa.framework.spi.IEventProducer;

public interface IEventProducerFactory {

KafkaEventProducer createProducer(Properties properties, String topic) throws EventsException;
IEventProducer createProducer(Properties properties, String topic) throws EventsException;

Properties createProducerConfig(IConfigurationPropertyStoreService cps, String topic) throws KafkaException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@

import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.IConfigurationPropertyStoreService;
import dev.galasa.framework.spi.IEventProducer;
import dev.galasa.framework.spi.IEventsService;
import dev.galasa.framework.spi.events.IEvent;

import java.util.Map;
import java.util.HashMap;
import java.util.Properties;

public class KafkaEventsService implements IEventsService {
Expand All @@ -21,8 +23,8 @@ public class KafkaEventsService implements IEventsService {

// The EventProducers are cached so they can be reused for performance
// Keyed on the name of the topic as one EventProducer is made for each topic
// Note: Protected so unit tests can access this directly.
protected Map<String, KafkaEventProducer> producers;
// Note: Private but getter method is so unit tests can access this.
private Map<String, IEventProducer> producers = new HashMap<String, IEventProducer>();

public KafkaEventsService(IConfigurationPropertyStoreService cps, IEventProducerFactory producerFactory) {
this.cps = cps;
Expand All @@ -36,9 +38,7 @@ public void produceEvent(String topic, IEvent event) throws EventsException {
throw new KafkaException("Topic is empty");
}

KafkaEventProducer producer;

producer = producers.get(topic);
IEventProducer producer = producers.get(topic);

if (producer == null) {

Expand All @@ -63,9 +63,14 @@ public void produceEvent(String topic, IEvent event) throws EventsException {
@Override
public void shutdown() {
// Shut down all cached EventProducers
for (Map.Entry<String, KafkaEventProducer> entry : producers.entrySet()) {
for (Map.Entry<String, IEventProducer> entry : producers.entrySet()) {
entry.getValue().close();
}
producers.clear();
}

public Map<String, IEventProducer> getProducers() {
return producers;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,169 @@
*/
package dev.galasa.events.kafka;

import static org.assertj.core.api.Assertions.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Test;

import dev.galasa.events.kafka.internal.KafkaEventsService;
import dev.galasa.events.kafka.internal.KafkaException;
import dev.galasa.events.kafka.mocks.MockEventProducer;
import dev.galasa.events.kafka.mocks.MockEventProducerFactory;
import dev.galasa.extensions.mocks.MockEnvironment;
import dev.galasa.extensions.mocks.cps.MockConfigurationPropertyStoreService;
import dev.galasa.extensions.mocks.events.MockEvent;
import dev.galasa.framework.spi.ConfigurationPropertyStoreException;
import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.events.IEvent;

public class TestKafkaEventsService {

private KafkaEventsService createKafkaEventsService() {
Map<String,String> props = new HashMap<String,String>();
props.put("bootstrap.servers", "broker1,broker2");
MockConfigurationPropertyStoreService mockCps = new MockConfigurationPropertyStoreService(props);
MockEnvironment mockEnv = new MockEnvironment();
MockEventProducerFactory mockFactory = new MockEventProducerFactory(mockEnv);
KafkaEventsService kafkaEventsService = new KafkaEventsService(mockCps, mockFactory);
return kafkaEventsService;
}

@Test
public void TestCanCreateAKafkaEventsService() throws ConfigurationPropertyStoreException {
createKafkaEventsService();
}

@Test
public void TestCanProduceAnEventWithValidTopicAndValidEvent() throws Exception {
// Given...
Map<String,String> props = new HashMap<String,String>();
props.put("bootstrap.servers", "broker1,broker2");
KafkaEventsService kafkaEventsService = createKafkaEventsService();

MockConfigurationPropertyStoreService mockCps = new MockConfigurationPropertyStoreService(props);
String topic = "Topic.MyTopic";
MockEvent mockEvent = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!");

MockEnvironment mockEnv = new MockEnvironment();
int numProducersBefore = kafkaEventsService.getProducers().size();
assertThat(numProducersBefore).isEqualTo(0);

MockEventProducerFactory mockFactory = new MockEventProducerFactory(mockEnv);
// When...
kafkaEventsService.produceEvent(topic, mockEvent);

// Then...
new KafkaEventsService(mockCps, mockFactory);
int numProducersAfter = kafkaEventsService.getProducers().size();
assertThat(numProducersAfter).isEqualTo(1);

MockEventProducer mockProducer = (MockEventProducer) kafkaEventsService.getProducers().get(topic);
assertThat(mockProducer).isNotNull();

List<IEvent> events = mockProducer.getEvents();
assertThat(events).contains(mockEvent);
}

@Test
public void TestCanProduceAnEventWithValidTopicAndValidEvent() throws Exception {
public void TestProduceTwoEventsWithSameTopicUsesCachedProducer() throws EventsException {
// Given...
Map<String,String> props = new HashMap<String,String>();
props.put("bootstrap.servers", "broker1,broker2");
KafkaEventsService kafkaEventsService = createKafkaEventsService();

MockConfigurationPropertyStoreService mockCps = new MockConfigurationPropertyStoreService(props);
String topic = "Topic.MyTopic";
MockEvent mockEvent1 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!");
MockEvent mockEvent2 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is another mock event!");

MockEnvironment mockEnv = new MockEnvironment();
int numProducersBefore = kafkaEventsService.getProducers().size();
assertThat(numProducersBefore).isEqualTo(0);

MockEventProducerFactory mockFactory = new MockEventProducerFactory(mockEnv);
// When...
kafkaEventsService.produceEvent(topic, mockEvent1);

KafkaEventsService kafkaEventsService = new KafkaEventsService(mockCps, mockFactory);
// Then...
int numProducersAfterEvent1 = kafkaEventsService.getProducers().size();
assertThat(numProducersAfterEvent1).isEqualTo(1);

// When...
kafkaEventsService.produceEvent(null, null);
kafkaEventsService.produceEvent(topic, mockEvent2);

// Then...
int numProducersAfterEvent2 = kafkaEventsService.getProducers().size();
assertThat(numProducersAfterEvent2).isEqualTo(1);
}

@Test
public void TestProduceTwoEventsWithDifferentTopicsUseDifferentProducers() throws EventsException {
// Given...
KafkaEventsService kafkaEventsService = createKafkaEventsService();

String topic1 = "Topic.MyTopic1";
MockEvent mockEvent1 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event about one topic!");

String topic2 = "Topic.MyTopic2";
MockEvent mockEvent2 = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event about a different topic!");

int numProducersBefore = kafkaEventsService.getProducers().size();
assertThat(numProducersBefore).isEqualTo(0);

// When...
kafkaEventsService.produceEvent(topic1, mockEvent1);

// Then...
int numProducersAfterEvent1 = kafkaEventsService.getProducers().size();
assertThat(numProducersAfterEvent1).isEqualTo(1);

// When...
kafkaEventsService.produceEvent(topic2, mockEvent2);

// Then...
int numProducersAfterEvent2 = kafkaEventsService.getProducers().size();
assertThat(numProducersAfterEvent2).isEqualTo(2);
}

@Test
public void TestProduceEventWithInvalidTopicAndInvalidEventReturnsError() {
public void TestProduceEventWithEmptyTopicReturnsError() throws EventsException {
// Given...
KafkaEventsService kafkaEventsService = createKafkaEventsService();

String topic = "";
MockEvent mockEvent = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!");

int numProducersBefore = kafkaEventsService.getProducers().size();
assertThat(numProducersBefore).isEqualTo(0);

// When...
KafkaException thrown = catchThrowableOfType(() -> kafkaEventsService.produceEvent(topic, mockEvent), KafkaException.class);

// Then...
assertThat(thrown).isNotNull();
assertThat(thrown.getMessage()).contains("Topic is empty");

int numProducersAfter = kafkaEventsService.getProducers().size();
assertThat(numProducersAfter).isEqualTo(0);
}

@Test
public void TestCanShutdown() {
public void TestCanShutdown() throws EventsException {
// Given...
KafkaEventsService kafkaEventsService = createKafkaEventsService();

String topic = "Topic.MyTopic";
MockEvent mockEvent = new MockEvent("2024-06-16T12:49:01.921998Z", "This is a mock event!");

int numProducersBefore = kafkaEventsService.getProducers().size();
assertThat(numProducersBefore).isEqualTo(0);

// When...
kafkaEventsService.produceEvent(topic, mockEvent);

// Then...
int numProducersAfterEvent = kafkaEventsService.getProducers().size();
assertThat(numProducersAfterEvent).isEqualTo(1);

// When...
kafkaEventsService.shutdown();

// Then...
int numProducersAfterShutdown = kafkaEventsService.getProducers().size();
assertThat(numProducersAfterShutdown).isEqualTo(0);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/
package dev.galasa.events.kafka.mocks;

import java.util.Properties;
import java.util.ArrayList;
import java.util.List;

import dev.galasa.framework.spi.IEventProducer;
import dev.galasa.framework.spi.events.IEvent;

public class MockEventProducer implements IEventProducer {

public List<IEvent> events = new ArrayList<IEvent>();

public MockEventProducer(Properties properties, String topic) {
}

@Override
public void sendEvent(IEvent event) {
events.add(event);
}

@Override
public void close() {
}

public List<IEvent> getEvents() {
return events;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.Properties;

import dev.galasa.events.kafka.internal.IEventProducerFactory;
import dev.galasa.events.kafka.internal.KafkaEventProducer;
import dev.galasa.events.kafka.internal.KafkaException;
import dev.galasa.extensions.mocks.MockEnvironment;
import dev.galasa.framework.spi.EventsException;
Expand All @@ -23,15 +22,17 @@ public MockEventProducerFactory(MockEnvironment env) {
}

@Override
public KafkaEventProducer createProducer(Properties properties, String topic) throws EventsException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'createProducer'");
public MockEventProducer createProducer(Properties properties, String topic) throws EventsException {
MockEventProducer producer = new MockEventProducer(properties, topic);
return producer;
}

@Override
public Properties createProducerConfig(IConfigurationPropertyStoreService cps, String topic) throws KafkaException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'createProducerConfig'");
Properties properties = new Properties();
properties.put("topic", topic);
return properties;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
import java.net.URL;
import java.util.Properties;
import java.util.Random;
import java.util.Map;
import java.util.HashMap;

import javax.validation.constraints.NotNull;

import dev.galasa.extensions.mocks.cps.MockConfigurationPropertyStoreService;
import dev.galasa.framework.spi.Api;
import dev.galasa.framework.spi.ConfigurationPropertyStoreException;
import dev.galasa.framework.spi.DynamicStatusStoreException;
Expand Down Expand Up @@ -46,7 +49,8 @@ public boolean isInitialised() {
@Override
public @NotNull IConfigurationPropertyStoreService getConfigurationPropertyService(@NotNull String namespace)
throws ConfigurationPropertyStoreException {
throw new UnsupportedOperationException("Unimplemented method 'getConfigurationPropertyService'");
Map<String,String> props = new HashMap<String,String>();
return new MockConfigurationPropertyStoreService(props);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void registerCredentialsStore(@NotNull ICredentialsStore credentialsStore

@Override
public @NotNull IFramework getFramework() {
return this.framework;
return new MockFramework();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ framework:
codecoverage: true

- artifact: dev.galasa.events.kafka
version: 0.34.0
version: 0.35.0
obr: true
isolated: true
codecoverage: true

0 comments on commit 286eb1a

Please sign in to comment.