From 49beeca875463825b4e7be58c8d376351e78f80a Mon Sep 17 00:00:00 2001 From: Jochem Brouwer Date: Sun, 6 Sep 2020 20:47:42 +0200 Subject: [PATCH 1/4] use binary search in the prioritized task executor --- src/prioritizedTaskExecutor.ts | 31 +++++++++++++++++-- test/prioritizedTaskExecutor.spec.ts | 45 ++++++++++++++++++---------- 2 files changed, 59 insertions(+), 17 deletions(-) diff --git a/src/prioritizedTaskExecutor.ts b/src/prioritizedTaskExecutor.ts index d7e34f4..a8ada41 100644 --- a/src/prioritizedTaskExecutor.ts +++ b/src/prioritizedTaskExecutor.ts @@ -35,13 +35,40 @@ export class PrioritizedTaskExecutor { fn(() => { this.currentPoolSize-- if (this.queue.length > 0) { - this.queue.sort((a, b) => b.priority - a.priority) const item = this.queue.shift() this.execute(item!.priority, item!.fn) } }) } else { - this.queue.push({ priority, fn }) + if (this.queue.length == 0) { + this.queue.push({ priority, fn }) + } else { + // insert the item in the queue using binary search + let left = 0 + let right = this.queue.length + let mid = () => { + return Math.floor(left + (right - left) / 2) + } + while (true) { + let index = mid() + let value = this.queue[index].priority + console.log(left, right, index, value) + if (value == priority) { + this.queue.splice(index, 0, { priority, fn }) + break + } + if (left == right) { + this.queue.splice(left, 0, { priority, fn }) + break + } + + if (value > priority) { + left = index + } else { + right = index + } + } + } } } diff --git a/test/prioritizedTaskExecutor.spec.ts b/test/prioritizedTaskExecutor.spec.ts index 36c9651..dac2b7d 100644 --- a/test/prioritizedTaskExecutor.spec.ts +++ b/test/prioritizedTaskExecutor.spec.ts @@ -1,24 +1,39 @@ import * as tape from 'tape' import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor' -const taskExecutor = new PrioritizedTaskExecutor(2) +tape('prioritized task executor test', function (t: any) { + t.test('should execute tasks in the right order', (st: any) => { + const taskExecutor = new PrioritizedTaskExecutor(2) + const tasks = [1, 2, 3, 4] + const callbacks = [] as any + const executionOrder = [] as any + tasks.forEach(function (task) { + taskExecutor.execute(task, function (cb: Function) { + executionOrder.push(task) + callbacks.push(cb) + }) + }) -tape('prioritized task executor test', function (t) { - const tasks = [1, 2, 3, 4] - const callbacks = [] as any - const executionOrder = [] as any - tasks.forEach(function (task) { - taskExecutor.execute(task, function (cb: Function) { - executionOrder.push(task) - callbacks.push(cb) + callbacks.forEach(function (callback: Function) { + callback() }) - }) - callbacks.forEach(function (callback: Function) { - callback() + const expectedExecutionOrder = [1, 2, 4, 3] + st.deepEqual(executionOrder, expectedExecutionOrder) + st.end() }) - const expectedExecutionOrder = [1, 2, 4, 3] - t.deepEqual(executionOrder, expectedExecutionOrder) - t.end() + t.test('should queue tasks in the right order', (st: any) => { + const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1] + const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm + priorityList.map((priority) => { + PTE.execute(priority, () => {}) + }) + // have to cast the PTE as to access the private queue + st.deepEqual( + (PTE).queue.map((task: any) => task.priority), + priorityList.sort((a: any, b: any) => b - a), + ) + st.end() + }) }) From 9b5bbf84496f55a5c41a5e93569bd0bdeb21c675 Mon Sep 17 00:00:00 2001 From: Jochem Brouwer Date: Sun, 6 Sep 2020 21:01:43 +0200 Subject: [PATCH 2/4] add lock --- src/baseTrie.ts | 4 +-- src/prioritizedTaskExecutor.ts | 42 ++++++++++++++++++++-------- test/prioritizedTaskExecutor.spec.ts | 22 +++++++-------- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/baseTrie.ts b/src/baseTrie.ts index fbb660f..59d4bd8 100644 --- a/src/baseTrie.ts +++ b/src/baseTrie.ts @@ -255,7 +255,7 @@ export class Trie { const childRef = child[1] as Buffer const childKey = key.concat(keyExtension) const priority = childKey.length - taskExecutor.execute(priority, async (taskCallback: Function) => { + await taskExecutor.execute(priority, async (taskCallback: Function) => { const childNode = await self._lookupNode(childRef) taskCallback() if (childNode) { @@ -275,7 +275,7 @@ export class Trie { const childKey = key.slice() childKey.push(childIndex) const priority = childKey.length - taskExecutor.execute(priority, async (taskCallback: Function) => { + await taskExecutor.execute(priority, async (taskCallback: Function) => { const childNode = await self._lookupNode(childRef) taskCallback() if (childNode) { diff --git a/src/prioritizedTaskExecutor.ts b/src/prioritizedTaskExecutor.ts index a8ada41..ad88b6d 100644 --- a/src/prioritizedTaskExecutor.ts +++ b/src/prioritizedTaskExecutor.ts @@ -1,3 +1,5 @@ +import Semaphore from 'semaphore-async-await' + interface Task { priority: number fn: Function @@ -10,6 +12,8 @@ export class PrioritizedTaskExecutor { private currentPoolSize: number /** The task queue */ private queue: Task[] + /** The Lock */ + private lock: Semaphore /** * Executes tasks up to maxPoolSize at a time, other items are put in a priority queue. @@ -21,6 +25,7 @@ export class PrioritizedTaskExecutor { this.maxPoolSize = maxPoolSize this.currentPoolSize = 0 this.queue = [] + this.lock = new Semaphore(1) } /** @@ -29,17 +34,22 @@ export class PrioritizedTaskExecutor { * @param priority The priority of the task * @param fn The function that accepts the callback, which must be called upon the task completion. */ - execute(priority: number, fn: Function) { - if (this.currentPoolSize < this.maxPoolSize) { - this.currentPoolSize++ - fn(() => { - this.currentPoolSize-- - if (this.queue.length > 0) { - const item = this.queue.shift() - this.execute(item!.priority, item!.fn) + async execute(priority: number, fn: Function) { + let self = this + function runTask() { + self.currentPoolSize++ + fn(async () => { + self.currentPoolSize-- + if (self.queue.length > 0) { + const item = self.queue.shift() + await self.execute(item!.priority, item!.fn) } }) + } + if (this.currentPoolSize < this.maxPoolSize) { + runTask() } else { + await this.lock.wait() if (this.queue.length == 0) { this.queue.push({ priority, fn }) } else { @@ -50,15 +60,24 @@ export class PrioritizedTaskExecutor { return Math.floor(left + (right - left) / 2) } while (true) { + // note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted + // therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it. let index = mid() let value = this.queue[index].priority - console.log(left, right, index, value) if (value == priority) { - this.queue.splice(index, 0, { priority, fn }) + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(index, 0, { priority, fn }) + } break } if (left == right) { - this.queue.splice(left, 0, { priority, fn }) + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(left, 0, { priority, fn }) + } break } @@ -69,6 +88,7 @@ export class PrioritizedTaskExecutor { } } } + this.lock.signal() } } diff --git a/test/prioritizedTaskExecutor.spec.ts b/test/prioritizedTaskExecutor.spec.ts index dac2b7d..f283457 100644 --- a/test/prioritizedTaskExecutor.spec.ts +++ b/test/prioritizedTaskExecutor.spec.ts @@ -2,33 +2,33 @@ import * as tape from 'tape' import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor' tape('prioritized task executor test', function (t: any) { - t.test('should execute tasks in the right order', (st: any) => { + t.test('should execute tasks in the right order', async (st: any) => { const taskExecutor = new PrioritizedTaskExecutor(2) const tasks = [1, 2, 3, 4] const callbacks = [] as any const executionOrder = [] as any - tasks.forEach(function (task) { - taskExecutor.execute(task, function (cb: Function) { + for (let task of tasks) { + await taskExecutor.execute(task, function (cb: Function) { executionOrder.push(task) callbacks.push(cb) }) - }) + } - callbacks.forEach(function (callback: Function) { - callback() - }) + for (let callback of callbacks) { + await callback() + } const expectedExecutionOrder = [1, 2, 4, 3] st.deepEqual(executionOrder, expectedExecutionOrder) st.end() }) - t.test('should queue tasks in the right order', (st: any) => { + t.test('should queue tasks in the right order', async (st: any) => { const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1] const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm - priorityList.map((priority) => { - PTE.execute(priority, () => {}) - }) + for (let priority of priorityList) { + await PTE.execute(priority, () => {}) + } // have to cast the PTE as to access the private queue st.deepEqual( (PTE).queue.map((task: any) => task.priority), From 8f0df5dc7be39334f2738dee6b77cedeebfdf8e4 Mon Sep 17 00:00:00 2001 From: Jochem Brouwer Date: Mon, 7 Sep 2020 17:53:55 +0200 Subject: [PATCH 3/4] fix binary search --- src/prioritizedTaskExecutor.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/prioritizedTaskExecutor.ts b/src/prioritizedTaskExecutor.ts index ad88b6d..5ed87e0 100644 --- a/src/prioritizedTaskExecutor.ts +++ b/src/prioritizedTaskExecutor.ts @@ -41,7 +41,9 @@ export class PrioritizedTaskExecutor { fn(async () => { self.currentPoolSize-- if (self.queue.length > 0) { + await self.lock.acquire() const item = self.queue.shift() + await self.lock.signal() await self.execute(item!.priority, item!.fn) } }) @@ -80,11 +82,18 @@ export class PrioritizedTaskExecutor { } break } - if (value > priority) { - left = index + left = index + 1 } else { - right = index + right = index - 1 + } + if (left > right) { + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(left, 0, { priority, fn }) + } + break } } } From 4f0e1503f5324243df304e4fc76930e9821f69ba Mon Sep 17 00:00:00 2001 From: Jochem Brouwer Date: Mon, 7 Sep 2020 20:31:08 +0200 Subject: [PATCH 4/4] cleanup the code and document --- src/prioritizedTaskExecutor.ts | 37 ++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/prioritizedTaskExecutor.ts b/src/prioritizedTaskExecutor.ts index 5ed87e0..b38260a 100644 --- a/src/prioritizedTaskExecutor.ts +++ b/src/prioritizedTaskExecutor.ts @@ -36,12 +36,13 @@ export class PrioritizedTaskExecutor { */ async execute(priority: number, fn: Function) { let self = this - function runTask() { + let runTask = () => { self.currentPoolSize++ fn(async () => { + // this callback function can be `await`ed self.currentPoolSize-- if (self.queue.length > 0) { - await self.lock.acquire() + await self.lock.acquire() // we need the lock to be unlocked, because we are editing the queue. const item = self.queue.shift() await self.lock.signal() await self.execute(item!.priority, item!.fn) @@ -61,38 +62,40 @@ export class PrioritizedTaskExecutor { let mid = () => { return Math.floor(left + (right - left) / 2) } + let insert = (index: number) => { + if (this.currentPoolSize < this.maxPoolSize) { + runTask() + } else { + this.queue.splice(index, 0, { priority, fn }) + } + } while (true) { // note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted // therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it. let index = mid() let value = this.queue[index].priority if (value == priority) { - if (this.currentPoolSize < this.maxPoolSize) { - runTask() - } else { - this.queue.splice(index, 0, { priority, fn }) - } + // we have found the priority value and can thus insert the task at this index. + insert(index) break } if (left == right) { - if (this.currentPoolSize < this.maxPoolSize) { - runTask() - } else { - this.queue.splice(left, 0, { priority, fn }) - } + // we have only one element left, this means that the items left of this index have a higher priority and the items right have a lower priority + // we thus insert it at this index + insert(left) break } if (value > priority) { + // we know everything left of the index has a higher priority, so we do not need to consider these items anymore left = index + 1 } else { + // we know everything right of the index has a lowre priority, so we do not need to consider these items anymore right = index - 1 } if (left > right) { - if (this.currentPoolSize < this.maxPoolSize) { - runTask() - } else { - this.queue.splice(left, 0, { priority, fn }) - } + // this could happen, for instance, if left = 0 right = 1 (the index is 0) and the item has a lower priority. + // then right = -1, so we should insert at the left index + insert(left) break } }