Skip to content

Commit

Permalink
Merge pull request #20 from wavezync/feat/work-decorator
Browse files Browse the repository at this point in the history
feat: allow job & cron job handlers to process job arrays
  • Loading branch information
kasvith authored Nov 25, 2024
2 parents 42bc9a1 + cb02c06 commit d069f3e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 19 deletions.
4 changes: 2 additions & 2 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ export class MyJobHandler {
private readonly logger = new Logger(MyJobHandler.name);

@Job('my-job')
async handleMyJob(job: { data: any }) {
this.logger.log('Handling job with data:', job.data);
async handleMyJob(jobs: any[]) {
this.logger.log(`Handling ${jobs.length} job(s)`);
}
}

Expand Down
6 changes: 4 additions & 2 deletions lib/decorators/job.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { SetMetadata } from "@nestjs/common";
import { JobOptions } from "pg-boss";
import { JobOptions, WorkOptions } from "pg-boss";

export const JOB_NAME = "JOB_NAME";
export const JOB_OPTIONS = "JOB_OPTIONS";
export const CRON_EXPRESSION = "CRON_EXPRESSION";
export const CRON_OPTIONS = "CRON_OPTIONS";
export const PG_BOSS_JOB_METADATA = "PG_BOSS_JOB_METADATA";
export const WORK_NAME = "WORK_NAME";
export const WORK_OPTIONS = "WORK_OPTIONS";

export function Job<_TData extends object = any>(
name: string,
options: JobOptions = {},
options: WorkOptions = {},
) {
return (target: any, key: string, descriptor: PropertyDescriptor) => {
SetMetadata(JOB_NAME, name)(target, key, descriptor);
Expand Down
27 changes: 12 additions & 15 deletions lib/handler-scanner.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import {
CRON_OPTIONS,
} from "./decorators/job.decorator";
import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper";
import PgBoss, { WorkWithMetadataHandler } from "pg-boss";
import PgBoss from "pg-boss";
import { LOGGER } from "./utils/consts";
import { normalizeJob } from "./utils/helpers";

@Injectable()
export class HandlerScannerService {
Expand Down Expand Up @@ -44,6 +43,7 @@ export class HandlerScannerService {

for (const methodName of methodNames) {
const methodRef = instance[methodName];

const jobName = this.reflector.get<string>(JOB_NAME, methodRef);
const jobOptions = this.reflector.get<PgBoss.WorkOptions>(
JOB_OPTIONS,
Expand All @@ -59,30 +59,27 @@ export class HandlerScannerService {
);

if (jobName) {
const boundHandler: WorkWithMetadataHandler<any> = async (job) => {
const extractedJob = normalizeJob(job);
await methodRef.call(instance, extractedJob);
};
try {
if (cronExpression) {
await this.pgBossService.registerCronJob(
jobName,
cronExpression,
boundHandler,
methodRef.bind(instance),
{},
cronOptions,
);
this.logger.log(`Registered cron job: ${jobName}`);
} else {
await this.pgBossService.registerJob(
jobName,
boundHandler,
jobOptions,
);
this.logger.log(`Registered job: ${jobName}`);
continue;
}

await this.pgBossService.registerJob(
jobName,
methodRef.bind(instance),
jobOptions,
);
this.logger.log(`Registered job: ${jobName}`);
} catch (error) {
this.logger.error(`Error registering job ${jobName}:`, error);
this.logger.error(error, `Error registering job ${jobName}`);
}
}
}
Expand Down

0 comments on commit d069f3e

Please sign in to comment.