Skip to content

Commit

Permalink
schedule: replace schedule lib to support throttling and multi schedule (#786)

Browse files Browse the repository at this point in the history

* schedule: replace schedule lib to support throttling and multi schedule

* use a newer version for scheduler lib
  • Loading branch information
4t145 authored Jun 27, 2024
1 parent 9c7c8e9 commit 4f836c4
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 166 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ strum = { version = "0.26", features = ["derive"] }
# tardis
# tardis = { version = "0.1.0-rc.15" }
# tardis = { path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "9e02713" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "366f128" }
#spacegate

# spacegate-shell = { path = "../spacegate/crates/shell", features = [
Expand Down
6 changes: 2 additions & 4 deletions backend/middlewares/schedule/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ bios-basic = { path = "../../basic", features = ["default"] }
bios-sdk-invoke = { path = "../../../frontend/sdks/invoke", features = [
"spi_log", "spi_kv", "event"
], default-features = false }
[dependencies.tokio-cron-scheduler]
git = "https://github.com/4t145/tokio-cron-scheduler.git"
branch = "time-local"
features = ["cron_local"]
tsuki-scheduler = { version = "0.1.3", features= ["cron", "tokio", "async-scheduler"]}

[dev-dependencies]
tardis = { workspace = true, features = ["test", "ws-client"] }
bios-basic = { path = "../../basic", features = ["default", "test"] }
Expand Down
299 changes: 138 additions & 161 deletions backend/middlewares/schedule/src/serv/schedule_job_serv.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use std::vec;
Expand All @@ -11,17 +9,15 @@ use bios_sdk_invoke::clients::spi_kv_client::{KvItemDetailResp, SpiKvClient};
use bios_sdk_invoke::clients::spi_log_client::{LogItemFindReq, SpiLogClient};
use bios_sdk_invoke::invoke_enumeration::InvokeModuleKind;
use tardis::basic::dto::TardisContext;
use tardis::basic::error::TardisError;
use tardis::basic::result::TardisResult;
use tardis::cache::AsyncCommands;
use tardis::chrono::{self, Utc};
use tardis::db::sea_orm::prelude::Uuid;
use tardis::chrono::{self, TimeDelta, Utc};
use tardis::log::{error, info, trace, warn};
use tardis::tokio::sync::RwLock;
use tardis::tokio::time;
use tardis::web::web_resp::{TardisPage, TardisResp};
use tardis::{serde_json, TardisFuns, TardisFunsInst};
use tokio_cron_scheduler::{Job, JobScheduler};
use tsuki_scheduler::prelude::*;

use crate::dto::schedule_job_dto::{KvScheduleJobItemDetailResp, ScheduleJobAddOrModifyReq, ScheduleJobInfoResp, ScheduleJobKvSummaryResp, ScheduleTaskInfoResp};
use crate::schedule_config::ScheduleConfig;
Expand Down Expand Up @@ -241,25 +237,22 @@ impl ScheduleTaskServ {
#[derive(Clone)]
pub struct OwnedScheduleTaskServ {
#[allow(clippy::type_complexity)]
pub code_uuid: Arc<RwLock<HashMap<String, Vec<(Uuid, String)>>>>,
pub scheduler: Arc<JobScheduler>,
pub code_uuid: Arc<RwLock<HashMap<String, TaskUid>>>,
pub client: AsyncSchedulerClient<Tokio>,
}

impl OwnedScheduleTaskServ {
pub async fn init(funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<Arc<Self>> {
let cache_client = funs.cache();
let mut scheduler = JobScheduler::new().await.expect("fail to create job scheduler for schedule mw");
scheduler.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
info!("mw-schedule: global scheduler shutted down");
})
}));
scheduler.init().await.expect("fail to init job scheduler for schedule mw");
scheduler.start().await.expect("fail to start job scheduler for schedule mw");
let code_uuid_cache_raw = Arc::new(RwLock::new(HashMap::<String, Vec<(Uuid, String)>>::new()));
let scheduler = AsyncSchedulerRunner::tokio();
let client = scheduler.client();
tardis::tokio::spawn(async move {
let _ = scheduler.run().await;
});
let code_uuid_cache_raw = Arc::new(RwLock::new(HashMap::<String, TaskUid>::new()));
let serv_raw = Arc::new(Self {
code_uuid: code_uuid_cache_raw,
scheduler: Arc::new(scheduler),
client,
});
let serv = serv_raw.clone();
let sync_db_ctx = ctx.clone();
Expand Down Expand Up @@ -371,162 +364,146 @@ impl OwnedScheduleTaskServ {
let enable_time = job_config.enable_time;
let disable_time = job_config.disable_time;
// startup cron scheduler
for cron in job_config.cron {
let code = code.clone();
let ctx = ctx.clone();
let lock_key = lock_key.clone();
let mut schedule_builder = job_config.cron.iter().filter_map(|cron| Cron::local_from_cron_expr(cron).ok()).fold(ScheduleDynBuilder::default(), ScheduleDynBuilder::or);
if let Some(enable_time) = enable_time {
schedule_builder = schedule_builder.after(enable_time);
}
if let Some(disable_time) = disable_time {
if disable_time < Utc::now() {
return Ok(());
}
schedule_builder = schedule_builder.before(disable_time);
}
schedule_builder = schedule_builder.throttling(TimeDelta::minutes(1));
let task = Task::tokio(schedule_builder, move || {
let callback_req = callback_req.try_clone().expect("body should be a string");
let code = code.clone();
let ctx = ctx.clone();
let lock_key = lock_key.clone();
let job = Job::new_async(cron.as_str(), move |_uuid: Uuid, _scheduler: JobScheduler| -> Pin<Box<dyn Future<Output = ()> + Send>> {
let callback_req = callback_req.try_clone().expect("body should be a string");
let code = code.clone();
let lock_key = lock_key.clone();
let ctx = ctx.clone();
if let Some(enable_time) = enable_time {
if enable_time > Utc::now() {
return Box::pin(async move {
trace!("schedule task {code} is not enabled yet, skip", code = code);
});
}
}
if let Some(disable_time) = disable_time {
if disable_time < Utc::now() {
return Box::pin(async move {
trace!("schedule task {code} is disabled, skip", code = code);
});
}
}
Box::pin(async move {
let cache_client = TardisFuns::cache();
let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
// about set and setnx, see:
// 1. https://redis.io/commands/set/
// 2. https://redis.io/commands/setnx/
// At Redis version 2.6.12, setnx command is regarded as deprecated. see: https://redis.io/commands/setnx/
// "executing" could be any string now, it's just a placeholder
match cache_client.set_nx(&lock_key, "executing").await {
Ok(true) => {
// safety: it's ok to unwrap in this closure, scheduler will restart this job when after panic
let Ok(()) = cache_client.expire(&lock_key, distributed_lock_expire_sec as i64).await else {
return;
};
trace!("executing schedule task {code}");
// 1. write log exec start
let Ok(_) = SpiLogClient::add(
"schedule_task",
format!("schedule task {} exec start", code).as_str(),
None,
None,
Some(code.to_string()),
Some("exec-start".to_string()),
Some(tardis::chrono::Utc::now().to_rfc3339()),
Some(Utc::now().to_rfc3339()),
None,
None,
&funs,
&ctx,
)
.await
else {
return;
};
// 2. request webhook
match TardisFuns::web_client().raw().execute(callback_req).await {
Ok(resp) => {
let status_code = resp.status();
let remote_addr = resp.remote_addr().as_ref().map(SocketAddr::to_string);
let response_header: HashMap<String, String> = resp
.headers()
.into_iter()
.filter_map(|(k, v)| {
let v = v.to_str().ok()?.to_string();
Some((k.to_string(), v))
})
.collect();
let ext = serde_json::json! {
{
"remote_addr": remote_addr,
"status_code": status_code.to_string(),
"headers": response_header
}
};
let content = resp.text().await.unwrap_or_default();
// 3.1. write log exec end
let Ok(_) = SpiLogClient::add(
"schedule_task",
&content,
Some(ext),
None,
Some(code.to_string()),
Some("exec-end".to_string()),
None,
Some(Utc::now().to_rfc3339()),
None,
None,
&funs,
&ctx,
)
.await
else {
return;
};
}
Err(e) => {
// 3.2. write log exec end
let Ok(_) = SpiLogClient::add(
"schedule_task",
&e.to_string(),
None,
None,
Some(code.to_string()),
Some("exec-fail".to_string()),
None,
Some(Utc::now().to_rfc3339()),
None,
None,
&funs,
&ctx,
)
.await
else {
return;
};
}
let ctx = ctx.clone();

async move {
let cache_client = TardisFuns::cache();
let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
// about set and setnx, see:
// 1. https://redis.io/commands/set/
// 2. https://redis.io/commands/setnx/
// At Redis version 2.6.12, setnx command is regarded as deprecated. see: https://redis.io/commands/setnx/
// "executing" could be any string now, it's just a placeholder
match cache_client.set_nx(&lock_key, "executing").await {
Ok(true) => {
// safety: it's ok to unwrap in this closure, scheduler will restart this job when after panic
let Ok(()) = cache_client.expire(&lock_key, distributed_lock_expire_sec as i64).await else {
return;
};
trace!("executing schedule task {code}");
// 1. write log exec start
let Ok(_) = SpiLogClient::add(
"schedule_task",
format!("schedule task {} exec start", code).as_str(),
None,
None,
Some(code.to_string()),
Some("exec-start".to_string()),
Some(tardis::chrono::Utc::now().to_rfc3339()),
Some(Utc::now().to_rfc3339()),
None,
None,
&funs,
&ctx,
)
.await
else {
return;
};
// 2. request webhook
match TardisFuns::web_client().raw().execute(callback_req).await {
Ok(resp) => {
let status_code = resp.status();
let remote_addr = resp.remote_addr().as_ref().map(SocketAddr::to_string);
let response_header: HashMap<String, String> = resp
.headers()
.into_iter()
.filter_map(|(k, v)| {
let v = v.to_str().ok()?.to_string();
Some((k.to_string(), v))
})
.collect();
let ext = serde_json::json! {
{
"remote_addr": remote_addr,
"status_code": status_code.to_string(),
"headers": response_header
}
};
let content = resp.text().await.unwrap_or_default();
// 3.1. write log exec end
let Ok(_) = SpiLogClient::add(
"schedule_task",
&content,
Some(ext),
None,
Some(code.to_string()),
Some("exec-end".to_string()),
None,
Some(Utc::now().to_rfc3339()),
None,
None,
&funs,
&ctx,
)
.await
else {
return;
};
}
Err(e) => {
// 3.2. write log exec end
let Ok(_) = SpiLogClient::add(
"schedule_task",
&e.to_string(),
None,
None,
Some(code.to_string()),
Some("exec-fail".to_string()),
None,
Some(Utc::now().to_rfc3339()),
None,
None,
&funs,
&ctx,
)
.await
else {
return;
};
}
trace!("executed schedule task {code}");
}
Ok(false) => {
trace!("schedule task {} is executed by other nodes, skip", code);
}
Err(e) => {
error!("cannot set lock to schedule task {code}, error: {e}");
}
trace!("executed schedule task {code}");
}
})
})
.map_err(|err| {
let msg = format!("fail to create job: {}", err);
TardisError::internal_error(&msg, "500-middlewares-schedual-create-task-failed")
})?;
let uuid = self.scheduler.add(job).await.map_err(|err| {
let msg = format!("fail to add job: {}", err);
TardisError::internal_error(&msg, "500-middlewares-schedual-create-task-failed")
})?;
{
self.code_uuid.write().await.entry(job_config.code.to_string()).or_default().push((uuid, cron.clone()));
Ok(false) => {
trace!("schedule task {} is executed by other nodes, skip", code);
}
Err(e) => {
error!("cannot set lock to schedule task {code}, error: {e}");
}
}
}
});
let task_uid = TaskUid::uuid();
self.client.add_task(task_uid, task);
{
self.code_uuid.write().await.insert(job_config.code.to_string(), task_uid);
}
Ok(())
}

pub async fn delete(&self, code: &str) -> TardisResult<()> {
let mut uuid_cache = self.code_uuid.write().await;
if let Some(tasks) = uuid_cache.get(code) {
for (uuid, _) in tasks {
self.scheduler.remove(uuid).await.map_err(|err| {
let msg = format!("fail to remove job: {}", err);
TardisError::internal_error(&msg, "500-middlewares-schedual-create-task-failed")
})?;
}
if let Some(uid) = uuid_cache.get(code) {
self.client.remove_task(*uid);
uuid_cache.remove(code);
}
Ok(())
Expand Down

0 comments on commit 4f836c4

Please sign in to comment.