From b777fc9c07b5509dc09e560bfb9943168cbce61c Mon Sep 17 00:00:00 2001
From: Billy Batista
Date: Sun, 9 Jun 2024 23:51:09 -0400
Subject: [PATCH] Added storage caps for users

Limits on how much data a user can store are useful for a few reasons:
users can pay for what they actually use up to a certain point, abusive
users can be restricted, and so on. A user has no specific limit unless
one is set by adding a row to the new storage_caps table; users without
a row fall back to a default cap (currently 5 GiB). Once we get some
more billing work done, we could have big_central hit the internal API.

We also now count file_metadata toward storage usage. This is to
prevent abuse: any user can upload arbitrary data into file_metadata,
so limits need to be enforced there as well. Once we change
chunk_metadata to encrypt some of its data, we should count that too.
In practice this is a KB at *most* per file, so it isn't an issue for
non-abusive users. (Two usage sketches follow the patch below.)
---
 migrations/20240531213904_storage_caps.sql |  4 ++
 src/internal.rs                            | 20 ++++--
 src/main.rs                                | 24 ++++++-
 src/meta_db.rs                             | 84 ++++++++++++++++++++--
 4 files changed, 122 insertions(+), 10 deletions(-)
 create mode 100644 migrations/20240531213904_storage_caps.sql

diff --git a/migrations/20240531213904_storage_caps.sql b/migrations/20240531213904_storage_caps.sql
new file mode 100644
index 0000000..6c0c47b
--- /dev/null
+++ b/migrations/20240531213904_storage_caps.sql
@@ -0,0 +1,4 @@
+CREATE TABLE storage_caps(
+    user_id bigint PRIMARY KEY,
+    max_bytes bigint NOT NULL
+)
diff --git a/src/internal.rs b/src/internal.rs
index bfc1815..c707c84 100644
--- a/src/internal.rs
+++ b/src/internal.rs
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use bfsp::internal::internal_file_server_message::Message;
+use bfsp::internal::GetStorageCapResp;
 /// The internal API
 use bfsp::Message as ProtoMessage;
 use bfsp::{
@@ -10,11 +11,10 @@ use bfsp::{
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::net::TcpStream;
 use tracing::{event, Level};
-use wtransport::endpoint::IncomingSession;
 
 use crate::meta_db::MetaDB;
 
-#[tracing::instrument(skip(key))]
+#[tracing::instrument(skip(key, msg))]
 async fn handle_internal_message<M: MetaDB>(
     meta_db: &M,
     key: XChaCha20Poly1305,
@@ -33,6 +33,17 @@ async fn handle_internal_message(
             }
             .encode_to_vec()
         }
+        Message::GetStorageCap(query) => {
+            let user_ids = query.user_ids;
+            let caps = meta_db.storage_caps(&user_ids).await.unwrap();
+
+            GetStorageCapResp {
+                response: Some(bfsp::internal::get_storage_cap_resp::Response::StorageCaps(
+                    bfsp::internal::get_storage_cap_resp::StorageCap { storage_caps: caps },
+                )),
+            }
+            .encode_to_vec()
+        }
     }
 }
 
@@ -47,7 +58,8 @@ pub async fn handle_internal_connection(
     let meta_db = Arc::clone(&meta_db);
     let internal_private_key = internal_private_key.clone();
 
-    tokio::task::spawn(async move {
+    loop {
+        let internal_private_key = internal_private_key.clone();
         event!(Level::INFO, "Waiting for message");
         let len = read_sock.read_u32().await.unwrap();
 
@@ -64,5 +76,5 @@
             handle_internal_message(meta_db.as_ref(), internal_private_key, enc_message).await;
 
         write_sock.write_all(resp.as_slice()).await.unwrap();
-    });
+    }
 }
diff --git a/src/main.rs b/src/main.rs
index a3d7dff..595c5a5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -556,7 +556,7 @@ async fn handle_upload_chunk(
     let user_id = get_user_id(token).unwrap();
 
     // 8MiB(?)
-    if chunk_metadata.size > 1024 * 1024 * 8 {
+    if chunk.len() > 1024 * 1024 * 8 {
         todo!("Deny uploads larger than our max chunk size");
     }
 
@@ -564,6 +564,16 @@ async fn handle_upload_chunk(
         todo!("Deny uploads with nonced_key != 32 bytes");
     }
 
+    let storage_usages = meta_db.total_usages(&[user_id]).await.unwrap();
+    let storage_usage = *storage_usages.get(&user_id).unwrap();
+
+    let storage_caps = meta_db.storage_caps(&[user_id]).await.unwrap();
+    let storage_cap = *storage_caps.get(&user_id).unwrap();
+
+    if storage_usage + chunk.len() as u64 > storage_cap {
+        todo!("Deny uploads that exceed storage cap");
+    }
+
     let chunk_id = ChunkID::try_from(chunk_metadata.id.as_str()).unwrap();
     let meta_db = meta_db.clone();
 
@@ -637,8 +647,18 @@ pub async fn handle_upload_file_metadata(
     enc_file_meta: EncryptedFileMetadata,
 ) -> Result<(), UploadMetadataError> {
     authorize(Right::Write, token, Vec::new(), Vec::new()).unwrap();
-
     let user_id = get_user_id(token).unwrap();
+
+    let storage_usages = meta_db.total_usages(&[user_id]).await.unwrap();
+    let storage_usage = *storage_usages.get(&user_id).unwrap();
+
+    let storage_caps = meta_db.storage_caps(&[user_id]).await.unwrap();
+    let storage_cap = *storage_caps.get(&user_id).unwrap();
+
+    if storage_usage + enc_file_meta.metadata.len() as u64 > storage_cap {
+        todo!("Deny uploads that exceed storage cap");
+    }
+
     meta_db
         .insert_file_meta(enc_file_meta, user_id)
         .await
diff --git a/src/meta_db.rs b/src/meta_db.rs
index d20c411..7a1e37a 100644
--- a/src/meta_db.rs
+++ b/src/meta_db.rs
@@ -63,6 +63,10 @@ pub trait MetaDB: Sized + Send + Sync + std::fmt::Debug {
         meta_id: String,
         user_id: i64,
     ) -> impl Future<Output = Result<()>> + Send;
+    fn storage_caps(
+        &self,
+        user_ids: &[i64],
+    ) -> impl Future<Output = Result<HashMap<i64, u64>>> + Send;
 }
 
 #[derive(Debug)]
@@ -348,6 +352,33 @@ impl MetaDB for PostgresMetaDB {
 
     #[tracing::instrument(err)]
     async fn total_usages(&self, user_ids: &[i64]) -> Result<HashMap<i64, u64>> {
+        // get the size of all file metadatas
+        let mut query = QueryBuilder::new(
+            "select sum(length(encrypted_metadata))::bigint as sum, user_id from file_metadata",
+        );
+        if !user_ids.is_empty() {
+            query.push(" where user_id in (");
+            {
+                let mut separated = query.separated(",");
+                for id in user_ids {
+                    separated.push(id.to_string());
+                }
+            }
+            query.push(")");
+        }
+        query.push(" group by user_id");
+        let query = query.build();
+        let rows = query.fetch_all(&self.pool).await?;
+        let mut usages: HashMap<i64, u64> = rows
+            .into_iter()
+            .map(|row| {
+                let sum: i64 = row.get("sum");
+                let user_id: i64 = row.get("user_id");
+
+                (user_id.try_into().unwrap(), sum.try_into().unwrap())
+            })
+            .collect();
+
         let mut query =
             QueryBuilder::new("select sum(chunk_size)::bigint as sum, user_id from chunks");
 
@@ -361,17 +392,62 @@ impl MetaDB for PostgresMetaDB {
             }
             query.push(")");
         }
+        query.push(" group by user_id");
+        let query = query.build();
+        let rows = query.fetch_all(&self.pool).await?;
+
+        rows.into_iter().for_each(|row| {
+            let sum: i64 = row.get("sum");
+            let user_id: i64 = row.get("user_id");
+
+            if let Some(usage) = usages.get_mut(&user_id) {
+                let sum: u64 = sum.try_into().unwrap();
+                *usage += sum;
+            } else {
+                usages.insert(user_id.try_into().unwrap(), sum.try_into().unwrap());
+            }
+        });
+
+        Ok(usages)
+    }
+
+    #[tracing::instrument(err)]
+    async fn storage_caps(&self, user_ids: &[i64]) -> Result<HashMap<i64, u64>> {
+        let mut query = QueryBuilder::new("select max_bytes, user_id from storage_caps");
+
+        if !user_ids.is_empty() {
+            query.push(" where user_id in (");
+            {
+                let mut separated = query.separated(",");
+                for id in user_ids {
+                    separated.push(id.to_string());
+                }
+            }
+            query.push(")");
+        }
+        query.push(" group by user_id");
         let query = query.build();
         let rows = query.fetch_all(&self.pool).await?;
 
-        Ok(rows
+        let mut caps: HashMap<i64, u64> = rows
             .into_iter()
             .map(|row| {
-                let sum = row.get("sum");
+                let storage_cap: i64 = row.get("max_bytes");
                 let user_id: i64 = row.get("user_id");
 
-                (sum, user_id.try_into().unwrap())
+                (user_id.try_into().unwrap(), storage_cap.try_into().unwrap())
             })
-            .collect())
+            .collect();
+
+        // 5 GiB
+        const DEFAULT_CAP: u64 = 5 * 1024 * 1024 * 1024;
+
+        user_ids.iter().for_each(|id| {
+            if !caps.contains_key(id) {
+                caps.insert(*id, DEFAULT_CAP);
+            }
+        });
+
+        Ok(caps)
     }
 }
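
Sketch 1: granting a specific cap. This is not part of the patch, just a
minimal illustration of how an operator-side helper could upsert a row into
the new storage_caps table, assuming the same sqlx Postgres pool used
elsewhere in this repo. The name set_storage_cap and the 50 GiB figure are
hypothetical.

    // Hypothetical admin-side helper (not included in this patch): gives one
    // user an explicit cap by upserting a row into storage_caps.
    async fn set_storage_cap(pool: &sqlx::PgPool, user_id: i64, max_bytes: i64) -> sqlx::Result<()> {
        sqlx::query(
            "insert into storage_caps (user_id, max_bytes) values ($1, $2)
             on conflict (user_id) do update set max_bytes = excluded.max_bytes",
        )
        .bind(user_id)
        .bind(max_bytes)
        .execute(pool)
        .await?;
        Ok(())
    }

    // e.g. set_storage_cap(&pool, 42, 50 * 1024 * 1024 * 1024).await?;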
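
Sketch 2: since handle_upload_chunk and handle_upload_file_metadata now
repeat the same usage/cap lookup, a follow-up could factor it into a shared
check along these lines. Sketch only: would_exceed_cap is a hypothetical
name, and the unwrap()s mirror the current todo!()-style error handling
rather than what production code should do.

    use crate::meta_db::MetaDB;

    // Hypothetical shared check (not part of this patch): returns true when
    // accepting `incoming_bytes` would push the user past their cap.
    async fn would_exceed_cap<M: MetaDB>(meta_db: &M, user_id: i64, incoming_bytes: u64) -> bool {
        // total_usages has no entry for a user who has stored nothing yet
        let usage = *meta_db
            .total_usages(&[user_id])
            .await
            .unwrap()
            .get(&user_id)
            .unwrap_or(&0);
        // storage_caps always returns an entry, falling back to DEFAULT_CAP
        let cap = *meta_db
            .storage_caps(&[user_id])
            .await
            .unwrap()
            .get(&user_id)
            .unwrap();
        usage + incoming_bytes > cap
    }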