From a45f17aaa3ed4413b23158091df3c78570e49e19 Mon Sep 17 00:00:00 2001 From: hubertshelley Date: Tue, 27 Feb 2024 11:14:47 +0800 Subject: [PATCH 1/5] scheduler support --- .gitignore | 2 + examples/scheduler/Cargo.lock | 7 + examples/scheduler/Cargo.toml | 10 ++ examples/scheduler/src/main.rs | 24 +++ silent/Cargo.toml | 5 +- silent/src/core/request.rs | 14 +- silent/src/error/mod.rs | 5 +- silent/src/lib.rs | 7 + silent/src/route/root.rs | 9 +- silent/src/scheduler/mod.rs | 193 +++++++++++++++++++++++++ silent/src/scheduler/process_time.rs | 91 ++++++++++++ silent/src/scheduler/storage/mod.rs | 2 + silent/src/scheduler/storage/mysql.rs | 1 + silent/src/scheduler/storage/traits.rs | 8 + silent/src/scheduler/task.rs | 168 +++++++++++++++++++++ silent/src/service/mod.rs | 8 + 16 files changed, 550 insertions(+), 4 deletions(-) create mode 100644 examples/scheduler/Cargo.lock create mode 100644 examples/scheduler/Cargo.toml create mode 100644 examples/scheduler/src/main.rs create mode 100644 silent/src/scheduler/mod.rs create mode 100644 silent/src/scheduler/process_time.rs create mode 100644 silent/src/scheduler/storage/mod.rs create mode 100644 silent/src/scheduler/storage/mysql.rs create mode 100644 silent/src/scheduler/storage/traits.rs create mode 100644 silent/src/scheduler/task.rs diff --git a/.gitignore b/.gitignore index 4fc9956..25b5b9f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ static/ py_env/ .DS_Store build_rs_cov.profraw +migrations/ +/src \ No newline at end of file diff --git a/examples/scheduler/Cargo.lock b/examples/scheduler/Cargo.lock new file mode 100644 index 0000000..e7cecae --- /dev/null +++ b/examples/scheduler/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "hello-world" +version = "0.1.0" diff --git a/examples/scheduler/Cargo.toml b/examples/scheduler/Cargo.toml new file mode 100644 index 0000000..ca8403d --- /dev/null +++ b/examples/scheduler/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "example-scheduler" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4" +silent = { path = "../../silent", features = ["scheduler"] } diff --git a/examples/scheduler/src/main.rs b/examples/scheduler/src/main.rs new file mode 100644 index 0000000..b42dbd4 --- /dev/null +++ b/examples/scheduler/src/main.rs @@ -0,0 +1,24 @@ +use chrono::Utc; +use silent::prelude::*; +use std::sync::Arc; + +fn main() { + logger::fmt().with_max_level(Level::INFO).init(); + let route = Route::new("").get(|req| async move { + let process_time = Utc::now() + chrono::Duration::seconds(5); + let task = Task::create_with_action_async( + "task_id".to_string(), + process_time.try_into().unwrap(), + "task description".to_string(), + Arc::new(|| { + Box::pin(async { + println!("task run: {:?}", Utc::now()); + Ok(()) + }) + }), + ); + req.scheduler().lock().await.add_task(task)?; + Ok("hello world") + }); + Server::new().run(route); +} diff --git a/silent/Cargo.toml b/silent/Cargo.toml index 8369b8a..b00794c 100644 --- a/silent/Cargo.toml +++ b/silent/Cargo.toml @@ -16,7 +16,7 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = ["server", "test", "static"] -full = ["admin", "server", "multipart", "ws", "sse", "security", "static", "session", "cookie", "template", "test"] +full = ["admin", "server", "multipart", "ws", "sse", "security", "static", "session", "cookie", "template", "test", "scheduler"] admin = ["server", "sse", "template", "session"] server = ["tokio/fs", "tokio/net", "tokio/rt-multi-thread", "tokio/signal"] ws = [] @@ -29,6 +29,7 @@ cookie = [] template = [] #wasi = ["tokio/sync"] test = ["tokio/macros", "tokio/rt"] +scheduler = [] [dependencies] thiserror = "1.0.53" @@ -71,3 +72,5 @@ http = "1.0.0" http-body = "1.0.0" futures = "0.3.30" tokio-util = "0.7.10" +anyhow = "1.0.79" +cron = "0.12.0" diff --git a/silent/src/core/request.rs b/silent/src/core/request.rs index cf107a5..4191c8d 100644 --- a/silent/src/core/request.rs +++ b/silent/src/core/request.rs @@ -5,18 +5,24 @@ use crate::core::req_body::ReqBody; #[cfg(feature = "multipart")] use crate::core::serde::from_str_multi_val; use crate::header::CONTENT_TYPE; +#[cfg(feature = "scheduler")] +use crate::Scheduler; use crate::{Configs, SilentError}; #[cfg(feature = "cookie")] use cookie::{Cookie, CookieJar}; use http::request::Parts; +use http::Request as BaseRequest; use http::{Extensions, HeaderMap, HeaderValue, Method, Uri, Version}; -use http::{Request as BaseRequest, StatusCode}; use http_body_util::BodyExt; use mime::Mime; use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; use std::net::IpAddr; +#[cfg(feature = "scheduler")] +use std::sync::Arc; +#[cfg(feature = "scheduler")] +use tokio::sync::Mutex; use tokio::sync::OnceCell; use url::form_urlencoded; @@ -346,4 +352,10 @@ impl Request { { self.cookies.get(name.as_ref()) } + #[cfg(feature = "scheduler")] + #[inline] + /// Get `Scheduler` from extensions. + pub fn scheduler(&self) -> &Arc> { + self.extensions().get().unwrap() + } } diff --git a/silent/src/error/mod.rs b/silent/src/error/mod.rs index b0e4888..ed4dafb 100644 --- a/silent/src/error/mod.rs +++ b/silent/src/error/mod.rs @@ -34,7 +34,7 @@ pub enum SilentError { HyperError(#[from] hyper::Error), /// 上传文件读取 错误 #[error("upload file read error `{0}`")] - FileEmpty(#[from] crate::multer::Error), + FileEmpty(#[from] multer::Error), /// Body为空 错误 #[error("body is empty")] BodyEmpty, @@ -56,6 +56,9 @@ pub enum SilentError { /// websocket错误 #[error("websocket error: {0}")] WsError(String), + /// anyhow错误 + #[error("{0}")] + AnyhowError(#[from] anyhow::Error), /// 业务错误 #[error("business error: {msg} ({code})")] BusinessError { diff --git a/silent/src/lib.rs b/silent/src/lib.rs index 6da32b4..c19fa53 100644 --- a/silent/src/lib.rs +++ b/silent/src/lib.rs @@ -11,6 +11,8 @@ pub mod middleware; mod route; #[cfg(feature = "server")] mod rt; +#[cfg(feature = "scheduler")] +mod scheduler; #[cfg(feature = "security")] mod security; #[cfg(feature = "server")] @@ -25,6 +27,7 @@ mod templates; mod ws; // use silent_multer as multer; +#[allow(unused_imports)] #[allow(clippy::single_component_path_imports)] use multer; // use silent_tokio_tungstenite as tokio_tungstenite; @@ -40,6 +43,8 @@ pub use handler::Handler; pub use handler::HandlerWrapper; pub use headers; pub use hyper::{header, Method, StatusCode}; +#[cfg(feature = "scheduler")] +pub use scheduler::{ProcessTime, Scheduler, Task}; pub mod prelude { pub use crate::configs::Configs; @@ -60,6 +65,8 @@ pub mod prelude { pub use crate::route::handler_append::WSHandlerAppend; pub use crate::route::handler_append::{HandlerAppend, HandlerGetter}; pub use crate::route::{Route, RouteService}; + #[cfg(feature = "scheduler")] + pub use crate::scheduler::Task; #[cfg(feature = "security")] pub use crate::security::{argon2, pbkdf2}; #[cfg(feature = "server")] diff --git a/silent/src/route/root.rs b/silent/src/route/root.rs index 4ec65e9..0f2dcf9 100644 --- a/silent/src/route/root.rs +++ b/silent/src/route/root.rs @@ -5,7 +5,7 @@ use crate::route::Route; use crate::session::SessionMiddleware; #[cfg(feature = "template")] use crate::templates::TemplateMiddleware; -use crate::{Configs, MiddleWareHandler, Request, Response, SilentError, StatusCode}; +use crate::{Configs, MiddleWareHandler, Request, Response, Scheduler, SilentError, StatusCode}; #[cfg(feature = "session")] use async_session::{Session, SessionStore}; use chrono::Utc; @@ -13,6 +13,7 @@ use std::fmt; use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; +use tokio::sync::Mutex; #[derive(Clone, Default)] pub struct RootRoute { @@ -22,6 +23,8 @@ pub struct RootRoute { #[cfg(feature = "session")] pub(crate) session_set: bool, pub(crate) configs: Option, + #[cfg(feature = "scheduler")] + pub(crate) scheduler: Arc>, } impl fmt::Debug for RootRoute { @@ -45,6 +48,8 @@ impl RootRoute { #[cfg(feature = "session")] session_set: false, configs: None, + #[cfg(feature = "scheduler")] + scheduler: Arc::new(Mutex::new(Scheduler::new())), } } @@ -108,6 +113,8 @@ impl RootRoute { root_middlewares.push(middleware); } } + #[cfg(feature = "scheduler")] + req.extensions_mut().insert(self.scheduler.clone()); let mut pre_res = Response::empty(); let mut exception_str = None; let res: SilentResult = { diff --git a/silent/src/scheduler/mod.rs b/silent/src/scheduler/mod.rs new file mode 100644 index 0000000..6a6d8f7 --- /dev/null +++ b/silent/src/scheduler/mod.rs @@ -0,0 +1,193 @@ +mod process_time; +mod storage; +mod task; + +use anyhow::{anyhow, Result}; +use std::sync::Arc; +use std::thread; +use tokio::sync::Mutex; +use tracing::{error, info}; + +pub use process_time::ProcessTime; +pub use task::Task; + +#[derive(Debug, Clone)] +pub struct Scheduler { + tasks: Vec, + schedule: bool, +} + +impl Default for Scheduler { + fn default() -> Self { + Self::new() + } +} + +impl Scheduler { + pub fn new() -> Self { + Self { + tasks: Vec::new(), + schedule: true, + } + } + + pub fn add_task(&mut self, task: Task) -> Result<()> { + if self.tasks.iter().any(|t| t.id == task.id) { + return Err(anyhow!(format!("task {id} already exists!", id = task.id))); + } + info!( + "task: ID:{:?} Description:{:?} ProcessTime:{:?} add success!", + task.id, task.description, task.process_time + ); + self.tasks.push(task); + Ok(()) + } + + pub fn remove_task(&mut self, id: &str) { + info!("task: ID:{:?} remove success!", id); + self.tasks.retain(|t| t.id != id); + } + + pub fn remove_task_sub(&mut self, id: &str) { + info!("sub task: ID:{:?} remove success!", id); + self.tasks.retain(|t| t.id.starts_with(id)); + } + + pub fn get_task(&self, id: &str) -> Option<&Task> { + self.tasks.iter().find(|t| t.id == id) + } + + pub fn get_tasks(&self) -> &Vec { + &self.tasks + } + pub async fn run(&mut self) { + let mut removable_list = Vec::new(); + for task in self.tasks.clone() { + if task.is_removable() { + removable_list.push(task.id.clone()); + } + if task.is_async { + tokio::spawn(async move { + match task.clone().run_async().await { + Ok(_) => info!("task: ID:{:?} Description:{:?} ProcessTime:{:?} run success!", task.id, task.description, task.process_time), + Err(e) => error!("task: ID:{:?} Description:{:?} ProcessTime:{:?} run failed! error: {:?}", task.id, task.description, task.process_time, e), + } + }); + } else { + thread::spawn(move || match task.clone().run() { + Ok(_) => info!( + "task: ID:{:?} Description:{:?} ProcessTime:{:?} run success!", + task.id, task.description, task.process_time + ), + Err(e) => error!( + "task: ID:{:?} Description:{:?} ProcessTime:{:?} run failed! error: {:?}", + task.id, task.description, task.process_time, e + ), + }); + } + } + for id in removable_list { + self.remove_task(&id); + } + } + + pub fn stop(&mut self) { + self.schedule = false; + } + + pub async fn schedule(schedule: Arc>) { + loop { + schedule.lock().await.run().await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if !schedule.lock().await.schedule { + schedule.lock().await.schedule = true; + break; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use tokio::sync::Mutex; + + use crate::scheduler::process_time::ProcessTime; + use crate::scheduler::task::Task; + use crate::scheduler::Scheduler; + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_scheduler_async() { + let mut scheduler = Scheduler::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone1 = counter.clone(); + let sync_task = Task::create_with_action( + "sync_task".to_string(), + ProcessTime::try_from("2015-01-01T00:00:00Z".to_string()).unwrap(), + "sync_task".to_string(), + Arc::new(move || { + counter_clone1.fetch_add(1, Ordering::SeqCst); + Ok(()) + }), + ); + scheduler.add_task(sync_task).unwrap(); + let counter_clone2 = counter.clone(); + let async_task = Task::create_with_action_async( + "async_task".to_string(), + ProcessTime::try_from("2015-01-01T00:00:00Z".to_string()).unwrap(), + "async_task".to_string(), + Arc::new(move || { + let counter_clone = counter_clone2.clone(); + Box::pin(async move { + counter_clone.fetch_add(1, Ordering::SeqCst); + Ok(()) + }) + }), + ); + scheduler.add_task(async_task.clone()).unwrap(); + println!("{:?}", scheduler.get_tasks()); + assert_eq!(scheduler.get_tasks().len(), 2); + assert!(scheduler.add_task(async_task.clone()).is_err()); + scheduler.remove_task(&async_task.id); + scheduler.add_task(async_task).unwrap(); + assert!(scheduler.get_task("async_task").is_some()); + let arc_scheduler = Arc::new(Mutex::new(scheduler)); + let arc_scheduler_clone = arc_scheduler.clone(); + tokio::spawn(async move { + Scheduler::schedule(arc_scheduler_clone).await; + }); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + assert_eq!(counter.load(Ordering::SeqCst), 2); + arc_scheduler.lock().await.stop(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_scheduler() { + let mut scheduler = Scheduler::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let async_task = Task::create_with_action_async( + "async_task".to_string(), + ProcessTime::try_from("*/5 * * * * * *".to_string()).unwrap(), + "async_task".to_string(), + Arc::new(move || { + let counter_clone = counter_clone.clone(); + Box::pin(async move { + counter_clone.fetch_add(1, Ordering::SeqCst); + Ok(()) + }) + }), + ); + scheduler.add_task(async_task).unwrap(); + assert!(scheduler.get_task("async_task").is_some()); + let arc_scheduler = Arc::new(Mutex::new(scheduler)); + let arc_scheduler_clone = arc_scheduler.clone(); + tokio::spawn(async move { + Scheduler::schedule(arc_scheduler_clone).await; + }); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + assert_eq!(counter.load(Ordering::SeqCst), 1); + arc_scheduler.lock().await.stop(); + } +} diff --git a/silent/src/scheduler/process_time.rs b/silent/src/scheduler/process_time.rs new file mode 100644 index 0000000..dcf5d5f --- /dev/null +++ b/silent/src/scheduler/process_time.rs @@ -0,0 +1,91 @@ +use chrono::{DateTime, Local, Utc}; +use cron::Schedule; +use serde::{Serialize, Serializer}; +use std::str::FromStr; + +#[derive(Debug, Clone)] +pub enum ProcessTime { + Datetime(DateTime), + Crontab(Schedule), +} + +impl Serialize for ProcessTime { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + ProcessTime::Datetime(d) => serializer.serialize_str(&d.to_string()), + ProcessTime::Crontab(s) => serializer.serialize_str(&s.to_string()), + } + } +} + +impl TryFrom for ProcessTime { + type Error = anyhow::Error; + + fn try_from(value: String) -> Result { + match DateTime::::from_str(&value) { + Ok(datetime) => Ok(ProcessTime::Datetime(datetime)), + Err(_) => Ok(ProcessTime::Crontab(Schedule::from_str(&value)?)), + } + } +} + +impl TryFrom<&str> for ProcessTime { + type Error = anyhow::Error; + + fn try_from(value: &str) -> Result { + match DateTime::::from_str(value) { + Ok(datetime) => Ok(ProcessTime::Datetime(datetime)), + Err(_) => Ok(ProcessTime::Crontab(Schedule::from_str(value)?)), + } + } +} + +impl TryFrom> for ProcessTime { + type Error = anyhow::Error; + + fn try_from(value: DateTime) -> Result { + Ok(ProcessTime::Datetime(value)) + } +} + +impl TryFrom> for ProcessTime { + type Error = anyhow::Error; + + fn try_from(value: DateTime) -> Result { + Ok(ProcessTime::Datetime(value.into())) + } +} + +impl ProcessTime { + pub(crate) fn is_active(&self) -> bool { + match self { + ProcessTime::Datetime(datetime) => { + datetime.timestamp_millis() <= Local::now().timestamp_millis() + } + ProcessTime::Crontab(crontab) => crontab.includes(Local::now()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_process_time() { + let datetime = Local::now(); + let process_time = ProcessTime::Datetime(datetime + chrono::Duration::seconds(10)); + assert!(!process_time.is_active()); + let process_time = ProcessTime::try_from(datetime).unwrap(); + assert!(process_time.is_active()); + let process_time = ProcessTime::Crontab(Schedule::from_str("* * * * * *").unwrap()); + assert!(process_time.is_active()); + let process_time = ProcessTime::Crontab(Schedule::from_str("0 0 0 1 1 ? 2015").unwrap()); + assert!(!process_time.is_active()); + assert!(ProcessTime::try_from("2023-01-01T00:00:00Z".to_string()).is_ok()); + assert!(ProcessTime::try_from("2023-01-01 00:00:00").is_err()); + } +} diff --git a/silent/src/scheduler/storage/mod.rs b/silent/src/scheduler/storage/mod.rs new file mode 100644 index 0000000..cd5b898 --- /dev/null +++ b/silent/src/scheduler/storage/mod.rs @@ -0,0 +1,2 @@ +mod mysql; +mod traits; diff --git a/silent/src/scheduler/storage/mysql.rs b/silent/src/scheduler/storage/mysql.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/silent/src/scheduler/storage/mysql.rs @@ -0,0 +1 @@ + diff --git a/silent/src/scheduler/storage/traits.rs b/silent/src/scheduler/storage/traits.rs new file mode 100644 index 0000000..14e6a66 --- /dev/null +++ b/silent/src/scheduler/storage/traits.rs @@ -0,0 +1,8 @@ +use async_trait::async_trait; + +// 定时任务存储Trait +#[async_trait] +trait Storage { + async fn load(&mut self); + async fn save(&mut self); +} diff --git a/silent/src/scheduler/task.rs b/silent/src/scheduler/task.rs new file mode 100644 index 0000000..9cc694d --- /dev/null +++ b/silent/src/scheduler/task.rs @@ -0,0 +1,168 @@ +use crate::scheduler::process_time::ProcessTime; +use anyhow::Result; +use serde::Serialize; +use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +pub type JobToRun = dyn Fn() -> Result<()> + Send + Sync; +pub type JobToRunAsync = dyn Fn() -> Pin> + Send>> + Send + Sync; + +#[derive(Clone, Serialize)] +pub struct Task { + pub id: String, + pub process_time: ProcessTime, + pub description: String, + #[serde(skip)] + action: Arc, + #[serde(skip)] + action_async: Arc, + #[serde(skip)] + pub(crate) is_async: bool, +} + +impl Debug for Task { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Task") + .field("id", &self.id) + .field("process_time", &self.process_time) + .field("description", &self.description) + .field("is_async", &self.is_async) + .finish() + } +} + +impl Task { + pub(crate) fn run(&self) -> Result<()> { + match self.is_async { + true => Err(anyhow::anyhow!("async task not support run")), + false => match self.process_time.is_active() { + true => (self.action.clone())(), + false => Ok(()), + }, + } + } + pub(crate) async fn run_async(&self) -> Result<()> { + match self.is_async { + true => match self.process_time.is_active() { + true => (self.action_async.clone())().await, + false => Ok(()), + }, + false => Err(anyhow::anyhow!("sync task not support run_async")), + } + } + + pub fn create_with_action( + id: String, + process_time: ProcessTime, + description: String, + action: Arc, + ) -> Self { + Self { + id, + process_time, + description, + action, + action_async: Arc::new(|| Box::pin(async { Ok(()) })), + is_async: false, + } + } + + pub fn create_with_action_async( + id: String, + process_time: ProcessTime, + description: String, + action_async: Arc, + ) -> Self { + Self { + id, + process_time, + description, + action: Arc::new(|| Ok(())), + action_async, + is_async: true, + } + } + + pub(crate) fn is_removable(&self) -> bool { + match self.process_time { + ProcessTime::Datetime(_) => self.process_time.is_active(), + ProcessTime::Crontab(_) => false, + } + } +} + +#[cfg(test)] +mod tests { + use crate::scheduler::process_time::ProcessTime; + use crate::scheduler::task::Task; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + #[test] + fn test_task() { + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let mut task = Task::create_with_action( + "test".to_string(), + ProcessTime::try_from("9999-01-01T00:00:00Z".to_string()).unwrap(), + "test".to_string(), + Arc::new(move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + Ok(()) + }), + ); + println!("{:?}", task); + task.run().unwrap(); + assert_eq!(counter.load(Ordering::SeqCst), 0); + task.process_time = ProcessTime::try_from("2023-01-01T00:00:00Z".to_string()).unwrap(); + task.run().unwrap(); + assert_eq!(counter.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_task_async_error() { + let sync_task = Task::create_with_action( + "test".to_string(), + ProcessTime::try_from("2015-01-01T00:00:00Z".to_string()).unwrap(), + "test".to_string(), + Arc::new(move || Ok(())), + ); + assert!(sync_task.run_async().await.is_err()); + assert!(sync_task.run().is_ok()); + assert!(sync_task.is_removable()); + let async_task = Task::create_with_action_async( + "test".to_string(), + ProcessTime::try_from("* * * * * * *".to_string()).unwrap(), + "test".to_string(), + Arc::new(move || Box::pin(async move { Ok(()) })), + ); + assert!(async_task.run().is_err()); + assert!(async_task.run_async().await.is_ok()); + assert!(!async_task.is_removable()) + } + + #[tokio::test] + async fn test_task_async() { + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let mut task = Task::create_with_action_async( + "test".to_string(), + ProcessTime::try_from("9999-01-01T00:00:00Z".to_string()).unwrap(), + "test".to_string(), + Arc::new(move || { + let counter_clone = counter_clone.clone(); + Box::pin(async move { + counter_clone.fetch_add(1, Ordering::SeqCst); + Ok(()) + }) + }), + ); + task.run_async().await.unwrap(); + assert_eq!(counter.load(Ordering::SeqCst), 0); + task.process_time = ProcessTime::try_from("2023-01-01T00:00:00Z".to_string()).unwrap(); + task.run_async().await.unwrap(); + assert_eq!(counter.load(Ordering::SeqCst), 1); + } +} diff --git a/silent/src/service/mod.rs b/silent/src/service/mod.rs index 9510347..926e181 100644 --- a/silent/src/service/mod.rs +++ b/silent/src/service/mod.rs @@ -5,6 +5,8 @@ use crate::conn::SilentConnection; use crate::route::RouteService; use crate::service::serve::Serve; use crate::Configs; +#[cfg(feature = "scheduler")] +use crate::Scheduler; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpListener; @@ -99,6 +101,12 @@ impl Server { root_route.set_configs(configs.clone()); #[cfg(feature = "session")] root_route.check_session(); + #[cfg(feature = "scheduler")] + let scheduler = root_route.scheduler.clone(); + #[cfg(feature = "scheduler")] + tokio::spawn(async move { + Scheduler::schedule(scheduler).await; + }); loop { #[cfg(unix)] From a69a8ba762284166ec65abbe463d545847cecb71 Mon Sep 17 00:00:00 2001 From: hubertshelley Date: Tue, 27 Feb 2024 14:26:29 +0800 Subject: [PATCH 2/5] publish v1.1.0 scheduler support --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a128baf..23bf316 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,4 @@ homepage = "https://github.com/hubertshelley/silent" license = "Apache-2.0" readme = "./readme.md" repository = "https://github.com/hubertshelley/silent" -version = "1.0.9" +version = "1.1.0" From a0ed703182d795d4ca1010d04253ba707b1a6845 Mon Sep 17 00:00:00 2001 From: hubertshelley Date: Tue, 27 Feb 2024 14:30:52 +0800 Subject: [PATCH 3/5] publish v1.1.0 scheduler support --- silent/src/route/root.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/silent/src/route/root.rs b/silent/src/route/root.rs index 0f2dcf9..690d57e 100644 --- a/silent/src/route/root.rs +++ b/silent/src/route/root.rs @@ -5,7 +5,9 @@ use crate::route::Route; use crate::session::SessionMiddleware; #[cfg(feature = "template")] use crate::templates::TemplateMiddleware; -use crate::{Configs, MiddleWareHandler, Request, Response, Scheduler, SilentError, StatusCode}; +#[cfg(feature = "scheduler")] +use crate::Scheduler; +use crate::{Configs, MiddleWareHandler, Request, Response, SilentError, StatusCode}; #[cfg(feature = "session")] use async_session::{Session, SessionStore}; use chrono::Utc; @@ -13,6 +15,7 @@ use std::fmt; use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; +#[cfg(feature = "scheduler")] use tokio::sync::Mutex; #[derive(Clone, Default)] From 9315098979fdfd0d815c029adc62fceffcd75f9e Mon Sep 17 00:00:00 2001 From: hubertshelley Date: Tue, 27 Feb 2024 16:29:49 +0800 Subject: [PATCH 4/5] dependencies upgrade --- examples/candle_whisper/Cargo.toml | 14 +++++++------- examples/configs/Cargo.toml | 2 +- examples/custom_handler/Cargo.toml | 2 +- examples/custom_tokio_listener/Cargo.toml | 4 ++-- examples/custom_tokio_runtime/Cargo.toml | 4 ++-- examples/exception_handler/Cargo.toml | 2 +- examples/form/Cargo.toml | 2 +- examples/llma_chat/Cargo.toml | 6 +++--- examples/middleware/Cargo.toml | 2 +- examples/multer_test/Cargo.toml | 2 +- examples/multipart-form/Cargo.toml | 2 +- examples/sse-chat/Cargo.toml | 2 +- examples/templates/Cargo.toml | 2 +- examples/todo/Cargo.toml | 6 +++--- examples/tokenizer_test/Cargo.toml | 4 ++-- examples/tokio-tungstenite_test/Cargo.toml | 2 +- examples/websocket-chat/Cargo.toml | 2 +- examples/websocket/Cargo.toml | 4 ++-- silent/Cargo.toml | 22 +++++++++++----------- 19 files changed, 43 insertions(+), 43 deletions(-) diff --git a/examples/candle_whisper/Cargo.toml b/examples/candle_whisper/Cargo.toml index ac71d05..bf2cd1f 100644 --- a/examples/candle_whisper/Cargo.toml +++ b/examples/candle_whisper/Cargo.toml @@ -17,16 +17,16 @@ cuda = ["candle-core/cuda", "candle-nn/cuda", "candle-transformers/cuda", "dep:b [build-dependencies] anyhow = { version = "1", features = ["backtrace"] } -bindgen_cuda = { version = "0.1.1", optional = true } +bindgen_cuda = { version = "0.1.4", optional = true } [dependencies] -clap = { version = "4.2.4", features = ["derive"] } +clap = { version = "4.5.1", features = ["derive"] } serde = { version = "1.0", features = ["derive"] } silent = { path = "../../silent", features = ["full"] } -symphonia = { version = "0.5.3", features = ["all"] } -anyhow = "1.0.79" -tokio = { version = "1.35.1", features = ["full"] } +symphonia = { version = "0.5.4", features = ["all"] } +anyhow = "1.0.80" +tokio = { version = "1.36.0", features = ["full"] } #candle-core = { version = "0.3.2" } #candle-nn = { version = "0.3.2" } @@ -36,7 +36,7 @@ candle-core = { git = "https://github.com/huggingface/candle" } candle-nn = { git = "https://github.com/huggingface/candle" } candle-transformers = { git = "https://github.com/huggingface/candle" } -tokenizers = { version = "0.15.0", features = ["onig"] } +tokenizers = { version = "0.15.2", features = ["onig"] } rand = "0.8.5" -serde_json = "1.0.109" +serde_json = "1.0.114" byteorder = "1.5.0" diff --git a/examples/configs/Cargo.toml b/examples/configs/Cargo.toml index a1269c6..bfa7ab8 100644 --- a/examples/configs/Cargo.toml +++ b/examples/configs/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -async-trait = "0.1.76" +async-trait = "0.1.77" diff --git a/examples/custom_handler/Cargo.toml b/examples/custom_handler/Cargo.toml index 11de182..cb8b9b2 100644 --- a/examples/custom_handler/Cargo.toml +++ b/examples/custom_handler/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -async-trait = "0.1.76" +async-trait = "0.1.77" diff --git a/examples/custom_tokio_listener/Cargo.toml b/examples/custom_tokio_listener/Cargo.toml index 792056c..e085ace 100644 --- a/examples/custom_tokio_listener/Cargo.toml +++ b/examples/custom_tokio_listener/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] silent = { path = "../../silent", features = ["full"] } -async-trait = "0.1.76" -tokio = { version = "1.35.1", features = ["full"] } +async-trait = "0.1.77" +tokio = { version = "1.36.0", features = ["full"] } diff --git a/examples/custom_tokio_runtime/Cargo.toml b/examples/custom_tokio_runtime/Cargo.toml index fcddf2c..a0fdeee 100644 --- a/examples/custom_tokio_runtime/Cargo.toml +++ b/examples/custom_tokio_runtime/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] silent = { path = "../../silent", features = ["full"] } -async-trait = "0.1.76" -tokio = { version = "1.35.1", features = ["full"] } +async-trait = "0.1.77" +tokio = { version = "1.36.0", features = ["full"] } diff --git a/examples/exception_handler/Cargo.toml b/examples/exception_handler/Cargo.toml index 57012e4..aab0cea 100644 --- a/examples/exception_handler/Cargo.toml +++ b/examples/exception_handler/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent", features = ["full"] } -serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } diff --git a/examples/form/Cargo.toml b/examples/form/Cargo.toml index 73ce035..53f2ca7 100644 --- a/examples/form/Cargo.toml +++ b/examples/form/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } diff --git a/examples/llma_chat/Cargo.toml b/examples/llma_chat/Cargo.toml index 6d8f8c0..2203109 100644 --- a/examples/llma_chat/Cargo.toml +++ b/examples/llma_chat/Cargo.toml @@ -6,10 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.76" +async-trait = "0.1.77" llm = "0.1.1" once_cell = "1.19.0" rand = "0.8.5" -serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } silent = { path = "../../silent", features = ["ws"] } -tokio = { version = "1.35.1", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } diff --git a/examples/middleware/Cargo.toml b/examples/middleware/Cargo.toml index f3bdbb9..8467cc1 100644 --- a/examples/middleware/Cargo.toml +++ b/examples/middleware/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.76" +async-trait = "0.1.77" silent = { path = "../../silent" } diff --git a/examples/multer_test/Cargo.toml b/examples/multer_test/Cargo.toml index 74bd971..71f3db6 100644 --- a/examples/multer_test/Cargo.toml +++ b/examples/multer_test/Cargo.toml @@ -14,4 +14,4 @@ version.workspace = true bytes = "1.5.0" futures-util = "0.3.30" multer = "3.0.0" -tokio = { version = "1.35.1", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } diff --git a/examples/multipart-form/Cargo.toml b/examples/multipart-form/Cargo.toml index 518ee61..dc24894 100644 --- a/examples/multipart-form/Cargo.toml +++ b/examples/multipart-form/Cargo.toml @@ -7,4 +7,4 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } diff --git a/examples/sse-chat/Cargo.toml b/examples/sse-chat/Cargo.toml index 229e40a..2a5d70d 100644 --- a/examples/sse-chat/Cargo.toml +++ b/examples/sse-chat/Cargo.toml @@ -11,4 +11,4 @@ once_cell = "1" parking_lot = "0.12" tokio = { version = "1", features = ["macros"] } tokio-stream = { version = "0.1", features = ["net"] } -serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } diff --git a/examples/templates/Cargo.toml b/examples/templates/Cargo.toml index be72947..965b738 100644 --- a/examples/templates/Cargo.toml +++ b/examples/templates/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.197", features = ["derive"] } silent = { path = "../../silent", features = ["template"] } diff --git a/examples/todo/Cargo.toml b/examples/todo/Cargo.toml index 6716aec..dc98d10 100644 --- a/examples/todo/Cargo.toml +++ b/examples/todo/Cargo.toml @@ -7,6 +7,6 @@ edition = "2021" [dependencies] silent = { path = "../../silent" } -serde = { version = "1.0.193", features = ["derive"] } -uuid = { version = "1.6.1", features = ["serde", "v4"] } -async-trait = "0.1.76" +serde = { version = "1.0.197", features = ["derive"] } +uuid = { version = "1.7.0", features = ["serde", "v4"] } +async-trait = "0.1.77" diff --git a/examples/tokenizer_test/Cargo.toml b/examples/tokenizer_test/Cargo.toml index 00a8d8c..ea9e84b 100644 --- a/examples/tokenizer_test/Cargo.toml +++ b/examples/tokenizer_test/Cargo.toml @@ -11,7 +11,7 @@ version.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokenizers = { version = "0.15.0", features = ["onig"] } -anyhow = "1.0.79" +tokenizers = { version = "0.15.2", features = ["onig"] } +anyhow = "1.0.80" tracing-subscriber = "0.3.18" tracing = "0.1.40" diff --git a/examples/tokio-tungstenite_test/Cargo.toml b/examples/tokio-tungstenite_test/Cargo.toml index c88806e..1e99b99 100644 --- a/examples/tokio-tungstenite_test/Cargo.toml +++ b/examples/tokio-tungstenite_test/Cargo.toml @@ -13,5 +13,5 @@ version.workspace = true [dependencies] futures-channel = "0.3.30" futures-util = "0.3.30" -tokio = { version = "1.35.1", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } tokio-tungstenite = "0.21.0" diff --git a/examples/websocket-chat/Cargo.toml b/examples/websocket-chat/Cargo.toml index 1589015..41332fc 100644 --- a/examples/websocket-chat/Cargo.toml +++ b/examples/websocket-chat/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] silent = { path = "../../silent", features = ["ws"] } -tokio = { version = "1.35.1", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } tokio-stream = { version = "0.1.14", features = ["net"] } futures-util = { version = "0.3.30", default-features = false } once_cell = "1.19.0" diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml index fa9f767..924a23d 100644 --- a/examples/websocket/Cargo.toml +++ b/examples/websocket/Cargo.toml @@ -6,9 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.76" +async-trait = "0.1.77" silent = { path = "../../silent", features = ["ws"] } -tokio = { version = "1.35.1", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } tokio-tungstenite = "0.21.0" futures-util = "0.3.30" backtrace = "0.3.69" diff --git a/silent/Cargo.toml b/silent/Cargo.toml index b00794c..ad0d433 100644 --- a/silent/Cargo.toml +++ b/silent/Cargo.toml @@ -32,33 +32,33 @@ test = ["tokio/macros", "tokio/rt"] scheduler = [] [dependencies] -thiserror = "1.0.53" -hyper = { version = "1.1.0", features = ["full"] } -tokio = { version = "1.35.1", optional = true } +thiserror = "1.0.57" +hyper = { version = "1.2.0", features = ["full"] } +tokio = { version = "1.36.0", optional = true } bytes = "1.5.0" http-body-util = "0.1.0" tracing = "0.1.40" tracing-subscriber = "0.3.18" -async-trait = "0.1.76" -serde = { version = "1.0.193", features = ["derive"] } -serde_json = "1.0.109" -uuid = "1.6.1" +async-trait = "0.1.77" +serde = { version = "1.0.197", features = ["derive"] } +serde_json = "1.0.114" +uuid = "1.7.0" url = "2.5.0" serde_urlencoded = "0.7.1" multimap = { version = "0.10.0", features = ["serde"] } mime = "0.3.17" -tempfile = "3.9.0" +tempfile = "3.10.1" textnonce = "1.0.0" multer = "3.0.0" #silent-multer = "0.1.0" futures-util = "0.3.30" -chrono = { version = "0.4.31", default-features = false, features = ["clock"] } +chrono = { version = "0.4.34", default-features = false, features = ["clock"] } tokio-tungstenite = "0.21.0" #silent-tokio-tungstenite = "0.1.0" headers = "0.4.0" tokio-stream = { version = "0.1.14", features = ["net"] } pin-project = "1.1" -argon2 = "0.5.2" +argon2 = "0.5.3" pbkdf2 = { version = "0.12", features = ["simple"] } aes-gcm = "0.10.3" aes = "0.8" @@ -72,5 +72,5 @@ http = "1.0.0" http-body = "1.0.0" futures = "0.3.30" tokio-util = "0.7.10" -anyhow = "1.0.79" +anyhow = "1.0.80" cron = "0.12.0" From f53f43f0f146e181770a601aae6ad5936f705522 Mon Sep 17 00:00:00 2001 From: hubertshelley Date: Fri, 1 Mar 2024 11:28:01 +0800 Subject: [PATCH 5/5] dependencies upgrade --- Cargo.toml | 2 +- silent/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 23bf316..d0dac33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,4 @@ homepage = "https://github.com/hubertshelley/silent" license = "Apache-2.0" readme = "./readme.md" repository = "https://github.com/hubertshelley/silent" -version = "1.1.0" +version = "1.1.1" diff --git a/silent/Cargo.toml b/silent/Cargo.toml index ad0d433..25a5ac8 100644 --- a/silent/Cargo.toml +++ b/silent/Cargo.toml @@ -73,4 +73,4 @@ http-body = "1.0.0" futures = "0.3.30" tokio-util = "0.7.10" anyhow = "1.0.80" -cron = "0.12.0" +cron = "0.12.1"