Skip to content

Commit

Permalink
experiments
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Aug 30, 2024
1 parent 74f18af commit 8a00c90
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 19 deletions.
122 changes: 122 additions & 0 deletions examples/t.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use lapin::{
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
BasicProperties, Connection, ConnectionProperties,
};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let recovery_config = lapin::experimental::RecoveryConfig {
auto_recover_channels: true,
};

async_global_executor::block_on(async {
let conn = Connection::connect(
&addr,
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
)
.await
.expect("connection error");

info!("CONNECTED");

{
let channel1 = conn.create_channel().await.expect("create_channel");
let channel2 = conn.create_channel().await.expect("create_channel");
channel1
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("confirm_select");
channel1
.queue_declare(
"recover-test",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");

info!("will consume");
let channel = channel2.clone();
channel2
.basic_consume(
"recover-test",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume")
.set_delegate(move |delivery: DeliveryResult| {
let channel = channel.clone();
async move {
info!(message=?delivery, "received message");
if let Ok(Some(delivery)) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
if &delivery.data[..] == b"after" {
channel
.basic_cancel("my_consumer", BasicCancelOptions::default())
.await
.expect("basic_cancel");
}
}
}
});

info!("will publish");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"before",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::Ack(None));

info!("before fail");
assert!(channel1
.queue_declare(
"fake queue",
QueueDeclareOptions {
passive: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.is_err());
info!("after fail");

info!("publish after");
let confirm = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"after",
BasicProperties::default(),
)
.await
.expect("basic_publish")
.await
.expect("publisher-confirms");
assert_eq!(confirm, Confirmation::Ack(None));
}

conn.run().expect("conn.run");
});
}
9 changes: 9 additions & 0 deletions src/acknowledgement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ impl Acknowledgements {
pub(crate) fn on_channel_error(&self, error: Error) {
self.0.lock().on_channel_error(error);
}

pub(crate) fn reset(&self) {
self.0.lock().reset();
}
}

impl fmt::Debug for Acknowledgements {
Expand Down Expand Up @@ -174,4 +178,9 @@ impl Inner {
}
}
}

fn reset(&mut self) {
// FIXME(recovery): handle pendings ??
self.delivery_tag = IdSequence::new(false);
}
}
46 changes: 37 additions & 9 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl fmt::Debug for Channel {
}

impl Channel {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
channel_id: ChannelId,
configuration: Configuration,
Expand Down Expand Up @@ -579,13 +580,20 @@ impl Channel {
}

fn on_channel_close_ok_sent(&self, error: Option<Error>) {
self.set_closed(
error
.clone()
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
);
if let Some(error) = error {
self.error_handler.on_error(error);
match (self.recovery_config.auto_recover_channels, error) {
(true, Some(error)) if error.is_amqp_soft_error() => {
self.status.set_reconnecting(error)
}
(_, error) => {
self.set_closed(
error
.clone()
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing)),
);
if let Some(error) = error {
self.error_handler.on_error(error);
}
}
}
}

Expand Down Expand Up @@ -868,6 +876,15 @@ impl Channel {
resolver: PromiseResolver<Channel>,
channel: Channel,
) -> Result<()> {
if self.recovery_config.auto_recover_channels {
self.status.update_recovery_context(|ctx| {
ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
});
self.acknowledgements.reset();
if !self.status.confirm() {
self.status.finalize_recovery();
}
}
self.set_state(ChannelState::Connected);
resolver.resolve(channel);
Ok(())
Expand Down Expand Up @@ -907,8 +924,19 @@ impl Channel {
self.set_closing(error.clone().ok());
let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
let channel = self.clone();
self.internal_rpc
.register_internal_future(async move { channel.channel_close_ok(error).await });
self.internal_rpc.register_internal_future(async move {
channel.channel_close_ok(error).await?;
if channel.recovery_config.auto_recover_channels {
let ch = channel.clone();
channel.channel_open(ch).await?;
if channel.status.confirm() {
channel
.confirm_select(ConfirmSelectOptions::default())
.await?;
}
}
Ok(())
});
Ok(())
}

Expand Down
33 changes: 33 additions & 0 deletions src/channel_recovery_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use crate::{
frames::{ExpectedReply, Frames},
Error,
};

use std::collections::VecDeque;

pub(crate) struct ChannelRecoveryContext {
pub(crate) cause: Error,
expected_replies: Option<VecDeque<ExpectedReply>>,
}

impl ChannelRecoveryContext {
pub(crate) fn new(cause: Error) -> Self {
Self {
cause,
expected_replies: None,
}
}

pub(crate) fn set_expected_replies(
&mut self,
expected_replies: Option<VecDeque<ExpectedReply>>,
) {
self.expected_replies = expected_replies;
}

pub(crate) fn finalize_recovery(self) {
if let Some(replies) = self.expected_replies {
Frames::cancel_expected_replies(replies, self.cause);
}
}
}
37 changes: 34 additions & 3 deletions src/channel_status.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
channel_receiver_state::{ChannelReceiverStates, DeliveryCause},
channel_recovery_context::ChannelRecoveryContext,
types::{ChannelId, Identifier, PayloadSize},
Result,
Error, Result,
};
use parking_lot::Mutex;
use std::{fmt, sync::Arc};
Expand All @@ -12,7 +13,7 @@ pub struct ChannelStatus(Arc<Mutex<Inner>>);

impl ChannelStatus {
pub fn initializing(&self) -> bool {
self.0.lock().state == ChannelState::Initial
[ChannelState::Initial, ChannelState::Reconnecting].contains(&self.0.lock().state)
}

pub fn closing(&self) -> bool {
Expand All @@ -23,6 +24,17 @@ impl ChannelStatus {
self.0.lock().state == ChannelState::Connected
}

pub(crate) fn update_recovery_context<F: Fn(&mut ChannelRecoveryContext)>(&self, apply: F) {
let mut inner = self.0.lock();
if let Some(context) = inner.recovery_context.as_mut() {
apply(context);
}
}

pub(crate) fn finalize_recovery(&self) {
self.0.lock().finalize_recovery();
}

pub(crate) fn can_receive_messages(&self) -> bool {
[ChannelState::Closing, ChannelState::Connected].contains(&self.0.lock().state)
}
Expand All @@ -32,8 +44,10 @@ impl ChannelStatus {
}

pub(crate) fn set_confirm(&self) {
self.0.lock().confirm = true;
let mut inner = self.0.lock();
inner.confirm = true;
trace!("Publisher confirms activated");
inner.finalize_recovery();
}

pub fn state(&self) -> ChannelState {
Expand All @@ -44,6 +58,12 @@ impl ChannelStatus {
self.0.lock().state = state;
}

pub(crate) fn set_reconnecting(&self, error: Error) {
let mut inner = self.0.lock();
inner.state = ChannelState::Reconnecting;
inner.recovery_context = Some(ChannelRecoveryContext::new(error));
}

pub(crate) fn auto_close(&self, id: ChannelId) -> bool {
id != 0 && self.0.lock().state == ChannelState::Connected
}
Expand Down Expand Up @@ -116,6 +136,7 @@ impl ChannelStatus {
pub enum ChannelState {
#[default]
Initial,
Reconnecting,
Connected,
Closing,
Closed,
Expand All @@ -141,6 +162,7 @@ struct Inner {
send_flow: bool,
state: ChannelState,
receiver_state: ChannelReceiverStates,
recovery_context: Option<ChannelRecoveryContext>,
}

impl Default for Inner {
Expand All @@ -150,6 +172,15 @@ impl Default for Inner {
send_flow: true,
state: ChannelState::default(),
receiver_state: ChannelReceiverStates::default(),
recovery_context: None,
}
}
}

impl Inner {
pub(crate) fn finalize_recovery(&mut self) {
if let Some(ctx) = self.recovery_context.take() {
ctx.finalize_recovery();
}
}
}
21 changes: 14 additions & 7 deletions src/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,21 @@ impl Frames {
self.inner.lock().drop_pending(error);
}

pub(crate) fn take_expected_replies(
&self,
channel_id: ChannelId,
) -> Option<VecDeque<ExpectedReply>> {
self.inner.lock().expected_replies.remove(&channel_id)
}

pub(crate) fn clear_expected_replies(&self, channel_id: ChannelId, error: Error) {
self.inner.lock().clear_expected_replies(channel_id, error);
if let Some(replies) = self.take_expected_replies(channel_id) {
Self::cancel_expected_replies(replies, error)
}
}

pub(crate) fn cancel_expected_replies(replies: VecDeque<ExpectedReply>, error: Error) {
Inner::cancel_expected_replies(replies, error)
}

pub(crate) fn poison(&self) -> Option<Error> {
Expand Down Expand Up @@ -265,12 +278,6 @@ impl Inner {
None
}

fn clear_expected_replies(&mut self, channel_id: ChannelId, error: Error) {
if let Some(replies) = self.expected_replies.remove(&channel_id) {
Self::cancel_expected_replies(replies, error);
}
}

fn cancel_expected_replies(replies: VecDeque<ExpectedReply>, error: Error) {
for ExpectedReply(reply, cancel) in replies {
match reply {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ mod buffer;
mod channel;
mod channel_closer;
mod channel_receiver_state;
mod channel_recovery_context;
mod channel_status;
mod channels;
mod configuration;
Expand Down

0 comments on commit 8a00c90

Please sign in to comment.