Skip to content
Merged
991 changes: 619 additions & 372 deletions README.md

Large diffs are not rendered by default.

51 changes: 23 additions & 28 deletions core/mqtt/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
"""
MQTT Subscriber for IoT Edge Device messages

IoT 디바이스(Raspberry Pi 카메라)에서 오는 MQTT 메시지만 처리.
프로토콜 분리 원칙: MQTT는 IoT 경계 전용, 서비스 간 이벤트는 AMQP.
"""
"""MQTT Subscriber for Edge Device messages"""

import json
import logging
import os

import paho.mqtt.client as mqtt
from django.db import close_old_connections
from django.utils import timezone
from django.utils.dateparse import parse_datetime

Expand All @@ -18,10 +14,13 @@

class MQTTSubscriber:
"""
RabbitMQ MQTT Plugin을 통해 IoT 디바이스 메시지를 수신하는 Subscriber
RabbitMQ MQTT Plugin을 통해 Edge Device 메시지를 수신하는 Subscriber

구독 토픽:
- detections/new : IoT 디바이스 → Detection 생성 → OCR 발행
Flow:
1. Raspberry Pi -> MQTT Publish (detections/new)
2. RabbitMQ MQTT Plugin -> 내부 변환
3. Django MQTT Subscriber -> 메시지 수신
4. Detection 생성 (pending) -> OCR Task 발행
"""

def __init__(self):
Expand All @@ -35,12 +34,12 @@ def __init__(self):
self.client.on_disconnect = self.on_disconnect

# 인증 설정
username = os.getenv("MQTT_USER", "")
password = os.getenv("MQTT_PASS", "")
username = os.getenv("MQTT_USER", "sa")
password = os.getenv("MQTT_PASS", "1234")
self.client.username_pw_set(username, password)

def on_connect(self, client, userdata, flags, reason_code, properties):
"""MQTT 연결 시 IoT 토픽 구독"""
"""MQTT 연결 시 토픽 구독"""
if reason_code.is_failure:
logger.error(f"MQTT connection failed: {reason_code}")
else:
Expand All @@ -57,30 +56,23 @@ def on_disconnect(
)

def on_message(self, client, userdata, msg):
"""메시지 처리"""
try:
payload = json.loads(msg.payload.decode())
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in MQTT message: {e}")
return

if msg.topic == "detections/new":
self._handle_new_detection(payload)
else:
logger.warning(f"Unknown MQTT topic: {msg.topic}")

def _handle_new_detection(self, payload):
"""
detections/new: IoT 디바이스에서 새 감지 수신
메시지 수신 시 처리
1. DB에 pending 레코드 즉시 생성 (데이터 손실 방지)
2. OCR Task 발행 (AMQP)
2. OCR Task 발행
"""
try:
payload = json.loads(msg.payload.decode())
logger.info(f"Received MQTT message: {payload.get('camera_id')}")

# 장기 실행 스레드에서 stale DB 연결 방지
close_old_connections()

# Import here to avoid circular imports
from apps.detections.models import Detection
from tasks.ocr_tasks import process_ocr

# 1. Detection 레코드 생성 (status=pending, detections_db)
detection = Detection.objects.using("detections_db").create(
camera_id=payload.get("camera_id"),
location=payload.get("location"),
Expand All @@ -93,6 +85,7 @@ def _handle_new_detection(self, payload):

logger.info(f"Detection {detection.id} created (pending)")

# 2. OCR Task 발행 (AMQP via Celery)
process_ocr.apply_async(
args=[detection.id],
kwargs={"gcs_uri": payload["image_gcs_uri"]},
Expand All @@ -102,10 +95,12 @@ def _handle_new_detection(self, payload):

logger.info(f"OCR task dispatched for detection {detection.id}")

except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in MQTT message: {e}")
except KeyError as e:
logger.error(f"Missing required field in MQTT message: {e}")
except Exception as e:
logger.error(f"Error processing new detection: {e}")
logger.error(f"Error processing MQTT message: {e}")

@staticmethod
def _parse_detected_at(value):
Expand Down
Loading