From 73525b0efdc59d98a6c42b42bc2b49ae639a7e2e Mon Sep 17 00:00:00 2001 From: Yuriy Larin Date: Sun, 26 Nov 2023 23:09:51 +0200 Subject: [PATCH] added handler crate --- Cargo.toml | 2 + brainbit/src/bbit/device.rs | 16 +++- brainbit/src/bbit/eeg_uuids.rs | 8 -- brainbit/src/bbit/mod.rs | 30 ++++++- brainbit/src/bbit/responses.rs | 1 - brainbit/src/lib.rs | 37 +------- examples/battery_level/Cargo.toml | 4 +- examples/battery_level/src/main.rs | 2 +- examples/connect/Cargo.toml | 4 +- handler/Cargo.toml | 10 ++- handler/src/lib.rs | 15 +--- handler/src/main_handler.rs | 120 +++++++++++++++++++++++++ mainapp/Cargo.toml | 9 +- mainapp/src/main.rs | 138 +++-------------------------- 14 files changed, 194 insertions(+), 202 deletions(-) create mode 100644 handler/src/main_handler.rs diff --git a/Cargo.toml b/Cargo.toml index ded8f59..df5a15b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,9 +3,11 @@ members = [ "mainapp", "brainbit", + "handler", "examples/connect", "examples/battery_level", ] +resolver = "2" [workspace.package] version = "0.1.0" diff --git a/brainbit/src/bbit/device.rs b/brainbit/src/bbit/device.rs index 6461586..3f1885b 100644 --- a/brainbit/src/bbit/device.rs +++ b/brainbit/src/bbit/device.rs @@ -6,9 +6,9 @@ use crate::bbit::eeg_uuids::{ }; use crate::bbit::responses::{DeviceInfo, DeviceStatusData}; use crate::bbit::sealed::{Bluetooth, Configure, Connected, EventLoop, Level}; -use crate::{find_characteristic, Error, EventHandler}; +use crate::{find_characteristic, Error}; -use crate::bbit::{ADS1294ChannelInput, ChannelType, MeasurementType}; +use crate::bbit::{ADS1294ChannelInput, ChannelType, EventHandler, MeasurementType}; use btleplug::{ api::{Central, Characteristic, Manager as _, Peripheral as _, ScanFilter}, platform::{Manager, Peripheral}, @@ -308,7 +308,10 @@ impl BBitSensor { tracing::trace!("loop - received DeviceStatusData: {result:?}"); match result { Ok(status_data) => { - let Ok(_) = bt_tx.send(BluetoothEvent::DeviceStatus(status_data)).await else { break }; + let Ok(_) = bt_tx.send(BluetoothEvent::DeviceStatus(status_data)).await + else { + break; + }; } Err(error) => { tracing::debug!("Error receiving Device Status data: {error:?}"); @@ -320,7 +323,12 @@ impl BBitSensor { "loop - received eeg-resist_data: {:02X?}", eeg_or_resist_data ); - let Ok(_) = bt_tx.send(BluetoothEvent::EggOrResistanceData(eeg_or_resist_data)).await else { break }; + let Ok(_) = bt_tx + .send(BluetoothEvent::EggOrResistanceData(eeg_or_resist_data)) + .await + else { + break; + }; } } diff --git a/brainbit/src/bbit/eeg_uuids.rs b/brainbit/src/bbit/eeg_uuids.rs index a679871..6f340a4 100644 --- a/brainbit/src/bbit/eeg_uuids.rs +++ b/brainbit/src/bbit/eeg_uuids.rs @@ -3,14 +3,6 @@ use uuid::{uuid, Uuid}; /// Device name to search for pub const PERIPHERAL_NAME_MATCH_FILTER: &'static str = "BrainBit"; -/// GAT access service for access to several characteristics below -// const GENERIC_ACCESS_SERVICE_UUID: Uuid = uuid!("00001800-0000-1000-8000-00805F9B34FB"); - -/// Device name for reading (in GENERIC_ACCESS_SERVICE_UUID) -// const DEVICE_NAME_STRING_UUID: Uuid = uuid!("00002A00-0000-1000-8000-00805F9B34FB"); -/// Device Appearance value reading (in GENERIC_ACCESS_SERVICE_UUID) -// const DEVICE_APPEARANCE_STRING_UUID: Uuid = uuid!("00002A01-0000-1000-8000-00805F9B34FB"); - /// GAT attribute service for several device's characteristics pub const GENERIC_ATTRIBUTE_SERVICE_UUID: Uuid = uuid!("0000180A-0000-1000-8000-00805F9B34FB"); diff --git a/brainbit/src/bbit/mod.rs b/brainbit/src/bbit/mod.rs index d612eab..d91aded 100644 --- a/brainbit/src/bbit/mod.rs +++ b/brainbit/src/bbit/mod.rs @@ -1,5 +1,7 @@ -use crate::bbit::device::BBitResult; +use crate::bbit::device::{BBitResult, CommandData}; +use crate::bbit::responses::DeviceStatusData; use crate::Error; +use async_trait::async_trait; pub(crate) mod control; pub mod device; @@ -8,6 +10,32 @@ pub mod resist; pub mod responses; pub(crate) mod sealed; +/// Base trait for handling events coming from a BrainBit device. +#[async_trait] +pub trait EventHandler { + /// Dispatched when a internal device status update is received. + /// + /// Contains the status, cmd error, battery level. + async fn device_status_update(&self, _status: DeviceStatusData) {} + + /// Dispatched when an eeg data is received. + /// + /// Contains information about the O1, O2, T3, T4 + interval. + async fn eeg_update(&mut self, _eeg_data: Vec) {} + + /// Dispatched when measurement data is received over the PMD data UUID. + /// + /// Contains data in a [`CommandData`]. + async fn send_command(&self, _command_data: CommandData) {} + + /// Checked at start of each event loop. + /// + /// Returns [`false`] if the event loop should be terminated and close connection. + async fn should_continue(&self) -> bool { + true + } +} + /// List of measurement types you can request. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum MeasurementType { diff --git a/brainbit/src/bbit/responses.rs b/brainbit/src/bbit/responses.rs index ac19b8e..1ec7fbe 100644 --- a/brainbit/src/bbit/responses.rs +++ b/brainbit/src/bbit/responses.rs @@ -64,7 +64,6 @@ pub struct DeviceStatusData { /// Firmware version pub firmware_version: u8, } - impl Default for DeviceStatusData { fn default() -> Self { Self { diff --git a/brainbit/src/lib.rs b/brainbit/src/lib.rs index dc4d85a..7b2c66d 100644 --- a/brainbit/src/lib.rs +++ b/brainbit/src/lib.rs @@ -1,7 +1,6 @@ pub mod bbit; -use crate::bbit::device::{BBitResult, CommandData}; -use crate::bbit::responses::DeviceStatusData; +use crate::bbit::device::BBitResult; pub use async_trait::async_trait; use btleplug::api::{Characteristic, Peripheral as _}; use btleplug::platform::Peripheral; @@ -36,32 +35,6 @@ pub enum Error { HandlerError(#[from] Box), } -/// Base trait for handling events coming from a BrainBit device. -#[async_trait] -pub trait EventHandler { - /// Dispatched when a internal device status update is received. - /// - /// Contains the status, cmd error, battery level. - async fn device_status_update(&self, _status: DeviceStatusData) {} - - /// Dispatched when an eeg data is received. - /// - /// Contains information about the O1, O2, T3, T4 + interval. - async fn eeg_update(&mut self, _eeg_data: Vec) {} - - /// Dispatched when measurement data is received over the PMD data UUID. - /// - /// Contains data in a [`CommandData`]. - async fn send_command(&self, _command_data: CommandData) {} - - /// Checked at start of each event loop. - /// - /// Returns [`false`] if the event loop should be terminated and close connection. - async fn should_continue(&self) -> bool { - true - } -} - /// Private helper to find characteristics from a [`Uuid`]. async fn find_characteristic(device: &Peripheral, uuid: Uuid) -> BBitResult { device @@ -71,11 +44,3 @@ async fn find_characteristic(device: &Peripheral, uuid: Uuid) -> BBitResult usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +pub mod main_handler; diff --git a/handler/src/main_handler.rs b/handler/src/main_handler.rs new file mode 100644 index 0000000..fea6812 --- /dev/null +++ b/handler/src/main_handler.rs @@ -0,0 +1,120 @@ +use chrono::Utc; +use lib::bbit::resist::ResistState; +use lib::bbit::responses::{DeviceStatusData, Nss2Status}; +use lib::bbit::EventHandler; +use std::fs::File; +use std::io::Write; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use tracing::instrument; + +const SKIP_FIRST_RESIST_RECORDS_NUMBER: u8 = 20; +const STORE_RESIST_RECORDS_NUMBER: u8 = 20; + +#[derive(Debug)] +pub struct BBitHandler { + counter: Arc, + device_status: Mutex, + output: Mutex, + skipped_resist_records_number: AtomicUsize, // AtomicUsize = AtomicUsize::new(0); + current_chanel_number_resist_measure: AtomicUsize, + resist_measure_records: Vec, + resist_results: Mutex, +} + +#[lib::async_trait] +impl EventHandler for BBitHandler { + #[instrument(skip(self))] + async fn device_status_update(&self, status_data: DeviceStatusData) { + let time = Utc::now(); + let formatted: String = time.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + // formatted = formatted.replace("\'", ""); + let msg = format!("{formatted:?} - {status_data}\n"); + tracing::debug!(msg); + { + // write eeg data to file + let mut lock = self.output.lock().unwrap(); + lock.write_all(msg.as_bytes()).unwrap(); + } + { + // read and update local Device Status + let mut lock = self.device_status.lock().unwrap(); + lock.status_nss2 = status_data.status_nss2; + lock.battery_level = status_data.battery_level; + lock.cmd_error = status_data.cmd_error; + } + self.counter.fetch_add(1, Ordering::SeqCst); + } + + #[instrument(skip_all)] + async fn eeg_update(self: &mut BBitHandler, eeg_data: Vec) { + let time = Utc::now(); + let mut formatted: String = time.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + formatted = formatted.replace("\'", ""); + // let msg = format!("{formatted:?} - EEG={:>3?}\n", eeg_data); + let msg = format!("{:>3?}\n", eeg_data); + { + let mut lock = self.output.lock().unwrap(); + lock.write_all(msg.as_bytes()).expect("Can't write log..."); + } + let nss2status = self.device_status.lock().unwrap().status_nss2; + match nss2status { + Nss2Status::ResistTransmission => { + tracing::debug!(msg); + let skipped_number = self.skipped_resist_records_number.load(Ordering::Relaxed); + if skipped_number > 0 { + // skip 'SKIP_FIRST_RESIST_RECORDS_NUMBER' records + tracing::debug!("Skipping = {:?}", skipped_number); + self.decrease_skipped_resist_records_number(); + return; + } + let gathered_records_number = self.get_resist_measure_records_len(); + if gathered_records_number >= STORE_RESIST_RECORDS_NUMBER as usize { + tracing::debug!( + "Gathered = {:?} records for ch='{}'", + gathered_records_number, + self.current_chanel_number_resist_measure + .load(Ordering::Relaxed) + ); + // got to next channel + } + } + Nss2Status::EegTransmission => { + tracing::debug!(msg); + } + Nss2Status::Stopped => { + tracing::debug!("Stopped device in main"); + } + _ => {} + } + } +} + +impl BBitHandler { + pub async fn new(counter: Arc) -> color_eyre::Result { + Ok(Self { + counter, + device_status: Mutex::new(DeviceStatusData::default()), + output: Mutex::new(File::create(format!("main_app_output.txt"))?), + skipped_resist_records_number: AtomicUsize::new( + SKIP_FIRST_RESIST_RECORDS_NUMBER as usize, + ), + current_chanel_number_resist_measure: AtomicUsize::new(0), + resist_measure_records: Vec::with_capacity(STORE_RESIST_RECORDS_NUMBER as usize), + resist_results: Mutex::new(ResistState::default()), + }) + } + + fn decrease_skipped_resist_records_number(&mut self) { + self.current_chanel_number_resist_measure + .fetch_sub(1, Ordering::Relaxed); + } + + fn push_resist_data(&mut self, new_data: u8) { + self.resist_measure_records.push(new_data); + } + + fn get_resist_measure_records_len(&self) -> usize { + self.resist_measure_records.len() + } +} diff --git a/mainapp/Cargo.toml b/mainapp/Cargo.toml index 9be48c9..8e1a639 100644 --- a/mainapp/Cargo.toml +++ b/mainapp/Cargo.toml @@ -8,11 +8,12 @@ description = "Console client app" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lib = { version = "0.1.0", path = "../brainbit" } +lib = { path = "../brainbit" } +handler = { path = "../handler" } -tokio = { workspace = true, features = ["full"] } -tracing = { workspace = true } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true color-eyre.workspace = true chrono.workspace = true \ No newline at end of file diff --git a/mainapp/src/main.rs b/mainapp/src/main.rs index 6a6034f..bbd176b 100644 --- a/mainapp/src/main.rs +++ b/mainapp/src/main.rs @@ -1,24 +1,16 @@ -use lib::bbit::device::BBitSensor; -use lib::bbit::eeg_uuids::{EventType, PERIPHERAL_NAME_MATCH_FILTER}; -use lib::bbit::resist::{ResistState, ResistsMeasureResult}; -use lib::bbit::responses::{DeviceStatusData, Nss2Status}; -use lib::EventHandler; +use std::sync::Arc; use std::{ io::{self, Write}, sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; + +use tokio::sync::oneshot; use tracing::instrument; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use chrono::Utc; -use tokio::io::AsyncWriteExt; -use tokio::{ - fs::File, - sync::{oneshot, Mutex}, -}; - -static COUNTER: AtomicUsize = AtomicUsize::new(0); +use lib::bbit::device::BBitSensor; +use lib::bbit::eeg_uuids::{EventType, PERIPHERAL_NAME_MATCH_FILTER}; #[tokio::main] #[instrument] @@ -53,11 +45,14 @@ async fn main() -> color_eyre::Result<()> { .build() .await?; - let handler = connected.event_loop(Handler::new().await?).await; + let counter: Arc = Arc::new(AtomicUsize::new(0)); + let handler = connected + .event_loop(handler::main_handler::BBitHandler::new(Arc::clone(&counter)).await?) + .await; tracing::info!("BrainBit is connected, event loop is started"); handler.start().await; - get_finish().await?; + get_finish(counter).await?; handler.stop().await; tracing::info!("stopped the event loop, finishing"); @@ -65,123 +60,14 @@ async fn main() -> color_eyre::Result<()> { Ok(()) } -const SKIP_FIRST_RESIST_RECORDS_NUMBER: u8 = 20; -const STORE_RESIST_RECORDS_NUMBER: u8 = 20; - -#[derive(Debug)] -struct Handler { - device_status: Mutex, - output: Mutex, - skipped_resist_records_number: AtomicUsize, // AtomicUsize = AtomicUsize::new(0); - current_chanel_number_resist_measure: AtomicUsize, - resist_measure_records: Vec, - resist_results: Mutex, -} - -impl Handler { - async fn new() -> color_eyre::Result { - Ok(Self { - device_status: Mutex::new(DeviceStatusData::default()), - output: Mutex::new(File::create(format!("main_app_output.txt")).await?), - skipped_resist_records_number: AtomicUsize::new( - SKIP_FIRST_RESIST_RECORDS_NUMBER as usize, - ), - current_chanel_number_resist_measure: AtomicUsize::new(0), - resist_measure_records: Vec::with_capacity(STORE_RESIST_RECORDS_NUMBER as usize), - resist_results: Mutex::new(ResistState::default()), - }) - } - - fn decrease_skipped_resist_records_number(&mut self) { - self.current_chanel_number_resist_measure - .fetch_sub(1, Ordering::Relaxed); - } - - fn push_resist_data(&mut self, new_data: u8) { - self.resist_measure_records.push(new_data); - } - - fn get_resist_measure_records_len(&self) -> usize { - self.resist_measure_records.len() - } -} - -#[lib::async_trait] -impl EventHandler for Handler { - #[instrument(skip(self))] - async fn device_status_update(&self, status_data: DeviceStatusData) { - let time = Utc::now(); - let formatted: String = time.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); - // formatted = formatted.replace("\'", ""); - let msg = format!("{formatted:?} - {status_data}\n"); - tracing::debug!(msg); - { - // write eeg data to file - let mut lock = self.output.lock().await; - lock.write_all(msg.as_bytes()).await.unwrap(); - } - { - // read and update local Device Status - let mut lock = self.device_status.lock().await; - lock.status_nss2 = status_data.status_nss2; - lock.battery_level = status_data.battery_level; - lock.cmd_error = status_data.cmd_error; - } - COUNTER.fetch_add(1, Ordering::SeqCst); - } - - #[instrument(skip_all)] - async fn eeg_update(self: &mut Handler, eeg_data: Vec) { - let time = Utc::now(); - let mut formatted: String = time.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); - formatted = formatted.replace("\'", ""); - // let msg = format!("{formatted:?} - EEG={:>3?}\n", eeg_data); - let msg = format!("{:>3?}\n", eeg_data); - { - let mut lock = self.output.lock().await; - lock.write_all(msg.as_bytes()).await.unwrap(); - } - let nss2status = self.device_status.lock().await.status_nss2; - match nss2status { - Nss2Status::ResistTransmission => { - tracing::debug!(msg); - let skipped_number = self.skipped_resist_records_number.load(Ordering::Relaxed); - if skipped_number > 0 { - // skip 'SKIP_FIRST_RESIST_RECORDS_NUMBER' records - tracing::debug!("Skipping = {:?}", skipped_number); - self.decrease_skipped_resist_records_number(); - return; - } - let gathered_records_number = self.get_resist_measure_records_len(); - if gathered_records_number >= STORE_RESIST_RECORDS_NUMBER as usize { - tracing::debug!( - "Gathered = {:?} records for ch='{}'", - gathered_records_number, - self.current_chanel_number_resist_measure - .load(Ordering::Relaxed) - ); - // got to next channel - } - } - Nss2Status::EegTransmission => { - tracing::debug!(msg); - } - Nss2Status::Stopped => { - tracing::debug!("Stopped device in main"); - } - _ => {} - } - } -} - -async fn get_finish() -> color_eyre::Result<()> { +async fn get_finish(counter: Arc) -> color_eyre::Result<()> { let mut buf = String::new(); let (tx, mut rx) = oneshot::channel(); println!(); print!( "\r({} events received) Would you like to stop? (y/N) ", - COUNTER.load(Ordering::SeqCst) + counter.load(Ordering::SeqCst) ); let task = tokio::task::spawn(async move { loop {