Skip to content

Commit

Permalink
evet: Event persistent support (#760)
Browse files Browse the repository at this point in the history
* schedule: support more detailed callback

* support event persistent storage

* update tardis version

---------

Co-authored-by: hermitCode <297984816@qq.com>
  • Loading branch information
4t145 and LiJieLong authored Jun 5, 2024
1 parent e2cd635 commit bbb2186
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 97 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ strum = { version = "0.26", features = ["derive"] }
# tardis = { version = "0.1.0-rc.15" }
# tardis = { path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "1c5f812" }

#spacegate

# spacegate-shell = { path = "../spacegate/crates/shell", features = [
Expand Down
2 changes: 1 addition & 1 deletion backend/middlewares/event/src/api/event_proc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ impl EventProcApi {
#[oai(path = "/:listener_code", method = "get")]
async fn ws_process(&self, listener_code: Path<String>, token: Query<String>, websocket: WebSocket) -> BoxWebSocketUpgraded {
let funs = get_tardis_inst();
event_proc_serv::ws_process(listener_code.0, token.0, websocket, &funs).await
event_proc_serv::ws_process(listener_code.0, token.0, websocket, funs).await
}
}
1 change: 1 addition & 0 deletions backend/middlewares/event/src/domain.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod event_topic;
pub mod event_persistent;
69 changes: 69 additions & 0 deletions backend/middlewares/event/src/domain/event_persistent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@

use serde::{Deserialize, Serialize};
use tardis::chrono::Utc;
use tardis::db::sea_orm::{self, DeriveEntityModel, DerivePrimaryKey, DeriveRelation, EntityName, EntityTrait, EnumIter, PrimaryKeyTrait};
use tardis::serde_json::Value;
use tardis::{chrono, TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation};
/// Event Topic model
///
/// 事件主题模型
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation)]
#[sea_orm(table_name = "event_persistent")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub message: Value,
pub inst_id: String,
pub mgr_node: bool,
pub subscribe_mode: bool,
pub topic: String,
pub status: String,
pub error: Option<String>,
#[sea_orm(extra = "DEFAULT 0")]
pub retry_times: i32,
#[sea_orm(extra = "DEFAULT CURRENT_TIMESTAMP")]
pub create_time: chrono::DateTime<Utc>,
#[sea_orm(extra = "DEFAULT CURRENT_TIMESTAMP")]
pub update_time: chrono::DateTime<Utc>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum Status {
Sending,
Success,
Failed,
Unknown,
}

impl Status {
pub const fn as_str(&self) -> &'static str {
match self {
Status::Sending => "Sending",
Status::Success => "Success",
Status::Failed => "Failed",
_ => "Unknown"
}
}
}

impl std::fmt::Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(<&'static str>::from(*self))
}
}

impl From<Status> for &'static str {
fn from(val: Status) -> Self {
val.as_str()
}
}
impl From<&str> for Status {
fn from(value: &str) -> Self {
match value {
"Sending" => Self::Sending,
"Success" => Self::Success,
"Failed" => Self::Failed,
_ => Status::Unknown,
}
}
}
4 changes: 4 additions & 0 deletions backend/middlewares/event/src/event_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub struct EventConfig {
pub event_url: String,
pub event_bus_sk: String,
pub spi_app_id: String,
pub resend_threshold: u32,
pub resend_interval_sec: Option<u32>,
}

impl Default for EventConfig {
Expand All @@ -24,6 +26,8 @@ impl Default for EventConfig {
event_url: "".to_string(),
event_bus_sk: "".to_string(),
spi_app_id: "".to_string(),
resend_threshold: 3,
resend_interval_sec: None,
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions backend/middlewares/event/src/event_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
dto::event_dto::EventTopicAddOrModifyReq,
event_config::{EventConfig, EventInfo, EventInfoManager},
event_constants::{DOMAIN_CODE, KIND_CODE},
serv::{self, event_proc_serv::CreateRemoteSenderSubscriber, event_topic_serv::EventDefServ},
serv::{self, event_proc_serv::CreateRemoteSenderHandler, event_topic_serv::EventDefServ},
};

pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> {
Expand All @@ -36,7 +36,9 @@ pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> {
funs.begin().await?;
init_db(DOMAIN_CODE.to_string(), KIND_CODE.to_string(), &funs, &ctx).await?;
EventDefServ::init(&funs, &ctx).await?;
funs.commit().await
funs.commit().await?;
init_scan_and_resend_task();
Ok(())
}

async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
Expand Down Expand Up @@ -115,7 +117,24 @@ async fn init_cluster_resource() {
subscribe(listeners().clone()).await;
subscribe(mgr_listeners().clone()).await;
subscribe(topics().clone()).await;
subscribe(CreateRemoteSenderSubscriber).await;
subscribe(CreateRemoteSenderHandler).await;
}

fn init_scan_and_resend_task() {
let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);

let config = funs.conf::<EventConfig>();
let Some(interval_sec) = config.resend_interval_sec else {
return;
};
let mut interval = tardis::tokio::time::interval(tardis::tokio::time::Duration::from_secs(interval_sec as u64));
tardis::tokio::spawn(async move {
loop {
interval.tick().await;
let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
let _ = crate::serv::event_proc_serv::scan_and_resend(funs.into()).await;
}
});
}

async fn init_log_ws_client() -> TardisWSClient {
Expand Down
1 change: 1 addition & 0 deletions backend/middlewares/event/src/serv.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod event_listener_serv;
pub mod event_proc_serv;
pub mod event_topic_serv;
pub mod event_persistent_serv;
94 changes: 94 additions & 0 deletions backend/middlewares/event/src/serv/event_persistent_serv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use tardis::basic::result::TardisResult;
use tardis::db::sea_orm::sea_query::{Expr, Query};
use tardis::db::sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, QueryOrder, Set};
use tardis::futures::{Stream, StreamExt};
use tardis::web::ws_processor::{TardisWebsocketReq, WsBroadcastContext};
use tardis::{serde_json, TardisFunsInst};

use crate::domain::event_persistent;

pub struct EventPersistentServ;
impl EventPersistentServ {
pub async fn save_message(message: PersistentMessage, funs: &TardisFunsInst) -> TardisResult<()> {
if let Some(id) = message.req.msg_id.to_owned() {
let db = funs.db();
let _ = event_persistent::Entity::insert(event_persistent::ActiveModel {
id: Set(id),
message: Set(serde_json::to_value(message.req).expect("TardisWebsocketReq cannot be converted to json")),
status: Set(event_persistent::Status::Sending.to_string()),
topic: Set(message.topic),
inst_id: Set(message.context.inst_id.clone()),
mgr_node: Set(message.context.mgr_node),
subscribe_mode: Set(message.context.subscribe_mode),
..Default::default()
})
.exec(db.raw_conn())
.await?;
}
Ok(())
}
pub async fn sending(id: String, funs: &TardisFunsInst) -> TardisResult<()> {
use tardis::db::sea_orm::StatementBuilder;
let db = funs.db().raw_conn();
let query = Query::update()
.table(event_persistent::Entity)
.value(event_persistent::Column::RetryTimes, Expr::col(event_persistent::Column::RetryTimes).add(1))
.cond_where(event_persistent::Column::Id.eq(id))
.to_owned();
let statement = StatementBuilder::build(&query, &db.get_database_backend());
db.execute(statement).await?;

Ok(())
}
pub async fn send_success(id: String, funs: &TardisFunsInst) -> TardisResult<()> {
let db = funs.db().raw_conn();
event_persistent::Entity::update(event_persistent::ActiveModel {
id: Set(id),
status: Set(event_persistent::Status::Success.to_string()),
..Default::default()
})
.filter(event_persistent::Column::Status.eq(event_persistent::Status::Sending.as_str()))
.exec(db)
.await?;
Ok(())
}
pub async fn send_fail(id: String, error: impl Into<String>, funs: &TardisFunsInst) -> TardisResult<()> {
let db = funs.db().raw_conn();
event_persistent::Entity::update(event_persistent::ActiveModel {
id: Set(id),
status: Set(event_persistent::Status::Success.to_string()),
error: Set(Some(error.into())),
..Default::default()
})
.filter(event_persistent::Column::Status.eq(event_persistent::Status::Sending.as_str()))
.exec(db)
.await?;
Ok(())
}

pub async fn scan_failed(funs: &TardisFunsInst, threshold: i32) -> TardisResult<impl Stream<Item = PersistentMessage> + '_> {
let db = funs.db().raw_conn();
Ok(event_persistent::Entity::find()
.filter(event_persistent::Column::Status.eq(event_persistent::Status::Failed.as_str()).and(event_persistent::Column::RetryTimes.lt(threshold)))
.order_by_desc(event_persistent::Column::UpdateTime)
.stream(db)
.await?
.filter_map(|item| async move {
let item = item.ok()?;
let req = serde_json::from_value::<TardisWebsocketReq>(item.message).ok()?;
let topic = item.topic;
let context = WsBroadcastContext {
inst_id: item.inst_id,
mgr_node: item.mgr_node,
subscribe_mode: item.subscribe_mode,
};
Some(PersistentMessage { req, context, topic })
}))
}
}

pub struct PersistentMessage {
pub req: TardisWebsocketReq,
pub context: WsBroadcastContext,
pub topic: String,
}
Loading

0 comments on commit bbb2186

Please sign in to comment.