Skip to content

Commit

Permalink
spi-stats:fix dimension hierarchy.
Browse files Browse the repository at this point in the history
  • Loading branch information
ljl committed Mar 21, 2024
1 parent a0f7f46 commit b1853b2
Showing 1 changed file with 99 additions and 7 deletions.
106 changes: 99 additions & 7 deletions spi/spi-stats/src/serv/pg/stats_pg_metric_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ use bios_basic::spi::{
use itertools::Itertools;
use tardis::{
basic::{dto::TardisContext, error::TardisError, result::TardisResult},
config::config_dto::log,
db::{
reldb_client::TardisRelDBClient,
reldb_client::{TardisRelDBClient, TardisRelDBlConnection},
sea_orm::{self, FromQueryResult, Value},
},
log::info,
serde_json::{self, json, Map},
web::poem_openapi::types::{ToJSON, Type},
TardisFunsInst,
};

use crate::{
dto::stats_query_dto::{StatsQueryMetricsReq, StatsQueryMetricsResp},
dto::stats_query_dto::{StatsQueryMetricsReq, StatsQueryMetricsResp, StatsQueryStatementReq, StatsQueryStatementResp},
serv::stats_record_serv::dim_record_paginate,
stats_enumeration::{StatsDataTypeKind, StatsFactColKind},
};

Expand Down Expand Up @@ -115,11 +118,13 @@ pub async fn query_metrics(query_req: &StatsQueryMetricsReq, funs: &TardisFunsIn
col.key as col_key,
col.show_name as show_name,
col.kind as col_kind,
col.dim_rel_conf_dim_key as dim_rel_conf_dim_key,
col.dim_multi_values as dim_multi_values,
col.mes_data_distinct as mes_data_distinct,
col.mes_data_type as mes_data_type,
col.mes_unit as mes_unit,
dim.data_type as dim_data_type,
dim.hierarchy as dim_hierarchy,
fact.query_limit as query_limit
FROM
{fact_col_conf_table_name} col
Expand Down Expand Up @@ -153,6 +158,16 @@ pub async fn query_metrics(query_req: &StatsQueryMetricsReq, funs: &TardisFunsIn
Some(item.try_get("", "dim_data_type")?)
},
query_limit: item.try_get("", "query_limit")?,
dim_rel_conf_dim_key: if item.try_get::<Option<String>>("", "dim_rel_conf_dim_key")?.is_none() {
None
} else {
Some(item.try_get("", "dim_rel_conf_dim_key")?)
},
dim_hierarchy: if item.try_get::<Option<Vec<String>>>("", "dim_hierarchy")?.is_none() {
None
} else {
Some(item.try_get("", "dim_hierarchy")?)
},
})
})
.collect::<TardisResult<Vec<StatsConfInfo>>>()?;
Expand All @@ -176,7 +191,9 @@ pub async fn query_metrics(query_req: &StatsQueryMetricsReq, funs: &TardisFunsIn
dim_multi_values: Some(false),
mes_data_distinct: Some(true),
mes_data_type: Some(StatsDataTypeKind::String),
dim_rel_conf_dim_key: None,
dim_data_type: None,
dim_hierarchy: None,
query_limit,
});
conf_info.push(StatsConfInfo {
Expand All @@ -186,7 +203,9 @@ pub async fn query_metrics(query_req: &StatsQueryMetricsReq, funs: &TardisFunsIn
dim_multi_values: Some(false),
mes_data_distinct: Some(true),
mes_data_type: None,
dim_rel_conf_dim_key: None,
dim_data_type: Some(StatsDataTypeKind::DateTime),
dim_hierarchy: None,
query_limit,
});
conf_info.push(StatsConfInfo {
Expand All @@ -196,7 +215,9 @@ pub async fn query_metrics(query_req: &StatsQueryMetricsReq, funs: &TardisFunsIn
dim_multi_values: Some(false),
mes_data_distinct: Some(true),
mes_data_type: Some(StatsDataTypeKind::Int),
dim_rel_conf_dim_key: None,
dim_data_type: None,
dim_hierarchy: None,
query_limit,
});

Expand Down Expand Up @@ -559,15 +580,52 @@ pub async fn query_metrics(query_req: &StatsQueryMetricsReq, funs: &TardisFunsIn
let select_measure_keys =
sql_part_outer_select_infos.iter().filter(|(_, _, _, is_dimension)| !*is_dimension).map(|(_, alias_name, _, _)| alias_name.to_string()).collect::<Vec<String>>();
let show_names = sql_part_outer_select_infos.into_iter().map(|(_, alias_name, show_name, _)| (alias_name, show_name)).collect::<HashMap<String, String>>();
let dim_record_agg = package_dim_record_agg(conf_info.clone(), funs, ctx).await?;
Ok(StatsQueryMetricsResp {
from: query_req.from.to_string(),
show_names,
group: package_groups(select_dimension_keys, &select_measure_keys, ignore_group_agg, result)
group: package_groups(dim_record_agg, conf_info, select_dimension_keys, &select_measure_keys, ignore_group_agg, result)
.map_err(|msg| TardisError::internal_error(&format!("Fail to package groups: {msg}"), "500-spi-stats-internal-error"))?,
})
}

// todo 下钻 上探
async fn package_dim_record_agg(
conf_info: HashMap<String, StatsConfInfo>,
funs: &TardisFunsInst,
ctx: &TardisContext,
) -> TardisResult<HashMap<String, HashMap<String, serde_json::Value>>> {
let mut result: HashMap<String, HashMap<String, serde_json::Value>> = HashMap::new();
for (col_key, conf) in conf_info.clone() {
if conf.dim_rel_conf_dim_key.is_none() {
continue;
}
let dimension_key = conf.dim_rel_conf_dim_key.unwrap_or_default();
let dimension_hierarchy = if let Some(stats_con_info) = conf_info.get(&col_key) {
stats_con_info.dim_hierarchy.clone()
} else {
None
};
if dimension_hierarchy.unwrap_or(vec![]).len() > 0 {
let dim: HashMap<String, serde_json::Value> = dim_record_paginate(dimension_key.clone(), None, None, 1, 9999, None, None, funs, ctx)
.await?
.records
.into_iter()
.map(|dim| {
(String::from(serde_json::json!(dim.get("key")).as_str().unwrap_or("")), dim)
})
.collect();
result.insert(format!("{}{FUNCTION_SUFFIX_FLAG}", col_key.clone()), dim);
} else {
result.insert(format!("{}{FUNCTION_SUFFIX_FLAG}", col_key), HashMap::new());
}
}
Ok(result)
}

fn package_groups(
dim_record_agg: HashMap<String, HashMap<String, serde_json::Value>>,
conf_info: HashMap<String, StatsConfInfo>,
curr_select_dimension_keys: Vec<String>,
select_measure_keys: &Vec<String>,
ignore_group_agg: bool,
Expand Down Expand Up @@ -599,6 +657,14 @@ fn package_groups(
let mut node = Map::with_capacity(0);

let dimension_key = curr_select_dimension_keys.first().ok_or("curr_select_dimension_keys is empty")?;

// todo 下钻 上探
// let dimension_hierarchy = if let Some(stats_con_info) = conf_info.get(dimension_key.split(FUNCTION_SUFFIX_FLAG).next().unwrap_or("")) {
// stats_con_info.dim_hierarchy.clone()
// } else {
// None
// };
// let dimension_hierarchy_len = dimension_hierarchy.unwrap_or(vec![]).len() as i32;
let mut groups = HashMap::new();
let mut order = Vec::new();
for record in result {
Expand Down Expand Up @@ -626,7 +692,32 @@ fn package_groups(
}
for key in order {
let group = groups.get(&key).expect("groups shouldn't miss the value of key in order");
let sub = package_groups(curr_select_dimension_keys[1..].to_vec(), select_measure_keys, ignore_group_agg, group.to_vec())?;
let sub = package_groups(
dim_record_agg.clone(),
conf_info.clone(),
curr_select_dimension_keys[1..].to_vec(),
select_measure_keys,
ignore_group_agg,
group.to_vec(),
)?;
// todo 下钻 上探
// println!("dimension_key:[{}],key[{}]", dimension_key, key);
// if let Some(dim_record_map) = dim_record_agg.get(dimension_key) {
// println!("dim_record_map:{:?}", dim_record_map);
// if let Some(dim_record) = dim_record_map.get(&key) {
// println!("dim_record:{:?} , {}", dim_record, dimension_hierarchy_len);
// for i in dimension_hierarchy_len..0 {
// let field_key = format!("key{}",i);
// if let Some(val) = dim_record.get(field_key.clone()) {
// if !val.as_str().unwrap_or_default().is_empty() {
// let val = dim_record.get(format!("key{i}")).expect("msg").to_string();
// println!("on [{}]",val );
// node.insert(val, sub.clone());
// }
// }
// }
// }
// }
node.insert(key, sub);
}
Ok(serde_json::Value::Object(node))
Expand All @@ -638,7 +729,6 @@ fn package_groups_agg(record: serde_json::Value) -> Result<serde_json::Value, St
if agg.is_null() {
return Ok(serde_json::Value::Null);
}
println!("{}", agg);
let mut details = Vec::new();
let var_agg = agg.as_str().ok_or("field group_agg should be a string")?;
let vars = var_agg.split(',').collect::<Vec<&str>>();
Expand All @@ -656,14 +746,16 @@ fn package_groups_agg(record: serde_json::Value) -> Result<serde_json::Value, St
}
}

#[derive(sea_orm::FromQueryResult)]
#[derive(sea_orm::FromQueryResult, Clone)]
struct StatsConfInfo {
pub col_key: String,
pub show_name: String,
pub col_kind: StatsFactColKind,
pub dim_multi_values: Option<bool>,
pub mes_data_distinct: Option<bool>,
pub mes_data_type: Option<StatsDataTypeKind>,
pub dim_rel_conf_dim_key: Option<String>,
pub dim_data_type: Option<StatsDataTypeKind>,
pub dim_hierarchy: Option<Vec<String>>,
pub query_limit: i32,
}
}

0 comments on commit b1853b2

Please sign in to comment.