From 5159751a96d00d12daef2271fb5cd8845c629eb3 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Tue, 9 Dec 2025 15:06:24 -0800 Subject: [PATCH 1/7] feat(memory): Incremental schema subscription updates (#2224) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(memory): Incremental schema subscription updates Optimize getSchemaSubscriptionMatches by preserving the schemaTracker between invocations instead of re-running full querySchema for every subscription after each commit. Changes: - Extend SchemaSubscription to store schemaTracker (docβ†’schema mappings) - Add evaluateDocumentLinks() for single-doc schema evaluation - Add querySchemaWithTracker() to expose schemaTracker from initial query - Replace full re-query with incremental update: 1. Find changed docs that exist in subscription's schemaTracker 2. Re-evaluate only those docs with their associated schemas 3. Follow new links incrementally (not already in schemaTracker) 4. Accumulate only new/changed facts for the response This significantly reduces work for subscriptions with large result sets where commits typically only affect a small portion of tracked documents. Falls back to full re-query on errors for safety. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fmt * fix(memory): Use correct key format for schemaTracker lookup The schemaTracker uses "id/type" format (from BaseObjectManager.toKey), but extractChangedDocKeys was using "id\0type" format with null separator. This caused changed docs to never match entries in schemaTracker, breaking incremental subscription updates. Also use lastIndexOf('/') instead of split to handle IDs containing slashes. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fix(memory): Track docs in schemaTracker even without schemaContext When schemaContext is undefined, the document was being loaded into the manager but not added to schemaTracker. This caused incremental updates to miss changes to these documents since they wouldn't be found in findAffectedDocs. Now we always add the document to schemaTracker with its selector, ensuring all documents in the query result can be tracked for changes. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fix(memory): Add fallback when schemaTracker misses watchedObjects match The incremental update optimization now uses a two-phase approach: 1. First check if Subscription.match() triggers on watchedObjects 2. Then try to use schemaTracker for incremental processing 3. Fall back to full re-query for subscriptions where watchedObjects matches but schemaTracker doesn't have the changed docs This is more complex than ideal - the root cause is that schemaTracker and watchedObjects can get out of sync because they're populated via different code paths and use different key formats: - watchedObjects: watch:///${space}/${of}/${the} - schemaTracker: ${of}/${the} TODO: Unify these tracking mechanisms so watchedObjects becomes redundant and can be removed. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fix(memory): Use indexOf instead of lastIndexOf to parse docKey The docKey format is "id/type" where type can contain slashes (e.g., "application/json"). Using lastIndexOf("/") incorrectly split "of:HASH/application/json" into: - docId: "of:HASH/application" (wrong) - docType: "json" (wrong) Using indexOf("/") correctly splits at the first slash: - docId: "of:HASH" (correct) - docType: "application/json" (correct) This was causing selectFact to return null, so incremental updates never returned any facts, breaking subscription notifications. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * refactor(memory): Simplify schema subscription matching Replace dual tracking (watchedObjects + schemaTracker) with clearer separation: - schemaTracker: tracks which docs to watch with which schemas - sentDocs: tracks which docs have been sent to the client - isWildcardQuery: flag for queries with of: "_" For wildcard queries, match changed docs by type pattern instead of re-running the full query. Both wildcard and non-wildcard queries now use the same incremental processing flow. Remove fallback re-query code paths - throw on mount error instead. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fix(memory): Add cycle detection for growing path cycles in incremental updates Track per-document visit counts to detect cycles like A -> A/foo -> A/foo/foo that create infinitely growing paths. Limits each document to 100 visits before logging a warning and stopping further traversal. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --------- Co-authored-by: Claude --- packages/memory/memory.ts | 29 +++ packages/memory/provider.ts | 389 ++++++++++++++++++++++++++++---- packages/memory/space-schema.ts | 83 ++++++- packages/memory/space.ts | 60 ++++- 4 files changed, 502 insertions(+), 59 deletions(-) diff --git a/packages/memory/memory.ts b/packages/memory/memory.ts index f9c3081943..e9d84e32e1 100644 --- a/packages/memory/memory.ts +++ b/packages/memory/memory.ts @@ -168,6 +168,35 @@ export const querySchema = async (session: Session, query: SchemaQuery) => { }); }; +/** + * Internal variant of querySchema that also returns the schemaTracker. + * Used by provider.ts for incremental subscription updates. + */ +export const querySchemaWithTracker = async ( + session: Session, + query: SchemaQuery, +) => { + return await traceAsync("memory.querySchemaWithTracker", async (span) => { + addMemoryAttributes(span, { + operation: "querySchemaWithTracker", + space: query.sub, + }); + + const { ok: space, error } = await mount(session, query.sub); + if (error) { + span.setAttribute("mount.status", "error"); + return { error }; + } + + span.setAttribute("mount.status", "success"); + // Cast is safe: the Space class implements both SpaceSession and Session + return Space.querySchemaWithTracker( + space as unknown as Space.Session, + query, + ); + }); +}; + export const transact = async (session: Session, transaction: Transaction) => { return await traceAsync("memory.transact", async (span) => { addMemoryAttributes(span, { diff --git a/packages/memory/provider.ts b/packages/memory/provider.ts index f20e2c3ecf..d7453d0cf6 100644 --- a/packages/memory/provider.ts +++ b/packages/memory/provider.ts @@ -36,13 +36,21 @@ import { } from "./interface.ts"; import * as SelectionBuilder from "./selection.ts"; import * as Memory from "./memory.ts"; -import { refer } from "./reference.ts"; -import { redactCommitData, selectFact } from "./space.ts"; +import { fromString as causeFromString, refer } from "./reference.ts"; +import { + redactCommitData, + selectFact, + type Session as SpaceSession, +} from "./space.ts"; +import { evaluateDocumentLinks } from "./space-schema.ts"; import * as Subscription from "./subscription.ts"; import * as FactModule from "./fact.ts"; import { setRevision } from "@commontools/memory/selection"; import { getLogger } from "@commontools/utils/logger"; import { ACL_TYPE, isACL } from "./acl.ts"; +import { MapSet } from "@commontools/runner/traverse"; +import { deepEqual } from "@commontools/runner"; +import type { SchemaPathSelector } from "./consumer.ts"; const logger = getLogger("memory-provider", { enabled: true, @@ -145,8 +153,15 @@ class MemoryProvider< export class SchemaSubscription { constructor( public invocation: SchemaQuery, - public watchedObjects: Set, public since: number = -1, + // Track which docs were scanned with which schemas for incremental updates + public schemaTracker: MapSet = new MapSet( + deepEqual, + ), + // True if this is a wildcard query (of: "_") that can't use incremental updates + public isWildcardQuery: boolean = false, + // Track which docs have been sent to the client (by address format) + public sentDocs: Set = new Set(), ) {} } @@ -319,12 +334,42 @@ class MemoryProviderSession< }); } case "/memory/graph/query": { - const result = await this.memory.querySchema(invocation); - // We maintain subscriptions at this level, but really need more data from the query response - if (invocation.args.subscribe && result.ok !== undefined) { - this.addSchemaSubscription(of, invocation, result.ok); + // Use querySchemaWithTracker when subscribing to capture the schemaTracker + // for incremental updates on subsequent commits + if (invocation.args.subscribe) { + const trackerResult = await Memory.querySchemaWithTracker( + this.memory as Memory.Memory, + invocation, + ); + if ("error" in trackerResult) { + return this.perform({ + the: "task/return", + of, + is: trackerResult, + }); + } + const { selection, schemaTracker } = trackerResult.ok; + this.addSchemaSubscription(of, invocation, selection, schemaTracker); this.memory.subscribe(this); + + // Filter out any known results + if (invocation.args.excludeSent) { + const space = invocation.sub; + const factSelection = selection[space]; + const factVersions = [...FactModule.iterate(factSelection)]; + selection[space] = this.toSelection( + this.filterKnownFacts(factVersions), + ); + } + return this.perform({ + the: "task/return", + of, + is: { ok: selection }, + }); } + + // Non-subscribing queries use the regular querySchema + const result = await this.memory.querySchema(invocation); // Filter out any known results if (result.ok !== undefined && invocation.args.excludeSent) { const space = invocation.sub; @@ -507,79 +552,323 @@ class MemoryProviderSession< of: JobId, invocation: SchemaQuery, result: Selection, + schemaTracker?: MapSet, ) { const space = invocation.sub; const factSelection = result[space]; const factVersions = [...FactModule.iterate(factSelection)]; - const includedFacts = new Set( - factVersions.map((fv) => this.formatAddress(space, fv)), - ); const since = factVersions.reduce( (acc, cur, _i) => cur.since > acc ? cur.since : acc, -1, ); + + // Check if this is a wildcard query (of: "_") + // Wildcard queries can't benefit from incremental updates via schemaTracker + const isWildcardQuery = this.isWildcardQuery(invocation); + + // Track which docs were sent in the initial query result + const sentDocs = new Set( + factVersions.map((fv) => this.formatAddress(space, fv)), + ); + const subscription = new SchemaSubscription( invocation, - includedFacts, since, + schemaTracker ?? new MapSet(deepEqual), + isWildcardQuery, + sentDocs, ); this.schemaChannels.set(of, subscription); } + /** + * Check if a schema query contains any wildcard selectors (of: "_"). + * Wildcard queries match based on type rather than specific document IDs. + */ + private isWildcardQuery( + invocation: SchemaQuery, + ): boolean { + const selectSchema = invocation.args.selectSchema; + for (const of of Object.keys(selectSchema)) { + if (of === "_") return true; + } + return false; + } + + /** + * For wildcard queries, find changed docs that match the type pattern. + * Returns affected docs with the schema from the wildcard selector. + */ + private findAffectedDocsForWildcard( + changedDocs: Set, + invocation: SchemaQuery, + ): Array<{ docKey: string; schemas: Set }> { + const affected: Array< + { docKey: string; schemas: Set } + > = []; + const selectSchema = invocation.args.selectSchema; + + // Get the wildcard selector's type patterns + const wildcardSelector = selectSchema["_"]; + if (!wildcardSelector) return affected; + + // Build a map of type -> schemas for matching + const typeSchemas = new Map>(); + for (const [the, causes] of Object.entries(wildcardSelector)) { + const schemas = new Set(); + for (const schema of Object.values(causes)) { + schemas.add(schema as SchemaPathSelector); + } + if (schemas.size > 0) { + typeSchemas.set(the, schemas); + } + } + + // Match changed docs against type patterns + for (const docKey of changedDocs) { + const slashIndex = docKey.indexOf("/"); + if (slashIndex === -1) continue; + const docType = docKey.slice(slashIndex + 1); + + // Check if this type matches a wildcard pattern + const schemas = typeSchemas.get(docType) ?? typeSchemas.get("_"); + if (schemas && schemas.size > 0) { + affected.push({ docKey, schemas: new Set(schemas) }); + } + } + + return affected; + } + + /** + * Incrementally find schema subscription matches after a transaction. + * + * For wildcard queries (of: "_"): Match changed docs against type pattern. + * For specific document queries: Use schemaTracker to find affected docs. + * + * Both paths then incrementally update by following links. + */ private async getSchemaSubscriptionMatches( transaction: Transaction, ): Promise<[JobId | undefined, number, Revision[]]> { const schemaMatches = new Map>(); const space = transaction.sub; let maxSince = -1; - let lastId; - // Eventually, we should support multiple spaces, but currently the since handling is per-space - // Our websockets are also per-space, so there's larger issues involved. + let lastId: JobId | undefined; + + // Early exit if no schema subscriptions + if (this.schemaChannels.size === 0) { + return [undefined, -1, []]; + } + + // Extract changed document keys from transaction + const changedDocs = this.extractChangedDocKeys(transaction); + if (changedDocs.size === 0) { + return [undefined, -1, []]; + } + + // Get access to the space session for evaluating documents + const mountResult = await Memory.mount( + this.memory as Memory.Memory, + space, + ); + if (mountResult.error) { + throw new Error(`Failed to mount space ${space}: ${mountResult.error}`); + } + const spaceSession = mountResult.ok as unknown as SpaceSession; + for (const [id, subscription] of this.schemaChannels) { - if ( - Subscription.match(transaction, subscription.watchedObjects) - ) { - // Re-run our original query, but not as a subscription - const newArgs = { ...subscription.invocation.args, subscribe: false }; - const newInvocation = { ...subscription.invocation, args: newArgs }; - // We need to bypass the perform queue to avoid a deadlock - const result = await Memory.querySchema( - this.memory as Memory.Memory, - newInvocation, - ); - if (result.error) { - console.warn("Encountered querySchema error", result.error); - continue; - } - const factSelection = result.ok![space]; - const factVersions = [...FactModule.iterate(factSelection)]; - const includedFacts = new Map( - factVersions.map((fv) => [this.formatAddress(space, fv), fv]), - ); - const since = factVersions.reduce( - (acc, cur, _i) => cur.since > acc ? cur.since : acc, - -1, - ); - // We only need to include the facts that are newer than our query - const newFacts = includedFacts.entries().filter(( - [address, factVersion], - ) => - factVersion.since > subscription.since || - !subscription.watchedObjects.has(address) - ); - for (const [address, factVersion] of newFacts) { + // Find affected docs - method depends on query type + const affectedDocs = subscription.isWildcardQuery + ? this.findAffectedDocsForWildcard(changedDocs, subscription.invocation) + : this.findAffectedDocs(changedDocs, subscription.schemaTracker); + + // No affected docs means this subscription doesn't care about these changes + if (affectedDocs.length === 0) { + continue; + } + + // Process affected docs incrementally + const result = this.processIncrementalUpdate( + spaceSession, + subscription, + affectedDocs, + space, + ); + + // Collect facts that are either newer or haven't been sent yet + let hasNewFacts = false; + for (const [address, factVersion] of result.newFacts) { + const isNewer = factVersion.since > subscription.since; + const notSentYet = !subscription.sentDocs.has(address); + + if (isNewer || notSentYet) { schemaMatches.set(address, factVersion); + subscription.sentDocs.add(address); + hasNewFacts = true; + if (isNewer) { + subscription.since = factVersion.since; + } } - // Update our subscription - subscription.watchedObjects = new Set(includedFacts.keys()); - subscription.since = since; + } + + if (hasNewFacts) { lastId = id; - maxSince = since > maxSince ? since : maxSince; + maxSince = Math.max(maxSince, subscription.since); } } + return [lastId, maxSince, [...schemaMatches.values()]]; } + /** + * Extract document keys (id/type format) from a transaction's changes. + */ + private extractChangedDocKeys( + transaction: Transaction, + ): Set { + const changedDocs = new Set(); + for (const fact of SelectionBuilder.iterate(transaction.args.changes)) { + if (fact.value !== true) { + // Format matches what schemaTracker uses: "id/type" (from BaseObjectManager.toKey) + changedDocs.add(`${fact.of}/${fact.the}`); + } + } + return changedDocs; + } + + /** + * Find docs in changedDocs that are tracked by the subscription's schemaTracker. + * Returns list of (docKey, schemas) pairs. + */ + private findAffectedDocs( + changedDocs: Set, + schemaTracker: MapSet, + ): Array<{ docKey: string; schemas: Set }> { + const affected: Array< + { docKey: string; schemas: Set } + > = []; + for (const docKey of changedDocs) { + const schemas = schemaTracker.get(docKey); + if (schemas && schemas.size > 0) { + affected.push({ docKey, schemas: new Set(schemas) }); + } + } + return affected; + } + + /** + * Process incremental update for a subscription given affected docs. + * Re-evaluates each affected doc with its schemas and follows new links. + */ + private processIncrementalUpdate( + spaceSession: SpaceSession, + subscription: SchemaSubscription, + affectedDocs: Array<{ docKey: string; schemas: Set }>, + space: Space, + ): { newFacts: Map> } { + const newFacts = new Map>(); + const classification = subscription.invocation.args.classification; + + // Queue of (docKey, schema) pairs to process + const pendingPairs: Array<{ docKey: string; schema: SchemaPathSelector }> = + []; + + // Initialize with affected docs and their schemas + for (const { docKey, schemas } of affectedDocs) { + for (const schema of schemas) { + pendingPairs.push({ docKey, schema }); + } + } + + // Process pending pairs - may grow as we discover new links + // Track processed (docKey, schema) pairs to avoid redundant work + const processedPairs = new Set(); + // Also track how many times each doc has been processed (regardless of schema) + // to detect growing path cycles like A -> A/foo -> A/foo/foo + const docProcessCount = new Map(); + const MAX_DOC_VISITS = 100; // Limit visits per doc to catch growing cycles + + while (pendingPairs.length > 0) { + const { docKey, schema } = pendingPairs.pop()!; + const pairKey = `${docKey}|${JSON.stringify(schema)}`; + + // Skip if already processed this exact pair + if (processedPairs.has(pairKey)) { + continue; + } + processedPairs.add(pairKey); + + // Check if we've visited this doc too many times (growing path cycle) + const visitCount = (docProcessCount.get(docKey) ?? 0) + 1; + if (visitCount > MAX_DOC_VISITS) { + logger.warn( + "incremental-update-cycle", + () => [ + `Document ${docKey} visited ${visitCount} times, possible growing path cycle`, + ], + ); + continue; + } + docProcessCount.set(docKey, visitCount); + + // Parse docKey back to id and type (format is "id/type" from BaseObjectManager.toKey) + // Note: type can contain slashes (e.g., "application/json"), so we split on the FIRST slash + // The id is always in the form "of:HASH" which doesn't contain slashes + const slashIndex = docKey.indexOf("/"); + if (slashIndex === -1) { + continue; + } + const docId = docKey.slice(0, slashIndex); + const docType = docKey.slice(slashIndex + 1); + + // Load the fact for this document to include in results + const fact = selectFact(spaceSession, { + of: docId as `${string}:${string}`, + the: docType as `${string}/${string}`, + }); + + if (!fact || fact.is === undefined) { + // Document doesn't exist yet - skip + continue; + } + + const address = this.formatAddress(space, fact); + newFacts.set(address, { + of: fact.of, + the: fact.the, + cause: causeFromString(fact.cause), + is: fact.is, + since: fact.since, + }); + + // Evaluate this document with the schema to find its current links + const links = evaluateDocumentLinks( + spaceSession, + { id: docId, type: docType }, + schema, + classification, + ); + + // Find new links: targets in links that aren't already in subscription.schemaTracker + if (links !== null) { + for (const [targetDocKey, targetSchemas] of links) { + for (const targetSchema of targetSchemas) { + if ( + !subscription.schemaTracker.hasValue(targetDocKey, targetSchema) + ) { + // New link discovered - add to pending and track it + pendingPairs.push({ docKey: targetDocKey, schema: targetSchema }); + subscription.schemaTracker.add(targetDocKey, targetSchema); + } + } + } + } + } + + return { newFacts }; + } + private async getAcl(space: MemorySpace): Promise { try { const result = await Memory.mount(this.memory as Memory.Memory, space); diff --git a/packages/memory/space-schema.ts b/packages/memory/space-schema.ts index 2f987d5d0b..50d48bd872 100644 --- a/packages/memory/space-schema.ts +++ b/packages/memory/space-schema.ts @@ -27,6 +27,7 @@ import type { Entity, FactSelection, MemorySpace, + MIME, SchemaQuery, } from "./interface.ts"; import { SelectAllString } from "./schema.ts"; @@ -151,10 +152,15 @@ export class ServerObjectManager extends BaseObjectManager< } } +export interface SelectSchemaResult { + facts: FactSelection; + schemaTracker: MapSet; +} + export const selectSchema = ( session: SpaceStoreSession, { selectSchema, since, classification }: SchemaQuery["args"], -): FactSelection => { +): SelectSchemaResult => { const startTime = performance.timeOrigin + performance.now(); const providedClassifications = new Set(classification); @@ -236,10 +242,18 @@ export const selectSchema = ( ) { if ( factSelector.of !== SelectAllString && - factSelector.the !== SelectAllString && - !getRevision(includedFacts, factSelector.of, factSelector.the) + factSelector.the !== SelectAllString ) { - setEmptyObj(includedFacts, factSelector.of, factSelector.the); + // Track all specifically-queried entities in schemaTracker so incremental + // updates can detect changes to them, even if they don't have data yet + const docKey = `${factSelector.of}/${factSelector.the}`; + if (!schemaTracker.has(docKey)) { + schemaTracker.add(docKey, factSelector.value); + } + + if (!getRevision(includedFacts, factSelector.of, factSelector.the)) { + setEmptyObj(includedFacts, factSelector.of, factSelector.the); + } } } const endTime = performance.timeOrigin + performance.now(); @@ -247,9 +261,59 @@ export const selectSchema = ( logger.info("slow-select", () => ["Slow selectSchema:", selectSchema]); } - return includedFacts; + return { facts: includedFacts, schemaTracker }; }; +/** + * Evaluates a single document with a schema and returns the links it contains. + * Used for incremental subscription updates - when a document changes, we re-evaluate + * just that document to find what links it now has. + * + * @param session - The space store session + * @param docAddress - The document to evaluate (id and type) + * @param schema - The schema to apply + * @param classification - Classification claims for access control + * @returns A MapSet of target document addresses to their schemas, or null if doc not found + */ +export function evaluateDocumentLinks( + session: SpaceStoreSession, + docAddress: { id: string; type: string }, + schema: SchemaPathSelector, + classification?: string[], +): MapSet | null { + const providedClassifications = new Set(classification); + const manager = new ServerObjectManager(session, providedClassifications); + const tracker = new CompoundCycleTracker< + Immutable, + SchemaContext | undefined + >(); + const cfc = new ContextualFlowControl(); + const schemaTracker = new MapSet(deepEqual); + + // Load the document + const address = { + id: docAddress.id as Entity, + type: docAddress.type as MIME, + path: [] as string[], + }; + const fact = manager.load(address); + if (fact === null || fact.value === undefined) { + return null; + } + + // Create the IAttestation with cause/since (we don't need these for link evaluation) + const attestation: IAttestation & { cause: CauseString; since: number } = { + ...fact, + cause: "" as CauseString, // Not needed for link evaluation + since: 0, + }; + + // Run the schema traversal to populate schemaTracker with links + loadFactsForDoc(manager, attestation, schema, tracker, cfc, schemaTracker); + + return schemaTracker; +} + // The fact passed in is the IAttestation for the top level 'is', so path // is empty. function loadFactsForDoc( @@ -260,6 +324,13 @@ function loadFactsForDoc( cfc: ContextualFlowControl, schemaTracker: MapSet, ) { + // Track all facts regardless of their value type + // This ensures watchedObjects and schemaTracker stay in sync + const factKey = manager.toKey(fact.address); + if (!schemaTracker.has(factKey)) { + schemaTracker.add(factKey, selector); + } + if (isObject(fact.value)) { if (selector.schemaContext !== undefined) { const factValue: IAttestation = { @@ -292,6 +363,8 @@ function loadFactsForDoc( // If we didn't provide a schema context, we still want the selected // object in our manager, so load it directly. manager.load(fact.address); + // Also track it in schemaTracker so incremental updates can find it + schemaTracker.add(manager.toKey(fact.address), selector); } // Also load any source links and recipes loadSource(manager, fact, new Set(), schemaTracker); diff --git a/packages/memory/space.ts b/packages/memory/space.ts index e44133478d..3c90f3390e 100644 --- a/packages/memory/space.ts +++ b/packages/memory/space.ts @@ -55,7 +55,7 @@ import { } from "./selection.ts"; import { SelectAllString } from "./schema.ts"; import * as Error from "./error.ts"; -import { selectSchema } from "./space-schema.ts"; +import { selectSchema, type SelectSchemaResult } from "./space-schema.ts"; import { JSONValue } from "@commontools/runner"; import { isObject } from "../utils/src/types.ts"; export type * from "./interface.ts"; @@ -1049,17 +1049,17 @@ export const querySchema = ( } try { - const result = session.store.transaction(selectSchema)( + const { facts } = session.store.transaction(selectSchema)( session, command.args, ); - const entities = Object.keys(result || {}).length; + const entities = Object.keys(facts || {}).length; span.setAttribute("querySchema.result_entity_count", entities); return { ok: { - [command.sub]: result, + [command.sub]: facts, } as Selection, }; } catch (error) { @@ -1077,6 +1077,58 @@ export const querySchema = ( }); }; +/** + * Internal variant of querySchema that also returns the schemaTracker. + * Used by provider.ts for incremental subscription updates. + */ +export const querySchemaWithTracker = ( + session: Session, + command: SchemaQuery, +): Result< + { + selection: Selection; + schemaTracker: SelectSchemaResult["schemaTracker"]; + }, + AuthorizationError | QueryError +> => { + return traceSync("space.querySchemaWithTracker", (span) => { + addMemoryAttributes(span, { + operation: "querySchemaWithTracker", + space: session.subject, + }); + + try { + const { facts, schemaTracker } = session.store.transaction(selectSchema)( + session, + command.args, + ); + + const entities = Object.keys(facts || {}).length; + span.setAttribute("querySchema.result_entity_count", entities); + + return { + ok: { + selection: { + [command.sub]: facts, + } as Selection, + schemaTracker, + }, + }; + } catch (error) { + if ((error as Error)?.name === "AuthorizationError") { + return { error: error as AuthorizationError }; + } + return { + error: Error.query( + command.sub, + command.args.selectSchema, + error as SqliteError, + ), + }; + } + }); +}; + export const LABEL_TYPE = "application/label+json" as const; export type FactSelectionValue = { is?: JSONValue; since: number }; // Get the labels associated with a set of commits. From 36abc025442a35c61a1b502b78edb02ea454f574 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Tue, 9 Dec 2025 15:23:02 -0800 Subject: [PATCH 2/7] feat(runner): auto-start charms when events are sent to unhandled storage locations (#2235) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(runner): auto-start charms when events are sent to unhandled storage locations When an event is sent to a storage location that has no registered event handler, the scheduler now attempts to start the underlying charm. This enables lazy charm initialization - charms can be started on-demand when they first receive an event. The implementation: - Adds ensureCharmRunning() utility that traverses the source cell chain to find the process cell, then starts the charm via runtime.runSynced() - Modifies queueEvent() to call ensureCharmRunning() when no handler is found - Includes infinite loop protection to prevent re-queuing if the charm doesn't register a handler for the event πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * test(runner): add tests for ensureCharmRunning and auto-start behavior Tests cover: - Return false for cells without process cell structure - Return false for cells without TYPE in process cell - Return false for cells without resultRef in process cell - Successfully start charm with valid process cell structure - Infinite loop protection (don't attempt to start twice) - Graceful handling of events for orphan cells - No infinite retry when charm doesn't register handler πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fmt * test: improved tests for ensureCharmRunning - Adds test proving charm starts when event is sent to a cell with no handler - Adds test proving handler is called when defined for the stream path - Tests verify infinite loop protection (charm only starts once per path) - Updates orphan cell test to check actual values instead of expect(true) πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * refactor(runner): simplify ensureCharmRunning by removing tracking Remove the startAttemptedForCell Set and cellLinkKey helper since runtime.runSynced() is already idempotent for running charms - calling it multiple times simply returns without doing anything. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * fix(runner): prevent infinite loop in queueEvent when no handler exists When re-queuing an event after starting a charm, pass a flag to prevent triggering another charm load attempt. This avoids infinite loops when the charm doesn't register a handler for that event type. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * test(runner): add test for restarting stopped charms Verifies that ensureCharmRunning properly restarts a charm that was previously stopped via runtime.runner.stop(). πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --------- Co-authored-by: Claude --- packages/runner/src/ensure-charm-running.ts | 143 ++++ packages/runner/src/scheduler.ts | 19 + .../runner/test/ensure-charm-running.test.ts | 697 ++++++++++++++++++ 3 files changed, 859 insertions(+) create mode 100644 packages/runner/src/ensure-charm-running.ts create mode 100644 packages/runner/test/ensure-charm-running.test.ts diff --git a/packages/runner/src/ensure-charm-running.ts b/packages/runner/src/ensure-charm-running.ts new file mode 100644 index 0000000000..504aa292d8 --- /dev/null +++ b/packages/runner/src/ensure-charm-running.ts @@ -0,0 +1,143 @@ +import { getLogger } from "@commontools/utils/logger"; +import { TYPE } from "./builder/types.ts"; +import type { Cell } from "./cell.ts"; +import { type NormalizedFullLink, parseLink } from "./link-utils.ts"; +import type { IRuntime } from "./runtime.ts"; + +const logger = getLogger("ensure-charm-running", { + enabled: false, + level: "debug", +}); + +/** + * Ensures the charm responsible for a given storage location is running. + * + * Note: We don't track which charms we've already started because calling + * runtime.runSynced() on an already-running charm is idempotent - it simply + * returns without doing anything. This keeps the code simple and stateless. + * + * This function traverses the source cell chain to find the root process cell, + * then starts the charm if it's not already running. + * + * The traversal logic: + * 1. Start with the cell at the cellLink location + * 2. While getSourceCell() returns something, follow it (this traverses + * through linked cells to find the process cell) + * 3. Once there's no source cell, look at resultRef in the resulting document + * 4. If resultRef is a link, that's the result cell - call runtime.runSynced() + * on it to start the charm + * + * @param runtime - The runtime instance + * @param cellLink - The location that received an event or should be current + * @returns Promise - true if a charm was started, false otherwise + */ +export async function ensureCharmRunning( + runtime: IRuntime, + cellLink: NormalizedFullLink, +): Promise { + try { + const tx = runtime.edit(); + + try { + // Get the cell at the event link location + let currentCell: Cell | undefined = runtime.getCellFromLink( + // We'll find the charm information at the root of what could be the + // process cell already, hence remove the path: + { ...cellLink, path: [] }, + undefined, + tx, + ); + + // Traverse up the source cell chain + // This follows links from derived cells back to the process cell + let sourceCell = currentCell.getSourceCell(); + while (sourceCell) { + logger.debug("ensure-charm", () => [ + `Following source cell from ${currentCell?.getAsNormalizedFullLink().id} to ${sourceCell?.getAsNormalizedFullLink().id}`, + ]); + currentCell = sourceCell; + sourceCell = currentCell.getSourceCell(); + } + + // currentCell is now the process cell (or the original cell if no sources) + // Check if it has a resultRef and a TYPE (indicating it's a process cell) + const processData = currentCell.getRaw(); + + if (!processData || typeof processData !== "object") { + logger.debug("ensure-charm", () => [ + `No process data found at ${currentCell.getAsNormalizedFullLink().id}`, + ]); + return false; + } + + const recipeId = (processData as Record)[TYPE]; + const resultRef = (processData as Record).resultRef; + + if (!recipeId) { + logger.debug("ensure-charm", () => [ + `No recipe ID (TYPE) found in process cell`, + ]); + return false; + } + + if (!resultRef) { + logger.debug("ensure-charm", () => [ + `No resultRef found in process cell`, + ]); + return false; + } + + // resultRef should be a link to the result cell + // Parse it and get the result cell + const resultLink = parseLink(resultRef, currentCell); + if (!resultLink) { + logger.debug("ensure-charm", () => [ + `Invalid resultRef: ${resultRef}`, + ]); + return false; + } + + const resultCell = runtime.getCellFromLink(resultLink, undefined, tx); + + // Commit the read transaction before starting the charm + await tx.commit(); + + // Load the recipe + const recipe = await runtime.recipeManager.loadRecipe( + recipeId as string, + cellLink.space, + ); + + if (!recipe) { + logger.debug("ensure-charm", () => [ + `Failed to load recipe: ${recipeId}`, + ]); + return false; + } + + logger.debug("ensure-charm", () => [ + `Starting charm with recipe ${recipeId} for result cell ${resultCell.getAsNormalizedFullLink().id}`, + ]); + + // Start the charm - this will register event handlers + await runtime.runSynced(resultCell, recipe); + + logger.debug("ensure-charm", () => [ + `Charm started successfully`, + ]); + + return true; + } catch (error) { + // Make sure to commit/rollback the transaction on error + try { + await tx.commit(); + } catch { + // Ignore commit errors on cleanup + } + throw error; + } + } catch (error) { + logger.error("ensure-charm", "Error ensuring charm is running:", error); + return false; + } +} diff --git a/packages/runner/src/scheduler.ts b/packages/runner/src/scheduler.ts index 39579afa15..005d91e00a 100644 --- a/packages/runner/src/scheduler.ts +++ b/packages/runner/src/scheduler.ts @@ -34,6 +34,7 @@ import { sortAndCompactPaths, type SortedAndCompactPaths, } from "./reactive-dependencies.ts"; +import { ensureCharmRunning } from "./ensure-charm-running.ts"; const logger = getLogger("scheduler", { enabled: false, @@ -337,9 +338,13 @@ export class Scheduler implements IScheduler { event: any, retries: number = DEFAULT_RETRIES_FOR_EVENTS, onCommit?: (tx: IExtendedStorageTransaction) => void, + doNotLoadCharmIfNotRunning: boolean = false, ): void { + let handlerFound = false; + for (const [link, handler] of this.eventHandlers) { if (areNormalizedLinksSame(link, eventLink)) { + handlerFound = true; this.queueExecution(); this.eventQueue.push({ action: (tx: IExtendedStorageTransaction) => handler(tx, event), @@ -348,6 +353,20 @@ export class Scheduler implements IScheduler { }); } } + + // If no handler was found, try to start the charm that should handle this event + if (!handlerFound && !doNotLoadCharmIfNotRunning) { + // Use an async IIFE to handle the async operation without blocking + (async () => { + const started = await ensureCharmRunning(this.runtime, eventLink); + if (started) { + // Charm was started, re-queue the event. Don't trigger loading again + // if this didn't result in registering a handler, as trying again + // won't change this. + this.queueEvent(eventLink, event, retries, onCommit, true); + } + })(); + } } addEventHandler(handler: EventHandler, ref: NormalizedFullLink): Cancel { diff --git a/packages/runner/test/ensure-charm-running.test.ts b/packages/runner/test/ensure-charm-running.test.ts new file mode 100644 index 0000000000..9a3d5416cc --- /dev/null +++ b/packages/runner/test/ensure-charm-running.test.ts @@ -0,0 +1,697 @@ +import { afterEach, beforeEach, describe, it } from "@std/testing/bdd"; +import { expect } from "@std/expect"; +import { Identity } from "@commontools/identity"; +import { StorageManager } from "@commontools/runner/storage/cache.deno"; +import { type Recipe, TYPE } from "../src/builder/types.ts"; +import { Runtime } from "../src/runtime.ts"; +import type { IExtendedStorageTransaction } from "../src/storage/interface.ts"; +import { ensureCharmRunning } from "../src/ensure-charm-running.ts"; + +const signer = await Identity.fromPassphrase("test operator"); +const space = signer.did(); + +describe("ensureCharmRunning", () => { + let storageManager: ReturnType; + let runtime: Runtime; + let tx: IExtendedStorageTransaction; + + beforeEach(() => { + storageManager = StorageManager.emulate({ as: signer }); + runtime = new Runtime({ + apiUrl: new URL(import.meta.url), + storageManager, + }); + tx = runtime.edit(); + }); + + afterEach(async () => { + await tx.commit(); + await runtime?.dispose(); + await storageManager?.close(); + }); + + it("should return false for cells without process cell structure", async () => { + // Create a cell that has no charm structure (no process cell, no recipe) + const orphanCell = runtime.getCell<{ $stream: true }>( + space, + "orphan-cell-test", + undefined, + tx, + ); + orphanCell.set({ $stream: true }); + + await tx.commit(); + tx = runtime.edit(); + + // ensureCharmRunning should return false - no charm to start + const result = await ensureCharmRunning( + runtime, + orphanCell.getAsNormalizedFullLink(), + ); + + expect(result).toBe(false); + }); + + it("should return false for cells without TYPE in process cell", async () => { + // Create a result cell that points to a process cell without TYPE + const resultCell = runtime.getCell( + space, + "no-type-test-result", + undefined, + tx, + ); + + const processCell = runtime.getCell( + space, + "no-type-test-process", + undefined, + tx, + ); + + // Set up the result cell to point to the process cell + resultCell.set({ value: 1 }); + resultCell.setSourceCell(processCell); + + // Process cell has no TYPE + processCell.set({ + argument: { value: 1 }, + resultRef: resultCell.getAsLink({ base: processCell }), + }); + + await tx.commit(); + tx = runtime.edit(); + + // ensureCharmRunning should return false - no TYPE means no recipe + const result = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + + expect(result).toBe(false); + }); + + it("should return false for cells without resultRef in process cell", async () => { + // Create a simple recipe + const recipe: Recipe = { + argumentSchema: { type: "object" }, + resultSchema: { type: "object" }, + result: {}, + nodes: [], + }; + + const recipeId = runtime.recipeManager.registerRecipe(recipe); + + // Create a result cell that points to a process cell without resultRef + const resultCell = runtime.getCell( + space, + "no-resultref-test-result", + undefined, + tx, + ); + + const processCell = runtime.getCell( + space, + "no-resultref-test-process", + undefined, + tx, + ); + + resultCell.set({ value: 1 }); + resultCell.setSourceCell(processCell); + + // Process cell has TYPE but no resultRef + processCell.set({ + [TYPE]: recipeId, + argument: { value: 1 }, + // Missing resultRef! + }); + + await tx.commit(); + tx = runtime.edit(); + + // ensureCharmRunning should return false - no resultRef + const result = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + + expect(result).toBe(false); + }); + + it("should start a charm with valid process cell structure", async () => { + // Create a simple recipe + let recipeRan = false; + const recipe: Recipe = { + argumentSchema: { + type: "object", + properties: { value: { type: "number" } }, + }, + resultSchema: { + type: "object", + properties: { doubled: { type: "number" } }, + }, + result: { + doubled: { $alias: { path: ["internal", "doubled"] } }, + }, + nodes: [ + { + module: { + type: "javascript", + implementation: (value: number) => { + recipeRan = true; + return value * 2; + }, + }, + inputs: { $alias: { path: ["argument", "value"] } }, + outputs: { $alias: { path: ["internal", "doubled"] } }, + }, + ], + }; + + const recipeId = runtime.recipeManager.registerRecipe(recipe); + + // Create result cell + const resultCell = runtime.getCell( + space, + "valid-charm-test-result", + undefined, + tx, + ); + + // Create process cell + const processCell = runtime.getCell( + space, + "valid-charm-test-process", + undefined, + tx, + ); + + // Set up the structure + resultCell.set({ + doubled: { + $alias: { path: ["internal", "doubled"], cell: processCell.entityId }, + }, + }); + resultCell.setSourceCell(processCell); + + processCell.set({ + [TYPE]: recipeId, + argument: { value: 5 }, + resultRef: resultCell.getAsLink({ base: processCell }), + internal: {}, + }); + + await tx.commit(); + tx = runtime.edit(); + + // ensureCharmRunning should return true and start the charm + const result = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + + expect(result).toBe(true); + + // Wait for the charm to run + await runtime.idle(); + + expect(recipeRan).toBe(true); + }); + + it("should be idempotent - calling multiple times is safe", async () => { + // Create a simple recipe + let startCount = 0; + const recipe: Recipe = { + argumentSchema: { type: "object" }, + resultSchema: { type: "object" }, + result: {}, + nodes: [ + { + module: { + type: "javascript", + implementation: () => { + startCount++; + }, + }, + inputs: {}, + outputs: {}, + }, + ], + }; + + const recipeId = runtime.recipeManager.registerRecipe(recipe); + + const resultCell = runtime.getCell( + space, + "idempotent-start-test-result", + undefined, + tx, + ); + + const processCell = runtime.getCell( + space, + "idempotent-start-test-process", + undefined, + tx, + ); + + resultCell.set({}); + resultCell.setSourceCell(processCell); + + processCell.set({ + [TYPE]: recipeId, + argument: {}, + resultRef: resultCell.getAsLink({ base: processCell }), + internal: {}, + }); + + await tx.commit(); + tx = runtime.edit(); + + // First call should return true (charm started) + const result1 = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + expect(result1).toBe(true); + + await runtime.idle(); + + // Second call should also return true - ensureCharmRunning doesn't track + // previous calls because runtime.runSynced() is idempotent for already-running charms + const result2 = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + expect(result2).toBe(true); + + // The charm's lift should only have run once because runSynced is idempotent + expect(startCount).toBe(1); + }); + + it("should restart a stopped charm when called again", async () => { + // Create a simple recipe that tracks how many times it starts + let startCount = 0; + const recipe: Recipe = { + argumentSchema: { type: "object" }, + resultSchema: { type: "object" }, + result: {}, + nodes: [ + { + module: { + type: "javascript", + implementation: () => { + startCount++; + }, + }, + inputs: {}, + outputs: {}, + }, + ], + }; + + const recipeId = runtime.recipeManager.registerRecipe(recipe); + + const resultCell = runtime.getCell( + space, + "restart-after-stop-test-result", + undefined, + tx, + ); + + const processCell = runtime.getCell( + space, + "restart-after-stop-test-process", + undefined, + tx, + ); + + resultCell.set({}); + resultCell.setSourceCell(processCell); + + processCell.set({ + [TYPE]: recipeId, + argument: {}, + resultRef: resultCell.getAsLink({ base: processCell }), + internal: {}, + }); + + await tx.commit(); + tx = runtime.edit(); + + // First call should start the charm + const result1 = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + expect(result1).toBe(true); + + await runtime.idle(); + expect(startCount).toBe(1); + + // Stop the charm + runtime.runner.stop(resultCell); + + // Call again - should restart the charm since it was stopped + const result2 = await ensureCharmRunning( + runtime, + resultCell.getAsNormalizedFullLink(), + ); + expect(result2).toBe(true); + + await runtime.idle(); + + // The charm's lift should have run twice now (once for each start) + expect(startCount).toBe(2); + }); + + it("should handle events for cells without associated charms gracefully", async () => { + // Create a cell that has no charm structure + const orphanCell = runtime.getCell<{ $stream: true }>( + space, + "orphan-event-cell-test", + undefined, + tx, + ); + orphanCell.set({ $stream: true }); + + await tx.commit(); + tx = runtime.edit(); + + // Send an event to this cell - should not crash + runtime.scheduler.queueEvent( + orphanCell.getAsNormalizedFullLink(), + { type: "click" }, + ); + + // Wait for processing - should complete without errors + await runtime.idle(); + await new Promise((resolve) => setTimeout(resolve, 50)); + await runtime.idle(); + + // If we get here, the event was handled gracefully (dropped) + expect(true).toBe(true); + }); +}); + +describe("queueEvent with auto-start", () => { + let storageManager: ReturnType; + let runtime: Runtime; + let tx: IExtendedStorageTransaction; + + beforeEach(() => { + storageManager = StorageManager.emulate({ as: signer }); + runtime = new Runtime({ + apiUrl: new URL(import.meta.url), + storageManager, + }); + tx = runtime.edit(); + }); + + afterEach(async () => { + await tx.commit(); + await runtime?.dispose(); + await storageManager?.close(); + }); + + it("should start charm when event sent to result cell path, but not retry if no handler", async () => { + // Create a recipe with a reactive lift (to prove it starts) but NO event handler + let liftRunCount = 0; + + const recipe: Recipe = { + argumentSchema: { + type: "object", + properties: { + value: { type: "number" }, + }, + }, + resultSchema: { + type: "object", + properties: { + doubled: { type: "number" }, + events: { type: "object" }, + }, + }, + initial: { + internal: { + events: { $stream: true }, + }, + }, + result: { + doubled: { $alias: { path: ["internal", "doubled"] } }, + events: { $alias: { path: ["internal", "events"] } }, + }, + nodes: [ + { + // This lift will run when the charm starts, proving the charm started + module: { + type: "javascript", + implementation: (value: number) => { + liftRunCount++; + return value * 2; + }, + }, + inputs: { $alias: { path: ["argument", "value"] } }, + outputs: { $alias: { path: ["internal", "doubled"] } }, + }, + // Note: NO handler node for events - this is intentional + ], + }; + + const recipeId = runtime.recipeManager.registerRecipe(recipe); + + // Create result cell + const resultCell = runtime.getCell( + space, + "no-handler-start-test-result", + undefined, + tx, + ); + + // Create process cell + const processCell = runtime.getCell( + space, + "no-handler-start-test-process", + undefined, + tx, + ); + + // Set up result cell - events points to internal/events in process cell + resultCell.set({ + doubled: { + $alias: { path: ["internal", "doubled"], cell: processCell.entityId }, + }, + events: { + $alias: { path: ["internal", "events"], cell: processCell.entityId }, + }, + }); + resultCell.setSourceCell(processCell); + + // Set up process cell - internal.events must be set to $stream: true + // (both in recipe.initial and directly on the cell) + processCell.set({ + [TYPE]: recipeId, + argument: { value: 5 }, + resultRef: resultCell.getAsLink({ base: processCell }), + internal: { + events: { $stream: true }, + }, + }); + + await tx.commit(); + tx = runtime.edit(); + + // Verify charm is not running yet + expect(liftRunCount).toBe(0); + + // Send an event to the result cell's events path + // ensureCharmRunning will: + // 1. Get cell at resultCell (with path removed) + // 2. Follow getSourceCell() to find processCell + // 3. Find TYPE and resultRef in processCell + // 4. Start the charm + const eventsLink = resultCell.key("events").getAsNormalizedFullLink(); + runtime.scheduler.queueEvent(eventsLink, { type: "click" }); + + // Wait for processing + await runtime.idle(); + await new Promise((resolve) => setTimeout(resolve, 100)); + await runtime.idle(); + + // The charm should have been started (lift ran) + expect(liftRunCount).toBe(1); + + // The result should show the lift's output + expect(resultCell.getAsQueryResult()).toMatchObject({ doubled: 10 }); + + // Send another event - ensureCharmRunning may be called again but + // runSynced is idempotent so the charm won't restart + runtime.scheduler.queueEvent(eventsLink, { type: "click" }); + + await runtime.idle(); + await new Promise((resolve) => setTimeout(resolve, 50)); + await runtime.idle(); + + // Lift should still only have run once because runSynced is idempotent + expect(liftRunCount).toBe(1); + }); + + it("should start charm and process event when handler is defined", async () => { + // Create a recipe with a handler that reads from the stream + let liftRunCount = 0; + let handlerRunCount = 0; + const receivedEvents: any[] = []; + + const recipe: Recipe = { + argumentSchema: { + type: "object", + properties: { + value: { type: "number" }, + }, + }, + resultSchema: { + type: "object", + properties: { + doubled: { type: "number" }, + events: { type: "object" }, + eventCount: { type: "number" }, + }, + }, + initial: { + internal: { + events: { $stream: true }, + eventCount: 0, + }, + }, + result: { + doubled: { $alias: { path: ["internal", "doubled"] } }, + events: { $alias: { path: ["internal", "events"] } }, + eventCount: { $alias: { path: ["internal", "eventCount"] } }, + }, + nodes: [ + { + // This lift will run when the charm starts + module: { + type: "javascript", + implementation: (value: number) => { + liftRunCount++; + return value * 2; + }, + }, + inputs: { $alias: { path: ["argument", "value"] } }, + outputs: { $alias: { path: ["internal", "doubled"] } }, + }, + { + // Handler that reads from the stream + module: { + type: "javascript", + wrapper: "handler", + implementation: (event: any, ctx: { eventCount: number }) => { + handlerRunCount++; + receivedEvents.push(event); + ctx.eventCount = (ctx.eventCount || 0) + 1; + }, + }, + inputs: { + $event: { $alias: { path: ["internal", "events"] } }, + $ctx: { + eventCount: { $alias: { path: ["internal", "eventCount"] } }, + }, + }, + outputs: { + eventCount: { $alias: { path: ["internal", "eventCount"] } }, + }, + }, + ], + }; + + const recipeId = runtime.recipeManager.registerRecipe(recipe); + + // Create result cell + const resultCell = runtime.getCell( + space, + "with-handler-start-test-result", + undefined, + tx, + ); + + // Create process cell + const processCell = runtime.getCell( + space, + "with-handler-start-test-process", + undefined, + tx, + ); + + // Set up result cell + resultCell.set({ + doubled: { + $alias: { path: ["internal", "doubled"], cell: processCell.entityId }, + }, + events: { + $alias: { path: ["internal", "events"], cell: processCell.entityId }, + }, + eventCount: { + $alias: { + path: ["internal", "eventCount"], + cell: processCell.entityId, + }, + }, + }); + resultCell.setSourceCell(processCell); + + // Set up process cell - internal.events must be set to $stream: true + // (both in recipe.initial and directly on the cell) + processCell.set({ + [TYPE]: recipeId, + argument: { value: 5 }, + resultRef: resultCell.getAsLink({ base: processCell }), + internal: { + events: { $stream: true }, + eventCount: 0, + }, + }); + + await tx.commit(); + tx = runtime.edit(); + + // Verify charm is not running yet + expect(liftRunCount).toBe(0); + expect(handlerRunCount).toBe(0); + + // Send an event - should start charm and process the event + // The handler is registered for the internal/events path on process cell + const eventsLink = processCell.key("internal").key("events") + .getAsNormalizedFullLink(); + runtime.scheduler.queueEvent(eventsLink, { type: "click", x: 10 }); + + // Wait for processing + await runtime.idle(); + await new Promise((resolve) => setTimeout(resolve, 100)); + await runtime.idle(); + + // The charm should have been started + expect(liftRunCount).toBe(1); + + // The handler should have been called + expect(handlerRunCount).toBe(1); + expect(receivedEvents).toEqual([{ type: "click", x: 10 }]); + + // Send another event - handler should be called again + runtime.scheduler.queueEvent(eventsLink, { type: "click", x: 20 }); + + await runtime.idle(); + await new Promise((resolve) => setTimeout(resolve, 50)); + await runtime.idle(); + + // Handler should have run twice now + expect(handlerRunCount).toBe(2); + expect(receivedEvents).toEqual([ + { type: "click", x: 10 }, + { type: "click", x: 20 }, + ]); + + // Lift should still only have run once (charm only started once) + expect(liftRunCount).toBe(1); + }); +}); From a85cc8e0285bbfb955a0ddf0edb00692fef9367e Mon Sep 17 00:00:00 2001 From: gideon Date: Wed, 10 Dec 2025 08:35:24 +0800 Subject: [PATCH 3/7] =?UTF-8?q?fix(runner):=20capture=20reactivity=20log?= =?UTF-8?q?=20after=20callback=20to=20track=20all=20cell=20=E2=80=A6=20(#2?= =?UTF-8?q?237)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(runner): capture reactivity log after callback to track all cell reads In subscribeToReferencedDocs(), the reactivity log was being captured BEFORE the callback executed. This meant that any cell reads occurring during the callback - such as accessing properties of derived arrays during JSX rendering - were not tracked for subscription purposes. The symptom: styles referencing derived data (e.g., 'derive()' output used for dynamic background colors) would render correctly on initial load but never update when the underlying cell changed. The handler would fire, the derive would recompute, but the UI wouldn't re-render. Root cause: When rendering JSX like: background: filteredItemsWithHighlight[index]?.highlightBg The array element access happens during the effect callback. Since txToReactivityLog(tx) was called before callback(value), these reads weren't captured in the subscription, so changes to the derived cell's internal data didn't trigger the effect to re-run. The fix moves txToReactivityLog(tx) to after callback(value), ensuring all cell reads during rendering are captured and subscribed to. * add regression test --- packages/runner/src/cell.ts | 2 +- packages/runner/test/cell.test.ts | 60 +++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/packages/runner/src/cell.ts b/packages/runner/src/cell.ts index 006f54a4b8..6c49119ea1 100644 --- a/packages/runner/src/cell.ts +++ b/packages/runner/src/cell.ts @@ -1277,10 +1277,10 @@ function subscribeToReferencedDocs( link, true, ); - const log = txToReactivityLog(tx); // Call the callback once with initial value. let cleanup: Cancel | undefined | void = callback(value); + const log = txToReactivityLog(tx); // Technically unnecessary since we don't expect/allow callbacks to sink to // write to other cells, and we retry by design anyway below when read data diff --git a/packages/runner/test/cell.test.ts b/packages/runner/test/cell.test.ts index 9cd34d0503..e41b4329c5 100644 --- a/packages/runner/test/cell.test.ts +++ b/packages/runner/test/cell.test.ts @@ -1130,6 +1130,66 @@ describe("asCell", () => { expect(values).toEqual([42, 300]); // Got called again }); + it("should trigger sink when linked cell changes and is read during callback", async () => { + // This test verifies that cell reads happening DURING the sink callback + // are properly tracked for reactivity. The fix moves txToReactivityLog() + // to after the callback so that reads like JSON.stringify traversing + // through linked cells are captured in the subscription. + + // Create an inner cell that will be linked to + const innerCell = runtime.getCell<{ value: string }>( + space, + "sink-callback-reads-inner", + undefined, + tx, + ); + innerCell.set({ value: "initial" }); + + // Create a container cell with schema: true (no validation, raw access) + // that contains a link to the inner cell + const containerCell = runtime.getCell<{ nested: unknown }>( + space, + "sink-callback-reads-container", + true, // schema: true means no schema validation + tx, + ); + containerCell.setRaw({ + nested: innerCell.getAsLink(), + }); + + tx.commit(); + tx = runtime.edit(); + + // Track callback invocations - use JSON.stringify to force reading + // through the link during the callback + const callbackResults: string[] = []; + const cancel = containerCell.sink((value) => { + // This read through the linked cell happens DURING the callback. + // Before the fix, this read wasn't tracked, so changes to innerCell + // wouldn't trigger this sink to re-run. + const serialized = JSON.stringify(value); + callbackResults.push(serialized); + }); + + // Should have been called once with initial value + expect(callbackResults.length).toBe(1); + expect(callbackResults[0]).toContain("initial"); + + // Now update the inner cell + innerCell.withTx(tx).set({ value: "updated" }); + tx.commit(); + tx = runtime.edit(); + + await runtime.idle(); + + // The sink should have been triggered again because we read through + // the link during the callback + expect(callbackResults.length).toBe(2); + expect(callbackResults[1]).toContain("updated"); + + cancel(); + }); + it("behaves correctly when setting a cell to itself", () => { const c = runtime.getCell<{ a: number }>( space, From 9d2039389f069a8eae119a4e70625492fad34058 Mon Sep 17 00:00:00 2001 From: Ben Follington <5009316+bfollington@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:46:33 +1000 Subject: [PATCH 4/7] Allow overriding `index.md` URL for `listPatternIndex` (#2240) --- packages/patterns/common-tools.tsx | 7 ++++++- packages/patterns/pattern-index.tsx | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 packages/patterns/pattern-index.tsx diff --git a/packages/patterns/common-tools.tsx b/packages/patterns/common-tools.tsx index 2e9158422d..94d20f244e 100644 --- a/packages/patterns/common-tools.tsx +++ b/packages/patterns/common-tools.tsx @@ -11,6 +11,7 @@ import { ifElse, navigateTo, recipe, + wish, } from "commontools"; ///// COMMON TOOLS (get it?) //// @@ -270,8 +271,12 @@ type ListPatternIndexInput = Record; export const listPatternIndex = recipe( ({ _ }) => { + const patternIndexUrl = wish<{ url: string }>({ query: "#pattern-index" }); + const { pending, result } = fetchData({ - url: "/api/patterns/index.md", + url: computed(() => + patternIndexUrl.result.url ?? "/api/patterns/index.md" + ), mode: "text", }); return ifElse(computed(() => pending || !result), undefined, { result }); diff --git a/packages/patterns/pattern-index.tsx b/packages/patterns/pattern-index.tsx new file mode 100644 index 0000000000..04dac0b8da --- /dev/null +++ b/packages/patterns/pattern-index.tsx @@ -0,0 +1,23 @@ +/// +import { type Default, NAME, pattern, str, UI } from "commontools"; + +type Input = { url: Default }; + +/** A URL to a #pattern-index */ +type Output = { url: string }; + +const PatternIndexUrl = pattern( + ({ url }) => { + return { + [NAME]: str`Pattern Index: ${url}`, + [UI]: ( + + + + ), + url, + }; + }, +); + +export default PatternIndexUrl; From 3d2fcf8180abad43fe37f3256df5ceb3e5ccb56f Mon Sep 17 00:00:00 2001 From: gideon Date: Wed, 10 Dec 2025 09:21:27 +0800 Subject: [PATCH 5/7] initial draft implementation of action (#2204) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * initial draft implementation of action * fix Handler/HandlerFactory types to return Stream instead of OpaqueRef - Handler.with and HandlerFactory now correctly typed to return Stream - stream() function in opaque-ref.ts now returns Stream - ActionFunction now returns HandlerFactory - Schema generator detects Stream in callable return types for result schema This makes the types accurately reflect runtime behavior where handlers return Streams. Fixes action() result schema generation. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude * add runtime tests for action, fix missing { proxy: true } option --- packages/api/index.ts | 19 +- packages/runner/src/builder/factory.ts | 3 +- packages/runner/src/builder/module.ts | 33 ++- packages/runner/src/builder/opaque-ref.ts | 8 +- packages/runner/src/builder/types.ts | 2 + packages/runner/test/module.test.ts | 14 +- .../src/formatters/object-formatter.ts | 68 ++++++ packages/ts-transformers/src/ast/call-kind.ts | 1 + .../closures/strategies/action-strategy.ts | 200 ++++++++++++++++++ .../src/closures/transformer.ts | 2 + .../src/closures/utils/schema-factory.ts | 11 + .../closures/action-basic.expected.tsx | 42 ++++ .../fixtures/closures/action-basic.input.tsx | 12 ++ .../action-generic-event.expected.tsx | 54 +++++ .../closures/action-generic-event.input.tsx | 17 ++ .../closures/action-with-event.expected.tsx | 53 +++++ .../closures/action-with-event.input.tsx | 16 ++ .../complex-nested-types.expected.ts | 7 +- 18 files changed, 553 insertions(+), 9 deletions(-) create mode 100644 packages/ts-transformers/src/closures/strategies/action-strategy.ts create mode 100644 packages/ts-transformers/test/fixtures/closures/action-basic.expected.tsx create mode 100644 packages/ts-transformers/test/fixtures/closures/action-basic.input.tsx create mode 100644 packages/ts-transformers/test/fixtures/closures/action-generic-event.expected.tsx create mode 100644 packages/ts-transformers/test/fixtures/closures/action-generic-event.input.tsx create mode 100644 packages/ts-transformers/test/fixtures/closures/action-with-event.expected.tsx create mode 100644 packages/ts-transformers/test/fixtures/closures/action-with-event.input.tsx diff --git a/packages/api/index.ts b/packages/api/index.ts index af9fd0b374..921611f908 100644 --- a/packages/api/index.ts +++ b/packages/api/index.ts @@ -655,7 +655,7 @@ export type toJSON = { }; export type Handler = Module & { - with: (inputs: Opaque>) => OpaqueRef; + with: (inputs: Opaque>) => Stream; }; export type NodeFactory = @@ -674,7 +674,7 @@ export type ModuleFactory = & toJSON; export type HandlerFactory = - & ((inputs: Opaque>) => OpaqueRef) + & ((inputs: Opaque>) => Stream) & Handler & toJSON; @@ -1139,6 +1139,20 @@ export type HandlerFunction = { ): ModuleFactory, Stream>>; }; +/** + * ActionFunction creates a handler that doesn't use the state parameter. + * + * This is to handler as computed is to lift/derive: + * - User writes: action((e) => count.set(e.data)) + * - Transformer rewrites to: handler((e, { count }) => count.set(e.data))({ count }) + * + * The transformer extracts closures and makes them explicit, just like how + * computed(() => expr) becomes derive({}, () => expr) with closure extraction. + */ +export type ActionFunction = { + (fn: (event: T) => void): HandlerFactory; +}; + /** * DeriveFunction creates a reactive computation that transforms input values. * @@ -1354,6 +1368,7 @@ export declare const recipe: RecipeFunction; export declare const patternTool: PatternToolFunction; export declare const lift: LiftFunction; export declare const handler: HandlerFunction; +export declare const action: ActionFunction; /** @deprecated Use compute() instead */ export declare const derive: DeriveFunction; export declare const computed: ComputedFunction; diff --git a/packages/runner/src/builder/factory.ts b/packages/runner/src/builder/factory.ts index ba51d8cf3e..4a682fcb61 100644 --- a/packages/runner/src/builder/factory.ts +++ b/packages/runner/src/builder/factory.ts @@ -23,7 +23,7 @@ import { } from "./types.ts"; import { h } from "@commontools/html"; import { pattern, recipe } from "./recipe.ts"; -import { byRef, computed, derive, handler, lift } from "./module.ts"; +import { action, byRef, computed, derive, handler, lift } from "./module.ts"; import { compileAndRun, fetchData, @@ -84,6 +84,7 @@ export const createBuilder = (): { // Module creation lift, handler, + action, derive, computed, diff --git a/packages/runner/src/builder/module.ts b/packages/runner/src/builder/module.ts index 1576e27d2a..1d50ec8cff 100644 --- a/packages/runner/src/builder/module.ts +++ b/packages/runner/src/builder/module.ts @@ -10,6 +10,7 @@ import type { OpaqueRef, Schema, SchemaWithoutCell, + Stream, StripCell, toJSON, } from "./types.ts"; @@ -122,7 +123,7 @@ function handlerInternal( ); const module: Handler & toJSON & { - bind: (inputs: Opaque>) => OpaqueRef; + bind: (inputs: Opaque>) => Stream; } = { type: "javascript", implementation: handler, @@ -135,7 +136,7 @@ function handlerInternal( ...(schema !== undefined ? { argumentSchema: schema } : {}), }; - const factory = Object.assign((props: Opaque>): OpaqueRef => { + const factory = Object.assign((props: Opaque>): Stream => { const eventStream = stream(eventSchema); // Set stream marker (cast to E as stream is typed for the events it accepts) @@ -225,3 +226,31 @@ export function derive(...args: any[]): OpaqueRef { // unsafe closures: like derive, but doesn't need any arguments export const computed: (fn: () => T) => OpaqueRef = (fn: () => T) => lift(fn)(undefined); + +/** + * action: Creates a handler that doesn't use the state parameter. + * + * This is to handler as computed is to lift/derive: + * - User writes: action((e) => count.set(e.data)) + * - Transformer rewrites to: handler((e, { count }) => count.set(e.data))({ count }) + * + * The transformer extracts closures and makes them explicit, just like how + * computed(() => expr) becomes derive({}, () => expr) with closure extraction. + * + * NOTE: This function should never be called directly at runtime because the + * CTS transformer rewrites action() calls to handler() calls. If this function + * is reached, it means CTS is not enabled. + * + * @param _event - A function that receives an event and performs side effects + * @throws Error if called directly (CTS must be enabled for action() to work) + */ +export function action( + _event: (event: T) => void, +): HandlerFactory; +export function action( + _event: (event: T) => void, +): HandlerFactory { + throw new Error( + "action() must be used with CTS enabled - add /// to your file", + ); +} diff --git a/packages/runner/src/builder/opaque-ref.ts b/packages/runner/src/builder/opaque-ref.ts index 897111de81..f12fb44286 100644 --- a/packages/runner/src/builder/opaque-ref.ts +++ b/packages/runner/src/builder/opaque-ref.ts @@ -4,6 +4,7 @@ import { type Opaque, type OpaqueRef, type SchemaWithoutCell, + type Stream, } from "./types.ts"; import { getTopFrame } from "./recipe.ts"; import { createCell } from "../cell.ts"; @@ -83,6 +84,9 @@ export function opaqueRef( export function stream( schema?: JSONSchema, -): OpaqueRef { - return opaqueRefWithCell(undefined, schema, "stream"); +): Stream { + // The runtime creates a Stream cell, but opaqueRefWithCell is typed to return OpaqueRef + return opaqueRefWithCell(undefined, schema, "stream") as unknown as Stream< + T + >; } diff --git a/packages/runner/src/builder/types.ts b/packages/runner/src/builder/types.ts index 59004fe94a..4c646d4e04 100644 --- a/packages/runner/src/builder/types.ts +++ b/packages/runner/src/builder/types.ts @@ -2,6 +2,7 @@ import { isObject, type Mutable } from "@commontools/utils/types"; import type { SchemaContext } from "@commontools/memory/interface"; import type { + ActionFunction, AsCell, AsComparableCell, AsOpaqueCell, @@ -234,6 +235,7 @@ export interface BuilderFunctionsAndConstants { // Module creation lift: LiftFunction; handler: HandlerFunction; + action: ActionFunction; derive: DeriveFunction; computed: ComputedFunction; diff --git a/packages/runner/test/module.test.ts b/packages/runner/test/module.test.ts index 26612ba3bc..9bd7e48130 100644 --- a/packages/runner/test/module.test.ts +++ b/packages/runner/test/module.test.ts @@ -10,7 +10,7 @@ import { type Module, type OpaqueRef, } from "../src/builder/types.ts"; -import { handler, lift } from "../src/builder/module.ts"; +import { action, handler, lift } from "../src/builder/module.ts"; import { opaqueRef } from "../src/builder/opaque-ref.ts"; import { popFrame, pushFrame } from "../src/builder/recipe.ts"; import { Runtime } from "../src/runtime.ts"; @@ -243,4 +243,16 @@ describe("module", () => { expect([...nodes][0].inputs.$event).toBe(stream); }); }); + + describe("action function", () => { + it("throws error when called without CTS enabled", () => { + // action() should only be used with CTS enabled, which rewrites it to handler() + // When called directly at runtime (without CTS), it should throw an error + expect(() => { + action<{ data: string }>(({ data }) => { + void data; + }); + }).toThrow("action() must be used with CTS enabled"); + }); + }); }); diff --git a/packages/schema-generator/src/formatters/object-formatter.ts b/packages/schema-generator/src/formatters/object-formatter.ts index ccadd027ea..fcc0320a61 100644 --- a/packages/schema-generator/src/formatters/object-formatter.ts +++ b/packages/schema-generator/src/formatters/object-formatter.ts @@ -10,6 +10,7 @@ import { isFunctionLike, safeGetPropertyType, } from "../type-utils.ts"; +import { getCellWrapperInfo } from "../typescript/cell-brand.ts"; import type { SchemaGenerator } from "../schema-generator.ts"; import { extractDocFromSymbolAndDecls, getDeclDocs } from "../doc-utils.ts"; import { getLogger } from "@commontools/utils/logger"; @@ -20,6 +21,53 @@ const logger = getLogger("schema-generator.object", { level: "warn", }); +/** + * Check if a callable type (like ModuleFactory or HandlerFactory) returns a wrapper type. + * ModuleFactory when called returns OpaqueRef. + * If R is Stream, we should generate { asStream: true } instead of skipping. + * If R is Cell, we should generate { asCell: true } instead of skipping. + * + * Returns the schema definition for the wrapper if detected, undefined otherwise. + */ +function getWrapperSchemaFromCallable( + type: ts.Type, + checker: ts.TypeChecker, +): SchemaDefinition | undefined { + const callSignatures = type.getCallSignatures(); + if (callSignatures.length === 0) return undefined; + + // Get the return type of the first call signature + const callReturnType = callSignatures[0]!.getReturnType(); + + // Check if the return type is a wrapper (Stream, Cell, or OpaqueRef<...>) + const wrapperInfo = getCellWrapperInfo(callReturnType, checker); + if (wrapperInfo?.kind === "Stream") { + return { asStream: true }; + } + if (wrapperInfo?.kind === "Cell") { + return { asCell: true }; + } + + // Also check if it's an OpaqueRef wrapping a Stream or Cell + if (wrapperInfo?.kind === "OpaqueRef") { + // Get the inner type of OpaqueRef + const typeRef = wrapperInfo.typeRef; + const typeArgs = checker.getTypeArguments(typeRef); + if (typeArgs.length > 0) { + const innerType = typeArgs[0]!; + const innerWrapperInfo = getCellWrapperInfo(innerType, checker); + if (innerWrapperInfo?.kind === "Stream") { + return { asStream: true }; + } + if (innerWrapperInfo?.kind === "Cell") { + return { asCell: true }; + } + } + } + + return undefined; +} + /** * Check if a type is a union that includes undefined. * When a property type is `T | undefined`, the property is considered optional. @@ -135,6 +183,26 @@ export class ObjectFormatter implements TypeFormatter { ); if (isFunctionLike(resolvedPropType)) { + // Special case: ModuleFactory/HandlerFactory types that return Stream or Cell + // should generate { asStream: true } or { asCell: true } instead of being skipped + const wrapperSchema = getWrapperSchemaFromCallable( + resolvedPropType, + checker, + ); + if (wrapperSchema) { + // This is a factory that returns a wrapper type (Stream or Cell) + // Respect the same optional detection logic as regular properties + const hasOptionalFlag = (prop.flags & ts.SymbolFlags.Optional) !== 0; + const hasUndefinedUnion = isUnionWithUndefined(resolvedPropType); + const isDefaultWithUndefinedInner = isDefaultNodeWithUndefined( + propTypeNode, + checker, + ); + const isOptional = hasOptionalFlag || hasUndefinedUnion || + isDefaultWithUndefinedInner; + if (!isOptional) required.push(propName); + properties[propName] = wrapperSchema; + } continue; } diff --git a/packages/ts-transformers/src/ast/call-kind.ts b/packages/ts-transformers/src/ast/call-kind.ts index 4b2c2d42c2..8b6658b688 100644 --- a/packages/ts-transformers/src/ast/call-kind.ts +++ b/packages/ts-transformers/src/ast/call-kind.ts @@ -48,6 +48,7 @@ const BUILDER_SYMBOL_NAMES = new Set([ "recipe", "pattern", "handler", + "action", "lift", "computed", "render", diff --git a/packages/ts-transformers/src/closures/strategies/action-strategy.ts b/packages/ts-transformers/src/closures/strategies/action-strategy.ts new file mode 100644 index 0000000000..9f85bf6dd1 --- /dev/null +++ b/packages/ts-transformers/src/closures/strategies/action-strategy.ts @@ -0,0 +1,200 @@ +import ts from "typescript"; +import type { TransformationContext } from "../../core/mod.ts"; +import type { ClosureTransformationStrategy } from "./strategy.ts"; +import { detectCallKind, registerSyntheticCallType } from "../../ast/mod.ts"; +import { CaptureCollector } from "../capture-collector.ts"; +import { RecipeBuilder } from "../utils/recipe-builder.ts"; +import { SchemaFactory } from "../utils/schema-factory.ts"; +import { buildCapturePropertyAssignments } from "./map-strategy.ts"; +import { unwrapArrowFunction } from "../utils/ast-helpers.ts"; + +/** + * ActionStrategy transforms action() calls to handler() calls with explicit closures. + * + * This is to handler as computed is to lift/derive: + * - Input: action(() => count.set(count.get() + 1)) + * - Output: handler((_, { count }) => count.set(count.get() + 1))({ count }) + * + * The action callback takes zero or one parameters (optional event) and closes + * over scope variables. The transformer extracts these closures and makes them + * explicit as handler params. + * + * Examples: + * - action(() => doSomething()) β†’ no event, schema is false + * - action((e) => doSomething(e.target)) β†’ has event, schema is inferred + * + * ## Limitation: Arrow Functions Only + * + * Currently only arrow functions are supported, not function expressions. + * This matches the behavior of HandlerStrategy for JSX event handlers. + * + * Supported: action(() => count.set(count.get() + 1)) + * NOT supported: action(function() { count.set(count.get() + 1) }) + * + * To support function expressions in the future: + * 1. Update RecipeBuilder.buildHandlerCallback to accept FunctionExpression + * (currently typed as ArrowFunction only) + * 2. Update this strategy to use isFunctionLikeExpression instead of unwrapArrowFunction + * 3. Potentially update HandlerStrategy for consistency + * 4. Add test cases for function expression callbacks + */ +export class ActionStrategy implements ClosureTransformationStrategy { + canTransform( + node: ts.Node, + context: TransformationContext, + ): boolean { + return ts.isCallExpression(node) && isActionCall(node, context); + } + + transform( + node: ts.Node, + context: TransformationContext, + visitor: ts.Visitor, + ): ts.Node | undefined { + if (!ts.isCallExpression(node)) return undefined; + return transformActionCall(node, context, visitor); + } +} + +/** + * Check if a call expression is an action() call from commontools + */ +function isActionCall( + node: ts.CallExpression, + context: TransformationContext, +): boolean { + const callKind = detectCallKind(node, context.checker); + return callKind?.kind === "builder" && callKind.builderName === "action"; +} + +/** + * Extract the callback function from an action call. + * Action has one signature: action(callback) + * + * Note: Only arrow functions are supported (see class doc for limitation details). + */ +function extractActionCallback( + actionCall: ts.CallExpression, +): ts.ArrowFunction | undefined { + const args = actionCall.arguments; + + if (args.length === 1) { + const callback = args[0]; + if (callback) { + return unwrapArrowFunction(callback); + } + } + + return undefined; +} + +/** + * Transform an action call to a handler call with explicit closures. + * Converts: action(() => count.set(count.get() + 1)) + * To: handler((_, { count }) => count.set(count.get() + 1))({ count }) + */ +function transformActionCall( + actionCall: ts.CallExpression, + context: TransformationContext, + visitor: ts.Visitor, +): ts.CallExpression | undefined { + const { factory, checker } = context; + + // Extract callback + const callback = extractActionCallback(actionCall); + if (!callback) { + return undefined; + } + + // Recursively transform the callback body first + const transformedBody = ts.visitNode( + callback.body, + visitor, + ) as ts.ConciseBody; + + // Collect captures + const collector = new CaptureCollector(checker); + const { captureTree } = collector.analyze(callback); + + // Initialize RecipeBuilder + const builder = new RecipeBuilder(context); + builder.setCaptureTree(captureTree); + + // Determine event parameter name: + // - If callback has an event param, preserve its name + // - Otherwise use "_" to indicate unused + const eventParam = callback.parameters[0]; + const eventParamName = eventParam && ts.isIdentifier(eventParam.name) + ? eventParam.name.text + : "_"; + + // Build the handler callback with (event, params) signature + const handlerCallback = builder.buildHandlerCallback( + callback, + transformedBody, + eventParamName, + "__ct_action_params", + ); + + // Build type information for handler params using SchemaFactory + const schemaFactory = new SchemaFactory(context); + + // For action, event parameter is optional: + // - action(() => ...) β†’ event schema is `false` (never type) + // - action((e) => ...) β†’ event schema is inferred from the parameter + const eventTypeNode = callback.parameters.length > 0 + ? schemaFactory.createHandlerEventSchema(callback) + : schemaFactory.createActionEventSchema(); + + // State schema is based on captures + const stateTypeNode = schemaFactory.createHandlerStateSchema( + captureTree, + undefined, // no explicit state parameter in action + ); + + // Build the handler call: handler(callback) + const handlerExpr = context.ctHelpers.getHelperExpr("handler"); + const handlerCall = factory.createCallExpression( + handlerExpr, + [eventTypeNode, stateTypeNode], + [handlerCallback], + ); + + // Build the params object: { count, ... } + const paramProperties = buildCapturePropertyAssignments(captureTree, factory); + const paramsObject = factory.createObjectLiteralExpression( + paramProperties, + paramProperties.length > 0, + ); + + // Build the final call: handler(...)({ captures }) + const finalCall = factory.createCallExpression( + handlerCall, + undefined, + [paramsObject], + ); + + // Register the return type in the TypeRegistry for schema inference. + // This enables SchemaInjectionTransformer to correctly infer the pattern's result type + // when an action is returned as a property (e.g., return { inc: action(...) }). + // Without this registration, the synthetic handler call has no type information, + // resulting in an empty result schema for the pattern. + // + // Note: The action call has type `ModuleFactory>`, but the finalCall + // is `handler(...)({...})` which CALLS the factory. We need the return type of that call, + // which is `OpaqueRef>`. + const typeRegistry = context.options.typeRegistry; + if (typeRegistry) { + // Get the type of the original action call (ModuleFactory>) + const actionType = checker.getTypeAtLocation(actionCall); + // Get the call signature to find what type is returned when calling the factory + const callSignatures = actionType.getCallSignatures(); + if (callSignatures.length > 0) { + const callReturnType = callSignatures[0]!.getReturnType(); + // This should be OpaqueRef> - the type of calling handler(...)({...}) + registerSyntheticCallType(finalCall, callReturnType, typeRegistry); + } + } + + return finalCall; +} diff --git a/packages/ts-transformers/src/closures/transformer.ts b/packages/ts-transformers/src/closures/transformer.ts index 5636ffcc8b..3b28b22323 100644 --- a/packages/ts-transformers/src/closures/transformer.ts +++ b/packages/ts-transformers/src/closures/transformer.ts @@ -1,6 +1,7 @@ import ts from "typescript"; import { TransformationContext, Transformer } from "../core/mod.ts"; import { visitEachChildWithJsx } from "../ast/mod.ts"; +import { ActionStrategy } from "./strategies/action-strategy.ts"; import { MapStrategy } from "./strategies/map-strategy.ts"; import { DeriveStrategy } from "./strategies/derive-strategy.ts"; import { HandlerStrategy } from "./strategies/handler-strategy.ts"; @@ -21,6 +22,7 @@ function createClosureTransformVisitor( ): ts.Visitor { const strategies: ClosureTransformationStrategy[] = [ new HandlerStrategy(), + new ActionStrategy(), new MapStrategy(), new DeriveStrategy(), ]; diff --git a/packages/ts-transformers/src/closures/utils/schema-factory.ts b/packages/ts-transformers/src/closures/utils/schema-factory.ts index b2317d81d8..872f1a35cf 100644 --- a/packages/ts-transformers/src/closures/utils/schema-factory.ts +++ b/packages/ts-transformers/src/closures/utils/schema-factory.ts @@ -213,6 +213,17 @@ export class SchemaFactory { return factory.createTypeLiteralNode(typeElements); } + /** + * Build a TypeNode for action's event parameter. + * + * Actions don't use the event parameter, so we return `never` type + * which generates `false` in JSON Schema (no valid value). + */ + createActionEventSchema(): ts.TypeNode { + const { factory } = this.context; + return factory.createKeywordTypeNode(ts.SyntaxKind.NeverKeyword); + } + /** * Build a TypeNode for the handler event parameter and register it in TypeRegistry. */ diff --git a/packages/ts-transformers/test/fixtures/closures/action-basic.expected.tsx b/packages/ts-transformers/test/fixtures/closures/action-basic.expected.tsx new file mode 100644 index 0000000000..e1b7a9b668 --- /dev/null +++ b/packages/ts-transformers/test/fixtures/closures/action-basic.expected.tsx @@ -0,0 +1,42 @@ +import * as __ctHelpers from "commontools"; +import { Cell, pattern, action } from "commontools"; +interface State { + count: Cell; +} +export default pattern(({ count }) => { + return { + inc: __ctHelpers.handler(false as const satisfies __ctHelpers.JSONSchema, { + type: "object", + properties: { + count: { + type: "number", + asCell: true + } + }, + required: ["count"] + } as const satisfies __ctHelpers.JSONSchema, (_, { count }) => count.set(count.get() + 1))({ + count: count + }), + }; +}, { + type: "object", + properties: { + count: { + type: "number", + asCell: true + } + }, + required: ["count"] +} as const satisfies __ctHelpers.JSONSchema, { + type: "object", + properties: { + inc: { + asStream: true + } + }, + required: ["inc"] +} as const satisfies __ctHelpers.JSONSchema); +// @ts-ignore: Internals +function h(...args: any[]) { return __ctHelpers.h.apply(null, args); } +// @ts-ignore: Internals +h.fragment = __ctHelpers.h.fragment; diff --git a/packages/ts-transformers/test/fixtures/closures/action-basic.input.tsx b/packages/ts-transformers/test/fixtures/closures/action-basic.input.tsx new file mode 100644 index 0000000000..fb2ef2c4af --- /dev/null +++ b/packages/ts-transformers/test/fixtures/closures/action-basic.input.tsx @@ -0,0 +1,12 @@ +/// +import { Cell, pattern, action } from "commontools"; + +interface State { + count: Cell; +} + +export default pattern(({ count }) => { + return { + inc: action(() => count.set(count.get() + 1)), + }; +}); diff --git a/packages/ts-transformers/test/fixtures/closures/action-generic-event.expected.tsx b/packages/ts-transformers/test/fixtures/closures/action-generic-event.expected.tsx new file mode 100644 index 0000000000..12854a45fc --- /dev/null +++ b/packages/ts-transformers/test/fixtures/closures/action-generic-event.expected.tsx @@ -0,0 +1,54 @@ +import * as __ctHelpers from "commontools"; +import { Cell, pattern, action } from "commontools"; +interface MyEvent { + data: string; +} +interface State { + value: Cell; +} +export default pattern(({ value }) => { + return { + // Test action((e) => ...) variant (type parameter instead of inline annotation) + update: __ctHelpers.handler({ + type: "object", + properties: { + data: { + type: "string" + } + }, + required: ["data"] + } as const satisfies __ctHelpers.JSONSchema, { + type: "object", + properties: { + value: { + type: "string", + asCell: true + } + }, + required: ["value"] + } as const satisfies __ctHelpers.JSONSchema, (e, { value }) => value.set(e.data))({ + value: value + }), + }; +}, { + type: "object", + properties: { + value: { + type: "string", + asCell: true + } + }, + required: ["value"] +} as const satisfies __ctHelpers.JSONSchema, { + type: "object", + properties: { + update: { + asStream: true + } + }, + required: ["update"] +} as const satisfies __ctHelpers.JSONSchema); +// @ts-ignore: Internals +function h(...args: any[]) { return __ctHelpers.h.apply(null, args); } +// @ts-ignore: Internals +h.fragment = __ctHelpers.h.fragment; diff --git a/packages/ts-transformers/test/fixtures/closures/action-generic-event.input.tsx b/packages/ts-transformers/test/fixtures/closures/action-generic-event.input.tsx new file mode 100644 index 0000000000..34ef927a08 --- /dev/null +++ b/packages/ts-transformers/test/fixtures/closures/action-generic-event.input.tsx @@ -0,0 +1,17 @@ +/// +import { Cell, pattern, action } from "commontools"; + +interface MyEvent { + data: string; +} + +interface State { + value: Cell; +} + +export default pattern(({ value }) => { + return { + // Test action((e) => ...) variant (type parameter instead of inline annotation) + update: action((e) => value.set(e.data)), + }; +}); diff --git a/packages/ts-transformers/test/fixtures/closures/action-with-event.expected.tsx b/packages/ts-transformers/test/fixtures/closures/action-with-event.expected.tsx new file mode 100644 index 0000000000..31d1f784e0 --- /dev/null +++ b/packages/ts-transformers/test/fixtures/closures/action-with-event.expected.tsx @@ -0,0 +1,53 @@ +import * as __ctHelpers from "commontools"; +import { Cell, pattern, action } from "commontools"; +interface MyEvent { + data: string; +} +interface State { + value: Cell; +} +export default pattern(({ value }) => { + return { + update: __ctHelpers.handler({ + type: "object", + properties: { + data: { + type: "string" + } + }, + required: ["data"] + } as const satisfies __ctHelpers.JSONSchema, { + type: "object", + properties: { + value: { + type: "string", + asCell: true + } + }, + required: ["value"] + } as const satisfies __ctHelpers.JSONSchema, (e, { value }) => value.set(e.data))({ + value: value + }), + }; +}, { + type: "object", + properties: { + value: { + type: "string", + asCell: true + } + }, + required: ["value"] +} as const satisfies __ctHelpers.JSONSchema, { + type: "object", + properties: { + update: { + asStream: true + } + }, + required: ["update"] +} as const satisfies __ctHelpers.JSONSchema); +// @ts-ignore: Internals +function h(...args: any[]) { return __ctHelpers.h.apply(null, args); } +// @ts-ignore: Internals +h.fragment = __ctHelpers.h.fragment; diff --git a/packages/ts-transformers/test/fixtures/closures/action-with-event.input.tsx b/packages/ts-transformers/test/fixtures/closures/action-with-event.input.tsx new file mode 100644 index 0000000000..30f7d1791e --- /dev/null +++ b/packages/ts-transformers/test/fixtures/closures/action-with-event.input.tsx @@ -0,0 +1,16 @@ +/// +import { Cell, pattern, action } from "commontools"; + +interface MyEvent { + data: string; +} + +interface State { + value: Cell; +} + +export default pattern(({ value }) => { + return { + update: action((e: MyEvent) => value.set(e.data)), + }; +}); diff --git a/packages/ts-transformers/test/fixtures/handler-schema/complex-nested-types.expected.ts b/packages/ts-transformers/test/fixtures/handler-schema/complex-nested-types.expected.ts index 403c17c929..61befe0933 100644 --- a/packages/ts-transformers/test/fixtures/handler-schema/complex-nested-types.expected.ts +++ b/packages/ts-transformers/test/fixtures/handler-schema/complex-nested-types.expected.ts @@ -120,7 +120,12 @@ const _updateTags = handler({ export { userHandler }; export default recipe(false as const satisfies __ctHelpers.JSONSchema, { type: "object", - properties: {} + properties: { + userHandler: { + asStream: true + } + }, + required: ["userHandler"] } as const satisfies __ctHelpers.JSONSchema, () => { return { userHandler }; }); From cc268b89df4ae3b4959908d8f4f9a6c9a6019150 Mon Sep 17 00:00:00 2001 From: Bernhard Seefeld Date: Tue, 9 Dec 2025 17:51:47 -0800 Subject: [PATCH 6/7] refactor(memory): Move sentDocs tracking from per-subscription to per-session (#2241) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, each SchemaSubscription tracked its own `sentDocs` Set and `since` value to avoid sending duplicate documents. This was suboptimal because the same document could be sent multiple times on the same WebSocket session if requested by different subscriptions. Now we use the session-level `lastRevision` Map (which already existed for non-schema subscriptions) to track which documents have been sent and at what `since` value. This ensures that each document is only sent once per session, regardless of how many subscriptions request it. Changes: - Remove `since` and `sentDocs` fields from SchemaSubscription class - Simplify addSchemaSubscription to not track sent docs - Update getSchemaSubscriptionMatches to use session-level lastRevision - Let filterKnownFacts handle updating lastRevision when facts are sent πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude --- packages/memory/provider.ts | 51 ++++++++++--------------------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/packages/memory/provider.ts b/packages/memory/provider.ts index d7453d0cf6..601f94ff3f 100644 --- a/packages/memory/provider.ts +++ b/packages/memory/provider.ts @@ -153,15 +153,12 @@ class MemoryProvider< export class SchemaSubscription { constructor( public invocation: SchemaQuery, - public since: number = -1, // Track which docs were scanned with which schemas for incremental updates public schemaTracker: MapSet = new MapSet( deepEqual, ), // True if this is a wildcard query (of: "_") that can't use incremental updates public isWildcardQuery: boolean = false, - // Track which docs have been sent to the client (by address format) - public sentDocs: Set = new Set(), ) {} } @@ -551,34 +548,21 @@ class MemoryProviderSession< private addSchemaSubscription( of: JobId, invocation: SchemaQuery, - result: Selection, + _result: Selection, schemaTracker?: MapSet, ) { - const space = invocation.sub; - const factSelection = result[space]; - const factVersions = [...FactModule.iterate(factSelection)]; - const since = factVersions.reduce( - (acc, cur, _i) => cur.since > acc ? cur.since : acc, - -1, - ); - // Check if this is a wildcard query (of: "_") // Wildcard queries can't benefit from incremental updates via schemaTracker const isWildcardQuery = this.isWildcardQuery(invocation); - // Track which docs were sent in the initial query result - const sentDocs = new Set( - factVersions.map((fv) => this.formatAddress(space, fv)), - ); - const subscription = new SchemaSubscription( invocation, - since, schemaTracker ?? new MapSet(deepEqual), isWildcardQuery, - sentDocs, ); this.schemaChannels.set(of, subscription); + // Note: lastRevision is updated by filterKnownFacts when excludeSent is used, + // and by getSchemaSubscriptionMatches for subsequent updates } /** @@ -696,26 +680,19 @@ class MemoryProviderSession< space, ); - // Collect facts that are either newer or haven't been sent yet - let hasNewFacts = false; - for (const [address, factVersion] of result.newFacts) { - const isNewer = factVersion.since > subscription.since; - const notSentYet = !subscription.sentDocs.has(address); - - if (isNewer || notSentYet) { - schemaMatches.set(address, factVersion); - subscription.sentDocs.add(address); - hasNewFacts = true; - if (isNewer) { - subscription.since = factVersion.since; - } + // Collect facts that are newer than what we've sent on this session + // Note: we don't update lastRevision here - that happens in filterKnownFacts + // when facts are actually sent to the client + for (const [_address, factVersion] of result.newFacts) { + const factKey = this.toKey(factVersion); + const previousSince = this.lastRevision.get(factKey); + + if (previousSince === undefined || previousSince < factVersion.since) { + schemaMatches.set(factKey, factVersion); + lastId = id; + maxSince = Math.max(maxSince, factVersion.since); } } - - if (hasNewFacts) { - lastId = id; - maxSince = Math.max(maxSince, subscription.since); - } } return [lastId, maxSince, [...schemaMatches.values()]]; From a5237e7742c252a258760e818e9102bc9bb6344e Mon Sep 17 00:00:00 2001 From: gideon Date: Wed, 10 Dec 2025 10:00:46 +0800 Subject: [PATCH 7/7] =?UTF-8?q?fix(api):=20handle=20intersection=20types?= =?UTF-8?q?=20in=20OpaqueRef=20to=20prevent=20double-wr=E2=80=A6=20(#2238)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous approach used `T extends AnyBrandedCell` (without tuple wrapper) to handle intersection types like `OpaqueCell & Y`. However, this caused TypeScript to distribute over union types, which broke the null-preservation fix from commit 3f3de5515. For example, `string | null extends AnyBrandedCell` distributes to: - `string extends AnyBrandedCell` (evaluated separately) - `null extends AnyBrandedCell` (evaluated separately) This changed the type structure and caused schema generation to lose null. The correct approach handles nullable intersection types in the nullable handling section of OpaqueRefInner: `[NonNullable] extends [AnyBrandedCell] ? T` This: 1. Uses the tuple wrapper to prevent distribution 2. Strips null/undefined first with NonNullable 3. Then checks if the remaining type is a branded cell For `(OpaqueCell & X) | undefined`: - NonNullable gives `OpaqueCell & X` - Tuple check correctly identifies it as a branded cell - Returns T unchanged (preserving the union with undefined) πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --------- Co-authored-by: Claude Opus 4.5 --- packages/api/index.ts | 4 +- .../test/opaqueref-intersection.test.ts | 112 ++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 packages/runner/test/opaqueref-intersection.test.ts diff --git a/packages/api/index.ts b/packages/api/index.ts index 921611f908..3c348185c1 100644 --- a/packages/api/index.ts +++ b/packages/api/index.ts @@ -503,8 +503,10 @@ type OpaqueRefInner = [T] extends : [T] extends [Array] ? Array> : [T] extends [AnyBrandedCell] ? T : [T] extends [object] ? { [K in keyof T]: OpaqueRef } - // For nullable types (T | null | undefined), extract and map the non-null object/array part + // For nullable types (T | null | undefined), extract and map the non-null part : [NonNullable] extends [never] ? T + // Handle nullable branded cells (e.g., (OpaqueCell & X) | undefined) - don't wrap + : [NonNullable] extends [AnyBrandedCell] ? T : [NonNullable] extends [Array] ? Array> : [NonNullable] extends [object] ? { [K in keyof NonNullable]: OpaqueRef[K]> } diff --git a/packages/runner/test/opaqueref-intersection.test.ts b/packages/runner/test/opaqueref-intersection.test.ts new file mode 100644 index 0000000000..fa9192a6a8 --- /dev/null +++ b/packages/runner/test/opaqueref-intersection.test.ts @@ -0,0 +1,112 @@ +/** + * Regression test for OpaqueRef intersection type handling. + * + * This is a minimal reproduction of the type error from + * community-patterns/patterns/jkomoros/components/search-select-prototype.tsx + * + * Without the fix, this pattern fails to compile with: + * Type 'OpaqueCell<{ value: OpaqueCell & string; ... }> & {...}' + * is not assignable to type 'NormalizedItem'. + * Types of property 'group' are incompatible. + * + * The fix adds `[NonNullable] extends [AnyBrandedCell]` check to + * OpaqueRefInner to handle nullable intersection types correctly without + * causing distribution over union types (which would lose null/undefined). + * + * NOTE: This is a type-level test. The assertions at runtime are trivial; + * the real test is that this file compiles successfully. + */ +import { describe, it } from "@std/testing/bdd"; +import { expect } from "@std/expect"; +import type { OpaqueCell, OpaqueRef } from "@commontools/api"; + +interface NormalizedItem { + value: string; + label: string; + group?: string; +} + +describe("OpaqueRef intersection type handling", () => { + it("should not double-wrap properties that are already intersection types", () => { + // This reproduces the pattern from search-select-prototype.tsx: + // + // 1. props.items comes from pattern props (already OpaqueRef wrapped) + // 2. normalizedItems = computed(() => items.map(item => ({ + // value: item.value, // OpaqueCell & string + // label: item.label, // OpaqueCell & string + // group: item.group, // (OpaqueCell & string) | undefined + // }))) + // The mapped result has properties that ARE ALREADY INTERSECTION TYPES + // + // 3. computed() wraps this with OpaqueRef + // + // 4. itemLookup iterates normalizedItems, assigning items to Record + // + // THE BUG: When OpaqueRef processes MappedItem, it wraps the already-wrapped + // properties AGAIN, creating nested OpaqueCell types that aren't assignable. + + // Simulate what map() produces when iterating OpaqueRef-wrapped items: + // Properties are already intersection types from the source OpaqueRef + type MappedItem = { + value: OpaqueCell & string; + label: OpaqueCell & string; + group?: (OpaqueCell & string) | undefined; + }; + + // computed() wraps the result with OpaqueRef + type NormalizedItems = OpaqueRef; + + // When we iterate, we get elements of this type + type NormalizedElement = NormalizedItems extends Array ? U : never; + + // THE CRITICAL TYPE TEST: This function signature compiles only if + // NormalizedElement is assignable to NormalizedItem. + // + // Without the fix, item.group has type: + // OpaqueCell<(OpaqueCell & string) | undefined> & {...} + // which is NOT assignable to string | undefined + // + // With the fix, item.group keeps the original type: + // (OpaqueCell & string) | undefined + // which IS assignable to string | undefined + function assignToLookup(item: NormalizedElement) { + const lookup: Record = {}; + lookup[item.value] = item; + return lookup; + } + + // Runtime assertion is trivial - the type check is what matters + expect(assignToLookup).toBeDefined(); + }); + + it("should allow string methods on intersection type properties", () => { + // Same setup: properties are already intersection types from OpaqueRef + type MappedItem = { + value: OpaqueCell & string; + label: OpaqueCell & string; + group?: (OpaqueCell & string) | undefined; + }; + + type NormalizedItems = OpaqueRef; + type NormalizedElement = NormalizedItems extends Array ? U : never; + + // THE CRITICAL TYPE TEST: This function signature compiles only if + // item.group has string methods available. + // + // Without the fix, item.group has type OpaqueCell<...> which has no + // call signatures for toLowerCase + // + // With the fix, item.group keeps the intersection type where string + // methods are available + function filterItems(items: NormalizedElement[], q: string) { + return items.filter( + (item) => + item.label.toLowerCase().includes(q) || + (item.group?.toLowerCase().includes(q) ?? false), + ); + } + + // Runtime assertion is trivial - the type check is what matters + expect(filterItems).toBeDefined(); + }); +});