Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(main): 引入多线程读取 ctl 命名管道,用信号机制改写 service 和 task 的状态检查 #47

Merged
merged 6 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ path = "systemctl/src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
nix = "0.23"
hashbrown = "0.11"
cfg-if = { version = "1.0" }
lazy_static = { version = "1.4.0" }
libc = "0.2"
humantime = "2.1"

tokio = { version = "1.25", features = ["full"] }
[profile.release]
panic = 'abort'

Expand Down
15 changes: 10 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ mod systemctl;
mod task;
mod time;
mod unit;

use crate::executor::Executor;
use error::ErrorFormat;
use manager::{timer_manager::TimerManager, Manager};
use parse::UnitParser;
use std::thread;
use systemctl::listener::Systemctl;

use crate::executor::Executor;
use unit::signal::init_signal_handler;

pub struct FileDescriptor(usize);

Expand Down Expand Up @@ -54,6 +54,13 @@ fn main() {
println!("Parse {} success!", path);
}

// 初始化信号处理程序
init_signal_handler();
// 监听systemctl
thread::spawn(move || {
Systemctl::ctl_listen();
});

// 启动完服务后进入主循环
loop {
// 检查各服务运行状态
Expand All @@ -62,7 +69,5 @@ fn main() {
Manager::check_cmd_proc();
// 检查计时器任务
TimerManager::check_timer();
// 监听systemctl
Systemctl::ctl_listen();
}
}
140 changes: 77 additions & 63 deletions src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,92 +7,106 @@ pub use unit_manager::*;
use crate::executor::ExitStatus;

use self::timer_manager::TimerManager;

use crate::unit::signal::SIGCHILD_SIGNAL_RECEIVED;
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
use nix::unistd::Pid;
use std::sync::atomic::Ordering;
pub struct Manager;

impl Manager {
/// ## 检查当前DragonReach运行的项目状态,并对其分发处理
/// ## 检查当前 DragonReach 运行的项目状态,并对其分发处理
pub fn check_running_status() {
// 检查正在运行的Unit
let mut running_manager = RUNNING_TABLE.write().unwrap();
let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new();
for unit in running_manager.mut_running_table() {
let proc = unit.1;
match proc.try_wait() {
//进程正常退出
Ok(Some(status)) => {
exited_unit.push((
*unit.0,
ExitStatus::from_exit_code(status.code().unwrap_or(0)),
));
}
//进程错误退出(或启动失败)
Err(e) => {
eprintln!("unit error: {}", e);

//test
exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0)));
if SIGCHILD_SIGNAL_RECEIVED
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let mut exited_unit: Vec<(usize, ExitStatus)> = Vec::new();
let mut running_manager = RUNNING_TABLE.write().unwrap();
// 检查所有运行中的 Unit
for unit in running_manager.mut_running_table() {
let pid = Pid::from_raw(unit.1.id() as i32);
// 检查 Unit 的运行状态
match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) {
// 若 Unit 为正常退出,则将其加入退出列表
Ok(WaitStatus::Exited(_, status)) => {
exited_unit.push((*unit.0, ExitStatus::from_exit_code(status)));
}
// 若 Unit 为被信号终止,则将其加入退出列表,并输出日志
Ok(WaitStatus::Signaled(_, signal, _)) => {
eprintln!("unit terminated by signal: {}", signal);
exited_unit.push((*unit.0, ExitStatus::from_exit_code(!0)));
}
// 其他错误情况
Err(_) => {
eprintln!("unit waitpid error");
}
// 若 Unit 正常运行,则不做处理
Ok(_) => {}
}
//进程处于正常运行状态
_ => {}
}
}
//释放锁,以便后续删除操作能拿到锁
drop(running_manager);

// 处理退出的Unit
for tmp in exited_unit {
// 从运行表中擦除该unit
UnitManager::remove_running(tmp.0);
drop(running_manager);

// 取消该任务的定时器任务
TimerManager::cancel_timer(tmp.0);
// 处理退出的 Unit
for tmp in exited_unit {
// 将该任务从运行表中移除
UnitManager::remove_running(tmp.0);

let _ = UnitManager::get_unit_with_id(&tmp.0)
.unwrap()
.lock()
.unwrap()
.exit(); //交付给相应类型的Unit类型去执行退出后的逻辑
// 取消该任务的定时器任务
TimerManager::cancel_timer(tmp.0);

TimerManager::update_next_trigger(tmp.0, false); //更新所有归属于此unit的计时器
// 交付处理子进程退出逻辑
let _ = UnitManager::get_unit_with_id(&tmp.0)
.unwrap()
.lock()
.unwrap()
.exit();

// 交付处理子进程退出逻辑
let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap();
unit.lock().unwrap().after_exit(tmp.1);
}
// 更新属于该 Unit 的定时器任务
TimerManager::update_next_trigger(tmp.0, false);

// 若无运行中任务,则取出IDLE任务运行
if UnitManager::running_count() == 0 {
let unit = UnitManager::pop_a_idle_service();
match unit {
Some(unit) => {
// 交付处理子进程退出后逻辑
let unit = UnitManager::get_unit_with_id(&tmp.0).unwrap();
unit.lock().unwrap().after_exit(tmp.1);
}
// 若无运行中任务,则取出 IDLE 任务运行
if UnitManager::running_count() == 0 {
if let Some(unit) = UnitManager::pop_a_idle_service() {
let _ = unit.lock().unwrap().run();
}
None => {}
}
}
}

/// ## 检查当前所有cmd进程的运行状态
pub fn check_cmd_proc() {
let mut exited = Vec::new();
let mut table = CMD_PROCESS_TABLE.write().unwrap();
for tuple in table.iter_mut() {
let mut proc = tuple.1.lock().unwrap();
match proc.try_wait() {
// 正常运行
Ok(None) => {}
// 停止运行,从表中删除数据
_ => {
// TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行
// 后续应该添加机制来执行服务相关命令启动失败的回调
exited.push(*tuple.0);
if SIGCHILD_SIGNAL_RECEIVED
.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let mut exited = Vec::new();
let mut table = CMD_PROCESS_TABLE.write().unwrap();

for tuple in table.iter_mut() {
let pid = Pid::from_raw(tuple.1.lock().unwrap().id() as i32);
match waitpid(Some(pid), Some(WaitPidFlag::WNOHANG)) {
// 若 cmd 停止运行,则将其加入退出列表
Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {
eprintln!("cmd exited");
exited.push(*tuple.0);
}
Ok(_) => {}
Err(_) => {
// TODO: 应该添加错误处理,有一些命令执行失败会影响服务正常运行
// 后续应该添加机制来执行服务相关命令启动失败的回调
eprintln!("cmd waitpid error");
}
}
}
}

for id in exited {
table.remove(&id);
for id in exited {
table.remove(&id);
}
}
}
}
44 changes: 26 additions & 18 deletions src/systemctl/listener/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
use super::ctl_parser::{CommandOperation, CtlParser, Pattern};
use super::{ctl_path, DRAGON_REACH_CTL_PIPE};
use crate::error::ErrorFormat;
use crate::manager::ctl_manager::CtlManager;
use lazy_static::lazy_static;
use std::fs::{self, File};
use std::io::Read;
use std::os::fd::FromRawFd;
use std::sync::{Arc, Mutex};

use lazy_static::lazy_static;

use crate::error::ErrorFormat;
use crate::manager::ctl_manager::CtlManager;

use super::ctl_parser::{CommandOperation, CtlParser, Pattern};
use super::{ctl_path, DRAGON_REACH_CTL_PIPE};
use std::thread;
use std::time::Duration;

lazy_static! {
static ref CTL_READER: Mutex<Arc<File>> = {
let file = Systemctl::init_listener();
Mutex::new(Arc::new(file))
};
}

#[derive(Debug)]
pub struct Command {
pub(crate) operation: CommandOperation,
pub(crate) args: Option<Vec<String>>,
Expand Down Expand Up @@ -68,18 +67,27 @@ impl Systemctl {
/// 持续从系统服务控制管道中读取命令。
///
pub fn ctl_listen() {
println!("ctl listen");
let mut guard = CTL_READER.lock().unwrap();
let mut s = String::new();
if let Ok(size) = guard.read_to_string(&mut s) {
if size == 0 {
return;
}
match CtlParser::parse_ctl(&s) {
Ok(cmd) => {
let _ = CtlManager::exec_ctl(cmd);
loop {
s.clear();
match guard.read_to_string(&mut s) {
Ok(size) if size > 0 => match CtlParser::parse_ctl(&s) {
Ok(cmd) => {
let _ = CtlManager::exec_ctl(cmd);
}
Err(e) => {
eprintln!("Failed to parse command: {}", e.error_format());
}
},
Ok(_) => {
// 如果读取到的大小为0,说明没有数据可读,适当休眠
thread::sleep(Duration::from_millis(100));
}
Err(err) => {
eprintln!("parse tcl command error: {}", err.error_format());
Err(e) => {
eprintln!("Failed to read from pipe: {}", e);
break;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/unit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::parse::parse_util::UnitParseUtil;
use crate::parse::Segment;

pub mod service;
pub mod signal;
pub mod target;
pub mod timer;

use self::target::TargetUnit;

pub fn generate_unit_id() -> usize {
Expand Down
15 changes: 15 additions & 0 deletions src/unit/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use nix::sys::signal::{self, SigHandler, Signal};
use std::sync::atomic::{AtomicBool, Ordering};

pub static SIGCHILD_SIGNAL_RECEIVED: AtomicBool = AtomicBool::new(false);

extern "C" fn handle_sigchld(_: libc::c_int) {
SIGCHILD_SIGNAL_RECEIVED.store(true, Ordering::SeqCst);
}

pub fn init_signal_handler() {
unsafe {
signal::signal(Signal::SIGCHLD, SigHandler::Handler(handle_sigchld))
.expect("Error setting SIGUSR1 handler");
}
}
Loading