Skip to content

Commit

Permalink
check and cache revoked tokens
Browse files Browse the repository at this point in the history
we should eventually store this in sqlite or something, but the list is
tiny so in mem is fine
  • Loading branch information
billyb2 committed Sep 17, 2024
1 parent 9514b8a commit 4a52825
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .envrc
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# this is *not* the prod public key, obviously lol
export BIG_CENTRAL_URL="http://localhost:4000";
use flake;
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest", "reqwest-client", "reqwest-rustls", "trace", "tokio"] }
opentelemetry = "0.22"
reqwest = "0.11"
reqwest = { version = "0.11", features = ["json"] }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio", "trace"] }
tracing-opentelemetry = "0.23"

Expand All @@ -37,7 +37,8 @@ rcgen = "0.12"
warp = "0.3"
serde = "1"
serde_json = { version = "1", features = ["raw_value"] }
bytes = "1.6"
bytes = "1"
hex = "0.4"

[features]
s3 = ["rust-s3"]
Expand Down
20 changes: 5 additions & 15 deletions src/action.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use anyhow::anyhow;
use bfsp::internal::ActionInfo;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tracing::error_span;
use tracing::{error, Level};

use rand::Rng;

use crate::{chunk_db::ChunkDB, meta_db::MetaDB};
use crate::meta_db::MetaDB;

#[derive(Debug)]
enum Action {
Expand All @@ -35,10 +33,7 @@ impl TryFrom<String> for Action {
}
}

pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
meta_db: Arc<M>,
chunk_db: Arc<C>,
) {
pub async fn check_run_actions_loop<M: MetaDB + 'static>(meta_db: Arc<M>) {
loop {
tracing::span!(Level::INFO, "run_current_actions");

Expand All @@ -49,10 +44,9 @@ pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
Ok(actions) => {
for action_info in actions.into_iter() {
let meta_db = Arc::clone(&meta_db);
let chunk_db = Arc::clone(&chunk_db);

tokio::task::spawn(async move {
match run_action(Arc::clone(&meta_db), chunk_db, &action_info).await {
match run_action(Arc::clone(&meta_db), &action_info).await {
Ok(_) => {
let _ = meta_db.executed_action(action_info.id.unwrap()).await;
}
Expand All @@ -74,12 +68,8 @@ pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
}
}

#[tracing::instrument(err, skip(meta_db, chunk_db))]
async fn run_action<M: MetaDB, C: ChunkDB>(
meta_db: Arc<M>,
chunk_db: Arc<C>,
action_info: &ActionInfo,
) -> anyhow::Result<()> {
#[tracing::instrument(err, skip(meta_db))]
async fn run_action<M: MetaDB>(meta_db: Arc<M>, action_info: &ActionInfo) -> anyhow::Result<()> {
let action: Action = action_info.action.clone().try_into()?;
let user_id = action_info.user_id;

Expand Down
8 changes: 7 additions & 1 deletion src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use biscuit_auth::{
};
use tracing::{event, Level};

use crate::meta_db::MetaDB;
use crate::{meta_db::MetaDB, tokens::check_token_revoked};

#[derive(Debug)]
pub enum Right {
Expand All @@ -20,6 +20,7 @@ pub enum Right {
Delete,
Usage,
Payment,
Settings,
}

impl Right {
Expand All @@ -31,6 +32,7 @@ impl Right {
Right::Delete => "delete",
Right::Usage => "usage",
Right::Payment => "payment",
Right::Settings => "settings",
}
}
}
Expand All @@ -42,6 +44,10 @@ pub async fn authorize<M: MetaDB>(
file_ids: Vec<String>,
meta_db: &M,
) -> anyhow::Result<i64> {
if check_token_revoked(token).await {
return Err(anyhow!("token is revoked"));
}

let user_id = get_user_id(token)?;

// first, check if the user has been suspended from the right they're trying to execute
Expand Down
Empty file removed src/io.rs
Empty file.
8 changes: 5 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod chunk_db;
mod internal;
mod meta_db;
mod tls;
mod tokens;

use action::check_run_actions_loop;
use anyhow::anyhow;
Expand Down Expand Up @@ -32,6 +33,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokens::refresh_revoked_tokens;
use tokio::{fs, io};
use tracing::{event, Level};
use tracing_opentelemetry::PreSampledTracer;
Expand Down Expand Up @@ -133,12 +135,11 @@ async fn main() -> Result<()> {
let chunk_db_clone = Arc::clone(&chunk_db);
let meta_db_clone = Arc::clone(&meta_db);

tokio::task::spawn(async move { refresh_revoked_tokens().await });
tokio::task::spawn(async move { chunk_db_clone.garbage_collect(meta_db_clone).await });

let chunk_db_clone = Arc::clone(&chunk_db);
let meta_db_clone = Arc::clone(&meta_db);

tokio::task::spawn(async move { check_run_actions_loop(meta_db_clone, chunk_db_clone).await });
tokio::task::spawn(async move { check_run_actions_loop(meta_db_clone).await });

let internal_tcp_addr = "[::]:9990".to_socket_addrs().unwrap().next().unwrap();

Expand Down Expand Up @@ -492,6 +493,7 @@ pub async fn handle_message<M: MetaDB + 'static, C: ChunkDB + 'static>(
.encode_to_vec(),
Err(_) => todo!(),
},
_ => todo!(),
}
.prepend_len())
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use bfsp::{
use serde::{Deserialize, Serialize};
use sqlx::{
types::{time::OffsetDateTime, Json},
Execute, Executor, PgPool, QueryBuilder, Row,
Executor, PgPool, QueryBuilder, Row,
};
use thiserror::Error;

Expand Down
74 changes: 74 additions & 0 deletions src/tokens.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::{collections::HashSet, env, sync::OnceLock, time::Duration};

use anyhow::anyhow;
use biscuit_auth::Biscuit;
use reqwest::StatusCode;
use tokio::sync::RwLock;

pub type RevocationIdentiifer = Vec<u8>;

static REVOKED_TOKENS: OnceLock<RwLock<HashSet<RevocationIdentiifer>>> = OnceLock::new();

#[tracing::instrument]
pub async fn refresh_revoked_tokens() {
let mut token_num: u32 = 0;
loop {
match single_update_revoked_tokens(token_num, 100).await {
Ok(tokens_inserted) => {
// we don't want to get too far behind, we we should keep iterating up til we can't
token_num += tokens_inserted;
if tokens_inserted > 0 {
continue;
}
}
Err(err) => {
tracing::error!(
token_num = token_num,
page_size = 100,
"Error updating revoked tokens: {err}"
);
}
}
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
}
}

pub async fn check_token_revoked(token: &Biscuit) -> bool {
for identifier in token.revocation_identifiers().iter() {
let revoked_tokens = REVOKED_TOKENS.get_or_init(|| RwLock::new(HashSet::new()));
if revoked_tokens.read().await.contains(identifier.as_slice()) {
return true;
}
}

false
}

#[tracing::instrument(err)]
async fn single_update_revoked_tokens(token_num: u32, page_size: u32) -> anyhow::Result<u32> {
let revoked_tokens = REVOKED_TOKENS.get_or_init(|| RwLock::new(HashSet::new()));
let big_central_url = big_central_url();
let resp = reqwest::get(format!(
"{big_central_url}/api/v1/revoked_tokens?token_num={token_num}&page_size={page_size}"
))
.await?;

if resp.status() != StatusCode::OK {
return Err(anyhow!("{}", resp.text().await?));
}

let tokens: Vec<String> = resp.json().await?;
let num_tokens: u32 = tokens.len().try_into()?;

let revoked_tokens = &mut revoked_tokens.write().await;
for token in tokens.into_iter() {
let token: RevocationIdentiifer = hex::decode(token)?;
revoked_tokens.insert(token);
}

Ok(num_tokens)
}

fn big_central_url() -> String {
env::var("BIG_CENTRAL_URL").unwrap_or_else(|_| "https://bbfs.io".to_string())
}

0 comments on commit 4a52825

Please sign in to comment.