Skip to content

Commit

Permalink
feat(rabbit-bus): first running implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Mar 27, 2024
1 parent 79a5271 commit ed39501
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 3 deletions.
6 changes: 3 additions & 3 deletions cspell.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"language": "en",
"words": []
}
"language": "en",
"words": ["fizzbuds"]
}
1 change: 1 addition & 0 deletions packages/ddd-tookit-rabbit-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"amqplib": "^0.10.3"
},
"devDependencies": {
"@types/amqplib": "^0.10.5",
"@types/jest": "^29.5.2",
"@types/lodash": "^4.14.195",
"@types/node": "^20.3.1",
Expand Down
93 changes: 93 additions & 0 deletions packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { RabbitEventBus } from './rabbit-event-bus';
import { Event, IEventHandler, ILogger } from '@fizzbuds/ddd-toolkit/src';

const loggerMock: ILogger = {
log: jest.fn(),
debug: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};

class FooEvent extends Event<{ foo: string }> {
constructor(public readonly payload: { foo: string }) {
super(payload);
}
}

describe('RabbitEventBus', () => {
describe('Given a RabbitEventBus instance', () => {
let rabbitEventBus: RabbitEventBus;

beforeEach(() => {
rabbitEventBus = new RabbitEventBus(
'amqp://user:password@localhost',
'exchange',
'queue',
10,
3,
undefined,
loggerMock,
);
});

describe('When init is called', () => {
beforeEach(async () => {
await rabbitEventBus.init();
});

afterEach(async () => {
await rabbitEventBus.terminate();
});

it('should be connected', async () => {
await rabbitEventBus.publish({ name: 'test', payload: 'test' });
});
});

describe('Given an initialized RabbitEventBus', () => {
beforeEach(async () => await rabbitEventBus.init());
afterEach(async () => await rabbitEventBus.terminate());

describe('Given a handler subscribed to an event', () => {
const handlerMock = jest.fn();

class FooEventHandler implements IEventHandler<FooEvent> {
async handle(event: FooEvent) {
handlerMock(event);
}
}

beforeEach(() => {
rabbitEventBus.subscribe(FooEvent, new FooEventHandler());
});

describe('When publish an event', () => {
it('should call the handler', async () => {
const event = new FooEvent({ foo: 'bar' });
await rabbitEventBus.publish(event);

await waitFor(() => expect(handlerMock).toBeCalledWith(event));
});
});
});
});
});
});

async function waitFor(statement: () => void, timeout = 1000): Promise<void> {
const startTime = Date.now();

let latestStatementError;
while (true) {
try {
statement();
return;
} catch (e) {
latestStatementError = e;
}

if (Date.now() - startTime > timeout) throw latestStatementError;

await new Promise((resolve) => setTimeout(resolve, 100));
}
}
99 changes: 99 additions & 0 deletions packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import {
ExponentialBackoff,
IEvent,
IEventBus,
IEventClass,
IEventHandler,
ILogger,
IRetryMechanism,
} from '@fizzbuds/ddd-toolkit/src';
import { Channel, ConfirmChannel, connect, Connection, ConsumeMessage } from 'amqplib';
import { inspect } from 'util';

export class RabbitEventBus implements IEventBus {
private connection: Connection;
private consumerChannel: Channel;
private producerChannel: ConfirmChannel;

private handlers: { [key: string]: IEventHandler<IEvent<unknown>> } = {};

constructor(
private readonly amqpUrl: string,
private readonly exchangeName: string,
private readonly queueName: string,
private readonly consumerPrefetch: number = 10,
private readonly maxAttempts: number = 3,
private readonly exponentialBackoff: IRetryMechanism = new ExponentialBackoff(1000),
private readonly logger: ILogger,
) {}

public async init(): Promise<void> {
this.connection = await connect(this.amqpUrl);
this.consumerChannel = await this.connection.createChannel();
this.producerChannel = await this.connection.createConfirmChannel();

await this.consumerChannel.assertExchange(this.exchangeName, 'topic', { durable: true });
await this.consumerChannel.assertQueue(this.queueName, { arguments: { 'x-queue-type': 'quorum' } });
await this.consumerChannel.prefetch(this.consumerPrefetch);

await this.consumerChannel.consume(this.queueName, this.onMessage.bind(this));
}

public async subscribe<T extends IEvent<unknown>>(event: IEventClass<T>, handler: IEventHandler<T>): Promise<void> {
if (this.handlers[event.name]) throw new Error(`Handler for event ${event.name} already exists`);
await this.consumerChannel.bindQueue(this.queueName, this.exchangeName, event.name);

this.handlers[event.name] = handler;
}

public async publish<T extends IEvent<unknown>>(event: T): Promise<void> {
const serializedEvent = JSON.stringify(event);
const message = Buffer.from(serializedEvent);
this.producerChannel.publish(this.exchangeName, event.name, message);
await this.producerChannel.waitForConfirms();
}

public async terminate(): Promise<void> {
await this.consumerChannel.close();
await this.producerChannel.close();
await this.connection.close();
}

private async onMessage(rawMessage: ConsumeMessage | null) {
if (rawMessage === null) return;
const parsedMessage = JSON.parse(rawMessage.content.toString());

if (!this.isAValidMessage(parsedMessage)) {
this.consumerChannel.nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to invalid format`);
return;
}

const handler = this.handlers[parsedMessage.name];
if (!handler) {
this.consumerChannel.nack(rawMessage, false, false);
this.logger.warn(`Message discarded due to missing handler for ${parsedMessage.name}`);
return;
}

try {
await handler.handle(parsedMessage);
this.consumerChannel.ack(rawMessage);
} catch (e) {
this.logger.warn(`Error handling message due ${inspect(e)}`);
const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0;
if (deliveryCount < this.maxAttempts) {
await new Promise((resolve) => setTimeout(resolve, this.exponentialBackoff.getDelay(deliveryCount)));
this.consumerChannel.nack(rawMessage, false, true);
this.logger.warn(`Message re-queued due ${inspect(e)}`);
} else {
this.consumerChannel.nack(rawMessage, false, false);
this.logger.error(`Message sent to dlq due ${inspect(e)}`);
}
}
}

private isAValidMessage(parsedMessage: any): boolean {
return !!(parsedMessage.name && parsedMessage.payload);
}
}
3 changes: 3 additions & 0 deletions packages/ddd-tookit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ export * from './generic-id';
export * from './errors';
export * from './repo/mongo-query-repo';
export * from './event-bus/event-bus.interface';
export * from './logger';
export * from './event-bus/event';
export * from './event-bus/exponential-backoff';
9 changes: 9 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ed39501

Please sign in to comment.