Skip to content

Commit

Permalink
insert validity in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
Tadeuchi committed Dec 29, 2023
1 parent 8e4e117 commit 55724fb
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 14 deletions.
80 changes: 68 additions & 12 deletions src/PgContractCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ export class PgContractCache<V> implements BasicSortKeyCache<EvalStateResult<V>>
if (!pgCacheOptions) {
this.pgCacheOptions = {
minEntriesPerContract: 10,
maxEntriesPerContract: 100
maxEntriesPerContract: 100,
validityBatchSize: 1000
};
}
if (!pgCacheOptions.validityBatchSize) {
pgCacheOptions.validityBatchSize = 1000;
}
this.pool = new Pool(pgCacheOptions);
}

Expand Down Expand Up @@ -130,6 +134,32 @@ export class PgContractCache<V> implements BasicSortKeyCache<EvalStateResult<V>>
return null;
}

async getValidityAll(cacheKey: CacheKey): Promise<PgContractValidity> {
const getBenchmark = Benchmark.measure();
const result = await this.connection().query(
`WITH validity_page AS
(SELECT tx_id, valid
from warp.validity
where key = $1
and sort_key <= $2
ORDER BY sort_key DESC, id DESC)
select json_object_agg(tx_id, valid) as v, count(*) as count
from validity_page;`,
[cacheKey.key, cacheKey.sortKey]
);

getBenchmark.stop();
this.benchmarkLogger.debug('PG Benchmark', {
'getValidityAll ': getBenchmark.elapsed(),
'cacheKey ': cacheKey
});

if (result && result.rows.length > 0) {
return new PgContractValidity(result.rows[0].v, result.rows[0].count);
}
return new PgContractValidity({}, 0);
}

async getLast(key: string): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
const getLastBenchmark = Benchmark.measure();
const result = await this.connection().query(
Expand Down Expand Up @@ -285,18 +315,19 @@ export class PgContractCache<V> implements BasicSortKeyCache<EvalStateResult<V>>
[stateCacheKey.key, stateCacheKey.sortKey, stringifiedState]
);

for (const tx in value.validity) {
await this.connection().query(
`
INSERT INTO warp.validity (key, sort_key, tx_id, valid, error_message)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT(key, tx_id) DO UPDATE
SET valid = EXCLUDED.valid,
sort_key = EXCLUDED.sort_key,
error_message = EXCLUDED.error_message`,
[stateCacheKey.key, stateCacheKey.sortKey, tx, value.validity[tx], value.errorMessages[tx]]
);
let insertValues = '';
let batchCounter = 0;
for (const [tx, v] of Object.entries(value.validity)) {
batchCounter++;
insertValues += `${batchCounter > 1 ? "," : ""} ('${stateCacheKey.key}', '${stateCacheKey.sortKey}', '${tx}', ${v}, '${value.errorMessages[tx]}')`
if (batchCounter % this.pgCacheOptions.validityBatchSize === 0) {
await this.queryInsertValidity(insertValues);
insertValues = '';
batchCounter = 0;
}
}
await this.queryInsertValidity(insertValues);

insertBenchmark.stop();
this.benchmarkLogger.debug('PG Benchmark', {
'insert ': insertBenchmark.elapsed()
Expand All @@ -309,6 +340,21 @@ export class PgContractCache<V> implements BasicSortKeyCache<EvalStateResult<V>>
});
}

private async queryInsertValidity(formattedValues: string): Promise<void> {
if (formattedValues) {
await this.connection().query(
`INSERT INTO warp.validity (key, sort_key, tx_id, valid, error_message)
VALUES
${formattedValues}
ON CONFLICT(key, tx_id)
DO UPDATE
SET valid = EXCLUDED.valid,
sort_key = EXCLUDED.sort_key,
error_message = EXCLUDED.error_message`
);
}
}

private async removeOldestEntries(key: string) {
const rs = await this.connection().query(
`
Expand Down Expand Up @@ -394,3 +440,13 @@ export class PgContractCache<V> implements BasicSortKeyCache<EvalStateResult<V>>
);
}
}

export class PgContractValidity {
constructor(v: Record<string, boolean>, count: number) {
this.validity = v;
this.count = count;
}

readonly validity: Record<string, boolean>;
readonly count: number
}
1 change: 1 addition & 0 deletions src/PgContractCacheOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ import { ClientConfig } from 'pg';
export interface PgContractCacheOptions extends ClientConfig {
minEntriesPerContract: number;
maxEntriesPerContract: number;
validityBatchSize?: number;
}
106 changes: 106 additions & 0 deletions src/__tests__/pg-cache-validity.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { contractCache, evalState } from './utils';
import { CacheKey } from 'warp-contracts';

describe('Postgres cache', () => {
it('should return proper validity', async () => {
const sut = await contractCache(0, 100);

await sut.put(
{
key: 'contract0',
sortKey: '000000860512,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767'
},
evalState(
{ result: 'contract0:sortKey0' },
{
zz11: true,
zz12: false,
zz13: true,
zz14: true,
zz15: true
},
{
zz12: 'bum!!'
}
)
);
await sut.put(
{
key: 'contract1',
sortKey: '000000860513,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767'
},
evalState(
{ result: 'contract1:sortKey1' },
{
zz11: true,
zz12: false,
zz13: true
},
{
zz12: 'bum!!'
}
)
);
await sut.put(
{
key: 'contract0',
sortKey: '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767'
},
evalState(
{ result: 'contract1:sortKey2' },
{
zz16: true,
zz17: false,
zz18: true
},
{
zz17: 'bum!!'
}
)
);
await sut.setSignature(
{
key: 'contract0',
sortKey: '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767'
},
'asd',
'asd'
);

expect(
await sut.get(
new CacheKey(
'contract0',
'000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767'
)
)
).toEqual({
sortKey: '000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767',
cachedValue: evalState({ result: 'contract1:sortKey2' })
});

expect(
await sut.getValidityAll(
new CacheKey(
'contract0',
'000000860514,1643210931796,81e1bea09d3262ee36ce8cfdbbb2ce3feb18a717c3020c47d206cb8ecb43b767'
)
)
).toEqual({
count: '8',
validity: {
zz11: true,
zz12: false,
zz13: true,
zz14: true,
zz15: true,
zz16: true,
zz17: false,
zz18: true
}
});

await sut.drop();
await sut.close();
});
});
5 changes: 3 additions & 2 deletions src/__tests__/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const contractCache = async function (
const pgOptions: PgContractCacheOptions = {
minEntriesPerContract: maxEntries || 100 * numRepeatingEntries,
maxEntriesPerContract: maxEntries || 100 * numRepeatingEntries,
validityBatchSize: 2,
user: 'postgres',
password: 'postgres',
host: 'localhost',
Expand Down Expand Up @@ -61,6 +62,6 @@ export const sortKeyCache = async function (
return new PgSortKeyCache<unknown>(pgOptions);
};

export const evalState = function (value: any) {
return new EvalStateResult(value, {}, {});
export const evalState = function (value: any, validity = {}, err = {}) {
return new EvalStateResult(value, validity, err);
};

0 comments on commit 55724fb

Please sign in to comment.