Skip to content

Commit

Permalink
core/local/chokidar: Run initial scan step once (#2104)
Browse files Browse the repository at this point in the history
The initial scan step in the Chokidar watcher can be potentially long
when a lot of documents are present in the local synchronization
directory (i.e. there are many events to process).
The step is run as part of the asynchronous events flushing callback
and could thus potentially be run each time events are flushed.

To prevent this, we mark the `initialScan` object (if it exists) as
flushed at the very beginning of the flushing callback.
However, the step itself does not check this attribute before
processing events and so a second flush while an initial scan is being
run could trigger a second initial scan run.

We now check the `flushed` attribute's value before running an initial
scan step and move its assignment to the step itself to co-locate it
with its usage.
  • Loading branch information
taratatach committed Jul 20, 2021
1 parent b866241 commit 33a6ce8
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 80 deletions.
8 changes: 4 additions & 4 deletions core/local/chokidar/analysis.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import type {
LocalFileDeletion,
LocalFileMove
} from './local_change'
import type { InitialScan } from './initial_scan'
import type { InitialScanParams } from './initial_scan'
*/

const log = logger({
Expand All @@ -59,13 +59,13 @@ module.exports = function analysis(
events /*: LocalEvent[] */,
{
pendingChanges,
initialScan
} /*: { pendingChanges: LocalChange[], initialScan: ?InitialScan } */
initialScanParams
} /*: { pendingChanges: LocalChange[], initialScanParams: ?InitialScanParams } */
) /*: LocalChange[] */ {
const changes /*: LocalChange[] */ = analyseEvents(events, pendingChanges)
sortBeforeSquash(changes)
squashMoves(changes)
sortChanges(changes, initialScan != null)
sortChanges(changes, initialScanParams != null)
return separatePendingChanges(changes, pendingChanges)
}

Expand Down
68 changes: 34 additions & 34 deletions core/local/chokidar/initial_scan.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import type LocalEventBuffer from './event_buffer'
import type { Pouch } from '../../pouch'
import type { SavedMetadata } from '../../metadata'
export type InitialScan = {
export type InitialScanParams = {
paths: string[],
emptyDirRetryCount: number,
flushed: boolean,
Expand All @@ -30,20 +30,20 @@ export type InitialScan = {
export type InitialScanOpts = {
buffer: LocalEventBuffer<ChokidarEvent>,
initialScan: ?InitialScan,
initialScanParams: InitialScanParams,
pouch: Pouch
}
*/

const detectOfflineUnlinkEvents = async (
initialScan /*: InitialScan */,
initialScanParams /*: InitialScanParams */,
pouch /*: Pouch */
) /*: Promise<{offlineEvents: Array<ChokidarEvent>, unappliedMoves: string[], emptySyncDir: boolean}> */ => {
// Try to detect removed files & folders
const events /*: Array<ChokidarEvent> */ = []
const docs /*: SavedMetadata[] */ = await pouch.initialScanDocs()
const inInitialScan = doc =>
initialScan.paths.indexOf(metadata.id(doc.path)) !== -1
initialScanParams.paths.indexOf(metadata.id(doc.path)) !== -1

// the Syncdir is empty error only occurs if there was some docs beforehand
let emptySyncDir = docs.length > NB_OF_DELETABLE_ELEMENT
Expand All @@ -66,39 +66,39 @@ const detectOfflineUnlinkEvents = async (

const step = async (
rawEvents /*: ChokidarEvent[] */,
{ buffer, initialScan, pouch } /*: InitialScanOpts */
) /*: Promise<?Array<ChokidarEvent>> */ => {
let events = rawEvents.filter(e => e.path !== '') // @TODO handle root dir events
if (initialScan != null) {
const paths = initialScan.paths
events
.filter(e => e.type.startsWith('add'))
.forEach(e => paths.push(metadata.id(e.path)))

const {
offlineEvents,
unappliedMoves,
emptySyncDir
} = await detectOfflineUnlinkEvents(initialScan, pouch)
events = offlineEvents.concat(events)

events = events.filter(e => {
return unappliedMoves.indexOf(metadata.id(e.path)) === -1
})

if (emptySyncDir) {
// It is possible this is a temporary failure (too late mounting):
// push back the events and wait until next flush.
buffer.unflush(rawEvents)
if (--initialScan.emptyDirRetryCount === 0) {
throw new Error(SYNC_DIR_EMPTY_MESSAGE)
}
return initialScan.resolve()
{ buffer, initialScanParams, pouch } /*: InitialScanOpts */
) /*: Promise<Array<ChokidarEvent>> */ => {
// We mark the initial scan as flushed as soon as possible to avoid
// concurrent initial scan processings from later flushes.
initialScanParams.flushed = true
let events = rawEvents

events
.filter(e => e.type.startsWith('add'))
.forEach(e => initialScanParams.paths.push(metadata.id(e.path)))

const {
offlineEvents,
unappliedMoves,
emptySyncDir
} = await detectOfflineUnlinkEvents(initialScanParams, pouch)
events = offlineEvents.concat(events)

events = events.filter(e => {
return unappliedMoves.indexOf(metadata.id(e.path)) === -1
})

if (emptySyncDir) {
// It is possible this is a temporary failure (too late mounting):
// push back the events and wait until next flush.
buffer.unflush(rawEvents)
if (--initialScanParams.emptyDirRetryCount === 0) {
throw new Error(SYNC_DIR_EMPTY_MESSAGE)
}

log.debug({ initialEvents: events })
return []
}

log.debug({ initialEvents: events }, 'Done with initial scan')
return events
}

Expand Down
8 changes: 4 additions & 4 deletions core/local/chokidar/prepare_events.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ const logger = require('../../utils/logger')

/*::
import type { ChokidarEvent } from './event'
import type { InitialScan } from './initial_scan'
import type { InitialScanParams } from './initial_scan'
import type { LocalEvent } from './local_event'
import type { Metadata } from '../../metadata'
import type { Pouch } from '../../pouch'
type PrepareEventsOpts = {
+checksum: (string) => Promise<string>,
initialScan: ?InitialScan,
initialScanParams: ?InitialScanParams,
pouch: Pouch,
syncPath: string
}
Expand Down Expand Up @@ -71,7 +71,7 @@ const oldMetadata = async (
*/
const step = async (
events /*: ChokidarEvent[] */,
{ checksum, initialScan, pouch, syncPath } /*: PrepareEventsOpts */
{ checksum, initialScanParams, pouch, syncPath } /*: PrepareEventsOpts */
) /*: Promise<LocalEvent[]> */ => {
return Promise.map(
events,
Expand All @@ -86,7 +86,7 @@ const step = async (

if (e.type === 'add' || e.type === 'change') {
if (
initialScan &&
initialScanParams && // FIXME: remove this line so we don't recompute unnecessary checksums after the initial scan
e2.old &&
e2.path.normalize() === e2.old.path.normalize() &&
e2.old.local &&
Expand Down
39 changes: 29 additions & 10 deletions core/local/chokidar/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import type { Pouch } from '../../pouch'
import type Prep from '../../prep'
import type { Checksumer } from '../checksumer'
import type { ChokidarEvent } from './event'
import type { InitialScan } from './initial_scan'
import type { InitialScanParams } from './initial_scan'
import type { LocalEvent } from './local_event'
import type { LocalChange } from './local_change'
import type EventEmitter from 'events'
Expand All @@ -52,6 +52,10 @@ log.chokidar = log.child({
component: 'Chokidar'
})

/**
 * Tell whether the given Chokidar event carries a non-empty path.
 *
 * Declared as a Flow predicate function (`%checks`), so the body is kept as a
 * single returned expression.
 *
 * @param event - the Chokidar event to inspect
 * @returns `true` when `event.path` is anything other than the empty string
 */
function hasPath(event /*: ChokidarEvent */) /*: boolean %checks */ {
return event.path !== ''
}

/**
* This file contains the filesystem watcher that will trigger operations when
* a file or a folder is added/removed/changed locally.
Expand All @@ -64,7 +68,7 @@ class LocalWatcher {
prep: Prep
pouch: Pouch
events: EventEmitter
initialScan: ?InitialScan
initialScanParams: ?InitialScanParams
checksumer: Checksumer
watcher: any // chokidar
buffer: LocalEventBuffer<ChokidarEvent>
Expand Down Expand Up @@ -145,7 +149,8 @@ class LocalWatcher {
path /*: ?string */,
stats /*: ?fs.Stats */
) => {
const isInitialScan = this.initialScan && !this.initialScan.flushed
const isInitialScan =
this.initialScanParams && !this.initialScanParams.flushed
log.chokidar.debug({ path, stats, isInitialScan }, eventType)
const newEvent = chokidarEvent.build(eventType, path, stats)
if (newEvent.type !== eventType) {
Expand All @@ -162,7 +167,7 @@ class LocalWatcher {
// To detect which files&folders have been removed since the last run of
// cozy-desktop, we keep all the paths seen by chokidar during its
// initial scan in @paths to compare them with pouchdb database.
this.initialScan = {
this.initialScanParams = {
paths: [],
emptyDirRetryCount: 3,
resolve,
Expand Down Expand Up @@ -201,13 +206,27 @@ class LocalWatcher {
async onFlush(rawEvents /*: ChokidarEvent[] */) {
log.debug(`Flushed ${rawEvents.length} events`)

if (this.initialScan) this.initialScan.flushed = true
this.events.emit('buffering-end')
syncDir.ensureExistsSync(this)
this.events.emit('local-start')

const events = await initialScan.step(rawEvents, this)
if (!events) return
let events = rawEvents.filter(hasPath) // @TODO handle root dir events
if (events.length > 0) {
// We need to destructure `this` otherwise Flow won't detect that
// `this.initialScanParams` is not null even within the conditional block.
const { buffer, pouch, initialScanParams } = this
if (initialScanParams != null && !initialScanParams.flushed) {
events = await initialScan.step(events, {
initialScanParams,
buffer,
pouch
})
}
}
if (events.length === 0) {
if (this.initialScanParams != null) this.initialScanParams.resolve()
return
}

log.trace('Prepare events...')
const preparedEvents /*: LocalEvent[] */ = await prepareEvents.step(
Expand Down Expand Up @@ -236,9 +255,9 @@ class LocalWatcher {
release()
this.events.emit('local-end')
}
if (this.initialScan != null) {
this.initialScan.resolve()
this.initialScan = null
if (this.initialScanParams != null) {
this.initialScanParams.resolve()
this.initialScanParams = null
}
}

Expand Down
20 changes: 13 additions & 7 deletions test/unit/local/chokidar/analysis.js
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,10 @@ onPlatform('darwin', () => {
]
const pendingChanges = []
should(
analysis(events, { pendingChanges, initialScan: true })
analysis(events, {
pendingChanges,
initialScanParams: { flushed: false }
})
).deepEqual([
{
sideName,
Expand Down Expand Up @@ -1001,7 +1004,10 @@ onPlatform('darwin', () => {
]
const pendingChanges = []
should(
analysis(events, { pendingChanges, initialScan: true })
analysis(events, {
pendingChanges,
initialScanParams: { flushed: false }
})
).deepEqual([
{
sideName,
Expand Down Expand Up @@ -1223,7 +1229,7 @@ onPlatform('darwin', () => {

const changes = analysis(events, {
pendingChanges,
initialScan: false
initialScanParams: undefined
})
changes
.map(change => [
Expand Down Expand Up @@ -1262,7 +1268,7 @@ onPlatform('darwin', () => {

const changes = analysis(events, {
pendingChanges,
initialScan: false
initialScanParams: undefined
})
changes
.map(change => [
Expand Down Expand Up @@ -1298,7 +1304,7 @@ onPlatform('darwin', () => {

const changes = analysis(events, {
pendingChanges,
initialScan: false
initialScanParams: undefined
})
changes
.map(change => [
Expand Down Expand Up @@ -1337,7 +1343,7 @@ onPlatform('darwin', () => {

const changes = analysis(events, {
pendingChanges,
initialScan: false
initialScanParams: undefined
})
changes
.map(change => [
Expand Down Expand Up @@ -1374,7 +1380,7 @@ onPlatform('darwin', () => {

const changes = analysis(events, {
pendingChanges,
initialScan: true
initialScanParams: { flushed: false }
})
changes
.map(change => [change.type, change.path])
Expand Down
4 changes: 2 additions & 2 deletions test/unit/local/chokidar/prepare_events.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ onPlatform('darwin', () => {
const checksum = sinon.spy()
await prepareEvents.step(events, {
checksum,
initialScan: true,
initialScanParams: { flushed: false },
pouch: this.pouch,
syncPath: this.config.syncPath
})
Expand Down Expand Up @@ -114,7 +114,7 @@ onPlatform('darwin', () => {
const checksum = sinon.spy()
await prepareEvents.step(events, {
checksum,
initialScan: true,
initialScanParams: { flushed: false },
pouch: this.pouch,
syncPath: this.config.syncPath
})
Expand Down
Loading

0 comments on commit 33a6ce8

Please sign in to comment.