Skip to content

Commit

Permalink
Fix error in scheduler crashing everything
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Dec 8, 2023
1 parent d5fe15b commit f1f3cec
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 8 deletions.
61 changes: 61 additions & 0 deletions example/def2/obj.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
---
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnExecutor
metadata:
name: default
spec:
type: ProcessExecutor
options:
max_workers: 1
pool_type: thread
---
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnTopic
metadata:
name: bogus-topic
spec:
type: example.BogusTopic
---
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnTopic
metadata:
name: stdout
spec:
type: FileTopic
options:
path: "-"
mode: "w"
---
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnTopic
metadata:
name: static-topic
spec:
type: StaticTopic
options:
cycle: true
messages:
- {"id": "0", "args": {"message": "hello-0"}}
- {"id": "1", "args": {"message": "hello-1"}}
- {"id": "2", "args": {"message": "hello-2"}}
- {"id": "3", "args": {"message": "hello-3"}}
---
apiVersion: saturn.flared.io/v1alpha1
kind: SaturnJobDefinition
metadata:
name: example-job-definition
labels:
owner: team-saturn
spec:
minimalInterval: "@weekly"
template:
inputs:
a: {topic: bogus-topic}
b: {topic: static-topic}

output:
default:
- topic: stdout

pipeline:
name: example.pipelines.fast
7 changes: 7 additions & 0 deletions example/src/example/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from saturn_engine.worker.topics.dummy import DummyTopic


class BogusTopic(DummyTopic):
async def run(self):
raise ValueError("Oops")
yield
15 changes: 8 additions & 7 deletions src/saturn_engine/worker/executors/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,17 @@ async def process_task(self, task: asyncio.Task) -> AsyncIterator[T]:
exception = task.exception()
if exception is None:
yield task.result()
elif isinstance(exception, StopAsyncIteration):
self.remove(item)
elif isinstance(exception, asyncio.CancelledError):
pass
elif isinstance(exception, Exception):
self.remove(item)
if not isinstance(exception, StopAsyncIteration):
self.logger.error(
"Exception raised from schedulable item",
extra={"data": {"queue_name": item.name}},
exc_info=exception,
)
elif exception:
self.logger.error(
"Exception raised from schedulable item",
extra={"data": {"queue_name": item.name}},
exc_info=exception,
)
raise ValueError("Fatal error in schedulable") from exception
except BaseException:
# This is an unexpected error, likely a closed generator or
Expand Down
2 changes: 1 addition & 1 deletion tests/worker/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def test_scheduler(


# TODO: Reenable once we don't cause fatal error in scheduler.
@pytest.mark.skip
#@pytest.mark.skip
@pytest.mark.asyncio
async def test_scheduler_iter_errors(scheduler: Scheduler) -> None:
schedulable1 = make_schedulable(
Expand Down

0 comments on commit f1f3cec

Please sign in to comment.