Skip to content

Commit

Permalink
feat: followup fixes for #1945 after testing nft-ttr cron (#2000)
Browse files Browse the repository at this point in the history
* fix nft-ttr logging and multiple gateways support

* remove byteLength label from RetrievalDurationMetric

* yarn

* yarn

* cron.yml action does npm rebuild sharp before test

* Revert "cron.yml action does npm rebuild sharp before test"

This reverts commit 3d00e0d.

* cron ava disble workerThreads because sharp doesn't play nice with them https://sharp.pixelplumbing.com/install#worker-threads

* cron workflow runs on pull_request to all branches, not just main

* cron: add linearBuckets to retrieval_duration_seconds histogram

* measureNftTimeToRetrievability.js starts retrievals in parallel after store so both start right after upload. previous serial method would lead to too-low metrics for the second gateway
  • Loading branch information
gobengo authored Jun 22, 2022
1 parent bbbe70d commit 0765548
Show file tree
Hide file tree
Showing 10 changed files with 5,192 additions and 5,584 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cron-nft-ttr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ jobs:
PUSHGATEWAY_URL: https://pushgateway.k8s.locotorp.info
PUSHGATEWAY_BASIC_AUTH: ${{ secrets.PUSHGATEWAY_BASIC_AUTH }}
run: |
yarn workspace cron run start:nft-ttr measure --logConfigAndExit
yarn workspace cron run start:nft-ttr measure \
--minImageSizeBytes=10000000 \
--gateways https://nftstorage.link https://dweb.link \
--gateway https://nftstorage.link \
--gateway https://dweb.link \
--metricsPushGateway $PUSHGATEWAY_URL \
--metricsPushGatewayJobName $PUSHGATEWAY_JOBNAME \
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ on:
- '.github/workflows/cron-nft-ttr.yml'
- 'yarn.lock'
pull_request:
branches:
- main
paths:
- 'packages/cron/**'
- '.github/workflows/cron.yml'
Expand Down
3 changes: 3 additions & 0 deletions packages/cron/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
"eslint-plugin-prettier": "^4.0.0",
"npm-run-all": "^4.1.5"
},
"ava": {
"workerThreads": false
},
"eslintConfig": {
"plugins": [
"@typescript-eslint"
Expand Down
29 changes: 18 additions & 11 deletions packages/cron/src/bin/nft-ttr.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ export function createMeasureSecretsFromEnv(env) {
export function createMeasureOptionsFromSade(sadeOptions, secrets) {
// build gateways
assert.ok(
hasOwnProperty(sadeOptions, 'gateways'),
'expected sadeOptions to have gateways'
hasOwnProperty(sadeOptions, 'gateway'),
'expected sadeOptions to have gateway'
)
const gatewaysArgv = sadeOptions.gateways
assert.ok(Array.isArray(gatewaysArgv) || typeof gatewaysArgv === 'string')
const gatewaysArgvsArray = Array.isArray(gatewaysArgv)
? gatewaysArgv.map(String)
: [...gatewaysArgv.split(' ')]
const gatewayArgv = sadeOptions.gateway
assert.ok(Array.isArray(gatewayArgv) || typeof gatewayArgv === 'string')
const gatewaysArgvsArray = Array.isArray(gatewayArgv)
? gatewayArgv.map(String)
: [...gatewayArgv.split(' ')]
const gateways = gatewaysArgvsArray.map((g) => {
try {
return new URL(g)
Expand Down Expand Up @@ -133,11 +133,17 @@ export function createMeasureOptionsFromSade(sadeOptions, secrets) {
secrets.metricsPushGatewayAuthorization
)

const logConfigAndExit = Boolean(
hasOwnProperty(sadeOptions, 'logConfigAndExit') &&
sadeOptions.logConfigAndExit
)

// build final options
const options = {
...defaultMeasureOptions(),
gateways,
images: createTestImages(1, minImageSizeBytes),
logConfigAndExit,
metricsPushGateway,
metricsPushGatewayJobName,
minImageSizeBytes,
Expand Down Expand Up @@ -197,8 +203,8 @@ export async function* cli(
false
)
.option(
'--gateways',
'IPFS gateway(s) to use to measure time to retrieval of the upload',
'--gateway',
'IPFS gateway to use to measure time to retrieval of the upload',
'https://nftstorage.link'
)
.option(
Expand All @@ -210,7 +216,7 @@ export async function* cli(
iterable = measureNftTimeToRetrievability({
secrets,
...createMeasureOptionsFromSade(opts, secrets),
...options,
console: options.console,
})
})
argParser.parse(argv)
Expand All @@ -235,7 +241,8 @@ function parseBasicAuth(basicAuthEnvVarString) {
*/
async function main(argv) {
// eslint-disable-next-line no-empty,@typescript-eslint/no-unused-vars,no-unused-vars
for await (const _ of cli(argv)) {
for await (const _ of cli(argv, { console, env: process.env })) {
console.debug(_)
}
}

Expand Down
78 changes: 18 additions & 60 deletions packages/cron/src/bin/nft-ttr.spec.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
import { test } from '../lib/testing.js'
import {
cli as binNftTtr,
createMeasureOptionsFromSade,
createMeasureSecretsFromEnv,
} from './nft-ttr.js'
import {
createStubbedImageFetcher,
createStubStoreFunction,
} from '../jobs/measureNftTimeToRetrievability.js'
import all from 'it-all'
import { Writable } from 'node:stream'

const defaultTestMinImageSizeBytes = 10 * 1e6

test('createMeasureOptionsFromSade', (t) => {
const sampleGateways = ['https://nftstorage.link/', 'https://dweb.link/']
const sampleSade = {
_: [],
minImageSizeBytes: 1,
metricsPushGateway:
'https://pushgateway.k8s.locotorp.info/metrics/job/nftstorage_ci/instance/github_action',
url: 'https://nft.storage',
metricsPushGatewayJobName: 'nft-ttr',
gateways: 'https://nftstorage.link/',
gateway: sampleGateways,
logConfigAndExit: true,
}
const options = createMeasureOptionsFromSade(sampleSade, {
const secrets = {
metricsPushGatewayAuthorization: '',
})
}
const options = createMeasureOptionsFromSade(sampleSade, secrets)
t.assert(options)
t.is(options.gateways.length, 1)
t.is(options.gateways[0].toString(), sampleSade.gateways)
t.is(options.gateways.length, 2)
t.is(options.gateways[0].toString(), sampleGateways[0])
t.is(options.gateways[1].toString(), sampleGateways[1])
t.is(options.metricsPushGateway?.toString(), sampleSade.metricsPushGateway)
t.is(options.metricsPushGatewayJobName, sampleSade.metricsPushGatewayJobName)
t.is(options.logConfigAndExit, true)

// ensure single gateway is parsed correctly
const options2 = createMeasureOptionsFromSade(
{ ...sampleSade, gateway: sampleGateways[0] },
secrets
)
t.is(options2.gateways.length, 1)
t.is(options2.gateways[0].toString(), sampleGateways[0])
})

test('createMeasureSecretsFromEnv', (t) => {
Expand All @@ -52,49 +56,3 @@ test('createMeasureSecretsFromEnv', (t) => {
'Basic dXNlcjpwYXNz'
)
})

test(`bin/nft-ttr works with --minImageSizeBytes=${defaultTestMinImageSizeBytes} and multiple gateways`, async (t) => {
const minImageSizeBytes = defaultTestMinImageSizeBytes
const gateways = ['https://nftstorage.link', 'https://dweb.link']
const command = [
'fakeNodePath',
'fakeScriptPath',
'measure',
`--minImageSizeBytes=${minImageSizeBytes}`,
`--gateways=${gateways.join(' ')}`,
]
const activities = await all(
binNftTtr(command, {
console: new console.Console(new Writable(), new Writable()),
env: {
NFT_STORAGE_API_KEY: '',
},
store: createStubStoreFunction(),
fetchImage: createStubbedImageFetcher(minImageSizeBytes),
})
)
let retrieveCount = 0
const gatewaysNeedingRetrieval = new Set(gateways)
for (const activity of activities) {
if (activity.type !== 'retrieve') {
continue
}
const retrieve = activity
retrieveCount++
t.assert(retrieve)
t.is(
typeof retrieve.duration.size,
'number',
'expected retrieve duration size to be a number'
)
t.assert(retrieve.contentLength > minImageSizeBytes)
for (const gateway of gatewaysNeedingRetrieval) {
if (retrieve.url.toString().startsWith(gateway)) {
gatewaysNeedingRetrieval.delete(gateway)
break
}
}
}
t.is(gatewaysNeedingRetrieval.size, 0)
t.is(retrieveCount, gateways.length)
})
92 changes: 58 additions & 34 deletions packages/cron/src/jobs/measureNftTimeToRetrievability.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { NFTStorage } from 'nft.storage'
import { Milliseconds, now } from '../lib/time.js'
import { Pushgateway } from 'prom-client'
import { createRandomImage, createRandomImageBlob } from '../lib/random.js'
import { pipeline, parallelMerge, flatten } from 'streaming-iterables'

export const EXAMPLE_NFT_IMG_URL = new URL(
'https://bafybeiarmhq3d7msony7zfq67gmn46syuv6jrc6dagob2wflunxiyaksj4.ipfs.dweb.link/1681.png'
Expand All @@ -11,6 +12,7 @@ export const EXAMPLE_NFT_IMG_URL = new URL(
* @typedef RetrieveLog
* @property {string} image
* @property {"retrieve"} type
* @property {URL} gateway
* @property {URL} url
* @property {number} contentLength
* @property {Date} startTime
Expand Down Expand Up @@ -67,7 +69,7 @@ export function createStubStoreFunction() {
* @property {AsyncIterable<Blob>} images - images to upload/retrieve
* @property {StoreFunction} [store] - function to store nft
* @property {string} [url] - URL to nft.storage to measure
* @property {boolean} [logConfigAndExit] - if true, log config and exit
* @property {boolean} logConfigAndExit - if true, log config and exit
* @property {URL} [metricsPushGateway] - Server to send metrics to. should reference a https://github.com/prometheus/pushgateway
* @property {URL[]} gateways - IPFS Gateway to test retrieval from
* @property {Console} console - logger
Expand Down Expand Up @@ -101,15 +103,18 @@ function readMeasureTtrOptions(options) {
*/

/**
* @typedef StoreLog
* @property {"store"} type
* @typedef StoreStartLog
* @property {"store/start"} type
* @property {string} image
* @property {Date} startTime
* @property {Milliseconds} duration
*/

/**
* @typedef {StoreLog|Activity<"start"|"finish">|RetrieveLog} MeasureTtrLog
* @typedef StoreLog
* @property {"store"} type
* @property {string} image
* @property {Date} startTime
* @property {Milliseconds} duration
*/

/**
Expand All @@ -119,7 +124,10 @@ function readMeasureTtrOptions(options) {
* * retrieve image through ipfs gateway
* @param {MeasureTtrOptions} options
* @returns {AsyncIterable<
* StoreLog|Activity<"start"|"finish">|RetrieveLog
* | StoreStartLog
* | StoreLog
* | Activity<"start"|"finish">
* | RetrieveLog
* >}
*/
export async function* measureNftTimeToRetrievability(options) {
Expand All @@ -133,7 +141,6 @@ export async function* measureNftTimeToRetrievability(options) {
const start = {
type: 'start',
}
config.console.debug(start)
yield start
for await (const image of config.images) {
const imageId = Number(new Date()).toString()
Expand All @@ -149,12 +156,13 @@ export async function* measureNftTimeToRetrievability(options) {
const storeStartedAt = now()
/** @type {StoreFunction} */
const store = config.store || ((nft) => client.store(nft))
const storeBeforeLog = {
type: 'store/before',
/** @type {StoreStartLog} */
const storeStartLog = {
type: 'store/start',
image: imageId,
startTime: new Date(),
}
config.console.debug(storeBeforeLog)
yield storeStartLog
const metadata = await store(nft)
const storeEndAt = now()
/** @type {StoreLog} */
Expand All @@ -165,22 +173,30 @@ export async function* measureNftTimeToRetrievability(options) {
duration: Milliseconds.subtract(storeEndAt, storeStartedAt),
}
yield storeLog
config.console.debug(storeLog)
for (const gateway of config.gateways) {
/** @type {RetrieveLog} */
let retrieval
try {
retrieval = await retrieve(options, {
id: imageId,
url: createGatewayRetrievalUrl(gateway, metadata.ipnft),
})
} catch (error) {
console.error('error retrieving', error)
throw error
}
yield retrieval
await options.pushRetrieveMetrics(options, retrieval)
}
yield* pipeline(
() =>
parallelMerge(
config.gateways.map(async function* (gateway) {
/** @type {RetrieveLog} */
let retrieval
try {
retrieval = await retrieve(
{ ...options, gateway },
{
id: imageId,
url: createGatewayRetrievalUrl(gateway, metadata.ipnft),
}
)
} catch (error) {
console.error('error retrieving', error)
throw error
}
yield retrieval
await options.pushRetrieveMetrics(options, retrieval)
})
),
flatten
)
}
/** @type {Activity<"finish">} */
yield { type: 'finish' }
Expand Down Expand Up @@ -213,9 +229,9 @@ export const httpImageFetcher = (fetch) => async (url) => {
}

/**
* @typedef {object }RetrieveImageOptions
* @typedef {object} RetrieveImageOptions
* @property {URL} gateway
* @property {ImageFetcher} fetchImage
* @property {RetrievalMetricsLogger} pushRetrieveMetrics
* @property {Console} console
*/

Expand All @@ -237,6 +253,7 @@ async function retrieve(options, image) {
/** @type {RetrieveLog} */
const retrieveLog = {
type: 'retrieve',
gateway: options.gateway,
url: image.url,
image: image.id,
/** length in bytes */
Expand Down Expand Up @@ -270,8 +287,7 @@ function createGatewayRetrievalUrl(gatewayUrl, ipnftCid) {
*/
export function createStubbedRetrievalMetricsLogger() {
/** @type {RetrievalMetricsLogger} */
const push = async (options, retrieval) => {
options.console.debug({ type: 'stubbedRetrievalMetricsLogger', retrieval })
const push = async () => {
return Promise.resolve()
}
return push
Expand Down Expand Up @@ -303,14 +319,22 @@ export function createPromClientRetrievalMetricsLogger(
)
/** @type {RetrievalMetricsLogger} */
const push = async (options, retrieval) => {
metric.observe(retrieval.duration, {
byteLength: retrieval.contentLength,
})
const value = retrieval.duration
metric.observe(value, {})
const pushAddArgs = {
jobName: metricsPushGatewayJobName,
}
await pushgateway.pushAdd(pushAddArgs)
options.console.debug({ type: 'pushgateway.pushAdd', args: pushAddArgs })
options.console.debug({
type: 'metricPushed',
metric: {
name: metric.name,
},
observation: {
value,
},
pushgateway: pushAddArgs,
})
}
return push
}
Expand Down
Loading

0 comments on commit 0765548

Please sign in to comment.