diff --git a/app/src/lib.rs b/app/src/lib.rs index a35f02d4..fbbd6e2a 100644 --- a/app/src/lib.rs +++ b/app/src/lib.rs @@ -11,6 +11,7 @@ use fetch_git_hash::fetch_git_hash; use bitcoin::{Network, bip32}; use lightning_invoice::{Bolt11Invoice, ParseOrSemanticError}; use std::{str::FromStr, time::UNIX_EPOCH}; +use tokio::sync::Mutex; use cdk_common::SECP256K1; use chrono::Duration; @@ -23,7 +24,7 @@ use portal::{ KeyHandshakeConversation, }, payments::{ - PaymentRequestContent, PaymentRequestEvent, PaymentRequestListenerConversation, + PaymentRequestContent, PaymentRequestListenerConversation, PaymentStatusSenderConversation, RecurringPaymentStatusSenderConversation, }, }, @@ -48,9 +49,9 @@ use portal::{ payment::{ CashuDirectContentWithKey, CashuRequestContentWithKey, CashuResponseContent, CashuResponseStatus, CloseRecurringPaymentContent, CloseRecurringPaymentResponse, - InvoiceRequestContent, InvoiceRequestContentWithKey, InvoiceResponse, - PaymentResponseContent, RecurringPaymentRequestContent, - RecurringPaymentResponseContent, SinglePaymentRequestContent, + InvoiceRequestContent, InvoiceResponse, PaymentResponseContent, + RecurringPaymentRequestContent, RecurringPaymentResponseContent, + SinglePaymentRequestContent, }, }, }, @@ -173,7 +174,6 @@ impl From for MnemonicError { } } - #[derive(uniffi::Object)] pub struct Nsec { keys: portal::nostr::Keys, @@ -195,10 +195,10 @@ impl Nsec { } pub fn derive_cashu(&self) -> Vec { - use bitcoin::hashes::sha256; use bitcoin::hashes::Hash; use bitcoin::hashes::HashEngine; - + use bitcoin::hashes::sha256; + let mut engine = sha256::HashEngine::default(); engine.input(&self.keys.secret_key().secret_bytes()); engine.input("cashu".as_bytes()); @@ -214,7 +214,6 @@ pub struct Keypair { #[uniffi::export] impl Keypair { - pub fn public_key(&self) -> portal::protocol::model::bindings::PublicKey { portal::protocol::model::bindings::PublicKey(self.inner.public_key()) } @@ -268,6 +267,15 @@ pub struct PortalApp { router: Arc>>, relay_pool: Arc, runtime: Arc, + + auth_challenge_rx: Mutex>, + payment_request_rx: Mutex>, + closed_recurring_payment_rx: + Mutex>, + invoice_request_rx: + Mutex>, + cashu_request_rx: Mutex>, + cashu_direct_rx: Mutex>, } #[derive(uniffi::Record, Debug)] pub struct Bolt11InvoiceData { @@ -339,85 +347,6 @@ pub enum CallbackError { Error(String), } -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait AuthChallengeListener: Send + Sync { - async fn on_auth_challenge( - &self, - event: AuthChallengeEvent, - ) -> Result; -} - -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait PaymentStatusNotifier: Send + Sync { - async fn notify(&self, status: PaymentResponseContent) -> Result<(), CallbackError>; -} - -struct LocalStatusNotifier { - router: Arc>>, - request: PaymentRequestEvent, -} - -#[async_trait::async_trait] -impl PaymentStatusNotifier for LocalStatusNotifier { - async fn notify(&self, status: PaymentResponseContent) -> Result<(), CallbackError> { - let conv = PaymentStatusSenderConversation::new( - self.request.service_key.into(), - self.request.recipient.into(), - status, - ); - self.router - .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( - self.request.recipient.into(), - vec![], - conv, - ))) - .await - .map_err(|e| CallbackError::Error(e.to_string()))?; - - Ok(()) - } -} - -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait PaymentRequestListener: Send + Sync { - async fn on_single_payment_request( - &self, - event: SinglePaymentRequest, - notifier: Arc, - ) -> Result<(), CallbackError>; - async fn on_recurring_payment_request( - &self, - event: RecurringPaymentRequest, - ) -> Result; -} - -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait ClosedRecurringPaymentListener: Send + Sync { - async fn on_closed_recurring_payment( - &self, - event: CloseRecurringPaymentResponse, - ) -> Result<(), CallbackError>; -} - -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait InvoiceRequestListener: Send + Sync { - async fn on_invoice_requests( - &self, - event: InvoiceRequestContentWithKey, - ) -> Result; -} - -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait InvoiceResponseListener: Send + Sync { - async fn on_invoice_response(&self, event: InvoiceResponse) -> Result<(), CallbackError>; -} - #[uniffi::export(with_foreign)] #[async_trait::async_trait] pub trait RelayStatusListener: Send + Sync { @@ -428,21 +357,6 @@ pub trait RelayStatusListener: Send + Sync { ) -> Result<(), CallbackError>; } -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait CashuRequestListener: Send + Sync { - async fn on_cashu_request( - &self, - event: CashuRequestContentWithKey, - ) -> Result; -} - -#[uniffi::export(with_foreign)] -#[async_trait::async_trait] -pub trait CashuDirectListener: Send + Sync { - async fn on_cashu_direct(&self, event: CashuDirectContentWithKey) -> Result<(), CallbackError>; -} - #[uniffi::export] impl PortalApp { #[uniffi::constructor] @@ -495,10 +409,59 @@ impl PortalApp { router.add_relay(relay.clone(), false).await?; } + let auth_challenge_rx: NotificationStream = router + .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( + AuthChallengeListenerConversation::new(router.keypair().public_key()), + router.keypair().subkey_proof().cloned(), + ))) + .await?; + let payment_request_rx: NotificationStream = + router + .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( + PaymentRequestListenerConversation::new(router.keypair().public_key()), + router.keypair().subkey_proof().cloned(), + ))) + .await?; + let closed_recurring_payment_rx: NotificationStream< + portal::protocol::model::payment::CloseRecurringPaymentResponse, + > = router + .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( + CloseRecurringPaymentReceiverConversation::new(router.keypair().public_key()), + router.keypair().subkey_proof().cloned(), + ))) + .await?; + let invoice_request_rx: NotificationStream< + portal::protocol::model::payment::InvoiceRequestContentWithKey, + > = router + .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( + InvoiceReceiverConversation::new(router.keypair().public_key()), + router.keypair().subkey_proof().cloned(), + ))) + .await?; + let cashu_request_rx: NotificationStream = router + .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( + CashuRequestReceiverConversation::new(router.keypair().public_key()), + router.keypair().subkey_proof().cloned(), + ))) + .await?; + let cashu_direct_rx: NotificationStream = router + .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( + CashuDirectReceiverConversation::new(router.keypair().public_key()), + router.keypair().subkey_proof().cloned(), + ))) + .await?; + Ok(Arc::new(Self { router, relay_pool, runtime, + + auth_challenge_rx: Mutex::new(auth_challenge_rx), + payment_request_rx: Mutex::new(payment_request_rx), + closed_recurring_payment_rx: Mutex::new(closed_recurring_payment_rx), + invoice_request_rx: Mutex::new(invoice_request_rx), + cashu_request_rx: Mutex::new(cashu_request_rx), + cashu_direct_rx: Mutex::new(cashu_direct_rx), })) } @@ -606,109 +569,116 @@ impl PortalApp { Ok(()) } - pub async fn listen_for_auth_challenge( + pub async fn next_auth_challenge(&self) -> Result { + let auth_challenge = self + .auth_challenge_rx + .lock() + .await + .next() + .await + .ok_or(AppError::ListenerDisconnected)? + .map_err(|e| AppError::ParseError(e.to_string()))?; + log::debug!("Received auth challenge: {:?}", auth_challenge); + Ok(auth_challenge) + } + + pub async fn reply_auth_challenge( &self, - evt: Arc, + event: AuthChallengeEvent, + status: AuthResponseStatus, ) -> Result<(), AppError> { - let inner = AuthChallengeListenerConversation::new(self.router.keypair().public_key()); - let mut rx: NotificationStream = self - .router - .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( - inner, - self.router.keypair().subkey_proof().cloned(), + let recipient = event.recipient.clone(); + + let conv = AuthResponseConversation::new( + event, + self.router.keypair().subkey_proof().cloned(), + status, + ); + self.router + .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( + recipient.into(), + vec![], + conv, ))) .await?; - while let Ok(response) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - let evt = Arc::clone(&evt); - let router = Arc::clone(&self.router); - - let _ = self.runtime.add_task(async move { - log::debug!("Received auth challenge: {:?}", response); - - let status = evt.on_auth_challenge(response.clone()).await?; - log::debug!("Auth challenge callback result: {:?}", status); + Ok(()) + } - let conv = AuthResponseConversation::new( - response.clone(), - router.keypair().subkey_proof().cloned(), - status, - ); - router - .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( - response.recipient.into(), - vec![], - conv, - ))) - .await?; - - Ok::<(), AppError>(()) - }); + pub async fn next_payment_request(&self) -> Result { + let request = self + .payment_request_rx + .lock() + .await + .next() + .await + .ok_or(AppError::ListenerDisconnected)?; + let request = request.map_err(|e| AppError::ParseError(e.to_string()))?; + + log::debug!("Received payment request: {:?}", request); + + match &request.content { + PaymentRequestContent::Single(content) => { + Ok(IncomingPaymentRequest::Single(SinglePaymentRequest { + service_key: request.service_key.clone(), + recipient: request.recipient.clone(), + expires_at: request.expires_at, + content: content.clone(), + event_id: request.event_id.clone(), + })) + } + PaymentRequestContent::Recurring(content) => { + Ok(IncomingPaymentRequest::Recurring(RecurringPaymentRequest { + service_key: request.service_key.clone(), + recipient: request.recipient.clone(), + expires_at: request.expires_at, + content: content.clone(), + event_id: request.event_id.clone(), + })) + } } - - Ok(()) } - pub async fn listen_for_payment_request( + pub async fn reply_single_payment_request( &self, - evt: Arc, + request: SinglePaymentRequest, + status: PaymentResponseContent, ) -> Result<(), AppError> { - let inner = PaymentRequestListenerConversation::new(self.router.keypair().public_key()); - let mut rx: NotificationStream = self - .router - .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( - inner, - self.router.keypair().subkey_proof().cloned(), + let conv = PaymentStatusSenderConversation::new( + request.service_key.clone().into(), + request.recipient.clone().into(), + status, + ); + let recipient = request.recipient.into(); + self.router + .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( + recipient, + vec![], + conv, ))) .await?; - while let Ok(request) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - let evt = Arc::clone(&evt); - let router = Arc::clone(&self.router); - - let _ = self.runtime.add_task(async move { - match &request.content { - PaymentRequestContent::Single(content) => { - let req = SinglePaymentRequest { - service_key: request.service_key, - recipient: request.recipient, - expires_at: request.expires_at, - content: content.clone(), - event_id: request.event_id.clone(), - }; - evt.on_single_payment_request( - req, - Arc::new(LocalStatusNotifier { router, request }), - ) - .await?; - } - PaymentRequestContent::Recurring(content) => { - let req = RecurringPaymentRequest { - service_key: request.service_key, - recipient: request.recipient, - expires_at: request.expires_at, - content: content.clone(), - event_id: request.event_id.clone(), - }; - let status = evt.on_recurring_payment_request(req).await?; - let conv = RecurringPaymentStatusSenderConversation::new( - request.service_key.into(), - request.recipient.into(), - status, - ); - router - .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( - request.recipient.into(), - vec![], - conv, - ))) - .await?; - } - } + Ok(()) + } - Ok::<(), AppError>(()) - }); - } + pub async fn reply_recurring_payment_request( + &self, + request: RecurringPaymentRequest, + status: RecurringPaymentResponseContent, + ) -> Result<(), AppError> { + let conv = RecurringPaymentStatusSenderConversation::new( + request.service_key.clone().into(), + request.recipient.clone().into(), + status, + ); + let recipient = request.recipient.into(); + self.router + .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( + recipient, + vec![], + conv, + ))) + .await?; Ok(()) } @@ -793,34 +763,19 @@ impl PortalApp { Ok(()) } - pub async fn listen_closed_recurring_payment( + pub async fn next_closed_recurring_payment( &self, - evt: Arc, - ) -> Result<(), AppError> { - let inner = - CloseRecurringPaymentReceiverConversation::new(self.router.keypair().public_key()); - let mut rx: NotificationStream< - portal::protocol::model::payment::CloseRecurringPaymentResponse, - > = self - .router - .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( - inner, - self.router.keypair().subkey_proof().cloned(), - ))) - .await?; - - while let Ok(response) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - let evt = Arc::clone(&evt); - - let _ = self.runtime.add_task(async move { - log::debug!("Received closed recurring payment: {:?}", response); - - let _ = evt.on_closed_recurring_payment(response).await?; - - Ok::<(), AppError>(()) - }); - } - Ok(()) + ) -> Result { + let response = self + .closed_recurring_payment_rx + .lock() + .await + .next() + .await + .ok_or(AppError::ListenerDisconnected)?; + let response = response.map_err(|e| AppError::ParseError(e.to_string()))?; + log::debug!("Received closed recurring payment: {:?}", response); + Ok(response) } pub async fn add_relay(&self, url: String) -> Result<(), AppError> { @@ -852,51 +807,42 @@ impl PortalApp { Ok(()) } - pub async fn listen_invoice_requests( + pub async fn next_invoice_request( &self, - evt: Arc, - ) -> Result<(), AppError> { - let inner = InvoiceReceiverConversation::new(self.router.keypair().public_key()); - let mut rx: NotificationStream< - portal::protocol::model::payment::InvoiceRequestContentWithKey, - > = self - .router - .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( - inner, - self.router.keypair().subkey_proof().cloned(), - ))) - .await?; - - while let Ok(request) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - log::debug!("Received invoice request payment: {:?}", request); - - let recipient: nostr::key::PublicKey = request.recipient.into(); - - let evt = Arc::clone(&evt); - let router = Arc::clone(&self.router); - - let _ = self.runtime.add_task(async move { - let invoice = evt.on_invoice_requests(request.clone()).await?; - - let invoice_response = InvoiceResponse { - request: request, - invoice: invoice.invoice, - payment_hash: invoice.payment_hash, - }; + ) -> Result { + let request = self + .invoice_request_rx + .lock() + .await + .next() + .await + .ok_or(AppError::ListenerDisconnected)?; + let request = request.map_err(|e| AppError::ParseError(e.to_string()))?; + log::debug!("Received invoice request payment: {:?}", request); + Ok(request) + } - let conv = InvoiceSenderConversation::new(invoice_response); + pub async fn reply_invoice_request( + &self, + request: portal::protocol::model::payment::InvoiceRequestContentWithKey, + invoice: MakeInvoiceResponse, + ) -> Result<(), AppError> { + let recipient = request.recipient.clone().into(); + let invoice_response = InvoiceResponse { + request, + invoice: invoice.invoice, + payment_hash: invoice.payment_hash, + }; - router - .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( - recipient.to_owned(), - vec![], - conv, - ))) - .await?; + let conv = InvoiceSenderConversation::new(invoice_response); - Ok::<(), AppError>(()) - }); - } + self.router + .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( + recipient, + vec![], + conv, + ))) + .await?; Ok(()) } @@ -914,8 +860,7 @@ impl PortalApp { &self, recipient: PublicKey, content: InvoiceRequestContent, - evt: Arc, - ) -> Result<(), AppError> { + ) -> Result, AppError> { let conv = InvoiceRequestConversation::new( self.router.keypair().public_key(), self.router.keypair().subkey_proof().cloned(), @@ -931,71 +876,54 @@ impl PortalApp { .await?; if let Ok(invoice_response) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - let _ = evt.on_invoice_response(invoice_response.clone()).await?; + return Ok(Some(invoice_response)); } - Ok(()) + Ok(None) } - pub async fn listen_cashu_requests( - &self, - evt: Arc, - ) -> Result<(), AppError> { - let inner = CashuRequestReceiverConversation::new(self.router.keypair().public_key()); - let mut rx: NotificationStream = self - .router - .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( - inner, - self.router.keypair().subkey_proof().cloned(), - ))) - .await?; - - while let Ok(request) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - let evt = Arc::clone(&evt); - let router = Arc::clone(&self.router); - let _ = self.runtime.add_task(async move { - let status = evt.on_cashu_request(request.clone()).await?; - - let recipient = request.recipient.into(); - let response = CashuResponseContent { - request: request, - status: status, - }; - let conv = CashuResponseSenderConversation::new(response); - router - .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( - recipient, - vec![], - conv, - ))) - .await?; - Ok::<(), AppError>(()) - }); - } - Ok(()) + pub async fn next_cashu_request(&self) -> Result { + let request = self + .cashu_request_rx + .lock() + .await + .next() + .await + .ok_or(AppError::ListenerDisconnected)?; + let request = request.map_err(|e| AppError::ParseError(e.to_string()))?; + log::debug!("Received cashu request: {:?}", request); + Ok(request) } - pub async fn listen_cashu_direct( + pub async fn reply_cashu_request( &self, - evt: Arc, + request: CashuRequestContentWithKey, + status: CashuResponseStatus, ) -> Result<(), AppError> { - let inner = CashuDirectReceiverConversation::new(self.router.keypair().public_key()); - let mut rx: NotificationStream = self - .router - .add_and_subscribe(Box::new(MultiKeyListenerAdapter::new( - inner, - self.router.keypair().subkey_proof().cloned(), + let recipient = request.recipient.clone().into(); + let response = CashuResponseContent { request, status }; + let conv = CashuResponseSenderConversation::new(response); + self.router + .add_conversation(Box::new(OneShotSenderAdapter::new_with_user( + recipient, + vec![], + conv, ))) .await?; - - while let Ok(response) = rx.next().await.ok_or(AppError::ListenerDisconnected)? { - let evt = Arc::clone(&evt); - let _ = self.runtime.add_task(async move { - let _ = evt.on_cashu_direct(response.clone()).await?; - Ok::<(), AppError>(()) - }); - } Ok(()) } + + pub async fn next_cashu_direct(&self) -> Result { + let response = self + .cashu_direct_rx + .lock() + .await + .next() + .await + .ok_or(AppError::ListenerDisconnected)?; + let response = response.map_err(|e| AppError::ParseError(e.to_string()))?; + log::debug!("Received cashu direct: {:?}", response); + Ok(response) + } } impl PortalApp { @@ -1109,7 +1037,7 @@ impl From for RelayStatus { } } -#[derive(Debug, uniffi::Record)] +#[derive(Debug, Clone, uniffi::Record)] pub struct SinglePaymentRequest { pub service_key: PublicKey, pub recipient: PublicKey, @@ -1118,7 +1046,7 @@ pub struct SinglePaymentRequest { pub event_id: String, } -#[derive(Debug, uniffi::Record)] +#[derive(Debug, Clone, uniffi::Record)] pub struct RecurringPaymentRequest { pub service_key: PublicKey, pub recipient: PublicKey, @@ -1127,6 +1055,12 @@ pub struct RecurringPaymentRequest { pub event_id: String, } +#[derive(Debug, Clone, uniffi::Enum)] +pub enum IncomingPaymentRequest { + Single(SinglePaymentRequest), + Recurring(RecurringPaymentRequest), +} + #[derive(Debug, thiserror::Error, uniffi::Error)] pub enum AppError { #[error("Failed to connect to relay: {0}")] diff --git a/cli/src/bin/cashu.rs b/cli/src/bin/cashu.rs index 26f4dbbf..c316a110 100644 --- a/cli/src/bin/cashu.rs +++ b/cli/src/bin/cashu.rs @@ -1,28 +1,11 @@ use std::{sync::Arc, time::Duration as StdDuration}; -use app::{CallbackError, CashuRequestListener}; use cli::{CliError, create_app_instance, create_sdk_instance}; use portal::protocol::model::{ Timestamp, - payment::{CashuRequestContent, CashuRequestContentWithKey, CashuResponseStatus}, + payment::{CashuRequestContent, CashuResponseStatus}, }; -struct LogCashuRequestListener; - -#[async_trait::async_trait] -impl CashuRequestListener for LogCashuRequestListener { - async fn on_cashu_request( - &self, - event: CashuRequestContentWithKey, - ) -> Result { - log::info!("Received Cashu request: {:?}", event); - // Always approve for test - Ok(CashuResponseStatus::Success { - token: "testtoken123".to_string(), - }) - } -} - #[tokio::main] async fn main() -> Result<(), CliError> { env_logger::init(); @@ -35,14 +18,32 @@ async fn main() -> Result<(), CliError> { relays.clone(), ) .await?; - let _receiver = receiver.clone(); + let receiver_loop = Arc::clone(&receiver); tokio::spawn(async move { - log::info!("Receiver: Setting up Cashu request listener"); - _receiver - .listen_cashu_requests(Arc::new(LogCashuRequestListener)) - .await - .expect("Receiver: Error creating listener"); + log::info!("Receiver: Setting up Cashu request loop"); + loop { + match receiver_loop.next_cashu_request().await { + Ok(event) => { + log::info!("Receiver: Cashu request {:?}", event); + if let Err(e) = receiver_loop + .reply_cashu_request( + event, + CashuResponseStatus::Success { + token: "testtoken123".to_string(), + }, + ) + .await + { + log::error!("Receiver: Failed to reply to Cashu request: {:?}", e); + } + } + Err(e) => { + log::error!("Receiver: Cashu loop error: {:?}", e); + break; + } + } + } }); let sender_sdk = create_sdk_instance( diff --git a/cli/src/bin/invoices.rs b/cli/src/bin/invoices.rs index 2ba928ee..afdbfe0c 100644 --- a/cli/src/bin/invoices.rs +++ b/cli/src/bin/invoices.rs @@ -1,38 +1,8 @@ use std::sync::Arc; -use app::{ - CallbackError, InvoiceRequestListener, InvoiceResponseListener, nwc::MakeInvoiceResponse, -}; +use app::nwc::MakeInvoiceResponse; use cli::{CliError, create_app_instance}; -use portal::protocol::model::{ - Timestamp, - payment::{InvoiceRequestContent, InvoiceRequestContentWithKey, InvoiceResponse}, -}; - -struct LogInvoiceRequestListener; - -#[async_trait::async_trait] -impl InvoiceRequestListener for LogInvoiceRequestListener { - async fn on_invoice_requests( - &self, - event: InvoiceRequestContentWithKey, - ) -> Result { - Ok(MakeInvoiceResponse { - invoice: String::from("bolt11"), - payment_hash: Some(String::from("bolt11 hash")), - }) - } -} - -struct LogInvoiceResponseListener; - -#[async_trait::async_trait] -impl InvoiceResponseListener for LogInvoiceResponseListener { - async fn on_invoice_response(&self, event: InvoiceResponse) -> Result<(), CallbackError> { - log::info!("Received an invoice: {:?}", event); - Ok(()) - } -} +use portal::protocol::model::{Timestamp, payment::InvoiceRequestContent}; #[tokio::main] async fn main() -> Result<(), CliError> { @@ -46,17 +16,35 @@ async fn main() -> Result<(), CliError> { relays.clone(), ) .await?; - let _receiver = receiver.clone(); + let receiver_loop = Arc::clone(&receiver); tokio::spawn(async move { - log::info!("Receiver: Setting up invoice request listener"); - _receiver - .listen_invoice_requests(Arc::new(LogInvoiceRequestListener)) - .await - .expect("Receiver: Error creating listener"); + log::info!("Receiver: Setting up invoice request loop"); + loop { + match receiver_loop.next_invoice_request().await { + Ok(request) => { + if let Err(e) = receiver_loop + .reply_invoice_request( + request, + MakeInvoiceResponse { + invoice: String::from("bolt11"), + payment_hash: Some(String::from("bolt11 hash")), + }, + ) + .await + { + log::error!("Receiver: Failed to reply to invoice request: {:?}", e); + } + } + Err(e) => { + log::error!("Receiver: Invoice loop error: {:?}", e); + break; + } + } + } }); - let (sender_key, sender) = create_app_instance( + let (_sender_key, sender) = create_app_instance( "Sender", "draft sunny old taxi chimney ski tilt suffer subway bundle once story", relays.clone(), @@ -81,10 +69,11 @@ async fn main() -> Result<(), CliError> { description: Some(String::from("Dinner")), refund_invoice: None, }, - Arc::new(LogInvoiceResponseListener), ) .await .unwrap(); + + log::info!("Sender: Invoice response {:?}", result); }); log::info!("Apps created"); diff --git a/cli/src/bin/main.rs b/cli/src/bin/main.rs index 149bfc50..61abd1df 100644 --- a/cli/src/bin/main.rs +++ b/cli/src/bin/main.rs @@ -1,21 +1,18 @@ use std::{io::Write, str::FromStr, sync::Arc}; use app::{ - AuthChallengeListener, CallbackError, CashuDirectListener, CashuRequestListener, - ClosedRecurringPaymentListener, Mnemonic, PaymentRequestListener, PaymentStatusNotifier, - PortalApp, RecurringPaymentRequest, RelayStatus, RelayStatusListener, RelayUrl, - SinglePaymentRequest, auth::AuthChallengeEvent, get_git_hash, nwc::NWC, parse_bolt11, + CallbackError, IncomingPaymentRequest, Mnemonic, PortalApp, RelayStatus, RelayStatusListener, + RelayUrl, SinglePaymentRequest, get_git_hash, nwc, nwc::NWC, parse_bolt11, }; -use log::info; +use log::{error, info}; use portal::{ - nostr::nips::{nip19::ToBech32, nip47::PayInvoiceRequest}, + nostr::nips::nip19::ToBech32, protocol::{ key_handshake::KeyHandshakeUrl, model::{ auth::AuthResponseStatus, payment::{ - CashuDirectContentWithKey, CashuRequestContentWithKey, CashuResponseStatus, - CloseRecurringPaymentResponse, PaymentResponseContent, PaymentStatus, + CashuResponseStatus, PaymentResponseContent, PaymentStatus, RecurringPaymentResponseContent, RecurringPaymentStatus, }, }, @@ -37,138 +34,6 @@ impl RelayStatusListener for LogRelayStatusChange { } } -struct ApproveLogin(Arc); - -#[async_trait::async_trait] -impl AuthChallengeListener for ApproveLogin { - async fn on_auth_challenge( - &self, - event: AuthChallengeEvent, - ) -> Result { - log::info!("Received auth challenge: {:?}", event); - - dbg!(self.0.fetch_profile(event.service_key).await); - - Ok(AuthResponseStatus::Approved { - granted_permissions: vec![], - session_token: String::from("ABC"), - }) - } -} - -struct ApprovePayment(Arc); - -#[async_trait::async_trait] -impl PaymentRequestListener for ApprovePayment { - async fn on_single_payment_request( - &self, - event: SinglePaymentRequest, - notifier: Arc, - ) -> Result<(), CallbackError> { - log::info!("Received single payment request: {:?}", event); - - notifier - .notify(PaymentResponseContent { - status: PaymentStatus::Approved, - request_id: event.content.request_id.clone(), - }) - .await?; - - let nwc = self.0.clone(); - tokio::task::spawn(async move { - let payment_result = nwc - .pay_invoice(PayInvoiceRequest { - id: None, - invoice: event.content.invoice, - amount: None, - }) - .await; - log::info!("Payment result: {:?}", payment_result); - - match payment_result { - Ok(payment) => { - notifier - .notify(PaymentResponseContent { - status: PaymentStatus::Success { - preimage: Some(payment.preimage), - }, - request_id: event.content.request_id, - }) - .await - .unwrap(); - } - Err(e) => { - log::error!("Payment failed: {:?}", e); - notifier - .notify(PaymentResponseContent { - status: PaymentStatus::Failed { - reason: Some(e.to_string()), - }, - request_id: event.content.request_id, - }) - .await - .unwrap(); - } - } - }); - - Ok(()) - } - - async fn on_recurring_payment_request( - &self, - event: RecurringPaymentRequest, - ) -> Result { - log::info!("Received recurring payment request: {:?}", event); - Ok(RecurringPaymentResponseContent { - status: RecurringPaymentStatus::Confirmed { - subscription_id: "randomid".to_string(), - authorized_amount: event.content.amount, - authorized_currency: event.content.currency, - authorized_recurrence: event.content.recurrence, - }, - request_id: event.content.request_id, - }) - } -} - -struct LogClosedRecurringPayment; - -#[async_trait::async_trait] -impl ClosedRecurringPaymentListener for LogClosedRecurringPayment { - async fn on_closed_recurring_payment( - &self, - event: CloseRecurringPaymentResponse, - ) -> Result<(), CallbackError> { - log::warn!("Received closed recurring payment: {:?}", event); - Ok(()) - } -} - -struct LogCashuRequestListener; - -#[async_trait::async_trait] -impl CashuRequestListener for LogCashuRequestListener { - async fn on_cashu_request( - &self, - event: CashuRequestContentWithKey, - ) -> Result { - log::info!("Received Cashu request: {:?}", event); - // Always approve for test - Ok(CashuResponseStatus::Success { - token: "testtoken123".to_string(), - }) - } -} - -#[async_trait::async_trait] -impl CashuDirectListener for LogCashuRequestListener { - async fn on_cashu_direct(&self, event: CashuDirectContentWithKey) -> Result<(), CallbackError> { - log::info!("Received Cashu direct: {:?}", event); - Ok(()) - } -} - #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); @@ -184,10 +49,12 @@ async fn main() -> Result<(), Box> { // Testing database so commented for now let nwc_str = std::env::var("CLI_NWC_URL").expect("CLI_NWC_URL is not set"); - let nwc = NWC::new(nwc_str.parse()?, Arc::new(LogRelayStatusChange)).unwrap_or_else(|e| { - dbg!(e); - panic!(); - }); + let nwc = Arc::new( + NWC::new(nwc_str.parse()?, Arc::new(LogRelayStatusChange)).unwrap_or_else(|e| { + dbg!(e); + panic!(); + }), + ); log::info!( "Public key: {:?}", @@ -244,39 +111,30 @@ async fn main() -> Result<(), Box> { // .await? // ); - let _app = Arc::clone(&app); + let auth_app = Arc::clone(&app); tokio::spawn(async move { - _app.listen_for_auth_challenge(Arc::new(ApproveLogin(Arc::clone(&_app)))) - .await - .unwrap(); + auth_challenge_loop(auth_app).await; }); - let _app = Arc::clone(&app); + let payment_app = Arc::clone(&app); + let payment_nwc = Arc::clone(&nwc); tokio::spawn(async move { - _app.listen_for_payment_request(Arc::new(ApprovePayment(Arc::new(nwc::NWC::new(nwc_str.parse().unwrap()))))) - .await - .unwrap(); + payment_request_loop(payment_app, payment_nwc).await; }); - let _app = Arc::clone(&app); + let closed_app = Arc::clone(&app); tokio::spawn(async move { - _app.listen_closed_recurring_payment(Arc::new(LogClosedRecurringPayment)) - .await - .unwrap(); + closed_recurring_loop(closed_app).await; }); - let _app = Arc::clone(&app); + let cashu_request_app = Arc::clone(&app); tokio::spawn(async move { - _app.listen_cashu_direct(Arc::new(LogCashuRequestListener)) - .await - .unwrap(); + cashu_request_loop(cashu_request_app).await; }); - let _app = Arc::clone(&app); + let cashu_direct_app = Arc::clone(&app); tokio::spawn(async move { - _app.listen_cashu_requests(Arc::new(LogCashuRequestListener)) - .await - .unwrap(); + cashu_direct_loop(cashu_direct_app).await; }); // let _app = Arc::clone(&app); @@ -316,10 +174,15 @@ async fn main() -> Result<(), Box> { let bolt11_invoice = Bolt11Invoice::from_str(invoice_str)?; let payment_hash_string = bolt11_invoice.payment_hash().to_string(); - let result = nwc.lookup_invoice_from_payment_hash(payment_hash_string).await; + let result = nwc + .lookup_invoice_from_payment_hash(payment_hash_string) + .await; match result { Ok(lur) => { - info!("invoice from lookup_invoice_with_payment_hash -> {}", lur.invoice.unwrap()); + info!( + "invoice from lookup_invoice_with_payment_hash -> {}", + lur.invoice.unwrap() + ); } Err(e) => { info!("{}", e); @@ -329,8 +192,14 @@ async fn main() -> Result<(), Box> { dbg!(get_git_hash()); tokio::spawn(async move { const INVOICE: &str = "lnbc100n1p5fvqfdsp586d9yz88deyfxm2mxgh39n39lezmpnkcv0a35uh38fvnjzlaxdzqpp59nwc8zac6psv09wysxvulgwj0t23jh3g5r4l5qzgpdsnel94w5zshp5mndu23huxkp6jgynf8agfjfaypgfjs2z8glq8fs9zqjfpnf34jnqcqpjrzjqgc7enr9zr4ju8yhezsep4h2p9ncf2nuxkp423pq2k4v3vsx2nunyz60tsqqj9qqqqqqqqqpqqqqqysqjq9qxpqysgqala28sswmp68uc9axqt893n48lzzt7l3uzkzjzlmlzurczpc647sxn4vrt4hvm30v5vv2ysvxhxeej78j903emrrjh02xdrl6z9alzqqns0w5s"; - let invoice_data = parse_bolt11(INVOICE); - dbg!(invoice_data); + match parse_bolt11(INVOICE) { + Ok(invoice_data) => { + dbg!(invoice_data); + } + Err(e) => { + error!("Failed to parse bolt11 invoice: {:?}", e); + } + } }); println!("\nEnter the auth init URL:"); @@ -345,3 +214,154 @@ async fn main() -> Result<(), Box> { Ok(()) } + +async fn auth_challenge_loop(app: Arc) { + loop { + match app.next_auth_challenge().await { + Ok(event) => { + info!("Received auth challenge: {:?}", event); + let _ = app.fetch_profile(event.service_key.clone()).await; + let status = AuthResponseStatus::Approved { + granted_permissions: vec![], + session_token: String::from("ABC"), + }; + if let Err(e) = app.reply_auth_challenge(event, status).await { + error!("Failed to reply to auth challenge: {:?}", e); + } + } + Err(e) => { + error!("Auth challenge loop error: {:?}", e); + break; + } + } + } +} + +async fn payment_request_loop(app: Arc, nwc: Arc) { + loop { + match app.next_payment_request().await { + Ok(IncomingPaymentRequest::Single(request)) => { + info!("Received single payment request: {:?}", request); + let single_app = Arc::clone(&app); + let single_nwc = Arc::clone(&nwc); + tokio::spawn(async move { + process_single_payment_request(single_app, single_nwc, request).await; + }); + } + Ok(IncomingPaymentRequest::Recurring(request)) => { + info!("Received recurring payment request: {:?}", request); + let content = request.content.clone(); + let status = RecurringPaymentResponseContent { + status: RecurringPaymentStatus::Confirmed { + subscription_id: "randomid".to_string(), + authorized_amount: content.amount, + authorized_currency: content.currency, + authorized_recurrence: content.recurrence, + }, + request_id: content.request_id, + }; + if let Err(e) = app.reply_recurring_payment_request(request, status).await { + error!("Failed to reply to recurring payment request: {:?}", e); + } + } + Err(e) => { + error!("Payment request loop error: {:?}", e); + break; + } + } + } +} + +async fn process_single_payment_request( + app: Arc, + nwc: Arc, + request: SinglePaymentRequest, +) { + if let Err(e) = app + .reply_single_payment_request( + request.clone(), + PaymentResponseContent { + status: PaymentStatus::Approved, + request_id: request.content.request_id.clone(), + }, + ) + .await + { + error!("Failed to send approval for payment request: {:?}", e); + return; + } + + let payment_result = nwc.pay_invoice(request.content.invoice.clone()).await; + info!("Payment result: {:?}", payment_result); + + let status = match payment_result { + Ok(preimage) => PaymentResponseContent { + status: PaymentStatus::Success { + preimage: Some(preimage), + }, + request_id: request.content.request_id.clone(), + }, + Err(e) => { + error!("Payment failed: {:?}", e); + PaymentResponseContent { + status: PaymentStatus::Failed { + reason: Some(e.to_string()), + }, + request_id: request.content.request_id.clone(), + } + } + }; + + if let Err(e) = app.reply_single_payment_request(request, status).await { + error!("Failed to send payment status update: {:?}", e); + } +} + +async fn closed_recurring_loop(app: Arc) { + loop { + match app.next_closed_recurring_payment().await { + Ok(event) => info!("Received closed recurring payment: {:?}", event), + Err(e) => { + error!("Closed recurring payment loop error: {:?}", e); + break; + } + } + } +} + +async fn cashu_request_loop(app: Arc) { + loop { + match app.next_cashu_request().await { + Ok(event) => { + info!("Received Cashu request: {:?}", event); + if let Err(e) = app + .reply_cashu_request( + event, + CashuResponseStatus::Success { + token: "testtoken123".to_string(), + }, + ) + .await + { + error!("Failed to reply to Cashu request: {:?}", e); + } + } + Err(e) => { + error!("Cashu request loop error: {:?}", e); + break; + } + } + } +} + +async fn cashu_direct_loop(app: Arc) { + loop { + match app.next_cashu_direct().await { + Ok(event) => info!("Received Cashu direct: {:?}", event), + Err(e) => { + error!("Cashu direct loop error: {:?}", e); + break; + } + } + } +} diff --git a/rest/src/command.rs b/rest/src/command.rs index 466156c9..5ee736d8 100644 --- a/rest/src/command.rs +++ b/rest/src/command.rs @@ -1,8 +1,9 @@ use portal::profile::Profile; -use portal::protocol::model::Timestamp; use portal::protocol::model::payment::{ - Currency, InvoiceRequestContent, RecurrenceInfo, RecurringPaymentRequestContent, SinglePaymentRequestContent + Currency, InvoiceRequestContent, RecurrenceInfo, RecurringPaymentRequestContent, + SinglePaymentRequestContent, }; +use portal::protocol::model::Timestamp; use serde::Deserialize; #[derive(Debug, Deserialize)] @@ -22,7 +23,6 @@ pub enum Command { }, // SDK methods - NewKeyHandshakeUrl { static_token: Option, no_request: Option, @@ -117,11 +117,10 @@ pub struct SinglePaymentParams { pub amount: u64, pub currency: Currency, pub auth_token: Option, - + pub subscription_id: Option, } - #[derive(Debug, Deserialize)] pub struct RecurringPaymentParams { pub description: Option, diff --git a/rest/src/response.rs b/rest/src/response.rs index af9faeae..7917dbed 100644 --- a/rest/src/response.rs +++ b/rest/src/response.rs @@ -1,8 +1,8 @@ use nostr::nips::nip05::Nip05Profile; use portal::profile::Profile; -use portal::protocol::model::Timestamp; use portal::protocol::model::auth::AuthResponseStatus; use portal::protocol::model::payment::{CashuResponseStatus, RecurringPaymentResponseContent}; +use portal::protocol::model::Timestamp; use serde::Serialize; // Response structs for each API diff --git a/rest/src/ws.rs b/rest/src/ws.rs index 94c86c74..f9e4e174 100644 --- a/rest/src/ws.rs +++ b/rest/src/ws.rs @@ -20,7 +20,8 @@ use portal::nostr_relay_pool::RelayOptions; use portal::protocol::calendar::Calendar; use portal::protocol::jwt::CustomClaims; use portal::protocol::model::payment::{ - CashuDirectContent, CashuRequestContent, Currency, ExchangeRate, PaymentStatus, RecurringPaymentRequestContent, SinglePaymentRequestContent + CashuDirectContent, CashuRequestContent, Currency, ExchangeRate, PaymentStatus, + RecurringPaymentRequestContent, SinglePaymentRequestContent, }; use portal::protocol::model::Timestamp; use rand::RngCore; @@ -441,7 +442,12 @@ async fn handle_command(command: CommandWithId, ctx: Arc) { }); } Err(e) => { - let _ = ctx.send_error_message(&command.id, &format!("Failed to fetch market data: {}", e)).await; + let _ = ctx + .send_error_message( + &command.id, + &format!("Failed to fetch market data: {}", e), + ) + .await; return; } } @@ -520,7 +526,6 @@ async fn handle_command(command: CommandWithId, ctx: Arc) { let amount = payment_request.amount; let mut msat_amount = payment_request.amount; - // If the currency is fiat, we need to convert it to millisats let mut current_exchange_rate = None; if let Currency::Fiat(currency) = &payment_request.currency { @@ -528,7 +533,7 @@ async fn handle_command(command: CommandWithId, ctx: Arc) { let market_data = ctx.market_api.clone().fetch_market_data(¤cy).await; match market_data { Ok(market_data) => { - msat_amount = market_data.calculate_millisats(fiat_amount) as u64; + msat_amount = market_data.calculate_millisats(fiat_amount) as u64; current_exchange_rate = Some(ExchangeRate { rate: market_data.rate, source: market_data.source, @@ -536,19 +541,20 @@ async fn handle_command(command: CommandWithId, ctx: Arc) { }); } Err(e) => { - let _ = ctx.send_error_message(&command.id, &format!("Failed to fetch market data: {}", e)).await; + let _ = ctx + .send_error_message( + &command.id, + &format!("Failed to fetch market data: {}", e), + ) + .await; return; } } } - // TODO: fetch and apply fiat exchange rate let invoice = match wallet - .make_invoice( - msat_amount, - Some(payment_request.description.clone()), - ) + .make_invoice(msat_amount, Some(payment_request.description.clone())) .await { Ok(invoice) => invoice, @@ -1446,7 +1452,9 @@ async fn handle_command(command: CommandWithId, ctx: Arc) { let calendar = match Calendar::from_str(&calendar) { Ok(calendar) => calendar, Err(e) => { - let _ = ctx.send_error_message(&command.id, &format!("Invalid calendar: {}", e)).await; + let _ = ctx + .send_error_message(&command.id, &format!("Invalid calendar: {}", e)) + .await; return; } }; diff --git a/src/protocol/calendar.rs b/src/protocol/calendar.rs index 87e03d14..61b6f461 100644 --- a/src/protocol/calendar.rs +++ b/src/protocol/calendar.rs @@ -518,7 +518,10 @@ impl Calendar { && matches!(self.day, TimeComponent::Any) && matches!(self.hour, TimeComponent::Any) { - if let TimeComponent::Range { step: Some(step), .. } = &self.minute { + if let TimeComponent::Range { + step: Some(step), .. + } = &self.minute + { return Some(format!("Every {} minutes", step)); } } @@ -527,13 +530,19 @@ impl Calendar { && matches!(self.month, TimeComponent::Any) && matches!(self.day, TimeComponent::Any) { - if let TimeComponent::Range { step: Some(step), .. } = &self.hour { + if let TimeComponent::Range { + step: Some(step), .. + } = &self.hour + { return Some(format!("Every {} hours", step)); } } if matches!(self.year, TimeComponent::Any) && matches!(self.month, TimeComponent::Any) { - if let TimeComponent::Range { step: Some(step), .. } = &self.day { + if let TimeComponent::Range { + step: Some(step), .. + } = &self.day + { return Some(format!("Every {} days", step)); } } @@ -1093,8 +1102,6 @@ mod tests { .unwrap() .with_timezone(&chrono_tz::Europe::Rome) ); - - } #[test] @@ -1286,10 +1293,7 @@ mod tests { second: TimeComponent::Values(vec![0]), timezone: None, }; - assert_eq!( - cal_every_10.to_human_readable(true), - "Every 10 minutes" - ); + assert_eq!(cal_every_10.to_human_readable(true), "Every 10 minutes"); let cal_every_6h = Calendar { weekdays: None, @@ -1305,10 +1309,7 @@ mod tests { second: TimeComponent::Values(vec![0]), timezone: None, }; - assert_eq!( - cal_every_6h.to_human_readable(true), - "Every 6 hours" - ); + assert_eq!(cal_every_6h.to_human_readable(true), "Every 6 hours"); let cal_every_3d = Calendar { weekdays: None, diff --git a/src/router/actor.rs b/src/router/actor.rs index ae454c90..16814588 100644 --- a/src/router/actor.rs +++ b/src/router/actor.rs @@ -1089,7 +1089,7 @@ struct ConversationState { #[derive(Debug)] enum InnerConversationState { Standard(ConversationBox), - Alias + Alias, } impl InnerConversationState { @@ -1230,7 +1230,10 @@ impl ConversationState { Err(mpsc::error::SendError(_)) => { // Channel is closed, remove dead subscriber // Do not push to alive_subscribers - log::warn!("Removing subscriber from conversation {} because channel is closed", self.id); + log::warn!( + "Removing subscriber from conversation {} because channel is closed", + self.id + ); } } } diff --git a/wallet/src/breez.rs b/wallet/src/breez.rs index 0214cd75..901751e8 100644 --- a/wallet/src/breez.rs +++ b/wallet/src/breez.rs @@ -81,7 +81,10 @@ impl PortalWallet for BreezSparkWallet { .. }) => { if invoice == *payment_invoice { - return Ok((payment.status == PaymentStatus::Completed, preimage.clone())); + return Ok(( + payment.status == PaymentStatus::Completed, + preimage.clone(), + )); } } _ => {}