kafka sink: expose more rdkafka settings
xvello committed Nov 6, 2023
1 parent 018145c commit 2914c70
Showing 4 changed files with 61 additions and 28 deletions.
capture-server/tests/common.rs (18 additions, 7 deletions)

@@ -19,17 +19,22 @@ use rdkafka::{Message, TopicPartitionList};
 use tokio::sync::Notify;
 use tracing::debug;
 
-use capture::config::Config;
+use capture::config::{Config, KafkaConfig};
 use capture::server::serve;
 
 pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     export_prometheus: false,
     redis_url: "redis://localhost:6379/".to_string(),
-    kafka_hosts: "kafka:9092".to_string(),
-    kafka_topic: "events_plugin_ingestion".to_string(),
-    kafka_tls: false,
+    kafka: KafkaConfig {
+        kafka_producer_linger_ms: 0, // Send messages as soon as possible
+        kafka_producer_queue_mib: 10,
+        kafka_compression_codec: "none".to_string(),
+        kafka_hosts: "kafka:9092".to_string(),
+        kafka_topic: "events_plugin_ingestion".to_string(),
+        kafka_tls: false,
+    },
 });
 
 static TRACING_INIT: Once = Once::new();
@@ -48,7 +53,7 @@ pub struct ServerHandle {
 impl ServerHandle {
     pub fn for_topic(topic: &EphemeralTopic) -> Self {
         let mut config = DEFAULT_CONFIG.clone();
-        config.kafka_topic = topic.topic_name().to_string();
+        config.kafka.kafka_topic = topic.topic_name().to_string();
         Self::for_config(config)
     }
     pub fn for_config(config: Config) -> Self {
@@ -90,7 +95,10 @@ impl EphemeralTopic {
     pub async fn new() -> Self {
         let mut config = ClientConfig::new();
         config.set("group.id", "capture_integration_tests");
-        config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
+        config.set(
+            "bootstrap.servers",
+            DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
+        );
         config.set("debug", "all");
 
         // TODO: check for name collision?
@@ -151,7 +159,10 @@ impl Drop for EphemeralTopic {
 
 async fn delete_topic(topic: String) {
     let mut config = ClientConfig::new();
-    config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
+    config.set(
+        "bootstrap.servers",
+        DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
+    );
     let admin = AdminClient::from_config(&config).expect("failed to create admin client");
     admin
         .delete_topics(&[&topic], &AdminOptions::default())
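The test defaults above set kafka_producer_linger_ms to zero so events are flushed immediately. A hypothetical test, not part of this commit, could override the other new knobs the same way for_topic overrides the topic name; it reuses DEFAULT_CONFIG and ServerHandle from this file:

    // Sketch only: exercises the new KafkaConfig fields from a test.
    #[tokio::test]
    async fn accepts_custom_producer_settings() {
        let mut config = DEFAULT_CONFIG.clone();
        config.kafka.kafka_producer_linger_ms = 5; // still near-immediate flushes
        config.kafka.kafka_compression_codec = "lz4".to_string();
        let _server = ServerHandle::for_config(config);
    }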
capture/src/config.rs (11 additions, 0 deletions)

@@ -11,7 +11,18 @@ pub struct Config {
     pub redis_url: String,
     #[envconfig(default = "true")]
     pub export_prometheus: bool,
+    #[envconfig(nested = true)]
+    pub kafka: KafkaConfig,
+}
 
+#[derive(Envconfig, Clone)]
+pub struct KafkaConfig {
+    #[envconfig(default = "20")]
+    pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
+    #[envconfig(default = "400")]
+    pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
+    #[envconfig(default = "none")]
+    pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
     pub kafka_hosts: String,
     pub kafka_topic: String,
     #[envconfig(default = "false")]
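Since Config derives Envconfig and the new block is nested, the added fields should be settable from the environment. A minimal sketch, assuming envconfig's convention of mapping each field to its upper-cased name; the exact variable names are not shown in this commit:

    use capture::config::Config;
    use envconfig::Envconfig;

    fn main() {
        // Assumed environment variable names, derived from the field names:
        std::env::set_var("KAFKA_PRODUCER_LINGER_MS", "50");
        std::env::set_var("KAFKA_PRODUCER_QUEUE_MIB", "800");
        std::env::set_var("KAFKA_COMPRESSION_CODEC", "zstd");
        std::env::set_var("KAFKA_HOSTS", "kafka:9092");
        std::env::set_var("KAFKA_TOPIC", "events_plugin_ingestion");
        std::env::set_var("REDIS_URL", "redis://localhost:6379/");

        let config = Config::init_from_env().expect("failed to load config from env");
        assert_eq!(config.kafka.kafka_producer_linger_ms, 50);
        assert_eq!(config.kafka.kafka_producer_queue_mib, 800);
    }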
capture/src/server.rs (1 addition, 3 deletions)

@@ -28,9 +28,7 @@ where
             config.export_prometheus,
         )
     } else {
-        let sink =
-            sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts, config.kafka_tls).unwrap();
-
+        let sink = sink::KafkaSink::new(config.kafka).unwrap();
         router::router(
             crate::time::SystemTime {},
             sink,
capture/src/sink.rs (31 additions, 18 deletions)

@@ -1,16 +1,17 @@
-use async_trait::async_trait;
-use metrics::{absolute_counter, counter, gauge, histogram};
 use std::time::Duration;
-use tokio::task::JoinSet;
 
-use crate::api::CaptureError;
-use rdkafka::config::{ClientConfig, FromClientConfigAndContext};
+use async_trait::async_trait;
+use metrics::{absolute_counter, counter, gauge, histogram};
+use rdkafka::config::ClientConfig;
 use rdkafka::error::RDKafkaErrorCode;
 use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
 use rdkafka::producer::Producer;
 use rdkafka::util::Timeout;
-use tracing::info;
+use tokio::task::JoinSet;
+use tracing::{debug, info};
 
+use crate::api::CaptureError;
+use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::prometheus::report_dropped_events;
 
@@ -106,29 +107,41 @@ pub struct KafkaSink {
 }
 
 impl KafkaSink {
-    pub fn new(topic: String, brokers: String, tls: bool) -> anyhow::Result<KafkaSink> {
-        info!("connecting to Kafka brokers at {}...", brokers);
-        let mut config = ClientConfig::new();
-        config
-            .set("bootstrap.servers", &brokers)
-            .set("statistics.interval.ms", "10000");
-
-        if tls {
-            config
+    pub fn new(config: KafkaConfig) -> anyhow::Result<KafkaSink> {
+        info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
+
+        let mut client_config = ClientConfig::new();
+        client_config
+            .set("bootstrap.servers", &config.kafka_hosts)
+            .set("statistics.interval.ms", "10000")
+            .set("linger.ms", config.kafka_producer_linger_ms.to_string())
+            .set("compression.codec", config.kafka_compression_codec)
+            .set(
+                "queue.buffering.max.kbytes",
+                (config.kafka_producer_queue_mib * 1024).to_string(),
+            );
+
+        if config.kafka_tls {
+            client_config
                 .set("security.protocol", "ssl")
                 .set("enable.ssl.certificate.verification", "false");
         };
 
-        let producer = FutureProducer::from_config_and_context(&config, KafkaContext)?;
+        debug!("rdkafka configuration: {:?}", client_config);
+        let producer: FutureProducer<KafkaContext> =
+            client_config.create_with_context(KafkaContext)?;
 
-        // Ping the cluster to make sure we can reach brokers
+        // Ping the cluster to make sure we can reach brokers, fail after 10 seconds
         _ = producer.client().fetch_metadata(
             Some("__consumer_offsets"),
             Timeout::After(Duration::new(10, 0)),
         )?;
         info!("connected to Kafka brokers");
 
-        Ok(KafkaSink { producer, topic })
+        Ok(KafkaSink {
+            producer,
+            topic: config.kafka_topic,
+        })
     }
 }
 
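Note the unit conversion: rdkafka's queue.buffering.max.kbytes is measured in kibibytes, so the mebibyte-denominated setting is multiplied by 1024, and the default of 400 MiB becomes 409,600 KiB. A standalone sketch of the same producer tuning with the defaults inlined (broker address assumed, and using rdkafka's default context rather than this crate's KafkaContext):

    use rdkafka::config::ClientConfig;
    use rdkafka::error::KafkaError;
    use rdkafka::producer::FutureProducer;

    fn build_producer() -> Result<FutureProducer, KafkaError> {
        let queue_mib: u32 = 400; // KafkaConfig default
        let mut config = ClientConfig::new();
        config
            .set("bootstrap.servers", "kafka:9092") // assumed address
            .set("linger.ms", "20") // KafkaConfig default batching delay
            .set("compression.codec", "none")
            // 400 MiB * 1024 KiB/MiB = 409_600 KiB
            .set("queue.buffering.max.kbytes", (queue_mib * 1024).to_string());
        config.create() // default context; the commit uses create_with_context(KafkaContext)
    }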
