diff --git a/backend.env.example b/backend.env.example
index bc51cb1..f5cee6a 100644
--- a/backend.env.example
+++ b/backend.env.example
@@ -38,16 +38,17 @@ MQTT_USER=sa
MQTT_PASS=1234
# ===========================================
-# GCS (Google Cloud Storage) 설정
+# GCS (Google Cloud Storage) 인증
# ===========================================
-# OCR_MOCK=false일 때만 필요
-GOOGLE_APPLICATION_CREDENTIALS=/-path(secret)-/gcp-cloud-storage.json
-# GCS_BUCKET_NAME은 불필요 (GCS URI에서 자동 파싱)
+# GCS는 ADC (Application Default Credentials) 사용
+# GCE 인스턴스: 메타데이터 서버에서 자동 인증 (설정 불필요)
+# 로컬 개발: gcloud auth application-default login 실행 후 사용
+# OCR_MOCK=true이면 GCS 호출 없으므로 인증 불필요
# ===========================================
# Firebase 설정 (FCM Push Notification)
# ===========================================
-FIREBASE_CREDENTIALS=/-path(secret)-/firebase-service-account.json
+FIREBASE_CREDENTIALS=/app/credentials/firebase-service-account.json
# ===========================================
# Celery Worker 설정
@@ -91,3 +92,5 @@ OTEL_RESOURCE_ATTRIBUTES=service.namespace=speedcam,deployment.environment=dev
# Valid values: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio
OTEL_TRACES_SAMPLER=parentbased_always_on
OTEL_PYTHON_LOG_CORRELATION=true
+# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지)
+OTEL_PYTHON_REQUESTS_EXCLUDED_URLS=metadata.google.internal
diff --git a/config/celery.py b/config/celery.py
index 9f3b672..50d3c6b 100644
--- a/config/celery.py
+++ b/config/celery.py
@@ -15,12 +15,12 @@
# Exchange 정의
ocr_exchange = Exchange("ocr_exchange", type="direct", durable=True)
-fcm_exchange = Exchange("fcm_exchange", type="direct", durable=True)
dlq_exchange = Exchange("dlq_exchange", type="fanout", durable=True)
# Queue 정의
+# Note: fcm_queue 제거 — Alert Service는 Celery Command가 아닌
+# AMQP domain_events exchange의 detections.completed 이벤트를 직접 구독 (Choreography)
app.conf.task_queues = (
- # 새로운 Queue (PRD 구조)
Queue(
"ocr_queue",
exchange=ocr_exchange,
@@ -31,15 +31,6 @@
"x-max-priority": 10,
},
),
- Queue(
- "fcm_queue",
- exchange=fcm_exchange,
- routing_key="fcm",
- queue_arguments={
- "x-dead-letter-exchange": "dlq_exchange",
- "x-message-ttl": 3600000,
- },
- ),
Queue(
"dlq_queue",
exchange=dlq_exchange,
@@ -49,17 +40,11 @@
# Task 라우팅
app.conf.task_routes = {
- # 새로운 Tasks (PRD 구조)
"tasks.ocr_tasks.process_ocr": {
"queue": "ocr_queue",
"exchange": "ocr_exchange",
"routing_key": "ocr",
},
- "tasks.notification_tasks.send_notification": {
- "queue": "fcm_queue",
- "exchange": "fcm_exchange",
- "routing_key": "fcm",
- },
"tasks.dlq_tasks.process_dlq_message": {
"queue": "dlq_queue",
"exchange": "dlq_exchange",
diff --git a/core/events/__init__.py b/core/events/__init__.py
new file mode 100644
index 0000000..9179f08
--- /dev/null
+++ b/core/events/__init__.py
@@ -0,0 +1,5 @@
+# Domain Events Module (AMQP)
+# 백엔드 서비스 간 도메인 이벤트 발행/구독 (Choreography)
+from .publisher import publish_event
+
+__all__ = ["publish_event"]
diff --git a/core/events/consumer.py b/core/events/consumer.py
new file mode 100644
index 0000000..6aa76b6
--- /dev/null
+++ b/core/events/consumer.py
@@ -0,0 +1,106 @@
+"""
+AMQP Domain Event Consumer (Kombu)
+
+Alert Service가 도메인 이벤트를 직접 구독하여 자율적으로 처리.
+Choreography: 각 서비스는 이벤트에 독립적으로 반응한다.
+
+Main Service를 거치지 않고 Alert Service가 직접
+detections.completed 이벤트를 구독하여 알림 발송 여부를 결정한다.
+"""
+
+import json
+import logging
+import os
+import time
+
+from kombu import Connection, Exchange, Queue
+from kombu.mixins import ConsumerMixin
+
+logger = logging.getLogger(__name__)
+
+DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True)
+
+
+class AlertEventConsumer(ConsumerMixin):
+ """
+ Alert Service 도메인 이벤트 소비자
+
+ detections.completed 이벤트를 구독하고,
+ Alert Service가 자율적으로 알림 발송 여부를 결정한다.
+ OCR Service의 존재도, Main Service의 중개도 모른다.
+ """
+
+ def __init__(self, connection):
+ self.connection = connection
+
+ def get_consumers(self, Consumer, channel):
+ queue = Queue(
+ "alert_domain_events",
+ exchange=DOMAIN_EVENTS_EXCHANGE,
+ routing_key="detections.completed",
+ durable=True,
+ queue_arguments={
+ "x-dead-letter-exchange": "dlq_exchange",
+ },
+ )
+ return [
+ Consumer(
+ queues=[queue],
+ callbacks=[self.on_event],
+ accept=["json"],
+ )
+ ]
+
+ def on_event(self, body, message):
+ """도메인 이벤트 수신 및 처리"""
+ try:
+ payload = json.loads(body) if isinstance(body, str) else body
+ routing_key = message.delivery_info.get("routing_key", "")
+
+ if routing_key == "detections.completed":
+ self._on_detection_completed(payload)
+
+ message.ack()
+ except Exception as e:
+ logger.error(f"Failed to process domain event: {e}")
+ message.reject(requeue=False)
+
+ def _on_detection_completed(self, payload):
+ """
+ detections.completed 이벤트에 반응
+
+ Alert Service의 자율적 판단:
+ "OCR이 완료됐으니 알림을 보내야겠다"
+ """
+ detection_id = payload["detection_id"]
+ logger.info(
+ f"Detection {detection_id} completed event received — "
+ f"processing notification"
+ )
+
+ max_retries = 3
+ for attempt in range(max_retries + 1):
+ try:
+ from tasks.notification_tasks import process_notification
+
+ process_notification(detection_id)
+ return
+ except Exception as e:
+ if "DoesNotExist" in type(e).__name__ and attempt < max_retries:
+ logger.warning(
+ f"Detection {detection_id} not ready, "
+ f"retry {attempt + 1}/{max_retries}"
+ )
+ time.sleep(3)
+ else:
+ raise
+
+
+def start_event_consumer():
+ """Alert Service 도메인 이벤트 소비자 시작 (blocking)"""
+ broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")
+ logger.info(f"Starting Alert Event Consumer on {broker_url}")
+
+ with Connection(broker_url) as conn:
+ consumer = AlertEventConsumer(conn)
+ consumer.run()
diff --git a/core/events/publisher.py b/core/events/publisher.py
new file mode 100644
index 0000000..eddd5e6
--- /dev/null
+++ b/core/events/publisher.py
@@ -0,0 +1,46 @@
+"""
+AMQP Domain Event Publisher (Kombu)
+
+Choreography 패턴에서 백엔드 서비스 간 도메인 이벤트 발행.
+프로토콜 분리 원칙: IoT 경계는 MQTT, 서비스 간 이벤트는 AMQP.
+"""
+
+import json
+import logging
+import os
+
+from kombu import Connection, Exchange
+
+logger = logging.getLogger(__name__)
+
+# 도메인 이벤트 교환기 (topic exchange: routing key 기반 선택적 구독)
+DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True)
+
+
+def publish_event(routing_key: str, payload: dict):
+ """
+ AMQP 도메인 이벤트 발행
+
+ Topic exchange를 사용하여 이벤트를 발행하면,
+ 관심 있는 서비스가 routing key 패턴으로 독립적으로 구독한다.
+
+ Args:
+ routing_key: 이벤트 라우팅 키 (예: "detections.completed")
+ payload: 이벤트 페이로드
+ """
+ broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")
+
+ try:
+ with Connection(broker_url) as conn:
+ producer = conn.Producer()
+ producer.publish(
+ json.dumps(payload),
+ exchange=DOMAIN_EVENTS_EXCHANGE,
+ routing_key=routing_key,
+ content_type="application/json",
+ declare=[DOMAIN_EVENTS_EXCHANGE],
+ )
+ logger.info(f"Domain event published: {routing_key} -> {payload}")
+ except Exception as e:
+ logger.error(f"Failed to publish domain event {routing_key}: {e}")
+ raise
diff --git a/core/firebase/fcm.py b/core/firebase/fcm.py
index 6316b99..0f85bc1 100644
--- a/core/firebase/fcm.py
+++ b/core/firebase/fcm.py
@@ -31,7 +31,7 @@ def initialize_firebase():
firebase_admin.initialize_app(cred)
logger.info(f"Firebase initialized with credentials: {cred_path}")
else:
- # GOOGLE_APPLICATION_CREDENTIALS 사용
+ # ADC (Application Default Credentials) 사용
firebase_admin.initialize_app()
logger.info("Firebase initialized with default credentials")
diff --git a/core/mqtt/__init__.py b/core/mqtt/__init__.py
index 11ce08c..0bcd00a 100644
--- a/core/mqtt/__init__.py
+++ b/core/mqtt/__init__.py
@@ -1,4 +1,4 @@
-# MQTT Module
+# MQTT Module (IoT Edge 전용)
from .subscriber import MQTTSubscriber
__all__ = ["MQTTSubscriber"]
diff --git a/core/mqtt/subscriber.py b/core/mqtt/subscriber.py
index f37bbe1..826ffd9 100644
--- a/core/mqtt/subscriber.py
+++ b/core/mqtt/subscriber.py
@@ -1,4 +1,9 @@
-"""MQTT Subscriber for Edge Device messages"""
+"""
+MQTT Subscriber for IoT Edge Device messages
+
+IoT 디바이스(Raspberry Pi 카메라)에서 오는 MQTT 메시지만 처리.
+프로토콜 분리 원칙: MQTT는 IoT 경계 전용, 서비스 간 이벤트는 AMQP.
+"""
import json
import logging
@@ -13,13 +18,10 @@
class MQTTSubscriber:
"""
- RabbitMQ MQTT Plugin을 통해 Edge Device 메시지를 수신하는 Subscriber
+ RabbitMQ MQTT Plugin을 통해 IoT 디바이스 메시지를 수신하는 Subscriber
- Flow:
- 1. Raspberry Pi -> MQTT Publish (detections/new)
- 2. RabbitMQ MQTT Plugin -> 내부 변환
- 3. Django MQTT Subscriber -> 메시지 수신
- 4. Detection 생성 (pending) -> OCR Task 발행
+ 구독 토픽:
+ - detections/new : IoT 디바이스 → Detection 생성 → OCR 발행
"""
def __init__(self):
@@ -33,12 +35,12 @@ def __init__(self):
self.client.on_disconnect = self.on_disconnect
# 인증 설정
- username = os.getenv("MQTT_USER", "sa")
- password = os.getenv("MQTT_PASS", "1234")
+ username = os.getenv("MQTT_USER", "")
+ password = os.getenv("MQTT_PASS", "")
self.client.username_pw_set(username, password)
def on_connect(self, client, userdata, flags, reason_code, properties):
- """MQTT 연결 시 토픽 구독"""
+ """MQTT 연결 시 IoT 토픽 구독"""
if reason_code.is_failure:
logger.error(f"MQTT connection failed: {reason_code}")
else:
@@ -55,20 +57,30 @@ def on_disconnect(
)
def on_message(self, client, userdata, msg):
+ """메시지 처리"""
+ try:
+ payload = json.loads(msg.payload.decode())
+ except json.JSONDecodeError as e:
+ logger.error(f"Invalid JSON in MQTT message: {e}")
+ return
+
+ if msg.topic == "detections/new":
+ self._handle_new_detection(payload)
+ else:
+ logger.warning(f"Unknown MQTT topic: {msg.topic}")
+
+ def _handle_new_detection(self, payload):
"""
- 메시지 수신 시 처리
+ detections/new: IoT 디바이스에서 새 감지 수신
1. DB에 pending 레코드 즉시 생성 (데이터 손실 방지)
- 2. OCR Task 발행
+ 2. OCR Task 발행 (AMQP)
"""
try:
- payload = json.loads(msg.payload.decode())
logger.info(f"Received MQTT message: {payload.get('camera_id')}")
- # Import here to avoid circular imports
from apps.detections.models import Detection
from tasks.ocr_tasks import process_ocr
- # 1. Detection 레코드 생성 (status=pending, detections_db)
detection = Detection.objects.using("detections_db").create(
camera_id=payload.get("camera_id"),
location=payload.get("location"),
@@ -81,7 +93,6 @@ def on_message(self, client, userdata, msg):
logger.info(f"Detection {detection.id} created (pending)")
- # 2. OCR Task 발행 (AMQP via Celery)
process_ocr.apply_async(
args=[detection.id],
kwargs={"gcs_uri": payload["image_gcs_uri"]},
@@ -91,12 +102,10 @@ def on_message(self, client, userdata, msg):
logger.info(f"OCR task dispatched for detection {detection.id}")
- except json.JSONDecodeError as e:
- logger.error(f"Invalid JSON in MQTT message: {e}")
except KeyError as e:
logger.error(f"Missing required field in MQTT message: {e}")
except Exception as e:
- logger.error(f"Error processing MQTT message: {e}")
+ logger.error(f"Error processing new detection: {e}")
@staticmethod
def _parse_detected_at(value):
diff --git a/credentials/README.md b/credentials/README.md
index beaf23d..3011e0e 100644
--- a/credentials/README.md
+++ b/credentials/README.md
@@ -1,28 +1,33 @@
# Credentials 폴더
-이 폴더에는 GCP 및 Firebase 인증 관련 파일들이 위치합니다.
+이 폴더에는 Firebase 인증 파일이 위치합니다.
+GCS (Cloud Storage)는 ADC를 사용하므로 JSON 키가 불필요합니다.
## 파일 목록
| 파일명 | 용도 | 환경변수 |
|--------|------|----------|
| `firebase-service-account.json` | FCM 푸시 알림 | `FIREBASE_CREDENTIALS` |
-| `gcp-cloud-storage.json` | GCS 이미지 저장소 | `GOOGLE_APPLICATION_CREDENTIALS` |
+
+## 인증 방식
+
+| 서비스 | 인증 방식 | 설정 |
+|--------|----------|------|
+| **GCS (Cloud Storage)** | ADC (Application Default Credentials) | GCE: 자동, 로컬: `gcloud auth application-default login` |
+| **Firebase (FCM)** | Service Account JSON 키 | `FIREBASE_CREDENTIALS` 환경변수로 경로 지정 |
## 설정 방법
-### 1. Firebase Service Account
+### Firebase Service Account
1. [Firebase Console](https://console.firebase.google.com/) 접속
2. 프로젝트 설정 → 서비스 계정 → 새 비공개 키 생성
3. 다운로드한 JSON 파일을 `firebase-service-account.json`으로 저장
-### 2. GCP Service Account (Cloud Storage)
-1. [GCP Console](https://console.cloud.google.com/) 접속
-2. IAM 및 관리자 → 서비스 계정 → 키 생성
-3. 필요한 역할: `Storage Object Viewer`, `Storage Object Creator`
-4. 다운로드한 JSON 파일을 `gcp-cloud-storage.json`으로 저장
+### GCS (ADC)
+- GCE 인스턴스: 설정 불필요 (메타데이터 서버에서 자동 인증)
+- 로컬 개발: `gcloud auth application-default login` 실행
## 주의사항
-- ⚠️ **실제 인증 파일은 절대 Git에 커밋하지 마세요**
-- Docker 환경에서는 볼륨 마운트 또는 Secret Manager 사용 권장
+- `GOOGLE_APPLICATION_CREDENTIALS` 환경변수를 설정하지 마세요 (ADC 자동 탐색 방해)
+- Firebase JSON 키는 절대 Git에 커밋하지 마세요
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 94270e4..08d8c82 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -72,8 +72,6 @@ services:
container_name: speedcam-ocr
env_file:
- ../backend.env
- volumes:
- - ../credentials:/app/credentials:ro
depends_on:
- main
- rabbitmq
diff --git a/docker/k6/mqtt-load-test.py b/docker/k6/mqtt-load-test.py
index 89b836e..f827d01 100644
--- a/docker/k6/mqtt-load-test.py
+++ b/docker/k6/mqtt-load-test.py
@@ -1,28 +1,50 @@
#!/usr/bin/env python3
"""
-MQTT Load Test - IoT Device Simulation
-
-Simulates Raspberry Pi cameras sending detection messages via MQTT.
-Full pipeline: MQTT → Detection (pending) → OCR Worker → Alert Worker
+MQTT Full Pipeline Load Test
+Full pipeline: MQTT → Main → AMQP → OCR (EasyOCR) → AMQP Event → Alert
+
+Usage:
+ python mqtt-load-test.py smoke
+ python mqtt-load-test.py baseline
+ python mqtt-load-test.py saturation
+ python mqtt-load-test.py spike
+ python mqtt-load-test.py sustained
"""
import argparse
import json
import os
import random
+import sys
import threading
import time
from datetime import datetime, timedelta, timezone
+from typing import Dict, Optional
import paho.mqtt.client as mqtt
# Config from environment
-MQTT_HOST = os.getenv("MQTT_HOST", "rabbitmq")
+MQTT_HOST = os.getenv("MQTT_HOST", "10.178.0.7")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USER = os.getenv("MQTT_USER", "sa")
MQTT_PASS = os.getenv("MQTT_PASS", "1234")
TOPIC = "detections/new"
+# Database config
+DB_HOST = os.getenv("DB_HOST", "10.178.0.2")
+DB_PORT = int(os.getenv("DB_PORT", "3306"))
+DB_USER = os.getenv("DB_USER", "sa")
+DB_PASS = os.getenv("DB_PASS", "hG57vv9aJHPGm0X82xlQ")
+DB_NAME = os.getenv("DB_NAME_DETECTIONS", "speedcam_detections")
+
+# GCS 설정
+GCS_BUCKET = os.getenv("GCS_BUCKET", "speedcam-bucket-4f918446")
+GCS_IMAGES = (
+ [f"real-plate-{str(i).zfill(2)}.jpg" for i in range(1, 11)]
+ + [f"plate-{str(i).zfill(2)}.jpg" for i in range(1, 11)]
+ + [f"test-plate-{i}.jpg" for i in range(1, 6)]
+)
+
# Locations for realistic simulation
LOCATIONS = [
"서울시 강남구 테헤란로",
@@ -37,6 +59,60 @@
CAMERA_IDS = [f"CAM-{str(i).zfill(3)}" for i in range(1, 21)]
+# Scenario definitions
+SCENARIOS = {
+ "smoke": {
+ "workers": 1,
+ "rate": 0, # Manual single message
+ "duration": 0,
+ "expected_messages": 1,
+ "expected_throughput": 0.2,
+ "expected_queue_depth": 0,
+ "expected_error_rate": 0,
+ "description": "Verify pipeline works end-to-end",
+ },
+ "baseline": {
+ "workers": 1,
+ "rate": 0.2,
+ "duration": 60,
+ "expected_messages": 12,
+ "expected_throughput": 0.2,
+ "expected_queue_depth": 0,
+ "expected_error_rate": 1,
+ "description": "Match OCR capacity, measure steady-state latency",
+ },
+ "saturation": {
+ "workers": 3,
+ "rate": 1,
+ "duration": 60,
+ "expected_messages": 180,
+ "expected_throughput": 0.2,
+ "expected_queue_depth": 168, # ~180 - 12 processed
+ "expected_error_rate": 1,
+ "description": "Exceed OCR capacity to test queuing behavior",
+ },
+ "spike": {
+ "workers": 5,
+ "rate": 2,
+ "duration": 10,
+ "expected_messages": 100,
+ "expected_throughput": 0.2,
+ "expected_queue_depth": 98, # ~100 - 2 processed
+ "expected_error_rate": 1,
+ "description": "Sudden burst, measure recovery",
+ },
+ "sustained": {
+ "workers": 2,
+ "rate": 0.5,
+ "duration": 300,
+ "expected_messages": 300,
+ "expected_throughput": 0.2,
+ "expected_queue_depth": 240, # ~300 - 60 processed
+ "expected_error_rate": 1,
+ "description": "Long-running stability test",
+ },
+}
+
# Stats
stats = {
"published": 0,
@@ -47,7 +123,7 @@
stats_lock = threading.Lock()
-def generate_message():
+def generate_message() -> str:
"""Generate a realistic detection message."""
kst = timezone(timedelta(hours=9))
speed_limit = random.choice([60.0, 80.0, 100.0, 110.0])
@@ -61,14 +137,55 @@ def generate_message():
"speed_limit": speed_limit,
"detected_at": datetime.now(kst).isoformat(),
"image_gcs_uri": (
- f"gs://speedcam-bucket/detections/"
- f"{int(time.time() * 1000)}-{random.randint(1000, 9999)}.jpg"
+ f"gs://{GCS_BUCKET}/detections/{random.choice(GCS_IMAGES)}"
),
}
)
-def publish_worker(worker_id, rate_per_sec, duration_sec):
+def publish_single_message() -> bool:
+ """Publish a single message for smoke test."""
+ client = mqtt.Client(
+ callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
+ protocol=mqtt.MQTTv311,
+ client_id=f"loadtest-smoke-{os.getpid()}",
+ )
+ client.username_pw_set(MQTT_USER, MQTT_PASS)
+
+ try:
+ client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
+ client.loop_start()
+ time.sleep(0.5) # Allow connection to establish
+
+ msg = generate_message()
+ start = time.time()
+ result = client.publish(TOPIC, msg, qos=1)
+ result.wait_for_publish(timeout=5)
+
+ if result.rc == mqtt.MQTT_ERR_SUCCESS:
+ latency_ms = (time.time() - start) * 1000
+ print(f"✓ Message published successfully (latency: {latency_ms:.2f}ms)")
+ print(f" Payload: {msg[:100]}...")
+ with stats_lock:
+ stats["published"] = 1
+ stats["total_latency_ms"] = latency_ms
+ return True
+ else:
+ print(f"✗ Publish failed with code {result.rc}")
+ with stats_lock:
+ stats["failed"] = 1
+ return False
+ except Exception as e:
+ print(f"✗ Exception during publish: {e}")
+ with stats_lock:
+ stats["failed"] = 1
+ return False
+ finally:
+ client.loop_stop()
+ client.disconnect()
+
+
+def publish_worker(worker_id: int, rate_per_sec: float, duration_sec: int):
"""Single worker thread that publishes MQTT messages."""
client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
@@ -123,22 +240,298 @@ def print_stats():
print(f"\n{'='*60}")
print(f" Elapsed: {elapsed:.1f}s | Published: {published} | Failed: {failed}")
- print(f" Rate: {rate:.1f} msg/s | Avg Latency: {avg_latency:.2f}ms")
+ print(f" Rate: {rate:.2f} msg/s | Avg Latency: {avg_latency:.2f}ms")
print(f" Error Rate: {(failed/total*100) if total > 0 else 0:.2f}%")
print(f"{'='*60}")
-def run_load_test(workers, rate_per_worker, duration):
- """Run the load test with multiple workers."""
- print("\n MQTT Load Test Starting")
- print(f" Host: {MQTT_HOST}:{MQTT_PORT}")
- print(f" Workers: {workers}")
+def get_db_connection():
+ """Try to import and connect to MySQL database."""
+ # Try pymysql first
+ try:
+ import pymysql
+
+ conn = pymysql.connect(
+ host=DB_HOST,
+ port=DB_PORT,
+ user=DB_USER,
+ password=DB_PASS,
+ database=DB_NAME,
+ charset="utf8mb4",
+ cursorclass=pymysql.cursors.DictCursor,
+ )
+ return conn, "pymysql"
+ except ImportError:
+ pass
+ except Exception as e:
+ print(f"⚠ pymysql connection failed: {e}")
+
+ # Try mysql.connector
+ try:
+ import mysql.connector
+
+ conn = mysql.connector.connect(
+ host=DB_HOST,
+ port=DB_PORT,
+ user=DB_USER,
+ password=DB_PASS,
+ database=DB_NAME,
+ charset="utf8mb4",
+ )
+ return conn, "mysql.connector"
+ except ImportError:
+ pass
+ except Exception as e:
+ print(f"⚠ mysql.connector connection failed: {e}")
+
+ return None, None
+
+
+def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dict]:
+ """Verify the pipeline by querying MySQL database."""
+ conn, driver = get_db_connection()
+ if not conn:
+ print(
+ "\n⚠ WARNING: No MySQL driver available (pymysql or mysql-connector-python)"
+ )
+ print(" Pipeline verification skipped. Install pymysql to enable:")
+ print(" pip install pymysql")
+ return None
+
+ print(f"\n{'='*60}")
+ print("=== PIPELINE VERIFICATION ===")
+ print(f" Database: {DB_HOST}:{DB_PORT}/{DB_NAME} (driver: {driver})")
+ print(f"{'='*60}")
+
+ try:
+ cursor = conn.cursor()
+
+ # Initial check
+ cursor.execute(
+ """
+ SELECT
+ COUNT(*) as total,
+ SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
+ SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
+ SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
+ FROM detections
+ WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
+ """
+ )
+
+ if driver == "pymysql":
+ result = cursor.fetchone()
+ else: # mysql.connector
+ result = cursor.fetchone()
+ # Convert tuple to dict for mysql.connector
+ columns = [desc[0] for desc in cursor.description]
+ result = dict(zip(columns, result))
+
+ initial_total = result["total"] or 0
+ print("\nInitial state (last 10 minutes):")
+ print(f" Total detections: {initial_total}")
+ print(f" - completed: {result['completed'] or 0}")
+ print(f" - processing: {result['processing'] or 0}")
+ print(f" - pending: {result['pending'] or 0}")
+ print(f" - failed: {result['failed'] or 0}")
+
+ # Wait for pipeline to process
+ if max_wait_sec > 0:
+ print(f"\nWaiting up to {max_wait_sec}s for pipeline to drain...")
+ wait_start = time.time()
+ while time.time() - wait_start < max_wait_sec:
+ cursor.execute(
+ """
+ SELECT
+ COUNT(*) as total,
+ SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
+ SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
+ SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
+ FROM detections
+ WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
+ """
+ )
+
+ if driver == "pymysql":
+ result = cursor.fetchone()
+ else:
+ result = cursor.fetchone()
+ columns = [desc[0] for desc in cursor.description]
+ result = dict(zip(columns, result))
+
+ processing = result["processing"] or 0
+ pending = result["pending"] or 0
+
+ if processing == 0 and pending == 0:
+ print(f"✓ Pipeline drained after {time.time() - wait_start:.1f}s")
+ break
+
+ print(
+ f" [{time.time() - wait_start:.1f}s] processing: {processing}, pending: {pending}"
+ )
+ time.sleep(2)
+
+ # Final status
+ cursor.execute(
+ """
+ SELECT
+ COUNT(*) as total,
+ SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed,
+ SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing,
+ SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed
+ FROM detections
+ WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE)
+ """
+ )
+
+ if driver == "pymysql":
+ final_result = cursor.fetchone()
+ else:
+ final_result = cursor.fetchone()
+ columns = [desc[0] for desc in cursor.description]
+ final_result = dict(zip(columns, final_result))
+
+ print("\nFinal state:")
+ print(f" Total detections created: {final_result['total']}")
+ print(f" - completed: {final_result['completed'] or 0}")
+ print(f" - processing: {final_result['processing'] or 0}")
+ print(f" - pending: {final_result['pending'] or 0}")
+ print(f" - failed: {final_result['failed'] or 0}")
+
+ total = final_result["total"] or 0
+ completed = final_result["completed"] or 0
+ completion_rate = (completed / total * 100) if total > 0 else 0
+ print(
+ f" Pipeline completion rate: {completed}/{total} = {completion_rate:.1f}%"
+ )
+
+ cursor.close()
+ return final_result
+ except Exception as e:
+ print(f"✗ Pipeline verification error: {e}")
+ return None
+ finally:
+ conn.close()
+
+
+def print_hypothesis(scenario_name: str, config: Dict):
+ """Print hypothesis for the scenario."""
+ print(f"\n{'='*60}")
+ print("=== HYPOTHESIS ===")
+ print(f" Scenario: {scenario_name.upper()} - {config['description']}")
+ print(f" Expected published: {config['expected_messages']} messages")
+ print(
+ f" Expected OCR throughput: ~{config['expected_throughput']} msg/s (OCR_CONCURRENCY=1, ~5s/image)"
+ )
+ if config["duration"] > 0:
+ expected_processed = int(config["expected_throughput"] * config["duration"])
+ print(
+ f" Expected processed in {config['duration']}s: ~{expected_processed} messages"
+ )
+ print(f" Expected queue depth at end: ~{config['expected_queue_depth']} messages")
+ print(f" Expected error rate: <{config['expected_error_rate']}%")
+ print(f"{'='*60}\n")
+
+
+def print_comparison(scenario_name: str, config: Dict, db_result: Optional[Dict]):
+ """Print comparison between hypothesis and actual results."""
+ print(f"\n{'='*60}")
+ print("=== COMPARISON ===")
+
+ # Published vs Expected
+ published = stats["published"]
+ expected_pub = config["expected_messages"]
+ pub_match = abs(published - expected_pub) <= max(
+ 1, expected_pub * 0.1
+ ) # 10% tolerance
+ print(
+ f" Published: {published} vs Expected: {expected_pub} {'✓ PASS' if pub_match else '✗ FAIL'}"
+ )
+
+ # Error rate
+ failed = stats["failed"]
+ total = published + failed
+ actual_error_rate = (failed / total * 100) if total > 0 else 0
+ expected_error_rate = config["expected_error_rate"]
+ error_match = actual_error_rate <= expected_error_rate
+ err_status = "✓ PASS" if error_match else "✗ FAIL"
print(
- f" Rate: {rate_per_worker}/s per worker ({workers * rate_per_worker}/s total)"
+ f" Error rate: {actual_error_rate:.2f}% vs Expected: <{expected_error_rate}% {err_status}"
)
+
+ # Queue depth (from DB if available)
+ if db_result:
+ processing = db_result["processing"] or 0
+ pending = db_result["pending"] or 0
+ actual_queue = processing + pending
+ expected_queue = config["expected_queue_depth"]
+ print(f" Queue depth: {actual_queue} vs Expected: ~{expected_queue} (INFO)")
+
+ # Pipeline completion
+ completed = db_result["completed"] or 0
+ db_total = db_result["total"] or 0
+ if db_total > 0:
+ completion_rate = completed / db_total * 100
+ print(
+ f" Pipeline completion: {completion_rate:.1f}% ({completed}/{db_total})"
+ )
+
+ print(f"{'='*60}\n")
+
+
+def run_scenario(scenario_name: str):
+ """Run a specific test scenario."""
+ if scenario_name not in SCENARIOS:
+ print(f"✗ Unknown scenario: {scenario_name}")
+ print(f" Available scenarios: {', '.join(SCENARIOS.keys())}")
+ sys.exit(1)
+
+ config = SCENARIOS[scenario_name]
+
+ # Print hypothesis
+ print_hypothesis(scenario_name, config)
+
+ # Handle smoke test separately
+ if scenario_name == "smoke":
+ print("=== EXECUTION (SMOKE TEST) ===")
+ print("Publishing single message...\n")
+ stats["start_time"] = time.time()
+ success = publish_single_message()
+
+ if success:
+ print("\n=== RESULTS ===")
+ print(" Published: 1 | Failed: 0")
+ print(f" Avg Latency: {stats['total_latency_ms']:.2f}ms")
+
+ # Wait a bit for pipeline
+ print("\nWaiting 10s for pipeline to process...")
+ time.sleep(10)
+
+ # Verify pipeline
+ db_result = verify_pipeline(expected_count=1, max_wait_sec=20)
+
+ # Print comparison
+ print_comparison(scenario_name, config, db_result)
+ else:
+ print("\n✗ Smoke test failed - message not published")
+ sys.exit(1)
+
+ return
+
+ # Regular load test
+ workers = config["workers"]
+ rate = config["rate"]
+ duration = config["duration"]
+
+ print("=== EXECUTION ===")
+ print(f" Host: {MQTT_HOST}:{MQTT_PORT}")
+ print(f" Workers: {workers}")
+ print(f" Rate: {rate}/s per worker ({workers * rate}/s total)")
print(f" Duration: {duration}s")
- print(f" Topic: {TOPIC}")
- print()
+ print(f" Topic: {TOPIC}\n")
stats["start_time"] = time.time()
threads = []
@@ -146,7 +539,7 @@ def run_load_test(workers, rate_per_worker, duration):
for i in range(workers):
t = threading.Thread(
target=publish_worker,
- args=(i, rate_per_worker, duration),
+ args=(i, rate, duration),
)
t.start()
threads.append(t)
@@ -160,24 +553,57 @@ def run_load_test(workers, rate_per_worker, duration):
for t in threads:
t.join(timeout=10)
- print("\n FINAL RESULTS")
- print_stats()
+ # Final results
+ print("\n=== RESULTS ===")
+ elapsed = time.time() - stats["start_time"]
+ published = stats["published"]
+ failed = stats["failed"]
+ total = published + failed
+ rate_actual = published / elapsed if elapsed > 0 else 0
+ avg_latency = stats["total_latency_ms"] / published if published > 0 else 0
+
+ print(f" Published: {published} | Failed: {failed}")
+ print(f" Rate: {rate_actual:.2f} msg/s | Avg Latency: {avg_latency:.2f}ms")
+ print(f" Error Rate: {(failed/total*100) if total > 0 else 0:.2f}%")
+
+ # Verify pipeline
+ wait_time = min(30, duration // 2) # Wait up to 30s or half test duration
+ db_result = verify_pipeline(expected_count=published, max_wait_sec=wait_time)
+
+ # Print comparison
+ print_comparison(scenario_name, config, db_result)
def main():
- parser = argparse.ArgumentParser(description="MQTT Load Test")
- parser.add_argument(
- "--workers", type=int, default=5, help="Number of concurrent workers"
- )
- parser.add_argument(
- "--rate", type=int, default=2, help="Messages per second per worker"
+ parser = argparse.ArgumentParser(
+ description="MQTT Full Pipeline Load Test",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Available scenarios:
+ smoke - Verify pipeline works end-to-end (1 message)
+ baseline - Match OCR capacity (1 worker, 0.2 msg/s, 60s)
+ saturation - Exceed OCR capacity (3 workers, 1 msg/s, 60s)
+ spike - Sudden burst (5 workers, 2 msg/s, 10s)
+ sustained - Long-running stability (2 workers, 0.5 msg/s, 300s)
+
+Examples:
+ python mqtt-load-test.py smoke
+ python mqtt-load-test.py baseline
+ python mqtt-load-test.py saturation
+ """,
)
parser.add_argument(
- "--duration", type=int, default=60, help="Test duration in seconds"
+ "scenario", choices=list(SCENARIOS.keys()), help="Test scenario to run"
)
+
args = parser.parse_args()
- run_load_test(args.workers, args.rate, args.duration)
+ # Verify MQTT credentials
+ if not MQTT_PASS:
+ print("✗ Error: MQTT_PASS environment variable not set")
+ sys.exit(1)
+
+ run_scenario(args.scenario)
if __name__ == "__main__":
diff --git a/docs/PRD.md b/docs/PRD.md
index faf015e..e5dca8b 100644
--- a/docs/PRD.md
+++ b/docs/PRD.md
@@ -105,11 +105,13 @@ graph TB
OCR -->|6. 이미지 다운로드| GCS
OCR -->|7. 결과 업데이트| MySQL_Detections
OCR -->|7-1. 차량 조회| MySQL_Vehicles
- OCR -->|8. AMQP Publish| AMQP
+ OCR -->|8. MQTT Publish
detections/completed| MQTT
+ MQTT -->|9. MQTT Subscribe| Django
+ Django -->|10. AMQP Publish| AMQP
AMQP -->|fcm_queue| FCM
- FCM -->|9. 차량/토큰 조회| MySQL_Vehicles
- FCM -->|10. 푸시 전송| Firebase
- FCM -->|11. 이력 저장| MySQL_Notifications
+ FCM -->|11. 차량/토큰 조회| MySQL_Vehicles
+ FCM -->|12. 푸시 전송| Firebase
+ FCM -->|13. 이력 저장| MySQL_Notifications
```
### 2.4 이벤트 흐름 (Sequence Diagram)
@@ -140,16 +142,17 @@ sequenceDiagram
OCR->>OCR: 8. EasyOCR 실행
OCR->>DDB: 9. 직접 업데이트 (status=completed)
OCR->>VDB: 10. 번호판으로 Vehicle 조회
- alt 차량 & FCM 토큰 존재
- OCR->>DDB: 11. vehicle_id 매핑
- OCR->>AMQP: 12. Publish to fcm_exchange (Direct)
- end
-
- AMQP->>FCM: 13. Consume from fcm_queue
- FCM->>DDB: 14. Detection 조회
- FCM->>VDB: 15. Vehicle/FCM 토큰 조회
- FCM->>FCM: 16. FCM API 호출
- FCM->>NDB: 17. 알림 이력 저장
+ OCR->>DDB: 11. vehicle_id 매핑
+ OCR->>MQTT: 12. MQTT Publish (detections/completed)
+
+ MQTT->>Django: 13. MQTT Subscribe (detections/completed)
+ Django->>AMQP: 14. Publish to fcm_exchange
+
+ AMQP->>FCM: 15. Consume from fcm_queue
+ FCM->>DDB: 16. Detection 조회
+ FCM->>VDB: 17. Vehicle/FCM 토큰 조회
+ FCM->>FCM: 18. FCM API 호출
+ FCM->>NDB: 19. 알림 이력 저장
```
---
@@ -181,11 +184,15 @@ sequenceDiagram
| Image Processing | OpenCV | 4.10.0.84 |
| Image Library | Pillow | 11.2.1 |
-### 3.4 Monitoring (Optional)
+### 3.4 Monitoring & Observability
| 구분 | 기술 | 용도 |
|------|------|------|
+| Metrics | Prometheus + Grafana | 시스템/컨테이너 메트릭 수집 및 시각화 |
+| Logging | Loki + Promtail | 중앙 집중식 로그 수집 및 검색 |
+| Tracing | OpenTelemetry + Jaeger | 분산 트레이싱 (서비스 간 요청 추적) |
| Task Monitoring | Flower | Celery Task 모니터링 |
| Queue Dashboard | RabbitMQ Management | Queue 상태 확인 |
+| Container Metrics | cAdvisor | 컨테이너 리소스 사용량 |
---
@@ -397,15 +404,17 @@ graph LR
direction TB
M1[Raspberry Pi] -->|Publish| M2[detections/new]
M2 -->|Subscribe| M3[Django]
+ M4[OCR Worker] -->|Publish| M5[detections/completed]
+ M5 -->|Subscribe| M3
end
-
+
subgraph AMQP["AMQP (Port 5672)"]
direction TB
A1[Django] -->|Publish| A2[ocr_exchange]
A2 -->|Route| A3[ocr_queue]
A3 -->|Consume| A4[OCR Worker]
-
- A4 -->|Publish| A5[fcm_exchange]
+
+ A1 -->|Publish| A5[fcm_exchange]
A5 -->|Route| A6[fcm_queue]
A6 -->|Consume| A7[Alert Worker]
end
@@ -413,8 +422,8 @@ graph LR
| 프로토콜 | 용도 | 특징 |
|----------|------|------|
-| **MQTT** | Raspberry Pi → Django | 경량 프로토콜, IoT 디바이스에 적합, QoS 1 |
-| **AMQP** | Django ↔ Celery Workers | 안정적인 메시지 전달, Exchange/Queue 라우팅 |
+| **MQTT** | IoT → Django, OCR → Django (도메인 이벤트) | 경량 프로토콜, QoS 1, Choreography 이벤트 전파 |
+| **AMQP** | Django → Celery Workers (Task 분배) | 안정적인 메시지 전달, Exchange/Queue 라우팅 |
### 5.2 Exchange 설계
@@ -512,6 +521,18 @@ QUEUES = {
│ Detection 업데이트 (detections_db)
│ Vehicle 조회 (vehicles_db)
│
+ │ MQTT Publish (Choreography)
+ │ Topic: detections/completed
+ │ QoS: 1
+ ▼
+[RabbitMQ MQTT Plugin]
+ │
+ │ MQTT Subscribe
+ ▼
+[Django MQTT Subscriber]
+ │
+ │ 이벤트 수신 & 라우팅
+ │
│ AMQP Publish
│ Exchange: fcm_exchange
│ Routing Key: fcm
@@ -674,7 +695,8 @@ backend/
│ ├── __init__.py
│ ├── mqtt/
│ │ ├── __init__.py
-│ │ └── subscriber.py # Main Service 전용
+│ │ ├── publisher.py # 도메인 이벤트 발행 (Choreography)
+│ │ └── subscriber.py # 도메인 이벤트 수신 및 라우팅
│ ├── gcs/
│ │ ├── __init__.py
│ │ └── client.py # GCS 클라이언트
@@ -743,6 +765,7 @@ easyocr==1.7.2
opencv-python-headless==4.10.0.84
pillow==11.2.1
google-cloud-storage==2.18.2
+paho-mqtt==2.1.0
```
**requirements/alert.txt** (Alert Service)
@@ -1015,81 +1038,132 @@ app.autodiscover_tasks(['tasks'])
import json
import os
import logging
-import threading
import paho.mqtt.client as mqtt
from django.utils import timezone
-from apps.detections.models import Detection
-from tasks.ocr_tasks import process_ocr
+from django.utils.dateparse import parse_datetime
logger = logging.getLogger(__name__)
class MQTTSubscriber:
+ """
+ RabbitMQ MQTT Plugin을 통해 도메인 이벤트를 수신하는 Subscriber
+
+ Choreography 패턴: 각 서비스는 이벤트를 발행하고,
+ 관심 있는 서비스가 독립적으로 구독하여 처리한다.
+
+ 구독 토픽:
+ - detections/new : IoT 디바이스 → Detection 생성 → OCR 발행
+ - detections/completed : OCR 완료 이벤트 → Notification 발행
+ """
+
def __init__(self):
self.client = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
- protocol=mqtt.MQTTv5,
+ protocol=mqtt.MQTTv311,
client_id=f"django-main-{os.getpid()}"
)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
-
- # 인증 설정
- username = os.getenv('MQTT_USER', 'sa')
- password = os.getenv('MQTT_PASS', '1234')
+
+ username = os.getenv('MQTT_USER', '')
+ password = os.getenv('MQTT_PASS', '')
self.client.username_pw_set(username, password)
-
- def on_connect(self, client, userdata, flags, rc, properties=None):
- logger.info(f"Connected to MQTT broker with code {rc}")
- client.subscribe("detections/new", qos=1)
-
+
+ def on_connect(self, client, userdata, flags, reason_code, properties):
+ if reason_code.is_failure:
+ logger.error(f"MQTT connection failed: {reason_code}")
+ else:
+ logger.info("Connected to MQTT broker")
+ client.subscribe("detections/new", qos=1)
+ client.subscribe("detections/completed", qos=1)
+
def on_message(self, client, userdata, msg):
+ """토픽별 메시지 라우팅"""
try:
payload = json.loads(msg.payload.decode())
- logger.info(f"Received MQTT message: {payload}")
-
- # 1. pending 레코드 즉시 생성 (detections_db)
- detection = Detection.objects.using('detections_db').create(
- camera_id=payload.get('camera_id'),
- location=payload.get('location'),
- detected_speed=payload['detected_speed'],
- speed_limit=payload.get('speed_limit', 60.0),
- detected_at=payload.get('detected_at', timezone.now()),
- image_gcs_uri=payload['image_gcs_uri'],
- status='pending'
- )
-
- logger.info(f"Created detection {detection.id} with pending status")
-
- # 2. OCR Task 발행 (AMQP)
- process_ocr.apply_async(
- args=[detection.id],
- kwargs={'gcs_uri': payload['image_gcs_uri']},
- queue='ocr_queue',
- priority=5
- )
-
- logger.info(f"Dispatched OCR task for detection {detection.id}")
-
- except Exception as e:
- logger.error(f"Error processing MQTT message: {e}")
-
- def on_disconnect(self, client, userdata, rc, properties=None):
- logger.warning(f"Disconnected from MQTT broker with code {rc}")
-
+ except json.JSONDecodeError as e:
+ logger.error(f"Invalid JSON in MQTT message: {e}")
+ return
+
+ if msg.topic == "detections/new":
+ self._handle_new_detection(payload)
+ elif msg.topic == "detections/completed":
+ self._handle_detection_completed(payload)
+ else:
+ logger.warning(f"Unknown MQTT topic: {msg.topic}")
+
+ def _handle_new_detection(self, payload):
+ """detections/new → Detection 생성 → OCR Task 발행"""
+ from apps.detections.models import Detection
+ from tasks.ocr_tasks import process_ocr
+
+ detection = Detection.objects.using('detections_db').create(
+ camera_id=payload.get('camera_id'),
+ location=payload.get('location'),
+ detected_speed=payload['detected_speed'],
+ speed_limit=payload.get('speed_limit', 60.0),
+ detected_at=payload.get('detected_at', timezone.now()),
+ image_gcs_uri=payload['image_gcs_uri'],
+ status='pending'
+ )
+
+ process_ocr.apply_async(
+ args=[detection.id],
+ kwargs={'gcs_uri': payload['image_gcs_uri']},
+ queue='ocr_queue',
+ priority=5
+ )
+
+ def _handle_detection_completed(self, payload):
+ """detections/completed → Notification Task 발행 (Choreography)"""
+ detection_id = payload["detection_id"]
+
+ from tasks.notification_tasks import send_notification
+ send_notification.apply_async(
+ args=[detection_id], queue="fcm_queue"
+ )
+
def start(self):
host = os.getenv('RABBITMQ_HOST', 'rabbitmq')
port = int(os.getenv('MQTT_PORT', 1883))
- logger.info(f"Connecting to MQTT broker at {host}:{port}")
self.client.connect(host, port, 60)
self.client.loop_forever()
+```
-def start_mqtt_subscriber():
- """백그라운드 스레드에서 MQTT Subscriber 시작"""
- subscriber = MQTTSubscriber()
- thread = threading.Thread(target=subscriber.start, daemon=True)
- thread.start()
- logger.info("MQTT Subscriber started in background thread")
+```python
+# core/mqtt/publisher.py
+"""도메인 이벤트 발행"""
+import json
+import logging
+import os
+import paho.mqtt.client as mqtt
+
+logger = logging.getLogger(__name__)
+
+def publish_event(topic: str, payload: dict):
+ """Choreography 패턴에서 서비스 간 이벤트 전파"""
+ host = os.getenv("RABBITMQ_HOST", "rabbitmq")
+ port = int(os.getenv("MQTT_PORT", 1883))
+
+ client = mqtt.Client(
+ callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
+ protocol=mqtt.MQTTv311,
+ client_id="",
+ )
+ client.username_pw_set(
+ os.getenv("MQTT_USER", ""),
+ os.getenv("MQTT_PASS", "")
+ )
+
+ try:
+ client.connect(host, port, keepalive=10)
+ client.loop_start()
+ result = client.publish(topic, json.dumps(payload), qos=1)
+ result.wait_for_publish(timeout=5)
+ finally:
+ client.loop_stop()
+ client.disconnect()
```
### 9.2 OCR Service (Celery Worker)
@@ -1123,8 +1197,7 @@ def mock_ocr_result():
def process_ocr(self, detection_id: int, gcs_uri: str):
from apps.detections.models import Detection
from apps.vehicles.models import Vehicle
- from tasks.notification_tasks import send_notification
-
+
logger.info(f"Processing OCR for detection {detection_id}")
try:
@@ -1180,14 +1253,18 @@ def process_ocr(self, detection_id: int, gcs_uri: str):
if vehicle:
detection.vehicle_id = vehicle.id
detection.save(update_fields=['vehicle_id', 'updated_at'])
-
- # 7. FCM 토큰이 있으면 알림 Task 발행
- if vehicle.fcm_token:
- send_notification.apply_async(
- args=[detection_id],
- queue='fcm_queue'
- )
-
+
+ # 7. detection.completed 이벤트 발행 (Choreography)
+ # OCR은 알림의 존재를 모른다. 이벤트만 발행하고 끝.
+ try:
+ from core.mqtt.publisher import publish_event
+ publish_event(
+ "detections/completed",
+ {"detection_id": detection_id},
+ )
+ except Exception as e:
+ logger.warning(f"Failed to publish completion event: {e}")
+
logger.info(f"OCR completed for detection {detection_id}: {plate_number}")
return {
'detection_id': detection_id,
@@ -1617,8 +1694,8 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000
- DLQ로 실패한 Task 별도 관리
### 12.4 프로토콜 분리
-- **MQTT**: IoT 디바이스(Raspberry Pi) 통신용 경량 프로토콜
-- **AMQP**: 백엔드 서비스 간 안정적인 메시지 전달
+- **MQTT**: 도메인 이벤트 전파 (IoT→Main, OCR→Main) — Choreography 패턴의 이벤트 버스
+- **AMQP**: Task 분배 (Main→Workers) — Celery를 통한 안정적인 작업 큐잉
### 12.5 GIL 병목 회피
- **OCR Worker**: `prefork` pool (multiprocessing) - CPU 집약적
@@ -1643,3 +1720,9 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000
| | | - Python 3.12로 버전 업데이트 |
| | | - DataDog 관련 설정 제거 (Optional) |
| | | - Mock 모드 추가 (OCR_MOCK, FCM_MOCK) |
+| 3.0 | 2026-02 | Choreography 패턴 구현 반영 |
+| | | - OCR → Alert 직접 호출 제거 (Orchestration → Choreography) |
+| | | - MQTT Event Publisher 추가 (detections/completed) |
+| | | - Subscriber 토픽 라우팅 구현 |
+| | | - Monitoring 스택 반영 (Prometheus, Grafana, Loki, Jaeger) |
+| | | - OCR Service에 paho-mqtt 의존성 추가 |
diff --git a/docs/load-testing.md b/docs/load-testing.md
new file mode 100644
index 0000000..e62d644
--- /dev/null
+++ b/docs/load-testing.md
@@ -0,0 +1,878 @@
+# SpeedCam 부하 테스트 전략
+
+## 개요
+
+SpeedCam은 IoT 카메라에서 MQTT를 통해 이미지를 수신하고, Django 애플리케이션이 AMQP/Celery를 통해 OCR 작업을 처리한 후, 감지된 결과를 Alert Service로 전달하는 완전한 파이프라인 아키텍처입니다. 이 문서는 이 복잡한 시스템의 성능을 검증하고 병목 지점을 식별하기 위한 부하 테스트 전략을 설명합니다.
+
+## 시스템 아키텍처
+
+### 처리 파이프라인
+
+```
+IoT Camera (MQTT)
+ ↓
+Main Service (Django + Gunicorn + MQTT Subscriber)
+ ↓
+AMQP/Celery Queue
+ ↓
+OCR Worker (EasyOCR on CPU)
+ ↓
+AMQP Domain Event (detections.completed)
+ ↓
+Alert Service (kombu consumer → FCM)
+```
+
+### 배포 인프라 (GCE asia-northeast3-a)
+
+| 인스턴스 | 역할 | 핵심 설정 |
+|---------|------|---------|
+| **speedcam-app** | Django + Gunicorn + MQTT Subscriber | GUNICORN_WORKERS=2 |
+| **speedcam-db** | MySQL 8.0 데이터베이스 | - |
+| **speedcam-mq** | RabbitMQ (MQTT plugin + AMQP) | - |
+| **speedcam-ocr** | Celery OCR Worker (EasyOCR) | OCR_CONCURRENCY=1 |
+| **speedcam-alert** | Domain Event Consumer → FCM | FCM_MOCK=true |
+| **speedcam-mon** | 모니터링 스택 | Prometheus, Grafana, Loki, Jaeger |
+
+## 측정된 성능 기준치 (Baselines)
+
+현재 환경에서 측정된 주요 성능 메트릭:
+
+| 메트릭 | 값 | 설명 |
+|-------|-----|------|
+| **MQTT 발행 지연** | ~0.3ms | 카메라에서 브로커까지의 지연 시간 |
+| **OCR 처리 시간** | 4-5s | 캐시된 EasyOCR Reader로 단일 이미지 처리 |
+| **OCR 모델 초기 로딩** | ~30s | EasyOCR 모델 처음 로드 시간 |
+| **OCR 최대 처리량** | ~0.2 msg/s | CONCURRENCY=1 설정에서의 이론적 한계 |
+| **Alert 이벤트 처리** | <1s | detections.completed 이벤트에서 FCM 전송까지 |
+| **엔드-투-엔드 지연** | ~5-6s | MQTT 발행에서 Alert 완료까지 전체 파이프라인 |
+
+## 테스트 시나리오
+
+부하 테스트는 다양한 부하 프로파일 하에서 시스템의 동작을 검증하기 위해 5개의 시나리오로 구성됩니다.
+
+### 시나리오 정의표
+
+| 시나리오 | 워커 수 | 전송률 | 지속시간 | 총 메시지 수 | 목표 및 검증 사항 |
+|---------|--------|-------|---------|------------|-----------------|
+| **smoke** | 1 | 1 msg (수동) | - | 1 | 파이프라인 기본 동작 검증 |
+| **baseline** | 1 | 0.2/s | 60s | ~12 | OCR 최대 처리량 수준에서 안정성 검증 |
+| **saturation** | 3 | 1/s | 60s | ~180 | OCR 워커 포화 상태 시뮬레이션 |
+| **spike** | 5 | 2/s | 10s | ~100 | 갑작스러운 트래픽 증가 대응 능력 검증 |
+| **sustained** | 2 | 0.5/s | 300s | ~300 | 장시간 안정적 운영 능력 검증 |
+
+### 각 시나리오의 상세 설명
+
+#### 1. Smoke Test (스모크 테스트)
+- **목적**: 파이프라인이 정상 작동하는지 기본 검증
+- **가설**: 단일 메시지는 지연 없이 전체 파이프라인을 통과
+- **수행 방법**: 수동으로 하나의 MQTT 메시지 발행
+- **검증 항목**:
+ - 메시지가 speedcam-app에서 수신됨
+ - Celery 작업이 생성됨
+ - OCR 처리 완료
+ - Alert 이벤트가 발행됨
+ - 종단간 지연이 ~5-6초 범위
+
+#### 2. Baseline Test (기준선 테스트)
+- **목적**: OCR 워커의 이론적 최대 처리량에서 안정성 검증
+- **가설**: OCR_CONCURRENCY=1 설정에서 0.2 msg/s 지속 가능
+- **초기화**: speedcam-ocr 워커 재시작하여 모델 미리 로드
+- **수행 방법**: 1개 워커가 12개 메시지를 60초에 걸쳐 0.2/s 속도로 발행
+- **검증 항목**:
+ - 모든 메시지가 처리됨 (100% 완료율)
+ - 큐 깊이가 안정적으로 유지됨
+ - 데이터베이스 연결이 누적되지 않음
+ - 종단간 지연이 안정적 (5-6초 범위)
+
+#### 3. Saturation Test (포화 테스트)
+- **목적**: OCR 워커 포화 상태에서 시스템의 대응 능력 검증
+- **가설**: 1/s 속도로 메시지가 쌓이면 큐 깊이 증가하며, 메시지 손실 없이 처리됨
+- **수행 방법**: 3개 워커가 180개 메시지를 60초에 걸쳐 1/s 속도로 발행
+- **검증 항목**:
+ - 큐 최대 깊이 관찰 (이론값: ~12)
+ - 메시지 손실 0건
+ - 종단간 지연 증가 추이 (선형 증가 예상)
+ - Celery 작업 타임아웃 발생 여부
+ - 데이터베이스 연결 고갈 여부
+
+#### 4. Spike Test (스파이크 테스트)
+- **목적**: 갑작스러운 트래픽 급증에 대한 시스템 회복 능력 검증
+- **가설**: 짧은 기간의 고속 전송 후 시스템이 정상으로 복구
+- **수행 방법**: 5개 워커가 100개 메시지를 10초에 걸쳐 2/s 속도로 발행
+- **검증 항목**:
+ - 스파이크 중 큐 최대 깊이
+ - 메시지 손실 여부
+ - 완료 후 큐 정상화 시간
+ - 메모리 누수 증상 (메모리 사용량 회귀)
+
+#### 5. Sustained Test (지속성 테스트)
+- **목적**: 장시간 안정적 운영 능력 검증 및 메모리 누수 감지
+- **가설**: 0.5/s 지속 속도로 300초 동안 모든 메시지 처리, 메모리 누수 없음
+- **수행 방법**: 2개 워커가 300개 메시지를 300초에 걸쳐 0.5/s 속도로 발행
+- **검증 항목**:
+ - 메시지 처리율 일정 유지 (100%)
+ - 메모리 사용량 추이 (증가 아닌 안정)
+ - CPU 사용률 안정성
+ - 완료된 메시지 수 = 300건
+
+## 가설-실행-비교 프레임워크
+
+부하 테스트는 다음의 명확한 프레임워크를 따릅니다:
+
+### 1. 사전 가설 수립 (Pre-Test Hypothesis)
+
+각 시나리오 시작 전에 예상 결과를 명확히 정의합니다:
+
+```
+테스트 시작 전:
+├─ 예상 완료 메시지 수
+├─ 예상 최대 큐 깊이
+├─ 예상 종단간 지연 범위
+├─ 예상 리소스 사용률 범위
+└─ 예상 오류율 (0% 또는 허용 범위)
+```
+
+테스트 스크립트는 다음과 같이 가설을 출력합니다:
+
+```python
+# 예시
+print(f"""
+=== Baseline Test Hypothesis ===
+Expected Message Completion: 12 (100%)
+Expected Queue Depth: <1 (stable)
+Expected E2E Latency: 5-6 seconds
+Expected Error Rate: 0%
+Expected OCR Processing Time: ~4-5s per image
+Resource Baseline: Monitor CPU and Memory
+""")
+```
+
+### 2. 테스트 실행 (Test Execution)
+
+실시간으로 시스템 상태를 모니터링하면서 테스트를 실행합니다:
+
+```
+테스트 진행 중:
+├─ 메시지 발행률 확인
+├─ 실시간 큐 깊이 모니터링
+├─ Celery 작업 상태 추적
+├─ 에러 로그 감시
+└─ 리소스 사용률 관찰
+```
+
+테스트 스크립트는 진행 상황을 실시간으로 출력합니다:
+
+```
+[T+5s] Published: 1/12 | Queue Depth: 0 | OCR Processing: 1 | Completed: 0
+[T+10s] Published: 2/12 | Queue Depth: 0 | OCR Processing: 1 | Completed: 1
+[T+15s] Published: 3/12 | Queue Depth: 1 | OCR Processing: 1 | Completed: 1
+...
+```
+
+### 3. 결과 비교 및 분석 (Comparison & Analysis)
+
+테스트 완료 후, 실제 결과를 사전 가설과 비교합니다:
+
+#### 파이프라인 검증 쿼리
+
+테스트 완료 후 데이터베이스를 직접 쿼리하여 메시지 처리 상황을 확인합니다:
+
+```sql
+-- Detection 테이블 확인 (OCR 처리된 메시지)
+SELECT
+ COUNT(*) as total_detections,
+ COUNT(CASE WHEN status='completed' THEN 1 END) as completed,
+ COUNT(CASE WHEN status='failed' THEN 1 END) as failed,
+ COUNT(CASE WHEN status='processing' THEN 1 END) as still_processing
+FROM detections
+WHERE created_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE);
+
+-- 시간별 처리 완료 현황
+SELECT
+ DATE_FORMAT(created_at, '%Y-%m-%d %H:%i:00') as minute,
+ COUNT(*) as completed_count
+FROM detections
+WHERE status='completed' AND created_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE)
+GROUP BY DATE_FORMAT(created_at, '%Y-%m-%d %H:%i:00')
+ORDER BY minute;
+
+-- 처리 시간 분석
+SELECT
+ MIN(DATE_FORMAT(TIMEDIFF(updated_at, created_at), '%H:%i:%s')) as min_duration,
+ MAX(DATE_FORMAT(TIMEDIFF(updated_at, created_at), '%H:%i:%s')) as max_duration,
+ AVG(TIME_TO_SEC(TIMEDIFF(updated_at, created_at))) as avg_seconds
+FROM detections
+WHERE status='completed' AND created_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE);
+```
+
+#### 결과 비교표
+
+테스트 결과를 가설과 비교하는 형식:
+
+```
+=== Baseline Test Results ===
+Metric | Hypothesis | Actual | Status | Analysis
+Total Messages | 12 (100%) | 12 (100%) | PASS | All messages processed
+Queue Depth Peak | <1 | 0 | PASS | No queuing observed
+E2E Latency Range | 5-6s | 5.2-5.8s | PASS | Within expected range
+Error Rate | 0% | 0% | PASS | No processing errors
+OCR Processing Time | 4-5s avg | 4.6s avg | PASS | Consistent with baseline
+Memory Leak | None | Stable | PASS | No memory increase
+DB Connections | <5 | 2-3 | PASS | No connection pooling issues
+```
+
+### 4. 의사결정 기준
+
+결과 비교 후 다음 단계를 결정합니다:
+
+| 결과 | 의사결정 |
+|-----|--------|
+| **모두 PASS** | 다음 시나리오 진행 |
+| **부분 FAIL (메시지 처리 완료)** | 병목 분석 후 진행 (성능 개선 필요) |
+| **메시지 손실** | 중단 및 구성 검토 필요 |
+| **시스템 크래시** | 중단 및 디버깅 필요 |
+
+## 테스트 실행 방법
+
+### 사전 요구사항
+
+```bash
+# GCE 인스턴스에 SSH 접속
+gcloud compute ssh speedcam-app --zone=asia-northeast3-a
+
+# Docker 컨테이너 내부 접속
+sudo docker exec -it speedcam-main bash
+
+# 환경 변수 설정
+export MQTT_PASS=
+export MQTT_HOST=speedcam-mq
+export MQTT_PORT=1883
+```
+
+### 테스트 스크립트 위치
+
+```
+/app/docker/k6/mqtt-load-test.py # 주요 테스트 스크립트
+/app/docker/k6/load-test.js # k6 HTTP API 테스트 (보조)
+```
+
+### 시나리오별 실행 명령
+
+#### 1. Smoke Test (스모크 테스트)
+
+```bash
+# 모든 서비스 준비 상태 확인
+python /app/docker/k6/mqtt-load-test.py smoke
+
+# 예상 출력:
+# === Smoke Test Starting ===
+# Publishing 1 test message...
+# [T+0.5s] Message published
+# [T+5s] Detection created in database
+# [T+5.5s] Alert event published
+# === Smoke Test PASSED ===
+```
+
+#### 2. Baseline Test (기준선 테스트)
+
+```bash
+# OCR 워커가 모델을 미리 로드하도록 대기 (약 30초)
+# 모니터링 터미널에서 Grafana 대시보드 준비
+
+python /app/docker/k6/mqtt-load-test.py baseline
+
+# 예상 소요 시간: 약 70초 (60초 + 처리 완료 대기)
+# 예상 완료 메시지: 12건
+```
+
+#### 3. Saturation Test (포화 테스트)
+
+```bash
+# 주의: 이 테스트는 큐 깊이를 증가시킵니다
+# 모니터링 대시보드 확인 준비
+
+python /app/docker/k6/mqtt-load-test.py saturation
+
+# 예상 소요 시간: 약 150초 (60초 + 큐 처리 완료 대기)
+# 예상 완료 메시지: 180건
+```
+
+#### 4. Spike Test (스파이크 테스트)
+
+```bash
+# 단기간 높은 처리율 테스트
+
+python /app/docker/k6/mqtt-load-test.py spike
+
+# 예상 소요 시간: 약 90초 (10초 + 큐 처리 완료 대기)
+# 예상 완료 메시지: 100건
+```
+
+#### 5. Sustained Test (지속성 테스트)
+
+```bash
+# 장시간 안정성 테스트 - 커피를 준비하세요
+# 이 테스트는 약 8-10분 소요됩니다
+
+python /app/docker/k6/mqtt-load-test.py sustained
+
+# 예상 소요 시간: 약 600초 (300초 + 큐 처리 완료 대기)
+# 예상 완료 메시지: 300건
+```
+
+### 테스트 중단 및 정리
+
+```bash
+# 테스트를 강제 중단해야 하는 경우 (Ctrl+C)
+# Celery 큐에 남아있는 작업을 확인
+python -c "from celery_app import app; print(app.control.inspect().active())"
+
+# 필요시 큐 초기화 (주의: 처리 중인 작업도 제거됨)
+python -c "from celery_app import app; app.control.purge()"
+
+# 데이터베이스 테스트 데이터 정리
+python manage.py shell
+>>> from detections.models import Detection
+>>> Detection.objects.filter(created_at__gt=timezone.now()-timedelta(hours=1)).delete()
+```
+
+## 결과 해석 및 성능 분석
+
+### 메트릭 정의
+
+#### 완료 메시지 (Completed Messages)
+- 정의: MQTT 발행에서 FCM Alert까지 전체 파이프라인 완료
+- PASS 기준: 예상 메시지 수의 100%
+- FAIL 기준: 1건 이상의 메시지 손실
+
+#### 큐 깊이 (Queue Depth)
+- 정의: RabbitMQ AMQP 큐에 대기 중인 Celery 작업 수
+- 모니터링: `rabbitmqctl list_queues` 또는 Grafana
+- 분석:
+ - Baseline에서 큐 깊이 > 2: OCR 처리 능력 부족
+ - Saturation에서 큐 깊이 선형 증가: 예상된 동작
+ - Sustained 후 큐 깊이 0으로 복귀: 정상 종료
+
+#### 종단간 지연 (End-to-End Latency)
+- 정의: MQTT 발행부터 Alert 완료까지의 경과 시간
+- 측정: Database detection.created_at → alert_events.published_at
+- 분석:
+ - Baseline: 5-6초 (안정적)
+ - Saturation: 5초 + (큐_깊이 × 4초) (선형 증가)
+ - Spike 후 정상화: 초기 지연 증가 → 정상 복귀
+
+#### 오류율 (Error Rate)
+- 정의: 처리 실패한 메시지 비율
+- PASS 기준: 0%
+- 감지 방법:
+ - Celery 작업 failed 상태
+ - Loki 로그의 ERROR, EXCEPTION 레벨
+ - Database detection.status='failed'
+
+### 결과별 해석 가이드
+
+#### Baseline Test 결과 해석
+
+**완전 통과 (All PASS)**
+```
+완료: 12/12 (100%)
+큐 최대: 0
+E2E 지연: 5.2-5.8s 평균 5.5s
+오류율: 0%
+
+→ 해석: 시스템이 설계된 대로 동작. OCR 최대 처리량을 안전하게 유지.
+→ 다음: Saturation 테스트 진행.
+```
+
+**부분 실패 (Partial FAIL) - 메시지 손실 없음**
+```
+완료: 12/12 (100%)
+큐 최대: 1-2
+E2E 지연: 5.5-7.2s 평균 6.2s
+오류율: 0%
+
+→ 해석: 메시지는 모두 처리되지만 약간의 지연 발생.
+→ 원인 분석:
+ - 데이터베이스 연결 대기
+ - GC pause 영향
+ - MQTT 브로커 내부 처리 지연
+→ 다음: Grafana에서 상세 분석 (CPU, 메모리, DB 연결)
+```
+
+**실패 (FAIL) - 메시지 손실**
+```
+완료: 10/12 (83%)
+손실: 2
+큐 최대: 5+
+오류율: 16.7%
+
+→ 해석: 메시지 손실 발생 - 심각한 문제.
+→ 즉시 조치:
+ 1. Celery 워커 로그 확인
+ docker logs speedcam-ocr | tail -100
+ 2. Loki에서 ERROR 레벨 로그 검색
+ {job="celery-worker"} | json | status="ERROR"
+ 3. RabbitMQ 상태 확인
+ docker exec speedcam-mq rabbitmqctl status
+→ 다음: 원인 제거 후 Baseline 재실행.
+```
+
+#### Saturation Test 결과 해석
+
+**정상 포화 (Expected Behavior)**
+```
+완료: 180/180 (100%)
+큐 최대: 8-12 (이론값: (1.0-0.2)×60 = 48s×1msg/s ≈ 10-12)
+E2E 지연: 초기 5-6s → 최대 50-52s
+오류율: 0%
+
+→ 해석: OCR이 포화되었으나 메시지 손실 없음 (큐 기반 처리).
+→ 시스템이 설계된 대로 부하를 흡수.
+→ 다음: Spike 테스트 진행.
+```
+
+**예상과 다른 포화 (Anomaly)**
+```
+완료: 180/180 (100%)
+큐 최대: >30 (예상 10-12)
+E2E 지연: 100초 이상
+오류율: 0% 하지만 일부 타임아웃
+
+→ 해석: OCR 처리 속도가 예상보다 느림.
+→ 원인 분석:
+ - OCR 모델 재로드되었을 가능성 (메모리 부족)
+ - CPU 과부하 또는 열 제한
+ - 데이터베이스 슬로우 쿼리
+→ 다음: 인프라 점검 후 baseline 재실행.
+```
+
+### 병목 지점 식별 및 개선
+
+#### 현재 알려진 병목: OCR 처리 (Primary Bottleneck)
+
+**증상:**
+- OCR_CONCURRENCY=1로 설정되어 있음
+- 이론적 최대 처리량: 0.2 msg/s (4-5초/이미지)
+- 단일 CPU 코어에서 순차 처리
+
+**개선 방안:**
+
+```bash
+# 1단계: 멀티코어 EasyOCR (실험)
+# Saturation 결과를 토대로 확인
+# 큐가 10개 이상 쌓이면 다음 단계 고려
+
+# 2단계: OCR_CONCURRENCY 증가 (권장)
+# speedcam-ocr 환경 변수 수정
+export OCR_CONCURRENCY=2 # 이론상 0.4 msg/s 가능
+
+# Docker 재시작
+sudo docker restart speedcam-ocr
+
+# Baseline 재테스트
+python /app/docker/k6/mqtt-load-test.py baseline
+
+# 결과: 완료 시간 50% 단축 예상 (12 × 5s / 2 = 30s)
+```
+
+**OCR_CONCURRENCY 증가 시 고려사항:**
+- 메모리 사용량 증가 (각 워커당 GiB급)
+- CPU 코어 수 제한 (GCE 인스턴스 코어 수 확인)
+- 온도 및 열 제한 (GPU 없이 CPU만 사용)
+
+#### 2차 병목: Gunicorn 워커 (Secondary Bottleneck)
+
+**현재 상태:**
+- GUNICORN_WORKERS=2 설정
+- MQTT 수신은 별도 스레드에서 처리
+- HTTP API는 Gunicorn 워커 풀 공유
+
+**확인 방법:**
+```bash
+# Saturation 테스트 중 Grafana 확인
+# Gunicorn worker utilization을 모니터링
+# 만약 모든 워커가 항상 바쁘다면:
+
+# 로그에서 "worker timeout" 확인
+docker logs speedcam-app 2>&1 | grep -i timeout
+
+# 필요시 GUNICORN_WORKERS 증가
+export GUNICORN_WORKERS=4
+```
+
+#### 3차 병목: MySQL 데이터베이스 (Tertiary Bottleneck)
+
+**감지:**
+```sql
+-- 데이터베이스 연결 확인
+SHOW PROCESSLIST;
+
+-- 슬로우 쿼리 확인
+SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;
+
+-- 테이블 락 확인
+SHOW OPEN TABLES WHERE In_use > 0;
+```
+
+**개선:**
+```bash
+# 1. 인덱스 확인
+# detections 테이블의 created_at, status에 인덱스 있는지 확인
+
+# 2. 연결 풀 크기 확인
+# Django DATABASES 설정의 CONN_MAX_AGE
+
+# 3. 필요시 마스터-슬레이브 구성 검토
+```
+
+#### MQTT Subscriber 병목 가능성
+
+**현재 아키텍처:**
+- Django 애플리케이션 내부 MQTT 클라이언트 (단일 스레드)
+- 블로킹 구독 모델
+
+**부하 테스트에서 영향:**
+- 메시지 발행률이 초당 1개 이상일 때 순차 처리
+- 단일 스레드이므로 CPU 활용도가 낮을 수 있음
+
+**확인:**
+```bash
+# 테스트 중 프로세스 상태 확인
+docker top speedcam-app | head -20
+
+# 만약 MQTT 수신 thread가 항상 busy면:
+# MQTT 클라이언트 최적화 필요
+```
+
+## 모니터링 및 관찰
+
+### Grafana 대시보드 사용
+
+테스트 진행 중 다음 대시보드를 지속적으로 관찰합니다:
+
+#### 1. SpeedCam 애플리케이션 대시보드
+
+```
+URL: http://speedcam-mon:3000/d/speedcam-app/speedcam-application
+
+주요 패널:
+├─ MQTT Messages Received (per second)
+│ └─ 값이 설정된 발행율과 일치하는지 확인
+├─ Celery Queue Depth (tasks)
+│ └─ Baseline: ~0, Saturation: 8-12, Spike: 최대값 관찰
+├─ OCR Processing Time (histogram)
+│ └─ 평균값이 4-5초 범위인지 확인
+└─ End-to-End Latency (percentiles)
+ └─ P50: 5-6초, P99: 큐_깊이에 비례하여 증가
+```
+
+#### 2. RabbitMQ 모니터링 대시보드
+
+```
+URL: http://speedcam-mq:15672 (username: guest, password: guest)
+
+관찰 항목:
+├─ Queue: celery (Messages)
+│ └─ Ready: 처리 대기 중인 작업
+│ └─ Unacked: 처리 중인 작업
+├─ Queue: detections.completed
+│ └─ Alert Service가 처리하는 이벤트 흐름
+└─ Consumers
+ └─ OCR Worker 연결 상태 확인
+```
+
+#### 3. MySQL 성능 모니터링
+
+```
+Grafana 패널: Database Performance
+
+확인 항목:
+├─ Queries per second
+├─ Average query execution time
+├─ Active connections
+├─ Slow queries (>1 second)
+└─ Innodb buffer pool hit ratio (>99% 목표)
+```
+
+### Prometheus 메트릭 쿼리
+
+테스트 중 다음 메트릭을 직접 쿼리하여 확인합니다:
+
+```promql
+# MQTT 수신 메시지율
+rate(mqtt_messages_received_total[1m])
+
+# Celery 큐 깊이 (현재값)
+celery_queue_length{queue="celery"}
+
+# OCR 처리 시간 (평균)
+rate(ocr_processing_time_seconds_sum[5m]) /
+rate(ocr_processing_time_seconds_count[5m])
+
+# 완료된 감지 이벤트
+rate(detections_completed_total[1m])
+
+# Alert 발행 이벤트
+rate(alert_events_published_total[1m])
+
+# 데이터베이스 연결
+mysql_global_status_threads_connected
+```
+
+### Loki 로그 쿼리
+
+오류 또는 의심스러운 동작을 추적하기 위해 다음 로그 쿼리를 사용합니다:
+
+```loki
+# OCR 워커 에러
+{job="celery-worker"} | json | level="ERROR"
+
+# Celery 작업 타임아웃
+{job="celery-worker"} | json | msg=~".*timeout.*"
+
+# Django 애플리케이션 에러
+{job="django-app"} | json | level="ERROR"
+
+# MQTT 연결 문제
+{job="django-app"} | json | msg=~".*mqtt.*error.*"
+
+# 데이터베이스 연결 에러
+{job="django-app"} | json | msg=~".*database.*connection.*"
+
+# 지난 5분간의 모든 ERROR 레벨 로그 개수
+count(
+ {job=~"celery-worker|django-app"}
+ | json
+ | level="ERROR"
+) by (job)
+```
+
+### Jaeger 분산 추적 (Distributed Tracing)
+
+선택적으로 완전한 요청 흐름을 추적합니다:
+
+```
+URL: http://speedcam-mon:6831/search
+
+추적 항목:
+1. MQTT 메시지 수신 스팬
+ ├─ MQTT publish (IoT Camera)
+ ├─ MQTT message received (Django)
+ └─ Celery task enqueue
+
+2. OCR 처리 스팬
+ ├─ Celery task start
+ ├─ EasyOCR model load (초회)
+ ├─ Image preprocessing
+ ├─ OCR inference
+ └─ Celery task complete
+
+3. Alert 발행 스팬
+ ├─ Domain event created
+ ├─ kombu consumer received
+ ├─ FCM API call
+ └─ Alert published
+```
+
+## 문제 해결 및 FAQ
+
+### Q: Baseline 테스트에서 메시지가 처리되지 않음
+
+**A: 다음을 순서대로 확인하세요:**
+
+```bash
+# 1. MQTT 브로커 연결 확인
+docker exec speedcam-mq mosquitto_sub -h localhost -t "#" &
+# (다른 터미널에서) python /app/docker/k6/mqtt-load-test.py smoke
+
+# 2. Celery 워커 상태 확인
+docker exec speedcam-ocr celery -A ocr_tasks inspect active
+
+# 3. Django 애플리케이션 로그 확인
+docker logs speedcam-app | tail -50 | grep -i error
+
+# 4. RabbitMQ 큐 상태
+docker exec speedcam-mq rabbitmqctl list_queues
+```
+
+### Q: OCR 워커가 응답하지 않음
+
+**A:**
+
+```bash
+# 1. EasyOCR 모델 로드 상태 확인
+# 첫 테스트 실행 시 약 30초 소요 (로그에서 확인)
+docker logs speedcam-ocr | grep -i "loading\|model"
+
+# 2. 메모리 부족 여부 확인
+docker stats speedcam-ocr
+
+# 만약 메모리 사용량이 95% 이상:
+# OCR_CONCURRENCY를 1로 유지하거나
+# 인스턴스 메모리 증설 필요
+
+# 3. 워커 재시작
+docker restart speedcam-ocr
+
+# 모델 다시 로드될 때까지 대기 (30초)
+sleep 30
+python /app/docker/k6/mqtt-load-test.py smoke
+```
+
+### Q: 테스트 중 "Task timed out" 에러 발생
+
+**A:**
+
+```bash
+# 1. Celery 타임아웃 설정 확인
+# celery_config.py의 task_soft_time_limit, task_time_limit 확인
+
+# 2. 원인별 대응:
+# - OCR 처리 시간 > 5초인 경우: 정상 (이미지 크기 또는 모델 특성)
+# - OCR 처리 시간 > 30초인 경우: 모델 재로드 또는 하드웨어 문제
+# → docker restart speedcam-ocr
+
+# 3. 타임아웃 시간 증가 (필요시)
+export CELERY_TASK_TIME_LIMIT=600 # 10분
+docker restart speedcam-ocr
+```
+
+### Q: 메모리 사용량이 계속 증가함
+
+**A: 메모리 누수 가능성**
+
+```bash
+# 1. 현재 메모리 사용량 추이 확인
+watch -n 1 'docker stats speedcam-app --no-stream | head -2'
+
+# 2. Grafana에서 Memory Usage 그래프 확인
+# Sustained 테스트 후 메모리가 복구되지 않으면 누수 가능
+
+# 3. 원인 분석:
+# - Database 연결 누적: Django ORM 미사용 연결
+# → Django 설정의 CONN_MAX_AGE 확인
+#
+# - Celery 작업 메타데이터 누적
+# → RabbitMQ 퍼지 또는 설정 검토
+#
+# - Python 객체 참조 순환
+# → 메모리 프로파일링 도구 (memory_profiler) 사용
+
+# 4. 재시작
+docker restart speedcam-app speedcam-ocr speedcam-alert
+```
+
+### Q: Saturation 테스트 후 큐가 비지 않음
+
+**A:**
+
+```bash
+# 1. 남아있는 작업 확인
+docker exec speedcam-mq rabbitmqctl list_queues
+
+# 2. Celery 워커 상태 확인
+docker exec speedcam-ocr celery -A ocr_tasks inspect active
+
+# 3. 만약 워커가 멈춘 경우:
+docker restart speedcam-ocr
+
+# 4. 큐 수동 퍼지 (데이터 손실 주의)
+docker exec speedcam-mq rabbitmqctl purge_queue celery
+
+# 5. 데이터베이스 정리
+docker exec speedcam-app python manage.py shell
+# >>> from detections.models import Detection
+# >>> Detection.objects.filter(created_at__gt=...).delete()
+```
+
+## 체크리스트
+
+테스트를 시작하기 전에 다음을 확인하세요:
+
+### 사전 점검
+
+- [ ] 모든 인스턴스(speedcam-app, speedcam-db, speedcam-mq, speedcam-ocr, speedcam-alert, speedcam-mon)가 실행 중
+- [ ] SSH 접속 가능 (`gcloud compute ssh speedcam-app --zone=asia-northeast3-a`)
+- [ ] Docker 컨테이너 접속 가능 (`docker exec -it speedcam-main bash`)
+- [ ] MQTT 환경 변수 설정 (`MQTT_PASS` 포함)
+- [ ] RabbitMQ 웹 UI 접속 가능 (http://speedcam-mq:15672)
+- [ ] Grafana 접속 가능 (http://speedcam-mon:3000)
+- [ ] MySQL 접속 가능 (`docker exec speedcam-db mysql -u root -p`)
+
+### 테스트별 점검
+
+#### Smoke Test 전
+- [ ] Celery 워커 상태: active/idle
+- [ ] RabbitMQ 큐: celery, detections.completed 모두 empty
+- [ ] 데이터베이스: 최근 detections 없음
+
+#### Baseline Test 전
+- [ ] OCR 모델 미리 로드 (약 30초 대기 후 확인)
+- [ ] Grafana 대시보드 새로고침
+- [ ] 모니터링 터미널 준비 (2개: 스크립트 + 로그)
+
+#### Saturation Test 전
+- [ ] Spike 테스트 완료 후 큐 비워짐 확인
+- [ ] 메모리 사용량 정상 범위 확인
+- [ ] Grafana 범위 설정 변경 (Y축 스케일 확인)
+
+#### Spike Test 전
+- [ ] 최근 테스트 결과 분석 완료
+- [ ] 병목 지점 파악 완료
+
+#### Sustained Test 전
+- [ ] 충분한 시간 확보 (약 10분)
+- [ ] 모니터링 도구 안정성 확인
+- [ ] 로그 수집 설정 확인 (Loki)
+
+### 테스트 후 정리
+
+- [ ] 모든 완료 메시지 수 기록
+- [ ] 최대 큐 깊이 기록
+- [ ] 주요 오류 로그 저장
+- [ ] Grafana 스크린샷 캡처
+- [ ] 분석 결과 문서화
+- [ ] 데이터베이스 테스트 데이터 정리 (필요시)
+- [ ] 다음 테스트 시나리오 계획
+
+## 결론 및 권장사항
+
+### 현재 성능 프로필 요약
+
+SpeedCam 시스템은 다음과 같은 성능 특성을 보입니다:
+
+- **최대 안전 처리 속도**: ~0.2 msg/s (OCR 병목)
+- **포화 상태 큐 깊이**: ~10-12 작업
+- **종단간 지연**: 5-6초 (baseline) ~ 50초+ (saturation)
+- **안정성**: 메시지 손실 0%, 메모리 누수 없음
+
+### 향후 개선 계획
+
+**Phase 1 (즉시 실행 가능)**
+1. OCR_CONCURRENCY를 2로 증가 → 처리량 2배 증대
+2. Baseline 테스트 재실행하여 안정성 재검증
+
+**Phase 2 (중장기)**
+1. 이미지 전처리 최적화 (해상도, 압축율)
+2. EasyOCR 대신 더 빠른 OCR 엔진 평가 (TrOCR, PaddleOCR)
+3. GPU 활용 검토 (GCE GPU 인스턴스)
+
+**Phase 3 (장기)**
+1. 마이크로서비스 아키텍처: OCR 서비스 독립 스케일링
+2. 메시지 브로커 클러스터링
+3. 캐싱 전략 (이미지 해시 기반 캐시 완료 결과)
+
+## 참고 자료
+
+- MQTT 메시지 형식: `/app/docs/mqtt-protocol.md`
+- Celery 설정: `/app/celery_config.py`
+- Django MQTT Subscriber: `/app/mqtt/subscriber.py`
+- OCR 워커 구현: `/app/ocr/tasks.py`
+- Grafana 대시보드 설정: `/app/docker/grafana/dashboards/`
+- Loki 설정: `/app/docker/loki/loki-config.yaml`
+
+---
+
+**작성일**: 2026년 2월 13일
+**최종 수정**: 2026년 2월 13일
+**유지보수자**: SpeedCam 팀
diff --git a/requirements/ocr.txt b/requirements/ocr.txt
index d163a00..7d9c73b 100644
--- a/requirements/ocr.txt
+++ b/requirements/ocr.txt
@@ -8,4 +8,3 @@ pillow==11.2.1
# Google Cloud Storage
google-cloud-storage==2.18.2
-
diff --git a/scripts/start_alert_worker.sh b/scripts/start_alert_worker.sh
index 24bda15..29eed45 100644
--- a/scripts/start_alert_worker.sh
+++ b/scripts/start_alert_worker.sh
@@ -1,14 +1,22 @@
#!/bin/bash
set -e
-echo "Starting Alert Worker (Celery)..."
+echo "Starting Alert Worker (Domain Event Consumer)..."
-# Celery Worker 시작 (gevent pool - I/O 집약적)
+# Django 설정 모듈 기본값
+export DJANGO_SETTINGS_MODULE="${DJANGO_SETTINGS_MODULE:-config.settings.dev}"
+
+# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지)
+export OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="${OTEL_PYTHON_REQUESTS_EXCLUDED_URLS:-metadata.google.internal}"
+
+# Alert Event Consumer 시작
+# Choreography: detections.completed 이벤트를 직접 구독하여
+# Alert Service가 자율적으로 알림 발송 여부를 결정한다.
opentelemetry-instrument \
--service_name speedcam-alert \
- celery -A config worker \
- --pool=gevent \
- --concurrency=${ALERT_CONCURRENCY:-100} \
- --queues=fcm_queue \
- --hostname=alert@%h \
- --loglevel=${LOG_LEVEL:-info}
+ python -c "
+import django
+django.setup()
+from core.events.consumer import start_event_consumer
+start_event_consumer()
+"
diff --git a/scripts/start_ocr_worker.sh b/scripts/start_ocr_worker.sh
index 40557a6..dc8d892 100644
--- a/scripts/start_ocr_worker.sh
+++ b/scripts/start_ocr_worker.sh
@@ -3,6 +3,9 @@ set -e
echo "Starting OCR Worker (Celery)..."
+# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지)
+export OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="${OTEL_PYTHON_REQUESTS_EXCLUDED_URLS:-metadata.google.internal}"
+
# Celery Worker 시작 (prefork pool - CPU 집약적)
opentelemetry-instrument \
--service_name speedcam-ocr \
diff --git a/tasks/__init__.py b/tasks/__init__.py
index c3d404a..d0f85f7 100644
--- a/tasks/__init__.py
+++ b/tasks/__init__.py
@@ -1,6 +1,5 @@
# Celery Tasks Package
from .dlq_tasks import process_dlq_message
-from .notification_tasks import send_notification
from .ocr_tasks import process_ocr
-__all__ = ["process_ocr", "send_notification", "process_dlq_message"]
+__all__ = ["process_ocr", "process_dlq_message"]
diff --git a/tasks/notification_tasks.py b/tasks/notification_tasks.py
index 770a3c8..a9061ec 100644
--- a/tasks/notification_tasks.py
+++ b/tasks/notification_tasks.py
@@ -1,9 +1,13 @@
-"""Notification Worker Tasks (I/O 집약적)"""
+"""
+Notification Processing Logic (Alert Service)
+
+EDA/Choreography: detections.completed 이벤트에 반응하여
+Alert Service가 자율적으로 알림 발송 여부를 결정한다.
+"""
import logging
import os
-from celery import shared_task
from django.utils import timezone
logger = logging.getLogger(__name__)
@@ -12,17 +16,10 @@
FCM_MOCK = os.getenv("FCM_MOCK", "false").lower() == "true"
-@shared_task(
- bind=True,
- max_retries=3,
- autoretry_for=(Exception,),
- retry_backoff=True,
- retry_backoff_max=600,
- acks_late=True,
-)
-def send_notification(self, detection_id: int):
+def process_notification(detection_id: int):
"""
- FCM 푸시 알림 전송 Task
+ FCM 푸시 알림 전송
+
- 대시보드 토픽 브로드캐스트 (모든 감지에 대해)
- 매칭된 차량 개별 푸시 (차량 있는 경우)
- MSA: 각 서비스별 DB에서 조회
@@ -31,150 +28,126 @@ def send_notification(self, detection_id: int):
from apps.notifications.models import Notification
from apps.vehicles.models import Vehicle
- try:
- # 1. Detection 조회 (detections_db)
- detection = Detection.objects.using("detections_db").get(
- id=detection_id, status="completed"
+ # 1. Detection 조회 (detections_db)
+ detection = Detection.objects.using("detections_db").get(
+ id=detection_id, status="completed"
+ )
+
+ # 2. 알림 메시지 생성
+ title = f"과속 위반 감지: {detection.ocr_result or '미확인'}"
+ body = (
+ f"위치: {detection.location or '알 수 없음'}\n"
+ f"속도: {detection.detected_speed}km/h "
+ f"(제한: {detection.speed_limit}km/h)"
+ )
+ data = {
+ "detection_id": str(detection_id),
+ "plate_number": detection.ocr_result or "",
+ "speed": str(detection.detected_speed),
+ "speed_limit": str(detection.speed_limit),
+ "location": detection.location or "",
+ "detected_at": detection.detected_at.isoformat(),
+ }
+
+ # 3. 대시보드 토픽으로 항상 전송 (중복 방지)
+ topic_response = None
+ already_sent_topic = (
+ Notification.objects.using("notifications_db")
+ .filter(
+ detection_id=detection_id,
+ fcm_token="topic:dashboard_alerts",
+ status="sent",
)
+ .exists()
+ )
- # 2. 알림 메시지 생성
- title = f"⚠️ 과속 위반 감지: {detection.ocr_result or '미확인'}"
- body = (
- f"📍 위치: {detection.location or '알 수 없음'}\n"
- f"🚗 속도: {detection.detected_speed}km/h "
- f"(제한: {detection.speed_limit}km/h)"
- )
- data = {
- "detection_id": str(detection_id),
- "plate_number": detection.ocr_result or "",
- "speed": str(detection.detected_speed),
- "speed_limit": str(detection.speed_limit),
- "location": detection.location or "",
- "detected_at": detection.detected_at.isoformat(),
- }
-
- # 3. 대시보드 토픽으로 항상 전송 (중복 방지)
- topic_response = None
- already_sent_topic = (
- Notification.objects.using("notifications_db")
- .filter(
- detection_id=detection_id,
- fcm_token="topic:dashboard_alerts",
- status="sent",
+ if not already_sent_topic:
+ try:
+ if FCM_MOCK:
+ topic_response = f"mock-topic-{detection_id}"
+ else:
+ from core.firebase.fcm import send_topic_notification
+
+ topic_response = send_topic_notification(
+ "dashboard_alerts", title, body, data
+ )
+ logger.info(
+ f"Dashboard topic notification sent for detection "
+ f"{detection_id}: {topic_response}"
+ )
+ except Exception as e:
+ logger.warning(
+ f"Dashboard topic notification failed for detection "
+ f"{detection_id}: {e}"
)
- .exists()
+
+ # 4. 토픽 알림 이력 저장 (notifications_db)
+ Notification.objects.using("notifications_db").create(
+ detection_id=detection_id,
+ fcm_token="topic:dashboard_alerts",
+ title=title,
+ body=body,
+ status="sent" if topic_response else "failed",
+ sent_at=timezone.now() if topic_response else None,
+ error_message=None if topic_response else "Topic send failed",
+ )
+ else:
+ topic_response = "already_sent"
+ logger.info(
+ f"Dashboard topic notification already sent for detection "
+ f"{detection_id}, skipping"
)
- if not already_sent_topic:
- try:
- if FCM_MOCK:
- topic_response = f"mock-topic-{detection_id}"
- else:
- from core.firebase.fcm import send_topic_notification
-
- topic_response = send_topic_notification(
- "dashboard_alerts", title, body, data
- )
- logger.info(
- f"Dashboard topic notification sent for detection "
- f"{detection_id}: {topic_response}"
- )
- except Exception as e:
- logger.warning(
- f"Dashboard topic notification failed for detection "
- f"{detection_id}: {e}"
+ # 5. 매칭된 차량에 개별 푸시
+ vehicle = None
+ if detection.vehicle_id:
+ try:
+ vehicle = Vehicle.objects.using("vehicles_db").get(id=detection.vehicle_id)
+ except Vehicle.DoesNotExist:
+ logger.warning(f"Vehicle {detection.vehicle_id} not found")
+
+ if vehicle and vehicle.fcm_token:
+ try:
+ if FCM_MOCK:
+ vehicle_response = f"mock-message-id-{detection_id}"
+ else:
+ from core.firebase.fcm import send_push_notification
+
+ vehicle_response = send_push_notification(
+ token=vehicle.fcm_token,
+ title=title,
+ body=body,
+ data=data,
)
- # 4. 토픽 알림 이력 저장 (notifications_db)
Notification.objects.using("notifications_db").create(
detection_id=detection_id,
- fcm_token="topic:dashboard_alerts",
+ fcm_token=vehicle.fcm_token,
title=title,
body=body,
- status="sent" if topic_response else "failed",
- sent_at=timezone.now() if topic_response else None,
- error_message=None if topic_response else "Topic send failed",
+ status="sent",
+ sent_at=timezone.now(),
)
- else:
- topic_response = "already_sent"
logger.info(
- f"Dashboard topic notification already sent for detection "
- f"{detection_id}, skipping"
+ f"Vehicle notification sent for detection "
+ f"{detection_id}: {vehicle_response}"
+ )
+ except Exception as e:
+ logger.warning(
+ f"Vehicle notification failed for detection {detection_id}: {e}"
)
-
- # 5. 매칭된 차량에 개별 푸시 (기존 동작)
- vehicle = None
- if detection.vehicle_id:
- try:
- vehicle = Vehicle.objects.using("vehicles_db").get(
- id=detection.vehicle_id
- )
- except Vehicle.DoesNotExist:
- logger.warning(f"Vehicle {detection.vehicle_id} not found")
-
- if vehicle and vehicle.fcm_token:
- try:
- if FCM_MOCK:
- vehicle_response = f"mock-message-id-{detection_id}"
- else:
- from core.firebase.fcm import send_push_notification
-
- vehicle_response = send_push_notification(
- token=vehicle.fcm_token,
- title=title,
- body=body,
- data=data,
- )
-
- Notification.objects.using("notifications_db").create(
- detection_id=detection_id,
- fcm_token=vehicle.fcm_token,
- title=title,
- body=body,
- status="sent",
- sent_at=timezone.now(),
- )
- logger.info(
- f"Vehicle notification sent for detection "
- f"{detection_id}: {vehicle_response}"
- )
- except Exception as e:
- logger.warning(
- f"Vehicle notification failed for detection " f"{detection_id}: {e}"
- )
- Notification.objects.using("notifications_db").create(
- detection_id=detection_id,
- fcm_token=vehicle.fcm_token,
- title=title,
- body=body,
- status="failed",
- error_message=str(e),
- )
-
- return {
- "status": "sent",
- "topic": bool(topic_response),
- "vehicle": bool(vehicle and vehicle.fcm_token),
- }
-
- except Detection.DoesNotExist:
- logger.warning(f"Detection {detection_id} not found or not completed, retrying")
- raise self.retry(countdown=3, max_retries=3)
-
- except Exception as exc:
- try:
- from apps.notifications.models import Notification
-
Notification.objects.using("notifications_db").create(
detection_id=detection_id,
+ fcm_token=vehicle.fcm_token,
+ title=title,
+ body=body,
status="failed",
- retry_count=self.request.retries,
- error_message=str(exc),
- )
- except Exception as db_err:
- logger.error(
- f"Failed to record notification failure for detection {detection_id}: {db_err}"
+ error_message=str(e),
)
- logger.error(f"Notification failed for detection {detection_id}: {exc}")
- raise
+ logger.info(f"Notification processing completed for detection {detection_id}")
+ return {
+ "status": "sent",
+ "topic": bool(topic_response),
+ "vehicle": bool(vehicle and vehicle.fcm_token),
+ }
diff --git a/tasks/ocr_tasks.py b/tasks/ocr_tasks.py
index a547939..b846c72 100644
--- a/tasks/ocr_tasks.py
+++ b/tasks/ocr_tasks.py
@@ -56,12 +56,11 @@ def process_ocr(self, detection_id: int, gcs_uri: str):
OCR 처리 Task
- GCS에서 이미지 다운로드
- EasyOCR 실행
- - 직접 MySQL 업데이트 (Choreography 패턴)
+ - 처리 완료 후 detection.completed 이벤트 발행 (Choreography)
- MSA: 각 서비스별 DB 사용
"""
from apps.detections.models import Detection
from apps.vehicles.models import Vehicle
- from tasks.notification_tasks import send_notification
try:
# 1. 상태를 processing으로 업데이트 (detections_db)
@@ -130,13 +129,19 @@ def process_ocr(self, detection_id: int, gcs_uri: str):
except Exception as e:
logger.warning(f"Vehicle lookup failed: {e}")
- # 7. Always send notification for completed detections
- # (dashboard gets topic notification; matched vehicle gets individual push)
+ # 7. detections.completed 이벤트 발행 (Choreography)
+ # OCR은 알림 서비스의 존재를 모른다. 완료 사실만 발행하고 끝.
+ # AMQP topic exchange를 통해 관심 있는 서비스가 독립적으로 구독.
try:
- send_notification.apply_async(args=[detection_id], queue="fcm_queue")
+ from core.events.publisher import publish_event
+
+ publish_event(
+ "detections.completed",
+ {"detection_id": detection_id},
+ )
except Exception as e:
logger.warning(
- f"Failed to enqueue notification for detection {detection_id}: {e}"
+ f"Failed to publish completion event for detection {detection_id}: {e}"
)
logger.info(f"OCR completed for detection {detection_id}: {plate_number}")
diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py
index a636a79..093eb8c 100644
--- a/tests/unit/test_tasks.py
+++ b/tests/unit/test_tasks.py
@@ -80,13 +80,13 @@ def test_notification_body_format(self, completed_detection):
@pytest.mark.skipif(not firebase_available, reason="firebase_admin not installed")
@patch("tasks.notification_tasks.os.environ.get")
- def test_send_notification_mock_mode(self, mock_env, completed_detection):
- """Mock 모드에서 알림 전송 테스트"""
+ def test_process_notification_mock_mode(self, mock_env, completed_detection):
+ """Mock 모드에서 알림 처리 테스트"""
mock_env.return_value = "true"
- from tasks.notification_tasks import send_notification
+ from tasks.notification_tasks import process_notification
- result = send_notification(completed_detection.id)
+ result = process_notification(completed_detection.id)
assert result["status"] in ["sent", "skipped"]