Skip to content

Commit d9e7a98

Browse files
committed
feat: 🚧 draft playground set
1 parent 06c5260 commit d9e7a98

File tree

8 files changed

+163
-10
lines changed

8 files changed

+163
-10
lines changed

backend/Cargo.lock

Lines changed: 25 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ base64 = "0.21.5"
2828
uuid = "1.5.0"
2929
tonic-web = "0.9.2"
3030
quick_cache = "0.4.0"
31+
itertools = "0.12.0"
32+
leaky-bucket = "1.0.1"
3133

3234
[dependencies.tokio-stream]
3335
version = "0.1.14"
@@ -43,7 +45,7 @@ features =["async-await", "log"]
4345

4446
[dependencies.tokio]
4547
version = "1.28.0"
46-
features =["macros", "rt-multi-thread", "full"]
48+
features =["macros", "rt-multi-thread", "full","time"]
4749

4850
[dependencies.sea-orm]
4951
version = "0.12.2"

backend/src/controller/submit.rs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
use std::sync::Arc;
22

3+
use futures::Future;
4+
use leaky_bucket::RateLimiter;
35
use sea_orm::{ActiveModelTrait, ActiveValue, EntityTrait, QueryOrder};
46
use thiserror::Error;
57
use tokio_stream::StreamExt;
68
use tonic::Status;
79
use uuid::Uuid;
810

911
use crate::{
12+
endpoint::util::{error, stream::TonicStream},
1013
grpc::{
11-
backend::{submit_status, JudgeResult as BackendResult, SubmitStatus},
12-
judger::{judge_response, JudgeRequest, JudgeResponse, JudgeResult, JudgerCode, TestIo},
14+
backend::{submit_status, JudgeResult as BackendResult, PlaygroundResult, SubmitStatus},
15+
judger::{
16+
judge_response, JudgeRequest, JudgeResponse, JudgeResult, JudgerCode, LangInfo, TestIo,
17+
},
1318
},
1419
init::{config::CONFIG, db::DB},
1520
};
@@ -21,7 +26,26 @@ use super::util::{
2126
};
2227
use entity::*;
2328

24-
type TonicStream<T> = std::pin::Pin<Box<dyn tokio_stream::Stream<Item = Result<T, Status>> + Send>>;
29+
struct Waker;
30+
31+
impl std::task::Wake for Waker {
32+
fn wake(self: Arc<Self>) {
33+
log::error!("waker wake");
34+
}
35+
}
36+
37+
macro_rules! check_rate_limit {
38+
($s:expr) => {{
39+
let waker = Arc::new(Waker).into();
40+
let mut cx = std::task::Context::from_waker(&waker);
41+
42+
let ac = $s.limiter.clone().acquire_owned(1);
43+
tokio::pin!(ac);
44+
if ac.as_mut().poll(&mut cx).is_pending() {
45+
return Err(Error::RateLimit);
46+
}
47+
}};
48+
}
2549

2650
#[derive(Debug, Error)]
2751
pub enum Error {
@@ -39,8 +63,8 @@ pub enum Error {
3963
Tonic(#[from] tonic::transport::Error),
4064
#[error("`{0}`")]
4165
Internal(&'static str),
42-
// #[error("judger tls error")]
43-
// TlsError,
66+
#[error("Rate limit exceeded")]
67+
RateLimit,
4468
}
4569

4670
impl From<Error> for super::Error {
@@ -58,6 +82,9 @@ impl From<Error> for super::Error {
5882
Error::Tonic(x) => super::Error::Tonic(x),
5983
Error::Internal(x) => super::Error::Internal(x),
6084
Error::GrpcReport(x) => super::Error::GrpcReport(x),
85+
Error::RateLimit => super::Error::GrpcReport(tonic::Status::resource_exhausted(
86+
"judge rate limit exceeded",
87+
)),
6188
}
6289
}
6390
}
@@ -93,10 +120,10 @@ impl From<JudgeResult> for SubmitStatus {
93120
}
94121
}
95122

96-
#[derive(Clone)]
97123
pub struct SubmitController {
98124
router: Arc<Router>,
99125
pubsub: Arc<PubSub<Result<SubmitStatus, Status>, i32>>,
126+
limiter: Arc<RateLimiter>,
100127
}
101128

102129
impl SubmitController {
@@ -105,6 +132,14 @@ impl SubmitController {
105132
Ok(SubmitController {
106133
router: Router::new(&config.judger).await?,
107134
pubsub: Arc::new(PubSub::default()),
135+
limiter: Arc::new(
136+
RateLimiter::builder()
137+
.max(25)
138+
.initial(10)
139+
.refill(2)
140+
.interval(std::time::Duration::from_millis(100))
141+
.build(),
142+
),
108143
})
109144
}
110145
async fn stream(
@@ -170,6 +205,7 @@ impl SubmitController {
170205
}
171206
}
172207
pub async fn submit(&self, submit: Submit) -> Result<i32, Error> {
208+
check_rate_limit!(self);
173209
let db = DB.get().unwrap();
174210

175211
let mut conn = self.router.get(&submit.lang).await?;
@@ -225,6 +261,13 @@ impl SubmitController {
225261
pub async fn follow(&self, submit_id: i32) -> Option<TonicStream<SubmitStatus>> {
226262
self.pubsub.subscribe(&submit_id)
227263
}
264+
pub fn list_lang(&self) -> Vec<LangInfo> {
265+
self.router.langs()
266+
}
267+
pub async fn playground(&self) -> Result<TonicStream<PlaygroundResult>, Error> {
268+
check_rate_limit!(self);
269+
todo!()
270+
}
228271
}
229272

230273
impl From<Error> for Status {

backend/src/controller/util/router.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
},
88
};
99

10+
use itertools::Itertools;
1011
use spin::{Mutex, RwLock};
1112
use tonic::*;
1213
use uuid::Uuid;
@@ -148,6 +149,9 @@ impl Upstream {
148149

149150
self_
150151
}
152+
fn langs(&self) -> Vec<LangInfo> {
153+
self.langs.read().values().cloned().collect()
154+
}
151155
async fn health_check(&self) -> Result<(), Error> {
152156
macro_rules! health {
153157
($e:expr) => {
@@ -208,6 +212,14 @@ impl Router {
208212
next_entry: AtomicUsize::new(0),
209213
}))
210214
}
215+
pub fn langs(&self) -> Vec<LangInfo> {
216+
self.upstreams
217+
.iter()
218+
.map(|x| x.langs())
219+
.flatten()
220+
.unique_by(|x| x.lang_uid.clone())
221+
.collect()
222+
}
211223
pub async fn get(&self, uid: &Uuid) -> Result<ConnGuard, Error> {
212224
let server_count = self.upstreams.len();
213225
for _ in 0..(server_count * 2 + 1) {

backend/src/endpoint/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod contest;
22
mod education;
3+
pub mod playground;
34
pub mod problem;
45
pub mod submit;
56
pub mod testcase;

backend/src/endpoint/playground.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use super::endpoints::*;
2+
use super::tools::*;
3+
use super::util::stream::TonicStream;
4+
5+
use crate::grpc::backend::playground_set_server::*;
6+
use crate::grpc::backend::*;
7+
8+
#[async_trait]
9+
impl PlaygroundSet for Arc<Server> {
10+
#[doc = " Server streaming response type for the Run method."]
11+
type RunStream = TonicStream<PlaygroundResult>;
12+
13+
async fn run(
14+
&self,
15+
req: Request<PlaygroundRequest>,
16+
) -> Result<Response<Self::RunStream>, Status> {
17+
Err(Status::unimplemented("Not implemented"))
18+
}
19+
}

backend/src/endpoint/submit.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::controller::util::code::Code;
88
use crate::grpc::backend::submit_set_server::*;
99
use crate::grpc::backend::StateCode as BackendCode;
1010
use crate::grpc::backend::*;
11+
use crate::grpc::judger::LangInfo;
1112

1213
use entity::{submit::*, *};
1314
use tokio_stream::wrappers::ReceiverStream;
@@ -249,4 +250,26 @@ impl SubmitSet for Arc<Server> {
249250

250251
Ok(Response::new(()))
251252
}
253+
254+
#[doc = " Server streaming response type for the ListLangs method."]
255+
type ListLangsStream = TonicStream<Language>;
256+
257+
async fn list_langs(&self, _: Request<()>) -> Result<Response<Self::ListLangsStream>, Status> {
258+
let langs = self.submit.list_lang().into_iter().map(|x| Ok(x.into()));
259+
260+
Ok(Response::new(
261+
Box::pin(tokio_stream::iter(langs)) as TonicStream<_>
262+
))
263+
}
264+
}
265+
266+
impl From<LangInfo> for Language {
267+
fn from(value: LangInfo) -> Self {
268+
Language {
269+
lang_uid: value.lang_uid,
270+
lang_name: value.lang_name,
271+
info: value.info,
272+
lang_ext: value.lang_ext,
273+
}
274+
}
252275
}

proto/backend.proto

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ message RejudgeRequest {
152152
required string request_id = 2;
153153
}
154154

155+
message Language {
156+
required string lang_uid = 1;
157+
required string lang_name = 2;
158+
required string info = 3;
159+
required string lang_ext = 4;
160+
}
161+
155162
service SubmitSet {
156163
rpc List(ListRequest) returns (ListSubmitResponse);
157164
rpc Info(SubmitId) returns (SubmitInfo);
@@ -164,6 +171,8 @@ service SubmitSet {
164171
// are not guarantee to yield status
165172
rpc Follow(SubmitId) returns (stream SubmitStatus);
166173
rpc Rejudge(RejudgeRequest) returns (google.protobuf.Empty);
174+
175+
rpc ListLangs(google.protobuf.Empty) returns (stream Language);
167176
}
168177

169178
// Announcements
@@ -615,3 +624,25 @@ service TokenSet {
615624

616625
rpc Logout(google.protobuf.Empty) returns (google.protobuf.Empty);
617626
}
627+
628+
message PlaygroundRequest{
629+
required bytes code=1;
630+
required string lang=2;
631+
required bytes input=3;
632+
}
633+
634+
message CompileLog{
635+
required uint32 level = 1;
636+
required string msg = 2;
637+
}
638+
639+
message PlaygroundResult{
640+
oneof result{
641+
CompileLog compile=1;
642+
bytes output=2;
643+
}
644+
}
645+
646+
service PlaygroundSet{
647+
rpc Run(PlaygroundRequest) returns (stream PlaygroundResult);
648+
}

0 commit comments

Comments
 (0)