From 9b5bbf84496f55a5c41a5e93569bd0bdeb21c675 Mon Sep 17 00:00:00 2001 From: Jochem Brouwer Date: Sun, 6 Sep 2020 21:01:43 +0200 Subject: [PATCH] add lock --- src/baseTrie.ts | 4 +-- src/prioritizedTaskExecutor.ts | 42 ++++++++++++++++++++-------- test/prioritizedTaskExecutor.spec.ts | 22 +++++++-------- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/baseTrie.ts b/src/baseTrie.ts index fbb660f..59d4bd8 100644 --- a/src/baseTrie.ts +++ b/src/baseTrie.ts @@ -255,7 +255,7 @@ export class Trie { const childRef = child[1] as Buffer const childKey = key.concat(keyExtension) const priority = childKey.length - taskExecutor.execute(priority, async (taskCallback: Function) => { + await taskExecutor.execute(priority, async (taskCallback: Function) => { const childNode = await self._lookupNode(childRef) taskCallback() if (childNode) { @@ -275,7 +275,7 @@ export class Trie { const childKey = key.slice() childKey.push(childIndex) const priority = childKey.length - taskExecutor.execute(priority, async (taskCallback: Function) => { + await taskExecutor.execute(priority, async (taskCallback: Function) => { const childNode = await self._lookupNode(childRef) taskCallback() if (childNode) { diff --git a/src/prioritizedTaskExecutor.ts b/src/prioritizedTaskExecutor.ts index a8ada41..ad88b6d 100644 --- a/src/prioritizedTaskExecutor.ts +++ b/src/prioritizedTaskExecutor.ts @@ -1,3 +1,5 @@ +import Semaphore from 'semaphore-async-await' + interface Task { priority: number fn: Function @@ -10,6 +12,8 @@ export class PrioritizedTaskExecutor { private currentPoolSize: number /** The task queue */ private queue: Task[] + /** The Lock */ + private lock: Semaphore /** * Executes tasks up to maxPoolSize at a time, other items are put in a priority queue. @@ -21,6 +25,7 @@ export class PrioritizedTaskExecutor { this.maxPoolSize = maxPoolSize this.currentPoolSize = 0 this.queue = [] + this.lock = new Semaphore(1) } /** @@ -29,17 +34,22 @@ export class PrioritizedTaskExecutor { * @param priority The priority of the task * @param fn The function that accepts the callback, which must be called upon the task completion. */ - execute(priority: number, fn: Function) { - if (this.currentPoolSize < this.maxPoolSize) { - this.currentPoolSize++ - fn(() => { - this.currentPoolSize-- - if (this.queue.length > 0) { - const item = this.queue.shift() - this.execute(item!.priority, item!.fn) + async execute(priority: number, fn: Function) { + let self = this + function runTask() { + self.currentPoolSize++ + fn(async () => { + self.currentPoolSize-- + if (self.queue.length > 0) { + const item = self.queue.shift() + await self.execute(item!.priority, item!.fn) } }) + } + if (this.currentPoolSize < this.maxPoolSize) { + runTask() } else { + await this.lock.wait() if (this.queue.length == 0) { this.queue.push({ priority, fn }) } else { @@ -50,15 +60,24 @@ export class PrioritizedTaskExecutor { return Math.floor(left + (right - left) / 2) } while (true) { + // note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted + // therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it. let index = mid() let value = this.queue[index].priority - console.log(left, right, index, value) if (value == priority) { - this.queue.splice(index, 0, { priority, fn }) + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(index, 0, { priority, fn }) + } break } if (left == right) { - this.queue.splice(left, 0, { priority, fn }) + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(left, 0, { priority, fn }) + } break } @@ -69,6 +88,7 @@ export class PrioritizedTaskExecutor { } } } + this.lock.signal() } } diff --git a/test/prioritizedTaskExecutor.spec.ts b/test/prioritizedTaskExecutor.spec.ts index dac2b7d..f283457 100644 --- a/test/prioritizedTaskExecutor.spec.ts +++ b/test/prioritizedTaskExecutor.spec.ts @@ -2,33 +2,33 @@ import * as tape from 'tape' import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor' tape('prioritized task executor test', function (t: any) { - t.test('should execute tasks in the right order', (st: any) => { + t.test('should execute tasks in the right order', async (st: any) => { const taskExecutor = new PrioritizedTaskExecutor(2) const tasks = [1, 2, 3, 4] const callbacks = [] as any const executionOrder = [] as any - tasks.forEach(function (task) { - taskExecutor.execute(task, function (cb: Function) { + for (let task of tasks) { + await taskExecutor.execute(task, function (cb: Function) { executionOrder.push(task) callbacks.push(cb) }) - }) + } - callbacks.forEach(function (callback: Function) { - callback() - }) + for (let callback of callbacks) { + await callback() + } const expectedExecutionOrder = [1, 2, 4, 3] st.deepEqual(executionOrder, expectedExecutionOrder) st.end() }) - t.test('should queue tasks in the right order', (st: any) => { + t.test('should queue tasks in the right order', async (st: any) => { const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1] const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm - priorityList.map((priority) => { - PTE.execute(priority, () => {}) - }) + for (let priority of priorityList) { + await PTE.execute(priority, () => {}) + } // have to cast the PTE as to access the private queue st.deepEqual( (PTE).queue.map((task: any) => task.priority),