From 0749a61144bfdc33253649833078c79bb8906b48 Mon Sep 17 00:00:00 2001 From: Masami Hiramatsu Date: Wed, 28 Aug 2024 01:24:40 +0900 Subject: [PATCH 1/5] dut monitor: Support local port forwarding option Add the options for local port forwarding for each DUT on `cro3 dut monitor`. This allows user to make DUT working as gateway. e.g. cro3 dut monitor mydut,192.168.111.222:3456 This will map some local port to 192.168.111.222:3456 via mydut. --- src/cmd/dut.rs | 25 ++++++++--- src/dut.rs | 115 +++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 112 insertions(+), 28 deletions(-) diff --git a/src/cmd/dut.rs b/src/cmd/dut.rs index 1890786e..1bcb3753 100644 --- a/src/cmd/dut.rs +++ b/src/cmd/dut.rs @@ -55,6 +55,7 @@ use cro3::dut::fetch_dut_info_in_parallel; use cro3::dut::register_dut; use cro3::dut::DutInfo; use cro3::dut::MonitoredDut; +use cro3::dut::PortForwarding; use cro3::dut::SshInfo; use cro3::dut::SSH_CACHE; use cro3::repo::get_cros_dir; @@ -180,8 +181,8 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> { error!("Failed to kill previous vnc instance: {e}") } - let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc")?; - let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc")?; + let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc", None)?; + let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc", None)?; warn!("To use VNC via web browser, please open:"); warn!(" http://localhost:{web_port}/vnc.html"); @@ -202,7 +203,7 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> { /// open a SSH monitor #[argh(subcommand, name = "monitor")] struct ArgsDutMonitor { - /// DUT identifiers to monitor + /// DUT identifiers to monitor. This accepts sub portforwardings after colon (e.g. dut,ADDR:PORT,...) #[argh(positional)] duts: Vec, } @@ -212,9 +213,23 @@ fn run_dut_monitor(args: &ArgsDutMonitor) -> Result<()> { let mut targets: Vec = Vec::new(); let mut port = 4022; - for dut in &args.duts { - targets.push(MonitoredDut::new(dut, port)?); + for raw_dut in &args.duts { + let ports: Vec<&str> = raw_dut.split(',').collect(); + let mut fwports: Vec = vec![]; + let dut_port = port; + port += 1; + + if ports.len() > 1 { + for fwport in ports.iter().skip(1) { + let parts: Vec<&str> = fwport.split(':').collect(); + let port_no: u16 = parts[1].parse().expect("Invalid port number."); + fwports.push(PortForwarding::new(port, parts[0], port_no)?); + port += 1; + } + } + + targets.push(MonitoredDut::new(ports[0], dut_port, &fwports)?); } let mut screen = stdout().into_alternate_screen().unwrap(); diff --git a/src/dut.rs b/src/dut.rs index 6ab89d59..f64898e0 100644 --- a/src/dut.rs +++ b/src/dut.rs @@ -32,6 +32,7 @@ use rand::thread_rng; use rayon::prelude::*; use regex::Regex; use serde::{Deserialize, Serialize}; +//use strum::additional_attributes; use tracing::error; use tracing::info; use url::Url; @@ -82,6 +83,30 @@ lazy_static! { pub static SSH_CACHE: KvCache = KvCache::new("ssh_cache"); +/// PortForwarding represents Local/Remote forwarding on ssh +#[derive(Debug, Clone)] +pub struct PortForwarding { + fwport: u16, + address: String, + port: u16, +} +impl PortForwarding { + pub fn new(fwport: u16, address: &str, port: u16) -> Result { + let pfw = PortForwarding { + fwport, + address: address.to_string(), + port, + }; + Ok(pfw) + } + pub fn to_ssh_args(&self) -> Vec { + vec![ + "-L".to_string(), + format!("{}:{}:{}", self.fwport, self.address, self.port), + ] + } +} + /// MonitoredDut holds connection to a monitoring Dut #[derive(Debug)] pub struct MonitoredDut { @@ -90,16 +115,37 @@ pub struct MonitoredDut { port: u16, child: Option, reconnecting: bool, + fwports: Vec, +} + +fn spawn_ssh_process( + ssh: &SshInfo, + port: u16, + fwports: &[PortForwarding], +) -> Result { + let opt_args: Vec> = fwports.iter().map(|x| x.to_ssh_args()).collect(); + let opt_args: Vec = opt_args.concat(); + let ref_args: Vec<&str> = opt_args.iter().map(|x| x.as_ref()).collect(); + + block_on(ssh.start_ssh_forwarding( + port, + match ref_args.len() > 1 { + true => Some(&ref_args), + false => None, + }, + )) } + impl MonitoredDut { - pub fn new(dut: &str, port: u16) -> Result { + pub fn new(dut: &str, port: u16, fwports: &[PortForwarding]) -> Result { let ssh = SshInfo::new(dut).context("failed to create SshInfo")?; let dut = MonitoredDut { ssh: ssh.clone(), dut: dut.to_string(), port, - child: block_on(ssh.start_ssh_forwarding(port)).ok(), + child: spawn_ssh_process(&ssh, port, fwports).ok(), reconnecting: false, + fwports: fwports.to_vec(), }; Ok(dut) } @@ -107,7 +153,7 @@ impl MonitoredDut { self.reconnecting } fn reconnect(&mut self) -> Result { - let new_child = block_on(self.ssh.start_ssh_forwarding(self.port)); + let new_child = spawn_ssh_process(&self.ssh, self.port, &self.fwports); if let Err(e) = &new_child { error!("Failed to reconnect: {e:?}"); }; @@ -123,12 +169,20 @@ impl MonitoredDut { match child.try_status()? { None => { self.reconnecting = false; - Ok(format!( + let mut status = format!( "{:<31}\t127.0.0.1:{:<5}\t{}", &self.dut, self.port, &self.ssh.host_and_port() - )) + ); + self.fwports.iter().for_each(|fwp: &PortForwarding| { + status += format!( + "\n{:<31}\t127.0.0.1:{:<5}\t{}:{}", + " +-", fwp.fwport, &fwp.address, fwp.port + ) + .as_str() + }); + Ok(status) } Some(_status) => self.reconnect(), } @@ -627,18 +681,25 @@ impl SshInfo { port: u16, dut_port: u16, command: &str, + additional_ssh_args: Option<&Vec<&str>>, ) -> Result { + let fixed_args = [ + "-L", + &format!("{}:127.0.0.1:{}", port, dut_port), + "-o", + "ExitOnForwardFailure yes", + "-o", + "ServerAliveInterval=5", + "-o", + "ServerAliveCountMax=1", + ]; + let mut args: Vec<&str> = fixed_args.to_vec(); + if let Some(opt_args) = additional_ssh_args { + args.extend(opt_args.iter()); + } + let child = self - .ssh_cmd_async(Some(&[ - "-L", - &format!("{}:127.0.0.1:{}", port, dut_port), - "-o", - "ExitOnForwardFailure yes", - "-o", - "ServerAliveInterval=5", - "-o", - "ServerAliveCountMax=1", - ]))? + .ssh_cmd_async(Some(&args))? .arg(command) .kill_on_drop(true) .stdin(Stdio::piped()) @@ -649,11 +710,18 @@ impl SshInfo { } // Start SSH port forwarding on a given port. // The future will be resolved once the first connection attempt is succeeded. - pub async fn start_ssh_forwarding(&self, port: u16) -> Result { - self.start_ssh_forwarding_in_range(Range { - start: port, - end: port + 1, - }) + pub async fn start_ssh_forwarding( + &self, + port: u16, + additional_ssh_args: Option<&Vec<&str>>, + ) -> Result { + self.start_ssh_forwarding_in_range( + Range { + start: port, + end: port + 1, + }, + additional_ssh_args, + ) .await .map(|e| e.0) } @@ -661,6 +729,7 @@ impl SshInfo { async fn start_ssh_forwarding_in_range( &self, port_range: Range, + additional_ssh_args: Option<&Vec<&str>>, ) -> Result<(async_process::Child, u16)> { const COMMON_PORT_FORWARD_TOKEN: &str = "cro3-ssh-portforward"; let sshcmd = &format!("echo {COMMON_PORT_FORWARD_TOKEN}; sleep 8h"); @@ -669,7 +738,7 @@ impl SshInfo { ports.shuffle(&mut rng); for port in ports { // Try to establish port forwarding - let mut child = self.start_port_forwarding(port, 22, sshcmd)?; + let mut child = self.start_port_forwarding(port, 22, sshcmd, additional_ssh_args)?; let (ssh_stdout, ssh_stderr) = get_async_lines(&mut child); let ssh_stdout = ssh_stdout.context(anyhow!("ssh_stdout was None"))?; let ssh_stderr = ssh_stderr.context(anyhow!("ssh_stderr was None"))?; @@ -709,7 +778,7 @@ impl SshInfo { /// start_ssh_forwarding, and the same port will be used for reconnecting /// while this cro3 instance is running. fn start_ssh_forwarding_background_in_range(&self, port_range: Range) -> Result { - let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range))?; + let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range, None))?; let ssh = self.clone(); thread::spawn(move || { block_on(async move { @@ -718,7 +787,7 @@ impl SshInfo { info!("cro3: SSH forwarding process exited with {status:?}"); loop { info!("cro3: Reconnecting to {ssh:?}..."); - if let Ok(new_child) = ssh.start_ssh_forwarding(port).await { + if let Ok(new_child) = ssh.start_ssh_forwarding(port, None).await { child = new_child; break; } From 89bd627db7cfddec3673258966fe8aa38cf2a197 Mon Sep 17 00:00:00 2001 From: Masami Hiramatsu Date: Sat, 7 Sep 2024 14:51:52 +0900 Subject: [PATCH 2/5] dut monitor: Accept IPv6 for port fowarding Accept (very lazy)IPv6 address for port fowarding. User can pass the address as; cro3 dut monitor DUT_NAME,IPV4:PORT,[IPV6]:port --- src/cmd/dut.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/cmd/dut.rs b/src/cmd/dut.rs index 1bcb3753..505911a4 100644 --- a/src/cmd/dut.rs +++ b/src/cmd/dut.rs @@ -64,6 +64,7 @@ use cro3::servo::LocalServo; use cro3::servo::ServoList; use lazy_static::lazy_static; use rayon::prelude::*; +use regex::Regex; use termion::screen::IntoAlternateScreen; use tracing::error; use tracing::info; @@ -203,11 +204,28 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> { /// open a SSH monitor #[argh(subcommand, name = "monitor")] struct ArgsDutMonitor { - /// DUT identifiers to monitor. This accepts sub portforwardings after colon (e.g. dut,ADDR:PORT,...) + /// DUT identifiers to monitor. This accepts sub portforwardings after colon + /// (e.g. dut,ADDR:PORT,...) #[argh(positional)] duts: Vec, } +fn parse_fwport(fwport: &str, loport: u16) -> Result { + // Lazy IPv4/v6 matching + let v4_re = Regex::new(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)$").unwrap(); + let v6_re = Regex::new(r"^(\[([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\]):(\d+)$").unwrap(); + + let (addr, port_str) = if let Some(caps) = v4_re.captures(fwport) { + (caps.get(1).unwrap().as_str(), caps.get(2).unwrap().as_str()) + } else if let Some(caps) = v6_re.captures(fwport) { + (caps.get(1).unwrap().as_str(), caps.get(3).unwrap().as_str()) + } else { + bail!("Failed to parse {fwport}") + }; + let port_no: u16 = port_str.parse().expect("Invalid port number."); + PortForwarding::new(loport, addr, port_no) +} + fn run_dut_monitor(args: &ArgsDutMonitor) -> Result<()> { cros::ensure_testing_rsa_is_there()?; let mut targets: Vec = Vec::new(); @@ -222,9 +240,7 @@ fn run_dut_monitor(args: &ArgsDutMonitor) -> Result<()> { if ports.len() > 1 { for fwport in ports.iter().skip(1) { - let parts: Vec<&str> = fwport.split(':').collect(); - let port_no: u16 = parts[1].parse().expect("Invalid port number."); - fwports.push(PortForwarding::new(port, parts[0], port_no)?); + fwports.push(parse_fwport(fwport, port)?); port += 1; } } From f5be03ef3396771702ead9be6e7ac3dac95413a1 Mon Sep 17 00:00:00 2001 From: Masami Hiramatsu Date: Sat, 7 Sep 2024 15:26:23 +0900 Subject: [PATCH 3/5] dut monitor: Remove unneeded mut for current status Instead of pushing each sub status to the DUT status, just create another status vector string and join it. --- src/dut.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/dut.rs b/src/dut.rs index f64898e0..19b76bb2 100644 --- a/src/dut.rs +++ b/src/dut.rs @@ -169,20 +169,23 @@ impl MonitoredDut { match child.try_status()? { None => { self.reconnecting = false; - let mut status = format!( + let status: String = format!( "{:<31}\t127.0.0.1:{:<5}\t{}", &self.dut, self.port, &self.ssh.host_and_port() ); - self.fwports.iter().for_each(|fwp: &PortForwarding| { - status += format!( - "\n{:<31}\t127.0.0.1:{:<5}\t{}:{}", - " +-", fwp.fwport, &fwp.address, fwp.port - ) - .as_str() - }); - Ok(status) + let status2: Vec = self + .fwports + .iter() + .map(|fwp: &PortForwarding| { + format!( + "\n{:<31}\t127.0.0.1:{:<5}\t{}:{}", + " +-", fwp.fwport, &fwp.address, fwp.port + ) + }) + .collect(); + Ok(status + status2.join("").as_str()) } Some(_status) => self.reconnect(), } From a7400a81fa1f3130180311c6ca6d2693db398709 Mon Sep 17 00:00:00 2001 From: Masami Hiramatsu Date: Sat, 7 Sep 2024 17:35:04 +0900 Subject: [PATCH 4/5] dut: simplify Option<&Vec<&str>> to &[&str] There is no reason to pass Vec with Option, and if it is not mutable the slice is enough. --- src/cmd/dut.rs | 4 ++-- src/dut.rs | 28 ++++++++++------------------ 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/src/cmd/dut.rs b/src/cmd/dut.rs index 505911a4..7e0a650f 100644 --- a/src/cmd/dut.rs +++ b/src/cmd/dut.rs @@ -182,8 +182,8 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> { error!("Failed to kill previous vnc instance: {e}") } - let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc", None)?; - let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc", None)?; + let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc", &[])?; + let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc", &[])?; warn!("To use VNC via web browser, please open:"); warn!(" http://localhost:{web_port}/vnc.html"); diff --git a/src/dut.rs b/src/dut.rs index 19b76bb2..2214e7cf 100644 --- a/src/dut.rs +++ b/src/dut.rs @@ -127,13 +127,7 @@ fn spawn_ssh_process( let opt_args: Vec = opt_args.concat(); let ref_args: Vec<&str> = opt_args.iter().map(|x| x.as_ref()).collect(); - block_on(ssh.start_ssh_forwarding( - port, - match ref_args.len() > 1 { - true => Some(&ref_args), - false => None, - }, - )) + block_on(ssh.start_ssh_forwarding(port, &ref_args)) } impl MonitoredDut { @@ -684,11 +678,12 @@ impl SshInfo { port: u16, dut_port: u16, command: &str, - additional_ssh_args: Option<&Vec<&str>>, + additional_ssh_args: &[&str], ) -> Result { - let fixed_args = [ + let fwport: String = format!("{}:127.0.0.1:{}", port, dut_port); + let mut args = vec![ "-L", - &format!("{}:127.0.0.1:{}", port, dut_port), + &fwport, "-o", "ExitOnForwardFailure yes", "-o", @@ -696,10 +691,7 @@ impl SshInfo { "-o", "ServerAliveCountMax=1", ]; - let mut args: Vec<&str> = fixed_args.to_vec(); - if let Some(opt_args) = additional_ssh_args { - args.extend(opt_args.iter()); - } + args.extend(additional_ssh_args.iter()); let child = self .ssh_cmd_async(Some(&args))? @@ -716,7 +708,7 @@ impl SshInfo { pub async fn start_ssh_forwarding( &self, port: u16, - additional_ssh_args: Option<&Vec<&str>>, + additional_ssh_args: &[&str], ) -> Result { self.start_ssh_forwarding_in_range( Range { @@ -732,7 +724,7 @@ impl SshInfo { async fn start_ssh_forwarding_in_range( &self, port_range: Range, - additional_ssh_args: Option<&Vec<&str>>, + additional_ssh_args: &[&str], ) -> Result<(async_process::Child, u16)> { const COMMON_PORT_FORWARD_TOKEN: &str = "cro3-ssh-portforward"; let sshcmd = &format!("echo {COMMON_PORT_FORWARD_TOKEN}; sleep 8h"); @@ -781,7 +773,7 @@ impl SshInfo { /// start_ssh_forwarding, and the same port will be used for reconnecting /// while this cro3 instance is running. fn start_ssh_forwarding_background_in_range(&self, port_range: Range) -> Result { - let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range, None))?; + let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range, &[]))?; let ssh = self.clone(); thread::spawn(move || { block_on(async move { @@ -790,7 +782,7 @@ impl SshInfo { info!("cro3: SSH forwarding process exited with {status:?}"); loop { info!("cro3: Reconnecting to {ssh:?}..."); - if let Ok(new_child) = ssh.start_ssh_forwarding(port, None).await { + if let Ok(new_child) = ssh.start_ssh_forwarding(port, &[]).await { child = new_child; break; } From 2bf22b2da2100b48608296d37c7aa025aa4455e9 Mon Sep 17 00:00:00 2001 From: Masami Hiramatsu Date: Sat, 7 Sep 2024 18:11:54 +0900 Subject: [PATCH 5/5] cmd/dut: Simplify parse_fwport() Simply return an errror instead of abort, and parse port_str without temporary variable. --- src/cmd/dut.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cmd/dut.rs b/src/cmd/dut.rs index 7e0a650f..d412ff1a 100644 --- a/src/cmd/dut.rs +++ b/src/cmd/dut.rs @@ -220,10 +220,9 @@ fn parse_fwport(fwport: &str, loport: u16) -> Result { } else if let Some(caps) = v6_re.captures(fwport) { (caps.get(1).unwrap().as_str(), caps.get(3).unwrap().as_str()) } else { - bail!("Failed to parse {fwport}") + return Err(anyhow!("Failed to parse {fwport}")); }; - let port_no: u16 = port_str.parse().expect("Invalid port number."); - PortForwarding::new(loport, addr, port_no) + PortForwarding::new(loport, addr, port_str.parse()?) } fn run_dut_monitor(args: &ArgsDutMonitor) -> Result<()> {