diff --git a/lib/handlers/wsHandler.js b/lib/handlers/wsHandler.js new file mode 100644 index 000000000..53c0c6717 --- /dev/null +++ b/lib/handlers/wsHandler.js @@ -0,0 +1,236 @@ +'use strict'; + +const co = require('co'); +const WebSocket = require('ws'); +const logUtil = require('../log'); + +/** + * construct the request headers based on original connection, + * but delete the `sec-websocket-*` headers as they are already consumed by AnyProxy + */ +function getNoWsHeaders(headers) { + const originHeaders = Object.assign({}, headers); + + Object.keys(originHeaders).forEach((key) => { + // if the key matchs 'sec-websocket', delete it + if (/sec-websocket/ig.test(key)) { + delete originHeaders[key]; + } + }); + + delete originHeaders.connection; + delete originHeaders.upgrade; + return originHeaders; +} + +/** + * get request info from the ws client + * @param @required wsClient the ws client of WebSocket + */ +function getWsReqInfo(wsReq) { + const headers = wsReq.headers || {}; + const host = headers.host; + const hostname = host.split(':')[0]; + const port = host.split(':')[1]; + // TODO 如果是windows机器,url是不是全路径?需要对其过滤,取出 + const path = wsReq.url || '/'; + const isEncript = wsReq.connection && wsReq.connection.encrypted; + + return { + url: `${isEncript ? 'wss' : 'ws'}://${hostname}:${port}${path}`, + headers: headers, // the full headers of origin ws connection + noWsHeaders: getNoWsHeaders(headers), + secure: Boolean(isEncript), + hostname: hostname, + port: port, + path: path + }; +} + +/** + * When the source ws is closed, we need to close the target websocket. + * If the source ws is normally closed, that is, the code is reserved, we need to transfrom them + * @param {object} event CloseEvent + */ +const getCloseFromOriginEvent = (closeEvent) => { + const code = closeEvent.code || ''; + const reason = closeEvent.reason || ''; + let targetCode = ''; + let targetReason = ''; + if (code >= 1004 && code <= 1006) { + targetCode = 1000; // normal closure + targetReason = `Normally closed. The origin ws is closed at code: ${code} and reason: ${reason}`; + } else { + targetCode = code; + targetReason = reason; + } + + return { + code: targetCode, + reason: targetReason + }; +} + +/** + * get a websocket event handler + * @param @required {object} wsClient + */ +function handleWs(userRule, recorder, wsClient, wsReq) { + const self = this; + let resourceInfoId = -1; + const resourceInfo = { + wsMessages: [] // all ws messages go through AnyProxy + }; + const clientMsgQueue = []; + const serverInfo = getWsReqInfo(wsReq); + // proxy-layer websocket client + const proxyWs = new WebSocket(serverInfo.url, '', { + rejectUnauthorized: !self.dangerouslyIgnoreUnauthorized, + headers: serverInfo.noWsHeaders + }); + + if (recorder) { + Object.assign(resourceInfo, { + host: serverInfo.hostname, + method: 'WebSocket', + path: serverInfo.path, + url: serverInfo.url, + req: wsReq, + startTime: new Date().getTime() + }); + resourceInfoId = recorder.appendRecord(resourceInfo); + } + + /** + * store the messages before the proxy ws is ready + */ + const sendProxyMessage = (finalMsg) => { + const message = finalMsg.data; + if (proxyWs.readyState === 1) { + // if there still are msg queue consuming, keep it going + if (clientMsgQueue.length > 0) { + clientMsgQueue.push(message); + } else { + proxyWs.send(message); + } + } else { + clientMsgQueue.push(message); + } + }; + + /** + * consume the message in queue when the proxy ws is not ready yet + * will handle them from the first one-by-one + */ + const consumeMsgQueue = () => { + while (clientMsgQueue.length > 0) { + const message = clientMsgQueue.shift(); + proxyWs.send(message); + } + }; + + /** + * consruct a message Record from message event + * @param @required {object} finalMsg based on the MessageEvent from websockt.onmessage + * @param @required {boolean} isToServer whether the message is to or from server + */ + const recordMessage = (finalMsg, isToServer) => { + const message = { + time: Date.now(), + message: finalMsg.data, + isToServer: isToServer + }; + + // resourceInfo.wsMessages.push(message); + recorder && recorder.updateRecordWsMessage(resourceInfoId, message); + }; + + /** + * prepare messageDetail object for intercept hooks + * @param {object} messageEvent + * @returns {object} + */ + const prepareMessageDetail = (messageEvent) => { + return { + requestOptions: { + port: serverInfo.port, + hostname: serverInfo.hostname, + path: serverInfo.path, + secure: serverInfo.secure, + }, + url: serverInfo.url, + data: messageEvent.data, + }; + }; + + proxyWs.onopen = () => { + consumeMsgQueue(); + }; + + // this event is fired when the connection is build and headers is returned + proxyWs.on('upgrade', (response) => { + resourceInfo.endTime = new Date().getTime(); + const headers = response.headers; + resourceInfo.res = { //construct a self-defined res object + statusCode: response.statusCode, + headers: headers, + }; + + resourceInfo.statusCode = response.statusCode; + resourceInfo.resHeader = headers; + resourceInfo.resBody = ''; + resourceInfo.length = resourceInfo.resBody.length; + + recorder && recorder.updateRecord(resourceInfoId, resourceInfo); + }); + + proxyWs.onerror = (e) => { + // https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Status_codes + wsClient.close(1001, e.message); + proxyWs.close(1001); + }; + + proxyWs.onmessage = (event) => { + co(function *() { + const modifiedMsg = (yield userRule.beforeSendWsMessageToClient(prepareMessageDetail(event))) || {}; + const finalMsg = { + data: modifiedMsg.data || event.data, + }; + recordMessage(finalMsg, false); + wsClient.readyState === 1 && wsClient.send(finalMsg.data); + }); + }; + + proxyWs.onclose = (event) => { + logUtil.debug(`proxy ws closed with code: ${event.code} and reason: ${event.reason}`); + const targetCloseInfo = getCloseFromOriginEvent(event); + wsClient.readyState !== 3 && wsClient.close(targetCloseInfo.code, targetCloseInfo.reason); + }; + + wsClient.onmessage = (event) => { + co(function *() { + const modifiedMsg = (yield userRule.beforeSendWsMessageToServer(prepareMessageDetail(event))) || {}; + const finalMsg = { + data: modifiedMsg.data || event.data, + }; + recordMessage(finalMsg, true); + sendProxyMessage(finalMsg); + }); + }; + + wsClient.onclose = (event) => { + logUtil.debug(`original ws closed with code: ${event.code} and reason: ${event.reason}`); + const targetCloseInfo = getCloseFromOriginEvent(event); + proxyWs.readyState !== 3 && proxyWs.close(targetCloseInfo.code, targetCloseInfo.reason); + }; +} + +module.exports = function getWsHandler(userRule, recorder, wsClient, wsReq) { + try { + handleWs.call(this, userRule, recorder, wsClient, wsReq); + } catch (e) { + logUtil.debug('WebSocket Proxy Error:' + e.message); + logUtil.debug(e.stack); + console.error(e); + } +} diff --git a/lib/requestHandler.js b/lib/requestHandler.js index 012a3edbd..16ec93adc 100644 --- a/lib/requestHandler.js +++ b/lib/requestHandler.js @@ -11,10 +11,10 @@ const http = require('http'), Stream = require('stream'), logUtil = require('./log'), co = require('co'), - WebSocket = require('ws'), HttpsServerMgr = require('./httpsServerMgr'), brotliTorb = require('brotli'), - Readable = require('stream').Readable; + Readable = require('stream').Readable, + getWsHandler = require('./handlers/wsHandler'); const requestErrorHandler = require('./requestErrorHandler'); @@ -202,55 +202,6 @@ function fetchRemoteResponse(protocol, options, reqData, config) { }); } -/** -* get request info from the ws client, includes: - host - port - path - protocol ws/wss - - @param @required wsClient the ws client of WebSocket -* -*/ -function getWsReqInfo(wsReq) { - const headers = wsReq.headers || {}; - const host = headers.host; - const hostName = host.split(':')[0]; - const port = host.split(':')[1]; - - // TODO 如果是windows机器,url是不是全路径?需要对其过滤,取出 - const path = wsReq.url || '/'; - - const isEncript = wsReq.connection && wsReq.connection.encrypted; - /** - * construct the request headers based on original connection, - * but delete the `sec-websocket-*` headers as they are already consumed by AnyProxy - */ - const getNoWsHeaders = () => { - const originHeaders = Object.assign({}, headers); - const originHeaderKeys = Object.keys(originHeaders); - originHeaderKeys.forEach((key) => { - // if the key matchs 'sec-websocket', delete it - if (/sec-websocket/ig.test(key)) { - delete originHeaders[key]; - } - }); - - delete originHeaders.connection; - delete originHeaders.upgrade; - return originHeaders; - } - - - return { - headers: headers, // the full headers of origin ws connection - noWsHeaders: getNoWsHeaders(), - hostName: hostName, - port: port, - path: path, - protocol: isEncript ? 'wss' : 'ws' - }; -} /** * get a request handler for http/https server * @@ -689,162 +640,7 @@ function getConnectReqHandler(userRule, recorder, httpsServerMgr) { } } -/** -* get a websocket event handler - @param @required {object} wsClient -*/ -function getWsHandler(userRule, recorder, wsClient, wsReq) { - const self = this; - try { - let resourceInfoId = -1; - const resourceInfo = { - wsMessages: [] // all ws messages go through AnyProxy - }; - const clientMsgQueue = []; - const serverInfo = getWsReqInfo(wsReq); - const wsUrl = `${serverInfo.protocol}://${serverInfo.hostName}:${serverInfo.port}${serverInfo.path}`; - const proxyWs = new WebSocket(wsUrl, '', { - rejectUnauthorized: !self.dangerouslyIgnoreUnauthorized, - headers: serverInfo.noWsHeaders - }); - - if (recorder) { - Object.assign(resourceInfo, { - host: serverInfo.hostName, - method: 'WebSocket', - path: serverInfo.path, - url: wsUrl, - req: wsReq, - startTime: new Date().getTime() - }); - resourceInfoId = recorder.appendRecord(resourceInfo); - } - - /** - * store the messages before the proxy ws is ready - */ - const sendProxyMessage = (event) => { - const message = event.data; - if (proxyWs.readyState === 1) { - // if there still are msg queue consuming, keep it going - if (clientMsgQueue.length > 0) { - clientMsgQueue.push(message); - } else { - proxyWs.send(message); - } - } else { - clientMsgQueue.push(message); - } - } - - /** - * consume the message in queue when the proxy ws is not ready yet - * will handle them from the first one-by-one - */ - const consumeMsgQueue = () => { - while (clientMsgQueue.length > 0) { - const message = clientMsgQueue.shift(); - proxyWs.send(message); - } - } - - /** - * When the source ws is closed, we need to close the target websocket. - * If the source ws is normally closed, that is, the code is reserved, we need to transfrom them - */ - const getCloseFromOriginEvent = (event) => { - const code = event.code || ''; - const reason = event.reason || ''; - let targetCode = ''; - let targetReason = ''; - if (code >= 1004 && code <= 1006) { - targetCode = 1000; // normal closure - targetReason = `Normally closed. The origin ws is closed at code: ${code} and reason: ${reason}`; - } else { - targetCode = code; - targetReason = reason; - } - - return { - code: targetCode, - reason: targetReason - } - } - - /** - * consruct a message Record from message event - * @param @required {event} messageEvent the event from websockt.onmessage - * @param @required {boolean} isToServer whether the message is to or from server - * - */ - const recordMessage = (messageEvent, isToServer) => { - const message = { - time: Date.now(), - message: messageEvent.data, - isToServer: isToServer - }; - - // resourceInfo.wsMessages.push(message); - recorder && recorder.updateRecordWsMessage(resourceInfoId, message); - }; - - proxyWs.onopen = () => { - consumeMsgQueue(); - } - - // this event is fired when the connection is build and headers is returned - proxyWs.on('upgrade', (response) => { - resourceInfo.endTime = new Date().getTime(); - const headers = response.headers; - resourceInfo.res = { //construct a self-defined res object - statusCode: response.statusCode, - headers: headers, - }; - - resourceInfo.statusCode = response.statusCode; - resourceInfo.resHeader = headers; - resourceInfo.resBody = ''; - resourceInfo.length = resourceInfo.resBody.length; - - recorder && recorder.updateRecord(resourceInfoId, resourceInfo); - }); - - proxyWs.onerror = (e) => { - // https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Status_codes - wsClient.close(1001, e.message); - proxyWs.close(1001); - } - - proxyWs.onmessage = (event) => { - recordMessage(event, false); - wsClient.readyState === 1 && wsClient.send(event.data); - } - - proxyWs.onclose = (event) => { - logUtil.debug(`proxy ws closed with code: ${event.code} and reason: ${event.reason}`); - const targetCloseInfo = getCloseFromOriginEvent(event); - wsClient.readyState !== 3 && wsClient.close(targetCloseInfo.code, targetCloseInfo.reason); - } - - wsClient.onmessage = (event) => { - recordMessage(event, true); - sendProxyMessage(event); - } - - wsClient.onclose = (event) => { - logUtil.debug(`original ws closed with code: ${event.code} and reason: ${event.reason}`); - const targetCloseInfo = getCloseFromOriginEvent(event); - proxyWs.readyState !== 3 && proxyWs.close(targetCloseInfo.code, targetCloseInfo.reason); - } - } catch (e) { - logUtil.debug('WebSocket Proxy Error:' + e.message); - logUtil.debug(e.stack); - console.error(e); - } -} - class RequestHandler { - /** * Creates an instance of RequestHandler. * diff --git a/lib/rule_default.js b/lib/rule_default.js index b3d29e9f3..c4e2a57d6 100644 --- a/lib/rule_default.js +++ b/lib/rule_default.js @@ -78,4 +78,28 @@ module.exports = { *onClientSocketError(requestDetail, error) { return null; }, + + /** + * + * @param {object} messageDetail + * @param {string|buffer} messageDetail.data + * @param {string} messageDetail.url + * @param {object} messageDetail.requestOptions + * @returns + */ + *beforeSendWsMessageToServer(messageDetail) { + return null; + }, + + /** + * + * @param {object} messageDetail + * @param {string|buffer} messageDetail.data + * @param {string} messageDetail.url + * @param {object} messageDetail.requestOptions + * @returns + */ + *beforeSendWsMessageToClient(messageDetail) { + return null; + } }; diff --git a/test/server/server.js b/test/server/server.js index 1dff4e0f3..3ac9db5c2 100644 --- a/test/server/server.js +++ b/test/server/server.js @@ -80,8 +80,11 @@ function KoaServer() { self.requestRecordMap[key] = { headers: wsReq.headers, - body: '' + body: '', + messages: [], } + + return self.requestRecordMap[key]; }; this.start(); @@ -280,18 +283,20 @@ KoaServer.prototype.createWsServer = function (httpServer) { path: '/test/socket' }); wsServer.on('connection', (ws, wsReq) => { - const self = this; - self.logWsRequest(wsReq); - const messageObj = { + const logRecord = this.logWsRequest(wsReq); + + ws.send(JSON.stringify({ type: 'initial', content: 'default message' - }; + })); - ws.send(JSON.stringify(messageObj)); ws.on('message', message => { printLog('message from request socket: ' + message); - self.handleRecievedMessage(ws, message); + this.handleRecievedMessage(ws, message); + logRecord.messages.push(message); }); + + ws.on('error', e => console.error('error happened in websocket server', e)); }) }; @@ -322,7 +327,6 @@ KoaServer.prototype.start = function () { this.httpServer = app.listen(DEFAULT_PORT); this.createWsServer(this.httpServer); - printLog('HTTP is now listening on port :' + DEFAULT_PORT); certMgr.getCertificate('localhost', (error, keyContent, crtContent) => { @@ -336,19 +340,7 @@ KoaServer.prototype.start = function () { }, app.callback()); // create wss server - const wss = new WebSocketServer({ - server: self.httpsServer - }); - - wss.on('connection', (ws, wsReq) => { - self.logWsRequest(wsReq); - ws.on('message', (message) => { - printLog('received in wss: ' + message); - self.handleRecievedMessage(ws, message); - }); - }); - - wss.on('error', e => console.error('error happened in wss:%s', e)); + this.createWsServer(self.httpsServer); self.httpsServer.listen(HTTPS_PORT); diff --git a/test/spec_rule/rule/rule_replace_ws_message.js b/test/spec_rule/rule/rule_replace_ws_message.js new file mode 100644 index 000000000..67e785e30 --- /dev/null +++ b/test/spec_rule/rule/rule_replace_ws_message.js @@ -0,0 +1,26 @@ +module.exports = { + *summary() { + return 'The rule to replace websocket message'; + }, + + *beforeSendWsMessageToClient(requestDetail) { + const message = requestDetail.data; + try { + const messageObject = JSON.parse(message); + if (messageObject.type === 'onMessage') { + messageObject.content = 'replaced by beforeSendWsMessageToClient'; + return { + data: JSON.stringify(messageObject), + } + } + } catch (err) { /* ignore error */ } + + return null; + }, + + *beforeSendWsMessageToServer() { + return { + data: 'replaced by beforeSendWsMessageToServer', + }; + }, +}; diff --git a/test/spec_rule/rule_replace_ws_message_spec.js b/test/spec_rule/rule_replace_ws_message_spec.js new file mode 100644 index 000000000..48e292d09 --- /dev/null +++ b/test/spec_rule/rule_replace_ws_message_spec.js @@ -0,0 +1,69 @@ +const async = require('async'); +const ProxyServerUtil = require('../util/ProxyServerUtil.js'); +const TestServer = require('../server/server.js'); +const { printLog } = require('../util/CommonUtil.js'); +const { proxyWs, generateWsUrl } = require('../util/HttpUtil.js'); +const rule = require('./rule/rule_replace_ws_message'); + +describe('Rule to replace the websocket message', () => { + let testServer = null; + let proxyServer = null; + + beforeAll((done) => { + jasmine.DEFAULT_TIMEOUT_INTERVAL = 50000; + printLog('Start server for rule_replace_ws_message_spec'); + + testServer = new TestServer(); + proxyServer = ProxyServerUtil.proxyServerWithRule(rule); + + setTimeout(done, 2000); + }); + + afterAll(() => { + testServer && testServer.close(); + proxyServer && proxyServer.close(); + printLog('Close server for rule_replace_ws_message_spec'); + }); + + it('should replace websocket message from server', (done) => { + async.mapSeries([ + { scheme: 'ws', masked: false }, + { scheme: 'ws', masked: true }, + { scheme: 'wss', masked: false }, + { scheme: 'wss', masked: true }, + ], (unit, callback) => { + const url = generateWsUrl(unit.scheme, '/test/socket'); + const wsClient = proxyWs(url); + + wsClient.on('open', () => { + wsClient.send('test', unit.masked); + }); + + wsClient.on('message', (message) => { + // test beforeSendWsMessageToServer + const requestRecord = testServer.getProxyRequestRecord(url); + expect(requestRecord.messages[0]).toBe('replaced by beforeSendWsMessageToServer'); + + try { + const result = JSON.parse(message); + if (result.type === 'onMessage') { + // test beforeSendWsMessageToClient + expect(result.content).toBe('replaced by beforeSendWsMessageToClient'); + callback(); + } + } catch (err) { /* ignore error */ } + }); + + wsClient.on('error', (err) => { + printLog('Error happened in proxy websocket'); + callback(err); + }); + }, (err) => { + if (err) { + done.fail(err); + } else { + done(); + } + }); + }); +}); diff --git a/test/util/CommonUtil.js b/test/util/CommonUtil.js index 1c4b7cf0b..602f34ed5 100644 --- a/test/util/CommonUtil.js +++ b/test/util/CommonUtil.js @@ -6,7 +6,7 @@ const color = require('colorful'); function _isDeepEqual(source, target) { // if the objects are Array - if (source.constructor === Array && target.constructor === Array) { + if (Array.isArray(source) && Array.isArray(target)) { if (source.length !== target.length) { return false; }