diff --git a/config/common.gypi b/config/common.gypi index 85489305..16f5c9fa 100644 --- a/config/common.gypi +++ b/config/common.gypi @@ -17,6 +17,7 @@ "miniz_path": "../deps/miniz", "miniz_inc_path": "../deps/miniz-inc", "node_libs_path": "../deps/node-libs", + "npm_bin_path": "../node_modules/.bin", "approxidate_path": "../deps/approxidate", "tools_path": "../tools", 'enable_ssl%': 0, diff --git a/config/libcolony.gyp b/config/libcolony.gyp index 619ba549..45f61ec5 100644 --- a/config/libcolony.gyp +++ b/config/libcolony.gyp @@ -126,6 +126,19 @@ '<(SHARED_INTERMEDIATE_DIR)/<(_target_name).c' ], 'actions': [ + { + 'action_name': 'bundle_streamplex', + 'variables': { + 'main_file': ' START", new Date()); + tls.connect({host:PROXY_HOST, port:PROXY_PORT, proxy:false, ca:(PROXY_CERT && [PROXY_CERT])}, function () { + var proxySocket = this, + tunnel = streamplex(streamplex.B_SIDE); + tunnel.pipe(proxySocket).pipe(tunnel); + proxySocket.on('error', shutdownTunnel); + proxySocket.on('close', shutdownTunnel); + proxySocket.on('error', cb); + + var idleTimeout; + tunnel.on('inactive', function () { + if (_PROXY_DBG) console.log("TUNNEL -> inactive", new Date()); + idleTimeout = setTimeout(shutdownTunnel, PROXY_IDLE); + }); + tunnel.on('active', function () { + if (_PROXY_DBG) console.log("TUNNEL -> active", new Date()); + clearTimeout(idleTimeout); + }); + + tunnel.sendMessage({token:PROXY_TOKEN}); + tunnel.once('message', function (d) { + if (_PROXY_DBG) console.log("TUNNEL: auth response?", d); + proxySocket.removeListener('error', cb); + if (!d.authed) cb(new Error("Authorization failed.")); + else cb(null, tunnel); + }); + function shutdownTunnel(e) { + if (_PROXY_DBG) console.log("TUNNEL -> STOP", new Date()); + tunnel.destroy(e); + if (this !== proxySocket) proxySocket.end(); + proxySocket.removeListener('close', shutdownTunnel); + } + }).on('error', cb); +} + +var tunnelKeeper = new events.EventEmitter(); + +tunnelKeeper.getTunnel = function (cb) { // CAUTION: syncronous callback! + if (this._tunnel) return cb(null, this._tunnel); + + var self = this; + if (!this._pending) createTunnel(function (e, tunnel) { + delete self._pending; + if (e) return self.emit('tunnel', e); + + self._tunnel = tunnel; + tunnel.on('close', function () { + self._tunnel = null; + }); + var streamProto = Object.create(ProxiedSocket.prototype); + streamProto._tunnel = tunnel; + tunnel._streamProto = streamProto; + self.emit('tunnel', null, tunnel); + }); + this._pending = true; + this.once('tunnel', cb); +}; + +var local_matchers = PROXY_LOCAL.split(' ').map(function (str) { + var parts = str.split('/'); + if (parts.length > 1) { + // IPv4 + mask + var bits = +parts[1], + mask = 0xFFFFFFFF << (32-bits) >>> 0, + base = net._ipStrToInt(parts[0]) & mask; // NOTE: left signed to match test below + return function (addr, host) { + return ((addr & mask) === base); + }; + } else if (str[0] === '.') { + // base including subdomains + str = str.slice(1); + return function (addr, host) { + var idx = host.lastIndexOf(str); + return (~idx && idx + str.length === host.length); + }; + } else return function (addr, host) { + // exact domain/address + return (host === str); + } +}); + +function protoForConnection(host, port, opts, cb) { // CAUTION: syncronous callback! + var addr = (net.isIPv4(host)) ? net._ipStrToInt(host) : null, + force_local = !PROXY_TOKEN || (opts._secure && !PROXY_TRUSTED) || (opts.proxy === false), + local = force_local || local_matchers.some(function (matcher) { return matcher(addr, host); }); + if (_PROXY_DBG) { + if (force_local) console.log( + "Forced to use local socket to \"%s\". [token: %s, secure/trusted: %s/%s, opts override: %s]", + host, Boolean(PROXY_TOKEN), Boolean(opts._secure), Boolean(PROXY_TRUSTED), (opts.proxy === false) + ); + else console.log("Proxied socket to \"%s\"? %s", host, !local); + } + if (local) cb(null, net._CC3KSocket.prototype); + else tunnelKeeper.getTunnel(function (e, tunnel) { + if (e) return cb(e); + cb(null, tunnel._streamProto); + }); +} + +/** + * ProxiedSocket + */ + +function ProxiedSocket(opts) { + if (!(this instanceof ProxiedSocket)) return new ProxiedSocket(opts); + net.Socket.call(this, opts); + this._tunnel = this._opts.tunnel; + this._setup(this._opts); +} +util.inherits(ProxiedSocket, net.Socket); + +ProxiedSocket.prototype._setup = function () { + var type = (this._secure) ? 'tls' : 'net'; + this._transport = this._tunnel.createStream(type); + + var self = this; + // TODO: it'd be great if we is-a substream instead of has-a… + this._transport.on('data', function (d) { + var more = self.push(d); + if (!more) self._transport.pause(); + }); + this._transport.on('end', function () { + self.push(null); + }); + + function reEmit(evt) { + self._transport.on(evt, function test() { + var args = Array.prototype.concat.apply([evt], arguments); + self.emit.apply(self, args); + }); + } + ['connect', 'secureConnect', 'error', 'timeout', 'close'].forEach(reEmit); +}; + +ProxiedSocket.prototype._read = function () { + this._transport.resume(); +}; +ProxiedSocket.prototype._write = function (buf, enc, cb) { + this._transport.write(buf, enc, cb); +}; + +ProxiedSocket.prototype._connect = function (port, host) { + this.remotePort = port; + this.remoteAddress = host; + this._transport.remoteEmit('_pls_connect', port, host); +}; + +ProxiedSocket.prototype.setTimeout = function (msecs, cb) { + this._transport.remoteEmit('_pls_timeout', msecs); + if (cb) { + if (msecs) this.once('timeout', cb); + else this.removeListener('timeout', cb); + } +}; + +ProxiedSocket.prototype.destroy = function () { + this._transport.destroy(); + this.end(); +}; + +exports._protoForConnection = protoForConnection; diff --git a/src/colony/modules/net.js b/src/colony/modules/net.js index e14cbfa4..e7e618c8 100644 --- a/src/colony/modules/net.js +++ b/src/colony/modules/net.js @@ -14,8 +14,7 @@ var tm = process.binding('tm'); var util = require('util'); var dns = require('dns'); -var Stream = require('stream'); -var tls = require('tls'); +var stream = require('stream'); /** * ip/helpers @@ -30,6 +29,13 @@ function isIP (host) { else return 0; } +function ipStrToInt(ip) { + var addr = ip.split('.').map(Number); + addr = ((addr[0] << 24) | (addr[1] << 16) | (addr[2] << 8) | addr[3]) >>> 0; + return addr; +} + + function isPipeName(s) { return util.isString(s) && toNumber(s) === false; } @@ -37,24 +43,88 @@ function isPipeName(s) { function toNumber(x) { return (x = Number(x)) >= 0 ? x : false; } /** - * TCPSocket + * Socket */ -function TCPSocket (socket, _secure) { - Stream.Duplex.call(this); +function Socket(opts) { + if (!(this instanceof Socket)) return new Socket(opts); + switch (typeof opts) { + case 'number': + opts = { fd: opts }; // Legacy interface. + break; + case 'undefined': + opts = {}; + break; + } + stream.Duplex.call(this, opts); - if (typeof socket === 'object') { - this.socket = socket.fd; - // TODO: respect readable/writable flags - if (socket.allowHalfOpen) console.warn("Ignoring allowHalfOpen option."); - } + this._opts = opts; + this._secure = opts._secure; + this._pendingCalls = []; +} +util.inherits(Socket, stream.Duplex); + +['_read', '_write', 'close', 'destroy', 'setTimeout', 'setNoDelay'].forEach(function (method) { + Socket.prototype[method] = function () { + this._pendingCalls.push({method:method, arguments:arguments}); + }; +}); + +Socket.prototype._doPending = function (proto) { + var self = this; + this._pendingCalls.forEach(function (call) { + proto[call.method].apply(self, call.arguments); + }); + delete this._pendingCalls; +}; + + +Socket.prototype.connect = function (opts, cb) { + // NOTE: imported here to avoid circular dependency + var net_proxied = require('_net_proxied'); + + if (typeof opts !== 'object') { + var args = normalizeConnectArgs(arguments, this._secure); + return Socket.prototype.connect.apply(this, args); + } + + if (cb && this._secure) this.once('secureConnect', cb); + else if (cb) this.once('connect', cb); + + var self = this, + port = +opts.port, + host = opts.host || "127.0.0.1"; + net_proxied._protoForConnection(host, port, this._opts, function (e, proto) { + if (e) return self.emit('error', e); + self.__proto__ = proto; // HACK: convert to "concrete" subclass here, now that we know necessary type + self._setup(self._opts); // pass original (constructor) opts + self._doPending(proto); + self._connect(port, host, cb); + }); + + return this; +}; - this._secure = _secure; + +/** + * TCPSocket + */ + +function TCPSocket (opts) { + if (!(this instanceof TCPSocket)) return new TCPSocket(opts); + Socket.call(this, opts); + this._setup(this._opts); +} +util.inherits(TCPSocket, Socket); + + +TCPSocket.prototype._setup = function (opts) { this._outgoing = []; this._sending = false; this._queueEnd = false; - this.socket = (socket === undefined) ? null : socket; - + // TODO: this.socket should be this._socket — it is not public! + this.socket = (this._opts.fd === undefined) ? null : this._opts.fd; + var self = this; self.on('finish', function () { @@ -72,9 +142,9 @@ function TCPSocket (socket, _secure) { } } process.on('tcp-close', this._closehandler) -} +}; + -util.inherits(TCPSocket, Stream.Duplex); TCPSocket._portsUsed = Object.create(null); @@ -88,52 +158,14 @@ TCPSocket._requestPort = function (port) { return port; }; -function normalizeConnectArgs(args) { - var options = {}; - - if (util.isObject(args[0])) { - // connect(options, [cb]) - options = args[0]; - } else if (isPipeName(args[0])) { - // connect(path, [cb]); - options.path = args[0]; - } else { - // connect(port, [host], [cb]) - options.port = args[0]; - if (util.isString(args[1])) { - options.host = args[1]; - } - } - - var cb = args[args.length - 1]; - return util.isFunction(cb) ? [options, cb] : [options]; -} - -TCPSocket.prototype.connect = function (/*options | [port], [host], [cb]*/) { +TCPSocket.prototype._connect = function (port, host) { var self = this; - - var args = normalizeConnectArgs(arguments); - var opts = args[0]; - if (opts.allowHalfOpen) console.warn("Ignoring allowHalfOpen option."); - var port = +opts.port; - var host = opts.host || "127.0.0.1"; - var cb = args[1]; self.remotePort = port; self.remoteAddress = host; // TODO: proper value for these? self.localPort = 0; self.localAddress = "0.0.0.0"; - - if (cb) { - if (self._secure) { - self.once('secureConnect', cb); - } - else { - self.once('connect', cb); - } - - } - + if (isIP(host)) { setUpConnection(host); } else { @@ -149,8 +181,8 @@ TCPSocket.prototype.connect = function (/*options | [port], [host], [cb]*/) { if (self.socket == null) { if (self._secure) { var custom_certs = null; - self._ssl_checkCerts = (opts.rejectUnauthorized !== false); - if (opts.ca) custom_certs = opts.ca.map(function (pem_data) { + self._ssl_checkCerts = (self._opts.rejectUnauthorized !== false); + if (self._opts.ca) custom_certs = self._opts.ca.map(function (pem_data) { // TODO: review PEM specs and axTLS needs; make more thorough if needed return Buffer(pem_data.toString().split('\n').filter(function (line) { return line && line.indexOf('-----') !== 0; @@ -179,10 +211,8 @@ TCPSocket.prototype.connect = function (/*options | [port], [host], [cb]*/) { var retries = 0; setImmediate(function doConnect() { - var addr = ip.split('.').map(Number); - addr = ((addr[0] << 24) | (addr[1] << 16) | (addr[2] << 8) | addr[3]) >>> 0; - - var ret = tm.tcp_connect(self.socket, addr, port); + var addr = ipStrToInt(ip), + ret = tm.tcp_connect(self.socket, addr, port); if (ret == -tm.ENETUNREACH) { // we're not connected to the internet self.emit('error', new Error("ENETUNREACH: Wifi is not connected")); @@ -284,6 +314,7 @@ TCPSocket.prototype.connect = function (/*options | [port], [host], [cb]*/) { } }; + var tls = require('tls'); // NOTE: imported here to avoid circular dependency! if (self._ssl_checkCerts && !tls.checkServerIdentity(host, self._ssl_cert)) { return self.emit('error', new Error('Hostname/IP doesn\'t match certificate\'s altnames')); } @@ -532,19 +563,49 @@ TCPSocket.prototype.setNoDelay = function (val) { if (val) console.warn("Ignoring call to setNoDelay. TCP_NODELAY socket option not supported."); }; -function connect (port, host, callback) { - var client = new TCPSocket(null); - TCPSocket.prototype.connect.apply(client, arguments); - return client; -}; -// HACK: this is a quick solution to the regressions introduced by 5fb859605b183b70b246328bff24f4e4f8b50dab -// a more complete solution is implemented in a different PR: c015017492980271fa583fce57d798de26a12dab -function _secureConnect (options, callback) { - var client = new TCPSocket(null, true); - TCPSocket.prototype.connect.apply(client, arguments); - return client; -}; +function normalizeConnectArgs(args, _secure) { + var options = {}; + + if (util.isObject(args[0])) { + // connect(options, [cb]) + options = args[0]; + } else if (isPipeName(args[0])) { + // connect(path, [cb]); + options.path = args[0]; + } else { + // connect(port, [host], [cb]) + options.port = args[0]; + if (util.isString(args[1])) { + options.host = args[1]; + } + } + + if (_secure) { + var listArgs = args; + if (util.isObject(listArgs[1])) { + options = util._extend(options, listArgs[1]); + } else if (util.isObject(listArgs[2])) { + options = util._extend(options, listArgs[2]); + } + } + + var cb = args[args.length - 1]; + return util.isFunction(cb) ? [options, cb] : [options]; +} + +function connect () { + var args = normalizeConnectArgs(arguments); + var s = new Socket(args[0]); + return Socket.prototype.connect.apply(s, args); +} + +function secureConnect () { + var args = normalizeConnectArgs(arguments, true); + args[0]._secure = true; + var s = new Socket(args[0]); + return Socket.prototype.connect.apply(s, args); +} /** @@ -603,7 +664,7 @@ TCPServer.prototype.listen = function (port, host, backlog, cb) { , port = _[2]; if (client >= 0) { - var clientsocket = new TCPSocket(client); + var clientsocket = new TCPSocket({fd:client}); clientsocket.connected = true; clientsocket.localAddress = self.localAddress; // TODO: https://forums.tessel.io/t/get-ip-address-of-tessel-in-code/203 clientsocket.localPort = self.localPort; @@ -654,9 +715,10 @@ function createServer (opts, onsocket) { exports.isIP = isIP; exports.isIPv4 = isIPv4; +exports._ipStrToInt = ipStrToInt; exports.connect = exports.createConnection = connect; -exports._secureConnect = _secureConnect; +exports._secureConnect = secureConnect; // TLS module uses this exports.createServer = createServer; -exports.Socket = TCPSocket; +exports.Socket = Socket; exports.Server = TCPServer; -exports._normalizeConnectArgs = normalizeConnectArgs; +exports._CC3KSocket = TCPSocket; // net-proxied module uses this diff --git a/src/colony/modules/tls.js b/src/colony/modules/tls.js index 291c518f..4f67d2d5 100644 --- a/src/colony/modules/tls.js +++ b/src/colony/modules/tls.js @@ -14,25 +14,8 @@ function NotImplementedException () { throw new Error('Not yet implemented.'); } -exports.connect = function connect () { - var arguments = normalizeConnectArgs(arguments); - var options = arguments[0]; - var callback = arguments[1]; - return net._secureConnect(options, callback); -} - -function normalizeConnectArgs(listArgs) { - var args = net._normalizeConnectArgs(listArgs); - var options = args[0]; - var cb = args[1]; - if (util.isObject(listArgs[1])) { - options = util._extend(options, listArgs[1]); - } else if (util.isObject(listArgs[2])) { - options = util._extend(options, listArgs[2]); - } - - return (cb) ? [options, cb] : [options]; -} +if (!net._secureConnect) throw Error("Circular import detected!"); +exports.connect = net._secureConnect; function checkServerIdentity (host, cert) { // Create regexp to much hostnames diff --git a/test/suite/http-proxied.js b/test/suite/http-proxied.js new file mode 100644 index 00000000..63825b89 --- /dev/null +++ b/test/suite/http-proxied.js @@ -0,0 +1,2 @@ +require("../wrap").setupProxy(); +require("./http.js"); diff --git a/test/suite/https-proxied.js b/test/suite/https-proxied.js new file mode 100644 index 00000000..dd2d7b9b --- /dev/null +++ b/test/suite/https-proxied.js @@ -0,0 +1,3 @@ +require("../wrap").setupProxy(); +process.env.PROXY_TRUSTED = true; +require("./https.js"); diff --git a/test/suite/https.js b/test/suite/https.js index 6d4419d7..a02c2d2d 100644 --- a/test/suite/https.js +++ b/test/suite/https.js @@ -3,7 +3,6 @@ var test = require('tinytap'), test('https', function (t) { var https = require('https') - console.log('imported'); https.get("https://tessel-httpbin.herokuapp.com/", function (res) { t.equal(res.statusCode, 200, 'https status code is 200') t.equal(res.connection.remotePort, 443, 'remote port is 443 for https') diff --git a/test/suite/net-proxied.js b/test/suite/net-proxied.js new file mode 100644 index 00000000..cb60c25e --- /dev/null +++ b/test/suite/net-proxied.js @@ -0,0 +1,2 @@ +require("../wrap").setupProxy(); +require("./net.js"); diff --git a/test/suite/net.js b/test/suite/net.js index 454cf654..ebc70b41 100644 --- a/test/suite/net.js +++ b/test/suite/net.js @@ -77,7 +77,7 @@ test('client-basic', function (t) { t.ok(client instanceof net.Socket, "returned socket"); client.on('connect', function () { t.pass("socket connected"); - client.write("GET / HTTP/1.1\nHost: tessel-httpbin.herokuapp.com\nAccept: text/plain\n\n"); + client.write("GET /ip HTTP/1.1\nHost: tessel-httpbin.herokuapp.com\nConnection: close\nAccept: text/plain\n\n"); }); client.on('error', function () { t.fail("socket error"); @@ -89,7 +89,7 @@ test('client-basic', function (t) { client.end(); }); client.on('end', function () { - t.ok(true, "socket closed"); + t.pass("socket closed"); t.end(); }); }); diff --git a/test/suite/tls-proxied.js b/test/suite/tls-proxied.js new file mode 100644 index 00000000..7a5dbc59 --- /dev/null +++ b/test/suite/tls-proxied.js @@ -0,0 +1,3 @@ +require("../wrap").setupProxy(); +process.env.PROXY_TRUSTED = true; +require("./tls.js"); diff --git a/test/suite/tls.js b/test/suite/tls.js index e3f4869f..50a554d7 100644 --- a/test/suite/tls.js +++ b/test/suite/tls.js @@ -10,17 +10,15 @@ var options = { }; var socket = tls.connect(options, function connected() { - tap.eq(true, true, 'connect callback is called'); + tap.ok(true, 'connect callback is called'); }); socket.once('secureConnect', function() { - tap.eq(true, true, 'secureConnect event is called'); + tap.ok(true, 'secureConnect event is called'); socket.write('GET / HTTP/1.1\nAccept: */*\nHost: www.google.com\nUser-Agent: HTTPie/0.7.2\n\n'); }); socket.once('data', function(data) { - tap.eq(data.length > 0, true, 'we got data back from google over a secure TCP socket'); + tap.ok(data.length > 0, 'we got data back from google over a secure TCP socket'); socket.destroy(); }); - - diff --git a/test/wrap.js b/test/wrap.js new file mode 100644 index 00000000..c050c2c1 --- /dev/null +++ b/test/wrap.js @@ -0,0 +1,27 @@ +// helper used to help rerun net/http/etc. tests under proxy + +exports.setupProxy = function () { + if (process.env.TM_API_KEY || process.env.PROXY_TOKEN) throw Error("Misconfiguration: neither TM_API_KEY nor PROXY_TOKEN should be set while running tests."); + + process.env.PROXY_IDLE = 1e3; // so tests don't stare at their toes for a minute and a half… + + // TODO: remove this dev code once env vars support merged and production proxy back available + process.env.PROXY_HOST = "localhost"; + process.env.PROXY_PORT = 5005; + process.env.PROXY_TOKEN = "DEV-CRED"; + try { + process.env.PROXY_CERT = require('fs').readFileSync("../proxy/config/public-cert.pem").toString(); + } catch (e) { + // HACK: `make test` runs under different `process.cwd()` than when trying an individual test directly… + process.env.PROXY_CERT = require('fs').readFileSync("../../../proxy/config/public-cert.pem").toString(); + } + //process.env._PROXY_DBG = true; + return; + + if (!process.env.TEST_TM_API_KEY) { + console.warn("Cannot run proxied version of tests unless TEST_TM_API_KEY environment variable is set."); + process.exit(0); + } + + process.env.PROXY_TOKEN = process.env.TEST_TM_API_KEY; +};