Skip to content

Commit

Permalink
Close problem or disconnected socket/provider asap.
Browse files Browse the repository at this point in the history
1. Avoid retry.
2. Close will be triggered in order: socket close > socket error >
heartbeat timeout.
3. Make sure we only close once.
  • Loading branch information
lsm committed Oct 23, 2015
1 parent 17184cb commit 248cb4f
Showing 1 changed file with 44 additions and 8 deletions.
52 changes: 44 additions & 8 deletions lib/rpc/axon.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,67 @@ AxonAdapter.send = function(data) {
}

AxonAdapter.connect = function(provider) {
var self = this
var name = provider.name
var endpoint = 'tcp://' + provider.address + ':' + provider.rpcPort
debug('[%s] connect to endpoint "%s"', name, endpoint)

var socket = axon.socket('req')
var closing = false

function closeSocket() {
if (!closing) {
closing = true
socket.close()
self.onProviderDisconnect(provider)
debug('[%s] provider "%s" closed', name, endpoint)
}
}

var self = this
socket.on('close', function() {
debug('[%s] provider %s closed', name, endpoint)
self.onProviderDisconnect(provider)
debug('[%s] socket on close, provider "%s"', name, endpoint)
closeSocket()
})

socket.on('socket error', function(err) {
debug('[%s] socket on error [%s], provider "%s"', name, err.code, endpoint)
closeSocket()
})

socket.on('connect', function(sock) {
var closeListeners = sock.listeners('close')
sock.removeAllListeners('close')

closeListeners.unshift(function() {
debug('[%s] sock on close, provider "%s"', name, endpoint)
closeSocket()
})

closeListeners.forEach(function(listener) {
sock.on('close', listener)
})
})

socket.connect(endpoint, function() {
debug('[%s] connected to endpoint "%s"', name, endpoint)

// heartbeat
var hid
var lastPong = Date.now()

function heartbeat() {
var now = Date.now()
if (now - lastPong > 3000) {
debug('[%s] heartbeat timeout, close socket', name)
// timeout, disconnect socket
debug('[%s] heartbeat timeout, close socket provider "%s"', name, endpoint)
clearInterval(hid)
socket.close()
closeSocket()
} else {
socket.send('ping', function(msg) {
lastPong = Date.now()
// debug('[%s] client got ' + msg, name)
// debug('[%s] client got ' + msg, name)
if (msg === 'pong') {
lastPong = Date.now()
}
})
}
}
Expand All @@ -73,12 +105,16 @@ AxonAdapter.startServer = function(port, host) {

socket.on('message', function(msg, callback) {
if (msg === 'ping') {
// debug('server got ping')
callback('pong')
// debug('server got ping')
} else {
self.dispatch(msg, callback)
}
})

socket.on('connect', function(sock) {
debug('client connected from', sock._peername)
})
})

return promise
Expand Down

0 comments on commit 248cb4f

Please sign in to comment.