Skip to content

Commit

Permalink
Merge pull request #896 from 4t145/gateway-notification-support
Browse files Browse the repository at this point in the history
gateway: notification support
  • Loading branch information
4t145 authored Jan 9, 2025
2 parents e429cc1 + def6843 commit 8de4069
Show file tree
Hide file tree
Showing 23 changed files with 864 additions and 233 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ strum = { version = "0.26", features = ["derive"] }
# tardis
# tardis = { version = "0.1.0-rc.17" }
# tardis = { version = "0.2.0", path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "f666d50" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "0058079" }
# asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" }
# asteroid-mq = { version = "0.1.0-alpha.5" }
Expand All @@ -78,7 +78,7 @@ asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26
# "k8s",
# "ext-axum",
# ] }
spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="fe747e4", features = [
spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="8065bb6", features = [
"cache",
"k8s",
"ext-axum",
Expand Down
2 changes: 1 addition & 1 deletion backend/basic/src/rbum/serv/rbum_cert_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ impl RbumCertServ {
let rbum_cert = funs.db().get_dto::<IdAndSkResp>(&query).await?;
if let Some(rbum_cert) = rbum_cert {
if Self::cert_is_locked(&rbum_cert.rel_rbum_id, funs).await? {
return Err(funs.err().unauthorized(&Self::get_obj_name(), "valid", "cert is locked", "400-rbum-cert-lock"));
return Err(funs.err().error("400-rbum-cert-lock", &Self::get_obj_name(), "valid", "cert is locked", "400-rbum-cert-lock"));
}
if !ignore_end_time && rbum_cert.end_time < Utc::now() {
return Err(funs.err().conflict(&Self::get_obj_name(), "valid", "sk is expired", "409-rbum-cert-sk-expire"));
Expand Down
2 changes: 2 additions & 0 deletions backend/gateways/spacegate-plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ spacegate-shell = { workspace = true, features = [
"ext-redis",
"ext-axum",
"plugin-east-west-traffic-white-list",
"plugin-limit"
] }

bios-sdk-invoke = { version = "0.2.0", path = "../../../frontend/sdks/invoke", features = [
"spi_log",
"reach",
], default-features = false }


Expand Down
31 changes: 0 additions & 31 deletions backend/gateways/spacegate-plugins/src/extension.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,6 @@
use http::Extensions;
use spacegate_shell::{kernel::extension::ExtensionPack as _, BoxError};
use tardis::serde_json::{self, Value};

use self::audit_log_param::LogParamContent;

pub mod audit_log_param;
pub mod before_encrypt_body;
pub mod cert_info;
pub mod notification;
pub mod request_crypto_status;
pub enum ExtensionPackEnum {
LogParamContent(),
None,
}

impl From<String> for ExtensionPackEnum {
fn from(value: String) -> Self {
match value.as_str() {
"log_content" => ExtensionPackEnum::LogParamContent(),
_ => ExtensionPackEnum::None,
}
}
}
impl ExtensionPackEnum {
pub fn _to_value(&self, ext: &Extensions) -> Result<Option<Value>, BoxError> {
match self {
ExtensionPackEnum::LogParamContent() => {
if let Some(ext) = LogParamContent::get(ext) {
return Ok(Some(serde_json::to_value(ext)?));
}
}
ExtensionPackEnum::None => (),
}
Ok(None)
}
}
111 changes: 106 additions & 5 deletions backend/gateways/spacegate-plugins/src/extension/audit_log_param.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
use bios_sdk_invoke::clients::spi_log_client::{self, LogItemAddV2Req};
use http::uri::Scheme;
use serde::{Deserialize, Serialize};
use spacegate_shell::{
hyper::HeaderMap,
kernel::{
extension::{EnterTime, OriginalIpAddr, PeerAddr},
Extract,
},
SgRequestExt, SgResponse,
};
use std::time::Duration;
use tardis::{basic::dto::TardisContext, TardisFuns, TardisFunsInst};
use tardis::{log as tracing, tokio};

use serde::{Deserialize, Serialize};
use spacegate_shell::{hyper::HeaderMap, kernel::extension::ExtensionPack};

use super::cert_info::RoleInfo;

#[derive(Clone)]
use crate::{audit_log::AuditLogPlugin, plugin::PluginBiosExt};

use super::cert_info::{CertInfo, RoleInfo};

#[derive(Debug, Clone)]
pub struct AuditLogParam {
pub request_path: String,
pub request_method: String,
Expand All @@ -14,6 +28,41 @@ pub struct AuditLogParam {
pub request_ip: String,
}

impl Extract for AuditLogParam {
fn extract(req: &http::Request<spacegate_shell::SgBody>) -> Self {
AuditLogParam {
request_path: req.uri().path().to_string(),
request_method: req.method().to_string(),
request_headers: req.headers().clone(),
request_scheme: req.uri().scheme().unwrap_or(&Scheme::HTTP).to_string(),
request_ip: req.extract::<OriginalIpAddr>().to_string(),
}
}
}

impl AuditLogParam {
pub fn merge_audit_log_param_content(self, response: &SgResponse, success: bool, header_token_name: &str) -> LogParamContent {
let cert_info = response.extensions().get::<CertInfo>();
let start_time = response.extensions().get::<EnterTime>().map(|time| time.0);
let end_time = std::time::Instant::now();
let param = self;
LogParamContent {
op: param.request_method,
name: cert_info.and_then(|info| info.name.clone()).unwrap_or_default(),
user_id: cert_info.map(|info| info.id.clone()),
role: cert_info.map(|info| info.roles.clone()).unwrap_or_default(),
ip: param.request_ip,
path: param.request_path,
scheme: param.request_scheme,
token: param.request_headers.get(header_token_name).and_then(|v| v.to_str().ok().map(|v| v.to_string())),
server_timing: start_time.map(|st| end_time - st),
resp_status: response.status().as_u16().to_string(),
success,
own_paths: cert_info.and_then(|info| info.own_paths.clone()),
}
}
}

#[derive(Clone, Serialize, Deserialize)]
pub struct LogParamContent {
pub op: String,
Expand All @@ -31,4 +80,56 @@ pub struct LogParamContent {
pub success: bool,
}

impl ExtensionPack for LogParamContent {}
impl LogParamContent {
pub fn send_audit_log(self, spi_app_id: &str, log_url: &str, tag: &str) {
send_audit_log(spi_app_id, log_url, tag, self)
}
}



fn send_audit_log(spi_app_id: &str, log_url: &str, tag: &str, content: LogParamContent) {
let funs = AuditLogPlugin::get_funs_inst_by_plugin_code();

let spi_ctx = TardisContext {
ak: spi_app_id.to_string(),
own_paths: spi_app_id.to_string(),
..Default::default()
};

let tag = tag.to_string();
if !log_url.is_empty() && spi_app_id.is_empty() {
tokio::task::spawn(async move {
match spi_log_client::SpiLogClient::addv2(
LogItemAddV2Req {
tag,
content: TardisFuns::json.obj_to_json(&content).unwrap_or_default(),
kind: None,
ext: Some(content.to_value()),
key: None,
op: Some(content.op),
rel_key: None,
idempotent_id: None,
ts: Some(tardis::chrono::Utc::now()),
owner: content.user_id,
own_paths: None,
msg: None,
owner_name: None,
push: false,
disable: None,
},
&funs,
&spi_ctx,
)
.await
{
Ok(_) => {
tracing::trace!("[Plugin.AuditLog] add log success")
}
Err(e) => {
tracing::warn!("[Plugin.AuditLog] failed to add log:{e}")
}
};
});
}
}
151 changes: 120 additions & 31 deletions backend/gateways/spacegate-plugins/src/extension/notification.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,148 @@
use std::{borrow::Cow, collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, Uri};
use serde::{Deserialize, Serialize};
use spacegate_shell::{kernel::backend_service::http_client_service::HttpClient, SgBody, SgRequest};
use tardis::{log as tracing, serde_json};
use tardis::{basic::dto::TardisContext, log as tracing, tokio};

/// Context to call notification api
///
/// Extract it from request extensions, and call [`NotificationContext::notify`] to send notification
/// Extract it from request extensions, and call [`NotificationContext::report`] to send notification
#[derive(Debug, Clone)]
pub struct NotificationContext {
pub(crate) api: Arc<Uri>,
pub(crate) headers: Arc<HashMap<HeaderName, HeaderValue>>,
pub(crate) client: HttpClient,
pub(crate) reach_api: Arc<str>,
pub(crate) log_api: Arc<str>,
pub(crate) spi_app_id: Arc<str>,
pub(crate) audit_log_tag: Arc<str>,
pub(crate) audit_log_token_header_name: Arc<str>,
pub(crate) templates: Arc<HashMap<String, NotifyPluginConfigTemplatesItem>>,
pub(crate) audit_param: Arc<AuditLogParam>,
pub(crate) cache_client: RedisClient,
pub(crate) dedup_cache_cool_down: Duration,
}

impl NotificationContext {
fn build_notification_request(&self, req: &ReachMsgSendReq) -> SgRequest {
let req_bytes = serde_json::to_vec(&req).expect("ReachMsgSendReq is a valid json");
let body = SgBody::full(req_bytes);
let mut req = SgRequest::new(body);
req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
*req.uri_mut() = self.api.as_ref().clone();
for (k, v) in self.headers.iter() {
req.headers_mut().insert(k.clone(), v.clone());
pub fn submit_notify(&self, req: ReachMsgSendReq, dedup_hash: u64) {
if self.reach_api.is_empty() {
tracing::debug!("reach api is empty, skip sending notification");
return;
}
req
let cache_client = self.cache_client.clone();
let ctx = TardisContext {
ak: self.spi_app_id.to_string(),
own_paths: self.spi_app_id.to_string(),
..Default::default()
};
let cool_down = self.dedup_cache_cool_down.as_secs().min(1);
tokio::spawn(async move {
let key = format!("sg:plugin:{}:{}", NotifyPlugin::CODE, dedup_hash);
let mut conn = cache_client.get_conn().await;
// check if the key exists
if let Ok(Some(_)) = conn.get::<'_, _, Option<String>>(&key).await {
tracing::debug!("dedup cache hit, skip sending notification");
return;
}

// set the dedup key
if let Err(e) = conn.set_ex::<'_, _, _,Option<String>>(&key, "1", cool_down).await {
tracing::error!(error = ?e, "set dedup cache failed");
return;
}

let funs = NotifyPlugin::get_funs_inst_by_plugin_code();
let response = bios_sdk_invoke::clients::reach_client::ReachClient::send_message(&req.into(), &funs, &ctx).await;
if let Err(e) = response {
tracing::error!(error = ?e, "send notification failed");
}
});
}
pub async fn notify(&self, req: &ReachMsgSendReq) {
let notify_response = self.client.clone().request(self.build_notification_request(req)).await;
if !notify_response.status().is_success() {
tracing::warn!(response = ?notify_response, "send notification failed");
}
pub fn report<R: Report>(&self, response: &SgResponse, report: R) {
let replace = report.get_replacement();
let key = report.key();

let Ok(response) = notify_response.into_body().dump().await.inspect_err(|e| {
tracing::error!(error = ?e, "failed to read response body");
}) else {
return;
};
let response_str = String::from_utf8_lossy(response.get_dumped().expect("just dump body"));
tracing::debug!(response = ?response_str, "receive notification api response");
if let Some(template) = self.templates.get(key) {
if let Some(notify_req) = template.reach.as_ref() {
let mut req = notify_req.clone();
req.merge_replace(replace.clone());
let context = self.clone();
context.submit_notify(req, report.dedup_hash());
}
if let Some(log_template) = template.audit_log.as_ref() {
let formatted = format_template(log_template, &replace);
self.submit_audit_log(response, Some(formatted));
}
}
}
pub fn submit_audit_log(&self, response: &SgResponse, extra_info: Option<String>) {
let mut log_param_content = self.audit_param.as_ref().clone().merge_audit_log_param_content(response, true, &self.audit_log_token_header_name);
if let Some(extra_info) = extra_info {
log_param_content.op = extra_info;
}
log_param_content.send_audit_log(&self.spi_app_id, &self.log_api, &self.audit_log_tag);
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct TamperReport {}

pub struct UnauthorizedOperationReport {}

pub struct CertLockReport {}

#[derive(Debug, Clone)]
pub struct ContentFilterForbiddenReport {
pub(crate) forbidden_reason: String,
}

use spacegate_shell::{
ext_redis::{redis::AsyncCommands, RedisClient},
plugin::{
schemars::{self, JsonSchema},
Plugin,
},
SgResponse,
};

use crate::plugin::{
notify::{format_template, NotifyPlugin, NotifyPluginConfigTemplates, NotifyPluginConfigTemplatesItem, Report},
PluginBiosExt,
};

use super::audit_log_param::AuditLogParam;
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct ReachMsgSendReq {
pub scene_code: String,
pub receives: Vec<ReachMsgReceive>,
pub rel_item_id: String,
pub replace: HashMap<String, String>,
}

#[derive(Debug, Serialize, Deserialize)]
impl ReachMsgSendReq {
pub fn merge_replace<K: Into<String>>(&mut self, replace: impl IntoIterator<Item = (K, String)>) {
self.replace.extend(replace.into_iter().map(|(k, v)| (k.into(), v)));
}
}

impl From<ReachMsgSendReq> for bios_sdk_invoke::clients::reach_client::ReachMsgSendReq {
fn from(val: ReachMsgSendReq) -> Self {
bios_sdk_invoke::clients::reach_client::ReachMsgSendReq {
scene_code: val.scene_code,
receives: val.receives.into_iter().map(Into::into).collect(),
rel_item_id: val.rel_item_id,
replace: val.replace,
}
}
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct ReachMsgReceive {
pub receive_group_code: String,
pub receive_kind: String,
pub receive_ids: Vec<String>,
}

impl From<ReachMsgReceive> for bios_sdk_invoke::clients::reach_client::ReachMsgReceive {
fn from(val: ReachMsgReceive) -> Self {
bios_sdk_invoke::clients::reach_client::ReachMsgReceive {
receive_group_code: val.receive_group_code,
receive_kind: val.receive_kind,
receive_ids: val.receive_ids,
}
}
}
1 change: 1 addition & 0 deletions backend/gateways/spacegate-plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod consts;
mod extension;
mod marker;
mod plugin;
mod utils;

pub const PACKAGE_NAME: &str = "spacegate_lib";
use plugin::{notify, op_redis_publisher};
Expand Down
Loading

0 comments on commit 8de4069

Please sign in to comment.