Skip to content

Commit

Permalink
Fix events queue job retry by resetting watcher in job-runner (#470)
Browse files Browse the repository at this point in the history
* Reset watcher after events job retry in job-runner

* Push next historical job from job-runner instead of event-watcher
  • Loading branch information
nikugogoi authored Nov 14, 2023
1 parent 6ce8d47 commit 7c4f9fb
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 29 deletions.
33 changes: 11 additions & 22 deletions packages/util/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,17 @@ export class EventWatcher {
}

async startBlockProcessing (): Promise<void> {
// Get latest block in chain and sync status from DB.
const [{ block: latestBlock }, syncStatus] = await Promise.all([
// Wait for events job queue to be empty before starting historical or realtime processing
await this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING);

// Get latest block in chain and sync status from DB
// Also get historical-processing queu size
const [{ block: latestBlock }, syncStatus, historicalProcessingQueueSize] = await Promise.all([
this._ethClient.getBlockByHash(),
this._indexer.getSyncStatus(),
// Wait for events job queue to be empty before starting historical or realtime processing
this._jobQueue.waitForEmptyQueue(QUEUE_EVENT_PROCESSING)
this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed')
]);

const historicalProcessingQueueSize = await this._jobQueue.getQueueSize(QUEUE_HISTORICAL_PROCESSING, 'completed');

// Stop if there are active or pending historical processing jobs
// Might be created on encountering template create in events processing
if (historicalProcessingQueueSize > 0) {
Expand Down Expand Up @@ -144,7 +145,8 @@ export class EventWatcher {
{
blockNumber: startBlockNumber,
processingEndBlockNumber: this._historicalProcessingEndBlockNumber
}
},
{ priority: 1 }
);
}

Expand Down Expand Up @@ -241,22 +243,11 @@ export class EventWatcher {
if (nextBatchStartBlockNumber > this._historicalProcessingEndBlockNumber) {
// Start next batch of historical processing or realtime processing
this.startBlockProcessing();

return;
}

// Push job for next batch of blocks
await this._jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: nextBatchStartBlockNumber,
processingEndBlockNumber: this._historicalProcessingEndBlockNumber
}
);
}

async eventProcessingCompleteHandler (job: PgBoss.JobWithMetadata<any>): Promise<void> {
const { id, retrycount, data: { request: { data }, failed, state, createdOn } } = job;
const { id, data: { request: { data }, failed, state, createdOn, retryCount } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
Expand All @@ -274,9 +265,7 @@ export class EventWatcher {
assert(blockProgress);

// Check if job was retried
if (retrycount > 0) {
// Reset watcher to remove any data after this block
await this._indexer.resetWatcherToBlock(blockProgress.blockNumber);
if (retryCount > 0) {
// Start block processing (Try restarting historical processing or continue realtime processing)
this.startBlockProcessing();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/util/src/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type JobCompleteCallback = (job: PgBoss.Job | PgBoss.JobWithMetadata) => Promise
const DEFAULT_JOBS_PER_INTERVAL = 5;

// Interval time to check for events queue to be empty
const EMPTY_QUEUE_CHECK_INTERVAL = 5000;
const EMPTY_QUEUE_CHECK_INTERVAL = 1000;

const log = debug('vulcanize:job-queue');

Expand Down
33 changes: 27 additions & 6 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,12 @@ export class JobRunner {
if (this._historicalProcessingCompletedUpto) {
// Check if historical processing start is for a previous block which happens incase of template create
if (startBlock < this._historicalProcessingCompletedUpto) {
// Delete any pending historical processing jobs
await this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING);
await Promise.all([
// Delete any pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING),
// Remove pending events queue jobs
this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING)
]);

// Wait for events queue to be empty
log(`Waiting for events queue to be empty before resetting watcher to block ${startBlock - 1}`);
Expand Down Expand Up @@ -242,6 +246,17 @@ export class JobRunner {

this._historicalProcessingCompletedUpto = endBlock;

if (endBlock < processingEndBlockNumber) {
// If endBlock is lesser than processingEndBlockNumber push new historical job
await this.jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
{
blockNumber: endBlock + 1,
processingEndBlockNumber: processingEndBlockNumber
}
);
}

await this.jobQueue.markComplete(
job,
{ isComplete: true, endBlock }
Expand Down Expand Up @@ -679,9 +694,15 @@ export class JobRunner {
this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
await this._indexer.updateSyncStatusProcessedBlock(block.blockHash, block.blockNumber);

// If this was a retry attempt, unset the indexing error flag in sync status
if (retryCount > 0) {
await this._indexer.updateSyncStatusIndexingError(false);
await Promise.all([
// If this was a retry attempt, unset the indexing error flag in sync status
this._indexer.updateSyncStatusIndexingError(false),
// Reset watcher after succesfull retry so that block processing starts after this block
this._indexer.resetWatcherToBlock(block.blockNumber)
]);

log(`Watcher reset to block ${block.blockNumber} after succesffully retrying events processing`);
}
} catch (error) {
log(`Error in processing events for block ${block.blockNumber} hash ${block.blockHash}`);
Expand All @@ -691,8 +712,8 @@ export class JobRunner {
this._indexer.clearProcessedBlockData(block),
// Delete all pending event processing jobs
this.jobQueue.deleteJobs(QUEUE_EVENT_PROCESSING),
// Delete all pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'active'),
// Delete all active and pending historical processing jobs
this.jobQueue.deleteJobs(QUEUE_HISTORICAL_PROCESSING, 'completed'),
// Set the indexing error flag in sync status
this._indexer.updateSyncStatusIndexingError(true)
]);
Expand Down

0 comments on commit 7c4f9fb

Please sign in to comment.