Skip to content

Commit

Permalink
various fixes for TCP bridging
Browse files Browse the repository at this point in the history
* flow control in tcp->amqp bridge
* improved handling of close
* handle sender_close on server tunnel correctly
  • Loading branch information
grs committed Sep 18, 2019
1 parent df71e9a commit 82828a7
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM enmasseproject/nodejs-base:6
FROM registry.access.redhat.com/ubi8/nodejs-10

RUN mkdir -p /opt/app-root/
WORKDIR /opt/app-root/
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.deployer
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM enmasseproject/nodejs-base:6
FROM registry.access.redhat.com/ubi8/nodejs-10

RUN mkdir -p /opt/app-root/
WORKDIR /opt/app-root/
Expand Down
59 changes: 34 additions & 25 deletions lib/bridges.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ var net = require('net');
var rhea = require('rhea');
var url_parse = require('url').parse;

var log = require('./log.js').logger();
var rclient = require('./request.js');
var tunnel = require('./tunnel.js');

function AmqpToHttpBridge(address, host, port) {
console.log('Created AMQP to HTTP bridge %s => %s:%s', address, host, port);
log.info('Created AMQP to HTTP bridge %s => %s:%s', address, host, port);
this.host = host;
this.port = port;
this.address = address;
Expand Down Expand Up @@ -50,7 +51,7 @@ AmqpToHttpBridge.prototype.incoming = function (context) {
headers: headers
};
var request = http.request(options, function (response) {
console.log('%s: %s', url, response.statusCode);
log.info('%s: %s', url, response.statusCode);
var message_out = {
to: context.message.reply_to,
correlation_id: context.message.correlation_id,
Expand All @@ -68,7 +69,7 @@ AmqpToHttpBridge.prototype.incoming = function (context) {
response.on('data', function (chunk) { message_out.body += chunk; });
response.on('end', function () {
context.delivery.accept();
console.log('server sending reply: %j', message_out);
log.info('server sending reply: %j', message_out);
context.connection.send(message_out);
});
});
Expand All @@ -85,7 +86,7 @@ AmqpToHttpBridge.prototype.stop = function () {
};

function HttpToAmqpBridge(port, address) {
console.log('Created HTTP to AMQP bridge %s => %s', port, address);
log.info('Created HTTP to AMQP bridge %s => %s', port, address);
this.port = port;
this.address = address;
var container = rhea.create_container({enable_sasl_external:true});
Expand All @@ -95,15 +96,15 @@ function HttpToAmqpBridge(port, address) {
//listen for http requests
this.server = http.createServer(this.request.bind(this));
this.server.listen(this.port, '0.0.0.0');
console.log('listening for http on %s', this.port);
log.info('listening for http on %s', this.port);
}

HttpToAmqpBridge.prototype.request = function (request, response) {
var self = this;
var url = url_parse(request.url);
var path = url.pathname;
var address = request.headers.host ? request.headers.host.split(':')[0] + path : url_parse(request.url);
console.log('outgoing request %s (%s)', request.headers.host, address);
log.info('outgoing request %s (%s)', request.headers.host, address);

var body = '';
request.on('data', function (data) { body += data; });
Expand All @@ -123,9 +124,9 @@ HttpToAmqpBridge.prototype.request = function (request, response) {
}
}

console.log('client sending message: %s %s', message_out.subject, message_out.to);
log.info('client sending message: %s %s', message_out.subject, message_out.to);
self.client.request(message_out).then(function (message_in) {
console.log('got reply for outbound request: %s %s', message_in.subject, message_in.to);
log.info('got reply for outbound request: %s %s', message_in.subject, message_in.to);
for (var key in message_in.application_properties) {
response.setHeader(key, message_in.application_properties[key]);
}
Expand All @@ -143,7 +144,7 @@ HttpToAmqpBridge.prototype.request = function (request, response) {
};

function AmqpToTcpBridge(address, host, port) {
console.log('Created AMQP to TCP bridge %s => %s:%s', address, host, port);
log.info('Created AMQP to TCP bridge %s => %s:%s', address, host, port);
this.address = address;
this.host = host;
this.port = port;
Expand All @@ -154,11 +155,11 @@ function AmqpToTcpBridge(address, host, port) {

this.connection.on('connection_open', this.on_connection_open.bind(this));
this.connection.on('receiver_open', this.on_receiver_open.bind(this));
this.mgmt_client = rclient.create(this.connection, '$management');
}

AmqpToTcpBridge.prototype.on_connection_open = function (context) {
//send mgmt request to create link route for address
this.mgmt_client = rclient.create(this.connection, '$management');
var props = {
'operation': 'CREATE',
'type': 'org.apache.qpid.dispatch.router.connection.linkRoute',
Expand All @@ -171,22 +172,23 @@ AmqpToTcpBridge.prototype.on_connection_open = function (context) {
var self = this;
this.mgmt_client.request({application_properties:props,body:definition}).then(function (result) {
if (result.application_properties.statusCode >= 200 && result.application_properties.statusCode < 300) {
console.info('[%s] created connection scoped link route', self.container.id);
log.info('[%s] created connection scoped link route', self.container.id);
} else {
console.error('[%s] failed to create connection scoped link route: %s [%s]', self.container.id, result.application_properties.statusDescription, result.application_properties.statusCode);
log.error('[%s] failed to create connection scoped link route: %s [%s]', self.container.id, result.application_properties.statusDescription, result.application_properties.statusCode);
}
}).catch(function (error) {
console.error('[%s] failed to create connection scoped link route: %j', self.container.id, error);
log.error('[%s] failed to create connection scoped link route: %j', self.container.id, error);
});

};

AmqpToTcpBridge.prototype.on_receiver_open = function (context) {
if (context.receiver === this.mgmt_client.receiver) return;
log.info('[%s] receiver attached', this.container.id);
var socket = net.connect({host: this.host, port: this.port});
var self = this;
socket.on('connect', function () {
console.log('[%s] socket connected to %s:%s', self.container.id, self.host, self.port);
log.info('[%s] socket connected to %s:%s', self.container.id, self.host, self.port);
tunnel.server(context.receiver, socket);
});
};
Expand All @@ -196,41 +198,48 @@ AmqpToTcpBridge.prototype.stop = function () {
};

function TcpToAmqpBridge(port, address) {
console.log('Created TCP to AMQP bridge %s => %s', port, address);
log.info('Created TCP to AMQP bridge %s => %s', port, address);
this.port = port;
this.address = address;

this.container = rhea.create_container({enable_sasl_external:true});
this.container = rhea.create_container({enable_sasl_external:true, reconnect:false});
this.container.id = process.env.HOSTNAME + '_tcp:' + this.port + '=>amqp:' + address;
this.server = net.createServer(this.incoming_connection.bind(this));
this.server.on('error', function (e) {
console.error(err);
console.error(e);
});
var id = this.container.id
var self = this;
this.server.listen(this.port, function () {
console.log('[%s] listening on %s', id, self.server.address().port);
log.info('[%s] listening on %s', id, self.server.address().port);
});
}

function socket_to_string(socket) {
return socket.remoteAddress + ':' + socket.remotePort;
}

TcpToAmqpBridge.prototype.incoming_connection = function (socket) {
log.info('[%s] socket accepted %s', this.container.id, socket_to_string(socket));
socket.pause();
var connection = this.container.connect();
var self = this;
var client = tunnel.client(connection, this.address, function (data) {
var client = tunnel.client(connection, this.address, socket_to_string(socket), function (data) {
socket.write(data);
}, function (error) {
console.log('[%s] tunnel disconnected', self.container.id);
console.log(error);
log.info('[%s] tunnel ended %s %s', self.container.id, error, socket_to_string(socket));
socket.resume();
socket.end();
}, function (ready) {
log.info('[%s] socket %s %s', self.container.id, (ready ? "resumed" : "paused"), socket_to_string(socket));
if (ready) socket.resume();
else socket.pause();
});

console.log('[%s] socket accepted', this.container.id);
socket.on('data', function (data) {
client.write({body:data});
});
var self = this;
socket.on('end', function () {
console.log('[%s] socket disconnected', self.container.id);
log.info('[%s] socket disconnected %s', self.container.id, socket_to_string(socket));
client.close();
});
};
Expand Down
Loading

0 comments on commit 82828a7

Please sign in to comment.