diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a5ad93c3..455d3c37 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -14,13 +14,13 @@ jobs:
     runs-on: ubuntu-latest
     services:
       rabbitmq:
-        image: rabbitmq
+        image: rabbitmq:3.12-alpine
         ports:
         - 5672:5672

     strategy:
       matrix:
-        node-version: [10.x, 12.x, 14.x, 16.x, 18.x]
+        node-version: [10.x, 12.x, 14.x, 16.x, 18.x, 20.x]
         # See supported Node.js release schedule at https://nodejs.org/en/about/releases/

     steps:
@@ -34,6 +34,19 @@ jobs:
       # Install all prerequisites
       - run: npm ci

+      # Ensure RabbitMQ is available before continuing
+      - run: |
+          n=0
+          while :
+          do
+            sleep 5
+            echo 'HELO\n\n\n\n' | nc localhost 5672 | grep AMQP
+            [[ $? = 0 ]] && break || ((n++))
+            (( n >= 5 )) && break
+          done
+
+      - run: echo 'HELO\n\n\n\n' | nc localhost 5672 | grep AMQP
+
       # Run the tests
       - run: make test
diff --git a/bin/generate-defs.js b/bin/generate-defs.js
index ac7c71b5..6f71e018 100644
--- a/bin/generate-defs.js
+++ b/bin/generate-defs.js
@@ -495,7 +495,7 @@ function decoderFn(method) {
       break;
     case 'longstr':
       println('len = buffer.readUInt32BE(offset); offset += 4;');
-      println('val = buffer.slice(offset, offset + len);');
+      println('val = buffer.subarray(offset, offset + len);');
       println('offset += len;');
       break;
     case 'shortstr':
@@ -505,7 +505,7 @@ function decoderFn(method) {
       break;
     case 'table':
       println('len = buffer.readUInt32BE(offset); offset += 4;');
-      println('val = decodeFields(buffer.slice(offset, offset + len));');
+      println('val = decodeFields(buffer.subarray(offset, offset + len));');
       println('offset += len;');
       break;
     default:
@@ -657,7 +657,7 @@ function encodePropsFn(props) {
   // size does not include the frame header or frame end byte
   println('buffer.writeUInt32BE(offset - 7, 3);');
   println('buffer.writeUInt16BE(flags, 19);');
-  println('return buffer.slice(0, offset + 1);');
+  println('return buffer.subarray(0, offset + 1);');
   println('}');
 }

@@ -697,7 +697,7 @@ function decodePropsFn(props) {
       break;
     case 'longstr':
       println('len = buffer.readUInt32BE(offset); offset += 4;');
-      println('val = buffer.slice(offset, offset + len);');
+      println('val = buffer.subarray(offset, offset + len);');
       println('offset += len;');
       break;
     case 'shortstr':
@@ -707,7 +707,7 @@ function decodePropsFn(props) {
       break;
     case 'table':
       println('len = buffer.readUInt32BE(offset); offset += 4;');
-      println('val = decodeFields(buffer.slice(offset, offset + len));');
+      println('val = decodeFields(buffer.subarray(offset, offset + len));');
       println('offset += len;');
       break;
     default:
diff --git a/lib/callback_model.js b/lib/callback_model.js
index 90d96a68..a5f968f6 100644
--- a/lib/callback_model.js
+++ b/lib/callback_model.js
@@ -6,7 +6,7 @@

 const util = require('util')
 const defs = require('./defs');
-const EventEmitter = require('events').EventEmitter;
+const EventEmitter = require('events');
 const BaseChannel = require('./channel').BaseChannel;
 const acceptMessage = require('./channel').acceptMessage;
 const Args = require('./api_args');
diff --git a/lib/channel.js b/lib/channel.js
index 5abe1552..1cd4643b 100644
--- a/lib/channel.js
+++ b/lib/channel.js
@@ -11,7 +11,7 @@ var closeMsg = require('./format').closeMessage;
 var inspect = require('./format').inspect;
 var methodName = require('./format').methodName;
 var assert = require('assert');
-var EventEmitter = require('events').EventEmitter;
+var EventEmitter = require('events');
 var fmt = require('util').format;
 var IllegalOperationError = require('./error').IllegalOperationError;
 var stackCapture = require('./error').stackCapture;
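For context on the `require('events')` changes in lib/callback_model.js and lib/channel.js above (and in lib/connection.js and lib/heartbeat.js further down): Node's `events` module exports the `EventEmitter` class itself, and also re-exposes it as a property of itself, so both spellings resolve to the same constructor. A minimal sketch (the `Channel` subclass is invented for illustration, it is not the library's class):

```js
// The 'events' module *is* the EventEmitter class, and it also carries
// itself as a property for backwards compatibility.
const EventEmitter = require('events');

console.log(EventEmitter === require('events').EventEmitter); // true

// Hypothetical subclass, purely to show the import works the same way.
class Channel extends EventEmitter {}

const ch = new Channel();
ch.on('ping', () => console.log('pong'));
ch.emit('ping'); // logs 'pong'
```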
diff --git a/lib/codec.js b/lib/codec.js
index 2c23ebd2..13954976 100644
--- a/lib/codec.js
+++ b/lib/codec.js
@@ -262,7 +262,7 @@ function decodeFields(slice) {
       break;
     case 'F':
       len = slice.readUInt32BE(offset); offset += 4;
-      val = decodeFields(slice.slice(offset, offset + len));
+      val = decodeFields(slice.subarray(offset, offset + len));
       offset += len;
       break;
     case 'A':
@@ -290,7 +290,7 @@ function decodeFields(slice) {
       break;
     case 'x':
       len = slice.readUInt32BE(offset); offset += 4;
-      val = slice.slice(offset, offset + len);
+      val = slice.subarray(offset, offset + len);
       offset += len;
       break;
     default:
diff --git a/lib/connection.js b/lib/connection.js
index a2435cf2..b37e3cf6 100644
--- a/lib/connection.js
+++ b/lib/connection.js
@@ -13,7 +13,7 @@ var Mux = require('./mux').Mux;
 var Duplex =
   require('stream').Duplex ||
   require('readable-stream/duplex');
-var EventEmitter = require('events').EventEmitter;
+var EventEmitter = require('events');
 var Heart = require('./heartbeat').Heart;

 var methodName = require('./format').methodName;
@@ -538,7 +538,7 @@ class Connection extends EventEmitter {

     for (var offset = 0; offset < body.length; offset += maxBody) {
       var end = offset + maxBody;
-      var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
+      var slice = (end > body.length) ? body.subarray(offset) : body.subarray(offset, end);
       var bodyFrame = makeBodyFrame(channel, slice);
       writeResult = buffer.write(bodyFrame);
     }
diff --git a/lib/credentials.js b/lib/credentials.js
index 194659a8..a9452845 100644
--- a/lib/credentials.js
+++ b/lib/credentials.js
@@ -27,7 +27,7 @@ module.exports.amqplain = function(user, passwd) {
     response: function() {
       const buffer = Buffer.alloc(16384);
       const size = codec.encodeTable(buffer, { LOGIN: user, PASSWORD: passwd}, 0);
-      return buffer.slice(4, size);
+      return buffer.subarray(4, size);
     },
     username: user,
     password: passwd
diff --git a/lib/heartbeat.js b/lib/heartbeat.js
index 9c6c2e69..e5f373ab 100644
--- a/lib/heartbeat.js
+++ b/lib/heartbeat.js
@@ -47,7 +47,7 @@

 'use strict';

-var EventEmitter = require('events').EventEmitter;
+var EventEmitter = require('events');

 // Exported so that we can mess with it in tests
 module.exports.UNITS_TO_MS = 1000;
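The lib/codec.js, lib/connection.js and lib/credentials.js hunks above (like the bin/generate-defs.js ones earlier) are a mechanical `Buffer#slice` to `Buffer#subarray` swap. `Buffer.prototype.slice` is deprecated because, unlike `Uint8Array.prototype.slice`, it returns a view over the same memory rather than a copy; `subarray` has exactly those view semantics under a less confusing name, so behaviour is unchanged. A small sketch of the equivalence, using only core Node buffers:

```js
const buf = Buffer.from('AMQP framing');

const a = buf.slice(0, 4);     // deprecated Buffer method (a view, despite the name)
const b = buf.subarray(0, 4);  // preferred replacement (also a view)

console.log(a.equals(b));      // true

buf[0] = 0x61;                 // mutate the parent buffer
console.log(b.toString());     // 'aMQP', the view sees the change
console.log(a.toString());     // 'aMQP', and so does the old-style slice
```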
diff --git a/lib/mux.js b/lib/mux.js
index 433622fe..c712cd77 100644
--- a/lib/mux.js
+++ b/lib/mux.js
@@ -13,117 +13,114 @@ var assert = require('assert');
 var schedule = (typeof setImmediate === 'function') ? setImmediate : process.nextTick;

-function Mux(downstream) {
-  this.newStreams = [];
-  this.oldStreams = [];
-  this.blocked = false;
-  this.scheduledRead = false;
-
-  this.out = downstream;
-  var self = this;
-  downstream.on('drain', function() {
-    self.blocked = false;
-    self._readIncoming();
-  });
-}
-
-// There are 2 states we can be in:
-
-// - waiting for outbound capacity, which will be signalled by a
-// - 'drain' event on the downstream; or,
-
-// - no packets to send, waiting for an inbound buffer to have
-// packets, which will be signalled by a 'readable' event
-
-// If we write all packets available whenever there is outbound
-// capacity, we will either run out of outbound capacity (`#write`
-// returns false), or run out of packets (all calls to an
-// `inbound.read()` have returned null).
+class Mux {
+  constructor (downstream) {
+    this.newStreams = [];
+    this.oldStreams = [];
+    this.blocked = false;
+    this.scheduledRead = false;
+
+    this.out = downstream;
+    var self = this;
+    downstream.on('drain', function () {
+      self.blocked = false;
+      self._readIncoming();
+    });
+  }

-Mux.prototype._readIncoming = function() {
+  // There are 2 states we can be in:
+  // - waiting for outbound capacity, which will be signalled by a
+  // - 'drain' event on the downstream; or,
+  // - no packets to send, waiting for an inbound buffer to have
+  // packets, which will be signalled by a 'readable' event
+  // If we write all packets available whenever there is outbound
+  // capacity, we will either run out of outbound capacity (`#write`
+  // returns false), or run out of packets (all calls to an
+  // `inbound.read()` have returned null).
+  _readIncoming () {
+
+    // We may be sent here speculatively, if an incoming stream has
+    // become readable
+    if (this.blocked) return;
+
+    var accepting = true;
+    var out = this.out;
+
+    // Try to read a chunk from each stream in turn, until all streams
+    // are empty, or we exhaust our ability to accept chunks.
+    function roundrobin (streams) {
+      var s;
+      while (accepting && (s = streams.shift())) {
+        var chunk = s.read();
+        if (chunk !== null) {
+          accepting = out.write(chunk);
+          streams.push(s);
+        }
+      }
+    }

-  // We may be sent here speculatively, if an incoming stream has
-  // become readable
-  if (this.blocked) return;
+    roundrobin(this.newStreams);
+
+    // Either we exhausted the new queues, or we ran out of capacity. If
+    // we ran out of capacity, all the remaining new streams (i.e.,
+    // those with packets left) become old streams. This effectively
+    // prioritises streams that keep their buffers close to empty over
+    // those that are constantly near full.
+    if (accepting) { // all new queues are exhausted, write as many as
+                     // we can from the old streams
+      assert.equal(0, this.newStreams.length);
+      roundrobin(this.oldStreams);
+    }
+    else { // ran out of room
+      assert(this.newStreams.length > 0, "Expect some new streams to remain");
+      Array.prototype.push.apply(this.oldStreams, this.newStreams);
+      this.newStreams = [];
+    }
+    // We may have exhausted all the old queues, or run out of room;
+    // either way, all we need to do is record whether we have capacity
+    // or not, so any speculative reads will know
+    this.blocked = !accepting;
+  }

-  var accepting = true;
-  var out = this.out;
+  _scheduleRead () {
+    var self = this;

-  // Try to read a chunk from each stream in turn, until all streams
-  // are empty, or we exhaust our ability to accept chunks.
-  function roundrobin(streams) {
-    var s;
-    while (accepting && (s = streams.shift())) {
-      var chunk = s.read();
-      if (chunk !== null) {
-        accepting = out.write(chunk);
-        streams.push(s);
-      }
+    if (!self.scheduledRead) {
+      schedule(function () {
+        self.scheduledRead = false;
+        self._readIncoming();
+      });
+      self.scheduledRead = true;
+    }
   }

-  roundrobin(this.newStreams);
+  pipeFrom (readable) {
+    var self = this;

-  // Either we exhausted the new queues, or we ran out of capacity. If
-  // we ran out of capacity, all the remaining new streams (i.e.,
-  // those with packets left) become old streams. This effectively
-  // prioritises streams that keep their buffers close to empty over
-  // those that are constantly near full.
-
-  if (accepting) { // all new queues are exhausted, write as many as
-                   // we can from the old streams
-    assert.equal(0, this.newStreams.length);
-    roundrobin(this.oldStreams);
-  }
-  else { // ran out of room
-    assert(this.newStreams.length > 0, "Expect some new streams to remain");
-    Array.prototype.push.apply(this.oldStreams, this.newStreams);
-    this.newStreams = [];
-  }
-  // We may have exhausted all the old queues, or run out of room;
-  // either way, all we need to do is record whether we have capacity
-  // or not, so any speculative reads will know
-  this.blocked = !accepting;
-};
-
-Mux.prototype._scheduleRead = function() {
-  var self = this;
-
-  if (!self.scheduledRead) {
-    schedule(function() {
-      self.scheduledRead = false;
-      self._readIncoming();
-    });
-    self.scheduledRead = true;
-  }
-};
+    function enqueue () {
+      self.newStreams.push(readable);
+      self._scheduleRead();
+    }

-Mux.prototype.pipeFrom = function(readable) {
-  var self = this;
+    function cleanup () {
+      readable.removeListener('readable', enqueue);
+      readable.removeListener('error', cleanup);
+      readable.removeListener('end', cleanup);
+      readable.removeListener('unpipeFrom', cleanupIfMe);
+    }
+    function cleanupIfMe (dest) {
+      if (dest === self) cleanup();
+    }

-  function enqueue() {
-    self.newStreams.push(readable);
-    self._scheduleRead();
+    readable.on('unpipeFrom', cleanupIfMe);
+    readable.on('end', cleanup);
+    readable.on('error', cleanup);
+    readable.on('readable', enqueue);
+  }

-  function cleanup() {
-    readable.removeListener('readable', enqueue);
-    readable.removeListener('error', cleanup);
-    readable.removeListener('end', cleanup);
-    readable.removeListener('unpipeFrom', cleanupIfMe);
-  }
-  function cleanupIfMe(dest) {
-    if (dest === self) cleanup();
+  unpipeFrom (readable) {
+    readable.emit('unpipeFrom', this);
   }
-
-  readable.on('unpipeFrom', cleanupIfMe);
-  readable.on('end', cleanup);
-  readable.on('error', cleanup);
-  readable.on('readable', enqueue);
-};
-
-Mux.prototype.unpipeFrom = function(readable) {
-  readable.emit('unpipeFrom', this);
-};
+}

 module.exports.Mux = Mux;
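The lib/mux.js rewrite above converts the prototype-based Mux into an ES2015 class without changing its public surface (`pipeFrom`/`unpipeFrom` and the internal round-robin read loop, which stops as soon as a downstream `write()` reports no more capacity). A rough usage sketch, with invented stream names and assuming it is run from the repository root (hence the `./lib/mux` path):

```js
const { Mux } = require('./lib/mux');
const { PassThrough, Writable } = require('stream');

// One downstream sink; the Mux round-robins chunks from its sources into it.
const downstream = new Writable({
  objectMode: true,
  write (chunk, _enc, cb) {
    console.log('downstream got', chunk);
    cb();
  }
});

const a = new PassThrough({ objectMode: true });
const b = new PassThrough({ objectMode: true });

const mux = new Mux(downstream);
mux.pipeFrom(a);
mux.pipeFrom(b);

a.write('frame from a');
b.write('frame from b'); // both chunks reach the sink, drained in turn
```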
diff --git a/test/codec.js b/test/codec.js
index 8a668c42..2acc773d 100644
--- a/test/codec.js
+++ b/test/codec.js
@@ -70,7 +70,7 @@ suite("Implicit encodings", function() {
     test(name, function() {
       var buffer = Buffer.alloc(1000);
       var size = codec.encodeTable(buffer, val, 0);
-      var result = buffer.slice(4, size);
+      var result = buffer.subarray(4, size);
       assert.deepEqual(expect, bufferToArray(result));
     });
   });
@@ -83,7 +83,7 @@ var amqp = require('./data');
 function roundtrip_table(t) {
   var buf = Buffer.alloc(4096);
   var size = codec.encodeTable(buf, t, 0);
-  var decoded = codec.decodeFields(buf.slice(4, size)); // ignore the length-prefix
+  var decoded = codec.decodeFields(buf.subarray(4, size)); // ignore the length-prefix
   try {
     assert.deepEqual(removeExplicitTypes(t), decoded);
   }
@@ -204,7 +204,7 @@ function roundtripMethod(Method) {
   return forAll(Method).satisfy(function(method) {
     var buf = defs.encodeMethod(method.id, 0, method.fields);
     // FIXME depends on framing, ugh
-    var fs1 = defs.decode(method.id, buf.slice(11, buf.length));
+    var fs1 = defs.decode(method.id, buf.subarray(11, buf.length));
     assertEqualModuloDefaults(method, fs1);
     return true;
   });
@@ -215,7 +215,7 @@ function roundtripProperties(Properties) {
     var buf = defs.encodeProperties(properties.id, 0, properties.size,
                                     properties.fields);
     // FIXME depends on framing, ugh
-    var fs1 = defs.decode(properties.id, buf.slice(19, buf.length));
+    var fs1 = defs.decode(properties.id, buf.subarray(19, buf.length));
     assert.equal(properties.size, ints.readUInt64BE(buf, 11));
     assertEqualModuloDefaults(properties, fs1);
     return true;
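In roundtrip_table above, `buf.subarray(4, size)` drops the 4-byte length prefix that precedes the encoded field table, as the inline comment says. A generic sketch of that layout using only core `Buffer` calls (the values are made up; this is not the codec API):

```js
// Length-prefixed payload: 4 bytes of big-endian length, then the bytes.
const payload = Buffer.from('field-table bytes');
const buf = Buffer.alloc(4 + payload.length);

buf.writeUInt32BE(payload.length, 0); // length prefix at offset 0
payload.copy(buf, 4);                 // payload starts at offset 4

const size = 4 + payload.length;      // total bytes written
const body = buf.subarray(4, size);   // skip the prefix, keep the payload

console.log(body.toString());         // 'field-table bytes'
```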
diff --git a/test/frame.js b/test/frame.js
index 602ca3c0..53551afc 100644
--- a/test/frame.js
+++ b/test/frame.js
@@ -38,9 +38,9 @@ suite("Explicit parsing", function() {
   test('Parse partitioned', function() {
     var input = inputs();
     var frames = new Frames(input);
-    input.write(HB.slice(0, 3));
+    input.write(HB.subarray(0, 3));
     assert(!frames.recvFrame());
-    input.write(HB.slice(3));
+    input.write(HB.subarray(3));
     assert(frames.recvFrame() === HEARTBEAT);
     assert(!frames.recvFrame());
   });
@@ -143,9 +143,9 @@ suite("Parsing", function() {
     var onethird = Math.floor(full.length / 3);
     var twothirds = 2 * onethird;
     return [
-      full.slice(0, onethird),
-      full.slice(onethird, twothirds),
-      full.slice(twothirds)
+      full.subarray(0, onethird),
+      full.subarray(onethird, twothirds),
+      full.subarray(twothirds)
     ];
   }));
 });
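The 'Parse partitioned' test above writes the heartbeat frame in two pieces to check that nothing is parsed until the whole frame has arrived. Sketched generically (an accumulate-and-consume loop with a fixed frame length, not amqplib's actual parser):

```js
// AMQP 0-9-1 heartbeat frame: type 8, channel 0, zero-length payload, 0xCE end.
const HB = Buffer.from([8, 0, 0, 0, 0, 0, 0, 0xce]);
const FRAME_LENGTH = HB.length; // fixed length only for this sketch

let pending = Buffer.alloc(0);

function onData (chunk) {
  pending = Buffer.concat([pending, chunk]);
  while (pending.length >= FRAME_LENGTH) {
    const frame = pending.subarray(0, FRAME_LENGTH);
    pending = pending.subarray(FRAME_LENGTH);
    console.log('complete frame:', frame);
  }
  // otherwise keep the partial bytes and wait for the next chunk
}

onData(HB.subarray(0, 3)); // nothing emitted yet
onData(HB.subarray(3));    // now the whole frame is available
```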