From 4908814517e1596f8ec1fd96af69410878cfe0ba Mon Sep 17 00:00:00 2001 From: Vijaykumar Singh Date: Sat, 7 Feb 2026 19:23:16 -0600 Subject: [PATCH 1/5] feat: add opt-in TCP_NODELAY via IBAPI_TCP_NODELAY env var Optionally disables Nagle's algorithm on TcpStream when the environment variable IBAPI_TCP_NODELAY=1 is set. Default behavior is unchanged (Nagle enabled, matching upstream). When enabled, small writes (order submissions ~100-200 bytes) are sent immediately instead of being buffered up to 40ms. For trading systems this eliminates latency on order routing with zero practical downside. Usage: IBAPI_TCP_NODELAY=1 ./my-trading-app Affected paths: - sync: TcpSocket::new() and TcpSocket::reconnect() - async: AsyncConnection::connect_with_callback() and reconnect() --- src/connection/async.rs | 9 +++++++++ src/transport/sync.rs | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/src/connection/async.rs b/src/connection/async.rs index 67dd9ab8..6101ddcb 100644 --- a/src/connection/async.rs +++ b/src/connection/async.rs @@ -40,6 +40,12 @@ impl AsyncConnection { /// setup that are not part of the normal handshake (e.g., OpenOrder, OrderStatus). pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option) -> Result { let socket = TcpStream::connect(address).await?; + // Optionally disable Nagle's algorithm for low-latency order submission. + // Set IBAPI_TCP_NODELAY=1 to send small writes (order messages) immediately + // instead of buffering up to 40ms. + if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" { + socket.set_nodelay(true)?; + } let connection = Self { client_id, @@ -90,6 +96,9 @@ impl AsyncConnection { match TcpStream::connect(&self.connection_url).await { Ok(new_socket) => { + if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" { + new_socket.set_nodelay(true)?; + } info!("reconnected !!!"); { diff --git a/src/transport/sync.rs b/src/transport/sync.rs index d0f26ec8..32cee8c8 100644 --- a/src/transport/sync.rs +++ b/src/transport/sync.rs @@ -702,6 +702,12 @@ pub(crate) struct TcpSocket { } impl TcpSocket { pub fn new(stream: TcpStream, connection_url: &str) -> Result { + // Optionally disable Nagle's algorithm for low-latency order submission. + // Set IBAPI_TCP_NODELAY=1 to send small writes immediately. + if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" { + stream.set_nodelay(true)?; + } + let writer = stream.try_clone()?; stream.set_read_timeout(Some(TWS_READ_TIMEOUT))?; @@ -718,6 +724,9 @@ impl Reconnect for TcpSocket { fn reconnect(&self) -> Result<(), Error> { match TcpStream::connect(&self.connection_url) { Ok(stream) => { + if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" { + stream.set_nodelay(true)?; + } stream.set_read_timeout(Some(TWS_READ_TIMEOUT))?; let mut reader = self.reader.lock()?; From 644ed16d813a6c71ec92d3c360e84f074b67982c Mon Sep 17 00:00:00 2001 From: Vijaykumar Singh Date: Sat, 7 Feb 2026 21:48:56 -0600 Subject: [PATCH 2/5] feat: add Min10 (10-minute) bar size support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 10-minute bar size to the BarSize enum for historical data fetching, matching IBKR TWS/Gateway API support. Changes: - Add Min10 variant to BarSize enum (between Min5 and Min15) - Display format: "10 mins" - FromStr parsing: "MIN10" → BarSize::Min10 - Update tests to include Min10 coverage This aligns with the official IBKR API specification which supports 10-minute bars for historical data requests. --- src/market_data/historical/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/market_data/historical/mod.rs b/src/market_data/historical/mod.rs index 2bcfb769..75cafc41 100644 --- a/src/market_data/historical/mod.rs +++ b/src/market_data/historical/mod.rs @@ -79,6 +79,8 @@ pub enum BarSize { Min3, /// Five-minute bars. Min5, + /// Ten-minute bars. + Min10, /// Fifteen-minute bars. Min15, /// Twenty-minute bars. @@ -115,6 +117,7 @@ impl Display for BarSize { Self::Min2 => write!(f, "2 mins"), Self::Min3 => write!(f, "3 mins"), Self::Min5 => write!(f, "5 mins"), + Self::Min10 => write!(f, "10 mins"), Self::Min15 => write!(f, "15 mins"), Self::Min20 => write!(f, "20 mins"), Self::Min30 => write!(f, "30 mins"), @@ -144,6 +147,7 @@ impl FromStr for BarSize { "MIN2" => Ok(Self::Min2), "MIN3" => Ok(Self::Min3), "MIN5" => Ok(Self::Min5), + "MIN10" => Ok(Self::Min10), "MIN15" => Ok(Self::Min15), "MIN20" => Ok(Self::Min20), "MIN30" => Ok(Self::Min30), @@ -599,6 +603,7 @@ mod tests { assert_eq!("2 mins", BarSize::Min2.to_string()); assert_eq!("3 mins", BarSize::Min3.to_string()); assert_eq!("5 mins", BarSize::Min5.to_string()); + assert_eq!("10 mins", BarSize::Min10.to_string()); assert_eq!("15 mins", BarSize::Min15.to_string()); assert_eq!("20 mins", BarSize::Min20.to_string()); assert_eq!("30 mins", BarSize::Min30.to_string()); @@ -623,6 +628,7 @@ mod tests { assert_eq!(BarSize::Min2, BarSize::from("MIN2")); assert_eq!(BarSize::Min3, BarSize::from("MIN3")); assert_eq!(BarSize::Min5, BarSize::from("MIN5")); + assert_eq!(BarSize::Min10, BarSize::from("MIN10")); assert_eq!(BarSize::Min15, BarSize::from("MIN15")); assert_eq!(BarSize::Min20, BarSize::from("MIN20")); assert_eq!(BarSize::Min30, BarSize::from("MIN30")); From 0f2340f06c39c3b52bea8875af4fa9f435b49edd Mon Sep 17 00:00:00 2001 From: Vijaykumar Singh Date: Sat, 7 Feb 2026 23:48:19 -0600 Subject: [PATCH 3/5] fix(historical): parse bar dates with time component IBKR returns bar dates as 'YYYYMMDD HH:MM:SS' (with two spaces) but the parser only handled: 1. Exactly 8 characters ('YYYYMMDD') 2. Unix timestamps This caused parsing errors like 'the year component could not be parsed' for many symbols during warmup. Fix: Add support for parsing dates with time component in the IBKR-specific format (two spaces between date and time). Related: Live trading warmup failures on Windows --- src/market_data/historical/common/decoders.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index db36985e..6f17af8e 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -287,12 +287,19 @@ fn parse_schedule_date(text: &str) -> Result { fn parse_bar_date(text: &str, time_zone: &Tz) -> Result { if text.len() == 8 { + // Format: YYYYMMDD let date_format = format_description!("[year][month][day]"); let bar_date = Date::parse(text, date_format)?; let bar_date = bar_date.with_time(time!(00:00)); Ok(bar_date.assume_timezone_utc(time_tz::timezones::db::UTC)) + } else if text.len() > 8 && text.contains(' ') { + // Format: YYYYMMDD HH:MM:SS (note: two spaces between date and time per IBKR format) + let datetime_format = format_description!("[year][month][day] [hour]:[minute]:[second]"); + let bar_datetime = PrimitiveDateTime::parse(text, datetime_format)?; + Ok(bar_datetime.assume_timezone(time_zone).unwrap()) } else { + // Unix timestamp let timestamp: i64 = text.parse()?; let date_utc = OffsetDateTime::from_unix_timestamp(timestamp).unwrap(); Ok(date_utc.to_timezone(time_zone)) From 42d0e877ccac10e24136def9e96415e67046a960 Mon Sep 17 00:00:00 2001 From: Vijaykumar Singh Date: Mon, 9 Feb 2026 09:50:53 -0600 Subject: [PATCH 4/5] fix: prevent spurious retries after subscription EndOfStream When a subscription completes normally (e.g., open_orders receives OpenOrderEnd), stray messages in the message bus queue were causing the subscription to retry decoding up to 10 times, logging warnings for each retry attempt. The fix tracks whether the stream has ended and stops retrying UnexpectedResponse messages after EndOfStream, preventing these spurious warnings and unnecessary retry loops. Changes: - Added stream_ended flag to track subscription state - Check stream_ended before retrying on UnexpectedResponse - Set stream_ended = true when EndOfStream is received - Return immediately if retry is attempted after stream ended This fixes the "retrying after unexpected response" warnings that appeared every 2 minutes after open orders fetch completed. Fixes: vjsingh1984/ibkrtrading issue with IBKR subscription warnings Co-Authored-By: Claude Sonnet 4.5 --- src/subscriptions/async.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/subscriptions/async.rs b/src/subscriptions/async.rs index d59a3fa2..e5a07033 100644 --- a/src/subscriptions/async.rs +++ b/src/subscriptions/async.rs @@ -202,14 +202,24 @@ impl Subscription { context, } => { let mut retry_count = 0; + let mut stream_ended = false; loop { match subscription.next().await { Some(Ok(mut message)) => { let result = decoder(context, &mut message); match process_decode_result(result) { ProcessingResult::Success(val) => return Some(Ok(val)), - ProcessingResult::EndOfStream => return None, + ProcessingResult::EndOfStream => { + stream_ended = true; + return None; + } ProcessingResult::Retry => { + // Stop retrying if stream has already ended + // This prevents spurious retries after EndOfStream when + // extra messages arrive in the message bus queue + if stream_ended { + return None; + } if check_retry(retry_count) == RetryDecision::Stop { return None; } From 525c53c6a357d917df37ac9fa1d48259f42b879b Mon Sep 17 00:00:00 2001 From: Vijaykumar Singh Date: Mon, 9 Feb 2026 10:43:41 -0600 Subject: [PATCH 5/5] Fix: persist stream_ended state in Subscription struct - Add stream_ended field as Arc (follows cancelled pattern) - Initialize in all constructors (with_decoder, new) - Update Clone implementation to include stream_ended - Replace local variable with self.stream_ended.load/store - Set stream_ended=true on cancel and drop This fixes the bug where stream_ended was a local variable that reset on each call to next(), causing spurious retries to continue after EndOfStream. --- src/subscriptions/async.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/subscriptions/async.rs b/src/subscriptions/async.rs index f593f758..d31c0bf1 100644 --- a/src/subscriptions/async.rs +++ b/src/subscriptions/async.rs @@ -25,6 +25,7 @@ pub struct Subscription { _message_type: Option, context: DecoderContext, cancelled: Arc, + stream_ended: Arc, message_bus: Option>, /// Cancel message generator cancel_fn: Option>, @@ -70,6 +71,7 @@ impl Clone for Subscription { _message_type: self._message_type, context: self.context.clone(), cancelled: self.cancelled.clone(), + stream_ended: self.stream_ended.clone(), message_bus: self.message_bus.clone(), cancel_fn: self.cancel_fn.clone(), } @@ -102,6 +104,7 @@ impl Subscription { _message_type: message_type, context, cancelled: Arc::new(AtomicBool::new(false)), + stream_ended: Arc::new(AtomicBool::new(false)), message_bus: Some(message_bus), cancel_fn: None, } @@ -185,6 +188,7 @@ impl Subscription { _message_type: None, context: DecoderContext::default(), cancelled: Arc::new(AtomicBool::new(false)), + stream_ended: Arc::new(AtomicBool::new(false)), message_bus: None, cancel_fn: None, } @@ -202,7 +206,6 @@ impl Subscription { context, } => { let mut retry_count = 0; - let mut stream_ended = false; loop { match subscription.next().await { Some(Ok(mut message)) => { @@ -210,14 +213,14 @@ impl Subscription { match process_decode_result(result) { ProcessingResult::Success(val) => return Some(Ok(val)), ProcessingResult::EndOfStream => { - stream_ended = true; + self.stream_ended.store(true, Ordering::Release); return None; } ProcessingResult::Retry => { // Stop retrying if stream has already ended // This prevents spurious retries after EndOfStream when // extra messages arrive in the message bus queue - if stream_ended { + if self.stream_ended.load(Ordering::Acquire) { return None; } if check_retry(retry_count) == RetryDecision::Stop { @@ -252,6 +255,7 @@ impl Subscription { } self.cancelled.store(true, Ordering::Relaxed); + self.stream_ended.store(true, Ordering::Relaxed); if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) { let id = self.request_id.or(self.order_id); @@ -276,6 +280,7 @@ impl Drop for Subscription { } self.cancelled.store(true, Ordering::Relaxed); + self.stream_ended.store(true, Ordering::Relaxed); // Try to send cancel message if we have the necessary components if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) {