Skip to content

Commit e8e12e7

Browse files
committed
dont call onLoadMore in progressive mode when up to date
1 parent 581d7cc commit e8e12e7

File tree

3 files changed

+178
-12
lines changed

3 files changed

+178
-12
lines changed

packages/electric-db-collection/src/electric.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ function createElectricSync<T extends Row<unknown>>(
713713
let transactionStarted = false
714714
const newTxids = new Set<Txid>()
715715
const newSnapshots: Array<PostgresSnapshot> = []
716+
let hasReceivedUpToDate = false // Track if we've completed initial sync in progressive mode
716717

717718
unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
718719
let hasUpToDate = false
@@ -797,9 +798,10 @@ function createElectricSync<T extends Row<unknown>>(
797798

798799
truncate()
799800

800-
// Reset hasUpToDate so we continue accumulating changes until next up-to-date
801+
// Reset flags so we continue accumulating changes until next up-to-date
801802
hasUpToDate = false
802803
hasSnapshotEnd = false
804+
hasReceivedUpToDate = false // Reset for progressive mode - we're starting a new sync
803805
}
804806
}
805807

@@ -818,6 +820,11 @@ function createElectricSync<T extends Row<unknown>>(
818820
markReady()
819821
}
820822

823+
// Track that we've received the first up-to-date for progressive mode
824+
if (hasUpToDate) {
825+
hasReceivedUpToDate = true
826+
}
827+
821828
// Always commit txids when we receive up-to-date, regardless of transaction state
822829
seenTxids.setState((currentTxids) => {
823830
const clonedSeen = new Set<Txid>(currentTxids)
@@ -857,6 +864,10 @@ function createElectricSync<T extends Row<unknown>>(
857864
syncMode === `eager`
858865
? undefined
859866
: async (opts: OnLoadMoreOptions) => {
867+
// In progressive mode, stop requesting snapshots once full sync is complete
868+
if (syncMode === `progressive` && hasReceivedUpToDate) {
869+
return
870+
}
860871
const snapshotParams = compileSQL<T>(opts)
861872
await stream.requestSnapshot(snapshotParams)
862873
}

packages/electric-db-collection/tests/electric-live-query.test.ts

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,30 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => {
734734
const electricCollection =
735735
createElectricCollectionWithSyncMode(`progressive`)
736736

737-
// Initial sync with limited data
738-
simulateInitialSync([sampleUsers[0]!, sampleUsers[1]!]) // Only Alice and Bob
739-
expect(electricCollection.status).toBe(`ready`)
737+
// Send initial snapshot with limited data (using snapshot-end, not up-to-date)
738+
// This keeps the collection in "loading" state, simulating progressive mode still syncing
739+
subscriber([
740+
{
741+
key: sampleUsers[0]!.id.toString(),
742+
value: sampleUsers[0]!,
743+
headers: { operation: `insert` },
744+
},
745+
{
746+
key: sampleUsers[1]!.id.toString(),
747+
value: sampleUsers[1]!,
748+
headers: { operation: `insert` },
749+
},
750+
{
751+
headers: {
752+
control: `snapshot-end`,
753+
xmin: `100`,
754+
xmax: `110`,
755+
xip_list: [],
756+
},
757+
},
758+
])
759+
760+
expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode
740761
expect(electricCollection.size).toBe(2)
741762

742763
// Mock requestSnapshot to return additional data
@@ -894,7 +915,19 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => {
894915
const electricCollection =
895916
createElectricCollectionWithSyncMode(`progressive`)
896917

897-
simulateInitialSync([])
918+
// Send snapshot-end (not up-to-date) to keep collection in loading state
919+
subscriber([
920+
{
921+
headers: {
922+
control: `snapshot-end`,
923+
xmin: `100`,
924+
xmax: `110`,
925+
xip_list: [],
926+
},
927+
},
928+
])
929+
930+
expect(electricCollection.status).toBe(`loading`) // Still syncing in progressive mode
898931

899932
// Create live query with complex WHERE clause
900933
createLiveQueryCollection({

packages/electric-db-collection/tests/electric.test.ts

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1682,7 +1682,7 @@ describe(`Electric Integration`, () => {
16821682
)
16831683
})
16841684

1685-
it(`should request incremental snapshots in progressive mode when syncMore is called`, async () => {
1685+
it(`should request incremental snapshots in progressive mode when syncMore is called before sync completes`, async () => {
16861686
vi.clearAllMocks()
16871687

16881688
const config = {
@@ -1700,14 +1700,26 @@ describe(`Electric Integration`, () => {
17001700

17011701
const testCollection = createCollection(electricCollectionOptions(config))
17021702

1703-
// Send up-to-date to mark collection as ready
1703+
// Send initial data with snapshot-end (but not up-to-date yet - still syncing)
17041704
subscriber([
17051705
{
1706-
headers: { control: `up-to-date` },
1706+
key: `1`,
1707+
value: { id: 1, name: `Test User` },
1708+
headers: { operation: `insert` },
1709+
},
1710+
{
1711+
headers: {
1712+
control: `snapshot-end`,
1713+
xmin: `100`,
1714+
xmax: `110`,
1715+
xip_list: [],
1716+
},
17071717
},
17081718
])
17091719

1710-
// In progressive mode, calling syncMore should request a snapshot
1720+
expect(testCollection.status).toBe(`loading`) // Not ready yet
1721+
1722+
// In progressive mode, calling syncMore should request a snapshot BEFORE full sync completes
17111723
await testCollection.syncMore({ limit: 20 })
17121724

17131725
// Verify requestSnapshot was called
@@ -1769,23 +1781,133 @@ describe(`Electric Integration`, () => {
17691781

17701782
const testCollection = createCollection(electricCollectionOptions(config))
17711783

1772-
// Send initial data and up-to-date
1784+
// Send initial data with snapshot-end (but not up-to-date - still syncing)
17731785
subscriber([
17741786
{
17751787
key: `1`,
17761788
value: { id: 1, name: `Initial User` },
17771789
headers: { operation: `insert` },
17781790
},
1791+
{
1792+
headers: {
1793+
control: `snapshot-end`,
1794+
xmin: `100`,
1795+
xmax: `110`,
1796+
xip_list: [],
1797+
},
1798+
},
1799+
])
1800+
1801+
// Collection should have data but not be ready yet
1802+
expect(testCollection.status).toBe(`loading`)
1803+
expect(testCollection.has(1)).toBe(true)
1804+
1805+
// Should be able to request more data incrementally before full sync completes
1806+
await testCollection.syncMore({ limit: 10 })
1807+
expect(mockRequestSnapshot).toHaveBeenCalled()
1808+
1809+
// Now send up-to-date to complete the sync
1810+
subscriber([
17791811
{
17801812
headers: { control: `up-to-date` },
17811813
},
17821814
])
17831815

1784-
// Collection should be ready with initial data
17851816
expect(testCollection.status).toBe(`ready`)
1817+
})
1818+
1819+
it(`should stop requesting snapshots in progressive mode after first up-to-date`, async () => {
1820+
vi.clearAllMocks()
1821+
1822+
const config = {
1823+
id: `progressive-stop-after-sync-test`,
1824+
shapeOptions: {
1825+
url: `http://test-url`,
1826+
params: {
1827+
table: `test_table`,
1828+
},
1829+
},
1830+
syncMode: `progressive` as const,
1831+
getKey: (item: Row) => item.id as number,
1832+
startSync: true,
1833+
}
1834+
1835+
const testCollection = createCollection(electricCollectionOptions(config))
1836+
1837+
// Send initial data with snapshot-end (not up-to-date yet)
1838+
subscriber([
1839+
{
1840+
key: `1`,
1841+
value: { id: 1, name: `User 1` },
1842+
headers: { operation: `insert` },
1843+
},
1844+
{
1845+
headers: {
1846+
control: `snapshot-end`,
1847+
xmin: `100`,
1848+
xmax: `110`,
1849+
xip_list: [],
1850+
},
1851+
},
1852+
])
1853+
1854+
expect(testCollection.status).toBe(`loading`) // Not ready yet in progressive
17861855
expect(testCollection.has(1)).toBe(true)
17871856

1788-
// Should still be able to request more data incrementally
1857+
// Should be able to request more data before up-to-date
1858+
vi.clearAllMocks()
1859+
await testCollection.syncMore({ limit: 10 })
1860+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
1861+
1862+
// Now send up-to-date to complete the full sync
1863+
subscriber([
1864+
{
1865+
headers: { control: `up-to-date` },
1866+
},
1867+
])
1868+
1869+
expect(testCollection.status).toBe(`ready`)
1870+
1871+
// Try to request more data - should NOT make a request since full sync is complete
1872+
vi.clearAllMocks()
1873+
await testCollection.syncMore({ limit: 10 })
1874+
expect(mockRequestSnapshot).not.toHaveBeenCalled()
1875+
})
1876+
1877+
it(`should allow snapshots in on-demand mode even after up-to-date`, async () => {
1878+
vi.clearAllMocks()
1879+
1880+
const config = {
1881+
id: `on-demand-after-sync-test`,
1882+
shapeOptions: {
1883+
url: `http://test-url`,
1884+
params: {
1885+
table: `test_table`,
1886+
},
1887+
},
1888+
syncMode: `on-demand` as const,
1889+
getKey: (item: Row) => item.id as number,
1890+
startSync: true,
1891+
}
1892+
1893+
const testCollection = createCollection(electricCollectionOptions(config))
1894+
1895+
// Send initial data with up-to-date
1896+
subscriber([
1897+
{
1898+
key: `1`,
1899+
value: { id: 1, name: `User 1` },
1900+
headers: { operation: `insert` },
1901+
},
1902+
{
1903+
headers: { control: `up-to-date` },
1904+
},
1905+
])
1906+
1907+
expect(testCollection.status).toBe(`ready`)
1908+
1909+
// Should STILL be able to request more data in on-demand mode
1910+
vi.clearAllMocks()
17891911
await testCollection.syncMore({ limit: 10 })
17901912
expect(mockRequestSnapshot).toHaveBeenCalled()
17911913
})

0 commit comments

Comments
 (0)