@@ -3,8 +3,6 @@ pub mod swarm;
3
3
4
4
use super :: Error ;
5
5
use std:: {
6
- collections:: VecDeque ,
7
- mem:: MaybeUninit ,
8
6
ops:: DerefMut ,
9
7
sync:: {
10
8
atomic:: { AtomicIsize , Ordering } ,
@@ -13,8 +11,8 @@ use std::{
13
11
time:: Duration ,
14
12
} ;
15
13
16
- use lockfree :: { map :: Map , queue :: Queue , set :: Set } ;
17
- use spin :: Mutex ;
14
+ use crossbeam_queue :: SegQueue ;
15
+ use dashmap :: { DashMap , DashSet } ;
18
16
use tonic:: { service:: Interceptor , * } ;
19
17
use uuid:: Uuid ;
20
18
@@ -24,7 +22,9 @@ use crate::{
24
22
} ;
25
23
26
24
// introduce routing layer error
27
-
25
+ // type Map<K, V> = Mutex<HashMap<K, V>>;
26
+ // type Queue<V> = Mutex<VecDeque<V>>;
27
+ // type Set<V> = Mutex<HashSet<V>>;
28
28
const HEALTHY_THRESHOLD : isize = 100 ;
29
29
type JudgerIntercept = JudgerClient <
30
30
service:: interceptor:: InterceptedService < transport:: Channel , BasicAuthInterceptor > ,
@@ -98,7 +98,9 @@ impl std::ops::Deref for ConnGuard {
98
98
impl Drop for ConnGuard {
99
99
fn drop ( & mut self ) {
100
100
self . upstream . healthy . fetch_add ( -2 , Ordering :: Acquire ) ;
101
- self . upstream . clients . push ( self . conn . take ( ) . unwrap ( ) ) ;
101
+ self . upstream
102
+ . clients
103
+ . push ( self . conn . take ( ) . unwrap ( ) ) ;
102
104
}
103
105
}
104
106
@@ -117,11 +119,11 @@ async fn discover<I: Routable + Send>(
117
119
} ;
118
120
let ( upstream, langs) = Upstream :: new ( detail) . await ?;
119
121
for ( uuid, lang) in langs. into_iter ( ) {
120
- router. langs . insert ( lang) . ok ( ) ;
122
+ router. langs . insert ( lang) ;
121
123
loop {
122
124
match router. routing_table . get ( & uuid) {
123
125
Some ( x) => {
124
- x. 1 . lock ( ) . push_back ( upstream. clone ( ) ) ;
126
+ x. push ( upstream. clone ( ) ) ;
125
127
break ;
126
128
}
127
129
None => {
@@ -139,17 +141,17 @@ async fn discover<I: Routable + Send>(
139
141
}
140
142
141
143
pub struct Router {
142
- routing_table : Map < Uuid , Mutex < VecDeque < Arc < Upstream > > > > ,
143
- pub langs : Set < LangInfo > ,
144
+ routing_table : DashMap < Uuid , SegQueue < Arc < Upstream > > > ,
145
+ pub langs : DashSet < LangInfo > ,
144
146
}
145
147
146
148
impl Router {
147
149
// skip because config contain basic auth secret
148
150
#[ tracing:: instrument( level = "debug" , skip_all) ]
149
151
pub async fn new ( config : Vec < JudgerConfig > ) -> Result < Arc < Self > , Error > {
150
152
let self_ = Arc :: new ( Self {
151
- routing_table : Map :: new ( ) ,
152
- langs : Set :: new ( ) ,
153
+ routing_table : DashMap :: default ( ) ,
154
+ langs : DashSet :: default ( ) ,
153
155
} ) ;
154
156
for config in config. into_iter ( ) {
155
157
match config. judger_type {
@@ -170,25 +172,19 @@ impl Router {
170
172
Ok ( self_)
171
173
}
172
174
pub async fn get ( & self , lang : & Uuid ) -> Result < ConnGuard , Error > {
173
- let queue = self
174
- . routing_table
175
- . get ( lang)
176
- . ok_or ( Error :: BadArgument ( "lang" ) ) ?;
177
- let ( uuid, queue) = queue. as_ref ( ) ;
178
175
179
- let mut queue = queue . lock ( ) ;
176
+ let queue = self . routing_table . get ( lang ) . ok_or ( Error :: BadArgument ( "lang" ) ) ? ;
180
177
181
178
loop {
182
- match queue. pop_front ( ) {
179
+ match queue. pop ( ) {
183
180
Some ( upstream) => {
184
181
if upstream. is_healthy ( ) {
185
- queue. push_back ( upstream. clone ( ) ) ;
186
- drop ( queue) ;
182
+ queue. push ( upstream. clone ( ) ) ;
187
183
return upstream. get ( ) . await ;
188
184
}
189
185
}
190
186
None => {
191
- self . routing_table . remove ( uuid ) ;
187
+ self . routing_table . remove ( lang ) ;
192
188
return Err ( Error :: BadArgument ( "lang" ) ) ;
193
189
}
194
190
}
@@ -199,7 +195,7 @@ impl Router {
199
195
// abstraction for pipelining
200
196
pub struct Upstream {
201
197
healthy : AtomicIsize ,
202
- clients : Queue < JudgerIntercept > ,
198
+ clients : SegQueue < JudgerIntercept > ,
203
199
connection : ConnectionDetail ,
204
200
}
205
201
@@ -221,9 +217,9 @@ impl Upstream {
221
217
result. push ( ( uuid, lang) ) ;
222
218
}
223
219
224
- let clients = Queue :: new ( ) ;
220
+ let clients = SegQueue :: default ( ) ;
225
221
clients. push ( client) ;
226
-
222
+
227
223
Ok ( (
228
224
Arc :: new ( Self {
229
225
healthy : AtomicIsize :: new ( HEALTHY_THRESHOLD ) ,
0 commit comments