Skip to content

Commit

Permalink
refactor: ♻️ refactor judger
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason0729 committed Dec 19, 2023
1 parent 25dfcf7 commit b145680
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 36 deletions.
File renamed without changes.
3 changes: 0 additions & 3 deletions judger/src/grpc/mod.rs

This file was deleted.

7 changes: 0 additions & 7 deletions judger/src/grpc/stage.rs

This file was deleted.

23 changes: 21 additions & 2 deletions judger/src/langs/artifact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, path::Path};
use tokio::fs;
use uuid::Uuid;

use crate::grpc::proto::prelude::*;
use crate::grpc::prelude::*;
use crate::init::config::CONFIG;
use crate::sandbox::prelude::*;

Expand Down Expand Up @@ -107,19 +107,32 @@ impl Default for ArtifactFactory {
}
}

/// Log generate from language plugin
pub struct CompileLog {
pub level: usize,
pub message: String,
}

impl CompileLog {
/// parse log from raw string, slient error(generate blank message) when malformatted
///
/// according to plugin specification, log should be in following format
///
/// ```text
/// 0:trace message
/// 1:debug message
/// 2:info message
/// 3:warn message
/// 4:error message
/// ````
pub fn from_raw(raw: &[u8]) -> Self {
let raw: Vec<&[u8]> = raw.splitn(2, |x| *x == b':').collect();
Self {
level: String::from_utf8_lossy(raw[0]).parse().unwrap_or(4),
message: String::from_utf8_lossy(raw[1]).to_string(),
}
}
/// log it to the console
pub fn log(&self) {
match self.level {
0 => log::trace!("{}", self.message),
Expand All @@ -131,13 +144,18 @@ impl CompileLog {
}
}
}
// Wrapper for container which contain compiled program in its volume

/// Wrapper for container which contain compiled program in its volume
///
/// TODO: CompiledInner<'a> was actually derive from ExitProc, consider remove CompiledInner<'a>
/// and replace it with ExitProc
pub enum CompiledArtifact<'a> {
Fail(ExitProc),
Success(CompiledInner<'a>),
}

impl<'a> CompiledArtifact<'a> {
/// get JudgerCode if the task is surely at state neither AC or WA
pub fn get_expection(&self) -> Option<JudgerCode> {
match self {
CompiledArtifact::Fail(_) => Some(JudgerCode::Ce),
Expand Down Expand Up @@ -289,6 +307,7 @@ impl TaskResult {
}
}
impl TaskResult {
/// get JudgerCode if the task is surely at state neither AC or WA
pub fn get_expection(&mut self) -> Option<JudgerCode> {
match self {
TaskResult::Fail(x) => Some(*x),
Expand Down
9 changes: 4 additions & 5 deletions judger/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::sync::Arc;

use crate::grpc::server::Server;
use grpc::proto::prelude::judger_server::JudgerServer;
use grpc::prelude::judger_server::JudgerServer;
use init::config::CONFIG;
use tonic::transport;

pub mod grpc;
pub mod server;
pub mod init;
pub mod langs;
pub mod sandbox;
Expand All @@ -20,9 +19,9 @@ async fn main() {

log::info!("Server started");

let server = Server::new().await;
let server = server::Server::new().await;

transport::Server::builder()
tonic::transport::Server::builder()
.add_service(JudgerServer::new(Arc::new(server)))
.serve(addr)
.await
Expand Down
41 changes: 24 additions & 17 deletions judger/src/grpc/server.rs → judger/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// TODO: clean up imports
// TODO: error handling
use std::{pin::Pin, sync::Arc};

use spin::Mutex;
Expand All @@ -9,12 +7,12 @@ use tonic::{Response, Status};
use uuid::Uuid;

use crate::{
grpc::proto::prelude::judge_response,
grpc::prelude::judge_response,
init::config::CONFIG,
langs::prelude::{ArtifactFactory, CompileLog},
};

use super::proto::prelude::{judger_server::Judger, *};
use crate::grpc::prelude::{judger_server::Judger, *};

const PENDING_LIMIT: usize = 128;
const STREAM_CHUNK: usize = 1024 * 16;
Expand Down Expand Up @@ -44,7 +42,8 @@ fn parse_uid(uid: &str) -> Result<Uuid, Status> {
})
}

async fn force_stream<T>(tx: &mut Sender<Result<T, Status>>, item: T) -> Result<(), Status> {
/// forcely stream message, if client disconnect, there's no need to continue task
async fn force_send<T>(tx: &mut Sender<Result<T, Status>>, item: T) -> Result<(), Status> {
match tx.send(Ok(item)).await {
Ok(_) => Ok(()),
Err(err) => {
Expand All @@ -54,6 +53,7 @@ async fn force_stream<T>(tx: &mut Sender<Result<T, Status>>, item: T) -> Result<
}
}

/// check basic auth in request metadata(similar to http header)
fn check_secret<T>(req: tonic::Request<T>) -> Result<T, Status> {
let (meta, _, payload) = req.into_parts();
let config = CONFIG.get().unwrap();
Expand Down Expand Up @@ -83,25 +83,26 @@ impl From<judge_response::Task> for JudgeResponse {
}
}

// Adapter and abstraction for tonic to serve
// utilize artifact factory and other components(in module `langs``)
/// Server to serve JudgerSet
pub struct Server {
factory: ArtifactFactory,
running: Mutex<usize>,
}

impl Server {
/// init the server with global config(must be set beforehand)
pub async fn new() -> Self {
let config = CONFIG.get().unwrap();
let mut factory = ArtifactFactory::default();

factory.load_dir(config.plugin.path.clone()).await;

Self {
factory: factory,
factory,
running: Mutex::new(PENDING_LIMIT),
}
}
/// check if pending jobs excess PENDING_LIMIT
fn check_pending(self: &Arc<Self>) -> Result<PendingGuard, Status> {
let mut running = self.running.lock();
if *running > 0 {
Expand All @@ -121,6 +122,7 @@ impl Drop for PendingGuard {
}
}

/// start each subtasks, it call stream because next subtask is run only if previous AC
async fn judger_stream(
factory: &ArtifactFactory,
payload: JudgeRequest,
Expand All @@ -137,7 +139,7 @@ async fn judger_stream(
compile.log().for_each(|x| x.log());

if let Some(code) = compile.get_expection() {
force_stream(
force_send(
tx,
judge_response::Task::Result(JudgeResult {
status: code.into(),
Expand All @@ -150,7 +152,7 @@ async fn judger_stream(

for (running_task, test) in payload.tests.into_iter().enumerate() {
log::trace!("running at {} task", running_task);
force_stream(
force_send(
tx,
JudgeResponse {
task: Some(judge_response::Task::Case(running_task.try_into().unwrap())),
Expand All @@ -164,7 +166,7 @@ async fn judger_stream(

if let Some(code) = result.get_expection() {
log::trace!("yield result: {}", code);
force_stream(
force_send(
tx,
judge_response::Task::Result(JudgeResult {
status: code.into(),
Expand All @@ -189,7 +191,7 @@ async fn judger_stream(
time,
memory
);
force_stream(
force_send(
tx,
judge_response::Task::Result(JudgeResult {
status: code.into(),
Expand All @@ -204,6 +206,9 @@ async fn judger_stream(
Ok(())
}

/// start compile and execute the program
///
/// In future, we should stream the output to eliminate OLE
async fn exec_stream(
factory: &ArtifactFactory,
payload: ExecRequest,
Expand All @@ -216,11 +221,11 @@ async fn exec_stream(
let mut compile = factory.compile(&lang, &payload.code).await?;

for log in compile.log() {
force_stream(tx, log.into()).await?;
force_send(tx, log.into()).await?;
}

if let Some(_) = compile.get_expection() {
force_stream(
force_send(
tx,
CompileLog {
level: 4,
Expand All @@ -237,7 +242,7 @@ async fn exec_stream(
.await?;

if let Some(x) = judge.get_expection() {
force_stream(
force_send(
tx,
CompileLog {
level: 4,
Expand All @@ -248,7 +253,7 @@ async fn exec_stream(
.await?;
} else {
for chunk in judge.process().unwrap().stdout.chunks(STREAM_CHUNK) {
force_stream(
force_send(
tx,
ExecResult {
result: Some(exec_result::Result::Output(chunk.to_vec())),
Expand All @@ -265,6 +270,7 @@ async fn exec_stream(
impl Judger for Arc<Server> {
type JudgeStream = Pin<Box<dyn futures::Stream<Item = Result<JudgeResponse, Status>> + Send>>;

/// see judger.proto
async fn judge(
&self,
req: tonic::Request<JudgeRequest>,
Expand All @@ -287,6 +293,7 @@ impl Judger for Arc<Server> {

Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
/// see judger.proto
async fn judger_info(&self, req: tonic::Request<()>) -> Result<Response<JudgeInfo>, Status> {
let config = CONFIG.get().unwrap();
check_secret(req)?;
Expand All @@ -301,9 +308,9 @@ impl Judger for Arc<Server> {
}))
}

#[doc = " Server streaming response type for the Exec method."]
type ExecStream = Pin<Box<dyn futures::Stream<Item = Result<ExecResult, Status>> + Send>>;

/// see judger.proto
async fn exec(
&self,
req: tonic::Request<ExecRequest>,
Expand Down
2 changes: 1 addition & 1 deletion judger/src/test/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// use tonic::transport::{self, Endpoint, Uri};
// use tower::service_fn;

// use crate::grpc::proto::prelude::{
// use crate::grpc::prelude::{
// judge_response::Task, judger_client::JudgerClient, judger_server::JudgerServer, *,
// };
// use crate::grpc::server::Server;
Expand Down
2 changes: 1 addition & 1 deletion judger/src/test/langs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use uuid::Uuid;

use crate::{
grpc::proto::prelude::JudgeMatchRule, init::config::CONFIG, langs::prelude::ArtifactFactory,
grpc::prelude::JudgeMatchRule, init::config::CONFIG, langs::prelude::ArtifactFactory,
};

async fn rlua(factory: &mut ArtifactFactory) {
Expand Down

0 comments on commit b145680

Please sign in to comment.