Skip to content

Commit

Permalink
refactor(client): Use EventEmitter instead of simple hook point
Browse files Browse the repository at this point in the history
  • Loading branch information
fenying committed Apr 15, 2020
1 parent f64a9b8 commit 59c4016
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
## v0.2.0

- refactor(global): Improved the experience.
- refactor(client): Use EventEmitter instead of simple hook point.
- perf(encoder): Fixed the debuff of encoder.
2 changes: 1 addition & 1 deletion commitlint.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module.exports = {
'global'
]],
'scope-empty': [2, 'never'],
'subject-case': [2, 'always', 'lowerCase'],
'subject-case': [2, 'never', 'lowerCase'],
'subject-min-length': [2, 'always', 5],
'subject-max-length': [2, 'always', 50],
}
Expand Down
41 changes: 28 additions & 13 deletions src/benchmarks/Http/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,48 @@
import * as $Televoke from '../../lib';
import { IGa, BENCHMARK_SERVER_PORT, BENCHMARK_SERVER_HOST } from './API';

const ridGenerator = (function() {

let i = 0;
return () => i++;
})();

const CONCURRENCY = 10000;

(async () => {

const client = $Televoke.createHttpClient<IGa>(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, Math.random);
const client = $Televoke.createHttpClient<IGa>(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, ridGenerator);

await client.connect();

console.time('TCP Invoke Concurrent');
await Promise.all(Array(10000).fill(0).map(() => client.invoke('hi', {name: 'Angus'})));
console.timeEnd('TCP Invoke Concurrent');
await Promise.all(Array(CONCURRENCY).fill(0).map(() => client.invoke('hi', {name: 'Angus'})));

for (let i = 0; i < CONCURRENCY; i++) {

await client.invoke('hi', {name: 'Angus'});
}

console.time(`TCP ${CONCURRENCY} Invokes Concurrent`);
await Promise.all(Array(CONCURRENCY).fill(0).map(() => client.invoke('hi', {name: 'Angus'})));
console.timeEnd(`TCP ${CONCURRENCY} Invokes Concurrent`);

console.time('TCP Invoke Sequence');
for (let i = 0; i < 10000; i++) {
console.time(`TCP ${CONCURRENCY} Invokes Sequence`);
for (let i = 0; i < CONCURRENCY; i++) {

await client.invoke('hi', {name: 'Angus'});
}
console.timeEnd('TCP Invoke Sequence');
console.timeEnd(`TCP ${CONCURRENCY} Invokes Sequence`);

console.time('TCP Call Concurrent');
await Promise.all(Array(10000).fill(0).map(() => client.call('hi', {name: 'Angus'})));
console.timeEnd('TCP Call Concurrent');
console.time(`TCP ${CONCURRENCY} Calls Concurrent`);
await Promise.all(Array(CONCURRENCY).fill(0).map(() => client.call('hi', {name: 'Angus'})));
console.timeEnd(`TCP ${CONCURRENCY} Calls Concurrent`);

console.time('TCP Call Sequence');
for (let i = 0; i < 10000; i++) {
console.time(`TCP ${CONCURRENCY} Calls Sequence`);
for (let i = 0; i < CONCURRENCY; i++) {

await client.call('hi', {name: 'Angus'});
}
console.timeEnd('TCP Call Sequence');
console.timeEnd(`TCP ${CONCURRENCY} Calls Sequence`);

await client.close();

Expand Down
4 changes: 2 additions & 2 deletions src/benchmarks/TCP/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ const ridGenerator = (function() {
return () => i++;
})();

const CONCURRENCY = 10000;
const CONCURRENCY = 30000;

(async () => {

const client = $Televoke.createTCPClient<IGa>(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, ridGenerator);
const client = $Televoke.createTCPClient<IGa>(BENCHMARK_SERVER_HOST, BENCHMARK_SERVER_PORT, ridGenerator, 60000);

await client.connect();

Expand Down
5 changes: 2 additions & 3 deletions src/lib/Client/Common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import * as G from '../Common';
import { Events } from '@litert/observable';

export type IRIDGenerator = () => string | number;

Expand All @@ -25,9 +26,7 @@ export interface IResponse<A> extends G.IRawResponse<A> {
crt: number;
}

export interface IClient<S extends G.IServiceAPIs = G.IServiceAPIs> {

onError: (e: unknown) => void;
export interface IClient<S extends G.IServiceAPIs = G.IServiceAPIs> extends Events.IObservable<Events.ICallbackDefinitions> {

connect(): Promise<void>;

Expand Down
9 changes: 6 additions & 3 deletions src/lib/Client/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import * as C from './Common';
import * as GE from '../Errors';
import * as G from '../Common';
import * as E from './Errors';
import { Events } from '@litert/observable';

class HttpClient implements C.IClient {
class HttpClient extends Events.EventEmitter<Events.ICallbackDefinitions> implements C.IClient {

private _agent: $Http.Agent;

Expand All @@ -33,6 +34,8 @@ class HttpClient implements C.IClient {
private _timeout: number = 30000
) {

super();

this._agent = new $Http.Agent({
maxSockets: 0,
keepAlive: true,
Expand Down Expand Up @@ -68,7 +71,7 @@ class HttpClient implements C.IClient {

const length = Buffer.byteLength(content);

if (length > 67108864) {
if (length > G.MAX_PACKET_SIZE) {

return reject(new GE.E_PACKET_TOO_LARGE());
}
Expand All @@ -92,7 +95,7 @@ class HttpClient implements C.IClient {

const length = parseInt(resp.headers['content-length']);

if (!Number.isSafeInteger(length) || length > 67108864) { // Maximum request packet is 64MB
if (!Number.isSafeInteger(length) || length > G.MAX_PACKET_SIZE) { // Maximum request packet is 64MB

return reject(new GE.E_PACKET_TOO_LARGE());
}
Expand Down
12 changes: 7 additions & 5 deletions src/lib/Client/TCPClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Promises } from '@litert/observable';
import * as E from './Errors';
import { createDecoder } from '../Encoder/Decoder';
import { createEncoder } from '../Encoder/Encoder';
import { Events } from '@litert/observable';

enum EStatus {

Expand All @@ -45,7 +46,7 @@ interface IRequest {
packet?: G.IRawRequest;
}

class TCPClient implements C.IClient {
class TCPClient extends Events.EventEmitter<Events.ICallbackDefinitions> implements C.IClient {

private static _promises = Promises.getGlobalFactory();

Expand All @@ -71,15 +72,15 @@ class TCPClient implements C.IClient {

private _encoder = createEncoder();

public onError!: (e: unknown) => void;

public constructor(
private _host: string,
private _port: number,
private _ridGenerator: C.IRIDGenerator,
private _timeout: number = 30000
) {

super();

this._connPrId = `litert:televoke:tcp:client:${this._clientId}:connect`;
this._closePrId = `litert:televoke:tcp:client:${this._clientId}:close`;
}
Expand Down Expand Up @@ -241,8 +242,8 @@ class TCPClient implements C.IClient {

const decoder = createDecoder<G.IRawResponse>();

decoder.onLogicError = this.onError;
decoder.onProtocolError = this.onError;
decoder.onLogicError = (e: unknown) => this.emit('error', e);
decoder.onProtocolError = (e: unknown) => this.emit('error', e);

decoder.onData = (rawData: G.IRawResponse): void => {

Expand All @@ -254,6 +255,7 @@ class TCPClient implements C.IClient {

return;
}

data.cst = req.cst;
data.crt = Date.now();

Expand Down
2 changes: 2 additions & 0 deletions src/lib/Common/Encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* limitations under the License.
*/

export const MAX_PACKET_SIZE = 67108864;

export interface IWritable {

write(buf: Buffer | string, cb?: () => void): void;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/Encoder/Decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class Decoder implements C.IDecoder<any> {
}
}

if (this._packetLength > 67108864) { // Maximum packet size is 64M
if (this._packetLength > C.MAX_PACKET_SIZE) { // Maximum packet size is 64M

this.onProtocolError(new GE.E_PACKET_TOO_LARGE());
this.reset();
Expand Down
2 changes: 1 addition & 1 deletion src/lib/Server/Gateways/Http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class HttpGateway implements C.IGateway {

const length = parseInt(req.headers['content-length']);

if (!Number.isSafeInteger(length) || length > 67108864) { // Maximum request packet is 64MB
if (!Number.isSafeInteger(length) || length > G.MAX_PACKET_SIZE) { // Maximum request packet is 64MB

resp.socket.destroy();
return;
Expand Down

0 comments on commit 59c4016

Please sign in to comment.