From e39b2e2910571eba03c1ba2f84a087e4903858c9 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Mon, 5 Feb 2024 16:12:05 +0100 Subject: [PATCH] fix: remove abortable iterator (#488) AbortableSource is slow because it races promises against every chunk causing extra async work. It's only really necessary if we're going to pass the source off to another component. Here we don't do that so it's simpler to just add a listener for the abort event and close the stream. --- package.json | 1 - src/stream.ts | 28 ++++++++++++++++++---------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index 9e4f6129..510a40c8 100644 --- a/package.json +++ b/package.json @@ -78,7 +78,6 @@ "@libp2p/peer-id": "^4.0.5", "@libp2p/pubsub": "^9.0.8", "@multiformats/multiaddr": "^12.1.14", - "abortable-iterator": "^5.0.1", "denque": "^2.1.0", "it-length-prefixed": "^9.0.4", "it-pipe": "^3.0.1", diff --git a/src/stream.ts b/src/stream.ts index 505c7897..7c9db2eb 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,4 +1,3 @@ -import { abortableSource } from 'abortable-iterator' import { encode, decode } from 'it-length-prefixed' import { pipe } from 'it-pipe' import { pushable, type Pushable } from 'it-pushable' @@ -25,8 +24,15 @@ export class OutboundStream { this.closeController = new AbortController() this.maxBufferSize = opts.maxBufferSize ?? Infinity + this.closeController.signal.addEventListener('abort', () => { + rawStream.close() + .catch(err => { + rawStream.abort(err) + }) + }) + pipe( - abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }), + this.pushable, this.rawStream ).catch(errCallback) } @@ -59,7 +65,6 @@ export class OutboundStream { this.closeController.abort() // similar to pushable.end() but clear the internal buffer await this.pushable.return() - await this.rawStream.close() } } @@ -73,17 +78,20 @@ export class InboundStream { this.rawStream = rawStream this.closeController = new AbortController() - this.source = abortableSource( - pipe(this.rawStream, (source) => decode(source, opts)), - this.closeController.signal, - { - returnOnAbort: true - } + this.closeController.signal.addEventListener('abort', () => { + rawStream.close() + .catch(err => { + rawStream.abort(err) + }) + }) + + this.source = pipe( + this.rawStream, + (source) => decode(source, opts) ) } async close (): Promise { this.closeController.abort() - await this.rawStream.close() } }