Skip to content

Commit

Permalink
spi-stats:fix fact record idempotent ignore updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
ljl committed Aug 1, 2024
1 parent 89a2945 commit eb14328
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 12 deletions.
13 changes: 13 additions & 0 deletions backend/spi/spi-stats/src/dto/stats_record_dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ pub struct StatsFactRecordLoadReq {
/// 幂等id
/// ps: 幂等id用于确保同一个请求不会重复处理
pub idempotent_id: Option<String>,

/// ignore updates
/// ps: If idempotent_id has a value and the record is hit, this field takes effect to ignore or update, default is true to ignore updates
///
/// 忽略更新
/// ps: 如果 idempotent_id 有值并且命中纪录,则该字段生效忽略或者进行更新,默认为 true 忽略更新
pub ignore_updates: Option<bool>,

/// Field data
/// 字段数据
Expand Down Expand Up @@ -48,6 +55,12 @@ pub struct StatsFactRecordsLoadReq {
/// 幂等id
/// ps: 幂等id用于确保同一个请求不会重复处理
pub idempotent_id: Option<String>,
/// ignore updates
/// ps: If idempotent_id has a value and the record is hit, this field takes effect to ignore or update, default is true to ignore updates
///
/// 忽略更新
/// ps: 如果 idempotent_id 有值并且命中纪录,则该字段生效忽略或者进行更新,默认为 true 忽略更新
pub ignore_updates: Option<bool>,
/// Field data
/// 字段数据
///
Expand Down
115 changes: 104 additions & 11 deletions backend/spi/spi-stats/src/serv/pg/stats_pg_record_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@ use bios_basic::spi::{
spi_initializer::common_pg::{self, package_table_name},
};
use itertools::Itertools;
use serde_json::Map;
use tardis::{
basic::{dto::TardisContext, result::TardisResult},
chrono::{DateTime, Utc},
db::{
reldb_client::{TardisRelDBClient, TardisRelDBlConnection},
sea_orm::{FromQueryResult, Value},
},
log::{info, trace},
log::info,
serde_json,
web::web_resp::TardisPage,
TardisFuns, TardisFunsInst,
};

use crate::{
dto::stats_record_dto::{StatsDimRecordAddReq, StatsFactRecordLoadReq, StatsFactRecordsLoadReq},
dto::{
stats_conf_dto::StatsConfFactColInfoResp,
stats_record_dto::{StatsDimRecordAddReq, StatsFactRecordLoadReq, StatsFactRecordsLoadReq},
},
stats_enumeration::StatsFactColKind,
};

Expand Down Expand Up @@ -157,10 +161,14 @@ pub(crate) async fn fact_record_load(
"400-spi-stats-invalid-request",
)
})?;
// 如果存在幂等id 且已经存在对应数据,则丢弃数据
if let Some(idempotent_id) = add_req.idempotent_id {

// 如果存在幂等id 且已经存在对应数据,则根据丢弃数据
if let Some(idempotent_id) = add_req.idempotent_id.clone() {
let idempotent_data_resp = fact_get_idempotent_record_raw(fact_conf_key, &idempotent_id, &conn, ctx).await?;
if idempotent_data_resp.is_some() {
if !add_req.ignore_updates.unwrap_or(true) {
return self::fact_records_modify(fact_conf_key, &idempotent_id, req_data.clone(), fact_col_conf_set, conn, funs, ctx, inst).await;
}
return Ok(());
}
}
Expand Down Expand Up @@ -355,13 +363,6 @@ pub(crate) async fn fact_records_load(
let mut value_sets = vec![];

for add_req in add_req_set {
// 如果存在幂等id 且已经存在对应数据,则丢弃数据
if let Some(idempotent_id) = add_req.idempotent_id.clone() {
let idempotent_data_resp = fact_get_idempotent_record_raw(fact_conf_key, &idempotent_id, &conn, ctx).await?;
if idempotent_data_resp.is_some() {
continue;
}
}
let Some(req_data) = add_req.data.as_object() else {
return Err(funs.err().bad_request(
"fact_record",
Expand All @@ -370,6 +371,16 @@ pub(crate) async fn fact_records_load(
"400-spi-stats-invalid-request",
));
};
// 如果存在幂等id 且已经存在对应数据,则丢弃数据
if let Some(idempotent_id) = add_req.idempotent_id.clone() {
let idempotent_data_resp = fact_get_idempotent_record_raw(fact_conf_key, &idempotent_id, &conn, ctx).await?;
if idempotent_data_resp.is_some() {
if !add_req.ignore_updates.unwrap_or(true) {
return self::fact_records_modify(fact_conf_key, &idempotent_id, req_data.clone(), fact_col_conf_set, conn, funs, ctx, inst).await;
}
continue;
}
}
let mut values = vec![
Value::from(&add_req.key),
Value::from(add_req.own_paths),
Expand Down Expand Up @@ -469,6 +480,88 @@ pub(crate) async fn fact_records_load(
Ok(())
}

async fn fact_records_modify(
fact_conf_key: &str,
idempotent_id: &str,
req_data: Map<String, serde_json::Value>,
fact_col_conf_set: Vec<StatsConfFactColInfoResp>,
conn: TardisRelDBlConnection,
funs: &TardisFunsInst,
ctx: &TardisContext,
inst: &SpiBsInst,
) -> TardisResult<()> {
let mut sql_sets = vec![];
let mut params = vec![Value::from(idempotent_id.to_string())];
for (req_fact_col_key, req_fact_col_value) in req_data {
let fact_col_conf = fact_col_conf_set.iter().find(|c| c.key == req_fact_col_key).ok_or_else(|| {
funs.err().not_found(
"fact_record",
"load",
&format!("The fact column config [{req_fact_col_key}] not exists."),
"404-spi-stats-fact-col-conf-not-exist",
)
})?;
if fact_col_conf.kind == StatsFactColKind::Dimension {
let Some(key) = fact_col_conf.dim_rel_conf_dim_key.as_ref() else {
return Err(funs.err().not_found("fact_record", "load", "Fail to get conf_dim_key", "400-spi-stats-fail-to-get-dim-config-key"));
};
let Some(dim_conf) = stats_pg_conf_dim_serv::get(key, &conn, ctx, inst).await? else {
return Err(funs.err().not_found(
"fact_record",
"load",
&format!("Fail to get dim_conf by key [{key}]"),
"400-spi˚-stats-fail-to-get-dim-config-key",
));
};
// TODO check value enum when stable_ds = true
sql_sets.push(format!("{} = ${}", req_fact_col_key.to_string(), params.len() + 1));
if fact_col_conf.dim_multi_values.unwrap_or(false) {
params.push(dim_conf.data_type.json_to_sea_orm_value_array(&req_fact_col_value, false)?);
} else {
params.push(dim_conf.data_type.json_to_sea_orm_value(&req_fact_col_value, false)?);
}
} else if fact_col_conf.kind == StatsFactColKind::Measure {
let Some(mes_data_type) = fact_col_conf.mes_data_type.as_ref() else {
return Err(funs.err().bad_request(
"fact_record",
"load",
"Col_conf.mes_data_type shouldn't be empty while fact_col_conf.kind is Measure",
"400-spi-stats-invalid-request",
));
};
sql_sets.push(format!("{} = ${}", req_fact_col_key.to_string(), params.len() + 1));
params.push(mes_data_type.json_to_sea_orm_value(&req_fact_col_value, false)?);
} else {
let Some(req_fact_col_value) = req_fact_col_value.as_str() else {
return Err(funs.err().bad_request(
"fact_record",
"load",
&format!("For the key [{req_fact_col_key}], value: [{req_fact_col_value}] is not a string"),
"400-spi-stats-invalid-request",
));
};
sql_sets.push(format!("{} = ${}", req_fact_col_key.to_string(), params.len() + 1));
params.push(req_fact_col_value.into());
}
}
if sql_sets.is_empty() {
return Err(funs.err().bad_request("fact_record", "load", &format!("The fact column no data"), "400-spi-stats-invalid-request"));
}
let table_name = package_table_name(&format!("stats_inst_fact_{fact_conf_key}"), ctx);
conn.execute_one(
&format!(
r#"UPDATE {table_name}
SET {}
WHERE idempotent_id = $1"#,
sql_sets.join(",")
),
params,
)
.await?;
conn.commit().await?;
Ok(())
}

pub(crate) async fn fact_record_delete(fact_conf_key: &str, fact_record_key: &str, funs: &TardisFunsInst, ctx: &TardisContext, inst: &SpiBsInst) -> TardisResult<()> {
let bs_inst = inst.inst::<TardisRelDBClient>();
let (mut conn, _) = common_pg::init_conn(bs_inst).await?;
Expand Down
41 changes: 40 additions & 1 deletion backend/spi/spi-stats/tests/test_stats_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
data: json!({}),
ext: None,
idempotent_id: None,
ignore_updates: None
},
)
.await
Expand All @@ -132,6 +133,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
data: json!({"xx":1}),
ext: Some(json!({"xx":1})),
idempotent_id: None,
ignore_updates: None
},
)
.await
Expand Down Expand Up @@ -183,6 +185,35 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
// "404-spi-stats-fact_record-load"
// );

let _: Void = client
.put(
"/ci/record/fact/req/rec1",
&StatsFactRecordLoadReq {
own_paths: "t1/a1".to_string(),
ct: Utc::now(),
data: json!({
"source":"zhejiang",
"status": "open",
"priority":1,
"tag":["t1","t2"],
"creator":"acc001",
"act_hours": 41,
"plan_hours": 45
}),
ext: Some(json!({
"source":"zhejiang",
"status": "open",
"priority":1,
"tag":["t1","t2"],
"creator":"acc001",
"act_hours": 40,
"plan_hours": 45
})),
idempotent_id: Some("1".to_string()),
ignore_updates: Some(false),
},
)
.await;
let _: Void = client
.put(
"/ci/record/fact/req/rec1",
Expand All @@ -207,7 +238,8 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
"act_hours": 40,
"plan_hours": 45
})),
idempotent_id: None,
idempotent_id: Some("1".to_string()),
ignore_updates: Some(false),
},
)
.await;
Expand Down Expand Up @@ -236,6 +268,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
"plan_hours": 10
})),
idempotent_id: None,
ignore_updates: None,
},
)
.await;
Expand All @@ -258,6 +291,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
"status": "progress",
})),
idempotent_id: None,
ignore_updates: None,
},
)
.await;
Expand All @@ -275,6 +309,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
"priority": 1,
})),
idempotent_id: None,
ignore_updates: None,
},
)
.await;
Expand All @@ -293,6 +328,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
data: json!({}),
ext: None,
idempotent_id: None,
ignore_updates: None
}],
)
.await
Expand All @@ -312,6 +348,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
data: json!({}),
ext: None,
idempotent_id: None,
ignore_updates: None
}],
)
.await
Expand Down Expand Up @@ -371,6 +408,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
"plan_hours": 45
})),
idempotent_id: None,
ignore_updates: None,
},
StatsFactRecordsLoadReq {
key: "rec4".to_string(),
Expand All @@ -395,6 +433,7 @@ pub async fn test_fact_record(client: &mut TestHttpClient) -> TardisResult<()> {
"plan_hours": 45
})),
idempotent_id: None,
ignore_updates: None,
},
],
)
Expand Down

0 comments on commit eb14328

Please sign in to comment.