Skip to content

Commit

Permalink
Async kv map (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-oloughlin authored Jan 26, 2025
1 parent 203a123 commit bfd9f2e
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 127 deletions.
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "3.0.2",
"version": "3.1.0",
"exports": {
".": "./mod.ts",
"./zod": "./src/ext/zod/mod.ts",
Expand Down
17 changes: 9 additions & 8 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ export class Collection<

// Create hsitory entries iterator
const listOptions = createListOptions(options);
const iter = this.kv.list(selector, listOptions);
const iter = await this.kv.list(selector, listOptions);

// Collect history entries
let count = 0;
Expand Down Expand Up @@ -1258,7 +1258,7 @@ export class Collection<
// Perform quick delete if all documents are to be deleted
if (selectsAll(options)) {
// Create list iterator and empty keys list, init atomic operation
const iter = this.kv.list({ prefix: this._keys.base }, options);
const iter = await this.kv.list({ prefix: this._keys.base }, options);

const keys: DenoKvStrictKey[] = [];
const atomic = new AtomicWrapper(this.kv);
Expand All @@ -1270,7 +1270,8 @@ export class Collection<

// Set history entries if keeps history
if (this._keepsHistory) {
for await (const { key } of this.kv.list({ prefix: this._keys.id })) {
const historyIter = await this.kv.list({ prefix: this._keys.id });
for await (const { key } of historyIter) {
const id = getDocumentId(key as DenoKvStrictKey);

if (!id) {
Expand Down Expand Up @@ -1836,7 +1837,7 @@ export class Collection<

// Perform efficient count if counting all document entries
if (selectsAll(options)) {
const iter = this.kv.list({ prefix: this._keys.id }, options);
const iter = await this.kv.list({ prefix: this._keys.id }, options);
for await (const _ of iter) {
result++;
}
Expand Down Expand Up @@ -2073,8 +2074,8 @@ export class Collection<
const atomic = new AtomicWrapper(this.kv);
const historyKeyPrefix = extendKey(this._keys.history, id);
const historySegmentKeyPrefix = extendKey(this._keys.historySegment, id);
const historyIter = this.kv.list({ prefix: historyKeyPrefix });
const historySegmentIter = this.kv.list({
const historyIter = await this.kv.list({ prefix: historyKeyPrefix });
const historySegmentIter = await this.kv.list({
prefix: historySegmentKeyPrefix,
});

Expand Down Expand Up @@ -2464,7 +2465,7 @@ export class Collection<
if (this._encoder) {
const atomic = new AtomicWrapper(this.kv);
const keyPrefix = extendKey(this._keys.segment, id);
const iter = this.kv.list({ prefix: keyPrefix });
const iter = await this.kv.list({ prefix: keyPrefix });

for await (const { key } of iter) {
atomic.delete(key as DenoKvStrictKey);
Expand Down Expand Up @@ -2585,7 +2586,7 @@ export class Collection<
// Create list iterator with given options
const selector = createListSelector(prefixKey, options);
const listOptions = createListOptions(options);
const iter = this.kv.list(selector, listOptions);
const iter = await this.kv.list(selector, listOptions);

// Initiate lists
const docs: Document<TOutput, ParseId<TOptions>>[] = [];
Expand Down
33 changes: 33 additions & 0 deletions src/ext/kv/async_lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
export class AsyncLock {
private queue: PromiseWithResolvers<void>[];

constructor() {
this.queue = [];
}

async run<T>(fn: () => Promise<T>): Promise<T> {
await this.lock();
const result = await fn();
this.release();
return result;
}

async close() {
for (const lock of this.queue) {
lock.resolve();
await lock.promise;
}
}

private async lock() {
const prev = this.queue.at(-1);
const next = Promise.withResolvers<void>();
this.queue.push(next);
await prev?.promise;
}

private release() {
const lock = this.queue.shift();
lock?.resolve();
}
}
83 changes: 46 additions & 37 deletions src/ext/kv/atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import type {
DenoKvSetOptions,
DenoKvStrictKey,
} from "../../types.ts";
import { allFulfilled } from "../../utils.ts";
import type { AsyncLock } from "./async_lock.ts";
import type { MapKv } from "./map_kv.ts";
import { createVersionstamp } from "./utils.ts";

export class MapKvAtomicOperation implements DenoAtomicOperation {
private kv: MapKv;
private checks: (() => boolean)[];
private ops: ((versionstamp: string) => void)[];
private lock: AsyncLock;
private checks: (() => Promise<boolean>)[];
private ops: ((versionstamp: string) => Promise<void>)[];

constructor(kv: MapKv) {
constructor(kv: MapKv, lock: AsyncLock) {
this.kv = kv;
this.lock = lock;
this.checks = [];
this.ops = [];
}
Expand All @@ -26,9 +30,9 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
value: unknown,
options?: DenoKvSetOptions,
): DenoAtomicOperation {
this.ops.push((versionstamp) =>
this.kv._set(key, value, versionstamp, options)
);
this.ops.push(async (versionstamp) => {
await this.kv._set(key, value, versionstamp, options);
});
return this;
}

Expand All @@ -38,10 +42,10 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

min(key: DenoKvStrictKey, n: bigint): DenoAtomicOperation {
this.ops.push((versionstamp) => {
const { value } = this.kv.get(key);
this.ops.push(async (versionstamp) => {
const { value } = await this.kv.get(key);
if (!value) {
this.kv._set(key, { value: n }, versionstamp);
await this.kv._set(key, { value: n }, versionstamp);
return;
}

Expand All @@ -50,7 +54,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
throw new Error("Min operation can only be performed on KvU64 value");
}

this.kv._set(key, {
await this.kv._set(key, {
value: n < val ? n : val,
}, versionstamp);
});
Expand All @@ -59,10 +63,10 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

max(key: DenoKvStrictKey, n: bigint): DenoAtomicOperation {
this.ops.push((versionstamp) => {
const { value } = this.kv.get(key);
this.ops.push(async (versionstamp) => {
const { value } = await this.kv.get(key);
if (!value) {
this.kv._set(key, { value: n }, versionstamp);
await this.kv._set(key, { value: n }, versionstamp);
return;
}

Expand All @@ -71,7 +75,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
throw new Error("Max operation can only be performed on KvU64 value");
}

this.kv._set(key, {
await this.kv._set(key, {
value: n > val ? n : val,
}, versionstamp);
});
Expand All @@ -80,10 +84,10 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

sum(key: DenoKvStrictKey, n: bigint): DenoAtomicOperation {
this.ops.push((versionstamp) => {
const { value } = this.kv.get(key);
this.ops.push(async (versionstamp) => {
const { value } = await this.kv.get(key);
if (!value) {
this.kv._set(key, { value: n }, versionstamp);
await this.kv._set(key, { value: n }, versionstamp);
return;
}

Expand All @@ -92,7 +96,7 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
throw new Error("Sum operation can only be performed on KvU64 value");
}

this.kv._set(key, {
await this.kv._set(key, {
value: n + val,
}, versionstamp);
});
Expand All @@ -102,8 +106,8 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {

check(...checks: DenoAtomicCheck[]): DenoAtomicOperation {
checks.forEach(({ key, versionstamp }) => {
this.checks.push(() => {
const entry = this.kv.get(key);
this.checks.push(async () => {
const entry = await this.kv.get(key);
return entry.versionstamp === versionstamp;
});
});
Expand All @@ -112,31 +116,36 @@ export class MapKvAtomicOperation implements DenoAtomicOperation {
}

enqueue(value: unknown, options?: DenoKvEnqueueOptions): DenoAtomicOperation {
this.ops.push((versionstamp) => {
this.kv._enqueue(value, versionstamp, options);
this.ops.push(async (versionstamp) => {
await this.kv._enqueue(value, versionstamp, options);
});

return this;
}

commit(): DenoKvCommitError | DenoKvCommitResult {
const passedChecks = this.checks
.map((check) => check())
.every((check) => check);
async commit(): Promise<DenoKvCommitError | DenoKvCommitResult> {
return await this.lock.run(async () => {
const checks = await Promise.allSettled(
this.checks.map((check) => check()),
);

if (!passedChecks) {
return {
ok: false,
};
}
const passedChecks = checks.every((checkResult) =>
checkResult.status === "fulfilled" && checkResult.value
);

const versionstamp = createVersionstamp();
if (!passedChecks) {
return {
ok: false,
};
}

this.ops.forEach((op) => op(versionstamp));
const versionstamp = createVersionstamp();
await allFulfilled(this.ops.map((op) => op(versionstamp)));

return {
ok: true,
versionstamp,
};
return {
ok: true,
versionstamp,
};
});
}
}
Loading

0 comments on commit bfd9f2e

Please sign in to comment.