Skip to content

Commit ae551e6

Browse files
committed
wut
1 parent 2732f2b commit ae551e6

File tree

1 file changed

+49
-38
lines changed

1 file changed

+49
-38
lines changed

graph/src/components/tracing.rs

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::{collections::HashMap, sync::Arc, time::Duration};
22

3-
use slog::{o, Logger };
3+
use futures03::TryFutureExt;
4+
use slog::{o, Logger};
45
use tokio::sync::{mpsc, watch::Receiver, RwLock};
56

7+
use crate::prelude::LoggerFactory;
8+
69
use super::store::DeploymentId;
710

811
const DEFAULT_BUFFER_SIZE: usize = 100;
@@ -30,49 +33,58 @@ pub struct TracingControl<T> {
3033
default_buffer_size: usize,
3134
}
3235

33-
impl<T: Send + Clone + 'static> Default for TracingControl<T> {
34-
fn default() -> Self {
36+
// impl<T: Send + Clone + 'static> Default for TracingControl<T> {
37+
// fn default() -> Self {
38+
// let subscriptions = Subscriptions::default();
39+
// let subs = subscriptions.clone();
40+
// let watcher = std::thread::spawn(move || {
41+
// let runtime = tokio::runtime::Builder::new_multi_thread()
42+
// .enable_all()
43+
// .build()
44+
// .unwrap();
45+
// runtime.block_on()
46+
// })
47+
// .join()
48+
// .unwrap()
49+
// .unwrap();
50+
51+
// Self {
52+
// subscriptions,
53+
// default_buffer_size: DEFAULT_BUFFER_SIZE,
54+
// watcher,
55+
// }
56+
// }
57+
// }
58+
59+
impl<T: Send + Clone + 'static> TracingControl<T> {
60+
pub async fn start() -> Self {
3561
let subscriptions = Subscriptions::default();
3662
let subs = subscriptions.clone();
37-
let watcher = std::thread::spawn(move || {
38-
let runtime = tokio::runtime::Builder::new_multi_thread()
39-
.enable_all()
40-
.build()
41-
.unwrap();
42-
runtime.block_on(indexer_watcher::new_watcher(
43-
#[cfg(test)]
44-
Duration::from_millis(100),
45-
#[cfg(not(test))]
46-
Duration::from_secs(30),
47-
move || {
48-
let subs = subs.clone();
49-
50-
async move {
51-
Logger::root(StdLog{}, o!())
52-
Ok(subs.inner.read().await.clone())
53-
}
54-
},
55-
))
56-
})
57-
.join()
58-
.unwrap()
63+
let watcher = indexer_watcher::new_watcher(
64+
#[cfg(test)]
65+
Duration::from_millis(100),
66+
#[cfg(not(test))]
67+
Duration::from_secs(30),
68+
move || {
69+
let subs = subs.clone();
70+
71+
async move { Ok(subs.inner.read().await.clone()) }
72+
},
73+
)
74+
.await
5975
.unwrap();
60-
6176
Self {
77+
watcher,
6278
subscriptions,
6379
default_buffer_size: DEFAULT_BUFFER_SIZE,
64-
watcher,
65-
}
66-
}
67-
}
68-
69-
impl<T: Send + Clone + 'static> TracingControl<T> {
70-
pub fn new(default_buffer_size: Option<usize>) -> Self {
71-
Self {
72-
default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
73-
..Default::default()
7480
}
7581
}
82+
// pub fn new(default_buffer_size: Option<usize>) -> Self {
83+
// Self {
84+
// default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
85+
// ..Default::default()
86+
// }
87+
// }
7688

7789
pub fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
7890
self.watcher
@@ -113,7 +125,7 @@ mod test {
113125

114126
#[tokio::test]
115127
async fn test_tracing_control() {
116-
let control: TracingControl<()> = TracingControl::default();
128+
let control: TracingControl<()> = TracingControl::start().await;
117129
let control = Arc::new(control);
118130

119131
// produce before subscription
@@ -136,7 +148,6 @@ mod test {
136148
}
137149
}
138150
})
139-
.into_future()
140151
.await
141152
.unwrap();
142153
assert!(!tx.is_closed());

0 commit comments

Comments
 (0)