From ce94e38b63e9c9e840557ffba4579d6846485934 Mon Sep 17 00:00:00 2001 From: JinGeun Lee Date: Thu, 20 Mar 2025 23:19:54 +0900 Subject: [PATCH 1/2] Update async job streaming stop condition and update test cases --- runpod/endpoint/asyncio/asyncio_runner.py | 9 +++++++-- tests/test_endpoint/test_asyncio_runner.py | 4 +--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/runpod/endpoint/asyncio/asyncio_runner.py b/runpod/endpoint/asyncio/asyncio_runner.py index 8b87d7ce..4a4dde1f 100644 --- a/runpod/endpoint/asyncio/asyncio_runner.py +++ b/runpod/endpoint/asyncio/asyncio_runner.py @@ -1,4 +1,4 @@ -""" Module for running endpoints asynchronously. """ +"""Module for running endpoints asynchronously.""" # pylint: disable=too-few-public-methods,R0801 @@ -89,9 +89,14 @@ async def stream(self) -> Any: while True: await asyncio.sleep(1) stream_partial = await self._fetch_job(source="stream") - if stream_partial["status"] not in FINAL_STATES: + if ( + stream_partial["status"] not in FINAL_STATES + or len(stream_partial["stream"]) > 0 + ): for chunk in stream_partial.get("stream", []): yield chunk["output"] + elif stream_partial["status"] in FINAL_STATES: + break async def cancel(self) -> dict: """Cancels current job diff --git a/tests/test_endpoint/test_asyncio_runner.py b/tests/test_endpoint/test_asyncio_runner.py index 088a549f..0ec77caf 100644 --- a/tests/test_endpoint/test_asyncio_runner.py +++ b/tests/test_endpoint/test_asyncio_runner.py @@ -1,4 +1,4 @@ -""" Unit tests for the asyncio_runner module. """ +"""Unit tests for the asyncio_runner module.""" # pylint: disable=too-few-public-methods @@ -114,8 +114,6 @@ async def json_side_effect(): outputs = [] async for stream_output in job.stream(): outputs.append(stream_output) - if not responses: # Break the loop when responses are exhausted - break assert outputs == ["OUTPUT1", "OUTPUT2"] From 5b86a507af0e51f96c5dd1b8993008d96a83cfec Mon Sep 17 00:00:00 2001 From: JinGeun Lee Date: Tue, 25 Mar 2025 11:05:34 +0900 Subject: [PATCH 2/2] apply code review --- runpod/endpoint/asyncio/asyncio_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runpod/endpoint/asyncio/asyncio_runner.py b/runpod/endpoint/asyncio/asyncio_runner.py index 4a4dde1f..e5c458f5 100644 --- a/runpod/endpoint/asyncio/asyncio_runner.py +++ b/runpod/endpoint/asyncio/asyncio_runner.py @@ -91,7 +91,7 @@ async def stream(self) -> Any: stream_partial = await self._fetch_job(source="stream") if ( stream_partial["status"] not in FINAL_STATES - or len(stream_partial["stream"]) > 0 + or len(stream_partial.get("stream", [])) > 0 ): for chunk in stream_partial.get("stream", []): yield chunk["output"]