diff --git a/fly.toml b/fly.toml index 1988b0ea..c941f1d2 100644 --- a/fly.toml +++ b/fly.toml @@ -6,3 +6,7 @@ primary_region = "cdg" [deploy] strategy = "rolling" + +[mount] + source = "spark-evaluate-data" + destination = "/var/lib/spark-evaluate/" \ No newline at end of file diff --git a/index.js b/index.js index 1b7e90f1..7da70c3b 100644 --- a/index.js +++ b/index.js @@ -5,12 +5,15 @@ import { evaluate } from './lib/evaluate.js' import { RoundData } from './lib/round.js' import { refreshDatabase } from './lib/platform-stats.js' import timers from 'node:timers/promises' +import fs from 'node:fs/promises' // Tweak this value to improve the chances of the data being available const PREPROCESS_DELAY = 60_000 const EVALUATE_DELAY = PREPROCESS_DELAY + 60_000 +const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson' + export const startEvaluate = async ({ ieContract, ieContractWithSigner, @@ -30,6 +33,25 @@ export const startEvaluate = async ({ const roundsSeen = [] let lastNewEventSeenAt = null + let roundBuffer + try { + roundBuffer = await fs.readFile(ROUND_BUFFER_PATH, 'utf8') + } catch (err) { + if (err.code !== 'ENOENT') { + console.error('CANNOT READ ROUND BUFFER:', err) + Sentry.captureException(err) + } + } + if (roundBuffer) { + const lines = roundBuffer.split('\n').filter(Boolean) + if (lines.length > 1) { + rounds.current = new RoundData(JSON.parse(lines[0])) + for (const line of lines.slice(1)) { + rounds.current.measurements.push(JSON.parse(line)) + } + } + } + const onMeasurementsAdded = async (cid, _roundIndex) => { const roundIndex = BigInt(_roundIndex) if (cidsSeen.includes(cid)) return @@ -67,7 +89,8 @@ export const startEvaluate = async ({ roundIndex, fetchMeasurements, recordTelemetry, - logger + logger, + ROUND_BUFFER_PATH }) } catch (err) { console.error('CANNOT PREPROCESS MEASUREMENTS [ROUND=%s]:', roundIndex, err) @@ -89,6 +112,15 @@ export const startEvaluate = async ({ console.log('Event: RoundStart', { roundIndex }) + try { + await fs.writeFile(ROUND_BUFFER_PATH, '') + } catch (err) { + if (err.code !== 'ENOENT') { + console.error('CANNOT DELETE ROUND BUFFER:', err) + Sentry.captureException(err) + } + } + if (!rounds.current) { console.error('No current round data available, skipping evaluation') return diff --git a/lib/preprocess.js b/lib/preprocess.js index 051664b6..2cb7d0ee 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -6,6 +6,7 @@ import { validateBlock } from '@web3-storage/car-block-validator' import { recursive as exporter } from 'ipfs-unixfs-exporter' import createDebug from 'debug' import pRetry from 'p-retry' +import fs from 'node:fs/promises' const debug = createDebug('spark:preprocess') @@ -55,7 +56,8 @@ export const preprocess = async ({ fetchMeasurements, recordTelemetry, logger, - fetchRetries = 14 + fetchRetries = 14, + ROUND_BUFFER_PATH }) => { const start = Date.now() /** @type import('./typings.js').RawMeasurement[] */ @@ -116,6 +118,10 @@ export const preprocess = async ({ logger.log('Retrieval Success Rate: %s%s (%s of %s)', Math.round(100 * okCount / total), '%', okCount, total) round.measurements.push(...validMeasurements) + await fs.appendFile( + ROUND_BUFFER_PATH, + validMeasurements.map(m => JSON.stringify(m)).join('\n') + '\n' + ) recordTelemetry('preprocess', point => { point.intField('round_index', roundIndex)