Skip to content

Commit

Permalink
core/remote/watcher: Fetch directories one by one (#2207)
Browse files Browse the repository at this point in the history
When a directory is re-included in a Desktop client's synchronization,
we need to manually fetch its content.

We used to do so sub-directory by sub-directory but this would require
a lot of fetch requests from the client on a large Cozy and could lead
to client crashes.
We then tried to fetch them layer by layer using a more complex Mango
query. Unfortunately, these requests could not make use of CouchDB
indexes and would be so slow on a large Cozy that we would never get a
response before the request would time out.

We're now trying a different approach. We'll fetch these
sub-directories one by one but we won't fetch all of them before
processing them. Instead, we'll merge a directory's direct content
before fetching the next sibling.
Sub-directories fetched in this process will be marked as needing a
content fetch so we can resume from where we stopped in case the
client is stopped.
  • Loading branch information
taratatach authored Mar 9, 2022
2 parents f7fcd0f + 522b04c commit fa5b473
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 328 deletions.
15 changes: 10 additions & 5 deletions core/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ export type Metadata = {
fileid?: string,
moveFrom?: SavedMetadata,
cozyMetadata?: Object,
metadata?: Object
metadata?: Object,
needsContentFetching: boolean
}
export type SavedMetadata = PouchRecord & Metadata
Expand Down Expand Up @@ -244,7 +245,8 @@ function fromRemoteDir(remoteDir /*: MetadataRemoteDir */) /*: Metadata */ {
docType: localDocType(remoteDir.type),
path: pathUtils.remoteToLocal(remoteDir.path),
created_at: timestamp.roundedRemoteDate(remoteDir.created_at),
updated_at: timestamp.roundedRemoteDate(remoteDir.updated_at)
updated_at: timestamp.roundedRemoteDate(remoteDir.updated_at),
needsContentFetching: false
}

const fields = Object.getOwnPropertyNames(remoteDir).filter(
Expand Down Expand Up @@ -276,7 +278,8 @@ function fromRemoteFile(remoteFile /*: MetadataRemoteFile */) /*: Metadata */ {
size: parseInt(remoteFile.size, 10),
executable: !!remoteFile.executable,
created_at: timestamp.roundedRemoteDate(remoteFile.created_at),
updated_at: timestamp.roundedRemoteDate(remoteFile.updated_at)
updated_at: timestamp.roundedRemoteDate(remoteFile.updated_at),
needsContentFetching: false
}

const fields = Object.getOwnPropertyNames(remoteFile).filter(
Expand Down Expand Up @@ -808,7 +811,8 @@ function buildDir(
docType: 'folder',
updated_at: stats.mtime.toISOString(),
ino: stats.ino,
tags: []
tags: [],
needsContentFetching: false
}
// FIXME: we should probably not set remote at this point
if (remote) {
Expand Down Expand Up @@ -845,7 +849,8 @@ function buildFile(
class: className,
size,
executable,
tags: []
tags: [],
needsContentFetching: false
}
// FIXME: we should probably not set remote at this point
if (remote) {
Expand Down
17 changes: 16 additions & 1 deletion core/pouch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,10 @@ class Pouch {
return results.rows.map(row => row.doc)
}

async needingContentFetching() /*: Promise<SavedMetadata[]> */ {
return await this.getAll('needsContentFetching')
}

/* Views */

// Create all required views in the database
Expand All @@ -430,7 +434,8 @@ class Pouch {
this.addByPathView,
this.addByLocalPathView,
this.addByChecksumView,
this.addByRemoteIdView
this.addByRemoteIdView,
this.addNeedsContentFetchingView
],
err => {
if (err) reject(err)
Expand Down Expand Up @@ -536,6 +541,16 @@ class Pouch {
await this.createDesignDoc('byRemoteId', query)
}

async addNeedsContentFetchingView() {
const query = function(doc) {
if (doc.needsContentFetching && !doc.trashed) {
// $FlowFixMe
return emit(doc._id) // eslint-disable-line no-undef
}
}.toString()
await this.createDesignDoc('needsContentFetching', query)
}

// Create or update given design doc
async createDesignDoc(name /*: string */, query /*: string */) {
const doc = {
Expand Down
45 changes: 9 additions & 36 deletions core/remote/cozy.js
Original file line number Diff line number Diff line change
Expand Up @@ -444,64 +444,37 @@ class RemoteCozy {
return results[0]
}

// XXX: This only fetches the direct children of a directory, not children of
// sub-directories.
async getDirectoryContent(
dir /*: RemoteDir */,
{ client } /*: { client: ?CozyClient } */ = {}
) /*: Promise<$ReadOnlyArray<MetadataRemoteInfo>> */ {
client = client || (await this.newClient())

let dirContent = []
let nextDirs = [dir]
while (nextDirs.length) {
const nestedContent = await this.getDirectoriesContent(nextDirs, {
client
})
dirContent = dirContent.concat(nestedContent.docs)
nextDirs = nestedContent.nextDirs
}

return dirContent.sort((a, b) => {
if (a.path < b.path) return -1
if (a.path > b.path) return 1
return 0
})
}

async getDirectoriesContent(
dirs /*: $ReadOnlyArray<RemoteDir> */,
{ client } /*: { client: CozyClient } */
) /*: Promise<{ nextDirs: $ReadOnlyArray<MetadataRemoteDir>, docs: $ReadOnlyArray<MetadataRemoteInfo> }> */ {
const queryDef = Q(FILES_DOCTYPE)
.where({
dir_id: { $in: dirs.map(dir => dir._id) },
dir_id: dir._id,
name: { $gt: null }
})
.indexFields(['dir_id', 'name'])
.sortBy([{ dir_id: 'asc' }, { name: 'asc' }])
.limitBy(3000)

const resp = await this.queryAll(queryDef, { client })
const { data } = await this.queryAll(queryDef, { client })

const nextDirs = []
const docs = []
for (const j of resp.data) {
const remoteDocs = []
for (const j of data) {
const remoteJson = jsonApiToRemoteJsonDoc(j)
if (remoteJson._deleted) continue

const parentDir = dirs.find(
dir => dir._id === remoteJson.attributes.dir_id
)
const remoteDoc = await this.toRemoteDoc(remoteJson, parentDir)
const remoteDoc = await this.toRemoteDoc(remoteJson, dir)

if (!this.isExcludedDirectory(remoteDoc)) {
docs.push(remoteDoc)

if (remoteDoc.type === DIR_TYPE) {
nextDirs.push(remoteDoc)
}
remoteDocs.push(remoteDoc)
}
}
return { nextDirs, docs }
return remoteDocs
}

async queryAll(queryDef /*: Q */, { client } /*: { client: CozyClient } */) {
Expand Down
125 changes: 87 additions & 38 deletions core/remote/watcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ const folderMightHaveBeenExcluded = (
return metadata.extractRevNumber(remoteDir) > 2
}

const needsContentFetching = (
remoteDoc /*: MetadataRemoteInfo|RemoteDeletion */,
{ isRecursiveFetch = false } /*: { isRecursiveFetch: boolean } */ = {}
) /*: boolean %checks */ => {
return (
!remoteDoc._deleted &&
remoteDoc.type === 'directory' &&
(folderMightHaveBeenExcluded(remoteDoc) || isRecursiveFetch)
)
}

/** Get changes from the remote Cozy and prepare them for merge */
class RemoteWatcher {
/*::
Expand Down Expand Up @@ -160,6 +171,7 @@ class RemoteWatcher {
const release = await this.pouch.lock(this)
try {
this.events.emit('buffering-start')

const seq = await this.pouch.getRemoteSeq()
const { last_seq, docs, isInitialFetch } = await this.remoteCozy.changes(
seq
Expand All @@ -168,12 +180,19 @@ class RemoteWatcher {

if (docs.length === 0) {
log.debug('No remote changes for now')
await this.fetchReincludedContent()
return
}

this.events.emit('remote-start')
this.events.emit('buffering-end')
await this.pullMany(docs, { isInitialFetch })

if (isInitialFetch) {
await this.processRemoteChanges(docs, { isInitialFetch })
} else {
await this.processRemoteChanges(docs)
await this.fetchReincludedContent()
}

let target = -1
target = (await this.pouch.db.changes({ limit: 1, descending: true }))
Expand All @@ -193,6 +212,24 @@ class RemoteWatcher {
}
}

async fetchReincludedContent() {
let dirs = await this.pouch.needingContentFetching()

while (dirs.length) {
for (const dir of dirs) {
log.trace({ path: dir.path }, 'Fetching content of unknown folder...')
const children = await this.remoteCozy.getDirectoryContent(dir.remote)

await this.processRemoteChanges(children, { isRecursiveFetch: true })

dir.needsContentFetching = false
await this.pouch.put(dir)
}

dirs = await this.pouch.needingContentFetching()
}
}

async olds(
remoteDocs /*: $ReadOnlyArray<MetadataRemoteInfo|RemoteDeletion> */
) /*: Promise<SavedMetadata[]> */ {
Expand All @@ -203,52 +240,40 @@ class RemoteWatcher {
return await this.pouch.allByRemoteIds(remoteIds)
}

/** Pull multiple changed or deleted docs
*
* FIXME: Misleading method name?
/** Process multiple changed or deleted docs
*/
async pullMany(
async processRemoteChanges(
docs /*: $ReadOnlyArray<MetadataRemoteInfo|RemoteDeletion> */,
{ isInitialFetch } /*: { isInitialFetch: boolean } */
{
isInitialFetch = false,
isRecursiveFetch = false
} /*: { isInitialFetch?: boolean, isRecursiveFetch?: boolean } */ = {}
) /*: Promise<void> */ {
let changes = await this.analyse(docs, await this.olds(docs))

for (const change of changes) {
if (
!isInitialFetch &&
change.type === 'DirAddition' &&
folderMightHaveBeenExcluded(change.doc.remote)
) {
log.trace(
{ path: change.doc.path },
'Fetching content of unknown folder...'
)
const children = (await this.remoteCozy.getDirectoryContent(
change.doc.remote
)).filter(child =>
docs.every(doc => doc._id !== child._id || doc._rev !== child._rev)
)
const childChanges = await this.analyse(
children,
await this.olds(children)
)
changes = changes.concat(childChanges)
}
}
let changes = await this.analyse(docs, await this.olds(docs), {
isInitialFetch,
isRecursiveFetch
})

log.trace('Apply changes...')
const errors = await this.applyAll(changes)
if (errors.length) throw errors[0].err

log.trace('Done with pull.')
log.trace('Done with changes processing.')
}

async analyse(
remoteDocs /*: $ReadOnlyArray<MetadataRemoteInfo|RemoteDeletion> */,
olds /*: SavedMetadata[] */
olds /*: SavedMetadata[] */,
{
isInitialFetch = false,
isRecursiveFetch = false
} /*: { isInitialFetch?: boolean, isRecursiveFetch?: boolean } */ = {}
) /*: Promise<RemoteChange[]> */ {
log.trace('Contextualize and analyse changesfeed results...')
const changes = this.identifyAll(remoteDocs, olds)
const changes = this.identifyAll(remoteDocs, olds, {
isInitialFetch,
isRecursiveFetch
})
log.trace('Done with analysis.')

remoteChange.sortByPath(changes)
Expand All @@ -268,15 +293,24 @@ class RemoteWatcher {

identifyAll(
remoteDocs /*: $ReadOnlyArray<MetadataRemoteInfo|RemoteDeletion> */,
olds /*: SavedMetadata[] */
olds /*: SavedMetadata[] */,
{
isInitialFetch = false,
isRecursiveFetch = false
} /*: { isInitialFetch?: boolean, isRecursiveFetch?: boolean } */ = {}
) {
const changes /*: Array<RemoteChange> */ = []
const originalMoves = []

const oldsByRemoteId = _.keyBy(olds, 'remote._id')
for (const remoteDoc of remoteDocs) {
const was /*: ?SavedMetadata */ = oldsByRemoteId[remoteDoc._id]
changes.push(this.identifyChange(remoteDoc, was, changes, originalMoves))
changes.push(
this.identifyChange(remoteDoc, was, changes, originalMoves, {
isInitialFetch,
isRecursiveFetch
})
)
}

return changes
Expand All @@ -286,7 +320,11 @@ class RemoteWatcher {
remoteDoc /*: MetadataRemoteInfo|RemoteDeletion */,
was /*: ?SavedMetadata */,
previousChanges /*: Array<RemoteChange> */,
originalMoves /*: Array<RemoteDirMove|RemoteDescendantChange> */
originalMoves /*: Array<RemoteDirMove|RemoteDescendantChange> */,
{
isInitialFetch = false,
isRecursiveFetch = false
} /*: { isInitialFetch?: boolean, isRecursiveFetch?: boolean } */ = {}
) /*: RemoteChange */ {
const oldpath /*: ?string */ = was ? was.path : undefined
log.debug(
Expand Down Expand Up @@ -334,7 +372,8 @@ class RemoteWatcher {
remoteDoc,
was,
previousChanges,
originalMoves
originalMoves,
{ isInitialFetch, isRecursiveFetch }
)
}
}
Expand All @@ -354,7 +393,11 @@ class RemoteWatcher {
remoteDoc /*: MetadataRemoteInfo */,
was /*: ?SavedMetadata */,
previousChanges /*: Array<RemoteChange> */,
originalMoves /*: Array<RemoteDirMove|RemoteDescendantChange> */
originalMoves /*: Array<RemoteDirMove|RemoteDescendantChange> */,
{
isInitialFetch = false,
isRecursiveFetch = false
} /*: { isInitialFetch?: boolean, isRecursiveFetch?: boolean } */ = {}
) /*: RemoteChange */ {
const doc /*: Metadata */ = metadata.fromRemoteDoc(remoteDoc)
try {
Expand Down Expand Up @@ -426,6 +469,12 @@ class RemoteWatcher {
}

if (!was || inRemoteTrash(was.remote)) {
if (!isInitialFetch) {
doc.needsContentFetching = needsContentFetching(doc.remote, {
isRecursiveFetch
})
}

return remoteChange.added(doc)
} else if (metadata.samePath(was, doc)) {
if (
Expand Down
Loading

0 comments on commit fa5b473

Please sign in to comment.