Skip to content

Commit

Permalink
customizations for our needs
Browse files Browse the repository at this point in the history
  • Loading branch information
acmeguy committed Apr 29, 2024
1 parent 72497e8 commit 67d1799
Showing 1 changed file with 6 additions and 16 deletions.
22 changes: 6 additions & 16 deletions src/GsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,7 @@ export class GsClient {
transactionTimeout: 30000,
maxInFlightRequests: 1
})
try {
kafkaProducer.connect()
} catch (e) {
log.error(e)
}
kafkaProducer.connect()
return kafkaProducer
}

Expand Down Expand Up @@ -284,24 +280,18 @@ export class GsClient {
let simple = simplifyJSON(result.data);
log.silly(JSON.stringify(simple));
if (this.kafkaProducer !== undefined) {
if (this.kafka && !this.kafkaProducer.isConnected()) {
log.error('Kafka producer not available, Trying to reconnect.');
this.getProducer(this.kafka)
}
if (!this.kafkaProducer.isConnected()) {
log.error('Kafka producer still not available, panic');
this.offset -= 2;
this.terminateClient();
process.abort();
}
if (this.kafkaProducer.isConnected()) {
try {
this.kafkaProducer.send({
topic: this.env.KAFKA_TOPIC || 'ohip-events',
messages: [{
key: result.data.newEvent.metadata.uniqueEventId,
value: JSON.stringify(simple)
}]
});
this.kafkaProducer.commit();
} catch (e) {
log.error(e);
this.kafkaProducer.abort()
}
}
if (this.env.DUMP_TO_FILE && this.json_file !== undefined)
Expand Down

0 comments on commit 67d1799

Please sign in to comment.