Skip to content

Commit

Permalink
Fix potential source of problems with optimistic scheduling without a…
Browse files Browse the repository at this point in the history
…ctual queue
  • Loading branch information
kamil-sita committed Jun 8, 2024
1 parent beca743 commit 4f9745f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
package place.sita.magicscheduler;

import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.UUID;

public interface InternalTaskSubmitter {
UUID UUID_FOR_USER_SUBMITTED_TASKS = UUID.fromString("00000000-0000-0000-0000-000000000000");

@Transactional
void submitTaskForLater(UUID id, String code, String parameter, UUID parent, List<UUID> requirements);

@Transactional
default void submitTaskForLater(UUID id, String code, String parameter, UUID parent) {
submitTaskForLater(id, code, parameter, parent, List.of());
}

@Transactional
default UUID submitTaskForLater(String code, String parameter, UUID parent) {
UUID uuid = UUID.randomUUID();
submitTaskForLater(uuid, code, parameter, parent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import org.jooq.DSLContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import place.sita.labelle.core.persistence.JqRepo;
import place.sita.labelle.jooq.enums.TaskStatus;
import place.sita.magicscheduler.scheduler.TypeSpecificQueue;
import place.sita.magicscheduler.scheduler.TypeSpecificQueueRegistry;

import java.time.LocalDateTime;
Expand All @@ -16,51 +18,76 @@
@Component
public class InternalTaskSubmitterImpl implements InternalTaskSubmitter {

private final DSLContext dslContext;
private final TypeSpecificQueueRegistry registry;
private final DatabaseSubmitter databaseSubmitter;

public InternalTaskSubmitterImpl(DSLContext dslContext,
TypeSpecificQueueRegistry registry) {
this.dslContext = dslContext;
public InternalTaskSubmitterImpl(TypeSpecificQueueRegistry registry,
DatabaseSubmitter databaseSubmitter) {
this.registry = registry;
this.databaseSubmitter = databaseSubmitter;
}

@Override
@Transactional
public void submitTaskForLater(UUID id, String code, String parameter, UUID parent, List<UUID> requirements) {
UUID taskTypeId = JqRepo.fetchOne(() ->
dslContext
.select(TASK_TYPE.ID)
.from(TASK_TYPE)
.where(TASK_TYPE.CODE.eq(code))
.fetch()
);

JqRepo.insertOne(() ->
dslContext.insertInto(TASK)
.columns(TASK.ID, TASK.TASK_TYPE_ID, TASK.CREATION_DATE, TASK.STATUS, TASK.PARENT)
.values(id, taskTypeId, LocalDateTime.now(), TaskStatus.CREATED, parent)
.execute()
);

JqRepo.insertOne(() ->
dslContext.insertInto(TASK_CONFIG)
.columns(TASK_CONFIG.TASK_ID, TASK_CONFIG.CONFIG)
.values(id, parameter)
.execute()
);

for (UUID requirement : requirements) {
TypeSpecificQueue queue = registry.get(code);

// we need to borrow the exclusive rights to schedule tasks of this type...
queue.doInSchedulingLock(context -> {
databaseSubmitter.submitTaskForLater(id, code, parameter, parent, requirements);

if (UUID_FOR_USER_SUBMITTED_TASKS.equals(parent)) {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
// we cannot optimize skipping queue easily, as we are in transaction. If this gets rolled back,
// then we'd have to rip away the job from the scheduler itself. Let's... not do that. Maybe some
// other time.
} else {
context.scheduleWithoutQueue(id, parameter);
}
}
});
}

@Component
public static class DatabaseSubmitter {

private final DSLContext dslContext;

private DatabaseSubmitter(DSLContext dslContext) {
this.dslContext = dslContext;
}

@Transactional
public void submitTaskForLater(UUID id, String code, String parameter, UUID parent, List<UUID> requirements) {
UUID taskTypeId = JqRepo.fetchOne(() ->
dslContext
.select(TASK_TYPE.ID)
.from(TASK_TYPE)
.where(TASK_TYPE.CODE.eq(code))
.fetch()
);

JqRepo.insertOne(() ->
dslContext.insertInto(TASK_DEPENDENCIES)
dslContext.insertInto(TASK)
.columns(TASK.ID, TASK.TASK_TYPE_ID, TASK.CREATION_DATE, TASK.STATUS, TASK.PARENT)
.values(id, taskTypeId, LocalDateTime.now(), TaskStatus.CREATED, parent)
.execute()
);

JqRepo.insertOne(() ->
dslContext.insertInto(TASK_CONFIG)
.columns(TASK_CONFIG.TASK_ID, TASK_CONFIG.CONFIG)
.values(id, parameter)
.execute()
);

for (UUID requirement : requirements) {
JqRepo.insertOne(() ->
dslContext.insertInto(TASK_DEPENDENCIES)
.columns(TASK_DEPENDENCIES.TASK_ID, TASK_DEPENDENCIES.REQUIRED_DEPENDENCY_TASK_ID)
.values(id, requirement)
.execute()
);
}

if (UUID_FOR_USER_SUBMITTED_TASKS.equals(parent)) {
registry.get(code).scheduleWithoutQueue(id, parameter);
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,32 @@ private void scheduledFetchInTransaction(Configuration inner, int toFetchCount)
submitTasksWithConfig(inner, jobsToSubmit);
}

public void scheduleWithoutQueue(UUID id, String config) {
private void scheduleWithoutQueue(UUID id, String config) {
dslContext.transaction(inner -> {
submitTasksWithConfig(inner, List.of(new TaskWithConfig(id, config)));
});
}

public void doInSchedulingLock(InSchedulingLock r) {
Context context = new Context() {
@Override
public void scheduleWithoutQueue(UUID id, String config) {
TypeSpecificQueue.this.scheduleWithoutQueue(id, config);
}
};
synchronized (schedulingLock) {
dslContext.transaction(inner -> {
submitTasksWithConfig(inner, List.of(new TaskWithConfig(id, config)));
});
r.run(context);
}
}

public interface Context {
void scheduleWithoutQueue(UUID id, String config);
}

public interface InSchedulingLock {
void run(Context ctx);
}

private void submitTasksWithConfig(Configuration inner, List<TaskWithConfig> jobsToSubmit) {
var jobsToSubmitId = jobsToSubmit.stream().map(TaskWithConfig::id).collect(Collectors.toSet());

Expand Down

0 comments on commit 4f9745f

Please sign in to comment.