From 67d1799254108e3312667eea7a3d2af2117f7ebd Mon Sep 17 00:00:00 2001 From: stefanbaxter Date: Mon, 29 Apr 2024 00:21:38 +0000 Subject: [PATCH] customizations for our needs --- src/GsClient.ts | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/src/GsClient.ts b/src/GsClient.ts index d219178..8cc3ea6 100644 --- a/src/GsClient.ts +++ b/src/GsClient.ts @@ -138,11 +138,7 @@ export class GsClient { transactionTimeout: 30000, maxInFlightRequests: 1 }) - try { - kafkaProducer.connect() - } catch (e) { - log.error(e) - } + kafkaProducer.connect() return kafkaProducer } @@ -284,17 +280,7 @@ export class GsClient { let simple = simplifyJSON(result.data); log.silly(JSON.stringify(simple)); if (this.kafkaProducer !== undefined) { - if (this.kafka && !this.kafkaProducer.isConnected()) { - log.error('Kafka producer not available, Trying to reconnect.'); - this.getProducer(this.kafka) - } - if (!this.kafkaProducer.isConnected()) { - log.error('Kafka producer still not available, panic'); - this.offset -= 2; - this.terminateClient(); - process.abort(); - } - if (this.kafkaProducer.isConnected()) { + try { this.kafkaProducer.send({ topic: this.env.KAFKA_TOPIC || 'ohip-events', messages: [{ @@ -302,6 +288,10 @@ export class GsClient { value: JSON.stringify(simple) }] }); + this.kafkaProducer.commit(); + } catch (e) { + log.error(e); + this.kafkaProducer.abort() } } if (this.env.DUMP_TO_FILE && this.json_file !== undefined)