Skip to content
This repository was archived by the owner on Feb 8, 2024. It is now read-only.

Commit 8f60032

Browse files
authored
kafka sink: expose more rdkafka settings (#46)
1 parent bcc4549 commit 8f60032

File tree

4 files changed

+61
-28
lines changed

4 files changed

+61
-28
lines changed

capture-server/tests/common.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@ use rdkafka::{Message, TopicPartitionList};
1919
use tokio::sync::Notify;
2020
use tracing::debug;
2121

22-
use capture::config::Config;
22+
use capture::config::{Config, KafkaConfig};
2323
use capture::server::serve;
2424

2525
pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
2626
print_sink: false,
2727
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
2828
export_prometheus: false,
2929
redis_url: "redis://localhost:6379/".to_string(),
30-
kafka_hosts: "kafka:9092".to_string(),
31-
kafka_topic: "events_plugin_ingestion".to_string(),
32-
kafka_tls: false,
30+
kafka: KafkaConfig {
31+
kafka_producer_linger_ms: 0, // Send messages as soon as possible
32+
kafka_producer_queue_mib: 10,
33+
kafka_compression_codec: "none".to_string(),
34+
kafka_hosts: "kafka:9092".to_string(),
35+
kafka_topic: "events_plugin_ingestion".to_string(),
36+
kafka_tls: false,
37+
},
3338
});
3439

3540
static TRACING_INIT: Once = Once::new();
@@ -48,7 +53,7 @@ pub struct ServerHandle {
4853
impl ServerHandle {
4954
pub fn for_topic(topic: &EphemeralTopic) -> Self {
5055
let mut config = DEFAULT_CONFIG.clone();
51-
config.kafka_topic = topic.topic_name().to_string();
56+
config.kafka.kafka_topic = topic.topic_name().to_string();
5257
Self::for_config(config)
5358
}
5459
pub fn for_config(config: Config) -> Self {
@@ -90,7 +95,10 @@ impl EphemeralTopic {
9095
pub async fn new() -> Self {
9196
let mut config = ClientConfig::new();
9297
config.set("group.id", "capture_integration_tests");
93-
config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
98+
config.set(
99+
"bootstrap.servers",
100+
DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
101+
);
94102
config.set("debug", "all");
95103

96104
// TODO: check for name collision?
@@ -151,7 +159,10 @@ impl Drop for EphemeralTopic {
151159

152160
async fn delete_topic(topic: String) {
153161
let mut config = ClientConfig::new();
154-
config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
162+
config.set(
163+
"bootstrap.servers",
164+
DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
165+
);
155166
let admin = AdminClient::from_config(&config).expect("failed to create admin client");
156167
admin
157168
.delete_topics(&[&topic], &AdminOptions::default())

capture/src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,18 @@ pub struct Config {
1111
pub redis_url: String,
1212
#[envconfig(default = "true")]
1313
pub export_prometheus: bool,
14+
#[envconfig(nested = true)]
15+
pub kafka: KafkaConfig,
16+
}
1417

18+
#[derive(Envconfig, Clone)]
19+
pub struct KafkaConfig {
20+
#[envconfig(default = "20")]
21+
pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
22+
#[envconfig(default = "400")]
23+
pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
24+
#[envconfig(default = "none")]
25+
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
1526
pub kafka_hosts: String,
1627
pub kafka_topic: String,
1728
#[envconfig(default = "false")]

capture/src/server.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ where
2828
config.export_prometheus,
2929
)
3030
} else {
31-
let sink =
32-
sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts, config.kafka_tls).unwrap();
33-
31+
let sink = sink::KafkaSink::new(config.kafka).unwrap();
3432
router::router(
3533
crate::time::SystemTime {},
3634
sink,

capture/src/sink.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1-
use async_trait::async_trait;
2-
use metrics::{absolute_counter, counter, gauge, histogram};
31
use std::time::Duration;
4-
use tokio::task::JoinSet;
52

6-
use crate::api::CaptureError;
7-
use rdkafka::config::{ClientConfig, FromClientConfigAndContext};
3+
use async_trait::async_trait;
4+
use metrics::{absolute_counter, counter, gauge, histogram};
5+
use rdkafka::config::ClientConfig;
86
use rdkafka::error::RDKafkaErrorCode;
97
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
108
use rdkafka::producer::Producer;
119
use rdkafka::util::Timeout;
12-
use tracing::info;
10+
use tokio::task::JoinSet;
11+
use tracing::{debug, info};
1312

13+
use crate::api::CaptureError;
14+
use crate::config::KafkaConfig;
1415
use crate::event::ProcessedEvent;
1516
use crate::prometheus::report_dropped_events;
1617

@@ -106,29 +107,41 @@ pub struct KafkaSink {
106107
}
107108

108109
impl KafkaSink {
109-
pub fn new(topic: String, brokers: String, tls: bool) -> anyhow::Result<KafkaSink> {
110-
info!("connecting to Kafka brokers at {}...", brokers);
111-
let mut config = ClientConfig::new();
112-
config
113-
.set("bootstrap.servers", &brokers)
114-
.set("statistics.interval.ms", "10000");
115-
116-
if tls {
117-
config
110+
pub fn new(config: KafkaConfig) -> anyhow::Result<KafkaSink> {
111+
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
112+
113+
let mut client_config = ClientConfig::new();
114+
client_config
115+
.set("bootstrap.servers", &config.kafka_hosts)
116+
.set("statistics.interval.ms", "10000")
117+
.set("linger.ms", config.kafka_producer_linger_ms.to_string())
118+
.set("compression.codec", config.kafka_compression_codec)
119+
.set(
120+
"queue.buffering.max.kbytes",
121+
(config.kafka_producer_queue_mib * 1024).to_string(),
122+
);
123+
124+
if config.kafka_tls {
125+
client_config
118126
.set("security.protocol", "ssl")
119127
.set("enable.ssl.certificate.verification", "false");
120128
};
121129

122-
let producer = FutureProducer::from_config_and_context(&config, KafkaContext)?;
130+
debug!("rdkafka configuration: {:?}", client_config);
131+
let producer: FutureProducer<KafkaContext> =
132+
client_config.create_with_context(KafkaContext)?;
123133

124-
// Ping the cluster to make sure we can reach brokers
134+
// Ping the cluster to make sure we can reach brokers, fail after 10 seconds
125135
_ = producer.client().fetch_metadata(
126136
Some("__consumer_offsets"),
127137
Timeout::After(Duration::new(10, 0)),
128138
)?;
129139
info!("connected to Kafka brokers");
130140

131-
Ok(KafkaSink { producer, topic })
141+
Ok(KafkaSink {
142+
producer,
143+
topic: config.kafka_topic,
144+
})
132145
}
133146
}
134147

0 commit comments

Comments (0)