From 52b9901e860999fcbc51a90236031ad6a6793aa5 Mon Sep 17 00:00:00 2001 From: kautuk kundan Date: Thu, 23 Sep 2021 12:15:29 +0530 Subject: [PATCH] saving transactions to db Use txn hash in tests fixed mantissa error updated transaction storage interface linting fix changed returned object when loaded from db Co-authored-by: Jacob Caban-Tomski --- test/client/integration.test.ts | 4 +- test/client/transaction.test.ts | 53 +++-- ts/client/database/connection.ts | 2 + ts/client/storageEngine/transactions/db.ts | 196 ++++++++++++++++++ .../storageEngine/transactions/interfaces.ts | 2 +- .../storageEngine/transactions/memory.ts | 2 +- 6 files changed, 234 insertions(+), 25 deletions(-) create mode 100644 ts/client/storageEngine/transactions/db.ts diff --git a/test/client/integration.test.ts b/test/client/integration.test.ts index 21cb4cc7..bc6e63cd 100644 --- a/test/client/integration.test.ts +++ b/test/client/integration.test.ts @@ -169,7 +169,7 @@ describe("Client Integration", function() { assert.equal(storageSyncer.batches.count(), numBatches); assert.equal(storagePacker.batches.count(), numBatches); - assert.equal(storageSyncer.transactions.count(), maxTransfers); - assert.equal(storagePacker.transactions.count(), maxTransfers); + assert.equal(await storageSyncer.transactions.count(), maxTransfers); + assert.equal(await storagePacker.transactions.count(), maxTransfers); }).timeout(900000); }); diff --git a/test/client/transaction.test.ts b/test/client/transaction.test.ts index 2ad4489e..271526a4 100644 --- a/test/client/transaction.test.ts +++ b/test/client/transaction.test.ts @@ -1,16 +1,20 @@ import { BigNumber } from "@ethersproject/bignumber"; +import { arrayify } from "@ethersproject/bytes"; import chai, { assert } from "chai"; import chaiAsPromised from "chai-as-promised"; +import del from "del"; +import { BlsSigner } from "../../ts/blsSigner"; import { OffchainTx } from "../../ts/client/features/interface"; import { TransferOffchainTx } from "../../ts/client/features/transfer"; import { Status } from "../../ts/client/storageEngine/transactions/constants"; -import { TransactionStorage } from "../../ts/client/storageEngine/transactions/interfaces"; -import { TransactionMemoryStorage } from "../../ts/client/storageEngine/transactions/memory"; +import { TransactionDBStorage } from "../../ts/client/storageEngine/transactions/db"; import { StatusTransitionInvalid, TransactionAlreadyExists, TransactionDoesNotExist } from "../../ts/exceptions"; +import * as mcl from "../../ts/mcl"; +import { randHex } from "../../ts/utils"; chai.use(chaiAsPromised); @@ -21,25 +25,32 @@ const txFactory = ( fee: number, nonce: number ): OffchainTx => { - return new TransferOffchainTx( + const tx = new TransferOffchainTx( BigNumber.from(fromIndex), BigNumber.from(toIndex), BigNumber.from(amount), BigNumber.from(fee), BigNumber.from(nonce) ); + const signer = BlsSigner.new(arrayify(randHex(32))); + tx.signature = signer.sign(tx.message()); + return tx; }; -describe("TransactionMemoryStorage", () => { - let storage: TransactionStorage; +describe("TransactionDBStorage", () => { + let storage = new TransactionDBStorage(); - beforeEach(function() { - storage = new TransactionMemoryStorage(); + before(async function() { + await del("./leveldb/*"); + await mcl.init(); }); describe("get", () => { - it("returns undefined is transaction has not been added", async function() { - assert.isUndefined(await storage.get("abc123")); + it("throws error if transaction has not been added", async function() { + assert.isRejected( + storage.get("abc123"), + /.*Key not found in database*/ + ); }); it("returns the correct transaction", async function() { @@ -51,13 +62,13 @@ describe("TransactionMemoryStorage", () => { await Promise.all(txns.map(async t => storage.pending(t))); - assert.equal(storage.count(), 2); + assert.equal(await storage.count(), 2); for (const t of txns) { const txnStatus = await storage.get(t.message()); - assert.equal(txnStatus?.transaction, t); + assert.equal(txnStatus?.transaction.hash(), t.hash()); const txnStatusFromTx = await storage.get(t); - assert.equal(txnStatusFromTx?.transaction, t); + assert.equal(txnStatusFromTx?.transaction.hash(), t.hash()); } }); }); @@ -69,7 +80,7 @@ describe("TransactionMemoryStorage", () => { const txnMsg = txn.message(); const pendingStatus = await storage.pending(txn); - assert.equal(pendingStatus.transaction, txn); + assert.equal(pendingStatus.transaction.hash(), txn.hash()); assert.equal(pendingStatus.status, Status.Pending); const meta = { @@ -78,7 +89,7 @@ describe("TransactionMemoryStorage", () => { l1BlockIncluded: 101112 }; const submittedStatus = await storage.submitted(txnMsg, meta); - assert.equal(submittedStatus.transaction, txn); + assert.equal(submittedStatus.transaction.hash(), txn.hash()); assert.equal(submittedStatus.status, Status.Submitted); assert.equal(submittedStatus.batchID, meta.batchID); assert.equal(submittedStatus.l1TxnHash, meta.l1TxnHash); @@ -88,7 +99,7 @@ describe("TransactionMemoryStorage", () => { ); const finalizedStatus = await storage.finalized(txnMsg); - assert.equal(finalizedStatus.transaction, txn); + assert.equal(finalizedStatus.transaction.hash(), txn.hash()); assert.equal(finalizedStatus.status, Status.Finalized); assert.equal(finalizedStatus.batchID, meta.batchID); assert.equal(finalizedStatus.l1TxnHash, meta.l1TxnHash); @@ -99,7 +110,7 @@ describe("TransactionMemoryStorage", () => { }); it("properly transitions to failed state from pending", async function() { - const txn = txFactory(10, 11, 10101, 101, 0); + const txn = txFactory(10, 11, 1010, 101, 0); await storage.pending(txn); const detail = "whoops"; @@ -107,13 +118,13 @@ describe("TransactionMemoryStorage", () => { txn.message(), detail ); - assert.equal(failedStatus.transaction, txn); + assert.equal(failedStatus.transaction.hash(), txn.hash()); assert.equal(failedStatus.status, Status.Failed); assert.equal(failedStatus.detail, detail); }); it("properly transitions to failed state from submitted", async function() { - const txn = txFactory(11, 12, 20202, 202, 0); + const txn = txFactory(11, 12, 2020, 202, 0); await storage.pending(txn); const meta = { batchID: 111, @@ -127,7 +138,7 @@ describe("TransactionMemoryStorage", () => { txn.message(), detail ); - assert.equal(failedStatus.transaction, txn); + assert.equal(failedStatus.transaction.hash(), txn.hash()); assert.equal(failedStatus.status, Status.Failed); assert.equal(failedStatus.detail, detail); }); @@ -234,7 +245,7 @@ describe("TransactionMemoryStorage", () => { finalized: false }); - assert.equal(status.transaction, txn); + assert.equal(status.transaction.hash(), txn.hash()); assert.equal(status.status, Status.Submitted); assert.equal(status.batchID, meta.batchID); assert.equal(status.l1TxnHash, meta.l1TxnHash); @@ -254,7 +265,7 @@ describe("TransactionMemoryStorage", () => { finalized: true }); - assert.equal(status.transaction, txn); + assert.equal(status.transaction.hash(), txn.hash()); assert.equal(status.status, Status.Finalized); assert.equal(status.batchID, meta.batchID); assert.equal(status.l1TxnHash, meta.l1TxnHash); diff --git a/ts/client/database/connection.ts b/ts/client/database/connection.ts index e366d317..fd0ba9ce 100644 --- a/ts/client/database/connection.ts +++ b/ts/client/database/connection.ts @@ -10,4 +10,6 @@ export const close = async (): Promise => { export const pubkeyDB = sub(db, "pubkey"); export const stateDB = sub(db, "state"); export const nodeDB = sub(db, "node"); +// export const batchDB = sub(db, "batch"); +export const txDB = sub(db, "tx"); export const pubkey2statesDB = sub(db, "pubkey2states"); diff --git a/ts/client/storageEngine/transactions/db.ts b/ts/client/storageEngine/transactions/db.ts new file mode 100644 index 00000000..fb817295 --- /dev/null +++ b/ts/client/storageEngine/transactions/db.ts @@ -0,0 +1,196 @@ +import { arrayify } from "@ethersproject/bytes"; +import { + StatusTransitionInvalid, + TransactionAlreadyExists, + TransactionDoesNotExist +} from "../../../exceptions"; +import { txDB } from "../../database/connection"; +import { OffchainTx } from "../../features/interface"; +import { TransferOffchainTx } from "../../features/transfer"; +import { Status } from "./constants"; +import { + SubmitMeta, + SyncMeta, + TransactionStatus, + TransactionStorage, + TransationMessageOrObject +} from "./interfaces"; + +/** + * levelDB implementation of TransactionStorage + */ +export class TransactionDBStorage implements TransactionStorage { + private readonly db = txDB; + + public async get( + msgOrTxn: TransationMessageOrObject + ): Promise { + return this.fromDB(this.getMessage(msgOrTxn)); + } + + public async pending(txn: OffchainTx): Promise { + const txnMessage = txn.message(); + await this.throwIfAlreadyExists(txnMessage); + + const txnStatus = { + status: Status.Pending, + transaction: txn + }; + + await this.toDB(txnMessage, txnStatus); + return txnStatus; + } + + public async submitted( + msgOrTxn: TransationMessageOrObject, + { batchID, l1TxnHash, l1BlockIncluded }: SubmitMeta + ): Promise { + const msg = this.getMessage(msgOrTxn); + const txnStatus = await this.getStatusOrFail(msg); + this.validateStatusTransition(txnStatus.status, Status.Submitted); + const submittedTxnStatus = { + ...txnStatus, + status: Status.Submitted, + batchID, + l1TxnHash, + l1BlockIncluded + }; + await this.toDB(msg, submittedTxnStatus); + return submittedTxnStatus; + } + + public async finalized( + msgOrTxn: TransationMessageOrObject + ): Promise { + return this.transition(this.getMessage(msgOrTxn), Status.Finalized); + } + + public async failed( + msgOrTxn: TransationMessageOrObject, + detail: string + ): Promise { + return this.transition( + this.getMessage(msgOrTxn), + Status.Failed, + detail + ); + } + + public async sync( + txn: OffchainTx, + { batchID, l1TxnHash, l1BlockIncluded, finalized }: SyncMeta + ): Promise { + const txnMessage = txn.message(); + await this.throwIfAlreadyExists(txnMessage); + + const status = finalized ? Status.Finalized : Status.Submitted; + const txnStatus = { + transaction: txn, + status, + batchID, + l1TxnHash, + l1BlockIncluded + }; + await this.toDB(txnMessage, txnStatus); + return txnStatus; + } + + public async count(): Promise { + const stream = this.db.createKeyStream(); + let count = 0; + for await (const _ of stream) { + count++; + } + return count; + } + + private async throwIfAlreadyExists(txnMessage: string) { + try { + const itemFound = await this.get(txnMessage); + if (itemFound) { + throw new TransactionAlreadyExists(txnMessage); + } + } catch (error) { + if (error.name !== "NotFoundError") { + throw error; + } + } + } + + private getMessage(msgOrTxn: TransationMessageOrObject): string { + if (typeof msgOrTxn === "string") { + return msgOrTxn; + } + return msgOrTxn.message(); + } + + private async getStatusOrFail(message: string): Promise { + try { + return await this.get(message); + } catch (error) { + if (error.name === "NotFoundError") { + throw new TransactionDoesNotExist(message); + } + throw error; + } + } + + private validateStatusTransition(cur: Status, next: Status) { + // Validate transition from pending + if ( + cur === Status.Pending && + (next === Status.Submitted || next === Status.Failed) + ) { + return; + // Validate transition from submited + } else if ( + cur === Status.Submitted && + (next === Status.Finalized || next === Status.Failed) + ) { + return; + } + // Fail on everything else + throw new StatusTransitionInvalid(cur, next); + } + + private async transition( + message: string, + next: Status, + detail?: string + ): Promise { + const txnStatus = await this.getStatusOrFail(message); + this.validateStatusTransition(txnStatus.status, next); + const newTxnStatus = { + ...txnStatus, + status: next, + detail + }; + await this.toDB(message, newTxnStatus); + return newTxnStatus; + } + + private async toDB(msg: string, status: TransactionStatus) { + const serialized = JSON.stringify({ + tx: status.transaction.serialize(), + status: status.status, + detail: status.detail, + batchID: status.batchID, + l1TxnHash: status.l1TxnHash, + l1BlockIncluded: status.l1BlockIncluded + }); + await this.db.put(msg, serialized); + } + + private async fromDB(msg: string): Promise { + const object = JSON.parse(await this.db.get(msg)); + + return { + transaction: TransferOffchainTx.deserialize(arrayify(object.tx)), + status: object.status, + detail: object.detail, + batchID: object.batchID, + l1TxnHash: object.l1TxnHash, + l1BlockIncluded: object.l1BlockIncluded + }; + } +} diff --git a/ts/client/storageEngine/transactions/interfaces.ts b/ts/client/storageEngine/transactions/interfaces.ts index 052c6bcf..2f340c5e 100644 --- a/ts/client/storageEngine/transactions/interfaces.ts +++ b/ts/client/storageEngine/transactions/interfaces.ts @@ -90,5 +90,5 @@ export interface TransactionStorage { /** * @returns Current number of transactions. */ - count(): number; + count(): Promise; } diff --git a/ts/client/storageEngine/transactions/memory.ts b/ts/client/storageEngine/transactions/memory.ts index 99238424..8ef14d6f 100644 --- a/ts/client/storageEngine/transactions/memory.ts +++ b/ts/client/storageEngine/transactions/memory.ts @@ -102,7 +102,7 @@ export class TransactionMemoryStorage implements TransactionStorage { return txnStatus; } - public count(): number { + public async count(): Promise { return Object.keys(this.transactionMessageToStatus).length; }