From bcc011e18866d9ae67c3f0db22d92a64dea5d107 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Thu, 1 Jan 2026 15:38:54 -0800 Subject: [PATCH 1/4] feat: add startup message callback for connection setup (#340) Add connect_with_callback() to handle unsolicited messages like OpenOrder and OrderStatus during connection handshake instead of discarding them. - Add StartupMessageCallback type for processing startup messages - Add connect_with_callback() to async and sync Client - Update ConnectionProtocol trait to accept callback parameter - Re-export StartupMessageCallback from crate root --- src/client/async.rs | 46 +++++++++- src/client/sync.rs | 43 +++++++++- src/connection.rs | 3 + src/connection/async.rs | 24 ++++-- src/connection/common.rs | 179 +++++++++++++++++++++++++++++++++++++-- src/connection/sync.rs | 24 ++++-- src/lib.rs | 36 ++++++++ src/transport/sync.rs | 18 ++-- 8 files changed, 341 insertions(+), 32 deletions(-) diff --git a/src/client/async.rs b/src/client/async.rs index 05de3703..7d52fab7 100644 --- a/src/client/async.rs +++ b/src/client/async.rs @@ -7,6 +7,7 @@ use log::debug; use time::OffsetDateTime; use time_tz::Tz; +use crate::connection::common::StartupMessageCallback; use crate::connection::{r#async::AsyncConnection, ConnectionMetadata}; use crate::messages::{OutgoingMessages, RequestMessage}; use crate::transport::{ @@ -71,7 +72,50 @@ impl Client { /// } /// ``` pub async fn connect(address: &str, client_id: i32) -> Result { - let connection = AsyncConnection::connect(address, client_id).await?; + Self::connect_with_callback(address, client_id, None).await + } + + /// Establishes async connection to TWS or Gateway with a callback for startup messages + /// + /// This is similar to [`connect`](Self::connect), but allows you to provide a callback + /// that will be invoked for any unsolicited messages received during the connection + /// handshake (e.g., OpenOrder, OrderStatus). + /// + /// # Arguments + /// * `address` - address of server. e.g. 127.0.0.1:4002 + /// * `client_id` - id of client. e.g. 100 + /// * `startup_callback` - optional callback for unsolicited messages during connection + /// + /// # Examples + /// + /// ```no_run + /// use ibapi::{Client, StartupMessageCallback}; + /// use ibapi::messages::IncomingMessages; + /// use std::sync::{Arc, Mutex}; + /// + /// #[tokio::main] + /// async fn main() { + /// let orders = Arc::new(Mutex::new(Vec::new())); + /// let orders_clone = orders.clone(); + /// + /// let callback: StartupMessageCallback = Box::new(move |msg| { + /// match msg.message_type() { + /// IncomingMessages::OpenOrder | IncomingMessages::OrderStatus => { + /// orders_clone.lock().unwrap().push(msg); + /// } + /// _ => {} + /// } + /// }); + /// + /// let client = Client::connect_with_callback("127.0.0.1:4002", 100, Some(callback)) + /// .await + /// .expect("connection failed"); + /// + /// println!("Received {} startup orders", orders.lock().unwrap().len()); + /// } + /// ``` + pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option) -> Result { + let connection = AsyncConnection::connect_with_callback(address, client_id, startup_callback).await?; let connection_metadata = connection.connection_metadata(); let message_bus = Arc::new(AsyncTcpMessageBus::new(connection)?); diff --git a/src/client/sync.rs b/src/client/sync.rs index 5c67cca6..1ecccf09 100644 --- a/src/client/sync.rs +++ b/src/client/sync.rs @@ -15,6 +15,7 @@ use time_tz::Tz; use crate::accounts::types::{AccountGroup, AccountId, ContractId, ModelCode}; use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; +use crate::connection::common::StartupMessageCallback; use crate::connection::{sync::Connection, ConnectionMetadata}; use crate::contracts::{Contract, OptionComputation, SecurityType}; use crate::display_groups::DisplayGroupUpdate; @@ -71,10 +72,50 @@ impl Client { /// println!("next_order_id: {}", client.next_order_id()); /// ``` pub fn connect(address: &str, client_id: i32) -> Result { + Self::connect_with_callback(address, client_id, None) + } + + /// Establishes connection to TWS or Gateway with a callback for startup messages + /// + /// This is similar to [`connect`](Self::connect), but allows you to provide a callback + /// that will be invoked for any unsolicited messages received during the connection + /// handshake (e.g., OpenOrder, OrderStatus). + /// + /// # Arguments + /// * `address` - address of server. e.g. 127.0.0.1:4002 + /// * `client_id` - id of client. e.g. 100 + /// * `startup_callback` - optional callback for unsolicited messages during connection + /// + /// # Examples + /// + /// ```no_run + /// use ibapi::client::blocking::Client; + /// use ibapi::StartupMessageCallback; + /// use ibapi::messages::IncomingMessages; + /// use std::sync::{Arc, Mutex}; + /// + /// let orders = Arc::new(Mutex::new(Vec::new())); + /// let orders_clone = orders.clone(); + /// + /// let callback: StartupMessageCallback = Box::new(move |msg| { + /// match msg.message_type() { + /// IncomingMessages::OpenOrder | IncomingMessages::OrderStatus => { + /// orders_clone.lock().unwrap().push(msg); + /// } + /// _ => {} + /// } + /// }); + /// + /// let client = Client::connect_with_callback("127.0.0.1:4002", 100, Some(callback)) + /// .expect("connection failed"); + /// + /// println!("Received {} startup orders", orders.lock().unwrap().len()); + /// ``` + pub fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option) -> Result { let stream = TcpStream::connect(address)?; let socket = TcpSocket::new(stream, address)?; - let connection = Connection::connect(socket, client_id)?; + let connection = Connection::connect_with_callback(socket, client_id, startup_callback)?; let connection_metadata = connection.connection_metadata(); let message_bus = Arc::new(TcpMessageBus::new(connection)?); diff --git a/src/connection.rs b/src/connection.rs index 97051cdc..ade98277 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -5,6 +5,9 @@ use time_tz::Tz; pub mod common; +// Re-export StartupMessageCallback for lib.rs to re-export publicly +pub use common::StartupMessageCallback; + /// Metadata about the connection to TWS #[derive(Default, Clone, Debug)] pub struct ConnectionMetadata { diff --git a/src/connection/async.rs b/src/connection/async.rs index af90146c..67dd9ab8 100644 --- a/src/connection/async.rs +++ b/src/connection/async.rs @@ -6,7 +6,7 @@ use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio::time::sleep; -use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol}; +use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol, StartupMessageCallback}; use super::ConnectionMetadata; use crate::errors::Error; use crate::messages::{RequestMessage, ResponseMessage}; @@ -29,7 +29,16 @@ pub struct AsyncConnection { impl AsyncConnection { /// Create a new async connection + #[allow(dead_code)] pub async fn connect(address: &str, client_id: i32) -> Result { + Self::connect_with_callback(address, client_id, None).await + } + + /// Create a new async connection with a callback for unsolicited messages + /// + /// The callback will be invoked for any messages received during connection + /// setup that are not part of the normal handshake (e.g., OpenOrder, OrderStatus). + pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option) -> Result { let socket = TcpStream::connect(address).await?; let connection = Self { @@ -44,7 +53,7 @@ impl AsyncConnection { connection_url: address.to_string(), }; - connection.establish_connection().await?; + connection.establish_connection(startup_callback.as_ref()).await?; Ok(connection) } @@ -88,7 +97,8 @@ impl AsyncConnection { *socket = new_socket; } - self.establish_connection().await?; + // Reconnection doesn't use startup callback + self.establish_connection(None).await?; return Ok(()); } @@ -102,10 +112,10 @@ impl AsyncConnection { } /// Establish connection to TWS - pub(crate) async fn establish_connection(&self) -> Result<(), Error> { + pub(crate) async fn establish_connection(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> { self.handshake().await?; self.start_api().await?; - self.receive_account_info().await?; + self.receive_account_info(startup_callback).await?; Ok(()) } @@ -209,14 +219,14 @@ impl AsyncConnection { } // Fetches next order id and managed accounts. - pub(crate) async fn receive_account_info(&self) -> Result<(), Error> { + pub(crate) async fn receive_account_info(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> { let mut account_info = AccountInfo::default(); let mut attempts = 0; const MAX_ATTEMPTS: i32 = 100; loop { let mut message = self.read_message().await?; - let info = self.connection_handler.parse_account_info(&mut message)?; + let info = self.connection_handler.parse_account_info(&mut message, startup_callback)?; // Merge received info if info.next_order_id.is_some() { diff --git a/src/connection/common.rs b/src/connection/common.rs index 2d2f144c..9d66f8fa 100644 --- a/src/connection/common.rs +++ b/src/connection/common.rs @@ -9,6 +9,13 @@ use crate::errors::Error; use crate::messages::{encode_length, IncomingMessages, OutgoingMessages, RequestMessage, ResponseMessage}; use crate::server_versions; +/// Callback for handling unsolicited messages during connection setup. +/// +/// When TWS sends messages like `OpenOrder` or `OrderStatus` during the connection +/// handshake, this callback is invoked to allow the application to process them +/// instead of discarding them. +pub type StartupMessageCallback = Box; + /// Data exchanged during the connection handshake #[derive(Debug, Clone)] #[allow(dead_code)] @@ -33,7 +40,10 @@ pub trait ConnectionProtocol { fn format_start_api(&self, client_id: i32, server_version: i32) -> RequestMessage; /// Parse account information from incoming messages - fn parse_account_info(&self, message: &mut ResponseMessage) -> Result; + /// + /// If a callback is provided, unsolicited messages (like OpenOrder, OrderStatus) + /// will be passed to it instead of being discarded. + fn parse_account_info(&self, message: &mut ResponseMessage, callback: Option<&StartupMessageCallback>) -> Result; } /// Account information received during connection establishment @@ -98,7 +108,7 @@ impl ConnectionProtocol for ConnectionHandler { message } - fn parse_account_info(&self, message: &mut ResponseMessage) -> Result { + fn parse_account_info(&self, message: &mut ResponseMessage, callback: Option<&StartupMessageCallback>) -> Result { let mut info = AccountInfo::default(); match message.message_type() { @@ -116,11 +126,15 @@ impl ConnectionProtocol for ConnectionHandler { error!("Error during account info: {message:?}"); } _ => { - // Other messages during connection are logged but not processed - warn!( - "CONSUMING MESSAGE during connection setup: {:?} - THIS MESSAGE IS LOST!", - message.message_type() - ); + // Pass unsolicited messages to callback if provided + if let Some(cb) = callback { + cb(message.clone()); + } else { + warn!( + "CONSUMING MESSAGE during connection setup: {:?} - THIS MESSAGE IS LOST!", + message.message_type() + ); + } } } @@ -169,9 +183,160 @@ pub fn parse_connection_time(connection_time: &str) -> (Option, #[cfg(test)] mod tests { use super::*; + use std::sync::{Arc, Mutex}; use time::macros::datetime; use time_tz::{timezones, OffsetResult, PrimitiveDateTimeExt}; + #[test] + fn test_parse_account_info_next_valid_id() { + let handler = ConnectionHandler::default(); + // NextValidId message: message_type=9, version=1, next_order_id=1000 + let mut message = ResponseMessage::from("9\01\01000\0"); + + let result = handler.parse_account_info(&mut message, None); + assert!(result.is_ok()); + + let info = result.unwrap(); + assert_eq!(info.next_order_id, Some(1000)); + assert_eq!(info.managed_accounts, None); + } + + #[test] + fn test_parse_account_info_managed_accounts() { + let handler = ConnectionHandler::default(); + // ManagedAccounts message: message_type=15, version=1, accounts="DU123,DU456" + let mut message = ResponseMessage::from("15\01\0DU123,DU456\0"); + + let result = handler.parse_account_info(&mut message, None); + assert!(result.is_ok()); + + let info = result.unwrap(); + assert_eq!(info.next_order_id, None); + assert_eq!(info.managed_accounts, Some("DU123,DU456".to_string())); + } + + #[test] + fn test_parse_account_info_callback_invoked_for_open_order() { + let handler = ConnectionHandler::default(); + // OpenOrder message: message_type=5 + let mut message = ResponseMessage::from("5\0123\0AAPL\0STK\0"); + + let callback_invoked = Arc::new(Mutex::new(false)); + let callback_invoked_clone = callback_invoked.clone(); + + let callback: StartupMessageCallback = Box::new(move |_msg| { + *callback_invoked_clone.lock().unwrap() = true; + }); + + let result = handler.parse_account_info(&mut message, Some(&callback)); + assert!(result.is_ok()); + + assert!(*callback_invoked.lock().unwrap(), "callback should be invoked for OpenOrder"); + } + + #[test] + fn test_parse_account_info_callback_invoked_for_order_status() { + let handler = ConnectionHandler::default(); + // OrderStatus message: message_type=3 + let mut message = ResponseMessage::from("3\0456\0Filled\0100\0"); + + let callback_invoked = Arc::new(Mutex::new(false)); + let callback_invoked_clone = callback_invoked.clone(); + + let callback: StartupMessageCallback = Box::new(move |_msg| { + *callback_invoked_clone.lock().unwrap() = true; + }); + + let result = handler.parse_account_info(&mut message, Some(&callback)); + assert!(result.is_ok()); + + assert!(*callback_invoked.lock().unwrap(), "callback should be invoked for OrderStatus"); + } + + #[test] + fn test_parse_account_info_callback_receives_message() { + let handler = ConnectionHandler::default(); + // OpenOrder message with identifiable content + let mut message = ResponseMessage::from("5\0999\0TEST_SYMBOL\0"); + + let received_messages = Arc::new(Mutex::new(Vec::new())); + let received_messages_clone = received_messages.clone(); + + let callback: StartupMessageCallback = Box::new(move |msg| { + received_messages_clone.lock().unwrap().push(msg); + }); + + let result = handler.parse_account_info(&mut message, Some(&callback)); + assert!(result.is_ok()); + + let messages = received_messages.lock().unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].message_type(), IncomingMessages::OpenOrder); + } + + #[test] + fn test_parse_account_info_callback_not_invoked_for_next_valid_id() { + let handler = ConnectionHandler::default(); + // NextValidId message should NOT trigger callback + let mut message = ResponseMessage::from("9\01\01000\0"); + + let callback_invoked = Arc::new(Mutex::new(false)); + let callback_invoked_clone = callback_invoked.clone(); + + let callback: StartupMessageCallback = Box::new(move |_msg| { + *callback_invoked_clone.lock().unwrap() = true; + }); + + let result = handler.parse_account_info(&mut message, Some(&callback)); + assert!(result.is_ok()); + + assert!(!*callback_invoked.lock().unwrap(), "callback should NOT be invoked for NextValidId"); + } + + #[test] + fn test_parse_account_info_callback_not_invoked_for_managed_accounts() { + let handler = ConnectionHandler::default(); + // ManagedAccounts message should NOT trigger callback + let mut message = ResponseMessage::from("15\01\0DU123\0"); + + let callback_invoked = Arc::new(Mutex::new(false)); + let callback_invoked_clone = callback_invoked.clone(); + + let callback: StartupMessageCallback = Box::new(move |_msg| { + *callback_invoked_clone.lock().unwrap() = true; + }); + + let result = handler.parse_account_info(&mut message, Some(&callback)); + assert!(result.is_ok()); + + assert!(!*callback_invoked.lock().unwrap(), "callback should NOT be invoked for ManagedAccounts"); + } + + #[test] + fn test_parse_account_info_multiple_messages_callback() { + let handler = ConnectionHandler::default(); + let received_count = Arc::new(Mutex::new(0)); + let received_count_clone = received_count.clone(); + + let callback: StartupMessageCallback = Box::new(move |_msg| { + *received_count_clone.lock().unwrap() += 1; + }); + + // First message: OpenOrder + let mut msg1 = ResponseMessage::from("5\0123\0AAPL\0"); + handler.parse_account_info(&mut msg1, Some(&callback)).unwrap(); + + // Second message: OrderStatus + let mut msg2 = ResponseMessage::from("3\0456\0Filled\0"); + handler.parse_account_info(&mut msg2, Some(&callback)).unwrap(); + + // Third message: NextValidId (should NOT trigger callback) + let mut msg3 = ResponseMessage::from("9\01\01000\0"); + handler.parse_account_info(&mut msg3, Some(&callback)).unwrap(); + + assert_eq!(*received_count.lock().unwrap(), 2, "callback should be invoked exactly twice"); + } + #[test] fn test_parse_connection_time() { let example = "20230405 22:20:39 PST"; diff --git a/src/connection/sync.rs b/src/connection/sync.rs index fde9c3cf..09764807 100644 --- a/src/connection/sync.rs +++ b/src/connection/sync.rs @@ -4,7 +4,7 @@ use std::sync::Mutex; use log::{debug, info}; -use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol}; +use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol, StartupMessageCallback}; use super::ConnectionMetadata; use crate::errors::Error; use crate::messages::{RequestMessage, ResponseMessage}; @@ -28,7 +28,16 @@ pub struct Connection { impl Connection { /// Create a new connection + #[allow(dead_code)] pub fn connect(socket: S, client_id: i32) -> Result { + Self::connect_with_callback(socket, client_id, None) + } + + /// Create a new connection with a callback for unsolicited messages + /// + /// The callback will be invoked for any messages received during connection + /// setup that are not part of the normal handshake (e.g., OpenOrder, OrderStatus). + pub fn connect_with_callback(socket: S, client_id: i32, startup_callback: Option) -> Result { let connection = Self { client_id, socket, @@ -41,7 +50,7 @@ impl Connection { connection_handler: ConnectionHandler::default(), }; - connection.establish_connection()?; + connection.establish_connection(startup_callback.as_ref())?; Ok(connection) } @@ -71,7 +80,8 @@ impl Connection { match self.socket.reconnect() { Ok(_) => { info!("reconnected !!!"); - self.establish_connection()?; + // Reconnection doesn't use startup callback + self.establish_connection(None)?; return Ok(()); } @@ -85,10 +95,10 @@ impl Connection { } /// Establish connection to TWS - pub(crate) fn establish_connection(&self) -> Result<(), Error> { + pub(crate) fn establish_connection(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> { self.handshake()?; self.start_api()?; - self.receive_account_info()?; + self.receive_account_info(startup_callback)?; Ok(()) } @@ -165,14 +175,14 @@ impl Connection { } // Fetches next order id and managed accounts. - pub(crate) fn receive_account_info(&self) -> Result<(), Error> { + pub(crate) fn receive_account_info(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> { let mut account_info = AccountInfo::default(); let mut attempts = 0; const MAX_ATTEMPTS: i32 = 100; loop { let mut message = self.read_message()?; - let info = self.connection_handler.parse_account_info(&mut message)?; + let info = self.connection_handler.parse_account_info(&mut message, startup_callback)?; // Merge received info if info.next_order_id.is_some() { diff --git a/src/lib.rs b/src/lib.rs index acf3ff6a..de135a52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,42 @@ pub(crate) mod transport; /// Connection management pub(crate) mod connection; +/// Callback for handling unsolicited messages during connection setup. +/// +/// When TWS sends messages like `OpenOrder` or `OrderStatus` during the connection +/// handshake, this callback is invoked to allow the application to process them +/// instead of discarding them. +/// +/// # Example +/// +/// ```no_run +/// use ibapi::{Client, StartupMessageCallback}; +/// use ibapi::messages::IncomingMessages; +/// use std::sync::{Arc, Mutex}; +/// +/// #[tokio::main] +/// async fn main() { +/// let orders = Arc::new(Mutex::new(Vec::new())); +/// let orders_clone = orders.clone(); +/// +/// let callback: StartupMessageCallback = Box::new(move |msg| { +/// match msg.message_type() { +/// IncomingMessages::OpenOrder | IncomingMessages::OrderStatus => { +/// orders_clone.lock().unwrap().push(msg); +/// } +/// _ => {} +/// } +/// }); +/// +/// let client = Client::connect_with_callback("127.0.0.1:4002", 100, Some(callback)) +/// .await +/// .expect("connection failed"); +/// +/// println!("Received {} startup orders", orders.lock().unwrap().len()); +/// } +/// ``` +pub use connection::StartupMessageCallback; + /// Common utilities shared across modules pub(crate) mod common; diff --git a/src/transport/sync.rs b/src/transport/sync.rs index 7a31d0ce..d7185fdd 100644 --- a/src/transport/sync.rs +++ b/src/transport/sync.rs @@ -1094,7 +1094,7 @@ mod tests { ]; let stream = MockSocket::new(events, 0); let connection = Connection::stubbed(stream, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; Ok(()) } @@ -1108,7 +1108,7 @@ mod tests { let socket = MockSocket::new(events, MAX_RECONNECT_ATTEMPTS as usize + 1); let connection = Connection::stubbed(socket, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; // simulated dispatcher thread read to trigger disconnection let _ = connection.read_message(); @@ -1130,7 +1130,7 @@ mod tests { let socket = MockSocket::new(events, MAX_RECONNECT_ATTEMPTS as usize - 1); let connection = Connection::stubbed(socket, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; // simulated dispatcher thread read to trigger disconnection let _ = connection.read_message(); @@ -1150,7 +1150,7 @@ mod tests { ]; let stream = MockSocket::new(events, 0); let connection = Connection::stubbed(stream, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; let server_version = connection.server_version(); let bus = Arc::new(TcpMessageBus::new(connection)?); bus.process_messages(server_version, std::time::Duration::from_secs(0))?; @@ -1179,7 +1179,7 @@ mod tests { let stream = MockSocket::new(events, 0); let connection = Connection::stubbed(stream, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; let server_version = connection.server_version(); let bus = TcpMessageBus::new(connection)?; @@ -1213,7 +1213,7 @@ mod tests { let stream = MockSocket::new(events, 0); let connection = Connection::stubbed(stream, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; let server_version = connection.server_version(); let bus = TcpMessageBus::new(connection)?; @@ -1245,7 +1245,7 @@ mod tests { let stream = MockSocket::new(events, 0); let connection = Connection::stubbed(stream, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; match connection.read_message() { Ok(_) => panic!(""), @@ -1254,7 +1254,7 @@ mod tests { connection.handshake()?; connection.write_message(&packet)?; connection.start_api()?; - connection.receive_account_info()?; + connection.receive_account_info(None)?; } }; @@ -1277,7 +1277,7 @@ mod tests { let stream = MockSocket::new(events, 0); let connection = Connection::stubbed(stream, 28); - connection.establish_connection()?; + connection.establish_connection(None)?; let server_version = connection.server_version(); let bus = Arc::new(TcpMessageBus::new(connection)?); bus.process_messages(server_version, std::time::Duration::from_secs(0))?; From ea901f29ff39d9e772975c7fe39210cbf4b02e40 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Thu, 1 Jan 2026 16:00:50 -0800 Subject: [PATCH 2/4] fix: resolve flaky order_update_stream test isolation issue Add Drop impl to MessageBusStub to clean up ORDER_UPDATE_SUBSCRIPTION_TRACKER when stub is dropped. This prevents test isolation issues where memory addresses are reused across tests. Also add with_responses() constructor to work around Rust limitation with struct literals using Default::default() when Drop is implemented. --- src/orders/sync.rs | 34 ++++++++++++++-------------------- src/stubs.rs | 17 +++++++++++++++++ 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/orders/sync.rs b/src/orders/sync.rs index 9a4b3909..6bc5dde5 100644 --- a/src/orders/sync.rs +++ b/src/orders/sync.rs @@ -399,18 +399,15 @@ mod tests { #[test] fn place_order() { - let message_bus = Arc::new(MessageBusStub { - response_messages: vec![ - "5|13|76792991|TSLA|STK||0|?||SMART|USD|TSLA|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||100|1376327563|0|0|0||1376327563.0/DU1234567/100||||||||||0||-1|0||||||2147483647|0|0|0||3|0|0||0|0||0|None||0||||?|0|0||0|0||||||0|0|0|2147483647|2147483647|||0||IB|0|0||0|0|PreSubmitted|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308||||||0|0|0|None|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|0||||0|1|0|0|0|||0||".to_owned(), - "3|13|PreSubmitted|0|100|0|1376327563|0|0|100||0||".to_owned(), - "11|-1|13|76792991|TSLA|STK||0.0|||ISLAND|USD|TSLA|NMS|00025b46.63f8f39c.01.01|20230224 12:04:56|DU1234567|ISLAND|BOT|100|196.52|1376327563|100|0|100|196.52|||||2||".to_owned(), - "5|13|76792991|TSLA|STK||0|?||SMART|USD|TSLA|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||100|1376327563|0|0|0||1376327563.0/DU1234567/100||||||||||0||-1|0||||||2147483647|0|0|0||3|0|0||0|0||0|None||0||||?|0|0||0|0||||||0|0|0|2147483647|2147483647|||0||IB|0|0||0|0|Filled|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308||||||0|0|0|None|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|0||||0|1|0|0|0|||0||".to_owned(), - "3|13|Filled|100|0|196.52|1376327563|0|196.52|100||0||".to_owned(), - "5|13|76792991|TSLA|STK||0|?||SMART|USD|TSLA|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||100|1376327563|0|0|0||1376327563.0/DU1234567/100||||||||||0||-1|0||||||2147483647|0|0|0||3|0|0||0|0||0|None||0||||?|0|0||0|0||||||0|0|0|2147483647|2147483647|||0||IB|0|0||0|0|Filled|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.0|||USD||0|0|0|None|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|0||||0|1|0|0|0|||0||".to_owned(), - "59|1|00025b46.63f8f39c.01.01|1.0|USD|1.7976931348623157E308|1.7976931348623157E308|||".to_owned(), - ], - ..Default::default() - }); + let message_bus = Arc::new(MessageBusStub::with_responses(vec![ + "5|13|76792991|TSLA|STK||0|?||SMART|USD|TSLA|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||100|1376327563|0|0|0||1376327563.0/DU1234567/100||||||||||0||-1|0||||||2147483647|0|0|0||3|0|0||0|0||0|None||0||||?|0|0||0|0||||||0|0|0|2147483647|2147483647|||0||IB|0|0||0|0|PreSubmitted|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308||||||0|0|0|None|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|0||||0|1|0|0|0|||0||".to_owned(), + "3|13|PreSubmitted|0|100|0|1376327563|0|0|100||0||".to_owned(), + "11|-1|13|76792991|TSLA|STK||0.0|||ISLAND|USD|TSLA|NMS|00025b46.63f8f39c.01.01|20230224 12:04:56|DU1234567|ISLAND|BOT|100|196.52|1376327563|100|0|100|196.52|||||2||".to_owned(), + "5|13|76792991|TSLA|STK||0|?||SMART|USD|TSLA|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||100|1376327563|0|0|0||1376327563.0/DU1234567/100||||||||||0||-1|0||||||2147483647|0|0|0||3|0|0||0|0||0|None||0||||?|0|0||0|0||||||0|0|0|2147483647|2147483647|||0||IB|0|0||0|0|Filled|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308||||||0|0|0|None|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|0||||0|1|0|0|0|||0||".to_owned(), + "3|13|Filled|100|0|196.52|1376327563|0|196.52|100||0||".to_owned(), + "5|13|76792991|TSLA|STK||0|?||SMART|USD|TSLA|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||100|1376327563|0|0|0||1376327563.0/DU1234567/100||||||||||0||-1|0||||||2147483647|0|0|0||3|0|0||0|0||0|None||0||||?|0|0||0|0||||||0|0|0|2147483647|2147483647|||0||IB|0|0||0|0|Filled|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.0|||USD||0|0|0|None|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|1.7976931348623157E308|0||||0|1|0|0|0|||0||".to_owned(), + "59|1|00025b46.63f8f39c.01.01|1.0|USD|1.7976931348623157E308|1.7976931348623157E308|||".to_owned(), + ])); let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); @@ -768,14 +765,11 @@ mod tests { fn completed_orders() { let _ = env_logger::try_init(); - let message_bus = Arc::new(MessageBusStub { - response_messages: vec![ - // Copy exact format from integration test, just changing account to DU1234567 - "101|265598|AAPL|STK||0|||SMART|USD|AAPL|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||1377295418|0|0|0|||||||||||0||-1||||||2147483647|0|0||3|0||0|None||0|0|0||0|0||||0|0|0|2147483647|2147483647||||IB|0|0||0|Filled|100|0|0|150.25|1.7976931348623157E308|0|1|0||0|2147483647|0|Not an insider or substantial shareholder|0|0|9223372036854775807|20231122 10:30:00 America/Los_Angeles|Filled||||||".to_owned(), - "102|".to_owned(), - ], - ..Default::default() - }); + let message_bus = Arc::new(MessageBusStub::with_responses(vec![ + // Copy exact format from integration test, just changing account to DU1234567 + "101|265598|AAPL|STK||0|||SMART|USD|AAPL|NMS|BUY|100|MKT|0.0|0.0|DAY||DU1234567||0||1377295418|0|0|0|||||||||||0||-1||||||2147483647|0|0||3|0||0|None||0|0|0||0|0||||0|0|0|2147483647|2147483647||||IB|0|0||0|Filled|100|0|0|150.25|1.7976931348623157E308|0|1|0||0|2147483647|0|Not an insider or substantial shareholder|0|0|9223372036854775807|20231122 10:30:00 America/Los_Angeles|Filled||||||".to_owned(), + "102|".to_owned(), + ])); let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); diff --git a/src/stubs.rs b/src/stubs.rs index 3071dea1..458dfb53 100644 --- a/src/stubs.rs +++ b/src/stubs.rs @@ -46,7 +46,24 @@ impl Default for MessageBusStub { } } +#[cfg(feature = "sync")] +impl Drop for MessageBusStub { + fn drop(&mut self) { + // Clean up the subscription tracker to prevent test isolation issues + let stub_id = self as *const _ as usize; + ORDER_UPDATE_SUBSCRIPTION_TRACKER.lock().unwrap().remove(&stub_id); + } +} + impl MessageBusStub { + #[cfg(feature = "sync")] + pub fn with_responses(response_messages: Vec) -> Self { + Self { + request_messages: RwLock::new(vec![]), + response_messages, + } + } + pub fn request_messages(&self) -> Vec { self.request_messages.read().unwrap().clone() } From 53fc0e1fbc552f0b415e45ae00208f6fb67932a0 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Thu, 1 Jan 2026 16:21:32 -0800 Subject: [PATCH 3/4] docs: note callback not invoked on reconnection --- src/client/async.rs | 3 +++ src/client/sync.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/client/async.rs b/src/client/async.rs index 7d52fab7..529063bf 100644 --- a/src/client/async.rs +++ b/src/client/async.rs @@ -81,6 +81,9 @@ impl Client { /// that will be invoked for any unsolicited messages received during the connection /// handshake (e.g., OpenOrder, OrderStatus). /// + /// Note: The callback is only invoked during the initial connection, not during + /// automatic reconnections. + /// /// # Arguments /// * `address` - address of server. e.g. 127.0.0.1:4002 /// * `client_id` - id of client. e.g. 100 diff --git a/src/client/sync.rs b/src/client/sync.rs index 1ecccf09..f01cecee 100644 --- a/src/client/sync.rs +++ b/src/client/sync.rs @@ -81,6 +81,9 @@ impl Client { /// that will be invoked for any unsolicited messages received during the connection /// handshake (e.g., OpenOrder, OrderStatus). /// + /// Note: The callback is only invoked during the initial connection, not during + /// automatic reconnections. + /// /// # Arguments /// * `address` - address of server. e.g. 127.0.0.1:4002 /// * `client_id` - id of client. e.g. 100 From 3ddeebc86857f585bb9a89bf25319e0da437c804 Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Thu, 1 Jan 2026 16:27:02 -0800 Subject: [PATCH 4/4] chore: bump version to 2.5.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index abbcd026..a377a8f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ibapi" -version = "2.4.0" +version = "2.5.0" edition = "2021" authors = ["Wil Boayue "] description = "A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance."