Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/backend-Tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,24 @@ jobs:
--health-timeout 5s
--health-retries 5

redisdb:
image: redis:8.6.1-trixie
ports:
- 6379:6379
env:
- REDIS_PASSWORD=cmspass
- REDISCLI_AUTH=${REDIS_PASSWORD}
- REDIS_ARGS=--requirepass ${REDIS_PASSWORD}
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5

env:
DATABASE_URL: postgresql+psycopg://cms:cmspass@localhost:5432/cmstest
ALEMBIC_UPGRADE_HEAD_ON_START: false
REDIS_DATABASE_URL: redis://default:cmspass@redisdb:6379/0

steps:
- name: Retrieve source code
Expand Down Expand Up @@ -78,6 +93,7 @@ jobs:
--network host \
-e DATABASE_URL=$DATABASE_URL \
-e JWT_SECRET=DH8kSxcflUVfNRdkEiJJCn2dOOKI3qfw \
-e REDIS_DATABASE_URL=$REDIS_DATABASE_URL \
-p 8000:80 \
cms-backend-api:test
# wait for container to be ready
Expand All @@ -103,13 +119,15 @@ jobs:
-e DATABASE_URL=$DATABASE_URL \
-e QUARANTINE_WAREHOUSE_ID=11111111-1111-1111-1111-111111111111 \
-e STAGING_WAREHOUSE_ID=11111111-1111-1111-1111-111111111111 \
-e REDIS_DATABASE_URL=$REDIS_DATABASE_URL \
cms-backend-mill:test cms-mill --help

- name: Run shuttle container
run: |
docker run -d --name cms-shuttle-test \
--network host \
-e DATABASE_URL=$DATABASE_URL \
-e REDIS_DATABASE_URL=$REDIS_DATABASE_URL \
cms-backend-shuttle:test cms-shuttle --help

- name: Run tests
Expand Down
2 changes: 2 additions & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies = [
"regex == 2025.10.23",
"humanfriendly == 10.0",
"Werkzeug == 3.1.5",
"redis[hiredis] == 7.2.1",
]
dynamic = ["version"]

Expand Down Expand Up @@ -76,6 +77,7 @@ Donate = "https://www.kiwix.org/en/support-us/"
[project.scripts]
cms-api = "cms_backend.api.main:app"
cms-mill = "cms_backend.mill.main:main"
cms-mill-stream-processor = "cms_backend.mill.stream_processor:main"
cms-shuttle = "cms_backend.shuttle.main:main"
create-initial-user = "cms_backend.utils.database:create_initial_user"
check-db-schema = "cms_backend.utils.database:check_if_schema_is_up_to_date"
Expand Down
5 changes: 4 additions & 1 deletion backend/src/cms_backend/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
RecordDisabledError,
RecordDoesNotExistError,
)
from cms_backend.redis.publisher import RedisPublisher
from cms_backend.utils.database import (
check_if_schema_is_up_to_date,
create_initial_user,
Expand All @@ -33,12 +34,14 @@


@asynccontextmanager
async def lifespan(_: FastAPI):
async def lifespan(app: FastAPI):
if Context.alembic_upgrade_head_on_start:
upgrade_db_schema()
check_if_schema_is_up_to_date()
create_initial_user()
app.state.redis_publisher = RedisPublisher()
yield
app.state.redis_publisher.close()


def create_app(*, debug: bool = True):
Expand Down
8 changes: 7 additions & 1 deletion backend/src/cms_backend/api/routes/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Annotated, Literal

from fastapi import Depends
from fastapi import Depends, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt import exceptions as jwt_exceptions
from sqlalchemy.orm import Session as OrmSession
Expand All @@ -15,6 +15,7 @@
create_user,
get_user_by_id_or_none,
)
from cms_backend.redis.publisher import RedisPublisher
from cms_backend.roles import RoleEnum

security = HTTPBearer(description="Access Token", auto_error=False)
Expand Down Expand Up @@ -121,3 +122,8 @@ def _check_permission(
return current_user

return _check_permission


def get_redis_publisher(request: Request) -> RedisPublisher:
"""Retrieve the redis event publisher"""
return request.app.state.redis_publisher
7 changes: 6 additions & 1 deletion backend/src/cms_backend/api/routes/titles.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pydantic import model_validator
from sqlalchemy.orm import Session as OrmSession

from cms_backend.api.routes.dependencies import require_permission
from cms_backend.api.routes.dependencies import get_redis_publisher, require_permission
from cms_backend.api.routes.fields import LimitFieldMax200, NotEmptyString, SkipField
from cms_backend.api.routes.models import ListResponse, calculate_pagination_metadata
from cms_backend.db import gen_dbsession
Expand All @@ -15,6 +15,7 @@
from cms_backend.db.title import get_title_by_name as db_get_title_by_name
from cms_backend.db.title import get_titles as db_get_titles
from cms_backend.db.title import update_title as db_update_title
from cms_backend.redis.publisher import RedisPublisher
from cms_backend.schemas import BaseModel
from cms_backend.schemas.orms import (
BaseTitleCollectionSchema,
Expand Down Expand Up @@ -101,10 +102,12 @@ def get_title(
def create_title(
title_data: TitleCreateSchema,
session: OrmSession = Depends(gen_dbsession),
publisher: RedisPublisher = Depends(get_redis_publisher),
) -> TitleLightSchema:
"""Create a new title"""
title = db_create_title(
session,
publisher=publisher,
name=title_data.name,
maturity=title_data.maturity,
collection_titles=title_data.collection_titles,
Expand All @@ -124,10 +127,12 @@ def update_title(
title_id: UUID,
title_data: TitleUpdateSchema,
session: OrmSession = Depends(gen_dbsession),
publisher: RedisPublisher = Depends(get_redis_publisher),
) -> TitleLightSchema:
"""Update a title's maturity and/or collection_titles"""
title = db_update_title(
session,
publisher=publisher,
title_id=title_id,
name=title_data.name,
maturity=title_data.maturity,
Expand Down
6 changes: 5 additions & 1 deletion backend/src/cms_backend/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ class Context:

debug: bool = parse_bool(os.getenv("DEBUG", "False"))

# URL to connect to the database
# URL to connect to the PostgreSQL database
database_url: str = get_mandatory_env("DATABASE_URL")
# URL to connect to connect to Redis database. Should be mandatory but not needed
# by shuttle
redis_database_url = os.getenv("REDIS_DATABASE_URL", default="")
mill_events_key = os.getenv("MILL_EVENTS_KEY", default="mill:events")

# should we run alembic migrations on startup
alembic_upgrade_head_on_start: bool = parse_bool(
Expand Down
4 changes: 3 additions & 1 deletion backend/src/cms_backend/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ class Book(Base):
events: Mapped[list[str]] = mapped_column(init=False, default_factory=list)

title_id: Mapped[UUID | None] = mapped_column(ForeignKey("title.id"), init=False)
title: Mapped[Optional["Title"]] = relationship(init=False, foreign_keys=[title_id])
title: Mapped[Optional["Title"]] = relationship(
init=False, foreign_keys=[title_id], back_populates="books"
)

zimfarm_notification: Mapped[Optional["ZimfarmNotification"]] = relationship(
back_populates="book"
Expand Down
10 changes: 10 additions & 0 deletions backend/src/cms_backend/db/title.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cms_backend.db.collection import get_collection_by_name
from cms_backend.db.exceptions import RecordAlreadyExistsError, RecordDoesNotExistError
from cms_backend.db.models import CollectionTitle, Title
from cms_backend.redis.publisher import RedisPublisher
from cms_backend.schemas.orms import (
BaseTitleCollectionSchema,
BookLightSchema,
Expand Down Expand Up @@ -130,6 +131,7 @@ def get_titles(

def create_title(
session: OrmSession,
publisher: RedisPublisher | None = None,
*,
name: str,
maturity: str | None,
Expand Down Expand Up @@ -169,11 +171,15 @@ def create_title(
logger.exception("Unknown exception encountered while creating title")
raise

if publisher:
publisher.publish_title_modified(str(title.id), title.name, action="created")

return title


def update_title(
session: OrmSession,
publisher: RedisPublisher | None = None,
*,
title_id: UUID,
maturity: str | None = None,
Expand All @@ -198,10 +204,12 @@ def update_title(
)

# Update name if provided
title_changed = False
if name and name != title.name:
old_name = title.name
title.name = name
title.events.append(f"{getnow()}: name updated from {old_name} to {name}")
title_changed = True

# Determine if collection titles changed
collection_titles_changed = False
Expand Down Expand Up @@ -280,4 +288,6 @@ def update_title(
f"{getnow()}: locations updated due to title collection change"
)

if title_changed and publisher:
publisher.publish_title_modified(str(title.id), title.name, action="updated")
return get_title_by_id(session, title_id=title.id)
3 changes: 3 additions & 0 deletions backend/src/cms_backend/mill/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ class Context:
old_book_deletion_delay: timedelta = timedelta(
seconds=parse_timespan(os.getenv("OLD_BOOK_DELETION_DELAY", default="1d"))
)
stream_processor_consumer = os.getenv(
"STREAM_PROCESSOR_CONSUMER", default="mill-stream-processor"
)
94 changes: 94 additions & 0 deletions backend/src/cms_backend/mill/process_title_modifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from uuid import UUID

from cms_backend import logger
from cms_backend.context import Context
from cms_backend.db import Session
from cms_backend.db.title import get_title_by_id
from cms_backend.mill.context import Context as MillContext
from cms_backend.mill.processors.title_modification import process_title_modification
from cms_backend.redis.consumer import RedisConsumer


def process_title_modifications_from_stream():
"""Process title modification events from Redis stream."""

consumer = RedisConsumer(
consumer_name=MillContext.stream_processor_consumer,
stream_key=Context.mill_events_key,
group_name="mill-stream-group",
)
logger.info("Starting title modification event processor...")

try:
while True:
events = consumer.read_events(block_ms=2000, count=10)

if not events:
logger.debug("No title modification events to process")
continue

logger.info(f"Processing {len(events)} title modification event(s)")

with Session.begin() as session:
for event in events:
try:
event_type = event.data.get("event_type")
if event_type != "TITLE_MODIFIED":
logger.warning(
f"Unknown event type '{event_type}' in message "
f"{event.message_id}, skipping"
)
consumer.acknowledge_event(event.message_id)
continue

title_id_str = event.data.get("title_id")
title_name = event.data.get("title_name")
action = event.data.get("action")

if not title_id_str or not title_name:
logger.error(
f"Missing title_id or title_name in event "
f"{event.message_id}, skipping"
)
consumer.acknowledge_event(event.message_id)
continue

logger.info(
f"Processing TITLE_MODIFIED event: {action} title "
f"'{title_name}' (ID: {title_id_str})"
)

with session.begin_nested():
try:
title = get_title_by_id(
session, title_id=UUID(title_id_str)
)
process_title_modification(session, title)
consumer.acknowledge_event(event.message_id)

logger.info(
f"Successfully processed event {event.message_id} "
f"for title '{title_name}' (ID: {title_id_str})"
)

except Exception:
logger.exception(
f"Error processing event {event.message_id} for "
f"title {title_id_str}"
)
# Should we acknowledge event?
continue

except Exception:
logger.exception(
f"Unexpected error processing event {event.message_id}"
)

except KeyboardInterrupt:
logger.info("Received interrupt signal, shutting down gracefully...")
except Exception as exc:
logger.exception(f"Fatal error in title modification processor: {exc}")
raise
finally:
consumer.close()
logger.info("Title modification event processor stopped")
35 changes: 35 additions & 0 deletions backend/src/cms_backend/mill/processors/title_modification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from sqlalchemy import select
from sqlalchemy.orm import Session as ORMSession

from cms_backend import logger
from cms_backend.db.models import Book, Title
from cms_backend.mill.processors.book import process_book


def process_title_modification(session: ORMSession, title: Title):
"""Process books without titles when a title is created or modified.

When a title is created or when a title name is changed, we need to also
automatically process books without titles attached, looking for potential
new title/book match.
"""
books_without_title = session.scalars(
select(Book)
.where(
Book.title_id.is_(None), Book.has_error.is_(False), Book.name == title.name
)
.order_by(Book.created_at)
).all()

if not books_without_title:
logger.info(f"No books without title matching title '{title.name}'")
return

logger.info(
f"Found {len(books_without_title)} book(s) matching title '{title.name}'"
)

for book in books_without_title:
process_book(session, book)

logger.info(f"Completed processing books without title for title '{title.name}' ")
Loading