Skip to content

Commit

Permalink
feat: use protobufjs for telemetry ws sub/unsub
Browse files Browse the repository at this point in the history
- TODO: decoding commands, events, ?
  • Loading branch information
ozyx committed Dec 7, 2023
1 parent 4e145b5 commit 19f9b8b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 22 deletions.
74 changes: 57 additions & 17 deletions src/providers/messages.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {OBJECT_TYPES, DATA_TYPES, MDB_TYPE} from '../const';
import { yamcs } from 'yamcs-protobufjs-static';
import { OBJECT_TYPES, DATA_TYPES, MDB_TYPE } from '../const';
import { google, yamcs } from 'yamcs-protobufjs-static';

const { SubscribeEventsRequest } = yamcs.protobuf.events;
const { SubscribeAlarmsRequest } = yamcs.protobuf.alarms;
const { SubscribeCommandsRequest } = yamcs.protobuf.commanding;
const { SubscribeParametersRequest, SubscribeMdbChangesRequest } = yamcs.protobuf.processing;
const { ClientMessage, CancelOptions } = yamcs.api;
const { Any } = google.protobuf;

const typeMap = {
[OBJECT_TYPES.COMMANDS_OBJECT_TYPE]: DATA_TYPES.DATA_TYPE_COMMANDS,
Expand All @@ -23,9 +25,21 @@ const typeMap = {
export const SUBSCRIBE = buildSubscribeMessages();
// eslint-disable-next-line func-style
export const UNSUBSCRIBE = (subscriptionDetails) => {
return {
call: `${subscriptionDetails.call}`
};
const { call } = subscriptionDetails;
const payload = CancelOptions.create({
call
});
const arrayBuffer = CancelOptions.encode(payload).finish();
const any = Any.create({
type_url: CancelOptions.getTypeUrl(),
value: arrayBuffer
});
const clientMessage = ClientMessage.create({
type: 'cancel',
options: any
});

return ClientMessage.encode(clientMessage).finish();
};

function buildSubscribeMessages() {
Expand All @@ -34,15 +48,15 @@ function buildSubscribeMessages() {
for (const [objectType, dataType] of Object.entries(typeMap)) {

subscriptionMessages[objectType] = (subscriptionDetails) => {
const { instance, processor = "realtime", name } = subscriptionDetails;
const { subscriptionId, instance, processor = "realtime", name } = subscriptionDetails;
let payload;
let arrayBuffer;
let message;
let err;

if (isEventType(objectType)) {
message = {
instance: `${instance}`
instance
};
err = SubscribeEventsRequest.verify(message);
if (err) {
Expand All @@ -51,10 +65,14 @@ function buildSubscribeMessages() {

payload = SubscribeEventsRequest.create(message);
arrayBuffer = SubscribeEventsRequest.encode(payload).finish();
payload = Any.create({
type_url: SubscribeEventsRequest.getTypeUrl(),
value: arrayBuffer
});
} else if (isAlarmType(objectType)) {
message = {
instance: `${instance}`,
processor: `${processor}`
instance,
processor
};
err = SubscribeAlarmsRequest.verify(message);
if (err) {
Expand All @@ -63,10 +81,14 @@ function buildSubscribeMessages() {

payload = SubscribeAlarmsRequest.create(message);
arrayBuffer = SubscribeAlarmsRequest.encode(payload).finish();
payload = Any.create({
type_url: SubscribeAlarmsRequest.getTypeUrl(),
value: arrayBuffer
});
} else if (isCommandType(objectType)) {
message = {
instance: `${instance}`,
processor: `${processor}`,
instance,
processor,
ignorePastCommands: true
};
err = SubscribeCommandsRequest.verify(message);
Expand All @@ -77,10 +99,14 @@ function buildSubscribeMessages() {
payload = SubscribeCommandsRequest.create(message);
arrayBuffer = SubscribeCommandsRequest.encode(payload).finish();

payload = Any.create({
type_url: SubscribeCommandsRequest.getTypeUrl(),
value: arrayBuffer
});
} else if (isMdbChangesType(objectType)) {
message = {
instance: `${instance}`,
processor: `${processor}`
instance,
processor
};
err = SubscribeMdbChangesRequest.verify(message);
if (err) {
Expand All @@ -89,12 +115,16 @@ function buildSubscribeMessages() {

payload = SubscribeMdbChangesRequest.create(message);
arrayBuffer = SubscribeMdbChangesRequest.encode(payload).finish();
payload = Any.create({
type_url: SubscribeMdbChangesRequest.getTypeUrl(),
value: arrayBuffer
});
} else {
const id = [yamcs.protobuf.NamedObjectId.create({ name: `${name}`})];
message = {
type: `${dataType}`,
instance: `${instance}`,
processor: `${processor}`,
type: dataType,
instance,
processor,
id,
sendFromCache: true,
updateOnExpiration: true
Expand All @@ -106,9 +136,19 @@ function buildSubscribeMessages() {

payload = SubscribeParametersRequest.create(message);
arrayBuffer = SubscribeParametersRequest.encode(payload).finish();
payload = Any.create({
type_url: SubscribeParametersRequest.getTypeUrl(),
value: arrayBuffer
});
}

return arrayBuffer;
const clientMessage = ClientMessage.create({
type: dataType,
id: subscriptionId,
options: payload
});

return ClientMessage.encode(clientMessage).finish();
};
}

Expand Down
21 changes: 16 additions & 5 deletions src/providers/realtime-provider.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import {
} from '../utils.js';
import { commandToTelemetryDatum } from './commands';
import { eventToTelemetryDatum, eventShouldBeFiltered } from './events';
import { yamcs } from 'yamcs-protobufjs-static';
const { ServerMessage, Reply } = yamcs.api;
const { SubscribeParametersData } = yamcs.protobuf.processing;

const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000];
export default class RealtimeProvider {
Expand Down Expand Up @@ -202,6 +205,13 @@ export default class RealtimeProvider {
this.requests.push(request);
}
}
/**
* @param {import('yamcs-protobufjs-static').google.protobuf.Timestamp} timestamp the protobuf timestamp object
* @returns {number} milliseconds since epoch
*/
timestampObjectToMilliseconds(timestamp) {
return Math.floor((timestamp.seconds * 1000) + (timestamp.nanos / 1000000));
}

connect() {
if (this.connected) {
Expand All @@ -226,7 +236,7 @@ export default class RealtimeProvider {
};

this.socket.onmessage = (event) => {
const message = JSON.parse(event.data);
const message = ServerMessage.decode(new Uint8Array(event.data));

if (!this.isSupportedDataType(message.type)) {
return;
Expand All @@ -236,7 +246,8 @@ export default class RealtimeProvider {
let subscriptionDetails;

if (isReply) {
const id = message.data.replyTo;
const reply = Reply.decode(message.data.value);
const id = reply.replyTo;
const call = message.call;
subscriptionDetails = this.subscriptionsById[id];
subscriptionDetails.call = call;
Expand All @@ -250,13 +261,13 @@ export default class RealtimeProvider {
}

if (this.isTelemetryMessage(message)) {
let values = message.data.values || [];
let parentName = subscriptionDetails.domainObject.name;
const values = SubscribeParametersData.decode(new Uint8Array(message.data.value))?.toJSON()?.values || [];
const parentName = subscriptionDetails.domainObject.name;

values.forEach(parameter => {
let datum = {
id: qualifiedNameToId(subscriptionDetails.name),
timestamp: parameter[METADATA_TIME_KEY]
timestamp: this.timestampObjectToMilliseconds(parameter[METADATA_TIME_KEY])
};
let value = getValue(parameter, parentName);

Expand Down

0 comments on commit 19f9b8b

Please sign in to comment.