diff --git a/Cargo.lock b/Cargo.lock index 37009f736e69a..5afeb4c275948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5549,7 +5549,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.8.0", "libc", ] diff --git a/Cargo.toml b/Cargo.toml index 9dfa65d5b86ff..dea6c479d516b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -599,7 +599,7 @@ sources-eventstoredb_metrics = [] sources-exec = [] sources-file = ["vector-lib/file-source"] sources-file_descriptor = ["tokio-util/io"] -sources-fluent = ["dep:base64", "sources-utils-net-tcp", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"] +sources-fluent = ["dep:base64", "sources-utils-net-tcp", "sources-utils-net-unix", "tokio-util/net", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"] sources-gcp_pubsub = ["gcp", "dep:h2", "dep:prost", "dep:prost-types", "protobuf-build", "dep:tonic"] sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query", "sources-http_server"] sources-host_metrics = ["heim/cpu", "heim/host", "heim/memory", "heim/net"] diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 47afb99ded16a..114dac05b30d0 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::io::{self, Read}; use std::net::SocketAddr; +use std::path::PathBuf; use std::time::Duration; use base64::prelude::{Engine as _, BASE64_STANDARD}; @@ -41,6 +42,35 @@ use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentT #[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))] #[derive(Clone, Debug)] pub struct FluentConfig { + #[serde(flatten)] + mode: FluentMode, + + /// The namespace to use for logs. This overrides the global setting. + #[configurable(metadata(docs::hidden))] + #[serde(default)] + log_namespace: Option, +} + +/// Listening mode for the `fluent` source. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(untagged, rename_all = "snake_case")] +#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))] +#[allow(clippy::large_enum_variant)] // just used for configuration +pub enum FluentMode { + /// Listen on TCP port + Tcp(FluentTcpConfig), + + /// Listen on unix stream socket + #[cfg(unix)] + Unix(FluentUnixConfig), +} + +/// Configuration for the `fluent` TCP source. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct FluentTcpConfig { #[configurable(derived)] address: SocketListenAddr, @@ -67,34 +97,14 @@ pub struct FluentConfig { #[configurable(derived)] #[serde(default, deserialize_with = "bool_or_struct")] acknowledgements: SourceAcknowledgementsConfig, - - /// The namespace to use for logs. This overrides the global setting. - #[configurable(metadata(docs::hidden))] - #[serde(default)] - log_namespace: Option, } -impl GenerateConfig for FluentConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()), - keepalive: None, - permit_origin: None, - tls: None, - receive_buffer_bytes: None, - acknowledgements: Default::default(), - connection_limit: Some(2), - log_namespace: None, - }) - .unwrap() - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "fluent")] -impl SourceConfig for FluentConfig { - async fn build(&self, cx: SourceContext) -> crate::Result { - let log_namespace = cx.log_namespace(self.log_namespace); +impl FluentTcpConfig { + fn build( + &self, + cx: SourceContext, + log_namespace: LogNamespace, + ) -> crate::Result { let source = FluentSource::new(log_namespace); let shutdown_secs = Duration::from_secs(30); let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone()); @@ -120,6 +130,79 @@ impl SourceConfig for FluentConfig { log_namespace, ) } +} + +/// Configuration for the `fluent` unix socket source. +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +#[cfg(unix)] +pub struct FluentUnixConfig { + /// The Unix socket path. + /// + /// This should be an absolute path. + #[configurable(metadata(docs::examples = "/path/to/socket"))] + pub path: PathBuf, + + /// Unix file mode bits to be applied to the unix socket file as its designated file permissions. + /// + /// Note: The file mode value can be specified in any numeric format supported by your configuration + /// language, but it is most intuitive to use an octal number. + #[configurable(metadata(docs::examples = 0o777))] + #[configurable(metadata(docs::examples = 0o600))] + #[configurable(metadata(docs::examples = 508))] + pub socket_file_mode: Option, +} + +#[cfg(unix)] +impl FluentUnixConfig { + fn build( + &self, + cx: SourceContext, + log_namespace: LogNamespace, + ) -> crate::Result { + let source = FluentSource::new(log_namespace); + + crate::sources::util::build_unix_stream_source( + self.path.clone(), + self.socket_file_mode, + source.decoder(), + move |events, host| source.handle_events_impl(events, host.into()), + cx.shutdown, + cx.out, + ) + } +} + +impl GenerateConfig for FluentConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + mode: FluentMode::Tcp(FluentTcpConfig { + address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()), + keepalive: None, + permit_origin: None, + tls: None, + receive_buffer_bytes: None, + acknowledgements: Default::default(), + connection_limit: Some(2), + }), + log_namespace: None, + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "fluent")] +impl SourceConfig for FluentConfig { + async fn build(&self, cx: SourceContext) -> crate::Result { + let log_namespace = cx.log_namespace(self.log_namespace); + match &self.mode { + FluentMode::Tcp(t) => t.build(cx, log_namespace), + #[cfg(unix)] + FluentMode::Unix(u) => u.build(cx, log_namespace), + } + } fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); @@ -132,11 +215,15 @@ impl SourceConfig for FluentConfig { } fn resources(&self) -> Vec { - vec![self.address.as_tcp_resource()] + match &self.mode { + FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()], + #[cfg(unix)] + FluentMode::Unix(_) => vec![], + } } fn can_acknowledge(&self) -> bool { - true + matches!(self.mode, FluentMode::Tcp(_)) } } @@ -151,12 +238,16 @@ impl FluentConfig { let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite); - let tls_client_metadata_path = self - .tls - .as_ref() - .and_then(|tls| tls.client_metadata_key.as_ref()) - .and_then(|k| k.path.clone()) - .map(LegacyKey::Overwrite); + let tls_client_metadata_path = match &self.mode { + FluentMode::Tcp(tcp) => tcp + .tls + .as_ref() + .and_then(|tls| tls.client_metadata_key.as_ref()) + .and_then(|k| k.path.clone()) + .map(LegacyKey::Overwrite), + #[cfg(unix)] + FluentMode::Unix(_) => None, + }; // There is a global and per-source `log_namespace` config. // The source config overrides the global setting and is merged here. @@ -222,19 +313,8 @@ impl FluentSource { legacy_host_key_path: log_schema().host_key().cloned(), } } -} - -impl TcpSource for FluentSource { - type Error = DecodeError; - type Item = FluentFrame; - type Decoder = FluentDecoder; - type Acker = FluentAcker; - fn decoder(&self) -> Self::Decoder { - FluentDecoder::new(self.log_namespace) - } - - fn handle_events(&self, events: &mut [Event], host: SocketAddr) { + fn handle_events_impl(&self, events: &mut [Event], host: Value) { for event in events { let log = event.as_mut_log(); @@ -248,10 +328,25 @@ impl TcpSource for FluentSource { log, legacy_host_key, path!("host"), - host.ip().to_string(), + host.clone(), ); } } +} + +impl TcpSource for FluentSource { + type Error = DecodeError; + type Item = FluentFrame; + type Decoder = FluentDecoder; + type Acker = FluentAcker; + + fn decoder(&self) -> Self::Decoder { + FluentDecoder::new(self.log_namespace) + } + + fn handle_events(&self, events: &mut [Event], host: SocketAddr) { + self.handle_events_impl(events, host.ip().to_string().into()) + } fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker { FluentAcker::new(frame) @@ -304,7 +399,7 @@ impl From for DecodeError { } } -#[derive(Debug)] +#[derive(Debug, Clone)] struct FluentDecoder { log_namespace: LogNamespace, } @@ -899,13 +994,15 @@ mod tests { let (sender, recv) = SourceSender::new_test_finalize(status); let address = next_addr(); let source = FluentConfig { - address: address.into(), - tls: None, - keepalive: None, - permit_origin: None, - receive_buffer_bytes: None, - acknowledgements: true.into(), - connection_limit: None, + mode: FluentMode::Tcp(FluentTcpConfig { + address: address.into(), + tls: None, + keepalive: None, + permit_origin: None, + receive_buffer_bytes: None, + acknowledgements: true.into(), + connection_limit: None, + }), log_namespace: None, } .build(SourceContext::new_test(sender, None)) @@ -964,13 +1061,15 @@ mod tests { #[test] fn output_schema_definition_vector_namespace() { let config = FluentConfig { - address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()), - tls: None, - keepalive: None, - permit_origin: None, - receive_buffer_bytes: None, - acknowledgements: false.into(), - connection_limit: None, + mode: FluentMode::Tcp(FluentTcpConfig { + address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()), + tls: None, + keepalive: None, + permit_origin: None, + receive_buffer_bytes: None, + acknowledgements: false.into(), + connection_limit: None, + }), log_namespace: Some(true), }; @@ -1020,13 +1119,15 @@ mod tests { #[test] fn output_schema_definition_legacy_namespace() { let config = FluentConfig { - address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()), - tls: None, - keepalive: None, - permit_origin: None, - receive_buffer_bytes: None, - acknowledgements: false.into(), - connection_limit: None, + mode: FluentMode::Tcp(FluentTcpConfig { + address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()), + tls: None, + keepalive: None, + permit_origin: None, + receive_buffer_bytes: None, + acknowledgements: false.into(), + connection_limit: None, + }), log_namespace: None, }; @@ -1062,6 +1163,7 @@ mod integration_tests { use tokio::time::sleep; use vector_lib::event::{Event, EventStatus}; + use crate::sources::fluent::{FluentMode, FluentTcpConfig}; use crate::{ config::{SourceConfig, SourceContext}, docker::Container, @@ -1242,13 +1344,15 @@ mod integration_tests { let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)); tokio::spawn(async move { FluentConfig { - address: address.into(), - tls: None, - keepalive: None, - permit_origin: None, - receive_buffer_bytes: None, - acknowledgements: false.into(), - connection_limit: None, + mode: FluentMode::Tcp(FluentTcpConfig { + address: address.into(), + tls: None, + keepalive: None, + permit_origin: None, + receive_buffer_bytes: None, + acknowledgements: false.into(), + connection_limit: None, + }), log_namespace: None, } .build(SourceContext::new_test(sender, None)) diff --git a/src/sources/util/unix_stream.rs b/src/sources/util/unix_stream.rs index a08bf59b9de19..7c1f5d151ae20 100644 --- a/src/sources/util/unix_stream.rs +++ b/src/sources/util/unix_stream.rs @@ -2,6 +2,7 @@ use std::{fs::remove_file, path::PathBuf, time::Duration}; use bytes::Bytes; use futures::{FutureExt, StreamExt}; +use smallvec::SmallVec; use tokio::{ io::AsyncWriteExt, net::{UnixListener, UnixStream}, @@ -17,7 +18,6 @@ use vector_lib::EstimatedJsonEncodedSizeOf; use super::AfterReadExt; use crate::{ async_read::VecAsyncReadExt, - codecs::Decoder, event::Event, internal_events::{ ConnectionOpen, OpenGauge, SocketEventsReceived, SocketMode, StreamClosedError, @@ -34,14 +34,19 @@ use crate::{ /// Passing in different functions for `decoder` and `handle_events` can allow /// for different source-specific logic (such as decoding syslog messages in the /// syslog source). -pub fn build_unix_stream_source( +pub fn build_unix_stream_source( listen_path: PathBuf, socket_file_mode: Option, - decoder: Decoder, + decoder: D, handle_events: impl Fn(&mut [Event], Option) + Clone + Send + Sync + 'static, shutdown: ShutdownSignal, out: SourceSender, -) -> crate::Result { +) -> crate::Result +where + D: tokio_util::codec::Decoder + Clone + Send + 'static, + E: StreamDecodingError + std::fmt::Display + Send, + F: Into> + Send, +{ Ok(Box::pin(async move { let listener = UnixListener::bind(&listen_path).unwrap_or_else(|e| { panic!( @@ -108,7 +113,9 @@ pub fn build_unix_stream_source( while let Some(result) = stream.next().await { match result { - Ok((mut events, _byte_size)) => { + Ok((frame, _byte_size)) => { + let mut events = frame.into(); + emit!(SocketEventsReceived { mode: SocketMode::Unix, byte_size: events.estimated_json_encoded_size_of(),