Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# CI workflow: run the pytest suite on every push to main and on every
# pull request, once per supported Python version.
name: CI

on:
  push:
    branches: [main]
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      # Fan the job out across the supported interpreter versions.
      matrix:
        python-version: ["3.9", "3.11", "3.13"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      # Editable install so the package under test resolves to this checkout.
      - name: Install package
        run: pip install -e .

      - name: Install test dependencies
        run: pip install pytest

      - name: Test
        run: python -m pytest tests/ -v
3 changes: 2 additions & 1 deletion src/runqy_python/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""runqy-python: Python SDK for runqy - write distributed task handlers with simple decorators."""

# Task execution (for workers)
from .decorator import task, load
from .decorator import task, load, RetryableError
from .runner import run, run_once

# Client (for enqueuing tasks)
Expand All @@ -20,6 +20,7 @@
# Task execution
"task",
"load",
"RetryableError",
"run",
"run_once",
# Client
Expand Down
34 changes: 34 additions & 0 deletions src/runqy_python/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@
# The single @load loader registered in this process (None until @load is used).
_registered_loader = None


class RetryableError(Exception):
    """Signal from a @task handler that the current task should be retried.

    Raising this inside a handler tells the runner to report the failure
    as retryable (``retry: True``) rather than permanent.

    Example:
        from runqy_python import task, RetryableError

        @task
        def process(payload):
            try:
                result = call_external_api(payload)
            except TimeoutError:
                raise RetryableError("API timed out, please retry")
            return result
    """


def task(func):
"""Decorator to register a function as the task handler.

Expand All @@ -18,6 +35,11 @@ def process(payload: dict, ctx: dict) -> dict:
return ctx["model"].predict(payload)
"""
global _registered_handler
if _registered_handler is not None:
raise RuntimeError(
f"@task handler already registered ({_registered_handler.__name__}). "
"Only one @task handler is allowed per process."
)
_registered_handler = func
return func

Expand All @@ -39,6 +61,11 @@ def process(payload: dict, ctx: dict) -> dict:
return ctx["model"].predict(payload)
"""
global _registered_loader
if _registered_loader is not None:
raise RuntimeError(
f"@load handler already registered ({_registered_loader.__name__}). "
"Only one @load handler is allowed per process."
)
_registered_loader = func
return func

Expand All @@ -51,3 +78,10 @@ def get_handler():
def get_loader():
    """Return the function registered via @load, or None if none was registered."""
    loader = _registered_loader
    return loader


def _reset():
    """Clear the registered @task handler and @load loader (test helper only)."""
    global _registered_handler, _registered_loader
    # The two registrations are independent; clear both.
    _registered_loader = None
    _registered_handler = None
134 changes: 121 additions & 13 deletions src/runqy_python/runner.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,67 @@
"""Runner loop for processing tasks from runqy-worker."""

import os
import sys
import json
from .decorator import get_handler, get_loader
import signal
import traceback
from .decorator import get_handler, get_loader, RetryableError

# Set by _shutdown_handler on the first SIGTERM/SIGINT; the run loop checks it
# so the in-flight task can finish before the process exits.
_shutdown_requested = False

# File object wrapping the real stdout fd, used for JSON protocol messages
# after _protect_stdout() points sys.stdout at stderr (set by _protect_stdout).
_protocol_stdout = None


def _shutdown_handler(signum, frame):
    """React to SIGTERM/SIGINT.

    The first signal only flips _shutdown_requested so the current task can
    run to completion; a repeated signal aborts the process immediately.
    """
    global _shutdown_requested
    if not _shutdown_requested:
        # First request: remember it and let the loop wind down gracefully.
        _shutdown_requested = True
        return
    # Asked twice — assume the process is stuck and bail out now.
    sys.exit(1)


def _protect_stdout():
    """Reserve the real stdout for the JSON protocol.

    Duplicates the underlying stdout file descriptor into the module-level
    _protocol_stdout, then points sys.stdout at sys.stderr so stray print()
    calls from user code go to the logs instead of the protocol stream.
    """
    global _protocol_stdout
    # Duplicate the fd first: the copy stays valid after sys.stdout changes.
    fd_copy = os.dup(sys.stdout.fileno())
    _protocol_stdout = os.fdopen(fd_copy, "w")
    # From here on, anything written via sys.stdout lands on stderr.
    sys.stdout = sys.stderr


def _safe_write(data):
    """Write one JSON message to the protocol channel.

    Serialization failures are converted into an error response so the worker
    always receives valid JSON; a closed pipe terminates the process.
    """
    channel = sys.stdout if _protocol_stdout is None else _protocol_stdout

    try:
        payload = json.dumps(data)
    except (TypeError, ValueError) as exc:
        # The handler returned something json.dumps cannot encode — report
        # that as a non-retryable task error instead of crashing.
        task_id = data.get("task_id", "unknown") if isinstance(data, dict) else "unknown"
        payload = json.dumps({
            "task_id": task_id,
            "result": None,
            "error": f"Result not JSON-serializable: {exc}",
            "retry": False,
        })

    try:
        channel.write(payload + "\n")
        channel.flush()
    except BrokenPipeError:
        # The worker closed its end of the pipe; nothing more to do here.
        sys.exit(1)


def run():
Expand All @@ -15,6 +74,13 @@ def run():
4. Calls the registered @task handler with the payload (and context if @load was used)
5. Writes JSON responses to stdout
"""
# Protect stdout: redirect sys.stdout to stderr so print() doesn't corrupt protocol
_protect_stdout()

# Install signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, _shutdown_handler)
signal.signal(signal.SIGINT, _shutdown_handler)

handler = get_handler()
if handler is None:
raise RuntimeError("No task handler registered. Use @task decorator.")
Expand All @@ -23,14 +89,20 @@ def run():
loader = get_loader()
ctx = None
if loader is not None:
ctx = loader()
try:
ctx = loader()
except Exception as e:
_safe_write({"status": "error", "error": f"@load failed: {e}"})
sys.exit(1)

# Ready signal
print(json.dumps({"status": "ready"}))
sys.stdout.flush()
_safe_write({"status": "ready"})

# Process tasks from stdin
for line in sys.stdin:
if _shutdown_requested:
break

line = line.strip()
if not line:
continue
Expand All @@ -53,16 +125,29 @@ def run():
"error": None,
"retry": False
}
except Exception as e:
except json.JSONDecodeError as e:
response = {
"task_id": task_id,
"result": None,
"error": f"Invalid JSON input: {e}",
"retry": False
}
except RetryableError as e:
response = {
"task_id": task_id,
"result": None,
"error": str(e),
"retry": True
}
except Exception as e:
response = {
"task_id": task_id,
"result": None,
"error": traceback.format_exc(),
"retry": False
}

print(json.dumps(response))
sys.stdout.flush()
_safe_write(response)


def run_once():
Expand All @@ -78,6 +163,13 @@ def run_once():
5. Writes response to stdout
6. Exits
"""
# Protect stdout: redirect sys.stdout to stderr so print() doesn't corrupt protocol
_protect_stdout()

# Install signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, _shutdown_handler)
signal.signal(signal.SIGINT, _shutdown_handler)

handler = get_handler()
if handler is None:
raise RuntimeError("No task handler registered. Use @task decorator.")
Expand All @@ -86,11 +178,14 @@ def run_once():
loader = get_loader()
ctx = None
if loader is not None:
ctx = loader()
try:
ctx = loader()
except Exception as e:
_safe_write({"status": "error", "error": f"@load failed: {e}"})
sys.exit(1)

# Ready signal
print(json.dumps({"status": "ready"}))
sys.stdout.flush()
_safe_write({"status": "ready"})

# Read ONE task
line = sys.stdin.readline().strip()
Expand All @@ -115,13 +210,26 @@ def run_once():
"error": None,
"retry": False
}
except Exception as e:
except json.JSONDecodeError as e:
response = {
"task_id": task_id,
"result": None,
"error": f"Invalid JSON input: {e}",
"retry": False
}
except RetryableError as e:
response = {
"task_id": task_id,
"result": None,
"error": str(e),
"retry": True
}
except Exception as e:
response = {
"task_id": task_id,
"result": None,
"error": traceback.format_exc(),
"retry": False
}

print(json.dumps(response))
sys.stdout.flush()
_safe_write(response)
Loading