Skip to content

Commit

Permalink
Merge pull request #144 from flowforge/2483-keep-tunnel-connected
Browse files Browse the repository at this point in the history
Modify editor tunnel to reconnect to platform if connection drops
  • Loading branch information
Steve-Mcl authored Jul 14, 2023
2 parents 9a7044d + e18583c commit 7f1d883
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 72 deletions.
158 changes: 118 additions & 40 deletions lib/editor/tunnel.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,6 @@ function newWsConnection (url, /** @type {WebSocket.ClientOptions} */ options) {
return new WebSocket(url)
}

async function isWsConnectionReady (ws) {
const connection = new Promise((resolve, reject) => {
const timer = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
clearInterval(timer)
resolve(true)
} else {
reject(new Error('Connection timeout'))
}
}, 2000)
})
return connection
}

class EditorTunnel {
constructor (config, options) {
// this.client = new WebSocketClientOLD()
Expand All @@ -37,6 +23,10 @@ class EditorTunnel {
this.config = config
this.options = options || {}

// How long to wait before attempting to reconnect. Start at 500ms - back
// off if connect fails
this.reconnectDelay = 500

const forgeURL = new URL(config.forgeURL)
forgeURL.protocol = forgeURL.protocol === 'http:' ? 'ws:' : 'wss:'
this.url = forgeURL.toString()
Expand Down Expand Up @@ -64,23 +54,29 @@ class EditorTunnel {
this.close()
}
const forgeWSEndpoint = `${this.url}api/v1/devices/${this.deviceId}/editor/comms/${this.options.token}`
info(`Connecting editor tunner to ${forgeWSEndpoint}`)
info(`Connecting editor tunnel to ${forgeWSEndpoint}`)

// * Enable Device Editor (Step 8) - (device->forge:WS) Initiate WS connection (with token)
const socket = newWsConnection(forgeWSEndpoint, {
headers: {
'x-access-token': this.options.token
}
})
socket.onopen = (evt) => {
socket.on('message', async (message) => {
// a message from the editor
info('Editor tunnel connected')
// Reset reconnectDelay
this.reconnectDelay = 500
this.socket.on('message', async (message) => {
// a message coming over the tunnel from a remote editor
const request = JSON.parse(message.toString('utf-8'))
if (request.ws) {
// This is a websocket packet to proxy.
if (request.id && request.url) {
// This is the initial connect request.
// A websocket related event
if (request.id !== undefined && request.url) {
// An editor has created a new comms connection.
// Create a corresponding local connection to the
// local runtime
const localWSEndpoint = `${thisTunnel.localWSProtocol}://127.0.0.1:${thisTunnel.port}/device-editor${request.url}`
debug(`Connecting local comms to ${localWSEndpoint}`)
debug(`[${request.id}] Connecting local comms to ${localWSEndpoint}`)

const tunnelledWSClient = newWsConnection(localWSEndpoint, { rejectUnauthorized: false })
thisTunnel.wsClients[request.id] = tunnelledWSClient
Expand All @@ -93,35 +89,74 @@ class EditorTunnel {
}
}
tunnelledWSClient._id = request.id // for debugging and tracking

tunnelledWSClient.on('open', () => {
debug(`[${request.id}] Local comms connected`)
tunnelledWSClient.on('message', (data) => {
// The runtime is sending a message to an editor
const sendData = {
id: request.id,
ws: true,
body: data.toString('utf-8')
}
socket.send(JSON.stringify(sendData))
// console.log(`[${request.id}] R>E`, sendData.body)
this.socket?.send(JSON.stringify(sendData))
})
// Now the local comms is connected, send anything
// that had got queued up whilst we were getting
// connected
while (tunnelledWSClient._messageQueue.length > 0) {
tunnelledWSClient.send(tunnelledWSClient._messageQueue.shift())
}
// tunnelledWSClient.send(JSON.stringify(request)) // TODO: send the request body sent initially?
})

tunnelledWSClient.on('close', (code, reason) => {
debug(`[${request.id}] Local comms connection closed code=${code} reason=${reason}`)
// WS to local node-red has closed. Send a notification
// to the platform so it can close the proxied editor
// websocket to match
this.socket?.send(JSON.stringify({
id: request.id,
ws: true,
closed: true
}))
thisTunnel.wsClients[request.id]?.removeAllListeners()
thisTunnel.wsClients[request.id] = null
})
tunnelledWSClient.on('error', (err) => {
console.error(err)
warn(`[${request.id}] Local comms connection error`)
warn(err)
thisTunnel.wsClients[request.id]?.close(1006, err.message)
thisTunnel.wsClients[request.id] = null
})
} else if (thisTunnel.wsClients[request.id]) {
thisTunnel.wsClients[request.id].sendOrQueue(request.body)
// A message relating to an existing comms connection
if (request.closed) {
// An editor has closed its websocket - so we should
// close the corresponding local connection
debug(`[${request.id}] Closing local comms connection`)
thisTunnel.wsClients[request.id].close()
} else {
// An editor has sent a message over the websocket
// - forward over the local connection
// console.log(`[${request.id}] E>R`, request.body)
const wsClient = thisTunnel.wsClients[request.id]
let body = request.body
if (/\/comms$/.test(wsClient.url)) {
if (/^{"auth":/.test(body)) {
// This is the comms auth packet. Substitute the active
// access token
body = `{"auth":"${this.options.token}"}`
}
}
wsClient.sendOrQueue(body)
}
} else {
warn(`[${request.id}] Unexpected editor comms packet ${JSON.stringify(request, null, 4)}`)
this.close(1006, 'Non-connect packet received for unknown connection id') // 1006 = Abnormal closure
}
} else {
// An http related event
const reqHeaders = { ...request.headers }
// add bearer token to the request headers
if (thisTunnel.options.token) {
Expand All @@ -133,7 +168,7 @@ class EditorTunnel {
const fullUrl = `${thisTunnel.localProtocol}://127.0.0.1:${thisTunnel.port}/device-editor${url}`
// ↓ useful for debugging but very noisy
// console.log('Making a request to:', fullUrl, 'x-access-token:', request.method, reqHeaders['x-access-token'])
debug(`proxy [${request.method}] ${fullUrl}`)
// debug(`proxy [${request.method}] ${fullUrl}`)
const options = {
headers: reqHeaders,
method: request.method,
Expand All @@ -144,20 +179,20 @@ class EditorTunnel {
options.https = { rejectUnauthorized: true }
}
got(fullUrl, options).then(response => {
debug(`proxy [${request.method}] ${fullUrl} : sending response`)
// debug(`proxy [${request.method}] ${fullUrl} : sending response: status ${response.statusCode}`)
// send response back to the forge
socket.send(JSON.stringify({
this.socket?.send(JSON.stringify({
id: request.id,
headers: response.headers,
body: response.rawBody,
status: response.statusCode
}))
}).catch(_err => {
debug(`proxy [${request.method}] ${fullUrl} : error ${_err.toString()}`)
// debug(`proxy [${request.method}] ${fullUrl} : error ${_err.toString()}`)
// ↓ useful for debugging but noisy due to .map files
// console.log(err)
// console.log(JSON.stringify(request))
socket.send(JSON.stringify({
this.socket?.send(JSON.stringify({
id: request.id,
body: undefined,
status: 404
Expand All @@ -166,41 +201,84 @@ class EditorTunnel {
}
})
}
socket.on('close', (code, reason) => {
socket.on('close', async (code, reason) => {
// The socket connection to the platform has closed
info(`Editor tunnel closed code=${code} reason=${reason}`)
this.close(code, reason)
socket.removeAllListeners()
this.socket = null
clearTimeout(this.reconnectTimeout)

// Assume we need to be reconnecting. If this close is due
// to a request from the platform to turn off editor access,
// .close will get called
const reconnectDelay = this.reconnectDelay
// Bump the delay for next time... 500ms, 1.5s, 4.5s, 10s 10s 10s...
this.reconnectDelay = Math.min(this.reconnectDelay * 3, 10000)
this.reconnectTimeout = setTimeout(() => {
this.connect()
}, reconnectDelay)
})
socket.on('error', (err) => {
warn(`Editor tunnel error: ${err}`)
console.warn('socket.error', err)
this.close(1006, err.message) // 1006 = Abnormal Closure
})
this.socket = socket
return !!(await isWsConnectionReady(this.socket))
return !!(await this.waitForConnection())
}

close (code, reason) {
code = code || 1000
reason = reason || 'Normal Closure'
// loop through each ws client and close its connection
// loop through each local comms ws client and close its connection
Object.keys(this.wsClients || {}).forEach(c => {
this.wsClients[c]?.close()
delete this.wsClients[c]
})

// // close the tunnel connection
// if (this.connection) {
// this.connection.close()
// this.connection.removeAllListeners()
// this.connection = null
// }
// close the socket
if (this.socket) {
this.socket.close()
// Remove the event listeners so we don't trigger the reconnect
// handling
this.socket.removeAllListeners()
info('Editor tunnel closed')
}
this.socket = null
// ensure any active timers are stopped
clearInterval(this.connectionReadyInterval)
clearTimeout(this.reconnectTimeout)
}

async waitForConnection () {
return new Promise((resolve, reject) => {
const startTime = Date.now()
clearInterval(this.connectionReadyInterval)
// Poll every 2 seconds, but timeout after 10
this.connectionReadyInterval = setInterval(() => {
if (this.socket) {
if (this.socket.readyState === WebSocket.OPEN) {
clearInterval(this.connectionReadyInterval)
resolve(true)
} else if (this.socket.readyState !== WebSocket.CONNECTING || Date.now() - startTime > 10000) {
// Stop polling if readyState is CLOSING/CLOSED, or we've been
// trying to connect to over 10s
if (this.socket.readyState === WebSocket.CONNECTING) {
// Timed out - close the socket
try {
this.socket.close()
} catch (err) {
}
}
clearInterval(this.connectionReadyInterval)
resolve(false)
}
} else {
clearInterval(this.connectionReadyInterval)
resolve(false)
}
}, 2000)
})
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class MQTTClient {
this.client = mqtt.connect(brokerURL, this.brokerConfig)

this.client.on('connect', () => {
info('MQTT connected')
this.client.publish(this.statusTopic, JSON.stringify(this.agent.getState()))
})
this.client.on('close', () => { })
Expand Down
62 changes: 30 additions & 32 deletions lib/template/template-settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,50 +8,48 @@ const { existsSync, readFileSync } = require('fs')
settings.editorTheme.header = settings.editorTheme.header || {}
settings.editorTheme.header.title = settings.editorTheme.header.title || `Device: ${process.env.FF_DEVICE_NAME}`

const authCache = {}

const auth = {
type: 'credentials', // the type of the auth
tokenHeader: 'x-access-token', // the header where node-red expects to find the token
tokens: function (token) {
return new Promise(function (resolve, reject) {
// call the endpoint to validate the token
const [prefix, deviceId] = (token + '').split('_')
if (prefix !== 'ffde' || !deviceId || !token) {
resolve(null)
return
tokens: async function (token) {
const [prefix, deviceId] = (token + '').split('_')
if (prefix !== 'ffde' || !deviceId || !token) {
return
}
// Check the local cache to see if this token has been verified in the
// last 30 seconds
if (authCache[token]) {
if (Date.now() - authCache[token].ts < 30000) {
return authCache[token].result
}
got.get(`${settings.flowforge.forgeURL}/api/v1/devices/${deviceId}/editor/token`, {
}
try {
const result = await got.get(`${settings.flowforge.forgeURL}/api/v1/devices/${deviceId}/editor/token`, {
timeout: 2000,
headers: {
'x-access-token': token,
'user-agent': 'FlowForge Device Agent Node-RED admin auth'
}
}).then((res) => {
const { username, permissions } = JSON.parse(res.body)
if (username && permissions) {
resolve({ username, permissions })
return
}
resolve(null)
}).catch((err) => {
console.error(err)
resolve(null)
})
})
},
users: function (username) {
return new Promise(function (resolve) {
resolve(null)
})
const { username, permissions } = JSON.parse(result.body)
if (username && permissions) {
// Cache the successful result
authCache[token] = {
ts: Date.now(),
result: { username, permissions }
}
return { username, permissions }
}
} catch (err) {
}
},
authenticate: function (username, password) {
return new Promise(function (resolve) {
resolve(null)
})
users: async function (username) {
return null
},
default: function () {
return new Promise(function (resolve) {
resolve(null)
})
authenticate: async function (username, password) {
return null
}
}

Expand Down

0 comments on commit 7f1d883

Please sign in to comment.