Skip to content

Commit

Permalink
API - Server Sent Events, rabbit mq binded to /notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
AlperMulayim committed Sep 16, 2023
1 parent ea6f955 commit e66553c
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 3 deletions.
5 changes: 4 additions & 1 deletion leasesoft/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class BuildingController {
@Autowired
public BuildingService service;


@Autowired
@Qualifier("notification-publisher-rabbitmq")
public INotificationPublisher notificationPublisher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@

@Component("notification-consumer-rabbitmq")
public class NotificationRabbitMQConsumer {

private LSNotificationDto notification = new LSNotificationDto();

@RabbitListener(queues = {"notification_queue"})
public void receive(@Payload LSNotificationDto notificationDto){
System.out.println("RECEİVED -> " + notificationDto.toString());
this.notification = notificationDto;
}

public LSNotificationDto getNotification(){
return this.notification;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.alper.leasesoftprov2.leasesoft.notifications;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.stream.Stream;

@RestController
@RequestMapping("api/v1/notifications")
public class NotificationReactiveController {

@Autowired
NotificationRabbitMQConsumer rabbitMQConsumer;

@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> getNotification() throws IOException, InterruptedException {

Flux notificationFlux = Flux.fromStream(Stream.of(rabbitMQConsumer.getNotification() != null
? rabbitMQConsumer.getNotification() : ""))
.interval(Duration.ofSeconds(2))
.map(line -> {

return ServerSentEvent.<String>builder()
.data(rabbitMQConsumer.getNotification().getMessage() + " ---- " + LocalDateTime.now().toString())
.event("Notification")
.build();
});

return keepAlive(Duration.ofSeconds(3),
notificationFlux, 1);

}

private <T> Flux keepAlive(Duration duration, Flux<T> data, Integer id) {
Flux<ServerSentEvent<T>> heartBeat = Flux.interval(duration)
.map(e -> ServerSentEvent.<T>builder()
.comment("keep alive for:" + id)
.build());
return Flux.merge(heartBeat, data);
}
}

0 comments on commit e66553c

Please sign in to comment.