From ec152a76d42fc95112f488882f095c853be34a95 Mon Sep 17 00:00:00 2001 From: Steven Date: Fri, 3 Sep 2021 16:42:30 +0800 Subject: [PATCH] refactor: replace orignal `restapi` with `openapi` (#305) * Migrates Open API code to REST API. * Fixes the fmt issue. --- Cargo.toml | 4 - src/bin/openapi.rs | 97 ------ src/bin/restapi.rs | 109 +++--- src/lib.rs | 1 - src/openapi/manage.rs | 100 ------ src/openapi/mod.rs | 5 - src/openapi/personal_history.rs | 198 ----------- src/openapi/public_history.rs | 106 ------ src/openapi/tradingview.rs | 601 -------------------------------- src/openapi/user.rs | 33 -- src/restapi/manage.rs | 49 +-- src/restapi/personal_history.rs | 46 +-- src/restapi/public_history.rs | 40 ++- src/restapi/tradingview.rs | 100 +++--- src/restapi/user.rs | 14 +- 15 files changed, 193 insertions(+), 1310 deletions(-) delete mode 100644 src/bin/openapi.rs delete mode 100644 src/openapi/manage.rs delete mode 100644 src/openapi/mod.rs delete mode 100644 src/openapi/personal_history.rs delete mode 100644 src/openapi/public_history.rs delete mode 100644 src/openapi/tradingview.rs delete mode 100644 src/openapi/user.rs diff --git a/Cargo.toml b/Cargo.toml index 361ea2e2..5ebba245 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,10 +44,6 @@ tracing-appender = "0.1" tracing-subscriber = "0.2" ttl_cache = "0.5.1" -[[bin]] -name = "openapi" -path = "src/bin/openapi.rs" - [[bin]] name = "restapi" path = "src/bin/restapi.rs" diff --git a/src/bin/openapi.rs b/src/bin/openapi.rs deleted file mode 100644 index f8813c01..00000000 --- a/src/bin/openapi.rs +++ /dev/null @@ -1,97 +0,0 @@ -use actix_web::{App, HttpServer}; -use dingir_exchange::openapi::manage::market; -use dingir_exchange::openapi::personal_history::my_internal_txs; -use dingir_exchange::openapi::public_history::{order_trades, recent_trades}; -use dingir_exchange::openapi::tradingview::{chart_config, history, search_symbols, symbols, ticker, unix_timestamp}; -use dingir_exchange::openapi::user::get_user; -use dingir_exchange::restapi::state::{AppCache, AppState}; -use fluidex_common::non_blocking_tracing; -use paperclip::actix::web::{self, HttpResponse}; -use paperclip::actix::{api_v2_operation, OpenApiExt}; -use sqlx::postgres::Postgres; -use sqlx::Pool; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::sync::Mutex; - -#[actix_web::main] -async fn main() -> std::io::Result<()> { - dotenv::dotenv().ok(); - let _guard = non_blocking_tracing::setup(); - - let db_url = dingir_exchange::config::Settings::new().db_history; - log::debug!("Prepared DB connection: {}", &db_url); - - let config = dingir_exchange::restapi::config::Settings::default(); - let manage_channel = if let Some(ep_str) = &config.manage_endpoint { - log::info!("Connect to manage channel {}", ep_str); - Some( - tonic::transport::Endpoint::try_from(ep_str.clone()) - .ok() - .unwrap() - .connect() - .await - .unwrap(), - ) - } else { - None - }; - - let user_map = web::Data::new(AppState { - user_addr_map: Mutex::new(HashMap::new()), - manage_channel, - db: Pool::::connect(&db_url).await.unwrap(), - config, - }); - - let workers = user_map.config.workers; - - let server = HttpServer::new(move || { - App::new() - .app_data(user_map.clone()) - .app_data(AppCache::new()) - .wrap_api() - .service( - web::scope("/openapi") - .route("/ping", web::get().to(ping)) - .route("/user/{l1addr_or_l2pubkey}", web::get().to(get_user)) - .route("/recenttrades/{market}", web::get().to(recent_trades)) - .route("/ordertrades/{market}/{order_id}", web::get().to(order_trades)) - .route("/internal_txs/{user_id}", web::get().to(my_internal_txs)) - .route("/ticker_{ticker_inv}/{market}", web::get().to(ticker)) - .service( - web::scope("/tradingview") - .route("/time", web::get().to(unix_timestamp)) - .route("/config", web::get().to(chart_config)) - .route("/search", web::get().to(search_symbols)) - .route("/symbols", web::get().to(symbols)) - .route("/history", web::get().to(history)), - ) - .service(if user_map.manage_channel.is_some() { - web::scope("/manage").service( - web::scope("/market") - .route("/reload", web::post().to(market::reload)) - .route("/tradepairs", web::post().to(market::add_pair)) - .route("/assets", web::post().to(market::add_assets)), - ) - } else { - web::scope("/manage") - .service(web::resource("/").to(|| HttpResponse::Forbidden().body(String::from("No manage endpoint")))) - }), - ) - .with_json_spec_at("/api/spec") - .build() - }); - - let server = match workers { - Some(wr) => server.workers(wr), - None => server, - }; - - server.bind("0.0.0.0:50054")?.run().await -} - -#[api_v2_operation] -async fn ping() -> Result<&'static str, actix_web::Error> { - Ok("pong") -} diff --git a/src/bin/restapi.rs b/src/bin/restapi.rs index 23e7de45..69f979ba 100644 --- a/src/bin/restapi.rs +++ b/src/bin/restapi.rs @@ -1,41 +1,28 @@ -#![allow(dead_code)] -#![allow(clippy::collapsible_if)] -#![allow(clippy::let_and_return)] -#![allow(clippy::too_many_arguments)] -#![allow(clippy::single_char_pattern)] - -use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder}; +use actix_web::{App, HttpServer}; +use dingir_exchange::restapi::manage::market; +use dingir_exchange::restapi::personal_history::{my_internal_txs, my_orders}; +use dingir_exchange::restapi::public_history::{order_trades, recent_trades}; +use dingir_exchange::restapi::state::{AppCache, AppState}; +use dingir_exchange::restapi::tradingview::{chart_config, history, search_symbols, symbols, ticker, unix_timestamp}; +use dingir_exchange::restapi::user::get_user; use fluidex_common::non_blocking_tracing; +use paperclip::actix::web::{self, HttpResponse}; +use paperclip::actix::{api_v2_operation, OpenApiExt}; use sqlx::postgres::Postgres; use sqlx::Pool; use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Mutex; -use dingir_exchange::restapi; - -use restapi::manage::market; -use restapi::personal_history::{my_internal_txs, my_orders}; -use restapi::public_history::{order_trades, recent_trades}; -use restapi::state::{AppCache, AppState}; -use restapi::tradingview::{chart_config, history, search_symbols, symbols, ticker, unix_timestamp}; -use restapi::user::get_user; - -async fn ping(_req: HttpRequest, _data: web::Data) -> impl Responder { - "pong" -} - #[actix_web::main] async fn main() -> std::io::Result<()> { dotenv::dotenv().ok(); let _guard = non_blocking_tracing::setup(); - // TODO: Should add another `config/restapi.yaml` for this target? - let dburl = dingir_exchange::config::Settings::new().db_history; - log::debug!("Prepared db connection: {}", &dburl); - - let config = restapi::config::Settings::default(); + let db_url = dingir_exchange::config::Settings::new().db_history; + log::debug!("Prepared DB connection: {}", &db_url); + let config = dingir_exchange::restapi::config::Settings::default(); let manage_channel = if let Some(ep_str) = &config.manage_endpoint { log::info!("Connect to manage channel {}", ep_str); Some( @@ -53,47 +40,59 @@ async fn main() -> std::io::Result<()> { let user_map = web::Data::new(AppState { user_addr_map: Mutex::new(HashMap::new()), manage_channel, - db: Pool::::connect(&dburl).await.unwrap(), + db: Pool::::connect(&db_url).await.unwrap(), config, }); let workers = user_map.config.workers; let server = HttpServer::new(move || { - App::new().app_data(user_map.clone()).app_data(AppCache::new()).service( - web::scope("/restapi") - .route("/ping", web::get().to(ping)) - .route("/user/{l1addr_or_l2pubkey}", web::get().to(get_user)) - .route("/recenttrades/{market}", web::get().to(recent_trades)) - .route("/ordertrades/{market}/{order_id}", web::get().to(order_trades)) - .route("/closedorders/{market}/{user_id}", web::get().to(my_orders)) - .route("/internal_txs/{user_id}", web::get().to(my_internal_txs)) - .route("/ticker_{ticker_inv}/{market}", web::get().to(ticker)) - .service( - web::scope("/tradingview") - .route("/time", web::get().to(unix_timestamp)) - .route("/config", web::get().to(chart_config)) - .route("/search", web::get().to(search_symbols)) - .route("/symbols", web::get().to(symbols)) - .route("/history", web::get().to(history)), - ) - .service(if user_map.manage_channel.is_some() { - web::scope("/manage").service( - web::scope("/market") - .route("/reload", web::post().to(market::reload)) - .route("/tradepairs", web::post().to(market::add_pair)) - .route("/assets", web::post().to(market::add_assets)), + App::new() + .app_data(user_map.clone()) + .app_data(AppCache::new()) + .wrap_api() + .service( + web::scope("/restapi") + .route("/ping", web::get().to(ping)) + .route("/user/{l1addr_or_l2pubkey}", web::get().to(get_user)) + .route("/recenttrades/{market}", web::get().to(recent_trades)) + .route("/ordertrades/{market}/{order_id}", web::get().to(order_trades)) + .route("/closedorders/{market}/{user_id}", web::get().to(my_orders)) + .route("/internal_txs/{user_id}", web::get().to(my_internal_txs)) + .route("/ticker_{ticker_inv}/{market}", web::get().to(ticker)) + .service( + web::scope("/tradingview") + .route("/time", web::get().to(unix_timestamp)) + .route("/config", web::get().to(chart_config)) + .route("/search", web::get().to(search_symbols)) + .route("/symbols", web::get().to(symbols)) + .route("/history", web::get().to(history)), ) - } else { - web::scope("/manage") - .service(web::resource("/").to(|| HttpResponse::Forbidden().body(String::from("No manage endpoint")))) - }), - ) + .service(if user_map.manage_channel.is_some() { + web::scope("/manage").service( + web::scope("/market") + .route("/reload", web::post().to(market::reload)) + .route("/tradepairs", web::post().to(market::add_pair)) + .route("/assets", web::post().to(market::add_assets)), + ) + } else { + web::scope("/manage") + .service(web::resource("/").to(|| HttpResponse::Forbidden().body(String::from("No manage endpoint")))) + }), + ) + .with_json_spec_at("/api/spec") + .build() }); let server = match workers { Some(wr) => server.workers(wr), None => server, }; - server.bind(("0.0.0.0", 50053))?.run().await + + server.bind("0.0.0.0:50053")?.run().await +} + +#[api_v2_operation] +async fn ping() -> Result<&'static str, actix_web::Error> { + Ok("pong") } diff --git a/src/lib.rs b/src/lib.rs index fc2a210c..a66eb58b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,6 @@ pub mod storage; pub use storage::{database, models, sqlxextend}; pub mod config; pub mod message; -pub mod openapi; pub mod restapi; pub mod types; pub mod utils; diff --git a/src/openapi/manage.rs b/src/openapi/manage.rs deleted file mode 100644 index 3c75e18c..00000000 --- a/src/openapi/manage.rs +++ /dev/null @@ -1,100 +0,0 @@ -use crate::restapi::{state, types}; -use crate::storage; -use actix_web::error::InternalError; -use actix_web::http::StatusCode; -use futures::future::OptionFuture; -use orchestra::rpc::exchange::*; -use paperclip::actix::api_v2_operation; -use paperclip::actix::web; - -pub mod market { - use super::*; - - async fn do_reload(app_state: &state::AppState) -> Result<&'static str, actix_web::Error> { - let mut rpc_cli = matchengine_client::MatchengineClient::new(app_state.manage_channel.as_ref().unwrap().clone()); - - if let Err(e) = rpc_cli.reload_markets(ReloadMarketsRequest { from_scratch: false }).await { - return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); - } - - Ok("done") - } - - #[api_v2_operation] - pub async fn add_assets( - req: web::Json, - app_state: web::Data, - ) -> Result<&'static str, actix_web::Error> { - let assets_req = req.into_inner(); - - for asset in &assets_req.assets { - log::debug!("Add asset {:?}", asset); - if let Err(e) = storage::config::persist_asset_to_db(&app_state.db, asset, false).await { - return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); - } - } - - if !assets_req.not_reload { - do_reload(&app_state.into_inner()).await - } else { - Ok("done") - } - } - - #[api_v2_operation] - pub async fn reload(app_state: web::Data) -> Result<&'static str, actix_web::Error> { - do_reload(&app_state.into_inner()).await - } - - #[api_v2_operation] - pub async fn add_pair( - req: web::Json, - app_state: web::Data, - ) -> Result<&'static str, actix_web::Error> { - let trade_pair = req.into_inner(); - - if let Some(asset) = trade_pair.asset_base.as_ref() { - if asset.id != trade_pair.market.base { - return Err(InternalError::new("Base asset not match".to_owned(), StatusCode::BAD_REQUEST).into()); - } - } - - if let Some(asset) = trade_pair.asset_quote.as_ref() { - if asset.id != trade_pair.market.quote { - return Err(InternalError::new("Quote asset not match".to_owned(), StatusCode::BAD_REQUEST).into()); - } - } - - if let Some(Err(e)) = OptionFuture::from( - trade_pair - .asset_base - .as_ref() - .map(|base_asset| storage::config::persist_asset_to_db(&app_state.db, base_asset, false)), - ) - .await - { - return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); - } - - if let Some(Err(e)) = OptionFuture::from( - trade_pair - .asset_quote - .as_ref() - .map(|quote_asset| storage::config::persist_asset_to_db(&app_state.db, quote_asset, false)), - ) - .await - { - return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); - } - - if let Err(e) = storage::config::persist_market_to_db(&app_state.db, &trade_pair.market).await { - return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); - } - - if !trade_pair.not_reload { - do_reload(&app_state.into_inner()).await - } else { - Ok("done") - } - } -} diff --git a/src/openapi/mod.rs b/src/openapi/mod.rs deleted file mode 100644 index 1f953b35..00000000 --- a/src/openapi/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod manage; -pub mod personal_history; -pub mod public_history; -pub mod tradingview; -pub mod user; diff --git a/src/openapi/personal_history.rs b/src/openapi/personal_history.rs deleted file mode 100644 index 17a39df7..00000000 --- a/src/openapi/personal_history.rs +++ /dev/null @@ -1,198 +0,0 @@ -use crate::models::tablenames::{ACCOUNT, INTERNALTX, ORDERHISTORY}; -use crate::models::{DecimalDbType, OrderHistory, TimestampDbType}; -use crate::restapi::errors::RpcError; -use crate::restapi::state::AppState; -use core::cmp::min; -use paperclip::actix::web::{self, HttpRequest, Json}; -use paperclip::actix::{api_v2_operation, Apiv2Schema}; -use serde::{Deserialize, Deserializer, Serialize}; - -#[derive(Serialize, Deserialize, Apiv2Schema)] -pub struct OrderResponse { - total: i64, - orders: Vec, -} - -#[api_v2_operation] -pub async fn my_orders(req: HttpRequest, data: web::Data) -> Result, actix_web::Error> { - let market = req.match_info().get("market").unwrap(); - let user_id = req.match_info().get("user_id").unwrap_or_default().parse::(); - let user_id = match user_id { - Err(_) => { - return Err(RpcError::bad_request("invalid user_id").into()); - } - _ => user_id.unwrap(), - }; - let qstring = qstring::QString::from(req.query_string()); - let limit = min(100, qstring.get("limit").unwrap_or_default().parse::().unwrap_or(20)); - let offset = qstring.get("offset").unwrap_or_default().parse::().unwrap_or(0); - - let table = ORDERHISTORY; - let condition = if market == "all" { - "user_id = $1".to_string() - } else { - "market = $1 and user_id = $2".to_string() - }; - let order_query = format!( - "select * from {} where {} order by id desc limit {} offset {}", - table, condition, limit, offset - ); - let orders: Vec = if market == "all" { - sqlx::query_as(&order_query).bind(user_id) - } else { - sqlx::query_as(&order_query).bind(market).bind(user_id) - } - .fetch_all(&data.db) - .await - .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; - let count_query = format!("select count(*) from {} where {}", table, condition); - let total: i64 = if market == "all" { - sqlx::query_scalar(&count_query).bind(user_id) - } else { - sqlx::query_scalar(&count_query).bind(market).bind(user_id) - } - .fetch_one(&data.db) - .await - .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; - Ok(Json(OrderResponse { total, orders })) -} - -#[derive(sqlx::FromRow, Serialize, Deserialize, Apiv2Schema)] -pub struct InternalTxResponse { - time: TimestampDbType, - user_from: String, - user_to: String, - asset: String, - amount: DecimalDbType, -} - -#[derive(Copy, Clone, Debug, Deserialize, Apiv2Schema)] -pub enum Order { - #[serde(rename = "lowercase")] - Asc, - #[serde(rename = "lowercase")] - Desc, -} - -impl Default for Order { - fn default() -> Self { - Self::Desc - } -} - -#[derive(Copy, Clone, Debug, Deserialize, Apiv2Schema)] -pub enum Side { - #[serde(rename = "lowercase")] - From, - #[serde(rename = "lowercase")] - To, - #[serde(rename = "lowercase")] - Both, -} - -impl Default for Side { - fn default() -> Self { - Self::Both - } -} - -#[derive(Debug, Deserialize, Apiv2Schema)] -pub struct InternalTxQuery { - /// limit with default value of 20 and max value of 100. - #[serde(default = "default_limit")] - limit: usize, - /// offset with default value of 0. - #[serde(default = "default_zero")] - offset: usize, - #[serde(default, deserialize_with = "u64_timestamp_deserializer")] - start_time: Option, - #[serde(default, deserialize_with = "u64_timestamp_deserializer")] - end_time: Option, - #[serde(default)] - order: Order, - #[serde(default)] - side: Side, -} - -fn u64_timestamp_deserializer<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - let timestamp = Option::::deserialize(deserializer)?; - Ok(timestamp.map(|ts| TimestampDbType::from_timestamp(ts as i64, 0))) -} - -const fn default_limit() -> usize { - 20 -} -const fn default_zero() -> usize { - 0 -} - -/// `/internal_txs/{user_id}` -#[api_v2_operation] -pub async fn my_internal_txs( - user_id: web::Path, - query: web::Query, - data: web::Data, -) -> Result>, actix_web::Error> { - let user_id = user_id.into_inner(); - let limit = min(query.limit, 100); - - let base_query: &'static str = const_format::formatcp!( - r#" -select i.time as time, - af.l2_pubkey as user_from, - at.l2_pubkey as user_to, - i.asset as asset, - i.amount as amount -from {} i -inner join {} af on af.id = i.user_from -inner join {} at on at.id = i.user_to -where "#, - INTERNALTX, - ACCOUNT, - ACCOUNT - ); - let (user_condition, args_n) = match query.side { - Side::From => ("i.user_from = $1", 1), - Side::To => ("i.user_to = $1", 1), - Side::Both => ("i.user_from = $1 or i.user_to = $2", 2), - }; - - let time_condition = match (query.start_time, query.end_time) { - (Some(_), Some(_)) => Some(format!("i.time >= ${} and i.time <= ${}", args_n + 1, args_n + 2)), - (Some(_), None) => Some(format!("i.time >= ${}", args_n + 1)), - (None, Some(_)) => Some(format!("i.time <= ${}", args_n + 1)), - (None, None) => None, - }; - - let condition = match time_condition { - Some(time_condition) => format!("({}) and {}", user_condition, time_condition), - None => user_condition.to_string(), - }; - - let constraint = format!("limit {} offset {}", limit, query.offset); - let sql_query = format!("{}{}{}", base_query, condition, constraint); - - let query_as = sqlx::query_as(sql_query.as_str()); - - let query_as = match query.side { - Side::To | Side::From => query_as.bind(user_id), - Side::Both => query_as.bind(user_id).bind(user_id), - }; - - let query_as = match (query.start_time, query.end_time) { - (Some(start_time), Some(end_time)) => query_as.bind(start_time).bind(end_time), - (Some(start_time), None) => query_as.bind(start_time), - (None, Some(end_time)) => query_as.bind(end_time), - (None, None) => query_as, - }; - - let txs: Vec = query_as - .fetch_all(&data.db) - .await - .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; - - Ok(Json(txs)) -} diff --git a/src/openapi/public_history.rs b/src/openapi/public_history.rs deleted file mode 100644 index 353c4254..00000000 --- a/src/openapi/public_history.rs +++ /dev/null @@ -1,106 +0,0 @@ -use crate::models::tablenames::{MARKETTRADE, USERTRADE}; -use crate::models::{self, DecimalDbType, TimestampDbType}; -use crate::restapi::errors::RpcError; -use crate::restapi::state::AppState; -use crate::restapi::types; -use chrono::{DateTime, SecondsFormat, Utc}; -use core::cmp::min; -use paperclip::actix::api_v2_operation; -use paperclip::actix::web::{self, HttpRequest, Json}; - -fn check_market_exists(_market: &str) -> bool { - // TODO - true -} - -#[api_v2_operation] -pub async fn recent_trades(req: HttpRequest, data: web::Data) -> Result>, actix_web::Error> { - let market = req.match_info().get("market").unwrap(); - let qstring = qstring::QString::from(req.query_string()); - let limit = min(100, qstring.get("limit").unwrap_or_default().parse::().unwrap_or(20)); - log::debug!("recent_trades market {} limit {}", market, limit); - if !check_market_exists(market) { - return Err(RpcError::bad_request("invalid market").into()); - } - - // TODO: this API result should be cached, either in-memory or using redis - - // Here we use the kline trade table, which is more market-centric - // and more suitable for fetching latest trades on a market. - // models::UserTrade is designed for a user to fetch his trades. - - let sql_query = format!("select * from {} where market = $1 order by time desc limit {}", MARKETTRADE, limit); - - let trades: Vec = sqlx::query_as(&sql_query) - .bind(market) - .fetch_all(&data.db) - .await - .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; - - log::debug!("query {} recent_trades records", trades.len()); - Ok(Json(trades)) -} - -#[derive(sqlx::FromRow, Debug, Clone)] -struct QueriedUserTrade { - pub time: TimestampDbType, - pub user_id: i32, - pub trade_id: i64, - pub order_id: i64, - pub price: DecimalDbType, - pub amount: DecimalDbType, - pub quote_amount: DecimalDbType, - pub fee: DecimalDbType, -} - -#[cfg(sqlxverf)] -fn sqlverf_ticker() -> impl std::any::Any { - sqlx::query_as!( - QueriedUserTrade, - "select time, user_id, trade_id, order_id, - price, amount, quote_amount, fee - from user_trade where market = $1 and order_id = $2 - order by trade_id, time asc", - "USDT_ETH", - 10000, - ) -} - -#[api_v2_operation] -pub async fn order_trades( - app_state: web::Data, - path: web::Path<(String, i64)>, -) -> Result, actix_web::Error> { - let (market_name, order_id): (String, i64) = path.into_inner(); - log::debug!("order_trades market {} order_id {}", market_name, order_id); - - let sql_query = format!( - " - select time, user_id, trade_id, order_id, - price, amount, quote_amount, fee - from {} where market = $1 and order_id = $2 - order by trade_id, time asc", - USERTRADE - ); - - let trades: Vec = sqlx::query_as(&sql_query) - .bind(market_name) - .bind(order_id) - .fetch_all(&app_state.db) - .await - .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; - - Ok(Json(types::OrderTradeResult { - trades: trades - .into_iter() - .map(|v| types::MarketTrade { - trade_id: v.trade_id, - time: DateTime::::from_utc(v.time, Utc).to_rfc3339_opts(SecondsFormat::Secs, true), - amount: v.amount.to_string(), - quote_amount: v.quote_amount.to_string(), - price: v.price.to_string(), - fee: v.fee.to_string(), - }) - .collect(), - })) -} diff --git a/src/openapi/tradingview.rs b/src/openapi/tradingview.rs deleted file mode 100644 index 5a1d3b56..00000000 --- a/src/openapi/tradingview.rs +++ /dev/null @@ -1,601 +0,0 @@ -use crate::models::tablenames::{MARKET, MARKETTRADE}; -use crate::models::MarketDesc; -use crate::restapi::errors::RpcError; -use crate::restapi::types::{KlineReq, KlineResult, TickerResult}; -use crate::restapi::{mock, state}; -use actix_web::Responder; -use humantime::parse_duration; -use paperclip::actix::web::{self, HttpRequest, Json}; -use paperclip::actix::{api_v2_operation, Apiv2Schema}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -// All APIs here follow https://zlq4863947.gitbook.io/tradingview/3-shu-ju-bang-ding/udf - -#[api_v2_operation] -pub async fn unix_timestamp(_req: HttpRequest) -> impl Responder { - format!("{}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) -} - -static DEFAULT_EXCHANGE: &str = "test"; -static DEFAULT_SYMBOL: &str = "tradepair"; -static DEFAULT_SESSION: &str = "24x7"; - -#[api_v2_operation] -pub async fn chart_config(_req: HttpRequest) -> impl Responder { - log::debug!("request config"); - let value = json!({ - "supports_search": true, - "supports_group_request": false, - "supports_marks": false, - "supports_timescale_marks": false, - "supports_time": true, - "exchanges": [ - { - - "value": "test", - "name": "Test Zone", - "desc": "Current default exchange" - } - ], - "symbols_types": [], - "supported_resolutions": [1, 5, 15, 30, 60, 120, 240, 360, 720, 1440, 4320, 10080] // minutes - }); - value.to_string() -} - -#[derive(Deserialize, Apiv2Schema)] -pub struct SymbolQueryReq { - symbol: String, -} - -#[derive(Serialize, Apiv2Schema)] -pub struct Symbol { - name: String, - ticker: String, - #[serde(rename = "type")] - sym_type: String, - session: String, - exchange: String, - listed_exchange: String, - //TODO: we can use a enum - timezone: String, - minmov: u32, - pricescale: u32, - //TODO: this two field may has been deprecated - minmovement2: u32, - minmov2: u32, - #[serde(skip_serializing_if = "Option::is_none")] - minmove2: Option, - #[serde(skip_serializing_if = "Option::is_none")] - fractional: Option, - has_intraday: bool, - has_daily: bool, - has_weekly_and_monthly: bool, -} - -impl Default for Symbol { - fn default() -> Self { - Symbol { - name: String::default(), - ticker: String::default(), - sym_type: DEFAULT_SYMBOL.to_string(), - session: DEFAULT_SESSION.to_string(), - exchange: DEFAULT_EXCHANGE.to_string(), - listed_exchange: DEFAULT_EXCHANGE.to_string(), - timezone: "Etc/UTC".to_string(), - minmov: 1, - pricescale: 100, - minmovement2: 0, - minmov2: 0, - minmove2: None, - fractional: None, - has_intraday: true, - has_daily: true, - has_weekly_and_monthly: true, - } - } -} - -type SymNameAndTicker = String; -type FullName = String; - -/* - According to the symbology of chart [https://github.com/serdimoa/charting/blob/master/Symbology.md], - it may refer a symbol by EXCHANGE:SYMBOL format, to made things easy, our symbology always specify - a ticker which is identify with symbol name itself, which is the market_name used in matchingengine - and db record ({base asset}_{quote asset} or an specified name), to keep all API running correct, - and the user of chart libaray can use symbol() method to acquire current symbol name for their API -*/ -fn symbology(origin: &MarketDesc) -> (SymNameAndTicker, FullName) { - let s_name = format!("{}_{}", origin.base_asset, origin.quote_asset); - ( - origin.market_name.clone().unwrap_or_else(|| s_name.clone()), - origin - .market_name - .clone() - .map_or_else(|| s_name.clone(), |n| format!("{}({})", n, s_name)), - ) -} - -impl From for Symbol { - fn from(origin: MarketDesc) -> Self { - let (name, _) = symbology(&origin); - let pricescale = 10u32.pow(origin.min_amount.scale()); - //simply pick the lo part - let minmov = origin.min_amount.unpack().lo; - - Symbol { - name: name.clone(), - ticker: name, - sym_type: DEFAULT_SYMBOL.to_string(), - session: DEFAULT_SESSION.to_string(), - exchange: DEFAULT_EXCHANGE.to_string(), - listed_exchange: DEFAULT_EXCHANGE.to_string(), - timezone: "Etc/UTC".to_string(), - minmov, - pricescale, - minmovement2: 0, - minmov2: 0, - minmove2: None, - fractional: None, - has_intraday: true, - has_daily: true, - has_weekly_and_monthly: true, - } - } -} - -#[cfg(sqlxverf)] -fn sqlverf_symbol_resolve() -> impl std::any::Any { - ( - sqlx::query_as!( - MarketDesc, - "select * from market where - (base_asset = $1 AND quote_asset = $2) OR - (base_asset = $2 AND quote_asset = $1)", - "UNI", - "ETH" - ), - sqlx::query_as!(MarketDesc, "select * from market where market_name = $1", "Any spec name"), - ) -} - -//notice we use the standard symbology (EXCHANGE:SYMBOL) format while consider the exchange part may -//missed, so we return optional exchange part and the symbol part -fn resolve_canionical_symbol(symbol: &str) -> (Option<&str>, &str) { - match symbol.find(':') { - Some(pos) => (Some(symbol.get(..pos).unwrap()), symbol.get((pos + 1)..).unwrap()), - None => (None, symbol), - } -} - -#[cfg(test)] -#[test] -fn test_symbol_resolution() { - let (ex1, sym1) = resolve_canionical_symbol("test:USDT_ETH"); - assert_eq!(ex1.unwrap(), "test"); - assert_eq!(sym1, "USDT_ETH"); - - let (ex2, sym2) = resolve_canionical_symbol("BTC_ETH"); - assert_eq!(ex2, None); - assert_eq!(sym2, "BTC_ETH"); -} - -#[api_v2_operation] -pub async fn symbols( - symbol_req: web::Query, - app_state: web::Data, -) -> Result, actix_web::Error> { - let symbol = symbol_req.into_inner().symbol; - log::debug!("resolve symbol {:?}", symbol); - - //now we simply drop the exg part - let (_, rsymbol) = resolve_canionical_symbol(&symbol); - - let as_asset: Vec<&str> = rsymbol.split(&['-', '_'][..]).collect(); - let mut queried_market: Option = None; - //try asset first - if as_asset.len() == 2 { - log::debug!("query market from asset {}:{}", as_asset[0], as_asset[1]); - let symbol_query_1 = format!( - "select * from {} where - (base_asset = $1 AND quote_asset = $2) OR - (base_asset = $2 AND quote_asset = $1)", - MARKET - ); - queried_market = sqlx::query_as(&symbol_query_1) - .bind(as_asset[0]) - .bind(as_asset[1]) - .fetch_optional(&app_state.db) - .await - .map_err(TradeViewError::from)?; - } - - let queried_market = if let Some(queried_market) = queried_market { - queried_market - } else { - log::debug!("query market from name {}", rsymbol); - let symbol_query_2 = format!("select * from {} where market_name = $1", MARKET); - //TODO: would this returning correct? should we just - //response 404? - sqlx::query_as(&symbol_query_2) - .bind(&symbol) - .fetch_one(&app_state.db) - .await - .map_err(TradeViewError::from)? - }; - - Ok(Json(Symbol::from(queried_market))) - /* Ok(json!( - { - "name": "ETH_USDT", - "ticker": "ETH_USDT", - "description": "ETH_USDT", - "type": "btc", - "session": "24x7", - "exchange": "STOCK", - "listed_exchange": "STOCK", - "timezone": "Asia/Singapore", - "has_intraday": true, - "has_daily": true, - "has_weekly_and_monthly": true, - "pricescale": 10000, - "minmovement": 1, - "minmov": 1, - "minmovement2": 0, - "minmov2": 0 - } - ) - .to_string())*/ -} - -#[derive(Deserialize, Debug, Apiv2Schema)] -pub struct SymbolSearchQueryReq { - query: String, - #[serde(default, rename = "type")] - sym_type: Option, - #[serde(default)] - exchange: Option, - #[serde(default)] - limit: u32, -} - -#[derive(Serialize, Apiv2Schema)] -pub struct SymbolDesc { - symbol: String, - full_name: String, // e.g. BTCE:BTCUSD - description: String, - exchange: String, - ticker: String, - #[serde(rename = "type")] - sym_type: String, -} - -impl From for SymbolDesc { - fn from(origin: MarketDesc) -> Self { - let (name, full_name) = symbology(&origin); - - SymbolDesc { - symbol: name.clone(), - full_name: format!("{}:{}", DEFAULT_EXCHANGE, full_name), - description: String::default(), - sym_type: DEFAULT_SYMBOL.to_string(), - ticker: name, - exchange: DEFAULT_EXCHANGE.to_string(), - } - } -} - -#[cfg(sqlxverf)] -fn sqlverf_symbol_search() -> impl std::any::Any { - sqlx::query_as!( - MarketDesc, - "select * from market where base_asset = $1 OR quote_asset = $1 OR market_name = $1", - "UNI" - ) -} - -#[api_v2_operation] -pub async fn search_symbols( - symbol_search_req: web::Query, - app_state: web::Data, -) -> Result>, actix_web::Error> { - let symbol_query = symbol_search_req.into_inner(); - log::debug!("search symbol {:?}", symbol_query); - - //query should not contain exchange part? - let (_, rsymbol) = resolve_canionical_symbol(&symbol_query.query); - - let as_asset: Vec<&str> = rsymbol.split(&['-', '_'][..]).collect(); - let limit_query = if symbol_query.limit == 0 { - "".to_string() - } else { - format!(" limit {}", symbol_query.limit) - }; - //use different query type - //try asset first - - let ret: Vec = if as_asset.len() == 2 { - log::debug!("query symbol as trade pair {}:{}", as_asset[0], as_asset[1]); - let symbol_query_1 = format!( - "select * from {} where (base_asset = $1 AND quote_asset = $2) OR - (base_asset = $2 AND quote_asset = $1){}", - MARKET, limit_query - ); - sqlx::query_as(&symbol_query_1) - .bind(as_asset[0]) - .bind(as_asset[1]) - .fetch_all(&app_state.db) - .await - .map_err(TradeViewError::from)? - } else { - log::debug!("query symbol as name {}", rsymbol); - let symbol_query_2 = format!( - "select * from {} where base_asset = $1 OR quote_asset = $1 OR market_name = $1{}", - MARKET, limit_query - ); - sqlx::query_as(&symbol_query_2) - .bind(rsymbol) - .fetch_all(&app_state.db) - .await - .map_err(TradeViewError::from)? - }; - - Ok(Json(ret.into_iter().map(From::from).collect())) -} - -use chrono::{self, DurationRound}; -use fluidex_common::rust_decimal::{prelude::*, Decimal}; -use futures::TryStreamExt; -use sqlx::types::chrono::{DateTime, NaiveDateTime, Utc}; - -#[derive(sqlx::FromRow, Debug, Clone)] -struct TickerItem { - first: Option, - last: Option, - max: Option, - min: Option, - sum: Option, - quote_sum: Option, -} - -#[derive(Serialize, Deserialize)] -struct TickerInv(#[serde(with = "humantime_serde")] Duration); - -#[cfg(sqlxverf)] -fn sqlverf_ticker() -> impl std::any::Any { - sqlx::query_as!( - TickerItem, - "select first(price, time), last(price, time), max(price), min(price), - sum(amount), sum(quote_amount) as quote_sum from market_trade where market = $1 and time > $2", - "USDT_ETH", - NaiveDateTime::from_timestamp(100_000_000, 0) - ) -} - -#[api_v2_operation] -pub async fn ticker( - req: HttpRequest, - path: web::Path<(String, String)>, - app_state: web::Data, -) -> Result, actix_web::Error> { - let (ticker_inv, market_name) = path.into_inner(); - let ticker_inv = parse_duration(&ticker_inv).unwrap(); - - let cache = req.app_data::().expect("App cache not found"); - let now_ts: DateTime = SystemTime::now().into(); - let update_inv = app_state.config.trading.ticker_update_interval; - let ticker_ret_cache = &mut cache.trading.borrow_mut().ticker_ret_cache; - - if let Some(cached_resp) = ticker_ret_cache.get(&market_name) { - //consider systemtime may wraparound, we set the valid - //range of cache is [-inv, +inv] on now - let now_ts_dur = Duration::from_secs(now_ts.timestamp() as u64); - let cached_now = Duration::from_secs(cached_resp.to); - log::debug!( - "cache judge {}, {}, {}", - cached_now.as_secs(), - update_inv.as_secs(), - now_ts_dur.as_secs() - ); - if cached_now + update_inv > now_ts_dur && now_ts_dur > cached_now - update_inv { - log::debug!("use cached response"); - return Ok(Json(cached_resp.clone())); - } - } - - let ticker_inv = if ticker_inv > app_state.config.trading.ticker_interval { - app_state.config.trading.ticker_interval - } else { - ticker_inv - }; - - let update_inv = chrono::Duration::from_std(update_inv).map_err(|e| RpcError::unknown(&e.to_string()))?; - let ticker_inv = chrono::Duration::from_std(ticker_inv).map_err(|e| RpcError::unknown(&e.to_string()))?; - let now_ts = now_ts.duration_trunc(update_inv).map_err(|e| RpcError::unknown(&e.to_string()))?; - - let core_query = format!( - "select first(price, time), last(price, time), max(price), min(price), - sum(amount), sum(quote_amount) as quote_sum from {} where market = $1 and time > $2", - MARKETTRADE - ); - - let from_ts = now_ts - .checked_sub_signed(ticker_inv) - .ok_or_else(|| RpcError::unknown("Internal clock error"))?; - log::debug!("query ticker from {} to {}", from_ts, now_ts); - - let ticker_ret: TickerItem = sqlx::query_as(&core_query) - .bind(&market_name) - .bind(from_ts.naive_utc()) - .fetch_one(&app_state.db) - .await - .map_err(TradeViewError::from)?; - - let ret = TickerResult { - market: market_name.clone(), - change: match ticker_ret.last { - Some(lst) => ticker_ret - .first - .and_then(|fst| lst.checked_sub(fst)) - .and_then(|r1| r1.checked_div(lst)) - .as_ref() - .and_then(Decimal::to_f32) - .unwrap_or(9999.9), - None => 0.0, - }, - last: ticker_ret.last.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0), - high: ticker_ret.max.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0), - low: ticker_ret.min.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0), - volume: ticker_ret.sum.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0), - quote_volume: ticker_ret.quote_sum.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0), - from: from_ts.timestamp() as u64, - to: now_ts.timestamp() as u64, - }; - - //update cache - ticker_ret_cache.insert(market_name, ret.clone()); - Ok(Json(ret)) -} - -#[derive(sqlx::FromRow, Debug, Clone)] -struct KlineItem { - ts: Option, - first: Option, - last: Option, - max: Option, - min: Option, - sum: Option, -} - -#[derive(Serialize, Clone, Debug)] -pub struct TradeViewError(RpcError); - -impl From for TradeViewError -where - T: Into, -{ - fn from(original: T) -> TradeViewError { - TradeViewError(Into::into(original)) - } -} - -use actix_web::{http::StatusCode, HttpResponse}; - -impl std::fmt::Display for TradeViewError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl actix_web::error::ResponseError for TradeViewError { - fn status_code(&self) -> StatusCode { - StatusCode::OK - } - - fn error_response(&self) -> HttpResponse { - // all http response are 200. we handle the error inside json - HttpResponse::build(StatusCode::OK).json(json!( - { - "s": "error", - "errmsg": &self.0.message, - })) - } -} - -#[cfg(sqlxverf)] -use std::convert::TryFrom; - -#[cfg(sqlxverf)] -fn sqlverf_history() -> impl std::any::Any { - sqlx::query_as!( - KlineItem, - "select time_bucket($1, time) as ts, first(price, time), - last(price, time), max(price), min(price), sum(amount) from market_trade - where market = $2 and time > $3 and time < $4 - group by ts order by ts asc", - sqlx::postgres::types::PgInterval::try_from(std::time::Duration::new(3600, 0)).unwrap(), - "ETH_USDT", - NaiveDateTime::from_timestamp(100_000_000, 0), - NaiveDateTime::from_timestamp(100_000_000, 0), - ) -} - -#[api_v2_operation] -pub async fn history(req_origin: HttpRequest, app_state: web::Data) -> Result, actix_web::Error> { - let req: web::Query = web::Query::from_query(req_origin.query_string()).map_err(TradeViewError::from)?; - let req = req.into_inner(); - log::debug!("kline req {:?}", req); - - if req.usemock.is_some() { - log::debug!("Use mock mode"); - return Ok(Json(mock::fake_kline_result(&req))); - } - - let core_query = format!( - "select time_bucket($1, time) as ts, first(price, time), - last(price, time), max(price), min(price), sum(amount) from {} - where market = $2 and time > $3 and time < $4 - group by ts order by ts asc", - MARKETTRADE - ); - - let mut query_rows = sqlx::query_as::<_, KlineItem>(&core_query) - .bind(std::time::Duration::new(req.resolution as u64 * 60, 0)) // TODO: remove this magic number - .bind(&req.symbol) - .bind(NaiveDateTime::from_timestamp(req.from as i64, 0)) - .bind(NaiveDateTime::from_timestamp(req.to as i64, 0)) - .fetch(&app_state.db); - - let mut out_t: Vec = Vec::new(); - let mut out_c: Vec = Vec::new(); - let mut out_o: Vec = Vec::new(); - let mut out_h: Vec = Vec::new(); - let mut out_l: Vec = Vec::new(); - let mut out_v: Vec = Vec::new(); - - while let Some(item) = query_rows.try_next().await.map_err(TradeViewError::from)? { - out_t.push(item.ts.as_ref().map(NaiveDateTime::timestamp).unwrap_or(0) as i32); - out_c.push(item.last.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); - out_o.push(item.first.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); - out_h.push(item.max.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); - out_l.push(item.min.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); - out_v.push(item.sum.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); - } - - log::debug!("Query {} results", out_t.len()); - - if out_t.is_empty() { - let next_query = format!("select time from {} where time < $1 order by time desc limit 1", MARKETTRADE); - let nxt = sqlx::query_scalar(&next_query) - .bind(NaiveDateTime::from_timestamp(req.from as i64, 0)) - .fetch_optional(&app_state.db) - .await - .map_err(TradeViewError::from)? - .map(|x: NaiveDateTime| x.timestamp() as i32); - - return Ok(Json(KlineResult { - s: String::from("no_data"), - t: out_t, - c: out_c, - o: out_o, - h: out_h, - l: out_l, - v: out_v, - nxt, - })); - } - - Ok(Json(KlineResult { - s: String::from("ok"), - t: out_t, - c: out_c, - o: out_o, - h: out_h, - l: out_l, - v: out_v, - nxt: None, - })) -} diff --git a/src/openapi/user.rs b/src/openapi/user.rs deleted file mode 100644 index 9a810a19..00000000 --- a/src/openapi/user.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::models::{tablenames::ACCOUNT, AccountDesc}; -use crate::restapi::errors::RpcError; -use crate::restapi::state::AppState; -use paperclip::actix::api_v2_operation; -use paperclip::actix::web::{self, HttpRequest, Json}; - -#[api_v2_operation] -pub async fn get_user(req: HttpRequest, data: web::Data) -> Result, actix_web::Error> { - let user_id: &str = req.match_info().get("l1addr_or_l2pubkey").unwrap(); - let mut user_map = data.user_addr_map.lock().unwrap(); - if user_map.contains_key(user_id) { - let user_info = &*user_map.get(user_id).unwrap(); - return Ok(Json(user_info.clone())); - } - - let sql_query = format!("select * from {} where l1_address = $1 OR l2_pubkey = $1", ACCOUNT); - let user: AccountDesc = sqlx::query_as(&sql_query).bind(user_id).fetch_one(&data.db).await.map_err(|e| { - log::error!("{:?}", e); - RpcError::bad_request("invalid user id or address") - })?; - - // update cache - user_map.insert( - user.l1_address.clone(), - AccountDesc { - id: user.id, - l1_address: user.l1_address.clone(), - l2_pubkey: user.l2_pubkey.clone(), - }, - ); - - Ok(Json(user)) -} diff --git a/src/restapi/manage.rs b/src/restapi/manage.rs index 16b197ba..3c75e18c 100644 --- a/src/restapi/manage.rs +++ b/src/restapi/manage.rs @@ -1,58 +1,67 @@ -use actix_web::{http, web, Responder}; -//use web::Json; +use crate::restapi::{state, types}; +use crate::storage; +use actix_web::error::InternalError; +use actix_web::http::StatusCode; use futures::future::OptionFuture; use orchestra::rpc::exchange::*; - -use super::{state, types}; -use crate::storage; +use paperclip::actix::api_v2_operation; +use paperclip::actix::web; pub mod market { - use super::*; - async fn do_reload(app_state: &state::AppState) -> (String, http::StatusCode) { + async fn do_reload(app_state: &state::AppState) -> Result<&'static str, actix_web::Error> { let mut rpc_cli = matchengine_client::MatchengineClient::new(app_state.manage_channel.as_ref().unwrap().clone()); if let Err(e) = rpc_cli.reload_markets(ReloadMarketsRequest { from_scratch: false }).await { - return (e.to_string(), http::StatusCode::INTERNAL_SERVER_ERROR); + return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); } - (String::from("done"), http::StatusCode::OK) + Ok("done") } - pub async fn add_assets(req: web::Json, app_state: web::Data) -> impl Responder { + #[api_v2_operation] + pub async fn add_assets( + req: web::Json, + app_state: web::Data, + ) -> Result<&'static str, actix_web::Error> { let assets_req = req.into_inner(); for asset in &assets_req.assets { log::debug!("Add asset {:?}", asset); if let Err(e) = storage::config::persist_asset_to_db(&app_state.db, asset, false).await { - return (e.to_string(), http::StatusCode::INTERNAL_SERVER_ERROR); + return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); } } if !assets_req.not_reload { do_reload(&app_state.into_inner()).await } else { - (String::from("done"), http::StatusCode::OK) + Ok("done") } } - pub async fn reload(app_state: web::Data) -> impl Responder { + #[api_v2_operation] + pub async fn reload(app_state: web::Data) -> Result<&'static str, actix_web::Error> { do_reload(&app_state.into_inner()).await } - pub async fn add_pair(req: web::Json, app_state: web::Data) -> impl Responder { + #[api_v2_operation] + pub async fn add_pair( + req: web::Json, + app_state: web::Data, + ) -> Result<&'static str, actix_web::Error> { let trade_pair = req.into_inner(); if let Some(asset) = trade_pair.asset_base.as_ref() { if asset.id != trade_pair.market.base { - return (String::from("Base asset not match"), http::StatusCode::BAD_REQUEST); + return Err(InternalError::new("Base asset not match".to_owned(), StatusCode::BAD_REQUEST).into()); } } if let Some(asset) = trade_pair.asset_quote.as_ref() { if asset.id != trade_pair.market.quote { - return (String::from("Quote asset not match"), http::StatusCode::BAD_REQUEST); + return Err(InternalError::new("Quote asset not match".to_owned(), StatusCode::BAD_REQUEST).into()); } } @@ -64,7 +73,7 @@ pub mod market { ) .await { - return (e.to_string(), http::StatusCode::INTERNAL_SERVER_ERROR); + return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); } if let Some(Err(e)) = OptionFuture::from( @@ -75,17 +84,17 @@ pub mod market { ) .await { - return (e.to_string(), http::StatusCode::INTERNAL_SERVER_ERROR); + return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); } if let Err(e) = storage::config::persist_market_to_db(&app_state.db, &trade_pair.market).await { - return (e.to_string(), http::StatusCode::INTERNAL_SERVER_ERROR); + return Err(InternalError::new(e.to_string(), StatusCode::INTERNAL_SERVER_ERROR).into()); } if !trade_pair.not_reload { do_reload(&app_state.into_inner()).await } else { - (String::from("done"), http::StatusCode::OK) + Ok("done") } } } diff --git a/src/restapi/personal_history.rs b/src/restapi/personal_history.rs index a9ab8429..17a39df7 100644 --- a/src/restapi/personal_history.rs +++ b/src/restapi/personal_history.rs @@ -1,29 +1,25 @@ -use actix_web::{ - web::{self, Json}, - HttpRequest, -}; +use crate::models::tablenames::{ACCOUNT, INTERNALTX, ORDERHISTORY}; +use crate::models::{DecimalDbType, OrderHistory, TimestampDbType}; +use crate::restapi::errors::RpcError; +use crate::restapi::state::AppState; use core::cmp::min; +use paperclip::actix::web::{self, HttpRequest, Json}; +use paperclip::actix::{api_v2_operation, Apiv2Schema}; use serde::{Deserialize, Deserializer, Serialize}; -use crate::models::{ - tablenames::{ACCOUNT, INTERNALTX, ORDERHISTORY}, - DecimalDbType, OrderHistory, TimestampDbType, -}; - -use super::{errors::RpcError, state::AppState}; - -#[derive(Serialize)] +#[derive(Serialize, Deserialize, Apiv2Schema)] pub struct OrderResponse { total: i64, orders: Vec, } -pub async fn my_orders(req: HttpRequest, data: web::Data) -> Result, RpcError> { +#[api_v2_operation] +pub async fn my_orders(req: HttpRequest, data: web::Data) -> Result, actix_web::Error> { let market = req.match_info().get("market").unwrap(); let user_id = req.match_info().get("user_id").unwrap_or_default().parse::(); let user_id = match user_id { Err(_) => { - return Err(RpcError::bad_request("invalid user_id")); + return Err(RpcError::bad_request("invalid user_id").into()); } _ => user_id.unwrap(), }; @@ -47,7 +43,8 @@ pub async fn my_orders(req: HttpRequest, data: web::Data) -> Result) -> Result usize { } /// `/internal_txs/{user_id}` +#[api_v2_operation] pub async fn my_internal_txs( user_id: web::Path, query: web::Query, data: web::Data, -) -> Result>, RpcError> { +) -> Result>, actix_web::Error> { let user_id = user_id.into_inner(); let limit = min(query.limit, 100); @@ -190,7 +189,10 @@ where "#, (None, None) => query_as, }; - let txs: Vec = query_as.fetch_all(&data.db).await?; + let txs: Vec = query_as + .fetch_all(&data.db) + .await + .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; Ok(Json(txs)) } diff --git a/src/restapi/public_history.rs b/src/restapi/public_history.rs index 4b9932a4..353c4254 100644 --- a/src/restapi/public_history.rs +++ b/src/restapi/public_history.rs @@ -1,28 +1,26 @@ -use actix_web::{web, HttpRequest, Responder}; - -use actix_web::web::Json; - -use crate::models::{ - self, - tablenames::{MARKETTRADE, USERTRADE}, -}; -use core::cmp::min; - -use super::{errors::RpcError, state::AppState, types}; +use crate::models::tablenames::{MARKETTRADE, USERTRADE}; +use crate::models::{self, DecimalDbType, TimestampDbType}; +use crate::restapi::errors::RpcError; +use crate::restapi::state::AppState; +use crate::restapi::types; use chrono::{DateTime, SecondsFormat, Utc}; -use models::{DecimalDbType, TimestampDbType}; +use core::cmp::min; +use paperclip::actix::api_v2_operation; +use paperclip::actix::web::{self, HttpRequest, Json}; fn check_market_exists(_market: &str) -> bool { // TODO true } -pub async fn recent_trades(req: HttpRequest, data: web::Data) -> impl Responder { + +#[api_v2_operation] +pub async fn recent_trades(req: HttpRequest, data: web::Data) -> Result>, actix_web::Error> { let market = req.match_info().get("market").unwrap(); let qstring = qstring::QString::from(req.query_string()); let limit = min(100, qstring.get("limit").unwrap_or_default().parse::().unwrap_or(20)); log::debug!("recent_trades market {} limit {}", market, limit); if !check_market_exists(market) { - return Err(RpcError::bad_request("invalid market")); + return Err(RpcError::bad_request("invalid market").into()); } // TODO: this API result should be cached, either in-memory or using redis @@ -33,9 +31,13 @@ pub async fn recent_trades(req: HttpRequest, data: web::Data) -> impl let sql_query = format!("select * from {} where market = $1 order by time desc limit {}", MARKETTRADE, limit); - let trades: Vec = sqlx::query_as(&sql_query).bind(market).fetch_all(&data.db).await?; - log::debug!("query {} recent_trades records", trades.len()); + let trades: Vec = sqlx::query_as(&sql_query) + .bind(market) + .fetch_all(&data.db) + .await + .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; + log::debug!("query {} recent_trades records", trades.len()); Ok(Json(trades)) } @@ -64,10 +66,11 @@ fn sqlverf_ticker() -> impl std::any::Any { ) } +#[api_v2_operation] pub async fn order_trades( app_state: web::Data, path: web::Path<(String, i64)>, -) -> Result, RpcError> { +) -> Result, actix_web::Error> { let (market_name, order_id): (String, i64) = path.into_inner(); log::debug!("order_trades market {} order_id {}", market_name, order_id); @@ -84,7 +87,8 @@ pub async fn order_trades( .bind(market_name) .bind(order_id) .fetch_all(&app_state.db) - .await?; + .await + .map_err(|err| actix_web::Error::from(RpcError::from(err)))?; Ok(Json(types::OrderTradeResult { trades: trades diff --git a/src/restapi/tradingview.rs b/src/restapi/tradingview.rs index 50a9dbd3..5a1d3b56 100644 --- a/src/restapi/tradingview.rs +++ b/src/restapi/tradingview.rs @@ -1,25 +1,19 @@ -use actix_web::web::{self, Data, Json}; -use actix_web::{HttpRequest, Responder}; +use crate::models::tablenames::{MARKET, MARKETTRADE}; +use crate::models::MarketDesc; +use crate::restapi::errors::RpcError; +use crate::restapi::types::{KlineReq, KlineResult, TickerResult}; +use crate::restapi::{mock, state}; +use actix_web::Responder; +use humantime::parse_duration; +use paperclip::actix::web::{self, HttpRequest, Json}; +use paperclip::actix::{api_v2_operation, Apiv2Schema}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{ - time::{Duration, SystemTime, UNIX_EPOCH}, - vec, -}; - -use super::errors::RpcError; -use super::types::{KlineReq, KlineResult, TickerResult}; -use crate::restapi::state; - -use super::mock; - -use crate::models::{ - tablenames::{MARKET, MARKETTRADE}, - MarketDesc, -}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; // All APIs here follow https://zlq4863947.gitbook.io/tradingview/3-shu-ju-bang-ding/udf +#[api_v2_operation] pub async fn unix_timestamp(_req: HttpRequest) -> impl Responder { format!("{}", SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()) } @@ -28,6 +22,7 @@ static DEFAULT_EXCHANGE: &str = "test"; static DEFAULT_SYMBOL: &str = "tradepair"; static DEFAULT_SESSION: &str = "24x7"; +#[api_v2_operation] pub async fn chart_config(_req: HttpRequest) -> impl Responder { log::debug!("request config"); let value = json!({ @@ -50,12 +45,12 @@ pub async fn chart_config(_req: HttpRequest) -> impl Responder { value.to_string() } -#[derive(Deserialize)] +#[derive(Deserialize, Apiv2Schema)] pub struct SymbolQueryReq { symbol: String, } -#[derive(Serialize)] +#[derive(Serialize, Apiv2Schema)] pub struct Symbol { name: String, ticker: String, @@ -188,7 +183,11 @@ fn test_symbol_resolution() { assert_eq!(sym2, "BTC_ETH"); } -pub async fn symbols(symbol_req: web::Query, app_state: Data) -> Result, RpcError> { +#[api_v2_operation] +pub async fn symbols( + symbol_req: web::Query, + app_state: web::Data, +) -> Result, actix_web::Error> { let symbol = symbol_req.into_inner().symbol; log::debug!("resolve symbol {:?}", symbol); @@ -210,17 +209,22 @@ pub async fn symbols(symbol_req: web::Query, app_state: Data, app_state: Data impl std::any::Any { ) } +#[api_v2_operation] pub async fn search_symbols( symbol_search_req: web::Query, - app_state: Data, -) -> Result>, RpcError> { + app_state: web::Data, +) -> Result>, actix_web::Error> { let symbol_query = symbol_search_req.into_inner(); log::debug!("search symbol {:?}", symbol_query); @@ -323,14 +328,19 @@ pub async fn search_symbols( .bind(as_asset[0]) .bind(as_asset[1]) .fetch_all(&app_state.db) - .await? + .await + .map_err(TradeViewError::from)? } else { log::debug!("query symbol as name {}", rsymbol); let symbol_query_2 = format!( "select * from {} where base_asset = $1 OR quote_asset = $1 OR market_name = $1{}", MARKET, limit_query ); - sqlx::query_as(&symbol_query_2).bind(rsymbol).fetch_all(&app_state.db).await? + sqlx::query_as(&symbol_query_2) + .bind(rsymbol) + .fetch_all(&app_state.db) + .await + .map_err(TradeViewError::from)? }; Ok(Json(ret.into_iter().map(From::from).collect())) @@ -351,8 +361,8 @@ struct TickerItem { quote_sum: Option, } -#[derive(Deserialize)] -pub struct TickerInv(#[serde(with = "humantime_serde")] Duration); +#[derive(Serialize, Deserialize)] +struct TickerInv(#[serde(with = "humantime_serde")] Duration); #[cfg(sqlxverf)] fn sqlverf_ticker() -> impl std::any::Any { @@ -365,12 +375,15 @@ fn sqlverf_ticker() -> impl std::any::Any { ) } +#[api_v2_operation] pub async fn ticker( req: HttpRequest, - path: web::Path<(TickerInv, String)>, - app_state: Data, -) -> Result, RpcError> { - let (TickerInv(ticker_inv), market_name) = path.into_inner(); + path: web::Path<(String, String)>, + app_state: web::Data, +) -> Result, actix_web::Error> { + let (ticker_inv, market_name) = path.into_inner(); + let ticker_inv = parse_duration(&ticker_inv).unwrap(); + let cache = req.app_data::().expect("App cache not found"); let now_ts: DateTime = SystemTime::now().into(); let update_inv = app_state.config.trading.ticker_update_interval; @@ -418,7 +431,8 @@ pub async fn ticker( .bind(&market_name) .bind(from_ts.naive_utc()) .fetch_one(&app_state.db) - .await?; + .await + .map_err(TradeViewError::from)?; let ret = TickerResult { market: market_name.clone(), @@ -509,8 +523,9 @@ fn sqlverf_history() -> impl std::any::Any { ) } -pub async fn history(req_origin: HttpRequest, app_state: Data) -> Result, TradeViewError> { - let req: web::Query = web::Query::from_query(req_origin.query_string())?; +#[api_v2_operation] +pub async fn history(req_origin: HttpRequest, app_state: web::Data) -> Result, actix_web::Error> { + let req: web::Query = web::Query::from_query(req_origin.query_string()).map_err(TradeViewError::from)?; let req = req.into_inner(); log::debug!("kline req {:?}", req); @@ -541,7 +556,7 @@ pub async fn history(req_origin: HttpRequest, app_state: Data) let mut out_l: Vec = Vec::new(); let mut out_v: Vec = Vec::new(); - while let Some(item) = query_rows.try_next().await? { + while let Some(item) = query_rows.try_next().await.map_err(TradeViewError::from)? { out_t.push(item.ts.as_ref().map(NaiveDateTime::timestamp).unwrap_or(0) as i32); out_c.push(item.last.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); out_o.push(item.first.as_ref().and_then(Decimal::to_f32).unwrap_or(0.0)); @@ -557,7 +572,8 @@ pub async fn history(req_origin: HttpRequest, app_state: Data) let nxt = sqlx::query_scalar(&next_query) .bind(NaiveDateTime::from_timestamp(req.from as i64, 0)) .fetch_optional(&app_state.db) - .await? + .await + .map_err(TradeViewError::from)? .map(|x: NaiveDateTime| x.timestamp() as i32); return Ok(Json(KlineResult { diff --git a/src/restapi/user.rs b/src/restapi/user.rs index 22b3023c..9a810a19 100644 --- a/src/restapi/user.rs +++ b/src/restapi/user.rs @@ -1,12 +1,11 @@ -use super::errors::RpcError; -use super::state::AppState; use crate::models::{tablenames::ACCOUNT, AccountDesc}; -use actix_web::{ - web::{self, Json}, - HttpRequest, -}; +use crate::restapi::errors::RpcError; +use crate::restapi::state::AppState; +use paperclip::actix::api_v2_operation; +use paperclip::actix::web::{self, HttpRequest, Json}; -pub async fn get_user(req: HttpRequest, data: web::Data) -> Result, RpcError> { +#[api_v2_operation] +pub async fn get_user(req: HttpRequest, data: web::Data) -> Result, actix_web::Error> { let user_id: &str = req.match_info().get("l1addr_or_l2pubkey").unwrap(); let mut user_map = data.user_addr_map.lock().unwrap(); if user_map.contains_key(user_id) { @@ -15,7 +14,6 @@ pub async fn get_user(req: HttpRequest, data: web::Data) -> Result