Skip to content

Commit

Permalink
fix #46 bullmq webhook sender
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Jan 10, 2025
1 parent e487d2e commit 99b0d0a
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 32 deletions.
5 changes: 3 additions & 2 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"@babel/core": "^7.23.3",
"@babel/preset-env": "^7.23.3",
"@babel/preset-typescript": "^7.23.3",
"@crypt.fyi/core": "*",
"@tapjs/typescript": "^3.0.0",
"@types/bytes": "^3.1.4",
"@types/jest": "^29.5.8",
Expand All @@ -34,8 +35,7 @@
"tap": "^21.0.1",
"ts-node": "^10.9.2",
"typescript": "^5.2.2",
"undici": "^6.11.1",
"@crypt.fyi/core": "*"
"undici": "^6.11.1"
},
"dependencies": {
"@fastify/compress": "^7.0.0",
Expand All @@ -50,6 +50,7 @@
"@opentelemetry/sdk-node": "^0.45.1",
"@opentelemetry/sdk-trace-base": "^1.18.1",
"@opentelemetry/semantic-conventions": "^1.18.1",
"bullmq": "^5.34.8",
"bytes": "^3.1.2",
"fastify": "^4.28.1",
"fastify-type-provider-zod": "^2.0.0",
Expand Down
17 changes: 10 additions & 7 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import gracefulShutdown from 'http-graceful-shutdown';
import Redis from 'ioredis';
import { createRedisVault } from './vault/redis';
import { createTokenGenerator } from './vault/tokens';
import { createHTTPJSONWebhookSender } from './webhook';
import { createBullMQWebhookSender } from './webhook';

const main = async () => {
const logger = await initLogging(config);
const redis = new Redis(config.redisUrl);
await redis.ping();
const bullmqRedis = new Redis(config.redisUrl, { maxRetriesPerRequest: null });
const tokenGenerator = createTokenGenerator({
vaultEntryIdentifierLength: config.vaultEntryIdentifierLength,
vaultEntryDeleteTokenLength: config.vaultEntryDeleteTokenLength,
});
const vault = createRedisVault(
redis,
tokenGenerator,
createHTTPJSONWebhookSender(logger),
config.encryptionKey,
);
const { webhookSender, cleanup: cleanupWebhookSender } = createBullMQWebhookSender({
logger,
redis: bullmqRedis,
});
const vault = createRedisVault(redis, tokenGenerator, webhookSender, config.encryptionKey);

const app = await initApp(config, {
logger,
Expand All @@ -42,6 +42,9 @@ const main = async () => {
},
onShutdown: async () => {
await app.shutdown();
await cleanupWebhookSender();
await bullmqRedis.quit();
await redis.quit();
await otlpShutdown();
},
finally: () => {
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/logging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export const initLogging = async (config: Config): Promise<pino.Logger> => {
version: config.serviceVersion,
},
serializers: {
err: pino.stdSerializers.err,
error: pino.stdSerializers.err,
// https://fastify.dev/docs/v2.15.x/Documentation/Logging/#log-redaction
req(request) {
return {
Expand Down
108 changes: 86 additions & 22 deletions packages/server/src/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Queue, Worker } from 'bullmq';
import { Logger } from './logging';
import Redis from 'ioredis';

export type WebhookEvent = 'READ' | 'BURN' | 'FAILURE_KEY_PASSWORD' | 'FAILURE_IP_ADDRESS';

Expand All @@ -15,16 +17,61 @@ export interface WebhookSender {
send(message: Message): Promise<void>;
}

const DEFAULT_REQUEST_TIMEOUT_MS = 3000;

export const createHTTPJSONWebhookSender = (
logger: Logger,
fetchFn: typeof fetch = fetch,
): WebhookSender => {
export const createNopWebhookSender = (): WebhookSender => {
return {
send: async (message) => {
send: async () => {},
};
};

type BullMQWebhookSenderOptions = {
logger: Logger;
redis: Redis;
fetchFn?: typeof fetch;
requestTimeoutMs?: number;
maxAttempts?: number;
backoffType?: 'fixed' | 'exponential';
backoffDelayMs?: number;
removeOnComplete?: boolean;
removeOnFail?: boolean;
};

export const createBullMQWebhookSender = ({
logger,
redis,
fetchFn = fetch,
requestTimeoutMs = 3000,
maxAttempts = 5,
backoffType = 'exponential',
backoffDelayMs = 1000,
removeOnComplete = true,
removeOnFail = true,
}: BullMQWebhookSenderOptions): { webhookSender: WebhookSender; cleanup: () => Promise<void> } => {
const QUEUE_NAME = 'webhooks';
const JOB_NAME = 'webhook';
const queue = new Queue<Message>(QUEUE_NAME, {
connection: redis,
defaultJobOptions: {
attempts: maxAttempts,
removeOnComplete,
removeOnFail,
backoff: {
type: backoffType,
delay: backoffDelayMs,
},
},
});
const worker = new Worker<Message>(
QUEUE_NAME,
async (job) => {
if (job.name !== JOB_NAME) {
logger.warn({ job: job.name }, 'skipping unexpected job');
return;
}

const { data: message } = job;

const ac = new AbortController();
setTimeout(() => ac.abort(), DEFAULT_REQUEST_TIMEOUT_MS);
setTimeout(() => ac.abort(), requestTimeoutMs);
try {
const response = await fetchFn(message.url, {
method: 'POST',
Expand All @@ -35,32 +82,49 @@ export const createHTTPJSONWebhookSender = (
signal: ac.signal,
});
if (!response.ok) {
logger.error(
{
event: message.event,
id: message.id,
status: response.status,
statusText: response.statusText,
},
'Failed to send webhook',
);
throw new Error(`Unexpected status code: ${response.status}`);
}

logger.info({ jobId: job.id, event: message.event, id: message.id }, 'Webhook sent');
} catch (error) {
logger.error(
logger.info(
{
jobId: job.id,
event: message.event,
id: message.id,
error,
},
'Failed to send webhook',
);
throw error;
}
},
};
};
{ connection: redis },
);

worker.on('failed', (job, error) => {
if (job && job.attemptsMade >= maxAttempts) {
logger.info(
{
jobId: job.id,
id: job.data.id,
event: job.data.event,
error,
attempts: job.attemptsMade,
},
'Webhook failed permanently after all retry attempts',
);
}
});

export const createNopWebhookSender = (): WebhookSender => {
return {
send: async () => {},
webhookSender: {
send: async (message) => {
await queue.add(JOB_NAME, message);
},
},
cleanup: async () => {
await Promise.all([queue.close(), worker.close()]);
},
};
};
95 changes: 94 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1917,6 +1917,36 @@
resolved "https://registry.npmjs.org/@lukeed/ms/-/ms-2.0.2.tgz"
integrity sha512-9I2Zn6+NJLfaGoz9jN3lpwDgAYvfGeNYdbAIjJOqzs4Tpc+VU3Jqq4IofSUBKajiDS8k9fZIg18/z13mpk1bsA==

"@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3":
version "3.0.3"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-3.0.3.tgz#9edec61b22c3082018a79f6d1c30289ddf3d9d11"
integrity sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==

"@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3":
version "3.0.3"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-3.0.3.tgz#33677a275204898ad8acbf62734fc4dc0b6a4855"
integrity sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==

"@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3":
version "3.0.3"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-3.0.3.tgz#19edf7cdc2e7063ee328403c1d895a86dd28f4bb"
integrity sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==

"@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3":
version "3.0.3"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-3.0.3.tgz#94fb0543ba2e28766c3fc439cabbe0440ae70159"
integrity sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==

"@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3":
version "3.0.3"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-3.0.3.tgz#4a0609ab5fe44d07c9c60a11e4484d3c38bbd6e3"
integrity sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==

"@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3":
version "3.0.3"
resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-3.0.3.tgz#0aa5502d547b57abfc4ac492de68e2006e417242"
integrity sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==

"@noble/ciphers@^1.1.3":
version "1.1.3"
resolved "https://registry.npmjs.org/@noble/ciphers/-/ciphers-1.1.3.tgz"
Expand Down Expand Up @@ -5076,6 +5106,19 @@ buffer@^6.0.3:
base64-js "^1.3.1"
ieee754 "^1.2.1"

bullmq@^5.34.8:
version "5.34.8"
resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-5.34.8.tgz#7feb80b4a55ff5401b1ae72be3f5e30244e1e56a"
integrity sha512-id5mmPg3K8tNXQ9VVlmUxBSeLmliIWUrB8Hd5c62PFrIiHywz4TN1PEqU6OWvYXEvoFCr8/BlnbE4JCrGqPVmg==
dependencies:
cron-parser "^4.9.0"
ioredis "^5.4.1"
msgpackr "^1.11.2"
node-abort-controller "^3.1.1"
semver "^7.5.4"
tslib "^2.0.0"
uuid "^9.0.0"

bundle-require@^5.0.0:
version "5.0.0"
resolved "https://registry.npmjs.org/bundle-require/-/bundle-require-5.0.0.tgz"
Expand Down Expand Up @@ -5518,6 +5561,13 @@ create-require@^1.1.0:
resolved "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz"
integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==

cron-parser@^4.9.0:
version "4.9.0"
resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5"
integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==
dependencies:
luxon "^3.2.1"

cross-env@^7.0.3:
version "7.0.3"
resolved "https://registry.yarnpkg.com/cross-env/-/cross-env-7.0.3.tgz#865264b29677dc015ba8418918965dd232fc54cf"
Expand Down Expand Up @@ -5662,6 +5712,11 @@ depd@2.0.0:
resolved "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz"
integrity sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==

detect-libc@^2.0.1:
version "2.0.3"
resolved "https://registry.yarnpkg.com/detect-libc/-/detect-libc-2.0.3.tgz#f0cd503b40f9939b894697d19ad50895e30cf700"
integrity sha512-bwy0MGW55bG41VqxxypOsdSdGqLwXPI/focwgTYCFMbdUiBAxLg9CFzG08sz2aqzknwiX7Hkl0bQENjg8iLByw==

detect-newline@^3.0.0:
version "3.1.0"
resolved "https://registry.npmjs.org/detect-newline/-/detect-newline-3.1.0.tgz"
Expand Down Expand Up @@ -8117,6 +8172,11 @@ lru-cache@^5.1.1:
dependencies:
yallist "^3.0.2"

luxon@^3.2.1:
version "3.5.0"
resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.5.0.tgz#6b6f65c5cd1d61d1fd19dbf07ee87a50bf4b8e20"
integrity sha512-rh+Zjr6DNfUYR3bPwJEnuwDdqMbxZW7LOQfUN4B54+Cl+0o5zaU9RJ6bcidfDtC1cWCZXQ+nvX8bf6bAji37QQ==

magic-string@^0.30.5:
version "0.30.17"
resolved "https://registry.npmjs.org/magic-string/-/magic-string-0.30.17.tgz"
Expand Down Expand Up @@ -8399,6 +8459,27 @@ ms@^2.1.3:
resolved "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz"
integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==

msgpackr-extract@^3.0.2:
version "3.0.3"
resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-3.0.3.tgz#e9d87023de39ce714872f9e9504e3c1996d61012"
integrity sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==
dependencies:
node-gyp-build-optional-packages "5.2.2"
optionalDependencies:
"@msgpackr-extract/msgpackr-extract-darwin-arm64" "3.0.3"
"@msgpackr-extract/msgpackr-extract-darwin-x64" "3.0.3"
"@msgpackr-extract/msgpackr-extract-linux-arm" "3.0.3"
"@msgpackr-extract/msgpackr-extract-linux-arm64" "3.0.3"
"@msgpackr-extract/msgpackr-extract-linux-x64" "3.0.3"
"@msgpackr-extract/msgpackr-extract-win32-x64" "3.0.3"

msgpackr@^1.11.2:
version "1.11.2"
resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.11.2.tgz#4463b7f7d68f2e24865c395664973562ad24473d"
integrity sha512-F9UngXRlPyWCDEASDpTf6c9uNhGPTqnTeLVt7bN+bU1eajoR/8V9ys2BRaV5C/e5ihE6sJ9uPIKaYt6bFuO32g==
optionalDependencies:
msgpackr-extract "^3.0.2"

multimatch@6.0.0:
version "6.0.0"
resolved "https://registry.yarnpkg.com/multimatch/-/multimatch-6.0.0.tgz#c72a9bddbc94baa4727efd613b5d22a1fe4d6ee3"
Expand Down Expand Up @@ -8452,6 +8533,11 @@ negotiator@^0.6.3:
resolved "https://registry.npmjs.org/negotiator/-/negotiator-0.6.4.tgz"
integrity sha512-myRT3DiWPHqho5PrJaIRyaMv2kgYf0mUVgBNOYMuCH5Ki1yEiQaf/ZJuQ62nvpc44wL5WDbTX7yGJi1Neevw8w==

node-abort-controller@^3.1.1:
version "3.1.1"
resolved "https://registry.yarnpkg.com/node-abort-controller/-/node-abort-controller-3.1.1.tgz#a94377e964a9a37ac3976d848cb5c765833b8548"
integrity sha512-AGK2yQKIjRuqnc6VkX2Xj5d+QW8xZ87pa1UK6yA6ouUyuxfHuMP6umE5QK7UmTeOAymo+Zx1Fxiuw9rVx8taHQ==

node-fetch@^2.6.9:
version "2.7.0"
resolved "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz"
Expand All @@ -8464,6 +8550,13 @@ node-forge@^1.3.1:
resolved "https://registry.yarnpkg.com/node-forge/-/node-forge-1.3.1.tgz#be8da2af243b2417d5f646a770663a92b7e9ded3"
integrity sha512-dPEtOeMvF9VMcYV/1Wb8CPoVAXtp6MKMlcbAt4ddqmGqUJ6fQZFXkNZNkNlfevtNkGtaSoXf/vNNNSvgrdXwtA==

node-gyp-build-optional-packages@5.2.2:
version "5.2.2"
resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.2.2.tgz#522f50c2d53134d7f3a76cd7255de4ab6c96a3a4"
integrity sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==
dependencies:
detect-libc "^2.0.1"

node-gyp@^10.0.0:
version "10.2.0"
resolved "https://registry.npmjs.org/node-gyp/-/node-gyp-10.2.0.tgz"
Expand Down Expand Up @@ -10989,7 +11082,7 @@ uuid@^8.3.2:
resolved "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz"
integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==

uuid@^9.0.1:
uuid@^9.0.0, uuid@^9.0.1:
version "9.0.1"
resolved "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz"
integrity sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==
Expand Down

0 comments on commit 99b0d0a

Please sign in to comment.