Skip to content

Commit 73b303d

Browse files
Created New NotificationConsumerReactive File for Notification
1 parent 03483c0 commit 73b303d

File tree

1 file changed

+176
-0
lines changed

1 file changed

+176
-0
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package com.uci.outbound.consumers;
2+
3+
import com.uci.adapter.provider.factory.IProvider;
4+
import com.uci.adapter.provider.factory.ProviderFactory;
5+
import com.uci.dao.models.XMessageDAO;
6+
import com.uci.dao.repository.XMessageRepository;
7+
import com.uci.dao.utils.XMessageDAOUtils;
8+
import com.uci.utils.cache.service.RedisCacheService;
9+
import com.uci.utils.model.EmailDetails;
10+
import com.uci.utils.service.EmailServiceImpl;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.extern.slf4j.Slf4j;
13+
import messagerosa.core.model.XMessage;
14+
import messagerosa.xml.XMessageParser;
15+
import org.apache.commons.lang3.exception.ExceptionUtils;
16+
import org.springframework.beans.factory.annotation.Autowired;
17+
import org.springframework.beans.factory.annotation.Value;
18+
import org.springframework.boot.context.event.ApplicationStartedEvent;
19+
import org.springframework.context.event.EventListener;
20+
import org.springframework.stereotype.Component;
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
23+
import reactor.kafka.receiver.ReceiverRecord;
24+
25+
import java.io.ByteArrayInputStream;
26+
import java.time.Duration;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.function.Consumer;
30+
31+
@Component
32+
@RequiredArgsConstructor
33+
@Slf4j
34+
public class NotificationConsumerReactive {
35+
@Autowired
36+
private EmailServiceImpl emailService;
37+
@Value("${spring.mail.recipient}")
38+
private String recipient;
39+
@Autowired
40+
private ProviderFactory factoryProvider;
41+
@Autowired
42+
private RedisCacheService redisCacheService;
43+
@Autowired
44+
private XMessageRepository xMessageRepo;
45+
46+
private final Flux<ReceiverRecord<String, String>> reactiveKafkaReceiverNotification;
47+
48+
private long notificationCount, otherCount, consumeCount;
49+
50+
51+
@EventListener(ApplicationStartedEvent.class)
52+
public void onMessage() {
53+
try {
54+
reactiveKafkaReceiverNotification
55+
.doOnNext(this::logMessage)
56+
.flatMap(this::sendOutboundMessage)
57+
.onBackpressureBuffer()
58+
.bufferTimeout(1000, Duration.ofSeconds(10))
59+
.flatMap(this::persistToCassandra)
60+
.doOnError(this::handleKafkaFluxError)
61+
.subscribe();
62+
} catch (Exception ex) {
63+
log.error("OutboundKafkaController:Exception: Exception: " + ex.getMessage());
64+
}
65+
}
66+
67+
private void logMessage(ReceiverRecord<String, String> msg) {
68+
log.info("NotificationConsumerReactive:Notification topic consume from kafka: " + consumeCount);
69+
}
70+
71+
public Mono<XMessage> sendOutboundMessage(ReceiverRecord<String, String> msg) {
72+
return Mono.defer(() -> {
73+
try {
74+
XMessage currentXmsg = XMessageParser.parse(new ByteArrayInputStream(msg.value().getBytes()));
75+
String channel = currentXmsg.getChannelURI();
76+
String provider = currentXmsg.getProviderURI();
77+
IProvider iprovider = factoryProvider.getProvider(provider, channel);
78+
return iprovider.processOutBoundMessageF(currentXmsg)
79+
.onErrorResume(e -> {
80+
HashMap<String, String> attachments = new HashMap<>();
81+
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
82+
attachments.put("XMessage", currentXmsg.toString());
83+
sentEmail(currentXmsg, "Error in Outbound", "PFA", null, attachments);
84+
log.error("OutboundKafkaController:Exception: Exception in processOutBoundMessageF:" + e.getMessage());
85+
return Mono.just(new XMessage());
86+
});
87+
} catch (Exception e) {
88+
HashMap<String, String> attachments = new HashMap<>();
89+
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
90+
attachments.put("XMessage", msg.toString());
91+
sentEmail(null, "Error in Outbound", "PFA", null, attachments);
92+
log.error("OutboundKafkaController:Exception: " + e.getMessage());
93+
return Mono.just(new XMessage());
94+
}
95+
});
96+
}
97+
98+
public Flux<XMessage> persistToCassandra(List<XMessage> xMessageList) {
99+
log.info("Buffer data : " + xMessageList.size() + " [0] : " + xMessageList.get(0));
100+
return Flux.fromIterable(xMessageList)
101+
.doOnNext(this::saveXMessage)
102+
.doOnError(msg -> log.error("OutboundKafkaController:Exception: " + msg));
103+
}
104+
105+
106+
public void saveXMessage(XMessage xMessage) {
107+
if (xMessage.getApp() != null) {
108+
try {
109+
log.info("Outbound convertXMessageToDAO : " + xMessage.toString());
110+
XMessageDAO dao = null;
111+
dao = XMessageDAOUtils.convertXMessageToDAO(xMessage);
112+
redisCacheService.setXMessageDaoCache(xMessage.getTo().getUserID(), dao);
113+
xMessageRepo
114+
.insert(dao)
115+
.doOnError(new Consumer<Throwable>() {
116+
@Override
117+
public void accept(Throwable e) {
118+
redisCacheService.deleteXMessageDaoCache(xMessage.getTo().getUserID());
119+
log.error("OutboundKafkaController:Exception: " + e.getMessage());
120+
}
121+
})
122+
.subscribe(new Consumer<XMessageDAO>() {
123+
@Override
124+
public void accept(XMessageDAO xMessageDAO) {
125+
log.info("XMessage Object saved is with sent user ID >> " + xMessageDAO.getUserId());
126+
127+
String channel = xMessage.getChannelURI();
128+
String provider = xMessage.getProviderURI();
129+
130+
if (provider.toLowerCase().equals("firebase") && channel.toLowerCase().equals("web")) {
131+
notificationCount++;
132+
log.info("OutboundKafkaController:Notification Insert Record in Cass : " + notificationCount);
133+
// logTimeTaken(startTime, 0, "OutboundKafkaController:Notification Insert Record in Cass : " + notificationCount + " ::: process-end: %d ms");
134+
} else {
135+
otherCount++;
136+
// logTimeTaken(startTime, 0, "Other Insert Record in Cass : " + otherCount + " ::: process-end: %d ms");
137+
log.info("Other Insert Record in Cass : " + otherCount);
138+
}
139+
}
140+
});
141+
} catch (Exception e) {
142+
HashMap<String, String> attachments = new HashMap<>();
143+
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
144+
attachments.put("XMessage", xMessage.toString());
145+
sentEmail(xMessage, "Error in Outbound", "PFA", null, attachments);
146+
log.error("OutboundKafkaController:Exception: Exception in convertXMessageToDAO: " + e.getMessage());
147+
}
148+
} else {
149+
log.info("OutboundKafkaController:XMessage -> app is empty");
150+
}
151+
}
152+
153+
private void handleKafkaFluxError(Throwable e) {
154+
HashMap<String, String> attachments = new HashMap<>();
155+
attachments.put("Exception", ExceptionUtils.getStackTrace(e));
156+
try {
157+
sentEmail(null, "Error in Outbound", "PFA", null, attachments);
158+
} catch (Exception ex) {
159+
log.error("NotificationConsumerReactive:Exception:");
160+
}
161+
log.error("NotificationConsumerReactive:Exception: " + e.getMessage());
162+
}
163+
164+
private void sentEmail(XMessage xMessage, String subject, String body, String attachmentFileName, HashMap<String, String> attachments) {
165+
log.info("Email Sending....");
166+
EmailDetails emailDetails = new EmailDetails().builder()
167+
.subject(subject)
168+
.msgBody(body)
169+
.recipient(recipient)
170+
.attachment(xMessage == null ? "" : xMessage.toString())
171+
.attachmentFileName(attachmentFileName)
172+
.attachments(attachments)
173+
.build();
174+
emailService.sendMailWithAttachment(emailDetails);
175+
}
176+
}

0 commit comments

Comments
 (0)