From cc16a76d5a4386913ec27e33087e43b3606244df Mon Sep 17 00:00:00 2001 From: Kirill Zaborsky Date: Tue, 17 Jun 2025 10:27:09 +0300 Subject: [PATCH 1/2] Add external tx hash to map bridge events to their corresponding transactions --- packages/examples/six-sigma/src/lib.rs | 1 + packages/kolme/src/api_server.rs | 13 ++++++--- packages/kolme/src/core/execute.rs | 6 ++++- packages/kolme/src/core/types.rs | 28 +++++++++++++++++++ packages/kolme/src/listener/cosmos.rs | 2 ++ packages/kolme/src/listener/mod.rs | 8 +++++- packages/kolme/src/listener/solana.rs | 37 +++++++++++++++++--------- packages/kolme/src/submitter/mod.rs | 10 ++++--- packages/kolme/src/submitter/solana.rs | 6 ++--- 9 files changed, 86 insertions(+), 25 deletions(-) diff --git a/packages/examples/six-sigma/src/lib.rs b/packages/examples/six-sigma/src/lib.rs index 01d7d1fa..5cf234af 100644 --- a/packages/examples/six-sigma/src/lib.rs +++ b/packages/examples/six-sigma/src/lib.rs @@ -875,6 +875,7 @@ mod tests { .and_then(|log_event| match log_event { LogEvent::ProcessedBridgeEvent(LogBridgeEvent::Regular { bridge_event_id: event_id, + tx_hash: _, account_id, }) => (event_id == bridge_event_id).then_some(account_id), _ => unreachable!(), diff --git a/packages/kolme/src/api_server.rs b/packages/kolme/src/api_server.rs index 7bf9ed68..a077f425 100644 --- a/packages/kolme/src/api_server.rs +++ b/packages/kolme/src/api_server.rs @@ -171,9 +171,15 @@ async fn handle_websocket( }; ApiNotification::NewBlock { block, logs } } - Notification::GenesisInstantiation { chain, contract } => { - ApiNotification::GenesisInstantiation { chain, contract } - } + Notification::GenesisInstantiation { + chain, + tx_hash, + contract, + } => ApiNotification::GenesisInstantiation { + chain, + tx_hash, + contract, + }, Notification::FailedTransaction(failed) => ApiNotification::FailedTransaction(failed), Notification::LatestBlock(latest_block) => ApiNotification::LatestBlock(latest_block), }; @@ -206,6 +212,7 @@ pub enum ApiNotification { /// A claim by a submitter that it has instantiated a bridge contract. GenesisInstantiation { chain: ExternalChain, + tx_hash: Option, contract: String, }, /// A transaction failed in the processor. diff --git a/packages/kolme/src/core/execute.rs b/packages/kolme/src/core/execute.rs index 5047f774..1913f4d6 100644 --- a/packages/kolme/src/core/execute.rs +++ b/packages/kolme/src/core/execute.rs @@ -168,10 +168,11 @@ impl ExecutionContext<'_, App> { } Message::Listener { chain, + tx_hash, event, event_id, } => { - self.listener(*chain, event, *event_id)?; + self.listener(*chain, tx_hash.clone(), event, *event_id)?; } Message::Approve { chain, @@ -196,6 +197,7 @@ impl ExecutionContext<'_, App> { fn listener( &mut self, chain: ExternalChain, + tx_hash: Option, event: &BridgeEvent, event_id: BridgeEventId, ) -> Result<()> { @@ -219,6 +221,7 @@ impl ExecutionContext<'_, App> { event_id, PendingBridgeEvent { event: event.clone(), + tx_hash, attestations: BTreeSet::new(), }, ); @@ -335,6 +338,7 @@ impl ExecutionContext<'_, App> { .deposit(asset_config.asset_id, amount)?; } self.log_event(LogEvent::ProcessedBridgeEvent(LogBridgeEvent::Regular { + tx_hash: pending.tx_hash, bridge_event_id: event_id, account_id, }))?; diff --git a/packages/kolme/src/core/types.rs b/packages/kolme/src/core/types.rs index dc69c497..fe2db438 100644 --- a/packages/kolme/src/core/types.rs +++ b/packages/kolme/src/core/types.rs @@ -15,6 +15,27 @@ pub use error::KolmeError; pub type SolanaClient = solana_client::nonblocking::rpc_client::RpcClient; pub type SolanaPubsubClient = solana_client::nonblocking::pubsub_client::PubsubClient; +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)] +pub struct ExternalTxHash(pub String); + +impl MerkleSerialize for ExternalTxHash { + fn merkle_serialize( + &self, + serializer: &mut MerkleSerializer, + ) -> std::result::Result<(), MerkleSerialError> { + serializer.store(&self.0) + } +} + +impl MerkleDeserialize for ExternalTxHash { + fn merkle_deserialize( + deserializer: &mut MerkleDeserializer, + _version: usize, + ) -> Result { + Ok(ExternalTxHash(deserializer.load()?)) + } +} + #[derive( serde::Serialize, serde::Deserialize, @@ -420,6 +441,7 @@ impl MerkleDeserialize for PendingBridgeAction { #[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)] pub struct PendingBridgeEvent { pub event: BridgeEvent, + pub tx_hash: Option, /// Attestations from the listeners pub attestations: BTreeSet, } @@ -428,9 +450,11 @@ impl MerkleSerialize for PendingBridgeEvent { fn merkle_serialize(&self, serializer: &mut MerkleSerializer) -> Result<(), MerkleSerialError> { let Self { event, + tx_hash, attestations, } = self; serializer.store_json(event)?; + serializer.store(tx_hash)?; serializer.store(attestations)?; Ok(()) } @@ -443,6 +467,7 @@ impl MerkleDeserialize for PendingBridgeEvent { ) -> Result { Ok(Self { event: deserializer.load_json()?, + tx_hash: deserializer.load()?, attestations: deserializer.load()?, }) } @@ -1042,6 +1067,7 @@ pub enum Message { App(AppMessage), Listener { chain: ExternalChain, + tx_hash: Option, event_id: BridgeEventId, event: BridgeEvent, }, @@ -1642,6 +1668,7 @@ pub enum Notification { /// A claim by a submitter that it has instantiated a bridge contract. GenesisInstantiation { chain: ExternalChain, + tx_hash: Option, contract: String, }, /// A transaction failed in the processor. @@ -1686,6 +1713,7 @@ pub enum LogEvent { #[serde(rename_all = "snake_case")] pub enum LogBridgeEvent { Regular { + tx_hash: Option, bridge_event_id: BridgeEventId, account_id: AccountId, }, diff --git a/packages/kolme/src/listener/cosmos.rs b/packages/kolme/src/listener/cosmos.rs index ea3e58f0..c91c9ed3 100644 --- a/packages/kolme/src/listener/cosmos.rs +++ b/packages/kolme/src/listener/cosmos.rs @@ -120,6 +120,7 @@ pub(crate) fn to_kolme_message( Message::Listener { chain, + tx_hash: None, event_id, event: BridgeEvent::Regular { wallet: Wallet(wallet), @@ -130,6 +131,7 @@ pub(crate) fn to_kolme_message( } BridgeEventMessage::Signed { wallet, action_id } => Message::Listener { chain, + tx_hash: None, event_id, event: BridgeEvent::Signed { wallet: Wallet(wallet), diff --git a/packages/kolme/src/listener/mod.rs b/packages/kolme/src/listener/mod.rs index e401cd27..926cff42 100644 --- a/packages/kolme/src/listener/mod.rs +++ b/packages/kolme/src/listener/mod.rs @@ -90,7 +90,12 @@ impl Listener { return Ok(contracts); } - if let Notification::GenesisInstantiation { chain, contract } = receiver.recv().await? { + if let Notification::GenesisInstantiation { + chain, + tx_hash, + contract, + } = receiver.recv().await? + { if chain.name() != name { continue; } @@ -147,6 +152,7 @@ impl Listener { &self.secret, vec![Message::Listener { chain, + tx_hash, event: BridgeEvent::Instantiated { contract }, event_id: BridgeEventId::start(), }], diff --git a/packages/kolme/src/listener/solana.rs b/packages/kolme/src/listener/solana.rs index 03134af7..85481e9a 100644 --- a/packages/kolme/src/listener/solana.rs +++ b/packages/kolme/src/listener/solana.rs @@ -117,7 +117,9 @@ async fn listen_internal( continue; } - let Some(msg) = extract_bridge_message_from_logs(&resp.value.logs)? else { + let Some((tx_hash, msg)) = + extract_bridge_message_from_logs(&resp.value.signature, &resp.value.logs)? + else { tracing::warn!( "No bridge message data log was found in TX {} logs.", resp.value.signature @@ -144,7 +146,7 @@ async fn listen_internal( .expect("should have at least one TX processed.") .next(); } else { - let msg = to_kolme_message::(msg, chain); + let msg = to_kolme_message::(tx_hash, msg, chain); kolme .sign_propose_await_transaction(secret, vec![msg]) @@ -168,7 +170,7 @@ async fn catch_up( ) -> Result> { tracing::info!("Catching up on missing bridge events until {}.", last_seen); - let mut messages = vec![]; + let mut messages_with_hashes = vec![]; let txs = client.get_signatures_for_address(contract).await?; // First entry is the latest transaction, we want to work up until to the target ID provided. @@ -196,7 +198,8 @@ async fn catch_up( continue; }; - let Some(msg) = extract_bridge_message_from_logs(&logs)? else { + let Some((tx_hash, msg)) = extract_bridge_message_from_logs(&sig.to_string(), &logs)? + else { tracing::warn!("No bridge message data log was found in TX {} logs.", sig); continue; @@ -206,24 +209,24 @@ async fn catch_up( break; } - messages.push(msg); + messages_with_hashes.push((tx_hash, msg)); } - assert!(messages.is_sorted_by(|a, b| a.id > b.id)); - let Some(latest_id) = messages.first().map(|x| x.id) else { + assert!(messages_with_hashes.is_sorted_by(|(_, a), (_, b)| a.id > b.id)); + let Some(latest_id) = messages_with_hashes.first().map(|(_, x)| x.id) else { return Ok(None); }; tracing::info!( "Found {} missed bridge events. Proposing Kolme transaction...", - messages.len() + messages_with_hashes.len() ); // Now process in reverse insertion order - from oldest to newest. - let kolme_messages: Vec> = messages + let kolme_messages: Vec> = messages_with_hashes .into_iter() .rev() - .map(|x| to_kolme_message(x, chain)) + .map(|(tx_hash, x)| to_kolme_message(tx_hash, x, chain)) .collect(); kolme @@ -233,7 +236,10 @@ async fn catch_up( Ok(Some(BridgeEventId(latest_id))) } -fn extract_bridge_message_from_logs(logs: &[String]) -> Result> { +fn extract_bridge_message_from_logs( + signature: &str, + logs: &[String], +) -> Result> { const PROGRAM_DATA_LOG: &str = "Program data: "; // Our program data should always be the last "Program data:" entry even if CPI was invoked. @@ -253,7 +259,7 @@ fn extract_bridge_message_from_logs(logs: &[String]) -> Result return Ok(Some(result)), + Ok(result) => return Ok(Some((ExternalTxHash(signature.to_owned()), result))), Err(e) => { if logs.iter().any(|x| x.contains("Instruction: Initialize")) { tracing::info!( @@ -270,7 +276,11 @@ fn extract_bridge_message_from_logs(logs: &[String]) -> Result(msg: BridgeMessage, chain: SolanaChain) -> Message { +fn to_kolme_message( + tx_hash: ExternalTxHash, + msg: BridgeMessage, + chain: SolanaChain, +) -> Message { let event_id = BridgeEventId(msg.id); let wallet = Pubkey::new_from_array(msg.wallet).to_string(); let event = match msg.ty { @@ -299,6 +309,7 @@ fn to_kolme_message(msg: BridgeMessage, chain: SolanaChain) -> Message { Message::Listener { chain: chain.into(), + tx_hash: Some(tx_hash), event_id, event, } diff --git a/packages/kolme/src/submitter/mod.rs b/packages/kolme/src/submitter/mod.rs index 5e065c22..bac173e5 100644 --- a/packages/kolme/src/submitter/mod.rs +++ b/packages/kolme/src/submitter/mod.rs @@ -99,6 +99,7 @@ impl Submitter { Notification::NewBlock(_) => (), Notification::GenesisInstantiation { chain: _, + tx_hash: _, contract: _, } => continue, Notification::FailedTransaction { .. } => continue, @@ -128,7 +129,7 @@ impl Submitter { } async fn handle_genesis(&mut self, genesis_action: GenesisAction) -> Result<()> { - let (contract_addr, chain) = match genesis_action { + let (contract_addr, txhash, chain) = match genesis_action { GenesisAction::InstantiateCosmos { chain, code_id, @@ -146,7 +147,7 @@ impl Submitter { let addr = cosmos::instantiate(&cosmos, seed_phrase, code_id, args).await?; - (addr, chain.into()) + (addr, None, chain.into()) } GenesisAction::InstantiateSolana { chain, @@ -163,14 +164,15 @@ impl Submitter { let client = self.kolme.read().get_solana_client(chain).await; - solana::instantiate(&client, keypair, &program_id, args).await?; + let txhash = solana::instantiate(&client, keypair, &program_id, args).await?; - (program_id, chain.into()) + (program_id, Some(txhash), chain.into()) } }; self.kolme.notify(Notification::GenesisInstantiation { chain, + tx_hash: txhash, contract: contract_addr, }); self.genesis_created.insert(chain); diff --git a/packages/kolme/src/submitter/solana.rs b/packages/kolme/src/submitter/solana.rs index 144edda9..36549ea8 100644 --- a/packages/kolme/src/submitter/solana.rs +++ b/packages/kolme/src/submitter/solana.rs @@ -14,16 +14,16 @@ pub async fn instantiate( keypair: &Keypair, program_id: &str, set: ValidatorSet, -) -> Result<()> { +) -> Result { let data = InitializeIxData { set }; let program_pubkey = Pubkey::from_str(program_id)?; let blockhash = client.get_latest_blockhash().await?; let tx = init_tx(program_pubkey, blockhash, keypair, &data).map_err(|x| anyhow::anyhow!(x))?; - client.send_and_confirm_transaction(&tx).await?; + let signature = client.send_and_confirm_transaction(&tx).await?; - Ok(()) + Ok(ExternalTxHash(signature.to_string())) } pub async fn execute( From e5d3d8c4a5b0600cbe716364027cb67244657503 Mon Sep 17 00:00:00 2001 From: Kirill Zaborsky Date: Tue, 17 Jun 2025 14:50:32 +0300 Subject: [PATCH 2/2] Improve versioning as per Michael's comments --- packages/kolme/src/core/types.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/packages/kolme/src/core/types.rs b/packages/kolme/src/core/types.rs index fe2db438..573d2401 100644 --- a/packages/kolme/src/core/types.rs +++ b/packages/kolme/src/core/types.rs @@ -18,8 +18,8 @@ pub type SolanaPubsubClient = solana_client::nonblocking::pubsub_client::PubsubC #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)] pub struct ExternalTxHash(pub String); -impl MerkleSerialize for ExternalTxHash { - fn merkle_serialize( +impl MerkleSerializeRaw for ExternalTxHash { + fn merkle_serialize_raw( &self, serializer: &mut MerkleSerializer, ) -> std::result::Result<(), MerkleSerialError> { @@ -27,11 +27,10 @@ impl MerkleSerialize for ExternalTxHash { } } -impl MerkleDeserialize for ExternalTxHash { - fn merkle_deserialize( +impl MerkleDeserializeRaw for ExternalTxHash { + fn merkle_deserialize_raw( deserializer: &mut MerkleDeserializer, - _version: usize, - ) -> Result { + ) -> std::result::Result { Ok(ExternalTxHash(deserializer.load()?)) } } @@ -446,6 +445,8 @@ pub struct PendingBridgeEvent { pub attestations: BTreeSet, } +const VERSION_WITH_TX_HASH: usize = 1; + impl MerkleSerialize for PendingBridgeEvent { fn merkle_serialize(&self, serializer: &mut MerkleSerializer) -> Result<(), MerkleSerialError> { let Self { @@ -454,21 +455,29 @@ impl MerkleSerialize for PendingBridgeEvent { attestations, } = self; serializer.store_json(event)?; - serializer.store(tx_hash)?; serializer.store(attestations)?; + serializer.store(tx_hash)?; Ok(()) } + + fn merkle_version() -> usize { + VERSION_WITH_TX_HASH + } } impl MerkleDeserialize for PendingBridgeEvent { fn merkle_deserialize( deserializer: &mut MerkleDeserializer, - _version: usize, + version: usize, ) -> Result { Ok(Self { event: deserializer.load_json()?, - tx_hash: deserializer.load()?, attestations: deserializer.load()?, + tx_hash: if version >= VERSION_WITH_TX_HASH { + deserializer.load()? + } else { + None + }, }) } }