Skip to content

Commit

Permalink
[Hot Fix] - Logs message change is NotificationConsumerReactive file
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajjangid05 committed Jun 6, 2023
1 parent 73b303d commit e02425c
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void onMessage() {
.doOnError(this::handleKafkaFluxError)
.subscribe();
} catch (Exception ex) {
log.error("OutboundKafkaController:Exception: Exception: " + ex.getMessage());
log.error("NotificationConsumerReactive:Exception: Exception: " + ex.getMessage());
}
}

Expand All @@ -81,15 +81,15 @@ public Mono<XMessage> sendOutboundMessage(ReceiverRecord<String, String> msg) {
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
attachments.put("XMessage", currentXmsg.toString());
sentEmail(currentXmsg, "Error in Outbound", "PFA", null, attachments);
log.error("OutboundKafkaController:Exception: Exception in processOutBoundMessageF:" + e.getMessage());
log.error("NotificationConsumerReactive:Exception: Exception in processOutBoundMessageF:" + e.getMessage());
return Mono.just(new XMessage());
});
} catch (Exception e) {
HashMap<String, String> attachments = new HashMap<>();
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
attachments.put("XMessage", msg.toString());
sentEmail(null, "Error in Outbound", "PFA", null, attachments);
log.error("OutboundKafkaController:Exception: " + e.getMessage());
log.error("NotificationConsumerReactive:Exception: " + e.getMessage());
return Mono.just(new XMessage());
}
});
Expand All @@ -99,14 +99,14 @@ public Flux<XMessage> persistToCassandra(List<XMessage> xMessageList) {
log.info("Buffer data : " + xMessageList.size() + " [0] : " + xMessageList.get(0));
return Flux.fromIterable(xMessageList)
.doOnNext(this::saveXMessage)
.doOnError(msg -> log.error("OutboundKafkaController:Exception: " + msg));
.doOnError(msg -> log.error("NotificationConsumerReactive:Exception: " + msg));
}


public void saveXMessage(XMessage xMessage) {
if (xMessage.getApp() != null) {
try {
log.info("Outbound convertXMessageToDAO : " + xMessage.toString());
log.info("NotificationConsumerReactive:saveXMessage::convertXMessageToDAO : " + xMessage.toString());
XMessageDAO dao = null;
dao = XMessageDAOUtils.convertXMessageToDAO(xMessage);
redisCacheService.setXMessageDaoCache(xMessage.getTo().getUserID(), dao);
Expand All @@ -116,21 +116,21 @@ public void saveXMessage(XMessage xMessage) {
@Override
public void accept(Throwable e) {
redisCacheService.deleteXMessageDaoCache(xMessage.getTo().getUserID());
log.error("OutboundKafkaController:Exception: " + e.getMessage());
log.error("NotificationConsumerReactive:Exception: " + e.getMessage());
}
})
.subscribe(new Consumer<XMessageDAO>() {
@Override
public void accept(XMessageDAO xMessageDAO) {
log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId());
log.info("NotificationConsumerReactive: XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId());

String channel = xMessage.getChannelURI();
String provider = xMessage.getProviderURI();

if (provider.toLowerCase().equals("firebase") && channel.toLowerCase().equals("web")) {
notificationCount++;
log.info("OutboundKafkaController:Notification Insert Record in Cass : " + notificationCount);
// logTimeTaken(startTime, 0, "OutboundKafkaController:Notification Insert Record in Cass : " + notificationCount + " ::: process-end: %d ms");
log.info("NotificationConsumerReactive:Notification Insert Record in Cass : " + notificationCount);
// logTimeTaken(startTime, 0, "NotificationConsumerReactive:Notification Insert Record in Cass : " + notificationCount + " ::: process-end: %d ms");
} else {
otherCount++;
// logTimeTaken(startTime, 0, "Other Insert Record in Cass : " + otherCount + " ::: process-end: %d ms");
Expand All @@ -143,10 +143,10 @@ public void accept(XMessageDAO xMessageDAO) {
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
attachments.put("XMessage", xMessage.toString());
sentEmail(xMessage, "Error in Outbound", "PFA", null, attachments);
log.error("OutboundKafkaController:Exception: Exception in convertXMessageToDAO: " + e.getMessage());
log.error("NotificationConsumerReactive:Exception: Exception in convertXMessageToDAO: " + e.getMessage());
}
} else {
log.info("OutboundKafkaController:XMessage -> app is empty");
log.info("NotificationConsumerReactive:XMessage -> app is empty " + xMessage);
}
}

Expand All @@ -156,7 +156,7 @@ private void handleKafkaFluxError(Throwable e) {
try {
sentEmail(null, "Error in Outbound", "PFA", null, attachments);
} catch (Exception ex) {
log.error("NotificationConsumerReactive:Exception:");
log.error("NotificationConsumerReactive:Exception:" + ex.getMessage());
}
log.error("NotificationConsumerReactive:Exception: " + e.getMessage());
}
Expand Down

0 comments on commit e02425c

Please sign in to comment.