Skip to content

Commit

Permalink
feat(command-bus): introduce typed context to local command bus
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoselli committed Sep 10, 2024
1 parent 07cf236 commit 0c48728
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 17 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
"lint-staged": {
"*.ts": "eslint --fix",
"*.json": "prettier --write"
}
},
"packageManager": "pnpm@9.9.0+sha512.60c18acd138bff695d339be6ad13f7e936eea6745660d4cc4a776d5247c540d0edee1a563695c183a66eb917ef88f2b4feb1fc25f32a7adcadc7aaf3438e99c1"
}
11 changes: 7 additions & 4 deletions packages/ddd-toolkit/src/command-bus/command-bus.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ export interface ICommandClass<C extends ICommand<unknown, unknown>> {
new (payload: unknown): C;
}

export interface ICommandHandler<C extends ICommand<unknown, unknown>> {
handle: (command: C) => Promise<C['_returnType']>;
export interface ICommandHandler<C extends ICommand<unknown, unknown>, TContext = void> {
handle: (command: C, context?: TContext) => Promise<C['_returnType']>;
}

export interface ICommandBus {
register<C extends ICommand<unknown, unknown>>(command: ICommandClass<C>, handler: ICommandHandler<C>): void;
export interface ICommandBus<TContext> {
register<C extends ICommand<unknown, unknown>>(
command: ICommandClass<C>,
handler: ICommandHandler<C, TContext>,
): void;

send<C extends ICommand<unknown, unknown>>(command: C): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export interface IContextManager<TContext> {
wrapWithContext<T>(operation: (context: TContext) => Promise<T>): Promise<T>;
}
53 changes: 48 additions & 5 deletions packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Command } from './command';
import { waitFor } from '../utils';
import { ICommandHandler } from './command-bus.interface';
import { ILogger } from '../logger';
import { IContextManager } from './context-manager.interface';

const loggerMock: ILogger = {
log: jest.fn(),
Expand All @@ -24,17 +25,15 @@ class BarCommand extends Command<{ foo: string }> {
}

describe('LocalCommandBus', () => {
afterEach(() => jest.resetAllMocks());

describe('Given an command bus', () => {
let commandBus: LocalCommandBus;
let commandBus: LocalCommandBus<unknown>;

beforeEach(() => {
commandBus = new LocalCommandBus(loggerMock, 3, 100);
});

afterEach(() => {
jest.resetAllMocks();
});

describe('Given no registered handler to foo command', () => {
describe('When send a foo command', () => {
it('Should log warning message', async () => {
Expand Down Expand Up @@ -160,6 +159,50 @@ describe('LocalCommandBus', () => {
});
});

describe('Given a command bus with context manager', () => {
type TContext = { contextKey: string };

let commandBus: LocalCommandBus<TContext>;

class FooContextManager implements IContextManager<TContext> {
public async wrapWithContext<T>(operation: (context: TContext) => Promise<T>): Promise<T> {
const context: TContext = { contextKey: 'foo-bar' };
return await operation(context);
}
}

beforeEach(() => {
commandBus = new LocalCommandBus(loggerMock, 3, 100, new FooContextManager());
});

it('should be defined', () => {
expect(commandBus).toBeDefined();
});

describe('Given one registered handler to foo command', () => {
const FooHandlerMock = jest.fn();

class FooCommandHandler implements ICommandHandler<FooCommand> {
async handle(...args: unknown[]) {
return await FooHandlerMock(args);
}
}

beforeEach(() => {
commandBus.register(FooCommand, new FooCommandHandler());
});

describe('When sendSync a foo command', () => {
it('context should be passed to command', async () => {
const command = new FooCommand({ foo: 'bar' });
await commandBus.sendSync(command);

expect(FooHandlerMock).toHaveBeenCalledWith([command, { contextKey: 'foo-bar' }]);
});
});
});
});

it('default retry max attempts should be 0', () => {
const commandBus = new LocalCommandBus(loggerMock);
expect(commandBus['retryMaxAttempts']).toBe(0);
Expand Down
21 changes: 14 additions & 7 deletions packages/ddd-toolkit/src/command-bus/local-command-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,32 @@ import { ILogger } from '../logger';
import { ICommand, ICommandBus, ICommandClass, ICommandHandler } from './command-bus.interface';
import { ExponentialBackoff, IRetryMechanism } from '../event-bus';
import { inspect } from 'util';
import { IContextManager } from './context-manager.interface';

export class LocalCommandBus implements ICommandBus {
export class LocalCommandBus<TContext = void> implements ICommandBus<TContext> {
private readonly retryMechanism: IRetryMechanism;

private handlers: { [key: string]: ICommandHandler<ICommand<unknown, unknown>> } = {};
private handlers: { [key: string]: ICommandHandler<ICommand<unknown, unknown>, TContext> } = {};

constructor(
private logger: ILogger,
private readonly retryMaxAttempts = 0,
retryInitialDelay = 500,
private readonly contextManager?: IContextManager<TContext>,
) {
this.retryMechanism = new ExponentialBackoff(retryInitialDelay);
}

public register<C extends ICommand<unknown, unknown>>(
command: ICommandClass<C>,
handler: ICommandHandler<C>,
handler: ICommandHandler<C, TContext>,
): void {
if (this.handlers[command.name]) throw new Error(`Command ${command.name} is already registered`);
this.handlers[command.name] = handler;
}

public async send<C extends ICommand<unknown, unknown>>(command: C): Promise<void> {
const handler = this.handlers[command.name] as ICommandHandler<C>;
const handler = this.handlers[command.name] as ICommandHandler<C, TContext>;
if (!handler) {
this.logger.warn(`No handler found for ${command.name}`);
return;
Expand All @@ -35,14 +37,19 @@ export class LocalCommandBus implements ICommandBus {
}

public async sendSync<C extends ICommand<unknown, unknown>>(command: C): Promise<C['_returnType']> {
const handler = this.handlers[command.name] as ICommandHandler<C>;
const handler = this.handlers[command.name] as ICommandHandler<C, TContext>;
if (!handler) throw new Error(`No handler found for ${command.name}`);
return await handler.handle(command);

return this.contextManager
? await this.contextManager.wrapWithContext(async (context) => {
return await handler.handle(command, context);
})
: await handler.handle(command);
}

private async handleCommand<C extends ICommand<unknown, unknown>>(
command: C,
handler: ICommandHandler<C>,
handler: ICommandHandler<C, TContext>,
attempt = 0,
) {
try {
Expand Down

0 comments on commit 0c48728

Please sign in to comment.