Skip to content

Commit

Permalink
add configurable time limits (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
naim94a authored Feb 25, 2024
1 parent c96c91f commit 8abc0e4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
33 changes: 32 additions & 1 deletion common/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use serde::Deserialize;
use serde::{Deserialize, Deserializer};

Check warning on line 1 in common/src/config.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `Deserializer`

warning: unused import: `Deserializer` --> common/src/config.rs:1:26 | 1 | use serde::{Deserialize, Deserializer}; | ^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use std::time::Duration;
use std::{net::SocketAddr, path::PathBuf};
use toml::from_str;

Expand Down Expand Up @@ -34,11 +35,41 @@ pub struct Database {
pub client_id: Option<PathBuf>,
}

#[derive(Deserialize, Debug)]
#[serde(default)]
pub struct Limits {
/// Maximum time to wait on an idle connection between commands.
pub command_timeout: Duration,

/// Maximum time to all `PULL_MD` queries.
pub pull_md_timeout: Duration,

/// Maximum time to wait for `HELO` message.
pub hello_timeout: Duration,

/// Maximum time allowed until TLS handshake completes.
pub tls_handshake_timeout: Duration,
}

impl Default for Limits {
fn default() -> Self {
Self {
command_timeout: Duration::from_secs(3600),
pull_md_timeout: Duration::from_secs(4 * 60),
hello_timeout: Duration::from_secs(15),
tls_handshake_timeout: Duration::from_secs(10),
}
}
}

#[derive(Deserialize)]
pub struct Config {
pub lumina: LuminaServer,
pub api_server: Option<WebServer>,
pub database: Database,

#[serde(default)]
pub limits: Limits,
}

pub trait HasConfig {
Expand Down
15 changes: 10 additions & 5 deletions lumen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use native_tls::Identity;
use std::collections::HashMap;
use std::mem::discriminant;
use std::process::exit;
use std::time::{Duration, Instant};
use std::time::Instant;
use std::{borrow::Cow, sync::Arc};
use tokio::time::timeout;
use tokio::{io::AsyncRead, io::AsyncWrite, net::TcpListener};
Expand All @@ -39,10 +39,11 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>(
state: &SharedState, user: &'a RpcHello<'a>, mut stream: S,
) -> Result<(), Error> {
let db = &state.db;
let limits = &state.config.limits;
let server_name = state.server_name.as_str();

trace!("waiting for command..");
let req = match timeout(Duration::from_secs(3600), rpc::read_packet(&mut stream)).await {
let req = match timeout(limits.command_timeout, rpc::read_packet(&mut stream)).await {
Ok(res) => match res {
Ok(v) => v,
Err(e) => return Err(e),
Expand Down Expand Up @@ -76,7 +77,7 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>(
match req {
RpcMessage::PullMetadata(md) => {
let start = Instant::now();
let funcs = match timeout(Duration::from_secs(4 * 60), db.get_funcs(&md.funcs)).await {
let funcs = match timeout(limits.pull_md_timeout, db.get_funcs(&md.funcs)).await {
Ok(r) => match r {
Ok(v) => v,
Err(e) => {
Expand Down Expand Up @@ -268,7 +269,9 @@ async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
state: &SharedState, mut stream: S,
) -> Result<(), rpc::Error> {
let server_name = &state.server_name;
let hello = match timeout(Duration::from_secs(15), rpc::read_packet(&mut stream)).await {
let limits = &state.config.limits;

let hello = match timeout(limits.hello_timeout, rpc::read_packet(&mut stream)).await {
Ok(v) => v?,
Err(_) => {
debug!("didn't get hello in time.");
Expand Down Expand Up @@ -403,7 +406,9 @@ async fn serve(
debug!("Connection from {:?}{}: {} active connections", &addr, protocol, count);
match accpt {
Some(accpt) => {
match timeout(Duration::from_secs(10), accpt.accept(client)).await {
match timeout(state.config.limits.tls_handshake_timeout, accpt.accept(client))
.await
{
Ok(r) => match r {
Ok(s) => {
handle_connection(&state, s).await;
Expand Down

0 comments on commit 8abc0e4

Please sign in to comment.