2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ibapi"
version = "2.6.2"
version = "2.7.0"
edition = "2021"
authors = ["Wil Boayue <wil@wsbsolutions.com>"]
description = "A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance."
4 changes: 1 addition & 3 deletions examples/async/historical_data.rs
@@ -275,9 +275,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
bar.close
);
}
}
HistoricalBarUpdate::HistoricalEnd => {
println!("Initial historical data complete. Now streaming updates...");
println!("Now streaming updates...");
}
HistoricalBarUpdate::Update(bar) => {
println!(
10 changes: 6 additions & 4 deletions src/client/async.rs
@@ -1025,24 +1025,26 @@ impl Client {
/// ```no_run
/// use ibapi::contracts::Contract;
/// use ibapi::Client;
/// use ibapi::market_data::historical::{ToDuration, BarSize, WhatToShow, HistoricalBarUpdate};
/// use ibapi::market_data::TradingHours;
/// use ibapi::market_data::historical::{ToDuration, HistoricalBarUpdate};
/// use ibapi::prelude::{HistoricalBarSize, HistoricalWhatToShow, TradingHours};
///
/// #[tokio::main]
/// async fn main() {
/// let client = Client::connect("127.0.0.1:4002", 100).await.expect("connection failed");
/// let contract = Contract::stock("SPY").build();
///
/// let mut subscription = client
/// .historical_data_streaming(&contract, 3.days(), BarSize::Min15, Some(WhatToShow::Trades), TradingHours::Extended, true)
/// .historical_data_streaming(
/// &contract, 3.days(), HistoricalBarSize::Min15,
/// Some(HistoricalWhatToShow::Trades), TradingHours::Extended, true
/// )
/// .await
/// .expect("streaming request failed");
///
/// while let Some(update) = subscription.next().await {
/// match update {
/// HistoricalBarUpdate::Historical(data) => println!("Initial bars: {}", data.bars.len()),
/// HistoricalBarUpdate::Update(bar) => println!("Streaming update: {:?}", bar),
/// HistoricalBarUpdate::HistoricalEnd => println!("Initial data complete"),
/// }
/// }
/// }
52 changes: 52 additions & 0 deletions src/client/sync.rs
@@ -1083,6 +1083,58 @@ impl Client {
historical::blocking::historical_data(self, contract, interval_end, duration, bar_size, Some(what_to_show), trading_hours)
}

/// Requests historical data with optional streaming updates.
///
/// This method returns a subscription that first yields the initial historical bars.
/// When `keep_up_to_date` is `true`, it continues to yield streaming updates for
/// the current bar as it builds. IBKR sends updated bars every ~4-6 seconds until
/// the bar completes.
///
/// # Arguments
/// * `contract` - Contract object that is the subject of the query
/// * `duration` - The amount of time for which the data needs to be retrieved
/// * `bar_size` - The bar size (resolution)
/// * `what_to_show` - The type of data to retrieve (Trades, MidPoint, etc.)
/// * `trading_hours` - Regular trading hours only, or include extended hours
/// * `keep_up_to_date` - If true, continue receiving streaming updates after initial data
///
/// # Examples
///
/// ```no_run
/// use ibapi::contracts::Contract;
/// use ibapi::client::blocking::Client;
/// use ibapi::market_data::historical::{ToDuration, HistoricalBarUpdate};
/// use ibapi::prelude::{HistoricalBarSize, HistoricalWhatToShow, TradingHours};
///
/// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
/// let contract = Contract::stock("SPY").build();
///
/// let subscription = client
/// .historical_data_streaming(
/// &contract, 3.days(), HistoricalBarSize::Min15,
/// Some(HistoricalWhatToShow::Trades), TradingHours::Extended, true
/// )
/// .expect("streaming request failed");
///
/// while let Some(update) = subscription.next() {
/// match update {
/// HistoricalBarUpdate::Historical(data) => println!("Initial bars: {}", data.bars.len()),
/// HistoricalBarUpdate::Update(bar) => println!("Streaming update: {:?}", bar),
/// }
/// }
/// ```
pub fn historical_data_streaming(
&self,
contract: &Contract,
duration: historical::Duration,
bar_size: historical::BarSize,
what_to_show: Option<historical::WhatToShow>,
trading_hours: TradingHours,
keep_up_to_date: bool,
) -> Result<historical::blocking::HistoricalDataStreamingSubscription, Error> {
historical::blocking::historical_data_streaming(self, contract, duration, bar_size, what_to_show, trading_hours, keep_up_to_date)
}

/// Requests [Schedule](historical::Schedule) for an interval of given duration
/// ending at specified date.
///
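Note on the new blocking API above: the doc example is `no_run`, so as a standalone program it would look roughly like the sketch below. The gateway address, client id, and symbol are placeholders, and the loop simply mirrors the doc example (including calling `next()` through a shared reference, as written there).

```rust
use ibapi::client::blocking::Client;
use ibapi::contracts::Contract;
use ibapi::market_data::historical::{HistoricalBarUpdate, ToDuration};
use ibapi::prelude::{HistoricalBarSize, HistoricalWhatToShow, TradingHours};

fn main() {
    // Placeholder gateway address and client id.
    let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
    let contract = Contract::stock("SPY").build();

    // Backfill 3 days of 15-minute bars, then keep receiving updates for the forming bar.
    let subscription = client
        .historical_data_streaming(
            &contract,
            3.days(),
            HistoricalBarSize::Min15,
            Some(HistoricalWhatToShow::Trades),
            TradingHours::Extended,
            true, // keep_up_to_date
        )
        .expect("streaming request failed");

    while let Some(update) = subscription.next() {
        match update {
            // The full backfill arrives once, up front.
            HistoricalBarUpdate::Historical(data) => println!("initial bars: {}", data.bars.len()),
            // Afterwards the current bar is re-sent roughly every 4-6 seconds until it completes.
            HistoricalBarUpdate::Update(bar) => println!("live bar: {} close={}", bar.date, bar.close),
        }
    }
}
```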
69 changes: 23 additions & 46 deletions src/market_data/historical/async.rs
@@ -356,7 +356,18 @@ impl<T: TickDecoder<T> + Send> TickSubscription<T> {
/// A `HistoricalDataStreamingSubscription` that yields `HistoricalBarUpdate` values
///
/// # Example
/// ```ignore
/// ```no_run
/// use ibapi::Client;
/// use ibapi::contracts::Contract;
/// use ibapi::market_data::historical::{
/// BarSize, Duration, HistoricalBarUpdate, WhatToShow, historical_data_streaming
/// };
/// use ibapi::market_data::TradingHours;
///
/// # async fn example() -> Result<(), ibapi::Error> {
/// let client = Client::connect("127.0.0.1:4002", 100).await?;
/// let contract = Contract::stock("SPY").build();
///
/// let mut subscription = historical_data_streaming(
/// &client,
/// &contract,
@@ -375,11 +386,10 @@ impl<T: TickDecoder<T> + Send> TickSubscription<T> {
/// HistoricalBarUpdate::Update(bar) => {
/// println!("Bar update: {} close={}", bar.date, bar.close);
/// }
/// HistoricalBarUpdate::HistoricalEnd => {
/// println!("Initial historical data complete, now streaming");
/// }
/// }
/// }
/// # Ok(())
/// # }
/// ```
///
/// # See Also
@@ -427,13 +437,12 @@ pub async fn historical_data_streaming(

/// Async subscription for streaming historical data with keepUpToDate=true.
///
/// This subscription first yields the initial historical bars, then continues
/// to yield streaming updates for the current bar as it builds.
/// This subscription first yields the initial historical bars as a `Historical` variant,
/// then continues to yield streaming updates for the current bar as `Update` variants.
pub struct HistoricalDataStreamingSubscription {
messages: AsyncInternalSubscription,
server_version: i32,
time_zone: &'static Tz,
pending_end: bool,
error: Option<Error>,
}

@@ -443,25 +452,17 @@ impl HistoricalDataStreamingSubscription {
messages,
server_version,
time_zone,
pending_end: false,
error: None,
}
}

/// Get the next update from the streaming subscription.
///
/// Returns:
/// - `Some(HistoricalBarUpdate::Historical(data))` - Initial batch of historical bars
/// - `Some(HistoricalBarUpdate::HistoricalEnd)` - End of initial historical data
/// - `Some(HistoricalBarUpdate::Historical(data))` - Initial batch of historical bars (always first)
/// - `Some(HistoricalBarUpdate::Update(bar))` - Streaming bar update
/// - `None` - Subscription ended (connection closed or error)
pub async fn next(&mut self) -> Option<HistoricalBarUpdate> {
// Emit HistoricalEnd after Historical data was returned
if self.pending_end {
self.pending_end = false;
return Some(HistoricalBarUpdate::HistoricalEnd);
}

loop {
match self.messages.next().await {
Some(Ok(mut message)) => {
@@ -470,7 +471,6 @@ impl HistoricalDataStreamingSubscription {
// Initial historical data batch
match decoders::decode_historical_data(self.server_version, self.time_zone, &mut message) {
Ok(data) => {
self.pending_end = true;
return Some(HistoricalBarUpdate::Historical(data));
}
Err(e) => {
@@ -1187,18 +1187,10 @@ mod tests {
_ => panic!("Expected Historical variant"),
}

// Second: receive HistoricalEnd marker
// Second: receive streaming update
let update2 = subscription.next().await;
assert!(update2.is_some(), "Should receive HistoricalEnd");
assert!(update2.is_some(), "Should receive streaming update");
match update2.unwrap() {
HistoricalBarUpdate::HistoricalEnd => {}
_ => panic!("Expected HistoricalEnd variant"),
}

// Third: receive streaming update
let update3 = subscription.next().await;
assert!(update3.is_some(), "Should receive streaming update");
match update3.unwrap() {
HistoricalBarUpdate::Update(bar) => {
assert_eq!(bar.open, 185.80, "Wrong open price in update");
assert_eq!(bar.high, 186.10, "Wrong high price in update");
Expand All @@ -1210,11 +1202,8 @@ mod tests {
// Verify request message includes keepUpToDate=true
let request_messages = message_bus.request_messages.read().unwrap();
assert_eq!(request_messages.len(), 1, "Should send one request");
// The keepUpToDate field should be "1" (true)
assert!(
request_messages[0].fields.contains(&"1".to_string()),
"Request should have keepUpToDate=true"
);
// keepUpToDate is at field index 21 (for non-bag contracts)
assert_eq!(request_messages[0].fields[21], "1", "Request should have keepUpToDate=true at field[21]");
}

#[tokio::test]
@@ -1254,23 +1243,11 @@ mod tests {
_ => panic!("Expected Historical variant"),
}

// Receive HistoricalEnd marker
let update2 = subscription.next().await;
assert!(update2.is_some(), "Should receive HistoricalEnd");
match update2.unwrap() {
HistoricalBarUpdate::HistoricalEnd => {}
_ => panic!("Expected HistoricalEnd variant"),
}

// Verify request message includes keepUpToDate=false
let request_messages = message_bus.request_messages.read().unwrap();
assert_eq!(request_messages.len(), 1, "Should send one request");
// Find the keepUpToDate field - it should be "0" (false)
// The field order in historical data request puts keepUpToDate near the end
let request = &request_messages[0];
// Check the last few fields for the "0" value
let fields_str = request.fields.join("|");
assert!(fields_str.contains("|0|"), "Request should have keepUpToDate=false (0)");
// keepUpToDate is at field index 21 (for non-bag contracts)
assert_eq!(request_messages[0].fields[21], "0", "Request should have keepUpToDate=false at field[21]");
}

#[tokio::test]
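Reviewer note on consuming the stream: because the forming bar is re-sent with the same timestamp until it completes (see the `HistoricalBarUpdate` docs in mod.rs below), a caller that keeps a bar series in memory needs replace-or-append logic. A minimal sketch under that assumption; `apply_update` is an illustrative helper, not part of the crate:

```rust
use ibapi::market_data::historical::{Bar, HistoricalBarUpdate};

/// Merge one update into an in-memory series: the initial batch replaces the
/// series, a repeat of the current timestamp replaces the forming bar, and a
/// new timestamp starts a new bar.
fn apply_update(bars: &mut Vec<Bar>, update: HistoricalBarUpdate) {
    match update {
        HistoricalBarUpdate::Historical(data) => *bars = data.bars,
        HistoricalBarUpdate::Update(bar) => match bars.last_mut() {
            Some(last) if last.date == bar.date => *last = bar, // same bar, still building
            _ => bars.push(bar),                                // bar rolled over
        },
    }
}
```

Inside either the async or the blocking loop shown above, each `update` would simply be passed to `apply_update(&mut bars, update)`.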
3 changes: 0 additions & 3 deletions src/market_data/historical/common/decoders.rs
@@ -238,7 +238,6 @@ pub(crate) fn decode_histogram_data(message: &mut ResponseMessage) -> Result<Vec
/// - volume
/// - wap
/// - count
#[cfg(feature = "async")]
pub(crate) fn decode_historical_data_update(time_zone: &Tz, message: &mut ResponseMessage) -> Result<Bar, Error> {
message.skip(); // message type
message.skip(); // request_id
@@ -482,7 +481,6 @@ mod tests {
assert_eq!(ticks[23].size, 0, "ticks[0].size");
}

#[cfg(feature = "async")]
#[test]
fn test_decode_historical_data_update() {
let time_zone: &Tz = time_tz::timezones::db::america::NEW_YORK;
@@ -502,7 +500,6 @@
assert_eq!(bar.count, 150, "bar.count");
}

#[cfg(feature = "async")]
#[test]
fn test_decode_historical_data_update_without_count() {
let time_zone: &Tz = time_tz::timezones::db::america::NEW_YORK;
14 changes: 8 additions & 6 deletions src/market_data/historical/mod.rs
@@ -329,18 +329,17 @@ pub struct HistoricalData {
/// Update from historical data streaming with keepUpToDate=true.
///
/// When requesting historical data with `keepUpToDate=true`, IBKR first sends
/// the historical bars, then continues streaming updates for the current bar.
/// the initial historical bars as a `Historical` variant, then continues
/// streaming real-time updates for the current bar as `Update` variants.
/// The current bar is updated approximately every 4-6 seconds until a new
/// bar begins.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum HistoricalBarUpdate {
/// Initial batch of historical bars.
/// Initial batch of historical bars. Always received first.
Historical(HistoricalData),
/// Real-time update of the current (incomplete) bar.
/// Note: Multiple updates with the same timestamp will be sent as the bar builds.
/// Multiple updates with the same timestamp will be sent as the bar builds.
Update(Bar),
/// Signals the end of the initial historical data batch.
HistoricalEnd,
}

/// Trading schedule describing sessions for a contract.
@@ -577,7 +576,10 @@ impl TickDecoder<TickMidpoint> for TickMidpoint {

// Re-export TickSubscription and iterator types based on active feature
#[cfg(all(feature = "sync", not(feature = "async")))]
pub use sync::{TickSubscription, TickSubscriptionIter, TickSubscriptionOwnedIter, TickSubscriptionTimeoutIter, TickSubscriptionTryIter};
pub use sync::{
HistoricalDataStreamingSubscription, TickSubscription, TickSubscriptionIter, TickSubscriptionOwnedIter, TickSubscriptionTimeoutIter,
TickSubscriptionTryIter,
};

#[cfg(feature = "async")]
pub use r#async::{historical_data_streaming, HistoricalDataStreamingSubscription, TickSubscription};
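Migration note for the removed `HistoricalEnd` variant: downstream matches that had a dedicated end-of-backfill arm no longer compile, and the arrival of the single `Historical` batch is now the boundary, since it always comes first and exactly once. A hedged sketch of the adjustment using the async subscription; `consume` and the `backfill_done` flag are illustrative, not crate API:

```rust
use ibapi::market_data::historical::{HistoricalBarUpdate, HistoricalDataStreamingSubscription};

// 2.6.x had a dedicated arm for the end-of-backfill marker:
//     HistoricalBarUpdate::HistoricalEnd => println!("Initial historical data complete."),
// After this change, the Historical variant itself marks that boundary.
async fn consume(mut subscription: HistoricalDataStreamingSubscription) {
    let mut backfill_done = false;
    while let Some(update) = subscription.next().await {
        match update {
            HistoricalBarUpdate::Historical(data) => {
                backfill_done = true;
                println!("Initial historical data complete: {} bars. Now streaming updates...", data.bars.len());
            }
            HistoricalBarUpdate::Update(bar) => {
                debug_assert!(backfill_done, "updates should only follow the initial batch");
                println!("Streaming update: {:?}", bar);
            }
        }
    }
}
```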