Skip to content

Commit dcdb5e6

Browse files
committed
graph: Refactor log drains to share common code
- Create graph/src/log/common.rs for common log drain functionality - SimpleKVSerializer: Concatenates KV pairs to strings - VecKVSerializer: Collects KV pairs into Vec<(String, String)> - HashMapKVSerializer: Collects KV pairs into HashMap - LogMeta: Shared metadata structure (module, line, column) - LogEntryBuilder: Builder for common log entry fields - level_to_str(): Converts slog::Level to string - create_async_logger(): Consistent async logger creation - Updated FileDrain, LokiDrain, and ElasticDrain to use the log common utilities
1 parent 8d033db commit dcdb5e6

File tree

5 files changed

+274
-378
lines changed

5 files changed

+274
-378
lines changed

graph/src/log/common.rs

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
use std::collections::HashMap;
2+
use std::fmt;
3+
4+
use serde::Serialize;
5+
use slog::*;
6+
7+
/// Serializer for concatenating key-value arguments into a string
8+
pub struct SimpleKVSerializer {
9+
kvs: Vec<(String, String)>,
10+
}
11+
12+
impl SimpleKVSerializer {
13+
pub fn new() -> Self {
14+
Self { kvs: Vec::new() }
15+
}
16+
17+
/// Returns the number of key-value pairs and the concatenated string
18+
pub fn finish(self) -> (usize, String) {
19+
(
20+
self.kvs.len(),
21+
self.kvs
22+
.iter()
23+
.map(|(k, v)| format!("{}: {}", k, v))
24+
.collect::<Vec<_>>()
25+
.join(", "),
26+
)
27+
}
28+
}
29+
30+
impl Serializer for SimpleKVSerializer {
31+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
32+
self.kvs.push((key.into(), val.to_string()));
33+
Ok(())
34+
}
35+
}
36+
37+
/// Serializer for extracting key-value pairs into a Vec
38+
pub struct VecKVSerializer {
39+
kvs: Vec<(String, String)>,
40+
}
41+
42+
impl VecKVSerializer {
43+
pub fn new() -> Self {
44+
Self { kvs: Vec::new() }
45+
}
46+
47+
pub fn finish(self) -> Vec<(String, String)> {
48+
self.kvs
49+
}
50+
}
51+
52+
impl Serializer for VecKVSerializer {
53+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
54+
self.kvs.push((key.into(), val.to_string()));
55+
Ok(())
56+
}
57+
}
58+
59+
/// Serializer for extracting key-value pairs into a HashMap
60+
pub struct HashMapKVSerializer {
61+
kvs: Vec<(String, String)>,
62+
}
63+
64+
impl HashMapKVSerializer {
65+
pub fn new() -> Self {
66+
HashMapKVSerializer { kvs: Vec::new() }
67+
}
68+
69+
pub fn finish(self) -> HashMap<String, String> {
70+
self.kvs.into_iter().collect()
71+
}
72+
}
73+
74+
impl Serializer for HashMapKVSerializer {
75+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
76+
self.kvs.push((key.into(), val.to_string()));
77+
Ok(())
78+
}
79+
}
80+
81+
/// Log metadata structure
82+
#[derive(Clone, Debug, Serialize)]
83+
#[serde(rename_all = "camelCase")]
84+
pub struct LogMeta {
85+
pub module: String,
86+
pub line: i64,
87+
pub column: i64,
88+
}
89+
90+
/// Converts an slog Level to a string representation
91+
pub fn level_to_str(level: Level) -> &'static str {
92+
match level {
93+
Level::Critical => "critical",
94+
Level::Error => "error",
95+
Level::Warning => "warning",
96+
Level::Info => "info",
97+
Level::Debug => "debug",
98+
Level::Trace => "trace",
99+
}
100+
}
101+
102+
/// Builder for common log entry fields across different drain implementations
103+
pub struct LogEntryBuilder<'a> {
104+
record: &'a Record<'a>,
105+
values: &'a OwnedKVList,
106+
subgraph_id: String,
107+
timestamp: String,
108+
}
109+
110+
impl<'a> LogEntryBuilder<'a> {
111+
pub fn new(
112+
record: &'a Record<'a>,
113+
values: &'a OwnedKVList,
114+
subgraph_id: String,
115+
timestamp: String,
116+
) -> Self {
117+
Self {
118+
record,
119+
values,
120+
subgraph_id,
121+
timestamp,
122+
}
123+
}
124+
125+
/// Builds the log ID in the format: subgraph_id-timestamp
126+
pub fn build_id(&self) -> String {
127+
format!("{}-{}", self.subgraph_id, self.timestamp)
128+
}
129+
130+
/// Builds the text field by concatenating the message with all key-value pairs
131+
pub fn build_text(&self) -> String {
132+
// Serialize logger arguments
133+
let mut serializer = SimpleKVSerializer::new();
134+
self.record
135+
.kv()
136+
.serialize(self.record, &mut serializer)
137+
.expect("failed to serialize logger arguments");
138+
let (n_logger_kvs, logger_kvs) = serializer.finish();
139+
140+
// Serialize log message arguments
141+
let mut serializer = SimpleKVSerializer::new();
142+
self.values
143+
.serialize(self.record, &mut serializer)
144+
.expect("failed to serialize log message arguments");
145+
let (n_value_kvs, value_kvs) = serializer.finish();
146+
147+
// Build text with all key-value pairs
148+
let mut text = format!("{}", self.record.msg());
149+
if n_logger_kvs > 0 {
150+
use std::fmt::Write;
151+
write!(text, ", {}", logger_kvs).unwrap();
152+
}
153+
if n_value_kvs > 0 {
154+
use std::fmt::Write;
155+
write!(text, ", {}", value_kvs).unwrap();
156+
}
157+
158+
text
159+
}
160+
161+
/// Builds arguments as a Vec of tuples (for file drain)
162+
pub fn build_arguments_vec(&self) -> Vec<(String, String)> {
163+
let mut serializer = VecKVSerializer::new();
164+
self.record
165+
.kv()
166+
.serialize(self.record, &mut serializer)
167+
.expect("failed to serialize log message arguments into vec");
168+
serializer.finish()
169+
}
170+
171+
/// Builds arguments as a HashMap (for elastic and loki drains)
172+
pub fn build_arguments_map(&self) -> HashMap<String, String> {
173+
let mut serializer = HashMapKVSerializer::new();
174+
self.record
175+
.kv()
176+
.serialize(self.record, &mut serializer)
177+
.expect("failed to serialize log message arguments into hash map");
178+
serializer.finish()
179+
}
180+
181+
/// Builds metadata from the log record
182+
pub fn build_meta(&self) -> LogMeta {
183+
LogMeta {
184+
module: self.record.module().into(),
185+
line: self.record.line() as i64,
186+
column: self.record.column() as i64,
187+
}
188+
}
189+
190+
/// Gets the level as a string
191+
pub fn level_str(&self) -> &'static str {
192+
level_to_str(self.record.level())
193+
}
194+
195+
/// Gets the timestamp
196+
pub fn timestamp(&self) -> &str {
197+
&self.timestamp
198+
}
199+
200+
/// Gets the subgraph ID
201+
pub fn subgraph_id(&self) -> &str {
202+
&self.subgraph_id
203+
}
204+
}
205+
206+
/// Creates a new asynchronous logger with consistent configuration
207+
pub fn create_async_logger<D>(drain: D, chan_size: usize, use_block_overflow: bool) -> Logger
208+
where
209+
D: Drain + Send + 'static,
210+
D::Err: std::fmt::Debug,
211+
{
212+
let mut builder = slog_async::Async::new(drain.fuse()).chan_size(chan_size);
213+
214+
if use_block_overflow {
215+
builder = builder.overflow_strategy(slog_async::OverflowStrategy::Block);
216+
}
217+
218+
Logger::root(builder.build().fuse(), o!())
219+
}

0 commit comments

Comments
 (0)