Skip to content

Commit

Permalink
Optimize server. In particular, avoid write lock when Users::load has…
Browse files Browse the repository at this point in the history
… cache hit
  • Loading branch information
serprex committed Jan 7, 2024
1 parent bea6a8b commit d8a05e5
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 73 deletions.
7 changes: 3 additions & 4 deletions src/rs/server/src/cardpool.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashMap;

use fxhash::FxHashMap;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

use crate::etgutil;

#[derive(Clone, Default, Debug)]
pub struct Cardpool(pub HashMap<i16, u16>);
pub struct Cardpool(pub FxHashMap<i16, u16>);

impl From<&Cardpool> for String {
fn from(pool: &Cardpool) -> String {
Expand All @@ -25,7 +24,7 @@ impl From<&Cardpool> for String {

impl From<&str> for Cardpool {
fn from(code: &str) -> Cardpool {
let mut pool = HashMap::<i16, u16>::new();
let mut pool = FxHashMap::<i16, u16>::default();
for chunk in code.as_bytes().chunks_exact(5) {
let count = etgutil::decode_count(&chunk[..2]);
let code = etgutil::decode_code(&chunk[2..]);
Expand Down
2 changes: 1 addition & 1 deletion src/rs/server/src/handleget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async fn handle_get_core(
} else if path.starts_with("/collection/") {
let name = &path["/collection/".len()..];
if let Ok(client) = pgpool.get().await {
if let Some(user) = users.write().await.load(&*client, name).await {
if let Some(user) = users.load(&*client, name).await {
let user = user.lock().await;
let pool = &user.data.pool;
let bound = &user.data.accountbound;
Expand Down
66 changes: 27 additions & 39 deletions src/rs/server/src/handlews.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct BzBidSell {
p: i16,
}

pub async fn broadcast<T>(socks: &AsyncSocks, val: &T)
pub async fn broadcast<T>(socks: &AsyncSocksInner, val: &T)
where
T: serde::Serialize,
{
Expand All @@ -72,9 +72,11 @@ pub struct Sock {
hide: bool,
}

pub type AsyncUsers = Arc<RwLock<Users>>;
pub type AsyncSocks = Arc<RwLock<HashMap<usize, Sock>>>;
pub type AsyncUserSocks = Arc<RwLock<HashMap<String, usize>>>;
pub type AsyncUsers = Arc<Users>;
pub type AsyncSocksInner = RwLock<HashMap<usize, Sock>>;
pub type AsyncSocks = Arc<AsyncSocksInner>;
pub type AsyncUserSocksInner = RwLock<HashMap<String, usize>>;
pub type AsyncUserSocks = Arc<AsyncUserSocksInner>;

fn sendmsg<T>(tx: &WsSender, val: &T)
where
Expand Down Expand Up @@ -145,7 +147,7 @@ where
}

async fn login_success(
usersocks: &AsyncUserSocks,
usersocks: &AsyncUserSocksInner,
tx: &WsSender,
sockid: usize,
user: &mut UserObject,
Expand Down Expand Up @@ -353,7 +355,7 @@ pub async fn handle_ws(
client.query_one("select id, auth from users where name = $1", &[&u]).await
{
if a == row.get::<usize, &str>(1) {
if let Some(user) = users.write().await.load(&*client, &u).await {
if let Some(user) = users.load(&*client, &u).await {
let rightsock = usersocks.read().await.get(&u) == Some(&sockid);
if !rightsock {
usersocks.write().await.insert(u.clone(), sockid);
Expand All @@ -377,8 +379,7 @@ pub async fn handle_ws(
}
AuthMessage::modresetpass { m } => {
if u == "serprex" {
let mut wusers = users.write().await;
if let Some(user) = wusers.load(&*client, &m).await {
if let Some(user) = users.load(&*client, &m).await {
let mut user = user.lock().await;
user.auth = String::new();
}
Expand Down Expand Up @@ -514,11 +515,8 @@ pub async fn handle_ws(
}
}
AuthMessage::logout => {
let mut wusers = users.write().await;
let mut wusersocks = usersocks.write().await;
wusersocks.remove(&u);
drop(wusersocks);
wusers.evict(&client, &u).await;
usersocks.write().await.remove(&u);
users.evict(&client, &u).await;
}
AuthMessage::delete => {
let params: &[&(dyn ToSql + Sync)] = &[&userid];
Expand All @@ -535,10 +533,9 @@ pub async fn handle_ws(
.is_ok()
{
trx.commit().await.ok();
let mut wusers = users.write().await;
let mut wusersocks = usersocks.write().await;
wusersocks.remove(&u);
wusers.remove(&u);
users.remove(&u).await;
}
}
}
Expand Down Expand Up @@ -628,9 +625,8 @@ pub async fn handle_ws(
}
}
AuthMessage::modarena { aname, won, lv } => {
let mut wusers = users.write().await;
if let Some(auserid) =
if let Some(other) = wusers.load(&*client, &aname).await {
if let Some(other) = users.load(&*client, &aname).await {
let mut other = other.lock().await;
other.data.gold += if won { 15 } else { 5 };
Some(other.id)
Expand Down Expand Up @@ -709,7 +705,7 @@ pub async fn handle_ws(
}
AuthMessage::setgold { t, g } => {
if role_check(UserRole::Codesmith, &tx, &client, userid).await {
if let Some(tgt) = users.write().await.load(&*client, &t).await {
if let Some(tgt) = users.load(&*client, &t).await {
let mut tgt = tgt.lock().await;
sendmsg(
&tx,
Expand All @@ -727,7 +723,7 @@ pub async fn handle_ws(
}
AuthMessage::addpool { t, pool, bound } => {
if role_check(UserRole::Codesmith, &tx, &client, userid).await {
if let Some(tgt) = users.write().await.load(&*client, &t).await {
if let Some(tgt) = users.load(&*client, &t).await {
let mut tgt = tgt.lock().await;
let curpool = if bound {
&mut tgt.data.accountbound
Expand Down Expand Up @@ -1032,8 +1028,7 @@ pub async fn handle_ws(
};
if let Some(foesockid) = usersocks.read().await.get(&f) {
if let Some(foesock) = socks.read().await.get(&foesockid) {
let mut wusers = users.write().await;
if let Some(foeuser) = wusers.load(&*client, &f).await {
if let Some(foeuser) = users.load(&*client, &f).await {
let foeuserid = foeuser.lock().await.id;
if let Ok(trx) = client.transaction().await {
trx.execute("delete from match_request mr1 where user_id = $1 and accepted", &[&userid]).await.ok();
Expand Down Expand Up @@ -1228,8 +1223,7 @@ pub async fn handle_ws(
if u != f {
if let Some(foesockid) = usersocks.read().await.get(&f) {
if let Some(foesock) = socks.read().await.get(&foesockid) {
let mut wusers = users.write().await;
if let Some(foeuser) = wusers.load(&*client, &f).await {
if let Some(foeuser) = users.load(&*client, &f).await {
sendmsg(
&foesock.tx,
&WsResponse::tradecanceled { u: &u },
Expand All @@ -1253,8 +1247,7 @@ pub async fn handle_ws(
}
AuthMessage::reloadtrade { f } => {
if u != f {
let mut wusers = users.write().await;
if let Some(foeuser) = wusers.load(&*client, &f).await {
if let Some(foeuser) = users.load(&*client, &f).await {
let foeuserid = foeuser.lock().await.id;
if let Ok(trade) = client.query_one(
"select cards, g from trade_request where user_id = $2 and for_user_id = $1", &[&userid, &foeuserid]).await {
Expand All @@ -1279,8 +1272,7 @@ pub async fn handle_ws(
if u != f {
if let Some(foesockid) = usersocks.read().await.get(&f) {
if let Some(foesock) = socks.read().await.get(&foesockid) {
let mut wusers = users.write().await;
if let Some(foeuser) = wusers.load(&*client, &f).await {
if let Some(foeuser) = users.load(&*client, &f).await {
let (mut user, mut foeuser) =
ordered_lock(&user, &foeuser).await;
if let Ok(trx) = client.transaction().await {
Expand Down Expand Up @@ -1773,12 +1765,11 @@ pub async fn handle_ws(
},
);
drop(user);
let mut wusers = users.write().await;
let rusersocks = usersocks.read().await;
let rsocks = socks.read().await;
for sell in sells {
{
if let Some(seller) = wusers.load(&trx, &sell.u).await {
if let Some(seller) = users.load(&trx, &sell.u).await {
let mut seller = seller.lock().await;
if sell.p > 0 {
let c = seller
Expand Down Expand Up @@ -2163,8 +2154,7 @@ pub async fn handle_ws(
},
);
} else {
let mut wusers = users.write().await;
let user = if let Some(user) = wusers.load(&*client, &u).await {
let user = if let Some(user) = users.load(&*client, &u).await {
user
} else {
let user = Arc::new(Mutex::new(UserObject {
Expand All @@ -2176,7 +2166,7 @@ pub async fn handle_ws(
algo: users::HASH_ALGO,
data: UserData { oracle: u32::MAX, ..Default::default() },
}));
wusers.insert(u.clone(), user.clone());
users.insert(u.clone(), user.clone()).await;
user
};
let mut user = user.lock().await;
Expand Down Expand Up @@ -2253,9 +2243,8 @@ pub async fn handle_ws(
.and_then(|v| v.as_str())
.unwrap_or(""),
);
let mut wusers = users.write().await;
if let Some(user) =
wusers.load(&*client, &name).await
users.load(&*client, &name).await
{
let mut user = user.lock().await;
user.auth = g.clone();
Expand Down Expand Up @@ -2287,10 +2276,9 @@ pub async fn handle_ws(
&mut client,
)
.await;
wusers.insert(
name,
Arc::new(Mutex::new(newuser)),
);
users
.insert(name, Arc::new(Mutex::new(newuser)))
.await;
}
} else {
sendmsg(
Expand Down Expand Up @@ -2389,7 +2377,7 @@ pub async fn handle_ws(
}
}
UserMessage::librarywant { f } => {
if let Some(user) = users.write().await.load(&*client, &f).await {
if let Some(user) = users.load(&*client, &f).await {
let user = user.lock().await;
let mut gold = user.data.gold;
let mut pool = user.data.pool.clone();
Expand Down
6 changes: 2 additions & 4 deletions src/rs/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ async fn main() {
_ = interval.tick() => (),
}
if let Ok(client) = gcpgpool.get().await {
let mut users = gcusers.write().await;
let _ = tokio::join!(users
.store(&client, gcusersocks.clone(), gcsocks.clone()),
let _ = tokio::join!(gcusers.store(&client, &gcusersocks, &gcsocks),
client.execute("delete from trade_request where expire_at < now()", &[]),
client.execute(
"with expiredids (id) as (select id from games where expire_at < now()) \
Expand Down Expand Up @@ -228,7 +226,7 @@ async fn main() {
drop(closetx);
println!("Shutting down");
if let Ok(client) = sigintpgpool.get().await {
if !sigintusers.write().await.saveall(&client).await {
if !sigintusers.saveall(&client).await {
println!("Error while saving users");
}
} else {
Expand Down
48 changes: 23 additions & 25 deletions src/rs/server/src/users.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use bb8_postgres::tokio_postgres::types::{FromSql, ToSql};
use bb8_postgres::tokio_postgres::{types::Json, Client, GenericClient};
use ring::pbkdf2;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};

use crate::cardpool::Cardpool;
use crate::handlews::{AsyncSocks, AsyncUserSocks};
use crate::handlews::{AsyncSocksInner, AsyncUserSocksInner};

#[derive(Clone, Copy, Debug, ToSql, FromSql)]
#[postgres(name = "userrole")]
Expand Down Expand Up @@ -114,19 +115,22 @@ impl UserObject {
pub type User = Arc<Mutex<UserObject>>;

#[derive(Default)]
pub struct Users(HashMap<String, (bool, User)>);
pub struct Users(RwLock<HashMap<String, (AtomicBool, User)>>);

impl Users {
pub async fn load<GC>(&mut self, client: &GC, name: &str) -> Option<User>
pub async fn load<GC>(&self, client: &GC, name: &str) -> Option<User>
where
GC: GenericClient,
{
if let Some(&mut (ref mut gc, ref user)) = self.0.get_mut(name) {
*gc = false;
if let Some((ref gc, ref user)) = self.0.read().await.get(name) {
gc.store(false, Ordering::Release);
Some(user.clone())
} else {
let mut wself = self.0.write().await;
if let Some(row) = client.query_opt("select u.id, u.auth, u.salt, u.iter, u.algo, ud.data from user_data ud join users u on u.id = ud.user_id where u.name = $1 and ud.type_id = 1", &[&name]).await.expect("Connection failed while loading user") {
let Json(userdata) = row.try_get::<usize, Json<UserData>>(5).expect("Invalid json for user");
let Ok(Json(userdata)) = row.try_get::<usize, Json<UserData>>(5) else {
panic!("Invalid json for user {}", name)
};
let namestr = name.to_string();
let userarc = Arc::new(Mutex::new(UserObject {
name: namestr.clone(),
Expand All @@ -137,24 +141,24 @@ impl Users {
algo: row.get::<usize, HashAlgo>(4),
data: userdata,
}));
self.insert(namestr, userarc.clone());
wself.insert(namestr, (AtomicBool::new(false), userarc.clone()));
Some(userarc)
} else {
None
}
}
}

pub fn insert(&mut self, name: String, user: User) {
self.0.insert(name, (false, user));
pub async fn insert(&self, name: String, user: User) {
self.0.write().await.insert(name, (AtomicBool::new(false), user));
}

pub fn remove(&mut self, name: &str) {
self.0.remove(name);
pub async fn remove(&self, name: &str) {
self.0.write().await.remove(name);
}

pub async fn evict(&mut self, client: &Client, name: &str) {
if let Some((_, user)) = self.0.remove(name) {
pub async fn evict(&self, client: &Client, name: &str) {
if let Some((_, user)) = self.0.write().await.remove(name) {
let user = user.lock().await;
client
.query(
Expand All @@ -166,9 +170,10 @@ impl Users {
}
}

pub async fn saveall(&mut self, client: &Client) -> bool {
pub async fn saveall(&self, client: &Client) -> bool {
let rself = self.0.read().await;
let mut queries = Vec::new();
for &(_, ref user) in self.0.values() {
for &(_, ref user) in rself.values() {
queries.push(async move {
let user = user.lock().await;
client
Expand All @@ -182,19 +187,12 @@ impl Users {
futures::future::join_all(queries).await.into_iter().all(|x| x.is_ok())
}

pub async fn store(&mut self, client: &Client, usersocks: AsyncUserSocks, socks: AsyncSocks) {
pub async fn store(&self, client: &Client, usersocks: &AsyncUserSocksInner, socks: &AsyncSocksInner) {
if self.saveall(client).await {
let mut usersocks = usersocks.write().await;
let socks = socks.read().await;
usersocks.retain(|_, v| socks.contains_key(v));
self.0.retain(|_, &mut (ref mut gc, _)| {
if *gc {
false
} else {
*gc = true;
true
}
});
self.0.write().await.retain(|_, (ref gc, _)| !gc.swap(true, Ordering::AcqRel));
}
}
}

0 comments on commit d8a05e5

Please sign in to comment.