Skip to content

Commit

Permalink
flow:support parent/sub rel & modify field with notify data (#524)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZzIsGod1019 authored Nov 9, 2023
1 parent eb8dd0a commit 1073bbf
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 44 deletions.
4 changes: 4 additions & 0 deletions middleware/flow/src/dto/flow_external_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use tardis::web::poem_openapi::{
types::{ParseFromJSON, ToJSON},
};

use super::flow_state_dto::FlowSysStateKind;

#[derive(Serialize, Deserialize, Debug, Default, poem_openapi::Object)]
pub struct FlowExternalReq {
pub kind: FlowExternalKind,
pub curr_tag: String,
pub curr_bus_obj_id: String,
pub inst_id: String,
pub target_state: Option<String>,
pub target_sys_state: Option<FlowSysStateKind>,
pub original_state: Option<String>,
pub original_sys_state: Option<FlowSysStateKind>,
pub owner_paths: String,
pub obj_ids: Vec<String>,
pub params: Vec<FlowExternalParams>,
Expand Down
21 changes: 21 additions & 0 deletions middleware/flow/src/dto/flow_transition_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ pub struct FlowTransitionActionChangeInfo {
pub kind: FlowTransitionActionChangeKind,
pub describe: String,
pub obj_tag: Option<String>,
pub obj_tag_rel_kind: Option<TagRelKind>,
pub obj_current_state_id: Option<Vec<String>>,
pub change_condition: Option<StateChangeCondition>,
pub changed_state_id: String,
Expand All @@ -219,6 +220,7 @@ impl From<FlowTransitionActionChangeInfo> for FlowTransitionActionChangeAgg {
var_change_info: None,
state_change_info: Some(FlowTransitionActionByStateChangeInfo {
obj_tag: value.obj_tag.unwrap(),
obj_tag_rel_kind: value.obj_tag_rel_kind,
describe: value.describe,
obj_current_state_id: value.obj_current_state_id,
change_condition: value.change_condition,
Expand All @@ -231,6 +233,7 @@ impl From<FlowTransitionActionChangeInfo> for FlowTransitionActionChangeAgg {
current: value.current,
describe: value.describe,
obj_tag: value.obj_tag,
obj_tag_rel_kind: value.obj_tag_rel_kind,
var_name: value.var_name,
changed_val: value.changed_val,
changed_kind: value.changed_kind,
Expand Down Expand Up @@ -262,6 +265,7 @@ pub struct FlowTransitionActionByVarChangeInfo {
pub current: bool,
pub describe: String,
pub obj_tag: Option<String>,
pub obj_tag_rel_kind: Option<TagRelKind>,
pub var_name: String,
pub changed_val: Option<Value>,
pub changed_kind: Option<FlowTransitionActionByVarChangeInfoChangedKind>,
Expand All @@ -281,6 +285,7 @@ pub enum FlowTransitionActionByVarChangeInfoChangedKind {
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, poem_openapi::Object, sea_orm::FromJsonQueryResult)]
pub struct FlowTransitionActionByStateChangeInfo {
pub obj_tag: String,
pub obj_tag_rel_kind: Option<TagRelKind>,
pub describe: String,
pub obj_current_state_id: Option<Vec<String>>,
pub change_condition: Option<StateChangeCondition>,
Expand All @@ -296,6 +301,7 @@ pub struct StateChangeCondition {
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, poem_openapi::Object, sea_orm::FromJsonQueryResult)]
pub struct StateChangeConditionItem {
pub obj_tag: Option<String>,
pub obj_tag_rel_kind: Option<TagRelKind>,
pub state_id: Vec<String>,
pub op: StateChangeConditionOp,
}
Expand All @@ -306,6 +312,21 @@ pub enum StateChangeConditionOp {
Or,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, poem_openapi::Enum)]
pub enum TagRelKind {
ParentFeed,
SubFeed,
}

impl From<TagRelKind> for String {
fn from(kind: TagRelKind) -> Self {
match kind {
TagRelKind::ParentFeed => "PARENT_FEED".to_string(),
TagRelKind::SubFeed => "SUB_FEED".to_string(),
}
}
}

#[derive(Default)]
pub struct FlowTransitionInitInfo {
pub from_flow_state_name: String,
Expand Down
12 changes: 6 additions & 6 deletions middleware/flow/src/flow_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ pub async fn init_db(mut funs: TardisFunsInst) -> TardisResult<()> {
funs.begin().await?;
if check_initialized(&funs, &ctx).await? {
init_basic_info(&funs).await?;
self::modify_post_actions(&funs, &ctx).await?;
// self::modify_post_actions(&funs, &ctx).await?;
} else {
let db_kind = TardisFuns::reldb().backend();
let compatible_type = TardisFuns::reldb().compatible_type();
funs.db().init(flow_state::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_model::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_transition::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_inst::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_config::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_state::ActiveModel::init(db_kind, None, compatible_type)).await?;
funs.db().init(flow_model::ActiveModel::init(db_kind, None, compatible_type)).await?;
funs.db().init(flow_transition::ActiveModel::init(db_kind, None, compatible_type)).await?;
funs.db().init(flow_inst::ActiveModel::init(db_kind, None, compatible_type)).await?;
funs.db().init(flow_config::ActiveModel::init(db_kind, None, compatible_type)).await?;
init_rbum_data(&funs, &ctx).await?;
};
funs.commit().await?;
Expand Down
19 changes: 16 additions & 3 deletions middleware/flow/src/serv/flow_external_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ use tardis::{
};

use crate::{
dto::flow_external_dto::{
FlowExternalFetchRelObjResp, FlowExternalKind, FlowExternalModifyFieldResp, FlowExternalNotifyChangesResp, FlowExternalParams, FlowExternalQueryFieldResp, FlowExternalReq,
FlowExternalResp,
dto::{
flow_external_dto::{
FlowExternalFetchRelObjResp, FlowExternalKind, FlowExternalModifyFieldResp, FlowExternalNotifyChangesResp, FlowExternalParams, FlowExternalQueryFieldResp,
FlowExternalReq, FlowExternalResp,
},
flow_state_dto::FlowSysStateKind,
},
flow_config::FlowConfig,
flow_constants,
Expand Down Expand Up @@ -67,7 +70,9 @@ impl FlowExternalServ {
rel_business_obj_id: &str,
inst_id: &str,
target_state: Option<String>,
target_sys_state: Option<FlowSysStateKind>,
original_state: Option<String>,
original_sys_state: Option<FlowSysStateKind>,
params: Vec<FlowExternalParams>,
ctx: &TardisContext,
funs: &TardisFunsInst,
Expand All @@ -84,7 +89,9 @@ impl FlowExternalServ {
curr_tag: tag.to_string(),
curr_bus_obj_id: rel_business_obj_id.to_string(),
target_state,
target_sys_state,
original_state,
original_sys_state,
params,
..Default::default()
};
Expand All @@ -110,7 +117,9 @@ impl FlowExternalServ {
inst_id: &str,
rel_business_obj_id: &str,
target_state: String,
target_sys_state: FlowSysStateKind,
original_state: String,
original_sys_state: FlowSysStateKind,
ctx: &TardisContext,
funs: &TardisFunsInst,
) -> TardisResult<FlowExternalNotifyChangesResp> {
Expand All @@ -126,7 +135,9 @@ impl FlowExternalServ {
curr_tag: tag.to_string(),
curr_bus_obj_id: rel_business_obj_id.to_string(),
target_state: Some(target_state),
target_sys_state: Some(target_sys_state),
original_state: Some(original_state),
original_sys_state: Some(original_sys_state),
..Default::default()
};
debug!("do_notify_changes body: {:?}", body);
Expand Down Expand Up @@ -187,7 +198,9 @@ impl FlowExternalServ {
owner_paths: own_paths.to_string(),
obj_ids: rel_business_obj_ids,
target_state: None,
target_sys_state: None,
original_state: None,
original_sys_state: None,
params: vec![],
};
debug!("do_query_field body: {:?}", body);
Expand Down
39 changes: 27 additions & 12 deletions middleware/flow/src/serv/flow_inst_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,9 @@ impl FlowInstServ {
&flow_inst_detail.rel_business_obj_id,
&flow_inst_detail.id,
Some(next_flow_state.name.clone()),
Some(next_flow_state.sys_state.clone()),
Some(prev_flow_state.name.clone()),
Some(prev_flow_state.sys_state.clone()),
params,
ctx,
funs,
Expand Down Expand Up @@ -776,7 +778,9 @@ impl FlowInstServ {
&flow_inst_detail.id,
&flow_inst_detail.rel_business_obj_id,
next_flow_state.name.clone(),
next_flow_state.sys_state,
prev_flow_state.name.clone(),
prev_flow_state.sys_state,
ctx,
funs,
)
Expand Down Expand Up @@ -830,9 +834,13 @@ impl FlowInstServ {
}
let rel_tag = change_info.obj_tag.unwrap_or_default();
if !rel_tag.is_empty() {
let obj_tag = if let Some(obj_tag_rel_kind) = change_info.obj_tag_rel_kind.clone() {
String::from(obj_tag_rel_kind)
} else {
rel_tag.clone()
};
let mut resp =
FlowExternalServ::do_fetch_rel_obj(&current_model.tag, &current_inst.id, &current_inst.rel_business_obj_id, vec![rel_tag.clone()], ctx, funs)
.await?;
FlowExternalServ::do_fetch_rel_obj(&current_model.tag, &current_inst.id, &current_inst.rel_business_obj_id, vec![obj_tag], ctx, funs).await?;
if !resp.rel_bus_objs.is_empty() {
for rel_bus_obj_id in resp.rel_bus_objs.pop().unwrap().rel_bus_obj_ids {
let inst_id = Self::get_inst_ids_by_rel_business_obj_id(vec![rel_bus_obj_id.clone()], funs, ctx).await?.pop().unwrap_or_default();
Expand All @@ -842,6 +850,8 @@ impl FlowInstServ {
&inst_id,
None,
None,
None,
None,
vec![FlowExternalParams {
rel_tag: None,
var_id: None,
Expand All @@ -862,6 +872,8 @@ impl FlowInstServ {
&current_inst.id,
None,
None,
None,
None,
vec![FlowExternalParams {
rel_tag: None,
var_id: None,
Expand All @@ -878,15 +890,13 @@ impl FlowInstServ {
}
FlowTransitionActionChangeKind::State => {
if let Some(change_info) = post_change.state_change_info {
let mut resp = FlowExternalServ::do_fetch_rel_obj(
&current_model.tag,
&current_inst.id,
&current_inst.rel_business_obj_id,
vec![change_info.obj_tag.clone()],
ctx,
funs,
)
.await?;
let obj_tag = if let Some(obj_tag_rel_kind) = change_info.obj_tag_rel_kind.clone() {
String::from(obj_tag_rel_kind)
} else {
change_info.obj_tag.clone()
};
let mut resp =
FlowExternalServ::do_fetch_rel_obj(&current_model.tag, &current_inst.id, &current_inst.rel_business_obj_id, vec![obj_tag], ctx, funs).await?;
if !resp.rel_bus_objs.is_empty() {
let inst_ids = Self::find_inst_ids_by_rel_obj_ids(resp.rel_bus_objs.pop().unwrap().rel_bus_obj_ids, &change_info, funs, ctx).await?;
Self::do_modify_state_by_post_action(inst_ids, &change_info, updated_instance_list, funs, ctx).await?;
Expand Down Expand Up @@ -915,7 +925,12 @@ impl FlowInstServ {
let mut rel_tags = vec![];
for condition_item in change_condition.conditions.iter() {
if condition_item.obj_tag.is_some() && !condition_item.state_id.is_empty() {
rel_tags.push(condition_item.obj_tag.clone().unwrap());
let obj_tag = if let Some(obj_tag_rel_kind) = condition_item.obj_tag_rel_kind.clone() {
String::from(obj_tag_rel_kind)
} else {
condition_item.obj_tag.clone().unwrap()
};
rel_tags.push(obj_tag);
}
}
let inst_id = Self::get_inst_ids_by_rel_business_obj_id(vec![rel_obj_id.clone()], funs, ctx).await?.pop().unwrap_or_default();
Expand Down
21 changes: 4 additions & 17 deletions middleware/flow/tests/test_flow_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,22 @@ async fn test_flow_api() -> TardisResult<()> {
let docker = testcontainers::clients::Cli::default();
let _x = init_rbum_test_container::init(&docker, None).await?;

let funs = flow_constants::get_tardis_inst();
flow_initializer::init_db(funs).await?;

let web_server = TardisFuns::web_server();
flow_initializer::init(&web_server).await.unwrap();
kv_initializer::init(&web_server).await.unwrap();
web_server.add_module("mock", mock_api::MockApi).await;
init_spi_kv().await?;

tokio::spawn(async move {
web_server.start().await.unwrap();
});

sleep(Duration::from_millis(500)).await;

let mut flow_client = TestHttpClient::new("https://localhost:8080/flow".to_string());
let mut kv_client = TestHttpClient::new("https://localhost:8080/spi-kv".to_string());
let mut flow_client = TestHttpClient::new(format!("https://localhost:8080/{}", flow_constants::DOMAIN_CODE));
init_flow_data().await?;
init_spi_kv().await?;

test_flow_scenes_fsm::test(&mut flow_client, &mut kv_client).await?;
test_flow_scenes_fsm::test(&mut flow_client).await?;
truncate_flow_data().await?;

Ok(())
Expand Down Expand Up @@ -75,16 +72,6 @@ async fn init_spi_kv() -> TardisResult<()> {
// Initialize RBUM
bios_basic::rbum::rbum_initializer::init(kv_constants::DOMAIN_CODE, RbumConfig::default()).await?;

let web_server = TardisFuns::web_server();
// Initialize SPI KV
kv_initializer::init(&web_server).await.unwrap();

tokio::spawn(async move {
web_server.start().await.unwrap();
});

sleep(Duration::from_millis(500)).await;

let funs = TardisFuns::inst_with_db_conn(kv_constants::DOMAIN_CODE.to_string(), None);
let kind_id = RbumKindServ::get_rbum_kind_id_by_code(spi_constants::SPI_PG_KIND_CODE, &funs).await?.unwrap();
let ctx = TardisContext {
Expand Down
9 changes: 7 additions & 2 deletions middleware/flow/tests/test_flow_scenes_fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tardis::web::poem_openapi::types::Type;
use tardis::web::web_resp::{TardisPage, Void};
use tardis::TardisFuns;

pub async fn test(flow_client: &mut TestHttpClient, _kv_client: &mut TestHttpClient) -> TardisResult<()> {
pub async fn test(flow_client: &mut TestHttpClient) -> TardisResult<()> {
info!("【test_flow_scenes_fsm】");

let mut ctx = TardisContext {
Expand All @@ -46,7 +46,7 @@ pub async fn test(flow_client: &mut TestHttpClient, _kv_client: &mut TestHttpCli

// 1. enter platform
// 1-1. check default model
let mut models: TardisPage<FlowModelSummaryResp> = flow_client.get("/cc/model/?tag=REQ&page_number=1&page_size=100").await;
let mut models: TardisPage<FlowModelSummaryResp> = flow_client.get("/cc/model?tag=REQ&page_number=1&page_size=100").await;
let init_model = models.records.pop().unwrap();
info!("models: {:?}", init_model);
assert_eq!(&init_model.name, "待开始-进行中-已完成-已关闭");
Expand Down Expand Up @@ -223,11 +223,13 @@ pub async fn test(flow_client: &mut TestHttpClient, _kv_client: &mut TestHttpCli
kind: FlowTransitionActionChangeKind::State,
describe: "".to_string(),
obj_tag: Some("TICKET".to_string()),
obj_tag_rel_kind: None,
obj_current_state_id: Some(vec![ticket_model_agg.init_state_id.clone()]),
change_condition: Some(StateChangeCondition {
current: true,
conditions: vec![StateChangeConditionItem {
obj_tag: Some("ITER".to_string()),
obj_tag_rel_kind: None,
state_id: vec![iter_model_agg.init_state_id.clone()],
op: StateChangeConditionOp::And,
}],
Expand Down Expand Up @@ -267,6 +269,7 @@ pub async fn test(flow_client: &mut TestHttpClient, _kv_client: &mut TestHttpCli
kind: FlowTransitionActionChangeKind::Var,
describe: "".to_string(),
obj_tag: Some("".to_string()),
obj_tag_rel_kind: None,
obj_current_state_id: None,
change_condition: None,
changed_state_id: "".to_string(),
Expand Down Expand Up @@ -350,6 +353,7 @@ pub async fn test(flow_client: &mut TestHttpClient, _kv_client: &mut TestHttpCli
kind: FlowTransitionActionChangeKind::State,
describe: "".to_string(),
obj_tag: Some("TICKET".to_string()),
obj_tag_rel_kind: None,
obj_current_state_id: None,
change_condition: None,
changed_state_id: ticket_model_agg.states.iter().find(|state| state.name == "处理中").unwrap().id.clone(),
Expand Down Expand Up @@ -393,6 +397,7 @@ pub async fn test(flow_client: &mut TestHttpClient, _kv_client: &mut TestHttpCli
kind: FlowTransitionActionChangeKind::State,
describe: "".to_string(),
obj_tag: Some("PROJ".to_string()),
obj_tag_rel_kind: None,
obj_current_state_id: None,
change_condition: None,
changed_state_id: proj_model_agg.states.iter().find(|state| state.name == "存在风险").unwrap().id.clone(),
Expand Down
2 changes: 1 addition & 1 deletion spi/spi-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spi-es = ["tardis/web-client"]

[dependencies]
serde.workspace = true
tardis = { workspace = true, features = ["reldb-postgres", "web-server", "web-client"] }
tardis = { workspace = true, features = ["reldb-postgres", "web-server", "web-client", "openapi-rapidoc"] }
bios-basic = { path = "../../basic", features = ["default"] }
strum = { workerspace = true, features = ["derive"] }
pinyin = { version = "0.10" }
Expand Down
6 changes: 5 additions & 1 deletion spi/spi-search/src/serv/pg/search_pg_item_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ pub async fn modify(tag: &str, key: &str, modify_req: &mut SearchItemModifyReq,
sql_sets.push(format!("title = ${}", params.len() + 1));
params.push(Value::from(title));
sql_sets.push(format!("title_tsv = to_tsvector('public.chinese_zh', ${})", params.len() + 1));
params.push(Value::from(format!("{},{}", title, generate_word_combinations(to_pinyin_vec(title, Pinyin::plain)).join(","))));
params.push(Value::from(format!(
"{},{}",
title,
generate_word_combinations(to_pinyin_vec(title, Pinyin::plain)).join(",")
)));
};
if let Some(content) = &modify_req.content {
sql_sets.push(format!("content = ${}", params.len() + 1));
Expand Down
Loading

0 comments on commit 1073bbf

Please sign in to comment.