diff --git a/bun.lock b/bun.lock index 81d9c0b..3f84399 100644 --- a/bun.lock +++ b/bun.lock @@ -84,8 +84,11 @@ "name": "@lowerdeck/cron", "version": "1.0.2", "dependencies": { - "@lowerdeck/queue": "^1.0.0", - "@lowerdeck/redis": "^1.0.0", + "@lowerdeck/execution-context": "^1.0.1", + "@lowerdeck/id": "^1.0.4", + "@lowerdeck/queue": "^1.0.2", + "@lowerdeck/redis": "^1.0.2", + "@lowerdeck/sentry": "^1.0.1", "bullmq": "^5.34.3", }, "devDependencies": { @@ -159,10 +162,10 @@ }, "packages/execution-context": { "name": "@lowerdeck/execution-context", - "version": "1.0.0", + "version": "1.0.1", "dependencies": { "@lowerdeck/id": "^1.0.0", - "@lowerdeck/sentry": "^1.0.0", + "@lowerdeck/sentry": "^1.0.1", }, "devDependencies": { "@lowerdeck/tsconfig": "^1.0.0", @@ -390,9 +393,12 @@ "name": "@lowerdeck/queue", "version": "1.0.2", "dependencies": { - "@lowerdeck/delay": "^1.0.0", - "@lowerdeck/memo": "^1.0.0", - "@lowerdeck/redis": "^1.0.0", + "@lowerdeck/delay": "^1.0.3", + "@lowerdeck/execution-context": "^1.0.1", + "@lowerdeck/id": "^1.0.4", + "@lowerdeck/memo": "^1.0.3", + "@lowerdeck/redis": "^1.0.2", + "@lowerdeck/sentry": "^1.0.1", "bullmq": "^5.66.0", "superjson": "^2.2.6", }, @@ -427,10 +433,10 @@ "name": "@lowerdeck/redis", "version": "1.0.2", "dependencies": { - "@lowerdeck/id": "^1.0.0", - "@lowerdeck/memo": "^1.0.0", - "@lowerdeck/random-number": "^1.0.0", - "@lowerdeck/serialize": "^1.0.0", + "@lowerdeck/id": "^1.0.4", + "@lowerdeck/memo": "^1.0.3", + "@lowerdeck/random-number": "^1.0.3", + "@lowerdeck/serialize": "^1.0.3", "ioredis": "^5.8.2", "p-queue": "^9.0.1", "redis": "^5.10.0", @@ -444,7 +450,7 @@ }, "packages/rpc-client": { "name": "@lowerdeck/rpc-client", - "version": "1.0.0", + "version": "1.0.1", "dependencies": { "@lowerdeck/canonicalize": "^1.0.1", "@lowerdeck/error": "^1.0.5", @@ -462,7 +468,7 @@ }, "packages/rpc-server": { "name": "@lowerdeck/rpc-server", - "version": "1.0.0", + "version": "1.0.1", "dependencies": { "@lowerdeck/error": "^1.0.5", "@lowerdeck/execution-context": "^1.0.0", @@ -482,7 +488,7 @@ }, "packages/sentry": { "name": "@lowerdeck/sentry", - "version": "1.0.0", + "version": "1.0.1", "dependencies": { "@sentry/core": "^10.32.1", }, @@ -2032,6 +2038,8 @@ "@lowerdeck/normalize-email/typescript": ["typescript@5.8.2", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ=="], + "@lowerdeck/queue/@lowerdeck/sentry": ["@lowerdeck/sentry@1.0.1", "", { "dependencies": { "@sentry/core": "^10.32.1" } }, "sha512-yyej00ze051iZK12UVudVXaF1+ipPy7bjOUYd9sYp6cN2BzRt8OGhJnvU5bttiol0zaG1bH7DGeReuyqA2wPgQ=="], + "@lowerdeck/queue/typescript": ["typescript@5.8.2", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ=="], "@lowerdeck/redis/typescript": ["typescript@5.8.2", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ=="], diff --git a/packages/cron/CHANGELOG.md b/packages/cron/CHANGELOG.md index 7bf9b2f..e47a407 100644 --- a/packages/cron/CHANGELOG.md +++ b/packages/cron/CHANGELOG.md @@ -1,5 +1,13 @@ # @lowerdeck/cron +## 1.0.3 + +### Patch Changes + +- add sentry and execution context +- Updated dependencies + - @lowerdeck/queue@1.0.3 + ## 1.0.2 ### Patch Changes diff --git a/packages/cron/package.json b/packages/cron/package.json index 6d3e219..1041a9b 100644 --- a/packages/cron/package.json +++ b/packages/cron/package.json @@ -1,6 +1,6 @@ { "name": "@lowerdeck/cron", - "version": "1.0.2", + "version": "1.0.3", "publishConfig": { "access": "public" }, @@ -24,8 +24,11 @@ "build": "microbundle" }, "dependencies": { - "@lowerdeck/queue": "^1.0.2", + "@lowerdeck/execution-context": "^1.0.1", + "@lowerdeck/id": "^1.0.4", + "@lowerdeck/queue": "^1.0.3", "@lowerdeck/redis": "^1.0.2", + "@lowerdeck/sentry": "^1.0.1", "bullmq": "^5.34.3" }, "devDependencies": { diff --git a/packages/cron/src/index.ts b/packages/cron/src/index.ts index 54d399e..b78ab75 100644 --- a/packages/cron/src/index.ts +++ b/packages/cron/src/index.ts @@ -1,7 +1,12 @@ +import { createExecutionContext, provideExecutionContext } from '@lowerdeck/execution-context'; +import { generateCustomId } from '@lowerdeck/id'; import { IQueueProcessor } from '@lowerdeck/queue'; import { parseRedisUrl } from '@lowerdeck/redis'; +import { getSentry } from '@lowerdeck/sentry'; import { Queue, Worker } from 'bullmq'; +let Sentry = getSentry(); + let log = (...any: any[]) => console.log('[CRON MANAGER]:', ...any); let seenNames = new Set(); @@ -49,8 +54,28 @@ export let createCron = ( let worker = new Worker( opts.name, async () => { - log(`Running cron job ${opts.name}`); - await handler(); + provideExecutionContext( + createExecutionContext({ + type: 'scheduled', + contextId: generateCustomId('cron_'), + cron: opts.cron, + name: opts.name + }), + async () => { + log(`Running cron job ${opts.name}`); + + try { + await handler(); + } catch (err) { + Sentry.captureException(err, { + tags: { + cronName: opts.name + } + }); + throw err; + } + } + ); }, { connection } ); diff --git a/packages/queue/CHANGELOG.md b/packages/queue/CHANGELOG.md index d653bd0..282a12b 100644 --- a/packages/queue/CHANGELOG.md +++ b/packages/queue/CHANGELOG.md @@ -1,5 +1,11 @@ # @lowerdeck/queue +## 1.0.3 + +### Patch Changes + +- add sentry and execution context + ## 1.0.2 ### Patch Changes diff --git a/packages/queue/package.json b/packages/queue/package.json index b13c58e..cf56a4a 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -1,6 +1,6 @@ { "name": "@lowerdeck/queue", - "version": "1.0.2", + "version": "1.0.3", "publishConfig": { "access": "public" }, @@ -25,8 +25,11 @@ }, "dependencies": { "@lowerdeck/delay": "^1.0.3", + "@lowerdeck/execution-context": "^1.0.1", + "@lowerdeck/id": "^1.0.4", "@lowerdeck/memo": "^1.0.3", "@lowerdeck/redis": "^1.0.2", + "@lowerdeck/sentry": "^1.0.1", "bullmq": "^5.66.0", "superjson": "^2.2.6" }, diff --git a/packages/queue/src/drivers/bullmq.ts b/packages/queue/src/drivers/bullmq.ts index 5aa8490..a7b28c5 100644 --- a/packages/queue/src/drivers/bullmq.ts +++ b/packages/queue/src/drivers/bullmq.ts @@ -1,6 +1,14 @@ import { delay } from '@lowerdeck/delay'; +import { + createExecutionContext, + ExecutionContext, + provideExecutionContext, + withExecutionContextOptional +} from '@lowerdeck/execution-context'; +import { generateSnowflakeId } from '@lowerdeck/id'; import { memo } from '@lowerdeck/memo'; import { parseRedisUrl } from '@lowerdeck/redis'; +import { getSentry } from '@lowerdeck/sentry'; import { DeduplicationOptions, JobsOptions, @@ -16,6 +24,8 @@ import { IQueue } from '../types'; // @ts-ignore import SuperJson from 'superjson'; +let Sentry = getSentry(); + let log = (...any: any[]) => console.log('[QUEUE MANAGER]:', ...any); let anyQueueStartedRef = { started: false }; @@ -57,16 +67,20 @@ export let createBullMqQueue = ( name: opts.name, add: async (payload, opts) => { - let job = await queue.add( - 'j' as any, - { - payload: SuperJson.serialize(payload) - } as any, - { - delay: opts?.delay, - jobId: opts?.id, - deduplication: opts?.deduplication - } + let job = await withExecutionContextOptional( + async ctx => + await queue.add( + 'j' as any, + { + payload: SuperJson.serialize(payload), + $$execution_context$$: ctx + } as any, + { + delay: opts?.delay, + jobId: opts?.id, + deduplication: opts?.deduplication + } + ) ); return { @@ -78,41 +92,47 @@ export let createBullMqQueue = ( }, addMany: async (payloads, opts) => { - await queue.addBulk( - payloads.map( - payload => - ({ - name: 'j', - data: { - payload: SuperJson.serialize(payload) - }, - opts: { - delay: opts?.delay, - jobId: opts?.id, - deduplication: opts?.deduplication - } - }) as any - ) - ); + await withExecutionContextOptional(async ctx => { + await queue.addBulk( + payloads.map( + payload => + ({ + name: 'j', + data: { + payload: SuperJson.serialize(payload), + $$execution_context$$: ctx + }, + opts: { + delay: opts?.delay, + jobId: opts?.id, + deduplication: opts?.deduplication + } + }) as any + ) + ); + }); }, addManyWithOps: async payloads => { - await queue.addBulk( - payloads.map( - payload => - ({ - name: 'j', - data: { - payload: SuperJson.serialize(payload.data) - }, - opts: { - delay: payload.opts?.delay, - jobId: payload.opts?.id, - deduplication: payload.opts?.deduplication - } - }) as any - ) - ); + await withExecutionContextOptional(async ctx => { + await queue.addBulk( + payloads.map( + payload => + ({ + name: 'j', + data: { + payload: SuperJson.serialize(payload.data), + $$execution_context$$: ctx + }, + opts: { + delay: payload.opts?.delay, + jobId: payload.opts?.id, + deduplication: payload.opts?.deduplication + } + }) as any + ) + ); + }); }, process: cb => { @@ -144,13 +164,31 @@ export let createBullMqQueue = ( payload = data.payload; } - await cb(payload as any, job); + let parentExecutionContext = (data as any) + .$$execution_context$$ as ExecutionContext; + while ( + parentExecutionContext && + parentExecutionContext.type == 'job' && + parentExecutionContext.parent + ) + parentExecutionContext = parentExecutionContext.parent; + + await provideExecutionContext( + createExecutionContext({ + type: 'job', + contextId: job.id ?? generateSnowflakeId(), + queue: opts.name, + parent: parentExecutionContext + }), + () => cb(payload as any, job) + ); } catch (e: any) { if (e instanceof QueueRetryError) { await delay(1000); throw e; } else { - console.error(`[QUEUE ERROR - ${opts.name}]`, e); + Sentry.captureException(e); + console.error(e); throw e; } } diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index a23395a..81cba6d 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -14,17 +14,14 @@ export let createQueue = (opts: { driver?: 'bullmq' } & BullMqCreateOpt } seenNames.add(opts.name); - if (opts.driver === 'bullmq') { - return createBullMqQueue({ - name: opts.name, - jobOpts: opts.jobOpts, - queueOpts: opts.queueOpts, - workerOpts: opts.workerOpts, - redisUrl: opts.redisUrl - }); - } + return createBullMqQueue({ + name: opts.name, + redisUrl: opts.redisUrl, - throw new Error(`Unknown queue driver: ${opts.driver}`); + jobOpts: opts.jobOpts, + queueOpts: opts.queueOpts, + workerOpts: opts.workerOpts + }); }; export let combineQueueProcessors = (opts: IQueueProcessor[]): IQueueProcessor => {