Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/client/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::accounts::types::{AccountGroup, AccountId, ContractId, ModelCode};
use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, FamilyCode, PnL, PnLSingle, PositionUpdate, PositionUpdateMulti};
use crate::contracts::Contract;
use crate::display_groups;
use crate::display_groups::DisplayGroupUpdate;
use crate::market_data::builder::MarketDataBuilder;
use crate::market_data::TradingHours;
use crate::orders::OrderBuilder;
Expand Down Expand Up @@ -603,14 +602,17 @@ impl Client {
/// async fn main() {
/// let client = Client::connect("127.0.0.1:7497", 100).await.expect("connection failed");
///
/// let mut subscription = client.subscribe_to_group_events(1).await.expect("error subscribing to group events");
/// let mut subscription = client.subscribe_to_group_events(1).await.expect("subscription failed");
///
/// // Update the displayed contract
/// subscription.update("265598@SMART").await.expect("update failed");
///
/// while let Some(event) = subscription.next().await {
/// println!("Received group event: {:?}", event);
/// }
/// }
/// ```
pub async fn subscribe_to_group_events(&self, group_id: i32) -> Result<Subscription<DisplayGroupUpdate>, Error> {
pub async fn subscribe_to_group_events(&self, group_id: i32) -> Result<display_groups::DisplayGroupSubscription, Error> {
display_groups::r#async::subscribe_to_group_events(self, group_id).await
}

Expand Down
7 changes: 5 additions & 2 deletions src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::accounts::{AccountSummaryResult, AccountUpdate, AccountUpdateMulti, F
use crate::connection::common::StartupMessageCallback;
use crate::connection::{sync::Connection, ConnectionMetadata};
use crate::contracts::{Contract, OptionComputation, SecurityType};
use crate::display_groups::DisplayGroupUpdate;
use crate::errors::Error;
use crate::market_data::builder::MarketDataBuilder;
use crate::market_data::historical::{self, HistogramEntry};
Expand Down Expand Up @@ -512,11 +511,15 @@ impl Client {
/// let client = Client::connect("127.0.0.1:7497", 100).expect("connection failed");
///
/// let subscription = client.subscribe_to_group_events(1).expect("subscription failed");
///
/// // Update the displayed contract
/// subscription.update("265598@SMART").expect("update failed");
///
/// for event in &subscription {
/// println!("group event: {:?}", event);
/// }
/// ```
pub fn subscribe_to_group_events(&self, group_id: i32) -> Result<Subscription<DisplayGroupUpdate>, Error> {
pub fn subscribe_to_group_events(&self, group_id: i32) -> Result<display_groups::blocking::DisplayGroupSubscription, Error> {
display_groups::sync::subscribe_to_group_events(self, group_id)
}

Expand Down
77 changes: 73 additions & 4 deletions src/display_groups/async.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,71 @@
//! Asynchronous implementation of display groups functionality

use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use crate::client::ClientRequestBuilders;
use crate::subscriptions::Subscription;
use crate::transport::AsyncMessageBus;
use crate::{Client, Error};

use super::common::stream_decoders::DisplayGroupUpdate;
use super::encoders;

/// A subscription to display group events with the ability to update the displayed contract.
///
/// Created by [`Client::subscribe_to_group_events`](crate::Client::subscribe_to_group_events).
/// Derefs to `Subscription<DisplayGroupUpdate>` for `next()`, `cancel()`, etc.
pub struct DisplayGroupSubscription {
inner: Subscription<DisplayGroupUpdate>,
message_bus: Arc<dyn AsyncMessageBus>,
}

impl DisplayGroupSubscription {
/// Updates the contract displayed in the TWS display group.
///
/// # Arguments
/// * `contract_info` - Contract to display:
/// - `"contractID@exchange"` for individual contracts (e.g., "265598@SMART")
/// - `"none"` for empty selection
/// - `"combo"` for combination contracts
pub async fn update(&self, contract_info: &str) -> Result<(), Error> {
let request_id = self.inner.request_id().expect("subscription has no request ID");
let request = encoders::encode_update_display_group(request_id, contract_info)?;
self.message_bus.send_message(request).await
}
}

impl Deref for DisplayGroupSubscription {
type Target = Subscription<DisplayGroupUpdate>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl DerefMut for DisplayGroupSubscription {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

/// Subscribes to display group events for the specified group.
///
/// Display Groups are a TWS-only feature (not available in IB Gateway).
/// When subscribed, you receive updates whenever the user changes the contract
/// displayed in that group within TWS.
/// Returns a [`DisplayGroupSubscription`] that receives updates when the user changes
/// the displayed contract in TWS, and supports [`update()`](DisplayGroupSubscription::update)
/// to change the displayed contract programmatically.
///
/// # Arguments
/// * `client` - The connected client
/// * `group_id` - The ID of the group to subscribe to (1-9)
pub async fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result<Subscription<DisplayGroupUpdate>, Error> {
pub async fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result<DisplayGroupSubscription, Error> {
let builder = client.request();
let request = encoders::encode_subscribe_to_group_events(builder.request_id(), group_id)?;
builder.send::<DisplayGroupUpdate>(request).await
let inner = builder.send::<DisplayGroupUpdate>(request).await?;
Ok(DisplayGroupSubscription {
inner,
message_bus: client.message_bus.clone(),
})
}

#[cfg(test)]
Expand Down Expand Up @@ -75,6 +121,29 @@ mod tests {
assert_eq!(update.contract_info, "");
}

#[tokio::test]
async fn test_update_display_group() {
let message_bus = Arc::new(MessageBusStub {
request_messages: RwLock::new(vec![]),
// Need a response so subscription can be created
response_messages: vec!["68\x001\x009000\x00265598@SMART\x00".to_string()],
});

let client = Client::stubbed(message_bus.clone(), 176);

let subscription = subscribe_to_group_events(&client, 1).await.expect("failed to subscribe");
subscription.update("265598@SMART").await.expect("update failed");

let requests = message_bus.request_messages.read().unwrap();
// First request is subscribe, second is update
assert_eq!(requests.len(), 2);

let req = &requests[1];
assert_eq!(req[0], "69"); // UpdateDisplayGroup
assert_eq!(req[1], "1"); // Version
assert_eq!(req[3], "265598@SMART"); // Contract info
}

#[tokio::test]
async fn test_subscribe_to_group_events_skips_wrong_message_type() {
let message_bus = Arc::new(MessageBusStub {
Expand Down
39 changes: 39 additions & 0 deletions src/display_groups/common/encoders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ pub(crate) fn encode_unsubscribe_from_group_events(request_id: i32) -> Result<Re
Ok(message)
}

/// Encodes a request to update the contract displayed in a display group.
///
/// # Arguments
/// * `request_id` - The request ID (should match the subscription request ID)
/// * `contract_info` - Contract to display, format: "contractID@exchange" (e.g., "265598@SMART"),
/// "none" for empty selection, or "combo" for combination contracts
pub(crate) fn encode_update_display_group(request_id: i32, contract_info: &str) -> Result<RequestMessage, Error> {
let mut message = RequestMessage::new();
message.push_field(&OutgoingMessages::UpdateDisplayGroup);
message.push_field(&VERSION);
message.push_field(&request_id);
message.push_field(&contract_info);
Ok(message)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -52,4 +67,28 @@ mod tests {
assert_eq!(message[1], "1"); // version
assert_eq!(message[2], request_id.to_field());
}

#[test]
fn test_encode_update_display_group() {
let request_id = 9000;
let contract_info = "265598@SMART";

let message = encode_update_display_group(request_id, contract_info).expect("encoding failed");

assert_eq!(message[0], OutgoingMessages::UpdateDisplayGroup.to_field());
assert_eq!(message[1], "1"); // version
assert_eq!(message[2], request_id.to_field());
assert_eq!(message[3], contract_info);
}

#[test]
fn test_encode_update_display_group_none() {
let request_id = 9000;
let contract_info = "none";

let message = encode_update_display_group(request_id, contract_info).expect("encoding failed");

assert_eq!(message[0], OutgoingMessages::UpdateDisplayGroup.to_field());
assert_eq!(message[3], "none");
}
}
12 changes: 12 additions & 0 deletions src/display_groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,15 @@ pub(crate) mod sync;
pub(crate) use common::encoders;

pub use common::DisplayGroupUpdate;

#[cfg(feature = "sync")]
/// Blocking display group helpers powered by the synchronous transport.
pub mod blocking {
pub(crate) use super::sync::*;
}

#[cfg(all(feature = "sync", not(feature = "async")))]
pub use sync::DisplayGroupSubscription;

#[cfg(feature = "async")]
pub use r#async::DisplayGroupSubscription;
96 changes: 92 additions & 4 deletions src/display_groups/sync.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,111 @@
//! Synchronous implementation of display groups functionality

use std::ops::Deref;
use std::sync::Arc;

use crate::client::blocking::{ClientRequestBuilders, Subscription};
use crate::client::sync::Client;
use crate::transport::MessageBus;
use crate::Error;

use super::common::stream_decoders::DisplayGroupUpdate;
use super::encoders;

/// A subscription to display group events with the ability to update the displayed contract.
///
/// Created by [`Client::subscribe_to_group_events`](crate::client::blocking::Client::subscribe_to_group_events).
/// Derefs to `Subscription<DisplayGroupUpdate>` for `next()`, `cancel()`, `iter()`, etc.
pub struct DisplayGroupSubscription {
inner: Subscription<DisplayGroupUpdate>,
message_bus: Arc<dyn MessageBus>,
}

impl DisplayGroupSubscription {
/// Updates the contract displayed in the TWS display group.
///
/// # Arguments
/// * `contract_info` - Contract to display:
/// - `"contractID@exchange"` for individual contracts (e.g., "265598@SMART")
/// - `"none"` for empty selection
/// - `"combo"` for combination contracts
pub fn update(&self, contract_info: &str) -> Result<(), Error> {
let request_id = self.inner.request_id().expect("subscription has no request ID");
let request = encoders::encode_update_display_group(request_id, contract_info)?;
self.message_bus.send_message(&request)
}
}

impl Deref for DisplayGroupSubscription {
type Target = Subscription<DisplayGroupUpdate>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl<'a> IntoIterator for &'a DisplayGroupSubscription {
type Item = DisplayGroupUpdate;
type IntoIter = <&'a Subscription<DisplayGroupUpdate> as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
(&self.inner).into_iter()
}
}

impl IntoIterator for DisplayGroupSubscription {
type Item = DisplayGroupUpdate;
type IntoIter = <Subscription<DisplayGroupUpdate> as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}

/// Subscribes to display group events for the specified group.
///
/// Display Groups are a TWS-only feature (not available in IB Gateway).
/// When subscribed, you receive updates whenever the user changes the contract
/// displayed in that group within TWS.
/// Returns a [`DisplayGroupSubscription`] that receives updates when the user changes
/// the displayed contract in TWS, and supports [`update()`](DisplayGroupSubscription::update)
/// to change the displayed contract programmatically.
///
/// # Arguments
/// * `client` - The connected client
/// * `group_id` - The ID of the group to subscribe to (1-9)
pub fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result<Subscription<DisplayGroupUpdate>, Error> {
pub fn subscribe_to_group_events(client: &Client, group_id: i32) -> Result<DisplayGroupSubscription, Error> {
let builder = client.request();
let request = encoders::encode_subscribe_to_group_events(builder.request_id(), group_id)?;
builder.send(request)
let inner = builder.send(request)?;
Ok(DisplayGroupSubscription {
inner,
message_bus: client.message_bus.clone(),
})
}

#[cfg(test)]
mod tests {
use super::*;
use crate::stubs::MessageBusStub;
use std::sync::{Arc, RwLock};

#[test]
fn test_update_display_group() {
let message_bus = Arc::new(MessageBusStub {
request_messages: RwLock::new(vec![]),
// Need a response so subscription can be created
response_messages: vec!["68\x001\x009000\x00265598@SMART\x00".to_string()],
});

let client = Client::stubbed(message_bus.clone(), 176);

let subscription = subscribe_to_group_events(&client, 1).expect("failed to subscribe");
subscription.update("265598@SMART").expect("update failed");

let requests = message_bus.request_messages.read().unwrap();
// First request is subscribe, second is update
assert_eq!(requests.len(), 2);

let req = &requests[1];
assert_eq!(req[0], "69"); // UpdateDisplayGroup
assert_eq!(req[1], "1"); // Version
assert_eq!(req[3], "265598@SMART"); // Contract info
}
}
5 changes: 5 additions & 0 deletions src/subscriptions/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ impl<T> Subscription<T> {
SubscriptionInner::PreDecoded { receiver } => receiver.recv().await,
}
}

/// Get the request ID associated with this subscription
pub fn request_id(&self) -> Option<i32> {
self.request_id
}
}

impl<T> Subscription<T> {
Expand Down
5 changes: 5 additions & 0 deletions src/subscriptions/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ impl<T: StreamDecoder<T>> Subscription<T> {
}
}

/// Returns the request ID associated with this subscription.
pub fn request_id(&self) -> Option<i32> {
self.request_id
}

/// Returns the next available value, blocking if necessary until a value becomes available.
///
/// # Examples
Expand Down