Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 71 additions & 23 deletions src/durable-objects/task-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ const MAX_NO_PROGRESS_RESUMES = 3;
// NOTE(review): presumably the number of consecutive no-progress iterations tolerated
// before the task is treated as stalled — confirm against the stall-detection code,
// which is not visible in this hunk.
const MAX_STALL_ITERATIONS = 5;
// Max times the model can call the exact same tool with the same args before we break the loop
const MAX_SAME_TOOL_REPEATS = 3;
// Max total tool calls before forcing a final answer (prevents excessive API usage).
// The FREE cap is used when getModel(alias)?.isFree === true; every other model
// (including one that getModel does not resolve) gets the PAID cap.
// In the main-loop check, reaching the cap appends a "[SYSTEM] ..." user message
// telling the model to stop calling tools and answer with what it has gathered.
const MAX_TOTAL_TOOLS_FREE = 50;
const MAX_TOTAL_TOOLS_PAID = 100;

/** Get the auto-resume limit based on model cost */
function getAutoResumeLimit(modelAlias: string): number {
Expand Down Expand Up @@ -496,39 +499,26 @@ export class TaskProcessor extends DurableObject<TaskProcessorEnv> {

const timeSinceUpdate = Date.now() - task.lastUpdate;
const isPaidModel = getModel(task.modelAlias)?.isFree !== true;
const isFreeModel = !isPaidModel;
const stuckThreshold = isPaidModel ? STUCK_THRESHOLD_PAID_MS : STUCK_THRESHOLD_FREE_MS;
console.log(`[TaskProcessor] Time since last update: ${timeSinceUpdate}ms (threshold: ${stuckThreshold / 1000}s, ${isPaidModel ? 'paid' : 'free'})`);

// If task updated recently, it's still running - reschedule watchdog
if (timeSinceUpdate < stuckThreshold) {
console.log('[TaskProcessor] Task still active, rescheduling watchdog');
await this.doState.storage.setAlarm(Date.now() + WATCHDOG_INTERVAL_MS);
return;
}

// Task appears stuck - likely DO was terminated by Cloudflare
console.log('[TaskProcessor] Task appears stuck');

// Delete stale status message if it exists
if (task.telegramToken && task.statusMessageId) {
await this.deleteTelegramMessage(task.telegramToken, task.chatId, task.statusMessageId);
}

const resumeCount = task.autoResumeCount ?? 0;
const elapsed = Math.round((Date.now() - task.startTime) / 1000);
const elapsedMs = Date.now() - task.startTime;
const maxResumes = getAutoResumeLimit(task.modelAlias);
const isFreeModel = getModel(task.modelAlias)?.isFree === true;
const maxElapsedMs = isFreeModel ? MAX_ELAPSED_FREE_MS : MAX_ELAPSED_PAID_MS;
const elapsedMs = Date.now() - task.startTime;
const elapsed = Math.round(elapsedMs / 1000);
console.log(`[TaskProcessor] Time since last update: ${timeSinceUpdate}ms, elapsed: ${elapsed}s (threshold: ${stuckThreshold / 1000}s, limit: ${Math.round(maxElapsedMs / 60000)}min, ${isPaidModel ? 'paid' : 'free'})`);

// Check elapsed time cap (prevents runaway tasks)
// Check elapsed time cap FIRST — even if the task is still active,
// stop it if it has exceeded the maximum allowed duration.
// This prevents runaway tasks that make slow progress indefinitely.
if (elapsedMs > maxElapsedMs) {
console.log(`[TaskProcessor] Elapsed time cap reached: ${elapsed}s > ${maxElapsedMs / 1000}s`);
task.status = 'failed';
task.error = `Task exceeded time limit (${Math.round(maxElapsedMs / 60000)}min). Progress saved.`;
await this.doState.storage.put('task', task);

if (task.telegramToken) {
if (task.statusMessageId) {
await this.deleteTelegramMessage(task.telegramToken, task.chatId, task.statusMessageId);
}
await this.sendTelegramMessageWithButtons(
task.telegramToken,
task.chatId,
Expand All @@ -539,6 +529,24 @@ export class TaskProcessor extends DurableObject<TaskProcessorEnv> {
return;
}

// If task updated recently, it's still running - reschedule watchdog.
// This runs after the elapsed-time cap check above, so a task that is still
// making progress but has exceeded the cap has already been failed by this point.
// stuckThreshold is STUCK_THRESHOLD_PAID_MS or STUCK_THRESHOLD_FREE_MS depending on the model.
if (timeSinceUpdate < stuckThreshold) {
console.log('[TaskProcessor] Task still active, rescheduling watchdog');
// Re-arm the alarm so this handler fires again after WATCHDOG_INTERVAL_MS.
await this.doState.storage.setAlarm(Date.now() + WATCHDOG_INTERVAL_MS);
return;
}

// Task appears stuck - likely DO was terminated by Cloudflare
// (no update to task.lastUpdate for at least stuckThreshold milliseconds).
console.log('[TaskProcessor] Task appears stuck');

// Delete stale status message if it exists.
// Both the bot token and a recorded status message id are required to do so.
if (task.telegramToken && task.statusMessageId) {
await this.deleteTelegramMessage(task.telegramToken, task.chatId, task.statusMessageId);
}

// autoResumeCount may be unset on the stored task; treat that as zero resumes so far.
const resumeCount = task.autoResumeCount ?? 0;
// Upper bound on automatic resumes for this task's model (see getAutoResumeLimit).
const maxResumes = getAutoResumeLimit(task.modelAlias);

// Check if auto-resume is enabled and under limit
if (task.autoResume && resumeCount < maxResumes && task.telegramToken && task.openrouterKey) {
// --- STALL DETECTION ---
Expand Down Expand Up @@ -996,6 +1004,12 @@ export class TaskProcessor extends DurableObject<TaskProcessorEnv> {
// Carry the auto-resume counter over from the stored task, but only when it was
// actually set there; otherwise leave the value already on `task` untouched.
if (existingTask.autoResumeCount !== undefined) {
task.autoResumeCount = existingTask.autoResumeCount;
}
// Preserve original startTime so elapsed time cap works across resumes.
// Without this, each auto-resume resets startTime to Date.now(), making
// the elapsed cap (MAX_ELAPSED_FREE_MS / MAX_ELAPSED_PAID_MS — 15min free /
// 30min paid per the original note) never trigger.
// Truthiness check: a missing (or 0) stored startTime is not copied.
if (existingTask.startTime) {
task.startTime = existingTask.startTime;
}
// Preserve stall detection state across resumes.
// These are copied unconditionally, so fields that are undefined on the stored
// task overwrite whatever `task` currently holds.
task.toolCountAtLastResume = existingTask.toolCountAtLastResume;
task.noProgressResumes = existingTask.noProgressResumes;
Expand Down Expand Up @@ -1161,6 +1175,30 @@ export class TaskProcessor extends DurableObject<TaskProcessorEnv> {
return; // Exit silently - cancel handler already notified user
}

// Second line of defence for the elapsed-time cap, evaluated inside the main loop.
// The alarm handler performs the same check, but a task that keeps running
// without the alarm firing would otherwise never be stopped by it.
{
  const runningForMs = Date.now() - task.startTime;
  const modelIsFree = getModel(task.modelAlias)?.isFree === true;
  const capMs = modelIsFree ? MAX_ELAPSED_FREE_MS : MAX_ELAPSED_PAID_MS;

  if (runningForMs > capMs) {
    // Cap expressed in whole minutes, reused by the stored error and the user-facing message.
    const capMinutes = Math.round(capMs / 60000);
    console.log(`[TaskProcessor] Elapsed time cap in main loop: ${Math.round(runningForMs / 1000)}s > ${capMs / 1000}s`);

    // Persist the failure first, then cancel the pending watchdog alarm.
    task.status = 'failed';
    task.error = `Task exceeded time limit (${capMinutes}min). Progress saved.`;
    await this.doState.storage.put('task', task);
    await this.doState.storage.deleteAlarm();

    // Remove the in-progress status message (if any) before posting the final notice.
    if (statusMessageId) {
      await this.deleteTelegramMessage(request.telegramToken, request.chatId, statusMessageId);
    }

    const timeLimitNotice = `⏰ Task exceeded ${capMinutes}min time limit (${task.iterations} iterations, ${task.toolsUsed.length} tools).\n\n💡 Progress saved. Tap Resume to continue from checkpoint.`;
    await this.sendTelegramMessageWithButtons(
      request.telegramToken,
      request.chatId,
      timeLimitNotice,
      [[{ text: '🔄 Resume', callback_data: 'resume:task' }]]
    );
    return;
  }
}

task.iterations++;
task.lastUpdate = Date.now();
currentTool = null;
Expand Down Expand Up @@ -1780,6 +1818,16 @@ export class TaskProcessor extends DurableObject<TaskProcessorEnv> {

console.log(`[TaskProcessor] Iteration ${task.iterations} COMPLETE - total time: ${Date.now() - iterStartTime}ms`);

// Enforce the overall tool-call budget (guards against excessive API usage on
// runaway tasks). Once the budget is spent, a [SYSTEM] user message is appended
// instructing the model to stop calling tools and give its final answer.
{
  const modelIsFree = getModel(task.modelAlias)?.isFree === true;
  const toolBudget = modelIsFree ? MAX_TOTAL_TOOLS_FREE : MAX_TOTAL_TOOLS_PAID;
  const toolCallsSoFar = task.toolsUsed.length;

  if (toolCallsSoFar >= toolBudget) {
    console.log(`[TaskProcessor] Total tool call limit reached: ${toolCallsSoFar} >= ${toolBudget}`);
    const finalAnswerNudge = `[SYSTEM] You have used ${toolCallsSoFar} tool calls, which is the maximum allowed for this task. You MUST now provide your final answer using the information you have gathered so far. Do NOT call any more tools.`;
    conversationMessages.push({
      role: 'user',
      content: finalAnswerNudge,
    });
  }
}

// Continue loop for next iteration
continue;
}
Expand Down
Loading