Skip to content

Commit ddc8f3b

Browse files
authored
Merge pull request #233 from minghuaw/0.10-dev
0.10 dev
2 parents 4a383cd + c733079 commit ddc8f3b

File tree

21 files changed

+112
-43
lines changed

21 files changed

+112
-43
lines changed

examples/Makefile.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ do
1414
elif [ "$entry" == "$wasm32_in_browser" ]; then
1515
echo "Ignored"
1616
elif [ -d "$entry" ]; then
17+
cargo update
1718
cargo check --manifest-path "$entry/Cargo.toml" --all-targets --all-features
1819
fi
1920
done

examples/receiver/src/main.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use fe2o3_amqp::{
22
connection::Connection, link::Receiver, session::Session, types::primitives::Value, Delivery,
3+
link::RecvError,
34
};
45

56
#[tokio::main]
@@ -12,7 +13,16 @@ async fn main() {
1213
.await
1314
.unwrap();
1415

15-
let delivery: Delivery<Value> = receiver.recv().await.unwrap();
16+
let delivery: Delivery<Value> = match receiver.recv().await {
17+
Ok(delivery) => delivery,
18+
Err(e) => match e {
19+
RecvError::MessageDecode(e) => {
20+
receiver.reject(e.info, None).await.unwrap();
21+
panic!("Message decode error: {:?}", e.source)
22+
},
23+
_ => panic!("Unexpected error: {:?}", e),
24+
}
25+
};
1626
receiver.accept(&delivery).await.unwrap();
1727

1828
receiver.close().await.unwrap();

fe2o3-amqp-cbs/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fe2o3-amqp-cbs"
3-
version = "0.9.0"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "An experimental impl of AMQP 1.0 CBS extension"
66
license = "MIT/Apache-2.0"
@@ -13,5 +13,6 @@ readme = "Readme.md"
1313
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1414

1515
[dependencies]
16-
fe2o3-amqp = { version = "0.9.0", path = "../fe2o3-amqp" }
17-
fe2o3-amqp-management = { version = "0.9.0", path = "../fe2o3-amqp-management" }
16+
fe2o3-amqp = { version = "0.10.0", path = "../fe2o3-amqp" }
17+
fe2o3-amqp-management = { version = "0.10.0", path = "../fe2o3-amqp-management" }
18+
trait-variant = "0.1.1"

fe2o3-amqp-cbs/Changelog.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
# Change Log
22

3-
## 0.9.0
3+
## 0.10.0
4+
5+
- Updated `fe2o3-amqp` to "0.10.0"
6+
- Replace GAT with async fn in trait for `AsyncCbsTokenProvider` trait
7+
8+
## 0.9.0
49

510
- Unified versioning with other `fe2o3-amqp` crates
611

fe2o3-amqp-cbs/src/lib.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
//! Please note that because the CBS protocol is still in draft, this crate is expected to see
66
//! breaking changes in all future releases until the draft becomes stable.
77
8-
use std::future::Future;
9-
108
use token::CbsToken;
119

1210
pub mod client;
@@ -29,20 +27,16 @@ pub trait CbsTokenProvider {
2927
}
3028

3129
/// An async version of `CbsTokenProvider`
32-
pub trait AsyncCbsTokenProvider {
30+
#[trait_variant::make(AsyncCbsTokenProvider: Send)]
31+
pub trait LocalAsyncCbsTokenProvider {
3332
/// The associated error type
3433
type Error;
3534

36-
/// The associated future type
37-
type Fut<'a>: Future<Output = Result<CbsToken<'a>, Self::Error>>
38-
where
39-
Self: 'a;
40-
4135
/// Get a CBS token
42-
fn get_token_async(
36+
async fn get_token_async(
4337
&mut self,
4438
container_id: impl AsRef<str>,
4539
resource_id: impl AsRef<str>,
4640
claims: impl IntoIterator<Item = impl AsRef<str>>,
47-
) -> Self::Fut<'_>;
41+
) -> Result<CbsToken<'_>, Self::Error>;
4842
}

fe2o3-amqp-ext/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fe2o3-amqp-ext"
3-
version = "0.9.0"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "Extension types to fe2o3-amqp"
66
license = "MIT/Apache-2.0"
@@ -12,5 +12,5 @@ readme = "Readme.md"
1212
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1313

1414
[dependencies]
15-
serde_amqp = { version = "0.9.0", path = "../serde_amqp", features = ["derive"] }
16-
fe2o3-amqp-types = { version = "0.9.0", path = "../fe2o3-amqp-types" }
15+
serde_amqp = { version = "0.10.0", path = "../serde_amqp", features = ["derive"] }
16+
fe2o3-amqp-types = { version = "0.10.0", path = "../fe2o3-amqp-types" }

fe2o3-amqp-ext/Changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 0.10.0
4+
5+
1. Unified versioning with other `fe2o3-amqp` crates
6+
37
## 0.9.0
48

59
1. Unified versioning with other `fe2o3-amqp` crates

fe2o3-amqp-management/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fe2o3-amqp-management"
3-
version = "0.9.1"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "An experimental impl of AMQP 1.0 management extension"
66
license = "MIT/Apache-2.0"
@@ -13,8 +13,8 @@ readme = "Readme.md"
1313
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1414

1515
[dependencies]
16-
fe2o3-amqp = { version = "0.9.3", path = "../fe2o3-amqp" }
17-
fe2o3-amqp-types = { version = "0.9.1", path = "../fe2o3-amqp-types/" }
16+
fe2o3-amqp = { version = "0.10.0", path = "../fe2o3-amqp" }
17+
fe2o3-amqp-types = { version = "0.10.0", path = "../fe2o3-amqp-types/" }
1818
serde = "1"
1919
thiserror = "1"
2020

fe2o3-amqp-management/Changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 0.10.0
4+
5+
1. Unified versioning with other `fe2o3-amqp` crates
6+
37
## 0.2.3
48

59
1. Backported 0.9.1

fe2o3-amqp-types/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fe2o3-amqp-types"
3-
version = "0.9.1"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "Implementation of AMQP1.0 data types"
66
license = "MIT/Apache-2.0"
@@ -33,7 +33,7 @@ transaction = ["primitive", "messaging"]
3333
security = ["primitive"]
3434

3535
[dependencies]
36-
serde_amqp = { version = "0.9.1", path = "../serde_amqp", features = ["derive", "extensions"] }
36+
serde_amqp = { version = "0.10.0", path = "../serde_amqp", features = ["derive", "extensions"] }
3737
serde = { version = "1", features = ["derive"] }
3838
serde_bytes = "0.11"
3939
ordered-float = { version = "4", features = ["serde"] }

fe2o3-amqp-types/Changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Change Log
22

3+
## 0.10.0
4+
5+
1. Unified versioning with other `fe2o3-amqp` crates
6+
37
## 0.7.2
48

59
1. (Backporting 0.9.1) Updated `serde_amqp` to "0.5.10"

fe2o3-amqp-ws/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fe2o3-amqp-ws"
3-
version = "0.9.0"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "WebSocket binding stream for AMQP1.0"
66
license = "MIT/Apache-2.0"

fe2o3-amqp-ws/Changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# fe2o3-amqp-ws
22

3+
## 0.10.0
4+
5+
1. Unified versioning with other `fe2o3-amqp` crates
6+
37
## 0.9.0
48

59
1. Unified versioning with other `fe2o3-amqp` crates

fe2o3-amqp/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fe2o3-amqp"
3-
version = "0.9.8"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "An implementation of AMQP1.0 protocol based on serde and tokio"
66
license = "MIT/Apache-2.0"
@@ -40,8 +40,8 @@ acceptor = []
4040
scram = ["sha-1", "sha2", "rand", "base64", "stringprep", "hmac", "pbkdf2"]
4141

4242
[dependencies]
43-
serde_amqp = { version = "0.9.1", path = "../serde_amqp" }
44-
fe2o3-amqp-types = { version = "0.9.1", path = "../fe2o3-amqp-types" }
43+
serde_amqp = { version = "0.10.0", path = "../serde_amqp" }
44+
fe2o3-amqp-types = { version = "0.10.0", path = "../fe2o3-amqp-types" }
4545

4646
bytes = "1"
4747
tokio-util = { version = "0.7", features = ["codec"] } # tokio-rs/tokio#4816
@@ -85,7 +85,7 @@ fluvio-wasm-timer = "0.2"
8585
[dev-dependencies]
8686
tokio-test = { version = "0.4" }
8787
testcontainers = "0.15"
88-
fe2o3-amqp-ext = { version = "0.9.0", path = "../fe2o3-amqp-ext" }
88+
fe2o3-amqp-ext = { version = "0.10.0", path = "../fe2o3-amqp-ext" }
8989

9090
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
9191
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "parking_lot"] }

fe2o3-amqp/Changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Change Log
22

3+
## 0.10.0
4+
5+
1. Changed `RecvError::MessageDecodeError` to `RecvError::MessageDecode(MessageDecodeError)`.
6+
2. Added `MessageDecodeError` which carries the source `serde_amqp::Error` when deserializing the
7+
message body fails and the delivery info that can be used to dispose the message.
8+
39
## 0.9.8
410

511
1. Fixed bug with reattaching session with sender.

fe2o3-amqp/src/link/delivery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct DeliveryInfo {
3030
/// Receiver settle mode that is carried by the transfer frame
3131
pub(crate) rcv_settle_mode: Option<ReceiverSettleMode>,
3232

33-
_sealed: Sealed,
33+
pub(crate) _sealed: Sealed,
3434
}
3535

3636
impl DeliveryInfo {

fe2o3-amqp/src/link/error.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::session::error::AllocLinkError;
77
#[cfg(docsrs)]
88
use fe2o3_amqp_types::transaction::Coordinator;
99

10-
use super::{receiver::DetachedReceiver, sender::DetachedSender};
10+
use super::{delivery::DeliveryInfo, receiver::DetachedReceiver, sender::DetachedSender};
1111

1212
/// Error associated with detaching
1313
#[derive(Debug, thiserror::Error)]
@@ -482,7 +482,7 @@ pub(crate) enum ReceiverTransferError {
482482

483483
/// Decoding Message failed
484484
#[error("Decoding Message failed")]
485-
MessageDecodeError,
485+
MessageDecode(#[from] MessageDecodeError),
486486

487487
/// If the negotiated link value is first, then it is illegal to set this
488488
/// field to second.
@@ -494,6 +494,24 @@ pub(crate) enum ReceiverTransferError {
494494
InconsistentFieldInMultiFrameDelivery,
495495
}
496496

497+
/// Error decoding message
498+
#[derive(Debug)]
499+
pub struct MessageDecodeError {
500+
/// Delivery info
501+
pub info: DeliveryInfo,
502+
503+
/// Source error
504+
pub source: serde_amqp::Error,
505+
}
506+
507+
impl std::fmt::Display for MessageDecodeError {
508+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
509+
write!(f, "{:?}: {}", self.info, self.source)
510+
}
511+
}
512+
513+
impl std::error::Error for MessageDecodeError {}
514+
497515
/// Errors associated with receiving
498516
#[derive(Debug, thiserror::Error)]
499517
pub enum RecvError {
@@ -515,7 +533,7 @@ pub enum RecvError {
515533

516534
/// Decoding Message failed
517535
#[error("Decoding Message failed")]
518-
MessageDecodeError,
536+
MessageDecode(#[from] MessageDecodeError),
519537

520538
/// If the negotiated link value is first, then it is illegal to set this
521539
/// field to second.
@@ -537,7 +555,7 @@ impl From<ReceiverTransferError> for RecvError {
537555
ReceiverTransferError::TransferLimitExceeded => RecvError::TransferLimitExceeded,
538556
ReceiverTransferError::DeliveryIdIsNone => RecvError::DeliveryIdIsNone,
539557
ReceiverTransferError::DeliveryTagIsNone => RecvError::DeliveryTagIsNone,
540-
ReceiverTransferError::MessageDecodeError => RecvError::MessageDecodeError,
558+
ReceiverTransferError::MessageDecode(err) => RecvError::MessageDecode(err),
541559
ReceiverTransferError::IllegalRcvSettleModeInTransfer => {
542560
RecvError::IllegalRcvSettleModeInTransfer
543561
}

fe2o3-amqp/src/link/receiver_link.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde_amqp::format_code::EncodingCodes;
66

77
use crate::{
88
endpoint::LinkExt,
9-
util::{is_consecutive, AsByteIterator, IntoReader},
9+
util::{is_consecutive, AsByteIterator, IntoReader, Sealed},
1010
};
1111

1212
use super::{delivery::DeliveryInfo, *};
@@ -154,12 +154,11 @@ where
154154
.ok_or(Self::TransferError::DeliveryTagIsNone)?;
155155
let message_format = transfer.message_format;
156156

157-
let (message, mode) = if settled_by_sender {
157+
let (result, mode) = if settled_by_sender {
158158
// If the message is pre-settled, there is no need to
159159
// add to the unsettled map and no need to reply to the Sender
160-
let message = T::decode_into_message(payload.into_reader())
161-
.map_err(|_| Self::TransferError::MessageDecodeError)?;
162-
(message, None)
160+
let result = T::decode_into_message(payload.into_reader());
161+
(result, None)
163162
} else {
164163
// If the message is being sent settled by the sender, the value of this
165164
// field is ignored.
@@ -177,8 +176,7 @@ where
177176
None => None,
178177
};
179178

180-
let message = T::decode_into_message(payload.into_reader())
181-
.map_err(|_| Self::TransferError::MessageDecodeError)?;
179+
let result = T::decode_into_message(payload.into_reader());
182180

183181
let state = DeliveryState::Received(Received {
184182
section_number, // What is section number?
@@ -201,7 +199,23 @@ where
201199
.get_or_insert(OrderedMap::new())
202200
.insert(delivery_tag.clone(), Some(state));
203201
}
204-
(message, mode)
202+
(result, mode)
203+
};
204+
205+
let message = match result {
206+
Ok(message) => message,
207+
Err(source) => {
208+
let info = DeliveryInfo {
209+
delivery_id,
210+
delivery_tag,
211+
rcv_settle_mode: mode,
212+
_sealed: Sealed {},
213+
};
214+
return Err(MessageDecodeError {
215+
source,
216+
info,
217+
}.into());
218+
}
205219
};
206220

207221
let link_output_handle = self

fe2o3-amqp/src/transaction/coordinator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl TxnCoordinator {
223223
}
224224
RecvError::DeliveryIdIsNone
225225
| RecvError::DeliveryTagIsNone
226-
| RecvError::MessageDecodeError
226+
| RecvError::MessageDecode(_)
227227
| RecvError::IllegalRcvSettleModeInTransfer
228228
| RecvError::InconsistentFieldInMultiFrameDelivery
229229
| RecvError::TransactionalAcquisitionIsNotImeplemented => {

serde_amqp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "serde_amqp"
3-
version = "0.9.1"
3+
version = "0.10.0"
44
edition = "2021"
55
description = "A serde implementation of AMQP1.0 protocol."
66
license = "MIT/Apache-2.0"

serde_amqp/Changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Change Log
22

3+
## 0.10.0
4+
5+
1. Unified versioning with other `fe2o3-amqp` crates
6+
37
## 0.5.10
48

59
1. Backported `0.9.1`

0 commit comments

Comments
 (0)