Skip to content

Commit

Permalink
Devtools: telemetry recording
Browse files Browse the repository at this point in the history
  • Loading branch information
kobkaz committed Feb 20, 2024
1 parent a3e6313 commit 8d6ff56
Show file tree
Hide file tree
Showing 6 changed files with 3,343 additions and 4 deletions.
1 change: 1 addition & 0 deletions tmtc-c2a/devtools_frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"@protobuf-ts/plugin": "^2.8.2",
"@types/react": "18.2.55",
"@types/react-dom": "18.2.19",
"@types/wicg-file-system-access": "^2023.10.4",
"@typescript-eslint/eslint-plugin": "6.21.0",
"@typescript-eslint/parser": "6.21.0",
"@vitejs/plugin-react": "4.2.1",
Expand Down
54 changes: 51 additions & 3 deletions tmtc-c2a/devtools_frontend/src/components/Layout.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Classes, Icon } from "@blueprintjs/core";
import React, { useMemo } from "react";
import React, { useMemo, useEffect, useState } from "react";
import {
Link,
NavLink,
Expand All @@ -13,6 +13,7 @@ import { SatelliteSchema } from "../proto/tmtc_generic_c2a";
import { Panel, PanelGroup, PanelResizeHandle } from "react-resizable-panels";
import { IconNames } from "@blueprintjs/icons";
import type { GrpcClientService } from "../worker";
import type { RecorderService, RecordingStatus } from "../recorderWorker";

type TelemetryMenuItem = {
name: string;
Expand All @@ -31,6 +32,42 @@ const TelemetryListSidebar: React.FC<TelemetryListSidebarProps> = ({
activeName: tmivName,
telemetryListItems,
}) => {
const recorder = (useLoaderData() as ClientContext).client;
const [recorderStatus, setRecordingStatus] = useState<RecordingStatus | null>(
null
);
useEffect(() => {
const readerP = recorder
.openRecordingStatusStream()
.then((stream) => stream.getReader());
let cancel;
const cancelP = new Promise((resolve) => (cancel = resolve));
Promise.all([readerP, cancelP]).then(([reader]) => reader.cancel());
readerP.then(async (reader) => {
// eslint-disable-next-line no-constant-condition
while (true) {
const next = await reader.read();
if (next.done) {
break;
}
setRecordingStatus(next.value);
}
});
return cancel;
}, [recorder]);
const toggleRecordingStatus = async (name: string) => {
if (!recorderStatus?.directoryIsSet) {
const directoryHandle = await window.showDirectoryPicker({
mode: "readwrite",
});
recorder.setRootRecordDirectory(directoryHandle);
}
if (recorderStatus?.recordingTelemetries.has(name)) {
recorder.disableRecording(name);
} else {
recorder.enableRecording(name);
}
};
return (
<div className="p-1 h-full flex-1 flex flex-col">
<ul>
Expand Down Expand Up @@ -65,6 +102,17 @@ const TelemetryListSidebar: React.FC<TelemetryListSidebarProps> = ({
tmivName === item.name ? Classes.ACTIVE : ""
}`}
>
<span onClick={() => toggleRecordingStatus(item.name)}>
{recorderStatus?.recordingTelemetries.has(item.name)
? <div className="py-1">
<div className="w-3 h-3 bg-red-600 rounded-full"></div>
</div>
: <div className="py-1">
<div className="w-3 h-3 bg-gray-600 rounded-full"></div>
</div>

}
</span>
<code
className={`${Classes.FILL} ${Classes.TEXT_OVERFLOW_ELLIPSIS}`}
>
Expand All @@ -91,10 +139,10 @@ export const Layout = () => {
const items: TelemetryMenuItem[] = [];
const channelNames = Object.keys(ctx.satelliteSchema.telemetryChannels);
for (const [componentName, componentSchema] of Object.entries(
ctx.satelliteSchema.telemetryComponents,
ctx.satelliteSchema.telemetryComponents
)) {
for (const [telemetryName, telemetrySchema] of Object.entries(
componentSchema.telemetries,
componentSchema.telemetries
)) {
for (const channelName of channelNames) {
const name = `${channelName}.${componentName}.${telemetryName}`;
Expand Down
2 changes: 2 additions & 0 deletions tmtc-c2a/devtools_frontend/src/main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Callout, FocusStyleManager, Intent } from "@blueprintjs/core";
import { CommandView } from "./components/CommandView";
import { buildClient } from "./client";
import type { GrpcClientService } from "./worker";
import type { RecorderService } from "./recorderWorker";
import { IconNames } from "@blueprintjs/icons";
import { FriendlyError } from "./error";

Expand All @@ -35,6 +36,7 @@ const clientLoader: LoaderFunction = async () => {
details: "Make sure that your tmtc-c2a is running.",
});
})!;

return { client, satelliteSchema };
};

Expand Down
227 changes: 227 additions & 0 deletions tmtc-c2a/devtools_frontend/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ export type GrpcClientService = {
getSatelliteSchema(): Promise<GetSateliteSchemaResponse>;
postCommand(input: PostCommandRequest): Promise<PostCommandResponse>;
openTelemetryStream(tmivName: string): Promise<ReadableStream<Tmiv>>;

setRootRecordDirectory(directory: FileSystemDirectoryHandle): Promise<void>;
hasRecordDirectory(): Promise<boolean>;
enableRecording(telemetryName: string): Promise<void>;
disableRecording(telemetryName: string): Promise<void>;
openRecordingStatusStream(): Promise<ReadableStream<RecordingStatus>>;
};

export type WorkerRpcService = {
Expand Down Expand Up @@ -62,6 +68,175 @@ const startTelemetryStream = async () => {
}
};

export type RecordingStatus = {
directoryIsSet: boolean;
recordingTelemetries: Set<string>;
};
type RecordingStatusListener = (status: RecordingStatus) => void;
let rootRecordDirectory: FileSystemDirectoryHandle | undefined;
const recorders: Map<string, TelemetryRecorder> = new Map();
let nextRecordingStatusListenerId = 0;
const recordStatusListeners: Map<number, RecordingStatusListener> = new Map();

const currentRecordingStatus = (): RecordingStatus => {
return {
directoryIsSet: rootRecordDirectory !== undefined,
recordingTelemetries: new Set(recorders.keys()),
};
};

const addRecordingStatusListener = (
listener: RecordingStatusListener,
): number => {
listener(currentRecordingStatus());
const id = nextRecordingStatusListenerId++;
recordStatusListeners.set(id, listener);
return id;
};

const removeRecordingStatusListener = (id: number): void => {
recordStatusListeners.delete(id);
};

const notifyRecordingStatusListener = (): void => {
const status = currentRecordingStatus();
for (const listener of recordStatusListeners.values()) {
listener(status);
}
};

// TODO: flush on worker termination
class TelemetryRecorder {
telemetryName: string;
cancel: () => void = () => {};
cancelled: Promise<null>;
onStop: () => void;
constructor(
rootRecordDirectory: FileSystemDirectoryHandle,
telemetryName: string,
onStop: () => void,
) {
this.telemetryName = telemetryName;
this.cancelled = new Promise((resolve) => {
this.cancel = () => resolve(null);
});
this.onStop = onStop;
this.run(rootRecordDirectory);
}

private async write(
recordDirectory: FileSystemDirectoryHandle,
writable: FileSystemWritableFileStream,
tmiv: Tmiv,
) {
// if tmiv has @blob field, save it to a file
let blobFileName = "";
const blob = tmiv.fields.find((field) => {
return field.name === "@blob";
});
if (blob !== undefined) {
if (blob.value.oneofKind == "bytes") {
const blobBytes = blob.value.bytes;
//FIXME: safer name consrtuction
const blobDirectory = await recordDirectory.getDirectoryHandle(
"blob_data",
{
create: true,
},
);

// FIXME: readable time format?
// FIXME: avoid name collision
blobFileName = `${Date.now()}.dat`;
const recordFile = await blobDirectory.getFileHandle(blobFileName, {
create: true,
});
const blobWritable = await recordFile.createWritable();
await blobWritable.write(blobBytes);
await blobWritable.close();
}
}

const fields: { [key: string]: any } = {};
for (const field of tmiv.fields) {
if (field.name.includes("@RAW")) {
continue;
}
const name = field.name;
let value = undefined;
if (field.name == "@blob") {
value = blobFileName;
} else if (field.value.oneofKind == "integer") {
value = Number(field.value.integer);
} else if (field.value.oneofKind == "string") {
value = field.value.string;
} else if (field.value.oneofKind == "double") {
value = field.value.double;
} else if (field.value.oneofKind == "enum") {
value = field.value.enum;
} else if (field.value.oneofKind == "bytes") {
value = field.value.bytes;
}
fields[name] = value;
}
await writable.write(JSON.stringify(fields));
await writable.write("\n");
}

private async run(rootRecordDirectory: FileSystemDirectoryHandle) {
// FIXME: safer name construction
const recordDirectoryName = this.telemetryName;
const recordDirectory = await rootRecordDirectory.getDirectoryHandle(
recordDirectoryName,
{ create: true },
);
// FIXME: readable time format?
const recordFile = await recordDirectory.getFileHandle(
`${Date.now()}.log`,
{
create: true,
},
);

const telemetryStream = await server.openTelemetryStream(
this.telemetryName,
);
const reader = telemetryStream.getReader();
let writable = await recordFile.createWritable();
let lastFlushTime = Date.now();

// eslint-disable-next-line no-constant-condition
while (true) {
const next = await Promise.race([reader.read(), this.cancelled]);
if (next === null) {
// cancelled
break;
}
if (next.done) {
break;
}
const tmiv = next.value;

await this.write(recordDirectory, writable, tmiv);
if (Date.now() - lastFlushTime > 10000) {
lastFlushTime = Date.now();
await writable.close();
writable = await recordFile.createWritable({ keepExistingData: true });
const size = (await recordFile.getFile()).size;
await writable.seek(size);
}
}

await writable.close();
this.onStop();
}

stop() {
// do not call onStop here
this.cancel();
}
}

const server = {
async getSatelliteSchema(): Promise<GetSateliteSchemaResponse> {
const { response } = await tmtcGenericC2a.getSatelliteSchema({});
Expand Down Expand Up @@ -89,6 +264,58 @@ const server = {
},
});
},

async setRootRecordDirectory(
directory: FileSystemDirectoryHandle,
): Promise<void> {
if (rootRecordDirectory === undefined) {
rootRecordDirectory = directory;
}
},

async hasRecordDirectory(): Promise<boolean> {
return rootRecordDirectory !== undefined;
},

async enableRecording(telemetryName: string): Promise<void> {
if (rootRecordDirectory === undefined) {
return;
}
if (recorders.has(telemetryName)) {
return;
}

const recorder = new TelemetryRecorder(
rootRecordDirectory,
telemetryName,
() => {
recorders.delete(telemetryName);
notifyRecordingStatusListener();
},
);
recorders.set(telemetryName, recorder);
notifyRecordingStatusListener();
},

async disableRecording(telemetryName: string): Promise<void> {
const recorder = recorders.get(telemetryName);
if (recorder === undefined) {
return;
}
recorder.stop();
},

async openRecordingStatusStream(): Promise<ReadableStream<RecordingStatus>> {
let id: number | undefined;
return new ReadableStream({
start(controller) {
id = addRecordingStatusListener((status) => controller.enqueue(status));
},
cancel() {
removeRecordingStatusListener(id!);
},
});
},
};

self.addEventListener("connect", (e) => {
Expand Down
2 changes: 1 addition & 1 deletion tmtc-c2a/devtools_frontend/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"baseUrl": "./",
"target": "ESNext",
"lib": ["DOM", "DOM.Iterable", "ESNext", "WebWorker"],
"types": ["vite/client"],
"types": ["vite/client", "@types/wicg-file-system-access" ],
"allowJs": false,
"skipLibCheck": false,
"esModuleInterop": false,
Expand Down
Loading

0 comments on commit 8d6ff56

Please sign in to comment.