Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some unwraps, including one which panicked in production #43

Merged
merged 3 commits into from
Oct 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions src/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
// along with the Discord Faucet library. If not, see <https://mit-license.org/>.

use anyhow::{Error, Result};
use async_std::{channel::Receiver, sync::RwLock, task::JoinHandle};
use async_std::{
channel::Receiver,
sync::{RwLock, RwLockUpgradableReadGuard},
task::{sleep, JoinHandle},
};
use clap::Parser;
use ethers::{
prelude::SignerMiddleware,
Expand Down Expand Up @@ -467,17 +471,12 @@ impl Faucet {
async fn handle_non_faucet_transfer(&self, receipt: &TransactionReceipt) -> Result<()> {
tracing::debug!("Handling external incoming transfer to {:?}", receipt.to);
if let Some(receiver) = receipt.to {
if self
.state
.read()
.await
.clients_being_funded
.contains_key(&receiver)
{
let state = self.state.upgradable_read().await;
if state.clients_being_funded.contains_key(&receiver) {
let balance = self.balance(receiver).await?;
if balance >= self.config.min_funding_balance() {
tracing::info!("Funded client {:?} with external transfer", receiver);
let mut state = self.state.write().await;
let mut state = RwLockUpgradableReadGuard::upgrade(state).await;
if let Some(transfer_index) =
state.transfer_queue.iter().position(|r| r.to() == receiver)
{
Expand Down Expand Up @@ -511,7 +510,10 @@ impl Faucet {

// Only continue if there's an inflight transfer or the recipient is a client being funded.
let is_relevant = inflight.is_some()
|| (tx.to.is_some() && state.clients_being_funded.contains_key(&tx.to.unwrap()));
|| tx
.to
.as_ref()
.is_some_and(|to| state.clients_being_funded.contains_key(to));

drop(state);

Expand All @@ -530,13 +532,12 @@ impl Faucet {

tracing::debug!("Got receipt {:?}", receipt);

if inflight.is_none() {
return self.handle_non_faucet_transfer(&receipt).await;
}

let Transfer {
let Some(Transfer {
sender, request, ..
} = inflight.unwrap();
}) = inflight
else {
return self.handle_non_faucet_transfer(&receipt).await;
};

tracing::info!("Received receipt for {request:?}");
// Do all external calls before state modifications
Expand Down Expand Up @@ -599,18 +600,29 @@ impl Faucet {
async fn monitor_transactions(&self) -> Result<()> {
loop {
let mut stream = match &self.ws_provider {
Some(provider) => provider
.subscribe_blocks()
.await
.unwrap()
.filter_map(|block| async move {
if block.hash.is_none() {
tracing::warn!("Received block without hash, ignoring: {block:?}");
}
block.hash
})
.boxed(),
None => self.provider.watch_blocks().await.unwrap().boxed(),
Some(provider) => match provider.subscribe_blocks().await {
Ok(stream) => stream
.filter_map(|block| async move {
if block.hash.is_none() {
tracing::warn!("Received block without hash, ignoring: {block:?}");
}
block.hash
})
.boxed(),
Err(err) => {
tracing::error!("Error reconnecting to block stream: {err}");
sleep(Duration::from_secs(1)).await;
continue;
}
},
None => match self.provider.watch_blocks().await {
Ok(stream) => stream.boxed(),
Err(err) => {
tracing::error!("Error reconnecting to block stream: {err}");
sleep(Duration::from_secs(1)).await;
continue;
}
},
};

self.state.write().await.monitoring_started = true;
Expand Down Expand Up @@ -641,7 +653,7 @@ impl Faucet {
// If we get here, the subscription was closed. This happens for example
// if the RPC server is restarted.
tracing::warn!("Block subscription closed, will restart ...");
async_std::task::sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
}
}

Expand Down
Loading