From aac9f44d95b1f50d620d62689288ce476ad28b49 Mon Sep 17 00:00:00 2001 From: Meet Mangukiya Date: Wed, 3 Jul 2019 08:02:10 +0530 Subject: [PATCH] Implement permessage-deflate extension --- lib/WebSocketClient.js | 18 ++- lib/WebSocketConnection.js | 292 ++++++++++++++++++++++++------------- package.json | 2 + 3 files changed, 204 insertions(+), 108 deletions(-) diff --git a/lib/WebSocketClient.js b/lib/WebSocketClient.js index fb7572b3..590e2682 100644 --- a/lib/WebSocketClient.js +++ b/lib/WebSocketClient.js @@ -24,6 +24,9 @@ var url = require('url'); var crypto = require('crypto'); var WebSocketConnection = require('./WebSocketConnection'); var bufferAllocUnsafe = utils.bufferAllocUnsafe; +var Extensions = require('websocket-extensions'); +var deflate = require('permessage-deflate'); + var protocolSeparators = [ '(', ')', '<', '>', '@', @@ -39,6 +42,7 @@ function WebSocketClient(config) { EventEmitter.call(this); // TODO: Implement extensions + this._exts = new Extensions(); this.config = { // 1MiB max frame size. @@ -103,7 +107,7 @@ function WebSocketClient(config) { } this._req = null; - + switch (this.config.webSocketVersion) { case 8: case 13: @@ -111,13 +115,17 @@ function WebSocketClient(config) { default: throw new Error('Requested webSocketVersion is not supported. Allowed values are 8 and 13.'); } + + if (this.config.perMessageDeflate) { + this._exts.add(deflate); + } } util.inherits(WebSocketClient, EventEmitter); WebSocketClient.prototype.connect = function(requestUrl, protocols, origin, headers, extraRequestOptions) { var self = this; - + if (typeof(protocols) === 'string') { if (protocols.length > 0) { protocols = [protocols]; @@ -193,7 +201,8 @@ WebSocketClient.prototype.connect = function(requestUrl, protocols, origin, head 'Connection': 'Upgrade', 'Sec-WebSocket-Version': this.config.webSocketVersion.toString(10), 'Sec-WebSocket-Key': this.base64nonce, - 'Host': reqHeaders.Host || hostHeaderValue + 'Host': reqHeaders.Host || hostHeaderValue, + 'Sec-WebSocket-Extensions': this._exts.generateOffer(), }); if (this.protocols.length > 0) { @@ -257,6 +266,7 @@ WebSocketClient.prototype.connect = function(requestUrl, protocols, origin, head req.removeListener('error', handleRequestError); self.socket = socket; self.response = response; + self._exts.activate(response.headers['sec-websocket-extensions']); self.firstDataChunk = head; self.validateHandshake(); }); @@ -340,7 +350,7 @@ WebSocketClient.prototype.failHandshake = function(errorDescription) { }; WebSocketClient.prototype.succeedHandshake = function() { - var connection = new WebSocketConnection(this.socket, [], this.protocol, true, this.config); + var connection = new WebSocketConnection(this.socket, [], this.protocol, true, this.config, this._exts); connection.webSocketVersion = this.config.webSocketVersion; connection._addSocketEventListeners(); diff --git a/lib/WebSocketConnection.js b/lib/WebSocketConnection.js index 9f2750cc..91f5705a 100644 --- a/lib/WebSocketConnection.js +++ b/lib/WebSocketConnection.js @@ -38,14 +38,15 @@ var setImmediateImpl = ('setImmediate' in global) ? var idCounter = 0; -function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, config) { +function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, config, exts) { this._debug = utils.BufferingLogger('websocket:connection', ++idCounter); this._debug('constructor'); - + this._exts = exts; + if (this._debug.enabled) { instrumentSocketForDebugging(this, socket); } - + // Superclass Constructor EventEmitter.call(this); @@ -85,7 +86,7 @@ function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, this.currentFrame = new WebSocketFrame(this.maskBytes, this.frameHeader, this.config); this.fragmentationSize = 0; // data received so far... this.frameQueue = []; - + // Various bits of connection state this.connected = true; this.state = STATE_OPEN; @@ -132,7 +133,7 @@ function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, } this.socket.setKeepAlive(true, this.config.keepaliveInterval); } - + // The HTTP Client seems to subscribe to socket error events // and re-dispatch them in such a way that doesn't make sense // for users of our client, so we want to make sure nobody @@ -306,22 +307,38 @@ WebSocketConnection.prototype.processReceivedData = function() { } // For now since we don't support extensions, all RSV bits are illegal - if (frame.rsv1 || frame.rsv2 || frame.rsv3) { - this._debug('-- illegal rsv flag'); - process.nextTick(function() { - self.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, - 'Unsupported usage of rsv bits without negotiated extension.'); - }); - return; - } + //if (frame.rsv1 || frame.rsv2 || frame.rsv3) { + //this._debug('-- illegal rsv flag'); + //process.nextTick(function() { + //self.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + //'Unsupported usage of rsv bits without negotiated extension.'); + //}); + //return; + //} if (!this.assembleFragments) { this._debug('-- emitting frame'); process.nextTick(function() { self.emit('frame', frame); }); } + var adaptedFrame = { + final: frame.fin, + rsv1: frame.rsv1, + rsv2: frame.rsv2, + rsv3: frame.rsv3, + opcode: frame.opcode, + masked: frame.mask, + maskingKey: frame.maskingBytes, + payload: frame.binaryPayload, + }; + + if (!this._exts.validFrameRsv(adaptedFrame)) { + this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + 'Invlalid RSV bits') + } + process.nextTick(function() { self.processFrame(frame); }); - + this.currentFrame = new WebSocketFrame(this.maskBytes, this.frameHeader, this.config); // If there's data remaining, schedule additional processing, but yield @@ -474,7 +491,7 @@ WebSocketConnection.prototype.drop = function(reasonCode, description, skipClose this._debug('Emitting WebSocketConnection close event'); this.emit('close', this.closeReasonCode, this.closeDescription); } - + this._debug('Drop: destroying socket'); this.socket.destroy(); }; @@ -511,7 +528,7 @@ WebSocketConnection.prototype.handleCloseTimer = function() { WebSocketConnection.prototype.processFrame = function(frame) { this._debug('processFrame'); this._debug(' -- frame: %s', frame); - + // Any non-control opcode besides 0x00 (continuation) received in the // middle of a fragmented message is illegal. if (this.frameQueue.length !== 0 && (frame.opcode > 0x00 && frame.opcode < 0x08)) { @@ -526,11 +543,26 @@ WebSocketConnection.prototype.processFrame = function(frame) { this._debug('-- Binary Frame'); if (this.assembleFragments) { if (frame.fin) { - // Complete single-frame message received - this._debug('---- Emitting \'message\' event'); - this.emit('message', { - type: 'binary', - binaryData: frame.binaryPayload + const adaptedMessage = { + rsv1: frame.rsv1, + rsv2: frame.rsv2, + rsv3: frame.rsv3, + opcode: frame.opcode, + data: frame.binaryPayload, + }; + + this._exts.processIncomingMessage(adaptedMessage, (err, msg) => { + if (err !== null) { + this.drop(WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED, 'Extension failed'); + return; + } + + // Complete single-frame message received + this._debug('---- Emitting \'message\' event'); + this.emit('message', { + type: 'binary', + binaryData: msg.data, + }); }); } else { @@ -544,16 +576,31 @@ WebSocketConnection.prototype.processFrame = function(frame) { this._debug('-- Text Frame'); if (this.assembleFragments) { if (frame.fin) { - if (!Validation.isValidUTF8(frame.binaryPayload)) { - this.drop(WebSocketConnection.CLOSE_REASON_INVALID_DATA, - 'Invalid UTF-8 Data Received'); - return; - } - // Complete single-frame message received - this._debug('---- Emitting \'message\' event'); - this.emit('message', { - type: 'utf8', - utf8Data: frame.binaryPayload.toString('utf8') + const adaptedMessage = { + rsv1: frame.rsv1, + rsv2: frame.rsv2, + rsv3: frame.rsv3, + opcode: frame.opcode, + data: frame.binaryPayload, + }; + + this._exts.processIncomingMessage(adaptedMessage, (err, msg) => { + if (err !== null) { + this.drop(WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED, 'Extension failed'); + return; + } + + if (!Validation.isValidUTF8(msg.data)) { + this.drop(WebSocketConnection.CLOSE_REASON_INVALID_DATA, + 'Invalid UTF-8 Data Received'); + return; + } + // Complete single-frame message received + this._debug('---- Emitting \'message\' event'); + this.emit('message', { + type: 'utf8', + utf8Data: msg.data.toString('utf8') + }); }); } else { @@ -596,29 +643,46 @@ WebSocketConnection.prototype.processFrame = function(frame) { this.frameQueue = []; this.fragmentationSize = 0; - switch (opcode) { - case 0x02: // WebSocketOpcode.BINARY_FRAME - this.emit('message', { - type: 'binary', - binaryData: binaryPayload - }); - break; - case 0x01: // WebSocketOpcode.TEXT_FRAME - if (!Validation.isValidUTF8(binaryPayload)) { - this.drop(WebSocketConnection.CLOSE_REASON_INVALID_DATA, - 'Invalid UTF-8 Data Received'); - return; - } - this.emit('message', { - type: 'utf8', - utf8Data: binaryPayload.toString('utf8') - }); - break; - default: - this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, - 'Unexpected first opcode in fragmentation sequence: 0x' + opcode.toString(16)); + const adaptedMessage = { + rsv1: this.frameQueue[0].rsv1, + rsv2: this.frameQueue[0].rsv2, + rsv3: this.frameQueue[0].rsv3, + opcode: opcode, + data: binaryPayload, + }; + + + this._exts.processIncomingMessage(adaptedMessage, (err, msg) => { + if (err !== null) { + this.drop(WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED, 'Extension failed'); return; - } + } + + switch (opcode) { + case 0x02: // WebSocketOpcode.BINARY_FRAME + this.emit('message', { + type: 'binary', + binaryData: msg.data, + }); + break; + case 0x01: // WebSocketOpcode.TEXT_FRAME + if (!Validation.isValidUTF8(msg.data)) { + this.drop(WebSocketConnection.CLOSE_REASON_INVALID_DATA, + 'Invalid UTF-8 Data Received'); + return; + } + this.emit('message', { + type: 'utf8', + utf8Data: msg.data.toString('utf8') + }); + break; + default: + this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR, + 'Unexpected first opcode in fragmentation sequence: 0x' + opcode.toString(16)); + return; + } + }); + } } break; @@ -629,8 +693,8 @@ WebSocketConnection.prototype.processFrame = function(frame) { // logic to emit the ping frame: this is only done when a listener is known to exist // Expose a function allowing the user to override the default ping() behavior var cancelled = false; - var cancel = function() { - cancelled = true; + var cancel = function() { + cancelled = true; }; this.emit('ping', cancel, frame.binaryPayload); @@ -660,7 +724,7 @@ WebSocketConnection.prototype.processFrame = function(frame) { this.socket.end(); return; } - + this._debug('---- Closing handshake initiated by peer.'); // Got request from other party to close connection. // Send back acknowledgement and then hang up. @@ -683,7 +747,7 @@ WebSocketConnection.prototype.processFrame = function(frame) { this.closeReasonCode = frame.closeStatus; respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR; } - + // If there is a textual description in the close frame, extract it. if (frame.binaryPayload.length > 1) { if (!Validation.isValidUTF8(frame.binaryPayload)) { @@ -785,62 +849,82 @@ WebSocketConnection.prototype.fragmentAndSend = function(frame, cb) { throw new Error('You cannot fragment control frames.'); } - var threshold = this.config.fragmentationThreshold; - var length = frame.binaryPayload.length; + const adaptedMessage = { + data: frame.binaryPayload, + opcode: frame.opcode, + rsv1: frame.rsv1, + rsv2: frame.rsv2, + rsv3: frame.rsv3, + }; - // Send immediately if fragmentation is disabled or the message is not - // larger than the fragmentation threshold. - if (!this.config.fragmentOutgoingMessages || (frame.binaryPayload && length <= threshold)) { - frame.fin = true; - this.sendFrame(frame, cb); - return; - } - - var numFragments = Math.ceil(length / threshold); - var sentFragments = 0; - var sentCallback = function fragmentSentCallback(err) { - if (err) { - if (typeof cb === 'function') { - // pass only the first error - cb(err); - cb = null; - } + this._exts.processOutgoingMessage(adaptedMessage, (err, msg) => { + if (err !== null) { + this.drop(WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED, 'Extension failed'); return; } - ++sentFragments; - if ((sentFragments === numFragments) && (typeof cb === 'function')) { - cb(); + + frame.rsv1 = msg.rsv1; + frame.rsv2 = msg.rsv2; + frame.rsv3 = msg.rsv3; + frame.binaryPayload = msg.data; + + var threshold = this.config.fragmentationThreshold; + var length = frame.binaryPayload.length; + + // Send immediately if fragmentation is disabled or the message is not + // larger than the fragmentation threshold. + if (!this.config.fragmentOutgoingMessages || (frame.binaryPayload && length <= threshold)) { + frame.fin = true; + this.sendFrame(frame, cb); + return; } - }; - for (var i=1; i <= numFragments; i++) { - var currentFrame = new WebSocketFrame(this.maskBytes, this.frameHeader, this.config); - - // continuation opcode except for first frame. - currentFrame.opcode = (i === 1) ? frame.opcode : 0x00; - - // fin set on last frame only - currentFrame.fin = (i === numFragments); - - // length is likely to be shorter on the last fragment - var currentLength = (i === numFragments) ? length - (threshold * (i-1)) : threshold; - var sliceStart = threshold * (i-1); - - // Slice the right portion of the original payload - currentFrame.binaryPayload = frame.binaryPayload.slice(sliceStart, sliceStart + currentLength); - - this.sendFrame(currentFrame, sentCallback); - } + + var numFragments = Math.ceil(length / threshold); + var sentFragments = 0; + var sentCallback = function fragmentSentCallback(err) { + if (err) { + if (typeof cb === 'function') { + // pass only the first error + cb(err); + cb = null; + } + return; + } + ++sentFragments; + if ((sentFragments === numFragments) && (typeof cb === 'function')) { + cb(); + } + }; + for (var i=1; i <= numFragments; i++) { + var currentFrame = new WebSocketFrame(this.maskBytes, this.frameHeader, this.config); + + // continuation opcode except for first frame. + currentFrame.opcode = (i === 1) ? frame.opcode : 0x00; + + // fin set on last frame only + currentFrame.fin = (i === numFragments); + + // length is likely to be shorter on the last fragment + var currentLength = (i === numFragments) ? length - (threshold * (i-1)) : threshold; + var sliceStart = threshold * (i-1); + + // Slice the right portion of the original payload + currentFrame.binaryPayload = frame.binaryPayload.slice(sliceStart, sliceStart + currentLength); + + this.sendFrame(currentFrame, sentCallback); + } + }); }; WebSocketConnection.prototype.sendCloseFrame = function(reasonCode, description, cb) { if (typeof(reasonCode) !== 'number') { reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL; } - + this._debug('sendCloseFrame state: %s, reasonCode: %d, description: %s', this.state, reasonCode, description); - + if (this.state !== STATE_OPEN && this.state !== STATE_PEER_REQUESTED_CLOSE) { return; } - + var frame = new WebSocketFrame(this.maskBytes, this.frameHeader, this.config); frame.fin = true; frame.opcode = 0x08; // WebSocketOpcode.CONNECTION_CLOSE @@ -848,7 +932,7 @@ WebSocketConnection.prototype.sendCloseFrame = function(reasonCode, description, if (typeof(description) === 'string') { frame.binaryPayload = bufferFromString(description, 'utf8'); } - + this.sendFrame(frame, cb); this.socket.end(); }; @@ -868,13 +952,13 @@ module.exports = WebSocketConnection; function instrumentSocketForDebugging(connection, socket) { /* jshint loopfunc: true */ if (!connection._debug.enabled) { return; } - + var originalSocketEmit = socket.emit; socket.emit = function(event) { connection._debug('||| Socket Event \'%s\'', event); originalSocketEmit.apply(this, arguments); }; - + for (var key in socket) { if ('function' !== typeof(socket[key])) { continue; } if (['emit'].indexOf(key) !== -1) { continue; } diff --git a/package.json b/package.json index 96bd0136..727841bd 100644 --- a/package.json +++ b/package.json @@ -30,7 +30,9 @@ "debug": "^2.2.0", "gulp": "^4.0.2", "nan": "^2.3.3", + "permessage-deflate": "^0.1.7", "typedarray-to-buffer": "^3.1.5", + "websocket-extensions": "^0.1.3", "yaeti": "^0.0.6" }, "devDependencies": {