Skip to content

Commit

Permalink
Better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
WestXu committed May 7, 2024
1 parent b60ddc5 commit d3d7613
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 33 deletions.
12 changes: 2 additions & 10 deletions src/matrix.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::ws_coin::connect;

use super::{
price_queue::{PlotKind, PriceQueue},
screen::{
Expand Down Expand Up @@ -52,10 +50,7 @@ impl BtcEthMatrix {
];
BtcEthMatrix {
pq: PriceQueue::default(),
ws_coin: WsCoin {
socket: connect(&markets).await.unwrap(),
markets,
},
ws_coin: WsCoin::new(markets).await,
btc_price: None,
eth_price: None,
}
Expand Down Expand Up @@ -113,10 +108,7 @@ impl BtcTimeMatrix {
}];
BtcTimeMatrix {
pq: PriceQueue::default(),
ws_coin: WsCoin {
socket: connect(&markets).await.unwrap(),
markets,
},
ws_coin: WsCoin::new(markets).await,
price: None,
}
}
Expand Down
69 changes: 46 additions & 23 deletions src/ws_coin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ pub mod parse_json;
use futures::{SinkExt, Stream, StreamExt};
use ordered_float::NotNan;
use parse_json::{parse_json, Msg};
use std::{error::Error, fmt, time::Duration};
use tokio::net::TcpStream;
use std::{error::Error, fmt, sync::Arc, time::Duration};
use tokio::{net::TcpStream, sync::Mutex};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use url::Url;

Expand All @@ -16,6 +16,7 @@ pub struct Price {
pub price: NotNan<f64>,
}

#[derive(Clone)]
pub struct Market {
pub symbol: String,
pub name: String,
Expand All @@ -26,6 +27,7 @@ pub enum RecvError {
RecevingError(String),
UnexpectedMsgError(String),
ParsingError(String),
Disconnected,
}

impl Error for RecvError {}
Expand All @@ -36,16 +38,28 @@ impl fmt::Display for RecvError {
RecvError::RecevingError(err_str) => write!(f, "{}", err_str),
RecvError::UnexpectedMsgError(err_str) => write!(f, "{}", err_str),
RecvError::ParsingError(err_str) => write!(f, "{}", err_str),
RecvError::Disconnected => write!(f, "Disconnected"),
}
}
}

#[derive(Clone)]
pub struct WsCoin {
pub markets: Vec<Market>,
pub socket: PriceSocket,
markets: Vec<Market>,
socket: Arc<Mutex<PriceSocket>>,
reconnecting: Arc<Mutex<bool>>,
}
impl WsCoin {
pub async fn new(markets: Vec<Market>) -> Self {
WsCoin {
socket: Arc::new(Mutex::new(connect(&markets).await.unwrap())),
markets,
reconnecting: Arc::new(Mutex::new(false)),
}
}
}

pub async fn connect(markets: &[Market]) -> anyhow::Result<PriceSocket> {
async fn connect(markets: &[Market]) -> anyhow::Result<PriceSocket> {
println!("Connecting to Binance WebSocket...");
let url = Url::parse("wss://data-stream.binance.com/ws/test")?;
let (mut socket, _) = connect_async(url).await?;
Expand All @@ -67,35 +81,36 @@ impl WsCoin {
symbol: "BTCUSDT".to_string(),
name: "BTC".to_string(),
}];
WsCoin {
socket: connect(&markets).await.unwrap(),
markets,
}
Self::new(markets).await
}
async fn connect(&mut self) {

async fn reconnect(&self) {
if *self.reconnecting.lock().await {
return;
}

*self.reconnecting.lock().await = true;
println!("Reconnect in 60s...");
tokio::time::sleep(Duration::from_secs(60)).await;
println!("Reconnecting...");
let mut socket = self.socket.lock().await;
loop {
match connect(&self.markets).await {
Ok(socket) => {
self.socket = socket;
Ok(new_socket) => {
*socket = new_socket;
break;
}
Err(error) => {
println!("error happened during connection: {error}, retrying...");
}
}
}
}

async fn reconnect(&mut self) {
println!("Reconnect in 60s...");
tokio::time::sleep(Duration::from_secs(60)).await;
println!("Reconnecting...");
self.connect().await;
println!("Reconnected. \n\n\n\n\n\n\n\n");
*self.reconnecting.lock().await = false;
}

async fn recv_price(&mut self) -> Result<Price, RecvError> {
while let Some(msg) = self.socket.next().await {
while let Some(msg) = self.socket.lock().await.next().await {
match msg {
Ok(Message::Text(msg)) => match parse_json(&msg) {
Ok(msg) => match msg {
Expand All @@ -122,6 +137,8 @@ impl WsCoin {
},
Ok(Message::Ping(_)) => {
self.socket
.lock()
.await
.send(Message::Pong(vec![]))
.await
.expect("failed sending pong");
Expand All @@ -135,7 +152,7 @@ impl WsCoin {
Err(error) => return Err(RecvError::RecevingError(error.to_string())),
}
}
Err(RecvError::RecevingError("Disconnected".to_string()))
Err(RecvError::Disconnected)
}

fn subscribe(&mut self) -> impl Stream<Item = Price> + '_ {
Expand All @@ -144,8 +161,14 @@ impl WsCoin {
match self.recv_price().await {
Ok(price) => yield price,
Err(error) => {
println!("{}\n\n\n\n\n\n\n\n", error);
self.reconnect().await
match error {
RecvError::Disconnected => (),
_ => {
println!("Error happened: {}\n\n\n\n\n\n\n\n", error);
}
}
let _self = self.clone();
tokio::spawn(async move { _self.reconnect().await });
}
}
}
Expand Down

0 comments on commit d3d7613

Please sign in to comment.