Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/connection/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ impl AsyncConnection {
/// setup that are not part of the normal handshake (e.g., OpenOrder, OrderStatus).
pub async fn connect_with_callback(address: &str, client_id: i32, startup_callback: Option<StartupMessageCallback>) -> Result<Self, Error> {
let socket = TcpStream::connect(address).await?;
// Optionally disable Nagle's algorithm for low-latency order submission.
// Set IBAPI_TCP_NODELAY=1 to send small writes (order messages) immediately
// instead of buffering up to 40ms.
if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" {
socket.set_nodelay(true)?;
}

let connection = Self {
client_id,
Expand Down Expand Up @@ -90,6 +96,9 @@ impl AsyncConnection {

match TcpStream::connect(&self.connection_url).await {
Ok(new_socket) => {
if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" {
new_socket.set_nodelay(true)?;
}
info!("reconnected !!!");

{
Expand Down
7 changes: 7 additions & 0 deletions src/market_data/historical/common/decoders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,19 @@ fn parse_schedule_date(text: &str) -> Result<Date, Error> {

fn parse_bar_date(text: &str, time_zone: &Tz) -> Result<OffsetDateTime, Error> {
if text.len() == 8 {
// Format: YYYYMMDD
let date_format = format_description!("[year][month][day]");
let bar_date = Date::parse(text, date_format)?;
let bar_date = bar_date.with_time(time!(00:00));

Ok(bar_date.assume_timezone_utc(time_tz::timezones::db::UTC))
} else if text.len() > 8 && text.contains(' ') {
// Format: YYYYMMDD HH:MM:SS (note: two spaces between date and time per IBKR format)
let datetime_format = format_description!("[year][month][day] [hour]:[minute]:[second]");
let bar_datetime = PrimitiveDateTime::parse(text, datetime_format)?;
Ok(bar_datetime.assume_timezone(time_zone).unwrap())
} else {
// Unix timestamp
let timestamp: i64 = text.parse()?;
let date_utc = OffsetDateTime::from_unix_timestamp(timestamp).unwrap();
Ok(date_utc.to_timezone(time_zone))
Expand Down
6 changes: 6 additions & 0 deletions src/market_data/historical/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum BarSize {
Min3,
/// Five-minute bars.
Min5,
/// Ten-minute bars.
Min10,
/// Fifteen-minute bars.
Min15,
/// Twenty-minute bars.
Expand Down Expand Up @@ -115,6 +117,7 @@ impl Display for BarSize {
Self::Min2 => write!(f, "2 mins"),
Self::Min3 => write!(f, "3 mins"),
Self::Min5 => write!(f, "5 mins"),
Self::Min10 => write!(f, "10 mins"),
Self::Min15 => write!(f, "15 mins"),
Self::Min20 => write!(f, "20 mins"),
Self::Min30 => write!(f, "30 mins"),
Expand Down Expand Up @@ -144,6 +147,7 @@ impl FromStr for BarSize {
"MIN2" => Ok(Self::Min2),
"MIN3" => Ok(Self::Min3),
"MIN5" => Ok(Self::Min5),
"MIN10" => Ok(Self::Min10),
"MIN15" => Ok(Self::Min15),
"MIN20" => Ok(Self::Min20),
"MIN30" => Ok(Self::Min30),
Expand Down Expand Up @@ -599,6 +603,7 @@ mod tests {
assert_eq!("2 mins", BarSize::Min2.to_string());
assert_eq!("3 mins", BarSize::Min3.to_string());
assert_eq!("5 mins", BarSize::Min5.to_string());
assert_eq!("10 mins", BarSize::Min10.to_string());
assert_eq!("15 mins", BarSize::Min15.to_string());
assert_eq!("20 mins", BarSize::Min20.to_string());
assert_eq!("30 mins", BarSize::Min30.to_string());
Expand All @@ -623,6 +628,7 @@ mod tests {
assert_eq!(BarSize::Min2, BarSize::from("MIN2"));
assert_eq!(BarSize::Min3, BarSize::from("MIN3"));
assert_eq!(BarSize::Min5, BarSize::from("MIN5"));
assert_eq!(BarSize::Min10, BarSize::from("MIN10"));
assert_eq!(BarSize::Min15, BarSize::from("MIN15"));
assert_eq!(BarSize::Min20, BarSize::from("MIN20"));
assert_eq!(BarSize::Min30, BarSize::from("MIN30"));
Expand Down
9 changes: 9 additions & 0 deletions src/transport/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,12 @@ pub(crate) struct TcpSocket {
}
impl TcpSocket {
pub fn new(stream: TcpStream, connection_url: &str) -> Result<Self, Error> {
// Optionally disable Nagle's algorithm for low-latency order submission.
// Set IBAPI_TCP_NODELAY=1 to send small writes immediately.
if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" {
stream.set_nodelay(true)?;
}

let writer = stream.try_clone()?;

stream.set_read_timeout(Some(TWS_READ_TIMEOUT))?;
Expand All @@ -718,6 +724,9 @@ impl Reconnect for TcpSocket {
fn reconnect(&self) -> Result<(), Error> {
match TcpStream::connect(&self.connection_url) {
Ok(stream) => {
if std::env::var("IBAPI_TCP_NODELAY").unwrap_or_default() == "1" {
stream.set_nodelay(true)?;
}
stream.set_read_timeout(Some(TWS_READ_TIMEOUT))?;

let mut reader = self.reader.lock()?;
Expand Down