-
Notifications
You must be signed in to change notification settings - Fork 790
feat: add graceful cancellation and --stop-on-first-error option in lake
#13075
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e0b90fc
f10ff80
82b51c1
b4cdc35
c21b541
f6de20a
50c7a8f
50931da
0d82e4c
aae9625
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -163,14 +163,14 @@ namespace Job | |
| [OptDataKind α] (act : JobM α) (caption := "") | ||
| : SpawnM (Job α) := .ofFn fun fetch pkg? stack store ctx _ => | ||
| .ofTask (caption := caption) <$> Task.pure <$> | ||
| (withLoggedIO act).toFn fetch pkg? stack store ctx {} | ||
| (JobResult.ofLogResult <$> (withLoggedIO act).toFn fetch pkg? stack store ctx {}) | ||
|
|
||
| /-- Spawn a job that asynchronously performs `act`. -/ | ||
| @[nospecialize] public protected def async | ||
| [OptDataKind α] (act : JobM α) (prio := Task.Priority.default) (caption := "") | ||
| : SpawnM (Job α) := .ofFn fun fetch pkg? stack store ctx _ => | ||
| .ofTask (caption := caption) <$> BaseIO.asTask (prio := prio) do | ||
| (withLoggedIO act).toFn fetch pkg? stack store ctx {} | ||
| JobResult.ofLogResult <$> (withLoggedIO act).toFn fetch pkg? stack store ctx {} | ||
|
|
||
| /-- Wait for a job to complete and return the result. -/ | ||
| @[inline] public protected def wait (self : Job α) : BaseIO (JobResult α) := do | ||
|
|
@@ -189,8 +189,9 @@ Logs the job's log and throws if there was an error. | |
| -/ | ||
| public protected def await (self : Job α) : LogIO α := do | ||
| match (← self.wait) with | ||
| | .error n {log, ..} => log.replay; throw n | ||
| | .ok a {log, ..} => log.replay; pure a | ||
| | .error (.errorLogged n) {log, ..} => log.replay; throw n | ||
| | .error .cancelled _ => throw 0 | ||
|
|
||
| /-- Apply `f` asynchronously to the job's output. -/ | ||
| @[nospecialize] public protected def mapM | ||
|
|
@@ -201,8 +202,8 @@ public protected def await (self : Job α) : LogIO α := do | |
| BaseIO.mapTask (t := task) (prio := prio) (sync := sync) fun | ||
| | .ok a s => | ||
| let trace := mixTrace trace s.trace | ||
| withLoggedIO (f a) |>.toFn fetch pkg? stack store ctx {s with trace} | ||
| | .error n s => return .error n s | ||
| JobResult.ofLogResult <$> (withLoggedIO (f a)).toFn fetch pkg? stack store ctx {s with trace} | ||
| | .error e s => return .error e s | ||
|
|
||
| /-- | ||
| Apply `f` asynchronously to the job's output | ||
|
|
@@ -220,8 +221,9 @@ and asynchronously await the resulting job. | |
| | .ok job sa => | ||
| return job.task.map (prio := prio) (sync := true) fun | ||
| | .ok b sb => .ok b {sa.merge sb with trace := sb.trace} | ||
| | .error e sb => .error ⟨sa.log.size + e.val⟩ {sa.merge sb with trace := sb.trace} | ||
| | .error e sa => return Task.pure (.error e sa) | ||
| | .error (.errorLogged e) sb => .error (.errorLogged ⟨sa.log.size + e.val⟩) {sa.merge sb with trace := sb.trace} | ||
| | .error e sb => .error e {sa.merge sb with trace := sb.trace} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Instead of duplicating the |
||
| | .error e sa => return Task.pure (.error (.errorLogged e) sa) | ||
| | .error e sa => return Task.pure (.error e sa) | ||
|
|
||
| /-- | ||
|
|
@@ -246,14 +248,18 @@ results of `a` and `b`. The job `c` errors if either `a` or `b` error. | |
| : Job γ := | ||
| self.zipResultWith (other := other) (prio := prio) (sync := sync) fun | ||
| | .ok a sa, .ok b sb => .ok (f a b) (sa.merge sb) | ||
| | ra, rb => .error 0 (ra.state.merge rb.state) | ||
| | .error (.errorLogged _) sa, rb => .error (.errorLogged 0) (sa.merge rb.state) | ||
| | ra, .error (.errorLogged _) sb => .error (.errorLogged 0) (ra.state.merge sb) | ||
| | ra, rb => .error .cancelled (ra.state.merge rb.state) | ||
|
|
||
| /-- Merges this job with another, discarding its output and trace. -/ | ||
| public def add (self : Job α) (other : Job β) : Job α := | ||
| have : OptDataKind α := self.kind | ||
| self.zipResultWith (other := other) fun | ||
| | .ok a sa, .ok _ sb => .ok a {sa.merge sb with trace := sa.trace} | ||
| | ra, rb => .error 0 {ra.state.merge rb.state with trace := ra.state.trace} | ||
| | .error (.errorLogged _) sa, rb => .error (.errorLogged 0) {sa.merge rb.state with trace := sa.trace} | ||
| | ra, .error (.errorLogged _) sb => .error (.errorLogged 0) {ra.state.merge sb with trace := ra.state.trace} | ||
| | ra, rb => .error .cancelled {ra.state.merge rb.state with trace := ra.state.trace} | ||
|
|
||
| /-- Merges this job with another, discarding both outputs. -/ | ||
| public def mix (self : Job α) (other : Job β) : Job Unit := | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -84,11 +84,16 @@ private structure ModuleImportData where | |||||||
| let impSet := if imp.includeSelf then impSet.insert imp.module else impSet | ||||||||
| .ok impSet s | ||||||||
| | .error e s => .error e s | ||||||||
| | .error .cancelled _ => | ||||||||
| match r with | ||||||||
| | .ok _ => .error .cancelled r.state | ||||||||
| | .error _ _ => r | ||||||||
| | .error _ _ => | ||||||||
| let entry := LogEntry.error s!"{fileName}: bad import '{imp.module.name}'" | ||||||||
| match r with | ||||||||
| | .ok _ s => .error 0 (s.logEntry entry) | ||||||||
| | .error e s => .error e (s.logEntry entry) | ||||||||
| | .ok _ s => .error (.errorLogged 0) (s.logEntry entry) | ||||||||
| | .error (.errorLogged e) s => .error (.errorLogged e) (s.logEntry entry) | ||||||||
| | .error .cancelled s => .error .cancelled s | ||||||||
|
Comment on lines
+95
to
+96
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Marginal, but perhaps faster as
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It looks a bit cryptic, I'm tempted to leave as-is |
||||||||
| return Job.ofTask <| task.map (sync := true) fun | ||||||||
| | .ok impSet s => .ok impSet.toArray s | ||||||||
| | .error e s => .error e s | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| -- This module intentionally has a syntax error to test --stop-on-first-error | ||
| #check (this is not valid lean syntax |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| -- This module intentionally has a syntax error to test --stop-on-first-error | ||
| #check (this is not valid lean syntax |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| rm -rf .lake lake-manifest.json *.produced.out |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could do this with less work with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty nice, although now that I bit the bullet I don't know if I prefer cancellation being an "error" rather than a distinct state. Does keeping the `abbrev` have some more benefits except for avoiding all those new adaptation functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main advantage is you get to reuse all the `EResult` machinery, and the code for error propagation (in `bind`) can pass the `JobError` object along unchanged without taking it apart and putting it back together each time, which is likely to be faster at runtime.
You could use `JobException` or `JobStatus` instead, I'm not attached to the name. But for reference, Python uses `asyncio.CancelledError` for such a concept, so I don't think treating cancellations as an error is really that strange.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sold. Some pattern matching is a bit more awkward, but still pretty okay.