Skip to content

Commit

Permalink
Merge pull request #11 from meshtastic/ajmcquilkin/connection-hang-de…
Browse files Browse the repository at this point in the history
…bugging

Connection reliability fixes
  • Loading branch information
ajmcquilkin committed Mar 20, 2024
2 parents aa95930 + 501ad98 commit a45b777
Show file tree
Hide file tree
Showing 9 changed files with 676 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ jobs:
- name: Run test suite
working-directory: ./
run: cargo test
run: cargo test --features ts-gen
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ categories = ["embedded", "config", "encoding"]
authors = ["Adam McQuilkin"]
readme = "README.md"
license = "GPL-3.0"
version = "0.1.5"
version = "0.1.6"
edition = "2021"

[lib]
Expand Down Expand Up @@ -54,3 +54,7 @@ serde_json = { version = "1.0", optional = true }
thiserror = "1.0.48"
uuid = "1.6.1"
btleplug = "0.11.5"

[dev-dependencies]
fern = { version = "0.6.2", features = ["colored"] }
humantime = "2.1.0"
24 changes: 24 additions & 0 deletions examples/basic_serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,36 @@
extern crate meshtastic;

use std::io::{self, BufRead};
use std::time::SystemTime;

use meshtastic::api::StreamApi;
use meshtastic::utils;

/// Set up the logger to output to stdout
/// **Note:** the invokation of this function is commented out in main by default.
fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{} {} {}] {}",
humantime::format_rfc3339_seconds(SystemTime::now()),
record.level(),
record.target(),
message
))
})
.level(log::LevelFilter::Trace)
.chain(std::io::stdout())
.apply()?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Uncomment this to enable logging
// setup_logger()?;

let stream_api = StreamApi::new();

let available_ports = utils::stream::available_serial_ports()?;
Expand Down
24 changes: 24 additions & 0 deletions examples/basic_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,36 @@
extern crate meshtastic;

use std::io::{self, BufRead};
use std::time::SystemTime;

use meshtastic::api::StreamApi;
use meshtastic::utils;

/// Set up the logger to output to stdout
/// **Note:** the invokation of this function is commented out in main by default.
fn setup_logger() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{} {} {}] {}",
humantime::format_rfc3339_seconds(SystemTime::now()),
record.level(),
record.target(),
message
))
})
.level(log::LevelFilter::Trace)
.chain(std::io::stdout())
.apply()?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Uncomment this to enable logging
// setup_logger()?;

let stream_api = StreamApi::new();

println!("Enter the address of a TCP port to connect to, in the form \"IP:PORT\":");
Expand Down
84 changes: 81 additions & 3 deletions src/connections/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::errors_internal::{Error, InternalChannelError, InternalStreamError};
use crate::protobufs;
use crate::types::EncodedToRadioPacketWithHeader;
use crate::utils::format_data_packet;
use log::{debug, error, trace};
use prost::Message;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::spawn;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
Expand All @@ -12,6 +14,10 @@ use crate::connections::stream_buffer::StreamBuffer;

use super::wrappers::encoded_data::IncomingStreamData;

/// Interval for sending heartbeat packets to the radio (in seconds).
/// Needs to be less than this: https://github.com/meshtastic/firmware/blob/eb372c190ec82366998c867acc609a418130d842/src/SerialConsole.cpp#L8
pub const CLIENT_HEARTBEAT_INTERVAL: u64 = 5 * 60; // 5 minutes

pub fn spawn_read_handler<R>(
cancellation_token: CancellationToken,
read_stream: R,
Expand Down Expand Up @@ -159,14 +165,86 @@ async fn start_processing_handler(
mut read_output_rx: tokio::sync::mpsc::UnboundedReceiver<IncomingStreamData>,
decoded_packet_tx: UnboundedSender<protobufs::FromRadio>,
) {
trace!("Started message processing handler");
debug!("Started message processing handler");

let mut buffer = StreamBuffer::new(decoded_packet_tx);

while let Some(message) = read_output_rx.recv().await {
trace!("Processing {} bytes from radio", message.data().len());
buffer.process_incoming_bytes(message);
}

trace!("Processing read_output_rx channel closed");
debug!("Processing read_output_rx channel closed");
}

pub fn spawn_heartbeat_handler(
cancellation_token: CancellationToken,
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,
) -> JoinHandle<Result<(), Error>> {
let handle = start_heartbeat_handler(cancellation_token.clone(), write_input_tx);

spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
debug!("Heartbeat handler cancelled");
Ok(())
}
write_result = handle => {
if let Err(e) = &write_result {
error!("Heartbeat handler unexpectedly terminated {e:?}");
}
write_result
}
}
})
}

async fn start_heartbeat_handler(
_cancellation_token: CancellationToken,
write_input_tx: UnboundedSender<EncodedToRadioPacketWithHeader>,
) -> Result<(), Error> {
debug!("Started heartbeat handler");

loop {
tokio::time::sleep(std::time::Duration::from_secs(CLIENT_HEARTBEAT_INTERVAL)).await;

let heartbeat_packet = protobufs::ToRadio {
payload_variant: Some(protobufs::to_radio::PayloadVariant::Heartbeat(
protobufs::Heartbeat::default(),
)),
};

let mut buffer = Vec::new();
match heartbeat_packet.encode(&mut buffer) {
Ok(_) => (),
Err(e) => {
error!("Error encoding heartbeat packet: {:?}", e);
continue;
}
};

let packet_with_header = match format_data_packet(buffer.into()) {
Ok(p) => p,
Err(e) => {
error!("Error formatting heartbeat packet: {:?}", e);
continue;
}
};

trace!("Sending heartbeat packet");

if let Err(e) = write_input_tx.send(packet_with_header) {
error!("Error writing heartbeat packet to stream: {:?}", e);
return Err(Error::InternalStreamError(
InternalStreamError::StreamWriteError {
source: Box::new(e),
},
));
}

log::info!("Sent heartbeat packet");
}

// debug!("Heartbeat handler finished");

// Return type should be never (!)
}
8 changes: 8 additions & 0 deletions src/connections/stream_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct ConnectedStreamApi<State = state::Configured> {
read_handle: JoinHandle<Result<(), Error>>,
write_handle: JoinHandle<Result<(), Error>>,
processing_handle: JoinHandle<Result<(), Error>>,
heartbeat_handle: JoinHandle<Result<(), Error>>,

cancellation_token: CancellationToken,

Expand Down Expand Up @@ -197,6 +198,8 @@ impl<State> ConnectedStreamApi<State> {
priority: 0, // * not transmitted
rx_rssi: 0, // * not transmitted
delayed: 0, // * not transmitted
hop_start: 0, // * set on device
via_mqtt: false,
from: own_node_id.id(),
to: packet_destination.id(),
id: generate_rand_id(),
Expand Down Expand Up @@ -447,6 +450,9 @@ impl StreamApi {
decoded_packet_tx,
);

let heartbeat_handle =
handlers::spawn_heartbeat_handler(cancellation_token.clone(), write_input_tx.clone());

// Persist channels and kill switch to struct

let write_input_tx = write_input_tx;
Expand All @@ -461,6 +467,7 @@ impl StreamApi {
read_handle,
write_handle,
processing_handle,
heartbeat_handle,
cancellation_token,
typestate: PhantomData,
},
Expand Down Expand Up @@ -536,6 +543,7 @@ impl ConnectedStreamApi<state::Connected> {
read_handle: self.read_handle,
write_handle: self.write_handle,
processing_handle: self.processing_handle,
heartbeat_handle: self.heartbeat_handle,
cancellation_token: self.cancellation_token,
typestate: PhantomData,
})
Expand Down
Loading

0 comments on commit a45b777

Please sign in to comment.