Skip to content

Commit f07cdce

Browse files
committed
query tracing
1 parent 7b786cb commit f07cdce

File tree

17 files changed

+706
-14
lines changed

17 files changed

+706
-14
lines changed

Cargo.lock

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ bs58 = "0.5.1"
3434
chrono = "0.4.38"
3535
clap = { version = "4.5.4", features = ["derive", "env"] }
3636
derivative = "2.2.0"
37-
diesel = { version = "2.2.4", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid", "i-implement-a-third-party-backend-and-opt-into-breaking-changes"] }
37+
diesel = { version = "2.2.4", features = [
38+
"postgres",
39+
"serde_json",
40+
"numeric",
41+
"r2d2",
42+
"chrono",
43+
"uuid",
44+
"i-implement-a-third-party-backend-and-opt-into-breaking-changes",
45+
] }
3846
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
3947
diesel-dynamic-schema = { version = "0.2.1", features = ["postgres"] }
4048
diesel_derives = "2.1.4"
@@ -56,7 +64,10 @@ serde_derive = "1.0.125"
5664
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
5765
serde_regex = "1.1.0"
5866
serde_yaml = "0.9.21"
59-
slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] }
67+
slog = { version = "2.7.0", features = [
68+
"release_max_level_trace",
69+
"max_level_trace",
70+
] }
6071
sqlparser = "0.46.0"
6172
strum = { version = "0.26", features = ["derive"] }
6273
syn = { version = "2.0.66", features = ["full"] }

graph/build.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,10 @@ fn main() {
2525
.out_dir("src/substreams_rpc")
2626
.compile(&["proto/substreams-rpc.proto"], &["proto"])
2727
.expect("Failed to compile Substreams RPC proto(s)");
28+
29+
tonic_build::configure()
30+
.out_dir("src/grpc/pb")
31+
.include_file("mod.rs")
32+
.compile(&["proto/tracing.proto"], &["proto"])
33+
.expect("Failed to compile Tracing proto(s)");
2834
}

graph/proto/tracing.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
syntax = "proto3";
2+
3+
package graph.tracing.v1;
4+
5+
service Stream {
6+
rpc QueryTrace(Request) returns (stream Trace);
7+
}
8+
9+
message Request {
10+
int32 deployment_id = 1;
11+
}
12+
13+
message Trace {
14+
int32 deployment_id = 1;
15+
string query = 2;
16+
uint64 duration_millis = 3;
17+
}

graph/src/components/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub mod metrics;
6060
/// Components dealing with versioning
6161
pub mod versions;
6262

63+
pub mod tracing;
64+
6365
/// A component that receives events of type `T`.
6466
pub trait EventConsumer<E> {
6567
/// Get the event sink.

graph/src/components/tracing.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use std::{collections::HashMap, sync::atomic::AtomicBool};
2+
3+
use tokio::sync::{mpsc, RwLock};
4+
5+
use super::store::DeploymentId;
6+
7+
const DEFAULT_BUFFER_SIZE: usize = 100;
8+
9+
#[derive(Debug)]
10+
pub struct Subscriptions<T> {
11+
inner: RwLock<HashMap<DeploymentId, mpsc::Sender<T>>>,
12+
}
13+
14+
/// A control structure for managing tracing subscriptions.
15+
#[derive(Debug)]
16+
pub struct TracingControl<T> {
17+
enabled: AtomicBool,
18+
subscriptions: Subscriptions<T>,
19+
default_buffer_size: usize,
20+
}
21+
22+
impl<T> Default for TracingControl<T> {
23+
fn default() -> Self {
24+
Self {
25+
enabled: AtomicBool::new(false),
26+
subscriptions: Subscriptions {
27+
inner: RwLock::new(HashMap::new()),
28+
},
29+
default_buffer_size: DEFAULT_BUFFER_SIZE,
30+
}
31+
}
32+
}
33+
34+
impl<T> TracingControl<T> {
35+
pub fn new(default_buffer_size: Option<usize>) -> Self {
36+
Self {
37+
enabled: AtomicBool::new(false),
38+
subscriptions: Subscriptions {
39+
inner: RwLock::new(HashMap::new()),
40+
},
41+
default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
42+
}
43+
}
44+
45+
/// Creates a channel sender for the given deployment ID. Only one subscription
46+
/// can exist for a given deployment ID. If tracing is disabled or no subscription
47+
/// exists, it will return None. Calling producer when a dead subscription exists
48+
/// will incur a cleanup cost.
49+
pub async fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
50+
if !self.enabled.load(std::sync::atomic::Ordering::Relaxed) {
51+
return None;
52+
}
53+
54+
let subs = self.subscriptions.inner.read().await;
55+
let tx = subs.get(&key);
56+
57+
match tx {
58+
Some(tx) if tx.is_closed() => {
59+
drop(subs);
60+
let mut subs = self.subscriptions.inner.write().await;
61+
subs.remove(&key);
62+
63+
if subs.is_empty() {
64+
self.enabled
65+
.store(false, std::sync::atomic::Ordering::Relaxed);
66+
}
67+
68+
None
69+
}
70+
None => None,
71+
tx => tx.cloned(),
72+
}
73+
}
74+
pub async fn subscribe_with_chan_size(
75+
&self,
76+
key: DeploymentId,
77+
buffer_size: usize,
78+
) -> mpsc::Receiver<T> {
79+
let (tx, rx) = mpsc::channel(buffer_size);
80+
let mut guard = self.subscriptions.inner.write().await;
81+
guard.insert(key, tx);
82+
self.enabled
83+
.store(true, std::sync::atomic::Ordering::Relaxed);
84+
85+
rx
86+
}
87+
88+
/// Creates a new subscription for a given deployment ID. If a subscription already
89+
/// exists, it will be replaced.
90+
pub async fn subscribe(&self, key: DeploymentId) -> mpsc::Receiver<T> {
91+
self.subscribe_with_chan_size(key, self.default_buffer_size)
92+
.await
93+
}
94+
}
95+
96+
#[cfg(test)]
97+
mod test {
98+
99+
use super::*;
100+
use std::sync::atomic::Ordering::Relaxed;
101+
use std::sync::Arc;
102+
103+
#[tokio::test]
104+
async fn test_tracing_control() {
105+
let control: TracingControl<()> = TracingControl::default();
106+
let control = Arc::new(control);
107+
assert_eq!(false, control.enabled.load(Relaxed));
108+
109+
let tx = control.producer(DeploymentId(123)).await;
110+
assert!(tx.is_none());
111+
112+
let rx = control.subscribe(DeploymentId(123)).await;
113+
assert_eq!(true, control.enabled.load(Relaxed));
114+
115+
drop(rx);
116+
let tx = control.producer(DeploymentId(123)).await;
117+
assert!(tx.is_none());
118+
assert_eq!(false, control.enabled.load(Relaxed));
119+
120+
_ = control.subscribe(DeploymentId(123)).await;
121+
assert_eq!(true, control.enabled.load(Relaxed));
122+
}
123+
}

graph/src/data/query/trace.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ lazy_static! {
1414
pub static ref TRACE_NONE: Arc<Trace> = Arc::new(Trace::None);
1515
}
1616

17-
#[derive(Debug, CacheWeight)]
17+
#[derive(Debug, CacheWeight, Clone)]
1818
pub struct TraceWithCacheStatus {
1919
pub trace: Arc<Trace>,
2020
pub cache_status: CacheStatus,
@@ -35,7 +35,7 @@ impl HttpTrace {
3535
}
3636
}
3737

38-
#[derive(Debug, CacheWeight)]
38+
#[derive(Debug, CacheWeight, Clone)]
3939
pub enum Trace {
4040
None,
4141
Root {

graph/src/grpc/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod pb;

0 commit comments

Comments
 (0)