diff --git a/src/client/async.rs b/src/client/async.rs index 486d0764..4876f0b5 100644 --- a/src/client/async.rs +++ b/src/client/async.rs @@ -22,7 +22,6 @@ use crate::accounts::types::{AccountGroup, AccountId, ContractId, ModelCode}; use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti}; use crate::contracts::Contract; use crate::display_groups; -use crate::display_groups::DisplayGroupUpdate; use crate::market_data::builder::MarketDataBuilder; use crate::market_data::TradingHours; use crate::orders::OrderBuilder; @@ -603,14 +602,17 @@ impl Client { /// async fn main() { /// let client = Client::connect("127.0.0.1:7497", 100).await.expect("connection failed"); /// - /// let mut subscription = client.subscribe_to_group_events(1).await.expect("error subscribing to group events"); + /// let mut subscription = client.subscribe_to_group_events(1).await.expect("subscription failed"); + /// + /// // Update the displayed contract + /// subscription.update("265598@SMART").await.expect("update failed"); /// /// while let Some(event) = subscription.next().await { /// println!("Received group event: {:?}", event); /// } /// } /// ``` - pub async fn subscribe_to_group_events(&self, group_id: i32) -> Result, Error> { + pub async fn subscribe_to_group_events(&self, group_id: i32) -> Result { display_groups::r#async::subscribe_to_group_events(self, group_id).await } diff --git a/src/client/sync.rs b/src/client/sync.rs index 3fabfba3..c5062165 100644 --- a/src/client/sync.rs +++ b/src/client/sync.rs @@ -18,7 +18,6 @@ use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, F use crate::connection::common::StartupMessageCallback; use crate::connection::{sync::Connection, ConnectionMetadata}; use crate::contracts::{Contract, OptionComputation, SecurityType}; -use crate::display_groups::DisplayGroupUpdate; use crate::errors::Error; use crate::market_data::builder::MarketDataBuilder; use crate::market_data::historical::{self, HistogramEntry}; @@ -512,11 +511,15 @@ impl Client { /// let client = Client::connect("127.0.0.1:7497", 100).expect("connection failed"); /// /// let subscription = client.subscribe_to_group_events(1).expect("subscription failed"); + /// + /// // Update the displayed contract + /// subscription.update("265598@SMART").expect("update failed"); + /// /// for event in &subscription { /// println!("group event: {:?}", event); /// } /// ``` - pub fn subscribe_to_group_events(&self, group_id: i32) -> Result, Error> { + pub fn subscribe_to_group_events(&self, group_id: i32) -> Result { display_groups::sync::subscribe_to_group_events(self, group_id) } diff --git a/src/display_groups/async.rs b/src/display_groups/async.rs index 93d0c49e..db13d591 100644 --- a/src/display_groups/async.rs +++ b/src/display_groups/async.rs @@ -1,25 +1,71 @@ //! Asynchronous implementation of display groups functionality +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; + use crate::client::ClientRequestBuilders; use crate::subscriptions::Subscription; +use crate::transport::AsyncMessageBus; use crate::{Client, Error}; use super::common::stream_decoders::DisplayGroupUpdate; use super::encoders; +/// A subscription to display group events with the ability to update the displayed contract. +/// +/// Created by [`Client::subscribe_to_group_events`](crate::Client::subscribe_to_group_events). +/// Derefs to `Subscription` for `next()`, `cancel()`, etc. +pub struct DisplayGroupSubscription { + inner: Subscription, + message_bus: Arc, +} + +impl DisplayGroupSubscription { + /// Updates the contract displayed in the TWS display group. + /// + /// # Arguments + /// * `contract_info` - Contract to display: + /// - `"contractID@exchange"` for individual contracts (e.g., "265598@SMART") + /// - `"none"` for empty selection + /// - `"combo"` for combination contracts + pub async fn update(&self, contract_info: &str) -> Result<(), Error> { + let request_id = self.inner.request_id().expect("subscription has no request ID"); + let request = encoders::encode_update_display_group(request_id, contract_info)?; + self.message_bus.send_message(request).await + } +} + +impl Deref for DisplayGroupSubscription { + type Target = Subscription; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for DisplayGroupSubscription { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + /// Subscribes to display group events for the specified group. /// /// Display Groups are a TWS-only feature (not available in IB Gateway). -/// When subscribed, you receive updates whenever the user changes the contract -/// displayed in that group within TWS. +/// Returns a [`DisplayGroupSubscription`] that receives updates when the user changes +/// the displayed contract in TWS, and supports [`update()`](DisplayGroupSubscription::update) +/// to change the displayed contract programmatically. /// /// # Arguments /// * `client` - The connected client /// * `group_id` - The ID of the group to subscribe to (1-9) -pub async fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result, Error> { +pub async fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result { let builder = client.request(); let request = encoders::encode_subscribe_to_group_events(builder.request_id(), group_id)?; - builder.send::(request).await + let inner = builder.send::(request).await?; + Ok(DisplayGroupSubscription { + inner, + message_bus: client.message_bus.clone(), + }) } #[cfg(test)] @@ -75,6 +121,29 @@ mod tests { assert_eq!(update.contract_info, ""); } + #[tokio::test] + async fn test_update_display_group() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + // Need a response so subscription can be created + response_messages: vec!["68\x001\x009000\x00265598@SMART\x00".to_string()], + }); + + let client = Client::stubbed(message_bus.clone(), 176); + + let subscription = subscribe_to_group_events(&client, 1).await.expect("failed to subscribe"); + subscription.update("265598@SMART").await.expect("update failed"); + + let requests = message_bus.request_messages.read().unwrap(); + // First request is subscribe, second is update + assert_eq!(requests.len(), 2); + + let req = &requests[1]; + assert_eq!(req[0], "69"); // UpdateDisplayGroup + assert_eq!(req[1], "1"); // Version + assert_eq!(req[3], "265598@SMART"); // Contract info + } + #[tokio::test] async fn test_subscribe_to_group_events_skips_wrong_message_type() { let message_bus = Arc::new(MessageBusStub { diff --git a/src/display_groups/common/encoders.rs b/src/display_groups/common/encoders.rs index 5a39c9ed..bb098ac5 100644 --- a/src/display_groups/common/encoders.rs +++ b/src/display_groups/common/encoders.rs @@ -24,6 +24,21 @@ pub(crate) fn encode_unsubscribe_from_group_events(request_id: i32) -> Result Result { + let mut message = RequestMessage::new(); + message.push_field(&OutgoingMessages::UpdateDisplayGroup); + message.push_field(&VERSION); + message.push_field(&request_id); + message.push_field(&contract_info); + Ok(message) +} + #[cfg(test)] mod tests { use super::*; @@ -52,4 +67,28 @@ mod tests { assert_eq!(message[1], "1"); // version assert_eq!(message[2], request_id.to_field()); } + + #[test] + fn test_encode_update_display_group() { + let request_id = 9000; + let contract_info = "265598@SMART"; + + let message = encode_update_display_group(request_id, contract_info).expect("encoding failed"); + + assert_eq!(message[0], OutgoingMessages::UpdateDisplayGroup.to_field()); + assert_eq!(message[1], "1"); // version + assert_eq!(message[2], request_id.to_field()); + assert_eq!(message[3], contract_info); + } + + #[test] + fn test_encode_update_display_group_none() { + let request_id = 9000; + let contract_info = "none"; + + let message = encode_update_display_group(request_id, contract_info).expect("encoding failed"); + + assert_eq!(message[0], OutgoingMessages::UpdateDisplayGroup.to_field()); + assert_eq!(message[3], "none"); + } } diff --git a/src/display_groups/mod.rs b/src/display_groups/mod.rs index 96f3f65e..8f36cb3c 100644 --- a/src/display_groups/mod.rs +++ b/src/display_groups/mod.rs @@ -18,3 +18,15 @@ pub(crate) mod sync; pub(crate) use common::encoders; pub use common::DisplayGroupUpdate; + +#[cfg(feature = "sync")] +/// Blocking display group helpers powered by the synchronous transport. +pub mod blocking { + pub(crate) use super::sync::*; +} + +#[cfg(all(feature = "sync", not(feature = "async")))] +pub use sync::DisplayGroupSubscription; + +#[cfg(feature = "async")] +pub use r#async::DisplayGroupSubscription; diff --git a/src/display_groups/sync.rs b/src/display_groups/sync.rs index a99fb352..dc920831 100644 --- a/src/display_groups/sync.rs +++ b/src/display_groups/sync.rs @@ -1,23 +1,111 @@ //! Synchronous implementation of display groups functionality +use std::ops::Deref; +use std::sync::Arc; + use crate::client::blocking::{ClientRequestBuilders, Subscription}; use crate::client::sync::Client; +use crate::transport::MessageBus; use crate::Error; use super::common::stream_decoders::DisplayGroupUpdate; use super::encoders; +/// A subscription to display group events with the ability to update the displayed contract. +/// +/// Created by [`Client::subscribe_to_group_events`](crate::client::blocking::Client::subscribe_to_group_events). +/// Derefs to `Subscription` for `next()`, `cancel()`, `iter()`, etc. +pub struct DisplayGroupSubscription { + inner: Subscription, + message_bus: Arc, +} + +impl DisplayGroupSubscription { + /// Updates the contract displayed in the TWS display group. + /// + /// # Arguments + /// * `contract_info` - Contract to display: + /// - `"contractID@exchange"` for individual contracts (e.g., "265598@SMART") + /// - `"none"` for empty selection + /// - `"combo"` for combination contracts + pub fn update(&self, contract_info: &str) -> Result<(), Error> { + let request_id = self.inner.request_id().expect("subscription has no request ID"); + let request = encoders::encode_update_display_group(request_id, contract_info)?; + self.message_bus.send_message(&request) + } +} + +impl Deref for DisplayGroupSubscription { + type Target = Subscription; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<'a> IntoIterator for &'a DisplayGroupSubscription { + type Item = DisplayGroupUpdate; + type IntoIter = <&'a Subscription as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + (&self.inner).into_iter() + } +} + +impl IntoIterator for DisplayGroupSubscription { + type Item = DisplayGroupUpdate; + type IntoIter = as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.inner.into_iter() + } +} + /// Subscribes to display group events for the specified group. /// /// Display Groups are a TWS-only feature (not available in IB Gateway). -/// When subscribed, you receive updates whenever the user changes the contract -/// displayed in that group within TWS. +/// Returns a [`DisplayGroupSubscription`] that receives updates when the user changes +/// the displayed contract in TWS, and supports [`update()`](DisplayGroupSubscription::update) +/// to change the displayed contract programmatically. /// /// # Arguments /// * `client` - The connected client /// * `group_id` - The ID of the group to subscribe to (1-9) -pub fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result, Error> { +pub fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result { let builder = client.request(); let request = encoders::encode_subscribe_to_group_events(builder.request_id(), group_id)?; - builder.send(request) + let inner = builder.send(request)?; + Ok(DisplayGroupSubscription { + inner, + message_bus: client.message_bus.clone(), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stubs::MessageBusStub; + use std::sync::{Arc, RwLock}; + + #[test] + fn test_update_display_group() { + let message_bus = Arc::new(MessageBusStub { + request_messages: RwLock::new(vec![]), + // Need a response so subscription can be created + response_messages: vec!["68\x001\x009000\x00265598@SMART\x00".to_string()], + }); + + let client = Client::stubbed(message_bus.clone(), 176); + + let subscription = subscribe_to_group_events(&client, 1).expect("failed to subscribe"); + subscription.update("265598@SMART").expect("update failed"); + + let requests = message_bus.request_messages.read().unwrap(); + // First request is subscribe, second is update + assert_eq!(requests.len(), 2); + + let req = &requests[1]; + assert_eq!(req[0], "69"); // UpdateDisplayGroup + assert_eq!(req[1], "1"); // Version + assert_eq!(req[3], "265598@SMART"); // Contract info + } } diff --git a/src/subscriptions/async.rs b/src/subscriptions/async.rs index d59a3fa2..f3a49125 100644 --- a/src/subscriptions/async.rs +++ b/src/subscriptions/async.rs @@ -227,6 +227,11 @@ impl Subscription { SubscriptionInner::PreDecoded { receiver } => receiver.recv().await, } } + + /// Get the request ID associated with this subscription + pub fn request_id(&self) -> Option { + self.request_id + } } impl Subscription { diff --git a/src/subscriptions/sync.rs b/src/subscriptions/sync.rs index a8a3c5b7..3476de31 100644 --- a/src/subscriptions/sync.rs +++ b/src/subscriptions/sync.rs @@ -95,6 +95,11 @@ impl> Subscription { } } + /// Returns the request ID associated with this subscription. + pub fn request_id(&self) -> Option { + self.request_id + } + /// Returns the next available value, blocking if necessary until a value becomes available. /// /// # Examples