diff --git a/docs/getting-started/architecture.md b/docs/getting-started/architecture.md index fdc1d7a..2eeca8f 100644 --- a/docs/getting-started/architecture.md +++ b/docs/getting-started/architecture.md @@ -71,6 +71,37 @@ The SDK provides a simple interface for writing task handlers: - **`run()` function**: Enters the stdin/stdout loop for long-running mode - **`run_once()` function**: Processes a single task for one-shot mode +## Queue Organization + +### Sub-Queues and Priority + +runqy supports sub-queues to handle different priority levels for the same task type. This is particularly useful when different user tiers or applications need the same processing but with different service levels. + +**Example: Paid vs Free Users** + +``` +inference.premium (priority: 10) ─┐ + ├─→ Same task code (same deployment) +inference.standard (priority: 3) ─┘ +``` + +Both sub-queues run identical task code, but workers prioritize `inference.premium` tasks. Your API routes users based on their tier: + +- Paid user request → enqueue to `inference.premium` +- Free user request → enqueue to `inference.standard` + +When workers have capacity, they process higher-priority sub-queues first, ensuring paid users experience lower latency. + +### Queue Naming + +Queues use the format `{parent}.{sub_queue}`: + +- `inference.premium` — High priority +- `inference.standard` — Standard priority +- `simple.default` — Default (when no sub-queue specified) + +When you reference a queue without a sub-queue suffix (e.g., `inference`), runqy automatically appends `.default` (→ `inference.default`). + ## Communication Protocol ### Worker ↔ Python Process diff --git a/docs/getting-started/quickstart.md b/docs/getting-started/quickstart.md index cba954f..921f130 100644 --- a/docs/getting-started/quickstart.md +++ b/docs/getting-started/quickstart.md @@ -237,8 +237,8 @@ server: worker: queues: - - "quickstart-oneshot_default" - - "quickstart-longrunning_default" + - "quickstart-oneshot.default" + - "quickstart-longrunning.default" ``` The worker will: @@ -260,7 +260,7 @@ In a new terminal: -H "Authorization: Bearer dev-api-key" \ -H "Content-Type: application/json" \ -d '{ - "queue": "quickstart-oneshot_default", + "queue": "quickstart-oneshot.default", "timeout": 60, "data": {"operation": "uppercase", "data": "hello world"} }' @@ -272,7 +272,7 @@ In a new terminal: curl.exe -X POST http://localhost:3000/queue/add ` -H "Authorization: Bearer dev-api-key" ` -H "Content-Type: application/json" ` - -d '{\"queue\": \"quickstart-oneshot_default\", \"timeout\": 60, \"data\": {\"operation\": \"uppercase\", \"data\": \"hello world\"}}' + -d '{\"queue\": \"quickstart-oneshot.default\", \"timeout\": 60, \"data\": {\"operation\": \"uppercase\", \"data\": \"hello world\"}}' ``` Response: @@ -282,7 +282,7 @@ Response: "info": { "id": "abc123...", "state": "pending", - "queue": "quickstart-oneshot_default", + "queue": "quickstart-oneshot.default", ... }, "data": {...} @@ -292,9 +292,12 @@ Response: !!! tip "Task ID" Use the `id` from the response to check the result in the next step. +!!! note "Queue name shorthand" + You can omit the `.default` suffix when enqueueing. For example, `quickstart-oneshot` automatically resolves to `quickstart-oneshot.default`. + ### Try Long-Running Mode -To try long-running mode, just enqueue to `quickstart-longrunning_default` — the worker already listens on both queues. +To try long-running mode, just enqueue to `quickstart-longrunning.default` — the worker already listens on both queues. ## 6. Check the Result @@ -318,7 +321,7 @@ Response: { "info": { "state": "completed", - "queue": "quickstart-oneshot_default", + "queue": "quickstart-oneshot.default", "result": {"result": "HELLO WORLD"} } } diff --git a/docs/guides/enqueueing-tasks.md b/docs/guides/enqueueing-tasks.md index e59f03d..00d0d1d 100644 --- a/docs/guides/enqueueing-tasks.md +++ b/docs/guides/enqueueing-tasks.md @@ -23,13 +23,30 @@ runqy uses an asynq-compatible key format: ## Queue Naming -Queues use the format `{parent}_{sub_queue}`: +Sub-queues let you assign different priorities to the same task type. A common use case is routing paid users to a high-priority sub-queue while free users go to a lower-priority sub-queue—both execute the same task code, but paid users get processed first. -- `inference_high` — High priority inference -- `inference_low` — Low priority inference -- `simple_default` — Default simple queue +Queues use the format `{parent}.{sub_queue}`: -Workers register for a parent queue (e.g., `inference`) and process tasks from all its sub-queues. +- `inference.premium` — High priority (paid users) +- `inference.standard` — Standard priority (free users) +- `simple.default` — Default simple queue + +Workers register for a parent queue (e.g., `inference`) and process tasks from all its sub-queues, prioritizing higher-priority sub-queues first. + +### Automatic Default Fallback + +When you specify a queue name without the sub-queue suffix, runqy automatically appends `.default`: + +| You provide | Resolves to | +|-------------|-------------| +| `inference` | `inference.default` | +| `simple` | `simple.default` | +| `inference.high` | `inference.high` (unchanged) | + +This works in the API, CLI, and direct Redis operations (the server normalizes the queue name before processing). + +!!! warning "Queue must exist" + If the resolved queue (e.g., `inference.default`) doesn't exist in the configuration, the operation fails with an error. ## Examples @@ -44,10 +61,10 @@ HSET asynq:t:my-task-id \ payload '{"input": "hello world"}' \ retry 0 \ max_retry 3 \ - queue inference_default + queue inference.default # Push to pending queue -LPUSH asynq:inference_default:pending my-task-id +LPUSH asynq:inference.default:pending my-task-id ``` ### Python @@ -78,7 +95,7 @@ def enqueue_task(queue: str, payload: dict, max_retry: int = 3) -> str: return task_id # Usage -task_id = enqueue_task("inference_default", {"input": "hello"}) +task_id = enqueue_task("inference.default", {"input": "hello"}) print(f"Enqueued task: {task_id}") ``` @@ -109,7 +126,7 @@ async function enqueueTask(queue, payload, maxRetry = 3) { } // Usage -const taskId = await enqueueTask('inference_default', { input: 'hello' }); +const taskId = await enqueueTask('inference.default', { input: 'hello' }); console.log(`Enqueued task: ${taskId}`); ``` @@ -199,10 +216,10 @@ print(f"Result: {result}") redis-cli HGETALL asynq:t:my-task-id # Check if task is pending -redis-cli LRANGE asynq:inference_default:pending 0 -1 +redis-cli LRANGE asynq:inference.default:pending 0 -1 # Check if task is active (being processed) -redis-cli LRANGE asynq:inference_default:active 0 -1 +redis-cli LRANGE asynq:inference.default:active 0 -1 # Get result redis-cli GET asynq:result:my-task-id diff --git a/docs/guides/local-development.md b/docs/guides/local-development.md index 9cccf48..dacf5c9 100644 --- a/docs/guides/local-development.md +++ b/docs/guides/local-development.md @@ -117,8 +117,8 @@ echo '{"task_id":"t1","payload":{"foo":"bar"}}' | python python/hello_world/dumm ```bash redis-cli -HSET asynq:t:test-1 type task payload '{"msg":"hello"}' retry 0 max_retry 2 queue inference_default -LPUSH asynq:inference_default:pending test-1 +HSET asynq:t:test-1 type task payload '{"msg":"hello"}' retry 0 max_retry 2 queue inference.default +LPUSH asynq:inference.default:pending test-1 ``` ### Check Results diff --git a/docs/guides/monitoring.md b/docs/guides/monitoring.md index c76d69a..22c37b2 100644 --- a/docs/guides/monitoring.md +++ b/docs/guides/monitoring.md @@ -37,20 +37,20 @@ When `healthy: false`: ```bash # Count pending tasks -redis-cli LLEN asynq:inference_default:pending +redis-cli LLEN asynq:inference.default:pending # List pending task IDs -redis-cli LRANGE asynq:inference_default:pending 0 -1 +redis-cli LRANGE asynq:inference.default:pending 0 -1 ``` ### Active Tasks ```bash # Count active tasks -redis-cli LLEN asynq:inference_default:active +redis-cli LLEN asynq:inference.default:active # List active task IDs -redis-cli LRANGE asynq:inference_default:active 0 -1 +redis-cli LRANGE asynq:inference.default:active 0 -1 ``` ## Task Inspection diff --git a/docs/guides/result-delivery.md b/docs/guides/result-delivery.md index da3ecdc..0d407ce 100644 --- a/docs/guides/result-delivery.md +++ b/docs/guides/result-delivery.md @@ -78,7 +78,7 @@ if __name__ == "__main__": **Enqueueing with webhook:** ```python -task_id = enqueue_task("inference_default", { +task_id = enqueue_task("inference.default", { "input": "process this", "webhook_url": "https://api.example.com/webhooks/task-complete" }) diff --git a/docs/python-sdk/index.md b/docs/python-sdk/index.md index cd61eca..5ee8ca9 100644 --- a/docs/python-sdk/index.md +++ b/docs/python-sdk/index.md @@ -60,7 +60,7 @@ from runqy_python import RunqyClient client = RunqyClient("http://localhost:3000", api_key="your-api-key") # Enqueue a task -task = client.enqueue("inference_default", {"input": "hello"}) +task = client.enqueue("inference.default", {"input": "hello"}) print(f"Task ID: {task.task_id}") # Check result diff --git a/docs/server/api.md b/docs/server/api.md index d88a709..b9eea89 100644 --- a/docs/server/api.md +++ b/docs/server/api.md @@ -89,7 +89,7 @@ Enqueue a new task. ```json { - "queue": "inference_high", + "queue": "inference.high", "timeout": 300, "data": { "prompt": "Hello world", @@ -104,7 +104,7 @@ Enqueue a new task. { "info": { "id": "task-uuid", - "queue": "inference_high", + "queue": "inference.high", "state": "pending" } } @@ -120,7 +120,7 @@ Get task status and result. The queue is automatically determined from the task' { "info": { "id": "task-uuid", - "queue": "inference_high", + "queue": "inference.high", "state": "completed", "result": "..." } diff --git a/docs/server/cli.md b/docs/server/cli.md index d2b1ec6..c011942 100644 --- a/docs/server/cli.md +++ b/docs/server/cli.md @@ -67,14 +67,14 @@ runqy queue list Output: ``` QUEUE PENDING ACTIVE SCHEDULED RETRY ARCHIVED COMPLETED PAUSED -inference_high 5 2 0 1 0 150 no -inference_low 12 0 3 0 0 89 no +inference.high 5 2 0 1 0 150 no +inference.low 12 0 3 0 0 89 no ``` ### Inspect Queue ```bash -runqy queue inspect inference_high +runqy queue inspect inference.high ``` Shows detailed queue information including status, pause state, memory usage, and task counts. @@ -83,10 +83,10 @@ Shows detailed queue information including status, pause state, memory usage, an ```bash # Pause a queue (stops processing new tasks) -runqy queue pause inference_high +runqy queue pause inference.high # Resume a paused queue -runqy queue unpause inference_high +runqy queue unpause inference.high ``` ## Task Commands @@ -95,13 +95,13 @@ runqy queue unpause inference_high ```bash # Enqueue a task with JSON payload -runqy task enqueue --queue inference_high --payload '{"prompt":"Hello world","width":1024}' +runqy task enqueue --queue inference.high --payload '{"prompt":"Hello world","width":1024}' # Short flags -runqy task enqueue -q inference_high -p '{"msg":"test"}' +runqy task enqueue -q inference.high -p '{"msg":"test"}' # With custom timeout (seconds) -runqy task enqueue -q inference_high -p '{"data":"value"}' --timeout 300 +runqy task enqueue -q inference.high -p '{"data":"value"}' --timeout 300 ``` **Enqueue Flags:** @@ -116,31 +116,31 @@ runqy task enqueue -q inference_high -p '{"data":"value"}' --timeout 300 ```bash # List pending tasks in a queue -runqy task list inference_high +runqy task list inference.high # List tasks by state -runqy task list inference_high --state pending -runqy task list inference_high --state active -runqy task list inference_high --state scheduled -runqy task list inference_high --state retry -runqy task list inference_high --state archived -runqy task list inference_high --state completed +runqy task list inference.high --state pending +runqy task list inference.high --state active +runqy task list inference.high --state scheduled +runqy task list inference.high --state retry +runqy task list inference.high --state archived +runqy task list inference.high --state completed # Limit number of results -runqy task list inference_high --state pending --limit 20 +runqy task list inference.high --state pending --limit 20 ``` ### Get Task Details ```bash -runqy task get inference_high abc123-task-id +runqy task get inference.high abc123-task-id ``` Output: ``` Task ID: abc123-task-id Type: task -Queue: inference_high +Queue: inference.high State: completed Max Retry: 3 Retried: 0 @@ -155,7 +155,7 @@ Timeout: 10m0s runqy task cancel abc123-task-id # Delete a task from a queue -runqy task delete inference_high abc123-task-id +runqy task delete inference.high abc123-task-id ``` ## Worker Commands @@ -169,8 +169,8 @@ runqy worker list Output: ``` WORKER_ID STATUS QUEUES CONCURRENCY LAST_BEAT STALE -worker-abc123-def456 ready inference_high 1 5s no -worker-xyz789-uvw012 ready inference_low 1 3s no +worker-abc123-def456 ready inference.high 1 5s no +worker-xyz789-uvw012 ready inference.low 1 3s no ``` ### Get Worker Info @@ -183,7 +183,7 @@ Output: ``` Worker ID: worker-abc123-def456 Status: ready -Queues: inference_high +Queues: inference.high Concurrency: 1 Started At: 2024-01-15 10:30:00 Last Beat: 2024-01-15 10:35:45 (5s ago) @@ -200,9 +200,9 @@ runqy config list Output: ``` NAME PRIORITY PROVIDER MODE GIT_URL -inference_high 10 worker long_running https://github.com/org/worker.git -inference_low 5 worker long_running https://github.com/org/worker.git -simple_default 1 worker one_shot https://github.com/org/simple.git +inference.high 10 worker long_running https://github.com/org/worker.git +inference.low 5 worker long_running https://github.com/org/worker.git +simple.default 1 worker one_shot https://github.com/org/simple.git ``` ### Reload Configurations @@ -408,7 +408,7 @@ runqy -s https://runqy.example.com:3000 queue list runqy -s https://server:3000 -k API_KEY queue list # Enqueue a task on remote server -runqy -s https://server:3000 -k API_KEY task enqueue -q inference_high -p '{"msg":"hello"}' +runqy -s https://server:3000 -k API_KEY task enqueue -q inference.high -p '{"msg":"hello"}' # List workers on remote server runqy -s https://server:3000 -k API_KEY worker list diff --git a/docs/server/configuration.md b/docs/server/configuration.md index ea5487c..bdb78cd 100644 --- a/docs/server/configuration.md +++ b/docs/server/configuration.md @@ -76,14 +76,68 @@ Each queue is defined by its name (key) and the following options: ## Sub-Queues -Queues support sub-queue naming using the format `{parent}_{sub_queue}`: +Sub-queues allow you to assign different priorities to the same task type based on the caller's tier or urgency. This is useful when multiple clients submit identical tasks but require different service levels. -- `inference_high` — High priority inference tasks -- `inference_low` — Low priority inference tasks -- `simple_default` — Default simple tasks +### Use Case: Paid vs Free Users + +A common pattern is routing tasks from paid users to a high-priority sub-queue while free users go to a lower-priority sub-queue: + +```yaml +queues: + inference: + sub_queues: + - name: premium # Paid users — processed first + priority: 10 + - name: standard # Free users — processed when premium queue is empty + priority: 3 + deployment: + git_url: "https://github.com/example/inference.git" + branch: "main" + startup_cmd: "python main.py" + startup_timeout_secs: 300 +``` + +Your API backend then routes tasks based on user tier: + +```python +# In your API backend +if user.is_premium: + client.enqueue("inference.premium", payload) +else: + client.enqueue("inference.standard", payload) +``` + +Both sub-queues run the **same task code** (same deployment), but workers prioritize `inference.premium` tasks over `inference.standard` tasks. + +### Naming Format + +Queues use the format `{parent}.{sub_queue}`: + +- `inference.premium` — High priority inference tasks +- `inference.standard` — Standard priority inference tasks +- `simple.default` — Default simple tasks Workers register for a parent queue and can process tasks from any of its sub-queues based on priority. +### Automatic Default Fallback + +When a queue name is referenced without a sub-queue suffix, runqy automatically appends `.default`: + +| You provide | Resolves to | +|-------------|-------------| +| `inference` | `inference.default` | +| `simple` | `simple.default` | +| `inference.high` | `inference.high` (unchanged) | + +This applies to: + +- **Workers:** When a worker registers for queue `inference`, it actually registers for `inference.default` +- **Task enqueueing:** When you enqueue a task to `inference`, it goes to `inference.default` +- **Task retrieval:** When you query queue `inference`, it looks up `inference.default` + +!!! warning "Queue must exist" + If the resolved queue (e.g., `inference.default`) doesn't exist in the configuration, an error is returned. The fallback only adds the `.default` suffix—it doesn't create the queue automatically. + ## Vaults Vaults provide secure storage for secrets that are injected into workers as environment variables. diff --git a/docs/server/index.md b/docs/server/index.md index 1da39dd..c27b9f3 100644 --- a/docs/server/index.md +++ b/docs/server/index.md @@ -24,7 +24,7 @@ The runqy server includes a powerful CLI for managing your task queue system. Th ```bash # Local operations runqy queue list -runqy task enqueue -q inference_high -p '{"msg":"hello"}' +runqy task enqueue -q inference.high -p '{"msg":"hello"}' runqy worker list # Remote operations (with saved credentials)