From d38d0a8d8f473ce792371653ac1fa089c4aa1dcd Mon Sep 17 00:00:00 2001 From: skcdev Date: Tue, 13 Feb 2024 01:05:05 +1300 Subject: [PATCH 1/5] track progress in onInactivitySeconds --- src/inactivitySeconds.ts | 66 ++++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/src/inactivitySeconds.ts b/src/inactivitySeconds.ts index 975851d..a7c1ba8 100644 --- a/src/inactivitySeconds.ts +++ b/src/inactivitySeconds.ts @@ -4,30 +4,50 @@ import { logger } from "./logger.js"; const CHECK_INACTIVITY_INTERVAL = 1000; -export function onInactivitySeconds(emitter: BlockEmitter, inactivitySeconds: number, hasStopBlock: boolean) { - let lastUpdate = now(); - let isFinished = false; - - async function checkInactivity() { - if (now() - lastUpdate > inactivitySeconds) { - logger.error(`Process will exit due to inactivity for ${inactivitySeconds} seconds`); - process.exit(1); // force quit - } - if (isFinished) return; // exit out of the loop - await setTimeout(CHECK_INACTIVITY_INTERVAL); - checkInactivity(); - } - - // Check for inactivity after starting - emitter.on("anyMessage", (message, cursor, clock) => { - lastUpdate = now(); - if (hasStopBlock && clock.number >= emitter.request.stopBlockNum - 1n) { - isFinished = true; - } - }); - checkInactivity(); +export function onInactivitySeconds( + emitter: BlockEmitter, + inactivitySeconds: number, + hasStopBlock: boolean +) { + let lastUpdate = now(); + let isFinished = false; + let lastTotalBytesRead = 0n; + let currentTotalBytesRead = 0n; + + async function checkInactivity() { + if (currentTotalBytesRead > lastTotalBytesRead) { + lastUpdate = now(); + lastTotalBytesRead = currentTotalBytesRead; + } + + if (now() - lastUpdate > inactivitySeconds) { + logger.error( + `Process will exit due to inactivity for ${inactivitySeconds} seconds` + ); + process.exit(1); // force quit + } + if (isFinished) return; // exit out of the loop + await setTimeout(CHECK_INACTIVITY_INTERVAL); + checkInactivity(); + } + + // Check for inactivity after starting + emitter.on("clock", (clock) => { + lastUpdate = now(); + if (hasStopBlock && clock.number >= emitter.request.stopBlockNum - 1n) { + isFinished = true; + } + }); + + emitter.on("progress", (progress) => { + if (progress.processedBytes) { + currentTotalBytesRead = progress.processedBytes.totalBytesRead; + } + }); + + checkInactivity(); } export function now() { - return Math.floor(new Date().getTime() / 1000); // in seconds + return Math.floor(new Date().getTime() / 1000); // in seconds } From 56cd3a5d422912e78134338dca7b091adbd7509a Mon Sep 17 00:00:00 2001 From: skcdev Date: Tue, 13 Feb 2024 01:07:04 +1300 Subject: [PATCH 2/5] add comments --- src/inactivitySeconds.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/inactivitySeconds.ts b/src/inactivitySeconds.ts index a7c1ba8..2da845a 100644 --- a/src/inactivitySeconds.ts +++ b/src/inactivitySeconds.ts @@ -15,6 +15,7 @@ export function onInactivitySeconds( let currentTotalBytesRead = 0n; async function checkInactivity() { + // Refresh lastUpdate/lastTotalBytesRead if totalBytesRead is increasing if (currentTotalBytesRead > lastTotalBytesRead) { lastUpdate = now(); lastTotalBytesRead = currentTotalBytesRead; @@ -31,7 +32,7 @@ export function onInactivitySeconds( checkInactivity(); } - // Check for inactivity after starting + // Check clock events for inactivity after starting emitter.on("clock", (clock) => { lastUpdate = now(); if (hasStopBlock && clock.number >= emitter.request.stopBlockNum - 1n) { @@ -39,6 +40,7 @@ export function onInactivitySeconds( } }); + // Check progress events for inactivity after starting emitter.on("progress", (progress) => { if (progress.processedBytes) { currentTotalBytesRead = progress.processedBytes.totalBytesRead; From a49d0282105290257426997ba7dcddc556c12e0a Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 12 Feb 2024 10:08:18 -0500 Subject: [PATCH 3/5] Update inactivitySeconds.ts --- src/inactivitySeconds.ts | 86 ++++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/src/inactivitySeconds.ts b/src/inactivitySeconds.ts index 2da845a..8b91d2f 100644 --- a/src/inactivitySeconds.ts +++ b/src/inactivitySeconds.ts @@ -5,51 +5,51 @@ import { logger } from "./logger.js"; const CHECK_INACTIVITY_INTERVAL = 1000; export function onInactivitySeconds( - emitter: BlockEmitter, - inactivitySeconds: number, - hasStopBlock: boolean + emitter: BlockEmitter, + inactivitySeconds: number, + hasStopBlock: boolean ) { - let lastUpdate = now(); - let isFinished = false; - let lastTotalBytesRead = 0n; - let currentTotalBytesRead = 0n; - - async function checkInactivity() { - // Refresh lastUpdate/lastTotalBytesRead if totalBytesRead is increasing - if (currentTotalBytesRead > lastTotalBytesRead) { - lastUpdate = now(); - lastTotalBytesRead = currentTotalBytesRead; - } - - if (now() - lastUpdate > inactivitySeconds) { - logger.error( - `Process will exit due to inactivity for ${inactivitySeconds} seconds` - ); - process.exit(1); // force quit - } - if (isFinished) return; // exit out of the loop - await setTimeout(CHECK_INACTIVITY_INTERVAL); - checkInactivity(); - } - - // Check clock events for inactivity after starting - emitter.on("clock", (clock) => { - lastUpdate = now(); - if (hasStopBlock && clock.number >= emitter.request.stopBlockNum - 1n) { - isFinished = true; - } - }); - - // Check progress events for inactivity after starting - emitter.on("progress", (progress) => { - if (progress.processedBytes) { - currentTotalBytesRead = progress.processedBytes.totalBytesRead; - } - }); - - checkInactivity(); + let lastUpdate = now(); + let isFinished = false; + let lastTotalBytesRead = 0n; + let currentTotalBytesRead = 0n; + + async function checkInactivity() { + // Refresh lastUpdate/lastTotalBytesRead if totalBytesRead is increasing + if (currentTotalBytesRead > lastTotalBytesRead) { + lastUpdate = now(); + lastTotalBytesRead = currentTotalBytesRead; + } + + if (now() - lastUpdate > inactivitySeconds) { + logger.error( + `Process will exit due to inactivity for ${inactivitySeconds} seconds` + ); + process.exit(1); // force quit + } + if (isFinished) return; // exit out of the loop + await setTimeout(CHECK_INACTIVITY_INTERVAL); + checkInactivity(); + } + + // Check clock events for inactivity after starting + emitter.on("clock", (clock) => { + lastUpdate = now(); + if (hasStopBlock && clock.number >= emitter.request.stopBlockNum - 1n) { + isFinished = true; + } + }); + + // Check progress events for inactivity after starting + emitter.on("progress", (progress) => { + if (progress.processedBytes) { + currentTotalBytesRead = progress.processedBytes.totalBytesRead; + } + }); + + checkInactivity(); } export function now() { - return Math.floor(new Date().getTime() / 1000); // in seconds + return Math.floor(new Date().getTime() / 1000); // in seconds } From 151b89baa04c76c843e174d245a056890a65dc2d Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 12 Feb 2024 10:09:51 -0500 Subject: [PATCH 4/5] Update inactivitySeconds.ts --- src/inactivitySeconds.ts | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/inactivitySeconds.ts b/src/inactivitySeconds.ts index 8b91d2f..df570e7 100644 --- a/src/inactivitySeconds.ts +++ b/src/inactivitySeconds.ts @@ -4,11 +4,7 @@ import { logger } from "./logger.js"; const CHECK_INACTIVITY_INTERVAL = 1000; -export function onInactivitySeconds( - emitter: BlockEmitter, - inactivitySeconds: number, - hasStopBlock: boolean -) { +export function onInactivitySeconds(emitter: BlockEmitter, inactivitySeconds: number, hasStopBlock: boolean) { let lastUpdate = now(); let isFinished = false; let lastTotalBytesRead = 0n; @@ -22,9 +18,7 @@ export function onInactivitySeconds( } if (now() - lastUpdate > inactivitySeconds) { - logger.error( - `Process will exit due to inactivity for ${inactivitySeconds} seconds` - ); + logger.error(`Process will exit due to inactivity for ${inactivitySeconds} seconds`); process.exit(1); // force quit } if (isFinished) return; // exit out of the loop From 5a2d7ca23e8a858286e18d45f1e5df4d3ac1666a Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 12 Feb 2024 12:28:47 -0500 Subject: [PATCH 5/5] Update inactivitySeconds.ts --- src/inactivitySeconds.ts | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/inactivitySeconds.ts b/src/inactivitySeconds.ts index df570e7..0eaf034 100644 --- a/src/inactivitySeconds.ts +++ b/src/inactivitySeconds.ts @@ -1,22 +1,15 @@ import type { BlockEmitter } from "@substreams/node"; import { setTimeout } from "timers/promises"; import { logger } from "./logger.js"; +import { substreams_sink_progress_message } from "./prometheus.js"; const CHECK_INACTIVITY_INTERVAL = 1000; export function onInactivitySeconds(emitter: BlockEmitter, inactivitySeconds: number, hasStopBlock: boolean) { let lastUpdate = now(); let isFinished = false; - let lastTotalBytesRead = 0n; - let currentTotalBytesRead = 0n; async function checkInactivity() { - // Refresh lastUpdate/lastTotalBytesRead if totalBytesRead is increasing - if (currentTotalBytesRead > lastTotalBytesRead) { - lastUpdate = now(); - lastTotalBytesRead = currentTotalBytesRead; - } - if (now() - lastUpdate > inactivitySeconds) { logger.error(`Process will exit due to inactivity for ${inactivitySeconds} seconds`); process.exit(1); // force quit @@ -34,10 +27,26 @@ export function onInactivitySeconds(emitter: BlockEmitter, inactivitySeconds: nu } }); + emitter.on("close", error => { + if ( error ) { + console.error(error); + process.exit(1); // force quit + } + lastUpdate = now(); + isFinished = true; + }); + + emitter.on("fatalError", error => { + console.error(error); + process.exit(1); // force quit + }); + // Check progress events for inactivity after starting emitter.on("progress", (progress) => { - if (progress.processedBytes) { - currentTotalBytesRead = progress.processedBytes.totalBytesRead; + const totalBytesRead = Number(progress.processedBytes?.totalBytesRead ?? 0); + if (totalBytesRead > 0) { + lastUpdate = now(); + substreams_sink_progress_message?.inc(totalBytesRead); } });