Skip to content

Commit

Permalink
🔀 Merge branch 'devel' into release
Browse files Browse the repository at this point in the history
- Performance improvement
  - Call 'reject' rather than throwing Error
  - Use Atomics
- README.md is updated
  - logo of badges are updated
- package.json is updated
  - 'badges' is added

Signed-off-by: kei-g <km.8k6ce+github@gmail.com>
  • Loading branch information
kei-g committed Aug 23, 2021
2 parents aa2ca7e + a706fb9 commit c281044
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 37 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# async-iterable-queue [![License](https://img.shields.io/github/license/kei-g/async-iterable-queue)](https://opensource.org/licenses/BSD-3-Clause) [![Libraries.io dependency status for latest release](https://img.shields.io/librariesio/release/npm/async-iterable-queue)](https://npmjs.com/package/async-iterable-queue?activeTab=dependencies) [![Travis CI](https://img.shields.io/travis/com/kei-g/async-iterable-queue?logo=travis)](https://www.travis-ci.com/github/kei-g/async-iterable-queue) [![npm](https://img.shields.io/npm/v/async-iterable-queue)](https://npmjs.com/package/async-iterable-queue)
# async-iterable-queue [![License](https://img.shields.io/github/license/kei-g/async-iterable-queue)](https://opensource.org/licenses/BSD-3-Clause) [![Libraries.io dependency status for latest release](https://img.shields.io/librariesio/release/npm/async-iterable-queue?logo=nodedotjs)](https://npmjs.com/package/async-iterable-queue?activeTab=dependencies) [![Travis CI](https://img.shields.io/travis/com/kei-g/async-iterable-queue?logo=travis)](https://www.travis-ci.com/github/kei-g/async-iterable-queue) [![npm](https://img.shields.io/npm/v/async-iterable-queue?logo=npm)](https://npmjs.com/package/async-iterable-queue)

[![npms.io (maintenance)](https://img.shields.io/npms-io/maintenance-score/async-iterable-queue)](https://npms.io/search?q=async-iterable-queue) [![npms.io (quality)](https://img.shields.io/npms-io/quality-score/async-iterable-queue)](https://npms.io/search?q=async-iterable-queue)
[![npms.io (maintenance)](https://img.shields.io/npms-io/maintenance-score/async-iterable-queue?logo=npm)](https://npms.io/search?q=async-iterable-queue) [![npms.io (quality)](https://img.shields.io/npms-io/quality-score/async-iterable-queue?logo=npm)](https://npms.io/search?q=async-iterable-queue)

Async Iterable Queue
66 changes: 36 additions & 30 deletions async-iterable-queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EventEmitter } from 'stream'
import { assert } from 'console'

/**
* 非同期反復可能な先入れ先出し型の待ち行列への非同期反復子
Expand Down Expand Up @@ -39,7 +40,11 @@ class AIQAsyncIterator<T> implements AsyncIterator<T> {
/**
* 非同期反復可能な先入れ先出し型の待ち行列の状態を表す型
*/
type AIQState = 'ending' | 'finished'
enum AIQState {
ending = 1,
finished = 2,
undefined = 0,
}

/**
* 非同期反復可能な先入れ先出し型の待ち行列
Expand All @@ -63,18 +68,14 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
/**
* この待ち行列の現在の状態
*/
#state?: AIQState
readonly #state = new Uint8Array([AIQState.undefined])

/**
* コンストラクタ
*/
constructor() {
const resolveAsync = createAsyncResolver({
finish: () => {
const state = this.#state
this.#state = 'finished'
return state
},
finish: () => Atomics.exchange(this.#state, 0, AIQState.finished),
resolvers: this.#resolvers,
})
this.#emitter.on('deq', async () => {
Expand All @@ -93,28 +94,36 @@ export class AsyncIterableQueue<T> implements AsyncIterable<T> {
* @param cb 終端が読み取られた後に呼ばれるコールバック関数
*/
end(cb?: NoParameterCallback): Promise<void> {
const state = this.#state
if (state)
throw new Error(state)
this.#state = 'ending'
return new Promise((resolve: Resolver<void>) => (
this.#emitter.emit('enq', new Terminator(cb)),
resolve()
))
return new Promise(
(resolve: Resolver<void>, reject: SingleParameterAction<unknown>) => {
const state = Atomics.compareExchange(
this.#state,
0,
AIQState.undefined,
AIQState.ending,
)
if (state !== AIQState.undefined)
return reject(new Error(AIQState[state]))
this.#emitter.emit('enq', new Terminator(cb))
return resolve()
}
)
}

/**
* この待ち行列の末尾に要素を追加する
* @param value 要素の値
*/
push(value: T): Promise<void> {
const state = this.#state
if (state)
throw new Error(state)
return new Promise((resolve: Resolver<void>) => (
this.#emitter.emit('enq', value),
resolve()
))
return new Promise(
(resolve: Resolver<void>, reject: SingleParameterAction<unknown>) => {
const state = Atomics.load(this.#state, 0)
if (state !== AIQState.undefined)
return reject(new Error(AIQState[state]))
this.#emitter.emit('enq', value)
return resolve()
}
)
}

/**
Expand Down Expand Up @@ -185,15 +194,12 @@ class Terminator {
try {
const result = this.cb()
if (result instanceof Promise)
result.catch(reject).then(resolve)
else
resolve()
return result.catch(reject).then(resolve)
}
catch (err: unknown) {
reject(err)
return reject(err)
}
else
resolve()
return resolve()
})
}
}
Expand All @@ -219,8 +225,8 @@ const createAsyncResolver = <T>(param: AsyncResolverCreateParameter<T>) => {
if (value instanceof Terminator) {
const state = param.finish()
await resolveAsync({ done: true } as IteratorResult<T>)
if (state === 'ending')
await value.call()
assert(state === AIQState.ending)
await value.call()
}
else
await resolveAsync({
Expand Down
34 changes: 33 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,38 @@
"email": "km.8k6ce+github@gmail.com",
"name": "kei-g"
},
"badges": [
{
"description": "license",
"href": "https://img.shields.io/github/license/kei-g/async-iterable-queue",
"url": "https://opensource.org/licenses/BSD-3-Clause"
},
{
"description": "dependency",
"href": "https://img.shields.io/librariesio/release/npm/async-iterable-queue?logo=nodedotjs",
"url": "https://npmjs.com/package/async-iterable-queue?activeTab=dependencies"
},
{
"description": "Travis CI",
"href": "https://img.shields.io/travis/com/kei-g/async-iterable-queue?logo=travis",
"url": "https://www.travis-ci.com/github/kei-g/async-iterable-queue"
},
{
"description": "version",
"href": "https://img.shields.io/npm/v/async-iterable-queue?logo=npm",
"url": "https://npmjs.com/package/async-iterable-queue"
},
{
"description": "maintainance",
"href": "https://img.shields.io/npms-io/maintenance-score/async-iterable-queue?logo=npm",
"url": "https://npms.io/search?q=async-iterable-queue"
},
{
"description": "quality",
"href": "https://img.shields.io/npms-io/quality-score/async-iterable-queue?logo=npm",
"url": "https://npms.io/search?q=async-iterable-queue"
}
],
"bugs": {
"url": "https://github.com/kei-g/async-iterable-queue/issues"
},
Expand Down Expand Up @@ -41,7 +73,7 @@
"main": "lib/async-iterable-queue.js",
"name": "async-iterable-queue",
"nyc": {
"branches": 93,
"branches": 100,
"functions": 100,
"lines": 100,
"statements": 100
Expand Down
19 changes: 15 additions & 4 deletions test/async-iterable-queue.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { AsyncIterableQueue } from '../async-iterable-queue'
import { describe, it } from 'mocha'
import { expect } from 'chai'
import { throws } from 'assert'

const source = [
Math.LN2,
Expand All @@ -23,7 +22,13 @@ describe('failure mission', async () => {
for (const value of source)
await q.push(value)
await q.end()
throws(() => q.end())
let error: Error
await q.end()
.catch((reason: unknown) => {
if (reason instanceof Error)
error = reason
})
.finally(() => expect(error).instanceOf(Error))
}
await Promise.all([popAsync(q), pushAsync()])
})
Expand All @@ -33,7 +38,13 @@ describe('failure mission', async () => {
for (const value of source)
await q.push(value)
await q.end()
throws(() => q.push(Math.SQRT2))
let error: Error
await q.push(Math.SQRT2)
.catch((reason?: unknown) => {
if (reason instanceof Error)
error = reason
})
.finally(() => expect(error).instanceOf(Error))
}
await Promise.all([popAsync(q), pushAsync()])
})
Expand All @@ -44,7 +55,7 @@ describe('failure mission', async () => {
await q.push(value)
await q.end(() => {
throw new Error()
})
}).catch((reason?: unknown) => expect(reason).instanceOf(Error))
}
await Promise.all([popAsync(q), pushAsync()])
})
Expand Down

0 comments on commit c281044

Please sign in to comment.