Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exception handling and health actuator implementation #12

Merged
merged 3 commits into from
Sep 22, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
regular kafka Health and test endpoint
SVAdithya committed Sep 22, 2024
commit 7ad7a24e39ca8e88d1eb22bf5c4b3c4d87a848bb
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package com.kafka.learn.kafkastudy.controller;

import com.kafka.learn.kafkastudy.config.KafkaConsumerConfiguration;
import com.kafka.learn.kafkastudy.controller.model.AppService;
import com.kafka.learn.kafkastudy.health.AppHealthIndicator;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@Controller
import java.time.ZonedDateTime;

@RestController
@AllArgsConstructor
public class HealthController {
private static final Logger logger = LoggerFactory.getLogger(HealthController.class);

private AppHealthIndicator appHealthIndicator;
@GetMapping("/health/{service}")
public ResponseBody healthCheck(@PathVariable AppService service){
appHealthIndicator.getHealth(Boolean.TRUE);
return null;
public String healthCheck(@PathVariable AppService service){
logger.info("Start time : {}", ZonedDateTime.now());
String s = appHealthIndicator.health().toString();
logger.info("Health update : {}, {}",ZonedDateTime.now(), s);
return s;
}
}
Original file line number Diff line number Diff line change
@@ -2,7 +2,8 @@

public enum AppService {
MONGO,
KAFKA,
REGULAR_KAFKA,
REACTIVE_KAFKA,
APP,
OTHERS
}
Original file line number Diff line number Diff line change
@@ -1,54 +1,72 @@
package com.kafka.learn.kafkastudy.health;

import com.mongodb.reactivestreams.client.MongoClient;
import com.kafka.learn.kafkastudy.controller.model.AppService;
import lombok.AllArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Optional;

@AllArgsConstructor
@Component
public class AppHealthIndicator implements HealthIndicator {
private final MongoClient mongoClient;
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
//private final AdminClient kafkaAdminClient;


@Override
public Health getHealth(boolean includeDetails) {
return HealthIndicator.super.getHealth(includeDetails);
}
private final ReactiveMongoTemplate reactiveMongoTemplate;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
//private final AdminClient adminClient;

@Override
public Health health() {
return Health.up()
.withDetails(kafkaHealth().getDetails())
.withDetails(mongoHealth().getDetails())
.build();
try {
return Health.up()
.withDetail(AppService.REGULAR_KAFKA.name(), regularKafkaHealth())
//.withDetail(AppService.REACTIVE_KAFKA.name(), reactiveKafkaHealth())
.withDetail(AppService.MONGO.name(), mongoHealth())
.build();
}catch (Exception e) {
return Health.down().build();
}
}

private Health kafkaHealth() {
private String regularKafkaHealth() {
try {
//kafkaAdminClient.describeCluster().nodes().get();
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("listenerOne");
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("regularKafkaListener");

if (listenerContainer != null && listenerContainer.isRunning())
return Health.up().withDetail("RegularKafkaListener", "UP").build();
return Health.down().withDetail("RegularKafkaListener", "DOWN").build();
return Status.UP.toString();
return Status.DOWN.toString();
} catch (Exception e) {
return Health.down().withDetail("Kafka", "Not Available").withException(e).build();
return Status.DOWN.toString();
}
}

/* //TODO:
private String reactiveKafkaHealth() {
try {
ListTopicsResult result = adminClient.listTopics();
return result.listings().get().size() > 0 ? Status.UP.toString() : Status.DOWN.toString(); // Wait for the result
} catch (InterruptedException | ExecutionException e) {
return Status.DOWN.toString(); // Topic is not accessible
}
}
*/

private Health mongoHealth() {
private String mongoHealth() {
try {
mongoClient.listDatabaseNames();
return Health.up().withDetail("MongoDB", "Available").build();
Mono<Health> h = reactiveMongoTemplate.executeCommand("{ ping: 1 }")
.flatMap(result -> Mono.just(Health.up().withDetail(AppService.MONGO.name(), Status.UP).build()))
.onErrorResume(ex -> Mono.just(Health.down().withDetail(AppService.MONGO.name(), Status.DOWN).build()));
Optional<Health> health = h.blockOptional(Duration.ofSeconds(5));
return health.isPresent() ? health.get().getStatus().toString() : Status.DOWN.toString();
} catch (Exception e) {
return Health.down().withDetail("MongoDB", "Not Available").withException(e).build();
return "DOWN";
}
}
}
Original file line number Diff line number Diff line change
@@ -8,10 +8,14 @@

@Component
public class RegularKafkaListener {
private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class);
private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class);

@KafkaListener(topics = "#{'${kafka.regular.topic}'}", containerFactory = "regularKafkaListenerContainerFactory")
public void consume(Message<String> message) {
logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders() );
}
@KafkaListener(
id = "regularKafkaListener",
topics = "#{'${kafka.regular.topic}'}",
containerFactory = "regularKafkaListenerContainerFactory"
)
public void consume(Message<String> message) {
logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders());
}
}