Skip to content

Commit

Permalink
return all actions for each user
Browse files Browse the repository at this point in the history
obviously the correct way to do this
  • Loading branch information
billyb2 committed Sep 2, 2024
1 parent f0691aa commit 4620ebb
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions src/internal.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use bfsp::internal::get_action_resp::ActionsPerUser;
use bfsp::internal::internal_file_server_message::{DeleteQueuedAction, Message};
use bfsp::internal::get_queued_action_resp::{Actions, ActionsPerUser};
use bfsp::internal::internal_file_server_message::Message;
use bfsp::internal::{
get_action_resp, queue_action_resp, DeleteQueuedActionResp, GetActionResp, GetStorageCapResp,
GetSuspensionsResp, QueueActionResp, SuspendUsersResp,
get_queued_action_resp, queue_action_resp, DeleteQueuedActionResp, GetQueuedActionResp,
GetStorageCapResp, GetSuspensionsResp, QueueActionResp, SuspendUsersResp,
};
use bfsp::{
chacha20poly1305::XChaCha20Poly1305,
Expand Down Expand Up @@ -86,12 +86,18 @@ async fn handle_internal_message<M: MetaDB>(
}
.encode_to_vec()
}
Message::GetAction(args) => {
Message::GetQueuedActions(args) => {
let user_ids: HashSet<i64> = args.user_ids.into_iter().collect();
let actions = meta_db.get_actions_for_users(user_ids).await.unwrap();

GetActionResp {
response: Some(get_action_resp::Response::Actions(ActionsPerUser {
let actions: HashMap<i64, Actions> = meta_db
.get_actions_for_users(user_ids)
.await
.unwrap()
.into_iter()
.map(|(user_id, actions)| (user_id, Actions { actions }))
.collect();

GetQueuedActionResp {
response: Some(get_queued_action_resp::Response::Actions(ActionsPerUser {
action_info: actions,
})),
}
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async fn main() -> Result<()> {
}
}

#[tracing::instrument(err, skip(bytes))]
#[tracing::instrument(err, skip(bytes, meta_db, chunk_db))]
async fn handle_http_connection<M: MetaDB + 'static, C: ChunkDB + 'static>(
bytes: Bytes,
meta_db: Arc<M>,
Expand All @@ -267,7 +267,7 @@ async fn handle_http_connection<M: MetaDB + 'static, C: ChunkDB + 'static>(
.unwrap())
}

#[tracing::instrument(skip(incoming_session, public_key))]
#[tracing::instrument(skip(incoming_session, public_key, meta_db, chunk_db))]
async fn handle_connection<M: MetaDB + 'static, C: ChunkDB + 'static>(
incoming_session: IncomingSession,
public_key: PublicKey,
Expand Down
60 changes: 31 additions & 29 deletions src/meta_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub trait MetaDB: Sized + Send + Sync + std::fmt::Debug {
fn get_actions_for_users(
&self,
user_ids: HashSet<i64>,
) -> impl Future<Output = Result<HashMap<i64, ActionInfo>>> + Send;
) -> impl Future<Output = Result<HashMap<i64, Vec<ActionInfo>>>> + Send;
}

#[derive(Debug)]
Expand Down Expand Up @@ -778,51 +778,53 @@ FROM
async fn get_actions_for_users(
&self,
user_ids: HashSet<i64>,
) -> Result<HashMap<i64, ActionInfo>> {
let mut query =
QueryBuilder::new("select id, action, user_id, execute_at, status where user_id in (");
) -> Result<HashMap<i64, Vec<ActionInfo>>> {
let mut query = QueryBuilder::new(
"select id, action, user_id, execute_at, status from queued_actions where user_id in (",
);
let mut separated = query.separated(",");
for id in user_ids.iter() {
separated.push_bind(id);
}

query.push(") group by user_id");
query.push(");");

let mut actions: HashMap<i64, Vec<ActionInfo>> = HashMap::new();

let actions = query
query
.build()
.fetch_all(&self.pool)
.await?
.into_iter()
.map(|row| {
.for_each(|row| {
let id: i32 = row.get("id");
let action: String = row.get("action");
let user_id: i64 = row.get("user_id");
let execute_at: OffsetDateTime = row.get("execute_at");
let status: String = row.get("status");

(
let action_info = ActionInfo {
id: Some(id),
action,
execute_at: Some(
prost_types::Timestamp::date_time_nanos(
execute_at.year().into(),
execute_at.month().into(),
execute_at.day(),
execute_at.hour(),
execute_at.minute(),
execute_at.second(),
execute_at.nanosecond(),
)
.unwrap(),
),
status,
user_id,
ActionInfo {
id: Some(id),
action,
execute_at: Some(
prost_types::Timestamp::date_time_nanos(
execute_at.year().into(),
execute_at.month().into(),
execute_at.day(),
execute_at.hour(),
execute_at.minute(),
execute_at.second(),
execute_at.nanosecond(),
)
.unwrap(),
),
status,
user_id,
},
)
})
.collect();
};

let actions = actions.entry(user_id).or_insert_with(|| Vec::new());
actions.push(action_info);
});

Ok(actions)
}
Expand Down

0 comments on commit 4620ebb

Please sign in to comment.