Skip to content

Commit

Permalink
Merge pull request #53 from energywebfoundation/chore/improve-etr-pro…
Browse files Browse the repository at this point in the history
…cessing

chore(origin-247-transfer): optimize batching, download only necessary ETRs
  • Loading branch information
soanvig authored Nov 24, 2021
2 parents cfa65bb + 7c0fce4 commit ca6ae76
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ export class CertificateForUnitTestsService<T> implements PublicPart<Certificate
return this.db.filter((entry) => {
const isDateOk =
new Date(entry.generationStartTime * 1000) >= generationStartFrom &&
new Date(entry.generationStartTime * 1000) <= generationStartTo;
new Date(entry.generationEndTime * 1000) >= generationEndFrom &&
new Date(entry.generationEndTime * 1000) <= generationEndTo;
new Date(entry.creationTime * 1000) >= creationTimeFrom &&
new Date(entry.generationStartTime * 1000) <= generationStartTo &&
new Date(entry.generationEndTime * 1000) >= generationEndFrom &&
new Date(entry.generationEndTime * 1000) <= generationEndTo &&
new Date(entry.creationTime * 1000) >= creationTimeFrom &&
new Date(entry.creationTime * 1000) <= creationTimeTo;

const isDeviceOk = deviceId ? entry.deviceId === entry.deviceId : true;
Expand Down
5 changes: 5 additions & 0 deletions packages/origin-247-transfer/src/batch/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { IEvent } from '@nestjs/cqrs';

export class AwaitingTransferEvent implements IEvent {}
export class AwaitingIssuanceEvent implements IEvent {}
export class AwaitingValidationEvent implements IEvent {}
5 changes: 2 additions & 3 deletions packages/origin-247-transfer/src/batch/issue.batch.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { EventsHandler, IEventHandler, IEvent } from '@nestjs/cqrs';
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './configuration';
import { IssueService } from '../issue.service';
import { queueThrottle } from './queueThrottle';

export class AwaitingIssuanceEvent implements IEvent {}
import { AwaitingIssuanceEvent } from './events';

@EventsHandler(AwaitingIssuanceEvent)
export class AwaitingIssuanceEventHandler implements IEventHandler<AwaitingIssuanceEvent> {
Expand Down
5 changes: 2 additions & 3 deletions packages/origin-247-transfer/src/batch/transfer.batch.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { IEvent, EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import { TransferService } from '../transfer.service';
import { queueThrottle } from './queueThrottle';
import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './configuration';

export class AwaitingTransferEvent implements IEvent {}
import { AwaitingTransferEvent } from './events';

@EventsHandler(AwaitingTransferEvent)
export class AwaitingTransferEventHandler implements IEventHandler<AwaitingTransferEvent> {
Expand Down
5 changes: 2 additions & 3 deletions packages/origin-247-transfer/src/batch/validate.batch.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { EventsHandler, IEventHandler, IEvent } from '@nestjs/cqrs';
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { Inject } from '@nestjs/common';
import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './configuration';
import { ValidateService } from '../validate.service';
import { queueThrottle } from './queueThrottle';

export class AwaitingValidationEvent implements IEvent {}
import { AwaitingValidationEvent } from './events';

@EventsHandler(AwaitingValidationEvent)
export class AwaitingValidationEventHandler implements IEventHandler<AwaitingValidationEvent> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { EventsHandler, IEventHandler, QueryBus, EventBus } from '@nestjs/cqrs';
import { Inject, Logger } from '@nestjs/common';
import { GenerationReadingStoredEvent } from '../events/GenerationReadingStored.event';
import { IssueService } from '../issue.service';
import { AwaitingIssuanceEvent } from '../batch/events';
import {
EnergyTransferRequestRepository,
ENERGY_TRANSFER_REQUEST_REPOSITORY
Expand All @@ -10,7 +11,6 @@ import {
GetTransferSitesQuery,
IGetTransferSitesQueryResponse
} from '../queries/GetTransferSites.query';
import { AwaitingIssuanceEvent } from '../batch/issue.batch';

@EventsHandler(GenerationReadingStoredEvent)
export class GenerationReadingStoredEventHandler
Expand Down
14 changes: 7 additions & 7 deletions packages/origin-247-transfer/src/issue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import {
} from './repositories/EnergyTransferRequest.repository';
import { EventBus } from '@nestjs/cqrs';
import { EnergyTransferRequest, State } from './EnergyTransferRequest';
import { chunk } from 'lodash';
import { AwaitingValidationEvent } from './batch/validate.batch';
import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './batch/configuration';
import { AwaitingValidationEvent, AwaitingIssuanceEvent } from './batch/events';

@Injectable()
export class IssueService {
Expand All @@ -23,13 +22,14 @@ export class IssueService {
) {}

public async issueTask() {
const etrs = await this.etrRepository.findByState(State.IssuanceAwaiting);
const etrs = await this.etrRepository.findByState(State.IssuanceAwaiting, {
limit: this.batchConfiguration.issueBatchSize
});

const etrGroups = chunk(etrs, this.batchConfiguration.issueBatchSize);
await this.issueCertificates(etrs);

for (const group of etrGroups) {
await this.issueCertificates(group);
}
// Loop
this.eventBus.publish(new AwaitingIssuanceEvent());
}

private async issueCertificates(etrs: EnergyTransferRequest[]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ import { EnergyTransferRequest, NewAttributesParams, State } from '../EnergyTran

export const ENERGY_TRANSFER_REQUEST_REPOSITORY = Symbol.for('ENERGY_TRANSFER_REQUEST_REPOSITORY');

export interface IFindByStateOptions {
limit: number;
}

export interface EnergyTransferRequestRepository {
createNew(command: NewAttributesParams): Promise<EnergyTransferRequest>;
findByCertificateId(certificateId: number): Promise<EnergyTransferRequest | null>;
findById(id: number): Promise<EnergyTransferRequest | null>;
findAll(): Promise<EnergyTransferRequest[]>;
findByState(status: State): Promise<EnergyTransferRequest[]>;
findByState(status: State, options: IFindByStateOptions): Promise<EnergyTransferRequest[]>;
save(entity: EnergyTransferRequest): Promise<void>;
saveManyInTransaction(entity: EnergyTransferRequest[]): Promise<void>;
updateWithLock(id: number, cb: (entity: EnergyTransferRequest) => void): Promise<void>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Injectable } from '@nestjs/common';
import { EnergyTransferRequest, NewAttributesParams, State } from '../EnergyTransferRequest';
import { EnergyTransferRequestRepository } from './EnergyTransferRequest.repository';
import {
EnergyTransferRequestRepository,
IFindByStateOptions
} from './EnergyTransferRequest.repository';

@Injectable()
export class EnergyTransferRequestInMemoryRepository implements EnergyTransferRequestRepository {
Expand Down Expand Up @@ -36,8 +39,11 @@ export class EnergyTransferRequestInMemoryRepository implements EnergyTransferRe
return request ?? null;
}

public async findByState(state: State): Promise<EnergyTransferRequest[]> {
return this.db.filter((e) => e.toAttrs().state === state);
public async findByState(
state: State,
options: IFindByStateOptions
): Promise<EnergyTransferRequest[]> {
return this.db.filter((e) => e.toAttrs().state === state).slice(0, options.limit);
}

public async findById(id: number): Promise<EnergyTransferRequest | null> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { InjectRepository, InjectConnection } from '@nestjs/typeorm';
import { Repository, Connection } from 'typeorm';
import { EnergyTransferRequest, NewAttributesParams, State } from '../EnergyTransferRequest';
import { EnergyTransferRequestEntity, tableName } from './EnergyTransferRequest.entity';
import { EnergyTransferRequestRepository } from './EnergyTransferRequest.repository';
import { omit } from 'lodash';
import {
EnergyTransferRequestRepository,
IFindByStateOptions
} from './EnergyTransferRequest.repository';

@Injectable()
export class EnergyTransferRequestPostgresRepository implements EnergyTransferRequestRepository {
Expand Down Expand Up @@ -55,9 +57,13 @@ export class EnergyTransferRequestPostgresRepository implements EnergyTransferRe
return request ? EnergyTransferRequest.fromAttrs(request) : null;
}

public async findByState(state: State): Promise<EnergyTransferRequest[]> {
public async findByState(
state: State,
options: IFindByStateOptions
): Promise<EnergyTransferRequest[]> {
const results = await this.repository.find({
state
where: { state },
take: options.limit
});

return results.map(EnergyTransferRequest.fromAttrs);
Expand Down
17 changes: 10 additions & 7 deletions packages/origin-247-transfer/src/transfer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import {
ENERGY_TRANSFER_REQUEST_REPOSITORY
} from './repositories/EnergyTransferRequest.repository';
import { EnergyTransferRequest, State } from './EnergyTransferRequest';
import { chunk } from 'lodash';
import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './batch/configuration';
import { EventBus } from '@nestjs/cqrs';
import { AwaitingTransferEvent } from './batch/events';

@Injectable()
export class TransferService {
Expand All @@ -16,17 +17,19 @@ export class TransferService {
@Inject(ENERGY_TRANSFER_REQUEST_REPOSITORY)
private etrRepository: EnergyTransferRequestRepository,
@Inject(BATCH_CONFIGURATION_TOKEN)
private batchConfiguration: BatchConfiguration
private batchConfiguration: BatchConfiguration,
private eventBus: EventBus
) {}

public async transferTask() {
const etrs = await this.etrRepository.findByState(State.TransferAwaiting);
const etrs = await this.etrRepository.findByState(State.TransferAwaiting, {
limit: this.batchConfiguration.transferBatchSize
});

const chunkedEtrs = chunk(etrs, this.batchConfiguration.transferBatchSize);
await this.transferCertificates(etrs);

for (const chunk of chunkedEtrs) {
await this.transferCertificates(chunk);
}
// Loop
this.eventBus.publish(new AwaitingTransferEvent());
}

private async transferCertificates(etrs: EnergyTransferRequest[]): Promise<void> {
Expand Down
14 changes: 7 additions & 7 deletions packages/origin-247-transfer/src/validate.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ import {
TransferValidationStatus,
UpdateStatusCode
} from './EnergyTransferRequest';
import { chunk } from 'lodash';
import { AwaitingTransferEvent } from './batch/transfer.batch';
import { BatchConfiguration, BATCH_CONFIGURATION_TOKEN } from './batch/configuration';
import { AwaitingValidationEvent, AwaitingTransferEvent } from './batch/events';

export interface IUpdateValidationStatusCommand {
requestId: number;
Expand All @@ -41,13 +40,14 @@ export class ValidateService {
) {}

public async validateTask(): Promise<void> {
const etrs = await this.etrRepository.findByState(State.ValidationAwaiting);
const etrs = await this.etrRepository.findByState(State.ValidationAwaiting, {
limit: this.batchConfiguration.validateBatchSize
});

const chunkedEtrs = chunk(etrs, this.batchConfiguration.validateBatchSize);
await this.startValidation(etrs);

for (const chunk of chunkedEtrs) {
await this.startValidation(chunk);
}
// Loop
this.eventBus.publish(new AwaitingValidationEvent());
}

private async startValidation(etrs: EnergyTransferRequest[]): Promise<void> {
Expand Down

0 comments on commit ca6ae76

Please sign in to comment.