From aee84ddc47a6881e074981373cef9586d5daa42f Mon Sep 17 00:00:00 2001 From: ZoOL Date: Wed, 28 Feb 2024 15:27:03 +0800 Subject: [PATCH] use crossbeam-channel replace task --- Cargo.toml | 1 + src/lib.rs | 86 +++++++++++++++++++++++++++------------------------- src/typed.rs | 85 ++++++++++++++++++++++++++------------------------- 3 files changed, 90 insertions(+), 82 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e8dc3ff..1e70bd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ keywords = ["bevy", "http", "plugin", "wasm"] [dependencies] bevy = { version = "0.13.0", default-features = false, features = ["multi-threaded"] } +crossbeam-channel = "0.5.11" ehttp = { version = "0.5.0", features = ["native-async", "json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/lib.rs b/src/lib.rs index e4c58fe..82e0950 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,8 @@ use bevy::ecs::system::CommandQueue; use bevy::prelude::*; -use bevy::tasks::{block_on, poll_once, IoTaskPool, Task}; +use bevy::tasks::IoTaskPool; +use crossbeam_channel::Receiver; use crate::prelude::TypedRequest; use ehttp::{Headers, Request, Response}; @@ -95,7 +96,7 @@ pub struct HttpClient { /// Request mode used on fetch. Only available on wasm builds #[cfg(target_arch = "wasm32")] - pub mode: Option, + pub mode: ehttp::Mode, } impl Default for HttpClient { @@ -107,7 +108,7 @@ impl Default for HttpClient { body: vec![], headers: Some(Headers::new(&[("Accept", "*/*")])), #[cfg(target_arch = "wasm32")] - mode: None, + mode: ehttp::Mode::default(), } } } @@ -394,7 +395,7 @@ impl HttpClient { self.url = Some(request.url); self.body = request.body; self.headers = Some(request.headers); - self.mode = Some(request.mode); + self.mode = request.mode; self } @@ -432,7 +433,7 @@ impl HttpClient { body: self.body, headers: self.headers.expect("headers is required"), #[cfg(target_arch = "wasm32")] - mode: self.mode.expect("mode is required"), + mode: self.mode, }, } } @@ -445,7 +446,7 @@ impl HttpClient { body: self.body, headers: self.headers.expect("headers is required"), #[cfg(target_arch = "wasm32")] - mode: self.mode.expect("mode is required"), + mode: self.mode, }, self.from_entity, ) @@ -469,8 +470,8 @@ impl HttpResponseError { } /// task for ehttp response result -#[derive(Component)] -pub struct RequestTask(pub Task); +#[derive(Component, Debug)] +pub struct RequestTask(pub Receiver); fn handle_request( mut commands: Commands, @@ -486,38 +487,41 @@ fn handle_request( } else { (commands.spawn_empty().id(), false) }; - - let task = thread_pool.spawn(async move { - let mut command_queue = CommandQueue::default(); - - let response = ehttp::fetch_async(req.request).await; - command_queue.push(move |world: &mut World| { - match response { - Ok(res) => { - world - .get_resource_mut::>() - .unwrap() - .send(HttpResponse(res)); - } - Err(e) => { - world - .get_resource_mut::>() - .unwrap() - .send(HttpResponseError::new(e.to_string())); + let (tx, rx) = crossbeam_channel::bounded(1); + + thread_pool + .spawn(async move { + let mut command_queue = CommandQueue::default(); + + let response = ehttp::fetch_async(req.request).await; + command_queue.push(move |world: &mut World| { + match response { + Ok(res) => { + world + .get_resource_mut::>() + .unwrap() + .send(HttpResponse(res)); + } + Err(e) => { + world + .get_resource_mut::>() + .unwrap() + .send(HttpResponseError::new(e.to_string())); + } } - } - if has_from_entity { - world.entity_mut(entity).remove::(); - } else { - world.entity_mut(entity).despawn_recursive(); - } - }); - - command_queue - }); + if has_from_entity { + world.entity_mut(entity).remove::(); + } else { + world.entity_mut(entity).despawn_recursive(); + } + }); + println!("commands_queue: {:?}", command_queue); + tx.send(command_queue).unwrap(); + }) + .detach(); - commands.entity(entity).insert(RequestTask(task)); + commands.entity(entity).insert(RequestTask(rx)); req_res.current_clients += 1; } } @@ -526,11 +530,11 @@ fn handle_request( fn handle_tasks( mut commands: Commands, mut req_res: ResMut, - mut request_tasks: Query<&mut RequestTask>, + mut request_tasks: Query<&RequestTask>, ) { - for mut task in request_tasks.iter_mut() { - if let Some(mut commands_queue) = block_on(poll_once(&mut task.0)) { - commands.append(&mut commands_queue); + for task in request_tasks.iter_mut() { + if let Ok(mut command_queue) = task.0.try_recv() { + commands.append(&mut command_queue); req_res.current_clients -= 1; } } diff --git a/src/typed.rs b/src/typed.rs index c974516..2517e4f 100644 --- a/src/typed.rs +++ b/src/typed.rs @@ -146,56 +146,59 @@ fn handle_typed_request Deserialize<'a> + Send + Sync + 'static>( (commands.spawn_empty().id(), false) }; let req = request.request.clone(); + let (tx, rx) = crossbeam_channel::bounded(1); - let task = thread_pool.spawn(async move { - let mut command_queue = CommandQueue::default(); + thread_pool + .spawn(async move { + let mut command_queue = CommandQueue::default(); - let response = ehttp::fetch_async(req).await; - command_queue.push(move |world: &mut World| { - match response { - Ok(response) => { - let result: Result = - serde_json::from_slice(response.bytes.as_slice()); + let response = ehttp::fetch_async(req).await; + command_queue.push(move |world: &mut World| { + match response { + Ok(response) => { + let result: Result = + serde_json::from_slice(response.bytes.as_slice()); - match result { - // deserialize success, send response - Ok(inner) => { - world - .get_resource_mut::>>() - .unwrap() - .send(TypedResponse { inner }); - } - // deserialize error, send error + response - Err(e) => { - world - .get_resource_mut::>>() - .unwrap() - .send( - TypedResponseError::new(e.to_string()) - .response(response), - ); + match result { + // deserialize success, send response + Ok(inner) => { + world + .get_resource_mut::>>() + .unwrap() + .send(TypedResponse { inner }); + } + // deserialize error, send error + response + Err(e) => { + world + .get_resource_mut::>>() + .unwrap() + .send( + TypedResponseError::new(e.to_string()) + .response(response), + ); + } } } + Err(e) => { + world + .get_resource_mut::>>() + .unwrap() + .send(TypedResponseError::new(e.to_string())); + } } - Err(e) => { - world - .get_resource_mut::>>() - .unwrap() - .send(TypedResponseError::new(e.to_string())); - } - } - if has_from_entity { - world.entity_mut(entity).remove::(); - } else { - world.entity_mut(entity).despawn_recursive(); - } - }); + if has_from_entity { + world.entity_mut(entity).remove::(); + } else { + world.entity_mut(entity).despawn_recursive(); + } + }); - command_queue - }); + tx.send(command_queue).unwrap() + }) + .detach(); - commands.entity(entity).insert(RequestTask(task)); + commands.entity(entity).insert(RequestTask(rx)); req_res.current_clients += 1; } }