Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow: finish front action #485

Merged
merged 6 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions basic/src/rbum/dto/rbum_safe_dto.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};
#[cfg(feature = "default")]
use tardis::db::sea_orm;
use tardis::{
chrono::{DateTime, Utc},
web::poem_openapi,
};
#[cfg(feature = "default")]
use tardis::db::sea_orm;
#[derive(Debug, Clone, Default, Serialize)]
#[cfg_attr(feature = "default", derive(poem_openapi::Object, sea_orm::FromQueryResult))]
pub struct RbumSafeSummaryResp {
Expand Down
2 changes: 1 addition & 1 deletion clients/hwsms/src/ext/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Extented features

#[cfg(feature = "reach")]
mod reach;
mod reach;
14 changes: 10 additions & 4 deletions clients/hwsms/src/ext/reach.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::{collections::HashSet, sync::{Arc, OnceLock}};
use std::{
collections::HashSet,
sync::{Arc, OnceLock},
};

use bios_reach::{
client::{GenericTemplate, SendChannel},
dto::{ContentReplace, ReachChannelKind}, config::ReachConfig, consts::MODULE_CODE,
config::ReachConfig,
consts::MODULE_CODE,
dto::{ContentReplace, ReachChannelKind},
};
use tardis::{
async_trait::async_trait,
basic::{error::TardisError, result::TardisResult}, TardisFuns,
basic::{error::TardisError, result::TardisResult},
TardisFuns,
};

use crate::{SendSmsRequest, SmsContent, SmsClient};
use crate::{SendSmsRequest, SmsClient, SmsContent};

#[async_trait]
impl SendChannel for crate::SmsClient {
Expand Down
7 changes: 4 additions & 3 deletions clients/hwsms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@

use tardis::{
basic::result::TardisResult,
chrono::{Utc, SecondsFormat},
chrono::{SecondsFormat, Utc},
crypto::rust_crypto::sha2::Sha256,
rand::random,
url::Url,
web::reqwest::{
header::{HeaderMap, HeaderValue, AUTHORIZATION},
Client,
}, crypto::rust_crypto::sha2::Sha256,
},
};
mod ext;
mod api;
mod ext;
pub use api::*;
mod model;
pub use model::*;
Expand Down
50 changes: 28 additions & 22 deletions gateway/spacegate/src/plugin/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct SgFilterAuth {
/// |---------------|-----------|---------------------|
/// | `/apis` | `apis` | `/{true_url}` |
/// |`/prefix/apis` | `apis` |`/prefix/{true_url}` |
mix_replace_url:String,
mix_replace_url: String,
}

impl Default for SgFilterAuth {
Expand Down Expand Up @@ -201,9 +201,9 @@ impl SgPluginFilter for SgFilterAuth {
return Ok((true, ctx));
}

log::trace!("[Plugin.Auth] request filter info: request path is {}",ctx.request.get_uri().path());
log::trace!("[Plugin.Auth] request filter info: request path is {}", ctx.request.get_uri().path());
if ctx.request.get_method().eq(&Method::GET) && ctx.request.get_uri().path() == self.fetch_server_config_path.as_str() {
log::debug!("[Plugin.Auth] request path hit fetch server config path: {}",self.fetch_server_config_path);
log::debug!("[Plugin.Auth] request path hit fetch server config path: {}", self.fetch_server_config_path);
ctx.set_action(SgRouteFilterRequestAction::Response);
let mut headers = HeaderMap::new();
headers.insert(http::header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
Expand All @@ -223,7 +223,7 @@ impl SgPluginFilter for SgFilterAuth {

if self.auth_config.strict_security_mode && !is_true_mix_req {
log::debug!("[Plugin.Auth] handle mix request");
let mut ctx = mix_req_to_ctx(&self.auth_config,&self.mix_replace_url, ctx).await?;
let mut ctx = mix_req_to_ctx(&self.auth_config, &self.mix_replace_url, ctx).await?;
ctx.request.set_header_str(&self.header_is_mix_req, "true")?;
return Ok((false, ctx));
}
Expand Down Expand Up @@ -294,7 +294,7 @@ impl SgPluginFilter for SgFilterAuth {
}
}

async fn mix_req_to_ctx(auth_config: &AuthConfig,mix_replace_url:&str, mut ctx: SgRoutePluginContext) -> TardisResult<SgRoutePluginContext> {
async fn mix_req_to_ctx(auth_config: &AuthConfig, mix_replace_url: &str, mut ctx: SgRoutePluginContext) -> TardisResult<SgRoutePluginContext> {
let body = ctx.request.take_body_into_bytes().await?;
let string_body = String::from_utf8_lossy(&body).trim_matches('"').to_string();
if string_body.is_empty() {
Expand Down Expand Up @@ -355,7 +355,7 @@ async fn mix_req_to_ctx(auth_config: &AuthConfig,mix_replace_url:&str, mut ctx:
None => real_ip,
};
ctx.request.set_header_str("X-Forwarded-For", &forwarded_for)?;
ctx.request.set_header_str(hyper::header::CONTENT_LENGTH.as_str(),mix_body.body.as_bytes().len().to_string().as_str())?;
ctx.request.set_header_str(hyper::header::CONTENT_LENGTH.as_str(), mix_body.body.as_bytes().len().to_string().as_str())?;
ctx.request.set_body(mix_body.body);
Ok(ctx)
}
Expand Down Expand Up @@ -471,7 +471,7 @@ mod tests {
use super::*;

#[tokio::test]
async fn test(){
async fn test() {
env::set_var("RUST_LOG", "info,bios_spacegate=trace,bios_auth=trace,tardis=trace");
tracing_subscriber::fmt::init();

Expand Down Expand Up @@ -733,7 +733,7 @@ mod tests {
cache_url: env::var("TARDIS_FW.CACHE.URL").unwrap(),
..Default::default()
};
filter_auth.auth_config.strict_security_mode=true;
filter_auth.auth_config.strict_security_mode = true;

filter_auth
.init(&SgPluginFilterInitDto {
Expand Down Expand Up @@ -765,7 +765,7 @@ mod tests {
let data: Value = serde_json::from_str(&String::from_utf8_lossy(
&hyper::body::to_bytes(server_config_resp.body_mut()).await.unwrap().iter().cloned().collect::<Vec<u8>>(),
))
.unwrap();
.unwrap();

let pub_key = data["data"]["pub_key"].as_str().unwrap();
let server_sm2 = TardisCryptoSm2 {};
Expand All @@ -774,24 +774,28 @@ mod tests {
let front_pri_key = TardisFuns::crypto.sm2.new_private_key().unwrap();
let front_pub_key = TardisFuns::crypto.sm2.new_public_key(&front_pri_key).unwrap();


//=========request GET by apis============
let true_path="get_path";
let body=MixRequestBody{
let true_path = "get_path";
let body = MixRequestBody {
method: "GET".to_string(),
uri: true_path.to_string(),
body: "".to_string(),
headers: Default::default(),
ts: 0.0,
};
let mix_body=TardisFuns::json.obj_to_string(&body).unwrap();
let mix_body = TardisFuns::json.obj_to_string(&body).unwrap();
let mut header = HeaderMap::new();
let (crypto_body, bios_crypto_value) = crypto_req(&mix_body, server_public_key.serialize().unwrap().as_ref(), front_pub_key.serialize().unwrap().as_ref(), true);
let (crypto_body, bios_crypto_value) = crypto_req(
&mix_body,
server_public_key.serialize().unwrap().as_ref(),
front_pub_key.serialize().unwrap().as_ref(),
true,
);
header.insert("Bios-Crypto", bios_crypto_value.parse().unwrap());
header.insert(hyper::header::CONTENT_LENGTH,crypto_body.as_bytes().len().to_string().parse().unwrap());
header.insert(hyper::header::CONTENT_LENGTH, crypto_body.as_bytes().len().to_string().parse().unwrap());
let ctx = SgRoutePluginContext::new_http(
Method::POST,
Uri::from_str(&format!("http://sg.idealworld.group/{}",filter_auth.mix_replace_url)).unwrap(),
Uri::from_str(&format!("http://sg.idealworld.group/{}", filter_auth.mix_replace_url)).unwrap(),
Version::HTTP_11,
header,
Body::from(crypto_body),
Expand All @@ -801,17 +805,19 @@ mod tests {
);
let (is_ok, mut before_filter_ctx) = filter_auth.req_filter("", ctx).await.unwrap();
assert!(!is_ok);
assert_eq!(before_filter_ctx.get_action(),&SgRouteFilterRequestAction::Redirect);
assert_eq!(before_filter_ctx.request.get_uri().path(),&format!("/{}",true_path));
assert_eq!(before_filter_ctx.request.get_method(),&Method::GET);
assert_eq!(before_filter_ctx.request.get_headers().get(hyper::header::CONTENT_LENGTH),Some(&HeaderValue::from_static("0")));
assert_eq!(before_filter_ctx.get_action(), &SgRouteFilterRequestAction::Redirect);
assert_eq!(before_filter_ctx.request.get_uri().path(), &format!("/{}", true_path));
assert_eq!(before_filter_ctx.request.get_method(), &Method::GET);
assert_eq!(
before_filter_ctx.request.get_headers().get(hyper::header::CONTENT_LENGTH),
Some(&HeaderValue::from_static("0"))
);
let (is_ok, mut before_filter_ctx) = filter_auth.req_filter("", before_filter_ctx).await.unwrap();
assert!(is_ok);
println!("before_filter_ctx=={:?}",before_filter_ctx);
println!("before_filter_ctx=={:?}", before_filter_ctx);
let req_body = before_filter_ctx.request.dump_body().await.unwrap();
assert!(req_body.is_empty());


filter_auth.destroy().await.unwrap();
}

Expand Down
11 changes: 11 additions & 0 deletions middleware/flow/src/api/cc/flow_cc_inst_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,15 @@ impl FlowCcInstApi {
funs.commit().await?;
TardisResp::ok(Void {})
}

/// trigger instance front action / 触发前置动作
#[oai(path = "/trigger_front_action", method = "get")]
async fn trigger_front_action(&self) -> TardisApiResult<Void> {
let mut funs = flow_constants::get_tardis_inst();
funs.begin().await?;
FlowInstServ::trigger_front_action(&funs).await?;
funs.commit().await?;

TardisResp::ok(Void {})
}
}
13 changes: 11 additions & 2 deletions middleware/flow/src/dto/flow_external_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,26 @@ use tardis::web::poem_openapi::{
types::{ParseFromJSON, ToJSON},
};

#[derive(Serialize, Deserialize, Debug, poem_openapi::Object)]
#[derive(Serialize, Deserialize, Debug, Default, poem_openapi::Object)]
pub struct FlowExternalReq {
pub kind: FlowExternalKind,
pub curr_tag: String,
pub curr_bus_obj_id: String,
pub inst_id: String,
pub target_state: Option<String>,
pub original_state: Option<String>,
pub owner_paths: String,
pub obj_ids: Vec<String>,
pub params: Vec<FlowExternalParams>,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, poem_openapi::Enum)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, poem_openapi::Enum)]
pub enum FlowExternalKind {
#[default]
FetchRelObj,
ModifyField,
NotifyChanges,
QueryField,
}

#[derive(Debug, Deserialize, Serialize, poem_openapi::Object, Clone)]
Expand Down Expand Up @@ -59,3 +63,8 @@ pub struct FlowExternalModifyFieldResp {}

#[derive(Serialize, Deserialize, Debug, poem_openapi::Object)]
pub struct FlowExternalNotifyChangesResp {}

#[derive(Serialize, Deserialize, Debug, Default, poem_openapi::Object)]
pub struct FlowExternalQueryFieldResp {
pub objs: Vec<Value>,
}
9 changes: 6 additions & 3 deletions middleware/flow/src/flow_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ use crate::{
dto::{
flow_model_dto::FlowModelFilterReq,
flow_state_dto::FlowSysStateKind,
flow_transition_dto::{FlowTransitionActionChangeInfo, FlowTransitionDoubleCheckInfo, FlowTransitionInitInfo, FlowTransitionActionChangeKind, FlowTransitionActionByVarChangeInfoChangedKind},
flow_transition_dto::{
FlowTransitionActionByVarChangeInfoChangedKind, FlowTransitionActionChangeInfo, FlowTransitionActionChangeKind, FlowTransitionDoubleCheckInfo, FlowTransitionInitInfo,
},
},
flow_config::{BasicInfo, FlowBasicInfoManager, FlowConfig},
flow_constants,
Expand Down Expand Up @@ -76,6 +78,7 @@ pub async fn init_db(mut funs: TardisFunsInst) -> TardisResult<()> {
funs.begin().await?;
if check_initialized(&funs, &ctx).await? {
init_basic_info(&funs).await?;
self::modify_post_actions(&funs, &ctx).await?;
} else {
let db_kind = TardisFuns::reldb().backend();
let compatible_type = TardisFuns::reldb().compatible_type();
Expand All @@ -85,7 +88,6 @@ pub async fn init_db(mut funs: TardisFunsInst) -> TardisResult<()> {
funs.db().init(flow_inst::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_config::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
init_rbum_data(&funs, &ctx).await?;
self::modify_post_actions(&funs, &ctx).await?;
};
funs.commit().await?;
Ok(())
Expand Down Expand Up @@ -130,7 +132,8 @@ pub async fn modify_post_actions(funs: &TardisFunsInst, ctx: &TardisContext) ->
id: String,
action_by_post_changes: Value,
}
let transactions = funs.db()
let transactions = funs
.db()
.find_dtos::<FlowTransactionPostAction>(
Query::select()
.columns([
Expand Down
47 changes: 43 additions & 4 deletions middleware/flow/src/serv/flow_external_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use tardis::{

use crate::{
dto::flow_external_dto::{
FlowExternalFetchRelObjResp, FlowExternalKind, FlowExternalModifyFieldResp, FlowExternalNotifyChangesResp, FlowExternalParams, FlowExternalReq, FlowExternalResp,
FlowExternalFetchRelObjResp, FlowExternalKind, FlowExternalModifyFieldResp, FlowExternalNotifyChangesResp, FlowExternalParams, FlowExternalQueryFieldResp, FlowExternalReq,
FlowExternalResp,
},
flow_config::FlowConfig,
flow_constants,
Expand All @@ -33,8 +34,6 @@ impl FlowExternalServ {
inst_id: inst_id.to_string(),
curr_tag: tag.to_string(),
curr_bus_obj_id: rel_business_obj_id.to_string(),
target_state: None,
original_state: None,
params: rel_tags
.into_iter()
.map(|tag| FlowExternalParams {
Expand All @@ -44,6 +43,7 @@ impl FlowExternalServ {
value: None,
})
.collect_vec(),
..Default::default()
};
debug!("do_fetch_rel_obj body: {:?}", body);
let resp: FlowExternalResp<FlowExternalFetchRelObjResp> = funs
Expand Down Expand Up @@ -83,6 +83,7 @@ impl FlowExternalServ {
target_state,
original_state,
params,
..Default::default()
};
debug!("do_modify_field body: {:?}", body);
let resp: FlowExternalResp<FlowExternalModifyFieldResp> = funs
Expand Down Expand Up @@ -120,7 +121,7 @@ impl FlowExternalServ {
curr_bus_obj_id: rel_business_obj_id.to_string(),
target_state: Some(target_state),
original_state: Some(original_state),
params: vec![],
..Default::default()
};
debug!("do_notify_changes body: {:?}", body);
let resp: FlowExternalResp<FlowExternalNotifyChangesResp> = funs
Expand Down Expand Up @@ -153,6 +154,44 @@ impl FlowExternalServ {
}
}

pub async fn do_query_field(
tag: &str,
rel_business_obj_ids: Vec<String>,
own_paths: &str,
ctx: &TardisContext,
funs: &TardisFunsInst,
) -> TardisResult<FlowExternalQueryFieldResp> {
let external_url = Self::get_external_url(tag, ctx, funs).await?;
if external_url.is_empty() {
return Ok(FlowExternalQueryFieldResp::default());
}

let header = Self::headers(None, funs, ctx).await?;
let body = FlowExternalReq {
kind: FlowExternalKind::QueryField,
inst_id: "".to_string(),
curr_tag: tag.to_string(),
curr_bus_obj_id: "".to_string(),
owner_paths: own_paths.to_string(),
obj_ids: rel_business_obj_ids,
target_state: None,
original_state: None,
params: vec![],
};
debug!("do_query_field body: {:?}", body);
let resp: FlowExternalResp<FlowExternalQueryFieldResp> = funs
.web_client()
.post(&external_url, &body, header)
.await?
.body
.ok_or_else(|| funs.err().internal_error("flow_external", "do_query_field", "illegal response", "500-external-illegal-response"))?;
if let Some(data) = resp.body {
Ok(data)
} else {
Err(funs.err().internal_error("flow_external", "do_query_field", "illegal response", "500-external-illegal-response"))
}
}

async fn get_external_url(tag: &str, ctx: &TardisContext, funs: &TardisFunsInst) -> TardisResult<String> {
let external_url = SpiKvClient::get_item(format!("{}:config:{}", flow_constants::DOMAIN_CODE, tag), None, funs, ctx)
.await?
Expand Down
Loading