From 4af27e5439e9b79db5dc47826919b1ea0400f178 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 08:39:48 +0900 Subject: [PATCH 01/11] docs: add STAR analysis for Django+Celery gevent DB thread-safety issue Document the DatabaseWrapper thread-sharing error that occurs when Celery alert worker uses gevent pool with concurrency=100. Includes root cause analysis, solution comparison, and reference materials. --- docs/GEVENT_DB_THREAD_SAFETY.md | 413 ++++++++++++++++++++++++++++++++ 1 file changed, 413 insertions(+) create mode 100644 docs/GEVENT_DB_THREAD_SAFETY.md diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md new file mode 100644 index 0000000..85e680f --- /dev/null +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -0,0 +1,413 @@ +# Django + Celery Gevent Pool: DB Thread-Safety 이슈 분석 및 해결 + +## STAR 분석 + +--- + +## 1. Situation (상황) + +### 프로젝트 개요 + +SpeedCam 프로젝트는 과속 감지 시스템으로, 다음과 같은 MSA 아키텍처를 사용한다: + +| 인스턴스 | 역할 | 핵심 기술 | +|----------|------|-----------| +| speedcam-app | Django + Gunicorn + MQTT Subscriber | HTTP API, MQTT 메시지 수신 | +| speedcam-db | MySQL 8.0 | 3개 DB (detections, vehicles, notifications) | +| speedcam-mq | RabbitMQ | 메시지 큐 (ocr_queue, fcm_queue) | +| speedcam-ocr | Celery OCR Worker | `--pool=prefork --concurrency=4` | +| speedcam-alert | Celery Alert Worker | `--pool=gevent --concurrency=100` | +| speedcam-mon | Prometheus + Grafana + Loki + Jaeger | 모니터링 | + +### 문제 발생 지점 + +Jaeger 트레이싱에서 `send_notification` 태스크 실행 시 다음 에러가 반복적으로 발생: + +``` +DatabaseWrapper objects created in a thread can only be used in that same thread. +The object with alias 'detections_db' was created in thread id 35839779840 +and this is thread id 35804459008. +``` + +> **📸 캡처 1**: Jaeger UI에서 `send_notification` span의 에러 로그 +> - URL: `http://34.47.70.132:16686` +> - 검색 조건: Service=`speedcam-alert`, Operation=`send_notification` +> - 에러가 포함된 span을 클릭하여 상세 로그 확인 +> - **캡처 항목**: span 타임라인 + Logs 탭의 에러 메시지 전문 + +### Alert Worker 실행 설정 + +```bash +# scripts/start_alert_worker.sh +opentelemetry-instrument \ + --service_name speedcam-alert \ + celery -A config worker \ + --pool=gevent \ # ← gevent pool 사용 + --concurrency=${ALERT_CONCURRENCY:-100} \ # ← greenlet 100개 + --queues=fcm_queue \ + --hostname=alert@%h \ + --loglevel=${LOG_LEVEL:-info} +``` + +> **📸 캡처 2**: Alert Worker 컨테이너 시작 로그 +> ```bash +> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ +> -- sudo docker logs speedcam-alert 2>&1 | head -20 +> ``` +> - **캡처 항목**: `celery@alert` 워커 시작 메시지에서 pool=gevent, concurrency=100 확인 + +### Django DB 설정 (CONN_MAX_AGE 미설정) + +```python +# config/settings/prod.py +DATABASES = { + "default": { + "ENGINE": "django.db.backends.mysql", + "NAME": "speedcam", + # ... (CONN_MAX_AGE 미설정 → 기본값 0) + }, + "vehicles_db": { ... }, # CONN_MAX_AGE 미설정 + "detections_db": { ... }, # CONN_MAX_AGE 미설정 + "notifications_db": { ... }, # CONN_MAX_AGE 미설정 +} +``` + +### 문제의 태스크 코드 + +```python +# tasks/notification_tasks.py +@shared_task( + bind=True, + max_retries=3, + autoretry_for=(Exception,), # ← 모든 예외에 대해 자동 재시도 + retry_backoff=True, + acks_late=True, +) +def send_notification(self, detection_id: int): + from apps.detections.models import Detection + # ... + detection = Detection.objects.using("detections_db").get( # ← line 36: 에러 발생 지점 + id=detection_id, status="completed" + ) +``` + +이 태스크는 3개의 DB에 접근한다: +- `detections_db`: Detection 조회 (line 36) +- `notifications_db`: Notification 생성 (line 59, 89, 129, 145, 168) +- `vehicles_db`: Vehicle 조회 (line 109) + +--- + +## 2. Task (과제) + +### 해결해야 할 문제 + +1. **즉각적 문제**: `send_notification` 태스크가 gevent greenlet 환경에서 DB 커넥션 스레드 공유 에러로 실패 +2. **근본 원인 파악**: Django ORM의 스레드 안전성 모델과 gevent greenlet 간의 충돌 메커니즘 규명 +3. **안정적 해결**: 프로덕션 환경에서 100 concurrency gevent pool이 안정적으로 DB 접근하도록 수정 + +### 성공 기준 + +- [ ] Jaeger에서 `send_notification` span에 `DatabaseWrapper` 에러 없음 +- [ ] Alert Worker의 100 concurrent greenlet이 안정적으로 동작 +- [ ] 기존 OCR Worker(prefork pool)에 부정적 영향 없음 + +--- + +## 3. Action (분석 및 조치) + +### 3-1. 근본 원인 분석 + +#### Django의 DB 커넥션 스레드 격리 메커니즘 + +Django는 `django/db/backends/base/base.py`의 `validate_thread_sharing()` 메서드로 DB 커넥션의 스레드 간 공유를 차단한다: + +```python +# django/db/backends/base/base.py (Django 소스코드) +def validate_thread_sharing(self): + if not (self.allow_thread_sharing or self._thread_ident == _thread.get_ident()): + raise DatabaseError( + "DatabaseWrapper objects created in a " + "thread can only be used in that same thread. The object " + "with alias '%s' was created in thread id %s and this is " + "thread id %s." % (self.alias, self._thread_ident, _thread.get_ident()) + ) +``` + +> **📸 캡처 3**: Django 소스코드에서 `validate_thread_sharing()` 확인 +> ```bash +> # Django 패키지 내 소스 위치 확인 +> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ +> -- sudo docker exec speedcam-alert python -c " +> import django.db.backends.base.base as b +> import inspect +> print(inspect.getsource(b.BaseDatabaseWrapper.validate_thread_sharing))" +> ``` +> - **캡처 항목**: 실제 배포된 Django 버전의 `validate_thread_sharing()` 소스코드 출력 + +#### 핵심 메커니즘: Greenlet ≠ Thread이지만 다른 Thread ID를 가짐 + +``` +[일반 스레드 모델] +Thread A (id=100) → DB Connection A (thread_ident=100) → ✅ 같은 ID + +[Gevent Greenlet 모델] +Greenlet A (id=200) → DB Connection 생성 (thread_ident=200) + ↓ (예외 발생 → autoretry) +Greenlet B (id=300) → DB Connection 재사용 시도 (thread_ident=200) → ❌ ID 불일치! +``` + +gevent가 `monkey.patch_all()`을 실행하면: +1. `threading.local()`이 greenlet-local로 패치됨 +2. `_thread.get_ident()`가 greenlet ID를 반환하도록 패치됨 +3. 각 greenlet은 고유한 "스레드 ID"를 가지게 됨 + +#### 왜 `autoretry_for=(Exception,)`이 문제를 악화시키는가 + +``` +Timeline: + t=0 Greenlet-A (id=200): send_notification(42) 시작 + t=1 Greenlet-A (id=200): DB connection 생성 (thread_ident=200) + t=2 Greenlet-A (id=200): Detection.objects.get() → 예외 발생 + t=3 [Celery autoretry] 태스크를 재시도 큐에 넣음 + t=4 Greenlet-B (id=300): send_notification(42) 재시도 시작 ← 다른 greenlet! + t=5 Greenlet-B (id=300): 기존 connection 접근 시도 + → thread_ident(200) ≠ current_ident(300) → 💥 에러! +``` + +> **📸 캡처 4**: Jaeger에서 retry span 확인 +> - URL: `http://34.47.70.132:16686` +> - `send_notification` 트레이스에서 retry가 발생한 span을 찾아 원본과 retry의 span 관계 확인 +> - **캡처 항목**: 원본 span → retry span 으로 이어지는 트레이스 타임라인 + +#### OCR Worker는 왜 이 문제가 없는가 + +| 설정 | Alert Worker | OCR Worker | +|------|-------------|------------| +| Pool | `gevent` (greenlet) | `prefork` (프로세스) | +| Concurrency | 100 | 4 | +| 동시성 단위 | greenlet (같은 프로세스, 다른 "스레드" ID) | 별도 프로세스 (fork 후 새 커넥션) | +| `_thread.get_ident()` | greenlet마다 다름 | 프로세스마다 독립 | +| DB 커넥션 | greenlet 간 공유 위험 | 프로세스 격리로 안전 | + +### 3-2. 해결 방안 비교 + +| 방안 | 설명 | 장점 | 단점 | 채택 | +|------|------|------|------|------| +| **A. `close_old_connections()` 호출** | 태스크 시작 시 기존 커넥션 닫기 | 간단, 비침습적 | 매 태스크마다 새 커넥션 오버헤드 | ✅ | +| **B. Celery Signal 사용** | `task_prerun`/`task_postrun` signal로 일괄 처리 | 태스크 코드 수정 없음 | 전역 영향, 디버깅 난이도 | ⚠️ 대안 | +| **C. `CONN_MAX_AGE = 0` 설정** | 매 요청마다 커넥션 닫기 | Django 표준 방식 | Celery에서는 "요청" 개념 없음, 불충분 | ❌ | +| **D. `inc_thread_sharing()` 사용** | 스레드 공유 허용 플래그 | 에러 해소 | race condition 위험, 비권장 | ❌ | +| **E. gevent → prefork 전환** | Pool 타입 변경 | 근본 해결 | I/O 집약 태스크에 비효율적 | ❌ | + +### 3-3. 채택한 해결 방안: `db.close_old_connections()` + +#### 수정 전 (현재 코드) + +```python +# tasks/notification_tasks.py +@shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) +def send_notification(self, detection_id: int): + from apps.detections.models import Detection + from apps.notifications.models import Notification + from apps.vehicles.models import Vehicle + + try: + detection = Detection.objects.using("detections_db").get(...) # ← 💥 에러 발생 가능 +``` + +#### 수정 후 (적용 예정) + +```python +# tasks/notification_tasks.py +from django import db # ← 추가 + +@shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) +def send_notification(self, detection_id: int): + # Gevent greenlet 환경에서 스레드 간 DB 커넥션 공유 방지 + db.close_old_connections() # ← 추가: 이전 greenlet의 stale 커넥션 정리 + + from apps.detections.models import Detection + from apps.notifications.models import Notification + from apps.vehicles.models import Vehicle + + try: + detection = Detection.objects.using("detections_db").get(...) # ← ✅ 새 커넥션 사용 +``` + +#### `db.close_old_connections()`의 동작 원리 + +```python +# django/db/__init__.py +def close_old_connections(**kwargs): + for conn in connections.all(): + conn.close_if_unusable_or_obsolete() +``` + +이 함수는: +1. 모든 DB alias(`default`, `detections_db`, `vehicles_db`, `notifications_db`)를 순회 +2. 각 커넥션이 **사용 불가능(unusable)**하거나 **수명 초과(obsolete)**인지 확인 +3. 해당되면 커넥션을 닫음 → 다음 ORM 호출 시 새 커넥션 자동 생성 + +greenlet 환경에서 이 함수가 효과적인 이유: +- 이전 greenlet이 남긴 stale 커넥션을 정리 +- 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 현재 greenlet ID와 일치 + +### 3-4. 유사 사례: Java Virtual Thread + +이 문제는 Python/Django에만 국한되지 않는다. Java의 Virtual Thread(Project Loom)에서도 유사한 범주의 문제가 발생할 수 있다: + +| 항목 | Python Gevent Greenlet | Java Virtual Thread | +|------|----------------------|---------------------| +| ThreadLocal 동작 | greenlet-local로 패치됨 | 가상 스레드마다 고유 ThreadLocal | +| DB 커넥션 관리 | Django의 thread-local 저장 | 보통 HikariCP 같은 글로벌 풀 사용 | +| 핵심 문제 | **thread ID 불일치로 validation 실패** | **Connection pinning** (carrier thread 고갈) | +| 프레임워크 차단 | Django가 명시적 validate | 명시적 차단 없음, 암묵적 race condition | + +**공통 근본 원인**: 경량 동시성 단위(greenlet/virtual thread)가 thread-local 스토리지 패턴과 충돌 + +--- + +## 4. Result (결과) + +### 4-1. 수정 적용 전 상태 (Before) + +> **📸 캡처 5**: 수정 전 Jaeger 에러 확인 +> - URL: `http://34.47.70.132:16686` +> - Service: `speedcam-alert`, Lookback: Last Hour +> - **캡처 항목**: `send_notification` 트레이스 목록에서 에러(빨간색) 표시된 span들 + +> **📸 캡처 6**: 수정 전 Grafana Logs Explorer +> - URL: `http://34.47.70.132:3000` → Logs Explorer 대시보드 +> - Container: `speedcam-alert` 선택 +> - Search: `DatabaseWrapper` +> - **캡처 항목**: "DatabaseWrapper objects created in a thread" 에러 로그 라인들 + +> **📸 캡처 7**: 수정 전 Alert Worker 컨테이너 로그 +> ```bash +> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ +> -- sudo docker logs speedcam-alert 2>&1 | grep -i "DatabaseWrapper" | tail -10 +> ``` +> - **캡처 항목**: 터미널에서 grep한 DatabaseWrapper 에러 로그 + +### 4-2. 수정 적용 후 상태 (After) + +> **📸 캡처 8**: 수정 후 코드 diff 확인 +> ```bash +> git diff docs/gevent-db-thread-safety -- tasks/notification_tasks.py +> ``` +> - **캡처 항목**: `db.close_old_connections()` 추가된 diff 출력 + +> **📸 캡처 9**: 수정 후 Alert Worker 재배포 확인 +> ```bash +> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ +> -- sudo docker compose -f ~/depoly/compose/docker-compose.alert.yml up -d --build +> ``` +> - **캡처 항목**: 컨테이너 재시작 로그 + +> **📸 캡처 10**: 수정 후 Jaeger 정상 트레이스 +> - URL: `http://34.47.70.132:16686` +> - Service: `speedcam-alert` +> - **캡처 항목**: `send_notification` 트레이스에 에러 없는 정상 span들 (수정 후 시점부터) + +> **📸 캡처 11**: 수정 후 Grafana Logs Explorer +> - Container: `speedcam-alert`, Search: `DatabaseWrapper` +> - **캡처 항목**: 수정 후 시점부터 DatabaseWrapper 에러 로그 없음 + +### 4-3. 영향 분석 + +| 항목 | Before | After | +|------|--------|-------| +| `send_notification` 에러율 | DatabaseWrapper 에러 반복 발생 | 에러 제거 | +| DB 커넥션 패턴 | stale 커넥션 재사용 시도 → 실패 | 매 태스크 시작 시 정리 → 새 커넥션 | +| OCR Worker 영향 | - | 없음 (prefork pool이므로 해당 없음) | +| 성능 오버헤드 | - | 미미 (커넥션 정리는 O(n) where n=DB alias 수=4) | + +--- + +## 참고 자료 (References) + +### 공식 문서 + +| 출처 | 링크 | 내용 | +|------|------|------| +| Django Databases | https://docs.djangoproject.com/en/5.1/ref/databases/ | CONN_MAX_AGE, 스레드 안전성, persistent connections | +| Celery Gevent Pool | https://docs.celeryq.dev/en/main/userguide/concurrency/gevent.html | gevent pool 공식 가이드 | +| Celery Workers Guide | https://docs.celeryq.dev/en/stable/userguide/workers.html | Worker 구성 및 pool 타입 | +| Gevent Monkey Patching | http://www.gevent.org/api/gevent.monkey.html | monkey patch API 레퍼런스 | + +### Django 소스코드 + +| 파일 | 내용 | +|------|------| +| `django/db/backends/base/base.py` | `validate_thread_sharing()` 구현 | +| `django/db/__init__.py` | `close_old_connections()` 구현 | +| [Django PR #10972](https://github.com/django/django/pull/10972) | Thread sharing 로직 개선 | +| [Django Ticket #30171](https://code.djangoproject.com/ticket/30171) | Threading 에러 수정 | +| [Django Ticket #25714](https://code.djangoproject.com/ticket/25714) | DatabaseWrapper 스레드 에러 | + +### GitHub Issues (동일 문제 보고) + +| 이슈 | 내용 | +|------|------| +| [Gunicorn #879](https://github.com/benoitc/gunicorn/issues/879) | Gunicorn + Django + Gevent에서 동일 에러 | +| [Celery #4489](https://github.com/celery/celery/issues/4489) | Celery에서 DatabaseWrapper 스레드 에러 | +| [Celery #2453](https://github.com/celery/celery/issues/2453) | Celery가 DB 커넥션을 닫지 않는 문제 | +| [Celery #5924](https://github.com/celery/celery/issues/5924) | Django + eventlet 패치 깨짐 | +| [Celery #3520](https://github.com/celery/celery/issues/3520) | Prefork에서 스레드 에러 | + +### 기술 블로그 및 아티클 + +| 출처 | 링크 | 내용 | +|------|------|------| +| Celery School | https://celery.school/celery-gevent-5-lessons-learned | Gevent Pool 5가지 교훈 | +| DoorDash Engineering | https://doordash.engineering/2021/01/19/scaling-efficienc-of-a-python-service-with-gevent/ | 프로덕션 gevent 확장 사례 | +| Medium (Celery Workers) | https://medium.com/@gupta.rishabh2912/mastering-celery-workers-in-django-when-to-use-prefork-eventlet-or-gevent-2679cffae2bd | Pool 타입 비교 가이드 | +| Vinta Software | https://www.vintasoftware.com/blog/guide-django-celery-tasks | Django Celery 고급 가이드 | +| LimeChat | https://limechat.ai/blog/scaling-django-server-celery-workers | Django + Celery 스케일링 전략 | +| Heroku | https://devcenter.heroku.com/articles/python-concurrency-and-database-connections | Python 동시성 + DB 커넥션 best practices | + +--- + +## 부록 + +### A. 확인 명령어 모음 + +```bash +# 1. Alert Worker 로그에서 에러 확인 +gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ + -- sudo docker logs speedcam-alert 2>&1 | grep -i "DatabaseWrapper" + +# 2. Jaeger에서 에러 트레이스 API 조회 +curl -s "http://34.47.70.132:16686/api/traces?service=speedcam-alert&operation=send_notification&limit=20" \ + | python3 -m json.tool | grep -A2 "error" + +# 3. Django validate_thread_sharing 소스 확인 +gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ + -- sudo docker exec speedcam-alert python -c " +import django.db.backends.base.base as b +import inspect +print(inspect.getsource(b.BaseDatabaseWrapper.validate_thread_sharing))" + +# 4. 현재 greenlet 수 확인 (gevent pool 상태) +gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ + -- sudo docker exec speedcam-alert python -c " +import gevent +print(f'Current greenlet count: {len(gevent.hub.get_hub().loop._callbacks)}')" + +# 5. MySQL 커넥션 수 확인 +gcloud compute ssh speedcam-db --zone=asia-northeast3-a \ + -- sudo docker exec speedcam-mysql mysql -usa -p'' \ + -e "SHOW STATUS LIKE 'Threads_connected';" +``` + +### B. 관련 프로젝트 파일 경로 + +| 파일 | 역할 | +|------|------| +| `tasks/notification_tasks.py` | 알림 전송 태스크 (문제 발생 지점) | +| `tasks/ocr_tasks.py` | OCR 처리 태스크 (prefork pool, 참고용) | +| `scripts/start_alert_worker.sh` | Alert Worker 시작 스크립트 | +| `scripts/start_ocr_worker.sh` | OCR Worker 시작 스크립트 | +| `config/settings/prod.py` | 프로덕션 DB 설정 (CONN_MAX_AGE 미설정) | +| `config/settings/base.py` | Celery 공통 설정 | From f3db496edfbf990aa1db7a5f42f80d696800056e Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 08:47:00 +0900 Subject: [PATCH 02/11] docs: add monkey-patching late warning analysis to STAR document Add Layer 1 root cause: opentelemetry-instrument imports ssl before Celery calls gevent.monkey.patch_all(), causing incomplete patching. Include actual container startup logs showing MonkeyPatchWarning and _after_fork_in_child AssertionError. Add cause hierarchy diagram. --- docs/GEVENT_DB_THREAD_SAFETY.md | 77 +++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 85e680f..367ccdc 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -118,7 +118,63 @@ def send_notification(self, detection_id: int): ### 3-1. 근본 원인 분석 -#### Django의 DB 커넥션 스레드 격리 메커니즘 +#### 원인 Layer 1: Gevent Monkey-Patching 순서 문제 (Late Patching) + +Alert Worker 시작 로그에서 다음 경고들이 확인된다: + +``` +# 실제 speedcam-alert 컨테이너 시작 로그 +MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported +may lead to errors, including RecursionError on Python 3.6. It may also +silently lead to incorrect behaviour on Python 3.7. +Please monkey-patch earlier. See https://github.com/gevent/gevent/issues/1016. +Modules that had direct imports (NOT patched): + ['urllib3.util.ssl_ (...)', 'urllib3.util (...)'] +``` + +``` +Exception ignored in: + File "gevent/threading.py", line 264, in _after_fork_in_child + assert len(active) == 1 + ^^^^^^^^^^^^^^^^ +AssertionError +``` + +> **📸 캡처 2-1**: Alert Worker 컨테이너 시작 로그 (경고 메시지 포함) +> ```bash +> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ +> -- sudo docker logs speedcam-alert 2>&1 | head -20 +> ``` +> - **캡처 항목**: `MonkeyPatchWarning`과 `AssertionError` 전문 + +**이 경고가 발생하는 이유:** + +`start_alert_worker.sh`의 실행 체인: + +``` +opentelemetry-instrument → celery -A config worker --pool=gevent +``` + +``` +실행 순서 (시간순): +1. opentelemetry-instrument 시작 + → urllib3 import → ssl import됨 (이미 native ssl 모듈 로드) +2. celery -A config worker --pool=gevent 실행 +3. Celery가 --pool=gevent 감지 + → gevent.monkey.patch_all() 호출 ← 💥 ssl은 이미 import됨! +4. ssl, 일부 threading 모듈이 불완전하게 패치된 상태로 동작 +``` + +**영향:** +- `_thread.get_ident()`는 패치되어 greenlet ID를 반환하지만 +- `threading.local()`의 일부 동작이 불완전할 수 있음 +- `_after_fork_in_child`에서 active 스레드 수가 예상과 다름 (AssertionError) +- **결과**: greenlet 간 DB 커넥션 격리가 불안정해져 `validate_thread_sharing()` 에러 발생 확률 증가 + +> **참고**: gevent 공식 문서는 monkey-patching을 "프로그램 생명주기에서 가능한 한 빨리, 다른 import보다 먼저" 수행하라고 권고한다. +> 그러나 `opentelemetry-instrument` 래퍼가 먼저 실행되므로 현재 구조에서는 이를 완전히 제어하기 어렵다. + +#### 원인 Layer 2: Django의 DB 커넥션 스레드 격리 메커니즘 Django는 `django/db/backends/base/base.py`의 `validate_thread_sharing()` 메서드로 DB 커넥션의 스레드 간 공유를 차단한다: @@ -145,7 +201,7 @@ def validate_thread_sharing(self): > ``` > - **캡처 항목**: 실제 배포된 Django 버전의 `validate_thread_sharing()` 소스코드 출력 -#### 핵심 메커니즘: Greenlet ≠ Thread이지만 다른 Thread ID를 가짐 +#### 원인 Layer 3: Greenlet ≠ Thread이지만 다른 Thread ID를 가짐 ``` [일반 스레드 모델] @@ -162,7 +218,7 @@ gevent가 `monkey.patch_all()`을 실행하면: 2. `_thread.get_ident()`가 greenlet ID를 반환하도록 패치됨 3. 각 greenlet은 고유한 "스레드 ID"를 가지게 됨 -#### 왜 `autoretry_for=(Exception,)`이 문제를 악화시키는가 +#### 원인 Layer 4: `autoretry_for=(Exception,)`이 문제를 악화시키는가 ``` Timeline: @@ -190,6 +246,20 @@ Timeline: | `_thread.get_ident()` | greenlet마다 다름 | 프로세스마다 독립 | | DB 커넥션 | greenlet 간 공유 위험 | 프로세스 격리로 안전 | +#### 원인 계층 요약 + +``` +Layer 1: Late Monkey-Patching (opentelemetry-instrument → ssl 먼저 import) + ↓ 불완전한 threading 패치 +Layer 2: Django validate_thread_sharing() 스레드 격리 검증 + ↓ _thread.get_ident()로 커넥션 소유자 확인 +Layer 3: Greenlet마다 다른 Thread ID + ↓ greenlet-local 저장소에 커넥션 바인딩 +Layer 4: autoretry_for=(Exception,) 자동 재시도 + ↓ 다른 greenlet에서 재시도 → stale 커넥션 접근 + 💥 DatabaseWrapper thread-sharing error +``` + ### 3-2. 해결 방안 비교 | 방안 | 설명 | 장점 | 단점 | 채택 | @@ -199,6 +269,7 @@ Timeline: | **C. `CONN_MAX_AGE = 0` 설정** | 매 요청마다 커넥션 닫기 | Django 표준 방식 | Celery에서는 "요청" 개념 없음, 불충분 | ❌ | | **D. `inc_thread_sharing()` 사용** | 스레드 공유 허용 플래그 | 에러 해소 | race condition 위험, 비권장 | ❌ | | **E. gevent → prefork 전환** | Pool 타입 변경 | 근본 해결 | I/O 집약 태스크에 비효율적 | ❌ | +| **F. OTel 래퍼 제거 후 코드 내 초기화** | monkey.patch_all()을 앱 진입점에서 먼저 호출 | Late patching 해소 | OTel 자동 계측 포기 | ⚠️ 장기 검토 | ### 3-3. 채택한 해결 방안: `db.close_old_connections()` From 6d58838ae6b6f2d9d1145fccf51c8711b5ab1521 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:01:24 +0900 Subject: [PATCH 03/11] docs: rewrite STAR document as blog-friendly narrative Restructure from verbose reference format to storytelling flow. Remove deployment-specific instructions, reduce redundancy, curate references to official docs + 3 GitHub issues + 3 enterprise blogs. --- docs/GEVENT_DB_THREAD_SAFETY.md | 516 ++++++++------------------------ 1 file changed, 130 insertions(+), 386 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 367ccdc..9023890 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -1,484 +1,228 @@ -# Django + Celery Gevent Pool: DB Thread-Safety 이슈 분석 및 해결 +# Celery Gevent Pool에서 Django DB 커넥션이 깨지는 이유 -## STAR 분석 +> Celery의 gevent pool과 Django ORM을 함께 쓸 때 마주치는 `DatabaseWrapper objects created in a thread can only be used in that same thread` 에러의 원인과 해결. --- -## 1. Situation (상황) +## Situation — 어느 날 Jaeger에서 발견한 에러 -### 프로젝트 개요 - -SpeedCam 프로젝트는 과속 감지 시스템으로, 다음과 같은 MSA 아키텍처를 사용한다: - -| 인스턴스 | 역할 | 핵심 기술 | -|----------|------|-----------| -| speedcam-app | Django + Gunicorn + MQTT Subscriber | HTTP API, MQTT 메시지 수신 | -| speedcam-db | MySQL 8.0 | 3개 DB (detections, vehicles, notifications) | -| speedcam-mq | RabbitMQ | 메시지 큐 (ocr_queue, fcm_queue) | -| speedcam-ocr | Celery OCR Worker | `--pool=prefork --concurrency=4` | -| speedcam-alert | Celery Alert Worker | `--pool=gevent --concurrency=100` | -| speedcam-mon | Prometheus + Grafana + Loki + Jaeger | 모니터링 | - -### 문제 발생 지점 - -Jaeger 트레이싱에서 `send_notification` 태스크 실행 시 다음 에러가 반복적으로 발생: - -``` -DatabaseWrapper objects created in a thread can only be used in that same thread. -The object with alias 'detections_db' was created in thread id 35839779840 -and this is thread id 35804459008. -``` - -> **📸 캡처 1**: Jaeger UI에서 `send_notification` span의 에러 로그 -> - URL: `http://34.47.70.132:16686` -> - 검색 조건: Service=`speedcam-alert`, Operation=`send_notification` -> - 에러가 포함된 span을 클릭하여 상세 로그 확인 -> - **캡처 항목**: span 타임라인 + Logs 탭의 에러 메시지 전문 - -### Alert Worker 실행 설정 +과속 감지 시스템의 알림 워커(Alert Worker)는 FCM 푸시 전송처럼 I/O 집약적인 작업을 처리한다. 네트워크 대기가 대부분이라 `--pool=gevent --concurrency=100`으로 greenlet 100개를 띄워 처리량을 극대화했다. ```bash # scripts/start_alert_worker.sh opentelemetry-instrument \ --service_name speedcam-alert \ celery -A config worker \ - --pool=gevent \ # ← gevent pool 사용 - --concurrency=${ALERT_CONCURRENCY:-100} \ # ← greenlet 100개 + --pool=gevent \ + --concurrency=${ALERT_CONCURRENCY:-100} \ --queues=fcm_queue \ - --hostname=alert@%h \ - --loglevel=${LOG_LEVEL:-info} + --hostname=alert@%h ``` -> **📸 캡처 2**: Alert Worker 컨테이너 시작 로그 -> ```bash -> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ -> -- sudo docker logs speedcam-alert 2>&1 | head -20 -> ``` -> - **캡처 항목**: `celery@alert` 워커 시작 메시지에서 pool=gevent, concurrency=100 확인 - -### Django DB 설정 (CONN_MAX_AGE 미설정) +문제는 Jaeger 트레이싱을 확인하면서 드러났다. `send_notification` 태스크에서 이런 에러가 반복되고 있었다: -```python -# config/settings/prod.py -DATABASES = { - "default": { - "ENGINE": "django.db.backends.mysql", - "NAME": "speedcam", - # ... (CONN_MAX_AGE 미설정 → 기본값 0) - }, - "vehicles_db": { ... }, # CONN_MAX_AGE 미설정 - "detections_db": { ... }, # CONN_MAX_AGE 미설정 - "notifications_db": { ... }, # CONN_MAX_AGE 미설정 -} ``` - -### 문제의 태스크 코드 - -```python -# tasks/notification_tasks.py -@shared_task( - bind=True, - max_retries=3, - autoretry_for=(Exception,), # ← 모든 예외에 대해 자동 재시도 - retry_backoff=True, - acks_late=True, -) -def send_notification(self, detection_id: int): - from apps.detections.models import Detection - # ... - detection = Detection.objects.using("detections_db").get( # ← line 36: 에러 발생 지점 - id=detection_id, status="completed" - ) +DatabaseWrapper objects created in a thread can only be used in that same thread. +The object with alias 'detections_db' was created in thread id 35839779840 +and this is thread id 35804459008. ``` -이 태스크는 3개의 DB에 접근한다: -- `detections_db`: Detection 조회 (line 36) -- `notifications_db`: Notification 생성 (line 59, 89, 129, 145, 168) -- `vehicles_db`: Vehicle 조회 (line 109) - ---- + -## 2. Task (과제) +같은 프로젝트의 OCR 워커(`--pool=prefork --concurrency=4`)는 멀쩡했다. Alert 워커만 터지고 있었다. -### 해결해야 할 문제 +--- -1. **즉각적 문제**: `send_notification` 태스크가 gevent greenlet 환경에서 DB 커넥션 스레드 공유 에러로 실패 -2. **근본 원인 파악**: Django ORM의 스레드 안전성 모델과 gevent greenlet 간의 충돌 메커니즘 규명 -3. **안정적 해결**: 프로덕션 환경에서 100 concurrency gevent pool이 안정적으로 DB 접근하도록 수정 +## Task — 왜 greenlet에서만 터지는가 -### 성공 기준 +에러 메시지를 곧이곧대로 읽으면 "스레드 A가 만든 DB 커넥션을 스레드 B가 쓰려 했다"는 뜻이다. 그런데 gevent는 스레드가 아니라 greenlet을 쓴다. 문제를 풀려면 세 가지를 이해해야 했다: -- [ ] Jaeger에서 `send_notification` span에 `DatabaseWrapper` 에러 없음 -- [ ] Alert Worker의 100 concurrent greenlet이 안정적으로 동작 -- [ ] 기존 OCR Worker(prefork pool)에 부정적 영향 없음 +1. Django가 DB 커넥션을 어떻게 격리하는지 +2. gevent monkey-patching이 그 격리를 어떻게 무너뜨리는지 +3. Celery의 autoretry가 왜 상황을 악화시키는지 --- -## 3. Action (분석 및 조치) - -### 3-1. 근본 원인 분석 +## Action — 원인 추적과 해결 -#### 원인 Layer 1: Gevent Monkey-Patching 순서 문제 (Late Patching) +### 1단계: Django의 스레드 격리 검증 -Alert Worker 시작 로그에서 다음 경고들이 확인된다: +Django는 모든 DB 쿼리 실행 전에 `validate_thread_sharing()`을 호출한다: -``` -# 실제 speedcam-alert 컨테이너 시작 로그 -MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported -may lead to errors, including RecursionError on Python 3.6. It may also -silently lead to incorrect behaviour on Python 3.7. -Please monkey-patch earlier. See https://github.com/gevent/gevent/issues/1016. -Modules that had direct imports (NOT patched): - ['urllib3.util.ssl_ (...)', 'urllib3.util (...)'] +```python +# django/db/backends/base/base.py +def validate_thread_sharing(self): + if not (self.allow_thread_sharing + or self._thread_ident == _thread.get_ident()): + raise DatabaseError( + "DatabaseWrapper objects created in a thread can only " + "be used in that same thread." + ) ``` -``` -Exception ignored in: - File "gevent/threading.py", line 264, in _after_fork_in_child - assert len(active) == 1 - ^^^^^^^^^^^^^^^^ -AssertionError -``` +커넥션이 생성될 때의 `_thread.get_ident()` 값을 `_thread_ident`에 저장해두고, 쿼리 실행 시점의 ID와 비교한다. 다르면 즉시 예외를 던진다. -> **📸 캡처 2-1**: Alert Worker 컨테이너 시작 로그 (경고 메시지 포함) -> ```bash -> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ -> -- sudo docker logs speedcam-alert 2>&1 | head -20 -> ``` -> - **캡처 항목**: `MonkeyPatchWarning`과 `AssertionError` 전문 +일반적인 멀티스레드 환경에서는 당연히 잘 동작한다. 각 스레드는 고유한 ID를 가지고, `threading.local()`이 스레드별 커넥션을 격리하니까. -**이 경고가 발생하는 이유:** +### 2단계: gevent가 바꿔놓은 규칙 -`start_alert_worker.sh`의 실행 체인: +gevent의 `monkey.patch_all()`은 `_thread.get_ident()`를 패치해서 **greenlet ID를 반환**하도록 바꾼다. 즉, greenlet마다 다른 "스레드 ID"를 갖게 된다. ``` -opentelemetry-instrument → celery -A config worker --pool=gevent -``` +[일반 스레드] +Thread A (id=100) → DB Connection (thread_ident=100) → 쿼리 시 get_ident()=100 ✅ -``` -실행 순서 (시간순): -1. opentelemetry-instrument 시작 - → urllib3 import → ssl import됨 (이미 native ssl 모듈 로드) -2. celery -A config worker --pool=gevent 실행 -3. Celery가 --pool=gevent 감지 - → gevent.monkey.patch_all() 호출 ← 💥 ssl은 이미 import됨! -4. ssl, 일부 threading 모듈이 불완전하게 패치된 상태로 동작 +[Gevent Greenlet] +Greenlet A (id=200) → DB Connection (thread_ident=200) + ↓ 예외 발생 → autoretry +Greenlet B (id=300) → 같은 Connection 접근 → get_ident()=300 ≠ 200 ❌ ``` -**영향:** -- `_thread.get_ident()`는 패치되어 greenlet ID를 반환하지만 -- `threading.local()`의 일부 동작이 불완전할 수 있음 -- `_after_fork_in_child`에서 active 스레드 수가 예상과 다름 (AssertionError) -- **결과**: greenlet 간 DB 커넥션 격리가 불안정해져 `validate_thread_sharing()` 에러 발생 확률 증가 +여기서 `autoretry_for=(Exception,)` 설정이 문제를 악화시킨다. Celery의 자동 재시도는 태스크를 큐에 다시 넣고, **다른 greenlet이 그것을 집어간다.** 이전 greenlet이 남긴 DB 커넥션에 새 greenlet이 접근하는 순간 Django의 검증에 걸린다. -> **참고**: gevent 공식 문서는 monkey-patching을 "프로그램 생명주기에서 가능한 한 빨리, 다른 import보다 먼저" 수행하라고 권고한다. -> 그러나 `opentelemetry-instrument` 래퍼가 먼저 실행되므로 현재 구조에서는 이를 완전히 제어하기 어렵다. + -#### 원인 Layer 2: Django의 DB 커넥션 스레드 격리 메커니즘 +### 3단계: Late Monkey-Patching이라는 복병 -Django는 `django/db/backends/base/base.py`의 `validate_thread_sharing()` 메서드로 DB 커넥션의 스레드 간 공유를 차단한다: +워커 시작 로그를 보면 또 다른 단서가 있었다: -```python -# django/db/backends/base/base.py (Django 소스코드) -def validate_thread_sharing(self): - if not (self.allow_thread_sharing or self._thread_ident == _thread.get_ident()): - raise DatabaseError( - "DatabaseWrapper objects created in a " - "thread can only be used in that same thread. The object " - "with alias '%s' was created in thread id %s and this is " - "thread id %s." % (self.alias, self._thread_ident, _thread.get_ident()) - ) ``` - -> **📸 캡처 3**: Django 소스코드에서 `validate_thread_sharing()` 확인 -> ```bash -> # Django 패키지 내 소스 위치 확인 -> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ -> -- sudo docker exec speedcam-alert python -c " -> import django.db.backends.base.base as b -> import inspect -> print(inspect.getsource(b.BaseDatabaseWrapper.validate_thread_sharing))" -> ``` -> - **캡처 항목**: 실제 배포된 Django 버전의 `validate_thread_sharing()` 소스코드 출력 - -#### 원인 Layer 3: Greenlet ≠ Thread이지만 다른 Thread ID를 가짐 - +MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported +may lead to errors. Please monkey-patch earlier. +See https://github.com/gevent/gevent/issues/1016. ``` -[일반 스레드 모델] -Thread A (id=100) → DB Connection A (thread_ident=100) → ✅ 같은 ID -[Gevent Greenlet 모델] -Greenlet A (id=200) → DB Connection 생성 (thread_ident=200) - ↓ (예외 발생 → autoretry) -Greenlet B (id=300) → DB Connection 재사용 시도 (thread_ident=200) → ❌ ID 불일치! +``` +Exception ignored in: + File "gevent/threading.py", line 264, in _after_fork_in_child + assert len(active) == 1 +AssertionError ``` -gevent가 `monkey.patch_all()`을 실행하면: -1. `threading.local()`이 greenlet-local로 패치됨 -2. `_thread.get_ident()`가 greenlet ID를 반환하도록 패치됨 -3. 각 greenlet은 고유한 "스레드 ID"를 가지게 됨 + -#### 원인 Layer 4: `autoretry_for=(Exception,)`이 문제를 악화시키는가 +원인은 실행 순서다: ``` -Timeline: - t=0 Greenlet-A (id=200): send_notification(42) 시작 - t=1 Greenlet-A (id=200): DB connection 생성 (thread_ident=200) - t=2 Greenlet-A (id=200): Detection.objects.get() → 예외 발생 - t=3 [Celery autoretry] 태스크를 재시도 큐에 넣음 - t=4 Greenlet-B (id=300): send_notification(42) 재시도 시작 ← 다른 greenlet! - t=5 Greenlet-B (id=300): 기존 connection 접근 시도 - → thread_ident(200) ≠ current_ident(300) → 💥 에러! +1. opentelemetry-instrument 시작 → urllib3 → ssl import됨 +2. celery worker --pool=gevent 실행 +3. Celery가 gevent.monkey.patch_all() 호출 ← ssl은 이미 로드된 상태 ``` -> **📸 캡처 4**: Jaeger에서 retry span 확인 -> - URL: `http://34.47.70.132:16686` -> - `send_notification` 트레이스에서 retry가 발생한 span을 찾아 원본과 retry의 span 관계 확인 -> - **캡처 항목**: 원본 span → retry span 으로 이어지는 트레이스 타임라인 +gevent 공식 문서는 monkey-patching을 "가능한 한 빨리, 다른 import보다 먼저" 하라고 권고한다. 하지만 `opentelemetry-instrument` 래퍼가 먼저 실행되면서 패치 순서가 꼬인다. 이로 인해 threading 관련 패치가 불완전해지고, greenlet 간 DB 커넥션 격리가 더 불안정해진다. -#### OCR Worker는 왜 이 문제가 없는가 +### 4단계: 왜 OCR Worker는 괜찮은가 -| 설정 | Alert Worker | OCR Worker | -|------|-------------|------------| +| | Alert Worker | OCR Worker | +|---|---|---| | Pool | `gevent` (greenlet) | `prefork` (프로세스) | | Concurrency | 100 | 4 | -| 동시성 단위 | greenlet (같은 프로세스, 다른 "스레드" ID) | 별도 프로세스 (fork 후 새 커넥션) | -| `_thread.get_ident()` | greenlet마다 다름 | 프로세스마다 독립 | -| DB 커넥션 | greenlet 간 공유 위험 | 프로세스 격리로 안전 | +| 커넥션 격리 | greenlet 간 공유 위험 | 프로세스 격리로 안전 | -#### 원인 계층 요약 +prefork는 `fork()`로 별도 프로세스를 만든다. 프로세스마다 독립적인 메모리 공간을 가지므로 커넥션 공유 자체가 불가능하다. gevent만의 문제다. + +### 원인 요약 ``` -Layer 1: Late Monkey-Patching (opentelemetry-instrument → ssl 먼저 import) - ↓ 불완전한 threading 패치 -Layer 2: Django validate_thread_sharing() 스레드 격리 검증 - ↓ _thread.get_ident()로 커넥션 소유자 확인 -Layer 3: Greenlet마다 다른 Thread ID - ↓ greenlet-local 저장소에 커넥션 바인딩 -Layer 4: autoretry_for=(Exception,) 자동 재시도 - ↓ 다른 greenlet에서 재시도 → stale 커넥션 접근 - 💥 DatabaseWrapper thread-sharing error +opentelemetry-instrument가 ssl을 먼저 import + → gevent monkey-patching이 불완전하게 적용 + → greenlet마다 다른 thread ID 부여 + → autoretry 시 다른 greenlet이 태스크를 받음 + → 이전 greenlet의 DB 커넥션에 접근 + → Django validate_thread_sharing() 실패 + 💥 DatabaseWrapper thread-sharing error ``` -### 3-2. 해결 방안 비교 +### 해결: `db.close_old_connections()` -| 방안 | 설명 | 장점 | 단점 | 채택 | -|------|------|------|------|------| -| **A. `close_old_connections()` 호출** | 태스크 시작 시 기존 커넥션 닫기 | 간단, 비침습적 | 매 태스크마다 새 커넥션 오버헤드 | ✅ | -| **B. Celery Signal 사용** | `task_prerun`/`task_postrun` signal로 일괄 처리 | 태스크 코드 수정 없음 | 전역 영향, 디버깅 난이도 | ⚠️ 대안 | -| **C. `CONN_MAX_AGE = 0` 설정** | 매 요청마다 커넥션 닫기 | Django 표준 방식 | Celery에서는 "요청" 개념 없음, 불충분 | ❌ | -| **D. `inc_thread_sharing()` 사용** | 스레드 공유 허용 플래그 | 에러 해소 | race condition 위험, 비권장 | ❌ | -| **E. gevent → prefork 전환** | Pool 타입 변경 | 근본 해결 | I/O 집약 태스크에 비효율적 | ❌ | -| **F. OTel 래퍼 제거 후 코드 내 초기화** | monkey.patch_all()을 앱 진입점에서 먼저 호출 | Late patching 해소 | OTel 자동 계측 포기 | ⚠️ 장기 검토 | +검토한 방안들: -### 3-3. 채택한 해결 방안: `db.close_old_connections()` +| 방안 | 판단 | +|------|------| +| **`db.close_old_connections()` 호출** | ✅ 채택 — 간단하고 비침습적 | +| Celery Signal(`task_prerun`) 사용 | ⚠️ 대안 — 태스크 코드 수정 없이 전역 적용 가능하나 디버깅 어려움 | +| `CONN_MAX_AGE = 0` 설정 | ❌ — Celery에는 "요청" 개념이 없어 작동하지 않음 | +| `inc_thread_sharing()` | ❌ — race condition 위험, Django가 비권장 | +| gevent → prefork 전환 | ❌ — I/O 집약 태스크에서 비효율적 | +| OTel 래퍼 제거 후 코드 내 monkey-patch | ⚠️ 장기 검토 — late patching 근본 해결이나 자동 계측 포기 | -#### 수정 전 (현재 코드) +수정은 단순하다. 태스크 시작 시 stale 커넥션을 정리하면 된다: +**수정 전:** ```python -# tasks/notification_tasks.py @shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) def send_notification(self, detection_id: int): from apps.detections.models import Detection - from apps.notifications.models import Notification - from apps.vehicles.models import Vehicle - - try: - detection = Detection.objects.using("detections_db").get(...) # ← 💥 에러 발생 가능 + # ... + detection = Detection.objects.using("detections_db").get(...) # 💥 ``` -#### 수정 후 (적용 예정) - +**수정 후:** ```python -# tasks/notification_tasks.py -from django import db # ← 추가 +from django import db @shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) def send_notification(self, detection_id: int): - # Gevent greenlet 환경에서 스레드 간 DB 커넥션 공유 방지 - db.close_old_connections() # ← 추가: 이전 greenlet의 stale 커넥션 정리 + db.close_old_connections() # stale 커넥션 정리 → 새 커넥션으로 시작 from apps.detections.models import Detection - from apps.notifications.models import Notification - from apps.vehicles.models import Vehicle - - try: - detection = Detection.objects.using("detections_db").get(...) # ← ✅ 새 커넥션 사용 + # ... + detection = Detection.objects.using("detections_db").get(...) # ✅ ``` -#### `db.close_old_connections()`의 동작 원리 - -```python -# django/db/__init__.py -def close_old_connections(**kwargs): - for conn in connections.all(): - conn.close_if_unusable_or_obsolete() -``` +`close_old_connections()`는 모든 DB alias를 순회하며 사용 불가능하거나 수명이 초과된 커넥션을 닫는다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 일치하게 된다. -이 함수는: -1. 모든 DB alias(`default`, `detections_db`, `vehicles_db`, `notifications_db`)를 순회 -2. 각 커넥션이 **사용 불가능(unusable)**하거나 **수명 초과(obsolete)**인지 확인 -3. 해당되면 커넥션을 닫음 → 다음 ORM 호출 시 새 커넥션 자동 생성 + -greenlet 환경에서 이 함수가 효과적인 이유: -- 이전 greenlet이 남긴 stale 커넥션을 정리 -- 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 현재 greenlet ID와 일치 +--- -### 3-4. 유사 사례: Java Virtual Thread +## Result — Before & After -이 문제는 Python/Django에만 국한되지 않는다. Java의 Virtual Thread(Project Loom)에서도 유사한 범주의 문제가 발생할 수 있다: + -| 항목 | Python Gevent Greenlet | Java Virtual Thread | -|------|----------------------|---------------------| -| ThreadLocal 동작 | greenlet-local로 패치됨 | 가상 스레드마다 고유 ThreadLocal | -| DB 커넥션 관리 | Django의 thread-local 저장 | 보통 HikariCP 같은 글로벌 풀 사용 | -| 핵심 문제 | **thread ID 불일치로 validation 실패** | **Connection pinning** (carrier thread 고갈) | -| 프레임워크 차단 | Django가 명시적 validate | 명시적 차단 없음, 암묵적 race condition | + -**공통 근본 원인**: 경량 동시성 단위(greenlet/virtual thread)가 thread-local 스토리지 패턴과 충돌 + ---- + -## 4. Result (결과) - -### 4-1. 수정 적용 전 상태 (Before) - -> **📸 캡처 5**: 수정 전 Jaeger 에러 확인 -> - URL: `http://34.47.70.132:16686` -> - Service: `speedcam-alert`, Lookback: Last Hour -> - **캡처 항목**: `send_notification` 트레이스 목록에서 에러(빨간색) 표시된 span들 - -> **📸 캡처 6**: 수정 전 Grafana Logs Explorer -> - URL: `http://34.47.70.132:3000` → Logs Explorer 대시보드 -> - Container: `speedcam-alert` 선택 -> - Search: `DatabaseWrapper` -> - **캡처 항목**: "DatabaseWrapper objects created in a thread" 에러 로그 라인들 - -> **📸 캡처 7**: 수정 전 Alert Worker 컨테이너 로그 -> ```bash -> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ -> -- sudo docker logs speedcam-alert 2>&1 | grep -i "DatabaseWrapper" | tail -10 -> ``` -> - **캡처 항목**: 터미널에서 grep한 DatabaseWrapper 에러 로그 - -### 4-2. 수정 적용 후 상태 (After) - -> **📸 캡처 8**: 수정 후 코드 diff 확인 -> ```bash -> git diff docs/gevent-db-thread-safety -- tasks/notification_tasks.py -> ``` -> - **캡처 항목**: `db.close_old_connections()` 추가된 diff 출력 - -> **📸 캡처 9**: 수정 후 Alert Worker 재배포 확인 -> ```bash -> gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ -> -- sudo docker compose -f ~/depoly/compose/docker-compose.alert.yml up -d --build -> ``` -> - **캡처 항목**: 컨테이너 재시작 로그 - -> **📸 캡처 10**: 수정 후 Jaeger 정상 트레이스 -> - URL: `http://34.47.70.132:16686` -> - Service: `speedcam-alert` -> - **캡처 항목**: `send_notification` 트레이스에 에러 없는 정상 span들 (수정 후 시점부터) - -> **📸 캡처 11**: 수정 후 Grafana Logs Explorer -> - Container: `speedcam-alert`, Search: `DatabaseWrapper` -> - **캡처 항목**: 수정 후 시점부터 DatabaseWrapper 에러 로그 없음 - -### 4-3. 영향 분석 - -| 항목 | Before | After | -|------|--------|-------| -| `send_notification` 에러율 | DatabaseWrapper 에러 반복 발생 | 에러 제거 | -| DB 커넥션 패턴 | stale 커넥션 재사용 시도 → 실패 | 매 태스크 시작 시 정리 → 새 커넥션 | -| OCR Worker 영향 | - | 없음 (prefork pool이므로 해당 없음) | -| 성능 오버헤드 | - | 미미 (커넥션 정리는 O(n) where n=DB alias 수=4) | +| | Before | After | +|---|---|---| +| `send_notification` 에러율 | DatabaseWrapper 에러 반복 | 에러 제거 | +| DB 커넥션 패턴 | stale 커넥션 재사용 → 실패 | 태스크 시작 시 정리 → 새 커넥션 | +| OCR Worker 영향 | — | 없음 (prefork pool) | +| 성능 오버헤드 | — | 미미 (DB alias 4개 순회) | --- -## 참고 자료 (References) +## References ### 공식 문서 -| 출처 | 링크 | 내용 | -|------|------|------| -| Django Databases | https://docs.djangoproject.com/en/5.1/ref/databases/ | CONN_MAX_AGE, 스레드 안전성, persistent connections | -| Celery Gevent Pool | https://docs.celeryq.dev/en/main/userguide/concurrency/gevent.html | gevent pool 공식 가이드 | -| Celery Workers Guide | https://docs.celeryq.dev/en/stable/userguide/workers.html | Worker 구성 및 pool 타입 | -| Gevent Monkey Patching | http://www.gevent.org/api/gevent.monkey.html | monkey patch API 레퍼런스 | +- [Django Databases — Persistent connections and thread safety](https://docs.djangoproject.com/en/5.1/ref/databases/#persistent-database-connections) +- [Celery — Concurrency with Gevent](https://docs.celeryq.dev/en/stable/userguide/concurrency/gevent.html) +- [Gevent — Monkey Patching](http://www.gevent.org/api/gevent.monkey.html) -### Django 소스코드 +### GitHub Issues -| 파일 | 내용 | -|------|------| -| `django/db/backends/base/base.py` | `validate_thread_sharing()` 구현 | -| `django/db/__init__.py` | `close_old_connections()` 구현 | -| [Django PR #10972](https://github.com/django/django/pull/10972) | Thread sharing 로직 개선 | -| [Django Ticket #30171](https://code.djangoproject.com/ticket/30171) | Threading 에러 수정 | -| [Django Ticket #25714](https://code.djangoproject.com/ticket/25714) | DatabaseWrapper 스레드 에러 | +- [Celery #4489](https://github.com/celery/celery/issues/4489) — Workers inherit DatabaseWrapper during fork, thread ID mismatch +- [Celery #2453](https://github.com/celery/celery/issues/2453) — Celery not closing database connections properly +- [Gunicorn #879](https://github.com/benoitc/gunicorn/issues/879) — DatabaseWrapper thread error with gevent worker -### GitHub Issues (동일 문제 보고) - -| 이슈 | 내용 | -|------|------| -| [Gunicorn #879](https://github.com/benoitc/gunicorn/issues/879) | Gunicorn + Django + Gevent에서 동일 에러 | -| [Celery #4489](https://github.com/celery/celery/issues/4489) | Celery에서 DatabaseWrapper 스레드 에러 | -| [Celery #2453](https://github.com/celery/celery/issues/2453) | Celery가 DB 커넥션을 닫지 않는 문제 | -| [Celery #5924](https://github.com/celery/celery/issues/5924) | Django + eventlet 패치 깨짐 | -| [Celery #3520](https://github.com/celery/celery/issues/3520) | Prefork에서 스레드 에러 | - -### 기술 블로그 및 아티클 - -| 출처 | 링크 | 내용 | -|------|------|------| -| Celery School | https://celery.school/celery-gevent-5-lessons-learned | Gevent Pool 5가지 교훈 | -| DoorDash Engineering | https://doordash.engineering/2021/01/19/scaling-efficienc-of-a-python-service-with-gevent/ | 프로덕션 gevent 확장 사례 | -| Medium (Celery Workers) | https://medium.com/@gupta.rishabh2912/mastering-celery-workers-in-django-when-to-use-prefork-eventlet-or-gevent-2679cffae2bd | Pool 타입 비교 가이드 | -| Vinta Software | https://www.vintasoftware.com/blog/guide-django-celery-tasks | Django Celery 고급 가이드 | -| LimeChat | https://limechat.ai/blog/scaling-django-server-celery-workers | Django + Celery 스케일링 전략 | -| Heroku | https://devcenter.heroku.com/articles/python-concurrency-and-database-connections | Python 동시성 + DB 커넥션 best practices | +### 기술 블로그 ---- - -## 부록 - -### A. 확인 명령어 모음 - -```bash -# 1. Alert Worker 로그에서 에러 확인 -gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ - -- sudo docker logs speedcam-alert 2>&1 | grep -i "DatabaseWrapper" - -# 2. Jaeger에서 에러 트레이스 API 조회 -curl -s "http://34.47.70.132:16686/api/traces?service=speedcam-alert&operation=send_notification&limit=20" \ - | python3 -m json.tool | grep -A2 "error" - -# 3. Django validate_thread_sharing 소스 확인 -gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ - -- sudo docker exec speedcam-alert python -c " -import django.db.backends.base.base as b -import inspect -print(inspect.getsource(b.BaseDatabaseWrapper.validate_thread_sharing))" - -# 4. 현재 greenlet 수 확인 (gevent pool 상태) -gcloud compute ssh speedcam-alert --zone=asia-northeast3-a \ - -- sudo docker exec speedcam-alert python -c " -import gevent -print(f'Current greenlet count: {len(gevent.hub.get_hub().loop._callbacks)}')" - -# 5. MySQL 커넥션 수 확인 -gcloud compute ssh speedcam-db --zone=asia-northeast3-a \ - -- sudo docker exec speedcam-mysql mysql -usa -p'' \ - -e "SHOW STATUS LIKE 'Threads_connected';" -``` - -### B. 관련 프로젝트 파일 경로 - -| 파일 | 역할 | -|------|------| -| `tasks/notification_tasks.py` | 알림 전송 태스크 (문제 발생 지점) | -| `tasks/ocr_tasks.py` | OCR 처리 태스크 (prefork pool, 참고용) | -| `scripts/start_alert_worker.sh` | Alert Worker 시작 스크립트 | -| `scripts/start_ocr_worker.sh` | OCR Worker 시작 스크립트 | -| `config/settings/prod.py` | 프로덕션 DB 설정 (CONN_MAX_AGE 미설정) | -| `config/settings/base.py` | Celery 공통 설정 | +- [DoorDash Engineering — Scaling Efficiency of a Python Service with Gevent](https://doordash.engineering/2021/01/19/scaling-efficienc-of-a-python-service-with-gevent/) +- [Heroku — Python Concurrency and Database Connections](https://devcenter.heroku.com/articles/python-concurrency-and-database-connections) +- [Celery School — The Gevent Pool: 5 Lessons Learned](https://celery.school/celery-gevent-5-lessons-learned) From f915a070db36b8557202fe055b248c34b7082ed8 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:09:46 +0900 Subject: [PATCH 04/11] docs: add mermaid diagrams, visible capture placeholders, trade-off analysis - Add 5 mermaid diagrams: prefork vs gevent comparison, sequence diagram for greenlet retry failure, late patching flow, cause layer diagram, quadrant chart for solution trade-offs - Make capture placeholders visible (blockquote format, not HTML comments) - Add personal Spring developer perspective on monkey-patching - Expand solution section with detailed trade-off analysis per option --- docs/GEVENT_DB_THREAD_SAFETY.md | 193 +++++++++++++++++++++----------- 1 file changed, 128 insertions(+), 65 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 9023890..c6bed9c 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -27,12 +27,25 @@ The object with alias 'detections_db' was created in thread id 35839779840 and this is thread id 35804459008. ``` - +> **📸 캡처 1**: Jaeger에서 `send_notification` 에러 span +> - `http://34.47.70.132:16686` → Service: `speedcam-alert`, Operation: `send_notification` +> - 에러 span 클릭 → Logs 탭의 에러 메시지 전문 같은 프로젝트의 OCR 워커(`--pool=prefork --concurrency=4`)는 멀쩡했다. Alert 워커만 터지고 있었다. +```mermaid +graph LR + subgraph "정상: OCR Worker (prefork)" + P1[Process A] -->|독립 커넥션| DB1[(MySQL)] + P2[Process B] -->|독립 커넥션| DB1 + end + subgraph "문제: Alert Worker (gevent)" + G1[Greenlet A] -.->|stale 커넥션 공유| DB2[(MySQL)] + G2[Greenlet B] -.->|💥 thread ID 불일치| DB2 + end + style G2 fill:#ff6b6b,color:#fff +``` + --- ## Task — 왜 greenlet에서만 터지는가 @@ -43,6 +56,8 @@ and this is thread id 35804459008. 2. gevent monkey-patching이 그 격리를 어떻게 무너뜨리는지 3. Celery의 autoretry가 왜 상황을 악화시키는지 +솔직히 말하면 이 문제를 처음 마주했을 때 꽤 당황스러웠다. 나는 주로 Spring 기반으로 개발해왔기 때문에 HikariCP 같은 글로벌 커넥션 풀에 익숙했고, 런타임에 표준 라이브러리를 통째로 바꿔치는 monkey-patching이라는 개념 자체가 낯설었다. Spring에서는 Virtual Thread를 써도 커넥션 풀이 스레드와 무관하게 동작하는데, Django는 커넥션을 스레드에 바인딩한다. 이 차이를 이해하는 과정이 오히려 두 프레임워크의 동시성 모델을 더 깊이 비교할 수 있는 계기가 되었다. + --- ## Action — 원인 추적과 해결 @@ -62,28 +77,35 @@ def validate_thread_sharing(self): ) ``` -커넥션이 생성될 때의 `_thread.get_ident()` 값을 `_thread_ident`에 저장해두고, 쿼리 실행 시점의 ID와 비교한다. 다르면 즉시 예외를 던진다. - -일반적인 멀티스레드 환경에서는 당연히 잘 동작한다. 각 스레드는 고유한 ID를 가지고, `threading.local()`이 스레드별 커넥션을 격리하니까. +커넥션이 생성될 때의 `_thread.get_ident()` 값을 저장해두고, 쿼리 실행 시점의 ID와 비교한다. 다르면 즉시 예외를 던진다. 일반적인 멀티스레드 환경에서는 당연히 잘 동작한다. 각 스레드는 고유한 ID를 가지고, `threading.local()`이 스레드별 커넥션을 격리하니까. ### 2단계: gevent가 바꿔놓은 규칙 -gevent의 `monkey.patch_all()`은 `_thread.get_ident()`를 패치해서 **greenlet ID를 반환**하도록 바꾼다. 즉, greenlet마다 다른 "스레드 ID"를 갖게 된다. - -``` -[일반 스레드] -Thread A (id=100) → DB Connection (thread_ident=100) → 쿼리 시 get_ident()=100 ✅ - -[Gevent Greenlet] -Greenlet A (id=200) → DB Connection (thread_ident=200) - ↓ 예외 발생 → autoretry -Greenlet B (id=300) → 같은 Connection 접근 → get_ident()=300 ≠ 200 ❌ +gevent의 `monkey.patch_all()`은 `_thread.get_ident()`를 패치해서 **greenlet ID를 반환**하도록 바꾼다. 즉 greenlet마다 다른 "스레드 ID"를 갖게 된다. + +```mermaid +sequenceDiagram + participant Q as RabbitMQ (fcm_queue) + participant GA as Greenlet A (id=200) + participant GB as Greenlet B (id=300) + participant DB as MySQL (detections_db) + + Q->>GA: send_notification(42) 배정 + GA->>DB: Connection 생성 (thread_ident=200) + GA->>DB: Detection.objects.get() 쿼리 + DB-->>GA: ❌ 예외 발생 + Note over GA: autoretry_for=(Exception,)
→ 태스크를 큐에 재등록 + + GA->>Q: 재시도 요청 + Q->>GB: send_notification(42) 재배정 + GB->>DB: 기존 Connection 접근 시도
thread_ident=200 ≠ get_ident()=300 + Note over GB,DB: 💥 DatabaseWrapper
thread-sharing error ``` 여기서 `autoretry_for=(Exception,)` 설정이 문제를 악화시킨다. Celery의 자동 재시도는 태스크를 큐에 다시 넣고, **다른 greenlet이 그것을 집어간다.** 이전 greenlet이 남긴 DB 커넥션에 새 greenlet이 접근하는 순간 Django의 검증에 걸린다. - +> **📸 캡처 2**: Jaeger에서 retry span 관계 확인 +> - `send_notification` 트레이스에서 원본 span → retry span 이어지는 타임라인 ### 3단계: Late Monkey-Patching이라는 복병 @@ -102,56 +124,97 @@ Exception ignored in: AssertionError ``` - +> **📸 캡처 3**: Alert Worker 시작 로그 +> - `docker logs speedcam-alert 2>&1 | head -20` +> - `MonkeyPatchWarning`과 `AssertionError`가 보이는 구간 원인은 실행 순서다: -``` -1. opentelemetry-instrument 시작 → urllib3 → ssl import됨 -2. celery worker --pool=gevent 실행 -3. Celery가 gevent.monkey.patch_all() 호출 ← ssl은 이미 로드된 상태 +```mermaid +flowchart LR + A["opentelemetry-instrument
시작"] --> B["urllib3 import
→ ssl 로드됨"] + B --> C["celery worker
--pool=gevent"] + C --> D["gevent.monkey
.patch_all()"] + D --> E["⚠️ ssl은
이미 import됨"] + style E fill:#ff6b6b,color:#fff ``` gevent 공식 문서는 monkey-patching을 "가능한 한 빨리, 다른 import보다 먼저" 하라고 권고한다. 하지만 `opentelemetry-instrument` 래퍼가 먼저 실행되면서 패치 순서가 꼬인다. 이로 인해 threading 관련 패치가 불완전해지고, greenlet 간 DB 커넥션 격리가 더 불안정해진다. -### 4단계: 왜 OCR Worker는 괜찮은가 +### 원인 요약 -| | Alert Worker | OCR Worker | -|---|---|---| -| Pool | `gevent` (greenlet) | `prefork` (프로세스) | -| Concurrency | 100 | 4 | -| 커넥션 격리 | greenlet 간 공유 위험 | 프로세스 격리로 안전 | +```mermaid +flowchart TB + L1["Layer 1: Late Monkey-Patching
opentelemetry-instrument → ssl 먼저 import"] + L2["Layer 2: Django validate_thread_sharing()
_thread.get_ident()로 커넥션 소유자 확인"] + L3["Layer 3: Greenlet별 다른 Thread ID
monkey-patch가 greenlet ID를 반환"] + L4["Layer 4: autoretry
다른 greenlet이 재시도 태스크를 받음"] + ERR["💥 DatabaseWrapper thread-sharing error"] + + L1 -->|불완전한 threading 패치| L2 + L2 -->|스레드 격리 검증| L3 + L3 -->|stale 커넥션 접근| L4 + L4 --> ERR + + style L1 fill:#ffeaa7 + style ERR fill:#ff6b6b,color:#fff +``` -prefork는 `fork()`로 별도 프로세스를 만든다. 프로세스마다 독립적인 메모리 공간을 가지므로 커넥션 공유 자체가 불가능하다. gevent만의 문제다. +### 해결: Trade-off를 따져보다 -### 원인 요약 +원인을 파악했으니 해결 방안을 검토했다. 핵심은 **"어떤 비용을 감수할 것인가"**였다. +**방안 A. `db.close_old_connections()` — 태스크 시작 시 호출** + +매 태스크가 실행될 때마다 모든 DB alias의 stale 커넥션을 닫는다. 다음 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성된다. +- *Trade-off*: 매번 커넥션을 새로 맺는 오버헤드가 발생한다. 하지만 Alert Worker의 태스크는 FCM 전송(네트워크 I/O)이 병목이므로 DB 커넥션 생성 비용(~1ms)은 무시할 수 있는 수준이다. + +**방안 B. Celery Signal(`task_prerun`) — 전역 훅** + +```python +@signals.task_prerun.connect +def close_db_connections(**kwargs): + close_old_connections() ``` -opentelemetry-instrument가 ssl을 먼저 import - → gevent monkey-patching이 불완전하게 적용 - → greenlet마다 다른 thread ID 부여 - → autoretry 시 다른 greenlet이 태스크를 받음 - → 이전 greenlet의 DB 커넥션에 접근 - → Django validate_thread_sharing() 실패 - 💥 DatabaseWrapper thread-sharing error -``` -### 해결: `db.close_old_connections()` +태스크 코드를 수정하지 않아도 되지만, 모든 태스크에 전역으로 적용된다. 문제는 OCR Worker처럼 이 이슈가 없는 워커에도 불필요하게 커넥션을 닫게 된다는 점이다. 그리고 Signal은 암묵적으로 동작하기 때문에 나중에 "왜 커넥션이 자꾸 끊기지?"라는 디버깅 지옥에 빠질 수 있다. +- *Trade-off*: 코드 수정 없음 vs. 전역 부작용 + 디버깅 난이도 + +**방안 C. `CONN_MAX_AGE = 0` 설정** + +Django의 표준적인 커넥션 수명 관리. 하지만 이 설정은 **요청-응답 사이클이 끝날 때** 커넥션을 닫는 방식이다. Celery 태스크에는 "요청"이라는 개념이 없으므로 아예 트리거되지 않는다. +- *Trade-off*: 해당 없음 — 동작하지 않음 + +**방안 D. `inc_thread_sharing()` — 스레드 공유 허용** -검토한 방안들: +Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 에러는 사라지지만, 여러 greenlet이 같은 커넥션을 동시에 사용할 수 있게 되어 race condition 위험이 생긴다. Django 공식 문서도 이 방식을 권장하지 않는다. +- *Trade-off*: 에러 해소 vs. 데이터 정합성 위험 -| 방안 | 판단 | -|------|------| -| **`db.close_old_connections()` 호출** | ✅ 채택 — 간단하고 비침습적 | -| Celery Signal(`task_prerun`) 사용 | ⚠️ 대안 — 태스크 코드 수정 없이 전역 적용 가능하나 디버깅 어려움 | -| `CONN_MAX_AGE = 0` 설정 | ❌ — Celery에는 "요청" 개념이 없어 작동하지 않음 | -| `inc_thread_sharing()` | ❌ — race condition 위험, Django가 비권장 | -| gevent → prefork 전환 | ❌ — I/O 집약 태스크에서 비효율적 | -| OTel 래퍼 제거 후 코드 내 monkey-patch | ⚠️ 장기 검토 — late patching 근본 해결이나 자동 계측 포기 | +**방안 E. gevent → prefork 전환** + +근본적으로 gevent를 쓰지 않으면 문제 자체가 없다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있고, prefork의 프로세스 기반 모델은 동시 100개 처리에 메모리 비효율적이다. +- *Trade-off*: 문제 근본 해결 vs. I/O 집약 워크로드에 부적합 + +**방안 F. OTel 래퍼 제거 후 코드 내 monkey-patch** + +Late patching의 근본 원인을 해결할 수 있다. 하지만 `opentelemetry-instrument`가 제공하는 자동 계측(auto-instrumentation)을 포기해야 한다. 직접 계측 코드를 작성해야 하며, 유지보수 부담이 생긴다. +- *Trade-off*: late patching 해소 vs. OTel 자동 계측 포기 + +**결론: 방안 A를 채택했다.** + +```mermaid +quadrantChart + title 해결 방안 Trade-off 비교 + x-axis 낮은 침습성 --> 높은 침습성 + y-axis 낮은 안정성 --> 높은 안정성 + A. close_old_connections: [0.25, 0.78] + B. Celery Signal: [0.2, 0.6] + D. inc_thread_sharing: [0.35, 0.2] + E. prefork 전환: [0.85, 0.9] + F. OTel 래퍼 제거: [0.75, 0.75] +``` -수정은 단순하다. 태스크 시작 시 stale 커넥션을 정리하면 된다: +가장 실용적인 선택이다. 코드 변경이 한 줄이고, 영향 범위가 해당 태스크로 한정되며, 커넥션 재생성 비용은 FCM 네트워크 I/O 대비 무시할 수 있다. "왜 이 코드가 있는지"도 주석 한 줄이면 충분하다. **수정 전:** ```python @@ -168,7 +231,7 @@ from django import db @shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) def send_notification(self, detection_id: int): - db.close_old_connections() # stale 커넥션 정리 → 새 커넥션으로 시작 + db.close_old_connections() # gevent greenlet 간 stale 커넥션 정리 from apps.detections.models import Detection # ... @@ -177,26 +240,26 @@ def send_notification(self, detection_id: int): `close_old_connections()`는 모든 DB alias를 순회하며 사용 불가능하거나 수명이 초과된 커넥션을 닫는다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 일치하게 된다. - +> **📸 캡처 4**: 수정 코드 diff +> - `git diff -- tasks/notification_tasks.py` --- ## Result — Before & After - +> **📸 캡처 5**: [Before] Jaeger 에러 트레이스 목록 +> - `http://34.47.70.132:16686` → Service: `speedcam-alert` +> - `send_notification` 에러(빨간색) span들이 보이는 목록 - +> **📸 캡처 6**: [Before] Grafana Logs Explorer +> - `http://34.47.70.132:3000` → Logs Explorer +> - Container: `speedcam-alert`, Search: `DatabaseWrapper` - +> **📸 캡처 7**: [After] Jaeger 정상 트레이스 +> - 수정 배포 후 `send_notification` 에러 없는 정상 span들 - +> **📸 캡처 8**: [After] Grafana Logs Explorer +> - 수정 후 `DatabaseWrapper` 에러 로그 없음 확인 | | Before | After | |---|---|---| From c33398697e723dd364f8a3d8546db00e7d1b845b Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:20:03 +0900 Subject: [PATCH 05/11] docs: rewrite solution section as narrative with natural trade-off flow Replace structured method-by-method format with storytelling approach where trade-offs emerge naturally through the elimination process. --- docs/GEVENT_DB_THREAD_SAFETY.md | 44 ++++++--------------------------- 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index c6bed9c..0c9e4a0 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -160,47 +160,19 @@ flowchart TB style ERR fill:#ff6b6b,color:#fff ``` -### 해결: Trade-off를 따져보다 +### 해결: 어떻게 고칠 것인가 -원인을 파악했으니 해결 방안을 검토했다. 핵심은 **"어떤 비용을 감수할 것인가"**였다. +원인은 찾았다. 이제 고쳐야 하는데, 가장 먼저 떠오른 건 아예 gevent를 걷어내는 것이었다. prefork로 바꾸면 문제 자체가 사라진다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있다. prefork로 동시 100개를 처리하려면 프로세스 100개가 필요하고, 그건 메모리 낭비다. gevent를 쓰는 이유가 있었다. -**방안 A. `db.close_old_connections()` — 태스크 시작 시 호출** +그러면 Django 쪽에서 검증을 꺼버릴까? `inc_thread_sharing()`이라는 escape hatch가 있다. 커넥션의 스레드 공유를 허용하는 플래그다. 에러는 당장 사라지겠지만, 여러 greenlet이 같은 커넥션을 동시에 사용할 수 있게 된다. race condition으로 데이터가 꼬일 위험을 안고 가는 셈이다. Django 공식 문서도 이 방식을 권장하지 않는다. -매 태스크가 실행될 때마다 모든 DB alias의 stale 커넥션을 닫는다. 다음 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성된다. -- *Trade-off*: 매번 커넥션을 새로 맺는 오버헤드가 발생한다. 하지만 Alert Worker의 태스크는 FCM 전송(네트워크 I/O)이 병목이므로 DB 커넥션 생성 비용(~1ms)은 무시할 수 있는 수준이다. +`CONN_MAX_AGE = 0`도 잠깐 고려했는데, 이건 Django의 요청-응답 사이클이 끝날 때 커넥션을 닫는 방식이다. Celery 태스크에는 "요청"이라는 개념 자체가 없으므로 트리거되지 않는다. -**방안 B. Celery Signal(`task_prerun`) — 전역 훅** +Late patching을 근본적으로 해결하려면 `opentelemetry-instrument` 래퍼를 제거하고 코드 내에서 `monkey.patch_all()`을 직접 먼저 호출하는 방법도 있다. 하지만 그러면 OTel 자동 계측을 포기해야 하고, 직접 계측 코드를 유지보수해야 한다. 모니터링 인프라를 한창 구축하는 시점에 자동 계측을 포기하는 건 배보다 배꼽이 더 크다. -```python -@signals.task_prerun.connect -def close_db_connections(**kwargs): - close_old_connections() -``` - -태스크 코드를 수정하지 않아도 되지만, 모든 태스크에 전역으로 적용된다. 문제는 OCR Worker처럼 이 이슈가 없는 워커에도 불필요하게 커넥션을 닫게 된다는 점이다. 그리고 Signal은 암묵적으로 동작하기 때문에 나중에 "왜 커넥션이 자꾸 끊기지?"라는 디버깅 지옥에 빠질 수 있다. -- *Trade-off*: 코드 수정 없음 vs. 전역 부작용 + 디버깅 난이도 - -**방안 C. `CONN_MAX_AGE = 0` 설정** - -Django의 표준적인 커넥션 수명 관리. 하지만 이 설정은 **요청-응답 사이클이 끝날 때** 커넥션을 닫는 방식이다. Celery 태스크에는 "요청"이라는 개념이 없으므로 아예 트리거되지 않는다. -- *Trade-off*: 해당 없음 — 동작하지 않음 - -**방안 D. `inc_thread_sharing()` — 스레드 공유 허용** +Celery Signal(`task_prerun`)로 전역 훅을 거는 것도 깔끔해 보였다. 태스크 코드를 안 건드려도 되니까. 하지만 이건 OCR Worker처럼 이 이슈가 없는 워커에도 매번 커넥션을 닫게 된다. 그리고 Signal은 암묵적으로 동작하기 때문에 나중에 "왜 커넥션이 자꾸 끊기지?"라는 디버깅 지옥에 빠질 수 있다. -Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 에러는 사라지지만, 여러 greenlet이 같은 커넥션을 동시에 사용할 수 있게 되어 race condition 위험이 생긴다. Django 공식 문서도 이 방식을 권장하지 않는다. -- *Trade-off*: 에러 해소 vs. 데이터 정합성 위험 - -**방안 E. gevent → prefork 전환** - -근본적으로 gevent를 쓰지 않으면 문제 자체가 없다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있고, prefork의 프로세스 기반 모델은 동시 100개 처리에 메모리 비효율적이다. -- *Trade-off*: 문제 근본 해결 vs. I/O 집약 워크로드에 부적합 - -**방안 F. OTel 래퍼 제거 후 코드 내 monkey-patch** - -Late patching의 근본 원인을 해결할 수 있다. 하지만 `opentelemetry-instrument`가 제공하는 자동 계측(auto-instrumentation)을 포기해야 한다. 직접 계측 코드를 작성해야 하며, 유지보수 부담이 생긴다. -- *Trade-off*: late patching 해소 vs. OTel 자동 계측 포기 - -**결론: 방안 A를 채택했다.** +결국 가장 단순한 방법으로 돌아왔다. **태스크 시작 시 `db.close_old_connections()`를 호출**하는 것이다. 매번 커넥션을 새로 맺는 오버헤드가 있지만, Alert Worker의 병목은 FCM 네트워크 I/O다. DB 커넥션 생성 비용(~1ms)은 그에 비하면 무시할 수 있다. 코드 변경은 한 줄이고, 영향 범위는 해당 태스크로 한정되며, "왜 이 코드가 있는지"도 주석 한 줄이면 충분하다. ```mermaid quadrantChart @@ -214,8 +186,6 @@ quadrantChart F. OTel 래퍼 제거: [0.75, 0.75] ``` -가장 실용적인 선택이다. 코드 변경이 한 줄이고, 영향 범위가 해당 태스크로 한정되며, 커넥션 재생성 비용은 FCM 네트워크 I/O 대비 무시할 수 있다. "왜 이 코드가 있는지"도 주석 한 줄이면 충분하다. - **수정 전:** ```python @shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) From 5c057675e0f9f795831d43ac2198dad5ab457c27 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:25:36 +0900 Subject: [PATCH 06/11] docs: remove inapplicable options and re-number trade-off comparison A-D --- docs/GEVENT_DB_THREAD_SAFETY.md | 37 ++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 0c9e4a0..a2c247b 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -160,19 +160,37 @@ flowchart TB style ERR fill:#ff6b6b,color:#fff ``` -### 해결: 어떻게 고칠 것인가 +### 해결 방안 비교 -원인은 찾았다. 이제 고쳐야 하는데, 가장 먼저 떠오른 건 아예 gevent를 걷어내는 것이었다. prefork로 바꾸면 문제 자체가 사라진다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있다. prefork로 동시 100개를 처리하려면 프로세스 100개가 필요하고, 그건 메모리 낭비다. gevent를 쓰는 이유가 있었다. +원인을 파악했으니 해결 방안을 검토했다. 핵심은 **"어떤 비용을 감수할 것인가"**였다. -그러면 Django 쪽에서 검증을 꺼버릴까? `inc_thread_sharing()`이라는 escape hatch가 있다. 커넥션의 스레드 공유를 허용하는 플래그다. 에러는 당장 사라지겠지만, 여러 greenlet이 같은 커넥션을 동시에 사용할 수 있게 된다. race condition으로 데이터가 꼬일 위험을 안고 가는 셈이다. Django 공식 문서도 이 방식을 권장하지 않는다. +**방안 A. `db.close_old_connections()` — 태스크 시작 시 호출** -`CONN_MAX_AGE = 0`도 잠깐 고려했는데, 이건 Django의 요청-응답 사이클이 끝날 때 커넥션을 닫는 방식이다. Celery 태스크에는 "요청"이라는 개념 자체가 없으므로 트리거되지 않는다. +매 태스크가 실행될 때마다 모든 DB alias의 stale 커넥션을 닫는다. 다음 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성된다. +- *Trade-off*: 매번 커넥션을 새로 맺는 오버헤드가 발생한다. 하지만 Alert Worker의 태스크는 FCM 전송(네트워크 I/O)이 병목이므로 DB 커넥션 생성 비용(~1ms)은 무시할 수 있는 수준이다. -Late patching을 근본적으로 해결하려면 `opentelemetry-instrument` 래퍼를 제거하고 코드 내에서 `monkey.patch_all()`을 직접 먼저 호출하는 방법도 있다. 하지만 그러면 OTel 자동 계측을 포기해야 하고, 직접 계측 코드를 유지보수해야 한다. 모니터링 인프라를 한창 구축하는 시점에 자동 계측을 포기하는 건 배보다 배꼽이 더 크다. +**방안 B. Celery Signal(`task_prerun`) — 전역 훅** -Celery Signal(`task_prerun`)로 전역 훅을 거는 것도 깔끔해 보였다. 태스크 코드를 안 건드려도 되니까. 하지만 이건 OCR Worker처럼 이 이슈가 없는 워커에도 매번 커넥션을 닫게 된다. 그리고 Signal은 암묵적으로 동작하기 때문에 나중에 "왜 커넥션이 자꾸 끊기지?"라는 디버깅 지옥에 빠질 수 있다. +```python +@signals.task_prerun.connect +def close_db_connections(**kwargs): + close_old_connections() +``` + +태스크 코드를 수정하지 않아도 되지만, 모든 태스크에 전역으로 적용된다. 문제는 OCR Worker처럼 이 이슈가 없는 워커에도 불필요하게 커넥션을 닫게 된다는 점이다. 그리고 Signal은 암묵적으로 동작하기 때문에 나중에 "왜 커넥션이 자꾸 끊기지?"라는 디버깅 지옥에 빠질 수 있다. +- *Trade-off*: 코드 수정 없음 vs. 전역 부작용 + 디버깅 난이도 + +**방안 C. `inc_thread_sharing()` — 스레드 공유 허용** + +Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 에러는 사라지지만, 여러 greenlet이 같은 커넥션을 동시에 사용할 수 있게 되어 race condition 위험이 생긴다. Django 공식 문서도 이 방식을 권장하지 않는다. +- *Trade-off*: 에러 해소 vs. 데이터 정합성 위험 + +**방안 D. gevent → prefork 전환** + +근본적으로 gevent를 쓰지 않으면 문제 자체가 없다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있고, prefork의 프로세스 기반 모델은 동시 100개 처리에 메모리 비효율적이다. +- *Trade-off*: 문제 근본 해결 vs. I/O 집약 워크로드에 부적합 -결국 가장 단순한 방법으로 돌아왔다. **태스크 시작 시 `db.close_old_connections()`를 호출**하는 것이다. 매번 커넥션을 새로 맺는 오버헤드가 있지만, Alert Worker의 병목은 FCM 네트워크 I/O다. DB 커넥션 생성 비용(~1ms)은 그에 비하면 무시할 수 있다. 코드 변경은 한 줄이고, 영향 범위는 해당 태스크로 한정되며, "왜 이 코드가 있는지"도 주석 한 줄이면 충분하다. +**결론: 방안 A를 채택했다.** 코드 변경이 한 줄이고, 영향 범위가 해당 태스크로 한정되며, 커넥션 재생성 비용은 FCM 네트워크 I/O 대비 무시할 수 있다. ```mermaid quadrantChart @@ -181,9 +199,8 @@ quadrantChart y-axis 낮은 안정성 --> 높은 안정성 A. close_old_connections: [0.25, 0.78] B. Celery Signal: [0.2, 0.6] - D. inc_thread_sharing: [0.35, 0.2] - E. prefork 전환: [0.85, 0.9] - F. OTel 래퍼 제거: [0.75, 0.75] + C. inc_thread_sharing: [0.35, 0.2] + D. prefork 전환: [0.85, 0.9] ``` **수정 전:** From 39af995e726d05a82049f6d4bcbbecab6650c447 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:29:09 +0900 Subject: [PATCH 07/11] docs: add local vs deployment context to explain why error only appears in prod Local runs celery directly (proper monkey-patch order), deployment uses opentelemetry-instrument wrapper which imports ssl before gevent patches. --- docs/GEVENT_DB_THREAD_SAFETY.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index a2c247b..9d1caeb 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -31,7 +31,7 @@ and this is thread id 35804459008. > - `http://34.47.70.132:16686` → Service: `speedcam-alert`, Operation: `send_notification` > - 에러 span 클릭 → Logs 탭의 에러 메시지 전문 -같은 프로젝트의 OCR 워커(`--pool=prefork --concurrency=4`)는 멀쩡했다. Alert 워커만 터지고 있었다. +혼란스러웠던 건, 로컬 개발 환경에서는 이 에러가 한 번도 발생하지 않았다는 점이다. 같은 코드, 같은 gevent pool인데 로컬에서는 멀쩡하고 GCP에 배포하니까 터졌다. 그리고 같은 프로젝트의 OCR 워커(`--pool=prefork --concurrency=4`)도 멀쩡했다. Alert 워커만 터지고 있었다. ```mermaid graph LR @@ -139,7 +139,9 @@ flowchart LR style E fill:#ff6b6b,color:#fff ``` -gevent 공식 문서는 monkey-patching을 "가능한 한 빨리, 다른 import보다 먼저" 하라고 권고한다. 하지만 `opentelemetry-instrument` 래퍼가 먼저 실행되면서 패치 순서가 꼬인다. 이로 인해 threading 관련 패치가 불완전해지고, greenlet 간 DB 커넥션 격리가 더 불안정해진다. +gevent 공식 문서는 monkey-patching을 "가능한 한 빨리, 다른 import보다 먼저" 하라고 권고한다. 하지만 배포 환경에서는 `opentelemetry-instrument` 래퍼가 먼저 실행되면서 패치 순서가 꼬인다. 이로 인해 `threading.local()`이 greenlet-local로 완전히 패치되지 못하고, greenlet 간 DB 커넥션 격리가 무너진다. + +**로컬에서 발생하지 않았던 이유가 여기에 있었다.** 로컬에서는 `celery -A config worker --pool=gevent`를 직접 실행한다. OTel 래퍼 없이 Celery가 직접 시작되므로 `monkey.patch_all()`이 충분히 일찍 호출되고, `threading.local()`이 정상적으로 greenlet-local로 패치된다. 각 greenlet이 자기만의 DB connections를 갖게 되어 스레드 ID 불일치가 일어나지 않는다. 배포 환경에서만 `opentelemetry-instrument`가 끼어들면서 패치 순서가 뒤집힌 것이다. ### 원인 요약 From cdaeaf74e9d98ed3a29925d373f86f5a939b2590 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:32:46 +0900 Subject: [PATCH 08/11] fix: reset DB connection thread_ident before cleanup in gevent tasks OTel late patching prevents threading.local from being greenlet-local, causing stale connections to be shared across greenlets. Transfer connection ownership to current greenlet before close_old_connections() so both task code and Celery's post-task cleanup pass validation. --- tasks/notification_tasks.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tasks/notification_tasks.py b/tasks/notification_tasks.py index 770a3c8..b2afea1 100644 --- a/tasks/notification_tasks.py +++ b/tasks/notification_tasks.py @@ -3,7 +3,10 @@ import logging import os +import _thread + from celery import shared_task +from django import db from django.utils import timezone logger = logging.getLogger(__name__) @@ -27,6 +30,14 @@ def send_notification(self, detection_id: int): - 매칭된 차량 개별 푸시 (차량 있는 경우) - MSA: 각 서비스별 DB에서 조회 """ + # OTel late patching으로 threading.local이 greenlet-local로 패치되지 않아 + # 다른 greenlet이 만든 커넥션이 공유됨. 현재 greenlet ID로 소유권을 이전한 뒤 + # stale 커넥션을 정리해야 close() 시 validate_thread_sharing()을 통과한다. + current_ident = _thread.get_ident() + for conn in db.connections.all(): + conn._thread_ident = current_ident + db.close_old_connections() + from apps.detections.models import Detection from apps.notifications.models import Notification from apps.vehicles.models import Vehicle From 6735042d894771c829f587b31b8d6fb87766b865 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 09:41:48 +0900 Subject: [PATCH 09/11] docs: update solution to reflect actual working fix (thread_ident reset) close_old_connections() alone fails because close() also validates thread sharing. Must reset _thread_ident to current greenlet first. --- docs/GEVENT_DB_THREAD_SAFETY.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 9d1caeb..5912243 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -166,9 +166,9 @@ flowchart TB 원인을 파악했으니 해결 방안을 검토했다. 핵심은 **"어떤 비용을 감수할 것인가"**였다. -**방안 A. `db.close_old_connections()` — 태스크 시작 시 호출** +**방안 A. 커넥션 소유권 이전 + `db.close_old_connections()`** -매 태스크가 실행될 때마다 모든 DB alias의 stale 커넥션을 닫는다. 다음 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성된다. +태스크 시작 시 모든 DB 커넥션의 `_thread_ident`를 현재 greenlet ID로 갱신한 뒤, stale 커넥션을 닫는다. 소유권을 먼저 이전해야 `close()` 내부의 `validate_thread_sharing()` 검증을 통과할 수 있다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성된다. - *Trade-off*: 매번 커넥션을 새로 맺는 오버헤드가 발생한다. 하지만 Alert Worker의 태스크는 FCM 전송(네트워크 I/O)이 병목이므로 DB 커넥션 생성 비용(~1ms)은 무시할 수 있는 수준이다. **방안 B. Celery Signal(`task_prerun`) — 전역 훅** @@ -216,18 +216,23 @@ def send_notification(self, detection_id: int): **수정 후:** ```python +import _thread from django import db @shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) def send_notification(self, detection_id: int): - db.close_old_connections() # gevent greenlet 간 stale 커넥션 정리 + # 현재 greenlet으로 커넥션 소유권 이전 → stale 커넥션 정리 + current_ident = _thread.get_ident() + for conn in db.connections.all(): + conn._thread_ident = current_ident + db.close_old_connections() from apps.detections.models import Detection # ... detection = Detection.objects.using("detections_db").get(...) # ✅ ``` -`close_old_connections()`는 모든 DB alias를 순회하며 사용 불가능하거나 수명이 초과된 커넥션을 닫는다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 일치하게 된다. +단순히 `close_old_connections()`만 호출하면 `close()` 내부에서도 `validate_thread_sharing()`이 실행되어 같은 에러가 발생한다. 커넥션의 `_thread_ident`를 현재 greenlet ID로 먼저 갱신해야 정리가 가능하다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 자연스럽게 일치한다. > **📸 캡처 4**: 수정 코드 diff > - `git diff -- tasks/notification_tasks.py` From b57133e8151fd27194be147b4cf34a507934626e Mon Sep 17 00:00:00 2001 From: sanghun Date: Wed, 18 Feb 2026 22:22:00 +0900 Subject: [PATCH 10/11] fix: replace _thread_ident workaround with OTel gevent env var (root cause fix) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 기존 Django private API(_thread_ident) 의존 워크어라운드를 제거하고, OTel 환경변수(OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH)로 monkey-patch 순서를 교정하여 근본 원인을 해결한다. --- backend.env.example | 3 ++ docs/GEVENT_DB_THREAD_SAFETY.md | 87 +++++++++++++++++---------------- scripts/start_alert_worker.sh | 7 +++ 3 files changed, 54 insertions(+), 43 deletions(-) diff --git a/backend.env.example b/backend.env.example index f5cee6a..aa8f0bb 100644 --- a/backend.env.example +++ b/backend.env.example @@ -94,3 +94,6 @@ OTEL_TRACES_SAMPLER=parentbased_always_on OTEL_PYTHON_LOG_CORRELATION=true # GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지) OTEL_PYTHON_REQUESTS_EXCLUDED_URLS=metadata.google.internal +# gevent pool 사용 시 monkey-patching을 OTel 초기화 전에 수행 (alert worker) +# See: docs/GEVENT_DB_THREAD_SAFETY.md +OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 5912243..60803ef 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -16,7 +16,8 @@ opentelemetry-instrument \ --pool=gevent \ --concurrency=${ALERT_CONCURRENCY:-100} \ --queues=fcm_queue \ - --hostname=alert@%h + --hostname=alert@%h \ + --loglevel=${LOG_LEVEL:-info} ``` 문제는 Jaeger 트레이싱을 확인하면서 드러났다. `send_notification` 태스크에서 이런 에러가 반복되고 있었다: @@ -166,25 +167,19 @@ flowchart TB 원인을 파악했으니 해결 방안을 검토했다. 핵심은 **"어떤 비용을 감수할 것인가"**였다. -**방안 A. 커넥션 소유권 이전 + `db.close_old_connections()`** +**방안 A. OTel 환경변수로 monkey-patch 순서 교정 (근본 해결)** -태스크 시작 시 모든 DB 커넥션의 `_thread_ident`를 현재 greenlet ID로 갱신한 뒤, stale 커넥션을 닫는다. 소유권을 먼저 이전해야 `close()` 내부의 `validate_thread_sharing()` 검증을 통과할 수 있다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성된다. -- *Trade-off*: 매번 커넥션을 새로 맺는 오버헤드가 발생한다. 하지만 Alert Worker의 태스크는 FCM 전송(네트워크 I/O)이 병목이므로 DB 커넥션 생성 비용(~1ms)은 무시할 수 있는 수준이다. +OTel Python 1.37.0+에서 제공하는 `OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all` 환경변수를 설정하면, `opentelemetry-instrument`가 자체 초기화 전에 `gevent.monkey.patch_all()`을 호출한다. 이렇게 하면 `threading.local()`이 정상적으로 greenlet-local로 패치되어, 각 greenlet이 자기만의 `db.connections`를 갖게 된다. Late patching 자체가 발생하지 않으므로 문제의 근본 원인이 제거된다. +- *Trade-off*: "experimental" 라벨이 붙어 있지만, 내부적으로는 동일한 `monkey.patch_all()` 호출 — 타이밍만 다를 뿐이다. OTel 릴리스 노트 모니터링 필요. -**방안 B. Celery Signal(`task_prerun`) — 전역 훅** +**방안 B. 커넥션 소유권 이전 + `db.close_old_connections()` (런타임 워크어라운드)** -```python -@signals.task_prerun.connect -def close_db_connections(**kwargs): - close_old_connections() -``` - -태스크 코드를 수정하지 않아도 되지만, 모든 태스크에 전역으로 적용된다. 문제는 OCR Worker처럼 이 이슈가 없는 워커에도 불필요하게 커넥션을 닫게 된다는 점이다. 그리고 Signal은 암묵적으로 동작하기 때문에 나중에 "왜 커넥션이 자꾸 끊기지?"라는 디버깅 지옥에 빠질 수 있다. -- *Trade-off*: 코드 수정 없음 vs. 전역 부작용 + 디버깅 난이도 +태스크 시작 시 모든 DB 커넥션의 `_thread_ident`를 현재 greenlet ID로 갱신한 뒤 stale 커넥션을 닫는 방식. 동작하지만 Django의 private API(`_thread_ident`)에 의존하며, Django 업그레이드 시 깨질 수 있다. 또한 100개 greenlet이 cooperative scheduling으로 동작하므로 `close()` 시 socket I/O에서 context switch가 발생할 수 있는 잠재적 race condition이 존재한다. +- *Trade-off*: 즉시 적용 가능 vs. private API 의존 + Django 버전 종속 **방안 C. `inc_thread_sharing()` — 스레드 공유 허용** -Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 에러는 사라지지만, 여러 greenlet이 같은 커넥션을 동시에 사용할 수 있게 되어 race condition 위험이 생긴다. Django 공식 문서도 이 방식을 권장하지 않는다. +Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 에러는 사라지지만, 여러 greenlet이 같은 물리적 DB 커넥션을 동시에 사용할 수 있게 되어 프로토콜 레벨 MySQL 오류와 데이터 정합성 위험이 생긴다. - *Trade-off*: 에러 해소 vs. 데이터 정합성 위험 **방안 D. gevent → prefork 전환** @@ -192,50 +187,54 @@ Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 근본적으로 gevent를 쓰지 않으면 문제 자체가 없다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있고, prefork의 프로세스 기반 모델은 동시 100개 처리에 메모리 비효율적이다. - *Trade-off*: 문제 근본 해결 vs. I/O 집약 워크로드에 부적합 -**결론: 방안 A를 채택했다.** 코드 변경이 한 줄이고, 영향 범위가 해당 태스크로 한정되며, 커넥션 재생성 비용은 FCM 네트워크 I/O 대비 무시할 수 있다. +**결론: 방안 A를 채택했다.** 환경변수 한 줄로 근본 원인(late patching)을 제거하며, 애플리케이션 코드 수정이 불필요하다. `MonkeyPatchWarning` 경고도 함께 사라진다. ```mermaid quadrantChart title 해결 방안 Trade-off 비교 x-axis 낮은 침습성 --> 높은 침습성 y-axis 낮은 안정성 --> 높은 안정성 - A. close_old_connections: [0.25, 0.78] - B. Celery Signal: [0.2, 0.6] + A. OTel gevent patch: [0.1, 0.9] + B. _thread_ident 워크어라운드: [0.3, 0.65] C. inc_thread_sharing: [0.35, 0.2] D. prefork 전환: [0.85, 0.9] ``` **수정 전:** -```python -@shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) -def send_notification(self, detection_id: int): - from apps.detections.models import Detection - # ... - detection = Detection.objects.using("detections_db").get(...) # 💥 +```bash +# scripts/start_alert_worker.sh +opentelemetry-instrument \ + --service_name speedcam-alert \ + celery -A config worker \ + --pool=gevent \ + --concurrency=${ALERT_CONCURRENCY:-100} \ + --queues=fcm_queue \ + --hostname=alert@%h \ + --loglevel=${LOG_LEVEL:-info} ``` **수정 후:** -```python -import _thread -from django import db - -@shared_task(bind=True, max_retries=3, autoretry_for=(Exception,), ...) -def send_notification(self, detection_id: int): - # 현재 greenlet으로 커넥션 소유권 이전 → stale 커넥션 정리 - current_ident = _thread.get_ident() - for conn in db.connections.all(): - conn._thread_ident = current_ident - db.close_old_connections() - - from apps.detections.models import Detection - # ... - detection = Detection.objects.using("detections_db").get(...) # ✅ +```bash +# scripts/start_alert_worker.sh + +# gevent monkey-patching을 OTel 초기화 전에 수행하도록 설정 +export OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all + +opentelemetry-instrument \ + --service_name speedcam-alert \ + celery -A config worker \ + --pool=gevent \ + --concurrency=${ALERT_CONCURRENCY:-100} \ + --queues=fcm_queue \ + --hostname=alert@%h \ + --loglevel=${LOG_LEVEL:-info} ``` -단순히 `close_old_connections()`만 호출하면 `close()` 내부에서도 `validate_thread_sharing()`이 실행되어 같은 에러가 발생한다. 커넥션의 `_thread_ident`를 현재 greenlet ID로 먼저 갱신해야 정리가 가능하다. 이후 ORM 호출 시 현재 greenlet에서 새 커넥션이 생성되므로 `_thread_ident`가 자연스럽게 일치한다. +이 환경변수가 `opentelemetry-instrument`에게 "초기화 전에 `gevent.monkey.patch_all()`을 먼저 실행하라"고 지시한다. 패치 순서가 바로잡히면 `threading.local()`이 정상적으로 greenlet-local이 되고, Django의 `validate_thread_sharing()` 검증을 greenlet 간에도 자연스럽게 통과한다. -> **📸 캡처 4**: 수정 코드 diff -> - `git diff -- tasks/notification_tasks.py` +> **📸 캡처 4**: 수정 전후 비교 +> - `git diff -- scripts/start_alert_worker.sh` +> - `docker logs speedcam-alert 2>&1 | head -20` — `MonkeyPatchWarning` 사라진 것 확인 --- @@ -258,9 +257,10 @@ def send_notification(self, detection_id: int): | | Before | After | |---|---|---| | `send_notification` 에러율 | DatabaseWrapper 에러 반복 | 에러 제거 | -| DB 커넥션 패턴 | stale 커넥션 재사용 → 실패 | 태스크 시작 시 정리 → 새 커넥션 | +| `MonkeyPatchWarning` | ssl late patching 경고 발생 | 경고 제거 (정상 패치 순서) | +| DB 커넥션 격리 | greenlet 간 공유 (threading.local 미패치) | greenlet별 독립 (greenlet-local) | | OCR Worker 영향 | — | 없음 (prefork pool) | -| 성능 오버헤드 | — | 미미 (DB alias 4개 순회) | +| 코드 변경 | — | 환경변수 1줄 (애플리케이션 코드 변경 없음) | --- @@ -271,6 +271,7 @@ def send_notification(self, detection_id: int): - [Django Databases — Persistent connections and thread safety](https://docs.djangoproject.com/en/5.1/ref/databases/#persistent-database-connections) - [Celery — Concurrency with Gevent](https://docs.celeryq.dev/en/stable/userguide/concurrency/gevent.html) - [Gevent — Monkey Patching](http://www.gevent.org/api/gevent.monkey.html) +- [OpenTelemetry Python — Agent Configuration (gevent patch)](https://opentelemetry.io/docs/zero-code/python/configuration/) ### GitHub Issues diff --git a/scripts/start_alert_worker.sh b/scripts/start_alert_worker.sh index 29eed45..7150f2a 100644 --- a/scripts/start_alert_worker.sh +++ b/scripts/start_alert_worker.sh @@ -9,6 +9,13 @@ export DJANGO_SETTINGS_MODULE="${DJANGO_SETTINGS_MODULE:-config.settings.dev}" # GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지) export OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="${OTEL_PYTHON_REQUESTS_EXCLUDED_URLS:-metadata.google.internal}" +# gevent monkey-patching을 OTel 초기화 전에 수행하도록 설정 +# 이 설정이 없으면 OTel이 ssl/urllib3를 먼저 import하여 +# threading.local()이 greenlet-local로 패치되지 않아 +# Django DB 커넥션이 greenlet 간 공유되는 thread-safety 문제 발생 +# See: docs/GEVENT_DB_THREAD_SAFETY.md +export OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all + # Alert Event Consumer 시작 # Choreography: detections.completed 이벤트를 직접 구독하여 # Alert Service가 자율적으로 알림 발송 여부를 결정한다. From b4af831e2ebdf21ca640c61e6a85ca60c697bc6c Mon Sep 17 00:00:00 2001 From: sanghun Date: Thu, 19 Feb 2026 05:47:13 +0900 Subject: [PATCH 11/11] feat: separate Kombu event consumer and Celery gevent worker - Kombu consumer receives detections.completed events (single thread) - send_notification dispatched to Celery gevent pool via .delay() - notification_tasks.py converted to @shared_task with autoretry - fcm_queue added to celery.py for FCM task routing - start_alert_worker.sh runs 2 processes (POSIX sh compatible) - GEVENT_DB_THREAD_SAFETY.md cleaned up to focus on concurrency issue --- backend.env.example | 9 +- config/celery.py | 17 ++- core/events/consumer.py | 24 +-- docs/GEVENT_DB_THREAD_SAFETY.md | 16 +- scripts/start_alert_worker.sh | 55 +++++-- tasks/__init__.py | 3 +- tasks/notification_tasks.py | 257 +++++++++++++++++--------------- 7 files changed, 220 insertions(+), 161 deletions(-) diff --git a/backend.env.example b/backend.env.example index aa8f0bb..2896a93 100644 --- a/backend.env.example +++ b/backend.env.example @@ -89,11 +89,14 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000 OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 OTEL_EXPORTER_OTLP_PROTOCOL=grpc OTEL_RESOURCE_ATTRIBUTES=service.namespace=speedcam,deployment.environment=dev + # Valid values: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio OTEL_TRACES_SAMPLER=parentbased_always_on OTEL_PYTHON_LOG_CORRELATION=true + +# gevent monkey-patching 순서 교정 (Alert Worker: Celery gevent pool + OTel) +# docs/GEVENT_DB_THREAD_SAFETY.md 참조 +OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all + # GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지) OTEL_PYTHON_REQUESTS_EXCLUDED_URLS=metadata.google.internal -# gevent pool 사용 시 monkey-patching을 OTel 초기화 전에 수행 (alert worker) -# See: docs/GEVENT_DB_THREAD_SAFETY.md -OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all diff --git a/config/celery.py b/config/celery.py index 50d3c6b..a347b82 100644 --- a/config/celery.py +++ b/config/celery.py @@ -15,11 +15,10 @@ # Exchange 정의 ocr_exchange = Exchange("ocr_exchange", type="direct", durable=True) +fcm_exchange = Exchange("fcm_exchange", type="direct", durable=True) dlq_exchange = Exchange("dlq_exchange", type="fanout", durable=True) # Queue 정의 -# Note: fcm_queue 제거 — Alert Service는 Celery Command가 아닌 -# AMQP domain_events exchange의 detections.completed 이벤트를 직접 구독 (Choreography) app.conf.task_queues = ( Queue( "ocr_queue", @@ -31,6 +30,15 @@ "x-max-priority": 10, }, ), + Queue( + "fcm_queue", + exchange=fcm_exchange, + routing_key="fcm", + queue_arguments={ + "x-dead-letter-exchange": "dlq_exchange", + "x-message-ttl": 3600000, + }, + ), Queue( "dlq_queue", exchange=dlq_exchange, @@ -45,6 +53,11 @@ "exchange": "ocr_exchange", "routing_key": "ocr", }, + "tasks.notification_tasks.send_notification": { + "queue": "fcm_queue", + "exchange": "fcm_exchange", + "routing_key": "fcm", + }, "tasks.dlq_tasks.process_dlq_message": { "queue": "dlq_queue", "exchange": "dlq_exchange", diff --git a/core/events/consumer.py b/core/events/consumer.py index 6aa76b6..d5c4a61 100644 --- a/core/events/consumer.py +++ b/core/events/consumer.py @@ -11,7 +11,6 @@ import json import logging import os -import time from kombu import Connection, Exchange, Queue from kombu.mixins import ConsumerMixin @@ -71,29 +70,18 @@ def _on_detection_completed(self, payload): Alert Service의 자율적 판단: "OCR이 완료됐으니 알림을 보내야겠다" + + Celery gevent worker에 위임하여 비동기 병렬 처리. """ detection_id = payload["detection_id"] logger.info( f"Detection {detection_id} completed event received — " - f"processing notification" + f"dispatching to FCM worker" ) - max_retries = 3 - for attempt in range(max_retries + 1): - try: - from tasks.notification_tasks import process_notification - - process_notification(detection_id) - return - except Exception as e: - if "DoesNotExist" in type(e).__name__ and attempt < max_retries: - logger.warning( - f"Detection {detection_id} not ready, " - f"retry {attempt + 1}/{max_retries}" - ) - time.sleep(3) - else: - raise + from tasks.notification_tasks import send_notification + + send_notification.delay(detection_id) def start_event_consumer(): diff --git a/docs/GEVENT_DB_THREAD_SAFETY.md b/docs/GEVENT_DB_THREAD_SAFETY.md index 60803ef..189a75b 100644 --- a/docs/GEVENT_DB_THREAD_SAFETY.md +++ b/docs/GEVENT_DB_THREAD_SAFETY.md @@ -2,6 +2,8 @@ > Celery의 gevent pool과 Django ORM을 함께 쓸 때 마주치는 `DatabaseWrapper objects created in a thread can only be used in that same thread` 에러의 원인과 해결. +> **Note (2025-02)**: Alert Worker는 현재도 Celery gevent pool을 사용한다. 따라서 이 문서에서 분석한 OTel + gevent late patching → DB thread-safety 이슈가 **현재도 유효**하며, `OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all` 환경변수가 필수다. + --- ## Situation — 어느 날 Jaeger에서 발견한 에러 @@ -187,7 +189,7 @@ Django가 제공하는 escape hatch. 커넥션의 스레드 검증을 끈다. 근본적으로 gevent를 쓰지 않으면 문제 자체가 없다. 하지만 Alert Worker는 FCM 푸시라는 I/O 바운드 작업에 특화되어 있고, prefork의 프로세스 기반 모델은 동시 100개 처리에 메모리 비효율적이다. - *Trade-off*: 문제 근본 해결 vs. I/O 집약 워크로드에 부적합 -**결론: 방안 A를 채택했다.** 환경변수 한 줄로 근본 원인(late patching)을 제거하며, 애플리케이션 코드 수정이 불필요하다. `MonkeyPatchWarning` 경고도 함께 사라진다. +**당시 결론: 방안 A를 채택했다.** 환경변수 한 줄로 근본 원인(late patching)을 제거하며, 애플리케이션 코드 수정이 불필요하다. `MonkeyPatchWarning` 경고도 함께 사라진다. ```mermaid quadrantChart @@ -200,7 +202,7 @@ quadrantChart D. prefork 전환: [0.85, 0.9] ``` -**수정 전:** +**수정 전 (Celery gevent pool):** ```bash # scripts/start_alert_worker.sh opentelemetry-instrument \ @@ -213,7 +215,7 @@ opentelemetry-instrument \ --loglevel=${LOG_LEVEL:-info} ``` -**수정 후:** +**수정 후 (OTel 환경변수 추가):** ```bash # scripts/start_alert_worker.sh @@ -232,10 +234,6 @@ opentelemetry-instrument \ 이 환경변수가 `opentelemetry-instrument`에게 "초기화 전에 `gevent.monkey.patch_all()`을 먼저 실행하라"고 지시한다. 패치 순서가 바로잡히면 `threading.local()`이 정상적으로 greenlet-local이 되고, Django의 `validate_thread_sharing()` 검증을 greenlet 간에도 자연스럽게 통과한다. -> **📸 캡처 4**: 수정 전후 비교 -> - `git diff -- scripts/start_alert_worker.sh` -> - `docker logs speedcam-alert 2>&1 | head -20` — `MonkeyPatchWarning` 사라진 것 확인 - --- ## Result — Before & After @@ -254,12 +252,12 @@ opentelemetry-instrument \ > **📸 캡처 8**: [After] Grafana Logs Explorer > - 수정 후 `DatabaseWrapper` 에러 로그 없음 확인 -| | Before | After | +| | Before (env var 없음) | After (env var 적용) | |---|---|---| | `send_notification` 에러율 | DatabaseWrapper 에러 반복 | 에러 제거 | | `MonkeyPatchWarning` | ssl late patching 경고 발생 | 경고 제거 (정상 패치 순서) | | DB 커넥션 격리 | greenlet 간 공유 (threading.local 미패치) | greenlet별 독립 (greenlet-local) | -| OCR Worker 영향 | — | 없음 (prefork pool) | +| OCR Worker 영향 | 없음 (prefork pool) | 없음 (prefork pool) | | 코드 변경 | — | 환경변수 1줄 (애플리케이션 코드 변경 없음) | --- diff --git a/scripts/start_alert_worker.sh b/scripts/start_alert_worker.sh index 7150f2a..f303cce 100644 --- a/scripts/start_alert_worker.sh +++ b/scripts/start_alert_worker.sh @@ -1,7 +1,7 @@ -#!/bin/bash +#!/bin/sh set -e -echo "Starting Alert Worker (Domain Event Consumer)..." +echo "Starting Alert Worker (Kombu Consumer + Celery FCM Worker)..." # Django 설정 모듈 기본값 export DJANGO_SETTINGS_MODULE="${DJANGO_SETTINGS_MODULE:-config.settings.dev}" @@ -10,20 +10,53 @@ export DJANGO_SETTINGS_MODULE="${DJANGO_SETTINGS_MODULE:-config.settings.dev}" export OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="${OTEL_PYTHON_REQUESTS_EXCLUDED_URLS:-metadata.google.internal}" # gevent monkey-patching을 OTel 초기화 전에 수행하도록 설정 -# 이 설정이 없으면 OTel이 ssl/urllib3를 먼저 import하여 -# threading.local()이 greenlet-local로 패치되지 않아 -# Django DB 커넥션이 greenlet 간 공유되는 thread-safety 문제 발생 -# See: docs/GEVENT_DB_THREAD_SAFETY.md +# Celery gevent pool + OTel 조합에서 late patching으로 인한 +# Django DB thread-safety 이슈 방지 (docs/GEVENT_DB_THREAD_SAFETY.md 참조) export OTEL_PYTHON_AUTO_INSTRUMENTATION_EXPERIMENTAL_GEVENT_PATCH=patch_all -# Alert Event Consumer 시작 -# Choreography: detections.completed 이벤트를 직접 구독하여 -# Alert Service가 자율적으로 알림 발송 여부를 결정한다. +# ============================================================ +# 프로세스 1: Kombu 이벤트 소비자 (백그라운드) +# - 단일 스레드, detections.completed 이벤트 수신 +# - send_notification.delay()로 Celery에 위임 +# ============================================================ opentelemetry-instrument \ - --service_name speedcam-alert \ + --service_name speedcam-alert-consumer \ python -c " import django django.setup() from core.events.consumer import start_event_consumer start_event_consumer() -" +" & + +CONSUMER_PID=$! +echo "Kombu consumer started (PID: $CONSUMER_PID)" + +# ============================================================ +# 프로세스 2: Celery gevent worker (포그라운드) +# - gevent pool, FCM 전송 병렬 처리 +# - fcm_queue에서 send_notification 태스크 소비 +# ============================================================ +opentelemetry-instrument \ + --service_name speedcam-alert \ + celery -A config worker \ + --pool=gevent \ + --concurrency="${ALERT_CONCURRENCY:-100}" \ + --queues=fcm_queue \ + --hostname=alert@%h \ + --loglevel="${LOG_LEVEL:-info}" & + +CELERY_PID=$! +echo "Celery FCM worker started (PID: $CELERY_PID)" + +# 어느 하나라도 종료되면 나머지도 종료 (POSIX sh 호환) +trap "kill $CONSUMER_PID $CELERY_PID 2>/dev/null; exit" TERM INT + +# wait -n은 bash 전용이므로 POSIX 호환 방식으로 대체 +# 두 프로세스 중 하나라도 종료되면 감지 +while kill -0 $CONSUMER_PID 2>/dev/null && kill -0 $CELERY_PID 2>/dev/null; do + sleep 1 +done + +echo "A process exited, shutting down..." +kill $CONSUMER_PID $CELERY_PID 2>/dev/null || true +wait diff --git a/tasks/__init__.py b/tasks/__init__.py index d0f85f7..c3d404a 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -1,5 +1,6 @@ # Celery Tasks Package from .dlq_tasks import process_dlq_message +from .notification_tasks import send_notification from .ocr_tasks import process_ocr -__all__ = ["process_ocr", "process_dlq_message"] +__all__ = ["process_ocr", "send_notification", "process_dlq_message"] diff --git a/tasks/notification_tasks.py b/tasks/notification_tasks.py index 3784751..770a3c8 100644 --- a/tasks/notification_tasks.py +++ b/tasks/notification_tasks.py @@ -1,13 +1,9 @@ -""" -Notification Processing Logic (Alert Service) - -EDA/Choreography: detections.completed 이벤트에 반응하여 -Alert Service가 자율적으로 알림 발송 여부를 결정한다. -""" +"""Notification Worker Tasks (I/O 집약적)""" import logging import os +from celery import shared_task from django.utils import timezone logger = logging.getLogger(__name__) @@ -16,142 +12,169 @@ FCM_MOCK = os.getenv("FCM_MOCK", "false").lower() == "true" -def process_notification(detection_id: int): +@shared_task( + bind=True, + max_retries=3, + autoretry_for=(Exception,), + retry_backoff=True, + retry_backoff_max=600, + acks_late=True, +) +def send_notification(self, detection_id: int): """ - FCM 푸시 알림 전송 - + FCM 푸시 알림 전송 Task - 대시보드 토픽 브로드캐스트 (모든 감지에 대해) - 매칭된 차량 개별 푸시 (차량 있는 경우) - MSA: 각 서비스별 DB에서 조회 """ - from django import db - - db.close_old_connections() - from apps.detections.models import Detection from apps.notifications.models import Notification from apps.vehicles.models import Vehicle - # 1. Detection 조회 (detections_db) - detection = Detection.objects.using("detections_db").get( - id=detection_id, status="completed" - ) - - # 2. 알림 메시지 생성 - title = f"과속 위반 감지: {detection.ocr_result or '미확인'}" - body = ( - f"위치: {detection.location or '알 수 없음'}\n" - f"속도: {detection.detected_speed}km/h " - f"(제한: {detection.speed_limit}km/h)" - ) - data = { - "detection_id": str(detection_id), - "plate_number": detection.ocr_result or "", - "speed": str(detection.detected_speed), - "speed_limit": str(detection.speed_limit), - "location": detection.location or "", - "detected_at": detection.detected_at.isoformat(), - } - - # 3. 대시보드 토픽으로 항상 전송 (중복 방지) - topic_response = None - already_sent_topic = ( - Notification.objects.using("notifications_db") - .filter( - detection_id=detection_id, - fcm_token="topic:dashboard_alerts", - status="sent", + try: + # 1. Detection 조회 (detections_db) + detection = Detection.objects.using("detections_db").get( + id=detection_id, status="completed" ) - .exists() - ) - - if not already_sent_topic: - try: - if FCM_MOCK: - topic_response = f"mock-topic-{detection_id}" - else: - from core.firebase.fcm import send_topic_notification - - topic_response = send_topic_notification( - "dashboard_alerts", title, body, data - ) - logger.info( - f"Dashboard topic notification sent for detection " - f"{detection_id}: {topic_response}" - ) - except Exception as e: - logger.warning( - f"Dashboard topic notification failed for detection " - f"{detection_id}: {e}" - ) - # 4. 토픽 알림 이력 저장 (notifications_db) - Notification.objects.using("notifications_db").create( - detection_id=detection_id, - fcm_token="topic:dashboard_alerts", - title=title, - body=body, - status="sent" if topic_response else "failed", - sent_at=timezone.now() if topic_response else None, - error_message=None if topic_response else "Topic send failed", + # 2. 알림 메시지 생성 + title = f"⚠️ 과속 위반 감지: {detection.ocr_result or '미확인'}" + body = ( + f"📍 위치: {detection.location or '알 수 없음'}\n" + f"🚗 속도: {detection.detected_speed}km/h " + f"(제한: {detection.speed_limit}km/h)" ) - else: - topic_response = "already_sent" - logger.info( - f"Dashboard topic notification already sent for detection " - f"{detection_id}, skipping" + data = { + "detection_id": str(detection_id), + "plate_number": detection.ocr_result or "", + "speed": str(detection.detected_speed), + "speed_limit": str(detection.speed_limit), + "location": detection.location or "", + "detected_at": detection.detected_at.isoformat(), + } + + # 3. 대시보드 토픽으로 항상 전송 (중복 방지) + topic_response = None + already_sent_topic = ( + Notification.objects.using("notifications_db") + .filter( + detection_id=detection_id, + fcm_token="topic:dashboard_alerts", + status="sent", + ) + .exists() ) - # 5. 매칭된 차량에 개별 푸시 - vehicle = None - if detection.vehicle_id: - try: - vehicle = Vehicle.objects.using("vehicles_db").get(id=detection.vehicle_id) - except Vehicle.DoesNotExist: - logger.warning(f"Vehicle {detection.vehicle_id} not found") - - if vehicle and vehicle.fcm_token: - try: - if FCM_MOCK: - vehicle_response = f"mock-message-id-{detection_id}" - else: - from core.firebase.fcm import send_push_notification - - vehicle_response = send_push_notification( - token=vehicle.fcm_token, - title=title, - body=body, - data=data, + if not already_sent_topic: + try: + if FCM_MOCK: + topic_response = f"mock-topic-{detection_id}" + else: + from core.firebase.fcm import send_topic_notification + + topic_response = send_topic_notification( + "dashboard_alerts", title, body, data + ) + logger.info( + f"Dashboard topic notification sent for detection " + f"{detection_id}: {topic_response}" + ) + except Exception as e: + logger.warning( + f"Dashboard topic notification failed for detection " + f"{detection_id}: {e}" ) + # 4. 토픽 알림 이력 저장 (notifications_db) Notification.objects.using("notifications_db").create( detection_id=detection_id, - fcm_token=vehicle.fcm_token, + fcm_token="topic:dashboard_alerts", title=title, body=body, - status="sent", - sent_at=timezone.now(), + status="sent" if topic_response else "failed", + sent_at=timezone.now() if topic_response else None, + error_message=None if topic_response else "Topic send failed", ) + else: + topic_response = "already_sent" logger.info( - f"Vehicle notification sent for detection " - f"{detection_id}: {vehicle_response}" - ) - except Exception as e: - logger.warning( - f"Vehicle notification failed for detection {detection_id}: {e}" + f"Dashboard topic notification already sent for detection " + f"{detection_id}, skipping" ) + + # 5. 매칭된 차량에 개별 푸시 (기존 동작) + vehicle = None + if detection.vehicle_id: + try: + vehicle = Vehicle.objects.using("vehicles_db").get( + id=detection.vehicle_id + ) + except Vehicle.DoesNotExist: + logger.warning(f"Vehicle {detection.vehicle_id} not found") + + if vehicle and vehicle.fcm_token: + try: + if FCM_MOCK: + vehicle_response = f"mock-message-id-{detection_id}" + else: + from core.firebase.fcm import send_push_notification + + vehicle_response = send_push_notification( + token=vehicle.fcm_token, + title=title, + body=body, + data=data, + ) + + Notification.objects.using("notifications_db").create( + detection_id=detection_id, + fcm_token=vehicle.fcm_token, + title=title, + body=body, + status="sent", + sent_at=timezone.now(), + ) + logger.info( + f"Vehicle notification sent for detection " + f"{detection_id}: {vehicle_response}" + ) + except Exception as e: + logger.warning( + f"Vehicle notification failed for detection " f"{detection_id}: {e}" + ) + Notification.objects.using("notifications_db").create( + detection_id=detection_id, + fcm_token=vehicle.fcm_token, + title=title, + body=body, + status="failed", + error_message=str(e), + ) + + return { + "status": "sent", + "topic": bool(topic_response), + "vehicle": bool(vehicle and vehicle.fcm_token), + } + + except Detection.DoesNotExist: + logger.warning(f"Detection {detection_id} not found or not completed, retrying") + raise self.retry(countdown=3, max_retries=3) + + except Exception as exc: + try: + from apps.notifications.models import Notification + Notification.objects.using("notifications_db").create( detection_id=detection_id, - fcm_token=vehicle.fcm_token, - title=title, - body=body, status="failed", - error_message=str(e), + retry_count=self.request.retries, + error_message=str(exc), + ) + except Exception as db_err: + logger.error( + f"Failed to record notification failure for detection {detection_id}: {db_err}" ) - logger.info(f"Notification processing completed for detection {detection_id}") - return { - "status": "sent", - "topic": bool(topic_response), - "vehicle": bool(vehicle and vehicle.fcm_token), - } + logger.error(f"Notification failed for detection {detection_id}: {exc}") + raise