Merge pull request #303 from kriszyp/add-mapCatch
Add support for catching errors during iteration
kriszyp authored Aug 29, 2024
2 parents f74c02d + 5d9035b commit 3fe3635
Showing 4 changed files with 304 additions and 136 deletions.
16 changes: 16 additions & 0 deletions README.md
@@ -228,6 +228,22 @@ db.getRange({ start, end, offset: 10, limit: 10 }) // skip first 10 and get next

If you want to get a true array from the range results, the `asArray` property will return the results as an array.

### Catching Errors in Range Iteration
With an array, `map` and `filter` callbacks are executed immediately, but with range iterators they are executed lazily during iteration, so any error thrown in a callback surfaces at the point of iteration. It is also critical that the cursor is closed when iteration finishes, so by default an error during iteration immediately closes the cursor. If you instead want to catch errors thrown by `map` (and `flatMap`) callbacks and allow iteration to continue (without closing the cursor), you can use the `mapCatch` method. For example:

```js
let mapped = db.getRange({ start, end }).map(({ key, value }) => {
	return thisMightThrowError(value);
}).mapCatch((error) => {
	// rather than letting the error terminate the iteration, we can catch it
	// here and return a value to continue iterating:
	return 'error occurred';
});
for (let entry of mapped) {
	// handle each entry; entries whose callback threw will be 'error occurred'
}
```
A `mapCatch` callback can return a value to continue iterating, or throw an error to terminate the iteration.
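
For instance, a minimal sketch of both behaviors (the error budget of three and the `null` substitute are illustrative choices, not part of the API): tolerate a few callback errors, then re-throw to terminate the iteration and close the cursor:

```js
let errorCount = 0;
let tolerant = db.getRange({ start, end }).map(({ key, value }) => {
	return thisMightThrowError(value);
}).mapCatch((error) => {
	if (++errorCount > 3) throw error; // re-throwing terminates iteration and closes the cursor
	return null; // substitute value; iteration continues
});
for (let entry of tolerant) {
	// entries whose callback threw will appear here as null
}
```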

#### Snapshots
By default, a range iterator uses a database snapshot: a single read transaction remains open for the duration of the iteration and gives a consistent view of the database at the time it was started. However, if the iteration will run for a long time, keeping a read transaction open that long can interfere with LMDB's free-space collection and reuse, and increase the database size. For a long-running iterator, you can specify the `snapshot: false` flag in the range options to indicate that snapshotting is not necessary; the iterator can then reset and renew read transactions while iterating, allowing LMDB to collect any space freed during iteration.
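
A minimal sketch of a long-running scan with snapshotting disabled (`processSlowly` is a hypothetical placeholder for whatever slow per-entry work you are doing):

```js
// With snapshot: false, the iterator may reset/renew its read transaction while
// working through the range, so LMDB can reclaim pages freed in the meantime.
for (let { key, value } of db.getRange({ start, end, snapshot: false })) {
	processSlowly(key, value); // hypothetical long-running work per entry
}
```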

2 changes: 1 addition & 1 deletion package.json
@@ -111,4 +111,4 @@
"singleQuote": true
},
"optionalDependencies": {}
}
}
185 changes: 126 additions & 59 deletions test/index.test.js
@@ -24,7 +24,7 @@ import {
keyValueToBuffer,
levelup,
open,
version,
version,
TIMESTAMP_PLACEHOLDER,
DIRECT_WRITE_PLACEHOLDER,
} from '../node-index.js';
@@ -74,15 +74,15 @@ describe('lmdb-js', function () {
}),
);
if (version.patch >= 90) {
describe(
'Basic use with encryption',
basicTests({
compression: false,
encryptionKey: 'Use this key to encrypt the data',
}),
);
//describe('Check encrypted data', basicTests({ compression: false, encryptionKey: 'Use this key to encrypt the data', checkLast: true }));
}
describe(
'Basic use with encryption',
basicTests({
compression: false,
encryptionKey: 'Use this key to encrypt the data',
}),
);
//describe('Check encrypted data', basicTests({ compression: false, encryptionKey: 'Use this key to encrypt the data', checkLast: true }));
}
describe('Basic use with JSON', basicTests({ encoding: 'json' }));
describe(
'Basic use with ordered-binary',
@@ -354,22 +354,22 @@ describe('lmdb-js', function () {
should.equal(db.get('key1'), 'done!');
}
});
if (version.patch >= 90)
it('repeated ifNoExists', async function () {
let keyBase =
'c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f';
let result;
for (let i = 0; i < 500; i++) {
let key = keyBase + (i % 100);
result = db.ifNoExists(keyBase + i, () => {
db.put(keyBase + i, 'changed', 7);
});
if (i % 100 == 0) {
await result;
if (version.patch >= 90)
it('repeated ifNoExists', async function () {
let keyBase =
'c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f-c333f4e0-f692-4bca-ad45-f805923f974f';
let result;
for (let i = 0; i < 500; i++) {
let key = keyBase + (i % 100);
result = db.ifNoExists(keyBase + i, () => {
db.put(keyBase + i, 'changed', 7);
});
if (i % 100 == 0) {
await result;
}
}
}
await result;
});
await result;
});
it('string with compression and versions', async function () {
let str = expand('Hello world!');
await db.put('key1', str, 53252);
@@ -1942,6 +1942,42 @@ describe('lmdb-js', function () {
});
});
describe('RangeIterable', function () {
it('map iterate', async function () {
let a = new RangeIterable([1, 2, 3]).map((v) => v * 2);
let finished = 0;
a.onDone = () => {
finished++;
};
let all = [];
for (let v of a) {
all.push(v);
}
all.should.deep.equal([2, 4, 6]);
expect(finished).to.be.equal(1);
all = [];
finished = 0;
let flatMapped = a.flatMap((v) => [v, v + 1]);
for (let v of flatMapped) {
all.push(v);
}
all.should.deep.equal([2, 3, 4, 5, 6, 7]);
expect(finished).to.be.equal(1);
let flatMappedWithCaughtError = a
.flatMap((v) => {
if (v === 4) throw new Error('test');
return [v, v + 1];
})
.mapCatch((error) => {
return { error: error.toString() };
});
all = [];
finished = 0;
for (let v of flatMappedWithCaughtError) {
all.push(v);
}
all.should.deep.equal([2, 3, { error: 'Error: test' }, 6, 7]);
expect(finished).to.be.equal(1);
});
it('concat and iterate', async function () {
let a = new RangeIterable([1, 2, 3]);
let b = new RangeIterable([4, 5, 6]);
@@ -1950,6 +1986,37 @@
all.push(v);
}
all.should.deep.equal([1, 2, 3, 4, 5, 6]);
let aMapped = a.map((v) => v * 2);
all = [];
for (let v of aMapped.concat(b)) {
all.push(v);
}
all.should.deep.equal([2, 4, 6, 4, 5, 6]);
let aMappedWithError = a.map((v) => {
if (v === 2) throw new Error('test');
return v * 2;
});
let finished = 0;
aMappedWithError.onDone = () => {
finished++;
};
expect(() => {
for (let v of aMappedWithError.concat(b)) {
all.push(v);
}
}).to.throw();
expect(finished).to.be.equal(1);
let aMappedWithCaught = aMappedWithError.mapCatch((error) => {
return { error: error.toString() };
});
all = [];
finished = 0;
for (let v of aMappedWithCaught.concat(b)) {
all.push(v);
if (v.error) expect(finished).to.be.equal(0); // should not be finished until after the error
}
all.should.deep.equal([2, { error: 'Error: test' }, 6, 4, 5, 6]);
expect(finished).to.be.equal(1);
});
});
describe('mixed keys', function () {
@@ -1982,44 +2049,44 @@ describe('lmdb-js', function () {
await lastPromise;
});
});
if (version.patch >= 90) {
describe('Threads', function () {
this.timeout(1000000);
it('will run a group of threads with write transactions', function (done) {
var child = spawn('node', [
fileURLToPath(new URL('./threads.cjs', import.meta.url)),
]);
child.stdout.on('data', function (data) {
console.log(data.toString());
});
child.stderr.on('data', function (data) {
console.error(data.toString());
});
child.on('close', function (code) {
code.should.equal(0);
done();
if (version.patch >= 90) {
describe('Threads', function () {
this.timeout(1000000);
it('will run a group of threads with write transactions', function (done) {
var child = spawn('node', [
fileURLToPath(new URL('./threads.cjs', import.meta.url)),
]);
child.stdout.on('data', function (data) {
console.log(data.toString());
});
child.stderr.on('data', function (data) {
console.error(data.toString());
});
child.on('close', function (code) {
code.should.equal(0);
done();
});
});
});
});
describe('Read-only Threads', function () {
this.timeout(1000000);
it('will run a group of threads with read-only transactions', function (done) {
var child = spawn('node', [
fileURLToPath(new URL('./readonly-threads.cjs', import.meta.url)),
]);
child.stdout.on('data', function (data) {
console.log(data.toString());
});
child.stderr.on('data', function (data) {
console.error(data.toString());
});
child.on('close', function (code) {
code.should.equal(0);
done();
describe('Read-only Threads', function () {
this.timeout(1000000);
it('will run a group of threads with read-only transactions', function (done) {
var child = spawn('node', [
fileURLToPath(new URL('./readonly-threads.cjs', import.meta.url)),
]);
child.stdout.on('data', function (data) {
console.log(data.toString());
});
child.stderr.on('data', function (data) {
console.error(data.toString());
});
child.on('close', function (code) {
code.should.equal(0);
done();
});
});
});
});
}
}
});

function delay(ms) {