diff --git a/src/core/client.rs b/src/core/client.rs index ceacf9b..368393a 100644 --- a/src/core/client.rs +++ b/src/core/client.rs @@ -1,8 +1,8 @@ -use anyhow::{bail, Context, Result}; +use anyhow::Result; +use drop_guard::{ClientDropGuard, DropGuard}; use std::{ collections::VecDeque, io::{self, Write}, - sync::{Arc, Mutex}, thread::Builder, }; @@ -22,45 +22,46 @@ use crate::{ pub fn init( client_rx: Receiver, - client_tx: Sender, event_tx: Sender, client: Client<'static>, ) -> io::Result> { std::thread::Builder::new() .name("client task".to_owned()) - .spawn(move || client_task(&client_rx, &client_tx, &event_tx, client)) + .spawn(move || client_task(&client_rx, &event_tx, client)) } -fn client_task( - client_rx: &Receiver, - client_tx: &Sender, - event_tx: &Sender, - client: Client<'_>, -) { - let (idle_initiate_tx, idle_initiate_rx) = &bounded::<()>(0); - let (idle_confirm_tx, idle_confirm_rx) = &bounded::<()>(0); +fn client_task(client_rx: &Receiver, event_tx: &Sender, client: Client<'_>) { + let (req2idle_tx, req2idle_rx) = &bounded::>(0); + let (idle2req_tx, idle2req_rx) = &bounded::>(0); + let (idle_entered_tx, idle_entered_rx) = &bounded::<()>(0); let (thread_end_ctx, thread_end_rx) = &unbounded::<()>(); - std::thread::scope(move |s| { - let mut first_loop = true; - let client = Arc::new(Mutex::new(client)); + let (client_return_tx, client_return_rx) = &bounded::>(1); + client_return_tx.send(client).expect("Client init to succeed"); + std::thread::scope(|s| { + let mut first_loop = true; loop { log::trace!(first_loop; "Starting worker threads"); + let _ = req2idle_rx.try_iter().collect::>(); + let _ = idle2req_rx.try_iter().collect::>(); let _ = thread_end_rx.try_iter().collect::>(); - let _ = idle_initiate_rx.try_iter().collect::>(); - let _ = idle_confirm_rx.try_iter().collect::>(); + let _ = idle_entered_rx.try_iter().collect::>(); - let mut c = client.lock().expect("No other thread to hold client lock"); - let is_client_ok = check_connection(first_loop, &mut c, client_rx, event_tx); + log::trace!(first_loop; "Trying to get returned client"); + let mut client = match client_return_rx.recv() { + Ok(client) => client, + Err(err) => { + log::error!(err:?; "Did not receive client from the return channel"); + break; + } + }; + let is_client_ok = check_connection(first_loop, &mut client, client_rx, event_tx); first_loop = false; if is_client_ok { - let mut client_write = c.stream.try_clone().expect("Client write clone to succeed"); - drop(c); - let client1 = client.clone(); - let client2 = client.clone(); + let mut client_write = client.stream.try_clone().expect("Client write clone to succeed"); let idle = Builder::new() .name("idle".to_string()) @@ -70,17 +71,31 @@ fn client_task( tx: thread_end_ctx, }; - loop { + 'outer: loop { select! { - recv(idle_initiate_rx) -> msg => { - if let Err(err) = msg { - log::error!(err:?; "idle error"); - break; + recv(req2idle_rx) -> client => { + let mut client = match client { + Ok(c) => ClientDropGuard::new(client_return_tx, c), + Err(err) => { + log::error!(err:?; "idle recv error"); + break; + }, }; - if let Err(err) = listen_idle(&client1, idle_confirm_tx, event_tx, client_tx) { - log::error!(err:?; "idle error"); - break; + + log::trace!("Trying to acquire client lock for idle"); + + let idle_client = try_break!(client.enter_idle(), "Failed to enter idle state"); + try_break!(idle_entered_tx.send(()), "Failed to send idle confirmation"); + let events: Vec = try_break!(idle_client.read_response(), "Failed to read idle events"); + + log::trace!(events:?; "Got idle events"); + for ev in events { + if let Err(err) = event_tx.send(AppEvent::IdleEvent(ev)) { + log::error!(err:?; "Failed to send idle event"); + break 'outer; + } } + try_break!(idle2req_tx.send(client.consume()), "Failed to return client to request thread"); } recv(thread_end_rx) -> _ => { log::debug!("recv drop idle"); @@ -93,8 +108,8 @@ fn client_task( }) .expect("failed to spawn thread"); - try_skip!(idle_initiate_tx.send(()), "Failed to request for client idle"); - try_skip!(idle_confirm_rx.recv(), "Idle confirmation failed"); + try_skip!(req2idle_tx.send(client), "Failed to request for client idle"); + try_break!(idle_entered_rx.recv(), "Idle confirmation failed"); let work = Builder::new() .name("request".to_string()) @@ -115,9 +130,10 @@ fn client_task( buffer.push_back(msg); - log::trace!(buffer:?; "Trying to acquire client lock"); + log::trace!(buffer:?; "Trying to receive client from idle thread"); try_break!(client_write.write_all(b"noidle\n"), "Failed to write noidle"); - let mut client = try_break!(client2.lock(), "Failed to acquire client lock"); + let client = try_break!(idle2req_rx.recv(), "Failed to receive client from idle thread"); + let mut client = ClientDropGuard::new(client_return_tx, client); while let Some(request) = buffer.pop_front() { while let Ok(request) = client_rx.try_recv() { @@ -153,11 +169,9 @@ fn client_task( } } - drop(client); - log::trace!("Releasing client lock to idle"); - - try_break!(idle_initiate_tx.send(()), "Failed to request for client idle"); - try_break!(idle_confirm_rx.recv(), "Idle confirmation failed"); + log::trace!("Returning client lock to idle"); + try_break!(req2idle_tx.send(client.consume()), "Failed to request for client idle"); + try_break!(idle_entered_rx.recv(), "Idle confirmation failed"); }, recv(thread_end_rx) -> _ => { log::debug!("recv drop idle"); @@ -171,6 +185,8 @@ fn client_task( idle.join().expect("idle thread not to panic"); work.join().expect("work thread not to panic"); + } else { + client_return_tx.send(client).expect("To be able to return the client"); } let wait_time = std::time::Duration::from_secs(1); @@ -184,46 +200,64 @@ fn client_task( }); } -fn listen_idle( - client: &Arc>>, - idle_confirm_tx: &Sender<()>, - event_tx: &Sender, - client_tx: &Sender, -) -> Result<()> { - log::trace!("Trying to acquire client lock for idle"); - let mut client = match client.lock() { - Ok(c) => c, - Err(err) => { - log::error!(err:?; "Failed to acquire client lock"); - bail!("Failed to acquire client lock"); +mod drop_guard { + use std::ops::DerefMut; + + use crossbeam::channel::Sender; + + use crate::mpd::client::Client; + + pub struct ClientDropGuard<'sender, 'client> { + tx: &'sender Sender>, + client: Option>, + } + + impl<'sender, 'client> ClientDropGuard<'sender, 'client> { + pub fn new(tx: &'sender Sender>, client: Client<'client>) -> Self { + Self { + tx, + client: Some(client), + } + } + pub fn consume(mut self) -> Client<'client> { + self.client + .take() + .expect("ClientDropGuard not to be in inconsistent state. Cannot consume self because client was None.") } - }; + } - let idle_client = client.enter_idle().context("Failed to enter idle state")?; - idle_confirm_tx.send(()).context("Failed to send idle confirmation")?; - let events: Vec = idle_client.read_response().context("Failed to read idle events")?; + impl Drop for ClientDropGuard<'_, '_> { + fn drop(&mut self) { + if let Some(client) = self.client.take() { + log::trace!("Sending back client on drop"); + self.tx.send(client).expect("send to succeed"); + } + } + } - log::trace!(events:?; "Got idle events"); - for ev in events { - event_tx - .send(AppEvent::IdleEvent(ev)) - .context("Failed to send idle event")?; + impl<'client> std::ops::Deref for ClientDropGuard<'_, 'client> { + type Target = Client<'client>; + fn deref(&self) -> &Self::Target { + self.client.as_ref().expect("Cannot deref because client was None") + } } - if let Err(err) = client_tx.send(ClientRequest::CheckQueue) { - log::error!(err:?; "Failed to send idle event"); - bail!("Failed to send idle event"); - }; - Ok(()) -} -struct DropGuard<'a> { - tx: &'a Sender<()>, - name: &'a str, -} -impl Drop for DropGuard<'_> { - fn drop(&mut self) { - log::trace!(name = self.name; "sending drop notification"); - self.tx.send(()).expect("send to succeed"); + impl DerefMut for ClientDropGuard<'_, '_> { + fn deref_mut(&mut self) -> &mut Self::Target { + self.client.as_mut().expect("Cannot deref_mut because client was None") + } + } + + pub struct DropGuard<'a> { + pub tx: &'a Sender<()>, + pub name: &'a str, + } + + impl Drop for DropGuard<'_> { + fn drop(&mut self) { + log::trace!(name = self.name; "sending drop notification"); + self.tx.send(()).expect("send to succeed"); + } } } @@ -258,6 +292,5 @@ fn handle_client_request(client: &mut Client<'_>, request: ClientRequest) -> Res (command.callback)(client)?; Ok(WorkDone::None) } - ClientRequest::CheckQueue => Ok(WorkDone::None), } } diff --git a/src/main.rs b/src/main.rs index c75332f..2eeebf9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -218,7 +218,7 @@ fn main() -> Result<()> { render_loop.start()?; } - core::client::init(client_rx.clone(), client_tx.clone(), event_tx.clone(), client)?; + core::client::init(client_rx.clone(), event_tx.clone(), client)?; core::work::init(worker_rx.clone(), client_tx.clone(), event_tx.clone(), context.config)?; core::input::init(event_tx.clone())?; let event_loop_handle = core::event_loop::init(context, event_rx, render_loop, terminal)?; diff --git a/src/shared/events.rs b/src/shared/events.rs index f1e39dd..b4d4771 100644 --- a/src/shared/events.rs +++ b/src/shared/events.rs @@ -17,7 +17,6 @@ use super::{ pub(crate) enum ClientRequest { MpdQuery(MpdQuery), MpdCommand(MpdCommand), - CheckQueue, } #[derive(Debug)]