diff --git a/src/baseTrie.ts b/src/baseTrie.ts index fbb660f..59d4bd8 100644 --- a/src/baseTrie.ts +++ b/src/baseTrie.ts @@ -255,7 +255,7 @@ export class Trie { const childRef = child[1] as Buffer const childKey = key.concat(keyExtension) const priority = childKey.length - taskExecutor.execute(priority, async (taskCallback: Function) => { + await taskExecutor.execute(priority, async (taskCallback: Function) => { const childNode = await self._lookupNode(childRef) taskCallback() if (childNode) { @@ -275,7 +275,7 @@ export class Trie { const childKey = key.slice() childKey.push(childIndex) const priority = childKey.length - taskExecutor.execute(priority, async (taskCallback: Function) => { + await taskExecutor.execute(priority, async (taskCallback: Function) => { const childNode = await self._lookupNode(childRef) taskCallback() if (childNode) { diff --git a/src/prioritizedTaskExecutor.ts b/src/prioritizedTaskExecutor.ts index d7e34f4..b38260a 100644 --- a/src/prioritizedTaskExecutor.ts +++ b/src/prioritizedTaskExecutor.ts @@ -1,3 +1,5 @@ +import Semaphore from 'semaphore-async-await' + interface Task { priority: number fn: Function @@ -10,6 +12,8 @@ export class PrioritizedTaskExecutor { private currentPoolSize: number /** The task queue */ private queue: Task[] + /** The Lock */ + private lock: Semaphore /** * Executes tasks up to maxPoolSize at a time, other items are put in a priority queue. @@ -21,6 +25,7 @@ export class PrioritizedTaskExecutor { this.maxPoolSize = maxPoolSize this.currentPoolSize = 0 this.queue = [] + this.lock = new Semaphore(1) } /** @@ -29,19 +34,73 @@ export class PrioritizedTaskExecutor { * @param priority The priority of the task * @param fn The function that accepts the callback, which must be called upon the task completion. */ - execute(priority: number, fn: Function) { - if (this.currentPoolSize < this.maxPoolSize) { - this.currentPoolSize++ - fn(() => { - this.currentPoolSize-- - if (this.queue.length > 0) { - this.queue.sort((a, b) => b.priority - a.priority) - const item = this.queue.shift() - this.execute(item!.priority, item!.fn) + async execute(priority: number, fn: Function) { + let self = this + let runTask = () => { + self.currentPoolSize++ + fn(async () => { + // this callback function can be `await`ed + self.currentPoolSize-- + if (self.queue.length > 0) { + await self.lock.acquire() // we need the lock to be unlocked, because we are editing the queue. + const item = self.queue.shift() + await self.lock.signal() + await self.execute(item!.priority, item!.fn) } }) + } + if (this.currentPoolSize < this.maxPoolSize) { + runTask() } else { - this.queue.push({ priority, fn }) + await this.lock.wait() + if (this.queue.length == 0) { + this.queue.push({ priority, fn }) + } else { + // insert the item in the queue using binary search + let left = 0 + let right = this.queue.length + let mid = () => { + return Math.floor(left + (right - left) / 2) + } + let insert = (index: number) => { + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(index, 0, { priority, fn }) + } + } + while (true) { + // note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted + // therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it. + let index = mid() + let value = this.queue[index].priority + if (value == priority) { + // we have found the priority value and can thus insert the task at this index. + insert(index) + break + } + if (left == right) { + // we have only one element left, this means that the items left of this index have a higher priority and the items right have a lower priority + // we thus insert it at this index + insert(left) + break + } + if (value > priority) { + // we know everything left of the index has a higher priority, so we do not need to consider these items anymore + left = index + 1 + } else { + // we know everything right of the index has a lowre priority, so we do not need to consider these items anymore + right = index - 1 + } + if (left > right) { + // this could happen, for instance, if left = 0 right = 1 (the index is 0) and the item has a lower priority. + // then right = -1, so we should insert at the left index + insert(left) + break + } + } + } + this.lock.signal() } } diff --git a/test/prioritizedTaskExecutor.spec.ts b/test/prioritizedTaskExecutor.spec.ts index 36c9651..f283457 100644 --- a/test/prioritizedTaskExecutor.spec.ts +++ b/test/prioritizedTaskExecutor.spec.ts @@ -1,24 +1,39 @@ import * as tape from 'tape' import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor' -const taskExecutor = new PrioritizedTaskExecutor(2) +tape('prioritized task executor test', function (t: any) { + t.test('should execute tasks in the right order', async (st: any) => { + const taskExecutor = new PrioritizedTaskExecutor(2) + const tasks = [1, 2, 3, 4] + const callbacks = [] as any + const executionOrder = [] as any + for (let task of tasks) { + await taskExecutor.execute(task, function (cb: Function) { + executionOrder.push(task) + callbacks.push(cb) + }) + } -tape('prioritized task executor test', function (t) { - const tasks = [1, 2, 3, 4] - const callbacks = [] as any - const executionOrder = [] as any - tasks.forEach(function (task) { - taskExecutor.execute(task, function (cb: Function) { - executionOrder.push(task) - callbacks.push(cb) - }) - }) + for (let callback of callbacks) { + await callback() + } - callbacks.forEach(function (callback: Function) { - callback() + const expectedExecutionOrder = [1, 2, 4, 3] + st.deepEqual(executionOrder, expectedExecutionOrder) + st.end() }) - const expectedExecutionOrder = [1, 2, 4, 3] - t.deepEqual(executionOrder, expectedExecutionOrder) - t.end() + t.test('should queue tasks in the right order', async (st: any) => { + const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1] + const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm + for (let priority of priorityList) { + await PTE.execute(priority, () => {}) + } + // have to cast the PTE as to access the private queue + st.deepEqual( + (PTE).queue.map((task: any) => task.priority), + priorityList.sort((a: any, b: any) => b - a), + ) + st.end() + }) })