Skip to content

Commit 4c7c1c0

Browse files
fix: fix up publishing to Kafka
There were a few issues with the Kafka producer - firstly, to enable it required setting 'KAFKA_ENABLED=false' due to an incorrect comparison with 'false'. The kafkajs library requires setting the partitioner value to not throw an error. I found it was necessary to modify some of the connection logic and add some async/await defs because we were publishing to a connection that was being closed, resulting in an error and subsequent failure to publish messages.
1 parent 84c946b commit 4c7c1c0

File tree

2 files changed

+31
-22
lines changed

2 files changed

+31
-22
lines changed

src/GsClient.ts

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { Call } from './Call.js';
55
import { simplifyEvent } from './minimizer.js';
66
import { v4 as uuidv4 } from 'uuid';
77
import {openSync, writeSync} from "node:fs";
8-
import {CompressionTypes, Kafka, KafkaConfig} from 'kafkajs';
8+
import {CompressionTypes, Kafka, KafkaConfig, Partitioners, Producer} from 'kafkajs';
99

1010
enum bucketTypes {
1111
HOUR = 'HOUR',
@@ -47,7 +47,7 @@ export interface Environment {
4747
KAFKA_HOST: string | undefined;
4848
KAFKA_USER: string | undefined;
4949
KAFKA_PASSWORD: string | undefined;
50-
KAFKA_TOPIC: string | undefined;
50+
KAFKA_TOPIC: string;
5151
KAFKA_CLIENT_ID: string | undefined;
5252
}
5353

@@ -68,6 +68,7 @@ export const errorCodeMappings: { [id: string] : string; } = {
6868
'1013': 'Try Again Later',
6969
'1014': 'Bad Gateway',
7070
'1015': 'TLS Handshake',
71+
'4408': 'Connection initialisation timeout',
7172
'4409': 'Too many requests',
7273
};
7374

@@ -92,7 +93,7 @@ export class GsClient {
9293
private client: Client | undefined;
9394
private client_id: any = undefined;
9495
private kafka: Kafka | undefined;
95-
private kafkaProducer: any | undefined;
96+
private kafkaProducer: Producer | undefined;
9697

9798
public constructor(env: Environment, wsUrl: string, oauthUrl: string, oauthOptions: OAuthOptions, call: Call) {
9899
this.offset = env.OFFSET;
@@ -110,50 +111,55 @@ export class GsClient {
110111
this.json_file = openSync('./events.json','w');
111112

112113
if (this.env.KAFKA_ENABLED && this.env.KAFKA_HOST) {
114+
log.debug('Setting up Kafka connection')
113115
let kafkaConfig: KafkaConfig = {
114116
brokers: this.env.KAFKA_HOST.split(','),
115-
clientId: 'ohip-shovel'
117+
clientId:this.env.KAFKA_CLIENT_ID,
118+
connectionTimeout: 30000,
116119
};
117120
if (this.env.KAFKA_USER !== undefined && this.env.KAFKA_PASSWORD !== undefined) {
118121
kafkaConfig['sasl'] = {
119122
mechanism: 'plain',
120-
username: this.env.KAFKA_USER,
121-
password: this.env.KAFKA_PASSWORD,
123+
username: this.env.KAFKA_USER,
124+
password: this.env.KAFKA_PASSWORD,
122125
}
123126
}
124127
this.kafka = new Kafka(kafkaConfig)
125128
this.kafkaProducer = this.getProducer(this.kafka);
129+
log.debug(`Kafka configured to write to topic ${this.env.KAFKA_TOPIC}`)
126130
}
127131
this.registerShutdownHook(); // make sure to dispose and terminate the client on shutdown
128132
}
129133

130-
public getProducer(kafka: Kafka): any {
134+
public getProducer(kafka: Kafka): Producer {
131135
let kafkaProducer = kafka.producer({
132136
allowAutoTopicCreation: true,
133137
transactionTimeout: 30000,
134-
maxInFlightRequests: 1
138+
maxInFlightRequests: 1,
139+
createPartitioner: Partitioners.LegacyPartitioner,
135140
})
136-
kafkaProducer.connect()
137141
return kafkaProducer
138142
}
139143

140-
public registerShutdownHook(): void {
141-
process.on('SIGINT', () => {
144+
public async registerShutdownHook(): Promise<void> {
145+
process.on('SIGINT', async () => {
142146
log.info('Received SIGINT signal');
143-
this.terminateClient('SIGINT');
147+
await this.terminateClient('SIGINT');
144148
setTimeout(process.exit(0), 2000);
145149
});
146-
process.on('SIGTERM', () => {
150+
process.on('SIGTERM', async () => {
147151
log.info('Received SIGTERM signal');
148-
this.terminateClient('SIGTERM');
152+
await this.terminateClient('SIGTERM');
149153
setTimeout(process.exit(0), 2000);
150154
});
151155
}
152156

153-
public terminateClient(reason: string): void {
157+
public async terminateClient(reason: string): Promise<void> {
154158
log.info(`Terminating client: ${reason}`)
155159
if (this.kafkaProducer !== undefined) {
156-
this.kafkaProducer.disconnect();
160+
log.info('Disconnecting from Kafka')
161+
await this.kafkaProducer.disconnect();
162+
log.info('Disconnected from Kafka')
157163
}
158164
if (this.offset !== undefined) {
159165
log.info(`Last offset processed: ${this.offset}`);
@@ -311,7 +317,7 @@ export class GsClient {
311317
}
312318

313319
private setStat(eventName: string): void {
314-
this.windowCount = this.windowCount + 1;
320+
this.windowCount = this.windowCount + 1;
315321
// total events per event type
316322
if (!this.statsSummary[eventName]){
317323
this.statsSummary[eventName] = 1;
@@ -355,16 +361,17 @@ export class GsClient {
355361
}
356362

357363
public async startConsuming (reconnect: boolean = false, reason: string = '') {
358-
this.terminateClient(`Refreshing connection with new token`);
364+
await this.terminateClient(`Refreshing connection with new token`);
359365
if (reconnect) {
360366
log.debug(`Refreshing an existing connection in ${this.env.TIMER}ms (${reason})`);
361367
await this.delay(this.env.TIMER);
362368
} else {
363369
log.debug('Initiating a new connection');
364370
}
365371
this.client = this.getClient();
366-
if (this.kafka)
367-
this.kafkaProducer = this.getProducer(this.kafka);
372+
if (this.kafkaProducer !== undefined)
373+
await this.kafkaProducer.connect()
374+
368375
try {
369376
await this.subscribe(this.client);
370377
} catch (error) {
@@ -376,7 +383,9 @@ export class GsClient {
376383

377384
public async stopConsuming (reconnect: boolean = false) {
378385
try {
379-
if (!reconnect) this.terminateClient('Application stopped by user');
386+
if (!reconnect) {
387+
await this.terminateClient('Application stopped by user');
388+
}
380389
process.exit(0);
381390
} catch (error) {
382391
log.error(error);

src/app.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const env: Environment = {
3333
DUMP_TO_FILE: process.env.DUMP_TO_FILE==='true',
3434
SEGMENT_CONVERSION: process.env.SEGMENT_CONVERSION==='true',
3535
STACK_VALUES: process.env.STACK_VALUES==='true',
36-
KAFKA_ENABLED: process.env.KAFKA_ENABLED==='false',
36+
KAFKA_ENABLED: process.env.KAFKA_ENABLED==='true',
3737
KAFKA_HOST: process.env.KAFKA_HOST,
3838
KAFKA_USER: process.env.KAFKA_USER,
3939
KAFKA_PASSWORD: process.env.KAFKA_PASSWORD,

0 commit comments

Comments
 (0)