Skip to content

Commit

Permalink
feat(protocol): added experimental supports for transports between wo…
Browse files Browse the repository at this point in the history
…rker thread
  • Loading branch information
fenying committed Jun 30, 2024
1 parent cc7a135 commit c7f9a6a
Show file tree
Hide file tree
Showing 16 changed files with 1,099 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v1.1.0

- feat(protocol): added experimental supports for transports between worker thread.
- fix(protocol): Simplified the ITransporter.end method.
- fix(decoder): should decode protocol error as special error.

Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@litert/televoke",
"version": "1.0.3",
"version": "1.1.0",
"description": "A simple RPC service framework.",
"main": "lib/index.js",
"scripts": {
Expand Down
40 changes: 40 additions & 0 deletions src/examples/worker-thread/benchmark-main-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2024 Angus.Fenying <fenying@litert.org>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as Tv from '../../lib';
import * as NodeWorker from 'node:worker_threads';
import * as WorkerThread from '../../lib/transporters/worker-thread';

const router = new Tv.Servers.SimpleJsonApiRouter()
.registerApi('say', (ctx, text: string): string => text);

const server = new Tv.Servers.TvServer(router)
.on('error', (e) => { console.error(e); });

const wtGateway = WorkerThread.createMainThreadGateway(server);

(async () => {

await wtGateway.start();

const benchmarkWorker = new NodeWorker.Worker(`${__dirname}/benchmark-worker-client.js`, {})
.on('online', () => {
wtGateway.registerWorker(benchmarkWorker);
});

console.log('Server started.');

})().catch(console.error);
39 changes: 39 additions & 0 deletions src/examples/worker-thread/benchmark-worker-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2024 Angus.Fenying <fenying@litert.org>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as Tv from '../../lib';
import * as WorkerThread from '../../lib/transporters/worker-thread';
import { IApis } from '../network/shared';

const cases = new Array(50000).fill(0) as number[];

(async () => {

const client: Tv.Clients.IClient<IApis> = Tv.Clients.createJsonApiClient<IApis>(
WorkerThread.connectToMainThreadServer()
);

client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`));

for (let i = 0; i < 10; i++) {
console.time('[televoke2/worker-thread] 50000 requests');
await Promise.all(cases.map(() => client.invoke('say', 'test')));
console.timeEnd('[televoke2/worker-thread] 50000 requests');
}

client.close();

})().catch(console.error);
130 changes: 130 additions & 0 deletions src/examples/worker-thread/main-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright 2024 Angus.Fenying <fenying@litert.org>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as Tv from '../../lib';
import * as NodeWorker from 'node:worker_threads';
import * as WorkerThread from '../../lib/transporters/worker-thread';
import { IApis, sleep, testSendingStream, testRecvStream, holdProcess } from '../network/shared';

holdProcess();

(async () => {

const worker = new NodeWorker.Worker(`${__dirname}/worker-server.js`, {});

const client: Tv.Clients.IClient<IApis> = Tv.Clients.createJsonApiClient<IApis>(
WorkerThread.connectToWorkerThreadServer(worker)
);

await new Promise<void>((resolve) => worker.on('online', resolve));

client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`));

client.on('push_message', (msg) => {

console.log('[Client] Message from server: ' + Buffer.concat(msg).toString());
});

do {

await sleep(100);

try {

switch (([
'debug',
'test_bad_response',
'test_bad_response_async',
'startStream2Server',
'startStream2Client',
'hi',
'shit',
] as const)[Math.floor(Math.random() * 7)]) {
case 'debug':
console.log('[Client] [Start Invoke] debug');
await client.invoke('debug', new Date() + ': Hello, world!');
console.log('[Client] [End Invoke] debug');
break;
case 'test_bad_response':
console.log('[Client] [Start Invoke] test_bad_response');
await client.invoke('test_bad_response');
console.log('[Client] [End Invoke] test_bad_response');
break;
case 'test_bad_response_async':
console.log('[Client] [Start Invoke] test_bad_response_async');
await client.invoke('test_bad_response_async');
console.log('[Client] [End Invoke] test_bad_response_async');
break;
case 'shit':
console.log('[Client] [Start Invoke] shit');
try {
await client.invoke('shit');
}
catch (e) {
console.error(e);
}
console.log('[Client] [End Invoke] shit');
break;
case 'hi':
console.log('[Client] [Start Invoke] hi');
console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!'));
console.log('[Client] [End Invoke] hi');
break;
case 'startStream2Server': {
console.log('[Client] [Start Invoke] startStream2Server');
const streamId = await client.invoke('startStream2Server');
console.log('[Client] [End Invoke] startStream2Server');
setImmediate(() => {
testSendingStream(client, streamId, 'Client').catch(console.error);
});
break;
}
case 'startStream2Client': {
console.log('[Client] [Start Invoke] startStream2Client');
if (!client.transporter?.writable) {

await client.connect();
}
const stream = client.streams.create();
testRecvStream(stream, 'Client');
await client.invoke('startStream2Client', stream.id);
console.log('[Client] [End Invoke] startStream2Client');
break;
}
// case 'serverShouldCloseConn':
// if (Math.random() < 0.1) {
// console.log('[Client] [Start Invoke] serverShouldCloseConn');
// await client.invoke('serverShouldCloseConn');
// console.log('[Client] [End Invoke] serverShouldCloseConn');
// }
// break;
// case 'clientShouldCloseConn': {
// if (Math.random() < 0.1) {
// console.log('[Client] clientShouldCloseConn');
// client.close();
// }
// break;
// }
}
}
catch (e) {

console.error('[Client] Error: ', e);
}
}
while (1);

})().catch(console.error);
114 changes: 114 additions & 0 deletions src/examples/worker-thread/main-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright 2024 Angus.Fenying <fenying@litert.org>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as Tv from '../../lib';
import * as NodeWorker from 'node:worker_threads';
import * as WorkerThread from '../../lib/transporters/worker-thread';
import { holdProcess, testRecvStream, testSendingStream } from '../network/shared';

const router = new Tv.Servers.SimpleJsonApiRouter();

const server = new Tv.Servers.TvServer(router)
.on('error', (e) => { console.error(e); });

router
.registerApi('debug', (ctx, text: string): void => {

console.log(`[Server] Channel#${ctx.channel.id} invoked debug`);
console.log(`[Server] Client says: ${text}`);

if (Math.random() > 0.5) {
console.log('[Server] Sent a message to client');
ctx.channel.sendMessage('hello').catch(console.error);
}
})
.registerApi('test_bad_response', (ctx): unknown => {

console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`);

return {
f: BigInt(123)
};
})
.registerApi('test_bad_response_async', (ctx): Promise<unknown> => {

console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`);

return Promise.resolve({
f: BigInt(123)
});
})
.registerApi('say', (ctx, text: string): string => {

return text;
})
.registerApi('startStream2Server', (ctx): number => {

console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`);
const stream = ctx.channel.streams.create();
console.log(`[Server] Opened stream #${stream.id}`);

testRecvStream(stream, 'Server');

return stream.id;
})
.registerApi('serverShouldCloseConn', (ctx): void => {

console.log(`[Server] Channel#${ctx.channel.id} invoked serverShouldCloseConn`);

ctx.channel.close();
})
.registerApi('hi', (ctx, text: string): string => {

console.log(`[Server] Channel#${ctx.channel.id} invoked hi`);

return Buffer.from(text).toString('base64url');
})
.registerApi('shit', (ctx): string => {

console.log(`[Server] Channel#${ctx.channel.id} invoked shit`);

if (Math.random() > 0.5) {
throw { test: 'this error should be unclear to clients' };
}
else {
throw new Tv.TvErrorResponse({
test: 'this error should be visible to clients'
});
}
})
.registerApi('startStream2Client', (ctx, streamId: number): void => {

console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`);
testSendingStream(ctx.channel, streamId, 'Server').catch(console.error);
});

const wtGateway = WorkerThread.createMainThreadGateway(server);

holdProcess();

(async () => {

await wtGateway.start();

const worker = new NodeWorker.Worker(`${__dirname}/worker-client.js`, {})
.on('online', () => {
wtGateway.registerWorker(worker);
});

console.log('Server started.');

})().catch(console.error);
Loading

0 comments on commit c7f9a6a

Please sign in to comment.