diff --git a/src/cmd/dut.rs b/src/cmd/dut.rs index 1890786e..d412ff1a 100644 --- a/src/cmd/dut.rs +++ b/src/cmd/dut.rs @@ -55,6 +55,7 @@ use cro3::dut::fetch_dut_info_in_parallel; use cro3::dut::register_dut; use cro3::dut::DutInfo; use cro3::dut::MonitoredDut; +use cro3::dut::PortForwarding; use cro3::dut::SshInfo; use cro3::dut::SSH_CACHE; use cro3::repo::get_cros_dir; @@ -63,6 +64,7 @@ use cro3::servo::LocalServo; use cro3::servo::ServoList; use lazy_static::lazy_static; use rayon::prelude::*; +use regex::Regex; use termion::screen::IntoAlternateScreen; use tracing::error; use tracing::info; @@ -180,8 +182,8 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> { error!("Failed to kill previous vnc instance: {e}") } - let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc")?; - let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc")?; + let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc", &[])?; + let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc", &[])?; warn!("To use VNC via web browser, please open:"); warn!(" http://localhost:{web_port}/vnc.html"); @@ -202,19 +204,47 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> { /// open a SSH monitor #[argh(subcommand, name = "monitor")] struct ArgsDutMonitor { - /// DUT identifiers to monitor + /// DUT identifiers to monitor. This accepts sub portforwardings after colon + /// (e.g. dut,ADDR:PORT,...) #[argh(positional)] duts: Vec, } +fn parse_fwport(fwport: &str, loport: u16) -> Result { + // Lazy IPv4/v6 matching + let v4_re = Regex::new(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)$").unwrap(); + let v6_re = Regex::new(r"^(\[([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\]):(\d+)$").unwrap(); + + let (addr, port_str) = if let Some(caps) = v4_re.captures(fwport) { + (caps.get(1).unwrap().as_str(), caps.get(2).unwrap().as_str()) + } else if let Some(caps) = v6_re.captures(fwport) { + (caps.get(1).unwrap().as_str(), caps.get(3).unwrap().as_str()) + } else { + return Err(anyhow!("Failed to parse {fwport}")); + }; + PortForwarding::new(loport, addr, port_str.parse()?) +} + fn run_dut_monitor(args: &ArgsDutMonitor) -> Result<()> { cros::ensure_testing_rsa_is_there()?; let mut targets: Vec = Vec::new(); let mut port = 4022; - for dut in &args.duts { - targets.push(MonitoredDut::new(dut, port)?); + for raw_dut in &args.duts { + let ports: Vec<&str> = raw_dut.split(',').collect(); + let mut fwports: Vec = vec![]; + let dut_port = port; + port += 1; + + if ports.len() > 1 { + for fwport in ports.iter().skip(1) { + fwports.push(parse_fwport(fwport, port)?); + port += 1; + } + } + + targets.push(MonitoredDut::new(ports[0], dut_port, &fwports)?); } let mut screen = stdout().into_alternate_screen().unwrap(); diff --git a/src/dut.rs b/src/dut.rs index 6ab89d59..2214e7cf 100644 --- a/src/dut.rs +++ b/src/dut.rs @@ -32,6 +32,7 @@ use rand::thread_rng; use rayon::prelude::*; use regex::Regex; use serde::{Deserialize, Serialize}; +//use strum::additional_attributes; use tracing::error; use tracing::info; use url::Url; @@ -82,6 +83,30 @@ lazy_static! { pub static SSH_CACHE: KvCache = KvCache::new("ssh_cache"); +/// PortForwarding represents Local/Remote forwarding on ssh +#[derive(Debug, Clone)] +pub struct PortForwarding { + fwport: u16, + address: String, + port: u16, +} +impl PortForwarding { + pub fn new(fwport: u16, address: &str, port: u16) -> Result { + let pfw = PortForwarding { + fwport, + address: address.to_string(), + port, + }; + Ok(pfw) + } + pub fn to_ssh_args(&self) -> Vec { + vec![ + "-L".to_string(), + format!("{}:{}:{}", self.fwport, self.address, self.port), + ] + } +} + /// MonitoredDut holds connection to a monitoring Dut #[derive(Debug)] pub struct MonitoredDut { @@ -90,16 +115,31 @@ pub struct MonitoredDut { port: u16, child: Option, reconnecting: bool, + fwports: Vec, } + +fn spawn_ssh_process( + ssh: &SshInfo, + port: u16, + fwports: &[PortForwarding], +) -> Result { + let opt_args: Vec> = fwports.iter().map(|x| x.to_ssh_args()).collect(); + let opt_args: Vec = opt_args.concat(); + let ref_args: Vec<&str> = opt_args.iter().map(|x| x.as_ref()).collect(); + + block_on(ssh.start_ssh_forwarding(port, &ref_args)) +} + impl MonitoredDut { - pub fn new(dut: &str, port: u16) -> Result { + pub fn new(dut: &str, port: u16, fwports: &[PortForwarding]) -> Result { let ssh = SshInfo::new(dut).context("failed to create SshInfo")?; let dut = MonitoredDut { ssh: ssh.clone(), dut: dut.to_string(), port, - child: block_on(ssh.start_ssh_forwarding(port)).ok(), + child: spawn_ssh_process(&ssh, port, fwports).ok(), reconnecting: false, + fwports: fwports.to_vec(), }; Ok(dut) } @@ -107,7 +147,7 @@ impl MonitoredDut { self.reconnecting } fn reconnect(&mut self) -> Result { - let new_child = block_on(self.ssh.start_ssh_forwarding(self.port)); + let new_child = spawn_ssh_process(&self.ssh, self.port, &self.fwports); if let Err(e) = &new_child { error!("Failed to reconnect: {e:?}"); }; @@ -123,12 +163,23 @@ impl MonitoredDut { match child.try_status()? { None => { self.reconnecting = false; - Ok(format!( + let status: String = format!( "{:<31}\t127.0.0.1:{:<5}\t{}", &self.dut, self.port, &self.ssh.host_and_port() - )) + ); + let status2: Vec = self + .fwports + .iter() + .map(|fwp: &PortForwarding| { + format!( + "\n{:<31}\t127.0.0.1:{:<5}\t{}:{}", + " +-", fwp.fwport, &fwp.address, fwp.port + ) + }) + .collect(); + Ok(status + status2.join("").as_str()) } Some(_status) => self.reconnect(), } @@ -627,18 +678,23 @@ impl SshInfo { port: u16, dut_port: u16, command: &str, + additional_ssh_args: &[&str], ) -> Result { + let fwport: String = format!("{}:127.0.0.1:{}", port, dut_port); + let mut args = vec![ + "-L", + &fwport, + "-o", + "ExitOnForwardFailure yes", + "-o", + "ServerAliveInterval=5", + "-o", + "ServerAliveCountMax=1", + ]; + args.extend(additional_ssh_args.iter()); + let child = self - .ssh_cmd_async(Some(&[ - "-L", - &format!("{}:127.0.0.1:{}", port, dut_port), - "-o", - "ExitOnForwardFailure yes", - "-o", - "ServerAliveInterval=5", - "-o", - "ServerAliveCountMax=1", - ]))? + .ssh_cmd_async(Some(&args))? .arg(command) .kill_on_drop(true) .stdin(Stdio::piped()) @@ -649,11 +705,18 @@ impl SshInfo { } // Start SSH port forwarding on a given port. // The future will be resolved once the first connection attempt is succeeded. - pub async fn start_ssh_forwarding(&self, port: u16) -> Result { - self.start_ssh_forwarding_in_range(Range { - start: port, - end: port + 1, - }) + pub async fn start_ssh_forwarding( + &self, + port: u16, + additional_ssh_args: &[&str], + ) -> Result { + self.start_ssh_forwarding_in_range( + Range { + start: port, + end: port + 1, + }, + additional_ssh_args, + ) .await .map(|e| e.0) } @@ -661,6 +724,7 @@ impl SshInfo { async fn start_ssh_forwarding_in_range( &self, port_range: Range, + additional_ssh_args: &[&str], ) -> Result<(async_process::Child, u16)> { const COMMON_PORT_FORWARD_TOKEN: &str = "cro3-ssh-portforward"; let sshcmd = &format!("echo {COMMON_PORT_FORWARD_TOKEN}; sleep 8h"); @@ -669,7 +733,7 @@ impl SshInfo { ports.shuffle(&mut rng); for port in ports { // Try to establish port forwarding - let mut child = self.start_port_forwarding(port, 22, sshcmd)?; + let mut child = self.start_port_forwarding(port, 22, sshcmd, additional_ssh_args)?; let (ssh_stdout, ssh_stderr) = get_async_lines(&mut child); let ssh_stdout = ssh_stdout.context(anyhow!("ssh_stdout was None"))?; let ssh_stderr = ssh_stderr.context(anyhow!("ssh_stderr was None"))?; @@ -709,7 +773,7 @@ impl SshInfo { /// start_ssh_forwarding, and the same port will be used for reconnecting /// while this cro3 instance is running. fn start_ssh_forwarding_background_in_range(&self, port_range: Range) -> Result { - let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range))?; + let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range, &[]))?; let ssh = self.clone(); thread::spawn(move || { block_on(async move { @@ -718,7 +782,7 @@ impl SshInfo { info!("cro3: SSH forwarding process exited with {status:?}"); loop { info!("cro3: Reconnecting to {ssh:?}..."); - if let Ok(new_child) = ssh.start_ssh_forwarding(port).await { + if let Ok(new_child) = ssh.start_ssh_forwarding(port, &[]).await { child = new_child; break; }