Skip to content

Commit 3490a4f

Browse files
committed
fix: support resumable pinning
1 parent 7f9dbb5 commit 3490a4f

File tree

2 files changed

+52
-5
lines changed

2 files changed

+52
-5
lines changed

packages/helia/src/pins.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ interface DatastorePinnedBlock {
3535
pinnedBy: Uint8Array[]
3636
}
3737

38+
/**
39+
* Callback for updating a {@link DatastorePinnedBlock}'s properties when
40+
* calling `#updatePinnedBlock`
41+
*
42+
* The callback should return `false` to prevent any pinning modifications to
43+
* the block, and true in all other cases.
44+
*/
45+
interface WithPinnedBlockCallback {
46+
(pinnedBlock: DatastorePinnedBlock): boolean
47+
}
48+
3849
const DATASTORE_PIN_PREFIX = '/pin/'
3950
const DATASTORE_BLOCK_PREFIX = '/pinned-block/'
4051
const DATASTORE_ENCODING = base36
@@ -82,11 +93,12 @@ export class PinsImpl implements Pins {
8293
await this.#updatePinnedBlock(cid, (pinnedBlock: DatastorePinnedBlock) => {
8394
// do not update pinned block if this block is already pinned by this CID
8495
if (pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null) {
85-
return
96+
return false
8697
}
8798

8899
pinnedBlock.pinCount++
89100
pinnedBlock.pinnedBy.push(cid.bytes)
101+
return true
90102
}, options)
91103

92104
return cid
@@ -157,7 +169,7 @@ export class PinsImpl implements Pins {
157169
/**
158170
* Update the pin count for the CID
159171
*/
160-
async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AddOptions): Promise<void> {
172+
async #updatePinnedBlock (cid: CID, withPinnedBlock: WithPinnedBlockCallback, options: AddOptions): Promise<void> {
161173
const blockKey = new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`)
162174

163175
let pinnedBlock: DatastorePinnedBlock = {
@@ -173,7 +185,10 @@ export class PinsImpl implements Pins {
173185
}
174186
}
175187

176-
withPinnedBlock(pinnedBlock)
188+
const shouldContinue = withPinnedBlock(pinnedBlock)
189+
if (!shouldContinue) {
190+
return
191+
}
177192

178193
if (pinnedBlock.pinCount === 0) {
179194
if (await this.datastore.has(blockKey)) {
@@ -197,9 +212,10 @@ export class PinsImpl implements Pins {
197212
yield async () => {
198213
const cid = await promise()
199214

200-
await this.#updatePinnedBlock(cid, (pinnedBlock): void => {
215+
await this.#updatePinnedBlock(cid, (pinnedBlock): boolean => {
201216
pinnedBlock.pinCount--
202217
pinnedBlock.pinnedBy = pinnedBlock.pinnedBy.filter(c => uint8ArrayEquals(c, cid.bytes))
218+
return true
203219
}, {
204220
...options,
205221
depth: pin.depth

packages/helia/test/pins.recursive.spec.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { MemoryDatastore } from 'datastore-core'
88
import all from 'it-all'
99
import parallel from 'it-parallel'
1010
import { createLibp2p } from 'libp2p'
11-
import { createHelia } from '../src/index.js'
11+
import { type AddPinEvents, createHelia } from '../src/index.js'
1212
import { createDag, type DAGNode } from './fixtures/create-dag.js'
1313
import { dagWalker } from './fixtures/dag-walker.js'
1414
import type { Helia } from '@helia/interface'
@@ -87,4 +87,35 @@ describe('pins (recursive)', () => {
8787
}
8888
}
8989
})
90+
91+
it('can resume an interrupted pinning operation', async () => {
92+
// dag has 13 nodes. We should abort after 5
93+
const events: AddPinEvents[] = []
94+
const getPinIterator = (): ReturnType<typeof helia.pins.add> => helia.pins.add(dag['level-0'].cid, {
95+
onProgress: (evt) => {
96+
if (evt.type === 'helia:pin:add') {
97+
events.push(evt)
98+
}
99+
}
100+
})
101+
const pinIter = getPinIterator()
102+
let output = await pinIter.next()
103+
const firstTryPins = []
104+
105+
while (output.done === false && events.length < 5) {
106+
firstTryPins.push(await output.value())
107+
output = await pinIter.next()
108+
}
109+
110+
expect(firstTryPins).to.have.lengthOf(5)
111+
expect(events.length).to.eq(5)
112+
expect(output.done).to.eq(false) // we're not actually done. We simulated a crash
113+
114+
// now resume, and consume the entire iterator to completion
115+
const pin = await all(parallel(getPinIterator()))
116+
117+
expect(pin).to.have.lengthOf(13)
118+
// we did not re-pin things we already pinned
119+
expect(events.length).to.eq(13)
120+
})
90121
})

0 commit comments

Comments
 (0)