Skip to content

Commit

Permalink
Merge branch 'main' into chore/linting
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored Nov 28, 2023
2 parents 6b0f791 + 03f458d commit 85319b5
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 123 deletions.
17 changes: 15 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ jobs:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq
image: rabbitmq:3.12-alpine
ports:
- 5672:5672

strategy:
matrix:
node-version: [10.x, 12.x, 14.x, 16.x, 18.x]
node-version: [10.x, 12.x, 14.x, 16.x, 18.x, 20.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/

steps:
Expand All @@ -34,6 +34,19 @@ jobs:
# Install all prerequisites
- run: npm ci

# Ensure RabbitMQ is available before continuing
- run: |
n=0
while :
do
sleep 5
echo 'HELO\n\n\n\n' | nc localhost 5672 | grep AMQP
[[ $? = 0 ]] && break || ((n++))
(( n >= 5 )) && break
done
- run: echo 'HELO\n\n\n\n' | nc localhost 5672 | grep AMQP

# Run the tests
- run: make test

Expand Down
10 changes: 5 additions & 5 deletions bin/generate-defs.js
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ function decoderFn(method) {
break;
case 'longstr':
println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = buffer.slice(offset, offset + len);');
println('val = buffer.subarray(offset, offset + len);');
println('offset += len;');
break;
case 'shortstr':
Expand All @@ -505,7 +505,7 @@ function decoderFn(method) {
break;
case 'table':
println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = decodeFields(buffer.slice(offset, offset + len));');
println('val = decodeFields(buffer.subarray(offset, offset + len));');
println('offset += len;');
break;
default:
Expand Down Expand Up @@ -657,7 +657,7 @@ function encodePropsFn(props) {
// size does not include the frame header or frame end byte
println('buffer.writeUInt32BE(offset - 7, 3);');
println('buffer.writeUInt16BE(flags, 19);');
println('return buffer.slice(0, offset + 1);');
println('return buffer.subarray(0, offset + 1);');
println('}');
}

Expand Down Expand Up @@ -697,7 +697,7 @@ function decodePropsFn(props) {
break;
case 'longstr':
println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = buffer.slice(offset, offset + len);');
println('val = buffer.subarray(offset, offset + len);');
println('offset += len;');
break;
case 'shortstr':
Expand All @@ -707,7 +707,7 @@ function decodePropsFn(props) {
break;
case 'table':
println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = decodeFields(buffer.slice(offset, offset + len));');
println('val = decodeFields(buffer.subarray(offset, offset + len));');
println('offset += len;');
break;
default:
Expand Down
2 changes: 1 addition & 1 deletion lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

const util = require('util')
const defs = require('./defs');
const EventEmitter = require('events').EventEmitter;
const EventEmitter = require('events');
const BaseChannel = require('./channel').BaseChannel;
const acceptMessage = require('./channel').acceptMessage;
const Args = require('./api_args');
Expand Down
2 changes: 1 addition & 1 deletion lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var closeMsg = require('./format').closeMessage;
var inspect = require('./format').inspect;
var methodName = require('./format').methodName;
var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');
var fmt = require('util').format;
var IllegalOperationError = require('./error').IllegalOperationError;
var stackCapture = require('./error').stackCapture;
Expand Down
4 changes: 2 additions & 2 deletions lib/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ function decodeFields(slice) {
break;
case 'F':
len = slice.readUInt32BE(offset); offset += 4;
val = decodeFields(slice.slice(offset, offset + len));
val = decodeFields(slice.subarray(offset, offset + len));
offset += len;
break;
case 'A':
Expand Down Expand Up @@ -290,7 +290,7 @@ function decodeFields(slice) {
break;
case 'x':
len = slice.readUInt32BE(offset); offset += 4;
val = slice.slice(offset, offset + len);
val = slice.subarray(offset, offset + len);
offset += len;
break;
default:
Expand Down
4 changes: 2 additions & 2 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var Mux = require('./mux').Mux;
var Duplex =
require('stream').Duplex ||
require('readable-stream/duplex');
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');
var Heart = require('./heartbeat').Heart;

var methodName = require('./format').methodName;
Expand Down Expand Up @@ -538,7 +538,7 @@ class Connection extends EventEmitter {

for (var offset = 0; offset < body.length; offset += maxBody) {
var end = offset + maxBody;
var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
var slice = (end > body.length) ? body.subarray(offset) : body.subarray(offset, end);
var bodyFrame = makeBodyFrame(channel, slice);
writeResult = buffer.write(bodyFrame);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/credentials.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module.exports.amqplain = function(user, passwd) {
response: function() {
const buffer = Buffer.alloc(16384);
const size = codec.encodeTable(buffer, { LOGIN: user, PASSWORD: passwd}, 0);
return buffer.slice(4, size);
return buffer.subarray(4, size);
},
username: user,
password: passwd
Expand Down
2 changes: 1 addition & 1 deletion lib/heartbeat.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

'use strict';

var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');

// Exported so that we can mess with it in tests
module.exports.UNITS_TO_MS = 1000;
Expand Down
195 changes: 96 additions & 99 deletions lib/mux.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,117 +13,114 @@ var assert = require('assert');
var schedule = (typeof setImmediate === 'function') ?
setImmediate : process.nextTick;

function Mux(downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;

this.out = downstream;
var self = this;
downstream.on('drain', function() {
self.blocked = false;
self._readIncoming();
});
}

// There are 2 states we can be in:

// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,

// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event

// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
// Multiplexes a set of inbound readable streams onto a single
// outbound (downstream) writable stream, servicing the inbound
// streams round-robin so no one stream starves the others.
// Relies on module-level `assert` and `schedule` (setImmediate or
// process.nextTick fallback) defined at the top of this file.
class Mux {
  // downstream: a writable stream; must emit 'drain' when it regains
  // capacity after a write() has returned false.
  constructor (downstream) {
    this.newStreams = [];
    this.oldStreams = [];
    this.blocked = false;
    this.scheduledRead = false;

    this.out = downstream;
    var self = this;
    downstream.on('drain', function () {
      self.blocked = false;
      self._readIncoming();
    });
  }

  // There are 2 states we can be in:
  // - waiting for outbound capacity, which will be signalled by a
  // - 'drain' event on the downstream; or,
  // - no packets to send, waiting for an inbound buffer to have
  // packets, which will be signalled by a 'readable' event
  // If we write all packets available whenever there is outbound
  // capacity, we will either run out of outbound capacity (`#write`
  // returns false), or run out of packets (all calls to an
  // `inbound.read()` have returned null).
  _readIncoming () {

    // We may be sent here speculatively, if an incoming stream has
    // become readable
    if (this.blocked) return;

    var accepting = true;
    var out = this.out;

    // Try to read a chunk from each stream in turn, until all streams
    // are empty, or we exhaust our ability to accept chunks.
    function roundrobin (streams) {
      var s;
      while (accepting && (s = streams.shift())) {
        var chunk = s.read();
        if (chunk !== null) {
          accepting = out.write(chunk);
          streams.push(s);
        }
      }
    }

    roundrobin(this.newStreams);

    // Either we exhausted the new queues, or we ran out of capacity. If
    // we ran out of capacity, all the remaining new streams (i.e.,
    // those with packets left) become old streams. This effectively
    // prioritises streams that keep their buffers close to empty over
    // those that are constantly near full.
    if (accepting) { // all new queues are exhausted, write as many as
                     // we can from the old streams
      assert.equal(0, this.newStreams.length);
      roundrobin(this.oldStreams);
    }
    else { // ran out of room
      assert(this.newStreams.length > 0, "Expect some new streams to remain");
      Array.prototype.push.apply(this.oldStreams, this.newStreams);
      this.newStreams = [];
    }
    // We may have exhausted all the old queues, or run out of room;
    // either way, all we need to do is record whether we have capacity
    // or not, so any speculative reads will know
    this.blocked = !accepting;
  }

  // Coalesce repeated wake-ups into a single deferred _readIncoming.
  _scheduleRead () {
    var self = this;

    if (!self.scheduledRead) {
      schedule(function () {
        self.scheduledRead = false;
        self._readIncoming();
      });
      self.scheduledRead = true;
    }
  }

  // Start draining `readable` into the downstream. The stream is
  // (re-)enqueued each time it emits 'readable'; listeners are removed
  // on 'end', 'error', or a matching 'unpipeFrom'.
  pipeFrom (readable) {
    var self = this;

    function enqueue () {
      self.newStreams.push(readable);
      self._scheduleRead();
    }

    function cleanup () {
      readable.removeListener('readable', enqueue);
      readable.removeListener('error', cleanup);
      readable.removeListener('end', cleanup);
      readable.removeListener('unpipeFrom', cleanupIfMe);
    }
    function cleanupIfMe (dest) {
      // 'unpipeFrom' is broadcast on the readable; only detach if it
      // names this mux as the destination.
      if (dest === self) cleanup();
    }

    readable.on('unpipeFrom', cleanupIfMe);
    readable.on('end', cleanup);
    readable.on('error', cleanup);
    readable.on('readable', enqueue);
  }

  // Stop draining `readable`; signalled via the readable itself so the
  // matching pipeFrom closure can do the listener cleanup.
  unpipeFrom (readable) {
    readable.emit('unpipeFrom', this);
  }
}

module.exports.Mux = Mux;
Loading

0 comments on commit 85319b5

Please sign in to comment.