From 318843ece2cdfee9d0e03c9b45fe6991abc6f057 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Tue, 2 Apr 2024 11:08:07 +0800 Subject: [PATCH] gateway: Gateway support plugin instance (#671) --- Cargo.toml | 4 +- gateway/spacegate-lib/src/extension.rs | 33 +++ .../src/extension/audit_log_param.rs | 26 ++- gateway/spacegate-lib/src/lib.rs | 4 +- gateway/spacegate-lib/src/plugin.rs | 1 + .../spacegate-lib/src/plugin/anti_replay.rs | 90 ++++--- gateway/spacegate-lib/src/plugin/anti_xss.rs | 58 +++-- gateway/spacegate-lib/src/plugin/audit_log.rs | 87 +++---- gateway/spacegate-lib/src/plugin/auth.rs | 71 +++--- .../spacegate-lib/src/plugin/auth/tests.rs | 2 +- gateway/spacegate-lib/src/plugin/ip_time.rs | 59 ++--- .../spacegate-lib/src/plugin/ip_time/tests.rs | 4 +- .../src/plugin/op_redis_publisher.rs | 219 ++++++++++++++++++ .../src/plugin/rewrite_ns_b_ip.rs | 52 +++-- gateway/spacegate-lib/tests/export_schemas.rs | 2 +- services/spacegate/Cargo.toml | 2 +- services/spacegate/Dockerfile | 2 +- .../tests/spi_conf_nacos_compatible_test.rs | 2 +- 18 files changed, 483 insertions(+), 235 deletions(-) create mode 100644 gateway/spacegate-lib/src/plugin/op_redis_publisher.rs diff --git a/Cargo.toml b/Cargo.toml index 3e7bcf0c9..ebf531cb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,11 +73,13 @@ tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "191f3ec" } # "cache", # "k8s", # "ext-redis", +# "ext-axum", # ] } -spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", branch = "master", features = [ +spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", branch = "dev", features = [ "cache", "k8s", "ext-redis", + "ext-axum", ] } spacegate-plugin = { git = "https://github.com/ideal-world/spacegate.git", branch = "master" } diff --git a/gateway/spacegate-lib/src/extension.rs b/gateway/spacegate-lib/src/extension.rs index 69774f59e..7843bbd1b 100644 --- a/gateway/spacegate-lib/src/extension.rs +++ b/gateway/spacegate-lib/src/extension.rs @@ -1,4 +1,37 @@ +use http::Extensions; +use spacegate_shell::{kernel::extension::ExtensionPack as _, BoxError}; +use tardis::serde_json::{self, Value}; + +use self::audit_log_param::LogParamContent; + pub mod audit_log_param; pub mod before_encrypt_body; pub mod cert_info; pub mod request_crypto_status; + +pub enum ExtensionPackEnum { + LogParamContent(), + None, +} + +impl From for ExtensionPackEnum { + fn from(value: String) -> Self { + match value.as_str() { + "log_content" => ExtensionPackEnum::LogParamContent(), + _ => ExtensionPackEnum::None, + } + } +} +impl ExtensionPackEnum { + pub fn to_value(&self, ext: &Extensions) -> Result, BoxError> { + match self { + ExtensionPackEnum::LogParamContent() => { + if let Some(ext) = LogParamContent::get(ext) { + return Ok(Some(serde_json::to_value(ext)?)); + } + } + ExtensionPackEnum::None => (), + } + Ok(None) + } +} diff --git a/gateway/spacegate-lib/src/extension/audit_log_param.rs b/gateway/spacegate-lib/src/extension/audit_log_param.rs index 9d529f4d0..b7d80f7e5 100644 --- a/gateway/spacegate-lib/src/extension/audit_log_param.rs +++ b/gateway/spacegate-lib/src/extension/audit_log_param.rs @@ -1,4 +1,9 @@ -use spacegate_shell::hyper::HeaderMap; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use spacegate_shell::{hyper::HeaderMap, kernel::extension::ExtensionPack}; + +use super::cert_info::RoleInfo; #[derive(Clone)] pub struct AuditLogParam { @@ -8,3 +13,22 @@ pub struct AuditLogParam { pub request_scheme: String, pub request_ip: String, } + +#[derive(Clone, Serialize, Deserialize)] +pub struct LogParamContent { + pub op: String, + pub name: String, + pub user_id: Option, + pub own_paths: Option, + pub role: Vec, + pub ip: String, + pub path: String, + pub scheme: String, + pub token: Option, + pub server_timing: Option, + pub resp_status: String, + //Indicates whether the business operation was successful. + pub success: bool, +} + +impl ExtensionPack for LogParamContent {} diff --git a/gateway/spacegate-lib/src/lib.rs b/gateway/spacegate-lib/src/lib.rs index e0616aaa9..fc138af71 100644 --- a/gateway/spacegate-lib/src/lib.rs +++ b/gateway/spacegate-lib/src/lib.rs @@ -8,12 +8,14 @@ mod marker; mod plugin; pub const PACKAGE_NAME: &str = "spacegate_lib"; +use plugin::op_redis_publisher; use spacegate_shell::plugin::SgPluginRepository; pub fn register_lib_plugins(repo: &SgPluginRepository) { - repo.register::(); + repo.register::(); repo.register::(); repo.register::(); repo.register::(); repo.register::(); repo.register::(); + repo.register::(); } diff --git a/gateway/spacegate-lib/src/plugin.rs b/gateway/spacegate-lib/src/plugin.rs index 865bb4a7b..0075d0d00 100644 --- a/gateway/spacegate-lib/src/plugin.rs +++ b/gateway/spacegate-lib/src/plugin.rs @@ -3,4 +3,5 @@ pub mod anti_xss; pub mod audit_log; pub mod auth; pub mod ip_time; +pub mod op_redis_publisher; pub mod rewrite_ns_b_ip; diff --git a/gateway/spacegate-lib/src/plugin/anti_replay.rs b/gateway/spacegate-lib/src/plugin/anti_replay.rs index 033fe8f1d..78254c580 100644 --- a/gateway/spacegate-lib/src/plugin/anti_replay.rs +++ b/gateway/spacegate-lib/src/plugin/anti_replay.rs @@ -3,18 +3,18 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use spacegate_shell::hyper::{Request, Response, StatusCode}; -use spacegate_shell::kernel::extension::{PeerAddr, Reflect}; -use spacegate_shell::kernel::helper_layers::bidirection_filter::{Bdf, BdfLayer, BoxReqFut, BoxRespFut}; -use spacegate_shell::plugin::{def_plugin, MakeSgLayer, PluginError}; +use spacegate_shell::kernel::extension::PeerAddr; +use spacegate_shell::kernel::helper_layers::function::Inner; +use spacegate_shell::plugin::{Plugin, PluginError}; use spacegate_shell::spacegate_ext_redis::{redis::AsyncCommands, RedisClient}; -use spacegate_shell::{SgBody, SgBoxLayer, SgRequestExt, SgResponseExt}; +use spacegate_shell::{BoxError, SgBody, SgRequestExt, SgResponseExt}; +use tardis::serde_json; use tardis::{ basic::result::TardisResult, tokio::{self}, }; -def_plugin!("anti_replay", AntiReplayPlugin, SgFilterAntiReplay); #[cfg(feature = "schema")] use spacegate_plugin::schemars; #[cfg(feature = "schema")] @@ -22,16 +22,16 @@ spacegate_plugin::schema!(AntiReplayPlugin, SgFilterAntiReplay); #[derive(Serialize, Deserialize, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[serde(default)] -pub struct SgFilterAntiReplay { +pub struct AntiReplayPlugin { cache_key: String, // millisecond time: u64, } -impl Default for SgFilterAntiReplay { +impl Default for AntiReplayPlugin { fn default() -> Self { Self { - cache_key: "sg:plugin:anti_replay".to_string(), + cache_key: "sg:plugin:anti_replay".into(), time: 5000, } } @@ -43,44 +43,6 @@ pub struct AntiReplayDigest { client: RedisClient, } -impl Bdf for SgFilterAntiReplay { - type FutureReq = BoxReqFut; - type FutureResp = BoxRespFut; - fn on_req(self: Arc, mut req: Request) -> Self::FutureReq { - Box::pin(async move { - if let Some(client) = req.get_redis_client_by_gateway_name() { - let md5 = get_md5(&req).map_err(PluginError::internal_error::)?; - let digest = AntiReplayDigest { - md5: Arc::from(md5), - client: client.clone(), - }; - if get_status(&digest.md5, &self.cache_key, &client).await.map_err(PluginError::internal_error::)? { - return Err(Response::with_code_message( - StatusCode::TOO_MANY_REQUESTS, - "[SG.Plugin.Anti_Replay] Request denied due to replay attack. Please refresh and resubmit the request.", - )); - } else { - set_status(&digest.md5, &self.cache_key, true, &client).await.map_err(PluginError::internal_error::)?; - } - req.extensions_mut().get_mut::().expect("missing reflect").insert(digest); - } - Ok(req) - }) - } - fn on_resp(self: Arc, resp: Response) -> Self::FutureResp { - Box::pin(async move { - if let Some(digest) = resp.extensions().get::() { - let digest = digest.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(self.time)).await; - let _ = set_status(&digest.md5, self.cache_key.as_ref(), false, &digest.client).await; - }); - } - resp - }) - } -} - fn get_md5(req: &Request) -> TardisResult { let remote_addr = req.extensions().get::().expect("missing peer address").0; let uri = req.uri(); @@ -120,9 +82,39 @@ async fn get_status(md5: &str, cache_key: &str, cache_client: &RedisClient) -> T Ok(status1 && status2) } -impl MakeSgLayer for SgFilterAntiReplay { - fn make_layer(&self) -> Result { - Ok(SgBoxLayer::new(BdfLayer::new(self.clone()))) +impl Plugin for AntiReplayPlugin { + const CODE: &'static str = "anti-replay"; + fn create(plugin_config: spacegate_shell::plugin::PluginConfig) -> Result { + let config: AntiReplayPlugin = serde_json::from_value(plugin_config.spec)?; + Ok(config) + } + async fn call(&self, req: Request, inner: Inner) -> Result, BoxError> { + if let Some(client) = req.get_redis_client_by_gateway_name() { + let md5 = get_md5(&req).map_err(PluginError::internal_error::)?; + let digest = AntiReplayDigest { + md5: Arc::from(md5), + client: client.clone(), + }; + if get_status(&digest.md5, &self.cache_key, &client).await.map_err(PluginError::internal_error::)? { + return Ok(Response::with_code_message( + StatusCode::TOO_MANY_REQUESTS, + "[SG.Plugin.Anti_Replay] Request denied due to replay attack. Please refresh and resubmit the request.", + )); + } else { + set_status(&digest.md5, &self.cache_key, true, &client).await.map_err(PluginError::internal_error::)?; + } + let resp = inner.call(req).await; + let time = self.time; + let cache_key = self.cache_key.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(time)).await; + let _ = set_status(&digest.md5, cache_key.as_ref(), false, &digest.client).await; + }); + Ok(resp) + } else { + let resp = inner.call(req).await; + Ok(resp) + } } } diff --git a/gateway/spacegate-lib/src/plugin/anti_xss.rs b/gateway/spacegate-lib/src/plugin/anti_xss.rs index 0e1276b06..e5a73a0a2 100644 --- a/gateway/spacegate-lib/src/plugin/anti_xss.rs +++ b/gateway/spacegate-lib/src/plugin/anti_xss.rs @@ -1,11 +1,11 @@ -use std::{fmt, sync::Arc}; +use std::fmt; use serde::{Deserialize, Serialize}; use spacegate_shell::{ hyper::{header, Response}, - kernel::helper_layers::map_response::MapResponseLayer, - plugin::{def_plugin, MakeSgLayer}, - BoxError, SgBody, SgBoxLayer, + kernel::helper_layers::function::Inner, + plugin::Plugin, + BoxError, SgBody, }; macro_rules! append_value { @@ -16,15 +16,15 @@ macro_rules! append_value { }; } -def_plugin!("anti_xss", AntiXssPlugin, SgFilterAntiXSS); #[cfg(feature = "schema")] use spacegate_plugin::schemars; +use tardis::serde_json; #[cfg(feature = "schema")] spacegate_plugin::schema!(AntiXssPlugin, SgFilterAntiXSS); #[derive(Default, Serialize, Deserialize)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[serde(default)] -pub struct SgFilterAntiXSS { +pub struct AntiXssConfig { csp_config: CSPConfig, } @@ -150,23 +150,37 @@ impl fmt::Display for SandBoxValue { } } -impl MakeSgLayer for SgFilterAntiXSS { - fn make_layer(&self) -> Result { - let header = Arc::new(header::HeaderValue::from_str(&self.csp_config.to_string_header_value())?); +pub struct AntiXssPlugin { + csp_config: CSPConfig, + header: header::HeaderValue, +} + +impl Plugin for AntiXssPlugin { + const CODE: &'static str = "anti-xss"; + fn create(plugin_config: spacegate_shell::plugin::PluginConfig) -> Result { + let config: AntiXssConfig = serde_json::from_value(plugin_config.spec)?; + let header = header::HeaderValue::from_str(&config.csp_config.to_string_header_value())?; + Ok(AntiXssPlugin { + csp_config: config.csp_config, + header, + }) + } + async fn call(&self, req: http::Request, inner: Inner) -> Result, BoxError> { let report_only = self.csp_config.report_only; - Ok(SgBoxLayer::new(MapResponseLayer::new(move |mut resp: Response| { - let mut enable = false; - if let Some(content_type) = resp.headers().get(header::CONTENT_TYPE) { - enable = content_type.eq("text/html") || content_type.eq("text/css") || content_type.eq("application/javascript") || content_type.eq("application/x-javascript"); - }; - if enable { - if report_only { - let _ = resp.headers_mut().append(header::CONTENT_SECURITY_POLICY_REPORT_ONLY, header.as_ref().clone()); - } else { - let _ = resp.headers_mut().append(header::CONTENT_SECURITY_POLICY, header.as_ref().clone()); - } + let mut resp = inner.call(req).await; + let header = &self.header; + + let mut enable = false; + if let Some(content_type) = resp.headers().get(header::CONTENT_TYPE) { + enable = content_type.eq("text/html") || content_type.eq("text/css") || content_type.eq("application/javascript") || content_type.eq("application/x-javascript"); + }; + if enable { + if report_only { + let _ = resp.headers_mut().append(header::CONTENT_SECURITY_POLICY_REPORT_ONLY, header.clone()); + } else { + let _ = resp.headers_mut().append(header::CONTENT_SECURITY_POLICY, header.clone()); } - resp - }))) + } + Ok(resp) } } diff --git a/gateway/spacegate-lib/src/plugin/audit_log.rs b/gateway/spacegate-lib/src/plugin/audit_log.rs index d9b646a09..c9b8eb9cd 100644 --- a/gateway/spacegate-lib/src/plugin/audit_log.rs +++ b/gateway/spacegate-lib/src/plugin/audit_log.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use std::str::FromStr; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use bios_sdk_invoke::clients::spi_log_client; use bios_sdk_invoke::invoke_config::InvokeConfig; @@ -15,8 +14,8 @@ use serde::{Deserialize, Serialize}; use spacegate_shell::hyper::{Request, Response}; use spacegate_shell::kernel::extension::{EnterTime, PeerAddr, Reflect}; -use spacegate_shell::kernel::helper_layers::bidirection_filter::{Bdf, BdfLayer, BoxRespFut}; -use spacegate_shell::plugin::{JsonValue, MakeSgLayer, Plugin, PluginError}; +use spacegate_shell::kernel::helper_layers::function::Inner; +use spacegate_shell::plugin::{Plugin, PluginError}; use spacegate_shell::{BoxError, SgBody}; use tardis::basic::dto::TardisContext; use tardis::log::{debug, trace, warn}; @@ -31,9 +30,9 @@ use tardis::{ TardisFuns, TardisFunsInst, }; -use crate::extension::audit_log_param::AuditLogParam; +use crate::extension::audit_log_param::{AuditLogParam, LogParamContent}; use crate::extension::before_encrypt_body::BeforeEncryptBody; -use crate::extension::cert_info::{CertInfo, RoleInfo}; +use crate::extension::cert_info::CertInfo; pub const CODE: &str = "audit_log"; @@ -45,7 +44,7 @@ spacegate_plugin::schema!(AuditLogPlugin, SgFilterAuditLog); #[derive(Serialize, Deserialize, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[serde(default)] -pub struct SgFilterAuditLog { +pub struct AuditLogPlugin { log_url: String, spi_app_id: String, tag: String, @@ -60,7 +59,7 @@ pub struct SgFilterAuditLog { head_key_auth_ident: String, } -impl SgFilterAuditLog { +impl AuditLogPlugin { async fn get_log_content(&self, mut resp: Response) -> TardisResult<(Response, Option)> { let Some(param) = resp.extensions_mut().remove::() else { warn!("[Plugin.AuditLog] missing audit log param"); @@ -138,6 +137,7 @@ impl SgFilterAuditLog { server_timing: start_time.map(|st| end_time - st), resp_status: resp.status().as_u16().to_string(), success, + own_paths: resp.extensions().get::().and_then(|info| info.own_paths.clone()), }; Ok((resp, Some(content))) } @@ -242,7 +242,7 @@ impl SgFilterAuditLog { } } -impl Default for SgFilterAuditLog { +impl Default for AuditLogPlugin { fn default() -> Self { Self { log_url: "".to_string(), @@ -259,69 +259,39 @@ impl Default for SgFilterAuditLog { } } -impl Bdf for SgFilterAuditLog { - type FutureReq = std::future::Ready, Response>>; - - type FutureResp = BoxRespFut; - - fn on_req(self: Arc, req: Request) -> Self::FutureReq { - std::future::ready(self.req(req)) - } - - fn on_resp(self: Arc, resp: Response) -> Self::FutureResp { - Box::pin(async move { - match self.resp(resp).await { - Ok(resp) => resp, - Err(e) => e, - } - }) - } -} - -impl MakeSgLayer for SgFilterAuditLog { - fn make_layer(&self) -> Result { - let layer = BdfLayer::new(self.clone()); - Ok(spacegate_shell::SgBoxLayer::new(layer)) - } -} - -pub struct AuditLogPlugin; - impl Plugin for AuditLogPlugin { - type MakeLayer = SgFilterAuditLog; + // type MakeLayer = SgFilterAuditLog; const CODE: &'static str = CODE; - fn create(_: Option, value: JsonValue) -> Result { - let mut plugin: SgFilterAuditLog = serde_json::from_value(value).map_err(|e| -> BoxError { format!("[Plugin.AuditLog] deserialize error:{e}").into() })?; + + fn create(config: spacegate_shell::plugin::PluginConfig) -> Result { + let mut plugin: AuditLogPlugin = serde_json::from_value(config.spec.clone()).map_err(|e| -> BoxError { format!("[Plugin.AuditLog] deserialize error:{e}").into() })?; plugin.init()?; Ok(plugin) } + async fn call(&self, req: Request, inner: Inner) -> Result, BoxError> { + match self.req(req) { + Ok(req) => { + let resp = inner.call(req).await; + match self.resp(resp).await { + Ok(resp) => Ok(resp), + Err(e) => Ok(e), + } + } + Err(resp) => Ok(resp), + } + } } fn get_tardis_inst() -> TardisFunsInst { TardisFuns::inst(CODE.to_string(), None) } -#[derive(Serialize, Deserialize)] -pub struct LogParamContent { - pub op: String, - pub name: String, - pub user_id: Option, - pub role: Vec, - pub ip: String, - pub path: String, - pub scheme: String, - pub token: Option, - pub server_timing: Option, - pub resp_status: String, - //Indicates whether the business operation was successful. - pub success: bool, -} - impl LogParamContent { fn to_value(&self) -> Value { json!({ "name":self.name, "id":self.user_id, + "own_paths":self.own_paths, "ip":self.ip, "op":self.op, "path":self.path, @@ -331,7 +301,6 @@ impl LogParamContent { }) } } - #[cfg(test)] mod test { use http::{HeaderName, Request, Response}; @@ -341,13 +310,13 @@ mod test { }; use tardis::tokio; - use super::SgFilterAuditLog; + use super::AuditLogPlugin; #[tokio::test] async fn test_log_content() { let ent_time = std::time::Instant::now(); println!("test_log_content"); - let mut sg_filter_audit_log = SgFilterAuditLog { + let mut sg_filter_audit_log = AuditLogPlugin { log_url: "xxx".to_string(), spi_app_id: "xxx".to_string(), exclude_log_path: vec!["/api/test".to_string(), "/cc/api/test/file".to_string()], diff --git a/gateway/spacegate-lib/src/plugin/auth.rs b/gateway/spacegate-lib/src/plugin/auth.rs index 3794da367..762ac969b 100644 --- a/gateway/spacegate-lib/src/plugin/auth.rs +++ b/gateway/spacegate-lib/src/plugin/auth.rs @@ -15,12 +15,9 @@ use spacegate_shell::{ http::{self, HeaderMap, HeaderName, HeaderValue, StatusCode}, Method, Request, Response, }, - kernel::{ - extension::Reflect, - helper_layers::bidirection_filter::{Bdf, BdfLayer, BoxReqFut, BoxRespFut}, - }, - plugin::{JsonValue, MakeSgLayer, Plugin, PluginError}, - BoxError, SgBody, SgBoxLayer, + kernel::{extension::Reflect, helper_layers::function::Inner}, + plugin::{Plugin, PluginConfig, PluginError}, + BoxError, SgBody, }; use std::{ collections::HashMap, @@ -125,7 +122,7 @@ impl Default for SgPluginAuthConfig { } } #[derive(Clone)] -pub struct SgPluginAuth { +pub struct AuthPlugin { auth_config: AuthConfig, cors_allow_origin: HeaderValue, cors_allow_methods: HeaderValue, @@ -148,9 +145,9 @@ pub struct SgPluginAuth { auth_path_ignore_prefix: String, } -impl From for SgPluginAuth { +impl From for AuthPlugin { fn from(value: SgPluginAuthConfig) -> Self { - SgPluginAuth { + AuthPlugin { auth_config: value.auth_config, cors_allow_origin: HeaderValue::from_str(&value.cors_allow_origin).expect("cors_allow_origin is invalid"), cors_allow_methods: HeaderValue::from_str(&value.cors_allow_methods).expect("cors_allow_methods is invalid"), @@ -163,7 +160,7 @@ impl From for SgPluginAuth { } } -impl<'de> Deserialize<'de> for SgPluginAuth { +impl<'de> Deserialize<'de> for AuthPlugin { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -172,7 +169,7 @@ impl<'de> Deserialize<'de> for SgPluginAuth { } } -impl SgPluginAuth { +impl AuthPlugin { fn cors(&self, resp: &mut Response) -> TardisResult<()> { resp.headers_mut().insert(header::ACCESS_CONTROL_ALLOW_ORIGIN, self.cors_allow_origin.clone()); resp.headers_mut().insert(header::ACCESS_CONTROL_ALLOW_ORIGIN, self.cors_allow_origin.clone()); @@ -198,7 +195,7 @@ impl SgPluginAuth { } } -impl SgPluginAuth { +impl AuthPlugin { async fn req(&self, mut req: Request) -> Result, Response> { req.extensions_mut().get_mut::().expect("missing reflect").insert(RequestCryptoParam::default()); @@ -314,31 +311,6 @@ impl SgPluginAuth { } } -impl Bdf for SgPluginAuth { - type FutureReq = BoxReqFut; - - type FutureResp = BoxRespFut; - - fn on_req(self: Arc, req: Request) -> Self::FutureReq { - Box::pin(async move { self.req(req).await }) - } - - fn on_resp(self: Arc, resp: Response) -> Self::FutureResp { - Box::pin(async move { - match self.resp(resp).await { - Ok(resp) => resp, - Err(e) => e, - } - }) - } -} - -impl MakeSgLayer for SgPluginAuth { - fn make_layer(&self) -> Result { - Ok(SgBoxLayer::new(BdfLayer::new(self.clone()))) - } -} - async fn handle_mix_req(auth_config: &AuthConfig, mix_replace_url: &str, req: Request) -> Result, BoxError> { let (mut parts, mut body) = req.into_parts(); if !body.is_dumped() { @@ -514,14 +486,12 @@ fn headermap_to_hashmap(old_headers: &HeaderMap) -> TardisResult, value: JsonValue) -> Result { - let config: SgPluginAuthConfig = serde_json::from_value(value)?; - let filter: SgPluginAuth = config.clone().into(); + // type MakeLayer = SgPluginAuth; + fn create(plugin_config: PluginConfig) -> Result { + let config: SgPluginAuthConfig = serde_json::from_value(plugin_config.spec.clone())?; + let plugin: AuthPlugin = config.clone().into(); let tardis_init = Arc::new(Once::new()); { let tardis_init = tardis_init.clone(); @@ -539,8 +509,21 @@ impl Plugin for AuthPlugin { while !tardis_init.is_completed() { // blocking wait tardis setup } - Ok(filter) + Ok(plugin) + } + async fn call(&self, req: Request, inner: Inner) -> Result, BoxError> { + let req = match self.req(req).await { + Ok(req) => req, + Err(resp) => return Ok(resp), + }; + let resp = inner.call(req).await; + Ok(match self.resp(resp).await { + Ok(resp) => resp, + Err(resp) => resp, + }) } + // fn create(_: Option, value: JsonValue) -> Result { + // } } #[cfg(test)] diff --git a/gateway/spacegate-lib/src/plugin/auth/tests.rs b/gateway/spacegate-lib/src/plugin/auth/tests.rs index 759353a30..20a39ed85 100644 --- a/gateway/spacegate-lib/src/plugin/auth/tests.rs +++ b/gateway/spacegate-lib/src/plugin/auth/tests.rs @@ -141,7 +141,7 @@ async fn test_auth_plugin_crypto() { }; filter_auth.setup_tardis().await.unwrap(); - let auth_plugin: SgPluginAuth = filter_auth.into(); + let auth_plugin: AuthPlugin = filter_auth.into(); let req = Request::builder() .method(Method::GET) diff --git a/gateway/spacegate-lib/src/plugin/ip_time.rs b/gateway/spacegate-lib/src/plugin/ip_time.rs index f2b6de82b..c283b6b28 100644 --- a/gateway/spacegate-lib/src/plugin/ip_time.rs +++ b/gateway/spacegate-lib/src/plugin/ip_time.rs @@ -4,13 +4,11 @@ use std::sync::Arc; use ipnet::IpNet; use serde::{Deserialize, Serialize}; use spacegate_shell::hyper::{Request, Response, StatusCode}; -use spacegate_shell::kernel::{ - extension::PeerAddr, - helper_layers::filter::{Filter, FilterRequestLayer}, -}; -use spacegate_shell::plugin::{JsonValue, MakeSgLayer, Plugin}; +use spacegate_shell::kernel::helper_layers::function::Inner; +use spacegate_shell::kernel::{extension::PeerAddr}; +use spacegate_shell::plugin::Plugin; -use spacegate_shell::{BoxError, SgBody, SgBoxLayer, SgResponseExt}; +use spacegate_shell::{BoxError, SgBody, SgResponseExt}; use tardis::{log, serde_json}; pub const CODE: &str = "ip_time"; @@ -40,7 +38,7 @@ pub enum SgFilterIpTimeMode { BlackList, } -impl From for SgFilterIpTime { +impl From for IpTimePlugin { fn from(value: SgFilterIpTimeConfig) -> Self { let mut rules = Vec::new(); let white_list_mode = value.mode; @@ -61,7 +59,7 @@ impl From for SgFilterIpTime { rules.push((net, rule.time_rule.clone())) } } - SgFilterIpTime { + IpTimePlugin { mode: white_list_mode, rules: rules.into(), } @@ -74,7 +72,7 @@ pub struct SgFilterIpTimeConfigRule { } #[derive(Debug, Clone)] -pub struct SgFilterIpTime { +pub struct IpTimePlugin { // # enhancement: // should be a time segment list // - segment list @@ -85,7 +83,7 @@ pub struct SgFilterIpTime { pub rules: Arc<[(IpNet, IpTimeRule)]>, } -impl SgFilterIpTime { +impl IpTimePlugin { pub fn check_ip(&self, ip: &IpAddr) -> bool { match self.mode { SgFilterIpTimeMode::WhiteList => { @@ -99,37 +97,28 @@ impl SgFilterIpTime { } } } - -impl Filter for SgFilterIpTime { - fn filter(&self, req: Request) -> Result, Response> { +impl Plugin for IpTimePlugin { + const CODE: &'static str = CODE; + fn create(config: spacegate_shell::plugin::PluginConfig) -> Result { + let ip_time_config: SgFilterIpTimeConfig = serde_json::from_value(config.spec.clone())?; + let plugin: IpTimePlugin = ip_time_config.into(); + Ok(plugin) + } + async fn call(&self, req: Request, inner: Inner) -> Result, BoxError> { let Some(socket_addr) = req.extensions().get::() else { - return Err(Response::with_code_message(StatusCode::BAD_GATEWAY, "Cannot get peer address, it's a implementation bug")); + return Err("Cannot get peer address, it's a implementation bug".into()); }; let socket_addr = socket_addr.0; let passed = self.check_ip(&socket_addr.ip()); log::trace!("[{CODE}] Check ip time rule from {socket_addr}, passed {passed}"); if !passed { - return Err(Response::with_code_message(StatusCode::FORBIDDEN, "[SG.Plugin.IpTime] Blocked by ip-time plugin")); + return Ok(Response::with_code_message(StatusCode::FORBIDDEN, "Blocked by ip-time plugin")); } - Ok(req) - } -} - -pub struct SgIpTimePlugin; - -impl Plugin for SgIpTimePlugin { - const CODE: &'static str = CODE; - type MakeLayer = SgFilterIpTime; - fn create(_: Option, value: JsonValue) -> Result { - let config: SgFilterIpTimeConfig = serde_json::from_value(value)?; - let filter: SgFilterIpTime = config.into(); - Ok(filter) - } -} - -impl MakeSgLayer for SgFilterIpTime { - fn make_layer(&self) -> Result { - let layer = FilterRequestLayer::new(self.clone()); - Ok(SgBoxLayer::new(layer)) + Ok(inner.call(req).await) } + // fn create(_: Option, value: JsonValue) -> Result { + // let config: SgFilterIpTimeConfig = serde_json::from_value(value)?; + // let filter: SgFilterIpTime = config.into(); + // Ok(filter) + // } } diff --git a/gateway/spacegate-lib/src/plugin/ip_time/tests.rs b/gateway/spacegate-lib/src/plugin/ip_time/tests.rs index eeffe106e..33b52c50d 100644 --- a/gateway/spacegate-lib/src/plugin/ip_time/tests.rs +++ b/gateway/spacegate-lib/src/plugin/ip_time/tests.rs @@ -1,6 +1,6 @@ use std::net::IpAddr; -use super::{SgFilterIpTime, SgFilterIpTimeConfig}; +use super::{IpTimePlugin, SgFilterIpTimeConfig}; use tardis::{chrono::Local, serde_json}; #[test] @@ -8,7 +8,7 @@ fn parse_config() { let json_str = include_str!("tests/testconfig.json"); println!("Init ip-time plugin, local timezone offset: {tz}", tz = Local::now().offset()); let value: Vec = serde_json::from_str(json_str).expect("fail to parse config into json"); - let filters = value.into_iter().map(SgFilterIpTime::from).collect::>(); + let filters = value.into_iter().map(IpTimePlugin::from).collect::>(); let ip: IpAddr = "123.123.123.123".parse().expect("invalid ip"); let passed = filters[0].check_ip(&ip); assert!(!passed); diff --git a/gateway/spacegate-lib/src/plugin/op_redis_publisher.rs b/gateway/spacegate-lib/src/plugin/op_redis_publisher.rs new file mode 100644 index 000000000..1efcb68ec --- /dev/null +++ b/gateway/spacegate-lib/src/plugin/op_redis_publisher.rs @@ -0,0 +1,219 @@ +use std::{ + str::FromStr as _, + time::{Duration, Instant}, +}; + +use http::Response; +use jsonpath_rust::JsonPathInst; +use serde::{Deserialize, Serialize}; +use spacegate_shell::{ + kernel::{ + extension::{EnterTime, GatewayName}, + SgRequest, SgResponse, + }, + plugin::{Inner, Plugin, PluginConfig}, + spacegate_ext_redis::global_repo, + BoxError, +}; +use tardis::{ + cache::Script, + log::{self, warn}, + serde_json::{self, Value}, + tokio, +}; + +use crate::extension::{audit_log_param::AuditLogParam, before_encrypt_body::BeforeEncryptBody, cert_info::CertInfo}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(default)] +pub struct RedisPublisherConfig { + success_json_path: String, + success_json_path_values: Vec, +} + +impl Default for RedisPublisherConfig { + fn default() -> Self { + Self { + success_json_path: "$.code".to_string(), + success_json_path_values: vec!["200".to_string(), "201".to_string()], + } + } +} +pub struct RedisPublisherPlugin { + pub key: String, + pub script: Script, + jsonpath_inst: Option, + success_json_path_values: Vec, +} + +impl Plugin for RedisPublisherPlugin { + const CODE: &'static str = "op_redis_publisher"; + + fn create(config: PluginConfig) -> Result { + let id = config.none_mono_id(); + let layer_config = serde_json::from_value::(config.spec.clone())?; + + Ok(Self { + key: id.redis_prefix(), + jsonpath_inst: if let Ok(jsonpath_inst) = JsonPathInst::from_str(&layer_config.success_json_path).map_err(|e| log::error!("[Plugin.AuditLog] invalid json path:{e}")) { + Some(jsonpath_inst) + } else { + None + }, + success_json_path_values: layer_config.success_json_path_values, + script: Script::new( + r##" + local channel = KEYS[1]; + local message = ARGV[1]; + + return redis.call('PUBLISH',channel,message); + "##, + ), + }) + } + + async fn call(&self, req: SgRequest, inner: Inner) -> Result, spacegate_shell::BoxError> { + let Some(gateway_name) = req.extensions().get::() else { + return Err("missing gateway name".into()); + }; + let Some(client) = global_repo().get(gateway_name) else { + return Err("missing redis client".into()); + }; + let spec_id = RedisPublisherPlugin::parse_spec_id(&req); + + let resp = inner.call(req).await; + + let (resp, content) = self.op_log(resp).await?; + + if let Some(mut content) = content { + content.spec_id = spec_id; + let key = self.key.clone(); + let script = self.script.clone(); + tokio::task::spawn(async move { + let mut conn = client.get_conn().await; + match serde_json::to_string(&content) { + Ok(v) => { + match script.key(&key).arg(v).invoke_async::<_, bool>(&mut conn).await { + Ok(_) => { + log::trace!("[Plugin.OPRedisPublisher]Publish channel:{key} success") + } + Err(e) => { + log::warn!("[Plugin.OPRedisPublisher] failed to Publish:{e}") + } + }; + } + Err(e) => { + log::warn!("[Plugin.OPRedisPublisher] failed to Deserialize:{e}") + } + } + }); + } + + Ok(resp) + } +} + +impl RedisPublisherPlugin { + fn parse_spec_id(req: &SgRequest) -> Option { + let segments: Vec<_> = req.uri().path().split('/').collect(); + //找到segment为op-api的下一个segment就是spec_id + if let Some(index) = segments.iter().position(|&seg| seg == "op-api") { + // 确保 "op-api" 后面还有段 + if let Some(spec_id) = segments.get(index + 1) { + return Some(spec_id.to_string()); + } + } + None + } + + async fn op_log(&self, mut resp: SgResponse) -> Result<(SgResponse, Option), BoxError> { + let body_string = if let Some(raw_body) = resp.extensions().get::().map(|b| b.clone().get()) { + serde_json::from_str::(&String::from_utf8_lossy(&raw_body)) + } else { + let body = if let Some(dumped) = resp.body().get_dumped() { + dumped.clone() + } else { + let (parts, body) = resp.into_parts(); + let body = body.dump().await.map_err(|e: BoxError| format!("[SG.Filter.AuditLog] dump body error: {e}"))?; + resp = Response::from_parts(parts, body.dump_clone().expect("")); + body.get_dumped().expect("not expect").clone() + }; + serde_json::from_slice::(&body) + }; + + let Some(param) = resp.extensions().get::() else { + warn!("[Plugin.OpRedisPubilsher] missing audit log param"); + return Ok((resp, None)); + }; + + let start_time = resp.extensions().get::().map(|time| time.0); + let end_time = Instant::now(); + + let success = match body_string { + Ok(json) => { + if let Some(jsonpath_inst) = &self.jsonpath_inst { + if let Some(matching_value) = jsonpath_inst.find_slice(&json).first() { + if matching_value.is_string() { + let mut is_match = false; + for value in self.success_json_path_values.clone() { + if Some(value.as_str()) == matching_value.as_str() { + is_match = true; + break; + } + } + is_match + } else if matching_value.is_number() { + let mut is_match = false; + for value in self.success_json_path_values.clone() { + let value = value.parse::(); + if value.is_ok() && value.ok() == matching_value.as_i64() { + is_match = true; + break; + } + } + is_match + } else { + false + } + } else { + false + } + } else { + false + } + } + Err(_) => false, + }; + + let content = OpLogContent { + op: param.request_method.clone(), + name: resp.extensions().get::().and_then(|info| info.name.clone()).unwrap_or_default(), + user_id: resp.extensions().get::().map(|info| info.id.clone()), + ip: param.request_ip.clone(), + path: param.request_path.clone(), + scheme: param.request_scheme.clone(), + server_timing: start_time.map(|st| end_time - st), + resp_status: resp.status().as_u16().to_string(), + success, + own_paths: resp.extensions().get::().and_then(|info| info.own_paths.clone()), + spec_id: None, + }; + Ok((resp, Some(content))) + } +} + +#[derive(Deserialize, Serialize)] +struct OpLogContent { + pub op: String, + pub name: String, + pub user_id: Option, + pub own_paths: Option, + pub ip: String, + pub path: String, + pub scheme: String, + pub server_timing: Option, + pub resp_status: String, + //Indicates whether the business operation was successful. + pub success: bool, + pub spec_id: Option, +} diff --git a/gateway/spacegate-lib/src/plugin/rewrite_ns_b_ip.rs b/gateway/spacegate-lib/src/plugin/rewrite_ns_b_ip.rs index ad18a0c5a..ba17840ad 100644 --- a/gateway/spacegate-lib/src/plugin/rewrite_ns_b_ip.rs +++ b/gateway/spacegate-lib/src/plugin/rewrite_ns_b_ip.rs @@ -5,19 +5,19 @@ use spacegate_shell::extension::k8s_service::K8sService; use spacegate_shell::hyper::Request; use spacegate_shell::hyper::{http::uri, Response}; use spacegate_shell::kernel::extension::PeerAddr; -use spacegate_shell::plugin::{def_filter_plugin, Filter, PluginError}; -use spacegate_shell::SgBody; +use spacegate_shell::kernel::helper_layers::function::Inner; +use spacegate_shell::plugin::{Plugin, PluginConfig, PluginError}; +use spacegate_shell::{BoxError, SgBody}; use std::net::IpAddr; use std::str::FromStr; use std::sync::Arc; -use tardis::log; - -def_filter_plugin!("rewrite_ns", RewriteNsPlugin, SgFilterRewriteNs); +use tardis::{log, serde_json}; +// def_plugin!("rewrite_ns", RewriteNsPlugin, SgFilterRewriteNs); /// Kube available only! #[derive(Clone)] -pub struct SgFilterRewriteNs { +pub struct RewriteNsPlugin { pub ip_list: Arc<[IpNet]>, pub target_ns: String, } @@ -25,17 +25,17 @@ pub struct SgFilterRewriteNs { #[derive(Serialize, Deserialize)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[serde(default)] -pub struct SgFilterRewriteNsConfig { +pub struct RewriteNsConfig { pub ip_list: Vec, pub target_ns: String, } -impl<'de> Deserialize<'de> for SgFilterRewriteNs { +impl<'de> Deserialize<'de> for RewriteNsPlugin { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { - SgFilterRewriteNsConfig::deserialize(deserializer).map(|config| { + RewriteNsConfig::deserialize(deserializer).map(|config| { let ip_list: Vec = config .ip_list .iter() @@ -43,12 +43,12 @@ impl<'de> Deserialize<'de> for SgFilterRewriteNs { p.parse() .or(p.parse::().map(IpNet::from)) .map_err(|e| { - log::warn!("[{CODE}] Cannot parse ip `{p}` when loading config: {e}"); + log::warn!("Cannot parse ip `{p}` when loading config: {e}"); }) .ok() }) .collect(); - SgFilterRewriteNs { + RewriteNsPlugin { ip_list: ip_list.into(), target_ns: config.target_ns, } @@ -56,17 +56,37 @@ impl<'de> Deserialize<'de> for SgFilterRewriteNs { } } -impl Default for SgFilterRewriteNsConfig { +impl Default for RewriteNsConfig { fn default() -> Self { - SgFilterRewriteNsConfig { + RewriteNsConfig { ip_list: vec![], target_ns: "default".to_string(), } } } -impl Filter for SgFilterRewriteNs { - fn filter(&self, mut req: Request) -> Result, Response> { +impl Plugin for RewriteNsPlugin { + const CODE: &'static str = "rewrite-ns"; + fn create(plugin_config: PluginConfig) -> Result { + let config: RewriteNsConfig = serde_json::from_value(plugin_config.spec)?; + let ip_list: Vec = config + .ip_list + .iter() + .filter_map(|p| { + p.parse() + .or(p.parse::().map(IpNet::from)) + .map_err(|e| { + log::warn!("Cannot parse ip `{p}` when loading config: {e}"); + }) + .ok() + }) + .collect(); + Ok(RewriteNsPlugin { + ip_list: ip_list.into(), + target_ns: config.target_ns, + }) + } + async fn call(&self, mut req: Request, inner: Inner) -> Result, BoxError> { 'change_ns: { if let Some(k8s_service) = req.extensions().get::().cloned() { let Some(ref ns) = k8s_service.0.namespace else { break 'change_ns }; @@ -86,7 +106,7 @@ impl Filter for SgFilterRewriteNs { } } } - Ok(req) + Ok(inner.call(req).await) } } diff --git a/gateway/spacegate-lib/tests/export_schemas.rs b/gateway/spacegate-lib/tests/export_schemas.rs index 4b8ff4f5b..4d3ec90e6 100644 --- a/gateway/spacegate-lib/tests/export_schemas.rs +++ b/gateway/spacegate-lib/tests/export_schemas.rs @@ -19,7 +19,7 @@ macro_rules! export_plugins { #[test] fn export_schema() { use spacegate_lib::plugin::{ - anti_replay::AntiReplayPlugin, anti_xss::AntiXssPlugin, audit_log::AuditLogPlugin, auth::AuthPlugin, ip_time::SgIpTimePlugin, rewrite_ns_b_ip::RewriteNsPlugin, + anti_replay::AntiReplayPlugin, anti_xss::AntiXssPlugin, audit_log::AuditLogPlugin, auth::AuthPlugin, ip_time::IpTimePlugin, rewrite_ns_b_ip::RewriteNsPlugin, }; export_plugins!("schema": AntiReplayPlugin diff --git a/services/spacegate/Cargo.toml b/services/spacegate/Cargo.toml index d61e501c7..df0352017 100644 --- a/services/spacegate/Cargo.toml +++ b/services/spacegate/Cargo.toml @@ -15,7 +15,7 @@ publish.workspace = true [dependencies] serde.workspace = true lazy_static.workspace = true -spacegate-shell = { workspace = true, features = ["k8s", "plugin-all", "ext-redis", "cache"] } +spacegate-shell = { workspace = true, features = ["k8s", "plugin-all", "ext-redis", "ext-axum", "cache"] } tardis = { workerspace = true } spacegate-lib = { path = "../../gateway/spacegate-lib" } envy = "0.4" diff --git a/services/spacegate/Dockerfile b/services/spacegate/Dockerfile index 8c89f95c5..7d835d24e 100644 --- a/services/spacegate/Dockerfile +++ b/services/spacegate/Dockerfile @@ -1,4 +1,4 @@ -FROM ubuntu +FROM ubuntu:22.04 # Set time zone RUN apt-get update && \ diff --git a/spi/spi-conf/tests/spi_conf_nacos_compatible_test.rs b/spi/spi-conf/tests/spi_conf_nacos_compatible_test.rs index 8ee7e4246..ef80ee70a 100644 --- a/spi/spi-conf/tests/spi_conf_nacos_compatible_test.rs +++ b/spi/spi-conf/tests/spi_conf_nacos_compatible_test.rs @@ -5,7 +5,7 @@ use bios_spi_conf::{ conf_constants::DOMAIN_CODE, dto::conf_auth_dto::{RegisterRequest, RegisterResponse}, }; -use reqwest::header::HeaderName; +use tardis::web::reqwest::header::HeaderName; use tardis::{ basic::{dto::TardisContext, field::TrimString, result::TardisResult}, log,