From 1f85b758872167ddca2c6dd18fe5d7cef280b466 Mon Sep 17 00:00:00 2001 From: Stuart Miller Date: Thu, 8 Jun 2017 13:30:19 +1200 Subject: [PATCH 1/3] Write tar files to a stream instead of to the filesystem first. This improves performance when writing to a stream (i.e. there is no need to wait for the contents to be written to disk first), and can help solve EMFILE errors. This is achieved by using tar-stream instead of regular tar. This commit introduces a small shim in front of the file system and tar-stream to allow both direct disk writes and tar file stream writes. It then uses the appropriate shim (file system or tar-stream) throughout the codebase to create directories and store files. --- .editorconfig | 4 + index.js | 612 ++++++++++++++++++++++++-------------------------- index.min.js | 308 +++++++++++++------------ package.json | 3 +- 4 files changed, 462 insertions(+), 465 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..5e9d853 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,4 @@ +[*] +indent_style=space +indent_size=2 + diff --git a/index.js b/index.js index 9d57b59..4b30672 100644 --- a/index.js +++ b/index.js @@ -13,18 +13,99 @@ * initialize module */ var systemRegex = /^system\./; -var fs = require('graceful-fs'); +var fs = require('graceful-fs').gracefulify(require('fs-extra')); var path = require('path'); var BSON; var logger; var meta; +var fileSystemDocumentStore = function (root) { + var dbDir = root; + var makeDir = function (pathname, next) { + fs.stat(pathname, function (err, stats) { + + if (err && err.code === 'ENOENT') { // no file or dir + logger('make dir at ' + pathname); + return fs.mkdirp(pathname, function (err) { + + next(err, pathname); + }); + + } else if (stats && stats.isDirectory() === false) { // pathname is a file + logger('unlink file at ' + pathname); + return fs.unlink(pathname, function (err) { + + if (err) { // unlink fail. permission maybe + return next(err); + } + + logger('make dir at ' + pathname); + fs.mkdir(pathname, function (err) { + + next(err, pathname); + }); + }); + + } else { // already a dir + next(null, pathname); + } + }); + }; + return { + addDatabase: function (dbName, next) { + dbDir = path.join(root, dbName); + return makeDir(dbDir, next); + }, + addCollection: function addCollection(relativePath, next) { + var pathname = path.join(dbDir, relativePath); + return makeDir(pathname, next); + }, + store: function store(collectionName, relativePath, content, callback) { + fs.writeFile(path.join(dbDir, collectionName, relativePath), content, callback); + }, + close: function () { + } + }; +}; + +var streamingDocumentStore = function (root, stream) { + var tar = require('tar-stream'); + var pack = tar.pack(); // pack is a streams2 stream + pack.pipe(stream); + + var dbDir = root; + return { + addDatabase: function addDatabase(dbName, next) { + dbDir = path.join(root, dbName); + pack.entry({name: dbDir, type: 'directory'}); + next(); + }, + + addCollection: function addCollection(filename, next) { + if (filename !== '') { + pack.entry({name: path.join(dbDir, filename), type: 'directory'}); + } + next(); + }, + + store: function store(collectionName, filename, content, callback) { + pack.entry({name: path.join(dbDir, collectionName, filename)}, content); + if (callback) { + callback(); + } + }, + close: function close() { + pack.finalize(); + } + }; +}; + /* * functions */ /** * error handler - * + * * @function error * @param {Object} err - raised error */ @@ -37,142 +118,53 @@ function error(err) { /** * save collection metadata to file - * + * * @function writeMetadata * @param {Object} collection - db collection * @param {String} metadata - path of metadata * @param {Function} next - callback */ -function writeMetadata(collection, metadata, next) { - - return collection.indexes(function(err, indexes) { - - if (err) { - return next(err); - } - - fs.writeFile(metadata + collection.collectionName, JSON.stringify(indexes), - next); - }); -} - -/** - * make dir - * - * @function makeDir - * @param {String} pathname - pathname of dir - * @param {Function} next - callback - */ -function makeDir(pathname, next) { - - fs.stat(pathname, function(err, stats) { - - if (err && err.code === 'ENOENT') { // no file or dir - logger('make dir at ' + pathname); - return fs.mkdir(pathname, function(err) { - - next(err, pathname); - }); - - } else if (stats && stats.isDirectory() === false) { // pathname is a file - logger('unlink file at ' + pathname); - return fs.unlink(pathname, function(err) { - - if (err) { // unlink fail. permission maybe - return next(err); - } - - logger('make dir at ' + pathname); - fs.mkdir(pathname, function(err) { - - next(err, pathname); - }); - }); - - } else { // already a dir - next(null, pathname); - } - }); -} - -/** - * remove dir - * - * @function rmDir - * @param {String} pathname - path of dir - * @param {Function} [next] - callback - */ -function rmDir(pathname, next) { - - fs.readdirSync(pathname).forEach(function(first) { // database - - var database = pathname + first; - if (fs.statSync(database).isDirectory() === false) { - return next(Error('path is not a Directory')); - } - - var metadata = ''; - var collections = fs.readdirSync(database); - var metadataPath = path.join(database, '.metadata'); - if (fs.existsSync(metadataPath) === true) { - metadata = metadataPath + path.sep; - delete collections[collections.indexOf('.metadata')]; // undefined is not a dir - } - - collections.forEach(function(second) { // collection - - var collection = path.join(database, second); - if (fs.statSync(collection).isDirectory() === false) { - return; - } - - fs.readdirSync(collection).forEach(function(third) { // document - - var document = path.join(collection, third); - fs.unlinkSync(document); - return next ? next(null, document) : ''; - }); - - if (metadata !== '') { - fs.unlinkSync(metadata + second); +function writeMetadata(documentStore) { + return function (collection, metadata, next) { + return collection.indexes(function (err, indexes) { + if (err) { + return next(err); } - fs.rmdirSync(collection); + documentStore.store('.metadata', collection.collectionName, JSON.stringify(indexes), next); }); - - if (metadata !== '') { - fs.rmdirSync(metadata); - } - return fs.rmdirSync(database); - }); + }; } + /** * JSON parser async - * + * * @function toJson * @param {Objecy} doc - document from stream * @param {String} collectionPath - path of collection */ -function toJsonAsync(doc, collectionPath) { - - fs.writeFile(collectionPath + doc._id + '.json', JSON.stringify(doc)); +function toJsonAsync(documentStore) { + return function (doc, collectionPath) { + documentStore.store(collectionPath, doc._id + '.json', JSON.stringify(doc)); + }; } /** * BSON parser async - * + * * @function toBson * @param {Objecy} doc - document from stream * @param {String} collectionPath - path of collection */ -function toBsonAsync(doc, collectionPath) { - - fs.writeFile(collectionPath + doc._id + '.bson', BSON.serialize(doc)); +function toBsonAsync(documentStore) { + return function (doc, collectionPath) { + documentStore.store(collectionPath, doc._id + '.bson', BSON.serialize(doc)); + }; } /** * get data from all available collections - * + * * @function allCollections * @param {Object} db - database * @param {String} name - path of dir @@ -181,52 +173,54 @@ function toBsonAsync(doc, collectionPath) { * @param {Function} parser - data parser * @param {Function} next - callback */ -function allCollections(db, name, query, metadata, parser, next) { +function allCollections(documentStore) { + return function (db, name, query, metadata, parser, next) { - return db.collections(function(err, collections) { + return db.collections(function (err, collections) { - if (err) { - return next(err); - } - - var last = ~~collections.length, index = 0; - if (last === 0) { // empty set - return next(err); - } - - collections.forEach(function(collection) { + if (err) { + return next(err); + } - if (systemRegex.test(collection.collectionName) === true) { - return last === ++index ? next(null) : null; + var last = ~~collections.length, index = 0; + if (last === 0) { // empty set + return next(err); } - logger('select collection ' + collection.collectionName); - makeDir(name + collection.collectionName + path.sep, function(err, name) { + collections.forEach(function (collection) { - if (err) { - return last === ++index ? next(err) : error(err); + if (systemRegex.test(collection.collectionName) === true) { + return last === ++index ? next(null) : null; } - meta(collection, metadata, function() { + logger('select collection ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function (err) { + + if (err) { + return last === ++index ? next(err) : error(err); + } - var stream = collection.find(query).snapshot(true).stream(); + meta(collection, metadata, function () { - stream.once('end', function() { + var stream = collection.find(query).snapshot(true).stream(); - return last === ++index ? next(null) : null; - }).on('data', function(doc) { + stream.once('end', function () { - parser(doc, name); + return last === ++index ? next(null) : null; + }).on('data', function (doc) { + + parser(doc, collection.collectionName); + }); }); }); }); }); - }); + }; } /** * get data from all available collections without query (parallelCollectionScan) - * + * * @function allCollectionsScan * @param {Object} db - database * @param {String} name - path of dir @@ -235,70 +229,72 @@ function allCollections(db, name, query, metadata, parser, next) { * @param {Function} parser - data parser * @param {Function} next - callback */ -function allCollectionsScan(db, name, numCursors, metadata, parser, next) { - - return db.collections(function(err, collections) { - - if (err) { - return next(err); - } +function allCollectionsScan(documentStore) { + return function (db, name, numCursors, metadata, parser, next) { - var last = ~~collections.length, index = 0; - if (last === 0) { // empty set - return next(null); - } + return db.collections(function (err, collections) { - collections.forEach(function(collection) { + if (err) { + return next(err); + } - if (systemRegex.test(collection.collectionName) === true) { - return last === ++index ? next(null) : null; + var last = ~~collections.length, index = 0; + if (last === 0) { // empty set + return next(null); } - logger('select collection scan ' + collection.collectionName); - makeDir(name + collection.collectionName + path.sep, function(err, name) { + collections.forEach(function (collection) { - if (err) { - return last === ++index ? next(err) : error(err); + if (systemRegex.test(collection.collectionName) === true) { + return last === ++index ? next(null) : null; } - meta(collection, metadata, function() { + logger('select collection scan ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function (err) { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function(err, cursors) { + if (err) { + return last === ++index ? next(err) : error(err); + } - if (err) { - return last === ++index ? next(err) : error(err); - } + meta(collection, metadata, function () { - var ii, cursorsDone; - ii = cursorsDone = ~~cursors.length; - if (ii === 0) { // empty set - return last === ++index ? next(null) : null; - } + collection.parallelCollectionScan({ + numCursors: numCursors + }, function (err, cursors) { + + if (err) { + return last === ++index ? next(err) : error(err); + } - for (var i = 0; i < ii; ++i) { - cursors[i].once('end', function() { + var ii, cursorsDone; + ii = cursorsDone = ~~cursors.length; + if (ii === 0) { // empty set + return last === ++index ? next(null) : null; + } - // No more cursors let's ensure we got all results - if (--cursorsDone === 0) { - return last === ++index ? next(null) : null; - } - }).on('data', function(doc) { + for (var i = 0; i < ii; ++i) { + cursors[i].once('end', function () { - parser(doc, name); - }); - } + // No more cursors let's ensure we got all results + if (--cursorsDone === 0) { + return last === ++index ? next(null) : null; + } + }).on('data', function (doc) { + + parser(doc, collection.collectionName); + }); + } + }); }); }); }); }); - }); + }; } /** * get data from some collections - * + * * @function someCollections * @param {Object} db - database * @param {String} name - path of dir @@ -308,50 +304,52 @@ function allCollectionsScan(db, name, numCursors, metadata, parser, next) { * @param {Function} next - callback * @param {Array} collections - selected collections */ -function someCollections(db, name, query, metadata, parser, next, collections) { +function someCollections(documentStore) { + return function (db, name, query, metadata, parser, next, collections) { - var last = ~~collections.length, index = 0; - if (last === 0) { - return next(null); - } - - collections.forEach(function(collection) { - - db.collection(collection, { - strict: true - }, function(err, collection) { + var last = ~~collections.length, index = 0; + if (last === 0) { + return next(null); + } - if (err) { // returns an error if the collection does not exist - return last === ++index ? next(err) : error(err); - } + collections.forEach(function (collection) { - logger('select collection ' + collection.collectionName); - makeDir(name + collection.collectionName + path.sep, function(err, name) { + db.collection(collection, { + strict: true + }, function (err, collection) { - if (err) { + if (err) { // returns an error if the collection does not exist return last === ++index ? next(err) : error(err); } - meta(collection, metadata, function() { + logger('select collection ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function (err) { + + if (err) { + return last === ++index ? next(err) : error(err); + } + + meta(collection, metadata, function () { - var stream = collection.find(query).snapshot(true).stream(); + var stream = collection.find(query).snapshot(true).stream(); - stream.once('end', function() { + stream.once('end', function () { - return last === ++index ? next(null) : null; - }).on('data', function(doc) { + return last === ++index ? next(null) : null; + }).on('data', function (doc) { - parser(doc, name); + parser(doc, collection.collectionName); + }); }); }); }); }); - }); + }; } /** * get data from some collections without query (parallelCollectionScan) - * + * * @function someCollectionsScan * @param {Object} db - database * @param {String} name - path of dir @@ -361,69 +359,71 @@ function someCollections(db, name, query, metadata, parser, next, collections) { * @param {Function} next - callback * @param {Array} collections - selected collections */ -function someCollectionsScan(db, name, numCursors, metadata, parser, next, - collections) { +function someCollectionsScan(documentStore) { + return function (db, name, numCursors, metadata, parser, next, + collections) { - var last = ~~collections.length, index = 0; - if (last === 0) { // empty set - return next(null); - } - - collections.forEach(function(collection) { - - db.collection(collection, { - strict: true - }, function(err, collection) { + var last = ~~collections.length, index = 0; + if (last === 0) { // empty set + return next(null); + } - if (err) { // returns an error if the collection does not exist - return last === ++index ? next(err) : error(err); - } + collections.forEach(function (collection) { - logger('select collection scan ' + collection.collectionName); - makeDir(name + collection.collectionName + path.sep, function(err, name) { + db.collection(collection, { + strict: true + }, function (err, collection) { - if (err) { + if (err) { // returns an error if the collection does not exist return last === ++index ? next(err) : error(err); } - meta(collection, metadata, function() { + logger('select collection scan ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function (err) { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function(err, cursors) { + if (err) { + return last === ++index ? next(err) : error(err); + } - if (err) { - return last === ++index ? next(err) : error(err); - } + meta(collection, metadata, function () { - var ii, cursorsDone; - ii = cursorsDone = ~~cursors.length; - if (ii === 0) { // empty set - return last === ++index ? next(null) : null; - } + collection.parallelCollectionScan({ + numCursors: numCursors + }, function (err, cursors) { + + if (err) { + return last === ++index ? next(err) : error(err); + } - for (var i = 0; i < ii; ++i) { - cursors[i].once('end', function() { + var ii, cursorsDone; + ii = cursorsDone = ~~cursors.length; + if (ii === 0) { // empty set + return last === ++index ? next(null) : null; + } - // No more cursors let's ensure we got all results - if (--cursorsDone === 0) { - return last === ++index ? next(null) : null; - } - }).on('data', function(doc) { + for (var i = 0; i < ii; ++i) { + cursors[i].once('end', function () { - parser(doc, name); - }); - } + // No more cursors let's ensure we got all results + if (--cursorsDone === 0) { + return last === ++index ? next(null) : null; + } + }).on('data', function (doc) { + + parser(doc, collection.collectionName); + }); + } + }); }); }); }); }); - }); + }; } /** * function wrapper - * + * * @function wrapper * @param {Object} my - parsed options */ @@ -437,31 +437,31 @@ function wrapper(my) { case 'bson': BSON = require('bson'); BSON = new BSON(); - parser = toBsonAsync; + parser = toBsonAsync(my.documentStore); break; case 'json': // JSON error on ObjectId, Date and Long - parser = toJsonAsync; + parser = toJsonAsync(my.documentStore); break; default: throw new Error('missing parser option'); } } - var discriminator = allCollections; + var discriminator = allCollections(my.documentStore); if (my.collections !== null) { - discriminator = someCollections; + discriminator = someCollections(my.documentStore); if (my.numCursors) { - discriminator = someCollectionsScan; + discriminator = someCollectionsScan(my.documentStore); my.query = my.numCursors; // override } } else if (my.numCursors) { - discriminator = allCollectionsScan; + discriminator = allCollectionsScan(my.documentStore); my.query = my.numCursors; // override } if (my.logger === null) { - logger = function() { + logger = function () { return; }; @@ -479,7 +479,7 @@ function wrapper(my) { logger('backup start'); var log = require('mongodb').Logger; log.setLevel('info'); - log.setCurrentLogger(function(msg) { + log.setCurrentLogger(function (msg) { return logger(msg); }); @@ -487,9 +487,9 @@ function wrapper(my) { var metadata = ''; if (my.metadata === true) { - meta = writeMetadata; + meta = writeMetadata(my.documentStore); } else { - meta = function(a, b, c) { + meta = function (a, b, c) { return c(); }; @@ -497,7 +497,7 @@ function wrapper(my) { /** * latest callback - * + * * @return {Null} */ function callback(err) { @@ -512,87 +512,48 @@ function wrapper(my) { } } - require('mongodb').MongoClient.connect(my.uri, my.options, function(err, db) { + require('mongodb').MongoClient.connect(my.uri, my.options, function (err, db) { logger('db open'); if (err) { return callback(err); } - var root = my.tar === null ? my.root : my.dir; - makeDir(root, function(err, name) { - - if (err) { - return callback(err); - } - - makeDir(name + db.databaseName + path.sep, function(err, name) { - - function go() { - - // waiting for `db.fsyncLock()` on node driver - return discriminator(db, name, my.query, metadata, parser, - function(err) { - - logger('db close'); - db.close(); - if (err) { - return callback(err); - } - - if (my.tar) { - makeDir(my.root, function(e, name) { + my.documentStore.addDatabase(db.databaseName, function (err, name) { - if (err) { - error(err); - } - - var dest; - if (my.stream) { // user stream - logger('send tar file to stream'); - dest = my.stream; - } else { // filesystem stream - logger('make tar file at ' + name + my.tar); - dest = fs.createWriteStream(name + my.tar); - } + function go() { - var packer = require('tar').Pack().on('error', callback).on( - 'end', function() { + // waiting for `db.fsyncLock()` on node driver + return discriminator(db, name, my.query, metadata, parser, + function (err) { - rmDir(root); - callback(null); - }); - - require('fstream').Reader({ - path: root + db.databaseName, - type: 'Directory' - }).on('error', callback).pipe(packer).pipe(dest); - }); + logger('db close'); + db.close(); + if (err) { + return callback(err); + } - } else { - callback(null); - } - }, my.collections); - } + my.documentStore.close(); + callback(null); + }, my.collections); + } - if (err) { - return callback(err); - } + if (err) { + return callback(err); + } - if (my.metadata === false) { - go(); - } else { - metadata = name + '.metadata' + path.sep; - makeDir(metadata, go); - } - }); + if (my.metadata === false) { + go(); + } else { + my.documentStore.addCollection('.metadata', go); + } }); }); } /** * option setting - * + * * @exports backup * @function backup * @param {Object} options - various options. Check README.md @@ -612,7 +573,6 @@ function backup(options) { } var my = { - dir: path.join(__dirname, 'dump', path.sep), uri: String(opt.uri), root: path.resolve(String(opt.root || '')) + path.sep, stream: opt.stream || null, @@ -626,9 +586,19 @@ function backup(options) { options: typeof opt.options === 'object' ? opt.options : {}, metadata: Boolean(opt.metadata) }; + if (my.tar && !my.stream) { + my.stream = fs.createWriteStream(path.join(my.root, my.tar)); + } if (my.stream) { my.tar = true; // override + my.documentStore = streamingDocumentStore(my.root, my.stream); + } else { + my.documentStore = fileSystemDocumentStore(my.root); } return wrapper(my); } module.exports = backup; + + + + diff --git a/index.min.js b/index.min.js index 6a35041..4e0293e 100644 --- a/index.min.js +++ b/index.min.js @@ -4,149 +4,133 @@ function error(err) { err && logger(err.message); } -function writeMetadata(collection, metadata, next) { - return collection.indexes(function(err, indexes) { - if (err) return next(err); - fs.writeFile(metadata + collection.collectionName, JSON.stringify(indexes), next); - }); -} - -function makeDir(pathname, next) { - fs.stat(pathname, function(err, stats) { - return err && "ENOENT" === err.code ? (logger("make dir at " + pathname), fs.mkdir(pathname, function(err) { - next(err, pathname); - })) : stats && !1 === stats.isDirectory() ? (logger("unlink file at " + pathname), - fs.unlink(pathname, function(err) { +function writeMetadata(documentStore) { + return function(collection, metadata, next) { + return collection.indexes(function(err, indexes) { if (err) return next(err); - logger("make dir at " + pathname), fs.mkdir(pathname, function(err) { - next(err, pathname); - }); - })) : void next(null, pathname); - }); -} - -function rmDir(pathname, next) { - fs.readdirSync(pathname).forEach(function(first) { - var database = pathname + first; - if (!1 === fs.statSync(database).isDirectory()) return next(Error("path is not a Directory")); - var metadata = "", collections = fs.readdirSync(database), metadataPath = path.join(database, ".metadata"); - return !0 === fs.existsSync(metadataPath) && (metadata = metadataPath + path.sep, - delete collections[collections.indexOf(".metadata")]), collections.forEach(function(second) { - var collection = path.join(database, second); - !1 !== fs.statSync(collection).isDirectory() && (fs.readdirSync(collection).forEach(function(third) { - var document = path.join(collection, third); - return fs.unlinkSync(document), next ? next(null, document) : ""; - }), "" !== metadata && fs.unlinkSync(metadata + second), fs.rmdirSync(collection)); - }), "" !== metadata && fs.rmdirSync(metadata), fs.rmdirSync(database); - }); + documentStore.store(".metadata", collection.collectionName, JSON.stringify(indexes), next); + }); + }; } -function toJsonAsync(doc, collectionPath) { - fs.writeFile(collectionPath + doc._id + ".json", JSON.stringify(doc)); +function toJsonAsync(documentStore) { + return function(doc, collectionPath) { + documentStore.store(collectionPath, doc._id + ".json", JSON.stringify(doc)); + }; } -function toBsonAsync(doc, collectionPath) { - fs.writeFile(collectionPath + doc._id + ".bson", BSON.serialize(doc)); +function toBsonAsync(documentStore) { + return function(doc, collectionPath) { + documentStore.store(collectionPath, doc._id + ".bson", BSON.serialize(doc)); + }; } -function allCollections(db, name, query, metadata, parser, next) { - return db.collections(function(err, collections) { - if (err) return next(err); - var last = ~~collections.length, index = 0; - if (0 === last) return next(err); - collections.forEach(function(collection) { - if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; - logger("select collection " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { - if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.find(query).snapshot(!0).stream().once("end", function() { - return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, name); +function allCollections(documentStore) { + return function(db, name, query, metadata, parser, next) { + return db.collections(function(err, collections) { + if (err) return next(err); + var last = ~~collections.length, index = 0; + if (0 === last) return next(err); + collections.forEach(function(collection) { + if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; + logger("select collection " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { + if (err) return last === ++index ? next(err) : error(err); + meta(collection, metadata, function() { + collection.find(query).snapshot(!0).stream().once("end", function() { + return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, collection.collectionName); + }); }); }); }); }); - }); + }; } -function allCollectionsScan(db, name, numCursors, metadata, parser, next) { - return db.collections(function(err, collections) { - if (err) return next(err); - var last = ~~collections.length, index = 0; - if (0 === last) return next(null); - collections.forEach(function(collection) { - if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; - logger("select collection scan " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { - if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function(err, cursors) { - if (err) return last === ++index ? next(err) : error(err); - var ii, cursorsDone; - if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; - for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { - if (0 == --cursorsDone) return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, name); +function allCollectionsScan(documentStore) { + return function(db, name, numCursors, metadata, parser, next) { + return db.collections(function(err, collections) { + if (err) return next(err); + var last = ~~collections.length, index = 0; + if (0 === last) return next(null); + collections.forEach(function(collection) { + if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; + logger("select collection scan " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { + if (err) return last === ++index ? next(err) : error(err); + meta(collection, metadata, function() { + collection.parallelCollectionScan({ + numCursors: numCursors + }, function(err, cursors) { + if (err) return last === ++index ? next(err) : error(err); + var ii, cursorsDone; + if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; + for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { + if (0 == --cursorsDone) return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, collection.collectionName); + }); }); }); }); }); }); - }); + }; } -function someCollections(db, name, query, metadata, parser, next, collections) { - var last = ~~collections.length, index = 0; - if (0 === last) return next(null); - collections.forEach(function(collection) { - db.collection(collection, { - strict: !0 - }, function(err, collection) { - if (err) return last === ++index ? next(err) : error(err); - logger("select collection " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { +function someCollections(documentStore) { + return function(db, name, query, metadata, parser, next, collections) { + var last = ~~collections.length, index = 0; + if (0 === last) return next(null); + collections.forEach(function(collection) { + db.collection(collection, { + strict: !0 + }, function(err, collection) { if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.find(query).snapshot(!0).stream().once("end", function() { - return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, name); + logger("select collection " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { + if (err) return last === ++index ? next(err) : error(err); + meta(collection, metadata, function() { + collection.find(query).snapshot(!0).stream().once("end", function() { + return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, collection.collectionName); + }); }); }); }); }); - }); + }; } -function someCollectionsScan(db, name, numCursors, metadata, parser, next, collections) { - var last = ~~collections.length, index = 0; - if (0 === last) return next(null); - collections.forEach(function(collection) { - db.collection(collection, { - strict: !0 - }, function(err, collection) { - if (err) return last === ++index ? next(err) : error(err); - logger("select collection scan " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { +function someCollectionsScan(documentStore) { + return function(db, name, numCursors, metadata, parser, next, collections) { + var last = ~~collections.length, index = 0; + if (0 === last) return next(null); + collections.forEach(function(collection) { + db.collection(collection, { + strict: !0 + }, function(err, collection) { if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function(err, cursors) { - if (err) return last === ++index ? next(err) : error(err); - var ii, cursorsDone; - if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; - for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { - if (0 == --cursorsDone) return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, name); + logger("select collection scan " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { + if (err) return last === ++index ? next(err) : error(err); + meta(collection, metadata, function() { + collection.parallelCollectionScan({ + numCursors: numCursors + }, function(err, cursors) { + if (err) return last === ++index ? next(err) : error(err); + var ii, cursorsDone; + if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; + for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { + if (0 == --cursorsDone) return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, collection.collectionName); + }); }); }); }); }); }); - }); + }; } function wrapper(my) { @@ -156,19 +140,19 @@ function wrapper(my) { var parser; if ("function" == typeof my.parser) parser = my.parser; else switch (my.parser.toLowerCase()) { case "bson": - BSON = require("bson"), BSON = new BSON(), parser = toBsonAsync; + BSON = require("bson"), BSON = new BSON(), parser = toBsonAsync(my.documentStore); break; case "json": - parser = toJsonAsync; + parser = toJsonAsync(my.documentStore); break; default: throw new Error("missing parser option"); } - var discriminator = allCollections; - if (null !== my.collections ? (discriminator = someCollections, my.numCursors && (discriminator = someCollectionsScan, - my.query = my.numCursors)) : my.numCursors && (discriminator = allCollectionsScan, + var discriminator = allCollections(my.documentStore); + if (null !== my.collections ? (discriminator = someCollections(my.documentStore), + my.numCursors && (discriminator = someCollectionsScan(my.documentStore), my.query = my.numCursors)) : my.numCursors && (discriminator = allCollectionsScan(my.documentStore), my.query = my.numCursors), null === my.logger) logger = function() {}; else { (logger = require("logger-request")({ filename: my.logger, @@ -186,35 +170,19 @@ function wrapper(my) { }); } var metadata = ""; - meta = !0 === my.metadata ? writeMetadata : function(a, b, c) { + meta = !0 === my.metadata ? writeMetadata(my.documentStore) : function(a, b, c) { return c(); }, require("mongodb").MongoClient.connect(my.uri, my.options, function(err, db) { if (logger("db open"), err) return callback(err); - var root = null === my.tar ? my.root : my.dir; - makeDir(root, function(err, name) { + my.documentStore.addDatabase(db.databaseName, function(err, name) { + function go() { + return discriminator(db, name, my.query, metadata, parser, function(err) { + if (logger("db close"), db.close(), err) return callback(err); + my.documentStore.close(), callback(null); + }, my.collections); + } if (err) return callback(err); - makeDir(name + db.databaseName + path.sep, function(err, name) { - function go() { - return discriminator(db, name, my.query, metadata, parser, function(err) { - if (logger("db close"), db.close(), err) return callback(err); - my.tar ? makeDir(my.root, function(e, name) { - err && error(err); - var dest; - my.stream ? (logger("send tar file to stream"), dest = my.stream) : (logger("make tar file at " + name + my.tar), - dest = fs.createWriteStream(name + my.tar)); - var packer = require("tar").Pack().on("error", callback).on("end", function() { - rmDir(root), callback(null); - }); - require("fstream").Reader({ - path: root + db.databaseName, - type: "Directory" - }).on("error", callback).pipe(packer).pipe(dest); - }) : callback(null); - }, my.collections); - } - if (err) return callback(err); - !1 === my.metadata ? go() : makeDir(metadata = name + ".metadata" + path.sep, go); - }); + !1 === my.metadata ? go() : my.documentStore.addCollection(".metadata", go); }); }); } @@ -227,7 +195,6 @@ function backup(options) { if (fs.existsSync(opt.root) && !fs.statSync(opt.root).isDirectory()) throw new Error("root option is not a directory"); } var my = { - dir: path.join(__dirname, "dump", path.sep), uri: String(opt.uri), root: path.resolve(String(opt.root || "")) + path.sep, stream: opt.stream || null, @@ -241,9 +208,64 @@ function backup(options) { options: "object" == typeof opt.options ? opt.options : {}, metadata: Boolean(opt.metadata) }; - return my.stream && (my.tar = !0), wrapper(my); + return my.tar && !my.stream && (my.stream = fs.createWriteStream(path.join(my.root, my.tar))), + my.stream ? (my.tar = !0, my.documentStore = streamingDocumentStore(my.root, my.stream)) : my.documentStore = fileSystemDocumentStore(my.root), + wrapper(my); } -var systemRegex = /^system\./, fs = require("graceful-fs"), path = require("path"), BSON, logger, meta; +var systemRegex = /^system\./, fs = require("graceful-fs").gracefulify(require("fs-extra")), path = require("path"), BSON, logger, meta, fileSystemDocumentStore = function(root) { + var dbDir = root, makeDir = function(pathname, next) { + fs.stat(pathname, function(err, stats) { + return err && "ENOENT" === err.code ? (logger("make dir at " + pathname), fs.mkdirp(pathname, function(err) { + next(err, pathname); + })) : stats && !1 === stats.isDirectory() ? (logger("unlink file at " + pathname), + fs.unlink(pathname, function(err) { + if (err) return next(err); + logger("make dir at " + pathname), fs.mkdir(pathname, function(err) { + next(err, pathname); + }); + })) : void next(null, pathname); + }); + }; + return { + addDatabase: function(dbName, next) { + return dbDir = path.join(root, dbName), makeDir(dbDir, next); + }, + addCollection: function(relativePath, next) { + var pathname = path.join(dbDir, relativePath); + return makeDir(pathname, next); + }, + store: function(collectionName, relativePath, content, callback) { + fs.writeFile(path.join(dbDir, collectionName, relativePath), content, callback); + }, + close: function() {} + }; +}, streamingDocumentStore = function(root, stream) { + var pack = require("tar-stream").pack(); + pack.pipe(stream); + var dbDir = root; + return { + addDatabase: function(dbName, next) { + dbDir = path.join(root, dbName), pack.entry({ + name: dbDir, + type: "directory" + }), next(); + }, + addCollection: function(filename, next) { + "" !== filename && pack.entry({ + name: path.join(dbDir, filename), + type: "directory" + }), next(); + }, + store: function(collectionName, filename, content, callback) { + pack.entry({ + name: path.join(dbDir, collectionName, filename) + }, content), callback && callback(); + }, + close: function() { + pack.finalize(); + } + }; +}; module.exports = backup; diff --git a/package.json b/package.json index a0d71b4..82fdcff 100644 --- a/package.json +++ b/package.json @@ -25,11 +25,12 @@ "main": "index.min.js", "dependencies": { "bson": "1.0.4", + "fs-extra": "^3.0.1", "fstream": "1.0.11", "graceful-fs": "4.1.11", "logger-request": "3.8.0", "mongodb": "2.2.26", - "tar": "2.2.1" + "tar-stream": "^1.5.4" }, "devDependencies": { "grunt": "~1.0", From 82b80c6e74e47a06c0710942b1970fb48113713f Mon Sep 17 00:00:00 2001 From: Stuart Miller Date: Fri, 9 Jun 2017 09:48:17 +1200 Subject: [PATCH 2/3] Revert index.min.js for code review purposes --- index.min.js | 308 ++++++++++++++++++++++++--------------------------- 1 file changed, 143 insertions(+), 165 deletions(-) diff --git a/index.min.js b/index.min.js index 4e0293e..6a35041 100644 --- a/index.min.js +++ b/index.min.js @@ -4,133 +4,149 @@ function error(err) { err && logger(err.message); } -function writeMetadata(documentStore) { - return function(collection, metadata, next) { - return collection.indexes(function(err, indexes) { +function writeMetadata(collection, metadata, next) { + return collection.indexes(function(err, indexes) { + if (err) return next(err); + fs.writeFile(metadata + collection.collectionName, JSON.stringify(indexes), next); + }); +} + +function makeDir(pathname, next) { + fs.stat(pathname, function(err, stats) { + return err && "ENOENT" === err.code ? (logger("make dir at " + pathname), fs.mkdir(pathname, function(err) { + next(err, pathname); + })) : stats && !1 === stats.isDirectory() ? (logger("unlink file at " + pathname), + fs.unlink(pathname, function(err) { if (err) return next(err); - documentStore.store(".metadata", collection.collectionName, JSON.stringify(indexes), next); - }); - }; + logger("make dir at " + pathname), fs.mkdir(pathname, function(err) { + next(err, pathname); + }); + })) : void next(null, pathname); + }); } -function toJsonAsync(documentStore) { - return function(doc, collectionPath) { - documentStore.store(collectionPath, doc._id + ".json", JSON.stringify(doc)); - }; +function rmDir(pathname, next) { + fs.readdirSync(pathname).forEach(function(first) { + var database = pathname + first; + if (!1 === fs.statSync(database).isDirectory()) return next(Error("path is not a Directory")); + var metadata = "", collections = fs.readdirSync(database), metadataPath = path.join(database, ".metadata"); + return !0 === fs.existsSync(metadataPath) && (metadata = metadataPath + path.sep, + delete collections[collections.indexOf(".metadata")]), collections.forEach(function(second) { + var collection = path.join(database, second); + !1 !== fs.statSync(collection).isDirectory() && (fs.readdirSync(collection).forEach(function(third) { + var document = path.join(collection, third); + return fs.unlinkSync(document), next ? next(null, document) : ""; + }), "" !== metadata && fs.unlinkSync(metadata + second), fs.rmdirSync(collection)); + }), "" !== metadata && fs.rmdirSync(metadata), fs.rmdirSync(database); + }); } -function toBsonAsync(documentStore) { - return function(doc, collectionPath) { - documentStore.store(collectionPath, doc._id + ".bson", BSON.serialize(doc)); - }; +function toJsonAsync(doc, collectionPath) { + fs.writeFile(collectionPath + doc._id + ".json", JSON.stringify(doc)); } -function allCollections(documentStore) { - return function(db, name, query, metadata, parser, next) { - return db.collections(function(err, collections) { - if (err) return next(err); - var last = ~~collections.length, index = 0; - if (0 === last) return next(err); - collections.forEach(function(collection) { - if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; - logger("select collection " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { - if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.find(query).snapshot(!0).stream().once("end", function() { - return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, collection.collectionName); - }); - }); - }); - }); - }); - }; +function toBsonAsync(doc, collectionPath) { + fs.writeFile(collectionPath + doc._id + ".bson", BSON.serialize(doc)); } -function allCollectionsScan(documentStore) { - return function(db, name, numCursors, metadata, parser, next) { - return db.collections(function(err, collections) { - if (err) return next(err); - var last = ~~collections.length, index = 0; - if (0 === last) return next(null); - collections.forEach(function(collection) { - if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; - logger("select collection scan " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { - if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function(err, cursors) { - if (err) return last === ++index ? next(err) : error(err); - var ii, cursorsDone; - if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; - for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { - if (0 == --cursorsDone) return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, collection.collectionName); - }); - }); +function allCollections(db, name, query, metadata, parser, next) { + return db.collections(function(err, collections) { + if (err) return next(err); + var last = ~~collections.length, index = 0; + if (0 === last) return next(err); + collections.forEach(function(collection) { + if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; + logger("select collection " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { + if (err) return last === ++index ? next(err) : error(err); + meta(collection, metadata, function() { + collection.find(query).snapshot(!0).stream().once("end", function() { + return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, name); }); }); }); }); - }; + }); } -function someCollections(documentStore) { - return function(db, name, query, metadata, parser, next, collections) { +function allCollectionsScan(db, name, numCursors, metadata, parser, next) { + return db.collections(function(err, collections) { + if (err) return next(err); var last = ~~collections.length, index = 0; if (0 === last) return next(null); collections.forEach(function(collection) { - db.collection(collection, { - strict: !0 - }, function(err, collection) { + if (!0 === systemRegex.test(collection.collectionName)) return last === ++index ? next(null) : null; + logger("select collection scan " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { if (err) return last === ++index ? next(err) : error(err); - logger("select collection " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { - if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.find(query).snapshot(!0).stream().once("end", function() { - return last === ++index ? next(null) : null; + meta(collection, metadata, function() { + collection.parallelCollectionScan({ + numCursors: numCursors + }, function(err, cursors) { + if (err) return last === ++index ? next(err) : error(err); + var ii, cursorsDone; + if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; + for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { + if (0 == --cursorsDone) return last === ++index ? next(null) : null; }).on("data", function(doc) { - parser(doc, collection.collectionName); + parser(doc, name); }); }); }); }); }); - }; + }); } -function someCollectionsScan(documentStore) { - return function(db, name, numCursors, metadata, parser, next, collections) { - var last = ~~collections.length, index = 0; - if (0 === last) return next(null); - collections.forEach(function(collection) { - db.collection(collection, { - strict: !0 - }, function(err, collection) { +function someCollections(db, name, query, metadata, parser, next, collections) { + var last = ~~collections.length, index = 0; + if (0 === last) return next(null); + collections.forEach(function(collection) { + db.collection(collection, { + strict: !0 + }, function(err, collection) { + if (err) return last === ++index ? next(err) : error(err); + logger("select collection " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { if (err) return last === ++index ? next(err) : error(err); - logger("select collection scan " + collection.collectionName), documentStore.addCollection(collection.collectionName, function(err) { - if (err) return last === ++index ? next(err) : error(err); - meta(collection, metadata, function() { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function(err, cursors) { - if (err) return last === ++index ? next(err) : error(err); - var ii, cursorsDone; - if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; - for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { - if (0 == --cursorsDone) return last === ++index ? next(null) : null; - }).on("data", function(doc) { - parser(doc, collection.collectionName); - }); + meta(collection, metadata, function() { + collection.find(query).snapshot(!0).stream().once("end", function() { + return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, name); + }); + }); + }); + }); + }); +} + +function someCollectionsScan(db, name, numCursors, metadata, parser, next, collections) { + var last = ~~collections.length, index = 0; + if (0 === last) return next(null); + collections.forEach(function(collection) { + db.collection(collection, { + strict: !0 + }, function(err, collection) { + if (err) return last === ++index ? next(err) : error(err); + logger("select collection scan " + collection.collectionName), makeDir(name + collection.collectionName + path.sep, function(err, name) { + if (err) return last === ++index ? next(err) : error(err); + meta(collection, metadata, function() { + collection.parallelCollectionScan({ + numCursors: numCursors + }, function(err, cursors) { + if (err) return last === ++index ? next(err) : error(err); + var ii, cursorsDone; + if (0 === (ii = cursorsDone = ~~cursors.length)) return last === ++index ? next(null) : null; + for (var i = 0; i < ii; ++i) cursors[i].once("end", function() { + if (0 == --cursorsDone) return last === ++index ? next(null) : null; + }).on("data", function(doc) { + parser(doc, name); }); }); }); }); }); - }; + }); } function wrapper(my) { @@ -140,19 +156,19 @@ function wrapper(my) { var parser; if ("function" == typeof my.parser) parser = my.parser; else switch (my.parser.toLowerCase()) { case "bson": - BSON = require("bson"), BSON = new BSON(), parser = toBsonAsync(my.documentStore); + BSON = require("bson"), BSON = new BSON(), parser = toBsonAsync; break; case "json": - parser = toJsonAsync(my.documentStore); + parser = toJsonAsync; break; default: throw new Error("missing parser option"); } - var discriminator = allCollections(my.documentStore); - if (null !== my.collections ? (discriminator = someCollections(my.documentStore), - my.numCursors && (discriminator = someCollectionsScan(my.documentStore), my.query = my.numCursors)) : my.numCursors && (discriminator = allCollectionsScan(my.documentStore), + var discriminator = allCollections; + if (null !== my.collections ? (discriminator = someCollections, my.numCursors && (discriminator = someCollectionsScan, + my.query = my.numCursors)) : my.numCursors && (discriminator = allCollectionsScan, my.query = my.numCursors), null === my.logger) logger = function() {}; else { (logger = require("logger-request")({ filename: my.logger, @@ -170,19 +186,35 @@ function wrapper(my) { }); } var metadata = ""; - meta = !0 === my.metadata ? writeMetadata(my.documentStore) : function(a, b, c) { + meta = !0 === my.metadata ? writeMetadata : function(a, b, c) { return c(); }, require("mongodb").MongoClient.connect(my.uri, my.options, function(err, db) { if (logger("db open"), err) return callback(err); - my.documentStore.addDatabase(db.databaseName, function(err, name) { - function go() { - return discriminator(db, name, my.query, metadata, parser, function(err) { - if (logger("db close"), db.close(), err) return callback(err); - my.documentStore.close(), callback(null); - }, my.collections); - } + var root = null === my.tar ? my.root : my.dir; + makeDir(root, function(err, name) { if (err) return callback(err); - !1 === my.metadata ? go() : my.documentStore.addCollection(".metadata", go); + makeDir(name + db.databaseName + path.sep, function(err, name) { + function go() { + return discriminator(db, name, my.query, metadata, parser, function(err) { + if (logger("db close"), db.close(), err) return callback(err); + my.tar ? makeDir(my.root, function(e, name) { + err && error(err); + var dest; + my.stream ? (logger("send tar file to stream"), dest = my.stream) : (logger("make tar file at " + name + my.tar), + dest = fs.createWriteStream(name + my.tar)); + var packer = require("tar").Pack().on("error", callback).on("end", function() { + rmDir(root), callback(null); + }); + require("fstream").Reader({ + path: root + db.databaseName, + type: "Directory" + }).on("error", callback).pipe(packer).pipe(dest); + }) : callback(null); + }, my.collections); + } + if (err) return callback(err); + !1 === my.metadata ? go() : makeDir(metadata = name + ".metadata" + path.sep, go); + }); }); }); } @@ -195,6 +227,7 @@ function backup(options) { if (fs.existsSync(opt.root) && !fs.statSync(opt.root).isDirectory()) throw new Error("root option is not a directory"); } var my = { + dir: path.join(__dirname, "dump", path.sep), uri: String(opt.uri), root: path.resolve(String(opt.root || "")) + path.sep, stream: opt.stream || null, @@ -208,64 +241,9 @@ function backup(options) { options: "object" == typeof opt.options ? opt.options : {}, metadata: Boolean(opt.metadata) }; - return my.tar && !my.stream && (my.stream = fs.createWriteStream(path.join(my.root, my.tar))), - my.stream ? (my.tar = !0, my.documentStore = streamingDocumentStore(my.root, my.stream)) : my.documentStore = fileSystemDocumentStore(my.root), - wrapper(my); + return my.stream && (my.tar = !0), wrapper(my); } -var systemRegex = /^system\./, fs = require("graceful-fs").gracefulify(require("fs-extra")), path = require("path"), BSON, logger, meta, fileSystemDocumentStore = function(root) { - var dbDir = root, makeDir = function(pathname, next) { - fs.stat(pathname, function(err, stats) { - return err && "ENOENT" === err.code ? (logger("make dir at " + pathname), fs.mkdirp(pathname, function(err) { - next(err, pathname); - })) : stats && !1 === stats.isDirectory() ? (logger("unlink file at " + pathname), - fs.unlink(pathname, function(err) { - if (err) return next(err); - logger("make dir at " + pathname), fs.mkdir(pathname, function(err) { - next(err, pathname); - }); - })) : void next(null, pathname); - }); - }; - return { - addDatabase: function(dbName, next) { - return dbDir = path.join(root, dbName), makeDir(dbDir, next); - }, - addCollection: function(relativePath, next) { - var pathname = path.join(dbDir, relativePath); - return makeDir(pathname, next); - }, - store: function(collectionName, relativePath, content, callback) { - fs.writeFile(path.join(dbDir, collectionName, relativePath), content, callback); - }, - close: function() {} - }; -}, streamingDocumentStore = function(root, stream) { - var pack = require("tar-stream").pack(); - pack.pipe(stream); - var dbDir = root; - return { - addDatabase: function(dbName, next) { - dbDir = path.join(root, dbName), pack.entry({ - name: dbDir, - type: "directory" - }), next(); - }, - addCollection: function(filename, next) { - "" !== filename && pack.entry({ - name: path.join(dbDir, filename), - type: "directory" - }), next(); - }, - store: function(collectionName, filename, content, callback) { - pack.entry({ - name: path.join(dbDir, collectionName, filename) - }, content), callback && callback(); - }, - close: function() { - pack.finalize(); - } - }; -}; +var systemRegex = /^system\./, fs = require("graceful-fs"), path = require("path"), BSON, logger, meta; module.exports = backup; From 36b035eb27ae26fe5de2dc0ad20fd22f7e07bdf0 Mon Sep 17 00:00:00 2001 From: Stuart Miller Date: Wed, 14 Jun 2017 10:04:25 +1200 Subject: [PATCH 3/3] Store documentStore as a global variable to make the diff easier to read. --- index.js | 360 +++++++++++++++++++++++++++---------------------------- 1 file changed, 174 insertions(+), 186 deletions(-) diff --git a/index.js b/index.js index 4b30672..ff6eee5 100644 --- a/index.js +++ b/index.js @@ -19,28 +19,30 @@ var BSON; var logger; var meta; -var fileSystemDocumentStore = function (root) { +var documentStore = undefined; + +var fileSystemDocumentStore = function(root) { var dbDir = root; - var makeDir = function (pathname, next) { - fs.stat(pathname, function (err, stats) { + var makeDir = function(pathname, next) { + fs.stat(pathname, function(err, stats) { if (err && err.code === 'ENOENT') { // no file or dir logger('make dir at ' + pathname); - return fs.mkdirp(pathname, function (err) { + return fs.mkdirp(pathname, function(err) { next(err, pathname); }); } else if (stats && stats.isDirectory() === false) { // pathname is a file logger('unlink file at ' + pathname); - return fs.unlink(pathname, function (err) { + return fs.unlink(pathname, function(err) { if (err) { // unlink fail. permission maybe return next(err); } logger('make dir at ' + pathname); - fs.mkdir(pathname, function (err) { + fs.mkdir(pathname, function(err) { next(err, pathname); }); @@ -52,7 +54,7 @@ var fileSystemDocumentStore = function (root) { }); }; return { - addDatabase: function (dbName, next) { + addDatabase: function(dbName, next) { dbDir = path.join(root, dbName); return makeDir(dbDir, next); }, @@ -63,12 +65,12 @@ var fileSystemDocumentStore = function (root) { store: function store(collectionName, relativePath, content, callback) { fs.writeFile(path.join(dbDir, collectionName, relativePath), content, callback); }, - close: function () { + close: function() { } }; }; -var streamingDocumentStore = function (root, stream) { +var streamingDocumentStore = function(root, stream) { var tar = require('tar-stream'); var pack = tar.pack(); // pack is a streams2 stream pack.pipe(stream); @@ -124,15 +126,15 @@ function error(err) { * @param {String} metadata - path of metadata * @param {Function} next - callback */ -function writeMetadata(documentStore) { - return function (collection, metadata, next) { - return collection.indexes(function (err, indexes) { - if (err) { - return next(err); - } - documentStore.store('.metadata', collection.collectionName, JSON.stringify(indexes), next); - }); - }; +function writeMetadata(collection, metadata, next) { + + return collection.indexes(function(err, indexes) { + + if (err) { + return next(err); + } + documentStore.store('.metadata', collection.collectionName, JSON.stringify(indexes), next); + }); } @@ -143,10 +145,9 @@ function writeMetadata(documentStore) { * @param {Objecy} doc - document from stream * @param {String} collectionPath - path of collection */ -function toJsonAsync(documentStore) { - return function (doc, collectionPath) { - documentStore.store(collectionPath, doc._id + '.json', JSON.stringify(doc)); - }; +function toJsonAsync(doc, collectionPath) { + + documentStore.store(collectionPath, doc._id + '.json', JSON.stringify(doc)); } /** @@ -156,10 +157,9 @@ function toJsonAsync(documentStore) { * @param {Objecy} doc - document from stream * @param {String} collectionPath - path of collection */ -function toBsonAsync(documentStore) { - return function (doc, collectionPath) { - documentStore.store(collectionPath, doc._id + '.bson', BSON.serialize(doc)); - }; +function toBsonAsync(doc, collectionPath) { + + documentStore.store(collectionPath, doc._id + '.bson', BSON.serialize(doc)); } /** @@ -173,49 +173,47 @@ function toBsonAsync(documentStore) { * @param {Function} parser - data parser * @param {Function} next - callback */ -function allCollections(documentStore) { - return function (db, name, query, metadata, parser, next) { +function allCollections(db, name, query, metadata, parser, next) { - return db.collections(function (err, collections) { + return db.collections(function(err, collections) { - if (err) { - return next(err); - } + if (err) { + return next(err); + } - var last = ~~collections.length, index = 0; - if (last === 0) { // empty set - return next(err); - } + var last = ~~collections.length, index = 0; + if (last === 0) { // empty set + return next(err); + } - collections.forEach(function (collection) { + collections.forEach(function(collection) { - if (systemRegex.test(collection.collectionName) === true) { - return last === ++index ? next(null) : null; - } + if (systemRegex.test(collection.collectionName) === true) { + return last === ++index ? next(null) : null; + } - logger('select collection ' + collection.collectionName); - documentStore.addCollection(collection.collectionName, function (err) { + logger('select collection ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function(err) { - if (err) { - return last === ++index ? next(err) : error(err); - } + if (err) { + return last === ++index ? next(err) : error(err); + } - meta(collection, metadata, function () { + meta(collection, metadata, function() { - var stream = collection.find(query).snapshot(true).stream(); + var stream = collection.find(query).snapshot(true).stream(); - stream.once('end', function () { + stream.once('end', function() { - return last === ++index ? next(null) : null; - }).on('data', function (doc) { + return last === ++index ? next(null) : null; + }).on('data', function(doc) { - parser(doc, collection.collectionName); - }); + parser(doc, collection.collectionName); }); }); }); }); - }; + }); } /** @@ -229,67 +227,65 @@ function allCollections(documentStore) { * @param {Function} parser - data parser * @param {Function} next - callback */ -function allCollectionsScan(documentStore) { - return function (db, name, numCursors, metadata, parser, next) { +function allCollectionsScan(db, name, numCursors, metadata, parser, next) { - return db.collections(function (err, collections) { + return db.collections(function(err, collections) { - if (err) { - return next(err); - } + if (err) { + return next(err); + } - var last = ~~collections.length, index = 0; - if (last === 0) { // empty set - return next(null); - } + var last = ~~collections.length, index = 0; + if (last === 0) { // empty set + return next(null); + } - collections.forEach(function (collection) { + collections.forEach(function(collection) { - if (systemRegex.test(collection.collectionName) === true) { - return last === ++index ? next(null) : null; - } + if (systemRegex.test(collection.collectionName) === true) { + return last === ++index ? next(null) : null; + } - logger('select collection scan ' + collection.collectionName); - documentStore.addCollection(collection.collectionName, function (err) { + logger('select collection scan ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function(err) { - if (err) { - return last === ++index ? next(err) : error(err); - } + if (err) { + return last === ++index ? next(err) : error(err); + } - meta(collection, metadata, function () { + meta(collection, metadata, function() { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function (err, cursors) { + collection.parallelCollectionScan({ + numCursors: numCursors + }, function(err, cursors) { - if (err) { - return last === ++index ? next(err) : error(err); - } + if (err) { + return last === ++index ? next(err) : error(err); + } - var ii, cursorsDone; - ii = cursorsDone = ~~cursors.length; - if (ii === 0) { // empty set - return last === ++index ? next(null) : null; - } + var ii, cursorsDone; + ii = cursorsDone = ~~cursors.length; + if (ii === 0) { // empty set + return last === ++index ? next(null) : null; + } - for (var i = 0; i < ii; ++i) { - cursors[i].once('end', function () { + for (var i = 0; i < ii; ++i) { + cursors[i].once('end', function() { - // No more cursors let's ensure we got all results - if (--cursorsDone === 0) { - return last === ++index ? next(null) : null; - } - }).on('data', function (doc) { + // No more cursors let's ensure we got all results + if (--cursorsDone === 0) { + return last === ++index ? next(null) : null; + } + }).on('data', function(doc) { - parser(doc, collection.collectionName); - }); - } - }); + parser(doc, collection.collectionName); + }); + } }); }); }); }); - }; + }); } /** @@ -304,47 +300,45 @@ function allCollectionsScan(documentStore) { * @param {Function} next - callback * @param {Array} collections - selected collections */ -function someCollections(documentStore) { - return function (db, name, query, metadata, parser, next, collections) { +function someCollections(db, name, query, metadata, parser, next, collections) { - var last = ~~collections.length, index = 0; - if (last === 0) { - return next(null); - } + var last = ~~collections.length, index = 0; + if (last === 0) { + return next(null); + } - collections.forEach(function (collection) { + collections.forEach(function(collection) { - db.collection(collection, { - strict: true - }, function (err, collection) { + db.collection(collection, { + strict: true + }, function(err, collection) { - if (err) { // returns an error if the collection does not exist - return last === ++index ? next(err) : error(err); - } + if (err) { // returns an error if the collection does not exist + return last === ++index ? next(err) : error(err); + } - logger('select collection ' + collection.collectionName); - documentStore.addCollection(collection.collectionName, function (err) { + logger('select collection ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function(err) { - if (err) { - return last === ++index ? next(err) : error(err); - } + if (err) { + return last === ++index ? next(err) : error(err); + } - meta(collection, metadata, function () { + meta(collection, metadata, function() { - var stream = collection.find(query).snapshot(true).stream(); + var stream = collection.find(query).snapshot(true).stream(); - stream.once('end', function () { + stream.once('end', function() { - return last === ++index ? next(null) : null; - }).on('data', function (doc) { + return last === ++index ? next(null) : null; + }).on('data', function(doc) { - parser(doc, collection.collectionName); - }); + parser(doc, collection.collectionName); }); }); }); }); - }; + }); } /** @@ -359,66 +353,64 @@ function someCollections(documentStore) { * @param {Function} next - callback * @param {Array} collections - selected collections */ -function someCollectionsScan(documentStore) { - return function (db, name, numCursors, metadata, parser, next, - collections) { +function someCollectionsScan(db, name, numCursors, metadata, parser, next, + collections) { - var last = ~~collections.length, index = 0; - if (last === 0) { // empty set - return next(null); - } + var last = ~~collections.length, index = 0; + if (last === 0) { // empty set + return next(null); + } - collections.forEach(function (collection) { + collections.forEach(function(collection) { - db.collection(collection, { - strict: true - }, function (err, collection) { + db.collection(collection, { + strict: true + }, function(err, collection) { - if (err) { // returns an error if the collection does not exist - return last === ++index ? next(err) : error(err); - } + if (err) { // returns an error if the collection does not exist + return last === ++index ? next(err) : error(err); + } - logger('select collection scan ' + collection.collectionName); - documentStore.addCollection(collection.collectionName, function (err) { + logger('select collection scan ' + collection.collectionName); + documentStore.addCollection(collection.collectionName, function(err) { - if (err) { - return last === ++index ? next(err) : error(err); - } + if (err) { + return last === ++index ? next(err) : error(err); + } - meta(collection, metadata, function () { + meta(collection, metadata, function() { - collection.parallelCollectionScan({ - numCursors: numCursors - }, function (err, cursors) { + collection.parallelCollectionScan({ + numCursors: numCursors + }, function(err, cursors) { - if (err) { - return last === ++index ? next(err) : error(err); - } + if (err) { + return last === ++index ? next(err) : error(err); + } - var ii, cursorsDone; - ii = cursorsDone = ~~cursors.length; - if (ii === 0) { // empty set - return last === ++index ? next(null) : null; - } + var ii, cursorsDone; + ii = cursorsDone = ~~cursors.length; + if (ii === 0) { // empty set + return last === ++index ? next(null) : null; + } - for (var i = 0; i < ii; ++i) { - cursors[i].once('end', function () { + for (var i = 0; i < ii; ++i) { + cursors[i].once('end', function() { - // No more cursors let's ensure we got all results - if (--cursorsDone === 0) { - return last === ++index ? next(null) : null; - } - }).on('data', function (doc) { + // No more cursors let's ensure we got all results + if (--cursorsDone === 0) { + return last === ++index ? next(null) : null; + } + }).on('data', function(doc) { - parser(doc, collection.collectionName); - }); - } - }); + parser(doc, collection.collectionName); + }); + } }); }); }); }); - }; + }); } /** @@ -437,31 +429,31 @@ function wrapper(my) { case 'bson': BSON = require('bson'); BSON = new BSON(); - parser = toBsonAsync(my.documentStore); + parser = toBsonAsync; break; case 'json': // JSON error on ObjectId, Date and Long - parser = toJsonAsync(my.documentStore); + parser = toJsonAsync; break; default: throw new Error('missing parser option'); } } - var discriminator = allCollections(my.documentStore); + var discriminator = allCollections; if (my.collections !== null) { - discriminator = someCollections(my.documentStore); + discriminator = someCollections; if (my.numCursors) { - discriminator = someCollectionsScan(my.documentStore); + discriminator = someCollectionsScan; my.query = my.numCursors; // override } } else if (my.numCursors) { - discriminator = allCollectionsScan(my.documentStore); + discriminator = allCollectionsScan; my.query = my.numCursors; // override } if (my.logger === null) { - logger = function () { + logger = function() { return; }; @@ -479,7 +471,7 @@ function wrapper(my) { logger('backup start'); var log = require('mongodb').Logger; log.setLevel('info'); - log.setCurrentLogger(function (msg) { + log.setCurrentLogger(function(msg) { return logger(msg); }); @@ -487,9 +479,9 @@ function wrapper(my) { var metadata = ''; if (my.metadata === true) { - meta = writeMetadata(my.documentStore); + meta = writeMetadata; } else { - meta = function (a, b, c) { + meta = function(a, b, c) { return c(); }; @@ -512,20 +504,20 @@ function wrapper(my) { } } - require('mongodb').MongoClient.connect(my.uri, my.options, function (err, db) { + require('mongodb').MongoClient.connect(my.uri, my.options, function(err, db) { logger('db open'); if (err) { return callback(err); } - my.documentStore.addDatabase(db.databaseName, function (err, name) { + documentStore.addDatabase(db.databaseName, function(err, name) { function go() { // waiting for `db.fsyncLock()` on node driver return discriminator(db, name, my.query, metadata, parser, - function (err) { + function(err) { logger('db close'); db.close(); @@ -533,7 +525,7 @@ function wrapper(my) { return callback(err); } - my.documentStore.close(); + documentStore.close(); callback(null); }, my.collections); } @@ -545,7 +537,7 @@ function wrapper(my) { if (my.metadata === false) { go(); } else { - my.documentStore.addCollection('.metadata', go); + documentStore.addCollection('.metadata', go); } }); }); @@ -591,14 +583,10 @@ function backup(options) { } if (my.stream) { my.tar = true; // override - my.documentStore = streamingDocumentStore(my.root, my.stream); + documentStore = streamingDocumentStore(my.root, my.stream); } else { - my.documentStore = fileSystemDocumentStore(my.root); + documentStore = fileSystemDocumentStore(my.root); } return wrapper(my); } module.exports = backup; - - - -