Skip to content

Commit

Permalink
Add check_db_health function to BinderCoreV2 and refactor HTTP server…
Browse files Browse the repository at this point in the history
… to use Arc for core_v2; add health check endpoint; cleanup and refactor request handling logic
  • Loading branch information
nullchinchilla committed Aug 11, 2024
1 parent 09cb0f1 commit 61b990e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 72 deletions.
6 changes: 6 additions & 0 deletions src/bindercore_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,12 @@ impl BinderCoreV2 {
// txn.commit().await?;
Ok(())
}

/// Checks the health of the database connection by executing a simple query.
pub async fn check_db_health(&self) -> anyhow::Result<()> {
sqlx::query("SELECT 1").fetch_one(&self.postgres).await?;
Ok(())
}
}

/// Verify a captcha.
Expand Down
140 changes: 68 additions & 72 deletions src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,102 +52,98 @@ static FAILED_REQUESTS: AtomicU64 = AtomicU64::new(0);
static TRUE_RPS: AtomicU32 = AtomicU32::new(150);

pub async fn start_server(core_v2: BinderCoreV2, opt: Opt) -> anyhow::Result<()> {
// smolscale::spawn(async {
// loop {
// let fail_count = FAILED_REQUESTS.swap(0, Ordering::Relaxed);
// let current_rps = TRUE_RPS.load(Ordering::Relaxed);
// let next_rps = if fail_count > 0 {
// (current_rps * 19 / 20).max(100)
// } else {
// (current_rps + 20).min(BASE_RPS)
// };
// TRUE_RPS.store(next_rps, Ordering::Relaxed);
// log::warn!("*** FAIL COUNT {fail_count}, current_rps {current_rps} ***");
// smol::Timer::after(Duration::from_secs(2)).await;
// }
// })
// .detach();

let my_sk = core_v2.get_master_sk().await?;
let statsd_client = Arc::new(statsd::Client::new(opt.statsd_addr, "geph4.binder").unwrap());
log::info!("NEW HTTP listening on {}", opt.listen_new);
let core_v2 = Arc::new(core_v2);
let bcw = BinderCoreWrapper {
core_v2: Arc::new(core_v2),
core_v2: core_v2.clone(),
melnode_cache: Cache::builder()
.time_to_live(Duration::from_secs(5))
.build()
.into(),
};
let bcw = Arc::new(BinderService(bcw.clone()));
let statsd_client = statsd_client.clone();
let serve = warp::post()
.and(warp::body::content_length_limit(1024 * 512))
.and(warp::body::bytes())
.then(move |s: bytes::Bytes| {
let my_sk = my_sk.clone();
let bcw = bcw.clone();
let statsd_client = statsd_client.clone();

async move {
// let chunk = BASE_RPS / TRUE_RPS.load(Ordering::Relaxed);
// if GOVERNOR
// .check_n(NonZeroU32::new(chunk).unwrap())
// .unwrap()
// .is_err()
// {
// // statsd_client.incr("GOVERNED");
// // log::warn!("SLOWING DOWN");
// return http::Response::builder()
// .status(http::StatusCode::TOO_MANY_REQUESTS)
// .body(Bytes::from_static(b"WAAY Too Many Requests"))
// .unwrap();
// }

let fallible = smolscale::spawn(async move {
let (decrypted, their_pk) = if let Ok(val) = run_blocking({
let my_sk = my_sk.clone();
move || box_decrypt(&s, my_sk.clone())
})
.await
{
val
} else {
return Ok(Bytes::new());
};
let serve = warp::get()
.and(warp::path::end())
.then({
let core_v2 = core_v2.clone();
move || {
let core_v2 = core_v2.clone();
async move {
let start = Instant::now();
let req: JrpcRequest = serde_json::from_slice(&decrypted)?;
statsd_client.incr(&req.method);
let method = req.method.clone();
let resp = bcw.respond_raw(req).await;

statsd_client.timer(
&format!("latencyv2.{}", method),
start.elapsed().as_secs_f64(),
);
let resp = serde_json::to_vec(&resp)?;
let resp = run_blocking(move || box_encrypt(&resp, my_sk, their_pk)).await;
anyhow::Ok(resp)
});
match fallible.await {
Ok(res) => http::Response::builder().body(res).unwrap(),
Err(e) => {
log::error!("error handling request: {:?}", e);
FAILED_REQUESTS.fetch_add(1, Ordering::Relaxed);
if let Err(err) = core_v2.check_db_health().await {
http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Bytes::copy_from_slice(
format!("internal server error: {:?}", e).as_bytes(),
format!("internal server error: {:?}", err).as_bytes(),
))
.unwrap()
} else {
http::Response::builder()
.status(http::StatusCode::OK)
.body(Bytes::copy_from_slice(
format!("db latency: {:?}", start.elapsed()).as_bytes(),
))
.unwrap()
}
}
}
});
})
.or(warp::post()
.and(warp::body::content_length_limit(1024 * 512))
.and(warp::body::bytes())
.then(move |s: bytes::Bytes| {
let my_sk = my_sk.clone();
let bcw = bcw.clone();
let statsd_client = statsd_client.clone();

async move {
let fallible = smolscale::spawn(async move {
let (decrypted, their_pk) = if let Ok(val) = run_blocking({
let my_sk = my_sk.clone();
move || box_decrypt(&s, my_sk.clone())
})
.await
{
val
} else {
return Ok(Bytes::new());
};
let start = Instant::now();
let req: JrpcRequest = serde_json::from_slice(&decrypted)?;
statsd_client.incr(&req.method);
let method = req.method.clone();
let resp = bcw.respond_raw(req).await;

statsd_client.timer(
&format!("latencyv2.{}", method),
start.elapsed().as_secs_f64(),
);
let resp = serde_json::to_vec(&resp)?;
let resp = run_blocking(move || box_encrypt(&resp, my_sk, their_pk)).await;
anyhow::Ok(resp)
});
match fallible.await {
Ok(res) => http::Response::builder().body(res).unwrap(),
Err(e) => {
log::error!("error handling request: {:?}", e);
FAILED_REQUESTS.fetch_add(1, Ordering::Relaxed);
http::Response::builder()
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
.body(Bytes::copy_from_slice(
format!("internal server error: {:?}", e).as_bytes(),
))
.unwrap()
}
}
}
}));

warp::serve(serve).run(opt.listen_new).compat().await;
Ok(())
}

#[derive(Clone)]
struct BinderCoreWrapper {
core_v2: Arc<BinderCoreV2>,
Expand Down

0 comments on commit 61b990e

Please sign in to comment.