Skip to content

Commit

Permalink
make requester use a thread pool to distribute the requests
Browse files Browse the repository at this point in the history
  • Loading branch information
tofubert committed Dec 19, 2024
1 parent 78ed61e commit 683a9fb
Showing 1 changed file with 79 additions and 55 deletions.
134 changes: 79 additions & 55 deletions src/backend/nc_request/nc_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,68 +106,92 @@ pub struct NCRequest {
}

impl NCRequest {
async fn handle_req(worker: &NCRequestWorker, req: ApiRequests) {
log::debug!("got a new API Request {}", req);
match req {
ApiRequests::FetchChatInitial(token, maxMessage, response) => {
response
.send(Some(
worker.fetch_chat_initial(&token, maxMessage).await.unwrap(),
))
.expect("could not Send.");
}
ApiRequests::FetchChatUpdate(token, maxMessage, last_message, response) => {
response
.send(Some(
worker
.fetch_chat_update(&token, maxMessage, last_message)
.await
.unwrap(),
))
.expect("could not Send.");
}
ApiRequests::FetchRoomsInitial(response) => {
response
.send(Some(worker.fetch_rooms_initial().await.unwrap()))
.expect("could not Send.");
}
ApiRequests::FetchRoomsUpdate(last_timestamp, response) => {
response
.send(Some(
worker.fetch_rooms_update(last_timestamp).await.unwrap(),
))
.expect("could not Send.");
}
ApiRequests::SendMessage(token, message, response) => {
response
.send(Some(worker.send_message(message, &token).await.unwrap()))
.expect("could not Send.");
}
ApiRequests::FetchAutocompleteUsers(name, response) => {
response
.send(Some(worker.fetch_autocomplete_users(&name).await.unwrap()))
.expect("could not Send.");
}
ApiRequests::FetchParticipants(token, response) => {
response
.send(Some(worker.fetch_participants(&token).await.unwrap()))
.expect("could not Send.");
}
ApiRequests::MarkChatRead(token, last_message, response) => {
worker.mark_chat_read(&token, last_message).await.unwrap();
response.send(Some(())).expect("could not Send.");
}
ApiRequests::None => {
log::warn!("Unknown Request");
}
}
}
pub fn new(config: &Config) -> Self {
let (tx, mut rx) = mpsc::channel::<ApiRequests>(50);

let worker = NCRequestWorker::new(config).unwrap();
let mut worker_queue = vec![];

for i in 1..6 {
let (tx_worker, mut rx_worker) = mpsc::channel::<ApiRequests>(10);

worker_queue.push(tx_worker);
let worker = NCRequestWorker::new(config).unwrap();

tokio::spawn(async move {
loop {
if let Some(req) = rx_worker.recv().await {
NCRequest::handle_req(&worker, req).await;
};
}
});
}

tokio::spawn(async move {
loop {
if let Some(req) = rx.recv().await {
log::debug!("got a new API Request {}", req);
match req {
ApiRequests::FetchChatInitial(token, maxMessage, response) => {
response
.send(Some(
worker.fetch_chat_initial(&token, maxMessage).await.unwrap(),
))
.expect("could not Send.");
}
ApiRequests::FetchChatUpdate(token, maxMessage, last_message, response) => {
response
.send(Some(
worker
.fetch_chat_update(&token, maxMessage, last_message)
.await
.unwrap(),
))
.expect("could not Send.");
}
ApiRequests::FetchRoomsInitial(response) => {
response
.send(Some(worker.fetch_rooms_initial().await.unwrap()))
.expect("could not Send.");
}
ApiRequests::FetchRoomsUpdate(last_timestamp, response) => {
response
.send(Some(
worker.fetch_rooms_update(last_timestamp).await.unwrap(),
))
.expect("could not Send.");
}
ApiRequests::SendMessage(token, message, response) => {
response
.send(Some(worker.send_message(message, &token).await.unwrap()))
.expect("could not Send.");
}
ApiRequests::FetchAutocompleteUsers(name, response) => {
response
.send(Some(worker.fetch_autocomplete_users(&name).await.unwrap()))
.expect("could not Send.");
}
ApiRequests::FetchParticipants(token, response) => {
response
.send(Some(worker.fetch_participants(&token).await.unwrap()))
.expect("could not Send.");
}
ApiRequests::MarkChatRead(token, last_message, response) => {
worker.mark_chat_read(&token, last_message).await.unwrap();
response.send(Some(())).expect("could not Send.");
}
ApiRequests::None => {
log::warn!("Unknown Request");
}
}
worker_queue.sort_by_key(tokio::sync::mpsc::Sender::capacity);
worker_queue
.first()
.expect("No Thread?")
.send(req)
.await
.expect("Failed to fwd request to worker.");
};
}
});
Expand Down

0 comments on commit 683a9fb

Please sign in to comment.