Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/ideal-world/bios
Browse files Browse the repository at this point in the history
  • Loading branch information
gudaoxuri committed May 7, 2024
2 parents 77a4868 + 1aa0475 commit 849e4de
Show file tree
Hide file tree
Showing 25 changed files with 126 additions and 210 deletions.
14 changes: 6 additions & 8 deletions backend/basic/src/helper/request_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ pub async fn try_set_real_ip_from_req_to_ctx(request: &Request, ctx: &TardisCont
/// assert_eq!(parse_forwarded_ip("Forwarded: proto=http; for=192.168.0"), None);
/// ```
pub fn parse_forwarded_ip(forwarded_value: &str) -> Option<IpAddr> {
forwarded_value
.strip_prefix("Forwarded: ")
.and_then(|forwarded_value| {
forwarded_value
.split(';')
.find(|part| part.trim().starts_with("for="))
.and_then(|part| part.trim()[4..].split(',').next().and_then(|ip_str| IpAddr::from_str(ip_str).ok()))
})
forwarded_value.strip_prefix("Forwarded: ").and_then(|forwarded_value| {
forwarded_value
.split(';')
.find(|part| part.trim().starts_with("for="))
.and_then(|part| part.trim()[4..].split(',').next().and_then(|ip_str| IpAddr::from_str(ip_str).ok()))
})
}

/// Try to get real ip from request
Expand Down
4 changes: 2 additions & 2 deletions backend/basic/src/rbum/dto/rbum_kind_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ pub struct RbumKindAddReq {
/// Resource kind module
///
/// 资源类型模块
///
///
/// Default is ``empty``
///
///
/// 默认为 ``空``
///
/// Used to further divide the resource kind. For example, there are multiple resource kinds under the ``cmdb compute`` module, such as ``ecs, ec2, k8s``.
Expand Down
2 changes: 1 addition & 1 deletion backend/basic/src/rbum/rbum_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Debug;
use std::sync::Mutex;

use lazy_static::lazy_static;
use serde::{de, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use tardis::basic::error::TardisError;
use tardis::basic::result::TardisResult;
use tardis::TardisFunsInst;
Expand Down
34 changes: 23 additions & 11 deletions backend/middlewares/flow/src/api/cc/flow_cc_inst_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use serde_json::Value;
use tardis::web::context_extractor::TardisContextExtractor;
use tardis::web::poem::Request;
use tardis::web::poem_openapi;
Expand All @@ -20,7 +21,8 @@ pub struct FlowCcInstApi;
/// Flow instance process API
#[poem_openapi::OpenApi(prefix_path = "/cc/inst")]
impl FlowCcInstApi {
/// Start Instance / 启动实例
/// Start Instance(Return Instance ID)
/// 启动实例(返回实例ID)
#[oai(path = "/", method = "post")]
async fn start(&self, add_req: Json<FlowInstStartReq>, ctx: TardisContextExtractor, _request: &Request) -> TardisApiResult<String> {
let mut funs = flow_constants::get_tardis_inst();
Expand All @@ -30,7 +32,8 @@ impl FlowCcInstApi {
TardisResp::ok(result)
}

/// Abort Instance / 中止实例
/// Abort Instance
/// 终止实例
#[oai(path = "/:flow_inst_id", method = "put")]
async fn abort(&self, flow_inst_id: Path<String>, abort_req: Json<FlowInstAbortReq>, ctx: TardisContextExtractor, _request: &Request) -> TardisApiResult<Void> {
let mut funs = flow_constants::get_tardis_inst();
Expand All @@ -40,15 +43,17 @@ impl FlowCcInstApi {
TardisResp::ok(Void {})
}

/// Get Instance By Instance Id / 获取实例信息
/// Get Instance By Instance Id
/// 获取实例信息
#[oai(path = "/:flow_inst_id", method = "get")]
async fn get(&self, flow_inst_id: Path<String>, ctx: TardisContextExtractor, _request: &Request) -> TardisApiResult<FlowInstDetailResp> {
let funs = flow_constants::get_tardis_inst();
let result = FlowInstServ::get(&flow_inst_id.0, &funs, &ctx.0).await?;
TardisResp::ok(result)
}

/// Find Instances / 获取实例列表
/// Find Instances
/// 获取实例列表
#[oai(path = "/", method = "get")]
async fn paginate(
&self,
Expand All @@ -66,7 +71,8 @@ impl FlowCcInstApi {
TardisResp::ok(result)
}

/// Find Next Transitions / 获取下一个流转状态列表
/// Find Next Transitions
/// 获取下一个流转状态列表
#[oai(path = "/:flow_inst_id/transition/next", method = "put")]
async fn find_next_transitions(
&self,
Expand All @@ -80,7 +86,8 @@ impl FlowCcInstApi {
TardisResp::ok(result)
}

/// Find the state and transfer information of the specified model in batch / 批量获取指定模型的状态及流转信息
/// Find the state and transfer information of the specified model in batch
/// 批量获取指定模型的状态及流转信息
#[oai(path = "/batch/state_transitions", method = "put")]
async fn find_state_and_next_transitions(
&self,
Expand All @@ -93,7 +100,8 @@ impl FlowCcInstApi {
TardisResp::ok(result)
}

/// Transfer State By State Id / 流转
/// Transfer State By Transaction Id
/// 通过动作ID流转状态
#[oai(path = "/:flow_inst_id/transition/transfer", method = "put")]
async fn transfer(
&self,
Expand All @@ -111,7 +119,8 @@ impl FlowCcInstApi {
TardisResp::ok(result)
}

/// Batch transfer State By State Id / 批量流转
/// Batch transfer State By Transaction Id
/// 批量流转
#[oai(path = "/batch/:flow_inst_ids/transition/transfer", method = "put")]
async fn batch_transfer(
&self,
Expand All @@ -138,7 +147,8 @@ impl FlowCcInstApi {
TardisResp::ok(result)
}

/// Modify Assigned / 同步执行人信息
/// Modify Assigned[Deprecated]
/// 同步执行人信息[已废弃]
#[oai(path = "/:flow_inst_id/transition/modify_assigned", method = "post")]
async fn modify_assigned(
&self,
Expand All @@ -149,12 +159,14 @@ impl FlowCcInstApi {
) -> TardisApiResult<Void> {
let mut funs = flow_constants::get_tardis_inst();
funs.begin().await?;
FlowInstServ::modify_assigned(&flow_inst_id.0, &modify_req.0.current_assigned, &funs, &ctx.0).await?;
let vars = HashMap::from([("current_assigned".to_string(), Value::String(modify_req.0.current_assigned))]);
FlowInstServ::modify_current_vars(&flow_inst_id.0, &vars, &funs, &ctx.0).await?;
funs.commit().await?;
TardisResp::ok(Void {})
}

/// Modify list of variables / 同步当前变量列表
/// Modify list of variables
/// 同步当前变量列表
#[oai(path = "/:flow_inst_id/modify_current_vars", method = "patch")]
async fn modify_current_vars(
&self,
Expand Down
4 changes: 3 additions & 1 deletion backend/middlewares/flow/src/api/ci/flow_ci_inst_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use bios_basic::rbum::helper::rbum_scope_helper::check_without_owner_and_unsafe_fill_ctx;
use tardis::log::debug;
use tardis::serde_json::Value;
use tardis::web::context_extractor::TardisContextExtractor;
use tardis::web::poem::web::Path;
use tardis::web::poem::Request;
Expand Down Expand Up @@ -128,7 +129,8 @@ impl FlowCiInstApi {
let mut funs = flow_constants::get_tardis_inst();
check_without_owner_and_unsafe_fill_ctx(request, &funs, &mut ctx.0)?;
funs.begin().await?;
FlowInstServ::modify_assigned(&flow_inst_id.0, &modify_req.0.current_assigned, &funs, &ctx.0).await?;
let vars = HashMap::from([("assigned_to".to_string(), Value::String(modify_req.0.current_assigned))]);
FlowInstServ::modify_current_vars(&flow_inst_id.0, &vars, &funs, &ctx.0).await?;
funs.commit().await?;
TardisResp::ok(Void {})
}
Expand Down
3 changes: 0 additions & 3 deletions backend/middlewares/flow/src/domain/flow_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,4 @@ pub struct Model {
pub transitions: Option<Vec<FlowInstTransitionInfo>>,

pub own_paths: String,

/// Current Assigned / 指定执行人
pub current_assigned: Option<String>,
}
4 changes: 1 addition & 3 deletions backend/middlewares/flow/src/dto/flow_inst_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct FlowInstSummaryResp {
pub rel_business_obj_id: String,

pub current_state_id: String,
pub current_assigned: Option<String>,

pub create_ctx: FlowOperationContext,
pub create_time: DateTime<Utc>,
Expand All @@ -90,7 +89,6 @@ pub struct FlowInstDetailResp {
pub current_state_kind: Option<FlowSysStateKind>,
pub current_state_ext: Option<String>,

pub current_assigned: Option<String>,
pub current_vars: Option<HashMap<String, Value>>,

pub create_vars: Option<HashMap<String, Value>>,
Expand Down Expand Up @@ -141,7 +139,7 @@ pub struct FlowInstFindNextTransitionsReq {
pub vars: Option<HashMap<String, Value>>,
}

#[derive(Serialize, Deserialize, Debug, poem_openapi::Object)]
#[derive(Serialize, Deserialize, Debug, poem_openapi::Object, Clone)]
pub struct FlowInstFindNextTransitionResp {
pub next_flow_transition_id: String,
pub next_flow_transition_name: String,
Expand Down
2 changes: 2 additions & 0 deletions backend/middlewares/flow/src/dto/flow_transition_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ pub enum FlowTransitionActionByVarChangeInfoChangedKind {
AutoGetOperator,
#[sea_orm(string_value = "select_field")]
SelectField,
#[sea_orm(string_value = "and_or_subs")]
AddOrSub,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, poem_openapi::Object, sea_orm::FromJsonQueryResult)]
Expand Down
14 changes: 1 addition & 13 deletions backend/middlewares/flow/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tardis::{
};

use crate::{
flow_constants::{self, get_tardis_inst, EVENT_FRONT_CHANGE, EVENT_MODIFY_ASSIGNED, EVENT_POST_CHANGE},
flow_constants::{self, get_tardis_inst, EVENT_FRONT_CHANGE, EVENT_POST_CHANGE},
serv::flow_event_serv::FlowEventServ,
};
pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -44,18 +44,6 @@ pub async fn start_flow_event_service(config: &EventTopicConfig) -> TardisResult
});
}
Some(EVENT_POST_CHANGE) => {}
Some(EVENT_MODIFY_ASSIGNED) => {
let Ok((inst_id, assigned_id, ctx)) = TardisFuns::json.json_to_obj::<(String, String, _)>(msg) else {
return None;
};
tokio::spawn(async move {
let funs = get_tardis_inst();
let result = FlowEventServ::do_modify_assigned(&inst_id, &assigned_id, &ctx, &funs).await;
if let Err(err) = result {
error!("[BIOS.Log] failed to do front change: {}, inst_id: {}", err, inst_id);
}
});
}
Some(unknown_event) => {
warn!("[BIOS.Flow] event receive unknown event {unknown_event}")
}
Expand Down
1 change: 0 additions & 1 deletion backend/middlewares/flow/src/flow_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub const EVENT_FRONT_CHANGE: &str = "event_front_change";
pub const EVENT_POST_CHANGE: &str = "event_post_change";
pub const EVENT_UPDATE_STATE: &str = "event_update_state";
pub const EVENT_MODIFY_FIELD: &str = "event_modify_field";
pub const EVENT_MODIFY_ASSIGNED: &str = "event_modify_assigned";

pub fn get_tardis_inst() -> TardisFunsInst {
TardisFuns::inst_with_db_conn(DOMAIN_CODE.to_string(), None)
Expand Down
16 changes: 1 addition & 15 deletions backend/middlewares/flow/src/serv/clients/event_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,16 @@ use tardis::{
TardisFuns,
};

use crate::flow_constants::{EVENT_FRONT_CHANGE, EVENT_MODIFY_ASSIGNED, EVENT_POST_CHANGE};
use crate::flow_constants::{EVENT_FRONT_CHANGE, EVENT_POST_CHANGE};

#[async_trait]
pub trait FlowEventExt {
async fn publish_modify_assigned(&self, inst_id: String, assigned_id: String, from: String, spi_app_id: String, ctx: &TardisContext) -> TardisResult<()>;
async fn publish_front_change(&self, inst_id: String, from: String, spi_app_id: String, ctx: &TardisContext) -> TardisResult<()>;
async fn publish_post_change(&self, inst_id: String, next_transition_id: String, from: String, spi_app_id: String, ctx: &TardisContext) -> TardisResult<()>;
}

#[async_trait]
impl FlowEventExt for TardisWSClient {
async fn publish_modify_assigned(&self, inst_id: String, assigned_id: String, from: String, spi_app_id: String, ctx: &TardisContext) -> TardisResult<()> {
let spi_ctx = TardisContext { owner: spi_app_id, ..ctx.clone() };
let req = TardisWebsocketReq {
msg: TardisFuns::json.obj_to_json(&(inst_id, assigned_id, spi_ctx)).expect("invalid json"),
to_avatars: Some(vec!["flow/service".into()]),
from_avatar: from,
event: Some(EVENT_MODIFY_ASSIGNED.into()),
..Default::default()
};
info!("event add log {}", TardisFuns::json.obj_to_string(&req).expect("invalid json"));
self.send_obj(&req).await?;
return Ok(());
}
async fn publish_front_change(&self, inst_id: String, from: String, spi_app_id: String, ctx: &TardisContext) -> TardisResult<()> {
let spi_ctx = TardisContext { owner: spi_app_id, ..ctx.clone() };
let req = TardisWebsocketReq {
Expand Down
73 changes: 28 additions & 45 deletions backend/middlewares/flow/src/serv/flow_event_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tardis::{
db::sea_orm::{
self,
sea_query::{Expr, Query},
Set,
},
TardisFunsInst,
};
Expand Down Expand Up @@ -166,17 +165,6 @@ impl FlowEventServ {
ctx,
)
.await?;
// let flow_transitions = flow_model
// .transitions()
// .into_iter()
// .filter(|trans| trans.from_flow_state_id == flow_inst_detail.current_state_id && !trans.action_by_post_changes().is_empty())
// .sorted_by_key(|trans| trans.sort)
// .collect_vec();
// if flow_transitions.is_empty() {
// return Ok(());
// }
// let next_flow_transition =
// FlowInstServ::do_find_next_transitions(&flow_inst_detail, &flow_model, Some(flow_transition_id.to_string()), &None, true, funs, ctx).await?.next_flow_transitions.pop();
let next_flow_transition = flow_model.transitions().into_iter().find(|trans| trans.id == flow_transition_id);
if next_flow_transition.is_none() {
return Err(funs.err().not_found("flow_inst", "transfer", "no transferable state", "404-flow-inst-transfer-state-not-found"));
Expand Down Expand Up @@ -228,9 +216,34 @@ impl FlowEventServ {
match post_change.kind {
FlowTransitionActionChangeKind::Var => {
if let Some(mut change_info) = post_change.var_change_info {
if change_info.changed_kind.is_some() && change_info.changed_kind.clone().unwrap() == FlowTransitionActionByVarChangeInfoChangedKind::AutoGetOperateTime {
change_info.changed_val = Some(json!(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)));
change_info.changed_kind = Some(FlowTransitionActionByVarChangeInfoChangedKind::ChangeContent);
if change_info.changed_kind.is_some() {
match change_info.changed_kind.clone().unwrap() {
FlowTransitionActionByVarChangeInfoChangedKind::AutoGetOperateTime => {
change_info.changed_val = Some(json!(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)));
change_info.changed_kind = Some(FlowTransitionActionByVarChangeInfoChangedKind::ChangeContent);
}
FlowTransitionActionByVarChangeInfoChangedKind::AddOrSub => {
if change_info.changed_val.is_some()
&& change_info.changed_val.clone().unwrap().is_object()
&& flow_inst_detail.current_vars.clone().unwrap_or_default().get(&change_info.var_name).is_some()
&& change_info.changed_val.clone().unwrap().as_object().unwrap().get("value").is_some()
&& change_info.changed_val.clone().unwrap().as_object().unwrap().get("op").is_some()
{
let original_map = flow_inst_detail.current_vars.clone().unwrap_or_default();
let target_value = change_info.changed_val.clone().unwrap().as_object().unwrap().get("value").unwrap().as_i64().unwrap_or_default();
let changed_op = change_info.changed_val.clone().unwrap().as_object().unwrap().get("op").unwrap().as_str().unwrap_or_default().to_string();
if let Some(original_value) = original_map.get(&change_info.var_name) {
change_info.changed_kind = Some(FlowTransitionActionByVarChangeInfoChangedKind::ChangeContent);
match changed_op.as_str() {
"add" => change_info.changed_val = Some(json!(original_value.as_i64().unwrap_or_default() + target_value)),
"sub" => change_info.changed_val = Some(json!(original_value.as_i64().unwrap_or_default() - target_value)),
_ => {}
}
}
}
}
_ => {}
};
}
let rel_tag = change_info.obj_tag.unwrap_or_default();
if !rel_tag.is_empty() {
Expand Down Expand Up @@ -489,34 +502,4 @@ impl FlowEventServ {
}
Ok(())
}

pub async fn do_modify_assigned(flow_inst_id: &str, assigned_id: &str, ctx: &TardisContext, funs: &TardisFunsInst) -> TardisResult<()> {
if funs
.db()
.count(
Query::select()
.column((flow_inst::Entity, flow_inst::Column::Id))
.from(flow_inst::Entity)
.and_where(Expr::col((flow_inst::Entity, flow_inst::Column::Id)).eq(flow_inst_id.to_string()))
.and_where(Expr::col((flow_inst::Entity, flow_inst::Column::OwnPaths)).like(format!("{}%", ctx.own_paths))),
)
.await?
== 0
{
return Err(funs.err().not_found(
"flow_inst",
"modify_assigned",
&format!("flow instance {} not found", flow_inst_id),
"404-flow-inst-not-found",
));
}
let flow_inst = flow_inst::ActiveModel {
id: Set(flow_inst_id.to_string()),
current_assigned: Set(Some(assigned_id.to_string())),
..Default::default()
};
funs.db().update_one(flow_inst, ctx).await?;

Ok(())
}
}
Loading

0 comments on commit 849e4de

Please sign in to comment.