diff --git a/Cargo.toml b/Cargo.toml index f4fa4805d..18e1c7440 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ strum = { version = "0.26", features = ["derive"] } # tardis # tardis = { version = "0.1.0-rc.17" } # tardis = { version = "0.2.0", path = "../tardis/tardis" } -tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "f666d50" } +tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "0058079" } # asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" } asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" } # asteroid-mq = { version = "0.1.0-alpha.5" } @@ -78,7 +78,7 @@ asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26 # "k8s", # "ext-axum", # ] } -spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="fe747e4", features = [ +spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="8065bb6", features = [ "cache", "k8s", "ext-axum", diff --git a/backend/basic/src/rbum/serv/rbum_cert_serv.rs b/backend/basic/src/rbum/serv/rbum_cert_serv.rs index 9dd422b10..b5bd1378d 100644 --- a/backend/basic/src/rbum/serv/rbum_cert_serv.rs +++ b/backend/basic/src/rbum/serv/rbum_cert_serv.rs @@ -842,7 +842,7 @@ impl RbumCertServ { let rbum_cert = funs.db().get_dto::(&query).await?; if let Some(rbum_cert) = rbum_cert { if Self::cert_is_locked(&rbum_cert.rel_rbum_id, funs).await? { - return Err(funs.err().unauthorized(&Self::get_obj_name(), "valid", "cert is locked", "400-rbum-cert-lock")); + return Err(funs.err().error("400-rbum-cert-lock", &Self::get_obj_name(), "valid", "cert is locked", "400-rbum-cert-lock")); } if !ignore_end_time && rbum_cert.end_time < Utc::now() { return Err(funs.err().conflict(&Self::get_obj_name(), "valid", "sk is expired", "409-rbum-cert-sk-expire")); diff --git a/backend/gateways/spacegate-plugins/Cargo.toml b/backend/gateways/spacegate-plugins/Cargo.toml index fd89176a4..6297d5e95 100644 --- a/backend/gateways/spacegate-plugins/Cargo.toml +++ b/backend/gateways/spacegate-plugins/Cargo.toml @@ -22,10 +22,12 @@ spacegate-shell = { workspace = true, features = [ "ext-redis", "ext-axum", "plugin-east-west-traffic-white-list", + "plugin-limit" ] } bios-sdk-invoke = { version = "0.2.0", path = "../../../frontend/sdks/invoke", features = [ "spi_log", + "reach", ], default-features = false } diff --git a/backend/gateways/spacegate-plugins/src/extension.rs b/backend/gateways/spacegate-plugins/src/extension.rs index 557f47fdf..e66e8cc51 100644 --- a/backend/gateways/spacegate-plugins/src/extension.rs +++ b/backend/gateways/spacegate-plugins/src/extension.rs @@ -1,37 +1,6 @@ -use http::Extensions; -use spacegate_shell::{kernel::extension::ExtensionPack as _, BoxError}; -use tardis::serde_json::{self, Value}; - -use self::audit_log_param::LogParamContent; pub mod audit_log_param; pub mod before_encrypt_body; pub mod cert_info; pub mod notification; pub mod request_crypto_status; -pub enum ExtensionPackEnum { - LogParamContent(), - None, -} - -impl From for ExtensionPackEnum { - fn from(value: String) -> Self { - match value.as_str() { - "log_content" => ExtensionPackEnum::LogParamContent(), - _ => ExtensionPackEnum::None, - } - } -} -impl ExtensionPackEnum { - pub fn _to_value(&self, ext: &Extensions) -> Result, BoxError> { - match self { - ExtensionPackEnum::LogParamContent() => { - if let Some(ext) = LogParamContent::get(ext) { - return Ok(Some(serde_json::to_value(ext)?)); - } - } - ExtensionPackEnum::None => (), - } - Ok(None) - } -} diff --git a/backend/gateways/spacegate-plugins/src/extension/audit_log_param.rs b/backend/gateways/spacegate-plugins/src/extension/audit_log_param.rs index b7d80f7e5..3f0144b3c 100644 --- a/backend/gateways/spacegate-plugins/src/extension/audit_log_param.rs +++ b/backend/gateways/spacegate-plugins/src/extension/audit_log_param.rs @@ -1,11 +1,25 @@ +use bios_sdk_invoke::clients::spi_log_client::{self, LogItemAddV2Req}; +use http::uri::Scheme; +use serde::{Deserialize, Serialize}; +use spacegate_shell::{ + hyper::HeaderMap, + kernel::{ + extension::{EnterTime, OriginalIpAddr, PeerAddr}, + Extract, + }, + SgRequestExt, SgResponse, +}; use std::time::Duration; +use tardis::{basic::dto::TardisContext, TardisFuns, TardisFunsInst}; +use tardis::{log as tracing, tokio}; -use serde::{Deserialize, Serialize}; -use spacegate_shell::{hyper::HeaderMap, kernel::extension::ExtensionPack}; -use super::cert_info::RoleInfo; -#[derive(Clone)] +use crate::{audit_log::AuditLogPlugin, plugin::PluginBiosExt}; + +use super::cert_info::{CertInfo, RoleInfo}; + +#[derive(Debug, Clone)] pub struct AuditLogParam { pub request_path: String, pub request_method: String, @@ -14,6 +28,41 @@ pub struct AuditLogParam { pub request_ip: String, } +impl Extract for AuditLogParam { + fn extract(req: &http::Request) -> Self { + AuditLogParam { + request_path: req.uri().path().to_string(), + request_method: req.method().to_string(), + request_headers: req.headers().clone(), + request_scheme: req.uri().scheme().unwrap_or(&Scheme::HTTP).to_string(), + request_ip: req.extract::().to_string(), + } + } +} + +impl AuditLogParam { + pub fn merge_audit_log_param_content(self, response: &SgResponse, success: bool, header_token_name: &str) -> LogParamContent { + let cert_info = response.extensions().get::(); + let start_time = response.extensions().get::().map(|time| time.0); + let end_time = std::time::Instant::now(); + let param = self; + LogParamContent { + op: param.request_method, + name: cert_info.and_then(|info| info.name.clone()).unwrap_or_default(), + user_id: cert_info.map(|info| info.id.clone()), + role: cert_info.map(|info| info.roles.clone()).unwrap_or_default(), + ip: param.request_ip, + path: param.request_path, + scheme: param.request_scheme, + token: param.request_headers.get(header_token_name).and_then(|v| v.to_str().ok().map(|v| v.to_string())), + server_timing: start_time.map(|st| end_time - st), + resp_status: response.status().as_u16().to_string(), + success, + own_paths: cert_info.and_then(|info| info.own_paths.clone()), + } + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct LogParamContent { pub op: String, @@ -31,4 +80,56 @@ pub struct LogParamContent { pub success: bool, } -impl ExtensionPack for LogParamContent {} +impl LogParamContent { + pub fn send_audit_log(self, spi_app_id: &str, log_url: &str, tag: &str) { + send_audit_log(spi_app_id, log_url, tag, self) + } +} + + + +fn send_audit_log(spi_app_id: &str, log_url: &str, tag: &str, content: LogParamContent) { + let funs = AuditLogPlugin::get_funs_inst_by_plugin_code(); + + let spi_ctx = TardisContext { + ak: spi_app_id.to_string(), + own_paths: spi_app_id.to_string(), + ..Default::default() + }; + + let tag = tag.to_string(); + if !log_url.is_empty() && spi_app_id.is_empty() { + tokio::task::spawn(async move { + match spi_log_client::SpiLogClient::addv2( + LogItemAddV2Req { + tag, + content: TardisFuns::json.obj_to_json(&content).unwrap_or_default(), + kind: None, + ext: Some(content.to_value()), + key: None, + op: Some(content.op), + rel_key: None, + idempotent_id: None, + ts: Some(tardis::chrono::Utc::now()), + owner: content.user_id, + own_paths: None, + msg: None, + owner_name: None, + push: false, + disable: None, + }, + &funs, + &spi_ctx, + ) + .await + { + Ok(_) => { + tracing::trace!("[Plugin.AuditLog] add log success") + } + Err(e) => { + tracing::warn!("[Plugin.AuditLog] failed to add log:{e}") + } + }; + }); + } +} diff --git a/backend/gateways/spacegate-plugins/src/extension/notification.rs b/backend/gateways/spacegate-plugins/src/extension/notification.rs index 840686c55..50a978311 100644 --- a/backend/gateways/spacegate-plugins/src/extension/notification.rs +++ b/backend/gateways/spacegate-plugins/src/extension/notification.rs @@ -1,49 +1,112 @@ -use std::{borrow::Cow, collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; -use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, Uri}; use serde::{Deserialize, Serialize}; -use spacegate_shell::{kernel::backend_service::http_client_service::HttpClient, SgBody, SgRequest}; -use tardis::{log as tracing, serde_json}; +use tardis::{basic::dto::TardisContext, log as tracing, tokio}; /// Context to call notification api /// -/// Extract it from request extensions, and call [`NotificationContext::notify`] to send notification +/// Extract it from request extensions, and call [`NotificationContext::report`] to send notification #[derive(Debug, Clone)] pub struct NotificationContext { - pub(crate) api: Arc, - pub(crate) headers: Arc>, - pub(crate) client: HttpClient, + pub(crate) reach_api: Arc, + pub(crate) log_api: Arc, + pub(crate) spi_app_id: Arc, + pub(crate) audit_log_tag: Arc, + pub(crate) audit_log_token_header_name: Arc, + pub(crate) templates: Arc>, + pub(crate) audit_param: Arc, + pub(crate) cache_client: RedisClient, + pub(crate) dedup_cache_cool_down: Duration, } impl NotificationContext { - fn build_notification_request(&self, req: &ReachMsgSendReq) -> SgRequest { - let req_bytes = serde_json::to_vec(&req).expect("ReachMsgSendReq is a valid json"); - let body = SgBody::full(req_bytes); - let mut req = SgRequest::new(body); - req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - *req.uri_mut() = self.api.as_ref().clone(); - for (k, v) in self.headers.iter() { - req.headers_mut().insert(k.clone(), v.clone()); + pub fn submit_notify(&self, req: ReachMsgSendReq, dedup_hash: u64) { + if self.reach_api.is_empty() { + tracing::debug!("reach api is empty, skip sending notification"); + return; } - req + let cache_client = self.cache_client.clone(); + let ctx = TardisContext { + ak: self.spi_app_id.to_string(), + own_paths: self.spi_app_id.to_string(), + ..Default::default() + }; + let cool_down = self.dedup_cache_cool_down.as_secs().min(1); + tokio::spawn(async move { + let key = format!("sg:plugin:{}:{}", NotifyPlugin::CODE, dedup_hash); + let mut conn = cache_client.get_conn().await; + // check if the key exists + if let Ok(Some(_)) = conn.get::<'_, _, Option>(&key).await { + tracing::debug!("dedup cache hit, skip sending notification"); + return; + } + + // set the dedup key + if let Err(e) = conn.set_ex::<'_, _, _,Option>(&key, "1", cool_down).await { + tracing::error!(error = ?e, "set dedup cache failed"); + return; + } + + let funs = NotifyPlugin::get_funs_inst_by_plugin_code(); + let response = bios_sdk_invoke::clients::reach_client::ReachClient::send_message(&req.into(), &funs, &ctx).await; + if let Err(e) = response { + tracing::error!(error = ?e, "send notification failed"); + } + }); } - pub async fn notify(&self, req: &ReachMsgSendReq) { - let notify_response = self.client.clone().request(self.build_notification_request(req)).await; - if !notify_response.status().is_success() { - tracing::warn!(response = ?notify_response, "send notification failed"); - } + pub fn report(&self, response: &SgResponse, report: R) { + let replace = report.get_replacement(); + let key = report.key(); - let Ok(response) = notify_response.into_body().dump().await.inspect_err(|e| { - tracing::error!(error = ?e, "failed to read response body"); - }) else { - return; - }; - let response_str = String::from_utf8_lossy(response.get_dumped().expect("just dump body")); - tracing::debug!(response = ?response_str, "receive notification api response"); + if let Some(template) = self.templates.get(key) { + if let Some(notify_req) = template.reach.as_ref() { + let mut req = notify_req.clone(); + req.merge_replace(replace.clone()); + let context = self.clone(); + context.submit_notify(req, report.dedup_hash()); + } + if let Some(log_template) = template.audit_log.as_ref() { + let formatted = format_template(log_template, &replace); + self.submit_audit_log(response, Some(formatted)); + } + } + } + pub fn submit_audit_log(&self, response: &SgResponse, extra_info: Option) { + let mut log_param_content = self.audit_param.as_ref().clone().merge_audit_log_param_content(response, true, &self.audit_log_token_header_name); + if let Some(extra_info) = extra_info { + log_param_content.op = extra_info; + } + log_param_content.send_audit_log(&self.spi_app_id, &self.log_api, &self.audit_log_tag); } } -#[derive(Debug, Serialize, Deserialize)] +pub struct TamperReport {} + +pub struct UnauthorizedOperationReport {} + +pub struct CertLockReport {} + +#[derive(Debug, Clone)] +pub struct ContentFilterForbiddenReport { + pub(crate) forbidden_reason: String, +} + +use spacegate_shell::{ + ext_redis::{redis::AsyncCommands, RedisClient}, + plugin::{ + schemars::{self, JsonSchema}, + Plugin, + }, + SgResponse, +}; + +use crate::plugin::{ + notify::{format_template, NotifyPlugin, NotifyPluginConfigTemplates, NotifyPluginConfigTemplatesItem, Report}, + PluginBiosExt, +}; + +use super::audit_log_param::AuditLogParam; +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] pub struct ReachMsgSendReq { pub scene_code: String, pub receives: Vec, @@ -51,9 +114,35 @@ pub struct ReachMsgSendReq { pub replace: HashMap, } -#[derive(Debug, Serialize, Deserialize)] +impl ReachMsgSendReq { + pub fn merge_replace>(&mut self, replace: impl IntoIterator) { + self.replace.extend(replace.into_iter().map(|(k, v)| (k.into(), v))); + } +} + +impl From for bios_sdk_invoke::clients::reach_client::ReachMsgSendReq { + fn from(val: ReachMsgSendReq) -> Self { + bios_sdk_invoke::clients::reach_client::ReachMsgSendReq { + scene_code: val.scene_code, + receives: val.receives.into_iter().map(Into::into).collect(), + rel_item_id: val.rel_item_id, + replace: val.replace, + } + } +} +#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)] pub struct ReachMsgReceive { pub receive_group_code: String, pub receive_kind: String, pub receive_ids: Vec, } + +impl From for bios_sdk_invoke::clients::reach_client::ReachMsgReceive { + fn from(val: ReachMsgReceive) -> Self { + bios_sdk_invoke::clients::reach_client::ReachMsgReceive { + receive_group_code: val.receive_group_code, + receive_kind: val.receive_kind, + receive_ids: val.receive_ids, + } + } +} diff --git a/backend/gateways/spacegate-plugins/src/lib.rs b/backend/gateways/spacegate-plugins/src/lib.rs index 69435faa0..2ffa9dcb1 100644 --- a/backend/gateways/spacegate-plugins/src/lib.rs +++ b/backend/gateways/spacegate-plugins/src/lib.rs @@ -6,6 +6,7 @@ mod consts; mod extension; mod marker; mod plugin; +mod utils; pub const PACKAGE_NAME: &str = "spacegate_lib"; use plugin::{notify, op_redis_publisher}; diff --git a/backend/gateways/spacegate-plugins/src/plugin.rs b/backend/gateways/spacegate-plugins/src/plugin.rs index f335721f0..775d8f90f 100644 --- a/backend/gateways/spacegate-plugins/src/plugin.rs +++ b/backend/gateways/spacegate-plugins/src/plugin.rs @@ -1,8 +1,22 @@ +use spacegate_shell::plugin::Plugin; +use tardis::{TardisFuns, TardisFunsInst}; + pub mod anti_replay; pub mod anti_xss; pub mod audit_log; pub mod auth; +pub mod content_filter; pub mod ip_time; pub mod notify; pub mod op_redis_publisher; pub mod rewrite_ns_b_ip; + +pub trait PluginBiosExt { + fn get_funs_inst_by_plugin_code() -> TardisFunsInst; +} + +impl PluginBiosExt for T { + fn get_funs_inst_by_plugin_code() -> TardisFunsInst { + TardisFuns::inst(Self::CODE, None) + } +} diff --git a/backend/gateways/spacegate-plugins/src/plugin/anti_xss.rs b/backend/gateways/spacegate-plugins/src/plugin/anti_xss.rs index 0b63ca6ce..9498b8b62 100644 --- a/backend/gateways/spacegate-plugins/src/plugin/anti_xss.rs +++ b/backend/gateways/spacegate-plugins/src/plugin/anti_xss.rs @@ -9,10 +9,12 @@ use spacegate_shell::{ }; macro_rules! append_value { - ($result:expr, $field:expr, $value:expr) => { - if let Some(val) = $value { - $result.push_str(&format!("{} {};", $field, val)); - } + ($result:ident {$($field:literal: $value:expr ,)*}) => { + $(if let Some(val) = $value { + $result.push_str($field); + $result.push(' '); + $result.push_str(val.as_str()); + })* }; } @@ -56,28 +58,29 @@ pub struct CSPConfig { impl CSPConfig { fn to_string_header_value(&self) -> String { let mut result = format!("default-src {};", self.default_src); - append_value!(result, "base-uri", &self.base_uri); - append_value!(result, "child-src", &self.child_src); - append_value!(result, "connect-src", &self.connect_src); - append_value!(result, "font-src", &self.font_src); - append_value!(result, "form-action", &self.form_action); - append_value!(result, "frame-ancestors", &self.frame_ancestors); - append_value!(result, "frame-src", &self.frame_src); - append_value!(result, "img-src", &self.img_src); - append_value!(result, "manifest-src", &self.manifest_src); - append_value!(result, "media-src", &self.media_src); - append_value!(result, "object-src", &self.object_src); - append_value!(result, "sandbox", &self.sandbox); - append_value!(result, "script-src", &self.script_src); - append_value!(result, "script-src-attr", &self.script_src_attr); - append_value!(result, "script-src-elem", &self.script_src_elem); - append_value!(result, "strict-dynamic", &self.strict_dynamic); - append_value!(result, "style-src", &self.style_src); - append_value!(result, "style-src-attr", &self.style_src_attr); - append_value!(result, "style-src-elem", &self.style_src_elem); - append_value!(result, "worker-src", &self.worker_src); - append_value!(result, "report-to", &self.report_to); - + append_value!(result { + "base-uri": &self.base_uri, + "child-src": &self.child_src, + "connect-src": &self.connect_src, + "font-src": &self.font_src, + "form-action": &self.form_action, + "frame-ancestors": &self.frame_ancestors, + "frame-src": &self.frame_src, + "img-src": &self.img_src, + "manifest-src": &self.manifest_src, + "media-src": &self.media_src, + "object-src": &self.object_src, + "sandbox": &self.sandbox, + "script-src": &self.script_src, + "script-src-attr": &self.script_src_attr, + "script-src-elem": &self.script_src_elem, + "strict-dynamic": &self.strict_dynamic, + "style-src": &self.style_src, + "style-src-attr": &self.style_src_attr, + "style-src-elem": &self.style_src_elem, + "worker-src": &self.worker_src, + "report-to": &self.report_to, + }); result } } @@ -130,18 +133,24 @@ pub enum SandBoxValue { impl fmt::Display for SandBoxValue { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl SandBoxValue { + pub fn as_str(&self) -> &'static str { match self { - SandBoxValue::None => write!(f, ""), - SandBoxValue::AllowForms => write!(f, "allow-forms"), - SandBoxValue::AllowModals => write!(f, "allow-modals"), - SandBoxValue::AllowOrientationLock => write!(f, "allow-orientation-lock"), - SandBoxValue::AllowPointerLock => write!(f, "allow-pointer-lock"), - SandBoxValue::AllowPopups => write!(f, "allow-popups"), - SandBoxValue::AllowPopupsToEscapeSandbox => write!(f, "allow-popups-to-escape-sandbox"), - SandBoxValue::AllowPresentation => write!(f, "allow-presentation"), - SandBoxValue::AllowSameOrigin => write!(f, "allow-same-origin"), - SandBoxValue::AllowScripts => write!(f, "allow-scripts"), - SandBoxValue::AllowTopNavigation => write!(f, "allow-top-navigation"), + SandBoxValue::None => "", + SandBoxValue::AllowForms => "allow-forms", + SandBoxValue::AllowModals => "allow-modals", + SandBoxValue::AllowOrientationLock => "allow-orientation-lock", + SandBoxValue::AllowPointerLock => "allow-pointer-lock", + SandBoxValue::AllowPopups => "allow-popups", + SandBoxValue::AllowPopupsToEscapeSandbox => "allow-popups-to-escape-sandbox", + SandBoxValue::AllowPresentation => "allow-presentation", + SandBoxValue::AllowSameOrigin => "allow-same-origin", + SandBoxValue::AllowScripts => "allow-scripts", + SandBoxValue::AllowTopNavigation => "allow-top-navigation", } } } diff --git a/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs b/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs index 003a39083..8a4016bcb 100644 --- a/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs +++ b/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs @@ -1,23 +1,19 @@ use std::collections::HashMap; use std::str::FromStr; -use std::time::Instant; -use bios_sdk_invoke::clients::spi_log_client::{self, LogItemAddV2Req}; use bios_sdk_invoke::invoke_config::InvokeConfig; use bios_sdk_invoke::invoke_enumeration::InvokeModuleKind; use bios_sdk_invoke::invoke_initializer; -use http::uri::Scheme; use jsonpath_rust::JsonPathInst; use serde::{Deserialize, Serialize}; use spacegate_shell::hyper::{Request, Response}; -use spacegate_shell::kernel::extension::{EnterTime, PeerAddr, Reflect}; +use spacegate_shell::kernel::extension::Reflect; use spacegate_shell::kernel::helper_layers::function::Inner; use spacegate_shell::plugin::{Plugin, PluginError}; -use spacegate_shell::{BoxError, SgBody}; -use tardis::basic::dto::TardisContext; +use spacegate_shell::{BoxError, SgBody, SgRequestExt}; use tardis::log::{debug, trace, warn}; use tardis::serde_json::{json, Value}; @@ -26,8 +22,6 @@ use tardis::{ basic::result::TardisResult, log, serde_json::{self}, - tokio::{self}, - TardisFuns, TardisFunsInst, }; use crate::extension::audit_log_param::{AuditLogParam, LogParamContent}; @@ -71,10 +65,6 @@ impl AuditLogPlugin { } } trace!("[Plugin.AuditLog] exclude log path do not matched: path {}", path); - - let start_time = resp.extensions().get::().map(|time| time.0); - let end_time = Instant::now(); - let body_string = if let Some(raw_body) = resp.extensions_mut().remove::().map(|b| b.get()) { serde_json::from_str::(&String::from_utf8_lossy(&raw_body)) } else { @@ -123,20 +113,7 @@ impl AuditLogPlugin { } Err(_) => false, }; - let content = LogParamContent { - op: param.request_method, - name: resp.extensions().get::().and_then(|info| info.name.clone()).unwrap_or_default(), - user_id: resp.extensions().get::().map(|info| info.id.clone()), - role: resp.extensions().get::().map(|info| info.roles.clone()).unwrap_or_default(), - ip: param.request_ip, - path: param.request_path, - scheme: param.request_scheme, - token: param.request_headers.get(&self.header_token_name).and_then(|v| v.to_str().ok().map(|v| v.to_string())), - server_timing: start_time.map(|st| end_time - st), - resp_status: resp.status().as_u16().to_string(), - success, - own_paths: resp.extensions().get::().and_then(|info| info.own_paths.clone()), - }; + let content = param.merge_audit_log_param_content(&resp, success, &self.header_token_name); Ok((resp, Some(content))) } @@ -165,14 +142,7 @@ impl AuditLogPlugin { } fn req(&self, mut req: Request) -> Result, Response> { - let param = AuditLogParam { - request_path: req.uri().path().to_string(), - request_method: req.method().to_string(), - request_headers: req.headers().clone(), - request_scheme: req.uri().scheme().unwrap_or(&Scheme::HTTP).to_string(), - request_ip: req.extensions().get::().ok_or_else(|| PluginError::internal_error::("[Plugin.AuditLog] missing peer addr"))?.0.ip().to_string(), - }; - + let param = req.extract::(); if let Some(ident) = req.headers().get(self.head_key_auth_ident.clone()) { let ident = ident.to_str().unwrap_or_default().to_string(); let reflect = req.extensions_mut().get_mut::().expect("missing reflect"); @@ -198,49 +168,7 @@ impl AuditLogPlugin { let (resp, content) = self.get_log_content(resp).await.map_err(PluginError::internal_error::)?; if let Some(content) = content { - let funs = get_tardis_inst(); - - let spi_ctx = TardisContext { - owner: resp.extensions().get::().map(|info| info.id.clone()).unwrap_or_default(), - roles: resp.extensions().get::().map(|info| info.roles.clone().into_iter().map(|r| r.id).collect()).unwrap_or_default(), - ..Default::default() - }; - - let tag = self.tag.clone(); - if !self.log_url.is_empty() && !self.spi_app_id.is_empty() { - tokio::task::spawn(async move { - match spi_log_client::SpiLogClient::addv2( - LogItemAddV2Req { - tag, - content: TardisFuns::json.obj_to_json(&content).unwrap_or_default(), - kind: None, - ext: Some(content.to_value()), - key: None, - op: Some(content.op), - rel_key: None, - idempotent_id: None, - ts: Some(tardis::chrono::Utc::now()), - owner: content.user_id, - own_paths: None, - msg: None, - owner_name: None, - push: false, - disable: None, - }, - &funs, - &spi_ctx, - ) - .await - { - Ok(_) => { - log::trace!("[Plugin.AuditLog] add log success") - } - Err(e) => { - log::warn!("[Plugin.AuditLog] failed to add log:{e}") - } - }; - }); - } + content.send_audit_log(&self.spi_app_id, &self.log_url, &self.tag); } Ok(resp) @@ -295,12 +223,8 @@ impl Plugin for AuditLogPlugin { } } -fn get_tardis_inst() -> TardisFunsInst { - TardisFuns::inst(CODE.to_string(), None) -} - impl LogParamContent { - fn to_value(&self) -> Value { + pub fn to_value(&self) -> Value { json!({ "name":self.name, "id":self.user_id, diff --git a/backend/gateways/spacegate-plugins/src/plugin/auth.rs b/backend/gateways/spacegate-plugins/src/plugin/auth.rs index 5e7ac23b0..428e50594 100644 --- a/backend/gateways/spacegate-plugins/src/plugin/auth.rs +++ b/backend/gateways/spacegate-plugins/src/plugin/auth.rs @@ -33,13 +33,12 @@ use std::{ }; use tardis::{ basic::{error::TardisError, result::TardisResult}, - cache::AsyncCommands as _, config::config_dto::CacheModuleConfig, serde_json::{self, json}, tokio::{sync::RwLock, task::JoinHandle}, tracing::{self as tracing, instrument, warn}, url::Url, - web::web_resp::TardisResp, + web::web_resp::{TardisResp, HEADER_X_TARDIS_ERROR}, TardisFuns, }; use tardis::{config::config_dto::TardisComponentConfig, web::poem_openapi::types::Type}; @@ -270,7 +269,7 @@ impl AuthPlugin { let is_east_west_traffic = req.extensions().get::().map(Deref::deref).unwrap_or(&false); if self.auth_config.strict_security_mode && !is_true_mix_req && !is_east_west_traffic { tracing::debug!("[SG.Filter.Auth] handle mix request"); - return Ok(handle_mix_req(&self, req).await.map_err(PluginError::internal_error::)?); + return Ok(handle_mix_req(self, req).await.map_err(PluginError::internal_error::)?); } req.headers_mut().append(&self.header_is_mix_req, HeaderValue::from_static("false")); @@ -303,12 +302,7 @@ impl AuthPlugin { } Err(e) => { tracing::info!("[SG.Filter.Auth] auth return error {:?}", e); - let err_resp = Response::builder() - .header(http::header::CONTENT_TYPE, HeaderValue::from_static("application/json")) - .status(StatusCode::from_str(&e.code).unwrap_or(StatusCode::BAD_GATEWAY)) - .body(SgBody::full(format!("[SG.Filter.Auth] auth return error:{e}"))) - .map_err(PluginError::internal_error::)?; - Err(err_resp) + Err(tardis_error_into_response(e)) } } } @@ -601,6 +595,20 @@ impl Plugin for AuthPlugin { } } +pub fn tardis_error_into_response(e: TardisError) -> Response { + let status_code = if e.code.len() >= 3 { + StatusCode::from_str(&e.code[0..3]).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + Response::builder() + .header(http::header::CONTENT_TYPE, HeaderValue::from_static("application/json")) + .header(HEADER_X_TARDIS_ERROR, &e.code) + .status(status_code) + .body(SgBody::full(format!("[SG.Filter.Auth] auth return error:{e}"))) + .expect("invalid response") +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests; diff --git a/backend/gateways/spacegate-plugins/src/plugin/content_filter.rs b/backend/gateways/spacegate-plugins/src/plugin/content_filter.rs new file mode 100644 index 000000000..18584b80c --- /dev/null +++ b/backend/gateways/spacegate-plugins/src/plugin/content_filter.rs @@ -0,0 +1,130 @@ +use http::StatusCode; +use serde::{Deserialize, Serialize}; +use spacegate_shell::hyper::body::Body; +use spacegate_shell::plugin::Plugin; +use spacegate_shell::plugin::{ + plugin_meta, + schemars::{self, JsonSchema}, +}; +use spacegate_shell::{BoxError, SgResponse, SgResponseExt}; +use std::ops::Deref; +use std::str::FromStr; +use std::{fmt::Display, sync::Arc}; +use tardis::regex::bytes::Regex as BytesRegex; +use tardis::serde_json; + +use crate::extension::notification::ContentFilterForbiddenReport; + +#[derive(Debug, Clone)] +pub enum BytesFilter { + Regex(BytesRegex), +} + +impl JsonSchema for BytesFilter { + fn schema_name() -> String { + String::schema_name() + } + + fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + String::json_schema(gen) + } + + fn schema_id() -> std::borrow::Cow<'static, str> { + String::schema_id() + } + + fn is_referenceable() -> bool { + String::is_referenceable() + } +} + +impl Display for BytesFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BytesFilter::Regex(regex) => write!(f, "{}", regex.as_str()), + } + } +} + +impl FromStr for BytesFilter { + type Err = String; + + fn from_str(s: &str) -> Result { + BytesRegex::new(s).map(BytesFilter::Regex).map_err(|e| e.to_string()) + } +} +impl Serialize for BytesFilter { + fn serialize(&self, serializer: S) -> Result { + match self { + BytesFilter::Regex(re) => serializer.serialize_str(re.as_str()), + } + } +} + +impl<'de> Deserialize<'de> for BytesFilter { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + BytesRegex::new(&s).map(BytesFilter::Regex).map_err(serde::de::Error::custom) + } +} + +impl BytesFilter { + pub fn matches(&self, bytes: &[u8]) -> bool { + match self { + BytesFilter::Regex(re) => re.is_match(bytes), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] +pub struct ContentFilterConfig { + content_length_limit: Option, + forbidden_content_filter: Vec, +} +#[derive(Debug, Clone)] +pub struct ContentFilterPlugin(Arc); +impl Deref for ContentFilterPlugin { + type Target = ContentFilterConfig; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} +impl Plugin for ContentFilterPlugin { + const CODE: &'static str = "content-filter"; + fn meta() -> spacegate_shell::model::PluginMetaData { + plugin_meta! { + description: "Filter content based on type, keywords and length." + } + } + + async fn call(&self, mut req: spacegate_shell::SgRequest, inner: spacegate_shell::plugin::Inner) -> Result { + if let Some(length_limit) = self.content_length_limit { + let size = req.body().size_hint(); + if size.lower() > length_limit as u64 { + return Ok(SgResponse::with_code_empty(StatusCode::PAYLOAD_TOO_LARGE)); + } + } + if !self.forbidden_content_filter.is_empty() { + let (parts, body) = req.into_parts(); + let body = body.dump().await?; + for filter in &self.forbidden_content_filter { + let bytes = body.get_dumped().expect("dumped"); + if filter.matches(bytes) { + let mut response = SgResponse::with_code_empty(StatusCode::BAD_REQUEST); + response.extensions_mut().insert(ContentFilterForbiddenReport { + forbidden_reason: filter.to_string(), + }); + return Ok(response); + } + } + req = spacegate_shell::SgRequest::from_parts(parts, body); + } + Ok(inner.call(req).await) + } + + fn create(plugin_config: spacegate_shell::model::PluginConfig) -> Result { + let config = serde_json::from_value(plugin_config.spec)?; + Ok(ContentFilterPlugin(Arc::new(config))) + } +} diff --git a/backend/gateways/spacegate-plugins/src/plugin/notify.rs b/backend/gateways/spacegate-plugins/src/plugin/notify.rs index 5d6d44b13..bdf40f2e0 100644 --- a/backend/gateways/spacegate-plugins/src/plugin/notify.rs +++ b/backend/gateways/spacegate-plugins/src/plugin/notify.rs @@ -1,25 +1,207 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + net::IpAddr, + sync::{Arc, OnceLock}, +}; -use http::{HeaderName, HeaderValue, Uri}; +use bios_sdk_invoke::{invoke_config::InvokeConfig, invoke_enumeration::InvokeModuleKind, invoke_initializer}; +use http::HeaderValue; use serde::{Deserialize, Serialize}; use spacegate_shell::{ - kernel::backend_service::http_client_service::get_client, - plugin::{plugin_meta, schema, schemars, Inner, Plugin, PluginSchemaExt}, + ext_redis::RedisClient, + kernel::extension::OriginalIpAddr, + plugin::{plugin_meta, plugins::limit::RateLimitReport, schema, schemars, Inner, Plugin, PluginSchemaExt}, BoxError, SgRequest, SgRequestExt, SgResponse, }; -use tardis::serde_json; +use tardis::{ + regex::{self, Regex}, + serde_json, +}; -use crate::extension::notification::NotificationContext; +use crate::extension::{ + audit_log_param::AuditLogParam, + cert_info::CertInfo, + notification::{CertLockReport, ContentFilterForbiddenReport, NotificationContext, ReachMsgSendReq, TamperReport, UnauthorizedOperationReport}, +}; #[derive(Serialize, Deserialize, schemars::JsonSchema)] pub struct NotifyPluginConfig { - api: String, - headers: HashMap, + /// templates for different notification types + /// - rate_limit: rate limit notification + /// - count: number of requests + /// - time_window: time window + /// - tamper: tamper notification + /// - unauthorized_operation: unauthorized operation notification + /// - cert_lock: cert lock notification + /// - content_filter_forbidden: content filter forbidden notification + /// - reason: forbidden reason + templates: HashMap, + log_api: String, + reach_api: String, + spi_app_id: String, + audit_log_tag: String, + audit_log_token_header_name: String, + dedup_cache_cool_down: String, +} + +impl Default for NotifyPluginConfig { + fn default() -> Self { + Self { + templates: Default::default(), + log_api: "http://bios.devops:8080".into(), + reach_api: "http://bios.devops:8080".into(), + spi_app_id: "".into(), + audit_log_tag: "iam_abnormal".into(), + audit_log_token_header_name: "Bios-Token".into(), + dedup_cache_cool_down: "10m".into(), + } + } +} + +pub trait Report { + fn get_replacement(&self) -> HashMap<&'static str, String>; + fn key(&self) -> &'static str; + fn dedup_hash(&self) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + self.key().hash(&mut hasher); + hasher.finish() + } +} + +pub struct WithUserAndIp<'a, T> { + pub(crate) user: Option<&'a str>, + pub(crate) ip: IpAddr, + pub(crate) raw: &'a T, +} + +pub trait ReportExt { + fn with_user_and_ip<'a>(&'a self, user: Option<&'a str>, ip: IpAddr) -> WithUserAndIp<'a, Self> + where + Self: Sized; +} + +impl ReportExt for T { + fn with_user_and_ip<'a>(&'a self, user: Option<&'a str>, ip: IpAddr) -> WithUserAndIp<'a, Self> { + WithUserAndIp { user, ip, raw: self } + } +} + +impl Report for WithUserAndIp<'_, T> +where + T: Report, +{ + fn get_replacement(&self) -> HashMap<&'static str, String> { + let mut replace = self.raw.get_replacement(); + let mut formatted_user = self.ip.to_string(); + if let Some(tamper_user) = self.user { + formatted_user.push('('); + formatted_user.push_str(tamper_user); + formatted_user.push(')'); + } + replace.insert("user", formatted_user); + replace + } + fn key(&self) -> &'static str { + self.raw.key() + } + fn dedup_hash(&self) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + self.raw.dedup_hash().hash(&mut hasher); + self.ip.hash(&mut hasher); + hasher.finish() + } +} + +impl Report for RateLimitReport { + fn get_replacement(&self) -> HashMap<&'static str, String> { + let mut replace = HashMap::new(); + let count = format!("{}", self.plugin.max_request_number); + let time_window = format!("{:.000}s", self.plugin.time_window_ms as f64 / 1000.0); + replace.insert("count", count); + replace.insert("time_window", time_window); + replace + } + fn key(&self) -> &'static str { + "rate_limit" + } +} + +impl Report for CertLockReport { + fn get_replacement(&self) -> HashMap<&'static str, String> { + HashMap::new() + } + fn key(&self) -> &'static str { + "cert_lock" + } +} + +impl Report for TamperReport { + fn get_replacement(&self) -> HashMap<&'static str, String> { + HashMap::new() + } + fn key(&self) -> &'static str { + "tamper" + } } + +impl Report for UnauthorizedOperationReport { + fn get_replacement(&self) -> HashMap<&'static str, String> { + HashMap::new() + } + fn key(&self) -> &'static str { + "unauthorized_operation" + } +} + +impl Report for ContentFilterForbiddenReport { + fn get_replacement(&self) -> HashMap<&'static str, String> { + let mut replace = HashMap::new(); + replace.insert("reason", self.forbidden_reason.to_string()); + replace + } + fn key(&self) -> &'static str { + "content_filter_forbidden" + } +} +#[derive(Serialize, Deserialize, schemars::JsonSchema, Default, Debug)] +pub struct NotifyPluginConfigTemplates { + pub rate_limit: NotifyPluginConfigTemplatesItem, + pub tamper: NotifyPluginConfigTemplatesItem, + pub unauthorized_operation: NotifyPluginConfigTemplatesItem, +} +#[derive(Serialize, Deserialize, schemars::JsonSchema, Default, Debug)] +pub struct NotifyPluginConfigTemplatesItem { + pub reach: Option, + pub audit_log: Option, +} + schema!(NotifyPlugin, NotifyPluginConfig); #[derive(Debug, Clone)] pub struct NotifyPlugin { - api: Arc, - headers: Arc>, + log_api: Arc, + reach_api: Arc, + spi_app_id: Arc, + audit_log_tag: Arc, + audit_log_token_header_name: Arc, + templates: Arc>, + dedup_cache_cool_down: std::time::Duration, +} + +pub fn format_template(template: &str, replace: &HashMap<&'static str, String>) -> String { + static PLACEHOLDER_RE: OnceLock = OnceLock::new(); + const NULL_STR: &str = ""; + fn re() -> &'static Regex { + PLACEHOLDER_RE.get_or_init(|| Regex::new(r"\$\{(\w+)\}").expect("invalid regex")) + } + let formatted = re().replace_all(template, |caps: ®ex::Captures| { + let replaced = replace.get(caps.get(1).expect("regex should have a capture").as_str()); + if let Some(replaced) = replaced { + replaced.as_str() + } else { + NULL_STR + } + }); + formatted.to_string() } impl Plugin for NotifyPlugin { @@ -30,36 +212,112 @@ impl Plugin for NotifyPlugin { } } fn create(plugin_config: spacegate_shell::model::PluginConfig) -> Result { - // parse uri let config: NotifyPluginConfig = serde_json::from_value(plugin_config.spec)?; - let api = config.api.parse::()?; - let headers = config - .headers - .into_iter() - .map_while(|(k, v)| { - if let (Ok(k), Ok(v)) = (k.parse::(), v.parse::()) { - Some((k, v)) - } else { - None - } - }) - .collect(); + if config.spi_app_id.is_empty() { + tardis::log::error!("[Plugin.AuditLog] log_url or spi_app_id is empty!"); + } else { + invoke_initializer::init( + Self::CODE, + InvokeConfig { + spi_app_id: config.spi_app_id.clone(), + module_urls: HashMap::from([ + (InvokeModuleKind::Log.to_string(), config.log_api.clone()), + (InvokeModuleKind::Reach.to_string(), config.reach_api.clone()), + ]), + ..Default::default() + }, + )?; + } + let dedup_cache_cool_down = crate::utils::parse_duration(&config.dedup_cache_cool_down)?; Ok(Self { - api: Arc::new(api), - headers: Arc::new(headers), + log_api: config.log_api.into(), + reach_api: config.reach_api.into(), + spi_app_id: config.spi_app_id.into(), + audit_log_tag: config.audit_log_tag.into(), + audit_log_token_header_name: config.audit_log_token_header_name.into(), + templates: Arc::new(config.templates), + dedup_cache_cool_down, }) } async fn call(&self, mut req: SgRequest, inner: Inner) -> Result { + let audit_param = req.extract::(); + let redis_client = req.extract::>(); + let Some(redis_client) = redis_client else { + // skip report since redis client is not available + return Ok(inner.call(req).await); + }; let context = NotificationContext { - api: self.api.clone(), - headers: self.headers.clone(), - client: get_client(), + templates: self.templates.clone(), + log_api: self.log_api.clone(), + reach_api: self.reach_api.clone(), + spi_app_id: self.spi_app_id.clone(), + audit_log_tag: self.audit_log_tag.clone(), + audit_log_token_header_name: self.audit_log_token_header_name.clone(), + audit_param: Arc::new(audit_param), + cache_client: redis_client, + dedup_cache_cool_down: self.dedup_cache_cool_down, }; req.extensions_mut().insert(context.clone()); - req.reflect_mut().insert(context); - Ok(inner.call(req).await) + let req_cert_info = req.extensions().get::().cloned(); + let ip = req.extract::().0; + let response = inner.call(req).await; + let user = req_cert_info.as_ref().or_else(|| response.extensions().get::()).and_then(|c| c.name.clone()); + if let Some(rate_limit_report) = response.extensions().get::().filter(|r| r.rising_edge) { + context.report(&response, rate_limit_report.with_user_and_ip(user.as_deref(), ip)); + } + if let Some(flag) = response.extensions().get::() { + context.report(&response, flag.with_user_and_ip(user.as_deref(), ip)); + } + if let Some(error_code) = response.headers().get(tardis::web::web_resp::HEADER_X_TARDIS_ERROR) { + if let Some(error_kind) = KnownError::try_from_header_value(error_code) { + match error_kind { + KnownError::Tamper => { + context.report(&response, TamperReport {}.with_user_and_ip(user.as_deref(), ip)); + } + KnownError::UnauthorizedOperation => { + context.report(&response, UnauthorizedOperationReport {}.with_user_and_ip(user.as_deref(), ip)); + } + KnownError::CertLock => { + context.report(&response, CertLockReport {}.with_user_and_ip(user.as_deref(), ip)); + } + } + } + } + Ok(response) } fn schema_opt() -> Option { Some(NotifyPlugin::schema()) } } + +pub enum KnownError { + Tamper, + UnauthorizedOperation, + CertLock, +} + +impl KnownError { + pub fn try_from_header_value(header_value: &HeaderValue) -> Option { + match header_value.to_str().ok()? { + code if code.starts_with("401-signature-error") => Some(Self::Tamper), + code if code.starts_with("403-req-permission-denied") || code.starts_with("401-auth-req-unauthorized") => Some(Self::UnauthorizedOperation), + code if code.starts_with("400-rbum-cert-lock") => Some(Self::CertLock), + _ => None, + } + } +} + + +#[cfg(test)] +mod test { + #[test] + fn test_template() { + use super::format_template; + let template = "hello ${obj}"; + let mut replace = std::collections::HashMap::new(); + replace.insert("obj", "world".to_string()); + assert_eq!(format_template(template, &replace), "hello world"); + let replace = std::collections::HashMap::new(); + assert_eq!(format_template(template, &replace), "hello "); + } +} \ No newline at end of file diff --git a/backend/gateways/spacegate-plugins/src/utils.rs b/backend/gateways/spacegate-plugins/src/utils.rs new file mode 100644 index 000000000..44c6f0366 --- /dev/null +++ b/backend/gateways/spacegate-plugins/src/utils.rs @@ -0,0 +1,71 @@ +use std::time::Duration; + +use spacegate_shell::BoxError; + +pub fn parse_duration(duration: &str) -> Result { + // ()* + // = [0-9]+ + // = "ns" | "us" | "ms" | "s" | "m" | "h" | "d" + let mut duration = duration; + let mut total_duration = Duration::new(0, 0); + while !duration.is_empty() { + // Parse number + let (number, rest) = match duration.find(|c: char| !c.is_ascii_digit() && c != ' ') { + Some(index) => duration.split_at(index), + None => (duration, ""), + }; + let number = number.trim().parse::()?; + duration = rest; + + // Parse unit + let (unit, rest) = match duration.find(|c: char| !c.is_alphabetic() && c != ' ') { + Some(index) => duration.split_at(index), + None => (duration, ""), + }; + + let unit = match unit.trim() { + "ns" | "nanosecond" => Duration::from_nanos(number), + "us" | "microsecond" => Duration::from_micros(number), + "ms" | "millisecond" => Duration::from_millis(number), + "s" | "second" => Duration::from_secs(number), + "m" | "min" => Duration::from_secs(number * 60), + "h" | "hour" => Duration::from_secs(number * 60 * 60), + "d" | "day" => Duration::from_secs(number * 60 * 60 * 24), + _ => return Err(format!("Invalid unit: {}", unit).into()), + }; + + total_duration += unit; + duration = rest; + } + Ok(total_duration) +} + +#[cfg(test)] +mod test { + #[test] + fn test_duration_parse() { + use std::time::Duration; + macro_rules! test { + ($( + $str: literal => $duration: expr, + )*) => { + $( + let duration = super::parse_duration($str).unwrap(); + assert_eq!(duration, $duration); + )* + }; + } + test! { + "1ns" => Duration::from_nanos(1), + "1us" => Duration::from_micros(1), + "1ms" => Duration::from_millis(1), + "1s" => Duration::from_secs(1), + "1m" => Duration::from_secs(60), + "1h" => Duration::from_secs(60 * 60), + "1d" => Duration::from_secs(60 * 60 * 24), + "1ns1us1ms1s1m1h1d" => Duration::from_nanos(1) + Duration::from_micros(1) + Duration::from_millis(1) + Duration::from_secs(1) + Duration::from_secs(60) + Duration::from_secs(60 * 60) + Duration::from_secs(60 * 60 * 24), + "1d 1h 1min 1s 1ms 1us 1ns" => Duration::from_secs(60 * 60 * 24) + Duration::from_secs(60 * 60) + Duration::from_secs(60) + Duration::from_secs(1) + Duration::from_millis(1) + Duration::from_micros(1) + Duration::from_nanos(1), + } + + } +} \ No newline at end of file diff --git a/backend/middlewares/flow/src/lib.rs b/backend/middlewares/flow/src/lib.rs index 904c8122c..0fdfabd26 100644 --- a/backend/middlewares/flow/src/lib.rs +++ b/backend/middlewares/flow/src/lib.rs @@ -7,4 +7,4 @@ pub mod flow_config; pub mod flow_constants; pub mod flow_initializer; mod helper; -mod serv; +mod serv; \ No newline at end of file diff --git a/backend/supports/auth/src/error.rs b/backend/supports/auth/src/error.rs new file mode 100644 index 000000000..30a6d4933 --- /dev/null +++ b/backend/supports/auth/src/error.rs @@ -0,0 +1,11 @@ +use tardis::basic::error::TardisError; + +pub trait AuthError { + // 410: The request signature is incorrect. + fn signature_error(msg: &str, locale_code: &str) -> TardisError { + TardisError::custom("401-signature-error", msg, locale_code) + } +} + +impl AuthError for TardisError {} + diff --git a/backend/supports/auth/src/lib.rs b/backend/supports/auth/src/lib.rs index b2418a093..81a25ee31 100644 --- a/backend/supports/auth/src/lib.rs +++ b/backend/supports/auth/src/lib.rs @@ -7,3 +7,4 @@ pub mod auth_initializer; pub mod dto; pub mod helper; pub mod serv; +mod error; diff --git a/backend/supports/auth/src/serv/auth_crypto_serv.rs b/backend/supports/auth/src/serv/auth_crypto_serv.rs index bc46a4306..7aef9e2d1 100644 --- a/backend/supports/auth/src/serv/auth_crypto_serv.rs +++ b/backend/supports/auth/src/serv/auth_crypto_serv.rs @@ -13,7 +13,7 @@ use lazy_static::lazy_static; use crate::{ auth_config::AuthConfig, auth_constants::DOMAIN_CODE, - dto::auth_crypto_dto::{AuthEncryptReq, AuthEncryptResp}, + dto::auth_crypto_dto::{AuthEncryptReq, AuthEncryptResp}, error::AuthError, }; lazy_static! { @@ -94,7 +94,7 @@ pub async fn decrypt_req( "[Auth] input_keys have four key,and body:{body},input_sm3_digest:{input_sm3_digest},sm3(body):{}", TardisFuns::crypto.digest.sm3(body)? ); - return Err(TardisError::bad_request("[Auth] Encrypted request: body digest error.", "401-auth-req-crypto-error")); + return Err(TardisError::signature_error("[Auth] Encrypted request: body digest error.", "401-auth-req-crypto-error")); } let data = TardisFuns::crypto @@ -121,7 +121,7 @@ pub async fn decrypt_req( "[Auth] input_keys have three key,and body:{body},input_sm3_digest:{input_sm3_digest},sm3(body):{}", TardisFuns::crypto.digest.sm3(body)? ); - return Err(TardisError::bad_request("[Auth] Encrypted request: body digest error.", "401-auth-req-crypto-error")); + return Err(TardisError::signature_error("[Auth] Encrypted request: body digest error.", "401-auth-req-crypto-error")); } let data = TardisFuns::crypto diff --git a/backend/supports/auth/src/serv/auth_kernel_serv.rs b/backend/supports/auth/src/serv/auth_kernel_serv.rs index fd2460b02..644b50790 100644 --- a/backend/supports/auth/src/serv/auth_kernel_serv.rs +++ b/backend/supports/auth/src/serv/auth_kernel_serv.rs @@ -484,10 +484,14 @@ pub async fn do_auth(ctx: &AuthContext) -> TardisResult, + pub rel_item_id: String, + pub replace: HashMap, +} + +#[derive(Debug, Serialize, Clone)] +pub struct ReachMsgReceive { + pub receive_group_code: String, + pub receive_kind: String, + pub receive_ids: Vec, +} +impl ReachClient { + pub async fn send_message(req: &ReachMsgSendReq, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> { + let schedule_url: String = BaseSpiClient::module_url(InvokeModuleKind::Reach, funs).await?; + let headers = BaseSpiClient::headers(None, funs, ctx).await?; + funs.web_client().put_obj_to_str(&format!("{schedule_url}/ci/message/send"), &req, headers.clone()).await?; + Ok(()) + } +} diff --git a/frontend/sdks/invoke/src/invoke_config.rs b/frontend/sdks/invoke/src/invoke_config.rs index 0019e135b..961e8699d 100644 --- a/frontend/sdks/invoke/src/invoke_config.rs +++ b/frontend/sdks/invoke/src/invoke_config.rs @@ -31,6 +31,14 @@ impl Default for InvokeConfig { (InvokeModuleKind::Iam.to_string(), "http://127.0.0.1:8080/iam".to_string()), (InvokeModuleKind::Stats.to_string(), "http://127.0.0.1:8080/spi-stats".to_string()), (InvokeModuleKind::Event.to_string(), "http://127.0.0.1:8080/event".to_string()), + (InvokeModuleKind::Reach.to_string(), "http://127.0.0.1:8080/reach".to_string()), + ]), + module_configs: HashMap::from([ + (InvokeModuleKind::Kv.to_string(), InvokeModuleConfig { in_event: false }), + (InvokeModuleKind::Log.to_string(), InvokeModuleConfig { in_event: false }), + (InvokeModuleKind::Search.to_string(), InvokeModuleConfig { in_event: false }), + (InvokeModuleKind::Schedule.to_string(), InvokeModuleConfig { in_event: false }), + (InvokeModuleKind::Stats.to_string(), InvokeModuleConfig { in_event: false }), ]), module_configs: HashMap::from([ (InvokeModuleKind::Kv.to_string(), InvokeModuleConfig { in_event: false }), diff --git a/frontend/sdks/invoke/src/invoke_enumeration.rs b/frontend/sdks/invoke/src/invoke_enumeration.rs index 5d62b066f..c56f69a01 100644 --- a/frontend/sdks/invoke/src/invoke_enumeration.rs +++ b/frontend/sdks/invoke/src/invoke_enumeration.rs @@ -26,6 +26,8 @@ pub enum InvokeModuleKind { Iam, #[oai(rename = "event")] Event, + #[oai(rename = "reach")] + Reach, } impl std::fmt::Display for InvokeModuleKind { @@ -42,6 +44,7 @@ impl std::fmt::Display for InvokeModuleKind { InvokeModuleKind::Schedule => write!(f, "schedule"), InvokeModuleKind::Iam => write!(f, "iam"), InvokeModuleKind::Event => write!(f, "event"), + InvokeModuleKind::Reach => write!(f, "reach"), } } }