Skip to content
This repository has been archived by the owner on Nov 4, 2024. It is now read-only.

Iss1768 - Kafka Events Plugin (new Galasa Extension) #235

Merged
merged 14 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions build-locally.sh
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ function clean_maven_repo {
rm -fr ~/.m2/repository/dev/galasa/dev.galasa.cps.etcd
rm -fr ~/.m2/repository/dev/galasa/dev.galasa.raw.couchdb
rm -fr ~/.m2/repository/dev/galasa/dev.galasa.cps.rest
rm -fr ~/.m2/repository/dev/galasa/dev.galasa.events.kafka
success "OK"
}

Expand Down Expand Up @@ -207,6 +208,19 @@ function displayCouchDbCodeCoverage {
info "See html report here: file://${BASEDIR}/galasa-extensions-parent/dev.galasa.ras.couchdb/build/jacocoHtml/index.html"
}

function displayKafkaCodeCoverage {
h2 "Calculating Kafka code coverage..."
percent_code_complete=$(cat ${BASEDIR}/galasa-extensions-parent/dev.galasa.events.kafka/build/jacocoHtml/dev.galasa.events.kafka.internal/index.html \
| sed "s/.*<td>Total<\/td>//1" \
| cut -f1 -d'%' \
| sed "s/.*>//g")
info
info
info "Statement code coverage is ${percent_code_complete}%"
info
info "See html report here: file://${BASEDIR}/galasa-extensions-parent/dev.galasa.events.kafka/build/jacocoHtml/index.html"
}

function check_secrets {
h2 "updating secrets baseline"
cd ${BASEDIR}
Expand Down Expand Up @@ -234,6 +248,7 @@ function check_secrets {
clean_maven_repo
build_with_gradle
displayCouchDbCodeCoverage
displayKafkaCodeCoverage
check_secrets

success "Project ${project} built - OK - log is at ${log_file}"
7 changes: 7 additions & 0 deletions galasa-extensions-parent/dev.galasa.events.kafka/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-snapshot ${tstamp}
Bundle-Name: dev.galasa.events.kafka
Bundle-Description: Provides an events service from a Kafka cluster
Bundle-License: https://www.eclipse.org/legal/epl-2.0
Export-Package: dev.galasa.events.kafka*
Import-Package: \
dev.galasa.framework.spi,
24 changes: 24 additions & 0 deletions galasa-extensions-parent/dev.galasa.events.kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
plugins {
id 'biz.aQute.bnd.builder'
id 'galasa.extensions'
id 'jacoco'
}

description = 'Galasa Events Plug-In - Kafka'

version = '0.35.0'

dependencies {
implementation 'dev.galasa:kafka.clients:3.7.0'
implementation 'dev.galasa:dev.galasa.framework:0.35.0'

testImplementation(project(':dev.galasa.extensions.mocks'))
}

jacocoTestReport {
reports {
xml.required = true
csv.required = true
html.outputLocation = layout.buildDirectory.dir('jacocoHtml')
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
rootProject.name = "dev.galasa.events.kafka"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/
package dev.galasa.events.kafka.internal;

import java.util.Properties;

import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.IConfigurationPropertyStoreService;
import dev.galasa.framework.spi.IEventProducer;

public interface IEventProducerFactory {

IEventProducer createProducer(Properties properties, String topic) throws EventsException;

Properties createProducerConfig(IConfigurationPropertyStoreService cps, String topic) throws KafkaException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/

package dev.galasa.events.kafka.internal;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import dev.galasa.framework.spi.IEventProducer;
import dev.galasa.framework.spi.events.IEvent;

public class KafkaEventProducer implements IEventProducer {

private final KafkaProducer<String, String> producer;
private final String topic;

public KafkaEventProducer(Properties properties, String topic) {

Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
this.producer = producer;

this.topic = topic;
}

public void sendEvent(IEvent event){
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>(topic, event.toString()));
producer.commitTransaction();
}

public void close(){
producer.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/
package dev.galasa.events.kafka.internal;

import java.util.Properties;

import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.serialization.StringSerializer;

import dev.galasa.framework.spi.ConfigurationPropertyStoreException;
import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.IConfigurationPropertyStoreService;
import dev.galasa.framework.spi.SystemEnvironment;

public class KafkaEventProducerFactory implements IEventProducerFactory {

private final String TOKEN = "GALASA_EVENT_STREAMS_TOKEN";

private SystemEnvironment env;

public KafkaEventProducerFactory(SystemEnvironment env) {
this.env = env;
}

public KafkaEventProducer createProducer(Properties properties, String topic) throws EventsException {
KafkaEventProducer eventProducer = new KafkaEventProducer(properties, topic);
return eventProducer;
}

public Properties createProducerConfig(IConfigurationPropertyStoreService cps, String topic) throws KafkaException {
Properties properties = new Properties();

try {
String bootstrapServers = cps.getProperty("bootstrap", "servers");

properties.put("bootstrap.servers", bootstrapServers);
properties.put("topic", topic);
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
properties.put("sasl.jaas.config", PlainLoginModule.class.getName() + " required username=\"token\" password=\"" + this.env.getenv(TOKEN) + "\";");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("ssl.protocol", "TLSv1.2");
properties.put("ssl.enabled.protocols", "TLSv1.2");
properties.put("ssl.endpoint.identification.algorithm", "HTTPS");
properties.put("transactional.id", "transactional-id");

} catch (ConfigurationPropertyStoreException e) {
throw new KafkaException("Unable to retrieve Kafka properties from the CPS", e);
}

return properties;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/
package dev.galasa.events.kafka.internal;

import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.IConfigurationPropertyStoreService;
import dev.galasa.framework.spi.IEventProducer;
import dev.galasa.framework.spi.IEventsService;
import dev.galasa.framework.spi.events.IEvent;

import java.util.Map;
import java.util.HashMap;
import java.util.Properties;

public class KafkaEventsService implements IEventsService {

private IConfigurationPropertyStoreService cps;

private IEventProducerFactory producerFactory;

// The EventProducers are cached so they can be reused for performance
// Keyed on the name of the topic as one EventProducer is made for each topic
// Note: Private but getter method is so unit tests can access this.
private Map<String, IEventProducer> producers = new HashMap<String, IEventProducer>();

public KafkaEventsService(IConfigurationPropertyStoreService cps, IEventProducerFactory producerFactory) {
this.cps = cps;
this.producerFactory = producerFactory;
}

@Override
public void produceEvent(String topic, IEvent event) throws EventsException {

if (topic == null || topic.isEmpty()) {
throw new KafkaException("Topic is empty");
}

IEventProducer producer = producers.get(topic);

if (producer == null) {

synchronized (producers) {

producer = producers.get(topic);

if (producer == null) {

Properties properties = this.producerFactory.createProducerConfig(cps, topic);

producer = this.producerFactory.createProducer(properties, topic);
producers.put(topic, producer);
}
}

}

producer.sendEvent(event);
}

@Override
public void shutdown() {
// Shut down all cached EventProducers
for (Map.Entry<String, IEventProducer> entry : producers.entrySet()) {
entry.getValue().close();
}
producers.clear();
}

public Map<String, IEventProducer> getProducers() {
return producers;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/
package dev.galasa.events.kafka.internal;

import javax.validation.constraints.NotNull;

import org.osgi.service.component.annotations.Component;

import dev.galasa.framework.spi.EventsException;
import dev.galasa.framework.spi.IConfigurationPropertyStoreService;
import dev.galasa.framework.spi.IEventsServiceRegistration;
import dev.galasa.framework.spi.IFramework;
import dev.galasa.framework.spi.IFrameworkInitialisation;
import dev.galasa.framework.spi.SystemEnvironment;

@Component(service = { IEventsServiceRegistration.class })
public class KafkaEventsServiceRegistration implements IEventsServiceRegistration {

private final String NAMESPACE = "kafka";

@Override
public void initialise(@NotNull IFrameworkInitialisation frameworkInitialisation)
throws EventsException {

try {

IFramework framework = frameworkInitialisation.getFramework();

SystemEnvironment env = new SystemEnvironment();
KafkaEventProducerFactory producerFactory = new KafkaEventProducerFactory(env);
IConfigurationPropertyStoreService cpsService = framework.getConfigurationPropertyService(NAMESPACE);

frameworkInitialisation.registerEventsService(new KafkaEventsService(cpsService, producerFactory));

} catch (Exception e) {
throw new KafkaException("Unable to register the Kafka Events Service", e);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright contributors to the Galasa project
*
* SPDX-License-Identifier: EPL-2.0
*/

package dev.galasa.events.kafka.internal;

import dev.galasa.framework.spi.EventsException;

public class KafkaException extends EventsException {

private static final long serialVersionUID = 1L;

/**
* {@inheritDoc}
*/
public KafkaException() {
super();
}

/**
* {@inheritDoc}
*/
public KafkaException(String message) {
super(message);
}

/**
* {@inheritDoc}
*/
public KafkaException(Throwable cause) {
super(cause);
}

/**
* {@inheritDoc}
*/
public KafkaException(String message, Throwable cause) {
super(message, cause);
}

}
Loading