diff --git a/src/market_data/historical/common/decoders.rs b/src/market_data/historical/common/decoders.rs index db36985e..6f17af8e 100644 --- a/src/market_data/historical/common/decoders.rs +++ b/src/market_data/historical/common/decoders.rs @@ -287,12 +287,19 @@ fn parse_schedule_date(text: &str) -> Result { fn parse_bar_date(text: &str, time_zone: &Tz) -> Result { if text.len() == 8 { + // Format: YYYYMMDD let date_format = format_description!("[year][month][day]"); let bar_date = Date::parse(text, date_format)?; let bar_date = bar_date.with_time(time!(00:00)); Ok(bar_date.assume_timezone_utc(time_tz::timezones::db::UTC)) + } else if text.len() > 8 && text.contains(' ') { + // Format: YYYYMMDD HH:MM:SS (note: two spaces between date and time per IBKR format) + let datetime_format = format_description!("[year][month][day] [hour]:[minute]:[second]"); + let bar_datetime = PrimitiveDateTime::parse(text, datetime_format)?; + Ok(bar_datetime.assume_timezone(time_zone).unwrap()) } else { + // Unix timestamp let timestamp: i64 = text.parse()?; let date_utc = OffsetDateTime::from_unix_timestamp(timestamp).unwrap(); Ok(date_utc.to_timezone(time_zone)) diff --git a/src/subscriptions/async.rs b/src/subscriptions/async.rs index f3a49125..d31c0bf1 100644 --- a/src/subscriptions/async.rs +++ b/src/subscriptions/async.rs @@ -25,6 +25,7 @@ pub struct Subscription { _message_type: Option, context: DecoderContext, cancelled: Arc, + stream_ended: Arc, message_bus: Option>, /// Cancel message generator cancel_fn: Option>, @@ -70,6 +71,7 @@ impl Clone for Subscription { _message_type: self._message_type, context: self.context.clone(), cancelled: self.cancelled.clone(), + stream_ended: self.stream_ended.clone(), message_bus: self.message_bus.clone(), cancel_fn: self.cancel_fn.clone(), } @@ -102,6 +104,7 @@ impl Subscription { _message_type: message_type, context, cancelled: Arc::new(AtomicBool::new(false)), + stream_ended: Arc::new(AtomicBool::new(false)), message_bus: Some(message_bus), cancel_fn: None, } @@ -185,6 +188,7 @@ impl Subscription { _message_type: None, context: DecoderContext::default(), cancelled: Arc::new(AtomicBool::new(false)), + stream_ended: Arc::new(AtomicBool::new(false)), message_bus: None, cancel_fn: None, } @@ -208,8 +212,17 @@ impl Subscription { let result = decoder(context, &mut message); match process_decode_result(result) { ProcessingResult::Success(val) => return Some(Ok(val)), - ProcessingResult::EndOfStream => return None, + ProcessingResult::EndOfStream => { + self.stream_ended.store(true, Ordering::Release); + return None; + } ProcessingResult::Retry => { + // Stop retrying if stream has already ended + // This prevents spurious retries after EndOfStream when + // extra messages arrive in the message bus queue + if self.stream_ended.load(Ordering::Acquire) { + return None; + } if check_retry(retry_count) == RetryDecision::Stop { return None; } @@ -242,6 +255,7 @@ impl Subscription { } self.cancelled.store(true, Ordering::Relaxed); + self.stream_ended.store(true, Ordering::Relaxed); if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) { let id = self.request_id.or(self.order_id); @@ -266,6 +280,7 @@ impl Drop for Subscription { } self.cancelled.store(true, Ordering::Relaxed); + self.stream_ended.store(true, Ordering::Relaxed); // Try to send cancel message if we have the necessary components if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) {