diff --git a/Cargo.toml b/Cargo.toml
index c0af5471d..ee85b6ea3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,7 +64,7 @@ strum = { version = "0.26", features = ["derive"] }
 # tardis
 # tardis = { version = "0.1.0-rc.15" }
 # tardis = { path = "../tardis/tardis" }
-tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "9e02713" }
+tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "366f128" }
 
 #spacegate
 # spacegate-shell = { path = "../spacegate/crates/shell", features = [
diff --git a/backend/middlewares/schedule/Cargo.toml b/backend/middlewares/schedule/Cargo.toml
index 0c0db6bb6..0fd38c738 100644
--- a/backend/middlewares/schedule/Cargo.toml
+++ b/backend/middlewares/schedule/Cargo.toml
@@ -27,10 +27,8 @@ bios-basic = { path = "../../basic", features = ["default"] }
 bios-sdk-invoke = { path = "../../../frontend/sdks/invoke", features = [
     "spi_log", "spi_kv", "event"
 ], default-features = false }
-[dependencies.tokio-cron-scheduler]
-git = "https://github.com/4t145/tokio-cron-scheduler.git"
-branch = "time-local"
-features = ["cron_local"]
+tsuki-scheduler = { version = "0.1.3", features= ["cron", "tokio", "async-scheduler"]}
+
 [dev-dependencies]
 tardis = { workspace = true, features = ["test", "ws-client"] }
 bios-basic = { path = "../../basic", features = ["default", "test"] }
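The root manifest bumps the pinned tardis revision, and the schedule middleware swaps a git fork of tokio-cron-scheduler (pinned to its `time-local` branch for local-time cron evaluation) for the crates.io release of tsuki-scheduler. A minimal sketch of what the `cron` feature covers, assuming the `tsuki_scheduler::prelude` API used throughout this patch; the expression itself is illustrative:

```rust
use tsuki_scheduler::prelude::*;

fn main() {
    // Parse a cron expression evaluated against the machine's local time
    // zone; the old fork needed its patched `cron_local` feature for this.
    let every_day_at_3am = Cron::local_from_cron_expr("0 0 3 * * *").expect("valid cron expression");
    // Schedules compose into a dynamic schedule via the builder.
    let _schedule = ScheduleDynBuilder::default().or(every_day_at_3am);
}
```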
diff --git a/backend/middlewares/schedule/src/serv/schedule_job_serv.rs b/backend/middlewares/schedule/src/serv/schedule_job_serv.rs
index e00c7cfd6..420d77f80 100644
--- a/backend/middlewares/schedule/src/serv/schedule_job_serv.rs
+++ b/backend/middlewares/schedule/src/serv/schedule_job_serv.rs
@@ -1,7 +1,5 @@
 use std::collections::HashMap;
-use std::future::Future;
 use std::net::SocketAddr;
-use std::pin::Pin;
 use std::sync::{Arc, OnceLock};
 use std::time::Duration;
 use std::vec;
@@ -11,17 +9,15 @@ use bios_sdk_invoke::clients::spi_kv_client::{KvItemDetailResp, SpiKvClient};
 use bios_sdk_invoke::clients::spi_log_client::{LogItemFindReq, SpiLogClient};
 use bios_sdk_invoke::invoke_enumeration::InvokeModuleKind;
 use tardis::basic::dto::TardisContext;
-use tardis::basic::error::TardisError;
 use tardis::basic::result::TardisResult;
 use tardis::cache::AsyncCommands;
-use tardis::chrono::{self, Utc};
-use tardis::db::sea_orm::prelude::Uuid;
+use tardis::chrono::{self, TimeDelta, Utc};
 use tardis::log::{error, info, trace, warn};
 use tardis::tokio::sync::RwLock;
 use tardis::tokio::time;
 use tardis::web::web_resp::{TardisPage, TardisResp};
 use tardis::{serde_json, TardisFuns, TardisFunsInst};
-use tokio_cron_scheduler::{Job, JobScheduler};
+use tsuki_scheduler::prelude::*;
 
 use crate::dto::schedule_job_dto::{KvScheduleJobItemDetailResp, ScheduleJobAddOrModifyReq, ScheduleJobInfoResp, ScheduleJobKvSummaryResp, ScheduleTaskInfoResp};
 use crate::schedule_config::ScheduleConfig;
@@ -241,25 +237,22 @@ impl ScheduleTaskServ {
 #[derive(Clone)]
 pub struct OwnedScheduleTaskServ {
     #[allow(clippy::type_complexity)]
-    pub code_uuid: Arc<RwLock<HashMap<String, Vec<(Uuid, String)>>>>,
-    pub scheduler: Arc<JobScheduler>,
+    pub code_uuid: Arc<RwLock<HashMap<String, TaskUid>>>,
+    pub client: AsyncSchedulerClient<Tokio>,
 }
 
 impl OwnedScheduleTaskServ {
     pub async fn init(funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<Arc<Self>> {
         let cache_client = funs.cache();
-        let mut scheduler = JobScheduler::new().await.expect("fail to create job scheduler for schedule mw");
-        scheduler.set_shutdown_handler(Box::new(|| {
-            Box::pin(async move {
-                info!("mw-schedule: global scheduler shutted down");
-            })
-        }));
-        scheduler.init().await.expect("fail to init job scheduler for schedule mw");
-        scheduler.start().await.expect("fail to start job scheduler for schedule mw");
-        let code_uuid_cache_raw = Arc::new(RwLock::new(HashMap::<String, Vec<(Uuid, String)>>::new()));
+        let scheduler = AsyncSchedulerRunner::tokio();
+        let client = scheduler.client();
+        tardis::tokio::spawn(async move {
+            let _ = scheduler.run().await;
+        });
+        let code_uuid_cache_raw = Arc::new(RwLock::new(HashMap::<String, TaskUid>::new()));
         let serv_raw = Arc::new(Self {
             code_uuid: code_uuid_cache_raw,
-            scheduler: Arc::new(scheduler),
+            client,
        });
         let serv = serv_raw.clone();
         let sync_db_ctx = ctx.clone();
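The `init` hunk above replaces the started-in-place `JobScheduler` with a runner/client pair: the runner is consumed by a spawned polling loop, while the service keeps only the cheaply cloneable client, so the old `Arc<JobScheduler>` disappears. A self-contained sketch of that lifecycle, assuming the same prelude API as the patch; the schedule, task body, and timings are illustrative:

```rust
use std::time::Duration;

use tsuki_scheduler::prelude::*;

#[tokio::main]
async fn main() {
    // The runner owns the task store and is consumed by the polling loop.
    let runner = AsyncSchedulerRunner::tokio();
    // The client is a cloneable handle used to add and remove tasks later.
    let client = runner.client();
    tokio::spawn(async move {
        let _ = runner.run().await;
    });

    // Register a task under a uid we keep, so it can be removed afterwards.
    let uid = TaskUid::uuid();
    let schedule = ScheduleDynBuilder::default().or(Cron::local_from_cron_expr("*/30 * * * * *").expect("valid cron"));
    client.add_task(uid, Task::tokio(schedule, || async { println!("tick"); }));

    tokio::time::sleep(Duration::from_secs(90)).await;
    client.remove_task(uid);
}
```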
@@ -371,162 +364,146 @@ impl OwnedScheduleTaskServ {
         let enable_time = job_config.enable_time;
         let disable_time = job_config.disable_time;
         // startup cron scheduler
-        for cron in job_config.cron {
+        let code = code.clone();
+        let ctx = ctx.clone();
+        let lock_key = lock_key.clone();
+        let mut schedule_builder = job_config.cron.iter().filter_map(|cron| Cron::local_from_cron_expr(cron).ok()).fold(ScheduleDynBuilder::default(), ScheduleDynBuilder::or);
+        if let Some(enable_time) = enable_time {
+            schedule_builder = schedule_builder.after(enable_time);
+        }
+        if let Some(disable_time) = disable_time {
+            if disable_time < Utc::now() {
+                return Ok(());
+            }
+            schedule_builder = schedule_builder.before(disable_time);
+        }
+        schedule_builder = schedule_builder.throttling(TimeDelta::minutes(1));
+        let task = Task::tokio(schedule_builder, move || {
             let callback_req = callback_req.try_clone().expect("body should be a string");
             let code = code.clone();
-            let ctx = ctx.clone();
             let lock_key = lock_key.clone();
-            let job = Job::new_async(cron.as_str(), move |_uuid: Uuid, _scheduler: JobScheduler| -> Pin<Box<dyn Future<Output = ()> + Send>> {
-                let callback_req = callback_req.try_clone().expect("body should be a string");
-                let code = code.clone();
-                let lock_key = lock_key.clone();
-                let ctx = ctx.clone();
-                if let Some(enable_time) = enable_time {
-                    if enable_time > Utc::now() {
-                        return Box::pin(async move {
-                            trace!("schedule task {code} is not enabled yet, skip", code = code);
-                        });
-                    }
-                }
-                if let Some(disable_time) = disable_time {
-                    if disable_time < Utc::now() {
-                        return Box::pin(async move {
-                            trace!("schedule task {code} is disabled, skip", code = code);
-                        });
-                    }
-                }
-                Box::pin(async move {
-                    let cache_client = TardisFuns::cache();
-                    let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
-                    // about set and setnx, see:
-                    // 1. https://redis.io/commands/set/
-                    // 2. https://redis.io/commands/setnx/
-                    // At Redis version 2.6.12, setnx command is regarded as deprecated. see: https://redis.io/commands/setnx/
-                    // "executing" could be any string now, it's just a placeholder
-                    match cache_client.set_nx(&lock_key, "executing").await {
-                        Ok(true) => {
-                            // safety: it's ok to unwrap in this closure, scheduler will restart this job when after panic
-                            let Ok(()) = cache_client.expire(&lock_key, distributed_lock_expire_sec as i64).await else {
-                                return;
-                            };
-                            trace!("executing schedule task {code}");
-                            // 1. write log exec start
-                            let Ok(_) = SpiLogClient::add(
-                                "schedule_task",
-                                format!("schedule task {} exec start", code).as_str(),
-                                None,
-                                None,
-                                Some(code.to_string()),
-                                Some("exec-start".to_string()),
-                                Some(tardis::chrono::Utc::now().to_rfc3339()),
-                                Some(Utc::now().to_rfc3339()),
-                                None,
-                                None,
-                                &funs,
-                                &ctx,
-                            )
-                            .await
-                            else {
-                                return;
-                            };
-                            // 2. request webhook
-                            match TardisFuns::web_client().raw().execute(callback_req).await {
-                                Ok(resp) => {
-                                    let status_code = resp.status();
-                                    let remote_addr = resp.remote_addr().as_ref().map(SocketAddr::to_string);
-                                    let response_header: HashMap<String, String> = resp
-                                        .headers()
-                                        .into_iter()
-                                        .filter_map(|(k, v)| {
-                                            let v = v.to_str().ok()?.to_string();
-                                            Some((k.to_string(), v))
-                                        })
-                                        .collect();
-                                    let ext = serde_json::json! {
-                                        {
-                                            "remote_addr": remote_addr,
-                                            "status_code": status_code.to_string(),
-                                            "headers": response_header
-                                        }
-                                    };
-                                    let content = resp.text().await.unwrap_or_default();
-                                    // 3.1. write log exec end
-                                    let Ok(_) = SpiLogClient::add(
-                                        "schedule_task",
-                                        &content,
-                                        Some(ext),
-                                        None,
-                                        Some(code.to_string()),
-                                        Some("exec-end".to_string()),
-                                        None,
-                                        Some(Utc::now().to_rfc3339()),
-                                        None,
-                                        None,
-                                        &funs,
-                                        &ctx,
-                                    )
-                                    .await
-                                    else {
-                                        return;
-                                    };
-                                }
-                                Err(e) => {
-                                    // 3.2. write log exec end
-                                    let Ok(_) = SpiLogClient::add(
-                                        "schedule_task",
-                                        &e.to_string(),
-                                        None,
-                                        None,
-                                        Some(code.to_string()),
-                                        Some("exec-fail".to_string()),
-                                        None,
-                                        Some(Utc::now().to_rfc3339()),
-                                        None,
-                                        None,
-                                        &funs,
-                                        &ctx,
-                                    )
-                                    .await
-                                    else {
-                                        return;
-                                    };
-                                }
-                            }
-                            trace!("executed schedule task {code}");
-                        }
-                        Ok(false) => {
-                            trace!("schedule task {} is executed by other nodes, skip", code);
-                        }
-                        Err(e) => {
-                            error!("cannot set lock to schedule task {code}, error: {e}");
-                        }
-                    }
-                })
-            })
-            .map_err(|err| {
-                let msg = format!("fail to create job: {}", err);
-                TardisError::internal_error(&msg, "500-middlewares-schedual-create-task-failed")
-            })?;
-            let uuid = self.scheduler.add(job).await.map_err(|err| {
-                let msg = format!("fail to add job: {}", err);
-                TardisError::internal_error(&msg, "500-middlewares-schedual-create-task-failed")
-            })?;
-            {
-                self.code_uuid.write().await.entry(job_config.code.to_string()).or_default().push((uuid, cron.clone()));
-            }
-        }
+            let ctx = ctx.clone();
+
+            async move {
+                let cache_client = TardisFuns::cache();
+                let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
+                // about set and setnx, see:
+                // 1. https://redis.io/commands/set/
+                // 2. https://redis.io/commands/setnx/
+                // At Redis version 2.6.12, setnx command is regarded as deprecated. see: https://redis.io/commands/setnx/
+                // "executing" could be any string now, it's just a placeholder
+                match cache_client.set_nx(&lock_key, "executing").await {
+                    Ok(true) => {
+                        // safety: it's ok to unwrap in this closure, scheduler will restart this job when after panic
+                        let Ok(()) = cache_client.expire(&lock_key, distributed_lock_expire_sec as i64).await else {
+                            return;
+                        };
+                        trace!("executing schedule task {code}");
+                        // 1. write log exec start
+                        let Ok(_) = SpiLogClient::add(
+                            "schedule_task",
+                            format!("schedule task {} exec start", code).as_str(),
+                            None,
+                            None,
+                            Some(code.to_string()),
+                            Some("exec-start".to_string()),
+                            Some(tardis::chrono::Utc::now().to_rfc3339()),
+                            Some(Utc::now().to_rfc3339()),
+                            None,
+                            None,
+                            &funs,
+                            &ctx,
+                        )
+                        .await
+                        else {
+                            return;
+                        };
+                        // 2. request webhook
+                        match TardisFuns::web_client().raw().execute(callback_req).await {
+                            Ok(resp) => {
+                                let status_code = resp.status();
+                                let remote_addr = resp.remote_addr().as_ref().map(SocketAddr::to_string);
+                                let response_header: HashMap<String, String> = resp
+                                    .headers()
+                                    .into_iter()
+                                    .filter_map(|(k, v)| {
+                                        let v = v.to_str().ok()?.to_string();
+                                        Some((k.to_string(), v))
+                                    })
+                                    .collect();
+                                let ext = serde_json::json! {
+                                    {
+                                        "remote_addr": remote_addr,
+                                        "status_code": status_code.to_string(),
+                                        "headers": response_header
+                                    }
+                                };
+                                let content = resp.text().await.unwrap_or_default();
+                                // 3.1. write log exec end
+                                let Ok(_) = SpiLogClient::add(
+                                    "schedule_task",
+                                    &content,
+                                    Some(ext),
+                                    None,
+                                    Some(code.to_string()),
+                                    Some("exec-end".to_string()),
+                                    None,
+                                    Some(Utc::now().to_rfc3339()),
+                                    None,
+                                    None,
+                                    &funs,
+                                    &ctx,
+                                )
+                                .await
+                                else {
+                                    return;
+                                };
+                            }
+                            Err(e) => {
+                                // 3.2. write log exec end
+                                let Ok(_) = SpiLogClient::add(
+                                    "schedule_task",
+                                    &e.to_string(),
+                                    None,
+                                    None,
+                                    Some(code.to_string()),
+                                    Some("exec-fail".to_string()),
+                                    None,
+                                    Some(Utc::now().to_rfc3339()),
+                                    None,
+                                    None,
+                                    &funs,
+                                    &ctx,
+                                )
+                                .await
+                                else {
+                                    return;
+                                };
+                            }
+                        }
+                        trace!("executed schedule task {code}");
+                    }
+                    Ok(false) => {
+                        trace!("schedule task {} is executed by other nodes, skip", code);
+                    }
+                    Err(e) => {
+                        error!("cannot set lock to schedule task {code}, error: {e}");
+                    }
+                }
+            }
+        });
+        let task_uid = TaskUid::uuid();
+        self.client.add_task(task_uid, task);
+        {
+            self.code_uuid.write().await.insert(job_config.code.to_string(), task_uid);
+        }
         Ok(())
     }
 
     pub async fn delete(&self, code: &str) -> TardisResult<()> {
         let mut uuid_cache = self.code_uuid.write().await;
-        if let Some(tasks) = uuid_cache.get(code) {
-            for (uuid, _) in tasks {
-                self.scheduler.remove(uuid).await.map_err(|err| {
-                    let msg = format!("fail to remove job: {}", err);
-                    TardisError::internal_error(&msg, "500-middlewares-schedual-create-task-failed")
-                })?;
-            }
+        if let Some(uid) = uuid_cache.get(code) {
+            self.client.remove_task(*uid);
             uuid_cache.remove(code);
         }
         Ok(())
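The add path now builds one dynamic schedule per job instead of registering one scheduler job per cron expression. A condensed sketch of that composition, assuming the same builder API shown in the hunk; the window values are illustrative:

```rust
use tardis::chrono::{TimeDelta, Utc};
use tsuki_scheduler::prelude::*;

// OR-fold several cron expressions into one schedule, clamp it to an
// enable/disable window, and throttle so firing points from overlapping
// expressions collapse into at most one run per minute.
fn build_schedule(crons: &[String]) -> ScheduleDynBuilder {
    let mut builder = crons.iter().filter_map(|c| Cron::local_from_cron_expr(c).ok()).fold(ScheduleDynBuilder::default(), ScheduleDynBuilder::or);
    builder = builder.after(Utc::now());                       // e.g. job_config.enable_time
    builder = builder.before(Utc::now() + TimeDelta::days(7)); // e.g. job_config.disable_time
    builder.throttling(TimeDelta::minutes(1))
}
```

Deletion then reduces to a single `remove_task` call on the stored `TaskUid`, which is why `code_uuid` shrinks from a `Vec<(Uuid, String)>` per code to one `TaskUid`.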