Skip to content

Commit

Permalink
refactor: use oneshot::channel to refactor controller & server (#64)
Browse files Browse the repository at this point in the history
* tokio v1!

* major WIP: refactor controller

* major: refactoring finish, waiting for merge and lint

* clippy

* fmt

* fix actix-rt to 2.0.0-beta2

Co-authored-by: 张卓 <mycinbrin@gmail.com>
Co-authored-by: Weifan <weifan@blackapi.com>
  • Loading branch information
3 people authored Feb 1, 2021
1 parent f37362b commit dd846e3
Show file tree
Hide file tree
Showing 10 changed files with 671 additions and 480 deletions.
99 changes: 48 additions & 51 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ futures-util = { version = "0.3.12", default-features = false }

tokio = { version = "1.1.1", features = ["full"] }
thread-id = "3.3.0"

futures = "0.3.12"
hyper = "0.14.2"
crossbeam-channel = "0.5.0"
Expand All @@ -39,6 +38,7 @@ dotenv = "0.15.0"
num_enum = "0.5.1"
tonic = "0.4.0"
actix-web = "4.0.0-beta.1"
actix-rt = "=2.0.0-beta.2"
qstring = "0.7.2"
thiserror = "1.0.23"
rand = "0.8.3"
Expand Down
35 changes: 9 additions & 26 deletions src/bin/matchengine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,9 @@ fn main() {

rt.block_on(async {
let stub = prepare().await.expect("Init state error");
stub.prepare_stub();
Controller::prepare_runtime(&rt as *const tokio::runtime::Runtime);

let rpc_thread = std::thread::spawn(move || {
let aux_rt: tokio::runtime::Runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build auxiliary runtime");

println!("start grpc under single-thread runtime");
aux_rt.block_on(grpc_run()).unwrap()
});

tokio::runtime::Handle::current()
.spawn_blocking(|| rpc_thread.join())
.await
.unwrap()
grpc_run(stub).await
})
.unwrap();

Controller::release_stub();
}

async fn prepare() -> anyhow::Result<Controller> {
Expand All @@ -61,18 +43,17 @@ async fn prepare() -> anyhow::Result<Controller> {
Ok(grpc_stub)
}

async fn grpc_run() -> Result<(), Box<dyn std::error::Error>> {
persist::init_persist_timer();

async fn grpc_run(stub: Controller) -> Result<(), Box<dyn std::error::Error>> {
let addr = "0.0.0.0:50051".parse().unwrap();
let grpc = GrpcHandler {};
println!("Starting gprc service");
let grpc = GrpcHandler::new(stub);
log::info!("Starting gprc service");

let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let on_leave = grpc.on_leave();

tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
println!("Ctrl-c received, shutting down");
log::info!("Ctrl-c received, shutting down");
tx.send(()).ok();
});

Expand All @@ -83,6 +64,8 @@ async fn grpc_run() -> Result<(), Box<dyn std::error::Error>> {
})
.await?;

println!("Shutted down");
log::info!("Shutted down, wait for final clear");
on_leave.leave().await;
log::info!("Shutted down");
Ok(())
}
Loading

0 comments on commit dd846e3

Please sign in to comment.