5 changes: 4 additions & 1 deletion src/cluster/manager.rs
@@ -48,7 +48,10 @@ impl ClusterManager {
OffsetCollector::with_performance(Arc::clone(&client), filters, performance.clone());

let timestamp_sampler = if exporter_config.timestamp_sampling.enabled {
let ts_consumer = TimestampConsumer::new(&config)?;
let ts_consumer = TimestampConsumer::with_pool_size(
&config,
exporter_config.timestamp_sampling.max_concurrent_fetches,
)?;
Some(TimestampSampler::new(
ts_consumer,
exporter_config.timestamp_sampling.cache_ttl,
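The manager change threads `timestamp_sampling.max_concurrent_fetches` through to the timestamp consumer as a pool size. The `TimestampConsumer` itself is outside this diff's hunks; the sketch below is one plausible shape for `with_pool_size`, assuming it builds one `BaseConsumer` per slot from a shared `ClientConfig`. The struct, its field, and the `ClientConfig` parameter are illustrative, not the project's actual code.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::BaseConsumer;
use rdkafka::error::KafkaResult;

// Illustrative only: the real TimestampConsumer is not shown in this PR.
struct TimestampConsumer {
    pool: Vec<BaseConsumer>,
}

impl TimestampConsumer {
    // Builds `size` consumers up front so concurrent timestamp fetches
    // never contend for a single librdkafka handle.
    fn with_pool_size(config: &ClientConfig, size: usize) -> KafkaResult<Self> {
        let size = size.max(1); // guard against a zero-sized pool
        let mut pool = Vec::with_capacity(size);
        for _ in 0..size {
            pool.push(config.create()?);
        }
        Ok(Self { pool })
    }
}
```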
170 changes: 15 additions & 155 deletions src/collector/offset_collector.rs
@@ -3,7 +3,6 @@ use crate::error::Result;
use crate::kafka::client::{KafkaClient, TopicPartition};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{debug, instrument, warn};

@@ -100,9 +99,14 @@ impl OffsetCollector {
debug!(partitions = watermarks.len(), "Fetched watermarks");

// Build group snapshots
let wm_keys: Vec<TopicPartition> = watermarks.keys().cloned().collect();
let mut groups = Vec::with_capacity(descriptions.len());
for desc in descriptions {
let offsets = self.fetch_group_offsets(&desc.group_id, &watermarks)?;
let offsets = self.client.list_consumer_group_offsets(
&desc.group_id,
&wm_keys,
self.performance.offset_fetch_timeout,
)?;

// Filter offsets by topic whitelist/blacklist
let filtered_offsets: HashMap<TopicPartition, i64> = offsets
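In this hunk the sequential path stops building a per-group consumer and instead calls `list_consumer_group_offsets(group_id, partitions, timeout)` on the shared client, keyed by the watermark partitions collected just above. The lag arithmetic that consumes these maps happens outside the visible hunks; a minimal reconstruction, assuming `(low, high)` watermark tuples as in the map above, with `lag_for_group` and the local `TopicPartition` stand-in both hypothetical:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for crate::kafka::client::TopicPartition.
#[derive(Clone, PartialEq, Eq, Hash)]
struct TopicPartition {
    topic: String,
    partition: i32,
}

// Sketch: lag per partition is the high watermark minus the committed
// offset, clamped at zero in case a commit races ahead of the watermark
// fetch. Partitions without a committed offset are simply skipped.
fn lag_for_group(
    committed: &HashMap<TopicPartition, i64>,
    watermarks: &HashMap<TopicPartition, (i64, i64)>, // (low, high)
) -> HashMap<TopicPartition, i64> {
    committed
        .iter()
        .filter_map(|(tp, &offset)| {
            watermarks
                .get(tp)
                .map(|&(_low, high)| (tp.clone(), (high - offset).max(0)))
        })
        .collect()
}
```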
@@ -242,6 +246,7 @@ impl OffsetCollector {
}

/// Fetch offsets for all groups in parallel with bounded concurrency.
/// Uses the Admin API (ListConsumerGroupOffsets) through the shared AdminClient — no per-group consumers needed.
async fn fetch_all_group_offsets_parallel(
&self,
descriptions: &[crate::kafka::client::GroupDescription],
@@ -253,42 +258,31 @@
debug!(
groups = descriptions.len(),
max_concurrent = max_concurrent,
"Fetching group offsets in parallel"
"Fetching group offsets in parallel via Admin API"
);

let semaphore = Arc::new(Semaphore::new(max_concurrent));
let mut handles = Vec::with_capacity(descriptions.len());

// Get bootstrap servers once, not per-task
let bootstrap_servers = self.client.bootstrap_servers().to_string();
let consumer_properties = self.client.consumer_properties().clone();
let client = Arc::clone(&self.client);
let wm_keys: Vec<TopicPartition> = watermarks.keys().cloned().collect();

let mut handles = Vec::with_capacity(descriptions.len());

for desc in descriptions {
let group_id = desc.group_id.clone();
let permit = semaphore.clone();
let bootstrap = bootstrap_servers.clone();
let props = consumer_properties.clone();
let client_clone = Arc::clone(&client);
let partitions = wm_keys.clone();
let timeout = offset_timeout;

// Spawn async task that properly awaits the semaphore before spawning blocking work
let handle = tokio::spawn(async move {
// Acquire permit - this properly awaits until one is available
let permit_guard: OwnedSemaphorePermit =
permit.acquire_owned().await.expect("semaphore closed");

// Now spawn the blocking task with the permit held
tokio::task::spawn_blocking(move || {
let _permit = permit_guard; // Hold permit until blocking work completes
let _permit = permit_guard;

let offsets = Self::fetch_group_offsets_standalone(
&group_id,
&bootstrap,
&props,
&partitions,
timeout,
);
let offsets =
client_clone.list_consumer_group_offsets(&group_id, &partitions, timeout);

(group_id, offsets)
})
@@ -298,7 +292,6 @@
handles.push(handle);
}

// Wait for all tasks to complete (nested Result from tokio::spawn -> spawn_blocking)
let results = futures::future::join_all(handles).await;

let mut all_offsets = HashMap::new();
@@ -322,139 +315,6 @@

all_offsets
}
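The loop above is the heart of the refactor: a cheap async task awaits a semaphore permit, then moves the owned permit into `spawn_blocking` so the concurrency bound holds for the full duration of the synchronous librdkafka call, not just until the blocking task is queued. A self-contained sketch of the same pattern, where the group names and the stubbed fetch are placeholders:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Minimal reproduction of the bounded-concurrency pattern used above:
// acquire an OwnedSemaphorePermit in async context, then hold it inside
// spawn_blocking until the blocking work completes.
#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(4)); // at most 4 in-flight fetches
    let mut handles = Vec::new();

    for group in ["billing", "audit", "search"] {
        let permit = Arc::clone(&semaphore);
        handles.push(tokio::spawn(async move {
            // Await a permit before occupying a blocking-pool thread.
            let guard = permit.acquire_owned().await.expect("semaphore closed");
            tokio::task::spawn_blocking(move || {
                let _guard = guard; // released when the blocking work ends
                // Stand-in for a synchronous librdkafka offset fetch.
                (group, 0_i64)
            })
            .await
            .expect("blocking task panicked")
        }));
    }

    for h in handles {
        let (group, offset) = h.await.expect("task panicked");
        println!("{group}: {offset}");
    }
}
```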

/// Standalone function to fetch group offsets (for use in spawn_blocking)
fn fetch_group_offsets_standalone(
group_id: &str,
bootstrap_servers: &str,
consumer_properties: &HashMap<String, String>,
watermark_partitions: &[TopicPartition],
timeout: Duration,
) -> Result<HashMap<TopicPartition, i64>> {
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;

if bootstrap_servers.is_empty() {
return Ok(HashMap::new());
}

let mut client_config = ClientConfig::new();

// Apply consumer properties first (security settings, etc.)
for (key, value) in consumer_properties {
client_config.set(key, value);
}

client_config
.set("bootstrap.servers", bootstrap_servers)
.set("group.id", group_id)
.set("enable.auto.commit", "false");

let consumer: BaseConsumer = match client_config.create() {
Ok(c) => c,
Err(e) => {
warn!(group = group_id, error = %e, "Failed to create consumer for group");
return Ok(HashMap::new());
}
};

// Build topic partition list from watermarks
let mut tpl = TopicPartitionList::new();
for tp in watermark_partitions {
tpl.add_partition(&tp.topic, tp.partition);
}

// Fetch committed offsets
let committed = match consumer.committed_offsets(tpl, timeout) {
Ok(c) => c,
Err(e) => {
warn!(group = group_id, error = %e, "Failed to fetch committed offsets");
return Ok(HashMap::new());
}
};

let mut offsets = HashMap::new();
for elem in committed.elements() {
if let rdkafka::Offset::Offset(offset) = elem.offset() {
offsets.insert(TopicPartition::new(elem.topic(), elem.partition()), offset);
}
}

Ok(offsets)
}

/// Fetch offsets for a single group (used by sequential collect method).
#[allow(dead_code)]
fn fetch_group_offsets(
&self,
group_id: &str,
watermarks: &HashMap<TopicPartition, (i64, i64)>,
) -> Result<HashMap<TopicPartition, i64>> {
// We need to create a consumer for this specific group to fetch committed offsets
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::TopicPartitionList;

let _cluster = self.client.cluster_name();
let mut client_config = ClientConfig::new();

// Get bootstrap servers from the existing client's metadata
let metadata = self.client.fetch_metadata()?;

// Build bootstrap servers from metadata brokers
let bootstrap_servers: Vec<String> = metadata
.brokers()
.iter()
.map(|b| format!("{}:{}", b.host(), b.port()))
.collect();

if bootstrap_servers.is_empty() {
return Ok(HashMap::new());
}

// Apply consumer properties first (security settings, etc.)
for (key, value) in self.client.consumer_properties() {
client_config.set(key, value);
}

client_config
.set("bootstrap.servers", bootstrap_servers.join(","))
.set("group.id", group_id)
.set("enable.auto.commit", "false");

let consumer: BaseConsumer = match client_config.create() {
Ok(c) => c,
Err(e) => {
warn!(group = group_id, error = %e, "Failed to create consumer for group");
return Ok(HashMap::new());
}
};

// Build topic partition list from watermarks
let mut tpl = TopicPartitionList::new();
for tp in watermarks.keys() {
tpl.add_partition(&tp.topic, tp.partition);
}

// Fetch committed offsets
let committed = match consumer.committed_offsets(tpl, std::time::Duration::from_secs(10)) {
Ok(c) => c,
Err(e) => {
warn!(group = group_id, error = %e, "Failed to fetch committed offsets");
return Ok(HashMap::new());
}
};

let mut offsets = HashMap::new();
for elem in committed.elements() {
if let rdkafka::Offset::Offset(offset) = elem.offset() {
offsets.insert(TopicPartition::new(elem.topic(), elem.partition()), offset);
}
}

Ok(offsets)
}
}

impl OffsetsSnapshot {
3 changes: 3 additions & 0 deletions src/error.rs
@@ -35,6 +35,9 @@ pub enum KlagError {
#[allow(dead_code)]
#[error("Leadership error: {0}")]
Leadership(String),

#[error("Admin API error: {0}")]
Admin(String),
}

pub type Result<T> = std::result::Result<T, KlagError>;
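The new `Admin` variant gives Admin API failures their own display prefix. A quick check of how the `#[error]` attribute renders it, assuming the enum derives `thiserror::Error` as the attribute syntax on the surrounding variants suggests; the error message below is made up:

```rust
use thiserror::Error;

// Trimmed-down copy of the variant added in this hunk.
#[derive(Debug, Error)]
enum KlagError {
    #[error("Admin API error: {0}")]
    Admin(String),
}

fn main() {
    // Display output comes from the #[error] format string.
    let e = KlagError::Admin("coordinator not available".into());
    assert_eq!(e.to_string(), "Admin API error: coordinator not available");
}
```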