From 8dbfe95a9b873249d817c9aa4de39d001340f02a Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Mon, 6 Nov 2023 13:52:49 +0100
Subject: [PATCH] kafka sink: expose more rdkafka settings

---
 capture-server/tests/common.rs | 25 ++++++++++++-----
 capture/src/config.rs          | 11 ++++++++
 capture/src/server.rs          |  4 +--
 capture/src/sink.rs            | 49 +++++++++++++++++++++-------------
 4 files changed, 61 insertions(+), 28 deletions(-)

diff --git a/capture-server/tests/common.rs b/capture-server/tests/common.rs
index 40836ca..d4665cf 100644
--- a/capture-server/tests/common.rs
+++ b/capture-server/tests/common.rs
@@ -19,7 +19,7 @@ use rdkafka::{Message, TopicPartitionList};
 use tokio::sync::Notify;
 use tracing::debug;

-use capture::config::Config;
+use capture::config::{Config, KafkaConfig};
 use capture::server::serve;

 pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
@@ -27,9 +27,14 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     export_prometheus: false,
     redis_url: "redis://localhost:6379/".to_string(),
-    kafka_hosts: "kafka:9092".to_string(),
-    kafka_topic: "events_plugin_ingestion".to_string(),
-    kafka_tls: false,
+    kafka: KafkaConfig {
+        kafka_producer_linger_ms: 0, // Send messages as soon as possible
+        kafka_producer_queue_mib: 10,
+        kafka_compression_codec: "none".to_string(),
+        kafka_hosts: "kafka:9092".to_string(),
+        kafka_topic: "events_plugin_ingestion".to_string(),
+        kafka_tls: false,
+    },
 });

 static TRACING_INIT: Once = Once::new();
@@ -48,7 +53,7 @@ pub struct ServerHandle {
 impl ServerHandle {
     pub fn for_topic(topic: &EphemeralTopic) -> Self {
         let mut config = DEFAULT_CONFIG.clone();
-        config.kafka_topic = topic.topic_name().to_string();
+        config.kafka.kafka_topic = topic.topic_name().to_string();
         Self::for_config(config)
     }
     pub fn for_config(config: Config) -> Self {
@@ -90,7 +95,10 @@ impl EphemeralTopic {
     pub async fn new() -> Self {
         let mut config = ClientConfig::new();
         config.set("group.id", "capture_integration_tests");
-        config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
+        config.set(
+            "bootstrap.servers",
+            DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
+        );
         config.set("debug", "all");

         // TODO: check for name collision?
@@ -151,7 +159,10 @@ impl Drop for EphemeralTopic {

 async fn delete_topic(topic: String) {
     let mut config = ClientConfig::new();
-    config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
+    config.set(
+        "bootstrap.servers",
+        DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
+    );
     let admin = AdminClient::from_config(&config).expect("failed to create admin client");
     admin
         .delete_topics(&[&topic], &AdminOptions::default())
diff --git a/capture/src/config.rs b/capture/src/config.rs
index 6edf438..e3ea1e8 100644
--- a/capture/src/config.rs
+++ b/capture/src/config.rs
@@ -11,7 +11,18 @@ pub struct Config {
     pub redis_url: String,
     #[envconfig(default = "true")]
     pub export_prometheus: bool,
+    #[envconfig(nested = true)]
+    pub kafka: KafkaConfig,
+}
+#[derive(Envconfig, Clone)]
+pub struct KafkaConfig {
+    #[envconfig(default = "20")]
+    pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
+    #[envconfig(default = "400")]
+    pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
+    #[envconfig(default = "none")]
+    pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
     pub kafka_hosts: String,
     pub kafka_topic: String,
     #[envconfig(default = "false")]
     pub kafka_tls: bool,
diff --git a/capture/src/server.rs b/capture/src/server.rs
index bee579e..e4372ae 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -28,9 +28,7 @@ where
             config.export_prometheus,
         )
     } else {
-        let sink =
-            sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts, config.kafka_tls).unwrap();
-
+        let sink = sink::KafkaSink::new(config.kafka).unwrap();
         router::router(
             crate::time::SystemTime {},
             sink,
diff --git a/capture/src/sink.rs b/capture/src/sink.rs
index cc686ee..d6b60d8 100644
--- a/capture/src/sink.rs
+++ b/capture/src/sink.rs
@@ -1,16 +1,17 @@
-use async_trait::async_trait;
-use metrics::{absolute_counter, counter, gauge, histogram};
 use std::time::Duration;
-use tokio::task::JoinSet;

-use crate::api::CaptureError;
-use rdkafka::config::{ClientConfig, FromClientConfigAndContext};
+use async_trait::async_trait;
+use metrics::{absolute_counter, counter, gauge, histogram};
+use rdkafka::config::ClientConfig;
 use rdkafka::error::RDKafkaErrorCode;
 use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
 use rdkafka::producer::Producer;
 use rdkafka::util::Timeout;
-use tracing::info;
+use tokio::task::JoinSet;
+use tracing::{debug, info};

+use crate::api::CaptureError;
+use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::prometheus::report_dropped_events;

@@ -106,29 +107,41 @@ pub struct KafkaSink {
 }

 impl KafkaSink {
-    pub fn new(topic: String, brokers: String, tls: bool) -> anyhow::Result<KafkaSink> {
-        info!("connecting to Kafka brokers at {}...", brokers);
-        let mut config = ClientConfig::new();
-        config
-            .set("bootstrap.servers", &brokers)
-            .set("statistics.interval.ms", "10000");
-
-        if tls {
-            config
+    pub fn new(config: KafkaConfig) -> anyhow::Result<KafkaSink> {
+        info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
+
+        let mut client_config = ClientConfig::new();
+        client_config
+            .set("bootstrap.servers", &config.kafka_hosts)
+            .set("statistics.interval.ms", "10000")
+            .set("linger.ms", config.kafka_producer_linger_ms.to_string())
+            .set("compression.codec", config.kafka_compression_codec)
+            .set(
+                "queue.buffering.max.kbytes",
+                (config.kafka_producer_queue_mib * 1024).to_string(),
+            );
+
+        if config.kafka_tls {
+            client_config
                 .set("security.protocol", "ssl")
                 .set("enable.ssl.certificate.verification", "false");
         };

-        let producer = FutureProducer::from_config_and_context(&config, KafkaContext)?;
+        debug!("rdkafka configuration: {:?}", client_config);
+        let producer: FutureProducer<KafkaContext> =
+            client_config.create_with_context(KafkaContext)?;

-        // Ping the cluster to make sure we can reach brokers
+        // Ping the cluster to make sure we can reach brokers, fail after 10 seconds
         _ = producer.client().fetch_metadata(
             Some("__consumer_offsets"),
             Timeout::After(Duration::new(10, 0)),
         )?;
         info!("connected to Kafka brokers");

-        Ok(KafkaSink { producer, topic })
+        Ok(KafkaSink {
+            producer,
+            topic: config.kafka_topic,
+        })
     }
 }