Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions backend.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ MQTT_USER=sa
MQTT_PASS=1234

# ===========================================
# GCS (Google Cloud Storage) 설정
# GCS (Google Cloud Storage) 인증
# ===========================================
# OCR_MOCK=false일 때만 필요
GOOGLE_APPLICATION_CREDENTIALS=/-path(secret)-/gcp-cloud-storage.json
# GCS_BUCKET_NAME은 불필요 (GCS URI에서 자동 파싱)
# GCS는 ADC (Application Default Credentials) 사용
# GCE 인스턴스: 메타데이터 서버에서 자동 인증 (설정 불필요)
# 로컬 개발: gcloud auth application-default login 실행 후 사용
# OCR_MOCK=true이면 GCS 호출 없으므로 인증 불필요

# ===========================================
# Firebase 설정 (FCM Push Notification)
# ===========================================
FIREBASE_CREDENTIALS=/-path(secret)-/firebase-service-account.json
FIREBASE_CREDENTIALS=/app/credentials/firebase-service-account.json

# ===========================================
# Celery Worker 설정
Expand Down Expand Up @@ -91,3 +92,5 @@ OTEL_RESOURCE_ATTRIBUTES=service.namespace=speedcam,deployment.environment=dev
# Valid values: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio
OTEL_TRACES_SAMPLER=parentbased_always_on
OTEL_PYTHON_LOG_CORRELATION=true
# GCP metadata 내부 요청을 트레이싱에서 제외 (404 ERROR span 방지)
OTEL_PYTHON_REQUESTS_EXCLUDED_URLS=metadata.google.internal
19 changes: 2 additions & 17 deletions config/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

# Exchange 정의
ocr_exchange = Exchange("ocr_exchange", type="direct", durable=True)
fcm_exchange = Exchange("fcm_exchange", type="direct", durable=True)
dlq_exchange = Exchange("dlq_exchange", type="fanout", durable=True)

# Queue 정의
# Note: fcm_queue 제거 — Alert Service는 Celery Command가 아닌
# AMQP domain_events exchange의 detections.completed 이벤트를 직접 구독 (Choreography)
app.conf.task_queues = (
# 새로운 Queue (PRD 구조)
Queue(
"ocr_queue",
exchange=ocr_exchange,
Expand All @@ -31,15 +31,6 @@
"x-max-priority": 10,
},
),
Queue(
"fcm_queue",
exchange=fcm_exchange,
routing_key="fcm",
queue_arguments={
"x-dead-letter-exchange": "dlq_exchange",
"x-message-ttl": 3600000,
},
),
Queue(
"dlq_queue",
exchange=dlq_exchange,
Expand All @@ -49,17 +40,11 @@

# Task 라우팅
app.conf.task_routes = {
# 새로운 Tasks (PRD 구조)
"tasks.ocr_tasks.process_ocr": {
"queue": "ocr_queue",
"exchange": "ocr_exchange",
"routing_key": "ocr",
},
"tasks.notification_tasks.send_notification": {
"queue": "fcm_queue",
"exchange": "fcm_exchange",
"routing_key": "fcm",
},
"tasks.dlq_tasks.process_dlq_message": {
"queue": "dlq_queue",
"exchange": "dlq_exchange",
Expand Down
5 changes: 5 additions & 0 deletions core/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Domain Events Module (AMQP)
# 백엔드 서비스 간 도메인 이벤트 발행/구독 (Choreography)
from .publisher import publish_event

__all__ = ["publish_event"]
106 changes: 106 additions & 0 deletions core/events/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
AMQP Domain Event Consumer (Kombu)

Alert Service가 도메인 이벤트를 직접 구독하여 자율적으로 처리.
Choreography: 각 서비스는 이벤트에 독립적으로 반응한다.

Main Service를 거치지 않고 Alert Service가 직접
detections.completed 이벤트를 구독하여 알림 발송 여부를 결정한다.
"""

import json
import logging
import os
import time

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

logger = logging.getLogger(__name__)

DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True)


class AlertEventConsumer(ConsumerMixin):
"""
Alert Service 도메인 이벤트 소비자

detections.completed 이벤트를 구독하고,
Alert Service가 자율적으로 알림 발송 여부를 결정한다.
OCR Service의 존재도, Main Service의 중개도 모른다.
"""

def __init__(self, connection):
self.connection = connection

def get_consumers(self, Consumer, channel):
queue = Queue(
"alert_domain_events",
exchange=DOMAIN_EVENTS_EXCHANGE,
routing_key="detections.completed",
durable=True,
queue_arguments={
"x-dead-letter-exchange": "dlq_exchange",
},
)
return [
Consumer(
queues=[queue],
callbacks=[self.on_event],
accept=["json"],
)
]

def on_event(self, body, message):
"""도메인 이벤트 수신 및 처리"""
try:
payload = json.loads(body) if isinstance(body, str) else body
routing_key = message.delivery_info.get("routing_key", "")

if routing_key == "detections.completed":
self._on_detection_completed(payload)

message.ack()
except Exception as e:
logger.error(f"Failed to process domain event: {e}")
message.reject(requeue=False)

def _on_detection_completed(self, payload):
"""
detections.completed 이벤트에 반응

Alert Service의 자율적 판단:
"OCR이 완료됐으니 알림을 보내야겠다"
"""
detection_id = payload["detection_id"]
logger.info(
f"Detection {detection_id} completed event received — "
f"processing notification"
)

max_retries = 3
for attempt in range(max_retries + 1):
try:
from tasks.notification_tasks import process_notification

process_notification(detection_id)
return
except Exception as e:
if "DoesNotExist" in type(e).__name__ and attempt < max_retries:
logger.warning(
f"Detection {detection_id} not ready, "
f"retry {attempt + 1}/{max_retries}"
)
time.sleep(3)
else:
raise


def start_event_consumer():
"""Alert Service 도메인 이벤트 소비자 시작 (blocking)"""
broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")
logger.info(f"Starting Alert Event Consumer on {broker_url}")

with Connection(broker_url) as conn:
consumer = AlertEventConsumer(conn)
consumer.run()
46 changes: 46 additions & 0 deletions core/events/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
AMQP Domain Event Publisher (Kombu)

Choreography 패턴에서 백엔드 서비스 간 도메인 이벤트 발행.
프로토콜 분리 원칙: IoT 경계는 MQTT, 서비스 간 이벤트는 AMQP.
"""

import json
import logging
import os

from kombu import Connection, Exchange

logger = logging.getLogger(__name__)

# 도메인 이벤트 교환기 (topic exchange: routing key 기반 선택적 구독)
DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True)


def publish_event(routing_key: str, payload: dict):
"""
AMQP 도메인 이벤트 발행

Topic exchange를 사용하여 이벤트를 발행하면,
관심 있는 서비스가 routing key 패턴으로 독립적으로 구독한다.

Args:
routing_key: 이벤트 라우팅 키 (예: "detections.completed")
payload: 이벤트 페이로드
"""
broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")

try:
with Connection(broker_url) as conn:
producer = conn.Producer()
producer.publish(
json.dumps(payload),
exchange=DOMAIN_EVENTS_EXCHANGE,
routing_key=routing_key,
content_type="application/json",
declare=[DOMAIN_EVENTS_EXCHANGE],
)
logger.info(f"Domain event published: {routing_key} -> {payload}")
except Exception as e:
logger.error(f"Failed to publish domain event {routing_key}: {e}")
raise
2 changes: 1 addition & 1 deletion core/firebase/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def initialize_firebase():
firebase_admin.initialize_app(cred)
logger.info(f"Firebase initialized with credentials: {cred_path}")
else:
# GOOGLE_APPLICATION_CREDENTIALS 사용
# ADC (Application Default Credentials) 사용
firebase_admin.initialize_app()
logger.info("Firebase initialized with default credentials")

Expand Down
2 changes: 1 addition & 1 deletion core/mqtt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# MQTT Module
# MQTT Module (IoT Edge 전용)
from .subscriber import MQTTSubscriber

__all__ = ["MQTTSubscriber"]
47 changes: 28 additions & 19 deletions core/mqtt/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
"""MQTT Subscriber for Edge Device messages"""
"""
MQTT Subscriber for IoT Edge Device messages

IoT 디바이스(Raspberry Pi 카메라)에서 오는 MQTT 메시지만 처리.
프로토콜 분리 원칙: MQTT는 IoT 경계 전용, 서비스 간 이벤트는 AMQP.
"""

import json
import logging
Expand All @@ -13,13 +18,10 @@

class MQTTSubscriber:
"""
RabbitMQ MQTT Plugin을 통해 Edge Device 메시지를 수신하는 Subscriber
RabbitMQ MQTT Plugin을 통해 IoT 디바이스 메시지를 수신하는 Subscriber

Flow:
1. Raspberry Pi -> MQTT Publish (detections/new)
2. RabbitMQ MQTT Plugin -> 내부 변환
3. Django MQTT Subscriber -> 메시지 수신
4. Detection 생성 (pending) -> OCR Task 발행
구독 토픽:
- detections/new : IoT 디바이스 → Detection 생성 → OCR 발행
"""

def __init__(self):
Expand All @@ -33,12 +35,12 @@ def __init__(self):
self.client.on_disconnect = self.on_disconnect

# 인증 설정
username = os.getenv("MQTT_USER", "sa")
password = os.getenv("MQTT_PASS", "1234")
username = os.getenv("MQTT_USER", "")
password = os.getenv("MQTT_PASS", "")
self.client.username_pw_set(username, password)

def on_connect(self, client, userdata, flags, reason_code, properties):
"""MQTT 연결 시 토픽 구독"""
"""MQTT 연결 시 IoT 토픽 구독"""
if reason_code.is_failure:
logger.error(f"MQTT connection failed: {reason_code}")
else:
Expand All @@ -55,20 +57,30 @@ def on_disconnect(
)

def on_message(self, client, userdata, msg):
"""메시지 처리"""
try:
payload = json.loads(msg.payload.decode())
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in MQTT message: {e}")
return

if msg.topic == "detections/new":
self._handle_new_detection(payload)
else:
logger.warning(f"Unknown MQTT topic: {msg.topic}")

def _handle_new_detection(self, payload):
"""
메시지 수신 시 처리
detections/new: IoT 디바이스에서 새 감지 수신
1. DB에 pending 레코드 즉시 생성 (데이터 손실 방지)
2. OCR Task 발행
2. OCR Task 발행 (AMQP)
"""
try:
payload = json.loads(msg.payload.decode())
logger.info(f"Received MQTT message: {payload.get('camera_id')}")

# Import here to avoid circular imports
from apps.detections.models import Detection
from tasks.ocr_tasks import process_ocr

# 1. Detection 레코드 생성 (status=pending, detections_db)
detection = Detection.objects.using("detections_db").create(
camera_id=payload.get("camera_id"),
location=payload.get("location"),
Expand All @@ -81,7 +93,6 @@ def on_message(self, client, userdata, msg):

logger.info(f"Detection {detection.id} created (pending)")

# 2. OCR Task 발행 (AMQP via Celery)
process_ocr.apply_async(
args=[detection.id],
kwargs={"gcs_uri": payload["image_gcs_uri"]},
Expand All @@ -91,12 +102,10 @@ def on_message(self, client, userdata, msg):

logger.info(f"OCR task dispatched for detection {detection.id}")

except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in MQTT message: {e}")
except KeyError as e:
logger.error(f"Missing required field in MQTT message: {e}")
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
logger.error(f"Error processing new detection: {e}")

@staticmethod
def _parse_detected_at(value):
Expand Down
25 changes: 15 additions & 10 deletions credentials/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
# Credentials 폴더

이 폴더에는 GCP 및 Firebase 인증 관련 파일들이 위치합니다.
이 폴더에는 Firebase 인증 파일이 위치합니다.
GCS (Cloud Storage)는 ADC를 사용하므로 JSON 키가 불필요합니다.

## 파일 목록

| 파일명 | 용도 | 환경변수 |
|--------|------|----------|
| `firebase-service-account.json` | FCM 푸시 알림 | `FIREBASE_CREDENTIALS` |
| `gcp-cloud-storage.json` | GCS 이미지 저장소 | `GOOGLE_APPLICATION_CREDENTIALS` |

## 인증 방식

| 서비스 | 인증 방식 | 설정 |
|--------|----------|------|
| **GCS (Cloud Storage)** | ADC (Application Default Credentials) | GCE: 자동, 로컬: `gcloud auth application-default login` |
| **Firebase (FCM)** | Service Account JSON 키 | `FIREBASE_CREDENTIALS` 환경변수로 경로 지정 |

## 설정 방법

### 1. Firebase Service Account
### Firebase Service Account
1. [Firebase Console](https://console.firebase.google.com/) 접속
2. 프로젝트 설정 → 서비스 계정 → 새 비공개 키 생성
3. 다운로드한 JSON 파일을 `firebase-service-account.json`으로 저장

### 2. GCP Service Account (Cloud Storage)
1. [GCP Console](https://console.cloud.google.com/) 접속
2. IAM 및 관리자 → 서비스 계정 → 키 생성
3. 필요한 역할: `Storage Object Viewer`, `Storage Object Creator`
4. 다운로드한 JSON 파일을 `gcp-cloud-storage.json`으로 저장
### GCS (ADC)
- GCE 인스턴스: 설정 불필요 (메타데이터 서버에서 자동 인증)
- 로컬 개발: `gcloud auth application-default login` 실행

## 주의사항

- ⚠️ **실제 인증 파일은 절대 Git에 커밋하지 마세요**
- Docker 환경에서는 볼륨 마운트 또는 Secret Manager 사용 권장
- `GOOGLE_APPLICATION_CREDENTIALS` 환경변수를 설정하지 마세요 (ADC 자동 탐색 방해)
- Firebase JSON 키는 절대 Git에 커밋하지 마세요
Loading