Skip to content

Commit

Permalink
flow: improve code && fix bug (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZzIsGod1019 authored May 20, 2024
1 parent 3357531 commit 8020aa9
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 73 deletions.
2 changes: 1 addition & 1 deletion backend/middlewares/flow/src/api/cc/flow_cc_model_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl FlowCcModelApi {
) -> TardisApiResult<HashMap<String, FlowModelSummaryResp>> {
let mut funs = flow_constants::get_tardis_inst();
funs.begin().await?;
let tag_ids= tag_ids.split(',').map(|tag_id| tag_id.to_string()).collect_vec();
let tag_ids = tag_ids.split(',').map(|tag_id| tag_id.to_string()).collect_vec();
let result = FlowModelServ::find_or_add_models(tag_ids, temp_id.0, is_shared.unwrap_or(false), &funs, &ctx.0).await?;
funs.commit().await?;
TardisResp::ok(result)
Expand Down
2 changes: 1 addition & 1 deletion backend/middlewares/flow/src/domain/flow_transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tardis::db::sea_orm::prelude::Json;
use tardis::db::sea_orm::*;
use tardis::{chrono, TardisCreateEntity, TardisEmptyBehavior, TardisEmptyRelation};

use crate::dto::flow_transition_dto::{FlowTransitionPostActionInfo, FlowTransitionDoubleCheckInfo, FlowTransitionFrontActionInfo};
use crate::dto::flow_transition_dto::{FlowTransitionDoubleCheckInfo, FlowTransitionFrontActionInfo, FlowTransitionPostActionInfo};
use crate::dto::flow_var_dto::FlowVarInfo;

/// Transfer / 流转
Expand Down
8 changes: 4 additions & 4 deletions backend/middlewares/flow/src/dto/flow_external_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tardis::web::poem_openapi::{
use super::{flow_state_dto::FlowSysStateKind, flow_transition_dto::FlowTransitionActionByVarChangeInfoChangedKind};

/// External data exchange requests
///
///
/// 对外数据交换请求
#[derive(Serialize, Deserialize, Debug, Default, poem_openapi::Object)]
pub struct FlowExternalReq {
Expand Down Expand Up @@ -68,7 +68,7 @@ pub struct FlowExternalReq {
}

/// Type of request initiated, ex: query field, modification field, status change notification...
///
///
/// 发起请求的类型,例:查询字段,修改字段,状态变更通知..
#[derive(Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, poem_openapi::Enum)]
pub enum FlowExternalKind {
Expand All @@ -84,7 +84,7 @@ pub enum FlowExternalKind {
}

/// When kind is ModifyField, the field is modified in a specific way, for example: validate the content, post action, precondition trigger ...
///
///
/// 当 kind 为 ModifyField 时,字段被修改的具体操作方式,例:验证内容,后置动作,前置条件触发..
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize, poem_openapi::Enum)]
pub enum FlowExternalCallbackOp {
Expand All @@ -99,7 +99,7 @@ pub enum FlowExternalCallbackOp {
}

/// 扩展字段
///
///
/// Extended params
#[derive(Debug, Deserialize, Serialize, poem_openapi::Object, Clone)]
pub struct FlowExternalParams {
Expand Down
4 changes: 2 additions & 2 deletions backend/middlewares/flow/src/dto/flow_state_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub struct FlowStateDetailResp {
}

/// Type of state
///
///
/// 状态类型
#[derive(Clone, Default, Debug, PartialEq, Eq, Deserialize, Serialize, poem_openapi::Enum, EnumIter, sea_orm::DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(Some(255))")]
Expand Down Expand Up @@ -208,4 +208,4 @@ pub struct FlowStateAggResp {
pub is_init: bool,
pub ext: FlowStateRelModelExt,
pub transitions: Vec<FlowTransitionDetailResp>,
}
}
28 changes: 14 additions & 14 deletions backend/middlewares/flow/src/dto/flow_transition_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ pub struct FlowTransitionModifyReq {
pub double_check: Option<FlowTransitionDoubleCheckInfo>,
/// 验证内容
pub vars_collect: Option<Vec<FlowVarInfo>>,
/// 是否通知
pub is_notify: Option<bool>,
/// 触发前回调的配置信息
pub action_by_pre_callback: Option<String>,
/// 触发后回调的配置信息
pub action_by_post_callback: Option<String>,
/// 后置动作的配置信息
pub action_by_post_changes: Option<Vec<FlowTransitionPostActionInfo>>,
/// 前置动作的配置信息
pub action_by_front_changes: Option<Vec<FlowTransitionFrontActionInfo>>,
/// 排序
pub sort: Option<i64>,
/// 是否通知
pub is_notify: Option<bool>,
/// 触发前回调的配置信息
pub action_by_pre_callback: Option<String>,
/// 触发后回调的配置信息
pub action_by_post_callback: Option<String>,
/// 后置动作的配置信息
pub action_by_post_changes: Option<Vec<FlowTransitionPostActionInfo>>,
/// 前置动作的配置信息
pub action_by_front_changes: Option<Vec<FlowTransitionFrontActionInfo>>,
/// 排序
pub sort: Option<i64>,
}

#[derive(Serialize, Deserialize, Default, Debug, Clone, poem_openapi::Object, sea_orm::FromQueryResult)]
Expand Down Expand Up @@ -472,8 +472,8 @@ pub enum StateChangeConditionOp {
}

/// 对象tag的关联类型。当 kind 为 State 时该字段生效,当 kind 为 var 时该字段统一为 default。
/// 目前有默认和父子关系两种。为空时,代表是默认关联类型。当值为 Default 时,obj_tag 为 req/issue/test/task等。当值为 ParentOrSub 时,obj_tag 为 parent/sub.
/// 例:当值为 ParentOrSub,obj_tag 为 parent。表示为当前操作对象所关联的父级对象。
/// 目前有默认和父子关系两种。为空时,代表是默认关联类型。当值为 Default 时,obj_tag 为 req/issue/test/task等。当值为 ParentOrSub 时,obj_tag 为 parent/sub.
/// 例:当值为 ParentOrSub,obj_tag 为 parent。表示为当前操作对象所关联的父级对象。
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, poem_openapi::Enum)]
pub enum TagRelKind {
Default,
Expand Down
35 changes: 22 additions & 13 deletions backend/middlewares/flow/src/serv/flow_event_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ impl FlowEventServ {
ctx,
)
.await?;
let next_flow_transition = flow_model.transitions().into_iter().find(|trans| trans.id == flow_transition_id);
let model_transition = flow_model.transitions();
let next_flow_transition = model_transition.iter().find(|trans| trans.id == flow_transition_id);
if next_flow_transition.is_none() {
return Err(funs.err().not_found("flow_inst", "transfer", "no transferable state", "404-flow-inst-transfer-state-not-found"));
}
let next_flow_transition = next_flow_transition.unwrap();
let prev_flow_state = FlowStateServ::get_item(
&flow_inst_detail.current_state_id,
&next_flow_transition.from_flow_state_id,
&FlowStateFilterReq {
basic: RbumBasicFilterReq {
with_sub_own_paths: true,
Expand All @@ -197,13 +198,11 @@ impl FlowEventServ {
)
.await?;

let model_transition = flow_model.transitions();
let next_transition_detail = model_transition.iter().find(|trans| trans.id == flow_transition_id).unwrap().to_owned();
if FlowModelServ::check_post_action_ring(next_transition_detail.clone(), (false, vec![]), funs, ctx).await?.0 {
if FlowModelServ::check_post_action_ring(next_flow_transition.clone(), (false, vec![]), funs, ctx).await?.0 {
return Err(funs.err().not_found("flow_inst", "transfer", "this post action exist endless loop", "500-flow-transition-endless-loop"));
}

let post_changes = next_transition_detail.action_by_post_changes();
let post_changes = next_flow_transition.action_by_post_changes();
if post_changes.is_empty() {
return Ok(());
}
Expand All @@ -225,18 +224,28 @@ impl FlowEventServ {
FlowTransitionActionByVarChangeInfoChangedKind::AddOrSub => {
if change_info.changed_val.is_some()
&& change_info.changed_val.clone().unwrap().is_object()
&& flow_inst_detail.current_vars.clone().unwrap_or_default().get(&change_info.var_name).is_some()
&& change_info.changed_val.clone().unwrap().as_object().unwrap().get("value").is_some()
&& change_info.changed_val.clone().unwrap().as_object().unwrap().get("op").is_some()
{
let original_map = flow_inst_detail.current_vars.clone().unwrap_or_default();
let original_value = if let Some(original_value) = FlowInstServ::find_var_by_inst_id(flow_inst_id, &change_info.var_name, funs, ctx).await? {
Some(original_value)
} else {
FlowInstServ::find_var_by_inst_id(flow_inst_id, &format!("custom_{}", change_info.var_name), funs, ctx).await?
};

let target_value = change_info.changed_val.clone().unwrap().as_object().unwrap().get("value").unwrap().as_i64().unwrap_or_default();
let changed_op = change_info.changed_val.clone().unwrap().as_object().unwrap().get("op").unwrap().as_str().unwrap_or_default().to_string();
if let Some(original_value) = original_map.get(&change_info.var_name) {
if let Some(original_value) = original_value {
change_info.changed_kind = Some(FlowTransitionActionByVarChangeInfoChangedKind::ChangeContent);
match changed_op.as_str() {
"add" => change_info.changed_val = Some(json!(original_value.as_str().unwrap_or_default().parse::<i64>().unwrap_or_default() + target_value)),
"sub" => change_info.changed_val = Some(json!(original_value.as_str().unwrap_or_default().parse::<i64>().unwrap_or_default() - target_value)),
"add" => {
change_info.changed_val =
Some(json!(original_value.as_str().unwrap_or_default().parse::<i64>().unwrap_or_default() + target_value))
}
"sub" => {
change_info.changed_val =
Some(json!(original_value.as_str().unwrap_or_default().parse::<i64>().unwrap_or_default() - target_value))
}
_ => {}
}
}
Expand All @@ -261,7 +270,7 @@ impl FlowEventServ {
let inst_id = FlowInstServ::get_inst_ids_by_rel_business_obj_id(vec![rel_bus_obj_id.clone()], funs, ctx).await?.pop().unwrap_or_default();
FlowExternalServ::do_modify_field(
&rel_tag,
&next_transition_detail,
next_flow_transition,
&rel_bus_obj_id,
&inst_id,
FlowExternalCallbackOp::PostAction,
Expand Down Expand Up @@ -325,7 +334,7 @@ impl FlowEventServ {
if !modify_self_field_params.is_empty() {
FlowExternalServ::do_modify_field(
&flow_model.tag,
&next_transition_detail,
next_flow_transition,
&flow_inst_detail.rel_business_obj_id,
&flow_inst_detail.id,
FlowExternalCallbackOp::PostAction,
Expand Down
17 changes: 17 additions & 0 deletions backend/middlewares/flow/src/serv/flow_inst_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,4 +1223,21 @@ impl FlowInstServ {

Ok(())
}

pub async fn find_var_by_inst_id(inst_id: &str, key: &str, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<Option<Value>> {
let mut current_vars = Self::get(inst_id, funs, ctx).await?.current_vars;
if current_vars.is_none() || current_vars.clone().unwrap_or_default().get(key).is_none() {
let new_vars = Self::get_new_vars(inst_id, funs, ctx).await?;
Self::modify_current_vars(
inst_id,
&TardisFuns::json.json_to_obj::<HashMap<String, Value>>(new_vars).unwrap_or_default(),
funs,
ctx,
)
.await?;
current_vars = Self::get(inst_id, funs, ctx).await?.current_vars;
}

Ok(current_vars.unwrap_or_default().get(key).cloned())
}
}
26 changes: 13 additions & 13 deletions backend/middlewares/flow/src/serv/flow_model_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,28 +741,28 @@ impl FlowModelServ {
// First iterate over the models
for model in models {
if tags.contains(&model.tag) {
result.insert(
model.tag.clone(),
model,
);
result.insert(model.tag.clone(), model);
}
}
// Iterate over the tag based on the existing result and get the default model
for tag in tags {
if !result.contains_key(&tag) {
// copy custom model
let model_id = Self::add_custom_model(&tag, None, template_id.clone(), funs, ctx).await?;
let added_model = Self::find_one_item(&FlowModelFilterReq {
basic: RbumBasicFilterReq {
ids: Some(vec![model_id]),
let added_model = Self::find_one_item(
&FlowModelFilterReq {
basic: RbumBasicFilterReq {
ids: Some(vec![model_id]),
..Default::default()
},
..Default::default()
},
..Default::default()
}, funs, ctx).await?.unwrap_or_default();
result.insert(
tag.to_string(),
added_model,
);
funs,
ctx,
)
.await?
.unwrap_or_default();
result.insert(tag.to_string(), added_model);
}
}

Expand Down
16 changes: 5 additions & 11 deletions backend/middlewares/flow/tests/test_flow_scenes_fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use bios_mw_flow::dto::flow_model_dto::{
};
use bios_mw_flow::dto::flow_state_dto::{FlowStateAddReq, FlowStateRelModelExt, FlowStateSummaryResp, FlowSysStateKind};
use bios_mw_flow::dto::flow_transition_dto::{
FlowTransitionActionByVarChangeInfoChangedKind, FlowTransitionPostActionInfo, FlowTransitionActionChangeKind, FlowTransitionAddReq, FlowTransitionDoubleCheckInfo,
FlowTransitionModifyReq, FlowTransitionSortStateInfoReq, FlowTransitionSortStatesReq, StateChangeCondition, StateChangeConditionItem, StateChangeConditionOp,
FlowTransitionActionByVarChangeInfoChangedKind, FlowTransitionActionChangeKind, FlowTransitionAddReq, FlowTransitionDoubleCheckInfo, FlowTransitionModifyReq,
FlowTransitionPostActionInfo, FlowTransitionSortStateInfoReq, FlowTransitionSortStatesReq, StateChangeCondition, StateChangeConditionItem, StateChangeConditionOp,
};

use bios_mw_flow::dto::flow_var_dto::{FlowVarInfo, RbumDataTypeKind, RbumWidgetTypeKind};
Expand Down Expand Up @@ -70,9 +70,7 @@ pub async fn test(flow_client: &mut TestHttpClient) -> TardisResult<()> {
let mut modify_configs = vec![];
let tags = vec!["REQ", "PROJ", "ITER", "TICKET", "MOCK"];
for tag in tags {
modify_configs.push(FlowModelAddCustomModelItemReq {
tag: tag.to_string(),
});
modify_configs.push(FlowModelAddCustomModelItemReq { tag: tag.to_string() });
}
let result: Vec<FlowModelAddCustomModelResp> = flow_client
.post(
Expand Down Expand Up @@ -503,9 +501,7 @@ pub async fn test(flow_client: &mut TestHttpClient) -> TardisResult<()> {
"/cc/model/add_custom_model",
&FlowModelAddCustomModelReq {
proj_template_id: Some(share_template_id.clone()),
bind_model_objs: vec![FlowModelAddCustomModelItemReq {
tag: "REQ".to_string(),
}],
bind_model_objs: vec![FlowModelAddCustomModelItemReq { tag: "REQ".to_string() }],
},
)
.await;
Expand Down Expand Up @@ -597,9 +593,7 @@ pub async fn test(flow_client: &mut TestHttpClient) -> TardisResult<()> {
let mut modify_configs = vec![];
let tags = vec!["REQ", "PROJ", "ITER", "TICKET"];
for tag in tags {
modify_configs.push(FlowModelAddCustomModelItemReq {
tag: tag.to_string(),
});
modify_configs.push(FlowModelAddCustomModelItemReq { tag: tag.to_string() });
}
let result: Vec<FlowModelAddCustomModelResp> = flow_client
.post(
Expand Down
2 changes: 1 addition & 1 deletion backend/spi/spi-search/src/api/ci/search_ci_item_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SearchCiItemApi {
}

/// Refresh TSV Result By Tag
///
///
/// 通过指定 tag 刷新分词结果
#[oai(path = "/:tag/refresh", method = "put")]
async fn refresh_tsv(&self, tag: Path<String>, ctx: TardisContextExtractor) -> TardisApiResult<Void> {
Expand Down
4 changes: 2 additions & 2 deletions backend/spi/spi-search/src/serv/es/search_es_item_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,6 @@ pub async fn query_metrics(_query_req: &SearchQueryMetricsReq, funs: &TardisFuns
Err(funs.err().format_error("search_es_item_serv", "query_metrics", "not supports", "500-not-supports"))
}

pub async fn refresh_tsv(tag: &str, funs: &TardisFunsInst, _ctx: &TardisContext, _inst: &SpiBsInst) -> TardisResult<()> {
pub async fn refresh_tsv(_tag: &str, funs: &TardisFunsInst, _ctx: &TardisContext, _inst: &SpiBsInst) -> TardisResult<()> {
Err(funs.err().format_error("search_es_item_serv", "refresh_tsv", "not supports", "500-not-supports"))
}
}
42 changes: 31 additions & 11 deletions backend/spi/spi-search/src/serv/pg/search_pg_item_serv.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use std::{collections::HashMap, vec};

use itertools::Itertools;
use pinyin::{to_pinyin_vec, Pinyin};
use tardis::{
basic::{dto::TardisContext, error::TardisError, result::TardisResult}, chrono::Utc, db::{
basic::{dto::TardisContext, error::TardisError, result::TardisResult},
chrono::Utc,
db::{
reldb_client::{TardisRelDBClient, TardisRelDBlConnection},
sea_orm::{FromQueryResult, Value},
}, futures::future::join_all, serde_json::{self, json, Map}, web::web_resp::TardisPage, TardisFuns, TardisFunsInst
},
futures::future::join_all,
serde_json::{self, json, Map},
web::web_resp::TardisPage,
TardisFuns, TardisFunsInst,
};
use itertools::Itertools;

use bios_basic::{dto::BasicQueryCondInfo, enumeration::BasicQueryOpKind, helper::db_helper, spi::spi_funs::SpiBsInst};

use crate::{
dto::search_item_dto::{
AdvBasicQueryCondInfo, SearchItemAddReq, SearchItemModifyReq, SearchItemSearchQScopeKind, SearchItemSearchReq, SearchItemSearchResp, SearchQueryMetricsReq, SearchQueryMetricsResp
AdvBasicQueryCondInfo, SearchItemAddReq, SearchItemModifyReq, SearchItemSearchQScopeKind, SearchItemSearchReq, SearchItemSearchResp, SearchQueryMetricsReq,
SearchQueryMetricsResp,
},
search_config::SearchConfig,
};
Expand Down Expand Up @@ -1569,12 +1576,25 @@ pub async fn refresh_tsv(tag: &str, funs: &TardisFunsInst, ctx: &TardisContext,
let (conn, table_name) = search_pg_initializer::init_table_and_conn(bs_inst, tag, ctx, false).await?;
let result = conn.query_all(&format!("SELECT key, title FROM {table_name}"), vec![]).await?;
join_all(
result.into_iter().map(|row| async move {
modify(tag, row.try_get::<String>("", "key").expect("not found key").as_str(), &mut SearchItemModifyReq {
title: Some(row.try_get("", "title").expect("not found title")),
..Default::default()
}, funs, ctx, inst).await.expect("modify error")
}).collect_vec(),
).await;
result
.into_iter()
.map(|row| async move {
modify(
tag,
row.try_get::<String>("", "key").expect("not found key").as_str(),
&mut SearchItemModifyReq {
title: Some(row.try_get("", "title").expect("not found title")),
..Default::default()
},
funs,
ctx,
inst,
)
.await
.expect("modify error")
})
.collect_vec(),
)
.await;
Ok(())
}

0 comments on commit 8020aa9

Please sign in to comment.