diff --git a/README.md b/README.md index accf3c276..baff05ca2 100644 --- a/README.md +++ b/README.md @@ -228,6 +228,22 @@ db.getRange({ start, end, offset: 10, limit: 10 }) // skip first 10 and get next If you want to get a true array from the range results, the `asArray` property will return the results as an array. +### Catching Errors in Range Iteration +With an array, `map` and `filter` callbacks are immediately executed, but with range iterators, they are executed during iteration, so if an error occurs during iteration, the error will be thrown when the iteration is attempted. It is also critical that when an iteration is finished, the cursor is closed, so by default, if an error occurs during iteration, the cursor will immediately be closed. However, if you want to catch errors that occur in `map` (and `flatMap`) callbacks during iteration, you can use the `mapCatch` method to catch errors that occur during iteration, and allow iteration to continue (without closing the cursor). For example: + +```js +let mapped = db.getRange({ start, end }).map(({ key, value }) => { + return thisMightThrowError(value); +}).mapCatch((error) => { + // rather than letting the error terminate the iteration, we can catch it here and return a value to continue iterating: + return 'error occurred'; +}) +for (let entry of mapped) { +... +} +``` +A `mapCatch` callback can return a value to continue iterating, or throw an error to terminate the iteration. + #### Snapshots By default, a range iterator will use a database snapshot, using a single read transaction that remains open and gives a consistent view of the database at the time it was started, for the duration of iterating through the range. However, if the iteration will take place over a long period of time, keeping a read transaction open for a long time can interfere with LMDB's free space collection and reuse and increase the database size. If you will be using a long duration iterator, you can specify `snapshot: false` flag in the range options to indicate that it snapshotting is not necessary, and it can reset and renew read transactions while iterating, to allow LMDB to collect any space that was freed during iteration. diff --git a/package.json b/package.json index ccbcd54d1..940fcaac9 100644 --- a/package.json +++ b/package.json @@ -111,4 +111,4 @@ "singleQuote": true }, "optionalDependencies": {} -} +} \ No newline at end of file diff --git a/test/index.test.js b/test/index.test.js index 1a79c661f..96ce99663 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -24,7 +24,7 @@ import { keyValueToBuffer, levelup, open, - version, + version, TIMESTAMP_PLACEHOLDER, DIRECT_WRITE_PLACEHOLDER, } from '../node-index.js'; @@ -74,15 +74,15 @@ describe('lmdb-js', function () { }), ); if (version.patch >= 90) { - describe( - 'Basic use with encryption', - basicTests({ - compression: false, - encryptionKey: 'Use this key to encrypt the data', - }), - ); - //describe('Check encrypted data', basicTests({ compression: false, encryptionKey: 'Use this key to encrypt the data', checkLast: true })); - } + describe( + 'Basic use with encryption', + basicTests({ + compression: false, + encryptionKey: 'Use this key to encrypt the data', + }), + ); + //describe('Check encrypted data', basicTests({ compression: false, encryptionKey: 'Use this key to encrypt the data', checkLast: true })); + } describe('Basic use with JSON', basicTests({ encoding: 'json' })); describe( 'Basic use with ordered-binary', @@ -354,22 +354,22 @@ describe('lmdb-js', function () { should.equal(db.get('key1'), 'done!'); } }); - if (version.patch >= 90) - it('repeated ifNoExists', async function () { - let keyBase = - 'c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f'; - let result; - for (let i = 0; i < 500; i++) { - let key = keyBase + (i % 100); - result = db.ifNoExists(keyBase + i, () => { - db.put(keyBase + i, 'changed', 7); - }); - if (i % 100 == 0) { - await result; + if (version.patch >= 90) + it('repeated ifNoExists', async function () { + let keyBase = + 'c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f'; + let result; + for (let i = 0; i < 500; i++) { + let key = keyBase + (i % 100); + result = db.ifNoExists(keyBase + i, () => { + db.put(keyBase + i, 'changed', 7); + }); + if (i % 100 == 0) { + await result; + } } - } - await result; - }); + await result; + }); it('string with compression and versions', async function () { let str = expand('Hello world!'); await db.put('key1', str, 53252); @@ -1942,6 +1942,42 @@ describe('lmdb-js', function () { }); }); describe('RangeIterable', function () { + it('map iterate', async function () { + let a = new RangeIterable([1, 2, 3]).map((v) => v * 2); + let finished = 0; + a.onDone = () => { + finished++; + }; + let all = []; + for (let v of a) { + all.push(v); + } + all.should.deep.equal([2, 4, 6]); + expect(finished).to.be.equal(1); + all = []; + finished = 0; + let flatMapped = a.flatMap((v) => [v, v + 1]); + for (let v of flatMapped) { + all.push(v); + } + all.should.deep.equal([2, 3, 4, 5, 6, 7]); + expect(finished).to.be.equal(1); + let flatMappedWithCaughtError = a + .flatMap((v) => { + if (v === 4) throw new Error('test'); + return [v, v + 1]; + }) + .mapCatch((error) => { + return { error: error.toString() }; + }); + all = []; + finished = 0; + for (let v of flatMappedWithCaughtError) { + all.push(v); + } + all.should.deep.equal([2, 3, { error: 'Error: test' }, 6, 7]); + expect(finished).to.be.equal(1); + }); it('concat and iterate', async function () { let a = new RangeIterable([1, 2, 3]); let b = new RangeIterable([4, 5, 6]); @@ -1950,6 +1986,37 @@ describe('lmdb-js', function () { all.push(v); } all.should.deep.equal([1, 2, 3, 4, 5, 6]); + let aMapped = a.map((v) => v * 2); + all = []; + for (let v of aMapped.concat(b)) { + all.push(v); + } + all.should.deep.equal([2, 4, 6, 4, 5, 6]); + let aMappedWithError = a.map((v) => { + if (v === 2) throw new Error('test'); + return v * 2; + }); + let finished = 0; + aMappedWithError.onDone = () => { + finished++; + }; + expect(() => { + for (let v of aMappedWithError.concat(b)) { + all.push(v); + } + }).to.throw(); + expect(finished).to.be.equal(1); + let aMappedWithCaught = aMappedWithError.mapCatch((error) => { + return { error: error.toString() }; + }); + all = []; + finished = 0; + for (let v of aMappedWithCaught.concat(b)) { + all.push(v); + if (v.error) expect(finished).to.be.equal(0); // should not be finished until after the error + } + all.should.deep.equal([2, { error: 'Error: test' }, 6, 4, 5, 6]); + expect(finished).to.be.equal(1); }); }); describe('mixed keys', function () { @@ -1982,44 +2049,44 @@ describe('lmdb-js', function () { await lastPromise; }); }); - if (version.patch >= 90) { - describe('Threads', function () { - this.timeout(1000000); - it('will run a group of threads with write transactions', function (done) { - var child = spawn('node', [ - fileURLToPath(new URL('./threads.cjs', import.meta.url)), - ]); - child.stdout.on('data', function (data) { - console.log(data.toString()); - }); - child.stderr.on('data', function (data) { - console.error(data.toString()); - }); - child.on('close', function (code) { - code.should.equal(0); - done(); + if (version.patch >= 90) { + describe('Threads', function () { + this.timeout(1000000); + it('will run a group of threads with write transactions', function (done) { + var child = spawn('node', [ + fileURLToPath(new URL('./threads.cjs', import.meta.url)), + ]); + child.stdout.on('data', function (data) { + console.log(data.toString()); + }); + child.stderr.on('data', function (data) { + console.error(data.toString()); + }); + child.on('close', function (code) { + code.should.equal(0); + done(); + }); }); }); - }); - describe('Read-only Threads', function () { - this.timeout(1000000); - it('will run a group of threads with read-only transactions', function (done) { - var child = spawn('node', [ - fileURLToPath(new URL('./readonly-threads.cjs', import.meta.url)), - ]); - child.stdout.on('data', function (data) { - console.log(data.toString()); - }); - child.stderr.on('data', function (data) { - console.error(data.toString()); - }); - child.on('close', function (code) { - code.should.equal(0); - done(); + describe('Read-only Threads', function () { + this.timeout(1000000); + it('will run a group of threads with read-only transactions', function (done) { + var child = spawn('node', [ + fileURLToPath(new URL('./readonly-threads.cjs', import.meta.url)), + ]); + child.stdout.on('data', function (data) { + console.log(data.toString()); + }); + child.stderr.on('data', function (data) { + console.error(data.toString()); + }); + child.on('close', function (code) { + code.should.equal(0); + done(); + }); }); }); - }); - } + } }); function delay(ms) { diff --git a/util/RangeIterable.js b/util/RangeIterable.js index 90211e6c8..1d52ffb95 100644 --- a/util/RangeIterable.js +++ b/util/RangeIterable.js @@ -11,6 +11,7 @@ const RETURN_DONE = { if (!Symbol.asyncIterator) { Symbol.asyncIterator = Symbol.for('Symbol.asyncIterator'); } +const NO_OPTIONS = {}; export class RangeIterable { constructor(sourceArray) { @@ -21,33 +22,36 @@ export class RangeIterable { map(func) { let source = this; let iterable = new RangeIterable(); - iterable.iterate = (async) => { - let iterator = source[async ? Symbol.asyncIterator : Symbol.iterator](); + iterable.iterate = (options = NO_OPTIONS) => { + const { async } = options; + let iterator = + source[async ? Symbol.asyncIterator : Symbol.iterator](options); if (!async) source.isSync = true; - let i = 0; + let i = -1; return { next(resolvedResult) { - try { - let result; - do { - let iteratorResult; + let result; + do { + let iteratorResult; + try { if (resolvedResult) { iteratorResult = resolvedResult; resolvedResult = null; // don't go in this branch on next iteration } else { + i++; iteratorResult = iterator.next(); if (iteratorResult.then) { if (!async) { this.throw( new Error( - 'Can not synchronously iterate with asynchronous values', + 'Can not synchronously iterate with promises as iterator results', ), ); } return iteratorResult.then( (iteratorResult) => this.next(iteratorResult), (error) => { - this.throw(error); + return this.throw(error); }, ); } @@ -57,31 +61,47 @@ export class RangeIterable { if (iterable.onDone) iterable.onDone(); return iteratorResult; } - result = func.call(source, iteratorResult.value, i++); - if (result && result.then && async) { - // if async, wait for promise to resolve before returning iterator result - return result.then( - (result) => - result === SKIP - ? this.next() - : { - value: result, - }, - (error) => { - this.throw(error); - }, - ); + try { + result = func.call(source, iteratorResult.value, i); + if (result && result.then && async) { + // if async, wait for promise to resolve before returning iterator result + return result.then( + (result) => + result === SKIP + ? this.next() + : { + value: result, + }, + (error) => { + if (options.continueOnRecoverableError) + error.continueIteration = true; + return this.throw(error); + }, + ); + } + } catch (error) { + // if the error came from the user function, we can potentially mark it for continuing iteration + if (options.continueOnRecoverableError) + error.continueIteration = true; + throw error; // throw to next catch to handle } - } while (result === SKIP); - if (result === DONE) { - return this.return(); + } catch (error) { + if (iterable.mapError) { + // if we have mapError, we can use it to further handle errors + try { + result = iterable.mapError(error, i); + } catch (mapError) { + return this.throw(mapError); + } + } else return this.throw(error); } - return { - value: result, - }; - } catch (error) { - this.throw(error); + } while (result === SKIP); + if (result === DONE) { + return this.return(); } + return { + value: result, + }; }, return(value) { if (!this.done) { @@ -93,6 +113,21 @@ export class RangeIterable { return RETURN_DONE; }, throw(error) { + if (error.continueIteration) { + // if it's a recoverable error, we can return or throw without closing the iterator + if (iterable.returnRecoverableErrors) + try { + return { + value: iterable.returnRecoverableErrors(error), + }; + } catch (error) { + // if this throws, we need to go back to closing the iterator + this.return(); + throw error; + } + if (options.continueOnRecoverableError) throw error; // throw without closing iterator + } + // else we are done with the iterator (and can throw) this.return(); throw error; }, @@ -100,20 +135,30 @@ export class RangeIterable { }; return iterable; } - [Symbol.asyncIterator]() { - return (this.iterator = this.iterate(true)); + [Symbol.asyncIterator](options) { + if (options) options.async = true; + else options = { async: true }; + return (this.iterator = this.iterate(options)); } - [Symbol.iterator]() { - return (this.iterator = this.iterate()); + [Symbol.iterator](options) { + return (this.iterator = this.iterate(options)); } filter(func) { - return this.map((element) => { + let iterable = this.map((element) => { let result = func(element); // handle promise if (result?.then) return result.then((result) => (result ? element : SKIP)); else return result ? element : SKIP; }); + let iterate = iterable.iterate; + iterable.iterate = (options) => { + // explicitly prevent continue on recoverable error with filter + if (options.continueOnRecoverableError) + options.continueOnRecoverableError = false; + return iterate(options); + }; + return iterable; } forEach(callback) { @@ -125,21 +170,23 @@ export class RangeIterable { } concat(secondIterable) { let concatIterable = new RangeIterable(); - concatIterable.iterate = (async) => { - let iterator = (this.iterator = this.iterate(async)); + concatIterable.iterate = (options = NO_OPTIONS) => { + let iterator = (this.iterator = this.iterate(options)); let isFirst = true; function iteratorDone(result) { if (isFirst) { try { isFirst = false; iterator = - secondIterable[async ? Symbol.asyncIterator : Symbol.iterator](); + secondIterable[ + options.async ? Symbol.asyncIterator : Symbol.iterator + ](); result = iterator.next(); if (concatIterable.onDone) { if (result.then) { - if (!async) + if (!options.async) throw new Error( - 'Can not synchronously iterate with asynchronous values', + 'Can not synchronously iterate with promises as iterator results', ); result.then( (result) => { @@ -167,7 +214,7 @@ export class RangeIterable { if (result.then) { if (!async) throw new Error( - 'Can synchronously iterate with asynchronous values', + 'Can not synchronously iterate with promises as iterator results', ); return result.then((result) => { if (result.done) return iteratorDone(result); @@ -177,8 +224,7 @@ export class RangeIterable { if (result.done) return iteratorDone(result); return result; } catch (error) { - this.return(); - throw error; + this.throw(error); } }, return(value) { @@ -191,6 +237,7 @@ export class RangeIterable { return RETURN_DONE; }, throw(error) { + if (options.continueOnRecoverableError) throw error; this.return(); throw error; }, @@ -201,8 +248,8 @@ export class RangeIterable { flatMap(callback) { let mappedIterable = new RangeIterable(); - mappedIterable.iterate = (async) => { - let iterator = (this.iterator = this.iterate(async)); + mappedIterable.iterate = (options = NO_OPTIONS) => { + let iterator = (this.iterator = this.iterate(options)); let isFirst = true; let currentSubIterator; return { @@ -216,9 +263,9 @@ export class RangeIterable { resolvedResult = undefined; } else result = currentSubIterator.next(); if (result.then) { - if (!async) + if (!options.async) throw new Error( - 'Can not synchronously iterate with asynchronous values', + 'Can not synchronously iterate with promises as iterator results', ); return result.then((result) => this.next(result)); } @@ -228,9 +275,9 @@ export class RangeIterable { } let result = resolvedResult ?? iterator.next(); if (result.then) { - if (!async) + if (!options.async) throw new Error( - 'Can not synchronously iterate with asynchronous values', + 'Can not synchronously iterate with promises as iterator results', ); currentSubIterator = undefined; return result.then((result) => this.next(result)); @@ -239,32 +286,47 @@ export class RangeIterable { if (mappedIterable.onDone) mappedIterable.onDone(); return result; } - let value = callback(result.value); - if (value?.then) { - if (!async) - throw new Error( - 'Can not synchronously iterate with asynchronous values', + try { + let value = callback(result.value); + if (value?.then) { + if (!options.async) + throw new Error( + 'Can not synchronously iterate with promises as iterator results', + ); + return value.then( + (value) => { + if ( + Array.isArray(value) || + value instanceof RangeIterable + ) { + currentSubIterator = value[Symbol.iterator](); + return this.next(); + } else { + currentSubIterator = null; + return { value }; + } + }, + (error) => { + if (options.continueOnRecoverableError) + error.continueIteration = true; + this.throw(error); + }, ); - return value.then((value) => { - if (Array.isArray(value) || value instanceof RangeIterable) { - currentSubIterator = value[Symbol.iterator](); - return this.next(); - } else { - currentSubIterator = null; - return { value }; - } - }); - } - if (Array.isArray(value) || value instanceof RangeIterable) - currentSubIterator = value[Symbol.iterator](); - else { - currentSubIterator = null; - return { value }; + } + if (Array.isArray(value) || value instanceof RangeIterable) + currentSubIterator = value[Symbol.iterator](); + else { + currentSubIterator = null; + return { value }; + } + } catch (error) { + if (options.continueOnRecoverableError) + error.continueIteration = true; + throw error; } } while (true); } catch (error) { - this.return(); - throw error; + this.throw(error); } }, return() { @@ -272,10 +334,12 @@ export class RangeIterable { if (currentSubIterator) currentSubIterator.return(); return iterator.return(); }, - throw() { + throw(error) { + if (options.continueOnRecoverableError) throw error; if (mappedIterable.onDone) mappedIterable.onDone(); - if (currentSubIterator) currentSubIterator.throw(); - return iterator.throw(); + if (currentSubIterator) currentSubIterator.return(); + this.return(); + throw error; }, }; }; @@ -283,7 +347,7 @@ export class RangeIterable { } slice(start, end) { - return this.map((element, i) => { + let iterable = this.map((element, i) => { if (i < start) return SKIP; if (i >= end) { DONE.value = element; @@ -291,6 +355,27 @@ export class RangeIterable { } return element; }); + iterable.mapError = (error, i) => { + if (i < start) return SKIP; + if (i >= end) { + return DONE; + } + throw error; + }; + return iterable; + } + mapCatch(catch_callback) { + let iterable = this.map((element) => { + return element; + }); + let iterate = iterable.iterate; + iterable.iterate = (options = NO_OPTIONS) => { + // we need to ensure the whole stack + // of iterables is set up to handle recoverable errors and continue iteration + return iterate({ ...options, continueOnRecoverableError: true }); + }; + iterable.returnRecoverableErrors = catch_callback; + return iterable; } next() { if (!this.iterator) this.iterator = this.iterate();