Implement manager worker architecture #15

Merged (11 commits) on May 15, 2024
59 changes: 50 additions & 9 deletions Dockerfile
@@ -1,23 +1,64 @@
FROM python:3.8
FROM postgis/postgis:14-3.3 AS builder
# This docker image is based on the bullseye operating system
# See: https://github.com/postgis/docker-postgis/blob/master/14-3.3/Dockerfile

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# Install libraries needed for GeoDjango and PostGIS
# See https://docs.djangoproject.com/en/3.2/ref/contrib/gis/install/geolibs/
RUN apt-get update && apt-get install -y \
binutils \
libproj-dev \
gdal-bin

# Install Postgres client to check liveness of the database
RUN apt-get update && apt-get install -y postgresql-client
RUN apt-get install -y postgresql-client

# Install osm2pgsql to load the osm-data into the database
RUN apt-get install -y osm2pgsql

# Install Python and a dependency for psycopg2
RUN apt-get install -y python3 python3-pip libpq-dev

# Install Poetry as the package manager for this application
RUN pip install poetry

WORKDIR /code

# Use the admin interface to check the health of the application
HEALTHCHECK --interval=10s --timeout=8s --start-period=20s --retries=10 \
CMD curl --fail http://localhost:8000/admin || exit 1

# Install Python dependencies separated from the
# run script to enable Docker caching
ADD pyproject.toml /code
# Install all dependencies
RUN poetry install --no-interaction --no-ansi --no-dev

ADD . /code
# Install CURL for healthcheck
RUN apt-get update && apt-get install -y curl

# Expose Django port, DO NOT EXPOSE THE DATABASE PORT!
EXPOSE 8000

COPY . /code

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

ENV POSTGRES_NAME=news-db
ENV POSTGRES_USER=news-user
ENV POSTGRES_PASSWORD=news-password
ENV POSTGRES_DB=news-db
ENV POSTGRES_HOST=localhost
ENV POSTGRES_PORT=5432

ENV HEALTHCHECK_TOKEN=healthcheck-token

# Use this argument to invalidate the cache of subsequent steps.
ARG CACHE_DATE=1970-01-01

# Make the postgres persistence dirs writable for every user
RUN chmod -R 777 /var/lib/postgresql/data

FROM builder AS production
ENV DJANGO_DEBUG_MODE=False
# Preheat our database by running migrations and pre-loading data
RUN ./run-preheating.sh
HEALTHCHECK --interval=60s --timeout=10s --retries=5 --start-period=10s \
CMD curl --fail http://localhost:8000/healthcheck?token=healthcheck-token || exit 1
CMD "./run-server.sh"
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
# News Service
A microservice to create and publish news to the PrioBike app.

## REST-API Endpoints
## WORKER REST-API Endpoints
- ```api/news```: Get all news articles
- optional query params:
- ```from```: Specifies the date(time) from which on news articles should be returned; articles released on or before the ```from``` date(time) are not included.
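
A hypothetical client call against this worker endpoint might look like the sketch below; the base URL and the exact route prefix are assumptions (in this PR the news routes are included under ```news/``` in ```backend/urls.py```), so adjust them to the actual deployment:

```python
import requests

# Hypothetical request; the base URL is an assumption and the full path
# depends on how news.urls is mounted (included under news/ in backend/urls.py).
response = requests.get(
    "http://localhost:8000/news/api/news",
    params={"from": "2024-05-01T00:00:00"},
)
response.raise_for_status()
print(response.json())
```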
51 changes: 32 additions & 19 deletions backend/backend/settings.py
@@ -19,6 +19,8 @@

ALLOWED_HOSTS = ['*']

HEALTHCHECK_TOKEN = os.environ.get('HEALTHCHECK_TOKEN', 'healthcheck-token')

# One of the following: 'dev' / 'staging' / 'production'
FCM_PUSH_NOTIFICATION_ENVIRONMENT = os.environ.get('FCM_PUSH_NOTIFICATION_ENVIRONMENT', 'dev')
FCM_PUSH_NOTIFICATION_CONF = os.path.join(BASE_DIR.parent, "config/fcm-key.json")
@@ -29,9 +31,21 @@
# Detect whether it's a test run or not.
TESTING = sys.argv[1:2] == ['test']

# Detect whether we run in worker or manager mode.
WORKER_MODE = 'True' in os.environ.get('WORKER_MODE', 'False')
if not WORKER_MODE:
# Needed to find the workers.
WORKER_HOST = os.environ.get('WORKER_HOST')
else:
WORKER_HOST = None

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = os.environ.get('DEBUG', 'True') == 'True'

SYNC_PORT = os.environ.get('SYNC_PORT', 8001)
SYNC_EXPOSED = 'True' in os.environ.get('SYNC_EXPOSED', 'False')
SYNC_KEY = os.environ.get('SYNC_KEY')

# The news service is deployed behind reverse NGINX proxies.
# Therefore, we set the admin url here so that it redirects to the correct browser path.
# By default, the admin site will be accessible under admin/
@@ -55,14 +69,16 @@

INSTALLED_APPS = [
'news',
'sync',

'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
if not WORKER_MODE:
INSTALLED_APPS.append('django.contrib.admin')

MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
@@ -97,25 +113,22 @@

# Database
# https://docs.djangoproject.com/en/4.0/ref/settings/#databases

if DEBUG:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',

'NAME': os.environ.get('POSTGRES_NAME'),
'USER': os.environ.get('POSTGRES_USER'),
'PASSWORD': os.environ.get('POSTGRES_PASSWORD'),
'HOST': os.environ.get('POSTGRES_HOST'),
'PORT': os.environ.get('POSTGRES_PORT'),
}
else:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',

'NAME': os.environ.get('POSTGRES_NAME'),
'USER': os.environ.get('POSTGRES_USER'),
'PASSWORD': os.environ.get('POSTGRES_PASSWORD'),
'HOST': os.environ.get('POSTGRES_HOST'),
'PORT': os.environ.get('POSTGRES_PORT'),
}
}

if TESTING:
DATABASES['default'] = {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
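
The worker/manager split above is driven entirely by environment variables, and the flags use a substring check rather than strict equality. A minimal sketch of how that parsing behaves, with purely illustrative variable values:

```python
import os

# Illustrative values only; these are assumptions, not taken from any
# deployment file in this repository.
os.environ.update({
    'WORKER_MODE': 'True',
    'SYNC_EXPOSED': 'True',
    'SYNC_PORT': '8001',
    'SYNC_KEY': 'example-sync-key',
})

# Same substring-based parsing as in settings.py: any value containing the
# substring 'True' switches the flag on, everything else leaves it off.
WORKER_MODE = 'True' in os.environ.get('WORKER_MODE', 'False')
SYNC_EXPOSED = 'True' in os.environ.get('SYNC_EXPOSED', 'False')
SYNC_PORT = os.environ.get('SYNC_PORT', 8001)

print(WORKER_MODE, SYNC_EXPOSED, SYNC_PORT)  # True True 8001
```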


10 changes: 9 additions & 1 deletion backend/backend/urls.py
@@ -17,7 +17,15 @@
from django.contrib import admin
from django.urls import include, path

from backend.views import HealthcheckView, StatusView

urlpatterns = [
path('news/', include('news.urls')),
path(settings.ADMIN_URL, admin.site.urls),
path('status', StatusView.as_view(), name='status'),
path('healthcheck', HealthcheckView.as_view(), name='healthcheck'),
]
if not settings.WORKER_MODE:
urlpatterns.append(path(settings.ADMIN_URL, admin.site.urls))

if settings.SYNC_EXPOSED:
urlpatterns.append(path('sync/', include('sync.urls')))
46 changes: 46 additions & 0 deletions backend/backend/views.py
@@ -0,0 +1,46 @@
from datetime import datetime

from django.apps import apps
from django.conf import settings
from django.http import JsonResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View


class StatusView(View):
"""
View to get the status of the news service.
"""

def get(self, request, *args, **kwargs):
"""
Handle the GET request.
"""
return JsonResponse({'status': 'ok'})


@method_decorator(csrf_exempt, name='dispatch')
class HealthcheckView(View):
"""
View to get the healthcheck of the news service.
"""

def get(self, request, *args, **kwargs):
"""
Handle the GET request.
"""
token = settings.HEALTHCHECK_TOKEN
if token and token != request.GET.get('token'):
return JsonResponse({'status': 'unauthorized'}, status=401)

# Fetch all objects from all models.
# This will heat the cache and make sure
# the database is available.
now = datetime.now()
for model in apps.get_models():
model.objects.all()
time = (datetime.now() - now).total_seconds()
print(f'OK: Healthcheck took {time} seconds')

return JsonResponse({'status': 'ok', 'time': time})
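
The production stage of the Dockerfile probes this endpoint with curl in its HEALTHCHECK. Below is a hypothetical equivalent in Python, assuming the service listens on localhost:8000 and uses the default token from the Dockerfile:

```python
import requests

# Hypothetical healthcheck probe; host, port and token are assumptions that
# match the defaults set in the Dockerfile (ENV HEALTHCHECK_TOKEN).
response = requests.get(
    "http://localhost:8000/healthcheck",
    params={"token": "healthcheck-token"},
    timeout=10,
)
body = response.json()
if response.status_code == 200 and body.get("status") == "ok":
    print(f"healthy, check took {body.get('time')} seconds")
else:
    print(f"unhealthy: {response.status_code} {body}")
```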
109 changes: 106 additions & 3 deletions backend/news/models.py
@@ -1,10 +1,13 @@
import hashlib
import json
import os
import socket

import requests
from django.conf import settings
from django.db import models
from django.db.models.signals import post_save
from django.db.transaction import atomic
from django.dispatch import receiver
from django.utils import timezone
from firebase_admin import delete_app, initialize_app
@@ -55,6 +58,103 @@ class Meta:
ordering = ['-pub_date']


def sync_from_content(data):
with atomic():
# Clear the database
Category.objects.all().delete()
NewsArticle.objects.all().delete()

# Load the categories contained in the response.
categories = data.get("categories")
for category in categories:
title = category.get("title")
try:
_, created = Category.objects.get_or_create(title=title)
if created:
print(f"Created category: {title}")
except Exception as err:
print(f"Error during sync: {err}")
raise err

# Load the articles contained in the response.
articles = data.get("articles")
for article in articles:
category_title = article.get("category")
article_title = article.get("title")
if category_title:
category = Category.objects.get(title=category_title)
else:
category = None
try:
_, created = NewsArticle.objects.get_or_create(
text=article.get("text"),
title=article_title,
pub_date=timezone.datetime.fromisoformat(article.get("pubDate")),
category=category,
)
except Exception as err:
print(f"Error during sync: {err}")
raise err
if created:
print(f"Created article: {article_title}")


def get_sync_content():
# Write a json that contains all news articles and categories.
data = {
'key': settings.SYNC_KEY,
'categories': [
{
'title': category.title,
} for category in Category.objects.all()
],
'articles': [
{
'text': article.text,
'title': article.title,
'pubDate': article.pub_date.isoformat(),
'category': article.category.title if article.category else None,
} for article in NewsArticle.objects.all()
]
}
return data


@receiver(post_save, sender=NewsArticle)
def sync_workers(sender, instance, created, **kwargs):
"""
Sync the new news article with the worker instances.
"""
# Lookup all workers using DNS.
if settings.WORKER_MODE:
return
if settings.TESTING:
return

host = settings.WORKER_HOST
port = settings.SYNC_PORT

worker_hosts = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
worker_ips = [worker_host[4][0] for worker_host in worker_hosts]

data = get_sync_content()

# Fetch the status for now
for worker_ip in worker_ips:
print(f"Syncing with worker: {worker_ip}")
url = f"http://{worker_ip}:{port}/sync/sync"
response = requests.post(url, json=data)
# Parse the response as json
if response.status_code != 200:
print(f"Failed to sync with worker {worker_ip}: status {response.status_code}")
raise Exception(f"Failed to sync with worker {worker_ip}: status {response.status_code}")
status = json.loads(response.text).get('status')
if status != 'ok':
print(f"Failed to sync with worker {worker_ip}: {status}")
raise Exception(f"Failed to sync with worker {worker_ip}: {status}")
print(f"Synced with worker {worker_ip}: {status}")


@receiver(post_save, sender=NewsArticle)
def send_notification_for_news_article(sender, instance, created, **kwargs):
"""
@@ -91,8 +191,11 @@ def send_notification_for_news_article(sender, instance, created, **kwargs):
message = Message(notification=notification, topic=topic)

# Send a message to the devices subscribed to the provided topic.
response = send(message)
# Response is a message ID string.
print('Successfully sent FCM message:', response)
if settings.FCM_PUSH_NOTIFICATION_ENVIRONMENT == 'dev':
print('[DEV] Sending message to topic:', topic)
else:
response = send(message)
# Response is a message ID string.
print('Successfully sent FCM message:', response)

delete_app(app)
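
```sync_workers``` discovers workers purely through DNS: it resolves ```WORKER_HOST``` and expects the lookup to return one address per worker replica, as round-robin DNS for a scaled container service would. A minimal sketch of that resolution step, using a hypothetical service name and port:

```python
import socket

# Hypothetical values: "worker" and 8001 stand in for WORKER_HOST and
# SYNC_PORT; in a real deployment they come from the environment.
infos = socket.getaddrinfo("worker", 8001, proto=socket.IPPROTO_TCP)
worker_ips = sorted({info[4][0] for info in infos})

for worker_ip in worker_ips:
    # sync_workers() POSTs the payload from get_sync_content() to each worker.
    print(f"would sync to http://{worker_ip}:8001/sync/sync")
```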
Empty file added backend/sync/__init__.py
6 changes: 6 additions & 0 deletions backend/sync/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class SyncAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'sync'