Skip to content

Commit

Permalink
refactor: 🎨 remove oncecell
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason0729 committed Dec 4, 2023
1 parent 30d5760 commit ecfee38
Show file tree
Hide file tree
Showing 12 changed files with 44 additions and 130 deletions.
14 changes: 0 additions & 14 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ toml = "0.7.4"
prost-types = "0.11.9"
entity = { path = "./entity" }
chrono = "0.4.26"
sha256 = "1.3.0"
thiserror = "1.0.44"
ring = "^0.17"
lockfree = "0.5.1"
Expand Down
7 changes: 7 additions & 0 deletions backend/src/controller/submit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::Future;
use leaky_bucket::RateLimiter;
use sea_orm::{ActiveModelTrait, ActiveValue, EntityTrait, QueryOrder};
use thiserror::Error;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tonic::Status;
use uuid::Uuid;
Expand All @@ -26,6 +27,8 @@ use self::{
use super::code::Code;
use entity::*;

pub static SECRET: OnceCell<&'static str> = OnceCell::const_new();

struct Waker;

impl std::task::Wake for Waker {
Expand Down Expand Up @@ -118,6 +121,10 @@ pub struct SubmitController {

impl SubmitController {
pub async fn new(config: &GlobalConfig) -> Result<Self, Error> {
if let Some(secret) = &config.judger_secret {
let secret = Box::new(secret.clone()).leak();
SECRET.set(secret).unwrap();
};
Ok(SubmitController {
router: Router::new(&config.judger).await?,
pubsub: Arc::new(PubSub::default()),
Expand Down
54 changes: 1 addition & 53 deletions backend/src/controller/submit/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use spin::mutex::Mutex;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
// use std::pin::Pin;
// use std::task::Poll;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use tokio::sync::broadcast::*;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt};

Expand Down Expand Up @@ -64,27 +62,6 @@ where
M: Clone + Send + 'static,
I: Eq + Clone + Hash + Send + 'static,
{
pub fn stream(
self: &Arc<Self>,
mut stream: impl Stream<Item = M> + Unpin + Send + 'static,
id: I,
) {
let tx = {
let (tx, rx) = channel(16);
self.outgoing.lock().insert(id.clone(), rx);
tx
};

let self_ = self.clone();
tokio::spawn(async move {
while let Some(messenge) = stream.next().await {
if tx.send(messenge).is_err() {
log::trace!("PubSub: messege")
}
}
self_.outgoing.lock().remove(&id);
});
}
pub fn publish(self: &Arc<Self>, id: I) -> PubGuard<M, I> {
let (tx, rx) = channel(16);
self.outgoing.lock().insert(id.clone(), rx);
Expand All @@ -107,32 +84,3 @@ where
})
}
}

// pub struct SubStream<M>(BroadcastStream<Option<M>>);

// impl<M> Stream for SubStream<M>
// where
// M: 'static + Clone + Send,
// {
// type Item = M;

// fn poll_next(
// mut self: Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<Option<Self::Item>> {
// let a = Pin::new(&mut self.0);
// if let Poll::Ready(x) = BroadcastStream::poll_next(a, cx) {
// if let Some(x) = x {
// if let Ok(x) = x {
// Poll::Ready(x)
// } else {
// Poll::Ready(None)
// }
// } else {
// Poll::Ready(None)
// }
// } else {
// Poll::Pending
// }
// }
// }
20 changes: 5 additions & 15 deletions backend/src/controller/submit/router.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// TODO: we need docker swarm in dns setup, so we need to accept dns round robin
use std::{
collections::{BTreeMap, VecDeque},
ops::DerefMut,
Expand All @@ -14,10 +15,10 @@ use uuid::Uuid;

use crate::{
grpc::judger::{judger_client::*, *},
init::config::{self, CONFIG},
init::config::{self},
};

use super::super::submit::Error;
use super::{super::submit::Error, SECRET};

const PIPELINE: usize = 8;
const JUDGER_QUE_MAX: usize = 16;
Expand All @@ -30,8 +31,7 @@ type AuthIntercept = JudgerClient<
>,
>;
fn auth_middleware(mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
let config = CONFIG.get().unwrap();
match &config.judger_secret {
match &SECRET.get() {
Some(secret) => {
let token: metadata::MetadataValue<_> = format!("basic {}", secret).parse().unwrap();
req.metadata_mut().insert("Authorization", token);
Expand Down Expand Up @@ -189,15 +189,6 @@ impl Router {
Ok(_) => upstreams.push(upstream),
}
}
// let futs: Box<dyn Future<Output = Arc<Upstream>>> = configs
// .iter()
// .map(|x| async {
// let upstream = Upstream::new(Arc::new(x.clone()));
// upstream.health_check().await;
// upstream
// })
// .collect();
// tokio::join!(futs);
if upstreams.is_empty() {
return Err(Error::JudgerUnavailable);
}
Expand All @@ -209,8 +200,7 @@ impl Router {
pub fn langs(&self) -> Vec<LangInfo> {
self.upstreams
.iter()
.map(|x| x.langs())
.flatten()
.flat_map(|x| x.langs())
.unique_by(|x| x.lang_uid.clone())
.collect()
}
Expand Down
22 changes: 6 additions & 16 deletions backend/src/endpoint/util/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::report_internal;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Premission deny: `{0}`")]
Expand Down Expand Up @@ -27,19 +29,12 @@ impl From<Error> for tonic::Status {
log::debug!("Client request inaccessible resource, hint: {}", x);
tonic::Status::permission_denied(x)
}
Error::DBErr(x) => {
log::error!("{}", x);
#[cfg(feature = "unsecured-log")]
return tonic::Status::internal(format!("{}", x));
tonic::Status::unavailable("")
}
Error::DBErr(x) => report_internal!(error, "{}", x),
// all argument should be checked before processing,
// so this error is considered as internal error
Error::BadArgument(x) => {
log::warn!("Client sent invaild argument: payload.{}", x);
#[cfg(feature = "unsecured-log")]
return tonic::Status::invalid_argument(format!("Bad Argument {}", x));
tonic::Status::invalid_argument("")
log::debug!("Client sent invaild argument: payload.{}", x);
tonic::Status::invalid_argument(x)
}
Error::NotInPayload(x) => {
log::trace!("{} is not found in client payload", x);
Expand All @@ -63,12 +58,7 @@ impl From<Error> for tonic::Status {
"Invaild request_id(should be a client generated UUIDv4)",
)
}
Error::Unreachable(x) => {
log::error!("Function should be unreachable: {}", x);
#[cfg(feature = "unsecured-log")]
return tonic::Status::internal(format!("Function should be unreachable: {}", x));
tonic::Status::aborted("")
}
Error::Unreachable(x) => report_internal!(error, "{}", x),
}
}
}
12 changes: 4 additions & 8 deletions backend/src/init/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::{path::PathBuf, sync::Arc};

use serde::{Deserialize, Serialize};
use tokio::{fs, io::AsyncReadExt, sync::OnceCell};

pub static CONFIG: OnceCell<GlobalConfig> = OnceCell::const_new();
use tokio::{fs, io::AsyncReadExt};

static CONFIG_PATH: &str = "config/config.toml";

Expand Down Expand Up @@ -66,7 +64,7 @@ impl Default for Database {
}
}

pub async fn init() {
pub async fn init() -> GlobalConfig {
if fs::metadata(CONFIG_PATH).await.is_ok() {
let mut buf = Vec::new();
let mut config = fs::File::open(CONFIG_PATH)
Expand All @@ -76,14 +74,13 @@ pub async fn init() {
let config =
std::str::from_utf8(&buf).expect("Config file may container non-utf8 character");
let config: GlobalConfig = toml::from_str(config).unwrap();
CONFIG.set(config).ok();
config
} else {
println!("Unable to find {}, generating default config", CONFIG_PATH);
let config: GlobalConfig = toml::from_str("").unwrap();

let config_txt = toml::to_string(&config).unwrap();
fs::write(CONFIG_PATH, config_txt).await.unwrap();
CONFIG.set(config).ok();

println!(
"Config generated, please edit {} before restart",
Expand All @@ -96,11 +93,10 @@ pub async fn init() {

#[cfg(test)]
mod test {
use super::{init, CONFIG};
use super::init;

#[tokio::test]
async fn default() {
init().await;
assert!(CONFIG.get().is_some());
}
}
14 changes: 6 additions & 8 deletions backend/src/init/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ use sea_orm::{ActiveModelTrait, ActiveValue, Database, DatabaseConnection};
use tokio::fs;
use tokio::sync::OnceCell;

use super::config::CONFIG;
use super::config::GlobalConfig;
use crate::controller::token::UserPermBytes;

pub static DB: OnceCell<DatabaseConnection> = OnceCell::const_new();

pub async fn init() {
let config = CONFIG.get().unwrap();
pub async fn init(config: &GlobalConfig) {
let uri = format!("sqlite://{}", config.database.path.clone());

match Database::connect(&uri).await {
Expand All @@ -24,7 +23,7 @@ pub async fn init() {
fs::File::create(PathBuf::from(config.database.path.clone()))
.await
.unwrap();
first_migration().await;
first_migration(config).await;

let db: DatabaseConnection = Database::connect(&uri).await.unwrap();

Expand All @@ -33,8 +32,7 @@ pub async fn init() {
}
}
}
fn hash(src: &str) -> Vec<u8> {
let config = CONFIG.get().unwrap();
fn hash(config: &GlobalConfig, src: &str) -> Vec<u8> {
digest::digest(
&digest::SHA256,
&[src.as_bytes(), config.database.salt.as_bytes()].concat(),
Expand All @@ -43,7 +41,7 @@ fn hash(src: &str) -> Vec<u8> {
.to_vec()
}

pub async fn first_migration() {
pub async fn first_migration(config: &GlobalConfig) {
let db = DB.get().unwrap();
let mut perm = UserPermBytes::default();

Expand All @@ -59,7 +57,7 @@ pub async fn first_migration() {
entity::user::ActiveModel {
permission: ActiveValue::Set(perm.0),
username: ActiveValue::Set("admin".to_owned()),
password: ActiveValue::Set(hash("admin")),
password: ActiveValue::Set(hash(config, "admin")),
..Default::default()
}
.save(db)
Expand Down
6 changes: 2 additions & 4 deletions backend/src/init/logger.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use tracing::Level;

use super::config::CONFIG;

pub fn init() {
let config = CONFIG.get().unwrap();
use super::config::GlobalConfig;

pub fn init(config: &GlobalConfig) {
let level = match config.log_level {
0 => Level::TRACE,
1 => Level::DEBUG,
Expand Down
11 changes: 7 additions & 4 deletions backend/src/init/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use self::config::GlobalConfig;

pub mod config;
pub mod db;
pub mod logger;

pub async fn new() {
config::init().await;
logger::init();
db::init().await;
pub async fn new() -> GlobalConfig {
let config = config::init().await;
logger::init(&config);
db::init(&config).await;
config
}
4 changes: 2 additions & 2 deletions backend/src/macro_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ macro_rules! report_internal {
($level:ident,$pattern:literal) => {{
log::$level!($pattern);
tonic::Status::unknown("unknown error")
};};
}};
($level:ident,$pattern:literal, $error:expr) => {{
log::$level!($pattern, $error);
tonic::Status::unknown("unknown error")
};};
}};
}

#[macro_export]
Expand Down
Loading

0 comments on commit ecfee38

Please sign in to comment.