From ed395015d0e519ebf8ef9d7b0111053abfa068eb Mon Sep 17 00:00:00 2001 From: Gabriele Toselli Date: Wed, 27 Mar 2024 23:04:07 +0100 Subject: [PATCH] feat(rabbit-bus): first running implementation --- cspell.json | 6 +- packages/ddd-tookit-rabbit-bus/package.json | 1 + .../src/rabbit-event-bus.spec.ts | 93 +++++++++++++++++ .../src/rabbit-event-bus.ts | 99 +++++++++++++++++++ packages/ddd-tookit/src/index.ts | 3 + pnpm-lock.yaml | 9 ++ 6 files changed, 208 insertions(+), 3 deletions(-) create mode 100644 packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.spec.ts create mode 100644 packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.ts diff --git a/cspell.json b/cspell.json index 52a3b27..6cc2e8b 100644 --- a/cspell.json +++ b/cspell.json @@ -1,4 +1,4 @@ { - "language": "en", - "words": [] -} \ No newline at end of file + "language": "en", + "words": ["fizzbuds"] +} diff --git a/packages/ddd-tookit-rabbit-bus/package.json b/packages/ddd-tookit-rabbit-bus/package.json index 566a57a..6680b75 100644 --- a/packages/ddd-tookit-rabbit-bus/package.json +++ b/packages/ddd-tookit-rabbit-bus/package.json @@ -20,6 +20,7 @@ "amqplib": "^0.10.3" }, "devDependencies": { + "@types/amqplib": "^0.10.5", "@types/jest": "^29.5.2", "@types/lodash": "^4.14.195", "@types/node": "^20.3.1", diff --git a/packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.spec.ts b/packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.spec.ts new file mode 100644 index 0000000..6cd8c93 --- /dev/null +++ b/packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.spec.ts @@ -0,0 +1,93 @@ +import { RabbitEventBus } from './rabbit-event-bus'; +import { Event, IEventHandler, ILogger } from '@fizzbuds/ddd-toolkit/src'; + +const loggerMock: ILogger = { + log: jest.fn(), + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), +}; + +class FooEvent extends Event<{ foo: string }> { + constructor(public readonly payload: { foo: string }) { + super(payload); + } +} + +describe('RabbitEventBus', () => { + describe('Given a RabbitEventBus instance', () => { + let rabbitEventBus: RabbitEventBus; + + beforeEach(() => { + rabbitEventBus = new RabbitEventBus( + 'amqp://user:password@localhost', + 'exchange', + 'queue', + 10, + 3, + undefined, + loggerMock, + ); + }); + + describe('When init is called', () => { + beforeEach(async () => { + await rabbitEventBus.init(); + }); + + afterEach(async () => { + await rabbitEventBus.terminate(); + }); + + it('should be connected', async () => { + await rabbitEventBus.publish({ name: 'test', payload: 'test' }); + }); + }); + + describe('Given an initialized RabbitEventBus', () => { + beforeEach(async () => await rabbitEventBus.init()); + afterEach(async () => await rabbitEventBus.terminate()); + + describe('Given a handler subscribed to an event', () => { + const handlerMock = jest.fn(); + + class FooEventHandler implements IEventHandler { + async handle(event: FooEvent) { + handlerMock(event); + } + } + + beforeEach(() => { + rabbitEventBus.subscribe(FooEvent, new FooEventHandler()); + }); + + describe('When publish an event', () => { + it('should call the handler', async () => { + const event = new FooEvent({ foo: 'bar' }); + await rabbitEventBus.publish(event); + + await waitFor(() => expect(handlerMock).toBeCalledWith(event)); + }); + }); + }); + }); + }); +}); + +async function waitFor(statement: () => void, timeout = 1000): Promise { + const startTime = Date.now(); + + let latestStatementError; + while (true) { + try { + statement(); + return; + } catch (e) { + latestStatementError = e; + } + + if (Date.now() - startTime > timeout) throw latestStatementError; + + await new Promise((resolve) => setTimeout(resolve, 100)); + } +} diff --git a/packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.ts b/packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.ts new file mode 100644 index 0000000..8b02708 --- /dev/null +++ b/packages/ddd-tookit-rabbit-bus/src/rabbit-event-bus.ts @@ -0,0 +1,99 @@ +import { + ExponentialBackoff, + IEvent, + IEventBus, + IEventClass, + IEventHandler, + ILogger, + IRetryMechanism, +} from '@fizzbuds/ddd-toolkit/src'; +import { Channel, ConfirmChannel, connect, Connection, ConsumeMessage } from 'amqplib'; +import { inspect } from 'util'; + +export class RabbitEventBus implements IEventBus { + private connection: Connection; + private consumerChannel: Channel; + private producerChannel: ConfirmChannel; + + private handlers: { [key: string]: IEventHandler> } = {}; + + constructor( + private readonly amqpUrl: string, + private readonly exchangeName: string, + private readonly queueName: string, + private readonly consumerPrefetch: number = 10, + private readonly maxAttempts: number = 3, + private readonly exponentialBackoff: IRetryMechanism = new ExponentialBackoff(1000), + private readonly logger: ILogger, + ) {} + + public async init(): Promise { + this.connection = await connect(this.amqpUrl); + this.consumerChannel = await this.connection.createChannel(); + this.producerChannel = await this.connection.createConfirmChannel(); + + await this.consumerChannel.assertExchange(this.exchangeName, 'topic', { durable: true }); + await this.consumerChannel.assertQueue(this.queueName, { arguments: { 'x-queue-type': 'quorum' } }); + await this.consumerChannel.prefetch(this.consumerPrefetch); + + await this.consumerChannel.consume(this.queueName, this.onMessage.bind(this)); + } + + public async subscribe>(event: IEventClass, handler: IEventHandler): Promise { + if (this.handlers[event.name]) throw new Error(`Handler for event ${event.name} already exists`); + await this.consumerChannel.bindQueue(this.queueName, this.exchangeName, event.name); + + this.handlers[event.name] = handler; + } + + public async publish>(event: T): Promise { + const serializedEvent = JSON.stringify(event); + const message = Buffer.from(serializedEvent); + this.producerChannel.publish(this.exchangeName, event.name, message); + await this.producerChannel.waitForConfirms(); + } + + public async terminate(): Promise { + await this.consumerChannel.close(); + await this.producerChannel.close(); + await this.connection.close(); + } + + private async onMessage(rawMessage: ConsumeMessage | null) { + if (rawMessage === null) return; + const parsedMessage = JSON.parse(rawMessage.content.toString()); + + if (!this.isAValidMessage(parsedMessage)) { + this.consumerChannel.nack(rawMessage, false, false); + this.logger.warn(`Message discarded due to invalid format`); + return; + } + + const handler = this.handlers[parsedMessage.name]; + if (!handler) { + this.consumerChannel.nack(rawMessage, false, false); + this.logger.warn(`Message discarded due to missing handler for ${parsedMessage.name}`); + return; + } + + try { + await handler.handle(parsedMessage); + this.consumerChannel.ack(rawMessage); + } catch (e) { + this.logger.warn(`Error handling message due ${inspect(e)}`); + const deliveryCount = rawMessage.properties.headers?.['x-delivery-count'] || 0; + if (deliveryCount < this.maxAttempts) { + await new Promise((resolve) => setTimeout(resolve, this.exponentialBackoff.getDelay(deliveryCount))); + this.consumerChannel.nack(rawMessage, false, true); + this.logger.warn(`Message re-queued due ${inspect(e)}`); + } else { + this.consumerChannel.nack(rawMessage, false, false); + this.logger.error(`Message sent to dlq due ${inspect(e)}`); + } + } + } + + private isAValidMessage(parsedMessage: any): boolean { + return !!(parsedMessage.name && parsedMessage.payload); + } +} diff --git a/packages/ddd-tookit/src/index.ts b/packages/ddd-tookit/src/index.ts index 458dc5a..344c7ff 100644 --- a/packages/ddd-tookit/src/index.ts +++ b/packages/ddd-tookit/src/index.ts @@ -5,3 +5,6 @@ export * from './generic-id'; export * from './errors'; export * from './repo/mongo-query-repo'; export * from './event-bus/event-bus.interface'; +export * from './logger'; +export * from './event-bus/event'; +export * from './event-bus/exponential-backoff'; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 99f1578..70c0bec 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -97,6 +97,9 @@ importers: specifier: workspace:^ version: link:../ddd-tookit devDependencies: + '@types/amqplib': + specifier: ^0.10.5 + version: 0.10.5 '@types/jest': specifier: ^29.5.2 version: 29.5.2 @@ -2399,6 +2402,12 @@ packages: resolution: {integrity: sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==} dev: true + /@types/amqplib@0.10.5: + resolution: {integrity: sha512-/cSykxROY7BWwDoi4Y4/jLAuZTshZxd8Ey1QYa/VaXriMotBDoou7V/twJiOSHzU6t1Kp1AHAUXGCgqq+6DNeg==} + dependencies: + '@types/node': 20.3.1 + dev: true + /@types/babel__core@7.20.5: resolution: {integrity: sha512-qoQprZvz5wQFJwMDqeseRXWv3rqMvhgpbXFfVyWhbx9X47POIA6i/+dXefEmZKoAgOaTdaIgNSMqMIU61yRyzA==} dependencies: