diff --git a/codex/tasks/latest.json b/codex/tasks/latest.json index e5f76b587..9ceb24af4 100644 --- a/codex/tasks/latest.json +++ b/codex/tasks/latest.json @@ -1,55 +1,51 @@ { - "task_id": "phase1_ingest_canonicalization_2026_02_20", - "title": "Phase 1: Canonicalize transcript at ingest + stable transcriptHash", - "summary": "Implement ingest-time transcript canonicalization (NFKC + punctuation folding + line-ending normalization + BOM/null stripping) with versioning. Store rawTranscript + canonicalTranscript + transcriptHash + canonicalizationVersion on Entry for all write paths (upload route + GraphQL addEntry/updateEntry + any other transcript writers). Add deterministic tests for the canonicalization corpus. Do not bulk-migrate existing entries; freeze legacy entries at canonicalizationVersion=0/null and only apply v1 on new/updated transcripts going forward.", + "task_id": "run_b_failure_firewall_2026_02_20", + "title": "Run B: Failure-path firewall (SanitizedError, DLQ scrub, leak canary, drop ledger, consumer contract)", "base_branch": "develop", - "branch_name": "codex/implement-transcript-canonicalization-at-ingest", + "branch_name": "codex/run-b-failure-firewall-exec-2026-02-20", + "summary": "Harden failure paths so no plaintext transcript (or model output fragments) can leak via errors/logs/events/queues/spans. Add SanitizedError wrapper, scrub DLQ/retry payloads, enforce leak-canary tests across sinks, introduce drop-with-record ledger (no plaintext) with PROCESSING_WARNING_CODE enum, add schemaVersion + processingStatus consumer contract semantics, and define UNANCHORED_SIGNAL schema stub (no UI/producer yet).", "repo_scope": [ - "server/models/Entry.js", - "server/routes/upload.js", - "server/graphql/resolvers/index.js", - "server/src/workers/scribe.worker.js", - "server/src/workers/reflection.worker.js", - "server/src/utils/**", - "server/utils/**", - "server/models/__tests__/**", - "server/src/**/__tests__/**", + "codex/tasks/latest.json", + "server/src/**", "server/tests/**", - "server/__tests__/**", - "server/routes/__tests__/**", - "scripts/codex_preflight.mjs", - "codex/tasks/latest.json" + "server/docs/**", + "docs/**" ], - "agents_involved": ["codex-web"], - "risk_level": "medium", + "agents_involved": [ + "codex_web" + ], + "risk_level": "high", "tests_to_run": [ - "node -e \"JSON.parse(require('fs').readFileSync('codex/tasks/latest.json','utf8')); console.log('latest.json ok')\"", "node scripts/codex_preflight.mjs --ci", - "pnpm -C server test" + "pnpm --filter server test" ], "constraints": [ - "Codex Web environment: do NOT run git push; use the Create PR button.", - "Do NOT create placeholder files or empty directories. If no diff is needed, stop and report; do not create a PR.", - "All changes must remain within repo_scope. If a necessary fix is out-of-scope, produce a Repair Manifest instead of changing it.", - "Canonicalization happens at ingest/write time only (identity). Do not re-canonicalize during validation except legacy v0 fallback.", - "Do NOT bulk-migrate existing stored transcripts. Implement freeze+version: legacy entries are v0/null; new writes become v1.", - "Hashing must be based on canonicalTranscript and must NOT use locale-sensitive casefolding (no toLowerCase/toUpperCase on hash inputs).", - "No raw user transcript content may be logged or emitted into events as part of this change." + "Codex Web sandbox: DO NOT run git network commands (no git fetch/pull/push/clone). Use the UI Create PR button if and only if there is a real diff.", + "Alignment Evidence (must print): task_id, base_branch, branch_name, _meta.canary from codex/tasks/latest.json; also print `git rev-parse HEAD` for evidence only (never STOP on SHA mismatch).", + "Repo-scope enforcement: do not edit files outside repo_scope. If you discover an out-of-scope fix, record it under a Repair Manifest section in the final summary instead of editing it.", + "Anti-cop-out rule: perform a Work-Exists Gate by locating and citing the exact files/lines to change. If no actionable work exists, STOP with evidence. No diff => no PR.", + "Do not store, log, emit, or enqueue any plaintext transcript or model-output fragments in any failure path. Only allow IDs, hashes, timestamps, enum reason codes, schema versions, and counts.", + "Do not add new dependencies unless absolutely required; prefer small, deterministic utilities and tests.", + "Do not create draft PRs." ], "acceptance_checks": [ - "Alignment Evidence: print task_id, base_branch, branch_name, repo_scope, tests_to_run at start of run.", - "Work-Exists Gate: identify all transcript write paths (upload.js, GraphQL addEntry/updateEntry, scribe worker transcript persistence) and show exact files/lines to be changed.", - "Implement a single ingest canonicalization function (v1) using NFKC + punctuation folding + newline normalization + BOM/null stripping + internal whitespace folding (preserve newlines) + trim; store canonicalizationVersion='1'.", - "Entry stores rawTranscript (untouched) and canonicalTranscript (canonicalized). transcriptHash is sha256(canonicalTranscript).", - "All transcript-writing paths set/update canonical fields consistently when transcript changes.", - "Add/extend deterministic tests covering: smart quotes folding, dash folding, ellipsis folding, CRLF/CR normalization, BOM/null stripping, internal whitespace folding (tabs/multi-spaces without breaking newlines), and idempotency (canon(canon(x))==canon(x)).", - "Run tests_to_run and show outputs. If any test is skipped, explain why and provide a safe alternative.", - "Change Proof: show git status -sb and git diff --stat at end. No diff => no PR." + "latest.json is valid JSON (parse check passes).", + "node scripts/codex_preflight.mjs --ci passes.", + "Implement SanitizedError (or equivalent) so JSON.parse / worker failure logging cannot include raw input fragments; tests prove `SENSITIVE_LEAK_123` never appears in any sink outputs.", + "DLQ/retry/job payload scrub is enforced: only whitelisted metadata fields are allowed; tests assert no unexpected string fields beyond approved schema.", + "Leak Canary suite exists and asserts canary is absent from: logs (console/logger spies), emitted events, AgentTask records/payloads, queue/job payloads, and mocked Langfuse span metadata.", + "Drop-with-record ledger exists: writes only structured data (reason enum, hashes, counts, canonicalizationVersion, processingStatus, warningCodes) and contains no plaintext fields.", + "PROCESSING_WARNING_CODE enum added in a central contract module (no string soup).", + "Consumer failure contract documented: schemaVersion required; unknown schemaVersion rejects loudly; processingStatus semantics documented and tested (complete/partial/failed; empty arrays not equivalent across statuses).", + "UNANCHORED_SIGNAL schema/type is defined (stub only) with fields suitable for later user resolution; no UI loop required in this run.", + "pnpm --filter server test passes." 
], - "locks": { - "task_id": "phase1_ingest_canonicalization_2026_02_20", - "base_branch": "develop", - "branch_name": "codex/implement-transcript-canonicalization-at-ingest", - "canary": "PHASE1_INGEST_CANON_V1_CANARY_2026_02_20" + "_meta": { + "canary": "RUN_B_FIREWALL_CANARY_2026_02_20", + "created_at": "2026-02-20", + "notes": [ + "Run B focuses on failure-path leak prevention + contracts + ledgers. No safetyView implementation in this run; adopt SafetyView spec later as a dedicated run.", + "Codex Web may check out to a local branch named 'work'—this is not a failure. Validate by latest.json content + canary + locks." + ] } } diff --git a/server/docs/run-b-failure-firewall.md b/server/docs/run-b-failure-firewall.md new file mode 100644 index 000000000..15f42dbd5 --- /dev/null +++ b/server/docs/run-b-failure-firewall.md @@ -0,0 +1,25 @@ +# Run B Failure-Path Firewall Contract + +## ENTRY_ANALYZED consumer contract + +- `schemaVersion` is required and must equal `1.0`. +- Unknown `schemaVersion` values are rejected with a loud contract error. +- `processingStatus` enum values: + - `complete`: requires at least one `bloomCards` item. + - `partial`: allows empty `bloomCards` and must include warning codes when data was dropped. + - `failed`: terminal consumer failure state. +- Empty arrays are not treated equivalently across statuses. + +## Drop-with-record ledger + +Drop ledger records are stored in reflection `structuredData.dropLedger` and include only: + +- reason enum + reason hash +- dropped counts + total +- canonicalizationVersion +- processingStatus +- warningCodes +- createdAt timestamp +- IDs + +No transcript text or model-output fragments are persisted in ledger records. 
diff --git a/server/src/orchestration/__tests__/eventBus.contract.test.ts b/server/src/orchestration/__tests__/eventBus.contract.test.ts new file mode 100644 index 000000000..b4e364576 --- /dev/null +++ b/server/src/orchestration/__tests__/eventBus.contract.test.ts @@ -0,0 +1,44 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; + +import { + emitEntryAnalyzed, + onEntryAnalyzed, + resetEventBusForTests, +} from '../eventBus.js'; +import { ENTRY_ANALYZED_SCHEMA_VERSION, PROCESSING_STATUS } from '../../utils/failureFirewallContracts.js'; + +describe('ENTRY_ANALYZED contract', () => { + afterEach(() => { + resetEventBusForTests(); + vi.restoreAllMocks(); + }); + + it('emits schemaVersion envelope and blocks plaintext canary in listener payload', () => { + const listener = vi.fn(); + onEntryAnalyzed(listener); + + emitEntryAnalyzed({ + entryId: 'entry-1', + userId: 'user-1', + bloomCards: [{ headline: 'safe headline' }], + processingStatus: PROCESSING_STATUS.COMPLETE, + warningCodes: [], + }); + + const payload = listener.mock.calls[0]?.[0]; + expect(payload?.schemaVersion).toBe(ENTRY_ANALYZED_SCHEMA_VERSION); + expect(JSON.stringify(payload)).not.toContain('SENSITIVE_LEAK_123'); + }); + + it('rejects unknown schema versions loudly', () => { + expect(() => + emitEntryAnalyzed({ + schemaVersion: '2.0', + entryId: 'entry-1', + userId: 'user-1', + bloomCards: [{ headline: 'safe headline' }], + processingStatus: PROCESSING_STATUS.COMPLETE, + } as any), + ).toThrow('ENTRY_ANALYZED schemaVersion unsupported'); + }); +}); diff --git a/server/src/orchestration/agentOrchestration.js b/server/src/orchestration/agentOrchestration.js index 44626aae6..f7d0cadf2 100644 --- a/server/src/orchestration/agentOrchestration.js +++ b/server/src/orchestration/agentOrchestration.js @@ -11,6 +11,7 @@ import { queues } from '../queues/index.js'; import Entry from '../../models/Entry.js'; import { publishAgentTaskStatus } from './agentTaskEvents.js'; +import { 
enforceWhitelistedJobPayload, sanitizeError } from '../utils/failureFirewall.js'; // ✅ Standardized queue hygiene + retry policy (matches tests + “factory mode”) export const STANDARD_JOB_OPTS = Object.freeze({ @@ -69,7 +70,7 @@ async function markEnqueueFailed(task, err, fallbackMessage) { task._id, { status: 'failed', - error: err?.message || fallbackMessage, + error: sanitizeError(err).message || fallbackMessage, }, { new: true }, ); @@ -171,10 +172,10 @@ async function enqueueTask({ if (!task?._id) throw new Error(`[AGENT_ORCH] Missing task before enqueue for ${kind}`); - const payload = { + const payload = enforceWhitelistedJobPayload({ taskId: task._id.toString(), entryId: normalizedId.toString(), - }; + }); const resolvedJobId = jobId || task._id.toString(); diff --git a/server/src/orchestration/eventBus.js b/server/src/orchestration/eventBus.js index 1a86aefc4..ae35cabff 100644 --- a/server/src/orchestration/eventBus.js +++ b/server/src/orchestration/eventBus.js @@ -1,12 +1,16 @@ import { EventEmitter } from 'node:events'; +import { toEntryAnalyzedEnvelope, validateEntryAnalyzedEnvelope } from '../utils/failureFirewall.js'; + export const ENTRY_ANALYZED_EVENT = 'ENTRY_ANALYZED'; const bus = new EventEmitter(); bus.setMaxListeners(100); export const emitEntryAnalyzed = (payload) => { - bus.emit(ENTRY_ANALYZED_EVENT, payload); + const envelope = toEntryAnalyzedEnvelope(payload); + validateEntryAnalyzedEnvelope(envelope); + bus.emit(ENTRY_ANALYZED_EVENT, envelope); }; export const onEntryAnalyzed = (listener) => { diff --git a/server/src/utils/__tests__/failureFirewall.test.ts b/server/src/utils/__tests__/failureFirewall.test.ts new file mode 100644 index 000000000..642e7f8c4 --- /dev/null +++ b/server/src/utils/__tests__/failureFirewall.test.ts @@ -0,0 +1,86 @@ +import { describe, expect, it } from 'vitest'; + +import { + buildDropLedgerRecord, + enforceWhitelistedJobPayload, + SanitizedError, + validateEntryAnalyzedEnvelope, +} from '../failureFirewall.js'; 
+import { + ENTRY_ANALYZED_SCHEMA_VERSION, + PROCESSING_STATUS, + PROCESSING_WARNING_CODE, + UNANCHORED_SIGNAL_SCHEMA, +} from '../failureFirewallContracts.js'; + +describe('failure firewall contracts', () => { + it('enforces strict job payload whitelist', () => { + const payload = enforceWhitelistedJobPayload({ + entryId: 'entry-1', + taskId: 'task-1', + transcript: 'SENSITIVE_LEAK_123', + nested: { modelOutput: 'SENSITIVE_LEAK_123' }, + } as any); + + expect(payload).toEqual({ entryId: 'entry-1', taskId: 'task-1' }); + expect(JSON.stringify(payload)).not.toContain('SENSITIVE_LEAK_123'); + }); + + it('rejects unknown schema versions loudly', () => { + expect(() => + validateEntryAnalyzedEnvelope({ + schemaVersion: '999', + processingStatus: PROCESSING_STATUS.COMPLETE, + bloomCards: [{ headline: 'ok' }], + }), + ).toThrow(SanitizedError); + }); + + it('treats empty arrays differently across processing statuses', () => { + expect(() => + validateEntryAnalyzedEnvelope({ + schemaVersion: ENTRY_ANALYZED_SCHEMA_VERSION, + processingStatus: PROCESSING_STATUS.COMPLETE, + bloomCards: [], + }), + ).toThrow(SanitizedError); + + expect(() => + validateEntryAnalyzedEnvelope({ + schemaVersion: ENTRY_ANALYZED_SCHEMA_VERSION, + processingStatus: PROCESSING_STATUS.PARTIAL, + bloomCards: [], + }), + ).not.toThrow(); + }); + + it('builds drop ledger with structured-only fields', () => { + const record = buildDropLedgerRecord({ + entryId: 'entry-123', + reason: 'NO_RECEIPTS', + dropped: { missing_receipts: 2 }, + processingStatus: PROCESSING_STATUS.PARTIAL, + warningCodes: [PROCESSING_WARNING_CODE.DROPPED_BY_RECEIPT_POLICY], + canonicalizationVersion: '1', + }); + + expect(record).toEqual( + expect.objectContaining({ + entryId: 'entry-123', + reason: 'NO_RECEIPTS', + droppedCounts: { missing_receipts: 2 }, + processingStatus: PROCESSING_STATUS.PARTIAL, + }), + ); + expect(JSON.stringify(record)).not.toContain('SENSITIVE_LEAK_123'); + }); + + it('defines UNANCHORED_SIGNAL schema stub for later user resolution', () => { + expect(UNANCHORED_SIGNAL_SCHEMA).toEqual( + expect.objectContaining({ + type: 'UNANCHORED_SIGNAL', + schemaVersion: ENTRY_ANALYZED_SCHEMA_VERSION, + }), + ); + }); +}); diff --git a/server/src/utils/failureFirewall.js b/server/src/utils/failureFirewall.js new file mode 100644 index 000000000..2b16fa6e9 --- /dev/null +++ b/server/src/utils/failureFirewall.js @@ -0,0 +1,136 @@ +import { createHash } from 'node:crypto'; + +import { + ENTRY_ANALYZED_SCHEMA_VERSION, + PROCESSING_STATUS, + PROCESSING_WARNING_CODE, +} from './failureFirewallContracts.js'; + +const ALLOWED_JOB_PAYLOAD_KEYS = Object.freeze(['entryId', 'taskId']); + +export class SanitizedError extends Error { + constructor(code, safeMessage, metadata = {}) { + super(safeMessage); + this.name = 'SanitizedError'; + this.code = code; + this.safeMetadata = metadata; + } +} + +export const hashValue = (value) => + createHash('sha256').update(String(value ?? ''), 'utf8').digest('hex'); + +export function sanitizeError(rawError, fallbackCode = PROCESSING_WARNING_CODE.WORKER_FAILURE) { + const code = rawError?.code || fallbackCode; + const message = rawError instanceof SanitizedError ? rawError.message : 'Sanitized worker failure'; + const details = rawError instanceof SanitizedError ? 
rawError.safeMetadata : {}; + + return { + code, + message, + details, + fingerprint: hashValue(rawError?.message || rawError?.stack || code), + }; +} + +export function enforceWhitelistedJobPayload(payload = {}) { + const safePayload = {}; + + for (const key of ALLOWED_JOB_PAYLOAD_KEYS) { + const value = payload?.[key]; + if (typeof value === 'string' && value.trim()) { + safePayload[key] = value.trim(); + } + } + + if (!safePayload.entryId || !safePayload.taskId) { + throw new SanitizedError( + PROCESSING_WARNING_CODE.INVALID_JOB_PAYLOAD, + 'Invalid worker job payload', + { + providedKeys: Object.keys(payload || {}), + allowedKeys: ALLOWED_JOB_PAYLOAD_KEYS, + }, + ); + } + + return safePayload; +} + +export function toEntryAnalyzedEnvelope(payload = {}) { + const status = payload.processingStatus || PROCESSING_STATUS.COMPLETE; + const warningCodes = Array.isArray(payload.warningCodes) + ? payload.warningCodes.filter((code) => typeof code === 'string' && code.trim()) + : []; + + return { + ...payload, + schemaVersion: payload.schemaVersion || ENTRY_ANALYZED_SCHEMA_VERSION, + processingStatus: status, + warningCodes, + }; +} + +export function validateEntryAnalyzedEnvelope(payload = {}) { + if (!payload.schemaVersion) { + throw new SanitizedError( + PROCESSING_WARNING_CODE.MALFORMED_MODEL_REPLY, + 'ENTRY_ANALYZED missing schemaVersion', + { receivedKeys: Object.keys(payload || {}) }, + ); + } + + if (payload.schemaVersion !== ENTRY_ANALYZED_SCHEMA_VERSION) { + throw new SanitizedError( + PROCESSING_WARNING_CODE.MALFORMED_MODEL_REPLY, + 'ENTRY_ANALYZED schemaVersion unsupported', + { schemaVersion: payload.schemaVersion }, + ); + } + + if (!Object.values(PROCESSING_STATUS).includes(payload.processingStatus)) { + throw new SanitizedError( + PROCESSING_WARNING_CODE.MALFORMED_MODEL_REPLY, + 'ENTRY_ANALYZED processingStatus unsupported', + { processingStatus: payload.processingStatus }, + ); + } + + const cardsCount = Array.isArray(payload.bloomCards) ? 
payload.bloomCards.length : 0; + if (payload.processingStatus === PROCESSING_STATUS.COMPLETE && cardsCount === 0) { + throw new SanitizedError( + PROCESSING_WARNING_CODE.MALFORMED_MODEL_REPLY, + 'ENTRY_ANALYZED complete status requires cards', + { cardsCount }, + ); + } + + return payload; +} + +export function buildDropLedgerRecord({ + entryId, + reason, + dropped, + processingStatus, + warningCodes = [], + canonicalizationVersion, +}) { + const droppedObject = dropped && typeof dropped === 'object' ? dropped : {}; + + return { + entryId, + reason, + reasonHash: hashValue(reason), + droppedCounts: Object.fromEntries( + Object.entries(droppedObject).map(([k, v]) => [k, Number.isFinite(Number(v)) ? Number(v) : 0]), + ), + droppedTotal: Object.values(droppedObject).reduce((sum, n) => sum + (Number(n) || 0), 0), + canonicalizationVersion: String(canonicalizationVersion || '0'), + processingStatus, + warningCodes: warningCodes.filter((code) => typeof code === 'string' && code.trim()), + createdAt: new Date().toISOString(), + }; +} + +export const JOB_PAYLOAD_WHITELIST = ALLOWED_JOB_PAYLOAD_KEYS; diff --git a/server/src/utils/failureFirewallContracts.js b/server/src/utils/failureFirewallContracts.js new file mode 100644 index 000000000..49cf58612 --- /dev/null +++ b/server/src/utils/failureFirewallContracts.js @@ -0,0 +1,35 @@ +export const ENTRY_ANALYZED_SCHEMA_VERSION = '1.0'; + +export const PROCESSING_STATUS = Object.freeze({ + COMPLETE: 'complete', + PARTIAL: 'partial', + FAILED: 'failed', +}); + +export const PROCESSING_WARNING_CODE = Object.freeze({ + MALFORMED_MODEL_REPLY: 'MALFORMED_MODEL_REPLY', + DROPPED_BY_RECEIPT_POLICY: 'DROPPED_BY_RECEIPT_POLICY', + INVALID_JOB_PAYLOAD: 'INVALID_JOB_PAYLOAD', + WORKER_FAILURE: 'WORKER_FAILURE', +}); + +export const DROP_REASON_CODE = Object.freeze({ + MALFORMED_INPUT: 'MALFORMED_INPUT', + NO_RECEIPTS: 'NO_RECEIPTS', + NARRATIVE_FILTERED: 'NARRATIVE_FILTERED', +}); + +export const UNANCHORED_SIGNAL_SCHEMA = Object.freeze({ 
+ schemaVersion: '1.0', + type: 'UNANCHORED_SIGNAL', + requiredFields: [ + 'signalId', + 'entryId', + 'userId', + 'detectedAt', + 'signalHash', + 'candidateKind', + 'status', + 'processingStatus', + ], +}); diff --git a/server/src/workers/__tests__/reflection.worker.test.ts b/server/src/workers/__tests__/reflection.worker.test.ts index d328f1bc6..5bdad5be5 100644 --- a/server/src/workers/__tests__/reflection.worker.test.ts +++ b/server/src/workers/__tests__/reflection.worker.test.ts @@ -621,4 +621,37 @@ ${JSON.stringify([{ type: 'reflection', headline: 'Safe headline', confidence: 0 expect.objectContaining({ type: 'CRISIS_TIER2_INTERNAL', severity: 'WARN' }) ); }); + + it('firewall canary never leaks to logs, task error fields, events, or langfuse metadata', async () => { + const canary = 'SENSITIVE_LEAK_123'; + mocks.findByIdAndUpdateMock + .mockResolvedValueOnce({ _id: taskId, entryId, status: 'running' }) + .mockResolvedValueOnce({ _id: taskId, entryId, status: 'failed' }); + + mocks.findByIdMock.mockReturnValue(mockLeanResult({ _id: entryId, userId: 'user-1', transcript: 'ok transcript' })); + mocks.reflectEntryWithContextMock.mockRejectedValueOnce(new Error(`upstream:${canary}`)); + + await expect(handleReflectionJob({ data: { entryId, taskId } } as any)).rejects.toThrow(canary); + + const taskFailureUpdate = mocks.findByIdAndUpdateMock.mock.calls.find((call) => call?.[1]?.status === 'failed'); + expect(taskFailureUpdate?.[1]?.error).not.toContain(canary); + + const reflectedLogs = [ + ...consoleErrorSpy.mock.calls.flat(), + ...consoleWarnSpy.mock.calls.flat(), + ...consoleInfoSpy.mock.calls.flat(), + ] + .map((v) => (typeof v === 'string' ? 
v : JSON.stringify(v))) + .join(' '); + expect(reflectedLogs).not.toContain(canary); + + const failedEventPayload = mocks.traceLogEventMock.mock.calls + .filter(([name]) => name === 'reflection-failed') + .flatMap(([, payload]) => [JSON.stringify(payload)]); + expect(failedEventPayload.join(' ')).not.toContain(canary); + + const emittedPayloads = mocks.emitEntryAnalyzedMock.mock.calls.map(([payload]) => JSON.stringify(payload)); + expect(emittedPayloads.join(' ')).not.toContain(canary); + }); + }); \ No newline at end of file diff --git a/server/src/workers/archivist.worker.js b/server/src/workers/archivist.worker.js index 0569d0554..23b3a7738 100644 --- a/server/src/workers/archivist.worker.js +++ b/server/src/workers/archivist.worker.js @@ -7,6 +7,8 @@ import { isWorkerDisabled } from '../utils/boolean.js'; import { createAgent } from '../../utils/agents/createAgent.js'; import { upsertMemoryForEntry } from '../utils/archivistMemoryClient.js'; import { connection } from '../queues/index.js'; +import { enforceWhitelistedJobPayload, sanitizeError } from '../utils/failureFirewall.js'; +import { PROCESSING_WARNING_CODE } from '../utils/failureFirewallContracts.js'; export const ARCHIVIST_QUEUE_NAME = 'archivist'; export const ARCHIVIST_MODE = 'archivist'; @@ -23,11 +25,7 @@ export async function updateArchivistTaskStatus(taskId, updates) { } export async function handleArchivistJob(job, context = {}) { - const { entryId, taskId } = job.data || {}; - - if (!entryId || !taskId) { - throw new Error('Job payload missing entryId or taskId'); - } + const { entryId, taskId } = enforceWhitelistedJobPayload(job?.data || {}); const { tools, bus } = context; let trace = null; @@ -116,9 +114,12 @@ export async function handleArchivistJob(job, context = {}) { return serializeAgentTask(doneTask); } catch (error) { + const safeError = sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE); await updateArchivistTaskStatus(taskId, { status: 'failed', - error: error?.message || 
'Archivist worker failed', + error: safeError.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, traceId: trace?.id, }); @@ -126,7 +127,8 @@ export async function handleArchivistJob(job, context = {}) { await trace.logEvent('archivist-failed', { entryId, taskId, - error: error?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); } @@ -173,16 +175,22 @@ export async function getArchivistWorker() { }); archivistWorkerInstance.on('failed', (job, err) => { + const safeError = sanitizeError(err, PROCESSING_WARNING_CODE.WORKER_FAILURE); console.error('[ARCHIVIST] Job failed', { jobId: job?.id, entryId: job?.data?.entryId, taskId: job?.data?.taskId, - error: err?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); }); archivistWorkerInstance.on('error', err => { - console.error('[ARCHIVIST] Worker error', err); + const safeError = sanitizeError(err, PROCESSING_WARNING_CODE.WORKER_FAILURE); + console.error('[ARCHIVIST] Worker error', { + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, + }); }); return archivistWorkerInstance; diff --git a/server/src/workers/reflection.worker.js b/server/src/workers/reflection.worker.js index d4df9f86c..291e46e9b 100644 --- a/server/src/workers/reflection.worker.js +++ b/server/src/workers/reflection.worker.js @@ -14,6 +14,12 @@ import { buildMCPContext, reflectEntryWithContext } from '../../utils/contextRou import { observeTranscript } from '../../agents/observer.js'; import { normalizeReceiptText, validateReceipt } from '../utils/truthValidator.js'; import { CONTRACT_REASONS, RECEIPT_VALIDATION_REASONS } from '../utils/meaningSpineContracts.js'; +import { + buildDropLedgerRecord, + enforceWhitelistedJobPayload, + sanitizeError, +} from '../utils/failureFirewall.js'; +import { PROCESSING_STATUS, PROCESSING_WARNING_CODE } from '../utils/failureFirewallContracts.js'; import { isNarrativeEnabled } from 
'../utils/narrativePolicy.js'; import { calibrateObserverConfidence, @@ -214,17 +220,16 @@ const extractJsonArray = (text) => { } }; -const coerceCards = ({ reply, transcript }) => { +const coerceCards = ({ reply }) => { const parsed = extractJsonArray(reply); if (parsed) return parsed; - const fallbackAnchor = (transcript || '').slice(0, 160).trim(); return [ { type: 'reflection', headline: 'Reflection', confidence: 0.5, - receipts: fallbackAnchor ? [{ quote: fallbackAnchor }] : [], + receipts: [], user_state: 'pending', }, ]; @@ -464,7 +469,7 @@ export const sanitizeBloomCardsWithContract = (args) => { return { cards, contract }; }; -const buildReflectionDoc = ({ cards, safety, contract }) => { +const buildReflectionDoc = ({ cards, safety, contract, dropLedger }) => { const textFromCards = cards.map((card) => card.headline).filter(Boolean).join(' | ').trim(); const storedMeaning = textFromCards || NO_RECEIPTED_MEANING_PLACEHOLDER; @@ -486,6 +491,7 @@ const buildReflectionDoc = ({ cards, safety, contract }) => { bloom_cards: cards, contract: contract || fallbackContract, ...(safety ? { safety } : {}), + ...(dropLedger ? { dropLedger } : {}), }, createdAt: new Date(), }; @@ -501,8 +507,7 @@ export async function updateReflectionTaskStatus(taskId, updates) { } export async function handleReflectionJob(job) { - const { entryId, taskId } = job?.data || {}; - if (!entryId || !taskId) throw new Error('Job payload missing entryId or taskId'); + const { entryId, taskId } = enforceWhitelistedJobPayload(job?.data || {}); let trace = null; @@ -782,10 +787,22 @@ export async function handleReflectionJob(job) { } : null; + const warningCodes = bloomCards.length === 0 ? [PROCESSING_WARNING_CODE.DROPPED_BY_RECEIPT_POLICY] : []; + const processingStatus = bloomCards.length > 0 ? 
PROCESSING_STATUS.COMPLETE : PROCESSING_STATUS.PARTIAL; + const dropLedger = buildDropLedgerRecord({ + entryId: entryIdStr, + reason: contract?.reason, + dropped: contract?.dropped, + processingStatus, + warningCodes, + canonicalizationVersion: entry?.canonicalizationVersion, + }); + const reflectionDoc = buildReflectionDoc({ cards: bloomCards, safety: safetyTier1, contract, + dropLedger, }); await Entry.updateOne( @@ -798,6 +815,8 @@ export async function handleReflectionJob(job) { // 3) EVENT EMISSION (sanitized only) emitEntryAnalyzed({ + processingStatus, + warningCodes, entryId: entryIdStr, userId: userIdStr, bloomCards, @@ -824,6 +843,8 @@ export async function handleReflectionJob(job) { taskId, bloom_card_count: bloomCards.length, contract: contract || undefined, + processingStatus, + warningCodesCount: warningCodes.length, }); } } catch { @@ -832,11 +853,16 @@ export async function handleReflectionJob(job) { return serializeAgentTask(doneTask); } catch (error) { - const { entryId: failEntryId, taskId: failTaskId } = job?.data || {}; + const rawPayload = job?.data || {}; + const failEntryId = typeof rawPayload?.entryId === 'string' ? rawPayload.entryId : null; + const failTaskId = typeof rawPayload?.taskId === 'string' ? 
rawPayload.taskId : null; + const safeError = sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE); await updateReflectionTaskStatus(failTaskId, { status: 'failed', - error: error?.message || 'Reflection worker failed', + error: safeError.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, traceId: trace?.id, }); @@ -850,7 +876,8 @@ export async function handleReflectionJob(job) { await trace.logEvent('reflection-failed', { entryId: failEntryId, taskId: failTaskId, - error: error?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); } } catch { @@ -899,16 +926,22 @@ export async function getReflectionWorker() { }); reflectionWorkerInstance.on('failed', (job, err) => { + const safeError = sanitizeError(err, PROCESSING_WARNING_CODE.WORKER_FAILURE); console.error('[REFLECTION] Job failed', { jobId: job?.id, entryId: job?.data?.entryId, taskId: job?.data?.taskId, - error: err?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); }); reflectionWorkerInstance.on('error', (err) => { - console.error('[REFLECTION] Worker error', err); + const safeError = sanitizeError(err, PROCESSING_WARNING_CODE.WORKER_FAILURE); + console.error('[REFLECTION] Worker error', { + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, + }); }); return reflectionWorkerInstance; diff --git a/server/src/workers/scribe.worker.js b/server/src/workers/scribe.worker.js index 80852e5fb..8128bf02e 100644 --- a/server/src/workers/scribe.worker.js +++ b/server/src/workers/scribe.worker.js @@ -18,6 +18,8 @@ import { import { isWorkerDisabled } from '../utils/boolean.js'; import { openai } from '../../config/openai.js'; import { generateSignedUrl, getS3BucketName } from '../../utils/s3.js'; +import { enforceWhitelistedJobPayload, sanitizeError } from '../utils/failureFirewall.js'; +import { PROCESSING_WARNING_CODE } from '../utils/failureFirewallContracts.js'; export const 
SCRIBE_QUEUE_NAME = 'scribe'; @@ -261,9 +263,11 @@ export const transcribeAudio = async (entryId) => { console.log('[WHISPER] Transcription complete', { entryId }); return text; } catch (error) { + const safeError = sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE); console.error('[WHISPER] Transcription failed', { entryId, - error: error?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); throw error; } finally { @@ -290,10 +294,7 @@ export async function updateTaskStatus(taskId, updates) { * IMPORTANT: transcript persistence must not depend on summarize success. */ export async function handleScribeJob(job, context = {}) { - const { entryId, taskId } = job.data || {}; - if (!entryId || !taskId) { - throw new Error('Job payload missing entryId or taskId'); - } + const { entryId, taskId } = enforceWhitelistedJobPayload(job?.data || {}); const { tools, trace: agentTrace } = context; @@ -416,24 +417,26 @@ export async function handleScribeJob(job, context = {}) { // ----------------------------------------------------------------------- // Only fail the job if the critical path (transcription persistence) failed. 
if (!transcriptionPersisted) { + const safeError = sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE); console.error('[SCRIBE] Job failed (Critical)', { jobId: job?.id, entryId, taskId, - error: error?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); // Wrap this in try/catch so even if the task update fails, we still kill the spinner try { await updateTaskStatus(taskId, { status: 'failed', - error: error?.message || 'Scribe worker failed', + error: sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE).message || 'Scribe worker failed', traceId: trace?.id, }); } catch (e) { console.warn('[SCRIBE] Failed to mark task failed (continuing)', { taskId, - error: e?.message, + warningCode: sanitizeError(e, PROCESSING_WARNING_CODE.WORKER_FAILURE).code, }); } @@ -443,7 +446,7 @@ export async function handleScribeJob(job, context = {}) { { $set: { transcriptStatus: 'failed', // Legacy UI support - transcriptError: error?.message || 'Scribe worker failed', + transcriptError: sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE).message || 'Scribe worker failed', 'pipelineStatus.transcription': 'failed', // New orchestration support }, } @@ -453,7 +456,7 @@ export async function handleScribeJob(job, context = {}) { await trace.logEvent('scribe-failed', { entryId, taskId, - error: error?.message, + warningCode: sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE).code, }); } @@ -469,7 +472,7 @@ export async function handleScribeJob(job, context = {}) { { entryId, taskId, - error: error?.message, + warningCode: sanitizeError(error, PROCESSING_WARNING_CODE.WORKER_FAILURE).code, } ); @@ -483,7 +486,7 @@ export async function handleScribeJob(job, context = {}) { } catch (e) { console.warn('[SCRIBE] Failed to mark task done after persistence', { taskId, - error: e?.message, + warningCode: sanitizeError(e, PROCESSING_WARNING_CODE.WORKER_FAILURE).code, }); } @@ -560,16 +563,22 @@ export async function 
getScribeWorker() { }); scribeWorkerInstance.on('failed', (job, err) => { + const safeError = sanitizeError(err, PROCESSING_WARNING_CODE.WORKER_FAILURE); console.error('[SCRIBE] Job failed', { jobId: job?.id, entryId: job?.data?.entryId, taskId: job?.data?.taskId, - error: err?.message, + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, }); }); scribeWorkerInstance.on('error', (err) => { - console.error('[SCRIBE] Worker error', err); + const safeError = sanitizeError(err, PROCESSING_WARNING_CODE.WORKER_FAILURE); + console.error('[SCRIBE] Worker error', { + warningCode: safeError.code, + errorFingerprint: safeError.fingerprint, + }); }); return scribeWorkerInstance;