Skip to content

Commit

Permalink
Introducing PCC locking for DBTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Jun 30, 2022
1 parent 58a62f2 commit a12afb4
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 66 deletions.
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
},
"dependencies": {
"@matrixai/async-init": "^1.8.1",
"@matrixai/async-locks": "^2.3.1",
"@matrixai/async-locks": "^3.0.0",
"@matrixai/errors": "^1.1.2",
"@matrixai/logger": "^2.2.2",
"@matrixai/logger": "^2.3.0",
"@matrixai/resources": "^1.1.3",
"@matrixai/workers": "^1.3.3",
"node-gyp-build": "4.4.0",
Expand Down
8 changes: 8 additions & 0 deletions src/DB.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ResourceAcquire } from '@matrixai/resources';
import type { RWLockWriter } from '@matrixai/async-locks';
import type {
KeyPath,
LevelPath,
Expand All @@ -20,6 +21,7 @@ import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { LockBox } from '@matrixai/async-locks';
import DBIterator from './DBIterator';
import DBTransaction from './DBTransaction';
import { rocksdbP } from './rocksdb';
Expand Down Expand Up @@ -69,6 +71,7 @@ class DB {
protected fs: FileSystem;
protected logger: Logger;
protected workerManager?: DBWorkerManagerInterface;
protected _lockBox: LockBox<RWLockWriter> = new LockBox();
protected _db: RocksDBDatabase;
/**
* References to iterators
Expand Down Expand Up @@ -97,6 +100,10 @@ class DB {
return this._transactionRefs;
}

get lockBox(): Readonly<LockBox<RWLockWriter>> {
return this._lockBox;
}

constructor({
dbPath,
crypto,
Expand Down Expand Up @@ -193,6 +200,7 @@ class DB {
return async () => {
const tran = new DBTransaction({
db: this,
lockBox: this._lockBox,
logger: this.logger,
});
return [
Expand Down
176 changes: 145 additions & 31 deletions src/DBTransaction.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import type { ResourceRelease } from '@matrixai/resources';
import type {
LockBox,
MultiLockRequest as AsyncLocksMultiLockRequest,
} from '@matrixai/async-locks';
import type DB from './DB';
import type {
ToString,
KeyPath,
LevelPath,
DBIteratorOptions,
DBClearOptions,
DBCountOptions,
MultiLockRequest,
} from './types';
import type {
RocksDBTransaction,
Expand All @@ -13,6 +20,7 @@ import type {
} from './rocksdb/types';
import Logger from '@matrixai/logger';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import { RWLockWriter } from '@matrixai/async-locks';
import DBIterator from './DBIterator';
import { rocksdbP } from './rocksdb';
import * as utils from './utils';
Expand All @@ -21,37 +29,44 @@ import * as errors from './errors';
interface DBTransaction extends CreateDestroy {}
@CreateDestroy()
class DBTransaction {
public readonly id: number;

protected _db: DB;
protected logger: Logger;

protected lockBox: LockBox<RWLockWriter>;
protected _locks: Map<
string,
{
lock: RWLockWriter;
type: 'read' | 'write';
release: ResourceRelease;
}
> = new Map();
protected _options: RocksDBTransactionOptions;
protected _transaction: RocksDBTransaction;
protected _id: number;
protected _snapshot: RocksDBTransactionSnapshot;

protected _iteratorRefs: Set<DBIterator<any, any>> = new Set();
protected _callbacksSuccess: Array<() => any> = [];
protected _callbacksFailure: Array<(e?: Error) => any> = [];
protected _callbacksFinally: Array<(e?: Error) => any> = [];
protected _committed: boolean = false;
protected _rollbacked: boolean = false;

/**
* References to iterators
*/
protected _iteratorRefs: Set<DBIterator<any, any>> = new Set();

public constructor({
db,
lockBox,
logger,
...options
}: {
db: DB;
lockBox: LockBox<RWLockWriter>;
logger?: Logger;
} & RocksDBTransactionOptions) {
logger = logger ?? new Logger(this.constructor.name);
logger.debug(`Constructing ${this.constructor.name}`);
this.logger = logger;
this._db = db;
this.lockBox = lockBox;
const options_ = {
...options,
// Transactions should be synchronous
Expand All @@ -61,21 +76,24 @@ class DBTransaction {
this._options = options_;
this._transaction = rocksdbP.transactionInit(db.db, options_);
db.transactionRefs.add(this);
this._id = rocksdbP.transactionId(this._transaction);
logger.debug(`Constructed ${this.constructor.name} ${this._id}`);
this.id = rocksdbP.transactionId(this._transaction);
logger.debug(`Constructed ${this.constructor.name} ${this.id}`);
}

/**
* Destroy the transaction
* This cannot be called until the transaction is committed or rollbacked
*/
public async destroy() {
this.logger.debug(`Destroying ${this.constructor.name} ${this._id}`);
this._db.transactionRefs.delete(this);
this.logger.debug(`Destroying ${this.constructor.name} ${this.id}`);
if (!this._committed && !this._rollbacked) {
throw new errors.ErrorDBTransactionNotCommittedNorRollbacked();
}
this.logger.debug(`Destroyed ${this.constructor.name} ${this._id}`);
this._db.transactionRefs.delete(this);
// Unlock all locked keys in reverse
const lockedKeys = [...this._locks.keys()].reverse();
await this.unlock(...lockedKeys);
this.logger.debug(`Destroyed ${this.constructor.name} ${this.id}`);
}

get db(): Readonly<DB> {
Expand All @@ -86,17 +104,6 @@ class DBTransaction {
return this._transaction;
}

get id(): number {
return this._id;
}

/**
* @internal
*/
get iteratorRefs(): Readonly<Set<DBIterator<any, any>>> {
return this._iteratorRefs;
}

get callbacksSuccess(): Readonly<Array<() => any>> {
return this._callbacksSuccess;
}
Expand All @@ -117,6 +124,98 @@ class DBTransaction {
return this._rollbacked;
}

get locks(): ReadonlyMap<
string,
{
lock: RWLockWriter;
type: 'read' | 'write';
release: ResourceRelease;
}
> {
return this._locks;
}

/**
* @internal
*/
get iteratorRefs(): Readonly<Set<DBIterator<any, any>>> {
return this._iteratorRefs;
}

/**
* Lock a sequence of lock requests
* If the lock request doesn't specify, it
* defaults to using `RWLockWriter` with `write` type
* Keys are locked in string sorted order
* Even though keys can be arbitrary strings, by convention, you should use
* keys that correspond to keys in the database
* Locking with the same key is idempotent therefore lock re-entrancy is enabled
* Keys are automatically unlocked in reverse sorted order
* when the transaction is destroyed
* There is no support for lock upgrading or downgrading
* There is no deadlock detection
*/
public async lock(
...requests: Array<MultiLockRequest | string>
): Promise<void> {
const requests_: Array<AsyncLocksMultiLockRequest<RWLockWriter>> = [];
for (const request of requests) {
if (Array.isArray(request)) {
const [key, ...lockingParams] = request;
const key_ = key.toString();
const lock = this._locks.get(key_);
// Default the lock type to `write`
const lockType = (lockingParams[0] = lockingParams[0] ?? 'write');
if (lock == null) {
requests_.push([key_, RWLockWriter, ...lockingParams]);
} else if (lock.type !== lockType) {
throw new errors.ErrorDBTransactionLockType();
}
} else {
const key_ = request.toString();
const lock = this._locks.get(key_);
if (lock == null) {
// Default to using `RWLockWriter` write lock for just string keys
requests_.push([key_, RWLockWriter, 'write']);
} else if (lock.type !== 'write') {
throw new errors.ErrorDBTransactionLockType();
}
}
}
if (requests_.length > 0) {
// Duplicates are eliminated, and the returned acquisitions are sorted
const lockAcquires = this.lockBox.lockMulti(...requests_);
for (const [key, lockAcquire, ...lockingParams] of lockAcquires) {
const [lockRelease, lock] = await lockAcquire();
// The `Map` will maintain insertion order
// these must be unlocked in reverse order
// when the transaction is destroyed
this._locks.set(key as string, {
lock: lock!,
type: lockingParams[0]!, // The `type` is defaulted to `write`
release: lockRelease,
});
}
}
}

/**
* Unlock a sequence of lock keys
* Unlocking will be done in the order of the keys
* A transaction instance is only allowed to unlock keys that it previously
* locked, all keys that are not part of the `this._locks` is ignored
* Unlocking the same keys is idempotent
*/
public async unlock(...keys: Array<ToString>): Promise<void> {
for (const key of keys) {
const key_ = key.toString();
const lock = this._locks.get(key_);
if (lock == null) continue;
this._locks.delete(key_);
await lock.release();
}
}

public async get<T>(
keyPath: KeyPath | string | Buffer,
raw?: false,
Expand Down Expand Up @@ -344,7 +443,7 @@ class DBTransaction {
if (this._committed) {
return;
}
this.logger.debug(`Committing ${this.constructor.name} ${this._id}`);
this.logger.debug(`Committing ${this.constructor.name} ${this.id}`);
for (const iterator of this._iteratorRefs) {
await iterator.destroy();
}
Expand All @@ -357,12 +456,14 @@ class DBTransaction {
} catch (e) {
if (e.code === 'TRANSACTION_CONFLICT') {
this.logger.debug(
`Failed Committing ${this.constructor.name} ${this._id} due to ${errors.ErrorDBTransactionConflict.name}`,
`Failed Committing ${this.constructor.name} ${this.id} due to ${errors.ErrorDBTransactionConflict.name}`,
);
throw new errors.ErrorDBTransactionConflict(undefined, { cause: e });
throw new errors.ErrorDBTransactionConflict(undefined, {
cause: e,
});
} else {
this.logger.debug(
`Failed Committing ${this.constructor.name} ${this._id} due to ${e.message}`,
`Failed Committing ${this.constructor.name} ${this.id} due to ${e.message}`,
);
throw e;
}
Expand All @@ -376,7 +477,7 @@ class DBTransaction {
}
}
await this.destroy();
this.logger.debug(`Committed ${this.constructor.name} ${this._id}`);
this.logger.debug(`Committed ${this.constructor.name} ${this.id}`);
}

@ready(new errors.ErrorDBTransactionDestroyed())
Expand All @@ -387,7 +488,7 @@ class DBTransaction {
if (this._rollbacked) {
return;
}
this.logger.debug(`Rollbacking ${this.constructor.name} ${this._id}`);
this.logger.debug(`Rollbacking ${this.constructor.name} ${this.id}`);
for (const iterator of this._iteratorRefs) {
await iterator.destroy();
}
Expand All @@ -405,7 +506,20 @@ class DBTransaction {
}
}
await this.destroy();
this.logger.debug(`Rollbacked ${this.constructor.name} ${this._id}`);
this.logger.debug(`Rollbacked ${this.constructor.name} ${this.id}`);
}

/**
* Set the snapshot manually
* This ensures that consistent reads and writes start
* after this method is executed
* This is idempotent
* Note that normally snapshots are set lazily upon the first
* transaction db operation
*/
@ready(new errors.ErrorDBTransactionDestroyed())
public setSnapshot(): void {
this.setupSnapshot();
}

/**
Expand Down
6 changes: 6 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class ErrorDBTransactionConflict<T> extends ErrorDBTransaction<T> {
static description = 'DBTransaction cannot commit due to conflicting writes';
}

class ErrorDBTransactionLockType<T> extends ErrorDBTransaction<T> {
static description =
'DBTransaction does not support upgrading or downgrading the lock type';
}

export {
ErrorDB,
ErrorDBRunning,
Expand All @@ -103,4 +108,5 @@ export {
ErrorDBTransactionRollbacked,
ErrorDBTransactionNotCommittedNorRollbacked,
ErrorDBTransactionConflict,
ErrorDBTransactionLockType,
};
Loading

0 comments on commit a12afb4

Please sign in to comment.