diff --git a/config/common.yml b/config/common.yml index 1d7dd461..badadad7 100644 --- a/config/common.yml +++ b/config/common.yml @@ -74,12 +74,7 @@ toss: customerKey: ${spring.toss.customerKey:${TOSS_CUSTOMER_KEY}} secretKey: ${spring.toss.secretKey:${TOSS_SECRET_KEY}} timeout: 10 - -logging: - level: - org.apache.kafka: WARN - com.zaxxer.hikari: WARN - + management: endpoints: web: diff --git a/connectors/order-outbox.json b/connectors/order-outbox.json new file mode 100644 index 00000000..2d4b78b4 --- /dev/null +++ b/connectors/order-outbox.json @@ -0,0 +1,30 @@ +{ + "name": "order-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "${env:DB_HOST}", + "database.port": "5432", + "database.user": "${env:SPRING_DATASOURCE_USERNAME}", + "database.password": "${env:SPRING_DATASOURCE_PASSWORD}", + "database.dbname": "${env:DB_NAME}", + "topic.prefix": "order_outbox_cdc", + "plugin.name": "pgoutput", + "slot.name": "order_outbox_slot", + "snapshot.mode": "schema_only", + "table.include.list": "public.p_order_outbox", + "transforms": "outbox", + "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", + "transforms.outbox.table.field.event.id": "id", + "transforms.outbox.table.field.event.key": "aggregate_id", + "transforms.outbox.table.field.event.type": "event_type", + "transforms.outbox.table.field.event.payload": "payload", + "transforms.outbox.route.by.field": "event_type", + "transforms.outbox.route.topic.replacement": "${routedByValue}", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter.schemas.enable": "false", + "transforms.outbox.table.expand.json.payload": "true" + } +} \ No newline at end of file diff --git a/connectors/payment-oubox.json b/connectors/payment-oubox.json new file mode 100644 index 00000000..0db158f6 --- /dev/null +++ b/connectors/payment-oubox.json @@ -0,0 +1,30 @@ +{ + "name": "payment-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + "database.hostname": "${env:DB_HOST}", + "database.port": "5432", + "database.user": "${env:SPRING_DATASOURCE_USERNAME}", + "database.password": "${env:SPRING_DATASOURCE_PASSWORD}", + "database.dbname": "${env:DB_NAME}", + "topic.prefix": "payment_outbox_cdc", + "plugin.name": "pgoutput", + "slot.name": "payment_outbox_slot", + "snapshot.mode": "schema_only", + "table.include.list": "public.p_payment_outbox", + "transforms": "outbox", + "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", + "transforms.outbox.table.field.event.id": "id", + "transforms.outbox.table.field.event.key": "aggregate_id", + "transforms.outbox.table.field.event.type": "event_type", + "transforms.outbox.table.field.event.payload": "payload", + "transforms.outbox.route.by.field": "event_type", + "transforms.outbox.route.topic.replacement": "${routedByValue}", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter.schemas.enable": "false", + "transforms.outbox.table.expand.json.payload": "true" + } +} \ No newline at end of file diff --git a/connectors/register-connectors.sh b/connectors/register-connectors.sh new file mode 100644 index 00000000..8049533b --- /dev/null +++ b/connectors/register-connectors.sh @@ -0,0 +1,10 @@ +#!/bin/sh +echo "Waiting for Kafka Connect..." +while [ $(curl -s -o /dev/null -w "%{http_code}" http://connect:8083) -ne 200 ]; do + sleep 3 +done + +echo "Registering connectors from /configs..." +for file in /configs/*.json; do + curl -X POST -H "Content-Type: application/json" -d @"$file" http://connect:8083/connectors +done \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 4d8a5d04..c8d22b29 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -5,7 +5,12 @@ services: db: image: postgres:15-alpine container_name: local-postgres_db - command: postgres -c max_connections=300 + command: > + postgres + -c max_connections=300 + -c wal_level=logical + -c max_replication_slots=10 + -c max_wal_senders=10 environment: - POSTGRES_DB=myapp_db - POSTGRES_USER=admin @@ -36,7 +41,7 @@ services: environment: KAFKA_NODE_ID: 1 KAFKA_LOG4J_ROOT_LOGLEVEL: 'WARN' - KAFKA_LOG4J_LOGGERS: 'kafka.controller=WARN,state.change.logger=WARN,kafka.producer.async.DefaultEventHandler=WARN' + KAFKA_LOG4J_LOGGERS: 'kafka=WARN,state.change.logger=WARN' KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:19093' KAFKA_LISTENERS: 'INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:19093' @@ -57,15 +62,55 @@ services: - kafka-data:/var/lib/kafka/data networks: - spot-network + + connect: + image: debezium/connect:2.4 + container_name: connect + depends_on: + - kafka + - db + ports: + - "8888:8083" + env_file: + - .env + environment: + CONNECT_CONFIG_PROVIDERS: 'env' + CONNECT_CONFIG_PROVIDERS_ENV_CLASS: 'org.apache.kafka.common.config.provider.EnvVarConfigProvider' + BOOTSTRAP_SERVERS: kafka:29092 + GROUP_ID: 1 + CONFIG_STORAGE_TOPIC: my_connect_configs + OFFSET_STORAGE_TOPIC: my_connect_offsets + STATUS_STORAGE_TOPIC: my_connect_statuses + KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false" + CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false" + LOGGING_LEVEL: 'WARN' + CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" + networks: + - spot-network + + connect-init: + image: curlimages/curl:latest + container_name: connect-init + depends_on: + - connect + networks: + - spot-network + volumes: + - ./connectors:/configs + entrypoint: ["/bin/sh", "/configs/register-connectors.sh"] kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui-local ports: - - "8989:8080" # spot-order(8082)와 충돌을 피하기 위해 8989로 변경 + - "8989:8080" environment: KAFKA_CLUSTERS_0_NAME: local-spot-cluster KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: 'kafka:29092' + KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect + KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: 'http://connect:8083' LOGGING_LEVEL_ROOT: WARN LOGGING_LEVEL_COM_PROVECTUS: WARN depends_on: @@ -79,6 +124,8 @@ services: context: ./spot-gateway dockerfile: Dockerfile container_name: spot-gateway + environment: + - LOGGING_LEVEL_ROOT=WARN ports: - "8080:8080" volumes: @@ -104,6 +151,7 @@ services: environment: - DB_HOST=db - KAFKA_BOOTSTRAP_SERVERS=kafka:29092 # 필요 시 추가 + - LOGGING_LEVEL_ROOT=WARN volumes: - ./config:/config depends_on: @@ -127,6 +175,7 @@ services: environment: - DB_HOST=db - KAFKA_BOOTSTRAP_SERVERS=kafka:29092 # 필요 시 추가 + - LOGGING_LEVEL_ROOT=WARN volumes: - ./config:/config depends_on: @@ -146,6 +195,7 @@ services: environment: - DB_HOST=db - KAFKA_BOOTSTRAP_SERVERS=kafka:29092 # 필요 시 추가 + - LOGGING_LEVEL_ROOT=WARN volumes: - ./config:/config depends_on: @@ -166,6 +216,7 @@ services: environment: - DB_HOST=db - KAFKA_BOOTSTRAP_SERVERS=kafka:29092 # 필요 시 추가 + - LOGGING_LEVEL_ROOT=WARN volumes: - ./config:/config depends_on: diff --git a/kafka.sh b/kafka.sh index b14ef42c..8d2dd620 100755 --- a/kafka.sh +++ b/kafka.sh @@ -1,14 +1,11 @@ #!/bin/bash -echo "===기존 Order,Payment 컨테이너 종료 및 삭제===" -docker compose stop spot-order spot-payment -docker compose rm -f spot-order spot-payment +echo "===기존 컨테이너 종료 및 삭제===" +docker compose down echo "===Order,Payment 서비스 빌드===" (cd spot-order && ./gradlew bootJar -x test) (cd spot-payment && ./gradlew bootJar -x test) echo "===Order,Payment 빌드 및 시작===" -docker compose build spot-order spot-payment -docker compose up -d spot-order spot-payment - +docker compose up --build \ No newline at end of file diff --git a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxCleanupScheduler.java b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxCleanupScheduler.java new file mode 100644 index 00000000..d9774783 --- /dev/null +++ b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxCleanupScheduler.java @@ -0,0 +1,24 @@ +package com.example.Spot.order.application.service; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class OrderOutboxCleanupScheduler { + + private final OrderOutboxCleanupService cleanupService; + + @Scheduled(cron = "0 0 3 * * *") + public void run() { + try { + cleanupService.cleanup(); + } catch (Exception e) { + log.error("[ORDER_OUTBOX-CLEANUP] scheduler failed", e); + } + } +} diff --git a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxCleanupService.java b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxCleanupService.java new file mode 100644 index 00000000..c96d77bf --- /dev/null +++ b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxCleanupService.java @@ -0,0 +1,30 @@ +package com.example.Spot.order.application.service; + +import java.time.LocalDateTime; + +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.example.Spot.order.domain.repository.OrderOutboxRepository; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class OrderOutboxCleanupService { + + private static final int RETENTION_DAYS = 7; + private final OrderOutboxRepository orderOutboxRepository; + + @Transactional + public void cleanup() { + LocalDateTime threshold = LocalDateTime.now().minusDays(RETENTION_DAYS); + int deletedCount = orderOutboxRepository.deleteOlderThan(threshold); + + if (deletedCount > 0) { + log.info("[ORDER_OUTBOX-CLEANUP] deleted {} rows (threshold={})", deletedCount, threshold); + } + } +} diff --git a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxScheduler.java b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxScheduler.java deleted file mode 100644 index 95c69d40..00000000 --- a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxScheduler.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.example.Spot.order.application.service; - -import java.time.LocalDateTime; -import java.util.List; - -import org.springframework.data.domain.PageRequest; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import com.example.Spot.order.domain.entity.OrderOutboxEntity; -import com.example.Spot.order.domain.repository.OrderOutboxRepository; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Component -@RequiredArgsConstructor -public class OrderOutboxScheduler { - - private final OrderOutboxRepository outboxRepository; - private final OrderOutboxService orderOutboxService; - - @Scheduled(fixedDelay = 1000) - public void processOutbox() { - // INIT 상태인 데이터를 오래된 순으로 10개만 가져옴 - List outboxes = outboxRepository.findTop10ReadyTopublish( - LocalDateTime.now(), PageRequest.of(0, 10)); - - if (outboxes.isEmpty()) { - return; - } - - for (OrderOutboxEntity outbox : outboxes) { - orderOutboxService.publishIndividualEvent(outbox); - } - } -} diff --git a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxService.java b/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxService.java deleted file mode 100644 index c31cd449..00000000 --- a/spot-order/src/main/java/com/example/Spot/order/application/service/OrderOutboxService.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.example.Spot.order.application.service; - -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; - -import com.example.Spot.order.domain.entity.OrderOutboxEntity; -import com.example.Spot.order.domain.repository.OrderOutboxRepository; -import com.example.Spot.order.infrastructure.producer.OrderEventProducer; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Service -@RequiredArgsConstructor -public class OrderOutboxService { - - private final OrderEventProducer orderEventProducer; - private final OrderOutboxRepository outboxRepository; - - @Transactional(propagation = Propagation.REQUIRES_NEW) - public void publishIndividualEvent(OrderOutboxEntity outbox) { - try { - orderEventProducer.publish(outbox); - outbox.markPublished(); - outboxRepository.saveAndFlush(outbox); - log.info("✅ [Order 발행 성공] ID: {}", outbox.getAggregateId()); - } catch (Exception e) { - outbox.retryFailed(); - if (outbox.getRetryCount() >= 10) { - outbox.markFailed(); - } - outboxRepository.saveAndFlush(outbox); - log.error("❌ [Order 발행 실패] ID: {}, 사유: {}", outbox.getAggregateId(), e.getMessage()); - } - } -} diff --git a/spot-order/src/main/java/com/example/Spot/order/domain/entity/OrderOutboxEntity.java b/spot-order/src/main/java/com/example/Spot/order/domain/entity/OrderOutboxEntity.java index 64fa7b00..7a820cd9 100644 --- a/spot-order/src/main/java/com/example/Spot/order/domain/entity/OrderOutboxEntity.java +++ b/spot-order/src/main/java/com/example/Spot/order/domain/entity/OrderOutboxEntity.java @@ -1,17 +1,13 @@ package com.example.Spot.order.domain.entity; -import java.time.LocalDateTime; import java.util.UUID; import org.hibernate.annotations.UuidGenerator; import com.example.Spot.global.common.BaseEntity; -import com.example.Spot.order.domain.enums.OutboxStatus; import jakarta.persistence.Column; import jakarta.persistence.Entity; -import jakarta.persistence.EnumType; -import jakarta.persistence.Enumerated; import jakarta.persistence.GeneratedValue; import jakarta.persistence.Id; import jakarta.persistence.Index; @@ -24,7 +20,7 @@ @Entity @Getter @Table(name = "p_order_outbox", indexes = { - @Index(name = "idx_order_outbox_status_next_attempt", columnList = "outbox_status, next_attempt_at") + @Index(name = "idx_order_outbox_created_at", columnList = "created_at") }) @NoArgsConstructor(access = AccessLevel.PROTECTED) public class OrderOutboxEntity extends BaseEntity { @@ -47,47 +43,11 @@ public class OrderOutboxEntity extends BaseEntity { @Column(columnDefinition = "TEXT", nullable = false) private String payload; - @Enumerated(EnumType.STRING) - @Column(name = "outbox_status", nullable = false) - private OutboxStatus outboxStatus = OutboxStatus.INIT; - - @Column(name = "retry_count", nullable = false) - private Integer retryCount; - - @Column(name = "next_attempt_at") - private LocalDateTime nextAttemptAt; - - @Column(name = "published_at") - private LocalDateTime publishedAt; - @Builder - public OrderOutboxEntity(String aggregateType, UUID aggregateId, String eventKey, String eventType, String payload) { + public OrderOutboxEntity(String aggregateType, UUID aggregateId, String eventType, String payload) { this.aggregateType = aggregateType; this.aggregateId = aggregateId; this.eventType = eventType; this.payload = payload; - this.retryCount = 0; - this.outboxStatus = OutboxStatus.INIT; - this.nextAttemptAt = LocalDateTime.now(); - } - - public void markPublished() { - if (this.outboxStatus.canTransitionTo(OutboxStatus.PUBLISHED)) { - this.outboxStatus = OutboxStatus.PUBLISHED; - this.publishedAt = LocalDateTime.now(); - } - } - - public void markFailed() { - if (this.outboxStatus.canTransitionTo(OutboxStatus.FAILED)) { - this.outboxStatus = OutboxStatus.FAILED; - } - } - - public void retryFailed() { - this.retryCount++; - long delaySeconds = (long) Math.min(Math.pow(2, this.retryCount), 3600); - this.nextAttemptAt = LocalDateTime.now().plusSeconds(delaySeconds); } } - diff --git a/spot-order/src/main/java/com/example/Spot/order/domain/enums/OutboxStatus.java b/spot-order/src/main/java/com/example/Spot/order/domain/enums/OutboxStatus.java deleted file mode 100644 index b5dfe1d2..00000000 --- a/spot-order/src/main/java/com/example/Spot/order/domain/enums/OutboxStatus.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.example.Spot.order.domain.enums; - -public enum OutboxStatus { - INIT("발행 대기"), - PUBLISHED("발행 완료"), - FAILED("발행 실패"); - - private final String description; - - OutboxStatus(String description) { - this.description = description; - } - - public String getDescription() { - return description; - } - - // 상태 전환 검증 - public boolean canTransitionTo(OutboxStatus newStatus) { - return switch (this) { - case INIT -> newStatus == PUBLISHED || newStatus == FAILED; - case FAILED -> newStatus == PUBLISHED; - default -> false; - }; - } -} - diff --git a/spot-order/src/main/java/com/example/Spot/order/domain/repository/OrderOutboxRepository.java b/spot-order/src/main/java/com/example/Spot/order/domain/repository/OrderOutboxRepository.java index 2bdc33df..856ca2b9 100644 --- a/spot-order/src/main/java/com/example/Spot/order/domain/repository/OrderOutboxRepository.java +++ b/spot-order/src/main/java/com/example/Spot/order/domain/repository/OrderOutboxRepository.java @@ -1,20 +1,20 @@ package com.example.Spot.order.domain.repository; import java.time.LocalDateTime; -import java.util.List; import java.util.UUID; -import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import com.example.Spot.order.domain.entity.OrderOutboxEntity; public interface OrderOutboxRepository extends JpaRepository { - @Query("SELECT o FROM OrderOutboxEntity o " + - "WHERE o.outboxStatus = 'INIT' " + - "AND o.nextAttemptAt <= :now " + - "ORDER By o.createdAt ASC") - List findTop10ReadyTopublish(@Param("now") LocalDateTime now, Pageable pageable); + + @Transactional + @Modifying + @Query("DELETE FROM OrderOutboxEntity o WHERE o.createdAt < :threshold") + int deleteOlderThan(@Param("threshold") LocalDateTime threshold); } diff --git a/spot-order/src/main/java/com/example/Spot/order/infrastructure/event/publish/OrderPendingEvent.java b/spot-order/src/main/java/com/example/Spot/order/infrastructure/event/publish/OrderPendingEvent.java index bc9c7b91..a51ec6c3 100644 --- a/spot-order/src/main/java/com/example/Spot/order/infrastructure/event/publish/OrderPendingEvent.java +++ b/spot-order/src/main/java/com/example/Spot/order/infrastructure/event/publish/OrderPendingEvent.java @@ -1,6 +1,5 @@ package com.example.Spot.order.infrastructure.event.publish; -import java.time.LocalDateTime; import java.util.UUID; import lombok.AllArgsConstructor; @@ -15,5 +14,4 @@ public class OrderPendingEvent { private UUID storeId; private UUID orderId; - private LocalDateTime timestamp; } diff --git a/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java b/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java index a22f4770..31c73004 100644 --- a/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java +++ b/spot-order/src/main/java/com/example/Spot/order/infrastructure/producer/OrderEventProducer.java @@ -1,6 +1,5 @@ package com.example.Spot.order.infrastructure.producer; -import java.time.LocalDateTime; import java.util.UUID; import org.springframework.beans.factory.annotation.Value; @@ -50,7 +49,6 @@ public void reserveOrderPending(UUID storeId, UUID orderId) { OrderPendingEvent event = OrderPendingEvent.builder() .storeId(storeId) .orderId(orderId) - .timestamp(LocalDateTime.now()) .build(); saveOutbox(orderPendingTopic, orderId, event); } @@ -90,10 +88,4 @@ public void saveOutbox(String topic, UUID aggregateId, Object event) { throw new RuntimeException("이벤트 발행 예약 중 오류 발생", e); } } - - public void publish(OrderOutboxEntity outbox) throws Exception { - kafkaTemplate.send(outbox.getEventType(), outbox.getAggregateId().toString(), outbox.getPayload()).get(); - } } - - diff --git a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxCleanupScheduler.java b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxCleanupScheduler.java new file mode 100644 index 00000000..1154057a --- /dev/null +++ b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxCleanupScheduler.java @@ -0,0 +1,24 @@ +package com.example.Spot.payments.application.service; + +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@RequiredArgsConstructor +public class PaymentOutboxCleanupScheduler { + + private final PaymentOutboxCleanupService cleanupService; + + @Scheduled(cron = "0 0 3 * * *") + public void run() { + try { + cleanupService.cleanup(); + } catch (Exception e) { + log.error("[PAYMENT_OUTBOX-CLEANUP] scheduler failed", e); + } + } +} diff --git a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxCleanupService.java b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxCleanupService.java new file mode 100644 index 00000000..07b7b96a --- /dev/null +++ b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxCleanupService.java @@ -0,0 +1,30 @@ +package com.example.Spot.payments.application.service; + +import java.time.LocalDateTime; + +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.example.Spot.payments.domain.repository.PaymentOutboxRepository; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class PaymentOutboxCleanupService { + + private static final int RETENTION_DAYS = 7; + private final PaymentOutboxRepository paymentOutboxRepository; + + @Transactional + public void cleanup() { + LocalDateTime threshold = LocalDateTime.now().minusDays(RETENTION_DAYS); + int deletedCount = paymentOutboxRepository.deleteOlderThan(threshold); + + if (deletedCount > 0) { + log.info("[PAYMENT_OUTBOX-CLEANUP] deleted {} rows (threshold={})", deletedCount, threshold); + } + } +} diff --git a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxScheduler.java b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxScheduler.java deleted file mode 100644 index 9b483dc8..00000000 --- a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxScheduler.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.example.Spot.payments.application.service; - -import java.time.LocalDateTime; -import java.util.List; - -import org.springframework.data.domain.PageRequest; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import com.example.Spot.payments.domain.entity.PaymentOutboxEntity; -import com.example.Spot.payments.domain.repository.PaymentOutboxRepository; - -import lombok.RequiredArgsConstructor; - -@Component -@RequiredArgsConstructor -public class PaymentOutboxScheduler { - - private final PaymentOutboxRepository outboxRepository; - private final PaymentOutboxService paymentOutboxService; - - @Scheduled(fixedDelay = 1000) - public void processOutbox() { - List outboxes = outboxRepository.findTop10ReadyTopublish( - LocalDateTime.now(), PageRequest.of(0, 10)); - - if (outboxes.isEmpty()) { - return; - } - - for (PaymentOutboxEntity outbox : outboxes) { - paymentOutboxService.publishIndividualEvent(outbox); - } - } -} diff --git a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxService.java b/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxService.java deleted file mode 100644 index d590bab1..00000000 --- a/spot-payment/src/main/java/com/example/Spot/payments/application/service/PaymentOutboxService.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.example.Spot.payments.application.service; - -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; - -import com.example.Spot.payments.domain.entity.PaymentOutboxEntity; -import com.example.Spot.payments.domain.repository.PaymentOutboxRepository; -import com.example.Spot.payments.infrastructure.producer.PaymentEventProducer; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Service -@RequiredArgsConstructor -public class PaymentOutboxService { - - private final PaymentEventProducer paymentEventProducer; - private final PaymentOutboxRepository outboxRepository; - - @Transactional(propagation = Propagation.REQUIRED) - public void publishIndividualEvent(PaymentOutboxEntity outbox) { - try { - paymentEventProducer.publish(outbox); - outbox.markPublished(); - outboxRepository.saveAndFlush(outbox); - - log.info("✅ [결제 이벤트 발행 성공] AggregateId: {}, Topic: {}", - outbox.getAggregateId(), outbox.getEventType()); - } catch (Exception e) { - outbox.retryFailed(); - if (outbox.getRetryCount() >= 10) { - outbox.markFailed(); - } - outboxRepository.saveAndFlush(outbox); - log.error("❌ [결제 이벤트 발행 실패] AggregateId: {}, 사유: {}", - outbox.getAggregateId(), e.getMessage()); - } - } -} diff --git a/spot-payment/src/main/java/com/example/Spot/payments/domain/entity/PaymentOutboxEntity.java b/spot-payment/src/main/java/com/example/Spot/payments/domain/entity/PaymentOutboxEntity.java index 1ee16dfb..daf7f311 100644 --- a/spot-payment/src/main/java/com/example/Spot/payments/domain/entity/PaymentOutboxEntity.java +++ b/spot-payment/src/main/java/com/example/Spot/payments/domain/entity/PaymentOutboxEntity.java @@ -1,17 +1,13 @@ package com.example.Spot.payments.domain.entity; -import java.time.LocalDateTime; import java.util.UUID; import org.hibernate.annotations.UuidGenerator; import com.example.Spot.global.common.BaseEntity; -import com.example.Spot.payments.domain.enums.OutboxStatus; import jakarta.persistence.Column; import jakarta.persistence.Entity; -import jakarta.persistence.EnumType; -import jakarta.persistence.Enumerated; import jakarta.persistence.GeneratedValue; import jakarta.persistence.Id; import jakarta.persistence.Index; @@ -23,7 +19,7 @@ @Entity @Table(name = "p_payment_outbox", indexes = { - @Index(name = "idx_payment_outbox_status_next_attempt", columnList = "outbox_status, next_attempt_at") + @Index(name = "idx_payment_outbox_created_at", columnList = "created_at") }) @Getter @NoArgsConstructor(access = AccessLevel.PROTECTED) @@ -47,46 +43,11 @@ public class PaymentOutboxEntity extends BaseEntity { @Column(columnDefinition = "TEXT", nullable = false) private String payload; - @Enumerated(EnumType.STRING) - @Column(name = "outbox_status", nullable = false) - private OutboxStatus outboxStatus = OutboxStatus.INIT; - - @Column(name = "retry_count", nullable = false) - private Integer retryCount; - - @Column(name = "next_attempt_at") - private LocalDateTime nextAttemptAt; - - @Column(name = "published_at") - private LocalDateTime publishedAt; - @Builder public PaymentOutboxEntity(String aggregateType, UUID aggregateId, String eventKey, String eventType, String payload) { this.aggregateType = aggregateType; this.aggregateId = aggregateId; this.eventType = eventType; this.payload = payload; - this.retryCount = 0; - this.outboxStatus = OutboxStatus.INIT; - this.nextAttemptAt = LocalDateTime.now(); - } - - public void markPublished() { - if (this.outboxStatus.canTransitionTo(OutboxStatus.PUBLISHED)) { - this.outboxStatus = OutboxStatus.PUBLISHED; - this.publishedAt = LocalDateTime.now(); - } - } - - public void markFailed() { - if (this.outboxStatus.canTransitionTo(OutboxStatus.FAILED)) { - this.outboxStatus = OutboxStatus.FAILED; - } - } - - public void retryFailed() { - this.retryCount++; - long delaySeconds = (long) Math.min(Math.pow(2, this.retryCount), 3600); - this.nextAttemptAt = LocalDateTime.now().plusSeconds(delaySeconds); } } diff --git a/spot-payment/src/main/java/com/example/Spot/payments/domain/enums/OutboxStatus.java b/spot-payment/src/main/java/com/example/Spot/payments/domain/enums/OutboxStatus.java deleted file mode 100644 index 7f4f7c6b..00000000 --- a/spot-payment/src/main/java/com/example/Spot/payments/domain/enums/OutboxStatus.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.example.Spot.payments.domain.enums; - -public enum OutboxStatus { - INIT("발행 대기"), - PUBLISHED("발행 완료"), - FAILED("발행 실패"); - - private final String description; - - OutboxStatus(String description) { - this.description = description; - } - - public String getDescription() { - return description; - } - - // 상태 전환 검증 - public boolean canTransitionTo(OutboxStatus newStatus) { - return switch (this) { - case INIT -> newStatus == PUBLISHED || newStatus == FAILED; - case FAILED -> newStatus == PUBLISHED; - default -> false; - }; - } -} diff --git a/spot-payment/src/main/java/com/example/Spot/payments/domain/repository/PaymentOutboxRepository.java b/spot-payment/src/main/java/com/example/Spot/payments/domain/repository/PaymentOutboxRepository.java index 5a850f0f..c9563d9b 100644 --- a/spot-payment/src/main/java/com/example/Spot/payments/domain/repository/PaymentOutboxRepository.java +++ b/spot-payment/src/main/java/com/example/Spot/payments/domain/repository/PaymentOutboxRepository.java @@ -1,20 +1,20 @@ package com.example.Spot.payments.domain.repository; import java.time.LocalDateTime; -import java.util.List; import java.util.UUID; -import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.springframework.transaction.annotation.Transactional; import com.example.Spot.payments.domain.entity.PaymentOutboxEntity; public interface PaymentOutboxRepository extends JpaRepository { - @Query("SELECT p FROM PaymentOutboxEntity p " + - "WHERE p.outboxStatus = 'INIT' " + - "AND p.nextAttemptAt <= :now " + - "ORDER BY p.createdAt ASC") - List findTop10ReadyTopublish(@Param("now")LocalDateTime now, Pageable pageable); + + @Transactional + @Modifying + @Query("DELETE FROM PaymentOutboxEntity p WHERE p.createdAt < :threshold") + int deleteOlderThan(@Param("threshold") LocalDateTime threshold); } diff --git a/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java b/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java index d2d4de7e..43156070 100644 --- a/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java +++ b/spot-payment/src/main/java/com/example/Spot/payments/infrastructure/producer/PaymentEventProducer.java @@ -4,7 +4,6 @@ import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import com.example.Spot.payments.domain.entity.PaymentOutboxEntity; @@ -23,7 +22,6 @@ @RequiredArgsConstructor public class PaymentEventProducer { - private final KafkaTemplate kafkaTemplate; private final PaymentOutboxRepository outboxRepository; private final ObjectMapper objectMapper; @@ -66,8 +64,4 @@ private void saveOutbox(String topic, UUID aggregateId, Object event) { throw new RuntimeException("이벤트 발행 예약 중 오류 발생", e); } } - - public void publish(PaymentOutboxEntity outbox) throws Exception { - kafkaTemplate.send(outbox.getEventType(), outbox.getAggregateId().toString(), outbox.getPayload()).get(); - } }