diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index b282f51b4e885e..1b40192d9458ba 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -833,9 +833,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // Start the flow if it hasn't been started already. if (dest.writableNeedDrain === true) { - if (state.flowing) { - pause(); - } + pause(); } else if (!state.flowing) { debug('pipe resume'); src.resume(); diff --git a/test/parallel/test-stream-pipe-deadlock.js b/test/parallel/test-stream-pipe-deadlock.js new file mode 100644 index 00000000000000..bf75445877baef --- /dev/null +++ b/test/parallel/test-stream-pipe-deadlock.js @@ -0,0 +1,27 @@ +'use strict'; + +const common = require('../common'); +const { Readable, Writable } = require('stream'); + +// https://github.com/nodejs/node/issues/48666 +(async () => { + // Prepare src that is internally ended, with buffered data pending + const src = new Readable({ read() {} }); + src.push(Buffer.alloc(100)); + src.push(null); + src.pause(); + + // Give it time to settle + await new Promise((resolve) => setImmediate(resolve)); + + const dst = new Writable({ + highWaterMark: 1000, + write(buf, enc, cb) { + process.nextTick(cb); + } + }); + + dst.write(Buffer.alloc(1000)); // Fill write buffer + dst.on('finish', common.mustCall()); + src.pipe(dst); +})().then(common.mustCall());