Skip to content

Commit

Permalink
feat(bus): implement exponential backoff retry strategy (#14)
Browse files Browse the repository at this point in the history
closes #13
  • Loading branch information
RomanReznichenko authored Apr 12, 2022
1 parent 5c02ac6 commit 5c75843
Show file tree
Hide file tree
Showing 11 changed files with 1,324 additions and 1,141 deletions.
2,182 changes: 1,070 additions & 1,112 deletions package-lock.json

Large diffs are not rendered by default.

61 changes: 52 additions & 9 deletions packages/bus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ npm i -s @secbox/bus
To use the RabbitMQ Event Bus, pass the following options object to the constructor method:

```ts
import { RMQEventBus } from '@secbox/bus';
import { RMQEventBus, ExponentialBackoffRetryStrategy } from '@secbox/bus';

const config = new Configuration({
cluster: 'app.neuralegion.com'
Expand All @@ -24,15 +24,19 @@ const config = new Configuration({
const repeaterId = 'your Repeater ID';
const token = 'your API key';

const bus = new RMQEventBus(config.container, {
exchange: 'EventBus',
clientQueue: `agent:${repeaterId}`,
appQueue: 'app',
credentials: {
username: 'bot',
password: token
const bus = new RMQEventBus(
config.container,
new ExponentialBackoffRetryStrategy({ maxDepth: 5 }),
{
exchange: 'EventBus',
clientQueue: `agent:${repeaterId}`,
appQueue: 'app',
credentials: {
username: 'bot',
password: token
}
}
});
);
```

The options are specific to the chosen transporter. The `RabbitMQ` implementation exposes the properties described below:
Expand Down Expand Up @@ -160,6 +164,45 @@ await bus.execute(command);

For more information, please see `@secbox/core`.

#### Retry Strategy

For some noncritical operations, it is better to fail as soon as possible rather than retry a coupe of times.
For example, it is better to fail right after a smaller number of retries with only a short delay between retry attempts, and display a message to the user.

By default, you can use the [Exponential backoff](https://en.wikipedia.org/wiki/Exponential_backoff) retry strategy to retry an action when errors like `ETIMEDOUT` appear.

You can implement your own to match the business requirements and the nature of the failure:

```ts
export class CustomRetryStrategy implements RetryStrategy {
public async acquire<T extends (...args: unknown[]) => unknown>(
task: T
): Promise<ReturnType<T>> {
let times = 0;

for (;;) {
try {
return await task();
} catch {
times++;

if (times === 3) {
throw e;
}
}
}
}
}
```

Once a retry strategy is implemented, you can use it like that:

```ts
const retryStrategy = new CustomRetryStrategy();

const bus = new RMQEventBus(container, retryStrategy, options);
```

## License

Copyright © 2022 [NeuraLegion](https://github.com/NeuraLegion).
Expand Down
2 changes: 1 addition & 1 deletion packages/bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
"bus"
],
"peerDependencies": {
"@secbox/core": "^0.2.0"
"@secbox/core": "^0.3.0"
}
}
23 changes: 20 additions & 3 deletions packages/bus/src/brokers/RMQEventBus.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
/* eslint-disable max-classes-per-file */
import { RMQEventBus } from './RMQEventBus';
import { RMQEventBusConfig } from './RMQEventBusConfig';
import { bind, Command, Event, EventHandler, NoResponse } from '@secbox/core';
import {
bind,
Command,
Event,
EventHandler,
NoResponse,
RetryStrategy
} from '@secbox/core';
import {
anyFunction,
anyOfClass,
Expand Down Expand Up @@ -73,6 +80,7 @@ describe('RMQEventBus', () => {
const mockedChannelWrapper = mock<ChannelWrapper>();
const mockedChannel = mock<Channel>();
const mockedDependencyContainer = mock<DependencyContainer>();
const mockedRetryStrategy = mock<RetryStrategy>();
const options: RMQEventBusConfig = {
url: 'amqp://localhost:5672',
exchange: 'event-bus',
Expand Down Expand Up @@ -100,7 +108,14 @@ describe('RMQEventBus', () => {
when(
mockedChannel.consume(anyString(), anyFunction(), anything())
).thenResolve({ consumerTag: 'tag' } as any);
rmq = new RMQEventBus(instance(mockedDependencyContainer), options);
when(mockedRetryStrategy.acquire(anyFunction())).thenCall(
(callback: (...args: unknown[]) => unknown) => callback()
);
rmq = new RMQEventBus(
instance(mockedDependencyContainer),
instance(mockedRetryStrategy),
options
);
});

afterEach(() => {
Expand All @@ -111,12 +126,14 @@ describe('RMQEventBus', () => {
| Channel
| RMQEventBusConfig
| DependencyContainer
| RetryStrategy
>(
mockedConnectionManager,
mockedChannelWrapper,
mockedChannel,
spiedOptions,
mockedDependencyContainer
mockedDependencyContainer,
mockedRetryStrategy
);
jest.resetModules();
jest.resetAllMocks();
Expand Down
43 changes: 27 additions & 16 deletions packages/bus/src/brokers/RMQEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
EventBus,
EventConstructor,
EventHandler,
RetryStrategy,
EventHandlerConstructor,
EventHandlerNotFound,
IllegalOperation,
Expand All @@ -27,6 +28,16 @@ interface ParsedConsumeMessage<T = unknown> {
correlationId?: string;
}

interface RawMessage<T = unknown> {
payload: T;
routingKey: string;
exchange?: string;
type?: string;
correlationId?: string;
replyTo?: string;
timestamp?: Date;
}

@autoInjectable()
export class RMQEventBus implements EventBus {
private client: AmqpConnectionManager | undefined;
Expand All @@ -46,6 +57,8 @@ export class RMQEventBus implements EventBus {

constructor(
private readonly container: DependencyContainer,
@inject(RetryStrategy)
private readonly retryStrategy: RetryStrategy,
@inject(RMQEventBusConfig) private readonly options: RMQEventBusConfig
) {
this.subject.setMaxListeners(Infinity);
Expand Down Expand Up @@ -102,8 +115,9 @@ export class RMQEventBus implements EventBus {
public async publish<T>(event: Event<T>): Promise<void> {
const { type, payload, correlationId, createdAt } = event;

await this.sendMessage(payload, {
await this.tryToSendMessage({
type,
payload,
correlationId,
routingKey: type,
timestamp: createdAt,
Expand All @@ -124,8 +138,9 @@ export class RMQEventBus implements EventBus {
: Promise.resolve(undefined);

try {
await this.sendMessage(payload, {
await this.tryToSendMessage({
type,
payload,
correlationId,
timestamp: createdAt,
routingKey: this.options.appQueue,
Expand Down Expand Up @@ -336,7 +351,8 @@ export class RMQEventBus implements EventBus {
const response = await handler.handle(event.payload);

if (response && event.replyTo) {
await this.sendMessage(response, {
await this.tryToSendMessage({
payload: response,
routingKey: event.replyTo,
correlationId: event.correlationId
});
Expand All @@ -346,31 +362,26 @@ export class RMQEventBus implements EventBus {
}
}

private async sendMessage(
payload: unknown,
options: {
routingKey: string;
exchange?: string;
type?: string;
correlationId?: string;
replyTo?: string;
timestamp?: Date;
}
): Promise<void> {
private async tryToSendMessage(options: RawMessage): Promise<void> {
await this.retryStrategy.acquire(() => this.sendMessage(options));
}

private sendMessage(options: RawMessage) {
if (!this.channel) {
throw new IllegalOperation(this);
}

const {
type,
payload,
replyTo,
routingKey,
correlationId,
type,
exchange = '',
timestamp = new Date()
} = options;

await this.channel.publish(
return this.channel.publish(
exchange ?? '',
routingKey,
Buffer.from(JSON.stringify(payload)),
Expand Down
9 changes: 9 additions & 0 deletions packages/bus/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
import { ExponentialBackoffRetryStrategy } from './retry-strategies';
import { container } from 'tsyringe';
import { RetryStrategy } from '@secbox/core';

container.register(RetryStrategy, {
useFactory: () => new ExponentialBackoffRetryStrategy({ maxDepth: 5 })
});

export * from './brokers';
export * from './retry-strategies';
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { ExponentialBackoffRetryStrategy } from './ExponentialBackoffRetryStrategy';

describe('ExponentialBackoffRetryStrategy', () => {
const findArg = <R>(
args: [unknown, unknown],
expected: 'function' | 'number'
): R => (typeof args[0] === expected ? args[0] : args[1]) as R;

beforeEach(() => {
jest.useFakeTimers();

const mockedImplementation = jest
.spyOn(global, 'setTimeout')
.getMockImplementation();

jest
.spyOn(global, 'setTimeout')
.mockImplementation((...args: [unknown, unknown]) => {
// ADHOC: depending on implementation (promisify vs raw), the method signature will be different
const callback = findArg<(..._: unknown[]) => void>(args, 'function');
const ms = findArg<number>(args, 'number');
const timer = mockedImplementation?.(callback, ms);

jest.runAllTimers();

return timer as NodeJS.Timeout;
});
});

afterEach(() => {
jest.useRealTimers();
jest.resetAllMocks();
});

it('should not retry if function does not throw error', async () => {
const retryStrategy = new ExponentialBackoffRetryStrategy({ maxDepth: 1 });
const input = jest.fn().mockResolvedValue(undefined);

await retryStrategy.acquire(input);

expect(input).toHaveBeenCalledTimes(1);
});

it('should return a result execution immediately', async () => {
const retryStrategy = new ExponentialBackoffRetryStrategy({ maxDepth: 1 });
const input = jest.fn().mockReturnValue(undefined);

await retryStrategy.acquire(input);

expect(input).toHaveBeenCalledTimes(1);
});

it('should prevent retries if error does not have a correct code', async () => {
const retryStrategy = new ExponentialBackoffRetryStrategy({ maxDepth: 1 });
const input = jest.fn().mockRejectedValue(new Error('Unhandled error'));

const result = retryStrategy.acquire(input);

await expect(result).rejects.toThrow('Unhandled error');
expect(input).toHaveBeenCalledTimes(1);
});

it('should retry two times and throw an error', async () => {
const retryStrategy = new ExponentialBackoffRetryStrategy({ maxDepth: 2 });
const error = new Error('Unhandled error');
(error as any).code = 'ECONNRESET';
const input = jest.fn().mockRejectedValue(error);

const result = retryStrategy.acquire(input);

await expect(result).rejects.toThrow(error);
expect(input).toHaveBeenCalledTimes(3);
});

it('should return a result execution after a two retries', async () => {
const retryStrategy = new ExponentialBackoffRetryStrategy({ maxDepth: 2 });
const error = new Error('Unhandled error');
(error as any).code = 'ECONNRESET';
const input = jest
.fn()
.mockRejectedValueOnce(error)
.mockRejectedValueOnce(error)
.mockResolvedValue(undefined);

await retryStrategy.acquire(input);

expect(input).toHaveBeenCalledTimes(3);
});
});
Loading

0 comments on commit 5c75843

Please sign in to comment.