Skip to content

Commit

Permalink
support prometheus queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Jayko001 committed Nov 16, 2023
1 parent a2d64c0 commit 3aeedc9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
supabase-wrappers = "0.1"
tokio = { version = "1", features = ["full"] }
urlencoding = "2.1.3"

[dev-dependencies]
pgrx-tests = "=0.9.7"
Expand Down
68 changes: 37 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,44 @@ use supabase_wrappers::prelude::*;
use tokio::runtime::Runtime;
pgrx::pg_module_magic!();
use std::time::Duration;
use urlencoding::encode;

// convert response body text to rows
fn resp_to_rows(obj: &str, resp: &JsonValue) -> Vec<Row> {
fn resp_to_rows(obj: &str, resp: &JsonValue, quals: &[Qual]) -> Vec<Row> {
let mut result = Vec::new();

match obj {
"metrics" => {
if let Some(result_array) = resp["data"]["result"].as_array() {
for result_obj in result_array {
let metric_name = result_obj["metric"]["__name__"]
.as_str()
.unwrap_or_default()
.to_string();
let metric_labels = result_obj["metric"].clone();
if let Some(values_array) = result_obj["values"].as_array() {
for value_pair in values_array {
if let (Some(time_str), Some(value_str)) =
(value_pair[0].as_i64(), value_pair[1].as_str())
{
if let (metric_time, Ok(metric_value)) =
(time_str, value_str.parse::<f64>())
let metric_name_filter = quals
.iter()
.find(|qual| qual.field == "metric_name" && qual.operator == "=");
if let Some(mut metric_name) = metric_name_filter
.map(|qual| PrometheusFdw::value_to_promql_string(&qual.value))
{
let metric_labels = result_obj["metric"].clone();
if let Some(values_array) = result_obj["values"].as_array() {
for value_pair in values_array {
if let (Some(time_str), Some(value_str)) =
(value_pair[0].as_i64(), value_pair[1].as_str())
{
let mut row = Row::new();
row.push(
"metric_name",
Some(Cell::String(metric_name.clone())),
);
row.push(
"metric_labels",
Some(Cell::Json(JsonB(metric_labels.clone()))),
);
row.push("metric_time", Some(Cell::I64(metric_time)));
row.push("metric_value", Some(Cell::F64(metric_value)));
result.push(row);
if let (metric_time, Ok(metric_value)) =
(time_str, value_str.parse::<f64>())
{
let mut row = Row::new();
row.push(
"metric_name",
Some(Cell::String(metric_name.clone())),
);
row.push(
"metric_labels",
Some(Cell::Json(JsonB(metric_labels.clone()))),
);
row.push("metric_time", Some(Cell::I64(metric_time)));
row.push("metric_value", Some(Cell::F64(metric_value)));
result.push(row);
}
}
}
}
Expand Down Expand Up @@ -119,7 +123,7 @@ impl PrometheusFdw {
let ret = format!(
"{}/api/v1/query_range?query={}&start={}&end={}&step={}",
self.base_url.as_ref().unwrap(),
metric_name,
encode(&metric_name),
lower_timestamp,
upper_timestamp,
step
Expand Down Expand Up @@ -196,8 +200,11 @@ impl ForeignDataWrapper for PrometheusFdw {
let body = self.rt.block_on(async { resp.text().await });
match body {
Ok(body) => {
// warning!("body: {}", body);
let json: JsonValue = serde_json::from_str(&body).unwrap();
result = resp_to_rows(&obj, &json);
// warning!("json: {}", json);
result = resp_to_rows(&obj, &json, &quals);
// warning!("result: {:#?}", result);
}
Err(e) => {
warning!("failed to get body: {}", e);
Expand All @@ -217,10 +224,9 @@ impl ForeignDataWrapper for PrometheusFdw {
fn iter_scan(&mut self, row: &mut Row) -> Option<()> {
if let Some(ref mut result) = self.scan_result {
if !result.is_empty() {
return result
.drain(0..1)
.last()
.map(|src_row| row.replace_with(src_row));
return result.drain(0..1).last().map(|src_row| {
row.replace_with(src_row);
});
}
}
None
Expand Down

0 comments on commit 3aeedc9

Please sign in to comment.