Skip to content

Commit

Permalink
stabilization of ohip ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
acmeguy committed Apr 29, 2024
1 parent 41aa73b commit bfd2c76
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 36 deletions.
69 changes: 34 additions & 35 deletions src/GsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export class GsClient {
log.silly('Ping sent');
timedOut = setTimeout(() => {
if (this.activeSocket && this.activeSocket.readyState === WebSocket.OPEN) {
this.terminateClient('Ping timeout');
this.startConsuming(true);
}
}, this.env.PING / 2); // if pong not received within this timeframe then recreate connection
},
Expand Down Expand Up @@ -282,7 +282,7 @@ export class GsClient {
this.setStat(result.data.newEvent.eventName);
// log.debug(`${result.data.newEvent.eventName}, offset ${result.data.newEvent.metadata.offset}, primaryKey ${result.data.newEvent.primaryKey}${(result.data.newEvent.hotelId) ? `, HotelID: ${result.data.newEvent.hotelId}` : ''}, Created at: ${result.data.newEvent.timestamp}`);
let simple = simplifyJSON(result.data);
log.silly(JSON.stringify(result.data));
// log.silly(JSON.stringify(result.data));
log.silly(JSON.stringify(simple));
if (this.kafkaProducer !== undefined) {
this.kafkaProducer.send({
Expand Down Expand Up @@ -354,42 +354,41 @@ export class GsClient {
this.windowCount = 0;
}

public async start(): Promise<void> {
this.client = undefined;

const initiate = async(reconnect: boolean = false) => {
this.terminateClient('Refreshing connection with new token');
if (reconnect) {
log.debug(`Refreshing an existing connection in ${this.env.TIMER} ms`);
await this.delay(this.env.TIMER);
} else {
log.debug('Initiating a new connection');
}
this.client = this.getClient();
if (this.kafka)
this.kafkaProducer = this.getProducer(this.kafka);
try {
await this.subscribe(this.client);
} catch (error) {
log.error(error);
log.debug(`Retrying in ${this.env.DELAY_BEFORE_RECONNECT} milliseconds`);
setTimeout(() => initiate(true), this.env.DELAY_BEFORE_RECONNECT);
}
};
public async startConsuming (reconnect: boolean = false, reason: string = '') {
this.terminateClient(`Refreshing connection with new token`);
if (reconnect) {
log.debug(`Refreshing an existing connection in ${this.env.TIMER}ms (${reason})`);
await this.delay(this.env.TIMER);
} else {
log.debug('Initiating a new connection');
}
this.client = this.getClient();
if (this.kafka)
this.kafkaProducer = this.getProducer(this.kafka);
try {
await this.subscribe(this.client);
} catch (error) {
log.error(error);
log.debug(`Retrying in ${this.env.DELAY_BEFORE_RECONNECT} milliseconds`);
setTimeout(() => this.startConsuming(true), this.env.DELAY_BEFORE_RECONNECT);
}
}

const stop = async() => {
try {
this.terminateClient('Application stopped by user');
process.exit(0);
} catch (error) {
log.error(error);
}
};
public async stopConsuming (reconnect: boolean = false) {
try {
this.terminateClient('Application stopped by user');
process.exit(0);
} catch (error) {
log.error(error);
}
}

setImmediate(() => initiate(false));
setInterval(() => {initiate(true);}, this.env.TOKEN_EXPIRY);
public async start(): Promise<void> {
this.client = undefined;
setImmediate(() => this.startConsuming(false));
setInterval(() => {this.startConsuming(true);}, this.env.TOKEN_EXPIRY);
if (this.env.RUN_FOR > 0) {
setInterval(() => {stop();}, this.env.RUN_FOR);
setInterval(() => {this.stopConsuming();}, this.env.RUN_FOR);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/minimizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ let monthIndex = ['JAN','FEB','MAR','APR','MAY','JUN','JUL','AUG','SEP','OCT','N
export function simplifyJSON(data: any): unknown {

let new_properties = data.newEvent.detail.reduce((acc: any, curr: any) => {
acc[curr.elementName.toLowerCase().replaceAll(' ','_')] = curr.newValue || curr.oldValue;
if (curr.newValue !== curr.oldValue || curr.newValue !== '')
acc[curr.elementName.toLowerCase().replaceAll(' ','_')] = curr.newValue || curr.oldValue;
return acc;
}, {});

Expand Down

0 comments on commit bfd2c76

Please sign in to comment.