diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 000000000..6927c5803 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,18 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "ts client: jest", + "localRoot": "${workspaceFolder}/components/client/typescript", + "program": "${workspaceFolder}/components/client/typescript/node_modules/jest/bin/jest", + "args": [ + "--runInBand", + "--no-cache", + ], + "outputCapture": "std", + "console": "integratedTerminal", + }, + ] +} diff --git a/components/client/typescript/.gitignore b/components/client/typescript/.gitignore index 1eae0cf67..2c9699a8d 100644 --- a/components/client/typescript/.gitignore +++ b/components/client/typescript/.gitignore @@ -1,2 +1,3 @@ dist/ node_modules/ +tmp/ diff --git a/components/client/typescript/jest.config.js b/components/client/typescript/jest.config.js new file mode 100644 index 000000000..355d1892e --- /dev/null +++ b/components/client/typescript/jest.config.js @@ -0,0 +1,15 @@ +module.exports = { + collectCoverageFrom: [ + "src/**/*.ts", + ], + coverageProvider: "v8", + // globalSetup: './tests/setup.ts', + preset: 'ts-jest', + rootDir: '', + testPathIgnorePatterns: [ + "/node_modules/", + "/dist/" + ], + transform: {}, + transformIgnorePatterns: ["./dist/.+\\.js"] +}; diff --git a/components/client/typescript/package-lock.json b/components/client/typescript/package-lock.json index 56cca4649..8011fe4b0 100644 --- a/components/client/typescript/package-lock.json +++ b/components/client/typescript/package-lock.json @@ -1,12 +1,12 @@ { "name": "@hirosystems/chainhook-client", - "version": "1.12.0", + "version": "2.0.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@hirosystems/chainhook-client", - "version": "1.12.0", + "version": "2.0.0", "license": "Apache 2.0", "dependencies": { "@fastify/type-provider-typebox": "^3.2.0", diff --git a/components/client/typescript/package.json b/components/client/typescript/package.json index 79208d1f9..f93033738 100644 --- a/components/client/typescript/package.json +++ b/components/client/typescript/package.json @@ -1,12 +1,12 @@ { "name": "@hirosystems/chainhook-client", - "version": "1.12.0", + "version": "2.0.0", "description": "Chainhook TypeScript client", "main": "./dist/index.js", "typings": "./dist/index.d.ts", "scripts": { "build": "rimraf ./dist && tsc --project tsconfig.build.json", - "test": "jest", + "test": "jest --runInBand", "prepublishOnly": "npm run build" }, "files": [ diff --git a/components/client/typescript/src/index.ts b/components/client/typescript/src/index.ts index 3a990e655..8cbaab0bb 100644 --- a/components/client/typescript/src/index.ts +++ b/components/client/typescript/src/index.ts @@ -1,11 +1,94 @@ import { FastifyInstance } from 'fastify'; -import { - ServerOptions, - ChainhookNodeOptions, - ServerPredicate, - OnEventCallback, - buildServer, -} from './server'; +import { buildServer } from './server'; +import { predicateHealthCheck } from './predicates'; +import { Payload } from './schemas/payload'; +import { Static, Type } from '@fastify/type-provider-typebox'; +import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this'; +import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this'; +import { logger } from './util/logger'; + +const EventObserverOptionsSchema = Type.Object({ + /** Event observer host name (usually '0.0.0.0') */ + hostname: Type.String(), + /** Event observer port */ + port: Type.Integer(), + /** Authorization token for all Chainhook payloads */ + auth_token: Type.String(), + /** Base URL that will be used by Chainhook to send all payloads to this event observer */ + external_base_url: Type.String(), + /** Wait for the chainhook node to be available before submitting predicates */ + wait_for_chainhook_node: Type.Optional(Type.Boolean({ default: true })), + /** Validate the JSON schema of received chainhook payloads and report errors when invalid */ + validate_chainhook_payloads: Type.Optional(Type.Boolean({ default: false })), + /** Validate the authorization token sent by the server is correct. */ + validate_token_authorization: Type.Optional(Type.Boolean({ default: true })), + /** Size limit for received chainhook payloads (default 40MB) */ + body_limit: Type.Optional(Type.Number({ default: 41943040 })), + /** Node type: `chainhook` or `ordhook` */ + node_type: Type.Optional( + Type.Union([Type.Literal('chainhook'), Type.Literal('ordhook')], { + default: 'chainhook', + }) + ), + /** + * Directory where registered predicates will be persisted to disk so they can be recalled on + * restarts. + */ + predicate_disk_file_path: Type.String(), + /** + * How often we should check with the Chainhook server to make sure our predicates are active and + * up to date. If they become obsolete, we will attempt to re-register them. + */ + predicate_health_check_interval_ms: Type.Optional(Type.Integer({ default: 5000 })), +}); +/** Chainhook event observer configuration options */ +export type EventObserverOptions = Static; + +const ChainhookNodeOptionsSchema = Type.Object({ + /** Base URL where the Chainhook node is located */ + base_url: Type.String(), +}); +/** Chainhook node connection options */ +export type ChainhookNodeOptions = Static; + +/** + * Callback that will receive every single payload sent by Chainhook as a result of any predicates + * that have been registered. + */ +export type OnPredicatePayloadCallback = (payload: Payload) => Promise; + +const IfThisThenNothingSchema = Type.Union([ + Type.Composite([ + BitcoinIfThisOptionsSchema, + Type.Object({ + if_this: BitcoinIfThisSchema, + }), + ]), + Type.Composite([ + StacksIfThisOptionsSchema, + Type.Object({ + if_this: StacksIfThisSchema, + }), + ]), +]); +export const EventObserverPredicateSchema = Type.Composite([ + Type.Object({ + name: Type.String(), + version: Type.Integer(), + chain: Type.String(), + }), + Type.Object({ + networks: Type.Object({ + mainnet: Type.Optional(IfThisThenNothingSchema), + testnet: Type.Optional(IfThisThenNothingSchema), + }), + }), +]); +/** + * Partial predicate definition that allows users to build the core parts of a predicate and let the + * event observer fill in the rest. + */ +export type EventObserverPredicate = Static; /** * Local web server that registers predicates and receives events from a Chainhook node. It handles @@ -20,29 +103,43 @@ import { */ export class ChainhookEventObserver { private fastify?: FastifyInstance; - private serverOpts: ServerOptions; - private chainhookOpts: ChainhookNodeOptions; + private observer: EventObserverOptions; + private chainhook: ChainhookNodeOptions; + private healthCheckTimer?: NodeJS.Timer; - constructor(serverOpts: ServerOptions, chainhookOpts: ChainhookNodeOptions) { - this.serverOpts = serverOpts; - this.chainhookOpts = chainhookOpts; + constructor(observer: EventObserverOptions, chainhook: ChainhookNodeOptions) { + this.observer = observer; + this.chainhook = chainhook; } /** - * Start the Chainhook event server. - * @param predicates - Predicates to register + * Starts the Chainhook event observer. + * @param predicates - Predicates to register. If `predicates_disk_file_path` is enabled in the + * observer, predicates stored on disk will take precedent over those specified here. * @param callback - Function to handle every Chainhook event payload sent by the node */ - async start(predicates: ServerPredicate[], callback: OnEventCallback): Promise { + async start( + predicates: EventObserverPredicate[], + callback: OnPredicatePayloadCallback + ): Promise { if (this.fastify) return; - this.fastify = await buildServer(this.serverOpts, this.chainhookOpts, predicates, callback); - await this.fastify.listen({ host: this.serverOpts.hostname, port: this.serverOpts.port }); + this.fastify = await buildServer(this.observer, this.chainhook, predicates, callback); + await this.fastify.listen({ host: this.observer.hostname, port: this.observer.port }); + if (this.observer.predicate_health_check_interval_ms && this.healthCheckTimer === undefined) { + this.healthCheckTimer = setInterval(() => { + predicateHealthCheck(this.observer, this.chainhook).catch(err => + logger.error(err, `ChainhookEventObserver predicate health check error`) + ); + }, this.observer.predicate_health_check_interval_ms); + } } /** * Stop the Chainhook event server gracefully. */ async close(): Promise { + if (this.healthCheckTimer) clearInterval(this.healthCheckTimer); + this.healthCheckTimer = undefined; await this.fastify?.close(); this.fastify = undefined; } diff --git a/components/client/typescript/src/predicates.ts b/components/client/typescript/src/predicates.ts new file mode 100644 index 000000000..93d08ed47 --- /dev/null +++ b/components/client/typescript/src/predicates.ts @@ -0,0 +1,238 @@ +import * as fs from 'fs'; +import * as path from 'path'; +import { logger } from './util/logger'; +import { + Predicate, + PredicateSchema, + SerializedPredicateResponse, + ThenThatHttpPost, +} from './schemas/predicate'; +import { request } from 'undici'; +import { TypeCompiler } from '@sinclair/typebox/compiler'; +import { ChainhookNodeOptions, EventObserverOptions, EventObserverPredicate } from '.'; +import { randomUUID } from 'crypto'; + +/** Keeps the on-disk predicates in memory for faster access. */ +const RegisteredPredicates = new Map(); + +const CompiledPredicateSchema = TypeCompiler.Compile(PredicateSchema); + +/** + * Looks on disk and returns a map of registered Predicates, where the key is the predicate `name` + * as defined by the user. + */ +export function recallPersistedPredicatesFromDisk(basePath: string): Map { + RegisteredPredicates.clear(); + try { + if (!fs.existsSync(basePath)) return RegisteredPredicates; + for (const file of fs.readdirSync(basePath)) { + if (file.endsWith('.json')) { + const text = fs.readFileSync(path.join(basePath, file), 'utf-8'); + const predicate = JSON.parse(text) as JSON; + if (CompiledPredicateSchema.Check(predicate)) { + logger.info( + `ChainhookEventObserver recalled predicate '${predicate.name}' (${predicate.uuid}) from disk` + ); + RegisteredPredicates.set(predicate.name, predicate); + } + } + } + } catch (error) { + logger.error(error, `ChainhookEventObserver unable to retrieve persisted predicates from disk`); + RegisteredPredicates.clear(); + } + return RegisteredPredicates; +} + +export function savePredicateToDisk(basePath: string, predicate: Predicate) { + const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`; + try { + fs.mkdirSync(basePath, { recursive: true }); + fs.writeFileSync(predicatePath, JSON.stringify(predicate, null, 2)); + logger.info( + `ChainhookEventObserver persisted predicate '${predicate.name}' (${predicate.uuid}) to disk` + ); + } catch (error) { + logger.error( + error, + `ChainhookEventObserver unable to persist predicate '${predicate.name}' (${predicate.uuid}) to disk` + ); + } +} + +function deletePredicateFromDisk(basePath: string, predicate: Predicate) { + const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`; + if (fs.existsSync(predicatePath)) { + fs.rmSync(predicatePath); + logger.info( + `ChainhookEventObserver deleted predicate '${predicate.name}' (${predicate.uuid}) from disk` + ); + } +} + +/** Checks the Chainhook node to see if a predicate is still valid and active */ +async function isPredicateActive( + predicate: Predicate, + chainhook: ChainhookNodeOptions +): Promise { + try { + const result = await request(`${chainhook.base_url}/v1/chainhooks/${predicate.uuid}`, { + method: 'GET', + headers: { accept: 'application/json' }, + throwOnError: true, + }); + const response = (await result.body.json()) as SerializedPredicateResponse; + if (response.status == 404) return undefined; + if ( + response.result.enabled == false || + response.result.status.type == 'interrupted' || + response.result.status.type == 'unconfirmed_expiration' || + response.result.status.type == 'confirmed_expiration' + ) { + return false; + } + return true; + } catch (error) { + logger.error( + error, + `ChainhookEventObserver unable to check if predicate '${predicate.name}' (${predicate.uuid}) is active` + ); + return false; + } +} + +/** + * Registers a predicate in the Chainhook server. Automatically handles pre-existing predicates + * found on disk. + */ +async function registerPredicate( + pendingPredicate: EventObserverPredicate, + diskPredicates: Map, + observer: EventObserverOptions, + chainhook: ChainhookNodeOptions +) { + // First check if we've already registered this predicate in the past, and if so, make sure it's + // still active on the Chainhook server. + if (observer.node_type === 'chainhook') { + const diskPredicate = diskPredicates.get(pendingPredicate.name); + if (diskPredicate) { + switch (await isPredicateActive(diskPredicate, chainhook)) { + case true: + logger.debug( + `ChainhookEventObserver predicate '${diskPredicate.name}' (${diskPredicate.uuid}) is active` + ); + return; + case undefined: + logger.info( + `ChainhookEventObserver predicate '${diskPredicate.name}' (${diskPredicate.uuid}) found on disk but not on the Chainhook server` + ); + break; + case false: + logger.info( + `ChainhookEventObserver predicate '${diskPredicate.name}' (${diskPredicate.uuid}) was being used but is now inactive, removing for re-regristration` + ); + await removePredicate(diskPredicate, observer, chainhook); + break; + } + } + } + + logger.info(`ChainhookEventObserver registering predicate '${pendingPredicate.name}'`); + try { + // Add the `uuid` and `then_that` portions to the predicate. + const thenThat: ThenThatHttpPost = { + http_post: { + url: `${observer.external_base_url}/payload`, + authorization_header: `Bearer ${observer.auth_token}`, + }, + }; + const newPredicate = pendingPredicate as Predicate; + newPredicate.uuid = randomUUID(); + if (newPredicate.networks.mainnet) newPredicate.networks.mainnet.then_that = thenThat; + if (newPredicate.networks.testnet) newPredicate.networks.testnet.then_that = thenThat; + + const path = observer.node_type === 'chainhook' ? `/v1/chainhooks` : `/v1/observers`; + await request(`${chainhook.base_url}${path}`, { + method: 'POST', + body: JSON.stringify(newPredicate), + headers: { 'content-type': 'application/json' }, + throwOnError: true, + }); + logger.info( + `ChainhookEventObserver registered '${newPredicate.name}' predicate (${newPredicate.uuid})` + ); + savePredicateToDisk(observer.predicate_disk_file_path, newPredicate); + RegisteredPredicates.set(newPredicate.name, newPredicate); + } catch (error) { + logger.error(error, `ChainhookEventObserver unable to register predicate`); + } +} + +/** Removes a predicate from the Chainhook server */ +async function removePredicate( + predicate: Predicate, + observer: EventObserverOptions, + chainhook: ChainhookNodeOptions +): Promise { + const nodeType = observer.node_type ?? 'chainhook'; + const path = + nodeType === 'chainhook' + ? `/v1/chainhooks/${predicate.chain}/${encodeURIComponent(predicate.uuid)}` + : `/v1/observers/${encodeURIComponent(predicate.uuid)}`; + try { + await request(`${chainhook.base_url}${path}`, { + method: 'DELETE', + headers: { 'content-type': 'application/json' }, + throwOnError: true, + }); + logger.info(`ChainhookEventObserver removed predicate '${predicate.name}' (${predicate.uuid})`); + deletePredicateFromDisk(observer.predicate_disk_file_path, predicate); + } catch (error) { + logger.error(error, `ChainhookEventObserver unable to deregister predicate`); + } +} + +/** Registers predicates with the Chainhook server when our event observer is booting up */ +export async function registerAllPredicatesOnObserverReady( + predicates: EventObserverPredicate[], + observer: EventObserverOptions, + chainhook: ChainhookNodeOptions +) { + logger.info(predicates, `ChainhookEventObserver connected to ${chainhook.base_url}`); + if (predicates.length === 0) { + logger.info(`ChainhookEventObserver does not have predicates to register`); + return; + } + const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); + for (const predicate of predicates) + await registerPredicate(predicate, diskPredicates, observer, chainhook); +} + +/** Removes predicates from the Chainhook server when our event observer is being closed */ +export async function removeAllPredicatesOnObserverClose( + observer: EventObserverOptions, + chainhook: ChainhookNodeOptions +) { + const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); + if (diskPredicates.size === 0) { + logger.info(`ChainhookEventObserver does not have predicates to close`); + return; + } + logger.info(`ChainhookEventObserver closing predicates at ${chainhook.base_url}`); + const removals = [...RegisteredPredicates.values()].map(predicate => + removePredicate(predicate, observer, chainhook) + ); + await Promise.allSettled(removals); + RegisteredPredicates.clear(); +} + +export async function predicateHealthCheck( + observer: EventObserverOptions, + chainhook: ChainhookNodeOptions +): Promise { + logger.debug(`ChainhookEventObserver performing predicate health check`); + for (const predicate of RegisteredPredicates.values()) { + // This will be a no-op if the predicate is already active. + await registerPredicate(predicate, RegisteredPredicates, observer, chainhook); + } +} diff --git a/components/client/typescript/src/schemas/predicate.ts b/components/client/typescript/src/schemas/predicate.ts index c6e8badf7..13aadd141 100644 --- a/components/client/typescript/src/schemas/predicate.ts +++ b/components/client/typescript/src/schemas/predicate.ts @@ -31,14 +31,10 @@ export type PredicateHeader = Static; export const PredicateSchema = Type.Composite([ PredicateHeaderSchema, Type.Object({ - networks: Type.Union([ - Type.Object({ - mainnet: Type.Union([BitcoinIfThisThenThatSchema, StacksIfThisThenThatSchema]), - }), - Type.Object({ - testnet: Type.Union([BitcoinIfThisThenThatSchema, StacksIfThisThenThatSchema]), - }), - ]), + networks: Type.Object({ + mainnet: Type.Optional(Type.Union([BitcoinIfThisThenThatSchema, StacksIfThisThenThatSchema])), + testnet: Type.Optional(Type.Union([BitcoinIfThisThenThatSchema, StacksIfThisThenThatSchema])), + }), }), ]); export type Predicate = Static; diff --git a/components/client/typescript/src/server.ts b/components/client/typescript/src/server.ts index 638da23bd..4963bd0af 100644 --- a/components/client/typescript/src/server.ts +++ b/components/client/typescript/src/server.ts @@ -12,47 +12,15 @@ import { logger, PINO_CONFIG } from './util/logger'; import { timeout } from './util/helpers'; import { Payload, PayloadSchema } from './schemas/payload'; import { - Predicate, - PredicateHeaderSchema, - SerializedPredicate, - SerializedPredicateResponse, - ThenThatHttpPost, -} from './schemas/predicate'; -import { BitcoinIfThisOptionsSchema, BitcoinIfThisSchema } from './schemas/bitcoin/if_this'; -import { StacksIfThisOptionsSchema, StacksIfThisSchema } from './schemas/stacks/if_this'; - -/** Function type for a Chainhook event callback */ -export type OnEventCallback = (uuid: string, payload: Payload) => Promise; - -const ServerOptionsSchema = Type.Object({ - hostname: Type.String(), - port: Type.Integer(), - auth_token: Type.String(), - external_base_url: Type.String(), - - /** Wait for the chainhook node to be available before submitting predicates */ - wait_for_chainhook_node: Type.Optional(Type.Boolean({ default: true })), - /** Validate the JSON schema of received chainhook payloads and report errors when invalid */ - validate_chainhook_payloads: Type.Optional(Type.Boolean({ default: false })), - /** Validate the authorization token sent by the server is correct. */ - validate_token_authorization: Type.Optional(Type.Boolean({ default: true })), - /** Size limit for received chainhook payloads (default 40MB) */ - body_limit: Type.Optional(Type.Number({ default: 41943040 })), - /** Node type: `chainhook` or `ordhook` */ - node_type: Type.Optional( - Type.Union([Type.Literal('chainhook'), Type.Literal('ordhook')], { - default: 'chainhook', - }) - ), -}); -/** Local event server connection and authentication options */ -export type ServerOptions = Static; - -const ChainhookNodeOptionsSchema = Type.Object({ - base_url: Type.String(), -}); -/** Chainhook node connection options */ -export type ChainhookNodeOptions = Static; + registerAllPredicatesOnObserverReady, + removeAllPredicatesOnObserverClose, +} from './predicates'; +import { + ChainhookNodeOptions, + EventObserverOptions, + EventObserverPredicate, + OnPredicatePayloadCallback, +} from '.'; /** * Throw this error when processing a Chainhook Payload if you believe it is a bad request. This @@ -65,172 +33,37 @@ export class BadPayloadRequestError extends Error { } } -const IfThisThenNothingSchema = Type.Union([ - Type.Composite([ - BitcoinIfThisOptionsSchema, - Type.Object({ - if_this: BitcoinIfThisSchema, - }), - ]), - Type.Composite([ - StacksIfThisOptionsSchema, - Type.Object({ - if_this: StacksIfThisSchema, - }), - ]), -]); -const ServerPredicateSchema = Type.Composite([ - PredicateHeaderSchema, - Type.Object({ - networks: Type.Union([ - Type.Object({ - mainnet: IfThisThenNothingSchema, - }), - Type.Object({ - testnet: IfThisThenNothingSchema, - }), - ]), - }), -]); -/** Chainhook predicates registerable by the local event server */ -export type ServerPredicate = Static; - /** * Build the Chainhook Fastify event server. - * @param serverOpts - Server options - * @param chainhookOpts - Chainhook node options + * @param observer - Event observer options + * @param chainhook - Chainhook node options * @param predicates - Predicates to register * @param callback - Event callback function * @returns Fastify instance */ export async function buildServer( - serverOpts: ServerOptions, - chainhookOpts: ChainhookNodeOptions, - predicates: ServerPredicate[], - callback: OnEventCallback + observer: EventObserverOptions, + chainhook: ChainhookNodeOptions, + predicates: EventObserverPredicate[], + callback: OnPredicatePayloadCallback ) { async function waitForNode(this: FastifyInstance) { - logger.info(`ChainhookEventObserver looking for chainhook node at ${chainhookOpts.base_url}`); + logger.info(`ChainhookEventObserver looking for chainhook node at ${chainhook.base_url}`); while (true) { try { - await request(`${chainhookOpts.base_url}/ping`, { method: 'GET', throwOnError: true }); + await request(`${chainhook.base_url}/ping`, { method: 'GET', throwOnError: true }); break; } catch (error) { - logger.error(error, 'Chainhook node not available, retrying...'); + logger.error(error, 'ChainhookEventObserver chainhook node not available, retrying...'); await timeout(1000); } } } - async function isPredicateActive(predicate: ServerPredicate): Promise { - try { - const result = await request(`${chainhookOpts.base_url}/v1/chainhooks/${predicate.uuid}`, { - method: 'GET', - headers: { accept: 'application/json' }, - throwOnError: true, - }); - const response = (await result.body.json()) as SerializedPredicateResponse; - if (response.status == 404) return undefined; - if ( - response.result.enabled == false || - response.result.status.type == 'interrupted' || - response.result.status.type == 'unconfirmed_expiration' || - response.result.status.type == 'confirmed_expiration' - ) { - return false; - } - return true; - } catch (error) { - logger.error( - error, - `ChainhookEventObserver unable to check if predicate ${predicate.uuid} is active` - ); - return false; - } - } - - async function registerAllPredicates(this: FastifyInstance) { - logger.info(predicates, `ChainhookEventObserver connected to ${chainhookOpts.base_url}`); - if (predicates.length === 0) { - logger.info(`ChainhookEventObserver does not have predicates to register`); - return; - } - const nodeType = serverOpts.node_type ?? 'chainhook'; - const path = nodeType === 'chainhook' ? `/v1/chainhooks` : `/v1/observers`; - const registerUrl = `${chainhookOpts.base_url}${path}`; - for (const predicate of predicates) { - if (nodeType === 'chainhook') { - switch (await isPredicateActive(predicate)) { - case undefined: - // Predicate doesn't exist. - break; - case true: - logger.info( - `ChainhookEventObserver predicate ${predicate.uuid} is already active, skipping registration` - ); - continue; - case false: - logger.info( - `ChainhookEventObserver predicate ${predicate.uuid} was being used but is now inactive, removing for re-regristration` - ); - await removePredicate(predicate); - } - } - logger.info(`ChainhookEventObserver registering predicate ${predicate.uuid}`); - const thenThat: ThenThatHttpPost = { - http_post: { - url: `${serverOpts.external_base_url}/payload`, - authorization_header: `Bearer ${serverOpts.auth_token}`, - }, - }; - try { - const body = predicate as Predicate; - if ('mainnet' in body.networks) body.networks.mainnet.then_that = thenThat; - if ('testnet' in body.networks) body.networks.testnet.then_that = thenThat; - await request(registerUrl, { - method: 'POST', - body: JSON.stringify(body), - headers: { 'content-type': 'application/json' }, - throwOnError: true, - }); - } catch (error) { - logger.error(error, `ChainhookEventObserver unable to register predicate`); - } - } - } - - async function removePredicate(predicate: ServerPredicate): Promise { - const nodeType = serverOpts.node_type ?? 'chainhook'; - const path = - nodeType === 'chainhook' - ? `/v1/chainhooks/${predicate.chain}/${encodeURIComponent(predicate.uuid)}` - : `/v1/observers/${encodeURIComponent(predicate.uuid)}`; - try { - await request(`${chainhookOpts.base_url}${path}`, { - method: 'DELETE', - headers: { 'content-type': 'application/json' }, - throwOnError: true, - }); - logger.info(`ChainhookEventObserver removed predicate ${predicate.uuid}`); - } catch (error) { - logger.error(error, `ChainhookEventObserver unable to deregister predicate`); - } - } - - async function removeAllPredicates(this: FastifyInstance) { - if (predicates.length === 0) { - logger.info(`ChainhookEventObserver does not have predicates to close`); - return; - } - logger.info(`ChainhookEventObserver closing predicates at ${chainhookOpts.base_url}`); - const removals = predicates.map(predicate => removePredicate(predicate)); - await Promise.allSettled(removals); - } - async function isEventAuthorized(request: FastifyRequest, reply: FastifyReply) { - if (!(serverOpts.validate_token_authorization ?? true)) return; + if (!(observer.validate_token_authorization ?? true)) return; const authHeader = request.headers.authorization; - if (authHeader && authHeader === `Bearer ${serverOpts.auth_token}`) { + if (authHeader && authHeader === `Bearer ${observer.auth_token}`) { return; } await reply.code(403).send(); @@ -241,23 +74,22 @@ export async function buildServer( Server, TypeBoxTypeProvider > = (fastify, options, done) => { - const compiledPayloadSchema = TypeCompiler.Compile(PayloadSchema); + const CompiledPayloadSchema = TypeCompiler.Compile(PayloadSchema); fastify.addHook('preHandler', isEventAuthorized); fastify.post('/payload', async (request, reply) => { if ( - (serverOpts.validate_chainhook_payloads ?? false) && - !compiledPayloadSchema.Check(request.body) + (observer.validate_chainhook_payloads ?? false) && + !CompiledPayloadSchema.Check(request.body) ) { logger.error( - [...compiledPayloadSchema.Errors(request.body)], + [...CompiledPayloadSchema.Errors(request.body)], `ChainhookEventObserver received an invalid payload` ); await reply.code(422).send(); return; } - const body = request.body as Payload; try { - await callback(body.chainhook.uuid, body); + await callback(request.body as Payload); await reply.code(200).send(); } catch (error) { if (error instanceof BadPayloadRequestError) { @@ -276,14 +108,14 @@ export async function buildServer( trustProxy: true, logger: PINO_CONFIG, pluginTimeout: 0, // Disable so ping can retry indefinitely - bodyLimit: serverOpts.body_limit ?? 41943040, // 40MB default + bodyLimit: observer.body_limit ?? 41943040, // 40MB default }).withTypeProvider(); - if (serverOpts.wait_for_chainhook_node ?? true) { - fastify.addHook('onReady', waitForNode); - } - fastify.addHook('onReady', registerAllPredicates); - fastify.addHook('onClose', removeAllPredicates); + if (observer.wait_for_chainhook_node ?? true) fastify.addHook('onReady', waitForNode); + fastify.addHook('onReady', async () => + registerAllPredicatesOnObserverReady(predicates, observer, chainhook) + ); + fastify.addHook('onClose', async () => removeAllPredicatesOnObserverClose(observer, chainhook)); await fastify.register(ChainhookEventObserver); return fastify; diff --git a/components/client/typescript/tests/predicates.test.ts b/components/client/typescript/tests/predicates.test.ts new file mode 100644 index 000000000..b6fc01c23 --- /dev/null +++ b/components/client/typescript/tests/predicates.test.ts @@ -0,0 +1,173 @@ +import * as fs from 'fs'; +import * as path from 'path'; +import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; +import { ChainhookEventObserver, EventObserverOptions, EventObserverPredicate } from '../src'; +import { recallPersistedPredicatesFromDisk, savePredicateToDisk } from '../src/predicates'; + +function deletePredicates(dir: string) { + const files = fs.readdirSync(dir); + for (const file of files) { + const filePath = path.join(dir, file); + const stat = fs.statSync(filePath); + if (stat.isFile() && file.endsWith('.json')) fs.unlinkSync(filePath); + } +} + +describe('predicates', () => { + let mockAgent: MockAgent; + let mockClient: Interceptable; + let server: ChainhookEventObserver; + let observer: EventObserverOptions; + + const testPredicate: EventObserverPredicate = { + name: 'test', + version: 1, + chain: 'stacks', + networks: { + mainnet: { + if_this: { + scope: 'block_height', + higher_than: 1, + }, + }, + }, + }; + + beforeEach(() => { + mockAgent = new MockAgent(); + mockAgent.disableNetConnect(); + mockClient = mockAgent.get('http://127.0.0.1:20456'); + mockClient + .intercept({ + path: '/ping', + method: 'GET', + }) + .reply(200); + setGlobalDispatcher(mockAgent); + observer = { + hostname: '0.0.0.0', + port: 3999, + auth_token: 'token', + external_base_url: 'http://myserver.com', + wait_for_chainhook_node: true, + validate_chainhook_payloads: false, + predicate_disk_file_path: './tmp', + node_type: 'chainhook', + }; + server = new ChainhookEventObserver(observer, { + base_url: 'http://127.0.0.1:20456', + }); + deletePredicates(observer.predicate_disk_file_path); + }); + + afterEach(async () => { + mockClient + .intercept({ + path: /\/v1\/chainhooks\/stacks\/(.*)/, + method: 'DELETE', + }) + .reply(200); + await server.close(); + await mockAgent.close(); + }); + + test('registers and persists new predicate to disk', async () => { + mockClient + .intercept({ + path: '/v1/chainhooks', + method: 'POST', + }) + .reply(200); + + expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(false); + await server.start([testPredicate], async () => {}); + + expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true); + const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); + const storedPredicate = disk.get('test'); + expect(storedPredicate).not.toBeUndefined(); + expect(storedPredicate?.name).toBe(testPredicate.name); + expect(storedPredicate?.version).toBe(testPredicate.version); + expect(storedPredicate?.chain).toBe(testPredicate.chain); + expect(storedPredicate?.networks.mainnet).toStrictEqual(testPredicate.networks.mainnet); + expect(storedPredicate?.networks.mainnet?.then_that).toStrictEqual({ + http_post: { + authorization_header: 'Bearer token', + url: 'http://myserver.com/payload', + }, + }); + expect(storedPredicate?.uuid).not.toBeUndefined(); + + mockAgent.assertNoPendingInterceptors(); + }); + + describe('pre-stored', () => { + beforeEach(() => { + savePredicateToDisk(observer.predicate_disk_file_path, { + uuid: 'e2777d77-473a-4c1d-9012-152deb36bf4c', + name: 'test', + version: 1, + chain: 'stacks', + networks: { + mainnet: { + if_this: { + scope: 'block_height', + higher_than: 1, + }, + then_that: { + http_post: { + url: 'http://test', + authorization_header: 'Bearer x', + }, + }, + }, + }, + }); + expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true); + }); + + test('resumes active predicate', async () => { + mockClient + .intercept({ + path: '/v1/chainhooks/e2777d77-473a-4c1d-9012-152deb36bf4c', + method: 'GET', + }) + .reply(200, { result: { enabled: true, status: { type: 'scanning' } }, status: 200 }); + + await server.start([testPredicate], async () => {}); + + mockAgent.assertNoPendingInterceptors(); + expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true); + }); + + test('re-registers dead predicate', async () => { + mockClient + .intercept({ + path: '/v1/chainhooks/e2777d77-473a-4c1d-9012-152deb36bf4c', + method: 'GET', + }) + .reply(200, { result: { enabled: true, status: { type: 'interrupted' } }, status: 200 }); + mockClient + .intercept({ + path: '/v1/chainhooks/stacks/e2777d77-473a-4c1d-9012-152deb36bf4c', + method: 'DELETE', + }) + .reply(200); + mockClient + .intercept({ + path: '/v1/chainhooks', + method: 'POST', + }) + .reply(200); + + await server.start([testPredicate], async () => {}); + + mockAgent.assertNoPendingInterceptors(); + expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true); + const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path); + const storedPredicate = disk.get('test'); + // Should have a different uuid + expect(storedPredicate?.uuid).not.toBe('e2777d77-473a-4c1d-9012-152deb36bf4c'); + }); + }); +}); diff --git a/components/client/typescript/tsconfig.build.json b/components/client/typescript/tsconfig.build.json index 8dbb1002d..2ee4d38c8 100644 --- a/components/client/typescript/tsconfig.build.json +++ b/components/client/typescript/tsconfig.build.json @@ -1,5 +1,6 @@ { "extends": "./tsconfig.json", "exclude": [ + "./tests/**/*.ts" ] } diff --git a/components/client/typescript/tsconfig.json b/components/client/typescript/tsconfig.json index e82cef2b5..b525e5980 100644 --- a/components/client/typescript/tsconfig.json +++ b/components/client/typescript/tsconfig.json @@ -18,5 +18,6 @@ }, "include": [ "./src/**/*.ts", + "./tests/**/*.ts", ], }