Skip to content

Commit

Permalink
make event center independent
Browse files Browse the repository at this point in the history
  • Loading branch information
4t145 committed Jan 22, 2025
1 parent 3261eff commit d5e8178
Show file tree
Hide file tree
Showing 29 changed files with 526 additions and 242 deletions.
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ strum = { version = "0.26", features = ["derive"] }
# tardis = { version = "0.1.0-rc.17" }
# tardis = { version = "0.2.0", path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "aeb4c85" }
# asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "ed5811b" }
# asteroid-mq = { version = "0.1.0-alpha.5" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "ed5811b" }
# asteroid-mq-sdk = { version = "0.1.0-alpha.5" }
#spacegate

Expand Down
12 changes: 6 additions & 6 deletions backend/basic/src/process/task_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! 异步任务处理器
#[cfg(feature = "with-mq")]
use bios_sdk_invoke::clients::event_client::{
asteroid_mq::prelude::{EventAttribute, Subject, TopicCode},
get_topic,
asteroid_mq_sdk::model::{event::EventAttribute, Subject, TopicCode},
mq_client_node_opt,
};
use lazy_static::lazy_static;

Expand Down Expand Up @@ -107,7 +107,7 @@ impl TaskProcessor {
) -> TardisResult<()> {
Self::set_status(cache_key, task_id, status, cache_client).await?;
#[cfg(feature = "with-mq")]
if let Some(_topic) = get_topic(&TASK_TOPIC) {
if let Some(_topic) = mq_client_node_opt() {
// todo: broadcast event to users
// topic
// .send_event(
Expand Down Expand Up @@ -145,7 +145,7 @@ impl TaskProcessor {
) -> TardisResult<()> {
Self::set_process_data(cache_key, task_id, data.clone(), cache_client).await?;
#[cfg(feature = "with-mq")]
if let Some(_topic) = get_topic(&TASK_TOPIC) {
if let Some(_topic) = mq_client_node_opt() {
// todo: broadcast event to users
}
Ok(())
Expand Down Expand Up @@ -222,7 +222,7 @@ impl TaskProcessor {
});
TASK_HANDLE.write().await.insert(task_id, handle);
#[cfg(feature = "with-mq")]
if let Some(_topic) = get_topic(&TASK_TOPIC) {
if let Some(_topic) = mq_client_node_opt() {
// todo: broadcast event to users
}
if let Some(ctx) = ctx {
Expand All @@ -247,7 +247,7 @@ impl TaskProcessor {
) -> TardisResult<u64> {
let task_id = TaskProcessor::init_status(cache_key, Some(task_id), cache_client).await?;
#[cfg(feature = "with-mq")]
if let Some(_topic) = get_topic(&TASK_TOPIC) {
if let Some(_topic) = mq_client_node_opt() {
// todo: broadcast event to users
}
Ok(task_id)
Expand Down
21 changes: 13 additions & 8 deletions backend/middlewares/event/src/api/ca/event_connect_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use asteroid_mq::bytes::Bytes;
use asteroid_mq::model::codec::{self, DynCodec};
use asteroid_mq::model::EdgeAuth;
use asteroid_mq::prelude::{Node, NodeId};
use asteroid_mq::protocol::node::edge::codec::CodecKind;
use asteroid_mq::protocol::node::edge::packet::Auth;

use asteroid_mq::protocol::node::edge::EdgeConfig;
use tardis::web::poem::web::websocket::{BoxWebSocketUpgraded, WebSocket};
use tardis::web::poem_openapi;
Expand All @@ -26,14 +26,19 @@ impl EventConnectApi {
///
/// 连接客户端节点
#[oai(path = "/", method = "get")]
async fn ws_process(&self, node_id: Query<String>, websocket: WebSocket) -> Result<BoxWebSocketUpgraded, tardis::web::poem::Error> {
async fn ws_process(&self, node_id: Query<String>, codec: Query<String>, websocket: WebSocket) -> Result<BoxWebSocketUpgraded, tardis::web::poem::Error> {
let peer_id = NodeId::from_base64(&node_id).map_err(|e| tardis::web::poem::Error::from_string(e.to_string(), StatusCode::BAD_REQUEST))?;
let _ctx = self.register_serv.get_ctx(peer_id).await.map_err(|e| tardis::web::poem::Error::from_string(e.to_string(), StatusCode::UNAUTHORIZED))?;
let config = EdgeConfig {
peer_id,
supported_codec_kinds: vec![CodecKind::JSON].into_iter().collect(),
peer_auth: Auth { payload: Bytes::new() },
peer_auth: EdgeAuth::default(),
};
let _ctx = self.register_serv.get_ctx(peer_id).await.map_err(|e| tardis::web::poem::Error::from_string(e.to_string(), StatusCode::UNAUTHORIZED))?;
let codec = match codec.0.to_lowercase().as_str() {
"json" => DynCodec::new(codec::Json),
"bincode" => DynCodec::new(codec::Bincode),
_ => return Err(tardis::web::poem::Error::from_string("unsupported codec", StatusCode::BAD_REQUEST)),
};

let Some(node) = TardisFuns::store().get_singleton::<Node>() else {
return Err(tardis::web::poem::Error::from_string(
"mq server node have not initialized",
Expand All @@ -43,7 +48,7 @@ impl EventConnectApi {
let register_serv = self.register_serv.clone();
let upgraded: BoxWebSocketUpgraded = websocket.on_upgrade(Box::new(|stream| {
Box::pin(async move {
let ws = PoemWs::new(stream);
let ws = PoemWs::new(stream, codec);
let Ok(node_id) = node.create_edge_connection(ws, config).await.inspect_err(|e| {
tracing::error!(?e, "failed to create edge connection");
}) else {
Expand Down
1 change: 1 addition & 0 deletions backend/middlewares/event/src/api/ca/event_register_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ impl EventRegisterApi {
#[oai(path = "/", method = "put")]
async fn register(&self, ctx: TardisContextExtractor) -> TardisApiResult<EventRegisterResp> {
let resp = self.register_serv.register_ctx(&ctx.0).await?;
tardis::tracing::debug!(?resp, "register event node");
TardisResp::ok(resp)
}
}
34 changes: 26 additions & 8 deletions backend/middlewares/event/src/api/ci/event_topic_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,36 @@ impl EventTopicApi {
///
/// 添加事件主题
#[oai(path = "/", method = "post")]
async fn add(&self, add_or_modify_req: Json<EventTopicConfig>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
async fn add(&self, add_or_modify_req: Json<EventTopicConfig>, ctx: TardisContextExtractor) -> TardisApiResult<String> {
let funs = get_tardis_inst();
let mut add_or_modify_req = add_or_modify_req.0.into_rbum_req();
EventTopicServ::add_item(&mut add_or_modify_req, &funs, &ctx.0).await?;
TardisResp::ok(Void)
let id = EventTopicServ::add_item(&mut add_or_modify_req, &funs, &ctx.0).await?;
TardisResp::ok(id)
}

/// Add Event Definition
///
/// 添加事件主题
#[oai(path = "/", method = "get")]
async fn get_by_code(&self, topic_code: Query<String>, ctx: TardisContextExtractor) -> TardisApiResult<Option<EventTopicInfoResp>> {
let funs = get_tardis_inst();
let result = EventTopicServ::find_one_item(
&EventTopicFilterReq {
basic: Default::default(),
topic_code: Some(topic_code.0),
},
&funs,
&ctx.0,
)
.await?;
TardisResp::ok(result)
}

/// Modify Event Definition
///
/// 修改事件主题
#[oai(path = "/:id", method = "put")]
async fn modify(&self, id: Path<String>, add_or_modify_req: Json<EventTopicConfig>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
#[oai(path = "/", method = "put")]
async fn modify(&self, id: Query<String>, add_or_modify_req: Json<EventTopicConfig>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
let funs = get_tardis_inst();
let mut add_or_modify_req = add_or_modify_req.0.into_rbum_req();
EventTopicServ::modify_item(&id.0, &mut add_or_modify_req, &funs, &ctx.0).await?;
Expand All @@ -43,8 +61,8 @@ impl EventTopicApi {
/// Delete Event Definition
///
/// 删除事件主题
#[oai(path = "/:id", method = "delete")]
async fn delete(&self, id: Path<String>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
#[oai(path = "/", method = "delete")]
async fn delete(&self, id: Query<String>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
let funs = get_tardis_inst();
EventTopicServ::delete_item(&id.0, &funs, &ctx.0).await?;
TardisResp::ok(Void {})
Expand All @@ -53,7 +71,7 @@ impl EventTopicApi {
/// Find Event Definitions
///
/// 查找事件主题
#[oai(path = "/", method = "get")]
#[oai(path = "/paged", method = "get")]
async fn paginate(
&self,
id: Query<Option<String>>,
Expand Down
3 changes: 3 additions & 0 deletions backend/middlewares/event/src/event_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bios_basic::rbum::rbum_config::RbumConfig;

use bios_sdk_invoke::invoke_config::InvokeConfig;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Mutex};
use tardis::{
Expand All @@ -19,6 +20,7 @@ pub struct EventConfig {
pub durable: bool,
pub avatars: Vec<String>,
pub cluster: Option<String>,
pub invoke: InvokeConfig,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(default)]
Expand Down Expand Up @@ -54,6 +56,7 @@ impl Default for EventConfig {
durable: true,
cluster: Some(Self::CLUSTER_K8S.to_string()),
raft: None,
invoke: Default::default(),
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions backend/middlewares/event/src/event_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use bios_basic::rbum::{
rbum_enumeration::RbumScopeLevelKind,
serv::{rbum_crud_serv::RbumCrudOperation, rbum_domain_serv::RbumDomainServ, rbum_item_serv::RbumItemCrudOperation, rbum_kind_serv::RbumKindServ},
};
use bios_sdk_invoke::clients::event_client::SPI_RPC_TOPIC;
use tardis::tracing;
use bios_sdk_invoke::{clients::event_client::SPI_RPC_TOPIC, invoke_initializer};
use tardis::{basic::error::TardisError, tracing};
use tardis::{
basic::{dto::TardisContext, field::TrimString, result::TardisResult},
db::reldb_client::TardisActiveModel,
Expand Down Expand Up @@ -61,6 +61,7 @@ pub async fn init(web_server: &TardisWebServer) -> TardisResult<()> {

async fn init_db(domain_code: String, kind_code: String, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
bios_basic::rbum::rbum_initializer::init(funs.module_code(), funs.conf::<EventConfig>().rbum.clone()).await?;
invoke_initializer::init(funs.module_code(), funs.conf::<EventConfig>().invoke.clone())?;
if let Some(domain_id) = RbumDomainServ::get_rbum_domain_id_by_code(&domain_code, funs).await? {
let kind_id = RbumKindServ::get_rbum_kind_id_by_code(&kind_code, funs).await?.expect("missing event kind");
EventInfoManager::set(EventInfo { kind_id, domain_id })?;
Expand Down Expand Up @@ -131,7 +132,6 @@ async fn init_api(web_server: &TardisWebServer) -> TardisResult<()> {

#[instrument(skip(config, funs, ctx))]
async fn init_mq_cluster(config: &EventConfig, funs: TardisFunsInst, ctx: TardisContext) -> TardisResult<()> {
use bios_sdk_invoke::clients::event_client::mq_error;
tracing::info!(?config, "init mq cluster",);
let funs = Arc::new(funs);
let mq_node = init_mq_node(config, funs.clone(), &ctx).await;
Expand Down Expand Up @@ -199,3 +199,14 @@ pub async fn init_mq_node(config: &EventConfig, funs: Arc<TardisFunsInst>, ctx:
node
}
}

pub fn mq_node_opt() -> Option<asteroid_mq::prelude::Node> {
TardisFuns::store().get_singleton::<asteroid_mq::prelude::Node>()
}
pub fn mq_node() -> asteroid_mq::prelude::Node {
mq_node_opt().expect("mq node not initialized")
}

pub fn mq_error(err: asteroid_mq::Error) -> TardisError {
TardisError::internal_error(&err.to_string(), "mq-error")
}
54 changes: 29 additions & 25 deletions backend/middlewares/event/src/serv/event_connect_serv.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::task::ready;

use asteroid_mq::protocol::node::edge::{
codec::CodecKind,
connection::{NodeConnection, NodeConnectionError, NodeConnectionErrorKind},
packet::EdgePacket,
use asteroid_mq::model::{
codec::{Codec, CodecKind, DynCodec},
connection::{EdgeConnectionError, EdgeConnectionErrorKind, EdgeNodeConnection},
EdgePayload,
};
use tardis::{
futures::{Sink, Stream},
Expand All @@ -14,49 +14,53 @@ pin_project_lite::pin_project! {
pub struct PoemWs {
#[pin]
inner: WebSocketStream,
codec: DynCodec,
}
}
impl PoemWs {
pub fn new(inner: WebSocketStream) -> Self {
Self { inner }
pub fn new(inner: WebSocketStream, codec: DynCodec) -> Self {
Self { inner, codec }
}
}
impl Sink<EdgePacket> for PoemWs {
type Error = NodeConnectionError;
impl Sink<EdgePayload> for PoemWs {
type Error = EdgeConnectionError;

fn poll_ready(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx).map_err(|e| NodeConnectionError::new(NodeConnectionErrorKind::Underlying(Box::new(e)), "web socket poll ready failed"))
self.project().inner.poll_ready(cx).map_err(|e| EdgeConnectionError::new(EdgeConnectionErrorKind::Underlying(Box::new(e)), "web socket poll ready failed"))
}

fn start_send(self: std::pin::Pin<&mut Self>, item: EdgePacket) -> Result<(), Self::Error> {
self.project()
.inner
.start_send(Message::Binary(item.payload.to_vec()))
.map_err(|e| NodeConnectionError::new(NodeConnectionErrorKind::Underlying(Box::new(e)), "web socket start send failed"))
fn start_send(self: std::pin::Pin<&mut Self>, item: EdgePayload) -> Result<(), Self::Error> {
let this = self.project();
this.inner
.start_send(Message::Binary(
this.codec.encode(&item).map_err(EdgeConnectionError::codec("web socket start send failed"))?,
))
.map_err(|e| EdgeConnectionError::new(EdgeConnectionErrorKind::Underlying(Box::new(e)), "web socket start send failed"))
}

fn poll_flush(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx).map_err(|e| NodeConnectionError::new(NodeConnectionErrorKind::Underlying(Box::new(e)), "web socket poll flush failed"))
self.project().inner.poll_flush(cx).map_err(|e| EdgeConnectionError::new(EdgeConnectionErrorKind::Underlying(Box::new(e)), "web socket poll flush failed"))
}

fn poll_close(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(|e| NodeConnectionError::new(NodeConnectionErrorKind::Underlying(Box::new(e)), "web socket poll close failed"))
self.project().inner.poll_close(cx).map_err(|e| EdgeConnectionError::new(EdgeConnectionErrorKind::Underlying(Box::new(e)), "web socket poll close failed"))
}
}

impl Stream for PoemWs {
type Item = Result<EdgePacket, NodeConnectionError>;
type Item = Result<EdgePayload, EdgeConnectionError>;

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
let next = ready!(self.project().inner.poll_next(cx));
let this = self.project();
let next = ready!(this.inner.poll_next(cx));
match next {
Some(Ok(Message::Binary(data))) => {
let packet = EdgePacket::new(CodecKind::JSON, data);
std::task::Poll::Ready(Some(Ok(packet)))
let payload_result = this.codec.decode(&data).map_err(EdgeConnectionError::codec("axum ws poll next failed"));
std::task::Poll::Ready(Some(payload_result))
}
Some(Ok(Message::Text(data))) => {
let packet = EdgePacket::new(CodecKind::JSON, data);
std::task::Poll::Ready(Some(Ok(packet)))
let payload_result = this.codec.decode(data.as_bytes()).map_err(EdgeConnectionError::codec("axum ws poll next failed"));
std::task::Poll::Ready(Some(payload_result))
}
Some(Ok(Message::Close(_))) => {
tracing::debug!("received close message");
Expand All @@ -68,13 +72,13 @@ impl Stream for PoemWs {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}
Some(Err(e)) => std::task::Poll::Ready(Some(Err(NodeConnectionError::new(
NodeConnectionErrorKind::Underlying(Box::new(e)),
Some(Err(e)) => std::task::Poll::Ready(Some(Err(EdgeConnectionError::new(
EdgeConnectionErrorKind::Underlying(Box::new(e)),
"web socket poll next failed",
)))),
None => std::task::Poll::Ready(None),
}
}
}

impl NodeConnection for PoemWs {}
impl EdgeNodeConnection for PoemWs {}
3 changes: 2 additions & 1 deletion backend/middlewares/event/src/serv/event_topic_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bios_basic::rbum::dto::rbum_filer_dto::RbumBasicFilterReq;
use bios_basic::rbum::dto::rbum_item_dto::{RbumItemKernelAddReq, RbumItemKernelModifyReq};
use bios_basic::rbum::rbum_enumeration::RbumScopeLevelKind;
use bios_basic::rbum::serv::rbum_item_serv::RbumItemCrudOperation;
use bios_sdk_invoke::clients::event_client::mq_node_opt;
use bios_sdk_invoke::clients::event_client::mq_client_node_opt;
use tardis::basic::dto::TardisContext;
use tardis::basic::error::TardisError;
use tardis::basic::result::TardisResult;
Expand All @@ -21,6 +21,7 @@ use tardis::TardisFunsInst;
use crate::domain::event_topic;
use crate::dto::event_dto::{EventTopicAddOrModifyReq, EventTopicFilterReq, EventTopicInfoResp, SetTopicAuth, TopicAuth};
use crate::event_config::EventInfoManager;
use crate::event_initializer::mq_node_opt;

use super::event_auth_serv::EventAuthServ;

Expand Down
3 changes: 3 additions & 0 deletions backend/middlewares/event/tests/config/conf-default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
enable = true
cluster = "singleton"
durable = true
[csm.event.invoke]
spi_app_id = "test"


[fw.web_server]
port = 8080
Expand Down
Loading

0 comments on commit d5e8178

Please sign in to comment.