Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/lib/merge-train.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,71 @@ const INSTALL_COMMAND_BY_LOCKFILE = [



export async function validateTouchSet(
jobBranch: string,
baseBranch: string,
touchSet: string[],
opts?: { cwd?: string },
): Promise<{ valid: boolean; violations?: string[]; changedFiles?: string[] }> {
if (touchSet.length === 0) {
return { valid: true };
}

const diffResult = await gitCommand(['diff', '--name-only', `${baseBranch}...${jobBranch}`], opts);
if (diffResult.exitCode !== 0) {
return { valid: false, violations: [`Failed to diff: ${diffResult.stderr}`] };
}

const changedFiles = diffResult.stdout.split('\n').map(f => f.trim()).filter(Boolean);
if (changedFiles.length === 0) {
return { valid: true, changedFiles: [] };
}

const violations: string[] = [];
for (const file of changedFiles) {
const matchesAny = touchSet.some(pattern => {
const glob = new Bun.Glob(pattern);
return glob.match(file);
});
if (!matchesAny) {
violations.push(file);
}
}

return {
valid: violations.length === 0,
violations: violations.length > 0 ? violations : undefined,
changedFiles,
};
}

export async function checkMergeability(
integrationWorktree: string,
jobBranch: string,
): Promise<{ canMerge: boolean; conflicts?: string[] }> {
const testMerge = await gitCommand([
'-C', integrationWorktree,
'merge', '--no-commit', '--no-ff', jobBranch,
]);

// Always clean up — leave worktree pristine
await gitCommand(['-C', integrationWorktree, 'merge', '--abort']).catch(() => {});
await gitCommand(['-C', integrationWorktree, 'reset', '--hard', 'HEAD']).catch(() => {});
await gitCommand(['-C', integrationWorktree, 'clean', '-fd']).catch(() => {});

if (testMerge.exitCode !== 0) {
const conflicts = extractConflicts(
[testMerge.stdout, testMerge.stderr].filter(Boolean).join('\n'),
);
return {
canMerge: false,
conflicts: conflicts.length > 0 ? conflicts : undefined,
};
}

return { canMerge: true };
}

async function rollbackMerge(worktreePath: string): Promise<void> {
// Try merge --abort first
await gitCommand(['-C', worktreePath, 'merge', '--abort']).catch(() => {});
Expand Down
99 changes: 59 additions & 40 deletions src/lib/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { PlanSpec, JobSpec, PlanStatus, CheckpointType } from './plan-types
import { loadPlan, savePlan, updatePlanJob, clearPlan, validateGhAuth } from './plan-state';
import { getDefaultBranch } from './git';
import { createIntegrationBranch, deleteIntegrationBranch } from './integration';
import { MergeTrain, type MergeTestReport } from './merge-train';
import { MergeTrain, checkMergeability, type MergeTestReport, validateTouchSet } from './merge-train';
import { addJob, getRunningJobs, updateJob, loadJobState, removeJob, type Job } from './job-state';
import { JobMonitor } from './monitor';
import { removeReport } from './reports';
Expand Down Expand Up @@ -473,6 +473,22 @@ export class Orchestrator {

for (const job of mergeOrder) {
if (job.status === 'completed') {
if (job.touchSet && job.touchSet.length > 0 && job.branch && plan.integrationBranch) {
const validation = await validateTouchSet(job.branch, plan.integrationBranch, job.touchSet);
if (!validation.valid && validation.violations) {
await updatePlanJob(plan.id, job.name, {
status: 'failed',
error: `Modified files outside touchSet: ${validation.violations.join(', ')}. Expected only: ${job.touchSet.join(', ')}`,
});
job.status = 'failed';

this.showToast('Mission Control', `Job "${job.name}" touched files outside its touchSet. Plan paused.`, 'error');
this.notify(`❌ Job "${job.name}" modified files outside its touchSet:\n Violations: ${validation.violations.join(', ')}\n Allowed: ${job.touchSet.join(', ')}\nFix the branch and retry with mc_plan_approve(checkpoint: "on_error", retry: "${job.name}").`);
await this.setCheckpoint('on_error', plan);
return;
}
}

await updatePlanJob(plan.id, job.name, { status: 'ready_to_merge' });
job.status = 'ready_to_merge';
}
Expand All @@ -494,6 +510,22 @@ export class Orchestrator {
continue;
}

if (job.branch && plan.integrationWorktree) {
const mergeCheck = await checkMergeability(plan.integrationWorktree, job.branch);
if (!mergeCheck.canMerge) {
await updatePlanJob(plan.id, job.name, {
status: 'needs_rebase',
error: mergeCheck.conflicts?.join(', ') ?? 'merge conflict detected in trial merge',
});
job.status = 'needs_rebase';

this.showToast('Mission Control', `Job "${job.name}" has merge conflicts. Plan paused.`, 'error');
this.notify(`❌ Job "${job.name}" would conflict with the integration branch.\n Files: ${mergeCheck.conflicts?.join(', ') ?? 'unknown'}\nRebase the job branch and retry with mc_plan_approve(checkpoint: "on_error", retry: "${job.name}").`);
await this.setCheckpoint('on_error', plan);
return;
}
}

if (this.isSupervisor(plan) && !this.approvedForMerge.has(job.name)) {
await this.setCheckpoint('pre_merge', plan);
return;
Expand Down Expand Up @@ -538,14 +570,10 @@ export class Orchestrator {
error: mergeResult.files?.join(', ') ?? 'merge conflict',
});

if (this.isSupervisor(plan)) {
await this.setCheckpoint('on_error', plan);
return;
}

plan.status = 'failed';
this.showToast('Mission Control', `Merge conflict in job "${nextJob.name}".`, 'error');
this.notify(`❌ Merge conflict in job "${nextJob.name}". Files: ${mergeResult.files?.join(', ') ?? 'unknown'}. Plan failed.`);
this.showToast('Mission Control', `Merge conflict in job "${nextJob.name}". Plan paused.`, 'error');
this.notify(`❌ Merge conflict in job "${nextJob.name}". Files: ${mergeResult.files?.join(', ') ?? 'unknown'}. Fix the branch and retry with mc_plan_approve(checkpoint: "on_error", retry: "${nextJob.name}").`);
await this.setCheckpoint('on_error', plan);
return;
} else {
await updatePlanJob(plan.id, nextJob.name, {
status: 'failed',
Expand All @@ -557,14 +585,10 @@ export class Orchestrator {
this.notify(`🧪 ${nextJob.name}: ${testSummary}`);
}

if (this.isSupervisor(plan)) {
await this.setCheckpoint('on_error', plan);
return;
}

plan.status = 'failed';
this.showToast('Mission Control', `Job "${nextJob.name}" failed during merge.`, 'error');
this.notify(`❌ Job "${nextJob.name}" failed merge tests. Plan failed.`);
this.showToast('Mission Control', `Job "${nextJob.name}" failed merge tests. Plan paused.`, 'error');
this.notify(`❌ Job "${nextJob.name}" failed merge tests. Fix the branch and retry with mc_plan_approve(checkpoint: "on_error", retry: "${nextJob.name}").`);
await this.setCheckpoint('on_error', plan);
return;
}
}

Expand Down Expand Up @@ -763,22 +787,24 @@ If your work needs human review before it can proceed: mc_report(status: "needs_
}

private handleJobComplete = (job: Job): void => {
if (job.planId && this.activePlanId && job.planId === this.activePlanId) {
if (!this.firstJobCompleted) {
this.firstJobCompleted = true;
this.showToast('Mission Control', `First job completed: "${job.name}".`, 'success');
}
if (!job.planId || !this.activePlanId || job.planId !== this.activePlanId) {
return;
}

updatePlanJob(job.planId, job.name, {
status: 'completed',
}).catch((error) => {
console.error('Failed to update completed job state:', error);
});
if (!this.firstJobCompleted) {
this.firstJobCompleted = true;
this.showToast('Mission Control', `First job completed: "${job.name}".`, 'success');
}

this.reconcile().catch((error) => {
console.error('Reconcile after completion failed:', error);
const planId = job.planId;
(async () => {
await updatePlanJob(planId, job.name, {
status: 'completed',
});
}
await this.reconcile();
})().catch((error) => {
console.error('Failed to reconcile completed job state:', error);
});
}

private handleJobFailed = (job: Job): void => {
Expand All @@ -794,16 +820,9 @@ If your work needs human review before it can proceed: mc_report(status: "needs_
return;
}

if (this.isSupervisor(plan)) {
await this.setCheckpoint('on_error', plan);
return;
}

plan.status = 'failed';
plan.completedAt = new Date().toISOString();
await savePlan(plan);
this.showToast('Mission Control', `Plan failed: job "${job.name}" failed.`, 'error');
this.notify(`❌ Plan failed: job "${job.name}" failed.`);
this.showToast('Mission Control', `Job "${job.name}" failed. Plan paused.`, 'error');
this.notify(`❌ Job "${job.name}" failed. Fix and retry with mc_plan_approve(checkpoint: "on_error", retry: "${job.name}").`);
await this.setCheckpoint('on_error', plan);
})
.catch(() => {})
.finally(() => {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/plan-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ export const VALID_JOB_TRANSITIONS: Record<JobStatus, JobStatus[]> = {
waiting_deps: ['running', 'stopped', 'canceled'],
running: ['completed', 'failed', 'stopped', 'canceled'],
completed: ['ready_to_merge', 'stopped', 'canceled'],
failed: ['stopped', 'canceled'],
ready_to_merge: ['merging', 'stopped', 'canceled'],
failed: ['ready_to_merge', 'stopped', 'canceled'],
ready_to_merge: ['merging', 'needs_rebase', 'stopped', 'canceled'],
merging: ['merged', 'conflict', 'stopped', 'canceled'],
merged: ['needs_rebase'],
conflict: ['ready_to_merge', 'stopped', 'canceled'],
Expand Down
33 changes: 30 additions & 3 deletions src/tools/plan-approve.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { tool, type ToolDefinition } from '@opencode-ai/plugin';
import { loadPlan, savePlan } from '../lib/plan-state';
import { loadPlan, savePlan, updatePlanJob } from '../lib/plan-state';
import { Orchestrator } from '../lib/orchestrator';
import { getSharedMonitor, getSharedNotifyCallback, setSharedOrchestrator } from '../lib/orchestrator-singleton';
import type { CheckpointType } from '../lib/plan-types';
Expand All @@ -10,21 +10,47 @@ import { resolvePostCreateHook } from '../lib/worktree-setup';

export const mc_plan_approve: ToolDefinition = tool({
description:
'Approve a pending copilot plan or clear a supervisor checkpoint to continue execution',
'Approve a pending copilot plan, clear a supervisor checkpoint, or retry a failed job to continue execution',
args: {
checkpoint: tool.schema
.enum(['pre_merge', 'on_error', 'pre_pr'])
.optional()
.describe('Specific checkpoint to clear (for supervisor mode)'),
retry: tool.schema
.string()
.optional()
.describe('Name of a failed, conflict, or needs_rebase job to retry'),
},
async execute(args) {
const plan = await loadPlan();
if (!plan) {
throw new Error('No active plan to approve');
}

if (args.retry) {
const job = plan.jobs.find((j) => j.name === args.retry);
if (!job) {
throw new Error(`Job "${args.retry}" not found in plan`);
}
if (job.status !== 'failed' && job.status !== 'conflict' && job.status !== 'needs_rebase') {
throw new Error(`Job "${args.retry}" is not in a retryable state (current: ${job.status}). Only failed, conflict, or needs_rebase jobs can be retried.`);
}
}

if (plan.status === 'paused' && plan.checkpoint) {
const checkpoint = (args.checkpoint ?? plan.checkpoint) as CheckpointType;

if (args.retry) {
const job = plan.jobs.find(j => j.name === args.retry);
if (!job) {
throw new Error(`Job "${args.retry}" not found in plan`);
}
if (job.status !== 'failed' && job.status !== 'conflict' && job.status !== 'needs_rebase') {
throw new Error(`Job "${args.retry}" is not in a retryable state (current: ${job.status}). Only failed, conflict, or needs_rebase jobs can be retried.`);
}
await updatePlanJob(plan.id, args.retry, { status: 'ready_to_merge', error: undefined });
}

plan.status = 'running';
plan.checkpoint = null;
await savePlan(plan);
Expand All @@ -35,8 +61,9 @@ export const mc_plan_approve: ToolDefinition = tool({
orchestrator.setPlanModelSnapshot(getCurrentModel());
await orchestrator.resumePlan();

const retryMsg = args.retry ? ` Job "${args.retry}" reset to ready_to_merge.` : '';
return [
`Checkpoint "${checkpoint}" cleared. Plan "${plan.name}" resuming.`,
`Checkpoint "${checkpoint}" cleared.${retryMsg} Plan "${plan.name}" resuming.`,
'',
` ID: ${plan.id}`,
` Mode: ${plan.mode}`,
Expand Down
15 changes: 8 additions & 7 deletions tests/integration/orchestration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ describe('orchestration integration', () => {
expect(tmux.sendKeys).toHaveBeenCalledTimes(0);
}, 30000);

it('marks plan failed when a merge conflict occurs and leaves integration clean', async () => {
it('pauses plan on merge conflict for retry instead of failing', async () => {
const monitor = new FakeMonitor();
mockTmux();
const orchestrator = new Orchestrator(monitor as any, {
Expand Down Expand Up @@ -497,22 +497,23 @@ describe('orchestration integration', () => {
await waitForCondition(async () => {
const currentPlan = await loadPlan();
const second = currentPlan?.jobs.find((job) => job.id === 'job-2');
return currentPlan?.status === 'failed' && second?.status === 'conflict';
return currentPlan?.status === 'paused' && second?.status === 'needs_rebase';
}, 8000);

const failedPlan = await loadPlan();
expect(failedPlan?.status).toBe('failed');
expect(failedPlan?.jobs.find((job) => job.id === 'job-2')?.status).toBe('conflict');
const pausedPlan = await loadPlan();
expect(pausedPlan?.status).toBe('paused');
expect(pausedPlan?.checkpoint).toBe('on_error');
expect(pausedPlan?.jobs.find((job) => job.id === 'job-2')?.status).toBe('needs_rebase');

const status = await mustExec(
['git', 'status', '--porcelain'],
failedPlan!.integrationWorktree!,
pausedPlan!.integrationWorktree!,
);
expect(status).toBe('');

const mergeHead = await exec(
['git', 'rev-parse', '-q', '--verify', 'MERGE_HEAD'],
failedPlan!.integrationWorktree!,
pausedPlan!.integrationWorktree!,
);
expect(mergeHead.exitCode).not.toBe(0);
}, 30000);
Expand Down
Loading