Skip to content

Commit

Permalink
event: use new event center (#832)
Browse files Browse the repository at this point in the history
* new event center

* ignore package lock file

* remove package-lock
  • Loading branch information
4t145 authored Aug 30, 2024
1 parent f68b1d8 commit 59bca1f
Show file tree
Hide file tree
Showing 55 changed files with 628 additions and 5,816 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ backend/services/spacegate/devsh/

.uuid
backend/services/bios-all/dev.sh

# front end
package-lock.json
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ strum = { version = "0.26", features = ["derive"] }
# tardis
# tardis = { version = "0.1.0-rc.16" }
# tardis = { path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "17fed580" }

tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "763b987" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
# asteroid-mq = { path = "../asteroid/asteroid-mq" }
#spacegate

# spacegate-shell = { path = "../spacegate/crates/shell", features = [
Expand Down
72 changes: 30 additions & 42 deletions backend/basic/src/process/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
//! 异步任务处理器
use std::{collections::HashMap, future::Future, sync::Arc};

use bios_sdk_invoke::clients::event_client::{BiosEventCenter, Event, EventCenter, EventExt};
use bios_sdk_invoke::clients::event_client::{
asteroid_mq::prelude::{EventAttribute, Subject, TopicCode},
get_topic, EventAttributeExt,
};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use tardis::{
Expand All @@ -21,7 +24,7 @@ lazy_static! {
}
const TASK_PROCESSOR_DATA_EX_SEC: u64 = 60 * 60 * 24;
const TASK_IN_CTX_FLAG: &str = "task_id";

const TASK_TOPIC: TopicCode = TopicCode::const_new("task");
/// Set task status event flag
/// 设置任务状态事件标识
pub const EVENT_SET_TASK_STATUS_FLAG: &str = "task/set_status";
Expand Down Expand Up @@ -102,17 +105,19 @@ impl TaskProcessor {
to_avatars: Option<Vec<String>>,
) -> TardisResult<()> {
Self::set_status(cache_key, task_id, status, cache_client).await?;
if let Some(ec) = BiosEventCenter::public() {
ec.publish(
TaskSetStatusEventReq {
task_id,
data: status,
msg: format!("task status: {}", status),
}
.with_source(from_avatar)
.with_targets(to_avatars),
)
.await?;
if let Some(topic) = get_topic(&TASK_TOPIC) {
// todo: broadcast event to users
// topic
// .send_event(
// TaskSetStatusEventReq {
// task_id,
// data: status,
// msg: format!("task status: {}", status),
// }
// .json(),
// )
// .await
// .map_err(mq_error)?;
}
Ok(())
}
Expand All @@ -137,9 +142,8 @@ impl TaskProcessor {
to_avatars: Option<Vec<String>>,
) -> TardisResult<()> {
Self::set_process_data(cache_key, task_id, data.clone(), cache_client).await?;
if let Some(ec) = BiosEventCenter::public() {
let msg = format!("set task process: {}", &TardisFuns::json.json_to_string(data.clone())?);
ec.publish(TaskSetProcessDataEventReq { task_id, data, msg }.with_source(from_avatar).with_targets(to_avatars)).await?;
if let Some(topic) = get_topic(&TASK_TOPIC) {
// todo: broadcast event to users
}
Ok(())
}
Expand Down Expand Up @@ -214,16 +218,8 @@ impl TaskProcessor {
}
});
TASK_HANDLE.write().await.insert(task_id, handle);
if let Some(ec) = BiosEventCenter::public() {
ec.publish(
TaskExecuteEventReq {
task_id,
msg: "execute task start".to_owned(),
}
.with_source(from_avatar_clone)
.with_targets(to_avatars_clone),
)
.await?;
if let Some(topic) = get_topic(&TASK_TOPIC) {
// todo: broadcast event to users
}
if let Some(ctx) = ctx {
if let Some(exist_task_ids) = ctx.get_ext(TASK_IN_CTX_FLAG).await? {
Expand All @@ -246,16 +242,8 @@ impl TaskProcessor {
to_avatars: Option<Vec<String>>,
) -> TardisResult<u64> {
let task_id = TaskProcessor::init_status(cache_key, Some(task_id), cache_client).await?;
if let Some(ec) = BiosEventCenter::public() {
ec.publish(
TaskExecuteEventReq {
task_id,
msg: "execute task start".to_owned(),
}
.with_source(from_avatar)
.with_targets(to_avatars),
)
.await?;
if let Some(topic) = get_topic(&TASK_TOPIC) {
// todo: broadcast event to users
}
Ok(task_id)
}
Expand Down Expand Up @@ -322,14 +310,14 @@ struct TaskExecuteEventReq {
pub msg: String,
}

impl Event for TaskSetStatusEventReq {
const CODE: &'static str = EVENT_SET_TASK_STATUS_FLAG;
impl EventAttribute for TaskSetStatusEventReq {
const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_STATUS_FLAG.as_bytes());
}

impl Event for TaskSetProcessDataEventReq {
const CODE: &'static str = EVENT_SET_TASK_PROCESS_DATA_FLAG;
impl EventAttribute for TaskSetProcessDataEventReq {
const SUBJECT: Subject = Subject::const_new(EVENT_SET_TASK_PROCESS_DATA_FLAG.as_bytes());
}

impl Event for TaskExecuteEventReq {
const CODE: &'static str = EVENT_EXECUTE_TASK_FLAG;
impl EventAttribute for TaskExecuteEventReq {
const SUBJECT: Subject = Subject::const_new(EVENT_EXECUTE_TASK_FLAG.as_bytes());
}
3 changes: 3 additions & 0 deletions backend/middlewares/event/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ bios-sdk-invoke = { path = "../../../frontend/sdks/invoke", features = [
"spi_log",
"event",
], default-features = false }
asteroid-mq = { workspace = true, features = ["cluster-k8s", "json"] }


[dev-dependencies]
tardis = { workspace = true, features = [
"test",
Expand Down
11 changes: 5 additions & 6 deletions backend/middlewares/event/src/api/event_listener_api.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use tardis::basic::error::TardisError;
use tardis::web::poem_openapi;
use tardis::web::poem_openapi::param::{Path, Query};
use tardis::web::poem_openapi::payload::Json;
Expand All @@ -19,18 +20,16 @@ impl EventListenerApi {
/// 注册事件监听器
#[oai(path = "/", method = "post")]
async fn register(&self, listener: Json<EventListenerRegisterReq>) -> TardisApiResult<EventListenerRegisterResp> {
let funs = get_tardis_inst();
let resp = event_listener_serv::register(listener.0, &funs).await?;
TardisResp::ok(resp)

TardisResp::err(TardisError::not_implemented("unimplemented", "unimplemented"))
}

/// Remove event listener
///
/// 移除事件监听器
#[oai(path = "/:listener_code", method = "delete")]
async fn remove(&self, listener_code: Path<String>, token: Query<String>) -> TardisApiResult<Void> {
let funs = get_tardis_inst();
event_listener_serv::remove(&listener_code.0, &token.0, &funs).await?;
TardisResp::ok(Void {})
TardisResp::err(TardisError::not_implemented("unimplemented", "unimplemented"))

}
}
7 changes: 2 additions & 5 deletions backend/middlewares/event/src/api/event_proc_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use tardis::web::poem::web::websocket::{BoxWebSocketUpgraded, WebSocket};
use tardis::web::poem_openapi;
use tardis::web::poem_openapi::param::{Path, Query};

use crate::event_constants::get_tardis_inst;
use crate::serv::event_proc_serv;

#[derive(Clone)]
pub struct EventProcApi;

Expand All @@ -17,8 +16,6 @@ impl EventProcApi {
/// 处理事件
#[oai(path = "/:listener_code", method = "get")]
async fn ws_process(&self, listener_code: Path<String>, token: Query<String>, websocket: WebSocket) -> Result<BoxWebSocketUpgraded, tardis::web::poem::Error> {
let funs = get_tardis_inst();
let upgraded = event_proc_serv::ws_process(listener_code.0, token.0, websocket, funs).await?;
Ok(upgraded)
Err(tardis::web::poem::Error::from_status(tardis::web::poem::http::StatusCode::NOT_IMPLEMENTED))
}
}
83 changes: 25 additions & 58 deletions backend/middlewares/event/src/domain/event_topic.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,36 @@
use tardis::basic::dto::TardisContext;
use tardis::db::reldb_client::TardisActiveModel;
use std::num::NonZeroU32;
use tardis::db::sea_orm;
use tardis::db::sea_orm::sea_query::{ColumnDef, IndexCreateStatement, Table, TableCreateStatement};
use tardis::db::sea_orm::prelude::*;
use tardis::db::sea_orm::*;

/// Event Topic model
///
/// 事件主题模型
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "event_topic")]
use asteroid_mq::prelude::{TopicCode, TopicConfig, TopicOverflowConfig, TopicOverflowPolicy};
use tardis::{TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation)]
#[sea_orm(table_name = "mq_topic")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
#[sea_orm(primary_key, auto_increment = true)]
pub id: String,
/// Whether to save messages
///
/// 是否保存消息
pub save_message: bool,
/// Whether a management node is required
///
/// 是否需要管理节点
pub need_mgr: bool,
pub queue_size: i32,
/// If need_mgr is false, this field is used when registering
///
/// 如果 need_mgr 为 false,则在注册时使用该sk
pub use_sk: String,
/// If need_mgr is true, this field is used when registering
///
/// 如果 need_mgr 为 true,则在注册时使用该sk
pub mgr_sk: String,

pub blocking: bool,
pub topic_code: String,
pub overflow_policy: String,
pub overflow_size: i32,
#[fill_ctx]
pub own_paths: String,
}

impl TardisActiveModel for ActiveModel {
fn fill_ctx(&mut self, ctx: &TardisContext, is_insert: bool) {
if is_insert {
self.own_paths = Set(ctx.own_paths.to_string());
}
}

fn create_table_statement(db: DbBackend) -> TableCreateStatement {
let mut builder = Table::create();
builder
.table(Entity.table_ref())
.if_not_exists()
.col(ColumnDef::new(Column::Id).not_null().string().primary_key())
.col(ColumnDef::new(Column::SaveMessage).not_null().boolean())
.col(ColumnDef::new(Column::NeedMgr).not_null().boolean())
.col(ColumnDef::new(Column::QueueSize).not_null().integer())
.col(ColumnDef::new(Column::UseSk).not_null().string())
.col(ColumnDef::new(Column::MgrSk).not_null().string())
.col(ColumnDef::new(Column::OwnPaths).not_null().string());
if db == DatabaseBackend::MySql {
builder.engine("InnoDB").character_set("utf8mb4").collate("utf8mb4_0900_as_cs");
impl Model {
pub fn into_topic_config(self) -> TopicConfig {
TopicConfig {
code: TopicCode::new(self.topic_code),
blocking: self.blocking,
overflow_config: Some(TopicOverflowConfig {
policy: match self.overflow_policy.as_str() {
"RejectNew" => TopicOverflowPolicy::RejectNew,
"DropOld" => TopicOverflowPolicy::DropOld,
_ => TopicOverflowPolicy::default(),
},
size: NonZeroU32::new(self.overflow_size.clamp(1, i32::MAX) as u32).expect("clamped"),
}),
}
builder.to_owned()
}

fn create_index_statement() -> Vec<IndexCreateStatement> {
vec![]
}
}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
77 changes: 59 additions & 18 deletions backend/middlewares/event/src/dto/event_dto.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,77 @@
use std::num::NonZeroU32;

use asteroid_mq::prelude::{TopicCode, TopicConfig, TopicOverflowConfig, TopicOverflowPolicy};
use bios_basic::rbum::dto::rbum_filer_dto::{RbumBasicFilterReq, RbumItemFilterFetcher};
use serde::{Deserialize, Serialize};
use tardis::{
basic::field::TrimString,
db::sea_orm::{self},
db::sea_orm::{self, FromQueryResult},
serde_json::Value,
web::poem_openapi,
};

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug)]
#[derive(poem_openapi::Object, Serialize, Deserialize, Debug, Clone)]
pub struct EventTopicConfig {
pub code: String,
pub blocking: bool,
pub topic_code: String,
pub overflow_policy: String,
pub overflow_size: i32,
}

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug, Clone, FromQueryResult)]

pub struct EventTopicAddOrModifyReq {
// #[oai(validator(pattern = r"^[a-z0-9]+$"))]
pub code: TrimString,
pub name: TrimString,
pub save_message: bool,
pub need_mgr: bool,
#[oai(validator(minimum(value = "1", exclusive = "false")))]
pub queue_size: i32,
pub use_sk: Option<String>,
pub mgr_sk: Option<String>,
pub code: String,
pub name: String,
pub blocking: bool,
pub topic_code: String,
pub overflow_policy: String,
pub overflow_size: i32,
}
impl EventTopicAddOrModifyReq {
pub fn into_topic_config(self) -> TopicConfig {
TopicConfig {
code: TopicCode::new(self.topic_code),
blocking: self.blocking,
overflow_config: Some(TopicOverflowConfig {
policy: match self.overflow_policy.as_str() {
"RejectNew" => TopicOverflowPolicy::RejectNew,
"DropOld" => TopicOverflowPolicy::DropOld,
_ => TopicOverflowPolicy::default(),
},
size: NonZeroU32::new(self.overflow_size.clamp(1, i32::MAX) as u32).expect("clamped"),
}),
}
}
}

#[derive(poem_openapi::Object, sea_orm::FromQueryResult, Serialize, Deserialize, Debug, Clone)]
#[derive(poem_openapi::Object, Serialize, Deserialize, Debug, Clone, FromQueryResult)]
pub struct EventTopicInfoResp {
// #[oai(validator(pattern = r"^[a-z0-9]+$"))]
pub code: String,
pub name: String,
pub save_message: bool,
pub need_mgr: bool,
#[oai(validator(minimum(value = "1", exclusive = "false")))]
pub queue_size: i32,
pub use_sk: String,
pub mgr_sk: String,
pub blocking: bool,
pub topic_code: String,
pub overflow_policy: String,
pub overflow_size: i32,
}

impl EventTopicInfoResp {
pub fn into_topic_config(self) -> TopicConfig {
TopicConfig {
code: TopicCode::new(self.topic_code),
blocking: self.blocking,
overflow_config: Some(TopicOverflowConfig {
policy: match self.overflow_policy.as_str() {
"RejectNew" => TopicOverflowPolicy::RejectNew,
"DropOld" => TopicOverflowPolicy::DropOld,
_ => TopicOverflowPolicy::default(),
},
size: NonZeroU32::new(self.overflow_size.clamp(1, i32::MAX) as u32).expect("clamped"),
}),
}
}
}

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug, Clone, Default)]
Expand Down
Loading

0 comments on commit 59bca1f

Please sign in to comment.