diff --git a/apps/user-service/build.gradle b/apps/user-service/build.gradle index 8aa7715a..1983ae8f 100644 --- a/apps/user-service/build.gradle +++ b/apps/user-service/build.gradle @@ -41,6 +41,9 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-aop' implementation 'org.springframework.boot:spring-boot-starter-validation' + // Java 표준 라이프사이클 어노테이션 (@PostConstruct, @PreDestroy) + implementation 'javax.annotation:javax.annotation-api:1.3.2' + // MyBatis implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.5' diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/JobRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/JobRunMapper.java new file mode 100644 index 00000000..887e0bed --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/JobRunMapper.java @@ -0,0 +1,10 @@ +package site.icebang.domain.execution.mapper; + +import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.execution.model.JobRun; + +@Mapper +public interface JobRunMapper { + void save(JobRun jobRun); + void update(JobRun jobRun); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/WorkflowRunMapper.java b/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/WorkflowRunMapper.java new file mode 100644 index 00000000..76fda718 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/execution/mapper/WorkflowRunMapper.java @@ -0,0 +1,10 @@ +package site.icebang.domain.execution.mapper; + +import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.execution.model.WorkflowRun; + +@Mapper +public interface WorkflowRunMapper { + void save(WorkflowRun workflowRun); + void update(WorkflowRun workflowRun); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/model/JobRun.java b/apps/user-service/src/main/java/site/icebang/domain/execution/model/JobRun.java new file mode 100644 index 00000000..5fd3ef5b --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/execution/model/JobRun.java @@ -0,0 +1,25 @@ +package site.icebang.domain.execution.model; + +import java.time.LocalDateTime; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@Builder +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +public class JobRun { + private Long id; + private Long workflowRunId; + private Long jobId; + private String status; + private LocalDateTime startedAt; + private LocalDateTime finishedAt; + private Integer executionOrder; + private LocalDateTime createdAt; +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/execution/model/WorkflowRun.java b/apps/user-service/src/main/java/site/icebang/domain/execution/model/WorkflowRun.java new file mode 100644 index 00000000..ee38e036 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/execution/model/WorkflowRun.java @@ -0,0 +1,30 @@ +package site.icebang.domain.execution.model; + +import java.time.LocalDateTime; +import java.util.UUID; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@Builder +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +public class WorkflowRun { + + private Long id; + private Long workflowId; + private String traceId; + private String runNumber; + private String status; + private String triggerType; + private LocalDateTime startedAt; + private LocalDateTime finishedAt; + private Long createdBy; + private LocalDateTime createdAt; + +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/job/mapper/JobMapper.java b/apps/user-service/src/main/java/site/icebang/domain/job/mapper/JobMapper.java new file mode 100644 index 00000000..538306b5 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/job/mapper/JobMapper.java @@ -0,0 +1,10 @@ +package site.icebang.domain.job.mapper; + +import java.util.Optional; +import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.job.model.Job; + +@Mapper +public interface JobMapper { + Optional findById(Long id); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/job/model/Job.java b/apps/user-service/src/main/java/site/icebang/domain/job/model/Job.java new file mode 100644 index 00000000..9e2fe2bf --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/job/model/Job.java @@ -0,0 +1,21 @@ +package site.icebang.domain.job.model; + +import java.time.LocalDateTime; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +public class Job { + private Long id; + private String name; + private String description; + private boolean isEnabled; + private LocalDateTime createdAt; + private Long createdBy; + private LocalDateTime updatedAt; + private Long updatedBy; +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/mapping/mapper/WorkflowJobMapper.java b/apps/user-service/src/main/java/site/icebang/domain/mapping/mapper/WorkflowJobMapper.java new file mode 100644 index 00000000..97f9b14c --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/mapping/mapper/WorkflowJobMapper.java @@ -0,0 +1,10 @@ +package site.icebang.domain.mapping.mapper; + +import java.util.List; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface WorkflowJobMapper { + // A workflow can have multiple jobs, ordered by execution_order + List findJobIdsByWorkflowId(Long workflowId); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/schedule/mapper/ScheduleMapper.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/mapper/ScheduleMapper.java new file mode 100644 index 00000000..b64730a9 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/mapper/ScheduleMapper.java @@ -0,0 +1,14 @@ +package site.icebang.domain.schedule.mapper; + +import java.util.List; +import java.util.Optional; +import org.apache.ibatis.annotations.Mapper; +import site.icebang.domain.schedule.model.Schedule; + +@Mapper +public interface ScheduleMapper { + void save(Schedule schedule); + void update(Schedule schedule); + Optional findById(Long id); + List findAllActive(); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/schedule/model/Schedule.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/model/Schedule.java new file mode 100644 index 00000000..d5d8f51a --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/model/Schedule.java @@ -0,0 +1,30 @@ +package site.icebang.domain.schedule.model; + +import java.time.LocalDateTime; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter // 서비스 레이어에서의 상태 변경 및 MyBatis 매핑을 위해 사용 +@Builder +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +public class Schedule { + + private Long id; + private Long workflowId; + private String cronExpression; + private String parameters; // JSON format + private boolean isActive; + private String lastRunStatus; + private LocalDateTime lastRunAt; + private LocalDateTime createdAt; + private Long createdBy; + private LocalDateTime updatedAt; + private Long updatedBy; + private String scheduleText; +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/schedule/runner/SchedulerInitializer.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/runner/SchedulerInitializer.java similarity index 72% rename from apps/user-service/src/main/java/site/icebang/schedule/runner/SchedulerInitializer.java rename to apps/user-service/src/main/java/site/icebang/domain/schedule/runner/SchedulerInitializer.java index ee8580dd..a96d5402 100644 --- a/apps/user-service/src/main/java/site/icebang/schedule/runner/SchedulerInitializer.java +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/runner/SchedulerInitializer.java @@ -1,4 +1,4 @@ -package site.icebang.schedule.runner; +package site.icebang.domain.schedule.runner; import java.util.List; @@ -9,9 +9,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import site.icebang.schedule.mapper.ScheduleMapper; -import site.icebang.schedule.model.Schedule; -import site.icebang.schedule.service.DynamicSchedulerService; +import site.icebang.domain.schedule.mapper.ScheduleMapper; +import site.icebang.domain.schedule.model.Schedule; +import site.icebang.domain.schedule.service.DynamicSchedulerService; @Slf4j @Component @@ -24,7 +24,7 @@ public class SchedulerInitializer implements ApplicationRunner { @Override public void run(ApplicationArguments args) { log.info(">>>> Initializing schedules from database..."); - List activeSchedules = scheduleMapper.findAllByIsActive(true); + List activeSchedules = scheduleMapper.findAllActive(); activeSchedules.forEach(dynamicSchedulerService::register); log.info(">>>> {} active schedules have been registered.", activeSchedules.size()); } diff --git a/apps/user-service/src/main/java/site/icebang/domain/schedule/service/DynamicSchedulerService.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/DynamicSchedulerService.java new file mode 100644 index 00000000..2a35b711 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/DynamicSchedulerService.java @@ -0,0 +1,60 @@ +package site.icebang.domain.schedule.service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import site.icebang.domain.schedule.model.Schedule; +import site.icebang.domain.workflow.service.WorkflowExecutionService; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DynamicSchedulerService { + + private final TaskScheduler taskScheduler; + private final WorkflowExecutionService workflowExecutionService; + + private final Map> scheduledTasks = new ConcurrentHashMap<>(); + + public void register(Schedule schedule) { + // 스케줄 실행 시 WorkflowExecutionService를 호출하는 Runnable 생성 + Runnable runnable = () -> { + try { + log.debug("Triggering workflow execution for scheduleId: {}", schedule.getId()); + // 실제 워크플로우 실행은 WorkflowExecutionService에 위임 (비동기 호출) + workflowExecutionService.execute(schedule.getWorkflowId(), "SCHEDULE", schedule.getId()); + } catch (Exception e) { + // Async 예외는 기본적으로 처리되지 않으므로 여기서 로그를 남기는 것이 중요 + log.error("Failed to submit workflow execution for scheduleId: {}", schedule.getId(), e); + } + }; + + CronTrigger trigger = new CronTrigger(schedule.getCronExpression()); + ScheduledFuture future = taskScheduler.schedule(runnable, trigger); + + // 기존에 등록된 스케줄이 있다면 취소하고 새로 등록 (업데이트 지원) + ScheduledFuture oldFuture = scheduledTasks.put(schedule.getId(), future); + if (oldFuture != null) { + oldFuture.cancel(false); + } + + log.info(">>>> Schedule registered/updated: id={}, workflowId={}, cron='{}'", + schedule.getId(), schedule.getWorkflowId(), schedule.getCronExpression()); + } + + public void remove(Long scheduleId) { + ScheduledFuture future = scheduledTasks.remove(scheduleId); + if (future != null) { + future.cancel(true); // true: 실행 중인 태스크를 인터럽트 + log.info(">>>> Schedule removed: id={}", scheduleId); + } else { + log.warn(">>>> Attempted to remove a schedule that was not found in the scheduler: id={}", scheduleId); + } + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/schedule/service/ScheduleService.java b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/ScheduleService.java new file mode 100644 index 00000000..1d9cde71 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/schedule/service/ScheduleService.java @@ -0,0 +1,58 @@ +package site.icebang.domain.schedule.service; + +import java.util.List; +import javax.annotation.PostConstruct; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import site.icebang.domain.schedule.model.Schedule; +import site.icebang.domain.schedule.mapper.ScheduleMapper; + +@Slf4j +@Service +@RequiredArgsConstructor +public class ScheduleService { + + private final ScheduleMapper scheduleMapper; + private final DynamicSchedulerService dynamicSchedulerService; + + /** + * 애플리케이션 시작 시 활성화된 모든 스케줄을 초기화합니다. + * 이를 통해 서버가 재시작되어도 스케줄이 자동으로 복원됩니다. + */ + @PostConstruct + public void initializeSchedules() { + log.info("Initializing active schedules from database..."); + List activeSchedules = scheduleMapper.findAllActive(); + activeSchedules.forEach(dynamicSchedulerService::register); + log.info("{} active schedules have been initialized.", activeSchedules.size()); + } + + @Transactional + public Schedule createSchedule(Schedule schedule) { + // 1. DB에 스케줄 저장 + scheduleMapper.save(schedule); + + // 2. 메모리의 스케줄러에 동적으로 등록 + if (schedule.isActive()) { // Lombok Getter for boolean is isActive() + dynamicSchedulerService.register(schedule); + } + + return schedule; + } + + // TODO: 스케줄 수정 로직(updateSchedule) 구현이 필요합니다. + + @Transactional + public void deactivateSchedule(Long scheduleId) { + // 1. DB에서 스케줄을 비활성화 + Schedule schedule = scheduleMapper.findById(scheduleId) + .orElseThrow(() -> new IllegalArgumentException("Schedule not found with id: " + scheduleId)); + schedule.setActive(false); + scheduleMapper.update(schedule); + + // 2. 메모리의 스케줄러에서 제거 + dynamicSchedulerService.remove(scheduleId); + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/controller/WorkflowController.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/controller/WorkflowController.java index 39077eca..59825f54 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/controller/WorkflowController.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/controller/WorkflowController.java @@ -1,9 +1,7 @@ package site.icebang.domain.workflow.controller; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.ModelAttribute; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; import lombok.RequiredArgsConstructor; @@ -11,6 +9,7 @@ import site.icebang.common.dto.PageParams; import site.icebang.common.dto.PageResult; import site.icebang.domain.workflow.dto.WorkflowCardDto; +import site.icebang.domain.workflow.service.WorkflowExecutionService; import site.icebang.domain.workflow.service.WorkflowService; @RestController @@ -18,6 +17,7 @@ @RequiredArgsConstructor public class WorkflowController { private final WorkflowService workflowService; + private final WorkflowExecutionService workflowExecutionService; @GetMapping("") public ApiResponse> getWorkflowList( @@ -25,4 +25,24 @@ public ApiResponse> getWorkflowList( PageResult result = workflowService.getPagedResult(pageParams); return ApiResponse.success(result); } + + + /** + * 지정된 ID의 워크플로우를 수동으로 실행합니다. + * + * @param workflowId 실행할 워크플로우의 ID + * @return HTTP 202 Accepted + */ + @PostMapping("/{workflowId}/execute") + public ResponseEntity executeWorkflow(@PathVariable Long workflowId) { + // TODO: Spring Security 등 인증 체계에서 실제 사용자 ID를 가져와야 합니다. + Long currentUserId = 1L; // 임시 사용자 ID + + // 워크플로우 실행 서비스 호출. 'MANUAL' 타입으로 실행을 요청합니다. + // @Async로 동작하므로, 이 호출은 즉시 반환되고 워크플로우는 백그라운드에서 실행됩니다. + workflowExecutionService.execute(workflowId, "MANUAL", currentUserId); + + // 작업이 성공적으로 접수되었음을 알리는 202 Accepted 상태를 반환합니다. + return ResponseEntity.accepted().build(); + } } diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/WorkflowCardDto.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/WorkflowCardDto.java index b54a29c0..91f1029c 100644 --- a/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/WorkflowCardDto.java +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/dto/WorkflowCardDto.java @@ -1,6 +1,13 @@ package site.icebang.domain.workflow.dto; -import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; -@Data -public class WorkflowCardDto {} +@Getter +@NoArgsConstructor +public class WorkflowCardDto { + private Long id; + private String name; + private String description; + private boolean isEnabled; +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowMapper.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowMapper.java new file mode 100644 index 00000000..152477af --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/mapper/WorkflowMapper.java @@ -0,0 +1,17 @@ +package site.icebang.domain.workflow.mapper; + +import java.util.List; +import java.util.Optional; +import org.apache.ibatis.annotations.Mapper; +import site.icebang.common.dto.PageParams; +import site.icebang.domain.workflow.dto.WorkflowCardDto; +import site.icebang.domain.workflow.model.Workflow; + +@Mapper +public interface WorkflowMapper { + Optional findById(Long id); + + List selectWorkflowList(PageParams pageParams); + + int selectWorkflowCount(PageParams pageParams); +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java new file mode 100644 index 00000000..01d32485 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/model/Workflow.java @@ -0,0 +1,29 @@ +package site.icebang.domain.workflow.model; + +import java.time.LocalDateTime; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Getter +@Builder +@NoArgsConstructor(access = AccessLevel.PROTECTED) +@AllArgsConstructor +public class Workflow { + + private Long id; + private String name; + private String description; + private boolean isEnabled; + private LocalDateTime createdAt; + private Long createdBy; + private LocalDateTime updatedAt; + private Long updatedBy; + /** + * 워크플로우별 기본 설정값 (JSON) + */ + private String defaultConfig; + +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java new file mode 100644 index 00000000..9a07b4d3 --- /dev/null +++ b/apps/user-service/src/main/java/site/icebang/domain/workflow/service/WorkflowExecutionService.java @@ -0,0 +1,164 @@ +package site.icebang.domain.workflow.service; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.context.ApplicationContext; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import site.icebang.domain.execution.mapper.JobRunMapper; +import site.icebang.domain.execution.mapper.WorkflowRunMapper; +import site.icebang.domain.execution.model.JobRun; +import site.icebang.domain.execution.model.WorkflowRun; +import site.icebang.domain.job.mapper.JobMapper; +import site.icebang.domain.job.model.Job; +import site.icebang.domain.mapping.mapper.WorkflowJobMapper; + +@Slf4j +@Service +@RequiredArgsConstructor +public class WorkflowExecutionService { + + private final JobLauncher jobLauncher; + private final ApplicationContext applicationContext; + private final WorkflowJobMapper workflowJobMapper; + private final JobMapper jobMapper; + private final WorkflowRunMapper workflowRunMapper; + private final JobRunMapper jobRunMapper; + + /** + * 워크플로우 실행을 비동기적으로 조율합니다. + * 이 메서드 자체는 트랜잭션을 갖지 않으며, 내부적으로 호출하는 메서드들이 + * 각각 새로운 트랜잭션을 시작하여 실행 상태를 독립적으로 기록합니다. + */ + @Async + public void execute(Long workflowId, String triggerType, Long triggerId) { + log.info("Starting workflow execution for workflowId: {}, triggered by: {}", workflowId, triggerType); + + // Step 1: 워크플로우 실행을 시작하고, 그 결과를 별도의 트랜잭션에 기록합니다. + WorkflowRun workflowRun = this.initiateWorkflowExecution(workflowId, triggerType); + + try { + // Step 2: 워크플로우에 속한 Job들을 순차적으로 실행합니다. + List jobIds = workflowJobMapper.findJobIdsByWorkflowId(workflowId); + if (jobIds.isEmpty()) { + log.warn("No jobs found for workflowId: {}. Marking workflow as SUCCESS.", workflowId); + this.finalizeWorkflowExecution(workflowRun.getId(), "SUCCESS"); + return; + } + + AtomicInteger executionOrder = new AtomicInteger(1); + for (Long jobId : jobIds) { + // 각 Job의 실행과 상태 기록은 독립적인 트랜잭션으로 처리됩니다. + this.executeJobInWorkflow(jobId, workflowRun.getId(), workflowRun.getTraceId(), executionOrder.getAndIncrement()); + } + + // Step 3: 모든 Job이 성공적으로 완료되면, 워크플로우의 최종 상태를 'SUCCESS'로 기록합니다. + this.finalizeWorkflowExecution(workflowRun.getId(), "SUCCESS"); + log.info("Workflow execution successful for traceId: {}", workflowRun.getTraceId()); + + } catch (Exception e) { + // Step 4: Job 실행 중 예외가 발생하면, 워크플로우의 최종 상태를 'FAILED'로 기록합니다. + log.error("Workflow execution failed for traceId: {}. Reason: {}", workflowRun.getTraceId(), e.getMessage(), e); + this.finalizeWorkflowExecution(workflowRun.getId(), "FAILED"); + } + } + + /** + * 워크플로우 실행을 초기화하고 DB에 기록합니다. + * 항상 새로운 트랜잭션에서 실행되어, 이 단계의 성공이 보장됩니다. + */ + @Transactional(propagation = Propagation.REQUIRES_NEW) + public WorkflowRun initiateWorkflowExecution(Long workflowId, String triggerType) { + WorkflowRun workflowRun = WorkflowRun.builder() + .workflowId(workflowId) + .traceId(UUID.randomUUID().toString()) + .status("PENDING") + .triggerType(triggerType) + .build(); + workflowRunMapper.save(workflowRun); + + // 상태를 'RUNNING'으로 변경하고 시작 시간을 기록합니다. + workflowRun.setStartedAt(LocalDateTime.now()); + workflowRun.setStatus("RUNNING"); + workflowRunMapper.update(workflowRun); + log.debug("Initiated workflow run with traceId: {}", workflowRun.getTraceId()); + return workflowRun; + } + + /** + * 워크플로우 실행을 최종 상태(SUCCESS/FAILED)로 업데이트합니다. + * 항상 새로운 트랜잭션에서 실행되어, 실패 시에도 상태 기록이 롤백되지 않습니다. + */ + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void finalizeWorkflowExecution(Long workflowRunId, String status) { + WorkflowRun updatePayload = WorkflowRun.builder() + .id(workflowRunId) + .status(status) + .finishedAt(LocalDateTime.now()) + .build(); + workflowRunMapper.update(updatePayload); + log.debug("Finalized workflow run id: {} with status: {}", workflowRunId, status); + } + + /** + * 워크플로우 내의 단일 Job을 실행하고 그 결과를 DB에 기록합니다. + * 항상 새로운 트랜잭션에서 실행되어, 각 Job의 실행 결과가 독립적으로 커밋됩니다. + * 실패 시 예외를 던져 상위의 orchestrator가 인지하도록 합니다. + */ + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void executeJobInWorkflow(Long jobId, Long workflowRunId, String traceId, int executionOrder) throws Exception { + Job job = jobMapper.findById(jobId) + .orElseThrow(() -> new IllegalStateException("Job not found with id: " + jobId)); + + JobRun jobRun = JobRun.builder() + .workflowRunId(workflowRunId) + .jobId(jobId) + .status("PENDING") + .executionOrder(executionOrder) + .build(); + jobRunMapper.save(jobRun); + + try { + org.springframework.batch.core.Job jobToRun = applicationContext.getBean(job.getName(), org.springframework.batch.core.Job.class); + + JobParameters jobParameters = new JobParametersBuilder() + .addString("traceId", traceId) + .addLong("workflowRunId", workflowRunId) + .addLong("jobRunId", jobRun.getId()) + .addString("runDateTime", LocalDateTime.now().toString()) + .toJobParameters(); + + jobRun.setStatus("RUNNING"); + jobRun.setStartedAt(LocalDateTime.now()); + jobRunMapper.update(jobRun); + log.info("Executing job '{}' (id:{}) for workflow traceId: {}", job.getName(), jobId, traceId); + + JobExecution jobExecution = jobLauncher.run(jobToRun, jobParameters); + + if (jobExecution.getStatus().isUnsuccessful()) { + throw new RuntimeException("Batch job '" + job.getName() + "' failed with status: " + jobExecution.getStatus()); + } + + jobRun.setStatus("SUCCESS"); + log.info("Successfully executed job '{}' (id:{})", job.getName(), jobId); + + } catch (Exception e) { + jobRun.setStatus("FAILED"); + log.error("Failed to execute job '{}' (id:{}). Reason: {}", job.getName(), jobId, e.getMessage()); + throw e; // 워크플로우 전체를 실패 처리하기 위해 예외를 다시 던집니다. + } finally { + jobRun.setFinishedAt(LocalDateTime.now()); + jobRunMapper.update(jobRun); + } + } +} \ No newline at end of file diff --git a/apps/user-service/src/main/java/site/icebang/schedule/mapper/ScheduleMapper.java b/apps/user-service/src/main/java/site/icebang/schedule/mapper/ScheduleMapper.java deleted file mode 100644 index b1a92f1e..00000000 --- a/apps/user-service/src/main/java/site/icebang/schedule/mapper/ScheduleMapper.java +++ /dev/null @@ -1,12 +0,0 @@ -package site.icebang.schedule.mapper; - -import java.util.List; - -import org.apache.ibatis.annotations.Mapper; - -import site.icebang.schedule.model.Schedule; - -@Mapper -public interface ScheduleMapper { - List findAllByIsActive(boolean isActive); -} diff --git a/apps/user-service/src/main/java/site/icebang/schedule/model/Schedule.java b/apps/user-service/src/main/java/site/icebang/schedule/model/Schedule.java deleted file mode 100644 index ced2900c..00000000 --- a/apps/user-service/src/main/java/site/icebang/schedule/model/Schedule.java +++ /dev/null @@ -1,14 +0,0 @@ -package site.icebang.schedule.model; - -import lombok.Getter; -import lombok.Setter; - -@Getter -@Setter -public class Schedule { - private Long scheduleId; - private Long workflowId; - private String cronExpression; - private boolean isActive; - // ... 기타 필요한 컬럼 -} diff --git a/apps/user-service/src/main/java/site/icebang/schedule/service/DynamicSchedulerService.java b/apps/user-service/src/main/java/site/icebang/schedule/service/DynamicSchedulerService.java deleted file mode 100644 index b81e30eb..00000000 --- a/apps/user-service/src/main/java/site/icebang/schedule/service/DynamicSchedulerService.java +++ /dev/null @@ -1,66 +0,0 @@ -package site.icebang.schedule.service; - -import java.time.LocalDateTime; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; - -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.launch.JobLauncher; -import org.springframework.context.ApplicationContext; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.support.CronTrigger; -import org.springframework.stereotype.Service; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import site.icebang.schedule.model.Schedule; - -@Slf4j -@Service -@RequiredArgsConstructor -public class DynamicSchedulerService { - - private final TaskScheduler taskScheduler; - private final JobLauncher jobLauncher; - private final ApplicationContext applicationContext; - private final Map> scheduledTasks = new ConcurrentHashMap<>(); - - public void register(Schedule schedule) { - // TODO: schedule.getWorkflowId()를 기반으로 실행할 Job의 이름을 DB에서 조회 - String jobName = "blogContentJob"; // 예시 - Job jobToRun = applicationContext.getBean(jobName, Job.class); - - Runnable runnable = - () -> { - try { - JobParametersBuilder paramsBuilder = new JobParametersBuilder(); - paramsBuilder.addString("runAt", LocalDateTime.now().toString()); - paramsBuilder.addLong("scheduleId", schedule.getScheduleId()); - jobLauncher.run(jobToRun, paramsBuilder.toJobParameters()); - } catch (Exception e) { - log.error( - "Failed to run scheduled job for scheduleId: {}", schedule.getScheduleId(), e); - } - }; - - CronTrigger trigger = new CronTrigger(schedule.getCronExpression()); - ScheduledFuture future = taskScheduler.schedule(runnable, trigger); - scheduledTasks.put(schedule.getScheduleId(), future); - log.info( - ">>>> Schedule registered: id={}, cron={}", - schedule.getScheduleId(), - schedule.getCronExpression()); - } - - public void remove(Long scheduleId) { - ScheduledFuture future = scheduledTasks.get(scheduleId); - if (future != null) { - future.cancel(true); - scheduledTasks.remove(scheduleId); - log.info(">>>> Schedule removed: id={}", scheduleId); - } - } -} diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml new file mode 100644 index 00000000..e2c6e6b1 --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobMapper.xml @@ -0,0 +1,23 @@ + + + + + + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml new file mode 100644 index 00000000..2617bdb4 --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/JobRunMapper.xml @@ -0,0 +1,22 @@ + + + + + + INSERT INTO job_run (workflow_run_id, job_id, status, execution_order) + VALUES (#{workflowRunId}, #{jobId}, #{status}, #{executionOrder}) + + + + UPDATE job_run + + status = #{status}, + started_at = #{startedAt}, + finished_at = #{finishedAt}, + + WHERE id = #{id} + + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/ScheduleMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/ScheduleMapper.xml index f9629b8a..f1bab4cb 100644 --- a/apps/user-service/src/main/resources/mybatis/mapper/ScheduleMapper.xml +++ b/apps/user-service/src/main/resources/mybatis/mapper/ScheduleMapper.xml @@ -1,17 +1,51 @@ - - + + + - + + + + + + + + + + + + + + - + SELECT * FROM schedule WHERE id = #{id} + + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowJobMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowJobMapper.xml new file mode 100644 index 00000000..fa2ed9c8 --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowJobMapper.xml @@ -0,0 +1,18 @@ + + + + + + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowMapper.xml new file mode 100644 index 00000000..ae658fca --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowMapper.xml @@ -0,0 +1,41 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml new file mode 100644 index 00000000..972af123 --- /dev/null +++ b/apps/user-service/src/main/resources/mybatis/mapper/WorkflowRunMapper.xml @@ -0,0 +1,22 @@ + + + + + + INSERT INTO workflow_run (workflow_id, trace_id, run_number, status, trigger_type, started_at, finished_at, created_by) + VALUES (#{workflowId}, #{traceId}, #{runNumber}, #{status}, #{triggerType}, #{startedAt}, #{finishedAt}, #{createdBy}) + + + + UPDATE workflow_run + + status = #{status}, + started_at = #{startedAt}, + finished_at = #{finishedAt}, + + WHERE id = #{id} + + + \ No newline at end of file