Skip to content

Commit f1e506a

Browse files
authored
feat: asynchronous timer API (#6306)
This PR implements a basic asynchronous timer API on top of the libuv work. It purposely puts this into `Std.Internal` as we might still have to change the API as we continue develop of the async library across releases so I would only like to stabilize it once we are certain this is a fine API. A few additional notes: - we currently do not implement a bind operator on `AsyncTask` on purpose as `Task.bind` on `Task.pure` is a non trivial operation and users should be aware of it. Furthermore there is the consideration that as they will have to bind on both `IO` and `AsyncTask` we might want to make potential task points explicit in the syntax (did somebody say `await`?). - the API generally takes inspiration from https://docs.rs/tokio/latest/tokio/time/index.html, though it has to adapt as Rust's and Lean's asynchronity concepts are sufficiently different. Stacked on top of #6219.
1 parent cd71bf0 commit f1e506a

File tree

5 files changed

+362
-0
lines changed

5 files changed

+362
-0
lines changed

src/Std/Internal.lean

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Released under Apache 2.0 license as described in the file LICENSE.
44
Authors: Henrik Böving
55
-/
66
prelude
7+
import Std.Internal.Async
78
import Std.Internal.Parsec
89
import Std.Internal.UV
910

src/Std/Internal/Async.lean

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Henrik Böving
5+
-/
6+
prelude
7+
import Std.Internal.Async.Basic
8+
import Std.Internal.Async.Timer

src/Std/Internal/Async/Basic.lean

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Henrik Böving
5+
-/
6+
prelude
7+
import Init.Core
8+
import Init.System.IO
9+
import Init.System.Promise
10+
11+
namespace Std
12+
namespace Internal
13+
namespace IO
14+
namespace Async
15+
16+
/--
17+
A `Task` that may resolve to a value or an `IO.Error`.
18+
-/
19+
def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α)
20+
21+
namespace AsyncTask
22+
23+
/--
24+
Construct an `AsyncTask` that is already resolved with value `x`.
25+
-/
26+
@[inline]
27+
protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x
28+
29+
instance : Pure AsyncTask where
30+
pure := AsyncTask.pure
31+
32+
/--
33+
Create a new `AsyncTask` that will run after `x` has finished.
34+
If `x`:
35+
- errors, return an `AsyncTask` that reolves to the error.
36+
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
37+
-/
38+
@[inline]
39+
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β :=
40+
Task.bind x fun r =>
41+
match r with
42+
| .ok a => f a
43+
| .error e => Task.pure <| .error e
44+
45+
/--
46+
Create a new `AsyncTask` that will run after `x` has finished.
47+
If `x`:
48+
- errors, return an `AsyncTask` that reolves to the error.
49+
- succeeds, return an `AsyncTask` that resolves to `f x`.
50+
-/
51+
@[inline]
52+
def map (f : α → β) (x : AsyncTask α) : AsyncTask β :=
53+
Task.map (x := x) fun r =>
54+
match r with
55+
| .ok a => .ok (f a)
56+
| .error e => .error e
57+
58+
/--
59+
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
60+
`AsyncTask` resolves to that error.
61+
-/
62+
@[inline]
63+
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) :=
64+
IO.bindTask x fun r =>
65+
match r with
66+
| .ok a => f a
67+
| .error e => .error e
68+
69+
/--
70+
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
71+
`AsyncTask` resolves to that error.
72+
-/
73+
@[inline]
74+
def mapIO (f : α → IO β) (x : AsyncTask α) : BaseIO (AsyncTask β) :=
75+
IO.mapTask (t := x) fun r =>
76+
match r with
77+
| .ok a => f a
78+
| .error e => .error e
79+
80+
/--
81+
Block until the `AsyncTask` in `x` finishes.
82+
-/
83+
def block (x : AsyncTask α) : IO α := do
84+
let res := x.get
85+
match res with
86+
| .ok a => return a
87+
| .error e => .error e
88+
89+
/--
90+
Create an `AsyncTask` that resolves to the value of `x`.
91+
-/
92+
@[inline]
93+
def ofPromise (x : IO.Promise (Except IO.Error α)) : AsyncTask α :=
94+
x.result
95+
96+
/--
97+
Create an `AsyncTask` that resolves to the value of `x`.
98+
-/
99+
@[inline]
100+
def ofPurePromise (x : IO.Promise α) : AsyncTask α :=
101+
x.result.map pure
102+
103+
/--
104+
Obtain the `IO.TaskState` of `x`.
105+
-/
106+
@[inline]
107+
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
108+
IO.getTaskState x
109+
110+
end AsyncTask
111+
112+
end Async
113+
end IO
114+
end Internal
115+
end Std

src/Std/Internal/Async/Timer.lean

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Henrik Böving
5+
-/
6+
prelude
7+
import Std.Time
8+
import Std.Internal.UV
9+
import Std.Internal.Async.Basic
10+
11+
12+
namespace Std
13+
namespace Internal
14+
namespace IO
15+
namespace Async
16+
17+
/--
18+
`Sleep` can be used to sleep for some duration once.
19+
The underlying timer has millisecond resolution.
20+
-/
21+
structure Sleep where
22+
private ofNative ::
23+
native : Internal.UV.Timer
24+
25+
namespace Sleep
26+
27+
/--
28+
Set up a `Sleep` that waits for `duration` milliseconds.
29+
This function only initializes but does not yet start the timer.
30+
-/
31+
@[inline]
32+
def mk (duration : Std.Time.Millisecond.Offset) : IO Sleep := do
33+
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 false
34+
return ofNative native
35+
36+
/--
37+
If:
38+
- `s` is not yet running start it and return an `AsyncTask` that will resolve once the previously
39+
configured `duration` has run out.
40+
- `s` is already or not anymore running return the same `AsyncTask` as the first call to `wait`.
41+
-/
42+
@[inline]
43+
def wait (s : Sleep) : IO (AsyncTask Unit) := do
44+
let promise ← s.native.next
45+
return .ofPurePromise promise
46+
47+
/--
48+
If:
49+
- `s` is still running this will delay the resolution of `AsyncTask`s created with `wait` by
50+
`duration` milliseconds.
51+
- `s` is not yet or not anymore running this is a no-op.
52+
-/
53+
@[inline]
54+
def reset (s : Sleep) : IO Unit :=
55+
s.native.reset
56+
57+
/--
58+
If:
59+
- `s` is still running this stops `s` without resolving any remaing `AsyncTask` that were created
60+
through `wait`. Note that if another `AsyncTask` is binding on any of these it is going hang
61+
forever without further intervention.
62+
- `s` is not yet or not anymore running this is a no-op.
63+
-/
64+
@[inline]
65+
def stop (s : Sleep) : IO Unit :=
66+
s.native.stop
67+
68+
end Sleep
69+
70+
/--
71+
Return an `AsyncTask` that resolves after `duration`.
72+
-/
73+
def sleep (duration : Std.Time.Millisecond.Offset) : IO (AsyncTask Unit) := do
74+
let sleeper ← Sleep.mk duration
75+
sleeper.wait
76+
77+
/--
78+
`Interval` can be used to repeatedly wait for some duration like a clock.
79+
The underlying timer has millisecond resolution.
80+
-/
81+
structure Interval where
82+
private ofNative ::
83+
native : Internal.UV.Timer
84+
85+
86+
namespace Interval
87+
88+
/--
89+
Setup up an `Interval` that waits for `duration` milliseconds.
90+
This function only initializes but does not yet start the timer.
91+
-/
92+
@[inline]
93+
def mk (duration : Std.Time.Millisecond.Offset) (_ : 0 < duration := by decide) : IO Interval := do
94+
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 true
95+
return ofNative native
96+
97+
/--
98+
If:
99+
- `i` is not yet running start it and return an `AsyncTask` that resolves right away as the 0th
100+
multiple of `duration` has elapsed.
101+
- `i` is already running and:
102+
- the tick from the last call of `i` has not yet finished return the same `AsyncTask` as the last
103+
call
104+
- the tick frrom the last call of `i` has finished return a new `AsyncTask` that waits for the
105+
closest next tick from the time of calling this function.
106+
- `i` is not running aymore this is a no-op.
107+
-/
108+
@[inline]
109+
def tick (i : Interval) : IO (AsyncTask Unit) := do
110+
let promise ← i.native.next
111+
return .ofPurePromise promise
112+
113+
/--
114+
If:
115+
- `Interval.tick` was called on `i` before the timer restarts counting from now and the next tick
116+
happens in `duration`.
117+
- `i` is not yet or not anymore running this is a no-op.
118+
-/
119+
@[inline]
120+
def reset (i : Interval) : IO Unit :=
121+
i.native.reset
122+
123+
/--
124+
If:
125+
- `i` is still running this stops `i` without resolving any remaing `AsyncTask` that were created
126+
through `tick`. Note that if another `AsyncTask` is binding on any of these it is going hang
127+
forever without further intervention.
128+
- `i` is not yet or not anymore running this is a no-op.
129+
-/
130+
@[inline]
131+
def stop (i : Interval) : IO Unit :=
132+
i.native.stop
133+
134+
end Interval
135+
136+
end Async
137+
end IO
138+
end Internal
139+
end Std
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import Std.Internal.Async.Timer
2+
3+
/-
4+
these tests are just some preliminary ones as `async_sleep.lean` already contains extensive tests
5+
for the entire timer state machine and `Async.Timer` is merely a light wrapper around it.
6+
-/
7+
8+
open Std.Internal.IO.Async
9+
10+
def BASE_DURATION : Std.Time.Millisecond.Offset := 10
11+
12+
namespace SleepTest
13+
14+
def oneSleep : IO Unit := do
15+
let task ← go
16+
assert! (← task.block) == 37
17+
where
18+
go : IO (AsyncTask Nat) := do
19+
let sleep ← Sleep.mk BASE_DURATION
20+
(← sleep.wait).mapIO fun _ =>
21+
return 37
22+
23+
def doubleSleep : IO Unit := do
24+
let task ← go
25+
assert! (← task.block) == 37
26+
where
27+
go : IO (AsyncTask Nat) := do
28+
let sleep ← Sleep.mk BASE_DURATION
29+
(← sleep.wait).bindIO fun _ => do
30+
(← sleep.wait).mapIO fun _ =>
31+
return 37
32+
33+
def resetSleep : IO Unit := do
34+
let task ← go
35+
assert! (← task.block) == 37
36+
where
37+
go : IO (AsyncTask Nat) := do
38+
let sleep ← Sleep.mk BASE_DURATION
39+
let waiter ← sleep.wait
40+
sleep.reset
41+
waiter.mapIO fun _ =>
42+
return 37
43+
44+
def simpleSleep : IO Unit := do
45+
let task ← go
46+
assert! (← task.block) == 37
47+
where
48+
go : IO (AsyncTask Nat) := do
49+
(← sleep BASE_DURATION).mapIO fun _ =>
50+
return 37
51+
52+
#eval oneSleep
53+
#eval doubleSleep
54+
#eval resetSleep
55+
#eval simpleSleep
56+
57+
end SleepTest
58+
59+
namespace IntervalTest
60+
61+
def oneSleep : IO Unit := do
62+
let task ← go
63+
assert! (← task.block) == 37
64+
where
65+
go : IO (AsyncTask Nat) := do
66+
let interval ← Interval.mk BASE_DURATION
67+
(← interval.tick).mapIO fun _ => do
68+
interval.stop
69+
return 37
70+
71+
def doubleSleep : IO Unit := do
72+
let task ← go
73+
assert! (← task.block) == 37
74+
where
75+
go : IO (AsyncTask Nat) := do
76+
let interval ← Interval.mk BASE_DURATION
77+
(← interval.tick).bindIO fun _ => do
78+
(← interval.tick).mapIO fun _ => do
79+
interval.stop
80+
return 37
81+
82+
def resetSleep : IO Unit := do
83+
let task ← go
84+
assert! (← task.block) == 37
85+
where
86+
go : IO (AsyncTask Nat) := do
87+
let interval ← Interval.mk BASE_DURATION
88+
(← interval.tick).bindIO fun _ => do
89+
let waiter ← interval.tick
90+
interval.reset
91+
waiter.mapIO fun _ => do
92+
interval.stop
93+
return 37
94+
95+
#eval oneSleep
96+
#eval doubleSleep
97+
#eval resetSleep
98+
99+
end IntervalTest

0 commit comments

Comments
 (0)