diff --git a/Cargo.lock b/Cargo.lock index c7628df4..e5001f0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,7 +61,7 @@ dependencies = [ "serde_urlencoded", "sha-1", "slab", - "time 0.2.24", + "time 0.2.25", ] [[package]] @@ -216,7 +216,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "socket2", - "time 0.2.24", + "time 0.2.25", "url", ] @@ -424,9 +424,9 @@ checksum = "39092a32794787acd8525ee150305ff051b0aa6cc2abaf193924f5ab05425f39" [[package]] name = "bumpalo" -version = "3.4.0" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" +checksum = "099e596ef14349721d9016f6b80dd3419ea1bf289ab9b44df8e4dfd3a005d5d9" [[package]] name = "byteorder" @@ -519,7 +519,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784ad0fbab4f3e9cef09f20e0aea6000ae08d2cb98ac4c0abc53df18803d702f" dependencies = [ "percent-encoding", - "time 0.2.24", + "time 0.2.25", "version_check", ] @@ -596,15 +596,15 @@ dependencies = [ [[package]] name = "data-encoding" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993a608597367c6377b258c25d7120740f00ed23a2252b729b1932dd7866f908" +checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57" [[package]] name = "derivative" -version = "2.1.3" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaed5874effa6cde088c644ddcdcb4ffd1511391c5be4fdd7a5ccd02c7e4a183" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" dependencies = [ "proc-macro2", "quote", @@ -635,6 +635,7 @@ dependencies = [ name = "dingir-exchange" version = "0.1.0" dependencies = [ + "actix-rt", "actix-web", "anyhow", "bytes", @@ -737,9 +738,9 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" [[package]] name = "flate2" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7411863d55df97a419aa64cb4d2f167103ea9d767e2c54a1868b7ac3f6b47129" +checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0" dependencies = [ "cfg-if 1.0.0", "crc32fast", @@ -1135,9 +1136,9 @@ checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" [[package]] name = "js-sys" -version = "0.3.46" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf3d7383929f7c9c7c2d0fa596f325832df98c3704f2c60553080f7127a58175" +checksum = "5cfb73131c35423a367daf8cbd24100af0d077668c8c2943f0e7dd775fef0f65" dependencies = [ "wasm-bindgen", ] @@ -1169,9 +1170,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.82" +version = "0.2.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929" +checksum = "1cca32fa0182e8c0989459524dc356b8f2b5c10f1b9eb521b7d182c03cf8c5ff" [[package]] name = "libz-sys" @@ -1339,9 +1340,9 @@ dependencies = [ [[package]] name = "nom" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88034cfd6b4a0d54dd14f4a507eceee36c0b70e5a02236c4e4df571102be17f0" +checksum = "ab6f70b46d6325aa300f1c7bb3d470127dfc27806d8ea6bf294ee0ce643ce2b1" dependencies = [ "bitvec", "lexical-core", @@ -2078,9 +2079,9 @@ checksum = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" [[package]] name = "sha2" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e7aab86fe2149bad8c507606bdb3f4ef5e7b2380eb92350f56122cca72a42a8" +checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de" dependencies = [ "block-buffer", "cfg-if 1.0.0", @@ -2141,7 +2142,7 @@ checksum = "74c70f0235b9925cbb106c52af1a28b5ea4885a8b851e328b8562e257a389c2d" dependencies = [ "lazy_static", "maplit", - "nom 6.0.1", + "nom 6.1.0", "regex", "unicode_categories", ] @@ -2149,7 +2150,7 @@ dependencies = [ [[package]] name = "sqlx" version = "0.4.2" -source = "git+https://github.com/launchbadge/sqlx.git#df393128f8b07e3b858e65446b2a508a73bf5135" +source = "git+https://github.com/launchbadge/sqlx.git#31abe22e348d6ac668c140de32612f0930c2abec" dependencies = [ "sqlx-core", "sqlx-macros", @@ -2158,7 +2159,7 @@ dependencies = [ [[package]] name = "sqlx-core" version = "0.4.2" -source = "git+https://github.com/launchbadge/sqlx.git#df393128f8b07e3b858e65446b2a508a73bf5135" +source = "git+https://github.com/launchbadge/sqlx.git#31abe22e348d6ac668c140de32612f0930c2abec" dependencies = [ "ahash 0.6.3", "atoi", @@ -2209,16 +2210,14 @@ dependencies = [ [[package]] name = "sqlx-macros" version = "0.4.2" -source = "git+https://github.com/launchbadge/sqlx.git#df393128f8b07e3b858e65446b2a508a73bf5135" +source = "git+https://github.com/launchbadge/sqlx.git#31abe22e348d6ac668c140de32612f0930c2abec" dependencies = [ "dotenv", "either", "futures", "heck", - "once_cell", "proc-macro2", "quote", - "serde 1.0.123", "serde_json", "sha2", "sqlx-core", @@ -2230,7 +2229,7 @@ dependencies = [ [[package]] name = "sqlx-rt" version = "0.2.0" -source = "git+https://github.com/launchbadge/sqlx.git#df393128f8b07e3b858e65446b2a508a73bf5135" +source = "git+https://github.com/launchbadge/sqlx.git#31abe22e348d6ac668c140de32612f0930c2abec" dependencies = [ "once_cell", "tokio", @@ -2390,11 +2389,11 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.0" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb9bc092d0d51e76b2b19d9d85534ffc9ec2db959a2523cdae0697e2972cd447" +checksum = "d8208a331e1cb318dd5bd76951d2b8fc48ca38a69f5f4e4af1b6a9f8c6236915" dependencies = [ - "lazy_static", + "once_cell", ] [[package]] @@ -2419,9 +2418,9 @@ dependencies = [ [[package]] name = "time" -version = "0.2.24" +version = "0.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "273d3ed44dca264b0d6b3665e8d48fb515042d42466fad93d2a45b90ec4058f7" +checksum = "1195b046942c221454c2539395f85413b33383a067449d78aab2b7b052a142f7" dependencies = [ "const_fn", "libc", @@ -2457,9 +2456,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccf8dbc19eb42fba10e8feaaec282fb50e2c14b2726d6301dbfeed0f73306a6f" +checksum = "317cca572a0e89c3ce0ca1f1bdc9369547fe318a683418e42ac8f59d14701023" dependencies = [ "tinyvec_macros", ] @@ -2548,18 +2547,16 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb971a26599ffd28066d387f109746df178eff14d5ea1e235015c5601967a4b" +checksum = "ebb7cb2f00c5ae8df755b252306272cd1790d39728363936e01827e11f0b017b" dependencies = [ - "async-stream", "bytes", "futures-core", "futures-sink", "log", "pin-project-lite", "tokio", - "tokio-stream", ] [[package]] @@ -2639,9 +2636,9 @@ checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" [[package]] name = "tower-service" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" @@ -2841,9 +2838,9 @@ checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasm-bindgen" -version = "0.2.69" +version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd364751395ca0f68cafb17666eee36b63077fb5ecd972bbcd74c90c4bf736e" +checksum = "55c0f7123de74f0dab9b7d00fd614e7b19349cd1e2f5252bbe9b1754b59433be" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -2851,9 +2848,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.69" +version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1114f89ab1f4106e5b55e688b828c0ab0ea593a1ea7c094b141b14cbaaec2d62" +checksum = "7bc45447f0d4573f3d65720f636bbcc3dd6ce920ed704670118650bcd47764c7" dependencies = [ "bumpalo", "lazy_static", @@ -2866,9 +2863,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.69" +version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6ac8995ead1f084a8dea1e65f194d0973800c7f571f6edd70adf06ecf77084" +checksum = "3b8853882eef39593ad4174dd26fc9865a64e84026d223f63bb2c42affcbba2c" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2876,9 +2873,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.69" +version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5a48c72f299d80557c7c62e37e7225369ecc0c963964059509fbafe917c7549" +checksum = "4133b5e7f2a531fa413b3a1695e925038a05a71cf67e87dafa295cb645a01385" dependencies = [ "proc-macro2", "quote", @@ -2889,15 +2886,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.69" +version = "0.2.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e7811dd7f9398f14cc76efd356f98f03aa30419dea46aa810d71e819fc97158" +checksum = "dd4945e4943ae02d15c13962b38a5b1e81eadd4b71214eee75af64a4d6a4fd64" [[package]] name = "web-sys" -version = "0.3.46" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "222b1ef9334f92a21d3fb53dc3fd80f30836959a90f9274a626d7e06315ba3c3" +checksum = "c40dc691fc48003eba817c38da7113c15698142da971298003cac3ef175680b3" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 624b38a2..4938d765 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,6 @@ futures-util = { version = "0.3.12", default-features = false } tokio = { version = "1.1.1", features = ["full"] } thread-id = "3.3.0" - futures = "0.3.12" hyper = "0.14.2" crossbeam-channel = "0.5.0" @@ -39,6 +38,7 @@ dotenv = "0.15.0" num_enum = "0.5.1" tonic = "0.4.0" actix-web = "4.0.0-beta.1" +actix-rt = "=2.0.0-beta.2" qstring = "0.7.2" thiserror = "1.0.23" rand = "0.8.3" diff --git a/src/bin/matchengine.rs b/src/bin/matchengine.rs index 92de67aa..b63b39d4 100644 --- a/src/bin/matchengine.rs +++ b/src/bin/matchengine.rs @@ -24,27 +24,9 @@ fn main() { rt.block_on(async { let stub = prepare().await.expect("Init state error"); - stub.prepare_stub(); - Controller::prepare_runtime(&rt as *const tokio::runtime::Runtime); - - let rpc_thread = std::thread::spawn(move || { - let aux_rt: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("build auxiliary runtime"); - - println!("start grpc under single-thread runtime"); - aux_rt.block_on(grpc_run()).unwrap() - }); - - tokio::runtime::Handle::current() - .spawn_blocking(|| rpc_thread.join()) - .await - .unwrap() + grpc_run(stub).await }) .unwrap(); - - Controller::release_stub(); } async fn prepare() -> anyhow::Result { @@ -61,18 +43,17 @@ async fn prepare() -> anyhow::Result { Ok(grpc_stub) } -async fn grpc_run() -> Result<(), Box> { - persist::init_persist_timer(); - +async fn grpc_run(stub: Controller) -> Result<(), Box> { let addr = "0.0.0.0:50051".parse().unwrap(); - let grpc = GrpcHandler {}; - println!("Starting gprc service"); + let grpc = GrpcHandler::new(stub); + log::info!("Starting gprc service"); let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let on_leave = grpc.on_leave(); tokio::spawn(async move { tokio::signal::ctrl_c().await.ok(); - println!("Ctrl-c received, shutting down"); + log::info!("Ctrl-c received, shutting down"); tx.send(()).ok(); }); @@ -83,6 +64,8 @@ async fn grpc_run() -> Result<(), Box> { }) .await?; - println!("Shutted down"); + log::info!("Shutted down, wait for final clear"); + on_leave.leave().await; + log::info!("Shutted down"); Ok(()) } diff --git a/src/config.rs b/src/config.rs index 8b494481..6cc4cc3e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -58,6 +58,7 @@ pub struct Settings { pub markets: Vec, pub brokers: String, pub consumer_group: String, + pub persist_interval: i32, pub slice_interval: i32, pub slice_keeptime: i32, pub history_thread: i32, @@ -74,6 +75,7 @@ impl Default for Settings { markets: Vec::new(), consumer_group: "kline_data_fetcher".to_string(), brokers: "127.0.0.1:9092".to_string(), + persist_interval: 3600, slice_interval: 86400, slice_keeptime: 86400 * 3, history_thread: 10, diff --git a/src/matchengine/asset.rs b/src/matchengine/asset.rs index a78e4680..69a80f85 100644 --- a/src/matchengine/asset.rs +++ b/src/matchengine/asset.rs @@ -3,7 +3,7 @@ use crate::message::{BalanceMessage, MessageManager}; use crate::models; use crate::utils; use crate::{config, utils::FTimestamp}; -use models::BalanceHistory; +pub use models::BalanceHistory; use anyhow::Result; use rust_decimal::prelude::Zero; @@ -12,10 +12,8 @@ use serde::{Deserialize, Serialize}; use ttl_cache::TtlCache; use num_enum::TryFromPrimitive; -use std::cell::RefCell; use std::collections::HashMap; -use std::rc::Rc; use std::time::Duration; const BALANCE_MAP_INIT_SIZE_ASSET: usize = 64; @@ -238,23 +236,64 @@ struct BalanceUpdateKey { pub struct BalanceUpdateController { cache: TtlCache, - balance_manager: Rc>, - message_manager: Rc>, - history_writer: Rc>, +} + +pub trait PersistExector { + fn real_persist(&self) -> bool { + true + } + fn put_balance(&mut self, balance: BalanceHistory); +} + +impl PersistExector for Box { + fn put_balance(&mut self, balance: BalanceHistory) { + self.as_mut().put_balance(balance) + } +} + +pub(super) struct DummyPersistor(pub(super) bool); +impl PersistExector for DummyPersistor { + fn real_persist(&self) -> bool { + self.0 + } + fn put_balance(&mut self, _balance: BalanceHistory) {} +} + +pub(super) struct MessengerAsPersistor<'a, T>(&'a mut T); + +impl PersistExector for MessengerAsPersistor<'_, T> { + fn put_balance(&mut self, balance: BalanceHistory) { + self.0.push_balance_message(&BalanceMessage { + timestamp: balance.time.timestamp() as f64, + user_id: balance.user_id as u32, + asset: balance.asset.clone(), + business: balance.business.clone(), + change: balance.change.to_string(), + }); + } +} + +pub(super) struct DBAsPersistor<'a, T>(&'a mut T); + +impl PersistExector for DBAsPersistor<'_, T> { + fn put_balance(&mut self, balance: BalanceHistory) { + self.0.append_balance_history(balance); + } +} + +pub(super) fn persistor_for_message(messenger: &mut T) -> MessengerAsPersistor<'_, T> { + MessengerAsPersistor(messenger) +} + +pub(super) fn persistor_for_db(history_writer: &mut T) -> DBAsPersistor<'_, T> { + DBAsPersistor(history_writer) } impl BalanceUpdateController { - pub fn new( - balance_manager: Rc>, - message_manager: Rc>, - history_writer: Rc>, - ) -> BalanceUpdateController { + pub fn new() -> BalanceUpdateController { let capacity = 1_000_000; BalanceUpdateController { cache: TtlCache::new(capacity), - balance_manager, - message_manager, - history_writer, } } pub fn reset(&mut self) { @@ -269,7 +308,8 @@ impl BalanceUpdateController { // return false if duplicate pub fn update_user_balance( &mut self, - real: bool, + balance_manager: &mut BalanceManager, + mut persistor: impl PersistExector, user_id: u32, asset: &str, business: String, @@ -288,38 +328,31 @@ impl BalanceUpdateController { } let abs_change = change.abs(); let new_balance = if abs_change.is_sign_positive() || abs_change.is_zero() { - self.balance_manager - .borrow_mut() - .add(user_id, BalanceType::AVAILABLE, &asset, &abs_change) + balance_manager.add(user_id, BalanceType::AVAILABLE, &asset, &abs_change) } else { - self.balance_manager - .borrow_mut() - .sub(user_id, BalanceType::AVAILABLE, &asset, &abs_change) + balance_manager.sub(user_id, BalanceType::AVAILABLE, &asset, &abs_change) }; log::debug!("change user balance: {} {} {}", user_id, asset, change); self.cache.insert(cache_key, true, Duration::from_secs(3600)); - if real { + + if persistor.real_persist() { detail["id"] = serde_json::Value::from(business_id); - let balance_history = BalanceHistory { + persistor.put_balance(BalanceHistory { time: FTimestamp(utils::current_timestamp()).into(), user_id: user_id as i32, asset: asset.to_string(), - business: business.clone(), + business, change, balance: new_balance, detail: detail.to_string(), - }; - self.history_writer.borrow_mut().append_balance_history(balance_history); - - let message = BalanceMessage { - timestamp: FTimestamp(utils::current_timestamp()).into(), - user_id, - asset: asset.to_string(), - business, - change: change.to_string(), - }; - self.message_manager.borrow_mut().push_balance_message(&message); + }); } true } } + +impl Default for BalanceUpdateController { + fn default() -> Self { + Self::new() + } +} diff --git a/src/matchengine/controller.rs b/src/matchengine/controller.rs index 5f74dc8f..123d6ed0 100644 --- a/src/matchengine/controller.rs +++ b/src/matchengine/controller.rs @@ -1,4 +1,4 @@ -use crate::asset::{AssetManager, BalanceManager, BalanceType, BalanceUpdateController}; +use crate::asset::{self, AssetManager, BalanceManager, BalanceType, BalanceUpdateController}; use crate::database::OperationLogSender; use crate::market; use crate::sequencer::Sequencer; @@ -7,8 +7,6 @@ use crate::{config, utils}; use anyhow::anyhow; use rust_decimal::Decimal; use serde_json::json; -use std::cell::RefCell; -use std::rc::Rc; use tonic::{self, Status}; //use rust_decimal::Decimal; @@ -32,17 +30,76 @@ use sqlx::Executor; use serde::Serialize; use std::str::FromStr; +#[derive(Copy, Clone)] +enum PersistPolicy { + Dummy, + ToDB, + ToMessege, +} + +pub struct Persistor { + history_writer: DatabaseHistoryWriter, + message_manager: Option, + policy: PersistPolicy, +} + +pub struct PersistorGen<'c> { + base: &'c mut Persistor, + policy: PersistPolicy, +} + +impl<'c> PersistorGen<'c> { + fn persist_for_market(self, market_tag: (String, String)) -> Box { + match self.policy { + PersistPolicy::Dummy => Box::new(market::DummyPersistor(false)), + PersistPolicy::ToDB => Box::new(market::persistor_for_db(&mut self.base.history_writer)), + PersistPolicy::ToMessege => Box::new(market::persistor_for_message( + self.base.message_manager.as_mut().unwrap(), + market_tag, + )), + } + } + + fn persist_for_balance(self) -> Box { + match self.policy { + PersistPolicy::Dummy => Box::new(asset::DummyPersistor(false)), + PersistPolicy::ToDB => Box::new(asset::persistor_for_db(&mut self.base.history_writer)), + PersistPolicy::ToMessege => Box::new(asset::persistor_for_message(self.base.message_manager.as_mut().unwrap())), + } + } +} + +impl Persistor { + fn is_real(&mut self, real: bool) -> PersistorGen<'_> { + let policy = if real { self.policy } else { PersistPolicy::Dummy }; + + PersistorGen { base: self, policy } + } + + fn service_available(&self) -> bool { + if self.message_manager.as_ref().map(ChannelMessageManager::is_block).unwrap_or(true) { + log::warn!("message_manager full"); + return false; + } + if self.history_writer.is_block() { + log::warn!("history_writer full"); + return false; + } + true + } +} + pub struct Controller { pub settings: config::Settings, - pub sequencer: Rc>, - pub balance_manager: Rc>, + pub sequencer: Sequencer, + pub balance_manager: BalanceManager, pub asset_manager: AssetManager, - pub update_controller: Rc>, + pub update_controller: BalanceUpdateController, pub markets: HashMap, pub log_handler: OperationLogSender, - pub history_writer: Rc>, - pub message_manager: Rc>, - pub(crate) rt: tokio::runtime::Handle, + pub persistor: Persistor, + dbg_pool: sqlx::Pool, + //pub(crate) rt: tokio::runtime::Handle, } const ORDER_LIST_MAX_LEN: usize = 100; @@ -53,45 +110,42 @@ const OPERATION_ORDER_PUT: &str = "order_put"; impl Controller { pub fn new(settings: config::Settings) -> Controller { - let balance_manager = Rc::new(RefCell::new(BalanceManager::new(&settings.assets).unwrap())); - let message_manager = Rc::new(RefCell::new(new_message_manager_with_kafka_backend(&settings.brokers).unwrap())); - let history_writer = Rc::new(RefCell::new( - DatabaseHistoryWriter::new( - &DatabaseWriterConfig { - spawn_limit: 4, - apply_benchmark: true, - capability_limit: 8192, - }, - &sqlx::Pool::::connect_lazy(&settings.db_history).unwrap(), - ) - .unwrap(), - )); - let update_controller = Rc::new(RefCell::new(BalanceUpdateController::new( - balance_manager.clone(), - message_manager.clone(), - history_writer.clone(), - ))); + let history_pool = sqlx::Pool::::connect_lazy(&settings.db_history).unwrap(); + let mut balance_manager = BalanceManager::new(&settings.assets).unwrap(); + let message_manager = new_message_manager_with_kafka_backend(&settings.brokers).unwrap(); + let history_writer = DatabaseHistoryWriter::new( + &DatabaseWriterConfig { + spawn_limit: 4, + apply_benchmark: true, + capability_limit: 8192, + }, + &history_pool, + ) + .unwrap(); + let update_controller = BalanceUpdateController::new(); let asset_manager = AssetManager::new(&settings.assets).unwrap(); - let sequencer = Rc::new(RefCell::new(Sequencer::default())); + let sequencer = Sequencer::default(); let mut markets = HashMap::new(); for entry in &settings.markets { - let market = market::Market::new( - entry, - balance_manager.clone(), - sequencer.clone(), - history_writer.clone(), - message_manager.clone(), - ) - .unwrap(); + let market = market::Market::new(entry, &mut balance_manager).unwrap(); markets.insert(entry.name.clone(), market); } + let main_pool = if settings.db_log == settings.db_history { + history_pool + } else { + sqlx::Pool::::connect_lazy(&settings.db_log).unwrap() + }; + let log_handler = OperationLogSender::new(&DatabaseWriterConfig { spawn_limit: 4, apply_benchmark: true, capability_limit: 8192, }) - .start_schedule(&sqlx::Pool::::connect_lazy(&settings.db_log).unwrap()) + .start_schedule(&main_pool) .unwrap(); + + let persist_policy = PersistPolicy::ToDB; + Controller { settings, sequencer, @@ -100,22 +154,15 @@ impl Controller { update_controller, markets, log_handler, - history_writer, - message_manager, - rt: tokio::runtime::Handle::current(), + persistor: Persistor { + history_writer, + message_manager: Some(message_manager), + policy: persist_policy, + }, + dbg_pool: main_pool, + // rt: tokio::runtime::Handle::current(), } } - // TODO: make the code more elegant - pub fn prepare_stub(self) { - unsafe { G_STUB = Some(self) }; - } - pub fn prepare_runtime(rt: *const tokio::runtime::Runtime) { - unsafe { G_RT = rt }; - } - - pub fn release_stub() { - unsafe { G_STUB = None }; - } pub fn asset_list(&self, _req: AssetListRequest) -> Result { let result = AssetListResponse { @@ -145,7 +192,7 @@ impl Controller { req.assets }; let user_id = req.user_id; - let balance_manager = self.balance_manager.borrow_mut(); + let balance_manager = &self.balance_manager; let balances = query_assets .into_iter() .map(|asset_name| { @@ -195,10 +242,7 @@ impl Controller { .rev() .skip(req.offset as usize) .take(limit as usize) - .map(|order_rc| { - let order = *order_rc.borrow_mut(); - order_to_proto(&order) - }) + .map(|order_rc| order_to_proto(&order_rc.borrow())) .collect() }) .unwrap_or_else(Vec::new); @@ -300,15 +344,7 @@ impl Controller { log::warn!("log_handler full"); return false; } - if self.message_manager.borrow_mut().is_block() { - log::warn!("message_manager full"); - return false; - } - if self.history_writer.borrow_mut().is_block() { - log::warn!("history_writer full"); - return false; - } - true + self.persistor.service_available() } pub fn update_balance(&mut self, real: bool, req: BalanceUpdateRequest) -> std::result::Result { @@ -326,8 +362,9 @@ impl Controller { } else { serde_json::from_str(req.detail.as_str()).map_err(|_| Status::invalid_argument("invalid detail"))? }; - let _is_valid = self.update_controller.borrow_mut().update_user_balance( - real, + let _is_valid = self.update_controller.update_user_balance( + &mut self.balance_manager, + self.persistor.is_real(real).persist_for_balance(), req.user_id, req.asset.as_str(), req.business.clone(), @@ -352,10 +389,14 @@ impl Controller { return Err(Status::invalid_argument("invalid market")); } let market = self.markets.get_mut(&req.market).unwrap(); + let balance_manager = &mut self.balance_manager; + let persistor = self.persistor.is_real(real).persist_for_market(market.tag()); let order_input = order_input_from_proto(&req).map_err(|e| Status::invalid_argument(format!("invalid decimal {}", e)))?; - let order = market.put_order(real, order_input).map_err(|e| Status::unknown(format!("{}", e)))?; + let order = market + .put_order(&mut self.sequencer, balance_manager.into(), persistor, order_input) + .map_err(|e| Status::unknown(format!("{}", e)))?; if real { self.append_operation_log(OPERATION_ORDER_PUT, &req); } @@ -376,7 +417,10 @@ impl Controller { if order.user != req.user_id { return Err(Status::invalid_argument("invalid user")); } - market.cancel(real, order.id); + let balance_manager = &mut self.balance_manager; + let persistor = self.persistor.is_real(real).persist_for_market(market.tag()); + + market.cancel(balance_manager.into(), persistor, order.id); if real { self.append_operation_log(OPERATION_ORDER_CANCEL, &req); } @@ -391,7 +435,11 @@ impl Controller { .markets .get_mut(&req.market) .ok_or_else(|| Status::invalid_argument("invalid market"))?; - let total = market.cancel_all_for_user(real, req.user_id) as u32; + let total = market.cancel_all_for_user( + (&mut self.balance_manager).into(), + self.persistor.is_real(real).persist_for_market(market.tag()), + req.user_id, + ) as u32; if real { self.append_operation_log(OPERATION_ORDER_CANCEL_ALL, &req); } @@ -409,13 +457,13 @@ impl Controller { } fn reset_state(&mut self) { - self.sequencer.borrow_mut().reset(); + self.sequencer.reset(); for market in self.markets.values_mut() { market.reset(); } //self.log_handler.reset(); - self.update_controller.borrow_mut().reset(); - self.balance_manager.borrow_mut().reset(); + self.update_controller.reset(); + self.balance_manager.reset(); //Ok(()) } @@ -452,17 +500,33 @@ impl Controller { tablenames::ORDERSLICE); */ // sqlx::query seems unable to handle multi statements, so `execute` is used here - let db_str = &self.settings.db_log; + let down_cmd = include_str!("../../migrations/reset/down.sql"); let up_cmd = include_str!("../../migrations/reset/up.sql"); - let mut connection = ConnectionType::connect(db_str).await?; - connection.execute(down_cmd).await?; - let mut connection = ConnectionType::connect(db_str).await?; - connection.execute(up_cmd).await?; - - let mut connection = ConnectionType::connect(db_str).await?; - crate::persist::MIGRATOR.run(&mut connection).await?; - crate::message::persist::MIGRATOR.run(&mut connection).await + let mut connection1 = self.dbg_pool.acquire().await?; + connection1.execute(down_cmd).await?; + let mut connection2 = self.dbg_pool.acquire().await?; + connection2.execute(up_cmd).await?; + + //To workaround https://github.com/launchbadge/sqlx/issues/954: migrator is not Send + let db_str = self.settings.db_log.clone(); + let thr_handle = std::thread::spawn(move || { + let rt: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build another runtime for migration"); + + let ret = rt.block_on(async move { + let mut conn = ConnectionType::connect(&db_str).await?; + crate::persist::MIGRATOR.run(&mut conn).await?; + crate::message::persist::MIGRATOR.run(&mut conn).await + }); + + println!("migration task done"); + ret + }); + + tokio::task::spawn_blocking(move || thr_handle.join().unwrap()).await.unwrap() } .await .map_err(|err| Status::unknown(format!("{}", err)))?; @@ -506,7 +570,7 @@ impl Controller { { let params = serde_json::to_string(req).unwrap(); let operation_log = models::OperationLog { - id: self.sequencer.borrow_mut().next_operation_log_id() as i64, + id: self.sequencer.next_operation_log_id() as i64, time: FTimestamp(utils::current_timestamp()).into(), method: method.to_owned(), params, diff --git a/src/matchengine/history.rs b/src/matchengine/history.rs index a20dfbeb..7beebe80 100644 --- a/src/matchengine/history.rs +++ b/src/matchengine/history.rs @@ -12,6 +12,7 @@ type TradeWriter = DatabaseWriter; pub trait HistoryWriter { fn is_block(&self) -> bool; + //TODO: don't take the ownership? fn append_balance_history(&mut self, data: models::BalanceHistory); fn append_order_history(&mut self, order: &market::Order); fn append_trade_history(&mut self, trade: &Trade); diff --git a/src/matchengine/market.rs b/src/matchengine/market.rs index 0298c76a..e5b4dcab 100644 --- a/src/matchengine/market.rs +++ b/src/matchengine/market.rs @@ -1,16 +1,16 @@ use crate::asset::{BalanceManager, BalanceType}; +use crate::config; use crate::history::HistoryWriter; use crate::message::{MessageManager, OrderMessage}; use crate::sequencer::Sequencer; -use crate::types::{self, MarketRole, OrderEventType, Trade}; +use crate::types::{self, MarketRole, OrderEventType}; use crate::utils; -use crate::{config, message}; -use std::cell::RefCell; use std::cmp::{min, Ordering}; use std::collections::BTreeMap; use std::iter::Iterator; -use std::rc::Rc; +use std::sync::Arc; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use anyhow::{anyhow, Result}; use itertools::Itertools; @@ -86,7 +86,33 @@ impl Order { } } -pub type OrderRc = Rc>; +pub use types::Trade; + +#[derive(Clone, Debug)] +pub struct OrderRc(Arc>); + +/* + simulate behavior like RefCell, the syncing is ensured by locking in higher rank + here we use RwLock only for avoiding unsafe tag, we can just use raw pointer + casted from ARc rather than RwLock here if we do not care about unsafe +*/ +impl OrderRc { + fn new(order: Order) -> Self { + OrderRc(Arc::new(RwLock::new(order))) + } + + pub(super) fn borrow(&self) -> RwLockReadGuard<'_, Order> { + self.0.try_read().expect("Lock for parent entry ensure it") + } + + pub(super) fn borrow_mut(&mut self) -> RwLockWriteGuard<'_, Order> { + self.0.try_write().expect("Lock for parent entry ensure it") + } + + fn deep(&self) -> Order { + *(self.borrow()) + } +} pub fn is_order_ask(order: &Order) -> bool { order.side == OrderSide::ASK @@ -108,63 +134,113 @@ pub struct Market { pub bids: BTreeMap, pub trade_count: u64, +} - pub sequencer: Rc>, - balance_manager: BalanceManagerWrapper, - pub history_writer: Rc>, - message_manager: MessageManagerWrapper, +pub trait PersistExector { + fn real_persist(&self) -> bool { + true + } + fn put_order(&mut self, order: &Order, at_step: OrderEventType); + fn put_trade(&mut self, trade: &Trade); } -const MAP_INIT_CAPACITY: usize = 1024; +impl PersistExector for Box { + fn put_order(&mut self, order: &Order, at_step: OrderEventType) { + self.as_mut().put_order(order, at_step) + } + fn put_trade(&mut self, trade: &Trade) { + self.as_mut().put_trade(trade) + } +} + +pub(super) struct DummyPersistor(pub(super) bool); +impl PersistExector for DummyPersistor { + fn real_persist(&self) -> bool { + self.0 + } + fn put_order(&mut self, _order: &Order, _as_step: OrderEventType) {} + fn put_trade(&mut self, _: &Trade) {} +} -struct MessageManagerWrapper { - inner: Rc>, +pub(super) struct MessengerAsPersistor<'a, T>(&'a mut T, (String, String)); + +impl PersistExector for MessengerAsPersistor<'_, T> { + fn put_order(&mut self, order: &Order, at_step: OrderEventType) { + self.0.push_order_message(&OrderMessage { + event: at_step, + order: *order, + base: self.1 .0.clone(), + quote: self.1 .1.clone(), + }); + } + fn put_trade(&mut self, trade: &Trade) { + self.0.push_trade_message(trade); + } } -impl MessageManagerWrapper { - pub fn push_order_message(&self, message: &OrderMessage) { - self.inner.borrow_mut().push_order_message(message) + +pub(super) struct DBAsPersistor<'a, T>(&'a mut T); + +impl PersistExector for DBAsPersistor<'_, T> { + fn put_order(&mut self, order: &Order, at_step: OrderEventType) { + //only persist on finish + match at_step { + OrderEventType::FINISH => self.0.append_order_history(order), + OrderEventType::PUT => (), + _ => (), + } } - pub fn push_trade_message(&self, message: &Trade) { - self.inner.borrow_mut().push_trade_message(message) + fn put_trade(&mut self, trade: &Trade) { + self.0.append_trade_history(trade); } } -struct BalanceManagerWrapper { - inner: Rc>, +pub(super) fn persistor_for_message(messenger: &mut T, tag: (String, String)) -> MessengerAsPersistor<'_, T> { + MessengerAsPersistor(messenger, tag) +} + +pub(super) fn persistor_for_db(history_writer: &mut T) -> DBAsPersistor<'_, T> { + DBAsPersistor(history_writer) +} + +pub struct BalanceManagerWrapper<'a> { + inner: &'a mut BalanceManager, } -impl BalanceManagerWrapper { - pub fn balance_add(&self, user_id: u32, balance_type: BalanceType, asset: &str, amount: &Decimal) { - self.inner.borrow_mut().add(user_id, balance_type, asset, amount); +impl<'a> From<&'a mut BalanceManager> for BalanceManagerWrapper<'a> { + fn from(origin: &'a mut BalanceManager) -> Self { + BalanceManagerWrapper { inner: origin } } - pub fn balance_get(&self, user_id: u32, balance_type: BalanceType, asset: &str) -> Decimal { - self.inner.borrow_mut().get(user_id, balance_type, asset) +} + +impl BalanceManagerWrapper<'_> { + pub fn balance_add(&mut self, user_id: u32, balance_type: BalanceType, asset: &str, amount: &Decimal) { + self.inner.add(user_id, balance_type, asset, amount); } - pub fn balance_sub(&self, user_id: u32, balance_type: BalanceType, asset: &str, amount: &Decimal) { - self.inner.borrow_mut().sub(user_id, balance_type, asset, amount); + pub fn balance_get(&mut self, user_id: u32, balance_type: BalanceType, asset: &str) -> Decimal { + self.inner.get(user_id, balance_type, asset) } - pub fn balance_frozen(&self, user_id: u32, asset: &str, amount: &Decimal) { - self.inner.borrow_mut().frozen(user_id, asset, amount) + pub fn balance_sub(&mut self, user_id: u32, balance_type: BalanceType, asset: &str, amount: &Decimal) { + self.inner.sub(user_id, balance_type, asset, amount); } - pub fn balance_unfrozen(&self, user_id: u32, asset: &str, amount: &Decimal) { - self.inner.borrow_mut().unfrozen(user_id, asset, amount) + pub fn balance_frozen(&mut self, user_id: u32, asset: &str, amount: &Decimal) { + self.inner.frozen(user_id, asset, amount) } - pub fn asset_prec(&self, asset: &str) -> u32 { - self.inner.borrow_mut().asset_manager.asset_prec(asset) + pub fn balance_unfrozen(&mut self, user_id: u32, asset: &str, amount: &Decimal) { + self.inner.unfrozen(user_id, asset, amount) + } + pub fn asset_prec(&mut self, asset: &str) -> u32 { + self.inner.asset_manager.asset_prec(asset) } } + +const MAP_INIT_CAPACITY: usize = 1024; + // TODO: is it ok to match with oneself's order? // TODO: precision impl Market { - pub fn new( - market_conf: &config::Market, - balance_manager: Rc>, - sequencer: Rc>, - history_writer: Rc>, - message_manager: Rc>, - ) -> Result { - let asset_exist = |asset: &str| -> bool { balance_manager.borrow_mut().asset_manager.asset_exist(asset) }; - let asset_prec = |asset: &str| -> u32 { balance_manager.borrow_mut().asset_manager.asset_prec(asset) }; + pub fn new(market_conf: &config::Market, balance_manager: &mut BalanceManager) -> Result { + let asset_exist = |asset: &str| -> bool { balance_manager.asset_manager.asset_exist(asset) }; + let asset_prec = |asset: &str| -> u32 { balance_manager.asset_manager.asset_prec(asset) }; if !asset_exist(&market_conf.quote.name) || !asset_exist(&market_conf.base.name) { return Err(anyhow!("invalid assert name {} {}", market_conf.quote.name, market_conf.base.name)); } @@ -184,18 +260,19 @@ impl Market { quote_prec: market_conf.quote.prec, fee_prec: market_conf.fee_prec, min_amount: market_conf.min_amount, - sequencer, orders: BTreeMap::new(), users: BTreeMap::new(), asks: BTreeMap::new(), bids: BTreeMap::new(), trade_count: 0, - balance_manager: BalanceManagerWrapper { inner: balance_manager }, - history_writer, - message_manager: MessageManagerWrapper { inner: message_manager }, }; Ok(market) } + + pub fn tag(&self) -> (String, String) { + (self.base.clone(), self.quote.clone()) + } + pub fn reset(&mut self) { log::debug!("market {} reset", self.name); self.bids.clear(); @@ -203,21 +280,20 @@ impl Market { self.users.clear(); self.orders.clear(); } - pub fn frozen_balance(&self, order: &Order) { + pub fn frozen_balance(&self, balance_manager: &mut BalanceManagerWrapper<'_>, order: &Order) { let asset = if is_order_ask(order) { &self.base } else { &self.quote }; - self.balance_manager.balance_frozen(order.user, asset, &order.frozen); + balance_manager.balance_frozen(order.user, asset, &order.frozen); } - pub fn unfrozen_balance(&self, order: &Order) { + pub fn unfrozen_balance(&self, balance_manager: &mut BalanceManagerWrapper<'_>, order: &Order) { debug_assert!(order.remain.is_sign_positive()); if order.remain.is_zero() { return; } let asset = if is_order_ask(&order) { &self.base } else { &self.quote }; - self.balance_manager.balance_unfrozen(order.user, asset, &order.frozen); + balance_manager.balance_unfrozen(order.user, asset, &order.frozen); } - pub fn insert_order(&mut self, order_rc: OrderRc) -> Order { - let mut order = order_rc.borrow_mut(); + pub fn insert_order(&mut self, mut order: Order) -> Order { if order.side == OrderSide::ASK { order.frozen = order.remain; } else { @@ -226,6 +302,7 @@ impl Market { debug_assert_eq!(order.type_, OrderType::LIMIT); debug_assert!(!self.orders.contains_key(&order.id)); //println!("order insert {}", &order.id); + let order_rc = OrderRc::new(order); self.orders.insert(order.id, order_rc.clone()); let user_map = self.users.entry(order.user).or_insert_with(BTreeMap::new); debug_assert!(!user_map.contains_key(&order.id)); @@ -239,10 +316,10 @@ impl Market { debug_assert!(!self.bids.contains_key(&key)); self.bids.insert(key, order_rc.clone()); } - *order + order_rc.deep() } - fn order_finish(&mut self, real: bool, order: &Order) { + fn order_finish(&mut self, balance_manager: &mut BalanceManagerWrapper<'_>, persistor: &mut impl PersistExector, order: &Order) { if order.side == OrderSide::ASK { let key = &order.get_ask_key(); debug_assert!(self.asks.contains_key(key)); @@ -252,7 +329,7 @@ impl Market { debug_assert!(self.bids.contains_key(key)); self.bids.remove(key); } - self.unfrozen_balance(&order); + self.unfrozen_balance(balance_manager, order); debug_assert!(self.orders.contains_key(&order.id)); //println!("order finish {}", &order.id); self.orders.remove(&order.id); @@ -260,31 +337,26 @@ impl Market { debug_assert!(user_map.contains_key(&order.id)); user_map.remove(&order.id); - if real { - // TODO need this if?? - if order.finished_base.is_sign_positive() { - self.history_writer.borrow_mut().append_order_history(&order); - } - let order_message = OrderMessage { - event: OrderEventType::FINISH, - order: *order, - base: self.base.clone(), - quote: self.quote.clone(), - }; - self.message_manager.push_order_message(&order_message); - } + persistor.put_order(order, OrderEventType::FINISH); } // the last parameter `quote_limit`, is only used for market bid order, // it indicates the `quote` balance of the user, // so the sum of all the trades' quote amount cannot exceed this value - pub fn execute_order(&mut self, real: bool, taker: OrderRc, quote_limit: &Decimal) { + fn execute_order( + &mut self, + sequencer: &mut Sequencer, + balance_manager: &mut BalanceManagerWrapper<'_>, + persistor: &mut impl PersistExector, + mut taker: Order, + quote_limit: &Decimal, + ) -> Order { log::debug!("execute_order {:?}", taker); - let taker_is_ask = taker.borrow_mut().side == OrderSide::ASK; + let taker_is_ask = taker.side == OrderSide::ASK; let taker_is_bid = !taker_is_ask; let maker_is_bid = taker_is_ask; let maker_is_ask = !maker_is_bid; - let is_limit_order = taker.borrow_mut().type_ == OrderType::LIMIT; + let is_limit_order = taker.type_ == OrderType::LIMIT; let is_market_order = !is_limit_order; //let mut quote_available = *quote_limit; let mut quote_sum = Decimal::zero(); @@ -296,22 +368,22 @@ impl Market { } else { Box::new(self.asks.values_mut()) }; - for maker in counter_orders { - let taker_mut = taker.borrow_mut(); - let maker_mut = maker.borrow_mut(); - if taker_mut.remain.is_zero() { + + for maker_ref in counter_orders { + let mut maker = maker_ref.borrow_mut(); + if taker.remain.is_zero() { break; } let (ask_fee_rate, bid_fee_rate) = if taker_is_ask { - (taker_mut.taker_fee, maker_mut.maker_fee) + (taker.taker_fee, maker.maker_fee) } else { - (maker_mut.maker_fee, taker_mut.taker_fee) + (maker.maker_fee, taker.taker_fee) }; - let price = maker_mut.price; - let (mut ask_order, mut bid_order) = if taker_is_ask { - (taker_mut, maker_mut) + let price = maker.price; + let (ask_order, bid_order) = if taker_is_ask { + (&mut taker, &mut *maker) } else { - (maker_mut, taker_mut) + (&mut *maker, &mut taker) }; if is_limit_order && ask_order.price.gt(&bid_order.price) { break; @@ -341,9 +413,9 @@ impl Market { ask_order.update_time = timestamp; bid_order.update_time = timestamp; - if real { + if persistor.real_persist() { // emit the trade - let trade_id = self.sequencer.borrow_mut().next_trade_id(); + let trade_id = sequencer.next_trade_id(); let trade = types::Trade { id: trade_id, timestamp: utils::current_timestamp(), @@ -362,8 +434,7 @@ impl Market { bid_role: if taker_is_ask { MarketRole::MAKER } else { MarketRole::TAKER }, bid_fee, }; - self.history_writer.borrow_mut().append_trade_history(&trade); - self.message_manager.push_trade_message(&trade); + persistor.put_trade(&trade); self.trade_count += 1; } ask_order.remain -= traded_base_amount; @@ -383,9 +454,8 @@ impl Market { BalanceType::AVAILABLE }; // handle base - self.balance_manager - .balance_add(bid_order.user, BalanceType::AVAILABLE, &self.base, &traded_base_amount); - self.balance_manager.balance_sub( + balance_manager.balance_add(bid_order.user, BalanceType::AVAILABLE, &self.base, &traded_base_amount); + balance_manager.balance_sub( ask_order.user, if maker_is_ask { BalanceType::FREEZE @@ -396,9 +466,8 @@ impl Market { &traded_base_amount, ); // handle quote - self.balance_manager - .balance_add(ask_order.user, BalanceType::AVAILABLE, &self.quote, &traded_quote_amount); - self.balance_manager.balance_sub( + balance_manager.balance_add(ask_order.user, BalanceType::AVAILABLE, &self.quote, &traded_quote_amount); + balance_manager.balance_sub( bid_order.user, if maker_is_bid { BalanceType::FREEZE @@ -410,44 +479,44 @@ impl Market { ); if ask_fee.is_sign_positive() { - self.balance_manager - .balance_sub(ask_order.user, BalanceType::AVAILABLE, &self.quote, &ask_fee); + balance_manager.balance_sub(ask_order.user, BalanceType::AVAILABLE, &self.quote, &ask_fee); } if bid_fee.is_sign_positive() { - self.balance_manager - .balance_sub(bid_order.user, BalanceType::AVAILABLE, &self.base, &bid_fee); + balance_manager.balance_sub(bid_order.user, BalanceType::AVAILABLE, &self.base, &bid_fee); } - let (mut _taker_mut, mut maker_mut) = if taker_is_ask { + /* //Not need + let (_, maker_mut) = if taker_is_ask { (ask_order, bid_order) } else { (bid_order, ask_order) - }; - maker_mut.frozen -= if maker_is_bid { traded_quote_amount } else { traded_base_amount }; + };*/ + maker.frozen -= if maker_is_bid { traded_quote_amount } else { traded_base_amount }; - let maker_finished = maker_mut.remain.is_zero(); + let maker_finished = maker.remain.is_zero(); if maker_finished { - finished_orders.push(*maker_mut); - } - // When maker_finished, `order_finish` will send message. - // So we don't need to send the finish message here. - if real && !maker_finished { - let order_message = message::OrderMessage { - event: OrderEventType::UPDATE, - order: *maker_mut, - base: self.base.clone(), - quote: self.quote.clone(), - }; - self.message_manager.push_order_message(&order_message); + finished_orders.push(*maker); + } else { + // When maker_finished, `order_finish` will send message. + // So we don't need to send the finish message here. + persistor.put_order(&maker, OrderEventType::UPDATE); } } for item in finished_orders.iter() { - self.order_finish(real, item); + self.order_finish(&mut *balance_manager, &mut *persistor, item); } + + taker } - pub fn put_order(&mut self, real: bool, order_input: OrderInput) -> Result { + pub fn put_order( + &mut self, + sequencer: &mut Sequencer, + mut balance_manager: BalanceManagerWrapper<'_>, + mut persistor: impl PersistExector, + order_input: OrderInput, + ) -> Result { if order_input.amount.lt(&self.min_amount) { return Err(anyhow!("invalid amount")); } @@ -475,17 +544,14 @@ impl Market { } } if order_input.side == OrderSide::ASK { - if self - .balance_manager + if balance_manager .balance_get(order_input.user_id, BalanceType::AVAILABLE, &self.base) .lt(&order_input.amount) { return Err(anyhow!("balance not enough")); } } else { - let balance = self - .balance_manager - .balance_get(order_input.user_id, BalanceType::AVAILABLE, &self.quote); + let balance = balance_manager.balance_get(order_input.user_id, BalanceType::AVAILABLE, &self.quote); if order_input.type_ == OrderType::LIMIT { if balance.lt(&(order_input.amount * order_input.price)) { @@ -502,23 +568,22 @@ impl Market { // Here we only make a minimum balance check against the top of the counter order book. // After the check, balance may still be not enough, then the remain part of the order // will be marked as `canceled(finished)`. - let top_counter_order_price = self.asks.values().next().unwrap().borrow_mut().price; + let top_counter_order_price = self.asks.values().next().unwrap().borrow().price; if balance.lt(&(order_input.amount * top_counter_order_price)) { return Err(anyhow!("balance not enough")); } } } let quote_limit = if order_input.type_ == OrderType::MARKET && order_input.side == OrderSide::BID { - self.balance_manager - .balance_get(order_input.user_id, BalanceType::AVAILABLE, &self.quote) + balance_manager.balance_get(order_input.user_id, BalanceType::AVAILABLE, &self.quote) } else { // not used Decimal::zero() }; let t = utils::current_timestamp(); - let order_rc = Rc::new(RefCell::new(Order { - id: self.sequencer.borrow_mut().next_order_id(), + let order = Order { + id: sequencer.next_order_id(), type_: order_input.type_, side: order_input.side, create_time: t, @@ -534,59 +599,48 @@ impl Market { finished_base: Decimal::zero(), finished_quote: Decimal::zero(), finished_fee: Decimal::zero(), - })); - self.execute_order(real, order_rc.clone(), "e_limit); - let mut order = *order_rc.borrow_mut(); + }; + let mut order = self.execute_order(sequencer, &mut balance_manager, &mut persistor, order, "e_limit); if order.type_ == OrderType::LIMIT && !order.remain.is_zero() { - if real { - let order_message = OrderMessage { - event: OrderEventType::PUT, - order, - base: self.base.clone(), - quote: self.quote.clone(), - }; - self.message_manager.push_order_message(&order_message); - } - order = self.insert_order(order_rc); - self.frozen_balance(&order); + persistor.put_order(&order, OrderEventType::PUT); + order = self.insert_order(order); + self.frozen_balance(&mut balance_manager, &order); } else { - if real { - self.history_writer.borrow_mut().append_order_history(&order); - let order_message = OrderMessage { - event: OrderEventType::FINISH, - order, - base: self.base.clone(), - quote: self.quote.clone(), - }; - self.message_manager.push_order_message(&order_message); - } + persistor.put_order(&order, OrderEventType::FINISH); } Ok(order) } - pub fn cancel(&mut self, real: bool, order_id: u64) -> Order { + pub fn cancel(&mut self, mut balance_manager: BalanceManagerWrapper<'_>, mut persistor: impl PersistExector, order_id: u64) -> Order { let order = self.orders.get(&order_id).unwrap(); - let order_struct = *order.borrow_mut(); - self.order_finish(real, &order_struct); + let order_struct = *order.borrow(); + self.order_finish(&mut balance_manager, &mut persistor, &order_struct); order_struct } - pub fn cancel_all_for_user(&mut self, real: bool, user_id: u32) -> usize { + pub fn cancel_all_for_user( + &mut self, + mut balance_manager: BalanceManagerWrapper<'_>, + mut persistor: impl PersistExector, + user_id: u32, + ) -> usize { // TODO: can we mutate while iterate? let order_ids: Vec = self.users.get(&user_id).unwrap_or(&BTreeMap::new()).keys().copied().collect(); let total = order_ids.len(); for order_id in order_ids { - self.cancel(real, order_id); + let order = self.orders.get(&order_id).unwrap(); + let order_struct = *order.borrow(); + self.order_finish(&mut balance_manager, &mut persistor, &order_struct); } total } pub fn get(&self, order_id: u64) -> Option { - self.orders.get(&order_id).map(|o| *o.borrow_mut()) + self.orders.get(&order_id).map(OrderRc::deep) } pub fn get_order_of_user(&self, user_id: u32) -> Vec { self.users .get(&user_id) .unwrap_or(&BTreeMap::new()) .values() - .map(|v| *v.borrow_mut()) + .map(OrderRc::deep) .collect() } pub fn print(&self) { @@ -599,9 +653,9 @@ impl Market { MarketStatus { name: self.name.to_string(), ask_count: self.asks.len(), - ask_amount: self.asks.values().map(|item| item.borrow_mut().remain).sum(), + ask_amount: self.asks.values().map(|item| item.borrow().remain).sum(), bid_count: self.bids.len(), - bid_amount: self.bids.values().map(|item| item.borrow_mut().remain).sum(), + bid_amount: self.bids.values().map(|item| item.borrow().remain).sum(), trade_count: self.trade_count, } } @@ -628,12 +682,12 @@ impl Market { { orderbook .values() - .group_by(|order_rc| -> Decimal { f(&order_rc.borrow_mut()) }) + .group_by(|order_rc| -> Decimal { f(&order_rc.borrow()) }) .into_iter() .take(limit) .map(|(price, group)| PriceInfo { price, - amount: group.map(|order_rc| order_rc.borrow_mut().remain).sum(), + amount: group.map(|order_rc| order_rc.borrow().remain).sum(), }) .collect::>() } @@ -690,8 +744,6 @@ struct BalanceHistoryFromFee { mod tests { use super::*; use crate::asset::AssetManager; - use crate::history::DummyHistoryWriter; - use crate::message::DummyMessageManager; use rust_decimal_macros::*; fn get_simple_market_config() -> config::Market { @@ -739,19 +791,11 @@ mod tests { #[test] fn test_market_taker_is_bid() { //let mut market = get_simple_market_with_data(); - let mut balance_manager = get_simple_balance_manager(); - init_balance(&mut balance_manager); - let sequencer = Rc::new(RefCell::new(Sequencer::default())); - let balance_manager_rc = Rc::new(RefCell::new(balance_manager)); + let balance_manager = &mut get_simple_balance_manager(); + init_balance(balance_manager); + let sequencer = &mut Sequencer::default(); let ask_user_id = 101; - let mut market = Market::new( - &get_simple_market_config(), - balance_manager_rc.clone(), - sequencer, - Rc::new(RefCell::new(DummyHistoryWriter)), - Rc::new(RefCell::new(DummyMessageManager)), - ) - .unwrap(); + let mut market = Market::new(&get_simple_market_config(), balance_manager).unwrap(); let ask_order_input = OrderInput { user_id: ask_user_id, side: OrderSide::ASK, @@ -762,7 +806,9 @@ mod tests { maker_fee: dec!(0.001), market: market.name.to_string(), }; - let ask_order = market.put_order(false, ask_order_input).unwrap(); + let ask_order = market + .put_order(sequencer, balance_manager.into(), DummyPersistor(false), ask_order_input) + .unwrap(); assert_eq!(ask_order.id, 1); assert_eq!(ask_order.remain, dec!(20.0)); @@ -777,7 +823,9 @@ mod tests { maker_fee: dec!(0.001), market: market.name.to_string(), }; - let bid_order = market.put_order(false, bid_order_input).unwrap(); + let bid_order = market + .put_order(sequencer, balance_manager.into(), DummyPersistor(false), bid_order_input) + .unwrap(); // trade: price: 0.10 amount: 10 assert_eq!(bid_order.id, 2); assert_eq!(bid_order.remain, dec!(0)); @@ -794,7 +842,6 @@ mod tests { assert_eq!(ask_order.finished_fee, dec!(0.001)); // original balance: btc 300, eth 1000 - let balance_manager = balance_manager_rc.borrow_mut(); assert_eq!(balance_manager.get(ask_user_id, BalanceType::AVAILABLE, ð()), dec!(980)); assert_eq!(balance_manager.get(ask_user_id, BalanceType::FREEZE, ð()), dec!(10)); diff --git a/src/matchengine/persist.rs b/src/matchengine/persist.rs index 91a4091c..8b9acd71 100644 --- a/src/matchengine/persist.rs +++ b/src/matchengine/persist.rs @@ -1,6 +1,6 @@ use crate::asset; use crate::asset::BalanceManager; -use crate::controller::{Controller, G_STUB}; +use crate::controller::Controller; use crate::database; use crate::models; use crate::types::SimpleResult; @@ -12,9 +12,6 @@ use crate::sqlxextend::*; use sqlx::migrate::Migrator; use sqlx::Connection; -use std::cell::RefCell; -use std::rc::Rc; - use crate::market::Order; use std::convert::TryFrom; @@ -109,7 +106,6 @@ pub async fn load_slice_from_db(conn: &mut ConnectionType, slice_id: i64, contro let amount = balance.balance; controller .balance_manager - .borrow_mut() .set(balance.user_id as u32, balance_type, &balance.asset, &amount); } if let Some(slice_balance) = balances.last() { @@ -136,7 +132,7 @@ pub async fn load_slice_from_db(conn: &mut ConnectionType, slice_id: i64, contro .unwrap(); for order in &orders { let market = controller.markets.get_mut(&order.market).unwrap(); - let order_rc = Rc::new(RefCell::new(Order { + let order = Order { id: order.id as u64, type_: order.order_type, side: order.order_side, @@ -153,8 +149,8 @@ pub async fn load_slice_from_db(conn: &mut ConnectionType, slice_id: i64, contro finished_base: order.finished_base, finished_quote: order.finished_quote, finished_fee: order.finished_fee, - })); - market.insert_order(order_rc); + }; + market.insert_order(order); } if let Some(last_order) = orders.last() { order_id = last_order.id; @@ -211,10 +207,7 @@ pub async fn load_operation_log_from_db(conn: &mut ConnectionType, operation_log controller.replay(&log.method, &log.params).unwrap(); } } - controller - .sequencer - .borrow_mut() - .set_operation_log_id(operation_log_start_id as u64); + controller.sequencer.set_operation_log_id(operation_log_start_id as u64); log::info!("set operation_log_id to {}", operation_log_start_id); } @@ -225,8 +218,8 @@ pub async fn init_from_db(conn: &mut ConnectionType, controller: &mut Controller log::debug!("last slice {:?}", slice); load_slice_from_db(conn, slice.time, controller).await; end_operation_log_id = slice.end_operation_log_id; - controller.sequencer.borrow_mut().set_order_id(slice.end_order_id as u64); - controller.sequencer.borrow_mut().set_trade_id(slice.end_trade_id as u64); + controller.sequencer.set_order_id(slice.end_order_id as u64); + controller.sequencer.set_trade_id(slice.end_trade_id as u64); log::info!("set order_id and trade_id to {} {}", slice.end_order_id, slice.end_trade_id); } load_operation_log_from_db(conn, end_operation_log_id as u64, controller).await; @@ -269,7 +262,7 @@ pub async fn dump_orders(conn: &mut ConnectionType, slice_id: i64, controller: & let mut records = Vec::new(); for market in controller.markets.values() { for order_rc in market.orders.values() { - let order = *order_rc.borrow_mut(); + let order = order_rc.borrow(); let record = OrderSlice { id: order.id as i64, slice_id, @@ -312,7 +305,7 @@ pub async fn dump_orders(conn: &mut ConnectionType, slice_id: i64, controller: & } pub async fn update_slice_history(conn: &mut ConnectionType, slice_id: i64, controller: &Controller) -> SimpleResult { - let sequencer = controller.sequencer.borrow_mut(); + let sequencer = &controller.sequencer; let slice_history = SliceHistory { time: slice_id, end_operation_log_id: sequencer.get_operation_log_id() as i64, @@ -327,7 +320,7 @@ pub async fn update_slice_history(conn: &mut ConnectionType, slice_id: i64, cont pub async fn dump_to_db(conn: &mut ConnectionType, slice_id: i64, controller: &Controller) -> SimpleResult { log::info!("persisting orders and balances to db"); dump_orders(conn, slice_id, controller).await?; - dump_balance(conn, slice_id, &controller.balance_manager.borrow()).await?; + dump_balance(conn, slice_id, &controller.balance_manager).await?; update_slice_history(conn, slice_id, controller).await?; Ok(()) } @@ -448,7 +441,10 @@ fn do_forking() -> bool { } } -pub fn fork_and_make_slice() /*-> SimpleResult*/ +/// # Safety +/// +/// Safe by designation +pub unsafe fn fork_and_make_slice(controller: *const Controller) /*-> SimpleResult*/ { if !do_forking() { return; @@ -460,14 +456,14 @@ pub fn fork_and_make_slice() /*-> SimpleResult*/ //tokio runtime in current thread would highly possible being ruined after fork //so we put our task under new thread, with another tokio runtime + let controller = controller.as_ref().unwrap(); + let thread_handle = std::thread::spawn(move || { let rt: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .expect("build another runtime for slice-making"); - let controller = unsafe { G_STUB.as_mut().unwrap() }; - if let Err(e) = rt.block_on(make_slice(controller)) { // TODO: it seems sometimes no stderr/stdout is printed here. check it later panic!("panic {:?}", e); @@ -490,7 +486,7 @@ pub fn fork_and_make_slice() /*-> SimpleResult*/ //die fast std::process::exit(exitcode); } - +/* pub fn init_persist_timer() { // use spawn_local here will block the network thread tokio::spawn(async move { @@ -503,3 +499,4 @@ pub fn init_persist_timer() { } }); } +*/ diff --git a/src/matchengine/server.rs b/src/matchengine/server.rs index 61a8896c..04440b0a 100644 --- a/src/matchengine/server.rs +++ b/src/matchengine/server.rs @@ -4,53 +4,138 @@ use tonic::{self, Request, Response, Status}; pub use crate::dto::*; //use crate::me_history::HistoryWriter; -use crate::controller::G_RT; -use crate::controller::G_STUB; +use crate::controller::Controller; +use std::fmt::Debug; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot, RwLock}; -pub struct GrpcHandler {} +type StubType = Arc>; -macro_rules! get_stub { - () => { - unsafe { G_STUB.as_mut().unwrap() } - }; +type ControllerAction = Box Pin + Send>> + Send>; + +pub struct GrpcHandler { + stub: StubType, + task_dispacther: mpsc::Sender, + set_close: mpsc::Sender<()>, +} + +struct ControllerDispatch(ControllerAction, oneshot::Receiver); + +impl ControllerDispatch { + fn new(f: T) -> Self + where + T: for<'c> FnOnce(&'c mut Controller) -> Pin + Send + 'c>>, + T: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + + ControllerDispatch( + Box::new( + move |ctrl: StubType| -> Pin + Send + 'static>> { + Box::pin(async move { + let mut wg = ctrl.write().await; + if let Err(t) = tx.send(f(&mut wg).await) { + log::error!("Controller action can not be return: {:?}", t); + } + }) + }, + ), + rx, + ) + } +} + +fn map_dispatch_err(_: mpsc::error::SendError) -> tonic::Status { + tonic::Status::unknown("Server temporary unavaliable") +} + +type ControllerRet = Result; +type ServerRet = Result, tonic::Status>; + +fn map_dispatch_ret(recv_ret: Result, oneshot::error::RecvError>) -> ServerRet { + match recv_ret { + Ok(ret) => ret.map(Response::new), + Err(_) => Err(Status::unknown("Dispatch ret unreach")), + } +} + +pub struct ServerLeave(mpsc::Sender, mpsc::Sender<()>); + +impl ServerLeave { + pub async fn leave(self) { + self.1.send(()).await.unwrap(); + self.0.closed().await; + } } -fn run_blocking_the_world_task(f: G) -> Result<(), Status> -where - G: FnOnce() -> F + Send + 'static, //We need additional wrapping to send the using of controller into another thread - F: std::future::Future> + 'static, -{ - println!("We start a handling with block-the-world (grpc) mode"); - //let handle = get_stub!().rt.clone(); - - let thr_handle = std::thread::spawn(move || -> Result<(), Status> { - unsafe { - (*G_RT).block_on(f()) - // just for verification - // std::thread::sleep(std::time::Duration::from_secs(10)); - } - }); - - //simply block the thread in a crude way ... - let ret = thr_handle.join(); - println!("Block-the-world task done, continue running"); - ret.unwrap() +impl GrpcHandler { + pub fn new(stub: Controller) -> Self { + let mut persist_interval = tokio::time::interval(std::time::Duration::from_secs(stub.settings.persist_interval as u64)); + + let stub = Arc::new(RwLock::new(stub)); + //we always wait so the size of channel is no matter + let (tx, mut rx) = mpsc::channel(16); + let (tx_close, mut rx_close) = mpsc::channel(1); + + let stub_for_dispatch = stub.clone(); + + let ret = GrpcHandler { + task_dispacther: tx, + set_close: tx_close, + stub, + }; + + tokio::spawn(async move { + persist_interval.tick().await; //skip first tick + loop { + tokio::select! { + may_task = rx.recv() => { + if let Some(task) = may_task { + task(stub_for_dispatch.clone()).await; + }else { + break; + } + } + _ = persist_interval.tick() => { + let stub_rd = stub_for_dispatch.read().await; + log::info!("Start a persisting task"); + unsafe { + crate::persist::fork_and_make_slice(&*stub_rd); + } + } + _ = rx_close.recv() => { + log::info!("Server scheduler is notified to close"); + rx.close(); + } + } + } + + log::warn!("Server scheduler has exited"); + }); + + ret + } + + pub fn on_leave(&self) -> ServerLeave { + ServerLeave(self.task_dispacther.clone(), self.set_close.clone()) + } } #[tonic::async_trait] impl Matchengine for GrpcHandler { async fn asset_list(&self, request: Request) -> Result, Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.asset_list(request.into_inner())?)) } async fn balance_query(&self, request: Request) -> Result, Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.balance_query(request.into_inner())?)) } async fn order_query(&self, request: tonic::Request) -> Result, tonic::Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.order_query(request.into_inner())?)) } //async fn order_book(&self, _request: tonic::Request) -> Result, tonic::Status> { @@ -60,105 +145,87 @@ impl Matchengine for GrpcHandler { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.order_book_depth(request.into_inner())?)) } async fn order_detail(&self, request: tonic::Request) -> Result, tonic::Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.order_detail(request.into_inner())?)) } async fn market_list(&self, request: tonic::Request) -> Result, tonic::Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.market_list(request.into_inner())?)) } async fn market_summary( &self, request: tonic::Request, ) -> Result, tonic::Status> { - let stub = get_stub!(); + let stub = self.stub.read().await; Ok(Response::new(stub.market_summary(request.into_inner())?)) } + /*---------------------------- following are "written ops" ---------------------------------*/ async fn balance_update(&self, request: Request) -> Result, Status> { - let stub = get_stub!(); - Ok(Response::new(stub.update_balance(true, request.into_inner())?)) + let ControllerDispatch(act, rt) = + ControllerDispatch::new(move |ctrl: &mut Controller| Box::pin(async move { ctrl.update_balance(true, request.into_inner()) })); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } async fn order_put(&self, request: Request) -> Result, Status> { - let stub = get_stub!(); - Ok(Response::new(stub.order_put(true, request.into_inner())?)) + let ControllerDispatch(act, rt) = + ControllerDispatch::new(move |ctrl: &mut Controller| Box::pin(async move { ctrl.order_put(true, request.into_inner()) })); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } async fn order_cancel(&self, request: tonic::Request) -> Result, tonic::Status> { - let stub = get_stub!(); - Ok(Response::new(stub.order_cancel(true, request.into_inner())?)) + let ControllerDispatch(act, rt) = + ControllerDispatch::new(move |ctrl: &mut Controller| Box::pin(async move { ctrl.order_cancel(true, request.into_inner()) })); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } async fn order_cancel_all( &self, request: tonic::Request, ) -> Result, tonic::Status> { - let stub = get_stub!(); - Ok(Response::new(stub.order_cancel_all(true, request.into_inner())?)) + let ControllerDispatch(act, rt) = ControllerDispatch::new(move |ctrl: &mut Controller| { + Box::pin(async move { ctrl.order_cancel_all(true, request.into_inner()) }) + }); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } // This is the only blocking call of the server #[cfg(debug_assertions)] async fn debug_dump(&self, request: Request) -> Result, Status> { - run_blocking_the_world_task(|| async { - let stub = get_stub!(); - stub.debug_dump(request.into_inner()).await.map(|_| ()) - }) - .map(|_| Response::new(DebugDumpResponse {})) - - // match stub.stw_notifier.replace(None) { - // Some(chn) => { - // let f = Box::pin(stub.debug_dump(request.into_inner())); - // let fs: Box> = Box::new(f); - // chn.send(controller::DebugRunTask::Dump(fs)) - // .map_err(|_| Status::unknown("Can not send the task out"))?; - // Ok(Response::new(DebugDumpResponse {})) - // } - // _ => Err(Status::unknown("No channel for Stop the world, may be occupied?")), - // } + let ControllerDispatch(act, rt) = + ControllerDispatch::new(move |ctrl: &mut Controller| Box::pin(ctrl.debug_dump(request.into_inner()))); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } #[cfg(debug_assertions)] async fn debug_reset(&self, request: Request) -> Result, Status> { - run_blocking_the_world_task(|| async { - let stub = get_stub!(); - stub.debug_reset(request.into_inner()).await.map(|_| ()) - }) - .map(|_| Response::new(DebugResetResponse {})) - - // match stub.stw_notifier.replace(None) { - // Some(chn) => { - // let f = Box::pin(stub.debug_reset(request.into_inner())); - // let fs: Box> = Box::new(f); - // chn.send(controller::DebugRunTask::Reset(fs)) - // .map_err(|_| Status::unknown("Can not send the task out"))?; - // Ok(Response::new(DebugResetResponse {})) - // } - // _ => Err(Status::unknown("No channel for Stop the world, may be occupied?")), - // } + let ControllerDispatch(act, rt) = + ControllerDispatch::new(move |ctrl: &mut Controller| Box::pin(ctrl.debug_reset(request.into_inner()))); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } #[cfg(debug_assertions)] async fn debug_reload(&self, request: Request) -> Result, Status> { - run_blocking_the_world_task(|| async { - let stub = get_stub!(); - stub.debug_reload(request.into_inner()).await.map(|_| ()) - }) - .map(|_| Response::new(DebugReloadResponse {})) - // match stub.stw_notifier.replace(None) { - // Some(chn) => { - // let f = Box::pin(stub.debug_reload(request.into_inner())); - // let fs: Box> = Box::new(f); - // chn.send(controller::DebugRunTask::Reload(fs)) - // .map_err(|_| Status::unknown("Can not send the task out"))?; - // Ok(Response::new(DebugReloadResponse {})) - // } - // _ => Err(Status::unknown("No channel for Stop the world, may be occupied?")), - // } + let ControllerDispatch(act, rt) = + ControllerDispatch::new(move |ctrl: &mut Controller| Box::pin(ctrl.debug_reload(request.into_inner()))); + + self.task_dispacther.send(act).await.map_err(map_dispatch_err)?; + map_dispatch_ret(rt.await) } #[cfg(not(debug_assertions))]