Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ljoss17 committed Jan 16, 2024
1 parent 55bc5ad commit a9357fa
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 6 deletions.
96 changes: 92 additions & 4 deletions crates/relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
&mut self,
state: State,
) -> Result<(Option<IbcEvent>, Next), ChannelError> {
tracing::warn!("self state `{}` other state `{:#?}`", state, self.counterparty_state());
let event = match (state, self.counterparty_state()?) {
// Open handshake steps
(State::Init, State::Uninitialized) => Some(self.build_chan_open_try_and_send()?),
Expand Down Expand Up @@ -825,24 +826,111 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> Channel<ChainA, ChainB> {
None => Some(self.build_chan_upgrade_cancel_and_send()?),
}
}
(State::Flushing, State::Open(UpgradeState::Upgrading)) => {
/*(State::Flushing, State::Open(UpgradeState::Upgrading)) => {
match self.build_chan_upgrade_ack_and_send()? {
Some(event) => Some(event),
None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?),
}
}
}*/
(State::Flushcomplete, State::Flushing) => {
match self.build_chan_upgrade_confirm_and_send()? {
Some(event) => Some(event),
None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?),
}
}
(State::Flushcomplete, State::Open(UpgradeState::Upgrading)) => {
Some(self.flipped().build_chan_upgrade_open_and_send()?)
(State::Flushing, State::Open(UpgradeState::Upgrading)) => {

let dst_latest_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?;
let src_latest_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;
let (error_receipt, _) = self
.dst_chain()
.query_upgrade_error(
QueryUpgradeErrorRequest {
port_id: self.dst_port_id().to_string(),
channel_id: self.dst_channel_id().unwrap().to_string(),
},
dst_latest_height,
)
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;

let (channel_end, _) = self
.src_chain()
.query_channel(
QueryChannelRequest {
port_id: self.src_port_id().clone(),
channel_id: self.src_channel_id().unwrap().clone(),
height: QueryHeight::Specific(src_latest_height),
},
IncludeProof::Yes,
)
.map_err(|e| ChannelError::query(self.src_chain().id(), e))?;

if error_receipt.sequence == channel_end.upgrade_sequence {
Some(self.flipped().build_chan_upgrade_cancel_and_send()?)
} else {
match self.build_chan_upgrade_ack_and_send()? {
Some(event) => Some(event),
None => Some(self.flipped().build_chan_upgrade_cancel_and_send()?),
}
}
}
(State::Open(UpgradeState::NotUpgrading), State::Flushing) => {

let dst_latest_height = self
.dst_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.dst_chain().id(), e))?;
let src_latest_height = self
.src_chain()
.query_latest_height()
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;
let (error_receipt, _) = self
.src_chain()
.query_upgrade_error(
QueryUpgradeErrorRequest {
port_id: self.src_port_id().to_string(),
channel_id: self.src_channel_id().unwrap().to_string(),
},
src_latest_height,
)
.map_err(|e| ChannelError::chain_query(self.src_chain().id(), e))?;

let (channel_end, _) = self
.dst_chain()
.query_channel(
QueryChannelRequest {
port_id: self.dst_port_id().clone(),
channel_id: self.dst_channel_id().unwrap().clone(),
height: QueryHeight::Specific(dst_latest_height),
},
IncludeProof::Yes,
)
.map_err(|e| ChannelError::query(self.src_chain().id(), e))?;

if error_receipt.sequence == channel_end.upgrade_sequence {
Some(self.build_chan_upgrade_cancel_and_send()?)
} else {
match self.build_chan_upgrade_ack_and_send()? {
Some(event) => Some(event),
None => Some(self.build_chan_upgrade_cancel_and_send()?),
}
}
}
/*(State::Flushcomplete, State::Open(UpgradeState::Upgrading)) => {
Some(self.flipped().build_chan_upgrade_open_and_send()?)
}*/
(State::Open(UpgradeState::Upgrading), State::Flushcomplete) => {
Some(self.build_chan_upgrade_open_and_send()?)
}
(State::Open(UpgradeState::NotUpgrading), State::Flushcomplete) => {
Some(self.build_chan_upgrade_open_and_send()?)
}

_ => None,
};
Expand Down
4 changes: 3 additions & 1 deletion crates/relayer/src/supervisor/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,12 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> {
.channel_end
.is_upgrading(&channel_scan.counterparty);

tracing::warn!("is_channel_upgrading: {is_channel_upgrading}");

if (mode.clients.enabled || mode.packets.enabled)
&& chan_state_src.is_open()
&& (chan_state_dst.is_open() || chan_state_dst.is_closed())
&& !is_channel_upgrading
//&& !is_channel_upgrading
{
if mode.clients.enabled {
// Spawn the client worker
Expand Down
195 changes: 194 additions & 1 deletion tools/integration-test/src/tests/channel_upgrade/upgrade_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use std::thread::sleep;
use ibc_relayer::chain::requests::{IncludeProof, QueryChannelRequest, QueryHeight};
use ibc_relayer_types::core::ics04_channel::packet::Sequence;
use ibc_relayer_types::core::ics04_channel::version::Version;
use ibc_relayer_types::events::IbcEventType;
use ibc_test_framework::chain::config::{set_max_deposit_period, set_voting_period};
use ibc_test_framework::prelude::*;
use ibc_test_framework::relayer::channel::{
assert_eventually_channel_established, assert_eventually_channel_upgrade_ack,
assert_eventually_channel_upgrade_cancel, assert_eventually_channel_upgrade_open,
assert_eventually_channel_upgrade_try, ChannelUpgradableAttributes,
assert_eventually_channel_upgrade_try, ChannelUpgradableAttributes, assert_eventually_channel_upgrade_flushing,
};

#[test]
Expand All @@ -36,6 +37,11 @@ fn test_channel_upgrade_timeout_confirm_handshake() -> Result<(), Error> {
run_binary_channel_test(&ChannelUpgradeTimeoutConfirmHandshake)
}

#[test]
fn test_channel_upgrade_timeout_when_flushing_handshake() -> Result<(), Error> {
run_binary_channel_test(&ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake)
}

const MAX_DEPOSIT_PERIOD: &str = "10s";
const VOTING_PERIOD: u64 = 10;

Expand Down Expand Up @@ -530,3 +536,190 @@ impl BinaryChannelTest for ChannelUpgradeTimeoutConfirmHandshake {
})
}
}

struct ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake;

impl TestOverrides for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake {
fn modify_relayer_config(&self, config: &mut Config) {
config.mode.channels.enabled = true;

config.mode.clients.misbehaviour = false;
}

fn modify_genesis_file(&self, genesis: &mut serde_json::Value) -> Result<(), Error> {
set_max_deposit_period(genesis, MAX_DEPOSIT_PERIOD)?;
set_voting_period(genesis, VOTING_PERIOD)?;
Ok(())
}

fn should_spawn_supervisor(&self) -> bool {
false
}
}

impl BinaryChannelTest for ChannelUpgradeHandshakeTimeoutWhenFlushingHandshake {
fn run<ChainA: ChainHandle, ChainB: ChainHandle>(
&self,
_config: &TestConfig,
relayer: RelayerDriver,
chains: ConnectedChains<ChainA, ChainB>,
channels: ConnectedChannel<ChainA, ChainB>,
) -> Result<(), Error> {
info!("Check that channels are both in OPEN State");

assert_eventually_channel_established(
&chains.handle_b,
&chains.handle_a,
&channels.channel_id_b.as_ref(),
&channels.port_b.as_ref(),
)?;

let channel_end_a = chains
.handle_a
.query_channel(
QueryChannelRequest {
port_id: channels.port_a.0.clone(),
channel_id: channels.channel_id_a.0.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map(|(channel_end, _)| channel_end)
.map_err(|e| eyre!("Error querying ChannelEnd A: {e}"))?;

let channel_end_b = chains
.handle_b
.query_channel(
QueryChannelRequest {
port_id: channels.port_b.0.clone(),
channel_id: channels.channel_id_b.0.clone(),
height: QueryHeight::Latest,
},
IncludeProof::No,
)
.map(|(channel_end, _)| channel_end)
.map_err(|e| eyre!("Error querying ChannelEnd B: {e}"))?;

let old_version = channel_end_a.version;
let old_ordering = channel_end_a.ordering;
let old_connection_hops_a = channel_end_a.connection_hops;
let old_connection_hops_b = channel_end_b.connection_hops;

let channel = channels.channel;
let new_version = Version::ics20_with_fee();

let old_attrs = ChannelUpgradableAttributes::new(
old_version.clone(),
old_version.clone(),
old_ordering,
old_connection_hops_a.clone(),
old_connection_hops_b.clone(),
Sequence::from(1),
);

info!("Will update channel params to set a shorter upgrade timeout...");

// the upgrade timeout should be long enough for chain a
// to complete Ack successfully so that it goes into `FLUSHING`
chains.node_b.chain_driver().update_channel_params(
25000000000,
chains.handle_b().get_signer().unwrap().as_ref(),
"1",
)?;

info!("Will initialise upgrade handshake with governance proposal...");
warn!("id : {}", chains.node_a.chain_driver().chain_id());
warn!("port id: {}", channel.src_port_id());
warn!("channel id: {}", channel.src_channel_id().unwrap());

chains.node_a.chain_driver().initialise_channel_upgrade(
channel.src_port_id().as_str(),
channel.src_channel_id().unwrap().as_str(),
old_ordering.as_str(),
old_connection_hops_a.first().unwrap().as_str(),
&serde_json::to_string(&new_version.0).unwrap(),
chains.handle_a().get_signer().unwrap().as_ref(),
"1",
)?;

info!("Will run ChanUpgradeTry step...");

channel.build_chan_upgrade_try_and_send()?;

info!("Check that the step ChanUpgradeTry was correctly executed...");

assert_eventually_channel_upgrade_try(
&chains.handle_b,
&chains.handle_a,
&channels.channel_id_b.as_ref(),
&channels.port_b.as_ref(),
&old_attrs.flipped(),
)?;

// send a IBC transfer message from chain a to chain b
// so that we have an in-flight packet and chain a
// will move to `FLUSHING` during Ack
let denom_a = chains.node_a.denom();
let wallet_a = chains.node_a.wallets().user1().cloned();
let wallet_b = chains.node_b.wallets().user1().cloned();
let a_to_b_amount = 12345u64;

info!(
"Sending IBC transfer from chain {} to chain {} with amount of {} {}",
chains.chain_id_a(),
chains.chain_id_b(),
a_to_b_amount,
denom_a
);

chains.node_a.chain_driver().ibc_transfer_token(
&channels.port_a.as_ref(),
&channels.channel_id_a.as_ref(),
&wallet_a.as_ref(),
&wallet_b.address(),
&denom_a.with_amount(a_to_b_amount).as_ref(),
)?;

info!("Will run ChanUpgradeAck step...");

channel.flipped().build_chan_upgrade_ack_and_send()?;

info!("Check that the step ChanUpgradeAck was correctly executed...");

assert_eventually_channel_upgrade_flushing(
&chains.handle_a,
&chains.handle_b,
&channels.channel_id_a.as_ref(),
&channels.port_a.as_ref(),
&old_attrs,
)?;

// wait enough time so that timeout expires while chain a is in FLUSHING
sleep(Duration::from_nanos(35000000000));

info!("Will run ChanUpgradeTimeout step...");

// Since the chain a has not moved to `FLUSH_COMPLETE` before the upgrade timeout
// expired, then we can submit `MsgChannelUpgradeTimeout` on chain b
// to cancel the upgrade and move the channel back to `OPEN`
let timeout_event = channel.build_chan_upgrade_timeout_and_send()?;
assert_eq!(
timeout_event.event_type(),
IbcEventType::UpgradeTimeoutChannel
);

relayer.with_supervisor(||{
info!("Check that the step ChanUpgradeTimeout was correctly executed...");

assert_eventually_channel_upgrade_cancel(
&chains.handle_b,
&chains.handle_a,
&channels.channel_id_b.as_ref(),
&channels.port_b.as_ref(),
&old_attrs.flipped(),
)?;

Ok(())
})
}
}

0 comments on commit a9357fa

Please sign in to comment.