From 154deed6e1f42419d707d5e8373505668faea577 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 11 May 2023 17:14:37 +0530 Subject: [PATCH 1/2] Notification testing using cache --- .../com/uci/outbound/consumers/OutboundKafkaController.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index 147d0bb..34c9922 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -52,6 +52,8 @@ public class OutboundKafkaController { @Value("${spring.mail.recipient}") private String recipient; + private static long count = 0; + @EventListener(ApplicationStartedEvent.class) public void onMessage() { reactiveKafkaReceiver @@ -125,6 +127,8 @@ public void accept(Throwable e) { @Override public void accept(XMessageDAO xMessageDAO) { log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); + count++; + log.info("Insert Record in Cass : "+count); } }); } catch (Exception e) { From c73426eaad1e571aaebb722f708b7e62223ccf13 Mon Sep 17 00:00:00 2001 From: pankajjangid05 Date: Thu, 18 May 2023 11:25:34 +0530 Subject: [PATCH 2/2] Add logs to check process complete time --- .../consumers/OutboundKafkaController.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java index 34c9922..fbb95e3 100644 --- a/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java +++ b/src/main/java/com/uci/outbound/consumers/OutboundKafkaController.java @@ -56,15 +56,18 @@ public class OutboundKafkaController { @EventListener(ApplicationStartedEvent.class) public void onMessage() { + reactiveKafkaReceiver .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord msg) { log.info("kafka message receieved!"); + final long startTime = System.nanoTime(); + logTimeTaken(startTime, 0, "process-start: %d ms"); XMessage currentXmsg = null; try { currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes())); - sendOutboundMessage(currentXmsg); + sendOutboundMessage(currentXmsg, startTime); } catch (Exception e) { HashMap attachments = new HashMap<>(); attachments.put("Exception", ExceptionUtils.getStackTrace(e)); @@ -92,7 +95,7 @@ public void accept(Throwable e) { * @param currentXmsg * @throws Exception */ - public void sendOutboundMessage(XMessage currentXmsg) throws Exception { + public void sendOutboundMessage(XMessage currentXmsg, long startTime) throws Exception { String channel = currentXmsg.getChannelURI(); String provider = currentXmsg.getProviderURI(); IProvider iprovider = factoryProvider.getProvider(provider, channel); @@ -128,7 +131,8 @@ public void accept(Throwable e) { public void accept(XMessageDAO xMessageDAO) { log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId()); count++; - log.info("Insert Record in Cass : "+count); +// log.info("Insert Record in Cass : "+count); + logTimeTaken(startTime, 0, "Insert Record in Cass : " + count +" ::: process-end: %d ms"); } }); } catch (Exception e) { @@ -170,4 +174,14 @@ private void sentEmail(XMessage xMessage, String subject, String body, String re // log.info("EmailDetails :" + emailDetails); emailService.sendMailWithAttachment(emailDetails); } + + private void logTimeTaken(long startTime, int checkpointID, String formatedMsg) { + long endTime = System.nanoTime(); + long duration = (endTime - startTime) / 1000000; + if(formatedMsg == null) { + log.info(String.format("CP-%d: %d ms", checkpointID, duration)); + } else { + log.info(String.format(formatedMsg, duration)); + } + } }