From 32c2329fcac1643ec051246503e51f26778f6fbc Mon Sep 17 00:00:00 2001 From: Val213 <112376067+val213@users.noreply.github.com> Date: Tue, 26 Nov 2024 23:33:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(main):=20=E5=BC=95=E5=85=A5=E5=A4=9A?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E8=AF=BB=E5=8F=96=20ctl=20=E5=91=BD=E5=90=8D?= =?UTF-8?q?=E7=AE=A1=E9=81=93=EF=BC=8C=E7=94=A8=E4=BF=A1=E5=8F=B7=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=E6=94=B9=E5=86=99=20service=20=E5=92=8C=20task=20?= =?UTF-8?q?=E7=9A=84=E7=8A=B6=E6=80=81=E6=A3=80=E6=9F=A5=20(#47)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 多线程读取ctl命名管道,信号机制改写service和task的状态检查 --- Cargo.toml | 3 +- src/main.rs | 15 ++-- src/manager/mod.rs | 140 +++++++++++++++++++--------------- src/systemctl/listener/mod.rs | 44 ++++++----- src/unit/mod.rs | 2 +- src/unit/signal.rs | 15 ++++ 6 files changed, 131 insertions(+), 88 deletions(-) create mode 100644 src/unit/signal.rs diff --git a/Cargo.toml b/Cargo.toml index 054421b..1b4d432 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,13 @@ path = "systemctl/src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +nix = "0.23" hashbrown = "0.11" cfg-if = { version = "1.0" } lazy_static = { version = "1.4.0" } libc = "0.2" humantime = "2.1" - +tokio = { version = "1.25", features = ["full"] } [profile.release] panic = 'abort' diff --git a/src/main.rs b/src/main.rs index 073e18a..935c5a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,13 +7,13 @@ mod systemctl; mod task; mod time; mod unit; - +use crate::executor::Executor; use error::ErrorFormat; use manager::{timer_manager::TimerManager, Manager}; use parse::UnitParser; +use std::thread; use systemctl::listener::Systemctl; - -use crate::executor::Executor; +use unit::signal::init_signal_handler; pub struct FileDescriptor(usize); @@ -54,6 +54,13 @@ fn main() { println!("Parse {} success!", path); } + // 初始化信号处理程序 + init_signal_handler(); + // 监听systemctl + thread::spawn(move || { + Systemctl::ctl_listen(); + }); + // 启动完服务后进入主循环 loop { // 检查各服务运行状态 @@ -62,7 +69,5 @@ fn main() { Manager::check_cmd_proc(); // 检查计时器任务 TimerManager::check_timer(); - // 监听systemctl - Systemctl::ctl_listen(); } } diff --git a/src/manager/mod.rs b/src/manager/mod.rs index a1f11db..8215ebe 100644 --- a/src/manager/mod.rs +++ b/src/manager/mod.rs @@ -7,92 +7,106 @@ pub use unit_manager::*; use crate::executor::ExitStatus; use self::timer_manager::TimerManager; - +use crate::unit::signal::SIGCHILD_SIGNAL_RECEIVED; +use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus}; +use nix::unistd::Pid; +use std::sync::atomic::Ordering; pub struct Manager; impl Manager { - /// ## 检查当前DragonReach运行的项目状态,并对其分发处理 + /// ## 检查当前 DragonReach 运行的项目状态,并对其分发处理 pub fn check_running_status() { - // 检查正在运行的Unit - let mut running_manager = RUNNING_TABLE.write().unwrap(); - let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new(); - for unit in running_manager.mut_running_table() { - let proc = unit.1; - match proc.try_wait() { - //进程正常退出 - Ok(Some(status)) => { - exited_unit.push(( - *unit.0, - ExitStatus::from_exit_code(status.code().unwrap_or(0)), - )); - } - //进程错误退出(或启动失败) - Err(e) => { - eprintln!("unit error: {}", e); - - //test - exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0))); + if SIGCHILD_SIGNAL_RECEIVED + .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new(); + let mut running_manager = RUNNING_TABLE.write().unwrap(); + // 检查所有运行中的 Unit + for unit in running_manager.mut_running_table() { + let pid = Pid::from_raw(unit.1.id() as i32); + // 检查 Unit 的运行状态 + match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) { + // 若 Unit 为正常退出,则将其加入退出列表 + Ok(WaitStatus::Exited(_, status)) => { + exited_unit.push((*unit.0, ExitStatus::from_exit_code(status))); + } + // 若 Unit 为被信号终止,则将其加入退出列表,并输出日志 + Ok(WaitStatus::Signaled(_, signal, _)) => { + eprintln!("unit terminated by signal: {}", signal); + exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0))); + } + // 其他错误情况 + Err(_) => { + eprintln!("unit waitpid error"); + } + // 若 Unit 正常运行,则不做处理 + Ok(_) => {} } - //进程处于正常运行状态 - _ => {} } - } - //释放锁,以便后续删除操作能拿到锁 - drop(running_manager); - // 处理退出的Unit - for tmp in exited_unit { - // 从运行表中擦除该unit - UnitManager::remove_running(tmp.0); + drop(running_manager); - // 取消该任务的定时器任务 - TimerManager::cancel_timer(tmp.0); + // 处理退出的 Unit + for tmp in exited_unit { + // 将该任务从运行表中移除 + UnitManager::remove_running(tmp.0); - let _ = UnitManager::get_unit_with_id(&tmp.0) - .unwrap() - .lock() - .unwrap() - .exit(); //交付给相应类型的Unit类型去执行退出后的逻辑 + // 取消该任务的定时器任务 + TimerManager::cancel_timer(tmp.0); - TimerManager::update_next_trigger(tmp.0, false); //更新所有归属于此unit的计时器 + // 交付处理子进程退出逻辑 + let _ = UnitManager::get_unit_with_id(&tmp.0) + .unwrap() + .lock() + .unwrap() + .exit(); - // 交付处理子进程退出逻辑 - let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap(); - unit.lock().unwrap().after_exit(tmp.1); - } + // 更新属于该 Unit 的定时器任务 + TimerManager::update_next_trigger(tmp.0, false); - // 若无运行中任务,则取出IDLE任务运行 - if UnitManager::running_count() == 0 { - let unit = UnitManager::pop_a_idle_service(); - match unit { - Some(unit) => { + // 交付处理子进程退出后逻辑 + let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap(); + unit.lock().unwrap().after_exit(tmp.1); + } + // 若无运行中任务,则取出 IDLE 任务运行 + if UnitManager::running_count() == 0 { + if let Some(unit) = UnitManager::pop_a_idle_service() { let _ = unit.lock().unwrap().run(); } - None => {} } } } /// ## 检查当前所有cmd进程的运行状态 pub fn check_cmd_proc() { - let mut exited = Vec::new(); - let mut table = CMD_PROCESS_TABLE.write().unwrap(); - for tuple in table.iter_mut() { - let mut proc = tuple.1.lock().unwrap(); - match proc.try_wait() { - // 正常运行 - Ok(None) => {} - // 停止运行,从表中删除数据 - _ => { - // TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行 - // 后续应该添加机制来执行服务相关命令启动失败的回调 - exited.push(*tuple.0); + if SIGCHILD_SIGNAL_RECEIVED + .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + let mut exited = Vec::new(); + let mut table = CMD_PROCESS_TABLE.write().unwrap(); + + for tuple in table.iter_mut() { + let pid = Pid::from_raw(tuple.1.lock().unwrap().id() as i32); + match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) { + // 若 cmd 停止运行,则将其加入退出列表 + Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => { + eprintln!("cmd exited"); + exited.push(*tuple.0); + } + Ok(_) => {} + Err(_) => { + // TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行 + // 后续应该添加机制来执行服务相关命令启动失败的回调 + eprintln!("cmd waitpid error"); + } } } - } - for id in exited { - table.remove(&id); + for id in exited { + table.remove(&id); + } } } } diff --git a/src/systemctl/listener/mod.rs b/src/systemctl/listener/mod.rs index ea1f2ec..e525095 100644 --- a/src/systemctl/listener/mod.rs +++ b/src/systemctl/listener/mod.rs @@ -1,15 +1,14 @@ +use super::ctl_parser::{CommandOperation, CtlParser, Pattern}; +use super::{ctl_path, DRAGON_REACH_CTL_PIPE}; +use crate::error::ErrorFormat; +use crate::manager::ctl_manager::CtlManager; +use lazy_static::lazy_static; use std::fs::{self, File}; use std::io::Read; use std::os::fd::FromRawFd; use std::sync::{Arc, Mutex}; - -use lazy_static::lazy_static; - -use crate::error::ErrorFormat; -use crate::manager::ctl_manager::CtlManager; - -use super::ctl_parser::{CommandOperation, CtlParser, Pattern}; -use super::{ctl_path, DRAGON_REACH_CTL_PIPE}; +use std::thread; +use std::time::Duration; lazy_static! { static ref CTL_READER: Mutex> = { @@ -17,7 +16,7 @@ lazy_static! { Mutex::new(Arc::new(file)) }; } - +#[derive(Debug)] pub struct Command { pub(crate) operation: CommandOperation, pub(crate) args: Option>, @@ -68,18 +67,27 @@ impl Systemctl { /// 持续从系统服务控制管道中读取命令。 /// pub fn ctl_listen() { + println!("ctl listen"); let mut guard = CTL_READER.lock().unwrap(); let mut s = String::new(); - if let Ok(size) = guard.read_to_string(&mut s) { - if size == 0 { - return; - } - match CtlParser::parse_ctl(&s) { - Ok(cmd) => { - let _ = CtlManager::exec_ctl(cmd); + loop { + s.clear(); + match guard.read_to_string(&mut s) { + Ok(size) if size > 0 => match CtlParser::parse_ctl(&s) { + Ok(cmd) => { + let _ = CtlManager::exec_ctl(cmd); + } + Err(e) => { + eprintln!("Failed to parse command: {}", e.error_format()); + } + }, + Ok(_) => { + // 如果读取到的大小为0,说明没有数据可读,适当休眠 + thread::sleep(Duration::from_millis(100)); } - Err(err) => { - eprintln!("parse tcl command error: {}", err.error_format()); + Err(e) => { + eprintln!("Failed to read from pipe: {}", e); + break; } } } diff --git a/src/unit/mod.rs b/src/unit/mod.rs index f38c77b..3922b33 100644 --- a/src/unit/mod.rs +++ b/src/unit/mod.rs @@ -12,9 +12,9 @@ use crate::parse::parse_util::UnitParseUtil; use crate::parse::Segment; pub mod service; +pub mod signal; pub mod target; pub mod timer; - use self::target::TargetUnit; pub fn generate_unit_id() -> usize { diff --git a/src/unit/signal.rs b/src/unit/signal.rs new file mode 100644 index 0000000..f1bbd92 --- /dev/null +++ b/src/unit/signal.rs @@ -0,0 +1,15 @@ +use nix::sys::signal::{self, SigHandler, Signal}; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub static SIGCHILD_SIGNAL_RECEIVED: AtomicBool = AtomicBool::new(false); + +extern "C" fn handle_sigchld(_: libc::c_int) { + SIGCHILD_SIGNAL_RECEIVED.store(true, Ordering::SeqCst); +} + +pub fn init_signal_handler() { + unsafe { + signal::signal(Signal::SIGCHLD, SigHandler::Handler(handle_sigchld)) + .expect("Error setting SIGUSR1 handler"); + } +}