Routilux — Event-driven workflow orchestration. Too many pipelines to tame? One event queue for orchestration, concurrency, and resume-from-checkpoint. Build in minutes, recover anytime.
- 🚀 Event queue: Non-blocking, one model for sequential and concurrent execution
- 🔗 Flexible wiring: Many-to-many routines, smart routing
- 📊 State built-in: Execution state, metrics, history out of the box
- 🛡️ Error policies: STOP / CONTINUE / RETRY / SKIP with automatic recovery
- ⚡ Concurrent execution: I/O parallelized without blocking the main flow
- 💾 Checkpoint & resume: Save and restore at any node; survive interruptions
- 🎯 Production-ready: Error handling, tracing, and monitoring
- 🎨 Simple API: Flow auto-detection; fewer parameters in most cases
- Data Pipelines: ETL processes, data transformation workflows
- API Orchestration: Coordinating multiple API calls with complex dependencies
- Event Processing: Real-time event streams and reactive systems
- Workflow Automation: Business process automation and task scheduling
- Microservices Coordination: Managing interactions between services
- LLM Agent Workflows: Complex AI agent orchestration and chaining
# Mac / Linux - Auto-detects best method (uv > pipx > pip)
curl -fsSL https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash
# Or with wget
wget -qO- https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash

Installation options:
# Use pipx instead of uv
METHOD=pipx curl -fsSL https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash
# Install specific version
VERSION=0.14.0 curl -fsSL https://raw.githubusercontent.com/lzjever/routilux/main/install.sh | bash

The best way to install the Routilux CLI without affecting your system:
# Install CLI with isolated environment
pipx install "routilux[cli]"
# Use anywhere
routilux --help
routilux run workflow.yaml

Why pipx?
- ✅ Creates isolated virtual environment (no dependency conflicts)
- ✅ CLI available globally
- ✅ Easy to update: `pipx upgrade routilux`
- ✅ Works on Mac and Linux
Using uv (faster than pipx):
# Install
uv tool install "routilux[cli]"
# Use
routilux --help

Using Homebrew:

# Add tap and install
brew tap lzjever/routilux
brew install routilux
# Or directly
brew install lzjever/routilux/routilux

For library use or development:
# Library only
pip install routilux
# With CLI support
pip install "routilux[cli]"

This project uses uv for fast dependency management. Install uv first:
curl -LsSf https://astral.sh/uv/install.sh | sh

Then set up the development environment:
Recommended: For active development
# Install package with all development dependencies (recommended)
make dev-install
# Or manually with uv (dev group is installed by default)
uv sync --group docs --all-extras

Alternative: Dependencies only (for CI/CD or code review)
# Create virtual environment and install dependencies only (without installing the package)
# Useful for: CI/CD pipelines, code review, or when you only need development tools
make setup-venv
# Later, if you need to install the package:
make install

Understanding dependency groups vs extras:
- Dependency groups (`dev`, `docs`): Development dependencies that are not published to PyPI. The `dev` group is installed by default with `uv sync`.
- Extras: Currently none, but extras may be added in the future.
All make commands automatically use uv if it is available and fall back to pip otherwise.
For development with all dependencies using pip:
pip install -e ".[dev]"
# Or using Makefile
make dev-install

Routilux includes a command-line interface for workflow management:
# Install with CLI support
pip install "routilux[cli]"
# Run a workflow
routilux run --workflow flow.yaml
# Start server
routilux server start
# See all commands
routilux --help

- `routilux init` - Initialize a new project with example files
- `routilux run` - Execute a workflow from a DSL file
- `routilux server` - Start the HTTP server for API access
- `routilux job` - Submit and manage jobs
- `routilux list` - List available routines or flows
- `routilux validate` - Validate a workflow DSL file
See CLI Documentation for details.
Start the HTTP server with flow auto-loading:
# Start server with flows directory
routilux server start --flows-dir ./flows --port 8080
# Built-in routines (Mapper, Filter, etc.) are automatically available
# Flows from ./flows/*.yaml are loaded at startup
# Hot reload enabled - flow files are watched for changes

Submit and manage jobs via CLI:
# Submit job locally
routilux job submit --flow myflow --routine processor --data '{"input": "value"}'
# Submit job to remote server
routilux job submit --server http://localhost:8080 --flow myflow --routine processor --data '{}'
# Check job status
routilux job status <job_id>
# List jobs
routilux job list --flow myflow

Step 1: Define a Routine
from routilux import Routine
class DataProcessor(Routine):
def __init__(self):
super().__init__()
# Define input slot
self.input_slot = self.define_slot("input", handler=self.process_data)
# Define output event
self.output_event = self.define_event("output", ["result"])
def process_data(self, data=None, **kwargs):
# Flow is automatically detected from routine context
result = f"Processed: {data}"
self._stats["processed_count"] = self._stats.get("processed_count", 0) + 1
self.emit("output", result=result)  # No need to pass flow!

Step 2: Create and Connect a Flow
from routilux import Flow
flow = Flow(flow_id="my_workflow")
processor1 = DataProcessor()
processor2 = DataProcessor()
id1 = flow.add_routine(processor1, "processor1")
id2 = flow.add_routine(processor2, "processor2")
# Connect: processor1's output → processor2's input
flow.connect(id1, "output", id2, "input")

Step 3: Execute
job_state = flow.execute(id1, entry_params={"data": "Hello, Routilux!"})
print(job_state.status) # "completed"
print(processor1.stats())  # {"processed_count": 1}

🎉 Done! You've created your first workflow.
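Conceptually, the quickstart above runs on an event queue: emitting only enqueues one task per connected slot, and a worker loop drains the queue in FIFO order. The toy model below illustrates that idea with plain Python; it is a sketch for intuition, not Routilux's actual implementation.

```python
from collections import deque

# Toy event queue: event name -> connected handlers; emit() only enqueues
# work, and run() drains the queue in FIFO order.
connections = {"output": []}
tasks = deque()
results = []

def connect(event, handler):
    connections[event].append(handler)

def emit(event, payload):
    # non-blocking: enqueue one task per connected slot, then return
    for handler in connections[event]:
        tasks.append((handler, payload))

def run():
    while tasks:
        handler, payload = tasks.popleft()
        handler(payload)

connect("output", lambda d: results.append(("p2", d)))
connect("output", lambda d: results.append(("p3", d)))  # fan-out
emit("output", "value")  # returns immediately; nothing has run yet
run()
print(results)  # [('p2', 'value'), ('p3', 'value')]
```

Because emitting is just an enqueue, sequential and concurrent execution can share the same model: the difference is only how many workers drain the queue.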
Routines communicate through events and slots using a unified event queue pattern:
# Multiple routines can listen to the same event
flow.connect(processor1, "output", processor2, "input")
flow.connect(processor1, "output", processor3, "input") # Fan-out
# Multiple events can feed into the same slot
flow.connect(processor1, "output", aggregator, "input")
flow.connect(processor2, "output", aggregator, "input") # Fan-in
# emit() is non-blocking - returns immediately after enqueuing tasks
# Flow is automatically detected from routine context
self.emit("output", data="value")  # No flow parameter needed!

Track everything automatically:
# Access routine state
stats = routine.stats() # {"processed_count": 42, "errors": 0}
# Track execution history
history = job_state.get_execution_history()
# Performance metrics
perf = flow.execution_tracker.get_routine_performance("processor1")

Choose the right strategy for your use case:
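As a generic reference point for the RETRY strategy configured below, this is what a retry loop with exponential backoff typically does. It is an illustrative sketch in plain Python, not Routilux's ErrorHandler implementation; the `retry`, `flaky`, and `calls` names are made up for the example.

```python
import time

# Generic retry loop: the delay for attempt n is
# retry_delay * backoff_multiplier ** n.
def retry(fn, max_retries=3, retry_delay=0.01, backoff_multiplier=2.0):
    delay = retry_delay
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted, propagate the error
            time.sleep(delay)
            delay *= backoff_multiplier

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds, like a transient network error.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

result = retry(flaky)
print(result)  # ok (succeeds on the third call)
```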
from routilux import ErrorHandler, ErrorStrategy
# Stop on error (default)
flow.set_error_handler(ErrorHandler(ErrorStrategy.STOP))
# Continue and log errors
flow.set_error_handler(ErrorHandler(ErrorStrategy.CONTINUE))
# Retry with exponential backoff
flow.set_error_handler(ErrorHandler(
ErrorStrategy.RETRY,
max_retries=3,
retry_delay=1.0,
backoff_multiplier=2.0
))

Both sequential and concurrent modes use the same event queue mechanism:
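To build intuition for `max_workers` below, Python's own thread pool shows how a worker count makes the same task list run sequentially or concurrently. This is a generic analogy, not Routilux's scheduler.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def io_task(n):
    time.sleep(0.05)  # stand-in for an I/O wait (HTTP call, DB query, ...)
    return n * 2

# max_workers=1 is effectively sequential execution;
# max_workers=4 overlaps the waits, so four tasks take ~one wait, not four.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(io_task, range(4)))  # map preserves input order

print(results)  # [0, 2, 4, 6]
```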
# Sequential mode (default): max_workers=1
flow = Flow() # Sequential by default
# Concurrent mode: max_workers>1
flow.set_execution_strategy("concurrent", max_workers=4)
# Tasks are processed fairly in queue order
# Long chains don't block shorter ones
job_state = flow.execute(entry_routine_id)
flow.wait_for_completion()  # Wait for async tasks

Save and resume workflows:
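The checkpoint pattern boils down to serializing job state to disk and reading it back later. A minimal stand-alone sketch of that round trip follows; the `state` dictionary here is a hypothetical shape, as the real schema is whatever `JobState.save()` emits and may differ.

```python
import json
import os
import tempfile

# Hypothetical job-state snapshot (illustrative fields only).
state = {"status": "paused", "stats": {"processed_count": 42}}

# Save: serialize the snapshot to a JSON file.
path = os.path.join(tempfile.mkdtemp(), "workflow_state.json")
with open(path, "w") as f:
    json.dump(state, f)

# Later (even after a process restart): load it back and continue.
with open(path) as f:
    restored = json.load(f)

print(restored["stats"]["processed_count"])  # 42
```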
# Save workflow state
job_state.save("workflow_state.json")
# Later, resume from saved state
from routilux import JobState  # assuming JobState is exported at top level
saved_state = JobState.load("workflow_state.json")
flow.resume(saved_state)

📖 Full documentation available at: routilux.readthedocs.io
- 📄 Where flows come from: DSL, registry, API, and “Sync from registry” in Overseer
- 📘 User Guide: Comprehensive guide covering all features
- 🔧 API Reference: Complete API documentation
- 💻 Examples: Real-world code examples
- 🏗️ Design: Architecture and design principles
pip install -e ".[docs]"
cd docs && make html

Check out the examples/ directory for practical examples:
- `basic_example.py` - Your first workflow
- `data_processing.py` - Multi-stage data pipeline
- `concurrent_flow_demo.py` - Parallel execution
- `error_handling_example.py` - Error handling strategies
- `state_management_example.py` - State tracking and recovery
- `builtin_routines_demo.py` - Using built-in routines
- `data_pipeline.py` - Multi-stage data processing with validation, transformation, and quality checks
- `async_orchestration.py` - Concurrent task execution with result aggregation
- `long_running_workflow.py` - Pause/resume execution with state persistence and recovery
- `error_handling.py` - Retry patterns and fallback mechanisms
Run examples:
python examples/basic_example.py
python examples/data_pipeline.py
python examples/async_orchestration.py
python examples/long_running_workflow.py
python examples/error_handling.py

Routilux comes with a rich set of built-in routines ready to use:
- Text Processing: `TextClipper`, `TextRenderer`, `ResultExtractor`
- Data Processing: `DataTransformer`, `DataValidator`, `DataFlattener`
- Control Flow: `ConditionalRouter` for dynamic routing
- Utilities: `TimeProvider` for timestamps
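For intuition, a conditional router boils down to choosing a target by predicate. The plain-Python sketch below shows the idea only; the `route` function and `rules` list are invented for illustration, and `ConditionalRouter`'s real API may differ.

```python
# Generic idea behind a conditional router: walk an ordered rule list and
# return the target of the first predicate that matches.
def route(value, rules, default="fallback"):
    for predicate, target in rules:
        if predicate(value):
            return target
    return default

rules = [
    (lambda v: v > 10, "big"),
    (lambda v: v > 0, "small"),
]

print(route(42, rules))  # big
print(route(5, rules))   # small
print(route(-1, rules))  # fallback
```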
from routilux.builtin_routines import ConditionalRouter, DataTransformer
# Use built-in routines directly
router = ConditionalRouter()
transformer = DataTransformer()

routilux/
├── routilux/ # Main package
│ ├── routine.py # Routine base class
│ ├── flow.py # Flow manager
│ ├── job_state.py # State management
│ ├── connection.py # Connection management
│ ├── event.py # Event class
│ ├── slot.py # Slot class
│ ├── error_handler.py # Error handling
│ └── execution_tracker.py # Performance tracking
├── tests/ # Comprehensive test suite
├── examples/ # Usage examples
└── docs/ # Sphinx documentation
Routilux comes with comprehensive tests:
# Run all tests
make test-all
# Run with coverage
make test-cov
# Run specific test suite
pytest tests/ # Core tests
pytest routilux/builtin_routines/ # Built-in routines tests

We welcome contributions! Here's how you can help:
- Star the project ⭐ - Show your support
- Report bugs 🐛 - Help us improve
- Suggest features 💡 - Share your ideas
- Submit PRs 🔧 - Contribute code
Routilux is part of the Agentsmith open-source ecosystem. Agentsmith is a business-facing (ToB) AI agent and algorithm development platform, currently deployed at highway management companies, securities firms, and regulatory agencies in China. The Agentsmith team is gradually open-sourcing the platform: proprietary code, algorithm modules, and enterprise-specific customizations are being removed, and the system is being decoupled into modules the open-source community can use independently or together.
- Varlord ⚙️ - Configuration management library with multi-source support
- Routilux ⚡ - Event-driven workflow orchestration framework
- Serilux 📦 - Flexible serialization framework for Python objects
- Lexilux 🚀 - Unified LLM API client library
These projects are modular components extracted from the Agentsmith platform, designed to be used independently or together to build powerful applications.
Routilux is licensed under the Apache License 2.0. See LICENSE for details.
- 📦 PyPI: pypi.org/project/routilux
- 📚 Documentation: routilux.readthedocs.io
- 🐙 GitHub: github.com/lzjever/routilux
- 📧 Issues: github.com/lzjever/routilux/issues
If Routilux helps you build amazing workflows, consider giving it a star on GitHub!
Built with ❤️ by the Routilux Team
Making workflow orchestration simple, powerful, and fun.