diff --git a/Cargo.toml b/Cargo.toml index 3e64b086f..0e38cc0c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ strum = { version = "0.26", features = ["derive"] } # tardis = { version = "0.1.0-rc.15" } # tardis = { path = "../tardis/tardis" } tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "1c5f812" } + #spacegate # spacegate-shell = { path = "../spacegate/crates/shell", features = [ diff --git a/backend/middlewares/event/src/api/event_proc_api.rs b/backend/middlewares/event/src/api/event_proc_api.rs index 35f823335..9f69a3b6e 100644 --- a/backend/middlewares/event/src/api/event_proc_api.rs +++ b/backend/middlewares/event/src/api/event_proc_api.rs @@ -18,6 +18,6 @@ impl EventProcApi { #[oai(path = "/:listener_code", method = "get")] async fn ws_process(&self, listener_code: Path, token: Query, websocket: WebSocket) -> BoxWebSocketUpgraded { let funs = get_tardis_inst(); - event_proc_serv::ws_process(listener_code.0, token.0, websocket, &funs).await + event_proc_serv::ws_process(listener_code.0, token.0, websocket, funs).await } } diff --git a/backend/middlewares/event/src/domain.rs b/backend/middlewares/event/src/domain.rs index cffe927d9..3eb2ed414 100644 --- a/backend/middlewares/event/src/domain.rs +++ b/backend/middlewares/event/src/domain.rs @@ -1 +1,2 @@ pub mod event_topic; +pub mod event_persistent; \ No newline at end of file diff --git a/backend/middlewares/event/src/domain/event_persistent.rs b/backend/middlewares/event/src/domain/event_persistent.rs new file mode 100644 index 000000000..3474c3f28 --- /dev/null +++ b/backend/middlewares/event/src/domain/event_persistent.rs @@ -0,0 +1,69 @@ + +use serde::{Deserialize, Serialize}; +use tardis::chrono::Utc; +use tardis::db::sea_orm::{self, DeriveEntityModel, DerivePrimaryKey, DeriveRelation, EntityName, EntityTrait, EnumIter, PrimaryKeyTrait}; +use tardis::serde_json::Value; +use tardis::{chrono, TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation}; +/// Event Topic model +/// +/// 事件主题模型 +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation)] +#[sea_orm(table_name = "event_persistent")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: String, + pub message: Value, + pub inst_id: String, + pub mgr_node: bool, + pub subscribe_mode: bool, + pub topic: String, + pub status: String, + pub error: Option, + #[sea_orm(extra = "DEFAULT 0")] + pub retry_times: i32, + #[sea_orm(extra = "DEFAULT CURRENT_TIMESTAMP")] + pub create_time: chrono::DateTime, + #[sea_orm(extra = "DEFAULT CURRENT_TIMESTAMP")] + pub update_time: chrono::DateTime, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum Status { + Sending, + Success, + Failed, + Unknown, +} + +impl Status { + pub const fn as_str(&self) -> &'static str { + match self { + Status::Sending => "Sending", + Status::Success => "Success", + Status::Failed => "Failed", + _ => "Unknown" + } + } +} + +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(<&'static str>::from(*self)) + } +} + +impl From for &'static str { + fn from(val: Status) -> Self { + val.as_str() + } +} +impl From<&str> for Status { + fn from(value: &str) -> Self { + match value { + "Sending" => Self::Sending, + "Success" => Self::Success, + "Failed" => Self::Failed, + _ => Status::Unknown, + } + } +} \ No newline at end of file diff --git a/backend/middlewares/event/src/event_config.rs b/backend/middlewares/event/src/event_config.rs index 43c131bed..4e0d04aa8 100644 --- a/backend/middlewares/event/src/event_config.rs +++ b/backend/middlewares/event/src/event_config.rs @@ -13,6 +13,8 @@ pub struct EventConfig { pub event_url: String, pub event_bus_sk: String, pub spi_app_id: String, + pub resend_threshold: u32, + pub resend_interval_sec: Option, } impl Default for EventConfig { @@ -24,6 +26,8 @@ impl Default for EventConfig { event_url: "".to_string(), event_bus_sk: "".to_string(), spi_app_id: "".to_string(), + resend_threshold: 3, + resend_interval_sec: None, } } } diff --git a/backend/middlewares/event/src/event_initializer.rs b/backend/middlewares/event/src/event_initializer.rs index 5d7d677df..1765e1eb1 100644 --- a/backend/middlewares/event/src/event_initializer.rs +++ b/backend/middlewares/event/src/event_initializer.rs @@ -18,7 +18,7 @@ use crate::{ dto::event_dto::EventTopicAddOrModifyReq, event_config::{EventConfig, EventInfo, EventInfoManager}, event_constants::{DOMAIN_CODE, KIND_CODE}, - serv::{self, event_proc_serv::CreateRemoteSenderSubscriber, event_topic_serv::EventDefServ}, + serv::{self, event_proc_serv::CreateRemoteSenderHandler, event_topic_serv::EventDefServ}, }; pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> { @@ -36,7 +36,9 @@ pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> { funs.begin().await?; init_db(DOMAIN_CODE.to_string(), KIND_CODE.to_string(), &funs, &ctx).await?; EventDefServ::init(&funs, &ctx).await?; - funs.commit().await + funs.commit().await?; + init_scan_and_resend_task(); + Ok(()) } async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> { @@ -115,7 +117,24 @@ async fn init_cluster_resource() { subscribe(listeners().clone()).await; subscribe(mgr_listeners().clone()).await; subscribe(topics().clone()).await; - subscribe(CreateRemoteSenderSubscriber).await; + subscribe(CreateRemoteSenderHandler).await; +} + +fn init_scan_and_resend_task() { + let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None); + + let config = funs.conf::(); + let Some(interval_sec) = config.resend_interval_sec else { + return; + }; + let mut interval = tardis::tokio::time::interval(tardis::tokio::time::Duration::from_secs(interval_sec as u64)); + tardis::tokio::spawn(async move { + loop { + interval.tick().await; + let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None); + let _ = crate::serv::event_proc_serv::scan_and_resend(funs.into()).await; + } + }); } async fn init_log_ws_client() -> TardisWSClient { diff --git a/backend/middlewares/event/src/serv.rs b/backend/middlewares/event/src/serv.rs index eddea19ba..ae2e5e6d5 100644 --- a/backend/middlewares/event/src/serv.rs +++ b/backend/middlewares/event/src/serv.rs @@ -1,3 +1,4 @@ pub mod event_listener_serv; pub mod event_proc_serv; pub mod event_topic_serv; +pub mod event_persistent_serv; \ No newline at end of file diff --git a/backend/middlewares/event/src/serv/event_persistent_serv.rs b/backend/middlewares/event/src/serv/event_persistent_serv.rs new file mode 100644 index 000000000..e42c1292b --- /dev/null +++ b/backend/middlewares/event/src/serv/event_persistent_serv.rs @@ -0,0 +1,94 @@ +use tardis::basic::result::TardisResult; +use tardis::db::sea_orm::sea_query::{Expr, Query}; +use tardis::db::sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QueryOrder, Set}; +use tardis::futures::{Stream, StreamExt}; +use tardis::web::ws_processor::{TardisWebsocketReq, WsBroadcastContext}; +use tardis::{serde_json, TardisFunsInst}; + +use crate::domain::event_persistent; + +pub struct EventPersistentServ; +impl EventPersistentServ { + pub async fn save_message(message: PersistentMessage, funs: &TardisFunsInst) -> TardisResult<()> { + if let Some(id) = message.req.msg_id.to_owned() { + let db = funs.db(); + let _ = event_persistent::Entity::insert(event_persistent::ActiveModel { + id: Set(id), + message: Set(serde_json::to_value(message.req).expect("TardisWebsocketReq cannot be converted to json")), + status: Set(event_persistent::Status::Sending.to_string()), + topic: Set(message.topic), + inst_id: Set(message.context.inst_id.clone()), + mgr_node: Set(message.context.mgr_node), + subscribe_mode: Set(message.context.subscribe_mode), + ..Default::default() + }) + .exec(db.raw_conn()) + .await?; + } + Ok(()) + } + pub async fn sending(id: String, funs: &TardisFunsInst) -> TardisResult<()> { + use tardis::db::sea_orm::StatementBuilder; + let db = funs.db().raw_conn(); + let query = Query::update() + .table(event_persistent::Entity) + .value(event_persistent::Column::RetryTimes, Expr::col(event_persistent::Column::RetryTimes).add(1)) + .cond_where(event_persistent::Column::Id.eq(id)) + .to_owned(); + let statement = StatementBuilder::build(&query, &db.get_database_backend()); + db.execute(statement).await?; + + Ok(()) + } + pub async fn send_success(id: String, funs: &TardisFunsInst) -> TardisResult<()> { + let db = funs.db().raw_conn(); + event_persistent::Entity::update(event_persistent::ActiveModel { + id: Set(id), + status: Set(event_persistent::Status::Success.to_string()), + ..Default::default() + }) + .filter(event_persistent::Column::Status.eq(event_persistent::Status::Sending.as_str())) + .exec(db) + .await?; + Ok(()) + } + pub async fn send_fail(id: String, error: impl Into, funs: &TardisFunsInst) -> TardisResult<()> { + let db = funs.db().raw_conn(); + event_persistent::Entity::update(event_persistent::ActiveModel { + id: Set(id), + status: Set(event_persistent::Status::Success.to_string()), + error: Set(Some(error.into())), + ..Default::default() + }) + .filter(event_persistent::Column::Status.eq(event_persistent::Status::Sending.as_str())) + .exec(db) + .await?; + Ok(()) + } + + pub async fn scan_failed(funs: &TardisFunsInst, threshold: i32) -> TardisResult + '_> { + let db = funs.db().raw_conn(); + Ok(event_persistent::Entity::find() + .filter(event_persistent::Column::Status.eq(event_persistent::Status::Failed.as_str()).and(event_persistent::Column::RetryTimes.lt(threshold))) + .order_by_desc(event_persistent::Column::UpdateTime) + .stream(db) + .await? + .filter_map(|item| async move { + let item = item.ok()?; + let req = serde_json::from_value::(item.message).ok()?; + let topic = item.topic; + let context = WsBroadcastContext { + inst_id: item.inst_id, + mgr_node: item.mgr_node, + subscribe_mode: item.subscribe_mode, + }; + Some(PersistentMessage { req, context, topic }) + })) + } +} + +pub struct PersistentMessage { + pub req: TardisWebsocketReq, + pub context: WsBroadcastContext, + pub topic: String, +} diff --git a/backend/middlewares/event/src/serv/event_proc_serv.rs b/backend/middlewares/event/src/serv/event_proc_serv.rs index 4df0b7407..522b8a188 100644 --- a/backend/middlewares/event/src/serv/event_proc_serv.rs +++ b/backend/middlewares/event/src/serv/event_proc_serv.rs @@ -1,28 +1,28 @@ +use std::collections::HashMap; use std::sync::Arc; -use std::{borrow::Cow, collections::HashMap}; use serde::{Deserialize, Serialize}; -use tardis::basic::dto::TardisContext; +use tardis::basic::error::TardisError; use tardis::basic::result::TardisResult; use tardis::cluster::cluster_processor::{ClusterEventTarget, TardisClusterMessageReq}; use tardis::cluster::cluster_publish::publish_event_no_response; -use tardis::log::{info, warn}; +use tardis::futures::StreamExt; +use tardis::log::warn; use tardis::serde_json::Value; use tardis::tokio::sync::RwLock; use tardis::web::poem::web::websocket::{BoxWebSocketUpgraded, WebSocket}; -use tardis::web::ws_processor::{ws_broadcast, ws_echo, TardisWebsocketMgrMessage, TardisWebsocketResp}; +use tardis::web::ws_processor::{ws_echo, TardisWebsocketMgrMessage, TardisWebsocketReq, TardisWebsocketResp, WsBroadcast, WsBroadcastContext, WsHooks}; use tardis::{ - cluster::{cluster_broadcast::ClusterBroadcastChannel, cluster_processor::TardisClusterSubscriber}, + cluster::{cluster_broadcast::ClusterBroadcastChannel, cluster_processor::ClusterHandler}, tardis_static, }; use tardis::{TardisFuns, TardisFunsInst}; use crate::dto::event_dto::EventMessageMgrWrap; use crate::event_config::EventConfig; -use crate::event_constants::DOMAIN_CODE; -use crate::event_initializer::{default_log_avatar, ws_log_client}; use super::event_listener_serv::{listeners, mgr_listeners}; +use super::event_persistent_serv::PersistentMessage; use super::event_topic_serv::topics; tardis_static! { @@ -36,14 +36,13 @@ pub struct CreateRemoteSenderEvent { pub capacity: usize, } -pub struct CreateRemoteSenderSubscriber; +pub struct CreateRemoteSenderHandler; -#[async_trait::async_trait] -impl TardisClusterSubscriber for CreateRemoteSenderSubscriber { - fn event_name(&self) -> Cow<'static, str> { +impl ClusterHandler for CreateRemoteSenderHandler { + fn event_name(&self) -> String { "bios/event/create_remote_sender".into() } - async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { + async fn handle(self: Arc, message_req: TardisClusterMessageReq) -> TardisResult> { let CreateRemoteSenderEvent { topic_code, capacity } = TardisFuns::json.json_to_obj(message_req.msg)?; let clst_bc_tx = ClusterBroadcastChannel::new(topic_code.clone(), capacity); @@ -66,7 +65,7 @@ pub async fn get_or_init_sender(topic_code: String, capacity: usize) -> Arc Arc BoxWebSocketUpgraded { +pub struct Hooks { + persistent: bool, + need_mgr: bool, + topic_code: String, + funs: Arc, +} + +impl WsHooks for Hooks { + async fn on_fail(&self, id: String, error: TardisError, _context: &WsBroadcastContext) { + if self.persistent { + let result = super::event_persistent_serv::EventPersistentServ::send_fail(id, error.to_string(), &self.funs).await; + if let Err(error) = result { + warn!("[Event] send fail failed: {error}"); + } + } + } + async fn on_success(&self, id: String, _context: &WsBroadcastContext) { + if self.persistent { + let result = super::event_persistent_serv::EventPersistentServ::send_success(id, &self.funs).await; + if let Err(error) = result { + warn!("[Event] send fail failed: {error}"); + } + } + } + async fn on_process(&self, req_msg: TardisWebsocketReq, context: &WsBroadcastContext) -> Option { + if self.persistent { + let result = super::event_persistent_serv::EventPersistentServ::save_message( + PersistentMessage { + req: req_msg.clone(), + context: context.clone(), + topic: self.topic_code.clone(), + }, + &self.funs, + ) + .await; + if let Err(error) = result { + warn!("[Event] save message failed: {error}"); + } + } + if !self.need_mgr || context.mgr_node { + return Some(TardisWebsocketResp { + msg: req_msg.msg, + to_avatars: req_msg.to_avatars.unwrap_or_default(), + ignore_avatars: vec![], + }); + } + // TODO set cache + let topic_code = self.topic_code.clone(); + let msg_avatar = if let Some(req_event_code) = req_msg.event.clone() { + mgr_listeners().get((topic_code.clone(), req_event_code)).await + } else { + mgr_listeners().get((topic_code.clone(), Default::default())).await + }; + let Ok(Some(msg_avatar)) = msg_avatar else { + warn!( + "[Event] topic [{}] event code [{}] management node not found", + topic_code, + &req_msg.event.unwrap_or_default() + ); + return None; + }; + Some(TardisWebsocketResp { + msg: TardisFuns::json + .obj_to_json(&EventMessageMgrWrap { + msg: req_msg.msg, + ori_from_avatar: req_msg.from_avatar, + ori_to_avatars: req_msg.to_avatars, + }) + .expect("EventMessageMgrWrap not a valid json value"), + to_avatars: vec![msg_avatar.clone()], + ignore_avatars: vec![], + }) + } +} + +pub(crate) async fn ws_process(listener_code: String, token: String, websocket: WebSocket, funs: TardisFunsInst) -> BoxWebSocketUpgraded { let Ok(Some(listener)) = listeners().get(listener_code.clone()).await else { return ws_error(listener_code, "listener not found", websocket); }; if listener.token != token { return ws_error(listener_code, "permission check failed", websocket); } - let Ok(Some(topic)) = topics().get(listener.topic_code.clone()).await else { return ws_error(listener_code, "topic not found", websocket); }; - let need_mgr = topic.need_mgr; - let save_message = topic.save_message; - let is_mgr = listener.mgr; - let sender = get_or_init_sender(listener.topic_code.clone(), topic.queue_size as usize).await; - ws_broadcast( - listener.avatars.clone(), - listener.mgr, - listener.subscribe_mode, - HashMap::from([ - ("listener_code".to_string(), listener_code), - ("topic_code".to_string(), listener.topic_code.clone()), - ("spi_app_id".to_string(), funs.conf::().spi_app_id.clone()), - ]), - websocket, + WsBroadcast::new( sender, - move |req_msg, ext| async move { - if save_message { - let spi_app_id = ext.get("spi_app_id").expect("spi_app_id was modified unexpectedly"); - if spi_app_id.is_empty() { - info!("[Event] MESSAGE LOG: {}", TardisFuns::json.obj_to_string(&req_msg).expect("req_msg not a valid json value")); - } else { - use bios_sdk_invoke::clients::spi_log_client::{LogItemAddReq, SpiLogEventExt}; - let ws_client = ws_log_client().await; - let ctx = TardisContext { - owner: spi_app_id.clone(), - ..Default::default() - }; - let req = LogItemAddReq { - tag: DOMAIN_CODE.to_string(), - content: TardisFuns::json.obj_to_string(&req_msg).expect("req_msg not a valid json value"), - kind: None, - ext: None, - key: None, - op: None, - rel_key: None, - id: None, - ts: None, - owner: Some(ctx.owner.clone()), - own_paths: Some(ctx.own_paths.clone()), - }; - if let Err(e) = ws_client.publish_add_log(&req, default_log_avatar().await.clone(), spi_app_id.clone(), &ctx).await { - warn!("[BIOS.Event] publish log fail: {}", e); - } - } - } - if !need_mgr || is_mgr { - return Some(TardisWebsocketResp { - msg: req_msg.msg, - to_avatars: req_msg.to_avatars.unwrap_or_default(), - ignore_avatars: vec![], - }); - } - // TODO set cache - let topic_code = ext.get("topic_code").expect("topic_code was modified unexpectedly"); - let msg_avatar = if let Some(req_event_code) = req_msg.event.clone() { - mgr_listeners().get((topic_code.clone(), req_event_code)).await - } else { - mgr_listeners().get((topic_code.clone(), Default::default())).await - }; - let Ok(Some(msg_avatar)) = msg_avatar else { - warn!( - "[Event] topic [{}] event code [{}] management node not found", - topic_code, - &req_msg.event.unwrap_or_default() - ); - return None; - }; - Some(TardisWebsocketResp { - msg: TardisFuns::json - .obj_to_json(&EventMessageMgrWrap { - msg: req_msg.msg, - ori_from_avatar: req_msg.from_avatar, - ori_to_avatars: req_msg.to_avatars, - }) - .expect("EventMessageMgrWrap not a valid json value"), - to_avatars: vec![msg_avatar.clone()], - ignore_avatars: vec![], - }) + Hooks { + persistent: topic.save_message, + need_mgr: topic.need_mgr, + topic_code: listener.topic_code.clone(), + funs: Arc::new(funs), }, - |_, _| async move {}, + WsBroadcastContext::new(listener.mgr, listener.subscribe_mode), ) + .run(listener.avatars.clone(), websocket) .await } @@ -185,3 +188,41 @@ fn ws_error(req_session: String, error: &str, websocket: WebSocket) -> BoxWebSoc |_, _| async move {}, ) } + +pub async fn scan_and_resend(funs: Arc) -> TardisResult<()> { + let config = funs.conf::(); + let threshold = config.resend_threshold; + let scanner = super::event_persistent_serv::EventPersistentServ::scan_failed(&funs, threshold as i32).await?; + let mut scanner = std::pin::pin!(scanner); + while let Some(PersistentMessage { req, context, topic }) = scanner.next().await { + let funs = funs.clone(); + tardis::tokio::spawn(async move { + let Some(id) = req.msg_id.clone() else { + warn!("[Bios.Event] msg_id not found: {req:?}"); + return Ok(()); + }; + let Ok(Some(topic_resp)) = topics().get(topic.clone()).await else { + warn!("[Bios.Event] topic not found: {topic}"); + return Ok(()); + }; + let sender = get_or_init_sender(topic_resp.code.clone(), topic_resp.queue_size as usize).await; + super::event_persistent_serv::EventPersistentServ::sending(id.clone(), &funs).await?; + let broadcast = WsBroadcast::new( + sender, + Hooks { + persistent: topic_resp.save_message, + need_mgr: topic_resp.need_mgr, + topic_code: topic_resp.code, + funs: funs.clone(), + }, + context, + ); + let resend_result = broadcast.handle_req(req).await; + if let Err(error) = resend_result { + super::event_persistent_serv::EventPersistentServ::send_fail(id, error, &funs).await?; + } + TardisResult::Ok(()) + }); + } + Ok(()) +}