From bfd2c765520bad65cdb7546ed14af58eb72adeb4 Mon Sep 17 00:00:00 2001 From: stefanbaxter Date: Mon, 29 Apr 2024 11:45:39 +0000 Subject: [PATCH] stabilization of ohip ingestion --- src/GsClient.ts | 69 ++++++++++++++++++++++++------------------------ src/minimizer.ts | 3 ++- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/GsClient.ts b/src/GsClient.ts index e8f4678..82a3c28 100644 --- a/src/GsClient.ts +++ b/src/GsClient.ts @@ -218,7 +218,7 @@ export class GsClient { log.silly('Ping sent'); timedOut = setTimeout(() => { if (this.activeSocket && this.activeSocket.readyState === WebSocket.OPEN) { - this.terminateClient('Ping timeout'); + this.startConsuming(true); } }, this.env.PING / 2); // if pong not received within this timeframe then recreate connection }, @@ -282,7 +282,7 @@ export class GsClient { this.setStat(result.data.newEvent.eventName); // log.debug(`${result.data.newEvent.eventName}, offset ${result.data.newEvent.metadata.offset}, primaryKey ${result.data.newEvent.primaryKey}${(result.data.newEvent.hotelId) ? `, HotelID: ${result.data.newEvent.hotelId}` : ''}, Created at: ${result.data.newEvent.timestamp}`); let simple = simplifyJSON(result.data); - log.silly(JSON.stringify(result.data)); + // log.silly(JSON.stringify(result.data)); log.silly(JSON.stringify(simple)); if (this.kafkaProducer !== undefined) { this.kafkaProducer.send({ @@ -354,42 +354,41 @@ export class GsClient { this.windowCount = 0; } - public async start(): Promise { - this.client = undefined; - - const initiate = async(reconnect: boolean = false) => { - this.terminateClient('Refreshing connection with new token'); - if (reconnect) { - log.debug(`Refreshing an existing connection in ${this.env.TIMER} ms`); - await this.delay(this.env.TIMER); - } else { - log.debug('Initiating a new connection'); - } - this.client = this.getClient(); - if (this.kafka) - this.kafkaProducer = this.getProducer(this.kafka); - try { - await this.subscribe(this.client); - } catch (error) { - log.error(error); - log.debug(`Retrying in ${this.env.DELAY_BEFORE_RECONNECT} milliseconds`); - setTimeout(() => initiate(true), this.env.DELAY_BEFORE_RECONNECT); - } - }; + public async startConsuming (reconnect: boolean = false, reason: string = '') { + this.terminateClient(`Refreshing connection with new token`); + if (reconnect) { + log.debug(`Refreshing an existing connection in ${this.env.TIMER}ms (${reason})`); + await this.delay(this.env.TIMER); + } else { + log.debug('Initiating a new connection'); + } + this.client = this.getClient(); + if (this.kafka) + this.kafkaProducer = this.getProducer(this.kafka); + try { + await this.subscribe(this.client); + } catch (error) { + log.error(error); + log.debug(`Retrying in ${this.env.DELAY_BEFORE_RECONNECT} milliseconds`); + setTimeout(() => this.startConsuming(true), this.env.DELAY_BEFORE_RECONNECT); + } + } - const stop = async() => { - try { - this.terminateClient('Application stopped by user'); - process.exit(0); - } catch (error) { - log.error(error); - } - }; + public async stopConsuming (reconnect: boolean = false) { + try { + this.terminateClient('Application stopped by user'); + process.exit(0); + } catch (error) { + log.error(error); + } + } - setImmediate(() => initiate(false)); - setInterval(() => {initiate(true);}, this.env.TOKEN_EXPIRY); + public async start(): Promise { + this.client = undefined; + setImmediate(() => this.startConsuming(false)); + setInterval(() => {this.startConsuming(true);}, this.env.TOKEN_EXPIRY); if (this.env.RUN_FOR > 0) { - setInterval(() => {stop();}, this.env.RUN_FOR); + setInterval(() => {this.stopConsuming();}, this.env.RUN_FOR); } } diff --git a/src/minimizer.ts b/src/minimizer.ts index 340e1c9..d66614a 100644 --- a/src/minimizer.ts +++ b/src/minimizer.ts @@ -3,7 +3,8 @@ let monthIndex = ['JAN','FEB','MAR','APR','MAY','JUN','JUL','AUG','SEP','OCT','N export function simplifyJSON(data: any): unknown { let new_properties = data.newEvent.detail.reduce((acc: any, curr: any) => { - acc[curr.elementName.toLowerCase().replaceAll(' ','_')] = curr.newValue || curr.oldValue; + if (curr.newValue !== curr.oldValue || curr.newValue !== '') + acc[curr.elementName.toLowerCase().replaceAll(' ','_')] = curr.newValue || curr.oldValue; return acc; }, {});