Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/udp_datagram_reconnect_on_send_failure.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
UDP and Unix datagram sinks now reconnect and retry the failed event after a socket send failure instead of silently dropping all remaining events on the broken socket.

authors: thomasqueirozb
146 changes: 116 additions & 30 deletions src/sinks/util/datagram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use bytes::BytesMut;
use std::pin::Pin;

use bytes::{Bytes, BytesMut};
use futures::{StreamExt, stream::BoxStream};
use futures_util::stream::Peekable;
#[cfg(unix)]
Expand All @@ -16,7 +18,7 @@ use vector_lib::{
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
use crate::{
codecs::Transformer,
event::{Event, EventStatus, Finalizable},
event::{Event, EventFinalizers, EventStatus, Finalizable},
internal_events::{
SocketEventsSent, SocketMode, SocketSendError, UdpChunkingError, UdpSendIncompleteError,
},
Expand All @@ -28,54 +30,133 @@ pub enum DatagramSocket {
Unix(UnixDatagram, PathBuf),
}

pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
input: &mut Peekable<BoxStream<'_, Event>>,
/// A pre-encoded datagram ready to be sent over the socket.
///
/// Produced by [`encode_to_datagrams`] and consumed by [`send_datagrams`].
/// Encoding happens before the send loop so that an item can be peeked,
/// left in the stream on a socket error, and sent again without re-encoding.
pub struct EncodedDatagram {
/// The encoded bytes to send (`None` if encoding failed).
///
/// `send_datagrams` clones this handle before sending, so the payload stays
/// in the stream until the item is explicitly consumed.
pub bytes: Option<Bytes>,
/// Finalizers taken from the source event before it was encoded.
///
/// `send_datagrams` updates their status to `Delivered` or `Errored` at the
/// point where it consumes the item from the stream.
pub finalizers: EventFinalizers,
}

/// Result of a single send attempt, as reported by `send_and_emit`.
///
/// `send_datagrams` uses it to decide what happens to the item it has peeked:
/// consume it and mark it delivered, consume it and mark it errored, or leave
/// it in the stream and return so the caller can reconnect.
enum SendOutcome {
/// Datagram was successfully sent.
Delivered,
/// Per-event error that reconnecting the socket cannot fix (e.g. EMSGSIZE,
/// EDESTADDRREQ). Drop the event and move on.
UnrecoverableError,
/// Socket-level error that may be resolved by reconnecting.
///
/// The item is not consumed, so it is the first one seen after reconnecting.
SocketError,
}

/// Decides whether a failed send is worth a reconnect-and-retry.
///
/// Only an explicit allow-list of [`std::io::ErrorKind`]s counts as a
/// transient, socket-level failure. Everything else is reported as
/// non-recoverable so the caller can drop the event and keep the stream
/// moving. That includes EMSGSIZE, EDESTADDRREQ (os error 89 on Linux), and
/// any kind not listed here.
///
/// Returns `true` when `error.kind()` is on the allow-list, `false` otherwise.
fn is_recoverable_socket_error(error: &std::io::Error) -> bool {
    use std::io::ErrorKind;

    // Kinds for which a fresh socket has a realistic chance of succeeding.
    const RECONNECT_WORTHY: [ErrorKind; 9] = [
        ErrorKind::ConnectionRefused,
        ErrorKind::ConnectionReset,
        ErrorKind::ConnectionAborted,
        ErrorKind::BrokenPipe,
        ErrorKind::NetworkDown,
        ErrorKind::HostUnreachable,
        ErrorKind::NetworkUnreachable,
        ErrorKind::TimedOut,
        ErrorKind::Interrupted,
    ];

    RECONNECT_WORTHY.contains(&error.kind())
}

/// Builds the pre-encoded datagram stream consumed by [`send_datagrams`].
///
/// Every incoming event is passed through `transformer`, has its finalizers
/// detached, and is then encoded with `encoder` into a fresh buffer. The
/// result is wrapped in an [`EncodedDatagram`]. If encoding fails, the item
/// is still yielded, with `bytes` set to `None`, so its finalizers are not lost.
///
/// The returned stream is boxed and peekable, so a consumer can inspect the
/// next datagram without removing it from the stream.
pub fn encode_to_datagrams<'a, E>(
    input: BoxStream<'a, Event>,
    transformer: Transformer,
    mut encoder: E,
) -> Peekable<BoxStream<'a, EncodedDatagram>>
where
    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Send + 'a,
{
    let encode_one = move |mut event: Event| {
        transformer.transform(&mut event);
        // `encode` takes the event by value, so detach the finalizers first.
        let finalizers = event.take_finalizers();

        let mut buf = BytesMut::new();
        // The error value is discarded here; only success or failure is kept.
        let bytes = match encoder.encode(event, &mut buf) {
            Ok(()) => Some(buf.freeze()),
            Err(_) => None,
        };

        EncodedDatagram { bytes, finalizers }
    };

    input.map(encode_one).boxed().peekable()
}

pub async fn send_datagrams(
input: &mut Peekable<BoxStream<'_, EncodedDatagram>>,
mut socket: DatagramSocket,
transformer: &Transformer,
encoder: &mut E,
chunker: &Option<Chunker>,
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
) {
while let Some(mut event) = input.next().await {
transformer.transform(&mut event);
let finalizers = event.take_finalizers();
let mut bytes = BytesMut::new();
loop {
// Peek without consuming so the event can be retried after reconnection.
// Clone the bytes (ref-counted, cheap) to release the borrow on `input`.
let Some(datagram) = Pin::new(&mut *input).peek().await else {
break;
};
let bytes = datagram.bytes.clone();

// Errors are handled by `Encoder`.
if encoder.encode(event, &mut bytes).is_err() {
finalizers.update_status(EventStatus::Errored);
let Some(bytes) = bytes else {
// Encoding failed earlier — consume and mark errored.
if let Some(datagram) = input.next().await {
datagram.finalizers.update_status(EventStatus::Errored);
}
continue;
}
};

let delivered = if let Some(chunker) = chunker {
let outcome = if let Some(chunker) = chunker {
let data_size = bytes.len();
match chunker.chunk(bytes.freeze()) {
match chunker.chunk(bytes) {
Ok(chunks) => {
let mut chunks_delivered = true;
for bytes in chunks {
if !send_and_emit(&mut socket, &bytes, bytes_sent).await {
chunks_delivered = false;
let mut result = SendOutcome::Delivered;
for chunk in chunks {
result = send_and_emit(&mut socket, &chunk, bytes_sent).await;
if !matches!(result, SendOutcome::Delivered) {
break;
}
}
chunks_delivered
result
}
Err(err) => {
emit!(UdpChunkingError {
data_size,
error: err
});
false
SendOutcome::UnrecoverableError
}
}
} else {
send_and_emit(&mut socket, &bytes.freeze(), bytes_sent).await
send_and_emit(&mut socket, &bytes, bytes_sent).await
};

if delivered {
finalizers.update_status(EventStatus::Delivered);
} else {
finalizers.update_status(EventStatus::Errored);
match outcome {
SendOutcome::Delivered => {
if let Some(datagram) = input.next().await {
datagram.finalizers.update_status(EventStatus::Delivered);
}
}
SendOutcome::SocketError => {
// Leave item in stream for retry after reconnection.
break;
}
SendOutcome::UnrecoverableError => {
// Per-event or permanent error — consume and mark errored so the
// stream can make progress rather than retrying forever.
if let Some(datagram) = input.next().await {
datagram.finalizers.update_status(EventStatus::Errored);
}
}
}
}
}
Expand All @@ -84,7 +165,7 @@ async fn send_and_emit(
socket: &mut DatagramSocket,
bytes: &bytes::Bytes,
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
) -> bool {
) -> SendOutcome {
match send_datagram(socket, bytes).await {
Ok(()) => {
emit!(SocketEventsSent {
Expand All @@ -97,9 +178,10 @@ async fn send_and_emit(
byte_size: bytes.len().into(),
});
bytes_sent.emit(ByteSize(bytes.len()));
true
SendOutcome::Delivered
}
Err(error) => {
let recoverable = is_recoverable_socket_error(&error);
match socket {
DatagramSocket::Udp(_) => emit!(SocketSendError {
mode: SocketMode::Udp,
Expand All @@ -113,7 +195,11 @@ async fn send_and_emit(
})
}
};
false
if recoverable {
SendOutcome::SocketError
} else {
SendOutcome::UnrecoverableError
}
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/sinks/util/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use async_trait::async_trait;
use futures::{FutureExt, StreamExt, stream::BoxStream};
use futures::{FutureExt, stream::BoxStream};
use snafu::{ResultExt, Snafu};
use tokio::{net::UdpSocket, time::sleep};
use tokio_util::codec::Encoder;
Expand All @@ -16,7 +16,7 @@ use vector_lib::{

use super::{
SinkBuildError,
datagram::{DatagramSocket, send_datagrams},
datagram::{DatagramSocket, encode_to_datagrams, send_datagrams},
};
use crate::{
codecs::Transformer,
Expand Down Expand Up @@ -198,17 +198,14 @@ where
E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
{
async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let mut input = input.peekable();
let mut input = encode_to_datagrams(input, self.transformer.clone(), self.encoder.clone());

let mut encoder = self.encoder.clone();
let chunker = self.chunker.clone();
while Pin::new(&mut input).peek().await.is_some() {
let socket = self.connector.connect_backoff().await;
send_datagrams(
&mut input,
DatagramSocket::Udp(socket),
&self.transformer,
&mut encoder,
&chunker,
&self.bytes_sent,
)
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/util/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use vector_lib::{
json_size::JsonSize,
};

use super::datagram::{DatagramSocket, send_datagrams};
use super::datagram::{DatagramSocket, encode_to_datagrams, send_datagrams};
use crate::{
codecs::Transformer,
common::backoff::ExponentialBackoff,
Expand Down Expand Up @@ -259,9 +259,8 @@ where

async fn run_datagram(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let bytes_sent = register!(BytesSent::from(Protocol::UNIX));
let mut input = input.peekable();
let mut input = encode_to_datagrams(input, self.transformer.clone(), self.encoder.clone());

let mut encoder = self.encoder.clone();
while Pin::new(&mut input).peek().await.is_some() {
let socket = match self.connector.connect_backoff().await {
UnixEither::Datagram(datagram) => datagram,
Expand All @@ -273,8 +272,6 @@ where
send_datagrams(
&mut input,
DatagramSocket::Unix(socket, self.connector.path.clone()),
&self.transformer,
&mut encoder,
&None,
&bytes_sent,
)
Expand Down
Loading