Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/getting-started/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,37 @@ The SDK provides a simple interface for writing task handlers:
- **`run()` function**: Enters the stdin/stdout loop for long-running mode
- **`run_once()` function**: Processes a single task for one-shot mode

## Queue Organization

### Sub-Queues and Priority

runqy supports sub-queues to handle different priority levels for the same task type. This is particularly useful when different user tiers or applications need the same processing but with different service levels.

**Example: Paid vs Free Users**

```
inference.premium (priority: 10) ─┐
├─→ Same task code (same deployment)
inference.standard (priority: 3) ─┘
```

Both sub-queues run identical task code, but workers prioritize `inference.premium` tasks. Your API routes users based on their tier:

- Paid user request → enqueue to `inference.premium`
- Free user request → enqueue to `inference.standard`

When workers have capacity, they process higher-priority sub-queues first, ensuring paid users experience lower latency.

### Queue Naming

Queues use the format `{parent}.{sub_queue}`:

- `inference.premium` — High priority
- `inference.standard` — Standard priority
- `simple.default` — Default (when no sub-queue specified)

When you reference a queue without a sub-queue suffix (e.g., `inference`), runqy automatically appends `.default` (→ `inference.default`).

## Communication Protocol

### Worker ↔ Python Process
Expand Down
17 changes: 10 additions & 7 deletions docs/getting-started/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ server:

worker:
queues:
- "quickstart-oneshot_default"
- "quickstart-longrunning_default"
- "quickstart-oneshot.default"
- "quickstart-longrunning.default"
```

The worker will:
Expand All @@ -260,7 +260,7 @@ In a new terminal:
-H "Authorization: Bearer dev-api-key" \
-H "Content-Type: application/json" \
-d '{
"queue": "quickstart-oneshot_default",
"queue": "quickstart-oneshot.default",
"timeout": 60,
"data": {"operation": "uppercase", "data": "hello world"}
}'
Expand All @@ -272,7 +272,7 @@ In a new terminal:
curl.exe -X POST http://localhost:3000/queue/add `
-H "Authorization: Bearer dev-api-key" `
-H "Content-Type: application/json" `
-d '{\"queue\": \"quickstart-oneshot_default\", \"timeout\": 60, \"data\": {\"operation\": \"uppercase\", \"data\": \"hello world\"}}'
-d '{\"queue\": \"quickstart-oneshot.default\", \"timeout\": 60, \"data\": {\"operation\": \"uppercase\", \"data\": \"hello world\"}}'
```

Response:
Expand All @@ -282,7 +282,7 @@ Response:
"info": {
"id": "abc123...",
"state": "pending",
"queue": "quickstart-oneshot_default",
"queue": "quickstart-oneshot.default",
...
},
"data": {...}
Expand All @@ -292,9 +292,12 @@ Response:
!!! tip "Task ID"
Use the `id` from the response to check the result in the next step.

!!! note "Queue name shorthand"
You can omit the `.default` suffix when enqueueing. For example, `quickstart-oneshot` automatically resolves to `quickstart-oneshot.default`.

### Try Long-Running Mode

To try long-running mode, just enqueue to `quickstart-longrunning_default` — the worker already listens on both queues.
To try long-running mode, just enqueue to `quickstart-longrunning.default` — the worker already listens on both queues.

## 6. Check the Result

Expand All @@ -318,7 +321,7 @@ Response:
{
"info": {
"state": "completed",
"queue": "quickstart-oneshot_default",
"queue": "quickstart-oneshot.default",
"result": {"result": "HELLO WORLD"}
}
}
Expand Down
39 changes: 28 additions & 11 deletions docs/guides/enqueueing-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,30 @@ runqy uses an asynq-compatible key format:

## Queue Naming

Queues use the format `{parent}_{sub_queue}`:
Sub-queues let you assign different priorities to the same task type. A common use case is routing paid users to a high-priority sub-queue while free users go to a lower-priority sub-queue—both execute the same task code, but paid users get processed first.

- `inference_high` — High priority inference
- `inference_low` — Low priority inference
- `simple_default` — Default simple queue
Queues use the format `{parent}.{sub_queue}`:

Workers register for a parent queue (e.g., `inference`) and process tasks from all its sub-queues.
- `inference.premium` — High priority (paid users)
- `inference.standard` — Standard priority (free users)
- `simple.default` — Default simple queue

Workers register for a parent queue (e.g., `inference`) and process tasks from all its sub-queues, prioritizing higher-priority sub-queues first.

### Automatic Default Fallback

When you specify a queue name without the sub-queue suffix, runqy automatically appends `.default`:

| You provide | Resolves to |
|-------------|-------------|
| `inference` | `inference.default` |
| `simple` | `simple.default` |
| `inference.high` | `inference.high` (unchanged) |

This works in the API, CLI, and direct Redis operations (the server normalizes the queue name before processing).

!!! warning "Queue must exist"
If the resolved queue (e.g., `inference.default`) doesn't exist in the configuration, the operation fails with an error.

## Examples

Expand All @@ -44,10 +61,10 @@ HSET asynq:t:my-task-id \
payload '{"input": "hello world"}' \
retry 0 \
max_retry 3 \
queue inference_default
queue inference.default

# Push to pending queue
LPUSH asynq:inference_default:pending my-task-id
LPUSH asynq:inference.default:pending my-task-id
```

### Python
Expand Down Expand Up @@ -78,7 +95,7 @@ def enqueue_task(queue: str, payload: dict, max_retry: int = 3) -> str:
return task_id

# Usage
task_id = enqueue_task("inference_default", {"input": "hello"})
task_id = enqueue_task("inference.default", {"input": "hello"})
print(f"Enqueued task: {task_id}")
```

Expand Down Expand Up @@ -109,7 +126,7 @@ async function enqueueTask(queue, payload, maxRetry = 3) {
}

// Usage
const taskId = await enqueueTask('inference_default', { input: 'hello' });
const taskId = await enqueueTask('inference.default', { input: 'hello' });
console.log(`Enqueued task: ${taskId}`);
```

Expand Down Expand Up @@ -199,10 +216,10 @@ print(f"Result: {result}")
redis-cli HGETALL asynq:t:my-task-id

# Check if task is pending
redis-cli LRANGE asynq:inference_default:pending 0 -1
redis-cli LRANGE asynq:inference.default:pending 0 -1

# Check if task is active (being processed)
redis-cli LRANGE asynq:inference_default:active 0 -1
redis-cli LRANGE asynq:inference.default:active 0 -1

# Get result
redis-cli GET asynq:result:my-task-id
Expand Down
4 changes: 2 additions & 2 deletions docs/guides/local-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ echo '{"task_id":"t1","payload":{"foo":"bar"}}' | python python/hello_world/dumm
```bash
redis-cli

HSET asynq:t:test-1 type task payload '{"msg":"hello"}' retry 0 max_retry 2 queue inference_default
LPUSH asynq:inference_default:pending test-1
HSET asynq:t:test-1 type task payload '{"msg":"hello"}' retry 0 max_retry 2 queue inference.default
LPUSH asynq:inference.default:pending test-1
```

### Check Results
Expand Down
8 changes: 4 additions & 4 deletions docs/guides/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ When `healthy: false`:

```bash
# Count pending tasks
redis-cli LLEN asynq:inference_default:pending
redis-cli LLEN asynq:inference.default:pending

# List pending task IDs
redis-cli LRANGE asynq:inference_default:pending 0 -1
redis-cli LRANGE asynq:inference.default:pending 0 -1
```

### Active Tasks

```bash
# Count active tasks
redis-cli LLEN asynq:inference_default:active
redis-cli LLEN asynq:inference.default:active

# List active task IDs
redis-cli LRANGE asynq:inference_default:active 0 -1
redis-cli LRANGE asynq:inference.default:active 0 -1
```

## Task Inspection
Expand Down
2 changes: 1 addition & 1 deletion docs/guides/result-delivery.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ if __name__ == "__main__":
**Enqueueing with webhook:**

```python
task_id = enqueue_task("inference_default", {
task_id = enqueue_task("inference.default", {
"input": "process this",
"webhook_url": "https://api.example.com/webhooks/task-complete"
})
Expand Down
2 changes: 1 addition & 1 deletion docs/python-sdk/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ from runqy_python import RunqyClient
client = RunqyClient("http://localhost:3000", api_key="your-api-key")

# Enqueue a task
task = client.enqueue("inference_default", {"input": "hello"})
task = client.enqueue("inference.default", {"input": "hello"})
print(f"Task ID: {task.task_id}")

# Check result
Expand Down
6 changes: 3 additions & 3 deletions docs/server/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Enqueue a new task.

```json
{
"queue": "inference_high",
"queue": "inference.high",
"timeout": 300,
"data": {
"prompt": "Hello world",
Expand All @@ -104,7 +104,7 @@ Enqueue a new task.
{
"info": {
"id": "task-uuid",
"queue": "inference_high",
"queue": "inference.high",
"state": "pending"
}
}
Expand All @@ -120,7 +120,7 @@ Get task status and result. The queue is automatically determined from the task'
{
"info": {
"id": "task-uuid",
"queue": "inference_high",
"queue": "inference.high",
"state": "completed",
"result": "..."
}
Expand Down
Loading