Skip to content

Commit

Permalink
feat: ✨ finish playground set
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason0729 committed Dec 11, 2023
1 parent 5a3df55 commit 629dec1
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 24 deletions.
58 changes: 45 additions & 13 deletions backend/src/controller/judger/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
mod pubsub;
mod route;
use std::sync::{
atomic::{AtomicI64, Ordering},
Arc,

use std::{
pin::Pin,
sync::{
atomic::{AtomicI64, Ordering},
Arc,
},
};
use tokio_stream::{Stream, StreamExt, StreamMap};

use crate::{
grpc::TonicStream,
Expand All @@ -15,7 +20,6 @@ use leaky_bucket::RateLimiter;
use opentelemetry::{global, metrics::ObservableGauge};
use sea_orm::{ActiveModelTrait, ActiveValue, EntityTrait, QueryOrder};
use thiserror::Error;
use tokio_stream::StreamExt;
use tonic::Status;
use tracing::{instrument, Instrument, Span};
use uuid::Uuid;
Expand All @@ -35,16 +39,17 @@ use self::{
use super::code::Code;
use entity::*;

struct Waker;

impl std::task::Wake for Waker {
fn wake(self: Arc<Self>) {
log::error!("waker wake");
}
}
const PALYGROUND_TIME: u64 = 500 * 1000;
const PALYGROUND_MEM: u64 = 256 * 1024 * 1024;

macro_rules! check_rate_limit {
($s:expr) => {{
struct Waker;
impl std::task::Wake for Waker {
fn wake(self: Arc<Self>) {
log::error!("waker wake");
}
}
let waker = Arc::new(Waker).into();
let mut cx = std::task::Context::from_waker(&waker);

Expand Down Expand Up @@ -140,6 +145,12 @@ impl<'a> Drop for MeterGuard<'a> {
}
}

pub struct PlaygroundPayload {
pub input: Vec<u8>,
pub code: Vec<u8>,
pub lang: Uuid,
}

pub struct JudgerController {
router: Arc<Router>,
pubsub: Arc<PubSub<Result<SubmitStatus, Status>, i32>>,
Expand Down Expand Up @@ -309,9 +320,30 @@ impl JudgerController {
pub fn list_lang(&self) -> Vec<LangInfo> {
self.router.langs.iter().map(|x| x.clone()).collect()
}
pub async fn playground(&self) -> Result<TonicStream<PlaygroundResult>, Error> {
// endpoint should check uuid exist
pub async fn playground(
&self,
payload: PlaygroundPayload,
) -> Result<TonicStream<PlaygroundResult>, Error> {
check_rate_limit!(self);

todo!()
let mut conn = self.router.get(&payload.lang).await?;

let res = conn
.exec(ExecRequest {
lang_uid: payload.lang.to_string(),
code: payload.code,
memory: PALYGROUND_MEM,
time: PALYGROUND_TIME,
input: payload.input,
})
.await?;

conn.report_success();

Ok(Box::pin(
res.into_inner()
.map(|x| x.map(Into::<PlaygroundResult>::into)),
))
}
}
49 changes: 47 additions & 2 deletions backend/src/endpoint/playground.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use super::endpoints::*;
use super::tools::*;

use crate::controller::judger::PlaygroundPayload;
use crate::grpc::backend;
use crate::grpc::backend::playground_set_server::*;
use crate::grpc::backend::*;
use crate::grpc::judger;
use crate::grpc::judger::exec_result;
use crate::grpc::judger::ExecResult;

const PLAYGROUND_CODE_LEN: usize = 32 * 1024;

#[async_trait]
impl PlaygroundSet for Arc<Server> {
Expand All @@ -12,8 +19,46 @@ impl PlaygroundSet for Arc<Server> {
#[instrument(skip_all, level = "debug")]
async fn run(
&self,
_req: Request<PlaygroundRequest>,
req: Request<PlaygroundRequest>,
) -> Result<Response<Self::RunStream>, Status> {
Err(Status::unimplemented("Not implemented"))
let (auth, req) = self.parse_request(req).await?;
let (user_id, _) = auth.ok_or_default()?;

tracing::debug!(user_id = user_id, "playground_start");

if req.code.len() > PLAYGROUND_CODE_LEN {
return Err(Error::BufferTooLarge("code").into());
}

let lang = Uuid::parse_str(&req.lang).map_err(Error::InvaildUUID)?;

Ok(Response::new(
self.judger
.playground(PlaygroundPayload {
input: req.input,
code: req.code,
lang,
})
.await?,
))
}
}

impl From<ExecResult> for PlaygroundResult {
fn from(value: ExecResult) -> Self {
PlaygroundResult {
result: Some(match value.result.unwrap() {
exec_result::Result::Output(x) => playground_result::Result::Output(x),
exec_result::Result::Log(x) => playground_result::Result::Compile(x.into()),
}),
}
}
}
impl From<judger::Log> for backend::Log {
fn from(value: judger::Log) -> Self {
backend::Log {
level: value.level,
msg: value.msg,
}
}
}
14 changes: 10 additions & 4 deletions backend/src/endpoint/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use crate::grpc::judger::LangInfo;
use entity::{submit::*, *};
use tokio_stream::wrappers::ReceiverStream;

const SUBMIT_CODE_LEN: usize = 32 * 1024;

impl Filter for Entity {
fn read_filter<S: QueryFilter + Send>(query: S, _: &Auth) -> Result<S, Error> {
Ok(query)
Expand Down Expand Up @@ -145,6 +147,10 @@ impl SubmitSet for Arc<Server> {
let (auth, req) = self.parse_request(req).await?;
let (user_id, _) = auth.ok_or_default()?;

if req.info.code.len() > SUBMIT_CODE_LEN {
return Err(Error::BufferTooLarge("info.code").into());
}

let lang = Uuid::parse_str(req.info.lang.as_str()).map_err(Into::<Error>::into)?;

let problem = problem::Entity::find_by_id(req.info.problem_id)
Expand Down Expand Up @@ -177,7 +183,7 @@ impl SubmitSet for Arc<Server> {
.build()
.unwrap();

let id = self.submit.submit(submit).await?;
let id = self.judger.submit(submit).await?;

tracing::debug!(id = id, "submit_created");
self.metrics.submit.add(1, &[]);
Expand Down Expand Up @@ -216,7 +222,7 @@ impl SubmitSet for Arc<Server> {
tracing::trace!(id = req.id);

Ok(Response::new(
self.submit.follow(req.id).await.unwrap_or_else(|| {
self.judger.follow(req.id).await.unwrap_or_else(|| {
Box::pin(ReceiverStream::new(tokio::sync::mpsc::channel(16).1))
as Self::FollowStream
}),
Expand Down Expand Up @@ -266,7 +272,7 @@ impl SubmitSet for Arc<Server> {
.build()
.unwrap();

self.submit.submit(rejudge).await?;
self.judger.submit(rejudge).await?;

self.dup.store(user_id, uuid, submit_id);

Expand All @@ -278,7 +284,7 @@ impl SubmitSet for Arc<Server> {

#[instrument(skip_all, level = "debug")]
async fn list_langs(&self, _: Request<()>) -> Result<Response<Self::ListLangsStream>, Status> {
let langs = self.submit.list_lang().into_iter().map(|x| Ok(x.into()));
let langs = self.judger.list_lang().into_iter().map(|x| Ok(x.into()));

Ok(Response::new(
Box::pin(tokio_stream::iter(langs)) as TonicStream<_>
Expand Down
1 change: 0 additions & 1 deletion backend/src/endpoint/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::grpc::backend::*;

use entity::user;
use entity::user::*;
use opentelemetry::KeyValue;

impl Filter for Entity {
fn read_filter<S: QueryFilter + Send>(query: S, _: &Auth) -> Result<S, Error> {
Expand Down
5 changes: 5 additions & 0 deletions backend/src/endpoint/util/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum Error {
Unreachable(&'static str),
#[error("Number too large(or small)")]
NumberTooLarge,
#[error("Buffer `{0}` too large")]
BufferTooLarge(&'static str),
#[error("Already exist")]
AlreadyExist(&'static str),
}
Expand Down Expand Up @@ -62,6 +64,9 @@ impl From<Error> for tonic::Status {
}
Error::Unreachable(x) => report_internal!(error, "{}", x),
Error::NumberTooLarge => tonic::Status::invalid_argument("number too large"),
Error::BufferTooLarge(x) => {
tonic::Status::invalid_argument(format!("buffer {} too large", x))
}
Error::AlreadyExist(x) => {
tracing::trace!(hint = x, "entity_exist");
tonic::Status::already_exists(x)
Expand Down
4 changes: 2 additions & 2 deletions backend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const MAX_FRAME_SIZE: u32 = 1024 * 1024 * 8;

pub struct Server {
pub token: Arc<token::TokenController>,
pub submit: Arc<judger::JudgerController>,
pub judger: Arc<judger::JudgerController>,
pub dup: duplicate::DupController,
pub crypto: crypto::CryptoController,
pub metrics: metrics::MetricsController,
Expand Down Expand Up @@ -72,7 +72,7 @@ impl Server {

Arc::new(Server {
token: token::TokenController::new(&span),
submit: Arc::new(submit.unwrap()),
judger: Arc::new(submit.unwrap()),
dup: duplicate::DupController::new(&span),
crypto: crypto::CryptoController::new(&config, &span),
metrics: metrics::MetricsController::new(&otel_guard.meter_provider),
Expand Down
4 changes: 2 additions & 2 deletions proto/backend.proto
Original file line number Diff line number Diff line change
Expand Up @@ -630,14 +630,14 @@ message PlaygroundRequest{
required bytes input=3;
}

message CompileLog{
message Log{
required uint32 level = 1;
required string msg = 2;
}

message PlaygroundResult{
oneof result{
CompileLog compile=1;
Log compile=1;
bytes output=2;
}
}
Expand Down

0 comments on commit 629dec1

Please sign in to comment.