From 01a2181bac149c0b9cf3bd28576181fed0028bb8 Mon Sep 17 00:00:00 2001
From: Vincent Bernat
Date: Sun, 8 Mar 2026 05:44:50 +0100
Subject: [PATCH 1/2] feat(docker_logs source): add checkpointing

As with the file source, the docker_logs source now records checkpoints
to a file at a regular interval (5s) and at shutdown. The checkpointing
logic is mostly stolen from the file source; it may be possible to
abstract it later.

This also introduces a `since_now` option to start from the last
message: when true, the source does not look at old logs. It is false
by default, so this is a behavior change, but I think it is a better
default. We can still switch this if needed.

Fix #7358
---
 Cargo.lock                                    |   1 +
 Cargo.toml                                    |   1 +
 .../docker-logs-checkpointing.feature.md      |   3 +
 src/internal_events/docker_logs.rs            |  24 ++
 src/sources/docker_logs/checkpointer.rs       | 235 ++++++++++++++++++
 src/sources/docker_logs/mod.rs                | 105 +++++++-
 src/sources/docker_logs/tests.rs              | 114 +++++++++
 .../components/sources/docker_logs.cue        |   2 +-
 8 files changed, 473 insertions(+), 12 deletions(-)
 create mode 100644 changelog.d/docker-logs-checkpointing.feature.md
 create mode 100644 src/sources/docker_logs/checkpointer.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5ca0853a79ae8..232b2ccfbdebe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -12539,6 +12539,7 @@ dependencies = [
  "console-subscriber",
  "criterion",
  "csv",
+ "dashmap",
  "databend-client",
  "deadpool",
  "derivative",
diff --git a/Cargo.toml b/Cargo.toml
index a24afe7d3fa59..6893e7940c5a3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -366,6 +366,7 @@ chrono.workspace = true
 chrono-tz.workspace = true
 colored.workspace = true
 csv = { version = "1.3", default-features = false }
+dashmap.workspace = true
 databend-client = { version = "0.28.0", default-features = false, features = ["rustls"], optional = true }
 derivative.workspace = true
 dirs-next = { version = "2.0.0", default-features = false, optional = true }
diff --git a/changelog.d/docker-logs-checkpointing.feature.md b/changelog.d/docker-logs-checkpointing.feature.md
new file mode 100644
index 0000000000000..bb0fb6c6c8ed4
--- /dev/null
+++ b/changelog.d/docker-logs-checkpointing.feature.md
@@ -0,0 +1,3 @@
+Added checkpointing to the `docker_logs` source. Vector now resumes log collection from where it left off after a restart instead of only collecting new logs. On first start (no checkpoint), all available historical logs are collected by default. Set `since_now: true` to only capture logs produced after Vector starts. Previously, Vector did not capture historical logs.
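+
+For example, to collect only logs produced after Vector starts (the source name below is illustrative):
+
+```yaml
+sources:
+  my_docker_logs:
+    type: docker_logs
+    since_now: true
+```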
+
+authors: vincentbernat
diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs
index bb44e112e05b8..002d816404343 100644
--- a/src/internal_events/docker_logs.rs
+++ b/src/internal_events/docker_logs.rs
@@ -7,6 +7,30 @@ use vector_lib::{
     json_size::JsonSize,
 };

+#[derive(Debug, NamedInternalEvent)]
+pub struct DockerLogsCheckpointWriteError {
+    pub error: std::io::Error,
+}
+
+impl InternalEvent for DockerLogsCheckpointWriteError {
+    fn emit(self) {
+        error!(
+            message = "Failed writing docker_logs checkpoints.",
+            error = %self.error,
+            error_code = "writing_checkpoints",
+            error_type = error_type::WRITER_FAILED,
+            stage = error_stage::RECEIVING,
+        );
+        counter!(
+            "component_errors_total",
+            "error_code" => "writing_checkpoints",
+            "error_type" => error_type::WRITER_FAILED,
+            "stage" => error_stage::RECEIVING,
+        )
+        .increment(1);
+    }
+}
+
 #[derive(Debug, NamedInternalEvent)]
 pub struct DockerLogsEventsReceived<'a> {
     pub byte_size: JsonSize,
diff --git a/src/sources/docker_logs/checkpointer.rs b/src/sources/docker_logs/checkpointer.rs
new file mode 100644
index 0000000000000..76e4db09d637a
--- /dev/null
+++ b/src/sources/docker_logs/checkpointer.rs
@@ -0,0 +1,235 @@
+use std::{
+    collections::BTreeSet,
+    io,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use chrono::{DateTime, FixedOffset, Utc};
+use dashmap::DashMap;
+use serde::{Deserialize, Serialize};
+use tokio::{fs, sync::Mutex};
+use tracing::{error, info, warn};
+
+const TMP_FILE_NAME: &str = "checkpoints.new.json";
+const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
+const CHECKPOINT_EXPIRY: chrono::Duration = chrono::Duration::days(7);
+
+/// This enum represents the file format of checkpoints persisted to disk. Right
+/// now there is only one variant.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(tag = "version", rename_all = "snake_case")]
+enum State {
+    #[serde(rename = "1")]
+    V1 {
+        checkpoints: BTreeSet<ContainerCheckpoint>,
+    },
+}
+
+/// A container checkpoint mapping container ID to last log timestamp.
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
+#[serde(rename_all = "snake_case")]
+struct ContainerCheckpoint {
+    container_id: String,
+    last_log_timestamp: DateTime<FixedOffset>,
+    modified: DateTime<Utc>,
+}
+
+pub(super) struct Checkpointer {
+    tmp_file_path: PathBuf,
+    stable_file_path: PathBuf,
+    checkpoints: Arc<CheckpointsView>,
+    last: Mutex<Option<State>>,
+}
+
+/// A thread-safe handle for reading and writing checkpoints in-memory across
+/// multiple threads.
+#[derive(Debug, Default)]
+pub(super) struct CheckpointsView {
+    checkpoints: DashMap<String, DateTime<FixedOffset>>,
+    modified_times: DashMap<String, DateTime<Utc>>,
+}
+
+impl CheckpointsView {
+    pub(super) fn update(&self, container_id: &str, timestamp: DateTime<FixedOffset>) {
+        self.checkpoints.insert(container_id.to_string(), timestamp);
+        self.modified_times
+            .insert(container_id.to_string(), Utc::now());
+    }
+
+    pub(super) fn get(&self, container_id: &str) -> Option<DateTime<FixedOffset>> {
+        self.checkpoints.get(container_id).map(|r| *r.value())
+    }
+
+    pub(super) fn remove_expired(&self) {
+        let now = Utc::now();
+
+        // Collect all of the expired keys first: removing them while iterating
+        // can lead to deadlocks, the set should be small, and this is not a
+        // performance-sensitive path.
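+        //
+        // With CHECKPOINT_EXPIRY at 7 days, a checkpoint that has not been
+        // updated for a week is dropped, and its container is then treated
+        // as one with no checkpoint.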
+        let to_remove = self
+            .modified_times
+            .iter()
+            .filter(|entry| {
+                let ts = entry.value();
+                let duration = now - *ts;
+                duration >= CHECKPOINT_EXPIRY
+            })
+            .map(|entry| entry.key().clone())
+            .collect::<Vec<_>>();
+
+        for key in to_remove {
+            self.checkpoints.remove(&key);
+            self.modified_times.remove(&key);
+        }
+    }
+
+    fn load(&self, checkpoint: ContainerCheckpoint) {
+        self.checkpoints.insert(
+            checkpoint.container_id.clone(),
+            checkpoint.last_log_timestamp,
+        );
+        self.modified_times
+            .insert(checkpoint.container_id, checkpoint.modified);
+    }
+
+    fn set_state(&self, state: State) {
+        match state {
+            State::V1 { checkpoints } => {
+                for checkpoint in checkpoints {
+                    self.load(checkpoint);
+                }
+            }
+        }
+    }
+
+    fn get_state(&self) -> State {
+        State::V1 {
+            checkpoints: self
+                .checkpoints
+                .iter()
+                .map(|entry| {
+                    let container_id = entry.key();
+                    let last_log_timestamp = entry.value();
+                    ContainerCheckpoint {
+                        container_id: container_id.clone(),
+                        last_log_timestamp: *last_log_timestamp,
+                        modified: self
+                            .modified_times
+                            .get(container_id)
+                            .map(|r| *r.value())
+                            .unwrap_or_else(Utc::now),
+                    }
+                })
+                .collect(),
+        }
+    }
+}
+
+impl Checkpointer {
+    pub(super) fn new(data_dir: &Path) -> Checkpointer {
+        let tmp_file_path = data_dir.join(TMP_FILE_NAME);
+        let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
+
+        Checkpointer {
+            tmp_file_path,
+            stable_file_path,
+            checkpoints: Arc::new(CheckpointsView::default()),
+            last: Mutex::new(None),
+        }
+    }
+
+    pub(super) fn view(&self) -> Arc<CheckpointsView> {
+        Arc::clone(&self.checkpoints)
+    }
+
+    /// Persist the current checkpoints state to disk, making our best effort to
+    /// do so in an atomic way that allows for recovering the previous state in
+    /// the event of a crash.
+    pub(super) async fn write_checkpoints(&self) -> Result<usize, io::Error> {
+        self.checkpoints.remove_expired();
+        let current = self.checkpoints.get_state();
+
+        // Fetch last written state.
+        let mut last = self.last.lock().await;
+        if last.as_ref() != Some(&current) {
+            // Write the new checkpoints to a tmp file and flush it fully to
+            // disk. If vector dies anywhere during this section, the existing
+            // stable file will still be in its current valid state and we'll be
+            // able to recover.
+            let tmp_file_path = self.tmp_file_path.clone();
+
+            // spawn_blocking shouldn't be needed: https://github.com/vectordotdev/vector/issues/23743
+            let current = tokio::task::spawn_blocking(move || -> Result<State, io::Error> {
+                let mut f = std::io::BufWriter::new(std::fs::File::create(tmp_file_path)?);
+                serde_json::to_writer(&mut f, &current)?;
+                f.into_inner()?.sync_all()?;
+                Ok(current)
+            })
+            .await
+            .map_err(io::Error::other)??;
+
+            // Once the temp file is fully flushed, rename the tmp file to replace
+            // the previous stable file. This is an atomic operation on POSIX
+            // systems (and the stdlib claims to provide equivalent behavior on
+            // Windows), which should prevent scenarios where we don't have at least
+            // one full valid file to recover from.
+            fs::rename(&self.tmp_file_path, &self.stable_file_path).await?;
+
+            *last = Some(current);
+        }
+
+        Ok(self.checkpoints.checkpoints.len())
+    }
+
+    /// Read persisted checkpoints from disk, preferring the tmp file (which
+    /// indicates an interrupted checkpoint write) over the stable file.
+    pub(super) fn read_checkpoints(&self) {
+        // First try reading from the tmp file location. If this works, it means
+        // that the previous process was interrupted in the process of
+        // checkpointing and the tmp file should contain more recent data that
+        // should be preferred.
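+        //
+        // This mirrors the write path in `write_checkpoints`: a leftover tmp
+        // file only exists if the previous process died after creating it and
+        // before renaming it over the stable file.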
+        match self.read_checkpoints_file(&self.tmp_file_path) {
+            Ok(state) => {
+                warn!(message = "Recovered checkpoint data from interrupted process.");
+                self.checkpoints.set_state(state);
+                self.checkpoints.remove_expired();
+
+                // Try to move this tmp file to the stable location so we don't
+                // immediately overwrite it when we next persist checkpoints.
+                if let Err(error) = std::fs::rename(&self.tmp_file_path, &self.stable_file_path) {
+                    warn!(message = "Error persisting recovered checkpoint file.", %error);
+                }
+                return;
+            }
+            Err(error) if error.kind() == io::ErrorKind::NotFound => {
+                // This is expected, so no warning needed
+            }
+            Err(error) => {
+                error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
+            }
+        }
+
+        // Next, attempt to read checkpoints from the stable file location. This
+        // is the expected location, so warn more aggressively if something goes
+        // wrong.
+        match self.read_checkpoints_file(&self.stable_file_path) {
+            Ok(state) => {
+                info!(message = "Loaded checkpoint data.");
+                self.checkpoints.set_state(state);
+                self.checkpoints.remove_expired();
+            }
+            Err(error) if error.kind() == io::ErrorKind::NotFound => {
+                // This is expected, so no warning needed
+            }
+            Err(error) => {
+                warn!(message = "Unable to load checkpoint data.", %error);
+            }
+        }
+    }
+
+    fn read_checkpoints_file(&self, path: &Path) -> io::Result<State> {
+        let data = std::fs::read_to_string(path)?;
+        serde_json::from_str(&data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
+    }
+}
diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs
index 45268788d45cc..70f9e401512dc 100644
--- a/src/sources/docker_logs/mod.rs
+++ b/src/sources/docker_logs/mod.rs
@@ -2,6 +2,7 @@ use std::{
     collections::HashMap,
     convert::TryFrom,
     future::ready,
+    path::PathBuf,
     pin::Pin,
     sync::{Arc, LazyLock},
     time::Duration,
@@ -46,18 +47,22 @@ use crate::{
     docker::{DockerTlsConfig, docker},
     event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
     internal_events::{
-        DockerLogsCommunicationError, DockerLogsContainerEventReceived,
-        DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
-        DockerLogsContainerWatch, DockerLogsEventsReceived,
+        DockerLogsCheckpointWriteError, DockerLogsCommunicationError,
+        DockerLogsContainerEventReceived, DockerLogsContainerMetadataFetchError,
+        DockerLogsContainerUnwatch, DockerLogsContainerWatch, DockerLogsEventsReceived,
         DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
     },
     line_agg::{self, LineAgg},
     shutdown::ShutdownSignal,
 };

+mod checkpointer;
+
 #[cfg(test)]
 mod tests;

+use checkpointer::{Checkpointer, CheckpointsView};
+
 const IMAGE: &str = "image";
 const CREATED_AT: &str = "container_created_at";
 const NAME: &str = "container_name";
@@ -65,6 +70,7 @@ const STREAM: &str = "stream";
 const CONTAINER: &str = "container_id";
 // Prevent short hostname from being wrongly recognized as a container's short ID.
 const MIN_HOSTNAME_LENGTH: usize = 6;
+const CHECKPOINT_FLUSH_INTERVAL: Duration = Duration::from_secs(5);

 static STDERR: LazyLock<Bytes> = LazyLock::new(|| "stderr".into());
 static STDOUT: LazyLock<Bytes> = LazyLock::new(|| "stdout".into());
@@ -172,6 +178,26 @@ pub struct DockerLogsConfig {
     #[configurable(derived)]
     tls: Option<DockerTlsConfig>,

+    /// Only include logs produced after Vector starts.
+    ///
+    /// When `false` (the default), Vector reads all available historical logs from Docker
+    /// on first start (when no checkpoint exists).
+    /// When `true`, Vector only captures logs produced after it starts. On subsequent
+    /// restarts, Vector always resumes from its last checkpoint regardless of this
+    /// setting.
+    #[serde(default)]
+    since_now: bool,
+
+    /// The directory used to persist checkpoint positions.
+    ///
+    /// By default, the [global `data_dir` option][global_data_dir] is used.
+    /// Make sure the running user has write permissions to this directory.
+    ///
+    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
+    #[serde(default)]
+    #[configurable(metadata(docs::examples = "/var/lib/vector"))]
+    #[configurable(metadata(docs::human_name = "Data Directory"))]
+    data_dir: Option<PathBuf>,
+
     /// The namespace to use for logs. This overrides the global setting.
     #[serde(default)]
     #[configurable(metadata(docs::hidden))]
     log_namespace: Option<bool>,
@@ -192,6 +218,8 @@ impl Default for DockerLogsConfig {
             auto_partial_merge: true,
             multiline: None,
             retry_backoff_secs: default_retry_backoff_secs(),
+            since_now: false,
+            data_dir: None,
             log_namespace: None,
         }
     }
@@ -248,11 +276,21 @@ impl_generate_config_from_default!(DockerLogsConfig);
 impl SourceConfig for DockerLogsConfig {
     async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
         let log_namespace = cx.log_namespace(self.log_namespace);
+
+        let data_dir = cx
+            .globals
+            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
+
+        let checkpointer = Checkpointer::new(&data_dir);
+        checkpointer.read_checkpoints();
+        let checkpoint_view = checkpointer.view();
+
         let source = DockerLogsSource::new(
             self.clone().with_empty_partial_event_marker_field_as_none(),
             cx.out,
             cx.shutdown.clone(),
             log_namespace,
+            Arc::clone(&checkpoint_view),
         )?;

         // Capture currently running containers, and do main future(run)
@@ -271,10 +309,33 @@ impl SourceConfig for DockerLogsConfig {
         let shutdown = cx.shutdown;
         // Once this ShutdownSignal resolves it will drop DockerLogsSource and by extension it's ShutdownSignal.
         Ok(Box::pin(async move {
-            Ok(tokio::select! {
+            // Write checkpoints at a regular interval so we don't start from
+            // scratch after an improper shutdown.
+            let checkpoint_shutdown = shutdown.clone();
+            let checkpoint_task = tokio::spawn(async move {
+                let mut interval = tokio::time::interval(CHECKPOINT_FLUSH_INTERVAL);
+                loop {
+                    let done = tokio::select! {
+                        _ = interval.tick() => false,
+                        _ = checkpoint_shutdown.clone() => true,
+                    };
+                    if let Err(error) = checkpointer.write_checkpoints().await {
+                        emit!(DockerLogsCheckpointWriteError { error });
+                    }
+                    if done {
+                        break;
+                    }
+                }
+            });
+
+            tokio::select! {
                 _ = fut => {}
                 _ = shutdown => {}
-            })
+            };
+
+            // Wait for the checkpoint task to finish its final flush.
+            _ = checkpoint_task.await;
+            Ok(())
         }))
     }
@@ -385,12 +446,15 @@ impl DockerLogsSourceCore {
         // ? Otherwise connects to unix socket which requires sudo privileges, or docker group membership.
         let docker = docker(config.docker_host.clone(), config.tls.clone())?;

-        // Only log events created at-or-after this moment are logged.
         let now = Local::now();
-        info!(
-            message = "Capturing logs from now on.",
-            now = %now.to_rfc3339()
-        );
+        if config.since_now {
+            info!(
+                message = "Capturing logs from now on.",
+                now = %now.to_rfc3339()
+            );
+        } else {
+            info!(message = "Capturing all available logs.");
+        }

         let line_agg_config = if let Some(ref multiline_config) = config.multiline {
             Some(line_agg::Config::try_from(multiline_config)?)
@@ -479,6 +543,7 @@ impl DockerLogsSource {
         out: SourceSender,
         shutdown: ShutdownSignal,
         log_namespace: LogNamespace,
+        checkpoints: Arc<CheckpointsView>,
     ) -> crate::Result<DockerLogsSource> {
         let backoff_secs = config.retry_backoff_secs;
@@ -515,6 +580,7 @@ impl DockerLogsSource {
             main_send,
             shutdown,
             log_namespace,
+            checkpoints,
         };

         Ok(DockerLogsSource {
@@ -592,6 +658,9 @@ impl DockerLogsSource {
                 value = self.main_recv.recv() => {
                     match value {
                         Some(Ok(info)) => {
+                            if let Some((timestamp, _)) = info.last_log.as_ref() {
+                                self.esb.checkpoints.update(info.id.as_str(), *timestamp);
+                            }
                             let state = self
                                 .containers
                                 .get_mut(&info.id)
@@ -734,6 +803,8 @@ struct EventStreamBuilder {
     /// Self and event streams will end on this.
     shutdown: ShutdownSignal,
     log_namespace: LogNamespace,
+    /// Checkpoint view for persisting log positions
+    checkpoints: Arc<CheckpointsView>,
 }

 impl EventStreamBuilder {
@@ -754,7 +825,14 @@ impl EventStreamBuilder {
         {
             Ok(details) => match ContainerMetadata::from_details(details) {
                 Ok(metadata) => {
-                    let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp);
+                    let created = if let Some(ts) = this.checkpoints.get(id.as_str()) {
+                        ts.with_timezone(&Utc)
+                    } else if this.core.config.since_now {
+                        this.core.now_timestamp
+                    } else {
+                        DateTime::<Utc>::default()
+                    };
+                    let info = ContainerLogInfo::new(id, metadata, created);
                     this.run_event_stream(info).await;
                     return;
                 }
@@ -878,6 +956,11 @@ impl EventStreamBuilder {
                 container_id: info.id.as_str()
             });

+            // Update checkpoint before returning info to main loop
+            if let Some((timestamp, _)) = info.last_log.as_ref() {
+                self.checkpoints.update(info.id.as_str(), *timestamp);
+            }
+
             let result = match (result, error) {
                 (Ok(()), None) => Ok(info),
                 (Err(()), _) => Err((info.id, ErrorPersistence::Permanent)),
diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs
index f5af697c1b9f1..1c3e3a6d26718 100644
--- a/src/sources/docker_logs/tests.rs
+++ b/src/sources/docker_logs/tests.rs
@@ -12,11 +12,13 @@ mod integration_tests {
     /// is not present even though it is not used.
     fn exclude_self() {
         let (tx, _rx) = SourceSender::new_test();
+        let checkpoints = Arc::new(CheckpointsView::default());
         let mut source = DockerLogsSource::new(
             DockerLogsConfig::default(),
             tx,
             ShutdownSignal::noop(),
             LogNamespace::Legacy,
+            checkpoints,
         )
         .unwrap();
         source.hostname = Some("451062c59603".to_owned());
@@ -40,10 +42,12 @@ mod integration_tests {
     use futures::{FutureExt, stream::TryStreamExt};
     use itertools::Itertools as _;
     use similar_asserts::assert_eq;
+    use std::path::Path;
     use vrl::value;

     use crate::{
         SourceSender,
+        config::ComponentKey,
         event::Event,
         sources::docker_logs::{CONTAINER, CREATED_AT, IMAGE, NAME, *},
         test_util::{
@@ -69,6 +73,10 @@ mod integration_tests {
     }

     async fn source_with_config(config: DockerLogsConfig) -> impl Stream<Item = Event> + Unpin {
+        let mut config = config;
+        let dir = tempfile::tempdir().expect("Failed to create temp dir");
+        config.data_dir = Some(dir.keep());
+        config.since_now = true;
         let (sender, recv) = SourceSender::new_test();
         let source = config
             .build(SourceContext::new_test(sender, None))
@@ -80,6 +88,37 @@ mod integration_tests {
         recv
     }

+    /// Source that can be shut down, with a persistent data directory.
+    async fn source_with_shutdown(
+        config: DockerLogsConfig,
+        data_dir: &Path,
+    ) -> (
+        impl Stream<Item = Event> + Unpin,
+        tokio::task::JoinHandle<Result<(), ()>>,
+        crate::shutdown::SourceShutdownCoordinator,
+    ) {
+        let mut config = config;
+        config.data_dir = Some(data_dir.to_path_buf());
+        let source_id = ComponentKey::from("docker_logs_test");
+        let (sender, recv) = SourceSender::new_test();
+        let (context, shutdown) = SourceContext::new_shutdown(&source_id, sender);
+        let source = config.build(context).await.unwrap();
+        let handle = tokio::spawn(source);
+        (recv, handle, shutdown)
+    }
+
+    /// Shut down a source cleanly and wait for it to finish.
+    async fn shutdown_source(
+        mut shutdown: crate::shutdown::SourceShutdownCoordinator,
+        handle: tokio::task::JoinHandle<Result<(), ()>>,
+    ) {
+        let source_id = ComponentKey::from("docker_logs_test");
+        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+        let shutdown_complete = shutdown.shutdown_source(&source_id, deadline.into());
+        assert!(shutdown_complete.await, "source did not shut down in time");
+        handle.await.unwrap().unwrap();
+    }
+
     /// Users should ensure to remove container before exiting.
     async fn log_container(
         name: &str,
@@ -976,4 +1015,79 @@ mod integration_tests {
         })
         .await;
     }
+
+    #[tokio::test]
+    async fn checkpoint_persists_across_restarts() {
+        trace_init();
+
+        let name = "vector_test_checkpoint_restart";
+        let docker = docker(None, None).unwrap();
+        let data_dir = tempfile::tempdir().expect("Failed to create temp dir");
+
+        // Long-running container producing numbered lines 1 second apart
+        // so each line has a distinct second-level timestamp (Docker's
+        // `since` parameter has second-level precision).
+        pull_busybox(&docker).await;
+        let id = cmd_container(
+            name,
+            None,
+            vec![
+                "sh",
+                "-c",
+                "i=0; while true; do echo line_$i; sleep 1; i=$((i+1)); done",
+            ],
+            &docker,
+            false,
+        )
+        .await;
+        container_start(&id, &docker).await.unwrap();
+
+        let config = DockerLogsConfig {
+            include_containers: Some(vec![name.to_owned()]),
+            since_now: true,
+            ..DockerLogsConfig::default()
+        };
+
+        // Start the source and collect events.
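+        // The container emits one line per second, so collecting three
+        // events takes a few seconds.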
+        let (out1, handle1, shutdown1) =
+            source_with_shutdown(config.clone(), data_dir.path()).await;
+        let events1 = collect_n(out1, 3).await;
+        let last_line_run1: i32 = events1.last().unwrap().as_log()
+            [log_schema().message_key().unwrap().to_string()]
+        .to_string_lossy()
+        .strip_prefix("line_")
+        .expect("unexpected message format")
+        .parse()
+        .unwrap();
+
+        // Shut down the source; this should create the checkpoint.
+        shutdown_source(shutdown1, handle1).await;
+
+        // Let the container generate more logs.
+        tokio::time::sleep(Duration::from_secs(3)).await;
+
+        // Start a second source using the same data directory.
+        let (out2, handle2, shutdown2) = source_with_shutdown(config, data_dir.path()).await;
+        let events2 = collect_n(out2, 3).await;
+        let first_line_run2: i32 = events2.first().unwrap().as_log()
+            [log_schema().message_key().unwrap().to_string()]
+        .to_string_lossy()
+        .strip_prefix("line_")
+        .expect("unexpected message format")
+        .parse()
+        .unwrap();
+
+        // Shut down the source and stop the container.
+        shutdown_source(shutdown2, handle2).await;
+        _ = container_kill(&id, &docker).await;
+        container_remove(&id, &docker).await;
+
+        // Check we don't have a discontinuity. This may be a bit fragile since Docker's `since`
+        // parameter has second-level precision.
+        assert_eq!(
+            last_line_run1 + 1,
+            first_line_run2,
+            "Discontinuity between last line number of first run and first line number of second run"
+        )
+    }
 }
diff --git a/website/cue/reference/components/sources/docker_logs.cue b/website/cue/reference/components/sources/docker_logs.cue
index 3331b3cdb254d..fed1e77cbd202 100644
--- a/website/cue/reference/components/sources/docker_logs.cue
+++ b/website/cue/reference/components/sources/docker_logs.cue
@@ -49,7 +49,7 @@ components: sources: docker_logs: {
 	acknowledgements: false
 	auto_generated:   true
 	collect: {
-		checkpoint: enabled: false
+		checkpoint: enabled: true
 		from: {
 			service: services.docker

From e7a3b9f9aac275ac2363ab6b3ea8f4f1d99c7739 Mon Sep 17 00:00:00 2001
From: Vincent Bernat
Date: Sun, 8 Mar 2026 18:43:26 +0100
Subject: [PATCH 2/2] fix(docker_logs source): fix a clippy warning

---
 src/sources/docker_logs/tests.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sources/docker_logs/tests.rs b/src/sources/docker_logs/tests.rs
index 1c3e3a6d26718..34c1c08cd58b7 100644
--- a/src/sources/docker_logs/tests.rs
+++ b/src/sources/docker_logs/tests.rs
@@ -114,7 +114,7 @@ mod integration_tests {
     ) {
         let source_id = ComponentKey::from("docker_logs_test");
         let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
-        let shutdown_complete = shutdown.shutdown_source(&source_id, deadline.into());
+        let shutdown_complete = shutdown.shutdown_source(&source_id, deadline);
         assert!(shutdown_complete.await, "source did not shut down in time");
         handle.await.unwrap().unwrap();
     }
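
Given the `State` and `ContainerCheckpoint` definitions in
`checkpointer.rs`, the persisted `checkpoints.json` should look roughly
like the following (the container ID and timestamps are illustrative).
With the default global `data_dir`, it lives under
`/var/lib/vector/<component_id>/checkpoints.json`:

```json
{
  "version": "1",
  "checkpoints": [
    {
      "container_id": "451062c59603...",
      "last_log_timestamp": "2026-03-08T04:44:50.123456789+01:00",
      "modified": "2026-03-08T03:44:55Z"
    }
  ]
}
```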