added testing env for large cluster setup #45
Conversation
Pull request overview
This PR adds a new test environment (test-stack-large-cluster) designed to reproduce and test file descriptor exhaustion issues with klag-exporter under high-scale scenarios. The setup mirrors the existing test-stack structure but is configured for larger-scale testing with hundreds of topics, multiple consumer groups, and configurable file descriptor limits.
Changes:
- Adds a complete Docker Compose test stack with Kafka, klag-exporter, Prometheus, Grafana, OpenTelemetry Collector, and Kafka UI
- Includes automated scripts for creating topics, producing messages, and consuming with intentional lag
- Provides Grafana dashboard for visualizing Kafka consumer lag metrics
- Uses network and container name prefixes (lc-) to allow running alongside the existing test-stack
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| docker-compose.yml | Multi-service stack definition with resource limits and health checks for FD exhaustion testing |
| scripts/producer.sh | Bulk message producer that writes to all topics in parallel batches |
| scripts/create-topics.sh | Batch topic creation script with configurable partition count |
| scripts/consumer.sh | Consumer group setup that intentionally creates lag across topic ranges |
| klag-config.toml | klag-exporter configuration with OTLP export and timestamp sampling enabled |
| prometheus/prometheus.yml | Prometheus scrape configuration for klag-exporter metrics |
| otel/otel-collector-config.yaml | OpenTelemetry Collector configuration for receiving and exporting metrics |
| grafana/provisioning/* | Grafana datasource and dashboard provisioning for Kafka lag visualization |
| README.md | Setup instructions and configuration documentation |
| .env | Environment variable defaults for test scenarios |
- Replaced list_consumer_group_offsets(), which was using committed_offsets() on the shared consumer (that call only works for the consumer's own group.id). The new implementation uses rdkafka-sys FFI to call rd_kafka_ListConsumerGroupOffsets through the existing AdminClient's native handle, taking group_id, partitions, and timeout as parameters.
- The FFI method uses an RAII Cleanup guard to ensure all C objects (queue, event, options, request, partition list) are freed on every exit path (a sketch of the guard pattern follows this list).
- Removed the bootstrap_servers() and consumer_properties() accessors, which are no longer needed.
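For illustration, here is a minimal sketch of the RAII guard idea; the Cleanup name comes from the bullet above, but the closure-based design, the helper function, and the pointer handling are assumptions rather than the actual rdkafka-sys calls:

```rust
/// Minimal RAII guard: runs a cleanup closure when dropped, so the C objects
/// (queue, event, options, request, partition list) are released on every
/// exit path, including early returns via `?`.
struct Cleanup<F: FnMut()>(F);

impl<F: FnMut()> Drop for Cleanup<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}

fn list_offsets_via_ffi_sketch() -> Result<(), String> {
    // Hypothetical stand-in for the raw pointers the real code allocates via
    // rdkafka-sys (queue, admin options, request, partition list, ...).
    let raw_objects: Vec<*mut std::ffi::c_void> = Vec::new();

    let _guard = Cleanup(move || {
        // The real guard would call the matching rd_kafka_*_destroy functions
        // for every object that was successfully allocated.
        let _ = &raw_objects;
    });

    // ... issue rd_kafka_ListConsumerGroupOffsets and wait for the result
    // event here; any early `return Err(...)` still runs the guard's Drop ...
    Ok(())
}

fn main() {
    list_offsets_via_ffi_sketch().expect("sketch should not fail");
}
```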
klag-exporter/src/collector/offset_collector.rs
- Rewrote fetch_all_group_offsets_parallel() to call client.list_consumer_group_offsets() instead of fetch_group_offsets_standalone(), keeping the same semaphore-based concurrency and the same error handling (a sketch of the fan-out follows this list).
- Removed fetch_group_offsets_standalone(), a 60-line function that created a BaseConsumer per group.
- Removed fetch_group_offsets(), the sequential version, which also created per-group consumers.
- Updated the sequential collect() to also use the Admin API.
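A rough sketch of the semaphore-based fan-out described above, using tokio. KafkaClient, GroupOffsets, and the simplified list_consumer_group_offsets signature (no partitions or timeout parameters) are placeholders, not the project's real types:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Placeholder types; the real collector uses the project's own client and
// offset structs, and the real call also takes partitions and a timeout.
struct KafkaClient;
struct GroupOffsets;

impl KafkaClient {
    async fn list_consumer_group_offsets(&self, _group_id: &str) -> Result<GroupOffsets, String> {
        Ok(GroupOffsets)
    }
}

async fn fetch_all_group_offsets_parallel(
    client: Arc<KafkaClient>,
    group_ids: Vec<String>,
    max_concurrent: usize,
) -> Vec<Result<GroupOffsets, String>> {
    // One semaphore bounds how many Admin API requests are in flight at once.
    let semaphore = Arc::new(Semaphore::new(max_concurrent));

    let tasks: Vec<_> = group_ids
        .into_iter()
        .map(|group_id| {
            let client = Arc::clone(&client);
            let semaphore = Arc::clone(&semaphore);
            tokio::spawn(async move {
                // The permit is held for the duration of the fetch and
                // released when it is dropped.
                let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
                client.list_consumer_group_offsets(&group_id).await
            })
        })
        .collect();

    let mut results = Vec::with_capacity(tasks.len());
    for task in tasks {
        results.push(task.await.expect("fetch task panicked"));
    }
    results
}

#[tokio::main]
async fn main() {
    let client = Arc::new(KafkaClient);
    let groups = vec!["group-a".to_string(), "group-b".to_string()];
    let results = fetch_all_group_offsets_parallel(client, groups, 4).await;
    println!("fetched {} results", results.len());
}
```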
klag-exporter/src/error.rs
- Added KlagError::Admin(String) variant for Admin API FFI errors.
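For illustration only, the new variant could look like the following, assuming the error enum derives thiserror::Error (that derive and the message format are assumptions):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
pub enum KlagError {
    // ...existing variants elided...

    /// Errors surfaced by the Admin API FFI path (e.g. ListConsumerGroupOffsets).
    #[error("admin API error: {0}")]
    Admin(String),
}
```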
Impact
- Zero additional FDs for offset fetching: requests route through the existing AdminClient connection.
- No connection churn: eliminates the TCP/TLS/SASL handshake per group per cycle.
- Timestamp fetching unaffected: TimestampConsumer is a separate component.
Files modified
klag-exporter/src/kafka/consumer.rs: rewrote TimestampConsumer from a per-fetch consumer factory to a pool-based design (a rough sketch follows these file notes):
- The constructor with_pool_size(config, pool_size) pre-creates N BaseConsumer instances.
- acquire() takes a consumer from the pool, or creates a temporary one if the pool is exhausted.
- release() unassigns partitions (assign(empty)) and returns the consumer to the pool.
- fetch_timestamp() borrows a consumer, assigns, polls, and releases; no consumer is created or destroyed per fetch.
- Removed the unused new() constructor.
klag-exporter/src/cluster/manager.rs: updated to call TimestampConsumer::with_pool_size(&config, max_concurrent_fetches) so the pool size matches the concurrency setting.
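A minimal sketch of the pool design and the manager-side call, with placeholder Config and BaseConsumer types and a plain Mutex for brevity; apart from with_pool_size, acquire, release, and fetch_timestamp, the names and details here are assumptions:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Placeholder types; the real pool holds rdkafka BaseConsumer instances
// built from the exporter's Kafka config.
struct BaseConsumer;
struct Config;

/// Pool-based TimestampConsumer sketch: consumers are created once and reused
/// across fetch cycles instead of being built and torn down per fetch.
struct TimestampConsumer {
    pool: Mutex<VecDeque<BaseConsumer>>,
}

impl TimestampConsumer {
    /// Pre-creates `pool_size` consumers up front.
    fn with_pool_size(_config: &Config, pool_size: usize) -> Self {
        let pool = (0..pool_size).map(|_| BaseConsumer).collect();
        Self { pool: Mutex::new(pool) }
    }

    /// Takes a consumer from the pool, or creates a temporary one if the pool
    /// is exhausted (graceful overflow).
    fn acquire(&self) -> BaseConsumer {
        self.pool
            .lock()
            .expect("pool lock poisoned")
            .pop_front()
            .unwrap_or(BaseConsumer)
    }

    /// Unassigns partitions (assign(empty) in the real code) and returns the
    /// consumer to the pool. Whether overflow consumers are returned or simply
    /// dropped is not stated in the PR; this sketch returns everything.
    fn release(&self, consumer: BaseConsumer) {
        self.pool.lock().expect("pool lock poisoned").push_back(consumer);
    }

    /// Borrows a consumer, assigns the partition, polls for the record
    /// timestamp, then releases the consumer back to the pool.
    fn fetch_timestamp(&self) -> Option<i64> {
        let consumer = self.acquire();
        // ... assign -> poll -> read timestamp on `consumer` ...
        let _ = &consumer;
        self.release(consumer);
        None
    }
}

fn main() {
    // Mirrors the manager.rs change: pool size follows the concurrency setting.
    let max_concurrent_fetches = 8; // assumed value for illustration
    let consumer = TimestampConsumer::with_pool_size(&Config, max_concurrent_fetches);
    let _ = consumer.fetch_timestamp();
}
```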
Impact
- Connection churn eliminated: consumers persist across fetch cycles instead of being created and destroyed each time.
- No TCP/TLS/SASL handshake per timestamp fetch: pool consumers maintain their broker connections.
- Same concurrency model: semaphore-based backpressure is unchanged, and pool size = max_concurrent_fetches.
- Graceful overflow: if there are more concurrent requests than the pool size, temporary consumers are created (same as before).
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 9 comments.