diff --git a/README.md b/README.md
index 11131a9..89c162a 100644
--- a/README.md
+++ b/README.md
@@ -18,29 +18,38 @@ https://github.com/fauxauldrich/kafka-service/wiki
## Running the application locally
**Run as a Docker container**
+
Fastest way to get started is by using the image available on [Docker Hub](https://hub.docker.com/r/shubhendumadhukar/kafka-service):
- Use the `docker-compose.yml` to bring up your container. (Modify `docker-compose.yml` to reflect the dir path for your truststore files under volumes and update environment variables accordingly)
- Or alternatively, to build image locally use Dockerfile provided.
-- Build with : `docker build -t kafka-service:${VERSION} .`
-- Run as a container using: `docker run -d -env spring.kafka.bootstrap-servers=127.0.0.1:9092 -env kafkaservice.truststore.location=/home/truststore.jks -env kafkaservice.truststore.password=Password@123 --name kafka-service -p 8080:8080 -v /dir/containing/truststore.jks/files:/app/certs kafka-service:${VERSION}`
-
-There are several other ways to run a Spring Boot application on your local machine. One way is to execute the `main` method in the `com.fauxauldrich.kafkaservice.KafkaServiceApplication` class from your IDE.
+ - Build with : `docker build -t kafka-service:${VERSION} .`
+ - Run as a container using:
+ ````shell
+ docker run -d -env spring.kafka.bootstrap-servers=127.0.0.1:9092 -env kafkaservice.truststore.location=/app/certs/truststore.jks -env kafkaservice.truststore.password=Password@123 --name kafka-service -p 8080:8080 -v /dir/containing/truststore.jks/files:/app/certs kafka-service:${VERSION}```
+ ````
-Alternatively you can use the [Spring Boot Maven plugin](https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) like so:
+**Run from your IDE**
-```shell
-mvn spring-boot:run
-```
+- Execute the `main` method in the `com.fauxauldrich.kafkaservice.KafkaServiceApplication` class from your IDE.
+- Or you can use the [Spring Boot Maven plugin](https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) like so:
+ ```shell
+ mvn spring-boot:run
+ ```
-You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally:
+**Run as a JAR**
-- `java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}.jar`
+- You can also download the JAR file from [Releases](https://github.com/fauxauldrich/kafka-service/releases) and run it locally:
+ ```shell
+ java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar kafka-service-${VERSION}-java11.jar
+ ```
+- Or, you can build locally and run the jar file
-Or, you can build locally and run the jar file
+ ```shell
+ ./mvnw clean install package -f pom.xml
-- `./mvnw clean install package -f pom.xml`
-- `java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}.jar`
+ java -Dspring.kafka.bootstrap-servers=127.0.0.1:9092 -Dkafkaservice.truststore.location=/home/truststore.jks -Dkafkaservice.truststore.password=Password -jar target/kafka-service-${VERSION}-java11.jar
+ ```
## Copyright
diff --git a/pom.xml b/pom.xml
index 4418fc4..99f7197 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
com.fauxauldrich
kafka-service
- 0.0.1
+ 0.0.1-java11
jar
kafka-service
- Demo project for Spring Boot
+ HTTP Service for Testing Kafka Cluster
11
@@ -54,6 +54,16 @@
guava
29.0-jre
+
+ io.micrometer
+ micrometer-core
+ 1.6.0
+
+
+ io.micrometer
+ micrometer-registry-prometheus
+ 1.6.0
+
diff --git a/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java b/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java
index 38ad6b4..d70abfa 100644
--- a/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java
+++ b/src/main/java/com/fauxauldrich/kafkaservice/resource/DynamicProducerResource.java
@@ -15,6 +15,8 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import io.micrometer.core.annotation.Timed;
+
@RestController
@RequestMapping("/kafka/dynamic")
public class DynamicProducerResource {
@@ -26,54 +28,62 @@ public class DynamicProducerResource {
private Handlebars handebarsConfig;
@PostMapping("/publish")
+ @Timed(description = "publish_dynamic")
public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModel payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
- producerService.publish(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID);
+ producerService.publish(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""), PARTITION_ID);
return "Published successfully";
}
@PostMapping("/publish/with-headers")
+ @Timed(description = "publish_dynamic_with_headers")
public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModelWithHeaders payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
- producerService.publishWithHeaders(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID,
- payload.getHeaders());
+ producerService.publishWithHeaders(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""),
+ PARTITION_ID, payload.getHeaders());
return "Published successfully";
}
@PostMapping("/publish/secure")
+ @Timed(description = "publish_dynamic_secure")
public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam() String TRUSTSTORE_LOCATION,
@RequestParam() String TRUSTSTORE_PASSWORD, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModel payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
- producerService.publishSecure(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID,
- TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD);
+ producerService.publishSecure(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""),
+ PARTITION_ID, TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD);
return "Published successfully";
}
@PostMapping("/publish/secure/with-headers")
+ @Timed(description = "publish_dynamic_secure_with_headers")
public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam() String TRUSTSTORE_LOCATION,
@RequestParam() String TRUSTSTORE_PASSWORD, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody DynamicProducerModelWithHeaders payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
- producerService.publishSecureWithHeaders(payload.getBrokers(), TOPIC, KEY, template.apply(""), PARTITION_ID,
- TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD, payload.getHeaders());
+ producerService.publishSecureWithHeaders(payload.getBrokers(), TOPIC, keyTemplate.apply(""), template.apply(""),
+ PARTITION_ID, TRUSTSTORE_LOCATION, TRUSTSTORE_PASSWORD, payload.getHeaders());
return "Published successfully";
}
diff --git a/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java b/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java
index 5cf94f2..99b918b 100644
--- a/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java
+++ b/src/main/java/com/fauxauldrich/kafkaservice/resource/ProducerResource.java
@@ -18,6 +18,8 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import io.micrometer.core.annotation.Timed;
+
@RestController
@RequestMapping("/kafka")
public class ProducerResource {
@@ -31,15 +33,17 @@ public class ProducerResource {
private Handlebars handebarsConfig;
@PostMapping("/publish")
+ @Timed(description = "publish_simple")
public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModel payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
if (KEY != null) {
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
- kafkaTemplate.send(TOPIC, PARTITION_ID, KEY, template.apply(""));
+ kafkaTemplate.send(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
}
- kafkaTemplate.send(TOPIC, KEY, template.apply(""));
+ kafkaTemplate.send(TOPIC, keyTemplate.apply(""), template.apply(""));
} else {
kafkaTemplate.send(TOPIC, template.apply(""));
}
@@ -47,6 +51,7 @@ public String postSimple(@RequestParam("topic") String TOPIC, @RequestParam(requ
}
@PostMapping("/publish/with-headers")
+ @Timed(description = "publish_with_headers")
public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModelWithHeaders payload)
throws IOException {
@@ -55,17 +60,20 @@ public String postSimpleWithHeaders(@RequestParam("topic") String TOPIC, @Reques
ProducerRecord record;
if (KEY != null) {
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
- record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
+ record = new ProducerRecord<>(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
} else {
- record = new ProducerRecord<>(TOPIC, KEY, template.apply(""));
+ record = new ProducerRecord<>(TOPIC, keyTemplate.apply(""), template.apply(""));
}
} else {
- record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
+ record = new ProducerRecord<>(TOPIC, template.apply(""));
}
- for (Map.Entry entry : payload.getHeaders().entrySet())
- record.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes()));
+ for (Map.Entry entry : payload.getHeaders().entrySet()) {
+ Template headerTemplate = handebarsConfig.compileInline(entry.getValue());
+ record.headers().add(new RecordHeader(entry.getKey(), (headerTemplate.apply("")).getBytes()));
+ }
kafkaTemplate.send(record);
@@ -73,15 +81,17 @@ record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
}
@PostMapping("/publish/secure")
+ @Timed(description = "publish_secure")
public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModel payload)
throws IOException {
Template template = handebarsConfig.compileInline(payload.getMessage());
if (KEY != null) {
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
- kafkaSecureTemplate.send(TOPIC, PARTITION_ID, KEY, template.apply(""));
+ kafkaSecureTemplate.send(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
}
- kafkaSecureTemplate.send(TOPIC, KEY, template.apply(""));
+ kafkaSecureTemplate.send(TOPIC, keyTemplate.apply(""), template.apply(""));
} else {
kafkaSecureTemplate.send(TOPIC, template.apply(""));
}
@@ -89,6 +99,7 @@ public String postSecure(@RequestParam("topic") String TOPIC, @RequestParam(requ
}
@PostMapping("/publish/secure/with-headers")
+ @Timed(description = "publish_secure_with_headers")
public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @RequestParam(required = false) String KEY,
@RequestParam(required = false) Integer PARTITION_ID, @RequestBody ProducerModelWithHeaders payload)
throws IOException {
@@ -97,17 +108,20 @@ public String postSecureWithHeaders(@RequestParam("topic") String TOPIC, @Reques
ProducerRecord record;
if (KEY != null) {
+ Template keyTemplate = handebarsConfig.compileInline(KEY);
if (PARTITION_ID != null) {
- record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
+ record = new ProducerRecord<>(TOPIC, PARTITION_ID, keyTemplate.apply(""), template.apply(""));
} else {
- record = new ProducerRecord<>(TOPIC, KEY, template.apply(""));
+ record = new ProducerRecord<>(TOPIC, keyTemplate.apply(""), template.apply(""));
}
} else {
- record = new ProducerRecord<>(TOPIC, PARTITION_ID, KEY, template.apply(""));
+ record = new ProducerRecord<>(TOPIC, template.apply(""));
}
- for (Map.Entry entry : payload.getHeaders().entrySet())
- record.headers().add(new RecordHeader(entry.getKey(), entry.getValue().getBytes()));
+ for (Map.Entry entry : payload.getHeaders().entrySet()) {
+ Template headerTemplate = handebarsConfig.compileInline(entry.getValue());
+ record.headers().add(new RecordHeader(entry.getKey(), (headerTemplate.apply("")).getBytes()));
+ }
kafkaSecureTemplate.send(record);
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index eb78af9..3433e0c 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,5 +1,5 @@
spring.kafka.bootstrap-servers=127.0.0.1:9092
springdoc.swagger-ui.path=/swagger-ui
-management.endpoints.web.exposure.include=health,info,metrics,env
+management.endpoints.web.exposure.include=health,info,metrics,env,prometheus
kafkaservice.truststore.location=/home/truststore.jks
kafkaservice.truststore.password=Password@123