enhancement(fluent source): Add support for forwarding over unix socket #22212

Open
wants to merge 3 commits into
base: master
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -599,7 +599,7 @@ sources-eventstoredb_metrics = []
sources-exec = []
sources-file = ["vector-lib/file-source"]
sources-file_descriptor = ["tokio-util/io"]
sources-fluent = ["dep:base64", "sources-utils-net-tcp", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sources-fluent = ["dep:base64", "sources-utils-net-tcp", "sources-utils-net-unix", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost", "dep:prost-types", "protobuf-build", "dep:tonic"]
sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query", "sources-http_server"]
sources-host_metrics = ["heim/cpu", "heim/host", "heim/memory", "heim/net"]
256 changes: 180 additions & 76 deletions src/sources/fluent/mod.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::io::{self, Read};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::Duration;

use base64::prelude::{Engine as _, BASE64_STANDARD};
@@ -41,6 +42,35 @@ use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentT
#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
#[derive(Clone, Debug)]
pub struct FluentConfig {
#[serde(flatten)]
mode: FluentMode,
pront marked this conversation as resolved.

/// The namespace to use for logs. This overrides the global setting.
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,
}

/// Listening mode for the `fluent` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged, rename_all = "snake_case")]
Contributor Author

Unlike the `socket` source, I used `untagged` here so that this is not a breaking change that forces users to add a `mode` tag. Since the `path` and `address` fields are unlikely to ever overlap, I think this is probably fine.
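
To illustrate the trade-off being described, here is a minimal standard-library sketch of how an untagged enum is resolved: each variant is tried in declaration order and the first one that fits wins. This is not serde's actual implementation. The `Mode` enum, the `pick_mode` function, and the single-field variants are hypothetical simplifications, and the "reject unknown keys" check only loosely stands in for `deny_unknown_fields`.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the two listening modes; not Vector's real types.
#[derive(Debug, PartialEq)]
enum Mode {
    Tcp { address: String },
    Unix { path: String },
}

/// Try each variant in declaration order and keep the first one that fits,
/// which is the general idea behind resolving an untagged enum.
fn pick_mode(fields: &BTreeMap<&str, &str>) -> Result<Mode, String> {
    // Variant 1: TCP needs `address` and rejects any other key.
    if let Some(address) = fields.get("address") {
        if fields.keys().all(|k| *k == "address") {
            return Ok(Mode::Tcp { address: address.to_string() });
        }
    }
    // Variant 2: Unix needs `path` and rejects any other key.
    if let Some(path) = fields.get("path") {
        if fields.keys().all(|k| *k == "path") {
            return Ok(Mode::Unix { path: path.to_string() });
        }
    }
    // No variant matched, so the whole config is rejected.
    Err("data did not match any variant".to_string())
}

fn main() {
    let tcp = BTreeMap::from([("address", "0.0.0.0:24224")]);
    let unix = BTreeMap::from([("path", "/path/to/socket")]);
    let both = BTreeMap::from([("address", "0.0.0.0:24224"), ("path", "/path/to/socket")]);

    println!("{:?}", pick_mode(&tcp));
    println!("{:?}", pick_mode(&unix));
    println!("{:?}", pick_mode(&both));
}
```

Because the variant is inferred from which fields are present, existing configs that only set `address` keep working without a `mode` key. The cost is that a config matching no variant only gets a generic error rather than one naming the offending field.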

Member

Any thoughts on this? cc @jszwedko

Member (@jszwedko, Jan 27, 2025)

I'd suggest we maintain compatibility while also moving towards the desired configuration schema. That is, we add mode but default it to tcp so that if users only specify address it continues to work. If users want to use a unix socket, they should specify mode = ... and then also path. I'd suggest we also call this mode unix_stream to match recent changes to the socket sink.

Ideally we'd go further and start to evolve the configuration into what is described in the configuration spec. This would mean configuration would look like:

mode:
  type: tcp
  tcp:
    address: "localhost:1234"

But I can see an argument for not going that far just yet since it would make it inconsistent with the existing configuration for the socket source/sink. So I'd be happy to just see mode added.

Member

Thanks Jesse, I am happy with both recommendations i.e. I am OK with diverging from socket as long as an example is provided.

Contributor Author

Perhaps you could advise on how to achieve this? I'm not aware of a way to add a serde `tag` attribute with a default: https://serde.rs/enum-representations.html#internally-tagged

Member (@pront, Jan 28, 2025)

This might be the easiest way to do this: https://github.com/vectordotdev/vector/blob/master/src/sources/socket/mod.rs#L20-L48

Which will look like:

mode: tcp
address: localhost:123

Jesse shared this with me: serde-rs/serde#2231. I guess the ideal way will need some custom deserialization.

Contributor Author

> This might be the easiest way to do this way: https://github.com/vectordotdev/vector/blob/master/src/sources/socket/mod.rs#L20-L48

That would force mode to be specified, as linked in the serde issue, and would therefore be a breaking change.

I could manually implement `Deserialize`, but this would likely be a fairly chunky piece of code...

Just let me know how you would like to proceed; I'm not sure at the moment what is being asked for.

Member

I think ideally we'd make this change in a non-breaking fashion while implementing the config UX we want. That is, allow for:

address: localhost:1234

to continue to work and default to tcp while we add the mode option so that:

mode: tcp
address: localhost:1234

and

mode: unix_stream
path: /dev/socket/foo

Also works.

To your point, I'm not seeing a way to default `mode` using an internally tagged enum in serde, so I think the options are:

  1. Write a custom deserializer for the enum (I'm not 100% sure what this would look like or if it is actually possible)
  2. Don't use an enum to model the config, but instead have separate `mode`, `address`, and `path` options. When initializing the component, check `mode`, defaulting to `tcp`, and then look for the appropriate secondary field (`address` or `path`), raising an error if it isn't set
  3. Make this a breaking change

I think option (2) would be best since it maintains compatibility and is only slightly hacky to implement. What do you think?
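
As a rough illustration of option (2), the sketch below models the config as flat optional fields and resolves them when the component is built. It uses only the standard library and leaves out serde entirely. `RawFluentConfig`, `Listener`, and `resolve` are hypothetical names for this example, not types from the PR or from Vector.

```rust
use std::path::PathBuf;

/// Hypothetical flat config: every field is optional at parse time.
#[derive(Debug, Default)]
struct RawFluentConfig {
    mode: Option<String>,
    address: Option<String>,
    path: Option<PathBuf>,
}

/// The validated listener the source would be built from.
#[derive(Debug, PartialEq)]
enum Listener {
    Tcp { address: String },
    UnixStream { path: PathBuf },
}

/// Check `mode` (defaulting to `tcp`), then require the matching field.
fn resolve(raw: &RawFluentConfig) -> Result<Listener, String> {
    // Defaulting here keeps configs that only set `address` working.
    match raw.mode.as_deref().unwrap_or("tcp") {
        "tcp" => raw
            .address
            .clone()
            .map(|address| Listener::Tcp { address })
            .ok_or_else(|| "`address` is required when `mode` is `tcp`".to_string()),
        "unix_stream" => raw
            .path
            .clone()
            .map(|path| Listener::UnixStream { path })
            .ok_or_else(|| "`path` is required when `mode` is `unix_stream`".to_string()),
        other => Err(format!("unknown mode `{}`", other)),
    }
}

fn main() {
    // Legacy config: no `mode`, only `address`.
    let legacy = RawFluentConfig {
        address: Some("localhost:1234".to_string()),
        ..Default::default()
    };
    println!("{:?}", resolve(&legacy));

    // New config: explicit unix stream mode with a socket path.
    let unix = RawFluentConfig {
        mode: Some("unix_stream".to_string()),
        path: Some(PathBuf::from("/dev/socket/foo")),
        ..Default::default()
    };
    println!("{:?}", resolve(&unix));
}
```

The trade-off is that the type system no longer guarantees the right field is present for a given mode, so the check moves to build time and the error messages have to be written by hand.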

#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)] // just used for configuration
pub enum FluentMode {
/// Listen on a TCP port.
Tcp(FluentTcpConfig),

/// Listen on a Unix stream socket.
#[cfg(unix)]
Unix(FluentUnixConfig),
}

/// Configuration for the `fluent` TCP source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FluentTcpConfig {
#[configurable(derived)]
address: SocketListenAddr,

@@ -67,34 +97,14 @@ pub struct FluentConfig {
#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: SourceAcknowledgementsConfig,

/// The namespace to use for logs. This overrides the global setting.
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,
}

impl GenerateConfig for FluentConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
keepalive: None,
permit_origin: None,
tls: None,
receive_buffer_bytes: None,
acknowledgements: Default::default(),
connection_limit: Some(2),
log_namespace: None,
})
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "fluent")]
impl SourceConfig for FluentConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
impl FluentTcpConfig {
fn build(
&self,
cx: SourceContext,
log_namespace: LogNamespace,
) -> crate::Result<super::Source> {
let source = FluentSource::new(log_namespace);
let shutdown_secs = Duration::from_secs(30);
let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
@@ -120,6 +130,79 @@ impl SourceConfig for FluentConfig {
log_namespace,
)
}
}

/// Configuration for the `fluent` unix socket source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
#[cfg(unix)]
pub struct FluentUnixConfig {
/// The Unix socket path.
///
/// This should be an absolute path.
#[configurable(metadata(docs::examples = "/path/to/socket"))]
pub path: PathBuf,

/// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
///
/// Note: The file mode value can be specified in any numeric format supported by your configuration
/// language, but it is most intuitive to use an octal number.
#[configurable(metadata(docs::examples = 0o777))]
#[configurable(metadata(docs::examples = 0o600))]
#[configurable(metadata(docs::examples = 508))]
pub socket_file_mode: Option<u32>,
}

#[cfg(unix)]
impl FluentUnixConfig {
fn build(
&self,
cx: SourceContext,
log_namespace: LogNamespace,
) -> crate::Result<super::Source> {
let source = FluentSource::new(log_namespace);

crate::sources::util::build_unix_stream_source(
self.path.clone(),
self.socket_file_mode,
source.decoder(),
move |events, host| source.handle_events_impl(events, host.into()),
cx.shutdown,
cx.out,
)
}
}

impl GenerateConfig for FluentConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
mode: FluentMode::Tcp(FluentTcpConfig {
address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
keepalive: None,
permit_origin: None,
tls: None,
receive_buffer_bytes: None,
acknowledgements: Default::default(),
connection_limit: Some(2),
}),
log_namespace: None,
})
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "fluent")]
impl SourceConfig for FluentConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
match &self.mode {
FluentMode::Tcp(t) => t.build(cx, log_namespace),
#[cfg(unix)]
FluentMode::Unix(u) => u.build(cx, log_namespace),
}
}

fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
@@ -132,11 +215,15 @@ impl SourceConfig for FluentConfig {
}

fn resources(&self) -> Vec<Resource> {
vec![self.address.as_tcp_resource()]
match &self.mode {
FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
#[cfg(unix)]
FluentMode::Unix(_) => vec![],
}
}

fn can_acknowledge(&self) -> bool {
true
matches!(self.mode, FluentMode::Tcp(_))
Contributor Author

I don't really understand what this does; I just matched the socket source.

Member

Can you please share a link to the code you are referring to?

Contributor Author

Member

This determines whether the source supports acknowledgements. It currently returns `true`, and it can stay `true` if the new mode also supports acks.

https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/

Contributor Author

I don't think the new mode does. It behaves similarly to the socket source, which also does not.
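
For readers unfamiliar with the pattern under discussion, here is a small standalone sketch of reporting acknowledgement support per mode with `matches!`. The `Mode` and `Config` types are hypothetical, stripped-down stand-ins for the PR's `FluentMode` and `FluentConfig`. The sketch assumes, as stated in this thread, that only the TCP mode supports acks.

```rust
/// Hypothetical, simplified mirror of the mode enum in this PR.
enum Mode {
    Tcp,
    Unix,
}

/// Hypothetical, simplified source config holding only the mode.
struct Config {
    mode: Mode,
}

impl Config {
    /// Reports whether this source can honor end-to-end acknowledgements.
    /// Assumption from the thread: only the TCP mode can.
    fn can_acknowledge(&self) -> bool {
        matches!(self.mode, Mode::Tcp)
    }
}

fn main() {
    let tcp = Config { mode: Mode::Tcp };
    let unix = Config { mode: Mode::Unix };
    println!("tcp acks: {}", tcp.can_acknowledge());
    println!("unix acks: {}", unix.can_acknowledge());
}
```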

}
}

@@ -151,12 +238,16 @@ impl FluentConfig {

let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);

let tls_client_metadata_path = self
.tls
.as_ref()
.and_then(|tls| tls.client_metadata_key.as_ref())
.and_then(|k| k.path.clone())
.map(LegacyKey::Overwrite);
let tls_client_metadata_path = match &self.mode {
FluentMode::Tcp(tcp) => tcp
.tls
.as_ref()
.and_then(|tls| tls.client_metadata_key.as_ref())
.and_then(|k| k.path.clone())
.map(LegacyKey::Overwrite),
#[cfg(unix)]
FluentMode::Unix(_) => None,
};

// There is a global and per-source `log_namespace` config.
// The source config overrides the global setting and is merged here.
@@ -222,19 +313,8 @@ impl FluentSource {
legacy_host_key_path: log_schema().host_key().cloned(),
}
}
}

impl TcpSource for FluentSource {
type Error = DecodeError;
type Item = FluentFrame;
type Decoder = FluentDecoder;
type Acker = FluentAcker;

fn decoder(&self) -> Self::Decoder {
FluentDecoder::new(self.log_namespace)
}

fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
fn handle_events_impl(&self, events: &mut [Event], host: Value) {
for event in events {
let log = event.as_mut_log();

@@ -248,10 +328,25 @@ impl TcpSource for FluentSource {
log,
legacy_host_key,
path!("host"),
host.ip().to_string(),
host.clone(),
);
}
}
}

impl TcpSource for FluentSource {
type Error = DecodeError;
type Item = FluentFrame;
type Decoder = FluentDecoder;
type Acker = FluentAcker;

fn decoder(&self) -> Self::Decoder {
FluentDecoder::new(self.log_namespace)
}

fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
self.handle_events_impl(events, host.ip().to_string().into())
}

fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
FluentAcker::new(frame)
@@ -304,7 +399,7 @@ impl From<decode::Error> for DecodeError {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct FluentDecoder {
log_namespace: LogNamespace,
}
@@ -899,13 +994,15 @@ mod tests {
let (sender, recv) = SourceSender::new_test_finalize(status);
let address = next_addr();
let source = FluentConfig {
address: address.into(),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: true.into(),
connection_limit: None,
mode: FluentMode::Tcp(FluentTcpConfig {
address: address.into(),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: true.into(),
connection_limit: None,
}),
log_namespace: None,
}
.build(SourceContext::new_test(sender, None))
@@ -964,13 +1061,15 @@ mod tests {
#[test]
fn output_schema_definition_vector_namespace() {
let config = FluentConfig {
address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
mode: FluentMode::Tcp(FluentTcpConfig {
address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
}),
log_namespace: Some(true),
};

@@ -1020,13 +1119,15 @@ mod tests {
#[test]
fn output_schema_definition_legacy_namespace() {
let config = FluentConfig {
address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
mode: FluentMode::Tcp(FluentTcpConfig {
address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
}),
log_namespace: None,
};

@@ -1062,6 +1163,7 @@ mod integration_tests {
use tokio::time::sleep;
use vector_lib::event::{Event, EventStatus};

use crate::sources::fluent::{FluentMode, FluentTcpConfig};
use crate::{
config::{SourceConfig, SourceContext},
docker::Container,
@@ -1242,13 +1344,15 @@ mod integration_tests {
let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
tokio::spawn(async move {
FluentConfig {
address: address.into(),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
mode: FluentMode::Tcp(FluentTcpConfig {
address: address.into(),
tls: None,
keepalive: None,
permit_origin: None,
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
}),
log_namespace: None,
}
.build(SourceContext::new_test(sender, None))