From 7f35f59b6dec8dbcfa8ea4505cab4955045beaa5 Mon Sep 17 00:00:00 2001 From: "Rob Moore (MakerX)" Date: Sat, 30 Mar 2024 01:46:06 +0800 Subject: [PATCH 1/2] feat: Dynamic subscription filters --- .../index.ts | 214 +++++++++++++++ src/dynamic-subscriber.ts | 135 ++++++++++ src/index.ts | 1 + src/subscriber.ts | 73 ++++-- src/subscriptions.ts | 247 ++++++++++++------ src/types/subscription.ts | 45 +++- 6 files changed, 606 insertions(+), 109 deletions(-) create mode 100644 examples/data-history-museum-with-ownership/index.ts create mode 100644 src/dynamic-subscriber.ts diff --git a/examples/data-history-museum-with-ownership/index.ts b/examples/data-history-museum-with-ownership/index.ts new file mode 100644 index 0000000..0d287ee --- /dev/null +++ b/examples/data-history-museum-with-ownership/index.ts @@ -0,0 +1,214 @@ +import * as algokit from '@algorandfoundation/algokit-utils' +import { TransactionResult } from '@algorandfoundation/algokit-utils/types/indexer' +import algosdk from 'algosdk' +import fs from 'fs' +import path from 'path' +import { DynamicAlgorandSubscriber } from '../../src' +import TransactionType = algosdk.TransactionType + +if (!fs.existsSync(path.join(__dirname, '..', '..', '.env')) && !process.env.ALGOD_SERVER) { + // eslint-disable-next-line no-console + console.error('Copy /.env.sample to /.env before starting the application.') + process.exit(1) +} + +interface DHMAsset { + id: number + name: string + unit: string + mediaUrl: string + metadata: Record + created: string + lastModified: string + owner: string + ownerModified: string +} + +interface DHMFilterState { + assetIds: number[] +} + +async function getDHMSubscriber() { + const algod = await algokit.getAlgoClient() + const indexer = await algokit.getAlgoIndexerClient() + const subscriber = new DynamicAlgorandSubscriber( + { + maxIndexerRoundsToSync: 10_000_000, + dynamicFilters: async (filterState, pollLevel) => [ + ...(pollLevel === 0 + ? [ + { + name: 'dhm-asset', + filter: { + type: TransactionType.acfg, + // Data History Museum creator accounts + sender: (await algokit.isTestNet(algod)) + ? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU' + : 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU', + }, + }, + ] + : []), + ...(filterState.assetIds.length > 0 + ? [ + { + name: 'dhm-ownership-change', + filter: { + type: TransactionType.axfer, + assetId: filterState.assetIds, + minAmount: 1, + }, + }, + ] + : []), + ], + filterStatePersistence: { + get: getFilterState, + set: saveFilterState, + }, + frequencyInSeconds: 1, + maxRoundsToSync: 500, + syncBehaviour: 'catchup-with-indexer', + watermarkPersistence: { + get: getLastWatermark, + set: saveWatermark, + }, + }, + algod, + indexer, + ) + subscriber.onBatch('dhm-asset', async (events) => { + // eslint-disable-next-line no-console + console.log(`Received ${events.length} asset changes (${events.filter((t) => t['created-asset-index']).length} new assets)`) + + // Append any new asset ids to the filter state so ownership is picked up of them + subscriber.appendFilterState({ assetIds: events.filter((e) => e['created-asset-index']).map((e) => e['created-asset-index']!) }) + }) + subscriber.onBatch('dhm-ownership-change', async (events) => { + // eslint-disable-next-line no-console + console.log(`Received ${events.length} ownership changes`) + }) + subscriber.onPoll(async (pollMetadata) => { + // Save all of the Data History Museum Verifiably Authentic Digital Historical Artifacts + await saveDHMTransactions(pollMetadata.subscribedTransactions) + }) + return subscriber +} + +function getArc69Metadata(t: TransactionResult) { + let metadata = {} + try { + if (t.note && t.note.startsWith('ey')) metadata = JSON.parse(Buffer.from(t.note, 'base64').toString('utf-8')) + // eslint-disable-next-line no-empty + } catch (e) {} + return metadata +} + +async function saveDHMTransactions(transactions: TransactionResult[]) { + const assets = await getSavedTransactions('dhm-assets.json') + + for (const t of transactions) { + if (t['created-asset-index']) { + assets.push({ + id: t['created-asset-index'], + name: t['asset-config-transaction']!.params!.name!, + unit: t['asset-config-transaction']!.params!['unit-name']!, + mediaUrl: t['asset-config-transaction']!.params!.url!, + metadata: getArc69Metadata(t), + created: new Date(t['round-time']! * 1000).toISOString(), + lastModified: new Date(t['round-time']! * 1000).toISOString(), + owner: t.sender, + ownerModified: new Date(t['round-time']! * 1000).toISOString(), + }) + } else if (t['asset-config-transaction']) { + const asset = assets.find((a) => a.id === t['asset-config-transaction']!['asset-id']) + if (!asset) { + // eslint-disable-next-line no-console + console.error(t) + throw new Error(`Unable to find existing asset data for ${t['asset-config-transaction']!['asset-id']}`) + } + if (!t['asset-config-transaction']!.params) { + // Asset was deleted, remove it + assets.splice(assets.indexOf(asset), 1) + } else { + asset!.metadata = getArc69Metadata(t) + asset!.lastModified = new Date(t['round-time']! * 1000).toISOString() + } + } else if (t['asset-transfer-transaction']) { + const asset = assets.find((a) => a.id === t['asset-transfer-transaction']!['asset-id']) + if (!asset) { + // eslint-disable-next-line no-console + console.error(t) + throw new Error(`Unable to find existing asset data for ${t['asset-transfer-transaction']!['asset-id']}`) + } + if (t['asset-transfer-transaction'].amount > 0) { + asset.owner = t['asset-transfer-transaction']!.receiver + asset.ownerModified = new Date(t['round-time']! * 1000).toISOString() + } + } + } + + await saveTransactions(assets, 'dhm-assets.json') +} + +// Basic methods that persist using filesystem - for illustrative purposes only + +async function saveFilterState(state: DHMFilterState) { + fs.writeFileSync(path.join(__dirname, 'filters.json'), JSON.stringify(state), { encoding: 'utf-8' }) +} + +async function getFilterState(): Promise { + if (!fs.existsSync(path.join(__dirname, 'filters.json'))) return { assetIds: [] } + const existing = fs.readFileSync(path.join(__dirname, 'filters.json'), 'utf-8') + const existingData = JSON.parse(existing) as DHMFilterState + // eslint-disable-next-line no-console + console.log(`Found existing filter state in filters.json; syncing with ${existingData.assetIds.length} assets`) + return existingData +} + +async function saveWatermark(watermark: number) { + fs.writeFileSync(path.join(__dirname, 'watermark.txt'), watermark.toString(), { encoding: 'utf-8' }) +} + +async function getLastWatermark(): Promise { + if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 15_000_000 + const existing = fs.readFileSync(path.join(__dirname, 'watermark.txt'), 'utf-8') + // eslint-disable-next-line no-console + console.log(`Found existing sync watermark in watermark.txt; syncing from ${existing}`) + return Number(existing) +} + +async function getSavedTransactions(fileName: string): Promise { + const existing = fs.existsSync(path.join(__dirname, fileName)) + ? (JSON.parse(fs.readFileSync(path.join(__dirname, fileName), 'utf-8')) as T[]) + : [] + return existing +} + +async function saveTransactions(transactions: unknown[], fileName: string) { + fs.writeFileSync(path.join(__dirname, fileName), JSON.stringify(transactions, undefined, 2), { encoding: 'utf-8' }) + // eslint-disable-next-line no-console + console.log(`Saved ${transactions.length} transactions to ${fileName}`) +} + +// eslint-disable-next-line no-console +process.on('uncaughtException', (e) => console.error(e)) +;(async () => { + const subscriber = await getDHMSubscriber() + + if (process.env.RUN_LOOP === 'true') { + subscriber.start() + ;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) => + process.on(signal, () => { + // eslint-disable-next-line no-console + console.log(`Received ${signal}; stopping subscriber...`) + subscriber.stop(signal) + }), + ) + } else { + await subscriber.pollOnce() + } +})().catch((e) => { + // eslint-disable-next-line no-console + console.error(e) +}) diff --git a/src/dynamic-subscriber.ts b/src/dynamic-subscriber.ts new file mode 100644 index 0000000..13c1550 --- /dev/null +++ b/src/dynamic-subscriber.ts @@ -0,0 +1,135 @@ +import * as algokit from '@algorandfoundation/algokit-utils' +import algosdk from 'algosdk' +import { AlgorandSubscriber } from './subscriber' +import { + getAlgodSubscribedTransactions, + getArc28EventsToProcess, + getIndexerCatchupTransactions, + prepareSubscriptionPoll, + processExtraSubscriptionTransactionFields, +} from './subscriptions' +import type { + DynamicAlgorandSubscriberConfig, + NamedTransactionFilter, + SubscribedTransaction, + TransactionSubscriptionResult, +} from './types/subscription' +import Algodv2 = algosdk.Algodv2 +import Indexer = algosdk.Indexer + +export class DynamicAlgorandSubscriber extends AlgorandSubscriber { + private pendingStateChanges: { action: 'append' | 'delete' | 'set'; stateChange: Partial }[] = [] + private dynamicConfig: DynamicAlgorandSubscriberConfig + + constructor(config: DynamicAlgorandSubscriberConfig, algod: Algodv2, indexer?: Indexer) { + super( + { + filters: [], + ...config, + }, + algod, + indexer, + ) + this.dynamicConfig = config + } + + protected override async _pollOnce(watermark: number): Promise { + let subscribedTransactions: SubscribedTransaction[] = [] + let filterState: T = await this.dynamicConfig.filterStatePersistence.get() + + const subscribe = async (filters: NamedTransactionFilter[]) => { + if (filters.length === 0) return [] + const catchupTransactions = await getIndexerCatchupTransactions(filters, pollMetadata, arc28EventsToProcess, this.indexer) + const algodTransactions = await getAlgodSubscribedTransactions(filters, pollMetadata, arc28EventsToProcess) + const subscribedTransactions = catchupTransactions + .concat(algodTransactions) + .map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, this.config.arc28Events ?? [])) + await this._processFilters({ subscribedTransactions, ...pollMetadata }) + return subscribedTransactions + } + + const filters = await this.dynamicConfig.dynamicFilters(filterState, 0, watermark) + this.filterNames = filters + .map((f) => f.name) + .filter((value, index, self) => { + // Remove duplicates + return self.findIndex((x) => x === value) === index + }) + const pollMetadata = await prepareSubscriptionPoll({ ...this.config, watermark, filters }, this.algod) + const arc28EventsToProcess = getArc28EventsToProcess(this.config.arc28Events ?? []) + + subscribedTransactions = await subscribe(filters) + + let pollLevel = 0 + while (this.pendingStateChanges.length > 0) { + const stateChangeCount = this.pendingStateChanges.length + let filterStateToProcess = { ...filterState } + for (const change of this.pendingStateChanges) { + switch (change.action) { + case 'append': + for (const key of Object.keys(change.stateChange)) { + const k = key as keyof T + if (!filterState[k] || !Array.isArray(filterState[k])) { + filterState[k] = change.stateChange[k]! + } else { + filterState[k] = (filterState[k] as unknown[]).concat(change.stateChange[k]) as T[keyof T] + } + } + filterStateToProcess = { ...filterStateToProcess, ...change.stateChange } + break + case 'delete': + for (const key of Object.keys(change.stateChange)) { + const k = key as keyof T + delete filterState[k] + delete filterStateToProcess[k] + } + break + case 'set': + filterState = { ...filterState, ...change.stateChange } + filterStateToProcess = { ...filterState, ...change.stateChange } + break + } + } + this.pendingStateChanges = [] + const newFilters = await this.dynamicConfig.dynamicFilters(filterStateToProcess, ++pollLevel, watermark) + this.filterNames = newFilters + .map((f) => f.name) + .filter((value, index, self) => { + // Remove duplicates + return self.findIndex((x) => x === value) === index + }) + + algokit.Config.logger.debug( + `Poll level ${pollLevel}: Found ${stateChangeCount} pending state changes and applied them to get ${newFilters.length} filters; syncing...`, + ) + + subscribedTransactions = subscribedTransactions.concat(await subscribe(newFilters)) + } + + await this.dynamicConfig.filterStatePersistence.set(filterState) + + return { + syncedRoundRange: pollMetadata.syncedRoundRange, + newWatermark: pollMetadata.newWatermark, + currentRound: pollMetadata.currentRound, + subscribedTransactions: subscribedTransactions.sort( + (a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!, + ), + } + } + + appendFilterState(stateChange: Partial) { + this.pendingStateChanges.push({ action: 'append', stateChange }) + } + + deleteFilterState(stateChange: (keyof T)[]) { + this.pendingStateChanges.push({ + action: 'delete', + stateChange: stateChange.reduce((acc, key) => ({ ...acc, [key]: true }), {} as Partial), + }) + } + + setFilterState(stateChange: Partial) { + this.pendingStateChanges.push({ action: 'set', stateChange }) + } +} diff --git a/src/index.ts b/src/index.ts index afe6820..631b8e6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ +export * from './dynamic-subscriber' export * from './subscriber' export * from './subscriptions' diff --git a/src/subscriber.ts b/src/subscriber.ts index 07e5800..a570be7 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -4,7 +4,7 @@ import { getSubscribedTransactions } from './subscriptions' import { AsyncEventEmitter, AsyncEventListener } from './types/async-event-emitter' import type { AlgorandSubscriberConfig, - BeforePollMetadata, + BeforeSubscriptionPollMetadata, SubscribedTransaction, TransactionSubscriptionResult, TypedAsyncEventListener, @@ -17,14 +17,14 @@ import Indexer = algosdk.Indexer * Handles the logic for subscribing to the Algorand blockchain and emitting events. */ export class AlgorandSubscriber { - private algod: Algodv2 - private indexer: Indexer | undefined - private config: AlgorandSubscriberConfig - private abortController: AbortController - private eventEmitter: AsyncEventEmitter - private started: boolean = false - private startPromise: Promise | undefined - private filterNames: string[] + protected algod: Algodv2 + protected indexer: Indexer | undefined + protected config: AlgorandSubscriberConfig + protected abortController: AbortController + protected eventEmitter: AsyncEventEmitter + protected started: boolean = false + protected startPromise: Promise | undefined + protected filterNames: string[] /** * Create a new `AlgorandSubscriber`. @@ -51,31 +51,20 @@ export class AlgorandSubscriber { } } - /** - * Execute a single subscription poll. - * - * This is useful when executing in the context of a process - * triggered by a recurring schedule / cron. - * @returns The poll result - */ - async pollOnce(): Promise { - const watermark = await this.config.watermarkPersistence.get() - - const currentRound = (await this.algod.status().do())['last-round'] as number - await this.eventEmitter.emitAsync('before:poll', { - watermark, - currentRound, - } satisfies BeforePollMetadata) - - const pollResult = await getSubscribedTransactions( + /** Perform a single subscribe for a given watermark and subscription config */ + protected async _pollSubscribe(watermark: number, config: AlgorandSubscriberConfig): Promise { + return await getSubscribedTransactions( { watermark, - ...this.config, + ...config, }, this.algod, this.indexer, ) + } + /** Process the filters and event emittance for the result of a single poll */ + protected async _processFilters(pollResult: TransactionSubscriptionResult) { try { for (const filterName of this.filterNames) { const mapper = this.config.filters.find((f) => f.name === filterName)?.mapper @@ -91,6 +80,34 @@ export class AlgorandSubscriber { algokit.Config.logger.error(`Error processing event emittance`, e) throw e } + } + + /** Perform a single poll for the given watermark */ + protected async _pollOnce(watermark: number): Promise { + const pollResult = await this._pollSubscribe(watermark, this.config) + await this._processFilters(pollResult) + + return pollResult + } + + /** + * Execute a single subscription poll. + * + * This is useful when executing in the context of a process + * triggered by a recurring schedule / cron. + * @returns The poll result + */ + async pollOnce(): Promise { + const watermark = await this.config.watermarkPersistence.get() + + const currentRound = (await this.algod.status().do())['last-round'] as number + await this.eventEmitter.emitAsync('before:poll', { + watermark, + currentRound, + } satisfies BeforeSubscriptionPollMetadata) + + const pollResult = await this._pollOnce(watermark) + await this.config.watermarkPersistence.set(pollResult.newWatermark) return pollResult } @@ -217,7 +234,7 @@ export class AlgorandSubscriber { * @param listener The listener function to invoke with the pre-poll metadata * @returns The subscriber so `on*` calls can be chained */ - onBeforePoll(listener: TypedAsyncEventListener) { + onBeforePoll(listener: TypedAsyncEventListener) { this.eventEmitter.on('before:poll', listener as AsyncEventListener) return this } diff --git a/src/subscriptions.ts b/src/subscriptions.ts index 9fbeb7d..938d63b 100644 --- a/src/subscriptions.ts +++ b/src/subscriptions.ts @@ -17,6 +17,7 @@ import type { Arc28EventGroup, Arc28EventToProcess, EmittedArc28Event } from './ import type { TransactionInBlock } from './types/block' import { BlockMetadata, + SubscriptionPollMetadata, type BalanceChange, type NamedTransactionFilter, type SubscribedTransaction, @@ -47,25 +48,12 @@ const deduplicateSubscribedTransactionsReducer = (dedupedTransactions: Subscribe } /** - * Executes a single pull/poll to subscribe to transactions on the configured Algorand - * blockchain for the given subscription context. - * @param subscription The subscription context. - * @param algod An Algod client. - * @param indexer An optional indexer client, only needed when `onMaxRounds` is `catchup-with-indexer`. - * @returns The result of this subscription pull/poll. + * Returns a flat list of arc-28 event definitions ready for processing. + * @param arc28Events The definition of ARC-28 event groups + * @returns The individual event definitions */ -export async function getSubscribedTransactions( - subscription: TransactionSubscriptionParams, - algod: Algodv2, - indexer?: Indexer, -): Promise { - const { watermark, filters, maxRoundsToSync: _maxRoundsToSync, syncBehaviour: onMaxRounds } = subscription - const maxRoundsToSync = _maxRoundsToSync ?? 500 - const currentRound = (await algod.status().do())['last-round'] as number - let blockMetadata: BlockMetadata[] | undefined - - // Pre-calculate a flat list of all ARC-28 events to process - const arc28Events = (subscription.arc28Events ?? []).flatMap((g) => +export function getArc28EventsToProcess(arc28Events: Arc28EventGroup[]) { + return (arc28Events ?? []).flatMap((g) => g.events.map((e) => { // https://github.com/algorandfoundation/ARCs/blob/main/ARCs/arc-0028.md#sample-interpretation-of-event-log-data const eventSignature = `${e.name}(${e.args.map((a) => a.type).join(',')})` @@ -81,14 +69,30 @@ export async function getSubscribedTransactions( } satisfies Arc28EventToProcess }), ) +} + +/** + * Creates the metadata needed to perform a single subscription poll. + * @param subscription The subscription configuration + * @param algod An algod instance + * @returns The metadata for the poll + */ +export async function prepareSubscriptionPoll( + subscription: TransactionSubscriptionParams, + algod: Algodv2, +): Promise { + const { watermark, maxRoundsToSync: _maxRoundsToSync, syncBehaviour: onMaxRounds } = subscription + const maxRoundsToSync = _maxRoundsToSync ?? 500 + const currentRound = (await algod.status().do())['last-round'] as number + let blockMetadata: BlockMetadata[] | undefined // Nothing to sync we at the tip of the chain already if (currentRound <= watermark) { return { currentRound: currentRound, newWatermark: watermark, - subscribedTransactions: [], syncedRoundRange: [currentRound, currentRound], + arc28EventGroups: subscription.arc28Events ?? [], } } @@ -96,8 +100,6 @@ export async function getSubscribedTransactions( let algodSyncFromRoundNumber = watermark + 1 let startRound = algodSyncFromRoundNumber let endRound = currentRound - let catchupTransactions: SubscribedTransaction[] = [] - let start = +new Date() let skipAlgodSync = false // If we are less than `maxRoundsToSync` from the tip of the chain then we consult the `syncBehaviour` to determine what to do @@ -123,10 +125,6 @@ export async function getSubscribedTransactions( } break case 'catchup-with-indexer': - if (!indexer) { - throw new Error("Can't catch up using indexer since it's not provided") - } - // If we have more than `maxIndexerRoundsToSync` rounds to sync from indexer then we skip algod sync and just sync that many rounds from indexer indexerSyncToRoundNumber = currentRound - maxRoundsToSync if (subscription.maxIndexerRoundsToSync && indexerSyncToRoundNumber - startRound + 1 > subscription.maxIndexerRoundsToSync) { @@ -137,43 +135,6 @@ export async function getSubscribedTransactions( algodSyncFromRoundNumber = indexerSyncToRoundNumber + 1 } - algokit.Config.logger.debug( - `Catching up from round ${startRound} to round ${indexerSyncToRoundNumber} via indexer; this may take a few seconds`, - ) - - // Retrieve and process transactions from indexer in groups of 30 so we don't get rate limited - for (const chunkedFilters of chunkArray(filters, 30)) { - catchupTransactions = catchupTransactions.concat( - ( - await Promise.all( - // For each filter - chunkedFilters.map(async (f) => - // Retrieve all pre-filtered transactions from the indexer - (await algokit.searchTransactions(indexer, indexerPreFilter(f.filter, startRound, indexerSyncToRoundNumber))).transactions - // Re-run the pre-filter in-memory so we properly extract inner transactions - .flatMap((t) => getFilteredIndexerTransactions(t, f)) - // Run the post-filter so we get the final list of matching transactions - .filter(indexerPostFilter(f.filter, arc28Events, subscription.arc28Events ?? [])), - ), - ) - ) - // Collapse the filtered transactions into a single array - .flat(), - ) - } - - catchupTransactions = catchupTransactions - // Sort by transaction order - .sort((a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!) - // Collapse duplicate transactions - .reduce(deduplicateSubscribedTransactionsReducer, [] as SubscribedTransaction[]) - - algokit.Config.logger.debug( - `Retrieved ${catchupTransactions.length} transactions from round ${startRound} to round ${ - algodSyncFromRoundNumber - 1 - } via indexer in ${(+new Date() - start) / 1000}s`, - ) - break default: throw new Error('Not implemented') @@ -181,19 +142,11 @@ export async function getSubscribedTransactions( } // Retrieve and process blocks from algod - let algodTransactions: SubscribedTransaction[] = [] + let blockTransactions: TransactionInBlock[] | undefined = undefined if (!skipAlgodSync) { - start = +new Date() + const start = +new Date() const blocks = await getBlocksBulk({ startRound: algodSyncFromRoundNumber, maxRound: endRound }, algod) - const blockTransactions = blocks.flatMap((b) => getBlockTransactions(b.block)) - algodTransactions = filters - .flatMap((f) => - blockTransactions - .filter((t) => transactionFilter(f.filter, arc28Events, subscription.arc28Events ?? [])(t!)) - .map((t) => getIndexerTransactionFromAlgodTransaction(t, f.name)), - ) - .reduce(deduplicateSubscribedTransactionsReducer, []) - + blockTransactions = blocks.flatMap((b) => getBlockTransactions(b.block)) blockMetadata = blocks.map((b) => blockDataToBlockMetadata(b)) algokit.Config.logger.debug( @@ -201,20 +154,154 @@ export async function getSubscribedTransactions( (+new Date() - start) / 1000 }s`, ) - } else { - algokit.Config.logger.debug( - `Skipping algod sync since we have more than ${subscription.maxIndexerRoundsToSync} rounds to sync from indexer.`, - ) } return { + algodSyncRange: !skipAlgodSync ? [algodSyncFromRoundNumber, endRound] : undefined, + indexerSyncRange: indexerSyncToRoundNumber ? [startRound, indexerSyncToRoundNumber] : undefined, syncedRoundRange: [startRound, endRound], newWatermark: endRound, currentRound, blockMetadata, + blockTransactions, + arc28EventGroups: subscription.arc28Events ?? [], + } +} + +/** + * Run indexer catchup for the given filters and poll metadata + * @param filters The filters to apply + * @param pollMetadata The metadata for the poll + * @param indexer The indexer instance + * @returns The set of caught up, filtered transactions + */ +export async function getIndexerCatchupTransactions( + filters: NamedTransactionFilter[], + pollMetadata: SubscriptionPollMetadata, + arc28EventsToProcess: Arc28EventToProcess[], + indexer?: Indexer, +): Promise { + const { indexerSyncRange, arc28EventGroups } = pollMetadata + + if (!indexerSyncRange) { + return [] + } else if (!indexer) { + throw new Error("Can't catch up using indexer since it's not provided") + } + + const [startRound, endRound] = indexerSyncRange + const start = +new Date() + + let catchupTransactions: SubscribedTransaction[] = [] + + const filtersToRetrieve = filters.flatMap((f) => + Array.isArray(f.filter.assetId) + ? f.filter.assetId.map((id) => ({ name: f.name, filter: { ...f.filter, assetId: id } })) + : Array.isArray(f.filter.appId) + ? f.filter.appId.map((id) => ({ name: f.name, filter: { ...f.filter, appId: id } })) + : [f], + ) + + algokit.Config.logger.debug( + `Catching up from round ${startRound} to round ${endRound} via indexer${filtersToRetrieve.length > 10 ? ` for ${filtersToRetrieve.length} paginated searches, 10 at a time` : ''}; this may take a few seconds`, + ) + + // Retrieve and process transactions from indexer in groups of 10 so we don't get rate limited + for (const chunkedFilters of chunkArray(filtersToRetrieve, 10)) { + catchupTransactions = catchupTransactions.concat( + ( + await Promise.all( + // For each filter + chunkedFilters.map(async (f) => + // Retrieve all pre-filtered transactions from the indexer + (await algokit.searchTransactions(indexer, indexerPreFilter(f.filter, startRound, endRound))).transactions + // Re-run the pre-filter in-memory so we properly extract inner transactions + .flatMap((t) => getFilteredIndexerTransactions(t, f)) + // Run the post-filter so we get the final list of matching transactions + .filter(indexerPostFilter(f.filter, arc28EventsToProcess, arc28EventGroups ?? [])), + ), + ) + ) + // Collapse the filtered transactions into a single array + .flat(), + ) + } + + catchupTransactions = catchupTransactions + // Sort by transaction order + .sort((a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!) + // Collapse duplicate transactions + .reduce(deduplicateSubscribedTransactionsReducer, [] as SubscribedTransaction[]) + + algokit.Config.logger.debug( + `Retrieved ${catchupTransactions.length} transactions from round ${startRound} to round ${endRound} via indexer in ${(+new Date() - start) / 1000}s`, + ) + return catchupTransactions +} + +/** + * Run indexer catchup for the given filters and poll metadata + * @param filters The filters to apply + * @param pollMetadata The metadata for the poll + * @param indexer The indexer instance + * @returns The set of caught up, filtered transactions + */ +export async function getAlgodSubscribedTransactions( + filters: NamedTransactionFilter[], + pollMetadata: SubscriptionPollMetadata, + arc28EventsToProcess: Arc28EventToProcess[], +): Promise { + const { algodSyncRange, arc28EventGroups, blockTransactions } = pollMetadata + + if (!algodSyncRange) { + return [] + } else if (blockTransactions === undefined) { + throw new Error("Can't catch up using algod since no block transactions were provided") + } + + const [startRound, endRound] = algodSyncRange + const start = +new Date() + + const algodTransactions = filters + .flatMap((f) => + blockTransactions + .filter((t) => transactionFilter(f.filter, arc28EventsToProcess, arc28EventGroups)(t!)) + .map((t) => getIndexerTransactionFromAlgodTransaction(t, f.name)), + ) + .reduce(deduplicateSubscribedTransactionsReducer, []) + + algokit.Config.logger.debug( + `Processed ${blockTransactions.length} algod transactions for round(s) ${startRound}-${endRound} in ${(+new Date() - start) / 1000}s`, + ) + + return algodTransactions +} + +/** + * Executes a single pull/poll to subscribe to transactions on the configured Algorand + * blockchain for the given subscription context. + * @param subscription The subscription context. + * @param algod An Algod client. + * @param indexer An optional indexer client, only needed when `onMaxRounds` is `catchup-with-indexer`. + * @returns The result of this subscription pull/poll. + */ +export async function getSubscribedTransactions( + subscription: TransactionSubscriptionParams, + algod: Algodv2, + indexer?: Indexer, +): Promise { + const pollMetadata = await prepareSubscriptionPoll(subscription, algod) + const arc28EventsToProcess = getArc28EventsToProcess(subscription.arc28Events ?? []) + const catchupTransactions = await getIndexerCatchupTransactions(subscription.filters, pollMetadata, arc28EventsToProcess, indexer) + const algodTransactions = await getAlgodSubscribedTransactions(subscription.filters, pollMetadata, arc28EventsToProcess) + + return { + syncedRoundRange: pollMetadata.syncedRoundRange, + newWatermark: pollMetadata.newWatermark, + currentRound: pollMetadata.currentRound, subscribedTransactions: catchupTransactions .concat(algodTransactions) - .map((t) => processExtraFields(t, arc28Events, subscription.arc28Events ?? [])), + .map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, subscription.arc28Events ?? [])), } } @@ -226,7 +313,7 @@ function transactionIsInArc28EventGroup(group: Arc28EventGroup, appId: number, t ) } -function processExtraFields( +export function processExtraSubscriptionTransactionFields( transaction: TransactionResult | SubscribedTransaction, arc28Events: Arc28EventToProcess[], arc28Groups: Arc28EventGroup[], @@ -252,7 +339,7 @@ function processExtraFields( ), balanceChanges: extractBalanceChangesFromIndexerTransaction(transaction), 'inner-txns': transaction['inner-txns'] - ? transaction['inner-txns'].map((inner) => processExtraFields(inner, arc28Events, arc28Groups)) + ? transaction['inner-txns'].map((inner) => processExtraSubscriptionTransactionFields(inner, arc28Events, arc28Groups)) : undefined, } } diff --git a/src/types/subscription.ts b/src/types/subscription.ts index a1519db..f725fcc 100644 --- a/src/types/subscription.ts +++ b/src/types/subscription.ts @@ -1,5 +1,6 @@ import type { ApplicationOnComplete, TransactionResult } from '@algorandfoundation/algokit-utils/types/indexer' import algosdk from 'algosdk' +import { TransactionInBlock } from '../transform' import { Arc28EventGroup, EmittedArc28Event } from './arc-28' import TransactionType = algosdk.TransactionType @@ -92,13 +93,31 @@ export enum BalanceChangeRole { } /** Metadata about an impending subscription poll. */ -export interface BeforePollMetadata { +export interface BeforeSubscriptionPollMetadata { /** The current watermark of the subscriber */ watermark: number /** The current round of algod */ currentRound: number } +/** Metadata needed to conduct a single subscription poll. */ +export interface SubscriptionPollMetadata { + /** The range of rounds to sync using algod; if undefined then algod sync not needed. */ + algodSyncRange?: [startRound: number, endRound: number] + /** The range of rounds to sync using indexer; if undefined then indexer sync not needed. */ + indexerSyncRange?: [startRound: number, endRound: number] + /** The range of rounds being synced. */ + syncedRoundRange: [startRound: number, endRound: number] + /** The new watermark to persist after this poll is complete. */ + newWatermark: number + /** The current round according to algod when the poll was started. */ + currentRound: number + /** The full set of transactions from algod for `algodSyncRange` or `undefined` if `algodSyncRange` is `undefined. */ + blockTransactions?: TransactionInBlock[] + /** The set of ARC-28 event groups to process against the subscribed transactions */ + arc28EventGroups: Arc28EventGroup[] +} + /** Common parameters to control a single subscription pull/poll for both `AlgorandSubscriber` and `getSubscribedTransactions`. */ export interface CoreTransactionSubscriptionParams { /** The filter(s) to apply to find transactions of interest. @@ -244,6 +263,30 @@ export interface TransactionSubscriptionParams extends CoreTransactionSubscripti watermark: number } +/** Configuration for a `DynamicAlgorandSubscriber` */ +export interface DynamicAlgorandSubscriberConfig extends Omit { + /** + * A function that returns a set of filters based on a given filter state and hierarchical poll level. + * @param state The filter state to return filters for + * @param pollLevel The hierarchical poll level; starts at 0 and increments by 1 each time a new poll is needed because of filter changes caused by the previous poll + * @param watermark The current watermark being processed + * @returns The set of filters to subscribe to / emit events for + */ + dynamicFilters: (state: T, pollLevel: number, watermark: number) => Promise[]> + + /** Methods to retrieve and persist the current filter state so syncing is resilient */ + filterStatePersistence: { + /** Returns the current filter state that syncing has previously been processed to */ + get: () => Promise + /** Persist the new filter state that has been created */ + set: (newState: T) => Promise + } + /** The frequency to poll for new blocks in seconds; defaults to 1s */ + frequencyInSeconds?: number + /** Whether to wait via algod `/status/wait-for-block-after` endpoint when at the tip of the chain; reduces latency of subscription */ + waitForBlockWhenAtTip?: boolean +} + /** Configuration for an `AlgorandSubscriber`. */ export interface AlgorandSubscriberConfig extends CoreTransactionSubscriptionParams { /** The set of filters to subscribe to / emit events for, along with optional data mappers. */ From 6345ea1dd1a1d5262071fe3925ce9f6d1c7ff32b Mon Sep 17 00:00:00 2001 From: "Rob Moore (MakerX)" Date: Wed, 3 Apr 2024 10:55:44 +0800 Subject: [PATCH 2/2] tests: Testing dynamic subscriber --- src/dynamic-subscriber.ts | 1 + src/subscriptions.ts | 1 + src/types/subscription.ts | 11 +- tests/scenarios/dynamic-subscriber.spec.ts | 366 +++++++++++++++++++++ 4 files changed, 377 insertions(+), 2 deletions(-) create mode 100644 tests/scenarios/dynamic-subscriber.spec.ts diff --git a/src/dynamic-subscriber.ts b/src/dynamic-subscriber.ts index 13c1550..364fdc3 100644 --- a/src/dynamic-subscriber.ts +++ b/src/dynamic-subscriber.ts @@ -112,6 +112,7 @@ export class DynamicAlgorandSubscriber extends AlgorandSubscriber { syncedRoundRange: pollMetadata.syncedRoundRange, newWatermark: pollMetadata.newWatermark, currentRound: pollMetadata.currentRound, + blockMetadata: pollMetadata.blockMetadata, subscribedTransactions: subscribedTransactions.sort( (a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!, ), diff --git a/src/subscriptions.ts b/src/subscriptions.ts index 938d63b..198afb2 100644 --- a/src/subscriptions.ts +++ b/src/subscriptions.ts @@ -299,6 +299,7 @@ export async function getSubscribedTransactions( syncedRoundRange: pollMetadata.syncedRoundRange, newWatermark: pollMetadata.newWatermark, currentRound: pollMetadata.currentRound, + blockMetadata: pollMetadata.blockMetadata, subscribedTransactions: catchupTransactions .concat(algodTransactions) .map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, subscription.arc28Events ?? [])), diff --git a/src/types/subscription.ts b/src/types/subscription.ts index f725fcc..84b1846 100644 --- a/src/types/subscription.ts +++ b/src/types/subscription.ts @@ -1,7 +1,7 @@ import type { ApplicationOnComplete, TransactionResult } from '@algorandfoundation/algokit-utils/types/indexer' import algosdk from 'algosdk' -import { TransactionInBlock } from '../transform' import { Arc28EventGroup, EmittedArc28Event } from './arc-28' +import { TransactionInBlock } from './block' import TransactionType = algosdk.TransactionType /** The result of a single subscription pull/poll. */ @@ -116,6 +116,10 @@ export interface SubscriptionPollMetadata { blockTransactions?: TransactionInBlock[] /** The set of ARC-28 event groups to process against the subscribed transactions */ arc28EventGroups: Arc28EventGroup[] + /** The metadata about any blocks that were retrieved from algod as part + * of the subscription poll. + */ + blockMetadata?: BlockMetadata[] } /** Common parameters to control a single subscription pull/poll for both `AlgorandSubscriber` and `getSubscribedTransactions`. */ @@ -263,6 +267,9 @@ export interface TransactionSubscriptionParams extends CoreTransactionSubscripti watermark: number } +/** A function that returns a set of filters based on a given filter state and hierarchical poll level. */ +export type DynamicFilterLambda = (state: T, pollLevel: number, watermark: number) => Promise[]> + /** Configuration for a `DynamicAlgorandSubscriber` */ export interface DynamicAlgorandSubscriberConfig extends Omit { /** @@ -272,7 +279,7 @@ export interface DynamicAlgorandSubscriberConfig extends Omit Promise[]> + dynamicFilters: DynamicFilterLambda /** Methods to retrieve and persist the current filter state so syncing is resilient */ filterStatePersistence: { diff --git a/tests/scenarios/dynamic-subscriber.spec.ts b/tests/scenarios/dynamic-subscriber.spec.ts new file mode 100644 index 0000000..ccd5d3c --- /dev/null +++ b/tests/scenarios/dynamic-subscriber.spec.ts @@ -0,0 +1,366 @@ +/* eslint-disable no-console */ +import * as algokit from '@algorandfoundation/algokit-utils' +import { algorandFixture } from '@algorandfoundation/algokit-utils/testing' +import { SendTransactionFrom } from '@algorandfoundation/algokit-utils/types/transaction' +import { Algodv2, Indexer } from 'algosdk' +import { afterEach, beforeEach, describe, expect, test, vitest } from 'vitest' +import { DynamicAlgorandSubscriber } from '../../src' +import { DynamicAlgorandSubscriberConfig, DynamicFilterLambda } from '../../src/types' +import { SendXTransactions } from '../transactions' +import { waitFor } from '../wait' +import { InMemoryWatermark } from '../watermarks' + +describe('DynamicAlgorandSubscriber', () => { + const localnet = algorandFixture() + + beforeEach(localnet.beforeEach, 10e6) + afterEach(() => { + vitest.clearAllMocks() + }) + + const InMemoryFilterState = (get: () => T, set: (s: T) => void) => ({ + set: async (s: T) => { + set(s) + }, + get: async () => get(), + }) + + const getSubscriber = ( + config: { + testAccount: SendTransactionFrom + initialFilterState: T + filters: DynamicFilterLambda + configOverrides?: Partial> + initialWatermark?: number + }, + algod: Algodv2, + indexer?: Indexer, + ) => { + let watermark = config.initialWatermark ?? 0 + let filterState = config.initialFilterState + const subscribedTxns: string[] = [] + + const subscriber = new DynamicAlgorandSubscriber( + { + ...config.configOverrides, + dynamicFilters: config.filters, + syncBehaviour: config.configOverrides?.syncBehaviour ?? 'sync-oldest', + watermarkPersistence: InMemoryWatermark( + () => watermark, + (w) => (watermark = w), + ), + filterStatePersistence: InMemoryFilterState( + () => filterState, + (s) => (filterState = s), + ), + }, + algod, + indexer, + ) + return { + subscriber, + subscribedTestAccountTxns: subscribedTxns, + getWatermark: () => watermark, + } + } + + test('Subscribes to transactions correctly when controlling polling', async () => { + const { algod, testAccount, generateAccount } = localnet.context + const { lastTxnRound, txIds } = await SendXTransactions(1, testAccount, algod) + const { + subscriber, + subscribedTestAccountTxns: subscribedTxns, + getWatermark, + } = getSubscriber({ testAccount, initialWatermark: lastTxnRound - 1 }, algod) + + // Initial catch up with indexer + const result = await subscriber.pollOnce() + expect(subscribedTxns.length).toBe(1) + expect(subscribedTxns[0]).toBe(txIds[0]) + expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound) + expect(result.currentRound).toBeGreaterThanOrEqual(lastTxnRound) + expect(result.newWatermark).toBe(result.currentRound) + expect(result.syncedRoundRange).toEqual([lastTxnRound, result.currentRound]) + expect(result.subscribedTransactions.length).toBe(1) + expect(result.subscribedTransactions.map((t) => t.id)).toEqual(txIds) + + // Random transaction + const { lastTxnRound: lastTxnRound2 } = await SendXTransactions(1, await generateAccount({ initialFunds: (3).algos() }), algod) + await subscriber.pollOnce() + expect(subscribedTxns.length).toBe(1) + expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound2) + + // Another subscribed transaction + const { lastTxnRound: lastTxnRound3, txIds: txIds3 } = await SendXTransactions(1, testAccount, algod) + await subscriber.pollOnce() + expect(subscribedTxns.length).toBe(2) + expect(subscribedTxns[1]).toBe(txIds3[0]) + expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound3) + }) + + test('Subscribes to transactions with multiple filters correctly', async () => { + const { algod, testAccount, generateAccount } = localnet.context + const randomAccount = await generateAccount({ initialFunds: (3).algos() }) + const senders = [await generateAccount({ initialFunds: (5).algos() }), await generateAccount({ initialFunds: (5).algos() })] + const sender1TxnIds: string[] = [] + let sender1TxnIdsfromBatch: string[] = [] + const sender2Rounds: number[] = [] + let sender2RoundsfromBatch: number[] = [] + const { lastTxnRound: firstTxnRound, txIds } = await SendXTransactions(1, testAccount, algod) + const { txIds: txIds1 } = await SendXTransactions(2, senders[0], algod) + const { lastTxnRound, txIds: txIds2, txns: txns2 } = await SendXTransactions(2, senders[1], algod) + const { subscriber, getWatermark } = getSubscriber( + { + testAccount, + initialWatermark: firstTxnRound - 1, + configOverrides: { + maxRoundsToSync: 100, + filters: [ + { + name: 'sender1', + filter: { + sender: algokit.getSenderAddress(senders[0]), + }, + mapper: (txs) => Promise.resolve(txs.map((t) => t.id)), + }, + { + name: 'sender2', + filter: { + sender: algokit.getSenderAddress(senders[1]), + }, + mapper: (txs) => Promise.resolve(txs.map((t) => t['confirmed-round']!)), + }, + ], + }, + }, + algod, + ) + subscriber.onBatch('sender1', (r) => { + sender1TxnIdsfromBatch = r + }) + subscriber.on('sender1', (r) => { + sender1TxnIds.push(r) + }) + subscriber.onBatch('sender2', (r) => { + sender2RoundsfromBatch = r + }) + subscriber.on('sender2', (r) => { + sender2Rounds.push(r) + }) + + // Initial catch up + const result = await subscriber.pollOnce() + console.log( + `Synced ${result.subscribedTransactions.length} transactions from rounds ${result.syncedRoundRange[0]}-${result.syncedRoundRange[1]} when current round is ${result.currentRound}`, + result.subscribedTransactions.map((t) => t.id), + ) + const subscribedTxns = result.subscribedTransactions + expect(subscribedTxns.length).toBe(5) + expect(subscribedTxns[0].id).toBe(txIds[0]) + expect(subscribedTxns[1].id).toBe(txIds1[0]) + expect(subscribedTxns[2].id).toBe(txIds1[1]) + expect(subscribedTxns[3].id).toBe(txIds2[0]) + expect(subscribedTxns[4].id).toBe(txIds2[1]) + expect(result.currentRound).toBeGreaterThanOrEqual(lastTxnRound) + expect(result.newWatermark).toBe(result.currentRound) + expect(getWatermark()).toBeGreaterThanOrEqual(result.currentRound) + expect(result.syncedRoundRange).toEqual([firstTxnRound, result.currentRound]) + expect(result.subscribedTransactions.length).toBe(5) + expect(result.subscribedTransactions.map((t) => t.id)).toEqual(txIds.concat(txIds1, txIds2)) + expect(sender1TxnIds).toEqual(txIds1) + expect(sender1TxnIdsfromBatch).toEqual(sender1TxnIds) + expect(sender2Rounds).toEqual(txns2.map((t) => Number(t.confirmation!.confirmedRound!))) + expect(sender2RoundsfromBatch).toEqual(sender2Rounds) + + // Random transaction + const { lastTxnRound: lastTxnRound2 } = await SendXTransactions(1, randomAccount, algod) + const result2 = await subscriber.pollOnce() + expect(result2.subscribedTransactions.length).toBe(0) + expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound2) + + // More subscribed transactions + const { txIds: txIds3 } = await SendXTransactions(1, testAccount, algod) + const { txIds: txIds13 } = await SendXTransactions(2, senders[0], algod) + const { lastTxnRound: lastSubscribedRound3, txIds: txIds23, txns: txns23 } = await SendXTransactions(2, senders[1], algod) + + const result3 = await subscriber.pollOnce() + console.log( + `Synced ${result3.subscribedTransactions.length} transactions from rounds ${result3.syncedRoundRange[0]}-${result3.syncedRoundRange[1]} when current round is ${result3.currentRound}`, + result3.subscribedTransactions.map((t) => t.id), + ) + const subscribedTxns3 = result3.subscribedTransactions + expect(subscribedTxns3.length).toBe(5) + expect(subscribedTxns3[0].id).toBe(txIds3[0]) + expect(subscribedTxns3[1].id).toBe(txIds13[0]) + expect(subscribedTxns3[2].id).toBe(txIds13[1]) + expect(subscribedTxns3[3].id).toBe(txIds23[0]) + expect(subscribedTxns3[4].id).toBe(txIds23[1]) + expect(result3.currentRound).toBeGreaterThanOrEqual(lastSubscribedRound3) + expect(result3.newWatermark).toBe(result3.currentRound) + expect(getWatermark()).toBeGreaterThanOrEqual(result3.currentRound) + expect(result3.syncedRoundRange).toEqual([result2.newWatermark + 1, result3.currentRound]) + expect(result3.subscribedTransactions.length).toBe(5) + expect(result3.subscribedTransactions.map((t) => t.id)).toEqual(txIds3.concat(txIds13, txIds23)) + expect(sender1TxnIds).toEqual(txIds1.concat(txIds13)) + expect(sender1TxnIdsfromBatch).toEqual(txIds13) + expect(sender2Rounds).toEqual( + txns2.map((t) => Number(t.confirmation!.confirmedRound!)).concat(txns23.map((t) => Number(t.confirmation!.confirmedRound!))), + ) + expect(sender2RoundsfromBatch).toEqual(txns23.map((t) => Number(t.confirmation!.confirmedRound!))) + }) + + test('Subscribes to transactions at regular intervals when started and can be stopped', async () => { + const { algod, testAccount } = localnet.context + const { lastTxnRound, txIds } = await SendXTransactions(1, testAccount, algod) + const { + subscriber, + subscribedTestAccountTxns: subscribedTxns, + getWatermark, + } = getSubscriber( + { testAccount, configOverrides: { maxRoundsToSync: 1, frequencyInSeconds: 0.1 }, initialWatermark: lastTxnRound - 1 }, + algod, + ) + const roundsSynced: number[] = [] + + console.log('Starting subscriber') + subscriber.start((r) => roundsSynced.push(r.currentRound)) + + console.log('Waiting for ~0.5s') + await new Promise((resolve) => setTimeout(resolve, 500)) + const pollCountBeforeStopping = roundsSynced.length + + console.log('Stopping subscriber') + await subscriber.stop('TEST') + const pollCountAfterStopping = roundsSynced.length + + // Assert + expect(subscribedTxns.length).toBe(1) + expect(subscribedTxns[0]).toBe(txIds[0]) + expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound) + // Polling frequency is 0.1s and we waited ~0.5s, LocalNet latency is low so expect 3-7 polls + expect(pollCountBeforeStopping).toBeGreaterThanOrEqual(3) + expect(pollCountBeforeStopping).toBeLessThanOrEqual(7) + // Expect no more than 1 extra poll after we called stop + expect(pollCountAfterStopping - pollCountBeforeStopping).toBeLessThanOrEqual(1) + }) + + test('Waits until transaction appears by default when started', async () => { + const { algod, testAccount } = localnet.context + const currentRound = (await algod.status().do())['last-round'] as number + const { + subscriber, + subscribedTestAccountTxns: subscribedTxns, + getWatermark, + } = getSubscriber( + { + testAccount, + // Polling for 10s means we are definitely testing the algod waiting works + configOverrides: { frequencyInSeconds: 10, waitForBlockWhenAtTip: true, syncBehaviour: 'sync-oldest' }, + initialWatermark: currentRound - 1, + }, + algod, + ) + const roundsSynced: number[] = [] + + console.log('Starting subscriber') + subscriber.start((r) => roundsSynced.push(r.currentRound)) + + console.log('Waiting for up to 2s until subscriber has caught up to tip of chain') + await waitFor(() => roundsSynced.length > 0, 2000) + + console.log('Issuing transaction') + const pollCountBeforeIssuing = roundsSynced.length + const { lastTxnRound, txIds } = await SendXTransactions(1, testAccount, algod) + + console.log(`Waiting for up to 2s for round ${lastTxnRound} to get processed`) + await waitFor(() => subscribedTxns.length === 1, 5000) + const pollCountAfterIssuing = roundsSynced.length + + console.log('Stopping subscriber') + await subscriber.stop('TEST') + + // Assert + expect(subscribedTxns.length).toBe(1) + expect(subscribedTxns[0]).toBe(txIds[0]) + expect(getWatermark()).toBeGreaterThanOrEqual(lastTxnRound) + // Expect at least 1 poll to have occurred + expect(pollCountAfterIssuing - pollCountBeforeIssuing).toBeGreaterThanOrEqual(1) + }) + + test('Correctly fires various on* methods', async () => { + const { algod, testAccount, generateAccount } = localnet.context + const randomAccount = await generateAccount({ initialFunds: (3).algos() }) + const { txns, txIds } = await SendXTransactions(2, testAccount, algod) + const { txIds: txIds2 } = await SendXTransactions(2, randomAccount, algod) + const initialWatermark = Number(txns[0].confirmation!.confirmedRound!) - 1 + const eventsEmitted: string[] = [] + let pollComplete = false + const { subscriber } = getSubscriber( + { + testAccount: algokit.randomAccount(), + initialWatermark, + configOverrides: { + maxRoundsToSync: 100, + syncBehaviour: 'sync-oldest', + frequencyInSeconds: 1000, + filters: [ + { + name: 'account1', + filter: { + sender: algokit.getSenderAddress(testAccount), + }, + }, + { + name: 'account2', + filter: { + sender: algokit.getSenderAddress(randomAccount), + }, + }, + ], + }, + }, + algod, + ) + subscriber + .onBatch('account1', (b) => { + eventsEmitted.push(`batch:account1:${b.map((b) => b.id).join(':')}`) + }) + .on('account1', (t) => { + eventsEmitted.push(`account1:${t.id}`) + }) + .onBatch('account2', (b) => { + eventsEmitted.push(`batch:account2:${b.map((b) => b.id).join(':')}`) + }) + .on('account2', (t) => { + eventsEmitted.push(`account2:${t.id}`) + }) + .onBeforePoll((metadata) => { + eventsEmitted.push(`before:poll:${metadata.watermark}`) + }) + .onPoll((result) => { + eventsEmitted.push(`poll:${result.subscribedTransactions.map((b) => b.id).join(':')}`) + }) + + subscriber.start((result) => { + eventsEmitted.push(`inspect:${result.subscribedTransactions.map((b) => b.id).join(':')}`) + pollComplete = true + }) + + console.log('Waiting for up to 2s until subscriber has polled') + await waitFor(() => pollComplete, 2000) + + const expectedBatchResult = `${txIds[0]}:${txIds[1]}:${txIds2[0]}:${txIds2[1]}` + expect(eventsEmitted).toEqual([ + `before:poll:${initialWatermark}`, + `batch:account1:${txIds[0]}:${txIds[1]}`, + `account1:${txIds[0]}`, + `account1:${txIds[1]}`, + `batch:account2:${txIds2[0]}:${txIds2[1]}`, + `account2:${txIds2[0]}`, + `account2:${txIds2[1]}`, + `inspect:${expectedBatchResult}`, + `poll:${expectedBatchResult}`, + ]) + await subscriber.stop('TEST') + }) +})