Skip to content

Commit

Permalink
feat: Improve commands batching performance (#80)
Browse files Browse the repository at this point in the history
* feat: Improve commands batching performance

* fix: Remove logs
  • Loading branch information
JonaszJestem authored Feb 2, 2022
1 parent 5b9fdac commit a3a8011
Show file tree
Hide file tree
Showing 18 changed files with 363 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export interface ICertificateEvent {
createdAt: Date;
}

type PersistedEvent =
export type PersistedEvent =
| CertificateIssuancePersistedEvent
| CertificateClaimPersistedEvent
| CertificateTransferPersistedEvent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Inject, Injectable } from '@nestjs/common';
import { CertificateCommandRepository } from './repositories/CertificateCommand/CertificateCommand.repository';
import { CertificateEventRepository } from './repositories/CertificateEvent/CertificateEvent.repository';
import {
CertificateEventRepository,
UnsavedEvent
} from './repositories/CertificateEvent/CertificateEvent.repository';
import { EventBus } from '@nestjs/cqrs';
import { IClaimCommand, IIssueCommand, IIssueCommandParams, ITransferCommand } from '../types';
import {
Expand All @@ -19,7 +22,8 @@ import {
CertificateTransferPersistedEvent,
CertificateTransferredEvent,
ICertificateEvent,
isPersistedEvent
isPersistedEvent,
PersistedEvent
} from './events/Certificate.events';
import { CertificateCommandEntity } from './repositories/CertificateCommand/CertificateCommand.entity';
import { CertificateReadModelRepository } from './repositories/CertificateReadModel/CertificateReadModel.repository';
Expand All @@ -40,6 +44,13 @@ import {
validateIssueCommand,
validateTransferCommand
} from './validators';
import {
createAggregatesFromCertificateGroups,
createEventFromCommand,
createIssueEventsFromCommands,
groupByInternalCertificateId,
zipEventsWithCommandId
} from './utils/batch.utils';

@Injectable()
export class OffChainCertificateService<T = null> {
Expand All @@ -57,21 +68,15 @@ export class OffChainCertificateService<T = null> {
public async getAll(
options: IGetAllCertificatesOptions = {}
): Promise<ICertificateReadModel<T>[]> {
const certificates = await this.readModelRepo.getAll(options);

return certificates;
return await this.readModelRepo.getAll(options);
}

public async getById(id: number): Promise<ICertificateReadModel<T> | null> {
const certificate = await this.readModelRepo.getByInternalCertificateId(id);

return certificate;
return await this.readModelRepo.getByInternalCertificateId(id);
}

public async getByIds(ids: number[]): Promise<ICertificateReadModel<T>[]> {
const certificates = await this.readModelRepo.getManyByInternalCertificateIds(ids);

return certificates;
return await this.readModelRepo.getManyByInternalCertificateIds(ids);
}

public async issue(params: IIssueCommandParams<T>): Promise<number> {
Expand All @@ -87,11 +92,14 @@ export class OffChainCertificateService<T = null> {

public async issueWithoutValidation(command: IIssueCommand<T>): Promise<number> {
const savedCommand = await this.certCommandRepo.save({ payload: command });

const event = CertificateIssuedEvent.createNew(
await this.generateInternalCertificateId(),
command
);

const aggregate = await this.createAggregate([event]);

await this.propagate(event, savedCommand, aggregate);

return event.internalCertificateId;
Expand All @@ -113,6 +121,7 @@ export class OffChainCertificateService<T = null> {
await validateTransferCommand(command);
return await this.transferWithoutValidation(command);
}

public async transferWithoutValidation(command: ITransferCommand): Promise<void> {
const savedCommand = await this.certCommandRepo.save({ payload: command });
const event = CertificateTransferredEvent.createNew(command.certificateId, command);
Expand Down Expand Up @@ -169,40 +178,71 @@ export class OffChainCertificateService<T = null> {
}

public async batchIssue(originalCertificates: IIssueCommandParams<T>[]): Promise<number[]> {
if (!originalCertificates.length) {
return [];
}

const commands: IIssueCommand<T>[] = originalCertificates.map((c) => ({
...c,
fromTime: Math.round(c.fromTime.getTime() / 1000),
toTime: Math.round(c.toTime.getTime() / 1000)
}));

await validateBatchIssueCommands(commands);
await this.validateBatchIssue(commands);

const certs: number[] = [];
const savedCommands = await this.certCommandRepo.saveMany(
commands.map((command) => ({ payload: command }))
);

for (const command of commands) {
const certificateId = await this.issueWithoutValidation(command);
certs.push(certificateId);
}
const events = await createIssueEventsFromCommands(commands, () =>
this.generateInternalCertificateId()
);
const eventsByCertificateId = groupByInternalCertificateId(events);

const aggregates = await createAggregatesFromCertificateGroups(
eventsByCertificateId,
(events) => this.createAggregate(events)
);

await this.propagateMany(zipEventsWithCommandId(events, savedCommands), aggregates);

return certs;
return events.map((event) => event.internalCertificateId);
}

public async batchClaim(commands: IClaimCommand[]): Promise<void> {
if (!commands.length) {
return;
}
await validateBatchClaimCommands(commands);
await this.validateBatchClaim(commands);

for (const command of commands) {
await this.claimWithoutValidation(command);
}
await this.handleBatch(commands);
}

public async batchTransfer(commands: ITransferCommand[]): Promise<void> {
if (!commands.length) {
return;
}

await validateBatchTransferCommands(commands);
await this.validateBatchTransfer(commands);
await this.handleBatch(commands);
}

for (const command of commands) {
await this.transferWithoutValidation(command);
}
private async handleBatch(commands: (IClaimCommand | ITransferCommand)[]) {
const savedCommands = await this.certCommandRepo.saveMany(
commands.map((command) => ({ payload: command }))
);
const events = commands.map((command) => createEventFromCommand(command));

const eventsByCertificate = groupByInternalCertificateId(events);

const aggregates = await createAggregatesFromCertificateGroups(
eventsByCertificate,
(events) => this.createAggregate(events)
);

await this.propagateMany(zipEventsWithCommandId(events, savedCommands), aggregates);
}

private async validateBatchIssue(commands: IIssueCommand<T>[]): Promise<void> {
Expand All @@ -216,43 +256,39 @@ export class OffChainCertificateService<T = null> {
}

private async validateBatchClaim(commands: IClaimCommand[]): Promise<void> {
const grouped = this.groupCommandsByCertificate(commands);
const events = commands.map((command) => createEventFromCommand(command));
const eventsByCertificateId = groupByInternalCertificateId(events);

try {
for (const certificateId in grouped) {
const events = grouped[certificateId].map((c) => {
return CertificateClaimedEvent.createNew(parseInt(certificateId), c);
});
await this.createAggregate(events);
}
await createAggregatesFromCertificateGroups(eventsByCertificateId, (events) =>
this.createAggregate(events)
);
} catch (err) {
throw new CertificateErrors.BatchError(err);
}
}

private async validateBatchTransfer(commands: ITransferCommand[]): Promise<void> {
const grouped = this.groupCommandsByCertificate(commands);
const events = commands.map((command) => createEventFromCommand(command));
const eventsByCertificateId = groupByInternalCertificateId(events);

try {
for (const certificateId in grouped) {
const events = grouped[certificateId].map((c) => {
return CertificateTransferredEvent.createNew(parseInt(certificateId), c);
});
await this.createAggregate(events);
}
await createAggregatesFromCertificateGroups(eventsByCertificateId, (events) =>
this.createAggregate(events)
);
} catch (err) {
throw new CertificateErrors.BatchError(err);
}
}

private async createAggregate(events: ICertificateEvent[]): Promise<CertificateAggregate<T>> {
const aggregate = CertificateAggregate.fromEvents<T>([
...(await this.certEventRepo.getByInternalCertificateId(
events[0].internalCertificateId
)),
...events
]);
return aggregate;
private async createAggregate(
newEvents: ICertificateEvent[]
): Promise<CertificateAggregate<T>> {
const certificateId = newEvents[0].internalCertificateId;

const previousEvents = await this.certEventRepo.getByInternalCertificateId(certificateId);

return CertificateAggregate.fromEvents<T>([...previousEvents, ...newEvents]);
}

private async generateInternalCertificateId(): Promise<number> {
Expand All @@ -271,22 +307,29 @@ export class OffChainCertificateService<T = null> {

if (isPersistedEvent(savedEvent)) {
await this.certEventRepo.updateAttempt({
eventId: savedEvent.payload.persistedEventId,
eventIds: [savedEvent.payload.persistedEventId],
error: errorMessage
});
}
}

private groupCommandsByCertificate<T extends { certificateId: number }>(
commands: T[]
): Record<number, T[]> {
const certificates: Record<number, T[]> = {};
commands.forEach((command) => {
const exists = certificates[command.certificateId];
exists
? (certificates[command.certificateId] = [...exists, command])
: (certificates[command.certificateId] = [command]);
private async propagateMany(
events: (UnsavedEvent & { commandId: number })[],
aggregates: CertificateAggregate<T>[],
errorMessage?: string
): Promise<void> {
await this.eventBus.publishAll(events);
await this.readModelRepo.saveMany(
aggregates.map((aggregate) => aggregate.getCertificate())
);
const savedEvents = await this.certificateEventService.saveMany(events);
const persistedEvents = savedEvents.filter((event) =>
isPersistedEvent(event)
) as PersistedEvent[];

await this.certEventRepo.updateAttempt({
eventIds: persistedEvents.map((event) => event.payload.persistedEventId),
error: errorMessage
});
return certificates;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ export type NewCertificateCommand = Omit<CertificateCommandEntity, 'id' | 'creat
export interface CertificateCommandRepository {
save(certificateCommand: NewCertificateCommand): Promise<CertificateCommandEntity>;

saveMany(certificateCommands: NewCertificateCommand[]): Promise<CertificateCommandEntity[]>;

getById(commandId: number): Promise<CertificateCommandEntity | null>;

getAll(): Promise<CertificateCommandEntity[]>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
export class CertificateCommandInMemoryRepository implements CertificateCommandRepository {
private db: CertificateCommandEntity[] = [];

public async save(generation: NewCertificateCommand): Promise<CertificateCommandEntity> {
public async save(command: NewCertificateCommand): Promise<CertificateCommandEntity> {
const entity = {
...generation,
...command,
createdAt: new Date(),
id: this.db.length + 1
};
Expand All @@ -22,6 +22,23 @@ export class CertificateCommandInMemoryRepository implements CertificateCommandR
return entity;
}

public async saveMany(commands: NewCertificateCommand[]): Promise<CertificateCommandEntity[]> {
const savedEntities: CertificateCommandEntity[] = [];

commands.forEach((command) => {
const entity = {
...command,
createdAt: new Date(),
id: this.db.length + 1
};

this.db.push(entity);
savedEntities.push(entity);
});

return savedEntities;
}

public async getAll(): Promise<CertificateCommandEntity[]> {
return this.db;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ export class CertificateCommandPostgresRepository implements CertificateCommandR
private repository: Repository<CertificateCommandEntity>
) {}

public async save(generation: NewCertificateCommand): Promise<CertificateCommandEntity> {
return await this.repository.save(generation);
public async save(command: NewCertificateCommand): Promise<CertificateCommandEntity> {
return await this.repository.save(command);
}

public async saveMany(commands: NewCertificateCommand[]): Promise<CertificateCommandEntity[]> {
return await this.repository.save(commands);
}

public async getAll(): Promise<CertificateCommandEntity[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,30 @@ export interface IGetToProcessOptions {
limit: number | null;
}

export type UnsavedEvent = Omit<ICertificateEvent, 'id'>;

export interface CertificateEventRepository {
save(
event: Omit<ICertificateEvent, 'id'>,
event: UnsavedEvent,
commandId: number,
txManager: EntityManager | null
): Promise<ICertificateEvent>;

saveMany(
events: (UnsavedEvent & { commandId: number })[],
txManager: EntityManager | null
): Promise<ICertificateEvent[]>;

createSynchronizationAttempt(
eventId: number,
txManager: EntityManager | null
): Promise<CertificateSynchronizationAttemptEntity>;

createSynchronizationAttempts(
eventId: number[],
txManager: EntityManager | null
): Promise<CertificateSynchronizationAttemptEntity[]>;

getByInternalCertificateId(internalCertId: number): Promise<ICertificateEvent[]>;

/**
Expand All @@ -26,9 +38,9 @@ export interface CertificateEventRepository {
getBlockchainIdMap(internalCertIds: number[]): Promise<Record<number, number>>;

updateAttempt(updateData: {
eventId: number;
eventIds: number[];
error?: string;
}): Promise<CertificateSynchronizationAttemptEntity>;
}): Promise<CertificateSynchronizationAttemptEntity[]>;

getAll(): Promise<ICertificateEvent[]>;

Expand Down
Loading

0 comments on commit a3a8011

Please sign in to comment.