diff --git a/src/matrix.rs b/src/matrix.rs index 6f83a37..55d6364 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -1,5 +1,3 @@ -use crate::ws_coin::connect; - use super::{ price_queue::{PlotKind, PriceQueue}, screen::{ @@ -52,10 +50,7 @@ impl BtcEthMatrix { ]; BtcEthMatrix { pq: PriceQueue::default(), - ws_coin: WsCoin { - socket: connect(&markets).await.unwrap(), - markets, - }, + ws_coin: WsCoin::new(markets).await, btc_price: None, eth_price: None, } @@ -113,10 +108,7 @@ impl BtcTimeMatrix { }]; BtcTimeMatrix { pq: PriceQueue::default(), - ws_coin: WsCoin { - socket: connect(&markets).await.unwrap(), - markets, - }, + ws_coin: WsCoin::new(markets).await, price: None, } } diff --git a/src/ws_coin/mod.rs b/src/ws_coin/mod.rs index 9d5072a..b3d4734 100644 --- a/src/ws_coin/mod.rs +++ b/src/ws_coin/mod.rs @@ -3,8 +3,8 @@ pub mod parse_json; use futures::{SinkExt, Stream, StreamExt}; use ordered_float::NotNan; use parse_json::{parse_json, Msg}; -use std::{error::Error, fmt, time::Duration}; -use tokio::net::TcpStream; +use std::{error::Error, fmt, sync::Arc, time::Duration}; +use tokio::{net::TcpStream, sync::Mutex}; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; use url::Url; @@ -16,6 +16,7 @@ pub struct Price { pub price: NotNan, } +#[derive(Clone)] pub struct Market { pub symbol: String, pub name: String, @@ -26,6 +27,7 @@ pub enum RecvError { RecevingError(String), UnexpectedMsgError(String), ParsingError(String), + Disconnected, } impl Error for RecvError {} @@ -36,16 +38,28 @@ impl fmt::Display for RecvError { RecvError::RecevingError(err_str) => write!(f, "{}", err_str), RecvError::UnexpectedMsgError(err_str) => write!(f, "{}", err_str), RecvError::ParsingError(err_str) => write!(f, "{}", err_str), + RecvError::Disconnected => write!(f, "Disconnected"), } } } +#[derive(Clone)] pub struct WsCoin { - pub markets: Vec, - pub socket: PriceSocket, + markets: Vec, + socket: Arc>, + reconnecting: Arc>, +} +impl WsCoin { + pub async fn new(markets: Vec) -> Self { + WsCoin { + socket: Arc::new(Mutex::new(connect(&markets).await.unwrap())), + markets, + reconnecting: Arc::new(Mutex::new(false)), + } + } } -pub async fn connect(markets: &[Market]) -> anyhow::Result { +async fn connect(markets: &[Market]) -> anyhow::Result { println!("Connecting to Binance WebSocket..."); let url = Url::parse("wss://data-stream.binance.com/ws/test")?; let (mut socket, _) = connect_async(url).await?; @@ -67,16 +81,23 @@ impl WsCoin { symbol: "BTCUSDT".to_string(), name: "BTC".to_string(), }]; - WsCoin { - socket: connect(&markets).await.unwrap(), - markets, - } + Self::new(markets).await } - async fn connect(&mut self) { + + async fn reconnect(&self) { + if *self.reconnecting.lock().await { + return; + } + + *self.reconnecting.lock().await = true; + println!("Reconnect in 60s..."); + tokio::time::sleep(Duration::from_secs(60)).await; + println!("Reconnecting..."); + let mut socket = self.socket.lock().await; loop { match connect(&self.markets).await { - Ok(socket) => { - self.socket = socket; + Ok(new_socket) => { + *socket = new_socket; break; } Err(error) => { @@ -84,18 +105,12 @@ impl WsCoin { } } } - } - - async fn reconnect(&mut self) { - println!("Reconnect in 60s..."); - tokio::time::sleep(Duration::from_secs(60)).await; - println!("Reconnecting..."); - self.connect().await; println!("Reconnected. \n\n\n\n\n\n\n\n"); + *self.reconnecting.lock().await = false; } async fn recv_price(&mut self) -> Result { - while let Some(msg) = self.socket.next().await { + while let Some(msg) = self.socket.lock().await.next().await { match msg { Ok(Message::Text(msg)) => match parse_json(&msg) { Ok(msg) => match msg { @@ -122,6 +137,8 @@ impl WsCoin { }, Ok(Message::Ping(_)) => { self.socket + .lock() + .await .send(Message::Pong(vec![])) .await .expect("failed sending pong"); @@ -135,7 +152,7 @@ impl WsCoin { Err(error) => return Err(RecvError::RecevingError(error.to_string())), } } - Err(RecvError::RecevingError("Disconnected".to_string())) + Err(RecvError::Disconnected) } fn subscribe(&mut self) -> impl Stream + '_ { @@ -144,8 +161,14 @@ impl WsCoin { match self.recv_price().await { Ok(price) => yield price, Err(error) => { - println!("{}\n\n\n\n\n\n\n\n", error); - self.reconnect().await + match error { + RecvError::Disconnected => (), + _ => { + println!("Error happened: {}\n\n\n\n\n\n\n\n", error); + } + } + let _self = self.clone(); + tokio::spawn(async move { _self.reconnect().await }); } } }