Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: helia supports additional byteProviders #276

Closed
7 changes: 5 additions & 2 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { CustomProgressEvent } from 'progress-events'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
import { BlockProvider } from './utils/block-provider.js'
import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js'
import { NetworkedStorage } from './utils/networked-storage.js'
import type { HeliaInit } from '.'
import type { ByteProvider, HeliaInit } from '.'
import type { GCOptions, Helia } from '@helia/interface'
import type { Pins } from '@helia/interface/pins'
import type { Libp2p } from '@libp2p/interface'
Expand All @@ -22,6 +23,7 @@ const log = logger('helia')
interface HeliaImplInit<T extends Libp2p = Libp2p> extends HeliaInit<T> {
libp2p: T
blockstore: Blockstore
byteProviders: ByteProvider[]
datastore: Datastore
}

Expand Down Expand Up @@ -58,7 +60,8 @@ export class HeliaImpl implements Helia {
})

const networkedStorage = new NetworkedStorage(init.blockstore, {
bitswap: this.#bitswap
bitswap: this.#bitswap,
blockProviders: init.byteProviders.map((provider) => new BlockProvider(init.blockstore, provider))
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand Down
9 changes: 9 additions & 0 deletions packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { HeliaImpl } from './helia.js'
import { getDefaultByteProviders } from './utils/byte-provider-defaults.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { ByteProvider } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
Expand Down Expand Up @@ -112,6 +114,12 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
* webworker), pass true here to hold the gc lock in this process.
*/
holdGcLock?: boolean

/**
* ByteProviders are used to fetch blocks from other sources when they are not
* present in the blockstore.
*/
byteProviders?: ByteProvider[]
}

/**
Expand All @@ -135,6 +143,7 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
...init,
datastore,
blockstore,
byteProviders: init.byteProviders ?? getDefaultByteProviders(),
libp2p
})

Expand Down
66 changes: 66 additions & 0 deletions packages/helia/src/utils/block-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { logger } from '@libp2p/logger'
import forEach from 'it-foreach'
import type { Pair, GetOfflineOptions, ByteProvider } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { AwaitIterable } from 'interface-store'
import type { CID } from 'multiformats/cid'

const log = logger('helia:block-provider')

export interface GetOptions extends AbortOptions {
progress?: (evt: Event) => void
}

/**
* BlockProvider is a partial implementation of the Blocks interface that only handles block retrieval.
*
* This takes a {@link ByteProvider} and {@link Blockstore}. When a block is requested, it will first
* check the blockstore for the block. If it is not found, it will then call the provider to get the bytes. Once the
* bytes are retrieved, they are validated as a "block" and then that block is stored in the blockstore.
*
*/
export class BlockProvider {
private readonly blockstore: Blockstore
readonly #provider: ByteProvider

/**
* Create a new BlockProvider
*/
constructor (blockstore: Blockstore, provider: ByteProvider) {
this.blockstore = blockstore
this.#provider = provider
}

/**
* Get a block by cid, using the given ByteProvider
*/
async get (cid: CID, options: GetOfflineOptions & AbortOptions): Promise<Uint8Array> {
if (options.offline !== true && !(await this.blockstore.has(cid))) {
try {
const block = await this.#provider.get(cid, options)

await this.blockstore.put(cid, block, options)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could wrap the blockstore on put that re-hashes it to verify.

we don't need to parse the data.. we just need to verify that when we hash the block with the cid.multihash.code is the same.

add method to this class for verifying bytes. might be able to use util methods in blockstore classes/package


return block
} catch (err) {
log.error('failed to get block for %s', cid.toString(), err)
}
}

return this.blockstore.get(cid, options)
}

/**
* Get multiple blocks back from an (async) iterable of cids
*/
async * getMany (cids: AwaitIterable<CID>, options: GetOfflineOptions & AbortOptions): AsyncIterable<Pair> {
yield * this.blockstore.getMany(forEach(cids, async (cid): Promise<void> => {
if (options.offline !== true && !(await this.blockstore.has(cid))) {
const block = await this.#provider.get(cid, options)

await this.blockstore.put(cid, block, options)
}
}))
}
}
12 changes: 12 additions & 0 deletions packages/helia/src/utils/byte-provider-defaults.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { getGatewayBlockProvider } from './byte-provider-gateway.js'
import type { ByteProvider } from '@helia/interface/blocks'

export function getDefaultByteProviders (): ByteProvider[] {
return [
getGatewayBlockProvider('https://dweb.link'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
getGatewayBlockProvider('https://cf-ipfs.com'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
getGatewayBlockProvider('https://4everland.io'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
getGatewayBlockProvider('https://w3s.link'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
getGatewayBlockProvider('https://cloudflare-ipfs.com') // 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
]
}
26 changes: 26 additions & 0 deletions packages/helia/src/utils/byte-provider-gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { logger } from '@libp2p/logger'
import { getRawBlockFromGateway } from './get-raw-block-from-gateway.js'
import type { ByteProvider } from '@helia/interface/blocks'
import type { CID } from 'multiformats/cid'

const log = logger('helia:gateway-block-provider')

export function getGatewayBlockProvider (url: URL | string): ByteProvider {
const byteProvider: ByteProvider = {
get: async (cid: CID, options = {}) => {
log('getting block for %s from %s', cid.toString(), url.toString())
try {
const block = await getRawBlockFromGateway(url, cid, options.signal)
log('got block for %s from %s', cid.toString(), url.toString())

return block
} catch (err) {
log.error('failed to get block for %s from %s', cid.toString(), url.toString(), err)

throw err
}
}
}

return byteProvider
}
31 changes: 31 additions & 0 deletions packages/helia/src/utils/get-raw-block-from-gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import type { CID } from 'multiformats/cid'

export async function getRawBlockFromGateway (url: string | URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = new URL(url)

gwUrl.pathname = `/ipfs/${cid.toString()}`
gwUrl.search = '?format=raw' // necessary as not every gateway supports dag-cbor, but every should support sending raw block as-is
if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
}
try {
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`)
}
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
}
throw new Error(`unable to fetch raw block for CID ${cid}`)
}
}
59 changes: 53 additions & 6 deletions packages/helia/src/utils/networked-storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import filter from 'it-filter'
import forEach from 'it-foreach'
import { CustomProgressEvent, type ProgressOptions } from 'progress-events'
import type { BlockProvider } from './block-provider.js'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
Expand All @@ -11,6 +12,7 @@ import type { CID } from 'multiformats/cid'
export interface BlockStorageInit {
holdGcLock?: boolean
bitswap?: Bitswap
blockProviders?: BlockProvider[]
}

export interface GetOptions extends AbortOptions {
Expand All @@ -24,13 +26,15 @@ export interface GetOptions extends AbortOptions {
export class NetworkedStorage implements Blocks {
private readonly child: Blockstore
private readonly bitswap?: Bitswap
readonly #blockProviders: BlockProvider[]

/**
* Create a new BlockStorage
*/
constructor (blockstore: Blockstore, options: BlockStorageInit = {}) {
this.child = blockstore
this.bitswap = options.bitswap
this.#blockProviders = options.blockProviders ?? []
}

unwrap (): Blockstore {
Expand Down Expand Up @@ -79,13 +83,57 @@ export class NetworkedStorage implements Blocks {
yield * this.child.putMany(notifyEach, options)
}

async #_get (cid: CID, options: GetOfflineOptions & AbortOptions & (ProgressOptions<GetBlockProgressEvents> | ProgressOptions<GetManyBlocksProgressEvents>)): Promise<Uint8Array> {
const blockGetPromises: Array<Promise<Uint8Array>> = []
/**
* We need to create a new AbortController that aborts when:
* 1. options.signal is aborted
* 2. any of the blockGetPromises are resolved
*/
const blockProviderController = new AbortController()
const newOptions = { ...options, signal: blockProviderController.signal }
if (options.signal != null) {
// abort the blockProvider signal when the options.signal is aborted
options.signal.addEventListener('abort', (): void => {
blockProviderController.abort()
})
}

if (this.bitswap?.isStarted() === true) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:bitswap:get', cid))
blockGetPromises.push(this.bitswap.want(cid, newOptions))
}

for (const provider of this.#blockProviders) {
// if the signal has already been aborted, don't bother requesting from other providers.
if (!blockProviderController.signal.aborted) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:byte-provider:get', cid))
const providerPromise = provider.get(cid, newOptions)
void providerPromise.then(() => {
// if a provider resolves, abort the signal so we don't request bytes from any other providers
blockProviderController.abort()
})
blockGetPromises.push(providerPromise)
}
}

try {
const block = await Promise.any(blockGetPromises)
// cancel all other block get promises
blockProviderController.abort()

return block
} catch (err) {
throw new Error(`Could not get block for ${cid.toString()} from any provider`)
}
}

/**
* Get a block by cid
*/
async get (cid: CID, options: GetOfflineOptions & AbortOptions & ProgressOptions<GetBlockProgressEvents> = {}): Promise<Uint8Array> {
if (options.offline !== true && this.bitswap?.isStarted() != null && !(await this.child.has(cid))) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:bitswap:get', cid))
const block = await this.bitswap.want(cid, options)
if (options.offline !== true && !(await this.child.has(cid))) {
const block = await this.#_get(cid, options)

options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:blockstore:put', cid))
await this.child.put(cid, block, options)
Expand All @@ -105,9 +153,8 @@ export class NetworkedStorage implements Blocks {
options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:get-many'))

yield * this.child.getMany(forEach(cids, async (cid): Promise<void> => {
if (options.offline !== true && this.bitswap?.isStarted() === true && !(await this.child.has(cid))) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get-many:bitswap:get', cid))
const block = await this.bitswap.want(cid, options)
if (options.offline !== true && !(await this.child.has(cid))) {
const block = await this.#_get(cid, options)
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get-many:blockstore:put', cid))
await this.child.put(cid, block, options)
}
Expand Down
1 change: 1 addition & 0 deletions packages/helia/test/fixtures/create-helia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { Helia } from '@helia/interface'

export async function createHelia (): Promise<Helia> {
return createNode({
byteProviders: [],
libp2p: {
addresses: {
listen: [
Expand Down
1 change: 1 addition & 0 deletions packages/helia/test/pins.depth-limited.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ describe('pins (depth limited)', () => {
helia = await createHelia({
datastore: new MemoryDatastore(),
blockstore,
byteProviders: [],
libp2p: await createLibp2p({
transports: [
webSockets()
Expand Down
1 change: 1 addition & 0 deletions packages/helia/test/pins.recursive.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe('pins (recursive)', () => {

helia = await createHelia({
datastore: new MemoryDatastore(),
byteProviders: [],
blockstore,
libp2p: await createLibp2p({
transports: [
Expand Down
Loading