From e17df5b97720203472db3f9f797b316f4bf80051 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 16:28:28 +0900 Subject: [PATCH 1/4] =?UTF-8?q?refactor:=20EDA=20choreography=20=ED=8C=A8?= =?UTF-8?q?=ED=84=B4=20=EC=A0=84=ED=99=98=20=EB=B0=8F=20GCS=20ADC=20?= =?UTF-8?q?=EB=A7=88=EC=9D=B4=EA=B7=B8=EB=A0=88=EC=9D=B4=EC=85=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AMQP domain events 기반 choreography 패턴 구현 - core/events/publisher.py: topic exchange로 이벤트 발행 - core/events/consumer.py: kombu ConsumerMixin으로 이벤트 구독 - OCR → detections.completed 이벤트 → Alert 독립 구독 - Alert Worker: Celery worker → kombu event consumer로 전환 - GCS 인증: JSON 키 파일 → ADC (Application Default Credentials) - GOOGLE_APPLICATION_CREDENTIALS 환경변수 제거 - GCE 메타데이터 서버 자동 인증 사용 - FIREBASE_CREDENTIALS는 유지 (FCM용) - MQTT subscriber: paho-mqtt v2 API 적용 - 부하테스트 스크립트 전면 개선 - 5개 시나리오 (smoke/baseline/saturation/spike/sustained) - 가설-실행-비교 프레임워크 - MySQL 파이프라인 검증 - 부하테스트 문서 추가 (docs/load-testing.md) --- backend.env.example | 13 +- config/celery.py | 19 +- core/events/__init__.py | 5 + core/events/consumer.py | 106 ++++ core/events/publisher.py | 46 ++ core/firebase/fcm.py | 2 +- core/mqtt/__init__.py | 2 +- core/mqtt/subscriber.py | 47 +- credentials/README.md | 25 +- docker/docker-compose.yml | 2 - docker/k6/mqtt-load-test.py | 459 ++++++++++++++++-- docs/PRD.md | 251 ++++++---- docs/load-testing.md | 878 ++++++++++++++++++++++++++++++++++ requirements/ocr.txt | 1 - scripts/start_alert_worker.sh | 24 +- scripts/start_ocr_worker.sh | 3 + tasks/__init__.py | 3 +- tasks/notification_tasks.py | 255 +++++----- tasks/ocr_tasks.py | 17 +- tests/unit/test_tasks.py | 8 +- 20 files changed, 1837 insertions(+), 329 deletions(-) create mode 100644 core/events/__init__.py create mode 100644 core/events/consumer.py create mode 100644 core/events/publisher.py create mode 100644 docs/load-testing.md diff --git a/backend.env.example b/backend.env.example index bc51cb1..f5cee6a 100644 --- a/backend.env.example +++ b/backend.env.example @@ -38,16 +38,17 @@ MQTT_USER=sa MQTT_PASS=1234 # =========================================== -# GCS (Google Cloud Storage) 설정 +# GCS (Google Cloud Storage) 인증 # =========================================== -# OCR_MOCK=false일 때만 필요 -GOOGLE_APPLICATION_CREDENTIALS=/-path(secret)-/gcp-cloud-storage.json -# GCS_BUCKET_NAME은 불필요 (GCS URI에서 자동 파싱) +# GCS는 ADC (Application Default Credentials) 사용 +# GCE 인스턴스: 메타데이터 서버에서 자동 인증 (설정 불필요) +# 로컬 개발: gcloud auth application-default login 실행 후 사용 +# OCR_MOCK=true이면 GCS 호출 없으므로 인증 불필요 # =========================================== # Firebase 설정 (FCM Push Notification) # =========================================== -FIREBASE_CREDENTIALS=/-path(secret)-/firebase-service-account.json +FIREBASE_CREDENTIALS=/app/credentials/firebase-service-account.json # =========================================== # Celery Worker 설정 @@ -91,3 +92,5 @@ OTEL_RESOURCE_ATTRIBUTES=service.namespace=speedcam,deployment.environment=dev # Valid values: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio OTEL_TRACES_SAMPLER=parentbased_always_on OTEL_PYTHON_LOG_CORRELATION=true +# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지) +OTEL_PYTHON_REQUESTS_EXCLUDED_URLS=metadata.google.internal diff --git a/config/celery.py b/config/celery.py index 9f3b672..50d3c6b 100644 --- a/config/celery.py +++ b/config/celery.py @@ -15,12 +15,12 @@ # Exchange 정의 ocr_exchange = Exchange("ocr_exchange", type="direct", durable=True) -fcm_exchange = Exchange("fcm_exchange", type="direct", durable=True) dlq_exchange = Exchange("dlq_exchange", type="fanout", durable=True) # Queue 정의 +# Note: fcm_queue 제거 — Alert Service는 Celery Command가 아닌 +# AMQP domain_events exchange의 detections.completed 이벤트를 직접 구독 (Choreography) app.conf.task_queues = ( - # 새로운 Queue (PRD 구조) Queue( "ocr_queue", exchange=ocr_exchange, @@ -31,15 +31,6 @@ "x-max-priority": 10, }, ), - Queue( - "fcm_queue", - exchange=fcm_exchange, - routing_key="fcm", - queue_arguments={ - "x-dead-letter-exchange": "dlq_exchange", - "x-message-ttl": 3600000, - }, - ), Queue( "dlq_queue", exchange=dlq_exchange, @@ -49,17 +40,11 @@ # Task 라우팅 app.conf.task_routes = { - # 새로운 Tasks (PRD 구조) "tasks.ocr_tasks.process_ocr": { "queue": "ocr_queue", "exchange": "ocr_exchange", "routing_key": "ocr", }, - "tasks.notification_tasks.send_notification": { - "queue": "fcm_queue", - "exchange": "fcm_exchange", - "routing_key": "fcm", - }, "tasks.dlq_tasks.process_dlq_message": { "queue": "dlq_queue", "exchange": "dlq_exchange", diff --git a/core/events/__init__.py b/core/events/__init__.py new file mode 100644 index 0000000..9179f08 --- /dev/null +++ b/core/events/__init__.py @@ -0,0 +1,5 @@ +# Domain Events Module (AMQP) +# 백엔드 서비스 간 도메인 이벤트 발행/구독 (Choreography) +from .publisher import publish_event + +__all__ = ["publish_event"] diff --git a/core/events/consumer.py b/core/events/consumer.py new file mode 100644 index 0000000..6aa76b6 --- /dev/null +++ b/core/events/consumer.py @@ -0,0 +1,106 @@ +""" +AMQP Domain Event Consumer (Kombu) + +Alert Service가 도메인 이벤트를 직접 구독하여 자율적으로 처리. +Choreography: 각 서비스는 이벤트에 독립적으로 반응한다. + +Main Service를 거치지 않고 Alert Service가 직접 +detections.completed 이벤트를 구독하여 알림 발송 여부를 결정한다. +""" + +import json +import logging +import os +import time + +from kombu import Connection, Exchange, Queue +from kombu.mixins import ConsumerMixin + +logger = logging.getLogger(__name__) + +DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True) + + +class AlertEventConsumer(ConsumerMixin): + """ + Alert Service 도메인 이벤트 소비자 + + detections.completed 이벤트를 구독하고, + Alert Service가 자율적으로 알림 발송 여부를 결정한다. + OCR Service의 존재도, Main Service의 중개도 모른다. + """ + + def __init__(self, connection): + self.connection = connection + + def get_consumers(self, Consumer, channel): + queue = Queue( + "alert_domain_events", + exchange=DOMAIN_EVENTS_EXCHANGE, + routing_key="detections.completed", + durable=True, + queue_arguments={ + "x-dead-letter-exchange": "dlq_exchange", + }, + ) + return [ + Consumer( + queues=[queue], + callbacks=[self.on_event], + accept=["json"], + ) + ] + + def on_event(self, body, message): + """도메인 이벤트 수신 및 처리""" + try: + payload = json.loads(body) if isinstance(body, str) else body + routing_key = message.delivery_info.get("routing_key", "") + + if routing_key == "detections.completed": + self._on_detection_completed(payload) + + message.ack() + except Exception as e: + logger.error(f"Failed to process domain event: {e}") + message.reject(requeue=False) + + def _on_detection_completed(self, payload): + """ + detections.completed 이벤트에 반응 + + Alert Service의 자율적 판단: + "OCR이 완료됐으니 알림을 보내야겠다" + """ + detection_id = payload["detection_id"] + logger.info( + f"Detection {detection_id} completed event received — " + f"processing notification" + ) + + max_retries = 3 + for attempt in range(max_retries + 1): + try: + from tasks.notification_tasks import process_notification + + process_notification(detection_id) + return + except Exception as e: + if "DoesNotExist" in type(e).__name__ and attempt < max_retries: + logger.warning( + f"Detection {detection_id} not ready, " + f"retry {attempt + 1}/{max_retries}" + ) + time.sleep(3) + else: + raise + + +def start_event_consumer(): + """Alert Service 도메인 이벤트 소비자 시작 (blocking)""" + broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//") + logger.info(f"Starting Alert Event Consumer on {broker_url}") + + with Connection(broker_url) as conn: + consumer = AlertEventConsumer(conn) + consumer.run() diff --git a/core/events/publisher.py b/core/events/publisher.py new file mode 100644 index 0000000..eddd5e6 --- /dev/null +++ b/core/events/publisher.py @@ -0,0 +1,46 @@ +""" +AMQP Domain Event Publisher (Kombu) + +Choreography 패턴에서 백엔드 서비스 간 도메인 이벤트 발행. +프로토콜 분리 원칙: IoT 경계는 MQTT, 서비스 간 이벤트는 AMQP. +""" + +import json +import logging +import os + +from kombu import Connection, Exchange + +logger = logging.getLogger(__name__) + +# 도메인 이벤트 교환기 (topic exchange: routing key 기반 선택적 구독) +DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True) + + +def publish_event(routing_key: str, payload: dict): + """ + AMQP 도메인 이벤트 발행 + + Topic exchange를 사용하여 이벤트를 발행하면, + 관심 있는 서비스가 routing key 패턴으로 독립적으로 구독한다. + + Args: + routing_key: 이벤트 라우팅 키 (예: "detections.completed") + payload: 이벤트 페이로드 + """ + broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//") + + try: + with Connection(broker_url) as conn: + producer = conn.Producer() + producer.publish( + json.dumps(payload), + exchange=DOMAIN_EVENTS_EXCHANGE, + routing_key=routing_key, + content_type="application/json", + declare=[DOMAIN_EVENTS_EXCHANGE], + ) + logger.info(f"Domain event published: {routing_key} -> {payload}") + except Exception as e: + logger.error(f"Failed to publish domain event {routing_key}: {e}") + raise diff --git a/core/firebase/fcm.py b/core/firebase/fcm.py index 6316b99..0f85bc1 100644 --- a/core/firebase/fcm.py +++ b/core/firebase/fcm.py @@ -31,7 +31,7 @@ def initialize_firebase(): firebase_admin.initialize_app(cred) logger.info(f"Firebase initialized with credentials: {cred_path}") else: - # GOOGLE_APPLICATION_CREDENTIALS 사용 + # ADC (Application Default Credentials) 사용 firebase_admin.initialize_app() logger.info("Firebase initialized with default credentials") diff --git a/core/mqtt/__init__.py b/core/mqtt/__init__.py index 11ce08c..0bcd00a 100644 --- a/core/mqtt/__init__.py +++ b/core/mqtt/__init__.py @@ -1,4 +1,4 @@ -# MQTT Module +# MQTT Module (IoT Edge 전용) from .subscriber import MQTTSubscriber __all__ = ["MQTTSubscriber"] diff --git a/core/mqtt/subscriber.py b/core/mqtt/subscriber.py index f37bbe1..826ffd9 100644 --- a/core/mqtt/subscriber.py +++ b/core/mqtt/subscriber.py @@ -1,4 +1,9 @@ -"""MQTT Subscriber for Edge Device messages""" +""" +MQTT Subscriber for IoT Edge Device messages + +IoT 디바이스(Raspberry Pi 카메라)에서 오는 MQTT 메시지만 처리. +프로토콜 분리 원칙: MQTT는 IoT 경계 전용, 서비스 간 이벤트는 AMQP. +""" import json import logging @@ -13,13 +18,10 @@ class MQTTSubscriber: """ - RabbitMQ MQTT Plugin을 통해 Edge Device 메시지를 수신하는 Subscriber + RabbitMQ MQTT Plugin을 통해 IoT 디바이스 메시지를 수신하는 Subscriber - Flow: - 1. Raspberry Pi -> MQTT Publish (detections/new) - 2. RabbitMQ MQTT Plugin -> 내부 변환 - 3. Django MQTT Subscriber -> 메시지 수신 - 4. Detection 생성 (pending) -> OCR Task 발행 + 구독 토픽: + - detections/new : IoT 디바이스 → Detection 생성 → OCR 발행 """ def __init__(self): @@ -33,12 +35,12 @@ def __init__(self): self.client.on_disconnect = self.on_disconnect # 인증 설정 - username = os.getenv("MQTT_USER", "sa") - password = os.getenv("MQTT_PASS", "1234") + username = os.getenv("MQTT_USER", "") + password = os.getenv("MQTT_PASS", "") self.client.username_pw_set(username, password) def on_connect(self, client, userdata, flags, reason_code, properties): - """MQTT 연결 시 토픽 구독""" + """MQTT 연결 시 IoT 토픽 구독""" if reason_code.is_failure: logger.error(f"MQTT connection failed: {reason_code}") else: @@ -55,20 +57,30 @@ def on_disconnect( ) def on_message(self, client, userdata, msg): + """메시지 처리""" + try: + payload = json.loads(msg.payload.decode()) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in MQTT message: {e}") + return + + if msg.topic == "detections/new": + self._handle_new_detection(payload) + else: + logger.warning(f"Unknown MQTT topic: {msg.topic}") + + def _handle_new_detection(self, payload): """ - 메시지 수신 시 처리 + detections/new: IoT 디바이스에서 새 감지 수신 1. DB에 pending 레코드 즉시 생성 (데이터 손실 방지) - 2. OCR Task 발행 + 2. OCR Task 발행 (AMQP) """ try: - payload = json.loads(msg.payload.decode()) logger.info(f"Received MQTT message: {payload.get('camera_id')}") - # Import here to avoid circular imports from apps.detections.models import Detection from tasks.ocr_tasks import process_ocr - # 1. Detection 레코드 생성 (status=pending, detections_db) detection = Detection.objects.using("detections_db").create( camera_id=payload.get("camera_id"), location=payload.get("location"), @@ -81,7 +93,6 @@ def on_message(self, client, userdata, msg): logger.info(f"Detection {detection.id} created (pending)") - # 2. OCR Task 발행 (AMQP via Celery) process_ocr.apply_async( args=[detection.id], kwargs={"gcs_uri": payload["image_gcs_uri"]}, @@ -91,12 +102,10 @@ def on_message(self, client, userdata, msg): logger.info(f"OCR task dispatched for detection {detection.id}") - except json.JSONDecodeError as e: - logger.error(f"Invalid JSON in MQTT message: {e}") except KeyError as e: logger.error(f"Missing required field in MQTT message: {e}") except Exception as e: - logger.error(f"Error processing MQTT message: {e}") + logger.error(f"Error processing new detection: {e}") @staticmethod def _parse_detected_at(value): diff --git a/credentials/README.md b/credentials/README.md index beaf23d..3011e0e 100644 --- a/credentials/README.md +++ b/credentials/README.md @@ -1,28 +1,33 @@ # Credentials 폴더 -이 폴더에는 GCP 및 Firebase 인증 관련 파일들이 위치합니다. +이 폴더에는 Firebase 인증 파일이 위치합니다. +GCS (Cloud Storage)는 ADC를 사용하므로 JSON 키가 불필요합니다. ## 파일 목록 | 파일명 | 용도 | 환경변수 | |--------|------|----------| | `firebase-service-account.json` | FCM 푸시 알림 | `FIREBASE_CREDENTIALS` | -| `gcp-cloud-storage.json` | GCS 이미지 저장소 | `GOOGLE_APPLICATION_CREDENTIALS` | + +## 인증 방식 + +| 서비스 | 인증 방식 | 설정 | +|--------|----------|------| +| **GCS (Cloud Storage)** | ADC (Application Default Credentials) | GCE: 자동, 로컬: `gcloud auth application-default login` | +| **Firebase (FCM)** | Service Account JSON 키 | `FIREBASE_CREDENTIALS` 환경변수로 경로 지정 | ## 설정 방법 -### 1. Firebase Service Account +### Firebase Service Account 1. [Firebase Console](https://console.firebase.google.com/) 접속 2. 프로젝트 설정 → 서비스 계정 → 새 비공개 키 생성 3. 다운로드한 JSON 파일을 `firebase-service-account.json`으로 저장 -### 2. GCP Service Account (Cloud Storage) -1. [GCP Console](https://console.cloud.google.com/) 접속 -2. IAM 및 관리자 → 서비스 계정 → 키 생성 -3. 필요한 역할: `Storage Object Viewer`, `Storage Object Creator` -4. 다운로드한 JSON 파일을 `gcp-cloud-storage.json`으로 저장 +### GCS (ADC) +- GCE 인스턴스: 설정 불필요 (메타데이터 서버에서 자동 인증) +- 로컬 개발: `gcloud auth application-default login` 실행 ## 주의사항 -- ⚠️ **실제 인증 파일은 절대 Git에 커밋하지 마세요** -- Docker 환경에서는 볼륨 마운트 또는 Secret Manager 사용 권장 +- `GOOGLE_APPLICATION_CREDENTIALS` 환경변수를 설정하지 마세요 (ADC 자동 탐색 방해) +- Firebase JSON 키는 절대 Git에 커밋하지 마세요 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 94270e4..08d8c82 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -72,8 +72,6 @@ services: container_name: speedcam-ocr env_file: - ../backend.env - volumes: - - ../credentials:/app/credentials:ro depends_on: - main - rabbitmq diff --git a/docker/k6/mqtt-load-test.py b/docker/k6/mqtt-load-test.py index 89b836e..06a3bb4 100644 --- a/docker/k6/mqtt-load-test.py +++ b/docker/k6/mqtt-load-test.py @@ -1,28 +1,50 @@ #!/usr/bin/env python3 """ -MQTT Load Test - IoT Device Simulation - -Simulates Raspberry Pi cameras sending detection messages via MQTT. -Full pipeline: MQTT → Detection (pending) → OCR Worker → Alert Worker +MQTT Full Pipeline Load Test +Full pipeline: MQTT → Main → AMQP → OCR (EasyOCR) → AMQP Event → Alert + +Usage: + python mqtt-load-test.py smoke + python mqtt-load-test.py baseline + python mqtt-load-test.py saturation + python mqtt-load-test.py spike + python mqtt-load-test.py sustained """ import argparse import json import os import random +import sys import threading import time from datetime import datetime, timedelta, timezone +from typing import Dict, Optional import paho.mqtt.client as mqtt # Config from environment -MQTT_HOST = os.getenv("MQTT_HOST", "rabbitmq") +MQTT_HOST = os.getenv("MQTT_HOST", "10.178.0.7") MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) MQTT_USER = os.getenv("MQTT_USER", "sa") MQTT_PASS = os.getenv("MQTT_PASS", "1234") TOPIC = "detections/new" +# Database config +DB_HOST = os.getenv("DB_HOST", "10.178.0.2") +DB_PORT = int(os.getenv("DB_PORT", "3306")) +DB_USER = os.getenv("DB_USER", "sa") +DB_PASS = os.getenv("DB_PASS", "hG57vv9aJHPGm0X82xlQ") +DB_NAME = os.getenv("DB_NAME_DETECTIONS", "speedcam_detections") + +# GCS 설정 +GCS_BUCKET = os.getenv("GCS_BUCKET", "speedcam-bucket-4f918446") +GCS_IMAGES = ( + [f"real-plate-{str(i).zfill(2)}.jpg" for i in range(1, 11)] + + [f"plate-{str(i).zfill(2)}.jpg" for i in range(1, 11)] + + [f"test-plate-{i}.jpg" for i in range(1, 6)] +) + # Locations for realistic simulation LOCATIONS = [ "서울시 강남구 테헤란로", @@ -37,6 +59,60 @@ CAMERA_IDS = [f"CAM-{str(i).zfill(3)}" for i in range(1, 21)] +# Scenario definitions +SCENARIOS = { + "smoke": { + "workers": 1, + "rate": 0, # Manual single message + "duration": 0, + "expected_messages": 1, + "expected_throughput": 0.2, + "expected_queue_depth": 0, + "expected_error_rate": 0, + "description": "Verify pipeline works end-to-end", + }, + "baseline": { + "workers": 1, + "rate": 0.2, + "duration": 60, + "expected_messages": 12, + "expected_throughput": 0.2, + "expected_queue_depth": 0, + "expected_error_rate": 1, + "description": "Match OCR capacity, measure steady-state latency", + }, + "saturation": { + "workers": 3, + "rate": 1, + "duration": 60, + "expected_messages": 180, + "expected_throughput": 0.2, + "expected_queue_depth": 168, # ~180 - 12 processed + "expected_error_rate": 1, + "description": "Exceed OCR capacity to test queuing behavior", + }, + "spike": { + "workers": 5, + "rate": 2, + "duration": 10, + "expected_messages": 100, + "expected_throughput": 0.2, + "expected_queue_depth": 98, # ~100 - 2 processed + "expected_error_rate": 1, + "description": "Sudden burst, measure recovery", + }, + "sustained": { + "workers": 2, + "rate": 0.5, + "duration": 300, + "expected_messages": 300, + "expected_throughput": 0.2, + "expected_queue_depth": 240, # ~300 - 60 processed + "expected_error_rate": 1, + "description": "Long-running stability test", + }, +} + # Stats stats = { "published": 0, @@ -47,7 +123,7 @@ stats_lock = threading.Lock() -def generate_message(): +def generate_message() -> str: """Generate a realistic detection message.""" kst = timezone(timedelta(hours=9)) speed_limit = random.choice([60.0, 80.0, 100.0, 110.0]) @@ -61,14 +137,55 @@ def generate_message(): "speed_limit": speed_limit, "detected_at": datetime.now(kst).isoformat(), "image_gcs_uri": ( - f"gs://speedcam-bucket/detections/" - f"{int(time.time() * 1000)}-{random.randint(1000, 9999)}.jpg" + f"gs://{GCS_BUCKET}/detections/{random.choice(GCS_IMAGES)}" ), } ) -def publish_worker(worker_id, rate_per_sec, duration_sec): +def publish_single_message() -> bool: + """Publish a single message for smoke test.""" + client = mqtt.Client( + callback_api_version=mqtt.CallbackAPIVersion.VERSION2, + protocol=mqtt.MQTTv311, + client_id=f"loadtest-smoke-{os.getpid()}", + ) + client.username_pw_set(MQTT_USER, MQTT_PASS) + + try: + client.connect(MQTT_HOST, MQTT_PORT, keepalive=60) + client.loop_start() + time.sleep(0.5) # Allow connection to establish + + msg = generate_message() + start = time.time() + result = client.publish(TOPIC, msg, qos=1) + result.wait_for_publish(timeout=5) + + if result.rc == mqtt.MQTT_ERR_SUCCESS: + latency_ms = (time.time() - start) * 1000 + print(f"✓ Message published successfully (latency: {latency_ms:.2f}ms)") + print(f" Payload: {msg[:100]}...") + with stats_lock: + stats["published"] = 1 + stats["total_latency_ms"] = latency_ms + return True + else: + print(f"✗ Publish failed with code {result.rc}") + with stats_lock: + stats["failed"] = 1 + return False + except Exception as e: + print(f"✗ Exception during publish: {e}") + with stats_lock: + stats["failed"] = 1 + return False + finally: + client.loop_stop() + client.disconnect() + + +def publish_worker(worker_id: int, rate_per_sec: float, duration_sec: int): """Single worker thread that publishes MQTT messages.""" client = mqtt.Client( callback_api_version=mqtt.CallbackAPIVersion.VERSION2, @@ -123,22 +240,271 @@ def print_stats(): print(f"\n{'='*60}") print(f" Elapsed: {elapsed:.1f}s | Published: {published} | Failed: {failed}") - print(f" Rate: {rate:.1f} msg/s | Avg Latency: {avg_latency:.2f}ms") + print(f" Rate: {rate:.2f} msg/s | Avg Latency: {avg_latency:.2f}ms") print(f" Error Rate: {(failed/total*100) if total > 0 else 0:.2f}%") print(f"{'='*60}") -def run_load_test(workers, rate_per_worker, duration): - """Run the load test with multiple workers.""" - print("\n MQTT Load Test Starting") +def get_db_connection(): + """Try to import and connect to MySQL database.""" + # Try pymysql first + try: + import pymysql + conn = pymysql.connect( + host=DB_HOST, + port=DB_PORT, + user=DB_USER, + password=DB_PASS, + database=DB_NAME, + charset='utf8mb4', + cursorclass=pymysql.cursors.DictCursor + ) + return conn, 'pymysql' + except ImportError: + pass + except Exception as e: + print(f"⚠ pymysql connection failed: {e}") + + # Try mysql.connector + try: + import mysql.connector + conn = mysql.connector.connect( + host=DB_HOST, + port=DB_PORT, + user=DB_USER, + password=DB_PASS, + database=DB_NAME, + charset='utf8mb4' + ) + return conn, 'mysql.connector' + except ImportError: + pass + except Exception as e: + print(f"⚠ mysql.connector connection failed: {e}") + + return None, None + + +def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dict]: + """Verify the pipeline by querying MySQL database.""" + conn, driver = get_db_connection() + if not conn: + print("\n⚠ WARNING: No MySQL driver available (pymysql or mysql-connector-python)") + print(" Pipeline verification skipped. Install pymysql to enable:") + print(" pip install pymysql") + return None + + print(f"\n{'='*60}") + print("=== PIPELINE VERIFICATION ===") + print(f" Database: {DB_HOST}:{DB_PORT}/{DB_NAME} (driver: {driver})") + print(f"{'='*60}") + + try: + cursor = conn.cursor() + + # Initial check + cursor.execute(""" + SELECT + COUNT(*) as total, + SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, + SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing, + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + FROM detections + WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) + """) + + if driver == 'pymysql': + result = cursor.fetchone() + else: # mysql.connector + result = cursor.fetchone() + # Convert tuple to dict for mysql.connector + columns = [desc[0] for desc in cursor.description] + result = dict(zip(columns, result)) + + initial_total = result['total'] or 0 + print(f"\nInitial state (last 10 minutes):") + print(f" Total detections: {initial_total}") + print(f" - completed: {result['completed'] or 0}") + print(f" - processing: {result['processing'] or 0}") + print(f" - pending: {result['pending'] or 0}") + print(f" - failed: {result['failed'] or 0}") + + # Wait for pipeline to process + if max_wait_sec > 0: + print(f"\nWaiting up to {max_wait_sec}s for pipeline to drain...") + wait_start = time.time() + while time.time() - wait_start < max_wait_sec: + cursor.execute(""" + SELECT + COUNT(*) as total, + SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, + SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing, + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + FROM detections + WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) + """) + + if driver == 'pymysql': + result = cursor.fetchone() + else: + result = cursor.fetchone() + columns = [desc[0] for desc in cursor.description] + result = dict(zip(columns, result)) + + processing = result['processing'] or 0 + pending = result['pending'] or 0 + + if processing == 0 and pending == 0: + print(f"✓ Pipeline drained after {time.time() - wait_start:.1f}s") + break + + print(f" [{time.time() - wait_start:.1f}s] processing: {processing}, pending: {pending}") + time.sleep(2) + + # Final status + cursor.execute(""" + SELECT + COUNT(*) as total, + SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, + SUM(CASE WHEN status = 'processing' THEN 1 ELSE 0 END) as processing, + SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed + FROM detections + WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) + """) + + if driver == 'pymysql': + final_result = cursor.fetchone() + else: + final_result = cursor.fetchone() + columns = [desc[0] for desc in cursor.description] + final_result = dict(zip(columns, final_result)) + + print(f"\nFinal state:") + print(f" Total detections created: {final_result['total']}") + print(f" - completed: {final_result['completed'] or 0}") + print(f" - processing: {final_result['processing'] or 0}") + print(f" - pending: {final_result['pending'] or 0}") + print(f" - failed: {final_result['failed'] or 0}") + + total = final_result['total'] or 0 + completed = final_result['completed'] or 0 + completion_rate = (completed / total * 100) if total > 0 else 0 + print(f" Pipeline completion rate: {completed}/{total} = {completion_rate:.1f}%") + + cursor.close() + return final_result + except Exception as e: + print(f"✗ Pipeline verification error: {e}") + return None + finally: + conn.close() + + +def print_hypothesis(scenario_name: str, config: Dict): + """Print hypothesis for the scenario.""" + print(f"\n{'='*60}") + print("=== HYPOTHESIS ===") + print(f" Scenario: {scenario_name.upper()} - {config['description']}") + print(f" Expected published: {config['expected_messages']} messages") + print(f" Expected OCR throughput: ~{config['expected_throughput']} msg/s (OCR_CONCURRENCY=1, ~5s/image)") + if config['duration'] > 0: + expected_processed = int(config['expected_throughput'] * config['duration']) + print(f" Expected processed in {config['duration']}s: ~{expected_processed} messages") + print(f" Expected queue depth at end: ~{config['expected_queue_depth']} messages") + print(f" Expected error rate: <{config['expected_error_rate']}%") + print(f"{'='*60}\n") + + +def print_comparison(scenario_name: str, config: Dict, db_result: Optional[Dict]): + """Print comparison between hypothesis and actual results.""" + print(f"\n{'='*60}") + print("=== COMPARISON ===") + + # Published vs Expected + published = stats["published"] + expected_pub = config["expected_messages"] + pub_match = abs(published - expected_pub) <= max(1, expected_pub * 0.1) # 10% tolerance + print(f" Published: {published} vs Expected: {expected_pub} {'✓ PASS' if pub_match else '✗ FAIL'}") + + # Error rate + failed = stats["failed"] + total = published + failed + actual_error_rate = (failed / total * 100) if total > 0 else 0 + expected_error_rate = config["expected_error_rate"] + error_match = actual_error_rate <= expected_error_rate + print(f" Error rate: {actual_error_rate:.2f}% vs Expected: <{expected_error_rate}% {'✓ PASS' if error_match else '✗ FAIL'}") + + # Queue depth (from DB if available) + if db_result: + processing = db_result['processing'] or 0 + pending = db_result['pending'] or 0 + actual_queue = processing + pending + expected_queue = config['expected_queue_depth'] + print(f" Queue depth: {actual_queue} vs Expected: ~{expected_queue} (INFO)") + + # Pipeline completion + completed = db_result['completed'] or 0 + db_total = db_result['total'] or 0 + if db_total > 0: + completion_rate = completed / db_total * 100 + print(f" Pipeline completion: {completion_rate:.1f}% ({completed}/{db_total})") + + print(f"{'='*60}\n") + + +def run_scenario(scenario_name: str): + """Run a specific test scenario.""" + if scenario_name not in SCENARIOS: + print(f"✗ Unknown scenario: {scenario_name}") + print(f" Available scenarios: {', '.join(SCENARIOS.keys())}") + sys.exit(1) + + config = SCENARIOS[scenario_name] + + # Print hypothesis + print_hypothesis(scenario_name, config) + + # Handle smoke test separately + if scenario_name == "smoke": + print("=== EXECUTION (SMOKE TEST) ===") + print("Publishing single message...\n") + stats["start_time"] = time.time() + success = publish_single_message() + + if success: + print("\n=== RESULTS ===") + print(f" Published: 1 | Failed: 0") + print(f" Avg Latency: {stats['total_latency_ms']:.2f}ms") + + # Wait a bit for pipeline + print("\nWaiting 10s for pipeline to process...") + time.sleep(10) + + # Verify pipeline + db_result = verify_pipeline(expected_count=1, max_wait_sec=20) + + # Print comparison + print_comparison(scenario_name, config, db_result) + else: + print("\n✗ Smoke test failed - message not published") + sys.exit(1) + + return + + # Regular load test + workers = config["workers"] + rate = config["rate"] + duration = config["duration"] + + print("=== EXECUTION ===") print(f" Host: {MQTT_HOST}:{MQTT_PORT}") print(f" Workers: {workers}") - print( - f" Rate: {rate_per_worker}/s per worker ({workers * rate_per_worker}/s total)" - ) + print(f" Rate: {rate}/s per worker ({workers * rate}/s total)") print(f" Duration: {duration}s") - print(f" Topic: {TOPIC}") - print() + print(f" Topic: {TOPIC}\n") stats["start_time"] = time.time() threads = [] @@ -146,7 +512,7 @@ def run_load_test(workers, rate_per_worker, duration): for i in range(workers): t = threading.Thread( target=publish_worker, - args=(i, rate_per_worker, duration), + args=(i, rate, duration), ) t.start() threads.append(t) @@ -160,24 +526,59 @@ def run_load_test(workers, rate_per_worker, duration): for t in threads: t.join(timeout=10) - print("\n FINAL RESULTS") - print_stats() + # Final results + print("\n=== RESULTS ===") + elapsed = time.time() - stats["start_time"] + published = stats["published"] + failed = stats["failed"] + total = published + failed + rate_actual = published / elapsed if elapsed > 0 else 0 + avg_latency = stats["total_latency_ms"] / published if published > 0 else 0 + + print(f" Published: {published} | Failed: {failed}") + print(f" Rate: {rate_actual:.2f} msg/s | Avg Latency: {avg_latency:.2f}ms") + print(f" Error Rate: {(failed/total*100) if total > 0 else 0:.2f}%") + + # Verify pipeline + wait_time = min(30, duration // 2) # Wait up to 30s or half test duration + db_result = verify_pipeline(expected_count=published, max_wait_sec=wait_time) + + # Print comparison + print_comparison(scenario_name, config, db_result) def main(): - parser = argparse.ArgumentParser(description="MQTT Load Test") - parser.add_argument( - "--workers", type=int, default=5, help="Number of concurrent workers" + parser = argparse.ArgumentParser( + description="MQTT Full Pipeline Load Test", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Available scenarios: + smoke - Verify pipeline works end-to-end (1 message) + baseline - Match OCR capacity (1 worker, 0.2 msg/s, 60s) + saturation - Exceed OCR capacity (3 workers, 1 msg/s, 60s) + spike - Sudden burst (5 workers, 2 msg/s, 10s) + sustained - Long-running stability (2 workers, 0.5 msg/s, 300s) + +Examples: + python mqtt-load-test.py smoke + python mqtt-load-test.py baseline + python mqtt-load-test.py saturation + """ ) parser.add_argument( - "--rate", type=int, default=2, help="Messages per second per worker" - ) - parser.add_argument( - "--duration", type=int, default=60, help="Test duration in seconds" + "scenario", + choices=list(SCENARIOS.keys()), + help="Test scenario to run" ) + args = parser.parse_args() - run_load_test(args.workers, args.rate, args.duration) + # Verify MQTT credentials + if not MQTT_PASS: + print("✗ Error: MQTT_PASS environment variable not set") + sys.exit(1) + + run_scenario(args.scenario) if __name__ == "__main__": diff --git a/docs/PRD.md b/docs/PRD.md index faf015e..e5dca8b 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -105,11 +105,13 @@ graph TB OCR -->|6. 이미지 다운로드| GCS OCR -->|7. 결과 업데이트| MySQL_Detections OCR -->|7-1. 차량 조회| MySQL_Vehicles - OCR -->|8. AMQP Publish| AMQP + OCR -->|8. MQTT Publish
detections/completed| MQTT + MQTT -->|9. MQTT Subscribe| Django + Django -->|10. AMQP Publish| AMQP AMQP -->|fcm_queue| FCM - FCM -->|9. 차량/토큰 조회| MySQL_Vehicles - FCM -->|10. 푸시 전송| Firebase - FCM -->|11. 이력 저장| MySQL_Notifications + FCM -->|11. 차량/토큰 조회| MySQL_Vehicles + FCM -->|12. 푸시 전송| Firebase + FCM -->|13. 이력 저장| MySQL_Notifications ``` ### 2.4 이벤트 흐름 (Sequence Diagram) @@ -140,16 +142,17 @@ sequenceDiagram OCR->>OCR: 8. EasyOCR 실행 OCR->>DDB: 9. 직접 업데이트 (status=completed) OCR->>VDB: 10. 번호판으로 Vehicle 조회 - alt 차량 & FCM 토큰 존재 - OCR->>DDB: 11. vehicle_id 매핑 - OCR->>AMQP: 12. Publish to fcm_exchange (Direct) - end - - AMQP->>FCM: 13. Consume from fcm_queue - FCM->>DDB: 14. Detection 조회 - FCM->>VDB: 15. Vehicle/FCM 토큰 조회 - FCM->>FCM: 16. FCM API 호출 - FCM->>NDB: 17. 알림 이력 저장 + OCR->>DDB: 11. vehicle_id 매핑 + OCR->>MQTT: 12. MQTT Publish (detections/completed) + + MQTT->>Django: 13. MQTT Subscribe (detections/completed) + Django->>AMQP: 14. Publish to fcm_exchange + + AMQP->>FCM: 15. Consume from fcm_queue + FCM->>DDB: 16. Detection 조회 + FCM->>VDB: 17. Vehicle/FCM 토큰 조회 + FCM->>FCM: 18. FCM API 호출 + FCM->>NDB: 19. 알림 이력 저장 ``` --- @@ -181,11 +184,15 @@ sequenceDiagram | Image Processing | OpenCV | 4.10.0.84 | | Image Library | Pillow | 11.2.1 | -### 3.4 Monitoring (Optional) +### 3.4 Monitoring & Observability | 구분 | 기술 | 용도 | |------|------|------| +| Metrics | Prometheus + Grafana | 시스템/컨테이너 메트릭 수집 및 시각화 | +| Logging | Loki + Promtail | 중앙 집중식 로그 수집 및 검색 | +| Tracing | OpenTelemetry + Jaeger | 분산 트레이싱 (서비스 간 요청 추적) | | Task Monitoring | Flower | Celery Task 모니터링 | | Queue Dashboard | RabbitMQ Management | Queue 상태 확인 | +| Container Metrics | cAdvisor | 컨테이너 리소스 사용량 | --- @@ -397,15 +404,17 @@ graph LR direction TB M1[Raspberry Pi] -->|Publish| M2[detections/new] M2 -->|Subscribe| M3[Django] + M4[OCR Worker] -->|Publish| M5[detections/completed] + M5 -->|Subscribe| M3 end - + subgraph AMQP["AMQP (Port 5672)"] direction TB A1[Django] -->|Publish| A2[ocr_exchange] A2 -->|Route| A3[ocr_queue] A3 -->|Consume| A4[OCR Worker] - - A4 -->|Publish| A5[fcm_exchange] + + A1 -->|Publish| A5[fcm_exchange] A5 -->|Route| A6[fcm_queue] A6 -->|Consume| A7[Alert Worker] end @@ -413,8 +422,8 @@ graph LR | 프로토콜 | 용도 | 특징 | |----------|------|------| -| **MQTT** | Raspberry Pi → Django | 경량 프로토콜, IoT 디바이스에 적합, QoS 1 | -| **AMQP** | Django ↔ Celery Workers | 안정적인 메시지 전달, Exchange/Queue 라우팅 | +| **MQTT** | IoT → Django, OCR → Django (도메인 이벤트) | 경량 프로토콜, QoS 1, Choreography 이벤트 전파 | +| **AMQP** | Django → Celery Workers (Task 분배) | 안정적인 메시지 전달, Exchange/Queue 라우팅 | ### 5.2 Exchange 설계 @@ -512,6 +521,18 @@ QUEUES = { │ Detection 업데이트 (detections_db) │ Vehicle 조회 (vehicles_db) │ + │ MQTT Publish (Choreography) + │ Topic: detections/completed + │ QoS: 1 + ▼ +[RabbitMQ MQTT Plugin] + │ + │ MQTT Subscribe + ▼ +[Django MQTT Subscriber] + │ + │ 이벤트 수신 & 라우팅 + │ │ AMQP Publish │ Exchange: fcm_exchange │ Routing Key: fcm @@ -674,7 +695,8 @@ backend/ │ ├── __init__.py │ ├── mqtt/ │ │ ├── __init__.py -│ │ └── subscriber.py # Main Service 전용 +│ │ ├── publisher.py # 도메인 이벤트 발행 (Choreography) +│ │ └── subscriber.py # 도메인 이벤트 수신 및 라우팅 │ ├── gcs/ │ │ ├── __init__.py │ │ └── client.py # GCS 클라이언트 @@ -743,6 +765,7 @@ easyocr==1.7.2 opencv-python-headless==4.10.0.84 pillow==11.2.1 google-cloud-storage==2.18.2 +paho-mqtt==2.1.0 ``` **requirements/alert.txt** (Alert Service) @@ -1015,81 +1038,132 @@ app.autodiscover_tasks(['tasks']) import json import os import logging -import threading import paho.mqtt.client as mqtt from django.utils import timezone -from apps.detections.models import Detection -from tasks.ocr_tasks import process_ocr +from django.utils.dateparse import parse_datetime logger = logging.getLogger(__name__) class MQTTSubscriber: + """ + RabbitMQ MQTT Plugin을 통해 도메인 이벤트를 수신하는 Subscriber + + Choreography 패턴: 각 서비스는 이벤트를 발행하고, + 관심 있는 서비스가 독립적으로 구독하여 처리한다. + + 구독 토픽: + - detections/new : IoT 디바이스 → Detection 생성 → OCR 발행 + - detections/completed : OCR 완료 이벤트 → Notification 발행 + """ + def __init__(self): self.client = mqtt.Client( callback_api_version=mqtt.CallbackAPIVersion.VERSION2, - protocol=mqtt.MQTTv5, + protocol=mqtt.MQTTv311, client_id=f"django-main-{os.getpid()}" ) self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.on_disconnect = self.on_disconnect - - # 인증 설정 - username = os.getenv('MQTT_USER', 'sa') - password = os.getenv('MQTT_PASS', '1234') + + username = os.getenv('MQTT_USER', '') + password = os.getenv('MQTT_PASS', '') self.client.username_pw_set(username, password) - - def on_connect(self, client, userdata, flags, rc, properties=None): - logger.info(f"Connected to MQTT broker with code {rc}") - client.subscribe("detections/new", qos=1) - + + def on_connect(self, client, userdata, flags, reason_code, properties): + if reason_code.is_failure: + logger.error(f"MQTT connection failed: {reason_code}") + else: + logger.info("Connected to MQTT broker") + client.subscribe("detections/new", qos=1) + client.subscribe("detections/completed", qos=1) + def on_message(self, client, userdata, msg): + """토픽별 메시지 라우팅""" try: payload = json.loads(msg.payload.decode()) - logger.info(f"Received MQTT message: {payload}") - - # 1. pending 레코드 즉시 생성 (detections_db) - detection = Detection.objects.using('detections_db').create( - camera_id=payload.get('camera_id'), - location=payload.get('location'), - detected_speed=payload['detected_speed'], - speed_limit=payload.get('speed_limit', 60.0), - detected_at=payload.get('detected_at', timezone.now()), - image_gcs_uri=payload['image_gcs_uri'], - status='pending' - ) - - logger.info(f"Created detection {detection.id} with pending status") - - # 2. OCR Task 발행 (AMQP) - process_ocr.apply_async( - args=[detection.id], - kwargs={'gcs_uri': payload['image_gcs_uri']}, - queue='ocr_queue', - priority=5 - ) - - logger.info(f"Dispatched OCR task for detection {detection.id}") - - except Exception as e: - logger.error(f"Error processing MQTT message: {e}") - - def on_disconnect(self, client, userdata, rc, properties=None): - logger.warning(f"Disconnected from MQTT broker with code {rc}") - + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in MQTT message: {e}") + return + + if msg.topic == "detections/new": + self._handle_new_detection(payload) + elif msg.topic == "detections/completed": + self._handle_detection_completed(payload) + else: + logger.warning(f"Unknown MQTT topic: {msg.topic}") + + def _handle_new_detection(self, payload): + """detections/new → Detection 생성 → OCR Task 발행""" + from apps.detections.models import Detection + from tasks.ocr_tasks import process_ocr + + detection = Detection.objects.using('detections_db').create( + camera_id=payload.get('camera_id'), + location=payload.get('location'), + detected_speed=payload['detected_speed'], + speed_limit=payload.get('speed_limit', 60.0), + detected_at=payload.get('detected_at', timezone.now()), + image_gcs_uri=payload['image_gcs_uri'], + status='pending' + ) + + process_ocr.apply_async( + args=[detection.id], + kwargs={'gcs_uri': payload['image_gcs_uri']}, + queue='ocr_queue', + priority=5 + ) + + def _handle_detection_completed(self, payload): + """detections/completed → Notification Task 발행 (Choreography)""" + detection_id = payload["detection_id"] + + from tasks.notification_tasks import send_notification + send_notification.apply_async( + args=[detection_id], queue="fcm_queue" + ) + def start(self): host = os.getenv('RABBITMQ_HOST', 'rabbitmq') port = int(os.getenv('MQTT_PORT', 1883)) - logger.info(f"Connecting to MQTT broker at {host}:{port}") self.client.connect(host, port, 60) self.client.loop_forever() +``` -def start_mqtt_subscriber(): - """백그라운드 스레드에서 MQTT Subscriber 시작""" - subscriber = MQTTSubscriber() - thread = threading.Thread(target=subscriber.start, daemon=True) - thread.start() - logger.info("MQTT Subscriber started in background thread") +```python +# core/mqtt/publisher.py +"""도메인 이벤트 발행""" +import json +import logging +import os +import paho.mqtt.client as mqtt + +logger = logging.getLogger(__name__) + +def publish_event(topic: str, payload: dict): + """Choreography 패턴에서 서비스 간 이벤트 전파""" + host = os.getenv("RABBITMQ_HOST", "rabbitmq") + port = int(os.getenv("MQTT_PORT", 1883)) + + client = mqtt.Client( + callback_api_version=mqtt.CallbackAPIVersion.VERSION2, + protocol=mqtt.MQTTv311, + client_id="", + ) + client.username_pw_set( + os.getenv("MQTT_USER", ""), + os.getenv("MQTT_PASS", "") + ) + + try: + client.connect(host, port, keepalive=10) + client.loop_start() + result = client.publish(topic, json.dumps(payload), qos=1) + result.wait_for_publish(timeout=5) + finally: + client.loop_stop() + client.disconnect() ``` ### 9.2 OCR Service (Celery Worker) @@ -1123,8 +1197,7 @@ def mock_ocr_result(): def process_ocr(self, detection_id: int, gcs_uri: str): from apps.detections.models import Detection from apps.vehicles.models import Vehicle - from tasks.notification_tasks import send_notification - + logger.info(f"Processing OCR for detection {detection_id}") try: @@ -1180,14 +1253,18 @@ def process_ocr(self, detection_id: int, gcs_uri: str): if vehicle: detection.vehicle_id = vehicle.id detection.save(update_fields=['vehicle_id', 'updated_at']) - - # 7. FCM 토큰이 있으면 알림 Task 발행 - if vehicle.fcm_token: - send_notification.apply_async( - args=[detection_id], - queue='fcm_queue' - ) - + + # 7. detection.completed 이벤트 발행 (Choreography) + # OCR은 알림의 존재를 모른다. 이벤트만 발행하고 끝. + try: + from core.mqtt.publisher import publish_event + publish_event( + "detections/completed", + {"detection_id": detection_id}, + ) + except Exception as e: + logger.warning(f"Failed to publish completion event: {e}") + logger.info(f"OCR completed for detection {detection_id}: {plate_number}") return { 'detection_id': detection_id, @@ -1617,8 +1694,8 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000 - DLQ로 실패한 Task 별도 관리 ### 12.4 프로토콜 분리 -- **MQTT**: IoT 디바이스(Raspberry Pi) 통신용 경량 프로토콜 -- **AMQP**: 백엔드 서비스 간 안정적인 메시지 전달 +- **MQTT**: 도메인 이벤트 전파 (IoT→Main, OCR→Main) — Choreography 패턴의 이벤트 버스 +- **AMQP**: Task 분배 (Main→Workers) — Celery를 통한 안정적인 작업 큐잉 ### 12.5 GIL 병목 회피 - **OCR Worker**: `prefork` pool (multiprocessing) - CPU 집약적 @@ -1643,3 +1720,9 @@ CORS_ALLOWED_ORIGINS=http://localhost:5173,http://localhost:3000 | | | - Python 3.12로 버전 업데이트 | | | | - DataDog 관련 설정 제거 (Optional) | | | | - Mock 모드 추가 (OCR_MOCK, FCM_MOCK) | +| 3.0 | 2026-02 | Choreography 패턴 구현 반영 | +| | | - OCR → Alert 직접 호출 제거 (Orchestration → Choreography) | +| | | - MQTT Event Publisher 추가 (detections/completed) | +| | | - Subscriber 토픽 라우팅 구현 | +| | | - Monitoring 스택 반영 (Prometheus, Grafana, Loki, Jaeger) | +| | | - OCR Service에 paho-mqtt 의존성 추가 | diff --git a/docs/load-testing.md b/docs/load-testing.md new file mode 100644 index 0000000..e62d644 --- /dev/null +++ b/docs/load-testing.md @@ -0,0 +1,878 @@ +# SpeedCam 부하 테스트 전략 + +## 개요 + +SpeedCam은 IoT 카메라에서 MQTT를 통해 이미지를 수신하고, Django 애플리케이션이 AMQP/Celery를 통해 OCR 작업을 처리한 후, 감지된 결과를 Alert Service로 전달하는 완전한 파이프라인 아키텍처입니다. 이 문서는 이 복잡한 시스템의 성능을 검증하고 병목 지점을 식별하기 위한 부하 테스트 전략을 설명합니다. + +## 시스템 아키텍처 + +### 처리 파이프라인 + +``` +IoT Camera (MQTT) + ↓ +Main Service (Django + Gunicorn + MQTT Subscriber) + ↓ +AMQP/Celery Queue + ↓ +OCR Worker (EasyOCR on CPU) + ↓ +AMQP Domain Event (detections.completed) + ↓ +Alert Service (kombu consumer → FCM) +``` + +### 배포 인프라 (GCE asia-northeast3-a) + +| 인스턴스 | 역할 | 핵심 설정 | +|---------|------|---------| +| **speedcam-app** | Django + Gunicorn + MQTT Subscriber | GUNICORN_WORKERS=2 | +| **speedcam-db** | MySQL 8.0 데이터베이스 | - | +| **speedcam-mq** | RabbitMQ (MQTT plugin + AMQP) | - | +| **speedcam-ocr** | Celery OCR Worker (EasyOCR) | OCR_CONCURRENCY=1 | +| **speedcam-alert** | Domain Event Consumer → FCM | FCM_MOCK=true | +| **speedcam-mon** | 모니터링 스택 | Prometheus, Grafana, Loki, Jaeger | + +## 측정된 성능 기준치 (Baselines) + +현재 환경에서 측정된 주요 성능 메트릭: + +| 메트릭 | 값 | 설명 | +|-------|-----|------| +| **MQTT 발행 지연** | ~0.3ms | 카메라에서 브로커까지의 지연 시간 | +| **OCR 처리 시간** | 4-5s | 캐시된 EasyOCR Reader로 단일 이미지 처리 | +| **OCR 모델 초기 로딩** | ~30s | EasyOCR 모델 처음 로드 시간 | +| **OCR 최대 처리량** | ~0.2 msg/s | CONCURRENCY=1 설정에서의 이론적 한계 | +| **Alert 이벤트 처리** | <1s | detections.completed 이벤트에서 FCM 전송까지 | +| **엔드-투-엔드 지연** | ~5-6s | MQTT 발행에서 Alert 완료까지 전체 파이프라인 | + +## 테스트 시나리오 + +부하 테스트는 다양한 부하 프로파일 하에서 시스템의 동작을 검증하기 위해 5개의 시나리오로 구성됩니다. + +### 시나리오 정의표 + +| 시나리오 | 워커 수 | 전송률 | 지속시간 | 총 메시지 수 | 목표 및 검증 사항 | +|---------|--------|-------|---------|------------|-----------------| +| **smoke** | 1 | 1 msg (수동) | - | 1 | 파이프라인 기본 동작 검증 | +| **baseline** | 1 | 0.2/s | 60s | ~12 | OCR 최대 처리량 수준에서 안정성 검증 | +| **saturation** | 3 | 1/s | 60s | ~180 | OCR 워커 포화 상태 시뮬레이션 | +| **spike** | 5 | 2/s | 10s | ~100 | 갑작스러운 트래픽 증가 대응 능력 검증 | +| **sustained** | 2 | 0.5/s | 300s | ~300 | 장시간 안정적 운영 능력 검증 | + +### 각 시나리오의 상세 설명 + +#### 1. Smoke Test (스모크 테스트) +- **목적**: 파이프라인이 정상 작동하는지 기본 검증 +- **가설**: 단일 메시지는 지연 없이 전체 파이프라인을 통과 +- **수행 방법**: 수동으로 하나의 MQTT 메시지 발행 +- **검증 항목**: + - 메시지가 speedcam-app에서 수신됨 + - Celery 작업이 생성됨 + - OCR 처리 완료 + - Alert 이벤트가 발행됨 + - 종단간 지연이 ~5-6초 범위 + +#### 2. Baseline Test (기준선 테스트) +- **목적**: OCR 워커의 이론적 최대 처리량에서 안정성 검증 +- **가설**: OCR_CONCURRENCY=1 설정에서 0.2 msg/s 지속 가능 +- **초기화**: speedcam-ocr 워커 재시작하여 모델 미리 로드 +- **수행 방법**: 1개 워커가 12개 메시지를 60초에 걸쳐 0.2/s 속도로 발행 +- **검증 항목**: + - 모든 메시지가 처리됨 (100% 완료율) + - 큐 깊이가 안정적으로 유지됨 + - 데이터베이스 연결이 누적되지 않음 + - 종단간 지연이 안정적 (5-6초 범위) + +#### 3. Saturation Test (포화 테스트) +- **목적**: OCR 워커 포화 상태에서 시스템의 대응 능력 검증 +- **가설**: 1/s 속도로 메시지가 쌓이면 큐 깊이 증가하며, 메시지 손실 없이 처리됨 +- **수행 방법**: 3개 워커가 180개 메시지를 60초에 걸쳐 1/s 속도로 발행 +- **검증 항목**: + - 큐 최대 깊이 관찰 (이론값: ~12) + - 메시지 손실 0건 + - 종단간 지연 증가 추이 (선형 증가 예상) + - Celery 작업 타임아웃 발생 여부 + - 데이터베이스 연결 고갈 여부 + +#### 4. Spike Test (스파이크 테스트) +- **목적**: 갑작스러운 트래픽 급증에 대한 시스템 회복 능력 검증 +- **가설**: 짧은 기간의 고속 전송 후 시스템이 정상으로 복구 +- **수행 방법**: 5개 워커가 100개 메시지를 10초에 걸쳐 2/s 속도로 발행 +- **검증 항목**: + - 스파이크 중 큐 최대 깊이 + - 메시지 손실 여부 + - 완료 후 큐 정상화 시간 + - 메모리 누수 증상 (메모리 사용량 회귀) + +#### 5. Sustained Test (지속성 테스트) +- **목적**: 장시간 안정적 운영 능력 검증 및 메모리 누수 감지 +- **가설**: 0.5/s 지속 속도로 300초 동안 모든 메시지 처리, 메모리 누수 없음 +- **수행 방법**: 2개 워커가 300개 메시지를 300초에 걸쳐 0.5/s 속도로 발행 +- **검증 항목**: + - 메시지 처리율 일정 유지 (100%) + - 메모리 사용량 추이 (증가 아닌 안정) + - CPU 사용률 안정성 + - 완료된 메시지 수 = 300건 + +## 가설-실행-비교 프레임워크 + +부하 테스트는 다음의 명확한 프레임워크를 따릅니다: + +### 1. 사전 가설 수립 (Pre-Test Hypothesis) + +각 시나리오 시작 전에 예상 결과를 명확히 정의합니다: + +``` +테스트 시작 전: +├─ 예상 완료 메시지 수 +├─ 예상 최대 큐 깊이 +├─ 예상 종단간 지연 범위 +├─ 예상 리소스 사용률 범위 +└─ 예상 오류율 (0% 또는 허용 범위) +``` + +테스트 스크립트는 다음과 같이 가설을 출력합니다: + +```python +# 예시 +print(f""" +=== Baseline Test Hypothesis === +Expected Message Completion: 12 (100%) +Expected Queue Depth: <1 (stable) +Expected E2E Latency: 5-6 seconds +Expected Error Rate: 0% +Expected OCR Processing Time: ~4-5s per image +Resource Baseline: Monitor CPU and Memory +""") +``` + +### 2. 테스트 실행 (Test Execution) + +실시간으로 시스템 상태를 모니터링하면서 테스트를 실행합니다: + +``` +테스트 진행 중: +├─ 메시지 발행률 확인 +├─ 실시간 큐 깊이 모니터링 +├─ Celery 작업 상태 추적 +├─ 에러 로그 감시 +└─ 리소스 사용률 관찰 +``` + +테스트 스크립트는 진행 상황을 실시간으로 출력합니다: + +``` +[T+5s] Published: 1/12 | Queue Depth: 0 | OCR Processing: 1 | Completed: 0 +[T+10s] Published: 2/12 | Queue Depth: 0 | OCR Processing: 1 | Completed: 1 +[T+15s] Published: 3/12 | Queue Depth: 1 | OCR Processing: 1 | Completed: 1 +... +``` + +### 3. 결과 비교 및 분석 (Comparison & Analysis) + +테스트 완료 후, 실제 결과를 사전 가설과 비교합니다: + +#### 파이프라인 검증 쿼리 + +테스트 완료 후 데이터베이스를 직접 쿼리하여 메시지 처리 상황을 확인합니다: + +```sql +-- Detection 테이블 확인 (OCR 처리된 메시지) +SELECT + COUNT(*) as total_detections, + COUNT(CASE WHEN status='completed' THEN 1 END) as completed, + COUNT(CASE WHEN status='failed' THEN 1 END) as failed, + COUNT(CASE WHEN status='processing' THEN 1 END) as still_processing +FROM detections +WHERE created_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE); + +-- 시간별 처리 완료 현황 +SELECT + DATE_FORMAT(created_at, '%Y-%m-%d %H:%i:00') as minute, + COUNT(*) as completed_count +FROM detections +WHERE status='completed' AND created_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE) +GROUP BY DATE_FORMAT(created_at, '%Y-%m-%d %H:%i:00') +ORDER BY minute; + +-- 처리 시간 분석 +SELECT + MIN(DATE_FORMAT(TIMEDIFF(updated_at, created_at), '%H:%i:%s')) as min_duration, + MAX(DATE_FORMAT(TIMEDIFF(updated_at, created_at), '%H:%i:%s')) as max_duration, + AVG(TIME_TO_SEC(TIMEDIFF(updated_at, created_at))) as avg_seconds +FROM detections +WHERE status='completed' AND created_at > DATE_SUB(NOW(), INTERVAL 10 MINUTE); +``` + +#### 결과 비교표 + +테스트 결과를 가설과 비교하는 형식: + +``` +=== Baseline Test Results === +Metric | Hypothesis | Actual | Status | Analysis +Total Messages | 12 (100%) | 12 (100%) | PASS | All messages processed +Queue Depth Peak | <1 | 0 | PASS | No queuing observed +E2E Latency Range | 5-6s | 5.2-5.8s | PASS | Within expected range +Error Rate | 0% | 0% | PASS | No processing errors +OCR Processing Time | 4-5s avg | 4.6s avg | PASS | Consistent with baseline +Memory Leak | None | Stable | PASS | No memory increase +DB Connections | <5 | 2-3 | PASS | No connection pooling issues +``` + +### 4. 의사결정 기준 + +결과 비교 후 다음 단계를 결정합니다: + +| 결과 | 의사결정 | +|-----|--------| +| **모두 PASS** | 다음 시나리오 진행 | +| **부분 FAIL (메시지 처리 완료)** | 병목 분석 후 진행 (성능 개선 필요) | +| **메시지 손실** | 중단 및 구성 검토 필요 | +| **시스템 크래시** | 중단 및 디버깅 필요 | + +## 테스트 실행 방법 + +### 사전 요구사항 + +```bash +# GCE 인스턴스에 SSH 접속 +gcloud compute ssh speedcam-app --zone=asia-northeast3-a + +# Docker 컨테이너 내부 접속 +sudo docker exec -it speedcam-main bash + +# 환경 변수 설정 +export MQTT_PASS= +export MQTT_HOST=speedcam-mq +export MQTT_PORT=1883 +``` + +### 테스트 스크립트 위치 + +``` +/app/docker/k6/mqtt-load-test.py # 주요 테스트 스크립트 +/app/docker/k6/load-test.js # k6 HTTP API 테스트 (보조) +``` + +### 시나리오별 실행 명령 + +#### 1. Smoke Test (스모크 테스트) + +```bash +# 모든 서비스 준비 상태 확인 +python /app/docker/k6/mqtt-load-test.py smoke + +# 예상 출력: +# === Smoke Test Starting === +# Publishing 1 test message... +# [T+0.5s] Message published +# [T+5s] Detection created in database +# [T+5.5s] Alert event published +# === Smoke Test PASSED === +``` + +#### 2. Baseline Test (기준선 테스트) + +```bash +# OCR 워커가 모델을 미리 로드하도록 대기 (약 30초) +# 모니터링 터미널에서 Grafana 대시보드 준비 + +python /app/docker/k6/mqtt-load-test.py baseline + +# 예상 소요 시간: 약 70초 (60초 + 처리 완료 대기) +# 예상 완료 메시지: 12건 +``` + +#### 3. Saturation Test (포화 테스트) + +```bash +# 주의: 이 테스트는 큐 깊이를 증가시킵니다 +# 모니터링 대시보드 확인 준비 + +python /app/docker/k6/mqtt-load-test.py saturation + +# 예상 소요 시간: 약 150초 (60초 + 큐 처리 완료 대기) +# 예상 완료 메시지: 180건 +``` + +#### 4. Spike Test (스파이크 테스트) + +```bash +# 단기간 높은 처리율 테스트 + +python /app/docker/k6/mqtt-load-test.py spike + +# 예상 소요 시간: 약 90초 (10초 + 큐 처리 완료 대기) +# 예상 완료 메시지: 100건 +``` + +#### 5. Sustained Test (지속성 테스트) + +```bash +# 장시간 안정성 테스트 - 커피를 준비하세요 +# 이 테스트는 약 8-10분 소요됩니다 + +python /app/docker/k6/mqtt-load-test.py sustained + +# 예상 소요 시간: 약 600초 (300초 + 큐 처리 완료 대기) +# 예상 완료 메시지: 300건 +``` + +### 테스트 중단 및 정리 + +```bash +# 테스트를 강제 중단해야 하는 경우 (Ctrl+C) +# Celery 큐에 남아있는 작업을 확인 +python -c "from celery_app import app; print(app.control.inspect().active())" + +# 필요시 큐 초기화 (주의: 처리 중인 작업도 제거됨) +python -c "from celery_app import app; app.control.purge()" + +# 데이터베이스 테스트 데이터 정리 +python manage.py shell +>>> from detections.models import Detection +>>> Detection.objects.filter(created_at__gt=timezone.now()-timedelta(hours=1)).delete() +``` + +## 결과 해석 및 성능 분석 + +### 메트릭 정의 + +#### 완료 메시지 (Completed Messages) +- 정의: MQTT 발행에서 FCM Alert까지 전체 파이프라인 완료 +- PASS 기준: 예상 메시지 수의 100% +- FAIL 기준: 1건 이상의 메시지 손실 + +#### 큐 깊이 (Queue Depth) +- 정의: RabbitMQ AMQP 큐에 대기 중인 Celery 작업 수 +- 모니터링: `rabbitmqctl list_queues` 또는 Grafana +- 분석: + - Baseline에서 큐 깊이 > 2: OCR 처리 능력 부족 + - Saturation에서 큐 깊이 선형 증가: 예상된 동작 + - Sustained 후 큐 깊이 0으로 복귀: 정상 종료 + +#### 종단간 지연 (End-to-End Latency) +- 정의: MQTT 발행부터 Alert 완료까지의 경과 시간 +- 측정: Database detection.created_at → alert_events.published_at +- 분석: + - Baseline: 5-6초 (안정적) + - Saturation: 5초 + (큐_깊이 × 4초) (선형 증가) + - Spike 후 정상화: 초기 지연 증가 → 정상 복귀 + +#### 오류율 (Error Rate) +- 정의: 처리 실패한 메시지 비율 +- PASS 기준: 0% +- 감지 방법: + - Celery 작업 failed 상태 + - Loki 로그의 ERROR, EXCEPTION 레벨 + - Database detection.status='failed' + +### 결과별 해석 가이드 + +#### Baseline Test 결과 해석 + +**완전 통과 (All PASS)** +``` +완료: 12/12 (100%) +큐 최대: 0 +E2E 지연: 5.2-5.8s 평균 5.5s +오류율: 0% + +→ 해석: 시스템이 설계된 대로 동작. OCR 최대 처리량을 안전하게 유지. +→ 다음: Saturation 테스트 진행. +``` + +**부분 실패 (Partial FAIL) - 메시지 손실 없음** +``` +완료: 12/12 (100%) +큐 최대: 1-2 +E2E 지연: 5.5-7.2s 평균 6.2s +오류율: 0% + +→ 해석: 메시지는 모두 처리되지만 약간의 지연 발생. +→ 원인 분석: + - 데이터베이스 연결 대기 + - GC pause 영향 + - MQTT 브로커 내부 처리 지연 +→ 다음: Grafana에서 상세 분석 (CPU, 메모리, DB 연결) +``` + +**실패 (FAIL) - 메시지 손실** +``` +완료: 10/12 (83%) +손실: 2 +큐 최대: 5+ +오류율: 16.7% + +→ 해석: 메시지 손실 발생 - 심각한 문제. +→ 즉시 조치: + 1. Celery 워커 로그 확인 + docker logs speedcam-ocr | tail -100 + 2. Loki에서 ERROR 레벨 로그 검색 + {job="celery-worker"} | json | status="ERROR" + 3. RabbitMQ 상태 확인 + docker exec speedcam-mq rabbitmqctl status +→ 다음: 원인 제거 후 Baseline 재실행. +``` + +#### Saturation Test 결과 해석 + +**정상 포화 (Expected Behavior)** +``` +완료: 180/180 (100%) +큐 최대: 8-12 (이론값: (1.0-0.2)×60 = 48s×1msg/s ≈ 10-12) +E2E 지연: 초기 5-6s → 최대 50-52s +오류율: 0% + +→ 해석: OCR이 포화되었으나 메시지 손실 없음 (큐 기반 처리). +→ 시스템이 설계된 대로 부하를 흡수. +→ 다음: Spike 테스트 진행. +``` + +**예상과 다른 포화 (Anomaly)** +``` +완료: 180/180 (100%) +큐 최대: >30 (예상 10-12) +E2E 지연: 100초 이상 +오류율: 0% 하지만 일부 타임아웃 + +→ 해석: OCR 처리 속도가 예상보다 느림. +→ 원인 분석: + - OCR 모델 재로드되었을 가능성 (메모리 부족) + - CPU 과부하 또는 열 제한 + - 데이터베이스 슬로우 쿼리 +→ 다음: 인프라 점검 후 baseline 재실행. +``` + +### 병목 지점 식별 및 개선 + +#### 현재 알려진 병목: OCR 처리 (Primary Bottleneck) + +**증상:** +- OCR_CONCURRENCY=1로 설정되어 있음 +- 이론적 최대 처리량: 0.2 msg/s (4-5초/이미지) +- 단일 CPU 코어에서 순차 처리 + +**개선 방안:** + +```bash +# 1단계: 멀티코어 EasyOCR (실험) +# Saturation 결과를 토대로 확인 +# 큐가 10개 이상 쌓이면 다음 단계 고려 + +# 2단계: OCR_CONCURRENCY 증가 (권장) +# speedcam-ocr 환경 변수 수정 +export OCR_CONCURRENCY=2 # 이론상 0.4 msg/s 가능 + +# Docker 재시작 +sudo docker restart speedcam-ocr + +# Baseline 재테스트 +python /app/docker/k6/mqtt-load-test.py baseline + +# 결과: 완료 시간 50% 단축 예상 (12 × 5s / 2 = 30s) +``` + +**OCR_CONCURRENCY 증가 시 고려사항:** +- 메모리 사용량 증가 (각 워커당 GiB급) +- CPU 코어 수 제한 (GCE 인스턴스 코어 수 확인) +- 온도 및 열 제한 (GPU 없이 CPU만 사용) + +#### 2차 병목: Gunicorn 워커 (Secondary Bottleneck) + +**현재 상태:** +- GUNICORN_WORKERS=2 설정 +- MQTT 수신은 별도 스레드에서 처리 +- HTTP API는 Gunicorn 워커 풀 공유 + +**확인 방법:** +```bash +# Saturation 테스트 중 Grafana 확인 +# Gunicorn worker utilization을 모니터링 +# 만약 모든 워커가 항상 바쁘다면: + +# 로그에서 "worker timeout" 확인 +docker logs speedcam-app 2>&1 | grep -i timeout + +# 필요시 GUNICORN_WORKERS 증가 +export GUNICORN_WORKERS=4 +``` + +#### 3차 병목: MySQL 데이터베이스 (Tertiary Bottleneck) + +**감지:** +```sql +-- 데이터베이스 연결 확인 +SHOW PROCESSLIST; + +-- 슬로우 쿼리 확인 +SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10; + +-- 테이블 락 확인 +SHOW OPEN TABLES WHERE In_use > 0; +``` + +**개선:** +```bash +# 1. 인덱스 확인 +# detections 테이블의 created_at, status에 인덱스 있는지 확인 + +# 2. 연결 풀 크기 확인 +# Django DATABASES 설정의 CONN_MAX_AGE + +# 3. 필요시 마스터-슬레이브 구성 검토 +``` + +#### MQTT Subscriber 병목 가능성 + +**현재 아키텍처:** +- Django 애플리케이션 내부 MQTT 클라이언트 (단일 스레드) +- 블로킹 구독 모델 + +**부하 테스트에서 영향:** +- 메시지 발행률이 초당 1개 이상일 때 순차 처리 +- 단일 스레드이므로 CPU 활용도가 낮을 수 있음 + +**확인:** +```bash +# 테스트 중 프로세스 상태 확인 +docker top speedcam-app | head -20 + +# 만약 MQTT 수신 thread가 항상 busy면: +# MQTT 클라이언트 최적화 필요 +``` + +## 모니터링 및 관찰 + +### Grafana 대시보드 사용 + +테스트 진행 중 다음 대시보드를 지속적으로 관찰합니다: + +#### 1. SpeedCam 애플리케이션 대시보드 + +``` +URL: http://speedcam-mon:3000/d/speedcam-app/speedcam-application + +주요 패널: +├─ MQTT Messages Received (per second) +│ └─ 값이 설정된 발행율과 일치하는지 확인 +├─ Celery Queue Depth (tasks) +│ └─ Baseline: ~0, Saturation: 8-12, Spike: 최대값 관찰 +├─ OCR Processing Time (histogram) +│ └─ 평균값이 4-5초 범위인지 확인 +└─ End-to-End Latency (percentiles) + └─ P50: 5-6초, P99: 큐_깊이에 비례하여 증가 +``` + +#### 2. RabbitMQ 모니터링 대시보드 + +``` +URL: http://speedcam-mq:15672 (username: guest, password: guest) + +관찰 항목: +├─ Queue: celery (Messages) +│ └─ Ready: 처리 대기 중인 작업 +│ └─ Unacked: 처리 중인 작업 +├─ Queue: detections.completed +│ └─ Alert Service가 처리하는 이벤트 흐름 +└─ Consumers + └─ OCR Worker 연결 상태 확인 +``` + +#### 3. MySQL 성능 모니터링 + +``` +Grafana 패널: Database Performance + +확인 항목: +├─ Queries per second +├─ Average query execution time +├─ Active connections +├─ Slow queries (>1 second) +└─ Innodb buffer pool hit ratio (>99% 목표) +``` + +### Prometheus 메트릭 쿼리 + +테스트 중 다음 메트릭을 직접 쿼리하여 확인합니다: + +```promql +# MQTT 수신 메시지율 +rate(mqtt_messages_received_total[1m]) + +# Celery 큐 깊이 (현재값) +celery_queue_length{queue="celery"} + +# OCR 처리 시간 (평균) +rate(ocr_processing_time_seconds_sum[5m]) / +rate(ocr_processing_time_seconds_count[5m]) + +# 완료된 감지 이벤트 +rate(detections_completed_total[1m]) + +# Alert 발행 이벤트 +rate(alert_events_published_total[1m]) + +# 데이터베이스 연결 +mysql_global_status_threads_connected +``` + +### Loki 로그 쿼리 + +오류 또는 의심스러운 동작을 추적하기 위해 다음 로그 쿼리를 사용합니다: + +```loki +# OCR 워커 에러 +{job="celery-worker"} | json | level="ERROR" + +# Celery 작업 타임아웃 +{job="celery-worker"} | json | msg=~".*timeout.*" + +# Django 애플리케이션 에러 +{job="django-app"} | json | level="ERROR" + +# MQTT 연결 문제 +{job="django-app"} | json | msg=~".*mqtt.*error.*" + +# 데이터베이스 연결 에러 +{job="django-app"} | json | msg=~".*database.*connection.*" + +# 지난 5분간의 모든 ERROR 레벨 로그 개수 +count( + {job=~"celery-worker|django-app"} + | json + | level="ERROR" +) by (job) +``` + +### Jaeger 분산 추적 (Distributed Tracing) + +선택적으로 완전한 요청 흐름을 추적합니다: + +``` +URL: http://speedcam-mon:6831/search + +추적 항목: +1. MQTT 메시지 수신 스팬 + ├─ MQTT publish (IoT Camera) + ├─ MQTT message received (Django) + └─ Celery task enqueue + +2. OCR 처리 스팬 + ├─ Celery task start + ├─ EasyOCR model load (초회) + ├─ Image preprocessing + ├─ OCR inference + └─ Celery task complete + +3. Alert 발행 스팬 + ├─ Domain event created + ├─ kombu consumer received + ├─ FCM API call + └─ Alert published +``` + +## 문제 해결 및 FAQ + +### Q: Baseline 테스트에서 메시지가 처리되지 않음 + +**A: 다음을 순서대로 확인하세요:** + +```bash +# 1. MQTT 브로커 연결 확인 +docker exec speedcam-mq mosquitto_sub -h localhost -t "#" & +# (다른 터미널에서) python /app/docker/k6/mqtt-load-test.py smoke + +# 2. Celery 워커 상태 확인 +docker exec speedcam-ocr celery -A ocr_tasks inspect active + +# 3. Django 애플리케이션 로그 확인 +docker logs speedcam-app | tail -50 | grep -i error + +# 4. RabbitMQ 큐 상태 +docker exec speedcam-mq rabbitmqctl list_queues +``` + +### Q: OCR 워커가 응답하지 않음 + +**A:** + +```bash +# 1. EasyOCR 모델 로드 상태 확인 +# 첫 테스트 실행 시 약 30초 소요 (로그에서 확인) +docker logs speedcam-ocr | grep -i "loading\|model" + +# 2. 메모리 부족 여부 확인 +docker stats speedcam-ocr + +# 만약 메모리 사용량이 95% 이상: +# OCR_CONCURRENCY를 1로 유지하거나 +# 인스턴스 메모리 증설 필요 + +# 3. 워커 재시작 +docker restart speedcam-ocr + +# 모델 다시 로드될 때까지 대기 (30초) +sleep 30 +python /app/docker/k6/mqtt-load-test.py smoke +``` + +### Q: 테스트 중 "Task timed out" 에러 발생 + +**A:** + +```bash +# 1. Celery 타임아웃 설정 확인 +# celery_config.py의 task_soft_time_limit, task_time_limit 확인 + +# 2. 원인별 대응: +# - OCR 처리 시간 > 5초인 경우: 정상 (이미지 크기 또는 모델 특성) +# - OCR 처리 시간 > 30초인 경우: 모델 재로드 또는 하드웨어 문제 +# → docker restart speedcam-ocr + +# 3. 타임아웃 시간 증가 (필요시) +export CELERY_TASK_TIME_LIMIT=600 # 10분 +docker restart speedcam-ocr +``` + +### Q: 메모리 사용량이 계속 증가함 + +**A: 메모리 누수 가능성** + +```bash +# 1. 현재 메모리 사용량 추이 확인 +watch -n 1 'docker stats speedcam-app --no-stream | head -2' + +# 2. Grafana에서 Memory Usage 그래프 확인 +# Sustained 테스트 후 메모리가 복구되지 않으면 누수 가능 + +# 3. 원인 분석: +# - Database 연결 누적: Django ORM 미사용 연결 +# → Django 설정의 CONN_MAX_AGE 확인 +# +# - Celery 작업 메타데이터 누적 +# → RabbitMQ 퍼지 또는 설정 검토 +# +# - Python 객체 참조 순환 +# → 메모리 프로파일링 도구 (memory_profiler) 사용 + +# 4. 재시작 +docker restart speedcam-app speedcam-ocr speedcam-alert +``` + +### Q: Saturation 테스트 후 큐가 비지 않음 + +**A:** + +```bash +# 1. 남아있는 작업 확인 +docker exec speedcam-mq rabbitmqctl list_queues + +# 2. Celery 워커 상태 확인 +docker exec speedcam-ocr celery -A ocr_tasks inspect active + +# 3. 만약 워커가 멈춘 경우: +docker restart speedcam-ocr + +# 4. 큐 수동 퍼지 (데이터 손실 주의) +docker exec speedcam-mq rabbitmqctl purge_queue celery + +# 5. 데이터베이스 정리 +docker exec speedcam-app python manage.py shell +# >>> from detections.models import Detection +# >>> Detection.objects.filter(created_at__gt=...).delete() +``` + +## 체크리스트 + +테스트를 시작하기 전에 다음을 확인하세요: + +### 사전 점검 + +- [ ] 모든 인스턴스(speedcam-app, speedcam-db, speedcam-mq, speedcam-ocr, speedcam-alert, speedcam-mon)가 실행 중 +- [ ] SSH 접속 가능 (`gcloud compute ssh speedcam-app --zone=asia-northeast3-a`) +- [ ] Docker 컨테이너 접속 가능 (`docker exec -it speedcam-main bash`) +- [ ] MQTT 환경 변수 설정 (`MQTT_PASS` 포함) +- [ ] RabbitMQ 웹 UI 접속 가능 (http://speedcam-mq:15672) +- [ ] Grafana 접속 가능 (http://speedcam-mon:3000) +- [ ] MySQL 접속 가능 (`docker exec speedcam-db mysql -u root -p`) + +### 테스트별 점검 + +#### Smoke Test 전 +- [ ] Celery 워커 상태: active/idle +- [ ] RabbitMQ 큐: celery, detections.completed 모두 empty +- [ ] 데이터베이스: 최근 detections 없음 + +#### Baseline Test 전 +- [ ] OCR 모델 미리 로드 (약 30초 대기 후 확인) +- [ ] Grafana 대시보드 새로고침 +- [ ] 모니터링 터미널 준비 (2개: 스크립트 + 로그) + +#### Saturation Test 전 +- [ ] Spike 테스트 완료 후 큐 비워짐 확인 +- [ ] 메모리 사용량 정상 범위 확인 +- [ ] Grafana 범위 설정 변경 (Y축 스케일 확인) + +#### Spike Test 전 +- [ ] 최근 테스트 결과 분석 완료 +- [ ] 병목 지점 파악 완료 + +#### Sustained Test 전 +- [ ] 충분한 시간 확보 (약 10분) +- [ ] 모니터링 도구 안정성 확인 +- [ ] 로그 수집 설정 확인 (Loki) + +### 테스트 후 정리 + +- [ ] 모든 완료 메시지 수 기록 +- [ ] 최대 큐 깊이 기록 +- [ ] 주요 오류 로그 저장 +- [ ] Grafana 스크린샷 캡처 +- [ ] 분석 결과 문서화 +- [ ] 데이터베이스 테스트 데이터 정리 (필요시) +- [ ] 다음 테스트 시나리오 계획 + +## 결론 및 권장사항 + +### 현재 성능 프로필 요약 + +SpeedCam 시스템은 다음과 같은 성능 특성을 보입니다: + +- **최대 안전 처리 속도**: ~0.2 msg/s (OCR 병목) +- **포화 상태 큐 깊이**: ~10-12 작업 +- **종단간 지연**: 5-6초 (baseline) ~ 50초+ (saturation) +- **안정성**: 메시지 손실 0%, 메모리 누수 없음 + +### 향후 개선 계획 + +**Phase 1 (즉시 실행 가능)** +1. OCR_CONCURRENCY를 2로 증가 → 처리량 2배 증대 +2. Baseline 테스트 재실행하여 안정성 재검증 + +**Phase 2 (중장기)** +1. 이미지 전처리 최적화 (해상도, 압축율) +2. EasyOCR 대신 더 빠른 OCR 엔진 평가 (TrOCR, PaddleOCR) +3. GPU 활용 검토 (GCE GPU 인스턴스) + +**Phase 3 (장기)** +1. 마이크로서비스 아키텍처: OCR 서비스 독립 스케일링 +2. 메시지 브로커 클러스터링 +3. 캐싱 전략 (이미지 해시 기반 캐시 완료 결과) + +## 참고 자료 + +- MQTT 메시지 형식: `/app/docs/mqtt-protocol.md` +- Celery 설정: `/app/celery_config.py` +- Django MQTT Subscriber: `/app/mqtt/subscriber.py` +- OCR 워커 구현: `/app/ocr/tasks.py` +- Grafana 대시보드 설정: `/app/docker/grafana/dashboards/` +- Loki 설정: `/app/docker/loki/loki-config.yaml` + +--- + +**작성일**: 2026년 2월 13일 +**최종 수정**: 2026년 2월 13일 +**유지보수자**: SpeedCam 팀 diff --git a/requirements/ocr.txt b/requirements/ocr.txt index d163a00..7d9c73b 100644 --- a/requirements/ocr.txt +++ b/requirements/ocr.txt @@ -8,4 +8,3 @@ pillow==11.2.1 # Google Cloud Storage google-cloud-storage==2.18.2 - diff --git a/scripts/start_alert_worker.sh b/scripts/start_alert_worker.sh index 24bda15..29eed45 100644 --- a/scripts/start_alert_worker.sh +++ b/scripts/start_alert_worker.sh @@ -1,14 +1,22 @@ #!/bin/bash set -e -echo "Starting Alert Worker (Celery)..." +echo "Starting Alert Worker (Domain Event Consumer)..." -# Celery Worker 시작 (gevent pool - I/O 집약적) +# Django 설정 모듈 기본값 +export DJANGO_SETTINGS_MODULE="${DJANGO_SETTINGS_MODULE:-config.settings.dev}" + +# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지) +export OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="${OTEL_PYTHON_REQUESTS_EXCLUDED_URLS:-metadata.google.internal}" + +# Alert Event Consumer 시작 +# Choreography: detections.completed 이벤트를 직접 구독하여 +# Alert Service가 자율적으로 알림 발송 여부를 결정한다. opentelemetry-instrument \ --service_name speedcam-alert \ - celery -A config worker \ - --pool=gevent \ - --concurrency=${ALERT_CONCURRENCY:-100} \ - --queues=fcm_queue \ - --hostname=alert@%h \ - --loglevel=${LOG_LEVEL:-info} + python -c " +import django +django.setup() +from core.events.consumer import start_event_consumer +start_event_consumer() +" diff --git a/scripts/start_ocr_worker.sh b/scripts/start_ocr_worker.sh index 40557a6..dc8d892 100644 --- a/scripts/start_ocr_worker.sh +++ b/scripts/start_ocr_worker.sh @@ -3,6 +3,9 @@ set -e echo "Starting OCR Worker (Celery)..." +# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지) +export OTEL_PYTHON_REQUESTS_EXCLUDED_URLS="${OTEL_PYTHON_REQUESTS_EXCLUDED_URLS:-metadata.google.internal}" + # Celery Worker 시작 (prefork pool - CPU 집약적) opentelemetry-instrument \ --service_name speedcam-ocr \ diff --git a/tasks/__init__.py b/tasks/__init__.py index c3d404a..d0f85f7 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -1,6 +1,5 @@ # Celery Tasks Package from .dlq_tasks import process_dlq_message -from .notification_tasks import send_notification from .ocr_tasks import process_ocr -__all__ = ["process_ocr", "send_notification", "process_dlq_message"] +__all__ = ["process_ocr", "process_dlq_message"] diff --git a/tasks/notification_tasks.py b/tasks/notification_tasks.py index 770a3c8..70d2ac0 100644 --- a/tasks/notification_tasks.py +++ b/tasks/notification_tasks.py @@ -1,9 +1,13 @@ -"""Notification Worker Tasks (I/O 집약적)""" +""" +Notification Processing Logic (Alert Service) + +EDA/Choreography: detections.completed 이벤트에 반응하여 +Alert Service가 자율적으로 알림 발송 여부를 결정한다. +""" import logging import os -from celery import shared_task from django.utils import timezone logger = logging.getLogger(__name__) @@ -12,17 +16,10 @@ FCM_MOCK = os.getenv("FCM_MOCK", "false").lower() == "true" -@shared_task( - bind=True, - max_retries=3, - autoretry_for=(Exception,), - retry_backoff=True, - retry_backoff_max=600, - acks_late=True, -) -def send_notification(self, detection_id: int): +def process_notification(detection_id: int): """ - FCM 푸시 알림 전송 Task + FCM 푸시 알림 전송 + - 대시보드 토픽 브로드캐스트 (모든 감지에 대해) - 매칭된 차량 개별 푸시 (차량 있는 경우) - MSA: 각 서비스별 DB에서 조회 @@ -31,150 +28,128 @@ def send_notification(self, detection_id: int): from apps.notifications.models import Notification from apps.vehicles.models import Vehicle - try: - # 1. Detection 조회 (detections_db) - detection = Detection.objects.using("detections_db").get( - id=detection_id, status="completed" + # 1. Detection 조회 (detections_db) + detection = Detection.objects.using("detections_db").get( + id=detection_id, status="completed" + ) + + # 2. 알림 메시지 생성 + title = f"과속 위반 감지: {detection.ocr_result or '미확인'}" + body = ( + f"위치: {detection.location or '알 수 없음'}\n" + f"속도: {detection.detected_speed}km/h " + f"(제한: {detection.speed_limit}km/h)" + ) + data = { + "detection_id": str(detection_id), + "plate_number": detection.ocr_result or "", + "speed": str(detection.detected_speed), + "speed_limit": str(detection.speed_limit), + "location": detection.location or "", + "detected_at": detection.detected_at.isoformat(), + } + + # 3. 대시보드 토픽으로 항상 전송 (중복 방지) + topic_response = None + already_sent_topic = ( + Notification.objects.using("notifications_db") + .filter( + detection_id=detection_id, + fcm_token="topic:dashboard_alerts", + status="sent", ) + .exists() + ) - # 2. 알림 메시지 생성 - title = f"⚠️ 과속 위반 감지: {detection.ocr_result or '미확인'}" - body = ( - f"📍 위치: {detection.location or '알 수 없음'}\n" - f"🚗 속도: {detection.detected_speed}km/h " - f"(제한: {detection.speed_limit}km/h)" - ) - data = { - "detection_id": str(detection_id), - "plate_number": detection.ocr_result or "", - "speed": str(detection.detected_speed), - "speed_limit": str(detection.speed_limit), - "location": detection.location or "", - "detected_at": detection.detected_at.isoformat(), - } - - # 3. 대시보드 토픽으로 항상 전송 (중복 방지) - topic_response = None - already_sent_topic = ( - Notification.objects.using("notifications_db") - .filter( - detection_id=detection_id, - fcm_token="topic:dashboard_alerts", - status="sent", + if not already_sent_topic: + try: + if FCM_MOCK: + topic_response = f"mock-topic-{detection_id}" + else: + from core.firebase.fcm import send_topic_notification + + topic_response = send_topic_notification( + "dashboard_alerts", title, body, data + ) + logger.info( + f"Dashboard topic notification sent for detection " + f"{detection_id}: {topic_response}" + ) + except Exception as e: + logger.warning( + f"Dashboard topic notification failed for detection " + f"{detection_id}: {e}" ) - .exists() + + # 4. 토픽 알림 이력 저장 (notifications_db) + Notification.objects.using("notifications_db").create( + detection_id=detection_id, + fcm_token="topic:dashboard_alerts", + title=title, + body=body, + status="sent" if topic_response else "failed", + sent_at=timezone.now() if topic_response else None, + error_message=None if topic_response else "Topic send failed", + ) + else: + topic_response = "already_sent" + logger.info( + f"Dashboard topic notification already sent for detection " + f"{detection_id}, skipping" ) - if not already_sent_topic: - try: - if FCM_MOCK: - topic_response = f"mock-topic-{detection_id}" - else: - from core.firebase.fcm import send_topic_notification - - topic_response = send_topic_notification( - "dashboard_alerts", title, body, data - ) - logger.info( - f"Dashboard topic notification sent for detection " - f"{detection_id}: {topic_response}" - ) - except Exception as e: - logger.warning( - f"Dashboard topic notification failed for detection " - f"{detection_id}: {e}" + # 5. 매칭된 차량에 개별 푸시 + vehicle = None + if detection.vehicle_id: + try: + vehicle = Vehicle.objects.using("vehicles_db").get( + id=detection.vehicle_id + ) + except Vehicle.DoesNotExist: + logger.warning(f"Vehicle {detection.vehicle_id} not found") + + if vehicle and vehicle.fcm_token: + try: + if FCM_MOCK: + vehicle_response = f"mock-message-id-{detection_id}" + else: + from core.firebase.fcm import send_push_notification + + vehicle_response = send_push_notification( + token=vehicle.fcm_token, + title=title, + body=body, + data=data, ) - # 4. 토픽 알림 이력 저장 (notifications_db) Notification.objects.using("notifications_db").create( detection_id=detection_id, - fcm_token="topic:dashboard_alerts", + fcm_token=vehicle.fcm_token, title=title, body=body, - status="sent" if topic_response else "failed", - sent_at=timezone.now() if topic_response else None, - error_message=None if topic_response else "Topic send failed", + status="sent", + sent_at=timezone.now(), ) - else: - topic_response = "already_sent" logger.info( - f"Dashboard topic notification already sent for detection " - f"{detection_id}, skipping" + f"Vehicle notification sent for detection " + f"{detection_id}: {vehicle_response}" + ) + except Exception as e: + logger.warning( + f"Vehicle notification failed for detection {detection_id}: {e}" ) - - # 5. 매칭된 차량에 개별 푸시 (기존 동작) - vehicle = None - if detection.vehicle_id: - try: - vehicle = Vehicle.objects.using("vehicles_db").get( - id=detection.vehicle_id - ) - except Vehicle.DoesNotExist: - logger.warning(f"Vehicle {detection.vehicle_id} not found") - - if vehicle and vehicle.fcm_token: - try: - if FCM_MOCK: - vehicle_response = f"mock-message-id-{detection_id}" - else: - from core.firebase.fcm import send_push_notification - - vehicle_response = send_push_notification( - token=vehicle.fcm_token, - title=title, - body=body, - data=data, - ) - - Notification.objects.using("notifications_db").create( - detection_id=detection_id, - fcm_token=vehicle.fcm_token, - title=title, - body=body, - status="sent", - sent_at=timezone.now(), - ) - logger.info( - f"Vehicle notification sent for detection " - f"{detection_id}: {vehicle_response}" - ) - except Exception as e: - logger.warning( - f"Vehicle notification failed for detection " f"{detection_id}: {e}" - ) - Notification.objects.using("notifications_db").create( - detection_id=detection_id, - fcm_token=vehicle.fcm_token, - title=title, - body=body, - status="failed", - error_message=str(e), - ) - - return { - "status": "sent", - "topic": bool(topic_response), - "vehicle": bool(vehicle and vehicle.fcm_token), - } - - except Detection.DoesNotExist: - logger.warning(f"Detection {detection_id} not found or not completed, retrying") - raise self.retry(countdown=3, max_retries=3) - - except Exception as exc: - try: - from apps.notifications.models import Notification - Notification.objects.using("notifications_db").create( detection_id=detection_id, + fcm_token=vehicle.fcm_token, + title=title, + body=body, status="failed", - retry_count=self.request.retries, - error_message=str(exc), - ) - except Exception as db_err: - logger.error( - f"Failed to record notification failure for detection {detection_id}: {db_err}" + error_message=str(e), ) - logger.error(f"Notification failed for detection {detection_id}: {exc}") - raise + logger.info(f"Notification processing completed for detection {detection_id}") + return { + "status": "sent", + "topic": bool(topic_response), + "vehicle": bool(vehicle and vehicle.fcm_token), + } diff --git a/tasks/ocr_tasks.py b/tasks/ocr_tasks.py index a547939..b846c72 100644 --- a/tasks/ocr_tasks.py +++ b/tasks/ocr_tasks.py @@ -56,12 +56,11 @@ def process_ocr(self, detection_id: int, gcs_uri: str): OCR 처리 Task - GCS에서 이미지 다운로드 - EasyOCR 실행 - - 직접 MySQL 업데이트 (Choreography 패턴) + - 처리 완료 후 detection.completed 이벤트 발행 (Choreography) - MSA: 각 서비스별 DB 사용 """ from apps.detections.models import Detection from apps.vehicles.models import Vehicle - from tasks.notification_tasks import send_notification try: # 1. 상태를 processing으로 업데이트 (detections_db) @@ -130,13 +129,19 @@ def process_ocr(self, detection_id: int, gcs_uri: str): except Exception as e: logger.warning(f"Vehicle lookup failed: {e}") - # 7. Always send notification for completed detections - # (dashboard gets topic notification; matched vehicle gets individual push) + # 7. detections.completed 이벤트 발행 (Choreography) + # OCR은 알림 서비스의 존재를 모른다. 완료 사실만 발행하고 끝. + # AMQP topic exchange를 통해 관심 있는 서비스가 독립적으로 구독. try: - send_notification.apply_async(args=[detection_id], queue="fcm_queue") + from core.events.publisher import publish_event + + publish_event( + "detections.completed", + {"detection_id": detection_id}, + ) except Exception as e: logger.warning( - f"Failed to enqueue notification for detection {detection_id}: {e}" + f"Failed to publish completion event for detection {detection_id}: {e}" ) logger.info(f"OCR completed for detection {detection_id}: {plate_number}") diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index a636a79..093eb8c 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -80,13 +80,13 @@ def test_notification_body_format(self, completed_detection): @pytest.mark.skipif(not firebase_available, reason="firebase_admin not installed") @patch("tasks.notification_tasks.os.environ.get") - def test_send_notification_mock_mode(self, mock_env, completed_detection): - """Mock 모드에서 알림 전송 테스트""" + def test_process_notification_mock_mode(self, mock_env, completed_detection): + """Mock 모드에서 알림 처리 테스트""" mock_env.return_value = "true" - from tasks.notification_tasks import send_notification + from tasks.notification_tasks import process_notification - result = send_notification(completed_detection.id) + result = process_notification(completed_detection.id) assert result["status"] in ["sent", "skipped"] From a0525e2018a83733274361d6681563fd43a0640f Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 23:26:52 +0900 Subject: [PATCH 2/4] fix: flake8 lint errors in mqtt-load-test.py - Remove unnecessary f-string prefixes (F541) - Fix line too long >120 chars (E501) --- docker/k6/mqtt-load-test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docker/k6/mqtt-load-test.py b/docker/k6/mqtt-load-test.py index 06a3bb4..a168c3a 100644 --- a/docker/k6/mqtt-load-test.py +++ b/docker/k6/mqtt-load-test.py @@ -323,7 +323,7 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic result = dict(zip(columns, result)) initial_total = result['total'] or 0 - print(f"\nInitial state (last 10 minutes):") + print("\nInitial state (last 10 minutes):") print(f" Total detections: {initial_total}") print(f" - completed: {result['completed'] or 0}") print(f" - processing: {result['processing'] or 0}") @@ -382,7 +382,7 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic columns = [desc[0] for desc in cursor.description] final_result = dict(zip(columns, final_result)) - print(f"\nFinal state:") + print("\nFinal state:") print(f" Total detections created: {final_result['total']}") print(f" - completed: {final_result['completed'] or 0}") print(f" - processing: {final_result['processing'] or 0}") @@ -435,7 +435,8 @@ def print_comparison(scenario_name: str, config: Dict, db_result: Optional[Dict] actual_error_rate = (failed / total * 100) if total > 0 else 0 expected_error_rate = config["expected_error_rate"] error_match = actual_error_rate <= expected_error_rate - print(f" Error rate: {actual_error_rate:.2f}% vs Expected: <{expected_error_rate}% {'✓ PASS' if error_match else '✗ FAIL'}") + err_status = '✓ PASS' if error_match else '✗ FAIL' + print(f" Error rate: {actual_error_rate:.2f}% vs Expected: <{expected_error_rate}% {err_status}") # Queue depth (from DB if available) if db_result: @@ -476,7 +477,7 @@ def run_scenario(scenario_name: str): if success: print("\n=== RESULTS ===") - print(f" Published: 1 | Failed: 0") + print(" Published: 1 | Failed: 0") print(f" Avg Latency: {stats['total_latency_ms']:.2f}ms") # Wait a bit for pipeline From 2ac1e6c8c8e63f66e7596117be8a45806ff06325 Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 23:29:16 +0900 Subject: [PATCH 3/4] style: apply black formatting --- docker/k6/mqtt-load-test.py | 86 ++++++++++++++++++++++--------------- tasks/notification_tasks.py | 4 +- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/docker/k6/mqtt-load-test.py b/docker/k6/mqtt-load-test.py index a168c3a..4924abf 100644 --- a/docker/k6/mqtt-load-test.py +++ b/docker/k6/mqtt-load-test.py @@ -250,16 +250,17 @@ def get_db_connection(): # Try pymysql first try: import pymysql + conn = pymysql.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, database=DB_NAME, - charset='utf8mb4', - cursorclass=pymysql.cursors.DictCursor + charset="utf8mb4", + cursorclass=pymysql.cursors.DictCursor, ) - return conn, 'pymysql' + return conn, "pymysql" except ImportError: pass except Exception as e: @@ -268,15 +269,16 @@ def get_db_connection(): # Try mysql.connector try: import mysql.connector + conn = mysql.connector.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, database=DB_NAME, - charset='utf8mb4' + charset="utf8mb4", ) - return conn, 'mysql.connector' + return conn, "mysql.connector" except ImportError: pass except Exception as e: @@ -289,7 +291,9 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic """Verify the pipeline by querying MySQL database.""" conn, driver = get_db_connection() if not conn: - print("\n⚠ WARNING: No MySQL driver available (pymysql or mysql-connector-python)") + print( + "\n⚠ WARNING: No MySQL driver available (pymysql or mysql-connector-python)" + ) print(" Pipeline verification skipped. Install pymysql to enable:") print(" pip install pymysql") return None @@ -314,7 +318,7 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) """) - if driver == 'pymysql': + if driver == "pymysql": result = cursor.fetchone() else: # mysql.connector result = cursor.fetchone() @@ -322,7 +326,7 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic columns = [desc[0] for desc in cursor.description] result = dict(zip(columns, result)) - initial_total = result['total'] or 0 + initial_total = result["total"] or 0 print("\nInitial state (last 10 minutes):") print(f" Total detections: {initial_total}") print(f" - completed: {result['completed'] or 0}") @@ -346,21 +350,23 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) """) - if driver == 'pymysql': + if driver == "pymysql": result = cursor.fetchone() else: result = cursor.fetchone() columns = [desc[0] for desc in cursor.description] result = dict(zip(columns, result)) - processing = result['processing'] or 0 - pending = result['pending'] or 0 + processing = result["processing"] or 0 + pending = result["pending"] or 0 if processing == 0 and pending == 0: print(f"✓ Pipeline drained after {time.time() - wait_start:.1f}s") break - print(f" [{time.time() - wait_start:.1f}s] processing: {processing}, pending: {pending}") + print( + f" [{time.time() - wait_start:.1f}s] processing: {processing}, pending: {pending}" + ) time.sleep(2) # Final status @@ -375,7 +381,7 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) """) - if driver == 'pymysql': + if driver == "pymysql": final_result = cursor.fetchone() else: final_result = cursor.fetchone() @@ -389,10 +395,12 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic print(f" - pending: {final_result['pending'] or 0}") print(f" - failed: {final_result['failed'] or 0}") - total = final_result['total'] or 0 - completed = final_result['completed'] or 0 + total = final_result["total"] or 0 + completed = final_result["completed"] or 0 completion_rate = (completed / total * 100) if total > 0 else 0 - print(f" Pipeline completion rate: {completed}/{total} = {completion_rate:.1f}%") + print( + f" Pipeline completion rate: {completed}/{total} = {completion_rate:.1f}%" + ) cursor.close() return final_result @@ -409,10 +417,14 @@ def print_hypothesis(scenario_name: str, config: Dict): print("=== HYPOTHESIS ===") print(f" Scenario: {scenario_name.upper()} - {config['description']}") print(f" Expected published: {config['expected_messages']} messages") - print(f" Expected OCR throughput: ~{config['expected_throughput']} msg/s (OCR_CONCURRENCY=1, ~5s/image)") - if config['duration'] > 0: - expected_processed = int(config['expected_throughput'] * config['duration']) - print(f" Expected processed in {config['duration']}s: ~{expected_processed} messages") + print( + f" Expected OCR throughput: ~{config['expected_throughput']} msg/s (OCR_CONCURRENCY=1, ~5s/image)" + ) + if config["duration"] > 0: + expected_processed = int(config["expected_throughput"] * config["duration"]) + print( + f" Expected processed in {config['duration']}s: ~{expected_processed} messages" + ) print(f" Expected queue depth at end: ~{config['expected_queue_depth']} messages") print(f" Expected error rate: <{config['expected_error_rate']}%") print(f"{'='*60}\n") @@ -426,8 +438,12 @@ def print_comparison(scenario_name: str, config: Dict, db_result: Optional[Dict] # Published vs Expected published = stats["published"] expected_pub = config["expected_messages"] - pub_match = abs(published - expected_pub) <= max(1, expected_pub * 0.1) # 10% tolerance - print(f" Published: {published} vs Expected: {expected_pub} {'✓ PASS' if pub_match else '✗ FAIL'}") + pub_match = abs(published - expected_pub) <= max( + 1, expected_pub * 0.1 + ) # 10% tolerance + print( + f" Published: {published} vs Expected: {expected_pub} {'✓ PASS' if pub_match else '✗ FAIL'}" + ) # Error rate failed = stats["failed"] @@ -435,23 +451,27 @@ def print_comparison(scenario_name: str, config: Dict, db_result: Optional[Dict] actual_error_rate = (failed / total * 100) if total > 0 else 0 expected_error_rate = config["expected_error_rate"] error_match = actual_error_rate <= expected_error_rate - err_status = '✓ PASS' if error_match else '✗ FAIL' - print(f" Error rate: {actual_error_rate:.2f}% vs Expected: <{expected_error_rate}% {err_status}") + err_status = "✓ PASS" if error_match else "✗ FAIL" + print( + f" Error rate: {actual_error_rate:.2f}% vs Expected: <{expected_error_rate}% {err_status}" + ) # Queue depth (from DB if available) if db_result: - processing = db_result['processing'] or 0 - pending = db_result['pending'] or 0 + processing = db_result["processing"] or 0 + pending = db_result["pending"] or 0 actual_queue = processing + pending - expected_queue = config['expected_queue_depth'] + expected_queue = config["expected_queue_depth"] print(f" Queue depth: {actual_queue} vs Expected: ~{expected_queue} (INFO)") # Pipeline completion - completed = db_result['completed'] or 0 - db_total = db_result['total'] or 0 + completed = db_result["completed"] or 0 + db_total = db_result["total"] or 0 if db_total > 0: completion_rate = completed / db_total * 100 - print(f" Pipeline completion: {completion_rate:.1f}% ({completed}/{db_total})") + print( + f" Pipeline completion: {completion_rate:.1f}% ({completed}/{db_total})" + ) print(f"{'='*60}\n") @@ -564,12 +584,10 @@ def main(): python mqtt-load-test.py smoke python mqtt-load-test.py baseline python mqtt-load-test.py saturation - """ + """, ) parser.add_argument( - "scenario", - choices=list(SCENARIOS.keys()), - help="Test scenario to run" + "scenario", choices=list(SCENARIOS.keys()), help="Test scenario to run" ) args = parser.parse_args() diff --git a/tasks/notification_tasks.py b/tasks/notification_tasks.py index 70d2ac0..a9061ec 100644 --- a/tasks/notification_tasks.py +++ b/tasks/notification_tasks.py @@ -102,9 +102,7 @@ def process_notification(detection_id: int): vehicle = None if detection.vehicle_id: try: - vehicle = Vehicle.objects.using("vehicles_db").get( - id=detection.vehicle_id - ) + vehicle = Vehicle.objects.using("vehicles_db").get(id=detection.vehicle_id) except Vehicle.DoesNotExist: logger.warning(f"Vehicle {detection.vehicle_id} not found") From 4e56318e92b606809c0f7e6d15bd09b849443fbc Mon Sep 17 00:00:00 2001 From: sanghun Date: Fri, 13 Feb 2026 23:34:11 +0900 Subject: [PATCH 4/4] style: fix black==24.10.0 formatting for CI compatibility --- docker/k6/mqtt-load-test.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/docker/k6/mqtt-load-test.py b/docker/k6/mqtt-load-test.py index 4924abf..f827d01 100644 --- a/docker/k6/mqtt-load-test.py +++ b/docker/k6/mqtt-load-test.py @@ -307,7 +307,8 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic cursor = conn.cursor() # Initial check - cursor.execute(""" + cursor.execute( + """ SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, @@ -316,7 +317,8 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed FROM detections WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) - """) + """ + ) if driver == "pymysql": result = cursor.fetchone() @@ -339,7 +341,8 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic print(f"\nWaiting up to {max_wait_sec}s for pipeline to drain...") wait_start = time.time() while time.time() - wait_start < max_wait_sec: - cursor.execute(""" + cursor.execute( + """ SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, @@ -348,7 +351,8 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed FROM detections WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) - """) + """ + ) if driver == "pymysql": result = cursor.fetchone() @@ -370,7 +374,8 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic time.sleep(2) # Final status - cursor.execute(""" + cursor.execute( + """ SELECT COUNT(*) as total, SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END) as completed, @@ -379,7 +384,8 @@ def verify_pipeline(expected_count: int, max_wait_sec: int = 30) -> Optional[Dic SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed FROM detections WHERE created_at >= DATE_SUB(NOW(), INTERVAL 10 MINUTE) - """) + """ + ) if driver == "pymysql": final_result = cursor.fetchone()