Skip to content

Commit

Permalink
New message notification
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillSmirnow committed Mar 11, 2024
1 parent 000eee1 commit 88a3529
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 1 deletion.
5 changes: 5 additions & 0 deletions crypto-messenger-desktop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cryptomessenger.desktop.infrastructure.client.websocket;

import cryptomessenger.desktop.infrastructure.client.websocket.handler.MessageHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSession.Subscription;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.function.Consumer;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;

@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketManager {

private final Set<MessageHandler> messageHandlers;

@Value("${app.server-base-url}")
private String serverBaseUrl;

private StompSession session;
private Set<Subscription> subscriptions = emptySet();

@EventListener(ApplicationReadyEvent.class)
private void connect() {
var client = new WebSocketStompClient(new StandardWebSocketClient());
client.setMessageConverter(new StringMessageConverter(StandardCharsets.UTF_8));
client.connectAsync(getConnectionUrl(), new SessionHandler())
.thenAccept(establishedSession -> {
log.info("Connection established");
session = establishedSession;
refreshSubscriptions();
});
}

private String getConnectionUrl() {
return serverBaseUrl.replaceFirst("http", "ws") + "/ws";
}

public void refreshSubscriptions() {
subscriptions.forEach(Subscription::unsubscribe);
subscriptions = messageHandlers.stream().map(handler -> {
var destination = handler.getDestination();
var subscription = session.subscribe(destination, new FrameHandler(handler.getHandler()));
log.info("Subscribed to {}", destination);
return subscription;
}).collect(toSet());
}

private static class SessionHandler extends StompSessionHandlerAdapter {
@Override
public void handleTransportError(StompSession session, Throwable exception) {
log.warn("Transport error: {}", exception.getMessage());
}
}

@RequiredArgsConstructor
private static class FrameHandler implements StompFrameHandler {

private final Consumer<String> consumer;

@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}

@Override
public void handleFrame(StompHeaders headers, Object payload) {
consumer.accept((String) payload);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cryptomessenger.desktop.infrastructure.client.websocket.handler;

import java.util.function.Consumer;

public interface MessageHandler {

String getDestination();

Consumer<String> getHandler();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cryptomessenger.desktop.infrastructure.client.websocket.handler;

import cryptomessenger.desktop.infrastructure.client.user.UserClient;
import cryptomessenger.desktop.infrastructure.localstorage.LocalStorage;
import cryptomessenger.desktop.service.LocalStorageKeys;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
@RequiredArgsConstructor
public class NewMessageHandler implements MessageHandler {

private final LocalStorage localStorage;
private final UserClient userClient;

@Setter
private Runnable onNewMessage;

@Override
public String getDestination() {
var username = localStorage.getString(LocalStorageKeys.CURRENT_USERNAME);
var user = userClient.getByUsername(username);
return "/user/%s/messages/new".formatted(user.getId());
}

@Override
public Consumer<String> getHandler() {
return message -> {
if (onNewMessage != null) {
onNewMessage.run();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package cryptomessenger.desktop.infrastructure.ui.controller;

import cryptomessenger.desktop.infrastructure.client.websocket.WebSocketManager;
import cryptomessenger.desktop.infrastructure.client.websocket.handler.NewMessageHandler;
import cryptomessenger.desktop.infrastructure.ui.Refreshable;
import cryptomessenger.desktop.infrastructure.ui.dialog.SendMessageDialog;
import cryptomessenger.desktop.service.message.Message;
import cryptomessenger.desktop.service.message.MessageService;
import cryptomessenger.desktop.service.user.UserService;
import cryptomessenger.desktop.utility.Player;
import cryptomessenger.desktop.utility.ThreadFactories;
import javafx.collections.FXCollections;
import javafx.event.ActionEvent;
Expand All @@ -19,6 +22,8 @@
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;

import java.net.URL;
import java.util.ResourceBundle;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,6 +36,8 @@ public class MainSceneController implements Refreshable {
private final UserService userService;
private final MessageService messageService;
private final SendMessageDialog sendMessageDialog;
private final NewMessageHandler newMessageHandler;
private final WebSocketManager webSocketManager;

public TextField usernameField;
public Button registerButton;
Expand All @@ -41,6 +48,12 @@ public class MainSceneController implements Refreshable {

private ScheduledExecutorService executor;

@Override
public void initialize(URL location, ResourceBundle resources) {
refresh();
newMessageHandler.setOnNewMessage(() -> Player.playAudio("/sounds/new-message.mp3"));
}

@Override
public void refresh() {
var username = userService.getCurrentUsername();
Expand Down Expand Up @@ -90,6 +103,7 @@ public void onSendMessage(ActionEvent actionEvent) {

public void onRefresh(ActionEvent actionEvent) {
refresh();
webSocketManager.refreshSubscriptions();
}

public void onReply(ActionEvent actionEvent) {
Expand Down
5 changes: 5 additions & 0 deletions crypto-messenger-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cryptomessenger.server.infrastructure.websocket;

import cryptomessenger.server.service.message.Message;
import cryptomessenger.server.service.message.NewMessageListener;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class NewMessageNotifier implements NewMessageListener {

private final SimpMessagingTemplate simpMessagingTemplate;

@Override
public void onNewMessage(Message message) {
var receiver = message.getReceiverId().toString();
simpMessagingTemplate.convertAndSendToUser(receiver, "/messages/new", "New message");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cryptomessenger.server.infrastructure.websocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.Set;
import java.util.UUID;

import static java.util.UUID.randomUUID;
Expand All @@ -15,6 +16,7 @@
public class MessageServiceImpl implements MessageService {

private final MessageRepository messageRepository;
private final Set<NewMessageListener> newMessageListeners;

@Override
public Message send(MessageSending sending) {
Expand All @@ -26,7 +28,9 @@ public Message send(MessageSending sending) {
.contentForSender(sending.getContentForSender())
.contentForReceiver(sending.getContentForReceiver())
.build();
return messageRepository.save(message);
messageRepository.save(message);
newMessageListeners.forEach(listener -> listener.onNewMessage(message));
return message;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package cryptomessenger.server.service.message;

public interface NewMessageListener {

void onNewMessage(Message message);
}

0 comments on commit 88a3529

Please sign in to comment.