diff --git a/src/signals/Signals__Computed.res b/src/signals/Signals__Computed.res index 542a6e2..6e58ce4 100644 --- a/src/signals/Signals__Computed.res +++ b/src/signals/Signals__Computed.res @@ -4,36 +4,42 @@ module Core = Signals__Core module Scheduler = Signals__Scheduler let make = (compute: unit => 'a, ~name: option=?): Signal.t<'a> => { - // Create backing signal with magic initial value (optimized path) - let backingSignal = Signal.makeForComputed((Obj.magic(): 'a), ~name?) + let id = Id.make() - // Create observer ID - let observerId = Id.make() + // Create a mutable ref to hold the signal so the compute function can update it + // Using Obj.magic to avoid Option wrapper overhead + let signalRef: ref> = ref(Obj.magic()) - // Recompute function - updates backing signal's value directly + // Recompute function - updates the signal's value directly let recompute = () => { - let newValue = compute() - backingSignal.value = newValue + signalRef.contents.value = compute() } - // Create observer using Core types, with backingSubs for dirty propagation - let observer = Core.makeObserver(observerId, #Computed(backingSignal.id), recompute, ~name?, ~backingSubs=backingSignal.subs) - - // Initial computation under tracking (no clearDeps needed - observer is fresh) - let prev = Scheduler.currentObserver.contents - Scheduler.currentObserver := Some(observer) - observer.run() - Core.clearDirty(observer) - Scheduler.currentObserver := prev - - // Level will be computed lazily on first retrack (starts at 0) + // Create combined subs (this IS the observer for the computed) + let subs = Core.makeComputedSubs(recompute) + + // Initial computation under tracking to establish dependencies + let prev = Scheduler.currentComputedSubs.contents + Scheduler.currentComputedSubs := Some(subs) + let initialValue = compute() + Scheduler.currentComputedSubs := prev + + // Create the signal with the initial value + let signal: Signal.t<'a> = { + id, + value: initialValue, + equals: (_, _) => false, // Computeds always check freshness via dirty flag + name, + subs, + } - // Register for lookup by signal ID (needed for ensureComputedFresh and dirty propagation) - Scheduler.registerComputed(backingSignal.id, observer, backingSignal.subs) + // Set the ref so recompute can access the signal + signalRef := signal + Core.clearSubsDirty(subs) - backingSignal + signal } let dispose = (signal: Signal.t<'a>): unit => { - Scheduler.unregisterComputed(signal.id, signal.subs) + Core.clearSubsDeps(signal.subs) } diff --git a/src/signals/Signals__Core.res b/src/signals/Signals__Core.res index b0fbc54..3df7fbd 100644 --- a/src/signals/Signals__Core.res +++ b/src/signals/Signals__Core.res @@ -7,7 +7,7 @@ let flag_pending = 2 let flag_running = 4 // Observer kind tag -type kind = [#Effect | #Computed(int)] +type kind = [#Effect | #Computed] // Forward declare mutually recursive types type rec link = { @@ -24,35 +24,66 @@ type rec link = { } // Signal subscriber list (head/tail of linked list) +// For computeds, this same object also serves as the observer (combined structure) and subs = { mutable first: option, mutable last: option, - mutable version: int, // signal version for freshness check - // For computed signals: direct reference to backing observer (avoids Map lookup) - mutable computedObserver: option, + mutable version: int, + // === Observer fields (only used for computeds) === + // If compute is Some, this subs is a computed signal + mutable compute: option unit>, + mutable firstDep: option, + mutable lastDep: option, + mutable flags: int, + mutable level: int, } -// Observer with dependency list +// Observer for effects only (computeds use subs directly) and observer = { id: int, kind: kind, run: unit => unit, - // Dependency linked list (replaces Set) mutable firstDep: option, mutable lastDep: option, - // State flags (replaces dirty: bool) mutable flags: int, mutable level: int, name: option, - // For computed observers: direct reference to backing signal's subs (avoids Map lookup) + // For computed observers: direct reference to backing subs (the combined object) mutable backingSubs: option, } -// Create empty subscriber list -let makeSubs = (): subs => {first: None, last: None, version: 0, computedObserver: None} +// Create empty subscriber list (for plain signals) +let makeSubs = (): subs => { + first: None, + last: None, + version: 0, + compute: None, + firstDep: None, + lastDep: None, + flags: 0, + level: 0, +} + +// Create subs for a computed (with compute function) +let makeComputedSubs = (compute: unit => unit): subs => { + first: None, + last: None, + version: 0, + compute: Some(compute), + firstDep: None, + lastDep: None, + flags: flag_dirty, // start dirty + level: 0, +} // Create observer -let makeObserver = (id: int, kind: kind, run: unit => unit, ~name: option=?, ~backingSubs: option=?): observer => { +let makeObserver = ( + id: int, + kind: kind, + run: unit => unit, + ~name: option=?, + ~backingSubs: option=?, +): observer => { id, kind, run, @@ -64,13 +95,28 @@ let makeObserver = (id: int, kind: kind, run: unit => unit, ~name: option Int.Bitwise.land(o.flags, flag_dirty) !== 0 -let setDirty = (o: observer): unit => o.flags = Int.Bitwise.lor(o.flags, flag_dirty) -let clearDirty = (o: observer): unit => o.flags = Int.Bitwise.land(o.flags, Int.Bitwise.lnot(flag_dirty)) -let isPending = (o: observer): bool => Int.Bitwise.land(o.flags, flag_pending) !== 0 -let setPending = (o: observer): unit => o.flags = Int.Bitwise.lor(o.flags, flag_pending) -let clearPending = (o: observer): unit => o.flags = Int.Bitwise.land(o.flags, Int.Bitwise.lnot(flag_pending)) +// Flag operations for observer (using Int.Bitwise module) +let isDirty = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_dirty) !== 0 +let setDirty = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_dirty) +let clearDirty = (o: observer): unit => + o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_dirty)) +let isPending = (o: observer): bool => Int.bitwiseAnd(o.flags, flag_pending) !== 0 +let setPending = (o: observer): unit => o.flags = Int.bitwiseOr(o.flags, flag_pending) +let clearPending = (o: observer): unit => + o.flags = Int.bitwiseAnd(o.flags, Int.bitwiseNot(flag_pending)) + +// Flag operations for subs (for computeds - subs IS the observer) +let isSubsDirty = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_dirty) !== 0 +let setSubsDirty = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_dirty) +let clearSubsDirty = (s: subs): unit => + s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_dirty)) +let isSubsPending = (s: subs): bool => Int.bitwiseAnd(s.flags, flag_pending) !== 0 +let setSubsPending = (s: subs): unit => s.flags = Int.bitwiseOr(s.flags, flag_pending) +let clearSubsPending = (s: subs): unit => + s.flags = Int.bitwiseAnd(s.flags, Int.bitwiseNot(flag_pending)) + +// Check if subs is a computed (has compute function) +let isComputed = (s: subs): bool => s.compute !== None // Create a link node let makeLink = (subs: subs, observer: observer): link => { @@ -148,3 +194,30 @@ let clearDeps = (observer: observer): unit => { observer.firstDep = None observer.lastDep = None } + +// Clear all dependencies from subs (for computeds - subs IS the observer) +let clearSubsDeps = (s: subs): unit => { + let link = ref(s.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + let next = l.nextDep + unlinkFromSubs(l) + link := next + | None => () + } + } + s.firstDep = None + s.lastDep = None +} + +// Add link to subs's dependency list (for computeds - subs IS the observer) +let linkToSubsDeps = (s: subs, link: link): unit => { + link.prevDep = s.lastDep + link.nextDep = None + switch s.lastDep { + | Some(last) => last.nextDep = Some(link) + | None => s.firstDep = Some(link) + } + s.lastDep = Some(link) +} diff --git a/src/signals/Signals__Effects.res b/src/signals/Signals__Effects.res index 0339fd3..db123e9 100644 --- a/src/signals/Signals__Effects.res +++ b/src/signals/Signals__Effects.res @@ -23,9 +23,7 @@ let run = (fn: unit => option unit>, ~name: option=?): disposer // Create observer using Core types let observer = Core.makeObserver(observerId, #Effect, runWithCleanup, ~name?) - // Initial run under tracking - Core.clearDeps(observer) - + // Initial run under tracking (no need to clearDeps - observer is fresh) let prev = Scheduler.currentObserver.contents Scheduler.currentObserver := Some(observer) diff --git a/src/signals/Signals__Scheduler.res b/src/signals/Signals__Scheduler.res index 789f6de..fb8418e 100644 --- a/src/signals/Signals__Scheduler.res +++ b/src/signals/Signals__Scheduler.res @@ -1,10 +1,15 @@ module Core = Signals__Core -// Current execution context - direct observer reference (no Map lookup needed) +// Current execution context for computeds (subs IS the observer) +let currentComputedSubs: ref> = ref(None) + +// Current execution context for effects let currentObserver: ref> = ref(None) -// Pending observers to execute (direct references) -let pending: array = [] +// Pending effects to execute +let pendingEffects: array = [] +// Pending computeds to recompute (subs that are dirty) +let pendingComputedSubs: array = [] let flushing: ref = ref(false) // Queue for iterative dirty marking @@ -13,104 +18,184 @@ let dirtyQueue: array = [] // Efficient array clear let clearArray: array<'a> => unit = %raw(`function(arr) { arr.length = 0 }`) -// Pre-allocated arrays for flush to avoid repeated allocations -let pendingComputeds: array = [] -let pendingEffects: array = [] - -// Add observer to pending if not already there -let addToPending = (observer: Core.observer): unit => { +// Add effect to pending if not already there +let addEffectToPending = (observer: Core.observer): unit => { if !Core.isPending(observer) { Core.setPending(observer) - pending->Array.push(observer)->ignore + pendingEffects->Array.push(observer)->ignore } } -// Track a dependency: create Link between observer and signal's subs -let trackDep = (observer: Core.observer, subs: Core.subs): unit => { - // Check if already tracking this signal (walk dep list) - let found = ref(false) - let link = ref(observer.firstDep) - while link.contents !== None && !found.contents { - switch link.contents { - | Some(l) => - if l.subs === subs { - found := true - } else { - link := l.nextDep +// Track a dependency from a computed (subs tracks subs) +// For initial computation (firstDep is None), skip duplicate check since deps are empty +let trackDepFromComputed = (computedSubs: Core.subs, sourceSubs: Core.subs): unit => { + // Fast path: if no deps yet, no need to check for duplicates + if computedSubs.firstDep === None { + let newLink: Core.link = { + subs: sourceSubs, + observer: Obj.magic(computedSubs), + nextDep: None, + prevDep: None, + nextSub: None, + prevSub: None, + } + Core.linkToSubsDeps(computedSubs, newLink) + Core.linkToSubs(sourceSubs, newLink) + } else { + // Check if already tracking this signal (walk dep list) + let found = ref(false) + let link = ref(computedSubs.firstDep) + while link.contents !== None && !found.contents { + switch link.contents { + | Some(l) => + if l.subs === sourceSubs { + found := true + } else { + link := l.nextDep + } + | None => () } - | None => () + } + + // Only add if not already tracked + if !found.contents { + let newLink: Core.link = { + subs: sourceSubs, + observer: Obj.magic(computedSubs), + nextDep: None, + prevDep: None, + nextSub: None, + prevSub: None, + } + Core.linkToSubsDeps(computedSubs, newLink) + Core.linkToSubs(sourceSubs, newLink) } } +} - // Only add if not already tracked - if !found.contents { +// Track a dependency from an effect (observer tracks subs) +// For initial run (firstDep is None), skip duplicate check since deps are empty +let trackDepFromEffect = (observer: Core.observer, subs: Core.subs): unit => { + // Fast path: if no deps yet, no need to check for duplicates + if observer.firstDep === None { let newLink = Core.makeLink(subs, observer) Core.linkToDeps(observer, newLink) Core.linkToSubs(subs, newLink) + } else { + // Check if already tracking this signal (walk dep list) + let found = ref(false) + let link = ref(observer.firstDep) + while link.contents !== None && !found.contents { + switch link.contents { + | Some(l) => + if l.subs === subs { + found := true + } else { + link := l.nextDep + } + | None => () + } + } + + // Only add if not already tracked + if !found.contents { + let newLink = Core.makeLink(subs, observer) + Core.linkToDeps(observer, newLink) + Core.linkToSubs(subs, newLink) + } + } +} + +// Track dependency - routes to appropriate function based on current context +let trackDep = (subs: Core.subs): unit => { + switch currentComputedSubs.contents { + | Some(computedSubs) => trackDepFromComputed(computedSubs, subs) + | None => + switch currentObserver.contents { + | Some(observer) => trackDepFromEffect(observer, subs) + | None => () + } } } -// Compare observers by level for sorting -let compareByLevel = (a: Core.observer, b: Core.observer): float => { +// Compare by level for sorting +let compareEffectsByLevel = (a: Core.observer, b: Core.observer): float => { Int.toFloat(a.level - b.level) } -// Check if array needs sorting (all same level = no sort needed) -let needsSort = (arr: array): bool => { - let len = arr->Array.length - if len <= 1 { - false - } else { - let firstLevel = (Array.getUnsafe(arr, 0)).level - let found = ref(false) - let i = ref(1) - while i.contents < len && !found.contents { - if (Array.getUnsafe(arr, i.contents)).level !== firstLevel { - found := true +let compareSubsByLevel = (a: Core.subs, b: Core.subs): float => { + Int.toFloat(a.level - b.level) +} + +// Compute level for a computed (based on its dependencies) +let computeSubsLevel = (s: Core.subs): int => { + let maxLevel = ref(0) + let link = ref(s.firstDep) + while link.contents !== None { + switch link.contents { + | Some(l) => + // Check if the source is a computed + if Core.isComputed(l.subs) { + if l.subs.level > maxLevel.contents { + maxLevel := l.subs.level + } } - i := i.contents + 1 + link := l.nextDep + | None => () } - found.contents } + maxLevel.contents + 1 } -// Compute level based on dependencies +// Compute level for an effect let rec computeLevel = (observer: Core.observer): int => { let maxLevel = ref(0) - - // Walk dependency list let link = ref(observer.firstDep) while link.contents !== None { switch link.contents { | Some(l) => - // Check if the source is a computed - // Walk the subs to find observers that are computeds - let subLink = ref(l.subs.first) - while subLink.contents !== None { - switch subLink.contents { - | Some(sl) => - switch sl.observer.kind { - | #Computed(_) => - if sl.observer.level > maxLevel.contents { - maxLevel := sl.observer.level - } - | #Effect => () - } - subLink := sl.nextSub - | None => () + if Core.isComputed(l.subs) { + if l.subs.level > maxLevel.contents { + maxLevel := l.subs.level } } link := l.nextDep | None => () } } - maxLevel.contents + 1 } -// Retrack an observer: clear deps, run, rebuild deps -and retrack = (observer: Core.observer): unit => { - // Save old level to check if recomputation is needed +// Retrack a computed (recompute and rebuild deps) +and retrackComputed = (s: Core.subs): unit => { + let oldLevel = s.level + + Core.clearSubsDeps(s) + Core.clearSubsPending(s) + + let prev = currentComputedSubs.contents + currentComputedSubs := Some(s) + + try { + switch s.compute { + | Some(compute) => compute() + | None => () + } + Core.clearSubsDirty(s) + currentComputedSubs := prev + } catch { + | exn => + currentComputedSubs := prev + throw(exn) + } + + if oldLevel == 0 { + s.level = computeSubsLevel(s) + } +} + +// Retrack an effect +and retrackEffect = (observer: Core.observer): unit => { let oldLevel = observer.level Core.clearDeps(observer) @@ -129,8 +214,6 @@ and retrack = (observer: Core.observer): unit => { throw(exn) } - // Only recompute level if this is a new observer (level 0) or we need accuracy - // For most cases, level stays stable after first computation if oldLevel == 0 { observer.level = computeLevel(observer) } @@ -141,31 +224,23 @@ and flush = (): unit => { flushing := true try { - while pending->Array.length > 0 { - // Clear reusable arrays efficiently - clearArray(pendingComputeds) - clearArray(pendingEffects) - - // Separate computeds and effects into pre-allocated arrays - pending->Array.forEach(observer => { - switch observer.kind { - | #Computed(_) => pendingComputeds->Array.push(observer)->ignore - | #Effect => pendingEffects->Array.push(observer)->ignore - } - }) - clearArray(pending) - - // Sort by level only if needed (skip when all same level) - if needsSort(pendingComputeds) { - pendingComputeds->Array.sort(compareByLevel)->ignore - } - if needsSort(pendingEffects) { - pendingEffects->Array.sort(compareByLevel)->ignore + while pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { + // Process computeds first (they might trigger more effects) + if pendingComputedSubs->Array.length > 0 { + // Sort by level + pendingComputedSubs->Array.sort(compareSubsByLevel)->ignore + let computeds = pendingComputedSubs->Array.copy + clearArray(pendingComputedSubs) + computeds->Array.forEach(retrackComputed) } - // Execute computeds first, then effects - pendingComputeds->Array.forEach(retrack) - pendingEffects->Array.forEach(retrack) + // Then process effects + if pendingEffects->Array.length > 0 { + pendingEffects->Array.sort(compareEffectsByLevel)->ignore + let effects = pendingEffects->Array.copy + clearArray(pendingEffects) + effects->Array.forEach(retrackEffect) + } } flushing := false @@ -177,35 +252,31 @@ and flush = (): unit => { } // Notify all subscribers of a signal (traverse linked list) -// Must be defined after flush since it calls flush let notifySubs = (subs: Core.subs): unit => { - // Seed the dirty queue dirtyQueue->Array.push(subs)->ignore - // Process iteratively to avoid stack overflow while dirtyQueue->Array.length > 0 { let currentSubs = dirtyQueue->Array.pop switch currentSubs { | None => () | Some(s) => - // Walk subscriber list let link = ref(s.first) while link.contents !== None { switch link.contents { | Some(l) => - let observer = l.observer - switch observer.kind { - | #Effect => - addToPending(observer) - | #Computed(_) => - if !Core.isDirty(observer) { - Core.setDirty(observer) - // Propagate to the computed's subscribers using direct reference - switch observer.backingSubs { - | Some(backingSubs) => dirtyQueue->Array.push(backingSubs)->ignore - | None => () - } + // The observer field might be a real observer (effect) or a subs (computed) + // We detect by checking if the subs the link came FROM is a computed + let linkedSubs = (Obj.magic(l.observer): Core.subs) + if Core.isComputed(linkedSubs) { + // It's a computed - mark dirty and propagate + if !Core.isSubsDirty(linkedSubs) { + Core.setSubsDirty(linkedSubs) + dirtyQueue->Array.push(linkedSubs)->ignore } + } else { + // It's an effect + let observer = l.observer + addEffectToPending(observer) } link := l.nextSub | None => () @@ -214,46 +285,43 @@ let notifySubs = (subs: Core.subs): unit => { } } - // Trigger flush - if pending->Array.length > 0 && !flushing.contents { + if (pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0) && !flushing.contents { flush() } } -// Ensure a computed signal is fresh before reading (uses subs.computedObserver directly) +// Ensure a computed signal is fresh before reading let ensureComputedFresh = (subs: Core.subs): unit => { - switch subs.computedObserver { - | Some(observer) => - if Core.isDirty(observer) { - let oldLevel = observer.level - - Core.clearDeps(observer) - - let prev = currentObserver.contents - currentObserver := Some(observer) - - try { - observer.run() - Core.clearDirty(observer) - currentObserver := prev - } catch { - | exn => - currentObserver := prev - throw(exn) - } + if Core.isComputed(subs) && Core.isSubsDirty(subs) { + let oldLevel = subs.level + + Core.clearSubsDeps(subs) + + let prev = currentComputedSubs.contents + currentComputedSubs := Some(subs) - // Only recompute level on first run - if oldLevel == 0 { - observer.level = computeLevel(observer) + try { + switch subs.compute { + | Some(compute) => compute() + | None => () } + Core.clearSubsDirty(subs) + currentComputedSubs := prev + } catch { + | exn => + currentComputedSubs := prev + throw(exn) + } + + if oldLevel == 0 { + subs.level = computeSubsLevel(subs) } - | None => () } } -// Schedule an observer for execution +// Schedule an effect for execution let schedule = (observer: Core.observer): unit => { - addToPending(observer) + addEffectToPending(observer) if !flushing.contents { flush() } @@ -268,7 +336,7 @@ let batch = fn => { let result = fn() if !wasFlushing { flushing := false - if pending->Array.length > 0 { + if pendingEffects->Array.length > 0 || pendingComputedSubs->Array.length > 0 { flush() } } @@ -284,30 +352,19 @@ let batch = fn => { // Execute without tracking dependencies let untrack = (fn: unit => 'a): 'a => { - let prev = currentObserver.contents + let prevComputed = currentComputedSubs.contents + let prevObserver = currentObserver.contents + currentComputedSubs := None currentObserver := None try { let result = fn() - currentObserver := prev + currentComputedSubs := prevComputed + currentObserver := prevObserver result } catch { | exn => - currentObserver := prev + currentComputedSubs := prevComputed + currentObserver := prevObserver throw(exn) } } - -// Register a computed's observer on subs (no Map needed, backingSubs is on observer) -let registerComputed = (_signalId: int, observer: Core.observer, subs: Core.subs): unit => { - subs.computedObserver = Some(observer) -} - -// Unregister a computed (for disposal) -let unregisterComputed = (_signalId: int, subs: Core.subs): unit => { - switch subs.computedObserver { - | Some(observer) => - Core.clearDeps(observer) - subs.computedObserver = None - | None => () - } -} diff --git a/src/signals/Signals__Signal.res b/src/signals/Signals__Signal.res index 1f92e90..f0aaedc 100644 --- a/src/signals/Signals__Signal.res +++ b/src/signals/Signals__Signal.res @@ -37,15 +37,13 @@ let makeForComputed = (initialValue: 'a, ~name: option=?): t<'a> => { } } +// Optimized get - inlined hot path checks let get = (signal: t<'a>): 'a => { - // Ensure computed is fresh (no-op for plain signals) + // Ensure computed is fresh Scheduler.ensureComputedFresh(signal.subs) - // Track dependency if we're inside an observer - switch Scheduler.currentObserver.contents { - | Some(observer) => Scheduler.trackDep(observer, signal.subs) - | None => () - } + // Track dependency if we're inside a computed or effect + Scheduler.trackDep(signal.subs) signal.value }