@@ -6,8 +6,7 @@ use tokio::time::sleep;
66use tracing:: { debug, info, instrument} ;
77
88use crate :: pocketoption:: {
9- error:: PocketOptionError , parser:: message:: WebSocketMessage , types:: info:: MessageInfo ,
10- validators:: history_validator,
9+ parser:: message:: WebSocketMessage , types:: info:: MessageInfo ,
1110} ;
1211use binary_options_tools_core:: {
1312 error:: { BinaryOptionsResult , BinaryOptionsToolsError } ,
@@ -23,29 +22,14 @@ impl PocketCallback {
2322 async fn update_assets (
2423 data : & Data < PocketData , WebSocketMessage > ,
2524 sender : & SenderMessage ,
26- config : & Config < PocketData , WebSocketMessage , ( ) > ,
2725 ) -> BinaryOptionsResult < ( ) > {
2826 for asset in data. stream_assets ( ) . await {
27+ // Send 3 messages, 1: change symbol, 2: unsubscribe symbol, 3: subscribe symbol
28+ debug ! ( "Updating asset: {asset}" ) ;
29+ sender. send ( WebSocketMessage :: ChangeSymbol ( ChangeSymbol :: new ( asset. to_string ( ) , 1 ) ) ) . await ?;
30+ sender. send ( WebSocketMessage :: Unsubfor ( asset. to_string ( ) ) ) . await ?;
31+ sender. send ( WebSocketMessage :: Subfor ( asset. to_string ( ) ) ) . await ?;
2932 sleep ( Duration :: from_secs ( 1 ) ) . await ;
30- let history = ChangeSymbol :: new ( asset. to_string ( ) , 3600 ) ;
31- let res = sender
32- . send_message_with_timout (
33- config. get_timeout ( ) ?,
34- "SubscribeSymbolCallback" ,
35- data,
36- WebSocketMessage :: ChangeSymbol ( history) ,
37- MessageInfo :: UpdateHistoryNewFast ,
38- & history_validator ( asset. to_string ( ) , 3600 ) ,
39- )
40- . await ?;
41- if let WebSocketMessage :: UpdateHistoryNewFast ( _) = res {
42- debug ! ( "Sent 'ChangeSymbol' for asset: {asset}" ) ;
43- } else {
44- return Err ( PocketOptionError :: UnexpectedIncorrectWebSocketMessage (
45- res. information ( ) ,
46- )
47- . into ( ) ) ;
48- }
4933 }
5034 Ok ( ( ) )
5135 }
@@ -75,15 +59,15 @@ impl WCallback for PocketCallback {
7559 type Transfer = WebSocketMessage ;
7660 type U = ( ) ;
7761
78- #[ instrument( skip( self , data, sender, config ) ) ]
62+ #[ instrument( skip( self , data, sender, _config ) ) ]
7963 async fn call (
8064 & self ,
8165 data : Data < Self :: T , Self :: Transfer > ,
8266 sender : & SenderMessage ,
83- config : & Config < Self :: T , Self :: Transfer , Self :: U > ,
67+ _config : & Config < Self :: T , Self :: Transfer , Self :: U > ,
8468 ) -> BinaryOptionsResult < ( ) > {
8569 // let sender = sender.clone();
86- let update_assets_future = Self :: update_assets ( & data, sender, config ) ;
70+ let update_assets_future = Self :: update_assets ( & data, sender) ;
8771 let update_check_results_future = Self :: update_check_results ( & data) ;
8872 try_join ( update_assets_future, update_check_results_future) . await ?;
8973 Ok ( ( ) )
0 commit comments