From f6406bf6987d65fc314020a92b50eeb7a1998820 Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Thu, 5 Mar 2026 03:41:03 +0100 Subject: [PATCH 1/6] process events from Redis streams --- backend/pyproject.toml | 2 + backend/src/cms_backend/api/main.py | 5 +- .../cms_backend/api/routes/dependencies.py | 8 +- backend/src/cms_backend/api/routes/titles.py | 7 +- backend/src/cms_backend/context.py | 6 +- backend/src/cms_backend/db/models.py | 4 +- backend/src/cms_backend/db/title.py | 10 + backend/src/cms_backend/mill/context.py | 3 + .../mill/process_title_modifications.py | 94 +++++++ .../mill/processors/title_modification.py | 35 +++ .../src/cms_backend/mill/stream_processor.py | 46 ++++ backend/src/cms_backend/redis/__init__.py | 19 ++ backend/src/cms_backend/redis/consumer.py | 125 +++++++++ backend/src/cms_backend/redis/publisher.py | 60 ++++ backend/tests/api/routes/conftest.py | 9 +- backend/tests/conftest.py | 10 + backend/tests/mill/conftest.py | 26 ++ .../mill/test_process_title_modifications.py | 260 ++++++++++++++++++ dev/docker-compose.yml | 47 ++++ dev/redis.conf | 89 ++++++ 20 files changed, 858 insertions(+), 7 deletions(-) create mode 100644 backend/src/cms_backend/mill/process_title_modifications.py create mode 100644 backend/src/cms_backend/mill/processors/title_modification.py create mode 100644 backend/src/cms_backend/mill/stream_processor.py create mode 100644 backend/src/cms_backend/redis/__init__.py create mode 100644 backend/src/cms_backend/redis/consumer.py create mode 100644 backend/src/cms_backend/redis/publisher.py create mode 100644 backend/tests/mill/conftest.py create mode 100644 backend/tests/mill/test_process_title_modifications.py create mode 100644 dev/redis.conf diff --git a/backend/pyproject.toml b/backend/pyproject.toml index f77594c..599247c 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "regex == 2025.10.23", "humanfriendly == 10.0", "Werkzeug == 3.1.5", + "redis[hiredis] == 
7.2.1", ] dynamic = ["version"] @@ -76,6 +77,7 @@ Donate = "https://www.kiwix.org/en/support-us/" [project.scripts] cms-api = "cms_backend.api.main:app" cms-mill = "cms_backend.mill.main:main" +cms-mill-stream-processor = "cms_backend.mill.stream_processor:main" cms-shuttle = "cms_backend.shuttle.main:main" create-initial-user = "cms_backend.utils.database:create_initial_user" check-db-schema = "cms_backend.utils.database:check_if_schema_is_up_to_date" diff --git a/backend/src/cms_backend/api/main.py b/backend/src/cms_backend/api/main.py index bd5f350..1cf510b 100644 --- a/backend/src/cms_backend/api/main.py +++ b/backend/src/cms_backend/api/main.py @@ -25,6 +25,7 @@ RecordDisabledError, RecordDoesNotExistError, ) +from cms_backend.redis.publisher import RedisPublisher from cms_backend.utils.database import ( check_if_schema_is_up_to_date, create_initial_user, @@ -33,12 +34,14 @@ @asynccontextmanager -async def lifespan(_: FastAPI): +async def lifespan(app: FastAPI): if Context.alembic_upgrade_head_on_start: upgrade_db_schema() check_if_schema_is_up_to_date() create_initial_user() + app.state.redis_publisher = RedisPublisher() yield + app.state.redis_publisher.close() def create_app(*, debug: bool = True): diff --git a/backend/src/cms_backend/api/routes/dependencies.py b/backend/src/cms_backend/api/routes/dependencies.py index 5ed0cec..f90c790 100644 --- a/backend/src/cms_backend/api/routes/dependencies.py +++ b/backend/src/cms_backend/api/routes/dependencies.py @@ -1,6 +1,6 @@ from typing import Annotated, Literal -from fastapi import Depends +from fastapi import Depends, Request from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jwt import exceptions as jwt_exceptions from sqlalchemy.orm import Session as OrmSession @@ -15,6 +15,7 @@ create_user, get_user_by_id_or_none, ) +from cms_backend.redis.publisher import RedisPublisher from cms_backend.roles import RoleEnum security = HTTPBearer(description="Access Token", auto_error=False) @@ 
-121,3 +122,8 @@ def _check_permission( return current_user return _check_permission + + +def get_redis_publisher(request: Request) -> RedisPublisher: + """Retrieve the redis event publisher""" + return request.app.state.redis_publisher diff --git a/backend/src/cms_backend/api/routes/titles.py b/backend/src/cms_backend/api/routes/titles.py index 115c5f1..4c4b3cc 100644 --- a/backend/src/cms_backend/api/routes/titles.py +++ b/backend/src/cms_backend/api/routes/titles.py @@ -5,7 +5,7 @@ from pydantic import model_validator from sqlalchemy.orm import Session as OrmSession -from cms_backend.api.routes.dependencies import require_permission +from cms_backend.api.routes.dependencies import get_redis_publisher, require_permission from cms_backend.api.routes.fields import LimitFieldMax200, NotEmptyString, SkipField from cms_backend.api.routes.models import ListResponse, calculate_pagination_metadata from cms_backend.db import gen_dbsession @@ -15,6 +15,7 @@ from cms_backend.db.title import get_title_by_name as db_get_title_by_name from cms_backend.db.title import get_titles as db_get_titles from cms_backend.db.title import update_title as db_update_title +from cms_backend.redis.publisher import RedisPublisher from cms_backend.schemas import BaseModel from cms_backend.schemas.orms import ( BaseTitleCollectionSchema, @@ -101,10 +102,12 @@ def get_title( def create_title( title_data: TitleCreateSchema, session: OrmSession = Depends(gen_dbsession), + publisher: RedisPublisher = Depends(get_redis_publisher), ) -> TitleLightSchema: """Create a new title""" title = db_create_title( session, + publisher=publisher, name=title_data.name, maturity=title_data.maturity, collection_titles=title_data.collection_titles, @@ -124,10 +127,12 @@ def update_title( title_id: UUID, title_data: TitleUpdateSchema, session: OrmSession = Depends(gen_dbsession), + publisher: RedisPublisher = Depends(get_redis_publisher), ) -> TitleLightSchema: """Update a title's maturity and/or collection_titles""" 
title = db_update_title( session, + publisher=publisher, title_id=title_id, name=title_data.name, maturity=title_data.maturity, diff --git a/backend/src/cms_backend/context.py b/backend/src/cms_backend/context.py index 0135b70..c381be5 100644 --- a/backend/src/cms_backend/context.py +++ b/backend/src/cms_backend/context.py @@ -30,8 +30,12 @@ class Context: debug: bool = parse_bool(os.getenv("DEBUG", "False")) - # URL to connect to the database + # URL to connect to the PostgreSQL database database_url: str = get_mandatory_env("DATABASE_URL") + # URL to connect to Redis database. Should be mandatory but not needed + # by shuttle + redis_database_url = os.getenv("REDIS_DATABASE_URL", default="") + mill_events_key = os.getenv("MILL_EVENTS_KEY", default="mill:events") # should we run alembic migrations on startup alembic_upgrade_head_on_start: bool = parse_bool( diff --git a/backend/src/cms_backend/db/models.py b/backend/src/cms_backend/db/models.py index 0d1b73d..7c04102 100644 --- a/backend/src/cms_backend/db/models.py +++ b/backend/src/cms_backend/db/models.py @@ -139,7 +139,9 @@ class Book(Base): events: Mapped[list[str]] = mapped_column(init=False, default_factory=list) title_id: Mapped[UUID | None] = mapped_column(ForeignKey("title.id"), init=False) - title: Mapped[Optional["Title"]] = relationship(init=False, foreign_keys=[title_id]) + title: Mapped[Optional["Title"]] = relationship( + init=False, foreign_keys=[title_id], back_populates="books" + ) zimfarm_notification: Mapped[Optional["ZimfarmNotification"]] = relationship( back_populates="book" diff --git a/backend/src/cms_backend/db/title.py b/backend/src/cms_backend/db/title.py index 404a694..b517785 100644 --- a/backend/src/cms_backend/db/title.py +++ b/backend/src/cms_backend/db/title.py @@ -15,6 +15,7 @@ from cms_backend.db.collection import get_collection_by_name from cms_backend.db.exceptions import RecordAlreadyExistsError, RecordDoesNotExistError from cms_backend.db.models import
CollectionTitle, Title +from cms_backend.redis.publisher import RedisPublisher from cms_backend.schemas.orms import ( BaseTitleCollectionSchema, BookLightSchema, @@ -130,6 +131,7 @@ def get_titles( def create_title( session: OrmSession, + publisher: RedisPublisher | None = None, *, name: str, maturity: str | None, @@ -169,11 +171,15 @@ def create_title( logger.exception("Unknown exception encountered while creating title") raise + if publisher: + publisher.publish_title_modified(str(title.id), title.name, action="created") + return title def update_title( session: OrmSession, + publisher: RedisPublisher | None = None, *, title_id: UUID, maturity: str | None = None, @@ -198,10 +204,12 @@ def update_title( ) # Update name if provided + title_changed = False if name and name != title.name: old_name = title.name title.name = name title.events.append(f"{getnow()}: name updated from {old_name} to {name}") + title_changed = True # Determine if collection titles changed collection_titles_changed = False @@ -280,4 +288,6 @@ def update_title( f"{getnow()}: locations updated due to title collection change" ) + if title_changed and publisher: + publisher.publish_title_modified(str(title.id), title.name, action="updated") return get_title_by_id(session, title_id=title.id) diff --git a/backend/src/cms_backend/mill/context.py b/backend/src/cms_backend/mill/context.py index ecd7ad0..285b026 100644 --- a/backend/src/cms_backend/mill/context.py +++ b/backend/src/cms_backend/mill/context.py @@ -45,3 +45,6 @@ class Context: old_book_deletion_delay: timedelta = timedelta( seconds=parse_timespan(os.getenv("OLD_BOOK_DELETION_DELAY", default="1d")) ) + stream_processor_consumer = os.getenv( + "STREAM_PROCESSOR_CONSUMER", default="mill-stream-processor" + ) diff --git a/backend/src/cms_backend/mill/process_title_modifications.py b/backend/src/cms_backend/mill/process_title_modifications.py new file mode 100644 index 0000000..7281bb1 --- /dev/null +++ 
b/backend/src/cms_backend/mill/process_title_modifications.py @@ -0,0 +1,94 @@ +from uuid import UUID + +from cms_backend import logger +from cms_backend.context import Context +from cms_backend.db import Session +from cms_backend.db.title import get_title_by_id +from cms_backend.mill.context import Context as MillContext +from cms_backend.mill.processors.title_modification import process_title_modification +from cms_backend.redis.consumer import RedisConsumer + + +def process_title_modifications_from_stream(): + """Process title modification events from Redis stream.""" + + consumer = RedisConsumer( + consumer_name=MillContext.stream_processor_consumer, + stream_key=Context.mill_events_key, + group_name="mill-stream-group", + ) + logger.info("Starting title modification event processor...") + + try: + while True: + events = consumer.read_events(block_ms=2000, count=10) + + if not events: + logger.debug("No title modification events to process") + continue + + logger.info(f"Processing {len(events)} title modification event(s)") + + with Session.begin() as session: + for event in events: + try: + event_type = event.data.get("event_type") + if event_type != "TITLE_MODIFIED": + logger.warning( + f"Unknown event type '{event_type}' in message " + f"{event.message_id}, skipping" + ) + consumer.acknowledge_event(event.message_id) + continue + + title_id_str = event.data.get("title_id") + title_name = event.data.get("title_name") + action = event.data.get("action") + + if not title_id_str or not title_name: + logger.error( + f"Missing title_id or title_name in event " + f"{event.message_id}, skipping" + ) + consumer.acknowledge_event(event.message_id) + continue + + logger.info( + f"Processing TITLE_MODIFIED event: {action} title " + f"'{title_name}' (ID: {title_id_str})" + ) + + with session.begin_nested(): + try: + title = get_title_by_id( + session, title_id=UUID(title_id_str) + ) + process_title_modification(session, title) + 
consumer.acknowledge_event(event.message_id) + + logger.info( + f"Successfully processed event {event.message_id} " + f"for title '{title_name}' (ID: {title_id_str})" + ) + + except Exception: + logger.exception( + f"Error processing event {event.message_id} for " + f"title {title_id_str}" + ) + # Should we acknowledge event? + continue + + except Exception: + logger.exception( + f"Unexpected error processing event {event.message_id}" + ) + + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down gracefully...") + except Exception as exc: + logger.exception(f"Fatal error in title modification processor: {exc}") + raise + finally: + consumer.close() + logger.info("Title modification event processor stopped") diff --git a/backend/src/cms_backend/mill/processors/title_modification.py b/backend/src/cms_backend/mill/processors/title_modification.py new file mode 100644 index 0000000..ffd1dfc --- /dev/null +++ b/backend/src/cms_backend/mill/processors/title_modification.py @@ -0,0 +1,35 @@ +from sqlalchemy import select +from sqlalchemy.orm import Session as ORMSession + +from cms_backend import logger +from cms_backend.db.models import Book, Title +from cms_backend.mill.processors.book import process_book + + +def process_title_modification(session: ORMSession, title: Title): + """Process books without titles when a title is created or modified. + + When a title is created or when a title name is changed, we need to also + automatically process books without titles attached, looking for potential + new title/book match. 
+ """ + books_without_title = session.scalars( + select(Book) + .where( + Book.title_id.is_(None), Book.has_error.is_(False), Book.name == title.name + ) + .order_by(Book.created_at) + ).all() + + if not books_without_title: + logger.info(f"No books without title matching title '{title.name}'") + return + + logger.info( + f"Found {len(books_without_title)} book(s) matching title '{title.name}'" + ) + + for book in books_without_title: + process_book(session, book) + + logger.info(f"Completed processing books without title for title '{title.name}' ") diff --git a/backend/src/cms_backend/mill/stream_processor.py b/backend/src/cms_backend/mill/stream_processor.py new file mode 100644 index 0000000..d8230c1 --- /dev/null +++ b/backend/src/cms_backend/mill/stream_processor.py @@ -0,0 +1,46 @@ +import argparse +import logging +from time import sleep + +from cms_backend import logger +from cms_backend.__about__ import __version__ +from cms_backend.context import Context +from cms_backend.mill.context import Context as MillContext +from cms_backend.mill.process_title_modifications import ( + process_title_modifications_from_stream, +) +from cms_backend.utils.database import upgrade_db_schema + + +def main(): + parser = argparse.ArgumentParser(description="Process Redis stream events") + parser.add_argument( + "--version", + help="Show version and exit.", + action="version", + version="%(prog)s: " + __version__, + ) + parser.add_argument( + "--verbose", "-v", help="Show verbose output.", action="store_true" + ) + + args = parser.parse_args() + if args.verbose: + logger.setLevel(logging.DEBUG) + + logger.info("Title modification processor starting...") + + if Context.alembic_upgrade_head_on_start: + upgrade_db_schema() + + while True: + try: + logger.debug("Starting title modification processing cycle") + process_title_modifications_from_stream() + except KeyboardInterrupt: + logger.info("Received interrupt signal, shutting down...") + break + except Exception: + 
logger.exception("Error in title modification processor, retrying...") + logger.debug(f"Sleeping {MillContext.pause_in_the_loop}s before retry...") + sleep(MillContext.pause_in_the_loop) diff --git a/backend/src/cms_backend/redis/__init__.py b/backend/src/cms_backend/redis/__init__.py new file mode 100644 index 0000000..5daa09f --- /dev/null +++ b/backend/src/cms_backend/redis/__init__.py @@ -0,0 +1,19 @@ +import redis + +from cms_backend import logger +from cms_backend.context import Context + + +def get_redis_connection() -> redis.Redis: + """Get a connection to Redis.""" + try: + client = redis.Redis.from_url( # pyright: ignore[reportUnknownMemberType] + Context.redis_database_url, + decode_responses=True, + ) + client.ping() # pyright: ignore[reportUnknownMemberType] + except redis.RedisError as exc: + logger.error(f"Failed to connect to Redis: {exc}") + raise + + return client diff --git a/backend/src/cms_backend/redis/consumer.py b/backend/src/cms_backend/redis/consumer.py new file mode 100644 index 0000000..6cdc71a --- /dev/null +++ b/backend/src/cms_backend/redis/consumer.py @@ -0,0 +1,125 @@ +from dataclasses import dataclass +from typing import Any + +from cms_backend import logger +from cms_backend.redis import get_redis_connection + + +@dataclass +class Event: + """Model of Redis stream events.""" + + message_id: str + data: dict[str, Any] + stream_name: str + + +class RedisConsumer: + """Consumer for receiving events from API via Redis Streams. + + Uses Redis Streams consumer groups for reliable message processing. + Multiple Mill instances can share work via the same consumer group. 
+ """ + + def __init__(self, *, consumer_name: str, stream_key: str, group_name: str): + """Initialize Redis consumer.""" + self.consumer_name = consumer_name + self.stream_key = stream_key + self.group_name = group_name + + self._redis_client = get_redis_connection() + self._check_backlog: bool = True + self._ensure_consumer_group() + + def _ensure_consumer_group(self): + """Create consumer group if it doesn't exist.""" + try: + self._redis_client.xgroup_create( + name=self.stream_key, + groupname=self.group_name, + id="0", # start from the first message in the queue + mkstream=True, + ) + logger.info(f"Created consumer group '{self.group_name}'") + except Exception as exc: + if "BUSYGROUP" in str(exc): + logger.debug(f"Consumer group '{self.group_name}' already exists") + else: + raise + + def read_events( + self, + *, + block_ms: int = 2000, + count: int = 10, + ) -> list[Event]: + """Read events from Redis Streams. + + First, check for pending messages (messages we read but didn't ACK) + If no pending, read new messages + + This ensures we never lose messages after a crash or restart. + """ + try: + # Check for pending messages (from this consumer) + if self._check_backlog: + stream_id = "0" # '0-0' means my pending messages + else: + stream_id = ">" # '>' means new messages + + result = self._redis_client.xreadgroup( + groupname=self.group_name, + consumername=self.consumer_name, + streams={self.stream_key: stream_id}, + count=count, + block=block_ms, + ) + + if not result: + logger.debug("Timeout! 
No new messages.") + return [] + + self._check_backlog = ( + False + if result + and len( + result[ # pyright: ignore[reportUnknownArgumentType, reportIndexIssue] + 0 + ][ + 1 + ] + ) + == 0 + else True + ) + return self._parse_stream_result(result) + + except Exception: + logger.exception("Error reading from Redis Stream") + return [] + + def _parse_stream_result(self, result: Any) -> list[Event]: + """Parse Redis XREADGROUP result into event dictionaries.""" + events: list[Event] = [] + # Result format: [('stream_name', [('msg_id', {'field': 'value'}), ...])] + for stream_name, messages in result: + for message_id, data in messages: + events.append( + Event(message_id=message_id, data=data, stream_name=stream_name) + ) + + return events + + def acknowledge_event(self, message_id: str): + """Acknowledge that an event was successfully processed.""" + try: + self._redis_client.xack(self.stream_key, self.group_name, message_id) + logger.info(f"Acknowledged message {message_id}") + except Exception: + logger.exception(f"Error acknowledging message {message_id}") + + def close(self): + """Close Redis connection.""" + if self._redis_client: + self._redis_client.close() + logger.info("Redis consumer closed") diff --git a/backend/src/cms_backend/redis/publisher.py b/backend/src/cms_backend/redis/publisher.py new file mode 100644 index 0000000..7e4b416 --- /dev/null +++ b/backend/src/cms_backend/redis/publisher.py @@ -0,0 +1,60 @@ +from cms_backend import logger +from cms_backend.context import Context +from cms_backend.redis import get_redis_connection +from cms_backend.utils.datetime import getnow + + +class RedisPublisher: + """Publisher for sending events to Mill via Redis Streams.""" + + _instance = None + _redis_client = None + + def __new__(cls): + """Singleton pattern to reuse Redis connection across requests.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """Initialize Redis client.""" + if 
self._redis_client is None: + self._redis_client = get_redis_connection() + logger.debug("Acquired redis connection.") + + def publish_title_modified( + self, + title_id: str, + title_name: str, + action: str, + ) -> str | None: + """Publish a title modification event to Redis Stream.""" + if self._redis_client is None: + raise ValueError("Redis client not connected.") + + try: + message_id = self._redis_client.xadd( + Context.mill_events_key, + { + "event_type": "TITLE_MODIFIED", + "title_id": title_id, + "title_name": title_name, + "action": action, + "timestamp": getnow().isoformat(), + }, + maxlen=10000, # Keep last 10k messages (prevent unbounded growth) + ) + logger.info( + f"Published title modification event to Redis: " + f"{action} '{title_name}' ({title_id}) - message_id: {message_id}" + ) + return message_id # pyright: ignore[reportReturnType] + except Exception as exc: + logger.exception(f"Failed to publish to Redis Stream: {exc}. ") + return None + + def close(self): + """Close Redis connection.""" + if self._redis_client: + self._redis_client.close() + logger.debug("Closed redis connection.") diff --git a/backend/tests/api/routes/conftest.py b/backend/tests/api/routes/conftest.py index c9737b6..08e9444 100644 --- a/backend/tests/api/routes/conftest.py +++ b/backend/tests/api/routes/conftest.py @@ -1,20 +1,25 @@ from collections.abc import Generator +from unittest.mock import MagicMock import pytest from fastapi.testclient import TestClient from sqlalchemy.orm import Session as OrmSession from cms_backend.api.main import app +from cms_backend.api.routes.dependencies import get_redis_publisher from cms_backend.db import gen_dbsession, gen_manual_dbsession @pytest.fixture -def client(dbsession: OrmSession) -> TestClient: +def client(dbsession: OrmSession, mock_redis_publisher: MagicMock) -> TestClient: def test_dbsession() -> Generator[OrmSession]: yield dbsession + def test_redis_publisher() -> MagicMock: + return mock_redis_publisher + # Replace the 
database session with the test dbsession app.dependency_overrides[gen_dbsession] = test_dbsession app.dependency_overrides[gen_manual_dbsession] = test_dbsession - + app.dependency_overrides[get_redis_publisher] = test_redis_publisher return TestClient(app=app) diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index f62eaf2..6504723 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -3,6 +3,7 @@ from datetime import datetime from pathlib import Path from typing import Any +from unittest.mock import MagicMock from uuid import UUID, uuid4 import pytest @@ -360,3 +361,12 @@ def access_token(user: User) -> str: issue_time=getnow(), user_id=str(user.id), ) + + +@pytest.fixture +def mock_redis_publisher() -> MagicMock: + """Mock RedisPublisher for testing without Redis connection.""" + mock = MagicMock() + mock.publish_title_modified = MagicMock(return_value="mock-message-id") + mock.close = MagicMock() + return mock diff --git a/backend/tests/mill/conftest.py b/backend/tests/mill/conftest.py new file mode 100644 index 0000000..0a78e6e --- /dev/null +++ b/backend/tests/mill/conftest.py @@ -0,0 +1,26 @@ +from unittest.mock import MagicMock + +import pytest + + +@pytest.fixture +def mock_redis_consumer(): + """Mock RedisConsumer for testing without Redis connection.""" + mock = MagicMock() + mock.read_events = MagicMock(return_value=[]) + mock.acknowledge_event = MagicMock() + mock.close = MagicMock() + return mock + + +@pytest.fixture +def mock_redis_client(): + """Mock Redis client for testing without Redis connection.""" + mock = MagicMock() + mock.ping = MagicMock(return_value=True) + mock.xadd = MagicMock(return_value="1234-0") + mock.xreadgroup = MagicMock(return_value=[]) + mock.xack = MagicMock(return_value=1) + mock.xgroup_create = MagicMock() + mock.close = MagicMock() + return mock diff --git a/backend/tests/mill/test_process_title_modifications.py b/backend/tests/mill/test_process_title_modifications.py new file mode 
100644 index 0000000..ff7b56d --- /dev/null +++ b/backend/tests/mill/test_process_title_modifications.py @@ -0,0 +1,260 @@ +from collections.abc import Callable +from contextlib import contextmanager +from unittest.mock import MagicMock, patch +from uuid import UUID + +from sqlalchemy.orm import Session as OrmSession + +from cms_backend.db.models import Book, Title +from cms_backend.mill.process_title_modifications import ( + process_title_modifications_from_stream, +) +from cms_backend.redis.consumer import Event + + +@patch("cms_backend.mill.process_title_modifications.Session") +@patch("cms_backend.mill.process_title_modifications.RedisConsumer") +def test_process_title_modifications_single_event( + mock_consumer_class: MagicMock, + mock_session_class: MagicMock, + dbsession: OrmSession, + create_title: Callable[..., Title], + create_book: Callable[..., Book], + mock_redis_consumer: MagicMock, +): + """Test processing a single title modification event.""" + mock_consumer_class.return_value = mock_redis_consumer + + @contextmanager + def mock_session_begin(): + yield dbsession + + mock_session_class.begin = mock_session_begin + + # Create a title and a book without title + title = create_title(name="wikipedia_en_all") + book = create_book( + name="wikipedia_en_all", + date="2025-01", + zim_metadata={ + "Name": "wikipedia_en_all", + "Title": "Wikipedia", + "Creator": "Wikipedia Contributors", + "Publisher": "Kiwix", + "Date": "2025-01", + "Description": "Wikipedia Encyclopedia", + "Language": "eng", + }, + ) + book.title_id = None + dbsession.flush() + + event = Event( + message_id="1234-0", + data={ + "event_type": "TITLE_MODIFIED", + "title_id": str(title.id), + "title_name": "wikipedia_en_all", + "action": "created", + "timestamp": "2025-01-15T10:00:00", + }, + stream_name="mill:events", + ) + + # Set up mock to return one event, then raise KeyboardInterrupt + mock_redis_consumer.read_events.side_effect = [[event], KeyboardInterrupt()] + 
process_title_modifications_from_stream() + mock_redis_consumer.acknowledge_event.assert_called_once_with("1234-0") + mock_redis_consumer.close.assert_called_once() + + dbsession.refresh(book) + assert book.title_id == title.id + + +@patch("cms_backend.mill.process_title_modifications.Session") +@patch("cms_backend.mill.process_title_modifications.RedisConsumer") +def test_process_title_modifications_multiple_events( + mock_consumer_class: MagicMock, + mock_session_class: MagicMock, + dbsession: OrmSession, + create_title: Callable[..., Title], + create_book: Callable[..., Book], + mock_redis_consumer: MagicMock, +): + """Test processing multiple title modification events in one batch.""" + mock_consumer_class.return_value = mock_redis_consumer + + @contextmanager + def mock_session_begin(): + yield dbsession + + mock_session_class.begin = mock_session_begin + + title1 = create_title(name="wikipedia_en_all") + title2 = create_title(name="wikivoyage_fr_all") + + book1 = create_book( + name="wikipedia_en_all", + date="2025-01", + zim_metadata={ + "Name": "wikipedia_en_all", + "Title": "Wikipedia", + "Creator": "Wikipedia Contributors", + "Publisher": "Kiwix", + "Date": "2025-01", + "Description": "Wikipedia Encyclopedia", + "Language": "eng", + }, + ) + book1.title_id = None + + book2 = create_book( + name="wikivoyage_fr_all", + date="2025-01", + zim_metadata={ + "Name": "wikivoyage_fr_all", + "Title": "Wikivoyage", + "Creator": "Wikivoyage Contributors", + "Publisher": "Kiwix", + "Date": "2025-01", + "Description": "Wikivoyage Travel Guide", + "Language": "fra", + }, + ) + book2.title_id = None + + dbsession.flush() + + event1 = Event( + message_id="1234-0", + data={ + "event_type": "TITLE_MODIFIED", + "title_id": str(title1.id), + "title_name": "wikipedia_en_all", + "action": "created", + "timestamp": "2025-01-15T10:00:00", + }, + stream_name="mill:events", + ) + + event2 = Event( + message_id="1234-1", + data={ + "event_type": "TITLE_MODIFIED", + "title_id": 
str(title2.id), + "title_name": "wikivoyage_fr_all", + "action": "created", + "timestamp": "2025-01-15T10:01:00", + }, + stream_name="mill:events", + ) + + mock_redis_consumer.read_events.side_effect = [ + [event1, event2], + KeyboardInterrupt(), + ] + + process_title_modifications_from_stream() + + assert mock_redis_consumer.acknowledge_event.call_count == 2 + mock_redis_consumer.acknowledge_event.assert_any_call("1234-0") + mock_redis_consumer.acknowledge_event.assert_any_call("1234-1") + + dbsession.refresh(book1) + dbsession.refresh(book2) + assert book1.title_id == title1.id + assert book2.title_id == title2.id + + +@patch("cms_backend.mill.process_title_modifications.Session") +@patch("cms_backend.mill.process_title_modifications.RedisConsumer") +def test_process_title_modifications_unknown_event_type( + mock_consumer_class: MagicMock, + mock_session_class: MagicMock, + dbsession: OrmSession, + mock_redis_consumer: MagicMock, +): + """Test that unknown event types are skipped and acknowledged.""" + mock_consumer_class.return_value = mock_redis_consumer + + @contextmanager + def mock_session_begin(): + yield dbsession + + mock_session_class.begin = mock_session_begin + + event = Event( + message_id="1234-0", + data={ + "event_type": "UNKNOWN_EVENT", + "some_data": "value", + }, + stream_name="mill:events", + ) + mock_redis_consumer.read_events.side_effect = [[event], KeyboardInterrupt()] + process_title_modifications_from_stream() + mock_redis_consumer.acknowledge_event.assert_called_once_with("1234-0") + + +@patch("cms_backend.mill.process_title_modifications.Session") +@patch("cms_backend.mill.process_title_modifications.RedisConsumer") +def test_process_title_modifications_missing_title_id( + mock_consumer_class: MagicMock, + mock_session_class: MagicMock, + dbsession: OrmSession, + mock_redis_consumer: MagicMock, +): + """Test that events missing title_id are skipped and acknowledged.""" + mock_consumer_class.return_value = mock_redis_consumer + + 
@contextmanager + def mock_session_begin(): + yield dbsession + + mock_session_class.begin = mock_session_begin + + event = Event( + message_id="1234-0", + data={ + "event_type": "TITLE_MODIFIED", + "title_name": "wikipedia_en_all", + "action": "created", + }, + stream_name="mill:events", + ) + + mock_redis_consumer.read_events.side_effect = [[event], KeyboardInterrupt()] + process_title_modifications_from_stream() + mock_redis_consumer.acknowledge_event.assert_called_once_with("1234-0") + + +@patch("cms_backend.mill.process_title_modifications.Session") +@patch("cms_backend.mill.process_title_modifications.RedisConsumer") +def test_process_title_modifications_missing_title_name( + mock_consumer_class: MagicMock, + mock_session_class: MagicMock, + dbsession: OrmSession, + mock_redis_consumer: MagicMock, +): + """Test that events missing title_name are skipped and acknowledged.""" + mock_consumer_class.return_value = mock_redis_consumer + + @contextmanager + def mock_session_begin(): + yield dbsession + + mock_session_class.begin = mock_session_begin + + event = Event( + message_id="1234-0", + data={ + "event_type": "TITLE_MODIFIED", + "title_id": str(UUID("12345678-1234-5678-1234-567812345678")), + "action": "created", + }, + stream_name="mill:events", + ) + + mock_redis_consumer.read_events.side_effect = [[event], KeyboardInterrupt()] + process_title_modifications_from_stream() + mock_redis_consumer.acknowledge_event.assert_called_once_with("1234-0") diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 23c08a9..f927f28 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -17,6 +17,25 @@ services: interval: 10s timeout: 5s retries: 3 + + redisdb: + image: redis:8.6.1-trixie + container_name: cms_redisdb + environment: + - REDIS_PASSWORD=cmspass + - REDISCLI_AUTH=${REDIS_PASSWORD} + - REDIS_ARGS=--requirepass ${REDIS_PASSWORD} + volumes: + - redis_data_cms:/data + - ./redis.conf:/usr/local/etc/redis/redis.conf + healthcheck: + test: 
["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 3 + ports: + - 9736:6379 + api: build: context: ../backend @@ -36,6 +55,7 @@ services: DEBUG: 1 WEB_CONCURRENCY: 1 DATABASE_URL: postgresql+psycopg://cms:cmspass@postgresdb:5432/cms + REDIS_DATABASE_URL: redis://default:cmspass@redisdb:6379/0 ALLOWED_ORIGINS: http://localhost:37600,http://localhost:37603 OAUTH_JWKS_URI: https://login-staging.kiwix.org/.well-known/jwks.json OAUTH_ISSUER: https://login-staging.kiwix.org @@ -63,6 +83,8 @@ services: depends_on: postgresdb: condition: service_healthy + redisdb: + condition: service_healthy mill: build: context: ../backend @@ -75,6 +97,7 @@ services: environment: DEBUG: 1 DATABASE_URL: postgresql+psycopg://cms:cmspass@postgresdb:5432/cms + REDIS_DATABASE_URL: redis://default:cmspass@redisdb:6379/0 QUARANTINE_WAREHOUSE_ID: 11111111-1111-1111-1111-111111111111 QUARANTINE_BASE_PATH: quarantine STAGING_WAREHOUSE_ID: 11111111-1111-1111-1111-111111111111 @@ -82,6 +105,29 @@ services: depends_on: postgresdb: condition: service_healthy + + mill-streams-processor: + build: + context: ../backend + dockerfile: Dockerfile-mill + container_name: cms_mill_streams_processor + volumes: + - ../backend/maint-scripts:/app/maint-scripts + - ../backend/src/cms_backend:/usr/local/lib/python3.13/site-packages/cms_backend + - ./scripts:/scripts + environment: + DEBUG: 1 + DATABASE_URL: postgresql+psycopg://cms:cmspass@postgresdb:5432/cms + REDIS_DATABASE_URL: redis://default:cmspass@redisdb:6379/0 + QUARANTINE_WAREHOUSE_ID: 11111111-1111-1111-1111-111111111111 + QUARANTINE_BASE_PATH: quarantine + STAGING_WAREHOUSE_ID: 11111111-1111-1111-1111-111111111111 + STAGING_BASE_PATH: staging + depends_on: + postgresdb: + condition: service_healthy + command: cms-mill-stream-processor + shuttle: build: context: ../backend @@ -173,3 +219,4 @@ services: volumes: pg_data_cms: + redis_data_cms: diff --git a/dev/redis.conf b/dev/redis.conf new file mode 100644 index 0000000..827f413 --- 
/dev/null +++ b/dev/redis.conf @@ -0,0 +1,89 @@ +################################ SNAPSHOTTING ################################ + +# Save the DB to disk. +# +# save <seconds> <changes> +# +# Redis will save the DB if both the given number of seconds and the given +# number of write operations against the DB occurred. +# +# Unless specified otherwise, by default Redis will save the DB: +# * After 3600 seconds (an hour) if at least 1 key changed +# * After 300 seconds (5 minutes) if at least 100 keys changed +# * After 60 seconds if at least 1000 keys changed +# +# You can set these explicitly by uncommenting the three following lines. +# +save 3600 1 +save 300 100 +save 60 1000 + +# By default Redis will stop accepting writes if RDB snapshots are enabled +# (at least one save point) and the latest background save failed. +# This will make the user aware (in a hard way) that data is not persisting +# on disk properly, otherwise chances are that no one will notice and some +# disaster will happen. +# +# If the background saving process will start working again Redis will +# automatically allow writes again. +# +# However if you have setup your proper monitoring of the Redis server +# and persistence, you may want to disable this feature so that Redis will +# continue to work as usual even if there are problems with disk, +# permissions, and so forth. +stop-writes-on-bgsave-error yes + +# Compress string objects using LZF when dump .rdb databases? +# By default compression is enabled as it's almost always a win. +# If you want to save some CPU in the saving child set it to 'no' but +# the dataset will likely be bigger if you have compressible values or keys. +rdbcompression yes + +# Since version 5 of RDB a CRC64 checksum is placed at the end of the file. +# This makes the format more resistant to corruption but there is a performance +# hit to pay (around 10%) when saving and loading RDB files, so you can disable it +# for maximum performances.
+# +# RDB files created with checksum disabled have a checksum of zero that will +# tell the loading code to skip the check. +rdbchecksum yes + +# +# AOF and RDB persistence can be enabled at the same time without problems. +# If the AOF is enabled on startup Redis will load the AOF, that is the file +# with the better durability guarantees. +# +# Please check https://redis.io/topics/persistence for more information. + +appendonly yes + +# The name of the append only file (default: "appendonly.aof") + +appendfilename "appendonly.aof" + +# The fsync() call tells the Operating System to actually write data on disk +# instead of waiting for more data in the output buffer. Some OS will really flush +# data on disk, some other OS will just try to do it ASAP. +# +# Redis supports three different modes: +# +# no: don't fsync, just let the OS flush the data when it wants. Faster. +# always: fsync after every write to the append only log. Slow, Safest. +# everysec: fsync only one time every second. Compromise. +# +# The default is "everysec", as that's usually the right compromise between +# speed and data safety. It's up to you to understand if you can relax this to +# "no" that will let the operating system flush the output buffer when +# it wants, for better performances (but if you can live with the idea of +# some data loss consider the default persistence mode that's snapshotting), +# or on the contrary, use "always" that's very slow but a bit safer than +# everysec. +# +# More details please check the following article: +# http://antirez.com/post/redis-persistence-demystified.html +# +# If unsure, use "everysec". 
+ +# appendfsync always +appendfsync everysec +# appendfsync no From f1d1c5d32c0e4fffa5aabd2231c637a185c7d581 Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Thu, 5 Mar 2026 03:49:41 +0100 Subject: [PATCH 2/6] add REDIS_DATABASE env to CI --- backend/src/cms_backend/redis/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/cms_backend/redis/consumer.py b/backend/src/cms_backend/redis/consumer.py index 6cdc71a..8e838a8 100644 --- a/backend/src/cms_backend/redis/consumer.py +++ b/backend/src/cms_backend/redis/consumer.py @@ -63,7 +63,7 @@ def read_events( try: # Check for pending messages (from this consumer) if self._check_backlog: - stream_id = "0" # '0-0' means my pending messages + stream_id = "0" # '0' means my pending messages else: stream_id = ">" # '>' means new messages From b13c86f44a3e9f74e2f61fa5fd26068ae0e6bc8c Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Thu, 5 Mar 2026 03:54:27 +0100 Subject: [PATCH 3/6] add env var to ci containers --- .github/workflows/backend-Tests.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/backend-Tests.yml b/.github/workflows/backend-Tests.yml index 64b1e25..085beb8 100644 --- a/.github/workflows/backend-Tests.yml +++ b/.github/workflows/backend-Tests.yml @@ -34,6 +34,7 @@ jobs: env: DATABASE_URL: postgresql+psycopg://cms:cmspass@localhost:5432/cmstest ALEMBIC_UPGRADE_HEAD_ON_START: false + REDIS_DATABASE_URL: redis://default:cmspass@redisdb:6379/0 steps: - name: Retrieve source code @@ -78,6 +79,7 @@ jobs: --network host \ -e DATABASE_URL=$DATABASE_URL \ -e JWT_SECRET=DH8kSxcflUVfNRdkEiJJCn2dOOKI3qfw \ + -e REDIS_DATABASE_URL=REDIS_DATABASE_URL \ -p 8000:80 \ cms-backend-api:test # wait for container to be ready @@ -103,6 +105,7 @@ jobs: -e DATABASE_URL=$DATABASE_URL \ -e QUARANTINE_WAREHOUSE_ID=11111111-1111-1111-1111-111111111111 \ -e STAGING_WAREHOUSE_ID=11111111-1111-1111-1111-111111111111 \ + -e REDIS_DATABASE_URL=REDIS_DATABASE_URL \ 
cms-backend-mill:test cms-mill --help - name: Run shuttle container @@ -110,6 +113,7 @@ jobs: docker run -d --name cms-shuttle-test \ --network host \ -e DATABASE_URL=$DATABASE_URL \ + -e REDIS_DATABASE_URL=REDIS_DATABASE_URL \ cms-backend-shuttle:test cms-shuttle --help - name: Run tests From 8dcbc4cbe69e14eaaefa6e710bddb6d60daf1eab Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Thu, 5 Mar 2026 04:00:00 +0100 Subject: [PATCH 4/6] fix bug in env var assignment --- .github/workflows/backend-Tests.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/backend-Tests.yml b/.github/workflows/backend-Tests.yml index 085beb8..e56324d 100644 --- a/.github/workflows/backend-Tests.yml +++ b/.github/workflows/backend-Tests.yml @@ -79,7 +79,7 @@ jobs: --network host \ -e DATABASE_URL=$DATABASE_URL \ -e JWT_SECRET=DH8kSxcflUVfNRdkEiJJCn2dOOKI3qfw \ - -e REDIS_DATABASE_URL=REDIS_DATABASE_URL \ + -e REDIS_DATABASE_URL=$REDIS_DATABASE_URL \ -p 8000:80 \ cms-backend-api:test # wait for container to be ready @@ -105,7 +105,7 @@ jobs: -e DATABASE_URL=$DATABASE_URL \ -e QUARANTINE_WAREHOUSE_ID=11111111-1111-1111-1111-111111111111 \ -e STAGING_WAREHOUSE_ID=11111111-1111-1111-1111-111111111111 \ - -e REDIS_DATABASE_URL=REDIS_DATABASE_URL \ + -e REDIS_DATABASE_URL=$REDIS_DATABASE_URL \ cms-backend-mill:test cms-mill --help - name: Run shuttle container @@ -110,7 +113,7 @@ jobs: docker run -d --name cms-shuttle-test \ --network host \ -e DATABASE_URL=$DATABASE_URL \ - -e REDIS_DATABASE_URL=REDIS_DATABASE_URL \ + -e REDIS_DATABASE_URL=$REDIS_DATABASE_URL \ cms-backend-shuttle:test cms-shuttle --help - name: Run tests From a4d7ed86cd23c801e67d257905d4df68ee9d8da1 Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Thu, 5 Mar 2026 04:12:56 +0100 Subject: [PATCH 5/6] add redis container to ci --- .github/workflows/backend-Tests.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/backend-Tests.yml
b/.github/workflows/backend-Tests.yml index e56324d..e8fc2fa 100644 --- a/.github/workflows/backend-Tests.yml +++ b/.github/workflows/backend-Tests.yml @@ -31,6 +31,20 @@ jobs: --health-timeout 5s --health-retries 5 + redisdb: + image: redis:8.6.1-trixie + ports: + - 6379:6379 + env: + REDIS_PASSWORD: cmspass + REDISCLI_AUTH: cmspass + REDIS_ARGS: --requirepass cmspass + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + env: DATABASE_URL: postgresql+psycopg://cms:cmspass@localhost:5432/cmstest ALEMBIC_UPGRADE_HEAD_ON_START: false From fd3717f00d44b91b05f484b8a3b262a0cdb04674 Mon Sep 17 00:00:00 2001 From: Uchechukwu Orji Date: Thu, 5 Mar 2026 11:03:09 +0100 Subject: [PATCH 6/6] use 0-0 instead of 0 to read unacknowledged messages --- backend/src/cms_backend/redis/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/cms_backend/redis/consumer.py b/backend/src/cms_backend/redis/consumer.py index 8e838a8..e486663 100644 --- a/backend/src/cms_backend/redis/consumer.py +++ b/backend/src/cms_backend/redis/consumer.py @@ -63,7 +63,7 @@ def read_events( try: # Check for pending messages (from this consumer) if self._check_backlog: - stream_id = "0" # '0' means my pending messages + stream_id = "0-0" # '0-0' means my pending messages else: stream_id = ">" # '>' means new messages