Skip to content

Commit

Permalink
Add macros/parsers to RPC module (#495)
Browse files Browse the repository at this point in the history
* rpc: Move verifier parse_deposit_params to parsers.

* rpc-parsers: Add convert_int_to_another.

* rpc-parser: Add parse_partial_sigs.

* rpc-parser: Use internal error type in convert_int_to_another.

* rpc-wrapper: Add tryfrom for VerifierPublicKeys.

* rpc-parser: Add parse_operator_config.

* rpc-parsers: Add parse_operator_challenge_ack_public_hash.

* rpc-wrapper: Add watchtower parsers.

* rpc-parsers: Add fetch_next_from_stream.

* rpc-parsers: Add 2 field related fields to fetch_next_from_stream.

* rpc-parsers: Add parse_next_deposit_finalize_param_schnorr_sig.

* rpc-parsers: Add parse_nonce_gen_first_response.

* rpc-parsers: Add parse_deposit_finalize_param_agg_nonce.

* rpc: Rename parsers in to parser.

* rpc-parser: Add comment for fetch_next_message_from_stream.

* rpc-parser: Fix compilation error in doc comment for fetch_next_message_from_stream by changing no_run to text.

* rpc-parser: Add prefixes to parsers and simplify namings of some.

* rpc-parser: Add from trait impls for operator.

* rpc-parser: Move wrapper in to parser.

* rpc-wrapper: Add from impls for watchtower.

* rpc-parser: Add from impls for aggregator.

* rpc-parser: Add from impls for verifier.

* rpc-parser: Add the new fetch_next_optional_message_from_stream macro.
  • Loading branch information
ceyhunsen authored Feb 4, 2025
1 parent 249c31f commit da079a8
Show file tree
Hide file tree
Showing 12 changed files with 747 additions and 562 deletions.
1 change: 1 addition & 0 deletions core/src/database/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl Database {
.collect())
}

/// TODO: wallet_address should have `Address` type.
pub async fn set_operator(
&self,
tx: Option<DatabaseTransaction<'_, '_>>,
Expand Down
165 changes: 59 additions & 106 deletions core/src/rpc/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::builder::transaction::create_move_to_vault_txhandler;
use crate::config::BridgeConfig;
use crate::rpc::clementine::clementine_operator_client::ClementineOperatorClient;
use crate::rpc::clementine::clementine_verifier_client::ClementineVerifierClient;
use crate::rpc::clementine::VerifierDepositSignParams;
use crate::rpc::error::output_stream_ended_prematurely;
use crate::rpc::parsers;
use crate::rpc::parser;
use crate::{
aggregator::Aggregator,
builder::sighash::{
Expand Down Expand Up @@ -235,30 +236,10 @@ async fn create_nonce_streams(
}))
.await?;

// Get the first responses.
// Get the first responses from verifiers.
let first_responses: Vec<clementine::NonceGenFirstResponse> =
try_join_all(nonce_streams.iter_mut().map(|stream| async {
let nonce_gen_first_response = stream
.message()
.await?
.ok_or(BridgeError::RPCStreamEndedUnexpectedly(
"NonceGen returns nothing".to_string(),
))?
.response
.ok_or(BridgeError::RPCStreamEndedUnexpectedly(
"NonceGen response field is empty".to_string(),
))?;

if let clementine::nonce_gen_response::Response::FirstResponse(
nonce_gen_first_response,
) = nonce_gen_first_response
{
Ok(nonce_gen_first_response)
} else {
Err(BridgeError::RPCInvalidResponse(
"NonceGen response is not FirstResponse".to_string(),
))
}
parser::verifier::parse_nonce_gen_first_response(stream).await
}))
.await?;

Expand Down Expand Up @@ -342,18 +323,8 @@ impl Aggregator {
deposit_params: DepositParams,
) -> Result<RawSignedMoveTx, Status> {
let (deposit_outpoint, evm_address, recovery_taproot_address, user_takes_after) =
parsers::parse_deposit_params(deposit_params)?;
let musig_partial_sigs: Vec<MusigPartialSignature> = partial_sigs
.iter()
.map(|sig: &Vec<u8>| MusigPartialSignature::from_slice(sig))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
BridgeError::RPCParamMalformed(
"Partial sigs for movetx could not be parsed into MusigPartialSignature"
.to_string(),
e.to_string(),
)
})?;
parser::parse_deposit_params(deposit_params)?;
let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;

// create move tx and calculate sighash
let mut move_txhandler = create_move_to_vault_txhandler(
Expand Down Expand Up @@ -392,41 +363,47 @@ impl Aggregator {
impl ClementineAggregator for Aggregator {
#[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
async fn setup(&self, _request: Request<Empty>) -> Result<Response<Empty>, Status> {
tracing::info!("Collecting verifier details...");
let verifier_params = try_join_all(self.verifier_clients.iter().map(|client| {
tracing::info!("Collecting verifier public keys...");
let verifier_public_keys = try_join_all(self.verifier_clients.iter().map(|client| {
let mut client = client.clone();

async move {
let response = client.get_params(Request::new(Empty {})).await?;
Ok::<_, Status>(response.into_inner())
let verifier_params = client
.get_params(Request::new(Empty {}))
.await?
.into_inner();
let verifier_public_key = verifier_params.public_key;

Ok::<_, Status>(verifier_public_key)
}
}))
.await?;
let verifier_public_keys: Vec<Vec<u8>> =
verifier_params.into_iter().map(|p| p.public_key).collect();
tracing::debug!("Verifier public keys: {:?}", verifier_public_keys);

tracing::info!("Setting up verifiers...");
try_join_all(self.verifier_clients.iter().map(|client| {
let mut client = client.clone();
{
let verifier_public_keys = clementine::VerifierPublicKeys {
verifier_public_keys: verifier_public_keys.clone(),
};
async move {
let response = client
.set_verifiers(Request::new(verifier_public_keys))
.await?;
Ok::<_, Status>(response.into_inner())
}
let verifier_public_keys = clementine::VerifierPublicKeys {
verifier_public_keys: verifier_public_keys.clone(),
};

async move {
client
.set_verifiers(Request::new(verifier_public_keys))
.await?;

Ok::<_, Status>(())
}
}))
.await?;

tracing::info!("Collecting operator details...");
let operator_params = try_join_all(self.operator_clients.iter().map(|client| {
let mut client = client.clone();

async move {
let mut responses = Vec::new();

let mut params_stream = client
.get_params(Request::new(Empty {}))
.await?
Expand All @@ -440,24 +417,24 @@ impl ClementineAggregator for Aggregator {
}))
.await?;

tracing::info!("Informing verifiers for existing operators...");
tracing::info!("Informing verifiers about existing operators...");
try_join_all(self.verifier_clients.iter().map(|client| {
let mut client = client.clone();
let operator_params = operator_params.clone();

async move {
for params in operator_params {
let (tx, rx) = tokio::sync::mpsc::channel(1280);
let future =
client.set_operator(tokio_stream::wrappers::ReceiverStream::new(rx));
let (tx, rx) = tokio::sync::mpsc::channel(params.len());

for param in params {
tx.send(param)
.await
.map_err(|_| output_stream_ended_prematurely())?;
}

future.await?; // TODO: This is dangerous: If channel size becomes not sufficient, this will block forever.
client
.set_operator(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?;
}

Ok::<_, tonic::Status>(())
Expand All @@ -468,8 +445,10 @@ impl ClementineAggregator for Aggregator {
tracing::info!("Collecting Winternitz public keys from watchtowers...");
let watchtower_params = try_join_all(self.watchtower_clients.iter().map(|client| {
let mut client = client.clone();

async move {
let mut responses = Vec::new();

let mut params_stream = client
.get_params(Request::new(Empty {}))
.await?
Expand All @@ -490,17 +469,17 @@ impl ClementineAggregator for Aggregator {

async move {
for params in watchtower_params {
let (tx, rx) = tokio::sync::mpsc::channel(1280);
let (tx, rx) = tokio::sync::mpsc::channel(params.len());

let future =
client.set_watchtower(tokio_stream::wrappers::ReceiverStream::new(rx));
for param in params {
tx.send(param)
.await
.map_err(|_| output_stream_ended_prematurely())?;
}

future.await?; // TODO: This is dangerous: If channel size becomes not sufficient, this will block forever.
client
.set_watchtower(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?;
}

Ok::<_, tonic::Status>(())
Expand Down Expand Up @@ -533,25 +512,19 @@ impl ClementineAggregator for Aggregator {
#[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
async fn new_deposit(
&self,
deposit_params_req: Request<DepositParams>,
request: Request<DepositParams>,
) -> Result<Response<RawSignedMoveTx>, Status> {
tracing::info!("Received new deposit request: {:?}", deposit_params_req);
let deposit_params = request.into_inner();

// Extract and validate deposit parameters
let deposit_params = deposit_params_req.get_ref().clone();
let (deposit_outpoint, evm_address, recovery_taproot_address, user_takes_after) =
parsers::parse_deposit_params(deposit_params.clone())?;
let verifiers_public_keys = self.config.verifiers_public_keys.clone();

tracing::debug!("Parsed deposit params");
parser::parse_deposit_params(deposit_params.clone())?;

// Generate nonce streams for all verifiers.
let num_required_sigs = calculate_num_required_nofn_sigs(&self.config);
let (first_responses, nonce_streams) =
create_nonce_streams(self.verifier_clients.clone(), num_required_sigs as u32 + 1)
.await?; // ask for +1 for the final movetx signature, but don't send it on deposit_sign stage

// Create deposit signing streams with each verifier
let mut partial_sig_streams =
try_join_all(self.verifier_clients.iter().map(|verifier_client| {
let mut verifier_client = verifier_client.clone();
Expand All @@ -560,34 +533,25 @@ impl ClementineAggregator for Aggregator {
let (tx, rx) = tokio::sync::mpsc::channel(1280);
let stream = verifier_client
.deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?;
.await?
.into_inner();

Ok::<_, Status>((stream.into_inner(), tx))
Ok::<_, Status>((stream, tx))
}
}))
.await?;

tracing::debug!("Generated partial sig streams");

// Create initial deposit session and send to verifiers
let deposit_sign_session = DepositSignSession {
deposit_params: Some(deposit_params_req.into_inner()),
deposit_params: Some(deposit_params.clone()),
nonce_gen_first_responses: first_responses,
};

tracing::debug!("Sending deposit sign session to verifiers");

// Send deposit session to each verifier
for (_, tx) in partial_sig_streams.iter_mut() {
tx.send(clementine::VerifierDepositSignParams {
params: Some(
clementine::verifier_deposit_sign_params::Params::DepositSignFirstParam(
deposit_sign_session.clone(),
),
),
})
.await
.map_err(|e| {
let deposit_sign_param: VerifierDepositSignParams = deposit_sign_session.clone().into();

tx.send(deposit_sign_param).await.map_err(|e| {
Status::internal(format!("Failed to send deposit sign session: {:?}", e))
})?;
}
Expand All @@ -610,14 +574,8 @@ impl ClementineAggregator for Aggregator {
deposit_finalize_streams.into_iter().unzip();

// Send initial finalization params
let deposit_finalize_first_param = clementine::VerifierDepositFinalizeParams {
params: Some(
clementine::verifier_deposit_finalize_params::Params::DepositSignFirstParam(
deposit_sign_session.clone(),
),
),
};

let deposit_finalize_first_param: VerifierDepositFinalizeParams =
deposit_sign_session.clone().into();
for tx in deposit_finalize_sender.iter() {
tx.send(deposit_finalize_first_param.clone())
.await
Expand All @@ -630,7 +588,7 @@ impl ClementineAggregator for Aggregator {
}

// Create sighash stream for transaction signing
let sighash_stream = create_nofn_sighash_stream(
let sighash_stream = Box::pin(create_nofn_sighash_stream(
self.db.clone(),
self.config.clone(),
deposit_outpoint,
Expand All @@ -643,8 +601,7 @@ impl ClementineAggregator for Aggregator {
100,
self.config.bridge_amount_sats,
self.config.network,
);
let sighash_stream = Box::pin(sighash_stream);
));

// Create channels for pipeline communication
let (agg_nonce_sender, agg_nonce_receiver) = channel(32);
Expand All @@ -668,7 +625,7 @@ impl ClementineAggregator for Aggregator {
// Start the signature aggregation pipe.
let sig_agg_handle = tokio::spawn(signature_aggregator(
partial_sig_receiver,
verifiers_public_keys,
self.config.verifiers_public_keys.clone(),
final_sig_sender,
));

Expand Down Expand Up @@ -715,20 +672,17 @@ impl ClementineAggregator for Aggregator {
.map(|tx| async {
for sigs in operator_sigs.iter() {
for sig in sigs.iter() {
tx.send(VerifierDepositFinalizeParams {
params: Some(verifier_deposit_finalize_params::Params::SchnorrSig(
sig.serialize().to_vec(),
)),
})
.await
.map_err(|e| {
let deposit_finalize_param: VerifierDepositFinalizeParams = sig.into();

tx.send(deposit_finalize_param).await.map_err(|e| {
BridgeError::RPCStreamEndedUnexpectedly(format!(
"Can't send operator sigs to verifier: {}",
e
))
})?;
}
}

Ok::<(), BridgeError>(())
})
.collect();
Expand Down Expand Up @@ -758,8 +712,6 @@ impl ClementineAggregator for Aggregator {

#[cfg(test)]
mod tests {
use bitcoin::Txid;

use crate::{
config::BridgeConfig,
create_test_config_with_thread_name,
Expand All @@ -780,6 +732,7 @@ mod tests {
verifier::Verifier,
watchtower::Watchtower,
};
use bitcoin::Txid;
use std::{env, str::FromStr, thread};

#[tokio::test]
Expand Down
3 changes: 1 addition & 2 deletions core/src/rpc/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::fmt::Display;

use tonic::Status;

pub(crate) fn expected_msg_got_error(msg: Status) -> Status {
pub(crate) fn _expected_msg_got_error(msg: Status) -> Status {
Status::invalid_argument(format!("Expected message, got error: {msg}"))
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ pub mod clementine;
pub mod aggregator;
mod error;
pub mod operator;
mod parsers;
mod parser;
pub mod verifier;
pub mod watchtower;
mod wrapper;

/// Returns gRPC clients.
///
Expand Down
Loading

0 comments on commit da079a8

Please sign in to comment.