Skip to content

Commit

Permalink
Merge pull request #64 from duvholt/sse-resume
Browse files Browse the repository at this point in the history
Support for resuming Hue event stream
  • Loading branch information
chrivers authored Jan 26, 2025
2 parents 3b78533 + 7c4ba99 commit 0237916
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 29 deletions.
23 changes: 10 additions & 13 deletions src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@ use crate::hue::api::{GroupedLightUpdate, LightUpdate, SceneUpdate, Update};
use crate::hue::event::EventBlock;
use crate::hue::version::SwVersion;
use crate::model::state::{AuxData, State};
use crate::server::hueevents::HueEventStream;
use crate::z2m::request::ClientRequest;

#[derive(Clone, Debug)]
pub struct Resources {
state: State,
version: SwVersion,
state_updates: Arc<Notify>,
pub hue_updates: Sender<EventBlock>,
pub z2m_updates: Sender<Arc<ClientRequest>>,
hue_event_stream: HueEventStream,
}

impl Resources {
const MAX_SCENE_ID: u32 = 100;
const HUE_EVENTS_BUFFER_SIZE: usize = 128;

#[allow(clippy::new_without_default)]
#[must_use]
Expand All @@ -38,8 +40,8 @@ impl Resources {
state,
version,
state_updates: Arc::new(Notify::new()),
hue_updates: Sender::new(32),
z2m_updates: Sender::new(32),
hue_event_stream: HueEventStream::new(Self::HUE_EVENTS_BUFFER_SIZE),
}
}

Expand Down Expand Up @@ -126,7 +128,8 @@ impl Resources {

if let Some(delta) = Self::generate_update(obj)? {
let id_v1 = self.state.id_v1(id);
self.hue_event(EventBlock::update(id, id_v1, delta)?);
self.hue_event_stream
.hue_event(EventBlock::update(id, id_v1, delta)?);
}

self.state_updates.notify_one();
Expand Down Expand Up @@ -185,7 +188,7 @@ impl Resources {

log::trace!("Send event: {evt:?}");

self.hue_event(evt);
self.hue_event_stream.hue_event(evt);

Ok(())
}
Expand All @@ -198,7 +201,7 @@ impl Resources {

let evt = EventBlock::delete(link)?;

self.hue_event(evt);
self.hue_event_stream.hue_event(evt);

Ok(())
}
Expand Down Expand Up @@ -428,14 +431,8 @@ impl Resources {
}

#[must_use]
pub fn hue_channel(&self) -> Receiver<EventBlock> {
self.hue_updates.subscribe()
}

fn hue_event(&self, evt: EventBlock) {
if let Err(err) = self.hue_updates.send(evt) {
log::trace!("Overflow on hue event pipe: {err}");
}
pub const fn hue_event_stream(&self) -> &HueEventStream {
&self.hue_event_stream
}

#[must_use]
Expand Down
41 changes: 25 additions & 16 deletions src/routes/eventstream.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,48 @@
use axum::extract::State;
use axum::http::{HeaderMap, HeaderValue};
use axum::response::sse::{Event, Sse};
use axum::routing::get;
use axum::Router;
use chrono::Utc;
use futures::stream::Stream;
use futures::stream::{self, Stream};
use futures::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

use crate::error::ApiResult;
use crate::server::appstate::AppState;

pub async fn get_clip_v2(
headers: HeaderMap,
State(state): State<AppState>,
) -> Sse<impl Stream<Item = ApiResult<Event>>> {
let hello = tokio_stream::iter([Ok(Event::default().comment("hi"))]);
let last_event_id = headers.get("last-event-id").map(HeaderValue::to_str);

let mut prev_ts = Utc::now().timestamp();
let mut idx = 0;

let channel = state.res.lock().await.hue_channel();
let channel = state.res.lock().await.hue_event_stream().subscribe();
let stream = BroadcastStream::new(channel);
let events = match last_event_id {
Some(Ok(id)) => {
let previous_events = state
.res
.lock()
.await
.hue_event_stream()
.events_sent_after_id(id);
stream::iter(previous_events.into_iter().map(Ok))
.chain(stream)
.boxed()
}
_ => stream.boxed(),
};

let stream = BroadcastStream::new(channel).map(move |e| {
let json = [e?];
let stream = events.map(move |e| {
let evt = e?;
let evt_id = evt.id();
let json = [evt.block];
log::trace!(
"## EVENT ##: {}",
serde_json::to_string(&json).unwrap_or_else(|_| "ERROR".to_string())
);
let ts = Utc::now().timestamp();
if ts == prev_ts {
idx += 1;
} else {
idx = 0;
prev_ts = ts;
}
Ok(Event::default().id(format!("{ts}:{idx}")).json_data(json)?)
Ok(Event::default().id(evt_id).json_data(json)?)
});

Sse::new(hello.chain(stream))
Expand Down
88 changes: 88 additions & 0 deletions src/server/hueevents.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::collections::VecDeque;

use chrono::{DateTime, Utc};
use tokio::sync::broadcast::{Receiver, Sender};

use crate::hue::event::EventBlock;

#[derive(Clone, Debug)]
pub struct HueEventRecord {
timestamp: DateTime<Utc>,
index: u32,
pub block: EventBlock,
}

impl HueEventRecord {
#[must_use]
pub fn id(&self) -> String {
format!("{}:{}", self.timestamp.timestamp(), self.index)
}
}

#[derive(Clone, Debug)]
pub struct HueEventStream {
timestamp: DateTime<Utc>,
index: u32,
hue_updates: Sender<HueEventRecord>,
buffer: VecDeque<HueEventRecord>,
}

impl HueEventStream {
#[must_use]
pub fn new(buffer_capacity: usize) -> Self {
Self {
timestamp: Utc::now(),
index: 0,
hue_updates: Sender::new(32),
buffer: VecDeque::with_capacity(buffer_capacity),
}
}

fn add_to_buffer(&mut self, record: HueEventRecord) {
if self.buffer.len() == self.buffer.capacity() {
self.buffer.pop_front();
self.buffer.push_back(record);
debug_assert!(self.buffer.len() == self.buffer.capacity());
} else {
self.buffer.push_back(record);
}
}

fn generate_record(&mut self, block: EventBlock) -> HueEventRecord {
let timestamp = Utc::now();
if timestamp.timestamp() == self.timestamp.timestamp() {
self.index += 1;
} else {
self.index = 0;
self.timestamp = timestamp;
}
HueEventRecord {
block,
timestamp,
index: self.index,
}
}

#[must_use]
pub fn events_sent_after_id(&self, id: &str) -> Vec<HueEventRecord> {
let mut events = self.buffer.iter().skip_while(|record| record.id() != id);
match events.next() {
Some(_) => events.cloned().collect(),
// return all events if requested event is not in buffer
None => self.buffer.iter().cloned().collect(),
}
}

pub fn hue_event(&mut self, block: EventBlock) {
let record = self.generate_record(block);
self.add_to_buffer(record.clone());
if let Err(err) = self.hue_updates.send(record) {
log::trace!("Overflow on hue event pipe: {err}");
}
}

#[must_use]
pub fn subscribe(&self) -> Receiver<HueEventRecord> {
self.hue_updates.subscribe()
}
}
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod appstate;
pub mod banner;
pub mod certificate;
pub mod hueevents;
pub mod updater;

use std::fs::File;
Expand Down

0 comments on commit 0237916

Please sign in to comment.