From e52f9e28613288ad10a7bb429b4e31c8b678aa55 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Sat, 31 Aug 2024 20:33:12 +0200 Subject: [PATCH 1/4] Add persistent round buffer. Closes #3 --- fly.toml | 4 ++++ index.js | 34 +++++++++++++++++++++++++++++++++- lib/preprocess.js | 8 +++++++- 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/fly.toml b/fly.toml index 1988b0ea..c941f1d2 100644 --- a/fly.toml +++ b/fly.toml @@ -6,3 +6,7 @@ primary_region = "cdg" [deploy] strategy = "rolling" + +[mount] + source = "spark-evaluate-data" + destination = "/var/lib/spark-evaluate/" \ No newline at end of file diff --git a/index.js b/index.js index 1b7e90f1..7da70c3b 100644 --- a/index.js +++ b/index.js @@ -5,12 +5,15 @@ import { evaluate } from './lib/evaluate.js' import { RoundData } from './lib/round.js' import { refreshDatabase } from './lib/platform-stats.js' import timers from 'node:timers/promises' +import fs from 'node:fs/promises' // Tweak this value to improve the chances of the data being available const PREPROCESS_DELAY = 60_000 const EVALUATE_DELAY = PREPROCESS_DELAY + 60_000 +const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson' + export const startEvaluate = async ({ ieContract, ieContractWithSigner, @@ -30,6 +33,25 @@ export const startEvaluate = async ({ const roundsSeen = [] let lastNewEventSeenAt = null + let roundBuffer + try { + roundBuffer = await fs.readFile(ROUND_BUFFER_PATH, 'utf8') + } catch (err) { + if (err.code !== 'ENOENT') { + console.error('CANNOT READ ROUND BUFFER:', err) + Sentry.captureException(err) + } + } + if (roundBuffer) { + const lines = roundBuffer.split('\n').filter(Boolean) + if (lines.length > 1) { + rounds.current = new RoundData(JSON.parse(lines[0])) + for (const line of lines.slice(1)) { + rounds.current.measurements.push(JSON.parse(line)) + } + } + } + const onMeasurementsAdded = async (cid, _roundIndex) => { const roundIndex = BigInt(_roundIndex) if (cidsSeen.includes(cid)) return @@ -67,7 +89,8 @@ export const startEvaluate = async ({ roundIndex, fetchMeasurements, recordTelemetry, - logger + logger, + ROUND_BUFFER_PATH }) } catch (err) { console.error('CANNOT PREPROCESS MEASUREMENTS [ROUND=%s]:', roundIndex, err) @@ -89,6 +112,15 @@ export const startEvaluate = async ({ console.log('Event: RoundStart', { roundIndex }) + try { + await fs.writeFile(ROUND_BUFFER_PATH, '') + } catch (err) { + if (err.code !== 'ENOENT') { + console.error('CANNOT DELETE ROUND BUFFER:', err) + Sentry.captureException(err) + } + } + if (!rounds.current) { console.error('No current round data available, skipping evaluation') return diff --git a/lib/preprocess.js b/lib/preprocess.js index 051664b6..2cb7d0ee 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -6,6 +6,7 @@ import { validateBlock } from '@web3-storage/car-block-validator' import { recursive as exporter } from 'ipfs-unixfs-exporter' import createDebug from 'debug' import pRetry from 'p-retry' +import fs from 'node:fs/promises' const debug = createDebug('spark:preprocess') @@ -55,7 +56,8 @@ export const preprocess = async ({ fetchMeasurements, recordTelemetry, logger, - fetchRetries = 14 + fetchRetries = 14, + ROUND_BUFFER_PATH }) => { const start = Date.now() /** @type import('./typings.js').RawMeasurement[] */ @@ -116,6 +118,10 @@ export const preprocess = async ({ logger.log('Retrieval Success Rate: %s%s (%s of %s)', Math.round(100 * okCount / total), '%', okCount, total) round.measurements.push(...validMeasurements) + await fs.appendFile( + ROUND_BUFFER_PATH, + validMeasurements.map(m => JSON.stringify(m)).join('\n') + '\n' + ) recordTelemetry('preprocess', point => { point.intField('round_index', roundIndex) From 33b2fd4ecdd5e37e178cdae7885f954b194854c7 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Sat, 31 Aug 2024 20:39:53 +0200 Subject: [PATCH 2/4] refactor --- index.js | 33 ++++++++------------------------- lib/preprocess.js | 10 +++------- lib/round-buffer.js | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 32 deletions(-) create mode 100644 lib/round-buffer.js diff --git a/index.js b/index.js index 7da70c3b..c68767af 100644 --- a/index.js +++ b/index.js @@ -5,15 +5,13 @@ import { evaluate } from './lib/evaluate.js' import { RoundData } from './lib/round.js' import { refreshDatabase } from './lib/platform-stats.js' import timers from 'node:timers/promises' -import fs from 'node:fs/promises' +import { recoverRound, clearRoundBuffer } from './lib/round-buffer.js' // Tweak this value to improve the chances of the data being available const PREPROCESS_DELAY = 60_000 const EVALUATE_DELAY = PREPROCESS_DELAY + 60_000 -const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson' - export const startEvaluate = async ({ ieContract, ieContractWithSigner, @@ -33,23 +31,11 @@ export const startEvaluate = async ({ const roundsSeen = [] let lastNewEventSeenAt = null - let roundBuffer try { - roundBuffer = await fs.readFile(ROUND_BUFFER_PATH, 'utf8') + rounds.current = await recoverRound() } catch (err) { - if (err.code !== 'ENOENT') { - console.error('CANNOT READ ROUND BUFFER:', err) - Sentry.captureException(err) - } - } - if (roundBuffer) { - const lines = roundBuffer.split('\n').filter(Boolean) - if (lines.length > 1) { - rounds.current = new RoundData(JSON.parse(lines[0])) - for (const line of lines.slice(1)) { - rounds.current.measurements.push(JSON.parse(line)) - } - } + console.error('CANNOT RECOVER ROUND:', err) + Sentry.captureException(err) } const onMeasurementsAdded = async (cid, _roundIndex) => { @@ -89,8 +75,7 @@ export const startEvaluate = async ({ roundIndex, fetchMeasurements, recordTelemetry, - logger, - ROUND_BUFFER_PATH + logger }) } catch (err) { console.error('CANNOT PREPROCESS MEASUREMENTS [ROUND=%s]:', roundIndex, err) @@ -113,12 +98,10 @@ export const startEvaluate = async ({ console.log('Event: RoundStart', { roundIndex }) try { - await fs.writeFile(ROUND_BUFFER_PATH, '') + await clearRoundBuffer() } catch (err) { - if (err.code !== 'ENOENT') { - console.error('CANNOT DELETE ROUND BUFFER:', err) - Sentry.captureException(err) - } + console.error('CANNOT CLEAR ROUND BUFFER:', err) + Sentry.captureException(err) } if (!rounds.current) { diff --git a/lib/preprocess.js b/lib/preprocess.js index 2cb7d0ee..a08bd633 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -6,7 +6,7 @@ import { validateBlock } from '@web3-storage/car-block-validator' import { recursive as exporter } from 'ipfs-unixfs-exporter' import createDebug from 'debug' import pRetry from 'p-retry' -import fs from 'node:fs/promises' +import { appendToRoundBuffer } from './round-buffer.js' const debug = createDebug('spark:preprocess') @@ -56,8 +56,7 @@ export const preprocess = async ({ fetchMeasurements, recordTelemetry, logger, - fetchRetries = 14, - ROUND_BUFFER_PATH + fetchRetries = 14 }) => { const start = Date.now() /** @type import('./typings.js').RawMeasurement[] */ @@ -118,10 +117,7 @@ export const preprocess = async ({ logger.log('Retrieval Success Rate: %s%s (%s of %s)', Math.round(100 * okCount / total), '%', okCount, total) round.measurements.push(...validMeasurements) - await fs.appendFile( - ROUND_BUFFER_PATH, - validMeasurements.map(m => JSON.stringify(m)).join('\n') + '\n' - ) + await appendToRoundBuffer(validMeasurements) recordTelemetry('preprocess', point => { point.intField('round_index', roundIndex) diff --git a/lib/round-buffer.js b/lib/round-buffer.js new file mode 100644 index 00000000..83ea1c45 --- /dev/null +++ b/lib/round-buffer.js @@ -0,0 +1,42 @@ +import fs from 'node:fs/promises' +import { RoundData } from './round.js' + +const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson' + +export const recoverRound = async () => { + let roundBuffer + try { + roundBuffer = await fs.readFile(ROUND_BUFFER_PATH, 'utf8') + } catch (err) { + if (err.code !== 'ENOENT') { + throw err + } + } + if (roundBuffer) { + const lines = roundBuffer.split('\n').filter(Boolean) + if (lines.length > 1) { + const round = new RoundData(JSON.parse(lines[0])) + for (const line of lines.slice(1)) { + round.measurements.push(JSON.parse(line)) + } + return round + } + } +} + +export const appendToRoundBuffer = async validMeasurements => { + await fs.appendFile( + ROUND_BUFFER_PATH, + validMeasurements.map(m => JSON.stringify(m)).join('\n') + '\n' + ) +} + +export const clearRoundBuffer = async () => { + try { + await fs.writeFile(ROUND_BUFFER_PATH, '') + } catch (err) { + if (err.code !== 'ENOENT') { + throw err + } + } +} From cf650ec20c8e56489ffe21b4cc487539a5797efe Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Sat, 31 Aug 2024 20:40:56 +0200 Subject: [PATCH 3/4] style --- fly.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fly.toml b/fly.toml index c941f1d2..06a84f1d 100644 --- a/fly.toml +++ b/fly.toml @@ -9,4 +9,4 @@ primary_region = "cdg" [mount] source = "spark-evaluate-data" - destination = "/var/lib/spark-evaluate/" \ No newline at end of file + destination = "/var/lib/spark-evaluate/" From 0174f768f5b57eb125137bce9d8a046594e80489 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Sat, 31 Aug 2024 20:42:42 +0200 Subject: [PATCH 4/4] todo --- lib/round-buffer.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/round-buffer.js b/lib/round-buffer.js index 83ea1c45..d3eea5a7 100644 --- a/lib/round-buffer.js +++ b/lib/round-buffer.js @@ -3,6 +3,7 @@ import { RoundData } from './round.js' const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson' +// TODO: Handle when it's from the wrong round export const recoverRound = async () => { let roundBuffer try { @@ -31,6 +32,7 @@ export const appendToRoundBuffer = async validMeasurements => { ) } +// TODO: Write round index as first line export const clearRoundBuffer = async () => { try { await fs.writeFile(ROUND_BUFFER_PATH, '')