Skip to content

Commit

Permalink
Add set_level and status to bridge (RobotWebTools#135)
Browse files Browse the repository at this point in the history
Prior to this PR, the bridge returned "set_level" in response to requests - this doesn't match the protocol spec which claims set_level is a request to set the reporting level, not a response.

This PR makes the bridge return actual status responses, and suppresses status messages below a level configurable with the set_level message. I also updated the error cases to forward the actual messages instead of just using debug(), and added a command-line flag to set the starting status level (for testing and/or other scripting).

Fixes RobotWebTools#5
  • Loading branch information
smartin015 authored Mar 22, 2020
1 parent cfe23e7 commit 59163ba
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 30 deletions.
3 changes: 2 additions & 1 deletion bin/rosbridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const pkg = require('../package.json');
app
.version(pkg.version)
.option('-p, --port [port_number]', 'Listen port, default to :9090')
.option('-a, --address [address_string]', 'Address')
.option('-a, --address [address_string]', 'Remote server address (client mode); server mode if unset')
.option('-r, --retry_startup_delay [delay_ms]', 'Retry startup delay in millisecond')
.option('-o, --fragment_timeout [timeout_ms]', 'Fragment timeout in millisecond')
.option('-d, --delay_between_messages [delay_ms]', 'Delay between messages in millisecond')
Expand All @@ -32,6 +32,7 @@ app
.option('-s, --services_glob [glob_list]', 'A list or None')
.option('-g, --params_glob [glob_list]', 'A list or None')
.option('-b, --bson_only_mode', 'Unsupported in WebSocket server, will be ignored')
.option('-l, --status_level [level_string]', 'Status level (one of "error", "warning", "info", "none"; default "error")')
.parse(process.argv);

rosbridge.createServer(app);
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function createServer(options) {
}

const makeBridge = (ws) => {
let bridge = new Bridge(node, ws);
let bridge = new Bridge(node, ws, options.status_level);
bridgeMap.set(bridge.bridgeId, bridge);

bridge.on('error', (error) => {
Expand Down
66 changes: 41 additions & 25 deletions lib/bridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const EventEmitter = require('events');
const uuidv4 = require('uuid/v4');
const {validator} = require('rclnodejs');

const STATUS_LEVELS = ['error', 'warning', 'info', 'none'];

class MessageParser {
constructor() {
this._buffer = '';
Expand Down Expand Up @@ -65,7 +67,8 @@ class MessageParser {
}

class Bridge extends EventEmitter {
constructor(node, ws) {

constructor(node, ws, statusLevel) {
super();
this._ws = ws;
this._parser = new MessageParser();
Expand All @@ -76,6 +79,7 @@ class Bridge extends EventEmitter {
this._registerConnectionEvent(ws);
this._rebuildOpMap();
this._topicsPublished = new Map();
this._setStatusLevel(statusLevel || 'error');
debug(`Web bridge ${this._bridgeId} is created`);
}

Expand Down Expand Up @@ -152,11 +156,17 @@ class Bridge extends EventEmitter {
}

_rebuildOpMap() {
this._registerOpMap('set_level', (command) => {
if (STATUS_LEVELS.indexOf(command.level) === -1) {
throw new Error(`Invalid status level ${command.level}; must be one of ${STATUS_LEVELS}`);
}
this._setStatusLevel(command.level);
});

this._registerOpMap('advertise', (command) => {
let topic = command.topic;
if (this._topicsPublished.has(topic) && (this._topicsPublished.get(topic) !== command.type)) {
debug(`The topic ${topic} already exists with a different type ${this._topicsPublished.get(topic)}.`);
throw new Error();
throw new Error(`The topic ${topic} already exists with a different type ${this._topicsPublished.get(topic)}.`);
}
debug(`advertise a topic: ${topic}`);
this._topicsPublished.set(topic, command.type);
Expand All @@ -168,8 +178,7 @@ class Bridge extends EventEmitter {
this._validateTopicOrService(command.topic);

if (!this._topicsPublished.has(topic)) {
debug(`The topic ${topic} does not exist.`);
let error = new Error();
let error = new Error(`The topic ${topic} does not exist`);
error.level = 'warning';
throw error;
}
Expand Down Expand Up @@ -200,8 +209,7 @@ class Bridge extends EventEmitter {
this._validateTopicOrService(topic);

if (!this._resourceProvider.hasSubscription(topic)) {
debug(`The topic ${topic} does not exist.`);
let error = new Error();
let error = new Error(`The topic ${topic} does not exist.`);
error.level = 'warning';
throw error;
}
Expand Down Expand Up @@ -252,8 +260,7 @@ class Bridge extends EventEmitter {
this._validateTopicOrService(serviceName);

if (!this._resourceProvider.hasService(serviceName)) {
debug(`The service ${serviceName} does not exist.`);
let error = new Error();
let error = new Error(`The service ${serviceName} does not exist.`);
error.level = 'warning';
throw error;
}
Expand All @@ -265,18 +272,15 @@ class Bridge extends EventEmitter {
executeCommand(command) {
try {
const op = this._opMap[command.op];
if (op) {
op.apply(this, [command]);
this._sendBackOperationStatus();
} else {
debug(`Operation ${command.op} is not supported.`);
this._sendBackOperationStatus({id: command.id, op: command.op});
if (!op) {
throw new Error(`Operation ${command.op} is not supported`);
}
op.apply(this, [command]);
this._sendBackOperationStatus(command.id, 'none', 'OK');
} catch (e) {
debug(`Exception caught in Bridge.executeCommand(): ${e}`);
e.id = command.id;
e.op = command.op;
this._sendBackOperationStatus(e);
this._sendBackErrorStatus(e);
}
}

Expand All @@ -286,19 +290,31 @@ class Bridge extends EventEmitter {
this._ws.send(JSON.stringify(response));
}

_sendBackOperationStatus(error) {
let command;
if (error) {
error.level = error.level || 'error';
command = {op: 'set_level', id: error.id, level: error.level};
debug(`Error: ${error} happened when executing command ${error.op}`);
} else {
command = {op: 'set_level', level: 'none'};
_sendBackErrorStatus(error) {
const msg = `${error.op}: ${error}`;
return this._sendBackOperationStatus(error.id, error.level || 'error', msg);
}

_sendBackOperationStatus(id, level, msg) {
let command = {
op: 'status',
level: level || 'none',
msg: msg || '',
id: id,
};
if (this._statusLevel < STATUS_LEVELS.indexOf(level)) {
debug('Suppressed: ' + JSON.stringify(command));
return;
}
debug('Response: ' + JSON.stringify(command));
this._ws.send(JSON.stringify(command));
}

_setStatusLevel(level) {
this._statusLevel = STATUS_LEVELS.indexOf(level);
debug(`Status level set to ${level} (${this._statusLevel})`);
}

_validateTopicOrService(name) {
if (name.startsWith('/')) {
validator.validateFullTopicName(name);
Expand Down
8 changes: 6 additions & 2 deletions test/nodejs/protocol/entry.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ var rosbridge = path.resolve(__dirname, '../../../bin/rosbridge.js');

describe('Rosbridge v2.0 protocol testing', function() {
var webSocketServer;
this.timeout(60 * 1000);
this.timeout(5 * 1000);

before(function(done) {
webSocketServer = child.fork(rosbridge, {silent: true});
webSocketServer = child.fork(rosbridge, ['-l', 'none'], {silent: true});
webSocketServer.stdout.on('data', function(data) {
done();
});
Expand Down Expand Up @@ -83,6 +83,10 @@ describe('Rosbridge v2.0 protocol testing', function() {
require('./test-unadvertise-service.js')();
});

describe('set_level operation', function() {
require('./test-set-level.js')();
});

// Disable this case temporarily, sine it gets stuck on Windows CI.
// describe('response operations', function() {
// require('./test-response-op.js')();
Expand Down
3 changes: 2 additions & 1 deletion test/nodejs/protocol/test-call-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,10 @@ module.exports = function() {
ws.send(JSON.stringify(testData.callServiceMsg));
});
ws.on('message', function(data) {
console.log(data);
let response = JSON.parse(data);

if (response.op === 'set_level') {
if (response.op === 'status') {
assert.deepStrictEqual(response.level, testData.opStatus);
ws.close();
done();
Expand Down
105 changes: 105 additions & 0 deletions test/nodejs/protocol/test-set-level.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) 2017 Intel Corporation. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const assert = require('assert');
const WebSocket = require('ws');

module.exports = function() {
let testCasesData = [
{
title: 'set_level to error',
ops: [
{
payload: {op: 'set_level', id: 'id1', level: 'error'},
status: null
}
],
},
{
title: 'set_level to warning',
ops: [
{
payload: {op: 'set_level', id: 'id1', level: 'warning'},
status: null
}
],
},
{
title: 'set_level to info',
ops: [
{
payload: {op: 'set_level', id: 'id1', level: 'info'},
status: null
}
],
},
{
title: 'set_level to none',
ops: [
{
payload: {op: 'set_level', id: 'id1', level: 'none'},
status: 'none'
}
],
},
{
title: 'set_level to invalid',
ops: [
{
payload: {op: 'set_level', id: 'id1', level: 'invalid'},
status: 'error'
}
],
},
];

testCasesData.forEach((testData, index) => {
it(testData.title, function() {
return new Promise((resolve, reject) => {
let ws = new WebSocket('ws://127.0.0.1:9090');
let counter = 0;
let timeout = null;

function handleMessage(data) {
if (timeout !== null) {
clearTimeout(timeout);
timeout = null;
}
if (data !== null || testData.ops[counter].status !== null) {
let response = JSON.parse(data);
assert.deepStrictEqual(response.level, testData.ops[counter].status);
}

counter++;
if (counter === testData.ops.length) {
ws.close();
resolve();
} else {
ws.send(JSON.stringify(testData.ops[counter].payload));
}
}
ws.on('message', handleMessage);

ws.on('open', function() {
ws.send(JSON.stringify(testData.ops[0].payload));
if (testData.ops[0].status === null) {
timeout = setTimeout(() => handleMessage(null), 100);
}
});
});
});
});
};

0 comments on commit 59163ba

Please sign in to comment.