Skip to content

Commit

Permalink
Merge branch 'chrivers/json-extractor-workaround' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
chrivers committed Jan 30, 2025
2 parents 64960cf + 7656932 commit af39c3a
Show file tree
Hide file tree
Showing 16 changed files with 625 additions and 45 deletions.
443 changes: 443 additions & 0 deletions doc/hue-zigbee-format.md

Large diffs are not rendered by default.

23 changes: 10 additions & 13 deletions src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@ use crate::hue::api::{GroupedLightUpdate, LightUpdate, SceneUpdate, Update};
use crate::hue::event::EventBlock;
use crate::hue::version::SwVersion;
use crate::model::state::{AuxData, State};
use crate::server::hueevents::HueEventStream;
use crate::z2m::request::ClientRequest;

#[derive(Clone, Debug)]
pub struct Resources {
state: State,
version: SwVersion,
state_updates: Arc<Notify>,
pub hue_updates: Sender<EventBlock>,
pub z2m_updates: Sender<Arc<ClientRequest>>,
hue_event_stream: HueEventStream,
}

impl Resources {
const MAX_SCENE_ID: u32 = 100;
const HUE_EVENTS_BUFFER_SIZE: usize = 128;

#[allow(clippy::new_without_default)]
#[must_use]
Expand All @@ -38,8 +40,8 @@ impl Resources {
state,
version,
state_updates: Arc::new(Notify::new()),
hue_updates: Sender::new(32),
z2m_updates: Sender::new(32),
hue_event_stream: HueEventStream::new(Self::HUE_EVENTS_BUFFER_SIZE),
}
}

Expand Down Expand Up @@ -127,7 +129,8 @@ impl Resources {

if let Some(delta) = Self::generate_update(obj)? {
let id_v1 = self.state.id_v1(id);
self.hue_event(EventBlock::update(id, id_v1, delta)?);
self.hue_event_stream
.hue_event(EventBlock::update(id, id_v1, delta)?);
}

self.state_updates.notify_one();
Expand Down Expand Up @@ -186,7 +189,7 @@ impl Resources {

log::trace!("Send event: {evt:?}");

self.hue_event(evt);
self.hue_event_stream.hue_event(evt);

Ok(())
}
Expand All @@ -199,7 +202,7 @@ impl Resources {

let evt = EventBlock::delete(link)?;

self.hue_event(evt);
self.hue_event_stream.hue_event(evt);

Ok(())
}
Expand Down Expand Up @@ -429,14 +432,8 @@ impl Resources {
}

#[must_use]
pub fn hue_channel(&self) -> Receiver<EventBlock> {
self.hue_updates.subscribe()
}

fn hue_event(&self, evt: EventBlock) {
if let Err(err) = self.hue_updates.send(evt) {
log::trace!("Overflow on hue event pipe: {err}");
}
pub const fn hue_event_stream(&self) -> &HueEventStream {
&self.hue_event_stream
}

#[must_use]
Expand Down
12 changes: 5 additions & 7 deletions src/routes/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::{
extract::{Path, State},
response::IntoResponse,
routing::{get, post, put},
Json, Router,
Router,
};

use bytes::Bytes;
Expand All @@ -13,19 +13,17 @@ use serde_json::{json, Value};
use tokio::sync::MutexGuard;
use uuid::Uuid;

use crate::error::{ApiError, ApiResult};
use crate::hue::api::{Device, GroupedLight, Light, RType, ResourceLink, Room, Scene, V1Reply};
use crate::hue::legacy_api::{
ApiGroup, ApiLight, ApiLightStateUpdate, ApiResourceType, ApiScene, ApiUserConfig,
Capabilities, HueResult, NewUser, NewUserReply,
ApiGroup, ApiGroupActionUpdate, ApiLight, ApiLightStateUpdate, ApiResourceType, ApiScene,
ApiUserConfig, Capabilities, HueResult, NewUser, NewUserReply,
};
use crate::resource::Resources;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;
use crate::z2m::request::ClientRequest;
use crate::z2m::update::DeviceUpdate;
use crate::{
error::{ApiError, ApiResult},
hue::legacy_api::ApiGroupActionUpdate,
};

async fn get_api_config(State(state): State<AppState>) -> impl IntoResponse {
Json(state.api_short_config().await)
Expand Down
3 changes: 2 additions & 1 deletion src/routes/clip/device.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use axum::{
extract::{Path, State},
routing::put,
Json, Router,
Router,
};
use serde_json::Value;
use uuid::Uuid;

use crate::hue::api::{Device, DeviceUpdate, RType, V2Reply};
use crate::routes::clip::ApiV2Result;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;

async fn put_device(
Expand Down
3 changes: 2 additions & 1 deletion src/routes/clip/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use axum::{
extract::{Path, State},
response::IntoResponse,
routing::{delete, get, post, put},
Json, Router,
Router,
};
use serde_json::Value;
use uuid::Uuid;

use crate::error::ApiError;
use crate::hue::api::{RType, Resource, ResourceLink, V2Reply};
use crate::routes::clip::ApiV2Result;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;

async fn get_root(State(state): State<AppState>) -> impl IntoResponse {
Expand Down
3 changes: 2 additions & 1 deletion src/routes/clip/grouped_light.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use axum::{
extract::{Path, State},
routing::put,
Json, Router,
Router,
};
use serde_json::Value;
use uuid::Uuid;

use crate::hue::api::{GroupedLight, GroupedLightUpdate, RType, V2Reply};
use crate::routes::clip::ApiV2Result;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;
use crate::z2m::request::ClientRequest;
use crate::z2m::update::DeviceUpdate;
Expand Down
3 changes: 2 additions & 1 deletion src/routes/clip/light.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use axum::{
extract::{Path, State},
routing::get,
Json, Router,
Router,
};
use serde_json::Value;
use uuid::Uuid;

use crate::hue::api::{Light, LightUpdate, RType, V2Reply};
use crate::routes::clip::ApiV2Result;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;
use crate::z2m::request::ClientRequest;
use crate::z2m::update::DeviceUpdate;
Expand Down
3 changes: 2 additions & 1 deletion src/routes/clip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ pub mod grouped_light;
pub mod light;
pub mod scene;

use axum::{Json, Router};
use axum::Router;
use serde::Serialize;
use serde_json::Value;

use crate::error::ApiResult;
use crate::hue::api::V2Reply;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;

type ApiV2Result = ApiResult<Json<V2Reply<Value>>>;
Expand Down
3 changes: 2 additions & 1 deletion src/routes/clip/scene.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axum::{
extract::{Path, State},
response::IntoResponse,
routing::{delete, post, put},
Json, Router,
Router,
};
use serde_json::Value;
use uuid::Uuid;
Expand All @@ -13,6 +13,7 @@ use crate::hue::api::{
};
use crate::model::state::AuxData;
use crate::routes::clip::ApiV2Result;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;
use crate::z2m::request::ClientRequest;

Expand Down
41 changes: 25 additions & 16 deletions src/routes/eventstream.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,48 @@
use axum::extract::State;
use axum::http::{HeaderMap, HeaderValue};
use axum::response::sse::{Event, Sse};
use axum::routing::get;
use axum::Router;
use chrono::Utc;
use futures::stream::Stream;
use futures::stream::{self, Stream};
use futures::StreamExt;
use tokio_stream::wrappers::BroadcastStream;

use crate::error::ApiResult;
use crate::server::appstate::AppState;

pub async fn get_clip_v2(
headers: HeaderMap,
State(state): State<AppState>,
) -> Sse<impl Stream<Item = ApiResult<Event>>> {
let hello = tokio_stream::iter([Ok(Event::default().comment("hi"))]);
let last_event_id = headers.get("last-event-id").map(HeaderValue::to_str);

let mut prev_ts = Utc::now().timestamp();
let mut idx = 0;

let channel = state.res.lock().await.hue_channel();
let channel = state.res.lock().await.hue_event_stream().subscribe();
let stream = BroadcastStream::new(channel);
let events = match last_event_id {
Some(Ok(id)) => {
let previous_events = state
.res
.lock()
.await
.hue_event_stream()
.events_sent_after_id(id);
stream::iter(previous_events.into_iter().map(Ok))
.chain(stream)
.boxed()
}
_ => stream.boxed(),
};

let stream = BroadcastStream::new(channel).map(move |e| {
let json = [e?];
let stream = events.map(move |e| {
let evt = e?;
let evt_id = evt.id();
let json = [evt.block];
log::trace!(
"## EVENT ##: {}",
serde_json::to_string(&json).unwrap_or_else(|_| "ERROR".to_string())
);
let ts = Utc::now().timestamp();
if ts == prev_ts {
idx += 1;
} else {
idx = 0;
prev_ts = ts;
}
Ok(Event::default().id(format!("{ts}:{idx}")).json_data(json)?)
Ok(Event::default().id(evt_id).json_data(json)?)
});

Sse::new(hello.chain(stream))
Expand Down
33 changes: 33 additions & 0 deletions src/routes/extractor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use axum::extract::rejection::JsonRejection;
use axum::extract::{FromRequest, Request};
use axum::response::IntoResponse;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::Serialize;

// Simple wrapper around axum::Json, which skips the header requirements.
//
// The axum version requires "Content-Type: application/json", which many
// (buggy) apps don't actually send. So we are forced to skip this check.
pub struct Json<T>(pub T);

#[axum::async_trait]
impl<S, T> FromRequest<S> for Json<T>
where
axum::Json<T>: FromRequest<S, Rejection = JsonRejection>,
T: DeserializeOwned,
S: Send + Sync,
{
type Rejection = JsonRejection;

async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
let bytes = Bytes::from_request(req, state).await?;
Ok(Self(axum::Json::from_bytes(&bytes)?.0))
}
}

impl<T: Serialize> IntoResponse for Json<T> {
fn into_response(self) -> axum::response::Response {
axum::Json(self.0).into_response()
}
}
3 changes: 2 additions & 1 deletion src/routes/licenses.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Json, Router};
use axum::Router;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::routes::extractor::Json;
use crate::server::appstate::AppState;

async fn packages() -> Json<Value> {
Expand Down
4 changes: 3 additions & 1 deletion src/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use axum::response::{IntoResponse, Response};
use axum::{Json, Router};
use axum::Router;
use hyper::StatusCode;
use serde_json::Value;

use crate::error::ApiError;
use crate::hue::api::V2Reply;
use crate::routes::extractor::Json;
use crate::server::appstate::AppState;

pub mod api;
pub mod clip;
pub mod eventstream;
pub mod extractor;
pub mod licenses;

impl IntoResponse for ApiError {
Expand Down
Loading

0 comments on commit af39c3a

Please sign in to comment.