diff --git a/Cargo.toml b/Cargo.toml index 0e6aea58..0172ac59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ibapi" -version = "2.6.2" +version = "2.7.0" edition = "2021" authors = ["Wil Boayue "] description = "A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance." diff --git a/examples/async/historical_data.rs b/examples/async/historical_data.rs index a9db50f4..2e3fb6db 100644 --- a/examples/async/historical_data.rs +++ b/examples/async/historical_data.rs @@ -275,9 +275,7 @@ async fn main() -> Result<(), Box> { bar.close ); } - } - HistoricalBarUpdate::HistoricalEnd => { - println!("Initial historical data complete. Now streaming updates..."); + println!("Now streaming updates..."); } HistoricalBarUpdate::Update(bar) => { println!( diff --git a/src/client/async.rs b/src/client/async.rs index 1611b2a9..c0249497 100644 --- a/src/client/async.rs +++ b/src/client/async.rs @@ -1025,8 +1025,8 @@ impl Client { /// ```no_run /// use ibapi::contracts::Contract; /// use ibapi::Client; - /// use ibapi::market_data::historical::{ToDuration, BarSize, WhatToShow, HistoricalBarUpdate}; - /// use ibapi::market_data::TradingHours; + /// use ibapi::market_data::historical::{ToDuration, HistoricalBarUpdate}; + /// use ibapi::prelude::{HistoricalBarSize, HistoricalWhatToShow, TradingHours}; /// /// #[tokio::main] /// async fn main() { @@ -1034,7 +1034,10 @@ impl Client { /// let contract = Contract::stock("SPY").build(); /// /// let mut subscription = client - /// .historical_data_streaming(&contract, 3.days(), BarSize::Min15, Some(WhatToShow::Trades), TradingHours::Extended, true) + /// .historical_data_streaming( + /// &contract, 3.days(), HistoricalBarSize::Min15, + /// Some(HistoricalWhatToShow::Trades), TradingHours::Extended, true + /// ) /// .await /// .expect("streaming request failed"); /// @@ -1042,7 +1045,6 @@ impl Client { /// match update { /// HistoricalBarUpdate::Historical(data) => println!("Initial bars: {}", data.bars.len()), /// HistoricalBarUpdate::Update(bar) => println!("Streaming update: {:?}", bar), - /// HistoricalBarUpdate::HistoricalEnd => println!("Initial data complete"), /// } /// } /// } diff --git a/src/client/sync.rs b/src/client/sync.rs index f01cecee..ad2b6118 100644 --- a/src/client/sync.rs +++ b/src/client/sync.rs @@ -1083,6 +1083,58 @@ impl Client { historical::blocking::historical_data(self, contract, interval_end, duration, bar_size, Some(what_to_show), trading_hours) } + /// Requests historical data with optional streaming updates. + /// + /// This method returns a subscription that first yields the initial historical bars. + /// When `keep_up_to_date` is `true`, it continues to yield streaming updates for + /// the current bar as it builds. IBKR sends updated bars every ~4-6 seconds until + /// the bar completes. + /// + /// # Arguments + /// * `contract` - Contract object that is subject of query + /// * `duration` - The amount of time for which the data needs to be retrieved + /// * `bar_size` - The bar size (resolution) + /// * `what_to_show` - The type of data to retrieve (Trades, MidPoint, etc.) + /// * `trading_hours` - Regular trading hours only, or include extended hours + /// * `keep_up_to_date` - If true, continue receiving streaming updates after initial data + /// + /// # Examples + /// + /// ```no_run + /// use ibapi::contracts::Contract; + /// use ibapi::client::blocking::Client; + /// use ibapi::market_data::historical::{ToDuration, HistoricalBarUpdate}; + /// use ibapi::prelude::{HistoricalBarSize, HistoricalWhatToShow, TradingHours}; + /// + /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + /// let contract = Contract::stock("SPY").build(); + /// + /// let subscription = client + /// .historical_data_streaming( + /// &contract, 3.days(), HistoricalBarSize::Min15, + /// Some(HistoricalWhatToShow::Trades), TradingHours::Extended, true + /// ) + /// .expect("streaming request failed"); + /// + /// while let Some(update) = subscription.next() { + /// match update { + /// HistoricalBarUpdate::Historical(data) => println!("Initial bars: {}", data.bars.len()), + /// HistoricalBarUpdate::Update(bar) => println!("Streaming update: {:?}", bar), + /// } + /// } + /// ``` + pub fn historical_data_streaming( + &self, + contract: &Contract, + duration: historical::Duration, + bar_size: historical::BarSize, + what_to_show: Option, + trading_hours: TradingHours, + keep_up_to_date: bool, + ) -> Result { + historical::blocking::historical_data_streaming(self, contract, duration, bar_size, what_to_show, trading_hours, keep_up_to_date) + } + /// Requests [Schedule](historical::Schedule) for an interval of given duration /// ending at specified date. /// diff --git a/src/market_data/historical/async.rs b/src/market_data/historical/async.rs index 3e4e8e02..d082481f 100644 --- a/src/market_data/historical/async.rs +++ b/src/market_data/historical/async.rs @@ -356,7 +356,18 @@ impl + Send> TickSubscription { /// A `HistoricalDataStreamingSubscription` that yields `HistoricalBarUpdate` values /// /// # Example -/// ```ignore +/// ```no_run +/// use ibapi::Client; +/// use ibapi::contracts::Contract; +/// use ibapi::market_data::historical::{ +/// BarSize, Duration, HistoricalBarUpdate, WhatToShow, historical_data_streaming +/// }; +/// use ibapi::market_data::TradingHours; +/// +/// # async fn example() -> Result<(), ibapi::Error> { +/// let client = Client::connect("127.0.0.1:4002", 100).await?; +/// let contract = Contract::stock("SPY").build(); +/// /// let mut subscription = historical_data_streaming( /// &client, /// &contract, @@ -375,11 +386,10 @@ impl + Send> TickSubscription { /// HistoricalBarUpdate::Update(bar) => { /// println!("Bar update: {} close={}", bar.date, bar.close); /// } -/// HistoricalBarUpdate::HistoricalEnd => { -/// println!("Initial historical data complete, now streaming"); -/// } /// } /// } +/// # Ok(()) +/// # } /// ``` /// /// # See Also @@ -427,13 +437,12 @@ pub async fn historical_data_streaming( /// Async subscription for streaming historical data with keepUpToDate=true. /// -/// This subscription first yields the initial historical bars, then continues -/// to yield streaming updates for the current bar as it builds. +/// This subscription first yields the initial historical bars as a `Historical` variant, +/// then continues to yield streaming updates for the current bar as `Update` variants. pub struct HistoricalDataStreamingSubscription { messages: AsyncInternalSubscription, server_version: i32, time_zone: &'static Tz, - pending_end: bool, error: Option, } @@ -443,7 +452,6 @@ impl HistoricalDataStreamingSubscription { messages, server_version, time_zone, - pending_end: false, error: None, } } @@ -451,17 +459,10 @@ impl HistoricalDataStreamingSubscription { /// Get the next update from the streaming subscription. /// /// Returns: - /// - `Some(HistoricalBarUpdate::Historical(data))` - Initial batch of historical bars - /// - `Some(HistoricalBarUpdate::HistoricalEnd)` - End of initial historical data + /// - `Some(HistoricalBarUpdate::Historical(data))` - Initial batch of historical bars (always first) /// - `Some(HistoricalBarUpdate::Update(bar))` - Streaming bar update /// - `None` - Subscription ended (connection closed or error) pub async fn next(&mut self) -> Option { - // Emit HistoricalEnd after Historical data was returned - if self.pending_end { - self.pending_end = false; - return Some(HistoricalBarUpdate::HistoricalEnd); - } - loop { match self.messages.next().await { Some(Ok(mut message)) => { @@ -470,7 +471,6 @@ impl HistoricalDataStreamingSubscription { // Initial historical data batch match decoders::decode_historical_data(self.server_version, self.time_zone, &mut message) { Ok(data) => { - self.pending_end = true; return Some(HistoricalBarUpdate::Historical(data)); } Err(e) => { @@ -1187,18 +1187,10 @@ mod tests { _ => panic!("Expected Historical variant"), } - // Second: receive HistoricalEnd marker + // Second: receive streaming update let update2 = subscription.next().await; - assert!(update2.is_some(), "Should receive HistoricalEnd"); + assert!(update2.is_some(), "Should receive streaming update"); match update2.unwrap() { - HistoricalBarUpdate::HistoricalEnd => {} - _ => panic!("Expected HistoricalEnd variant"), - } - - // Third: receive streaming update - let update3 = subscription.next().await; - assert!(update3.is_some(), "Should receive streaming update"); - match update3.unwrap() { HistoricalBarUpdate::Update(bar) => { assert_eq!(bar.open, 185.80, "Wrong open price in update"); assert_eq!(bar.high, 186.10, "Wrong high price in update"); @@ -1210,11 +1202,8 @@ mod tests { // Verify request message includes keepUpToDate=true let request_messages = message_bus.request_messages.read().unwrap(); assert_eq!(request_messages.len(), 1, "Should send one request"); - // The keepUpToDate field should be "1" (true) - assert!( - request_messages[0].fields.contains(&"1".to_string()), - "Request should have keepUpToDate=true" - ); + // keepUpToDate is at field index 21 (for non-bag contracts) + assert_eq!(request_messages[0].fields[21], "1", "Request should have keepUpToDate=true at field[21]"); } #[tokio::test] @@ -1254,23 +1243,11 @@ mod tests { _ => panic!("Expected Historical variant"), } - // Receive HistoricalEnd marker - let update2 = subscription.next().await; - assert!(update2.is_some(), "Should receive HistoricalEnd"); - match update2.unwrap() { - HistoricalBarUpdate::HistoricalEnd => {} - _ => panic!("Expected HistoricalEnd variant"), - } - // Verify request message includes keepUpToDate=false let request_messages = message_bus.request_messages.read().unwrap(); assert_eq!(request_messages.len(), 1, "Should send one request"); - // Find the keepUpToDate field - it should be "0" (false) - // The field order in historical data request puts keepUpToDate near the end - let request = &request_messages[0]; - // Check the last few fields for the "0" value - let fields_str = request.fields.join("|"); - assert!(fields_str.contains("|0|"), "Request should have keepUpToDate=false (0)"); + // keepUpToDate is at field index 21 (for non-bag contracts) + assert_eq!(request_messages[0].fields[21], "0", "Request should have keepUpToDate=false at field[21]"); } #[tokio::test] diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index c4397c56..db36985e 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -238,7 +238,6 @@ pub(crate) fn decode_histogram_data(message: &mut ResponseMessage) -> Result Result { message.skip(); // message type message.skip(); // request_id @@ -482,7 +481,6 @@ mod tests { assert_eq!(ticks[23].size, 0, "ticks[0].size"); } - #[cfg(feature = "async")] #[test] fn test_decode_historical_data_update() { let time_zone: &Tz = time_tz::timezones::db::america::NEW_YORK; @@ -502,7 +500,6 @@ mod tests { assert_eq!(bar.count, 150, "bar.count"); } - #[cfg(feature = "async")] #[test] fn test_decode_historical_data_update_without_count() { let time_zone: &Tz = time_tz::timezones::db::america::NEW_YORK; diff --git a/src/market_data/historical/mod.rs b/src/market_data/historical/mod.rs index 7764ce06..2bcfb769 100644 --- a/src/market_data/historical/mod.rs +++ b/src/market_data/historical/mod.rs @@ -329,18 +329,17 @@ pub struct HistoricalData { /// Update from historical data streaming with keepUpToDate=true. /// /// When requesting historical data with `keepUpToDate=true`, IBKR first sends -/// the historical bars, then continues streaming updates for the current bar. +/// the initial historical bars as a `Historical` variant, then continues +/// streaming real-time updates for the current bar as `Update` variants. /// The current bar is updated approximately every 4-6 seconds until a new /// bar begins. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum HistoricalBarUpdate { - /// Initial batch of historical bars. + /// Initial batch of historical bars. Always received first. Historical(HistoricalData), /// Real-time update of the current (incomplete) bar. - /// Note: Multiple updates with the same timestamp will be sent as the bar builds. + /// Multiple updates with the same timestamp will be sent as the bar builds. Update(Bar), - /// Signals the end of the initial historical data batch. - HistoricalEnd, } /// Trading schedule describing sessions for a contract. @@ -577,7 +576,10 @@ impl TickDecoder for TickMidpoint { // Re-export TickSubscription and iterator types based on active feature #[cfg(all(feature = "sync", not(feature = "async")))] -pub use sync::{TickSubscription, TickSubscriptionIter, TickSubscriptionOwnedIter, TickSubscriptionTimeoutIter, TickSubscriptionTryIter}; +pub use sync::{ + HistoricalDataStreamingSubscription, TickSubscription, TickSubscriptionIter, TickSubscriptionOwnedIter, TickSubscriptionTimeoutIter, + TickSubscriptionTryIter, +}; #[cfg(feature = "async")] pub use r#async::{historical_data_streaming, HistoricalDataStreamingSubscription, TickSubscription}; diff --git a/src/market_data/historical/sync.rs b/src/market_data/historical/sync.rs index 3363c774..d1f0ffc0 100644 --- a/src/market_data/historical/sync.rs +++ b/src/market_data/historical/sync.rs @@ -12,8 +12,12 @@ use crate::protocol::{check_version, Features}; use crate::transport::{InternalSubscription, Response}; use crate::{client::sync::Client, Error, MAX_RETRIES}; +use time_tz::Tz; + use super::common::{decoders, encoders}; -use super::{BarSize, Duration, HistogramEntry, HistoricalData, Schedule, TickBidAsk, TickDecoder, TickLast, TickMidpoint, WhatToShow}; +use super::{ + BarSize, Duration, HistogramEntry, HistoricalBarUpdate, HistoricalData, Schedule, TickBidAsk, TickDecoder, TickLast, TickMidpoint, WhatToShow, +}; use crate::market_data::TradingHours; // Returns the timestamp of earliest available historical data for a contract and data type. @@ -249,6 +253,172 @@ pub(crate) fn histogram_data( } } +// === Historical Data Streaming with keepUpToDate === + +/// Requests historical data for a contract with optional streaming updates. +/// +/// When `keep_up_to_date` is `true`, this function requests historical bars and then +/// continues to receive streaming updates for the current (incomplete) bar. IBKR sends +/// updates approximately every 4-6 seconds until the bar completes, at which point a +/// new bar begins. +/// +/// When `keep_up_to_date` is `false`, only the initial historical data is returned +/// and the subscription ends after delivering the data. +pub(crate) fn historical_data_streaming( + client: &Client, + contract: &Contract, + duration: Duration, + bar_size: BarSize, + what_to_show: Option, + trading_hours: TradingHours, + keep_up_to_date: bool, +) -> Result { + if !contract.trading_class.is_empty() || contract.contract_id > 0 { + check_version(client.server_version(), Features::TRADING_CLASS)?; + } + + // Note: end_date must be None when keepUpToDate=true (IBKR requirement) + let builder = client.request(); + let request = encoders::encode_request_historical_data( + client.server_version(), + builder.request_id(), + contract, + None, // end_date must be None for keepUpToDate + duration, + bar_size, + what_to_show, + trading_hours.use_rth(), + keep_up_to_date, + Vec::::default(), + )?; + + let subscription = builder.send_raw(request)?; + + // Get the timezone directly + let tz: &'static Tz = client.time_zone.unwrap_or_else(|| { + warn!("server timezone unknown. assuming UTC, but that may be incorrect!"); + time_tz::timezones::db::UTC + }); + + Ok(HistoricalDataStreamingSubscription::new(subscription, client.server_version, tz)) +} + +/// Blocking subscription for streaming historical data with keepUpToDate=true. +/// +/// This subscription first yields the initial historical bars as a `Historical` variant, +/// then continues to yield streaming updates for the current bar as `Update` variants. +pub struct HistoricalDataStreamingSubscription { + messages: InternalSubscription, + server_version: i32, + time_zone: &'static Tz, + error: Mutex>, +} + +impl HistoricalDataStreamingSubscription { + fn new(messages: InternalSubscription, server_version: i32, time_zone: &'static Tz) -> Self { + Self { + messages, + server_version, + time_zone, + error: Mutex::new(None), + } + } + + /// Block until the next update is available. + /// + /// Returns: + /// - `Some(HistoricalBarUpdate::Historical(data))` - Initial batch of historical bars (always first) + /// - `Some(HistoricalBarUpdate::Update(bar))` - Streaming bar update + /// - `None` - Subscription ended (connection closed or error) + pub fn next(&self) -> Option { + self.next_helper(|| self.messages.next()) + } + + /// Attempt to fetch the next update without blocking. + pub fn try_next(&self) -> Option { + self.next_helper(|| self.messages.try_next()) + } + + /// Wait up to `duration` for the next update to arrive. + pub fn next_timeout(&self, duration: std::time::Duration) -> Option { + self.next_helper(|| self.messages.next_timeout(duration)) + } + + fn next_helper(&self, next_response: F) -> Option + where + F: Fn() -> Option, + { + self.clear_error(); + + loop { + match next_response() { + Some(Ok(mut message)) => { + match message.message_type() { + IncomingMessages::HistoricalData => { + // Initial historical data batch + match decoders::decode_historical_data(self.server_version, self.time_zone, &mut message) { + Ok(data) => { + return Some(HistoricalBarUpdate::Historical(data)); + } + Err(e) => { + self.set_error(e); + return None; + } + } + } + IncomingMessages::HistoricalDataUpdate => { + // Streaming bar update + match decoders::decode_historical_data_update(self.time_zone, &mut message) { + Ok(bar) => { + return Some(HistoricalBarUpdate::Update(bar)); + } + Err(e) => { + self.set_error(e); + return None; + } + } + } + IncomingMessages::Error => { + self.set_error(Error::from(message)); + return None; + } + _ => { + // Skip unexpected messages + debug!("unexpected message in streaming subscription: {:?}", message.message_type()); + continue; + } + } + } + Some(Err(e)) => { + self.set_error(e); + return None; + } + None => { + return None; + } + } + } + } + + /// Returns and clears the last error that occurred, if any. + pub fn error(&self) -> Option { + self.error.lock().unwrap().take() + } + + fn set_error(&self, e: Error) { + *self.error.lock().unwrap() = Some(e); + } + + fn clear_error(&self) { + *self.error.lock().unwrap() = None; + } + + /// Cancel the subscription. + pub fn cancel(&self) { + self.messages.cancel(); + } +} + // TickSubscription and related types /// Shared subscription handle that decodes historical tick batches as they arrive. @@ -1090,4 +1260,144 @@ mod tests { "time_zone should return the client's time zone when it is set" ); } + + #[test] + fn test_historical_data_streaming_with_updates() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + response_messages: vec![ + // Initial historical data (message type 17) + "17\09000\020230315 09:30:00\020230315 10:30:00\01\01678886400\0185.50\0186.00\0185.25\0185.75\01000\0185.70\0100\0".to_owned(), + // Streaming update (message type 90) + "90\09000\0-1\01678890000\0185.80\0186.10\0185.60\0185.90\0500\0185.85\050\0".to_owned(), + ], + }); + + let mut client = Client::stubbed(message_bus.clone(), server_versions::SIZE_RULES); + client.time_zone = Some(time_tz::timezones::db::UTC); + + let contract = Contract::stock("SPY").build(); + + let subscription = historical_data_streaming( + &client, + &contract, + Duration::days(1), + BarSize::Hour, + Some(WhatToShow::Trades), + TradingHours::Regular, + true, + ) + .expect("streaming request should succeed"); + + // First: receive initial historical data + let update1 = subscription.next(); + assert!(update1.is_some(), "Should receive initial historical data"); + match update1.unwrap() { + HistoricalBarUpdate::Historical(data) => { + assert_eq!(data.bars.len(), 1, "Should have 1 initial bar"); + assert_eq!(data.bars[0].open, 185.50, "Wrong open price"); + } + _ => panic!("Expected Historical variant"), + } + + // Second: receive streaming update + let update2 = subscription.next(); + assert!(update2.is_some(), "Should receive streaming update"); + match update2.unwrap() { + HistoricalBarUpdate::Update(bar) => { + assert_eq!(bar.open, 185.80, "Wrong open price in update"); + assert_eq!(bar.high, 186.10, "Wrong high price in update"); + assert_eq!(bar.close, 185.90, "Wrong close price in update"); + } + _ => panic!("Expected Update variant"), + } + + // Verify request message includes keepUpToDate=true + let request_messages = message_bus.request_messages.read().unwrap(); + assert_eq!(request_messages.len(), 1, "Should send one request"); + // keepUpToDate is at field index 21 (for non-bag contracts) + assert_eq!(request_messages[0].fields[21], "1", "Request should have keepUpToDate=true at field[21]"); + } + + #[test] + fn test_historical_data_streaming_keep_up_to_date_false() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + response_messages: vec![ + // Initial historical data only + "17\09000\020230315 09:30:00\020230315 10:30:00\01\01678886400\0185.50\0186.00\0185.25\0185.75\01000\0185.70\0100\0".to_owned(), + ], + }); + + let mut client = Client::stubbed(message_bus.clone(), server_versions::SIZE_RULES); + client.time_zone = Some(time_tz::timezones::db::UTC); + + let contract = Contract::stock("SPY").build(); + + let subscription = historical_data_streaming( + &client, + &contract, + Duration::days(1), + BarSize::Hour, + Some(WhatToShow::Trades), + TradingHours::Regular, + false, // keep_up_to_date = false + ) + .expect("streaming request should succeed"); + + // Receive initial historical data + let update1 = subscription.next(); + assert!(update1.is_some(), "Should receive initial historical data"); + match update1.unwrap() { + HistoricalBarUpdate::Historical(data) => { + assert_eq!(data.bars.len(), 1, "Should have 1 initial bar"); + } + _ => panic!("Expected Historical variant"), + } + + // Verify request message includes keepUpToDate=false + let request_messages = message_bus.request_messages.read().unwrap(); + assert_eq!(request_messages.len(), 1, "Should send one request"); + // keepUpToDate is at field index 21 (for non-bag contracts) + assert_eq!(request_messages[0].fields[21], "0", "Request should have keepUpToDate=false at field[21]"); + } + + #[test] + fn test_historical_data_streaming_error_response() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + response_messages: vec![ + // Error response + "4\02\09000\0162\0Historical Market Data Service error message:No market data permissions.\0".to_owned(), + ], + }); + + let mut client = Client::stubbed(message_bus, server_versions::SIZE_RULES); + client.time_zone = Some(time_tz::timezones::db::UTC); + + let contract = Contract::stock("SPY").build(); + + let subscription = historical_data_streaming( + &client, + &contract, + Duration::days(1), + BarSize::Hour, + Some(WhatToShow::Trades), + TradingHours::Regular, + true, + ) + .expect("streaming request should succeed"); + + // Should return None due to error + let update = subscription.next(); + assert!(update.is_none(), "Should return None on error"); + + // Error should be accessible + let error = subscription.error(); + assert!(error.is_some(), "Error should be stored"); + assert!( + error.unwrap().to_string().contains("No market data permissions"), + "Error should contain the message" + ); + } }