From 55724fb09bd7efbfe898a529c076a119af33a3a3 Mon Sep 17 00:00:00 2001 From: Tadeuchi Date: Tue, 26 Dec 2023 15:47:50 +0100 Subject: [PATCH] insert validity in batches --- src/PgContractCache.ts | 80 +++++++++++++++--- src/PgContractCacheOptions.ts | 1 + src/__tests__/pg-cache-validity.test.ts | 106 ++++++++++++++++++++++++ src/__tests__/utils.ts | 5 +- 4 files changed, 178 insertions(+), 14 deletions(-) create mode 100644 src/__tests__/pg-cache-validity.test.ts diff --git a/src/PgContractCache.ts b/src/PgContractCache.ts index 300f735..8908e3f 100644 --- a/src/PgContractCache.ts +++ b/src/PgContractCache.ts @@ -22,9 +22,13 @@ export class PgContractCache implements BasicSortKeyCache> if (!pgCacheOptions) { this.pgCacheOptions = { minEntriesPerContract: 10, - maxEntriesPerContract: 100 + maxEntriesPerContract: 100, + validityBatchSize: 1000 }; } + if (!pgCacheOptions.validityBatchSize) { + pgCacheOptions.validityBatchSize = 1000; + } this.pool = new Pool(pgCacheOptions); } @@ -130,6 +134,32 @@ export class PgContractCache implements BasicSortKeyCache> return null; } + async getValidityAll(cacheKey: CacheKey): Promise { + const getBenchmark = Benchmark.measure(); + const result = await this.connection().query( + `WITH validity_page AS + (SELECT tx_id, valid + from warp.validity + where key = $1 + and sort_key <= $2 + ORDER BY sort_key DESC, id DESC) + select json_object_agg(tx_id, valid) as v, count(*) as count + from validity_page;`, + [cacheKey.key, cacheKey.sortKey] + ); + + getBenchmark.stop(); + this.benchmarkLogger.debug('PG Benchmark', { + 'getValidityAll ': getBenchmark.elapsed(), + 'cacheKey ': cacheKey + }); + + if (result && result.rows.length > 0) { + return new PgContractValidity(result.rows[0].v, result.rows[0].count); + } + return new PgContractValidity({}, 0); + } + async getLast(key: string): Promise> | null> { const getLastBenchmark = Benchmark.measure(); const result = await this.connection().query( @@ -285,18 +315,19 @@ export class PgContractCache implements BasicSortKeyCache> [stateCacheKey.key, stateCacheKey.sortKey, stringifiedState] ); - for (const tx in value.validity) { - await this.connection().query( - ` - INSERT INTO warp.validity (key, sort_key, tx_id, valid, error_message) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT(key, tx_id) DO UPDATE - SET valid = EXCLUDED.valid, - sort_key = EXCLUDED.sort_key, - error_message = EXCLUDED.error_message`, - [stateCacheKey.key, stateCacheKey.sortKey, tx, value.validity[tx], value.errorMessages[tx]] - ); + let insertValues = ''; + let batchCounter = 0; + for (const [tx, v] of Object.entries(value.validity)) { + batchCounter++; + insertValues += `${batchCounter > 1 ? "," : ""} ('${stateCacheKey.key}', '${stateCacheKey.sortKey}', '${tx}', ${v}, '${value.errorMessages[tx]}')` + if (batchCounter % this.pgCacheOptions.validityBatchSize === 0) { + await this.queryInsertValidity(insertValues); + insertValues = ''; + batchCounter = 0; + } } + await this.queryInsertValidity(insertValues); + insertBenchmark.stop(); this.benchmarkLogger.debug('PG Benchmark', { 'insert ': insertBenchmark.elapsed() @@ -309,6 +340,21 @@ export class PgContractCache implements BasicSortKeyCache> }); } + private async queryInsertValidity(formattedValues: string): Promise { + if (formattedValues) { + await this.connection().query( + `INSERT INTO warp.validity (key, sort_key, tx_id, valid, error_message) + VALUES + ${formattedValues} + ON CONFLICT(key, tx_id) + DO UPDATE + SET valid = EXCLUDED.valid, + sort_key = EXCLUDED.sort_key, + error_message = EXCLUDED.error_message` + ); + } + } + private async removeOldestEntries(key: string) { const rs = await this.connection().query( ` @@ -394,3 +440,13 @@ export class PgContractCache implements BasicSortKeyCache> ); } } + +export class PgContractValidity { + constructor(v: Record, count: number) { + this.validity = v; + this.count = count; + } + + readonly validity: Record; + readonly count: number +} diff --git a/src/PgContractCacheOptions.ts b/src/PgContractCacheOptions.ts index a64c9f0..60f77b7 100644 --- a/src/PgContractCacheOptions.ts +++ b/src/PgContractCacheOptions.ts @@ -3,4 +3,5 @@ import { ClientConfig } from 'pg'; export interface PgContractCacheOptions extends ClientConfig { minEntriesPerContract: number; maxEntriesPerContract: number; + validityBatchSize?: number; } diff --git a/src/__tests__/pg-cache-validity.test.ts b/src/__tests__/pg-cache-validity.test.ts new file mode 100644 index 0000000..ad2f9ec --- /dev/null +++ b/src/__tests__/pg-cache-validity.test.ts @@ -0,0 +1,106 @@ +import { contractCache, evalState } from './utils'; +import { CacheKey } from 'warp-contracts'; + +describe('Postgres cache', () => { + it('should return proper validity', async () => { + const sut = await contractCache(0, 100); + + await sut.put( + { + key: 'contract0', + sortKey: '000000860512,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767' + }, + evalState( + { result: 'contract0:sortKey0' }, + { + zz11: true, + zz12: false, + zz13: true, + zz14: true, + zz15: true + }, + { + zz12: 'bum!!' + } + ) + ); + await sut.put( + { + key: 'contract1', + sortKey: '000000860513,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767' + }, + evalState( + { result: 'contract1:sortKey1' }, + { + zz11: true, + zz12: false, + zz13: true + }, + { + zz12: 'bum!!' + } + ) + ); + await sut.put( + { + key: 'contract0', + sortKey: '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767' + }, + evalState( + { result: 'contract1:sortKey2' }, + { + zz16: true, + zz17: false, + zz18: true + }, + { + zz17: 'bum!!' + } + ) + ); + await sut.setSignature( + { + key: 'contract0', + sortKey: '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767' + }, + 'asd', + 'asd' + ); + + expect( + await sut.get( + new CacheKey( + 'contract0', + '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767' + ) + ) + ).toEqual({ + sortKey: '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767', + cachedValue: evalState({ result: 'contract1:sortKey2' }) + }); + + expect( + await sut.getValidityAll( + new CacheKey( + 'contract0', + '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767' + ) + ) + ).toEqual({ + count: '8', + validity: { + zz11: true, + zz12: false, + zz13: true, + zz14: true, + zz15: true, + zz16: true, + zz17: false, + zz18: true + } + }); + + await sut.drop(); + await sut.close(); + }); +}); diff --git a/src/__tests__/utils.ts b/src/__tests__/utils.ts index be56357..61dc7d3 100644 --- a/src/__tests__/utils.ts +++ b/src/__tests__/utils.ts @@ -17,6 +17,7 @@ export const contractCache = async function ( const pgOptions: PgContractCacheOptions = { minEntriesPerContract: maxEntries || 100 * numRepeatingEntries, maxEntriesPerContract: maxEntries || 100 * numRepeatingEntries, + validityBatchSize: 2, user: 'postgres', password: 'postgres', host: 'localhost', @@ -61,6 +62,6 @@ export const sortKeyCache = async function ( return new PgSortKeyCache(pgOptions); }; -export const evalState = function (value: any) { - return new EvalStateResult(value, {}, {}); +export const evalState = function (value: any, validity = {}, err = {}) { + return new EvalStateResult(value, validity, err); };