Skip to content

Commit

Permalink
fix(bus): channel closed error while processing an incoming message (#…
Browse files Browse the repository at this point in the history
…161)

closes #159
  • Loading branch information
derevnjuk authored Jun 19, 2023
1 parent a1217c4 commit 8e9363d
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
13 changes: 8 additions & 5 deletions packages/bus/src/dispatchers/RMQEventBus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
Command,
Event,
EventHandler,
EventHandlerNotFound,
Logger,
NoResponse,
RetryStrategy
Expand Down Expand Up @@ -615,7 +616,7 @@ describe('RMQEventBus', () => {
).once();
});

it('should throw an error if no active subscriptions', async () => {
it('should log an error if no active subscriptions', async () => {
// arrange
const payload = { foo: 'bar' };
const message = {
Expand All @@ -630,11 +631,13 @@ describe('RMQEventBus', () => {
}
} as ConsumeMessage;

// act / assert
// act
await processMessage(message);
// assert
verify(spiedHandler.handle(anything())).never();
await expect(processMessage(message)).rejects.toThrow(
'Event handler not found'
);
verify(
mockedLogger.error(anyString(), anyOfClass(EventHandlerNotFound))
).once();
});

it('should skip a redelivered event', async () => {
Expand Down
38 changes: 19 additions & 19 deletions packages/bus/src/dispatchers/RMQEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ export class RMQEventBus implements EventBus {
string,
EventHandler<unknown, unknown>[]
>();
private readonly consumerTags: string[] = [];

private readonly REPLY_QUEUE_NAME = 'amq.rabbitmq.reply-to';

constructor(
Expand Down Expand Up @@ -141,17 +139,12 @@ export class RMQEventBus implements EventBus {
public async destroy(): Promise<void> {
try {
if (this.channel) {
await Promise.all(
this.consumerTags.map(consumerTag =>
this.channel?.cancel(consumerTag)
)
);
await this.channel.cancelAll();
await this.channel.close();
}

delete this.channel;

this.consumerTags.splice(0, this.consumerTags.length);
this.subject.removeAllListeners();
} catch (e) {
this.logger.error('Cannot terminate event bus gracefully');
Expand Down Expand Up @@ -272,27 +265,34 @@ export class RMQEventBus implements EventBus {
}

private async startReplyQueueConsume(channel: Channel): Promise<void> {
const { consumerTag } = await channel.consume(
await channel.consume(
this.REPLY_QUEUE_NAME,
(msg: ConsumeMessage | null) => (msg ? this.processReply(msg) : void 0),
{
noAck: true
}
);

this.consumerTags.push(consumerTag);
}

private async startBasicConsume(channel: Channel): Promise<void> {
const { consumerTag } = await channel.consume(
await channel.consume(
this.options.clientQueue,
(msg: ConsumeMessage | null) => (msg ? this.processMessage(msg) : void 0),
async (msg: ConsumeMessage | null) => {
try {
if (msg) {
await this.processMessage(msg);
}
} catch (e) {
this.logger.error(
'Error while processing a message due to error occurred: ',
e
);
}
},
{
noAck: true
}
);

this.consumerTags.push(consumerTag);
}

private async bindExchangesToQueue(channel: Channel): Promise<void> {
Expand Down Expand Up @@ -371,12 +371,12 @@ export class RMQEventBus implements EventBus {
});
}
} catch (e) {
this.logger.debug(
'Error while processing a message (%s) due to error occurred: %s. Event: %j',
this.logger.error(
'Error occurred while precessing a message (%s)',
event.correlationId,
e.message,
event
e
);
this.logger.debug('Failed message (%s): %j', event.correlationId, event);
}
}

Expand Down

0 comments on commit 8e9363d

Please sign in to comment.