Skip to content

Commit 1a2d7ea

Browse files
authored
Merge pull request #66 from minghuaw/avoid_copy_partial_payload
Avoid copy partial payload for multi-frame delivery
2 parents 22e5416 + b007e07 commit 1a2d7ea

File tree

11 files changed

+236
-63
lines changed

11 files changed

+236
-63
lines changed

examples/protocol_test/src/bin/connector.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use fe2o3_amqp::{types::primitives::Value, Connection, Delivery, Receiver, Sende
44
use tracing::{instrument, Level};
55
use tracing_subscriber::FmtSubscriber;
66

7-
// const SASL_PLAIN: &str = "";
8-
const SASL_PLAIN: &str = "guest:guest";
7+
const SASL_PLAIN: &str = "";
8+
// const SASL_PLAIN: &str = "guest:guest";
99
const BASE_ADDR: &str = "localhost:5672";
1010

1111
#[instrument]

examples/protocol_test/src/bin/listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async fn session_main(mut session: ListenerSessionHandle) {
3434
let handle = tokio::spawn(async move {
3535
tracing::info!("Incoming link is connected (remote: sender, local: receiver");
3636
let delivery = recver.recv::<Value>().await.unwrap();
37-
tracing::info!(message = ?delivery.message());
37+
tracing::info!(id = ?delivery.delivery_id());
3838
recver.accept(&delivery).await.unwrap();
3939
if let Err(e) = recver.close().await {
4040
// The remote may close the session
@@ -72,7 +72,7 @@ async fn main() {
7272
// let connection_acceptor = ConnectionAcceptor::new("test_conn_listener");
7373
let connection_acceptor = ConnectionAcceptor::builder()
7474
.container_id("example_connection_acceptor")
75-
.sasl_acceptor(SaslPlainMechanism::new("guest", "guest"))
75+
// .sasl_acceptor(SaslPlainMechanism::new("guest", "guest"))
7676
.build();
7777

7878
while let Ok((stream, addr)) = tcp_listener.accept().await {

examples/protocol_test/src/bin/receiver.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,7 @@ async fn main() {
7070
// let body = delivery.into_body();
7171
println!("{:?}", delivery.delivery_id());
7272

73-
if let Err(err) = receiver.close().await {
74-
println!("{}", err);
75-
}
76-
73+
receiver.close().await.unwrap();
7774
session.end().await.unwrap();
7875
connection.close().await.unwrap();
7976
}

examples/protocol_test/src/bin/send_large_content.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async fn main() -> Result<()> {
206206

207207
// let args: Vec<String> = env::args().collect();
208208
// let test_sender = TestSender::try_from(args)?;
209-
let message_iter = create_message_sizes("string", "[10, 10, 10, 10]")?;
209+
let message_iter = create_message_sizes("string", "[10, 10]")?;
210210
let test_sender = TestSender {
211211
broker_addr: "localhost:5672".to_string(),
212212
target_addr: "q1".to_string(),

examples/protocol_test/src/bin/sender.rs

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,41 @@ async fn main() {
2222
let subscriber = FmtSubscriber::builder()
2323
// all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.)
2424
// will be written to stdout.
25-
.with_max_level(Level::TRACE)
25+
.with_max_level(Level::DEBUG)
2626
// .with_max_level(Level::DEBUG)
2727
// completes the builder.
2828
.finish();
2929

3030
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
3131

32-
let addr = "localhost:5671";
33-
let domain = "localhost";
34-
let stream = TcpStream::connect(addr).await.unwrap();
35-
let connector = native_tls::TlsConnector::builder()
36-
.danger_accept_invalid_certs(true)
37-
.build()
38-
.unwrap();
39-
let connector = tokio_native_tls::TlsConnector::from(connector);
40-
let tls_stream = connector.connect(domain, stream).await.unwrap();
41-
42-
// let mut connection = Connection::open("connection-1", "amqp://guest:guest@localhost:5671")
32+
// let addr = "localhost:5671";
33+
// let domain = "localhost";
34+
// let stream = TcpStream::connect(addr).await.unwrap();
35+
// let connector = native_tls::TlsConnector::builder()
36+
// .danger_accept_invalid_certs(true)
37+
// .build()
38+
// .unwrap();
39+
// let connector = tokio_native_tls::TlsConnector::from(connector);
40+
// let tls_stream = connector.connect(domain, stream).await.unwrap();
41+
42+
// let mut connection = Connection::builder()
43+
// .container_id("connection-1")
44+
// .scheme("amqp")
45+
// .max_frame_size(1000)
46+
// .channel_max(9)
47+
// .idle_time_out(50_000 as u32)
48+
// // .sasl_profile(SaslProfile::Plain {
49+
// // username: "guest".into(),
50+
// // password: "guest".into(),
51+
// // })
52+
// .open_with_stream(tls_stream)
53+
// // .open("amqp://localhost:5672")
54+
// // .open("amqp://guest:guest@localhost:5672")
55+
// // .open("localhost:5672")
4356
// .await
4457
// .unwrap();
45-
let mut connection = Connection::builder()
46-
.container_id("connection-1")
47-
.scheme("amqp")
48-
.max_frame_size(1000)
49-
.channel_max(9)
50-
.idle_time_out(50_000 as u32)
51-
// .sasl_profile(SaslProfile::Plain {
52-
// username: "guest".into(),
53-
// password: "guest".into(),
54-
// })
55-
.open_with_stream(tls_stream)
56-
// .open("amqp://localhost:5672")
57-
// .open("amqp://guest:guest@localhost:5672")
58-
// .open("localhost:5672")
58+
59+
let mut connection = Connection::open("connection-1", "amqp://localhost:5672")
5960
.await
6061
.unwrap();
6162

fe2o3-amqp/src/control.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ pub(crate) enum SessionControl {
8787
impl std::fmt::Display for SessionControl {
8888
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8989
match self {
90-
SessionControl::End(_) => write!(f, "End"),
90+
SessionControl::End(err) => write!(f, "End({:?})", err),
9191
SessionControl::AllocateLink {
9292
link_name: _,
9393
link_relay: _,

fe2o3-amqp/src/endpoint/link.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use tokio::sync::mpsc;
1414
use crate::{
1515
control::SessionControl,
1616
link::{delivery::Delivery, state::LinkState, LinkFrame},
17-
Payload,
17+
Payload, util::{AsByteIterator, IntoReader},
1818
};
1919

2020
use super::{OutputHandle, Settlement};
@@ -163,10 +163,10 @@ pub(crate) trait ReceiverLink: Link + LinkExt {
163163

164164
// More than one transfer frames should be hanlded by the
165165
// `Receiver`
166-
async fn on_complete_transfer<'a, T>(
166+
async fn on_complete_transfer<'a, T, P>(
167167
&'a mut self,
168168
transfer: Transfer,
169-
payload: Payload,
169+
payload: P,
170170
) -> Result<
171171
(
172172
Delivery<T>,
@@ -175,7 +175,8 @@ pub(crate) trait ReceiverLink: Link + LinkExt {
175175
Self::TransferError,
176176
>
177177
where
178-
T: DecodeIntoMessage + Send;
178+
T: DecodeIntoMessage + Send,
179+
for<'b> P: IntoReader + AsByteIterator<'b> + Send + 'a;
179180

180181
async fn dispose(
181182
&mut self,

fe2o3-amqp/src/link/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ mod source;
55
use std::{collections::BTreeMap, marker::PhantomData, sync::Arc};
66

77
use async_trait::async_trait;
8-
use bytes::Buf;
98
use fe2o3_amqp_types::{
109
definitions::{
1110
self, AmqpError, DeliveryNumber, DeliveryTag, MessageFormat, ReceiverSettleMode, Role,

fe2o3-amqp/src/link/receiver.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::time::Duration;
44

55
use async_trait::async_trait;
6-
use bytes::BytesMut;
76
use fe2o3_amqp_types::{
87
definitions::{self, DeliveryNumber, DeliveryTag, SequenceNo},
98
messaging::{
@@ -21,7 +20,7 @@ use crate::{
2120
control::SessionControl,
2221
endpoint::{self, LinkAttach, LinkDetach, LinkExt},
2322
session::SessionHandle,
24-
Payload,
23+
Payload,
2524
};
2625

2726
use super::{
@@ -63,20 +62,20 @@ macro_rules! or_assign {
6362
#[derive(Debug)]
6463
pub(crate) struct IncompleteTransfer {
6564
pub performative: Transfer,
66-
pub buffer: BytesMut,
65+
pub buffer: Vec<Payload>,
6766
pub section_number: u32,
6867
pub section_offset: u64,
6968
}
7069

7170
impl IncompleteTransfer {
7271
pub fn new(transfer: Transfer, partial_payload: Payload) -> Self {
7372
let (number, offset) = section_number_and_offset(partial_payload.as_ref());
74-
let mut buffer = BytesMut::new();
75-
// TODO: anyway to make this not copying the bytes?
76-
buffer.extend(partial_payload);
73+
// let mut buffer = BytesMut::new();
74+
// // TODO: anyway to make this not copying the bytes?
75+
// buffer.extend(partial_payload);
7776
Self {
7877
performative: transfer,
79-
buffer,
78+
buffer: vec![partial_payload],
8079
section_number: number,
8180
section_offset: offset,
8281
}
@@ -140,7 +139,7 @@ impl IncompleteTransfer {
140139
self.section_offset = offset;
141140
}
142141

143-
self.buffer.extend(other);
142+
self.buffer.push(other);
144143
}
145144
}
146145

@@ -654,9 +653,10 @@ where
654653
match self.incomplete_transfer.take() {
655654
Some(mut incomplete) => {
656655
incomplete.or_assign(transfer)?;
657-
incomplete.buffer.extend(payload);
656+
incomplete.buffer.push(payload);
657+
658658
self.link
659-
.on_complete_transfer(incomplete.performative, incomplete.buffer.freeze())
659+
.on_complete_transfer(incomplete.performative, incomplete.buffer)
660660
.await?
661661
}
662662
None => {

fe2o3-amqp/src/link/receiver_link.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use fe2o3_amqp_types::messaging::message::DecodeIntoMessage;
22
use serde_amqp::format_code::EncodingCodes;
33

4+
use crate::util::{AsByteIterator, IntoReader};
5+
46
use super::*;
57

68
const DESCRIBED_TYPE: u8 = EncodingCodes::DescribedType as u8;
@@ -146,10 +148,10 @@ where
146148
}
147149
}
148150

149-
async fn on_complete_transfer<'a, T>(
151+
async fn on_complete_transfer<'a, T, P>(
150152
&'a mut self,
151153
transfer: Transfer,
152-
payload: Payload,
154+
payload: P,
153155
) -> Result<
154156
(
155157
Delivery<T>,
@@ -159,6 +161,7 @@ where
159161
>
160162
where
161163
T: DecodeIntoMessage + Send,
164+
for<'b> P: IntoReader + AsByteIterator<'b> + Send + 'a,
162165
{
163166
// ReceiverFlowState will not wait until link credit is available.
164167
// Will return with an error if there is not enough link credit.
@@ -182,7 +185,7 @@ where
182185
let (message, delivery_state) = if settled_by_sender {
183186
// If the message is pre-settled, there is no need to
184187
// add to the unsettled map and no need to reply to the Sender
185-
let message = T::decode_into_message(payload.reader())
188+
let message = T::decode_into_message(payload.into_reader())
186189
.map_err(|_| Self::TransferError::MessageDecodeError)?;
187190
(message, None)
188191
} else {
@@ -208,7 +211,7 @@ where
208211
// once it has arrived without waiting for the sender to settle first.
209212
ReceiverSettleMode::First => {
210213
// Spontaneously settle the message with an Accept
211-
let message = T::decode_into_message(payload.reader())
214+
let message = T::decode_into_message(payload.into_reader())
212215
.map_err(|_| Self::TransferError::MessageDecodeError)?;
213216

214217
(message, Some(DeliveryState::Accepted(Accepted {})))
@@ -218,9 +221,9 @@ where
218221
// disposition from the sender.
219222
ReceiverSettleMode::Second => {
220223
// Add to unsettled map
221-
let section_offset = rfind_offset_of_complete_message(payload.as_ref())
224+
let section_offset = rfind_offset_of_complete_message(&payload)
222225
.ok_or(Self::TransferError::MessageDecodeError)?;
223-
let message = T::decode_into_message(payload.reader())
226+
let message = T::decode_into_message(payload.into_reader())
224227
.map_err(|_| Self::TransferError::MessageDecodeError)?;
225228
let section_number = message.sections();
226229

@@ -320,13 +323,16 @@ where
320323
}
321324

322325
/// Finds offset of a complete message
323-
fn rfind_offset_of_complete_message(bytes: &[u8]) -> Option<u64> {
326+
fn rfind_offset_of_complete_message<'a, B>(bytes: &'a B) -> Option<u64>
327+
where
328+
B: AsByteIterator<'a>,
329+
{
324330
// For a complete message, the only need is to check Footer or Body
325-
326-
let len = bytes.len();
327-
let mut iter = bytes
328-
.iter()
329-
.zip(bytes.iter().skip(1).zip(bytes.iter().skip(2)));
331+
let b0 = bytes.as_byte_iterator();
332+
let b1 = bytes.as_byte_iterator().skip(1);
333+
let b2 = bytes.as_byte_iterator().skip(2);
334+
let len = b0.len();
335+
let mut iter = b0.zip(b1.zip(b2));
330336

331337
iter.rposition(|(&b0, (&b1, &b2))| {
332338
matches!(

0 commit comments

Comments
 (0)