Skip to content

Commit

Permalink
refactor: remove mutex in client thread (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
mierak authored Dec 21, 2024
1 parent 7935d12 commit fc2552c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 78 deletions.
185 changes: 109 additions & 76 deletions src/core/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::{bail, Context, Result};
use anyhow::Result;
use drop_guard::{ClientDropGuard, DropGuard};
use std::{
collections::VecDeque,
io::{self, Write},
sync::{Arc, Mutex},
thread::Builder,
};

Expand All @@ -22,45 +22,46 @@ use crate::{

pub fn init(
client_rx: Receiver<ClientRequest>,
client_tx: Sender<ClientRequest>,
event_tx: Sender<AppEvent>,
client: Client<'static>,
) -> io::Result<std::thread::JoinHandle<()>> {
std::thread::Builder::new()
.name("client task".to_owned())
.spawn(move || client_task(&client_rx, &client_tx, &event_tx, client))
.spawn(move || client_task(&client_rx, &event_tx, client))
}

fn client_task(
client_rx: &Receiver<ClientRequest>,
client_tx: &Sender<ClientRequest>,
event_tx: &Sender<AppEvent>,
client: Client<'_>,
) {
let (idle_initiate_tx, idle_initiate_rx) = &bounded::<()>(0);
let (idle_confirm_tx, idle_confirm_rx) = &bounded::<()>(0);
fn client_task(client_rx: &Receiver<ClientRequest>, event_tx: &Sender<AppEvent>, client: Client<'_>) {
let (req2idle_tx, req2idle_rx) = &bounded::<Client<'_>>(0);
let (idle2req_tx, idle2req_rx) = &bounded::<Client<'_>>(0);
let (idle_entered_tx, idle_entered_rx) = &bounded::<()>(0);
let (thread_end_ctx, thread_end_rx) = &unbounded::<()>();

std::thread::scope(move |s| {
let mut first_loop = true;
let client = Arc::new(Mutex::new(client));
let (client_return_tx, client_return_rx) = &bounded::<Client<'_>>(1);
client_return_tx.send(client).expect("Client init to succeed");

std::thread::scope(|s| {
let mut first_loop = true;
loop {
log::trace!(first_loop; "Starting worker threads");

let _ = req2idle_rx.try_iter().collect::<Vec<_>>();
let _ = idle2req_rx.try_iter().collect::<Vec<_>>();
let _ = thread_end_rx.try_iter().collect::<Vec<_>>();
let _ = idle_initiate_rx.try_iter().collect::<Vec<_>>();
let _ = idle_confirm_rx.try_iter().collect::<Vec<_>>();
let _ = idle_entered_rx.try_iter().collect::<Vec<_>>();

let mut c = client.lock().expect("No other thread to hold client lock");
let is_client_ok = check_connection(first_loop, &mut c, client_rx, event_tx);
log::trace!(first_loop; "Trying to get returned client");
let mut client = match client_return_rx.recv() {
Ok(client) => client,
Err(err) => {
log::error!(err:?; "Did not receive client from the return channel");
break;
}
};
let is_client_ok = check_connection(first_loop, &mut client, client_rx, event_tx);
first_loop = false;

if is_client_ok {
let mut client_write = c.stream.try_clone().expect("Client write clone to succeed");
drop(c);
let client1 = client.clone();
let client2 = client.clone();
let mut client_write = client.stream.try_clone().expect("Client write clone to succeed");

let idle = Builder::new()
.name("idle".to_string())
Expand All @@ -70,17 +71,31 @@ fn client_task(
tx: thread_end_ctx,
};

loop {
'outer: loop {
select! {
recv(idle_initiate_rx) -> msg => {
if let Err(err) = msg {
log::error!(err:?; "idle error");
break;
recv(req2idle_rx) -> client => {
let mut client = match client {
Ok(c) => ClientDropGuard::new(client_return_tx, c),
Err(err) => {
log::error!(err:?; "idle recv error");
break;
},
};
if let Err(err) = listen_idle(&client1, idle_confirm_tx, event_tx, client_tx) {
log::error!(err:?; "idle error");
break;

log::trace!("Trying to acquire client lock for idle");

let idle_client = try_break!(client.enter_idle(), "Failed to enter idle state");
try_break!(idle_entered_tx.send(()), "Failed to send idle confirmation");
let events: Vec<IdleEvent> = try_break!(idle_client.read_response(), "Failed to read idle events");

log::trace!(events:?; "Got idle events");
for ev in events {
if let Err(err) = event_tx.send(AppEvent::IdleEvent(ev)) {
log::error!(err:?; "Failed to send idle event");
break 'outer;
}
}
try_break!(idle2req_tx.send(client.consume()), "Failed to return client to request thread");
}
recv(thread_end_rx) -> _ => {
log::debug!("recv drop idle");
Expand All @@ -93,8 +108,8 @@ fn client_task(
})
.expect("failed to spawn thread");

try_skip!(idle_initiate_tx.send(()), "Failed to request for client idle");
try_skip!(idle_confirm_rx.recv(), "Idle confirmation failed");
try_skip!(req2idle_tx.send(client), "Failed to request for client idle");
try_break!(idle_entered_rx.recv(), "Idle confirmation failed");

let work = Builder::new()
.name("request".to_string())
Expand All @@ -115,9 +130,10 @@ fn client_task(

buffer.push_back(msg);

log::trace!(buffer:?; "Trying to acquire client lock");
log::trace!(buffer:?; "Trying to receive client from idle thread");
try_break!(client_write.write_all(b"noidle\n"), "Failed to write noidle");
let mut client = try_break!(client2.lock(), "Failed to acquire client lock");
let client = try_break!(idle2req_rx.recv(), "Failed to receive client from idle thread");
let mut client = ClientDropGuard::new(client_return_tx, client);

while let Some(request) = buffer.pop_front() {
while let Ok(request) = client_rx.try_recv() {
Expand Down Expand Up @@ -153,11 +169,9 @@ fn client_task(
}
}

drop(client);
log::trace!("Releasing client lock to idle");

try_break!(idle_initiate_tx.send(()), "Failed to request for client idle");
try_break!(idle_confirm_rx.recv(), "Idle confirmation failed");
log::trace!("Returning client lock to idle");
try_break!(req2idle_tx.send(client.consume()), "Failed to request for client idle");
try_break!(idle_entered_rx.recv(), "Idle confirmation failed");
},
recv(thread_end_rx) -> _ => {
log::debug!("recv drop idle");
Expand All @@ -171,6 +185,8 @@ fn client_task(

idle.join().expect("idle thread not to panic");
work.join().expect("work thread not to panic");
} else {
client_return_tx.send(client).expect("To be able to return the client");
}

let wait_time = std::time::Duration::from_secs(1);
Expand All @@ -184,46 +200,64 @@ fn client_task(
});
}

fn listen_idle(
client: &Arc<Mutex<Client<'_>>>,
idle_confirm_tx: &Sender<()>,
event_tx: &Sender<AppEvent>,
client_tx: &Sender<ClientRequest>,
) -> Result<()> {
log::trace!("Trying to acquire client lock for idle");
let mut client = match client.lock() {
Ok(c) => c,
Err(err) => {
log::error!(err:?; "Failed to acquire client lock");
bail!("Failed to acquire client lock");
mod drop_guard {
use std::ops::DerefMut;

use crossbeam::channel::Sender;

use crate::mpd::client::Client;

pub struct ClientDropGuard<'sender, 'client> {
tx: &'sender Sender<Client<'client>>,
client: Option<Client<'client>>,
}

impl<'sender, 'client> ClientDropGuard<'sender, 'client> {
pub fn new(tx: &'sender Sender<Client<'client>>, client: Client<'client>) -> Self {
Self {
tx,
client: Some(client),
}
}
pub fn consume(mut self) -> Client<'client> {
self.client
.take()
.expect("ClientDropGuard not to be in inconsistent state. Cannot consume self because client was None.")
}
};
}

let idle_client = client.enter_idle().context("Failed to enter idle state")?;
idle_confirm_tx.send(()).context("Failed to send idle confirmation")?;
let events: Vec<IdleEvent> = idle_client.read_response().context("Failed to read idle events")?;
impl Drop for ClientDropGuard<'_, '_> {
fn drop(&mut self) {
if let Some(client) = self.client.take() {
log::trace!("Sending back client on drop");
self.tx.send(client).expect("send to succeed");
}
}
}

log::trace!(events:?; "Got idle events");
for ev in events {
event_tx
.send(AppEvent::IdleEvent(ev))
.context("Failed to send idle event")?;
impl<'client> std::ops::Deref for ClientDropGuard<'_, 'client> {
type Target = Client<'client>;
fn deref(&self) -> &Self::Target {
self.client.as_ref().expect("Cannot deref because client was None")
}
}
if let Err(err) = client_tx.send(ClientRequest::CheckQueue) {
log::error!(err:?; "Failed to send idle event");
bail!("Failed to send idle event");
};
Ok(())
}

struct DropGuard<'a> {
tx: &'a Sender<()>,
name: &'a str,
}
impl Drop for DropGuard<'_> {
fn drop(&mut self) {
log::trace!(name = self.name; "sending drop notification");
self.tx.send(()).expect("send to succeed");
impl DerefMut for ClientDropGuard<'_, '_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.client.as_mut().expect("Cannot deref_mut because client was None")
}
}

pub struct DropGuard<'a> {
pub tx: &'a Sender<()>,
pub name: &'a str,
}

impl Drop for DropGuard<'_> {
fn drop(&mut self) {
log::trace!(name = self.name; "sending drop notification");
self.tx.send(()).expect("send to succeed");
}
}
}

Expand Down Expand Up @@ -258,6 +292,5 @@ fn handle_client_request(client: &mut Client<'_>, request: ClientRequest) -> Res
(command.callback)(client)?;
Ok(WorkDone::None)
}
ClientRequest::CheckQueue => Ok(WorkDone::None),
}
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ fn main() -> Result<()> {
render_loop.start()?;
}

core::client::init(client_rx.clone(), client_tx.clone(), event_tx.clone(), client)?;
core::client::init(client_rx.clone(), event_tx.clone(), client)?;
core::work::init(worker_rx.clone(), client_tx.clone(), event_tx.clone(), context.config)?;
core::input::init(event_tx.clone())?;
let event_loop_handle = core::event_loop::init(context, event_rx, render_loop, terminal)?;
Expand Down
1 change: 0 additions & 1 deletion src/shared/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use super::{
pub(crate) enum ClientRequest {
MpdQuery(MpdQuery),
MpdCommand(MpdCommand),
CheckQueue,
}

#[derive(Debug)]
Expand Down

0 comments on commit fc2552c

Please sign in to comment.