Skip to content

Commit

Permalink
flow: support artifacts filter (#869)
Browse files Browse the repository at this point in the history
* flow: modify comments

* flow: fix bug (instance artifacts can't save)

* flow: fix bug (modify field failed)

* flow: update

* flow: support artifacts filter
  • Loading branch information
ZzIsGod1019 authored Dec 5, 2024
1 parent a5c5db7 commit 375d16f
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 13 deletions.
11 changes: 9 additions & 2 deletions backend/middlewares/flow/src/api/cc/flow_cc_inst_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use tardis::web::web_resp::{TardisApiResult, TardisPage, TardisResp, Void};

use crate::dto::flow_external_dto::FlowExternalCallbackOp;
use crate::dto::flow_inst_dto::{
FlowInstAbortReq, FlowInstDetailResp, FlowInstFindNextTransitionResp, FlowInstFindNextTransitionsReq, FlowInstFindStateAndTransitionsReq, FlowInstFindStateAndTransitionsResp,
FlowInstModifyAssignedReq, FlowInstModifyCurrentVarsReq, FlowInstOperateReq, FlowInstStartReq, FlowInstSummaryResp, FlowInstTransferReq, FlowInstTransferResp,
FlowInstAbortReq, FlowInstDetailResp, FlowInstFindNextTransitionResp, FlowInstFindNextTransitionsReq, FlowInstFindStateAndTransitionsReq, FlowInstFindStateAndTransitionsResp, FlowInstModifyAssignedReq, FlowInstModifyCurrentVarsReq, FlowInstOperateReq, FlowInstSearchReq, FlowInstStartReq, FlowInstSummaryResp, FlowInstTransferReq, FlowInstTransferResp
};
use crate::flow_constants;
use crate::helper::loop_check_helper;
Expand Down Expand Up @@ -249,4 +248,12 @@ impl FlowCcInstApi {
ctx.0.execute_task().await?;
TardisResp::ok(Void {})
}

/// Search Items
#[oai(path = "/search", method = "put")]
async fn search(&self, mut search_req: Json<FlowInstSearchReq>, ctx: TardisContextExtractor) -> TardisApiResult<TardisPage<FlowInstSummaryResp>> {
let funs = flow_constants::get_tardis_inst();
let resp = FlowInstServ::search(&mut search_req.0, &funs, &ctx.0).await?;
TardisResp::ok(resp)
}
}
58 changes: 58 additions & 0 deletions backend/middlewares/flow/src/dto/flow_inst_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,3 +498,61 @@ pub struct FlowInstSummaryResult {

pub tag: String,
}

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug)]
pub struct FlowInstSearchReq {
// Search conditions
pub query: FlowInstFilterReq,
// Advanced search
pub query_kind: Option<FlowInstQueryKind>,
// Sort
// When the record set is very large, it will seriously affect the performance, it is not recommended to use.
pub sort: Option<Vec<FlowInstSearchSortReq>>,
pub page: FlowInstSearchPageReq,
}

#[derive(Serialize, Deserialize, Debug, poem_openapi::Enum, Eq, Hash, PartialEq, Clone)]
pub enum FlowInstQueryKind {
/// 全部
All,
/// 待录入
Form,
/// 待审批
Approval,
/// 我创建的
Create,
}

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug, Clone)]
pub struct FlowInstSearchPageReq {
pub number: u32,
pub size: u16,
// Get the total number of matching records.
// When the record set is very large, it will seriously affect the performance. It is not recommended to open it.
pub fetch_total: bool,
}

#[derive(poem_openapi::Object, Serialize, Deserialize, Debug, Clone)]
pub struct FlowInstSearchSortReq {
pub in_field: Option<String>,
#[oai(validator(min_length = "2"))]
pub field: String,
pub order: FlowInstSearchSortKind,
}

#[derive(poem_openapi::Enum, Serialize, Deserialize, Debug, Clone)]
pub enum FlowInstSearchSortKind {
#[oai(rename = "asc")]
Asc,
#[oai(rename = "desc")]
Desc,
}

impl FlowInstSearchSortKind {
pub fn to_sql(&self) -> String {
match self {
FlowInstSearchSortKind::Asc => "ASC".to_string(),
FlowInstSearchSortKind::Desc => "DESC".to_string(),
}
}
}
241 changes: 231 additions & 10 deletions backend/middlewares/flow/src/serv/flow_inst_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ use tardis::{
};

use crate::{
domain::{flow_inst, flow_model},
domain::flow_inst,
dto::{
flow_external_dto::{FlowExternalCallbackOp, FlowExternalParams},
flow_inst_dto::{
FLowInstStateApprovalConf, FLowInstStateConf, FLowInstStateFormConf, FlowApprovalResultKind, FlowInstAbortReq, FlowInstArtifacts, FlowInstArtifactsModifyReq,
FlowInstBatchBindReq, FlowInstBatchBindResp, FlowInstDetailResp, FlowInstFilterReq, FlowInstFindNextTransitionResp, FlowInstFindNextTransitionsReq,
FlowInstFindStateAndTransitionsReq, FlowInstFindStateAndTransitionsResp, FlowInstOperateReq, FlowInstStartReq, FlowInstSummaryResp, FlowInstSummaryResult,
FlowInstTransferReq, FlowInstTransferResp, FlowInstTransitionInfo, FlowOperationContext,
FLowInstStateApprovalConf, FLowInstStateConf, FLowInstStateFormConf, FlowApprovalResultKind, FlowInstAbortReq, FlowInstArtifacts, FlowInstArtifactsModifyReq, FlowInstBatchBindReq, FlowInstBatchBindResp, FlowInstDetailResp, FlowInstFilterReq, FlowInstFindNextTransitionResp, FlowInstFindNextTransitionsReq, FlowInstFindStateAndTransitionsReq, FlowInstFindStateAndTransitionsResp, FlowInstOperateReq, FlowInstQueryKind, FlowInstSearchPageReq, FlowInstSearchReq, FlowInstSearchSortReq, FlowInstStartReq, FlowInstSummaryResp, FlowInstSummaryResult, FlowInstTransferReq, FlowInstTransferResp, FlowInstTransitionInfo, FlowOperationContext
},
flow_model_dto::FlowModelFilterReq,
flow_model_version_dto::{FlowModelVersionDetailResp, FlowModelVersionFilterReq},
Expand Down Expand Up @@ -207,10 +204,6 @@ impl FlowInstServ {
.left_join(
RBUM_ITEM_TABLE.clone(),
Expr::col((RBUM_ITEM_TABLE.clone(), ID_FIELD.clone())).equals((flow_inst::Entity, flow_inst::Column::RelFlowVersionId)),
)
.left_join(
flow_model::Entity,
Expr::col((flow_model::Entity, flow_model::Column::Id)).equals((flow_inst::Entity, flow_inst::Column::RelFlowVersionId)),
);
if let Some(ids) = &filter.ids {
query.and_where(Expr::col((flow_inst::Entity, flow_inst::Column::Id)).is_in(ids));
Expand Down Expand Up @@ -575,7 +568,7 @@ impl FlowInstServ {
finish: Some(false),
..Default::default()
}, funs, ctx).await?.into_iter().map(|inst| inst.id.clone()).collect_vec();
state_and_next_transitions.iter_mut().map(|item| {
let _ = state_and_next_transitions.iter_mut().map(|item| {
if unfinished_approve_flow_insts.contains(&item.flow_inst_id) {
item.next_flow_transitions.clear();
}
Expand Down Expand Up @@ -1839,4 +1832,232 @@ impl FlowInstServ {

Ok(())
}

// search
pub async fn search(search_req: &mut FlowInstSearchReq, funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<TardisPage<FlowInstSummaryResp>> {
let mut where_fragments: Vec<String> = vec!["1=1".to_string()];
let mut sql_vals: Vec<sea_orm::Value> = vec![];
let table_alias_name = "flow_inst";

Self::package_query(table_alias_name, search_req.query.clone(), &mut sql_vals, &mut where_fragments, funs, ctx)?;
Self::package_query_kind(table_alias_name, search_req.query_kind.clone().unwrap_or(FlowInstQueryKind::All), &mut sql_vals, &mut where_fragments, funs, ctx)?;
let order_fragments = Self::package_order(table_alias_name, search_req.sort.clone())?;
let page_fragments = Self::package_page(search_req.page.clone(), &mut sql_vals)?;
let result = funs.db().query_all(format!(
r#"SELECT
flow_inst.id,
flow_inst.rel_flow_version_id,
flow_inst.rel_business_obj_id,
flow_inst.current_state_id,
flow_inst.create_ctx,
flow_inst.create_time,
flow_inst.finish_ctx,
flow_inst.finish_time,
flow_inst.finish_abort,
flow_inst.output_message,
flow_inst.own_paths,
flow_inst.tag,
model_version.name as rel_flow_model_name
{}
FROM
flow_inst
LEFT JOIN flow_state AS current_state ON flow_inst.current_state_id = current_state.id
LEFT JOIN rbum_item AS model_version ON flow_inst.rel_flow_version_id = model_version.id
WHERE
{}
{}
{};"#,
if search_req.page.fetch_total { ", count(*) OVER() AS total" } else { "" },
where_fragments.join(" AND "),
if order_fragments.is_empty() {
"".to_string()
} else {
format!("ORDER BY {}", order_fragments.join(", "))
},
page_fragments
)
.as_str(),
sql_vals
).await?;

let mut total_size: i64 = 0;
let result = result
.into_iter()
.map(|item| {
if search_req.page.fetch_total && total_size == 0 {
total_size = item.try_get("", "total")?;
}
Ok(FlowInstSummaryResp {
id: item.try_get("", "id")?,
rel_flow_version_id: item.try_get("", "rel_flow_version_id")?,
rel_flow_model_name: item.try_get("", "rel_flow_model_name")?,
rel_business_obj_id: item.try_get("", "rel_business_obj_id")?,
current_state_id: item.try_get("", "current_state_id")?,
create_ctx: item.try_get("", "create_ctx")?,
create_time: item.try_get("", "create_time")?,
finish_ctx: item.try_get("", "finish_ctx")?,
finish_time: item.try_get("", "finish_time")?,
finish_abort: item.try_get("", "finish_abort").unwrap_or_default(),
output_message: item.try_get("", "output_message")?,
own_paths: item.try_get("", "own_paths")?,
tag: item.try_get("", "tag")?,
})
})
.collect::<TardisResult<Vec<FlowInstSummaryResp>>>()?;

Ok(TardisPage {
page_size: search_req.page.size as u64,
page_number: search_req.page.number as u64,
total_size: total_size as u64,
records: result,
})
}

fn package_query_kind(table_alias_name: &str, query_kind: FlowInstQueryKind, sql_vals: &mut Vec<sea_orm::Value>, where_fragments: &mut Vec<String>, _funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
match query_kind {
FlowInstQueryKind::Create => {
sql_vals.push(sea_orm::Value::from(ctx.owner.clone()));
where_fragments.push(format!("{}.create_ctx ->> 'owner' = ${}", table_alias_name, sql_vals.len()));
},
FlowInstQueryKind::Form => {
let mut child_or_where_fragments = vec![];
sql_vals.push(sea_orm::Value::from(ctx.owner.clone()));
child_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_account_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
if !ctx.roles.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.roles.clone().join(", ").to_string()));
child_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_role_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
if !ctx.groups.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.groups.clone().join(", ").to_string()));
child_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_org_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
where_fragments.push(format!("current_state.state_kind = 'form' AND ({})", child_or_where_fragments.join(" OR ")));
},
FlowInstQueryKind::Approval => {
let mut child_or_where_fragments = vec![];
sql_vals.push(sea_orm::Value::from(ctx.owner.clone()));
child_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_account_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
if !ctx.roles.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.roles.clone().join(", ").to_string()));
child_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_role_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
if !ctx.groups.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.groups.clone().join(", ").to_string()));
child_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_org_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
where_fragments.push(format!("current_state.state_kind = 'approval' AND ({})", child_or_where_fragments.join(" OR ")));
},
FlowInstQueryKind::All => {
let mut or_where_fragments = vec![];
sql_vals.push(sea_orm::Value::from(ctx.owner.clone()));
or_where_fragments.push(format!("({}.create_ctx ->> 'owner' = ${})", table_alias_name, sql_vals.len()));

let mut form_or_where_fragments = vec![];
sql_vals.push(sea_orm::Value::from(ctx.owner.clone()));
form_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_account_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
if !ctx.roles.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.roles.clone().join(", ").to_string()));
form_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_role_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
if !ctx.groups.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.groups.clone().join(", ").to_string()));
form_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_org_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
or_where_fragments.push(format!("(current_state.state_kind = 'form' AND ({}))", form_or_where_fragments.join(" OR ")));

let mut approval_or_where_fragments = vec![];
sql_vals.push(sea_orm::Value::from(ctx.owner.clone()));
approval_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_account_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
if !ctx.roles.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.roles.clone().join(", ").to_string()));
approval_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_role_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
if !ctx.groups.is_empty() {
sql_vals.push(sea_orm::Value::from(ctx.groups.clone().join(", ").to_string()));
approval_or_where_fragments.push(format!("EXISTS (SELECT 1 FROM jsonb_array_elements_text({}.artifacts->'guard_conf'->'guard_by_spec_org_ids') AS elem WHERE elem IN (${}))", table_alias_name, sql_vals.len()));
}
or_where_fragments.push(format!("(current_state.state_kind = 'approval' AND ({}))", approval_or_where_fragments.join(" OR ")));
where_fragments.push(format!("( {} )", or_where_fragments.join(" OR ")));
},
}
Ok(())
}

fn package_query(table_alias_name: &str, query: FlowInstFilterReq, sql_vals: &mut Vec<sea_orm::Value>, where_fragments: &mut Vec<String>, _funs: &TardisFunsInst, ctx: &TardisContext) -> TardisResult<()> {
if let Some(ids) = query.ids {
if !ids.is_empty() {
where_fragments.push(format!(
"{}.id = ANY (ARRAY[{}])",
table_alias_name,
(0..ids.len()).map(|idx| format!("${}", sql_vals.len() + idx + 1)).collect::<Vec<String>>().join(",")
));
for id in ids {
sql_vals.push(sea_orm::Value::from(id.to_string()));
}
}
}
if let Some(flow_version_id) = query.flow_version_id {
sql_vals.push(sea_orm::Value::from(flow_version_id));
where_fragments.push(format!("{}.rel_flow_version_id = ${}", table_alias_name, sql_vals.len()));
}
if let Some(rel_business_obj_ids) = query.rel_business_obj_ids {
if !rel_business_obj_ids.is_empty() {
where_fragments.push(format!(
"{}.rel_business_obj_id = ANY (ARRAY[{}])",
table_alias_name,
(0..rel_business_obj_ids.len()).map(|idx| format!("${}", sql_vals.len() + idx + 1)).collect::<Vec<String>>().join(",")
));
for rel_business_obj_id in rel_business_obj_ids {
sql_vals.push(sea_orm::Value::from(rel_business_obj_id.to_string()));
}
}
}
if let Some(tag) = query.tag {
sql_vals.push(sea_orm::Value::from(tag));
where_fragments.push(format!("{}.tag = ${}", table_alias_name, sql_vals.len()));
}
if let Some(main) = query.main {
sql_vals.push(sea_orm::Value::from(main));
where_fragments.push(format!("{}.main = ${}", table_alias_name, sql_vals.len()));
}
if let Some(finish) = query.finish {
if finish {
where_fragments.push(format!("{}.finish_time is not null", table_alias_name));
} else {
where_fragments.push(format!("{}.finish_time is null", table_alias_name));
}
}
if let Some(current_state_id) = query.current_state_id {
sql_vals.push(sea_orm::Value::from(current_state_id));
where_fragments.push(format!("{}.current_state_id = ${}", table_alias_name, sql_vals.len()));
}
if query.with_sub.unwrap_or(false) {
sql_vals.push(sea_orm::Value::from(format!("{}%", ctx.own_paths)));
where_fragments.push(format!("{}.own_paths like ${}", table_alias_name, sql_vals.len()));
} else {
sql_vals.push(sea_orm::Value::from(ctx.own_paths.clone()));
where_fragments.push(format!("{}.own_paths = ${}", table_alias_name, sql_vals.len()));
}
Ok(())
}

fn package_page(page: FlowInstSearchPageReq, sql_vals: &mut Vec<sea_orm::Value>) -> TardisResult<String> {
sql_vals.push(sea_orm::Value::from(page.size));
sql_vals.push(sea_orm::Value::from((page.number - 1) * page.size as u32));
Ok(format!("LIMIT ${} OFFSET ${}", sql_vals.len() - 1, sql_vals.len()))
}

fn package_order(table_alias_name: &str, sort: Option<Vec<FlowInstSearchSortReq>>) -> TardisResult<Vec<String>> {
let mut order_fragments: Vec<String> = Vec::new();
if let Some(sort) = &sort {
for sort_item in sort {
if let Some(in_field) = &sort_item.in_field {
order_fragments.push(format!("{}.{} -> '{}' {}", table_alias_name, in_field,sort_item.field, sort_item.order.to_sql()));
} else {
order_fragments.push(format!("{}.{} {}", table_alias_name, sort_item.field, sort_item.order.to_sql()));
}
}
}
Ok(order_fragments)
}
}
2 changes: 1 addition & 1 deletion backend/spi/spi-search/src/serv/pg/search_pg_item_serv.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, fmt::format, vec};
use std::{collections::HashMap, vec};

use itertools::Itertools;
use pinyin::{to_pinyin_vec, Pinyin};
Expand Down

0 comments on commit 375d16f

Please sign in to comment.