78 changes: 37 additions & 41 deletions codex/tasks/latest.json
@@ -1,55 +1,51 @@
{
"task_id": "phase1_ingest_canonicalization_2026_02_20",
"title": "Phase 1: Canonicalize transcript at ingest + stable transcriptHash",
"summary": "Implement ingest-time transcript canonicalization (NFKC + punctuation folding + line-ending normalization + BOM/null stripping) with versioning. Store rawTranscript + canonicalTranscript + transcriptHash + canonicalizationVersion on Entry for all write paths (upload route + GraphQL addEntry/updateEntry + any other transcript writers). Add deterministic tests for the canonicalization corpus. Do not bulk-migrate existing entries; freeze legacy entries at canonicalizationVersion=0/null and only apply v1 on new/updated transcripts going forward.",
"task_id": "run_b_failure_firewall_2026_02_20",
"title": "Run B: Failure-path firewall (SanitizedError, DLQ scrub, leak canary, drop ledger, consumer contract)",
"base_branch": "develop",
"branch_name": "codex/implement-transcript-canonicalization-at-ingest",
"branch_name": "codex/run-b-failure-firewall-exec-2026-02-20",
"summary": "Harden failure paths so no plaintext transcript (or model output fragments) can leak via errors/logs/events/queues/spans. Add SanitizedError wrapper, scrub DLQ/retry payloads, enforce leak-canary tests across sinks, introduce drop-with-record ledger (no plaintext) with PROCESSING_WARNING_CODE enum, add schemaVersion + processingStatus consumer contract semantics, and define UNANCHORED_SIGNAL schema stub (no UI/producer yet).",
"repo_scope": [
"server/models/Entry.js",
"server/routes/upload.js",
"server/graphql/resolvers/index.js",
"server/src/workers/scribe.worker.js",
"server/src/workers/reflection.worker.js",
"server/src/utils/**",
"server/utils/**",
"server/models/__tests__/**",
"server/src/**/__tests__/**",
"codex/tasks/latest.json",
"server/src/**",
"server/tests/**",
"server/__tests__/**",
"server/routes/__tests__/**",
"scripts/codex_preflight.mjs",
"codex/tasks/latest.json"
"server/docs/**",
"docs/**"
],
"agents_involved": ["codex-web"],
"risk_level": "medium",
"agents_involved": [
"codex_web"
],
"risk_level": "high",
"tests_to_run": [
"node -e \"JSON.parse(require('fs').readFileSync('codex/tasks/latest.json','utf8')); console.log('latest.json ok')\"",
"node scripts/codex_preflight.mjs --ci",
"pnpm -C server test"
"pnpm --filter server test"
],
"constraints": [
"Codex Web environment: do NOT run git push; use the Create PR button.",
"Do NOT create placeholder files or empty directories. If no diff is needed, stop and report; do not create a PR.",
"All changes must remain within repo_scope. If a necessary fix is out-of-scope, produce a Repair Manifest instead of changing it.",
"Canonicalization happens at ingest/write time only (identity). Do not re-canonicalize during validation except legacy v0 fallback.",
"Do NOT bulk-migrate existing stored transcripts. Implement freeze+version: legacy entries are v0/null; new writes become v1.",
"Hashing must be based on canonicalTranscript and must NOT use locale-sensitive casefolding (no toLowerCase/toUpperCase on hash inputs).",
"No raw user transcript content may be logged or emitted into events as part of this change."
"Codex Web sandbox: DO NOT run git network commands (no git fetch/pull/push/clone). Use the UI Create PR button if and only if there is a real diff.",
"Alignment Evidence (must print): task_id, base_branch, branch_name, _meta.canary from codex/tasks/latest.json; also print `git rev-parse HEAD` for evidence only (never STOP on SHA mismatch).",
"Repo-scope enforcement: do not edit files outside repo_scope. If you discover an out-of-scope fix, record it under a Repair Manifest section in the final summary instead of editing it.",
"Anti-cop-out rule: perform a Work-Exists Gate by locating and citing the exact files/lines to change. If no actionable work exists, STOP with evidence. No diff => no PR.",
"Do not store, log, emit, or enqueue any plaintext transcript or model-output fragments in any failure path. Only allow IDs, hashes, timestamps, enum reason codes, schema versions, and counts.",
"Do not add new dependencies unless absolutely required; prefer small, deterministic utilities and tests.",
"Do not create draft PRs."
],
"acceptance_checks": [
"Alignment Evidence: print task_id, base_branch, branch_name, repo_scope, tests_to_run at start of run.",
"Work-Exists Gate: identify all transcript write paths (upload.js, GraphQL addEntry/updateEntry, scribe worker transcript persistence) and show exact files/lines to be changed.",
"Implement a single ingest canonicalization function (v1) using NFKC + punctuation folding + newline normalization + BOM/null stripping + internal whitespace folding (preserve newlines) + trim; store canonicalizationVersion='1'.",
"Entry stores rawTranscript (untouched) and canonicalTranscript (canonicalized). transcriptHash is sha256(canonicalTranscript).",
"All transcript-writing paths set/update canonical fields consistently when transcript changes.",
"Add/extend deterministic tests covering: smart quotes folding, dash folding, ellipsis folding, CRLF/CR normalization, BOM/null stripping, internal whitespace folding (tabs/multi-spaces without breaking newlines), and idempotency (canon(canon(x))==canon(x)).",
"Run tests_to_run and show outputs. If any test is skipped, explain why and provide a safe alternative.",
"Change Proof: show git status -sb and git diff --stat at end. No diff => no PR."
"latest.json is valid JSON (parse check passes).",
"node scripts/codex_preflight.mjs --ci passes.",
"Implement SanitizedError (or equivalent) so JSON.parse / worker failure logging cannot include raw input fragments; tests prove `SENSITIVE_LEAK_123` never appears in any sink outputs.",
"DLQ/retry/job payload scrub is enforced: only whitelisted metadata fields are allowed; tests assert no unexpected string fields beyond approved schema.",
"Leak Canary suite exists and asserts canary is absent from: logs (console/logger spies), emitted events, AgentTask records/payloads, queue/job payloads, and mocked Langfuse span metadata.",
"Drop-with-record ledger exists: writes only structured data (reason enum, hashes, counts, canonicalizationVersion, processingStatus, warningCodes) and contains no plaintext fields.",
"PROCESSING_WARNING_CODE enum added in a central contract module (no string soup).",
"Consumer failure contract documented: schemaVersion required; unknown schemaVersion rejects loudly; processingStatus semantics documented and tested (complete/partial/failed; empty arrays not equivalent across statuses).",
"UNANCHORED_SIGNAL schema/type is defined (stub only) with fields suitable for later user resolution; no UI loop required in this run.",
"pnpm --filter server test passes."
],
"locks": {
"task_id": "phase1_ingest_canonicalization_2026_02_20",
"base_branch": "develop",
"branch_name": "codex/implement-transcript-canonicalization-at-ingest",
"canary": "PHASE1_INGEST_CANON_V1_CANARY_2026_02_20"
"_meta": {
"canary": "RUN_B_FIREWALL_CANARY_2026_02_20",
"created_at": "2026-02-20",
"notes": [
"Run B focuses on failure-path leak prevention + contracts + ledgers. No safetyView implementation in this run; adopt SafetyView spec later as a dedicated run.",
"Codex Web may check out to a local branch named 'work'—this is not a failure. Validate by latest.json content + canary + locks."
]
}
}
25 changes: 25 additions & 0 deletions server/docs/run-b-failure-firewall.md
@@ -0,0 +1,25 @@
# Run B Failure-Path Firewall Contract

## ENTRY_ANALYZED consumer contract

- `schemaVersion` is required and must equal `1.0`.
- Unknown `schemaVersion` values are rejected with a loud contract error.
- `processingStatus` enum values:
- `complete`: requires at least one `bloomCards` item.
- `partial`: allows empty `bloomCards` and must include warning codes when data was dropped.
- `failed`: terminal consumer failure state.
- Empty arrays are not treated equivalently across statuses.
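A minimal consumer-side sketch of these rules is shown below. The helper name and the handling of `partial`/`failed` beyond what is documented above are assumptions, not the repo's actual `failureFirewall` API:

```js
// Hypothetical consumer guard mirroring the documented ENTRY_ANALYZED contract.
const SUPPORTED_SCHEMA_VERSION = '1.0';

function assertEntryAnalyzedContract(envelope) {
  if (envelope.schemaVersion !== SUPPORTED_SCHEMA_VERSION) {
    // Unknown schemaVersion must fail loudly, never be silently coerced.
    throw new Error(`ENTRY_ANALYZED schemaVersion unsupported: ${envelope.schemaVersion}`);
  }

  const cards = envelope.bloomCards ?? [];
  switch (envelope.processingStatus) {
    case 'complete':
      if (cards.length === 0) {
        throw new Error('ENTRY_ANALYZED complete status requires at least one bloomCards item');
      }
      break;
    case 'partial':
      // Empty bloomCards is allowed; warning codes are expected when data was dropped.
      break;
    case 'failed':
      // Terminal consumer failure state; no cards expected.
      break;
    default:
      throw new Error(`ENTRY_ANALYZED processingStatus unknown: ${envelope.processingStatus}`);
  }
}
```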

## Drop-with-record ledger

Drop ledger records are stored in reflection `structuredData.dropLedger` and include only:

- reason enum + reason hash
- dropped counts + total
- canonicalizationVersion
- processingStatus
- warningCodes
- createdAt timestamp
- IDs

No transcript text or model-output fragments are persisted in ledger records.
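For illustration, a conforming ledger record might look like the sketch below. Field names follow the list above where documented; the exact shapes of the reason hash, total, and timestamp are assumptions:

```js
import { createHash } from 'node:crypto';

// Illustrative drop-ledger record: structured metadata only, never plaintext.
const reason = 'NO_RECEIPTS';
const exampleDropLedgerRecord = {
  entryId: 'entry-123',                                         // IDs only
  reason,                                                       // enum reason code
  reasonHash: createHash('sha256').update(reason).digest('hex'),
  droppedCounts: { missing_receipts: 2 },                       // dropped counts
  total: 5,                                                     // total candidates considered (assumed meaning)
  canonicalizationVersion: '1',
  processingStatus: 'partial',
  warningCodes: ['DROPPED_BY_RECEIPT_POLICY'],
  createdAt: new Date().toISOString(),                          // timestamp only
};
```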
44 changes: 44 additions & 0 deletions server/src/orchestration/__tests__/eventBus.contract.test.ts
@@ -0,0 +1,44 @@
import { afterEach, describe, expect, it, vi } from 'vitest';

import {
emitEntryAnalyzed,
onEntryAnalyzed,
resetEventBusForTests,
} from '../eventBus.js';
import { ENTRY_ANALYZED_SCHEMA_VERSION, PROCESSING_STATUS } from '../../utils/failureFirewallContracts.js';

describe('ENTRY_ANALYZED contract', () => {
afterEach(() => {
resetEventBusForTests();
vi.restoreAllMocks();
});

it('emits schemaVersion envelope and blocks plaintext canary in listener payload', () => {
const listener = vi.fn();
onEntryAnalyzed(listener);

emitEntryAnalyzed({
entryId: 'entry-1',
userId: 'user-1',
bloomCards: [{ headline: 'safe headline' }],
processingStatus: PROCESSING_STATUS.COMPLETE,
warningCodes: [],
});

const payload = listener.mock.calls[0]?.[0];
expect(payload?.schemaVersion).toBe(ENTRY_ANALYZED_SCHEMA_VERSION);
expect(JSON.stringify(payload)).not.toContain('SENSITIVE_LEAK_123');
});

it('rejects unknown schema versions loudly', () => {
expect(() =>
emitEntryAnalyzed({
schemaVersion: '2.0',
entryId: 'entry-1',
userId: 'user-1',
bloomCards: [{ headline: 'safe headline' }],
processingStatus: PROCESSING_STATUS.COMPLETE,
} as any),
).toThrow('ENTRY_ANALYZED schemaVersion unsupported');
});
});
7 changes: 4 additions & 3 deletions server/src/orchestration/agentOrchestration.js
@@ -11,6 +11,7 @@ import { queues } from '../queues/index.js';
import Entry from '../../models/Entry.js';

import { publishAgentTaskStatus } from './agentTaskEvents.js';
import { enforceWhitelistedJobPayload, sanitizeError } from '../utils/failureFirewall.js';

// ✅ Standardized queue hygiene + retry policy (matches tests + “factory mode”)
export const STANDARD_JOB_OPTS = Object.freeze({
@@ -69,7 +70,7 @@ async function markEnqueueFailed(task, err, fallbackMessage) {
task._id,
{
status: 'failed',
error: err?.message || fallbackMessage,
error: sanitizeError(err).message || fallbackMessage,
},
{ new: true },
);
@@ -171,10 +172,10 @@ async function enqueueTask({

if (!task?._id) throw new Error(`[AGENT_ORCH] Missing task before enqueue for ${kind}`);

const payload = {
const payload = enforceWhitelistedJobPayload({
taskId: task._id.toString(),
entryId: normalizedId.toString(),
};
});

const resolvedJobId = jobId || task._id.toString();

6 changes: 5 additions & 1 deletion server/src/orchestration/eventBus.js
@@ -1,12 +1,16 @@
import { EventEmitter } from 'node:events';

import { toEntryAnalyzedEnvelope, validateEntryAnalyzedEnvelope } from '../utils/failureFirewall.js';

export const ENTRY_ANALYZED_EVENT = 'ENTRY_ANALYZED';

const bus = new EventEmitter();
bus.setMaxListeners(100);

export const emitEntryAnalyzed = (payload) => {
bus.emit(ENTRY_ANALYZED_EVENT, payload);
const envelope = toEntryAnalyzedEnvelope(payload);
validateEntryAnalyzedEnvelope(envelope);

P2 review comment: Keep ENTRY_ANALYZED emitter backward-compatible

`emitEntryAnalyzed` now always validates the envelope, and with the new defaults a payload that omits `processingStatus` is treated as complete and must include non-empty `bloomCards`. Existing callers in this repo (for example `scripts/simulate_nervous_system.js`) still emit minimal payloads like `{ entryId, userId, cardsCreatedCount, analyzedAt }`, so they now throw `ENTRY_ANALYZED complete status requires cards` before any listener executes, breaking that experiment workflow. Please either migrate those emitters in the same change or add a compatibility path for payloads that omit `bloomCards`.
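A sketch of one possible compatibility path along the lines suggested above; the defaulting rules and the warning code name are assumptions, not the repo's implementation:

```js
// Hypothetical back-compat shim: legacy emitters that omit bloomCards are mapped
// to a 'partial' envelope with a warning code instead of failing validation.
function toBackwardCompatiblePayload(payload) {
  if (payload.bloomCards !== undefined || payload.processingStatus !== undefined) {
    return payload; // new-style callers pass through unchanged
  }
  return {
    ...payload,
    bloomCards: [],
    processingStatus: 'partial',
    warningCodes: ['LEGACY_EMITTER_PAYLOAD'], // assumed code, not in the contract module
  };
}
```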


bus.emit(ENTRY_ANALYZED_EVENT, envelope);
};

export const onEntryAnalyzed = (listener) => {
86 changes: 86 additions & 0 deletions server/src/utils/__tests__/failureFirewall.test.ts
@@ -0,0 +1,86 @@
import { describe, expect, it } from 'vitest';

import {
buildDropLedgerRecord,
enforceWhitelistedJobPayload,
SanitizedError,
validateEntryAnalyzedEnvelope,
} from '../failureFirewall.js';
import {
ENTRY_ANALYZED_SCHEMA_VERSION,
PROCESSING_STATUS,
PROCESSING_WARNING_CODE,
UNANCHORED_SIGNAL_SCHEMA,
} from '../failureFirewallContracts.js';

describe('failure firewall contracts', () => {
it('enforces strict job payload whitelist', () => {
const payload = enforceWhitelistedJobPayload({
entryId: 'entry-1',
taskId: 'task-1',
transcript: 'SENSITIVE_LEAK_123',
nested: { modelOutput: 'SENSITIVE_LEAK_123' },
} as any);

expect(payload).toEqual({ entryId: 'entry-1', taskId: 'task-1' });
expect(JSON.stringify(payload)).not.toContain('SENSITIVE_LEAK_123');
});

it('rejects unknown schema versions loudly', () => {
expect(() =>
validateEntryAnalyzedEnvelope({
schemaVersion: '999',
processingStatus: PROCESSING_STATUS.COMPLETE,
bloomCards: [{ headline: 'ok' }],
}),
).toThrow(SanitizedError);
});

it('treats empty arrays differently across processing statuses', () => {
expect(() =>
validateEntryAnalyzedEnvelope({
schemaVersion: ENTRY_ANALYZED_SCHEMA_VERSION,
processingStatus: PROCESSING_STATUS.COMPLETE,
bloomCards: [],
}),
).toThrow(SanitizedError);

expect(() =>
validateEntryAnalyzedEnvelope({
schemaVersion: ENTRY_ANALYZED_SCHEMA_VERSION,
processingStatus: PROCESSING_STATUS.PARTIAL,
bloomCards: [],
}),
).not.toThrow();
});

it('builds drop ledger with structured-only fields', () => {
const record = buildDropLedgerRecord({
entryId: 'entry-123',
reason: 'NO_RECEIPTS',
dropped: { missing_receipts: 2 },
processingStatus: PROCESSING_STATUS.PARTIAL,
warningCodes: [PROCESSING_WARNING_CODE.DROPPED_BY_RECEIPT_POLICY],
canonicalizationVersion: '1',
});

expect(record).toEqual(
expect.objectContaining({
entryId: 'entry-123',
reason: 'NO_RECEIPTS',
droppedCounts: { missing_receipts: 2 },
processingStatus: PROCESSING_STATUS.PARTIAL,
}),
);
expect(JSON.stringify(record)).not.toContain('SENSITIVE_LEAK_123');
});

it('defines UNANCHORED_SIGNAL schema stub for later user resolution', () => {
expect(UNANCHORED_SIGNAL_SCHEMA).toEqual(
expect.objectContaining({
type: 'UNANCHORED_SIGNAL',
schemaVersion: ENTRY_ANALYZED_SCHEMA_VERSION,
}),
);
});
});