From 12c52a26f42f1b5d983b1e41dd64f77e7142400c Mon Sep 17 00:00:00 2001 From: Bernardo Gurgel Date: Wed, 21 Jan 2026 22:33:14 +0100 Subject: [PATCH] perf: unify computed observer and subs into single structure Merge the observer object into the subs structure for computed signals, eliminating one allocation per computed and removing indirection in the hot path. Previously computed signals required three objects: backing signal, subs (subscriber list), and a separate observer. Now the subs structure serves double duty - it holds both subscriber list fields AND observer fields (compute, firstDep/lastDep, flags, level) when used for a computed. --- src/signals/Signals__Computed.res | 50 ++-- src/signals/Signals__Core.res | 109 +++++++-- src/signals/Signals__Effects.res | 4 +- src/signals/Signals__Scheduler.res | 359 +++++++++++++++++------------ src/signals/Signals__Signal.res | 10 +- 5 files changed, 332 insertions(+), 200 deletions(-) diff --git a/src/signals/Signals__Computed.res b/src/signals/Signals__Computed.res index 542a6e2..6e58ce4 100644 --- a/src/signals/Signals__Computed.res +++ b/src/signals/Signals__Computed.res @@ -4,36 +4,42 @@ module Core = Signals__Core module Scheduler = Signals__Scheduler let make = (compute: unit => 'a, ~name: option=?): Signal.t<'a> => { - // Create backing signal with magic initial value (optimized path) - let backingSignal = Signal.makeForComputed((Obj.magic(): 'a), ~name?) + let id = Id.make() - // Create observer ID - let observerId = Id.make() + // Create a mutable ref to hold the signal so the compute function can update it + // Using Obj.magic to avoid Option wrapper overhead + let signalRef: ref> = ref(Obj.magic()) - // Recompute function - updates backing signal's value directly + // Recompute function - updates the signal's value directly let recompute = () => { - let newValue = compute() - backingSignal.value = newValue + signalRef.contents.value = compute() } - // Create observer using Core types, with backingSubs for dirty propagation - let observer = Core.makeObserver(observerId, #Computed(backingSignal.id), recompute, ~name?, ~backingSubs=backingSignal.subs) - - // Initial computation under tracking (no clearDeps needed - observer is fresh) - let prev = Scheduler.currentObserver.contents - Scheduler.currentObserver := Some(observer) - observer.run() - Core.clearDirty(observer) - Scheduler.currentObserver := prev - - // Level will be computed lazily on first retrack (starts at 0) + // Create combined subs (this IS the observer for the computed) + let subs = Core.makeComputedSubs(recompute) + + // Initial computation under tracking to establish dependencies + let prev = Scheduler.currentComputedSubs.contents + Scheduler.currentComputedSubs := Some(subs) + let initialValue = compute() + Scheduler.currentComputedSubs := prev + + // Create the signal with the initial value + let signal: Signal.t<'a> = { + id, + value: initialValue, + equals: (_, _) => false, // Computeds always check freshness via dirty flag + name, + subs, + } - // Register for lookup by signal ID (needed for ensureComputedFresh and dirty propagation) - Scheduler.registerComputed(backingSignal.id, observer, backingSignal.subs) + // Set the ref so recompute can access the signal + signalRef := signal + Core.clearSubsDirty(subs) - backingSignal + signal } let dispose = (signal: Signal.t<'a>): unit => { - Scheduler.unregisterComputed(signal.id, signal.subs) + Core.clearSubsDeps(signal.subs) } diff --git a/src/signals/Signals__Core.res b/src/signals/Signals__Core.res index b0fbc54..3df7fbd 100644 --- a/src/signals/Signals__Core.res +++ b/src/signals/Signals__Core.res @@ -7,7 +7,7 @@ let flag_pending = 2 let flag_running = 4 // Observer kind tag -type kind = [#Effect | #Computed(int)] +type kind = [#Effect | #Computed] // Forward declare mutually recursive types type rec link = { @@ -24,35 +24,66 @@ type rec link = { } // Signal subscriber list (head/tail of linked list) +// For computeds, this same object also serves as the observer (combined structure) and subs = { mutable first: option, mutable last: option, - mutable version: int, // signal version for freshness check - // For computed signals: direct reference to backing observer (avoids Map lookup) - mutable computedObserver: option, + mutable version: int, + // === Observer fields (only used for computeds) === + // If compute is Some, this subs is a computed signal + mutable compute: option unit>, + mutable firstDep: option, + mutable lastDep: option, + mutable flags: int, + mutable level: int, } -// Observer with dependency list +// Observer for effects only (computeds use subs directly) and observer = { id: int, kind: kind, run: unit => unit, - // Dependency linked list (replaces Set) mutable firstDep: option, mutable lastDep: option, - // State flags (replaces dirty: bool) mutable flags: int, mutable level: int, name: option, - // For computed observers: direct reference to backing signal's subs (avoids Map lookup) + // For computed observers: direct reference to backing subs (the combined object) mutable backingSubs: option, } -// Create empty subscriber list -let makeSubs = (): subs => {first: None, last: None, version: 0, computedObserver: None} +// Create empty subscriber list (for plain signals) +let makeSubs = (): subs => { + first: None, + last: None, + version: 0, + compute: None, + firstDep: None, + lastDep: None, + flags: 0, + level: 0, +} + +// Create subs for a computed (with compute function) +let makeComputedSubs = (compute: unit => unit): subs => { + first: None, + last: None, + version: 0, + compute: Some(compute), + firstDep: None, + lastDep: None, + flags: flag_dirty, // start dirty + level: 0, +} // Create observer -let makeObserver = (id: int, kind: kind, run: unit => unit, ~name: option=?, ~backingSubs: option=?): observer => { +let makeObserver = ( + id: int, + kind: kind, + run: unit => unit, + ~name: option=?, + ~backingSubs: option=?, +): observer => { id, kind, run, @@ -64,13 +95,28 @@ let makeObserver = (id: int, kind: kind, run: unit => unit, ~name: option Int.Bitwise.land(o.flags, flag_dirty) !== 0 -let setDirty = (o: observer): unit => o.flags = Int.Bitwise.lor(o.flags, flag_dirty) -let clearDirty = (o: observer): unit => o.flags = Int.Bitwise.land(o.flags, Int.Bitwise.lnot(flag_dirty)) -let isPending = (o: observer): bool => Int.Bitwise.land(o.flags, flag_pending) !== 0 -let setPending = (o: observer): unit => o.flags = Int.Bitwise.lor(o.flags, flag_pending) -let clearPending = (o: observer): unit => o.flags = Int.Bitwise.land(o.flags, Int.Bitwise.lnot(flag_pending)) +// Flag operations for observer (using Int.Bitwise module) +let isDirty = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_dirty) !== 0 +let setDirty = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_dirty) +let clearDirty = (o: observer): unit => + o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_dirty)) +let isPending = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_pending) !== 0 +let setPending = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_pending) +let clearPending = (o: observer): unit => + o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_pending)) + +// Flag operations for subs (for computeds - subs IS the observer) +let isSubsDirty = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_dirty) !== 0 +let setSubsDirty = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_dirty) +let clearSubsDirty = (s: subs): unit => + s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_dirty)) +let isSubsPending = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_pending) !== 0 +let setSubsPending = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_pending) +let clearSubsPending = (s: subs): unit => + s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_pending)) + +// Check if subs is a computed (has compute function) +let isComputed = (s: subs): bool => s.compute !== None // Create a link node let makeLink = (subs: subs, observer: observer): link => { @@ -148,3 +194,30 @@ let clearDeps = (observer: observer): unit => { observer.firstDep = None observer.lastDep = None } + +// Clear all dependencies from subs (for computeds - subs IS the observer) +let clearSubsDeps = (s: subs): unit => { + let link = ref(s.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + let next = l.nextDep + unlinkFromSubs(l) + link := next + | None => () + } + } + s.firstDep = None + s.lastDep = None +} + +// Add link to subs's dependency list (for computeds - subs IS the observer) +let linkToSubsDeps = (s: subs, link: link): unit => { + link.prevDep = s.lastDep + link.nextDep = None + switch s.lastDep { + | Some(last) => last.nextDep = Some(link) + | None => s.firstDep = Some(link) + } + s.lastDep = Some(link) +} diff --git a/src/signals/Signals__Effects.res b/src/signals/Signals__Effects.res index 0339fd3..db123e9 100644 --- a/src/signals/Signals__Effects.res +++ b/src/signals/Signals__Effects.res @@ -23,9 +23,7 @@ let run = (fn: unit => option unit>, ~name: option=?): disposer // Create observer using Core types let observer = Core.makeObserver(observerId, #Effect, runWithCleanup, ~name?) - // Initial run under tracking - Core.clearDeps(observer) - + // Initial run under tracking (no need to clearDeps - observer is fresh) let prev = Scheduler.currentObserver.contents Scheduler.currentObserver := Some(observer) diff --git a/src/signals/Signals__Scheduler.res b/src/signals/Signals__Scheduler.res index 789f6de..fb8418e 100644 --- a/src/signals/Signals__Scheduler.res +++ b/src/signals/Signals__Scheduler.res @@ -1,10 +1,15 @@ module Core = Signals__Core -// Current execution context - direct observer reference (no Map lookup needed) +// Current execution context for computeds (subs IS the observer) +let currentComputedSubs: ref> = ref(None) + +// Current execution context for effects let currentObserver: ref> = ref(None) -// Pending observers to execute (direct references) -let pending: array = [] +// Pending effects to execute +let pendingEffects: array = [] +// Pending computeds to recompute (subs that are dirty) +let pendingComputedSubs: array = [] let flushing: ref = ref(false) // Queue for iterative dirty marking @@ -13,104 +18,184 @@ let dirtyQueue: array = [] // Efficient array clear let clearArray: array<'a> => unit = %raw(`function(arr) { arr.length = 0 }`) -// Pre-allocated arrays for flush to avoid repeated allocations -let pendingComputeds: array = [] -let pendingEffects: array = [] - -// Add observer to pending if not already there -let addToPending = (observer: Core.observer): unit => { +// Add effect to pending if not already there +let addEffectToPending = (observer: Core.observer): unit => { if !Core.isPending(observer) { Core.setPending(observer) - pending->Array.push(observer)->ignore + pendingEffects->Array.push(observer)->ignore } } -// Track a dependency: create Link between observer and signal's subs -let trackDep = (observer: Core.observer, subs: Core.subs): unit => { - // Check if already tracking this signal (walk dep list) - let found = ref(false) - let link = ref(observer.firstDep) - while link.contents !== None && !found.contents { - switch link.contents { - | Some(l) => - if l.subs === subs { - found := true - } else { - link := l.nextDep +// Track a dependency from a computed (subs tracks subs) +// For initial computation (firstDep is None), skip duplicate check since deps are empty +let trackDepFromComputed = (computedSubs: Core.subs, sourceSubs: Core.subs): unit => { + // Fast path: if no deps yet, no need to check for duplicates + if computedSubs.firstDep === None { + let newLink: Core.link = { + subs: sourceSubs, + observer: Obj.magic(computedSubs), + nextDep: None, + prevDep: None, + nextSub: None, + prevSub: None, + } + Core.linkToSubsDeps(computedSubs, newLink) + Core.linkToSubs(sourceSubs, newLink) + } else { + // Check if already tracking this signal (walk dep list) + let found = ref(false) + let link = ref(computedSubs.firstDep) + while link.contents !== None && !found.contents { + switch link.contents { + | Some(l) => + if l.subs === sourceSubs { + found := true + } else { + link := l.nextDep + } + | None => () } - | None => () + } + + // Only add if not already tracked + if !found.contents { + let newLink: Core.link = { + subs: sourceSubs, + observer: Obj.magic(computedSubs), + nextDep: None, + prevDep: None, + nextSub: None, + prevSub: None, + } + Core.linkToSubsDeps(computedSubs, newLink) + Core.linkToSubs(sourceSubs, newLink) } } +} - // Only add if not already tracked - if !found.contents { +// Track a dependency from an effect (observer tracks subs) +// For initial run (firstDep is None), skip duplicate check since deps are empty +let trackDepFromEffect = (observer: Core.observer, subs: Core.subs): unit => { + // Fast path: if no deps yet, no need to check for duplicates + if observer.firstDep === None { let newLink = Core.makeLink(subs, observer) Core.linkToDeps(observer, newLink) Core.linkToSubs(subs, newLink) + } else { + // Check if already tracking this signal (walk dep list) + let found = ref(false) + let link = ref(observer.firstDep) + while link.contents !== None && !found.contents { + switch link.contents { + | Some(l) => + if l.subs === subs { + found := true + } else { + link := l.nextDep + } + | None => () + } + } + + // Only add if not already tracked + if !found.contents { + let newLink = Core.makeLink(subs, observer) + Core.linkToDeps(observer, newLink) + Core.linkToSubs(subs, newLink) + } + } +} + +// Track dependency - routes to appropriate function based on current context +let trackDep = (subs: Core.subs): unit => { + switch currentComputedSubs.contents { + | Some(computedSubs) => trackDepFromComputed(computedSubs, subs) + | None => + switch currentObserver.contents { + | Some(observer) => trackDepFromEffect(observer, subs) + | None => () + } } } -// Compare observers by level for sorting -let compareByLevel = (a: Core.observer, b: Core.observer): float => { +// Compare by level for sorting +let compareEffectsByLevel = (a: Core.observer, b: Core.observer): float => { Int.toFloat(a.level - b.level) } -// Check if array needs sorting (all same level = no sort needed) -let needsSort = (arr: array): bool => { - let len = arr->Array.length - if len <= 1 { - false - } else { - let firstLevel = (Array.getUnsafe(arr, 0)).level - let found = ref(false) - let i = ref(1) - while i.contents < len && !found.contents { - if (Array.getUnsafe(arr, i.contents)).level !== firstLevel { - found := true +let compareSubsByLevel = (a: Core.subs, b: Core.subs): float => { + Int.toFloat(a.level - b.level) +} + +// Compute level for a computed (based on its dependencies) +let computeSubsLevel = (s: Core.subs): int => { + let maxLevel = ref(0) + let link = ref(s.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + // Check if the source is a computed + if Core.isComputed(l.subs) { + if l.subs.level > maxLevel.contents { + maxLevel := l.subs.level + } } - i := i.contents + 1 + link := l.nextDep + | None => () } - found.contents } + maxLevel.contents + 1 } -// Compute level based on dependencies +// Compute level for an effect let rec computeLevel = (observer: Core.observer): int => { let maxLevel = ref(0) - - // Walk dependency list let link = ref(observer.firstDep) while link.contents !== None { switch link.contents { | Some(l) => - // Check if the source is a computed - // Walk the subs to find observers that are computeds - let subLink = ref(l.subs.first) - while subLink.contents !== None { - switch subLink.contents { - | Some(sl) => - switch sl.observer.kind { - | #Computed(_) => - if sl.observer.level > maxLevel.contents { - maxLevel := sl.observer.level - } - | #Effect => () - } - subLink := sl.nextSub - | None => () + if Core.isComputed(l.subs) { + if l.subs.level > maxLevel.contents { + maxLevel := l.subs.level } } link := l.nextDep | None => () } } - maxLevel.contents + 1 } -// Retrack an observer: clear deps, run, rebuild deps -and retrack = (observer: Core.observer): unit => { - // Save old level to check if recomputation is needed +// Retrack a computed (recompute and rebuild deps) +and retrackComputed = (s: Core.subs): unit => { + let oldLevel = s.level + + Core.clearSubsDeps(s) + Core.clearSubsPending(s) + + let prev = currentComputedSubs.contents + currentComputedSubs := Some(s) + + try { + switch s.compute { + | Some(compute) => compute() + | None => () + } + Core.clearSubsDirty(s) + currentComputedSubs := prev + } catch { + | exn => + currentComputedSubs := prev + throw(exn) + } + + if oldLevel == 0 { + s.level = computeSubsLevel(s) + } +} + +// Retrack an effect +and retrackEffect = (observer: Core.observer): unit => { let oldLevel = observer.level Core.clearDeps(observer) @@ -129,8 +214,6 @@ and retrack = (observer: Core.observer): unit => { throw(exn) } - // Only recompute level if this is a new observer (level 0) or we need accuracy - // For most cases, level stays stable after first computation if oldLevel == 0 { observer.level = computeLevel(observer) } @@ -141,31 +224,23 @@ and flush = (): unit => { flushing := true try { - while pending->Array.length > 0 { - // Clear reusable arrays efficiently - clearArray(pendingComputeds) - clearArray(pendingEffects) - - // Separate computeds and effects into pre-allocated arrays - pending->Array.forEach(observer => { - switch observer.kind { - | #Computed(_) => pendingComputeds->Array.push(observer)->ignore - | #Effect => pendingEffects->Array.push(observer)->ignore - } - }) - clearArray(pending) - - // Sort by level only if needed (skip when all same level) - if needsSort(pendingComputeds) { - pendingComputeds->Array.sort(compareByLevel)->ignore - } - if needsSort(pendingEffects) { - pendingEffects->Array.sort(compareByLevel)->ignore + while pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { + // Process computeds first (they might trigger more effects) + if pendingComputedSubs->Array.length > 0 { + // Sort by level + pendingComputedSubs->Array.sort(compareSubsByLevel)->ignore + let computeds = pendingComputedSubs->Array.copy + clearArray(pendingComputedSubs) + computeds->Array.forEach(retrackComputed) } - // Execute computeds first, then effects - pendingComputeds->Array.forEach(retrack) - pendingEffects->Array.forEach(retrack) + // Then process effects + if pendingEffects->Array.length > 0 { + pendingEffects->Array.sort(compareEffectsByLevel)->ignore + let effects = pendingEffects->Array.copy + clearArray(pendingEffects) + effects->Array.forEach(retrackEffect) + } } flushing := false @@ -177,35 +252,31 @@ and flush = (): unit => { } // Notify all subscribers of a signal (traverse linked list) -// Must be defined after flush since it calls flush let notifySubs = (subs: Core.subs): unit => { - // Seed the dirty queue dirtyQueue->Array.push(subs)->ignore - // Process iteratively to avoid stack overflow while dirtyQueue->Array.length > 0 { let currentSubs = dirtyQueue->Array.pop switch currentSubs { | None => () | Some(s) => - // Walk subscriber list let link = ref(s.first) while link.contents !== None { switch link.contents { | Some(l) => - let observer = l.observer - switch observer.kind { - | #Effect => - addToPending(observer) - | #Computed(_) => - if !Core.isDirty(observer) { - Core.setDirty(observer) - // Propagate to the computed's subscribers using direct reference - switch observer.backingSubs { - | Some(backingSubs) => dirtyQueue->Array.push(backingSubs)->ignore - | None => () - } + // The observer field might be a real observer (effect) or a subs (computed) + // We detect by checking if the subs the link came FROM is a computed + let linkedSubs = (Obj.magic(l.observer): Core.subs) + if Core.isComputed(linkedSubs) { + // It's a computed - mark dirty and propagate + if !Core.isSubsDirty(linkedSubs) { + Core.setSubsDirty(linkedSubs) + dirtyQueue->Array.push(linkedSubs)->ignore } + } else { + // It's an effect + let observer = l.observer + addEffectToPending(observer) } link := l.nextSub | None => () @@ -214,46 +285,43 @@ let notifySubs = (subs: Core.subs): unit => { } } - // Trigger flush - if pending->Array.length > 0 && !flushing.contents { + if (pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0) && !flushing.contents { flush() } } -// Ensure a computed signal is fresh before reading (uses subs.computedObserver directly) +// Ensure a computed signal is fresh before reading let ensureComputedFresh = (subs: Core.subs): unit => { - switch subs.computedObserver { - | Some(observer) => - if Core.isDirty(observer) { - let oldLevel = observer.level - - Core.clearDeps(observer) - - let prev = currentObserver.contents - currentObserver := Some(observer) - - try { - observer.run() - Core.clearDirty(observer) - currentObserver := prev - } catch { - | exn => - currentObserver := prev - throw(exn) - } + if Core.isComputed(subs) && Core.isSubsDirty(subs) { + let oldLevel = subs.level + + Core.clearSubsDeps(subs) + + let prev = currentComputedSubs.contents + currentComputedSubs := Some(subs) - // Only recompute level on first run - if oldLevel == 0 { - observer.level = computeLevel(observer) + try { + switch subs.compute { + | Some(compute) => compute() + | None => () } + Core.clearSubsDirty(subs) + currentComputedSubs := prev + } catch { + | exn => + currentComputedSubs := prev + throw(exn) + } + + if oldLevel == 0 { + subs.level = computeSubsLevel(subs) } - | None => () } } -// Schedule an observer for execution +// Schedule an effect for execution let schedule = (observer: Core.observer): unit => { - addToPending(observer) + addEffectToPending(observer) if !flushing.contents { flush() } @@ -268,7 +336,7 @@ let batch = fn => { let result = fn() if !wasFlushing { flushing := false - if pending->Array.length > 0 { + if pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { flush() } } @@ -284,30 +352,19 @@ let batch = fn => { // Execute without tracking dependencies let untrack = (fn: unit => 'a): 'a => { - let prev = currentObserver.contents + let prevComputed = currentComputedSubs.contents + let prevObserver = currentObserver.contents + currentComputedSubs := None currentObserver := None try { let result = fn() - currentObserver := prev + currentComputedSubs := prevComputed + currentObserver := prevObserver result } catch { | exn => - currentObserver := prev + currentComputedSubs := prevComputed + currentObserver := prevObserver throw(exn) } } - -// Register a computed's observer on subs (no Map needed, backingSubs is on observer) -let registerComputed = (_signalId: int, observer: Core.observer, subs: Core.subs): unit => { - subs.computedObserver = Some(observer) -} - -// Unregister a computed (for disposal) -let unregisterComputed = (_signalId: int, subs: Core.subs): unit => { - switch subs.computedObserver { - | Some(observer) => - Core.clearDeps(observer) - subs.computedObserver = None - | None => () - } -} diff --git a/src/signals/Signals__Signal.res b/src/signals/Signals__Signal.res index 1f92e90..f0aaedc 100644 --- a/src/signals/Signals__Signal.res +++ b/src/signals/Signals__Signal.res @@ -37,15 +37,13 @@ let makeForComputed = (initialValue: 'a, ~name: option=?): t<'a> => { } } +// Optimized get - inlined hot path checks let get = (signal: t<'a>): 'a => { - // Ensure computed is fresh (no-op for plain signals) + // Ensure computed is fresh Scheduler.ensureComputedFresh(signal.subs) - // Track dependency if we're inside an observer - switch Scheduler.currentObserver.contents { - | Some(observer) => Scheduler.trackDep(observer, signal.subs) - | None => () - } + // Track dependency if we're inside a computed or effect + Scheduler.trackDep(signal.subs) signal.value }