diff --git a/src/index.ts b/src/index.ts index 00afde4..e5c64ac 100644 --- a/src/index.ts +++ b/src/index.ts @@ -137,19 +137,52 @@ createConnection(ormConfig as ConnectionOptions) process.exit(1); }); -async function schedule(connection: Connection | null, producer: Producer | null, func: any, funcName: string) { - const start = new Date().getTime(); - await func(connection, producer); - const end = new Date().getTime(); - const duration = end - start; - let wait: number; - if (duration > SECONDS_BETWEEN_RUNS * 1000) { - wait = 0; - logger.warn(`${funcName} is taking longer than desiered interval`); - } else { - wait = SECONDS_BETWEEN_RUNS * 1000 - duration; +async function schedule( + connection: Connection | null, + producer: Producer | null, + func: any, + funcName: string, + consecutiveErrors: number = 0 +) { + try { + const start = new Date().getTime(); + await func(connection, producer); + const end = new Date().getTime(); + const duration = end - start; + + if (consecutiveErrors > 0) { + logger.info(`${funcName} recovered after ${consecutiveErrors} consecutive errors`); + } + + let wait: number; + if (duration > SECONDS_BETWEEN_RUNS * 1000) { + wait = 0; + logger.warn(`${funcName} is taking longer than desired interval`); + } else { + wait = SECONDS_BETWEEN_RUNS * 1000 - duration; + } + setTimeout(() => { + schedule(connection, producer, func, funcName, 0); + }, wait); + + } catch (err: any) { + consecutiveErrors++; + + const errorMessage = err?.message || err?.toString() || 'Unknown error'; + + logger.error(`Error in ${funcName} (consecutive error #${consecutiveErrors}): ${errorMessage}`); + + const MAX_CONSECUTIVE_ERRORS = 3; + if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { + logger.fatal(`${funcName} failed ${MAX_CONSECUTIVE_ERRORS} times consecutively. Stopping.`); + process.exit(1); + } + + const backoffDelay = SECONDS_BETWEEN_RUNS * 1000 * Math.pow(2, consecutiveErrors); + + logger.warn(`Retrying ${funcName} in ${backoffDelay / 1000}s (consecutive errors: ${consecutiveErrors})`); + setTimeout(() => { + schedule(connection, producer, func, funcName, consecutiveErrors); + }, backoffDelay); } - setTimeout(() => { - schedule(connection, producer, func, funcName); - }, wait); }