
kafka sink: expose more rdkafka settings #46

Merged · 1 commit · Nov 6, 2023
25 changes: 18 additions & 7 deletions capture-server/tests/common.rs
@@ -19,17 +19,22 @@ use rdkafka::{Message, TopicPartitionList};
 use tokio::sync::Notify;
 use tracing::debug;
 
-use capture::config::Config;
+use capture::config::{Config, KafkaConfig};
 use capture::server::serve;
 
 pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     export_prometheus: false,
     redis_url: "redis://localhost:6379/".to_string(),
-    kafka_hosts: "kafka:9092".to_string(),
-    kafka_topic: "events_plugin_ingestion".to_string(),
-    kafka_tls: false,
+    kafka: KafkaConfig {
+        kafka_producer_linger_ms: 0, // Send messages as soon as possible
+        kafka_producer_queue_mib: 10,
+        kafka_compression_codec: "none".to_string(),
+        kafka_hosts: "kafka:9092".to_string(),
+        kafka_topic: "events_plugin_ingestion".to_string(),
+        kafka_tls: false,
+    },
 });
 
 static TRACING_INIT: Once = Once::new();
@@ -48,7 +53,7 @@ pub struct ServerHandle {
 impl ServerHandle {
     pub fn for_topic(topic: &EphemeralTopic) -> Self {
         let mut config = DEFAULT_CONFIG.clone();
-        config.kafka_topic = topic.topic_name().to_string();
+        config.kafka.kafka_topic = topic.topic_name().to_string();
         Self::for_config(config)
     }
     pub fn for_config(config: Config) -> Self {
@@ -90,7 +95,10 @@ impl EphemeralTopic {
     pub async fn new() -> Self {
         let mut config = ClientConfig::new();
         config.set("group.id", "capture_integration_tests");
-        config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
+        config.set(
+            "bootstrap.servers",
+            DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
+        );
         config.set("debug", "all");
 
         // TODO: check for name collision?
@@ -151,7 +159,10 @@ impl Drop for EphemeralTopic {
 
 async fn delete_topic(topic: String) {
     let mut config = ClientConfig::new();
-    config.set("bootstrap.servers", DEFAULT_CONFIG.kafka_hosts.clone());
+    config.set(
+        "bootstrap.servers",
+        DEFAULT_CONFIG.kafka.kafka_hosts.clone(),
+    );
     let admin = AdminClient::from_config(&config).expect("failed to create admin client");
     admin
         .delete_topics(&[&topic], &AdminOptions::default())
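
Note on the test defaults above: kafka_producer_linger_ms is pinned to 0 so integration tests flush every message immediately rather than waiting out a batching window. A test that needs different producer behaviour can clone the shared config and override a nested field before boot; a minimal sketch using the helpers in this file (the gzip override is illustrative, not part of the diff):

    // Hypothetical test setup: start a server with a non-default codec.
    let mut config = DEFAULT_CONFIG.clone();
    config.kafka.kafka_compression_codec = "gzip".to_string();
    let server = ServerHandle::for_config(config);
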
11 changes: 11 additions & 0 deletions capture/src/config.rs
@@ -11,7 +11,18 @@ pub struct Config {
     pub redis_url: String,
     #[envconfig(default = "true")]
     pub export_prometheus: bool,
+    #[envconfig(nested = true)]
+    pub kafka: KafkaConfig,
+}
+
+#[derive(Envconfig, Clone)]
+pub struct KafkaConfig {
+    #[envconfig(default = "20")]
+    pub kafka_producer_linger_ms: u32, // Maximum time between producer batches during low traffic
+    #[envconfig(default = "400")]
+    pub kafka_producer_queue_mib: u32, // Size of the in-memory producer queue in mebibytes
+    #[envconfig(default = "none")]
+    pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
     pub kafka_hosts: String,
     pub kafka_topic: String,
     #[envconfig(default = "false")]
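
Since KafkaConfig derives Envconfig, each new field should be settable from the environment. Assuming the envconfig crate's usual convention of mapping a field name to its uppercased environment variable and its init_from_env entry point (both inferred here, not shown in the diff), a deployment could tune the producer like this:

    // Sketch, assuming envconfig's default name mapping:
    //   KAFKA_PRODUCER_LINGER_MS=40 \
    //   KAFKA_PRODUCER_QUEUE_MIB=800 \
    //   KAFKA_COMPRESSION_CODEC=zstd \
    //   capture-server
    use envconfig::Envconfig;

    let config = capture::config::Config::init_from_env().expect("invalid configuration");
    assert_eq!(config.kafka.kafka_producer_queue_mib, 800);
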
4 changes: 1 addition & 3 deletions capture/src/server.rs
@@ -28,9 +28,7 @@ where
             config.export_prometheus,
         )
     } else {
-        let sink =
-            sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts, config.kafka_tls).unwrap();
-
+        let sink = sink::KafkaSink::new(config.kafka).unwrap();
         router::router(
             crate::time::SystemTime {},
             sink,
49 changes: 31 additions & 18 deletions capture/src/sink.rs
@@ -1,16 +1,17 @@
-use async_trait::async_trait;
-use metrics::{absolute_counter, counter, gauge, histogram};
 use std::time::Duration;
-use tokio::task::JoinSet;
 
-use crate::api::CaptureError;
-use rdkafka::config::{ClientConfig, FromClientConfigAndContext};
+use async_trait::async_trait;
+use metrics::{absolute_counter, counter, gauge, histogram};
+use rdkafka::config::ClientConfig;
 use rdkafka::error::RDKafkaErrorCode;
 use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
 use rdkafka::producer::Producer;
 use rdkafka::util::Timeout;
-use tracing::info;
+use tokio::task::JoinSet;
+use tracing::{debug, info};
 
+use crate::api::CaptureError;
+use crate::config::KafkaConfig;
 use crate::event::ProcessedEvent;
 use crate::prometheus::report_dropped_events;
 
@@ -106,29 +107,41 @@ pub struct KafkaSink {
 }
 
 impl KafkaSink {
-    pub fn new(topic: String, brokers: String, tls: bool) -> anyhow::Result<KafkaSink> {
-        info!("connecting to Kafka brokers at {}...", brokers);
-        let mut config = ClientConfig::new();
-        config
-            .set("bootstrap.servers", &brokers)
-            .set("statistics.interval.ms", "10000");
-
-        if tls {
-            config
+    pub fn new(config: KafkaConfig) -> anyhow::Result<KafkaSink> {
+        info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
+
+        let mut client_config = ClientConfig::new();
+        client_config
+            .set("bootstrap.servers", &config.kafka_hosts)
+            .set("statistics.interval.ms", "10000")
+            .set("linger.ms", config.kafka_producer_linger_ms.to_string())
+            .set("compression.codec", config.kafka_compression_codec)
+            .set(
+                "queue.buffering.max.kbytes",
+                (config.kafka_producer_queue_mib * 1024).to_string(),
+            );
+
+        if config.kafka_tls {
+            client_config
                 .set("security.protocol", "ssl")
                 .set("enable.ssl.certificate.verification", "false");
         };
 
-        let producer = FutureProducer::from_config_and_context(&config, KafkaContext)?;
+        debug!("rdkafka configuration: {:?}", client_config);
+        let producer: FutureProducer<KafkaContext> =
+            client_config.create_with_context(KafkaContext)?;
 
-        // Ping the cluster to make sure we can reach brokers
+        // Ping the cluster to make sure we can reach brokers, fail after 10 seconds
         _ = producer.client().fetch_metadata(
             Some("__consumer_offsets"),
             Timeout::After(Duration::new(10, 0)),
         )?;
         info!("connected to Kafka brokers");
 
-        Ok(KafkaSink { producer, topic })
+        Ok(KafkaSink {
+            producer,
+            topic: config.kafka_topic,
+        })
     }
 }
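
One unit subtlety in the producer setup above: rdkafka's queue.buffering.max.kbytes is denominated in kibibytes, while the new capture setting is in mebibytes, hence the multiplication by 1024 before the value is stringified. A minimal check of that conversion:

    // 400 MiB (the default) is handed to rdkafka as 409600 KiB.
    let kafka_producer_queue_mib: u32 = 400;
    let queue_kbytes = (kafka_producer_queue_mib * 1024).to_string();
    assert_eq!(queue_kbytes, "409600");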