diff --git a/backend/src/controller/judger/route/mod.rs b/backend/src/controller/judger/route/mod.rs index c6bc7af9..0292a79d 100644 --- a/backend/src/controller/judger/route/mod.rs +++ b/backend/src/controller/judger/route/mod.rs @@ -4,7 +4,7 @@ pub mod swarm; use super::Error; use std::{ - collections::{BTreeMap, VecDeque}, + collections::VecDeque, ops::DerefMut, sync::{ atomic::{AtomicIsize, Ordering}, @@ -15,7 +15,7 @@ use std::{ use lockfree::{map::Map, queue::Queue, set::Set}; use spin::Mutex; -use tonic::{service::Interceptor,*}; +use tonic::{service::Interceptor, *}; use uuid::Uuid; use crate::{ @@ -25,6 +25,7 @@ use crate::{ // introduce routing layer error +const HEALTHY_THRESHOLD: isize = 100; type JudgerIntercept = JudgerClient>; @@ -74,7 +75,8 @@ pub struct ConnGuard { impl ConnGuard { pub fn report_success(&mut self) { - self.upstream.healthy.fetch_add(2, Ordering::SeqCst); + self.upstream.healthy.fetch_add(3, Ordering::Acquire); + self.upstream.healthy.fetch_min(HEALTHY_THRESHOLD, Ordering::Acquire); } } @@ -93,7 +95,7 @@ impl std::ops::Deref for ConnGuard { impl Drop for ConnGuard { fn drop(&mut self) { - self.upstream.healthy.fetch_add(-1, Ordering::Acquire); + self.upstream.healthy.fetch_add(-2, Ordering::Acquire); self.upstream.clients.push(self.conn.take().unwrap()); } } @@ -171,14 +173,19 @@ impl Router { let mut queue = queue.lock(); - match queue.pop_front() { - Some(upstream) => { - queue.push_back(upstream.clone()); - upstream.get().await - } - None => { - self.routing_table.remove(uuid); - Err(Error::BadArgument("lang")) + loop { + match queue.pop_front() { + Some(upstream) => { + if upstream.is_healthy() { + queue.push_back(upstream.clone()); + drop(queue); + return upstream.get().await; + } + } + None => { + self.routing_table.remove(uuid); + return Err(Error::BadArgument("lang")); + } } } } @@ -186,9 +193,7 @@ impl Router { // abstraction for pipelining pub struct Upstream { - // healthy score healthy: AtomicIsize, - // if transport layer fail, spawn a new one clients: Queue, connection: ConnectionDetail, } @@ -211,13 +216,16 @@ impl Upstream { } Ok(( Arc::new(Self { - healthy: AtomicIsize::new(20), + healthy: AtomicIsize::new(HEALTHY_THRESHOLD), clients: Queue::new(), connection: detail, }), result, )) } + fn is_healthy(&self) -> bool { + self.healthy.load(Ordering::Acquire) > 0 + } async fn get(self: Arc) -> Result { let conn = match self.clients.pop() { Some(x) => x, diff --git a/backend/src/controller/judger/route/swarm.rs b/backend/src/controller/judger/route/swarm.rs index 7be9feea..3e0205cb 100644 --- a/backend/src/controller/judger/route/swarm.rs +++ b/backend/src/controller/judger/route/swarm.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, net::IpAddr, sync::Arc, time::Duration}; +use std::{collections::HashSet, net::IpAddr, time::Duration}; use super::{ConnectionDetail, Error}; use crate::init::config;