diff --git a/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitter.java b/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitter.java index 42e08bf..a9c9f66 100644 --- a/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitter.java +++ b/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitter.java @@ -1,22 +1,17 @@ package place.sita.magicscheduler; -import org.springframework.transaction.annotation.Transactional; - import java.util.List; import java.util.UUID; public interface InternalTaskSubmitter { UUID UUID_FOR_USER_SUBMITTED_TASKS = UUID.fromString("00000000-0000-0000-0000-000000000000"); - @Transactional void submitTaskForLater(UUID id, String code, String parameter, UUID parent, List requirements); - @Transactional default void submitTaskForLater(UUID id, String code, String parameter, UUID parent) { submitTaskForLater(id, code, parameter, parent, List.of()); } - @Transactional default UUID submitTaskForLater(String code, String parameter, UUID parent) { UUID uuid = UUID.randomUUID(); submitTaskForLater(uuid, code, parameter, parent); diff --git a/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitterImpl.java b/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitterImpl.java index 64a6297..e09b6c8 100644 --- a/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitterImpl.java +++ b/magic-scheduler/src/main/java/place/sita/magicscheduler/InternalTaskSubmitterImpl.java @@ -3,8 +3,10 @@ import org.jooq.DSLContext; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronizationManager; import place.sita.labelle.core.persistence.JqRepo; import place.sita.labelle.jooq.enums.TaskStatus; +import place.sita.magicscheduler.scheduler.TypeSpecificQueue; import place.sita.magicscheduler.scheduler.TypeSpecificQueueRegistry; import java.time.LocalDateTime; @@ -16,51 +18,76 @@ @Component public class InternalTaskSubmitterImpl implements InternalTaskSubmitter { - private final DSLContext dslContext; private final TypeSpecificQueueRegistry registry; + private final DatabaseSubmitter databaseSubmitter; - public InternalTaskSubmitterImpl(DSLContext dslContext, - TypeSpecificQueueRegistry registry) { - this.dslContext = dslContext; + public InternalTaskSubmitterImpl(TypeSpecificQueueRegistry registry, + DatabaseSubmitter databaseSubmitter) { this.registry = registry; + this.databaseSubmitter = databaseSubmitter; } @Override - @Transactional public void submitTaskForLater(UUID id, String code, String parameter, UUID parent, List requirements) { - UUID taskTypeId = JqRepo.fetchOne(() -> - dslContext - .select(TASK_TYPE.ID) - .from(TASK_TYPE) - .where(TASK_TYPE.CODE.eq(code)) - .fetch() - ); - - JqRepo.insertOne(() -> - dslContext.insertInto(TASK) - .columns(TASK.ID, TASK.TASK_TYPE_ID, TASK.CREATION_DATE, TASK.STATUS, TASK.PARENT) - .values(id, taskTypeId, LocalDateTime.now(), TaskStatus.CREATED, parent) - .execute() - ); - - JqRepo.insertOne(() -> - dslContext.insertInto(TASK_CONFIG) - .columns(TASK_CONFIG.TASK_ID, TASK_CONFIG.CONFIG) - .values(id, parameter) - .execute() - ); - - for (UUID requirement : requirements) { + TypeSpecificQueue queue = registry.get(code); + + // we need to borrow the exclusive rights to schedule tasks of this type... + queue.doInSchedulingLock(context -> { + databaseSubmitter.submitTaskForLater(id, code, parameter, parent, requirements); + + if (UUID_FOR_USER_SUBMITTED_TASKS.equals(parent)) { + if (TransactionSynchronizationManager.isActualTransactionActive()) { + // we cannot optimize skipping queue easily, as we are in transaction. If this gets rolled back, + // then we'd have to rip away the job from the scheduler itself. Let's... not do that. Maybe some + // other time. + } else { + context.scheduleWithoutQueue(id, parameter); + } + } + }); + } + + @Component + public static class DatabaseSubmitter { + + private final DSLContext dslContext; + + private DatabaseSubmitter(DSLContext dslContext) { + this.dslContext = dslContext; + } + + @Transactional + public void submitTaskForLater(UUID id, String code, String parameter, UUID parent, List requirements) { + UUID taskTypeId = JqRepo.fetchOne(() -> + dslContext + .select(TASK_TYPE.ID) + .from(TASK_TYPE) + .where(TASK_TYPE.CODE.eq(code)) + .fetch() + ); + JqRepo.insertOne(() -> - dslContext.insertInto(TASK_DEPENDENCIES) + dslContext.insertInto(TASK) + .columns(TASK.ID, TASK.TASK_TYPE_ID, TASK.CREATION_DATE, TASK.STATUS, TASK.PARENT) + .values(id, taskTypeId, LocalDateTime.now(), TaskStatus.CREATED, parent) + .execute() + ); + + JqRepo.insertOne(() -> + dslContext.insertInto(TASK_CONFIG) + .columns(TASK_CONFIG.TASK_ID, TASK_CONFIG.CONFIG) + .values(id, parameter) + .execute() + ); + + for (UUID requirement : requirements) { + JqRepo.insertOne(() -> + dslContext.insertInto(TASK_DEPENDENCIES) .columns(TASK_DEPENDENCIES.TASK_ID, TASK_DEPENDENCIES.REQUIRED_DEPENDENCY_TASK_ID) .values(id, requirement) .execute() - ); - } - - if (UUID_FOR_USER_SUBMITTED_TASKS.equals(parent)) { - registry.get(code).scheduleWithoutQueue(id, parameter); + ); + } } } } diff --git a/magic-scheduler/src/main/java/place/sita/magicscheduler/scheduler/TypeSpecificQueue.java b/magic-scheduler/src/main/java/place/sita/magicscheduler/scheduler/TypeSpecificQueue.java index 7cd57ab..2242658 100644 --- a/magic-scheduler/src/main/java/place/sita/magicscheduler/scheduler/TypeSpecificQueue.java +++ b/magic-scheduler/src/main/java/place/sita/magicscheduler/scheduler/TypeSpecificQueue.java @@ -156,14 +156,32 @@ private void scheduledFetchInTransaction(Configuration inner, int toFetchCount) submitTasksWithConfig(inner, jobsToSubmit); } - public void scheduleWithoutQueue(UUID id, String config) { + private void scheduleWithoutQueue(UUID id, String config) { + dslContext.transaction(inner -> { + submitTasksWithConfig(inner, List.of(new TaskWithConfig(id, config))); + }); + } + + public void doInSchedulingLock(InSchedulingLock r) { + Context context = new Context() { + @Override + public void scheduleWithoutQueue(UUID id, String config) { + TypeSpecificQueue.this.scheduleWithoutQueue(id, config); + } + }; synchronized (schedulingLock) { - dslContext.transaction(inner -> { - submitTasksWithConfig(inner, List.of(new TaskWithConfig(id, config))); - }); + r.run(context); } } + public interface Context { + void scheduleWithoutQueue(UUID id, String config); + } + + public interface InSchedulingLock { + void run(Context ctx); + } + private void submitTasksWithConfig(Configuration inner, List jobsToSubmit) { var jobsToSubmitId = jobsToSubmit.stream().map(TaskWithConfig::id).collect(Collectors.toSet());