From 8e585faeef157c762c32cbb4c515b1bdcafa2626 Mon Sep 17 00:00:00 2001 From: lars-berger Date: Sat, 17 Aug 2024 20:05:25 +0800 Subject: [PATCH] refactor: simplify provider initialization and cleanup (#84) --- packages/desktop/src/main.rs | 11 +- .../desktop/src/providers/battery/provider.rs | 2 +- packages/desktop/src/providers/config.rs | 4 +- .../desktop/src/providers/cpu/provider.rs | 2 +- .../desktop/src/providers/host/provider.rs | 2 +- .../src/providers/interval_provider.rs | 124 ------- packages/desktop/src/providers/ip/provider.rs | 2 +- .../src/providers/komorebi/provider.rs | 16 +- packages/desktop/src/providers/manager.rs | 305 ------------------ .../desktop/src/providers/memory/provider.rs | 2 +- packages/desktop/src/providers/mod.rs | 6 +- .../desktop/src/providers/network/provider.rs | 2 +- packages/desktop/src/providers/provider.rs | 155 ++++++--- .../desktop/src/providers/provider_manager.rs | 133 ++++++++ .../desktop/src/providers/provider_ref.rs | 202 ++++++++++++ packages/desktop/src/providers/variables.rs | 4 +- .../desktop/src/providers/weather/provider.rs | 2 +- 17 files changed, 480 insertions(+), 494 deletions(-) delete mode 100644 packages/desktop/src/providers/interval_provider.rs delete mode 100644 packages/desktop/src/providers/manager.rs create mode 100644 packages/desktop/src/providers/provider_manager.rs create mode 100644 packages/desktop/src/providers/provider_ref.rs diff --git a/packages/desktop/src/main.rs b/packages/desktop/src/main.rs index 8e2e94e9..608c394a 100644 --- a/packages/desktop/src/main.rs +++ b/packages/desktop/src/main.rs @@ -1,6 +1,9 @@ use std::{collections::HashMap, env, sync::Arc}; use clap::Parser; +use providers::{ + config::ProviderConfig, provider_manager::init_provider_manager, +}; use serde::Serialize; use tauri::{ AppHandle, Manager, State, WebviewUrl, WebviewWindowBuilder, Window, @@ -18,7 +21,7 @@ use tracing_subscriber::EnvFilter; use crate::{ cli::{Cli, CliCommand}, monitors::get_monitors_str, - providers::{config::ProviderConfig, manager::ProviderManager}, + providers::provider_manager::ProviderManager, sys_tray::setup_sys_tray, util::window_ext::WindowExt, }; @@ -72,7 +75,7 @@ async fn listen_provider( provider_manager: State<'_, ProviderManager>, ) -> anyhow::Result<(), String> { provider_manager - .listen(config_hash, config, tracked_access) + .create(config_hash, config, tracked_access) .await .map_err(|err| err.to_string()) } @@ -83,7 +86,7 @@ async fn unlisten_provider( provider_manager: State<'_, ProviderManager>, ) -> anyhow::Result<(), String> { provider_manager - .unlisten(config_hash) + .destroy(config_hash) .await .map_err(|err| err.to_string()) } @@ -169,7 +172,7 @@ async fn main() { // Add application icon to system tray. setup_sys_tray(app)?; - providers::manager::init(app)?; + init_provider_manager(app); let args_map = OpenWindowArgsMap(Default::default()); let args_map_ref = args_map.0.clone(); diff --git a/packages/desktop/src/providers/battery/provider.rs b/packages/desktop/src/providers/battery/provider.rs index e288cec8..942fa365 100644 --- a/packages/desktop/src/providers/battery/provider.rs +++ b/packages/desktop/src/providers/battery/provider.rs @@ -13,7 +13,7 @@ use tokio::task::AbortHandle; use super::{BatteryProviderConfig, BatteryVariables}; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct BatteryProvider { diff --git a/packages/desktop/src/providers/config.rs b/packages/desktop/src/providers/config.rs index db431ace..40da9c9a 100644 --- a/packages/desktop/src/providers/config.rs +++ b/packages/desktop/src/providers/config.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -#[cfg(all(windows, target_arch = "x86_64"))] +#[cfg(windows)] use super::komorebi::KomorebiProviderConfig; use super::{ battery::BatteryProviderConfig, cpu::CpuProviderConfig, @@ -16,7 +16,7 @@ pub enum ProviderConfig { Cpu(CpuProviderConfig), Host(HostProviderConfig), Ip(IpProviderConfig), - #[cfg(all(windows, target_arch = "x86_64"))] + #[cfg(windows)] Komorebi(KomorebiProviderConfig), Memory(MemoryProviderConfig), Network(NetworkProviderConfig), diff --git a/packages/desktop/src/providers/cpu/provider.rs b/packages/desktop/src/providers/cpu/provider.rs index 65a3eb0b..9af50d1a 100644 --- a/packages/desktop/src/providers/cpu/provider.rs +++ b/packages/desktop/src/providers/cpu/provider.rs @@ -6,7 +6,7 @@ use tokio::{sync::Mutex, task::AbortHandle}; use super::{CpuProviderConfig, CpuVariables}; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct CpuProvider { diff --git a/packages/desktop/src/providers/host/provider.rs b/packages/desktop/src/providers/host/provider.rs index ca31c61d..b1f81c5f 100644 --- a/packages/desktop/src/providers/host/provider.rs +++ b/packages/desktop/src/providers/host/provider.rs @@ -6,7 +6,7 @@ use tokio::{sync::Mutex, task::AbortHandle}; use super::{HostProviderConfig, HostVariables}; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct HostProvider { diff --git a/packages/desktop/src/providers/interval_provider.rs b/packages/desktop/src/providers/interval_provider.rs deleted file mode 100644 index 28a5514a..00000000 --- a/packages/desktop/src/providers/interval_provider.rs +++ /dev/null @@ -1,124 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use async_trait::async_trait; -use tokio::{ - sync::mpsc::Sender, - task::{self, AbortHandle}, - time, -}; - -use super::{ - manager::{ProviderOutput, VariablesResult}, - provider::Provider, - variables::ProviderVariables, -}; - -/// Require interval providers to have a refresh interval in their config. -pub trait IntervalConfig { - fn refresh_interval(&self) -> u64; -} - -#[macro_export] -macro_rules! impl_interval_config { - ($struct_name:ident) => { - use crate::providers::interval_provider::IntervalConfig; - - impl IntervalConfig for $struct_name { - fn refresh_interval(&self) -> u64 { - self.refresh_interval - } - } - }; -} - -#[async_trait] -pub trait IntervalProvider { - type Config: Sync + Send + 'static + IntervalConfig; - type State: Sync + Send + 'static; - - /// Default to 2 seconds as the minimum refresh interval. - fn min_refresh_interval(&self) -> Duration { - Duration::from_secs(2) - } - - fn config(&self) -> Arc; - - fn state(&self) -> Arc; - - fn abort_handle(&self) -> &Option; - - fn set_abort_handle(&mut self, abort_handle: AbortHandle); - - async fn get_refreshed_variables( - config: &Self::Config, - state: &Self::State, - ) -> anyhow::Result; -} - -#[async_trait] -impl Provider for T { - fn min_refresh_interval(&self) -> Duration { - T::min_refresh_interval(self) - } - - async fn on_start( - &mut self, - config_hash: String, - emit_output_tx: Sender, - ) { - let config = self.config(); - let state = self.state(); - - let forever = task::spawn(async move { - let mut interval = - time::interval(Duration::from_millis(config.refresh_interval())); - - loop { - // The first tick fires immediately. - interval.tick().await; - - _ = emit_output_tx - .send(ProviderOutput { - config_hash: config_hash.clone(), - variables: to_variables_result( - T::get_refreshed_variables(&config, &state).await, - ), - }) - .await; - } - }); - - self.set_abort_handle(forever.abort_handle()); - _ = forever.await; - } - - async fn on_refresh( - &mut self, - config_hash: String, - emit_output_tx: Sender, - ) { - _ = emit_output_tx - .send(ProviderOutput { - config_hash, - variables: to_variables_result( - T::get_refreshed_variables(&self.config(), &self.state()).await, - ), - }) - .await; - } - - async fn on_stop(&mut self) { - if let Some(handle) = &self.abort_handle() { - handle.abort(); - } - } -} - -fn to_variables_result( - result: anyhow::Result, -) -> VariablesResult { - match result { - Ok(variables) => VariablesResult::Data(variables), - Err(err) => VariablesResult::Error(err.to_string()), - } -} diff --git a/packages/desktop/src/providers/ip/provider.rs b/packages/desktop/src/providers/ip/provider.rs index d09a96f1..0fa936be 100644 --- a/packages/desktop/src/providers/ip/provider.rs +++ b/packages/desktop/src/providers/ip/provider.rs @@ -7,7 +7,7 @@ use tokio::task::AbortHandle; use super::{ipinfo_res::IpinfoRes, IpProviderConfig, IpVariables}; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct IpProvider { diff --git a/packages/desktop/src/providers/komorebi/provider.rs b/packages/desktop/src/providers/komorebi/provider.rs index 3ffbc117..f387f60c 100644 --- a/packages/desktop/src/providers/komorebi/provider.rs +++ b/packages/desktop/src/providers/komorebi/provider.rs @@ -18,8 +18,8 @@ use super::{ }; use crate::providers::{ komorebi::KomorebiVariables, - manager::{ProviderOutput, VariablesResult}, provider::Provider, + provider_ref::{ProviderOutput, VariablesResult}, variables::ProviderVariables, }; @@ -123,16 +123,18 @@ impl KomorebiProvider { #[async_trait] impl Provider for KomorebiProvider { - // State should always be up to date. - fn min_refresh_interval(&self) -> Duration { - Duration::MAX + fn min_refresh_interval(&self) -> Option { + // State should always be up to date. + None } async fn on_start( &mut self, - config_hash: String, + config_hash: &str, emit_output_tx: Sender, ) { + let config_hash = config_hash.to_string(); + let task_handle = task::spawn(async move { let socket = komorebi_client::subscribe(SOCKET_NAME).unwrap(); debug!("Connected to Komorebi socket."); @@ -166,7 +168,7 @@ impl Provider for KomorebiProvider { Err(error) => { _ = emit_output_tx .send(ProviderOutput { - config_hash: config_hash.clone(), + config_hash: config_hash.to_string(), variables: VariablesResult::Error(error.to_string()), }) .await; @@ -181,7 +183,7 @@ impl Provider for KomorebiProvider { async fn on_refresh( &mut self, - _config_hash: String, + _config_hash: &str, _emit_output_tx: Sender, ) { // No-op. diff --git a/packages/desktop/src/providers/manager.rs b/packages/desktop/src/providers/manager.rs deleted file mode 100644 index fa50a1ed..00000000 --- a/packages/desktop/src/providers/manager.rs +++ /dev/null @@ -1,305 +0,0 @@ -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; - -use anyhow::{bail, Context}; -use serde::Serialize; -use sysinfo::{Networks, System}; -use tauri::{App, AppHandle, Emitter, Manager, Runtime}; -use tokio::{ - sync::{ - mpsc::{self, Sender}, - Mutex, - }, - task, -}; -use tracing::{info, warn}; - -#[cfg(all(windows, target_arch = "x86_64"))] -use super::komorebi::KomorebiProvider; -use super::{ - battery::BatteryProvider, config::ProviderConfig, cpu::CpuProvider, - host::HostProvider, ip::IpProvider, memory::MemoryProvider, - network::NetworkProvider, variables::ProviderVariables, - weather::WeatherProvider, -}; -use crate::providers::provider::Provider; - -pub struct ListenProviderArgs { - pub config_hash: String, - pub config: ProviderConfig, - pub tracked_access: Vec, -} - -pub struct UnlistenProviderArgs { - pub config_hash: String, -} - -/// Reference to a currently active provider. -#[derive(Debug, Clone)] -pub struct ProviderRef { - config_hash: String, - min_refresh_interval: Duration, - prev_refresh: Option, - prev_output: Option>, - refresh_tx: Sender<()>, - stop_tx: Sender<()>, -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ProviderOutput { - pub config_hash: String, - pub variables: VariablesResult, -} - -#[derive(Serialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub enum VariablesResult { - Data(ProviderVariables), - Error(String), -} - -/// Wrapper around the creation and deletion of providers. -pub struct ProviderManager { - listen_input_tx: Sender, - unlisten_input_tx: Sender, -} - -/// Initializes `ProviderManager` in Tauri state. -pub fn init(app: &mut App) -> anyhow::Result<()> { - app.manage(ProviderManager::new(app.handle().clone())); - Ok(()) -} - -/// Create a channel for outputting provider variables to client. -fn handle_provider_emit_output( - app_handle: (impl Emitter + Sync + Send + 'static), - active_providers: Arc>>, -) -> Sender { - let (output_sender, mut output_receiver) = - mpsc::channel::(1); - - task::spawn(async move { - while let Some(output) = output_receiver.recv().await { - info!("Emitting for provider: {}", output.config_hash); - let output = Box::new(output); - - if let Err(err) = app_handle.emit("provider-emit", output.clone()) { - warn!("Error emitting provider output: {:?}", err); - } - - if let Ok(mut providers) = active_providers.try_lock() { - // Find provider that matches given config hash. - let found_provider = providers - .iter_mut() - .find(|provider| *provider.config_hash == output.config_hash); - - if let Some(found) = found_provider { - found.prev_refresh = Some(Instant::now()); - found.prev_output = Some(output); - } - } else { - warn!("Failed to update provider output cache."); - } - } - }); - - output_sender -} - -/// Create a channel for handling provider listen commands from client. -fn handle_provider_listen_input( - active_providers: Arc>>, - emit_output_tx: Sender, -) -> Sender { - let (listen_input_tx, mut listen_input_rx) = - mpsc::channel::(1); - - let sysinfo = Arc::new(Mutex::new(System::new_all())); - let netinfo = Arc::new(Mutex::new(Networks::new_with_refreshed_list())); - - task::spawn(async move { - while let Some(input) = listen_input_rx.recv().await { - // Find provider that matches given config hash. - let found_provider = { - active_providers - .lock() - .await - .iter() - .find(|&provider| *provider.config_hash == input.config_hash) - .cloned() - }; - - // If a provider with the given config already exists, refresh it and - // return early. - if let Some(found) = found_provider { - // The previous output of providers is cached. If within the - // minimum refresh interval, send the previous output. - match (found.prev_refresh, found.prev_output.clone()) { - (Some(prev_refresh), Some(prev_output)) - if prev_refresh.elapsed() < found.min_refresh_interval => - { - _ = emit_output_tx.send(*prev_output).await - } - _ => _ = found.refresh_tx.send(()).await, - } - - continue; - }; - - let (refresh_tx, refresh_rx) = mpsc::channel::<()>(1); - let (stop_tx, stop_rx) = mpsc::channel::<()>(1); - let emit_output_tx = emit_output_tx.clone(); - - // Attempt to create a new provider. - let new_provider = - create_provider(input.config, sysinfo.clone(), netinfo.clone()); - - if let Err(err) = new_provider { - _ = emit_output_tx - .send(ProviderOutput { - config_hash: input.config_hash, - variables: VariablesResult::Error(err.to_string()), - }) - .await; - - continue; - } else if let Ok(mut new_provider) = new_provider { - let min_refresh_interval = new_provider.min_refresh_interval(); - - let mut providers = active_providers.lock().await; - providers.push(ProviderRef { - config_hash: input.config_hash.clone(), - min_refresh_interval, - prev_refresh: None, - prev_output: None, - refresh_tx, - stop_tx, - }); - - task::spawn(async move { - new_provider - .start(input.config_hash, emit_output_tx, refresh_rx, stop_rx) - .await - }); - } - } - }); - - listen_input_tx -} - -fn create_provider( - config: ProviderConfig, - sysinfo: Arc>, - netinfo: Arc>, -) -> anyhow::Result> { - let provider: Box = match config { - ProviderConfig::Battery(config) => { - Box::new(BatteryProvider::new(config)?) - } - ProviderConfig::Cpu(config) => { - Box::new(CpuProvider::new(config, sysinfo)) - } - ProviderConfig::Host(config) => { - Box::new(HostProvider::new(config, sysinfo)) - } - ProviderConfig::Ip(config) => Box::new(IpProvider::new(config)), - #[cfg(all(windows, target_arch = "x86_64"))] - ProviderConfig::Komorebi(config) => { - Box::new(KomorebiProvider::new(config)) - } - ProviderConfig::Memory(config) => { - Box::new(MemoryProvider::new(config, sysinfo)) - } - ProviderConfig::Network(config) => { - Box::new(NetworkProvider::new(config, netinfo)) - } - ProviderConfig::Weather(config) => { - Box::new(WeatherProvider::new(config)) - } - #[allow(unreachable_patterns)] - _ => bail!("Provider not supported on this operating system."), - }; - - Ok(provider) -} - -/// Create a channel for handling provider unlisten commands from client. -fn handle_provider_unlisten_input( - active_providers: Arc>>, -) -> Sender { - let (unlisten_input_tx, mut unlisten_input_rx) = - mpsc::channel::(1); - - task::spawn(async move { - while let Some(input) = unlisten_input_rx.recv().await { - // Find provider that matches given config hash. - let mut providers = active_providers.lock().await; - let found_index = providers - .iter() - .position(|provider| provider.config_hash == input.config_hash); - - // Stop the given provider. This triggers any necessary cleanup. - if let Some(found_index) = found_index { - if let Some(provider) = providers.get(found_index) { - let _ = provider.stop_tx.send(()).await; - providers.remove(found_index); - } - }; - } - }); - - unlisten_input_tx -} - -impl ProviderManager { - pub fn new(app_handle: AppHandle) -> ProviderManager { - let active_providers = Arc::new(Mutex::new(vec![])); - - let emit_output_tx = - handle_provider_emit_output(app_handle, active_providers.clone()); - - let listen_input_tx = handle_provider_listen_input( - active_providers.clone(), - emit_output_tx, - ); - - let unlisten_input_tx = - handle_provider_unlisten_input(active_providers); - - ProviderManager { - listen_input_tx, - unlisten_input_tx, - } - } - - /// Create a provider with the given config. - pub async fn listen( - &self, - config_hash: String, - config: ProviderConfig, - tracked_access: Vec, - ) -> anyhow::Result<()> { - self - .listen_input_tx - .send(ListenProviderArgs { - config_hash, - config, - tracked_access, - }) - .await - .context("error msg") - } - - /// Destroy and clean up a provider with the given config. - pub async fn unlisten(&self, config_hash: String) -> anyhow::Result<()> { - self - .unlisten_input_tx - .send(UnlistenProviderArgs { config_hash }) - .await - .context("error msg") - } -} diff --git a/packages/desktop/src/providers/memory/provider.rs b/packages/desktop/src/providers/memory/provider.rs index f66038d6..75dda699 100644 --- a/packages/desktop/src/providers/memory/provider.rs +++ b/packages/desktop/src/providers/memory/provider.rs @@ -6,7 +6,7 @@ use tokio::{sync::Mutex, task::AbortHandle}; use super::{MemoryProviderConfig, MemoryVariables}; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct MemoryProvider { diff --git a/packages/desktop/src/providers/mod.rs b/packages/desktop/src/providers/mod.rs index bc94fa81..b902a359 100644 --- a/packages/desktop/src/providers/mod.rs +++ b/packages/desktop/src/providers/mod.rs @@ -2,13 +2,13 @@ pub mod battery; pub mod config; pub mod cpu; pub mod host; -pub mod interval_provider; pub mod ip; -#[cfg(all(windows, target_arch = "x86_64"))] +#[cfg(windows)] pub mod komorebi; -pub mod manager; pub mod memory; pub mod network; pub mod provider; +pub mod provider_manager; +pub mod provider_ref; pub mod variables; pub mod weather; diff --git a/packages/desktop/src/providers/network/provider.rs b/packages/desktop/src/providers/network/provider.rs index ce28921b..480cd20f 100644 --- a/packages/desktop/src/providers/network/provider.rs +++ b/packages/desktop/src/providers/network/provider.rs @@ -11,7 +11,7 @@ use super::{ NetworkTraffic, NetworkVariables, }; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct NetworkProvider { diff --git a/packages/desktop/src/providers/provider.rs b/packages/desktop/src/providers/provider.rs index 6fa5b96b..3f4ca313 100644 --- a/packages/desktop/src/providers/provider.rs +++ b/packages/desktop/src/providers/provider.rs @@ -1,65 +1,140 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::info; +use tokio::{ + sync::mpsc::Sender, + task::{self, AbortHandle}, + time, +}; -use super::manager::ProviderOutput; +use super::{provider_ref::ProviderOutput, variables::ProviderVariables}; #[async_trait] pub trait Provider { - fn min_refresh_interval(&self) -> Duration; - + /// Callback for when the provider is started. async fn on_start( &mut self, - config_hash: String, + config_hash: &str, emit_output_tx: Sender, ); + /// Callback for when the provider is refreshed. async fn on_refresh( &mut self, - config_hash: String, + config_hash: &str, emit_output_tx: Sender, ); + /// Callback for when the provider is stopped. async fn on_stop(&mut self); - async fn start( + /// Minimum interval between refreshes. + /// + /// Affects how the provider output is cached. + fn min_refresh_interval(&self) -> Option; +} + +#[async_trait] +pub trait IntervalProvider { + type Config: Sync + Send + 'static + IntervalConfig; + type State: Sync + Send + 'static; + + /// Default to 2 seconds as the minimum refresh interval. + fn min_refresh_interval(&self) -> Option { + Some(Duration::from_secs(2)) + } + + fn config(&self) -> Arc; + + fn state(&self) -> Arc; + + fn abort_handle(&self) -> &Option; + + fn set_abort_handle(&mut self, abort_handle: AbortHandle); + + async fn get_refreshed_variables( + config: &Self::Config, + state: &Self::State, + ) -> anyhow::Result; +} + +#[async_trait] +impl Provider for T { + fn min_refresh_interval(&self) -> Option { + T::min_refresh_interval(self) + } + + async fn on_start( &mut self, - config_hash: String, + config_hash: &str, emit_output_tx: Sender, - mut refresh_rx: Receiver<()>, - mut stop_rx: Receiver<()>, ) { - let mut has_started = false; - - // Loop to avoid exiting the select on refresh. - loop { - let config_hash = config_hash.clone(); - let emit_output_tx = emit_output_tx.clone(); - - tokio::select! { - // Default match arm which handles initialization of the provider. - // This has a precondition to avoid running again on refresh. - _ = { - info!("Starting provider: {}", config_hash); - has_started = true; - self.on_start(config_hash.clone(), emit_output_tx.clone()) - }, if !has_started => break, - - // On refresh, re-emit provider variables and continue looping. - Some(_) = refresh_rx.recv() => { - info!("Refreshing provider: {}", config_hash); - _ = self.on_refresh(config_hash, emit_output_tx).await; - }, - - // On stop, perform any necessary clean up and exit the loop. - Some(_) = stop_rx.recv() => { - info!("Stopping provider: {}", config_hash); - _ = self.on_stop().await; - break; - }, + let config = self.config(); + let state = self.state(); + let config_hash = config_hash.to_string(); + + let interval_task = task::spawn(async move { + let mut interval = + time::interval(Duration::from_millis(config.refresh_interval())); + + loop { + // The first tick fires immediately. + interval.tick().await; + + _ = emit_output_tx + .send(ProviderOutput { + config_hash: config_hash.clone(), + variables: T::get_refreshed_variables(&config, &state) + .await + .into(), + }) + .await; } + }); + + self.set_abort_handle(interval_task.abort_handle()); + _ = interval_task.await; + } + + async fn on_refresh( + &mut self, + config_hash: &str, + emit_output_tx: Sender, + ) { + _ = emit_output_tx + .send(ProviderOutput { + config_hash: config_hash.to_string(), + variables: T::get_refreshed_variables( + &self.config(), + &self.state(), + ) + .await + .into(), + }) + .await; + } + + async fn on_stop(&mut self) { + if let Some(handle) = &self.abort_handle() { + handle.abort(); } } } + +/// Require interval providers to have a refresh interval in their config. +pub trait IntervalConfig { + fn refresh_interval(&self) -> u64; +} + +#[macro_export] +macro_rules! impl_interval_config { + ($struct_name:ident) => { + use crate::providers::provider::IntervalConfig; + + impl IntervalConfig for $struct_name { + fn refresh_interval(&self) -> u64 { + self.refresh_interval + } + } + }; +} diff --git a/packages/desktop/src/providers/provider_manager.rs b/packages/desktop/src/providers/provider_manager.rs new file mode 100644 index 00000000..845595f8 --- /dev/null +++ b/packages/desktop/src/providers/provider_manager.rs @@ -0,0 +1,133 @@ +use std::{collections::HashMap, sync::Arc}; + +use sysinfo::{Networks, System}; +use tauri::{App, AppHandle, Emitter, Manager, Runtime}; +use tokio::{ + sync::{ + mpsc::{self}, + Mutex, + }, + task, +}; +use tracing::{info, warn}; + +use super::{ + config::ProviderConfig, + provider_ref::{ProviderOutput, ProviderRef}, +}; + +/// Initializes `ProviderManager` in Tauri state. +pub fn init_provider_manager(app: &mut App) { + let mut manager = ProviderManager::new(); + manager.start(app.handle()); + app.manage(manager); +} + +/// State shared between providers. +pub struct SharedProviderState { + pub sysinfo: Arc>, + pub netinfo: Arc>, +} + +/// Manages the creation and cleanup of providers. +pub struct ProviderManager { + emit_output_tx: mpsc::Sender, + emit_output_rx: Option>, + providers: Arc>>, + shared_state: SharedProviderState, +} + +impl ProviderManager { + pub fn new() -> Self { + let (emit_output_tx, emit_output_rx) = + mpsc::channel::(1); + + Self { + emit_output_tx, + emit_output_rx: Some(emit_output_rx), + providers: Arc::new(Mutex::new(HashMap::new())), + shared_state: SharedProviderState { + sysinfo: Arc::new(Mutex::new(System::new_all())), + netinfo: Arc::new(Mutex::new(Networks::new_with_refreshed_list())), + }, + } + } + + /// Starts listening for provider outputs and emits them to frontend + /// clients. + pub fn start(&mut self, app_handle: &AppHandle) { + let mut emit_output_rx = self.emit_output_rx.take().unwrap(); + let providers = self.providers.clone(); + let app_handle = app_handle.clone(); + + task::spawn(async move { + while let Some(output) = emit_output_rx.recv().await { + info!("Emitting for provider: {}", output.config_hash); + + let output = Box::new(output); + + if let Err(err) = app_handle.emit("provider-emit", output.clone()) + { + warn!("Error emitting provider output: {:?}", err); + } + + // Update the provider's output cache. + if let Ok(mut providers) = providers.try_lock() { + if let Some(found_provider) = + providers.get_mut(&output.config_hash) + { + found_provider.update_cache(output); + } + } else { + warn!("Failed to update provider output cache."); + } + } + }); + } + + /// Creates a provider with the given config. + pub async fn create( + &self, + config_hash: String, + config: ProviderConfig, + _tracked_access: Vec, + ) -> anyhow::Result<()> { + let mut providers = self.providers.lock().await; + + // If a provider with the given config already exists, refresh it + // and return early. + if let Some(found_provider) = providers.get(&config_hash) { + if let Err(err) = found_provider.refresh().await { + warn!("Error refreshing provider: {:?}", err); + } + + return Ok(()); + }; + + let provider_ref = ProviderRef::new( + config_hash.clone(), + config, + self.emit_output_tx.clone(), + &self.shared_state, + )?; + + providers.insert(config_hash, provider_ref); + + Ok(()) + } + + /// Destroys and cleans up the provider with the given config. + pub async fn destroy(&self, config_hash: String) -> anyhow::Result<()> { + let mut providers = self.providers.lock().await; + + if let Some(found_provider) = providers.get_mut(&config_hash) { + if let Err(err) = found_provider.stop().await { + warn!("Error stopping provider: {:?}", err); + } + } + + providers.remove(&config_hash); + + Ok(()) + } +} diff --git a/packages/desktop/src/providers/provider_ref.rs b/packages/desktop/src/providers/provider_ref.rs new file mode 100644 index 00000000..21f5a921 --- /dev/null +++ b/packages/desktop/src/providers/provider_ref.rs @@ -0,0 +1,202 @@ +use std::time::{Duration, Instant}; + +use anyhow::bail; +use serde::Serialize; +use tokio::{sync::mpsc, task}; +use tracing::info; + +#[cfg(windows)] +use super::komorebi::KomorebiProvider; +use super::{ + battery::BatteryProvider, config::ProviderConfig, cpu::CpuProvider, + host::HostProvider, ip::IpProvider, memory::MemoryProvider, + network::NetworkProvider, provider::Provider, + provider_manager::SharedProviderState, variables::ProviderVariables, + weather::WeatherProvider, +}; + +/// Reference to an active provider. +#[derive(Debug, Clone)] +pub struct ProviderRef { + pub config_hash: String, + pub min_refresh_interval: Option, + pub cache: Option, + pub emit_output_tx: mpsc::Sender, + pub refresh_tx: mpsc::Sender<()>, + pub stop_tx: mpsc::Sender<()>, +} + +#[derive(Debug, Clone)] +pub struct ProviderCache { + pub timestamp: Instant, + pub output: Box, +} + +/// Output emitted to frontend clients. +#[derive(Serialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ProviderOutput { + pub config_hash: String, + pub variables: VariablesResult, +} + +/// Provider variable output emitted to frontend clients. +/// +/// This is used instead of a normal `Result` type to serialize it in a +/// nicer way. +#[derive(Serialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub enum VariablesResult { + Data(ProviderVariables), + Error(String), +} + +/// Implements conversion from an `anyhow::Result`. +impl From> for VariablesResult { + fn from(result: anyhow::Result) -> Self { + match result { + Ok(data) => VariablesResult::Data(data), + Err(err) => VariablesResult::Error(err.to_string()), + } + } +} + +impl ProviderRef { + pub fn new( + config_hash: String, + config: ProviderConfig, + emit_output_tx: mpsc::Sender, + shared_state: &SharedProviderState, + ) -> anyhow::Result { + let provider = Self::create_provider(config, shared_state)?; + + let (refresh_tx, refresh_rx) = mpsc::channel::<()>(1); + let (stop_tx, stop_rx) = mpsc::channel::<()>(1); + + let min_refresh_interval = provider.min_refresh_interval(); + let config_hash_clone = config_hash.clone(); + let emit_output_tx_clone = emit_output_tx.clone(); + + task::spawn(async move { + Self::start_provider( + provider, + config_hash_clone, + emit_output_tx_clone, + refresh_rx, + stop_rx, + ) + }); + + Ok(Self { + config_hash, + min_refresh_interval, + cache: None, + emit_output_tx, + refresh_tx, + stop_tx, + }) + } + + /// Starts the provider. + async fn start_provider( + mut provider: Box, + config_hash: String, + emit_output_tx: mpsc::Sender, + mut refresh_rx: mpsc::Receiver<()>, + mut stop_rx: mpsc::Receiver<()>, + ) { + info!("Starting provider: {}", config_hash); + provider + .on_start(&config_hash, emit_output_tx.clone()) + .await; + + // Loop to avoid exiting the select on refresh. + loop { + tokio::select! { + // On refresh, re-emit provider variables and continue looping. + Some(_) = refresh_rx.recv() => { + info!("Refreshing provider: {}", config_hash); + _ = provider.on_refresh(&config_hash, emit_output_tx.clone()).await; + }, + + // On stop, perform any necessary clean up and exit the loop. + Some(_) = stop_rx.recv() => { + info!("Stopping provider: {}", config_hash); + _ = provider.on_stop().await; + break; + }, + } + } + } + + fn create_provider( + config: ProviderConfig, + shared_state: &SharedProviderState, + ) -> anyhow::Result> { + let provider: Box = match config { + ProviderConfig::Battery(config) => { + Box::new(BatteryProvider::new(config)?) + } + ProviderConfig::Cpu(config) => { + Box::new(CpuProvider::new(config, shared_state.sysinfo.clone())) + } + ProviderConfig::Host(config) => { + Box::new(HostProvider::new(config, shared_state.sysinfo.clone())) + } + ProviderConfig::Ip(config) => Box::new(IpProvider::new(config)), + #[cfg(windows)] + ProviderConfig::Komorebi(config) => { + Box::new(KomorebiProvider::new(config)) + } + ProviderConfig::Memory(config) => { + Box::new(MemoryProvider::new(config, shared_state.sysinfo.clone())) + } + ProviderConfig::Network(config) => Box::new(NetworkProvider::new( + config, + shared_state.netinfo.clone(), + )), + ProviderConfig::Weather(config) => { + Box::new(WeatherProvider::new(config)) + } + #[allow(unreachable_patterns)] + _ => bail!("Provider not supported on this operating system."), + }; + + Ok(provider) + } + + /// Updates cache with the given output. + pub fn update_cache(&mut self, output: Box) { + self.cache = Some(ProviderCache { + timestamp: Instant::now(), + output, + }); + } + + /// Refreshes the provider. + /// + /// Since the previous output of providers is cached, if within the + /// minimum refresh interval, send the previous output. + pub async fn refresh(&self) -> anyhow::Result<()> { + let min_refresh_interval = + self.min_refresh_interval.unwrap_or(Duration::MAX); + + match &self.cache { + Some(cache) if cache.timestamp.elapsed() >= min_refresh_interval => { + self.emit_output_tx.send(*cache.output.clone()).await?; + } + _ => self.refresh_tx.send(()).await?, + }; + + Ok(()) + } + + /// Stops the given provider. + /// + /// This triggers any necessary cleanup. + pub async fn stop(&self) -> anyhow::Result<()> { + self.stop_tx.send(()).await?; + + Ok(()) + } +} diff --git a/packages/desktop/src/providers/variables.rs b/packages/desktop/src/providers/variables.rs index 7fbe3716..e6337f92 100644 --- a/packages/desktop/src/providers/variables.rs +++ b/packages/desktop/src/providers/variables.rs @@ -1,6 +1,6 @@ use serde::Serialize; -#[cfg(all(windows, target_arch = "x86_64"))] +#[cfg(windows)] use super::komorebi::KomorebiVariables; use super::{ battery::BatteryVariables, cpu::CpuVariables, host::HostVariables, @@ -15,7 +15,7 @@ pub enum ProviderVariables { Cpu(CpuVariables), Host(HostVariables), Ip(IpVariables), - #[cfg(all(windows, target_arch = "x86_64"))] + #[cfg(windows)] Komorebi(KomorebiVariables), Memory(MemoryVariables), Network(NetworkVariables), diff --git a/packages/desktop/src/providers/weather/provider.rs b/packages/desktop/src/providers/weather/provider.rs index 70cd55aa..c5e7e553 100644 --- a/packages/desktop/src/providers/weather/provider.rs +++ b/packages/desktop/src/providers/weather/provider.rs @@ -9,7 +9,7 @@ use super::{ WeatherVariables, }; use crate::providers::{ - interval_provider::IntervalProvider, variables::ProviderVariables, + provider::IntervalProvider, variables::ProviderVariables, }; pub struct WeatherProvider {