From 61b990e6692737203ff25c14143a432f953e24ea Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Sun, 11 Aug 2024 08:35:40 -0400 Subject: [PATCH] Add check_db_health function to BinderCoreV2 and refactor HTTP server to use Arc for core_v2; add health check endpoint; cleanup and refactor request handling logic --- src/bindercore_v2.rs | 6 ++ src/serve.rs | 140 +++++++++++++++++++++---------------------- 2 files changed, 74 insertions(+), 72 deletions(-) diff --git a/src/bindercore_v2.rs b/src/bindercore_v2.rs index adec4a8..2530e45 100644 --- a/src/bindercore_v2.rs +++ b/src/bindercore_v2.rs @@ -969,6 +969,12 @@ impl BinderCoreV2 { // txn.commit().await?; Ok(()) } + + /// Checks the health of the database connection by executing a simple query. + pub async fn check_db_health(&self) -> anyhow::Result<()> { + sqlx::query("SELECT 1").fetch_one(&self.postgres).await?; + Ok(()) + } } /// Verify a captcha. diff --git a/src/serve.rs b/src/serve.rs index ea29bdd..dd3c107 100644 --- a/src/serve.rs +++ b/src/serve.rs @@ -52,27 +52,12 @@ static FAILED_REQUESTS: AtomicU64 = AtomicU64::new(0); static TRUE_RPS: AtomicU32 = AtomicU32::new(150); pub async fn start_server(core_v2: BinderCoreV2, opt: Opt) -> anyhow::Result<()> { - // smolscale::spawn(async { - // loop { - // let fail_count = FAILED_REQUESTS.swap(0, Ordering::Relaxed); - // let current_rps = TRUE_RPS.load(Ordering::Relaxed); - // let next_rps = if fail_count > 0 { - // (current_rps * 19 / 20).max(100) - // } else { - // (current_rps + 20).min(BASE_RPS) - // }; - // TRUE_RPS.store(next_rps, Ordering::Relaxed); - // log::warn!("*** FAIL COUNT {fail_count}, current_rps {current_rps} ***"); - // smol::Timer::after(Duration::from_secs(2)).await; - // } - // }) - // .detach(); - let my_sk = core_v2.get_master_sk().await?; let statsd_client = Arc::new(statsd::Client::new(opt.statsd_addr, "geph4.binder").unwrap()); log::info!("NEW HTTP listening on {}", opt.listen_new); + let core_v2 = Arc::new(core_v2); let bcw = BinderCoreWrapper { - core_v2: Arc::new(core_v2), + core_v2: core_v2.clone(), melnode_cache: Cache::builder() .time_to_live(Duration::from_secs(5)) .build() @@ -80,74 +65,85 @@ pub async fn start_server(core_v2: BinderCoreV2, opt: Opt) -> anyhow::Result<()> }; let bcw = Arc::new(BinderService(bcw.clone())); let statsd_client = statsd_client.clone(); - let serve = warp::post() - .and(warp::body::content_length_limit(1024 * 512)) - .and(warp::body::bytes()) - .then(move |s: bytes::Bytes| { - let my_sk = my_sk.clone(); - let bcw = bcw.clone(); - let statsd_client = statsd_client.clone(); - - async move { - // let chunk = BASE_RPS / TRUE_RPS.load(Ordering::Relaxed); - // if GOVERNOR - // .check_n(NonZeroU32::new(chunk).unwrap()) - // .unwrap() - // .is_err() - // { - // // statsd_client.incr("GOVERNED"); - // // log::warn!("SLOWING DOWN"); - // return http::Response::builder() - // .status(http::StatusCode::TOO_MANY_REQUESTS) - // .body(Bytes::from_static(b"WAAY Too Many Requests")) - // .unwrap(); - // } - - let fallible = smolscale::spawn(async move { - let (decrypted, their_pk) = if let Ok(val) = run_blocking({ - let my_sk = my_sk.clone(); - move || box_decrypt(&s, my_sk.clone()) - }) - .await - { - val - } else { - return Ok(Bytes::new()); - }; + let serve = warp::get() + .and(warp::path::end()) + .then({ + let core_v2 = core_v2.clone(); + move || { + let core_v2 = core_v2.clone(); + async move { let start = Instant::now(); - let req: JrpcRequest = serde_json::from_slice(&decrypted)?; - statsd_client.incr(&req.method); - let method = req.method.clone(); - let resp = bcw.respond_raw(req).await; - - statsd_client.timer( - &format!("latencyv2.{}", method), - start.elapsed().as_secs_f64(), - ); - let resp = serde_json::to_vec(&resp)?; - let resp = run_blocking(move || box_encrypt(&resp, my_sk, their_pk)).await; - anyhow::Ok(resp) - }); - match fallible.await { - Ok(res) => http::Response::builder().body(res).unwrap(), - Err(e) => { - log::error!("error handling request: {:?}", e); - FAILED_REQUESTS.fetch_add(1, Ordering::Relaxed); + if let Err(err) = core_v2.check_db_health().await { http::Response::builder() .status(http::StatusCode::INTERNAL_SERVER_ERROR) .body(Bytes::copy_from_slice( - format!("internal server error: {:?}", e).as_bytes(), + format!("internal server error: {:?}", err).as_bytes(), + )) + .unwrap() + } else { + http::Response::builder() + .status(http::StatusCode::OK) + .body(Bytes::copy_from_slice( + format!("db latency: {:?}", start.elapsed()).as_bytes(), )) .unwrap() } } } - }); + }) + .or(warp::post() + .and(warp::body::content_length_limit(1024 * 512)) + .and(warp::body::bytes()) + .then(move |s: bytes::Bytes| { + let my_sk = my_sk.clone(); + let bcw = bcw.clone(); + let statsd_client = statsd_client.clone(); + + async move { + let fallible = smolscale::spawn(async move { + let (decrypted, their_pk) = if let Ok(val) = run_blocking({ + let my_sk = my_sk.clone(); + move || box_decrypt(&s, my_sk.clone()) + }) + .await + { + val + } else { + return Ok(Bytes::new()); + }; + let start = Instant::now(); + let req: JrpcRequest = serde_json::from_slice(&decrypted)?; + statsd_client.incr(&req.method); + let method = req.method.clone(); + let resp = bcw.respond_raw(req).await; + + statsd_client.timer( + &format!("latencyv2.{}", method), + start.elapsed().as_secs_f64(), + ); + let resp = serde_json::to_vec(&resp)?; + let resp = run_blocking(move || box_encrypt(&resp, my_sk, their_pk)).await; + anyhow::Ok(resp) + }); + match fallible.await { + Ok(res) => http::Response::builder().body(res).unwrap(), + Err(e) => { + log::error!("error handling request: {:?}", e); + FAILED_REQUESTS.fetch_add(1, Ordering::Relaxed); + http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(Bytes::copy_from_slice( + format!("internal server error: {:?}", e).as_bytes(), + )) + .unwrap() + } + } + } + })); warp::serve(serve).run(opt.listen_new).compat().await; Ok(()) } - #[derive(Clone)] struct BinderCoreWrapper { core_v2: Arc,