Skip to content

Commit

Permalink
flow: front condition (#482)
Browse files Browse the repository at this point in the history
* flow: update dto

* flow:update

* flow: front condition finish
  • Loading branch information
ZzIsGod1019 authored Oct 12, 2023
1 parent 7d1a2da commit 95e10c7
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 42 deletions.
27 changes: 24 additions & 3 deletions middleware/flow/src/dto/flow_transition_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub struct FlowTransitionActionChangeInfo {
pub current: bool,
pub var_name: String,
pub changed_val: Option<Value>,
pub changed_current_time: Option<bool>,
pub changed_kind: Option<FlowTransitionActionByVarChangeInfoChangedKind>,
}

impl From<FlowTransitionActionChangeInfo> for FlowTransitionActionChangeAgg {
Expand All @@ -232,7 +232,7 @@ impl From<FlowTransitionActionChangeInfo> for FlowTransitionActionChangeAgg {
obj_tag: value.obj_tag,
var_name: value.var_name,
changed_val: value.changed_val,
changed_current_time: value.changed_current_time,
changed_kind: value.changed_kind,
}),
state_change_info: None,
},
Expand Down Expand Up @@ -263,7 +263,18 @@ pub struct FlowTransitionActionByVarChangeInfo {
pub obj_tag: Option<String>,
pub var_name: String,
pub changed_val: Option<Value>,
pub changed_current_time: Option<bool>,
pub changed_kind: Option<FlowTransitionActionByVarChangeInfoChangedKind>,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, poem_openapi::Enum, strum::EnumIter, sea_orm::DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "String(Some(255))")]
pub enum FlowTransitionActionByVarChangeInfoChangedKind {
#[sea_orm(string_value = "clean")]
Clean,
#[sea_orm(string_value = "change_content")]
ChangeContent,
#[sea_orm(string_value = "auto_get_operate_time")]
AutoGetOperateTime,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, poem_openapi::Object, sea_orm::FromJsonQueryResult)]
Expand Down Expand Up @@ -366,6 +377,9 @@ pub enum FlowTransitionFrontActionInfoRelevanceRelation {
#[serde(rename = "not_in")]
#[oai(rename = "not_in")]
NotIn,
#[serde(rename = "between")]
#[oai(rename = "between")]
Between,
}

impl FlowTransitionFrontActionInfoRelevanceRelation {
Expand All @@ -381,6 +395,13 @@ impl FlowTransitionFrontActionInfoRelevanceRelation {
FlowTransitionFrontActionInfoRelevanceRelation::NotLike => !left_value.contains(&right_value),
FlowTransitionFrontActionInfoRelevanceRelation::In => TardisFuns::json.str_to_obj::<Vec<String>>(&right_value).unwrap_or_default().contains(&left_value),
FlowTransitionFrontActionInfoRelevanceRelation::NotIn => !TardisFuns::json.str_to_obj::<Vec<String>>(&right_value).unwrap_or_default().contains(&left_value),
FlowTransitionFrontActionInfoRelevanceRelation::Between => {
let time_interval = TardisFuns::json.str_to_obj::<Vec<String>>(&right_value).unwrap_or_default();
if time_interval.len() != 2 {
return false;
}
left_value >= time_interval[0] && left_value <= time_interval[1]
}
}
}
}
Expand Down
54 changes: 52 additions & 2 deletions middleware/flow/src/flow_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@ use bios_basic::rbum::{
};
use bios_sdk_invoke::invoke_initializer;

use itertools::Itertools;
use serde_json::Value;
use tardis::{
basic::{dto::TardisContext, field::TrimString, result::TardisResult},
db::{reldb_client::TardisActiveModel, sea_orm::sea_query::Table},
db::{
reldb_client::TardisActiveModel,
sea_orm::{
self,
sea_query::{Query, Table},
},
},
log::info,
web::web_server::TardisWebServer,
TardisFuns, TardisFunsInst,
Expand All @@ -24,7 +32,7 @@ use crate::{
dto::{
flow_model_dto::FlowModelFilterReq,
flow_state_dto::FlowSysStateKind,
flow_transition_dto::{FlowTransitionDoubleCheckInfo, FlowTransitionInitInfo},
flow_transition_dto::{FlowTransitionActionChangeInfo, FlowTransitionDoubleCheckInfo, FlowTransitionInitInfo, FlowTransitionActionChangeKind, FlowTransitionActionByVarChangeInfoChangedKind},
},
flow_config::{BasicInfo, FlowBasicInfoManager, FlowConfig},
flow_constants,
Expand Down Expand Up @@ -77,6 +85,7 @@ pub async fn init_db(mut funs: TardisFunsInst) -> TardisResult<()> {
funs.db().init(flow_inst::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
funs.db().init(flow_config::ActiveModel::init(db_kind, None, compatible_type.clone())).await?;
init_rbum_data(&funs, &ctx).await?;
self::modify_post_actions(&funs, &ctx).await?;
};
funs.commit().await?;
Ok(())
Expand Down Expand Up @@ -115,6 +124,47 @@ async fn init_basic_info<'a>(funs: &TardisFunsInst) -> TardisResult<()> {
Ok(())
}

pub async fn modify_post_actions(funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
#[derive(sea_orm::FromQueryResult)]
pub struct FlowTransactionPostAction {
id: String,
action_by_post_changes: Value,
}
let transactions = funs.db()
.find_dtos::<FlowTransactionPostAction>(
Query::select()
.columns([
(flow_transition::Entity, flow_transition::Column::Id),
(flow_transition::Entity, flow_transition::Column::ActionByPostChanges),
])
.from(flow_transition::Entity),
)
.await?
.into_iter()
.filter(|res| !TardisFuns::json.json_to_obj::<Vec<FlowTransitionActionChangeInfo>>(res.action_by_post_changes.clone()).unwrap_or_default().is_empty())
.collect_vec();
for transaction in transactions {
let mut post_changes = TardisFuns::json.json_to_obj::<Vec<FlowTransitionActionChangeInfo>>(transaction.action_by_post_changes.clone()).unwrap_or_default();
for post_change in post_changes.iter_mut() {
if post_change.changed_kind.is_none() && post_change.kind == FlowTransitionActionChangeKind::Var {
if post_change.changed_val.is_some() {
post_change.changed_kind = Some(FlowTransitionActionByVarChangeInfoChangedKind::ChangeContent);
} else {
post_change.changed_kind = Some(FlowTransitionActionByVarChangeInfoChangedKind::Clean);
}
}
}
let flow_transition = flow_transition::ActiveModel {
id: sea_orm::ActiveValue::Set(transaction.id.clone()),
action_by_post_changes: sea_orm::ActiveValue::Set(TardisFuns::json.obj_to_json(&post_changes)?),
..Default::default()
};
funs.db().update_one(flow_transition, ctx).await?;
}

Ok(())
}

pub async fn init_rbum_data(funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
let kind_state_id = add_kind(flow_constants::RBUM_KIND_STATE_CODE, flow_constants::RBUM_EXT_TABLE_STATE, funs, ctx).await?;
let kind_model_id = add_kind(flow_constants::RBUM_KIND_MODEL_CODE, flow_constants::RBUM_EXT_TABLE_MODEL, funs, ctx).await?;
Expand Down
71 changes: 55 additions & 16 deletions middleware/flow/src/serv/flow_inst_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use itertools::Itertools;
use serde_json::json;
use tardis::{
basic::{dto::TardisContext, result::TardisResult},
chrono::{DateTime, Utc},
chrono::{DateTime, SecondsFormat, Utc},
db::sea_orm::{
self,
sea_query::{Alias, Cond, Expr, Query},
Expand All @@ -41,8 +41,8 @@ use crate::{
flow_model_dto::{FlowModelDetailResp, FlowModelFilterReq},
flow_state_dto::{FlowStateFilterReq, FlowStateRelModelExt, FlowSysStateKind},
flow_transition_dto::{
FlowTransitionActionByStateChangeInfo, FlowTransitionActionChangeAgg, FlowTransitionActionChangeInfo, FlowTransitionActionChangeKind, FlowTransitionDetailResp,
FlowTransitionFrontActionInfo, FlowTransitionFrontActionRightValue, StateChangeConditionOp,
FlowTransitionActionByStateChangeInfo, FlowTransitionActionByVarChangeInfoChangedKind, FlowTransitionActionChangeAgg, FlowTransitionActionChangeInfo,
FlowTransitionActionChangeKind, FlowTransitionDetailResp, FlowTransitionFrontActionInfo, FlowTransitionFrontActionRightValue, StateChangeConditionOp,
},
},
serv::{flow_model_serv::FlowModelServ, flow_state_serv::FlowStateServ},
Expand Down Expand Up @@ -585,8 +585,26 @@ impl FlowInstServ {
Ok(())
}

#[async_recursion]
pub async fn transfer(flow_inst_id: &str, transfer_req: &FlowInstTransferReq, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<FlowInstTransferResp> {
// record updated instance id
let mut updated_instance_list: Vec<String> = Vec::new();
let result = Self::do_transfer(flow_inst_id, transfer_req, &mut updated_instance_list, funs, ctx).await;

for updated_instance_id in updated_instance_list {
Self::do_front_change(&updated_instance_id, ctx, funs).await?;
}

result
}

#[async_recursion]
async fn do_transfer(
flow_inst_id: &str,
transfer_req: &FlowInstTransferReq,
updated_instance_list: &mut Vec<String>,
funs: &TardisFunsInst,
ctx: &TardisContext,
) -> TardisResult<FlowInstTransferResp> {
let global_ctx = TardisContext {
own_paths: "".to_string(),
..ctx.clone()
Expand Down Expand Up @@ -725,6 +743,7 @@ impl FlowInstServ {
}

funs.db().update_one(flow_inst, ctx).await?;
updated_instance_list.push(flow_inst_id.to_string());

// get updated instance detail
let flow_inst_detail = Self::get(flow_inst_id, funs, ctx).await?;
Expand Down Expand Up @@ -753,7 +772,7 @@ impl FlowInstServ {
let post_changes =
model_transition.into_iter().find(|model_transition| model_transition.id == next_flow_transition.next_flow_transition_id).unwrap_or_default().action_by_post_changes();
if !post_changes.is_empty() {
Self::do_post_change(&flow_inst_detail, &flow_model, post_changes, ctx, funs).await?;
Self::do_post_change(&flow_inst_detail, &flow_model, post_changes, updated_instance_list, ctx, funs).await?;
}
let next_flow_transitions = Self::do_find_next_transitions(&flow_inst_detail, &flow_model, None, &None, funs, ctx).await?.next_flow_transitions;

Expand Down Expand Up @@ -783,6 +802,7 @@ impl FlowInstServ {
current_inst: &FlowInstDetailResp,
current_model: &FlowModelDetailResp,
post_changes: Vec<FlowTransitionActionChangeInfo>,
updated_instance_list: &mut Vec<String>,
ctx: &TardisContext,
funs: &TardisFunsInst,
) -> TardisResult<()> {
Expand All @@ -791,8 +811,8 @@ impl FlowInstServ {
match post_change.kind {
FlowTransitionActionChangeKind::Var => {
if let Some(mut change_info) = post_change.var_change_info {
if change_info.changed_current_time.is_some() && change_info.changed_current_time.unwrap() {
change_info.changed_val = Some(json!(Utc::now().to_string()));
if change_info.changed_kind.is_some() && change_info.changed_kind.unwrap() == FlowTransitionActionByVarChangeInfoChangedKind::AutoGetOperateTime {
change_info.changed_val = Some(json!(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)));
}
let rel_tag = change_info.obj_tag.unwrap_or_default();
if !rel_tag.is_empty() {
Expand All @@ -818,6 +838,7 @@ impl FlowInstServ {
funs,
)
.await?;
updated_instance_list.push(inst_id.clone());
}
}
} else {
Expand All @@ -837,6 +858,7 @@ impl FlowInstServ {
funs,
)
.await?;
updated_instance_list.push(current_inst.id.clone());
}
}
}
Expand All @@ -853,7 +875,7 @@ impl FlowInstServ {
.await?;
if !resp.rel_bus_objs.is_empty() {
let inst_ids = Self::find_inst_ids_by_rel_obj_ids(resp.rel_bus_objs.pop().unwrap().rel_bus_obj_ids, &change_info, funs, ctx).await?;
Self::do_modify_state_by_post_action(inst_ids, &change_info, funs, ctx).await?;
Self::do_modify_state_by_post_action(inst_ids, &change_info, updated_instance_list, funs, ctx).await?;
}
}
}
Expand Down Expand Up @@ -960,6 +982,7 @@ impl FlowInstServ {
async fn do_modify_state_by_post_action(
rel_inst_ids: Vec<String>,
change_info: &FlowTransitionActionByStateChangeInfo,
updated_instance_list: &mut Vec<String>,
funs: &TardisFunsInst,
ctx: &TardisContext,
) -> TardisResult<()> {
Expand Down Expand Up @@ -988,13 +1011,14 @@ impl FlowInstServ {
.collect_vec()
.pop();
if let Some(transition) = transition_resp {
Self::transfer(
Self::do_transfer(
&rel_inst.id,
&FlowInstTransferReq {
flow_transition_id: transition.next_flow_transition_id,
message: None,
vars: None,
},
updated_instance_list,
funs,
ctx,
)
Expand Down Expand Up @@ -1182,11 +1206,14 @@ impl FlowInstServ {
..Default::default()
};
funs.db().update_one(flow_inst, ctx).await?;
Self::do_front_change(flow_inst_id, ctx, funs).await?;

Ok(())
}

async fn do_front_change(flow_inst_detail: &FlowInstDetailResp, ctx: &TardisContext, funs: &TardisFunsInst) -> TardisResult<()> {
#[async_recursion]
async fn do_front_change(flow_inst_id: &str, ctx: &TardisContext, funs: &TardisFunsInst) -> TardisResult<()> {
let flow_inst_detail = Self::get(flow_inst_id, funs, ctx).await?;
let flow_model = FlowModelServ::get_item(
&flow_inst_detail.rel_flow_model_id,
&FlowModelFilterReq {
Expand All @@ -1205,20 +1232,32 @@ impl FlowInstServ {
.transitions()
.into_iter()
.filter(|trans| trans.from_flow_state_id == flow_inst_detail.current_state_id && !trans.action_by_front_changes().is_empty())
.sorted_by_key(|trans| trans.sort)
.collect_vec();
if flow_transitions.is_empty() {
return Ok(());
}
for flow_transition in flow_transitions {
if Self::check_front_conditions(&flow_inst_detail, flow_transition.action_by_front_changes())? {
Self::transfer(
&flow_inst_detail.id,
&FlowInstTransferReq {
flow_transition_id: flow_transition.id.clone(),
message: None,
vars: None,
},
funs,
ctx,
)
.await?;
break;
}
}

Ok(())
}

async fn check_front_conditions(
flow_inst_detail: &FlowInstDetailResp,
conditions: Vec<FlowTransitionFrontActionInfo>,
ctx: &TardisContext,
funs: &TardisFunsInst,
) -> TardisResult<bool> {
fn check_front_conditions(flow_inst_detail: &FlowInstDetailResp, conditions: Vec<FlowTransitionFrontActionInfo>) -> TardisResult<bool> {
if flow_inst_detail.current_vars.is_none() {
return Ok(false);
}
Expand Down
15 changes: 15 additions & 0 deletions middleware/flow/src/serv/flow_model_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,7 @@ impl FlowModelServ {
current_chain.push(transition_detail.id.clone());

let model_detail = Self::get_item(&transition_detail.rel_flow_model_id, &FlowModelFilterReq::default(), funs, ctx).await?;
// check post changes
let post_changes = transition_detail
.action_by_post_changes()
.into_iter()
Expand Down Expand Up @@ -1245,6 +1246,20 @@ impl FlowModelServ {
}
}
}
// check front changes
let flow_transitions = model_detail
.transitions()
.into_iter()
.filter(|trans| trans.from_flow_state_id == transition_detail.to_flow_state_id && !trans.action_by_front_changes().is_empty())
.sorted_by_key(|trans| trans.sort)
.collect_vec();
for transition_detail in flow_transitions {
(is_ring, current_chain) = Self::check_post_action_ring(transition_detail, (is_ring, current_chain.clone()), funs, ctx).await?;
if is_ring {
return Ok((true, current_chain));
}
}

Ok((is_ring, current_chain))
}

Expand Down
Loading

0 comments on commit 95e10c7

Please sign in to comment.