Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions connectors/order-outbox.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
"topic.prefix": "order_outbox_cdc",
"plugin.name": "pgoutput",
"slot.name": "order_outbox_slot",
"snapshot.mode": "schema_only",
"snapshot.mode": "no_data",
"snapshot.locking.mode": "none",
"table.include.list": "public.p_order_outbox",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
Expand All @@ -25,6 +27,15 @@
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms.outbox.table.expand.json.payload": "true"
"transforms.outbox.table.expand.json.payload": "true",
"producer.acks": "all",
"producer.enable.idempotence": "true",
"producer.max.in.flight.requests.per.connection": "5",
"producer.retries": "100",
"producer.delivery.timeout.ms": "120000",
"producer.retry.backoff.ms": "500",
"producer.compression.type": "lz4",
"producer.linger.ms": "20",
"producer.batch.size": "65536"
}
}
15 changes: 13 additions & 2 deletions connectors/payment-oubox.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
"topic.prefix": "payment_outbox_cdc",
"plugin.name": "pgoutput",
"slot.name": "payment_outbox_slot",
"snapshot.mode": "schema_only",
"snapshot.mode": "no_data",
"snapshot.locking.mode": "none",
"table.include.list": "public.p_payment_outbox",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
Expand All @@ -25,6 +27,15 @@
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms.outbox.table.expand.json.payload": "true"
"transforms.outbox.table.expand.json.payload": "true",
"producer.acks": "all",
"producer.enable.idempotence": "true",
"producer.max.in.flight.requests.per.connection": "5",
"producer.retries": "100",
"producer.delivery.timeout.ms": "120000",
"producer.retry.backoff.ms": "500",
"producer.compression.type": "lz4",
"producer.linger.ms": "20",
"producer.batch.size": "65536"
}
}
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:

# --- Infrastructure: Event Broker (Kafka) ---
kafka:
image: confluentinc/cp-kafka:7.5.0
image: apache/kafka:4.0.0
container_name: kafka-local
restart: always
ports:
Expand Down Expand Up @@ -64,7 +64,7 @@ services:
- spot-network

connect:
image: debezium/connect:2.4
image: quay.io/debezium/connect:3.4.0.Final
container_name: connect
depends_on:
- kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void completeOrderCancellation(UUID orderId) {
order.finalizeCancel();
log.info("[보상 트랜잭션 완료] 주문 ID {} 가 최종 확정되었습니다.", orderId);
} else {
log.warn("⚠️ [무시됨] 주문 ID {} 는 현재 취소 대기 상태가 아닙니다. (현재 상태: {})",
log.warn("[무시됨] 주문 ID {} 는 현재 취소 대기 상태가 아닙니다. (현재 상태: {})",
orderId, order.getOrderStatus());
}
}
Expand Down Expand Up @@ -404,6 +404,11 @@ public OrderResponseDto completePayment(UUID orderId) {
OrderEntity order = orderRepository.findByIdWithLock(orderId)
.orElseThrow(() -> new IllegalArgumentException("존재하지 않는 주문입니다."));

if (order.getOrderStatus() == OrderStatus.PENDING) {
log.info("[중복방지] 이미 결제 처리가 완료된 주문입니다. 스킵합니다: orderId={}", orderId);
return OrderResponseDto.from(order); // 예외 없이 정상 응답을 반환하여 컨슈머가 Ack를 찍게 함
}

log.info("결제 성공 이벤트 수신 - 주문 확정 처리 시작: orderId={}", orderId);

// 1. 상태 변경(PAYMENT_PENDING -> PENDING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class NotificationListener {
@KafkaListener(topics = "${spring.kafka.topic.payment-auth.required}", groupId = "${spring.kafka.consumer.group.customer}")
public void handleAuthRequired(String message, Acknowledgment ack) {
parseEvent(message, AuthRequiredEvent.class, ack, event ->
log.info("🔔 [고객알림] 유저 {}: 결제 수단이 없어 주문이 대기 중입니다. 사유: {}",
log.info("[고객알림] 유저 {}: 결제 수단이 없어 주문이 대기 중입니다. 사유: {}",
event.getUserId(), event.getMessage())
);
}
Expand All @@ -32,7 +32,7 @@ public void handleAuthRequired(String message, Acknowledgment ack) {
@KafkaListener(topics = "${spring.kafka.topic.order.pending}", groupId = "${spring.kafka.consumer.group.owner}")
public void handleOrderPending(String message, Acknowledgment ack) {
parseEvent(message, OrderPendingEvent.class, ack, event ->
log.info("🔔 [사장알림] 가게 ID {}: 새 주문이 들어왔습니다! (주문 ID: {})",
log.info("[사장알림] 가게 ID {}: 새 주문이 들어왔습니다! (주문 ID: {})",
event.getStoreId(), event.getOrderId())
);
}
Expand All @@ -41,7 +41,7 @@ public void handleOrderPending(String message, Acknowledgment ack) {
@KafkaListener(topics = "${spring.kafka.topic.order.accepted}", groupId = "${spring.kafka.consumer.group.customer}")
public void handleAcceptedCustomer(String message, Acknowledgment ack) {
parseEvent(message, OrderAcceptedEvent.class, ack, event ->
log.info("🔔 [고객알림] 유저 {}: 주문이 수락되었습니다. {}분 뒤 도착 예정!",
log.info("[고객알림] 유저 {}: 주문이 수락되었습니다. {}분 뒤 도착 예정!",
event.getUserId(), event.getEstimatedTime())
);
}
Expand All @@ -50,7 +50,7 @@ public void handleAcceptedCustomer(String message, Acknowledgment ack) {
@KafkaListener(topics = "${spring.kafka.topic.order.accepted}", groupId = "${spring.kafka.consumer.group.chef}")
public void handleAcceptedChef(String message, Acknowledgment ack) {
parseEvent(message, OrderAcceptedEvent.class, ack, event ->
log.info("🔔 [주방알림] 주문번호 {}: 조리 시작! (예상시간: {}분)",
log.info("[주방알림] 주문번호 {}: 조리 시작! (예상시간: {}분)",
event.getOrderId(), event.getEstimatedTime())
);
}
Expand All @@ -63,7 +63,7 @@ private <T> void parseEvent(String message, Class<T> clazz, Acknowledgment ack,
ack.acknowledge();

} catch (Exception e) {
log.error("[{}] 알림 파싱/처리 에러: {}", clazz.getSimpleName(), e.getMessage());
log.error("[{}] 알림 파싱/처리 에러: {}", clazz.getSimpleName(), e.getMessage());
// 필요 시 여기서 예외를 다시 던져서 Kafka Retry 유도 가능
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public void handlePaymentSucceeded(String message, Acknowledgment ack) {
orderService.completePayment(event.getOrderId());

ack.acknowledge(); // 성공 시 커밋
log.info("[결제 성공] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId());
log.info("[결제 성공] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId());
} catch (Exception e) {
log.error("❌ [OrderService] 결제 성공 이벤트 처리 중 에러 발생: {}", e.getMessage(), e);
log.error("[OrderEvent] 결제 성공 이벤트 처리 중 에러 발생: {}", e.getMessage(), e);
}
}

Expand All @@ -42,9 +42,9 @@ public void handlePaymentRefunded(String message, Acknowledgment ack) {
orderService.completeOrderCancellation(event.getOrderId());

ack.acknowledge(); // 성공 시 커밋
log.info("[결제 환불] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId());
log.info("[결제 환불] 처리 완료 및 Ack 커밋: OrderID {}", event.getOrderId());
} catch (Exception e) {
log.error("[환불 완료 처리 실패] 메시지 소비 중 오류 발생: {}", e.getMessage());
log.error("[환불 완료 처리 실패] 메시지 소비 중 오류 발생: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ public void saveOutbox(String topic, UUID aggregateId, Object event) {
.build();

outboxRepository.save(outbox);
log.info("[Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId);
log.info("[Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId);
} catch (JsonProcessingException e) {
log.error("[Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage());
log.error("[Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage());
throw new RuntimeException("이벤트 발행 예약 중 오류 발생", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public boolean refundByOrderId(UUID orderId) {
// 1. 주문 ID로 결제 엔티티 조회
PaymentEntity payment = paymentRepository.findActivePaymentByOrderId(orderId)
.orElseThrow(() -> {
log.warn("⚠️ [환불 스킵] 취소 가능한 결제 내역이 없거나 이미 취소되었습니다. OrderID: {}", orderId);
log.warn("[환불 스킵] 취소 가능한 결제 내역이 없거나 이미 취소되었습니다. OrderID: {}", orderId);
return new ResourceNotFoundException("취소 가능한 결제 내역을 찾을 수 없습니다.");
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void handleOrderCreated(String message, Acknowledgment ack) {
paymentEventProducer.reserveAuthRequiredEvent(authEvent);
}
ack.acknowledge();
log.info("주문 생성 메시지 처리 완료 및 오프셋 커밋: orderId={}", event.getOrderId());
log.info("주문 생성 메시지 처리 완료 및 오프셋 커밋: orderId={}", event.getOrderId());

} catch (Exception e) {
// 에러 처리 로직
Expand All @@ -89,10 +89,10 @@ public void handleOrderCancelled(String message, Acknowledgment ack) {
paymentEventProducer.reservePaymentRefundedEvent(event.getOrderId());
}
ack.acknowledge();
log.info("[결제서비스] 환불 및 보상 트랜잭션 완료: orderId={}", event.getOrderId());
log.info("[결제서비스] 환불 및 보상 트랜잭션 완료: orderId={}", event.getOrderId());

} catch (Exception e) {
log.error("[결제서비스] 환불 처리 실패: {}", e.getMessage());
log.error("[결제서비스] 환불 처리 실패: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ private void saveOutbox(String topic, UUID aggregateId, Object event) {
.build();

outboxRepository.save(outbox);
log.info("[Payment Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId);
log.info("[Payment Outbox 저장 성공] topic:{}, AggregateId:{}", topic, aggregateId);
} catch (JsonProcessingException e) {
log.error("[Payment Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage());
log.error("[Payment Outbox 저장 실패] AggregateId={}, error={}", aggregateId, e.getMessage());
throw new RuntimeException("이벤트 발행 예약 중 오류 발생", e);
}
}
Expand Down
Loading