Skip to content

Commit

Permalink
Feature: persistor for all history (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
noel2004 authored Feb 8, 2021
1 parent 44e6359 commit 7949366
Show file tree
Hide file tree
Showing 15 changed files with 1,094 additions and 167 deletions.
2 changes: 1 addition & 1 deletion src/bin/matchengine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn prepare() -> anyhow::Result<Controller> {

async fn grpc_run(stub: Controller) -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50051".parse().unwrap();
let grpc = GrpcHandler::new(stub);
let mut grpc = GrpcHandler::new(stub);
log::info!("Starting gprc service");

let (tx, rx) = tokio::sync::oneshot::channel::<()>();
Expand Down
90 changes: 57 additions & 33 deletions src/bin/persistor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,12 @@

use database::{DatabaseWriter, DatabaseWriterConfig};
use dingir_exchange::{config, database, message, models, types};
use types::{ConnectionType, DbType};
use std::pin::Pin;
use types::DbType;

use rdkafka::consumer::{stream_consumer, ConsumerContext, DefaultConsumerContext, StreamConsumer};
use rdkafka::consumer::StreamConsumer;

use message::persist::MIGRATOR;

//use sqlx::Connection;
struct AppliedConsumer<C: ConsumerContext + 'static = DefaultConsumerContext>(stream_consumer::StreamConsumer<C>);

impl<C: ConsumerContext + 'static> message::consumer::RdConsumerExt for AppliedConsumer<C> {
type CTXType = stream_consumer::StreamConsumerContext<C>;
type SelfType = stream_consumer::StreamConsumer<C>;
fn to_self(&self) -> &Self::SelfType {
&self.0
}
}

use sqlx::Connection;
use message::persist::{self, TopicConfig, MIGRATOR};

fn main() {
dotenv::dotenv().ok();
Expand All @@ -46,33 +34,57 @@ fn main() {
.set("group.id", &settings.consumer_group)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.create()
.unwrap();
let consumer = AppliedConsumer(consumer);

MIGRATOR
.run(&mut ConnectionType::connect(&settings.db_history).await.unwrap())
.await
.ok();
let consumer = std::sync::Arc::new(consumer);

let pool = sqlx::Pool::<DbType>::connect(&settings.db_history).await.unwrap();

let persistor: DatabaseWriter<models::TradeRecord> = DatabaseWriter::new(&DatabaseWriterConfig {
MIGRATOR.run(&pool).await.ok();

let write_config = DatabaseWriterConfig {
spawn_limit: 4,
apply_benchmark: true,
capability_limit: 8192,
})
.start_schedule(&pool)
.unwrap();
};

let persistor_kline: DatabaseWriter<models::TradeRecord> = DatabaseWriter::new(&write_config).start_schedule(&pool).unwrap();

//following is equal to writers in history.rs
let persistor_trade: DatabaseWriter<models::TradeHistory> = DatabaseWriter::new(&write_config).start_schedule(&pool).unwrap();

let persistor_order: DatabaseWriter<models::OrderHistory> = DatabaseWriter::new(&write_config).start_schedule(&pool).unwrap();

let persistor_balance: DatabaseWriter<models::BalanceHistory> = DatabaseWriter::new(&write_config).start_schedule(&pool).unwrap();

let trade_cfg = TopicConfig::<message::Trade>::new(message::TRADES_TOPIC)
.persist_to(&persistor_kline)
.persist_to(&persistor_trade)
.with_tr::<persist::AskTrade>()
.persist_to(&persistor_trade)
.with_tr::<persist::BidTrade>();

let order_cfg = TopicConfig::<message::OrderMessage>::new(message::ORDERS_TOPIC).persist_to(&persistor_order);

let balance_cfg = TopicConfig::<message::BalanceMessage>::new(message::BALANCES_TOPIC).persist_to(&persistor_balance);

let auto_commit = vec![
trade_cfg.auto_commit_start(consumer.clone()),
order_cfg.auto_commit_start(consumer.clone()),
balance_cfg.auto_commit_start(consumer.clone()),
];
let consumer = consumer.as_ref();

loop {
let cr_main = message::consumer::SimpleConsumer::new(&consumer)
.add_topic(
message::TRADES_TOPIC,
message::persist::MsgDataPersistor::<_, types::Trade>::new(&persistor),
)
.unwrap();
let cr_main = message::consumer::SimpleConsumer::new(consumer)
.add_topic_config(&trade_cfg).unwrap()
.add_topic_config(&order_cfg).unwrap()
.add_topic_config(&balance_cfg).unwrap()
// .add_topic(message::TRADES_TOPIC, MsgDataPersistor::new(&persistor).handle_message::<message::Trade>())
;

tokio::select! {
_ = tokio::signal::ctrl_c() => {
Expand All @@ -86,6 +98,18 @@ fn main() {
}
}

persistor.finish().await.unwrap();
tokio::try_join!(
persistor_kline.finish(),
persistor_trade.finish(),
persistor_order.finish(),
persistor_balance.finish(),
)
.expect("all persistor should success finish");
let final_commits: Vec<Pin<Box<dyn std::future::Future<Output = ()> + Send>>> = auto_commit
.into_iter()
.map(|ac| -> Pin<Box<dyn std::future::Future<Output = ()> + Send>> { Box::pin(ac.final_commit(consumer)) })
.collect();
futures::future::join_all(final_commits).await;
//auto_commit.final_commit(consumer).await;
})
}
28 changes: 27 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,37 @@ impl Default for Market {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum PersistPolicy {
Dummy,
Both,
ToDB,
ToMessage,
}

use serde::de;

impl<'de> de::Deserialize<'de> for PersistPolicy {
fn deserialize<D: de::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;

match s.as_ref() {
"Both" | "both" => Ok(PersistPolicy::Both),
"Db" | "db" | "DB" => Ok(PersistPolicy::ToDB),
"Message" | "message" => Ok(PersistPolicy::ToMessage),
"Dummy" | "dummy" => Ok(PersistPolicy::Dummy),
_ => Err(serde::de::Error::custom("unexpected specification for persist policy")),
}
}
}

#[derive(Debug, PartialEq, Deserialize)]
#[serde(default)]
pub struct Settings {
pub debug: bool,
pub db_log: String,
pub db_history: String,
pub history_persist_policy: PersistPolicy,
pub assets: Vec<Asset>,
pub markets: Vec<Market>,
pub brokers: String,
Expand All @@ -71,6 +96,7 @@ impl Default for Settings {
debug: false,
db_log: Default::default(),
db_history: Default::default(),
history_persist_policy: PersistPolicy::Both,
assets: Vec::new(),
markets: Vec::new(),
consumer_group: "kline_data_fetcher".to_string(),
Expand Down
2 changes: 2 additions & 0 deletions src/matchengine/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ impl<T: MessageManager> PersistExector for MessengerAsPersistor<'_, T> {
asset: balance.asset.clone(),
business: balance.business.clone(),
change: balance.change.to_string(),
balance: balance.balance.to_string(),
detail: balance.detail,
});
}
}
Expand Down
14 changes: 4 additions & 10 deletions src/matchengine/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,7 @@ use sqlx::Executor;
use serde::Serialize;
use std::str::FromStr;

#[derive(Copy, Clone)]
enum PersistPolicy {
Dummy,
Both,
ToDB,
ToMessege,
}
pub use config::PersistPolicy;

pub struct Persistor {
history_writer: DatabaseHistoryWriter,
Expand All @@ -54,7 +48,7 @@ impl<'c> PersistorGen<'c> {
match self.policy {
PersistPolicy::Dummy => Box::new(market::DummyPersistor(false)),
PersistPolicy::ToDB => Box::new(market::persistor_for_db(&mut self.base.history_writer)),
PersistPolicy::ToMessege => Box::new(market::persistor_for_message(
PersistPolicy::ToMessage => Box::new(market::persistor_for_message(
self.base.message_manager.as_mut().unwrap(),
market_tag,
)),
Expand All @@ -69,7 +63,7 @@ impl<'c> PersistorGen<'c> {
match self.policy {
PersistPolicy::Dummy => Box::new(asset::DummyPersistor(false)),
PersistPolicy::ToDB => Box::new(asset::persistor_for_db(&mut self.base.history_writer)),
PersistPolicy::ToMessege => Box::new(asset::persistor_for_message(self.base.message_manager.as_mut().unwrap())),
PersistPolicy::ToMessage => Box::new(asset::persistor_for_message(self.base.message_manager.as_mut().unwrap())),
PersistPolicy::Both => Box::new((
asset::persistor_for_db(&mut self.base.history_writer),
asset::persistor_for_message(self.base.message_manager.as_mut().unwrap()),
Expand Down Expand Up @@ -153,7 +147,7 @@ impl Controller {
.start_schedule(&main_pool)
.unwrap();

let persist_policy = PersistPolicy::Both;
let persist_policy = settings.history_persist_policy;

Controller {
settings,
Expand Down
2 changes: 1 addition & 1 deletion src/matchengine/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::str::FromStr;
pub fn order_to_proto(o: &market::Order) -> OrderInfo {
OrderInfo {
id: o.id,
market: String::from(o.market),
market: String::from(&*o.market),
order_type: if o.type_ == market::OrderType::LIMIT {
OrderType::Limit as i32
} else {
Expand Down
23 changes: 22 additions & 1 deletion src/matchengine/history.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::database::{DatabaseWriter, DatabaseWriterConfig};
use crate::market;
use crate::models;
use crate::types::Trade;
use market::Trade;

use crate::utils::FTimestamp;
use anyhow::Result;
Expand Down Expand Up @@ -44,6 +44,27 @@ impl DatabaseHistoryWriter {
}
}

impl<'r> From<&'r market::Order> for models::OrderHistory {
fn from(order: &'r market::Order) -> Self {
models::OrderHistory {
id: order.id as i64,
create_time: FTimestamp(order.create_time).into(),
finish_time: FTimestamp(order.update_time).into(),
user_id: order.user as i32,
market: order.market.to_string(),
order_type: order.type_,
order_side: order.side,
price: order.price,
amount: order.amount,
taker_fee: order.taker_fee,
maker_fee: order.maker_fee,
finished_base: order.finished_base,
finished_quote: order.finished_quote,
finished_fee: order.finished_fee,
}
}
}

impl HistoryWriter for DatabaseHistoryWriter {
fn is_block(&self) -> bool {
self.balance_writer.is_block() || self.trade_writer.is_block() || self.order_writer.is_block()
Expand Down
Loading

0 comments on commit 7949366

Please sign in to comment.