Skip to content

Commit

Permalink
feat: asynchronous timer API (#6306)
Browse files Browse the repository at this point in the history
This PR implements a basic asynchronous timer API on top of the libuv
work.

It purposely puts this into `Std.Internal` as we might still have to
change the API as we continue develop of the async library across
releases so I would only like to stabilize it once we are certain this
is a fine API.

A few additional notes:
- we currently do not implement a bind operator on `AsyncTask` on
purpose as `Task.bind` on `Task.pure` is a non trivial operation and
users should be aware of it. Furthermore there is the consideration that
as they will have to bind on both `IO` and `AsyncTask` we might want to
make potential task points explicit in the syntax (did somebody say
`await`?).
- the API generally takes inspiration from
https://docs.rs/tokio/latest/tokio/time/index.html, though it has to
adapt as Rust's and Lean's asynchronity concepts are sufficiently
different.

Stacked on top of #6219.
  • Loading branch information
hargoniX committed Jan 13, 2025
1 parent f3acab7 commit 5f76539
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/Std/Internal.lean
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Internal.Async
import Std.Internal.Parsec
import Std.Internal.UV

Expand Down
8 changes: 8 additions & 0 deletions src/Std/Internal/Async.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Internal.Async.Basic
import Std.Internal.Async.Timer
115 changes: 115 additions & 0 deletions src/Std/Internal/Async/Basic.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Init.Core
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace IO
namespace Async

/--
A `Task` that may resolve to a value or an `IO.Error`.
-/
def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α)

namespace AsyncTask

/--
Construct an `AsyncTask` that is already resolved with value `x`.
-/
@[inline]
protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x

instance : Pure AsyncTask where
pure := AsyncTask.pure

/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that reolves to the error.
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
-/
@[inline]
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β :=
Task.bind x fun r =>
match r with
| .ok a => f a
| .error e => Task.pure <| .error e

/--
Create a new `AsyncTask` that will run after `x` has finished.
If `x`:
- errors, return an `AsyncTask` that reolves to the error.
- succeeds, return an `AsyncTask` that resolves to `f x`.
-/
@[inline]
def map (f : α → β) (x : AsyncTask α) : AsyncTask β :=
Task.map (x := x) fun r =>
match r with
| .ok a => .ok (f a)
| .error e => .error e

/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) :=
IO.bindTask x fun r =>
match r with
| .ok a => f a
| .error e => .error e

/--
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
`AsyncTask` resolves to that error.
-/
@[inline]
def mapIO (f : α → IO β) (x : AsyncTask α) : BaseIO (AsyncTask β) :=
IO.mapTask (t := x) fun r =>
match r with
| .ok a => f a
| .error e => .error e

/--
Block until the `AsyncTask` in `x` finishes.
-/
def block (x : AsyncTask α) : IO α := do
let res := x.get
match res with
| .ok a => return a
| .error e => .error e

/--
Create an `AsyncTask` that resolves to the value of `x`.
-/
@[inline]
def ofPromise (x : IO.Promise (Except IO.Error α)) : AsyncTask α :=
x.result

/--
Create an `AsyncTask` that resolves to the value of `x`.
-/
@[inline]
def ofPurePromise (x : IO.Promise α) : AsyncTask α :=
x.result.map pure

/--
Obtain the `IO.TaskState` of `x`.
-/
@[inline]
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
IO.getTaskState x

end AsyncTask

end Async
end IO
end Internal
end Std
139 changes: 139 additions & 0 deletions src/Std/Internal/Async/Timer.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Time
import Std.Internal.UV
import Std.Internal.Async.Basic


namespace Std
namespace Internal
namespace IO
namespace Async

/--
`Sleep` can be used to sleep for some duration once.
The underlying timer has millisecond resolution.
-/
structure Sleep where
private ofNative ::
native : Internal.UV.Timer

namespace Sleep

/--
Set up a `Sleep` that waits for `duration` milliseconds.
This function only initializes but does not yet start the timer.
-/
@[inline]
def mk (duration : Std.Time.Millisecond.Offset) : IO Sleep := do
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 false
return ofNative native

/--
If:
- `s` is not yet running start it and return an `AsyncTask` that will resolve once the previously
configured `duration` has run out.
- `s` is already or not anymore running return the same `AsyncTask` as the first call to `wait`.
-/
@[inline]
def wait (s : Sleep) : IO (AsyncTask Unit) := do
let promise ← s.native.next
return .ofPurePromise promise

/--
If:
- `s` is still running this will delay the resolution of `AsyncTask`s created with `wait` by
`duration` milliseconds.
- `s` is not yet or not anymore running this is a no-op.
-/
@[inline]
def reset (s : Sleep) : IO Unit :=
s.native.reset

/--
If:
- `s` is still running this stops `s` without resolving any remaing `AsyncTask` that were created
through `wait`. Note that if another `AsyncTask` is binding on any of these it is going hang
forever without further intervention.
- `s` is not yet or not anymore running this is a no-op.
-/
@[inline]
def stop (s : Sleep) : IO Unit :=
s.native.stop

end Sleep

/--
Return an `AsyncTask` that resolves after `duration`.
-/
def sleep (duration : Std.Time.Millisecond.Offset) : IO (AsyncTask Unit) := do
let sleeper ← Sleep.mk duration
sleeper.wait

/--
`Interval` can be used to repeatedly wait for some duration like a clock.
The underlying timer has millisecond resolution.
-/
structure Interval where
private ofNative ::
native : Internal.UV.Timer


namespace Interval

/--
Setup up an `Interval` that waits for `duration` milliseconds.
This function only initializes but does not yet start the timer.
-/
@[inline]
def mk (duration : Std.Time.Millisecond.Offset) (_ : 0 < duration := by decide) : IO Interval := do
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 true
return ofNative native

/--
If:
- `i` is not yet running start it and return an `AsyncTask` that resolves right away as the 0th
multiple of `duration` has elapsed.
- `i` is already running and:
- the tick from the last call of `i` has not yet finished return the same `AsyncTask` as the last
call
- the tick frrom the last call of `i` has finished return a new `AsyncTask` that waits for the
closest next tick from the time of calling this function.
- `i` is not running aymore this is a no-op.
-/
@[inline]
def tick (i : Interval) : IO (AsyncTask Unit) := do
let promise ← i.native.next
return .ofPurePromise promise

/--
If:
- `Interval.tick` was called on `i` before the timer restarts counting from now and the next tick
happens in `duration`.
- `i` is not yet or not anymore running this is a no-op.
-/
@[inline]
def reset (i : Interval) : IO Unit :=
i.native.reset

/--
If:
- `i` is still running this stops `i` without resolving any remaing `AsyncTask` that were created
through `tick`. Note that if another `AsyncTask` is binding on any of these it is going hang
forever without further intervention.
- `i` is not yet or not anymore running this is a no-op.
-/
@[inline]
def stop (i : Interval) : IO Unit :=
i.native.stop

end Interval

end Async
end IO
end Internal
end Std
99 changes: 99 additions & 0 deletions tests/lean/run/async_surface_sleep.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import Std.Internal.Async.Timer

/-
these tests are just some preliminary ones as `async_sleep.lean` already contains extensive tests
for the entire timer state machine and `Async.Timer` is merely a light wrapper around it.
-/

open Std.Internal.IO.Async

def BASE_DURATION : Std.Time.Millisecond.Offset := 10

namespace SleepTest

def oneSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
let sleep ← Sleep.mk BASE_DURATION
(← sleep.wait).mapIO fun _ =>
return 37

def doubleSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
let sleep ← Sleep.mk BASE_DURATION
(← sleep.wait).bindIO fun _ => do
(← sleep.wait).mapIO fun _ =>
return 37

def resetSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
let sleep ← Sleep.mk BASE_DURATION
let waiter ← sleep.wait
sleep.reset
waiter.mapIO fun _ =>
return 37

def simpleSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
(← sleep BASE_DURATION).mapIO fun _ =>
return 37

#eval oneSleep
#eval doubleSleep
#eval resetSleep
#eval simpleSleep

end SleepTest

namespace IntervalTest

def oneSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
let interval ← Interval.mk BASE_DURATION
(← interval.tick).mapIO fun _ => do
interval.stop
return 37

def doubleSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
let interval ← Interval.mk BASE_DURATION
(← interval.tick).bindIO fun _ => do
(← interval.tick).mapIO fun _ => do
interval.stop
return 37

def resetSleep : IO Unit := do
let task ← go
assert! (← task.block) == 37
where
go : IO (AsyncTask Nat) := do
let interval ← Interval.mk BASE_DURATION
(← interval.tick).bindIO fun _ => do
let waiter ← interval.tick
interval.reset
waiter.mapIO fun _ => do
interval.stop
return 37

#eval oneSleep
#eval doubleSleep
#eval resetSleep

end IntervalTest

0 comments on commit 5f76539

Please sign in to comment.