Skip to content

Commit

Permalink
feat: work decorator with Batch Processing
Browse files Browse the repository at this point in the history
  • Loading branch information
samaratungajs committed Nov 24, 2024
1 parent 42bc9a1 commit 067076a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 2 deletions.
17 changes: 17 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ export class MyCronJobService {
}
}

```
#### Batch Processing using the `@Work` decorator

```ts
import { Injectable, Logger } from '@nestjs/common';
import { Work } from '@wavezync/nestjs-pgboss';

@Injectable()
export class BatchJobHandler {
private readonly logger = new Logger(BatchJobHandler.name);

@Work('batch-job', { batchSize: 5 })
async handleBatchJobs(jobs: any[]) {
this.logger.log(`Handle batch of ${jobs.length} jobs`);
}
}

```

## Test
Expand Down
11 changes: 10 additions & 1 deletion lib/decorators/job.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { SetMetadata } from "@nestjs/common";
import { JobOptions } from "pg-boss";
import { JobOptions, WorkOptions } from "pg-boss";

export const JOB_NAME = "JOB_NAME";
export const JOB_OPTIONS = "JOB_OPTIONS";
export const CRON_EXPRESSION = "CRON_EXPRESSION";
export const CRON_OPTIONS = "CRON_OPTIONS";
export const PG_BOSS_JOB_METADATA = "PG_BOSS_JOB_METADATA";
export const WORK_NAME = "WORK_NAME";
export const WORK_OPTIONS = "WORK_OPTIONS";

export function Job<_TData extends object = any>(
name: string,
Expand Down Expand Up @@ -38,3 +40,10 @@ export function CronJob<_TData extends object = any>(
);
};
}

export function Work(name: string, options: WorkOptions = {}) {
return (target: any, key: string, descriptor: PropertyDescriptor) => {
SetMetadata(WORK_NAME, name)(target, key, descriptor);
SetMetadata(WORK_OPTIONS, options)(target, key, descriptor);
};
}
23 changes: 23 additions & 0 deletions lib/handler-scanner.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
JOB_OPTIONS,
CRON_EXPRESSION,
CRON_OPTIONS,
WORK_NAME,
WORK_OPTIONS,
} from "./decorators/job.decorator";
import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper";
import PgBoss, { WorkWithMetadataHandler } from "pg-boss";
Expand Down Expand Up @@ -44,6 +46,27 @@ export class HandlerScannerService {

for (const methodName of methodNames) {
const methodRef = instance[methodName];

const workName = this.reflector.get<string>(WORK_NAME, methodRef);
const workOptions = this.reflector.get<PgBoss.WorkOptions>(
WORK_OPTIONS,
methodRef,
);

if (workName) {
try {
await this.pgBossService.registerWorker(
workName,
async (jobs) => await methodRef.call(instance, jobs),
workOptions,
);
this.logger.log(`Registered worker: ${workName}`);
} catch (error) {
this.logger.error(error, `Error registering worker ${workName}`);
}
continue;
}

const jobName = this.reflector.get<string>(JOB_NAME, methodRef);
const jobOptions = this.reflector.get<PgBoss.WorkOptions>(
JOB_OPTIONS,
Expand Down
16 changes: 16 additions & 0 deletions lib/pgboss.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,20 @@ export class PgBossService {
handler,
);
}

async registerWorker<TData extends object>(
name: string,
handler: (jobs: PgBoss.JobWithMetadata<TData>[]) => Promise<void>,
options?: PgBoss.WorkOptions,
) {
await this.pgBoss.createQueue(name);
await this.pgBoss.work<TData>(
name,
{ ...options, includeMetadata: true },
async (jobs) => {
const jobArray = Array.isArray(jobs) ? jobs : [jobs];
await handler(jobArray);
},
);
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@wavezync/nestjs-pgboss",
"version": "3.0.1",
"version": "3.1.0",
"description": "A NestJS module that integrates pg-boss for job scheduling and handling.",
"license": "MIT",
"author": "samaratungajs@wavezync.com",
Expand Down

0 comments on commit 067076a

Please sign in to comment.