Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions ami/ml/orchestration/nats_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,30 @@ async def get_connection(nats_url: str) -> tuple[nats.NATS, JetStreamContext]:
return nc, js


TASK_TTR = 300 # Default Time-To-Run (visibility timeout) in seconds
TASK_TTR = getattr(settings, "NATS_TASK_TTR", 30) # Visibility timeout in seconds (configurable)


class TaskQueueManager:
"""
Manager for NATS JetStream task queue operations.

Args:
nats_url: NATS server URL. Falls back to settings.NATS_URL, then "nats://nats:4222".
max_ack_pending: Max unacknowledged messages per consumer. Falls back to
settings.NATS_MAX_ACK_PENDING, then 100.

Use as an async context manager:
async with TaskQueueManager() as manager:
await manager.publish_task(123, {'data': 'value'})
tasks = await manager.reserve_tasks(123, count=64)
await manager.acknowledge_task(tasks[0].reply_subject)
"""

def __init__(self, nats_url: str | None = None):
def __init__(self, nats_url: str | None = None, max_ack_pending: int | None = None):
self.nats_url = nats_url or getattr(settings, "NATS_URL", "nats://nats:4222")
self.max_ack_pending = (
max_ack_pending if max_ack_pending is not None else getattr(settings, "NATS_MAX_ACK_PENDING", 100)
)
self.nc: nats.NATS | None = None
self.js: JetStreamContext | None = None

Expand Down Expand Up @@ -141,7 +149,7 @@ async def _ensure_consumer(self, job_id: int):
ack_wait=TASK_TTR, # Visibility timeout (TTR)
max_deliver=5, # Max retry attempts
deliver_policy=DeliverPolicy.ALL,
max_ack_pending=100, # Max unacked messages
max_ack_pending=self.max_ack_pending,
filter_subject=subject,
),
),
Expand Down
24 changes: 20 additions & 4 deletions compose/local/django/start
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,26 @@ set -o nounset

python manage.py migrate

# Launch VS Code debug server if DEBUGGER environment variable is set to 1
# Note that the --reload flag is not compatible with debugpy, so manually restart the server when code changes
# Set USE_UVICORN=1 to use the original raw uvicorn dev server instead of gunicorn
if [ "${USE_UVICORN:-0}" = "1" ]; then
if [ "${DEBUGGER:-0}" = "1" ]; then
exec python -Xfrozen_modules=off -m debugpy --listen 0.0.0.0:5678 -m uvicorn config.asgi:application --host 0.0.0.0
else
exec uvicorn config.asgi:application --host 0.0.0.0 --reload --reload-include '*.html'
fi
fi

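# Gunicorn with UvicornWorker (production-parity mode, now the default)
# WEB_CONCURRENCY controls the worker count (default: 1).
# Auto-reload is only enabled when exactly 1 worker is used and DEBUGGER is not set;
# DEBUGGER=1 always runs a single worker under debugpy, without --reload.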
WORKERS=${WEB_CONCURRENCY:-1}

if [ "${DEBUGGER:-0}" = "1" ]; then
exec python -Xfrozen_modules=off -m debugpy --listen 0.0.0.0:5678 -m uvicorn config.asgi:application --host 0.0.0.0
echo "Starting Gunicorn with debugpy (1 worker)..."
exec python -Xfrozen_modules=off -m debugpy --listen 0.0.0.0:5678 -m gunicorn config.asgi --bind 0.0.0.0:8000 --workers 1 -k uvicorn.workers.UvicornWorker
elif [ "$WORKERS" -eq 1 ]; then
echo "Starting Gunicorn with 1 worker (auto-reload enabled)..."
exec gunicorn config.asgi --bind 0.0.0.0:8000 --workers 1 -k uvicorn.workers.UvicornWorker --reload
else
exec uvicorn config.asgi:application --host 0.0.0.0 --reload --reload-include '*.html'
echo "Starting Gunicorn with $WORKERS workers..."
exec gunicorn config.asgi --bind 0.0.0.0:8000 --workers "$WORKERS" -k uvicorn.workers.UvicornWorker
fi
8 changes: 8 additions & 0 deletions compose/production/django/start
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,12 @@ set -o nounset

python /app/manage.py collectstatic --noinput

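# Gunicorn natively reads WEB_CONCURRENCY as the default for its --workers option.
# If it is not set, this script defaults it to the CPU core count
# (Gunicorn on its own would fall back to a single worker).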
if [ -z "${WEB_CONCURRENCY:-}" ]; then
export WEB_CONCURRENCY=$(nproc)
fi

echo "Starting Gunicorn with $WEB_CONCURRENCY worker(s)..."

exec newrelic-admin run-program /usr/local/bin/gunicorn config.asgi --bind 0.0.0.0:5000 --chdir=/app -k uvicorn.workers.UvicornWorker