diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index b36fbade..4c5feb3a 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -29,6 +29,11 @@ export interface JobQueueConfig { // Block range in which logs are fetched during historical blocks processing historicalLogsBlockRange?: number; + // Factor to find the next multiple endBlock number when deciding eth_getLogs block range + // Used to fetch logs in aligned block ranges (paired with caching proxy server) + // If set to 0, historicalLogsBlockRange will be used to instead to decide block ranges + historicalLogsBlockRangeEndFactor?: number + // Max block range of historical processing after which it waits for completion of events processing // If set to -1 historical processing does not wait for events processing and completes till latest canonical block historicalMaxFetchAhead?: number; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index ace4da5a..c91ec9e9 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -152,6 +152,18 @@ export class EventWatcher { endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber); } + if (this._config.jobQueue.historicalLogsBlockRangeEndFactor) { + // Set endBlockNumber to a multiple lower than computed endBlockNumber (or canonical block) + // For using aligned block ranges + endBlockNumber = Math.floor(endBlockNumber / this._config.jobQueue.historicalLogsBlockRangeEndFactor) * this._config.jobQueue.historicalLogsBlockRangeEndFactor; + } + + if (endBlockNumber < startBlockNumber) { + await this.startRealtimeBlockProcessing(startBlockNumber); + + return; + } + this._historicalProcessingEndBlockNumber = endBlockNumber; log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`); diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 0fd0f0ba..9b6bc600 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -469,10 +469,27 @@ export class Indexer { topics }); - const blockLogsMap = this._reduceLogsToBlockLogsMap(logs); - // Create unique list of tx required + let blockLogsMap = this._reduceLogsToBlockLogsMap(logs); + + // Filter blocks which have no events from watched contracts + blockLogsMap = Array.from(blockLogsMap.entries()) + .filter(([, logs]) => { + return logs.some(log => { + const contractAddress = ethers.utils.getAddress(log.account.address); + return this.isContractAddressWatched(contractAddress)?.length; + }); + }) + .reduce((acc, [blockHash, logs]) => { + acc.set(blockHash, logs); + return acc; + }, new Map()); + + // Create unique list of txs required const txHashes = Array.from([ - ...new Set(logs.map((log: any) => log.transaction.hash)) + ...new Set( + Array.from(blockLogsMap.values()) + .flat() + .map((log: any) => log.transaction.hash)) ]); // Fetch blocks with transactions for the logs returned @@ -543,7 +560,7 @@ export class Indexer { return blocksWithDbEvents; } - _reduceLogsToBlockLogsMap (logs: any[]): Map { + _reduceLogsToBlockLogsMap (logs: any[]): Map { return logs.reduce((acc: Map, log: any) => { const { blockHash: logBlockHash } = log; assert(typeof logBlockHash === 'string'); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index f544c290..4964ac16 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -252,7 +252,15 @@ export class JobRunner { this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber; const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE; - const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber); + let rangeEndBlock = startBlock + logsBlockRange; + + if (this._jobQueueConfig.historicalLogsBlockRangeEndFactor) { + // Set rangeEndBlock to the next multiple of historicalLogsBlockRangeEndFactor after startBlock number + // For using aligned block ranges + rangeEndBlock = Math.ceil(startBlock / this._jobQueueConfig.historicalLogsBlockRangeEndFactor) * this._jobQueueConfig.historicalLogsBlockRangeEndFactor; + } + + const endBlock = Math.min(rangeEndBlock, processingEndBlockNumber); log(`Processing historical blocks from ${startBlock} to ${endBlock}`); const blocks = await fetchAndSaveFilteredLogsAndBlocks( @@ -720,9 +728,9 @@ export class JobRunner { this._blockAndEventsMap.delete(block.blockHash); - // Check if new contract was added and filterLogsByAddresses is set to true - if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) { - // Check if historical processing is running and that current block is being processed was trigerred by historical processing + // Check if new contract was added + if (isNewContractWatched) { + // Check if historical processing is running and that current block being processed was trigerred by historical processing if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) { const nextBlockNumberToProcess = block.blockNumber + 1;