From 52a8760c9775a5fe803af9bc23252d5a2ef50d84 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Sun, 1 Sep 2024 21:24:37 +0300 Subject: [PATCH 01/12] Add monitor --- index.js | 5 +++ lib/monitor.js | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 lib/monitor.js diff --git a/index.js b/index.js index 1ee1a67..f2b6ee9 100644 --- a/index.js +++ b/index.js @@ -10,6 +10,7 @@ const safetyCatch = require('safety-catch') const crypto = require('hypercore-crypto') const Hypercore = require('hypercore') const { BLOCK_NOT_AVAILABLE, BAD_ARGUMENT } = require('hypercore-errors') +const Monitor = require('./lib/monitor') const keyEncoding = new SubEncoder('files', 'utf-8') @@ -278,6 +279,10 @@ module.exports = class Hyperdrive extends ReadyResource { return this.blobs } + monitor (name, opts = {}) { + return new Monitor(this, { name, ...opts }) + } + async get (name, opts) { const node = await this.entry(name, opts) if (!node?.value.blob) return null diff --git a/lib/monitor.js b/lib/monitor.js new file mode 100644 index 0000000..4824b19 --- /dev/null +++ b/lib/monitor.js @@ -0,0 +1,89 @@ +const ReadyResource = require('ready-resource') + +module.exports = class Monitor extends ReadyResource { + constructor (drive, opts = {}) { + super() + this.drive = drive + this.blobs = null + this.name = opts.name + this.entry = opts.entry + this.upload = opts.upload !== false && opts.download !== true + this.download = opts.download !== false && !this.upload + + this._boundOnAppend = this._onAppend.bind(this) + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) + this.drive.on('close', () => this.close()) + + this.stats = { + type: this.upload ? 'upload' : 'download', + startTime: 0, + percentage: 0, + speed: null, + blocks: null, + bytes: null, + totalBytes: null, + totalBlocks: null + } + } + + async _open () { + await this.drive.ready() + this.blobs = await this.drive.getBlobs() + this.entry = await this.drive.entry(this.name) + if (this.entry) this._setEntryInfo() + // Handlers + this.blobs.core.on('append', this._boundOnAppend) + if (this.upload) this.blobs.core.on('upload', this._boundOnUpload) + if (this.download) this.blobs.core.on('download', this._boundOnDownload) + } + + async _close () { + this.blobs.core.off('append', this._boundOnAppend) + if (this.upload) this.blobs.core.off('upload', this._boundOnUpload) + if (this.download) this.blobs.core.off('download', this._boundOnDownload) + } + + async _onAppend () { + if (this.entry) return + await new Promise(resolve => setImmediate(resolve)) + this.entry = await this.drive.entry(this.name) + if (this.entry) this._setEntryInfo() + } + + async _onUpload (index, bytes, from) { + this._updateStats(index, bytes) + this.emit('update', { stats: this.stats }) + } + + async _onDownload (index, bytes, from) { + this._updateStats(index, bytes) + this.emit('update', { stats: this.stats }) + } + + _setEntryInfo () { + if (this.stats.totalBytes || this.stats.totalBlocks) return + this.stats.totalBytes = this.entry.value.blob.byteLength + this.stats.totalBlocks = this.entry.value.blob.blockLength + } + + _updateStats (index, bytes) { + if (!this.entry) return + if (!isWithinRange(index, this.entry)) return + if (!this.stats.startTime) this.stats.startTime = Date.now() + + this.stats.blocks++ + this.stats.bytes += bytes + this.stats.percentage = Number(((this.stats.bytes / this.stats.totalBytes) * 100).toFixed(2)) + const timeElapsed = (Date.now() - this.stats.startTime) / 1000 + if (timeElapsed > 0) { + this.stats.speed = Math.floor(this.stats.bytes / timeElapsed) // Speed in bytes/sec + } + } +} + +function isWithinRange (index, entry) { + if (!entry || !entry.value) return + const { blockOffset, blockLength } = entry.value.blob + return index >= blockOffset && index < blockOffset + blockLength +} From 05f6208c7a87b9f3b2078789cd8dec145d916ea1 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Mon, 2 Sep 2024 10:09:20 +0300 Subject: [PATCH 02/12] Simplify monitor + tests --- lib/monitor.js | 33 +++++++++++----------------- test.js | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 20 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index 4824b19..d599184 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -7,16 +7,13 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry - this.upload = opts.upload !== false && opts.download !== true - this.download = opts.download !== false && !this.upload this._boundOnAppend = this._onAppend.bind(this) - this._boundOnUpload = this._onUpload.bind(this) - this._boundOnDownload = this._onDownload.bind(this) + this._boundUpdateStats = this._updateStats.bind(this) this.drive.on('close', () => this.close()) + // Updated on each upload/download event this.stats = { - type: this.upload ? 'upload' : 'download', startTime: 0, percentage: 0, speed: null, @@ -34,14 +31,14 @@ module.exports = class Monitor extends ReadyResource { if (this.entry) this._setEntryInfo() // Handlers this.blobs.core.on('append', this._boundOnAppend) - if (this.upload) this.blobs.core.on('upload', this._boundOnUpload) - if (this.download) this.blobs.core.on('download', this._boundOnDownload) + this.blobs.core.on('upload', this._boundUpdateStats) + this.blobs.core.on('download', this._boundUpdateStats) } async _close () { this.blobs.core.off('append', this._boundOnAppend) - if (this.upload) this.blobs.core.off('upload', this._boundOnUpload) - if (this.download) this.blobs.core.off('download', this._boundOnDownload) + this.blobs.core.off('upload', this._boundUpdateStats) + this.blobs.core.off('download', this._boundUpdateStats) } async _onAppend () { @@ -51,16 +48,6 @@ module.exports = class Monitor extends ReadyResource { if (this.entry) this._setEntryInfo() } - async _onUpload (index, bytes, from) { - this._updateStats(index, bytes) - this.emit('update', { stats: this.stats }) - } - - async _onDownload (index, bytes, from) { - this._updateStats(index, bytes) - this.emit('update', { stats: this.stats }) - } - _setEntryInfo () { if (this.stats.totalBytes || this.stats.totalBlocks) return this.stats.totalBytes = this.entry.value.blob.byteLength @@ -68,7 +55,7 @@ module.exports = class Monitor extends ReadyResource { } _updateStats (index, bytes) { - if (!this.entry) return + if (!this.entry || this.closing) return if (!isWithinRange(index, this.entry)) return if (!this.stats.startTime) this.stats.startTime = Date.now() @@ -79,6 +66,12 @@ module.exports = class Monitor extends ReadyResource { if (timeElapsed > 0) { this.stats.speed = Math.floor(this.stats.bytes / timeElapsed) // Speed in bytes/sec } + + this.emit('update') + if (this.stats.totalBytes === this.stats.bytes) { + this.emit('done') + this.close() + } } } diff --git a/test.js b/test.js index 2fb7c4f..3d6a9e7 100644 --- a/test.js +++ b/test.js @@ -1562,6 +1562,65 @@ test('drive.list (recursive false) ignore', async (t) => { t.alike(entries, expectedEntries) }) +test('upload/download can be monitored', async (t) => { + t.plan(23) + const { corestore, drive, swarm, mirror } = await testenv(t.teardown) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const file = '/example.md' + const bytes = 1024 * 100 // big enough to trigger more than one update event + const buffer = Buffer.alloc(bytes, '0') + + { + // Start monitoring upload + const monitor = drive.monitor(file) + await monitor.ready() + t.is(monitor.name, file) + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + monitor.on('update', () => { + t.is(monitor.stats.blocks, expectedBlocks.pop()) + t.is(monitor.stats.bytes, expectedBytes.pop()) + t.is(monitor.stats.totalBlocks, 2) + t.is(monitor.stats.totalBytes, bytes) + }) + monitor.on('done', () => { + t.is(monitor.stats.blocks, 2) + t.is(monitor.stats.bytes, bytes) + }) + monitor.on('close', () => t.pass('Monitor is closed after its done')) + } + + await drive.put(file, buffer) + + { + // Start monitoring download + const monitor = mirror.drive.monitor(file) + await monitor.ready() + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + monitor.on('update', () => { + t.is(monitor.stats.blocks, expectedBlocks.pop()) + t.is(monitor.stats.bytes, expectedBytes.pop()) + t.is(monitor.stats.totalBlocks, 2) + t.is(monitor.stats.totalBytes, bytes) + }) + monitor.on('done', () => { + t.is(monitor.stats.blocks, 2) + t.is(monitor.stats.bytes, bytes) + }) + monitor.on('close', () => t.pass('Monitor is closed after its done')) + } + + await mirror.drive.get(file) +}) + async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready() From 97dadc69d822624d1336c649fbb117fc981249ba Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Mon, 2 Sep 2024 10:22:54 +0300 Subject: [PATCH 03/12] Remove 'done' event --- lib/monitor.js | 4 ---- test.js | 12 +----------- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index d599184..5c4e948 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -68,10 +68,6 @@ module.exports = class Monitor extends ReadyResource { } this.emit('update') - if (this.stats.totalBytes === this.stats.bytes) { - this.emit('done') - this.close() - } } } diff --git a/test.js b/test.js index 3d6a9e7..202ff15 100644 --- a/test.js +++ b/test.js @@ -1563,7 +1563,7 @@ test('drive.list (recursive false) ignore', async (t) => { }) test('upload/download can be monitored', async (t) => { - t.plan(23) + t.plan(17) const { corestore, drive, swarm, mirror } = await testenv(t.teardown) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1590,11 +1590,6 @@ test('upload/download can be monitored', async (t) => { t.is(monitor.stats.totalBlocks, 2) t.is(monitor.stats.totalBytes, bytes) }) - monitor.on('done', () => { - t.is(monitor.stats.blocks, 2) - t.is(monitor.stats.bytes, bytes) - }) - monitor.on('close', () => t.pass('Monitor is closed after its done')) } await drive.put(file, buffer) @@ -1611,11 +1606,6 @@ test('upload/download can be monitored', async (t) => { t.is(monitor.stats.totalBlocks, 2) t.is(monitor.stats.totalBytes, bytes) }) - monitor.on('done', () => { - t.is(monitor.stats.blocks, 2) - t.is(monitor.stats.bytes, bytes) - }) - monitor.on('close', () => t.pass('Monitor is closed after its done')) } await mirror.drive.get(file) From b5721c11fca22e21f896ac36e3294244bcc99ae3 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Tue, 3 Sep 2024 10:11:10 +0300 Subject: [PATCH 04/12] load the local state on download --- lib/monitor.js | 79 ++++++++++++++++++++++++++++++++++++++------------ package.json | 1 + test.js | 44 ++++++++++++++++++++++++---- 3 files changed, 99 insertions(+), 25 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index 5c4e948..bb1fdc3 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,4 +1,6 @@ const ReadyResource = require('ready-resource') +const debounce = require('debounceify') +const safetyCatch = require('safety-catch') module.exports = class Monitor extends ReadyResource { constructor (drive, opts = {}) { @@ -7,20 +9,24 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry + this.isDownload = opts.download === true - this._boundOnAppend = this._onAppend.bind(this) - this._boundUpdateStats = this._updateStats.bind(this) + this._boundOnAppend = debounce(this._onAppend.bind(this)) + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) this.drive.on('close', () => this.close()) // Updated on each upload/download event this.stats = { startTime: 0, percentage: 0, + peersCount: 0, speed: null, blocks: null, - bytes: null, - totalBytes: null, - totalBlocks: null + totalBytes: null, // local + bytes loaded during monitoring + monitoringBytes: null, // bytes loaded during monitoring + targetBytes: null, + targetBlocks: null } } @@ -29,16 +35,26 @@ module.exports = class Monitor extends ReadyResource { this.blobs = await this.drive.getBlobs() this.entry = await this.drive.entry(this.name) if (this.entry) this._setEntryInfo() + + // load the local state for the file. + // upload is a bit more tricky + if (this.entry && this.isDownload) { + await this._loadLocalState().catch(safetyCatch).finally(() => { + this._calculateStats() + this.emit('update') + }) + } + // Handlers this.blobs.core.on('append', this._boundOnAppend) - this.blobs.core.on('upload', this._boundUpdateStats) - this.blobs.core.on('download', this._boundUpdateStats) + this.blobs.core.on('upload', this._boundOnUpload) + this.blobs.core.on('download', this._boundOnDownload) } async _close () { this.blobs.core.off('append', this._boundOnAppend) - this.blobs.core.off('upload', this._boundUpdateStats) - this.blobs.core.off('download', this._boundUpdateStats) + this.blobs.core.off('upload', this._boundOnUpload) + this.blobs.core.off('download', this._boundOnDownload) } async _onAppend () { @@ -49,25 +65,50 @@ module.exports = class Monitor extends ReadyResource { } _setEntryInfo () { - if (this.stats.totalBytes || this.stats.totalBlocks) return - this.stats.totalBytes = this.entry.value.blob.byteLength - this.stats.totalBlocks = this.entry.value.blob.blockLength + if (this.stats.targetBytes || this.stats.targetBlocks) return + this.stats.targetBytes = this.entry.value.blob.byteLength + this.stats.targetBlocks = this.entry.value.blob.blockLength + this.stats.blockOffset = this.entry.value.blob.blockOffset + this.stats.byteOffset = this.entry.value.blob.byteOffset + } + + async _onUpload (index, bytes, from) { + this._updateStats(index, bytes, from) + } + + async _onDownload (index, bytes, from) { + this._updateStats(index, bytes, from) + } + + async _loadLocalState () { + // TODO: think this will only work if its linear + const stream = this.blobs.createReadStream(this.entry.value.blob, { wait: false }) + for await (const bytes of stream) { + this.stats.totalBytes += bytes.length + this.stats.blocks++ + } } - _updateStats (index, bytes) { + _updateStats (index, bytes, from) { if (!this.entry || this.closing) return if (!isWithinRange(index, this.entry)) return - if (!this.stats.startTime) this.stats.startTime = Date.now() + this.stats.peersCount = from.replicator.peers.length this.stats.blocks++ - this.stats.bytes += bytes - this.stats.percentage = Number(((this.stats.bytes / this.stats.totalBytes) * 100).toFixed(2)) + this.stats.monitoringBytes += bytes + this.stats.totalBytes += bytes + + this._calculateStats() + this.emit('update') + } + + _calculateStats () { + if (!this.stats.startTime) this.stats.startTime = Date.now() + this.stats.percentage = Number(((this.stats.totalBytes / this.stats.targetBytes) * 100).toFixed(2)) const timeElapsed = (Date.now() - this.stats.startTime) / 1000 if (timeElapsed > 0) { - this.stats.speed = Math.floor(this.stats.bytes / timeElapsed) // Speed in bytes/sec + this.stats.speed = Math.floor(this.stats.monitoringBytes / timeElapsed) // Speed in bytes/sec } - - this.emit('update') } } diff --git a/package.json b/package.json index 2791626..193e241 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ }, "homepage": "https://github.com/holepunchto/hyperdrive#readme", "dependencies": { + "debounceify": "^1.1.0", "hyperbee": "^2.11.1", "hyperblobs": "^2.3.0", "hypercore": "^10.33.0", diff --git a/test.js b/test.js index 202ff15..885ab79 100644 --- a/test.js +++ b/test.js @@ -1586,9 +1586,9 @@ test('upload/download can be monitored', async (t) => { const expectedBytes = [bytes, 65536] monitor.on('update', () => { t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.bytes, expectedBytes.pop()) - t.is(monitor.stats.totalBlocks, 2) - t.is(monitor.stats.totalBytes, bytes) + t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.stats.targetBlocks, 2) + t.is(monitor.stats.targetBytes, bytes) }) } @@ -1602,15 +1602,47 @@ test('upload/download can be monitored', async (t) => { const expectedBytes = [bytes, 65536] monitor.on('update', () => { t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.bytes, expectedBytes.pop()) - t.is(monitor.stats.totalBlocks, 2) - t.is(monitor.stats.totalBytes, bytes) + t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.stats.targetBlocks, 2) + t.is(monitor.stats.targetBytes, bytes) }) } await mirror.drive.get(file) }) +test('monitor loads the local state on download', async (t) => { + t.plan(3) + const { corestore, drive, swarm, mirror } = await testenv(t.teardown) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const file = '/example.md' + const bytes = 1234 + const buffer = Buffer.alloc(bytes, '0') + await drive.put(file, buffer) + + observe() + async function observe () { + for await (const _ of mirror.drive.watch()) { /* eslint-disable-line */ + await mirror.drive.get(file) + // Start monitoring after we've downloaded the file + const monitor = mirror.drive.monitor(file, { download: true }) + monitor.on('update', () => { + t.is(monitor.stats.percentage, 100) + t.is(monitor.stats.totalBytes, bytes) + t.is(monitor.stats.targetBytes, bytes) + }) + await monitor.ready() + } + } +}) + async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready() From 5e17e1501fcde507531f3bfca0f4990f79e05cab Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Tue, 3 Sep 2024 13:27:39 +0300 Subject: [PATCH 05/12] Change how local state of the file is loaded --- lib/monitor.js | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index bb1fdc3..9e94729 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -36,8 +36,8 @@ module.exports = class Monitor extends ReadyResource { this.entry = await this.drive.entry(this.name) if (this.entry) this._setEntryInfo() - // load the local state for the file. - // upload is a bit more tricky + // load the local state of the file. + // upload is a bit more tricky... if (this.entry && this.isDownload) { await this._loadLocalState().catch(safetyCatch).finally(() => { this._calculateStats() @@ -81,11 +81,15 @@ module.exports = class Monitor extends ReadyResource { } async _loadLocalState () { - // TODO: think this will only work if its linear - const stream = this.blobs.createReadStream(this.entry.value.blob, { wait: false }) - for await (const bytes of stream) { - this.stats.totalBytes += bytes.length - this.stats.blocks++ + // @TODO: There's a better way to do this? + let blockIdx = this.stats.blockOffset + while (blockIdx <= this.stats.targetBlocks) { + if (await this.blobs.core.core.bitfield.get(blockIdx)) { + const bytes = await this.blobs.core.core.blocks.get(blockIdx) + this.stats.totalBytes += bytes.length + this.stats.blocks++ + } + blockIdx++ } } From ca5af17a21bfd1dc9aab1ec1b4a6cb8249f7e174 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Tue, 3 Sep 2024 21:05:50 +0300 Subject: [PATCH 06/12] Skip bitfield for now --- lib/monitor.js | 73 +++++++++++++++++++------------------------------- test.js | 52 ++++++++--------------------------- 2 files changed, 39 insertions(+), 86 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index 9e94729..f6c2711 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,6 +1,5 @@ const ReadyResource = require('ready-resource') const debounce = require('debounceify') -const safetyCatch = require('safety-catch') module.exports = class Monitor extends ReadyResource { constructor (drive, opts = {}) { @@ -9,15 +8,13 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry - this.isDownload = opts.download === true this._boundOnAppend = debounce(this._onAppend.bind(this)) this._boundOnUpload = this._onUpload.bind(this) this._boundOnDownload = this._onDownload.bind(this) this.drive.on('close', () => this.close()) - // Updated on each upload/download event - this.stats = { + const stats = { startTime: 0, percentage: 0, peersCount: 0, @@ -28,6 +25,10 @@ module.exports = class Monitor extends ReadyResource { targetBytes: null, targetBlocks: null } + + // Updated on each upload/download event + this.uploadStats = { ...stats } + this.downloadStats = { ...stats } } async _open () { @@ -36,15 +37,6 @@ module.exports = class Monitor extends ReadyResource { this.entry = await this.drive.entry(this.name) if (this.entry) this._setEntryInfo() - // load the local state of the file. - // upload is a bit more tricky... - if (this.entry && this.isDownload) { - await this._loadLocalState().catch(safetyCatch).finally(() => { - this._calculateStats() - this.emit('update') - }) - } - // Handlers this.blobs.core.on('append', this._boundOnAppend) this.blobs.core.on('upload', this._boundOnUpload) @@ -65,53 +57,44 @@ module.exports = class Monitor extends ReadyResource { } _setEntryInfo () { - if (this.stats.targetBytes || this.stats.targetBlocks) return - this.stats.targetBytes = this.entry.value.blob.byteLength - this.stats.targetBlocks = this.entry.value.blob.blockLength - this.stats.blockOffset = this.entry.value.blob.blockOffset - this.stats.byteOffset = this.entry.value.blob.byteOffset - } + if (!this.downloadStats.targetBytes || !this.downloadStats.targetBlocks) { + this.downloadStats.targetBytes = this.entry.value.blob.byteLength + this.downloadStats.targetBlocks = this.entry.value.blob.blockLength + } - async _onUpload (index, bytes, from) { - this._updateStats(index, bytes, from) + if (!this.uploadStats.targetBytes || !this.uploadStats.targetBlocks) { + this.uploadStats.targetBytes = this.entry.value.blob.byteLength + this.uploadStats.targetBlocks = this.entry.value.blob.blockLength + } } - async _onDownload (index, bytes, from) { - this._updateStats(index, bytes, from) + _onUpload (index, bytes, from) { + this._updateStats(this.uploadStats, index, bytes, from) } - async _loadLocalState () { - // @TODO: There's a better way to do this? - let blockIdx = this.stats.blockOffset - while (blockIdx <= this.stats.targetBlocks) { - if (await this.blobs.core.core.bitfield.get(blockIdx)) { - const bytes = await this.blobs.core.core.blocks.get(blockIdx) - this.stats.totalBytes += bytes.length - this.stats.blocks++ - } - blockIdx++ - } + _onDownload (index, bytes, from) { + this._updateStats(this.downloadStats, index, bytes, from) } - _updateStats (index, bytes, from) { + _updateStats (stats, index, bytes, from) { if (!this.entry || this.closing) return if (!isWithinRange(index, this.entry)) return - this.stats.peersCount = from.replicator.peers.length - this.stats.blocks++ - this.stats.monitoringBytes += bytes - this.stats.totalBytes += bytes + stats.peersCount = from.replicator.peers.length + stats.blocks++ + stats.monitoringBytes += bytes + stats.totalBytes += bytes - this._calculateStats() + this._calculateStats(stats) this.emit('update') } - _calculateStats () { - if (!this.stats.startTime) this.stats.startTime = Date.now() - this.stats.percentage = Number(((this.stats.totalBytes / this.stats.targetBytes) * 100).toFixed(2)) - const timeElapsed = (Date.now() - this.stats.startTime) / 1000 + _calculateStats (stats) { + if (!stats.startTime) stats.startTime = Date.now() + stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2)) + const timeElapsed = (Date.now() - stats.startTime) / 1000 if (timeElapsed > 0) { - this.stats.speed = Math.floor(this.stats.monitoringBytes / timeElapsed) // Speed in bytes/sec + stats.speed = Math.floor(stats.monitoringBytes / timeElapsed) // bytes/sec } } } diff --git a/test.js b/test.js index 885ab79..01025a6 100644 --- a/test.js +++ b/test.js @@ -1563,7 +1563,7 @@ test('drive.list (recursive false) ignore', async (t) => { }) test('upload/download can be monitored', async (t) => { - t.plan(17) + t.plan(21) const { corestore, drive, swarm, mirror } = await testenv(t.teardown) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1585,10 +1585,11 @@ test('upload/download can be monitored', async (t) => { const expectedBlocks = [2, 1] const expectedBytes = [bytes, 65536] monitor.on('update', () => { - t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.stats.targetBlocks, 2) - t.is(monitor.stats.targetBytes, bytes) + t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) + t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.uploadStats.targetBlocks, 2) + t.is(monitor.uploadStats.targetBytes, bytes) + t.absent(monitor.downloadStats.blocks) }) } @@ -1601,48 +1602,17 @@ test('upload/download can be monitored', async (t) => { const expectedBlocks = [2, 1] const expectedBytes = [bytes, 65536] monitor.on('update', () => { - t.is(monitor.stats.blocks, expectedBlocks.pop()) - t.is(monitor.stats.monitoringBytes, expectedBytes.pop()) - t.is(monitor.stats.targetBlocks, 2) - t.is(monitor.stats.targetBytes, bytes) + t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) + t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.downloadStats.targetBlocks, 2) + t.is(monitor.downloadStats.targetBytes, bytes) + t.absent(monitor.uploadStats.blocks) }) } await mirror.drive.get(file) }) -test('monitor loads the local state on download', async (t) => { - t.plan(3) - const { corestore, drive, swarm, mirror } = await testenv(t.teardown) - swarm.on('connection', (conn) => corestore.replicate(conn)) - swarm.join(drive.discoveryKey, { server: true, client: false }) - await swarm.flush() - - mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) - mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) - await mirror.swarm.flush() - - const file = '/example.md' - const bytes = 1234 - const buffer = Buffer.alloc(bytes, '0') - await drive.put(file, buffer) - - observe() - async function observe () { - for await (const _ of mirror.drive.watch()) { /* eslint-disable-line */ - await mirror.drive.get(file) - // Start monitoring after we've downloaded the file - const monitor = mirror.drive.monitor(file, { download: true }) - monitor.on('update', () => { - t.is(monitor.stats.percentage, 100) - t.is(monitor.stats.totalBytes, bytes) - t.is(monitor.stats.targetBytes, bytes) - }) - await monitor.ready() - } - } -}) - async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready() From 8e8574636cccaf09bc2c12ee0e12d1993bf27f47 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Wed, 4 Sep 2024 19:34:38 +0300 Subject: [PATCH 07/12] Use speedometer --- lib/monitor.js | 28 ++++++++++++---------------- package.json | 1 + 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index f6c2711..377a3b2 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,4 +1,5 @@ const ReadyResource = require('ready-resource') +const speedometer = require('speedometer') const debounce = require('debounceify') module.exports = class Monitor extends ReadyResource { @@ -8,6 +9,7 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry + this.speed = null this._boundOnAppend = debounce(this._onAppend.bind(this)) this._boundOnUpload = this._onUpload.bind(this) @@ -18,12 +20,12 @@ module.exports = class Monitor extends ReadyResource { startTime: 0, percentage: 0, peersCount: 0, - speed: null, - blocks: null, - totalBytes: null, // local + bytes loaded during monitoring - monitoringBytes: null, // bytes loaded during monitoring - targetBytes: null, - targetBlocks: null + speed: 0, + blocks: 0, + totalBytes: 0, // local + bytes loaded during monitoring + monitoringBytes: 0, // bytes loaded during monitoring + targetBytes: 0, + targetBlocks: 0 } // Updated on each upload/download event @@ -80,23 +82,17 @@ module.exports = class Monitor extends ReadyResource { if (!this.entry || this.closing) return if (!isWithinRange(index, this.entry)) return + if (!stats.startTime) stats.startTime = Date.now() + if (!this.speed) this.speed = speedometer() stats.peersCount = from.replicator.peers.length stats.blocks++ stats.monitoringBytes += bytes stats.totalBytes += bytes + stats.speed = this.speed(bytes) + stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2)) - this._calculateStats(stats) this.emit('update') } - - _calculateStats (stats) { - if (!stats.startTime) stats.startTime = Date.now() - stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2)) - const timeElapsed = (Date.now() - stats.startTime) / 1000 - if (timeElapsed > 0) { - stats.speed = Math.floor(stats.monitoringBytes / timeElapsed) // bytes/sec - } - } } function isWithinRange (index, entry) { diff --git a/package.json b/package.json index 193e241..0ae8eb3 100644 --- a/package.json +++ b/package.json @@ -30,6 +30,7 @@ "mirror-drive": "^1.2.0", "ready-resource": "^1.0.0", "safety-catch": "^1.0.2", + "speedometer": "^1.1.0", "streamx": "^2.12.4", "sub-encoder": "^2.1.1", "unix-path-resolve": "^1.0.2" From 2733b4ff98a8c77b261e268d0c833640150ce35c Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Thu, 5 Sep 2024 08:55:30 +0300 Subject: [PATCH 08/12] expose upload/download speed methods --- lib/monitor.js | 18 +++++++++++++++--- test.js | 4 +++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index 377a3b2..bf3ceae 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -9,7 +9,6 @@ module.exports = class Monitor extends ReadyResource { this.blobs = null this.name = opts.name this.entry = opts.entry - this.speed = null this._boundOnAppend = debounce(this._onAppend.bind(this)) this._boundOnUpload = this._onUpload.bind(this) @@ -31,6 +30,9 @@ module.exports = class Monitor extends ReadyResource { // Updated on each upload/download event this.uploadStats = { ...stats } this.downloadStats = { ...stats } + + this.uploadSpeedometer = null + this.downloadSpeedometer = null } async _open () { @@ -71,10 +73,14 @@ module.exports = class Monitor extends ReadyResource { } _onUpload (index, bytes, from) { + if (!this.uploadSpeedometer) this.uploadSpeedometer = speedometer() + this.uploadStats.speed = this.uploadSpeedometer(bytes) this._updateStats(this.uploadStats, index, bytes, from) } _onDownload (index, bytes, from) { + if (!this.downloadSpeedometer) this.downloadSpeedometer = speedometer() + this.downloadStats.speed = this.downloadSpeedometer(bytes) this._updateStats(this.downloadStats, index, bytes, from) } @@ -83,16 +89,22 @@ module.exports = class Monitor extends ReadyResource { if (!isWithinRange(index, this.entry)) return if (!stats.startTime) stats.startTime = Date.now() - if (!this.speed) this.speed = speedometer() stats.peersCount = from.replicator.peers.length stats.blocks++ stats.monitoringBytes += bytes stats.totalBytes += bytes - stats.speed = this.speed(bytes) stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2)) this.emit('update') } + + downloadSpeed () { + return this.downloadSpeedometer ? this.downloadSpeedometer() : 0 + } + + uploadSpeed () { + return this.uploadSpeedometer ? this.uploadSpeedometer() : 0 + } } function isWithinRange (index, entry) { diff --git a/test.js b/test.js index 01025a6..b748ddf 100644 --- a/test.js +++ b/test.js @@ -1563,7 +1563,7 @@ test('drive.list (recursive false) ignore', async (t) => { }) test('upload/download can be monitored', async (t) => { - t.plan(21) + t.plan(25) const { corestore, drive, swarm, mirror } = await testenv(t.teardown) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1589,6 +1589,7 @@ test('upload/download can be monitored', async (t) => { t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) t.is(monitor.uploadStats.targetBlocks, 2) t.is(monitor.uploadStats.targetBytes, bytes) + t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) t.absent(monitor.downloadStats.blocks) }) } @@ -1606,6 +1607,7 @@ test('upload/download can be monitored', async (t) => { t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) t.is(monitor.downloadStats.targetBlocks, 2) t.is(monitor.downloadStats.targetBytes, bytes) + t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) t.absent(monitor.uploadStats.blocks) }) } From 589bc24fb216e4072cc50a1fa67ca02451598712 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Thu, 5 Sep 2024 20:07:59 +0300 Subject: [PATCH 09/12] gc monitors --- index.js | 13 ++++++++++++- lib/monitor.js | 5 ++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index f2b6ee9..54be5a1 100644 --- a/index.js +++ b/index.js @@ -31,6 +31,7 @@ module.exports = class Hyperdrive extends ReadyResource { this.blobs = null this.supportsMetadata = true this.encryptionKey = opts.encryptionKey || null + this.monitors = new Set() this._active = opts.active !== false this._openingBlobs = null @@ -189,6 +190,8 @@ module.exports = class Hyperdrive extends ReadyResource { if (!this._checkout && !this._batching) { await this.corestore.close() } + + await this.closeMonitors() } async _openBlobsFromHeader (opts) { @@ -280,7 +283,15 @@ module.exports = class Hyperdrive extends ReadyResource { } monitor (name, opts = {}) { - return new Monitor(this, { name, ...opts }) + const monitor = new Monitor(this, { name, ...opts }) + this.monitors.add(monitor) + return monitor + } + + async closeMonitors () { + const closing = [] + for (const monitor of this.monitors) closing.push(monitor.close()) + await Promise.allSettled(closing) } async get (name, opts) { diff --git a/lib/monitor.js b/lib/monitor.js index bf3ceae..47cd01a 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -13,12 +13,11 @@ module.exports = class Monitor extends ReadyResource { this._boundOnAppend = debounce(this._onAppend.bind(this)) this._boundOnUpload = this._onUpload.bind(this) this._boundOnDownload = this._onDownload.bind(this) - this.drive.on('close', () => this.close()) const stats = { startTime: 0, percentage: 0, - peersCount: 0, + peers: 0, speed: 0, blocks: 0, totalBytes: 0, // local + bytes loaded during monitoring @@ -89,7 +88,7 @@ module.exports = class Monitor extends ReadyResource { if (!isWithinRange(index, this.entry)) return if (!stats.startTime) stats.startTime = Date.now() - stats.peersCount = from.replicator.peers.length + stats.peers = from.replicator.peers.length stats.blocks++ stats.monitoringBytes += bytes stats.totalBytes += bytes From f2aa6b550a40f0acd7712e17b6f75a231d4763a5 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Fri, 6 Sep 2024 13:57:31 +0300 Subject: [PATCH 10/12] Remove monitor from the Set on close --- lib/monitor.js | 1 + test.js | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/lib/monitor.js b/lib/monitor.js index 47cd01a..4eee778 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -50,6 +50,7 @@ module.exports = class Monitor extends ReadyResource { this.blobs.core.off('append', this._boundOnAppend) this.blobs.core.off('upload', this._boundOnUpload) this.blobs.core.off('download', this._boundOnDownload) + this.drive.monitors.delete(this) } async _onAppend () { diff --git a/test.js b/test.js index b748ddf..cb4d860 100644 --- a/test.js +++ b/test.js @@ -1615,6 +1615,15 @@ test('upload/download can be monitored', async (t) => { await mirror.drive.get(file) }) +test('monitor is removed from the Set on close', async (t) => { + const { drive } = await testenv(t.teardown) + const monitor = drive.monitor('/example.md') + await monitor.ready() + t.is(drive.monitors.size, 1) + await monitor.close() + t.is(drive.monitors.size, 0) +}) + async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready() From ca57bc6fa4a9174eea82771e8c799c7b2251edf9 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Mon, 9 Sep 2024 08:11:08 +0300 Subject: [PATCH 11/12] Resolve comments --- lib/monitor.js | 26 +++++++++++--------------- package.json | 1 - test.js | 7 ++++--- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/lib/monitor.js b/lib/monitor.js index 4eee778..7721325 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -1,16 +1,15 @@ const ReadyResource = require('ready-resource') +const safetyCatch = require('safety-catch') const speedometer = require('speedometer') -const debounce = require('debounceify') module.exports = class Monitor extends ReadyResource { constructor (drive, opts = {}) { super() this.drive = drive this.blobs = null - this.name = opts.name - this.entry = opts.entry + this.name = opts.name || null + this.entry = opts.entry || null - this._boundOnAppend = debounce(this._onAppend.bind(this)) this._boundOnUpload = this._onUpload.bind(this) this._boundOnDownload = this._onDownload.bind(this) @@ -32,34 +31,27 @@ module.exports = class Monitor extends ReadyResource { this.uploadSpeedometer = null this.downloadSpeedometer = null + + this.ready().catch(safetyCatch) } async _open () { await this.drive.ready() this.blobs = await this.drive.getBlobs() - this.entry = await this.drive.entry(this.name) + if (!this.entry && this.name) this.entry = await this.drive.entry(this.name) if (this.entry) this._setEntryInfo() // Handlers - this.blobs.core.on('append', this._boundOnAppend) this.blobs.core.on('upload', this._boundOnUpload) this.blobs.core.on('download', this._boundOnDownload) } async _close () { - this.blobs.core.off('append', this._boundOnAppend) this.blobs.core.off('upload', this._boundOnUpload) this.blobs.core.off('download', this._boundOnDownload) this.drive.monitors.delete(this) } - async _onAppend () { - if (this.entry) return - await new Promise(resolve => setImmediate(resolve)) - this.entry = await this.drive.entry(this.name) - if (this.entry) this._setEntryInfo() - } - _setEntryInfo () { if (!this.downloadStats.targetBytes || !this.downloadStats.targetBlocks) { this.downloadStats.targetBytes = this.entry.value.blob.byteLength @@ -93,7 +85,7 @@ module.exports = class Monitor extends ReadyResource { stats.blocks++ stats.monitoringBytes += bytes stats.totalBytes += bytes - stats.percentage = Number(((stats.totalBytes / stats.targetBytes) * 100).toFixed(2)) + stats.percentage = toFixed(stats.totalBytes / stats.targetBytes * 100) this.emit('update') } @@ -112,3 +104,7 @@ function isWithinRange (index, entry) { const { blockOffset, blockLength } = entry.value.blob return index >= blockOffset && index < blockOffset + blockLength } + +function toFixed (n) { + return Math.round(n * 100) / 100 +} diff --git a/package.json b/package.json index 0ae8eb3..01e508a 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,6 @@ }, "homepage": "https://github.com/holepunchto/hyperdrive#readme", "dependencies": { - "debounceify": "^1.1.0", "hyperbee": "^2.11.1", "hyperblobs": "^2.3.0", "hypercore": "^10.33.0", diff --git a/test.js b/test.js index cb4d860..38bcfc0 100644 --- a/test.js +++ b/test.js @@ -1563,7 +1563,7 @@ test('drive.list (recursive false) ignore', async (t) => { }) test('upload/download can be monitored', async (t) => { - t.plan(25) + t.plan(27) const { corestore, drive, swarm, mirror } = await testenv(t.teardown) swarm.on('connection', (conn) => corestore.replicate(conn)) swarm.join(drive.discoveryKey, { server: true, client: false }) @@ -1576,6 +1576,7 @@ test('upload/download can be monitored', async (t) => { const file = '/example.md' const bytes = 1024 * 100 // big enough to trigger more than one update event const buffer = Buffer.alloc(bytes, '0') + await drive.put(file, buffer) { // Start monitoring upload @@ -1590,12 +1591,11 @@ test('upload/download can be monitored', async (t) => { t.is(monitor.uploadStats.targetBlocks, 2) t.is(monitor.uploadStats.targetBytes, bytes) t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) + if (!expectedBlocks.length) t.is(monitor.uploadStats.percentage, 100) t.absent(monitor.downloadStats.blocks) }) } - await drive.put(file, buffer) - { // Start monitoring download const monitor = mirror.drive.monitor(file) @@ -1608,6 +1608,7 @@ test('upload/download can be monitored', async (t) => { t.is(monitor.downloadStats.targetBlocks, 2) t.is(monitor.downloadStats.targetBytes, bytes) t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) + if (!expectedBlocks.length) t.is(monitor.downloadStats.percentage, 100) t.absent(monitor.uploadStats.blocks) }) } From 26e68a139e424fc7f5d745bc66acda1fd4eb97b2 Mon Sep 17 00:00:00 2001 From: MKPLKN <15703708+MKPLKN@users.noreply.github.com> Date: Mon, 9 Sep 2024 09:54:43 +0300 Subject: [PATCH 12/12] Add a note --- lib/monitor.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/monitor.js b/lib/monitor.js index 7721325..c41d30d 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -85,7 +85,8 @@ module.exports = class Monitor extends ReadyResource { stats.blocks++ stats.monitoringBytes += bytes stats.totalBytes += bytes - stats.percentage = toFixed(stats.totalBytes / stats.targetBytes * 100) + // NOTE: you should not rely on the percentage until the monitor is initialized with the local state of the file + stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100) this.emit('update') }