1
1
import elasticsearch from '@elastic/elasticsearch'
2
+ import debug from 'debug'
2
3
import Redis from 'ioredis'
3
4
import _ from 'lodash/fp.js'
4
5
import {
6
+ assertEventually ,
5
7
initState as initRedisAndMongoState ,
6
8
numDocs ,
7
9
} from 'mongochangestream-testing'
8
10
import { type Db , MongoClient } from 'mongodb'
9
11
import ms from 'ms'
10
12
import { setTimeout } from 'node:timers/promises'
11
- import { describe , expect , test } from 'vitest'
12
- import debug from 'debug'
13
+ import { describe , test } from 'vitest'
14
+
15
+ import { initSync , SyncOptions } from './index.js'
13
16
14
17
// Output via console.info (stdout) instead of stderr.
15
18
// Without this debug statements are swallowed by vitest.
16
19
debug . log = console . info . bind ( console )
17
20
18
- import { initSync , SyncOptions } from './index.js'
19
-
20
21
const index = 'testing'
21
22
22
23
const getConns = _ . memoize ( async ( ) => {
@@ -79,13 +80,14 @@ describe.sequential('syncCollection', () => {
79
80
await initElasticState ( sync , db )
80
81
81
82
const initialScan = await sync . runInitialScan ( )
82
- // Wait for initial scan to complete
83
83
await initialScan . start ( )
84
- await setTimeout ( ms ( '1s' ) )
84
+ // Test that all of the records are eventually synced.
85
+ await assertEventually ( async ( ) => {
86
+ const countResponse = await elasticClient . count ( { index } )
87
+ return countResponse . count == numDocs
88
+ } , `Less than ${ numDocs } records were processed` )
85
89
// Stop
86
90
await initialScan . stop ( )
87
- const countResponse = await elasticClient . count ( { index } )
88
- expect ( countResponse . count ) . toBe ( numDocs )
89
91
} )
90
92
test ( 'should process records via change stream' , async ( ) => {
91
93
const { coll, db, elasticClient } = await getConns ( )
@@ -95,18 +97,20 @@ describe.sequential('syncCollection', () => {
95
97
96
98
const changeStream = await sync . processChangeStream ( )
97
99
changeStream . start ( )
100
+ // Give change stream time to connect.
98
101
await setTimeout ( ms ( '1s' ) )
99
102
const date = new Date ( )
100
103
// Update records
101
104
coll . updateMany ( { } , { $set : { createdAt : date } } )
102
- // Wait for the change stream events to be processed
103
- await setTimeout ( ms ( '2s' ) )
104
- const countResponse = await elasticClient . count ( {
105
- index,
106
- query : { range : { createdAt : { gte : date } } } ,
107
- } )
105
+ // Test that all of the records are eventually synced.
106
+ await assertEventually ( async ( ) => {
107
+ const countResponse = await elasticClient . count ( {
108
+ index,
109
+ query : { range : { createdAt : { gte : date } } } ,
110
+ } )
111
+ return countResponse . count == numDocs
112
+ } , `Less than ${ numDocs } records were processed` )
108
113
// Stop
109
114
await changeStream . stop ( )
110
- expect ( countResponse . count ) . toBe ( numDocs )
111
115
} )
112
116
} )
0 commit comments