Skip to content

Commit

Permalink
Add persistent round buffer. Closes #3
Browse files Browse the repository at this point in the history
  • Loading branch information
juliangruber committed Aug 31, 2024
1 parent 43d89a4 commit e52f9e2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
4 changes: 4 additions & 0 deletions fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ primary_region = "cdg"

[deploy]
strategy = "rolling"

[mount]
source = "spark-evaluate-data"
destination = "/var/lib/spark-evaluate/"
34 changes: 33 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import { evaluate } from './lib/evaluate.js'
import { RoundData } from './lib/round.js'
import { refreshDatabase } from './lib/platform-stats.js'
import timers from 'node:timers/promises'
import fs from 'node:fs/promises'

// Tweak this value to improve the chances of the data being available
const PREPROCESS_DELAY = 60_000

const EVALUATE_DELAY = PREPROCESS_DELAY + 60_000

const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson'

export const startEvaluate = async ({
ieContract,
ieContractWithSigner,
Expand All @@ -30,6 +33,25 @@ export const startEvaluate = async ({
const roundsSeen = []
let lastNewEventSeenAt = null

let roundBuffer
try {
roundBuffer = await fs.readFile(ROUND_BUFFER_PATH, 'utf8')
} catch (err) {
if (err.code !== 'ENOENT') {
console.error('CANNOT READ ROUND BUFFER:', err)
Sentry.captureException(err)
}
}
if (roundBuffer) {
const lines = roundBuffer.split('\n').filter(Boolean)
if (lines.length > 1) {
rounds.current = new RoundData(JSON.parse(lines[0]))
for (const line of lines.slice(1)) {
rounds.current.measurements.push(JSON.parse(line))
}
}
}

const onMeasurementsAdded = async (cid, _roundIndex) => {
const roundIndex = BigInt(_roundIndex)
if (cidsSeen.includes(cid)) return
Expand Down Expand Up @@ -67,7 +89,8 @@ export const startEvaluate = async ({
roundIndex,
fetchMeasurements,
recordTelemetry,
logger
logger,
ROUND_BUFFER_PATH
})
} catch (err) {
console.error('CANNOT PREPROCESS MEASUREMENTS [ROUND=%s]:', roundIndex, err)
Expand All @@ -89,6 +112,15 @@ export const startEvaluate = async ({

console.log('Event: RoundStart', { roundIndex })

try {
await fs.writeFile(ROUND_BUFFER_PATH, '')
} catch (err) {
if (err.code !== 'ENOENT') {
console.error('CANNOT DELETE ROUND BUFFER:', err)
Sentry.captureException(err)
}
}

if (!rounds.current) {
console.error('No current round data available, skipping evaluation')
return
Expand Down
8 changes: 7 additions & 1 deletion lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { validateBlock } from '@web3-storage/car-block-validator'
import { recursive as exporter } from 'ipfs-unixfs-exporter'
import createDebug from 'debug'
import pRetry from 'p-retry'
import fs from 'node:fs/promises'

const debug = createDebug('spark:preprocess')

Expand Down Expand Up @@ -55,7 +56,8 @@ export const preprocess = async ({
fetchMeasurements,
recordTelemetry,
logger,
fetchRetries = 14
fetchRetries = 14,
ROUND_BUFFER_PATH
}) => {
const start = Date.now()
/** @type import('./typings.js').RawMeasurement[] */
Expand Down Expand Up @@ -116,6 +118,10 @@ export const preprocess = async ({
logger.log('Retrieval Success Rate: %s%s (%s of %s)', Math.round(100 * okCount / total), '%', okCount, total)

round.measurements.push(...validMeasurements)
await fs.appendFile(
ROUND_BUFFER_PATH,
validMeasurements.map(m => JSON.stringify(m)).join('\n') + '\n'
)

recordTelemetry('preprocess', point => {
point.intField('round_index', roundIndex)
Expand Down

0 comments on commit e52f9e2

Please sign in to comment.