diff --git a/Cargo.lock b/Cargo.lock
index 5ca0853a79ae8..232b2ccfbdebe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12539,6 +12539,7 @@ dependencies = [
  "console-subscriber",
  "criterion",
  "csv",
+ "dashmap",
  "databend-client",
  "deadpool",
  "derivative",
diff --git a/Cargo.toml b/Cargo.toml
index a24afe7d3fa59..6893e7940c5a3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -366,6 +366,7 @@ chrono.workspace = true
 chrono-tz.workspace = true
 colored.workspace = true
 csv = { version = "1.3", default-features = false }
+dashmap.workspace = true
 databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true }
 derivative.workspace = true
 dirs-next = { version = "2.0.0", default-features = false, optional = true }
diff --git a/changelog.d/docker-logs-checkpointing.feature.md b/changelog.d/docker-logs-checkpointing.feature.md
new file mode 100644
index 0000000000000..bb0fb6c6c8ed4
--- /dev/null
+++ b/changelog.d/docker-logs-checkpointing.feature.md
@@ -0,0 +1,3 @@
+Added checkpointing to the `docker_logs` source. Vector now resumes log collection from where it left off after a restart instead of only collecting new logs. On first start (when no checkpoint exists), all available historical logs are collected by default; set `since_now: true` to capture only logs produced after Vector starts. Previously, Vector only captured logs emitted after it started and never collected historical logs.
+
+authors: vincentbernat
diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs
index bb44e112e05b8..002d816404343 100644
--- a/src/internal_events/docker_logs.rs
+++ b/src/internal_events/docker_logs.rs
@@ -7,6 +7,30 @@ use vector_lib::{
     json_size::JsonSize,
 };
 
+#[derive(Debug, NamedInternalEvent)]
+pub struct DockerLogsCheckpointWriteError {
+    pub error: std::io::Error,
+}
+
+impl InternalEvent for DockerLogsCheckpointWriteError {
+    fn emit(self) {
+        error!(
+            message = "Failed writing docker_logs checkpoints.",
+            error = %self.error,
+            error_code = "writing_checkpoints",
+            error_type = error_type::WRITER_FAILED,
+            stage = error_stage::RECEIVING,
+        );
+        counter!(
+            "component_errors_total",
+            "error_code" => "writing_checkpoints",
+            "error_type" => error_type::WRITER_FAILED,
+            "stage" => error_stage::RECEIVING,
+        )
+        .increment(1);
+    }
+}
+
 #[derive(Debug, NamedInternalEvent)]
 pub struct DockerLogsEventsReceived<'a> {
     pub byte_size: JsonSize,
diff --git a/src/sources/docker_logs/checkpointer.rs b/src/sources/docker_logs/checkpointer.rs
new file mode 100644
index 0000000000000..76e4db09d637a
--- /dev/null
+++ b/src/sources/docker_logs/checkpointer.rs
@@ -0,0 +1,235 @@
+use std::{
+    collections::BTreeSet,
+    io,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use chrono::{DateTime, FixedOffset, Utc};
+use dashmap::DashMap;
+use serde::{Deserialize, Serialize};
+use tokio::{fs, sync::Mutex};
+use tracing::{error, info, warn};
+
+const TMP_FILE_NAME: &str = "checkpoints.new.json";
+const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
+const CHECKPOINT_EXPIRY: chrono::Duration = chrono::Duration::days(7);
+
+/// This enum represents the file format of checkpoints persisted to disk. Right
+/// now there is only one variant.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(tag = "version", rename_all = "snake_case")]
+enum State {
+    #[serde(rename = "1")]
+    V1 {
+        checkpoints: BTreeSet<ContainerCheckpoint>,
+    },
+}
+
+/// A container checkpoint mapping container ID to last log timestamp.
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
+#[serde(rename_all = "snake_case")]
+struct ContainerCheckpoint {
+    container_id: String,
+    last_log_timestamp: DateTime<FixedOffset>,
+    modified: DateTime<Utc>,
+}
+
+pub(super) struct Checkpointer {
+    tmp_file_path: PathBuf,
+    stable_file_path: PathBuf,
+    checkpoints: Arc<CheckpointsView>,
+    last: Mutex<Option<State>>,
+}
+
+/// A thread-safe handle for reading and writing checkpoints in memory across
+/// multiple threads.
+#[derive(Debug, Default)]
+pub(super) struct CheckpointsView {
+    checkpoints: DashMap<String, DateTime<FixedOffset>>,
+    modified_times: DashMap<String, DateTime<Utc>>,
+}
+
+impl CheckpointsView {
+    pub(super) fn update(&self, container_id: &str, timestamp: DateTime<FixedOffset>) {
+        self.checkpoints.insert(container_id.to_string(), timestamp);
+        self.modified_times
+            .insert(container_id.to_string(), Utc::now());
+    }
+
+    pub(super) fn get(&self, container_id: &str) -> Option<DateTime<FixedOffset>> {
+        self.checkpoints.get(container_id).map(|r| *r.value())
+    }
+
+    pub(super) fn remove_expired(&self) {
+        let now = Utc::now();
+
+        // Collect all of the expired keys first: removing them while iterating can
+        // lead to deadlocks, the set should be small, and this is not a
+        // performance-sensitive path.
+        let to_remove = self
+            .modified_times
+            .iter()
+            .filter(|entry| {
+                let ts = entry.value();
+                let duration = now - *ts;
+                duration >= CHECKPOINT_EXPIRY
+            })
+            .map(|entry| entry.key().clone())
+            .collect::<Vec<_>>();
+
+        for key in to_remove {
+            self.checkpoints.remove(&key);
+            self.modified_times.remove(&key);
+        }
+    }
+
+    fn load(&self, checkpoint: ContainerCheckpoint) {
+        self.checkpoints.insert(
+            checkpoint.container_id.clone(),
+            checkpoint.last_log_timestamp,
+        );
+        self.modified_times
+            .insert(checkpoint.container_id, checkpoint.modified);
+    }
+
+    fn set_state(&self, state: State) {
+        match state {
+            State::V1 { checkpoints } => {
+                for checkpoint in checkpoints {
+                    self.load(checkpoint);
+                }
+            }
+        }
+    }
+
+    fn get_state(&self) -> State {
+        State::V1 {
+            checkpoints: self
+                .checkpoints
+                .iter()
+                .map(|entry| {
+                    let container_id = entry.key();
+                    let last_log_timestamp = entry.value();
+                    ContainerCheckpoint {
+                        container_id: container_id.clone(),
+                        last_log_timestamp: *last_log_timestamp,
+                        modified: self
+                            .modified_times
+                            .get(container_id)
+                            .map(|r| *r.value())
+                            .unwrap_or_else(Utc::now),
+                    }
+                })
+                .collect(),
+        }
+    }
+}
+
+impl Checkpointer {
+    pub(super) fn new(data_dir: &Path) -> Checkpointer {
+        let tmp_file_path = data_dir.join(TMP_FILE_NAME);
+        let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
+
+        Checkpointer {
+            tmp_file_path,
+            stable_file_path,
+            checkpoints: Arc::new(CheckpointsView::default()),
+            last: Mutex::new(None),
+        }
+    }
+
+    pub(super) fn view(&self) -> Arc<CheckpointsView> {
+        Arc::clone(&self.checkpoints)
+    }
+
+    /// Persist the current checkpoints state to disk, making our best effort to
+    /// do so in an atomic way that allows for recovering the previous state in
+    /// the event of a crash.
+    pub(super) async fn write_checkpoints(&self) -> Result<usize, io::Error> {
+        self.checkpoints.remove_expired();
+        let current = self.checkpoints.get_state();
+
+        // Fetch last written state.
+        let mut last = self.last.lock().await;
+        if last.as_ref() != Some(&current) {
+            // Write the new checkpoints to a tmp file and flush it fully to
+            // disk. If vector dies anywhere during this section, the existing
+            // stable file will still be in its current valid state and we'll be
+            // able to recover.
+            let tmp_file_path = self.tmp_file_path.clone();
+
+            // spawn_blocking shouldn't be needed: https://github.com/vectordotdev/vector/issues/23743
+            let current = tokio::task::spawn_blocking(move || -> Result<State, io::Error> {
+                let mut f = std::io::BufWriter::new(std::fs::File::create(tmp_file_path)?);
+                serde_json::to_writer(&mut f, &current)?;
+                f.into_inner()?.sync_all()?;
+                Ok(current)
+            })
+            .await
+            .map_err(io::Error::other)??;
+
+            // Once the temp file is fully flushed, rename the tmp file to replace
+            // the previous stable file. This is an atomic operation on POSIX
+            // systems (and the stdlib claims to provide equivalent behavior on
+            // Windows), which should prevent scenarios where we don't have at least
+            // one full valid file to recover from.
+            fs::rename(&self.tmp_file_path, &self.stable_file_path).await?;
+
+            *last = Some(current);
+        }
+
+        Ok(self.checkpoints.checkpoints.len())
+    }
+
+    /// Read persisted checkpoints from disk, preferring the tmp file (which
+    /// indicates an interrupted checkpoint write) over the stable file.
+    pub(super) fn read_checkpoints(&self) {
+        // First try reading from the tmp file location. If this works, it means
+        // that the previous process was interrupted in the process of
+        // checkpointing and the tmp file should contain more recent data that
+        // should be preferred.
+        match self.read_checkpoints_file(&self.tmp_file_path) {
+            Ok(state) => {
+                warn!(message = "Recovered checkpoint data from interrupted process.");
+                self.checkpoints.set_state(state);
+                self.checkpoints.remove_expired();
+
+                // Try to move this tmp file to the stable location so we don't
+                // immediately overwrite it when we next persist checkpoints.
+                if let Err(error) = std::fs::rename(&self.tmp_file_path, &self.stable_file_path) {
+                    warn!(message = "Error persisting recovered checkpoint file.", %error);
+                }
+                return;
+            }
+            Err(error) if error.kind() == io::ErrorKind::NotFound => {
+                // This is expected, so no warning is needed.
+            }
+            Err(error) => {
+                error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
+            }
+        }
+
+        // Next, attempt to read checkpoints from the stable file location. This
+        // is the expected location, so warn more aggressively if something goes
+        // wrong.
+        match self.read_checkpoints_file(&self.stable_file_path) {
+            Ok(state) => {
+                info!(message = "Loaded checkpoint data.");
+                self.checkpoints.set_state(state);
+                self.checkpoints.remove_expired();
+            }
+            Err(error) if error.kind() == io::ErrorKind::NotFound => {
+                // This is expected, so no warning is needed.
+            }
+            Err(error) => {
+                warn!(message = "Unable to load checkpoint data.", %error);
+            }
+        }
+    }
+
+    fn read_checkpoints_file(&self, path: &Path) -> io::Result<State> {
+        let data = std::fs::read_to_string(path)?;
+        serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
+    }
+}
diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs
index 45268788d45cc..70f9e401512dc 100644
--- a/src/sources/docker_logs/mod.rs
+++ b/src/sources/docker_logs/mod.rs
@@ -2,6 +2,7 @@ use std::{
     collections::HashMap,
     convert::TryFrom,
     future::ready,
+    path::PathBuf,
     pin::Pin,
     sync::{Arc, LazyLock},
     time::Duration,
@@ -46,18 +47,22 @@ use crate::{
     docker::{DockerTlsConfig, docker},
     event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
     internal_events::{
-        DockerLogsCommunicationError, DockerLogsContainerEventReceived,
-        DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
-        DockerLogsContainerWatch, DockerLogsEventsReceived,
+        DockerLogsCheckpointWriteError, DockerLogsCommunicationError,
+        DockerLogsContainerEventReceived, DockerLogsContainerMetadataFetchError,
+        DockerLogsContainerUnwatch, DockerLogsContainerWatch, DockerLogsEventsReceived,
         DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
     },
     line_agg::{self, LineAgg},
     shutdown::ShutdownSignal,
 };
 
+mod checkpointer;
+
 #[cfg(test)]
 mod tests;
 
+use checkpointer::{Checkpointer, CheckpointsView};
+
 const IMAGE: &str = "image";
 const CREATED_AT: &str = "container_created_at";
 const NAME: &str = "container_name";
@@ -65,6 +70,7 @@ const STREAM: &str = "stream";
 const CONTAINER: &str = "container_id";
 // Prevent short hostname from being wrongly recognized as a container's short ID.
 const MIN_HOSTNAME_LENGTH: usize = 6;
+const CHECKPOINT_FLUSH_INTERVAL: Duration = Duration::from_secs(5);
 
 static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
 static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
@@ -172,6 +178,26 @@ pub struct DockerLogsConfig {
     #[configurable(derived)]
     tls: Option<DockerTlsConfig>,
 
+    /// Only include logs produced after Vector starts, instead of all available historical logs.
+    ///
+    /// When `false` (the default), Vector reads all available historical logs from Docker
+    /// on first start (when no checkpoint exists). When `true`, Vector only captures logs
+    /// produced after it starts. On subsequent restarts, Vector always resumes from its
+    /// last checkpoint regardless of this setting.
+    #[serde(default)]
+    since_now: bool,
+
+    /// The directory used to persist checkpoint positions.
+    ///
+    /// By default, the [global `data_dir` option][global_data_dir] is used.
+    /// Make sure the running user has write permissions to this directory.
+    ///
+    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
+    #[serde(default)]
+    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
+    #[configurable(metadata(docs::human_name = "Data Directory"))]
+    data_dir: Option<PathBuf>,
+
     /// The namespace to use for logs. This overrides the global setting.
     #[serde(default)]
     #[configurable(metadata(docs::hidden))]
@@ -192,6 +218,8 @@ impl Default for DockerLogsConfig {
             auto_partial_merge: true,
             multiline: None,
             retry_backoff_secs: default_retry_backoff_secs(),
+            since_now: false,
+            data_dir: None,
             log_namespace: None,
         }
     }
@@ -248,11 +276,21 @@ impl_generate_config_from_default!(DockerLogsConfig);
 impl SourceConfig for DockerLogsConfig {
     async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
         let log_namespace = cx.log_namespace(self.log_namespace);
+
+        let data_dir = cx
+            .globals
+            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
+
+        let checkpointer = Checkpointer::new(&data_dir);
+        checkpointer.read_checkpoints();
+        let checkpoint_view = checkpointer.view();
+
         let source = DockerLogsSource::new(
             self.clone().with_empty_partial_event_marker_field_as_none(),
             cx.out,
             cx.shutdown.clone(),
             log_namespace,
+            Arc::clone(&checkpoint_view),
         )?;
 
         // Capture currently running containers, and do main future(run)
@@ -271,10 +309,33 @@ impl SourceConfig for DockerLogsConfig {
         let shutdown = cx.shutdown;
         // Once this ShutdownSignal resolves it will drop DockerLogsSource and by extension it's ShutdownSignal.
         Ok(Box::pin(async move {
-            Ok(tokio::select! {
+            // Write checkpoints at a regular interval so we do not start from scratch
+            // after an improper shutdown.
+            let checkpoint_shutdown = shutdown.clone();
+            let checkpoint_task = tokio::spawn(async move {
+                let mut interval = tokio::time::interval(CHECKPOINT_FLUSH_INTERVAL);
+                loop {
+                    let done = tokio::select! {
+                        _ = interval.tick() => false,
+                        _ = checkpoint_shutdown.clone() => true,
+                    };
+                    if let Err(error) = checkpointer.write_checkpoints().await {
+                        emit!(DockerLogsCheckpointWriteError { error });
+                    }
+                    if done {
+                        break;
+                    }
+                }
+            });
+
+            tokio::select! {
                 _ = fut => {}
                 _ = shutdown => {}
-            })
+            };
+
+            // Wait for the checkpoint task to finish its final flush.
+            _ = checkpoint_task.await;
+            Ok(())
         }))
     }
 
@@ -385,12 +446,15 @@ impl DockerLogsSourceCore {
         // ? Otherwise connects to unix socket which requires sudo privileges, or docker group membership.
         let docker = docker(config.docker_host.clone(), config.tls.clone())?;
 
-        // Only log events created at-or-after this moment are logged.
         let now = Local::now();
-        info!(
-            message = "Capturing logs from now on.",
-            now = %now.to_rfc3339()
-        );
+        if config.since_now {
+            info!(
+                message = "Capturing logs from now on.",
+                now = %now.to_rfc3339()
+            );
+        } else {
+            info!(message = "Capturing all available logs.");
+        }
 
         let line_agg_config = if let Some(ref multiline_config) = config.multiline {
             Some(line_agg::Config::try_from(multiline_config)?)
@@ -479,6 +543,7 @@ impl DockerLogsSource {
         out: SourceSender,
         shutdown: ShutdownSignal,
         log_namespace: LogNamespace,
+        checkpoints: Arc<CheckpointsView>,
     ) -> crate::Result<DockerLogsSource> {
         let backoff_secs = config.retry_backoff_secs;
 
@@ -515,6 +580,7 @@ impl DockerLogsSource {
             main_send,
             shutdown,
             log_namespace,
+            checkpoints,
         };
 
         Ok(DockerLogsSource {
@@ -592,6 +658,9 @@ impl DockerLogsSource {
                 value = self.main_recv.recv() => {
                     match value {
                         Some(Ok(info)) => {
+                            if let Some((timestamp, _)) = info.last_log.as_ref() {
+                                self.esb.checkpoints.update(info.id.as_str(), *timestamp);
+                            }
                             let state = self
                                 .containers
                                 .get_mut(&info.id)
@@ -734,6 +803,8 @@ struct EventStreamBuilder {
     /// Self and event streams will end on this.
     shutdown: ShutdownSignal,
     log_namespace: LogNamespace,
+    /// Checkpoint view for persisting log positions.
+    checkpoints: Arc<CheckpointsView>,
 }
 
 impl EventStreamBuilder {
@@ -754,7 +825,14 @@ impl EventStreamBuilder {
         {
             Ok(details) => match ContainerMetadata::from_details(details) {
                 Ok(metadata) => {
-                    let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
+                    let created = if let Some(ts) = this.checkpoints.get(id.as_str()) {
+                        ts.with_timezone(&Utc)
+                    } else if this.core.config.since_now {
+                        this.core.now_timestamp
+                    } else {
+                        DateTime::<Utc>::default()
+                    };
+                    let info = ContainerLogInfo::new(id, metadata, created);
                     this.run_event_stream(info).await;
                     return;
                 }
@@ -878,6 +956,11 @@ impl EventStreamBuilder {
             container_id: info.id.as_str()
         });
 
+        // Update the checkpoint before returning info to the main loop.
+        if let Some((timestamp, _)) = info.last_log.as_ref() {
+            self.checkpoints.update(info.id.as_str(), *timestamp);
+        }
+
         let result = match (result, error) {
             (Ok(()), None) => Ok(info),
             (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs
index f5af697c1b9f1..34c1c08cd58b7 100644
--- a/src/sources/docker_logs/tests.rs
+++ b/src/sources/docker_logs/tests.rs
@@ -12,11 +12,13 @@ mod integration_tests {
     /// is not present even though it is not used.
     fn exclude_self() {
         let (tx, _rx) = SourceSender::new_test();
+        let checkpoints = Arc::new(CheckpointsView::default());
         let mut source = DockerLogsSource::new(
             DockerLogsConfig::default(),
             tx,
             ShutdownSignal::noop(),
             LogNamespace::Legacy,
+            checkpoints,
         )
         .unwrap();
         source.hostname = Some("451062c59603".to_owned());
@@ -40,10 +42,12 @@ mod integration_tests {
     use futures::{FutureExt, stream::TryStreamExt};
     use itertools::Itertools as _;
     use similar_asserts::assert_eq;
+    use std::path::Path;
     use vrl::value;
 
     use crate::{
         SourceSender,
+        config::ComponentKey,
         event::Event,
         sources::docker_logs::{CONTAINER, CREATED_AT, IMAGE, NAME, *},
         test_util::{
@@ -69,6 +73,10 @@ mod integration_tests {
     }
 
     async fn source_with_config(config: DockerLogsConfig) -> impl Stream<Item = Event> + Unpin {
+        let mut config = config;
+        let dir = tempfile::tempdir().expect("Failed to create temp dir");
+        config.data_dir = Some(dir.keep());
+        config.since_now = true;
         let (sender, recv) = SourceSender::new_test();
         let source = config
             .build(SourceContext::new_test(sender, None))
@@ -80,6 +88,37 @@ mod integration_tests {
         recv
     }
 
+    /// Source that can be shut down, backed by a persistent data directory.
+    async fn source_with_shutdown(
+        config: DockerLogsConfig,
+        data_dir: &Path,
+    ) -> (
+        impl Stream<Item = Event> + Unpin,
+        tokio::task::JoinHandle<Result<(), ()>>,
+        crate::shutdown::SourceShutdownCoordinator,
+    ) {
+        let mut config = config;
+        config.data_dir = Some(data_dir.to_path_buf());
+        let source_id = ComponentKey::from("docker_logs_test");
+        let (sender, recv) = SourceSender::new_test();
+        let (context, shutdown) = SourceContext::new_shutdown(&source_id, sender);
+        let source = config.build(context).await.unwrap();
+        let handle = tokio::spawn(source);
+        (recv, handle, shutdown)
+    }
+
+    /// Shut down a source cleanly and wait for it to finish.
+    async fn shutdown_source(
+        mut shutdown: crate::shutdown::SourceShutdownCoordinator,
+        handle: tokio::task::JoinHandle<Result<(), ()>>,
+    ) {
+        let source_id = ComponentKey::from("docker_logs_test");
+        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+        let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
+        assert!(shutdown_complete.await, "source did not shut down in time");
+        handle.await.unwrap().unwrap();
+    }
+
     /// Users should ensure to remove container before exiting.
     async fn log_container(
         name: &str,
@@ -976,4 +1015,79 @@ mod integration_tests {
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn checkpoint_persists_across_restarts() {
+        trace_init();
+
+        let name = "vector_test_checkpoint_restart";
+        let docker = docker(None, None).unwrap();
+        let data_dir = tempfile::tempdir().expect("Failed to create temp dir");
+
+        // Long-running container producing numbered lines 1 second apart
+        // so each line has a distinct second-level timestamp (Docker's
+        // `since` parameter has second-level precision).
+        pull_busybox(&docker).await;
+        let id = cmd_container(
+            name,
+            None,
+            vec![
+                "sh",
+                "-c",
+                "i=0; while true; do echo line_$i; sleep 1; i=$((i+1)); done",
+            ],
+            &docker,
+            false,
+        )
+        .await;
+        container_start(&id, &docker).await.unwrap();
+
+        let config = DockerLogsConfig {
+            include_containers: Some(vec![name.to_owned()]),
+            since_now: true,
+            ..DockerLogsConfig::default()
+        };
+
+        // Start the source and collect events.
+        let (out1, handle1, shutdown1) =
+            source_with_shutdown(config.clone(), data_dir.path()).await;
+        let events1 = collect_n(out1, 3).await;
+        let last_line_run1: i32 = events1.last().unwrap().as_log()
+            [log_schema().message_key().unwrap().to_string()]
+        .to_string_lossy()
+        .strip_prefix("line_")
+        .expect("unexpected message format")
+        .parse()
+        .unwrap();
+
+        // Shut down the source; this should create the checkpoint.
+        shutdown_source(shutdown1, handle1).await;
+
+        // Let the container generate more logs.
+        tokio::time::sleep(Duration::from_secs(3)).await;
+
+        // Start a second source using the same data directory.
+        let (out2, handle2, shutdown2) = source_with_shutdown(config, data_dir.path()).await;
+        let events2 = collect_n(out2, 3).await;
+        let first_line_run2: i32 = events2.first().unwrap().as_log()
+            [log_schema().message_key().unwrap().to_string()]
+        .to_string_lossy()
+        .strip_prefix("line_")
+        .expect("unexpected message format")
+        .parse()
+        .unwrap();
+
+        // Shut down the source and stop the container.
+        shutdown_source(shutdown2, handle2).await;
+        _ = container_kill(&id, &docker).await;
+        container_remove(&id, &docker).await;
+
+        // Check we don't have a discontinuity. This may be a bit fragile since Docker's `since`
+        // parameter has second-level precision.
+        assert_eq!(
+            last_line_run1 + 1,
+            first_line_run2,
+            "Discontinuity between last line number of first run and first line number of second run"
+        )
+    }
 }
diff --git a/website/cue/reference/components/sources/docker_logs.cue b/website/cue/reference/components/sources/docker_logs.cue
index 3331b3cdb254d..fed1e77cbd202 100644
--- a/website/cue/reference/components/sources/docker_logs.cue
+++ b/website/cue/reference/components/sources/docker_logs.cue
@@ -49,7 +49,7 @@ components: sources: docker_logs: {
 		acknowledgements: false
 		auto_generated:   true
 		collect: {
-			checkpoint: enabled: false
+			checkpoint: enabled: true
 			from: {
 				service: services.docker
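
A minimal sketch of how the new options might be set in a Vector YAML configuration, based on the `since_now` and `data_dir` settings documented in this diff. The source key `docker_host_logs` is an arbitrary illustrative name, and the path simply mirrors the `docs::examples` value above; neither is taken from the patch itself.

```yaml
sources:
  docker_host_logs:
    type: docker_logs
    # On the very first start (no checkpoint yet), skip historical logs.
    # After a restart, collection always resumes from the persisted
    # checkpoint, regardless of this flag.
    since_now: true
    # Optional override of the global `data_dir`. Checkpoints are written to
    # checkpoints.json (and checkpoints.new.json during a flush) inside a
    # per-component subdirectory of this path.
    data_dir: /var/lib/vector
```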