Skip to content

Commit

Permalink
Feat: Created daily_node_metrics table and received/stored station_id (
Browse files Browse the repository at this point in the history
…#188)

* Created daily_node_metrics table and received/stored station_id

* Enriching tests and small syntax/structure changes

* Converted 'metric_date' column to 'day'

* Use bulk insert for updating daily_node_metrics

* Accept 88-char station IDs instead, naming changes

---------

Co-authored-by: Miroslav Bajtoš <oss@bajtos.net>
Co-authored-by: Julian Gruber <julian@juliangruber.com>
  • Loading branch information
3 people authored Apr 30, 2024
1 parent aafadc7 commit 05da282
Showing 8 changed files with 132 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ Set up [PostgreSQL](https://www.postgresql.org/) with default settings:
- Database: spark_stats

Alternatively, set the environment variable `$DATABASE_URL` with
`postgres://${USER}:${PASS}@${HOST}:${POST}/${DATABASE}`.
`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`.

The Postgres user and database need to exist already, and the user
needs full management permissions for the database.
29 changes: 29 additions & 0 deletions lib/platform-stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import createDebug from 'debug'

const debug = createDebug('spark:platform-stats')

/**
* @param {import('pg').Client} pgClient
* @param {import('./preprocess').Measurement[]} honestMeasurements
*/
export const updatePlatformStats = async (pgClient, honestMeasurements) => {
await updateDailyStationStats(pgClient, honestMeasurements)
}

/**
* @param {import('pg').Client} pgClient
* @param {import('./preprocess').Measurement[]} honestMeasurements
*/
export const updateDailyStationStats = async (pgClient, honestMeasurements) => {
// TODO: when we add more fields, we will update the ON CONFLICT clause
// to update those fields, and we won't just use a Set for the stationIds
// which currently removes all granular measurement details
const stationIds = [...new Set(honestMeasurements.map(m => m.stationId))]
debug('Updating daily station stats, unique_count=%s', stationIds.length)

await pgClient.query(`
INSERT INTO daily_stations (station_id, day)
SELECT unnest($1::text[]), now()
ON CONFLICT (station_id, day) DO NOTHING
`, [stationIds])
}
8 changes: 8 additions & 0 deletions lib/preprocess.js
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ export class Measurement {
this.end_at = parseDateTime(m.end_at)
this.status_code = m.status_code
this.indexerResult = pointerize(m.indexer_result)
this.stationId = pointerize(m.station_id)
}
}

@@ -200,6 +201,13 @@ const assertValidMeasurement = measurement => {
assert(ethers.isAddress(measurement.participantAddress), 'valid participant address required')
assert(typeof measurement.inet_group === 'string', 'valid inet group required')
assert(typeof measurement.finished_at === 'number', 'field `finished_at` must be set to a number')
if (measurement.stationId) {
assert(
typeof measurement.stationId === 'string' &&
measurement.stationId.match(/^[0-9a-fA-F]{88}$/),
'stationId must be a hex string with 88 characters'
)
}
}

/**
2 changes: 2 additions & 0 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import assert from 'node:assert'
import createDebug from 'debug'

import { updatePlatformStats } from './platform-stats.js'
import { getTaskId } from './retrieval-stats.js'

const debug = createDebug('spark:public-stats')
@@ -31,6 +32,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements })
}
await updateDailyParticipants(pgClient, participants)
await updateIndexerQueryStats(pgClient, honestMeasurements)
await updatePlatformStats(pgClient, honestMeasurements)
} finally {
await pgClient.end()
}
5 changes: 5 additions & 0 deletions migrations/006.do.daily-node-metrics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE daily_stations (
day DATE NOT NULL,
station_id TEXT NOT NULL,
PRIMARY KEY (day, station_id)
)
2 changes: 2 additions & 0 deletions test/helpers/test-data.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const VALID_PARTICIPANT_ADDRESS = '0x000000000000000000000000000000000000dEaD'
export const VALID_STATION_ID = '8800000000000000000000000000000000000000000000000000000000000000000000000000000000000000'

export const VALID_TASK = {
cid: 'QmUuEoBdjC8D1PfWZCc7JCSK8nj7TV6HbXWDHYHzZHCVGS',
@@ -13,6 +14,7 @@ export const VALID_MEASUREMENT = {
provider_address: '/dns4/production-ipfs-peer.pinata.cloud/tcp/3000/ws/p2p/Qma8ddFEQWEU8ijWvdxXm3nxU7oHsRtCykAaVz8WUYhiKn',
protocol: 'bitswap',
participantAddress: VALID_PARTICIPANT_ADDRESS,
stationId: VALID_STATION_ID,
inet_group: 'some-group-id',
status_code: 200,
timeout: false,
81 changes: 81 additions & 0 deletions test/platform-stats.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import assert from 'node:assert'
import pg from 'pg'
import { beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { VALID_MEASUREMENT, VALID_STATION_ID } from './helpers/test-data.js'
import { updateDailyStationStats } from '../lib/platform-stats.js'

const createPgClient = async () => {
const pgClient = new pg.Client({ connectionString: DATABASE_URL })
await pgClient.connect()
return pgClient
}

describe('platform-stats', () => {
let pgClient
before(async () => {
pgClient = await createPgClient()
await migrateWithPgClient(pgClient)
})

let today
beforeEach(async () => {
await pgClient.query('DELETE FROM daily_stations')

// Run all tests inside a transaction to ensure `now()` always returns the same value
// See https://dba.stackexchange.com/a/63549/125312
// This avoids subtle race conditions when the tests are executed around midnight.
await pgClient.query('BEGIN TRANSACTION')
today = await getCurrentDate()
})

afterEach(async () => {
await pgClient.query('END TRANSACTION')
})

after(async () => {
await pgClient.end()
})

describe('updateDailyStationStats', () => {
it('updates daily station stats for today with multiple measurements', async () => {
const validStationId2 = VALID_STATION_ID.slice(0, -1) + '1'
const honestMeasurements = [
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
{ ...VALID_MEASUREMENT, stationId: validStationId2 }
]

await updateDailyStationStats(pgClient, honestMeasurements)

const { rows } = await pgClient.query(`
SELECT station_id, day::TEXT FROM daily_stations
ORDER BY station_id`
)
assert.strictEqual(rows.length, 2)
assert.deepStrictEqual(rows, [
{ station_id: VALID_STATION_ID, day: today },
{ station_id: validStationId2, day: today }
])
})

it('ignores duplicate measurements for the same station on the same day', async () => {
const honestMeasurements = [
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID },
{ ...VALID_MEASUREMENT, stationId: VALID_STATION_ID }
]

await updateDailyStationStats(pgClient, honestMeasurements)

const { rows } = await pgClient.query('SELECT station_id, day::TEXT FROM daily_stations')
assert.strictEqual(rows.length, 1)
assert.deepStrictEqual(rows, [{ station_id: VALID_STATION_ID, day: today }])
})
})

const getCurrentDate = async () => {
const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today')
return today
}
})
4 changes: 4 additions & 0 deletions test/preprocess.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ import { Point } from '../lib/telemetry.js'
import assert from 'node:assert'
import createDebug from 'debug'
import { assertPointFieldValue, assertRecordedTelemetryPoint } from './helpers/assertions.js'
import { VALID_STATION_ID } from './helpers/test-data.js'
import { RoundData } from '../lib/round.js'

const debug = createDebug('test')
@@ -23,6 +24,7 @@ describe('preprocess', () => {
const roundIndex = 0
const measurements = [{
participant_address: 'f410ftgmzttyqi3ti4nxbvixa4byql3o5d4eo3jtc43i',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
@@ -41,6 +43,7 @@ describe('preprocess', () => {
assert.deepStrictEqual(round.measurements, [
new Measurement({
participant_address: '0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E',
station_id: VALID_STATION_ID,
spark_version: '1.2.3',
inet_group: 'ig1',
finished_at: '2023-11-01T09:00:00.000Z',
@@ -97,6 +100,7 @@ describe('getRetrievalResult', () => {
spark_version: '1.5.2',
zinnia_version: '0.14.0',
participant_address: 'f410fgkhpcrbmdvic52o3nivftrjxr7nzw47updmuzra',
station_id: VALID_STATION_ID,
finished_at: '2023-11-01T09:42:03.246Z',
timeout: false,
start_at: '2023-11-01T09:40:03.393Z',

0 comments on commit 05da282

Please sign in to comment.