From aafed6682e35a798ea57252c71d5850c1a5c8674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=8A=B9=EC=A7=84?= Date: Thu, 10 Oct 2024 17:57:39 +0900 Subject: [PATCH] [FIX] divide retry queue and existing queue --- .../config/rabbitmq/RabbitMQConfig.java | 42 ++++++++++++-- .../exception/NotificationException.java | 10 ++++ .../messagequeue/FcmConsumer.java | 55 ++++++++++++++++--- .../global/constant/RabbitMQConstant.java | 3 + 4 files changed, 98 insertions(+), 12 deletions(-) create mode 100644 backend/src/main/java/com/twtw/backend/domain/notification/exception/NotificationException.java diff --git a/backend/src/main/java/com/twtw/backend/config/rabbitmq/RabbitMQConfig.java b/backend/src/main/java/com/twtw/backend/config/rabbitmq/RabbitMQConfig.java index 807f5b75..0cc03db4 100644 --- a/backend/src/main/java/com/twtw/backend/config/rabbitmq/RabbitMQConfig.java +++ b/backend/src/main/java/com/twtw/backend/config/rabbitmq/RabbitMQConfig.java @@ -16,6 +16,7 @@ import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; @@ -50,10 +51,7 @@ public Binding locationBinding() { @Bean public Queue notificationQueue() { - return QueueBuilder.durable(RabbitMQConstant.NOTIFICATION_QUEUE.getName()) - .deadLetterExchange(RabbitMQConstant.DEAD_LETTER_EXCHANGE.getName()) - .deadLetterRoutingKey(RabbitMQConstant.DEAD_LETTER_ROUTING_KEY.getName()) - .build(); + return QueueBuilder.durable(RabbitMQConstant.NOTIFICATION_QUEUE.getName()).build(); } @Bean @@ -68,6 +66,26 @@ public Binding notificationBinding() { .with(RabbitMQConstant.NOTIFICATION_ROUTING_KEY.getName()); } + @Bean + public Queue notificationRetryQueue() { + return QueueBuilder.durable(RabbitMQConstant.NOTIFICATION_RETRY_QUEUE.getName()) + .deadLetterExchange(RabbitMQConstant.DEAD_LETTER_EXCHANGE.getName()) + .deadLetterRoutingKey(RabbitMQConstant.DEAD_LETTER_ROUTING_KEY.getName()) + .build(); + } + + @Bean + public DirectExchange notificationRetryTopicExchange() { + return new DirectExchange(RabbitMQConstant.NOTIFICATION_RETRY_EXCHANGE.getName()); + } + + @Bean + public Binding notificationRetryBinding() { + return BindingBuilder.bind(notificationRetryQueue()) + .to(notificationRetryTopicExchange()) + .with(RabbitMQConstant.NOTIFICATION_RETRY_ROUTING_KEY.getName()); + } + @Bean public Queue deadLetterQueue() { return QueueBuilder.durable(RabbitMQConstant.DEAD_LETTER_QUEUE.getName()).build(); @@ -119,6 +137,10 @@ public RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { rabbitAdmin.declareExchange(notificationTopicExchange()); rabbitAdmin.declareBinding(notificationBinding()); + rabbitAdmin.declareQueue(notificationRetryQueue()); + rabbitAdmin.declareExchange(notificationRetryTopicExchange()); + rabbitAdmin.declareBinding(notificationRetryBinding()); + rabbitAdmin.declareQueue(deadLetterQueue()); rabbitAdmin.declareExchange(deadLetterExchange()); rabbitAdmin.declareBinding(deadLetterBinding()); @@ -126,6 +148,7 @@ public RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { return rabbitAdmin; } + @Primary @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { @@ -134,6 +157,17 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(20); factory.setMaxConcurrentConsumers(200); + return factory; + } + + @Bean + public SimpleRabbitListenerContainerFactory retryRabbitListenerContainerFactory( + ConnectionFactory connectionFactory) { + final SimpleRabbitListenerContainerFactory factory = + new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setConcurrentConsumers(5); + factory.setMaxConcurrentConsumers(50); factory.setRetryTemplate(retryTemplate()); return factory; } diff --git a/backend/src/main/java/com/twtw/backend/domain/notification/exception/NotificationException.java b/backend/src/main/java/com/twtw/backend/domain/notification/exception/NotificationException.java new file mode 100644 index 00000000..e8466e3b --- /dev/null +++ b/backend/src/main/java/com/twtw/backend/domain/notification/exception/NotificationException.java @@ -0,0 +1,10 @@ +package com.twtw.backend.domain.notification.exception; + +public class NotificationException extends IllegalArgumentException { + + private static final String MESSAGE = "잘못된 알림이 요청됐습니다."; + + public NotificationException() { + super(MESSAGE); + } +} diff --git a/backend/src/main/java/com/twtw/backend/domain/notification/messagequeue/FcmConsumer.java b/backend/src/main/java/com/twtw/backend/domain/notification/messagequeue/FcmConsumer.java index 21cb1f96..f53531ec 100644 --- a/backend/src/main/java/com/twtw/backend/domain/notification/messagequeue/FcmConsumer.java +++ b/backend/src/main/java/com/twtw/backend/domain/notification/messagequeue/FcmConsumer.java @@ -5,11 +5,14 @@ import com.rabbitmq.client.Channel; import com.twtw.backend.domain.notification.dto.NotificationRequest; import com.twtw.backend.domain.notification.entity.Notification; +import com.twtw.backend.domain.notification.exception.NotificationException; import com.twtw.backend.domain.notification.repository.NotificationRepository; +import com.twtw.backend.global.constant.RabbitMQConstant; import lombok.RequiredArgsConstructor; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @@ -22,25 +25,61 @@ @Component @RequiredArgsConstructor public class FcmConsumer { + private final FirebaseMessaging firebaseMessaging; private final NotificationRepository notificationRepository; + private final RabbitTemplate rabbitTemplate; - @Transactional - @RabbitListener(queues = "notification.queue") + @Transactional(noRollbackFor = {NotificationException.class}) + @RabbitListener( + queues = "notification.queue", + containerFactory = "rabbitListenerContainerFactory") public void sendNotification( final NotificationRequest request, final Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) final long tag) - throws FirebaseMessagingException, IOException { - firebaseMessaging.send(request.toMessage()); + throws IOException { + try { + send(request); + channel.basicAck(tag, false); + } catch (final NotificationException e) { + channel.basicNack(tag, false, false); + } catch (final Exception e) { + rabbitTemplate.convertAndSend( + RabbitMQConstant.NOTIFICATION_RETRY_EXCHANGE.getName(), + RabbitMQConstant.NOTIFICATION_RETRY_ROUTING_KEY.getName(), + request); + channel.basicAck(tag, false); + } + } + @Transactional(noRollbackFor = {NotificationException.class}) + @RabbitListener( + queues = "notification.retry.queue", + containerFactory = "retryRabbitListenerContainerFactory") + public void retrySendNotification( + final NotificationRequest request, + final Channel channel, + @Header(AmqpHeaders.DELIVERY_TAG) final long tag) + throws IOException { + try { + send(request); + channel.basicAck(tag, false); + } catch (final NotificationException e) { + channel.basicNack(tag, false, false); + } catch (final Exception e) { + channel.basicNack(tag, false, true); + } + } + + private void send(final NotificationRequest request) throws FirebaseMessagingException { final Optional notification = notificationRepository.findById(UUID.fromString(request.getNotificationId())); - if (notification.isPresent()) { - notification.get().complete(); - return; + if (notification.isEmpty()) { + throw new NotificationException(); } - channel.basicNack(tag, false, false); + firebaseMessaging.send(request.toMessage()); + notification.get().complete(); } } diff --git a/backend/src/main/java/com/twtw/backend/global/constant/RabbitMQConstant.java b/backend/src/main/java/com/twtw/backend/global/constant/RabbitMQConstant.java index a072e6a3..e058fb05 100644 --- a/backend/src/main/java/com/twtw/backend/global/constant/RabbitMQConstant.java +++ b/backend/src/main/java/com/twtw/backend/global/constant/RabbitMQConstant.java @@ -13,6 +13,9 @@ public enum RabbitMQConstant { NOTIFICATION_QUEUE("notification.queue"), NOTIFICATION_EXCHANGE("notification"), NOTIFICATION_ROUTING_KEY("notification"), + NOTIFICATION_RETRY_QUEUE("notification.retry.queue"), + NOTIFICATION_RETRY_EXCHANGE("notification.retry"), + NOTIFICATION_RETRY_ROUTING_KEY("notification.retry"), DEAD_LETTER_QUEUE("deadletter.queue"), DEAD_LETTER_EXCHANGE("deadletter"), DEAD_LETTER_ROUTING_KEY("deadletter");