diff --git a/judger/src/grpc/server.rs b/judger/src/grpc/server.rs index f752f8f1..487a706d 100644 --- a/judger/src/grpc/server.rs +++ b/judger/src/grpc/server.rs @@ -11,7 +11,10 @@ use uuid::Uuid; use crate::{ grpc::proto::prelude::judge_response, init::config::CONFIG, - langs::{prelude::Error as LangError, prelude::*}, + langs::{ + prelude::{ArtifactFactory, CompileLog, Error as LangError}, + RequestError, + }, }; use super::proto::prelude::{judger_server::Judger, *}; @@ -23,58 +26,72 @@ fn accuracy() -> u64 { (1000 * 1000 / config.kernel.kernel_hz) as u64 } +impl From for Result { + fn from(value: LangError) -> Self { + match value { + LangError::Internal(err) => { + log::warn!("{}", err); + #[cfg(debug_assertions)] + return Err(Status::with_details( + Code::Internal, + "Lanuage internal error: see debug info", + Bytes::from(format!("{}", err)), + )); + #[cfg(not(debug_assertions))] + Err(Status::internal("See log for more details")) + } + LangError::BadRequest(err) => match err { + RequestError::LangNotFound(uid) => Err(Status::with_details( + Code::FailedPrecondition, + "language with such uuid does not exist on this judger", + Bytes::from(format!("lang_uid: {}", uid)), + )), + }, + LangError::Report(res) => Ok(JudgeResponse { + task: Some(judge_response::Task::Result(JudgeResult { + status: res as i32, + time: 0, + memory: 0, + accuracy: accuracy(), + })), + }), + } + } +} + macro_rules! report { ($result:expr,$tx:expr) => { match $result { Ok(x) => x, - Err(err) => match err { - LangError::Internal(err) => { - log::warn!("{}", err); - #[cfg(debug_assertions)] - $tx.send(Err(Status::with_details( - Code::Internal, - "Lanuage internal error: see debug info", - Bytes::from(format!("{}", err)), - ))) - .await - .ok(); - #[cfg(not(debug_assertions))] - $tx.send(Err(Status::internal("See log for more details"))) - .await - .ok(); - return (); - } - LangError::BadRequest(err) => { - match err { - RequestError::LangNotFound(uid) => $tx - .send(Err(Status::with_details( - Code::FailedPrecondition, - "language with such uuid does not exist on this judger", - Bytes::from(format!("lang_uid: {}", uid)), - ))) - .await - .ok(), - }; - return (); - } - LangError::Report(res) => { - $tx.send(Ok(JudgeResponse { - task: Some(judge_response::Task::Result(JudgeResult { - status: res as i32, - time: 0, - memory: 0, - accuracy: accuracy(), - })), - })) - .await - .ok(); - return (); - } - }, + Err(err) => { + $tx.send(err.into()).await.ok(); + return (); + } } }; } +macro_rules! resud { + ($result:expr) => { + match $result { + Ok(x) => x, + Err(err) => { + log::trace!("{}", err); + return; + } + } + }; +} + +impl From> for Log { + fn from(value: CompileLog<'_>) -> Self { + Log { + level: value.level as u32, + msg: value.message.into_owned(), + } + } +} + // Adapter and abstraction for tonic to serve // utilize artifact factory and other components(in module `langs``) pub struct Server { @@ -174,7 +191,6 @@ impl Judger for Server { &'a self, request: tonic::Request<()>, ) -> Result, Status> { - log::trace!("Query judger info"); let config = CONFIG.get().unwrap(); let (meta, _, _) = request.into_parts(); @@ -189,6 +205,56 @@ impl Judger for Server { cpu_factor: config.platform.cpu_time_multiplier as f32, })) } + + #[doc = " Server streaming response type for the Exec method."] + type ExecStream = Pin> + Send>>; + + #[instrument(skip_all, name = "grpc_exec")] + async fn exec( + &self, + req: tonic::Request, + ) -> Result, tonic::Status> { + let (meta, _, payload) = req.into_parts(); + check_secret(&meta)?; + + let (tx, rx) = mpsc::channel(2); + + let factory = self.factory.clone(); + + let lang_uid = Uuid::parse_str(payload.lang_uid.as_str()).map_err(|e| { + log::warn!("Invalid uuid: {}", e); + Status::failed_precondition("Invalid uuid") + })?; + + tokio::spawn(async move { + let input = payload.input; + let time = payload.time; + let memory = payload.memory; + + let mut compiled = resud!(factory.compile(&lang_uid, &payload.code).await); + + while let Some(x) = compiled + .to_log() + .map(|x| ExecResult { + result: Some(exec_result::Result::Log(x.into())), + }) + .next() + { + resud!(tx.send(Ok(x)).await); + } + + let exec = resud!(compiled.exec(&input, time, memory).await); + + resud!( + tx.send(Ok(ExecResult { + result: Some(exec_result::Result::Output(exec.stdout().to_vec())) + })) + .await + ); + }); + + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } } fn check_secret(meta: &metadata::MetadataMap) -> Result<(), Status> { diff --git a/judger/src/langs/artifact.rs b/judger/src/langs/artifact.rs index b28610c4..5f5b388b 100644 --- a/judger/src/langs/artifact.rs +++ b/judger/src/langs/artifact.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{collections::BTreeMap, path::Path}; @@ -11,7 +12,6 @@ use crate::{init::config::CONFIG, langs::RequestError}; use super::{spec::LangSpec, Error, InternalError}; static TRACING_ID: AtomicUsize = AtomicUsize::new(0); - // Artifact factory, load module from disk to compile code // Rely on container daemon to create container pub struct ArtifactFactory { @@ -95,15 +95,19 @@ impl ArtifactFactory { if !process.succeed() { #[cfg(debug_assertions)] - log::debug!("stdout: {}", String::from_utf8_lossy(&process.stdout)); - dbg!(process.status); + log::warn!("{}", process.status); return Err(Error::Report(JudgerCode::Ce)); } + process.stdout.split(|x| *x == b'\n').for_each(|x| { + CompileLog::from_raw(x).log(); + }); + Ok(CompiledArtifact { container, spec, tracing_id, + stdout: process.stdout, }) } } @@ -117,11 +121,43 @@ impl Default for ArtifactFactory { } } } + +pub struct CompileLog<'a> { + pub level: usize, + pub message: Cow<'a, str>, +} + +impl<'a> CompileLog<'a> { + pub fn from_raw(raw: &'a [u8]) -> Self { + let raw: Vec<&[u8]> = raw.splitn(2, |x| *x == b':').collect(); + if raw.len() == 1 { + Self { + level: 4, + message: String::from_utf8_lossy(raw[0]), + } + } else { + Self { + level: String::from_utf8_lossy(raw[0]).parse().unwrap_or(4), + message: String::from_utf8_lossy(raw[1]), + } + } + } + pub fn log(&self) { + match self.level { + 0 => log::trace!("{}", self.message), + 1 => log::debug!("{}", self.message), + 2 => log::info!("{}", self.message), + 3 => log::warn!("{}", self.message), + _ => log::error!("{}", self.message), + } + } +} // Wrapper for container which contain compiled program in its volume pub struct CompiledArtifact<'a> { container: Container<'a>, spec: &'a LangSpec, tracing_id: usize, + stdout: Vec, } impl<'a> CompiledArtifact<'a> { @@ -164,8 +200,64 @@ impl<'a> CompiledArtifact<'a> { tracing_id: self.tracing_id, }) } + pub async fn exec( + &mut self, + input: &[u8], + time: u64, + memory: u64, + ) -> Result { + log::trace!("Running program -trace:{}", self.tracing_id); + let mut limit = self.spec.judge_limit.clone().apply_platform(); + + limit.cpu_us *= time; + limit.user_mem *= memory; + + let mut process = self + .container + .execute( + self.spec + .judge_args + .iter() + .map(|x| x.as_str()) + .collect::>(), + limit, + ) + .await?; + + process.write_all(input).await?; + + let process = process.wait().await?; + + Ok(ExecResult { + process, + _tracing_id: self.tracing_id, + }) + } + + pub fn to_log(&self) -> impl Iterator { + self.stdout + .split(|&x| x == b'\n') + .map(CompileLog::from_raw) + } +} + +pub struct ExecResult { + process: ExitProc, + _tracing_id: usize, +} + +impl ExecResult { + pub fn time(&self) -> &CpuStatistics { + &self.process.cpu + } + pub fn mem(&self) -> &MemStatistics { + &self.process.mem + } + pub fn stdout(&self) -> &[u8] { + &self.process.stdout + } } -// Wrapper for result of process(ended process) +// Wrapper for result of process(ended judge process) // provide information about process's exitcode, resource usage, stdout, stderr pub struct TaskResult { process: ExitProc, @@ -193,7 +285,7 @@ impl TaskResult { pub fn assert(&self, input: &[u8], mode: JudgeMatchRule) -> bool { let newline = b'\n'; let space = b' '; - log::trace!("Ssserting program -trace:{}", self.tracing_id); + log::trace!("Asserting program -trace:{}", self.tracing_id); let stdout = &self.process.stdout; match mode { diff --git a/judger/src/sandbox/process.rs b/judger/src/sandbox/process.rs index d3f91ad3..bc0e2363 100644 --- a/judger/src/sandbox/process.rs +++ b/judger/src/sandbox/process.rs @@ -86,22 +86,25 @@ impl RunningProc { } let (cpu, mem) = self.limiter.status().await; + let output_limit = config.platform.output_limit as u64; + let _memory_holder = self._memory_holder.downgrade(output_limit); Ok(ExitProc { status, stdout: buf.to_vec(), cpu, mem, + _memory_holder, }) } } -#[derive(Debug)] pub struct ExitProc { pub status: ExitStatus, pub stdout: Vec, pub cpu: CpuStatistics, pub mem: MemStatistics, + _memory_holder: MemoryPermit, } impl ExitProc { diff --git a/judger/src/sandbox/utils/semaphore.rs b/judger/src/sandbox/utils/semaphore.rs index f36d3047..a45c6d42 100644 --- a/judger/src/sandbox/utils/semaphore.rs +++ b/judger/src/sandbox/utils/semaphore.rs @@ -137,6 +137,11 @@ impl MemoryPermit { counter: counter.clone(), } } + pub fn downgrade(mut self, target: u64) -> Self { + self.counter.deallocate(self.memory - target); + self.memory = target; + self + } } impl Drop for MemoryPermit { diff --git a/judger/src/test/sandbox.rs b/judger/src/test/sandbox.rs index d818b53f..6a30e37b 100644 --- a/judger/src/test/sandbox.rs +++ b/judger/src/test/sandbox.rs @@ -47,7 +47,7 @@ async fn cgroup_cpu() { let process = container .execute( - vec!["/rlua-54", "violate","cpu"], + vec!["/rlua-54", "violate", "cpu"], Limit { cpu_us: 1000 * 1000 * 1000, rt_us: 1000 * 1000 * 1000, @@ -81,7 +81,7 @@ async fn network() { let process = container .execute( - vec!["/rlua-54", "violate","net"], + vec!["/rlua-54", "violate", "net"], Limit { cpu_us: 1000 * 1000 * 1000, rt_us: 1000 * 1000 * 1000, @@ -114,7 +114,7 @@ async fn memory() { let process = container .execute( - vec!["/rlua-54", "violate","mem"], + vec!["/rlua-54", "violate", "mem"], Limit { cpu_us: 1000 * 1000 * 1000, rt_us: 1000 * 1000 * 1000, @@ -147,7 +147,7 @@ async fn disk() { let process = container .execute( - vec!["/rlua-54", "violate","disk"], + vec!["/rlua-54", "violate", "disk"], Limit { cpu_us: 1000 * 1000 * 1000, rt_us: 1000 * 1000 * 1000, @@ -181,7 +181,7 @@ async fn syscall() { let process = container .execute( - vec!["/rlua-54", "violate","syscall"], + vec!["/rlua-54", "violate", "syscall"], Limit { cpu_us: 1000 * 1000 * 1000, rt_us: 1000 * 1000 * 1000, diff --git a/proto/judger.proto b/proto/judger.proto index d7faa184..eb93886b 100644 --- a/proto/judger.proto +++ b/proto/judger.proto @@ -16,6 +16,30 @@ message JudgeRequest { repeated TestIO tests = 6; } +message ExecRequest { + required string lang_uid = 1; + required bytes code = 2; + // memory limit in byte + required uint64 memory = 3; + // time limit in nanosecond + required uint64 time = 4; + // len must > 0 + required bytes input = 5; +} + +message Log{ + required uint32 level = 1; + required string msg = 2; +} + +message ExecResult { + oneof result { + // number of test case running(or finished) + bytes output = 1; + Log log = 2; + } +} + message TestIO { required bytes input = 1; required bytes output = 2; @@ -101,4 +125,6 @@ service Judger { rpc Judge(JudgeRequest) returns (stream JudgeResponse); // Get judger info, useful for getting supported language and load balancing rpc JudgerInfo(google.protobuf.Empty) returns (JudgeInfo); + // Execute the sandbox once, OLE also apply + rpc Exec(ExecRequest) returns (stream ExecResult); }