Skip to content

Commit 8e7fa0c

Browse files
committed
Improve error handling
Propagate errors in the handlers. The errors are now caught in the `disconnect` function but we should work towards notifying the user even sooner. This is an improvement though, as previously the user hasn't been notified at all.
1 parent 9bff8a9 commit 8e7fa0c

File tree

3 files changed

+33
-19
lines changed

3 files changed

+33
-19
lines changed

src/connections/handlers.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::errors_internal::{Error, InternalStreamError};
1+
use crate::errors_internal::{Error, InternalChannelError, InternalStreamError};
22
use crate::protobufs;
33
use crate::types::EncodedToRadioPacketWithHeader;
44
use log::{debug, error, trace};
@@ -16,7 +16,7 @@ pub fn spawn_read_handler<R>(
1616
cancellation_token: CancellationToken,
1717
read_stream: R,
1818
read_output_tx: UnboundedSender<IncomingStreamData>,
19-
) -> JoinHandle<()>
19+
) -> JoinHandle<Result<(), Error>>
2020
where
2121
R: AsyncReadExt + Send + Unpin + 'static,
2222
{
@@ -27,9 +27,11 @@ where
2727
tokio::select! {
2828
_ = cancellation_token.cancelled() => {
2929
debug!("Read handler cancelled");
30+
Ok(())
3031
}
3132
e = handle => {
3233
error!("Read handler unexpectedly terminated: {:#?}", e);
34+
e
3335
}
3436
}
3537
})
@@ -82,7 +84,7 @@ pub fn spawn_write_handler<W>(
8284
cancellation_token: CancellationToken,
8385
write_stream: W,
8486
write_input_rx: tokio::sync::mpsc::UnboundedReceiver<EncodedToRadioPacketWithHeader>,
85-
) -> JoinHandle<()>
87+
) -> JoinHandle<Result<(), Error>>
8688
where
8789
W: AsyncWriteExt + Send + Unpin + 'static,
8890
{
@@ -91,10 +93,14 @@ where
9193
spawn(async move {
9294
tokio::select! {
9395
_ = cancellation_token.cancelled() => {
94-
debug!("Write handler cancelled");
96+
debug!("Write handler cancelled");
97+
Ok(())
9598
}
96-
_ = handle => {
97-
error!("Write handler unexpectedly terminated");
99+
write_result = handle => {
100+
if let Err(e) = &write_result {
101+
error!("Write handler unexpectedly terminated {e:?}");
102+
}
103+
write_result
98104
}
99105
}
100106
})
@@ -132,16 +138,18 @@ pub fn spawn_processing_handler(
132138
cancellation_token: CancellationToken,
133139
read_output_rx: UnboundedReceiver<IncomingStreamData>,
134140
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
135-
) -> JoinHandle<()> {
141+
) -> JoinHandle<Result<(), Error>> {
136142
let handle = start_processing_handler(read_output_rx, decoded_packet_tx);
137143

138144
spawn(async move {
139145
tokio::select! {
140146
_ = cancellation_token.cancelled() => {
141-
debug!("Message processing handler cancelled");
147+
debug!("Message processing handler cancelled");
148+
Ok(())
142149
}
143150
_ = handle => {
144-
error!("Message processing handler unexpectedly terminated");
151+
error!("Message processing handler unexpectedly terminated");
152+
Err(Error::InternalChannelError(InternalChannelError::ChannelClosedEarly {}))
145153
}
146154
}
147155
})
@@ -150,7 +158,7 @@ pub fn spawn_processing_handler(
150158
async fn start_processing_handler(
151159
mut read_output_rx: tokio::sync::mpsc::UnboundedReceiver<IncomingStreamData>,
152160
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
153-
) -> Result<(), Error> {
161+
) {
154162
trace!("Started message processing handler");
155163

156164
let mut buffer = StreamBuffer::new(decoded_packet_tx);
@@ -161,6 +169,4 @@ async fn start_processing_handler(
161169
}
162170

163171
trace!("Processing read_output_rx channel closed");
164-
165-
Ok(())
166172
}

src/connections/stream_api.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use futures_util::future::join3;
12
use log::trace;
23
use prost::Message;
34
use std::{fmt::Display, marker::PhantomData};
@@ -70,9 +71,9 @@ pub struct StreamApi;
7071
pub struct ConnectedStreamApi<State = state::Configured> {
7172
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,
7273

73-
read_handle: JoinHandle<()>,
74-
write_handle: JoinHandle<()>,
75-
processing_handle: JoinHandle<()>,
74+
read_handle: JoinHandle<Result<(), Error>>,
75+
write_handle: JoinHandle<Result<(), Error>>,
76+
processing_handle: JoinHandle<Result<(), Error>>,
7677

7778
cancellation_token: CancellationToken,
7879

@@ -586,11 +587,15 @@ impl ConnectedStreamApi<state::Configured> {
586587

587588
// Close worker threads
588589

589-
self.read_handle.await?;
590-
self.write_handle.await?;
591-
self.processing_handle.await?;
590+
let (read_result, write_result, processing_result) =
591+
join3(self.read_handle, self.write_handle, self.processing_handle).await;
592592

593-
trace!("TCP handlers fully disconnected");
593+
// Note: we only return the first error.
594+
read_result??;
595+
write_result??;
596+
processing_result??;
597+
598+
trace!("Handlers fully disconnected");
594599

595600
Ok(StreamApi)
596601
}

src/errors_internal.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ pub enum InternalChannelError {
8686
/// An error indicating that the library failed to write to an internal data channel.
8787
#[error(transparent)]
8888
IncomingStreamDataWriteError(#[from] tokio::sync::mpsc::error::SendError<IncomingStreamData>),
89+
90+
#[error("Channel unexpectedly closed")]
91+
ChannelClosedEarly,
8992
}
9093

9194
mod test {

0 commit comments

Comments
 (0)