diff --git a/src/bin/magman-remote.rs b/src/bin/magman-remote.rs deleted file mode 100644 index 7193475..0000000 --- a/src/bin/magman-remote.rs +++ /dev/null @@ -1,9 +0,0 @@ -// [[file:../../magman.note::45c2eed2][45c2eed2]] -use gut::prelude::*; - -fn main() -> Result<()> { - magman::cli::remote_enter_main()?; - - Ok(()) -} -// 45c2eed2 ends here diff --git a/src/bin/magman-server.rs b/src/bin/magman-server.rs deleted file mode 100644 index d4b3e79..0000000 --- a/src/bin/magman-server.rs +++ /dev/null @@ -1,9 +0,0 @@ -// [[file:../../magman.note::32e407b1][32e407b1]] -use gut::prelude::*; - -fn main() -> Result<()> { - magman::cli::server_enter_main()?; - - Ok(()) -} -// 32e407b1 ends here diff --git a/src/cli.rs b/src/cli.rs index 8169c0a..e308379 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -26,135 +26,3 @@ pub fn magorder_enter_main() -> Result<()> { Ok(()) } // ae28adb7 ends here - -// [[file:../magman.note::2ab6d5de][2ab6d5de]] -use crate::remote::{Client, Server}; -// 2ab6d5de ends here - -// [[file:../magman.note::512e88e7][512e88e7]] -/// A client of a unix domain socket server for interacting with the program -/// run in background -#[derive(StructOpt)] -struct ClientCli { - /// Path to the socket file to connect - #[structopt(short = 'u', default_value = "vasp.sock")] - socket_file: PathBuf, - - #[clap(subcommand)] - action: ClientAction, -} - -#[derive(Subcommand)] -enum ClientAction { - Run { - #[clap(flatten)] - run: ClientRun, - }, - /// Request server to add a new node for remote computation. - AddNode { - /// The node to be added into node list for remote computation. - node: String, - }, -} - -#[derive(StructOpt)] -/// request server to run a cmd -struct ClientRun { - /// The cmd to run in remote session - cmd: String, - - /// The working dir to run the cmd - #[structopt(long, default_value = ".")] - wrk_dir: PathBuf, -} - -impl ClientCli { - async fn enter_main(self) -> Result<()> { - // wait a moment for socke file ready - let timeout = 5; - wait_file(&self.socket_file, timeout)?; - - let mut stream = Client::connect(&self.socket_file).await?; - match self.action { - ClientAction::Run { run } => { - let wrk_dir = run.wrk_dir.canonicalize()?; - let wrk_dir = wrk_dir.to_string_lossy(); - stream.interact_with_remote_session(&run.cmd, &wrk_dir).await?; - } - ClientAction::AddNode { node } => { - stream.add_node(node).await?; - } - } - - Ok(()) - } -} -// 512e88e7 ends here - -// [[file:../magman.note::674c2404][674c2404]] -/// A helper program to run VASP calculation in remote node -#[derive(Debug, StructOpt)] -struct ServerCli { - /// Path to the socket file to bind (only valid for interactive calculation) - #[structopt(default_value = "magman.sock")] - socket_file: PathBuf, - - /// The remote nodes for calculations - #[structopt(long, required = true, use_delimiter = true)] - nodes: Vec, -} - -impl ServerCli { - async fn enter_main(self) -> Result<()> { - debug!("Run VASP for interactive calculation ..."); - Server::create(&self.socket_file)?.run_and_serve(self.nodes).await?; - - Ok(()) - } -} -// 674c2404 ends here - -// [[file:../magman.note::5f9971ad][5f9971ad]] -#[derive(Parser)] -struct Cli { - #[clap(flatten)] - verbose: Verbosity, - - #[clap(subcommand)] - command: Commands, -} - -#[derive(Subcommand)] -enum Commands { - Client { - #[clap(flatten)] - client: ClientCli, - }, - Server { - #[clap(flatten)] - server: ServerCli, - }, -} - -#[tokio::main] -pub async fn remote_enter_main() -> Result<()> { - let args = Cli::from_args(); - args.verbose.setup_logger(); - - match args.command { - Commands::Client { client } => { - client.enter_main().await?; - } - Commands::Server { server } => { - debug!("Run VASP for interactive calculation ..."); - server.enter_main().await?; - } - } - - Ok(()) -} -// 5f9971ad ends here - -// [[file:../magman.note::bf91d369][bf91d369]] -pub use crate::restful::cli::*; -// bf91d369 ends here diff --git a/src/interactive.rs b/src/interactive.rs deleted file mode 100644 index a71f9ac..0000000 --- a/src/interactive.rs +++ /dev/null @@ -1,236 +0,0 @@ -// [[file:../magman.note::*docs][docs:1]] -//! This mod is for VASP interactive calculations. -// docs:1 ends here - -// [[file:../magman.note::ae9e9435][ae9e9435]] -use super::*; -// use crate::job::Job; -use crate::restful::Job; - -use tokio::sync::oneshot; -// ae9e9435 ends here - -// [[file:../magman.note::e899191b][e899191b]] -#[derive(Debug)] -// cmd + working_dir + done? -struct Interaction(String, String, oneshot::Sender); - -/// The message sent from client for controlling child process -#[derive(Debug, Clone)] -enum Control { - Quit, - Pause, - Resume, - AddNode(String), -} - -type InteractionOutput = String; -type RxInteractionOutput = tokio::sync::watch::Receiver; -type TxInteractionOutput = tokio::sync::watch::Sender; -type RxInteraction = tokio::sync::mpsc::Receiver; -type TxInteraction = tokio::sync::mpsc::Sender; -type RxControl = tokio::sync::mpsc::Receiver; -type TxControl = tokio::sync::mpsc::Sender; -// e899191b ends here - -// [[file:../magman.note::de5d8bd5][de5d8bd5]] -use crate::job::Node; - -fn shell_script_for_job(cmd: &str, wrk_dir: &std::path::Path, node: &Node) -> String { - let node_name = node.name(); - let wrk_dir = wrk_dir.shell_escape_lossy(); - - format!( - "#! /usr/bin/env bash -cd {wrk_dir} -{cmd} -" - ) -} - -fn create_job_for_remote_session(cmd: &str, wrk_dir: &str, node: &Node) -> Job { - debug!("run cmd {cmd:?} on remote node: {node:?}"); - let script = shell_script_for_job(cmd, wrk_dir.as_ref(), node); - - Job::new(&script) -} -// de5d8bd5 ends here - -// [[file:../magman.note::d88217da][d88217da]] -#[derive(Clone)] -/// Manage client requests in threading environment -pub struct TaskClient { - // for send client request for pause, resume, stop computation on server side - tx_ctl: TxControl, - // for interaction with child process on server side - tx_int: TxInteraction, -} - -mod taskclient { - use super::*; - - impl TaskClient { - pub async fn interact(&mut self, cmd: &str, wrk_dir: &str) -> Result { - // FIXME: refactor - let (tx_resp, rx_resp) = oneshot::channel(); - self.tx_int.send(Interaction(cmd.into(), wrk_dir.into(), tx_resp)).await?; - let out = rx_resp.await?; - Ok(out) - } - - /// Request the server to pause computation - pub async fn pause(&self) -> Result<()> { - trace!("send pause task msg"); - self.tx_ctl.send(Control::Pause).await?; - Ok(()) - } - - /// Request the server to resume computation - pub async fn resume(&self) -> Result<()> { - trace!("send resume task msg"); - self.tx_ctl.send(Control::Resume).await?; - Ok(()) - } - - /// Add one remote node into list for computation - pub async fn add_node(&self, node: String) -> Result<()> { - trace!("send add_node ctl msg"); - self.tx_ctl.send(Control::AddNode(node)).await?; - Ok(()) - } - - /// Request the server to terminate computation - pub async fn terminate(&self) -> Result<()> { - trace!("send quit task msg"); - self.tx_ctl.send(Control::Quit).await?; - Ok(()) - } - } -} -// d88217da ends here - -// [[file:../magman.note::7b4ac45b][7b4ac45b]] -use crate::job::Nodes; - -pub struct TaskServer { - // for receiving interaction message for child process - rx_int: Option, - // for controlling child process - rx_ctl: Option, -} - -type Jobs = (String, String, oneshot::Sender); -type RxJobs = spmc::Receiver; -type TxJobs = spmc::Sender; -async fn handle_client_interaction(jobs: RxJobs, node: &Node) -> Result<()> { - let (cmd, wrk_dir, tx_resp) = jobs.recv()?; - let job = create_job_for_remote_session(&cmd, &wrk_dir, &node); - let name = job.name(); - info!("Start computing job {name} ..."); - // FIXME: remote or local submission, make it selectable - // let mut comput = job.submit()?; - let mut comput = job.submit_remote(node.name())?; - // if computation failed, we should tell the client to exit - match comput.wait_for_output().await { - Ok(out) => { - info!("Job {name} completed, sending stdout to the client ..."); - if let Err(_) = tx_resp.send(out) { - error!("the client has been dropped"); - } - } - Err(err) => { - error!("Job {name:?} failed with error: {err:?}"); - tx_resp.send("Job failure".into()).ok(); - } - } - - Ok(()) -} - -mod taskserver { - use super::*; - - impl TaskServer { - /// Run child process in new session, and serve requests for interactions. - pub async fn run_and_serve(&mut self, nodes: Vec) -> Result<()> { - let mut rx_int = self.rx_int.take().context("no rx_int")?; - let mut rx_ctl = self.rx_ctl.take().context("no rx_ctl")?; - - let nodes = Nodes::new(nodes); - let (mut tx_jobs, rx_jobs) = spmc::channel(); - for i in 0.. { - // make sure run in parallel - let join_handler = { - let jobs = rx_jobs.clone(); - let nodes = nodes.clone(); - tokio::spawn(async move { - match nodes.borrow_node() { - Ok(node) => { - if let Err(err) = handle_client_interaction(jobs, &node).await { - error!("found error when running job: {err:?}"); - } - // return node back - if let Err(err) = nodes.return_node(node) { - error!("found error when return node: {err:?}"); - } - } - Err(err) => { - error!("found error when borrowing node: {err:?}"); - } - } - }) - }; - // handle logic in main thread - tokio::select! { - Ok(_) = join_handler => { - log_dbg!(); - } - Some(int) = rx_int.recv() => { - log_dbg!(); - let Interaction(cmd, wrk_dir, tx_resp) = int; - tx_jobs.send((cmd, wrk_dir, tx_resp))?; - } - Some(ctl) = rx_ctl.recv() => { - log_dbg!(); - match ctl { - Control::AddNode(node) => { - info!("client asked to add a new remote node: {node:?}"); - let nodes = nodes.clone(); - nodes.return_node(node.into()); - } - _ => { - log_dbg!(); - } - } - } - else => { - bail!("Unexpected branch: the communication channels broken?"); - } - } - } - Ok(()) - } - } -} -// 7b4ac45b ends here - -// [[file:../magman.note::8408786a][8408786a]] -/// Create task server and client. The client can be cloned and used in -/// concurrent environment -pub fn new_interactive_task() -> (TaskServer, TaskClient) { - let (tx_int, rx_int) = tokio::sync::mpsc::channel(1); - let (tx_ctl, rx_ctl) = tokio::sync::mpsc::channel(1); - - let server = TaskServer { - rx_int: rx_int.into(), - rx_ctl: rx_ctl.into(), - }; - - let client = TaskClient { - tx_int, - tx_ctl, - }; - - (server, client) -} -// 8408786a ends here diff --git a/src/job.rs b/src/job.rs deleted file mode 100644 index 1c56203..0000000 --- a/src/job.rs +++ /dev/null @@ -1,395 +0,0 @@ -// [[file:../magman.note::3728ca38][3728ca38]] -//! For handling running task/job -use super::*; - -use std::path::{Path, PathBuf}; -use gosh::runner::process::Session; -use gosh::runner::prelude::SpawnSessionExt; - -use tempfile::{tempdir, tempdir_in, TempDir}; -// 3728ca38 ends here - -// [[file:../magman.note::e2dc7cb8][e2dc7cb8]] -pub trait Compute { - fn wait_for_output(&mut self) -> Result; - - fn try_pause(&mut self) -> Result<()> { - todo!() - } - fn try_terminate(&mut self) -> Result<()> { - todo!() - } - fn try_resume(&mut self) -> Result<()> { - todo!() - } -} -// e2dc7cb8 ends here - -// [[file:../magman.note::50e6ed5a][50e6ed5a]] -/// Represents a computational job inputted by user. -#[derive(Debug, Deserialize, Serialize)] -pub struct Job { - /// A unique random name - name: String, - - /// Input string for stdin - input: String, - - /// The content of running script - script: String, - - /// Path to a file for saving input stream of computation - pub inp_file: PathBuf, - - /// Path to a file for saving output stream of computation. - pub out_file: PathBuf, - - /// Path to a file for saving error stream of computation. - pub err_file: PathBuf, - - /// Path to a script file that defining how to start computation - pub run_file: PathBuf, - - /// Extra files required for computation - pub extra_files: Vec, -} - -impl Job { - /// Construct a Job running shell script. - /// - /// # Parameters - /// - /// * script: the content of the script for running the job. - /// - pub fn new(script: &str) -> Self { - Self { - name: random_name(), - script: script.into(), - input: String::new(), - - out_file: "job.out".into(), - err_file: "job.err".into(), - run_file: "run".into(), - inp_file: "job.inp".into(), - extra_files: vec![], - } - } - - /// Add a new file into extra-files list. - pub fn attach_file>(&mut self, file: P) { - let file: PathBuf = file.as_ref().into(); - if !self.extra_files.contains(&file) { - self.extra_files.push(file); - } else { - warn!("try to attach a dumplicated file: {}!", file.display()); - } - } - - /// Return the job name - pub fn name(&self) -> String { - self.name.clone() - } -} - -fn random_name() -> String { - use rand::distributions::Alphanumeric; - use rand::Rng; - - let mut rng = rand::thread_rng(); - std::iter::repeat(()) - .map(|()| rng.sample(Alphanumeric)) - .map(char::from) - .take(6) - .collect() -} -// 50e6ed5a ends here - -// [[file:../magman.note::769262a8][769262a8]] -use crossbeam_channel::{unbounded, Receiver, Sender}; - -/// Represents a remote node for computation -#[derive(Debug, Clone)] -pub struct Node { - name: String, -} - -impl Node { - /// Return the name of remote node - pub fn name(&self) -> &str { - &self.name - } -} - -impl> From for Node { - fn from(node: T) -> Self { - let name = node.into(); - assert!(!name.is_empty(), "node name cannot be empty!"); - Self { name } - } -} - -/// Represents a list of remote nodes allocated for computation -#[derive(Clone)] -pub struct Nodes { - rx: Receiver, - tx: Sender, -} - -impl Nodes { - /// Construct `Nodes` from a list of nodes. - pub fn new>(nodes: impl IntoIterator) -> Self { - let (tx, rx) = unbounded(); - let nodes = nodes.into_iter().collect_vec(); - let n = nodes.len(); - info!("We have {n} nodes in totoal for computation."); - for node in nodes { - tx.send(node.into()).unwrap(); - } - Self { rx, tx } - } - - /// Borrow one node from `Nodes` - pub fn borrow_node(&self) -> Result { - let node = self.rx.recv()?; - let name = &node.name; - info!("client borrowed one node: {name:?}"); - Ok(node) - } - - /// Return one `node` to `Nodes` - pub fn return_node(&self, node: Node) -> Result<()> { - let name = &node.name; - info!("client returned node {name:?}"); - self.tx.send(node)?; - Ok(()) - } -} -// 769262a8 ends here - -// [[file:../magman.note::955c926a][955c926a]] -/// Computation represents a submitted `Job` -pub struct Computation { - job: Job, - - /// command session. The drop order is above Tempdir - session: Option>, - - /// The working directory of computation - wrk_dir: TempDir, -} -// 955c926a ends here - -// [[file:../magman.note::a65e6dae][a65e6dae]] -impl Computation { - /// The full path to the working directory for running the job. - pub fn wrk_dir(&self) -> &Path { - self.wrk_dir.path() - } - - /// The full path to computation input file (stdin). - pub fn inp_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.inp_file) - } - - /// The full path to computation output file (stdout). - pub fn out_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.out_file) - } - - /// The full path to computation error file (stderr). - pub fn err_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.err_file) - } - - /// The full path to the script for running the job. - pub fn run_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.run_file) - } -} -// a65e6dae ends here - -// [[file:../magman.note::f8672e0c][f8672e0c]] -use tokio::io::AsyncWriteExt; - -pub(crate) fn shell_script_for_run_using_ssh(cmd: &str, wrk_dir: &Path, node: &Node) -> String { - let node_name = node.name(); - let wrk_dir = wrk_dir.shell_escape_lossy(); - - format!( - "#! /usr/bin/env bash -ssh -x -o StrictHostKeyChecking=no {node_name} << END -cd {wrk_dir} -{cmd} -END -" - ) -} - -impl Job { - /// Submit the job and turn it into Computation. - pub fn submit(self) -> Result { - Computation::try_run(self) - } - - // /// Submit job onto remote `node` using `ssh`. - // pub fn submit_using_ssh(self, node: &Node) -> Result { - // let node_name = node.name(); - // let job_name = self.name(); - // debug!("run job {job_name:?} on remote node: {node_name:?}"); - - // let comput = Computation::try_run(self)?; - // let script = shell_script_for_run_using_ssh(cmdline, wrk_dir, node); - - // todo!(); - // } -} - -fn create_run_file(session: &Computation) -> Result<()> { - let run_file = session.run_file(); - gut::fs::write_script_file(&run_file, &session.job.script)?; - wait_file(&run_file, 2)?; - - Ok(()) -} - -impl Computation { - /// Construct `Computation` of user inputted `Job`. - pub fn try_run(job: Job) -> Result { - use std::fs::File; - - // create working directory in scratch space. - let wdir = tempfile::TempDir::new_in(".").expect("temp dir"); - let session = Self { - job, - wrk_dir: wdir.into(), - session: None, - }; - - // create run file and make sure it executable later - create_run_file(&session)?; - gut::fs::write_to_file(&session.inp_file(), &session.job.input)?; - - Ok(session) - } - - /// Wait for background command to complete. - async fn wait(&mut self) -> Result<()> { - if let Some(s) = self.session.as_mut() { - let ecode = s.child.wait().await?; - info!("job session exited: {}", ecode); - if !ecode.success() { - error!("job exited unsessfully."); - let txt = gut::fs::read_file(self.run_file())?; - error!("run file:\n{txt:?}"); - let txt = gut::fs::read_file(self.err_file())?; - error!("std_err:\n{txt:?}"); - } - Ok(()) - } else { - bail!("Job not started yet."); - } - } - - /// Run command in background. - async fn start(&mut self) -> Result<()> { - let program = self.run_file(); - let wdir = self.wrk_dir(); - info!("job work direcotry: {}", wdir.display()); - - let mut session = tokio::process::Command::new(&program) - .current_dir(wdir) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn_session()?; - - let mut stdin = session.child.stdin.take().expect("child did not have a handle to stdout"); - let mut stdout = session.child.stdout.take().expect("child did not have a handle to stdout"); - let mut stderr = session.child.stderr.take().expect("child did not have a handle to stderr"); - - // NOTE: suppose stdin stream is small. - stdin.write_all(self.job.input.as_bytes()).await; - - // redirect stdout and stderr to files for user inspection. - let mut fout = tokio::fs::File::create(self.out_file()).await?; - let mut ferr = tokio::fs::File::create(self.err_file()).await?; - tokio::io::copy(&mut stdout, &mut fout).await?; - tokio::io::copy(&mut stderr, &mut ferr).await?; - - let sid = session.handler().id(); - info!("command running in session {:?}", sid); - self.session = session.into(); - - Ok(()) - } - - /// Start computation, and wait its standard output - pub async fn run_for_output(&mut self) -> Result { - if let Err(err) = self.start().await { - // FIXME: need a better solution - // try again if failed due to NFS file delay issue - let err_msg = format!("{:?}", err); - let run_file = self.run_file(); - warn!("Execute file {run_file:?} failure: {err:?}"); - info!("Wait 1 second before next trial ..."); - gut::utils::sleep(1.0); - self.start().await?; - } - self.wait().await?; - let txt = gut::fs::read_file(self.out_file())?; - Ok(txt) - } - - /// Return true if session already has been started. - pub fn is_started(&self) -> bool { - self.session.is_some() - } -} -// f8672e0c ends here - -// [[file:../magman.note::*extra][extra:1]] -impl Computation { - /// Return a list of full path to extra files required for computation. - pub fn extra_files(&self) -> Vec { - self.job.extra_files.iter().map(|f| self.wrk_dir().join(f)).collect() - } - - /// Check if job has been done correctly. - pub fn is_done(&self) -> bool { - let inpfile = self.inp_file(); - let outfile = self.out_file(); - let errfile = self.err_file(); - - if self.wrk_dir().is_dir() { - if outfile.is_file() && inpfile.is_file() { - if let Ok(time2) = outfile.metadata().and_then(|m| m.modified()) { - if let Ok(time1) = inpfile.metadata().and_then(|m| m.modified()) { - if time2 >= time1 { - return true; - } - } - } - } - } - - false - } - - /// Update file timestamps to make sure `is_done` call return true. - pub fn fake_done(&self) { - todo!() - } -} -// extra:1 ends here - -// [[file:../magman.note::47382715][47382715]] -#[test] -#[ignore] -fn test_text_file_busy() -> Result<()> { - let f = "/home/ybyygu/a"; - if let Err(err) = gut::cli::duct::cmd!(f).read() { - dbg!(err); - } - Ok(()) -} -// 47382715 ends here diff --git a/src/lib.rs b/src/lib.rs index f8a55c4..ac5819f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,10 +15,6 @@ mod magmom; mod search; mod vasp; -mod interactive; -mod job; -mod restful; - pub use config::*; pub use search::*; // 25e28290 ends here @@ -86,7 +82,6 @@ pub fn collect_results_from_dir(d: &std::path::Path) -> Result<()> { // 5dec57d3 ends here // [[file:../magman.note::fd0d637b][fd0d637b]] -mod remote; mod magorder; pub mod cli; // fd0d637b ends here @@ -103,8 +98,8 @@ pub mod docs { }; } - export_doc!(interactive); + // export_doc!(interactive); // export_doc!(remote); - export_doc!(job); + // export_doc!(job); } // 56d334b5 ends here diff --git a/src/remote.rs b/src/remote.rs deleted file mode 100644 index 2a645f4..0000000 --- a/src/remote.rs +++ /dev/null @@ -1,383 +0,0 @@ -// [[file:../magman.note::*imports][imports:1]] -use super::*; -use std::process::Command; - -use bytes; -use tokio; -// imports:1 ends here - -// [[file:../magman.note::6831177b][6831177b]] -/// Shared codes for both server and client sides -mod codec { - use super::*; - use bytes::{Buf, BufMut, Bytes}; - use std::io::{Read, Write}; - use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; - use tokio::net::UnixStream; - - /// The request from client side - #[derive(Debug, Eq, PartialEq, Clone)] - pub enum ServerOp { - /// Control server process: pause/resume/quit - Control(Signal), - /// Request to run command using `cmd_line` in `working_dir` - Command((String, String)), - } - - #[derive(Debug, Eq, PartialEq, Clone)] - pub enum Signal { - Quit, - Resume, - Pause, - AddNode(String), - } - - impl ServerOp { - /// Encode message ready for sent over UnixStream - pub fn encode(&self) -> Vec { - use ServerOp::*; - - let mut buf = vec![]; - match self { - Control(sig) => { - buf.put_u8(b'X'); - let sig = match sig { - Signal::Quit => "SIGTERM", - Signal::Resume => "SIGCONT", - Signal::Pause => "SIGSTOP", - Signal::AddNode(node) => node, - }; - encode(&mut buf, sig); - buf - } - Command((input, pattern)) => { - buf.put_u8(b'0'); - encode(&mut buf, input); - encode(&mut buf, pattern); - buf - } - _ => { - todo!(); - } - } - } - - /// Read and decode raw data as operation for server - pub async fn decode(r: &mut R) -> Result { - let mut buf = vec![0_u8; 1]; - r.read_exact(&mut buf).await?; - let mut buf = &buf[..]; - - let op = match buf.get_u8() { - b'0' => { - let cmdline = String::from_utf8_lossy(&decode(r).await?).to_string(); - let wrk_dir = String::from_utf8_lossy(&decode(r).await?).to_string(); - ServerOp::Command((cmdline, wrk_dir)) - } - b'X' => { - let sig = String::from_utf8_lossy(&decode(r).await?).to_string(); - let sig = match sig.as_str() { - "SIGTERM" => Signal::Quit, - "SIGCONT" => Signal::Resume, - "SIGSTOP" => Signal::Pause, - node @ _ => Signal::AddNode(node.into()), - }; - ServerOp::Control(sig) - } - _ => { - todo!(); - } - }; - Ok(op) - } - } - - fn encode(mut buf: B, msg: &str) { - buf.put_u32(msg.len() as u32); - buf.put(msg.as_bytes()); - } - - async fn decode(r: &mut R) -> Result> { - let mut msg = vec![0_u8; 4]; - r.read_exact(&mut msg).await?; - let mut buf = &msg[..]; - let n = buf.get_u32() as usize; - let mut msg = vec![0_u8; n]; - r.read_exact(&mut msg).await?; - Ok(msg) - } - - pub async fn send_msg(stream: &mut UnixStream, msg: &[u8]) -> Result<()> { - stream.write_all(msg).await?; - stream.flush().await?; - Ok(()) - } - - pub async fn send_msg_encode(stream: &mut UnixStream, msg: &str) -> Result<()> { - let mut buf = vec![]; - - encode(&mut buf, msg); - send_msg(stream, &buf).await?; - - Ok(()) - } - - pub async fn recv_msg_decode(stream: &mut UnixStream) -> Result { - let msg = String::from_utf8_lossy(&decode(stream).await?).to_string(); - Ok(msg) - } - - #[tokio::test] - async fn test_async_codec() -> Result<()> { - let op = ServerOp::Control(Signal::Quit); - let d = op.encode(); - let decoded_op = ServerOp::decode(&mut d.as_slice()).await?; - assert_eq!(decoded_op, op); - - let cmdline = "echo hello".to_string(); - let wrk_dir = "/tmp/test".to_string(); - let op = ServerOp::Command((cmdline, wrk_dir)); - let d = op.encode(); - let decoded_op = ServerOp::decode(&mut d.as_slice()).await?; - assert_eq!(decoded_op, op); - - Ok(()) - } -} -// 6831177b ends here - -// [[file:../magman.note::03597617][03597617]] -mod client { - use super::*; - use gut::fs::*; - use std::io::{Read, Write}; - use tokio::net::UnixStream; - - /// Client of Unix domain socket - pub struct Client { - stream: UnixStream, - } - - impl Client { - /// Make connection to unix domain socket server - pub async fn connect(socket_file: &Path) -> Result { - debug!("Connect to socket server: {socket_file:?}"); - let stream = UnixStream::connect(socket_file) - .await - .with_context(|| format!("connect to socket file failure: {socket_file:?}"))?; - - let client = Self { stream }; - Ok(client) - } - - /// Request server to run cmd_line in working_dir and wait until complete. - pub async fn interact(&mut self, cmd_line: &str, working_dir: &str) -> Result { - debug!("Request server to run {cmd_line:?} in {working_dir:?} ..."); - let op = codec::ServerOp::Command((cmd_line.into(), working_dir.into())); - self.send_op(op).await?; - - trace!("receiving output"); - let txt = codec::recv_msg_decode(&mut self.stream).await?; - trace!("got {} bytes", txt.len()); - - Ok(txt) - } - - /// Add remote node into server list for computation. - pub async fn add_node(&mut self, node: String) -> Result<()> { - self.send_op_control(codec::Signal::AddNode(node)).await?; - - Ok(()) - } - - /// Try to ask the background computation to stop - pub async fn try_quit(&mut self) -> Result<()> { - self.send_op_control(codec::Signal::Quit).await?; - - Ok(()) - } - - /// Try to ask the background computation to pause - pub async fn try_pause(&mut self) -> Result<()> { - self.send_op_control(codec::Signal::Pause).await?; - - Ok(()) - } - - /// Try to ask the background computation to resume - pub async fn try_resume(&mut self) -> Result<()> { - self.send_op_control(codec::Signal::Resume).await?; - - Ok(()) - } - - /// Send control signal to server process - async fn send_op_control(&mut self, sig: codec::Signal) -> Result<()> { - debug!("Send control signal {:?}", sig); - let op = codec::ServerOp::Control(sig); - self.send_op(op).await?; - - Ok(()) - } - - async fn send_op(&mut self, op: codec::ServerOp) -> Result<()> { - use tokio::io::AsyncWriteExt; - - self.stream.write_all(&op.encode()).await?; - self.stream.flush().await?; - - Ok(()) - } - } -} -// 03597617 ends here - -// [[file:../magman.note::f5489963][f5489963]] -mod server { - use super::*; - use interactive::new_interactive_task; - use interactive::TaskClient; - - use gut::fs::*; - use tokio::net::{UnixListener, UnixStream}; - - /// Computation server backended by unix domain socket - #[derive(Debug)] - pub struct Server { - socket_file: PathBuf, - listener: UnixListener, - stream: Option, - } - - fn remove_socket_file(s: &Path) -> Result<()> { - if s.exists() { - std::fs::remove_file(s)?; - } - - Ok(()) - } - - impl Server { - async fn wait_for_client_stream(&mut self) -> Result { - let (stream, _) = self.listener.accept().await.context("accept new unix socket client")?; - - Ok(stream) - } - } - - impl Drop for Server { - // clean up existing unix domain socket file - fn drop(&mut self) { - let _ = remove_socket_file(&self.socket_file); - } - } - - impl Server { - /// Create a new socket server. Return error if the server already started. - pub fn create>(path: P) -> Result { - let socket_file = path.as_ref().to_owned(); - if socket_file.exists() { - bail!("Socket server already started: {:?}!", socket_file); - } - - let listener = UnixListener::bind(&socket_file).context("bind socket")?; - debug!("serve socket {:?}", socket_file); - - Ok(Server { - listener, - socket_file, - stream: None, - }) - } - - /// Start and serve the client interaction. - pub async fn run_and_serve(&mut self, nodes: Vec) -> Result<()> { - // watch for user interruption - let ctrl_c = tokio::signal::ctrl_c(); - - // state will be shared with different tasks - let (mut server, client) = new_interactive_task(); - - let h = server.run_and_serve(nodes); - tokio::pin!(h); - - tokio::select! { - _ = ctrl_c => { - // it is hard to exit background computations gracefully, - // for now we do nothing special here - info!("User interrupted. Shutting down ..."); - }, - res = &mut h => { - if let Err(e) = res { - error!("Task server error: {:?}", e); - } - }, - _ = async { - info!("server: start main loop ..."); - for i in 0.. { - // wait for client requests - let mut client_stream = self.wait_for_client_stream().await.unwrap(); - debug!("new incoming connection {}", i); - let task = client.clone(); - // spawn a new task for each client - tokio::spawn(async move { handle_client_requests(client_stream, task).await }); - } - } => { - info!("main loop done?"); - } - } - - Ok(()) - } - } - - async fn handle_client_requests(mut client_stream: UnixStream, mut task: TaskClient) { - use codec::ServerOp; - - while let Ok(op) = ServerOp::decode(&mut client_stream).await { - match op { - ServerOp::Command((cmdline, wrk_dir)) => { - debug!("client asked to run {cmdline:?} in {wrk_dir:?}"); - match task.interact(&cmdline, &wrk_dir).await { - Ok(txt) => { - debug!("sending client text read from stdout"); - if let Err(err) = codec::send_msg_encode(&mut client_stream, &txt).await { - error!("send client msg error: {err:?}"); - } - } - Err(err) => { - error!("interaction error: {:?}", err); - } - } - } - ServerOp::Control(sig) => { - debug!("client sent control signal {:?}", sig); - match sig { - codec::Signal::Quit => task.terminate().await.ok(), - codec::Signal::Pause => task.pause().await.ok(), - codec::Signal::Resume => task.resume().await.ok(), - codec::Signal::AddNode(node) => task.add_node(node).await.ok(), - }; - } - _ => { - unimplemented!(); - } - } - } - } -} -// f5489963 ends here - -// [[file:../magman.note::0ec87ebc][0ec87ebc]] -pub use client::Client; -pub use server::Server; - -impl Client { - pub async fn interact_with_remote_session(&mut self, cmd: &str, wrk_dir: &str) -> Result<()> { - let o = self.interact(cmd, wrk_dir).await?; - println!("stdout from server side:\n{o}"); - - Ok(()) - } -} -// 0ec87ebc ends here diff --git a/src/restful.rs b/src/restful.rs deleted file mode 100644 index e6330a0..0000000 --- a/src/restful.rs +++ /dev/null @@ -1,485 +0,0 @@ -// [[file:../magman.note::b8081727][b8081727]] -use super::*; - -use warp::Filter; -// b8081727 ends here - -// [[file:../magman.note::50e6ed5a][50e6ed5a]] -/// Represents a computational job inputted by user. -#[derive(Debug, Deserialize, Serialize)] -#[serde(default)] -pub struct Job { - /// The content of running script - script: String, - - /// A unique random name - name: String, - - /// Path to a file for saving output stream of computation. - pub out_file: PathBuf, - - /// Path to a file for saving error stream of computation. - pub err_file: PathBuf, - - /// Path to a script file that defining how to start computation - pub run_file: PathBuf, -} - -impl Default for Job { - fn default() -> Self { - Self { - script: "pwd".into(), - name: random_name(), - out_file: "job.out".into(), - err_file: "job.err".into(), - run_file: "run".into(), - } - } -} - -impl Job { - /// Construct a Job running shell script. - /// - /// # Parameters - /// - /// * script: the content of the script for running the job. - /// - pub fn new(script: &str) -> Self { - Self { - script: script.into(), - ..Default::default() - } - } - - /// Return the job name - pub fn name(&self) -> String { - self.name.clone() - } -} - -fn random_name() -> String { - use rand::distributions::Alphanumeric; - use rand::Rng; - - let mut rng = rand::thread_rng(); - std::iter::repeat(()) - .map(|()| rng.sample(Alphanumeric)) - .map(char::from) - .take(6) - .collect() -} -// 50e6ed5a ends here - -// [[file:../magman.note::e19bce71][e19bce71]] -use std::path::{Path, PathBuf}; - -use gosh::runner::prelude::SpawnSessionExt; -use gosh::runner::process::Session; - -use tempfile::{tempdir, tempdir_in, TempDir}; -use tokio::io::AsyncWriteExt; -// e19bce71 ends here - -// [[file:../magman.note::955c926a][955c926a]] -/// Computation represents a submitted `Job` -pub struct Computation { - job: Job, - - /// command session. The drop order is above Tempdir - session: Option>, - - /// The working directory of computation - wrk_dir: TempDir, -} -// 955c926a ends here - -// [[file:../magman.note::a65e6dae][a65e6dae]] -impl Computation { - /// The full path to the working directory for running the job. - fn wrk_dir(&self) -> &Path { - self.wrk_dir.path() - } - - /// The full path to computation output file (stdout). - fn out_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.out_file) - } - - /// The full path to computation error file (stderr). - fn err_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.err_file) - } - - /// The full path to the script for running the job. - fn run_file(&self) -> PathBuf { - self.wrk_dir().join(&self.job.run_file) - } -} -// a65e6dae ends here - -// [[file:../magman.note::f8672e0c][f8672e0c]] -impl Job { - /// Submit the job and turn it into Computation. - pub fn submit(self) -> Result { - Computation::try_run(self) - } -} - -impl Computation { - /// create run file and make sure it executable later - fn create_run_file(&self) -> Result<()> { - let run_file = &self.run_file(); - gut::fs::write_script_file(run_file, &self.job.script)?; - wait_file(&run_file, 2)?; - - Ok(()) - } - - /// Construct `Computation` of user inputted `Job`. - fn try_run(job: Job) -> Result { - use std::fs::File; - - // create working directory in scratch space. - let wdir = tempfile::TempDir::new_in(".").expect("temp dir"); - let session = Self { - job, - wrk_dir: wdir.into(), - session: None, - }; - - session.create_run_file()?; - - Ok(session) - } - - /// Wait for background command to complete. - async fn wait(&mut self) -> Result<()> { - if let Some(s) = self.session.as_mut() { - let ecode = s.child.wait().await?; - info!("job session exited: {}", ecode); - if !ecode.success() { - error!("job exited unsuccessfully!"); - let txt = gut::fs::read_file(self.run_file())?; - let run = format!("run file: {txt:?}"); - let txt = gut::fs::read_file(self.err_file())?; - let err = format!("stderr: {txt:?}"); - bail!("Job failed with error:\n{run:?}{err:?}"); - } - Ok(()) - } else { - bail!("Job not started yet."); - } - } - - /// Run command in background. - async fn start(&mut self) -> Result<()> { - let program = self.run_file(); - let wdir = self.wrk_dir(); - info!("job work direcotry: {}", wdir.display()); - - let mut session = tokio::process::Command::new(&program) - .current_dir(wdir) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn_session()?; - - let mut stdout = session.child.stdout.take().expect("child did not have a handle to stdout"); - let mut stderr = session.child.stderr.take().expect("child did not have a handle to stderr"); - - // redirect stdout and stderr to files for user inspection. - let mut fout = tokio::fs::File::create(self.out_file()).await?; - let mut ferr = tokio::fs::File::create(self.err_file()).await?; - tokio::io::copy(&mut stdout, &mut fout).await?; - tokio::io::copy(&mut stderr, &mut ferr).await?; - - let sid = session.handler().id(); - info!("command running in session {:?}", sid); - self.session = session.into(); - - Ok(()) - } - - /// Start computation, and wait and return its standard output - pub async fn wait_for_output(&mut self) -> Result { - self.start().await?; - self.wait().await?; - let txt = gut::fs::read_file(self.out_file())?; - Ok(txt) - } - - /// Return true if session already has been started. - fn is_started(&self) -> bool { - self.session.is_some() - } -} -// f8672e0c ends here - -// [[file:../magman.note::34c67980][34c67980]] -impl Computation { - /// Check if job has been done correctly. - fn is_done(&self) -> bool { - let runfile = self.run_file(); - let outfile = self.out_file(); - - if self.wrk_dir().is_dir() { - if outfile.is_file() && runfile.is_file() { - if let Ok(time2) = outfile.metadata().and_then(|m| m.modified()) { - if let Ok(time1) = runfile.metadata().and_then(|m| m.modified()) { - if time2 >= time1 { - return true; - } - } - } - } - } - - false - } - - /// Update file timestamps to make sure `is_done` call return true. - fn fake_done(&self) { - todo!() - } -} -// 34c67980 ends here - -// [[file:../magman.note::08048436][08048436]] -use std::sync::atomic; - -static SERVER_BUSY: atomic::AtomicBool = atomic::AtomicBool::new(false); - -fn server_busy() -> bool { - SERVER_BUSY.load(atomic::Ordering::SeqCst) -} - -fn server_mark_busy() { - if !server_busy() { - SERVER_BUSY.store(true, atomic::Ordering::SeqCst); - } else { - panic!("server is already busy") - } -} - -fn server_mark_free() { - if server_busy() { - SERVER_BUSY.store(false, atomic::Ordering::SeqCst); - } else { - panic!("server is already free") - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -enum ComputationResult { - JobCompleted(String), - JobFailed(String), - NotStarted(String), -} -// 08048436 ends here - -// [[file:../magman.note::07c5146c][07c5146c]] -mod handlers { - use super::*; - - /// POST /jobs with JSON body - pub async fn create_job(job: Job) -> Result { - if !server_busy() { - server_mark_busy(); - let comput = job.submit(); - match comput { - Ok(mut comput) => match comput.wait_for_output().await { - Ok(out) => { - server_mark_free(); - let ret = ComputationResult::JobCompleted(out); - Ok(warp::reply::json(&ret)) - } - Err(err) => { - server_mark_free(); - let msg = format!("{err:?}"); - let ret = ComputationResult::JobFailed(msg); - Ok(warp::reply::json(&ret)) - } - }, - Err(err) => { - server_mark_free(); - let msg = format!("failed to create job: {err:?}"); - error!("{msg}"); - let ret = ComputationResult::JobFailed(msg); - Ok(warp::reply::json(&ret)) - } - } - } else { - server_mark_free(); - let msg = format!("Server is busy"); - let ret = ComputationResult::NotStarted(msg); - Ok(warp::reply::json(&ret)) - } - } -} -// 07c5146c ends here - -// [[file:../magman.note::a5b61fa9][a5b61fa9]] -mod filters { - use super::*; - - fn json_body() -> impl Filter + Clone { - warp::body::content_length_limit(1024 * 3200).and(warp::body::json()) - } - - /// POST /jobs with JSON body - async fn job_run() -> impl Filter + Clone { - warp::path!("jobs") - .and(warp::post()) - .and(json_body()) - .and_then(handlers::create_job) - } - - pub async fn api() -> impl Filter + Clone { - job_run().await - } -} -// a5b61fa9 ends here - -// [[file:../magman.note::1dd5d4ed][1dd5d4ed]] -mod server { - use super::*; - use std::fmt::Debug; - use std::net::{SocketAddr, ToSocketAddrs}; - - /// Computation server. - pub struct Server { - pub address: SocketAddr, - } - - impl Server { - pub fn new(addr: impl ToSocketAddrs + Debug) -> Self { - let addrs: Vec<_> = addr.to_socket_addrs().expect("bad address").collect(); - assert!(addrs.len() > 0, "invalid server address: {addr:?}"); - Self { address: addrs[0] } - } - } -} -// 1dd5d4ed ends here - -// [[file:../magman.note::e324852d][e324852d]] -/// Submit job remotely using REST api service -pub struct RemoteComputation { - job: Job, - client: reqwest::blocking::Client, - service_uri: String, -} - -impl RemoteComputation { - pub async fn wait_for_output(&self) -> Result { - let resp = self - .client - .post(&self.service_uri) - .json(&self.job) - .send()? - .text() - .context("client requests to create job")?; - Ok(resp) - } -} - -impl Job { - /// Remote submission using RESTful service - pub fn submit_remote(self, server_address: &str) -> Result { - // NOTE: the default request timeout is 30 seconds. Here we disable - // timeout using reqwest builder. - let client = reqwest::blocking::Client::builder().timeout(None).build()?; - let uri = format!("http://{}/jobs/", server_address); - let comput = RemoteComputation { - job: self, - service_uri: uri, - client, - }; - - Ok(comput) - } -} -// e324852d ends here - -// [[file:../magman.note::62b9ac23][62b9ac23]] -impl server::Server { - pub async fn bind(addr: &str) { - let server = Self::new(addr); - server.serve().await; - } - - async fn serve(&self) { - let (tx, rx) = tokio::sync::oneshot::channel(); - let services = warp::serve(filters::api().await); - let (addr, server) = services.bind_with_graceful_shutdown(self.address, async { - rx.await.ok(); - }); - println!("listening on {addr:?}"); - - let ctrl_c = tokio::signal::ctrl_c(); - tokio::select! { - _ = server => { - eprintln!("server closed"); - } - _ = ctrl_c => { - let _ = tx.send(()); - eprintln!("user interruption"); - } - } - } -} -// 62b9ac23 ends here - -// [[file:../magman.note::e91e1d87][e91e1d87]] -pub mod cli { - use super::*; - use gut::cli::*; - - /// Application server for remote calculations. - #[derive(StructOpt, Debug)] - struct Cli { - #[structopt(flatten)] - verbose: gut::cli::Verbosity, - - /// Set application server address for binding. - /// - /// * Example - /// - /// - app-server localhost:3030 (default) - /// - app-server tower:7070 - #[structopt(name = "ADDRESS", default_value = "localhost:3030")] - address: String, - } - - #[tokio::main] - pub async fn server_enter_main() -> Result<()> { - let args = Cli::from_args(); - args.verbose.setup_logger(); - server::Server::bind(&args.address).await; - - Ok(()) - } -} -// e91e1d87 ends here - -// [[file:../magman.note::27b117b8][27b117b8]] -#[cfg(test)] -mod tests { - use super::*; - use warp::test::request; - - #[tokio::test] - async fn test_warp_post() { - let api = filters::api().await; - let resp = request().method("POST").path("/jobs").json(&job_pwd()).reply(&api).await; - assert!(resp.status().is_success()); - let x: ComputationResult = serde_json::from_slice(&resp.body()).unwrap(); - assert_eq!(x, ComputationResult::JobCompleted("/tmp\n".into())); - } - - fn job_pwd() -> Job { - let job = Job::new("cd /tmp; pwd"); - let x = serde_json::to_string(&job); - dbg!(x); - job - } -} -// 27b117b8 ends here