Skip to content

Commit 8eab056

Browse files
samwilliskevin-dp
andcommitted
Handle pushed down predicates in Electric collection
Co-authored-by: Kevin De Porre <kevin@electric-sql.com> Co-authored-by: Sam Willis <sam.willis@gmail.com>
1 parent 1c54b1b commit 8eab056

File tree

6 files changed

+1462
-19
lines changed

6 files changed

+1462
-19
lines changed

.changeset/tender-carpets-cheat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
---
4+
5+
Handle predicates that are pushed down.

packages/electric-db-collection/src/electric.ts

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ import {
1212
TimeoutWaitingForMatchError,
1313
TimeoutWaitingForTxIdError,
1414
} from "./errors"
15+
import { compileSQL } from "./sql-compiler"
1516
import type {
1617
BaseCollectionConfig,
1718
CollectionConfig,
1819
DeleteMutationFnParams,
1920
InsertMutationFnParams,
21+
LoadSubsetOptions,
2022
SyncConfig,
23+
SyncMode,
2124
UpdateMutationFnParams,
2225
UtilsRecord,
2326
} from "@tanstack/db"
@@ -72,6 +75,24 @@ type InferSchemaOutput<T> = T extends StandardSchemaV1
7275
: Record<string, unknown>
7376
: Record<string, unknown>
7477

78+
/**
79+
* The mode of sync to use for the collection.
80+
* @default `eager`
81+
* @description
82+
* - `eager`:
83+
* - syncs all data immediately on preload
84+
* - collection will be marked as ready once the sync is complete
85+
* - there is no incremental sync
86+
* - `on-demand`:
87+
* - syncs data in incremental snapshots when the collection is queried
88+
* - collection will be marked as ready immediately after the first snapshot is synced
89+
* - `progressive`:
90+
* - syncs all data for the collection in the background
91+
* - uses incremental snapshots during the initial sync to provide a fast path to the data required for queries
92+
* - collection will be marked as ready once the full sync is complete
93+
*/
94+
export type ElectricSyncMode = SyncMode | `progressive`
95+
7596
/**
7697
* Configuration interface for Electric collection options
7798
* @template T - The type of items in the collection
@@ -82,12 +103,13 @@ export interface ElectricCollectionConfig<
82103
TSchema extends StandardSchemaV1 = never,
83104
> extends Omit<
84105
BaseCollectionConfig<T, string | number, TSchema, UtilsRecord, any>,
85-
`onInsert` | `onUpdate` | `onDelete`
106+
`onInsert` | `onUpdate` | `onDelete` | `syncMode`
86107
> {
87108
/**
88109
* Configuration options for the ElectricSQL ShapeStream
89110
*/
90111
shapeOptions: ShapeStreamOptions<GetExtensions<T>>
112+
syncMode?: ElectricSyncMode
91113

92114
/**
93115
* Optional asynchronous handler function called before an insert operation
@@ -281,6 +303,9 @@ export function electricCollectionOptions(
281303
} {
282304
const seenTxids = new Store<Set<Txid>>(new Set([]))
283305
const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
306+
const internalSyncMode = config.syncMode ?? `eager`
307+
const finalSyncMode =
308+
internalSyncMode === `progressive` ? `on-demand` : internalSyncMode
284309
const pendingMatches = new Store<
285310
Map<
286311
string,
@@ -331,6 +356,7 @@ export function electricCollectionOptions(
331356
const sync = createElectricSync<any>(config.shapeOptions, {
332357
seenTxids,
333358
seenSnapshots,
359+
syncMode: internalSyncMode,
334360
pendingMatches,
335361
currentBatchMessages,
336362
removePendingMatches,
@@ -550,6 +576,7 @@ export function electricCollectionOptions(
550576

551577
return {
552578
...restConfig,
579+
syncMode: finalSyncMode,
553580
sync,
554581
onInsert: wrappedOnInsert,
555582
onUpdate: wrappedOnUpdate,
@@ -567,6 +594,7 @@ export function electricCollectionOptions(
567594
function createElectricSync<T extends Row<unknown>>(
568595
shapeOptions: ShapeStreamOptions<GetExtensions<T>>,
569596
options: {
597+
syncMode: ElectricSyncMode
570598
seenTxids: Store<Set<Txid>>
571599
seenSnapshots: Store<Array<PostgresSnapshot>>
572600
pendingMatches: Store<
@@ -590,6 +618,7 @@ function createElectricSync<T extends Row<unknown>>(
590618
const {
591619
seenTxids,
592620
seenSnapshots,
621+
syncMode,
593622
pendingMatches,
594623
currentBatchMessages,
595624
removePendingMatches,
@@ -653,6 +682,12 @@ function createElectricSync<T extends Row<unknown>>(
653682

654683
const stream = new ShapeStream({
655684
...shapeOptions,
685+
// In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
686+
log: syncMode === `on-demand` ? `changes_only` : undefined,
687+
// In on-demand mode, we only need the changes from the point of time the collection was created
688+
// so we default to `now` when there is no saved offset.
689+
offset:
690+
shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined),
656691
signal: abortController.signal,
657692
onError: (errorParams) => {
658693
// Just immediately mark ready if there's an error to avoid blocking
@@ -679,9 +714,11 @@ function createElectricSync<T extends Row<unknown>>(
679714
let transactionStarted = false
680715
const newTxids = new Set<Txid>()
681716
const newSnapshots: Array<PostgresSnapshot> = []
717+
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
682718

683719
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
684720
let hasUpToDate = false
721+
let hasSnapshotEnd = false
685722

686723
for (const message of messages) {
687724
// Add message to current batch buffer (for race condition handling)
@@ -746,6 +783,7 @@ function createElectricSync<T extends Row<unknown>>(
746783
})
747784
} else if (isSnapshotEndMessage(message)) {
748785
newSnapshots.push(parseSnapshotMessage(message))
786+
hasSnapshotEnd = true
749787
} else if (isUpToDateMessage(message)) {
750788
hasUpToDate = true
751789
} else if (isMustRefetchMessage(message)) {
@@ -761,12 +799,14 @@ function createElectricSync<T extends Row<unknown>>(
761799

762800
truncate()
763801

764-
// Reset hasUpToDate so we continue accumulating changes until next up-to-date
802+
// Reset flags so we continue accumulating changes until next up-to-date
765803
hasUpToDate = false
804+
hasSnapshotEnd = false
805+
hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync
766806
}
767807
}
768808

769-
if (hasUpToDate) {
809+
if (hasUpToDate || hasSnapshotEnd) {
770810
// Clear the current batch buffer since we're now up-to-date
771811
currentBatchMessages.setState(() => [])
772812

@@ -776,8 +816,15 @@ function createElectricSync<T extends Row<unknown>>(
776816
transactionStarted = false
777817
}
778818

779-
// Mark the collection as ready now that sync is up to date
780-
markReady()
819+
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
820+
// Mark the collection as ready now that sync is up to date
821+
markReady()
822+
}
823+
824+
// Track that we've received the first up-to-date for progressive mode
825+
if (hasUpToDate) {
826+
hasReceivedUpToDate = true
827+
}
781828

782829
// Always commit txids when we receive up-to-date, regardless of transaction state
783830
seenTxids.setState((currentTxids) => {
@@ -811,12 +858,29 @@ function createElectricSync<T extends Row<unknown>>(
811858
}
812859
})
813860

814-
// Return the unsubscribe function
815-
return () => {
816-
// Unsubscribe from the stream
817-
unsubscribeStream()
818-
// Abort the abort controller to stop the stream
819-
abortController.abort()
861+
// Only provide loadSubset when the sync mode is not eager. Its presence tells the sync
862+
// layer that it can load more data on demand (via the stream's requestSnapshot method)
863+
// when the syncMode is `on-demand` or `progressive`.
864+
const loadSubset =
865+
syncMode === `eager`
866+
? undefined
867+
: async (opts: LoadSubsetOptions) => {
868+
// In progressive mode, stop requesting snapshots once full sync is complete
869+
if (syncMode === `progressive` && hasReceivedUpToDate) {
870+
return
871+
}
872+
const snapshotParams = compileSQL<T>(opts)
873+
await stream.requestSnapshot(snapshotParams)
874+
}
875+
876+
return {
877+
loadSubset,
878+
cleanup: () => {
879+
// Unsubscribe from the stream
880+
unsubscribeStream()
881+
// Abort the abort controller to stop the stream
882+
abortController.abort()
883+
},
820884
}
821885
},
822886
// Expose the getSyncMetadata function
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
 * Serializes a JavaScript value into a PostgreSQL-formatted literal string.
 *
 * Supported inputs:
 * - `string`: wrapped in single quotes, with embedded single quotes doubled
 * - `number` / `bigint`: their decimal text
 * - `null` / `undefined`: `NULL`
 * - `boolean`: `true` / `false`
 * - `Date`: single-quoted ISO-8601 timestamp
 * - arrays: `ARRAY[...]` with every element serialized recursively
 *
 * @param value - The value to serialize
 * @returns The PostgreSQL literal text for `value`
 * @throws Error if the value has no supported representation
 */
export function serialize(value: unknown): string {
  if (typeof value === `string`) {
    // Double embedded single quotes so the value cannot terminate the literal early
    return `'${value.replace(/'/g, `''`)}'`
  }

  if (typeof value === `number` || typeof value === `bigint`) {
    return value.toString()
  }

  if (value === null || value === undefined) {
    return `NULL`
  }

  if (typeof value === `boolean`) {
    return value ? `true` : `false`
  }

  if (value instanceof Date) {
    return `'${value.toISOString()}'`
  }

  if (Array.isArray(value)) {
    return `ARRAY[${value.map((item) => serialize(item)).join(`,`)}]`
  }

  // JSON.stringify itself throws on circular structures and nested bigints;
  // fall back to String() so the caller always gets the intended error.
  let description: string
  try {
    description = JSON.stringify(value)
  } catch {
    description = String(value)
  }
  throw new Error(`Cannot serialize value: ${description}`)
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import { serialize } from "./pg-serializer"
2+
import type { SubsetParams } from "@electric-sql/client"
3+
import type { IR, LoadSubsetOptions } from "@tanstack/db"
4+
5+
export type CompiledSqlRecord = Omit<SubsetParams, `params`> & {
6+
params?: Array<unknown>
7+
}
8+
9+
/**
 * Compiles the `where`, `orderBy` and `limit` of a subset request into the
 * parameter object expected by the Electric client.
 *
 * Literal values encountered while compiling are collected positionally and
 * then serialized into a record keyed by their 1-based position (`"1"`, `"2"`, ...).
 *
 * @param options - The subset options to compile
 * @returns The compiled clauses together with the serialized params record
 */
export function compileSQL<T>(options: LoadSubsetOptions): SubsetParams {
  // Positional values; mutated by the expression compilers below
  const collectedValues: Array<T> = []
  const compiled: CompiledSqlRecord = { params: collectedValues }

  if (options.where) {
    // TODO: this only works when the where expression's PropRefs directly reference a column of the collection
    // doesn't work if it goes through aliases because then we need to know the entire query to be able to follow the reference until the base collection (cf. followRef function)
    compiled.where = compileBasicExpression(options.where, collectedValues)
  }

  if (options.orderBy) {
    compiled.orderBy = compileOrderBy(options.orderBy, collectedValues)
  }

  if (options.limit) {
    compiled.limit = options.limit
  }

  // Turn the positional values into PG formatted strings keyed by "1", "2", ...
  const serializedParams: Record<string, string> = Object.fromEntries(
    collectedValues.map((raw, position) => [`${position + 1}`, serialize(raw)])
  )

  return {
    ...compiled,
    params: serializedParams,
  }
}
44+
45+
/**
 * Translates a single IR expression into SQL text.
 *
 * Values are not inlined: each one is appended to `params` and referenced by
 * its 1-based position (`$1`, `$2`, ...), so `params` is mutated by this call.
 *
 * @param exp - The expression to compile
 * @param params - Accumulator for the positional parameter values
 * @returns The compiled SQL string
 * @throws Error for references with a path length other than 1, or for an
 *   unrecognised expression type
 */
function compileBasicExpression(
  exp: IR.BasicExpression<unknown>,
  params: Array<unknown>
): string {
  if (exp.type === `val`) {
    params.push(exp.value)
    // The placeholder index is the position the value now occupies
    return `$${params.length}`
  }

  if (exp.type === `ref`) {
    // TODO: doesn't yet support JSON(B) values which could be accessed with nested props
    const { path } = exp
    if (path.length !== 1) {
      throw new Error(
        `Compiler can't handle nested properties: ${path.join(`.`)}`
      )
    }
    return path[0]!
  }

  if (exp.type === `func`) {
    return compileFunction(exp, params)
  }

  throw new Error(`Unknown expression type`)
}
73+
74+
/**
 * Compiles every clause of an ORDER BY specification and joins the results
 * with commas, in the order the clauses were given.
 *
 * @param orderBy - The order-by clauses to compile
 * @param params - Accumulator for the positional parameter values
 * @returns The comma-separated ORDER BY list
 */
function compileOrderBy(orderBy: IR.OrderBy, params: Array<unknown>): string {
  const sqlParts: Array<string> = []
  for (const entry of orderBy) {
    sqlParts.push(compileOrderByClause(entry, params))
  }
  return sqlParts.join(`,`)
}
80+
81+
/**
 * Compiles one ORDER BY clause: the sort expression followed by an optional
 * ` DESC` suffix and an optional ` NULLS FIRST` / ` NULLS LAST` suffix.
 *
 * @param clause - The clause to compile
 * @param params - Accumulator for the positional parameter values
 * @returns The SQL text for the clause
 */
function compileOrderByClause(
  clause: IR.OrderByClause,
  params: Array<unknown>
): string {
  // TODO: what to do with stringSort and locale?
  // Correctly supporting them is tricky as it depends on Postgres' collation
  const sortExpression = compileBasicExpression(clause.expression, params)
  const { direction, nulls } = clause.compareOptions

  const directionSuffix = direction === `desc` ? ` DESC` : ``

  let nullsSuffix = ``
  if (nulls === `first`) {
    nullsSuffix = ` NULLS FIRST`
  } else if (nulls === `last`) {
    nullsSuffix = ` NULLS LAST`
  }

  return `${sortExpression}${directionSuffix}${nullsSuffix}`
}
104+
105+
/**
 * Compiles a function/operator IR node into SQL text, appending any literal
 * values found in its arguments to `params`.
 *
 * Output shapes:
 * - `isNull` / `isUndefined`: postfix, `(arg IS NULL)`
 * - `and` / `or`: infix over two or more operands, `(a AND b AND c)`
 * - `in`: `(lhs = ANY(rhs))`, because the right-hand side is compiled to a
 *   single placeholder rather than a parenthesised value list
 * - other infix operators (comparisons, `like`, `ilike`, `add`): `(lhs op rhs)`
 * - everything else: function-call syntax, `NAME(arg1,arg2,...)`
 *
 * Every operator expression is wrapped in parentheses so that nesting never
 * depends on SQL operator precedence.
 *
 * @param exp - The function expression to compile
 * @param params - Accumulator for the positional parameter values
 * @returns The compiled SQL string
 * @throws Error if the name is unknown or the argument count does not fit the operator
 */
function compileFunction(
  exp: IR.Func<unknown>,
  params: Array<unknown> = []
): string {
  const { name, args } = exp

  // Resolve the SQL spelling first so unknown names throw before any param is pushed
  const opName = getOpName(name)

  const compiledArgs = args.map((arg: IR.BasicExpression) =>
    compileBasicExpression(arg, params)
  )

  const expectArgs = (count: number, kind: string): void => {
    if (compiledArgs.length !== count) {
      throw new Error(
        `${kind} operator ${name} expects ${count} argument${count === 1 ? `` : `s`}`
      )
    }
  }

  // Null tests are postfix operators, not functions
  if (name === `isNull` || name === `isUndefined`) {
    expectArgs(1, `Unary`)
    return `(${compiledArgs[0]} ${opName})`
  }

  // Logical connectives may carry more than two operands
  if (name === `and` || name === `or`) {
    if (compiledArgs.length < 2) {
      throw new Error(`Binary operator ${name} expects at least 2 arguments`)
    }
    return `(${compiledArgs.join(` ${opName} `)})`
  }

  // Membership test against a single (array) operand
  if (name === `in`) {
    expectArgs(2, `Binary`)
    const [needle, haystack] = compiledArgs
    return `(${needle} = ANY(${haystack}))`
  }

  // `like`, `ilike` and `add` are infix in SQL as well
  const isInfix =
    isBinaryOp(name) || name === `like` || name === `ilike` || name === `add`

  if (isInfix) {
    expectArgs(2, `Binary`)
    const [lhs, rhs] = compiledArgs
    return `(${lhs} ${opName} ${rhs})`
  }

  return `${opName}(${compiledArgs.join(`,`)})`
}
127+
128+
/**
 * Names of operators that are written between their two operands in SQL
 * (`lhs OP rhs`) rather than in function-call form.
 *
 * `in` is intentionally not listed: `lhs IN rhs` is not valid when the
 * right-hand side is a single placeholder, so it needs dedicated handling.
 */
const INFIX_OPERATOR_NAMES: ReadonlySet<string> = new Set([
  `eq`,
  `gt`,
  `gte`,
  `lt`,
  `lte`,
  `and`,
  `or`,
  `like`,
  `ilike`,
  `add`,
])

/**
 * Tells whether the given operator name must be rendered in infix form.
 *
 * @param name - The IR operator/function name
 * @returns `true` for infix operators, `false` otherwise
 */
function isBinaryOp(name: string): boolean {
  return INFIX_OPERATOR_NAMES.has(name)
}
132+
133+
/**
 * Maps IR operator/function names to their SQL spelling.
 *
 * A Map is used instead of a plain object so that names which happen to exist
 * on Object.prototype (e.g. `constructor`, `toString`) are not treated as
 * known operators.
 */
const SQL_OPERATOR_NAMES: ReadonlyMap<string, string> = new Map([
  [`eq`, `=`],
  [`gt`, `>`],
  [`gte`, `>=`],
  [`lt`, `<`],
  [`lte`, `<=`],
  [`add`, `+`],
  [`and`, `AND`],
  [`or`, `OR`],
  [`not`, `NOT`],
  [`isUndefined`, `IS NULL`],
  [`isNull`, `IS NULL`],
  [`in`, `IN`],
  [`like`, `LIKE`],
  [`ilike`, `ILIKE`],
  [`upper`, `UPPER`],
  [`lower`, `LOWER`],
  [`length`, `LENGTH`],
  [`concat`, `CONCAT`],
  [`coalesce`, `COALESCE`],
])

/**
 * Looks up the SQL operator or function name for an IR function name.
 *
 * @param name - The IR operator/function name
 * @returns The SQL spelling of the operator or function
 * @throws Error if the name is not a supported operator/function
 */
function getOpName(name: string): string {
  const opName = SQL_OPERATOR_NAMES.get(name)

  if (opName === undefined) {
    throw new Error(`Unknown operator/function: ${name}`)
  }

  return opName
}

0 commit comments

Comments
 (0)