Skip to content

Commit 6662f58

Browse files
authored
Merge pull request #11 from hdoordt/dyn-delivery
Introduce DynDelivery to allow for combining buses with similar payloads
2 parents 9ce4ba0 + 91beb7d commit 6662f58

File tree

3 files changed

+136
-15
lines changed

3 files changed

+136
-15
lines changed

src/chan/rpc/comm.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use std::{any::type_name, marker::PhantomData};
22

3-
use futures::Stream;
3+
use futures::{Future, Stream, StreamExt};
44
use lapin::BasicProperties;
55
use serde::{Deserialize, Serialize};
66
use tokio::sync::mpsc;
77
use tracing::debug;
88
use uuid::Uuid;
99

1010
use crate::{
11-
error::Error, Bus, Channel, Consumer, Delivery, DirectBus, Publisher, ReplyError, Result,
12-
RpcChannel,
11+
error::Error, Bus, Channel, Consumer, Delivery, DirectBus, DynRpcDelivery, Publisher,
12+
ReplyError, Result, RpcChannel,
1313
};
1414

1515
use super::ReplyReceiver;
@@ -154,7 +154,7 @@ where
154154
/// Reply to a message that was sent by the receiver of the initial message.
155155
pub async fn reply_forth(
156156
&self,
157-
back_payload: &B::ForthPayload,
157+
forth_payload: &B::ForthPayload,
158158
chan: &impl Channel,
159159
) -> Result<()> {
160160
let Some(correlation_uuid) = self.get_uuid() else {
@@ -166,15 +166,15 @@ where
166166

167167
let correlation_uuid = correlation_uuid?;
168168

169-
let bytes = serde_json::to_vec(back_payload)?;
169+
let bytes = serde_json::to_vec(forth_payload)?;
170170

171171
debug!("Replying forth to message with correlation UUID {correlation_uuid}");
172172
chan.publish_with_properties(&bytes, reply_to, Default::default(), correlation_uuid)
173173
.await
174174
}
175175

176176
/// Reply to a message that was sent by the receiver of the initial message and await
177-
/// any further messages from the receiver.
177+
/// multiple replies from the receiver.
178178
/// The messages can be obtained by calling [futures::StreamExt::next] on the returned [Stream].
179179
pub async fn reply_recv_many(
180180
&self,
@@ -210,6 +210,33 @@ where
210210
.await?;
211211
Ok(rx)
212212
}
213+
214+
/// Reply to a message that was sent by the receiver of the initial message and await
215+
/// one reply from the receiver.
216+
/// The message can be obtained by `await`ing the returned [Future].
217+
pub async fn reply_recv(
218+
&self,
219+
back_payload: &B::BackPayload,
220+
rpc_chan: &RpcChannel,
221+
) -> Result<impl Future<Output = Delivery<CommReply<B>>>> {
222+
let rx = self.reply_recv_many(back_payload, rpc_chan).await?;
223+
Ok(async move {
224+
// As `rx` won't be dropped, we can assume that either
225+
// this future never resolves, or it resolves to a `Some`
226+
rx.take(1).next().await.unwrap()
227+
})
228+
}
229+
230+
/// Convert this [Delivery] into a [DynRpcDelivery]
231+
pub fn into_dyn_comm(self) -> DynRpcDelivery<B::BackPayload, B::ForthPayload> {
232+
DynRpcDelivery {
233+
inner: crate::DynDelivery {
234+
inner: self.inner,
235+
_marker: PhantomData,
236+
},
237+
_reply: PhantomData,
238+
}
239+
}
213240
}
214241

215242
#[cfg(test)]

src/chan/rpc/mod.rs

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
use std::{any::type_name, fmt::Display, marker::PhantomData, sync::Arc, task::Poll};
1+
use std::{
2+
any::type_name,
3+
fmt::Display,
4+
marker::PhantomData,
5+
ops::{Deref, DerefMut},
6+
sync::Arc,
7+
task::Poll,
8+
};
29

310
use async_trait::async_trait;
411
use dashmap::DashMap;
@@ -10,10 +17,10 @@ use tokio::{
1017
sync::mpsc,
1118
task::{self, JoinHandle},
1219
};
13-
use tracing::debug;
20+
use tracing::{debug, warn};
1421
use uuid::Uuid;
1522

16-
use crate::{delivery_uuid, error::Error, Bus, Connection, Delivery, Result};
23+
use crate::{delivery_uuid, error::Error, Bus, Connection, Delivery, DynDelivery, Result};
1724

1825
use super::{direct::DirectBus, Channel, Consumer, Publisher};
1926

@@ -104,11 +111,15 @@ impl RpcChannel {
104111
Err(_) => todo!("report error"),
105112
};
106113

107-
if let Some(tx) = pending_replies.get(&msg_id) {
108-
tx.send(msg).unwrap();
109-
} else {
110-
todo!("Report absence of sender");
111-
};
114+
let forwarding_success =
115+
if let Some(tx) = pending_replies.get(&msg_id) {
116+
tx.send(msg).is_ok()
117+
} else {
118+
false
119+
};
120+
if forwarding_success {
121+
warn!("Received reply cannot be forwarded due to dropped Receiver. UUID: {}", msg_id);
122+
}
112123
}
113124
});
114125
// `forward_reply` should run to completion
@@ -247,7 +258,7 @@ where
247258
payload: &B::PublishPayload,
248259
) -> Result<impl Future<Output = Option<Delivery<Reply<B>>>>> {
249260
let rx = self.publish_recv_many(args, payload).await?;
250-
Ok(async { rx.take(1).next().await })
261+
Ok(async move { rx.take(1).next().await })
251262
}
252263
}
253264

@@ -276,6 +287,55 @@ where
276287
}
277288
}
278289

290+
#[derive(Debug)]
291+
/// A Delivery that is not tied to a [Bus], but instead is generic over
292+
/// the publish and reply payload of the [RpcBus] or [RpcCommBus] associated with the [Delivery]
293+
/// it was converted from. It can be used to combine [Delivery]s that originate from different
294+
/// [RpcBus]es, that have identical `PublishPayload` and `ReplyPayload` types defined,
295+
/// and allow for deserializing the `PublishPayload` as well as replying with the `ReplyPayload`
296+
pub struct DynRpcDelivery<P, R> {
297+
inner: DynDelivery<P>,
298+
_reply: PhantomData<R>,
299+
}
300+
301+
impl<'dp, 'dr, P, R> DynRpcDelivery<P, R>
302+
where
303+
P: Deserialize<'dp> + Serialize,
304+
R: Deserialize<'dr> + Serialize,
305+
{
306+
/// Reply to a [DynDelivery]
307+
pub async fn reply(&self, reply_payload: &R, chan: &impl Channel) -> Result<()> {
308+
let Some(correlation_uuid) = self.inner.get_uuid() else {
309+
return Err(Error::Reply(ReplyError::NoCorrelationUuid));
310+
};
311+
let Some(reply_to) = self.inner.inner.properties.reply_to().as_ref().map(|r | r.as_str()) else {
312+
return Err(Error::Reply(ReplyError::NoReplyToConfigured))
313+
};
314+
315+
let correlation_uuid = correlation_uuid?;
316+
317+
let bytes = serde_json::to_vec(reply_payload)?;
318+
319+
debug!("Replying forth to message with correlation UUID {correlation_uuid}");
320+
chan.publish_with_properties(&bytes, reply_to, Default::default(), correlation_uuid)
321+
.await
322+
}
323+
}
324+
325+
impl<P, R> Deref for DynRpcDelivery<P, R> {
326+
type Target = DynDelivery<P>;
327+
328+
fn deref(&self) -> &Self::Target {
329+
&self.inner
330+
}
331+
}
332+
333+
impl<P, R> DerefMut for DynRpcDelivery<P, R> {
334+
fn deref_mut(&mut self) -> &mut Self::Target {
335+
&mut self.inner
336+
}
337+
}
338+
279339
#[pin_project(PinnedDrop)]
280340
struct ReplyReceiver<B> {
281341
#[pin]

src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ where
7272
}
7373
Ok(())
7474
}
75+
76+
/// Convert this [Delivery] into a [DynDelivery]
77+
pub fn into_dyn(self) -> DynDelivery<B::PublishPayload> {
78+
DynDelivery {
79+
inner: self.inner,
80+
_marker: PhantomData,
81+
}
82+
}
7583
}
7684

7785
impl<B> From<lapin::message::Delivery> for Delivery<B> {
@@ -89,3 +97,29 @@ fn delivery_uuid(delivery: &lapin::message::Delivery) -> Option<Result<Uuid>> {
8997
};
9098
Some(Uuid::from_str(correlation_id.as_str()).map_err(Into::into))
9199
}
100+
101+
#[derive(Debug)]
102+
/// A Delivery that is not tied to a [Bus], but instead is generic over
103+
/// the publish payload of the [Bus] associated with the [Delivery]
104+
/// it was converted from. It can be used to combine [Delivery]s that originate from different
105+
/// [Bus]es, that have identical `PublishPayload` types defined,
106+
/// and allow for deserializing the `PublishPayload`
107+
pub struct DynDelivery<P> {
108+
inner: lapin::message::Delivery,
109+
_marker: PhantomData<P>,
110+
}
111+
112+
impl<'dp, P> DynDelivery<P>
113+
where
114+
P: Deserialize<'dp> + Serialize,
115+
{
116+
/// Get the message correlation [Uuid]
117+
pub fn get_uuid(&self) -> Option<Result<Uuid>> {
118+
delivery_uuid(&self.inner)
119+
}
120+
121+
/// Deserialize and return the payload from the [DynDelivery]
122+
pub fn get_payload(&'dp self) -> Result<P> {
123+
Ok(serde_json::from_slice(&self.inner.data)?)
124+
}
125+
}

0 commit comments

Comments
 (0)