From f1f3cecab843216192821ad7f370cf1b20eee2c4 Mon Sep 17 00:00:00 2001 From: isra17 Date: Fri, 8 Dec 2023 09:27:28 -0500 Subject: [PATCH] Fix error in scheduler crashing everything --- example/def2/obj.yaml | 61 +++++++++++++++++++ example/src/example/__init__.py | 7 +++ .../worker/executors/scheduler.py | 15 ++--- tests/worker/test_scheduler.py | 2 +- 4 files changed, 77 insertions(+), 8 deletions(-) create mode 100644 example/def2/obj.yaml diff --git a/example/def2/obj.yaml b/example/def2/obj.yaml new file mode 100644 index 00000000..c8aced77 --- /dev/null +++ b/example/def2/obj.yaml @@ -0,0 +1,61 @@ +--- +apiVersion: saturn.flared.io/v1alpha1 +kind: SaturnExecutor +metadata: + name: default +spec: + type: ProcessExecutor + options: + max_workers: 1 + pool_type: thread +--- +apiVersion: saturn.flared.io/v1alpha1 +kind: SaturnTopic +metadata: + name: bogus-topic +spec: + type: example.BogusTopic +--- +apiVersion: saturn.flared.io/v1alpha1 +kind: SaturnTopic +metadata: + name: stdout +spec: + type: FileTopic + options: + path: "-" + mode: "w" +--- +apiVersion: saturn.flared.io/v1alpha1 +kind: SaturnTopic +metadata: + name: static-topic +spec: + type: StaticTopic + options: + cycle: true + messages: + - {"id": "0", "args": {"message": "hello-0"}} + - {"id": "1", "args": {"message": "hello-1"}} + - {"id": "2", "args": {"message": "hello-2"}} + - {"id": "3", "args": {"message": "hello-3"}} +--- +apiVersion: saturn.flared.io/v1alpha1 +kind: SaturnJobDefinition +metadata: + name: example-job-definition + labels: + owner: team-saturn +spec: + minimalInterval: "@weekly" + template: + inputs: + a: {topic: bogus-topic} + b: {topic: static-topic} + + output: + default: + - topic: stdout + + pipeline: + name: example.pipelines.fast diff --git a/example/src/example/__init__.py b/example/src/example/__init__.py index e69de29b..c1b5396d 100644 --- a/example/src/example/__init__.py +++ b/example/src/example/__init__.py @@ -0,0 +1,7 @@ +from saturn_engine.worker.topics.dummy import DummyTopic + + +class BogusTopic(DummyTopic): + async def run(self): + raise ValueError("Oops") + yield diff --git a/src/saturn_engine/worker/executors/scheduler.py b/src/saturn_engine/worker/executors/scheduler.py index 97092711..e3a9a77e 100644 --- a/src/saturn_engine/worker/executors/scheduler.py +++ b/src/saturn_engine/worker/executors/scheduler.py @@ -111,16 +111,17 @@ async def process_task(self, task: asyncio.Task) -> AsyncIterator[T]: exception = task.exception() if exception is None: yield task.result() - elif isinstance(exception, StopAsyncIteration): - self.remove(item) elif isinstance(exception, asyncio.CancelledError): pass + elif isinstance(exception, Exception): + self.remove(item) + if not isinstance(exception, StopAsyncIteration): + self.logger.error( + "Exception raised from schedulable item", + extra={"data": {"queue_name": item.name}}, + exc_info=exception, + ) elif exception: - self.logger.error( - "Exception raised from schedulable item", - extra={"data": {"queue_name": item.name}}, - exc_info=exception, - ) raise ValueError("Fatal error in schedulable") from exception except BaseException: # This is an unexpected error, likely a closed generator or diff --git a/tests/worker/test_scheduler.py b/tests/worker/test_scheduler.py index 83cc27a6..e7630b19 100644 --- a/tests/worker/test_scheduler.py +++ b/tests/worker/test_scheduler.py @@ -73,7 +73,7 @@ async def test_scheduler( # TODO: Reenable once we don't cause fatal error in scheduler. -@pytest.mark.skip +#@pytest.mark.skip @pytest.mark.asyncio async def test_scheduler_iter_errors(scheduler: Scheduler) -> None: schedulable1 = make_schedulable(