In previous blog posts, I demonstrated how to use Angular and Spring WebFlux to build a simple and stupid Chat application over various protocols, including SSE, WebSocket, and RSocket. A previous post also introduced the Postgres-specific NOTIFY and LISTEN features available through R2dbc, which let Postgres itself act as a simple message broker.
In this post, we will rebuild the Chat application like the ones we created before, but this time we will use R2dbc and the Postgres-specific NOTIFY and LISTEN statements.
Open your browser, go to the Spring Initializr page, and customize the project.
- Project: Maven (since Spring Boot 3.0, this starter UI selects Gradle by default)
- Language: Java
- Dependencies: Spring Reactive Web, Spring Data R2dbc, Postgres Driver, Lombok
You can click the EXPLORE button to preview the project structure in a new dialog before downloading it.
Hit the GENERATE button to export the project skeleton as a zip archive.
Download the archive, extract the files to your disk, and import the project into your IDE, e.g. IDEA or VSCode with Spring Tools.
Declare a Message record as the payload format that is sent by the Postgres NOTIFY statement and received by the LISTEN statement.
record Message(UUID id, String body, LocalDateTime sentAt) {
}
Create a Notifier and a Listener to execute the NOTIFY and LISTEN statements in Postgres, respectively.
@Component
@Slf4j
@RequiredArgsConstructor
class Notifier {
    private final ConnectionFactory connectionFactory;
    private final ObjectMapper objectMapper;
    PostgresqlConnection sender;

    // Open a dedicated connection that is only used to send notifications.
    @PostConstruct
    public void initialize() throws InterruptedException {
        sender = Mono.from(connectionFactory.create())
                .cast(PostgresqlConnection.class)
                .block();
    }

    @SneakyThrows
    public Mono<Void> send(CreateMessageCommand data) {
        var message = new Message(UUID.randomUUID(), data.body(), LocalDateTime.now());
        // Double single quotes so that a quote in the body does not end the SQL string literal.
        var messageJson = objectMapper.writeValueAsString(message).replace("'", "''");
        return sender.createStatement("NOTIFY messages, '" + messageJson + "'")
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .log("sending notification::")
                .then();
    }

    @PreDestroy
    public void destroy() {
        sender.close().subscribe();
    }
}
@Component
@Slf4j
@RequiredArgsConstructor
class Listener {
    private final ConnectionFactory connectionFactory;
    private final ObjectMapper objectMapper;
    PostgresqlConnection receiver;

    // Open a dedicated connection and subscribe it to the "messages" channel.
    @PostConstruct
    public void initialize() throws InterruptedException {
        receiver = Mono.from(connectionFactory.create())
                .cast(PostgresqlConnection.class)
                .block();
        receiver.createStatement("LISTEN messages")
                .execute()
                .flatMap(PostgresqlResult::getRowsUpdated)
                .log("listen::")
                .subscribe();
    }

    // Convert every incoming notification payload back into a Message.
    public Flux<Message> getMessages() {
        return receiver.getNotifications()
                .delayElements(Duration.ofMillis(100))
                .log()
                .map(notification -> {
                    log.debug("received notification: {}", notification);
                    try {
                        return objectMapper.readValue(notification.getParameter(), Message.class);
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                });
    }

    @PreDestroy
    public void destroy() {
        receiver.close().subscribe();
    }
}
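Because the JSON payload is embedded in the NOTIFY statement as a SQL string literal, two details are worth keeping in mind: a single quote in the message body has to be doubled, and Postgres in its default configuration only accepts payloads shorter than 8000 bytes. The following is a minimal sketch of a helper that covers both points. The NotifyStatements class, the notifySql method, and the channel name pattern are hypothetical and not part of the sample project, and the sketch assumes the default standard_conforming_strings=on setting.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original sample project.
final class NotifyStatements {

    // In the default configuration the payload must be shorter than 8000 bytes.
    static final int MAX_PAYLOAD_BYTES = 7999;

    private NotifyStatements() {
    }

    // Builds "NOTIFY <channel>, '<payload>'" with single quotes in the payload doubled.
    static String notifySql(String channel, String payload) {
        // Assumption: only simple lowercase identifiers are accepted as channel names.
        if (!channel.matches("[a-z_][a-z0-9_]*")) {
            throw new IllegalArgumentException("unsupported channel name: " + channel);
        }
        int size = payload.getBytes(StandardCharsets.UTF_8).length;
        if (size > MAX_PAYLOAD_BYTES) {
            throw new IllegalArgumentException("payload too large: " + size + " bytes");
        }
        return "NOTIFY " + channel + ", '" + payload.replace("'", "''") + "'";
    }
}
```

With such a helper, the statement text passed to createStatement in Notifier could be produced by NotifyStatements.notifySql("messages", json), where json is the serialized Message.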
Next, we will create a web handler to send messages and to stream the messages sent by the various clients. Here we use the Server-Sent Events protocol again to push the received messages to clients in real time.
Instead of the traditional @RestController annotated classes, we will use the programmatic functional APIs to centralize all the handling methods in one class, MessageHandler.
@Component
@RequiredArgsConstructor
class MessageHandler {
    private final Notifier notifier;
    private final Listener listener;

    // Stream the received messages to the client as Server-Sent Events.
    public Mono<ServerResponse> all(ServerRequest req) {
        return ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(this.listener.getMessages(), Message.class);
    }

    public Mono<ServerResponse> create(ServerRequest req) {
        // send() returns an empty Mono<Void>, so use then() instead of flatMap() to build the response.
        return req.bodyToMono(CreateMessageCommand.class)
                .flatMap(this.notifier::send)
                .then(ok().bodyValue("The message was sent"));
    }
}
The CreateMessageCommand is used to wrap the request body when sending a message.
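The definition of CreateMessageCommand is not shown in this post. Judging from the data.body() call in Notifier and the JSON sent in the curl examples, it is presumably a record with a single body field; the following is a sketch under that assumption, not necessarily the exact code of the sample project.

```java
// Assumed shape of the request payload: a single "body" field,
// inferred from data.body() in Notifier and from {"body": "..."} in the requests.
record CreateMessageCommand(String body) {
}
```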
Then define a RouterFunction bean to declare the routing rules at runtime.
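@Configuration
class WebConfig {
    // Uses static imports of RouterFunctions.route and RequestPredicates.path.
    @Bean
    public RouterFunction<ServerResponse> routes(MessageHandler messageHandler) {
        return route()
                .path("/messages", () -> route()
                        .nest(
                                path(""),
                                () -> route()
                                        .GET("", messageHandler::all)
                                        .POST("", messageHandler::create)
                                        .build()
                        )
                        // The supplier must return a RouterFunction, so build the nested builder too.
                        .build()
                ).build();
    }
}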
In this example project, we do not save the messages into the Postgres database; we only use Postgres as a simple message broker. You can add this missing piece yourself.
Also, we only create the backend API project here and do not repeat the frontend code from previous posts. If you know the basics of the Angular framework and are interested in how to connect to our backend API, copy the client code from angular-spring-sse-sample and try it yourself.
Before running the application, ensure there is a running Postgres that is ready to connect.
Create a docker-compose.yaml file in the project root folder.
services:
  postgres:
    image: postgres
    ports:
      - "5432:5432"
    restart: always
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: blogdb
      POSTGRES_USER: user
    volumes:
      - ./data/postgresql:/var/lib/postgresql
      - ./pg-initdb.d:/docker-entrypoint-initdb.d
Execute the following command to start Postgres in Docker using this predefined Docker Compose file.
docker compose up postgres
Next, run the Spring Boot application class directly in your IDE, or execute mvn clean spring-boot:run in the project root folder to build and run the application.
Open a terminal window, e.g. Windows Terminal or CMD, and run the following command.
curl http://localhost:8080/messages
The command will block and wait for messages.
Open another terminal window and use the curl command to send some sample messages.
> curl -X POST http://localhost:8080/messages -H "Content-Type: application/json" -d "{\"body\" :\"test\"}"
> curl -X POST http://localhost:8080/messages -H "Content-Type: application/json" -d "{\"body\" :\"test 2\"}"
> curl -X POST http://localhost:8080/messages -H "Content-Type: application/json" -d "{\"body\" :\"test 3\"}"
Switch back to the first terminal window, and you will see output similar to the following.
> curl http://localhost:8080/messages
data:{"id":"8354fa38-4960-4d09-a4f1-7f0e7eab907a","body":"test","sentAt":"2023-01-14T12:09:49.1174014"}
data:{"id":"88f0d028-410b-40c6-9150-57a2228cb592","body":"test 2","sentAt":"2023-01-14T12:10:47.7973986"}
data:{"id":"8b8e59b6-40c0-4a8a-b291-6205d8a8c1f1","body":"test 3","sentAt":"2023-01-14T12:11:01.4874459"}
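The output above is the Server-Sent Events wire format: each event arrives as a line starting with data: followed by the JSON payload. If you want to consume the stream from Java instead of curl, the following minimal sketch uses only the JDK HTTP client. The SseClientSketch class and its methods are hypothetical names, and the sketch ignores other SSE fields such as event: and id: as well as multi-line data.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;

// Hypothetical client sketch, not part of the sample project.
final class SseClientSketch {

    private SseClientSketch() {
    }

    // Returns the payload of an SSE "data:" line, or empty for blank lines, comments and other fields.
    static Optional<String> extractData(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String value = line.substring("data:".length());
        // The SSE format allows one optional space after the colon.
        return Optional.of(value.startsWith(" ") ? value.substring(1) : value);
    }

    // Connects to the given URL and prints every data payload until the server closes the stream.
    static void listen(String url) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        // ofLines() exposes the body as a lazily read stream of lines.
        client.send(request, HttpResponse.BodyHandlers.ofLines())
                .body()
                .forEach(line -> extractData(line).ifPresent(System.out::println));
    }
}
```

Calling SseClientSketch.listen("http://localhost:8080/messages") from a main method should then print one JSON document per message while the application is running.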
Get the complete sample code from my GitHub.