From a9357faf82930638105d66af0f734545cc47b6e1 Mon Sep 17 00:00:00 2001 From: Luca Joss Date: Tue, 16 Jan 2024 20:42:23 +0100 Subject: [PATCH] WIP --- crates/relayer/src/channel.rs | 96 ++++++++- crates/relayer/src/supervisor/spawn.rs | 4 +- .../channel_upgrade/upgrade_handshake.rs | 195 +++++++++++++++++- 3 files changed, 289 insertions(+), 6 deletions(-) diff --git a/crates/relayer/src/channel.rs b/crates/relayer/src/channel.rs index ee57ee72f1..d1a686f29a 100644 --- a/crates/relayer/src/channel.rs +++ b/crates/relayer/src/channel.rs @@ -789,6 +789,7 @@ impl Channel { &mut self, state: State, ) -> Result<(Option, Next), ChannelError> { + tracing::warn!("self state `{}` other state `{:#?}`", state, self.counterparty_state()); let event = match (state, self.counterparty_state()?) { // Open handshake steps (State::Init, State::Uninitialized) => Some(self.build_chan_open_try_and_send()?), @@ -825,24 +826,111 @@ impl Channel { None => Some(self.build_chan_upgrade_cancel_and_send()?), } } - (State::Flushing, State::Open(UpgradeState::Upgrading)) => { + /*(State::Flushing, State::Open(UpgradeState::Upgrading)) => { match self.build_chan_upgrade_ack_and_send()? { Some(event) => Some(event), None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?), } - } + }*/ (State::Flushcomplete, State::Flushing) => { match self.build_chan_upgrade_confirm_and_send()? { Some(event) => Some(event), None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?), } } - (State::Flushcomplete, State::Open(UpgradeState::Upgrading)) => { - Some(self.flipped().build_chan_upgrade_open_and_send()?) + (State::Flushing, State::Open(UpgradeState::Upgrading)) => { + + let dst_latest_height = self + .dst_chain() + .query_latest_height() + .map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?; + let src_latest_height = self + .src_chain() + .query_latest_height() + .map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?; + let (error_receipt, _) = self + .dst_chain() + .query_upgrade_error( + QueryUpgradeErrorRequest { + port_id: self.dst_port_id().to_string(), + channel_id: self.dst_channel_id().unwrap().to_string(), + }, + dst_latest_height, + ) + .map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?; + + let (channel_end, _) = self + .src_chain() + .query_channel( + QueryChannelRequest { + port_id: self.src_port_id().clone(), + channel_id: self.src_channel_id().unwrap().clone(), + height: QueryHeight::Specific(src_latest_height), + }, + IncludeProof::Yes, + ) + .map_err(|e| ChannelError::query(self.src_chain().id(), e))?; + + if error_receipt.sequence == channel_end.upgrade_sequence { + Some(self.flipped().build_chan_upgrade_cancel_and_send()?) + } else { + match self.build_chan_upgrade_ack_and_send()? { + Some(event) => Some(event), + None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?), + } + } } + (State::Open(UpgradeState::NotUpgrading), State::Flushing) => { + + let dst_latest_height = self + .dst_chain() + .query_latest_height() + .map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?; + let src_latest_height = self + .src_chain() + .query_latest_height() + .map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?; + let (error_receipt, _) = self + .src_chain() + .query_upgrade_error( + QueryUpgradeErrorRequest { + port_id: self.src_port_id().to_string(), + channel_id: self.src_channel_id().unwrap().to_string(), + }, + src_latest_height, + ) + .map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?; + + let (channel_end, _) = self + .dst_chain() + .query_channel( + QueryChannelRequest { + port_id: self.dst_port_id().clone(), + channel_id: self.dst_channel_id().unwrap().clone(), + height: QueryHeight::Specific(dst_latest_height), + }, + IncludeProof::Yes, + ) + .map_err(|e| ChannelError::query(self.src_chain().id(), e))?; + + if error_receipt.sequence == channel_end.upgrade_sequence { + Some(self.build_chan_upgrade_cancel_and_send()?) + } else { + match self.build_chan_upgrade_ack_and_send()? { + Some(event) => Some(event), + None => Some(self.build_chan_upgrade_cancel_and_send()?), + } + } + } + /*(State::Flushcomplete, State::Open(UpgradeState::Upgrading)) => { + Some(self.flipped().build_chan_upgrade_open_and_send()?) + }*/ (State::Open(UpgradeState::Upgrading), State::Flushcomplete) => { Some(self.build_chan_upgrade_open_and_send()?) } + (State::Open(UpgradeState::NotUpgrading), State::Flushcomplete) => { + Some(self.build_chan_upgrade_open_and_send()?) + } _ => None, }; diff --git a/crates/relayer/src/supervisor/spawn.rs b/crates/relayer/src/supervisor/spawn.rs index 788a38f12f..2c863913db 100644 --- a/crates/relayer/src/supervisor/spawn.rs +++ b/crates/relayer/src/supervisor/spawn.rs @@ -249,10 +249,12 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { .channel_end .is_upgrading(&channel_scan.counterparty); + tracing::warn!("is_channel_upgrading: {is_channel_upgrading}"); + if (mode.clients.enabled || mode.packets.enabled) && chan_state_src.is_open() && (chan_state_dst.is_open() || chan_state_dst.is_closed()) - && !is_channel_upgrading + //&& !is_channel_upgrading { if mode.clients.enabled { // Spawn the client worker diff --git a/tools/integration-test/src/tests/channel_upgrade/upgrade_handshake.rs b/tools/integration-test/src/tests/channel_upgrade/upgrade_handshake.rs index c2871b528f..b66ea1f3c8 100644 --- a/tools/integration-test/src/tests/channel_upgrade/upgrade_handshake.rs +++ b/tools/integration-test/src/tests/channel_upgrade/upgrade_handshake.rs @@ -8,12 +8,13 @@ use std::thread::sleep; use ibc_relayer::chain::requests::{IncludeProof, QueryChannelRequest, QueryHeight}; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use ibc_relayer_types::core::ics04_channel::version::Version; +use ibc_relayer_types::events::IbcEventType; use ibc_test_framework::chain::config::{set_max_deposit_period, set_voting_period}; use ibc_test_framework::prelude::*; use ibc_test_framework::relayer::channel::{ assert_eventually_channel_established, assert_eventually_channel_upgrade_ack, assert_eventually_channel_upgrade_cancel, assert_eventually_channel_upgrade_open, - assert_eventually_channel_upgrade_try, ChannelUpgradableAttributes, + assert_eventually_channel_upgrade_try, ChannelUpgradableAttributes, assert_eventually_channel_upgrade_flushing, }; #[test] @@ -36,6 +37,11 @@ fn test_channel_upgrade_timeout_confirm_handshake() -> Result<(), Error> { run_binary_channel_test(&ChannelUpgradeTimeoutConfirmHandshake) } +#[test] +fn test_channel_upgrade_timeout_when_flushing_handshake() -> Result<(), Error> { + run_binary_channel_test(&ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake) +} + const MAX_DEPOSIT_PERIOD: &str = "10s"; const VOTING_PERIOD: u64 = 10; @@ -530,3 +536,190 @@ impl BinaryChannelTest for ChannelUpgradeTimeoutConfirmHandshake { }) } } + +struct ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake; + +impl TestOverrides for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake { + fn modify_relayer_config(&self, config: &mut Config) { + config.mode.channels.enabled = true; + + config.mode.clients.misbehaviour = false; + } + + fn modify_genesis_file(&self, genesis: &mut serde_json::Value) -> Result<(), Error> { + set_max_deposit_period(genesis, MAX_DEPOSIT_PERIOD)?; + set_voting_period(genesis, VOTING_PERIOD)?; + Ok(()) + } + + fn should_spawn_supervisor(&self) -> bool { + false + } +} + +impl BinaryChannelTest for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake { + fn run( + &self, + _config: &TestConfig, + relayer: RelayerDriver, + chains: ConnectedChains, + channels: ConnectedChannel, + ) -> Result<(), Error> { + info!("Check that channels are both in OPEN State"); + + assert_eventually_channel_established( + &chains.handle_b, + &chains.handle_a, + &channels.channel_id_b.as_ref(), + &channels.port_b.as_ref(), + )?; + + let channel_end_a = chains + .handle_a + .query_channel( + QueryChannelRequest { + port_id: channels.port_a.0.clone(), + channel_id: channels.channel_id_a.0.clone(), + height: QueryHeight::Latest, + }, + IncludeProof::No, + ) + .map(|(channel_end, _)| channel_end) + .map_err(|e| eyre!("Error querying ChannelEnd A: {e}"))?; + + let channel_end_b = chains + .handle_b + .query_channel( + QueryChannelRequest { + port_id: channels.port_b.0.clone(), + channel_id: channels.channel_id_b.0.clone(), + height: QueryHeight::Latest, + }, + IncludeProof::No, + ) + .map(|(channel_end, _)| channel_end) + .map_err(|e| eyre!("Error querying ChannelEnd B: {e}"))?; + + let old_version = channel_end_a.version; + let old_ordering = channel_end_a.ordering; + let old_connection_hops_a = channel_end_a.connection_hops; + let old_connection_hops_b = channel_end_b.connection_hops; + + let channel = channels.channel; + let new_version = Version::ics20_with_fee(); + + let old_attrs = ChannelUpgradableAttributes::new( + old_version.clone(), + old_version.clone(), + old_ordering, + old_connection_hops_a.clone(), + old_connection_hops_b.clone(), + Sequence::from(1), + ); + + info!("Will update channel params to set a shorter upgrade timeout..."); + + // the upgrade timeout should be long enough for chain a + // to complete Ack successfully so that it goes into `FLUSHING` + chains.node_b.chain_driver().update_channel_params( + 25000000000, + chains.handle_b().get_signer().unwrap().as_ref(), + "1", + )?; + + info!("Will initialise upgrade handshake with governance proposal..."); + warn!("id : {}", chains.node_a.chain_driver().chain_id()); + warn!("port id: {}", channel.src_port_id()); + warn!("channel id: {}", channel.src_channel_id().unwrap()); + + chains.node_a.chain_driver().initialise_channel_upgrade( + channel.src_port_id().as_str(), + channel.src_channel_id().unwrap().as_str(), + old_ordering.as_str(), + old_connection_hops_a.first().unwrap().as_str(), + &serde_json::to_string(&new_version.0).unwrap(), + chains.handle_a().get_signer().unwrap().as_ref(), + "1", + )?; + + info!("Will run ChanUpgradeTry step..."); + + channel.build_chan_upgrade_try_and_send()?; + + info!("Check that the step ChanUpgradeTry was correctly executed..."); + + assert_eventually_channel_upgrade_try( + &chains.handle_b, + &chains.handle_a, + &channels.channel_id_b.as_ref(), + &channels.port_b.as_ref(), + &old_attrs.flipped(), + )?; + + // send a IBC transfer message from chain a to chain b + // so that we have an in-flight packet and chain a + // will move to `FLUSHING` during Ack + let denom_a = chains.node_a.denom(); + let wallet_a = chains.node_a.wallets().user1().cloned(); + let wallet_b = chains.node_b.wallets().user1().cloned(); + let a_to_b_amount = 12345u64; + + info!( + "Sending IBC transfer from chain {} to chain {} with amount of {} {}", + chains.chain_id_a(), + chains.chain_id_b(), + a_to_b_amount, + denom_a + ); + + chains.node_a.chain_driver().ibc_transfer_token( + &channels.port_a.as_ref(), + &channels.channel_id_a.as_ref(), + &wallet_a.as_ref(), + &wallet_b.address(), + &denom_a.with_amount(a_to_b_amount).as_ref(), + )?; + + info!("Will run ChanUpgradeAck step..."); + + channel.flipped().build_chan_upgrade_ack_and_send()?; + + info!("Check that the step ChanUpgradeAck was correctly executed..."); + + assert_eventually_channel_upgrade_flushing( + &chains.handle_a, + &chains.handle_b, + &channels.channel_id_a.as_ref(), + &channels.port_a.as_ref(), + &old_attrs, + )?; + + // wait enough time so that timeout expires while chain a is in FLUSHING + sleep(Duration::from_nanos(35000000000)); + + info!("Will run ChanUpgradeTimeout step..."); + + // Since the chain a has not moved to `FLUSH_COMPLETE` before the upgrade timeout + // expired, then we can submit `MsgChannelUpgradeTimeout` on chain b + // to cancel the upgrade and move the channel back to `OPEN` + let timeout_event = channel.build_chan_upgrade_timeout_and_send()?; + assert_eq!( + timeout_event.event_type(), + IbcEventType::UpgradeTimeoutChannel + ); + + relayer.with_supervisor(||{ + info!("Check that the step ChanUpgradeTimeout was correctly executed..."); + + assert_eventually_channel_upgrade_cancel( + &chains.handle_b, + &chains.handle_a, + &channels.channel_id_b.as_ref(), + &channels.port_b.as_ref(), + &old_attrs.flipped(), + )?; + + Ok(()) + }) + } +}