Skip to content

Commit 8e33267

Browse files
committed
Merge branch 'main' into zz-flow-0104
2 parents f60d868 + 091bde2 commit 8e33267

File tree

27 files changed

+1015
-244
lines changed

27 files changed

+1015
-244
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ strum = { version = "0.26", features = ["derive"] }
6565
# tardis
6666
# tardis = { version = "0.1.0-rc.17" }
6767
# tardis = { version = "0.2.0", path = "../tardis/tardis" }
68-
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "f666d50" }
68+
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "aeb4c85" }
6969
# asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
7070
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26fa4f" }
7171
# asteroid-mq = { version = "0.1.0-alpha.5" }
@@ -78,7 +78,7 @@ asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "b26
7878
# "k8s",
7979
# "ext-axum",
8080
# ] }
81-
spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="fe747e4", features = [
81+
spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="8065bb6", features = [
8282
"cache",
8383
"k8s",
8484
"ext-axum",

backend/basic/src/rbum/serv/rbum_cert_serv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -842,7 +842,7 @@ impl RbumCertServ {
842842
let rbum_cert = funs.db().get_dto::<IdAndSkResp>(&query).await?;
843843
if let Some(rbum_cert) = rbum_cert {
844844
if Self::cert_is_locked(&rbum_cert.rel_rbum_id, funs).await? {
845-
return Err(funs.err().unauthorized(&Self::get_obj_name(), "valid", "cert is locked", "400-rbum-cert-lock"));
845+
return Err(funs.err().error("400-rbum-cert-lock", &Self::get_obj_name(), "valid", "cert is locked", "400-rbum-cert-lock"));
846846
}
847847
if !ignore_end_time && rbum_cert.end_time < Utc::now() {
848848
return Err(funs.err().conflict(&Self::get_obj_name(), "valid", "sk is expired", "409-rbum-cert-sk-expire"));

backend/gateways/spacegate-plugins/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ spacegate-shell = { workspace = true, features = [
2222
"ext-redis",
2323
"ext-axum",
2424
"plugin-east-west-traffic-white-list",
25+
"plugin-limit"
2526
] }
2627

2728
bios-sdk-invoke = { version = "0.2.0", path = "../../../frontend/sdks/invoke", features = [
2829
"spi_log",
30+
"reach",
2931
], default-features = false }
3032

3133

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,6 @@
1-
use http::Extensions;
2-
use spacegate_shell::{kernel::extension::ExtensionPack as _, BoxError};
3-
use tardis::serde_json::{self, Value};
4-
5-
use self::audit_log_param::LogParamContent;
61

72
pub mod audit_log_param;
83
pub mod before_encrypt_body;
94
pub mod cert_info;
105
pub mod notification;
116
pub mod request_crypto_status;
12-
pub enum ExtensionPackEnum {
13-
LogParamContent(),
14-
None,
15-
}
16-
17-
impl From<String> for ExtensionPackEnum {
18-
fn from(value: String) -> Self {
19-
match value.as_str() {
20-
"log_content" => ExtensionPackEnum::LogParamContent(),
21-
_ => ExtensionPackEnum::None,
22-
}
23-
}
24-
}
25-
impl ExtensionPackEnum {
26-
pub fn _to_value(&self, ext: &Extensions) -> Result<Option<Value>, BoxError> {
27-
match self {
28-
ExtensionPackEnum::LogParamContent() => {
29-
if let Some(ext) = LogParamContent::get(ext) {
30-
return Ok(Some(serde_json::to_value(ext)?));
31-
}
32-
}
33-
ExtensionPackEnum::None => (),
34-
}
35-
Ok(None)
36-
}
37-
}

backend/gateways/spacegate-plugins/src/extension/audit_log_param.rs

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
1+
use bios_sdk_invoke::clients::spi_log_client::{self, LogItemAddV2Req};
2+
use http::uri::Scheme;
3+
use serde::{Deserialize, Serialize};
4+
use spacegate_shell::{
5+
hyper::HeaderMap,
6+
kernel::{
7+
extension::{EnterTime, OriginalIpAddr, PeerAddr},
8+
Extract,
9+
},
10+
SgRequestExt, SgResponse,
11+
};
112
use std::time::Duration;
13+
use tardis::{basic::dto::TardisContext, TardisFuns, TardisFunsInst};
14+
use tardis::{log as tracing, tokio};
215

3-
use serde::{Deserialize, Serialize};
4-
use spacegate_shell::{hyper::HeaderMap, kernel::extension::ExtensionPack};
16+
use crate::{audit_log::AuditLogPlugin, plugin::PluginBiosExt};
517

6-
use super::cert_info::RoleInfo;
18+
use super::cert_info::{CertInfo, RoleInfo};
719

8-
#[derive(Clone)]
20+
#[derive(Debug, Clone)]
921
pub struct AuditLogParam {
1022
pub request_path: String,
1123
pub request_method: String,
@@ -14,6 +26,41 @@ pub struct AuditLogParam {
1426
pub request_ip: String,
1527
}
1628

29+
impl Extract for AuditLogParam {
30+
fn extract(req: &http::Request<spacegate_shell::SgBody>) -> Self {
31+
AuditLogParam {
32+
request_path: req.uri().path().to_string(),
33+
request_method: req.method().to_string(),
34+
request_headers: req.headers().clone(),
35+
request_scheme: req.uri().scheme().unwrap_or(&Scheme::HTTP).to_string(),
36+
request_ip: req.extract::<OriginalIpAddr>().to_string(),
37+
}
38+
}
39+
}
40+
41+
impl AuditLogParam {
42+
pub fn merge_audit_log_param_content(self, response: &SgResponse, success: bool, header_token_name: &str) -> LogParamContent {
43+
let cert_info = response.extensions().get::<CertInfo>();
44+
let start_time = response.extensions().get::<EnterTime>().map(|time| time.0);
45+
let end_time = std::time::Instant::now();
46+
let param = self;
47+
LogParamContent {
48+
op: param.request_method,
49+
name: cert_info.and_then(|info| info.name.clone()).unwrap_or_default(),
50+
user_id: cert_info.map(|info| info.id.clone()),
51+
role: cert_info.map(|info| info.roles.clone()).unwrap_or_default(),
52+
ip: param.request_ip,
53+
path: param.request_path,
54+
scheme: param.request_scheme,
55+
token: param.request_headers.get(header_token_name).and_then(|v| v.to_str().ok().map(|v| v.to_string())),
56+
server_timing: start_time.map(|st| end_time - st),
57+
resp_status: response.status().as_u16().to_string(),
58+
success,
59+
own_paths: cert_info.and_then(|info| info.own_paths.clone()),
60+
}
61+
}
62+
}
63+
1764
#[derive(Clone, Serialize, Deserialize)]
1865
pub struct LogParamContent {
1966
pub op: String,
@@ -31,4 +78,52 @@ pub struct LogParamContent {
3178
pub success: bool,
3279
}
3380

34-
impl ExtensionPack for LogParamContent {}
81+
impl LogParamContent {
82+
pub fn send_audit_log<P: PluginBiosExt>(self, spi_app_id: &str, log_url: &str, tag: &str) {
83+
send_audit_log(spi_app_id, log_url, tag, self,P::get_funs_inst_by_plugin_code());
84+
}
85+
}
86+
87+
fn send_audit_log(spi_app_id: &str, log_url: &str, tag: &str, content: LogParamContent, funs: TardisFunsInst) {
88+
let spi_ctx = TardisContext {
89+
ak: spi_app_id.to_string(),
90+
own_paths: spi_app_id.to_string(),
91+
..Default::default()
92+
};
93+
94+
let tag = tag.to_string();
95+
if !log_url.is_empty() && !spi_app_id.is_empty() {
96+
tokio::task::spawn(async move {
97+
match spi_log_client::SpiLogClient::addv2(
98+
LogItemAddV2Req {
99+
tag,
100+
content: TardisFuns::json.obj_to_json(&content).unwrap_or_default(),
101+
kind: None,
102+
ext: Some(content.to_value()),
103+
key: None,
104+
op: Some(content.op),
105+
rel_key: None,
106+
idempotent_id: None,
107+
ts: Some(tardis::chrono::Utc::now()),
108+
owner: content.user_id,
109+
own_paths: None,
110+
msg: None,
111+
owner_name: None,
112+
push: false,
113+
disable: None,
114+
},
115+
&funs,
116+
&spi_ctx,
117+
)
118+
.await
119+
{
120+
Ok(_) => {
121+
tracing::debug!("[Plugin.AuditLog] add log success")
122+
}
123+
Err(e) => {
124+
tracing::warn!("[Plugin.AuditLog] failed to add log:{e}")
125+
}
126+
};
127+
});
128+
}
129+
}
Lines changed: 129 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,157 @@
1-
use std::{borrow::Cow, collections::HashMap, sync::Arc};
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

3-
use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, Uri};
43
use serde::{Deserialize, Serialize};
5-
use spacegate_shell::{kernel::backend_service::http_client_service::HttpClient, SgBody, SgRequest};
6-
use tardis::{log as tracing, serde_json};
4+
use tardis::{basic::dto::TardisContext, log as tracing, tokio};
75

86
/// Context to call notification api
97
///
10-
/// Extract it from request extensions, and call [`NotificationContext::notify`] to send notification
8+
/// Extract it from request extensions, and call [`NotificationContext::report`] to send notification
119
#[derive(Debug, Clone)]
1210
pub struct NotificationContext {
13-
pub(crate) api: Arc<Uri>,
14-
pub(crate) headers: Arc<HashMap<HeaderName, HeaderValue>>,
15-
pub(crate) client: HttpClient,
11+
pub(crate) reach_api: Arc<str>,
12+
pub(crate) log_api: Arc<str>,
13+
pub(crate) spi_app_id: Arc<str>,
14+
pub(crate) audit_log_tag: Arc<str>,
15+
pub(crate) audit_log_token_header_name: Arc<str>,
16+
pub(crate) templates: Arc<HashMap<String, NotifyPluginConfigTemplatesItem>>,
17+
pub(crate) audit_param: Arc<AuditLogParam>,
18+
pub(crate) cache_client: RedisClient,
19+
pub(crate) dedup_cache_cool_down: Duration,
1620
}
1721

1822
impl NotificationContext {
19-
fn build_notification_request(&self, req: &ReachMsgSendReq) -> SgRequest {
20-
let req_bytes = serde_json::to_vec(&req).expect("ReachMsgSendReq is a valid json");
21-
let body = SgBody::full(req_bytes);
22-
let mut req = SgRequest::new(body);
23-
req.headers_mut().insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
24-
*req.uri_mut() = self.api.as_ref().clone();
25-
for (k, v) in self.headers.iter() {
26-
req.headers_mut().insert(k.clone(), v.clone());
23+
pub fn submit_notify(&self, req: ReachRequest, dedup_hash: u64) {
24+
if self.reach_api.is_empty() {
25+
tracing::debug!("reach api is empty, skip sending notification");
26+
return;
2727
}
28-
req
28+
let cache_client = self.cache_client.clone();
29+
let ctx = TardisContext {
30+
ak: self.spi_app_id.to_string(),
31+
own_paths: self.spi_app_id.to_string(),
32+
..Default::default()
33+
};
34+
let cool_down = self.dedup_cache_cool_down.as_secs().min(1);
35+
tokio::spawn(async move {
36+
let key = format!("sg:plugin:{}:dedup:{}", NotifyPlugin::CODE, dedup_hash);
37+
let mut conn = cache_client.get_conn().await;
38+
// check if the key exists
39+
if let Ok(Some(_)) = conn.get::<'_, _, Option<String>>(&key).await {
40+
tracing::debug!("dedup cache hit, skip sending notification");
41+
return;
42+
}
43+
44+
// set the dedup key
45+
if let Err(e) = conn.set_ex::<'_, _, _,Option<String>>(&key, "1", cool_down).await {
46+
tracing::error!(error = ?e, "set dedup cache failed");
47+
return;
48+
}
49+
50+
let funs = NotifyPlugin::get_funs_inst_by_plugin_code();
51+
tracing::debug!(?req, "submit notify");
52+
let response = match req {
53+
ReachRequest::ByScene(req) => {
54+
bios_sdk_invoke::clients::reach_client::ReachClient::send_message(&req.into(), &funs, &ctx).await
55+
56+
}
57+
ReachRequest::ByTemplate { contact, template_id, replace } => {
58+
bios_sdk_invoke::clients::reach_client::ReachClient::general_send(&contact, &template_id, &replace, &funs, &ctx).await
59+
}
60+
};
61+
if let Err(e) = response {
62+
tracing::error!(error = ?e, "send notification failed");
63+
}
64+
});
2965
}
30-
pub async fn notify(&self, req: &ReachMsgSendReq) {
31-
let notify_response = self.client.clone().request(self.build_notification_request(req)).await;
32-
if !notify_response.status().is_success() {
33-
tracing::warn!(response = ?notify_response, "send notification failed");
34-
}
66+
pub fn report<R: Report>(&self, response: &SgResponse, report: R) {
67+
let replace = report.get_replacement();
68+
let key = report.key();
3569

36-
let Ok(response) = notify_response.into_body().dump().await.inspect_err(|e| {
37-
tracing::error!(error = ?e, "failed to read response body");
38-
}) else {
39-
return;
40-
};
41-
let response_str = String::from_utf8_lossy(response.get_dumped().expect("just dump body"));
42-
tracing::debug!(response = ?response_str, "receive notification api response");
70+
if let Some(template) = self.templates.get(key) {
71+
if let Some(notify_req) = template.reach.as_ref() {
72+
let mut req = notify_req.clone();
73+
req.merge_replace(replace.clone());
74+
let context = self.clone();
75+
context.submit_notify(req, report.dedup_hash());
76+
}
77+
if let Some(log_template) = template.audit_log.as_ref() {
78+
let formatted = format_template(log_template, &replace);
79+
self.submit_audit_log(response, Some(formatted));
80+
}
81+
}
82+
}
83+
pub fn submit_audit_log(&self, response: &SgResponse, extra_info: Option<String>) {
84+
let mut log_param_content = self.audit_param.as_ref().clone().merge_audit_log_param_content(response, true, &self.audit_log_token_header_name);
85+
if let Some(extra_info) = extra_info {
86+
log_param_content.op = extra_info;
87+
}
88+
log_param_content.send_audit_log::<NotifyPlugin>(&self.spi_app_id, &self.log_api, &self.audit_log_tag);
4389
}
4490
}
4591

46-
#[derive(Debug, Serialize, Deserialize)]
92+
pub struct TamperReport {}
93+
94+
pub struct UnauthorizedOperationReport {}
95+
96+
pub struct CertLockReport {}
97+
98+
#[derive(Debug, Clone)]
99+
pub struct ContentFilterForbiddenReport {
100+
pub(crate) forbidden_reason: String,
101+
}
102+
103+
use spacegate_shell::{
104+
ext_redis::{redis::AsyncCommands, RedisClient},
105+
plugin::{
106+
schemars::{self, JsonSchema},
107+
Plugin,
108+
},
109+
SgResponse,
110+
};
111+
112+
use crate::plugin::{
113+
notify::{format_template, NotifyPlugin, NotifyPluginConfigTemplatesItem, ReachRequest, Report},
114+
PluginBiosExt,
115+
};
116+
117+
use super::audit_log_param::AuditLogParam;
118+
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
47119
pub struct ReachMsgSendReq {
48120
pub scene_code: String,
49121
pub receives: Vec<ReachMsgReceive>,
50122
pub rel_item_id: String,
51123
pub replace: HashMap<String, String>,
52124
}
53125

54-
#[derive(Debug, Serialize, Deserialize)]
126+
impl ReachMsgSendReq {
127+
pub fn merge_replace<K: Into<String>>(&mut self, replace: impl IntoIterator<Item = (K, String)>) {
128+
self.replace.extend(replace.into_iter().map(|(k, v)| (k.into(), v)));
129+
}
130+
}
131+
132+
impl From<ReachMsgSendReq> for bios_sdk_invoke::clients::reach_client::ReachMsgSendReq {
133+
fn from(val: ReachMsgSendReq) -> Self {
134+
bios_sdk_invoke::clients::reach_client::ReachMsgSendReq {
135+
scene_code: val.scene_code,
136+
receives: val.receives.into_iter().map(Into::into).collect(),
137+
rel_item_id: val.rel_item_id,
138+
replace: val.replace,
139+
}
140+
}
141+
}
142+
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
55143
pub struct ReachMsgReceive {
56144
pub receive_group_code: String,
57145
pub receive_kind: String,
58146
pub receive_ids: Vec<String>,
59147
}
148+
149+
impl From<ReachMsgReceive> for bios_sdk_invoke::clients::reach_client::ReachMsgReceive {
150+
fn from(val: ReachMsgReceive) -> Self {
151+
bios_sdk_invoke::clients::reach_client::ReachMsgReceive {
152+
receive_group_code: val.receive_group_code,
153+
receive_kind: val.receive_kind,
154+
receive_ids: val.receive_ids,
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)