diff --git a/packages/helia/src/helia.ts b/packages/helia/src/helia.ts index dd8ba172e..095a6506d 100644 --- a/packages/helia/src/helia.ts +++ b/packages/helia/src/helia.ts @@ -6,9 +6,10 @@ import { sha256, sha512 } from 'multiformats/hashes/sha2' import { CustomProgressEvent } from 'progress-events' import { PinsImpl } from './pins.js' import { BlockStorage } from './storage.js' +import { BlockProvider } from './utils/block-provider.js' import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js' import { NetworkedStorage } from './utils/networked-storage.js' -import type { HeliaInit } from '.' +import type { ByteProvider, HeliaInit } from '.' import type { GCOptions, Helia } from '@helia/interface' import type { Pins } from '@helia/interface/pins' import type { Libp2p } from '@libp2p/interface' @@ -22,6 +23,7 @@ const log = logger('helia') interface HeliaImplInit extends HeliaInit { libp2p: T blockstore: Blockstore + byteProviders: ByteProvider[] datastore: Datastore } @@ -58,7 +60,8 @@ export class HeliaImpl implements Helia { }) const networkedStorage = new NetworkedStorage(init.blockstore, { - bitswap: this.#bitswap + bitswap: this.#bitswap, + blockProviders: init.byteProviders.map((provider) => new BlockProvider(init.blockstore, provider)) }) this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? []) diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index 47606f2d6..0067183c0 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -25,10 +25,12 @@ import { logger } from '@libp2p/logger' import { MemoryBlockstore } from 'blockstore-core' import { MemoryDatastore } from 'datastore-core' import { HeliaImpl } from './helia.js' +import { getDefaultByteProviders } from './utils/byte-provider-defaults.js' import { createLibp2p } from './utils/libp2p.js' import { name, version } from './version.js' import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js' import type { Helia } from '@helia/interface' +import type { ByteProvider } from '@helia/interface/blocks' import type { Libp2p } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' @@ -112,6 +114,12 @@ export interface HeliaInit { * webworker), pass true here to hold the gc lock in this process. */ holdGcLock?: boolean + + /** + * ByteProviders are used to fetch blocks from other sources when they are not + * present in the blockstore. + */ + byteProviders?: ByteProvider[] } /** @@ -135,6 +143,7 @@ export async function createHelia (init: HeliaInit = {}): Promise ...init, datastore, blockstore, + byteProviders: init.byteProviders ?? getDefaultByteProviders(), libp2p }) diff --git a/packages/helia/src/utils/block-provider.ts b/packages/helia/src/utils/block-provider.ts new file mode 100644 index 000000000..97c6fe054 --- /dev/null +++ b/packages/helia/src/utils/block-provider.ts @@ -0,0 +1,66 @@ +import { logger } from '@libp2p/logger' +import forEach from 'it-foreach' +import type { Pair, GetOfflineOptions, ByteProvider } from '@helia/interface/blocks' +import type { AbortOptions } from '@libp2p/interface' +import type { Blockstore } from 'interface-blockstore' +import type { AwaitIterable } from 'interface-store' +import type { CID } from 'multiformats/cid' + +const log = logger('helia:block-provider') + +export interface GetOptions extends AbortOptions { + progress?: (evt: Event) => void +} + +/** + * BlockProvider is a partial implementation of the Blocks interface that only handles block retrieval. + * + * This takes a {@link ByteProvider} and {@link Blockstore}. When a block is requested, it will first + * check the blockstore for the block. If it is not found, it will then call the provider to get the bytes. Once the + * bytes are retrieved, they are validated as a "block" and then that block is stored in the blockstore. + * + */ +export class BlockProvider { + private readonly blockstore: Blockstore + readonly #provider: ByteProvider + + /** + * Create a new BlockProvider + */ + constructor (blockstore: Blockstore, provider: ByteProvider) { + this.blockstore = blockstore + this.#provider = provider + } + + /** + * Get a block by cid, using the given ByteProvider + */ + async get (cid: CID, options: GetOfflineOptions & AbortOptions): Promise { + if (options.offline !== true && !(await this.blockstore.has(cid))) { + try { + const block = await this.#provider.get(cid, options) + + await this.blockstore.put(cid, block, options) + + return block + } catch (err) { + log.error('failed to get block for %s', cid.toString(), err) + } + } + + return this.blockstore.get(cid, options) + } + + /** + * Get multiple blocks back from an (async) iterable of cids + */ + async * getMany (cids: AwaitIterable, options: GetOfflineOptions & AbortOptions): AsyncIterable { + yield * this.blockstore.getMany(forEach(cids, async (cid): Promise => { + if (options.offline !== true && !(await this.blockstore.has(cid))) { + const block = await this.#provider.get(cid, options) + + await this.blockstore.put(cid, block, options) + } + })) + } +} diff --git a/packages/helia/src/utils/byte-provider-defaults.ts b/packages/helia/src/utils/byte-provider-defaults.ts new file mode 100644 index 000000000..5884c55d8 --- /dev/null +++ b/packages/helia/src/utils/byte-provider-defaults.ts @@ -0,0 +1,12 @@ +import { getGatewayBlockProvider } from './byte-provider-gateway.js' +import type { ByteProvider } from '@helia/interface/blocks' + +export function getDefaultByteProviders (): ByteProvider[] { + return [ + getGatewayBlockProvider('https://dweb.link'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + getGatewayBlockProvider('https://cf-ipfs.com'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + getGatewayBlockProvider('https://4everland.io'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + getGatewayBlockProvider('https://w3s.link'), // 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + getGatewayBlockProvider('https://cloudflare-ipfs.com') // 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/ + ] +} diff --git a/packages/helia/src/utils/byte-provider-gateway.ts b/packages/helia/src/utils/byte-provider-gateway.ts new file mode 100644 index 000000000..17324a588 --- /dev/null +++ b/packages/helia/src/utils/byte-provider-gateway.ts @@ -0,0 +1,26 @@ +import { logger } from '@libp2p/logger' +import { getRawBlockFromGateway } from './get-raw-block-from-gateway.js' +import type { ByteProvider } from '@helia/interface/blocks' +import type { CID } from 'multiformats/cid' + +const log = logger('helia:gateway-block-provider') + +export function getGatewayBlockProvider (url: URL | string): ByteProvider { + const byteProvider: ByteProvider = { + get: async (cid: CID, options = {}) => { + log('getting block for %s from %s', cid.toString(), url.toString()) + try { + const block = await getRawBlockFromGateway(url, cid, options.signal) + log('got block for %s from %s', cid.toString(), url.toString()) + + return block + } catch (err) { + log.error('failed to get block for %s from %s', cid.toString(), url.toString(), err) + + throw err + } + } + } + + return byteProvider +} diff --git a/packages/helia/src/utils/get-raw-block-from-gateway.ts b/packages/helia/src/utils/get-raw-block-from-gateway.ts new file mode 100644 index 000000000..481d74f8d --- /dev/null +++ b/packages/helia/src/utils/get-raw-block-from-gateway.ts @@ -0,0 +1,31 @@ +import type { CID } from 'multiformats/cid' + +export async function getRawBlockFromGateway (url: string | URL, cid: CID, signal?: AbortSignal): Promise { + const gwUrl = new URL(url) + + gwUrl.pathname = `/ipfs/${cid.toString()}` + gwUrl.search = '?format=raw' // necessary as not every gateway supports dag-cbor, but every should support sending raw block as-is + if (signal?.aborted === true) { + throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`) + } + try { + const res = await fetch(gwUrl.toString(), { + signal, + headers: { + // also set header, just in case ?format= is filtered out by some reverse proxy + Accept: 'application/vnd.ipld.raw' + }, + cache: 'force-cache' + }) + if (!res.ok) { + throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`) + } + return new Uint8Array(await res.arrayBuffer()) + } catch (cause) { + // @ts-expect-error - TS thinks signal?.aborted can only be false now because it was checked for true above. + if (signal?.aborted === true) { + throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`) + } + throw new Error(`unable to fetch raw block for CID ${cid}`) + } +} diff --git a/packages/helia/src/utils/networked-storage.ts b/packages/helia/src/utils/networked-storage.ts index 4ef96a8fa..411b39149 100644 --- a/packages/helia/src/utils/networked-storage.ts +++ b/packages/helia/src/utils/networked-storage.ts @@ -1,6 +1,7 @@ import filter from 'it-filter' import forEach from 'it-foreach' import { CustomProgressEvent, type ProgressOptions } from 'progress-events' +import type { BlockProvider } from './block-provider.js' import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks' import type { AbortOptions } from '@libp2p/interface' import type { Blockstore } from 'interface-blockstore' @@ -11,6 +12,7 @@ import type { CID } from 'multiformats/cid' export interface BlockStorageInit { holdGcLock?: boolean bitswap?: Bitswap + blockProviders?: BlockProvider[] } export interface GetOptions extends AbortOptions { @@ -24,6 +26,7 @@ export interface GetOptions extends AbortOptions { export class NetworkedStorage implements Blocks { private readonly child: Blockstore private readonly bitswap?: Bitswap + readonly #blockProviders: BlockProvider[] /** * Create a new BlockStorage @@ -31,6 +34,7 @@ export class NetworkedStorage implements Blocks { constructor (blockstore: Blockstore, options: BlockStorageInit = {}) { this.child = blockstore this.bitswap = options.bitswap + this.#blockProviders = options.blockProviders ?? [] } unwrap (): Blockstore { @@ -79,13 +83,57 @@ export class NetworkedStorage implements Blocks { yield * this.child.putMany(notifyEach, options) } + async #_get (cid: CID, options: GetOfflineOptions & AbortOptions & (ProgressOptions | ProgressOptions)): Promise { + const blockGetPromises: Array> = [] + /** + * We need to create a new AbortController that aborts when: + * 1. options.signal is aborted + * 2. any of the blockGetPromises are resolved + */ + const blockProviderController = new AbortController() + const newOptions = { ...options, signal: blockProviderController.signal } + if (options.signal != null) { + // abort the blockProvider signal when the options.signal is aborted + options.signal.addEventListener('abort', (): void => { + blockProviderController.abort() + }) + } + + if (this.bitswap?.isStarted() === true) { + options.onProgress?.(new CustomProgressEvent('blocks:get:bitswap:get', cid)) + blockGetPromises.push(this.bitswap.want(cid, newOptions)) + } + + for (const provider of this.#blockProviders) { + // if the signal has already been aborted, don't bother requesting from other providers. + if (!blockProviderController.signal.aborted) { + options.onProgress?.(new CustomProgressEvent('blocks:get:byte-provider:get', cid)) + const providerPromise = provider.get(cid, newOptions) + void providerPromise.then(() => { + // if a provider resolves, abort the signal so we don't request bytes from any other providers + blockProviderController.abort() + }) + blockGetPromises.push(providerPromise) + } + } + + try { + const block = await Promise.any(blockGetPromises) + // cancel all other block get promises + blockProviderController.abort() + + return block + } catch (err) { + throw new Error(`Could not get block for ${cid.toString()} from any provider`) + } + } + /** * Get a block by cid */ async get (cid: CID, options: GetOfflineOptions & AbortOptions & ProgressOptions = {}): Promise { - if (options.offline !== true && this.bitswap?.isStarted() != null && !(await this.child.has(cid))) { - options.onProgress?.(new CustomProgressEvent('blocks:get:bitswap:get', cid)) - const block = await this.bitswap.want(cid, options) + if (options.offline !== true && !(await this.child.has(cid))) { + const block = await this.#_get(cid, options) options.onProgress?.(new CustomProgressEvent('blocks:get:blockstore:put', cid)) await this.child.put(cid, block, options) @@ -105,9 +153,8 @@ export class NetworkedStorage implements Blocks { options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:get-many')) yield * this.child.getMany(forEach(cids, async (cid): Promise => { - if (options.offline !== true && this.bitswap?.isStarted() === true && !(await this.child.has(cid))) { - options.onProgress?.(new CustomProgressEvent('blocks:get-many:bitswap:get', cid)) - const block = await this.bitswap.want(cid, options) + if (options.offline !== true && !(await this.child.has(cid))) { + const block = await this.#_get(cid, options) options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:put', cid)) await this.child.put(cid, block, options) } diff --git a/packages/helia/test/fixtures/create-helia.ts b/packages/helia/test/fixtures/create-helia.ts index a8a0dd53f..8f01779ed 100644 --- a/packages/helia/test/fixtures/create-helia.ts +++ b/packages/helia/test/fixtures/create-helia.ts @@ -7,6 +7,7 @@ import type { Helia } from '@helia/interface' export async function createHelia (): Promise { return createNode({ + byteProviders: [], libp2p: { addresses: { listen: [ diff --git a/packages/helia/test/pins.depth-limited.spec.ts b/packages/helia/test/pins.depth-limited.spec.ts index 201983ab4..c398d9fe6 100644 --- a/packages/helia/test/pins.depth-limited.spec.ts +++ b/packages/helia/test/pins.depth-limited.spec.ts @@ -29,6 +29,7 @@ describe('pins (depth limited)', () => { helia = await createHelia({ datastore: new MemoryDatastore(), blockstore, + byteProviders: [], libp2p: await createLibp2p({ transports: [ webSockets() diff --git a/packages/helia/test/pins.recursive.spec.ts b/packages/helia/test/pins.recursive.spec.ts index 1bc4a7bda..9d420645e 100644 --- a/packages/helia/test/pins.recursive.spec.ts +++ b/packages/helia/test/pins.recursive.spec.ts @@ -26,6 +26,7 @@ describe('pins (recursive)', () => { helia = await createHelia({ datastore: new MemoryDatastore(), + byteProviders: [], blockstore, libp2p: await createLibp2p({ transports: [ diff --git a/packages/helia/test/utils/block-provider.spec.ts b/packages/helia/test/utils/block-provider.spec.ts new file mode 100644 index 000000000..bf319979c --- /dev/null +++ b/packages/helia/test/utils/block-provider.spec.ts @@ -0,0 +1,110 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import delay from 'delay' +import all from 'it-all' +import * as raw from 'multiformats/codecs/raw' +import Sinon from 'sinon' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { NetworkedStorage } from '../../src/utils/networked-storage.js' +import { createBlock } from '../fixtures/create-block.js' +import type { BlockProvider } from '../../src/utils/block-provider.js' +import type { Blockstore } from 'interface-blockstore' +import type { Bitswap } from 'ipfs-bitswap' +import type { CID } from 'multiformats/cid' + +describe('block-provider', () => { + let storage: NetworkedStorage + let blockstore: Blockstore + let bitswap: StubbedInstance + let blockProvider: StubbedInstance + let blocks: Array<{ cid: CID, block: Uint8Array }> + + beforeEach(async () => { + blocks = [] + + for (let i = 0; i < 10; i++) { + blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i]))) + } + + blockstore = new MemoryBlockstore() + bitswap = stubInterface() + blockProvider = stubInterface() + storage = new NetworkedStorage(blockstore, { + bitswap, + blockProviders: [blockProvider] + }) + // disable bitswap + bitswap.isStarted.returns(false) + }) + + it('gets a block from the blockProvider when it is not in the blockstore', async () => { + const { cid, block } = blocks[0] + + blockProvider.get.withArgs(cid, Sinon.match.any).resolves(block) + + expect(await blockstore.has(cid)).to.be.false() + + const returned = await storage.get(cid) + + expect(await blockstore.has(cid)).to.be.true() + expect(returned).to.equalBytes(block) + expect(blockProvider.get.calledWith(cid)).to.be.true() + }) + + it('gets many blocks from blockProvider when they are not in the blockstore', async () => { + const count = 5 + + for (let i = 0; i < count; i++) { + const { cid, block } = blocks[i] + blockProvider.get.withArgs(cid, Sinon.match.any).resolves(block) + + expect(await blockstore.has(cid)).to.be.false() + } + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i])) + + for (let i = 0; i < count; i++) { + const { cid } = blocks[i] + expect(blockProvider.get.calledWith(cid)).to.be.true() + expect(await blockstore.has(cid)).to.be.true() + } + }) + + it('gets some blocks from blockProvider when they are not in the blockstore', async () => { + const count = 5 + + // blocks 0,1,3,4 are in the blockstore + await blockstore.put(blocks[0].cid, blocks[0].block) + await blockstore.put(blocks[1].cid, blocks[1].block) + await blockstore.put(blocks[3].cid, blocks[3].block) + await blockstore.put(blocks[4].cid, blocks[4].block) + + // block #2 comes from blockProvider but slowly + blockProvider.get.withArgs(blocks[2].cid).callsFake(async () => { + await delay(100) + return blocks[2].block + }) + + const retrieved = await all(storage.getMany(async function * () { + for (let i = 0; i < count; i++) { + yield blocks[i].cid + await delay(10) + } + }())) + + expect(retrieved).to.deep.equal(new Array(count).fill(0).map((_, i) => blocks[i])) + + for (let i = 0; i < count; i++) { + expect(await blockstore.has(blocks[i].cid)).to.be.true() + } + }) +}) diff --git a/packages/helia/test/utils/networked-storage.spec.ts b/packages/helia/test/utils/networked-storage.spec.ts index 8ce33f648..a920b666f 100644 --- a/packages/helia/test/utils/networked-storage.spec.ts +++ b/packages/helia/test/utils/networked-storage.spec.ts @@ -14,7 +14,7 @@ import type { Blockstore } from 'interface-blockstore' import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' -describe('storage', () => { +describe('networked-storage', () => { let storage: NetworkedStorage let blockstore: Blockstore let bitswap: StubbedInstance @@ -30,7 +30,8 @@ describe('storage', () => { blockstore = new MemoryBlockstore() bitswap = stubInterface() storage = new NetworkedStorage(blockstore, { - bitswap + bitswap, + blockProviders: [] }) }) diff --git a/packages/interface/src/blocks.ts b/packages/interface/src/blocks.ts index b0d494068..a67b3826e 100644 --- a/packages/interface/src/blocks.ts +++ b/packages/interface/src/blocks.ts @@ -61,3 +61,5 @@ ProgressOptions, ProgressOptions { } + +export * from './byte-provider.js' diff --git a/packages/interface/src/byte-provider.ts b/packages/interface/src/byte-provider.ts new file mode 100644 index 000000000..3ece17c53 --- /dev/null +++ b/packages/interface/src/byte-provider.ts @@ -0,0 +1,23 @@ +import type { AbortOptions } from '@libp2p/interface' +import type { CID } from 'multiformats/cid' +import type { ProgressEvent, ProgressOptions } from 'progress-events' + +export type ByteProviderProgressEventError = ProgressEvent<'helia:byte-provider:error', Error> +export type ByteProviderProgressEventRequest = ProgressEvent<'helia:byte-provider:request', CID> +export type ByteProviderProgressEventSuccess = ProgressEvent<'helia:byte-provider:success', CID> + +export type ByteProviderEvents = + ByteProviderProgressEventRequest | + ByteProviderProgressEventSuccess | + ByteProviderProgressEventError + +export interface ByteProviderGetOptions extends AbortOptions, ProgressOptions { + +} + +export interface ByteProvider extends ProgressOptions { + /** + * Get some bytes from the byte provider + */ + get: (cid: CID, options?: ByteProviderGetOptions) => Promise +} diff --git a/packages/interop/test/fixtures/create-helia.browser.ts b/packages/interop/test/fixtures/create-helia.browser.ts index 38864c445..639dbb129 100644 --- a/packages/interop/test/fixtures/create-helia.browser.ts +++ b/packages/interop/test/fixtures/create-helia.browser.ts @@ -40,6 +40,7 @@ export async function createHeliaNode (init?: Partial): Promise): Promise