Skip to content

Commit 1746887

Browse files
hargoniXalgebraic-devTwoFX
authored andcommitted
feat: implement basic async IO with timers (leanprover#6505)
This PR implements a basic async framework as well as asynchronously running timers using libuv. --------- Co-authored-by: Sofia Rodrigues <sofia@algebraic.dev> Co-authored-by: Markus Himmel <markus@himmel-villmar.de> Co-authored-by: Markus Himmel <markus@lean-fro.org>
1 parent 842aeb9 commit 1746887

File tree

16 files changed

+1246
-4
lines changed

16 files changed

+1246
-4
lines changed

src/Std/Internal.lean

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ Released under Apache 2.0 license as described in the file LICENSE.
44
Authors: Henrik Böving
55
-/
66
prelude
7+
import Std.Internal.Async
78
import Std.Internal.Parsec
9+
import Std.Internal.UV
810

911
/-!
1012
This directory is used for components of the standard library that are either considered

src/Std/Internal/Async.lean

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Henrik Böving
5+
-/
6+
prelude
7+
import Std.Internal.Async.Basic
8+
import Std.Internal.Async.Timer

src/Std/Internal/Async/Basic.lean

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Henrik Böving
5+
-/
6+
prelude
7+
import Init.Core
8+
import Init.System.IO
9+
import Init.System.Promise
10+
11+
namespace Std
12+
namespace Internal
13+
namespace IO
14+
namespace Async
15+
16+
/--
17+
A `Task` that may resolve to a value or an `IO.Error`.
18+
-/
19+
def AsyncTask (α : Type u) : Type u := Task (Except IO.Error α)
20+
21+
namespace AsyncTask
22+
23+
/--
24+
Construct an `AsyncTask` that is already resolved with value `x`.
25+
-/
26+
@[inline]
27+
protected def pure (x : α) : AsyncTask α := Task.pure <| .ok x
28+
29+
instance : Pure AsyncTask where
30+
pure := AsyncTask.pure
31+
32+
/--
33+
Create a new `AsyncTask` that will run after `x` has finished.
34+
If `x`:
35+
- errors, return an `AsyncTask` that resolves to the error.
36+
- succeeds, run `f` on the result of `x` and return the `AsyncTask` produced by `f`.
37+
-/
38+
@[inline]
39+
protected def bind (x : AsyncTask α) (f : α → AsyncTask β) : AsyncTask β :=
40+
Task.bind x fun r =>
41+
match r with
42+
| .ok a => f a
43+
| .error e => Task.pure <| .error e
44+
45+
/--
46+
Create a new `AsyncTask` that will run after `x` has finished.
47+
If `x`:
48+
- errors, return an `AsyncTask` that reolves to the error.
49+
- succeeds, return an `AsyncTask` that resolves to `f x`.
50+
-/
51+
@[inline]
52+
def map (f : α → β) (x : AsyncTask α) : AsyncTask β :=
53+
Task.map (x := x) fun r =>
54+
match r with
55+
| .ok a => .ok (f a)
56+
| .error e => .error e
57+
58+
/--
59+
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
60+
`AsyncTask` resolves to that error.
61+
-/
62+
@[inline]
63+
def bindIO (x : AsyncTask α) (f : α → IO (AsyncTask β)) : BaseIO (AsyncTask β) :=
64+
IO.bindTask x fun r =>
65+
match r with
66+
| .ok a => f a
67+
| .error e => .error e
68+
69+
/--
70+
Similar to `bind`, however `f` has access to the `IO` monad. If `f` throws an error, the returned
71+
`AsyncTask` resolves to that error.
72+
-/
73+
@[inline]
74+
def mapIO (f : α → IO β) (x : AsyncTask α) : BaseIO (AsyncTask β) :=
75+
IO.mapTask (t := x) fun r =>
76+
match r with
77+
| .ok a => f a
78+
| .error e => .error e
79+
80+
/--
81+
Block until the `AsyncTask` in `x` finishes.
82+
-/
83+
def block (x : AsyncTask α) : IO α := do
84+
let res := x.get
85+
match res with
86+
| .ok a => return a
87+
| .error e => .error e
88+
89+
/--
90+
Create an `AsyncTask` that resolves to the value of `x`.
91+
-/
92+
@[inline]
93+
def ofPromise (x : IO.Promise (Except IO.Error α)) : AsyncTask α :=
94+
x.result
95+
96+
/--
97+
Create an `AsyncTask` that resolves to the value of `x`.
98+
-/
99+
@[inline]
100+
def ofPurePromise (x : IO.Promise α) : AsyncTask α :=
101+
x.result.map pure
102+
103+
/--
104+
Obtain the `IO.TaskState` of `x`.
105+
-/
106+
@[inline]
107+
def getState (x : AsyncTask α) : BaseIO IO.TaskState :=
108+
IO.getTaskState x
109+
110+
end AsyncTask
111+
112+
end Async
113+
end IO
114+
end Internal
115+
end Std

src/Std/Internal/Async/Timer.lean

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Henrik Böving
5+
-/
6+
prelude
7+
import Std.Time
8+
import Std.Internal.UV
9+
import Std.Internal.Async.Basic
10+
11+
12+
namespace Std
13+
namespace Internal
14+
namespace IO
15+
namespace Async
16+
17+
/--
18+
`Sleep` can be used to sleep for some duration once.
19+
The underlying timer has millisecond resolution.
20+
-/
21+
structure Sleep where
22+
private ofNative ::
23+
native : Internal.UV.Timer
24+
25+
namespace Sleep
26+
27+
/--
28+
Set up a `Sleep` that waits for `duration` milliseconds.
29+
This function only initializes but does not yet start the timer.
30+
-/
31+
@[inline]
32+
def mk (duration : Std.Time.Millisecond.Offset) : IO Sleep := do
33+
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 false
34+
return ofNative native
35+
36+
/--
37+
If:
38+
- `s` is not yet running start it and return an `AsyncTask` that will resolve once the previously
39+
configured `duration` has run out.
40+
- `s` is already or not anymore running return the same `AsyncTask` as the first call to `wait`.
41+
-/
42+
@[inline]
43+
def wait (s : Sleep) : IO (AsyncTask Unit) := do
44+
let promise ← s.native.next
45+
return .ofPurePromise promise
46+
47+
/--
48+
If:
49+
- `s` is still running the timer restarts counting from now and finishes after `duration`
50+
milliseconds.
51+
- `s` is not yet or not anymore running this is a no-op.
52+
-/
53+
@[inline]
54+
def reset (s : Sleep) : IO Unit :=
55+
s.native.reset
56+
57+
/--
58+
If:
59+
- `s` is still running this stops `s` without resolving any remaining `AsyncTask`s that were created
60+
through `wait`. Note that if another `AsyncTask` is binding on any of these it is going hang
61+
forever without further intervention.
62+
- `s` is not yet or not anymore running this is a no-op.
63+
-/
64+
@[inline]
65+
def stop (s : Sleep) : IO Unit :=
66+
s.native.stop
67+
68+
end Sleep
69+
70+
/--
71+
Return an `AsyncTask` that resolves after `duration`.
72+
-/
73+
def sleep (duration : Std.Time.Millisecond.Offset) : IO (AsyncTask Unit) := do
74+
let sleeper ← Sleep.mk duration
75+
sleeper.wait
76+
77+
/--
78+
`Interval` can be used to repeatedly wait for some duration like a clock.
79+
The underlying timer has millisecond resolution.
80+
-/
81+
structure Interval where
82+
private ofNative ::
83+
native : Internal.UV.Timer
84+
85+
86+
namespace Interval
87+
88+
/--
89+
Setup up an `Interval` that waits for `duration` milliseconds.
90+
This function only initializes but does not yet start the timer.
91+
-/
92+
@[inline]
93+
def mk (duration : Std.Time.Millisecond.Offset) (_ : 0 < duration := by decide) : IO Interval := do
94+
let native ← Internal.UV.Timer.mk duration.toInt.toNat.toUInt64 true
95+
return ofNative native
96+
97+
/--
98+
If:
99+
- `i` is not yet running start it and return an `AsyncTask` that resolves right away as the 0th
100+
multiple of `duration` has elapsed.
101+
- `i` is already running and:
102+
- the tick from the last call of `i` has not yet finished return the same `AsyncTask` as the last
103+
call
104+
- the tick frrom the last call of `i` has finished return a new `AsyncTask` that waits for the
105+
closest next tick from the time of calling this function.
106+
- `i` is not running aymore this is a no-op.
107+
-/
108+
@[inline]
109+
def tick (i : Interval) : IO (AsyncTask Unit) := do
110+
let promise ← i.native.next
111+
return .ofPurePromise promise
112+
113+
/--
114+
If:
115+
- `Interval.tick` was called on `i` before the timer restarts counting from now and the next tick
116+
happens in `duration`.
117+
- `i` is not yet or not anymore running this is a no-op.
118+
-/
119+
@[inline]
120+
def reset (i : Interval) : IO Unit :=
121+
i.native.reset
122+
123+
/--
124+
If:
125+
- `i` is still running this stops `i` without resolving any remaing `AsyncTask` that were created
126+
through `tick`. Note that if another `AsyncTask` is binding on any of these it is going hang
127+
forever without further intervention.
128+
- `i` is not yet or not anymore running this is a no-op.
129+
-/
130+
@[inline]
131+
def stop (i : Interval) : IO Unit :=
132+
i.native.stop
133+
134+
end Interval
135+
136+
end Async
137+
end IO
138+
end Internal
139+
end Std

src/Std/Internal/UV.lean

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/-
2+
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
3+
Released under Apache 2.0 license as described in the file LICENSE.
4+
Authors: Sofia Rodrigues
5+
-/
6+
prelude
7+
import Init.System.IO
8+
import Init.System.Promise
9+
10+
namespace Std
11+
namespace Internal
12+
namespace UV
13+
14+
namespace Loop
15+
16+
/--
17+
Options for configuring the event loop behavior.
18+
-/
19+
structure Loop.Options where
20+
/--
21+
Accumulate the amount of idle time the event loop spends in the event provider.
22+
-/
23+
accumulateIdleTime : Bool := False
24+
25+
/--
26+
Block a SIGPROF signal when polling for new events. It's commonly used for unnecessary wakeups
27+
when using a sampling profiler.
28+
-/
29+
blockSigProfSignal : Bool := False
30+
31+
/--
32+
Configures the event loop with the specified options.
33+
-/
34+
@[extern "lean_uv_event_loop_configure"]
35+
opaque configure (options : Loop.Options) : BaseIO Unit
36+
37+
/--
38+
Checks if the event loop is still active and processing events.
39+
-/
40+
@[extern "lean_uv_event_loop_alive"]
41+
opaque alive : BaseIO Bool
42+
43+
end Loop
44+
45+
private opaque TimerImpl : NonemptyType.{0}
46+
47+
/--
48+
`Timer`s are used to generate `IO.Promise`s that resolve after some time.
49+
50+
A `Timer` can be in one of 3 states:
51+
- Right after construction it's initial.
52+
- While it is ticking it's running.
53+
- If it has stopped for some reason it's finished.
54+
55+
This together with whether it was set up as `repeating` with `Timer.new` determines the behavior
56+
of all functions on `Timer`s.
57+
-/
58+
def Timer : Type := TimerImpl.type
59+
60+
instance : Nonempty Timer := TimerImpl.property
61+
62+
namespace Timer
63+
64+
/--
65+
This creates a `Timer` in the initial state and doesn't run it yet.
66+
- If `repeating` is `false` this constructs a timer that resolves once after `durationMs`
67+
milliseconds, counting from when it's run.
68+
- If `repeating` is `true` this constructs a timer that resolves after multiples of `durationMs`
69+
milliseconds, counting from when it's run. Note that this includes the 0th multiple right after
70+
starting the timer. Furthermore a repeating timer will only be freed after `Timer.stop` is called.
71+
-/
72+
@[extern "lean_uv_timer_mk"]
73+
opaque mk (timeout : UInt64) (repeating : Bool) : IO Timer
74+
75+
/--
76+
This function has different behavior depending on the state and configuration of the `Timer`:
77+
- if `repeating` is `false` and:
78+
- it is initial, run it and return a new `IO.Promise` that is set to resolve once `durationMs`
79+
milliseconds have elapsed. After this `IO.Promise` is resolved the `Timer` is finished.
80+
- it is running or finished, return the same `IO.Promise` that the first call to `next` returned.
81+
- if `repeating` is `true` and:
82+
- it is initial, run it and return a new `IO.Promise` that resolves right away
83+
(as it is the 0th multiple of `durationMs`).
84+
- it is running, check whether the last returned `IO.Promise` is already resolved:
85+
- If it is, return a new `IO.Promise` that resolves upon finishing the next cycle
86+
- If it is not, return the last `IO.Promise`
87+
This ensures that the returned `IO.Promise` resolves at the next repetition of the timer.
88+
- if it is finished, return the last `IO.Promise` created by `next`. Notably this could be one
89+
that never resolves if the timer was stopped before fulfilling the last one.
90+
-/
91+
@[extern "lean_uv_timer_next"]
92+
opaque next (timer : @& Timer) : IO (IO.Promise Unit)
93+
94+
/--
95+
This function has different behavior depending on the state and configuration of the `Timer`:
96+
- If it is initial or finished this is a no-op.
97+
- If it is running and `repeating` is `false` this will delay the resolution of the timer until
98+
`durationMs` milliseconds after the call of this function.
99+
- Delay the resolution of the next tick of the timer until `durationMs` milliseconds after the
100+
call of this function, then continue normal ticking behavior from there.
101+
-/
102+
@[extern "lean_uv_timer_reset"]
103+
opaque reset (timer : @& Timer) : IO Unit
104+
105+
/--
106+
This function has different behavior depending on the state of the `Timer`:
107+
- If it is initial or finished this is a no-op.
108+
- If it is running the execution of the timer is stopped and it is put into the finished state.
109+
Note that if the last `IO.Promise` generated by `next` is unresolved and being waited
110+
on this creates a memory leak and the waiting task is not going to be awoken anymore.
111+
-/
112+
@[extern "lean_uv_timer_stop"]
113+
opaque stop (timer : @& Timer) : IO Unit
114+
115+
end Timer
116+
117+
end UV
118+
end Internal
119+
end Std

0 commit comments

Comments
 (0)