Skip to content

Commit

Permalink
{consumer|provider}.disconnect() now returns a boolean
Browse files Browse the repository at this point in the history
  • Loading branch information
macno committed Dec 20, 2023
1 parent 7fcf015 commit 2f7afcc
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ import { EachMessageHandler, ProducerRecord } from 'kafkajs';
export interface Consumer {
connect(): Promise<void>;
setCallback(func: EachMessageHandler): void;
disconnect(): Promise<void>;
disconnect(): Promise<boolean>;
subscribe(topics: string[], autoCommit: boolean, fromBeginning: boolean): Promise<void>;
}

export interface Producer {
send(message: ProducerRecord): Promise<boolean>;
connect(): Promise<void>;
disconnect(): Promise<void>;
disconnect(): Promise<boolean>;
}
7 changes: 5 additions & 2 deletions src/kafka/kafka-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ export class KafkaConsumer implements GenericConsumer {
if (this.connected) return;
await this.consumer.connect();
this.connected = true;
this.logger.info('connected');
}

public async disconnect(): Promise<void> {
if (!this.connected) return;
public async disconnect(): Promise<boolean> {
if (!this.connected) return true;
try {
await this.consumer.disconnect();
this.connected = false;
return true;
} catch (err) {
this.logger.error(`Error on disconnect due to: ${err}`);
}
return false;
}
}
6 changes: 4 additions & 2 deletions src/kafka/kafka-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ export class KafkaProducer implements GenericProducer {
}
}

public async disconnect(): Promise<void> {
if (!this.connected) return;
public async disconnect(): Promise<boolean> {
if (!this.connected) return true;
try {
await this.producer.disconnect();
this.connected = false;
return true;
} catch (err) {
this.logger.error(`Error on disconnect due to: ${err}`);
}
return false;
}
}

0 comments on commit 2f7afcc

Please sign in to comment.