From c7f9a6acc9bef0b2fb9baa722c96cd1ea4384519 Mon Sep 17 00:00:00 2001 From: Angus ZENG Date: Sun, 28 Apr 2024 20:32:34 +0800 Subject: [PATCH] feat(protocol): added experimental supports for transports between worker thread --- CHANGES.md | 1 + package-lock.json | 4 +- package.json | 2 +- .../worker-thread/benchmark-main-server.ts | 40 ++++ .../worker-thread/benchmark-worker-client.ts | 39 ++++ src/examples/worker-thread/main-client.ts | 130 +++++++++++ src/examples/worker-thread/main-server.ts | 114 ++++++++++ src/examples/worker-thread/worker-client.ts | 125 +++++++++++ src/examples/worker-thread/worker-server.ts | 110 ++++++++++ src/lib/transporters/worker-thread/README.md | 18 ++ .../worker-thread/WorkerThread.Client.ts | 88 ++++++++ .../worker-thread/WorkerThread.Constants.ts | 19 ++ .../worker-thread/WorkerThread.Errors.ts | 53 +++++ .../worker-thread/WorkerThread.Server.ts | 136 ++++++++++++ .../worker-thread/WorkerThread.Transporter.ts | 204 ++++++++++++++++++ src/lib/transporters/worker-thread/index.ts | 19 ++ 16 files changed, 1099 insertions(+), 3 deletions(-) create mode 100644 src/examples/worker-thread/benchmark-main-server.ts create mode 100644 src/examples/worker-thread/benchmark-worker-client.ts create mode 100644 src/examples/worker-thread/main-client.ts create mode 100644 src/examples/worker-thread/main-server.ts create mode 100644 src/examples/worker-thread/worker-client.ts create mode 100644 src/examples/worker-thread/worker-server.ts create mode 100644 src/lib/transporters/worker-thread/README.md create mode 100644 src/lib/transporters/worker-thread/WorkerThread.Client.ts create mode 100644 src/lib/transporters/worker-thread/WorkerThread.Constants.ts create mode 100644 src/lib/transporters/worker-thread/WorkerThread.Errors.ts create mode 100644 src/lib/transporters/worker-thread/WorkerThread.Server.ts create mode 100644 src/lib/transporters/worker-thread/WorkerThread.Transporter.ts create mode 100644 src/lib/transporters/worker-thread/index.ts diff --git a/CHANGES.md b/CHANGES.md index d9813da..26ed092 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,7 @@ ## v1.1.0 +- feat(protocol): added experimental supports for transports between worker thread. - fix(protocol): Simplified the ITransporter.end method. - fix(decoder): should decode protocol error as special error. diff --git a/package-lock.json b/package-lock.json index e027c1a..35be41f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@litert/televoke", - "version": "1.0.3", + "version": "1.1.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@litert/televoke", - "version": "1.0.3", + "version": "1.1.0", "license": "Apache-2.0", "devDependencies": { "@commitlint/cli": "^19.2.1", diff --git a/package.json b/package.json index 7dcf96a..cce4c8e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@litert/televoke", - "version": "1.0.3", + "version": "1.1.0", "description": "A simple RPC service framework.", "main": "lib/index.js", "scripts": { diff --git a/src/examples/worker-thread/benchmark-main-server.ts b/src/examples/worker-thread/benchmark-main-server.ts new file mode 100644 index 0000000..1d20bfb --- /dev/null +++ b/src/examples/worker-thread/benchmark-main-server.ts @@ -0,0 +1,40 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import * as NodeWorker from 'node:worker_threads'; +import * as WorkerThread from '../../lib/transporters/worker-thread'; + +const router = new Tv.Servers.SimpleJsonApiRouter() + .registerApi('say', (ctx, text: string): string => text); + +const server = new Tv.Servers.TvServer(router) + .on('error', (e) => { console.error(e); }); + +const wtGateway = WorkerThread.createMainThreadGateway(server); + +(async () => { + + await wtGateway.start(); + + const benchmarkWorker = new NodeWorker.Worker(`${__dirname}/benchmark-worker-client.js`, {}) + .on('online', () => { + wtGateway.registerWorker(benchmarkWorker); + }); + + console.log('Server started.'); + +})().catch(console.error); diff --git a/src/examples/worker-thread/benchmark-worker-client.ts b/src/examples/worker-thread/benchmark-worker-client.ts new file mode 100644 index 0000000..c20c349 --- /dev/null +++ b/src/examples/worker-thread/benchmark-worker-client.ts @@ -0,0 +1,39 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import * as WorkerThread from '../../lib/transporters/worker-thread'; +import { IApis } from '../network/shared'; + +const cases = new Array(50000).fill(0) as number[]; + +(async () => { + + const client: Tv.Clients.IClient = Tv.Clients.createJsonApiClient( + WorkerThread.connectToMainThreadServer() + ); + + client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); + + for (let i = 0; i < 10; i++) { + console.time('[televoke2/worker-thread] 50000 requests'); + await Promise.all(cases.map(() => client.invoke('say', 'test'))); + console.timeEnd('[televoke2/worker-thread] 50000 requests'); + } + + client.close(); + +})().catch(console.error); diff --git a/src/examples/worker-thread/main-client.ts b/src/examples/worker-thread/main-client.ts new file mode 100644 index 0000000..2bec18a --- /dev/null +++ b/src/examples/worker-thread/main-client.ts @@ -0,0 +1,130 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import * as NodeWorker from 'node:worker_threads'; +import * as WorkerThread from '../../lib/transporters/worker-thread'; +import { IApis, sleep, testSendingStream, testRecvStream, holdProcess } from '../network/shared'; + +holdProcess(); + +(async () => { + + const worker = new NodeWorker.Worker(`${__dirname}/worker-server.js`, {}); + + const client: Tv.Clients.IClient = Tv.Clients.createJsonApiClient( + WorkerThread.connectToWorkerThreadServer(worker) + ); + + await new Promise((resolve) => worker.on('online', resolve)); + + client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); + + client.on('push_message', (msg) => { + + console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); + }); + + do { + + await sleep(100); + + try { + + switch (([ + 'debug', + 'test_bad_response', + 'test_bad_response_async', + 'startStream2Server', + 'startStream2Client', + 'hi', + 'shit', + ] as const)[Math.floor(Math.random() * 7)]) { + case 'debug': + console.log('[Client] [Start Invoke] debug'); + await client.invoke('debug', new Date() + ': Hello, world!'); + console.log('[Client] [End Invoke] debug'); + break; + case 'test_bad_response': + console.log('[Client] [Start Invoke] test_bad_response'); + await client.invoke('test_bad_response'); + console.log('[Client] [End Invoke] test_bad_response'); + break; + case 'test_bad_response_async': + console.log('[Client] [Start Invoke] test_bad_response_async'); + await client.invoke('test_bad_response_async'); + console.log('[Client] [End Invoke] test_bad_response_async'); + break; + case 'shit': + console.log('[Client] [Start Invoke] shit'); + try { + await client.invoke('shit'); + } + catch (e) { + console.error(e); + } + console.log('[Client] [End Invoke] shit'); + break; + case 'hi': + console.log('[Client] [Start Invoke] hi'); + console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); + console.log('[Client] [End Invoke] hi'); + break; + case 'startStream2Server': { + console.log('[Client] [Start Invoke] startStream2Server'); + const streamId = await client.invoke('startStream2Server'); + console.log('[Client] [End Invoke] startStream2Server'); + setImmediate(() => { + testSendingStream(client, streamId, 'Client').catch(console.error); + }); + break; + } + case 'startStream2Client': { + console.log('[Client] [Start Invoke] startStream2Client'); + if (!client.transporter?.writable) { + + await client.connect(); + } + const stream = client.streams.create(); + testRecvStream(stream, 'Client'); + await client.invoke('startStream2Client', stream.id); + console.log('[Client] [End Invoke] startStream2Client'); + break; + } + // case 'serverShouldCloseConn': + // if (Math.random() < 0.1) { + // console.log('[Client] [Start Invoke] serverShouldCloseConn'); + // await client.invoke('serverShouldCloseConn'); + // console.log('[Client] [End Invoke] serverShouldCloseConn'); + // } + // break; + // case 'clientShouldCloseConn': { + // if (Math.random() < 0.1) { + // console.log('[Client] clientShouldCloseConn'); + // client.close(); + // } + // break; + // } + } + } + catch (e) { + + console.error('[Client] Error: ', e); + } + } + while (1); + +})().catch(console.error); diff --git a/src/examples/worker-thread/main-server.ts b/src/examples/worker-thread/main-server.ts new file mode 100644 index 0000000..2db8e5d --- /dev/null +++ b/src/examples/worker-thread/main-server.ts @@ -0,0 +1,114 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import * as NodeWorker from 'node:worker_threads'; +import * as WorkerThread from '../../lib/transporters/worker-thread'; +import { holdProcess, testRecvStream, testSendingStream } from '../network/shared'; + +const router = new Tv.Servers.SimpleJsonApiRouter(); + +const server = new Tv.Servers.TvServer(router) + .on('error', (e) => { console.error(e); }); + +router + .registerApi('debug', (ctx, text: string): void => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked debug`); + console.log(`[Server] Client says: ${text}`); + + if (Math.random() > 0.5) { + console.log('[Server] Sent a message to client'); + ctx.channel.sendMessage('hello').catch(console.error); + } + }) + .registerApi('test_bad_response', (ctx): unknown => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); + + return { + f: BigInt(123) + }; + }) + .registerApi('test_bad_response_async', (ctx): Promise => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); + + return Promise.resolve({ + f: BigInt(123) + }); + }) + .registerApi('say', (ctx, text: string): string => { + + return text; + }) + .registerApi('startStream2Server', (ctx): number => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`); + const stream = ctx.channel.streams.create(); + console.log(`[Server] Opened stream #${stream.id}`); + + testRecvStream(stream, 'Server'); + + return stream.id; + }) + .registerApi('serverShouldCloseConn', (ctx): void => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked serverShouldCloseConn`); + + ctx.channel.close(); + }) + .registerApi('hi', (ctx, text: string): string => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked hi`); + + return Buffer.from(text).toString('base64url'); + }) + .registerApi('shit', (ctx): string => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked shit`); + + if (Math.random() > 0.5) { + throw { test: 'this error should be unclear to clients' }; + } + else { + throw new Tv.TvErrorResponse({ + test: 'this error should be visible to clients' + }); + } + }) + .registerApi('startStream2Client', (ctx, streamId: number): void => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`); + testSendingStream(ctx.channel, streamId, 'Server').catch(console.error); + }); + +const wtGateway = WorkerThread.createMainThreadGateway(server); + +holdProcess(); + +(async () => { + + await wtGateway.start(); + + const worker = new NodeWorker.Worker(`${__dirname}/worker-client.js`, {}) + .on('online', () => { + wtGateway.registerWorker(worker); + }); + + console.log('Server started.'); + +})().catch(console.error); diff --git a/src/examples/worker-thread/worker-client.ts b/src/examples/worker-thread/worker-client.ts new file mode 100644 index 0000000..d856abc --- /dev/null +++ b/src/examples/worker-thread/worker-client.ts @@ -0,0 +1,125 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import * as WorkerThread from '../../lib/transporters/worker-thread'; +import { IApis, sleep, testSendingStream, testRecvStream, holdProcess } from '../network/shared'; + +holdProcess(); + +(async () => { + + const client: Tv.Clients.IClient = Tv.Clients.createJsonApiClient( + WorkerThread.connectToMainThreadServer() + ); + + client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); + + client.on('push_message', (msg) => { + + console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); + }); + + do { + + await sleep(100); + + try { + + switch (([ + 'debug', + 'test_bad_response', + 'test_bad_response_async', + 'startStream2Server', + 'startStream2Client', + 'hi', + 'shit', + ] as const)[Math.floor(Math.random() * 7)]) { + case 'debug': + console.log('[Client] [Start Invoke] debug'); + await client.invoke('debug', new Date() + ': Hello, world!'); + console.log('[Client] [End Invoke] debug'); + break; + case 'test_bad_response': + console.log('[Client] [Start Invoke] test_bad_response'); + await client.invoke('test_bad_response'); + console.log('[Client] [End Invoke] test_bad_response'); + break; + case 'test_bad_response_async': + console.log('[Client] [Start Invoke] test_bad_response_async'); + await client.invoke('test_bad_response_async'); + console.log('[Client] [End Invoke] test_bad_response_async'); + break; + case 'shit': + console.log('[Client] [Start Invoke] shit'); + try { + await client.invoke('shit'); + } + catch (e) { + console.error(e); + } + console.log('[Client] [End Invoke] shit'); + break; + case 'hi': + console.log('[Client] [Start Invoke] hi'); + console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); + console.log('[Client] [End Invoke] hi'); + break; + case 'startStream2Server': { + console.log('[Client] [Start Invoke] startStream2Server'); + const streamId = await client.invoke('startStream2Server'); + console.log('[Client] [End Invoke] startStream2Server'); + setImmediate(() => { + testSendingStream(client, streamId, 'Client').catch(console.error); + }); + break; + } + case 'startStream2Client': { + console.log('[Client] [Start Invoke] startStream2Client'); + if (!client.transporter?.writable) { + + await client.connect(); + } + const stream = client.streams.create(); + testRecvStream(stream, 'Client'); + await client.invoke('startStream2Client', stream.id); + console.log('[Client] [End Invoke] startStream2Client'); + break; + } + // case 'serverShouldCloseConn': + // if (Math.random() < 0.1) { + // console.log('[Client] [Start Invoke] serverShouldCloseConn'); + // await client.invoke('serverShouldCloseConn'); + // console.log('[Client] [End Invoke] serverShouldCloseConn'); + // } + // break; + // case 'clientShouldCloseConn': { + // if (Math.random() < 0.1) { + // console.log('[Client] clientShouldCloseConn'); + // client.close(); + // } + // break; + // } + } + } + catch (e) { + + console.error('[Client] Error: ', e); + } + } + while (1); + +})().catch(console.error); diff --git a/src/examples/worker-thread/worker-server.ts b/src/examples/worker-thread/worker-server.ts new file mode 100644 index 0000000..b075634 --- /dev/null +++ b/src/examples/worker-thread/worker-server.ts @@ -0,0 +1,110 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import * as WorkerThread from '../../lib/transporters/worker-thread'; +import { holdProcess, testRecvStream, testSendingStream } from '../network/shared'; + +const router = new Tv.Servers.SimpleJsonApiRouter(); + +const server = new Tv.Servers.TvServer(router) + .on('error', (e) => { console.error(e); }); + +router + .registerApi('debug', (ctx, text: string): void => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked debug`); + console.log(`[Server] Client says: ${text}`); + + if (Math.random() > 0.5) { + console.log('[Server] Sent a message to client'); + ctx.channel.sendMessage('hello').catch(console.error); + } + }) + .registerApi('test_bad_response', (ctx): unknown => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); + + return { + f: BigInt(123) + }; + }) + .registerApi('test_bad_response_async', (ctx): Promise => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); + + return Promise.resolve({ + f: BigInt(123) + }); + }) + .registerApi('say', (ctx, text: string): string => { + + return text; + }) + .registerApi('startStream2Server', (ctx): number => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`); + const stream = ctx.channel.streams.create(); + console.log(`[Server] Opened stream #${stream.id}`); + + testRecvStream(stream, 'Server'); + + return stream.id; + }) + .registerApi('serverShouldCloseConn', (ctx): void => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked serverShouldCloseConn`); + + ctx.channel.close(); + }) + .registerApi('hi', (ctx, text: string): string => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked hi`); + + return Buffer.from(text).toString('base64url'); + }) + .registerApi('shit', (ctx): string => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked shit`); + + if (Math.random() > 0.5) { + throw { test: 'this error should be unclear to clients' }; + } + else { + throw new Tv.TvErrorResponse({ + test: 'this error should be visible to clients' + }); + } + }) + .registerApi('startStream2Client', (ctx, streamId: number): void => { + + console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`); + testSendingStream(ctx.channel, streamId, 'Server').catch(console.error); + }); + +holdProcess(); + +const wtGateway = WorkerThread.createWorkerThreadGateway(server); + +(async () => { + + await wtGateway.start(); + + console.log('Server started.'); + +})().catch(console.error); + + diff --git a/src/lib/transporters/worker-thread/README.md b/src/lib/transporters/worker-thread/README.md new file mode 100644 index 0000000..c965027 --- /dev/null +++ b/src/lib/transporters/worker-thread/README.md @@ -0,0 +1,18 @@ +# Televoke2/WorkerThread + +This module provides the Televoke2 protocol over Node.js Worker Threads, which allows you to connect the main thread and +worker threads with the Televoke2 protocol. + +You could either make the main thread as a server or a client, and the worker threads as clients or servers. + +## Examples + +- Main Thread As Server + + - Check the [server code (main thread)](../../../examples/worker-thread/main-server.ts) + - Check the [client code (worker thread)](../../../examples/worker-thread/worker-client.ts) + +- Worker Thread As Server + + - Check the [server code (worker thread)](../../../examples/worker-thread/worker-server.ts) + - Check the [client code (main thread)](../../../examples/worker-thread/main-client.ts) diff --git a/src/lib/transporters/worker-thread/WorkerThread.Client.ts b/src/lib/transporters/worker-thread/WorkerThread.Client.ts new file mode 100644 index 0000000..107d9dd --- /dev/null +++ b/src/lib/transporters/worker-thread/WorkerThread.Client.ts @@ -0,0 +1,88 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as NodeWorker from 'node:worker_threads'; +import type * as dT from '../Transporter.decl'; +import * as eWT from './WorkerThread.Errors'; +import { MAIN_THREAD_PROTOCOL_NAME } from './WorkerThread.Constants'; +import { MainThreadTransporter, WorkerThreadTransporter } from './WorkerThread.Transporter'; + +class MainServerConnector implements dT.IConnector { + + private _transporter: dT.ITransporter | null = null; + + public async connect(): Promise { + + return Promise.resolve(this._transporter ??= new WorkerThreadTransporter(MAIN_THREAD_PROTOCOL_NAME)); + } +} + +class WorkerServerConnector implements dT.IConnector { + + private _worker: NodeWorker.Worker | null = null; + + public constructor(worker: NodeWorker.Worker) { + + this._worker = worker + .on('exit', () => { this._worker = null; }); + } + + private _transporter: dT.ITransporter | null = null; + + public async connect(): Promise { + + if (!this._worker) { + + throw new eWT.E_WORKER_THREAD_OFFLINE(); + } + + this._transporter ??= new MainThreadTransporter(MAIN_THREAD_PROTOCOL_NAME, this._worker); + + return Promise.resolve(this._transporter); + } +} + +let inst: MainServerConnector | null = null; + +/** + * Create a connector to connect to the server running in the main thread. + * + * @experimental + */ +export function connectToMainThreadServer(): dT.IConnector { + + if (NodeWorker.isMainThread) { + + throw new eWT.E_WORKER_THREAD_ONLY(); + } + + return inst ??= new MainServerConnector(); +} + +/** + * Create a connector to connect to the server running in the worker thread. + * + * @experimental + */ +export function connectToWorkerThreadServer(worker: NodeWorker.Worker): dT.IConnector { + + if (!NodeWorker.isMainThread) { + + throw new eWT.E_MAIN_THREAD_ONLY(); + } + + return new WorkerServerConnector(worker); +} diff --git a/src/lib/transporters/worker-thread/WorkerThread.Constants.ts b/src/lib/transporters/worker-thread/WorkerThread.Constants.ts new file mode 100644 index 0000000..d7965b8 --- /dev/null +++ b/src/lib/transporters/worker-thread/WorkerThread.Constants.ts @@ -0,0 +1,19 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export const MAIN_THREAD_PROTOCOL_NAME = 'worker_thread/main'; + +export const WORKER_THREAD_PROTOCOL_NAME = 'worker_thread/worker'; diff --git a/src/lib/transporters/worker-thread/WorkerThread.Errors.ts b/src/lib/transporters/worker-thread/WorkerThread.Errors.ts new file mode 100644 index 0000000..316ac0f --- /dev/null +++ b/src/lib/transporters/worker-thread/WorkerThread.Errors.ts @@ -0,0 +1,53 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { TelevokeError } from '../../shared'; + +export const E_WORKER_THREAD_OFFLINE = class extends TelevokeError { + + public constructor(origin: unknown = null) { + + super( + 'worker_thread_offline', + 'The worker thread is offline.', + origin + ); + } +}; + +export const E_MAIN_THREAD_ONLY = class extends TelevokeError { + + public constructor(origin: unknown = null) { + + super( + 'main_thread_only', + 'The operation is only allowed in the main thread.', + origin + ); + } +}; + +export const E_WORKER_THREAD_ONLY = class extends TelevokeError { + + public constructor(origin: unknown = null) { + + super( + 'worker_thread_only', + 'The operation is only allowed in a worker thread.', + origin + ); + } +}; diff --git a/src/lib/transporters/worker-thread/WorkerThread.Server.ts b/src/lib/transporters/worker-thread/WorkerThread.Server.ts new file mode 100644 index 0000000..c4edf1c --- /dev/null +++ b/src/lib/transporters/worker-thread/WorkerThread.Server.ts @@ -0,0 +1,136 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as NodeWorker from 'node:worker_threads'; +import type * as dT from '../Transporter.decl'; +import * as cWT from './WorkerThread.Constants'; +import { EventEmitter } from 'node:events'; +import { MainThreadTransporter, WorkerThreadTransporter } from './WorkerThread.Transporter'; +import * as eWT from './WorkerThread.Errors'; + +export interface IMainThreadGateway extends dT.IGateway { + + /** + * Add a worker thread to the gateway, so that the gateway can communicate with it. + * + * > The worker thread must be online already. + * + * @param worker The worker instance on the main thread. + */ + registerWorker(worker: NodeWorker.Worker): void; +} + +const VOID_OK_PROMISE = Promise.resolve(); + +class MainThreadGateway extends EventEmitter implements IMainThreadGateway { + + public constructor( + private readonly _server: dT.IServer, + ) { super(); } + + public get running(): boolean { + + return true; + } + + public start(): Promise { + + return VOID_OK_PROMISE; + } + + public stop(): Promise { + + return VOID_OK_PROMISE; + } + + public registerWorker(worker: NodeWorker.Worker): void { + + if (worker?.threadId === -1) { + + throw new eWT.E_WORKER_THREAD_OFFLINE(); + } + + this._server.registerChannel(new MainThreadTransporter(cWT.MAIN_THREAD_PROTOCOL_NAME, worker)); + } +} + +class WorkerThreadGateway extends EventEmitter implements dT.IGateway { + + private _transporter: WorkerThreadTransporter | null = null; + + public constructor( + private readonly _server: dT.IServer, + ) { super(); } + + public get running(): boolean { + + return !!this._transporter; + } + + public start(): Promise { + + if (this._transporter) { + + return VOID_OK_PROMISE; + } + + this._server.registerChannel(this._transporter = new WorkerThreadTransporter(cWT.WORKER_THREAD_PROTOCOL_NAME)); + + return VOID_OK_PROMISE; + } + + public stop(): Promise { + + this._transporter?.end(); + this._transporter = null; + + return VOID_OK_PROMISE; + } +} + +/** + * Create a gateway that runs in the main thread. + * + * @param server The server instance. + * @experimental + */ +export function createMainThreadGateway(server: dT.IServer): IMainThreadGateway { + + if (!NodeWorker.isMainThread) { + + throw new eWT.E_MAIN_THREAD_ONLY(); + } + + return new MainThreadGateway(server); +} + +let workerThreadGateway: WorkerThreadGateway | null = null; + +/** + * Create a gateway that runs in the worker thread. + * + * @param server The server instance. + * @experimental + */ +export function createWorkerThreadGateway(server: dT.IServer): dT.IGateway { + + if (NodeWorker.isMainThread) { + + throw new eWT.E_WORKER_THREAD_ONLY(); + } + + return workerThreadGateway ??= new WorkerThreadGateway(server); +} diff --git a/src/lib/transporters/worker-thread/WorkerThread.Transporter.ts b/src/lib/transporters/worker-thread/WorkerThread.Transporter.ts new file mode 100644 index 0000000..87aefe2 --- /dev/null +++ b/src/lib/transporters/worker-thread/WorkerThread.Transporter.ts @@ -0,0 +1,204 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as NodeWorker from 'node:worker_threads'; +import * as Shared from '../../shared'; +import type * as dT from '../Transporter.decl'; +import * as eWT from './WorkerThread.Errors'; +import { EventEmitter } from 'node:events'; + +const PROPERTY_NAMES = ['remoteAddress', 'remotePort', 'localAddress', 'localPort', 'threadId']; + +const MSG_PREFIX = 'televoke2://'; + +abstract class AbstractThreadTransporter extends EventEmitter { + + public constructor( + public readonly protocol: string + ) { + + super(); + } + + public getPropertyNames(): string[] { + + return PROPERTY_NAMES; + } + + public getAllProperties(): Record { + + return { + 'remoteAddress': '127.0.0.1', + 'remotePort': 0, + 'localAddress': '127.0.0.1', + 'localPort': 0, + 'threadId': NodeWorker.threadId, + }; + } + + public getProperty(name: string): unknown { + + switch (name) { + case 'localPort': + return 0; + case 'localAddress': + return '127.0.0.1'; + case 'remotePort': + return 0; + case 'remoteAddress': + return '127.0.0.1'; + case 'threadId': + return NodeWorker.threadId; + default: + return undefined; + } + } + + public abstract destroy(): void; + + public end(): void { + + this.destroy(); + } + + protected _write( + worker: NodeWorker.MessagePort | NodeWorker.Worker, + frame: Array + ): void { + + for (let i = 0; i < frame.length; i++) { + + const f = frame[i]; + + if (!(f instanceof Buffer)) { + + frame[i] = Buffer.from(f); + } + } + + try { + + worker.postMessage(MSG_PREFIX + Buffer.concat(frame as Buffer[]).toString('base64')); + } + catch (e) { + + throw new Shared.errors.network_error({ reason: 'conn_lost', cause: e }); + } + } + + protected _onMessage = (frame: unknown): void => { + + if (typeof frame !== 'string' || !frame.startsWith(MSG_PREFIX)) { + + return; + } + + this.emit('frame', [Buffer.from(frame.slice(MSG_PREFIX.length), 'base64')]); + }; +} + +export class MainThreadTransporter extends AbstractThreadTransporter implements dT.ITransporter, Shared.ITransporter { + + private _closed: boolean = false; + + public constructor( + protocol: string, + protected readonly _worker: NodeWorker.Worker + ) { + + super(protocol); + + this._worker + .on('message', this._onMessage) + .on('error', this._onError) + .on('exit', this._onExit); + } + + private readonly _onError = (e: unknown): void => { this.emit('error', e); }; + + private readonly _onExit = (): void => { this.destroy(); }; + + public destroy(): void { + + this._closed = true; + + this._worker.removeListener('message', this._onMessage); + this._worker.removeListener('error', this._onError); + this._worker.removeListener('exit', this._onExit); + + this.emit('close'); + } + + public get writable(): boolean { + + return !this._closed; + } + + public write(frame: Array): void { + + if (this._closed) { + + throw new Shared.errors.network_error({ reason: 'conn_lost' }); + } + + this._write(this._worker, frame); + } +} + +export class WorkerThreadTransporter extends AbstractThreadTransporter implements dT.ITransporter, Shared.ITransporter { + + private _hold: NodeJS.Timeout | null = setInterval(() => { return; }, 3600_000); + + public constructor(protocol: string) { + + super(protocol); + + if (NodeWorker.isMainThread) { + + throw new eWT.E_WORKER_THREAD_ONLY(); + } + + NodeWorker.parentPort!.on('message', this._onMessage); + } + + public destroy(): void { + + if (this._hold) { + + clearInterval(this._hold); + this._hold = null; + + NodeWorker.parentPort!.removeListener('message', this._onMessage); + + this.emit('close'); + } + } + + public get writable(): boolean { + + return !!this._hold; + } + + public write(frame: Array): void { + + if (!this._hold) { + + throw new Shared.errors.network_error({ reason: 'conn_lost' }); + } + + super._write(NodeWorker.parentPort!, frame); + } +} diff --git a/src/lib/transporters/worker-thread/index.ts b/src/lib/transporters/worker-thread/index.ts new file mode 100644 index 0000000..430e328 --- /dev/null +++ b/src/lib/transporters/worker-thread/index.ts @@ -0,0 +1,19 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export * from './WorkerThread.Client'; +export * from './WorkerThread.Server'; +export * from './WorkerThread.Constants';