From 0d150f560a3c32f34cd4092d03cb3262a6009ef0 Mon Sep 17 00:00:00 2001 From: Angus ZENG Date: Sun, 30 Jun 2024 17:59:44 +0800 Subject: [PATCH] fix(test): remade the examples --- .vscode/launch.json | 111 ++++++- src/examples/memory.ts | 134 +------- .../network/legacy-http-benchmark-unix.ts | 5 +- src/examples/network/legacy-http-benchmark.ts | 5 +- src/examples/network/legacy-http-client.ts | 58 +--- .../network/legacy-http-unix-client.ts | 46 +-- src/examples/network/legacy-https-client.ts | 46 +-- src/examples/network/lwdfx-tcp-benchmark.ts | 5 +- src/examples/network/lwdfx-tcp-client.ts | 90 +---- src/examples/network/lwdfx-tls-client.ts | 90 +---- src/examples/network/lwdfx-unix-benchmark.ts | 5 +- src/examples/network/lwdfx-unix-client.ts | 90 +---- src/examples/network/server.ts | 77 +---- src/examples/network/test-conn-reset.ts | 11 +- src/examples/network/ws-benchmark.ts | 5 +- .../network/ws-client-timeout-test.ts | 5 +- src/examples/network/ws-client.ts | 102 +----- src/examples/network/ws-unix-benchmark.ts | 5 +- src/examples/network/ws-unix-client.ts | 90 +---- src/examples/network/wss-client.ts | 90 +---- src/examples/shared/client.ts | 310 ++++++++++++++++++ src/examples/shared/decl.ts | 50 +++ src/examples/shared/log.ts | 46 +++ src/examples/shared/router.ts | 122 +++++++ .../{network/shared.ts => shared/stream.ts} | 73 ++--- src/examples/shared/test-utils.ts | 34 ++ .../worker-thread/benchmark-worker-client.ts | 4 +- src/examples/worker-thread/main-client.ts | 100 +----- src/examples/worker-thread/main-server.ts | 77 +---- src/examples/worker-thread/worker-client.ts | 100 +----- src/examples/worker-thread/worker-server.ts | 77 +---- 31 files changed, 776 insertions(+), 1287 deletions(-) create mode 100644 src/examples/shared/client.ts create mode 100644 src/examples/shared/decl.ts create mode 100644 src/examples/shared/log.ts create mode 100644 src/examples/shared/router.ts rename src/examples/{network/shared.ts => shared/stream.ts} (53%) create mode 100644 src/examples/shared/test-utils.ts diff --git a/.vscode/launch.json b/.vscode/launch.json index 617f2e8..65a6dbd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,7 +7,40 @@ { "type": "node", "request": "launch", - "name": "[Televoke] Network: Start Server", + "name": "[Televoke] Example: Memory", + "program": "${workspaceFolder}/src/examples/memory.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Thread Worker - Server on Main Thread", + "program": "${workspaceFolder}/src/examples/worker-thread/main-server.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Thread Worker - Server on Worker Thread", + "program": "${workspaceFolder}/src/examples/worker-thread/main-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start Server", "program": "${workspaceFolder}/src/examples/network/server.ts", "sourceMaps": true, "cwd": "${workspaceFolder}", @@ -18,7 +51,7 @@ { "type": "node", "request": "launch", - "name": "[Televoke] Network: Start Legacy Http Client", + "name": "[Televoke] Example: Network - Start Legacy Http Client (TCP)", "program": "${workspaceFolder}/src/examples/network/legacy-http-client.ts", "sourceMaps": true, "cwd": "${workspaceFolder}", @@ -29,7 +62,62 @@ { "type": "node", "request": "launch", - "name": "[Televoke] Network: Start WebSocket Client", + "name": "[Televoke] Example: Network - Start Legacy Http Client (Unix Socket)", + "program": "${workspaceFolder}/src/examples/network/legacy-http-unix-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start Legacy Http Client (TLS)", + "program": "${workspaceFolder}/src/examples/network/legacy-http-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start LwDFX Client (TCP)", + "program": "${workspaceFolder}/src/examples/network/lwdfx-tcp-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start LwDFX Client (Unix Socket)", + "program": "${workspaceFolder}/src/examples/network/lwdfx-unix-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start LwDFX Client (TLS)", + "program": "${workspaceFolder}/src/examples/network/lwdfx-tls-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start WebSocket Client (TCP)", "program": "${workspaceFolder}/src/examples/network/ws-client.ts", "sourceMaps": true, "cwd": "${workspaceFolder}", @@ -40,13 +128,24 @@ { "type": "node", "request": "launch", - "name": "[Televoke] Demo: HTTPS Client", - "program": "${workspaceFolder}/src/examples/Https.ts", + "name": "[Televoke] Example: Network - Start WebSocket Client (Unix Socket)", + "program": "${workspaceFolder}/src/examples/network/ws-unix-client.ts", "sourceMaps": true, "cwd": "${workspaceFolder}", "outFiles": [ "${workspaceFolder}/**/*.js", ] - } + }, + { + "type": "node", + "request": "launch", + "name": "[Televoke] Example: Network - Start WebSocket Client (TLS)", + "program": "${workspaceFolder}/src/examples/network/wss-client.ts", + "sourceMaps": true, + "cwd": "${workspaceFolder}", + "outFiles": [ + "${workspaceFolder}/**/*.js", + ] + }, ] } \ No newline at end of file diff --git a/src/examples/memory.ts b/src/examples/memory.ts index 2ea2c91..aff18e8 100644 --- a/src/examples/memory.ts +++ b/src/examples/memory.ts @@ -14,150 +14,28 @@ * limitations under the License. */ -import * as Crypto from 'node:crypto'; import * as Tv from '../lib'; import * as Memory from '../lib/transporters/memory'; +import { IApis } from './shared/decl'; +import { doClientTest } from './shared/client'; +import { router } from './shared/router'; -const router = new Tv.Servers.SimpleJsonApiRouter(); - -const server = new Tv.Servers.TvServer(router); - -interface IApis { - - debug(text: string): void; - - startStream2Server(): number; - - startStream2Client(streamId: number): void; -} - -function sumBuffer(d: Buffer, b: number): number { - - // eslint-disable-next-line @typescript-eslint/prefer-for-of - for (let i = 0; i < d.length; i++) { - - b += d[i]; - } - - return b; -} - -router - .registerApi('debug', (ctx, text: string): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked debug`); - console.log(`[Server] Client says: ${text}`); - - if (Math.random() > 0.5) { - console.log('[Server] Sent a message to client'); - ctx.channel.sendMessage('hello').catch(console.error); - } - }) - .registerApi('startStream2Server', (ctx): number => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`); - const stream = ctx.channel.streams.create(); - console.log(`[Server] Opened stream #${stream.id}`); - - testRecvStream(stream, 'Server'); - - return stream.id; - }) - .registerApi('startStream2Client', (ctx, streamId: number): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`); - testSendingStream(ctx.channel, streamId, 'Server').catch(console.error); - }); +const server = new Tv.Servers.TvServer(router) + .on('error', (e) => { console.error(e); }); const memoryGateway = Memory.createServer({ name: 'hello', 'server': server }, (s) => { server.registerChannel(s); }); -function sleep(ms: number): Promise { - - return new Promise((resolve) => { - - setTimeout(resolve, ms); - }); -} - -// setInterval(() => { return; }, 1000); - -function testRecvStream(stream: Tv.IBinaryReadStream, endpoint: 'Client' | 'Server'): void { - - let sum = 0; - stream.on('data', (chunk) => { - - sum = sumBuffer(chunk, sum); - - }).on('end', () => { - - console.log(`[${endpoint}] Stream #${stream.id} all received, sum = ${sum}`); - }); -} - -async function testSendingStream( - ch: Pick, 'sendBinaryChunk'>, - streamId: number, - endpoint: 'Client' | 'Server' -): Promise { - - const buffers = new Array(Math.ceil(Math.random() * 10)) - .fill(0) - .map(() => Crypto.randomBytes(Math.ceil(Math.random() * 1024))); - - let sum = 0; - - for (const [i, p] of buffers.entries()) { - - sum = sumBuffer(p, sum); - - await ch.sendBinaryChunk(streamId, i, p); - - await sleep(Math.floor(1000 * Math.random())); - } - - await ch.sendBinaryChunk(streamId, buffers.length, null); - - console.log(`[${endpoint}]: Stream #${streamId} all sent, sum = ${sum}`); -} - (async () => { await memoryGateway.start(); - console.log(`Server [${memoryGateway.name}] started.`); - const client: Tv.Clients.IClient = Tv.Clients.createJsonApiClient( Memory.createConnector(memoryGateway.name) ); - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - switch ((['debug', 'startStream2Server', 'startStream2Client'] as const)[Math.floor(Math.random() * 3)]) { - case 'debug': - await client.invoke('debug', new Date() + ': Hello, world!'); - break; - case 'startStream2Server': - testSendingStream(client, await client.invoke('startStream2Server'), 'Client') - .catch(console.error); - break; - case 'startStream2Client': { - const stream = client.streams.create(); - await client.invoke('startStream2Client', stream.id); - testRecvStream(stream, 'Client'); - break; - } - } - } - while (1); + doClientTest(client); })().catch(console.error); diff --git a/src/examples/network/legacy-http-benchmark-unix.ts b/src/examples/network/legacy-http-benchmark-unix.ts index 1c3db66..2565686 100644 --- a/src/examples/network/legacy-http-benchmark-unix.ts +++ b/src/examples/network/legacy-http-benchmark-unix.ts @@ -15,7 +15,8 @@ */ import * as Tv from '../../lib'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; const cases = new Array(10000).fill(0) as number[]; @@ -29,7 +30,7 @@ const cases = new Array(10000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke1/http] 10000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke1/http] 10000 requests'); } diff --git a/src/examples/network/legacy-http-benchmark.ts b/src/examples/network/legacy-http-benchmark.ts index 8b3e4e0..b6030e4 100644 --- a/src/examples/network/legacy-http-benchmark.ts +++ b/src/examples/network/legacy-http-benchmark.ts @@ -15,7 +15,8 @@ */ import * as Tv from '../../lib'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; const cases = new Array(10000).fill(0) as number[]; @@ -30,7 +31,7 @@ const cases = new Array(10000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke1/http] 10000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke1/http] 10000 requests'); } diff --git a/src/examples/network/legacy-http-client.ts b/src/examples/network/legacy-http-client.ts index d3c3367..f35edc7 100644 --- a/src/examples/network/legacy-http-client.ts +++ b/src/examples/network/legacy-http-client.ts @@ -15,7 +15,9 @@ */ import * as Tv from '../../lib'; -import { IApis, getClaOption, holdProcess, sleep } from './shared'; +import { doClientTest } from '../shared/client'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; holdProcess(); @@ -26,58 +28,6 @@ holdProcess(); hostname: getClaOption('hostname', '127.0.0.1'), }); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - do { - - await sleep(1000); - - try { - - switch (([ - 'debug', - 'test_bad_response', - 'test_bad_response_async', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 5)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'test_bad_response': - console.log('[Client] [Start Invoke] test_bad_response'); - await client.invoke('test_bad_response'); - console.log('[Client] [End Invoke] test_bad_response'); - break; - case 'test_bad_response_async': - console.log('[Client] [Start Invoke] test_bad_response_async'); - await client.invoke('test_bad_response_async'); - console.log('[Client] [End Invoke] test_bad_response_async'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + doClientTest(client, true); })().catch(console.error); diff --git a/src/examples/network/legacy-http-unix-client.ts b/src/examples/network/legacy-http-unix-client.ts index 8575c9b..4b32334 100644 --- a/src/examples/network/legacy-http-unix-client.ts +++ b/src/examples/network/legacy-http-unix-client.ts @@ -15,7 +15,9 @@ */ import * as Tv from '../../lib'; -import { IApis, getClaOption, holdProcess, sleep } from './shared'; +import { doClientTest } from '../shared/client'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; holdProcess(); @@ -25,46 +27,6 @@ holdProcess(); socketPath: getClaOption('socket-path', '/tmp/televoke2-http.sock'), }); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 3)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + doClientTest(client, true); })().catch(console.error); diff --git a/src/examples/network/legacy-https-client.ts b/src/examples/network/legacy-https-client.ts index acc52a3..52e3d74 100644 --- a/src/examples/network/legacy-https-client.ts +++ b/src/examples/network/legacy-https-client.ts @@ -16,7 +16,9 @@ import * as FS from 'node:fs'; import * as Tv from '../../lib'; -import { IApis, getClaOption, holdProcess, sleep } from './shared'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -30,46 +32,6 @@ holdProcess(); 'ca': FS.readFileSync(`${__dirname}/../../debug/pki/ca.pem`), }); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 3)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + doClientTest(client, true); })().catch(console.error); diff --git a/src/examples/network/lwdfx-tcp-benchmark.ts b/src/examples/network/lwdfx-tcp-benchmark.ts index e1011c2..1bb317d 100644 --- a/src/examples/network/lwdfx-tcp-benchmark.ts +++ b/src/examples/network/lwdfx-tcp-benchmark.ts @@ -16,7 +16,8 @@ import * as Tv from '../../lib'; import * as LwDfx from '../../lib/transporters/lwdfx'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; const cases = new Array(50000).fill(0) as number[]; @@ -33,7 +34,7 @@ const cases = new Array(50000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke2/lwdfx] 50000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke2/lwdfx] 50000 requests'); } diff --git a/src/examples/network/lwdfx-tcp-client.ts b/src/examples/network/lwdfx-tcp-client.ts index 812346e..7a640a8 100644 --- a/src/examples/network/lwdfx-tcp-client.ts +++ b/src/examples/network/lwdfx-tcp-client.ts @@ -16,7 +16,9 @@ import * as Tv from '../../lib'; import * as LwDfx from '../../lib/transporters/lwdfx'; -import { IApis, sleep, testSendingStream, testRecvStream, getClaOption, holdProcess } from './shared'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -29,90 +31,6 @@ holdProcess(); }) ); - client.on('error', (e) => { console.error(`[Client] Unexpected error: ${e as any}`); }); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'startStream2Server', - 'startStream2Client', - 'serverShouldCloseConn', - 'clientShouldCloseConn', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', `${new Date().toISOString()}: Hello, world!`); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date().toISOString() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - case 'serverShouldCloseConn': - if (Math.random() < 0.01) { - console.log('[Client] [Start Invoke] serverShouldCloseConn'); - await client.invoke('serverShouldCloseConn'); - console.log('[Client] [End Invoke] serverShouldCloseConn'); - } - break; - case 'clientShouldCloseConn': { - if (Math.random() < 0.01) { - console.log('[Client] clientShouldCloseConn'); - client.close(); - } - break; - } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + await doClientTest(client); })().catch(console.error); diff --git a/src/examples/network/lwdfx-tls-client.ts b/src/examples/network/lwdfx-tls-client.ts index ef1681d..5dd364d 100644 --- a/src/examples/network/lwdfx-tls-client.ts +++ b/src/examples/network/lwdfx-tls-client.ts @@ -17,7 +17,9 @@ import * as FS from 'node:fs'; import * as Tv from '../../lib'; import * as LwDfx from '../../lib/transporters/lwdfx'; -import { IApis, sleep, testSendingStream, testRecvStream, getClaOption, holdProcess } from './shared'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -34,90 +36,6 @@ holdProcess(); }) ); - client.on('error', (e) => { console.error(`[Client] Unexpected error: ${e as any}`); }); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'startStream2Server', - 'startStream2Client', - 'serverShouldCloseConn', - 'clientShouldCloseConn', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', `${new Date().toISOString()}: Hello, world!`); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date().toISOString() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - case 'serverShouldCloseConn': - if (Math.random() < 0.01) { - console.log('[Client] [Start Invoke] serverShouldCloseConn'); - await client.invoke('serverShouldCloseConn'); - console.log('[Client] [End Invoke] serverShouldCloseConn'); - } - break; - case 'clientShouldCloseConn': { - if (Math.random() < 0.01) { - console.log('[Client] clientShouldCloseConn'); - client.close(); - } - break; - } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + await doClientTest(client); })().catch(console.error); diff --git a/src/examples/network/lwdfx-unix-benchmark.ts b/src/examples/network/lwdfx-unix-benchmark.ts index 6cb4245..40cadf5 100644 --- a/src/examples/network/lwdfx-unix-benchmark.ts +++ b/src/examples/network/lwdfx-unix-benchmark.ts @@ -16,7 +16,8 @@ import * as Tv from '../../lib'; import * as LwDfx from '../../lib/transporters/lwdfx'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; const cases = new Array(50000).fill(0) as number[]; @@ -32,7 +33,7 @@ const cases = new Array(50000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke2/lwdfx/unix] 50000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke2/lwdfx/unix] 50000 requests'); } diff --git a/src/examples/network/lwdfx-unix-client.ts b/src/examples/network/lwdfx-unix-client.ts index cdc325c..c92a4cc 100644 --- a/src/examples/network/lwdfx-unix-client.ts +++ b/src/examples/network/lwdfx-unix-client.ts @@ -16,7 +16,9 @@ import * as Tv from '../../lib'; import * as LwDfx from '../../lib/transporters/lwdfx'; -import { IApis, sleep, testSendingStream, testRecvStream, getClaOption, holdProcess } from './shared'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -28,90 +30,6 @@ holdProcess(); }) ); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'startStream2Server', - 'startStream2Client', - 'serverShouldCloseConn', - 'clientShouldCloseConn', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - case 'serverShouldCloseConn': - if (Math.random() < 0.01) { - console.log('[Client] [Start Invoke] serverShouldCloseConn'); - await client.invoke('serverShouldCloseConn'); - console.log('[Client] [End Invoke] serverShouldCloseConn'); - } - break; - case 'clientShouldCloseConn': { - if (Math.random() < 0.01) { - console.log('[Client] clientShouldCloseConn'); - client.close(); - } - break; - } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + await doClientTest(client); })().catch(console.error); diff --git a/src/examples/network/server.ts b/src/examples/network/server.ts index 75305af..df92bad 100644 --- a/src/examples/network/server.ts +++ b/src/examples/network/server.ts @@ -20,87 +20,14 @@ import * as LwDfx from '../../lib/transporters/lwdfx'; import * as LegacyHttp from '../../lib/transporters/legacy-http'; import * as HttpListener from '../../lib/transporters/http-listener'; import * as WebSocket from '../../lib/transporters/websocket'; -import { getClaOption, holdProcess, testRecvStream, testSendingStream } from './shared'; - -const router = new Tv.Servers.SimpleJsonApiRouter(); +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { router } from '../shared/router'; const NETWORK_HOSTNAME = getClaOption('listen-hostname', '127.0.0.1'); const server = new Tv.Servers.TvServer(router) .on('error', (e) => { console.error(e); }); -router - .registerApi('debug', (ctx, text: string): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked debug`); - console.log(`[Server] Client says: ${text}`); - - if (Math.random() > 0.5) { - console.log('[Server] Sent a message to client'); - ctx.channel.sendMessage('hello').catch(console.error); - } - }) - .registerApi('test_bad_response', (ctx): unknown => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); - - return { - f: BigInt(123) - }; - }) - .registerApi('test_bad_response_async', (ctx): Promise => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); - - return Promise.resolve({ - f: BigInt(123) - }); - }) - .registerApi('say', (ctx, text: string): string => { - - return text; - }) - .registerApi('startStream2Server', (ctx): number => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`); - const stream = ctx.channel.streams.create(); - console.log(`[Server] Opened stream #${stream.id}`); - - testRecvStream(stream, 'Server'); - - return stream.id; - }) - .registerApi('serverShouldCloseConn', (ctx): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked serverShouldCloseConn`); - - ctx.channel.close(); - }) - .registerApi('hi', (ctx, text: string): string => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked hi`); - - return Buffer.from(text).toString('base64url'); - }) - .registerApi('shit', (ctx): string => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked shit`); - - if (Math.random() > 0.5) { - throw { test: 'this error should be unclear to clients' }; - } - else { - throw new Tv.TvErrorResponse({ - test: 'this error should be visible to clients' - }); - } - }) - .registerApi('startStream2Client', (ctx, streamId: number): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`); - testSendingStream(ctx.channel, streamId, 'Server').catch(console.error); - }); - const lwdfxTcpGateway = LwDfx.createTcpGateway(server, { port: parseInt(getClaOption('lwdfx-tcp-port', '8698')), hostname: NETWORK_HOSTNAME, diff --git a/src/examples/network/test-conn-reset.ts b/src/examples/network/test-conn-reset.ts index 6ec3b4b..479be90 100644 --- a/src/examples/network/test-conn-reset.ts +++ b/src/examples/network/test-conn-reset.ts @@ -15,7 +15,8 @@ */ import * as Tv from '../../lib'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; function syncSleep(ms: number): void { @@ -45,7 +46,7 @@ async function testLegacyHttpDisabledRetry(): Promise { console.log('- Case: Normal Invoke'); try { - await client.invoke('hi', new Date() + ': Hello, world!'); + await client.invoke('echo', new Date() + ': Hello, world!'); console.info(' PASSED: No exception thrown.'); } @@ -61,7 +62,7 @@ async function testLegacyHttpDisabledRetry(): Promise { try { - await client.invoke('hi', new Date() + ': Hello, world!'); + await client.invoke('echo', new Date() + ': Hello, world!'); console.error(' FAILED: No exception thrown.'); } @@ -87,7 +88,7 @@ async function testLegacyHttpEnabledRetry(): Promise { console.log('- Case: Normal Invoke'); try { - await client.invoke('hi', new Date() + ': Hello, world!'); + await client.invoke('echo', new Date() + ': Hello, world!'); console.info(' PASSED: No exception thrown.'); } @@ -103,7 +104,7 @@ async function testLegacyHttpEnabledRetry(): Promise { try { - await client.invoke('hi', new Date() + ': Hello, world!'); + await client.invoke('echo', new Date() + ': Hello, world!'); console.info(' PASSED: No exception thrown.'); } diff --git a/src/examples/network/ws-benchmark.ts b/src/examples/network/ws-benchmark.ts index a8a3c27..91fb6e1 100644 --- a/src/examples/network/ws-benchmark.ts +++ b/src/examples/network/ws-benchmark.ts @@ -16,7 +16,8 @@ import * as Tv from '../../lib'; import * as WebSocket from '../../lib/transporters/websocket'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; const cases = new Array(50000).fill(0) as number[]; @@ -33,7 +34,7 @@ const cases = new Array(50000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke2/ws] 50000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke2/ws] 50000 requests'); } diff --git a/src/examples/network/ws-client-timeout-test.ts b/src/examples/network/ws-client-timeout-test.ts index c41e66c..87f0462 100644 --- a/src/examples/network/ws-client-timeout-test.ts +++ b/src/examples/network/ws-client-timeout-test.ts @@ -16,7 +16,8 @@ import * as Tv from '../../lib'; import * as WebSocket from '../../lib/transporters/websocket'; -import { IApis, getClaOption, holdProcess, sleep, } from './shared'; +import { getClaOption, holdProcess, sleep } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; holdProcess(); @@ -47,7 +48,7 @@ holdProcess(); while (1) { - await client.invoke('debug', new Date() + ': Hello, world!'); + await client.invoke('echo', new Date() + ': Hello, world!'); await sleep(invokeInterval); } diff --git a/src/examples/network/ws-client.ts b/src/examples/network/ws-client.ts index a396d68..d4ab887 100644 --- a/src/examples/network/ws-client.ts +++ b/src/examples/network/ws-client.ts @@ -16,7 +16,9 @@ import * as Tv from '../../lib'; import * as WebSocket from '../../lib/transporters/websocket'; -import { IApis, sleep, testSendingStream, testRecvStream, getClaOption, holdProcess } from './shared'; +import { doClientTest } from '../shared/client'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; holdProcess(); @@ -29,102 +31,6 @@ holdProcess(); }) ); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(1000); - - try { - - switch (([ - 'debug', - 'test_bad_response', - 'test_bad_response_async', - 'startStream2Server', - 'startStream2Client', - 'serverShouldCloseConn', - 'clientShouldCloseConn', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 9)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'test_bad_response': - console.log('[Client] [Start Invoke] test_bad_response'); - await client.invoke('test_bad_response'); - console.log('[Client] [End Invoke] test_bad_response'); - break; - case 'test_bad_response_async': - console.log('[Client] [Start Invoke] test_bad_response_async'); - await client.invoke('test_bad_response_async'); - console.log('[Client] [End Invoke] test_bad_response_async'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - case 'serverShouldCloseConn': - if (Math.random() < 0.1) { - console.log('[Client] [Start Invoke] serverShouldCloseConn'); - await client.invoke('serverShouldCloseConn'); - console.log('[Client] [End Invoke] serverShouldCloseConn'); - } - break; - case 'clientShouldCloseConn': { - if (Math.random() < 0.1) { - console.log('[Client] clientShouldCloseConn'); - client.close(); - } - break; - } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + await doClientTest(client); })().catch(console.error); diff --git a/src/examples/network/ws-unix-benchmark.ts b/src/examples/network/ws-unix-benchmark.ts index ced5d65..59b08c8 100644 --- a/src/examples/network/ws-unix-benchmark.ts +++ b/src/examples/network/ws-unix-benchmark.ts @@ -16,7 +16,8 @@ import * as Tv from '../../lib'; import * as WebSocket from '../../lib/transporters/websocket'; -import { IApis, getClaOption } from './shared'; +import { getClaOption } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; const cases = new Array(50000).fill(0) as number[]; @@ -32,7 +33,7 @@ const cases = new Array(50000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke2/ws/unix] 50000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke2/ws/unix] 50000 requests'); } diff --git a/src/examples/network/ws-unix-client.ts b/src/examples/network/ws-unix-client.ts index 61eca35..454e9ba 100644 --- a/src/examples/network/ws-unix-client.ts +++ b/src/examples/network/ws-unix-client.ts @@ -16,7 +16,9 @@ import * as Tv from '../../lib'; import * as WebSocket from '../../lib/transporters/websocket'; -import { IApis, sleep, testSendingStream, testRecvStream, getClaOption, holdProcess } from './shared'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -28,90 +30,6 @@ holdProcess(); }) ); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'startStream2Server', - 'startStream2Client', - 'serverShouldCloseConn', - 'clientShouldCloseConn', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - case 'serverShouldCloseConn': - if (Math.random() < 0.1) { - console.log('[Client] [Start Invoke] serverShouldCloseConn'); - await client.invoke('serverShouldCloseConn'); - console.log('[Client] [End Invoke] serverShouldCloseConn'); - } - break; - case 'clientShouldCloseConn': { - if (Math.random() < 0.1) { - console.log('[Client] clientShouldCloseConn'); - client.close(); - } - break; - } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + await doClientTest(client); })().catch(console.error); diff --git a/src/examples/network/wss-client.ts b/src/examples/network/wss-client.ts index 305d25f..5f449b3 100644 --- a/src/examples/network/wss-client.ts +++ b/src/examples/network/wss-client.ts @@ -17,7 +17,9 @@ import * as FS from 'node:fs'; import * as Tv from '../../lib'; import * as WebSocket from '../../lib/transporters/websocket'; -import { IApis, sleep, testSendingStream, testRecvStream, getClaOption, holdProcess } from './shared'; +import { getClaOption, holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -32,90 +34,6 @@ holdProcess(); }) ); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'startStream2Server', - 'startStream2Client', - 'serverShouldCloseConn', - 'clientShouldCloseConn', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - case 'serverShouldCloseConn': - if (Math.random() < 0.1) { - console.log('[Client] [Start Invoke] serverShouldCloseConn'); - await client.invoke('serverShouldCloseConn'); - console.log('[Client] [End Invoke] serverShouldCloseConn'); - } - break; - case 'clientShouldCloseConn': { - if (Math.random() < 0.1) { - console.log('[Client] clientShouldCloseConn'); - client.close(); - } - break; - } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + await doClientTest(client); })().catch(console.error); diff --git a/src/examples/shared/client.ts b/src/examples/shared/client.ts new file mode 100644 index 0000000..9020bc4 --- /dev/null +++ b/src/examples/shared/client.ts @@ -0,0 +1,310 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import { clientLogs } from './log'; +import { IApis } from './decl'; +import { testSendingStream, testRecvStream } from './stream'; +import { sleep } from './test-utils'; + +export async function doClientTest( + client: Tv.Clients.IClient, + connLess: boolean = false, + +): Promise { + + const pendingMessages: Record = {}; + + client.on('error', (e) => clientLogs.error(`Unexpected error: ${e}`)); + + client.on('push_message', (msg) => { + + const sMsg = Buffer.concat(msg).toString(); + + if (pendingMessages[sMsg]) { + + clientLogs.ok('Received message in time: ' + sMsg); + } + else { + + clientLogs.warning('Message delayed from server: ' + sMsg); + } + + delete pendingMessages[sMsg]; + }); + + client.on('close', () => clientLogs.warning('Connection closed')); + + const tests = [ + 'echo', + 'api_not_found', + 'invalid_error_reply', + 'valid_error_reply', + 'invalid_data_in_json', + 'minimal_api', + 'void_reply', + 'server_close_connection', + 'client_close_connection', + 'test_message', + 'send_stream', + 'recv_stream', + ] as const; + + const autoAsync = (n: T): T | `${T}Async` => Math.random() > 0.5 ? n : `${n}Async`; + + do { + + await sleep(200); + + const caseName = tests[Math.floor(Math.random() * tests.length)]; + + const mkMsg = (msg: string) => `[${caseName}] ${msg}`; + + try { + + clientLogs.info(`[Started] ${caseName}`); + switch (caseName) { + case 'minimal_api': + try { + + await client.invoke(autoAsync('minimalApi')); + clientLogs.ok(mkMsg(`Passed`)); + } + catch (e) { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + break; + case 'void_reply': + try { + + await client.invoke(autoAsync('replyVoid'), 'test'); + clientLogs.ok(mkMsg(`Passed`)); + } + catch (e) { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + break; + case 'invalid_error_reply': + try { + + await client.invoke('throwInvalidError'); + clientLogs.error(mkMsg(`Expected error not thrown`)); + } + catch (e) { + + if (e instanceof Tv.errors.server_internal_error) { + + clientLogs.ok(mkMsg(`Expected error thrown, error: ${e}`)); + } + else { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + break; + case 'valid_error_reply': + try { + + await client.invoke('throwInvalidError'); + clientLogs.error(mkMsg(`Expected error not thrown`)); + } + catch (e) { + + if (e instanceof Tv.errors.server_internal_error) { + + clientLogs.ok(mkMsg(`Got expected error thrown, error: ${e}`)); + } + else { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + break; + case 'echo': + try { + + if ('hello' === await client.invoke(autoAsync('echo'), 'hello')) { + + clientLogs.ok(mkMsg(`Got expected response`)); + } + else { + + clientLogs.error(mkMsg(`Unexpected response`)); + } + } + catch (e) { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + break; + case 'api_not_found': + try { + await client.invoke('notFoundApi'); + } + catch (e) { + + if (e instanceof Tv.errors.api_not_found) { + + clientLogs.ok(mkMsg(`Got expected error thrown, error: ${e}`)); + } + else { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + break; + case 'invalid_data_in_json': + try { + await client.invoke(autoAsync('replyNonValidDataInJson')); + } + catch (e) { + + if (e instanceof Tv.errors.server_internal_error) { + + clientLogs.ok(mkMsg(`Got expected error thrown, error: ${e}`)); + } + else { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + break; + case 'test_message': + try { + + const msg = Date.now() + Math.random().toString(); + + pendingMessages[msg] = true; + + await client.invoke(autoAsync('sendMeMessage'), msg); + + if (msg in pendingMessages) { + + setTimeout(() => { + + if (msg in pendingMessages) { + + clientLogs.error(mkMsg(`Message not received in time`)); + } + + }, 1000); + } + + clientLogs.ok(mkMsg(`Message triggered`)); + } + catch (e) { + + if (connLess && e instanceof Tv.errors.server_internal_error) { + + clientLogs.warning(mkMsg(`Specific command is not implemented by current protocol, skipped`)); + break; + } + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + break; + case 'send_stream': { + try { + + const streamId = await client.invoke('createStreamToReceive'); + setImmediate(() => { + testSendingStream(client, streamId, clientLogs) + .catch(e => clientLogs.error(`Failed to send data to stream#${streamId}`)); + }); + } + catch (e) { + + if (connLess && e instanceof Tv.errors.server_internal_error) { + + clientLogs.warning(mkMsg(`Specific command is not implemented by current protocol, skipped`)); + break; + } + + if (e instanceof Tv.errors.system_busy) { + + clientLogs.warning(mkMsg(`Server is busy, skipped`)); + } + else { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + break; + } + case 'recv_stream': { + try { + + if (!client.transporter?.writable) { + + await client.connect(); + } + const stream = client.streams.create(); + testRecvStream(stream, clientLogs); + await client.invoke('startSendingThroughStream', stream.id); + } + catch (e) { + + if (connLess && e instanceof Tv.errors.cmd_not_impl) { + + clientLogs.warning(mkMsg(`Specific command is not implemented by current protocol, skipped`)); + break; + } + else { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + break; + } + case 'server_close_connection': + if (Math.random() < 0.1) { + clientLogs.info(mkMsg(`Executed`)); + try { + await client.invoke('closeConnection'); + clientLogs.ok(mkMsg(`done`)); + } + catch (e) { + + clientLogs.error(mkMsg(`Unexpected error thrown, error: ${e}`)); + } + } + else { + clientLogs.info(mkMsg(`Skipped`)); + } + break; + case 'client_close_connection': { + if (Math.random() < 0.1) { + clientLogs.info(mkMsg(`Executed`)); + client.close(); + } + else { + clientLogs.info(mkMsg(`Skipped`)); + } + break; + } + } + clientLogs.info(mkMsg(`Ended`)); + } + catch (e) { + + clientLogs.error(mkMsg(`Error: ${e}`)); + } + } + while (1); + +} \ No newline at end of file diff --git a/src/examples/shared/decl.ts b/src/examples/shared/decl.ts new file mode 100644 index 0000000..21f9591 --- /dev/null +++ b/src/examples/shared/decl.ts @@ -0,0 +1,50 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface IApis { + + closeConnection(): void; + + startSendingThroughStream(streamId: number): void; + + createStreamToReceive(): number; + + replyNonValidDataInJsonAsync(): unknown; + + replyNonValidDataInJson(): unknown; + + sendMeMessageAsync(msg: string): void; + + sendMeMessage(msg: string): void; + + replyVoidAsync(msg: string): void; + + replyVoid(msg: string): void; + + echoAsync(text: string): string; + + echo(text: string): string; + + minimalApi(): void; + + minimalApiAsync(): void; + + throwValidError(): void; + + throwInvalidError(): void; + + notFoundApi(): void; +} diff --git a/src/examples/shared/log.ts b/src/examples/shared/log.ts new file mode 100644 index 0000000..3982141 --- /dev/null +++ b/src/examples/shared/log.ts @@ -0,0 +1,46 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export class Logger { + + public constructor( + private _endpoint: string + ) {} + + public error(message: string): void { + + console.error(`[${new Date().toISOString()}][ERROR] ${this._endpoint}: ${message}`); + } + + public ok(message: string): void { + + console.info(`[${new Date().toISOString()}][OK] ${this._endpoint}: ${message}`); + } + + public info(message: string): void { + + console.info(`[${new Date().toISOString()}][INFO] ${this._endpoint}: ${message}`); + } + + public warning(message: string): void { + + console.warn(`[${new Date().toISOString()}][WARNING] ${this._endpoint}: ${message}`); + } +} + +export const serverLogs = new Logger('Server'); + +export const clientLogs = new Logger('Client'); diff --git a/src/examples/shared/router.ts b/src/examples/shared/router.ts new file mode 100644 index 0000000..65eef13 --- /dev/null +++ b/src/examples/shared/router.ts @@ -0,0 +1,122 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as Tv from '../../lib'; +import { serverLogs } from './log'; +import { testRecvStream, testSendingStream } from './stream'; + +export const router: Tv.Servers.IRouter = new Tv.Servers.SimpleJsonApiRouter() + .registerApi('minimalApi', (ctx): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked minimalApi`); + }) + .registerApi('minimalApiAsync', async (ctx): Promise => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked minimalApiAsync`); + }) + .registerApi('replyVoid', (ctx, msg: string): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked replyVoid with message: ${msg}`); + }) + .registerApi('replyVoidAsync', async (ctx, msg: string): Promise => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked replyVoidAsync with message: ${msg}`); + }) + .registerApi('echo', (ctx, text: string): string => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked echo with text: ${text}`); + + return text; + }) + .registerApi('echoAsync', async (ctx, text: string): Promise => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked echoAsync with text: ${text}`); + + return text; + }) + .registerApi('sendMeMessage', (ctx, msg: string): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked sendMeMessage with message: ${msg}`); + + ctx.channel.sendMessage(msg).catch(e => serverLogs.error(`Failed while sending message, error: ${e}`)); + }) + .registerApi('sendMeMessageAsync', async (ctx, msg: string): Promise => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked sendMeMessageAsync with message: ${msg}`); + + try { + await ctx.channel.sendMessage(msg); + } + catch (e) { + serverLogs.error(`Failed while sending message, error: ${e}`); + } + }) + .registerApi('replyNonValidDataInJson', (ctx): unknown => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked test_bad_response`); + + return { + f: BigInt(123) + }; + }) + .registerApi('replyNonValidDataInJsonAsync', (ctx): Promise => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked test_bad_response`); + + return Promise.resolve({ + f: BigInt(123) + }); + }) + .registerApi('createStreamToReceive', (ctx): number => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked createStreamToReceive`); + const stream = ctx.channel.streams.create(); + serverLogs.ok(`Opened stream #${stream.id}`); + + testRecvStream(stream, serverLogs); + + return stream.id; + }) + .registerApi('startSendingThroughStream', (ctx, streamId: number): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked startSendingThroughStream`); + testSendingStream(ctx.channel, streamId, serverLogs) + .catch(e => serverLogs.error(`Failed to send data to stream#${streamId}`)); + }) + .registerApi('closeConnection', (ctx): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked closeConnection`); + + ctx.channel.close(); + }) + .registerApi('throwValidError', (ctx): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked throwValidError`); + + throw new Tv.TvErrorResponse({ + text: 'The data of this error could be sent to client', + code: 12345 + }); + }) + .registerApi('throwInvalidError', (ctx): void => { + + serverLogs.ok(`Channel#${ctx.channel.id} invoked throwValidError`); + + throw { + text: 'The data of this error will not be sent to client', + code: 12345 + }; + }); diff --git a/src/examples/network/shared.ts b/src/examples/shared/stream.ts similarity index 53% rename from src/examples/network/shared.ts rename to src/examples/shared/stream.ts index 9e9a798..b4ab6e2 100644 --- a/src/examples/network/shared.ts +++ b/src/examples/shared/stream.ts @@ -16,31 +16,12 @@ import * as Crypto from 'node:crypto'; import * as Tv from '../../lib'; - -export interface IApis { - - debug(text: string): void; - - say(text: string): void; - - startStream2Server(): number; - - startStream2Client(streamId: number): void; - - serverShouldCloseConn(): void; - - hi(text: string): string; - - shit(): string; - - test_bad_response(): unknown; - - test_bad_response_async(): unknown; -} +import type { IApis } from './decl'; +import { sleep } from './test-utils'; +import { Logger } from './log'; function sumBuffer(d: Buffer, b: number): number { - // eslint-disable-next-line @typescript-eslint/prefer-for-of for (let i = 0; i < d.length; i++) { b += d[i]; @@ -49,9 +30,12 @@ function sumBuffer(d: Buffer, b: number): number { return b; } -export function testRecvStream(stream: Tv.IBinaryReadStream, endpoint: 'Client' | 'Server'): void { +export function testRecvStream(stream: Tv.IBinaryReadStream, logger: Logger): void { let sum = 0; + + const subject = `Stream #${stream.id}`; + stream.on('data', (chunk) => { sum = sumBuffer(chunk, sum); @@ -60,23 +44,31 @@ export function testRecvStream(stream: Tv.IBinaryReadStream, endpoint: 'Client' if (stream.readableAborted) { - console.log(`[${endpoint}] Stream #${stream.id} aborted`); + logger.warning(`${subject} aborted`); } else { - console.log(`[${endpoint}] Stream #${stream.id} all received, sum = ${sum}`); + logger.ok(`${subject} all received, sum = ${sum}`); } }).on('error', (e) => { - console.error(`[${endpoint}] Stream #${stream.id} error: `, e); + if (e instanceof Tv.errors.stream_aborted) { + logger.warning(`${subject} aborted`); + } + else if (e instanceof Tv.errors.timeout) { + logger.warning(`${subject} timeout`); + } + else { + logger.error(`${subject} unexpected error: ${e}`); + } }); } export async function testSendingStream( ch: Pick, 'sendBinaryChunk'>, streamId: number, - endpoint: 'Client' | 'Server' + logger: Logger ): Promise { const buffers = new Array(Math.ceil(Math.random() * 10)) @@ -87,19 +79,21 @@ export async function testSendingStream( let i = 0; + const subject = `Stream #${streamId}`; + for (const [idx, p] of buffers.entries()) { sum = sumBuffer(p, sum); if (Math.random() > 0.9) { - console.log(`[${endpoint}] Aborting stream #${streamId} at chunk#${i} of ${p.length} bytes.`); + logger.warning(`${subject} aborted at chunk#${i} of ${p.length} bytes.`); await ch.sendBinaryChunk(streamId, false, null); return; } - console.log(`[${endpoint}] Sending chunk#${i++} of ${p.length} bytes to stream #${streamId}`); + logger.ok(`Sending chunk#${i++} of ${p.length} bytes to ${subject}`); await ch.sendBinaryChunk(streamId, idx, p); await sleep(Math.ceil(1000 * Math.random())); @@ -107,24 +101,5 @@ export async function testSendingStream( await ch.sendBinaryChunk(streamId, buffers.length, null); - console.log(`[${endpoint}] Stream #${streamId} all sent, sum = ${sum}`); -} - -export function sleep(ms: number): Promise { - - return new Promise((resolve) => { - - setTimeout(resolve, ms); - }); -} - -export function getClaOption(name: string, defaultValue: string): string { - - const ret = process.argv.find(i => i.startsWith(`--${name}=`)); - - return ret?.slice(name.length + 3).trim() ?? defaultValue; -} - -export function holdProcess(): void { - setInterval(() => { console.log(`[${new Date().toISOString()}]: Process holding`); }, 1000); + logger.ok(`[${subject}] Stream #${streamId} all sent, sum = ${sum}`); } diff --git a/src/examples/shared/test-utils.ts b/src/examples/shared/test-utils.ts new file mode 100644 index 0000000..fd0bc8c --- /dev/null +++ b/src/examples/shared/test-utils.ts @@ -0,0 +1,34 @@ +/** + * Copyright 2024 Angus.Fenying + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export function sleep(ms: number): Promise { + + return new Promise((resolve) => { + + setTimeout(resolve, ms); + }); +} + +export function getClaOption(name: string, defaultValue: string): string { + + const ret = process.argv.find(i => i.startsWith(`--${name}=`)); + + return ret?.slice(name.length + 3).trim() ?? defaultValue; +} + +export function holdProcess(): void { + setInterval(() => { console.log(`[${new Date().toISOString()}]: Process holding`); }, 1000); +} diff --git a/src/examples/worker-thread/benchmark-worker-client.ts b/src/examples/worker-thread/benchmark-worker-client.ts index c20c349..ab1ec1b 100644 --- a/src/examples/worker-thread/benchmark-worker-client.ts +++ b/src/examples/worker-thread/benchmark-worker-client.ts @@ -16,7 +16,7 @@ import * as Tv from '../../lib'; import * as WorkerThread from '../../lib/transporters/worker-thread'; -import { IApis } from '../network/shared'; +import { IApis } from '../shared/decl'; const cases = new Array(50000).fill(0) as number[]; @@ -30,7 +30,7 @@ const cases = new Array(50000).fill(0) as number[]; for (let i = 0; i < 10; i++) { console.time('[televoke2/worker-thread] 50000 requests'); - await Promise.all(cases.map(() => client.invoke('say', 'test'))); + await Promise.all(cases.map(() => client.invoke('echo', 'test'))); console.timeEnd('[televoke2/worker-thread] 50000 requests'); } diff --git a/src/examples/worker-thread/main-client.ts b/src/examples/worker-thread/main-client.ts index 2bec18a..88b7665 100644 --- a/src/examples/worker-thread/main-client.ts +++ b/src/examples/worker-thread/main-client.ts @@ -17,7 +17,9 @@ import * as Tv from '../../lib'; import * as NodeWorker from 'node:worker_threads'; import * as WorkerThread from '../../lib/transporters/worker-thread'; -import { IApis, sleep, testSendingStream, testRecvStream, holdProcess } from '../network/shared'; +import { holdProcess } from '../shared/test-utils'; +import { IApis } from '../shared/decl'; +import { doClientTest } from '../shared/client'; holdProcess(); @@ -31,100 +33,6 @@ holdProcess(); await new Promise((resolve) => worker.on('online', resolve)); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'test_bad_response', - 'test_bad_response_async', - 'startStream2Server', - 'startStream2Client', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'test_bad_response': - console.log('[Client] [Start Invoke] test_bad_response'); - await client.invoke('test_bad_response'); - console.log('[Client] [End Invoke] test_bad_response'); - break; - case 'test_bad_response_async': - console.log('[Client] [Start Invoke] test_bad_response_async'); - await client.invoke('test_bad_response_async'); - console.log('[Client] [End Invoke] test_bad_response_async'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - // case 'serverShouldCloseConn': - // if (Math.random() < 0.1) { - // console.log('[Client] [Start Invoke] serverShouldCloseConn'); - // await client.invoke('serverShouldCloseConn'); - // console.log('[Client] [End Invoke] serverShouldCloseConn'); - // } - // break; - // case 'clientShouldCloseConn': { - // if (Math.random() < 0.1) { - // console.log('[Client] clientShouldCloseConn'); - // client.close(); - // } - // break; - // } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + doClientTest(client); })().catch(console.error); diff --git a/src/examples/worker-thread/main-server.ts b/src/examples/worker-thread/main-server.ts index 2db8e5d..f99b78a 100644 --- a/src/examples/worker-thread/main-server.ts +++ b/src/examples/worker-thread/main-server.ts @@ -17,85 +17,12 @@ import * as Tv from '../../lib'; import * as NodeWorker from 'node:worker_threads'; import * as WorkerThread from '../../lib/transporters/worker-thread'; -import { holdProcess, testRecvStream, testSendingStream } from '../network/shared'; - -const router = new Tv.Servers.SimpleJsonApiRouter(); +import { holdProcess } from '../shared/test-utils'; +import { router } from '../shared/router'; const server = new Tv.Servers.TvServer(router) .on('error', (e) => { console.error(e); }); -router - .registerApi('debug', (ctx, text: string): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked debug`); - console.log(`[Server] Client says: ${text}`); - - if (Math.random() > 0.5) { - console.log('[Server] Sent a message to client'); - ctx.channel.sendMessage('hello').catch(console.error); - } - }) - .registerApi('test_bad_response', (ctx): unknown => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); - - return { - f: BigInt(123) - }; - }) - .registerApi('test_bad_response_async', (ctx): Promise => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); - - return Promise.resolve({ - f: BigInt(123) - }); - }) - .registerApi('say', (ctx, text: string): string => { - - return text; - }) - .registerApi('startStream2Server', (ctx): number => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`); - const stream = ctx.channel.streams.create(); - console.log(`[Server] Opened stream #${stream.id}`); - - testRecvStream(stream, 'Server'); - - return stream.id; - }) - .registerApi('serverShouldCloseConn', (ctx): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked serverShouldCloseConn`); - - ctx.channel.close(); - }) - .registerApi('hi', (ctx, text: string): string => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked hi`); - - return Buffer.from(text).toString('base64url'); - }) - .registerApi('shit', (ctx): string => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked shit`); - - if (Math.random() > 0.5) { - throw { test: 'this error should be unclear to clients' }; - } - else { - throw new Tv.TvErrorResponse({ - test: 'this error should be visible to clients' - }); - } - }) - .registerApi('startStream2Client', (ctx, streamId: number): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`); - testSendingStream(ctx.channel, streamId, 'Server').catch(console.error); - }); - const wtGateway = WorkerThread.createMainThreadGateway(server); holdProcess(); diff --git a/src/examples/worker-thread/worker-client.ts b/src/examples/worker-thread/worker-client.ts index d856abc..c0ee236 100644 --- a/src/examples/worker-thread/worker-client.ts +++ b/src/examples/worker-thread/worker-client.ts @@ -16,7 +16,9 @@ import * as Tv from '../../lib'; import * as WorkerThread from '../../lib/transporters/worker-thread'; -import { IApis, sleep, testSendingStream, testRecvStream, holdProcess } from '../network/shared'; +import { holdProcess } from '../shared/test-utils'; +import { doClientTest } from '../shared/client'; +import { IApis } from '../shared/decl'; holdProcess(); @@ -26,100 +28,6 @@ holdProcess(); WorkerThread.connectToMainThreadServer() ); - client.on('error', (e) => console.error(`[Client] Unexpected error: ${e}`)); - - client.on('push_message', (msg) => { - - console.log('[Client] Message from server: ' + Buffer.concat(msg).toString()); - }); - - do { - - await sleep(100); - - try { - - switch (([ - 'debug', - 'test_bad_response', - 'test_bad_response_async', - 'startStream2Server', - 'startStream2Client', - 'hi', - 'shit', - ] as const)[Math.floor(Math.random() * 7)]) { - case 'debug': - console.log('[Client] [Start Invoke] debug'); - await client.invoke('debug', new Date() + ': Hello, world!'); - console.log('[Client] [End Invoke] debug'); - break; - case 'test_bad_response': - console.log('[Client] [Start Invoke] test_bad_response'); - await client.invoke('test_bad_response'); - console.log('[Client] [End Invoke] test_bad_response'); - break; - case 'test_bad_response_async': - console.log('[Client] [Start Invoke] test_bad_response_async'); - await client.invoke('test_bad_response_async'); - console.log('[Client] [End Invoke] test_bad_response_async'); - break; - case 'shit': - console.log('[Client] [Start Invoke] shit'); - try { - await client.invoke('shit'); - } - catch (e) { - console.error(e); - } - console.log('[Client] [End Invoke] shit'); - break; - case 'hi': - console.log('[Client] [Start Invoke] hi'); - console.log('Response: ' + await client.invoke('hi', new Date() + ': Hello, world!')); - console.log('[Client] [End Invoke] hi'); - break; - case 'startStream2Server': { - console.log('[Client] [Start Invoke] startStream2Server'); - const streamId = await client.invoke('startStream2Server'); - console.log('[Client] [End Invoke] startStream2Server'); - setImmediate(() => { - testSendingStream(client, streamId, 'Client').catch(console.error); - }); - break; - } - case 'startStream2Client': { - console.log('[Client] [Start Invoke] startStream2Client'); - if (!client.transporter?.writable) { - - await client.connect(); - } - const stream = client.streams.create(); - testRecvStream(stream, 'Client'); - await client.invoke('startStream2Client', stream.id); - console.log('[Client] [End Invoke] startStream2Client'); - break; - } - // case 'serverShouldCloseConn': - // if (Math.random() < 0.1) { - // console.log('[Client] [Start Invoke] serverShouldCloseConn'); - // await client.invoke('serverShouldCloseConn'); - // console.log('[Client] [End Invoke] serverShouldCloseConn'); - // } - // break; - // case 'clientShouldCloseConn': { - // if (Math.random() < 0.1) { - // console.log('[Client] clientShouldCloseConn'); - // client.close(); - // } - // break; - // } - } - } - catch (e) { - - console.error('[Client] Error: ', e); - } - } - while (1); + doClientTest(client); })().catch(console.error); diff --git a/src/examples/worker-thread/worker-server.ts b/src/examples/worker-thread/worker-server.ts index b075634..43a3845 100644 --- a/src/examples/worker-thread/worker-server.ts +++ b/src/examples/worker-thread/worker-server.ts @@ -16,85 +16,12 @@ import * as Tv from '../../lib'; import * as WorkerThread from '../../lib/transporters/worker-thread'; -import { holdProcess, testRecvStream, testSendingStream } from '../network/shared'; - -const router = new Tv.Servers.SimpleJsonApiRouter(); +import { router } from '../shared/router'; +import { holdProcess } from '../shared/test-utils'; const server = new Tv.Servers.TvServer(router) .on('error', (e) => { console.error(e); }); -router - .registerApi('debug', (ctx, text: string): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked debug`); - console.log(`[Server] Client says: ${text}`); - - if (Math.random() > 0.5) { - console.log('[Server] Sent a message to client'); - ctx.channel.sendMessage('hello').catch(console.error); - } - }) - .registerApi('test_bad_response', (ctx): unknown => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); - - return { - f: BigInt(123) - }; - }) - .registerApi('test_bad_response_async', (ctx): Promise => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked test_bad_response`); - - return Promise.resolve({ - f: BigInt(123) - }); - }) - .registerApi('say', (ctx, text: string): string => { - - return text; - }) - .registerApi('startStream2Server', (ctx): number => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Server`); - const stream = ctx.channel.streams.create(); - console.log(`[Server] Opened stream #${stream.id}`); - - testRecvStream(stream, 'Server'); - - return stream.id; - }) - .registerApi('serverShouldCloseConn', (ctx): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked serverShouldCloseConn`); - - ctx.channel.close(); - }) - .registerApi('hi', (ctx, text: string): string => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked hi`); - - return Buffer.from(text).toString('base64url'); - }) - .registerApi('shit', (ctx): string => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked shit`); - - if (Math.random() > 0.5) { - throw { test: 'this error should be unclear to clients' }; - } - else { - throw new Tv.TvErrorResponse({ - test: 'this error should be visible to clients' - }); - } - }) - .registerApi('startStream2Client', (ctx, streamId: number): void => { - - console.log(`[Server] Channel#${ctx.channel.id} invoked startStream2Client`); - testSendingStream(ctx.channel, streamId, 'Server').catch(console.error); - }); - holdProcess(); const wtGateway = WorkerThread.createWorkerThreadGateway(server);