Skip to content

Commit

Permalink
event: Fix persistent resend state update (#770)
Browse files Browse the repository at this point in the history
* fix: event persistent status update bug

* update tardis version

* modify some log level

* feat: search add len query OP (#768)

* feat: search add len query OP

* update

* update

---------

Co-authored-by: RWDai <27391645+RWDai@users.noreply.github.com>
  • Loading branch information
4t145 and RWDai authored Jun 12, 2024
1 parent fda8dbf commit ebfca0e
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ strum = { version = "0.26", features = ["derive"] }
# tardis
# tardis = { version = "0.1.0-rc.15" }
# tardis = { path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "1b4768a" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "47a82de" }
#spacegate

# spacegate-shell = { path = "../spacegate/crates/shell", features = [
Expand Down
9 changes: 8 additions & 1 deletion backend/middlewares/event/src/event_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tardis::{

use crate::{
api::{event_listener_api, event_proc_api, event_topic_api},
domain::event_topic,
domain::{event_persistent, event_topic},
dto::event_dto::EventTopicAddOrModifyReq,
event_config::{EventConfig, EventInfo, EventInfoManager},
event_constants::{DOMAIN_CODE, KIND_CODE},
Expand Down Expand Up @@ -48,6 +48,13 @@ async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst,
return Ok(());
}

funs.db()
.init(event_persistent::ActiveModel::init(
TardisFuns::reldb().backend(),
None,
TardisFuns::reldb().compatible_type(),
))
.await?;
// Initialize event component RBUM item table and indexs
funs.db().init(event_topic::ActiveModel::init(TardisFuns::reldb().backend(), None, TardisFuns::reldb().compatible_type())).await?;
// Initialize event component RBUM domain data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl EventPersistentServ {
let query = Query::update()
.table(event_persistent::Entity)
.value(event_persistent::Column::RetryTimes, Expr::col(event_persistent::Column::RetryTimes).add(1))
.value(event_persistent::Column::Status, event_persistent::Status::Sending.to_string())
.cond_where(event_persistent::Column::Id.eq(id))
.to_owned();
let statement = StatementBuilder::build(&query, &db.get_database_backend());
Expand Down
4 changes: 3 additions & 1 deletion backend/middlewares/event/src/serv/event_proc_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tardis::basic::result::TardisResult;
use tardis::cluster::cluster_processor::{ClusterEventTarget, TardisClusterMessageReq};
use tardis::cluster::cluster_publish::publish_event_no_response;
use tardis::futures::StreamExt;
use tardis::log::warn;
use tardis::log::{self as tracing, debug, info, instrument, warn};
use tardis::serde_json::Value;
use tardis::tokio::sync::RwLock;
use tardis::web::poem::web::websocket::{BoxWebSocketUpgraded, WebSocket};
Expand Down Expand Up @@ -99,6 +99,7 @@ impl WsHooks for Hooks {
}
}
}
#[instrument(skip(self))]
async fn on_process(&self, req_msg: TardisWebsocketReq, context: &WsBroadcastContext) -> Option<TardisWebsocketResp> {
if self.persistent {
let result = super::event_persistent_serv::EventPersistentServ::save_message(
Expand Down Expand Up @@ -161,6 +162,7 @@ pub(crate) async fn ws_process(listener_code: String, token: String, websocket:
return ws_error(listener_code, "topic not found", websocket);
};
let sender = get_or_init_sender(listener.topic_code.clone(), topic.queue_size as usize).await;
tardis::log::trace!("[Bios.Event] create {topic:?} process for {token}");
WsBroadcast::new(
sender,
Hooks {
Expand Down
1 change: 0 additions & 1 deletion backend/middlewares/event/tests/test_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,5 @@ async fn init_data() -> TardisResult<()> {
test_event_without_mgr::test(&[&client]).await?;
test_event_with_event_code::test(&[&client]).await?;
test_event_with_im::test(&[&client]).await?;

Ok(())
}
7 changes: 6 additions & 1 deletion backend/middlewares/flow/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ pub async fn start_flow_event_service(config: &EventTopicConfig) -> TardisResult
}
let resp = client.register(&event_conf.into()).await?;
let ws_client = TardisFuns::ws_client(&resp.ws_addr, |message| async move {
let Ok(json_str) = message.to_text() else { return None };
if !message.is_text() {
return None;
}
let Ok(json_str) = message.to_text() else {
return None;
};
info!("[BIOS.Flow] event msg: {json_str}");
let Ok(TardisWebsocketMessage { msg, event, .. }) = TardisFuns::json.str_to_obj(json_str) else {
return None;
Expand Down
7 changes: 6 additions & 1 deletion backend/spi/spi-log/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ pub async fn start_log_event_service(config: &EventTopicConfig) -> TardisResult<
}
let resp = client.register(&event_conf.into()).await?;
let ws_client = TardisFuns::ws_client(&resp.ws_addr, |message| async move {
let Ok(json_str) = message.to_text() else { return None };
if !message.is_text() {
return None;
}
let Ok(json_str) = message.to_text() else {
return None;
};
info!("[BIOS.Log] event msg: {json_str}");
let Ok(TardisWebsocketMessage { msg, event, .. }) = TardisFuns::json.str_to_obj(json_str) else {
return None;
Expand Down
7 changes: 6 additions & 1 deletion backend/spi/spi-search/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ pub async fn start_search_event_service(config: &EventTopicConfig) -> TardisResu
}
let resp = client.register(&event_conf.into()).await?;
let ws_client = TardisFuns::ws_client(&resp.ws_addr, |message| async move {
let Ok(json_str) = message.to_text() else { return None };
if !message.is_text() {
return None;
}
let Ok(json_str) = message.to_text() else {
return None;
};
let Ok(TardisWebsocketMessage { msg, event, .. }) = TardisFuns::json.str_to_obj(json_str) else {
return None;
};
Expand Down
7 changes: 6 additions & 1 deletion backend/supports/iam/src/iam_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,12 @@ pub async fn init_ws_iam_event_client() -> TardisResult<()> {
tardis::tokio::time::sleep(std::time::Duration::from_secs(10)).await;
};
let ws_client = TardisFuns::ws_client(&addr, |message| async move {
let Ok(json_str) = message.to_text() else { return None };
if !message.is_text() {
return None;
}
let Ok(json_str) = message.to_text() else {
return None;
};
info!("[BIOS.Iam] event msg: {json_str}");
let Ok(TardisWebsocketMessage { msg, event, .. }) = TardisFuns::json.str_to_obj(json_str) else {
return None;
Expand Down

0 comments on commit ebfca0e

Please sign in to comment.