diff --git a/src/cluster/manager.rs b/src/cluster/manager.rs index 1a4ee60..a4baef5 100644 --- a/src/cluster/manager.rs +++ b/src/cluster/manager.rs @@ -48,7 +48,10 @@ impl ClusterManager { OffsetCollector::with_performance(Arc::clone(&client), filters, performance.clone()); let timestamp_sampler = if exporter_config.timestamp_sampling.enabled { - let ts_consumer = TimestampConsumer::new(&config)?; + let ts_consumer = TimestampConsumer::with_pool_size( + &config, + exporter_config.timestamp_sampling.max_concurrent_fetches, + )?; Some(TimestampSampler::new( ts_consumer, exporter_config.timestamp_sampling.cache_ttl, diff --git a/src/collector/offset_collector.rs b/src/collector/offset_collector.rs index a7806b4..5c8c1fd 100644 --- a/src/collector/offset_collector.rs +++ b/src/collector/offset_collector.rs @@ -3,7 +3,6 @@ use crate::error::Result; use crate::kafka::client::{KafkaClient, TopicPartition}; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::{debug, instrument, warn}; @@ -100,9 +99,14 @@ impl OffsetCollector { debug!(partitions = watermarks.len(), "Fetched watermarks"); // Build group snapshots + let wm_keys: Vec = watermarks.keys().cloned().collect(); let mut groups = Vec::with_capacity(descriptions.len()); for desc in descriptions { - let offsets = self.fetch_group_offsets(&desc.group_id, &watermarks)?; + let offsets = self.client.list_consumer_group_offsets( + &desc.group_id, + &wm_keys, + self.performance.offset_fetch_timeout, + )?; // Filter offsets by topic whitelist/blacklist let filtered_offsets: HashMap = offsets @@ -242,6 +246,7 @@ impl OffsetCollector { } /// Fetch offsets for all groups in parallel with bounded concurrency. + /// Uses the Admin API (ListConsumerGroupOffsets) through the shared AdminClient — no per-group consumers needed. async fn fetch_all_group_offsets_parallel( &self, descriptions: &[crate::kafka::client::GroupDescription], @@ -253,42 +258,31 @@ impl OffsetCollector { debug!( groups = descriptions.len(), max_concurrent = max_concurrent, - "Fetching group offsets in parallel" + "Fetching group offsets in parallel via Admin API" ); let semaphore = Arc::new(Semaphore::new(max_concurrent)); - let mut handles = Vec::with_capacity(descriptions.len()); - - // Get bootstrap servers once, not per-task - let bootstrap_servers = self.client.bootstrap_servers().to_string(); - let consumer_properties = self.client.consumer_properties().clone(); + let client = Arc::clone(&self.client); let wm_keys: Vec = watermarks.keys().cloned().collect(); + let mut handles = Vec::with_capacity(descriptions.len()); + for desc in descriptions { let group_id = desc.group_id.clone(); let permit = semaphore.clone(); - let bootstrap = bootstrap_servers.clone(); - let props = consumer_properties.clone(); + let client_clone = Arc::clone(&client); let partitions = wm_keys.clone(); let timeout = offset_timeout; - // Spawn async task that properly awaits the semaphore before spawning blocking work let handle = tokio::spawn(async move { - // Acquire permit - this properly awaits until one is available let permit_guard: OwnedSemaphorePermit = permit.acquire_owned().await.expect("semaphore closed"); - // Now spawn the blocking task with the permit held tokio::task::spawn_blocking(move || { - let _permit = permit_guard; // Hold permit until blocking work completes + let _permit = permit_guard; - let offsets = Self::fetch_group_offsets_standalone( - &group_id, - &bootstrap, - &props, - &partitions, - timeout, - ); + let offsets = + client_clone.list_consumer_group_offsets(&group_id, &partitions, timeout); (group_id, offsets) }) @@ -298,7 +292,6 @@ impl OffsetCollector { handles.push(handle); } - // Wait for all tasks to complete (nested Result from tokio::spawn -> spawn_blocking) let results = futures::future::join_all(handles).await; let mut all_offsets = HashMap::new(); @@ -322,139 +315,6 @@ impl OffsetCollector { all_offsets } - - /// Standalone function to fetch group offsets (for use in spawn_blocking) - fn fetch_group_offsets_standalone( - group_id: &str, - bootstrap_servers: &str, - consumer_properties: &HashMap, - watermark_partitions: &[TopicPartition], - timeout: Duration, - ) -> Result> { - use rdkafka::config::ClientConfig; - use rdkafka::consumer::{BaseConsumer, Consumer}; - use rdkafka::TopicPartitionList; - - if bootstrap_servers.is_empty() { - return Ok(HashMap::new()); - } - - let mut client_config = ClientConfig::new(); - - // Apply consumer properties first (security settings, etc.) - for (key, value) in consumer_properties { - client_config.set(key, value); - } - - client_config - .set("bootstrap.servers", bootstrap_servers) - .set("group.id", group_id) - .set("enable.auto.commit", "false"); - - let consumer: BaseConsumer = match client_config.create() { - Ok(c) => c, - Err(e) => { - warn!(group = group_id, error = %e, "Failed to create consumer for group"); - return Ok(HashMap::new()); - } - }; - - // Build topic partition list from watermarks - let mut tpl = TopicPartitionList::new(); - for tp in watermark_partitions { - tpl.add_partition(&tp.topic, tp.partition); - } - - // Fetch committed offsets - let committed = match consumer.committed_offsets(tpl, timeout) { - Ok(c) => c, - Err(e) => { - warn!(group = group_id, error = %e, "Failed to fetch committed offsets"); - return Ok(HashMap::new()); - } - }; - - let mut offsets = HashMap::new(); - for elem in committed.elements() { - if let rdkafka::Offset::Offset(offset) = elem.offset() { - offsets.insert(TopicPartition::new(elem.topic(), elem.partition()), offset); - } - } - - Ok(offsets) - } - - /// Fetch offsets for a single group (used by sequential collect method). - #[allow(dead_code)] - fn fetch_group_offsets( - &self, - group_id: &str, - watermarks: &HashMap, - ) -> Result> { - // We need to create a consumer for this specific group to fetch committed offsets - use rdkafka::config::ClientConfig; - use rdkafka::consumer::{BaseConsumer, Consumer}; - use rdkafka::TopicPartitionList; - - let _cluster = self.client.cluster_name(); - let mut client_config = ClientConfig::new(); - - // Get bootstrap servers from the existing client's metadata - let metadata = self.client.fetch_metadata()?; - - // Build bootstrap servers from metadata brokers - let bootstrap_servers: Vec = metadata - .brokers() - .iter() - .map(|b| format!("{}:{}", b.host(), b.port())) - .collect(); - - if bootstrap_servers.is_empty() { - return Ok(HashMap::new()); - } - - // Apply consumer properties first (security settings, etc.) - for (key, value) in self.client.consumer_properties() { - client_config.set(key, value); - } - - client_config - .set("bootstrap.servers", bootstrap_servers.join(",")) - .set("group.id", group_id) - .set("enable.auto.commit", "false"); - - let consumer: BaseConsumer = match client_config.create() { - Ok(c) => c, - Err(e) => { - warn!(group = group_id, error = %e, "Failed to create consumer for group"); - return Ok(HashMap::new()); - } - }; - - // Build topic partition list from watermarks - let mut tpl = TopicPartitionList::new(); - for tp in watermarks.keys() { - tpl.add_partition(&tp.topic, tp.partition); - } - - // Fetch committed offsets - let committed = match consumer.committed_offsets(tpl, std::time::Duration::from_secs(10)) { - Ok(c) => c, - Err(e) => { - warn!(group = group_id, error = %e, "Failed to fetch committed offsets"); - return Ok(HashMap::new()); - } - }; - - let mut offsets = HashMap::new(); - for elem in committed.elements() { - if let rdkafka::Offset::Offset(offset) = elem.offset() { - offsets.insert(TopicPartition::new(elem.topic(), elem.partition()), offset); - } - } - - Ok(offsets) - } } impl OffsetsSnapshot { diff --git a/src/error.rs b/src/error.rs index a4ce98d..fe095f3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,9 @@ pub enum KlagError { #[allow(dead_code)] #[error("Leadership error: {0}")] Leadership(String), + + #[error("Admin API error: {0}")] + Admin(String), } pub type Result = std::result::Result; diff --git a/src/kafka/client.rs b/src/kafka/client.rs index 5eb5197..79c08bb 100644 --- a/src/kafka/client.rs +++ b/src/kafka/client.rs @@ -8,6 +8,8 @@ use rdkafka::groups::GroupList; use rdkafka::metadata::Metadata; use rdkafka::TopicPartitionList; use std::collections::{HashMap, HashSet}; +use std::ffi::{CStr, CString}; +use std::os::raw::c_char; use std::sync::Arc; use std::time::Duration; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; @@ -65,7 +67,7 @@ pub enum OffsetPosition { pub struct KafkaClient { admin: AdminClient, - consumer: BaseConsumer, + consumer: Arc, config: ClusterConfig, timeout: Duration, performance: PerformanceConfig, @@ -108,7 +110,7 @@ impl KafkaClient { Ok(Self { admin, - consumer, + consumer: Arc::new(consumer), config: config.clone(), timeout, performance, @@ -124,14 +126,6 @@ impl KafkaClient { &self.config.name } - pub fn consumer_properties(&self) -> &HashMap { - &self.config.consumer_properties - } - - pub fn bootstrap_servers(&self) -> &str { - &self.config.bootstrap_servers - } - #[instrument(skip(self), fields(cluster = %self.config.name))] pub fn list_consumer_groups(&self) -> Result> { let group_list: GroupList = self @@ -193,39 +187,230 @@ impl KafkaClient { Ok(descriptions) } - #[allow(dead_code)] - #[instrument(skip(self), fields(cluster = %self.config.name, group = %group_id))] + /// Fetch committed offsets for a consumer group using the Admin API. + /// Uses the existing AdminClient connection — no additional consumers/FDs needed. + #[instrument(skip(self, partitions), fields(cluster = %self.config.name, group = %group_id))] pub fn list_consumer_group_offsets( &self, group_id: &str, + partitions: &[TopicPartition], + timeout: Duration, ) -> Result> { - let metadata = self.fetch_metadata()?; - let mut tpl = TopicPartitionList::new(); + use rdkafka::bindings::*; + + let group_cstr = CString::new(group_id) + .map_err(|e| KlagError::Admin(format!("Invalid group_id contains null byte: {e}")))?; + let timeout_ms = timeout.as_millis().min(i32::MAX as u128) as i32; + + unsafe { + // Get the native rd_kafka_t handle from the AdminClient + let rk = self.admin.inner().native_ptr(); + + // Build the C topic-partition list from our partitions + let c_tpl = rd_kafka_topic_partition_list_new(partitions.len() as i32); + if c_tpl.is_null() { + return Err(KlagError::Admin( + "Failed to create topic partition list".into(), + )); + } - for topic in metadata.topics() { - for partition in topic.partitions() { - tpl.add_partition(topic.name(), partition.id()); + // RAII guard to clean up all C resources on any exit path + struct Cleanup { + tpl: *mut rd_kafka_topic_partition_list_t, + request: *mut rd_kafka_ListConsumerGroupOffsets_t, + options: *mut rd_kafka_AdminOptions_t, + queue: *mut rd_kafka_queue_t, + event: *mut rd_kafka_event_t, + // CStrings kept alive for the duration of the FFI call + _topic_cstrings: Vec, + } + impl Drop for Cleanup { + fn drop(&mut self) { + unsafe { + if !self.event.is_null() { + rd_kafka_event_destroy(self.event); + } + if !self.queue.is_null() { + rd_kafka_queue_destroy(self.queue); + } + if !self.options.is_null() { + rd_kafka_AdminOptions_destroy(self.options); + } + if !self.request.is_null() { + rd_kafka_ListConsumerGroupOffsets_destroy(self.request); + } + // tpl ownership is transferred to the request via + // rd_kafka_ListConsumerGroupOffsets_new, which copies it. + // We still own the original and must free it. + if !self.tpl.is_null() { + rd_kafka_topic_partition_list_destroy(self.tpl); + } + } + } } - } - let committed = self - .consumer - .committed_offsets(tpl, self.timeout) - .map_err(KlagError::Kafka)?; + let mut cleanup = Cleanup { + tpl: c_tpl, + request: std::ptr::null_mut(), + options: std::ptr::null_mut(), + queue: std::ptr::null_mut(), + event: std::ptr::null_mut(), + _topic_cstrings: Vec::with_capacity(partitions.len()), + }; - let mut offsets = HashMap::new(); - for elem in committed.elements() { - if let rdkafka::Offset::Offset(offset) = elem.offset() { - offsets.insert(TopicPartition::new(elem.topic(), elem.partition()), offset); + // Populate the partition list + for tp in partitions { + let topic_cstr = CString::new(tp.topic.as_str()) + .map_err(|e| KlagError::Admin(format!("Topic name contains null byte: {e}")))?; + cleanup._topic_cstrings.push(topic_cstr); + let cstr_ptr = cleanup._topic_cstrings.last().unwrap().as_ptr(); + rd_kafka_topic_partition_list_add(c_tpl, cstr_ptr, tp.partition); } - } - debug!( - group = group_id, - partitions = offsets.len(), - "Fetched committed offsets" - ); - Ok(offsets) + // Create the request object + let request = rd_kafka_ListConsumerGroupOffsets_new(group_cstr.as_ptr(), c_tpl); + if request.is_null() { + return Err(KlagError::Admin( + "Failed to create ListConsumerGroupOffsets request".into(), + )); + } + cleanup.request = request; + + // Create admin options with timeout + let options = rd_kafka_AdminOptions_new( + rk, + rd_kafka_admin_op_t::RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, + ); + if options.is_null() { + return Err(KlagError::Admin("Failed to create AdminOptions".into())); + } + cleanup.options = options; + + let mut errstr_buf = [0 as c_char; 512]; + let err = rd_kafka_AdminOptions_set_request_timeout( + options, + timeout_ms, + errstr_buf.as_mut_ptr(), + errstr_buf.len(), + ); + if err != rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR_NO_ERROR { + let errstr = CStr::from_ptr(errstr_buf.as_ptr()) + .to_string_lossy() + .to_string(); + return Err(KlagError::Admin(format!( + "Failed to set request timeout: {errstr}" + ))); + } + + // Create a temporary queue for the async result + let queue = rd_kafka_queue_new(rk); + if queue.is_null() { + return Err(KlagError::Admin("Failed to create queue".into())); + } + cleanup.queue = queue; + + // Issue the async call + let mut request_ptr = request; + rd_kafka_ListConsumerGroupOffsets(rk, &mut request_ptr, 1, options, queue); + // After the call, the request is consumed; prevent double-free + cleanup.request = std::ptr::null_mut(); + + // Poll for the result event (blocks up to timeout) + let event = rd_kafka_queue_poll(queue, timeout_ms); + if event.is_null() { + return Err(KlagError::Admin( + "ListConsumerGroupOffsets timed out".into(), + )); + } + cleanup.event = event; + + // Verify it's the right event type + let event_type = rd_kafka_event_type(event); + if event_type != RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT { + return Err(KlagError::Admin(format!( + "Unexpected event type: {event_type}" + ))); + } + + // Check top-level error + let resp_err = rd_kafka_event_error(event); + if resp_err != rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR_NO_ERROR { + let err_cstr = rd_kafka_event_error_string(event); + let err_msg = if err_cstr.is_null() { + "unknown error".to_string() + } else { + CStr::from_ptr(err_cstr).to_string_lossy().to_string() + }; + return Err(KlagError::Admin(format!( + "ListConsumerGroupOffsets failed: {err_msg}" + ))); + } + + // Extract the result + let result = rd_kafka_event_ListConsumerGroupOffsets_result(event); + if result.is_null() { + return Err(KlagError::Admin( + "ListConsumerGroupOffsets result is null".into(), + )); + } + + let mut n_groups: usize = 0; + let groups_ptr = rd_kafka_ListConsumerGroupOffsets_result_groups(result, &mut n_groups); + + let mut offsets = HashMap::new(); + + if groups_ptr.is_null() || n_groups == 0 { + debug!(group = group_id, "No group results returned from Admin API"); + return Ok(offsets); + } + + for i in 0..n_groups { + let group = *groups_ptr.add(i); + + // Check per-group error + let group_error = rd_kafka_group_result_error(group); + if !group_error.is_null() { + let code = rd_kafka_error_code(group_error); + if code != rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR_NO_ERROR { + let err_str = rd_kafka_error_string(group_error); + let msg = if err_str.is_null() { + "unknown".to_string() + } else { + CStr::from_ptr(err_str).to_string_lossy().to_string() + }; + warn!(group = group_id, error = %msg, "Group result error"); + continue; + } + } + + let result_partitions = rd_kafka_group_result_partitions(group); + if result_partitions.is_null() { + continue; + } + + let cnt = (*result_partitions).cnt; + let elems = (*result_partitions).elems; + if elems.is_null() { + continue; + } + + for j in 0..cnt { + let elem = &*elems.add(j as usize); + // offset == -1001 (RD_KAFKA_OFFSET_INVALID) means no committed offset + if elem.offset >= 0 && !elem.topic.is_null() { + let topic = CStr::from_ptr(elem.topic).to_string_lossy().to_string(); + offsets.insert(TopicPartition::new(topic, elem.partition), elem.offset); + } + } + } + + debug!( + group = group_id, + partitions = offsets.len(), + "Fetched committed offsets via Admin API" + ); + Ok(offsets) + } } #[instrument(skip(self), fields(cluster = %self.config.name))] @@ -346,59 +531,25 @@ impl KafkaClient { // Use semaphore to limit concurrency let semaphore = Arc::new(Semaphore::new(max_concurrent)); - let bootstrap_servers = self.config.bootstrap_servers.clone(); - let consumer_properties = self.config.consumer_properties.clone(); + let consumer = Arc::clone(&self.consumer); let timeout = self.timeout; - let cluster_name = self.config.name.clone(); let mut handles = Vec::with_capacity(partitions.len()); for (topic, partition) in partitions { let semaphore_clone = semaphore.clone(); - let bootstrap = bootstrap_servers.clone(); - let props = consumer_properties.clone(); - let cluster = cluster_name.clone(); + let consumer_clone = Arc::clone(&consumer); - // Spawn async task that properly awaits the semaphore before spawning blocking work let handle = tokio::spawn(async move { - // Acquire permit - this properly awaits until one is available let permit: OwnedSemaphorePermit = semaphore_clone .acquire_owned() .await .expect("semaphore closed"); - // Now spawn the blocking task with the permit held tokio::task::spawn_blocking(move || { - let _permit = permit; // Hold permit until blocking work completes - - // Create a temporary consumer for this fetch - let mut client_config = ClientConfig::new(); - client_config.set("bootstrap.servers", &bootstrap); - client_config.set( - "client.id", - format!("klag-wm-{}-{}-{}", cluster, topic, partition), - ); - client_config.set("group.id", format!("klag-wm-internal-{}", cluster)); - client_config.set("enable.auto.commit", "false"); - - for (key, value) in &props { - client_config.set(key, value); - } - - let consumer: BaseConsumer = match client_config.create() { - Ok(c) => c, - Err(e) => { - warn!( - topic = topic, - partition = partition, - error = %e, - "Failed to create consumer for watermark fetch" - ); - return None; - } - }; + let _permit = permit; - match consumer.fetch_watermarks(&topic, partition, timeout) { + match consumer_clone.fetch_watermarks(&topic, partition, timeout) { Ok((low, high)) => { Some((TopicPartition::new(&topic, partition), (low, high))) } diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index bf62cc4..73872ed 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -6,6 +6,7 @@ use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::message::Message; use rdkafka::Offset; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Mutex; use std::time::Duration; use tracing::{debug, instrument, warn}; @@ -16,27 +17,44 @@ pub struct TimestampFetchResult { pub timestamp_ms: i64, } -/// Factory for creating consumers. Creates a fresh consumer for each fetch -/// to avoid state conflicts when fetching timestamps concurrently. +/// Pool-based timestamp consumer. Maintains a pool of reusable BaseConsumers +/// to avoid connection churn (TCP/TLS/SASL handshake per fetch). pub struct TimestampConsumer { config: ClusterConfig, cluster_name: String, fetch_timeout: Duration, consumer_counter: AtomicU64, + pool: Mutex>, + pool_size: usize, } impl TimestampConsumer { - pub fn new(config: &ClusterConfig) -> Result { - Ok(Self { + pub fn with_pool_size(config: &ClusterConfig, pool_size: usize) -> Result { + let mut consumer = Self { config: config.clone(), cluster_name: config.name.clone(), fetch_timeout: Duration::from_secs(5), consumer_counter: AtomicU64::new(0), - }) + pool: Mutex::new(Vec::with_capacity(pool_size)), + pool_size, + }; + + // Pre-populate the pool + for _ in 0..pool_size { + let c = consumer.create_consumer()?; + consumer.pool.get_mut().unwrap().push(c); + } + + debug!( + cluster = %consumer.cluster_name, + pool_size = pool_size, + "Created timestamp consumer pool" + ); + + Ok(consumer) } - /// Create a fresh consumer for a single fetch operation. - /// Each consumer gets a unique client.id to avoid broker-side conflicts. + /// Create a consumer for the pool. fn create_consumer(&self) -> Result { let counter = self.consumer_counter.fetch_add(1, Ordering::Relaxed); @@ -64,6 +82,40 @@ impl TimestampConsumer { client_config.create().map_err(KlagError::Kafka) } + /// Take a consumer from the pool, or create a new one if the pool is empty. + fn acquire(&self) -> Result { + let mut pool = self + .pool + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if let Some(consumer) = pool.pop() { + Ok(consumer) + } else { + // Pool exhausted (more concurrent fetches than pool_size); create a temporary one + self.create_consumer() + } + } + + /// Return a consumer to the pool. If the pool is full, the consumer is dropped. + fn release(&self, consumer: BaseConsumer) { + // Unassign before returning to pool to clear any partition state + let empty = rdkafka::TopicPartitionList::new(); + if let Err(e) = consumer.assign(&empty) { + warn!(error = %e, "Failed to unassign consumer before returning to pool"); + // Don't return a broken consumer to the pool + return; + } + + let mut pool = self + .pool + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if pool.len() < self.pool_size { + pool.push(consumer); + } + // else: pool is full, consumer is dropped + } + #[instrument(skip(self), fields(cluster = %self.cluster_name, topic = %tp.topic, partition = tp.partition, offset = offset))] pub fn fetch_timestamp( &self, @@ -72,8 +124,26 @@ impl TimestampConsumer { ) -> Result> { use rdkafka::TopicPartitionList; - // Create a fresh consumer for this fetch to avoid state conflicts - let consumer = self.create_consumer()?; + let consumer = self.acquire()?; + + // RAII guard ensures consumer is returned to pool even on panic + struct PoolGuard<'a> { + consumer: Option, + pool: &'a TimestampConsumer, + } + impl<'a> Drop for PoolGuard<'a> { + fn drop(&mut self) { + if let Some(consumer) = self.consumer.take() { + self.pool.release(consumer); + } + } + } + + let guard = PoolGuard { + consumer: Some(consumer), + pool: self, + }; + let consumer = guard.consumer.as_ref().unwrap(); let mut tpl = TopicPartitionList::new(); tpl.add_partition_offset(&tp.topic, tp.partition, Offset::Offset(offset)) @@ -81,8 +151,7 @@ impl TimestampConsumer { consumer.assign(&tpl).map_err(KlagError::Kafka)?; - // Poll for message - no seek needed since assign with offset already positions - match consumer.poll(self.fetch_timeout) { + let result = match consumer.poll(self.fetch_timeout) { Some(result) => match result { Ok(msg) => { let timestamp = msg.timestamp().to_millis(); @@ -117,8 +186,11 @@ impl TimestampConsumer { ); Ok(None) } - } - // Consumer is dropped here, cleaning up resources + }; + + // Take consumer out of guard so drop still returns it to pool + // (guard.drop() handles the release) + result } #[allow(dead_code)] @@ -140,6 +212,7 @@ impl std::fmt::Debug for TimestampConsumer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("TimestampConsumer") .field("cluster", &self.cluster_name) + .field("pool_size", &self.pool_size) .finish() } } diff --git a/test-stack-large-cluster/.env b/test-stack-large-cluster/.env new file mode 100644 index 0000000..f3125c4 --- /dev/null +++ b/test-stack-large-cluster/.env @@ -0,0 +1,10 @@ +# File descriptor limit for klag-exporter container +# Set low (256) to trigger FD exhaustion quickly, higher (1024+) for comparison +KLAG_NOFILE_LIMIT=256 + +# Topic configuration +NUM_TOPICS=500 +PARTITIONS_PER_TOPIC=3 + +# Consumer configuration +NUM_CONSUMER_GROUPS=10 diff --git a/test-stack-large-cluster/README.md b/test-stack-large-cluster/README.md new file mode 100644 index 0000000..9ef622b --- /dev/null +++ b/test-stack-large-cluster/README.md @@ -0,0 +1,66 @@ +# Large Cluster Test Stack — FD Exhaustion Reproduction + +Test environment for reproducing file descriptor exhaustion issues with klag-exporter under high-scale scenarios. + +Creates 500 topics x 3 partitions (1500 partitions) with 10 consumer groups and a low `ulimit` on the klag-exporter container to trigger "Too many open files" errors. + +## System Requirements + +- Docker with at least 8GB memory allocated (Kafka alone uses up to 4GB) +- Docker Compose v2+ +- ~2GB free disk space for container images + +## Quick Start + +```bash +cd test-stack-large-cluster +docker-compose up -d --build +``` + +Wait for topic creation to finish: +```bash +docker logs -f lc-topic-creator +``` + +Watch for FD exhaustion errors: +```bash +docker logs -f lc-klag-exporter +``` + +With the default `KLAG_NOFILE_LIMIT=256`, you should see "Too many open files" errors within a few collection cycles. + +## Configuration + +Edit `.env` to adjust scenarios: + +| Variable | Default | Description | +|---|---|---| +| `KLAG_NOFILE_LIMIT` | `256` | ulimit for klag-exporter (lower = faster failure) | +| `NUM_TOPICS` | `500` | Number of topics to create | +| `PARTITIONS_PER_TOPIC` | `3` | Partitions per topic | +| `NUM_CONSUMER_GROUPS` | `10` | Number of consumer groups | + +klag-exporter settings (`poll_interval`, `max_concurrent_fetches`, etc.) are configured directly in `klag-config.toml`. + +After editing, apply with: +```bash +docker-compose up -d +``` + +## Access Points + +| Service | URL | +|---|---| +| klag-exporter metrics | http://localhost:8100/metrics | +| Prometheus | http://localhost:9190 | +| Grafana | http://localhost:3100 (admin/admin) | +| Kafka UI | http://localhost:8180 | +| OTel Collector | http://localhost:8988 (internal), http://localhost:8989 (exported metrics) | + +All ports are offset from the existing `test-stack/` so both can run simultaneously. + +## Cleanup + +```bash +docker-compose down -v +``` diff --git a/test-stack-large-cluster/docker-compose.yml b/test-stack-large-cluster/docker-compose.yml new file mode 100644 index 0000000..000e26d --- /dev/null +++ b/test-stack-large-cluster/docker-compose.yml @@ -0,0 +1,186 @@ +services: + # Kafka cluster using KRaft (no Zookeeper) + kafka: + image: confluentinc/cp-kafka:7.5.0 + hostname: kafka + container_name: lc-kafka + ports: + - "9192:9092" + - "9193:9093" + deploy: + resources: + limits: + memory: 4g + environment: + KAFKA_HEAP_OPTS: "-Xmx2g -Xms2g" + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9192 + KAFKA_LISTENERS: PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_NUM_PARTITIONS: 3 + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + healthcheck: + test: ["CMD", "kafka-topics", "--bootstrap-server", "kafka:29092", "--list"] + interval: 10s + timeout: 10s + retries: 10 + start_period: 30s + networks: + - lc-klag-network + + # Topic creator - runs once then exits + topic-creator: + image: confluentinc/cp-kafka:7.5.0 + container_name: lc-topic-creator + depends_on: + kafka: + condition: service_healthy + volumes: + - ./scripts:/scripts:ro + environment: + NUM_TOPICS: ${NUM_TOPICS:-500} + PARTITIONS_PER_TOPIC: ${PARTITIONS_PER_TOPIC:-3} + entrypoint: ["/bin/bash", "/scripts/create-topics.sh"] + networks: + - lc-klag-network + + # klag-exporter with configurable ulimit + klag-exporter: + build: + context: .. + dockerfile: Dockerfile.dev + container_name: lc-klag-exporter + ports: + - "8100:8000" + ulimits: + nofile: + soft: ${KLAG_NOFILE_LIMIT:-256} + hard: ${KLAG_NOFILE_LIMIT:-256} + volumes: + - ./klag-config.toml:/app/config.toml:ro + depends_on: + kafka: + condition: service_healthy + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:8000/health"] + interval: 10s + timeout: 5s + retries: 3 + networks: + - lc-klag-network + + # Producer - writes to all topics continuously + producer: + image: confluentinc/cp-kafka:7.5.0 + container_name: lc-producer + depends_on: + topic-creator: + condition: service_completed_successfully + volumes: + - ./scripts:/scripts:ro + environment: + NUM_TOPICS: ${NUM_TOPICS:-500} + entrypoint: ["/bin/bash", "/scripts/producer.sh"] + networks: + - lc-klag-network + + # Consumer - creates multiple consumer groups with lag + consumer: + image: confluentinc/cp-kafka:7.5.0 + container_name: lc-consumer + depends_on: + topic-creator: + condition: service_completed_successfully + volumes: + - ./scripts:/scripts:ro + environment: + NUM_TOPICS: ${NUM_TOPICS:-500} + NUM_CONSUMER_GROUPS: ${NUM_CONSUMER_GROUPS:-10} + entrypoint: ["/bin/bash", "/scripts/consumer.sh"] + networks: + - lc-klag-network + + # Prometheus + prometheus: + image: prom/prometheus:v2.47.0 + container_name: lc-prometheus + ports: + - "9190:9090" + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - lc-prometheus-data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/etc/prometheus/console_libraries' + - '--web.console.templates=/etc/prometheus/consoles' + - '--web.enable-lifecycle' + depends_on: + - klag-exporter + networks: + - lc-klag-network + + # Grafana + grafana: + image: grafana/grafana:10.1.0 + container_name: lc-grafana + ports: + - "3100:3000" + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: admin + GF_USERS_ALLOW_SIGN_UP: "false" + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning:ro + - lc-grafana-data:/var/lib/grafana + depends_on: + - prometheus + networks: + - lc-klag-network + + # OpenTelemetry Collector + otel-collector: + image: otel/opentelemetry-collector-contrib:0.91.0 + container_name: lc-otel-collector + ports: + - "4417:4317" + - "4418:4318" + - "8988:8888" + - "8989:8889" + volumes: + - ./otel/otel-collector-config.yaml:/etc/otelcol-contrib/config.yaml:ro + command: ["--config=/etc/otelcol-contrib/config.yaml"] + networks: + - lc-klag-network + + # Kafka UI for debugging + kafka-ui: + image: provectuslabs/kafka-ui:latest + container_name: lc-kafka-ui + ports: + - "8180:8080" + environment: + KAFKA_CLUSTERS_0_NAME: large-cluster + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + depends_on: + kafka: + condition: service_healthy + networks: + - lc-klag-network + +volumes: + lc-prometheus-data: + lc-grafana-data: + +networks: + lc-klag-network: + driver: bridge diff --git a/test-stack-large-cluster/grafana/provisioning/dashboards/dashboard.yml b/test-stack-large-cluster/grafana/provisioning/dashboards/dashboard.yml new file mode 100644 index 0000000..5878c2b --- /dev/null +++ b/test-stack-large-cluster/grafana/provisioning/dashboards/dashboard.yml @@ -0,0 +1,13 @@ +apiVersion: 1 + +providers: + - name: 'Kafka Lag Exporter' + orgId: 1 + folder: '' + folderUid: '' + type: file + disableDeletion: false + updateIntervalSeconds: 10 + allowUiUpdates: true + options: + path: /etc/grafana/provisioning/dashboards diff --git a/test-stack-large-cluster/grafana/provisioning/dashboards/kafka-lag.json b/test-stack-large-cluster/grafana/provisioning/dashboards/kafka-lag.json new file mode 100644 index 0000000..051c291 --- /dev/null +++ b/test-stack-large-cluster/grafana/provisioning/dashboards/kafka-lag.json @@ -0,0 +1,1307 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 1, + "panels": [], + "title": "Overview", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 100 + }, + { + "color": "red", + "value": 1000 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 0, + "y": 1 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(kafka_consumergroup_group_sum_lag{cluster_name=~\"$cluster\", group=~\"$consumer_group\"})", + "legendFormat": "Total Lag", + "refId": "A" + } + ], + "title": "Total Consumer Lag", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 10 + }, + { + "color": "red", + "value": 60 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 5, + "y": 1 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max(kafka_consumergroup_group_max_lag_seconds{cluster_name=~\"$cluster\", group=~\"$consumer_group\"})", + "legendFormat": "Max Time Lag", + "refId": "A" + } + ], + "title": "Max Time Lag", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "red", + "index": 1, + "text": "DOWN" + }, + "1": { + "color": "green", + "index": 0, + "text": "UP" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "green", + "value": 1 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 10, + "y": 1 + }, + "id": 4, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "kafka_lag_exporter_up", + "legendFormat": "Exporter Status", + "refId": "A" + } + ], + "title": "Exporter Status", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 30 + }, + { + "color": "red", + "value": 60 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 5, + "x": 15, + "y": 1 + }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "time() - kafka_lag_exporter_last_update_timestamp_seconds{cluster_name=~\"$cluster\"}", + "legendFormat": "Data Age", + "refId": "A" + } + ], + "title": "Data Age", + "description": "How old is the metrics data. Green < 30s, Yellow 30-60s, Red > 60s", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 4, + "x": 20, + "y": 1 + }, + "id": 6, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "count(kafka_consumergroup_group_sum_lag{cluster_name=~\"$cluster\"})", + "legendFormat": "Consumer Groups", + "refId": "A" + } + ], + "title": "Consumer Groups", + "type": "stat" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 5 + }, + "id": 7, + "panels": [], + "title": "Lag Over Time", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 6 + }, + "id": 8, + "options": { + "legend": { + "calcs": ["mean", "max", "lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "kafka_consumergroup_group_sum_lag{cluster_name=~\"$cluster\", group=~\"$consumer_group\"}", + "legendFormat": "{{group}}", + "refId": "A" + } + ], + "title": "Consumer Group Total Lag", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 6 + }, + "id": 9, + "options": { + "legend": { + "calcs": ["mean", "max", "lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "kafka_consumergroup_group_max_lag_seconds{cluster_name=~\"$cluster\", group=~\"$consumer_group\"}", + "legendFormat": "{{group}}", + "refId": "A" + } + ], + "title": "Consumer Group Time Lag (seconds)", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 14 + }, + "id": 10, + "panels": [], + "title": "Per Partition", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic", + "seriesBy": "last" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 15 + }, + "id": 11, + "options": { + "legend": { + "calcs": ["mean", "max", "lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (group, topic, partition) (kafka_consumergroup_group_lag{cluster_name=~\"$cluster\", group=~\"$consumer_group\", topic=~\"$topic\"})", + "legendFormat": "{{group}} - {{topic}}-{{partition}}", + "refId": "A" + } + ], + "title": "Partition Lag", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic", + "seriesBy": "last" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 15 + }, + "id": 12, + "options": { + "legend": { + "calcs": ["mean", "max", "lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (group, topic, partition) (kafka_consumergroup_group_lag_seconds{cluster_name=~\"$cluster\", group=~\"$consumer_group\", topic=~\"$topic\"})", + "legendFormat": "{{group}} - {{topic}}-{{partition}}", + "refId": "A" + } + ], + "title": "Partition Time Lag (seconds)", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 23 + }, + "id": 16, + "panels": [], + "title": "Data Loss Prevention", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "max": 100, + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 50 + }, + { + "color": "orange", + "value": 75 + }, + { + "color": "red", + "value": 90 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 0, + "y": 24 + }, + "id": 17, + "options": { + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max(kafka_consumergroup_group_lag_retention_ratio{cluster_name=~\"$cluster\", group=~\"$consumer_group\"})", + "legendFormat": "Max Retention Usage", + "refId": "A" + } + ], + "title": "Max Lag/Retention Ratio", + "description": "Percentage of the retention window consumed by lag. 0% = no lag, >75% = danger zone, 100% = at deletion boundary", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 6, + "y": 24 + }, + "id": 18, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum(kafka_lag_exporter_data_loss_partitions_total{cluster_name=~\"$cluster\"})", + "legendFormat": "Partitions with Data Loss", + "refId": "A" + } + ], + "title": "Data Loss Partitions", + "description": "Number of partitions where messages were deleted before consumer could process them", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic", + "seriesBy": "last" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 75 + }, + { + "color": "red", + "value": 100 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 19, + "options": { + "legend": { + "calcs": ["mean", "max", "lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (group, topic, partition) (kafka_consumergroup_group_lag_retention_ratio{cluster_name=~\"$cluster\", group=~\"$consumer_group\", topic=~\"$topic\"})", + "legendFormat": "{{group}} - {{topic}}-{{partition}}", + "refId": "A" + } + ], + "title": "Lag/Retention Ratio Over Time (%)", + "description": "How much of the retention window is consumed by lag. >75% = danger zone, 100% = at deletion boundary, >100% = data loss", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 30 + }, + "id": 13, + "panels": [], + "title": "Throughput & Distribution", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic", + "seriesBy": "last" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 31 + }, + "id": 14, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "max by (topic, partition) (kafka_partition_latest_offset{cluster_name=~\"$cluster\", topic=~\"$topic\"})", + "legendFormat": "{{topic}}-{{partition}}", + "refId": "A" + } + ], + "title": "Partition Latest Offsets", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic", + "seriesBy": "last" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 31 + }, + "id": 15, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (topic) (rate(kafka_partition_latest_offset{cluster_name=~\"$cluster\", topic=~\"$topic\"}[1m]))", + "legendFormat": "{{topic}} (produced)", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "expr": "sum by (group, topic) (rate(kafka_consumergroup_group_offset{cluster_name=~\"$cluster\", group=~\"$consumer_group\", topic=~\"$topic\"}[1m]))", + "legendFormat": "{{group}} - {{topic}} (consumed)", + "refId": "B" + } + ], + "title": "Topic Message Rate (msg/sec)", + "type": "timeseries" + } + ], + "refresh": "10s", + "schemaVersion": 38, + "style": "dark", + "tags": ["kafka", "lag", "consumer"], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Prometheus", + "value": "Prometheus" + }, + "hide": 0, + "includeAll": false, + "label": "Data Source", + "multi": false, + "name": "datasource", + "options": [], + "query": "prometheus", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + }, + { + "current": {}, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(kafka_consumergroup_group_sum_lag, cluster_name)", + "hide": 0, + "includeAll": true, + "label": "Cluster", + "multi": true, + "name": "cluster", + "options": [], + "query": { + "query": "label_values(kafka_consumergroup_group_sum_lag, cluster_name)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + }, + { + "current": {}, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(kafka_consumergroup_group_sum_lag{cluster_name=~\"$cluster\"}, group)", + "hide": 0, + "includeAll": true, + "label": "Consumer Group", + "multi": true, + "name": "consumer_group", + "options": [], + "query": { + "query": "label_values(kafka_consumergroup_group_sum_lag{cluster_name=~\"$cluster\"}, group)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + }, + { + "current": {}, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, + "definition": "label_values(kafka_consumergroup_group_lag{cluster_name=~\"$cluster\", group=~\"$consumer_group\"}, topic)", + "hide": 0, + "includeAll": true, + "label": "Topic", + "multi": true, + "name": "topic", + "options": [], + "query": { + "query": "label_values(kafka_consumergroup_group_lag{cluster_name=~\"$cluster\", group=~\"$consumer_group\"}, topic)", + "refId": "StandardVariableQuery" + }, + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + } + ] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Kafka Consumer Lag", + "uid": "kafka-lag-dashboard", + "version": 1, + "weekStart": "" +} diff --git a/test-stack-large-cluster/grafana/provisioning/datasources/datasource.yml b/test-stack-large-cluster/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 0000000..bb009bb --- /dev/null +++ b/test-stack-large-cluster/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,9 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false diff --git a/test-stack-large-cluster/klag-config.toml b/test-stack-large-cluster/klag-config.toml new file mode 100644 index 0000000..e5a1c21 --- /dev/null +++ b/test-stack-large-cluster/klag-config.toml @@ -0,0 +1,30 @@ +# klag-exporter configuration for large-cluster FD exhaustion test + +[exporter] +poll_interval = "60s" +http_port = 8000 +http_host = "0.0.0.0" +granularity = "partition" + +[exporter.timestamp_sampling] +enabled = true +cache_ttl = "30s" +max_concurrent_fetches = 10 + +[exporter.otel] +enabled = true +endpoint = "http://otel-collector:4317" +export_interval = "15s" + +[[clusters]] +name = "large-cluster" +bootstrap_servers = "kafka:29092" +group_whitelist = [".*"] +group_blacklist = [] +topic_whitelist = [".*"] +topic_blacklist = ["^__.*"] + +[clusters.consumer_properties] + +[clusters.labels] +environment = "fd-exhaustion-test" diff --git a/test-stack-large-cluster/otel/otel-collector-config.yaml b/test-stack-large-cluster/otel/otel-collector-config.yaml new file mode 100644 index 0000000..f90d33d --- /dev/null +++ b/test-stack-large-cluster/otel/otel-collector-config.yaml @@ -0,0 +1,42 @@ +# OpenTelemetry Collector configuration for klag-exporter test stack +# Receives OTLP metrics from klag-exporter and exports to multiple backends + +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: + timeout: 10s + send_batch_size: 1024 + +exporters: + # Debug exporter - logs metrics to stdout + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 200 + + # Prometheus exporter - makes metrics scrapable at :8889/metrics + prometheus: + endpoint: 0.0.0.0:8889 + namespace: klag + const_labels: + source: otel_collector + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug, prometheus] + + telemetry: + logs: + level: info + metrics: + address: 0.0.0.0:8888 diff --git a/test-stack-large-cluster/prometheus/prometheus.yml b/test-stack-large-cluster/prometheus/prometheus.yml new file mode 100644 index 0000000..79d2535 --- /dev/null +++ b/test-stack-large-cluster/prometheus/prometheus.yml @@ -0,0 +1,14 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + - job_name: 'klag-exporter' + static_configs: + - targets: ['klag-exporter:8000'] + scrape_interval: 10s + metrics_path: /metrics diff --git a/test-stack-large-cluster/scripts/consumer.sh b/test-stack-large-cluster/scripts/consumer.sh new file mode 100755 index 0000000..4358d80 --- /dev/null +++ b/test-stack-large-cluster/scripts/consumer.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +BOOTSTRAP="kafka:29092" +NUM_TOPICS="${NUM_TOPICS:-500}" +NUM_GROUPS="${NUM_CONSUMER_GROUPS:-10}" + +TOPICS_PER_GROUP=$((NUM_TOPICS / NUM_GROUPS)) + +echo "Starting $NUM_GROUPS consumer groups, each covering ~$TOPICS_PER_GROUP topics..." + +# Launch one consumer per group, each subscribing to a slice of topics +for ((g=1; g<=NUM_GROUPS; g++)); do + START=$(( (g - 1) * TOPICS_PER_GROUP + 1 )) + # Last group picks up any remainder from integer division + if (( g == NUM_GROUPS )); then + END=$NUM_TOPICS + else + END=$(( g * TOPICS_PER_GROUP )) + fi + + # Build a regex pattern for this group's topic range + # e.g., group 1 gets topic-1 through topic-50 + TOPIC_LIST="" + for ((t=START; t<=END; t++)); do + if [ -n "$TOPIC_LIST" ]; then + TOPIC_LIST="${TOPIC_LIST}|" + fi + TOPIC_LIST="${TOPIC_LIST}topic-${t}" + done + + GROUP_ID="consumer-group-${g}" + echo "Starting $GROUP_ID for topics $START..$END" + + # Consume slowly to maintain lag + ( + while true; do + kafka-console-consumer \ + --bootstrap-server "$BOOTSTRAP" \ + --group "$GROUP_ID" \ + --include "^(${TOPIC_LIST})$" \ + --max-messages 10 \ + --timeout-ms 30000 \ + >> "/tmp/${GROUP_ID}.log" 2>&1 + # Intentional delay to build lag + sleep 15 + done + ) & +done + +echo "All $NUM_GROUPS consumer groups started." +echo "Consumers are intentionally slow to maintain lag for klag-exporter to detect." + +# Keep container alive +wait diff --git a/test-stack-large-cluster/scripts/create-topics.sh b/test-stack-large-cluster/scripts/create-topics.sh new file mode 100755 index 0000000..10d78cd --- /dev/null +++ b/test-stack-large-cluster/scripts/create-topics.sh @@ -0,0 +1,39 @@ +#!/bin/bash +set -e + +BOOTSTRAP="kafka:29092" +NUM_TOPICS="${NUM_TOPICS:-500}" +PARTITIONS="${PARTITIONS_PER_TOPIC:-3}" +BATCH_SIZE=50 + +echo "Creating $NUM_TOPICS topics with $PARTITIONS partitions each..." + +created=0 +for ((i=1; i<=NUM_TOPICS; i++)); do + kafka-topics --bootstrap-server "$BOOTSTRAP" \ + --create --if-not-exists \ + --topic "topic-$i" \ + --partitions "$PARTITIONS" \ + --replication-factor 1 & + + # Wait in batches to avoid overwhelming the broker + if (( i % BATCH_SIZE == 0 )); then + if ! wait; then + echo "ERROR: Some topic creations failed in batch ending at topic-$i" + exit 1 + fi + created=$i + echo "Created $created / $NUM_TOPICS topics..." + fi +done + +# Wait for any remaining +if ! wait; then + echo "ERROR: Some topic creations failed in final batch" + exit 1 +fi +echo "All $NUM_TOPICS topics created successfully." + +# Verify +ACTUAL=$(kafka-topics --bootstrap-server "$BOOTSTRAP" --list | grep -c '^topic-' || true) +echo "Verified: $ACTUAL topics exist matching 'topic-*' pattern." diff --git a/test-stack-large-cluster/scripts/producer.sh b/test-stack-large-cluster/scripts/producer.sh new file mode 100755 index 0000000..318a982 --- /dev/null +++ b/test-stack-large-cluster/scripts/producer.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +BOOTSTRAP="kafka:29092" +NUM_TOPICS="${NUM_TOPICS:-500}" +BATCH_SIZE=50 + +echo "Starting bulk producer for $NUM_TOPICS topics..." + +# Continuously produce messages to all topics in round-robin +CYCLE=0 +while true; do + CYCLE=$((CYCLE + 1)) + echo "[$(date)] Producer cycle $CYCLE - writing to $NUM_TOPICS topics" + + for ((i=1; i<=NUM_TOPICS; i++)); do + # Produce 2 messages per topic per cycle using a single producer invocation + printf "msg-${CYCLE}-1-$(date +%s%N)\nmsg-${CYCLE}-2-$(date +%s%N)\n" | \ + kafka-console-producer \ + --bootstrap-server "$BOOTSTRAP" \ + --topic "topic-$i" >> /tmp/producer.log 2>&1 & + + # Batch parallelism: wait every BATCH_SIZE topics to avoid process table overflow + if (( i % BATCH_SIZE == 0 )); then + wait + fi + done + wait + + echo "[$(date)] Cycle $CYCLE complete - produced to all $NUM_TOPICS topics" + sleep 5 +done