Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/core/application/services/WorkerOrchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,53 @@ export class WorkerOrchestrator implements IWorkerOrchestrator {
console.log(`[WorkerOrchestrator] Reset ${orphanedMergingTasks.count} orphaned MERGING tasks to READY_TO_MERGE`);
}

// Fail PENDING pages whose parent task is not (or is no longer) being converted.
// Such pages can never be picked up and would otherwise stay PENDING forever.
//
// Ordering: this step intentionally runs after the PROCESSING -> PENDING reset above,
// so pages of inactive tasks that were stuck in PROCESSING have already been reset
// to PENDING and are caught here as well.
//
// TaskStatus.COMPLETED is deliberately NOT in this list: ConverterWorker.claimPage
// accepts both PROCESSING and COMPLETED parents (COMPLETED is used for single-page
// retries), so a PENDING page under a COMPLETED task is legitimate queued work and
// must not be failed on startup.
const inactiveTaskStatuses = [
  TaskStatus.CREATED,
  TaskStatus.FAILED,
  TaskStatus.CANCELLED,
  TaskStatus.PARTIAL_FAILED,
];

// Look up the ids of all inactive parent tasks first; the page update below
// filters on those ids.
const inactiveTasks = await prisma.task.findMany({
  where: {
    status: { in: inactiveTaskStatuses },
  },
  select: { id: true },
});

let orphanedPendingPages = 0;
// Skip the update entirely when there is no inactive task: nothing can be orphaned.
if (inactiveTasks.length > 0) {
  const inactiveTaskIds = inactiveTasks.map((t) => t.id);
  // Named distinctly from the CleanupResult `result` built later in this method.
  const markedFailed = await prisma.taskDetail.updateMany({
    where: {
      task: { in: inactiveTaskIds },
      status: PageStatus.PENDING,
    },
    data: {
      status: PageStatus.FAILED,
      error: 'Orphaned: parent task no longer active',
    },
  });
  orphanedPendingPages = markedFailed.count;
  if (orphanedPendingPages > 0) {
    console.log(`[WorkerOrchestrator] Marked ${orphanedPendingPages} orphaned PENDING pages as FAILED (parent task in terminal state)`);
  }
}

// Summary of every cleanup step. `total` is the sum of all four counters,
// including the orphaned PENDING pages that were marked FAILED.
const result: CleanupResult = {
  orphanedPages: orphanedPages.count,
  orphanedSplittingTasks: orphanedSplittingTasks.count,
  orphanedMergingTasks: orphanedMergingTasks.count,
  orphanedPendingPages,
  total: orphanedPages.count + orphanedSplittingTasks.count + orphanedMergingTasks.count + orphanedPendingPages,
};

if (result.total === 0) {
Expand All @@ -262,6 +304,7 @@ export class WorkerOrchestrator implements IWorkerOrchestrator {
orphanedPages: 0,
orphanedSplittingTasks: 0,
orphanedMergingTasks: 0,
orphanedPendingPages: 0,
total: 0,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ describe('WorkerOrchestrator', () => {
// Default: no orphaned work
prismaMock.taskDetail.updateMany.mockResolvedValue({ count: 0 } as any)
prismaMock.task.updateMany.mockResolvedValue({ count: 0 } as any)
prismaMock.task.findMany.mockResolvedValue([] as any)
})

afterEach(() => {
Expand Down Expand Up @@ -250,9 +251,15 @@ describe('WorkerOrchestrator', () => {
})

describe('cleanupOrphanedWork', () => {
// Default cleanup fixture: task.findMany resolves to an empty list, i.e. no task
// is reported in a terminal state, so the orphaned-PENDING-page step has nothing to update.
function setupDefaultCleanupMocks() {
  const noTerminalTasks = [] as any
  prismaMock.task.findMany.mockResolvedValue(noTerminalTasks)
}

it('should reset PROCESSING pages to PENDING', async () => {
prismaMock.taskDetail.updateMany.mockResolvedValue({ count: 5 } as any)
prismaMock.task.updateMany.mockResolvedValue({ count: 0 } as any)
setupDefaultCleanupMocks()

const result = await orchestrator.cleanupOrphanedWork()

Expand All @@ -275,6 +282,7 @@ describe('WorkerOrchestrator', () => {
prismaMock.task.updateMany
.mockResolvedValueOnce({ count: 2 } as any) // SPLITTING -> PENDING
.mockResolvedValueOnce({ count: 0 } as any) // MERGING -> READY_TO_MERGE
setupDefaultCleanupMocks()

const result = await orchestrator.cleanupOrphanedWork()

Expand All @@ -296,6 +304,7 @@ describe('WorkerOrchestrator', () => {
prismaMock.task.updateMany
.mockResolvedValueOnce({ count: 0 } as any) // SPLITTING -> PENDING
.mockResolvedValueOnce({ count: 3 } as any) // MERGING -> READY_TO_MERGE
setupDefaultCleanupMocks()

const result = await orchestrator.cleanupOrphanedWork()

Expand All @@ -312,27 +321,55 @@ describe('WorkerOrchestrator', () => {
expect(result.orphanedMergingTasks).toBe(3)
})

it('should mark orphaned PENDING pages from terminal tasks as FAILED', async () => {
  // The task lookup reports two parents in a terminal state.
  const terminalTaskRows = [{ id: 'failed-task-1' }, { id: 'cancelled-task-2' }]
  prismaMock.task.findMany.mockResolvedValue(terminalTaskRows as any)
  prismaMock.task.updateMany.mockResolvedValue({ count: 0 } as any)

  // taskDetail.updateMany runs twice during cleanup:
  //   1st call: PROCESSING -> PENDING reset (nothing to reset here)
  //   2nd call: orphaned PENDING -> FAILED (four pages affected)
  prismaMock.taskDetail.updateMany.mockResolvedValueOnce({ count: 0 } as any)
  prismaMock.taskDetail.updateMany.mockResolvedValueOnce({ count: 4 } as any)

  const cleanup = await orchestrator.cleanupOrphanedWork()

  const expectedFailUpdate = {
    where: {
      task: { in: ['failed-task-1', 'cancelled-task-2'] },
      status: PageStatus.PENDING,
    },
    data: {
      status: PageStatus.FAILED,
      error: 'Orphaned: parent task no longer active',
    },
  }
  expect(prismaMock.taskDetail.updateMany).toHaveBeenCalledWith(expectedFailUpdate)
  expect(cleanup.orphanedPendingPages).toBe(4)
})

it('should return total=0 when no orphaned work', async () => {
  // Every update reports zero affected rows and no terminal tasks exist.
  const nothingUpdated = { count: 0 } as any
  prismaMock.taskDetail.updateMany.mockResolvedValue(nothingUpdated)
  prismaMock.task.updateMany.mockResolvedValue(nothingUpdated)
  setupDefaultCleanupMocks()

  const cleanup = await orchestrator.cleanupOrphanedWork()

  // All counters, and their sum, must be exactly zero.
  const counters = [
    'total',
    'orphanedPages',
    'orphanedSplittingTasks',
    'orphanedMergingTasks',
    'orphanedPendingPages',
  ] as const
  for (const counter of counters) {
    expect(cleanup[counter]).toBe(0)
  }
})

it('should return sum of all orphaned items as total', async () => {
  prismaMock.taskDetail.updateMany.mockResolvedValue({ count: 5 } as any) // PROCESSING -> PENDING
  prismaMock.task.updateMany
    .mockResolvedValueOnce({ count: 2 } as any) // SPLITTING -> PENDING
    .mockResolvedValueOnce({ count: 3 } as any) // MERGING -> READY_TO_MERGE
  // No terminal tasks: the orphaned-PENDING step contributes 0 to the total.
  setupDefaultCleanupMocks()

  const result = await orchestrator.cleanupOrphanedWork()

  expect(result.total).toBe(10) // 5 + 2 + 3 + 0 (no terminal tasks)
})

it('should return empty result on error without interrupting startup', async () => {
Expand All @@ -344,6 +381,7 @@ describe('WorkerOrchestrator', () => {
orphanedPages: 0,
orphanedSplittingTasks: 0,
orphanedMergingTasks: 0,
orphanedPendingPages: 0,
total: 0
})
})
Expand All @@ -352,6 +390,7 @@ describe('WorkerOrchestrator', () => {
const consoleSpy = vi.spyOn(console, 'log').mockImplementation(() => {})
prismaMock.taskDetail.updateMany.mockResolvedValue({ count: 2 } as any)
prismaMock.task.updateMany.mockResolvedValue({ count: 0 } as any)
setupDefaultCleanupMocks()

await orchestrator.cleanupOrphanedWork()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface CleanupResult {
orphanedPages: number;
orphanedSplittingTasks: number;
orphanedMergingTasks: number;
orphanedPendingPages: number;
total: number;
}

Expand Down
43 changes: 21 additions & 22 deletions src/core/application/workers/ConverterWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,39 @@ export class ConverterWorker extends WorkerBase {
/**
* Claim a PENDING page for processing using optimistic locking.
*
* Query conditions:
* - task.status IN (PROCESSING, COMPLETED)
* - page.status = PENDING
* - page.worker_id = null
*
* Strategy: First find tasks in valid states (PROCESSING/COMPLETED),
* then find PENDING pages only within those tasks. This avoids the problem
* where orphaned PENDING pages from terminal tasks exhaust claim attempts.
*
* Order: retry_count ASC, page ASC (prioritize fresh pages)
*/
private async claimPage(): Promise<PageWithTask | null> {
const maxAttempts = 5;
const checkedTaskIds: string[] = []; // Track tasks we've already checked and skipped

// Pre-filter: find tasks in valid states for conversion.
// Done once before the retry loop since task states rarely change
// within the milliseconds between optimistic lock retries.
const activeTasks = await prisma.task.findMany({
where: {
status: { in: [TaskStatus.PROCESSING, TaskStatus.COMPLETED] },
},
select: { id: true },
});

if (activeTasks.length === 0) {
return null;
}

const activeTaskIds = activeTasks.map((t) => t.id);

for (let attempt = 0; attempt < maxAttempts; attempt++) {
try {
// Step 1: Find a candidate page, excluding tasks we've already checked
// Step 1: Find a candidate page within active tasks only
const candidate = await prisma.taskDetail.findFirst({
where: {
status: PageStatus.PENDING,
worker_id: null,
// Exclude pages from tasks we've already checked and found not in PROCESSING state
...(checkedTaskIds.length > 0 && { task: { notIn: checkedTaskIds } }),
task: { in: activeTaskIds },
},
orderBy: [
{ retry_count: 'asc' },
Expand All @@ -191,19 +203,6 @@ export class ConverterWorker extends WorkerBase {
return null;
}

// Step 2: Verify the parent task is in a valid state for processing
// Allow PROCESSING (normal flow) and COMPLETED (single page retry)
const task = await prisma.task.findUnique({
where: { id: candidate.task },
select: { status: true },
});

if (!task || (task.status !== TaskStatus.PROCESSING && task.status !== TaskStatus.COMPLETED)) {
// Task not in correct state, remember it and try next
checkedTaskIds.push(candidate.task);
continue;
}

// Step 3: Try to claim using optimistic locking
const result = await prisma.taskDetail.updateMany({
where: {
Expand Down
56 changes: 32 additions & 24 deletions src/core/application/workers/__tests__/ConverterWorker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ vi.mock('../../../infrastructure/db/index.js', () => ({
prisma: {
$transaction: vi.fn(),
task: {
findMany: vi.fn().mockResolvedValue([]),
findUnique: vi.fn(),
update: vi.fn(),
},
Expand Down Expand Up @@ -91,8 +92,8 @@ describe('ConverterWorker', () => {

describe('run()', () => {
it('should set isRunning to true when started', async () => {
// Mock claimPage to return null (no pages)
vi.mocked(prisma.taskDetail.findFirst).mockResolvedValue(null);
// Mock claimPage to return null (no active tasks)
vi.mocked(prisma.task.findMany).mockResolvedValue([]);

const runPromise = worker.run();
await new Promise((resolve) => setTimeout(resolve, 50));
Expand All @@ -104,7 +105,7 @@ describe('ConverterWorker', () => {
});

it('should stop when stop() is called', async () => {
vi.mocked(prisma.taskDetail.findFirst).mockResolvedValue(null);
vi.mocked(prisma.task.findMany).mockResolvedValue([]);

const runPromise = worker.run();
await new Promise((resolve) => setTimeout(resolve, 50));
Expand All @@ -118,12 +119,12 @@ describe('ConverterWorker', () => {
it('should continue running after error in main loop', async () => {
let callCount = 0;

vi.mocked(prisma.taskDetail.findFirst).mockImplementation(async () => {
vi.mocked(prisma.task.findMany).mockImplementation(async () => {
callCount++;
if (callCount === 1) {
throw new Error('Transient error');
}
return null;
return [];
});

const runPromise = worker.run();
Expand Down Expand Up @@ -159,8 +160,8 @@ describe('ConverterWorker', () => {
updatedAt: new Date(),
};

vi.mocked(prisma.task.findMany).mockResolvedValue([{ id: 'task123' }] as any);
vi.mocked(prisma.taskDetail.findFirst).mockResolvedValue(mockPage as any);
vi.mocked(prisma.task.findUnique).mockResolvedValue({ status: TaskStatus.PROCESSING } as any);
vi.mocked(prisma.taskDetail.updateMany).mockResolvedValue({ count: 1 });
vi.mocked(prisma.taskDetail.findUnique).mockResolvedValue({
...mockPage,
Expand All @@ -174,29 +175,18 @@ describe('ConverterWorker', () => {
expect(prisma.taskDetail.updateMany).toHaveBeenCalled();
});

it('should return null if no PENDING pages exist', async () => {
vi.mocked(prisma.taskDetail.findFirst).mockResolvedValue(null);
it('should return null if no active tasks exist', async () => {
  // An empty active-task list means claimPage has nothing to search in.
  const findActiveTasks = vi.mocked(prisma.task.findMany);
  findActiveTasks.mockResolvedValue([]);

  const claimed = await (worker as any).claimPage();

  // The page lookup must be skipped entirely, and no page is returned.
  expect(prisma.taskDetail.findFirst).not.toHaveBeenCalled();
  expect(claimed).toBeNull();
});

it('should skip pages from tasks not in PROCESSING state', async () => {
const mockPage = {
id: 1,
task: 'task123',
status: PageStatus.PENDING,
worker_id: null,
};

// First call returns a page, second call returns null
vi.mocked(prisma.taskDetail.findFirst)
.mockResolvedValueOnce(mockPage as any)
.mockResolvedValueOnce(null);

// Task is not in PROCESSING state
vi.mocked(prisma.task.findUnique).mockResolvedValue({ status: TaskStatus.CANCELLED } as any);
it('should return null if no PENDING pages exist in active tasks', async () => {
vi.mocked(prisma.task.findMany).mockResolvedValue([{ id: 'task123' }] as any);
vi.mocked(prisma.taskDetail.findFirst).mockResolvedValue(null);

const result = await (worker as any).claimPage();

Expand All @@ -211,10 +201,10 @@ describe('ConverterWorker', () => {
worker_id: null,
};

vi.mocked(prisma.task.findMany).mockResolvedValue([{ id: 'task123' }] as any);
vi.mocked(prisma.taskDetail.findFirst)
.mockResolvedValueOnce(mockPage as any)
.mockResolvedValueOnce(null);
vi.mocked(prisma.task.findUnique).mockResolvedValue({ status: TaskStatus.PROCESSING } as any);
// First attempt fails (another worker claimed it)
vi.mocked(prisma.taskDetail.updateMany).mockResolvedValueOnce({ count: 0 });

Expand All @@ -227,6 +217,7 @@ describe('ConverterWorker', () => {
it('should prioritize pages with lower retry_count', async () => {
let queryParams: any;

vi.mocked(prisma.task.findMany).mockResolvedValue([{ id: 'task123' }] as any);
vi.mocked(prisma.taskDetail.findFirst).mockImplementation(async (params: any) => {
queryParams = params;
return null;
Expand All @@ -236,6 +227,23 @@ describe('ConverterWorker', () => {

expect(queryParams.orderBy).toEqual([{ retry_count: 'asc' }, { page: 'asc' }]);
});

it('should only search within active task IDs', async () => {
  const activeTaskRows = [{ id: 'task-a' }, { id: 'task-b' }];
  vi.mocked(prisma.task.findMany).mockResolvedValue(activeTaskRows as any);

  // Record the arguments of every page lookup; returning null ends the claim attempt.
  const pageLookups: any[] = [];
  vi.mocked(prisma.taskDetail.findFirst).mockImplementation(async (params: any) => {
    pageLookups.push(params);
    return null;
  });

  await (worker as any).claimPage();

  // The most recent lookup must be restricted to exactly the active task ids.
  const lastLookup = pageLookups[pageLookups.length - 1];
  expect(lastLookup.where.task).toEqual({ in: ['task-a', 'task-b'] });
});
});

describe('convertPage()', () => {
Expand Down
2 changes: 1 addition & 1 deletion src/main/ipc/handlers/__tests__/completion.handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ describe('Completion Handler', () => {
}),
expect.objectContaining({
type: 'text',
text: 'Test connection.'
text: 'Please identify the largest letter in the image.'
})
])
})
Expand Down
Loading