From 6c498666a4ff4f16e455f69f2880b8e625d1d065 Mon Sep 17 00:00:00 2001 From: timonmerk Date: Mon, 23 Sep 2024 18:40:15 +0200 Subject: [PATCH] add start and stop of stream --- gui_dev/package.json | 6 +-- gui_dev/src/pages/Dashboard.jsx | 23 ++++++--- gui_dev/src/pages/Settings/Settings.jsx | 4 +- gui_dev/src/stores/sessionStore.js | 48 +++++++++++++++++++ py_neuromodulation/gui/backend/app_backend.py | 24 +++++++--- py_neuromodulation/gui/backend/app_pynm.py | 4 +- py_neuromodulation/gui/backend/app_socket.py | 2 +- py_neuromodulation/stream/stream.py | 9 ++-- 8 files changed, 96 insertions(+), 24 deletions(-) diff --git a/gui_dev/package.json b/gui_dev/package.json index 1a1cf43f..2bbc79f8 100644 --- a/gui_dev/package.json +++ b/gui_dev/package.json @@ -30,14 +30,14 @@ "@vitejs/plugin-react": "^4.3.1", "@welldone-software/why-did-you-render": "^8.0.3", "babel-plugin-react-compiler": "latest", - "eslint": "^9.10.0", + "eslint": "^9.11.0", "eslint-config-prettier": "^9.1.0", - "eslint-plugin-jsdoc": "^50.2.3", + "eslint-plugin-jsdoc": "^50.2.4", "eslint-plugin-react": "^7.36.1", "eslint-plugin-react-compiler": "latest", "eslint-plugin-react-hooks": "^4.6.2", "eslint-plugin-react-refresh": "^0.4.12", "prettier": "^3.3.3", - "vite": "^5.4.6" + "vite": "^5.4.7" } } diff --git a/gui_dev/src/pages/Dashboard.jsx b/gui_dev/src/pages/Dashboard.jsx index c9d21aa5..2ade4872 100644 --- a/gui_dev/src/pages/Dashboard.jsx +++ b/gui_dev/src/pages/Dashboard.jsx @@ -1,8 +1,19 @@ import { Graph } from "@/components"; -import { Box } from "@mui/material"; +import { Box, Button } from "@mui/material"; +import { useSessionStore } from "@/stores"; -export const Dashboard = () => ( - - - -); +export const Dashboard = () => { + + const startStream = useSessionStore((state) => state.startStream); + const stopStream = useSessionStore((state) => state.stopStream); + + return( + <> + + + + + + + ) +} diff --git a/gui_dev/src/pages/Settings/Settings.jsx b/gui_dev/src/pages/Settings/Settings.jsx index c2f1dff9..e2cf5f2a 100644 --- a/gui_dev/src/pages/Settings/Settings.jsx +++ b/gui_dev/src/pages/Settings/Settings.jsx @@ -259,10 +259,10 @@ export const Settings = () => { variant="contained" component={Link} color="primary" - to="/decoding" + to="/dashboard" sx={{ mt: 2 }} > - Run Stream + Start Stream ); diff --git a/gui_dev/src/stores/sessionStore.js b/gui_dev/src/stores/sessionStore.js index 6d30be86..df78c066 100644 --- a/gui_dev/src/stores/sessionStore.js +++ b/gui_dev/src/stores/sessionStore.js @@ -247,6 +247,54 @@ export const useSessionStore = createPersistStore("session", (set, get) => ({ ); }, + startStream: async () => { + try { + console.log("Start Stream"); + + const response = await fetch("/api/stream-control", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + // This needs to be adapted depending on the backend changes + body: JSON.stringify({ "action" : "start"}), + }); + + if (!response.ok) { + throw new Error(`Failed start stream: ${await response.text()}`); + } + + const result = await response.json(); + console.log("Stream started:", result); + } catch (error) { + console.error("Failed to start stream:", error); + } + }, + + stopStream: async () => { + try { + console.log("Stop Stream"); + + const response = await fetch("/api/stream-control-stop", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + // This needs to be adapted depending on the backend changes + body: JSON.stringify({ "action" : "stop"}), + }); + + if (!response.ok) { + throw new Error(`Failed stopping stream: ${await response.text()}`); + } + + const result = await response.json(); + console.log("Stream Stopping:", result); + } catch (error) { + console.error("Failed to stop stream:", error); + } + }, + resetSession: () => get().setStateAndSync({ sourceType: null, diff --git a/py_neuromodulation/gui/backend/app_backend.py b/py_neuromodulation/gui/backend/app_backend.py index 74514cea..0f87fe58 100644 --- a/py_neuromodulation/gui/backend/app_backend.py +++ b/py_neuromodulation/gui/backend/app_backend.py @@ -111,6 +111,15 @@ async def update_settings(data: dict): ##### PYNM CONTROL ##### ######################## + @self.post("/api/stream-control-stop") + async def handle_stream_control_stop(data: dict): + action = data["action"] + self.logger.info("Stopping stream") + if action == "stop": + self.pynm_state.stream_handling_queue.put("stop") + self.pynm_state.stream.stream_handling_queue.put("stop") + return {"message": f"Stream action '{action}' executed"} + @self.post("/api/stream-control") async def handle_stream_control(data: dict): action = data["action"] @@ -118,18 +127,21 @@ async def handle_stream_control(data: dict): # TODO: create out_dir and experiment_name text filds in frontend self.logger.info("websocket:") self.logger.info(self.websocket_manager) + # TODO: I cannot interact with stream_state_queue, + # since the async function is really waiting until the stream finished await self.pynm_state.start_run_function( - out_dir=data["out_dir"], - experiment_name=data["experiment_name"], - websocket_manager=self.websocket_manager, + #out_dir=data["out_dir"], + #experiment_name=data["experiment_name"], + websocket_manager_features=self.websocket_manager, ) if action == "stop": - if self.pynm_state.stream.is_running is False: - # TODO: if the message starts with ERROR we could show the message in a popup - return {"message": "ERROR: Stream is not running"} + #if self.pynm_state.stream.is_running is False: + # # TODO: if the message starts with ERROR we could show the message in a popup + # return {"message": "ERROR: Stream is not running"} # initiate stream stop and feature save + self.pynm_state.stream_handling_queue.put("stop") self.pynm_state.stream.stream_handling_queue.put("stop") return {"message": f"Stream action '{action}' executed"} diff --git a/py_neuromodulation/gui/backend/app_pynm.py b/py_neuromodulation/gui/backend/app_pynm.py index f60c51c0..7f26ea61 100644 --- a/py_neuromodulation/gui/backend/app_pynm.py +++ b/py_neuromodulation/gui/backend/app_pynm.py @@ -33,7 +33,7 @@ async def start_run_function( # The stream will then put the results in the queue # there should be another websocket in which the results are sent to the frontend - stream_handling_queue = Queue() + self.stream_handling_queue = Queue() self.logger.info("setup stream Process") @@ -54,7 +54,7 @@ async def start_run_function( await self.stream.run( out_dir=out_dir, experiment_name=experiment_name, - stream_handling_queue=stream_handling_queue, + stream_handling_queue=self.stream_handling_queue, is_stream_lsl=self.lsl_stream_name is not None, stream_lsl_name=self.lsl_stream_name if self.lsl_stream_name is not None diff --git a/py_neuromodulation/gui/backend/app_socket.py b/py_neuromodulation/gui/backend/app_socket.py index b0e24b6d..37dd403f 100644 --- a/py_neuromodulation/gui/backend/app_socket.py +++ b/py_neuromodulation/gui/backend/app_socket.py @@ -41,7 +41,7 @@ async def send_message(self, message: str | dict): if self.active_connections: for connection in self.active_connections: if type(message) is dict: - await connection.send_json(json.dump(message)) + await connection.send_json(message) elif type(message) is str: await connection.send_text(message) self.logger.info(f"Message sent") diff --git a/py_neuromodulation/stream/stream.py b/py_neuromodulation/stream/stream.py index cfb8e865..4ee5ea5d 100644 --- a/py_neuromodulation/stream/stream.py +++ b/py_neuromodulation/stream/stream.py @@ -258,11 +258,11 @@ async def run( nm.logger.log_to_file(out_dir) # Initialize mp.Pool for multiprocessing - self.pool = mp.Pool(processes=self.settings.n_jobs) + #self.pool = mp.Pool(processes=self.settings.n_jobs) # Set up shared memory for multiprocessing - self.shared_memory = mp.Array(ctypes.c_double, self.settings.n_jobs * self.settings.n_jobs) + #self.shared_memory = mp.Array(ctypes.c_double, self.settings.n_jobs * self.settings.n_jobs) # Set up multiprocessing semaphores - self.semaphore = mp.Semaphore(self.settings.n_jobs) + #self.semaphore = mp.Semaphore(self.settings.n_jobs) # Initialize generator self.generator: Iterator @@ -339,7 +339,8 @@ async def run( if websocket_featues is not None: nm.logger.info("Sending message to Websocket") #nm.logger.info(feature_dict) - await websocket_featues.send_message(feature_dict) + #await websocket_featues.send_cbor(feature_dict) + #await websocket_featues.send_message(feature_dict) self.batch_count += 1 if self.batch_count % self.save_interval == 0: self.db.commit()