From 59c4016b99a3f90e7935bd4d0c20d43aee03c5df Mon Sep 17 00:00:00 2001 From: Angus Fenying Date: Wed, 15 Apr 2020 18:10:39 +0800 Subject: [PATCH] refactor(client): Use EventEmitter instead of simple hook point --- CHANGES.md | 2 ++ commitlint.config.js | 2 +- src/benchmarks/Http/Client.ts | 41 ++++++++++++++++++++++----------- src/benchmarks/TCP/Client.ts | 4 ++-- src/lib/Client/Common.ts | 5 ++-- src/lib/Client/HttpClient.ts | 9 +++++--- src/lib/Client/TCPClient.ts | 12 ++++++---- src/lib/Common/Encoding.ts | 2 ++ src/lib/Encoder/Decoder.ts | 2 +- src/lib/Server/Gateways/Http.ts | 2 +- 10 files changed, 52 insertions(+), 29 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7d621e5..3fd4f80 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,3 +3,5 @@ ## v0.2.0 - refactor(global): Improved the experience. +- refactor(client): Use EventEmitter instead of simple hook point. +- perf(encoder): Fixed the debuff of encoder. diff --git a/commitlint.config.js b/commitlint.config.js index b5aecb1..1d607e3 100644 --- a/commitlint.config.js +++ b/commitlint.config.js @@ -23,7 +23,7 @@ module.exports = { 'global' ]], 'scope-empty': [2, 'never'], - 'subject-case': [2, 'always', 'lowerCase'], + 'subject-case': [2, 'never', 'lowerCase'], 'subject-min-length': [2, 'always', 5], 'subject-max-length': [2, 'always', 50], } diff --git a/src/benchmarks/Http/Client.ts b/src/benchmarks/Http/Client.ts index 2cbdc39..a25ac08 100644 --- a/src/benchmarks/Http/Client.ts +++ b/src/benchmarks/Http/Client.ts @@ -17,33 +17,48 @@ import * as $Televoke from '../../lib'; import { IGa, BENCHMARK_SERVER_PORT, BENCHMARK_SERVER_HOST } from './API'; +const ridGenerator = (function() { + + let i = 0; + return () => i++; +})(); + +const CONCURRENCY = 10000; + (async () => { - const client = $Televoke.createHttpClient(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, Math.random); + const client = $Televoke.createHttpClient(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, ridGenerator); await client.connect(); - console.time('TCP Invoke Concurrent'); - await Promise.all(Array(10000).fill(0).map(() => client.invoke('hi', {name: 'Angus'}))); - console.timeEnd('TCP Invoke Concurrent'); + await Promise.all(Array(CONCURRENCY).fill(0).map(() => client.invoke('hi', {name: 'Angus'}))); + + for (let i = 0; i < CONCURRENCY; i++) { + + await client.invoke('hi', {name: 'Angus'}); + } + + console.time(`TCP ${CONCURRENCY} Invokes Concurrent`); + await Promise.all(Array(CONCURRENCY).fill(0).map(() => client.invoke('hi', {name: 'Angus'}))); + console.timeEnd(`TCP ${CONCURRENCY} Invokes Concurrent`); - console.time('TCP Invoke Sequence'); - for (let i = 0; i < 10000; i++) { + console.time(`TCP ${CONCURRENCY} Invokes Sequence`); + for (let i = 0; i < CONCURRENCY; i++) { await client.invoke('hi', {name: 'Angus'}); } - console.timeEnd('TCP Invoke Sequence'); + console.timeEnd(`TCP ${CONCURRENCY} Invokes Sequence`); - console.time('TCP Call Concurrent'); - await Promise.all(Array(10000).fill(0).map(() => client.call('hi', {name: 'Angus'}))); - console.timeEnd('TCP Call Concurrent'); + console.time(`TCP ${CONCURRENCY} Calls Concurrent`); + await Promise.all(Array(CONCURRENCY).fill(0).map(() => client.call('hi', {name: 'Angus'}))); + console.timeEnd(`TCP ${CONCURRENCY} Calls Concurrent`); - console.time('TCP Call Sequence'); - for (let i = 0; i < 10000; i++) { + console.time(`TCP ${CONCURRENCY} Calls Sequence`); + for (let i = 0; i < CONCURRENCY; i++) { await client.call('hi', {name: 'Angus'}); } - console.timeEnd('TCP Call Sequence'); + console.timeEnd(`TCP ${CONCURRENCY} Calls Sequence`); await client.close(); diff --git a/src/benchmarks/TCP/Client.ts b/src/benchmarks/TCP/Client.ts index 1ec0c7b..e861033 100644 --- a/src/benchmarks/TCP/Client.ts +++ b/src/benchmarks/TCP/Client.ts @@ -23,11 +23,11 @@ const ridGenerator = (function() { return () => i++; })(); -const CONCURRENCY = 10000; +const CONCURRENCY = 30000; (async () => { - const client = $Televoke.createTCPClient(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, ridGenerator); + const client = $Televoke.createTCPClient(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, ridGenerator, 60000); await client.connect(); diff --git a/src/lib/Client/Common.ts b/src/lib/Client/Common.ts index 2d2ccc4..0814caa 100644 --- a/src/lib/Client/Common.ts +++ b/src/lib/Client/Common.ts @@ -15,6 +15,7 @@ */ import * as G from '../Common'; +import { Events } from '@litert/observable'; export type IRIDGenerator = () => string | number; @@ -25,9 +26,7 @@ export interface IResponse extends G.IRawResponse { crt: number; } -export interface IClient { - - onError: (e: unknown) => void; +export interface IClient extends Events.IObservable { connect(): Promise; diff --git a/src/lib/Client/HttpClient.ts b/src/lib/Client/HttpClient.ts index c58cee5..566996c 100644 --- a/src/lib/Client/HttpClient.ts +++ b/src/lib/Client/HttpClient.ts @@ -19,8 +19,9 @@ import * as C from './Common'; import * as GE from '../Errors'; import * as G from '../Common'; import * as E from './Errors'; +import { Events } from '@litert/observable'; -class HttpClient implements C.IClient { +class HttpClient extends Events.EventEmitter implements C.IClient { private _agent: $Http.Agent; @@ -33,6 +34,8 @@ class HttpClient implements C.IClient { private _timeout: number = 30000 ) { + super(); + this._agent = new $Http.Agent({ maxSockets: 0, keepAlive: true, @@ -68,7 +71,7 @@ class HttpClient implements C.IClient { const length = Buffer.byteLength(content); - if (length > 67108864) { + if (length > G.MAX_PACKET_SIZE) { return reject(new GE.E_PACKET_TOO_LARGE()); } @@ -92,7 +95,7 @@ class HttpClient implements C.IClient { const length = parseInt(resp.headers['content-length']); - if (!Number.isSafeInteger(length) || length > 67108864) { // Maximum request packet is 64MB + if (!Number.isSafeInteger(length) || length > G.MAX_PACKET_SIZE) { // Maximum request packet is 64MB return reject(new GE.E_PACKET_TOO_LARGE()); } diff --git a/src/lib/Client/TCPClient.ts b/src/lib/Client/TCPClient.ts index a4febd6..41c6932 100644 --- a/src/lib/Client/TCPClient.ts +++ b/src/lib/Client/TCPClient.ts @@ -21,6 +21,7 @@ import { Promises } from '@litert/observable'; import * as E from './Errors'; import { createDecoder } from '../Encoder/Decoder'; import { createEncoder } from '../Encoder/Encoder'; +import { Events } from '@litert/observable'; enum EStatus { @@ -45,7 +46,7 @@ interface IRequest { packet?: G.IRawRequest; } -class TCPClient implements C.IClient { +class TCPClient extends Events.EventEmitter implements C.IClient { private static _promises = Promises.getGlobalFactory(); @@ -71,8 +72,6 @@ class TCPClient implements C.IClient { private _encoder = createEncoder(); - public onError!: (e: unknown) => void; - public constructor( private _host: string, private _port: number, @@ -80,6 +79,8 @@ class TCPClient implements C.IClient { private _timeout: number = 30000 ) { + super(); + this._connPrId = `litert:televoke:tcp:client:${this._clientId}:connect`; this._closePrId = `litert:televoke:tcp:client:${this._clientId}:close`; } @@ -241,8 +242,8 @@ class TCPClient implements C.IClient { const decoder = createDecoder(); - decoder.onLogicError = this.onError; - decoder.onProtocolError = this.onError; + decoder.onLogicError = (e: unknown) => this.emit('error', e); + decoder.onProtocolError = (e: unknown) => this.emit('error', e); decoder.onData = (rawData: G.IRawResponse): void => { @@ -254,6 +255,7 @@ class TCPClient implements C.IClient { return; } + data.cst = req.cst; data.crt = Date.now(); diff --git a/src/lib/Common/Encoding.ts b/src/lib/Common/Encoding.ts index 7101ebd..429290e 100644 --- a/src/lib/Common/Encoding.ts +++ b/src/lib/Common/Encoding.ts @@ -14,6 +14,8 @@ * limitations under the License. */ +export const MAX_PACKET_SIZE = 67108864; + export interface IWritable { write(buf: Buffer | string, cb?: () => void): void; diff --git a/src/lib/Encoder/Decoder.ts b/src/lib/Encoder/Decoder.ts index d22eb69..66a0002 100644 --- a/src/lib/Encoder/Decoder.ts +++ b/src/lib/Encoder/Decoder.ts @@ -170,7 +170,7 @@ class Decoder implements C.IDecoder { } } - if (this._packetLength > 67108864) { // Maximum packet size is 64M + if (this._packetLength > C.MAX_PACKET_SIZE) { // Maximum packet size is 64M this.onProtocolError(new GE.E_PACKET_TOO_LARGE()); this.reset(); diff --git a/src/lib/Server/Gateways/Http.ts b/src/lib/Server/Gateways/Http.ts index 40fa718..471e68c 100644 --- a/src/lib/Server/Gateways/Http.ts +++ b/src/lib/Server/Gateways/Http.ts @@ -55,7 +55,7 @@ class HttpGateway implements C.IGateway { const length = parseInt(req.headers['content-length']); - if (!Number.isSafeInteger(length) || length > 67108864) { // Maximum request packet is 64MB + if (!Number.isSafeInteger(length) || length > G.MAX_PACKET_SIZE) { // Maximum request packet is 64MB resp.socket.destroy(); return;