Skip to content

Improve error handling #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/connections/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors_internal::{Error, InternalStreamError};
use crate::errors_internal::{Error, InternalChannelError, InternalStreamError};
use crate::protobufs;
use crate::types::EncodedToRadioPacketWithHeader;
use log::{debug, error, trace};
Expand All @@ -16,7 +16,7 @@ pub fn spawn_read_handler<R>(
cancellation_token: CancellationToken,
read_stream: R,
read_output_tx: UnboundedSender<IncomingStreamData>,
) -> JoinHandle<()>
) -> JoinHandle<Result<(), Error>>
where
R: AsyncReadExt + Send + Unpin + 'static,
{
Expand All @@ -27,9 +27,11 @@ where
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Read handler cancelled");
Ok(())
}
e = handle => {
error!("Read handler unexpectedly terminated: {:#?}", e);
e
}
}
})
Expand Down Expand Up @@ -82,7 +84,7 @@ pub fn spawn_write_handler<W>(
cancellation_token: CancellationToken,
write_stream: W,
write_input_rx: tokio::sync::mpsc::UnboundedReceiver<EncodedToRadioPacketWithHeader>,
) -> JoinHandle<()>
) -> JoinHandle<Result<(), Error>>
where
W: AsyncWriteExt + Send + Unpin + 'static,
{
Expand All @@ -91,10 +93,14 @@ where
spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Write handler cancelled");
debug!("Write handler cancelled");
Ok(())
}
_ = handle => {
error!("Write handler unexpectedly terminated");
write_result = handle => {
if let Err(e) = &write_result {
error!("Write handler unexpectedly terminated {e:?}");
}
write_result
}
}
})
Expand Down Expand Up @@ -132,16 +138,18 @@ pub fn spawn_processing_handler(
cancellation_token: CancellationToken,
read_output_rx: UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<(), Error>> {
let handle = start_processing_handler(read_output_rx, decoded_packet_tx);

spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Message processing handler cancelled");
debug!("Message processing handler cancelled");
Ok(())
}
_ = handle => {
error!("Message processing handler unexpectedly terminated");
error!("Message processing handler unexpectedly terminated");
Err(Error::InternalChannelError(InternalChannelError::ChannelClosedEarly {}))
}
}
})
Expand All @@ -150,7 +158,7 @@ pub fn spawn_processing_handler(
async fn start_processing_handler(
mut read_output_rx: tokio::sync::mpsc::UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) -> Result<(), Error> {
) {
trace!("Started message processing handler");

let mut buffer = StreamBuffer::new(decoded_packet_tx);
Expand All @@ -161,6 +169,4 @@ async fn start_processing_handler(
}

trace!("Processing read_output_rx channel closed");

Ok(())
}
19 changes: 12 additions & 7 deletions src/connections/stream_api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures_util::future::join3;
use log::trace;
use prost::Message;
use std::{fmt::Display, marker::PhantomData};
Expand Down Expand Up @@ -70,9 +71,9 @@ pub struct StreamApi;
pub struct ConnectedStreamApi<State = state::Configured> {
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,

read_handle: JoinHandle<()>,
write_handle: JoinHandle<()>,
processing_handle: JoinHandle<()>,
read_handle: JoinHandle<Result<(), Error>>,
write_handle: JoinHandle<Result<(), Error>>,
processing_handle: JoinHandle<Result<(), Error>>,

cancellation_token: CancellationToken,

Expand Down Expand Up @@ -586,11 +587,15 @@ impl ConnectedStreamApi<state::Configured> {

// Close worker threads

self.read_handle.await?;
self.write_handle.await?;
self.processing_handle.await?;
let (read_result, write_result, processing_result) =
join3(self.read_handle, self.write_handle, self.processing_handle).await;

trace!("TCP handlers fully disconnected");
// Note: we only return the first error.
read_result??;
write_result??;
processing_result??;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the ?? something the user will need to do regularly? This feels like an annoying pattern

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal function and we can certainly make it nicer in a public interface, but I haven't finalized my thoughts on it.


trace!("Handlers fully disconnected");

Ok(StreamApi)
}
Expand Down
3 changes: 3 additions & 0 deletions src/errors_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub enum InternalChannelError {
/// An error indicating that the library failed to write to an internal data channel.
#[error(transparent)]
IncomingStreamDataWriteError(#[from] tokio::sync::mpsc::error::SendError<IncomingStreamData>),

#[error("Channel unexpectedly closed")]
ChannelClosedEarly,
}

mod test {
Expand Down