Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions src/client/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use log::debug;
use time::OffsetDateTime;
use time_tz::Tz;

use crate::connection::common::StartupMessageCallback;
use crate::connection::common::{ConnectionOptions, StartupMessageCallback};
use crate::connection::{r#async::AsyncConnection, ConnectionMetadata};
use crate::messages::{OutgoingMessages, RequestMessage};
use crate::transport::{
Expand Down Expand Up @@ -117,7 +117,37 @@ impl Client {
/// }
/// ```
pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Client, Error> {
let connection = AsyncConnection::connect_with_callback(address, client_id, startup_callback).await?;
Self::connect_with_options(address, client_id, startup_callback.into()).await
}

/// Establishes async connection to TWS or Gateway with custom options
///
/// This is similar to [`connect`](Self::connect), but allows you to configure
/// connection options like `TCP_NODELAY` and startup callbacks via
/// [`ConnectionOptions`].
///
/// # Arguments
/// * `address` - address of server. e.g. 127.0.0.1:4002
/// * `client_id` - id of client. e.g. 100
/// * `options` - connection options
///
/// # Examples
///
/// ```no_run
/// use ibapi::{Client, ConnectionOptions};
///
/// #[tokio::main]
/// async fn main() {
/// let options = ConnectionOptions::default()
/// .tcp_no_delay(true);
///
/// let client = Client::connect_with_options("127.0.0.1:4002", 100, options)
/// .await
/// .expect("connection failed");
/// }
/// ```
pub async fn connect_with_options(address: &str, client_id: i32, options: ConnectionOptions) -> Result<Client, Error> {
let connection = AsyncConnection::connect_with_options(address, client_id, options).await?;
let connection_metadata = connection.connection_metadata();

let message_bus = Arc::new(AsyncTcpMessageBus::new(connection)?);
Expand Down
35 changes: 29 additions & 6 deletions src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! subscriptions, and maintains the connection state.

use std::fmt::Debug;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -15,7 +14,7 @@ use time_tz::Tz;

use crate::accounts::types::{AccountGroup, AccountId, ContractId, ModelCode};
use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti};
use crate::connection::common::StartupMessageCallback;
use crate::connection::common::{ConnectionOptions, StartupMessageCallback};
use crate::connection::{sync::Connection, ConnectionMetadata};
use crate::contracts::{Contract, OptionComputation, SecurityType};
use crate::errors::Error;
Expand All @@ -28,7 +27,7 @@ use crate::news::NewsArticle;
use crate::orders::{CancelOrder, Executions, ExerciseOptions, Order, OrderBuilder, OrderUpdate, Orders, PlaceOrder};
use crate::scanner::ScannerData;
use crate::subscriptions::sync::Subscription;
use crate::transport::{InternalSubscription, MessageBus, TcpMessageBus, TcpSocket};
use crate::transport::{InternalSubscription, MessageBus, TcpMessageBus};
use crate::wsh::AutoFill;
use crate::{accounts, contracts, display_groups, market_data, news, orders, scanner, wsh};

Expand Down Expand Up @@ -114,10 +113,34 @@ impl Client {
/// println!("Received {} startup orders", orders.lock().unwrap().len());
/// ```
pub fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Client, Error> {
let stream = TcpStream::connect(address)?;
let socket = TcpSocket::new(stream, address)?;
Self::connect_with_options(address, client_id, startup_callback.into())
}

let connection = Connection::connect_with_callback(socket, client_id, startup_callback)?;
/// Establishes connection to TWS or Gateway with custom options
///
/// This is similar to [`connect`](Self::connect), but allows you to configure
/// connection options like `TCP_NODELAY` and startup callbacks via
/// [`ConnectionOptions`].
///
/// # Arguments
/// * `address` - address of server. e.g. 127.0.0.1:4002
/// * `client_id` - id of client. e.g. 100
/// * `options` - connection options
///
/// # Examples
///
/// ```no_run
/// use ibapi::client::blocking::Client;
/// use ibapi::ConnectionOptions;
///
/// let options = ConnectionOptions::default()
/// .tcp_no_delay(true);
///
/// let client = Client::connect_with_options("127.0.0.1:4002", 100, options)
/// .expect("connection failed");
/// ```
pub fn connect_with_options(address: &str, client_id: i32, options: ConnectionOptions) -> Result<Client, Error> {
let connection = Connection::connect_with_options(address, client_id, options)?;
let connection_metadata = connection.connection_metadata();

let message_bus = Arc::new(TcpMessageBus::new(connection)?);
Expand Down
2 changes: 1 addition & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use time_tz::Tz;

pub mod common;

// Re-export StartupMessageCallback for lib.rs to re-export publicly
pub use common::ConnectionOptions;
pub use common::StartupMessageCallback;

/// Metadata about the connection to TWS
Expand Down
29 changes: 23 additions & 6 deletions src/connection/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::time::sleep;

use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionProtocol, StartupMessageCallback};
use super::common::{parse_connection_time, AccountInfo, ConnectionHandler, ConnectionOptions, ConnectionProtocol, StartupMessageCallback};
use super::ConnectionMetadata;
use crate::errors::Error;
use crate::messages::{RequestMessage, ResponseMessage};
Expand All @@ -25,6 +25,7 @@ pub struct AsyncConnection {
pub(crate) recorder: MessageRecorder,
pub(crate) connection_handler: ConnectionHandler,
pub(crate) connection_url: String,
pub(crate) options: ConnectionOptions,
}

impl AsyncConnection {
Expand All @@ -39,7 +40,15 @@ impl AsyncConnection {
/// The callback will be invoked for any messages received during connection
/// setup that are not part of the normal handshake (e.g., OpenOrder, OrderStatus).
pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Self, Error> {
let socket = TcpStream::connect(address).await?;
Self::connect_with_options(address, client_id, startup_callback.into()).await
}

/// Create a new async connection with custom options.
///
/// Applies settings from [`ConnectionOptions`] (e.g. `TCP_NODELAY`, startup callback)
/// before performing the TWS handshake.
pub async fn connect_with_options(address: &str, client_id: i32, options: ConnectionOptions) -> Result<Self, Error> {
let socket = Self::connect_socket(address, &options).await?;

let connection = Self {
client_id,
Expand All @@ -51,13 +60,21 @@ impl AsyncConnection {
recorder: MessageRecorder::from_env(),
connection_handler: ConnectionHandler::default(),
connection_url: address.to_string(),
options,
};

connection.establish_connection(startup_callback.as_ref()).await?;
let cb_ref = connection.options.startup_callback.as_deref();
connection.establish_connection(cb_ref).await?;

Ok(connection)
}

async fn connect_socket(address: &str, options: &ConnectionOptions) -> Result<TcpStream, Error> {
let socket = TcpStream::connect(address).await?;
socket.set_nodelay(options.tcp_no_delay)?;
Ok(socket)
}

/// Get a copy of the connection metadata
pub fn connection_metadata(&self) -> ConnectionMetadata {
// For now, we'll use blocking lock since this is called during initialization
Expand Down Expand Up @@ -88,7 +105,7 @@ impl AsyncConnection {

sleep(next_delay).await;

match TcpStream::connect(&self.connection_url).await {
match Self::connect_socket(&self.connection_url, &self.options).await {
Ok(new_socket) => {
info!("reconnected !!!");

Expand All @@ -112,7 +129,7 @@ impl AsyncConnection {
}

/// Establish connection to TWS
pub(crate) async fn establish_connection(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> {
pub(crate) async fn establish_connection(&self, startup_callback: Option<&(dyn Fn(ResponseMessage) + Send + Sync)>) -> Result<(), Error> {
self.handshake().await?;
self.start_api().await?;
self.receive_account_info(startup_callback).await?;
Expand Down Expand Up @@ -219,7 +236,7 @@ impl AsyncConnection {
}

// Fetches next order id and managed accounts.
pub(crate) async fn receive_account_info(&self, startup_callback: Option<&StartupMessageCallback>) -> Result<(), Error> {
pub(crate) async fn receive_account_info(&self, startup_callback: Option<&(dyn Fn(ResponseMessage) + Send + Sync)>) -> Result<(), Error> {
let mut account_info = AccountInfo::default();

let mut attempts = 0;
Expand Down
102 changes: 100 additions & 2 deletions src/connection/common.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! Common connection logic shared between sync and async implementations

use std::fmt;
use std::sync::Arc;

use log::{debug, error, warn};
use time::macros::format_description;
use time::OffsetDateTime;
Expand All @@ -17,6 +20,64 @@ use crate::server_versions;
/// instead of discarding them.
pub type StartupMessageCallback = Box<dyn Fn(ResponseMessage) + Send + Sync>;

/// Options for configuring a connection to TWS or IB Gateway.
///
/// Use the builder methods to configure options, then pass to
/// [`Client::connect_with_options`](crate::Client::connect_with_options).
///
/// # Examples
///
/// ```
/// use ibapi::ConnectionOptions;
///
/// let options = ConnectionOptions::default()
/// .tcp_no_delay(true);
/// ```
#[derive(Clone, Default)]
pub struct ConnectionOptions {
pub(crate) tcp_no_delay: bool,
pub(crate) startup_callback: Option<Arc<dyn Fn(ResponseMessage) + Send + Sync>>,
}

impl ConnectionOptions {
/// Enable or disable `TCP_NODELAY` on the connection socket.
///
/// When enabled, disables Nagle's algorithm for lower latency.
/// Default: `false`.
pub fn tcp_no_delay(mut self, enabled: bool) -> Self {
self.tcp_no_delay = enabled;
self
}

/// Set a callback for unsolicited messages during connection setup.
///
/// When TWS sends messages like `OpenOrder` or `OrderStatus` during the
/// connection handshake, this callback processes them instead of discarding.
pub fn startup_callback(mut self, callback: impl Fn(ResponseMessage) + Send + Sync + 'static) -> Self {
self.startup_callback = Some(Arc::new(callback));
self
}
}

impl From<Option<StartupMessageCallback>> for ConnectionOptions {
fn from(callback: Option<StartupMessageCallback>) -> Self {
let mut opts = Self::default();
if let Some(cb) = callback {
opts.startup_callback = Some(Arc::from(cb));
}
opts
}
}

impl fmt::Debug for ConnectionOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectionOptions")
.field("tcp_no_delay", &self.tcp_no_delay)
.field("startup_callback", &self.startup_callback.is_some())
.finish()
}
}

/// Data exchanged during the connection handshake
#[derive(Debug, Clone)]
#[allow(dead_code)]
Expand Down Expand Up @@ -44,7 +105,11 @@ pub trait ConnectionProtocol {
///
/// If a callback is provided, unsolicited messages (like OpenOrder, OrderStatus)
/// will be passed to it instead of being discarded.
fn parse_account_info(&self, message: &mut ResponseMessage, callback: Option<&StartupMessageCallback>) -> Result<AccountInfo, Self::Error>;
fn parse_account_info(
&self,
message: &mut ResponseMessage,
callback: Option<&(dyn Fn(ResponseMessage) + Send + Sync)>,
) -> Result<AccountInfo, Self::Error>;
}

/// Account information received during connection establishment
Expand Down Expand Up @@ -109,7 +174,11 @@ impl ConnectionProtocol for ConnectionHandler {
message
}

fn parse_account_info(&self, message: &mut ResponseMessage, callback: Option<&StartupMessageCallback>) -> Result<AccountInfo, Self::Error> {
fn parse_account_info(
&self,
message: &mut ResponseMessage,
callback: Option<&(dyn Fn(ResponseMessage) + Send + Sync)>,
) -> Result<AccountInfo, Self::Error> {
let mut info = AccountInfo::default();

match message.message_type() {
Expand Down Expand Up @@ -442,4 +511,33 @@ mod tests {
// server_time will contain replacement characters but parsing succeeds
assert!(handshake_data.server_time.contains("20251205"));
}

#[test]
fn test_connection_options_default() {
let opts = ConnectionOptions::default();
assert_eq!(opts.tcp_no_delay, false);
assert!(opts.startup_callback.is_none());
}

#[test]
fn test_connection_options_builder() {
let opts = ConnectionOptions::default().tcp_no_delay(true).startup_callback(|_msg| {});
assert_eq!(opts.tcp_no_delay, true);
assert!(opts.startup_callback.is_some());
}

#[test]
fn test_connection_options_clone() {
let opts = ConnectionOptions::default().tcp_no_delay(true);
let cloned = opts.clone();
assert_eq!(cloned.tcp_no_delay, true);
}

#[test]
fn test_connection_options_debug() {
let opts = ConnectionOptions::default().tcp_no_delay(true);
let debug_str = format!("{:?}", opts);
assert!(debug_str.contains("tcp_no_delay: true"));
assert!(debug_str.contains("startup_callback: false"));
}
}
Loading