diff --git a/src/providers/messages.js b/src/providers/messages.js index a79ad132..be3644dc 100644 --- a/src/providers/messages.js +++ b/src/providers/messages.js @@ -1,10 +1,12 @@ -import {OBJECT_TYPES, DATA_TYPES, MDB_TYPE} from '../const'; -import { yamcs } from 'yamcs-protobufjs-static'; +import { OBJECT_TYPES, DATA_TYPES, MDB_TYPE } from '../const'; +import { google, yamcs } from 'yamcs-protobufjs-static'; const { SubscribeEventsRequest } = yamcs.protobuf.events; const { SubscribeAlarmsRequest } = yamcs.protobuf.alarms; const { SubscribeCommandsRequest } = yamcs.protobuf.commanding; const { SubscribeParametersRequest, SubscribeMdbChangesRequest } = yamcs.protobuf.processing; +const { ClientMessage, CancelOptions } = yamcs.api; +const { Any } = google.protobuf; const typeMap = { [OBJECT_TYPES.COMMANDS_OBJECT_TYPE]: DATA_TYPES.DATA_TYPE_COMMANDS, @@ -23,9 +25,21 @@ const typeMap = { export const SUBSCRIBE = buildSubscribeMessages(); // eslint-disable-next-line func-style export const UNSUBSCRIBE = (subscriptionDetails) => { - return { - call: `${subscriptionDetails.call}` - }; + const { call } = subscriptionDetails; + const payload = CancelOptions.create({ + call + }); + const arrayBuffer = CancelOptions.encode(payload).finish(); + const any = Any.create({ + type_url: CancelOptions.getTypeUrl(), + value: arrayBuffer + }); + const clientMessage = ClientMessage.create({ + type: 'cancel', + options: any + }); + + return ClientMessage.encode(clientMessage).finish(); }; function buildSubscribeMessages() { @@ -34,7 +48,7 @@ function buildSubscribeMessages() { for (const [objectType, dataType] of Object.entries(typeMap)) { subscriptionMessages[objectType] = (subscriptionDetails) => { - const { instance, processor = "realtime", name } = subscriptionDetails; + const { subscriptionId, instance, processor = "realtime", name } = subscriptionDetails; let payload; let arrayBuffer; let message; @@ -42,7 +56,7 @@ function buildSubscribeMessages() { if (isEventType(objectType)) { message = { - instance: `${instance}` + instance }; err = SubscribeEventsRequest.verify(message); if (err) { @@ -51,10 +65,14 @@ function buildSubscribeMessages() { payload = SubscribeEventsRequest.create(message); arrayBuffer = SubscribeEventsRequest.encode(payload).finish(); + payload = Any.create({ + type_url: SubscribeEventsRequest.getTypeUrl(), + value: arrayBuffer + }); } else if (isAlarmType(objectType)) { message = { - instance: `${instance}`, - processor: `${processor}` + instance, + processor }; err = SubscribeAlarmsRequest.verify(message); if (err) { @@ -63,10 +81,14 @@ function buildSubscribeMessages() { payload = SubscribeAlarmsRequest.create(message); arrayBuffer = SubscribeAlarmsRequest.encode(payload).finish(); + payload = Any.create({ + type_url: SubscribeAlarmsRequest.getTypeUrl(), + value: arrayBuffer + }); } else if (isCommandType(objectType)) { message = { - instance: `${instance}`, - processor: `${processor}`, + instance, + processor, ignorePastCommands: true }; err = SubscribeCommandsRequest.verify(message); @@ -77,10 +99,14 @@ function buildSubscribeMessages() { payload = SubscribeCommandsRequest.create(message); arrayBuffer = SubscribeCommandsRequest.encode(payload).finish(); + payload = Any.create({ + type_url: SubscribeCommandsRequest.getTypeUrl(), + value: arrayBuffer + }); } else if (isMdbChangesType(objectType)) { message = { - instance: `${instance}`, - processor: `${processor}` + instance, + processor }; err = SubscribeMdbChangesRequest.verify(message); if (err) { @@ -89,12 +115,16 @@ function buildSubscribeMessages() { payload = SubscribeMdbChangesRequest.create(message); arrayBuffer = SubscribeMdbChangesRequest.encode(payload).finish(); + payload = Any.create({ + type_url: SubscribeMdbChangesRequest.getTypeUrl(), + value: arrayBuffer + }); } else { const id = [yamcs.protobuf.NamedObjectId.create({ name: `${name}`})]; message = { - type: `${dataType}`, - instance: `${instance}`, - processor: `${processor}`, + type: dataType, + instance, + processor, id, sendFromCache: true, updateOnExpiration: true @@ -106,9 +136,19 @@ function buildSubscribeMessages() { payload = SubscribeParametersRequest.create(message); arrayBuffer = SubscribeParametersRequest.encode(payload).finish(); + payload = Any.create({ + type_url: SubscribeParametersRequest.getTypeUrl(), + value: arrayBuffer + }); } - return arrayBuffer; + const clientMessage = ClientMessage.create({ + type: dataType, + id: subscriptionId, + options: payload + }); + + return ClientMessage.encode(clientMessage).finish(); }; } diff --git a/src/providers/realtime-provider.js b/src/providers/realtime-provider.js index a09b8469..2b8c660d 100644 --- a/src/providers/realtime-provider.js +++ b/src/providers/realtime-provider.js @@ -39,6 +39,9 @@ import { } from '../utils.js'; import { commandToTelemetryDatum } from './commands'; import { eventToTelemetryDatum, eventShouldBeFiltered } from './events'; +import { yamcs } from 'yamcs-protobufjs-static'; +const { ServerMessage, Reply } = yamcs.api; +const { SubscribeParametersData } = yamcs.protobuf.processing; const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000]; export default class RealtimeProvider { @@ -202,6 +205,13 @@ export default class RealtimeProvider { this.requests.push(request); } } + /** + * @param {import('yamcs-protobufjs-static').google.protobuf.Timestamp} timestamp the protobuf timestamp object + * @returns {number} milliseconds since epoch + */ + timestampObjectToMilliseconds(timestamp) { + return Math.floor((timestamp.seconds * 1000) + (timestamp.nanos / 1000000)); + } connect() { if (this.connected) { @@ -226,7 +236,7 @@ export default class RealtimeProvider { }; this.socket.onmessage = (event) => { - const message = JSON.parse(event.data); + const message = ServerMessage.decode(new Uint8Array(event.data)); if (!this.isSupportedDataType(message.type)) { return; @@ -236,7 +246,8 @@ export default class RealtimeProvider { let subscriptionDetails; if (isReply) { - const id = message.data.replyTo; + const reply = Reply.decode(message.data.value); + const id = reply.replyTo; const call = message.call; subscriptionDetails = this.subscriptionsById[id]; subscriptionDetails.call = call; @@ -250,13 +261,13 @@ export default class RealtimeProvider { } if (this.isTelemetryMessage(message)) { - let values = message.data.values || []; - let parentName = subscriptionDetails.domainObject.name; + const values = SubscribeParametersData.decode(new Uint8Array(message.data.value))?.toJSON()?.values || []; + const parentName = subscriptionDetails.domainObject.name; values.forEach(parameter => { let datum = { id: qualifiedNameToId(subscriptionDetails.name), - timestamp: parameter[METADATA_TIME_KEY] + timestamp: this.timestampObjectToMilliseconds(parameter[METADATA_TIME_KEY]) }; let value = getValue(parameter, parentName);