diff --git a/core/src/database/operator.rs b/core/src/database/operator.rs index 8a25cbff..b9f84e60 100644 --- a/core/src/database/operator.rs +++ b/core/src/database/operator.rs @@ -64,6 +64,7 @@ impl Database { .collect()) } + /// TODO: wallet_address should have `Address` type. pub async fn set_operator( &self, tx: Option>, diff --git a/core/src/rpc/aggregator.rs b/core/src/rpc/aggregator.rs index 27b5ba6e..75c72b20 100644 --- a/core/src/rpc/aggregator.rs +++ b/core/src/rpc/aggregator.rs @@ -6,8 +6,9 @@ use crate::builder::transaction::create_move_to_vault_txhandler; use crate::config::BridgeConfig; use crate::rpc::clementine::clementine_operator_client::ClementineOperatorClient; use crate::rpc::clementine::clementine_verifier_client::ClementineVerifierClient; +use crate::rpc::clementine::VerifierDepositSignParams; use crate::rpc::error::output_stream_ended_prematurely; -use crate::rpc::parsers; +use crate::rpc::parser; use crate::{ aggregator::Aggregator, builder::sighash::{ @@ -235,30 +236,10 @@ async fn create_nonce_streams( })) .await?; - // Get the first responses. + // Get the first responses from verifiers. let first_responses: Vec = try_join_all(nonce_streams.iter_mut().map(|stream| async { - let nonce_gen_first_response = stream - .message() - .await? - .ok_or(BridgeError::RPCStreamEndedUnexpectedly( - "NonceGen returns nothing".to_string(), - ))? - .response - .ok_or(BridgeError::RPCStreamEndedUnexpectedly( - "NonceGen response field is empty".to_string(), - ))?; - - if let clementine::nonce_gen_response::Response::FirstResponse( - nonce_gen_first_response, - ) = nonce_gen_first_response - { - Ok(nonce_gen_first_response) - } else { - Err(BridgeError::RPCInvalidResponse( - "NonceGen response is not FirstResponse".to_string(), - )) - } + parser::verifier::parse_nonce_gen_first_response(stream).await })) .await?; @@ -342,18 +323,8 @@ impl Aggregator { deposit_params: DepositParams, ) -> Result { let (deposit_outpoint, evm_address, recovery_taproot_address, user_takes_after) = - parsers::parse_deposit_params(deposit_params)?; - let musig_partial_sigs: Vec = partial_sigs - .iter() - .map(|sig: &Vec| MusigPartialSignature::from_slice(sig)) - .collect::, _>>() - .map_err(|e| { - BridgeError::RPCParamMalformed( - "Partial sigs for movetx could not be parsed into MusigPartialSignature" - .to_string(), - e.to_string(), - ) - })?; + parser::parse_deposit_params(deposit_params)?; + let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?; // create move tx and calculate sighash let mut move_txhandler = create_move_to_vault_txhandler( @@ -392,32 +363,36 @@ impl Aggregator { impl ClementineAggregator for Aggregator { #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] async fn setup(&self, _request: Request) -> Result, Status> { - tracing::info!("Collecting verifier details..."); - let verifier_params = try_join_all(self.verifier_clients.iter().map(|client| { + tracing::info!("Collecting verifier public keys..."); + let verifier_public_keys = try_join_all(self.verifier_clients.iter().map(|client| { let mut client = client.clone(); + async move { - let response = client.get_params(Request::new(Empty {})).await?; - Ok::<_, Status>(response.into_inner()) + let verifier_params = client + .get_params(Request::new(Empty {})) + .await? + .into_inner(); + let verifier_public_key = verifier_params.public_key; + + Ok::<_, Status>(verifier_public_key) } })) .await?; - let verifier_public_keys: Vec> = - verifier_params.into_iter().map(|p| p.public_key).collect(); tracing::debug!("Verifier public keys: {:?}", verifier_public_keys); tracing::info!("Setting up verifiers..."); try_join_all(self.verifier_clients.iter().map(|client| { let mut client = client.clone(); - { - let verifier_public_keys = clementine::VerifierPublicKeys { - verifier_public_keys: verifier_public_keys.clone(), - }; - async move { - let response = client - .set_verifiers(Request::new(verifier_public_keys)) - .await?; - Ok::<_, Status>(response.into_inner()) - } + let verifier_public_keys = clementine::VerifierPublicKeys { + verifier_public_keys: verifier_public_keys.clone(), + }; + + async move { + client + .set_verifiers(Request::new(verifier_public_keys)) + .await?; + + Ok::<_, Status>(()) } })) .await?; @@ -425,8 +400,10 @@ impl ClementineAggregator for Aggregator { tracing::info!("Collecting operator details..."); let operator_params = try_join_all(self.operator_clients.iter().map(|client| { let mut client = client.clone(); + async move { let mut responses = Vec::new(); + let mut params_stream = client .get_params(Request::new(Empty {})) .await? @@ -440,16 +417,14 @@ impl ClementineAggregator for Aggregator { })) .await?; - tracing::info!("Informing verifiers for existing operators..."); + tracing::info!("Informing verifiers about existing operators..."); try_join_all(self.verifier_clients.iter().map(|client| { let mut client = client.clone(); let operator_params = operator_params.clone(); async move { for params in operator_params { - let (tx, rx) = tokio::sync::mpsc::channel(1280); - let future = - client.set_operator(tokio_stream::wrappers::ReceiverStream::new(rx)); + let (tx, rx) = tokio::sync::mpsc::channel(params.len()); for param in params { tx.send(param) @@ -457,7 +432,9 @@ impl ClementineAggregator for Aggregator { .map_err(|_| output_stream_ended_prematurely())?; } - future.await?; // TODO: This is dangerous: If channel size becomes not sufficient, this will block forever. + client + .set_operator(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await?; } Ok::<_, tonic::Status>(()) @@ -468,8 +445,10 @@ impl ClementineAggregator for Aggregator { tracing::info!("Collecting Winternitz public keys from watchtowers..."); let watchtower_params = try_join_all(self.watchtower_clients.iter().map(|client| { let mut client = client.clone(); + async move { let mut responses = Vec::new(); + let mut params_stream = client .get_params(Request::new(Empty {})) .await? @@ -490,17 +469,17 @@ impl ClementineAggregator for Aggregator { async move { for params in watchtower_params { - let (tx, rx) = tokio::sync::mpsc::channel(1280); + let (tx, rx) = tokio::sync::mpsc::channel(params.len()); - let future = - client.set_watchtower(tokio_stream::wrappers::ReceiverStream::new(rx)); for param in params { tx.send(param) .await .map_err(|_| output_stream_ended_prematurely())?; } - future.await?; // TODO: This is dangerous: If channel size becomes not sufficient, this will block forever. + client + .set_watchtower(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await?; } Ok::<_, tonic::Status>(()) @@ -533,17 +512,12 @@ impl ClementineAggregator for Aggregator { #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] async fn new_deposit( &self, - deposit_params_req: Request, + request: Request, ) -> Result, Status> { - tracing::info!("Received new deposit request: {:?}", deposit_params_req); + let deposit_params = request.into_inner(); - // Extract and validate deposit parameters - let deposit_params = deposit_params_req.get_ref().clone(); let (deposit_outpoint, evm_address, recovery_taproot_address, user_takes_after) = - parsers::parse_deposit_params(deposit_params.clone())?; - let verifiers_public_keys = self.config.verifiers_public_keys.clone(); - - tracing::debug!("Parsed deposit params"); + parser::parse_deposit_params(deposit_params.clone())?; // Generate nonce streams for all verifiers. let num_required_sigs = calculate_num_required_nofn_sigs(&self.config); @@ -551,7 +525,6 @@ impl ClementineAggregator for Aggregator { create_nonce_streams(self.verifier_clients.clone(), num_required_sigs as u32 + 1) .await?; // ask for +1 for the final movetx signature, but don't send it on deposit_sign stage - // Create deposit signing streams with each verifier let mut partial_sig_streams = try_join_all(self.verifier_clients.iter().map(|verifier_client| { let mut verifier_client = verifier_client.clone(); @@ -560,34 +533,25 @@ impl ClementineAggregator for Aggregator { let (tx, rx) = tokio::sync::mpsc::channel(1280); let stream = verifier_client .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx)) - .await?; + .await? + .into_inner(); - Ok::<_, Status>((stream.into_inner(), tx)) + Ok::<_, Status>((stream, tx)) } })) .await?; - tracing::debug!("Generated partial sig streams"); - // Create initial deposit session and send to verifiers let deposit_sign_session = DepositSignSession { - deposit_params: Some(deposit_params_req.into_inner()), + deposit_params: Some(deposit_params.clone()), nonce_gen_first_responses: first_responses, }; tracing::debug!("Sending deposit sign session to verifiers"); - - // Send deposit session to each verifier for (_, tx) in partial_sig_streams.iter_mut() { - tx.send(clementine::VerifierDepositSignParams { - params: Some( - clementine::verifier_deposit_sign_params::Params::DepositSignFirstParam( - deposit_sign_session.clone(), - ), - ), - }) - .await - .map_err(|e| { + let deposit_sign_param: VerifierDepositSignParams = deposit_sign_session.clone().into(); + + tx.send(deposit_sign_param).await.map_err(|e| { Status::internal(format!("Failed to send deposit sign session: {:?}", e)) })?; } @@ -610,14 +574,8 @@ impl ClementineAggregator for Aggregator { deposit_finalize_streams.into_iter().unzip(); // Send initial finalization params - let deposit_finalize_first_param = clementine::VerifierDepositFinalizeParams { - params: Some( - clementine::verifier_deposit_finalize_params::Params::DepositSignFirstParam( - deposit_sign_session.clone(), - ), - ), - }; - + let deposit_finalize_first_param: VerifierDepositFinalizeParams = + deposit_sign_session.clone().into(); for tx in deposit_finalize_sender.iter() { tx.send(deposit_finalize_first_param.clone()) .await @@ -630,7 +588,7 @@ impl ClementineAggregator for Aggregator { } // Create sighash stream for transaction signing - let sighash_stream = create_nofn_sighash_stream( + let sighash_stream = Box::pin(create_nofn_sighash_stream( self.db.clone(), self.config.clone(), deposit_outpoint, @@ -643,8 +601,7 @@ impl ClementineAggregator for Aggregator { 100, self.config.bridge_amount_sats, self.config.network, - ); - let sighash_stream = Box::pin(sighash_stream); + )); // Create channels for pipeline communication let (agg_nonce_sender, agg_nonce_receiver) = channel(32); @@ -668,7 +625,7 @@ impl ClementineAggregator for Aggregator { // Start the signature aggregation pipe. let sig_agg_handle = tokio::spawn(signature_aggregator( partial_sig_receiver, - verifiers_public_keys, + self.config.verifiers_public_keys.clone(), final_sig_sender, )); @@ -715,13 +672,9 @@ impl ClementineAggregator for Aggregator { .map(|tx| async { for sigs in operator_sigs.iter() { for sig in sigs.iter() { - tx.send(VerifierDepositFinalizeParams { - params: Some(verifier_deposit_finalize_params::Params::SchnorrSig( - sig.serialize().to_vec(), - )), - }) - .await - .map_err(|e| { + let deposit_finalize_param: VerifierDepositFinalizeParams = sig.into(); + + tx.send(deposit_finalize_param).await.map_err(|e| { BridgeError::RPCStreamEndedUnexpectedly(format!( "Can't send operator sigs to verifier: {}", e @@ -729,6 +682,7 @@ impl ClementineAggregator for Aggregator { })?; } } + Ok::<(), BridgeError>(()) }) .collect(); @@ -758,8 +712,6 @@ impl ClementineAggregator for Aggregator { #[cfg(test)] mod tests { - use bitcoin::Txid; - use crate::{ config::BridgeConfig, create_test_config_with_thread_name, @@ -780,6 +732,7 @@ mod tests { verifier::Verifier, watchtower::Watchtower, }; + use bitcoin::Txid; use std::{env, str::FromStr, thread}; #[tokio::test] diff --git a/core/src/rpc/error.rs b/core/src/rpc/error.rs index 5b865f2f..e37657e2 100644 --- a/core/src/rpc/error.rs +++ b/core/src/rpc/error.rs @@ -1,8 +1,7 @@ use std::fmt::Display; - use tonic::Status; -pub(crate) fn expected_msg_got_error(msg: Status) -> Status { +pub(crate) fn _expected_msg_got_error(msg: Status) -> Status { Status::invalid_argument(format!("Expected message, got error: {msg}")) } diff --git a/core/src/rpc/mod.rs b/core/src/rpc/mod.rs index bb8f93c9..36b03e43 100644 --- a/core/src/rpc/mod.rs +++ b/core/src/rpc/mod.rs @@ -9,10 +9,9 @@ pub mod clementine; pub mod aggregator; mod error; pub mod operator; -mod parsers; +mod parser; pub mod verifier; pub mod watchtower; -mod wrapper; /// Returns gRPC clients. /// diff --git a/core/src/rpc/operator.rs b/core/src/rpc/operator.rs index d31f4eca..be35f921 100644 --- a/core/src/rpc/operator.rs +++ b/core/src/rpc/operator.rs @@ -1,13 +1,13 @@ use super::clementine::{ - self, clementine_operator_server::ClementineOperator, operator_params, ChallengeAckDigest, - DepositSignSession, Empty, NewWithdrawalSigParams, NewWithdrawalSigResponse, OperatorBurnSig, - OperatorParams, WithdrawalFinalizedParams, + clementine_operator_server::ClementineOperator, DepositSignSession, Empty, + NewWithdrawalSigParams, NewWithdrawalSigResponse, OperatorBurnSig, OperatorParams, + WithdrawalFinalizedParams, }; use super::error::*; use crate::builder::sighash::create_operator_sighash_stream; -use crate::rpc::parsers; +use crate::rpc::parser; use crate::{errors::BridgeError, operator::Operator}; -use bitcoin::{hashes::Hash, Amount, OutPoint}; +use bitcoin::{Amount, OutPoint}; use futures::StreamExt; use std::pin::pin; use tokio::sync::mpsc; @@ -20,80 +20,52 @@ impl ClementineOperator for Operator { type GetParamsStream = ReceiverStream>; #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn get_params( &self, _request: Request, ) -> Result, Status> { let operator = self.clone(); - let (tx, rx) = mpsc::channel(1280); + let out_stream: Self::GetParamsStream = ReceiverStream::new(rx); + tokio::spawn(async move { - let operator_config = clementine::OperatorConfig { - operator_idx: operator.idx as u32, - collateral_funding_txid: operator.collateral_funding_txid.to_byte_array().to_vec(), - xonly_pk: operator.signer.xonly_public_key.to_string(), - wallet_reimburse_address: operator.config.operator_wallet_addresses[operator.idx] // TODO: Fix this where the config will only have one address. - .clone() - .assume_checked() - .to_string(), - }; - tx.send(Ok(OperatorParams { - response: Some(operator_params::Response::OperatorDetails(operator_config)), - })) - .await - .map_err(|_| output_stream_ended_prematurely())?; - - let winternitz_pubkeys = operator.get_winternitz_public_keys()?; - let winternitz_pubkeys = winternitz_pubkeys - .into_iter() - .map(From::from) - .collect::>(); - for wpk in winternitz_pubkeys { - tx.send(Ok(OperatorParams { - response: Some(operator_params::Response::WinternitzPubkeys(wpk)), - })) + let operator_config: OperatorParams = operator.clone().into(); + tx.send(Ok(operator_config)) .await .map_err(|_| output_stream_ended_prematurely())?; + + for winternitz_public_key in operator.get_winternitz_public_keys()? { + let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into(); + tx.send(Ok(operator_winternitz_pubkey)) + .await + .map_err(|_| output_stream_ended_prematurely())?; } - let public_hashes = operator.generate_challenge_ack_preimages_and_hashes()?; - let public_hashes = public_hashes - .into_iter() - .map(|hash| ChallengeAckDigest { - hash: hash.to_vec(), - }) - .collect::>(); - - for hash in public_hashes { - tx.send(Ok(OperatorParams { - response: Some(operator_params::Response::ChallengeAckDigests(hash)), - })) - .await - .map_err(|_| output_stream_ended_prematurely())?; + for hash in operator.generate_challenge_ack_preimages_and_hashes()? { + let hash: OperatorParams = hash.into(); + tx.send(Ok(hash)) + .await + .map_err(|_| output_stream_ended_prematurely())?; } Ok::<(), Status>(()) }); - let out_stream: Self::GetParamsStream = ReceiverStream::new(rx); Ok(Response::new(out_stream)) } #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn deposit_sign( &self, request: Request, ) -> Result, Status> { + let operator = self.clone(); + let (tx, rx) = mpsc::channel(1280); let deposit_sign_session = request.into_inner(); + let (deposit_outpoint, evm_address, recovery_taproot_address, user_takes_after) = - match deposit_sign_session.deposit_params { - Some(deposit_params) => parsers::parse_deposit_params(deposit_params)?, - _ => return Err(expected_msg_got_none("Deposit Params")()), - }; - let (tx, rx) = mpsc::channel(1280); - let operator = self.clone(); + parser::parse_deposit_params(deposit_sign_session.try_into()?)?; + tokio::spawn(async move { let mut sighash_stream = pin!(create_operator_sighash_stream( operator.db, @@ -112,25 +84,28 @@ impl ClementineOperator for Operator { operator.config.bridge_amount_sats, operator.config.network, )); - while let Some(sighash_result) = sighash_stream.next().await { - let sighash = sighash_result?; + + while let Some(sighash) = sighash_stream.next().await { + let sighash = sighash?; + // None because utxos that operators need to sign do not have scripts let sig = operator.signer.sign_with_tweak(sighash, None)?; - let operator_burn_sig = OperatorBurnSig { schnorr_sig: sig.serialize().to_vec(), }; + if tx.send(Ok(operator_burn_sig)).await.is_err() { break; } } + Ok::<_, BridgeError>(()) }); + Ok(Response::new(ReceiverStream::new(rx))) } #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn new_withdrawal_sig( &self, _: Request, @@ -139,7 +114,6 @@ impl ClementineOperator for Operator { } #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn withdrawal_finalized( &self, request: Request, diff --git a/core/src/rpc/wrapper.rs b/core/src/rpc/parser/mod.rs similarity index 53% rename from core/src/rpc/wrapper.rs rename to core/src/rpc/parser/mod.rs index 17efcc63..fc79a418 100644 --- a/core/src/rpc/wrapper.rs +++ b/core/src/rpc/parser/mod.rs @@ -1,9 +1,76 @@ -//! # Wrapper For Converting Proto Structures - use super::clementine::{Outpoint, WinternitzPubkey}; +use super::error; use crate::errors::BridgeError; -use bitcoin::{hashes::Hash, OutPoint, Txid}; +use crate::rpc::clementine::DepositParams; +use crate::EVMAddress; +use bitcoin::address::NetworkUnchecked; +use bitcoin::hashes::Hash; +use bitcoin::{OutPoint, Txid}; use bitvm::signatures::winternitz; +use std::fmt::{Debug, Display}; +use std::num::TryFromIntError; +use tonic::Status; + +pub mod operator; +pub mod verifier; +pub mod watchtower; + +/// Converts an integer type in to another integer type. This is needed because +/// tonic defaults to wrong integer types for some parameters. +pub fn convert_int_to_another( + field_name: &str, + value: SOURCE, + try_from: fn(SOURCE) -> Result, +) -> Result +where + SOURCE: Copy + Debug + Display, +{ + try_from(value) + .map_err(|e| error::invalid_argument(field_name, "Given number is out of bounds")(e)) +} + +/// Fetches the next message from a stream which is unwrapped and encapsulated +/// by a [`Result`]. +/// +/// # Parameters +/// +/// - stream: [`tonic::Streaming`] typed input stream +/// - field: Input field ident (struct member) to look in the next message +/// +/// # Returns +/// +/// A [`Result`] containing the next message. Will return an [`Err`] variant if +/// stream has exhausted. +#[macro_export] +macro_rules! fetch_next_message_from_stream { + ($stream:expr, $field:ident) => { + $crate::fetch_next_optional_message_from_stream!($stream, $field).ok_or( + $crate::rpc::error::expected_msg_got_none(stringify!($field))(), + ) + }; +} + +/// Fetches next message from a stream. +/// +/// # Parameters +/// +/// - stream: [`tonic::Streaming`] typed input stream +/// - field: Input field ident (struct member) to look in the next message +/// +/// # Returns +/// +/// An [`Option`] containing the next message. Will return a [`None`] variant if +/// stream has exhausted. +#[macro_export] +macro_rules! fetch_next_optional_message_from_stream { + ($stream:expr, $field:ident) => { + $stream + .message() + .await? + .ok_or($crate::rpc::error::input_ended_prematurely())? + .$field + }; +} impl TryFrom for OutPoint { type Error = BridgeError; @@ -49,7 +116,6 @@ impl TryFrom for winternitz::PublicKey { .collect::, BridgeError>>() } } - impl From for WinternitzPubkey { fn from(value: winternitz::PublicKey) -> Self { { @@ -60,6 +126,39 @@ impl From for WinternitzPubkey { } } +pub fn parse_deposit_params( + deposit_params: DepositParams, +) -> Result< + ( + bitcoin::OutPoint, + EVMAddress, + bitcoin::Address, + u16, + ), + Status, +> { + let deposit_outpoint: bitcoin::OutPoint = deposit_params + .deposit_outpoint + .ok_or(Status::invalid_argument("No deposit outpoint received"))? + .try_into()?; + let evm_address: EVMAddress = deposit_params + .evm_address + .try_into() + .map_err(|_| Status::invalid_argument("Could not parse deposit outpoint EVM address"))?; + let recovery_taproot_address = deposit_params + .recovery_taproot_address + .parse::>() + .map_err(|e| Status::internal(e.to_string()))?; + let user_takes_after = deposit_params.user_takes_after; + + Ok(( + deposit_outpoint, + evm_address, + recovery_taproot_address, + convert_int_to_another("user_takes_after", user_takes_after, u16::try_from)?, + )) +} + #[cfg(test)] mod tests { use crate::rpc::clementine::{Outpoint, WinternitzPubkey}; diff --git a/core/src/rpc/parser/operator.rs b/core/src/rpc/parser/operator.rs new file mode 100644 index 00000000..aa943a56 --- /dev/null +++ b/core/src/rpc/parser/operator.rs @@ -0,0 +1,155 @@ +use crate::{ + fetch_next_message_from_stream, + operator::{Operator, PublicHash}, + rpc::{ + clementine::{ + self, operator_params, ChallengeAckDigest, DepositParams, DepositSignSession, + OperatorParams, + }, + error::expected_msg_got_none, + }, +}; +use bitcoin::{hashes::Hash, Address, Txid, XOnlyPublicKey}; +use bitvm::signatures::winternitz; +use std::str::FromStr; +use tonic::Status; + +impl From for OperatorParams { + fn from(operator: Operator) -> Self { + let operator_config = clementine::OperatorConfig { + operator_idx: operator.idx as u32, + collateral_funding_txid: operator.collateral_funding_txid.to_byte_array().to_vec(), + xonly_pk: operator.signer.xonly_public_key.to_string(), + wallet_reimburse_address: operator.config.operator_wallet_addresses[operator.idx] // TODO: Fix this where the config will only have one address. + .clone() + .assume_checked() + .to_string(), + }; + + OperatorParams { + response: Some(operator_params::Response::OperatorDetails(operator_config)), + } + } +} + +impl From for OperatorParams { + fn from(winternitz_pubkey: winternitz::PublicKey) -> Self { + OperatorParams { + response: Some(operator_params::Response::WinternitzPubkeys( + winternitz_pubkey.into(), + )), + } + } +} + +impl From for OperatorParams { + fn from(public_hash: PublicHash) -> Self { + let hash = ChallengeAckDigest { + hash: public_hash.to_vec(), + }; + + OperatorParams { + response: Some(operator_params::Response::ChallengeAckDigests(hash)), + } + } +} + +impl TryFrom for DepositParams { + type Error = Status; + + fn try_from(deposit_sign_session: DepositSignSession) -> Result { + match deposit_sign_session.deposit_params { + Some(deposit_params) => Ok(deposit_params), + None => Err(expected_msg_got_none("Deposit Params")()), + } + } +} + +/// Parses operator configuration from a given stream. +/// +/// # Returns +/// +/// A tuple, containing: +/// +/// - Operator index +/// - Collateral Funding txid +/// - Operator's X-only public key +/// - Wallet reimburse address +pub async fn parse_details( + stream: &mut tonic::Streaming, +) -> Result<(u32, Txid, XOnlyPublicKey, Address), Status> { + let operator_param = fetch_next_message_from_stream!(stream, response)?; + + let operator_config = + if let operator_params::Response::OperatorDetails(operator_config) = operator_param { + operator_config + } else { + return Err(expected_msg_got_none("OperatorDetails")()); + }; + + let operator_xonly_pk = XOnlyPublicKey::from_str(&operator_config.xonly_pk) + .map_err(|_| Status::invalid_argument("Invalid operator xonly public key".to_string()))?; + + let collateral_funding_txid = Txid::from_byte_array( + operator_config + .collateral_funding_txid + .try_into() + .map_err(|e| { + Status::invalid_argument(format!( + "Failed to convert collateral funding txid to Txid: {:?}", + e + )) + })?, + ); + + let wallet_reimburse_address = Address::from_str(&operator_config.wallet_reimburse_address) + .map_err(|e| { + Status::invalid_argument(format!("Failed to parse wallet reimburse address: {:?}", e)) + })? + .assume_checked(); + + Ok(( + operator_config.operator_idx, + collateral_funding_txid, + operator_xonly_pk, + wallet_reimburse_address, + )) +} + +pub async fn parse_challenge_ack_public_hash( + stream: &mut tonic::Streaming, +) -> Result<[u8; 20], Status> { + let operator_param = fetch_next_message_from_stream!(stream, response)?; + + let digest = if let operator_params::Response::ChallengeAckDigests(digest) = operator_param { + digest + } else { + return Err(Status::invalid_argument("Expected ChallengeAckDigests")); + }; + + // Ensure `digest.hash` is exactly 20 bytes + if digest.hash.len() != 20 { + return Err(Status::invalid_argument( + "Digest hash length is not 20 bytes", + )); + } + + let public_hash: [u8; 20] = digest + .hash + .try_into() + .map_err(|_| Status::invalid_argument("Failed to convert digest hash into PublicHash"))?; + + Ok(public_hash) +} + +pub async fn parse_winternitz_public_keys( + stream: &mut tonic::Streaming, +) -> Result { + let operator_param = fetch_next_message_from_stream!(stream, response)?; + + if let operator_params::Response::WinternitzPubkeys(wpk) = operator_param { + Ok(wpk.try_into()?) + } else { + Err(expected_msg_got_none("WinternitzPubkeys")()) + } +} diff --git a/core/src/rpc/parser/verifier.rs b/core/src/rpc/parser/verifier.rs new file mode 100644 index 00000000..47ae0122 --- /dev/null +++ b/core/src/rpc/parser/verifier.rs @@ -0,0 +1,250 @@ +use super::convert_int_to_another; +use crate::errors::BridgeError; +use crate::fetch_next_optional_message_from_stream; +use crate::rpc::clementine::{ + nonce_gen_response, verifier_deposit_sign_params, DepositSignSession, NonceGenFirstResponse, + PartialSig, VerifierDepositSignParams, VerifierParams, +}; +use crate::verifier::Verifier; +use crate::{ + fetch_next_message_from_stream, + rpc::{ + clementine::{ + self, verifier_deposit_finalize_params, NonceGenResponse, + VerifierDepositFinalizeParams, VerifierPublicKeys, + }, + error::{self, invalid_argument}, + }, + EVMAddress, +}; +use bitcoin::secp256k1::schnorr::Signature; +use bitcoin::secp256k1::PublicKey; +use bitcoin::{address::NetworkUnchecked, secp256k1::schnorr}; +use secp256k1::musig::{MusigAggNonce, MusigPartialSignature, MusigPubNonce}; +use tonic::Status; + +impl TryFrom<&Verifier> for VerifierParams { + type Error = Status; + + fn try_from(verifier: &Verifier) -> Result { + Ok(VerifierParams { + id: convert_int_to_another("id", verifier.idx, u32::try_from)?, + public_key: verifier.signer.public_key.serialize().to_vec(), + num_verifiers: convert_int_to_another( + "num_verifiers", + verifier.config.num_verifiers, + u32::try_from, + )?, + num_watchtowers: convert_int_to_another( + "num_watchtowers", + verifier.config.num_watchtowers, + u32::try_from, + )?, + num_operators: convert_int_to_another( + "num_operators", + verifier.config.num_operators, + u32::try_from, + )?, + num_sequential_collateral_txs: convert_int_to_another( + "num_sequential_collateral_txs", + verifier.config.num_sequential_collateral_txs, + u32::try_from, + )?, + }) + } +} + +impl TryFrom for Vec { + type Error = BridgeError; + + fn try_from(value: VerifierPublicKeys) -> Result { + let inner = value.verifier_public_keys; + + inner + .iter() + .map(|inner_vec| { + PublicKey::from_slice(inner_vec).map_err(|e| { + BridgeError::RPCParamMalformed( + "verifier_public_keys".to_string(), + e.to_string(), + ) + }) + }) + .collect::, _>>() + } +} +impl From> for VerifierPublicKeys { + fn from(value: Vec) -> Self { + let verifier_public_keys: Vec> = value + .into_iter() + .map(|inner| inner.serialize().to_vec()) + .collect(); + + VerifierPublicKeys { + verifier_public_keys, + } + } +} + +impl From for VerifierDepositSignParams { + fn from(value: DepositSignSession) -> Self { + VerifierDepositSignParams { + params: Some(verifier_deposit_sign_params::Params::DepositSignFirstParam( + value, + )), + } + } +} + +impl From for VerifierDepositFinalizeParams { + fn from(value: DepositSignSession) -> Self { + VerifierDepositFinalizeParams { + params: Some( + verifier_deposit_finalize_params::Params::DepositSignFirstParam(value.clone()), + ), + } + } +} + +impl From<&Signature> for VerifierDepositFinalizeParams { + fn from(value: &Signature) -> Self { + VerifierDepositFinalizeParams { + params: Some(verifier_deposit_finalize_params::Params::SchnorrSig( + value.serialize().to_vec(), + )), + } + } +} + +impl From for NonceGenResponse { + fn from(value: NonceGenFirstResponse) -> Self { + NonceGenResponse { + response: Some(nonce_gen_response::Response::FirstResponse(value)), + } + } +} + +impl From<&MusigPubNonce> for NonceGenResponse { + fn from(value: &MusigPubNonce) -> Self { + NonceGenResponse { + response: Some(nonce_gen_response::Response::PubNonce( + value.serialize().to_vec(), + )), + } + } +} + +impl From for PartialSig { + fn from(value: MusigPartialSignature) -> Self { + PartialSig { + partial_sig: value.serialize().to_vec(), + } + } +} + +pub fn parse_deposit_params( + deposit_sign_session: clementine::DepositSignSession, + verifier_idx: usize, +) -> Result< + ( + bitcoin::OutPoint, + EVMAddress, + bitcoin::Address, + u16, + u32, + ), + Status, +> { + let deposit_params = deposit_sign_session + .deposit_params + .ok_or(Status::invalid_argument("No deposit outpoint received"))?; + + let deposit_outpoint: bitcoin::OutPoint = deposit_params + .deposit_outpoint + .ok_or(Status::invalid_argument("No deposit outpoint received"))? + .try_into()?; + let evm_address: EVMAddress = deposit_params.evm_address.try_into().map_err(|e| { + Status::invalid_argument(format!( + "Failed to convert evm_address to EVMAddress: {}", + e + )) + })?; + let recovery_taproot_address = deposit_params + .recovery_taproot_address + .parse::>() + .map_err(|e| Status::internal(e.to_string()))?; + let user_takes_after = deposit_params.user_takes_after; + let session_id = deposit_sign_session.nonce_gen_first_responses[verifier_idx].id; + + Ok(( + deposit_outpoint, + evm_address, + recovery_taproot_address, + super::convert_int_to_another("user_takes_after", user_takes_after, u16::try_from)?, + session_id, + )) +} + +pub fn parse_partial_sigs( + partial_sigs: Vec>, +) -> Result, Status> { + partial_sigs + .iter() + .enumerate() + .map(|(idx, sig)| { + MusigPartialSignature::from_slice(sig).map_err(|e| { + error::invalid_argument( + "partial_sig", + format!("Verifier {idx} returned an invalid partial signature").as_str(), + )(e) + }) + }) + .collect::, _>>() +} + +pub async fn parse_next_deposit_finalize_param_schnorr_sig( + stream: &mut tonic::Streaming, +) -> Result, Status> { + let sig = match fetch_next_optional_message_from_stream!(stream, params) { + Some(sig) => sig, + None => return Ok(None), + }; + + let final_sig = match sig { + verifier_deposit_finalize_params::Params::SchnorrSig(final_sig) => { + schnorr::Signature::from_slice(&final_sig) + .map_err(invalid_argument("FinalSig", "Invalid signature length"))? + } + _ => return Err(Status::internal("Expected FinalSig")), + }; + + Ok(Some(final_sig)) +} + +pub async fn parse_deposit_finalize_param_agg_nonce( + stream: &mut tonic::Streaming, +) -> Result { + let sig = fetch_next_message_from_stream!(stream, params)?; + + match sig { + verifier_deposit_finalize_params::Params::MoveTxAggNonce(aggnonce) => { + Ok(MusigAggNonce::from_slice(&aggnonce) + .map_err(invalid_argument("MusigAggNonce", "failed to parse"))?) + } + _ => Err(Status::internal("Expected FinalSig")), + } +} + +pub async fn parse_nonce_gen_first_response( + stream: &mut tonic::Streaming, +) -> Result { + let nonce_gen_response = fetch_next_message_from_stream!(stream, response)?; + + if let clementine::nonce_gen_response::Response::FirstResponse(nonce_gen_first_response) = + nonce_gen_response + { + Ok(nonce_gen_first_response) + } else { + Err(Status::invalid_argument("Expected first_response")) + } +} diff --git a/core/src/rpc/parser/watchtower.rs b/core/src/rpc/parser/watchtower.rs new file mode 100644 index 00000000..24d61d8d --- /dev/null +++ b/core/src/rpc/parser/watchtower.rs @@ -0,0 +1,68 @@ +use crate::{ + fetch_next_message_from_stream, + rpc::{ + clementine::{watchtower_params, WatchtowerParams}, + error, + }, +}; +use bitcoin::XOnlyPublicKey; +use bitvm::signatures::winternitz::{self, PublicKey as WinternitzPublicKey}; +use tonic::Status; + +impl From for WatchtowerParams { + fn from(value: winternitz::PublicKey) -> Self { + let wpk = value.into(); + + WatchtowerParams { + response: Some(watchtower_params::Response::WinternitzPubkeys(wpk)), + } + } +} + +impl From for WatchtowerParams { + fn from(value: XOnlyPublicKey) -> Self { + let xonly_pk = value.serialize().to_vec(); + + WatchtowerParams { + response: Some(watchtower_params::Response::XonlyPk(xonly_pk)), + } + } +} + +pub async fn parse_id(stream: &mut tonic::Streaming) -> Result { + let watchtower_param = fetch_next_message_from_stream!(stream, response)?; + + if let watchtower_params::Response::WatchtowerId(watchtower_id) = watchtower_param { + Ok(watchtower_id) + } else { + Err(Status::invalid_argument("Expected watchtower id")) + } +} + +pub async fn parse_winternitz_public_key( + stream: &mut tonic::Streaming, +) -> Result { + let watchtower_param = fetch_next_message_from_stream!(stream, response)?; + + if let watchtower_params::Response::WinternitzPubkeys(wpk) = watchtower_param { + Ok(wpk.try_into()?) + } else { + Err(Status::invalid_argument("Expected WinternitzPubkeys")) + } +} + +pub async fn parse_xonly_pk( + stream: &mut tonic::Streaming, +) -> Result { + let watchtower_param = fetch_next_message_from_stream!(stream, response)?; + + if let watchtower_params::Response::XonlyPk(xonly_pk) = watchtower_param { + let xonly_pk = XOnlyPublicKey::from_slice(&xonly_pk).map_err(|e| { + error::invalid_argument("xonly_pk", "Can't convert bytes in to XOnlyPublicKey")(e) + })?; + + Ok(xonly_pk) + } else { + Err(Status::invalid_argument("Expected x-only-pk")) // TODO: tell whats returned too + } +} diff --git a/core/src/rpc/parsers/mod.rs b/core/src/rpc/parsers/mod.rs deleted file mode 100644 index 3a227181..00000000 --- a/core/src/rpc/parsers/mod.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::rpc::clementine::DepositParams; -use crate::EVMAddress; -use bitcoin::address::NetworkUnchecked; -use tonic::Status; - -pub fn parse_deposit_params( - deposit_params: DepositParams, -) -> Result< - ( - bitcoin::OutPoint, - EVMAddress, - bitcoin::Address, - u16, - ), - Status, -> { - let deposit_outpoint: bitcoin::OutPoint = deposit_params - .deposit_outpoint - .ok_or(Status::invalid_argument("No deposit outpoint received"))? - .try_into()?; - let evm_address: EVMAddress = deposit_params - .evm_address - .try_into() - .map_err(|_| Status::invalid_argument("Could not parse deposit outpoint EVM address"))?; - let recovery_taproot_address = deposit_params - .recovery_taproot_address - .parse::>() - .map_err(|e| Status::internal(e.to_string()))?; - let user_takes_after = deposit_params.user_takes_after; - Ok(( - deposit_outpoint, - evm_address, - recovery_taproot_address, - u16::try_from(user_takes_after).map_err(|e| { - Status::invalid_argument(format!( - "user_takes_after is too big, failed to convert: {}", - e - )) - })?, - )) -} diff --git a/core/src/rpc/verifier.rs b/core/src/rpc/verifier.rs index 93cfec48..d5cee16d 100644 --- a/core/src/rpc/verifier.rs +++ b/core/src/rpc/verifier.rs @@ -1,9 +1,11 @@ use super::clementine::{ - self, clementine_verifier_server::ClementineVerifier, nonce_gen_response, operator_params, - watchtower_params, Empty, NonceGenRequest, NonceGenResponse, OperatorParams, PartialSig, - VerifierDepositFinalizeParams, VerifierDepositSignParams, VerifierParams, VerifierPublicKeys, - WatchtowerParams, + self, clementine_verifier_server::ClementineVerifier, Empty, NonceGenRequest, NonceGenResponse, + OperatorParams, PartialSig, VerifierDepositFinalizeParams, VerifierDepositSignParams, + VerifierParams, VerifierPublicKeys, WatchtowerParams, }; +use super::error::*; +use crate::fetch_next_optional_message_from_stream; +use crate::utils::SECP; use crate::{ builder::{ self, @@ -15,12 +17,13 @@ use crate::{ transaction::create_move_to_vault_txhandler, }, errors::BridgeError, + fetch_next_message_from_stream, musig2::{self}, + rpc::parser::{self}, utils, verifier::{NofN, NonceSession, Verifier}, - EVMAddress, }; -use bitcoin::{address::NetworkUnchecked, hashes::Hash, Amount, TapTweakHash, Txid}; +use bitcoin::{hashes::Hash, Amount, TapTweakHash, Txid}; use bitcoin::{ secp256k1::{schnorr, Message, PublicKey}, ScriptBuf, XOnlyPublicKey, @@ -29,87 +32,28 @@ use bitvm::signatures::{ signing_winternitz::{generate_winternitz_checksig_leave_variable, WinternitzPublicKey}, winternitz, }; - -use super::error::*; -use crate::utils::SECP; use futures::StreamExt; use secp256k1::musig::{MusigAggNonce, MusigPubNonce, MusigSecNonce}; use std::collections::BTreeMap; -use std::{pin::pin, str::FromStr}; +use std::pin::pin; use tokio::sync::mpsc::{self, error::SendError}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -fn get_deposit_params( - deposit_sign_session: clementine::DepositSignSession, - verifier_idx: usize, -) -> Result< - ( - bitcoin::OutPoint, - EVMAddress, - bitcoin::Address, - u16, - u32, - ), - Status, -> { - let deposit_params = deposit_sign_session - .deposit_params - .ok_or(Status::invalid_argument("No deposit outpoint received"))?; - let deposit_outpoint: bitcoin::OutPoint = deposit_params - .deposit_outpoint - .ok_or(Status::invalid_argument("No deposit outpoint received"))? - .try_into()?; - let evm_address: EVMAddress = deposit_params.evm_address.try_into().map_err(|e| { - Status::invalid_argument(format!( - "Failed to convert evm_address to EVMAddress: {}", - e - )) - })?; - let recovery_taproot_address = deposit_params - .recovery_taproot_address - .parse::>() - .map_err(|e| Status::internal(e.to_string()))?; - let user_takes_after = deposit_params.user_takes_after; - - let session_id = deposit_sign_session.nonce_gen_first_responses[verifier_idx].id; - Ok(( - deposit_outpoint, - evm_address, - recovery_taproot_address, - u16::try_from(user_takes_after).map_err(|e| { - Status::invalid_argument(format!( - "user_takes_after is too big, failed to convert: {}", - e - )) - })?, - session_id, - )) -} #[async_trait] impl ClementineVerifier for Verifier { type NonceGenStream = ReceiverStream>; type DepositSignStream = ReceiverStream>; #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn get_params(&self, _: Request) -> Result, Status> { - let public_key = self.signer.public_key.serialize().to_vec(); - - let params = VerifierParams { - id: self.idx as u32, - public_key, - num_verifiers: self.config.num_verifiers as u32, - num_watchtowers: self.config.num_watchtowers as u32, - num_operators: self.config.num_operators as u32, - num_sequential_collateral_txs: self.config.num_sequential_collateral_txs as u32, - }; + let params: VerifierParams = self.try_into()?; Ok(Response::new(params)) } + /// TODO: This function's contents can be fully moved in to core::verifier. #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn set_verifiers( &self, req: Request, @@ -119,20 +63,7 @@ impl ClementineVerifier for Verifier { return Err(Status::internal("Verifiers already set")); } - // Extract the public keys from the request - let verifiers_public_keys = req - .into_inner() - .verifier_public_keys - .iter() - .map(|pk| { - PublicKey::from_slice(pk).map_err(|e| { - BridgeError::RPCParamMalformed( - "verifier_public_keys".to_string(), - e.to_string(), - ) - }) - }) - .collect::, BridgeError>>()?; + let verifiers_public_keys: Vec = req.into_inner().try_into()?; let nofn = NofN::new(self.signer.public_key, verifiers_public_keys.clone())?; @@ -147,53 +78,24 @@ impl ClementineVerifier for Verifier { Ok(Response::new(Empty {})) } - // #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] + #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] async fn set_operator( &self, req: Request>, ) -> Result, Status> { let mut in_stream = req.into_inner(); - let operator_params = in_stream - .message() - .await - .map_err(expected_msg_got_error)? - .ok_or_else(input_ended_prematurely)? - .response - .ok_or_else(expected_msg_got_none("Response"))?; - - let operator_details = - if let operator_params::Response::OperatorDetails(operator_config) = operator_params { - operator_config - } else { - return Err(expected_msg_got_none("OperatorDetails")()); - }; - - let operator_xonly_pk = - XOnlyPublicKey::from_str(&operator_details.xonly_pk).map_err(|_| { - Status::invalid_argument("Invalid operator xonly public key".to_string()) - })?; + let (operator_idx, collateral_funding_txid, operator_xonly_pk, wallet_reimburse_address) = + parser::operator::parse_details(&mut in_stream).await?; // Save the operator details to the db self.db .set_operator( None, - operator_details.operator_idx as i32, + operator_idx as i32, operator_xonly_pk, - operator_details.wallet_reimburse_address, - Txid::from_byte_array( - operator_details - .collateral_funding_txid - .clone() - .try_into() - .map_err(|e| { - Status::invalid_argument(format!( - "Failed to convert collateral funding txid to Txid: {:?}", - e - )) - })?, - ), + wallet_reimburse_address.to_string(), + collateral_funding_txid, ) .await?; @@ -202,32 +104,14 @@ impl ClementineVerifier for Verifier { * self.config.num_sequential_collateral_txs * utils::ALL_BITVM_INTERMEDIATE_VARIABLES.len() { - let operator_params = in_stream - .message() - .await? - .ok_or(Status::invalid_argument( - "Operator param stream ended early", - ))? - .response - .ok_or(Status::invalid_argument( - "Operator param stream ended early", - ))?; - - if let operator_params::Response::WinternitzPubkeys(wpk) = operator_params { - operator_winternitz_public_keys.push(wpk.try_into()?); - } else { - return Err(expected_msg_got_none("WinternitzPubkeys")()); - } + operator_winternitz_public_keys + .push(parser::operator::parse_winternitz_public_keys(&mut in_stream).await?); } - let operator_winternitz_public_keys = operator_winternitz_public_keys - .into_iter() - .map(Ok) - .collect::, BridgeError>>()?; self.db .set_operator_winternitz_public_keys( None, - operator_details.operator_idx, + operator_idx, operator_winternitz_public_keys.clone(), ) .await?; @@ -237,34 +121,8 @@ impl ClementineVerifier for Verifier { * self.config.num_kickoffs_per_sequential_collateral_tx * self.config.num_watchtowers { - let operator_params = in_stream - .message() - .await? - .ok_or(Status::invalid_argument( - "Operator param stream ended early", - ))? - .response - .ok_or(Status::invalid_argument( - "Operator param stream ended early", - ))?; - - if let operator_params::Response::ChallengeAckDigests(digest) = operator_params { - // Ensure `digest.hash` is exactly 20 bytes - if digest.hash.len() != 20 { - return Err(Status::invalid_argument( - "Digest hash length is not 20 bytes", - )); - } - - // Convert the `Vec` into a `[u8; 20]` - let public_hash: [u8; 20] = digest.hash.try_into().map_err(|_| { - Status::invalid_argument("Failed to convert digest hash into PublicHash") - })?; - - operators_challenge_ack_public_hashes.push(public_hash); - } else { - return Err(Status::invalid_argument("Expected ChallengeAckDigests")); - } + operators_challenge_ack_public_hashes + .push(parser::operator::parse_challenge_ack_public_hash(&mut in_stream).await?); } for i in 0..self.config.num_sequential_collateral_txs { @@ -272,7 +130,7 @@ impl ClementineVerifier for Verifier { self.db .set_operator_challenge_ack_hashes( None, - operator_details.operator_idx as i32, + operator_idx as i32, i as i32, j as i32, &operators_challenge_ack_public_hashes[self.config.num_watchtowers @@ -303,7 +161,7 @@ impl ClementineVerifier for Verifier { .iter() .enumerate() .map(|(idx, (intermediate_step, intermediate_step_size))| { - let winternitz_pk: WinternitzPublicKey = WinternitzPublicKey { + let winternitz_pk = WinternitzPublicKey { public_key: winternitz_public_keys[idx].clone(), parameters: winternitz::Parameters::new( *intermediate_step_size as u32 * 2, @@ -375,7 +233,7 @@ impl ClementineVerifier for Verifier { self.db .set_bitvm_setup( None, - operator_details.operator_idx as i32, + operator_idx as i32, sequential_collateral_tx_idx as i32, kickoff_idx as i32, assert_tx_addrs, @@ -389,7 +247,6 @@ impl ClementineVerifier for Verifier { } #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn set_watchtower( &self, request: Request>, @@ -402,38 +259,13 @@ impl ClementineVerifier for Verifier { } = &self.config; let mut in_stream = request.into_inner(); - let watchtower_id = in_stream - .message() - .await? - .ok_or(Status::invalid_argument("No message is received"))? - .response - .ok_or(Status::invalid_argument("No message is received"))?; - let watchtower_id = - if let watchtower_params::Response::WatchtowerId(watchtower_id) = watchtower_id { - watchtower_id - } else { - return Err(Status::invalid_argument("Expected watchtower id")); - }; + let watchtower_id = parser::watchtower::parse_id(&mut in_stream).await?; let mut watchtower_winternitz_public_keys = Vec::new(); for _ in 0..self.config.num_operators { - let wpks = in_stream - .message() - .await? - .ok_or(Status::invalid_argument("No message is received"))? - .response - .ok_or(Status::invalid_argument("No message is received"))?; - - if let watchtower_params::Response::WinternitzPubkeys(wpk) = wpks { - watchtower_winternitz_public_keys.push(wpk.try_into()?); - } else { - return Err(Status::invalid_argument("Expected WinternitzPubkeys")); - } + watchtower_winternitz_public_keys + .push(parser::watchtower::parse_winternitz_public_key(&mut in_stream).await?); } - let watchtower_winternitz_public_keys = watchtower_winternitz_public_keys - .into_iter() - .map(Ok) - .collect::, BridgeError>>()?; let required_number_of_pubkeys = num_operators * num_sequential_collateral_txs @@ -446,33 +278,13 @@ impl ClementineVerifier for Verifier { ))); } - let xonly_pk = in_stream - .message() - .await? - .ok_or(Status::invalid_argument("No message is received"))? - .response - .ok_or(Status::invalid_argument("No message is received"))?; - let xonly_pk = if let watchtower_params::Response::XonlyPk(xonly_pk) = xonly_pk { - xonly_pk - } else { - return Err(Status::invalid_argument("Expected x-only-pk")); // TODO: tell whats returned too - }; - tracing::info!( - "Verifier receives watchtower xonly public key bytes: {:?}", - xonly_pk - ); - let xonly_pk = XOnlyPublicKey::from_slice(&xonly_pk).map_err(|_| { - BridgeError::RPCParamMalformed( - "watchtower.xonly_pk".to_string(), - "Invalid xonly key".to_string(), - ) - })?; + let xonly_pk = parser::watchtower::parse_xonly_pk(&mut in_stream).await?; + tracing::info!("Verifier receives watchtower index: {:?}", watchtower_id); tracing::info!( "Verifier receives watchtower xonly public key: {:?}", xonly_pk ); - tracing::info!("Verifier doing this for watchtower: {:?}", watchtower_id); for operator_idx in 0..self.config.num_operators { let index = operator_idx * num_sequential_collateral_txs @@ -526,7 +338,6 @@ impl ClementineVerifier for Verifier { } #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn nonce_gen( &self, req: Request, @@ -561,29 +372,21 @@ impl ClementineVerifier for Verifier { num_nonces: num_nonces as u32, }; - // now stream the nonces - let (tx, rx) = mpsc::channel(1280); + let (tx, rx) = mpsc::channel(pub_nonces.len() + 1); tokio::spawn(async move { // First send the session id - let response = NonceGenResponse { - response: Some(nonce_gen_response::Response::FirstResponse( - nonce_gen_first_response, - )), - }; - tx.send(Ok(response)).await?; + let session_id: NonceGenResponse = nonce_gen_first_response.into(); + tx.send(Ok(session_id)).await?; // Then send the public nonces for pub_nonce in &pub_nonces[..] { - let response = NonceGenResponse { - response: Some(nonce_gen_response::Response::PubNonce( - pub_nonce.serialize().to_vec(), - )), - }; - tx.send(Ok(response)).await?; + let pub_nonce: NonceGenResponse = pub_nonce.into(); + tx.send(Ok(pub_nonce)).await?; } Ok::<(), SendError<_>>(()) }); + Ok(Response::new(ReceiverStream::new(rx))) } @@ -592,25 +395,13 @@ impl ClementineVerifier for Verifier { req: Request>, ) -> Result, Status> { let mut in_stream = req.into_inner(); + let verifier = self.clone(); let (tx, rx) = mpsc::channel(1280); - let error_tx = tx.clone(); - tracing::info!("Received deposit sign request"); - - let verifier = self.clone(); - let handle = tokio::spawn(async move { - let first_message = in_stream - .message() - .await? - .ok_or(Status::internal("No first message received"))?; - - // Parse the first message - let params = first_message - .params - .ok_or(Status::internal("No deposit outpoint received"))?; + let params = fetch_next_message_from_stream!(in_stream, params)?; let ( deposit_outpoint, @@ -621,7 +412,7 @@ impl ClementineVerifier for Verifier { ) = match params { clementine::verifier_deposit_sign_params::Params::DepositSignFirstParam( deposit_sign_session, - ) => get_deposit_params(deposit_sign_session, verifier.idx)?, + ) => parser::verifier::parse_deposit_params(deposit_sign_session, verifier.idx)?, _ => return Err(Status::invalid_argument("Expected DepositOutpoint")), }; @@ -654,11 +445,10 @@ impl ClementineVerifier for Verifier { "Expected nonce count to be num_required_sigs + 1 (movetx)" ); - while let Some(result) = in_stream.message().await? { - let agg_nonce = match result - .params - .ok_or(Status::internal("No agg nonce received"))? - { + while let Some(result) = + fetch_next_optional_message_from_stream!(&mut in_stream, params) + { + let agg_nonce = match result { clementine::verifier_deposit_sign_params::Params::AggNonce(agg_nonce) => { MusigAggNonce::from_slice(agg_nonce.as_slice()).map_err(|e| { BridgeError::RPCParamMalformed("AggNonce".to_string(), e.to_string()) @@ -728,7 +518,6 @@ impl ClementineVerifier for Verifier { /// operator signatures. It will receive data from the stream in this order -> nofn sigs, movetx agg nonce, operator sigs. /// If everything is correct, it will partially sign the move tx and send it to aggregator. #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn deposit_finalize( &self, req: Request>, @@ -736,19 +525,12 @@ impl ClementineVerifier for Verifier { use clementine::verifier_deposit_finalize_params::Params; let mut in_stream = req.into_inner(); - let first_message = in_stream - .message() - .await? - .ok_or(Status::internal("No first message received"))?; + let params = fetch_next_message_from_stream!(in_stream, params)?; - // Parse the first message let (deposit_outpoint, evm_address, recovery_taproot_address, user_takes_after, session_id) = - match first_message - .params - .ok_or(Status::internal("No deposit outpoint received"))? - { + match params { Params::DepositSignFirstParam(deposit_sign_session) => { - get_deposit_params(deposit_sign_session, self.idx)? + parser::verifier::parse_deposit_params(deposit_sign_session, self.idx)? } _ => Err(Status::internal("Expected DepositOutpoint"))?, }; @@ -773,7 +555,9 @@ impl ClementineVerifier for Verifier { let mut nonce_idx: usize = 0; - while let Some(result) = in_stream.message().await.map_err(expected_msg_got_error)? { + while let Some(sig) = + parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream).await? + { let sighash = sighash_stream .next() .await @@ -781,19 +565,9 @@ impl ClementineVerifier for Verifier { .map_err(Into::into) .map_err(sighash_stream_failed)?; - let final_sig = result - .params - .ok_or_else(expected_msg_got_none("FinalSig"))?; - - let final_sig = match final_sig { - Params::SchnorrSig(final_sig) => schnorr::Signature::from_slice(&final_sig) - .map_err(invalid_argument("FinalSig", "Invalid signature length"))?, - _ => return Err(Status::internal("Expected FinalSig")), - }; - tracing::debug!("Verifying Final Signature"); utils::SECP - .verify_schnorr(&final_sig, &Message::from(sighash), &self.nofn_xonly_pk) + .verify_schnorr(&sig, &Message::from(sighash), &self.nofn_xonly_pk) .map_err(|x| { Status::internal(format!( "Nofn Signature {} Verification Failed: {}.", @@ -802,7 +576,7 @@ impl ClementineVerifier for Verifier { )) })?; - verified_sigs.push(final_sig); + verified_sigs.push(sig); tracing::debug!("Final Signature Verified"); nonce_idx += 1; @@ -835,18 +609,8 @@ impl ClementineVerifier for Verifier { bitcoin::TapSighashType::Default, )?; - let agg_nonce = match in_stream - .message() - .await - .map_err(expected_msg_got_error)? - .ok_or_else(expected_msg_got_none("Params.MusigAggNonce"))? - .params - .ok_or_else(expected_msg_got_none("Params.MusigAggNonce"))? - { - Params::MoveTxAggNonce(aggnonce) => MusigAggNonce::from_slice(&aggnonce) - .map_err(invalid_argument("MusigAggNonce", "failed to parse"))?, - _ => Err(expected_msg_got_none("MusigAggNonce")())?, - }; + let agg_nonce = + parser::verifier::parse_deposit_finalize_param_agg_nonce(&mut in_stream).await?; let movetx_secnonce = { let mut session_map = self.nonces.lock().await; @@ -904,33 +668,17 @@ impl ClementineVerifier for Verifier { self.config.bridge_amount_sats, self.config.network, )); - while let Some(in_msg) = in_stream.message().await? { + while let Some(operator_sig) = + parser::verifier::parse_next_deposit_finalize_param_schnorr_sig(&mut in_stream) + .await? + { let sighash = sighash_stream .next() .await .ok_or_else(sighash_stream_ended_prematurely)??; - let operator_sig = in_msg - .params - .ok_or_else(expected_msg_got_none("Operator Signature"))?; - - let final_sig = match operator_sig { - Params::SchnorrSig(final_sig) => schnorr::Signature::from_slice(&final_sig) - .map_err(|_| { - BridgeError::RPCParamMalformed( - "Operator sig".to_string(), - "Invalid signature length".to_string(), - ) - })?, - _ => { - return Err(Status::internal(format!( - "Expected Operator Sig, got: {:?}", - operator_sig - ))) - } - }; utils::SECP - .verify_schnorr(&final_sig, &Message::from(sighash), &tweaked_op_xonly_pk) + .verify_schnorr(&operator_sig, &Message::from(sighash), &tweaked_op_xonly_pk) .map_err(|x| { Status::internal(format!( "Operator {} Signature {}: verification failed: {}.", @@ -940,7 +688,7 @@ impl ClementineVerifier for Verifier { )) })?; - op_deposit_sigs[operator_idx].push(final_sig); + op_deposit_sigs[operator_idx].push(operator_sig); op_sig_count += 1; total_op_sig_count += 1; @@ -958,14 +706,15 @@ impl ClementineVerifier for Verifier { } // sign move tx and save everything to db if everything is correct - let partial_sig = musig2::partial_sign( + let partial_sig: PartialSig = musig2::partial_sign( self.config.verifiers_public_keys.clone(), None, movetx_secnonce, agg_nonce, self.signer.keypair, Message::from_digest(move_tx_sighash.to_byte_array()), - )?; + )? + .into(); // Deposit is not actually finalized here, its only finalized after the aggregator gets all the partial sigs and checks the aggregated sig // TODO: It can create problems if the deposit fails at the end by some verifier not sending movetx partial sig, but we still added sigs to db @@ -975,9 +724,6 @@ impl ClementineVerifier for Verifier { .await?; } - tracing::info!("Deposit finalized, returning partial sig"); - Ok(Response::new(PartialSig { - partial_sig: partial_sig.serialize().to_vec(), - })) + Ok(Response::new(partial_sig)) } } diff --git a/core/src/rpc/watchtower.rs b/core/src/rpc/watchtower.rs index 274e17bf..dc7d7e55 100644 --- a/core/src/rpc/watchtower.rs +++ b/core/src/rpc/watchtower.rs @@ -1,6 +1,5 @@ use super::clementine::{ clementine_watchtower_server::ClementineWatchtower, watchtower_params, Empty, WatchtowerParams, - WinternitzPubkey, }; use crate::watchtower::Watchtower; use tokio::sync::mpsc::{self, error::SendError}; @@ -12,20 +11,17 @@ impl ClementineWatchtower for Watchtower { type GetParamsStream = ReceiverStream>; #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))] - #[allow(clippy::blocks_in_conditions)] async fn get_params( &self, _request: Request, ) -> Result, Status> { - let winternitz_pubkeys = self - .get_watchtower_winternitz_public_keys() - .await? - .into_iter() - .map(From::from) - .collect::>(); let watchtower = self.clone(); + let watchtower_winternitz_public_keys = + watchtower.get_watchtower_winternitz_public_keys().await?; + + let (tx, rx) = mpsc::channel(watchtower_winternitz_public_keys.len() + 2); + let out_stream: Self::GetParamsStream = ReceiverStream::new(rx); - let (tx, rx) = mpsc::channel(1280); tokio::spawn(async move { tx.send(Ok(WatchtowerParams { response: Some(watchtower_params::Response::WatchtowerId( @@ -34,37 +30,23 @@ impl ClementineWatchtower for Watchtower { })) .await?; - for wpk in winternitz_pubkeys { - tx.send(Ok(WatchtowerParams { - response: Some(watchtower_params::Response::WinternitzPubkeys(wpk)), - })) - .await?; + for wpk in watchtower_winternitz_public_keys { + let wpk: WatchtowerParams = wpk.into(); + tx.send(Ok(wpk)).await?; } tracing::info!( - "Watchtower gives watchtower xonly public key: {:?}", - watchtower.actor.xonly_public_key - ); - tracing::info!( - "Watchtower gives watchtower index: {:?}", + "Watchtower gives watchtower xonly public key {:?} for index {}", + watchtower.actor.xonly_public_key, watchtower.config.index ); - let xonly_pk = watchtower.actor.xonly_public_key.serialize().to_vec(); - tracing::info!( - "Watchtower gives watchtower xonly public key bytes: {:?}", - xonly_pk - ); - - tx.send(Ok(WatchtowerParams { - response: Some(watchtower_params::Response::XonlyPk(xonly_pk)), - })) - .await?; + let xonly_pk: WatchtowerParams = watchtower.actor.xonly_public_key.into(); + tx.send(Ok(xonly_pk)).await?; Ok::<(), SendError<_>>(()) }); - let out_stream: Self::GetParamsStream = ReceiverStream::new(rx); Ok(Response::new(out_stream)) } }