Skip to content

Commit

Permalink
refactor(rabbit-bus): unifies producer and consumer channel
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Apr 22, 2024
1 parent 3f1b048 commit 6a6a038
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/tall-feet-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fizzbuds/ddd-toolkit-rabbit-bus": patch
---

unifies producer and consumer channel
50 changes: 17 additions & 33 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Channel, ConfirmChannel, connect, Connection } from 'amqplib';
import { ConfirmChannel, connect, Connection } from 'amqplib';
import { ILogger } from '@fizzbuds/ddd-toolkit';
import { inspect } from 'util';

Expand All @@ -9,8 +9,7 @@ export class RabbitConnection {
private static RECONNECTION_TIMEOUT = 2000;

private connection: Connection;
private consumerChannel: Channel;
private producerChannel: ConfirmChannel;
private channel: ConfirmChannel;

private waiting = false;
private stopping = false;
Expand Down Expand Up @@ -39,8 +38,7 @@ export class RabbitConnection {
this.logger.debug(`Connection with rabbit closed with ${inspect(reason)} try to reconnect`);
this.scheduleReconnection();
});
await this.setupConsumerChannel();
await this.setupProducerChannel();
await this.setupChannel();
await this.setupExchanges();
await this.setupDqlQueue();
this.logger.debug('Rabbit connection established');
Expand All @@ -50,19 +48,14 @@ export class RabbitConnection {
}
}

public getConsumerChannel(): Channel {
return this.consumerChannel;
}

public getProducerChannel(): ConfirmChannel {
return this.producerChannel;
public getChannel(): ConfirmChannel {
return this.channel;
}

public async terminate() {
this.logger.debug('Stopping rabbit connection');
this.stopping = true;
await this.producerChannel?.close();
await this.consumerChannel?.close();
await this.channel?.close();
await this.connection?.close();
this.logger.debug('Rabbit connection stopped');
}
Expand All @@ -84,43 +77,34 @@ export class RabbitConnection {
}, RabbitConnection.RECONNECTION_TIMEOUT);
}

private async setupConsumerChannel() {
this.consumerChannel = await this.connection.createConfirmChannel();
await this.consumerChannel.prefetch(this.prefetch);
private async setupChannel() {
this.channel = await this.connection.createConfirmChannel();
await this.channel.prefetch(this.prefetch);

this.consumerChannel.on('error', async (err) => {
this.channel.on('error', async (err) => {
if (!this.stopping) return;
this.logger.error(`Consumer channel with rabbit closed with ${inspect(err)} try to recreate`);
await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT));
await this.setupConsumerChannel();
});
}

private async setupProducerChannel() {
this.producerChannel = await this.connection.createConfirmChannel();
this.producerChannel.on('error', async (err) => {
this.logger.error(`Producer channel with rabbit closed with ${inspect(err)} try to recreate`);
await new Promise((resolve) => setTimeout(resolve, RabbitConnection.RECONNECTION_TIMEOUT));
await this.setupProducerChannel();
await this.setupChannel();
});
}

private async setupExchanges() {
if (!this.producerChannel) throw new Error('Unable to setup exchange because channel is null');
await this.producerChannel.assertExchange(this.exchangeName, 'direct', {
if (!this.channel) throw new Error('Unable to setup exchange because channel is null');
await this.channel.assertExchange(this.exchangeName, 'direct', {
durable: true,
});
await this.producerChannel.assertExchange(this.deadLetterExchangeName, 'topic');
await this.channel.assertExchange(this.deadLetterExchangeName, 'topic');
}

private async setupDqlQueue() {
if (!this.consumerChannel) throw new Error('Unable to setup dql queue because channel is null');
await this.consumerChannel.assertQueue(this.deadLetterQueueName, {
if (!this.channel) throw new Error('Unable to setup dql queue because channel is null');
await this.channel.assertQueue(this.deadLetterQueueName, {
durable: true,
arguments: {
'x-queue-type': 'quorum',
},
});
await this.consumerChannel.bindQueue(this.deadLetterQueueName, this.deadLetterExchangeName, '#');
await this.channel.bindQueue(this.deadLetterQueueName, this.deadLetterExchangeName, '#');
}
}
8 changes: 4 additions & 4 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ describe('RabbitEventBus', () => {

describe('When publish an invalid event (not json)', () => {
it('should log with warn level', async () => {
rabbitEventBus['connection']['producerChannel'].publish('exchange', 'FooEvent', Buffer.from(''));
await rabbitEventBus['connection']['producerChannel'].waitForConfirms();
rabbitEventBus['connection']['channel'].publish('exchange', 'FooEvent', Buffer.from(''));
await rabbitEventBus['connection']['channel'].waitForConfirms();

await waitFor(() =>
expect(loggerMock.warn).toBeCalledWith('Message discarded due to invalid format (not json)'),
Expand All @@ -65,12 +65,12 @@ describe('RabbitEventBus', () => {

describe('When publish an invalid event (without event)', () => {
it('should log with warn level', async () => {
rabbitEventBus['connection']['producerChannel'].publish(
rabbitEventBus['connection']['channel'].publish(
'exchange',
'FooEvent',
Buffer.from(JSON.stringify({})),
);
await rabbitEventBus['connection']['producerChannel'].waitForConfirms();
await rabbitEventBus['connection']['channel'].waitForConfirms();

await waitFor(() => expect(loggerMock.warn).toBeCalledWith('Message discarded due to invalid format'));
});
Expand Down
22 changes: 11 additions & 11 deletions packages/ddd-toolkit-rabbit-bus/src/rabbit-event-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,23 @@ export class RabbitEventBus implements IEventBus {
if (this.handlers.find((h) => h.queueName === queueName))
throw new Error(`Handler ${handler.constructor.name} already exists`);

await this.connection.getConsumerChannel().assertQueue(queueName, {
await this.connection.getChannel().assertQueue(queueName, {
durable: true,
arguments: { 'x-queue-type': 'quorum', 'x-expires': this.queueExpirationMs },
deadLetterExchange: this.deadLetterExchangeName,
});

this.handlers.push({ eventName: event.name, queueName, handler });

await this.connection.getConsumerChannel().consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.connection.getConsumerChannel().bindQueue(queueName, this.exchangeName, event.name);
await this.connection.getChannel().consume(queueName, (msg) => this.onMessage(msg, queueName));
await this.connection.getChannel().bindQueue(queueName, this.exchangeName, event.name);
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const serializedEvent = JSON.stringify(event);
const message = Buffer.from(serializedEvent);
this.connection.getProducerChannel().publish(this.exchangeName, event.name, message);
await this.connection.getProducerChannel().waitForConfirms();
this.connection.getChannel().publish(this.exchangeName, event.name, message);
await this.connection.getChannel().waitForConfirms();
}

public async terminate(): Promise<void> {
Expand All @@ -79,13 +79,13 @@ export class RabbitEventBus implements IEventBus {
try {
parsedMessage = JSON.parse(rawMessage.content.toString());
} catch (e) {
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.connection.getChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to invalid format (not json)`);
return;
}

if (!this.isAValidMessage(parsedMessage)) {
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.connection.getChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to invalid format`);
return;
}
Expand All @@ -95,23 +95,23 @@ export class RabbitEventBus implements IEventBus {
const handler = this.handlers.find((h) => h.eventName === event.name && h.queueName === queueName)?.handler;

if (!handler) {
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.connection.getChannel().nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to missing handler for ${event.name}`);
return;
}

try {
await handler.handle(event);
this.connection.getConsumerChannel().ack(rawMessage);
this.connection.getChannel().ack(rawMessage);
} catch (e) {
this.logger.warn(`Error handling message due ${inspect(e)}`);
const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0;
if (deliveryCount < this.maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, this.exponentialBackoff.getDelay(deliveryCount)));
this.connection.getConsumerChannel().nack(rawMessage, false, true);
this.connection.getChannel().nack(rawMessage, false, true);
this.logger.warn(`Message re-queued due ${inspect(e)}`);
} else {
this.connection.getConsumerChannel().nack(rawMessage, false, false);
this.connection.getChannel().nack(rawMessage, false, false);
this.logger.error(`Message sent to dlq due ${inspect(e)}`);
}
}
Expand Down

0 comments on commit 6a6a038

Please sign in to comment.