2
2
3
3
import static com .slack .astra .server .AstraConfig .DEFAULT_ZK_TIMEOUT_SECS ;
4
4
5
+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
5
6
import com .slack .astra .util .RuntimeHalterImpl ;
6
7
import java .io .Closeable ;
7
8
import java .util .List ;
11
12
import java .util .concurrent .ConcurrentHashMap ;
12
13
import java .util .concurrent .CountDownLatch ;
13
14
import java .util .concurrent .ExecutionException ;
15
+ import java .util .concurrent .ExecutorService ;
16
+ import java .util .concurrent .Executors ;
14
17
import java .util .concurrent .TimeUnit ;
15
18
import java .util .concurrent .TimeoutException ;
16
19
import org .apache .curator .x .async .AsyncCuratorFramework ;
@@ -44,6 +47,9 @@ public class AstraMetadataStore<T extends AstraMetadata> implements Closeable {
44
47
private final Map <AstraMetadataStoreChangeListener <T >, ModeledCacheListener <T >> listenerMap =
45
48
new ConcurrentHashMap <>();
46
49
50
+ private final ExecutorService cacheInitializedService ;
51
+ private final ModeledCacheListener <T > initializedListener = getCacheInitializedListener ();
52
+
47
53
public AstraMetadataStore (
48
54
AsyncCuratorFramework curator ,
49
55
CreateMode createMode ,
@@ -64,11 +70,15 @@ public AstraMetadataStore(
64
70
modeledClient = ModeledFramework .wrap (curator , modelSpec );
65
71
66
72
if (shouldCache ) {
73
+ cacheInitializedService =
74
+ Executors .newSingleThreadExecutor (
75
+ new ThreadFactoryBuilder ().setNameFormat ("cache-initialized-service-%d" ).build ());
67
76
cachedModeledFramework = modeledClient .cached ();
68
- cachedModeledFramework .listenable ().addListener (getCacheInitializedListener () );
77
+ cachedModeledFramework .listenable ().addListener (initializedListener , cacheInitializedService );
69
78
cachedModeledFramework .start ();
70
79
} else {
71
80
cachedModeledFramework = null ;
81
+ cacheInitializedService = null ;
72
82
}
73
83
}
74
84
@@ -204,7 +214,12 @@ public void removeListener(AstraMetadataStoreChangeListener<T> watcher) {
204
214
205
215
private void awaitCacheInitialized () {
206
216
try {
207
- cacheInitialized .await ();
217
+ if (!cacheInitialized .await (30 , TimeUnit .SECONDS )) {
218
+ // in the event we deadlock, go ahead and time this out at 30s and restart the pod
219
+ new RuntimeHalterImpl ()
220
+ .handleFatal (
221
+ new TimeoutException ("Timed out waiting for Zookeeper cache to initialize" ));
222
+ }
208
223
} catch (InterruptedException e ) {
209
224
new RuntimeHalterImpl ().handleFatal (e );
210
225
}
@@ -221,6 +236,14 @@ public void accept(Type type, ZPath path, Stat stat, T model) {
221
236
public void initialized () {
222
237
ModeledCacheListener .super .initialized ();
223
238
cacheInitialized .countDown ();
239
+
240
+ // after it's initialized, we no longer need the listener or executor
241
+ if (cachedModeledFramework != null ) {
242
+ cachedModeledFramework .listenable ().removeListener (initializedListener );
243
+ }
244
+ if (cacheInitializedService != null ) {
245
+ cacheInitializedService .shutdown ();
246
+ }
224
247
}
225
248
};
226
249
}
0 commit comments