Skip to content

Commit

Permalink
clean all redis before running any jobs (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
wjrjerome authored Dec 14, 2023
1 parent f0381ce commit defc363
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/processors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ export class Processors implements ProcessorInterface {
}

async reset() {
// Reset all the queues first
await Promise.all(_.map(this.processors, async(p) => await p.clean()));

const modes: Mode[] = await this.getRunningModes();
// Depending on the mode, we choose different processor to work on
modes.map(async (m) => {
Expand Down
4 changes: 4 additions & 0 deletions src/processors/lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export class Lite implements ProcessorInterface {
return this;
};

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}

// In lite mode, the reset does nothing other than just trigger the jobs. We can trigger it multiple time, it has no effect
async reset(): Promise<void> {
await this.queue.add({}, { jobId: NAME, repeat: { cron: config.cronJob.liteJobExpression}});
Expand Down
4 changes: 4 additions & 0 deletions src/processors/standard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export class Standard implements ProcessorInterface {
return this;
}

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}

// Reset and start the state sync until success
async reset() {
try {
Expand Down
4 changes: 4 additions & 0 deletions src/processors/zero.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export class Zero implements ProcessorInterface {
async reset(): Promise<void> {
await this.queue.add({}, REPEAT_JOB_OPT);
}

async clean(): Promise<void> {
await this.queue.obliterate({ force: true });
}

async processEvent() {
const payloads = await this.zeroService.getPayloads();
Expand Down

0 comments on commit defc363

Please sign in to comment.