Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use aligned block ranges in eth_getLogs requests during historical processing #527

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/util/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ export interface JobQueueConfig {
// Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange?: number;

// Factor to find the next multiple endBlock number when deciding eth_getLogs block range
// Used to fetch logs in aligned block ranges (paired with caching proxy server)
// If set to 0, historicalLogsBlockRange will be used to instead to decide block ranges
historicalLogsBlockRangeEndFactor?: number

// Max block range of historical processing after which it waits for completion of events processing
// If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead?: number;
Expand Down
12 changes: 12 additions & 0 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ export class EventWatcher {
endBlockNumber = Math.min(startBlockNumber + historicalMaxFetchAhead, endBlockNumber);
}

if (this._config.jobQueue.historicalLogsBlockRangeEndFactor) {
// Set endBlockNumber to a multiple lower than computed endBlockNumber (or canonical block)
// For using aligned block ranges
endBlockNumber = Math.floor(endBlockNumber / this._config.jobQueue.historicalLogsBlockRangeEndFactor) * this._config.jobQueue.historicalLogsBlockRangeEndFactor;
}

if (endBlockNumber < startBlockNumber) {
await this.startRealtimeBlockProcessing(startBlockNumber);

return;
}

this._historicalProcessingEndBlockNumber = endBlockNumber;
log(`Starting historical block processing in batches from ${startBlockNumber} up to block ${this._historicalProcessingEndBlockNumber}`);

Expand Down
25 changes: 21 additions & 4 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,27 @@ export class Indexer {
topics
});

const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);
// Create unique list of tx required
let blockLogsMap = this._reduceLogsToBlockLogsMap(logs);

// Filter blocks which have no events from watched contracts
blockLogsMap = Array.from(blockLogsMap.entries())
.filter(([, logs]) => {
return logs.some(log => {
const contractAddress = ethers.utils.getAddress(log.account.address);
return this.isContractAddressWatched(contractAddress)?.length;
});
})
.reduce((acc, [blockHash, logs]) => {
acc.set(blockHash, logs);
return acc;
}, new Map());

// Create unique list of txs required
const txHashes = Array.from([
...new Set<string>(logs.map((log: any) => log.transaction.hash))
...new Set<string>(
Array.from(blockLogsMap.values())
.flat()
.map((log: any) => log.transaction.hash))
]);

// Fetch blocks with transactions for the logs returned
Expand Down Expand Up @@ -543,7 +560,7 @@ export class Indexer {
return blocksWithDbEvents;
}

_reduceLogsToBlockLogsMap (logs: any[]): Map<string, any> {
_reduceLogsToBlockLogsMap (logs: any[]): Map<string, any[]> {
return logs.reduce((acc: Map<string, any>, log: any) => {
const { blockHash: logBlockHash } = log;
assert(typeof logBlockHash === 'string');
Expand Down
16 changes: 12 additions & 4 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,15 @@ export class JobRunner {

this._lastHistoricalProcessingEndBlockNumber = processingEndBlockNumber;
const logsBlockRange = this._jobQueueConfig.historicalLogsBlockRange ?? DEFAULT_HISTORICAL_LOGS_BLOCK_RANGE;
const endBlock = Math.min(startBlock + logsBlockRange, processingEndBlockNumber);
let rangeEndBlock = startBlock + logsBlockRange;

if (this._jobQueueConfig.historicalLogsBlockRangeEndFactor) {
// Set rangeEndBlock to the next multiple of historicalLogsBlockRangeEndFactor after startBlock number
// For using aligned block ranges
rangeEndBlock = Math.ceil(startBlock / this._jobQueueConfig.historicalLogsBlockRangeEndFactor) * this._jobQueueConfig.historicalLogsBlockRangeEndFactor;
}

const endBlock = Math.min(rangeEndBlock, processingEndBlockNumber);
log(`Processing historical blocks from ${startBlock} to ${endBlock}`);

const blocks = await fetchAndSaveFilteredLogsAndBlocks(
Expand Down Expand Up @@ -720,9 +728,9 @@ export class JobRunner {

this._blockAndEventsMap.delete(block.blockHash);

// Check if new contract was added and filterLogsByAddresses is set to true
if (isNewContractWatched && this._indexer.upstreamConfig.ethServer.filterLogsByAddresses) {
// Check if historical processing is running and that current block is being processed was trigerred by historical processing
// Check if new contract was added
if (isNewContractWatched) {
// Check if historical processing is running and that current block being processed was trigerred by historical processing
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
const nextBlockNumberToProcess = block.blockNumber + 1;

Expand Down
Loading