Skip to content

Commit 7ecd327

Browse files
committed
experiment
1 parent ae551e6 commit 7ecd327

File tree

9 files changed

+200
-169
lines changed

9 files changed

+200
-169
lines changed

graph/proto/tracing.proto

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ syntax = "proto3";
22

33
package graph.tracing.v1;
44

5-
service Stream {
6-
rpc QueryTrace(Request) returns (stream Trace);
7-
}
5+
service Stream { rpc QueryTrace(Request) returns (stream Response); }
86

9-
message Request {
10-
int32 deployment_id = 1;
11-
}
7+
message Request { int32 deployment_id = 1; }
8+
9+
message Response { repeated Trace traces = 1; }
1210

1311
message Trace {
14-
int32 deployment_id = 1;
15-
string query = 2;
16-
uint64 duration_millis = 3;
12+
int32 deployment_id = 1;
13+
string query = 2;
14+
uint64 duration_millis = 3;
15+
uint32 children = 4;
16+
optional uint64 conn_wait_millis = 5;
17+
optional uint64 permit_wait_millis = 6;
18+
optional uint64 entity_count = 7;
1719
}

graph/src/components/tracing.rs

Lines changed: 89 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
1+
use lazy_static::lazy_static;
2+
13
use std::{collections::HashMap, sync::Arc, time::Duration};
24

3-
use futures03::TryFutureExt;
4-
use slog::{o, Logger};
55
use tokio::sync::{mpsc, watch::Receiver, RwLock};
66

7-
use crate::prelude::LoggerFactory;
8-
97
use super::store::DeploymentId;
108

119
const DEFAULT_BUFFER_SIZE: usize = 100;
10+
#[cfg(not(test))]
11+
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(10);
12+
#[cfg(test)]
13+
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100);
14+
lazy_static! {
15+
pub static ref TRACING_RUNTIME: tokio::runtime::Runtime =
16+
tokio::runtime::Builder::new_multi_thread()
17+
.worker_threads(1)
18+
.enable_all()
19+
.build()
20+
.unwrap();
21+
}
1222

1323
#[derive(Debug, Clone)]
1424
pub struct Subscriptions<T> {
@@ -33,59 +43,42 @@ pub struct TracingControl<T> {
3343
default_buffer_size: usize,
3444
}
3545

36-
// impl<T: Send + Clone + 'static> Default for TracingControl<T> {
37-
// fn default() -> Self {
38-
// let subscriptions = Subscriptions::default();
39-
// let subs = subscriptions.clone();
40-
// let watcher = std::thread::spawn(move || {
41-
// let runtime = tokio::runtime::Builder::new_multi_thread()
42-
// .enable_all()
43-
// .build()
44-
// .unwrap();
45-
// runtime.block_on()
46-
// })
47-
// .join()
48-
// .unwrap()
49-
// .unwrap();
50-
51-
// Self {
52-
// subscriptions,
53-
// default_buffer_size: DEFAULT_BUFFER_SIZE,
54-
// watcher,
55-
// }
56-
// }
57-
// }
58-
5946
impl<T: Send + Clone + 'static> TracingControl<T> {
60-
pub async fn start() -> Self {
47+
/// Starts a new tracing control instance.If an async runtime is not available, a new one will be created.
48+
pub fn start() -> Self {
49+
Self::new(DEFAULT_BUFFER_SIZE)
50+
}
51+
52+
pub fn new(buffer_size: usize) -> Self {
6153
let subscriptions = Subscriptions::default();
6254
let subs = subscriptions.clone();
63-
let watcher = indexer_watcher::new_watcher(
64-
#[cfg(test)]
65-
Duration::from_millis(100),
66-
#[cfg(not(test))]
67-
Duration::from_secs(30),
68-
move || {
69-
let subs = subs.clone();
70-
71-
async move { Ok(subs.inner.read().await.clone()) }
72-
},
73-
)
74-
.await
55+
56+
let watcher = std::thread::spawn(move || {
57+
let handle =
58+
tokio::runtime::Handle::try_current().unwrap_or(TRACING_RUNTIME.handle().clone());
59+
60+
handle.block_on(async move {
61+
indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || {
62+
let subs = subs.clone();
63+
64+
async move { Ok(subs.inner.read().await.clone()) }
65+
})
66+
.await
67+
})
68+
})
69+
.join()
70+
.unwrap()
7571
.unwrap();
72+
7673
Self {
7774
watcher,
7875
subscriptions,
79-
default_buffer_size: DEFAULT_BUFFER_SIZE,
76+
default_buffer_size: buffer_size,
8077
}
8178
}
82-
// pub fn new(default_buffer_size: Option<usize>) -> Self {
83-
// Self {
84-
// default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
85-
// ..Default::default()
86-
// }
87-
// }
8879

80+
/// Returns a producer for a given deployment ID. If the producer is closed, it will return None.
81+
/// The producer could still be closed in the meantime.
8982
pub fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
9083
self.watcher
9184
.borrow()
@@ -94,6 +87,8 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
9487
.filter(|sender| !sender.is_closed())
9588
}
9689

90+
/// Creates a new subscription for a given deployment ID with a given buffer size. If a subscription already
91+
/// exists, it will be replaced.
9792
pub async fn subscribe_with_chan_size(
9893
&self,
9994
key: DeploymentId,
@@ -118,26 +113,53 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
118113
mod test {
119114

120115
use anyhow::anyhow;
116+
use tokio::time::{self, Instant};
121117
use tokio_retry::Retry;
122118

123119
use super::*;
124-
use std::{future::IntoFuture, sync::Arc};
120+
use std::sync::Arc;
121+
122+
#[tokio::test]
123+
async fn test_watcher() {
124+
let x = time::Instant::now();
125+
let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || {
126+
let x = x.clone();
127+
128+
async move {
129+
let now = Instant::now();
130+
Ok(now.duration_since(x))
131+
}
132+
})
133+
.await
134+
.unwrap();
135+
136+
Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || {
137+
let x = x.clone();
138+
async move {
139+
let count = x.borrow().clone();
140+
println!("{}", count.as_millis());
141+
Err::<Duration, anyhow::Error>(anyhow!("millis: {}", count.as_millis()))
142+
}
143+
})
144+
.await
145+
.unwrap();
146+
}
125147

126148
#[tokio::test]
127149
async fn test_tracing_control() {
128-
let control: TracingControl<()> = TracingControl::start().await;
150+
let control: TracingControl<()> = TracingControl::start();
129151
let control = Arc::new(control);
130152

131153
// produce before subscription
132154
let tx = control.producer(DeploymentId(123));
133155
assert!(tx.is_none());
134156

135157
// drop the subscription
136-
let rx = control.subscribe(DeploymentId(123));
158+
let rx = control.subscribe(DeploymentId(123)).await;
137159

138160
let c = control.clone();
139161
// check subscription is none because channel is closed
140-
let tx = Retry::spawn(vec![Duration::from_secs(5); 10].into_iter(), move || {
162+
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
141163
let control = c.clone();
142164
async move {
143165
match control.producer(DeploymentId(123)) {
@@ -158,9 +180,22 @@ mod test {
158180
assert!(tx.is_none());
159181

160182
// re-create subscription
161-
let _rx = control.subscribe(DeploymentId(123));
183+
let _rx = control.subscribe(DeploymentId(123)).await;
184+
162185
// check old subscription was replaced
163-
let tx = control.producer(DeploymentId(123));
164-
assert!(!tx.unwrap().is_closed())
186+
let c = control.clone();
187+
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
188+
let tx = c.producer(DeploymentId(123));
189+
async move {
190+
match tx {
191+
Some(sender) if !sender.is_closed() => Ok(sender),
192+
Some(_) => Err(anyhow!("Sender is closed")),
193+
None => Err(anyhow!("Sender not created yet")),
194+
}
195+
}
196+
})
197+
.await
198+
.unwrap();
199+
assert!(!tx.is_closed())
165200
}
166201
}

graph/src/env/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ pub struct EnvVars {
247247
/// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`.
248248
/// The default value is 60 seconds.
249249
pub firehose_block_fetch_timeout: u64,
250+
/// Set by the environment variable `GRAPH_ENABLE_TRACING_GRPC_SERVER`.
251+
pub enable_tracing_grpc_server: bool,
250252
}
251253

252254
impl EnvVars {
@@ -339,6 +341,7 @@ impl EnvVars {
339341
block_write_capacity: inner.block_write_capacity.0,
340342
firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit,
341343
firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
344+
enable_tracing_grpc_server: inner.enable_tracing_grpc_server,
342345
})
343346
}
344347

@@ -506,6 +509,8 @@ struct Inner {
506509
firehose_block_fetch_retry_limit: usize,
507510
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")]
508511
firehose_block_fetch_timeout: u64,
512+
#[envconfig(from = "GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC", default = "false")]
513+
enable_tracing_grpc_server: EnvVarBoolean,
509514
}
510515

511516
#[derive(Clone, Debug)]

graph/src/grpc/pb/graph.tracing.v1.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,27 @@ pub struct Request {
77
}
88
#[allow(clippy::derive_partial_eq_without_eq)]
99
#[derive(Clone, PartialEq, ::prost::Message)]
10+
pub struct Response {
11+
#[prost(message, repeated, tag = "1")]
12+
pub traces: ::prost::alloc::vec::Vec<Trace>,
13+
}
14+
#[allow(clippy::derive_partial_eq_without_eq)]
15+
#[derive(Clone, PartialEq, ::prost::Message)]
1016
pub struct Trace {
1117
#[prost(int32, tag = "1")]
1218
pub deployment_id: i32,
1319
#[prost(string, tag = "2")]
1420
pub query: ::prost::alloc::string::String,
1521
#[prost(uint64, tag = "3")]
1622
pub duration_millis: u64,
23+
#[prost(uint32, tag = "4")]
24+
pub children: u32,
25+
#[prost(uint64, optional, tag = "5")]
26+
pub conn_wait_millis: ::core::option::Option<u64>,
27+
#[prost(uint64, optional, tag = "6")]
28+
pub permit_wait_millis: ::core::option::Option<u64>,
29+
#[prost(uint64, optional, tag = "7")]
30+
pub entity_count: ::core::option::Option<u64>,
1731
}
1832
/// Generated client implementations.
1933
pub mod stream_client {
@@ -104,7 +118,7 @@ pub mod stream_client {
104118
&mut self,
105119
request: impl tonic::IntoRequest<super::Request>,
106120
) -> std::result::Result<
107-
tonic::Response<tonic::codec::Streaming<super::Trace>>,
121+
tonic::Response<tonic::codec::Streaming<super::Response>>,
108122
tonic::Status,
109123
> {
110124
self.inner
@@ -136,7 +150,7 @@ pub mod stream_server {
136150
pub trait Stream: Send + Sync + 'static {
137151
/// Server streaming response type for the QueryTrace method.
138152
type QueryTraceStream: tonic::codegen::tokio_stream::Stream<
139-
Item = std::result::Result<super::Trace, tonic::Status>,
153+
Item = std::result::Result<super::Response, tonic::Status>,
140154
>
141155
+ Send
142156
+ 'static;
@@ -229,7 +243,7 @@ pub mod stream_server {
229243
struct QueryTraceSvc<T: Stream>(pub Arc<T>);
230244
impl<T: Stream> tonic::server::ServerStreamingService<super::Request>
231245
for QueryTraceSvc<T> {
232-
type Response = super::Trace;
246+
type Response = super::Response;
233247
type ResponseStream = T::QueryTraceStream;
234248
type Future = BoxFuture<
235249
tonic::Response<Self::ResponseStream>,

node/src/main.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ async fn main() {
154154
// Obtain metrics server port
155155
let metrics_port = opt.metrics_port;
156156

157+
// Obtain tracing server port
158+
let tracing_grpc_port = opt.grpc_port;
159+
157160
// Obtain the fork base URL
158161
let fork_base = match &opt.fork_base {
159162
Some(url) => {
@@ -517,9 +520,11 @@ async fn main() {
517520
// Run the index node server
518521
graph::spawn(async move { index_node_server.start(index_node_port).await });
519522

520-
graph::spawn(async move {
521-
graph_server_grpc::start(8888).await.unwrap();
522-
});
523+
if env_vars.enable_tracing_grpc_server {
524+
graph::spawn(async move {
525+
graph_server_grpc::start(tracing_grpc_port).await.unwrap();
526+
});
527+
}
523528

524529
graph::spawn(async move {
525530
metrics_server
@@ -564,19 +569,6 @@ async fn main() {
564569
}
565570
});
566571

567-
let mut rx = TRACING_CONTROL.subscribe(DeploymentId(1)).await;
568-
loop {
569-
let trace = rx.recv().await;
570-
match trace {
571-
Some(trace) => {
572-
info!(&logger, "#### trace: {:?}", trace);
573-
}
574-
None => {
575-
break;
576-
}
577-
}
578-
}
579-
580572
graph::futures03::future::pending::<()>().await;
581573
}
582574

node/src/opt.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ pub struct Opt {
154154
help = "Port for the Prometheus metrics server"
155155
)]
156156
pub metrics_port: u16,
157+
#[clap(
158+
long,
159+
default_value = "8060",
160+
value_name = "PORT",
161+
help = "Port for the query tracing GRPC Server"
162+
)]
163+
pub grpc_port: u16,
157164
#[clap(
158165
long,
159166
default_value = "default",

0 commit comments

Comments
 (0)