Skip to content
This repository has been archived by the owner on Jan 19, 2021. It is now read-only.

Use binary search in the prioritized task executor #129

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/baseTrie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ export class Trie {
const childRef = child[1] as Buffer
const childKey = key.concat(keyExtension)
const priority = childKey.length
taskExecutor.execute(priority, async (taskCallback: Function) => {
await taskExecutor.execute(priority, async (taskCallback: Function) => {
const childNode = await self._lookupNode(childRef)
taskCallback()
if (childNode) {
Expand All @@ -275,7 +275,7 @@ export class Trie {
const childKey = key.slice()
childKey.push(childIndex)
const priority = childKey.length
taskExecutor.execute(priority, async (taskCallback: Function) => {
await taskExecutor.execute(priority, async (taskCallback: Function) => {
const childNode = await self._lookupNode(childRef)
taskCallback()
if (childNode) {
Expand Down
79 changes: 69 additions & 10 deletions src/prioritizedTaskExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import Semaphore from 'semaphore-async-await'

interface Task {
priority: number
fn: Function
Expand All @@ -10,6 +12,8 @@ export class PrioritizedTaskExecutor {
private currentPoolSize: number
/** The task queue */
private queue: Task[]
/** The Lock */
private lock: Semaphore

/**
* Executes tasks up to maxPoolSize at a time, other items are put in a priority queue.
Expand All @@ -21,6 +25,7 @@ export class PrioritizedTaskExecutor {
this.maxPoolSize = maxPoolSize
this.currentPoolSize = 0
this.queue = []
this.lock = new Semaphore(1)
}

/**
Expand All @@ -29,19 +34,73 @@ export class PrioritizedTaskExecutor {
* @param priority The priority of the task
* @param fn The function that accepts the callback, which must be called upon the task completion.
*/
execute(priority: number, fn: Function) {
if (this.currentPoolSize < this.maxPoolSize) {
this.currentPoolSize++
fn(() => {
this.currentPoolSize--
if (this.queue.length > 0) {
this.queue.sort((a, b) => b.priority - a.priority)
const item = this.queue.shift()
this.execute(item!.priority, item!.fn)
async execute(priority: number, fn: Function) {
let self = this
let runTask = () => {
self.currentPoolSize++
fn(async () => {
// this callback function can be `await`ed
self.currentPoolSize--
if (self.queue.length > 0) {
await self.lock.acquire() // we need the lock to be unlocked, because we are editing the queue.
const item = self.queue.shift()
await self.lock.signal()
await self.execute(item!.priority, item!.fn)
}
})
}
if (this.currentPoolSize < this.maxPoolSize) {
runTask()
} else {
this.queue.push({ priority, fn })
await this.lock.wait()
if (this.queue.length == 0) {
this.queue.push({ priority, fn })
} else {
// insert the item in the queue using binary search
let left = 0
let right = this.queue.length
let mid = () => {
return Math.floor(left + (right - left) / 2)
}
let insert = (index: number) => {
if (this.currentPoolSize < this.maxPoolSize) {
runTask()
} else {
this.queue.splice(index, 0, { priority, fn })
}
}
while (true) {
// note that there is a special case: it could be that during sorting, a Task is finished (reducing currentPoolSize by 1), but this Task was not yet inserted
// therefore, if we want to insert the item we explicitly check that we indeed should Queue it, if not, we execute it and do not insert it.
let index = mid()
let value = this.queue[index].priority
if (value == priority) {
// we have found the priority value and can thus insert the task at this index.
insert(index)
break
}
if (left == right) {
// we have only one element left, this means that the items left of this index have a higher priority and the items right have a lower priority
// we thus insert it at this index
insert(left)
break
}
if (value > priority) {
// we know everything left of the index has a higher priority, so we do not need to consider these items anymore
left = index + 1
} else {
// we know everything right of the index has a lowre priority, so we do not need to consider these items anymore
right = index - 1
}
if (left > right) {
// this could happen, for instance, if left = 0 right = 1 (the index is 0) and the item has a lower priority.
// then right = -1, so we should insert at the left index
insert(left)
break
}
}
}
this.lock.signal()
}
}

Expand Down
47 changes: 31 additions & 16 deletions test/prioritizedTaskExecutor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
import * as tape from 'tape'
import { PrioritizedTaskExecutor } from '../src/prioritizedTaskExecutor'

const taskExecutor = new PrioritizedTaskExecutor(2)
tape('prioritized task executor test', function (t: any) {
t.test('should execute tasks in the right order', async (st: any) => {
const taskExecutor = new PrioritizedTaskExecutor(2)
const tasks = [1, 2, 3, 4]
const callbacks = [] as any
const executionOrder = [] as any
for (let task of tasks) {
await taskExecutor.execute(task, function (cb: Function) {
executionOrder.push(task)
callbacks.push(cb)
})
}

tape('prioritized task executor test', function (t) {
const tasks = [1, 2, 3, 4]
const callbacks = [] as any
const executionOrder = [] as any
tasks.forEach(function (task) {
taskExecutor.execute(task, function (cb: Function) {
executionOrder.push(task)
callbacks.push(cb)
})
})
for (let callback of callbacks) {
await callback()
}

callbacks.forEach(function (callback: Function) {
callback()
const expectedExecutionOrder = [1, 2, 4, 3]
st.deepEqual(executionOrder, expectedExecutionOrder)
st.end()
})

const expectedExecutionOrder = [1, 2, 4, 3]
t.deepEqual(executionOrder, expectedExecutionOrder)
t.end()
t.test('should queue tasks in the right order', async (st: any) => {
const priorityList = [0, 1, 0, 2, 0, 1, 0, 2, 2, 1]
const PTE = new PrioritizedTaskExecutor(0) // this ensures that no task actually gets executed, so this essentially just checks the sort algorithm
for (let priority of priorityList) {
await PTE.execute(priority, () => {})
}
// have to cast the PTE as <any> to access the private queue
st.deepEqual(
(<any>PTE).queue.map((task: any) => task.priority),
priorityList.sort((a: any, b: any) => b - a),
)
st.end()
})
})