diff --git a/common/src/config.rs b/common/src/config.rs index f262af9..fd61644 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -1,4 +1,5 @@ -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; +use std::time::Duration; use std::{net::SocketAddr, path::PathBuf}; use toml::from_str; @@ -34,11 +35,41 @@ pub struct Database { pub client_id: Option, } +#[derive(Deserialize, Debug)] +#[serde(default)] +pub struct Limits { + /// Maximum time to wait on an idle connection between commands. + pub command_timeout: Duration, + + /// Maximum time to all `PULL_MD` queries. + pub pull_md_timeout: Duration, + + /// Maximum time to wait for `HELO` message. + pub hello_timeout: Duration, + + /// Maximum time allowed until TLS handshake completes. + pub tls_handshake_timeout: Duration, +} + +impl Default for Limits { + fn default() -> Self { + Self { + command_timeout: Duration::from_secs(3600), + pull_md_timeout: Duration::from_secs(4 * 60), + hello_timeout: Duration::from_secs(15), + tls_handshake_timeout: Duration::from_secs(10), + } + } +} + #[derive(Deserialize)] pub struct Config { pub lumina: LuminaServer, pub api_server: Option, pub database: Database, + + #[serde(default)] + pub limits: Limits, } pub trait HasConfig { diff --git a/lumen/src/main.rs b/lumen/src/main.rs index c06c4d9..e9c6294 100644 --- a/lumen/src/main.rs +++ b/lumen/src/main.rs @@ -14,7 +14,7 @@ use native_tls::Identity; use std::collections::HashMap; use std::mem::discriminant; use std::process::exit; -use std::time::{Duration, Instant}; +use std::time::Instant; use std::{borrow::Cow, sync::Arc}; use tokio::time::timeout; use tokio::{io::AsyncRead, io::AsyncWrite, net::TcpListener}; @@ -39,10 +39,11 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>( state: &SharedState, user: &'a RpcHello<'a>, mut stream: S, ) -> Result<(), Error> { let db = &state.db; + let limits = &state.config.limits; let server_name = state.server_name.as_str(); trace!("waiting for command.."); - let req = match timeout(Duration::from_secs(3600), rpc::read_packet(&mut stream)).await { + let req = match timeout(limits.command_timeout, rpc::read_packet(&mut stream)).await { Ok(res) => match res { Ok(v) => v, Err(e) => return Err(e), @@ -76,7 +77,7 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>( match req { RpcMessage::PullMetadata(md) => { let start = Instant::now(); - let funcs = match timeout(Duration::from_secs(4 * 60), db.get_funcs(&md.funcs)).await { + let funcs = match timeout(limits.pull_md_timeout, db.get_funcs(&md.funcs)).await { Ok(r) => match r { Ok(v) => v, Err(e) => { @@ -268,7 +269,9 @@ async fn handle_client( state: &SharedState, mut stream: S, ) -> Result<(), rpc::Error> { let server_name = &state.server_name; - let hello = match timeout(Duration::from_secs(15), rpc::read_packet(&mut stream)).await { + let limits = &state.config.limits; + + let hello = match timeout(limits.hello_timeout, rpc::read_packet(&mut stream)).await { Ok(v) => v?, Err(_) => { debug!("didn't get hello in time."); @@ -403,7 +406,9 @@ async fn serve( debug!("Connection from {:?}{}: {} active connections", &addr, protocol, count); match accpt { Some(accpt) => { - match timeout(Duration::from_secs(10), accpt.accept(client)).await { + match timeout(state.config.limits.tls_handshake_timeout, accpt.accept(client)) + .await + { Ok(r) => match r { Ok(s) => { handle_connection(&state, s).await;