Merged
45 changes: 40 additions & 5 deletions .github/workflows/docker-build.yml
@@ -2,18 +2,26 @@ name: Docker Build

on:
push:
branches: [develop]
branches: [develop, main]
pull_request:
branches: [develop]
branches: [develop, main]

concurrency:
group: docker-${{ github.ref }}
cancel-in-progress: true

env:
AR_REGION: asia-northeast3
GCP_PROJECT_ID: project-4f918446-ebe5-4774-a52
AR_REPO: speedcam

jobs:
build:
name: ${{ matrix.service.name }}
runs-on: ubuntu-latest
permissions:
contents: read
id-token: write

strategy:
fail-fast: false
@@ -33,12 +41,39 @@ jobs:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Build ${{ matrix.service.name }} image
- name: Authenticate to Google Cloud
if: github.ref == 'refs/heads/main' && github.event_name == 'push'
uses: google-github-actions/auth@v2
with:
workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
service_account: ${{ secrets.WIF_SERVICE_ACCOUNT }}

- name: Configure Docker for Artifact Registry
if: github.ref == 'refs/heads/main' && github.event_name == 'push'
run: gcloud auth configure-docker ${{ env.AR_REGION }}-docker.pkg.dev

- name: Build and push ${{ matrix.service.name }} image
uses: docker/build-push-action@v6
with:
context: .
file: ${{ matrix.service.dockerfile }}
push: false
push: ${{ github.ref == 'refs/heads/main' && github.event_name == 'push' }}
cache-from: type=gha,scope=${{ matrix.service.name }}
cache-to: type=gha,mode=max,scope=${{ matrix.service.name }}
tags: speedcam/${{ matrix.service.name }}:ci-${{ github.sha }}
tags: |
${{ env.AR_REGION }}-docker.pkg.dev/${{ env.GCP_PROJECT_ID }}/${{ env.AR_REPO }}/speedcam-${{ matrix.service.name }}:latest
${{ env.AR_REGION }}-docker.pkg.dev/${{ env.GCP_PROJECT_ID }}/${{ env.AR_REPO }}/speedcam-${{ matrix.service.name }}:${{ github.sha }}

trigger-deploy:
name: Trigger Deploy
needs: build
if: github.ref == 'refs/heads/main' && github.event_name == 'push'
runs-on: ubuntu-latest
steps:
- name: Trigger CD workflow in depoly repo
run: |
curl -X POST \
-H "Accept: application/vnd.github.v3+json" \
-H "Authorization: token ${{ secrets.BACKEND_DEPLOY_TOKEN }}" \
https://api.github.com/repos/${{ github.repository_owner }}/depoly/dispatches \
-d '{"event_type":"deploy-backend","client_payload":{"image_tag":"${{ github.sha }}","triggered_by":"${{ github.actor }}"}}'
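The `-d` body of that curl call is the whole contract between the two repositories, so its shape is worth making explicit. A small sketch of a helper that builds the same payload (hypothetical name, not part of the repo):

```python
import json


def build_dispatch_payload(image_tag: str, actor: str) -> str:
    """Mirror the repository_dispatch body the workflow POSTs to the depoly repo."""
    return json.dumps({
        "event_type": "deploy-backend",
        "client_payload": {
            "image_tag": image_tag,    # the pushed image tag (github.sha)
            "triggered_by": actor,     # github.actor, kept for audit trails
        },
    })
```

On the receiving side, a CD workflow in the depoly repo would listen with `on: repository_dispatch: types: [deploy-backend]` and read the tag back as `github.event.client_payload.image_tag`.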
18 changes: 18 additions & 0 deletions README.md
@@ -381,3 +381,21 @@ Open http://localhost:5173 to view the results.

<br />
<br />

## Repository Responsibility Scope

> "Owns the Django API and Celery Worker source code, the Dockerfiles, and the local development environment"

| Item | This repo | depoly repo |
|------|-----------|-------------|
| Application source code | O | X |
| Dockerfiles (3) | O | X |
| Local-dev docker-compose | O | X |
| Local-dev monitoring config | O | X |
| Local-dev env.example | O | X |
| GitHub Actions CI (build/test) | O | X |
| Production compose files | X | O |
| Production monitoring config | X | O |
| Production env template | X | O |
| GitHub Actions CD (deploy) | X | O |
| Deployment scripts/docs | X | O |
3 changes: 2 additions & 1 deletion apps/vehicles/serializers.py
@@ -21,7 +21,8 @@ class Meta:
class VehicleCreateSerializer(serializers.ModelSerializer):
class Meta:
model = Vehicle
fields = ["plate_number", "owner_name", "owner_phone", "fcm_token"]
fields = ["id", "plate_number", "owner_name", "owner_phone", "fcm_token"]
read_only_fields = ["id"]


class FCMTokenUpdateSerializer(serializers.Serializer):
13 changes: 8 additions & 5 deletions backend.env.example
@@ -38,16 +38,17 @@ MQTT_USER=sa
MQTT_PASS=1234

# ===========================================
# GCS (Google Cloud Storage) settings
# GCS (Google Cloud Storage) authentication
# ===========================================
# Only needed when OCR_MOCK=false
GOOGLE_APPLICATION_CREDENTIALS=/-path(secret)-/gcp-cloud-storage.json
# GCS_BUCKET_NAME is not needed (parsed automatically from the GCS URI)
# GCS uses ADC (Application Default Credentials)
# GCE instances: authenticated automatically via the metadata server (no setup needed)
# Local development: run gcloud auth application-default login first
# With OCR_MOCK=true no GCS calls are made, so no credentials are required

# ===========================================
# Firebase settings (FCM Push Notification)
# ===========================================
FIREBASE_CREDENTIALS=/-path(secret)-/firebase-service-account.json
FIREBASE_CREDENTIALS=/app/credentials/firebase-service-account.json

# ===========================================
# Celery Worker settings
@@ -91,3 +92,5 @@ OTEL_RESOURCE_ATTRIBUTES=service.namespace=speedcam,deployment.environment=dev
# Valid values: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio
OTEL_TRACES_SAMPLER=parentbased_always_on
OTEL_PYTHON_LOG_CORRELATION=true
# Exclude internal GCP metadata requests from tracing (prevents 404 ERROR spans)
OTEL_PYTHON_REQUESTS_EXCLUDED_URLS=metadata.google.internal
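The GCS comments above boil down to one rule: credentials only matter when the service will actually call GCS. That check can be sketched as a stdlib-only helper (hypothetical name, not code from the repo):

```python
import os


def gcs_auth_required() -> bool:
    """True when the service will call GCS and therefore needs ADC credentials.

    With OCR_MOCK=true no GCS calls are made, so no credentials are required;
    otherwise ADC resolves via the GCE metadata server in production, or a
    local `gcloud auth application-default login` session in development.
    """
    return os.getenv("OCR_MOCK", "false").lower() != "true"
```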
19 changes: 2 additions & 17 deletions config/celery.py
@@ -15,12 +15,12 @@

# Exchange definitions
ocr_exchange = Exchange("ocr_exchange", type="direct", durable=True)
fcm_exchange = Exchange("fcm_exchange", type="direct", durable=True)
dlq_exchange = Exchange("dlq_exchange", type="fanout", durable=True)

# Queue definitions
# Note: fcm_queue removed — instead of a Celery command, the Alert Service
# subscribes directly to detections.completed events on the AMQP
# domain_events exchange (Choreography)
app.conf.task_queues = (
# New queues (PRD structure)
Queue(
"ocr_queue",
exchange=ocr_exchange,
@@ -31,15 +31,6 @@
"x-max-priority": 10,
},
),
Queue(
"fcm_queue",
exchange=fcm_exchange,
routing_key="fcm",
queue_arguments={
"x-dead-letter-exchange": "dlq_exchange",
"x-message-ttl": 3600000,
},
),
Queue(
"dlq_queue",
exchange=dlq_exchange,
@@ -49,17 +40,11 @@

# Task routing
app.conf.task_routes = {
# New tasks (PRD structure)
"tasks.ocr_tasks.process_ocr": {
"queue": "ocr_queue",
"exchange": "ocr_exchange",
"routing_key": "ocr",
},
"tasks.notification_tasks.send_notification": {
"queue": "fcm_queue",
"exchange": "fcm_exchange",
"routing_key": "fcm",
},
"tasks.dlq_tasks.process_dlq_message": {
"queue": "dlq_queue",
"exchange": "dlq_exchange",
5 changes: 5 additions & 0 deletions core/events/__init__.py
@@ -0,0 +1,5 @@
# Domain Events Module (AMQP)
# Publish/subscribe domain events between backend services (Choreography)
from .publisher import publish_event

__all__ = ["publish_event"]
106 changes: 106 additions & 0 deletions core/events/consumer.py
@@ -0,0 +1,106 @@
"""
AMQP Domain Event Consumer (Kombu)

The Alert Service subscribes to domain events directly and handles them autonomously.
Choreography: each service reacts to events independently.

Rather than going through the Main Service, the Alert Service subscribes
directly to detections.completed events and decides whether to send a notification.
"""

import json
import logging
import os
import time

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

logger = logging.getLogger(__name__)

DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True)


class AlertEventConsumer(ConsumerMixin):
"""
Alert Service domain event consumer

Subscribes to detections.completed events; the Alert Service
autonomously decides whether to send a notification.
It knows nothing of the OCR Service, nor of any Main Service mediation.
"""

def __init__(self, connection):
self.connection = connection

def get_consumers(self, Consumer, channel):
queue = Queue(
"alert_domain_events",
exchange=DOMAIN_EVENTS_EXCHANGE,
routing_key="detections.completed",
durable=True,
queue_arguments={
"x-dead-letter-exchange": "dlq_exchange",
},
)
return [
Consumer(
queues=[queue],
callbacks=[self.on_event],
accept=["json"],
)
]

def on_event(self, body, message):
"""도메인 이벤트 수신 및 처리"""
try:
payload = json.loads(body) if isinstance(body, str) else body
routing_key = message.delivery_info.get("routing_key", "")

if routing_key == "detections.completed":
self._on_detection_completed(payload)

message.ack()
except Exception as e:
logger.error(f"Failed to process domain event: {e}")
message.reject(requeue=False)

def _on_detection_completed(self, payload):
"""
React to a detections.completed event

The Alert Service's autonomous decision:
"OCR is done, so send the notification."
"""
detection_id = payload["detection_id"]
logger.info(
f"Detection {detection_id} completed event received — "
f"processing notification"
)

max_retries = 3
for attempt in range(max_retries + 1):
try:
from tasks.notification_tasks import process_notification

process_notification(detection_id)
return
except Exception as e:
if "DoesNotExist" in type(e).__name__ and attempt < max_retries:
logger.warning(
f"Detection {detection_id} not ready, "
f"retry {attempt + 1}/{max_retries}"
)
time.sleep(3)
else:
raise


def start_event_consumer():
"""Alert Service 도메인 이벤트 소비자 시작 (blocking)"""
broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")
logger.info(f"Starting Alert Event Consumer on {broker_url}")

with Connection(broker_url) as conn:
consumer = AlertEventConsumer(conn)
consumer.run()
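The retry loop in `_on_detection_completed` follows a common pattern: retry transient "row not committed yet" failures (`DoesNotExist`) a bounded number of times, and re-raise everything else so the message lands in the DLQ. A stdlib-only sketch of that pattern (hypothetical helper, for illustration only):

```python
import time


def retry_transient(fn, *, max_retries=3, delay_s=3.0, is_transient=None):
    """Call fn(); retry up to max_retries times while is_transient(exc) is true.

    Mirrors the consumer's loop: transient failures are retried after a short
    sleep, anything else (or retry exhaustion) propagates to the caller.
    """
    is_transient = is_transient or (lambda e: True)
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as e:
            if is_transient(e) and attempt < max_retries:
                time.sleep(delay_s)  # give the writer transaction time to commit
            else:
                raise
```

Re-raising on exhaustion matters here: `on_event` catches the exception and rejects without requeue, so the dead-letter exchange configured on the queue receives the message instead of it looping forever.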
46 changes: 46 additions & 0 deletions core/events/publisher.py
@@ -0,0 +1,46 @@
"""
AMQP Domain Event Publisher (Kombu)

Publishes domain events between backend services in the Choreography pattern.
Protocol separation principle: MQTT at the IoT boundary, AMQP for service-to-service events.
"""

import json
import logging
import os

from kombu import Connection, Exchange

logger = logging.getLogger(__name__)

# Domain event exchange (topic exchange: selective subscription by routing key)
DOMAIN_EVENTS_EXCHANGE = Exchange("domain_events", type="topic", durable=True)


def publish_event(routing_key: str, payload: dict):
"""
Publish an AMQP domain event

Publishing through a topic exchange lets any interested service
subscribe independently via routing key patterns.

Args:
    routing_key: event routing key (e.g. "detections.completed")
    payload: event payload
"""
broker_url = os.getenv("CELERY_BROKER_URL", "amqp://guest:guest@rabbitmq:5672//")

try:
with Connection(broker_url) as conn:
producer = conn.Producer()
producer.publish(
json.dumps(payload),
exchange=DOMAIN_EVENTS_EXCHANGE,
routing_key=routing_key,
content_type="application/json",
declare=[DOMAIN_EVENTS_EXCHANGE],
)
logger.info(f"Domain event published: {routing_key} -> {payload}")
except Exception as e:
logger.error(f"Failed to publish domain event {routing_key}: {e}")
raise
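The topic-exchange semantics this publisher relies on (`*` matches exactly one dot-separated word, `#` matches zero or more) can be illustrated with a stdlib-only matcher. This is a sketch of RabbitMQ's binding rules, not code from the repo:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key.

    '*' matches exactly one dot-separated word; '#' matches zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may consume any number of remaining words, including none
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head in ("*", k[0]):
            return match(rest, k[1:])
        return False
    return match(pattern.split("."), routing_key.split("."))
```

So a queue bound with `detections.*` receives `detections.completed` but not a three-word key, which is how the Alert Service's binding selects exactly the events it cares about without the publisher knowing who is listening.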
2 changes: 1 addition & 1 deletion core/firebase/fcm.py
@@ -31,7 +31,7 @@ def initialize_firebase():
firebase_admin.initialize_app(cred)
logger.info(f"Firebase initialized with credentials: {cred_path}")
else:
# Use GOOGLE_APPLICATION_CREDENTIALS
# Use ADC (Application Default Credentials)
firebase_admin.initialize_app()
logger.info("Firebase initialized with default credentials")

2 changes: 1 addition & 1 deletion core/mqtt/__init__.py
@@ -1,4 +1,4 @@
# MQTT Module
# MQTT Module (IoT Edge only)
from .subscriber import MQTTSubscriber

__all__ = ["MQTTSubscriber"]