Skip to content

Commit

Permalink
use crossbeam-channel replace task
Browse files Browse the repository at this point in the history
  • Loading branch information
foxzool committed Feb 28, 2024
1 parent b4151df commit aee84dd
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ keywords = ["bevy", "http", "plugin", "wasm"]

[dependencies]
bevy = { version = "0.13.0", default-features = false, features = ["multi-threaded"] }
crossbeam-channel = "0.5.11"
ehttp = { version = "0.5.0", features = ["native-async", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand Down
86 changes: 45 additions & 41 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use bevy::ecs::system::CommandQueue;
use bevy::prelude::*;
use bevy::tasks::{block_on, poll_once, IoTaskPool, Task};
use bevy::tasks::IoTaskPool;
use crossbeam_channel::Receiver;

use crate::prelude::TypedRequest;
use ehttp::{Headers, Request, Response};
Expand Down Expand Up @@ -95,7 +96,7 @@ pub struct HttpClient {

/// Request mode used on fetch. Only available on wasm builds
#[cfg(target_arch = "wasm32")]
pub mode: Option<Mode>,
pub mode: ehttp::Mode,
}

impl Default for HttpClient {
Expand All @@ -107,7 +108,7 @@ impl Default for HttpClient {
body: vec![],
headers: Some(Headers::new(&[("Accept", "*/*")])),
#[cfg(target_arch = "wasm32")]
mode: None,
mode: ehttp::Mode::default(),
}
}
}
Expand Down Expand Up @@ -394,7 +395,7 @@ impl HttpClient {
self.url = Some(request.url);
self.body = request.body;
self.headers = Some(request.headers);
self.mode = Some(request.mode);
self.mode = request.mode;

self
}
Expand Down Expand Up @@ -432,7 +433,7 @@ impl HttpClient {
body: self.body,
headers: self.headers.expect("headers is required"),
#[cfg(target_arch = "wasm32")]
mode: self.mode.expect("mode is required"),
mode: self.mode,
},
}
}
Expand All @@ -445,7 +446,7 @@ impl HttpClient {
body: self.body,
headers: self.headers.expect("headers is required"),
#[cfg(target_arch = "wasm32")]
mode: self.mode.expect("mode is required"),
mode: self.mode,
},
self.from_entity,
)
Expand All @@ -469,8 +470,8 @@ impl HttpResponseError {
}

/// task for ehttp response result
#[derive(Component)]
pub struct RequestTask(pub Task<CommandQueue>);
#[derive(Component, Debug)]
pub struct RequestTask(pub Receiver<CommandQueue>);

fn handle_request(
mut commands: Commands,
Expand All @@ -486,38 +487,41 @@ fn handle_request(
} else {
(commands.spawn_empty().id(), false)
};

let task = thread_pool.spawn(async move {
let mut command_queue = CommandQueue::default();

let response = ehttp::fetch_async(req.request).await;
command_queue.push(move |world: &mut World| {
match response {
Ok(res) => {
world
.get_resource_mut::<Events<HttpResponse>>()
.unwrap()
.send(HttpResponse(res));
}
Err(e) => {
world
.get_resource_mut::<Events<HttpResponseError>>()
.unwrap()
.send(HttpResponseError::new(e.to_string()));
let (tx, rx) = crossbeam_channel::bounded(1);

thread_pool
.spawn(async move {
let mut command_queue = CommandQueue::default();

let response = ehttp::fetch_async(req.request).await;
command_queue.push(move |world: &mut World| {
match response {
Ok(res) => {
world
.get_resource_mut::<Events<HttpResponse>>()
.unwrap()
.send(HttpResponse(res));
}
Err(e) => {
world
.get_resource_mut::<Events<HttpResponseError>>()
.unwrap()
.send(HttpResponseError::new(e.to_string()));
}
}
}

if has_from_entity {
world.entity_mut(entity).remove::<RequestTask>();
} else {
world.entity_mut(entity).despawn_recursive();
}
});

command_queue
});
if has_from_entity {
world.entity_mut(entity).remove::<RequestTask>();
} else {
world.entity_mut(entity).despawn_recursive();
}
});
println!("commands_queue: {:?}", command_queue);
tx.send(command_queue).unwrap();
})
.detach();

commands.entity(entity).insert(RequestTask(task));
commands.entity(entity).insert(RequestTask(rx));
req_res.current_clients += 1;
}
}
Expand All @@ -526,11 +530,11 @@ fn handle_request(
fn handle_tasks(
mut commands: Commands,
mut req_res: ResMut<HttpClientSetting>,
mut request_tasks: Query<&mut RequestTask>,
mut request_tasks: Query<&RequestTask>,
) {
for mut task in request_tasks.iter_mut() {
if let Some(mut commands_queue) = block_on(poll_once(&mut task.0)) {
commands.append(&mut commands_queue);
for task in request_tasks.iter_mut() {
if let Ok(mut command_queue) = task.0.try_recv() {
commands.append(&mut command_queue);
req_res.current_clients -= 1;
}
}
Expand Down
85 changes: 44 additions & 41 deletions src/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,56 +146,59 @@ fn handle_typed_request<T: for<'a> Deserialize<'a> + Send + Sync + 'static>(
(commands.spawn_empty().id(), false)
};
let req = request.request.clone();
let (tx, rx) = crossbeam_channel::bounded(1);

let task = thread_pool.spawn(async move {
let mut command_queue = CommandQueue::default();
thread_pool
.spawn(async move {
let mut command_queue = CommandQueue::default();

let response = ehttp::fetch_async(req).await;
command_queue.push(move |world: &mut World| {
match response {
Ok(response) => {
let result: Result<T, _> =
serde_json::from_slice(response.bytes.as_slice());
let response = ehttp::fetch_async(req).await;
command_queue.push(move |world: &mut World| {
match response {
Ok(response) => {
let result: Result<T, _> =
serde_json::from_slice(response.bytes.as_slice());

match result {
// deserialize success, send response
Ok(inner) => {
world
.get_resource_mut::<Events<TypedResponse<T>>>()
.unwrap()
.send(TypedResponse { inner });
}
// deserialize error, send error + response
Err(e) => {
world
.get_resource_mut::<Events<TypedResponseError<T>>>()
.unwrap()
.send(
TypedResponseError::new(e.to_string())
.response(response),
);
match result {
// deserialize success, send response
Ok(inner) => {
world
.get_resource_mut::<Events<TypedResponse<T>>>()
.unwrap()
.send(TypedResponse { inner });
}
// deserialize error, send error + response
Err(e) => {
world
.get_resource_mut::<Events<TypedResponseError<T>>>()
.unwrap()
.send(
TypedResponseError::new(e.to_string())
.response(response),
);
}
}
}
Err(e) => {
world
.get_resource_mut::<Events<TypedResponseError<T>>>()
.unwrap()
.send(TypedResponseError::new(e.to_string()));
}
}
Err(e) => {
world
.get_resource_mut::<Events<TypedResponseError<T>>>()
.unwrap()
.send(TypedResponseError::new(e.to_string()));
}
}

if has_from_entity {
world.entity_mut(entity).remove::<RequestTask>();
} else {
world.entity_mut(entity).despawn_recursive();
}
});
if has_from_entity {
world.entity_mut(entity).remove::<RequestTask>();
} else {
world.entity_mut(entity).despawn_recursive();
}
});

command_queue
});
tx.send(command_queue).unwrap()
})
.detach();

commands.entity(entity).insert(RequestTask(task));
commands.entity(entity).insert(RequestTask(rx));
req_res.current_clients += 1;
}
}
Expand Down

0 comments on commit aee84dd

Please sign in to comment.