Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #667 from thehubbleproject/tx2db
Browse files Browse the repository at this point in the history
Persist transactions in database
  • Loading branch information
kautukkundan authored Oct 1, 2021
2 parents 722d56d + 52b9901 commit c1d1f19
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 25 deletions.
4 changes: 2 additions & 2 deletions test/client/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ describe("Client Integration", function() {
assert.equal(storageSyncer.batches.count(), numBatches);
assert.equal(storagePacker.batches.count(), numBatches);

assert.equal(storageSyncer.transactions.count(), maxTransfers);
assert.equal(storagePacker.transactions.count(), maxTransfers);
assert.equal(await storageSyncer.transactions.count(), maxTransfers);
assert.equal(await storagePacker.transactions.count(), maxTransfers);
}).timeout(900000);
});
53 changes: 32 additions & 21 deletions test/client/transaction.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { BigNumber } from "@ethersproject/bignumber";
import { arrayify } from "@ethersproject/bytes";
import chai, { assert } from "chai";
import chaiAsPromised from "chai-as-promised";
import del from "del";
import { BlsSigner } from "../../ts/blsSigner";
import { OffchainTx } from "../../ts/client/features/interface";
import { TransferOffchainTx } from "../../ts/client/features/transfer";
import { Status } from "../../ts/client/storageEngine/transactions/constants";
import { TransactionStorage } from "../../ts/client/storageEngine/transactions/interfaces";
import { TransactionMemoryStorage } from "../../ts/client/storageEngine/transactions/memory";
import { TransactionDBStorage } from "../../ts/client/storageEngine/transactions/db";
import {
StatusTransitionInvalid,
TransactionAlreadyExists,
TransactionDoesNotExist
} from "../../ts/exceptions";
import * as mcl from "../../ts/mcl";
import { randHex } from "../../ts/utils";

chai.use(chaiAsPromised);

Expand All @@ -21,25 +25,32 @@ const txFactory = (
fee: number,
nonce: number
): OffchainTx => {
return new TransferOffchainTx(
const tx = new TransferOffchainTx(
BigNumber.from(fromIndex),
BigNumber.from(toIndex),
BigNumber.from(amount),
BigNumber.from(fee),
BigNumber.from(nonce)
);
const signer = BlsSigner.new(arrayify(randHex(32)));
tx.signature = signer.sign(tx.message());
return tx;
};

describe("TransactionMemoryStorage", () => {
let storage: TransactionStorage;
describe("TransactionDBStorage", () => {
let storage = new TransactionDBStorage();

beforeEach(function() {
storage = new TransactionMemoryStorage();
before(async function() {
await del("./leveldb/*");
await mcl.init();
});

describe("get", () => {
it("returns undefined is transaction has not been added", async function() {
assert.isUndefined(await storage.get("abc123"));
it("throws error if transaction has not been added", async function() {
assert.isRejected(
storage.get("abc123"),
/.*Key not found in database*/
);
});

it("returns the correct transaction", async function() {
Expand All @@ -51,13 +62,13 @@ describe("TransactionMemoryStorage", () => {

await Promise.all(txns.map(async t => storage.pending(t)));

assert.equal(storage.count(), 2);
assert.equal(await storage.count(), 2);

for (const t of txns) {
const txnStatus = await storage.get(t.message());
assert.equal(txnStatus?.transaction, t);
assert.equal(txnStatus?.transaction.hash(), t.hash());
const txnStatusFromTx = await storage.get(t);
assert.equal(txnStatusFromTx?.transaction, t);
assert.equal(txnStatusFromTx?.transaction.hash(), t.hash());
}
});
});
Expand All @@ -69,7 +80,7 @@ describe("TransactionMemoryStorage", () => {
const txnMsg = txn.message();

const pendingStatus = await storage.pending(txn);
assert.equal(pendingStatus.transaction, txn);
assert.equal(pendingStatus.transaction.hash(), txn.hash());
assert.equal(pendingStatus.status, Status.Pending);

const meta = {
Expand All @@ -78,7 +89,7 @@ describe("TransactionMemoryStorage", () => {
l1BlockIncluded: 101112
};
const submittedStatus = await storage.submitted(txnMsg, meta);
assert.equal(submittedStatus.transaction, txn);
assert.equal(submittedStatus.transaction.hash(), txn.hash());
assert.equal(submittedStatus.status, Status.Submitted);
assert.equal(submittedStatus.batchID, meta.batchID);
assert.equal(submittedStatus.l1TxnHash, meta.l1TxnHash);
Expand All @@ -88,7 +99,7 @@ describe("TransactionMemoryStorage", () => {
);

const finalizedStatus = await storage.finalized(txnMsg);
assert.equal(finalizedStatus.transaction, txn);
assert.equal(finalizedStatus.transaction.hash(), txn.hash());
assert.equal(finalizedStatus.status, Status.Finalized);
assert.equal(finalizedStatus.batchID, meta.batchID);
assert.equal(finalizedStatus.l1TxnHash, meta.l1TxnHash);
Expand All @@ -99,21 +110,21 @@ describe("TransactionMemoryStorage", () => {
});

it("properly transitions to failed state from pending", async function() {
const txn = txFactory(10, 11, 10101, 101, 0);
const txn = txFactory(10, 11, 1010, 101, 0);
await storage.pending(txn);

const detail = "whoops";
const failedStatus = await storage.failed(
txn.message(),
detail
);
assert.equal(failedStatus.transaction, txn);
assert.equal(failedStatus.transaction.hash(), txn.hash());
assert.equal(failedStatus.status, Status.Failed);
assert.equal(failedStatus.detail, detail);
});

it("properly transitions to failed state from submitted", async function() {
const txn = txFactory(11, 12, 20202, 202, 0);
const txn = txFactory(11, 12, 2020, 202, 0);
await storage.pending(txn);
const meta = {
batchID: 111,
Expand All @@ -127,7 +138,7 @@ describe("TransactionMemoryStorage", () => {
txn.message(),
detail
);
assert.equal(failedStatus.transaction, txn);
assert.equal(failedStatus.transaction.hash(), txn.hash());
assert.equal(failedStatus.status, Status.Failed);
assert.equal(failedStatus.detail, detail);
});
Expand Down Expand Up @@ -234,7 +245,7 @@ describe("TransactionMemoryStorage", () => {
finalized: false
});

assert.equal(status.transaction, txn);
assert.equal(status.transaction.hash(), txn.hash());
assert.equal(status.status, Status.Submitted);
assert.equal(status.batchID, meta.batchID);
assert.equal(status.l1TxnHash, meta.l1TxnHash);
Expand All @@ -254,7 +265,7 @@ describe("TransactionMemoryStorage", () => {
finalized: true
});

assert.equal(status.transaction, txn);
assert.equal(status.transaction.hash(), txn.hash());
assert.equal(status.status, Status.Finalized);
assert.equal(status.batchID, meta.batchID);
assert.equal(status.l1TxnHash, meta.l1TxnHash);
Expand Down
2 changes: 2 additions & 0 deletions ts/client/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ export const close = async (): Promise<void> => {
export const pubkeyDB = sub(db, "pubkey");
export const stateDB = sub(db, "state");
export const nodeDB = sub(db, "node");
// export const batchDB = sub(db, "batch");
export const txDB = sub(db, "tx");
export const pubkey2statesDB = sub(db, "pubkey2states");
196 changes: 196 additions & 0 deletions ts/client/storageEngine/transactions/db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { arrayify } from "@ethersproject/bytes";
import {
StatusTransitionInvalid,
TransactionAlreadyExists,
TransactionDoesNotExist
} from "../../../exceptions";
import { txDB } from "../../database/connection";
import { OffchainTx } from "../../features/interface";
import { TransferOffchainTx } from "../../features/transfer";
import { Status } from "./constants";
import {
SubmitMeta,
SyncMeta,
TransactionStatus,
TransactionStorage,
TransationMessageOrObject
} from "./interfaces";

/**
* levelDB implementation of TransactionStorage
*/
export class TransactionDBStorage implements TransactionStorage {
private readonly db = txDB;

public async get(
msgOrTxn: TransationMessageOrObject
): Promise<TransactionStatus> {
return this.fromDB(this.getMessage(msgOrTxn));
}

public async pending(txn: OffchainTx): Promise<TransactionStatus> {
const txnMessage = txn.message();
await this.throwIfAlreadyExists(txnMessage);

const txnStatus = {
status: Status.Pending,
transaction: txn
};

await this.toDB(txnMessage, txnStatus);
return txnStatus;
}

public async submitted(
msgOrTxn: TransationMessageOrObject,
{ batchID, l1TxnHash, l1BlockIncluded }: SubmitMeta
): Promise<TransactionStatus> {
const msg = this.getMessage(msgOrTxn);
const txnStatus = await this.getStatusOrFail(msg);
this.validateStatusTransition(txnStatus.status, Status.Submitted);
const submittedTxnStatus = {
...txnStatus,
status: Status.Submitted,
batchID,
l1TxnHash,
l1BlockIncluded
};
await this.toDB(msg, submittedTxnStatus);
return submittedTxnStatus;
}

public async finalized(
msgOrTxn: TransationMessageOrObject
): Promise<TransactionStatus> {
return this.transition(this.getMessage(msgOrTxn), Status.Finalized);
}

public async failed(
msgOrTxn: TransationMessageOrObject,
detail: string
): Promise<TransactionStatus> {
return this.transition(
this.getMessage(msgOrTxn),
Status.Failed,
detail
);
}

public async sync(
txn: OffchainTx,
{ batchID, l1TxnHash, l1BlockIncluded, finalized }: SyncMeta
): Promise<TransactionStatus> {
const txnMessage = txn.message();
await this.throwIfAlreadyExists(txnMessage);

const status = finalized ? Status.Finalized : Status.Submitted;
const txnStatus = {
transaction: txn,
status,
batchID,
l1TxnHash,
l1BlockIncluded
};
await this.toDB(txnMessage, txnStatus);
return txnStatus;
}

public async count(): Promise<number> {
const stream = this.db.createKeyStream();
let count = 0;
for await (const _ of stream) {
count++;
}
return count;
}

private async throwIfAlreadyExists(txnMessage: string) {
try {
const itemFound = await this.get(txnMessage);
if (itemFound) {
throw new TransactionAlreadyExists(txnMessage);
}
} catch (error) {
if (error.name !== "NotFoundError") {
throw error;
}
}
}

private getMessage(msgOrTxn: TransationMessageOrObject): string {
if (typeof msgOrTxn === "string") {
return msgOrTxn;
}
return msgOrTxn.message();
}

private async getStatusOrFail(message: string): Promise<TransactionStatus> {
try {
return await this.get(message);
} catch (error) {
if (error.name === "NotFoundError") {
throw new TransactionDoesNotExist(message);
}
throw error;
}
}

private validateStatusTransition(cur: Status, next: Status) {
// Validate transition from pending
if (
cur === Status.Pending &&
(next === Status.Submitted || next === Status.Failed)
) {
return;
// Validate transition from submited
} else if (
cur === Status.Submitted &&
(next === Status.Finalized || next === Status.Failed)
) {
return;
}
// Fail on everything else
throw new StatusTransitionInvalid(cur, next);
}

private async transition(
message: string,
next: Status,
detail?: string
): Promise<TransactionStatus> {
const txnStatus = await this.getStatusOrFail(message);
this.validateStatusTransition(txnStatus.status, next);
const newTxnStatus = {
...txnStatus,
status: next,
detail
};
await this.toDB(message, newTxnStatus);
return newTxnStatus;
}

private async toDB(msg: string, status: TransactionStatus) {
const serialized = JSON.stringify({
tx: status.transaction.serialize(),
status: status.status,
detail: status.detail,
batchID: status.batchID,
l1TxnHash: status.l1TxnHash,
l1BlockIncluded: status.l1BlockIncluded
});
await this.db.put(msg, serialized);
}

private async fromDB(msg: string): Promise<TransactionStatus> {
const object = JSON.parse(await this.db.get(msg));

return {
transaction: TransferOffchainTx.deserialize(arrayify(object.tx)),
status: object.status,
detail: object.detail,
batchID: object.batchID,
l1TxnHash: object.l1TxnHash,
l1BlockIncluded: object.l1BlockIncluded
};
}
}
2 changes: 1 addition & 1 deletion ts/client/storageEngine/transactions/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ export interface TransactionStorage {
/**
* @returns Current number of transactions.
*/
count(): number;
count(): Promise<number>;
}
2 changes: 1 addition & 1 deletion ts/client/storageEngine/transactions/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export class TransactionMemoryStorage implements TransactionStorage {
return txnStatus;
}

public count(): number {
public async count(): Promise<number> {
return Object.keys(this.transactionMessageToStatus).length;
}

Expand Down

0 comments on commit c1d1f19

Please sign in to comment.