The ProtoBuf.js JavaScript (JS) library makes it possible to create messages using Google's Protocol Buffers, which allow you to specify, in addition to messages, remote procedure call (RPC) services as well. However, it is the responsibility of the developer to come up with an actual RPC mechanism!
Hence, this package, ProtoBuf.Rpc.js, provides a corresponding RPC implementation via XMLHttpRequest (XHR) and WebSockets (WS). It is based on a request and response pattern; when using WebSockets it is also possible to have a publish and subscribe pattern, to realize subscriptions from a client to the server side.
syntax = "proto3";
package Reflector;
message AckRequest {
string timestamp = 1;
}
message AckResult {
string timestamp = 1;
}
service Service {
rpc ack(AckRequest) returns(AckResult);
}
Above you see the content of the reflector.proto specification. Here the RPC service has been named Reflector.Service, but any other designation is possible. It provides a single method ack, which takes an AckRequest and returns an AckResult. Both structures contain a single timestamp field, where the result's timestamp shall simply be a copy of the request's timestamp.
The ack method makes it possible to measure the round trip time (RTT) from the client to the server. In Node.js a client side script could look like:
let assert = require('assert');
let ProtoBuf = require('protobufjs');
ProtoBuf.Rpc = require('protobufjs-rpc');

let ReflectorFactory = ProtoBuf.loadSync('uri/for/reflector.proto'),
    Reflector = ReflectorFactory.lookup('Reflector');

let reflector_svc = new ProtoBuf.Rpc(Reflector.Service, {
    url: 'ws://localhost:8088'
});
reflector_svc.on('open', function () {
    let req = {
        timestamp: new Date().toISOString()
    };
    // the server shall echo the request's timestamp back:
    reflector_svc.ack(req, function (error, res) {
        if (error !== null) throw new Error(error);
        assert(res.timestamp);
        let ms = new Date() - new Date(res.timestamp);
        assert(ms > 0);
    });
});
syntax = "proto3";
package Listener;
message SubRequest {
string timestamp = 1;
}
message SubResult {
string timestamp = 1;
}
service Service {
rpc sub(SubRequest) returns(stream SubResult);
}
The listener.proto specification above describes an RPC service named Listener.Service. It provides a single method sub, which takes a SubRequest and returns a SubResult. Both structures again contain a single timestamp field, where the result's timestamp shall again simply be a copy of the request's timestamp.
Further, SubResult is designated as a stream, which causes the service's RPC implementation (in ProtoBuf.Rpc.js) to keep the callback handler of the sub invocation as long as the Listener.Service is around. Therefore, the server is able to push an arbitrary number of messages to the client (realizing an effective publish and subscribe pattern).
The sub method makes it possible to subscribe from the client to the server. In Node.js a client side script could look like:
let assert = require('assert');
let ProtoBuf = require('protobufjs');
ProtoBuf.Rpc = require('protobufjs-rpc');

let ListenerFactory = ProtoBuf.loadSync('uri/for/listener.proto'),
    Listener = ListenerFactory.lookup('Listener');

let listener_svc = new ProtoBuf.Rpc(Listener.Service, {
    url: 'ws://localhost:8088'
});
listener_svc.on('open', function () {
    let req = {
        timestamp: new Date().toISOString()
    };
    // the (optional) callback is commented out on purpose - see below:
    listener_svc.sub(req/*, function (error, res) {
        if (error !== null) throw new Error(error);
        assert(res.timestamp);
        let ms = new Date() - new Date(res.timestamp);
        assert(ms > 0);
    }*/);
});
The callback above is actually not required, since it is possible to register a single handler or multiple handlers via the data event:
listener_svc.on('data', function (res, method) {
    assert(res.timestamp);
});
listener_svc.on('data', function (res, method) {
    assert(method); // method.name === 'sub'
});
listener_svc.on('end', function () {
    listener_svc.off('data');
});
Note further that upon an end event any callback handlers for the data event are switched off. Wrapping up the service closes the transport layer and emits the end event:
listener_svc.end();
It is also possible to switch off a particular handler by providing the off function with a reference to the original callback (see event emitters for further information):
let callback = function (res, method) {
    // ...
};
listener_svc.on('data', callback);
listener_svc.on('end', function () {
    listener_svc.off('data', callback);
});
The Protocol Buffers leave the actual implementation of RPC services open. Accordingly, ProtoBuf.js - which this ProtoBuf.Rpc.js library is based on - does not provide a corresponding solution either. Therefore, this library attempts to fill the gap with a minimal and lightweight, yet extensible approach:
+-------------------------------------------------+
| RPC Service : Invocation of an RPC method |
+-------------------------------------------------+
| Encoding : Binary buffers (or JSON, Base64) | JavaScript Client
+-------------------------------------------------+
| Transport : WebSockets (or XMLHttpRequest) |
+-------------------------------------------------+
|v| |^|
|v| RPC Request RPC Response |^|
|v| |^|
+-------------------------------------------------+
| Transport : WebSockets (or XMLHttpRequest) |
+-------------------------------------------------+
| Encoding : Binary buffers (or JSON, Base64) | Any Server
+-------------------------------------------------+
| RPC Service : Method execution and return |
+-------------------------------------------------+
As you see the RPC invocation follows a simple request-response pattern, where the initial request is triggered by the JS client, upon which the server answers with a response. The request and response messages are defined as:
message Rpc {
    message Request {
        string name = 1;
        fixed32 id = 2;
        bytes data = 3;
    }
    message Response {
        fixed32 id = 2;
        bytes data = 3;
    }
}
The fully qualified name (FQN) of an Rpc.Request indicates which method on which service shall be executed (on the server side), for example .Reflector.Service.ack:
- We chose to use a string for name (instead of working, say, with hash values), mainly to ease debugging and logging.
- The random id number is a (temporary) unique identifier for the request, allowing the response to be delivered to the correct handler on the client side. Since it is a 32 bit integer it should be sufficient, especially because it is assumed that many - but relatively short lived - requests will be dispatched. Upon a successful response (or an error) the id is freed for re-use.
- The data bytes carry an encoding of the current request, where for example in the case of .Reflector.Service.ack it would simply be the byte representation of the timestamp (see the sketch below).
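The following is a minimal sketch of how such a wrapping could be done by hand with plain ProtoBuf.js. It is not how the library works internally, and the file name rpc.proto (assumed to hold the Rpc wrapper messages above) is hypothetical:
let protobuf = require('protobufjs');

let rpc_root = protobuf.loadSync('uri/for/rpc.proto'),
    api_root = protobuf.loadSync('uri/for/reflector.proto');
let Request = rpc_root.lookupType('Rpc.Request'),
    AckRequest = api_root.lookupType('Reflector.AckRequest');

// encode the payload first, then wrap it into an `Rpc.Request`:
let payload = AckRequest.encode({
    timestamp: new Date().toISOString()
}).finish();
let rpc_req = Request.encode({
    name: '.Reflector.Service.ack',
    id: Math.floor(Math.random() * 0xffffffff), // temporary request identifier
    data: payload
}).finish(); // binary buffer, ready to be sent over the transport
On the server side the inverse happens: the Rpc.Request is decoded, the embedded AckRequest is extracted from data, and an AckResult is wrapped into an Rpc.Response carrying the same id (see the server side sketch further below).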
This ProtoBuf.Rpc.js library offers abstractions for RPC services on the client side (Node.js and BrowserJS compatible), whereas for the server side only simple and functional examples in Node.js, Python and QT/C++ have been provided.
Clone the library with GIT:
git clone https://github.com/hsk81/protobuf-rpc-js pb-rpc.git
Invoke the installation of dependencies with NPM:
cd pb-rpc.git && npm install
Protocol Buffers usually require the special compiler protoc to create language bindings from the proto files. But since the ProtoBuf.js library is able to process these files on the fly, no such compiler is required (for JavaScript based clients or servers).
Start the server and enable console logging:
cd pb-rpc.git && npm run rpc-server.js -- -l
Start the client and enable Reflector service acknowledgments:
cd pb-rpc.git && npm run rpc-client.js -- -n1
For the next 10 seconds the client will keep invoking the corresponding functionality on the server, measuring the RTTs in milliseconds and logging them to standard output:
dT[ack]@0: 5.464066
dT[ack]@0: 3.234051
dT[ack]@0: 1.066847
dT[ack]@0: 0.651019
dT[ack]@0: 0.369384
dT[ack]@0: 0.016113
dT[ack]@0: 0.005941
dT[ack]@0: 0.027208
Here the RTTs start high at about 5.4 milliseconds, apparently due to Node.js' initial JIT optimizations, but very quickly they go down to the sub-millisecond range.
By increasing the numbers assigned to the arguments, for example by setting --n-ack=2, you can control the throughput (which adversely affects the RTT latencies). For a detailed discussion of the performance characteristics see the log/README.md file.
The ./example/client/js-www/index.html demonstrates that the ProtoBuf.Rpc.js library has browser support. But you first need to run the corresponding index.js static server (via the www-server.js NPM script) to be able to serve index.html to a browser:
cd pb-rpc.git && npm run www-server.js
Ensure that your rpc-server is still running and then open the http://localhost:8080 address: on the console the static file server should produce output similar to:
Paper Server - listening on http://localhost:8080/
[200] /
[200] /lib/dcodeIO/long.min.js
[200] /lib/dcodeIO/bytebuffer.min.js
[200] /lib/dcodeIO/protobuf.min.js
[200] /lib/dcodeIO/protobuf-rpc.min.js
[200] /protocol/api.proto
[200] /protocol/reflector.proto
The page on http://localhost:8080 should simply be empty, but opening up the console (for example via F12) and checking the output you should discover:
(index):56 [on:ack] e {timestamp: "2015-11-19T07:25:17.665Z"} e {timestamp: "2015-11-19T07:25:17.665Z"} null
So apparently, the acknowledgment performed as expected: the content of the left hand side curly brackets represents the request payload (i.e. Rpc.Request.data) and the content of the right hand side curly brackets represents the response payload (i.e. Rpc.Response.data).
Both Rpc.Request and Rpc.Response have a data field, which is a list of bytes carrying a custom message. For example, when a Reflector.AckRequest is sent and a Reflector.AckResult is received, they are packed into an Rpc.Request and an Rpc.Response:
- Rpc.Request:
+--------------------------------------------------------------------------------------+
| name=.Reflector.Service.ack|id=<fixed32>|data=<[Reflector.AckRequest:timestamp=".."]>|
+--------------------------------------------------------------------------------------+
- Rpc.Response:
+--------------------------------------------------------------------------------------+
| id=<fixed32>|data=<[Reflector.AckResult:timestamp=".."]>                             |
+--------------------------------------------------------------------------------------+
By default both the request and response messages are sent using a compact binary encoding (without any labels).
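As a rough illustration of how compact this binary encoding is compared to, say, JSON, the following sketch encodes an AckRequest from reflector.proto and compares the byte counts (assuming a 24 character ISO timestamp):
let protobuf = require('protobufjs');

let root = protobuf.loadSync('uri/for/reflector.proto'),
    AckRequest = root.lookupType('Reflector.AckRequest');

let msg = { timestamp: '2015-11-19T07:25:17.665Z' };
let buffer = AckRequest.encode(msg).finish();

console.log(buffer.length);              // 26 bytes: 1 tag + 1 length + 24 value bytes
console.log(JSON.stringify(msg).length); // 40 bytes, including field name and quotes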
As already mentioned this ProtoBuf.Rpc.js library provides abstractions for the client side only. Therefore, on the server side you are on your own - a straightforward way to process the requests would be to check them in a switch statement and then run the corresponding functionality:
// `Rpc` and `Api` are assumed to be reflection objects obtained from the
// corresponding `*.proto` files (e.g. via `ProtoBuf.loadSync`), and
// `TRANSPORT` is the server side connection used to receive and send:
TRANSPORT.onmessage = function (data) {
    let req, rpc_req = Rpc.Request.decode(data),
        res, rpc_res;

    switch (rpc_req.name) {
        case '.Reflector.Service.ack':
            req = Api.Reflector.AckRequest.decode(rpc_req.data);
            res = Api.Reflector.AckResult.encode({
                timestamp: req.timestamp
            });
            break;
        // case '.Listener.Service.sub':
        //     res = process_sub(req);
        //     break;
        default:
            throw(new Error(rpc_req.name + ': not supported'));
    }
    rpc_res = Rpc.Response.encode({
        id: rpc_req.id, data: res.finish()
    });
    //
    // `TRANSPORT.send` can be invoked multiple times in case of a
    // *publish* and *subscribe* pattern (requiring the result to be
    // designated as a **stream** in the `*.proto` specification). But
    // otherwise, a single call of `TRANSPORT.send` is enough!
    //
    TRANSPORT.send(rpc_res.finish());
};
Here TRANSPORT could for example be a WebSocket or an XMLHttpRequest, to receive and send messages. However, for a publish and subscribe pattern the TRANSPORT layer requires a WebSocket connection.
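For example, the switch statement above could be wired to a WebSocket transport roughly as follows. This is only a sketch (not part of this library) using the ws npm package; Rpc is assumed to be loaded as above, while handle (standing for the switch statement) and push_sub are hypothetical helpers:
let WebSocket = require('ws');

let wss = new WebSocket.Server({ port: 8088 });
wss.on('connection', function (ws) {
    ws.on('message', function (data) {
        let rpc_req = Rpc.Request.decode(data);
        // request and response: `handle` returns the encoded `Rpc.Response`
        // bytes, which are sent back exactly once:
        ws.send(handle(rpc_req));
        // publish and subscribe: for a **stream** result the same request
        // `id` can be re-used to keep pushing further messages, e.g.:
        // setInterval(() => ws.send(push_sub(rpc_req.id)), 1000);
    });
});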
A QT/C++ version of rpc-server with <QtWebSockets> has been implemented.
For the following to work, a QT5+ installation with qmake is required. Further, the protobuf3 package contains C++ includes (header files) and corresponding libraries, which are necessary for compilation and linkage:
cd pb-rpc.git && make build-server-cpp
Once compilation is done, you can run it with:
cd pb-rpc.git && npm run rpc-server.cpp -- -l
When you instantiate the reflector_svc service you can provide an additional transport parameter:
- For WebSocket (asynchronous):
var reflector_svc = new ProtoBuf.Rpc(Reflector.Service, {
    transport: ProtoBuf.Rpc.Transport.Ws,
    url: 'ws://localhost:8089'
});
- For XMLHttpRequest (asynchronous):
var reflector_svc = new ProtoBuf.Rpc(Reflector.Service, {
    transport: ProtoBuf.Rpc.Transport.Xhr,
    url: 'http://localhost:8088'
});
- For XMLHttpRequest (synchronous):
var reflector_svc = new ProtoBuf.Rpc(Reflector.Service, {
    transport: ProtoBuf.Rpc.Transport.Xhr.bind(null, {sync: true}),
    url: 'http://localhost:8088'
});
If the transport parameter is omitted, then by default ProtoBuf.Rpc.Transport.Ws for WebSockets will be used. It sends its requests asynchronously, whereas the ProtoBuf.Rpc.Transport.Xhr transport sends them either asynchronously or synchronously.
By providing a constructor defining the open and send functions, it is easily possible to introduce a custom transport layer:
var my_service = new ProtoBuf.Rpc(My.Service, {
    url: 'my-transport://host:port', transport: function (opts) {
        // `MyTransport` is a placeholder for your own implementation; it only
        // needs to offer `send`, `onmessage` and `onerror` semantics similar
        // to a WebSocket:
        this.open = function (url) {
            this.my_url = url;
            assert(this.my_url);
            this.my_socket = new MyTransport(url);
            assert(this.my_socket);
        };
        this.send = function (buffer, msg_callback, err_callback) {
            this.my_socket.onmessage = function (ev) {
                msg_callback(ev.data);
            };
            this.my_socket.onerror = function (err) {
                err_callback(err);
            };
            this.my_socket.send(buffer);
        };
    }
});
Any property of the transport can be accessed via my_service.transport, for example my_service.transport.my_url or my_service.transport.my_socket.
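For example, with the custom service sketched above (re-using its my_url and my_socket names):
console.log(my_service.transport.my_url);    // 'my-transport://host:port'
console.log(my_service.transport.my_socket); // the underlying MyTransport instance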
This library would not have been possible without the support of dizmo.com, a great company developing «The Interface of Things».