diff --git a/cli/src/execution/parallel.ts b/cli/src/execution/parallel.ts index 5318088e..af82ac17 100644 --- a/cli/src/execution/parallel.ts +++ b/cli/src/execution/parallel.ts @@ -9,6 +9,7 @@ import { syncPrdToIssue } from "../git/issue-sync.ts"; import { abortMerge, analyzePreMerge, + createIntegrationBranch, deleteLocalBranch, mergeAgentBranch, sortByConflictLikelihood, @@ -310,84 +311,114 @@ export async function runParallel( // Track processed tasks in dry-run mode (since we don't modify the source file) const dryRunProcessedIds = new Set(); - // Process tasks in batches - let iteration = 0; + // Track the current base branch for chaining (integration branch pattern) + let currentBaseBranch = originalBaseBranch; - while (true) { - // Check iteration limit - if (maxIterations > 0 && iteration >= maxIterations) { - logInfo(`Reached max iterations (${maxIterations})`); - break; - } + // Track created integration branches for cleanup + const createdIntegrationBranches: string[] = []; - // Get tasks for this batch - let tasks: Task[] = []; + try { + // Process tasks in batches + let iteration = 0; + + while (true) { + // Check iteration limit + if (maxIterations > 0 && iteration >= maxIterations) { + logInfo(`Reached max iterations (${maxIterations})`); + break; + } - const taskSourceWithGroups = taskSource as TaskSource & { - getParallelGroup?: (title: string) => Promise; - getTasksInGroup?: (group: number) => Promise; - }; + // Get tasks for this batch + let tasks: Task[] = []; - if (taskSourceWithGroups.getParallelGroup && taskSourceWithGroups.getTasksInGroup) { - let nextTask = await taskSource.getNextTask(); - if (dryRun && nextTask && dryRunProcessedIds.has(nextTask.id)) { - const allTasks = await taskSource.getAllTasks(); - nextTask = allTasks.find((task) => !dryRunProcessedIds.has(task.id)) || null; - } - if (!nextTask) break; + const taskSourceWithGroups = taskSource as TaskSource & { + getParallelGroup?: (title: string) => Promise; + getTasksInGroup?: (group: number) => Promise; + }; + + if (taskSourceWithGroups.getParallelGroup && taskSourceWithGroups.getTasksInGroup) { + let nextTask = await taskSource.getNextTask(); + if (dryRun && nextTask && dryRunProcessedIds.has(nextTask.id)) { + const allTasks = await taskSource.getAllTasks(); + nextTask = allTasks.find((task) => !dryRunProcessedIds.has(task.id)) || null; + } + if (!nextTask) break; - const group = await taskSourceWithGroups.getParallelGroup(nextTask.title); - if (group > 0) { - tasks = await taskSourceWithGroups.getTasksInGroup(group); + const group = await taskSourceWithGroups.getParallelGroup(nextTask.title); + if (group > 0) { + tasks = await taskSourceWithGroups.getTasksInGroup(group); + if (dryRun) { + tasks = tasks.filter((task) => !dryRunProcessedIds.has(task.id)); + } + } else { + tasks = [nextTask]; + } + } else { + tasks = await taskSource.getAllTasks(); if (dryRun) { tasks = tasks.filter((task) => !dryRunProcessedIds.has(task.id)); } - } else { - tasks = [nextTask]; } - } else { - tasks = await taskSource.getAllTasks(); - if (dryRun) { - tasks = tasks.filter((task) => !dryRunProcessedIds.has(task.id)); + + if (tasks.length === 0) { + logSuccess("All tasks completed!"); + break; } - } - if (tasks.length === 0) { - logSuccess("All tasks completed!"); - break; - } + // Limit to maxParallel + const batch = tasks.slice(0, maxParallel); + iteration++; - // Limit to maxParallel - const batch = tasks.slice(0, maxParallel); - iteration++; + const batchStartTime = Date.now(); + logInfo(`Batch ${iteration}: ${batch.length} tasks in parallel`); - const batchStartTime = Date.now(); - logInfo(`Batch ${iteration}: ${batch.length} tasks in parallel`); + if (dryRun) { + logInfo("(dry run) Skipping batch"); + // Track processed tasks to avoid infinite loop + for (const task of batch) { + dryRunProcessedIds.add(task.id); + } + continue; + } - if (dryRun) { - logInfo("(dry run) Skipping batch"); - // Track processed tasks to avoid infinite loop + // Log task names being processed for (const task of batch) { - dryRunProcessedIds.add(task.id); + logInfo(` -> ${task.title}`); } - continue; - } - - // Log task names being processed - for (const task of batch) { - logInfo(` -> ${task.title}`); - } - // Run agents in parallel (using sandbox or worktree mode) - const promises = batch.map((task) => { - globalAgentNum++; + // Run agents in parallel (using sandbox or worktree mode) + const promises = batch.map((task) => { + globalAgentNum++; + + const runInSandbox = () => + runAgentInSandbox( + engine, + task, + globalAgentNum, + getSandboxBase(workDir), + workDir, + prdSource, + prdFile, + prdIsFolder, + maxRetries, + retryDelay, + skipTests, + skipLint, + browserEnabled, + modelOverride, + engineArgs, + ); + + if (effectiveUseSandbox) { + return runInSandbox(); + } - const runInSandbox = () => - runAgentInSandbox( + return runAgentInWorktree( engine, task, globalAgentNum, - getSandboxBase(workDir), + currentBaseBranch, + isolationBase, workDir, prdSource, prdFile, @@ -399,269 +430,356 @@ export async function runParallel( browserEnabled, modelOverride, engineArgs, - ); - - if (effectiveUseSandbox) { - return runInSandbox(); - } - - return runAgentInWorktree( - engine, - task, - globalAgentNum, - baseBranch, - isolationBase, - workDir, - prdSource, - prdFile, - prdIsFolder, - maxRetries, - retryDelay, - skipTests, - skipLint, - browserEnabled, - modelOverride, - engineArgs, - ).then((res) => { - if (shouldFallbackToSandbox(res.error)) { - logWarn(`Agent ${globalAgentNum}: Worktree unavailable, retrying in sandbox mode.`); - if (res.worktreeDir) { - cleanupAgentWorktree(res.worktreeDir, res.branchName, workDir).catch(() => { - // Ignore cleanup failures during fallback - }); + ).then((res) => { + if (shouldFallbackToSandbox(res.error)) { + logWarn(`Agent ${globalAgentNum}: Worktree unavailable, retrying in sandbox mode.`); + if (res.worktreeDir) { + cleanupAgentWorktree(res.worktreeDir, res.branchName, workDir).catch(() => { + // Ignore cleanup failures during fallback + }); + } + return runInSandbox(); } - return runInSandbox(); - } - return res; + return res; + }); }); - }); - const results = await Promise.all(promises); - - // Process results and collect worktrees for parallel cleanup - let sawRetryableFailure = false; - const worktreesToCleanup: Array<{ worktreeDir: string; branchName: string }> = []; - - for (const agentResult of results) { - const { - task, - agentNum, - worktreeDir, - result: aiResult, - error, - usedSandbox: agentUsedSandbox, - } = agentResult; - let branchName = agentResult.branchName; - let failureReason: string | undefined = error; - let retryableFailure = false; - let preserveSandbox = false; - - if (!failureReason && aiResult?.success && agentUsedSandbox && worktreeDir) { - try { - const modifiedFiles = await getModifiedFiles(worktreeDir, workDir); - if (modifiedFiles.length > 0) { - const commitResult = await commitSandboxChanges( - workDir, - modifiedFiles, - worktreeDir, - task.title, - agentNum, - originalBaseBranch, - ); + const results = await Promise.all(promises); - if (commitResult.success) { - branchName = commitResult.branchName; - logDebug( - `Agent ${agentNum}: Committed ${commitResult.filesCommitted} files to ${branchName}`, + // Process results and collect worktrees for parallel cleanup + let sawRetryableFailure = false; + const worktreesToCleanup: Array<{ worktreeDir: string; branchName: string }> = []; + + for (const agentResult of results) { + const { + task, + agentNum, + worktreeDir, + result: aiResult, + error, + usedSandbox: agentUsedSandbox, + } = agentResult; + let branchName = agentResult.branchName; + let failureReason: string | undefined = error; + let retryableFailure = false; + let preserveSandbox = false; + + if (!failureReason && aiResult?.success && agentUsedSandbox && worktreeDir) { + try { + const modifiedFiles = await getModifiedFiles(worktreeDir, workDir); + if (modifiedFiles.length > 0) { + const commitResult = await commitSandboxChanges( + workDir, + modifiedFiles, + worktreeDir, + task.title, + agentNum, + currentBaseBranch, ); - } else { - failureReason = commitResult.error || "Failed to commit sandbox changes"; - preserveSandbox = true; // Preserve work for manual recovery + + if (commitResult.success) { + // Update branches map or result to ensure we track the generated branch + branchName = commitResult.branchName; + // Update the agentResult in place so we can find it later + agentResult.branchName = branchName; + logDebug( + `Agent ${agentNum}: Committed ${commitResult.filesCommitted} files to ${branchName}`, + ); + } else { + failureReason = commitResult.error || "Failed to commit sandbox changes"; + preserveSandbox = true; // Preserve work for manual recovery + } } + } catch (commitErr) { + failureReason = commitErr instanceof Error ? commitErr.message : String(commitErr); + preserveSandbox = true; // Preserve work for manual recovery } - } catch (commitErr) { - failureReason = commitErr instanceof Error ? commitErr.message : String(commitErr); - preserveSandbox = true; // Preserve work for manual recovery } - } - if (failureReason) { - retryableFailure = isRetryableError(failureReason); - if (retryableFailure) { - const deferrals = recordDeferredTask(taskSource.type, task, workDir, prdFile); - if (deferrals >= maxRetries) { - logError(`Task "${task.title}" failed after ${deferrals} deferrals: ${failureReason}`); + if (failureReason) { + retryableFailure = isRetryableError(failureReason); + if (retryableFailure) { + const deferrals = recordDeferredTask(taskSource.type, task, workDir, prdFile); + if (deferrals >= maxRetries) { + logError(`Task "${task.title}" failed after ${deferrals} deferrals: ${failureReason}`); + logTaskProgress(task.title, "failed", workDir); + result.tasksFailed++; + notifyTaskFailed(task.title, failureReason); + await taskSource.markComplete(task.id); + clearDeferredTask(taskSource.type, task, workDir, prdFile); + retryableFailure = false; + } else { + logWarn(`Task "${task.title}" deferred (${deferrals}/${maxRetries}): ${failureReason}`); + result.tasksFailed++; + } + } else { + logError(`Task "${task.title}" failed: ${failureReason}`); logTaskProgress(task.title, "failed", workDir); result.tasksFailed++; notifyTaskFailed(task.title, failureReason); + + // Mark failed task as complete to remove it from the queue + // This prevents infinite retry loops - the task has already been retried maxRetries times await taskSource.markComplete(task.id); clearDeferredTask(taskSource.type, task, workDir, prdFile); - retryableFailure = false; - } else { - logWarn(`Task "${task.title}" deferred (${deferrals}/${maxRetries}): ${failureReason}`); - result.tasksFailed++; } - } else { - logError(`Task "${task.title}" failed: ${failureReason}`); - logTaskProgress(task.title, "failed", workDir); - result.tasksFailed++; - notifyTaskFailed(task.title, failureReason); + } else if (aiResult?.success) { + logSuccess(`Task "${task.title}" completed`); + result.totalInputTokens += aiResult.inputTokens; + result.totalOutputTokens += aiResult.outputTokens; - // Mark failed task as complete to remove it from the queue - // This prevents infinite retry loops - the task has already been retried maxRetries times await taskSource.markComplete(task.id); - clearDeferredTask(taskSource.type, task, workDir, prdFile); - } - } else if (aiResult?.success) { - logSuccess(`Task "${task.title}" completed`); - result.totalInputTokens += aiResult.inputTokens; - result.totalOutputTokens += aiResult.outputTokens; + logTaskProgress(task.title, "completed", workDir); + result.tasksCompleted++; - await taskSource.markComplete(task.id); - logTaskProgress(task.title, "completed", workDir); - result.tasksCompleted++; - - notifyTaskComplete(task.title); - clearDeferredTask(taskSource.type, task, workDir, prdFile); + notifyTaskComplete(task.title); + clearDeferredTask(taskSource.type, task, workDir, prdFile); - // Track successful branch for merge phase - if (branchName) { - completedBranches.push(branchName); - } - } else { - const errMsg = aiResult?.error || "Unknown error"; - retryableFailure = isRetryableError(errMsg); - if (retryableFailure) { - const deferrals = recordDeferredTask(taskSource.type, task, workDir, prdFile); - if (deferrals >= maxRetries) { - logError(`Task "${task.title}" failed after ${deferrals} deferrals: ${errMsg}`); + // Track successful branch for merge phase + if (branchName) { + completedBranches.push(branchName); + } + } else { + const errMsg = aiResult?.error || "Unknown error"; + retryableFailure = isRetryableError(errMsg); + if (retryableFailure) { + const deferrals = recordDeferredTask(taskSource.type, task, workDir, prdFile); + if (deferrals >= maxRetries) { + logError(`Task "${task.title}" failed after ${deferrals} deferrals: ${errMsg}`); + logTaskProgress(task.title, "failed", workDir); + result.tasksFailed++; + notifyTaskFailed(task.title, errMsg); + failureReason = errMsg; + await taskSource.markComplete(task.id); + clearDeferredTask(taskSource.type, task, workDir, prdFile); + retryableFailure = false; + } else { + logWarn(`Task "${task.title}" deferred (${deferrals}/${maxRetries}): ${errMsg}`); + result.tasksFailed++; + failureReason = errMsg; + } + } else { + logError(`Task "${task.title}" failed: ${errMsg}`); logTaskProgress(task.title, "failed", workDir); result.tasksFailed++; notifyTaskFailed(task.title, errMsg); failureReason = errMsg; + + // Mark failed task as complete to remove it from the queue + // This prevents infinite retry loops - the task has already been retried maxRetries times await taskSource.markComplete(task.id); clearDeferredTask(taskSource.type, task, workDir, prdFile); - retryableFailure = false; - } else { - logWarn(`Task "${task.title}" deferred (${deferrals}/${maxRetries}): ${errMsg}`); - result.tasksFailed++; - failureReason = errMsg; } - } else { - logError(`Task "${task.title}" failed: ${errMsg}`); - logTaskProgress(task.title, "failed", workDir); - result.tasksFailed++; - notifyTaskFailed(task.title, errMsg); - failureReason = errMsg; - - // Mark failed task as complete to remove it from the queue - // This prevents infinite retry loops - the task has already been retried maxRetries times - await taskSource.markComplete(task.id); - clearDeferredTask(taskSource.type, task, workDir, prdFile); } - } - // Cleanup sandbox inline or collect worktree for parallel cleanup - if (worktreeDir) { - if (agentUsedSandbox) { - if (failureReason || preserveSandbox) { - logWarn(`Sandbox preserved for manual review: ${worktreeDir}`); + // Cleanup sandbox inline or collect worktree for parallel cleanup + if (worktreeDir) { + if (agentUsedSandbox) { + if (failureReason || preserveSandbox) { + logWarn(`Sandbox preserved for manual review: ${worktreeDir}`); + } else { + // Sandbox cleanup is simpler - just delete the directory + await cleanupSandbox(worktreeDir); + logDebug(`Cleaned up sandbox: ${worktreeDir}`); + } } else { - // Sandbox cleanup is simpler - just delete the directory - await cleanupSandbox(worktreeDir); - logDebug(`Cleaned up sandbox: ${worktreeDir}`); + // Collect worktree for parallel cleanup below + worktreesToCleanup.push({ worktreeDir, branchName }); } - } else { - // Collect worktree for parallel cleanup below - worktreesToCleanup.push({ worktreeDir, branchName }); + } + + if (retryableFailure) { + sawRetryableFailure = true; } } - if (retryableFailure) { - sawRetryableFailure = true; + // Cleanup all worktrees in parallel + if (worktreesToCleanup.length > 0) { + const cleanupResults = await Promise.all( + worktreesToCleanup.map(({ worktreeDir, branchName }) => + cleanupAgentWorktree(worktreeDir, branchName, workDir).then((cleanup) => ({ + worktreeDir, + leftInPlace: cleanup.leftInPlace, + })), + ), + ); + + // Log any worktrees left in place + for (const { worktreeDir, leftInPlace } of cleanupResults) { + if (leftInPlace) { + logInfo(`Worktree left in place (uncommitted changes): ${worktreeDir}`); + } + } } - } - // Cleanup all worktrees in parallel - if (worktreesToCleanup.length > 0) { - const cleanupResults = await Promise.all( - worktreesToCleanup.map(({ worktreeDir, branchName }) => - cleanupAgentWorktree(worktreeDir, branchName, workDir).then((cleanup) => ({ - worktreeDir, - leftInPlace: cleanup.leftInPlace, - })), - ), - ); + // Sync PRD to GitHub issue once per batch (after all tasks processed) + // This prevents multiple concurrent syncs and reduces API calls + if (syncIssue && prdFile && result.tasksCompleted > 0) { + await syncPrdToIssue(prdFile, syncIssue, workDir); + } - // Log any worktrees left in place - for (const { worktreeDir, leftInPlace } of cleanupResults) { - if (leftInPlace) { - logInfo(`Worktree left in place (uncommitted changes): ${worktreeDir}`); + // Log batch completion time + const batchDuration = formatDuration(Date.now() - batchStartTime); + logInfo(`Batch ${iteration} completed in ${batchDuration}`); + // If any retryable failure occurred, stop the run to allow retry later + // CHAINING LOGIC: Merge this batch's work into an integration branch + // Only do this if we have successful branches and we aren't stopping early + if (!skipMerge && !dryRun && !sawRetryableFailure && completedBranches.length > 0) { + // We need to find which branches were successful in THIS batch + // refetching from the results array is safer + const currentBatchBranches = results + .map(r => r.branchName) // This might be empty if failed + .filter(b => b && completedBranches.includes(b)); + + if (currentBatchBranches.length > 0) { + let integrationBranch: string | undefined; + try { + logInfo(`Creating integration branch for batch ${iteration}...`); + // Create integration branch from current base + integrationBranch = await createIntegrationBranch( + iteration, + currentBaseBranch, + workDir, + originalBaseBranch // Use original base branch as prefix for namespacing + ); + createdIntegrationBranches.push(integrationBranch); + + logInfo(`Merging ${currentBatchBranches.length} branch(es) into ${integrationBranch}...`); + + // Merge the batch's branches into the integration branch + // We use a cleaner merge function or the existing mergeCompletedBranches logic? + // Use mergeCompletedBranches but pointed at the integration branch + await mergeCompletedBranches( + currentBatchBranches, + integrationBranch, + engine, + workDir, + modelOverride, + engineArgs + ); + + // Update current base branch for the next batch + currentBaseBranch = integrationBranch; + logSuccess(`Batch ${iteration} integrated into ${currentBaseBranch}`); + + // Remove merged branches from completedBranches to avoid re-merging them at the end + for (const branch of currentBatchBranches) { + const idx = completedBranches.indexOf(branch); + if (idx !== -1) { + completedBranches.splice(idx, 1); + } + } + } catch (err) { + logError(`Failed to create integration branch: ${err}`); + + // Cleanup the orphaned integration branch if it was created + if (integrationBranch) { + try { + await deleteLocalBranch(integrationBranch, workDir, true); + logDebug(`Cleaned up orphaned integration branch: ${integrationBranch}`); + } catch (cleanupErr) { + logWarn(`Failed to cleanup orphaned branch ${integrationBranch}: ${cleanupErr}`); + } + } + + logWarn("Integration failed, stopping execution to preserve dependency chain."); + // Stop execution because subsequent batches depend on this integration + break; + } } } - } - // Sync PRD to GitHub issue once per batch (after all tasks processed) - // This prevents multiple concurrent syncs and reduces API calls - if (syncIssue && prdFile && result.tasksCompleted > 0) { - await syncPrdToIssue(prdFile, syncIssue, workDir); + if (sawRetryableFailure) { + logWarn("Stopping early due to retryable errors. Try again later."); + break; + } } - // Log batch completion time - const batchDuration = formatDuration(Date.now() - batchStartTime); - logInfo(`Batch ${iteration} completed in ${batchDuration}`); - // If any retryable failure occurred, stop the run to allow retry later - if (sawRetryableFailure) { - logWarn("Stopping early due to retryable errors. Try again later."); - break; + // Merge phase: merge final integration branch back to base branch + // If we used integration branches, 'currentBaseBranch' holds the accumulated work. + // We need to merge 'currentBaseBranch' into 'originalBaseBranch'. + // We ALSO need to merge any leftover branches in 'completedBranches' (e.g. if a batch failed to integrate). + + const branchesToMerge = [...completedBranches]; + if (currentBaseBranch !== originalBaseBranch) { + branchesToMerge.push(currentBaseBranch); } - } + + if (!skipMerge && !dryRun && branchesToMerge.length > 0) { + const git = simpleGit(workDir); + let stashed = false; + try { + const status = await git.status(); + const hasChanges = status.files.length > 0 || status.not_added.length > 0; + if (hasChanges) { + await git.stash(["push", "-u", "-m", "ralphy-merge-stash"]); + stashed = true; + logDebug("Stashed local changes before merge phase"); + } + } catch (stashErr) { + logWarn(`Failed to stash local changes: ${stashErr}`); + } - // Merge phase: merge completed branches back to base branch - if (!skipMerge && !dryRun && completedBranches.length > 0) { - const git = simpleGit(workDir); - let stashed = false; - try { - const status = await git.status(); - const hasChanges = status.files.length > 0 || status.not_added.length > 0; - if (hasChanges) { - await git.stash(["push", "-u", "-m", "ralphy-merge-stash"]); - stashed = true; - logDebug("Stashed local changes before merge phase"); + try { + if (currentBaseBranch !== originalBaseBranch) { + // We have a chain of integration branches. + logInfo(`Merging final integration branch ${currentBaseBranch} and ${completedBranches.length} other(s) into ${originalBaseBranch}`); + } + + await mergeCompletedBranches( + branchesToMerge, + originalBaseBranch, + engine, + workDir, + modelOverride, + engineArgs + ); + + // Restore starting branch if we're not already on it + const currentBranch = await getCurrentBranch(workDir); + if (currentBranch !== startingBranch) { + logDebug(`Restoring starting branch: ${startingBranch}`); + await returnToBaseBranch(startingBranch, workDir); + } + } finally { + if (stashed) { + try { + await git.stash(["pop"]); + logDebug("Restored stashed changes after merge phase"); + } catch (stashErr) { + logWarn(`Failed to restore stashed changes: ${stashErr}`); + } + } } - } catch (stashErr) { - logWarn(`Failed to stash local changes: ${stashErr}`); } + return result; + } finally { + // 1. Restore starting branch FIRST so we can delete other branches safely + // This must happen regardless of whether we merged, skipped, or failed try { - await mergeCompletedBranches( - completedBranches, - originalBaseBranch, - engine, - workDir, - modelOverride, - engineArgs, - ); - - // Restore starting branch if we're not already on it const currentBranch = await getCurrentBranch(workDir); if (currentBranch !== startingBranch) { logDebug(`Restoring starting branch: ${startingBranch}`); await returnToBaseBranch(startingBranch, workDir); } - } finally { - if (stashed) { + } catch (restoreErr) { + logWarn(`Failed to restore starting branch: ${restoreErr}`); + } + + // 2. Cleanup all intermediate integration branches + if (createdIntegrationBranches.length > 0) { + // Cleanup distinct branches (deduplicate just in case) + const distinctBranches = [...new Set(createdIntegrationBranches)]; + for (const branch of distinctBranches) { try { - await git.stash(["pop"]); - logDebug("Restored stashed changes after merge phase"); - } catch (stashErr) { - logWarn(`Failed to restore stashed changes: ${stashErr}`); + await deleteLocalBranch(branch, workDir, true); + } catch { + // Ignore errors } } } } - - return result; } /** diff --git a/cli/src/git/merge.ts b/cli/src/git/merge.ts index 8fce8a1f..480969d6 100644 --- a/cli/src/git/merge.ts +++ b/cli/src/git/merge.ts @@ -103,9 +103,17 @@ export async function createIntegrationBranch( groupNum: number, baseBranch: string, workDir: string, + prefix?: string, ): Promise { const git: SimpleGit = simpleGit(workDir); - const branchName = `ralphy/integration-group-${groupNum}`; + + let branchName = `ralphy/integration-group-${groupNum}`; + + if (prefix) { + // Sanitize prefix: replace non-alphanumeric chars (mostly slashes) with dashes + const sanitizedPrefix = prefix.replace(/[^a-zA-Z0-9-]/g, "-"); + branchName = `ralphy/${sanitizedPrefix}-integration-group-${groupNum}`; + } // Checkout base branch first await git.checkout(baseBranch);