From b145680eac17749e36cb46bf45579860f4b3299d Mon Sep 17 00:00:00 2001 From: eason <30045503+Eason0729@users.noreply.github.com> Date: Wed, 20 Dec 2023 00:40:21 +0800 Subject: [PATCH] refactor: :recycle: refactor judger --- judger/src/{grpc/proto.rs => grpc.rs} | 0 judger/src/grpc/mod.rs | 3 -- judger/src/grpc/stage.rs | 7 ----- judger/src/langs/artifact.rs | 23 +++++++++++++-- judger/src/main.rs | 9 +++--- judger/src/{grpc => }/server.rs | 41 ++++++++++++++++----------- judger/src/test/grpc.rs | 2 +- judger/src/test/langs.rs | 2 +- 8 files changed, 51 insertions(+), 36 deletions(-) rename judger/src/{grpc/proto.rs => grpc.rs} (100%) delete mode 100644 judger/src/grpc/mod.rs delete mode 100644 judger/src/grpc/stage.rs rename judger/src/{grpc => }/server.rs (89%) diff --git a/judger/src/grpc/proto.rs b/judger/src/grpc.rs similarity index 100% rename from judger/src/grpc/proto.rs rename to judger/src/grpc.rs diff --git a/judger/src/grpc/mod.rs b/judger/src/grpc/mod.rs deleted file mode 100644 index 04a9002c..00000000 --- a/judger/src/grpc/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod proto; -pub mod server; -pub mod stage; diff --git a/judger/src/grpc/stage.rs b/judger/src/grpc/stage.rs deleted file mode 100644 index d21042d4..00000000 --- a/judger/src/grpc/stage.rs +++ /dev/null @@ -1,7 +0,0 @@ -// pub struct CompileOpt{ - -// } - -// pub struct Compile{ - -// } diff --git a/judger/src/langs/artifact.rs b/judger/src/langs/artifact.rs index c2c0f64e..00a5e4d4 100644 --- a/judger/src/langs/artifact.rs +++ b/judger/src/langs/artifact.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, path::Path}; use tokio::fs; use uuid::Uuid; -use crate::grpc::proto::prelude::*; +use crate::grpc::prelude::*; use crate::init::config::CONFIG; use crate::sandbox::prelude::*; @@ -107,12 +107,24 @@ impl Default for ArtifactFactory { } } +/// Log generate from language plugin pub struct CompileLog { pub level: usize, pub message: String, } impl CompileLog { + /// parse log from raw string, slient error(generate blank message) when malformatted + /// + /// according to plugin specification, log should be in following format + /// + /// ```text + /// 0:trace message + /// 1:debug message + /// 2:info message + /// 3:warn message + /// 4:error message + /// ```` pub fn from_raw(raw: &[u8]) -> Self { let raw: Vec<&[u8]> = raw.splitn(2, |x| *x == b':').collect(); Self { @@ -120,6 +132,7 @@ impl CompileLog { message: String::from_utf8_lossy(raw[1]).to_string(), } } + /// log it to the console pub fn log(&self) { match self.level { 0 => log::trace!("{}", self.message), @@ -131,13 +144,18 @@ impl CompileLog { } } } -// Wrapper for container which contain compiled program in its volume + +/// Wrapper for container which contain compiled program in its volume +/// +/// TODO: CompiledInner<'a> was actually derive from ExitProc, consider remove CompiledInner<'a> +/// and replace it with ExitProc pub enum CompiledArtifact<'a> { Fail(ExitProc), Success(CompiledInner<'a>), } impl<'a> CompiledArtifact<'a> { + /// get JudgerCode if the task is surely at state neither AC or WA pub fn get_expection(&self) -> Option { match self { CompiledArtifact::Fail(_) => Some(JudgerCode::Ce), @@ -289,6 +307,7 @@ impl TaskResult { } } impl TaskResult { + /// get JudgerCode if the task is surely at state neither AC or WA pub fn get_expection(&mut self) -> Option { match self { TaskResult::Fail(x) => Some(*x), diff --git a/judger/src/main.rs b/judger/src/main.rs index dd824eeb..f21181f2 100644 --- a/judger/src/main.rs +++ b/judger/src/main.rs @@ -1,11 +1,10 @@ use std::sync::Arc; -use crate::grpc::server::Server; -use grpc::proto::prelude::judger_server::JudgerServer; +use grpc::prelude::judger_server::JudgerServer; use init::config::CONFIG; -use tonic::transport; pub mod grpc; +pub mod server; pub mod init; pub mod langs; pub mod sandbox; @@ -20,9 +19,9 @@ async fn main() { log::info!("Server started"); - let server = Server::new().await; + let server = server::Server::new().await; - transport::Server::builder() + tonic::transport::Server::builder() .add_service(JudgerServer::new(Arc::new(server))) .serve(addr) .await diff --git a/judger/src/grpc/server.rs b/judger/src/server.rs similarity index 89% rename from judger/src/grpc/server.rs rename to judger/src/server.rs index 65162edc..47d6b22a 100644 --- a/judger/src/grpc/server.rs +++ b/judger/src/server.rs @@ -1,5 +1,3 @@ -// TODO: clean up imports -// TODO: error handling use std::{pin::Pin, sync::Arc}; use spin::Mutex; @@ -9,12 +7,12 @@ use tonic::{Response, Status}; use uuid::Uuid; use crate::{ - grpc::proto::prelude::judge_response, + grpc::prelude::judge_response, init::config::CONFIG, langs::prelude::{ArtifactFactory, CompileLog}, }; -use super::proto::prelude::{judger_server::Judger, *}; +use crate::grpc::prelude::{judger_server::Judger, *}; const PENDING_LIMIT: usize = 128; const STREAM_CHUNK: usize = 1024 * 16; @@ -44,7 +42,8 @@ fn parse_uid(uid: &str) -> Result { }) } -async fn force_stream(tx: &mut Sender>, item: T) -> Result<(), Status> { +/// forcely stream message, if client disconnect, there's no need to continue task +async fn force_send(tx: &mut Sender>, item: T) -> Result<(), Status> { match tx.send(Ok(item)).await { Ok(_) => Ok(()), Err(err) => { @@ -54,6 +53,7 @@ async fn force_stream(tx: &mut Sender>, item: T) -> Result< } } +/// check basic auth in request metadata(similar to http header) fn check_secret(req: tonic::Request) -> Result { let (meta, _, payload) = req.into_parts(); let config = CONFIG.get().unwrap(); @@ -83,14 +83,14 @@ impl From for JudgeResponse { } } -// Adapter and abstraction for tonic to serve -// utilize artifact factory and other components(in module `langs``) +/// Server to serve JudgerSet pub struct Server { factory: ArtifactFactory, running: Mutex, } impl Server { + /// init the server with global config(must be set beforehand) pub async fn new() -> Self { let config = CONFIG.get().unwrap(); let mut factory = ArtifactFactory::default(); @@ -98,10 +98,11 @@ impl Server { factory.load_dir(config.plugin.path.clone()).await; Self { - factory: factory, + factory, running: Mutex::new(PENDING_LIMIT), } } + /// check if pending jobs excess PENDING_LIMIT fn check_pending(self: &Arc) -> Result { let mut running = self.running.lock(); if *running > 0 { @@ -121,6 +122,7 @@ impl Drop for PendingGuard { } } +/// start each subtasks, it call stream because next subtask is run only if previous AC async fn judger_stream( factory: &ArtifactFactory, payload: JudgeRequest, @@ -137,7 +139,7 @@ async fn judger_stream( compile.log().for_each(|x| x.log()); if let Some(code) = compile.get_expection() { - force_stream( + force_send( tx, judge_response::Task::Result(JudgeResult { status: code.into(), @@ -150,7 +152,7 @@ async fn judger_stream( for (running_task, test) in payload.tests.into_iter().enumerate() { log::trace!("running at {} task", running_task); - force_stream( + force_send( tx, JudgeResponse { task: Some(judge_response::Task::Case(running_task.try_into().unwrap())), @@ -164,7 +166,7 @@ async fn judger_stream( if let Some(code) = result.get_expection() { log::trace!("yield result: {}", code); - force_stream( + force_send( tx, judge_response::Task::Result(JudgeResult { status: code.into(), @@ -189,7 +191,7 @@ async fn judger_stream( time, memory ); - force_stream( + force_send( tx, judge_response::Task::Result(JudgeResult { status: code.into(), @@ -204,6 +206,9 @@ async fn judger_stream( Ok(()) } +/// start compile and execute the program +/// +/// In future, we should stream the output to eliminate OLE async fn exec_stream( factory: &ArtifactFactory, payload: ExecRequest, @@ -216,11 +221,11 @@ async fn exec_stream( let mut compile = factory.compile(&lang, &payload.code).await?; for log in compile.log() { - force_stream(tx, log.into()).await?; + force_send(tx, log.into()).await?; } if let Some(_) = compile.get_expection() { - force_stream( + force_send( tx, CompileLog { level: 4, @@ -237,7 +242,7 @@ async fn exec_stream( .await?; if let Some(x) = judge.get_expection() { - force_stream( + force_send( tx, CompileLog { level: 4, @@ -248,7 +253,7 @@ async fn exec_stream( .await?; } else { for chunk in judge.process().unwrap().stdout.chunks(STREAM_CHUNK) { - force_stream( + force_send( tx, ExecResult { result: Some(exec_result::Result::Output(chunk.to_vec())), @@ -265,6 +270,7 @@ async fn exec_stream( impl Judger for Arc { type JudgeStream = Pin> + Send>>; + /// see judger.proto async fn judge( &self, req: tonic::Request, @@ -287,6 +293,7 @@ impl Judger for Arc { Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } + /// see judger.proto async fn judger_info(&self, req: tonic::Request<()>) -> Result, Status> { let config = CONFIG.get().unwrap(); check_secret(req)?; @@ -301,9 +308,9 @@ impl Judger for Arc { })) } - #[doc = " Server streaming response type for the Exec method."] type ExecStream = Pin> + Send>>; + /// see judger.proto async fn exec( &self, req: tonic::Request, diff --git a/judger/src/test/grpc.rs b/judger/src/test/grpc.rs index 61863332..bb8fdf18 100644 --- a/judger/src/test/grpc.rs +++ b/judger/src/test/grpc.rs @@ -5,7 +5,7 @@ // use tonic::transport::{self, Endpoint, Uri}; // use tower::service_fn; -// use crate::grpc::proto::prelude::{ +// use crate::grpc::prelude::{ // judge_response::Task, judger_client::JudgerClient, judger_server::JudgerServer, *, // }; // use crate::grpc::server::Server; diff --git a/judger/src/test/langs.rs b/judger/src/test/langs.rs index 68837c9d..fba590d4 100644 --- a/judger/src/test/langs.rs +++ b/judger/src/test/langs.rs @@ -1,7 +1,7 @@ use uuid::Uuid; use crate::{ - grpc::proto::prelude::JudgeMatchRule, init::config::CONFIG, langs::prelude::ArtifactFactory, + grpc::prelude::JudgeMatchRule, init::config::CONFIG, langs::prelude::ArtifactFactory, }; async fn rlua(factory: &mut ArtifactFactory) {