diff --git a/.gitignore b/.gitignore index 54c1431..d8d51cd 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ dist __pycache__ .pytest_cache .mypy_cache +.coverage diff --git a/actress/lib.py b/actress/lib.py deleted file mode 100644 index 4861114..0000000 --- a/actress/lib.py +++ /dev/null @@ -1,1120 +0,0 @@ -from typing import TypeVar - -# import * as Task from "./task.js" -# export * from "./task.js" - -# /** -# * Turns a task (that never fails or sends messages) into an effect of it's -# * result. -# * -# * @template T -# * @param {Task.Task} task -# * @returns {Task.Effect} -# */ -# export const effect = function* (task) { -# const message = yield* task -# yield* send(message) -# } - -# /** -# * Gets a handle to the task that invoked it. Useful when task needs to -# * suspend execution until some outside event occurs, in which case handle -# * can be used resume execution (see `suspend` code example for more details) -# * -# * @template T, M, X -# * @returns {Task.Task, never>} -# */ -# export function* current() { -# return /** @type {Task.Controller} */ (yield CURRENT) -# } - -# /** -# * Suspends the current task (task that invokes it), which can then be -# * resumed from another task or an outside event (e.g. `setTimeout` callback) -# * by calling the `resume` with an task's handle. -# * -# * Calling this in almost all cases is preceeded by call to `current()` in -# * order to obtain a `handle` which can be passed to `resume` function -# * to resume the execution. -# * -# * Note: This task never fails, although it may never resume either. However -# * you can utilize `finally` block to do a necessary cleanup in case execution -# * is aborted. -# * -# * @example -# * ```js -# * import { current, suspend, resume } from "actor" -# * function * sleep(duration) { -# * // get a reference to this task so we can resume it. 
-# * const self = yield * current() -# * // resume this task when timeout fires -# * const id = setTimeout(() => resume(self), duration) -# * try { -# * // suspend this task nothing below this line will run until task is -# * // resumed. -# * yield * suspend() -# * } finally { -# * // if task is aborted finally block will still run which given you -# * // chance to cleanup. -# * clearTimeout(id) -# * } -# * } -# * ``` -# * -# * @returns {Task.Task} -# */ -# export const suspend = function* () { -# yield SUSPEND -# } - -# /** -# * Suspends execution for the given duration in milliseconds, after which -# * execution is resumed (unless it was aborted in the meantime). -# * -# * @example -# * ```js -# * function * demo() { -# * console.log("I'm going to take small nap") -# * yield * sleep(200) -# * console.log("I am back to work") -# * } -# * ``` -# * -# * @param {number} [duration] -# * @returns {Task.Task} -# */ -# export function* sleep(duration = 0) { -# const task = yield* current() -# const id = setTimeout(enqueue, duration, task) - -# try { -# yield* suspend() -# } finally { -# clearTimeout(id) -# } -# } - -# /** -# * Provides equivalent of `await` in async functions. Specifically it takes -# * a value that you can `await` on (that is `Promise|T`) and suspends -# * execution until promise is settled. If promise succeeds execution is resumed -# * with `T` otherwise an error of type `X` is thrown (which is by default -# * `unknown` since promises do not encode error type). -# * -# * It is useful when you need to deal with potentially async set of operations -# * without having to check if thing is a promise at every step. -# * -# * Please note: This that execution is suspended even if given value is not a -# * promise, however scheduler will still resume it in the same tick of the event -# * loop after, just processing other scheduled tasks. 
This avoids problematic -# * race condititions that can otherwise occur when values are sometimes promises -# * and other times are not. -# * -# * @example -# * ```js -# * function * fetchJSON (url, options) { -# * const response = yield * wait(fetch(url, options)) -# * const json = yield * wait(response.json()) -# * return json -# * } -# * ``` -# * -# * @template T, [X=unknown] -# * @param {Task.Await} input -# * @returns {Task.Task} -# */ -# export const wait = function* (input) { -# const task = yield* current() -# if (isAsync(input)) { -# let failed = false -# /** @type {unknown} */ -# let output = undefined -# input.then( -# value => { -# failed = false -# output = value -# enqueue(task) -# }, -# error => { -# failed = true -# output = error -# enqueue(task) -# } -# ) - -# yield* suspend() -# if (failed) { -# throw output -# } else { -# return /** @type {T} */ (output) -# } -# } else { -# // This may seem redundunt but it is not, by enqueuing this task we allow -# // scheduler to perform other queued tasks first. This way many race -# // conditions can be avoided when values are sometimes promises and other -# // times aren't. -# // Unlike `await` however this will resume in the same tick. -# main(wake(task)) -# yield* suspend() -# return input -# } -# } - -# /** -# * @template T, X, M -# * @param {Task.Controller} task -# * @returns {Task.Task} -# */ -# function* wake(task) { -# enqueue(task) -# } - -# /** -# * Checks if value value is a promise (or it's lookalike). -# * -# * @template T -# * @param {any} node -# * @returns {node is PromiseLike} -# */ - -# const isAsync = node => -# node != null && -# typeof (/** @type {{then?:unknown}} */ (node).then) === "function" - -# /** -# * Task that sends given message (or rather an effect producing this message). 
-# * Please note, that while you could use `yield message` instead, but you'd risk -# * having to deal with potential breaking changes if library internals change -# * in the future, which in fact may happen as anticipated improvements in -# * TS generator inference could enable replace need for `yield *`. -# * -# * @see https://github.com/microsoft/TypeScript/issues/43632 -# * -# * @template T -# * @param {T} message -# * @returns {Task.Effect} -# */ -# export const send = function* (message) { -# yield /** @type {Task.Message} */ (message) -# } - -T = TypeVar("T") - -def send(message: T) - -# /** -# * Takes several effects and merges them into a single effect of tagged -# * variants so that their source could be identified via `type` field. -# * -# * @example -# * ```js -# * listen({ -# * read: Task.effect(dbRead), -# * write: Task.effect(dbWrite) -# * }) -# * ``` -# * -# * @template {string} Tag -# * @template T -# * @param {{ [K in Tag]: Task.Effect }} source -# * @returns {Task.Effect>} -# */ -# export const listen = function* (source) { -# /** @type {Task.Fork>[]} */ -# const forks = [] -# for (const entry of Object.entries(source)) { -# const [name, effect] = /** @type {[Tag, Task.Effect]} */ (entry) -# if (effect !== NONE) { -# forks.push(yield* fork(tag(effect, name))) -# } -# } - -# yield* group(forks) -# } - -# /** -# * Takes several tasks and creates an effect of them all. -# * -# * @example -# * ```js -# * Task.effects([ -# * dbRead, -# * dbWrite -# * ]) -# * ``` -# * -# * @template {string} Tag -# * @template T -# * @param {Task.Task[]} tasks -# * @returns {Task.Effect} -# */ - -# export const effects = tasks => -# tasks.length > 0 ? batch(tasks.map(effect)) : NONE - -# /** -# * Takes several effects and combines them into a one. 
-# * -# * @template T -# * @param {Task.Effect[]} effects -# * @returns {Task.Effect} -# */ -# export function* batch(effects) { -# const forks = [] -# for (const effect of effects) { -# forks.push(yield* fork(effect)) -# } - -# yield* group(forks) -# } - -# /** -# * @template {string} Tag -# * @template T -# * @typedef {{type: Tag} & {[K in Tag]: T}} Tagged -# */ -# /** -# * Tags an effect by boxing each event with an object that has `type` field -# * corresponding to given tag and same named field holding original message -# * e.g. given `nums` effect that produces numbers, `tag(nums, "inc")` would -# * create an effect that produces events like `{type:'inc', inc:1}`. -# * -# * @template {string} Tag -# * @template T, M, X -# * @param {Task.Task} effect -# * @param {Tag} tag -# * @returns {Task.Task>} -# */ -# export const tag = (effect, tag) => -# // @ts-ignore -# effect === NONE -# ? NONE -# : effect instanceof Tagger -# ? new Tagger([...effect.tags, tag], effect.source) -# : new Tagger([tag], effect) - -# /** -# * @template {string} Tag -# * @template Success, Failure, Message -# * -# * @implements {Task.Task>} -# * @implements {Task.Controller>} -# */ -# class Tagger { -# /** -# * @param {Task.Task} source -# * @param {string[]} tags -# */ -# constructor(tags, source) { -# this.tags = tags -# this.source = source -# /** @type {Task.Controller} */ -# this.controller -# } -# /* c8 ignore next 3 */ -# [Symbol.iterator]() { -# if (!this.controller) { -# this.controller = this.source[Symbol.iterator]() -# } -# return this -# } -# /** -# * @param {Task.TaskState} state -# * @returns {Task.TaskState>} -# */ -# box(state) { -# if (state.done) { -# return state -# } else { -# switch (state.value) { -# case SUSPEND: -# case CURRENT: -# return /** @type {Task.TaskState>} */ ( -# state -# ) -# default: { -# // Instead of boxing result at each transform step we perform in-place -# // mutation as we know nothing else is accessing this value. 
-# const tagged = /** @type {{ done: false, value: any }} */ (state) -# let { value } = tagged -# for (const tag of this.tags) { -# value = withTag(tag, value) -# } -# tagged.value = value -# return tagged -# } -# } -# } -# } -# /** -# * -# * @param {Task.Instruction} instruction -# */ -# next(instruction) { -# return this.box(this.controller.next(instruction)) -# } -# /** -# * -# * @param {Failure} error -# */ -# throw(error) { -# return this.box(this.controller.throw(error)) -# } -# /** -# * @param {Success} value -# */ -# return(value) { -# return this.box(this.controller.return(value)) -# } - -# get [Symbol.toStringTag]() { -# return "TaggedEffect" -# } -# } - -# /** -# * Returns empty `Effect`, that is produces no messages. Kind of like `[]` or -# * `""` but for effects. -# * -# * @type {() => Task.Effect} -# */ -# export const none = () => NONE - -# /** -# * Takes iterable of tasks and runs them concurrently, returning array of -# * results in an order of tasks (not the order of completion). If any of the -# * tasks fail all the rest are aborted and error is throw into calling task. -# * -# * > This is basically equivalent of `Promise.all` except cancelation logic -# * because tasks unlike promises can be cancelled. 
-# * -# * @template T, X -# * @param {Iterable>} tasks -# * @returns {Task.Task} -# */ -# export const all = function* (tasks) { -# const self = yield* current() - -# /** @type {(id:number) => (value:T) => void} */ -# const succeed = id => value => { -# delete forks[id] -# results[id] = value -# count -= 1 -# if (count === 0) { -# enqueue(self) -# } -# } - -# /** @type {(error:X) => void} */ -# const fail = error => { -# for (const handle of forks) { -# if (handle) { -# enqueue(abort(handle, error)) -# } -# } - -# enqueue(abort(self, error)) -# } - -# /** @type {Task.Fork[]} */ -# let forks = [] -# let count = 0 -# for (const task of tasks) { -# forks.push(yield* fork(then(task, succeed(count++), fail))) -# } -# const results = new Array(count) - -# if (count > 0) { -# yield* suspend() -# } - -# return results -# } - -# /** -# * @template {string} Tag -# * @template T -# * @param {Tag} tag -# * @param {Task.Message} value -# */ -# const withTag = (tag, value) => -# /** @type {Tagged} */ -# ({ type: tag, [tag]: value }) - -# /** -# * Kind of like promise.then which is handy when you want to extract result -# * from the given task from the outside. -# * -# * @template T, U, X, M -# * @param {Task.Task} task -# * @param {(value:T) => U} resolve -# * @param {(error:X) => U} reject -# * @returns {Task.Task} -# */ -# export function* then(task, resolve, reject) { -# try { -# return resolve(yield* task) -# } catch (error) { -# return reject(/** @type {X} */ (error)) -# } -# } - -# // Special control instructions recognized by a scheduler. 
-# const CURRENT = Symbol("current") -# const SUSPEND = Symbol("suspend") -# /** @typedef {typeof SUSPEND|typeof CURRENT} Control */ - -# /** -# * @template M -# * @param {Task.Instruction} value -# * @returns {value is M} -# */ -# export const isMessage = value => { -# switch (value) { -# case SUSPEND: -# case CURRENT: -# return false -# default: -# return true -# } -# } - -# /** -# * @template M -# * @param {Task.Instruction} value -# * @returns {value is Control} -# */ -# export const isInstruction = value => !isMessage(value) - -# /** -# * @template T, X, M -# * @implements {Task.TaskGroup} -# */ -# class Group { -# /** -# * @template T, X, M -# * @param {Task.Controller|Task.Fork} member -# * @returns {Task.Group} -# */ -# static of(member) { -# return ( -# /** @type {{group?:Task.TaskGroup}} */ (member).group || MAIN -# ) -# } - -# /** -# * @template T, X, M -# * @param {(Task.Controller|Task.Fork) & {group?:Task.TaskGroup}} member -# * @param {Task.TaskGroup} group -# */ -# static enqueue(member, group) { -# member.group = group -# group.stack.active.push(member) -# } -# /** -# * @param {Task.Controller} driver -# * @param {Task.Controller[]} [active] -# * @param {Set>} [idle] -# * @param {Task.Stack} [stack] -# */ -# constructor( -# driver, -# active = [], -# idle = new Set(), -# stack = new Stack(active, idle) -# ) { -# this.driver = driver -# this.parent = Group.of(driver) -# this.stack = stack -# this.id = ++ID -# } -# } - -# /** -# * @template T, X, M -# * @implements {Task.Main} -# */ -# class Main { -# constructor() { -# this.status = IDLE -# this.stack = new Stack() -# this.id = /** @type {0} */ (0) -# } -# } - -# /** -# * @template T, X, M -# */ -# class Stack { -# /** -# * @param {Task.Controller[]} [active] -# * @param {Set>} [idle] -# */ -# constructor(active = [], idle = new Set()) { -# this.active = active -# this.idle = idle -# } - -# /** -# * -# * @param {Task.Stack} stack -# * @returns -# */ -# static size({ active, idle }) { -# return 
active.length + idle.size -# } -# } - -# /** -# * Starts a main task. -# * -# * @param {Task.Task} task -# */ -# export const main = task => enqueue(task[Symbol.iterator]()) - -# /** -# * @template T, X, M -# * @param {Task.Controller} task -# */ -# const enqueue = task => { -# let group = Group.of(task) -# group.stack.active.push(task) -# group.stack.idle.delete(task) - -# // then walk up the group chain and unblock their driver tasks. -# while (group.parent) { -# const { idle, active } = group.parent.stack -# if (idle.has(group.driver)) { -# idle.delete(group.driver) -# active.push(group.driver) -# } else { -# // if driver was not blocked it must have been unblocked by -# // other task so stop there. -# break -# } - -# group = group.parent -# } - -# if (MAIN.status === IDLE) { -# MAIN.status = ACTIVE -# while (true) { -# try { -# for (const _message of step(MAIN)) { -# } -# MAIN.status = IDLE -# break -# } catch (_error) { -# // Top level task may crash and throw an error, but given this is a main -# // group we do not want to interupt other unrelated tasks, which is why -# // we discard the error and the task that caused it. -# MAIN.stack.active.shift() -# } -# } -# } -# } - -# /** -# * @template T, X, M -# * @param {Task.Controller} task -# */ -# export const resume = task => enqueue(task) - -# /** -# * @template T, X, M -# * @param {Task.Group} group -# */ - -# const step = function* (group) { -# const { active } = group.stack -# let task = active[0] -# group.stack.idle.delete(task) -# while (task) { -# /** @type {Task.TaskState} */ -# let state = INIT -# // Keep processing insturctions until task is done, it send suspend request -# // or it's has been removed from the active queue. -# // ⚠️ Group changes require extra care so please make sure to understand -# // the detail here. 
It occurs when spawned task(s) are joined into a group -# // which will change the task driver, that is when `task === active[0]` will -# // became false and need to to drop the task immediately otherwise race -# // condition will occur due to task been driven by multiple concurrent -# // schedulers. -# loop: while (!state.done && task === active[0]) { -# const instruction = state.value -# switch (instruction) { -# // if task is suspended we add it to the idle list and break the loop -# // to move to a next task. -# case SUSPEND: -# group.stack.idle.add(task) -# break loop -# // if task requested a context (which is usually to suspend itself) -# // pass back a task reference and continue. -# case CURRENT: -# state = task.next(task) -# break -# default: -# // otherwise task sent a message which we yield to the driver and -# // continue -# state = task.next( -# yield /** @type {M & Task.Message}*/ (instruction) -# ) -# break -# } -# } - -# // If task is complete, or got suspended we move to a next task -# active.shift() -# task = active[0] -# group.stack.idle.delete(task) -# } -# } - -# /** -# * Executes given task concurrently with a current task (task that spawned it). -# * Spawned task is detached from the task that spawned it and it can outlive it -# * and / or fail without affecting a task that spawned it. If you need to wait -# * on concurrent task completion consider using `fork` instead which can be -# * later `joined`. If you just want a to block on task execution you can just -# * `yield* work()` directly instead. -# * -# * @param {Task.Task} task -# * @returns {Task.Task} -# */ -# export function* spawn(task) { -# main(task) -# } - -# /** -# * Executes given task concurrently with current task (the task that initiated -# * fork). Froked task is detached from the task that created it and it can -# * outlive it and / or fail without affecting it. 
You do however get a handle -# * for the fork which could be used to `join` the task, in which case `joining` -# * task will block until fork finishes execution. -# * -# * This is also a primary interface for executing tasks from the outside of the -# * task context. Function returns `Fork` which implements `Promise` interface -# * so it could be awaited. Please note that calling `fork` does not really do -# * anything, it lazily starts execution when you either `await fork(work())` -# * from arbitray context or `yield* fork(work())` in anothe task context. -# * -# * @template T, X, M -# * @param {Task.Task} task -# * @param {Task.ForkOptions} [options] -# * @returns {Task.Fork} -# */ -# export const fork = (task, options) => new Fork(task, options) - -# /** -# * Exits task succesfully with a given return value. -# * -# * @template T, M, X -# * @param {Task.Controller} handle -# * @param {T} value -# * @returns {Task.Task} -# */ -# export const exit = (handle, value) => conclude(handle, { ok: true, value }) - -# /** -# * Terminates task execution execution. Only takes task that produces no -# * result, if your task has non `void` return type you should use `exit` instead. -# * -# * @template M, X -# * @param {Task.Controller} handle -# */ -# export const terminate = handle => -# conclude(handle, { ok: true, value: undefined }) - -# /** -# * Aborts given task with an error. Task error type should match provided error. -# * -# * @template T, M, X -# * @param {Task.Controller} handle -# * @param {X} [error] -# */ -# export const abort = (handle, error) => conclude(handle, { ok: false, error }) - -# /** -# * Aborts given task with an given error. -# * -# * @template T, M, X -# * @param {Task.Controller} handle -# * @param {Task.Result} result -# * @returns {Task.Task & Task.Controller} -# */ -# function* conclude(handle, result) { -# try { -# const task = handle -# const state = result.ok -# ? 
task.return(result.value) -# : task.throw(result.error) - -# if (!state.done) { -# if (state.value === SUSPEND) { -# const { idle } = Group.of(task).stack -# idle.add(task) -# } else { -# enqueue(task) -# } -# } -# } catch (error) {} -# } - -# /** -# * Groups multiple forks togather and joins joins them with current task. -# * -# * @template T, X, M -# * @param {Task.Fork[]} forks -# * @returns {Task.Task} -# */ -# export function* group(forks) { -# // Abort eraly if there'se no work todo. -# if (forks.length === 0) return - -# const self = yield* current() -# /** @type {Task.TaskGroup} */ -# const group = new Group(self) -# /** @type {Task.Failure|null} */ -# let failure = null - -# for (const fork of forks) { -# const { result } = fork -# if (result) { -# if (!result.ok && !failure) { -# failure = result -# } -# continue -# } -# move(fork, group) -# } - -# // Keep work looping until there is nom more work to be done -# try { -# if (failure) { -# throw failure.error -# } - -# while (true) { -# yield* step(group) -# if (Stack.size(group.stack) > 0) { -# yield* suspend() -# } else { -# break -# } -# } -# } catch (error) { -# for (const task of group.stack.active) { -# yield* abort(task, error) -# } - -# for (const task of group.stack.idle) { -# yield* abort(task, error) -# enqueue(task) -# } - -# throw error -# } -# } - -# /** -# * @template T, X, M -# * @param {Task.Fork} fork -# * @param {Task.TaskGroup} group -# */ -# const move = (fork, group) => { -# const from = Group.of(fork) -# if (from !== group) { -# const { active, idle } = from.stack -# const target = group.stack -# fork.group = group -# // If it is idle just move from one group to the other -# // and update the group task thinks it belongs to. -# if (idle.has(fork)) { -# idle.delete(fork) -# target.idle.add(fork) -# } else { -# const index = active.indexOf(fork) -# // If task is in the job queue, we move it to a target job queue. 
Moving -# // top task in the queue requires extra care so it does not end up -# // processed by two groups which would lead to race. For that reason -# // `step` loop checks for group changes on each turn. -# if (index >= 0) { -# active.splice(index, 1) -# target.active.push(fork) -# } -# // otherwise task is complete -# } -# } -# } - -# /** -# * @template T, X, M -# * @param {Task.Fork} fork -# * @returns {Task.Task} -# */ -# export function* join(fork) { -# // If fork is still idle activate it. -# if (fork.status === IDLE) { -# yield* fork -# } - -# if (!fork.result) { -# yield* group([fork]) -# } - -# const result = /** @type {Task.Result} */ (fork.result) -# if (result.ok) { -# return result.value -# } else { -# throw result.error -# } -# } - -# /** -# * @template T, X -# * @implements {Task.Future} -# */ -# class Future { -# /** -# * @param {Task.StateHandler} handler -# */ -# constructor(handler) { -# this.handler = handler -# /** -# * @abstract -# * @type {Task.Result|void} -# */ -# this.result -# } -# /** -# * @type {Promise} -# */ -# get promise() { -# const { result } = this -# const promise = -# result == null -# ? new Promise((succeed, fail) => { -# this.handler.onsuccess = succeed -# this.handler.onfailure = fail -# }) -# : result.ok -# ? 
Promise.resolve(result.value) -# : Promise.reject(result.error) -# Object.defineProperty(this, "promise", { value: promise }) -# return promise -# } - -# /** -# * @template U, [E=never] -# * @param {((value:T) => U | PromiseLike)|undefined|null} [onresolve] -# * @param {((error:X) => E|PromiseLike)|undefined|null} [onreject] -# * @returns {Promise} -# */ -# then(onresolve, onreject) { -# return this.activate().promise.then(onresolve, onreject) -# } -# /** -# * @template [U=never] -# * @param {(error:X) => U} onreject -# */ -# catch(onreject) { -# return /** @type {Task.Future} */ ( -# this.activate().promise.catch(onreject) -# ) -# } -# /** -# * @param {() => void} onfinally -# * @returns {Task.Future} -# */ -# finally(onfinally) { -# return /** @type {Task.Future} */ ( -# this.activate().promise.finally(onfinally) -# ) -# } -# /** -# * @abstract -# */ -# /* c8 ignore next 3 */ -# activate() { -# return this -# } -# } - -# /** -# * @template T, X, M -# * @implements {Task.Fork} -# * @implements {Task.Controller} -# * @implements {Task.Task, never>} -# * @implements {Task.Future} -# * @extends {Future} -# */ -# class Fork extends Future { -# /** -# * @param {Task.Task} task -# * @param {Task.ForkOptions} [options] -# * @param {Task.StateHandler} [handler] -# * @param {Task.TaskState} [state] -# */ -# constructor(task, options = BLANK, handler = {}, state = INIT) { -# super(handler) -# this.id = ++ID -# this.name = options.name || "" -# /** @type {Task.Task} */ -# this.task = task -# this.state = state -# this.status = IDLE -# /** @type {Task.Result} */ -# this.result -# this.handler = handler - -# /** @type {Task.Controller} */ -# this.controller -# } - -# *resume() { -# resume(this) -# } - -# /** -# * @returns {Task.Task} -# */ -# join() { -# return join(this) -# } - -# /** -# * @param {X} error -# */ -# abort(error) { -# return abort(this, error) -# } -# /** -# * @param {T} value -# */ -# exit(value) { -# return exit(this, value) -# } -# get [Symbol.toStringTag]() 
{ -# return "Fork" -# } - -# /** -# * @returns {Task.Controller, never, never>} -# */ -# *[Symbol.iterator]() { -# return this.activate() -# } - -# activate() { -# this.controller = this.task[Symbol.iterator]() -# this.status = ACTIVE -# enqueue(this) -# return this -# } - -# /** -# * @private -# * @param {any} error -# * @returns {never} -# */ -# panic(error) { -# this.result = { ok: false, error } -# this.status = FINISHED -# const { handler } = this -# if (handler.onfailure) { -# handler.onfailure(error) -# } - -# throw error -# } - -# /** -# * @private -# * @param {Task.TaskState} state -# */ -# step(state) { -# this.state = state -# if (state.done) { -# this.result = { ok: true, value: state.value } -# this.status = FINISHED -# const { handler } = this -# if (handler.onsuccess) { -# handler.onsuccess(state.value) -# } -# } - -# return state -# } - -# /** -# * @param {unknown} value -# */ -# next(value) { -# try { -# return this.step(this.controller.next(value)) -# } catch (error) { -# return this.panic(error) -# } -# } -# /** -# * @param {T} value -# */ -# return(value) { -# try { -# return this.step(this.controller.return(value)) -# } catch (error) { -# return this.panic(error) -# } -# } -# /** -# * @param {X} error -# */ -# throw(error) { -# try { -# return this.step(this.controller.throw(error)) -# } catch (error) { -# return this.panic(error) -# } -# } -# } - -# /** -# * @template M -# * @param {Task.Effect} init -# * @param {(message:M) => Task.Effect} next -# * @returns {Task.Task} -# */ -# export const loop = function* (init, next) { -# /** @type {Task.Controller} */ -# const controller = yield* current() -# const group = new Group(controller) -# Group.enqueue(init[Symbol.iterator](), group) - -# while (true) { -# for (const message of step(group)) { -# Group.enqueue(next(message)[Symbol.iterator](), group) -# } - -# if (Stack.size(group.stack) > 0) { -# yield* suspend() -# } else { -# break -# } -# } -# } - -# let ID = 0 -# /** @type {Task.Status} */ 
-# const IDLE = "idle" -# const ACTIVE = "active" -# const FINISHED = "finished" -# /** @type {Task.TaskState} */ -# const INIT = { done: false, value: CURRENT } - -# const BLANK = {} - -# /** @type {Task.Effect} */ -# const NONE = (function* none() {})() - -# /** @type {Task.Main} */ -# const MAIN = new Main() diff --git a/actress/task.py b/actress/task.py index ada64a1..dfb617d 100644 --- a/actress/task.py +++ b/actress/task.py @@ -1,214 +1,1083 @@ -from typing import Generator, Literal, TypeVar, Union - - -class Symbol(object): - """Symbolic global constant""" - - __slots__ = ["_name", "_module"] - __name__ = property(lambda s: s._name) - __module__ = property(lambda s: s._module) - - def __init__(self, symbol, moduleName): - self.__class__._name.__set__(self, symbol) - self.__class__._module.__set__(self, moduleName) - - def __reduce__(self): - return self._name - - def __setattr__(self, attr, val): - raise TypeError("Symbols are immutable") - - def __repr__(self): - return self.__name__ - - __str__ = __repr__ - - -CURRENT = Symbol("current") -SUSPEND = Symbol("suspend") - -Control = Union[CURRENT, SUSPEND] - -# export type Instruction = Message | Control - -# export type Await = T | PromiseLike - -# export type Result = -# | Success -# | Failure - -# export interface Success { -# readonly ok: true -# readonly value: T -# } - -# export interface Failure { -# readonly ok: false -# readonly error: X -# } - -# type CompileError = `🚨 ${Reason}` - -# /** -# * Helper type to guard users against easy to make mistakes. -# */ -# export type Message = T extends Task -# ? CompileError<`You must 'yield * fn()' to delegate task instead of 'yield fn()' which yields generator instead`> -# : T extends (...args: any) => Generator -# ? CompileError<`You must yield invoked generator as in 'yield * fn()' instead of yielding generator function`> -# : T - -# /** -# * Task is a unit of computation that runs concurrently, a light-weight -# * process (in Erlang terms). 
You can spawn bunch of them and provided -# * cooperative scheduler will interleave their execution. -# * -# * Tasks have three type variables first two describing result of the -# * computation `Success` that corresponds to return type and `Failure` -# * describing an error type (caused by thrown exceptions). Third type -# * varibale `Message` describes type of messages this task may produce. -# * -# * Please note that that TS does not really check exceptions so `Failure` -# * type can not be guaranteed. Yet, we find them more practical that omitting -# * them as TS does for `Promise` types. -# * -# * Our tasks are generators (not the generator functions, but what you get -# * invoking them) that are executed by (library provided) provided scheduler. -# * Scheduler recognizes two special `Control` instructions yield by generator. -# * When scheduler gets `context` instruction it will resume generator with -# * a handle that can be used to resume running generator after it is suspended. -# * When `suspend` instruction is received scheduler will suspend execution until -# * it is resumed by queueing it from the outside event. 
-# */ -# export interface Task< -# Success extends unknown = unknown, -# Failure = Error, -# Message extends unknown = never -# > { -# [Symbol.iterator](): Controller -# } - -Success = TypeVar("Success") -Failure = TypeVar("Failure") -Message = TypeVar("Message") - -TaskState = Union[Success, Message] - -# Generator[yield_type, send_type, return_type] -# Generator - - -class Task[Success, Message, Failure]: - # def __iter__(): Controller[Success, Message, Failure] - def __iter__(): - Generator[ - Union[Success, Message], - Task[Success, Message, Failure], - Union[Success, Message], - ] - - -# class Controller[Success, Message, Failure](Generator[Union[Success, Message], Task[Success, Message, Failure], Union[Success, Message]]): - -# export interface Controller< -# Success extends unknown = unknown, -# Failure extends unknown = Error, -# Message extends unknown = never -# > { -# throw(error: Failure): TaskState -# return(value: Success): TaskState -# next( -# value: Task | unknown -# ): TaskState -# } - -# export type TaskState< -# Success extends unknown = unknown, -# Message = unknown -# > = IteratorResult, Success> - -# /** -# * Effect represents potentially asynchronous operation that results in a set -# * of events. It is often comprised of multiple `Task` and represents either -# * chain of events or a concurrent set of events (stretched over time). -# * `Effect` campares to a `Stream` the same way as `Task` compares to `Promise`. -# * It is not representation of an eventual result, but rather representation of -# * an operation which if execute will produce certain result. `Effect` can also -# * be compared to an `EventEmitter`, because very often their `Event` type -# * variable is a union of various event types, unlike `EventEmitter`s however -# * `Effect`s have inherent finality to them an in that regard they are more like -# * `Stream`s. -# * -# * You may notice that `Effect`, is just a `Task` which never fails, nor has a -# * (meaningful) result. 
Instead it can produce events (send messages). -# */ -# export interface Effect extends Task {} - -Status = Literal["idle", "active", "finished"] - -# export type Group = Main | TaskGroup - -# export interface TaskGroup { -# id: number -# parent: Group -# driver: Controller -# stack: Stack - -# result?: Result -# } - -# export interface Main { -# id: 0 -# parent?: null -# status: Status -# stack: Stack -# } - -# export interface Stack { -# active: Controller[] -# idle: Set> -# } - -# /** -# * Like promise but lazy. It corresponds to a task that is activated when -# * then method is called. -# */ -# export interface Future extends PromiseLike { -# then( -# handle?: (value: Success) => U | PromiseLike, -# onrejected?: (error: Failure) => G | PromiseLike -# ): Promise - -# catch(handle: (error: Failure) => U): Future - -# finally(handle: () => void): Future -# } - -# export interface Fork< -# Success extends unknown = unknown, -# Failure extends unknown = Error, -# Message extends unknown = never -# > extends Controller, -# Task, never>, -# Future { -# readonly id: number - -# group?: void | TaskGroup - -# result?: Result -# status: Status -# resume(): Task -# join(): Task - -# abort(error: Failure): Task -# exit(value: Success): Task -# } - -# export interface ForkOptions { -# name?: string -# } - -# export interface StateHandler { -# onsuccess?: (value: T) => void -# onfailure?: (error: X) => void -# } +from __future__ import annotations +import asyncio +import weakref +from dataclasses import dataclass +from typing import Any, Generic, Literal, NoReturn, Optional, TypeVar, TypedDict, Union, cast, overload +from collections.abc import Awaitable, Generator, Callable, Iterable +from enum import Enum +from typing_extensions import TypeAlias + +T = TypeVar("T") # value of task returned (on success) +X = TypeVar("X", bound= Exception) # exception raised by task (failure) +M = TypeVar("M") # message yielded by task +U = TypeVar("U") + + +class CurrentInstruction: + def 
__repr__(self) -> str: + return '' + + +class SuspendInstruction: + def __repr__(self) -> str: + return '' + + +# Special control instructions recognized by the scheduler. +CURRENT = CurrentInstruction() +SUSPEND = SuspendInstruction() + +Control: TypeAlias = Union[CurrentInstruction, SuspendInstruction] +Instruction: TypeAlias = Union[Control, M] +Controller: TypeAlias = Generator[Union[Control, M], Any, T] +Task: TypeAlias = Generator[Union[Control, M], Any, T] # In python the generator object is also the iterator +""" +Task is a unit of computation that runs concurrently, a light-weight +process (in Erlang terms). You can spawn bunch of them and provided +cooperative scheduler will interleave their execution. + +Tasks have three type variables first two describing result of the +computation `Success` that corresponds to return type and `Failure` +describing an error type (caused by thrown exceptions). Third type +varibale `Message` describes type of messages this task may produce. + +Please note that that Python does not really check exceptions so `Failure` +type can not be guaranteed. Yet, we find them more practical than omitting +them as Python does for `Future` types and its derivatives. + +Our tasks are generators (not the generator functions, but what you get +invoking them) that are executed by (library provided) provided scheduler. +Scheduler recognizes two special `Control` instructions yield by generator. +When scheduler gets `current` instruction it will resume generator with +a handle that can be used to resume running generator after it is suspended. +When `suspend` instruction is received scheduler will suspend execution until +it is resumed by queueing it from the outside event. +""" + +# `M` == `Event` +Effect: TypeAlias = Generator[M, Any, None] +""" +Effect represents potentially asynchronous operations that results in a set of events. 
+It is often comprised of multiple `Task` and represents either chain of events or a
+concurrent set of events (stretched over time).
+Effect compares to a `Stream` in Javascript in the same way as `Task` compares to
+`Future` in Python. It is not a representation of an eventual result but rather a
+representation of an operation which if executed will produce certain result. `Effect`
+can also be compared to an `EventEmitter` in Javascript, but very often their `Event`
+type variable (`M` type variable in Python implementation) is a union of various event
+types, unlike `EventEmitter`s however `Effect`s have inherent finality to them and in
+that regard are more like Javascript `Streams`
+"""
+
+@dataclass
+class Success(Generic[T]):
+    "Result of a successful task."
+    value: T
+    ok: bool = True
+
+
+@dataclass
+class Failure(Generic[X]):
+    "Result of a failed task."
+    error: X
+    ok: bool = False
+
+
+Result: TypeAlias = Union[Success[T], Failure[X]]
+
+
+@dataclass
+class StateHandler(Generic[T, X]):
+    onsuccess: Optional[Callable[[T], None]] = None
+    onfailure: Optional[Callable[[X], None]] = None
+
+ID = 0
+"""Unique IDs for entities (`Fork`, `Group`, ...)"""
+
+
+class ForkOptions(TypedDict):
+    name: Optional[str]
+
+
+class Status(str, Enum):
+    """Task execution status."""
+    IDLE = "idle"
+    ACTIVE = "active"
+    FINISHED = "finished"
+
+
+class Stack(Generic[T, X, M]):
+    """Stack of active and idle tasks"""
+    def __init__(
+        self,
+        active: Optional[list[ControllerFork[T, X, M]]] = None,
+        idle: Optional[set[ControllerFork[T, X, M]]] = None
+    ) -> None:
+        # gymnastics to align with reference JS implementation without sacrificing
+        # safety. Using mutable types as default args in Python can lead to weird errors
+        # that arise from a shared state created at definition time as opposed to each
+        # time the function is called.
+ active_eval = active if active is not None else [] + idle_eval = idle if idle is not None else set() + self.active: list[ControllerFork[T, X, M]] = active_eval + self.idle: set[ControllerFork[T, X, M]] = idle_eval + + @staticmethod + def size(stack: Stack[T, X, M]) -> int: + """Get total number of tasks in stack.""" + return len(stack.active) + len(stack.idle) + + +def is_async(node: Any) -> bool: + """ + Checks if a value is awaitable (or its lookalike). + """ + if ( + asyncio.isfuture(node) or + asyncio.iscoroutine(node) or + isinstance(node, Awaitable) + ): + return True + return False + +def sleep(duration: float = 0) -> Task[Control, None]: + """ + Suspends execution for the given duration in milliseconds, after which execution + is resumed (unless it was aborted in the meantime). + + Args: + duration: Time to sleep in milliseconds + + Example: + ``` + def demo(): + print("I'm going to take a small nap") + yield from sleep(200) + print("I am back to work") + ``` + """ + task = yield from current() + loop = asyncio.get_running_loop() + + # convert duration to millisecs + handle = loop.call_later(duration/1000, lambda: enqueue(task)) + + try: + yield from suspend() + finally: + handle.cancel() + +@overload +def wait(input: Awaitable[T]) -> Task[Control, T]: ... + +@overload +def wait(input: T) -> Task[Control, T]: ... + +def wait(input: Union[Awaitable[T], T]) -> Task[Control, T]: + """ + Provides equivalent of `await` in async functions. Specifically it takes a value + that you can `await` on (that is `[T]`, i.e futures, coroutines) and suspends + execution until future is settled. If future succeeds execution is resumed with `T` + otherwise an error of type `X` is thrown (which is by default `unknown` since + futures do not encode error type). + + It is useful when you need to deal with potentially async set of operations + without having to check if thing is an `await`-able at every step. 
+ + Please note that execution is suspended even if given value is not a + promise, however scheduler will still resume it in the same tick of the event + loop after, just processing other scheduled tasks. This avoids problematic + race condititions that can otherwise occur when values are sometimes promises + and other times are not. + + Args: + input_value: A value or awaitable to wait on + + Returns: + The resolved value + + Raises: + Exception if the future fails + + Example: + ``` + def fetch_json(url, options): + const response = yield from wait(fetch(url, options)) + const json = yield from wait(response.json()) + return json + ``` + """ + task = yield from current() + if is_async(input): + # no need to track `failed = False` like in reference JS impl because failure + # can be tracked by only the `Result` object in the `output` variable below + output: Result[T, Exception] = None # type: ignore[assignment] + + async def handle_async(): + nonlocal output + try: + value = await (cast(Awaitable[T], input)) + output = Success(value) + except Exception as error: + output = Failure(error) + enqueue(task) + + loop = asyncio.get_running_loop() + # schedule the async handler on the loop + loop.create_task(handle_async()) + + yield from suspend() + + # check for failure with `output` instead of `failure` variable like in the + # reference implementation + if isinstance(output, Failure): + raise output.error + else: + return output.value + else: + # this may seem redundant but it is not, by enqueuing this task we allow + # scheduler to perform other queued tasks first. This way many race conditions + # can be avoided when values are sometimes promises and other times aren't. + # unlike `await` however this will resume in the same tick. + + # when the `wake` task is enqueued in func `main()`, it is enqueued in the `Main` + # group. and when executed, it enqueues `task`. 
thereby maintaining consistency + # in suspending immediately while `enqueue(task)` runs async after other tasks + # as it similarly happens if `input` was `Awaitable` + main(wake(task)) + yield from suspend() # suspension happens immediately + return cast(T, input) + +def wake(task: Task[M, T]) -> Task[None, None]: + enqueue(task) + yield + +def main(task: Task[None, None]) -> None: + """ + Starts a main task. + + Args: + task: A task that produces no output (returns None, never fails, sends no messages) + """ + controller = iter(task) + enqueue(controller) # controller == iterator + +def is_message(value: Instruction[M]) -> bool: + if value is SUSPEND or value is CURRENT: + return False + return True + +def is_instruction(value: Instruction[M]) -> bool: + return not is_message(value) + + +class Future_(Generic[T, X]): + """ + Base class for awaitable task handles. + + Provides Promise-like interface for tasks, allowing them to be awaited + from async contexts. + """ + + def __init__(self, handler: Optional[StateHandler[T, X]] = None): + self.handler = handler or StateHandler() + self.result: Optional[Result[T, X]] = None + self._promise: Optional[asyncio.Future[T]] = None + + def _get_promise(self) -> asyncio.Future[T]: + """ + Lazily create and cache an asyncio.Future for this task. 
+ """ + if self._promise is not None: + return self._promise + + # If we already have a result, create a pre-resolved future + if self.result is not None: + loop = asyncio.get_running_loop() + future: asyncio.Future[T] = loop.create_future() + + if isinstance(self.result, Success): + future.set_result(self.result.value) + else: + future.set_exception(self.result.error) + + self._promise = future + return future + + # Otherwise, create a future and wire up handlers + loop = asyncio.get_running_loop() + future = loop.create_future() + + # Store original handlers + original_onsuccess = self.handler.onsuccess + original_onfailure = self.handler.onfailure + + def onsuccess(value: T) -> None: + if not future.done(): + future.set_result(value) + if original_onsuccess: + original_onsuccess(value) + + def onfailure(error: X) -> None: + if not future.done(): + future.set_exception(error) + if original_onfailure: + original_onfailure(error) + + self.handler.onsuccess = onsuccess + self.handler.onfailure = onfailure + + self._promise = future + return future + + def __await__(self) -> Generator[Any, Any, T]: + # Get the promise and await it + promise = self._get_promise() + + # Activate the task (runs synchronously in MAIN scheduler) + self.activate() + + return promise.__await__() + + def activate(self) -> Future_[T, X]: + """ + Activate the task. Overriden in subclasses. + """ + return self + + +class Fork(Future_[T, X], Generic[T, X, M]): + """ + A handle to a running task that can be awaited. + + Implements both the generator protocol (for use within tasks) and + the awaitable protocol (for use in async functions). 
+ """ + def __init__( + self, + task: Task[M, T], + handler: Optional[StateHandler[T, X]] = None, + options: Optional[ForkOptions] = None, + ) -> None: + super().__init__(handler or StateHandler()) + + global ID + ID += 1 + self.id = ID + self.name: str = "" if options is None else (options.get('name', None) or "") + self.task = task + self.state: Union[Instruction[M], StopIteration] = CURRENT + self.status = Status.IDLE + self.controller: Optional[Controller[M, T]] = None + self.group: Optional[Group[T, X, M]] = None + + + def resume(self) -> Task[None, None]: + resume(self) + yield + + def join(self) -> Task[Optional[M], T]: + return join(self) + + def abort(self, error: X) -> Task[None, None]: + return abort(self, error) + + def exit_(self, value: T) -> Task[None, None]: + return exit_(self, value) + + def activate(self) -> Fork[T, X, M]: + """ + Activate the task and enqueue it for execution. + """ + # Only activate if not already active or finished + if self.controller is None: + self.controller = iter(self.task) + self.status = Status.ACTIVE + enqueue(self) + return self + + def __iter__(self) -> Generator[Any, Any, Fork[T, X, M]]: + """ + Make Fork iterable for use with yield from. + """ + # making __iter__ a generator ensures that `yield from fork(work())` schedules + # the fork for concurrent execution without enabling synchronous driving of the + # forked task after activation and then returns immediately (since MAIN group + # is likely already actively being executed by the scheduler and `work` already + # queued in `MAIN`s active queue). 
+ self.activate() + return self + yield + + def _panic(self, error: X) -> NoReturn: + self.result = Failure(ok=False, error=error) + self.status = Status.FINISHED + if self.handler.onfailure is not None: + self.handler.onfailure(error) + raise error + + def _step(self, state: Union[Instruction[M], StopIteration]) -> None: + self.state = state + #`StopIteration` signifies end of a generator in Python and holds its return + # value on success + if isinstance(state, StopIteration): + self.result = Success(ok=True, value=state.value) + self.status = Status.FINISHED + if self.handler.onsuccess is not None: + self.handler.onsuccess(state.value) + # `state` is not returned here like in the reference js implementation because - + # `state` here can be set to `StopIteration(value: T)` which signals competion + # in Python, but in order to keep `Fork` behaviour predictable and similar to + # normal generators (e.g `controller` generators) `StopIteration` isn't returned + # but raised. So no point in returning `state` (especially on completion), + # within the class or - via the generator protocol methods - outside the class. + # just run success handlers, update state and set success value in result. + + def __next__(self) -> Instruction[M]: + try: + # note that: `task.send(None) == next(task)` + # also cast the type for `self.controller` to eliminate the type-checker + # from inferring that `self.controller` is still `None` after fork + # activation + state = cast(Controller, self.controller).send(None) + self._step(state) + return state + except StopIteration as e: + # `StopIteration` means task is finished, therefore task `result`, `status` + # can be set and success handler function ran. 
+ self._step(e) + raise + except Exception as e: + return self._panic(e) # type: ignore[arg-type] + + def send(self, value: Any) -> Instruction[M]: + try: + # cast the type for `self.controller` to eliminate the type-checker + # from inferring that `self.controller` is still `None` after fork + # activation + state = cast(Controller, self.controller).send(value) + self._step(state) + return state + except StopIteration as e: + # `StopIteration` means task is finished, therefore task `result`, `status` + # can be set and success handler function ran. + self._step(e) + raise + except Exception as e: + return self._panic(e) # type: ignore[arg-type] + + def throw(self, error: Union[Exception, GeneratorExit]) -> Instruction[M]: + try: + # also cast the type for `self.controller` to eliminate the type-checker + # from inferring that `self.controller` is still `None` after fork + # activation + state = cast(Controller, self.controller).throw(error) + self._step(state) + return state + except StopIteration as e: + # `StopIteration` means task is finished, therefore task `result`, `status` + # can be set and success handler function ran. + self._step(e) + raise + # `GeneratorExit` is outside the scope of `Exception`. it subclasses + # `BaseException` instead - `Exception`'s parent class. Catch both exc. + except (Exception, GeneratorExit) as e: + return self._panic(e) # type: ignore[arg-type] + + def return_(self, value: T) -> T: + try: + # also cast the type for `self.controller` to eliminate the type-checker + # from inferring that `self.controller` is still `None` after fork + # activation + cast(Controller, self.controller).throw(StopIteration(value)) + except StopIteration as e: + self._step(e) + except Exception as e: + self._panic(e) # type: ignore[arg-type] + else: + # the controller yielded instead of terminating. 
therefore enforce close + cast(Controller, self.controller).close() + return value + + def close(self) -> None: + try: + # self.controller.close() == self.controller.throw(GeneratorExit) + # also cast the type for `self.controller` to eliminate the type-checker + # from inferring that `self.controller` is still `None` after fork + # activation + cast(Controller, self.controller).close() + except (Exception) as e: + self._panic(e) # type: ignore[arg-type] + + def __repr__(self) -> str: + return f"Fork(id={self.id}, status='{self.status}')" + +# type alias added for convenience +TaskFork: TypeAlias = Union[Task, Fork[T, X, M]] +ControllerFork: TypeAlias = Union[Controller, Fork[T, X, M]] + +class Main(Generic[T, X, M]): + """Default or Fallback Task Group.""" + def __init__(self) -> None: + self.status = Status.IDLE + self.stack = Stack() + self.id: Literal[0] = 0 + self.parent: Optional[TaskGroup[T, X, M]] = None + + +MAIN = Main() +"""Singleton main group""" + +# Python generator objects cannot carry arbitrary attributes like in the reference JS +# implementation, so loop/group helpers store membership here to route resumed generators +# back to their group. +_GROUP_MEMBERSHIP: weakref.WeakKeyDictionary[object, Group[Any, Any, Any]] = weakref.WeakKeyDictionary() + + +class TaskGroup(Generic[T, X, M]): + """Task group for managing concurrent tasks.""" + def __init__( + self, + driver: ControllerFork[T, X, M], + active: Optional[list[ControllerFork[T, X, M]]] = None, + idle: Optional[set[ControllerFork[T, X, M]]] = None, + stack: Optional[Stack[T, X, M]] = None + ) -> None: + self.driver = driver + self.parent = TaskGroup.of(driver) + + # gymnastics to align with reference JS implementation without sacrificing + # safety. Using mutable types as default args in Python can lead to weird errors + # that arise from a shared state created at definition time as opposed to each + # time the function is called. 
+ active_eval = active if active is not None else [] + idle_eval = idle if idle is not None else set() + if active is not None or idle is not None: + self.stack = Stack(active_eval, idle_eval) + elif stack is not None: + self.stack = stack + else: + self.stack = Stack() + + global ID + ID += 1 + self.id = ID + + @staticmethod + def of(member: ControllerFork[T, X, M]) -> Group[T, X, M]: + # since `Generator` objects don't have the `group` attribute if `member` + # is not a `Fork` then it's group is the default `MAIN` group + group = getattr(member, 'group', None) + if group is not None: + return group + mapped = _GROUP_MEMBERSHIP.get(member) + return cast(Group[T, X, M], mapped if mapped is not None else MAIN) + + + @staticmethod + def enqueue(member: TaskFork[T, X, M], group: TaskGroup[T, X, M]) -> None: + try: + member.group = group # type: ignore[union-attr] + except AttributeError: + _GROUP_MEMBERSHIP[member] = group + finally: + group.stack.active.append(member) + + +Group = Union[TaskGroup[T, X, M], Main[T, X, M]] + +def enqueue(task: ControllerFork[T, X, M]) -> None: + group = TaskGroup.of(task) + + group.stack.active.append(task) + group.stack.idle.discard(task) + + # then walk up the group chain and unblock their driver tasks + while group.parent is not None: + idle = group.parent.stack.idle + active = group.parent.stack.active + + # only to appease type checkers, since `MAIN` doesn't have a driver and `MAIN` + # has a parent of `None` so this loop won't run in that case + driver = getattr(group, "driver", None) + if driver is not None and driver in idle: + idle.remove(driver) + active.append(driver) + else: + # if driver was not blocked it must have been unblocked by other task so + # stop there + break + + # crawl up to the parent group + group = group.parent + + if MAIN.status == Status.IDLE: + MAIN.status = Status.ACTIVE + while True: + try: + for m in step(MAIN): + pass + MAIN.status = Status.IDLE + break + except: + # Top level task may crash and 
throw an error, but given this is a main
+                # group we do not want to interrupt other unrelated tasks, which is why
+                # we discard the error and the task that caused it.
+                MAIN.stack.active.pop(0)
+
+def step(group: Group[T, X, M]) -> Generator[M, Any, None]:
+    active = group.stack.active
+    task = active[0] if active else None
+    if task:
+        group.stack.idle.discard(task)
+    while task:
+        # Keep processing instructions until task is done, sends a `SUSPEND` request or
+        # it has been removed from the active queue.
+        # ⚠️ Group changes require extra care so please make sure to understand the
+        # detail here. It occurs when a spawned task(s) are joined into a group which
+        # will change the driver and the group the task belongs to, that is when the
+        # conditional statement: `task == active[0]` will become false and the task
+        # would need to be dropped immediately otherwise race condition will occur due
+        # to task being driven by multiple concurrent schedulers.
+        try:
+            input_value = None
+            while task == active[0]:
+                instruction = task.send(input_value)
+                if instruction is SUSPEND:
+                    group.stack.idle.add(task)
+                    break
+                # if task requested a context (which is usually to suspend itself) pass back
+                # a task reference and continue.
+                elif instruction is CURRENT:
+                    input_value = task
+                    continue
+                else:
+                    # otherwise task sent a message which we yield to the driver and
+                    # continue
+                    input_value = yield instruction  # type: ignore[misc]
+                    continue
+        except StopIteration:
+            # task finished
+            pass
+
+        # if task is complete or got suspended we move to a next task
+        if active and active[0] == task:
+            active.pop(0)
+
+        task = active[0] if active else None
+        if task:
+            group.stack.idle.discard(task)
+
+def spawn(task: Task[None, None]) -> Task[None, None]:
+    """
+    Executes a given task concurrently with a current task (task that spawned it).
+    Spawned task is detached from the task that spawned it and it can outlive it and/or
+    fail without affecting a task that spawned it. 
If you need to wait on a concurrent + task completion consider using `fork` instead which can later be `join`ed. If you + just want a task to block on another task's execution you can just use: + `yield from work()` directly instead. + """ + main(task) + return + yield + +def fork(task: Task[M, T], options: ForkOptions | None = None) -> Fork[T, Exception, M]: + """ + Executes a given task concurrently with current task (the task that initiated fork) + Forked task is detached from the task that created it and it can outlive it and / + or fail without affecting it. You do however get a handle for the fork which could + used to `join` the task, in which case `join`ing task would block until fork + finishes execution. + + This is also a primary interface for executing tasks from the outside of the task + context. Function returns `Fork` which implements `Future` interface so it can be + awaited. Please note that calling `fork` does not really do anything, it lazily + starts execution when you either `await fork(work())` from arbitrary context or + `yield from fork(work())` in another task context. 
+ """ + return Fork(task, options=options) + +def current() -> Generator[CurrentInstruction, Controller[M, T], Controller[M, T]]: + return (yield CURRENT) + +def suspend() -> Generator[SuspendInstruction, Any, None]: + yield SUSPEND + +def resume(task: Controller[M, T] | Fork[T, X, M]) -> None: + enqueue(task) + +def conclude( + handle: ControllerFork[T, X, M], + result: Result[T, Exception] +) -> Task[None, None]: + """ + Concludes a given task with a result (either success `value` `T` or `error` `X`) + + Args: + handle: Task controller + result: Success or failure result + """ + try: + task = handle + if isinstance(result, Success): + try: + # force the generator to end and return with `result.value` + state = task.throw(StopIteration(result.value)) + except StopIteration: + return + elif isinstance(result, Failure): + state = task.throw(result.error) + while state is CURRENT: + state = task.send(task) + except Exception: + pass + else: + # incase `task` has a `finally` block that still yields values into `state` + if state is SUSPEND: + idle = TaskGroup.of(task).stack.idle + idle.add(task) + elif state is not None: + enqueue(task) + return + yield + + +def abort(handle: ControllerFork[T, X, M], error: Exception) -> Task[None, None]: + """ + Aborts given task with an error. Task error type should match provided error. + + Args: + handle: Task controller to abort + error: Error to throw into the task + """ + yield from conclude(handle, Failure(error)) + + +def exit_(handle: ControllerFork[T, X, M], value: Any) -> Task[None, None]: + """ + Exits a task successfully with a return value. + + Args: + handle: Task controller to exit + value: Return value on exit + """ + yield from conclude(handle, Success(value)) + + +def terminate(handle: ControllerFork[None, X, M]) -> Task[None, None]: + """ + Terminates a task (only for tasks with void return type). If your task has a + non-`void` return type you should use `exit` instead. 
+ + Args: + handle: Task controller to terminate + """ + yield from conclude(handle, Success(value=None)) + + +def group(forks: list[Fork[T, X, M]]) -> Task[Optional[Instruction[M]], None]: + """ + Groups multiple forks together and joins them with current task. + """ + # abort early if there's no work to do + if len(forks) == 0: return + + self_ = yield from current() + group = TaskGroup(self_) + failure: Optional[Failure[X]] = None + + for fork in forks: + result = fork.result + if result is not None: + # only the first error should be recorded, so `failure` has to be `None` + if not result.ok and failure is None: + failure = cast(Failure, result) + continue + move(fork, group) + + # keep work looping until there is no more work to be done + try: + # raise the exception that caused the first recorded failure result + if failure: + raise failure.error + while True: + # blocks the calling task: `self_`, to run all tasks in the active queue of + # the group to completion + yield from step(group) + # but there might be suspended tasks in `group.stack.idle` + if Stack.size(group.stack) > 0: + # if there are grouped forked tasks that are suspended, then suspend + # driver too. + # NOTE: that `enqueue()` resumes the driver when suspended forked task + # resumes. since `enqueue()` unblocks the drivers of a task's group + # before starting the scheduler and processing the resumed task in it. 
+ yield from suspend() + else: + break + except Exception as error: + # only iterate over a copy of active/idle queue to be safe + for task in list(group.stack.active): + yield from abort(task, error) + + for task in list(group.stack.idle): + yield from abort(task, error) + enqueue(task) # `conclude` might add idle tasks back into `idle` queue + + raise error + + +def move(fork: Fork[T, X, M], group: TaskGroup[T, X, M]) -> None: + """Move a fork from one group to another.""" + from_ = TaskGroup.of(fork) + if from_ is not group: + active, idle = (from_.stack.active, from_.stack.idle) + target = group.stack + fork.group = group + # if it is idle just move from one group to the other and update the group task + # thinks it belongs to. + if fork in idle: + idle.remove(fork) + target.idle.add(fork) + elif fork in active: + index = active.index(fork) + # if task is in the job queue, we move it to a target job queue. Moving top + # task in the queue requires extra care so it does not end up processed by + # two groups which would lead to race. For that reason `step` loop checks + # checks for group changes on each turn + if index >= 0: + active.pop(index) + target.active.append(fork) + # otherwise task is complete + +def join(fork: Fork[T, X, M]) -> Task[Optional[Instruction[M]], T]: + """ + Joins a forked task back into the current task. + + Suspends the current task until the fork completes, then resumes with its result. + If the fork fails, the error is thrown. 
+
+    Args:
+        fork: The fork to join
+
+    Returns:
+        The fork's result
+
+    Raises:
+        The fork's error if it failed
+    """
+    if (fork.status == Status.IDLE):
+        yield from fork
+
+    # if fork didn't complete, process `fork` in the scheduler and block until
+    # completion
+    if fork.result is None:
+        yield from group([fork])
+
+    result: Result[T, X] = fork.result  # type: ignore[assignment]
+    if isinstance(result, Success):
+        return result.value
+    else:
+        raise result.error
+
+def send(message: M) -> Effect[M]:
+    """
+    Task that sends a given message (or rather an effect producing this message).
+    Please note, that while you could use `yield message` instead, the reference
+    implementation for this library written in TS had risks of breaking changes in the
+    TS generator inference which could enable a replacement for `yield *`.
+    For uniformity purposes, we decided to stick with the same approach for the Python
+    implementation as well.
+    """
+    yield message
+
+def effect(task: Task[None, T]) -> Effect[T]:
+    """
+    Turns a task (that never fails or sends messages) into an effect of its result.
+ """ + message = yield from task # type: ignore[misc] + yield from send(message) + +def loop(init: Effect[M], next_: Callable[[M], Effect[M]]) -> Task[None, None]: + controller = yield from current() + group = TaskGroup(controller) + TaskGroup.enqueue(iter(init), group) + + while True: + for msg in step(group): + try: + # incase `next` only accepts keyword args + effect = next_(**msg) # type: ignore[arg-type] + except TypeError: + effect = next_(cast(M, msg)) + TaskGroup.enqueue(iter(effect), group) + if Stack.size(group.stack) > 0: + yield from suspend() + else: + break + +Tag: TypeAlias = str + + +class Tagger(Generic[T, X, M]): + def __init__(self, tags: list[str], source: Fork[T, X, M]) -> None: + self.tags = tags + self.source = source + self.controller: Optional[Controller] = None + + def __iter__(self): + if not self.controller: + self.controller = iter(self.source) + return self + + def box( + self, state: Union[Instruction[M], StopIteration] + ) -> Union[Control, Tagged[M]]: + if isinstance(state, StopIteration): + return state.value + else: + if state is CURRENT or state is SUSPEND: + return state # type: ignore[return-value] + else: # tag non-control instructions + # Instead of boxing result at each transform step we perform in-place + # mutation as we know nothing else is accessing this value. 
+ tagged = state + for tag in self.tags: + tagged = with_tag(tag, tagged) + return cast(Tagged, tagged) + + def __next__(self) -> Union[Control, Tagged[M]]: + return self.box(next(cast(Fork, self.controller))) + + def close(self) -> None: + cast(Fork, self.controller).close() + + def send(self, instruction: Instruction[M]) -> Union[Control, Tagged[M]]: + return self.box(cast(Fork, self.controller).send(instruction)) + + def throw(self, error: Exception) -> Union[Control, Tagged[M]]: + return self.box(cast(Fork, self.controller).throw(error)) + + def return_(self, value: T) -> Union[Control, Tagged[T]]: + return self.box(cast(Fork, self.controller).return_(value)) # type: ignore[return-value] + + def __str__(self) -> str: + return "TaggedEffect" + + +def _none_effect() -> Effect[None]: + return + yield # necessary for this func to be recognized as a generator that yields nothing + +NONE_: Effect[None] = _none_effect() + +def none_() -> Effect[None]: + """ + Returns empty `Effect`, that is produces no messages. Kind of like `[]` or `""` but + for effects. + """ + return NONE_ + +def then_( + task: Task[M, T], + resolve: Callable[[T], U], + reject: Callable[[X], U], +) -> Task[M, U]: + try: + return resolve((yield from task)) + except Exception as e: + return reject(e) # type: ignore[arg-type] + +def all_(tasks: Iterable[Task[M, T]]) -> Task[Any, list[T]]: + """ + Takes iterable of tasks and runs them concurrently, returning an array of results in + an order of the tasks (not the order of completion). If any of the tasks fail all + the rest are aborted and error is thrown into the calling task. 
+ """ + self_ = yield from current() + forks: list[Optional[Fork[T, Exception, None]]] = [] + results: list[Optional[T]] = [] + count_ = 0 + + def succeed(idx: int) -> Callable[[T], None]: + def handler(value: T) -> None: + nonlocal count_ + forks[idx] = None + results[idx] = value + count_ -= 1 + if count_ == 0: + enqueue(self_) + return handler + + def fail(error: Exception) -> None: + for handle in forks: + if handle is not None: + enqueue(abort(handle, error)) + enqueue(abort(self_, error)) + + for i, task in enumerate(tasks): + results.append(None) # keeps the results list at a size of len(tasks) + fk = (yield from fork(then_(task, succeed(i), fail))) + forks.append(fk) # type: ignore[arg-type] + count_ += 1 + + if count_ > 0: + yield from suspend() + + return cast(list[T], results) + + +Tagged: TypeAlias = dict[str, Union[Tag, M]] +""" +The dictionary is shaped like: {"type": tag, tag: value}. There is no way to express +that dictionary shape in Python's current type system at the time of writing this +implementation. 
+""" + +def with_tag(tag: Tag, value: M) -> Tagged[M]: + return {"type": tag, tag: value} + +def tag(effect: Union[ControllerFork[T, X, M], Tagger[T, X, M]], tag: str) -> Effect[Union[Control, Tagged[M]]]: + """ + Tags an effect by boxing each event with an object that has `type` field + corresponding to the given tag and same named field holding original message e.g + given `nums` effect that produces numbers, `tag(nums, "inc")` would create an effect + that produces events like `{"type": "inc", "inc": 1}` + """ + if effect is NONE_: + return NONE_ # type: ignore[return-value] + elif isinstance(effect, Tagger): + return Tagger(tags=(effect.tags + [tag]), source=effect.source) # type: ignore[return-value] + else: + return Tagger([tag], effect) # type:ignore[return-value] + +def listen(sources: dict[Tag, Effect[M]]) -> Effect[Union[Control, Tagged[M]]]: + """ + Takes several effects and merges them into a single effect of tagged variants so + that their source could be identified via `type` field. + """ + forks: list[Fork] = [] + for entry in sources.items(): + name, eff = entry + if eff is not NONE_: + forks.append( + (yield from fork(tag(eff, name))) # type: ignore[arg-type] + ) + yield from group(forks) # type: ignore[misc] + +def batch(effects: list[Effect[T]]) -> Effect[T]: + """ + Takes several effects and combines them into one effect + """ + forks: list[Fork] = [] + for eff in effects: + forks.append((yield from fork(eff))) # type: ignore[arg-type] + yield from group(forks) # type: ignore[misc] + +def effects(tasks: list[Task[None, T]]) -> Effect[Optional[T]]: + """ + Takes several tasks and creates an effect of them all. 
+ """ + if tasks: + return batch([effect(task) for task in tasks]) + else: + return NONE_ diff --git a/py.typed b/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml index 483f3ea..6ddfdb7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,15 +5,13 @@ build-backend = "hatchling.build" [project] name = "actress" version = "0.0.1" -authors = [ - { name="Alan Shaw", email="alan@storacha.network" }, -] +authors = [{ name = "Alan Shaw", email = "alan@storacha.network" }] description = "An implementation of actor in Python." readme = "README.md" requires-python = ">=3.9" classifiers = [ - "Programming Language :: Python :: 3", - "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", ] license = "Apache-2.0 OR MIT" license-files = ["LICENSE.md"] @@ -21,3 +19,20 @@ license-files = ["LICENSE.md"] [project.urls] Homepage = "https://github.com/storacha/py-actress" Issues = "https://github.com/storacha/py-actress/issues" + +[project.optional-dependencies] +dev = [ + "mypy>=1.19", + "pylint>=4.0", + "pytest>=9.0", + "pytest-asyncio>=1.3", + "pytest-cov>=7.0", + "pytest-timeout>=2.4", +] + +[tool.pytest] +minversion = "9.0" +addopts = ["-ra", "-q", "--cov=actress", "--cov-report=html", "--cov-branch"] +testpaths = ["test"] +asyncio_mode = "auto" +timeout = "5" diff --git a/requirements.txt b/requirements.txt index 455ce99..542b9ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,54 @@ -typing-validation==1.2.11.post4 -typing_extensions==4.12.2 +# +# This file is autogenerated by pip-compile with Python 3.13 +# by the following command: +# +# pip-compile --extra=dev --output-file=requirements.txt pyproject.toml +# +astroid==4.0.3 + # via pylint +coverage[toml]==7.13.2 + # via pytest-cov +dill==0.4.1 + # via pylint +iniconfig==2.3.0 + # via pytest +isort==7.0.0 + # via pylint +librt==0.7.8 + # via mypy +mccabe==0.7.0 + # via pylint +mypy==1.19.1 + # via 
actress (pyproject.toml) +mypy-extensions==1.1.0 + # via mypy +packaging==26.0 + # via pytest +pathspec==1.0.4 + # via mypy +platformdirs==4.5.1 + # via pylint +pluggy==1.6.0 + # via + # pytest + # pytest-cov +pygments==2.19.2 + # via pytest +pylint==4.0.4 + # via actress (pyproject.toml) +pytest==9.0.2 + # via + # actress (pyproject.toml) + # pytest-asyncio + # pytest-cov + # pytest-timeout +pytest-asyncio==1.3.0 + # via actress (pyproject.toml) +pytest-cov==7.0.0 + # via actress (pyproject.toml) +pytest-timeout==2.4.0 + # via actress (pyproject.toml) +tomlkit==0.14.0 + # via pylint +typing-extensions==4.12.2 + # via mypy diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..5275dde --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,11 @@ +# import pytest + +# from test.utils import Logger +# +# @pytest.fixture +# def create_log() -> Callable[[], Logger]: +# def _create_log(): +# from .utils import create_log as make_log +# return make_log() +# +# return _create_log diff --git a/test/test_lib.py b/test/test_lib.py deleted file mode 100644 index 934bce3..0000000 --- a/test/test_lib.py +++ /dev/null @@ -1,5 +0,0 @@ -import pytest - - -def test_placeholder() -> None: - assert True diff --git a/test/test_task.py b/test/test_task.py new file mode 100644 index 0000000..c9fc596 --- /dev/null +++ b/test/test_task.py @@ -0,0 +1,1298 @@ +import asyncio +from collections.abc import Generator +import json + +import pytest + +from actress import task +from .utils import create_log, inspect, InspectResult + + +@pytest.mark.asyncio +class TestWait: + async def test_wait_on_non_promise(self): + is_sync = True + def worker(): + message = yield from task.wait(5) + assert is_sync == True, "expect to be sync" + return message + + fork = inspect(worker()).activate() + is_sync = False + result = await fork + + assert result == InspectResult(ok=True, value=5, mail=[], error=None) + + async def test_wait_on_promise(self): + is_sync = True + + loop = 
asyncio.get_running_loop() + promise = loop.create_future() + promise.set_result(5) + + def main(): + message = yield from task.wait(promise) + assert is_sync == False, "expect to be async" + return message + + fork = inspect(main()).activate() + is_sync = False + result = await fork + + assert result == InspectResult(ok=True, value=5, mail=[], error=None) + + async def test_lets_you_yield(self): + def main(): + value = yield 5 + assert value == None, "return None on normal yield" + + result = await inspect(main()) + assert result == InspectResult(value=None, ok=True, mail=[5], error=None) + + async def test_throw_on_failed_promises(self): + boom = Exception("boom!") + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(boom) + + def main(): + message = yield from task.wait(promise) + return message + + result = await inspect(main()) + assert result == InspectResult(ok=False, value=None, mail=[], error=boom) + + async def test_can_catch_promise_errors(self): + boom = Exception("boom!") + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(boom) + + def main(): + try: + message = yield from task.wait(promise) + return message + except Exception as e: + return e + + result = await inspect(main()) + assert result == InspectResult(ok=True, value=boom, mail=[], error=None) + + async def test_can_intercept_thrown_errors(self): + boom = Exception("boom!") + def fail(): + raise boom + + def main(): + if False: yield + return fail() + + result = await inspect(main()) + assert result == InspectResult(ok=False, mail=[], error=boom, value=None) + + async def test_can_catch_thrown_errors(self): + boom = Exception("boom!") + def fail(): + raise boom + + def main(): + try: + if False: yield + return fail() + except Exception as e: + return e + + result = await inspect(main()) + assert result == InspectResult(ok=True, mail=[], value=boom, error=None) + + async def test_use_finally(self): + boom = 
Exception("boom!") + finalized = False + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(boom) + + def main(): + nonlocal finalized + try: + message = yield from task.wait(promise) + return message + finally: + finalized = True + + result = await inspect(main()) + assert result == InspectResult(ok=False, mail=[], error=boom, value=None) + assert finalized == True + + +@pytest.mark.asyncio +class TestMessaging: + async def test_can_send_message(self): + def main(): + yield from task.send("one") + yield from task.send("two") + + result = await inspect(main()) + assert result == InspectResult(value=None, ok=True, mail=["one", "two"], error=None) + + async def test_can_send_message_in_finally(self): + def main(): + try: + yield from task.send("one") + yield from task.send("two") + finally: + yield from task.send("three") + + result = await inspect(main()) + assert result == InspectResult(value=None, ok=True, mail=["one", "two", "three"], error=None) + + async def test_can_send_message_after_exception(self): + boom = Exception("boom!") + def main(): + try: + yield from task.send("one") + yield from task.send("two") + raise boom + yield from task.send("three") + finally: + yield from task.send("four") + + result = await inspect(main()) + assert result == InspectResult(value=None, ok=False, mail=["one", "two", "four"], error=boom) + + async def test_can_send_message_after_rejected_promise(self): + boom = Exception("boom!") + def main(): + try: + yield from task.send("one") + yield from task.send("two") + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(boom) + + yield from task.wait(promise) + yield from task.send("three") + finally: + yield from task.send("four") + + result = await(inspect(main())) + assert result == InspectResult(ok=False, error=boom, value=None, mail=["one", "two", "four"]) + + async def test_can_send_message_before_rejected_promise_in_finally(self): + boom = 
Exception("boom!") + oops = Exception("oops!") + def main(): + try: + yield from task.send("one") + yield from task.send("two") + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(boom) + + yield from task.wait(promise) + yield from task.send("three") + finally: + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(oops) + + yield from task.wait(promise) + yield from task.send("four") + + result = await(inspect(main())) + assert result == InspectResult(ok=False, error=oops, value=None, mail=["one", "two"]) + + async def test_subtasks_can_send_messages(self): + oops = Exception("oops") + def worker(): + yield from task.send("c1") + + def main(): + try: + yield from task.send("one") + yield from task.send("two") + yield from worker() + yield from task.send("three") + finally: + yield from task.send("four") + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(oops) + + yield from task.wait(promise) + yield from task.send("five") + + result = await inspect(main()) + assert result == InspectResult( + ok=False, mail=["one", "two", "c1", "three", "four"], value = None, error=oops + ) + +@pytest.mark.asyncio +class TestSubtasks: + async def test_subtask_crashes_parent(self): + err = Exception(5) + def worker(x, y): + return (yield from task.wait(x)) + (yield from task.wait(y)) + + def main(): + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_exception(err) + + one = yield from worker(1, 2) + two = yield from worker(promise, one) + return two + + result = await inspect(main()) + assert result == InspectResult(ok=False, value=None, mail=[], error=err) + + async def test_fork_does_not_crash_parent(self): + boom = Exception("boom") + + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + loop = asyncio.get_running_loop() + + def work(id: str): + log(f"start {id}") + yield from 
task.send(f"{id}#1") + + promise = loop.create_future() + promise.set_exception(boom) + + yield from task.wait(promise) + return 0 + + def main(): + yield from task.fork(work("A")) + + promise_1 = loop.create_future() + promise_1.set_result("one") + yield from task.wait(promise_1) + + yield from task.fork(work("B")) + + promise_2 = loop.create_future() + promise_2.set_result("two") + return (yield from task.wait(promise_2)) + + result = await inspect(main()) + assert result == InspectResult(ok=True, value="two", mail=[], error=None) + assert output == ["start A", "start B"] + + + async def test_waiting_on_forks_result_crashes_parent(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(id: str): + log(f"Start {id}") + yield from task.send(f"{id}#1") + + loop = asyncio.get_running_loop() + promise = loop.create_future() + + boom = Exception(f"{id}!boom") + promise.set_exception(boom) + yield from task.wait(promise) + + def main(): + a = yield from task.fork(worker("A")) + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_result("one") + yield from task.wait(promise) + + b = yield from task.fork(worker("B")) + + yield from task.send("hi") + + yield from task.group([a, b]) + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_result("two") + yield from task.wait(promise) + + return 0 + + result = await inspect(main()) + expected = InspectResult( + ok=False, value=None, error=Exception("A!boom"), mail=["hi", "B#1"] + ) + assert result["ok"] == expected["ok"] + assert result["mail"] == expected["mail"] + assert result["value"] == expected["value"] + assert str(result["error"]) == str(expected["error"]) + + async def test_joining_failed_forks_crashes_parent(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(id): + log(f"Start {id}") + yield from task.send(f"{id}#1") + return id + + def main(): + a = yield from 
task.fork(work("A")) + + loop = asyncio.get_running_loop() + promise = loop.create_future() + promise.set_result("one") + yield from task.wait(promise) + + b = yield from task.fork(work("B")) + + yield from task.send("hi") + + result = yield from task.join(b) + assert result == "B" + + result2 = yield from task.join(a) + assert result2 == "A" + + result = await inspect(main()) + + assert result == InspectResult(ok=True, value=None, mail=["hi", "B#1"], error=None) + assert output == ["Start A", "Start B"] + + async def test_failing_group_member_terminates_group(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + boom = Exception("boom") + + def work(ms = 0, name = "", crash = False): + log(f"{name} on duty") + if crash: + yield from task.sleep(ms) + raise boom + + try: + yield from task.sleep(ms) + log(f"{name} is done") + finally: + log(f"{name} cancelled") + + def main(): + a = yield from task.fork(work(1, "A")) + + yield from task.sleep(2) + + b = yield from task.fork(work(8, "B")) + c = yield from task.fork(work(14, "C")) + d = yield from task.fork(work(4, "D", True)) + e = yield from task.fork(work(10, "E")) + + try: + yield from task.group([a, b, c, d, e]) + except Exception as e: + yield from task.sleep(30) + return e + + assert ( + ((await inspect(main()))) == + InspectResult(ok=True, value=boom, mail=[], error=None) + ) + assert sorted(output) == sorted([ + "A on duty", + "B on duty", + "C on duty", + "D on duty", + "E on duty", + "A is done", + "E cancelled", + "A cancelled", + "B cancelled", + "C cancelled", + ]) + + async def test_failed_task_fails_the_group(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + boom = Exception("boom") + + def fail(error=boom) -> Generator[None, None, None]: + raise error + yield + + def work(ms = 0, name = "", crash = False): + log(f"{name} on duty") + + try: + yield from task.sleep(ms) + log(f"{name} is done") + finally: + log(f"{name} cancelled") + + 
def main(): + f = yield from task.fork(fail(boom)) + a = yield from task.fork(work(2, "a")) + yield from task.sleep() + + yield from task.group([a, (yield from task.fork(work(4, "b"))), f, (yield from task.fork(work(2, "c")))]) + + assert ( + ((await inspect(main()))) == + InspectResult(ok=False, value=None, mail=[], error=boom) + ) + await task.fork(task.sleep(10)) + assert output == ["a on duty", "a cancelled"] + + async def test_can_make_empty_group(self): + def main(): + return (yield from task.group([])) + assert ( + (await inspect(main())) == + InspectResult(ok=True, value=None, error=None, mail=[]) + ) + + +class TestConcurrency: + async def test_can_run_tasks_concurrently(self): + def worker(name: str, duration: float, count: int): + for n in range(1, count + 1): + yield from task.sleep(duration) + yield from task.send(f"{name}#{n}") + + def main(): + a = yield from task.fork(worker("a", 5, 6)) + yield from task.sleep(5) + b = yield from task.fork(worker("b", 7, 7)) + + yield from task.group([a, b]) + + result = await inspect(main()) + mail: list[str] = result["mail"] # type: ignore[assignemnt] + assert sorted(mail) == [ + "a#1", + "a#2", + "a#3", + "a#4", + "a#5", + "a#6", + "b#1", + "b#2", + "b#3", + "b#4", + "b#5", + "b#6", + "b#7", + ], "has all the items" + + async def test_can_fork_and_join(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(name): + log(f"> {name} sleep") + yield from task.sleep(2) + log(f"< {name} wake") + + def main(): + log("Spawn A") + a = yield from task.fork(work("A")) + + log("Sleep") + yield from task.sleep(20) + + log("Spawn B") + b = yield from task.fork(work("B")) + + log("Join") + merge = task.group([a, b]) + yield from merge + + log("Nap") + yield from task.sleep(2) + + log("Exit") + + await task.fork(main(), task.ForkOptions(name="🤖")) + + assert output == [ + "Spawn A", + "Sleep", + "> A sleep", + "< A wake", + "Spawn B", + "Join", + "> B sleep", + "< B wake", + "Nap", + 
"Exit", + ] + + async def test_joining_failed_task_throws(self): + boom = Exception("boom") + + def work(): + raise boom + + def main(): + worker = yield from task.fork(work()) + yield from task.sleep(10) + + yield from task.join(worker) + + result = await inspect(main()) + assert result == InspectResult(ok=False, error=boom, value=None, mail=[]) + + async def test_spawn_can_outlive_parent(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(): + log("start fork") + yield from task.sleep(2) + log("exit fork") + + def main(): + log("start main") + yield from task.spawn(worker()) + log("exit main") + + await task.fork(main()) + await task.fork(task.sleep(20)) + + assert output == [ + "start main", + "exit main", + "start fork", + "exit fork", + ] + + async def test_throws_on_exit(self): + boom = Exception("boom") + def work(): + try: + yield from task.sleep(5) + finally: + raise boom + + def main(): + worker = yield from task.fork(work()) + yield from task.sleep() + yield from task.exit_(worker, None) + + assert (await inspect(main())) == InspectResult(ok=True, value=None, mail=[], error=None) + + +class TestCanAbort: + async def test_can_terminate_sleeping_task(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(): + log("start worker") + yield from task.sleep(20) + log("wake worker") + + def main(): + log("fork worker") + fork_task = yield from task.fork(worker()) + log("nap") + yield from task.sleep(1) + log("terminate worker") + yield from task.terminate(fork_task) + log("exit main") + + expect = [ + "fork worker", + "nap", + "start worker", + "terminate worker", + "exit main", + ] + await task.fork(main()) + assert output == expect + await task.fork(task.sleep(30)) + + async def test_sleeping_task_can_still_cleanup(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(): + log("start worker") + loop = 
asyncio.get_running_loop() + id = loop.call_later(10/1000, (lambda: log("timeout fired"))) + + try: + yield from task.suspend() + finally: + id.cancel() + log("can clean up even though aborted") + + def main(): + log("fork worker") + fork_task = yield from task.fork(worker()) + log("nap") + yield from task.sleep(1) + log("abort worker") + yield from task.terminate(fork_task) + log("exit main") + + expect = [ + "fork worker", + "nap", + "start worker", + "abort worker", + "can clean up even though aborted", + "exit main", + ] + await task.fork(main()) + await task.fork(task.sleep(30)) + + + async def test_can_abort_with_an_error(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(): + try: + log("start worker") + yield from task.sleep(20) + log("wake worker") + except Exception as e: + log(f"aborted {e}") + + def main(): + log("fork worker") + fork = yield from task.fork(worker()) + log("nap") + yield from task.sleep(1) + log("abort worker") + yield from task.abort(fork, error=Exception("kill")) + log("exit main") + + expect = [ + "fork worker", + "nap", + "start worker", + "abort worker", + "aborted kill", + "exit main", + ] + await task.fork(main()) + assert output == expect + await task.fork(task.sleep(30)) + + assert output == expect + + async def test_can_still_do_things_when_aborted(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(): + try: + log("start worker") + yield from task.sleep(20) + log("wake worker") + except Exception as e: + log(f"aborted {e}") + yield from task.sleep(2) + log("ok bye") + + def main(): + log("fork worker") + fork = yield from task.fork(worker()) + log("nap") + yield from task.sleep(1) + log("abort worker") + yield from task.abort(fork, Exception("kill")) + log("exit main") + + expect = [ + "fork worker", + "nap", + "start worker", + "abort worker", + "aborted kill", + "exit main", + ] + await task.fork(main()) + assert output == 
expect + await task.fork(task.sleep(10)) + assert output == [*expect] + ["ok bye"] + + async def test_can_still_suspend_after_aborting(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def worker(): + current_task = yield from task.current() + try: + log("start worker") + yield from task.sleep(20) + log("wake worker") + except Exception as e: + log(f"aborted {e}") + loop = asyncio.get_running_loop() + loop.call_later(2/1000, lambda: task.resume(current_task)) + log("suspend after abort") + yield from task.suspend() + log("ok bye now") + + def main(): + log("fork worker") + fork = yield from task.fork(worker()) + log("nap") + yield from task.sleep(1) + log("abort worker") + yield from task.abort(fork, Exception("kill")) + log("exit main") + + expect = [ + "fork worker", + "nap", + "start worker", + "abort worker", + "aborted kill", + "suspend after abort", + "exit main", + ] + await task.fork(main()) + assert output == expect + await task.fork(task.sleep(10)) + assert output == [*expect] + ["ok bye now"] + + async def test_can_exit_the_task(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def main(): + log("fork worker") + fork = yield from task.fork(worker()) + log("nap") + yield from task.sleep(1) + log("exit worker") + yield from task.exit_(fork, 0) + log("exit main") + + def worker(): + try: + log("start worker") + yield from task.sleep(20) + log("wake worker") + return 0 + except Exception as e: + # generator exits with `StopIteration` in Python - a feature not a bug + log(f"aborted {e}") + return 1 + + expect = [ + "fork worker", + "nap", + "start worker", + "exit worker", + "aborted generator raised StopIteration", + "exit main", + ] + await task.fork(main()) + print(output) + assert output == expect + await task.fork(task.sleep(30)) + assert output == expect + + +class TestPromise: + async def test_fails_promise_if_task_fails(self): + class boom(Exception): + def __init__(self, 
message: str) -> None: + super().__init__(message) + + def main(): + raise boom("boom") + yield + + with pytest.raises(boom) as exc_info: + result = await task.fork(main()) + assert str(exc_info.value) == "boom" + + async def test_can_use_await(self): + # alternative for js test: "can use then" + def work(): + yield from task.sleep(1) + return 0 + + result = await task.fork(work()) + assert result == 0 + + async def test_can_bubble_error(self): + # alternative for js test: "can use catch" + boom = Exception("boom") + + def work(): + yield from task.sleep(1) + raise boom + yield + + try: + await task.fork(work()) + except Exception as e: + assert e == boom + + async def test_can_use_finally(self): + def work(): + yield from task.sleep(1) + return 0 + + invoked = False + try: + result = await task.fork(work()) + finally: + invoked = True + + assert result == 0 + assert invoked == True + + async def test_has_to_string_tag(self): + fork = task.fork(task.sleep(2)) + assert str(fork) == f"Fork(id={fork.id}, status='{fork.status}')" + + +class TestTag: + async def test_tags_effect(self): + def fx(): + yield from task.send(1) + yield from task.sleep(2) + yield from task.send(2) + + result = await inspect(task.tag(fx(), "fx")) + assert result == InspectResult( + ok=True, value=None, error=None, mail=[ + {"type": "fx", "fx": 1}, + {"type": "fx", "fx": 2}, + ] + ) + + async def test_tags_with_errors(self): + error = Exception("boom") + def fx(): + yield from task.send(1) + raise error + + def main(): + yield from task.tag(fx(), "fx") + + result = await inspect(main()) + assert result == InspectResult( + ok=False, error=error, value=None, mail=[{"type": "fx", "fx": 1}] + ) + + async def test_can_terminate_tagged(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def fx(): + yield from task.send(1) + log("send 1") + yield from task.sleep(2) + yield from task.send(2) + log("send 2") + + def main(): + fork = yield from 
task.fork(task.tag(fx(), "fx")) + yield from task.sleep(1) + yield from task.terminate(fork) + + result = await inspect(main()) + assert result == InspectResult(ok=True, value=None, error=None, mail=[]) + assert output == ["send 1"] + await task.fork(task.sleep(5)) + assert output == ["send 1"] + + async def test_can_abort_tagged(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def fx(): + yield from task.send(1) + log("send 1") + yield from task.sleep(1) + yield from task.send(2) + log("send 2") + + def main(): + tagged = task.tag(fx(), "fx") + assert str(tagged) == "TaggedEffect" + fork = yield from task.fork(tagged) + yield from task.sleep(1) + yield from task.abort(fork, Exception("kill")) + + result = await inspect(main()) + assert result == InspectResult(ok=True, value=None, error=None, mail=[]) + assert output == ["send 1"] + await task.fork(task.sleep(5)) + assert output == ["send 1"] + + async def test_can_double_tag(self): + def fx(): + yield from task.send(1) + yield from task.sleep(1) + yield from task.send(2) + + tagged = task.tag(task.tag(fx(), "foo"), "bar") + assert ( + (await inspect(tagged)) == + InspectResult( + ok=True, + value=None, + mail=[ + {"type": "bar", "bar": {"type": "foo", "foo": 1}}, + {"type": "bar", "bar": {"type": "foo", "foo": 2}} + ], + error=None + ) + ) + + async def test_tagging_none_is_no_op(self): + def fx(): + yield from task.send(1) + yield from task.sleep(1) + yield from task.send(2) + + tagged = task.tag(task.tag(task.none_(), "foo"), "bar") + assert ( + (await inspect(tagged)) == + InspectResult(ok=True, value=None, mail=[], error=None) + ) + + +class TestEffect: + async def test_can_listen_to_several_fx(self): + def source(delay, count): + for n in range(count): + yield from task.sleep(delay) + yield from task.send(n) + + fx = task.listen({ + "beep": source(3, 5), + "bop": source(5, 3), + "buz": source(2, 2), + }) + + result = await inspect(fx) + mail = result.pop("mail") + 
assert result == {"ok": True, "value": None, "error": None} + inbox = list(map(lambda m: json.dumps(m), mail)) # type: ignore[arg-type] + + expect = [ + {"type": "beep", "beep": 0}, + {"type": "beep", "beep": 1}, + {"type": "beep", "beep": 2}, + {"type": "beep", "beep": 3}, + {"type": "beep", "beep": 4}, + {"type": "bop", "bop": 0}, + {"type": "bop", "bop": 1}, + {"type": "bop", "bop": 2}, + {"type": "buz", "buz": 0}, + {"type": "buz", "buz": 1}, + ] + print(f"DEBUG: \n{sorted(inbox)}") + assert sorted(inbox) != inbox, "messages arent supposed ordered by actors" + assert sorted(inbox) == list(map(lambda v: json.dumps(v), expect)), "all messages has to be recieved" + + async def test_can_listen_to_none(self): + assert ( + (await inspect(task.listen({}))) == + InspectResult(ok=True, value=None, mail=[], error=None) + ) + + async def test_can_produce_no_messages_on_empty_tasks(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(): + log("start work") + yield from task.sleep(2) + log("end work") + + main = task.listen({"none": work()}) + + assert ( + (await inspect(main)) == + InspectResult(ok=True, value=None, mail=[], error=None) + ) + + async def test_can_turn_task_into_effect(self): + def work(): + task.sleep(1) + return "hi" + yield + + fx = task.effect(work()) + + assert ( + (await inspect(fx)) == + InspectResult(ok=True, value=None, mail=["hi"], error=None) + ) + + async def test_can_turn_multiple_tasks_into_effect(self): + def fx(msg="", delay=1): + yield from task.sleep(delay) + return msg + + effect = task.effects([fx("foo", 5), fx("bar", 1), fx("baz", 2)]) + assert ( + (await inspect(effect)) == InspectResult( + ok=True, value=None, mail=["bar", "baz", "foo"], error=None + ) + ) + + async def test_can_turn_zero_tasks_into_effect(self): + effect = task.effects([]) + assert ( + (await inspect(effect)) == InspectResult( + ok=True, value=None, mail=[], error=None + ) + ) + + async def 
test_can_batch_multiple_effects(self): + def fx(msg="", delay=1): + yield from task.sleep(delay) + yield from task.send(msg) + + effect = task.batch([fx("foo", 5), fx("bar", 1), fx("baz", 2)]) + assert ( + (await inspect(effect)) == InspectResult( + ok=True, value=None, mail=["bar", "baz", "foo"], error=None + ) + ) + + async def test_can_loop(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def step(*, n: int = 0): + log(f"<< {n}") + while (n := n - 1) > 0: + log(f">> {n}") + yield from task.sleep(n) + yield from task.send({"n": n}) + + main = await task.fork(task.loop(step(n=4), step)) # type: ignore[arg-type] + assert sorted(output) != output + assert sorted(output) == sorted([ + "<< 4", + ">> 3", + ">> 2", + ">> 1", + "<< 3", + ">> 2", + ">> 1", + "<< 2", + ">> 1", + "<< 1", + "<< 2", + ">> 1", + "<< 1", + "<< 1", + "<< 1", + ]) + + async def test_can_wait_in_a_loop(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def msg_func(message): + log(f"<< {message}") + result = yield from task.wait(0) + log(f">> {result}") + return + yield + + main = task.loop(task.send("start"), msg_func) + + assert (await task.fork(main)) == None + assert output == ["<< start", ">> 0"] + + +class TestAllOperator: + async def test_can_get_all_results(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(duration: int, result: str): + yield from task.sleep(duration) + log(result) + return result + + def main(): + result = yield from task.all_([ + work(2, "a"), + work(9, "b"), + work(5, "c"), + work(0, "d"), + ]) + return result + + result = await task.fork(main()) + assert result == ["a", "b", "c", "d"] + assert result != output + assert sorted(result) == sorted(output) + + async def test_on_failure_all_other_tasks_are_aborted(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(duration: int, name: str, crash: bool = 
False): + yield from task.sleep(duration) + log(name) + if crash: + raise Exception(f"{name}") + else: + return name + + def main(): + result = yield from task.all_([ + work(2, "a"), + work(9, "b"), + work(5, "c", True), + work(0, "d"), + work(8, "e"), + ]) + return result + + result = await inspect(main()) + assert result["ok"] == False + assert str(result["error"]) == str(Exception("c")) + assert result["mail"] == [] + + await task.fork(task.sleep(20)) + assert sorted(output) == sorted(["d", "a", "c"]) + + async def test_can_make_all_of_none(self): + assert (await task.fork(task.all_([]))) == [] + + +class TestForkAPI: + async def test_can_use_abort_method(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + kill = Exception("kill") + + def work(): + log("start work") + yield from task.sleep(2) + log("end work") + + def main(): + worker = yield from task.fork(work()) + yield from task.sleep(0) + log("kill") + yield from worker.abort(kill) + log("nap") + yield from task.sleep(5) + log("exit") + + await task.fork(main()) + assert output == ["start work", "kill", "nap", "exit"] + + async def test_can_use_exit_method(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(): + try: + log("start work") + yield from task.sleep(2) + log("end work") + finally: + log("cancel work") + + def main(): + worker = yield from task.fork(work()) + yield from task.sleep(0) + log("kill") + yield from worker.exit_(None) + log("nap") + yield from task.sleep(5) + log("exit") + + await task.fork(main()) + assert output == [ + "start work", + "kill", + "cancel work", + "nap", + "exit", + ] + + async def test_can_use_resume_method(self): + _logger = create_log() + output, log = (_logger["output"], _logger["log"],) + + def work(): + log("suspend work") + yield from task.suspend() + log("resume work") + + def main(): + worker = yield from task.fork(work()) + yield from task.sleep(2) + yield from worker.resume() + 
log("exit") + + await task.fork(main()) + assert output == ["suspend work", "exit", "resume work"] + + async def test_can_use_join_method(self): + def work(): + yield from task.send("a") + yield from task.sleep(2) + yield from task.send("b") + return 0 + + def main(): + worker = yield from task.fork(work()) + yield from task.sleep(0) + result = yield from worker.join() + return result + + result = await inspect(main()) + assert result == InspectResult(ok=True, value=0, mail=["b"], error=None) + + async def test_has_to_string_tag(self): + fork_data_dict = {} + def main(): + fork = yield from task.fork(task.sleep(2)) + fork_data_dict["id"] = fork.id + fork_data_dict["status"] = fork.status + return str(fork) + + assert (await task.fork(main())) == f"Fork(id={fork_data_dict['id']}, status='{fork_data_dict['status']}')" + + async def test_is_iterator(self): + def work(): + yield from task.send("a") + yield from task.send("b") + yield from task.send("c") + + def main(): + fork = yield from task.fork(work()) + return list(fork) + + assert (await task.fork(main())) == [] + + async def test_can_join_non_active_fork(self): + def work(): + yield from task.send("hi") + + worker = task.fork(work()) + + def main(): + yield from task.join(worker) + + assert (await inspect(main())) == InspectResult( + mail=["hi"], ok=True, value=None, error=None + ) diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 0000000..e3ce309 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,47 @@ +from typing import Callable, Generic, Optional, TypedDict, cast +from actress.task import T, M, Control, Fork, Task, fork, is_instruction + + +class Logger(TypedDict): + output: list[str] + log: Callable[[str], list[str]] + + +def create_log() -> Logger: + output: list[str] = [] + def log(message): + output.append(message) + return output + + return Logger(output=output, log=log) + + +class InspectResult(TypedDict, Generic[M, T]): + ok: bool + value: Optional[T] + mail: list[M] # Messages sent by 
task + error: Optional[Exception] + + +def inspector(task: Task[M, T]) -> Task[Control, InspectResult[M, T]]: + mail: list[M] = [] + controller = iter(task) + input = None + try: + while True: + try: + step = controller.send(input) + except StopIteration as e: + return InspectResult(ok=True, value=e.value, mail=mail, error=None) + else: + instruction = step + if is_instruction(instruction): + input = yield cast(Control, instruction) + else: + print(f"Message yielded: {instruction}") + mail.append(cast(M, instruction)) + except Exception as e: + return InspectResult(ok=False, value=None, error=e, mail=mail) + +def inspect(task: Task[M, T]) -> Fork[InspectResult[M, T], Exception, Control]: + return fork(inspector(task))