diff --git a/examples/t.rs b/examples/t.rs new file mode 100644 index 00000000..22dfbc6f --- /dev/null +++ b/examples/t.rs @@ -0,0 +1,122 @@ +use lapin::{ + message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable, + BasicProperties, Connection, ConnectionProperties, +}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info"); + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let recovery_config = lapin::experimental::RecoveryConfig { + auto_recover_channels: true, + }; + + async_global_executor::block_on(async { + let conn = Connection::connect( + &addr, + ConnectionProperties::default().with_experimental_recovery_config(recovery_config), + ) + .await + .expect("connection error"); + + info!("CONNECTED"); + + { + let channel1 = conn.create_channel().await.expect("create_channel"); + let channel2 = conn.create_channel().await.expect("create_channel"); + channel1 + .confirm_select(ConfirmSelectOptions::default()) + .await + .expect("confirm_select"); + channel1 + .queue_declare( + "recover-test", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + + info!("will consume"); + let channel = channel2.clone(); + channel2 + .basic_consume( + "recover-test", + "my_consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("basic_consume") + .set_delegate(move |delivery: DeliveryResult| { + let channel = channel.clone(); + async move { + info!(message=?delivery, "received message"); + if let Ok(Some(delivery)) = delivery { + delivery + .ack(BasicAckOptions::default()) + .await + .expect("basic_ack"); + if &delivery.data[..] == b"after" { + channel + .basic_cancel("my_consumer", BasicCancelOptions::default()) + .await + .expect("basic_cancel"); + } + } + } + }); + + info!("will publish"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"before", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::Ack(None)); + + info!("before fail"); + assert!(channel1 + .queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err()); + info!("after fail"); + + info!("publish after"); + let confirm = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"after", + BasicProperties::default(), + ) + .await + .expect("basic_publish") + .await + .expect("publisher-confirms"); + assert_eq!(confirm, Confirmation::Ack(None)); + } + + conn.run().expect("conn.run"); + }); +} diff --git a/src/acknowledgement.rs b/src/acknowledgement.rs index f5ce8886..6fac1ec0 100644 --- a/src/acknowledgement.rs +++ b/src/acknowledgement.rs @@ -62,6 +62,10 @@ impl Acknowledgements { pub(crate) fn on_channel_error(&self, error: Error) { self.0.lock().on_channel_error(error); } + + pub(crate) fn reset(&self) { + self.0.lock().reset(); + } } impl fmt::Debug for Acknowledgements { @@ -174,4 +178,9 @@ impl Inner { } } } + + fn reset(&mut self) { + // FIXME(recovery): handle pendings ?? + self.delivery_tag = IdSequence::new(false); + } } diff --git a/src/channel.rs b/src/channel.rs index 198b45ff..76947cd5 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -86,6 +86,7 @@ impl fmt::Debug for Channel { } impl Channel { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( channel_id: ChannelId, configuration: Configuration, @@ -573,13 +574,20 @@ impl Channel { } fn on_channel_close_ok_sent(&self, error: Option) { - self.set_closed( - error - .clone() - .unwrap_or(Error::InvalidChannelState(ChannelState::Closing)), - ); - if let Some(error) = error { - self.error_handler.on_error(error); + match (self.recovery_config.auto_recover_channels, error) { + (true, Some(error)) if error.is_amqp_soft_error() => { + self.status.set_reconnecting(error) + } + (_, error) => { + self.set_closed( + error + .clone() + .unwrap_or(Error::InvalidChannelState(ChannelState::Closing)), + ); + if let Some(error) = error { + self.error_handler.on_error(error); + } + } } } @@ -862,6 +870,15 @@ impl Channel { resolver: PromiseResolver, channel: Channel, ) -> Result<()> { + if self.recovery_config.auto_recover_channels { + self.status.update_recovery_context(|ctx| { + ctx.set_expected_replies(self.frames.take_expected_replies(self.id)); + }); + self.acknowledgements.reset(); + if !self.status.confirm() { + self.status.finalize_recovery(); + } + } self.set_state(ChannelState::Connected); resolver.resolve(channel); Ok(()) @@ -901,8 +918,19 @@ impl Channel { self.set_closing(error.clone().ok()); let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok(); let channel = self.clone(); - self.internal_rpc - .register_internal_future(async move { channel.channel_close_ok(error).await }); + self.internal_rpc.register_internal_future(async move { + channel.channel_close_ok(error).await?; + if channel.recovery_config.auto_recover_channels { + let ch = channel.clone(); + channel.channel_open(ch).await?; + if channel.status.confirm() { + channel + .confirm_select(ConfirmSelectOptions::default()) + .await?; + } + } + Ok(()) + }); Ok(()) } diff --git a/src/channel_recovery_context.rs b/src/channel_recovery_context.rs new file mode 100644 index 00000000..468b305b --- /dev/null +++ b/src/channel_recovery_context.rs @@ -0,0 +1,33 @@ +use crate::{ + frames::{ExpectedReply, Frames}, + Error, +}; + +use std::collections::VecDeque; + +pub(crate) struct ChannelRecoveryContext { + pub(crate) cause: Error, + expected_replies: Option>, +} + +impl ChannelRecoveryContext { + pub(crate) fn new(cause: Error) -> Self { + Self { + cause, + expected_replies: None, + } + } + + pub(crate) fn set_expected_replies( + &mut self, + expected_replies: Option>, + ) { + self.expected_replies = expected_replies; + } + + pub(crate) fn finalize_recovery(self) { + if let Some(replies) = self.expected_replies { + Frames::cancel_expected_replies(replies, self.cause); + } + } +} diff --git a/src/channel_status.rs b/src/channel_status.rs index 230020e2..37471454 100644 --- a/src/channel_status.rs +++ b/src/channel_status.rs @@ -1,7 +1,8 @@ use crate::{ channel_receiver_state::{ChannelReceiverStates, DeliveryCause}, + channel_recovery_context::ChannelRecoveryContext, types::{ChannelId, Identifier, PayloadSize}, - Result, + Error, Result, }; use parking_lot::Mutex; use std::{fmt, sync::Arc}; @@ -12,7 +13,7 @@ pub struct ChannelStatus(Arc>); impl ChannelStatus { pub fn initializing(&self) -> bool { - self.0.lock().state == ChannelState::Initial + [ChannelState::Initial, ChannelState::Reconnecting].contains(&self.0.lock().state) } pub fn closing(&self) -> bool { @@ -23,6 +24,17 @@ impl ChannelStatus { self.0.lock().state == ChannelState::Connected } + pub(crate) fn update_recovery_context(&self, apply: F) { + let mut inner = self.0.lock(); + if let Some(context) = inner.recovery_context.as_mut() { + apply(context); + } + } + + pub(crate) fn finalize_recovery(&self) { + self.0.lock().finalize_recovery(); + } + pub(crate) fn can_receive_messages(&self) -> bool { [ChannelState::Closing, ChannelState::Connected].contains(&self.0.lock().state) } @@ -32,8 +44,10 @@ impl ChannelStatus { } pub(crate) fn set_confirm(&self) { - self.0.lock().confirm = true; + let mut inner = self.0.lock(); + inner.confirm = true; trace!("Publisher confirms activated"); + inner.finalize_recovery(); } pub fn state(&self) -> ChannelState { @@ -44,6 +58,12 @@ impl ChannelStatus { self.0.lock().state = state; } + pub(crate) fn set_reconnecting(&self, error: Error) { + let mut inner = self.0.lock(); + inner.state = ChannelState::Reconnecting; + inner.recovery_context = Some(ChannelRecoveryContext::new(error)); + } + pub(crate) fn auto_close(&self, id: ChannelId) -> bool { id != 0 && self.0.lock().state == ChannelState::Connected } @@ -116,6 +136,7 @@ impl ChannelStatus { pub enum ChannelState { #[default] Initial, + Reconnecting, Connected, Closing, Closed, @@ -141,6 +162,7 @@ struct Inner { send_flow: bool, state: ChannelState, receiver_state: ChannelReceiverStates, + recovery_context: Option, } impl Default for Inner { @@ -150,6 +172,15 @@ impl Default for Inner { send_flow: true, state: ChannelState::default(), receiver_state: ChannelReceiverStates::default(), + recovery_context: None, + } + } +} + +impl Inner { + pub(crate) fn finalize_recovery(&mut self) { + if let Some(ctx) = self.recovery_context.take() { + ctx.finalize_recovery(); } } } diff --git a/src/frames.rs b/src/frames.rs index 139f6df9..b6122131 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -87,8 +87,21 @@ impl Frames { self.inner.lock().drop_pending(error); } + pub(crate) fn take_expected_replies( + &self, + channel_id: ChannelId, + ) -> Option> { + self.inner.lock().expected_replies.remove(&channel_id) + } + pub(crate) fn clear_expected_replies(&self, channel_id: ChannelId, error: Error) { - self.inner.lock().clear_expected_replies(channel_id, error); + if let Some(replies) = self.take_expected_replies(channel_id) { + Self::cancel_expected_replies(replies, error) + } + } + + pub(crate) fn cancel_expected_replies(replies: VecDeque, error: Error) { + Inner::cancel_expected_replies(replies, error) } pub(crate) fn poison(&self) -> Option { @@ -265,12 +278,6 @@ impl Inner { None } - fn clear_expected_replies(&mut self, channel_id: ChannelId, error: Error) { - if let Some(replies) = self.expected_replies.remove(&channel_id) { - Self::cancel_expected_replies(replies, error); - } - } - fn cancel_expected_replies(replies: VecDeque, error: Error) { for ExpectedReply(reply, cancel) in replies { match reply { diff --git a/src/lib.rs b/src/lib.rs index 9f3d8d6f..a5ac4e33 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,6 +136,7 @@ mod buffer; mod channel; mod channel_closer; mod channel_receiver_state; +mod channel_recovery_context; mod channel_status; mod channels; mod configuration;