Skip to content

Commit

Permalink
Merge pull request #264 from google/263-add-new-port-forwarding-mode-…
Browse files Browse the repository at this point in the history
…to-cro3-dut-shell-or-other-command

dut monitor: Support local port forwarding option
  • Loading branch information
mhiramat authored Oct 16, 2024
2 parents 7711061 + 2bf22b2 commit 7d12fd6
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 28 deletions.
40 changes: 35 additions & 5 deletions src/cmd/dut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use cro3::dut::fetch_dut_info_in_parallel;
use cro3::dut::register_dut;
use cro3::dut::DutInfo;
use cro3::dut::MonitoredDut;
use cro3::dut::PortForwarding;
use cro3::dut::SshInfo;
use cro3::dut::SSH_CACHE;
use cro3::repo::get_cros_dir;
Expand All @@ -63,6 +64,7 @@ use cro3::servo::LocalServo;
use cro3::servo::ServoList;
use lazy_static::lazy_static;
use rayon::prelude::*;
use regex::Regex;
use termion::screen::IntoAlternateScreen;
use tracing::error;
use tracing::info;
Expand Down Expand Up @@ -180,8 +182,8 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> {
error!("Failed to kill previous vnc instance: {e}")
}

let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc")?;
let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc")?;
let mut child_kmsvnc = target.start_port_forwarding(vnc_port, 5900, "kmsvnc", &[])?;
let mut child_novnc = target.start_port_forwarding(web_port, 6080, "novnc", &[])?;

warn!("To use VNC via web browser, please open:");
warn!(" http://localhost:{web_port}/vnc.html");
Expand All @@ -202,19 +204,47 @@ fn run_dut_vnc(args: &ArgsVnc) -> Result<()> {
/// open a SSH monitor
#[argh(subcommand, name = "monitor")]
struct ArgsDutMonitor {
/// DUT identifiers to monitor
/// DUT identifiers to monitor. This accepts sub portforwardings after colon
/// (e.g. dut,ADDR:PORT,...)
#[argh(positional)]
duts: Vec<String>,
}

fn parse_fwport(fwport: &str, loport: u16) -> Result<PortForwarding> {
// Lazy IPv4/v6 matching
let v4_re = Regex::new(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)$").unwrap();
let v6_re = Regex::new(r"^(\[([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\]):(\d+)$").unwrap();

let (addr, port_str) = if let Some(caps) = v4_re.captures(fwport) {
(caps.get(1).unwrap().as_str(), caps.get(2).unwrap().as_str())
} else if let Some(caps) = v6_re.captures(fwport) {
(caps.get(1).unwrap().as_str(), caps.get(3).unwrap().as_str())
} else {
return Err(anyhow!("Failed to parse {fwport}"));
};
PortForwarding::new(loport, addr, port_str.parse()?)
}

fn run_dut_monitor(args: &ArgsDutMonitor) -> Result<()> {
cros::ensure_testing_rsa_is_there()?;
let mut targets: Vec<MonitoredDut> = Vec::new();
let mut port = 4022;

for dut in &args.duts {
targets.push(MonitoredDut::new(dut, port)?);
for raw_dut in &args.duts {
let ports: Vec<&str> = raw_dut.split(',').collect();
let mut fwports: Vec<PortForwarding> = vec![];
let dut_port = port;

port += 1;

if ports.len() > 1 {
for fwport in ports.iter().skip(1) {
fwports.push(parse_fwport(fwport, port)?);
port += 1;
}
}

targets.push(MonitoredDut::new(ports[0], dut_port, &fwports)?);
}

let mut screen = stdout().into_alternate_screen().unwrap();
Expand Down
110 changes: 87 additions & 23 deletions src/dut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use rand::thread_rng;
use rayon::prelude::*;
use regex::Regex;
use serde::{Deserialize, Serialize};
//use strum::additional_attributes;
use tracing::error;
use tracing::info;
use url::Url;
Expand Down Expand Up @@ -82,6 +83,30 @@ lazy_static! {

pub static SSH_CACHE: KvCache<SshInfo> = KvCache::new("ssh_cache");

/// PortForwarding represents Local/Remote forwarding on ssh
#[derive(Debug, Clone)]
pub struct PortForwarding {
fwport: u16,
address: String,
port: u16,
}
impl PortForwarding {
pub fn new(fwport: u16, address: &str, port: u16) -> Result<Self> {
let pfw = PortForwarding {
fwport,
address: address.to_string(),
port,
};
Ok(pfw)
}
pub fn to_ssh_args(&self) -> Vec<String> {
vec![
"-L".to_string(),
format!("{}:{}:{}", self.fwport, self.address, self.port),
]
}
}

/// MonitoredDut holds connection to a monitoring Dut
#[derive(Debug)]
pub struct MonitoredDut {
Expand All @@ -90,24 +115,39 @@ pub struct MonitoredDut {
port: u16,
child: Option<async_process::Child>,
reconnecting: bool,
fwports: Vec<PortForwarding>,
}

fn spawn_ssh_process(
ssh: &SshInfo,
port: u16,
fwports: &[PortForwarding],
) -> Result<async_process::Child> {
let opt_args: Vec<Vec<String>> = fwports.iter().map(|x| x.to_ssh_args()).collect();
let opt_args: Vec<String> = opt_args.concat();
let ref_args: Vec<&str> = opt_args.iter().map(|x| x.as_ref()).collect();

block_on(ssh.start_ssh_forwarding(port, &ref_args))
}

impl MonitoredDut {
pub fn new(dut: &str, port: u16) -> Result<Self> {
pub fn new(dut: &str, port: u16, fwports: &[PortForwarding]) -> Result<Self> {
let ssh = SshInfo::new(dut).context("failed to create SshInfo")?;
let dut = MonitoredDut {
ssh: ssh.clone(),
dut: dut.to_string(),
port,
child: block_on(ssh.start_ssh_forwarding(port)).ok(),
child: spawn_ssh_process(&ssh, port, fwports).ok(),
reconnecting: false,
fwports: fwports.to_vec(),
};
Ok(dut)
}
pub fn reconnecting(&self) -> bool {
self.reconnecting
}
fn reconnect(&mut self) -> Result<String> {
let new_child = block_on(self.ssh.start_ssh_forwarding(self.port));
let new_child = spawn_ssh_process(&self.ssh, self.port, &self.fwports);
if let Err(e) = &new_child {
error!("Failed to reconnect: {e:?}");
};
Expand All @@ -123,12 +163,23 @@ impl MonitoredDut {
match child.try_status()? {
None => {
self.reconnecting = false;
Ok(format!(
let status: String = format!(
"{:<31}\t127.0.0.1:{:<5}\t{}",
&self.dut,
self.port,
&self.ssh.host_and_port()
))
);
let status2: Vec<String> = self
.fwports
.iter()
.map(|fwp: &PortForwarding| {
format!(
"\n{:<31}\t127.0.0.1:{:<5}\t{}:{}",
" +-", fwp.fwport, &fwp.address, fwp.port
)
})
.collect();
Ok(status + status2.join("").as_str())
}
Some(_status) => self.reconnect(),
}
Expand Down Expand Up @@ -627,18 +678,23 @@ impl SshInfo {
port: u16,
dut_port: u16,
command: &str,
additional_ssh_args: &[&str],
) -> Result<async_process::Child> {
let fwport: String = format!("{}:127.0.0.1:{}", port, dut_port);
let mut args = vec![
"-L",
&fwport,
"-o",
"ExitOnForwardFailure yes",
"-o",
"ServerAliveInterval=5",
"-o",
"ServerAliveCountMax=1",
];
args.extend(additional_ssh_args.iter());

let child = self
.ssh_cmd_async(Some(&[
"-L",
&format!("{}:127.0.0.1:{}", port, dut_port),
"-o",
"ExitOnForwardFailure yes",
"-o",
"ServerAliveInterval=5",
"-o",
"ServerAliveCountMax=1",
]))?
.ssh_cmd_async(Some(&args))?
.arg(command)
.kill_on_drop(true)
.stdin(Stdio::piped())
Expand All @@ -649,18 +705,26 @@ impl SshInfo {
}
// Start SSH port forwarding on a given port.
// The future will be resolved once the first connection attempt is succeeded.
pub async fn start_ssh_forwarding(&self, port: u16) -> Result<async_process::Child> {
self.start_ssh_forwarding_in_range(Range {
start: port,
end: port + 1,
})
pub async fn start_ssh_forwarding(
&self,
port: u16,
additional_ssh_args: &[&str],
) -> Result<async_process::Child> {
self.start_ssh_forwarding_in_range(
Range {
start: port,
end: port + 1,
},
additional_ssh_args,
)
.await
.map(|e| e.0)
}
// Start SSH port forwarding in a given range without timeout.
async fn start_ssh_forwarding_in_range(
&self,
port_range: Range<u16>,
additional_ssh_args: &[&str],
) -> Result<(async_process::Child, u16)> {
const COMMON_PORT_FORWARD_TOKEN: &str = "cro3-ssh-portforward";
let sshcmd = &format!("echo {COMMON_PORT_FORWARD_TOKEN}; sleep 8h");
Expand All @@ -669,7 +733,7 @@ impl SshInfo {
ports.shuffle(&mut rng);
for port in ports {
// Try to establish port forwarding
let mut child = self.start_port_forwarding(port, 22, sshcmd)?;
let mut child = self.start_port_forwarding(port, 22, sshcmd, additional_ssh_args)?;
let (ssh_stdout, ssh_stderr) = get_async_lines(&mut child);
let ssh_stdout = ssh_stdout.context(anyhow!("ssh_stdout was None"))?;
let ssh_stderr = ssh_stderr.context(anyhow!("ssh_stderr was None"))?;
Expand Down Expand Up @@ -709,7 +773,7 @@ impl SshInfo {
/// start_ssh_forwarding, and the same port will be used for reconnecting
/// while this cro3 instance is running.
fn start_ssh_forwarding_background_in_range(&self, port_range: Range<u16>) -> Result<u16> {
let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range))?;
let (mut child, port) = block_on(self.start_ssh_forwarding_in_range(port_range, &[]))?;
let ssh = self.clone();
thread::spawn(move || {
block_on(async move {
Expand All @@ -718,7 +782,7 @@ impl SshInfo {
info!("cro3: SSH forwarding process exited with {status:?}");
loop {
info!("cro3: Reconnecting to {ssh:?}...");
if let Ok(new_child) = ssh.start_ssh_forwarding(port).await {
if let Ok(new_child) = ssh.start_ssh_forwarding(port, &[]).await {
child = new_child;
break;
}
Expand Down

0 comments on commit 7d12fd6

Please sign in to comment.