Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dynamic subscription filters #45

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions examples/data-history-museum-with-ownership/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import * as algokit from '@algorandfoundation/algokit-utils'
import { TransactionResult } from '@algorandfoundation/algokit-utils/types/indexer'
import algosdk from 'algosdk'
import fs from 'fs'
import path from 'path'
import { DynamicAlgorandSubscriber } from '../../src'
import TransactionType = algosdk.TransactionType

if (!fs.existsSync(path.join(__dirname, '..', '..', '.env')) && !process.env.ALGOD_SERVER) {
// eslint-disable-next-line no-console
console.error('Copy /.env.sample to /.env before starting the application.')
process.exit(1)
}

interface DHMAsset {
id: number
name: string
unit: string
mediaUrl: string
metadata: Record<string, unknown>
created: string
lastModified: string
owner: string
ownerModified: string
}

interface DHMFilterState {
assetIds: number[]
}

async function getDHMSubscriber() {
const algod = await algokit.getAlgoClient()
const indexer = await algokit.getAlgoIndexerClient()
const subscriber = new DynamicAlgorandSubscriber<DHMFilterState>(
{
maxIndexerRoundsToSync: 10_000_000,
dynamicFilters: async (filterState, pollLevel) => [
...(pollLevel === 0
? [
{
name: 'dhm-asset',
filter: {
type: TransactionType.acfg,
// Data History Museum creator accounts
sender: (await algokit.isTestNet(algod))
? 'ER7AMZRPD5KDVFWTUUVOADSOWM4RQKEEV2EDYRVSA757UHXOIEKGMBQIVU'
: 'EHYQCYHUC6CIWZLBX5TDTLVJ4SSVE4RRTMKFDCG4Z4Q7QSQ2XWIQPMKBPU',
},
},
]
: []),
...(filterState.assetIds.length > 0
? [
{
name: 'dhm-ownership-change',
filter: {
type: TransactionType.axfer,
assetId: filterState.assetIds,
minAmount: 1,
},
},
]
: []),
],
filterStatePersistence: {
get: getFilterState,
set: saveFilterState,
},
frequencyInSeconds: 1,
maxRoundsToSync: 500,
syncBehaviour: 'catchup-with-indexer',
watermarkPersistence: {
get: getLastWatermark,
set: saveWatermark,
},
},
algod,
indexer,
)
subscriber.onBatch('dhm-asset', async (events) => {
// eslint-disable-next-line no-console
console.log(`Received ${events.length} asset changes (${events.filter((t) => t['created-asset-index']).length} new assets)`)

// Append any new asset ids to the filter state so ownership is picked up of them
subscriber.appendFilterState({ assetIds: events.filter((e) => e['created-asset-index']).map((e) => e['created-asset-index']!) })
})
subscriber.onBatch('dhm-ownership-change', async (events) => {
// eslint-disable-next-line no-console
console.log(`Received ${events.length} ownership changes`)
})
subscriber.onPoll(async (pollMetadata) => {
// Save all of the Data History Museum Verifiably Authentic Digital Historical Artifacts
await saveDHMTransactions(pollMetadata.subscribedTransactions)
})
return subscriber
}

function getArc69Metadata(t: TransactionResult) {
let metadata = {}
try {
if (t.note && t.note.startsWith('ey')) metadata = JSON.parse(Buffer.from(t.note, 'base64').toString('utf-8'))
// eslint-disable-next-line no-empty
} catch (e) {}
return metadata
}

async function saveDHMTransactions(transactions: TransactionResult[]) {
const assets = await getSavedTransactions<DHMAsset>('dhm-assets.json')

for (const t of transactions) {
if (t['created-asset-index']) {
assets.push({
id: t['created-asset-index'],
name: t['asset-config-transaction']!.params!.name!,
unit: t['asset-config-transaction']!.params!['unit-name']!,
mediaUrl: t['asset-config-transaction']!.params!.url!,
metadata: getArc69Metadata(t),
created: new Date(t['round-time']! * 1000).toISOString(),
lastModified: new Date(t['round-time']! * 1000).toISOString(),
owner: t.sender,
ownerModified: new Date(t['round-time']! * 1000).toISOString(),
})
} else if (t['asset-config-transaction']) {
const asset = assets.find((a) => a.id === t['asset-config-transaction']!['asset-id'])
if (!asset) {
// eslint-disable-next-line no-console
console.error(t)
throw new Error(`Unable to find existing asset data for ${t['asset-config-transaction']!['asset-id']}`)
}
if (!t['asset-config-transaction']!.params) {
// Asset was deleted, remove it
assets.splice(assets.indexOf(asset), 1)
} else {
asset!.metadata = getArc69Metadata(t)
asset!.lastModified = new Date(t['round-time']! * 1000).toISOString()
}
} else if (t['asset-transfer-transaction']) {
const asset = assets.find((a) => a.id === t['asset-transfer-transaction']!['asset-id'])
if (!asset) {
// eslint-disable-next-line no-console
console.error(t)
throw new Error(`Unable to find existing asset data for ${t['asset-transfer-transaction']!['asset-id']}`)
}
if (t['asset-transfer-transaction'].amount > 0) {
asset.owner = t['asset-transfer-transaction']!.receiver
asset.ownerModified = new Date(t['round-time']! * 1000).toISOString()
}
}
}

await saveTransactions(assets, 'dhm-assets.json')
}

// Basic methods that persist using filesystem - for illustrative purposes only

async function saveFilterState(state: DHMFilterState) {
fs.writeFileSync(path.join(__dirname, 'filters.json'), JSON.stringify(state), { encoding: 'utf-8' })
}

async function getFilterState(): Promise<DHMFilterState> {
if (!fs.existsSync(path.join(__dirname, 'filters.json'))) return { assetIds: [] }
const existing = fs.readFileSync(path.join(__dirname, 'filters.json'), 'utf-8')
const existingData = JSON.parse(existing) as DHMFilterState
// eslint-disable-next-line no-console
console.log(`Found existing filter state in filters.json; syncing with ${existingData.assetIds.length} assets`)
return existingData
}

async function saveWatermark(watermark: number) {
fs.writeFileSync(path.join(__dirname, 'watermark.txt'), watermark.toString(), { encoding: 'utf-8' })
}

async function getLastWatermark(): Promise<number> {
if (!fs.existsSync(path.join(__dirname, 'watermark.txt'))) return 15_000_000
const existing = fs.readFileSync(path.join(__dirname, 'watermark.txt'), 'utf-8')
// eslint-disable-next-line no-console
console.log(`Found existing sync watermark in watermark.txt; syncing from ${existing}`)
return Number(existing)
}

async function getSavedTransactions<T>(fileName: string): Promise<T[]> {
const existing = fs.existsSync(path.join(__dirname, fileName))
? (JSON.parse(fs.readFileSync(path.join(__dirname, fileName), 'utf-8')) as T[])
: []
return existing
}

async function saveTransactions(transactions: unknown[], fileName: string) {
fs.writeFileSync(path.join(__dirname, fileName), JSON.stringify(transactions, undefined, 2), { encoding: 'utf-8' })
// eslint-disable-next-line no-console
console.log(`Saved ${transactions.length} transactions to ${fileName}`)
}

// eslint-disable-next-line no-console
process.on('uncaughtException', (e) => console.error(e))
;(async () => {
const subscriber = await getDHMSubscriber()

if (process.env.RUN_LOOP === 'true') {
subscriber.start()
;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) =>
process.on(signal, () => {
// eslint-disable-next-line no-console
console.log(`Received ${signal}; stopping subscriber...`)
subscriber.stop(signal)
}),
)
} else {
await subscriber.pollOnce()
}
})().catch((e) => {
// eslint-disable-next-line no-console
console.error(e)
})
136 changes: 136 additions & 0 deletions src/dynamic-subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import * as algokit from '@algorandfoundation/algokit-utils'
import algosdk from 'algosdk'
import { AlgorandSubscriber } from './subscriber'
import {
getAlgodSubscribedTransactions,
getArc28EventsToProcess,
getIndexerCatchupTransactions,
prepareSubscriptionPoll,
processExtraSubscriptionTransactionFields,
} from './subscriptions'
import type {
DynamicAlgorandSubscriberConfig,
NamedTransactionFilter,
SubscribedTransaction,
TransactionSubscriptionResult,
} from './types/subscription'
import Algodv2 = algosdk.Algodv2
import Indexer = algosdk.Indexer

export class DynamicAlgorandSubscriber<T> extends AlgorandSubscriber {
private pendingStateChanges: { action: 'append' | 'delete' | 'set'; stateChange: Partial<T> }[] = []
private dynamicConfig: DynamicAlgorandSubscriberConfig<T>

constructor(config: DynamicAlgorandSubscriberConfig<T>, algod: Algodv2, indexer?: Indexer) {
super(
{
filters: [],
...config,
},
algod,
indexer,
)
this.dynamicConfig = config
}

protected override async _pollOnce(watermark: number): Promise<TransactionSubscriptionResult> {
let subscribedTransactions: SubscribedTransaction[] = []
let filterState: T = await this.dynamicConfig.filterStatePersistence.get()

const subscribe = async (filters: NamedTransactionFilter[]) => {
if (filters.length === 0) return []
const catchupTransactions = await getIndexerCatchupTransactions(filters, pollMetadata, arc28EventsToProcess, this.indexer)
const algodTransactions = await getAlgodSubscribedTransactions(filters, pollMetadata, arc28EventsToProcess)
const subscribedTransactions = catchupTransactions
.concat(algodTransactions)
.map((t) => processExtraSubscriptionTransactionFields(t, arc28EventsToProcess, this.config.arc28Events ?? []))
await this._processFilters({ subscribedTransactions, ...pollMetadata })
return subscribedTransactions
}

const filters = await this.dynamicConfig.dynamicFilters(filterState, 0, watermark)
this.filterNames = filters
.map((f) => f.name)
.filter((value, index, self) => {
// Remove duplicates
return self.findIndex((x) => x === value) === index
})
const pollMetadata = await prepareSubscriptionPoll({ ...this.config, watermark, filters }, this.algod)
const arc28EventsToProcess = getArc28EventsToProcess(this.config.arc28Events ?? [])

subscribedTransactions = await subscribe(filters)

let pollLevel = 0
while (this.pendingStateChanges.length > 0) {
const stateChangeCount = this.pendingStateChanges.length
let filterStateToProcess = { ...filterState }
for (const change of this.pendingStateChanges) {
switch (change.action) {
case 'append':
for (const key of Object.keys(change.stateChange)) {
const k = key as keyof T
if (!filterState[k] || !Array.isArray(filterState[k])) {
filterState[k] = change.stateChange[k]!
} else {
filterState[k] = (filterState[k] as unknown[]).concat(change.stateChange[k]) as T[keyof T]
}
}
filterStateToProcess = { ...filterStateToProcess, ...change.stateChange }
break
case 'delete':
for (const key of Object.keys(change.stateChange)) {
const k = key as keyof T
delete filterState[k]
delete filterStateToProcess[k]
}
break
case 'set':
filterState = { ...filterState, ...change.stateChange }
filterStateToProcess = { ...filterState, ...change.stateChange }
break
}
}
this.pendingStateChanges = []
const newFilters = await this.dynamicConfig.dynamicFilters(filterStateToProcess, ++pollLevel, watermark)
this.filterNames = newFilters
.map((f) => f.name)
.filter((value, index, self) => {
// Remove duplicates
return self.findIndex((x) => x === value) === index
})

algokit.Config.logger.debug(
`Poll level ${pollLevel}: Found ${stateChangeCount} pending state changes and applied them to get ${newFilters.length} filters; syncing...`,
)

subscribedTransactions = subscribedTransactions.concat(await subscribe(newFilters))
}

await this.dynamicConfig.filterStatePersistence.set(filterState)

return {
syncedRoundRange: pollMetadata.syncedRoundRange,
newWatermark: pollMetadata.newWatermark,
currentRound: pollMetadata.currentRound,
blockMetadata: pollMetadata.blockMetadata,
subscribedTransactions: subscribedTransactions.sort(
(a, b) => a['confirmed-round']! - b['confirmed-round']! || a['intra-round-offset']! - b['intra-round-offset']!,
),
}
}

appendFilterState(stateChange: Partial<T>) {
this.pendingStateChanges.push({ action: 'append', stateChange })
}

deleteFilterState(stateChange: (keyof T)[]) {
this.pendingStateChanges.push({
action: 'delete',
stateChange: stateChange.reduce((acc, key) => ({ ...acc, [key]: true }), {} as Partial<T>),
})
}

setFilterState(stateChange: Partial<T>) {
this.pendingStateChanges.push({ action: 'set', stateChange })
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './dynamic-subscriber'
export * from './subscriber'
export * from './subscriptions'
Loading
Loading