Skip to content

Commit

Permalink
Support exposing generator functions
Browse files Browse the repository at this point in the history
  • Loading branch information
andywer committed Jun 21, 2020
1 parent d2f734d commit 300342e
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/serializers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MessageRelay } from "../types/common"
import { JsonSerializable, Serializer, SerializerImplementation } from "../types/serializers"
import { isSerializedCallback, DefaultCallbackSerializer } from "./callbacks"
import { isSerializedError, DefaultErrorSerializer } from "./errors"
import { isIterator, isSerializedIterator, DefaultIteratorSerializer } from "./iterators"

export {
JsonSerializable,
Expand Down Expand Up @@ -36,6 +37,8 @@ export const DefaultSerializer = (): Serializer<JsonSerializable> => {
return errorSerializer.deserialize(message, sender)
} else if (isSerializedCallback(message)) {
return callbackSerializer.deserialize(message, sender)
} else if (isSerializedIterator(message)) {
return iteratorSerializer.deserialize(message, sender)
} else {
return message
}
Expand All @@ -45,6 +48,8 @@ export const DefaultSerializer = (): Serializer<JsonSerializable> => {
return errorSerializer.serialize(input) as any as JsonSerializable
} else if (isCallback(input)) {
return callbackSerializer.serialize(input) as any as JsonSerializable
} else if (isIterator(input)) {
return iteratorSerializer.serialize(input) as any as JsonSerializable
} else {
return input
}
Expand All @@ -53,6 +58,7 @@ export const DefaultSerializer = (): Serializer<JsonSerializable> => {

const callbackSerializer = DefaultCallbackSerializer(serializer)
const errorSerializer = DefaultErrorSerializer()
const iteratorSerializer = DefaultIteratorSerializer(serializer)

return serializer
}
47 changes: 47 additions & 0 deletions src/serializers/iterators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import DebugLogger from "debug"
import { createProxyFunction } from "../common/call-proxy"
import { Callback, RemoteCallback } from "../common/callbacks"
import { MessageRelay } from "../types/common"
import { SerializedIterator, Serializer } from "../types/serializers"

const debug = DebugLogger("threads:callback:messages")

export const DefaultIteratorSerializer = (rootSerializer: Serializer): Serializer<SerializedIterator, Iterator<any> | AsyncIterator<any>, AsyncIterator<any>> => ({
deserialize(message: SerializedIterator, origin: MessageRelay): AsyncIterator<any> & AsyncIterable<any> {
const remoteNext = createProxyFunction<[], IteratorResult<any>>(origin, rootSerializer, message.next_fid, debug)
const remoteCallback = RemoteCallback<() => Promise<IteratorResult<any>>>(remoteNext)

const next = async () => {
const result = await remoteCallback()
if (result.done) {
remoteCallback.release()
}
return result
}

const asyncIterator = {
[Symbol.asyncIterator]: () => asyncIterator,
next
}
return asyncIterator
},
serialize(iter: Iterator<any> | AsyncIterator<any>): SerializedIterator {
const next = Callback(async () => {
const result = await iter.next()
if (result.done) {
next.release()
}
return result
})
return {
__iterator_marker: "$$iterator",
next_fid: next.id
}
}
})

export const isIterator = (thing: any): thing is Iterator<any> | AsyncIterator<any> =>
thing && typeof thing === "object" && "next" in thing && typeof thing.next === "function"

export const isSerializedIterator = (thing: any): thing is SerializedIterator =>
thing && typeof thing === "object" && "__iterator_marker" in thing && thing.__iterator_marker === "$$iterator"
11 changes: 9 additions & 2 deletions src/types/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ export type StripAsync<Type> =
? ObservableBaseType
: Type

export type AsyncifyIterator<Type> =
Type extends Iterator<infer T>
? AsyncIterator<T> & AsyncIterable<T>
: Type extends AsyncIterator<infer T2>
? AsyncIterator<T2> & AsyncIterable<T2>
: Type

export type ModuleMethods = { [methodName: string]: (...args: any) => any }

export type ProxyableFunction<Args extends any[], ReturnType> =
Args extends []
? () => ObservablePromise<StripAsync<ReturnType>>
: (...args: Args) => ObservablePromise<StripAsync<ReturnType>>
? () => ObservablePromise<AsyncifyIterator<StripAsync<ReturnType>>>
: (...args: Args) => ObservablePromise<AsyncifyIterator<StripAsync<ReturnType>>>

export type ModuleProxy<Methods extends ModuleMethods> = {
[method in keyof Methods]: ProxyableFunction<Parameters<Methods[method]>, ReturnType<Methods[method]>>
Expand Down
9 changes: 7 additions & 2 deletions src/types/serializers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ export type JsonSerializable =
| JsonSerializableObject
| JsonSerializableObject[]

export interface Serializer<Msg = JsonSerializable, Input = any> {
deserialize(message: Msg, sender: MessageRelay | null): Input
export interface Serializer<Msg = JsonSerializable, Input = any, Deserialized = Input> {
deserialize(message: Msg, sender: MessageRelay | null): Deserialized
serialize(input: Input): Msg
}

Expand All @@ -38,3 +38,8 @@ export interface SerializedError {
name: string
stack?: string
}

export interface SerializedIterator {
__iterator_marker: "$$iterator"
next_fid: number
}
36 changes: 36 additions & 0 deletions test/iterators.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import test from "ava"
import { spawn, Callback, Thread, Worker } from "../src/index"
import { AsyncGenerator } from "./workers/async-generator"
import { Generator } from "./workers/generator"

test("can use a generator function exposed by a worker", async t => {
const generate = await spawn<Generator>(new Worker("./workers/generator"))

try {
const results: number[] = []

for await (const i of await generate(3)) {
results.push(i)
}

t.deepEqual(results, [1, 2, 3])
} finally {
await Thread.terminate(generate)
}
})

test("can use an async generator function exposed by a worker", async t => {
const generate = await spawn<AsyncGenerator>(new Worker("./workers/async-generator"))

try {
const results: number[] = []

for await (const i of await generate(3)) {
results.push(i)
}

t.deepEqual(results, [1, 2, 3])
} finally {
await Thread.terminate(generate)
}
})
12 changes: 12 additions & 0 deletions test/workers/async-generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { expose } from "../../src/worker"

export type AsyncGenerator = (count: number) => AsyncIterator<number>

const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

expose(async function* generator(count: number) {
for (let i = 1; i <= count; i++) {
await delay(2)
yield i
}
})
9 changes: 9 additions & 0 deletions test/workers/generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { expose } from "../../src/worker"

export type Generator = (count: number) => Iterator<number>

expose(function *generator(count: number) {
for (let i = 1; i <= count; i++) {
yield i
}
})

0 comments on commit 300342e

Please sign in to comment.