Fix: Prevent spurious retry attempts after subscription EndOfStream #397
+23
−1
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #396 - Prevents spurious retry attempts after a subscription completes (receives
EndOfStream).Problem
After a subscription completes (e.g.,
OpenOrderEndmessage), stray messages remaining in the message bus queue triggered unnecessary retry attempts. This caused:Solution
Track stream completion state with
stream_ended: Arc<AtomicBool>field to prevent retries afterEndOfStream:Key improvement: The
stream_endedflag is stored as a struct field (not a local variable), so it persists across asyncnext()calls. This prevents retries even when stray messages arrive in the shared message bus queue after the subscription has completed.Changes
stream_ended: Arc<AtomicBool>field toSubscription<T>structfalsein all constructorsCloneimplementation to include the fieldtrueonEndOfStream,cancel(), anddrop()UnexpectedResponseTesting
All existing tests pass. The fix is internal state management with no API changes.
Note
This fix addresses spurious retries for single subscription instances. For applications that create multiple subscriptions (e.g., polling
open_orders()every 30 seconds), additional caching may be needed at the application layer to prevent creating duplicate subscriptions on the shared message bus.