From d5c2d249aea05a011a18b1dd8f9cfa8a3fdc05d5 Mon Sep 17 00:00:00 2001 From: Ethan Fast Date: Thu, 24 Dec 2020 15:08:10 -0800 Subject: [PATCH 01/11] upgrade --- Cargo.toml | 2 +- src/model/python.rs | 9 +++++++++ tests/nash/account.rs | 2 +- tests/nash/market.rs | 2 +- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3fe32881..e181b828 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ keywords = ["cryptocurrency", "exchange", "openlimits", "api"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["rust_gmp"] +default = ["rust_gmp", "python"] rust_gmp = ["nash-protocol/rust_gmp", "nash-native-client/rust_gmp"] num_bigint = ["nash-protocol/num_bigint", "nash-native-client/num_bigint"] python = ["pyo3"] diff --git a/src/model/python.rs b/src/model/python.rs index 44ace6ba..1b8f7cc1 100644 --- a/src/model/python.rs +++ b/src/model/python.rs @@ -7,6 +7,7 @@ use crate::nash::{Environment, NashCredentials, NashParameters}; use pyo3::exceptions::PyException; use pyo3::prelude::{FromPyObject, IntoPy, PyObject, PyResult, Python, ToPyObject}; use pyo3::types::PyDict; +use std::time::Duration; // Python to Rust... @@ -199,12 +200,20 @@ impl<'a> FromPyObject<'a> for NashParameters { "timeout not included in nash credentials", ))? .extract()?; + let timeout = Duration::from_millis(timeout); + let sign_states_loop_interval: Option = py_dict + .get_item("timeout") + .ok_or(PyException::new_err( + "sign states loop interval not included in nash credentials", + ))? + .extract()?; Ok(NashParameters { affiliate_code, credentials, client_id, environment, timeout, + sign_states_loop_interval }) } } diff --git a/tests/nash/account.rs b/tests/nash/account.rs index 407c0196..34c18b08 100644 --- a/tests/nash/account.rs +++ b/tests/nash/account.rs @@ -255,7 +255,7 @@ async fn init() -> Nash { secret: env::var("NASH_API_SECRET").expect("Couldn't get environment variable."), session: env::var("NASH_API_KEY").expect("Couldn't get environment variable."), }), - environment: Environment::Sandbox, + environment: Environment::Production, client_id: 1, timeout: NativeDuration::new(10, 0), sign_states_loop_interval: None, diff --git a/tests/nash/market.rs b/tests/nash/market.rs index 86a0bfd6..361112ae 100644 --- a/tests/nash/market.rs +++ b/tests/nash/market.rs @@ -98,7 +98,7 @@ async fn init() -> Nash { secret: env::var("NASH_API_SECRET").expect("Couldn't get environment variable."), session: env::var("NASH_API_KEY").expect("Couldn't get environment variable."), }), - environment: Environment::Sandbox, + environment: Environment::Production, client_id: 1, timeout: Duration::new(10, 0), sign_states_loop_interval: None, From 2e2d1324ada5ac0547c9136e5150341453606566 Mon Sep 17 00:00:00 2001 From: Ethan Fast Date: Thu, 24 Dec 2020 15:17:30 -0800 Subject: [PATCH 02/11] add test --- tests/nash/account.rs | 20 +++++++++++++++++++- tests/nash/market.rs | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/nash/account.rs b/tests/nash/account.rs index 34c18b08..e34aec4e 100644 --- a/tests/nash/account.rs +++ b/tests/nash/account.rs @@ -15,6 +15,7 @@ use openlimits::{ use rust_decimal::prelude::{Decimal, FromStr}; use std::env; use std::time::Duration as NativeDuration; +use std::sync::Arc; // FIXME: https://github.com/nash-io/openlimits/issues/157 // #[tokio::test] @@ -206,6 +207,23 @@ async fn get_order_history() { println!("{:?}", resp); } +#[tokio::test] +async fn test_concurrent_requests() { + let exchange = init().await; + let client = Arc::new(exchange); + async fn make_request(client: Arc, i: u64){ + let req = client.get_account_balances(None).await; + if !req.is_err() { + println!("completed req {}", i); + } + } + let mut tasks = Vec::new(); + for i in 0..10 { + tasks.push(tokio::spawn(make_request(client.clone(), i))); + } + futures::future::join_all(tasks).await; +} + // #[tokio::test] // async fn get_all_open_orders() { // let mut exchange = init().await; @@ -255,7 +273,7 @@ async fn init() -> Nash { secret: env::var("NASH_API_SECRET").expect("Couldn't get environment variable."), session: env::var("NASH_API_KEY").expect("Couldn't get environment variable."), }), - environment: Environment::Production, + environment: Environment::Sandbox, client_id: 1, timeout: NativeDuration::new(10, 0), sign_states_loop_interval: None, diff --git a/tests/nash/market.rs b/tests/nash/market.rs index 361112ae..86a0bfd6 100644 --- a/tests/nash/market.rs +++ b/tests/nash/market.rs @@ -98,7 +98,7 @@ async fn init() -> Nash { secret: env::var("NASH_API_SECRET").expect("Couldn't get environment variable."), session: env::var("NASH_API_KEY").expect("Couldn't get environment variable."), }), - environment: Environment::Production, + environment: Environment::Sandbox, client_id: 1, timeout: Duration::new(10, 0), sign_states_loop_interval: None, From 3cab5e196ebe15c3a16892a590935d5883fef699 Mon Sep 17 00:00:00 2001 From: Ethan Fast Date: Thu, 24 Dec 2020 15:20:34 -0800 Subject: [PATCH 03/11] remove python feature default added for testing --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e181b828..187b3e58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "openlimits" -version = "0.1.6" +version = "0.1.7" authors = ["steffel <2143646+steffenix@users.noreply.github.com>", "Ethan Fast "] edition = "2018" description = "A open source Rust high performance cryptocurrency trading API with support for multiple exchanges and language wrappers. Focused in safety, correctness and speed." @@ -12,7 +12,7 @@ keywords = ["cryptocurrency", "exchange", "openlimits", "api"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["rust_gmp", "python"] +default = ["rust_gmp"] rust_gmp = ["nash-protocol/rust_gmp", "nash-native-client/rust_gmp"] num_bigint = ["nash-protocol/num_bigint", "nash-native-client/num_bigint"] python = ["pyo3"] From 6bfb283c0db5c934a2f09b0768a4273d4f51131d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 24 Dec 2020 23:19:56 +0000 Subject: [PATCH 04/11] Format Rust code using rustfmt --- src/model/python.rs | 2 +- tests/nash/account.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/model/python.rs b/src/model/python.rs index 1b8f7cc1..161666c2 100644 --- a/src/model/python.rs +++ b/src/model/python.rs @@ -213,7 +213,7 @@ impl<'a> FromPyObject<'a> for NashParameters { client_id, environment, timeout, - sign_states_loop_interval + sign_states_loop_interval, }) } } diff --git a/tests/nash/account.rs b/tests/nash/account.rs index e34aec4e..5dcaa654 100644 --- a/tests/nash/account.rs +++ b/tests/nash/account.rs @@ -14,8 +14,8 @@ use openlimits::{ }; use rust_decimal::prelude::{Decimal, FromStr}; use std::env; -use std::time::Duration as NativeDuration; use std::sync::Arc; +use std::time::Duration as NativeDuration; // FIXME: https://github.com/nash-io/openlimits/issues/157 // #[tokio::test] @@ -211,7 +211,7 @@ async fn get_order_history() { async fn test_concurrent_requests() { let exchange = init().await; let client = Arc::new(exchange); - async fn make_request(client: Arc, i: u64){ + async fn make_request(client: Arc, i: u64) { let req = client.get_account_balances(None).await; if !req.is_err() { println!("completed req {}", i); From 11dcbdc26366fe36fab5f599cd22ee9d148e7473 Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Mon, 7 Dec 2020 20:24:21 +0100 Subject: [PATCH 05/11] introduce order_status param to order-history-request --- src/errors.rs | 2 ++ src/model/mod.rs | 1 + src/nash/mod.rs | 69 +++++++++++++++++++++++++++++++++++------------- 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index f15a8197..cf55cd62 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -97,4 +97,6 @@ pub enum OpenLimitsError { NotParsableResponse(String), #[error("")] MissingParameter(String), + #[error("")] + InvalidParameter(String), } diff --git a/src/model/mod.rs b/src/model/mod.rs index 9d9b7fe3..60adb618 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -157,6 +157,7 @@ pub struct CancelAllOrdersRequest { #[derive(Serialize, Deserialize, Clone, Constructor, Debug)] pub struct GetOrderHistoryRequest { pub market_pair: Option, + pub order_status: Option>, pub paginator: Option, } diff --git a/src/nash/mod.rs b/src/nash/mod.rs index f99ca429..6d0b6836 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -113,7 +113,7 @@ impl Exchange for Nash { #[async_trait] impl ExchangeMarketData for Nash { async fn get_historic_rates(&self, req: &GetHistoricRatesRequest) -> Result> { - let req: nash_protocol::protocol::list_candles::ListCandlesRequest = req.into(); + let req: nash_protocol::protocol::list_candles::ListCandlesRequest = req.try_into()?; let resp = self.transport.run(req).await; @@ -448,7 +448,7 @@ impl TryFrom<&TradeHistoryRequest> { type Error = OpenLimitsError; fn try_from(req: &TradeHistoryRequest) -> crate::shared::Result { - let (before, limit, range) = try_split_paginator(req.paginator.clone()); + let (before, limit, range) = try_split_paginator(req.paginator.clone())?; Ok(Self { market: req.market_pair.clone(), @@ -512,11 +512,14 @@ impl From for Liquidity { } } -impl From<&GetHistoricRatesRequest> for nash_protocol::protocol::list_candles::ListCandlesRequest { - fn from(req: &GetHistoricRatesRequest) -> Self { - let (before, limit, range) = try_split_paginator(req.paginator.clone()); +impl TryFrom<&GetHistoricRatesRequest> + for nash_protocol::protocol::list_candles::ListCandlesRequest +{ + type Error = OpenLimitError; + fn try_from(req: &GetHistoricRatesRequest) -> crate::shared::Result { + let (before, limit, range) = try_split_paginator(req.paginator.clone())?; - Self { + Ok(Self { market: req.market_pair.clone(), chronological: None, before, @@ -527,25 +530,30 @@ impl From<&GetHistoricRatesRequest> for nash_protocol::protocol::list_candles::L ), limit, range, - } + }) } } fn try_split_paginator( paginator: Option, -) -> ( +) -> crate::shared::Result<( Option, Option, Option, -) { - match paginator { +)> { + Ok(match paginator { Some(paginator) => ( paginator.before, - paginator - .limit - .map(|v| i64::try_from(v).expect("Couldn't convert u64 to i64.")), + match paginator.limit { + Some(v) => Some(i64::try_from(v).map_err(|_| { + OpenLimitError::InvalidParameter( + "Couldn't convert paginator limit to i64".to_string(), + ) + })?), + None => None, + }, if paginator.start_time.is_some() && paginator.end_time.is_some() { - Some(DateTimeRange { + Some(nash_protocol::types::DateTimeRange { start: paginator.start_time.map(timestamp_to_utc_datetime).unwrap(), stop: paginator.end_time.map(timestamp_to_utc_datetime).unwrap(), }) @@ -554,7 +562,7 @@ fn try_split_paginator( }, ), None => (None, None, None), - } + }) } impl TryFrom<&GetHistoricTradesRequest> @@ -563,7 +571,7 @@ impl TryFrom<&GetHistoricTradesRequest> type Error = OpenLimitsError; fn try_from(req: &GetHistoricTradesRequest) -> crate::shared::Result { let market = req.market_pair.clone(); - let (before, limit, _) = try_split_paginator(req.paginator.clone()); + let (before, limit, _) = try_split_paginator(req.paginator.clone())?; //FIXME: Some issues with the graphql protocol for the market to be non nil Ok(Self { market, @@ -624,7 +632,7 @@ impl TryFrom<&GetOrderHistoryRequest> { type Error = OpenLimitsError; fn try_from(req: &GetOrderHistoryRequest) -> crate::shared::Result { - let (before, limit, range) = try_split_paginator(req.paginator.clone()); + let (before, limit, range) = try_split_paginator(req.paginator.clone())?; Ok(Self { market: req.market_pair.clone(), @@ -633,7 +641,14 @@ impl TryFrom<&GetOrderHistoryRequest> range, buy_or_sell: None, order_type: None, - status: None, + status: match req.order_status.clone() { + Some(v) => Some( + v.into_iter() + .map(TryInto::try_into) + .collect::>>()?, + ), + None => None, + }, }) } } @@ -677,6 +692,23 @@ impl From for OrderStatus { } } +impl TryFrom for nash_protocol::types::OrderStatus { + type Error = OpenLimitError; + fn try_from(status: OrderStatus) -> crate::shared::Result { + Ok(match status { + OrderStatus::Filled => nash_protocol::types::OrderStatus::Filled, + OrderStatus::Open => nash_protocol::types::OrderStatus::Open, + OrderStatus::Canceled => nash_protocol::types::OrderStatus::Canceled, + OrderStatus::Pending => nash_protocol::types::OrderStatus::Pending, + _ => { + return Err(OpenLimitError::InvalidParameter( + "Had invalid order status for Nash".to_string(), + )) + } + }) + } +} + impl From<&GetPriceTickerRequest> for nash_protocol::protocol::get_ticker::TickerRequest { fn from(req: &GetPriceTickerRequest) -> Self { let market = req.market_pair.clone(); @@ -730,7 +762,6 @@ use nash_protocol::protocol::{ subscriptions::{SubscriptionRequest, SubscriptionResponse}, ResponseOrError, }; -use nash_protocol::types::DateTimeRange; use std::{pin::Pin, task::Context, task::Poll}; use tokio::time::Duration; From e17991732106ad311229ea71641b3a453c89c0bd Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Thu, 10 Dec 2020 13:11:33 +0100 Subject: [PATCH 06/11] remove some old code --- src/nash/mod.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/nash/mod.rs b/src/nash/mod.rs index 6d0b6836..c168bfff 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -64,6 +64,7 @@ impl Clone for NashParameters { } async fn client_from_params_failable(params: NashParameters) -> Result { + let timeout = std::time::Duration::from_millis(params.timeout); let out = match params.credentials { Some(credentials) => { Client::from_key_data( @@ -466,12 +467,13 @@ impl From for Trade { let price = Decimal::from_str(&resp.limit_price.to_string()) .expect("Couldn't parse Decimal from string."); - let fees = match resp.account_side { - nash_protocol::types::AccountTradeSide::Taker => { + let (fees, order_id) = match resp.account_side { + nash_protocol::types::AccountTradeSide::Taker => ( Decimal::from_str(&resp.taker_fee.to_string()) - .expect("Couldn't parse Decimal from string.") - } - _ => Decimal::from(0), + .expect("Couldn't parse Decimal from string."), + resp.taker_order_id.clone(), + ), + _ => (Decimal::from(0), resp.maker_order_id.clone()), }; let (buyer_order_id, seller_order_id) = match resp.direction { From b5a3eb3ac81ab3f70e5570ce2ff13d0e4ae3efc4 Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Mon, 21 Dec 2020 13:38:43 +0100 Subject: [PATCH 07/11] prepare for tokio 1.0 --- Cargo.toml | 20 ++++++++++---------- src/nash/mod.rs | 3 +-- tests/apis/binance/websocket.rs | 18 +++++++++--------- tests/apis/coinbase/websocket.rs | 2 +- tests/binance/account.rs | 1 + tests/binance/ws_callbacks.rs | 4 ++-- tests/binance/ws_streams.rs | 4 ++-- tests/coinbase/account.rs | 1 + tests/nash/account.rs | 1 + tests/nash/websocket.rs | 4 ++-- 10 files changed, 30 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 187b3e58..499c99db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,24 +19,24 @@ python = ["pyo3"] [dependencies] async-trait = "0.1" -base64 = "0.12.3" -chrono = { version = "0.4.11", features = ["std", "serde"] } -dotenv = "0.15.0" -futures = "0.3.5" -futures-util = "0.3.8" +base64 = "0.13" +chrono = { version = "0.4", features = ["std", "serde"] } +dotenv = "0.15" +futures = "0.3" +futures-util = "0.3" hex = "0.4.2" hmac = "0.8.1" log = "0.4.8" reqwest = { version = "0.10", features = ["json", "blocking"] } rust_decimal = "1.7.0" sugar = "0.2.0" -serde = { version = "1.0.114", features = ["derive"] } -serde_json = "1.0.55" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" serde_urlencoded = "0.6.1" thiserror = "1.0.20" -tokio = { version = "0.2", features = ["full"] } -tokio-tungstenite = { version = "0.10.1", features = ["tls"] } -tungstenite = "0.11.0" +tokio = { version = "0.3", features = ["full"] } +tokio-tungstenite = { version = "0.12", features = ["tls"] } +tungstenite = "0.11.1" sha2 = "0.9.1" url = "2.1.1" derive_more = "0.99" diff --git a/src/nash/mod.rs b/src/nash/mod.rs index c168bfff..50d83ebe 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -64,7 +64,6 @@ impl Clone for NashParameters { } async fn client_from_params_failable(params: NashParameters) -> Result { - let timeout = std::time::Duration::from_millis(params.timeout); let out = match params.credentials { Some(credentials) => { Client::from_key_data( @@ -467,7 +466,7 @@ impl From for Trade { let price = Decimal::from_str(&resp.limit_price.to_string()) .expect("Couldn't parse Decimal from string."); - let (fees, order_id) = match resp.account_side { + let (fees, _order_id) = match resp.account_side { nash_protocol::types::AccountTradeSide::Taker => ( Decimal::from_str(&resp.taker_fee.to_string()) .expect("Couldn't parse Decimal from string."), diff --git a/tests/apis/binance/websocket.rs b/tests/apis/binance/websocket.rs index 0e513231..b376f092 100644 --- a/tests/apis/binance/websocket.rs +++ b/tests/apis/binance/websocket.rs @@ -21,63 +21,63 @@ async fn test_subscription_callback(websocket: BinanceWebsocket, sub: BinanceSub rx.recv().expect("Couldn't receive sync message."); } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn aggregate_trade() { let websocket = init().await; let sub = BinanceSubscription::AggregateTrade("bnbbtc".to_string()); test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn candlestick() { let websocket = init().await; let sub = BinanceSubscription::Candlestick("bnbbtc".to_string(), "1m".to_string()); test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn depth() { let websocket = init().await; let sub = BinanceSubscription::Depth("bnbbtc".to_string(), Some(1)); test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mini_ticker() { let websocket = init().await; let sub = BinanceSubscription::MiniTicker("bnbbtc".to_string()); test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn mini_ticker_all() { let websocket = init().await; let sub = BinanceSubscription::MiniTickerAll; test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn order_book() { let websocket = init().await; let sub = BinanceSubscription::OrderBook("bnbbtc".to_string(), 10); test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ticker() { let websocket = init().await; let sub = BinanceSubscription::Ticker("bnbbtc".to_string()); test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn ticker_all() { let websocket = init().await; let sub = BinanceSubscription::TickerAll; test_subscription_callback(websocket, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn trade() { let websocket = init().await; let sub = BinanceSubscription::Trade("bnbbtc".to_string()); diff --git a/tests/apis/coinbase/websocket.rs b/tests/apis/coinbase/websocket.rs index f6b35aa4..1b2581a3 100644 --- a/tests/apis/coinbase/websocket.rs +++ b/tests/apis/coinbase/websocket.rs @@ -42,7 +42,7 @@ async fn test_subscription_callback( .expect("Couldn't receive sync message."); } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn order_book() { let websocket = init().await; let sub = CoinbaseSubscription::Level2("BTC-USD".to_string()); diff --git a/tests/binance/account.rs b/tests/binance/account.rs index 7921573a..1b51e770 100644 --- a/tests/binance/account.rs +++ b/tests/binance/account.rs @@ -160,6 +160,7 @@ async fn get_order_history() { let exchange = init().await; let req = GetOrderHistoryRequest { market_pair: Some(String::from("BNBBTC")), + order_status: None, paginator: None, }; diff --git a/tests/binance/ws_callbacks.rs b/tests/binance/ws_callbacks.rs index 7cfbbbd9..d5c341b9 100644 --- a/tests/binance/ws_callbacks.rs +++ b/tests/binance/ws_callbacks.rs @@ -20,14 +20,14 @@ async fn test_subscription_callback(websocket: OpenLimitsWs, s rx.recv().expect("Couldn't receive sync message."); } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn orderbook() { let ws = init().await; let sub = Subscription::OrderBookUpdates("bnbbtc".to_string()); test_subscription_callback(ws, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn trades() { let ws = init().await; let sub = Subscription::Trades("btcusdt".to_string()); diff --git a/tests/binance/ws_streams.rs b/tests/binance/ws_streams.rs index 3f6ae385..dc033a0f 100644 --- a/tests/binance/ws_streams.rs +++ b/tests/binance/ws_streams.rs @@ -5,7 +5,7 @@ use openlimits::{ model::websocket::Subscription, }; -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn orderbook() { let ws = init().await; let s = ws @@ -17,7 +17,7 @@ async fn orderbook() { print!("{:?}", ob); } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn trades() { let ws = init().await; let s = ws diff --git a/tests/coinbase/account.rs b/tests/coinbase/account.rs index a0681d3f..fa90b48d 100644 --- a/tests/coinbase/account.rs +++ b/tests/coinbase/account.rs @@ -153,6 +153,7 @@ async fn get_order_history() { let exchange = init().await; let req = GetOrderHistoryRequest { market_pair: Some(String::from("ETH-BTC")), + order_status: None, paginator: None, }; diff --git a/tests/nash/account.rs b/tests/nash/account.rs index 5dcaa654..411a8c8b 100644 --- a/tests/nash/account.rs +++ b/tests/nash/account.rs @@ -197,6 +197,7 @@ async fn get_order_history() { let exchange = init().await; let req = GetOrderHistoryRequest { market_pair: Some(String::from("eth_btc")), + order_status: None, paginator: None, }; diff --git a/tests/nash/websocket.rs b/tests/nash/websocket.rs index 9ddc59c9..e386e9cb 100644 --- a/tests/nash/websocket.rs +++ b/tests/nash/websocket.rs @@ -18,14 +18,14 @@ async fn test_subscription_callback(websocket: OpenLimitsWs, sub: rx.recv().expect("Couldn't receive sync message."); } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn orderbook() { let client = init().await; let sub = Subscription::OrderBookUpdates("btc_usdc".to_string()); test_subscription_callback(client, sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn trades() { let client = init().await; let sub = Subscription::Trades("btc_usdc".to_string()); From a288b1148edeabc4ccc660c82a69220499c98178 Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Tue, 19 Jan 2021 23:12:44 +0100 Subject: [PATCH 08/11] fixes after rebase --- src/coinbase/prelude.rs | 2 +- src/nash/mod.rs | 8 ++++---- tests/any_exchange/reconnection.rs | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/coinbase/prelude.rs b/src/coinbase/prelude.rs index d6d188df..ff66a0db 100644 --- a/src/coinbase/prelude.rs +++ b/src/coinbase/prelude.rs @@ -1 +1 @@ -pub type Result = std::result::Result; \ No newline at end of file +pub type Result = std::result::Result; \ No newline at end of file diff --git a/src/nash/mod.rs b/src/nash/mod.rs index 50d83ebe..5bc385e7 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -516,7 +516,7 @@ impl From for Liquidity { impl TryFrom<&GetHistoricRatesRequest> for nash_protocol::protocol::list_candles::ListCandlesRequest { - type Error = OpenLimitError; + type Error = OpenLimitsError; fn try_from(req: &GetHistoricRatesRequest) -> crate::shared::Result { let (before, limit, range) = try_split_paginator(req.paginator.clone())?; @@ -547,7 +547,7 @@ fn try_split_paginator( paginator.before, match paginator.limit { Some(v) => Some(i64::try_from(v).map_err(|_| { - OpenLimitError::InvalidParameter( + OpenLimitsError::InvalidParameter( "Couldn't convert paginator limit to i64".to_string(), ) })?), @@ -694,7 +694,7 @@ impl From for OrderStatus { } impl TryFrom for nash_protocol::types::OrderStatus { - type Error = OpenLimitError; + type Error = OpenLimitsError; fn try_from(status: OrderStatus) -> crate::shared::Result { Ok(match status { OrderStatus::Filled => nash_protocol::types::OrderStatus::Filled, @@ -702,7 +702,7 @@ impl TryFrom for nash_protocol::types::OrderStatus { OrderStatus::Canceled => nash_protocol::types::OrderStatus::Canceled, OrderStatus::Pending => nash_protocol::types::OrderStatus::Pending, _ => { - return Err(OpenLimitError::InvalidParameter( + return Err(OpenLimitsError::InvalidParameter( "Had invalid order status for Nash".to_string(), )) } diff --git a/tests/any_exchange/reconnection.rs b/tests/any_exchange/reconnection.rs index 64d7ba2c..e66ab588 100644 --- a/tests/any_exchange/reconnection.rs +++ b/tests/any_exchange/reconnection.rs @@ -43,21 +43,21 @@ async fn test_subscription_callback( .expect("Couldn't receive sync."); } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn coinbase() { let client = init_coinbase().await; let sub = Subscription::OrderBookUpdates("BTC-USD".to_string()); test_subscription_callback(client.expect("Couldn't create client."), sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn nash() { let client = init_nash().await; let sub = Subscription::OrderBookUpdates("btc_usdc".to_string()); test_subscription_callback(client.expect("Couldn't create client."), sub).await; } -#[tokio::test(core_threads = 2)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn binance() { let client = init_binance().await; let sub = Subscription::OrderBookUpdates("bnbbtc".to_string()); From a455ee9675f5f848a8fdbd64bf00bbf5defae2ee Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Wed, 20 Jan 2021 01:33:25 +0100 Subject: [PATCH 09/11] nash-rust update --- src/model/python.rs | 1 + src/nash/mod.rs | 77 +++++++----------------------- tests/any_exchange/reconnection.rs | 2 +- tests/any_exchange/websocket.rs | 21 +------- tests/nash/account.rs | 2 +- tests/nash/market.rs | 2 +- tests/nash/websocket.rs | 23 +++++---- 7 files changed, 37 insertions(+), 91 deletions(-) diff --git a/src/model/python.rs b/src/model/python.rs index 161666c2..626717c2 100644 --- a/src/model/python.rs +++ b/src/model/python.rs @@ -207,6 +207,7 @@ impl<'a> FromPyObject<'a> for NashParameters { "sign states loop interval not included in nash credentials", ))? .extract()?; + let sign_states_loop_interval = sign_states_loop_interval.map(Duration::from_millis); Ok(NashParameters { affiliate_code, credentials, diff --git a/src/nash/mod.rs b/src/nash/mod.rs index 5bc385e7..45e0fd22 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -21,8 +21,7 @@ use crate::{ }; use async_trait::async_trait; use chrono::Utc; -pub use nash_native_client::ws_client::client::Client; -pub use nash_native_client::ws_client::client::Environment; +pub use nash_native_client::{Client, Environment}; use rust_decimal::prelude::*; use std::convert::{TryFrom, TryInto}; @@ -43,7 +42,7 @@ pub struct NashParameters { pub client_id: u64, pub environment: Environment, pub timeout: Duration, - pub sign_states_loop_interval: Option, + pub sign_states_loop_interval: Option, } impl Clone for NashParameters { @@ -64,33 +63,35 @@ impl Clone for NashParameters { } async fn client_from_params_failable(params: NashParameters) -> Result { - let out = match params.credentials { + let client = match params.credentials { Some(credentials) => { - Client::from_key_data( + Client::from_keys( &credentials.secret, &credentials.session, params.affiliate_code, params.client_id, params.environment, params.timeout, - params.sign_states_loop_interval, ) - .await + .await? } None => { - Client::new( + Client::from_keys_path( None, params.client_id, None, params.environment, params.timeout, - params.sign_states_loop_interval, ) - .await + .await? } }; - Ok(out.map_err(OpenLimitsError::NashProtocolError)?) + if let Some(interval) = params.sign_states_loop_interval { + client.start_background_state_signing(interval); + } + + Ok(client) } #[async_trait] @@ -159,13 +160,13 @@ impl ExchangeMarketData for Nash { impl ExchangeAccount for Nash { async fn cancel_all_orders(&self, req: &CancelAllOrdersRequest) -> Result> { let req: nash_protocol::protocol::cancel_all_orders::CancelAllOrders = req.into(); - self.transport.run(req).await?; + self.transport.run_http(req).await?; Ok(vec![]) } async fn cancel_order(&self, req: &CancelOrderRequest) -> Result { let req: nash_protocol::protocol::cancel_order::CancelOrderRequest = req.into(); - let resp = self.transport.run(req).await; + let resp = self.transport.run_http(req).await; Ok( Nash::unwrap_response::( resp, @@ -267,7 +268,7 @@ impl ExchangeAccount for Nash { let req: nash_protocol::protocol::place_order::LimitOrderRequest = Nash::convert_limit_order(req, nash_protocol::types::BuyOrSell::Buy); - let resp = self.transport.run(req).await; + let resp = self.transport.run_http(req).await; Ok( Nash::unwrap_response::( @@ -280,7 +281,7 @@ impl ExchangeAccount for Nash { async fn limit_sell(&self, req: &OpenLimitOrderRequest) -> Result { let req: nash_protocol::protocol::place_order::LimitOrderRequest = Nash::convert_limit_order(req, nash_protocol::types::BuyOrSell::Sell); - let resp = self.transport.run(req).await; + let resp = self.transport.run_http(req).await; Ok( Nash::unwrap_response::( @@ -294,7 +295,7 @@ impl ExchangeAccount for Nash { let req: nash_protocol::protocol::place_order::MarketOrderRequest = Nash::convert_market_request(req); - let resp = self.transport.run(req).await; + let resp = self.transport.run_http(req).await; Ok( Nash::unwrap_response::( resp, @@ -770,50 +771,6 @@ pub struct NashWebsocket { pub client: Client, } -impl NashWebsocket { - pub async fn public( - client_id: u64, - environment: Environment, - timeout: Duration, - sign_states_loop_interval: Option, - ) -> Self { - NashWebsocket { - client: Client::new( - None, - client_id, - None, - environment, - timeout, - sign_states_loop_interval, - ) - .await - .expect("Couldn't create Client."), - } - } - - pub async fn with_credential( - secret: &str, - session: &str, - client_id: u64, - environment: Environment, - timeout: Duration, - sign_states_loop_interval: Option, - ) -> Result { - Client::from_key_data( - secret, - session, - None, - client_id, - environment, - timeout, - sign_states_loop_interval, - ) - .await - .map(|client| NashWebsocket { client }) - .map_err(OpenLimitsError::NashProtocolError) - } -} - impl Stream for NashWebsocket { type Item = std::result::Result< ResponseOrError, diff --git a/tests/any_exchange/reconnection.rs b/tests/any_exchange/reconnection.rs index e66ab588..86bec35a 100644 --- a/tests/any_exchange/reconnection.rs +++ b/tests/any_exchange/reconnection.rs @@ -1,4 +1,4 @@ -use nash_native_client::ws_client::client::Environment; +use nash_native_client::Environment; use openlimits::shared::Result; use openlimits::{model::websocket::Subscription, nash::NashWebsocket}; use tokio::time::Duration; diff --git a/tests/any_exchange/websocket.rs b/tests/any_exchange/websocket.rs index 170a135e..db7d42bd 100644 --- a/tests/any_exchange/websocket.rs +++ b/tests/any_exchange/websocket.rs @@ -3,14 +3,14 @@ // supported exchanges. use dotenv::dotenv; -use nash_native_client::ws_client::client::Environment; +use nash_native_client::Environment; use openlimits::any_exchange::{AnyExchange, AnyWsExchange}; use openlimits::binance::{Binance, BinanceCredentials, BinanceParameters}; use openlimits::coinbase::client::websocket::CoinbaseWebsocket; use openlimits::coinbase::{Coinbase, CoinbaseCredentials, CoinbaseParameters}; use openlimits::exchange::OpenLimits; use openlimits::exchange_ws::OpenLimitsWs; -use openlimits::nash::{Nash, NashCredentials, NashParameters, NashWebsocket}; +use openlimits::nash::{Nash, NashCredentials, NashParameters}; use openlimits::shared::Result; use std::env; use tokio::time::Duration; @@ -68,23 +68,6 @@ async fn init() -> Result { coinbase().await.map(|exchange| exchange.into()) } -async fn _nash_websocket() -> OpenLimitsWs { - dotenv().ok(); - - let websocket = NashWebsocket::with_credential( - &env::var("NASH_API_SECRET").unwrap(), - &env::var("NASH_API_KEY").unwrap(), - 1234, - Environment::Sandbox, - Duration::from_secs_f32(10.0), - None, - ) - .await - .expect("Couldn't connect."); - - OpenLimitsWs { websocket } -} - async fn coinbase_websocket() -> OpenLimitsWs { dotenv().ok(); diff --git a/tests/nash/account.rs b/tests/nash/account.rs index 411a8c8b..59d98670 100644 --- a/tests/nash/account.rs +++ b/tests/nash/account.rs @@ -1,6 +1,6 @@ use chrono::Duration; use dotenv::dotenv; -use nash_native_client::ws_client::client::Environment; +use nash_native_client::Environment; use openlimits::{ exchange::{ExchangeAccount, OpenLimits}, model::OpenMarketOrderRequest, diff --git a/tests/nash/market.rs b/tests/nash/market.rs index 86a0bfd6..19a5a86a 100644 --- a/tests/nash/market.rs +++ b/tests/nash/market.rs @@ -1,4 +1,4 @@ -use nash_native_client::ws_client::client::Environment; +use nash_native_client::Environment; use openlimits::{ exchange::{ExchangeMarketData, OpenLimits}, exchange_info::ExchangeInfoRetrieval, diff --git a/tests/nash/websocket.rs b/tests/nash/websocket.rs index e386e9cb..ba445852 100644 --- a/tests/nash/websocket.rs +++ b/tests/nash/websocket.rs @@ -1,8 +1,10 @@ use dotenv::dotenv; -use nash_native_client::ws_client::client::Environment; +use nash_native_client::Environment; use openlimits::{exchange_ws::OpenLimitsWs, model::websocket::Subscription, nash::NashWebsocket}; use std::{env, sync::mpsc::sync_channel}; use tokio::time::Duration; +use openlimits::nash::{NashParameters, NashCredentials}; +use openlimits::exchange_ws::ExchangeWs; async fn test_subscription_callback(websocket: OpenLimitsWs, sub: Subscription) { let (tx, rx) = sync_channel(0); @@ -35,14 +37,17 @@ async fn trades() { async fn init() -> OpenLimitsWs { dotenv().ok(); - let websocket = NashWebsocket::with_credential( - &env::var("NASH_API_SECRET").expect("Couldn't get environment variable."), - &env::var("NASH_API_KEY").expect("Couldn't get environment variable."), - 1234, - Environment::Sandbox, - Duration::new(10, 0), - None, - ) + let websocket = NashWebsocket::new(NashParameters { + credentials: Some(NashCredentials { + secret: env::var("NASH_API_SECRET").expect("Couldn't get environment variable."), + session: env::var("NASH_API_KEY").expect("Couldn't get environment variable."), + }), + affiliate_code: None, + client_id: 1234, + environment: Environment::Sandbox, + timeout: Duration::from_secs(10), + sign_states_loop_interval: None, + }) .await .expect("Couldn't connect."); From 55a21b7f8d2b1ac33e7f53fc24fc9a0db3e4c4e1 Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Wed, 20 Jan 2021 13:03:06 +0100 Subject: [PATCH 10/11] upgrade to tokio 1.0 --- Cargo.toml | 9 +++++---- src/nash/mod.rs | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 499c99db..97b87a13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,16 +27,17 @@ futures-util = "0.3" hex = "0.4.2" hmac = "0.8.1" log = "0.4.8" -reqwest = { version = "0.10", features = ["json", "blocking"] } +reqwest = { version = "0.11", features = ["json", "blocking"] } rust_decimal = "1.7.0" sugar = "0.2.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_urlencoded = "0.6.1" thiserror = "1.0.20" -tokio = { version = "0.3", features = ["full"] } -tokio-tungstenite = { version = "0.12", features = ["tls"] } -tungstenite = "0.11.1" +tokio = { version = "1.0", features = ["full"] } +tokio-stream = "0.1" +tokio-tungstenite = { version = "0.13", features = ["tls"] } +tungstenite = "0.12" sha2 = "0.9.1" url = "2.1.1" derive_more = "0.99" diff --git a/src/nash/mod.rs b/src/nash/mod.rs index 45e0fd22..cb87ad72 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -807,7 +807,7 @@ impl ExchangeWs for NashWebsocket { for subscription in subscriptions.into_iter() { let stream = self.client.subscribe_protocol(subscription).await?; - streams.push(stream); + streams.push(tokio_stream::wrappers::UnboundedReceiverStream::new(stream)); } let s = streams.map(|message| match message { From d646d85d484d79ddba6b11cc04d9a4f9c5cc3e55 Mon Sep 17 00:00:00 2001 From: Marvin Schubert Date: Thu, 21 Jan 2021 23:46:07 +0100 Subject: [PATCH 11/11] PR fixes --- Cargo.toml | 2 +- src/coinbase/prelude.rs | 2 +- src/nash/mod.rs | 11 +++++------ tests/nash/account.rs | 19 ------------------- 4 files changed, 7 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 97b87a13..64a6b8ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "openlimits" -version = "0.1.7" +version = "0.1.6" authors = ["steffel <2143646+steffenix@users.noreply.github.com>", "Ethan Fast "] edition = "2018" description = "A open source Rust high performance cryptocurrency trading API with support for multiple exchanges and language wrappers. Focused in safety, correctness and speed." diff --git a/src/coinbase/prelude.rs b/src/coinbase/prelude.rs index ff66a0db..9ea37cc5 100644 --- a/src/coinbase/prelude.rs +++ b/src/coinbase/prelude.rs @@ -1 +1 @@ -pub type Result = std::result::Result; \ No newline at end of file +pub type Result = std::result::Result; diff --git a/src/nash/mod.rs b/src/nash/mod.rs index cb87ad72..f4ba830f 100644 --- a/src/nash/mod.rs +++ b/src/nash/mod.rs @@ -467,13 +467,12 @@ impl From for Trade { let price = Decimal::from_str(&resp.limit_price.to_string()) .expect("Couldn't parse Decimal from string."); - let (fees, _order_id) = match resp.account_side { - nash_protocol::types::AccountTradeSide::Taker => ( + let fees = match resp.account_side { + nash_protocol::types::AccountTradeSide::Taker => { Decimal::from_str(&resp.taker_fee.to_string()) - .expect("Couldn't parse Decimal from string."), - resp.taker_order_id.clone(), - ), - _ => (Decimal::from(0), resp.maker_order_id.clone()), + .expect("Couldn't parse Decimal from string.") + } + _ => Decimal::from(0), }; let (buyer_order_id, seller_order_id) = match resp.direction { diff --git a/tests/nash/account.rs b/tests/nash/account.rs index 59d98670..e9d21d9d 100644 --- a/tests/nash/account.rs +++ b/tests/nash/account.rs @@ -14,7 +14,6 @@ use openlimits::{ }; use rust_decimal::prelude::{Decimal, FromStr}; use std::env; -use std::sync::Arc; use std::time::Duration as NativeDuration; // FIXME: https://github.com/nash-io/openlimits/issues/157 @@ -197,7 +196,6 @@ async fn get_order_history() { let exchange = init().await; let req = GetOrderHistoryRequest { market_pair: Some(String::from("eth_btc")), - order_status: None, paginator: None, }; @@ -208,23 +206,6 @@ async fn get_order_history() { println!("{:?}", resp); } -#[tokio::test] -async fn test_concurrent_requests() { - let exchange = init().await; - let client = Arc::new(exchange); - async fn make_request(client: Arc, i: u64) { - let req = client.get_account_balances(None).await; - if !req.is_err() { - println!("completed req {}", i); - } - } - let mut tasks = Vec::new(); - for i in 0..10 { - tasks.push(tokio::spawn(make_request(client.clone(), i))); - } - futures::future::join_all(tasks).await; -} - // #[tokio::test] // async fn get_all_open_orders() { // let mut exchange = init().await;