diff --git a/ci_test.sh b/ci_test.sh index 6683800..f504132 100755 --- a/ci_test.sh +++ b/ci_test.sh @@ -1,8 +1,13 @@ #!/bin/bash +set -e +set -x + cargo build roslaunch launch/test_launch.launch & sleep 2 cargo run --example amiga-mock-server -- --port 50050 & -cargo run -- --test-mode --port 50050 & -cargo run --example test_amiga_vel_publisher_subscriber +sleep 2 +cargo run -- --port 50050 & +sleep 2 +cargo run --example test_amiga_vel_pubsub echo DONE diff --git a/examples/amiga-mock-server.rs b/examples/amiga-mock-server.rs index d5cdc80..59d0c4a 100644 --- a/examples/amiga-mock-server.rs +++ b/examples/amiga-mock-server.rs @@ -81,7 +81,7 @@ impl CanbusService for AmigaMockService { match item { Ok(v) => { let state = v.command.clone().unwrap(); - debug!("state received: {:?}", state); + info!("cmd received: {:?}", state); let twist = Twist2d { linear_velocity_x: state.linear_velocity_x, @@ -139,6 +139,8 @@ impl CanbusService for AmigaMockService { })) .await .unwrap(); + // throttle to 20Hz + tokio::time::sleep(std::time::Duration::from_millis(50)).await; } }); @@ -160,7 +162,7 @@ struct Args { #[tokio::main] async fn main() -> Result<(), Box> { let subscriber = FmtSubscriber::builder() - .with_max_level(Level::DEBUG) + .with_max_level(Level::INFO) .finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); @@ -171,7 +173,7 @@ async fn main() -> Result<(), Box> { let server = AmigaMockService { twist_state: Arc::new(Mutex::new(Twist2d::default())), }; - info!("Trying to start server."); + debug!("Trying to start server."); Server::builder() .add_service(CanbusServiceServer::new(server)) diff --git a/examples/amiga_vel_subscriber.rs b/examples/amiga_vel_subscriber.rs index 4bd8c8b..74457bd 100644 --- a/examples/amiga_vel_subscriber.rs +++ b/examples/amiga_vel_subscriber.rs @@ -4,7 +4,7 @@ use tracing_subscriber::FmtSubscriber; fn main() { tracing::subscriber::set_global_default( FmtSubscriber::builder() - .with_max_level(Level::DEBUG) + .with_max_level(Level::INFO) .finish(), ) .expect("setting default subscriber failed"); @@ -30,9 +30,6 @@ fn main() { .unwrap(); info!("amiga_vel_subscriber launched"); - // Block the thread until a count=10 - - //loop { while rosrust::is_ok() { // Sleep to maintain 20Hz rate rate.sleep(); diff --git a/examples/test_amiga_vel_publisher_subscriber.rs b/examples/test_amiga_vel_publisher_subscriber.rs deleted file mode 100644 index 9e0aaa1..0000000 --- a/examples/test_amiga_vel_publisher_subscriber.rs +++ /dev/null @@ -1,66 +0,0 @@ -use tracing::{debug, info, Level}; -use tracing_subscriber::FmtSubscriber; - -fn main() { - tracing::subscriber::set_global_default( - FmtSubscriber::builder() - .with_max_level(Level::DEBUG) - .finish(), - ) - .expect("setting default subscriber failed"); - - // Initialize node - rosrust::init("amiga_vel_subscriber"); - - // Create object that maintains 20Hz between sleep requests - let rate = rosrust::rate(20.0); - - let arc_count = std::sync::Arc::new(std::sync::Mutex::::new(0)); - let arc_count_clone = arc_count.clone(); - - let _subscriber_raii = rosrust::subscribe( - "/amiga/vel", - 100, - move |v: rosrust_msg::geometry_msgs::TwistStamped| { - debug!("Received: {:?}", v); - let mut data = arc_count_clone.lock().unwrap(); - *data += 1; - }, - ) - .unwrap(); - info!("amiga_vel_subscriber launched"); - let cmd_vel_pub = rosrust::publish("/amiga/cmd_vel", 2).unwrap(); - - let mut pub_num = 0; - while rosrust::is_ok() { - if pub_num < 10 { - // Create string message - let msg = rosrust_msg::geometry_msgs::Twist { - linear: rosrust_msg::geometry_msgs::Vector3 { - x: pub_num as f64 * 0.001, - y: 0.0, - z: 0.0, - }, - angular: rosrust_msg::geometry_msgs::Vector3 { - x: 0.0, - y: 0.0, - z: 0.0, - }, - }; - - // Send string message to topic via publisher - cmd_vel_pub.send(msg).unwrap(); - pub_num += 1; - } - - // Sleep to maintain 20Hz rate - rate.sleep(); - let c = *arc_count.lock().unwrap(); - info!("amiga_vel_subscriber got: {}", c); - - if *arc_count.lock().unwrap() > 5 { - break; - } - } - info!("amiga_vel_subscriber done"); -} diff --git a/examples/test_amiga_vel_pubsub.rs b/examples/test_amiga_vel_pubsub.rs new file mode 100644 index 0000000..13ecd7b --- /dev/null +++ b/examples/test_amiga_vel_pubsub.rs @@ -0,0 +1,72 @@ +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +fn main() { + tracing::subscriber::set_global_default( + FmtSubscriber::builder() + .with_max_level(Level::DEBUG) + .finish(), + ) + .expect("setting default subscriber failed"); + + // Initialize node + rosrust::init("amiga_vel_subscriber"); + + // Create object that maintains 20Hz between sleep requests + let rate = rosrust::rate(20.0); + + let shared_state = std::sync::Arc::new(std::sync::Mutex::< + rosrust_msg::geometry_msgs::TwistStamped, + >::default()); + let shared_state_clone = shared_state.clone(); + + let _subscriber_raii = rosrust::subscribe( + "/amiga/vel", + 100, + move |v: rosrust_msg::geometry_msgs::TwistStamped| { + info!("Received: {:?}", v); + let mut data = shared_state_clone.lock().unwrap(); + *data = v; + }, + ) + .unwrap(); + info!("amiga_vel_subscriber launched"); + let cmd_vel_pub = rosrust::publish("/amiga/cmd_vel", 2).unwrap(); + + let mut pub_num = 0; + while rosrust::is_ok() { + info!("HERE!!!"); + + // Create string message + let msg = rosrust_msg::geometry_msgs::Twist { + linear: rosrust_msg::geometry_msgs::Vector3 { + x: pub_num as f64 * 0.1, + y: 0.0, + z: 0.0, + }, + angular: rosrust_msg::geometry_msgs::Vector3 { + x: 0.0, + y: 0.0, + z: 0.5, + }, + }; + + // Send string message to topic via publisher + cmd_vel_pub.send(msg).unwrap(); + pub_num += 1; + + // Sleep to maintain 20Hz rate + rate.sleep(); + let state = shared_state.lock().unwrap().clone(); + info!("amiga_vel_subscriber got: {:?}", state); + + let x = state.twist.linear.x; + let theta = state.twist.angular.z; + + if x >= 1.0 && (theta - 0.5).abs() < 0.001 { + // We received twist state which achieved linear and angular vel as commanded. + break; + } + } + info!("amiga_vel_subscriber done"); +} diff --git a/src/main.rs b/src/main.rs index 33e0de5..9a79dcd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,8 +5,9 @@ use amiga_ros_bridge::grpc::farm_ng::canbus::proto::{ use clap::Parser; use futures::stream::Stream; use futures::StreamExt; +use log::warn; use rosrust::Time; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tonic::Status; use tracing::{debug, info, trace, Level}; use tracing_subscriber::FmtSubscriber; @@ -32,15 +33,19 @@ impl Stream for RosToGrpcStream { ) -> std::task::Poll> { // Maps x (geometry_msgs::Twist) to SendVehicleTwistCommandRequest, unless x is // Poll::Pending (stream not ready yet). - self.rx.poll_recv(cx).map(|x| { - let twist = x.unwrap().unwrap(); - Some(SendVehicleTwistCommandRequest { - command: Some(Twist2d { - linear_velocity_x: twist.linear.x as f32, - linear_velocity_y: twist.linear.y as f32, - angular_velocity: twist.angular.z as f32, - }), - }) + self.rx.poll_recv(cx).map(|x| match x { + Some(msg) => { + debug!("x: {:?}", msg); + let twist = msg.unwrap(); + Some(SendVehicleTwistCommandRequest { + command: Some(Twist2d { + linear_velocity_x: twist.linear.x as f32, + linear_velocity_y: twist.linear.y as f32, + angular_velocity: twist.angular.z as f32, + }), + }) + } + None => None, }) } } @@ -88,27 +93,32 @@ impl AmgigRosBridgeGrpcClient { // async way to forward geometry_msgs::Twist to StreamVehicleTwistStateRequest async fn streams( &mut self, + mut shutdown_rx1: tokio::sync::broadcast::Receiver<()>, rx: tokio::sync::mpsc::Receiver>, - is_test_mode: bool, - ) { + ) -> Result<(), Box> { debug!("Attempting to connect to stream_vehicle_twist_state"); - let mut stream = self + let s = self .client .stream_vehicle_twist_state(StreamVehicleTwistStateRequest {}) - .await - .unwrap() - .into_inner(); + .await?; + + let mut stream = s.into_inner(); debug!("stream_vehicle_twist_state connected"); let handler = tokio::spawn(async move { - let state_pub = rosrust::publish("/amiga/vel", 100).unwrap(); - let mut count = 0; // message counter + let state_pub = match rosrust::publish("/amiga/vel", 100) { + Ok(s) => s, + Err(e) => panic!("Failed to start ros publisher: {}", e), + }; + let count = 0; // message counter // iterate over the stream while let Some(maybe_reply) = stream.next().await { - // println!("\treceived: {:?}", item.unwrap()); - let reply = maybe_reply.unwrap(); + let reply = match maybe_reply { + Ok(r) => r, + Err(e) => panic!("stream_vehicle_twist_state error: {}", e), + }; let twist_state: Twist2d = reply.state.unwrap(); // convert to ROS TwistStamped @@ -116,11 +126,6 @@ impl AmgigRosBridgeGrpcClient { // republish messages to ROS state_pub.send(msg).unwrap(); - count += 1; // increment message counter - if is_test_mode && count > 5000 { - info!("Test mode: shutting down command stream"); - break; - } } }); @@ -129,23 +134,22 @@ impl AmgigRosBridgeGrpcClient { let stream = self .client .send_vehicle_twist_command(RosToGrpcStream { rx }); - let mut stream = stream.await.unwrap().into_inner(); + let s = stream.await?; + let mut stream = s.into_inner(); debug!("send_vehicle_twist_command connected"); - let mut count = 0; - loop { // forward the stream to the server and do nothing with the reply - let _ = stream.next().await; - - count += 1; - if is_test_mode && count > 5000 { - info!("Test mode: shutting down state stream"); + tokio::select! { + _res = stream.next() => {}, + _ = shutdown_rx1.recv() => { + info!("Shutdown signal received; shutting down."); break; - } + }}; } - handler.await.unwrap(); + handler.abort(); + Ok(()) } // stream is dropped here and the disconnect info is send to server @@ -157,36 +161,28 @@ struct Args { host: String, #[arg(short, long, default_value_t = 50060)] port: u32, - #[arg(short, long, default_value_t = false)] - test_mode: bool, } -//#[tokio::main] -//async fn main() { -fn main() { +fn main() -> Result<(), Box> { // setting console log level - let subscriber = FmtSubscriber::builder() - .with_max_level(Level::INFO) - .finish(); - tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + tracing::subscriber::set_global_default( + FmtSubscriber::builder() + .with_max_level(Level::INFO) + .finish(), + ) + .expect("setting default subscriber failed"); // parse arguments using the popular clap crate. let args: Args = Args::parse(); let host: String = args.host; let port: u32 = args.port; - let is_test_mode: bool = args.test_mode; - - if is_test_mode { - info!("Test mode: starting up"); - } // launching the tokio runtime debug!("Starting up tokio runtime"); let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .enable_all() - .build() - .unwrap(); + .build()?; // Async channels to pass messages across threads in an async tokio context. let (tx, rx): ( @@ -194,17 +190,24 @@ fn main() { tokio::sync::mpsc::Receiver>, ) = mpsc::channel(128); + let (shutdown_tx, shutdown_rx1) = broadcast::channel(16); + // set the address of the gRPC server let address: String = format!("http://{host}:{port}"); debug!("Connecting to gRPC server at {}", address); // the gRPC client takes the receiver rx. let handle = runtime.spawn(async move { - AmgigRosBridgeGrpcClient::connect(address) - .await - .unwrap() - .streams(rx, args.test_mode) - .await + match AmgigRosBridgeGrpcClient::connect(address).await { + Ok(mut client) => match client.streams(shutdown_rx1, rx).await { + Ok(_) => warn!("streaming finished."), + Err(_) => warn!("streaming error."), + }, + Err(e) => { + warn!("Error creating gRPC client: {}", e); + } + } + Ok::<(), Result<(), StdError>>(()) }); info!("Connected to gRPC server."); @@ -222,23 +225,36 @@ fn main() { 100, move |v: rosrust_msg::geometry_msgs::Twist| { debug!("/amiga/cmd_vel received: {:?}", v); - // using "blocking_send" since this closure is not "async". - tx.blocking_send(Ok(v)).unwrap(); + // Using "blocking_send" since this closure is not "async". + match tx.blocking_send(Ok(v)) { + Ok(_) => { + trace!("Re sending /amiga/cmd_vel to channel") + } + Err(e) => { + panic!("Unexpected transport error: {}", e); + } + } trace!("re-published to channel"); }, - ) - .unwrap(); + )?; debug!("Subscriber thread started to cmd_vel"); - runtime.block_on(handle).unwrap(); - - // NOTE: this will shutdown the runtime when the user presses Ctrl-C - //match tokio::signal::ctrl_c().await { - // Ok(_) => { - // info!("Ctrl-C received, shutting down"); - // } - // Err(e) => { - // eprintln!("Error: {}", e); - // } - //} + runtime.spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(_) => { + info!("Ctrl-C received, shutting down"); + shutdown_tx.send(()).unwrap(); + } + Err(e) => { + warn!("Error: {}", e); + } + } + }); + + match runtime.block_on(handle) { + Ok(_) => {} + Err(e) => warn!("Error: {}", e), + } + + Ok(()) }