Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ibapi"
version = "2.4.0"
version = "2.5.0"
edition = "2021"
authors = ["Wil Boayue <wil@wsbsolutions.com>"]
description = "A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance."
Expand Down
49 changes: 48 additions & 1 deletion src/client/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use log::debug;
use time::OffsetDateTime;
use time_tz::Tz;

use crate::connection::common::StartupMessageCallback;
use crate::connection::{r#async::AsyncConnection, ConnectionMetadata};
use crate::messages::{OutgoingMessages, RequestMessage};
use crate::transport::{
Expand Down Expand Up @@ -71,7 +72,53 @@ impl Client {
/// }
/// ```
pub async fn connect(address: &str, client_id: i32) -> Result<Client, Error> {
let connection = AsyncConnection::connect(address, client_id).await?;
Self::connect_with_callback(address, client_id, None).await
}

/// Establishes async connection to TWS or Gateway with a callback for startup messages
///
/// This is similar to [`connect`](Self::connect), but allows you to provide a callback
/// that will be invoked for any unsolicited messages received during the connection
/// handshake (e.g., OpenOrder, OrderStatus).
///
/// Note: The callback is only invoked during the initial connection, not during
/// automatic reconnections.
///
/// # Arguments
/// * `address` - address of server. e.g. 127.0.0.1:4002
/// * `client_id` - id of client. e.g. 100
/// * `startup_callback` - optional callback for unsolicited messages during connection
///
/// # Examples
///
/// ```no_run
/// use ibapi::{Client, StartupMessageCallback};
/// use ibapi::messages::IncomingMessages;
/// use std::sync::{Arc, Mutex};
///
/// #[tokio::main]
/// async fn main() {
/// let orders = Arc::new(Mutex::new(Vec::new()));
/// let orders_clone = orders.clone();
///
/// let callback: StartupMessageCallback = Box::new(move |msg| {
/// match msg.message_type() {
/// IncomingMessages::OpenOrder | IncomingMessages::OrderStatus => {
/// orders_clone.lock().unwrap().push(msg);
/// }
/// _ => {}
/// }
/// });
///
/// let client = Client::connect_with_callback("127.0.0.1:4002", 100, Some(callback))
/// .await
/// .expect("connection failed");
///
/// println!("Received {} startup orders", orders.lock().unwrap().len());
/// }
/// ```
pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Client, Error> {
let connection = AsyncConnection::connect_with_callback(address, client_id, startup_callback).await?;
let connection_metadata = connection.connection_metadata();

let message_bus = Arc::new(AsyncTcpMessageBus::new(connection)?);
Expand Down
46 changes: 45 additions & 1 deletion src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use time_tz::Tz;

use crate::accounts::types::{AccountGroup, AccountId, ContractId, ModelCode};
use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti};
use crate::connection::common::StartupMessageCallback;
use crate::connection::{sync::Connection, ConnectionMetadata};
use crate::contracts::{Contract, OptionComputation, SecurityType};
use crate::display_groups::DisplayGroupUpdate;
Expand Down Expand Up @@ -71,10 +72,53 @@ impl Client {
/// println!("next_order_id: {}", client.next_order_id());
/// ```
pub fn connect(address: &str, client_id: i32) -> Result<Client, Error> {
Self::connect_with_callback(address, client_id, None)
}

/// Establishes connection to TWS or Gateway with a callback for startup messages
///
/// This is similar to [`connect`](Self::connect), but allows you to provide a callback
/// that will be invoked for any unsolicited messages received during the connection
/// handshake (e.g., OpenOrder, OrderStatus).
///
/// Note: The callback is only invoked during the initial connection, not during
/// automatic reconnections.
///
/// # Arguments
/// * `address` - address of server. e.g. 127.0.0.1:4002
/// * `client_id` - id of client. e.g. 100
/// * `startup_callback` - optional callback for unsolicited messages during connection
///
/// # Examples
///
/// ```no_run
/// use ibapi::client::blocking::Client;
/// use ibapi::StartupMessageCallback;
/// use ibapi::messages::IncomingMessages;
/// use std::sync::{Arc, Mutex};
///
/// let orders = Arc::new(Mutex::new(Vec::new()));
/// let orders_clone = orders.clone();
///
/// let callback: StartupMessageCallback = Box::new(move |msg| {
/// match msg.message_type() {
/// IncomingMessages::OpenOrder | IncomingMessages::OrderStatus => {
/// orders_clone.lock().unwrap().push(msg);
/// }
/// _ => {}
/// }
/// });
///
/// let client = Client::connect_with_callback("127.0.0.1:4002", 100, Some(callback))
/// .expect("connection failed");
///
/// println!("Received {} startup orders", orders.lock().unwrap().len());
/// ```
pub fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Client, Error> {
let stream = TcpStream::connect(address)?;
let socket = TcpSocket::new(stream, address)?;

let connection = Connection::connect(socket, client_id)?;
let connection = Connection::connect_with_callback(socket, client_id, startup_callback)?;
let connection_metadata = connection.connection_metadata();

let message_bus = Arc::new(TcpMessageBus::new(connection)?);
Expand Down
3 changes: 3 additions & 0 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use time_tz::Tz;

pub mod common;

// Re-export StartupMessageCallback for lib.rs to re-export publicly
pub use common::StartupMessageCallback;

/// Metadata about the connection to TWS
#[derive(Default, Clone, Debug)]
pub struct ConnectionMetadata {
Expand Down
24 changes: 17 additions & 7 deletions src/connection/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::time::sleep;

use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol};
use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol, StartupMessageCallback};
use super::ConnectionMetadata;
use crate::errors::Error;
use crate::messages::{RequestMessage, ResponseMessage};
Expand All @@ -29,7 +29,16 @@ pub struct AsyncConnection {

impl AsyncConnection {
/// Create a new async connection
#[allow(dead_code)]
pub async fn connect(address: &str, client_id: i32) -> Result<Self, Error> {
Self::connect_with_callback(address, client_id, None).await
}

/// Create a new async connection with a callback for unsolicited messages
///
/// The callback will be invoked for any messages received during connection
/// setup that are not part of the normal handshake (e.g., OpenOrder, OrderStatus).
pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Self, Error> {
let socket = TcpStream::connect(address).await?;

let connection = Self {
Expand All @@ -44,7 +53,7 @@ impl AsyncConnection {
connection_url: address.to_string(),
};

connection.establish_connection().await?;
connection.establish_connection(startup_callback.as_ref()).await?;

Ok(connection)
}
Expand Down Expand Up @@ -88,7 +97,8 @@ impl AsyncConnection {
*socket = new_socket;
}

self.establish_connection().await?;
// Reconnection doesn't use startup callback
self.establish_connection(None).await?;

return Ok(());
}
Expand All @@ -102,10 +112,10 @@ impl AsyncConnection {
}

/// Establish connection to TWS
pub(crate) async fn establish_connection(&self) -> Result<(), Error> {
pub(crate) async fn establish_connection(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> {
self.handshake().await?;
self.start_api().await?;
self.receive_account_info().await?;
self.receive_account_info(startup_callback).await?;
Ok(())
}

Expand Down Expand Up @@ -209,14 +219,14 @@ impl AsyncConnection {
}

// Fetches next order id and managed accounts.
pub(crate) async fn receive_account_info(&self) -> Result<(), Error> {
pub(crate) async fn receive_account_info(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> {
let mut account_info = AccountInfo::default();

let mut attempts = 0;
const MAX_ATTEMPTS: i32 = 100;
loop {
let mut message = self.read_message().await?;
let info = self.connection_handler.parse_account_info(&mut message)?;
let info = self.connection_handler.parse_account_info(&mut message, startup_callback)?;

// Merge received info
if info.next_order_id.is_some() {
Expand Down
Loading