diff --git a/lumen/src/server.rs b/lumen/src/server.rs index 32ad718..e467ddc 100644 --- a/lumen/src/server.rs +++ b/lumen/src/server.rs @@ -1,10 +1,5 @@ use std::{ - borrow::Cow, - collections::HashMap, - mem::discriminant, - process::exit, - sync::Arc, - time::{Duration, Instant}, + borrow::Cow, collections::HashMap, mem::discriminant, process::exit, sync::Arc, time::Instant, }; use common::{ @@ -33,21 +28,22 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>( let server_name = state.server_name.as_str(); trace!("waiting for command.."); - let req = match timeout(Duration::from_secs(3600), rpc::read_packet(&mut stream)).await { - Ok(res) => match res { - Ok(v) => v, - Err(e) => return Err(e), - }, - Err(_) => { - _ = RpcMessage::Fail(RpcFail { - code: 0, - message: &format!("{server_name} client idle for too long.\n"), - }) - .async_write(&mut stream) - .await; - return Err(Error::Timeout); - }, - }; + let req = + match timeout(state.config.limits.command_timeout, rpc::read_packet(&mut stream)).await { + Ok(res) => match res { + Ok(v) => v, + Err(e) => return Err(e), + }, + Err(_) => { + _ = RpcMessage::Fail(RpcFail { + code: 0, + message: &format!("{server_name} client idle for too long.\n"), + }) + .async_write(&mut stream) + .await; + return Err(Error::Timeout); + }, + }; trace!("got command!"); let req = match RpcMessage::deserialize(&req) { Ok(v) => v, @@ -67,33 +63,34 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>( match req { RpcMessage::PullMetadata(md) => { let start = Instant::now(); - let funcs = match timeout(Duration::from_secs(4 * 60), db.get_funcs(&md.funcs)).await { - Ok(r) => match r { - Ok(v) => v, - Err(e) => { - error!("pull failed, db: {}", e); - rpc::RpcMessage::Fail(rpc::RpcFail { + let funcs = + match timeout(state.config.limits.pull_md_timeout, db.get_funcs(&md.funcs)).await { + Ok(r) => match r { + Ok(v) => v, + Err(e) => { + error!("pull failed, db: {}", e); + rpc::RpcMessage::Fail(rpc::RpcFail { + code: 0, + message: &format!( + "{server_name}: db error; please try again later..\n" + ), + }) + .async_write(&mut stream) + .await?; + return Ok(()); + }, + }, + Err(_) => { + RpcMessage::Fail(RpcFail { code: 0, - message: &format!( - "{server_name}: db error; please try again later..\n" - ), + message: &format!("{server_name}: query took too long to execute.\n"), }) .async_write(&mut stream) .await?; - return Ok(()); + debug!("pull query timeout"); + return Err(Error::Timeout); }, - }, - Err(_) => { - RpcMessage::Fail(RpcFail { - code: 0, - message: &format!("{server_name}: query took too long to execute.\n"), - }) - .async_write(&mut stream) - .await?; - debug!("pull query timeout"); - return Err(Error::Timeout); - }, - }; + }; let pulled_funcs = funcs.iter().filter(|v| v.is_some()).count(); state.metrics.pulls.inc_by(pulled_funcs as _); state.metrics.queried_funcs.inc_by(md.funcs.len() as _); @@ -259,13 +256,14 @@ async fn handle_client( state: &SharedState, mut stream: S, ) -> Result<(), rpc::Error> { let server_name = &state.server_name; - let hello = match timeout(Duration::from_secs(15), rpc::read_packet(&mut stream)).await { - Ok(v) => v?, - Err(_) => { - debug!("didn't get hello in time."); - return Ok(()); - }, - }; + let hello = + match timeout(state.config.limits.hello_timeout, rpc::read_packet(&mut stream)).await { + Ok(v) => v?, + Err(_) => { + debug!("didn't get hello in time."); + return Ok(()); + }, + }; let (hello, creds) = match RpcMessage::deserialize(&hello) { Ok(RpcMessage::Hello(v, creds)) => { @@ -394,7 +392,9 @@ async fn serve( debug!("Connection from {:?}{}: {} active connections", &addr, protocol, count); match accpt { Some(accpt) => { - match timeout(Duration::from_secs(10), accpt.accept(client)).await { + match timeout(state.config.limits.tls_handshake_timeout, accpt.accept(client)) + .await + { Ok(r) => match r { Ok(s) => { handle_connection(&state, s).await;