Kafkat is a powerful Python CLI tool for searching and analyzing data in Apache Kafka topics. It provides an intuitive interface for real-time message searching with regex patterns, topic filtering, and flexible output formatting.
- 🔍 Regex Search: Search messages using powerful regex patterns
- 📋 Topic Filtering: Search across multiple topics with pattern matching
- 🎨 Pretty Output: JSON formatting with syntax highlighting
- ⚙️ Configuration: Persistent configuration management
- 📊 Progress Tracking: Real-time search progress indicators
- 🚀 Performance: Efficient message processing with configurable limits
- 🔧 Flexible: Extensive CLI options for customization
- 📝 Logging: Configurable logging with multiple verbosity levels
# Install from PyPI
pip install kafkat
# Or install from source
git clone https://github.com/danielrus/kafkat.git
cd kafkat
pip install -e .
# Search for a specific pattern in all topics
kafkat --search "error"
# Search in specific topics with pretty output
kafkat --search "order_id.*12345" -t "orders.*" -t "payments.*" --prettify
# List available topics
kafkat --list-topics -t "user.*"
# Show current configuration
kafkat --config-show
# Reset configuration to defaults
kafkat --config-reset

| Option | Short | Description | Default |
|---|---|---|---|
| --bootstrap-server | -b | Kafka bootstrap servers (comma-separated) | localhost:9092 |
| --auto-commit/--no-auto-commit | -c/-C | Enable/disable auto commit | True |
| --auto-offset-reset | -o | Offset reset strategy | earliest |
| --separator | -e | Field separator for output | , |
| --prettify/--no-prettify | -p/-P | Pretty print JSON output | False |
| --topic | -t | Topic patterns (multiple allowed) | .* |
| --search | -s | Search regex pattern | .* |
| --max-messages | -m | Maximum messages to process | 1000 |
| --timeout | | Consumer timeout in seconds | 10 |
| --verbose | -v | Enable verbose logging | False |
| --quiet | -q | Suppress non-error output | False |
| --list-topics | | List matching topics and exit | False |
| --config-show | | Show current configuration | False |
| --config-reset | | Reset configuration to defaults | False |
| --version | | Show version and exit | |
| --help | | Show help message | |
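The defaults in this table correspond to the keys kafkat keeps in ~/.kafkat/config.json, and KAFKAT_* environment variables can override them. The following Python sketch shows one way such layering could work; it is not kafkat's implementation. The resolution order (built-in defaults, then the config file, then environment variables), the rule that every key maps to an environment variable named KAFKAT_<KEY>, and the helper names resolve_config and _coerce are assumptions: only KAFKAT_BOOTSTRAP_SERVERS and KAFKAT_AUTO_OFFSET_RESET are documented. Explicit CLI flags would presumably be applied last and are left out here.

```python
import json
import os
from pathlib import Path

# Defaults copied from the documented config.json example.
DEFAULTS = {
    "bootstrap_servers": "localhost:9092",
    "auto_commit": True,
    "auto_offset_reset": "earliest",
    "separator": ",",
    "prettify": False,
    "timeout_ms": 10000,
    "max_messages": 1000,
    "consumer_timeout_ms": 5000,
}


def _coerce(raw, default):
    """Hypothetical helper: convert an environment string to the default's type."""
    if isinstance(default, bool):  # check bool first: bool is a subclass of int
        return raw.strip().lower() in ("1", "true", "yes", "on")
    if isinstance(default, int):
        return int(raw)
    return raw


def resolve_config(path=None, environ=None):
    """Hypothetical resolver (assumed order): defaults < config file < KAFKAT_* env vars."""
    environ = os.environ if environ is None else environ
    path = Path(path) if path is not None else Path.home() / ".kafkat" / "config.json"
    config = dict(DEFAULTS)
    if path.is_file():
        with path.open(encoding="utf-8") as fh:
            config.update(json.load(fh))  # persisted settings replace defaults
    for key, default in DEFAULTS.items():
        env_name = "KAFKAT_" + key.upper()  # assumed naming rule
        if env_name in environ:
            config[key] = _coerce(environ[env_name], default)
    return config


if __name__ == "__main__":
    print(json.dumps(resolve_config(), indent=2))
```

For example, with KAFKAT_AUTO_OFFSET_RESET=latest exported and no config file present, this resolver would return "latest" for auto_offset_reset while every other key keeps its default.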
kafkat --search '.*982834084219' -t 'orders.*' -t 'payments.*' --prettify
Output:
{
"topic": "orders.processed",
"partition": 0,
"offset": 12345,
"timestamp": "2023-12-07T10:30:45.123Z",
"value": {
"order_id": "982834084219",
"status": "processed",
"amount": 99.99
}
}
kafkat -b "kafka1:9092,kafka2:9092" --search "ERROR|FATAL" -t "logs.*" --max-messages 500
kafkat --list-topics -t "user.*" -t "profile.*"
Output:
Found 5 topics:
user.created
user.updated
user.deleted
profile.events
profile.changes
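Selecting topics with several -t patterns can be approximated as "keep every topic that matches at least one pattern". The hypothetical filter_topics helper below is a sketch, not kafkat's code; it assumes each pattern is matched from the start of the topic name with re.match, and kafkat may anchor patterns differently. Sorting the result is a choice made here for stable output.

```python
import re
from typing import Iterable, List


def filter_topics(topics: Iterable[str], patterns: Iterable[str]) -> List[str]:
    """Return the topics matching at least one pattern, sorted by name.

    Assumption: patterns are anchored at the start of the topic name
    (re.match), so "user.*" does not select "superuser.audit".
    """
    compiled = [re.compile(p) for p in patterns]
    return sorted(t for t in topics if any(rx.match(t) for rx in compiled))


if __name__ == "__main__":
    topics = ["user.created", "orders.processed", "profile.events", "superuser.audit"]
    for name in filter_topics(topics, ["user.*", "profile.*"]):
        print(name)
```

Because "." is a regex wildcard, "user.*" also matches a topic such as "users.v2"; escaping the dot ("user\..*") requires a literal separator.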
kafkat --search "payment.*failed" -t "transactions.*" --verbose
Kafkat stores configuration in ~/.kafkat/config.json. You can modify settings using CLI options, and they will be persisted for future use.
{
"bootstrap_servers": "localhost:9092",
"auto_commit": true,
"auto_offset_reset": "earliest",
"separator": ",",
"prettify": false,
"timeout_ms": 10000,
"max_messages": 1000,
"consumer_timeout_ms": 5000
}
You can set environment variables to override default settings:
export KAFKAT_BOOTSTRAP_SERVERS="kafka1:9092,kafka2:9092"
export KAFKAT_AUTO_OFFSET_RESET="latest"
kafkat --search "error"
Kafkat supports full Python regex syntax for powerful searching:
# Case-insensitive search
kafkat --search "(?i)error"
# Search for JSON field
kafkat --search '"status":\s*"failed"'
# Search for IP addresses
kafkat --search '\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
# Search for UUIDs
kafkat --search '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'
Control output format based on your needs:
# Compact JSON (one line per message)
kafkat --search "order" --no-prettify
# Pretty formatted JSON
kafkat --search "order" --prettify
# Quiet mode (errors only)
kafkat --search "order" --quiet
- Limit Message Count: Use --max-messages to cap how many messages a search processes
- Specific Topics: Use specific topic patterns instead of .* so fewer topics have to be read
- Efficient Patterns: Use specific regex patterns rather than broad ones
- Timeout Settings: Adjust --timeout based on your Kafka cluster performance
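The regex examples and the tips above combine into a simple loop: compile the pattern once, examine at most max_messages messages, and keep those in which re.search finds a match anywhere in the text. This is an illustrative sketch over plain strings rather than Kafka records; search and render are hypothetical names, and treating the limit as "messages examined" (not "matches returned") is an assumption based on the option's description, "Maximum messages to process". render only shows the difference between compact single-line and indented JSON using the standard json module; kafkat's exact formatting may differ.

```python
import json
import re
from itertools import islice
from typing import Iterable, List


def search(messages: Iterable[str], pattern: str, max_messages: int = 1000) -> List[str]:
    """Return the messages that match `pattern`.

    Only the first `max_messages` messages are examined (assumed meaning of
    --max-messages). re.search scans the whole string, so the pattern may
    match anywhere in a message.
    """
    regex = re.compile(pattern)  # compile once, reuse for every message
    return [msg for msg in islice(messages, max_messages) if regex.search(msg)]


def render(value: dict, prettify: bool = False) -> str:
    """Serialize a decoded message value as compact or indented JSON."""
    if prettify:
        return json.dumps(value, indent=2)
    return json.dumps(value)  # no indent: the whole object stays on one line


if __name__ == "__main__":
    sample = [
        '{"status": "failed", "order_id": "982834084219"}',
        "ERROR connection refused from 10.0.0.12",
        '{"status": "processed"}',
    ]
    for hit in search(sample, r'"status":\s*"failed"'):
        print(render(json.loads(hit), prettify=True))
```

Under these re.search semantics a leading ".*", as in '.*982834084219', adds nothing, and a narrow pattern lets the engine reject non-matching messages quickly.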
Kafkat provides detailed error messages and proper exit codes:
- 0: Success
- 1: General error (connection, search, etc.)
- 130: Interrupted by user (Ctrl+C)
# Test connectivity
kafkat --list-topics --verbose
# Use specific bootstrap servers
kafkat -b "your-kafka-server:9092" --list-topics
# Check if topics exist
kafkat --list-topics -t "your-pattern"
# Try a different offset reset strategy
kafkat --search "pattern" --auto-offset-reset latest
git clone https://github.com/danielrus/kafkat.git
cd kafkat
pip install -e ".[dev]"
pytest tests/
black kafkat/
flake8 kafkat/
mypy kafkat/
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Complete rewrite with improved architecture
- Added configuration management
- Enhanced error handling and logging
- Better CLI interface with more options
- Improved search performance
- Added progress indicators
- Signal handling for graceful shutdown
- Initial release
- Basic search functionality
- Simple CLI interface
If you encounter any issues or have questions:
- Check the Issues page
- Create a new issue with detailed information
- Include logs produced with the --verbose flag for debugging
Author: Daniel Rus
Email: dani@fsck.ro
License: MIT
