Skip to content

Commit

Permalink
remove expired logs and minor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ragibkl committed Aug 22, 2024
1 parent 70cb612 commit e7b49c3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 44 deletions.
8 changes: 4 additions & 4 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use axum::{
};
use handlebars::Handlebars;

use crate::logs_store::{LogsStore, Query};
use crate::logs_store::{DNSQueryLog, LogsStore};

static GET_LOGS_TEMPLATE: &str = include_str!("./get_logs.hbs");

#[derive(serde::Serialize, Debug, Clone)]
pub struct GetLogsOutput {
ip: String,
queries: Vec<Query>,
queries: Vec<DNSQueryLog>,
}

fn get_ip(addr: SocketAddr) -> String {
Expand All @@ -33,7 +33,7 @@ pub async fn get_logs_api(
tracing::info!("get_logs_api - addr: {addr}");

let ip = get_ip(addr);
let queries = logs_store.get_queries_for_ip(&ip);
let queries = logs_store.get_logs_for_ip(&ip);

Json(GetLogsOutput { ip, queries })
}
Expand All @@ -46,7 +46,7 @@ pub async fn get_logs(
tracing::info!("get_logs - addr: {addr}");

let ip = get_ip(addr);
let queries = logs_store.get_queries_for_ip(&ip);
let queries = logs_store.get_logs_for_ip(&ip);

let reg = Handlebars::new();
let response = reg
Expand Down
82 changes: 43 additions & 39 deletions src/logs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub struct RawLog {
}

#[derive(serde::Serialize, Debug, Clone, PartialEq)]
pub struct Query {
pub struct DNSQueryLog {
ip: String,
query_time: chrono::DateTime<Utc>,
question: String,
answers: Vec<String>,
Expand All @@ -43,9 +44,10 @@ fn parse_query_time(query_time: &str) -> DateTime<Utc> {
query_time.and_utc()
}

fn extract_query(raw_log: &RawLog) -> Query {
let response_message = &raw_log.message.response_message;
fn extract_query(raw_log: &RawLog) -> DNSQueryLog {
let ip = raw_log.message.query_address.to_string();
let query_time = parse_query_time(&raw_log.message.query_time);
let response_message = &raw_log.message.response_message;

let question: String = response_message
.split('\n')
Expand All @@ -65,34 +67,34 @@ fn extract_query(raw_log: &RawLog) -> Query {
.map(|s| s.to_string())
.collect();

Query {
DNSQueryLog {
ip,
query_time,
question,
answers,
}
}

fn extract_queries(content: &str) -> HashMap<String, Vec<Query>> {
let mut logs: Vec<RawLog> = Vec::new();
fn extract_queries(content: &str) -> HashMap<String, Vec<DNSQueryLog>> {
let mut raw_logs: Vec<RawLog> = Vec::new();
for document in serde_yaml::Deserializer::from_str(content) {
let Ok(log) = RawLog::deserialize(document) else {
continue;
};

logs.push(log);
raw_logs.push(log);
}

let mut logs_store: HashMap<String, Vec<Query>> = HashMap::new();
for log in logs.into_iter() {
let ip = log.message.query_address.to_string();
let query = extract_query(&log);
let mut logs_store: HashMap<String, Vec<DNSQueryLog>> = HashMap::new();
for raw_log in raw_logs.into_iter() {
let query_log = extract_query(&raw_log);

match logs_store.get_mut(&ip) {
match logs_store.get_mut(&query_log.ip) {
Some(queries) => {
queries.push(query);
queries.push(query_log);
}
None => {
logs_store.insert(ip, vec![query]);
logs_store.insert(query_log.ip.to_string(), vec![query_log]);
}
}
}
Expand All @@ -102,26 +104,28 @@ fn extract_queries(content: &str) -> HashMap<String, Vec<Query>> {

#[derive(Debug, Clone, Default)]
pub struct LogsStore {
logs_store: Arc<Mutex<HashMap<String, Arc<Mutex<Vec<Query>>>>>>,
logs_store: Arc<Mutex<HashMap<String, Arc<Mutex<Vec<DNSQueryLog>>>>>>,
}

impl LogsStore {
pub fn update_logs(&self, logs: HashMap<String, Vec<Query>>) {
pub fn remove_expired_logs(&self) {
let query_time_cutoff = Utc::now() - Duration::minutes(10);

let logs_store_guard = self.logs_store.lock().unwrap();
for queries in logs_store_guard.values() {
queries
.lock()
.unwrap()
.retain(|q| q.query_time > query_time_cutoff);
}
}

pub fn merge_logs(&self, logs: HashMap<String, Vec<DNSQueryLog>>) {
let mut logs_store_guard = self.logs_store.lock().unwrap();
for (ip, queries) in logs.into_iter() {
match logs_store_guard.get(&ip).cloned() {
Some(existing) => {
let mut filtered = existing
.lock()
.unwrap()
.clone()
.into_iter()
.filter(|q| q.query_time > query_time_cutoff)
.collect::<Vec<_>>();
filtered.extend(queries);
*existing.lock().unwrap() = filtered;
existing.lock().unwrap().extend(queries);
}
None => {
logs_store_guard.insert(ip, Arc::new(Mutex::new(queries)));
Expand All @@ -130,22 +134,21 @@ impl LogsStore {
}
}

pub fn get_queries_for_ip(&self, ip: &str) -> Vec<Query> {
match self.logs_store.lock().unwrap().get(ip) {
Some(v) => v.lock().unwrap().clone(),
None => Vec::new(),
}
}
pub fn ingest_logs_from_file(&self) {
self.remove_expired_logs();

pub fn read_logs(&self) {
let Ok(content) = std::fs::read_to_string("./logs.yaml") else {
return;
};
let content = std::fs::read_to_string("./logs.yaml").unwrap_or_default();
let _ = std::fs::write("./logs.yaml", "");

let logs_store = extract_queries(&content);

self.update_logs(logs_store);
self.merge_logs(logs_store);
}

pub fn get_logs_for_ip(&self, ip: &str) -> Vec<DNSQueryLog> {
match self.logs_store.lock().unwrap().get(ip) {
Some(v) => v.lock().unwrap().clone(),
None => Vec::new(),
}
}
}

Expand All @@ -155,7 +158,7 @@ mod tests {

use chrono::TimeZone;

use crate::logs_store::{parse_query_time, Query};
use crate::logs_store::{parse_query_time, DNSQueryLog};

use super::extract_queries;

Expand Down Expand Up @@ -201,7 +204,8 @@ message:

let expected = HashMap::from([(
"127.0.0.1".to_string(),
vec![Query {
vec![DNSQueryLog {
ip: "127.0.0.1".to_string(),
query_time: chrono::Utc.with_ymd_and_hms(2022, 2, 26, 9, 25, 7).unwrap(),
question: ";zedo.com.IN A".to_string(),
answers: vec![
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ async fn main() -> anyhow::Result<()> {
}

tracing::info!("Reading logs");
cloned_logs_store.read_logs();
cloned_logs_store.ingest_logs_from_file();
tracing::info!("Reading logs. DONE");
}
});
Expand Down

0 comments on commit e7b49c3

Please sign in to comment.