
feat: Add KafkaBackend implementation #58

@alob-mtc


Summary

Add a KafkaBackend implementation to support Apache Kafka as a storage backend for RunnerQ.

Background

With the pluggable storage abstraction layer (the QueueStorage and InspectionStorage traits), RunnerQ can now support backends other than Redis. Kafka is a natural fit for high-throughput, distributed activity processing for the following reasons:

  • Durability - Kafka's replicated, log-based storage provides strong durability guarantees
  • Scalability - Horizontal scaling across partitions and consumer groups
  • Event Sourcing - A natural fit for activity event streams
  • Enterprise Adoption - Many organizations already run Kafka infrastructure

Proposed Implementation

File Structure

src/storage/
├── mod.rs
├── traits.rs
├── error.rs
├── redis/
│   └── ...
└── kafka/
    ├── mod.rs
    ├── queue.rs
    ├── inspection.rs
    └── producer.rs

API

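use std::sync::Arc;

use runner_q::storage::KafkaBackend; // proposed by this issue; not implemented yet
use runner_q::WorkerEngine; // assumed import path

// Runs inside an async fn whose error type can absorb both builders' errors.
let backend = KafkaBackend::builder()
    .brokers("localhost:9092")
    .queue_name("runnerq-activities")
    .consumer_group("runnerq-workers")
    .build()
    .await?;

// The engine only sees the storage traits, so the Kafka backend drops in
// where the Redis backend would otherwise go.
let engine = WorkerEngine::builder()
    .backend(Arc::new(backend))
    .max_workers(8)
    .build()
    .await?;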
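The builder above implies some up-front validation before any Kafka client is created. The sketch below is a hypothetical, std-only model of that step: `KafkaBackendBuilder`, `KafkaConfig`, `ConfigError`, and the `validate()` method are illustrative names rather than RunnerQ API, and no connection is made. It assumes `brokers` accepts a comma-separated list, the same format as Kafka's `bootstrap.servers` setting.

```rust
/// Hypothetical builder mirroring the proposed setters. Only validation is
/// modeled here; a real build() would also create the Kafka clients.
#[derive(Debug, Default)]
pub struct KafkaBackendBuilder {
    brokers: Option<String>,
    queue_name: Option<String>,
    consumer_group: Option<String>,
}

/// Validated settings (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
pub struct KafkaConfig {
    pub brokers: Vec<String>,
    pub queue_name: String,
    pub consumer_group: String,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ConfigError {
    /// A required setting was not provided, or was empty.
    Missing(&'static str),
}

impl KafkaBackendBuilder {
    pub fn brokers(mut self, brokers: &str) -> Self {
        self.brokers = Some(brokers.to_string());
        self
    }

    pub fn queue_name(mut self, name: &str) -> Self {
        self.queue_name = Some(name.to_string());
        self
    }

    pub fn consumer_group(mut self, group: &str) -> Self {
        self.consumer_group = Some(group.to_string());
        self
    }

    /// Checks that every required setting is present and splits the
    /// comma-separated broker list, trimming whitespace around each entry.
    pub fn validate(self) -> Result<KafkaConfig, ConfigError> {
        let brokers: Vec<String> = self
            .brokers
            .as_deref()
            .unwrap_or("")
            .split(',')
            .map(str::trim)
            .filter(|b| !b.is_empty())
            .map(String::from)
            .collect();
        if brokers.is_empty() {
            return Err(ConfigError::Missing("brokers"));
        }
        let queue_name = self
            .queue_name
            .filter(|n| !n.is_empty())
            .ok_or(ConfigError::Missing("queue_name"))?;
        let consumer_group = self
            .consumer_group
            .filter(|g| !g.is_empty())
            .ok_or(ConfigError::Missing("consumer_group"))?;
        Ok(KafkaConfig { brokers, queue_name, consumer_group })
    }
}
```

Failing fast on a missing setting keeps configuration errors separate from connection errors, which would only surface once the real clients are created.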

Topic Structure

runnerq-{queue_name}-pending      # Activities waiting to be processed
runnerq-{queue_name}-processing   # Activities currently being processed (with lease tracking)
runnerq-{queue_name}-completed    # Completed activities (compacted topic)
runnerq-{queue_name}-dlq          # Dead letter queue
runnerq-{queue_name}-events       # Activity lifecycle events (for observability)
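A small helper can keep the naming scheme in one place. This sketch assumes the `runnerq-{queue_name}-{suffix}` pattern above is used verbatim; `TopicNames` and `is_valid_topic_name` are hypothetical names. The validity check follows Kafka's topic-name rules: ASCII letters, digits, `.`, `_`, and `-`, at most 249 characters, and not `.` or `..`.

```rust
/// Hypothetical holder for the five per-queue topic names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicNames {
    pub pending: String,
    pub processing: String,
    pub completed: String,
    pub dlq: String,
    pub events: String,
}

impl TopicNames {
    /// Applies the `runnerq-{queue_name}-{suffix}` pattern.
    pub fn for_queue(queue_name: &str) -> Self {
        let topic = |suffix: &str| format!("runnerq-{queue_name}-{suffix}");
        TopicNames {
            pending: topic("pending"),
            processing: topic("processing"),
            completed: topic("completed"),
            dlq: topic("dlq"),
            events: topic("events"),
        }
    }

    /// All five names, e.g. for creating the topics up front.
    pub fn all(&self) -> [&str; 5] {
        [
            self.pending.as_str(),
            self.processing.as_str(),
            self.completed.as_str(),
            self.dlq.as_str(),
            self.events.as_str(),
        ]
    }
}

/// Kafka topic-name rules: 1 to 249 chars from [A-Za-z0-9._-], not "." or "..".
pub fn is_valid_topic_name(name: &str) -> bool {
    !name.is_empty()
        && name.len() <= 249
        && name != "."
        && name != ".."
        && name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-'))
}
```

One thing this makes visible: with the API example's queue_name("runnerq-activities"), the pattern produces runnerq-runnerq-activities-pending, so either the topic prefix or the example queue name may want to drop the duplicated runnerq-.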

Design Considerations

QueueStorage Implementation

| Method | Kafka approach |
|---|---|
| enqueue() | Produce to the pending topic, with priority in a message header |
| dequeue() | Consume from pending, then produce to processing with a lease timestamp |
| ack_success() | Remove from processing (a tombstone if the topic is compacted), then produce to completed |
| ack_failure() | Requeue to pending or move to the DLQ, depending on the retry count |
| process_scheduled() | A timer-based approach, or Kafka Streams running as a separate service (it is a JVM library) |
| requeue_expired() | Scan the processing topic for expired leases |
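Before wiring up a client, the routing rules in the table can be checked against an in-memory model. The sketch below uses std collections as stand-ins for the pending, processing, completed, and DLQ topics; `TopicModel`, `Activity`, the millisecond clock arguments, and the rule that an activity is dead-lettered once `retry_count` exceeds `max_retries` are all assumptions. In a real backend each mutation would be a produce (and a removal from processing a tombstone), with consumer offsets most likely committed only after that produce succeeds.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical activity record: only what the routing rules need.
#[derive(Debug, Clone, PartialEq)]
pub struct Activity {
    pub id: String,
    pub retry_count: u32,
}

/// In-memory stand-ins for the topics. `processing` is keyed by activity id
/// to mimic a compacted topic: one live record per key, removal = tombstone.
#[derive(Debug, Default)]
pub struct TopicModel {
    pub pending: VecDeque<Activity>,
    pub processing: HashMap<String, (Activity, u64)>, // (activity, lease expiry in ms)
    pub completed: Vec<Activity>,
    pub dlq: Vec<Activity>,
}

impl TopicModel {
    /// enqueue(): append to pending.
    pub fn enqueue(&mut self, activity: Activity) {
        self.pending.push_back(activity);
    }

    /// dequeue(): take from pending and record a lease in processing.
    pub fn dequeue(&mut self, now_ms: u64, lease_ms: u64) -> Option<Activity> {
        let activity = self.pending.pop_front()?;
        self.processing
            .insert(activity.id.clone(), (activity.clone(), now_ms + lease_ms));
        Some(activity)
    }

    /// ack_success(): drop the lease and record the activity as completed.
    pub fn ack_success(&mut self, id: &str) -> bool {
        match self.processing.remove(id) {
            Some((activity, _)) => {
                self.completed.push(activity);
                true
            }
            None => false,
        }
    }

    /// ack_failure(): requeue with an incremented retry count, or move to
    /// the DLQ once retry_count exceeds max_retries.
    pub fn ack_failure(&mut self, id: &str, max_retries: u32) -> bool {
        let Some((mut activity, _)) = self.processing.remove(id) else {
            return false;
        };
        activity.retry_count += 1;
        if activity.retry_count > max_retries {
            self.dlq.push(activity);
        } else {
            self.pending.push_back(activity);
        }
        true
    }

    /// requeue_expired(): return every activity whose lease expiry is at or
    /// before now_ms to pending (retry count unchanged); returns the count.
    pub fn requeue_expired(&mut self, now_ms: u64) -> usize {
        let expired: Vec<String> = self
            .processing
            .iter()
            .filter(|(_, entry)| entry.1 <= now_ms)
            .map(|(id, _)| id.clone())
            .collect();
        for id in &expired {
            if let Some((activity, _)) = self.processing.remove(id) {
                self.pending.push_back(activity);
            }
        }
        expired.len()
    }
}
```

Whether a lease expiry should also count as a failed attempt is a design choice; this model leaves the retry count untouched, so only explicit ack_failure() calls can dead-letter an activity.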

InspectionStorage Implementation

  • Use Kafka Admin API for topic statistics
  • Query completed/DLQ topics for activity history
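  • Consume the events topic for real-time event streaming (Kafka Streams is a JVM library, so a Rust backend would most likely read the topic directly)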
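For topic statistics, the usual measure of queue depth is consumer lag: per partition, the high watermark minus the consumer group's committed offset. The sketch assumes the offsets have already been fetched (with rdkafka, for example, through the consumer's `fetch_watermarks` and `committed` calls); `PartitionOffsets` and `estimated_backlog` are hypothetical names, and a partition without a commit is counted from its low watermark, as with `auto.offset.reset=earliest`.

```rust
/// Offsets for one partition of a topic, as fetched from the cluster
/// (hypothetical struct).
#[derive(Debug, Clone, Copy)]
pub struct PartitionOffsets {
    pub low_watermark: i64,
    pub high_watermark: i64,
    /// Next offset the consumer group will read; None if never committed.
    pub committed: Option<i64>,
}

/// Approximate backlog summed over partitions: high watermark minus the
/// committed offset. A missing commit, or one older than the retained log,
/// is counted from the low watermark instead.
pub fn estimated_backlog(partitions: &[PartitionOffsets]) -> i64 {
    partitions
        .iter()
        .map(|p| {
            let start = p.committed.unwrap_or(p.low_watermark).max(p.low_watermark);
            (p.high_watermark - start).max(0)
        })
        .sum()
}
```

On compacted topics, or when transactions add control records, offsets are not contiguous, so this is an upper bound on the number of pending activities rather than an exact count.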

Challenges

  • Priority Queues: Kafka has no native message priority; options include one topic per priority level or consumer-side sorting
  • Scheduled Activities: Requires an additional mechanism (a timer topic, an external scheduler, or Kafka Streams)
  • Lease Management: Leases must be tracked separately (in a compacted topic or an external store)
  • Random Access: Kafka is optimized for sequential reads, so get_activity() may need a secondary index
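The first two challenges can be prototyped together: one pending queue per priority level with strict-priority polling, plus a timer heap that promotes scheduled activities once they are due. This is an in-memory sketch, not Kafka client code; `PriorityScheduler`, treating level 0 as the highest priority, and clamping out-of-range priorities to the lowest level are assumptions. In a Kafka backend each level would presumably map to its own pending topic, and the heap to a timer topic or external scheduler.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// Hypothetical model: one pending queue per priority level (index 0 is the
/// highest) plus a min-heap of scheduled activities ordered by due time.
pub struct PriorityScheduler {
    levels: Vec<VecDeque<String>>,
    // (due_ms, sequence, level, id); the sequence number keeps FIFO order
    // among activities due at the same instant.
    scheduled: BinaryHeap<Reverse<(u64, u64, usize, String)>>,
    next_seq: u64,
}

impl PriorityScheduler {
    pub fn new(num_levels: usize) -> Self {
        assert!(num_levels > 0, "need at least one priority level");
        Self {
            levels: vec![VecDeque::new(); num_levels],
            scheduled: BinaryHeap::new(),
            next_seq: 0,
        }
    }

    /// Out-of-range priorities fall back to the lowest level.
    fn level(&self, priority: usize) -> usize {
        priority.min(self.levels.len() - 1)
    }

    pub fn enqueue(&mut self, id: &str, priority: usize) {
        let level = self.level(priority);
        self.levels[level].push_back(id.to_string());
    }

    pub fn schedule(&mut self, id: &str, priority: usize, due_ms: u64) {
        let level = self.level(priority);
        self.next_seq += 1;
        self.scheduled
            .push(Reverse((due_ms, self.next_seq, level, id.to_string())));
    }

    /// process_scheduled(): move every activity due at or before now_ms onto
    /// its priority level; returns how many were moved.
    pub fn process_scheduled(&mut self, now_ms: u64) -> usize {
        let mut moved = 0;
        while let Some(due) = self.scheduled.peek().map(|r| (r.0).0) {
            if due > now_ms {
                break;
            }
            if let Some(Reverse((_, _, level, id))) = self.scheduled.pop() {
                self.levels[level].push_back(id);
                moved += 1;
            }
        }
        moved
    }

    /// Strict priority: serve the highest non-empty level first.
    pub fn dequeue(&mut self) -> Option<String> {
        self.levels.iter_mut().find_map(|queue| queue.pop_front())
    }
}
```

Strict priority can starve lower levels under sustained load; weighted round-robin across the per-priority topics is a common alternative if that becomes a problem.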

Tasks

  • Research Kafka Rust client options (rdkafka, kafka-rust)
  • Design topic structure and partitioning strategy
  • Create src/storage/kafka/ module structure
  • Implement QueueStorage trait for KafkaBackend
  • Implement InspectionStorage trait for KafkaBackend
  • Handle priority queue semantics
  • Implement scheduled activity support
  • Add lease tracking mechanism
  • Write integration tests (requires Kafka/Docker)
  • Add documentation and examples
  • Update README with Kafka usage section

Feature Flags

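[features]
default = ["redis"]
# "dep:" refers to the optional dependency itself, so a feature can share
# its name; each listed dependency must be declared with optional = true.
redis = ["dep:bb8-redis", "dep:redis"]
kafka = ["dep:rdkafka"]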
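The kafka feature would typically gate the backend module at compile time. A minimal sketch of a hypothetical src/storage/mod.rs, with inline module bodies standing in for the real files and `compiled_backends()` as an illustrative helper:

```rust
// Hypothetical src/storage/mod.rs: each backend module is compiled only when
// its Cargo feature is enabled. Inline bodies stand in for the real files.
#[cfg(feature = "redis")]
pub mod redis {
    pub struct RedisBackend;
}

#[cfg(feature = "kafka")]
pub mod kafka {
    pub struct KafkaBackend;
}

/// Lists the backends compiled into this build (illustrative helper).
pub fn compiled_backends() -> Vec<&'static str> {
    let mut backends = Vec::new();
    if cfg!(feature = "redis") {
        backends.push("redis");
    }
    if cfg!(feature = "kafka") {
        backends.push("kafka");
    }
    backends
}
```

A Kafka-only build could then use cargo build --no-default-features --features kafka.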

Related

  • Storage abstraction layer implementation (completed)
  • QueueStorage and InspectionStorage traits

Metadata


Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
