Added storage caps for users
Limiting how much data users can store is useful: it lets users pay for what they use up to a certain point, lets us restrict abusive users, and so on. By default a user has no specific limit unless they are given one, which is done by adding a row to a table. Once some more of the billing work is done, big_central could hit the internal API for this.

We also started counting file_metadata toward storage usage. The reason is abuse prevention: any user can upload arbitrary data into file_metadata, so limits need to be enforced there as well. Once chunk_metadata is changed to encrypt some of its data, that should be counted too. In reality this is a KB at *most* per file, so it isn't an issue for non-abusive users.
billyb2 committed Jun 10, 2024
1 parent d30ecce commit b777fc9
Showing 4 changed files with 122 additions and 10 deletions.
4 changes: 4 additions & 0 deletions migrations/20240531213904_storage_caps.sql
@@ -0,0 +1,4 @@
CREATE TABLE storage_caps(
user_id bigint PRIMARY KEY,
max_bytes bigint NOT NULL
)
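A cap is applied by inserting a row into this table for the user, as the commit message describes. A minimal sketch of doing that from Rust with sqlx (the set_storage_cap helper and the 50 GiB figure are illustrative, not part of this commit):

use sqlx::PgPool;

// Hypothetical helper: grant (or update) an explicit cap for a user by
// upserting a row into the new storage_caps table.
async fn set_storage_cap(pool: &PgPool, user_id: i64, max_bytes: i64) -> sqlx::Result<()> {
    sqlx::query(
        "insert into storage_caps (user_id, max_bytes) values ($1, $2)
         on conflict (user_id) do update set max_bytes = excluded.max_bytes",
    )
    .bind(user_id)
    .bind(max_bytes)
    .execute(pool)
    .await?;
    Ok(())
}

// Example: cap user 42 at 50 GiB.
// set_storage_cap(&pool, 42, 50 * 1024 * 1024 * 1024).await?;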
20 changes: 16 additions & 4 deletions src/internal.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use bfsp::internal::internal_file_server_message::Message;
use bfsp::internal::GetStorageCapResp;
/// The internal API
use bfsp::Message as ProtoMessage;
use bfsp::{
@@ -10,11 +11,10 @@ use bfsp::{
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{event, Level};
use wtransport::endpoint::IncomingSession;

use crate::meta_db::MetaDB;

#[tracing::instrument(skip(key))]
#[tracing::instrument(skip(key, msg))]
async fn handle_internal_message<M: MetaDB>(
meta_db: &M,
key: XChaCha20Poly1305,
@@ -33,6 +33,17 @@ async fn handle_internal_message<M: MetaDB>(
}
.encode_to_vec()
}
Message::GetStorageCap(query) => {
let user_ids = query.user_ids;
let caps = meta_db.storage_caps(&user_ids).await.unwrap();

GetStorageCapResp {
response: Some(bfsp::internal::get_storage_cap_resp::Response::StorageCaps(
bfsp::internal::get_storage_cap_resp::StorageCap { storage_caps: caps },
)),
}
.encode_to_vec()
}
}
}

@@ -47,7 +58,8 @@ pub async fn handle_internal_connection<M: MetaDB + 'static>(
let meta_db = Arc::clone(&meta_db);

let internal_private_key = internal_private_key.clone();
tokio::task::spawn(async move {
loop {
let internal_private_key = internal_private_key.clone();
event!(Level::INFO, "Waiting for message");

let len = read_sock.read_u32().await.unwrap();
@@ -64,5 +76,5 @@
handle_internal_message(meta_db.as_ref(), internal_private_key, enc_message).await;

write_sock.write_all(resp.as_slice()).await.unwrap();
});
}
}
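Once the billing work mentioned in the commit message lands, big_central would talk to this loop over TCP: the server reads a big-endian u32 length prefix and then the encrypted message body. A rough sketch of just that framing on the client side, assuming the payload has already been protobuf-encoded and encrypted by bfsp (send_internal_frame is a hypothetical helper, not part of this commit):

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

// Hypothetical client-side framing: write a u32 length prefix followed by the
// encrypted payload, mirroring the read_u32 the server performs above.
async fn send_internal_frame(sock: &mut TcpStream, payload: &[u8]) -> std::io::Result<()> {
    sock.write_u32(payload.len() as u32).await?;
    sock.write_all(payload).await?;
    Ok(())
}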
24 changes: 22 additions & 2 deletions src/main.rs
@@ -556,14 +556,24 @@ async fn handle_upload_chunk<M: MetaDB + 'static, C: ChunkDB + 'static>(
let user_id = get_user_id(token).unwrap();

// 8MiB(?)
if chunk_metadata.size > 1024 * 1024 * 8 {
if chunk.len() > 1024 * 1024 * 8 {
todo!("Deny uploads larger than our max chunk size");
}

if chunk_metadata.nonce.len() != EncryptionNonce::len() {
todo!("Deny uploads with nonced_key != 32 bytes");
}

let storage_usages = meta_db.total_usages(&[user_id]).await.unwrap();
let storage_usage = *storage_usages.get(&user_id).unwrap();

let storage_caps = meta_db.storage_caps(&[user_id]).await.unwrap();
let storage_cap = *storage_caps.get(&user_id).unwrap();

if storage_usage + chunk.len() as u64 > storage_cap {
todo!("Deny uploads that exceed storage cap");
}

let chunk_id = ChunkID::try_from(chunk_metadata.id.as_str()).unwrap();

let meta_db = meta_db.clone();
@@ -637,8 +647,18 @@ pub async fn handle_upload_file_metadata<D: MetaDB>(
enc_file_meta: EncryptedFileMetadata,
) -> Result<(), UploadMetadataError> {
authorize(Right::Write, token, Vec::new(), Vec::new()).unwrap();

let user_id = get_user_id(token).unwrap();

let storage_usages = meta_db.total_usages(&[user_id]).await.unwrap();
let storage_usage = *storage_usages.get(&user_id).unwrap();

let storage_caps = meta_db.storage_caps(&[user_id]).await.unwrap();
let storage_cap = *storage_caps.get(&user_id).unwrap();

if storage_usage + enc_file_meta.metadata.len() as u64 > storage_cap {
todo!("Deny uploads that exceed storage cap");
}

meta_db
.insert_file_meta(enc_file_meta, user_id)
.await
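The usage-plus-cap check is now duplicated between handle_upload_chunk and handle_upload_file_metadata. One possible consolidation, shown only as a sketch (within_storage_cap is hypothetical and not part of this commit), treats a missing usage entry as zero and a missing cap as unlimited:

use crate::meta_db::MetaDB;

// Hypothetical helper: would storing `additional_bytes` more for `user_id`
// still fit under their storage cap?
async fn within_storage_cap<M: MetaDB>(meta_db: &M, user_id: i64, additional_bytes: u64) -> bool {
    let usage = meta_db
        .total_usages(&[user_id])
        .await
        .unwrap()
        .get(&user_id)
        .copied()
        .unwrap_or(0);
    let cap = meta_db
        .storage_caps(&[user_id])
        .await
        .unwrap()
        .get(&user_id)
        .copied()
        .unwrap_or(u64::MAX);
    usage + additional_bytes <= cap
}

Both upload paths could then call within_storage_cap(meta_db, user_id, len) and reject the request when it returns false.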
84 changes: 80 additions & 4 deletions src/meta_db.rs
@@ -63,6 +63,10 @@ pub trait MetaDB: Sized + Send + Sync + std::fmt::Debug {
meta_id: String,
user_id: i64,
) -> impl Future<Output = Result<()>> + Send;
fn storage_caps(
&self,
user_ids: &[i64],
) -> impl Future<Output = Result<HashMap<i64, u64>>> + Send;
}

#[derive(Debug)]
@@ -348,6 +352,33 @@ impl MetaDB for PostgresMetaDB {

#[tracing::instrument(err)]
async fn total_usages(&self, user_ids: &[i64]) -> Result<HashMap<i64, u64>> {
// get the size of all file metadatas
let mut query = QueryBuilder::new(
"select sum(length(encrypted_metadata))::bigint as sum, user_id from file_metadata",
);
if !user_ids.is_empty() {
query.push(" where user_id in (");
{
let mut separated = query.separated(",");
for id in user_ids {
separated.push(id.to_string());
}
}
query.push(")");
}
query.push(" group by user_id");
let query = query.build();
let rows = query.fetch_all(&self.pool).await?;
let mut usages: HashMap<i64, u64> = rows
.into_iter()
.map(|row| {
let sum: i64 = row.get("sum");
let user_id: i64 = row.get("user_id");

(user_id.try_into().unwrap(), sum.try_into().unwrap())
})
.collect();

let mut query =
QueryBuilder::new("select sum(chunk_size)::bigint as sum, user_id from chunks");

@@ -361,17 +392,62 @@ }
}
query.push(")");
}
query.push(" group by user_id");
let query = query.build();
let rows = query.fetch_all(&self.pool).await?;

rows.into_iter().for_each(|row| {
let sum: i64 = row.get("sum");
let user_id: i64 = row.get("user_id");

if let Some(usage) = usages.get_mut(&user_id) {
let sum: u64 = sum.try_into().unwrap();
*usage += sum;
} else {
usages.insert(user_id.try_into().unwrap(), sum.try_into().unwrap());
}
});

Ok(usages)
}

#[tracing::instrument(err)]
async fn storage_caps(&self, user_ids: &[i64]) -> Result<HashMap<i64, u64>> {
let mut query = QueryBuilder::new("select max_bytes, user_id from storage_caps");

if !user_ids.is_empty() {
query.push(" where user_id in (");
{
let mut separated = query.separated(",");
for id in user_ids {
separated.push(id.to_string());
}
}
query.push(")");
}
query.push(" group by user_id");
let query = query.build();
let rows = query.fetch_all(&self.pool).await?;

Ok(rows
let mut caps: HashMap<i64, u64> = rows
.into_iter()
.map(|row| {
let sum = row.get("sum");
let storage_cap: i64 = row.get("max_bytes");
let user_id: i64 = row.get("user_id");

(sum, user_id.try_into().unwrap())
(user_id.try_into().unwrap(), storage_cap.try_into().unwrap())
})
.collect())
.collect();

// 5 GiB
const DEFAULT_CAP: u64 = 5 * 1024 * 1024 * 1024;

user_ids.iter().for_each(|id| {
if !caps.contains_key(id) {
caps.insert(*id, DEFAULT_CAP);
}
});

Ok(caps)
}
}
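From a caller's point of view, storage_caps always returns an entry for every requested user, falling back to the 5 GiB DEFAULT_CAP when no storage_caps row exists. A small illustrative sketch (the user ids are made up):

// Illustrative only: users without a storage_caps row come back with the
// 5 GiB default; users with a row come back with their configured max_bytes.
async fn show_caps<M: MetaDB>(meta_db: &M) {
    let caps = meta_db.storage_caps(&[1, 2]).await.unwrap();
    for (user_id, max_bytes) in &caps {
        println!("user {user_id}: {max_bytes} bytes allowed");
    }
}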
