Skip to content

Commit

Permalink
[FIX] divide retry queue and existing queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ohksj77 committed Oct 10, 2024
1 parent bf07e52 commit aafed66
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
Expand Down Expand Up @@ -50,10 +51,7 @@ public Binding locationBinding() {

@Bean
public Queue notificationQueue() {
return QueueBuilder.durable(RabbitMQConstant.NOTIFICATION_QUEUE.getName())
.deadLetterExchange(RabbitMQConstant.DEAD_LETTER_EXCHANGE.getName())
.deadLetterRoutingKey(RabbitMQConstant.DEAD_LETTER_ROUTING_KEY.getName())
.build();
return QueueBuilder.durable(RabbitMQConstant.NOTIFICATION_QUEUE.getName()).build();
}

@Bean
Expand All @@ -68,6 +66,26 @@ public Binding notificationBinding() {
.with(RabbitMQConstant.NOTIFICATION_ROUTING_KEY.getName());
}

@Bean
public Queue notificationRetryQueue() {
return QueueBuilder.durable(RabbitMQConstant.NOTIFICATION_RETRY_QUEUE.getName())
.deadLetterExchange(RabbitMQConstant.DEAD_LETTER_EXCHANGE.getName())
.deadLetterRoutingKey(RabbitMQConstant.DEAD_LETTER_ROUTING_KEY.getName())
.build();
}

@Bean
public DirectExchange notificationRetryTopicExchange() {
return new DirectExchange(RabbitMQConstant.NOTIFICATION_RETRY_EXCHANGE.getName());
}

@Bean
public Binding notificationRetryBinding() {
return BindingBuilder.bind(notificationRetryQueue())
.to(notificationRetryTopicExchange())
.with(RabbitMQConstant.NOTIFICATION_RETRY_ROUTING_KEY.getName());
}

@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(RabbitMQConstant.DEAD_LETTER_QUEUE.getName()).build();
Expand Down Expand Up @@ -119,13 +137,18 @@ public RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
rabbitAdmin.declareExchange(notificationTopicExchange());
rabbitAdmin.declareBinding(notificationBinding());

rabbitAdmin.declareQueue(notificationRetryQueue());
rabbitAdmin.declareExchange(notificationRetryTopicExchange());
rabbitAdmin.declareBinding(notificationRetryBinding());

rabbitAdmin.declareQueue(deadLetterQueue());
rabbitAdmin.declareExchange(deadLetterExchange());
rabbitAdmin.declareBinding(deadLetterBinding());

return rabbitAdmin;
}

@Primary
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
Expand All @@ -134,6 +157,17 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(20);
factory.setMaxConcurrentConsumers(200);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory retryRabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
final SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(5);
factory.setMaxConcurrentConsumers(50);
factory.setRetryTemplate(retryTemplate());
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.twtw.backend.domain.notification.exception;

public class NotificationException extends IllegalArgumentException {

private static final String MESSAGE = "잘못된 알림이 요청됐습니다.";

public NotificationException() {
super(MESSAGE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
import com.rabbitmq.client.Channel;
import com.twtw.backend.domain.notification.dto.NotificationRequest;
import com.twtw.backend.domain.notification.entity.Notification;
import com.twtw.backend.domain.notification.exception.NotificationException;
import com.twtw.backend.domain.notification.repository.NotificationRepository;
import com.twtw.backend.global.constant.RabbitMQConstant;

import lombok.RequiredArgsConstructor;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
Expand All @@ -22,25 +25,61 @@
@Component
@RequiredArgsConstructor
public class FcmConsumer {

private final FirebaseMessaging firebaseMessaging;
private final NotificationRepository notificationRepository;
private final RabbitTemplate rabbitTemplate;

@Transactional
@RabbitListener(queues = "notification.queue")
@Transactional(noRollbackFor = {NotificationException.class})
@RabbitListener(
queues = "notification.queue",
containerFactory = "rabbitListenerContainerFactory")
public void sendNotification(
final NotificationRequest request,
final Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) final long tag)
throws FirebaseMessagingException, IOException {
firebaseMessaging.send(request.toMessage());
throws IOException {
try {
send(request);
channel.basicAck(tag, false);
} catch (final NotificationException e) {
channel.basicNack(tag, false, false);
} catch (final Exception e) {
rabbitTemplate.convertAndSend(
RabbitMQConstant.NOTIFICATION_RETRY_EXCHANGE.getName(),
RabbitMQConstant.NOTIFICATION_RETRY_ROUTING_KEY.getName(),
request);
channel.basicAck(tag, false);
}
}

@Transactional(noRollbackFor = {NotificationException.class})
@RabbitListener(
queues = "notification.retry.queue",
containerFactory = "retryRabbitListenerContainerFactory")
public void retrySendNotification(
final NotificationRequest request,
final Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) final long tag)
throws IOException {
try {
send(request);
channel.basicAck(tag, false);
} catch (final NotificationException e) {
channel.basicNack(tag, false, false);
} catch (final Exception e) {
channel.basicNack(tag, false, true);
}
}

private void send(final NotificationRequest request) throws FirebaseMessagingException {
final Optional<Notification> notification =
notificationRepository.findById(UUID.fromString(request.getNotificationId()));

if (notification.isPresent()) {
notification.get().complete();
return;
if (notification.isEmpty()) {
throw new NotificationException();
}
channel.basicNack(tag, false, false);
firebaseMessaging.send(request.toMessage());
notification.get().complete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public enum RabbitMQConstant {
NOTIFICATION_QUEUE("notification.queue"),
NOTIFICATION_EXCHANGE("notification"),
NOTIFICATION_ROUTING_KEY("notification"),
NOTIFICATION_RETRY_QUEUE("notification.retry.queue"),
NOTIFICATION_RETRY_EXCHANGE("notification.retry"),
NOTIFICATION_RETRY_ROUTING_KEY("notification.retry"),
DEAD_LETTER_QUEUE("deadletter.queue"),
DEAD_LETTER_EXCHANGE("deadletter"),
DEAD_LETTER_ROUTING_KEY("deadletter");
Expand Down

0 comments on commit aafed66

Please sign in to comment.