Skip to content

Commit fc44c58

Browse files
committed
graph, node: Cleanup FileLogStore files on startup
- Keep logs within retention_hours of now, skipping cleanup if --log-store-retention-hours=0
1 parent: bcbb425 · commit: fc44c58

File tree

6 files changed

+326
-69
lines changed

graph/src/components/log_store/file.rs

Lines changed: 274 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,38 @@ use super::{LogEntry, LogMeta, LogQuery, LogStore, LogStoreError};
1212

1313
pub struct FileLogStore {
1414
directory: PathBuf,
15-
// TODO: Implement log rotation when file exceeds max_file_size
16-
#[allow(dead_code)]
17-
max_file_size: u64,
18-
// TODO: Implement automatic cleanup of logs older than retention_days
19-
#[allow(dead_code)]
20-
retention_days: u32,
15+
retention_hours: u32,
2116
}
2217

2318
impl FileLogStore {
24-
pub fn new(
25-
directory: PathBuf,
26-
max_file_size: u64,
27-
retention_days: u32,
28-
) -> Result<Self, LogStoreError> {
19+
pub fn new(directory: PathBuf, retention_hours: u32) -> Result<Self, LogStoreError> {
2920
// Create directory if it doesn't exist
3021
std::fs::create_dir_all(&directory)
3122
.map_err(|e| LogStoreError::InitializationFailed(e.into()))?;
3223

33-
Ok(Self {
24+
let store = Self {
3425
directory,
35-
max_file_size,
36-
retention_days,
37-
})
26+
retention_hours,
27+
};
28+
29+
// Run cleanup on startup for all existing log files
30+
if retention_hours > 0 {
31+
if let Ok(entries) = std::fs::read_dir(&store.directory) {
32+
for entry in entries.filter_map(Result::ok) {
33+
let path = entry.path();
34+
35+
// Only process .jsonl files
36+
if path.extension().and_then(|s| s.to_str()) == Some("jsonl") {
37+
// Run cleanup, but don't fail initialization if cleanup fails
38+
if let Err(e) = store.cleanup_old_logs(&path) {
39+
eprintln!("Warning: Failed to cleanup old logs for {:?}: {}", path, e);
40+
}
41+
}
42+
}
43+
}
44+
}
45+
46+
Ok(store)
3847
}
3948

4049
/// Get log file path for a subgraph
@@ -95,6 +104,57 @@ impl FileLogStore {
95104

96105
true
97106
}
107+
108+
/// Delete log entries older than retention_hours
109+
fn cleanup_old_logs(&self, file_path: &std::path::Path) -> Result<(), LogStoreError> {
110+
if self.retention_hours == 0 {
111+
return Ok(()); // Cleanup disabled, keep all logs
112+
}
113+
114+
use chrono::{DateTime, Duration, Utc};
115+
use std::io::Write;
116+
117+
// Calculate cutoff time
118+
let cutoff = Utc::now() - Duration::hours(self.retention_hours as i64);
119+
120+
// Read all log entries
121+
let file = File::open(file_path).map_err(|e| LogStoreError::QueryFailed(e.into()))?;
122+
let reader = BufReader::new(file);
123+
124+
let kept_entries: Vec<String> = reader
125+
.lines()
126+
.filter_map(|line| line.ok())
127+
.filter(|line| {
128+
// Parse timestamp from log entry
129+
if let Some(entry) = self.parse_line(line) {
130+
// Parse RFC3339 timestamp
131+
if let Ok(timestamp) = DateTime::parse_from_rfc3339(&entry.timestamp) {
132+
return timestamp.with_timezone(&Utc) >= cutoff;
133+
}
134+
}
135+
// Keep if we can't parse (don't delete on error)
136+
true
137+
})
138+
.collect();
139+
140+
// Write filtered file atomically
141+
let temp_path = file_path.with_extension("jsonl.tmp");
142+
let mut temp_file =
143+
File::create(&temp_path).map_err(|e| LogStoreError::QueryFailed(e.into()))?;
144+
145+
for entry in kept_entries {
146+
writeln!(temp_file, "{}", entry).map_err(|e| LogStoreError::QueryFailed(e.into()))?;
147+
}
148+
149+
temp_file
150+
.sync_all()
151+
.map_err(|e| LogStoreError::QueryFailed(e.into()))?;
152+
153+
// Atomic rename
154+
std::fs::rename(&temp_path, file_path).map_err(|e| LogStoreError::QueryFailed(e.into()))?;
155+
156+
Ok(())
157+
}
98158
}
99159

100160
/// Helper struct to enable timestamp-based comparisons for BinaryHeap
@@ -185,8 +245,11 @@ impl LogStore for FileLogStore {
185245
.map(|Reverse(te)| te.entry)
186246
.collect();
187247

188-
// Sort by timestamp descending (most recent first)
189-
result.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
248+
// Sort by timestamp (direction based on query)
249+
match query.order_direction {
250+
super::OrderDirection::Desc => result.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)),
251+
super::OrderDirection::Asc => result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)),
252+
}
190253

191254
// Apply skip and take to get the final page
192255
Ok(result
@@ -231,7 +294,7 @@ mod tests {
231294
#[test]
232295
fn test_file_log_store_initialization() {
233296
let temp_dir = TempDir::new().unwrap();
234-
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30);
297+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 0);
235298
assert!(store.is_ok());
236299

237300
let store = store.unwrap();
@@ -241,7 +304,7 @@ mod tests {
241304
#[test]
242305
fn test_log_file_path() {
243306
let temp_dir = TempDir::new().unwrap();
244-
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
307+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 0).unwrap();
245308

246309
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
247310
let path = store.log_file_path(&subgraph_id);
@@ -252,7 +315,7 @@ mod tests {
252315
#[tokio::test]
253316
async fn test_query_nonexistent_file() {
254317
let temp_dir = TempDir::new().unwrap();
255-
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
318+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 0).unwrap();
256319

257320
let query = LogQuery {
258321
subgraph_id: DeploymentHash::new("QmNonexistent").unwrap(),
@@ -262,6 +325,7 @@ mod tests {
262325
search: None,
263326
first: 100,
264327
skip: 0,
328+
order_direction: super::super::OrderDirection::Desc,
265329
};
266330

267331
let result = store.query_logs(query).await;
@@ -272,7 +336,7 @@ mod tests {
272336
#[tokio::test]
273337
async fn test_query_with_sample_data() {
274338
let temp_dir = TempDir::new().unwrap();
275-
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
339+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 0).unwrap();
276340

277341
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
278342
let file_path = store.log_file_path(&subgraph_id);
@@ -303,6 +367,7 @@ mod tests {
303367
search: None,
304368
first: 100,
305369
skip: 0,
370+
order_direction: super::super::OrderDirection::Desc,
306371
};
307372

308373
let result = store.query_logs(query).await;
@@ -318,7 +383,7 @@ mod tests {
318383
#[tokio::test]
319384
async fn test_query_with_level_filter() {
320385
let temp_dir = TempDir::new().unwrap();
321-
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 1024 * 1024, 30).unwrap();
386+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 0).unwrap();
322387

323388
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
324389
let file_path = store.log_file_path(&subgraph_id);
@@ -351,6 +416,7 @@ mod tests {
351416
search: None,
352417
first: 100,
353418
skip: 0,
419+
order_direction: super::super::OrderDirection::Desc,
354420
};
355421

356422
let result = store.query_logs(query).await;
@@ -360,4 +426,190 @@ mod tests {
360426
assert_eq!(entries.len(), 2);
361427
assert!(entries.iter().all(|e| e.level == LogLevel::Error));
362428
}
429+
430+
#[tokio::test]
431+
async fn test_cleanup_old_logs() {
432+
use chrono::{Duration, Utc};
433+
434+
let temp_dir = TempDir::new().unwrap();
435+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 24).unwrap();
436+
437+
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
438+
let file_path = store.log_file_path(&subgraph_id);
439+
440+
// Create test data with old and new entries
441+
let mut file = File::create(&file_path).unwrap();
442+
443+
// Old entry (48 hours ago)
444+
let old_timestamp =
445+
(Utc::now() - Duration::hours(48)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
446+
let old_entry = FileLogDocument {
447+
id: "log-old".to_string(),
448+
subgraph_id: "QmTest".to_string(),
449+
timestamp: old_timestamp,
450+
level: "info".to_string(),
451+
text: "Old log entry".to_string(),
452+
arguments: vec![],
453+
meta: FileLogMeta {
454+
module: "test.ts".to_string(),
455+
line: 1,
456+
column: 1,
457+
},
458+
};
459+
writeln!(file, "{}", serde_json::to_string(&old_entry).unwrap()).unwrap();
460+
461+
// New entry (12 hours ago)
462+
let new_timestamp =
463+
(Utc::now() - Duration::hours(12)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
464+
let new_entry = FileLogDocument {
465+
id: "log-new".to_string(),
466+
subgraph_id: "QmTest".to_string(),
467+
timestamp: new_timestamp,
468+
level: "info".to_string(),
469+
text: "New log entry".to_string(),
470+
arguments: vec![],
471+
meta: FileLogMeta {
472+
module: "test.ts".to_string(),
473+
line: 2,
474+
column: 1,
475+
},
476+
};
477+
writeln!(file, "{}", serde_json::to_string(&new_entry).unwrap()).unwrap();
478+
drop(file);
479+
480+
// Run cleanup
481+
store.cleanup_old_logs(&file_path).unwrap();
482+
483+
// Query to verify only new entry remains
484+
let query = LogQuery {
485+
subgraph_id,
486+
level: None,
487+
from: None,
488+
to: None,
489+
search: None,
490+
first: 100,
491+
skip: 0,
492+
order_direction: super::super::OrderDirection::Desc,
493+
};
494+
495+
let result = store.query_logs(query).await.unwrap();
496+
assert_eq!(result.len(), 1);
497+
assert_eq!(result[0].id, "log-new");
498+
}
499+
500+
#[tokio::test]
501+
async fn test_cleanup_keeps_unparseable_entries() {
502+
let temp_dir = TempDir::new().unwrap();
503+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 24).unwrap();
504+
505+
let subgraph_id = DeploymentHash::new("QmTest").unwrap();
506+
let file_path = store.log_file_path(&subgraph_id);
507+
508+
// Create test data with valid and unparseable entries
509+
let mut file = File::create(&file_path).unwrap();
510+
511+
// Valid entry
512+
let valid_entry = FileLogDocument {
513+
id: "log-valid".to_string(),
514+
subgraph_id: "QmTest".to_string(),
515+
timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
516+
level: "info".to_string(),
517+
text: "Valid entry".to_string(),
518+
arguments: vec![],
519+
meta: FileLogMeta {
520+
module: "test.ts".to_string(),
521+
line: 1,
522+
column: 1,
523+
},
524+
};
525+
writeln!(file, "{}", serde_json::to_string(&valid_entry).unwrap()).unwrap();
526+
527+
// Unparseable entry (invalid JSON)
528+
writeln!(file, "{{invalid json}}").unwrap();
529+
530+
// Entry with invalid timestamp
531+
writeln!(
532+
file,
533+
r#"{{"id":"log-bad-time","subgraphId":"QmTest","timestamp":"not-a-timestamp","level":"info","text":"Bad timestamp","arguments":[],"meta":{{"module":"test.ts","line":2,"column":1}}}}"#
534+
)
535+
.unwrap();
536+
drop(file);
537+
538+
// Run cleanup
539+
store.cleanup_old_logs(&file_path).unwrap();
540+
541+
// Read file contents directly
542+
let file_contents = std::fs::read_to_string(&file_path).unwrap();
543+
let lines: Vec<&str> = file_contents.lines().collect();
544+
545+
// All 3 entries should be kept (don't delete on error)
546+
assert_eq!(lines.len(), 3);
547+
}
548+
549+
#[tokio::test]
550+
async fn test_startup_cleanup() {
551+
use chrono::{Duration, Utc};
552+
553+
let temp_dir = TempDir::new().unwrap();
554+
555+
// Create a log file with old entries before initializing the store
556+
let file_path = temp_dir.path().join("QmTestStartup.jsonl");
557+
let mut file = File::create(&file_path).unwrap();
558+
559+
// Old entry (48 hours ago)
560+
let old_timestamp =
561+
(Utc::now() - Duration::hours(48)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
562+
let old_entry = FileLogDocument {
563+
id: "log-old".to_string(),
564+
subgraph_id: "QmTestStartup".to_string(),
565+
timestamp: old_timestamp,
566+
level: "info".to_string(),
567+
text: "Old log entry".to_string(),
568+
arguments: vec![],
569+
meta: FileLogMeta {
570+
module: "test.ts".to_string(),
571+
line: 1,
572+
column: 1,
573+
},
574+
};
575+
writeln!(file, "{}", serde_json::to_string(&old_entry).unwrap()).unwrap();
576+
577+
// New entry (12 hours ago)
578+
let new_timestamp =
579+
(Utc::now() - Duration::hours(12)).to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
580+
let new_entry = FileLogDocument {
581+
id: "log-new".to_string(),
582+
subgraph_id: "QmTestStartup".to_string(),
583+
timestamp: new_timestamp,
584+
level: "info".to_string(),
585+
text: "New log entry".to_string(),
586+
arguments: vec![],
587+
meta: FileLogMeta {
588+
module: "test.ts".to_string(),
589+
line: 2,
590+
column: 1,
591+
},
592+
};
593+
writeln!(file, "{}", serde_json::to_string(&new_entry).unwrap()).unwrap();
594+
drop(file);
595+
596+
// Initialize store with 24-hour retention - should cleanup on startup
597+
let store = FileLogStore::new(temp_dir.path().to_path_buf(), 24).unwrap();
598+
599+
// Verify old entry was cleaned up
600+
let query = LogQuery {
601+
subgraph_id: DeploymentHash::new("QmTestStartup").unwrap(),
602+
level: None,
603+
from: None,
604+
to: None,
605+
search: None,
606+
first: 100,
607+
skip: 0,
608+
order_direction: super::super::OrderDirection::Desc,
609+
};
610+
611+
let result = store.query_logs(query).await.unwrap();
612+
assert_eq!(result.len(), 1);
613+
assert_eq!(result[0].id, "log-new");
614+
}
363615
}

0 commit comments

Comments (0)