Skip to content

Commit

Permalink
global: add event support (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 authored Dec 15, 2023
1 parent 273eb92 commit 34b8020
Show file tree
Hide file tree
Showing 44 changed files with 1,164 additions and 288 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ run_script = { version = "0.10" }
testcontainers-modules = { version = "0.1" }
strum = { version = "0.25", features = ["derive"] }
# tardis
tardis = { version = "=0.1.0-rc.3" }
tardis = { version = "=0.1.0-rc.6" }
# tardis = { path = "../tardis/tardis" }
# tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "9424e16" }
#spacegate
Expand Down
20 changes: 17 additions & 3 deletions middleware/event/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,23 @@ path = "src/lib.rs"
serde.workspace = true
async-trait.workspace = true
lazy_static.workspace = true
tardis = { workspace = true, features = ["reldb-postgres", "web-server", "ws-client"] }
tardis = { workspace = true, features = [
"reldb-postgres",
"web-server",
"ws-client",
"cluster",
] }
bios-basic = { path = "../../basic", features = ["default"] }

bios-sdk-invoke = { path = "../../sdk/invoke", features = [
"spi_log",
"event",
], default-features = false }
[dev-dependencies]
tardis = { workspace = true, features = ["test", "ws-client"] }
tardis = { workspace = true, features = [
"test",
"ws-client",
"cluster",
"web-server",
] }
bios-basic = { path = "../../basic", features = ["default", "test"] }
tokio = { version = "1", features = ["full"] }
11 changes: 5 additions & 6 deletions middleware/event/src/api/event_listener_api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use bios_basic::TardisFunInstExtractor;
use tardis::web::poem::Request;
use tardis::web::poem_openapi;
use tardis::web::poem_openapi::param::{Path, Query};
use tardis::web::poem_openapi::payload::Json;
use tardis::web::web_resp::{TardisApiResult, TardisResp, Void};

use crate::dto::event_dto::{EventListenerRegisterReq, EventListenerRegisterResp};
use crate::event_constants::get_tardis_inst;
use crate::serv::event_listener_serv;
#[derive(Clone)]
pub struct EventListenerApi;
Expand All @@ -14,15 +13,15 @@ pub struct EventListenerApi;
#[poem_openapi::OpenApi(prefix_path = "/listener")]
impl EventListenerApi {
#[oai(path = "/", method = "post")]
async fn register(&self, listener: Json<EventListenerRegisterReq>, request: &Request) -> TardisApiResult<EventListenerRegisterResp> {
let funs = request.tardis_fun_inst();
async fn register(&self, listener: Json<EventListenerRegisterReq>) -> TardisApiResult<EventListenerRegisterResp> {
let funs = get_tardis_inst();
let resp = event_listener_serv::register(listener.0, &funs).await?;
TardisResp::ok(resp)
}

#[oai(path = "/:listener_code", method = "delete")]
async fn remove(&self, listener_code: Path<String>, token: Query<String>, request: &Request) -> TardisApiResult<Void> {
let funs = request.tardis_fun_inst();
async fn remove(&self, listener_code: Path<String>, token: Query<String>) -> TardisApiResult<Void> {
let funs = get_tardis_inst();
event_listener_serv::remove(&listener_code.0, &token.0, &funs).await?;
TardisResp::ok(Void {})
}
Expand Down
7 changes: 3 additions & 4 deletions middleware/event/src/api/event_proc_api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use bios_basic::TardisFunInstExtractor;
use tardis::web::poem::web::websocket::{BoxWebSocketUpgraded, WebSocket};
use tardis::web::poem::Request;
use tardis::web::poem_openapi;
use tardis::web::poem_openapi::param::{Path, Query};

use crate::event_constants::get_tardis_inst;
use crate::serv::event_proc_serv;
#[derive(Clone)]
pub struct EventProcApi;
Expand All @@ -12,8 +11,8 @@ pub struct EventProcApi;
#[poem_openapi::OpenApi(prefix_path = "/proc")]
impl EventProcApi {
#[oai(path = "/:listener_code", method = "get")]
async fn ws_process(&self, listener_code: Path<String>, token: Query<String>, websocket: WebSocket, request: &Request) -> BoxWebSocketUpgraded {
let funs = request.tardis_fun_inst();
async fn ws_process(&self, listener_code: Path<String>, token: Query<String>, websocket: WebSocket) -> BoxWebSocketUpgraded {
let funs = get_tardis_inst();
event_proc_serv::ws_process(listener_code.0, token.0, websocket, &funs).await
}
}
18 changes: 8 additions & 10 deletions middleware/event/src/api/event_topic_api.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use bios_basic::rbum::dto::rbum_filer_dto::RbumBasicFilterReq;
use bios_basic::rbum::serv::rbum_item_serv::RbumItemCrudOperation;
use bios_basic::TardisFunInstExtractor;
use tardis::web::context_extractor::TardisContextExtractor;
use tardis::web::poem::Request;
use tardis::web::poem_openapi;
use tardis::web::poem_openapi::param::{Path, Query};
use tardis::web::poem_openapi::payload::Json;
use tardis::web::web_resp::{TardisApiResult, TardisPage, TardisResp, Void};

use crate::dto::event_dto::{EventTopicAddOrModifyReq, EventTopicFilterReq, EventTopicInfoResp};
use crate::event_constants::get_tardis_inst;
use crate::serv::event_topic_serv::EventDefServ;
#[derive(Clone)]
pub struct EventTopicApi;
Expand All @@ -18,24 +17,24 @@ pub struct EventTopicApi;
impl EventTopicApi {
/// Add Event Definition
#[oai(path = "/", method = "post")]
async fn add(&self, mut add_or_modify_req: Json<EventTopicAddOrModifyReq>, ctx: TardisContextExtractor, request: &Request) -> TardisApiResult<String> {
let funs = request.tardis_fun_inst();
async fn add(&self, mut add_or_modify_req: Json<EventTopicAddOrModifyReq>, ctx: TardisContextExtractor) -> TardisApiResult<String> {
let funs = get_tardis_inst();
let id = EventDefServ::add_item(&mut add_or_modify_req.0, &funs, &ctx.0).await?;
TardisResp::ok(id)
}

/// Modify Event Definition
#[oai(path = "/:id", method = "put")]
async fn modify(&self, id: Path<String>, mut add_or_modify_req: Json<EventTopicAddOrModifyReq>, ctx: TardisContextExtractor, request: &Request) -> TardisApiResult<Void> {
let funs = request.tardis_fun_inst();
async fn modify(&self, id: Path<String>, mut add_or_modify_req: Json<EventTopicAddOrModifyReq>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
let funs = get_tardis_inst();
EventDefServ::modify_item(&id.0, &mut add_or_modify_req.0, &funs, &ctx.0).await?;
TardisResp::ok(Void {})
}

/// Delete Event Definition
#[oai(path = "/:id", method = "delete")]
async fn delete(&self, id: Path<String>, ctx: TardisContextExtractor, request: &Request) -> TardisApiResult<Void> {
let funs = request.tardis_fun_inst();
async fn delete(&self, id: Path<String>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
let funs = get_tardis_inst();
EventDefServ::delete_item(&id.0, &funs, &ctx.0).await?;
TardisResp::ok(Void {})
}
Expand All @@ -52,9 +51,8 @@ impl EventTopicApi {
desc_by_create: Query<Option<bool>>,
desc_by_update: Query<Option<bool>>,
ctx: TardisContextExtractor,
request: &Request,
) -> TardisApiResult<TardisPage<EventTopicInfoResp>> {
let funs = request.tardis_fun_inst();
let funs = get_tardis_inst();
let result = EventDefServ::paginate_items(
&EventTopicFilterReq {
basic: RbumBasicFilterReq {
Expand Down
12 changes: 6 additions & 6 deletions middleware/event/src/dto/event_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tardis::{

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug)]
pub struct EventTopicAddOrModifyReq {
#[oai(validator(pattern = r"^[a-z0-9]+$"))]
// #[oai(validator(pattern = r"^[a-z0-9]+$"))]
pub code: TrimString,
pub name: TrimString,
pub save_message: bool,
Expand All @@ -20,9 +20,9 @@ pub struct EventTopicAddOrModifyReq {
pub mgr_sk: Option<String>,
}

#[derive(poem_openapi::Object, sea_orm::FromQueryResult, Serialize, Deserialize, Debug)]
#[derive(poem_openapi::Object, sea_orm::FromQueryResult, Serialize, Deserialize, Debug, Clone)]
pub struct EventTopicInfoResp {
#[oai(validator(pattern = r"^[a-z0-9]+$"))]
// #[oai(validator(pattern = r"^[a-z0-9]+$"))]
pub code: String,
pub name: String,
pub save_message: bool,
Expand Down Expand Up @@ -55,10 +55,10 @@ impl RbumItemFilterFetcher for EventTopicFilterReq {

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug)]
pub struct EventListenerRegisterReq {
#[oai(validator(pattern = r"^[a-z0-9]+$"))]
// #[oai(validator(pattern = r"^[a-z0-9]+$"))]
pub topic_code: TrimString,
pub topic_sk: Option<String>,
#[oai(validator(pattern = r"^[a-z0-9-_]+$"))]
// #[oai(validator(pattern = r"^[a-z0-9-_]+$"))]
pub events: Option<Vec<TrimString>>,
pub avatars: Vec<TrimString>,
pub subscribe_mode: bool,
Expand All @@ -69,7 +69,7 @@ pub struct EventListenerRegisterResp {
pub listener_code: String,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EventListenerInfo {
pub topic_code: String,
pub subscribe_mode: bool,
Expand Down
4 changes: 4 additions & 0 deletions middleware/event/src/event_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub struct EventConfig {
pub app_key: AppKeyConfig,
pub event_url: String,
pub log_url: String,
pub event_bus_sk: String,
pub spi_app_id: String,
}

impl Default for EventConfig {
Expand All @@ -19,6 +21,8 @@ impl Default for EventConfig {
app_key: Default::default(),
event_url: "".to_string(),
log_url: "".to_string(),
event_bus_sk: "".to_string(),
spi_app_id: "".to_string(),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions middleware/event/src/event_constants.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,8 @@
use tardis::{TardisFuns, TardisFunsInst};

pub const DOMAIN_CODE: &str = "event";
pub const KIND_CODE: &str = "event";
pub const SERVICE_EVENT_BUS_AVATAR: &str = "event_bus/service/event";
pub fn get_tardis_inst() -> TardisFunsInst {
TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None)
}
87 changes: 79 additions & 8 deletions middleware/event/src/event_initializer.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use bios_basic::rbum::{
dto::{rbum_domain_dto::RbumDomainAddReq, rbum_kind_dto::RbumKindAddReq},
rbum_enumeration::RbumScopeLevelKind,
serv::{rbum_crud_serv::RbumCrudOperation, rbum_domain_serv::RbumDomainServ, rbum_kind_serv::RbumKindServ},
serv::{rbum_crud_serv::RbumCrudOperation, rbum_domain_serv::RbumDomainServ, rbum_item_serv::RbumItemCrudOperation, rbum_kind_serv::RbumKindServ},
};
use bios_sdk_invoke::clients::event_client::TOPIC_EVENT_BUS;
use tardis::{
basic::{dto::TardisContext, field::TrimString, result::TardisResult},
db::reldb_client::TardisActiveModel,
web::web_server::TardisWebServer,
log::error,
web::{web_server::TardisWebServer, ws_client::TardisWSClient},
TardisFuns, TardisFunsInst,
};

use crate::{
api::{event_listener_api, event_proc_api, event_topic_api},
domain::event_topic,
event_config::{EventInfo, EventInfoManager},
event_constants::{DOMAIN_CODE, KIND_CODE},
serv::event_topic_serv::EventDefServ,
dto::event_dto::EventTopicAddOrModifyReq,
event_config::{EventConfig, EventInfo, EventInfoManager},
event_constants::{DOMAIN_CODE, KIND_CODE, SERVICE_EVENT_BUS_AVATAR},
serv::{self, event_proc_serv::CreateRemoteSenderSubscriber, event_topic_serv::EventDefServ},
};

pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> {
let mut funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
init_api(web_server).await?;
init_cluster_resource().await;
let ctx = TardisContext {
own_paths: "".to_string(),
ak: "".to_string(),
Expand All @@ -31,13 +36,12 @@ pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> {
funs.begin().await?;
init_db(DOMAIN_CODE.to_string(), KIND_CODE.to_string(), &funs, &ctx).await?;
EventDefServ::init(&funs, &ctx).await?;
funs.commit().await?;
init_api(web_server).await
funs.commit().await
}

async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
if let Some(domain_id) = RbumDomainServ::get_rbum_domain_id_by_code(&domain_code, funs).await? {
let kind_id = RbumKindServ::get_rbum_kind_id_by_code(&kind_code, funs).await?.unwrap();
let kind_id = RbumKindServ::get_rbum_kind_id_by_code(&kind_code, funs).await?.expect("missing event kind");
EventInfoManager::set(EventInfo { kind_id, domain_id })?;
return Ok(());
}
Expand Down Expand Up @@ -75,6 +79,22 @@ async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst,
)
.await?;
EventInfoManager::set(EventInfo { kind_id, domain_id })?;
let config = funs.conf::<EventConfig>();
// create event bus topic
serv::event_topic_serv::EventDefServ::add_item(
&mut EventTopicAddOrModifyReq {
code: TOPIC_EVENT_BUS.into(),
name: TOPIC_EVENT_BUS.into(),
save_message: false,
need_mgr: false,
queue_size: 1024,
use_sk: Some(config.event_bus_sk.clone()),
mgr_sk: None,
},
funs,
ctx,
)
.await?;
Ok(())
}

Expand All @@ -87,3 +107,54 @@ async fn init_api(web_server: &TardisWebServer) -> TardisResult<()> {
.await;
Ok(())
}

async fn init_cluster_resource() {
use crate::serv::event_listener_serv::{listeners, mgr_listeners};
use crate::serv::event_topic_serv::topics;
use tardis::cluster::cluster_processor::subscribe;
subscribe(listeners().clone()).await;
subscribe(mgr_listeners().clone()).await;
subscribe(topics().clone()).await;
subscribe(CreateRemoteSenderSubscriber).await;
}

async fn init_ws_client() -> TardisWSClient {
while !TardisFuns::web_server().is_running().await {
tardis::tokio::task::yield_now().await
}
let funs = TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None);
let conf = funs.conf::<EventConfig>();
let topic_sk = conf.event_bus_sk.clone();
let client = bios_sdk_invoke::clients::event_client::EventClient::new("http://localhost:8080/event", &funs);
loop {
let addr = loop {
if let Ok(result) = client
.register(&bios_sdk_invoke::clients::event_client::EventListenerRegisterReq {
topic_code: TOPIC_EVENT_BUS.to_string(),
topic_sk: Some(topic_sk.clone()),
events: None,
avatars: vec![SERVICE_EVENT_BUS_AVATAR.into()],
subscribe_mode: false,
})
.await
{
break result.ws_addr;
}
tardis::tokio::time::sleep(std::time::Duration::from_secs(10)).await;
};
let ws_client = TardisFuns::ws_client(&addr, |_| async move { None }).await;
match ws_client {
Ok(ws_client) => {
return ws_client;
}
Err(err) => {
error!("[Bios.Event] failed to connect to event server: {}", err);
tardis::tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}
}

tardis::tardis_static! {
pub(crate) async ws_client: TardisWSClient = init_ws_client();
}
2 changes: 1 addition & 1 deletion middleware/event/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![warn(clippy::unwrap_used, clippy::dbg_macro)]
extern crate lazy_static;

mod api;
mod domain;
pub mod dto;
Expand Down
Loading

0 comments on commit 34b8020

Please sign in to comment.