Skip to content

Commit

Permalink
feat: ✨ add ability to perform passive healthcheck to judger, service…
Browse files Browse the repository at this point in the history
… discovery with built-in dns for docker swarm setup
  • Loading branch information
Eason0729 committed Dec 6, 2023
1 parent 6068b1e commit bfa944c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
38 changes: 23 additions & 15 deletions backend/src/controller/judger/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod swarm;

use super::Error;
use std::{
collections::{BTreeMap, VecDeque},
collections::VecDeque,
ops::DerefMut,
sync::{
atomic::{AtomicIsize, Ordering},
Expand All @@ -15,7 +15,7 @@ use std::{

use lockfree::{map::Map, queue::Queue, set::Set};
use spin::Mutex;
use tonic::{service::Interceptor,*};
use tonic::{service::Interceptor, *};
use uuid::Uuid;

use crate::{
Expand All @@ -25,6 +25,7 @@ use crate::{

// introduce routing layer error

/// Upper bound of an upstream's health score.
///
/// A freshly created `Upstream` starts at this value, and
/// `ConnGuard::report_success` clamps the score back down to it with
/// `fetch_min`, so the score never stays above this bound. An upstream is
/// treated as healthy while its score is strictly greater than zero.
const HEALTHY_THRESHOLD: isize = 100;
type JudgerIntercept =
JudgerClient<service::interceptor::InterceptedService<transport::Channel, AuthInterceptor>>;

Expand Down Expand Up @@ -74,7 +75,8 @@ pub struct ConnGuard {

impl ConnGuard {
pub fn report_success(&mut self) {
self.upstream.healthy.fetch_add(2, Ordering::SeqCst);
self.upstream.healthy.fetch_add(3, Ordering::Acquire);
self.upstream.healthy.fetch_min(HEALTHY_THRESHOLD, Ordering::Acquire);
}
}

Expand All @@ -93,7 +95,7 @@ impl std::ops::Deref for ConnGuard {

impl Drop for ConnGuard {
fn drop(&mut self) {
self.upstream.healthy.fetch_add(-1, Ordering::Acquire);
self.upstream.healthy.fetch_add(-2, Ordering::Acquire);
self.upstream.clients.push(self.conn.take().unwrap());
}
}
Expand Down Expand Up @@ -171,24 +173,27 @@ impl Router {

let mut queue = queue.lock();

match queue.pop_front() {
Some(upstream) => {
queue.push_back(upstream.clone());
upstream.get().await
}
None => {
self.routing_table.remove(uuid);
Err(Error::BadArgument("lang"))
loop {
match queue.pop_front() {
Some(upstream) => {
if upstream.is_healthy() {
queue.push_back(upstream.clone());
drop(queue);
return upstream.get().await;
}
}
None => {
self.routing_table.remove(uuid);
return Err(Error::BadArgument("lang"));
}
}
}
}
}

// abstraction for pipelining
pub struct Upstream {
// healthy score
healthy: AtomicIsize,
// if transport layer fail, spawn a new one
clients: Queue<JudgerIntercept>,
connection: ConnectionDetail,
}
Expand All @@ -211,13 +216,16 @@ impl Upstream {
}
Ok((
Arc::new(Self {
healthy: AtomicIsize::new(20),
healthy: AtomicIsize::new(HEALTHY_THRESHOLD),
clients: Queue::new(),
connection: detail,
}),
result,
))
}
/// Tells whether this upstream's health score is still positive.
///
/// The score is read with `Ordering::Acquire`; any value strictly greater
/// than zero counts as healthy, zero or below as unhealthy.
fn is_healthy(&self) -> bool {
    let score = self.healthy.load(Ordering::Acquire);
    score > 0
}
async fn get(self: Arc<Self>) -> Result<ConnGuard, Error> {
let conn = match self.clients.pop() {
Some(x) => x,
Expand Down
2 changes: 1 addition & 1 deletion backend/src/controller/judger/route/swarm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, net::IpAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, net::IpAddr, time::Duration};

use super::{ConnectionDetail, Error};
use crate::init::config;
Expand Down

0 comments on commit bfa944c

Please sign in to comment.