diff --git a/core/src/timestamp.rs b/core/src/timestamp.rs index f80f88e..6026eb8 100644 --- a/core/src/timestamp.rs +++ b/core/src/timestamp.rs @@ -18,6 +18,7 @@ const DAYS_IN_MONTH: [u8; 12] = [31, 30, 31, 30, 31, 31, 30, 31, 30, 31, 31, 29] #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Timestamp(Duration); +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Parts { pub years: u16, pub months: u8, @@ -50,6 +51,100 @@ impl Timestamp { std::time::SystemTime::UNIX_EPOCH + self.0 } + pub fn from_parts(parts: Parts) -> Option { + let is_leap; + let start_of_year; + let year = (parts.years as i64) - 1900; + + // Fast path for years 1900 - 2038. + if year as u64 <= 138 { + let mut leaps: i64 = (year - 68) >> 2; + if (year - 68).trailing_zeros() >= 2 { + leaps -= 1; + is_leap = true; + } else { + is_leap = false; + } + + start_of_year = i128::from(31_536_000 * (year - 70) + 86400 * leaps); + } else { + let centuries: i64; + let mut leaps: i64; + + let mut cycles: i64 = (year - 100) / 400; + let mut rem: i64 = (year - 100) % 400; + + if rem < 0 { + cycles -= 1; + rem += 400 + } + if rem == 0 { + is_leap = true; + centuries = 0; + leaps = 0; + } else { + if rem >= 200 { + if rem >= 300 { + centuries = 3; + rem -= 300; + } else { + centuries = 2; + rem -= 200; + } + } else if rem >= 100 { + centuries = 1; + rem -= 100; + } else { + centuries = 0; + } + if rem == 0 { + is_leap = false; + leaps = 0; + } else { + leaps = rem / 4; + rem %= 4; + is_leap = rem == 0; + } + } + leaps += 97 * cycles + 24 * centuries - i64::from(is_leap); + + start_of_year = i128::from((year - 100) * 31_536_000) + + i128::from(leaps * 86400 + 946_684_800 + 86400); + } + + let seconds_within_month = 86400 * u32::from(parts.days - 1) + + 3600 * u32::from(parts.hours) + + 60 * u32::from(parts.minutes) + + u32::from(parts.seconds); + + let mut seconds_within_year = [ + 0, // Jan + 31 * 86400, // Feb + 59 * 86400, // Mar + 90 * 86400, // Apr + 120 * 86400, // May + 151 * 86400, // Jun + 181 * 86400, // Jul + 212 * 86400, // Aug + 243 * 86400, // Sep + 273 * 86400, // Oct + 304 * 86400, // Nov + 334 * 86400, // Dec + ][usize::from(parts.months - 1)] + + seconds_within_month; + + if is_leap && parts.months > 2 { + seconds_within_year += 86400 + } + + Timestamp::new(Duration::new( + (start_of_year + i128::from(seconds_within_year)) + .try_into() + .ok()?, + parts.nanos, + )) + } + pub fn to_parts(&self) -> Parts { /* Original implementation: https://github.com/tokio-rs/prost/blob/master/prost-types/src/datetime.rs @@ -199,7 +294,7 @@ fn parse_rfc3339(fmt: &str) -> Result { unimplemented!("non-UTC") } - let years = i64::from_str_radix(&fmt[0..4], 10).unwrap(); + let years = u16::from_str_radix(&fmt[0..4], 10).unwrap(); let months = u8::from_str_radix(&fmt[5..7], 10).unwrap(); let days = u8::from_str_radix(&fmt[8..10], 10).unwrap(); let hours = u8::from_str_radix(&fmt[11..13], 10).unwrap(); @@ -212,97 +307,15 @@ fn parse_rfc3339(fmt: &str) -> Result { 0 }; - let is_leap; - let start_of_year; - let year = years - 1900; - - // Fast path for years 1900 - 2038. - if year as u64 <= 138 { - let mut leaps: i64 = (year - 68) >> 2; - if (year - 68).trailing_zeros() >= 2 { - leaps -= 1; - is_leap = true; - } else { - is_leap = false; - } - - start_of_year = i128::from(31_536_000 * (year - 70) + 86400 * leaps); - } else { - let centuries: i64; - let mut leaps: i64; - - let mut cycles: i64 = (year - 100) / 400; - let mut rem: i64 = (year - 100) % 400; - - if rem < 0 { - cycles -= 1; - rem += 400 - } - if rem == 0 { - is_leap = true; - centuries = 0; - leaps = 0; - } else { - if rem >= 200 { - if rem >= 300 { - centuries = 3; - rem -= 300; - } else { - centuries = 2; - rem -= 200; - } - } else if rem >= 100 { - centuries = 1; - rem -= 100; - } else { - centuries = 0; - } - if rem == 0 { - is_leap = false; - leaps = 0; - } else { - leaps = rem / 4; - rem %= 4; - is_leap = rem == 0; - } - } - leaps += 97 * cycles + 24 * centuries - i64::from(is_leap); - - start_of_year = - i128::from((year - 100) * 31_536_000) + i128::from(leaps * 86400 + 946_684_800 + 86400); - } - - let seconds_within_month = 86400 * u32::from(days - 1) - + 3600 * u32::from(hours) - + 60 * u32::from(minutes) - + u32::from(seconds); - - let mut seconds_within_year = [ - 0, // Jan - 31 * 86400, // Feb - 59 * 86400, // Mar - 90 * 86400, // Apr - 120 * 86400, // May - 151 * 86400, // Jun - 181 * 86400, // Jul - 212 * 86400, // Aug - 243 * 86400, // Sep - 273 * 86400, // Oct - 304 * 86400, // Nov - 334 * 86400, // Dec - ][usize::from(months - 1)] - + seconds_within_month; - - if is_leap && months > 2 { - seconds_within_year += 86400 - } - - Timestamp::new(Duration::new( - (start_of_year + i128::from(seconds_within_year)) - .try_into() - .map_err(|_| ParseTimestampError {})?, + Timestamp::from_parts(Parts { + years, + months, + days, + hours, + minutes, + seconds, nanos, - )) + }) .ok_or_else(|| ParseTimestampError {}) } diff --git a/targets/file/Cargo.toml b/targets/file/Cargo.toml index 2afb552..73cbd6f 100644 --- a/targets/file/Cargo.toml +++ b/targets/file/Cargo.toml @@ -17,10 +17,5 @@ features = ["std"] [dependencies.emit_batcher] path = "../../batcher" -[dependencies.uuid] -version = "1" -features = ["v7"] - -[dependencies.time] -version = "0.3" -features = ["local-offset"] +[dependencies.rand] +version = "0.8" diff --git a/targets/file/src/lib.rs b/targets/file/src/lib.rs index 61be092..cf5e87d 100644 --- a/targets/file/src/lib.rs +++ b/targets/file/src/lib.rs @@ -1,6 +1,6 @@ use std::{ fs, - io::Write, + io::{self, Write}, ops::ControlFlow, path::{Path, PathBuf}, sync::{Arc, Condvar, Mutex}, @@ -17,17 +17,55 @@ pub fn set(file_set: impl AsRef) -> FileSetBuilder { pub struct FileSetBuilder { file_set: PathBuf, + roll_by: RollBy, + max_files: usize, + reuse_files: bool, +} + +#[derive(Debug, Clone, Copy)] +enum RollBy { + Day, + Hour, + Minute, } impl FileSetBuilder { pub fn new(file_set: impl Into) -> Self { FileSetBuilder { file_set: file_set.into(), + roll_by: RollBy::Day, + max_files: 32, + reuse_files: false, } } + pub fn roll_by_day(mut self) -> Self { + self.roll_by = RollBy::Day; + self + } + + pub fn roll_by_hour(mut self) -> Self { + self.roll_by = RollBy::Hour; + self + } + + pub fn roll_by_minute(mut self) -> Self { + self.roll_by = RollBy::Minute; + self + } + + pub fn max_files(mut self, max_files: usize) -> Self { + self.max_files = max_files; + self + } + + pub fn reuse_files(mut self, reuse_files: bool) -> Self { + self.reuse_files = reuse_files; + self + } + pub fn spawn(self) -> Result { - let (dir, prefix, ext) = dir_prefix_ext(self.file_set)?; + let (dir, file_prefix, file_ext) = dir_prefix_ext(self.file_set)?; let (sender, receiver) = emit_batcher::bounded(10_000); @@ -35,18 +73,12 @@ impl FileSetBuilder { let mut active_file = None; let _ = receiver.blocking_exec(|mut batch: Buffer| { - let now = std::time::UNIX_EPOCH.elapsed().unwrap(); - let (secs, nanos) = (now.as_secs(), now.subsec_nanos()); - let ts = emit::Timestamp::new(now).unwrap().to_parts(); - let mut file = match active_file.take() { Some(file) => file, None => { - let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix( - uuid::NoContext, - secs, - nanos, - )); + let now = std::time::UNIX_EPOCH.elapsed().unwrap(); + let ts = emit::Timestamp::new(now).unwrap(); + let parts = ts.to_parts(); let mut path = PathBuf::from(dir.clone()); @@ -54,60 +86,59 @@ impl FileSetBuilder { return Err(emit_batcher::BatchError::retry(e, batch)); } - // TODO: Should probably cache this - let read_dir = match fs::read_dir(&path) { - Ok(read_dir) => read_dir, - Err(e) => return Err(emit_batcher::BatchError::retry(e, batch)), - }; - - let mut file_set = Vec::new(); - for entry in read_dir { - let Ok(entry) = entry else { - continue; - }; - - if let Ok(file_type) = entry.file_type() { - if !file_type.is_file() { - continue; + let file_ts = file_ts(self.roll_by, parts); + + // Apply retention to the file set and see if there's an existing file that can be reused + // Files are only reused if the writer is configured to do so + let reuse_file_name = + match apply_retention(&path, &file_prefix, &file_ext, self.max_files) { + // If there's an existing file and: + // 1. We're configured to reuse files + // 2. The timestamp part of the file matches the current window + // we'll attempt to open and reuse this file + Ok(Some(last_file)) => { + if self.reuse_files + && read_file_ts(&last_file) == Some(&*file_ts) + { + Some(last_file) + } else { + None + } } - } - - let file_name = entry.file_name(); - let Some(file_name) = file_name.to_str() else { - continue; + // In any other case, a new file will be created + Ok(None) => None, + Err(e) => return Err(emit_batcher::BatchError::retry(e, batch)), }; - if file_name.starts_with(&prefix) && file_name.ends_with(&ext) { - file_set.push(file_name.to_owned()); - } - } - file_set.sort_by(|a, b| a.cmp(b).reverse()); - - while file_set.len() >= 4 { + let reuse_file = if let Some(file_name) = reuse_file_name { let mut path = path.clone(); - path.push(file_set.pop().unwrap()); + path.push(file_name); - let _ = fs::remove_file(path); - } + try_open(&path).ok() + } else { + None + }; - path.push(format!( - "{}.{:>04}-{:>02}-{:>02}.{}.{}", - prefix, - ts.years, - ts.months, - ts.days, - id.simple(), - ext - )); - - match fs::OpenOptions::new() - .create_new(true) - .read(false) - .append(true) - .open(path) - { - Ok(file) => file, - Err(e) => return Err(emit_batcher::BatchError::retry(e, batch)), + if let Some(file) = reuse_file { + file + } + // If there's no file to reuse then create a new one + else { + let file_id = + file_id(rolling_millis(self.roll_by, ts, parts), rolling_id()); + + // File format that sorts lexically; newer files sort earlier than older ones + path.push(file_name(&file_prefix, &file_ext, &file_ts, &file_id)); + + match fs::OpenOptions::new() + .create_new(true) + .read(false) + .append(true) + .open(path) + { + Ok(file) => file, + Err(e) => return Err(emit_batcher::BatchError::retry(e, batch)), + } } } }; @@ -137,6 +168,18 @@ impl FileSetBuilder { } } +fn try_open(path: impl AsRef) -> Result { + let mut file = fs::OpenOptions::new().read(false).append(true).open(path)?; + + // Defensive newline to ensure any incomplete event is terminated + // before any new ones are written + // We could avoid filling files with newlines by attempting to read + // one from the end first + file.write_all(b"\n")?; + + Ok(file) +} + fn dir_prefix_ext(file_set: impl AsRef) -> Result<(String, String, String), Error> { let file_set = file_set.as_ref(); @@ -167,6 +210,119 @@ fn dir_prefix_ext(file_set: impl AsRef) -> Result<(String, String, String) Ok((dir, prefix, ext)) } +fn rolling_millis(roll_by: RollBy, ts: emit::Timestamp, parts: emit::timestamp::Parts) -> u32 { + let truncated = match roll_by { + RollBy::Day => emit::Timestamp::from_parts(emit::timestamp::Parts { + years: parts.years, + months: parts.months, + days: parts.days, + ..Default::default() + }) + .unwrap(), + RollBy::Hour => emit::Timestamp::from_parts(emit::timestamp::Parts { + years: parts.years, + months: parts.months, + days: parts.days, + hours: parts.hours, + ..Default::default() + }) + .unwrap(), + RollBy::Minute => emit::Timestamp::from_parts(emit::timestamp::Parts { + years: parts.years, + months: parts.months, + days: parts.days, + hours: parts.hours, + minutes: parts.minutes, + ..Default::default() + }) + .unwrap(), + }; + + ts.duration_since(truncated).unwrap().as_millis() as u32 +} + +fn rolling_id() -> u32 { + rand::random() +} + +fn file_ts(roll_by: RollBy, parts: emit::timestamp::Parts) -> String { + match roll_by { + RollBy::Day => format!( + "{:>04}-{:>02}-{:>02}", + parts.years, parts.months, parts.days, + ), + RollBy::Hour => format!( + "{:>04}-{:>02}-{:>02}-{:>02}", + parts.years, parts.months, parts.days, parts.hours, + ), + RollBy::Minute => format!( + "{:>04}-{:>02}-{:>02}-{:>02}-{:>02}", + parts.years, parts.months, parts.days, parts.hours, parts.minutes, + ), + } +} + +fn file_id(rolling_millis: u32, rolling_id: u32) -> String { + format!("{:<08}.{:<08x}", rolling_millis, rolling_id) +} + +fn read_file_ts(file_name: &str) -> Option<&str> { + file_name.split('.').skip(1).next() +} + +fn read_file_id(file_name: &str) -> Option<&str> { + file_name.split('.').skip(2).next() +} + +fn file_name(file_prefix: &str, file_ext: &str, ts: &str, id: &str) -> String { + format!("{}.{}.{}.{}", file_prefix, ts, id, file_ext) +} + +fn apply_retention( + path: impl Into, + prefix: &str, + ext: &str, + max_files: usize, +) -> Result, io::Error> { + let path = path.into(); + + let read_dir = fs::read_dir(&path)?; + + let mut file_set = Vec::new(); + + for entry in read_dir { + let Ok(entry) = entry else { + continue; + }; + + if let Ok(file_type) = entry.file_type() { + if !file_type.is_file() { + continue; + } + } + + let file_name = entry.file_name(); + let Some(file_name) = file_name.to_str() else { + continue; + }; + + if file_name.starts_with(&prefix) && file_name.ends_with(&ext) { + file_set.push(file_name.to_owned()); + } + } + + file_set.sort_by(|a, b| a.cmp(b).reverse()); + + while file_set.len() >= max_files { + let mut path = path.clone(); + path.push(file_set.pop().unwrap()); + + let _ = fs::remove_file(path); + } + + Ok(file_set.pop()) +} + struct Buffer { bufs: Vec, index: usize, diff --git a/tests/smoke-test/main.rs b/tests/smoke-test/main.rs index 652c95e..d3b2523 100644 --- a/tests/smoke-test/main.rs +++ b/tests/smoke-test/main.rs @@ -36,11 +36,18 @@ async fn main() { .scope("some-scope", "0.1", emit::props! {}) .spawn() .unwrap()) - //.and_to(emit_metrics::plot_metrics_by_count(30, emit_term::stdout())) - .and_to(emit_file::set("./target/logs/log.txt").spawn().unwrap()) + .and_to(emit_metrics::plot_metrics_by_count(30, emit_term::stdout())) + .and_to( + emit_file::set("./target/logs/log.txt") + .reuse_files(true) + .roll_by_minute() + .max_files(6) + .spawn() + .unwrap(), + ) .init(); - //emit::setup().to(emit_term::stdout()).init_internal(); + emit::setup().to(emit_term::stdout()).init_internal(); sample_metrics();