Skip to content

Commit 688827a

Browse files
committed
graph: Optimize FileLogStore query_logs to stream logs files
Do not load the entire log file into memory; stream it line by line and filter out entries that don't match the query early on.
1 parent 7fe2532 commit 688827a

File tree

1 file changed

+79
-29
lines changed
  • graph/src/components/log_store

1 file changed

+79
-29
lines changed

graph/src/components/log_store/file.rs

Lines changed: 79 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use async_trait::async_trait;
22
use serde::{Deserialize, Serialize};
3+
use std::cmp::Reverse;
4+
use std::collections::BinaryHeap;
35
use std::fs::File;
46
use std::io::{BufRead, BufReader};
57
use std::path::PathBuf;
@@ -95,6 +97,32 @@ impl FileLogStore {
9597
}
9698
}
9799

100+
/// Helper struct to enable timestamp-based comparisons for BinaryHeap
101+
/// Implements Ord based on timestamp field for maintaining a min-heap of recent entries
102+
struct TimestampedEntry {
103+
entry: LogEntry,
104+
}
105+
106+
impl PartialEq for TimestampedEntry {
107+
fn eq(&self, other: &Self) -> bool {
108+
self.entry.timestamp == other.entry.timestamp
109+
}
110+
}
111+
112+
impl Eq for TimestampedEntry {}
113+
114+
impl PartialOrd for TimestampedEntry {
115+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
116+
Some(self.cmp(other))
117+
}
118+
}
119+
120+
impl Ord for TimestampedEntry {
121+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
122+
self.entry.timestamp.cmp(&other.entry.timestamp)
123+
}
124+
}
125+
98126
#[async_trait]
99127
impl LogStore for FileLogStore {
100128
async fn query_logs(&self, query: LogQuery) -> Result<Vec<LogEntry>, LogStoreError> {
@@ -105,45 +133,67 @@ impl LogStore for FileLogStore {
105133
}
106134

107135
let file = File::open(&file_path).map_err(|e| LogStoreError::QueryFailed(e.into()))?;
108-
109136
let reader = BufReader::new(file);
110-
let mut entries = Vec::new();
111-
let mut skipped = 0;
112-
113-
// Read all lines and collect matching entries
114-
// Note: For large files, this loads everything into memory
115-
// A production implementation would use reverse iteration or indexing
116-
let all_entries: Vec<LogEntry> = reader
117-
.lines()
118-
.filter_map(|line| line.ok())
119-
.filter_map(|line| self.parse_line(&line))
120-
.collect();
121137

122-
// Sort by timestamp descending (most recent first)
123-
let mut sorted_entries = all_entries;
124-
sorted_entries.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
138+
// Calculate how many entries we need to keep in memory
139+
// We need skip + first entries to handle pagination
140+
let needed_entries = (query.skip + query.first) as usize;
141+
142+
// Use a min-heap (via Reverse) to maintain only the top N most recent entries
143+
// This bounds memory usage to O(skip + first) instead of O(total_log_entries)
144+
let mut top_entries: BinaryHeap<Reverse<TimestampedEntry>> =
145+
BinaryHeap::with_capacity(needed_entries + 1);
146+
147+
// Stream through the file line-by-line, applying filters and maintaining bounded collection
148+
for line in reader.lines() {
149+
// Skip malformed lines
150+
let line = match line {
151+
Ok(l) => l,
152+
Err(_) => continue,
153+
};
125154

126-
// Apply filters and pagination
127-
for entry in sorted_entries {
128-
if !self.matches_filters(&entry, &query) {
129-
continue;
130-
}
155+
// Parse the line into a LogEntry
156+
let entry = match self.parse_line(&line) {
157+
Some(e) => e,
158+
None => continue,
159+
};
131160

132-
// Skip the first N entries
133-
if skipped < query.skip {
134-
skipped += 1;
161+
// Apply filters early to avoid keeping filtered-out entries in memory
162+
if !self.matches_filters(&entry, &query) {
135163
continue;
136164
}
137165

138-
entries.push(entry);
139-
140-
// Stop once we have enough entries
141-
if entries.len() >= query.first as usize {
142-
break;
166+
let timestamped = TimestampedEntry { entry };
167+
168+
// Maintain only the top N most recent entries by timestamp
169+
// BinaryHeap with Reverse creates a min-heap, so we can efficiently
170+
// keep the N largest (most recent) timestamps
171+
if top_entries.len() < needed_entries {
172+
top_entries.push(Reverse(timestamped));
173+
} else if let Some(Reverse(oldest)) = top_entries.peek() {
174+
// If this entry is more recent than the oldest in our heap, replace it
175+
if timestamped.entry.timestamp > oldest.entry.timestamp {
176+
top_entries.pop();
177+
top_entries.push(Reverse(timestamped));
178+
}
143179
}
144180
}
145181

146-
Ok(entries)
182+
// Convert heap to sorted vector (most recent first)
183+
let mut result: Vec<LogEntry> = top_entries
184+
.into_iter()
185+
.map(|Reverse(te)| te.entry)
186+
.collect();
187+
188+
// Sort by timestamp descending (most recent first)
189+
result.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
190+
191+
// Apply skip and take to get the final page
192+
Ok(result
193+
.into_iter()
194+
.skip(query.skip as usize)
195+
.take(query.first as usize)
196+
.collect())
147197
}
148198

149199
fn is_available(&self) -> bool {

0 commit comments

Comments
 (0)