diff --git a/connectors/order-outbox.json b/connectors/order-outbox.json index 2d4b78b4..1d40ef9c 100644 --- a/connectors/order-outbox.json +++ b/connectors/order-outbox.json @@ -11,8 +11,10 @@ "topic.prefix": "order_outbox_cdc", "plugin.name": "pgoutput", "slot.name": "order_outbox_slot", - "snapshot.mode": "schema_only", + "snapshot.mode": "no_data", + "snapshot.locking.mode": "none", "table.include.list": "public.p_order_outbox", + "tombstones.on.delete": "false", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.id": "id", @@ -25,6 +27,15 @@ "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", - "transforms.outbox.table.expand.json.payload": "true" + "transforms.outbox.table.expand.json.payload": "true", + "producer.acks": "all", + "producer.enable.idempotence": "true", + "producer.max.in.flight.requests.per.connection": "5", + "producer.retries": "100", + "producer.delivery.timeout.ms": "120000", + "producer.retry.backoff.ms": "500", + "producer.compression.type": "lz4", + "producer.linger.ms": "20", + "producer.batch.size": "65536" } } \ No newline at end of file diff --git a/connectors/payment-oubox.json b/connectors/payment-oubox.json index 0db158f6..0675d4a9 100644 --- a/connectors/payment-oubox.json +++ b/connectors/payment-oubox.json @@ -11,8 +11,10 @@ "topic.prefix": "payment_outbox_cdc", "plugin.name": "pgoutput", "slot.name": "payment_outbox_slot", - "snapshot.mode": "schema_only", + "snapshot.mode": "no_data", + "snapshot.locking.mode": "none", "table.include.list": "public.p_payment_outbox", + "tombstones.on.delete": "false", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.id": "id", @@ -25,6 +27,15 @@ "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter.schemas.enable": "false", - "transforms.outbox.table.expand.json.payload": "true" + "transforms.outbox.table.expand.json.payload": "true", + "producer.acks": "all", + "producer.enable.idempotence": "true", + "producer.max.in.flight.requests.per.connection": "5", + "producer.retries": "100", + "producer.delivery.timeout.ms": "120000", + "producer.retry.backoff.ms": "500", + "producer.compression.type": "lz4", + "producer.linger.ms": "20", + "producer.batch.size": "65536" } } \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index c8d22b29..77c00294 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -32,7 +32,7 @@ services: # --- Infrastructure: Event Broker (Kafka) --- kafka: - image: confluentinc/cp-kafka:7.5.0 + image: apache/kafka:4.0.0 container_name: kafka-local restart: always ports: @@ -64,7 +64,7 @@ services: - spot-network connect: - image: debezium/connect:2.4 + image: quay.io/debezium/connect:3.4.0.Final container_name: connect depends_on: - kafka diff --git a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderServiceImpl.java b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderServiceImpl.java index 9a7ad78b..7da78b84 100644 --- a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderServiceImpl.java +++ b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderServiceImpl.java @@ -332,7 +332,7 @@ public void completeOrderCancellation(UUID orderId) { order.finalizeCancel(); log.info("[보상 트랜잭션 완료] 주문 ID {} 가 최종 확정되었습니다.", orderId); } else { - log.warn("⚠️ [무시됨] 주문 ID {} 는 현재 취소 대기 상태가 아닙니다. (현재 상태: {})", + log.warn("[무시됨] 주문 ID {} 는 현재 취소 대기 상태가 아닙니다. (현재 상태: {})", orderId, order.getOrderStatus()); } } @@ -404,6 +404,11 @@ public OrderResponseDto completePayment(UUID orderId) { OrderEntity order = orderRepository.findByIdWithLock(orderId) .orElseThrow(() -> new IllegalArgumentException("존재하지 않는 주문입니다.")); + if (order.getOrderStatus() == OrderStatus.PENDING) { + log.info("[중복방지] 이미 결제 처리가 완료된 주문입니다. 스킵합니다: orderId={}", orderId); + return OrderResponseDto.from(order); // 예외 없이 정상 응답을 반환하여 컨슈머가 Ack를 찍게 함 + } + log.info("결제 성공 이벤트 수신 - 주문 확정 처리 시작: orderId={}", orderId); // 1. 상태 변경(PAYMENT_PENDING -> PENDING) diff --git a/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/NotificationListener.java b/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/NotificationListener.java index c217fcd2..efaf741e 100644 --- a/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/NotificationListener.java +++ b/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/NotificationListener.java @@ -23,7 +23,7 @@ public class NotificationListener { @KafkaListener(topics = "${spring.kafka.topic.payment-auth.required}", groupId = "${spring.kafka.consumer.group.customer}") public void handleAuthRequired(String message, Acknowledgment ack) { parseEvent(message, AuthRequiredEvent.class, ack, event -> - log.info("🔔 [고객알림] 유저 {}: 결제 수단이 없어 주문이 대기 중입니다. 사유: {}", + log.info("[고객알림] 유저 {}: 결제 수단이 없어 주문이 대기 중입니다. 사유: {}", event.getUserId(), event.getMessage()) ); } @@ -32,7 +32,7 @@ public void handleAuthRequired(String message, Acknowledgment ack) { @KafkaListener(topics = "${spring.kafka.topic.order.pending}", groupId = "${spring.kafka.consumer.group.owner}") public void handleOrderPending(String message, Acknowledgment ack) { parseEvent(message, OrderPendingEvent.class, ack, event -> - log.info("🔔 [사장알림] 가게 ID {}: 새 주문이 들어왔습니다! (주문 ID: {})", + log.info("[사장알림] 가게 ID {}: 새 주문이 들어왔습니다! (주문 ID: {})", event.getStoreId(), event.getOrderId()) ); } @@ -41,7 +41,7 @@ public void handleOrderPending(String message, Acknowledgment ack) { @KafkaListener(topics = "${spring.kafka.topic.order.accepted}", groupId = "${spring.kafka.consumer.group.customer}") public void handleAcceptedCustomer(String message, Acknowledgment ack) { parseEvent(message, OrderAcceptedEvent.class, ack, event -> - log.info("🔔 [고객알림] 유저 {}: 주문이 수락되었습니다. {}분 뒤 도착 예정!", + log.info("[고객알림] 유저 {}: 주문이 수락되었습니다. {}분 뒤 도착 예정!", event.getUserId(), event.getEstimatedTime()) ); } @@ -50,7 +50,7 @@ public void handleAcceptedCustomer(String message, Acknowledgment ack) { @KafkaListener(topics = "${spring.kafka.topic.order.accepted}", groupId = "${spring.kafka.consumer.group.chef}") public void handleAcceptedChef(String message, Acknowledgment ack) { parseEvent(message, OrderAcceptedEvent.class, ack, event -> - log.info("🔔 [주방알림] 주문번호 {}: 조리 시작! (예상시간: {}분)", + log.info("[주방알림] 주문번호 {}: 조리 시작! (예상시간: {}분)", event.getOrderId(), event.getEstimatedTime()) ); } @@ -63,7 +63,7 @@ private void parseEvent(String message, Class clazz, Acknowledgment ack, ack.acknowledge(); } catch (Exception e) { - log.error("❌ [{}] 알림 파싱/처리 에러: {}", clazz.getSimpleName(), e.getMessage()); + log.error("[{}] 알림 파싱/처리 에러: {}", clazz.getSimpleName(), e.getMessage()); // 필요 시 여기서 예외를 다시 던져서 Kafka Retry 유도 가능 } } diff --git a/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/OrderEventListener.java b/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/OrderEventListener.java index 34c3f69d..73649a40 100644 --- a/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/OrderEventListener.java +++ b/spot-order/src/main/java/com/example/Spot/order/infrastructure/listener/OrderEventListener.java @@ -27,9 +27,9 @@ public void handlePaymentSucceeded(String message, Acknowledgment ack) { orderService.completePayment(event.getOrderId()); ack.acknowledge(); // 성공 시 커밋 - log.info("✅ [결제 성공] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId()); + log.info("[결제 성공] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId()); } catch (Exception e) { - log.error("❌ [OrderService] 결제 성공 이벤트 처리 중 에러 발생: {}", e.getMessage(), e); + log.error("[OrderEvent] 결제 성공 이벤트 처리 중 에러 발생: {}", e.getMessage(), e); } } @@ -42,9 +42,9 @@ public void handlePaymentRefunded(String message, Acknowledgment ack) { orderService.completeOrderCancellation(event.getOrderId()); ack.acknowledge(); // 성공 시 커밋 - log.info("✅ [결제 환불] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId()); + log.info("[결제 환불] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId()); } catch (Exception e) { - log.error("❌ [환불 완료 처리 실패] 메시지 소비 중 오류 발생: {}", e.getMessage()); + log.error("[환불 완료 처리 실패] 메시지 소비 중 오류 발생: {}", e.getMessage()); } } } diff --git a/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java b/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java index 31c73004..1fe17048 100644 --- a/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java +++ b/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java @@ -82,9 +82,9 @@ public void saveOutbox(String topic, UUID aggregateId, Object event) { .build(); outboxRepository.save(outbox); - log.info("✅[Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId); + log.info("[Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId); } catch (JsonProcessingException e) { - log.error("❌[Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage()); + log.error("[Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage()); throw new RuntimeException("이벤트 발행 예약 중 오류 발생", e); } } diff --git a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentService.java b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentService.java index cb52268b..299c5e2e 100644 --- a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentService.java +++ b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentService.java @@ -132,7 +132,7 @@ public boolean refundByOrderId(UUID orderId) { // 1. 주문 ID로 결제 엔티티 조회 PaymentEntity payment = paymentRepository.findActivePaymentByOrderId(orderId) .orElseThrow(() -> { - log.warn("⚠️ [환불 스킵] 취소 가능한 결제 내역이 없거나 이미 취소되었습니다. OrderID: {}", orderId); + log.warn("[환불 스킵] 취소 가능한 결제 내역이 없거나 이미 취소되었습니다. OrderID: {}", orderId); return new ResourceNotFoundException("취소 가능한 결제 내역을 찾을 수 없습니다."); }); diff --git a/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/listener/PaymentListener.java b/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/listener/PaymentListener.java index 3fb5a017..8ccdff1d 100644 --- a/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/listener/PaymentListener.java +++ b/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/listener/PaymentListener.java @@ -65,7 +65,7 @@ public void handleOrderCreated(String message, Acknowledgment ack) { paymentEventProducer.reserveAuthRequiredEvent(authEvent); } ack.acknowledge(); - log.info("✅ 주문 생성 메시지 처리 완료 및 오프셋 커밋: orderId={}", event.getOrderId()); + log.info("주문 생성 메시지 처리 완료 및 오프셋 커밋: orderId={}", event.getOrderId()); } catch (Exception e) { // 에러 처리 로직 @@ -89,10 +89,10 @@ public void handleOrderCancelled(String message, Acknowledgment ack) { paymentEventProducer.reservePaymentRefundedEvent(event.getOrderId()); } ack.acknowledge(); - log.info("✅ [결제서비스] 환불 및 보상 트랜잭션 완료: orderId={}", event.getOrderId()); + log.info("[결제서비스] 환불 및 보상 트랜잭션 완료: orderId={}", event.getOrderId()); } catch (Exception e) { - log.error("❌ [결제서비스] 환불 처리 실패: {}", e.getMessage()); + log.error("[결제서비스] 환불 처리 실패: {}", e.getMessage()); } } } diff --git a/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java b/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java index 43156070..510c49c0 100644 --- a/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java +++ b/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java @@ -58,9 +58,9 @@ private void saveOutbox(String topic, UUID aggregateId, Object event) { .build(); outboxRepository.save(outbox); - log.info("✅[Payment Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId); + log.info("[Payment Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId); } catch (JsonProcessingException e) { - log.error("❌[Payment Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage()); + log.error("[Payment Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage()); throw new RuntimeException("이벤트 발행 예약 중 오류 발생", e); } }