diff --git a/docker-compose/definitions.json b/docker-compose/definitions.json index d8a7c8d1..cdafff68 100644 --- a/docker-compose/definitions.json +++ b/docker-compose/definitions.json @@ -29,6 +29,36 @@ "x-dead-letter-routing-key": "email.deadLetter" } }, + { + "name": "email.send.wait.5s", + "vhost": "/", + "durable": true, + "arguments": { + "x-message-ttl": 5000, + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": "email.send.queue" + } + }, + { + "name": "email.send.wait.10s", + "vhost": "/", + "durable": true, + "arguments": { + "x-message-ttl": 10000, + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": "email.send.queue" + } + }, + { + "name": "email.send.wait.20s", + "vhost": "/", + "durable": true, + "arguments": { + "x-message-ttl": 20000, + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": "email.send.queue" + } + }, { "name": "crawling.full.queue", "vhost": "/", diff --git a/email-worker/src/email/email.consumer.ts b/email-worker/src/email/email.consumer.ts index bae25f9f..abc19422 100644 --- a/email-worker/src/email/email.consumer.ts +++ b/email-worker/src/email/email.consumer.ts @@ -3,8 +3,9 @@ import { inject, injectable } from 'tsyringe'; import { DEPENDENCY_SYMBOLS } from '../types/dependency-symbols'; import { EmailService } from './email.service'; import logger from '../logger'; -import { RMQ_QUEUES } from '../rabbitmq/rabbitmq.constant'; +import { RETRY_CONFIG, RMQ_QUEUES } from '../rabbitmq/rabbitmq.constant'; import { EmailPayload, EmailPayloadConstant } from '../types/types'; +import { Options } from 'amqplib/properties'; @injectable() export class EmailConsumer { @@ -25,7 +26,7 @@ export class EmailConsumer { this.consumerTag = await this.rabbitmqService.consumeMessage( RMQ_QUEUES.EMAIL_SEND, - async (payload: EmailPayload) => { + async (payload: EmailPayload, retryCount: number) => { if (this.shuttingDownFlag) { logger.warn('[EmailConsumer] Shutdown 중, 메시지 처리 건너뜀'); throw new Error('SHUTDOWN_IN_PROGRESS'); @@ -39,6 +40,8 @@ export class EmailConsumer { try { await this.handleEmailByType(payload); logger.info('[EmailConsumer] 이메일 전송 완료'); + } catch (error) { + await this.handleEmailByError(error, payload, retryCount); } finally { this.pendingTasks--; logger.info(`[EmailConsumer] 남은 작업: ${this.pendingTasks}`); @@ -120,4 +123,117 @@ export class EmailConsumer { }, 10000); }); } + + async handleEmailByError( + error: any, + payload: EmailPayload, + retryCount: number, + ) { + const stringifiedMessage = JSON.stringify(payload); + const retryOptions: Options.Publish = { + headers: { + 'x-retry-count': retryCount + 1, + }, + }; + + // Node.js 네트워크 레벨의 에러 + const isNetworkError = + error.code === 'ESOCKET' || + error.message?.includes('ECONNREFUSED') || + error.message?.includes('ETIMEDOUT') || + error.message?.includes('Unexpected socket close'); + if (isNetworkError) { + if (retryCount >= RETRY_CONFIG.MAX_RETRY) { + await this.rabbitmqService.sendMessageToQueue( + RMQ_QUEUES.EMAIL_DEAD_LETTER, + stringifiedMessage, + { + headers: { + 'x-retry-count': retryCount, + 'x-error-code': error.code || 'NONE', + 'x-error-message': error.message, + 'x-failed-at': new Date().toISOString(), + 'x-failure-type': 'MAX_RETRIES_EXCEEDED', + }, + }, + ); + return; + } + + await this.rabbitmqService.sendMessageToQueue( + RETRY_CONFIG.WAITING_QUEUE[retryCount], + stringifiedMessage, + retryOptions, + ); + return; + } + // SMTP 레벨의 에러 + if (error.responseCode) { + if (error.responseCode >= 500) { + await this.rabbitmqService.sendMessageToQueue( + RMQ_QUEUES.EMAIL_DEAD_LETTER, + stringifiedMessage, + { + headers: { + 'x-retry-count': retryCount, + 'x-error-code': error.code || 'NONE', + 'x-response-code': error.responseCode, + 'x-error-message': error.message, + 'x-failed-at': new Date().toISOString(), + 'x-failure-type': 'SMTP_PERMANENT_FAILURE', + }, + }, + ); + return; + } + + if (error.responseCode >= 400) { + if (retryCount >= RETRY_CONFIG.MAX_RETRY) { + await this.rabbitmqService.sendMessageToQueue( + RMQ_QUEUES.EMAIL_DEAD_LETTER, + stringifiedMessage, + { + headers: { + 'x-retry-count': retryCount, + 'x-error-code': error.code || 'NONE', + 'x-response-code': error.responseCode, + 'x-error-message': error.message, + 'x-failed-at': new Date().toISOString(), + 'x-failure-type': 'MAX_RETRIES_EXCEEDED', + }, + }, + ); + return; + } + await this.rabbitmqService.sendMessageToQueue( + RETRY_CONFIG.WAITING_QUEUE[retryCount], + stringifiedMessage, + retryOptions, + ); + return; + } + } + + logger.error( + `[EmailConsumer] 알 수 없는 에러로 DLQ 메시지 발행 + 오류 메시지: ${error.message} + 스택 트레이스: ${error.stack}`, + ); + // 즉시 DLQ로 메시지 발행 + // todo: Slack 이나 Discord 연동을 통한 새로운 에러에 대한 알림 구현 + await this.rabbitmqService.sendMessageToQueue( + RMQ_QUEUES.EMAIL_DEAD_LETTER, + stringifiedMessage, + { + headers: { + 'x-retry-count': retryCount, + 'x-error-code': error.code || 'UNKNOWN', + 'x-error-message': error.message || 'Unknown error', + 'x-error-stack': error.stack, + 'x-failed-at': new Date().toISOString(), + 'x-failure-type': 'UNKNOWN_ERROR', + }, + }, + ); + } } diff --git a/email-worker/src/email/email.service.ts b/email-worker/src/email/email.service.ts index 4f1f6929..1d0c46a8 100644 --- a/email-worker/src/email/email.service.ts +++ b/email-worker/src/email/email.service.ts @@ -48,7 +48,12 @@ export class EmailService { await this.transporter.sendMail(mailOptions); logger.info(`${mailOptions.to} 이메일 전송 성공`); } catch (error) { - logger.error(`${mailOptions.to} 이메일 전송 실패: ${error}`); + logger.error( + `${mailOptions.to} 이메일 전송 실패 + 오류 메시지: ${error.message} + 스택 트레이스: ${error.stack}`, + ); + throw error; } } diff --git a/email-worker/src/rabbitmq/rabbitmq.constant.ts b/email-worker/src/rabbitmq/rabbitmq.constant.ts index 237caf44..90def841 100644 --- a/email-worker/src/rabbitmq/rabbitmq.constant.ts +++ b/email-worker/src/rabbitmq/rabbitmq.constant.ts @@ -2,24 +2,36 @@ export const RMQ_EXCHANGES = { EMAIL: 'EmailExchange', CRAWLING: 'CrawlingExchange', DEAD_LETTER: 'DeadLetterExchange', -}; +} as const; export const RMQ_QUEUES = { EMAIL_SEND: 'email.send.queue', CRAWLING_FULL: 'crawling.full.queue', + EMAIL_SEND_WAIT_5S: 'email.send.wait.5s', + EMAIL_SEND_WAIT_10S: 'email.send.wait.10s', + EMAIL_SEND_WAIT_20S: 'email.send.wait.20s', EMAIL_DEAD_LETTER: 'email.deadLetter.queue', CRAWLING_FULL_DEAD_LETTER: 'crawling.full.deadLetter.queue', -}; +} as const; export const RMQ_ROUTING_KEYS = { EMAIL_SEND: 'email.send', CRAWLING_FULL: 'crawling.full', EMAIL_DEAD_LETTER: 'email.deadLetter', CRAWLING_FULL_DEAD_LETTER: 'crawling.full.deadLetter', -}; +} as const; export const RMQ_EXCHANGE_TYPE = { DIRECT: 'direct', TOPIC: 'topic', FANOUT: 'fanout', -}; +} as const; + +export const RETRY_CONFIG = { + MAX_RETRY: 3, + WAITING_QUEUE: [ + RMQ_QUEUES.EMAIL_SEND_WAIT_5S, + RMQ_QUEUES.EMAIL_SEND_WAIT_10S, + RMQ_QUEUES.EMAIL_SEND_WAIT_20S, + ], +} as const; diff --git a/email-worker/src/rabbitmq/rabbitmq.service.ts b/email-worker/src/rabbitmq/rabbitmq.service.ts index ab1d7f76..ca7dbabc 100644 --- a/email-worker/src/rabbitmq/rabbitmq.service.ts +++ b/email-worker/src/rabbitmq/rabbitmq.service.ts @@ -1,6 +1,6 @@ import { inject, injectable } from 'tsyringe'; import { RabbitMQManager } from './rabbitmq.manager'; -import { ConsumeMessage } from 'amqplib/properties'; +import { Options } from 'amqplib/properties'; import { DEPENDENCY_SYMBOLS } from '../types/dependency-symbols'; import logger from '../logger'; @@ -20,9 +20,18 @@ export class RabbitmqService { channel.publish(exchange, routingKey, Buffer.from(message)); } + async sendMessageToQueue( + queue: string, + message: string, + options?: Options.Publish, + ) { + const channel = await this.rabbitMQManager.getChannel(); + channel.sendToQueue(queue, Buffer.from(message), options); + } + async consumeMessage( queue: string, - onMessage: (payload: T) => void | Promise, + onMessage: (payload: T, retryCount: number) => void | Promise, ) { const channel = await this.rabbitMQManager.getChannel(); const { consumerTag } = await channel.consume(queue, async (message) => { @@ -30,7 +39,8 @@ export class RabbitmqService { if (!message) return; const parsedMessage = JSON.parse(message.content.toString()) as T; - await onMessage(parsedMessage); + const retryCount = message.properties.headers?.['x-retry-count'] || 0; + await onMessage(parsedMessage, retryCount); channel.ack(message); } catch (error) {