Skip to content

Commit

Permalink
feat: ✨ improve judger memory protection, add exec rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason0729 committed Nov 30, 2023
1 parent c0c9621 commit 06c5260
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 57 deletions.
158 changes: 112 additions & 46 deletions judger/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use uuid::Uuid;
use crate::{
grpc::proto::prelude::judge_response,
init::config::CONFIG,
langs::{prelude::Error as LangError, prelude::*},
langs::{
prelude::{ArtifactFactory, CompileLog, Error as LangError},
RequestError,
},
};

use super::proto::prelude::{judger_server::Judger, *};
Expand All @@ -23,58 +26,72 @@ fn accuracy() -> u64 {
(1000 * 1000 / config.kernel.kernel_hz) as u64
}

impl From<LangError> for Result<JudgeResponse, Status> {
fn from(value: LangError) -> Self {
match value {
LangError::Internal(err) => {
log::warn!("{}", err);
#[cfg(debug_assertions)]
return Err(Status::with_details(
Code::Internal,
"Lanuage internal error: see debug info",
Bytes::from(format!("{}", err)),
));
#[cfg(not(debug_assertions))]
Err(Status::internal("See log for more details"))
}
LangError::BadRequest(err) => match err {
RequestError::LangNotFound(uid) => Err(Status::with_details(
Code::FailedPrecondition,
"language with such uuid does not exist on this judger",
Bytes::from(format!("lang_uid: {}", uid)),
)),
},
LangError::Report(res) => Ok(JudgeResponse {
task: Some(judge_response::Task::Result(JudgeResult {
status: res as i32,
time: 0,
memory: 0,
accuracy: accuracy(),
})),
}),
}
}
}

macro_rules! report {
($result:expr,$tx:expr) => {
match $result {
Ok(x) => x,
Err(err) => match err {
LangError::Internal(err) => {
log::warn!("{}", err);
#[cfg(debug_assertions)]
$tx.send(Err(Status::with_details(
Code::Internal,
"Lanuage internal error: see debug info",
Bytes::from(format!("{}", err)),
)))
.await
.ok();
#[cfg(not(debug_assertions))]
$tx.send(Err(Status::internal("See log for more details")))
.await
.ok();
return ();
}
LangError::BadRequest(err) => {
match err {
RequestError::LangNotFound(uid) => $tx
.send(Err(Status::with_details(
Code::FailedPrecondition,
"language with such uuid does not exist on this judger",
Bytes::from(format!("lang_uid: {}", uid)),
)))
.await
.ok(),
};
return ();
}
LangError::Report(res) => {
$tx.send(Ok(JudgeResponse {
task: Some(judge_response::Task::Result(JudgeResult {
status: res as i32,
time: 0,
memory: 0,
accuracy: accuracy(),
})),
}))
.await
.ok();
return ();
}
},
Err(err) => {
$tx.send(err.into()).await.ok();
return ();
}
}
};
}

macro_rules! resud {
($result:expr) => {
match $result {
Ok(x) => x,
Err(err) => {
log::trace!("{}", err);
return;
}
}
};
}

impl From<CompileLog<'_>> for Log {
fn from(value: CompileLog<'_>) -> Self {
Log {
level: value.level as u32,
msg: value.message.into_owned(),
}
}
}

// Adapter and abstraction for tonic to serve
// utilize artifact factory and other components(in module `langs``)
pub struct Server {
Expand Down Expand Up @@ -174,7 +191,6 @@ impl Judger for Server {
&'a self,
request: tonic::Request<()>,
) -> Result<Response<JudgeInfo>, Status> {
log::trace!("Query judger info");
let config = CONFIG.get().unwrap();

let (meta, _, _) = request.into_parts();
Expand All @@ -189,6 +205,56 @@ impl Judger for Server {
cpu_factor: config.platform.cpu_time_multiplier as f32,
}))
}

#[doc = " Server streaming response type for the Exec method."]
type ExecStream = Pin<Box<dyn futures::Stream<Item = Result<ExecResult, Status>> + Send>>;

#[instrument(skip_all, name = "grpc_exec")]
async fn exec(
&self,
req: tonic::Request<ExecRequest>,
) -> Result<Response<Self::ExecStream>, tonic::Status> {
let (meta, _, payload) = req.into_parts();
check_secret(&meta)?;

let (tx, rx) = mpsc::channel(2);

let factory = self.factory.clone();

let lang_uid = Uuid::parse_str(payload.lang_uid.as_str()).map_err(|e| {
log::warn!("Invalid uuid: {}", e);
Status::failed_precondition("Invalid uuid")
})?;

tokio::spawn(async move {
let input = payload.input;
let time = payload.time;
let memory = payload.memory;

let mut compiled = resud!(factory.compile(&lang_uid, &payload.code).await);

while let Some(x) = compiled
.to_log()
.map(|x| ExecResult {
result: Some(exec_result::Result::Log(x.into())),
})
.next()
{
resud!(tx.send(Ok(x)).await);
}

let exec = resud!(compiled.exec(&input, time, memory).await);

resud!(
tx.send(Ok(ExecResult {
result: Some(exec_result::Result::Output(exec.stdout().to_vec()))
}))
.await
);
});

Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
}

fn check_secret(meta: &metadata::MetadataMap) -> Result<(), Status> {
Expand Down
102 changes: 97 additions & 5 deletions judger/src/langs/artifact.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::BTreeMap, path::Path};

Expand All @@ -11,7 +12,6 @@ use crate::{init::config::CONFIG, langs::RequestError};
use super::{spec::LangSpec, Error, InternalError};

static TRACING_ID: AtomicUsize = AtomicUsize::new(0);

// Artifact factory, load module from disk to compile code
// Rely on container daemon to create container
pub struct ArtifactFactory {
Expand Down Expand Up @@ -95,15 +95,19 @@ impl ArtifactFactory {

if !process.succeed() {
#[cfg(debug_assertions)]
log::debug!("stdout: {}", String::from_utf8_lossy(&process.stdout));
dbg!(process.status);
log::warn!("{}", process.status);
return Err(Error::Report(JudgerCode::Ce));
}

process.stdout.split(|x| *x == b'\n').for_each(|x| {
CompileLog::from_raw(x).log();
});

Ok(CompiledArtifact {
container,
spec,
tracing_id,
stdout: process.stdout,
})
}
}
Expand All @@ -117,11 +121,43 @@ impl Default for ArtifactFactory {
}
}
}

pub struct CompileLog<'a> {
pub level: usize,
pub message: Cow<'a, str>,
}

impl<'a> CompileLog<'a> {
pub fn from_raw(raw: &'a [u8]) -> Self {
let raw: Vec<&[u8]> = raw.splitn(2, |x| *x == b':').collect();
if raw.len() == 1 {
Self {
level: 4,
message: String::from_utf8_lossy(raw[0]),
}
} else {
Self {
level: String::from_utf8_lossy(raw[0]).parse().unwrap_or(4),
message: String::from_utf8_lossy(raw[1]),
}
}
}
pub fn log(&self) {
match self.level {
0 => log::trace!("{}", self.message),
1 => log::debug!("{}", self.message),
2 => log::info!("{}", self.message),
3 => log::warn!("{}", self.message),
_ => log::error!("{}", self.message),
}
}
}
// Wrapper for container which contain compiled program in its volume
pub struct CompiledArtifact<'a> {
container: Container<'a>,
spec: &'a LangSpec,
tracing_id: usize,
stdout: Vec<u8>,
}

impl<'a> CompiledArtifact<'a> {
Expand Down Expand Up @@ -164,8 +200,64 @@ impl<'a> CompiledArtifact<'a> {
tracing_id: self.tracing_id,
})
}
pub async fn exec(
&mut self,
input: &[u8],
time: u64,
memory: u64,
) -> Result<ExecResult, Error> {
log::trace!("Running program -trace:{}", self.tracing_id);
let mut limit = self.spec.judge_limit.clone().apply_platform();

limit.cpu_us *= time;
limit.user_mem *= memory;

let mut process = self
.container
.execute(
self.spec
.judge_args
.iter()
.map(|x| x.as_str())
.collect::<Vec<&str>>(),
limit,
)
.await?;

process.write_all(input).await?;

let process = process.wait().await?;

Ok(ExecResult {
process,
_tracing_id: self.tracing_id,
})
}

pub fn to_log(&self) -> impl Iterator<Item = CompileLog> {
self.stdout
.split(|&x| x == b'\n')
.map(CompileLog::from_raw)
}
}

pub struct ExecResult {
process: ExitProc,
_tracing_id: usize,
}

impl ExecResult {
pub fn time(&self) -> &CpuStatistics {
&self.process.cpu
}
pub fn mem(&self) -> &MemStatistics {
&self.process.mem
}
pub fn stdout(&self) -> &[u8] {
&self.process.stdout
}
}
// Wrapper for result of process(ended process)
// Wrapper for result of process(ended judge process)
// provide information about process's exitcode, resource usage, stdout, stderr
pub struct TaskResult {
process: ExitProc,
Expand Down Expand Up @@ -193,7 +285,7 @@ impl TaskResult {
pub fn assert(&self, input: &[u8], mode: JudgeMatchRule) -> bool {
let newline = b'\n';
let space = b' ';
log::trace!("Ssserting program -trace:{}", self.tracing_id);
log::trace!("Asserting program -trace:{}", self.tracing_id);
let stdout = &self.process.stdout;

match mode {
Expand Down
5 changes: 4 additions & 1 deletion judger/src/sandbox/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,25 @@ impl RunningProc {
}

let (cpu, mem) = self.limiter.status().await;
let output_limit = config.platform.output_limit as u64;

let _memory_holder = self._memory_holder.downgrade(output_limit);
Ok(ExitProc {
status,
stdout: buf.to_vec(),
cpu,
mem,
_memory_holder,
})
}
}

#[derive(Debug)]
pub struct ExitProc {
pub status: ExitStatus,
pub stdout: Vec<u8>,
pub cpu: CpuStatistics,
pub mem: MemStatistics,
_memory_holder: MemoryPermit,
}

impl ExitProc {
Expand Down
5 changes: 5 additions & 0 deletions judger/src/sandbox/utils/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ impl MemoryPermit {
counter: counter.clone(),
}
}
pub fn downgrade(mut self, target: u64) -> Self {
self.counter.deallocate(self.memory - target);
self.memory = target;
self
}
}

impl Drop for MemoryPermit {
Expand Down
Loading

0 comments on commit 06c5260

Please sign in to comment.