Skip to content

Commit

Permalink
update actix ecosystem deps
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Oct 29, 2023
1 parent 1eeb935 commit 1a44c31
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
23 changes: 13 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ actix-codec = "0.5"
actix-cors = "0.6"
actix-files = "0.6"
actix-http = "3.4"
actix-identity = "0.5"
actix-identity = "0.6"
actix-multipart = "0.6"
actix-multipart-derive = "0.6"
actix-protobuf = "0.10"
actix-session = "0.7"
actix-session = "0.8"
actix-test = "0.1"
actix-tls = "3.1.1"
actix-utils = "3"
actix-web = "4.4"
actix-web-actors = "4.1"
actix-web-lab = "0.19"
actix-web-lab = "0.20"
actix-ws = "0.2.5"
awc = "3.2"

Expand All @@ -104,3 +104,4 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.24.2", features = ["sync", "io-util"] }
tokio-util = "0.7.4"
tokio-stream = "0.1.1"
2 changes: 2 additions & 0 deletions server-sent-events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ env_logger.workspace = true
futures-util.workspace = true
log.workspace = true
parking_lot = "0.12"
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
19 changes: 12 additions & 7 deletions server-sent-events/src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use std::{sync::Arc, time::Duration};

use actix_web::rt::time::interval;
use actix_web_lab::sse::{self, ChannelStream, Sse};
use actix_web_lab::{
sse::{self, Sse},
util::InfallibleStream,
};
use futures_util::future;
use parking_lot::Mutex;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

pub struct Broadcaster {
inner: Mutex<BroadcasterInner>,
}

#[derive(Debug, Clone, Default)]
struct BroadcasterInner {
clients: Vec<sse::Sender>,
clients: Vec<mpsc::Sender<sse::Event>>,
}

impl Broadcaster {
Expand Down Expand Up @@ -59,14 +64,14 @@ impl Broadcaster {
}

/// Registers client with broadcaster, returning an SSE response body.
pub async fn new_client(&self) -> Sse<ChannelStream> {
let (tx, rx) = sse::channel(10);
pub async fn new_client(&self) -> Sse<InfallibleStream<ReceiverStream<sse::Event>>> {
let (tx, rx) = mpsc::channel(10);

tx.send(sse::Data::new("connected")).await.unwrap();
tx.send(sse::Data::new("connected").into()).await.unwrap();

self.inner.lock().clients.push(tx);

rx
Sse::from_infallible_receiver(rx)
}

/// Broadcasts `msg` to all clients.
Expand All @@ -75,7 +80,7 @@ impl Broadcaster {

let send_futures = clients
.iter()
.map(|client| client.send(sse::Data::new(msg)));
.map(|client| client.send(sse::Data::new(msg).into()));

// try to send to all clients, ignoring failures
// disconnected clients will get swept up by `remove_stale_clients`
Expand Down

0 comments on commit 1a44c31

Please sign in to comment.