diff --git a/changelog.d/udp_datagram_reconnect_on_send_failure.fix.md b/changelog.d/udp_datagram_reconnect_on_send_failure.fix.md new file mode 100644 index 0000000000000..c581211ed731d --- /dev/null +++ b/changelog.d/udp_datagram_reconnect_on_send_failure.fix.md @@ -0,0 +1,3 @@ +UDP and Unix datagram sinks now reconnect and retry the failed event after a socket send failure instead of silently dropping all remaining events on the broken socket. + +authors: thomasqueirozb diff --git a/src/sinks/util/datagram.rs b/src/sinks/util/datagram.rs index 9fe37f761477a..94f0e9cc24b87 100644 --- a/src/sinks/util/datagram.rs +++ b/src/sinks/util/datagram.rs @@ -1,4 +1,6 @@ -use bytes::BytesMut; +use std::pin::Pin; + +use bytes::{Bytes, BytesMut}; use futures::{StreamExt, stream::BoxStream}; use futures_util::stream::Peekable; #[cfg(unix)] @@ -16,7 +18,7 @@ use vector_lib::{ use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError}; use crate::{ codecs::Transformer, - event::{Event, EventStatus, Finalizable}, + event::{Event, EventFinalizers, EventStatus, Finalizable}, internal_events::{ SocketEventsSent, SocketMode, SocketSendError, UdpChunkingError, UdpSendIncompleteError, }, @@ -28,54 +30,133 @@ pub enum DatagramSocket { Unix(UnixDatagram, PathBuf), } -pub async fn send_datagrams>( - input: &mut Peekable>, +/// A pre-encoded datagram ready to be sent over the socket. +pub struct EncodedDatagram { + /// The encoded bytes to send (`None` if encoding failed). + pub bytes: Option, + pub finalizers: EventFinalizers, +} + +enum SendOutcome { + /// Datagram was successfully sent. + Delivered, + /// Per-event error that reconnecting the socket cannot fix (e.g. EMSGSIZE, + /// EDESTADDRREQ). Drop the event and move on. + UnrecoverableError, + /// Socket-level error that may be resolved by reconnecting. + SocketError, +} + +/// Returns `true` only for errors that are known to be transient socket-level +/// failures where reconnecting may succeed. All other errors — including +/// EDESTADDRREQ (os error 89), EMSGSIZE, and any unknown error — are treated as +/// non-recoverable so the stream can drop the event and make progress. +fn is_recoverable_socket_error(error: &std::io::Error) -> bool { + use std::io::ErrorKind; + matches!( + error.kind(), + ErrorKind::ConnectionRefused + | ErrorKind::ConnectionReset + | ErrorKind::ConnectionAborted + | ErrorKind::BrokenPipe + | ErrorKind::NetworkDown + | ErrorKind::HostUnreachable + | ErrorKind::NetworkUnreachable + | ErrorKind::TimedOut + | ErrorKind::Interrupted + ) +} + +/// Transforms and encodes a raw event stream into a stream of [`EncodedDatagram`]s +/// ready to be passed to [`send_datagrams`]. +pub fn encode_to_datagrams<'a, E>( + input: BoxStream<'a, Event>, + transformer: Transformer, + mut encoder: E, +) -> Peekable> +where + E: Encoder + Send + 'a, +{ + input + .map(move |mut event| { + transformer.transform(&mut event); + let finalizers = event.take_finalizers(); + let mut bytes = BytesMut::new(); + let bytes = if encoder.encode(event, &mut bytes).is_ok() { + Some(bytes.freeze()) + } else { + None + }; + EncodedDatagram { bytes, finalizers } + }) + .boxed() + .peekable() +} + +pub async fn send_datagrams( + input: &mut Peekable>, mut socket: DatagramSocket, - transformer: &Transformer, - encoder: &mut E, chunker: &Option, bytes_sent: &::Handle, ) { - while let Some(mut event) = input.next().await { - transformer.transform(&mut event); - let finalizers = event.take_finalizers(); - let mut bytes = BytesMut::new(); + loop { + // Peek without consuming so the event can be retried after reconnection. + // Clone the bytes (ref-counted, cheap) to release the borrow on `input`. + let Some(datagram) = Pin::new(&mut *input).peek().await else { + break; + }; + let bytes = datagram.bytes.clone(); - // Errors are handled by `Encoder`. - if encoder.encode(event, &mut bytes).is_err() { - finalizers.update_status(EventStatus::Errored); + let Some(bytes) = bytes else { + // Encoding failed earlier — consume and mark errored. + if let Some(datagram) = input.next().await { + datagram.finalizers.update_status(EventStatus::Errored); + } continue; - } + }; - let delivered = if let Some(chunker) = chunker { + let outcome = if let Some(chunker) = chunker { let data_size = bytes.len(); - match chunker.chunk(bytes.freeze()) { + match chunker.chunk(bytes) { Ok(chunks) => { - let mut chunks_delivered = true; - for bytes in chunks { - if !send_and_emit(&mut socket, &bytes, bytes_sent).await { - chunks_delivered = false; + let mut result = SendOutcome::Delivered; + for chunk in chunks { + result = send_and_emit(&mut socket, &chunk, bytes_sent).await; + if !matches!(result, SendOutcome::Delivered) { break; } } - chunks_delivered + result } Err(err) => { emit!(UdpChunkingError { data_size, error: err }); - false + SendOutcome::UnrecoverableError } } } else { - send_and_emit(&mut socket, &bytes.freeze(), bytes_sent).await + send_and_emit(&mut socket, &bytes, bytes_sent).await }; - if delivered { - finalizers.update_status(EventStatus::Delivered); - } else { - finalizers.update_status(EventStatus::Errored); + match outcome { + SendOutcome::Delivered => { + if let Some(datagram) = input.next().await { + datagram.finalizers.update_status(EventStatus::Delivered); + } + } + SendOutcome::SocketError => { + // Leave item in stream for retry after reconnection. + break; + } + SendOutcome::UnrecoverableError => { + // Per-event or permanent error — consume and mark errored so the + // stream can make progress rather than retrying forever. + if let Some(datagram) = input.next().await { + datagram.finalizers.update_status(EventStatus::Errored); + } + } } } } @@ -84,7 +165,7 @@ async fn send_and_emit( socket: &mut DatagramSocket, bytes: &bytes::Bytes, bytes_sent: &::Handle, -) -> bool { +) -> SendOutcome { match send_datagram(socket, bytes).await { Ok(()) => { emit!(SocketEventsSent { @@ -97,9 +178,10 @@ async fn send_and_emit( byte_size: bytes.len().into(), }); bytes_sent.emit(ByteSize(bytes.len())); - true + SendOutcome::Delivered } Err(error) => { + let recoverable = is_recoverable_socket_error(&error); match socket { DatagramSocket::Udp(_) => emit!(SocketSendError { mode: SocketMode::Udp, @@ -113,7 +195,11 @@ async fn send_and_emit( }) } }; - false + if recoverable { + SendOutcome::SocketError + } else { + SendOutcome::UnrecoverableError + } } } } diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs index 0b9441e682cde..339125acf9b15 100644 --- a/src/sinks/util/udp.rs +++ b/src/sinks/util/udp.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use futures::{FutureExt, StreamExt, stream::BoxStream}; +use futures::{FutureExt, stream::BoxStream}; use snafu::{ResultExt, Snafu}; use tokio::{net::UdpSocket, time::sleep}; use tokio_util::codec::Encoder; @@ -16,7 +16,7 @@ use vector_lib::{ use super::{ SinkBuildError, - datagram::{DatagramSocket, send_datagrams}, + datagram::{DatagramSocket, encode_to_datagrams, send_datagrams}, }; use crate::{ codecs::Transformer, @@ -198,17 +198,14 @@ where E: Encoder + Clone + Send + Sync, { async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let mut input = input.peekable(); + let mut input = encode_to_datagrams(input, self.transformer.clone(), self.encoder.clone()); - let mut encoder = self.encoder.clone(); let chunker = self.chunker.clone(); while Pin::new(&mut input).peek().await.is_some() { let socket = self.connector.connect_backoff().await; send_datagrams( &mut input, DatagramSocket::Udp(socket), - &self.transformer, - &mut encoder, &chunker, &self.bytes_sent, ) diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index cb9dadb934b3d..4ed914060171e 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -22,7 +22,7 @@ use vector_lib::{ json_size::JsonSize, }; -use super::datagram::{DatagramSocket, send_datagrams}; +use super::datagram::{DatagramSocket, encode_to_datagrams, send_datagrams}; use crate::{ codecs::Transformer, common::backoff::ExponentialBackoff, @@ -259,9 +259,8 @@ where async fn run_datagram(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let bytes_sent = register!(BytesSent::from(Protocol::UNIX)); - let mut input = input.peekable(); + let mut input = encode_to_datagrams(input, self.transformer.clone(), self.encoder.clone()); - let mut encoder = self.encoder.clone(); while Pin::new(&mut input).peek().await.is_some() { let socket = match self.connector.connect_backoff().await { UnixEither::Datagram(datagram) => datagram, @@ -273,8 +272,6 @@ where send_datagrams( &mut input, DatagramSocket::Unix(socket, self.connector.path.clone()), - &self.transformer, - &mut encoder, &None, &bytes_sent, )