From 05da282c4c4a0fd82072623ba3e3eaf780f50b89 Mon Sep 17 00:00:00 2001 From: Patrick Nercessian <45547116+PatrickNercessian@users.noreply.github.com> Date: Tue, 30 Apr 2024 02:42:51 -0400 Subject: [PATCH] Feat: Created daily_node_metrics table and received/stored station_id (#188) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Created daily_node_metrics table and received/stored station_id * Enriching tests and small syntax/structure changes * Converted 'metric_date' column to 'day' * Use bulk insert for updating daily_node_metrics * Accept 88-char station IDs instead, naming changes --------- Co-authored-by: Miroslav Bajtoš Co-authored-by: Julian Gruber --- README.md | 2 +- lib/platform-stats.js | 29 +++++++++ lib/preprocess.js | 8 +++ lib/public-stats.js | 2 + migrations/006.do.daily-node-metrics.sql | 5 ++ test/helpers/test-data.js | 2 + test/platform-stats.test.js | 81 ++++++++++++++++++++++++ test/preprocess.js | 4 ++ 8 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 lib/platform-stats.js create mode 100644 migrations/006.do.daily-node-metrics.sql create mode 100644 test/platform-stats.test.js diff --git a/README.md b/README.md index dfab5edf..039e5ae3 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Set up [PostgreSQL](https://www.postgresql.org/) with default settings: - Database: spark_stats Alternatively, set the environment variable `$DATABASE_URL` with -`postgres://${USER}:${PASS}@${HOST}:${POST}/${DATABASE}`. +`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`. The Postgres user and database need to exist already, and the user needs full management permissions for the database. diff --git a/lib/platform-stats.js b/lib/platform-stats.js new file mode 100644 index 00000000..5ffb76f1 --- /dev/null +++ b/lib/platform-stats.js @@ -0,0 +1,29 @@ +import createDebug from 'debug' + +const debug = createDebug('spark:platform-stats') + +/** + * @param {import('pg').Client} pgClient + * @param {import('./preprocess').Measurement[]} honestMeasurements + */ +export const updatePlatformStats = async (pgClient, honestMeasurements) => { + await updateDailyStationStats(pgClient, honestMeasurements) +} + +/** + * @param {import('pg').Client} pgClient + * @param {import('./preprocess').Measurement[]} honestMeasurements + */ +export const updateDailyStationStats = async (pgClient, honestMeasurements) => { + // TODO: when we add more fields, we will update the ON CONFLICT clause + // to update those fields, and we won't just use a Set for the stationIds + // which currently removes all granular measurement details + const stationIds = [...new Set(honestMeasurements.map(m => m.stationId))] + debug('Updating daily station stats, unique_count=%s', stationIds.length) + + await pgClient.query(` + INSERT INTO daily_stations (station_id, day) + SELECT unnest($1::text[]), now() + ON CONFLICT (station_id, day) DO NOTHING + `, [stationIds]) +} diff --git a/lib/preprocess.js b/lib/preprocess.js index 87f498d5..4a4ed8e5 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -29,6 +29,7 @@ export class Measurement { this.end_at = parseDateTime(m.end_at) this.status_code = m.status_code this.indexerResult = pointerize(m.indexer_result) + this.stationId = pointerize(m.station_id) } } @@ -200,6 +201,13 @@ const assertValidMeasurement = measurement => { assert(ethers.isAddress(measurement.participantAddress), 'valid participant address required') assert(typeof measurement.inet_group === 'string', 'valid inet group required') assert(typeof measurement.finished_at === 'number', 'field `finished_at` must be set to a number') + if (measurement.stationId) { + assert( + typeof measurement.stationId === 'string' && + measurement.stationId.match(/^[0-9a-fA-F]{88}$/), + 'stationId must be a hex string with 88 characters' + ) + } } /** diff --git a/lib/public-stats.js b/lib/public-stats.js index ababf35d..bc10feb5 100644 --- a/lib/public-stats.js +++ b/lib/public-stats.js @@ -1,6 +1,7 @@ import assert from 'node:assert' import createDebug from 'debug' +import { updatePlatformStats } from './platform-stats.js' import { getTaskId } from './retrieval-stats.js' const debug = createDebug('spark:public-stats') @@ -31,6 +32,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements }) } await updateDailyParticipants(pgClient, participants) await updateIndexerQueryStats(pgClient, honestMeasurements) + await updatePlatformStats(pgClient, honestMeasurements) } finally { await pgClient.end() } diff --git a/migrations/006.do.daily-node-metrics.sql b/migrations/006.do.daily-node-metrics.sql new file mode 100644 index 00000000..557b7b3a --- /dev/null +++ b/migrations/006.do.daily-node-metrics.sql @@ -0,0 +1,5 @@ +CREATE TABLE daily_stations ( + day DATE NOT NULL, + station_id TEXT NOT NULL, + PRIMARY KEY (day, station_id) +) diff --git a/test/helpers/test-data.js b/test/helpers/test-data.js index c3f1c1c5..f4a8761a 100644 --- a/test/helpers/test-data.js +++ b/test/helpers/test-data.js @@ -1,4 +1,5 @@ export const VALID_PARTICIPANT_ADDRESS = '0x000000000000000000000000000000000000dEaD' +export const VALID_STATION_ID = '8800000000000000000000000000000000000000000000000000000000000000000000000000000000000000' export const VALID_TASK = { cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS', @@ -13,6 +14,7 @@ export const VALID_MEASUREMENT = { provider_address: '/dns4/production-ipfs-peer.pinata.cloud/tcp/3000/ws/p2p/Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn', protocol: 'bitswap', participantAddress: VALID_PARTICIPANT_ADDRESS, + stationId: VALID_STATION_ID, inet_group: 'some-group-id', status_code: 200, timeout: false, diff --git a/test/platform-stats.test.js b/test/platform-stats.test.js new file mode 100644 index 00000000..8f37d991 --- /dev/null +++ b/test/platform-stats.test.js @@ -0,0 +1,81 @@ +import assert from 'node:assert' +import pg from 'pg' +import { beforeEach, describe, it } from 'mocha' + +import { DATABASE_URL } from '../lib/config.js' +import { migrateWithPgClient } from '../lib/migrate.js' +import { VALID_MEASUREMENT, VALID_STATION_ID } from './helpers/test-data.js' +import { updateDailyStationStats } from '../lib/platform-stats.js' + +const createPgClient = async () => { + const pgClient = new pg.Client({ connectionString: DATABASE_URL }) + await pgClient.connect() + return pgClient +} + +describe('platform-stats', () => { + let pgClient + before(async () => { + pgClient = await createPgClient() + await migrateWithPgClient(pgClient) + }) + + let today + beforeEach(async () => { + await pgClient.query('DELETE FROM daily_stations') + + // Run all tests inside a transaction to ensure `now()` always returns the same value + // See https://dba.stackexchange.com/a/63549/125312 + // This avoids subtle race conditions when the tests are executed around midnight. + await pgClient.query('BEGIN TRANSACTION') + today = await getCurrentDate() + }) + + afterEach(async () => { + await pgClient.query('END TRANSACTION') + }) + + after(async () => { + await pgClient.end() + }) + + describe('updateDailyStationStats', () => { + it('updates daily station stats for today with multiple measurements', async () => { + const validStationId2 = VALID_STATION_ID.slice(0, -1) + '1' + const honestMeasurements = [ + { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }, + { ...VALID_MEASUREMENT, stationId: validStationId2 } + ] + + await updateDailyStationStats(pgClient, honestMeasurements) + + const { rows } = await pgClient.query(` + SELECT station_id, day::TEXT FROM daily_stations + ORDER BY station_id` + ) + assert.strictEqual(rows.length, 2) + assert.deepStrictEqual(rows, [ + { station_id: VALID_STATION_ID, day: today }, + { station_id: validStationId2, day: today } + ]) + }) + + it('ignores duplicate measurements for the same station on the same day', async () => { + const honestMeasurements = [ + { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }, + { ...VALID_MEASUREMENT, stationId: VALID_STATION_ID } + ] + + await updateDailyStationStats(pgClient, honestMeasurements) + + const { rows } = await pgClient.query('SELECT station_id, day::TEXT FROM daily_stations') + assert.strictEqual(rows.length, 1) + assert.deepStrictEqual(rows, [{ station_id: VALID_STATION_ID, day: today }]) + }) + }) + + const getCurrentDate = async () => { + const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today') + return today + } +}) diff --git a/test/preprocess.js b/test/preprocess.js index d99edb7e..a69c11b1 100644 --- a/test/preprocess.js +++ b/test/preprocess.js @@ -3,6 +3,7 @@ import { Point } from '../lib/telemetry.js' import assert from 'node:assert' import createDebug from 'debug' import { assertPointFieldValue, assertRecordedTelemetryPoint } from './helpers/assertions.js' +import { VALID_STATION_ID } from './helpers/test-data.js' import { RoundData } from '../lib/round.js' const debug = createDebug('test') @@ -23,6 +24,7 @@ describe('preprocess', () => { const roundIndex = 0 const measurements = [{ participant_address: 'f410ftgmzttyqi3ti4nxbvixa4byql3o5d4eo3jtc43i', + station_id: VALID_STATION_ID, spark_version: '1.2.3', inet_group: 'ig1', finished_at: '2023-11-01T09:00:00.000Z', @@ -41,6 +43,7 @@ describe('preprocess', () => { assert.deepStrictEqual(round.measurements, [ new Measurement({ participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E', + station_id: VALID_STATION_ID, spark_version: '1.2.3', inet_group: 'ig1', finished_at: '2023-11-01T09:00:00.000Z', @@ -97,6 +100,7 @@ describe('getRetrievalResult', () => { spark_version: '1.5.2', zinnia_version: '0.14.0', participant_address: 'f410fgkhpcrbmdvic52o3nivftrjxr7nzw47updmuzra', + station_id: VALID_STATION_ID, finished_at: '2023-11-01T09:42:03.246Z', timeout: false, start_at: '2023-11-01T09:40:03.393Z',