Skip to content

Commit

Permalink
optimize worker queue
Browse files Browse the repository at this point in the history
  • Loading branch information
tofubert committed Dec 19, 2024
1 parent 683a9fb commit 28f65da
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 68 deletions.
26 changes: 23 additions & 3 deletions src/backend/nc_request/nc_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,35 @@ impl NCRequest {

tokio::spawn(async move {
loop {
if let Some(req) = rx.recv().await {
let mut buffer: Vec<ApiRequests> = vec![];
let added = rx.recv_many(&mut buffer, 5).await;
log::debug!("got {} requests to API", added);

if added == 0 {
buffer.push(rx.recv().await.expect("Failed to get message"));
}

while worker_queue
.first()
.expect("No Element in worker queue")
.capacity()
< 5
{
worker_queue.sort_by_key(tokio::sync::mpsc::Sender::capacity);
}
log::debug!(
"Capacity of first {} and last {} worker",
worker_queue.first().unwrap().capacity(),
worker_queue.last().unwrap().capacity()
);
for message in buffer {
worker_queue
.first()
.expect("No Thread?")
.send(req)
.send(message)
.await
.expect("Failed to fwd request to worker.");
};
}
}
});
log::info!("Spawned API Thread");
Expand Down
85 changes: 48 additions & 37 deletions src/backend/nc_room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,14 @@ impl NCRoom {
token: &Token,
messages: &mut Vec<NCMessage>,
) -> Result<(), Box<dyn std::error::Error>> {
let response_onceshot = requester
.lock()
.await
.request_chat_initial(token, 200)
.await
.unwrap();
let response_onceshot = {
requester
.lock()
.await
.request_chat_initial(token, 200)
.await
.unwrap()
};
let response = response_onceshot
.await
.expect("Failed for fetch chat update")
Expand Down Expand Up @@ -255,12 +257,14 @@ impl NCRoomInterface for NCRoom {
requester: Arc<Mutex<Requester>>,
) -> Result<String, Box<dyn std::error::Error>> {
log::debug!("Send Message {}", &message);
let response_onceshot = requester
.lock()
.await
.request_send_message(message, &self.room_data.token)
.await
.unwrap();
let response_onceshot = {
requester
.lock()
.await
.request_send_message(message, &self.room_data.token)
.await
.unwrap()
};
let response = response_onceshot
.await
.expect("Failed for fetch chat participants");
Expand All @@ -278,16 +282,18 @@ impl NCRoomInterface for NCRoom {
if let Some(data) = data_option {
self.room_data = data.clone();
}
let response_onceshot = requester
.lock()
.await
.request_chat_update(
&self.room_data.token,
200,
self.messages.last().unwrap().get_id(),
)
.await
.unwrap();
let response_onceshot = {
requester
.lock()
.await
.request_chat_update(
&self.room_data.token,
200,
self.messages.last().unwrap().get_id(),
)
.await
.unwrap()
};
let response = response_onceshot
.await
.expect("Failed for fetch chat update")
Expand All @@ -307,12 +313,15 @@ impl NCRoomInterface for NCRoom {
for message in response {
self.messages.push(message.into());
}
let response_onceshot = requester
.lock()
.await
.request_participants(&self.room_data.token)
.await
.unwrap();
let response_onceshot = {
requester
.lock()
.await
.request_participants(&self.room_data.token)
.await
.unwrap()
};

self.participants = response_onceshot
.await
.expect("Failed for fetch chat participants")
Expand All @@ -329,15 +338,17 @@ impl NCRoomInterface for NCRoom {
requester: Arc<Mutex<Requester>>,
) -> Result<(), Box<dyn std::error::Error>> {
if !self.messages.is_empty() {
let response_onceshot = requester
.lock()
.await
.request_mark_chat_read(
&self.room_data.token,
self.messages.last().ok_or("No last message")?.get_id(),
)
.await
.unwrap();
let response_onceshot = {
requester
.lock()
.await
.request_mark_chat_read(
&self.room_data.token,
self.messages.last().ok_or("No last message")?.get_id(),
)
.await
.unwrap()
};
response_onceshot
.await
.expect("Failed for fetch chat participants")
Expand Down
65 changes: 37 additions & 28 deletions src/backend/nc_talk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::Mutex;
use tokio::{sync::Mutex, task::JoinHandle};

use super::{
nc_request::Token,
Expand Down Expand Up @@ -62,13 +62,17 @@ impl<Requester: NCRequestInterface + 'static + std::marker::Send> NCTalk<Request
rooms: &mut HashMap<Token, NCRoom>,
chat_log_path: PathBuf,
) {
let v = response.into_iter().map(|child| {
tokio::spawn(NCTalk::<Requester>::new_room(
child,
Arc::clone(&raw_requester),
chat_log_path.clone(),
))
});
let v: Vec<JoinHandle<(String, Option<NCRoom>)>> = response
.into_iter()
.map(|child| {
tokio::spawn(NCTalk::<Requester>::new_room(
child,
Arc::clone(&raw_requester),
chat_log_path.clone(),
))
})
.collect();
log::debug!("Got {} initial threads", v.len());
for jh in v {
let (name, room_option) = jh.await.unwrap();
if let Some(room) = room_option {
Expand Down Expand Up @@ -96,6 +100,7 @@ impl<Requester: NCRequestInterface + 'static + std::marker::Send> NCTalk<Request
)),
);
}
log::debug!("Got {} initial threads", handles.capacity());
for (token, room_future) in &mut handles {
//we can safely unwrap here bc the json file on disk shall never be this broken.
let mut json_room = room_future.await?.unwrap();
Expand Down Expand Up @@ -140,12 +145,14 @@ impl<Requester: NCRequestInterface + 'static + std::marker::Send> NCTalk<Request

let requester = Arc::new(Mutex::new(raw_requester));

let resp = requester
.lock()
.await
.request_rooms_initial()
.await
.expect("Initial fetching of rooms on startup failed.");
let resp = {
requester
.lock()
.await
.request_rooms_initial()
.await
.expect("Initial fetching of rooms on startup failed.")
};
let (response, last_requested) = resp
.await
.expect("Initial fetching of rooms failed.")
Expand Down Expand Up @@ -363,25 +370,27 @@ impl<Requester: NCRequestInterface + 'static + std::marker::Sync> NCBackend for

async fn update_rooms(&mut self, force_update: bool) -> Result<Vec<String>, Box<dyn Error>> {
let (response, timestamp) = if force_update {
let resp = self
.requester
.lock()
.await
.request_rooms_update(self.last_requested)
.await
.expect("Initial fetching of rooms on startup failed.");
let resp = {
self.requester
.lock()
.await
.request_rooms_update(self.last_requested)
.await
.expect("Initial fetching of rooms on startup failed.")
};
resp.await
.expect("Initial fetching of rooms failed.")
.ok_or("No rooms found")
.expect("No rooms")
} else {
let resp = self
.requester
.lock()
.await
.request_rooms_initial()
.await
.expect("Initial fetching of rooms on startup failed.");
let resp = {
self.requester
.lock()
.await
.request_rooms_initial()
.await
.expect("Initial fetching of rooms on startup failed.")
};
resp.await
.expect("Initial fetching of rooms failed.")
.ok_or("No rooms found")
Expand Down

0 comments on commit 28f65da

Please sign in to comment.