From 731546be13f0642c056b46ab9251c5735d176425 Mon Sep 17 00:00:00 2001 From: timonmerk Date: Fri, 27 Sep 2024 17:29:50 +0200 Subject: [PATCH] fix stopping stream through asyncio --- gui_dev/package.json | 8 +++---- py_neuromodulation/gui/backend/app_backend.py | 17 +++++--------- py_neuromodulation/gui/backend/app_pynm.py | 23 ++++++++++--------- py_neuromodulation/stream/stream.py | 7 ++++-- pyproject.toml | 1 + 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/gui_dev/package.json b/gui_dev/package.json index 2bbc79f8..e485a5d5 100644 --- a/gui_dev/package.json +++ b/gui_dev/package.json @@ -30,14 +30,14 @@ "@vitejs/plugin-react": "^4.3.1", "@welldone-software/why-did-you-render": "^8.0.3", "babel-plugin-react-compiler": "latest", - "eslint": "^9.11.0", + "eslint": "^9.11.1", "eslint-config-prettier": "^9.1.0", - "eslint-plugin-jsdoc": "^50.2.4", - "eslint-plugin-react": "^7.36.1", + "eslint-plugin-jsdoc": "^50.3.0", + "eslint-plugin-react": "^7.37.0", "eslint-plugin-react-compiler": "latest", "eslint-plugin-react-hooks": "^4.6.2", "eslint-plugin-react-refresh": "^0.4.12", "prettier": "^3.3.3", - "vite": "^5.4.7" + "vite": "^5.4.8" } } diff --git a/py_neuromodulation/gui/backend/app_backend.py b/py_neuromodulation/gui/backend/app_backend.py index 0f87fe58..33364ba0 100644 --- a/py_neuromodulation/gui/backend/app_backend.py +++ b/py_neuromodulation/gui/backend/app_backend.py @@ -1,5 +1,6 @@ import numpy as np import logging +import asyncio import importlib.metadata from datetime import datetime from pathlib import Path @@ -116,8 +117,8 @@ async def handle_stream_control_stop(data: dict): action = data["action"] self.logger.info("Stopping stream") if action == "stop": - self.pynm_state.stream_handling_queue.put("stop") - self.pynm_state.stream.stream_handling_queue.put("stop") + asyncio.create_task(self.pynm_state.stream_handling_queue.put("stop")) + #self.pynm_state.stream.stream_handling_queue.put("stop") return {"message": f"Stream action '{action}' executed"} @self.post("/api/stream-control") @@ -129,20 +130,14 @@ async def handle_stream_control(data: dict): self.logger.info(self.websocket_manager) # TODO: I cannot interact with stream_state_queue, # since the async function is really waiting until the stream finished - await self.pynm_state.start_run_function( + asyncio.create_task(self.pynm_state.start_run_function( #out_dir=data["out_dir"], #experiment_name=data["experiment_name"], websocket_manager_features=self.websocket_manager, - ) + )) if action == "stop": - #if self.pynm_state.stream.is_running is False: - # # TODO: if the message starts with ERROR we could show the message in a popup - # return {"message": "ERROR: Stream is not running"} - - # initiate stream stop and feature save - self.pynm_state.stream_handling_queue.put("stop") - self.pynm_state.stream.stream_handling_queue.put("stop") + await self.pynm_state.stream_handling_queue.put("stop") return {"message": f"Stream action '{action}' executed"} diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index 7f26ea61..a89a9e62 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -1,7 +1,7 @@ import asyncio import logging import numpy as np -from multiprocessing import Process, Queue +from multiprocessing import Process from py_neuromodulation.stream import Stream, NMSettings from py_neuromodulation.utils import set_channels @@ -33,7 +33,7 @@ async def start_run_function( # The stream will then put the results in the queue # there should be another websocket in which the results are sent to the frontend - self.stream_handling_queue = Queue() + self.stream_handling_queue = asyncio.Queue() self.logger.info("setup stream Process") @@ -51,15 +51,16 @@ async def start_run_function( # }, # ) #asyncio.run( - await self.stream.run( - out_dir=out_dir, - experiment_name=experiment_name, - stream_handling_queue=self.stream_handling_queue, - is_stream_lsl=self.lsl_stream_name is not None, - stream_lsl_name=self.lsl_stream_name - if self.lsl_stream_name is not None - else "", - websocket_featues=websocket_manager_features, + asyncio.create_task(self.stream.run( + out_dir=out_dir, + experiment_name=experiment_name, + stream_handling_queue=self.stream_handling_queue, + is_stream_lsl=self.lsl_stream_name is not None, + stream_lsl_name=self.lsl_stream_name + if self.lsl_stream_name is not None + else "", + websocket_featues=websocket_manager_features, + ) ) # self.logger.info("initialized run process") diff --git a/py_neuromodulation/stream/stream.py b/py_neuromodulation/stream/stream.py index 4ee5ea5d..446cec73 100644 --- a/py_neuromodulation/stream/stream.py +++ b/py_neuromodulation/stream/stream.py @@ -1,5 +1,6 @@ """Module for generic and offline data streams.""" +import asyncio from typing import TYPE_CHECKING from collections.abc import Iterator import numpy as np @@ -297,10 +298,12 @@ async def run( prev_batch_end = 0 for timestamps, data_batch in self.generator: self.is_running = True + await asyncio.sleep(0.001) if self.stream_handling_queue is not None: + nm.logger.info("Checking for stop signal") if not self.stream_handling_queue.empty(): - value = self.stream_handling_queue.get() - if value == "stop": + stop_signal = await asyncio.wait_for(self.stream_handling_queue.get(), timeout=0.01) + if stop_signal == "stop": break if data_batch is None: break diff --git a/pyproject.toml b/pyproject.toml index c20736da..c0cdae46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,6 +62,7 @@ dependencies = [ "uvicorn>=0.30.6", "websockets>=13.0", "seaborn >= 0.11", + "cbor2>=5.6.4", ] [project.optional-dependencies]