Skip to content

Commit

Permalink
Pass output to worker manager eagerly
Browse files Browse the repository at this point in the history
This means output is still available if we time out
  • Loading branch information
jcreedcmu committed Mar 15, 2024
1 parent aef969e commit 0782eef
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 63 deletions.
48 changes: 7 additions & 41 deletions src/twelf-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TwelfStatus, TwelfError, TwelfExecResponse, TwelfSideEffectData } from "./twelf-worker-types";
import { TwelfStatus, TwelfError, TwelfExecResponse, TwelfSideEffectData, TwelfExecStatus } from "./twelf-worker-types";
import { WasiSnapshotPreview1, args_get, args_sizes_get, clock_time_get, environ_sizes_get, fd_write } from "./wasi";

type TwelfExports = {
Expand All @@ -16,11 +16,10 @@ async function getWasm(url: string): Promise<ArrayBuffer> {
return (await fetch(url)).arrayBuffer();
}

export async function mkTwelfService(wasmLoc: string): Promise<TwelfService> {
export async function mkTwelfService(wasmLoc: string, outputCallback: (fd: number, str: string) => void): Promise<TwelfService> {
const twelfWasm = getWasm(wasmLoc);

let mem: WebAssembly.Memory | undefined;
const output: string[] = [];
const argv: string[] = ['twelf'];
const imports: { wasi_snapshot_preview1: WebAssembly.ModuleImports & WasiSnapshotPreview1 } = {
wasi_snapshot_preview1: {
Expand All @@ -39,7 +38,7 @@ export async function mkTwelfService(wasmLoc: string): Promise<TwelfService> {
fd_prestat_get: () => { debug('fd_prestat_get'); },
fd_read: () => { debug('fd_read'); },
fd_seek: () => { debug('fd_seek'); },
fd_write: (...args) => { debug('fd_write'); return fd_write(mem!, output, ...args); },
fd_write: (...args) => { debug('fd_write'); return fd_write(mem!, outputCallback, ...args); },

// Paths
path_filestat_get: () => { debug('path_filestat_get'); },
Expand All @@ -53,44 +52,14 @@ export async function mkTwelfService(wasmLoc: string): Promise<TwelfService> {
mem = exports.memory;
exports.twelf_open(0, 0);

return new TwelfService(source.instance, output);
return new TwelfService(source.instance);
}

export class TwelfService {

constructor(public instance: WebAssembly.Instance, public output: string[]) { }

timeoutStatus(): TwelfExecResponse {
return {
status: { t: 'timeout' },
...this.getSideEffectData()
}
}

getSideEffectData(): TwelfSideEffectData {
const errorRegex = new RegExp('string:(\\d+?).(\\d+?)-(\\d+?).(\\d+?) Error: \n(.*)', 'g');
let m;
const errors: TwelfError[] = [];
while (m = errorRegex.exec(this.output.join(''))) {
errors.push({
range: {
line1: parseInt(m[1]),
col1: parseInt(m[2]),
line2: parseInt(m[3]),
col2: parseInt(m[4]),
},
text: m[5],
});
}
return {
output: [...this.output],
errors,
}
}

async exec(input: string): Promise<TwelfExecResponse> {
this.output.splice(0); // Erase output
constructor(public instance: WebAssembly.Instance) { }

async exec(input: string): Promise<TwelfStatus> {
const exports = this.instance.exports as TwelfExports;
const mem = exports.memory;

Expand All @@ -113,9 +82,6 @@ export class TwelfService {
}
})();

return {
status: { t: 'twelfStatus', status },
...this.getSideEffectData(),
};
return status;
}
}
7 changes: 4 additions & 3 deletions src/twelf-worker-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ export type TwelfExecResponse = {

export type TwelfReadyResponse = {};

export type TwelfResponse =
| { t: 'ready', id: number, response: TwelfReadyResponse }
| { t: 'execResponse', id: number, response: TwelfExecResponse }
export type WorkerMessage =
| { t: 'ready' }
| { t: 'execResponse', id: number, response: TwelfStatus }
| { t: 'output', fd: number, str: string }
;
54 changes: 42 additions & 12 deletions src/twelf-worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TwelfExecRequest, TwelfExecResponse, TwelfResponse, WithId } from "./twelf-worker-types";
import { TwelfExecRequest, TwelfExecResponse, WorkerMessage, WithId, TwelfSideEffectData, TwelfError } from "./twelf-worker-types";

export async function mkTwelfWorker(): Promise<TwelfWorker> {
const worker = new TwelfWorker();
Expand All @@ -8,15 +8,24 @@ export async function mkTwelfWorker(): Promise<TwelfWorker> {

export class TwelfWorker {
requestIdCounter: number = 0;
responseMap: Record<number, (result: TwelfResponse) => void>;
responseMap: Record<number, (result: WorkerMessage) => void>;
worker: Worker;
_readyPromise: Promise<void>;
output: string[] = [];

constructor() {
this.worker = new Worker('./assets/worker.js');
this.worker.onmessage = (msg) => {
const data: TwelfResponse = msg.data;
this.responseMap[data.id](data);
const data: WorkerMessage = msg.data;
switch (data.t) {
case 'ready':
this.responseMap[-1](data); break;
case 'execResponse':
this.responseMap[data.id](data); break;
case 'output':
this.output.push(data.str);
break;
}
}
this.responseMap = {};
const readyPromise = new Promise<void>((res, rej) => {
Expand All @@ -34,23 +43,42 @@ export class TwelfWorker {
};
}

getSideEffectData(): TwelfSideEffectData {
const errorRegex = new RegExp('string:(\\d+?).(\\d+?)-(\\d+?).(\\d+?) Error: \n(.*)', 'g');
let m;
const errors: TwelfError[] = [];
while (m = errorRegex.exec(this.output.join(''))) {
errors.push({
range: {
line1: parseInt(m[1]),
col1: parseInt(m[2]),
line2: parseInt(m[3]),
col2: parseInt(m[4]),
},
text: m[5],
});
}
return {
output: [...this.output],
errors,
}
}

async exec(input: string): Promise<TwelfExecResponse> {
this.output.splice(0); // clear output

const req = this.mkRequest(input);
const prom = new Promise<TwelfResponse>((res, rej) => {
const prom = new Promise<WorkerMessage>((res, rej) => {
this.responseMap[req.id] = res;
});
this.worker.postMessage(req);

return new Promise<TwelfExecResponse>((res, rej) => {
console.log('setting timer');
const t = setTimeout(() => {
console.log('timer reached');
this.worker.terminate();
res({
status: { t: 'timeout' },
// XXX we're missing output and errors
output: [],
errors: [],
...this.getSideEffectData(),
});
}, 2000);

Expand All @@ -59,9 +87,11 @@ export class TwelfWorker {
if (p.t != 'execResponse') {
throw new Error(`expected execReponse but got ${p.t}`);
}
console.log('clearing timer');
clearTimeout(t);
res(p.response);
res({
status: { t: 'twelfStatus', status: p.response },
...this.getSideEffectData(),
});
};

makeRequest();
Expand Down
11 changes: 9 additions & 2 deletions src/wasi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ function readIOVectors(
return result;
}

export function fd_write(mem: WebAssembly.Memory, output: string[], _fd: number, ciovs_ptr: number, ciovs_len: number, retptr0: number): number {
export function fd_write(
mem: WebAssembly.Memory,
outputCallback: (fd: number, str: string) => void,
fd: number,
ciovs_ptr: number,
ciovs_len: number,
retptr0: number
): number {
const view = new DataView(mem.buffer);
const iovs = readIOVectors(view, ciovs_ptr, ciovs_len);
const decoder = new TextDecoder();
Expand All @@ -71,7 +78,7 @@ export function fd_write(mem: WebAssembly.Memory, output: string[], _fd: number,
if (iov.byteLength === 0) {
continue;
}
output.push(decoder.decode(iov));
outputCallback(fd, decoder.decode(iov));
bytesWritten += iov.byteLength;
}

Expand Down
12 changes: 7 additions & 5 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { mkTwelfService } from "./twelf-service";
import { TwelfStatus, TwelfExecRequest, TwelfExecResponse, TwelfResponse, WithId } from "./twelf-worker-types";
import { TwelfStatus, TwelfExecRequest, TwelfExecResponse, WorkerMessage, WithId } from "./twelf-worker-types";

async function go() {
const service = await mkTwelfService('./twelf.wasm');

function post(r: TwelfResponse): void {
function post(r: WorkerMessage): void {
self.postMessage(r);
}

const service = await mkTwelfService('./twelf.wasm', (fd, str) => {
post({ t: 'output', fd, str });
});

self.onmessage = async event => {
const { body, id } = event.data as WithId<TwelfExecRequest>;
post({ t: 'execResponse', id, response: await service.exec(body.input) });
};

post({ t: 'ready', id: -1, response: {} });
post({ t: 'ready' });
}

go();

0 comments on commit 0782eef

Please sign in to comment.