-
Notifications
You must be signed in to change notification settings - Fork 20
/
AsyncReactivityTracker.ts
140 lines (116 loc) · 3.82 KB
/
AsyncReactivityTracker.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import { maybePromiseAll } from "../../helpers/maybeWaitForPromises"
import { randomId } from "../../helpers/randomId"
import {
Bounds,
getPrefixContainingBounds,
isTupleWithinBounds,
} from "../../helpers/sortedTupleArray"
import { InMemoryTupleStorage } from "../../storage/InMemoryTupleStorage"
import { MIN, ScanStorageArgs, Tuple, WriteOps } from "../../storage/types"
import { TupleStorageApi } from "../sync/types"
import { TxId } from "../types"
import { AsyncCallback } from "./asyncTypes"
export class AsyncReactivityTracker {
private listenersDb = new InMemoryTupleStorage()
subscribe(args: ScanStorageArgs, callback: AsyncCallback) {
return subscribe(this.listenersDb, args, callback)
}
computeReactivityEmits(writes: WriteOps) {
return getReactivityEmits(this.listenersDb, writes)
}
async emit(emits: ReactivityEmits, txId: TxId) {
let promises: any[] = []
for (const [callback, writes] of emits.entries()) {
try {
// Catch sync callbacks.
promises.push(callback(writes, txId))
} catch (error) {
console.error(error)
}
}
// This trick allows us to return a Promise from a sync TupleDatabase#commit
// when there are async callbacks. And this allows us to create an async client
// on top of a sync client.
return maybePromiseAll(promises)
}
}
type AsyncListener = { callback: AsyncCallback; bounds: Bounds }
function iterateTuplePrefixes(tuple: Tuple) {
const prefixes: Tuple[] = [tuple]
for (let i = 0; i < tuple.length; i++) {
const prefix = tuple.slice(0, i)
prefixes.push(prefix)
}
return prefixes
}
/** Query the listenersDb based on tuple prefixes. */
function getListenersForTuplePrefix(
listenersDb: TupleStorageApi,
tuple: Tuple
) {
const listeners: AsyncListener[] = []
// Look for listeners at each prefix of the tuple.
for (const prefix of iterateTuplePrefixes(tuple)) {
const results = listenersDb.scan({
gte: [prefix],
lte: [[...prefix, MIN]],
})
for (const { value } of results) {
listeners.push(value)
}
}
return listeners
}
/** Query the listenersDb based on tuple prefixes, and additionally check for query bounds. */
function getListenerCallbacksForTuple(
listenersDb: TupleStorageApi,
tuple: Tuple
) {
const callbacks: AsyncCallback[] = []
// Check that the tuple is within the absolute bounds of the query.
for (const listener of getListenersForTuplePrefix(listenersDb, tuple)) {
const { callback, bounds } = listener
if (isTupleWithinBounds(tuple, bounds)) {
callbacks.push(callback)
} else {
// TODO: track how in-efficient listeners are here.
// NOTE: the bounds may only partially span the prefix.
}
}
return callbacks
}
type ReactivityEmits = Map<AsyncCallback, Required<WriteOps>>
function getReactivityEmits(listenersDb: TupleStorageApi, writes: WriteOps) {
const emits: ReactivityEmits = new Map()
for (const { key, value } of writes.set || []) {
const callbacks = getListenerCallbacksForTuple(listenersDb, key)
for (const callback of callbacks) {
if (!emits.has(callback)) emits.set(callback, { set: [], remove: [] })
emits.get(callback)!.set.push({ key, value })
}
}
for (const tuple of writes.remove || []) {
const callbacks = getListenerCallbacksForTuple(listenersDb, tuple)
for (const callback of callbacks) {
if (!emits.has(callback)) emits.set(callback, { set: [], remove: [] })
emits.get(callback)!.remove.push(tuple)
}
}
return emits
}
function subscribe(
listenersDb: TupleStorageApi,
args: ScanStorageArgs,
callback: AsyncCallback
) {
// this.log("db/subscribe", args)
const prefix = getPrefixContainingBounds(args)
const id = randomId()
const value: AsyncListener = { callback, bounds: args }
listenersDb.commit({ set: [{ key: [prefix, id], value }] })
const unsubscribe = () => {
// this.log("db/unsubscribe", args)
listenersDb.commit({ remove: [[prefix, id]] })
}
return unsubscribe
}