Hyrex is a modern, open-source task orchestration framework built on PostgreSQL. It provides powerful features for distributed task processing, workflow management, and asynchronous job execution.
- Task Orchestration: Define and execute distributed tasks with automatic retries, timeouts, and error handling
- Workflow Support: Build complex DAG-based workflows with dependencies
- Queue Management: Route tasks to different queues with separate worker pools
- Key-Value Store: Built-in distributed key-value storage for task coordination
- Task Context: Rich execution context with task metadata and hierarchy tracking
- Cron Scheduling: Schedule recurring tasks with cron expressions
- Hyrex Studio: Web-based UI for monitoring and debugging (available at https://local.hyrex.studio)
- Type Safety: Full type hints and Pydantic model validation
```
pip install hyrex
```
Use the interactive `hyrex init` command to set up a new project:

```
hyrex init
```
This will guide you through:
- Choosing a project name
- Selecting between PostgreSQL (self-hosted) or Hyrex Cloud
- Creating project files (`.env`, `hyrex_app.py`, `tasks.py`, `requirements.txt`, `Dockerfile`)
For manual database initialization (if needed):
```
export HYREX_DATABASE_URL="postgresql://user:password@localhost/dbname"
hyrex init-db
```
After running `hyrex init`, you'll have:

```
your-project/
├── .env              # Environment configuration
├── hyrex_app.py      # Hyrex app configuration
├── tasks.py          # Task definitions
├── requirements.txt  # Python dependencies
└── Dockerfile        # Container configuration
```
Example `tasks.py`:

```python
from hyrex import HyrexRegistry
from pydantic import BaseModel

hy = HyrexRegistry()

class EmailContext(BaseModel):
    to: str
    subject: str
    body: str

@hy.task
def send_email(context: EmailContext):
    print(f"Sending email to {context.to}")
    # Your email logic here
    return {"sent": True}
```
Queue tasks for execution:
```python
# Send a task to the default queue
send_email.send(EmailContext(
    to="user@example.com",
    subject="Welcome!",
    body="Thanks for signing up"
))

# Send with custom configuration
send_email.with_config(
    queue="high-priority",
    max_retries=3,
    timeout_seconds=30
).send(EmailContext(...))
```
Start workers to process tasks:
```
hyrex run-worker hyrex_app:app
```
The `@task` decorator transforms functions into distributed tasks:

```python
@hy.task(
    queue="processing",           # Target queue (str or HyrexQueue object)
    max_retries=3,                # Maximum retry attempts (default: 0)
    timeout_seconds=300,          # Task timeout in seconds
    priority=5,                   # Priority 1-10 (higher = more important)
    on_error=error_handler,       # Error callback function
    retry_backoff=lambda n: n*10  # Backoff strategy function
)
def process_data(context: ProcessContext):
    # Task implementation
    pass
```
Access rich execution context within tasks:
```python
from hyrex import get_hyrex_context

@hy.task
def contextual_task():
    context = get_hyrex_context()
    print(f"Task ID: {context.task_id}")
    print(f"Task Name: {context.task_name}")
    print(f"Attempt: {context.attempt_number}/{context.max_retries}")
    print(f"Queue: {context.queue}")
    print(f"Parent Task: {context.parent_id}")
    print(f"Root Task: {context.root_id}")
```
Use HyrexKV for distributed state management:
```python
from hyrex import HyrexKV

@hy.task
def process_with_state(user_id: str):
    # Store state
    HyrexKV.set(f"user:{user_id}:status", "processing")

    # Retrieve state
    status = HyrexKV.get(f"user:{user_id}:status")

    # Delete state
    HyrexKV.delete(f"user:{user_id}:status")
```
Note: HyrexKV stores string values up to 1MB in size.
Build complex DAG-based workflows:
```python
@hy.task
def extract_data():
    return {"data": "extracted"}

@hy.task
def transform_data():
    return {"data": "transformed"}

@hy.task
def load_data():
    return {"data": "loaded"}

class ETLWorkflowArgs(BaseModel):
    source: str
    destination: str

@hy.workflow(
    queue="etl",
    timeout_seconds=3600,
    workflow_arg_schema=ETLWorkflowArgs
)
def etl_workflow():
    # Define workflow DAG
    extract_data >> transform_data >> load_data

    # Parallel execution
    extract_data >> [transform_data, validate_data] >> load_data

    # With custom config
    extract_data >> transform_data.with_config(queue="cpu-intensive") >> load_data
```
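The `>>` chaining reads naturally because each operand returns something chainable. A toy sketch of the pattern (not Hyrex's actual implementation) shows how `__rshift__` can accumulate edges, including the list form for fan-out:

```python
class Node:
    """Toy DAG node; `a >> b` records an edge and returns b so chains continue."""
    edges: list = []  # shared edge list, fine for a sketch

    def __init__(self, name: str):
        self.name = name

    def __rshift__(self, other):
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            Node.edges.append((self.name, t.name))
        return targets[0] if len(targets) == 1 else FanOut(targets)

class FanOut:
    """Groups parallel nodes so a fan-out can fan back in with another `>>`."""
    def __init__(self, nodes):
        self.nodes = nodes

    def __rshift__(self, other):
        for n in self.nodes:
            n >> other
        return other

extract, transform, validate, load = (
    Node(n) for n in ["extract", "transform", "validate", "load"]
)
extract >> [transform, validate] >> load
print(Node.edges)
# [('extract', 'transform'), ('extract', 'validate'),
#  ('transform', 'load'), ('validate', 'load')]
```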
Send workflows:
```python
etl_workflow.send(ETLWorkflowArgs(
    source="s3://input",
    destination="s3://output"
))
```
Access workflow context:
```python
from hyrex import get_hyrex_workflow_context

@hy.task
def workflow_task():
    wf_context = get_hyrex_workflow_context()

    # Access workflow arguments
    args = wf_context.workflow_args

    # Access other task results
    extract_result = wf_context.durable_runs.get("extract_data")
    if extract_result:
        extract_result.refresh()  # Get latest status
```
Use `with_config()` to modify task behavior at runtime:
```python
# Define base task
@hy.task(queue="default", max_retries=1)
def flexible_task(data: str):
    return process(data)

# Override configuration when sending
flexible_task.with_config(
    queue="high-priority",
    max_retries=5,
    timeout_seconds=60,
    priority=10
).send("important-data")
```
Schedule recurring tasks:
```python
@hy.task(cron="0 2 * * *")  # Daily at 2 AM
def daily_cleanup():
    # Cleanup logic
    pass

# Tasks with default arguments can also be scheduled
@hy.task(cron="0 0 * * 0")  # Weekly on Sunday
def weekly_backup(retention_days: int = 30):
    # Backup logic with configurable retention
    pass

@hy.workflow(cron="0 0 * * 0")  # Weekly on Sunday
def weekly_report():
    generate_report >> send_report
```
Note: Cron-scheduled tasks must have no arguments or all arguments must have default values.
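Standard five-field cron expressions read minute, hour, day of month, month, day of week. A small helper (hypothetical, not part of Hyrex) makes the schedules above easier to decode:

```python
FIELDS = ["minute", "hour", "day of month", "month", "day of week"]

def describe_cron(expr: str) -> dict:
    """Map a five-field cron expression to named fields."""
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError("expected 5 cron fields")
    return dict(zip(FIELDS, parts))

print(describe_cron("0 2 * * *"))  # hour 2, minute 0 -> daily at 02:00
print(describe_cron("0 0 * * 0"))  # day of week 0 -> Sunday at midnight
```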
Implement custom error handlers:
```python
def handle_task_error(error: Exception):
    # Log error, send alerts, etc.
    print(f"Task failed: {error}")

@hy.task(
    on_error=handle_task_error,
    max_retries=3,
    retry_backoff=lambda attempt: 2 ** attempt  # Exponential backoff
)
def risky_task():
    # Task that might fail
    pass
```
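The `retry_backoff` callable maps the attempt number to a delay. Comparing the linear strategy shown earlier (`lambda n: n*10`) with the exponential one above — assuming the return value is a delay in seconds; the cap is a common safeguard, not a documented Hyrex feature:

```python
def linear_backoff(attempt: int) -> int:
    # 10, 20, 30, ... seconds
    return attempt * 10

def capped_exponential_backoff(attempt: int, cap: int = 300) -> int:
    # 2, 4, 8, ... seconds, never more than `cap`
    return min(2 ** attempt, cap)

print([linear_backoff(n) for n in range(1, 4)])               # [10, 20, 30]
print([capped_exponential_backoff(n) for n in range(1, 11)])  # grows, then stays at 300
```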
- `hyrex init` - Interactive project initialization wizard
- `hyrex init-db` - Initialize the database schema
- `hyrex run-worker <module:instance>` - Start a worker process
- `hyrex studio` - Start Hyrex Studio server
Hyrex Studio provides a web interface for monitoring your tasks and workflows:
- Start the studio server: `hyrex studio`
- Open https://local.hyrex.studio in your browser
- Monitor task execution, view logs, and inspect your data
Hyrex uses environment variables for configuration:
- `HYREX_DATABASE_URL` - PostgreSQL connection string (required)
- `STUDIO_PORT` - Port for Hyrex Studio (default: 1337)
- `STUDIO_VERBOSE` - Enable verbose logging for Studio (default: false)
Share task definitions across modules:
```python
# common_tasks.py
from hyrex import HyrexRegistry

common_registry = HyrexRegistry()

@common_registry.task
def shared_task():
    pass
```

```python
# main.py
from hyrex import HyrexRegistry
from common_tasks import common_registry

hy = HyrexRegistry()
hy.add_registry(common_registry)  # Include all tasks from common_registry
```
Build complex task hierarchies:
```python
@hy.task
def parent_task(count: int):
    # Spawn child tasks
    for i in range(count):
        child_task.send(index=i)

# Tasks maintain parent-child relationships,
# visible in context.parent_id and context.root_id
```
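The parent/root bookkeeping can be pictured with a small sketch: each spawned run records its parent's id and inherits the root id from the top of the tree. This is illustrative only; the attribute names simply mirror the context fields shown earlier:

```python
import itertools

_ids = itertools.count(1)

class TaskRun:
    """Toy task run that mimics parent_id / root_id propagation."""
    def __init__(self, parent=None):
        self.task_id = next(_ids)
        self.parent_id = parent.task_id if parent else None
        # The root is the topmost ancestor; a root task is its own root.
        self.root_id = parent.root_id if parent else self.task_id

parent = TaskRun()
children = [TaskRun(parent=parent) for _ in range(3)]
grandchild = TaskRun(parent=children[0])
print(grandchild.root_id == parent.task_id)  # True: whole tree shares one root
```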
Ensure tasks run only once:
```python
@hy.task
def idempotent_task(user_id: str):
    # Process user
    pass

# Using an idempotency key
idempotent_task.with_config(
    idempotency_key=f"process-user-{user_id}"
).send(user_id="123")
```
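An idempotency key works by letting a duplicate send be dropped when the key has already been seen. A minimal sketch of that dedup idea (illustrative; Hyrex enforces this server-side, not client-side as here):

```python
class Deduper:
    """Tracks seen idempotency keys and reports whether a send should proceed."""
    def __init__(self):
        self._seen = set()

    def should_enqueue(self, idempotency_key: str) -> bool:
        if idempotency_key in self._seen:
            return False  # duplicate: drop the send
        self._seen.add(idempotency_key)
        return True

dedup = Deduper()
key = "process-user-123"
print(dedup.should_enqueue(key))  # True  (first send is enqueued)
print(dedup.should_enqueue(key))  # False (retry of the same send is dropped)
```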
- Python 3.11+
- PostgreSQL 12+
- Required Python packages are automatically installed with pip
Apache License 2.0