@@ -9,6 +9,10 @@ use crate::prelude::LoggerFactory;
99use super :: store:: DeploymentId ;
1010
1111const DEFAULT_BUFFER_SIZE : usize = 100 ;
12+ #[ cfg( not( test) ) ]
13+ const INDEXER_WATCHER_INTERVAL : Duration = Duration :: from_secs ( 30 ) ;
14+ #[ cfg( test) ]
15+ const INDEXER_WATCHER_INTERVAL : Duration = Duration :: from_millis ( 100 ) ;
1216
1317#[ derive( Debug , Clone ) ]
1418pub struct Subscriptions < T > {
@@ -57,22 +61,33 @@ pub struct TracingControl<T> {
5761// }
5862
5963impl < T : Send + Clone + ' static > TracingControl < T > {
60- pub async fn start ( ) -> Self {
64+ pub fn start ( ) -> Self {
6165 let subscriptions = Subscriptions :: default ( ) ;
6266 let subs = subscriptions. clone ( ) ;
63- let watcher = indexer_watcher:: new_watcher (
64- #[ cfg( test) ]
65- Duration :: from_millis ( 100 ) ,
66- #[ cfg( not( test) ) ]
67- Duration :: from_secs ( 30 ) ,
68- move || {
69- let subs = subs. clone ( ) ;
70-
71- async move { Ok ( subs. inner . read ( ) . await . clone ( ) ) }
72- } ,
73- )
74- . await
67+
68+ let watcher = std:: thread:: spawn ( move || {
69+ let handle = tokio:: runtime:: Handle :: try_current ( ) . unwrap_or (
70+ tokio:: runtime:: Builder :: new_current_thread ( )
71+ . enable_all ( )
72+ . build ( )
73+ . unwrap ( )
74+ . handle ( )
75+ . clone ( ) ,
76+ ) ;
77+
78+ handle. block_on ( async move {
79+ indexer_watcher:: new_watcher ( INDEXER_WATCHER_INTERVAL , move || {
80+ let subs = subs. clone ( ) ;
81+
82+ async move { Ok ( subs. inner . read ( ) . await . clone ( ) ) }
83+ } )
84+ . await
85+ } )
86+ } )
87+ . join ( )
88+ . unwrap ( )
7589 . unwrap ( ) ;
90+
7691 Self {
7792 watcher,
7893 subscriptions,
@@ -118,10 +133,37 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
118133mod test {
119134
120135 use anyhow:: anyhow;
136+ use tokio:: time:: { self , Instant } ;
121137 use tokio_retry:: Retry ;
122138
123139 use super :: * ;
124- use std:: { future:: IntoFuture , sync:: Arc } ;
140+ use std:: sync:: Arc ;
141+
142+ #[ tokio:: test]
143+ async fn test_watcher ( ) {
144+ let x = time:: Instant :: now ( ) ;
145+ let x = indexer_watcher:: new_watcher ( Duration :: from_millis ( 10 ) , move || {
146+ let x = x. clone ( ) ;
147+
148+ async move {
149+ let now = Instant :: now ( ) ;
150+ Ok ( now. duration_since ( x) )
151+ }
152+ } )
153+ . await
154+ . unwrap ( ) ;
155+
156+ Retry :: spawn ( vec ! [ Duration :: from_secs( 10 ) ; 3 ] . into_iter ( ) , move || {
157+ let x = x. clone ( ) ;
158+ async move {
159+ let count = x. borrow ( ) . clone ( ) ;
160+ println ! ( "{}" , count. as_millis( ) ) ;
161+ Err :: < Duration , anyhow:: Error > ( anyhow ! ( "millis: {}" , count. as_millis( ) ) )
162+ }
163+ } )
164+ . await
165+ . unwrap ( ) ;
166+ }
125167
126168 #[ tokio:: test]
127169 async fn test_tracing_control ( ) {
@@ -133,11 +175,11 @@ mod test {
133175 assert ! ( tx. is_none( ) ) ;
134176
135177 // drop the subscription
136- let rx = control. subscribe ( DeploymentId ( 123 ) ) ;
178+ let rx = control. subscribe ( DeploymentId ( 123 ) ) . await ;
137179
138180 let c = control. clone ( ) ;
139181 // check subscription is none because channel is closed
140- let tx = Retry :: spawn ( vec ! [ Duration :: from_secs ( 5 ) ; 10 ] . into_iter ( ) , move || {
182+ let tx = Retry :: spawn ( vec ! [ INDEXER_WATCHER_INTERVAL ; 2 ] . into_iter ( ) , move || {
141183 let control = c. clone ( ) ;
142184 async move {
143185 match control. producer ( DeploymentId ( 123 ) ) {
@@ -158,9 +200,22 @@ mod test {
158200 assert ! ( tx. is_none( ) ) ;
159201
160202 // re-create subscription
161- let _rx = control. subscribe ( DeploymentId ( 123 ) ) ;
203+ let _rx = control. subscribe ( DeploymentId ( 123 ) ) . await ;
204+
162205 // check old subscription was replaced
163- let tx = control. producer ( DeploymentId ( 123 ) ) ;
164- assert ! ( !tx. unwrap( ) . is_closed( ) )
206+ let c = control. clone ( ) ;
207+ let tx = Retry :: spawn ( vec ! [ INDEXER_WATCHER_INTERVAL ; 2 ] . into_iter ( ) , move || {
208+ let tx = c. producer ( DeploymentId ( 123 ) ) ;
209+ async move {
210+ match tx {
211+ Some ( sender) if !sender. is_closed ( ) => Ok ( sender) ,
212+ Some ( _) => Err ( anyhow ! ( "Sender is closed" ) ) ,
213+ None => Err ( anyhow ! ( "Sender not created yet" ) ) ,
214+ }
215+ }
216+ } )
217+ . await
218+ . unwrap ( ) ;
219+ assert ! ( !tx. is_closed( ) )
165220 }
166221}
0 commit comments