
testing dagpb content cache
fforbeck committed Jan 27, 2025
1 parent 09f749d commit 03a7f78
Showing 5 changed files with 141 additions and 27 deletions.
20 changes: 15 additions & 5 deletions src/bindings.d.ts
@@ -1,16 +1,26 @@
import { CID } from '@web3-storage/gateway-lib/handlers'
import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts'
+import { Environment as CarParkFetchEnvironment } from './middleware/withCarParkFetch.types.ts'
import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts'
-import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'
+import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withContentClaimsDagula.types.ts'
import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
+import { Environment as EgressClientEnvironment } from './middleware/withEgressClient.types.ts'
+import { Environment as GatewayIdentityEnvironment } from './middleware/withGatewayIdentity.types.ts'
+import { Environment as DelegationsStorageEnvironment } from './middleware/withDelegationsStorage.types.ts'
+import { Environment as LocatorEnvironment } from './middleware/withLocator.types.ts'
import { UnknownLink } from 'multiformats'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment
-extends CarBlockEnvironment,
-RateLimiterEnvironment,
-ContentClaimsDagulaEnvironment,
-EgressTrackerEnvironment {
+extends RateLimiterEnvironment,
+CarBlockEnvironment,
+CarParkFetchEnvironment,
+ContentClaimsDagulaEnvironment,
+EgressClientEnvironment,
+EgressTrackerEnvironment,
+GatewayIdentityEnvironment,
+DelegationsStorageEnvironment,
+LocatorEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
HONEYCOMB_API_KEY: string
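Context for this hunk: each middleware ships its own `Environment` type describing only the bindings it reads, and the worker-level `Environment` in bindings.d.ts extends them all, which is why wiring in new middleware widens the extends list. A reduced sketch of the pattern, with illustrative member names (only `DAGPB_CONTENT_CACHE` and its flag var are taken from this commit; the real slices live in the middleware `*.types.ts` files):

```ts
// Minimal stand-in for the KV namespace type from @cloudflare/workers-types.
type KVNamespaceLike = {
  get (key: string, type: 'arrayBuffer'): Promise<ArrayBuffer | null>
  put (key: string, value: Uint8Array, opts?: { expirationTtl?: number }): Promise<void>
}

interface RateLimiterEnvironment { FF_RATE_LIMITER_ENABLED?: string }
interface ContentClaimsDagulaEnvironment {
  DAGPB_CONTENT_CACHE: KVNamespaceLike
  FF_DAGPB_CONTENT_CACHE_ENABLED?: string
}

// Each middleware declares only what it needs; the worker-level Environment
// is the intersection of every slice plus top-level vars like VERSION.
export interface Environment extends RateLimiterEnvironment, ContentClaimsDagulaEnvironment {
  VERSION: string
}
```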
33 changes: 18 additions & 15 deletions src/middleware/withContentClaimsDagula.js
@@ -1,5 +1,5 @@
import { Dagula } from 'dagula'
-import { base58 } from 'multiformats/bases/base58'
+import { base58btc } from 'multiformats/bases/base58'
import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
import * as dagPb from '@ipld/dag-pb'

@@ -28,24 +28,24 @@ import * as dagPb from '@ipld/dag-pb'
* >
* )}
*/
-export function withContentClaimsDagula(handler) {
+export function withContentClaimsDagula (handler) {
return async (request, env, ctx) => {
const { locator } = ctx
const fetcher = BatchingFetcher.create(locator, ctx.fetch)
const dagula = new Dagula({
-async get(cid) {
+async get (cid) {
const dagPbContent = await getDagPbContent(env, fetcher, cid, ctx)
if (dagPbContent) {
return dagPbContent
}
const res = await fetcher.fetch(cid.multihash)
return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
},
-async stream(cid, options) {
+async stream (cid, options) {
const res = await fetcher.fetch(cid.multihash, options)
return res.ok ? res.ok.stream() : undefined
},
-async stat(cid) {
+async stat (cid) {
const res = await locator.locate(cid.multihash)
return res.ok ? { size: res.ok.site[0].range.length } : undefined
}
@@ -64,7 +64,7 @@ export function withContentClaimsDagula(handler) {
* @param {import('@web3-storage/gateway-lib').Context} ctx
* @returns {Promise<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array } | undefined>}
*/
-async function getDagPbContent(env, fetcher, cid, ctx) {
+async function getDagPbContent (env, fetcher, cid, ctx) {
if (env.FF_DAGPB_CONTENT_CACHE_ENABLED === 'true' && cid.code === dagPb.code) {
const cachedBytes = await getCachedDagPbBytes(env, cid)
if (cachedBytes) {
@@ -95,11 +95,13 @@ async function getDagPbContent(env, fetcher, cid, ctx) {
* @param {Uint8Array} bytes
* @returns {Promise<void>}
*/
-async function cacheDagPbBytes(env, cid, bytes) {
-if (env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB && bytes.length <= env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB * 1024 * 1024) {
+async function cacheDagPbBytes (env, cid, bytes) {
+const maxSize = env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB ? parseInt(env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB) * 1024 * 1024 : undefined
+if (maxSize && bytes.length <= maxSize) {
try {
-const ttlSeconds = env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS ?? 0
-await env.DAGPB_CONTENT_CACHE.put(getDagPbKey(cid), bytes.buffer, {
+const ttlSeconds = env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS ? parseInt(env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS) : 0
+const key = getDagPbKey(cid)
+await env.DAGPB_CONTENT_CACHE.put(key, bytes, {
expirationTtl: ttlSeconds > 60 ? ttlSeconds : undefined
})
} catch (/** @type {any} */ error) {
@@ -115,20 +117,21 @@ async function cacheDagPbBytes(env, cid, bytes) {
* @param {import('multiformats').UnknownLink} cid
* @returns {Promise<Uint8Array | null>}
*/
-async function getCachedDagPbBytes(env, cid) {
-const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(getDagPbKey(cid), 'arrayBuffer')
+async function getCachedDagPbBytes (env, cid) {
+const key = getDagPbKey(cid)
+const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(key, 'arrayBuffer')
if (dagPbBytes) {
return new Uint8Array(dagPbBytes)
}
return null
}

/**
-* Returns the base58 encoded key for the DAG Protobuf content in the KV store.
+* Returns the base58btc encoded key for the DAG Protobuf content in the KV store.
*
* @param {import('multiformats').UnknownLink} cid
* @returns {string}
*/
-function getDagPbKey(cid) {
-return base58.encode(cid.multihash.bytes)
+function getDagPbKey (cid) {
+return base58btc.encode(cid.multihash.bytes)
}
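Taken together, this file now implements a read-through KV cache for dag-pb blocks: look the block up under its base58btc-encoded multihash, fall back to the batching fetcher on a miss, and write the bytes back only when the feature flag is on, the codec is dag-pb, and the block fits under the size cap. The sketch below restates that flow in isolation; `kv` and `fetchBlock` are stand-ins for the real `env.DAGPB_CONTENT_CACHE` binding and `BatchingFetcher`, and `getBlockWithCache` is not a function in this repo:

```js
import { base58btc } from 'multiformats/bases/base58'
import * as dagPb from '@ipld/dag-pb'

/**
 * Read-through cache sketch. `kv` mimics a Workers KV namespace (get/put);
 * `fetchBlock` resolves a CID to bytes, like the batching fetcher does.
 */
async function getBlockWithCache (kv, fetchBlock, cid, { maxSizeMb = 2, ttlSeconds = 300 } = {}) {
  // Keys are the base58btc multihash, so lookups ignore the codec; the
  // cid.code checks below are what scope the cache to dag-pb blocks.
  const key = base58btc.encode(cid.multihash.bytes)

  if (cid.code === dagPb.code) {
    const hit = await kv.get(key, 'arrayBuffer')
    if (hit) return { cid, bytes: new Uint8Array(hit) }
  }

  const bytes = await fetchBlock(cid)
  if (!bytes) return undefined

  if (cid.code === dagPb.code && bytes.length <= maxSizeMb * 1024 * 1024) {
    // Mirror the middleware: KV rejects TTLs under 60 seconds, so only pass
    // expirationTtl when the configured TTL exceeds that floor.
    await kv.put(key, bytes, { expirationTtl: ttlSeconds > 60 ? ttlSeconds : undefined })
  }
  return { cid, bytes }
}
```

Keying on the multihash rather than the full CID means the same bytes are shared across CID versions; it also means the `cid.code === dagPb.code` guard is the only thing keeping raw blocks out of the namespace.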
4 changes: 2 additions & 2 deletions src/middleware/withContentClaimsDagula.types.ts
@@ -11,12 +11,12 @@ export interface Environment extends MiddlewareEnvironment {
* The number that represents when to expire the key-value pair in seconds from now.
* The minimum value is 60 seconds. Any value less than 60 seconds will not be used.
*/
-FF_DAGPB_CONTENT_CACHE_TTL_SECONDS?: number
+FF_DAGPB_CONTENT_CACHE_TTL_SECONDS?: string
/**
* The maximum size of the key-value pair in MB.
* The minimum value is 1 MB. Any value less than 1MB will not be used.
*/
-FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB?: number
+FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB?: string
/**
* The flag that enables the DAGPB content cache.
*/
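The `number` → `string` changes line up with how the middleware now consumes these vars: both are run through `parseInt` before use, treating the bindings as plain strings. If more numeric flags accumulate, a small guard could enforce the floors from the doc comments in one place; this is a sketch, and `readNumericVar` is not an existing helper in this repo:

```js
/**
 * Parse a numeric feature-flag var, enforcing a minimum.
 * Returns undefined when the var is unset, unparsable, or below the minimum.
 */
function readNumericVar (value, min) {
  if (value === undefined) return undefined
  const n = parseInt(value, 10)
  return Number.isNaN(n) || n < min ? undefined : n
}

// Example values; per the doc comments the TTL floor is 60 seconds
// and the size floor is 1 MB.
const env = { FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: '300', FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB: '2' }
const ttlSeconds = readNumericVar(env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS, 60)
const maxSizeMb = readNumericVar(env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB, 1)
```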
108 changes: 103 additions & 5 deletions test/miniflare/freeway.spec.js
@@ -1,7 +1,7 @@
import { describe, before, beforeEach, after, it } from 'node:test'
import assert from 'node:assert'
import { randomBytes } from 'node:crypto'
-import { Miniflare } from 'miniflare'
+import { Log, LogLevel, Miniflare } from 'miniflare'
import * as Link from 'multiformats/link'
import { sha256 } from 'multiformats/hashes/sha2'
import * as raw from 'multiformats/codecs/raw'
@@ -49,17 +49,26 @@ describe('freeway', () => {
const { port } = server.address()
url = new URL(`http://127.0.0.1:${port}`)
miniflare = new Miniflare({
+host: '127.0.0.1',
+port: 8787,
+inspectorPort: 9898,
+log: new Log(LogLevel.INFO),
+cache: false, // Disable Worker Global Cache to test cache middlewares
bindings: {
CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString(),
CARPARK_PUBLIC_BUCKET_URL: url.toString(),
-GATEWAY_SERVICE_DID: 'did:example:gateway'
+GATEWAY_SERVICE_DID: 'did:example:gateway',
+DAGPB_CONTENT_CACHE: 'DAGPB_CONTENT_CACHE',
+FF_DAGPB_CONTENT_CACHE_ENABLED: 'true',
+FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: 300,
+FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB: 2
},
-inspectorPort: 9898,
scriptPath: 'dist/worker.mjs',
modules: true,
compatibilityFlags: ['nodejs_compat'],
compatibilityDate: '2024-09-23',
-r2Buckets: ['CARPARK']
+r2Buckets: ['CARPARK'],
+kvNamespaces: ['DAGPB_CONTENT_CACHE']
})

bucket = await miniflare.getR2Bucket('CARPARK')
@@ -70,10 +79,15 @@
builder = new Builder(bucket)
})

-beforeEach(() => {
+beforeEach(async () => {
claimsService.resetCallCount()
claimsService.resetClaims()
bucketService.resetCallCount()
+const dagpbCache = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
+const keys = await dagpbCache.list()
+for (const key of keys.keys) {
+await dagpbCache.delete(key.name)
+}
})

after(() => {
@@ -481,4 +495,88 @@
}
}))
})

+it('should be faster to get a file in a directory when the protobuf directory structure is cached', async () => {
+// Generate 3 files wrapped in a folder, >2MB each to force a unixfs file header block (dag protobuf)
+const input = [
+new File([randomBytes(2_050_550)], 'data.txt'),
+new File([randomBytes(2_050_550)], 'image.png'),
+new File([randomBytes(2_050_550)], 'image2.png')
+]
+// Adding to the builder will generate the unixfs file header block
+const { root, shards } = await builder.add(input)
+assert.equal(root.code, 112, 'Root should be a protobuf directory code 112')
+
+// Generate claims for the shards
+for (const shard of shards) {
+const location = new URL(toBlobKey(shard.multihash), url)
+const res = await fetch(location)
+assert(res.body)
+const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
+claimsService.addClaims(claims)
+}
+
+// Check that the cache is empty
+const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
+const cachedContent1 = await dagpb.list()
+assert.equal(cachedContent1.keys.length, 0, 'Cache should be empty')
+
+// First request adds the file to the cache, so it takes longer
+const start = performance.now()
+const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, {
+headers: {
+'Cache-Control': 'no-cache'
+}
+})
+if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)
+const end = performance.now()
+assertBlobEqual(input[2], await res.blob())
+
+const cachedContent2 = await dagpb.list()
+assert(cachedContent2.keys.length > 0, 'Cache should have one or more keys')
+
+// Second request retrieves the file from the cache, so it should take less time than the first request
+const start2 = performance.now()
+console.log('SECOND REQUEST')
+const res2 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, {
+headers: {
+'Cache-Control': 'no-cache'
+}
+})
+if (!res2.ok) assert.fail(`unexpected response: ${await res2.text()}`)
+const end2 = performance.now()
+assertBlobEqual(input[2], await res2.blob())
+assert(end2 - start2 < end - start, 'Second request should take less time than the first request')
+})

+it('should not cache content if it is not dag protobuf content', async () => {
+// Generate 1 small file and do not wrap it in a folder
+const input = new File([randomBytes(256)], 'data.txt')
+const { root, shards } = await builder.add(input)
+assert.equal(root.code, 85, 'Root should be a raw file code 85')
+
+// Generate claims for the shards
+for (const shard of shards) {
+const location = new URL(toBlobKey(shard.multihash), url)
+const res = await fetch(location)
+assert(res.body)
+const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
+claimsService.addClaims(claims)
+}
+
+// Check that the cache is empty
+const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
+const cachedContent = await dagpb.list()
+assert.equal(cachedContent.keys.length, 0, 'Cache should be empty')
+
+// It should not add the file to the cache, because it is not dag protobuf content
+const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}`, {
+headers: {
+'Cache-Control': 'no-cache'
+}
+})
+if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)
+assertBlobEqual(input, await res.blob())
+const cachedContentAfter = await dagpb.list()
+assert.equal(cachedContentAfter.keys.length, 0, 'Cache should still be empty')
+})
})
3 changes: 3 additions & 0 deletions wrangler.toml
@@ -117,6 +117,9 @@
DEBUG = "true"
FF_RATE_LIMITER_ENABLED = "false"
FF_EGRESS_TRACKER_ENABLED = "false"
+FF_DAGPB_CONTENT_CACHE_ENABLED = "true"
+FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = 2
+FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = 300
FF_TELEMETRY_ENABLED = "true"
TELEMETRY_RATIO = 1.0
FF_RAMP_UP_PROBABILITY = "100"
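One thing the `[vars]` entries above don't cover is the namespace itself: for `env.DAGPB_CONTENT_CACHE` to resolve outside of miniflare, wrangler.toml also needs a `kv_namespaces` binding. The snippet below is a sketch, not this project's actual config: the `id` is a placeholder, and the numeric vars are quoted to match the `string` types declared in withContentClaimsDagula.types.ts:

```toml
# Sketch: KV binding plus the cache feature-flag vars.
kv_namespaces = [
  { binding = "DAGPB_CONTENT_CACHE", id = "<kv-namespace-id>" }
]

[vars]
FF_DAGPB_CONTENT_CACHE_ENABLED = "true"
FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = "2"    # MB; larger blocks are not cached
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = "300"  # seconds; applied only when greater than 60
```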
