Skip to content

Commit

Permalink
chore: split it up
Browse files Browse the repository at this point in the history
  • Loading branch information
hargoniX committed Jan 5, 2025
1 parent f8a67ce commit 77d8e67
Show file tree
Hide file tree
Showing 3 changed files with 249 additions and 192 deletions.
194 changes: 2 additions & 192 deletions src/Std/Internal/Async.lean
Original file line number Diff line number Diff line change
Expand Up @@ -4,195 +4,5 @@ Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Time
import Std.Internal.UV


namespace Std
namespace Internal
namespace IO
namespace Async

/--
A `Task` that may resolve to a value or an `IO.Error`.
-/
def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α)

namespace AsyncTask

@[inline]
protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x

instance : Pure AsyncTask where
pure := AsyncTask.pure

@[inline]
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β :=
Task.bind x fun r =>
match r with
| .ok a => f a
| .error e => Task.pure <| .error e

-- TODO: variants with explicit error handling

@[inline]
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) :=
IO.bindTask x fun r =>
match r with
| .ok a => f a
| .error e => .error e

@[inline]
def mapIO (f : α → β) (x : AsyncTask α) : BaseIO (AsyncTask β) :=
IO.mapTask (t := x) fun r =>
match r with
| .ok a => pure (f a)
| .error e => .error e

/--
Run the `AsyncTask` in `x` and block until it finishes.
-/
def spawnBlocking (x : IO (AsyncTask α)) : IO α := do
let t ← x
let res := t.get
match res with
| .ok a => return a
| .error e => .error e

@[inline]
def spawn (x : IO α) : BaseIO (AsyncTask α) := do
IO.asTask x

@[inline]
def ofPromise (x : IO.Promise α) : AsyncTask α :=
x.result.map pure

@[inline]
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
IO.getTaskState x

end AsyncTask

/--
`Sleep` can be used to sleep for some duration once.
The underlying timer has millisecond resolution.
-/
structure Sleep where
private ofNative ::
native : Internal.UV.Timer

namespace Sleep

/--
Set up a `Sleep` that waits for `duration` milliseconds.
This function only initializes but does not yet start the timer.
-/
@[inline]
def mk (duration : Std.Time.Millisecond.Offset) : IO Sleep := do
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 false
return ofNative native

/--
If:
- `s` is not yet running start it and return an `AsyncTask` that will resolve once the previously
configured `duration` has run out.
- `s` is already or not anymore running return the same `AsyncTask` as the first call to `wait`.
-/
@[inline]
def wait (s : Sleep) : IO (AsyncTask Unit) := do
let promise ← s.native.next
return .ofPromise promise

/--
If:
- `s` is still running this will delay the resolution of `AsyncTask`s created with `wait` by
`duration` milliseconds.
- `s` is not yet or not anymore running this is a no-op.
-/
@[inline]
def reset (s : Sleep) : IO Unit :=
s.native.reset

/--
If:
- `s` is still running this stops `s` without resolving any remaing `AsyncTask` that were created
through `wait`. Note that if another `AsyncTask` is binding on any of these it is going hang
forever without further intervention.
- `s` is not yet or not anymore running this is a no-op.
-/
@[inline]
def stop (s : Sleep) : IO Unit :=
s.native.stop

end Sleep

/--
Return an `AsyncTask` that resolves after `duration`.
-/
def sleep (duration : Std.Time.Millisecond.Offset) : IO (AsyncTask Unit) := do
let sleeper ← Sleep.mk duration
sleeper.wait

/--
`Interval` can be used to repeatedly wait for some duration like a clock.
The underlying timer has millisecond resolution.
-/
structure Interval where
private ofNative ::
native : Internal.UV.Timer


namespace Interval

/--
Setup up an `Interval` that waits for `duration` milliseconds.
This function only initializes but does not yet start the timer.
-/
@[inline]
def mk (duration : Std.Time.Millisecond.Offset) : IO Interval := do
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 true
return ofNative native

/--
If:
- `i` is not yet running start it and return an `AsyncTask` that resolves right away as the 0th
multiple of `duration` has elapsed.
- `i` is already running and:
- the tick from the last call of `i` has not yet finished return the same `AsyncTask` as the last
call
- the tick frrom the last call of `i` has finished return a new `AsyncTask` that waits for the
closest next tick from the time of calling this function.
- `i` is not running aymore this is a no-op.
-/
@[inline]
def tick (i : Interval) : IO (AsyncTask Unit) := do
let promise ← i.native.next
return .ofPromise promise

/--
If:
- `Interval.tick` was called on `i` before the timer restarts counting from now and the next tick
happens in `duration`.
- `i` is not yet or not anymore running this is a no-op.
-/
@[inline]
def reset (i : Interval) : IO Unit :=
i.native.reset

/--
If:
- `i` is still running this stops `i` without resolving any remaing `AsyncTask` that were created
through `tick`. Note that if another `AsyncTask` is binding on any of these it is going hang
forever without further intervention.
- `i` is not yet or not anymore running this is a no-op.
-/
@[inline]
def stop (i : Interval) : IO Unit :=
i.native.stop

end Interval

end Async
end IO
end Internal
end Std
import Std.Internal.Async.Basic
import Std.Internal.Async.Timer
108 changes: 108 additions & 0 deletions src/Std/Internal/Async/Basic.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Init.Core
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace IO
namespace Async

/--
A `Task` that may resolve to a value or an `IO.Error`.
-/
def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α)

namespace AsyncTask

/--
Construct an `AsyncTask` that is already resolved with value `x`.
-/
@[inline]
protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x

instance : Pure AsyncTask where
pure := AsyncTask.pure

/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that reolves to the error.
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
-/
@[inline]
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β :=
Task.bind x fun r =>
match r with
| .ok a => f a
| .error e => Task.pure <| .error e

/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that reolves to the error.
- succeeds, return an `AsyncTask` that resolves to `f x`.
-/
@[inline]
def map (f : α → β) (x : AsyncTask α) : AsyncTask β :=
Task.map (x := x) fun r =>
match r with
| .ok a => .ok (f a)
| .error e => .error e

/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) :=
IO.bindTask x fun r =>
match r with
| .ok a => f a
| .error e => .error e

/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def mapIO (f : α → IO β) (x : AsyncTask α) : BaseIO (AsyncTask β) :=
IO.mapTask (t := x) fun r =>
match r with
| .ok a => f a
| .error e => .error e

/--
Block until the `AsyncTask` in `x` finishes.
-/
def block (x : AsyncTask α) : IO α := do
let res := x.get
match res with
| .ok a => return a
| .error e => .error e

/--
Create an `AsyncTask` that resolves to the value of `x`.
-/
@[inline]
def ofPromise (x : IO.Promise α) : AsyncTask α :=
x.result.map pure

/--
Obtain the `IO.TaskState` of `x`.
-/
@[inline]
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
IO.getTaskState x

end AsyncTask

end Async
end IO
end Internal
end Std
Loading

0 comments on commit 77d8e67

Please sign in to comment.