From 1ad45b1dc7d7b9b989722815dc663c4895307ded Mon Sep 17 00:00:00 2001 From: Jianrong Date: Tue, 12 Dec 2023 19:55:51 +1100 Subject: [PATCH] clean all redis before running any jobs --- src/processors/index.ts | 3 +++ src/processors/lite.ts | 4 ++++ src/processors/standard.ts | 4 ++++ src/processors/zero.ts | 4 ++++ 4 files changed, 15 insertions(+) diff --git a/src/processors/index.ts b/src/processors/index.ts index 1c2dd2c..e0fc282 100644 --- a/src/processors/index.ts +++ b/src/processors/index.ts @@ -42,6 +42,9 @@ export class Processors implements ProcessorInterface { } async reset() { + // Reset all the queues first + await Promise.all(_.map(this.processors, async(p) => await p.clean())); + const modes: Mode[] = await this.getRunningModes(); // Depending on the mode, we choose different processor to work on modes.map(async (m) => { diff --git a/src/processors/lite.ts b/src/processors/lite.ts index cc5b199..de7bfae 100644 --- a/src/processors/lite.ts +++ b/src/processors/lite.ts @@ -37,6 +37,10 @@ export class Lite implements ProcessorInterface { return this; }; + async clean(): Promise { + await this.queue.obliterate({ force: true }); + } + // In lite mode, the reset does nothing other than just trigger the jobs. We can trigger it multiple time, it has no effect async reset(): Promise { await this.queue.add({}, { jobId: NAME, repeat: { cron: config.cronJob.liteJobExpression}}); diff --git a/src/processors/standard.ts b/src/processors/standard.ts index d7b2f95..a4329d8 100644 --- a/src/processors/standard.ts +++ b/src/processors/standard.ts @@ -46,6 +46,10 @@ export class Standard implements ProcessorInterface { return this; } + async clean(): Promise { + await this.queue.obliterate({ force: true }); + } + // Reset and start the state sync until success async reset() { try { diff --git a/src/processors/zero.ts b/src/processors/zero.ts index b90859d..3dd8e6c 100644 --- a/src/processors/zero.ts +++ b/src/processors/zero.ts @@ -42,6 +42,10 @@ export class Zero implements ProcessorInterface { async reset(): Promise { await this.queue.add({}, REPEAT_JOB_OPT); } + + async clean(): Promise { + await this.queue.obliterate({ force: true }); + } async processEvent() { const payloads = await this.zeroService.getPayloads();