From 1b53871c110595c3f2cdedff07b4ec549df8ea8a Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Mon, 16 Feb 2026 21:38:10 +0100 Subject: [PATCH 1/6] refactor: extract order business logic from API routes into service layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The orders API route handlers contained all business logic inline — direct ORM operations, query building, and order merging logic — bypassing the service layer. This moves all domain logic into order_service.py, making routes thin HTTP wrappers that only handle request parsing and error translation. Co-Authored-By: Claude Opus 4.6 --- backend/app/api/orders.py | 215 +++++---------- backend/app/services/orders/order_service.py | 270 +++++++++++++++++++ 2 files changed, 340 insertions(+), 145 deletions(-) diff --git a/backend/app/api/orders.py b/backend/app/api/orders.py index 759971f..196eeaa 100644 --- a/backend/app/api/orders.py +++ b/backend/app/api/orders.py @@ -1,28 +1,33 @@ from fastapi import APIRouter, Depends, HTTPException, Query -from sqlalchemy import asc, desc, func, nullslast, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload from typing import Optional from app.database import get_db from app.models.user import User -from app.models.order import Order -from app.models.order_state import OrderState -from app.schemas.order import OrderResponse, OrderDetailResponse, UpdateOrderRequest, LinkOrderRequest, CreateOrderRequest, OrderListResponse, OrderCountsResponse +from app.schemas.order import ( + OrderResponse, + OrderDetailResponse, + UpdateOrderRequest, + LinkOrderRequest, + CreateOrderRequest, + OrderListResponse, + OrderCountsResponse, +) from app.api.deps import get_current_user +from app.services.orders.order_service import ( + create_order as svc_create_order, + list_orders as svc_list_orders, + get_order_counts as svc_get_order_counts, + get_order_detail as svc_get_order_detail, + update_order as svc_update_order, + link_orders as svc_link_orders, + delete_order as svc_delete_order, + OrderNotFoundError, + InvalidSortError, +) router = APIRouter(prefix="/api/v1/orders", tags=["orders"]) -SORTABLE_COLUMNS = { - "order_number": Order.order_number, - "vendor_name": Order.vendor_name, - "carrier": Order.carrier, - "status": Order.status, - "order_date": Order.order_date, - "total_amount": Order.total_amount, - "updated_at": Order.updated_at, -} - @router.post("", response_model=OrderResponse, status_code=201) async def create_order( @@ -30,32 +35,7 @@ async def create_order( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): - order = Order( - user_id=user.id, - vendor_name=req.vendor_name, - order_number=req.order_number, - tracking_number=req.tracking_number, - carrier=req.carrier, - vendor_domain=req.vendor_domain, - status=req.status, - order_date=req.order_date, - total_amount=req.total_amount, - currency=req.currency, - estimated_delivery=req.estimated_delivery, - items=[item.model_dump() for item in req.items] if req.items else None, - ) - db.add(order) - await db.flush() - - state = OrderState( - order_id=order.id, - status=req.status, - source_type="manual", - ) - db.add(state) - await db.commit() - await db.refresh(order) - return order + return await svc_create_order(db, user.id, req) @router.get("", response_model=OrderListResponse) @@ -69,39 +49,22 @@ async def list_orders( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): - query = select(Order).where(Order.user_id == user.id) - - if status: - statuses = [s.strip() for s in status.split(",")] - query = query.where(Order.status.in_(statuses)) - if search: - search_filter = f"%{search}%" - query = query.where( - (Order.order_number.ilike(search_filter)) - | (Order.vendor_name.ilike(search_filter)) - | (Order.tracking_number.ilike(search_filter)) - | (Order.carrier.ilike(search_filter)) - | (Order.vendor_domain.ilike(search_filter)) + try: + result = await svc_list_orders( + db, user.id, + page=page, + per_page=per_page, + status=status, + search=search, + sort_by=sort_by, + sort_dir=sort_dir, ) - - count_query = select(func.count()).select_from(query.subquery()) - total = (await db.execute(count_query)).scalar() or 0 - - if sort_by not in SORTABLE_COLUMNS: - raise HTTPException(status_code=422, detail=f"Invalid sort_by. Must be one of: {', '.join(sorted(SORTABLE_COLUMNS))}") - if sort_dir not in ("asc", "desc"): - raise HTTPException(status_code=422, detail="Invalid sort_dir. Must be 'asc' or 'desc'") - - column = SORTABLE_COLUMNS[sort_by] - direction = asc if sort_dir == "asc" else desc - query = query.order_by(nullslast(direction(column))) - query = query.offset((page - 1) * per_page).limit(per_page) - result = await db.execute(query) - items = result.scalars().all() + except InvalidSortError as e: + raise HTTPException(status_code=422, detail=str(e)) return OrderListResponse( - items=[OrderResponse.model_validate(i) for i in items], - total=total, + items=[OrderResponse.model_validate(i) for i in result.items], + total=result.total, page=page, per_page=per_page, ) @@ -113,94 +76,56 @@ async def order_counts( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): - query = select(Order.status, func.count()).where(Order.user_id == user.id) - - if search: - search_filter = f"%{search}%" - query = query.where( - (Order.order_number.ilike(search_filter)) - | (Order.vendor_name.ilike(search_filter)) - | (Order.tracking_number.ilike(search_filter)) - | (Order.carrier.ilike(search_filter)) - | (Order.vendor_domain.ilike(search_filter)) - ) - - query = query.group_by(Order.status) - result = await db.execute(query) - counts = dict(result.all()) - - total = sum(counts.values()) - return OrderCountsResponse( - total=total, - ordered=counts.get("ordered", 0), - shipment_preparing=counts.get("shipment_preparing", 0), - shipped=counts.get("shipped", 0), - in_transit=counts.get("in_transit", 0), - out_for_delivery=counts.get("out_for_delivery", 0), - delivered=counts.get("delivered", 0), - ) + counts = await svc_get_order_counts(db, user.id, search) + return OrderCountsResponse(**counts) @router.get("/{order_id}", response_model=OrderDetailResponse) -async def get_order(order_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): - result = await db.execute( - select(Order).where(Order.id == order_id, Order.user_id == user.id).options(selectinload(Order.states)) - ) - order = result.scalar_one_or_none() - if not order: +async def get_order( + order_id: int, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + try: + return await svc_get_order_detail(db, user.id, order_id) + except OrderNotFoundError: raise HTTPException(status_code=404, detail="Order not found") - return order @router.patch("/{order_id}", response_model=OrderResponse) -async def update_order(order_id: int, req: UpdateOrderRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): - order = await db.get(Order, order_id) - if not order or order.user_id != user.id: +async def update_order( + order_id: int, + req: UpdateOrderRequest, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + try: + return await svc_update_order(db, user.id, order_id, req) + except OrderNotFoundError: raise HTTPException(status_code=404, detail="Order not found") - old_status = order.status - for field, value in req.model_dump(exclude_unset=True).items(): - setattr(order, field, value) - - # Create OrderState if status changed - if req.status and req.status != old_status: - state = OrderState( - order_id=order.id, - status=req.status, - source_type="manual", - ) - db.add(state) - - await db.commit() - await db.refresh(order) - return order @router.post("/{order_id}/link") -async def link_orders(order_id: int, req: LinkOrderRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): - source = await db.get(Order, order_id) - target = await db.get(Order, req.target_order_id) - if not source or source.user_id != user.id or not target or target.user_id != user.id: +async def link_orders( + order_id: int, + req: LinkOrderRequest, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + try: + source = await svc_link_orders(db, user.id, order_id, req.target_order_id) + except OrderNotFoundError: raise HTTPException(status_code=404, detail="Order not found") - if target.tracking_number and not source.tracking_number: - source.tracking_number = target.tracking_number - if target.carrier and not source.carrier: - source.carrier = target.carrier - if target.status and target.status != "ordered": - source.status = target.status - # Move states from target to source - result = await db.execute(select(OrderState).where(OrderState.order_id == target.id)) - for state in result.scalars().all(): - state.order_id = source.id - await db.delete(target) - await db.commit() - await db.refresh(source) return {"merged_into": source.id} @router.delete("/{order_id}", status_code=204) -async def delete_order(order_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)): - order = await db.get(Order, order_id) - if not order or order.user_id != user.id: +async def delete_order( + order_id: int, + user: User = Depends(get_current_user), + db: AsyncSession = Depends(get_db), +): + try: + await svc_delete_order(db, user.id, order_id) + except OrderNotFoundError: raise HTTPException(status_code=404, detail="Order not found") - await db.delete(order) - await db.commit() diff --git a/backend/app/services/orders/order_service.py b/backend/app/services/orders/order_service.py index 2fcd132..8a1cbf7 100644 --- a/backend/app/services/orders/order_service.py +++ b/backend/app/services/orders/order_service.py @@ -1,12 +1,21 @@ +from dataclasses import dataclass from datetime import date +from typing import Optional +from sqlalchemy import asc, desc, func, nullslast, select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload from app.models.order import Order from app.models.order_state import OrderState from app.modules.analysers.llm.service import EmailAnalysis +from app.schemas.order import CreateOrderRequest, UpdateOrderRequest +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + def _parse_date(value: str | None) -> date | None: if not value: return None @@ -16,6 +25,267 @@ def _parse_date(value: str | None) -> date | None: return None +_SORTABLE_COLUMNS = { + "order_number": Order.order_number, + "vendor_name": Order.vendor_name, + "carrier": Order.carrier, + "status": Order.status, + "order_date": Order.order_date, + "total_amount": Order.total_amount, + "updated_at": Order.updated_at, +} + + +def _apply_search_filter(query, search: str | None): + """Apply multi-field ILIKE search filter to an order query.""" + if not search: + return query + pattern = f"%{search}%" + return query.where( + (Order.order_number.ilike(pattern)) + | (Order.vendor_name.ilike(pattern)) + | (Order.tracking_number.ilike(pattern)) + | (Order.carrier.ilike(pattern)) + | (Order.vendor_domain.ilike(pattern)) + ) + + +class OrderNotFoundError(Exception): + """Raised when an order does not exist or the user is not authorised.""" + + +class InvalidSortError(ValueError): + """Raised when a caller supplies an unrecognised sort column/direction.""" + + +# --------------------------------------------------------------------------- +# Results +# --------------------------------------------------------------------------- + +@dataclass +class OrderListResult: + items: list[Order] + total: int + + +# --------------------------------------------------------------------------- +# Service functions – manual / API operations +# --------------------------------------------------------------------------- + +async def create_order( + db: AsyncSession, + user_id: int, + data: CreateOrderRequest, +) -> Order: + """Create an order from manual user input (API / UI).""" + order = Order( + user_id=user_id, + vendor_name=data.vendor_name, + order_number=data.order_number, + tracking_number=data.tracking_number, + carrier=data.carrier, + vendor_domain=data.vendor_domain, + status=data.status, + order_date=data.order_date, + total_amount=data.total_amount, + currency=data.currency, + estimated_delivery=data.estimated_delivery, + items=[item.model_dump() for item in data.items] if data.items else None, + ) + db.add(order) + await db.flush() + + state = OrderState( + order_id=order.id, + status=data.status, + source_type="manual", + ) + db.add(state) + await db.commit() + await db.refresh(order) + return order + + +async def list_orders( + db: AsyncSession, + user_id: int, + *, + page: int = 1, + per_page: int = 25, + status: Optional[str] = None, + search: Optional[str] = None, + sort_by: str = "order_date", + sort_dir: str = "desc", +) -> OrderListResult: + """List a user's orders with filtering, sorting, and pagination.""" + if sort_by not in _SORTABLE_COLUMNS: + raise InvalidSortError( + f"Invalid sort_by. Must be one of: {', '.join(sorted(_SORTABLE_COLUMNS))}" + ) + if sort_dir not in ("asc", "desc"): + raise InvalidSortError("Invalid sort_dir. Must be 'asc' or 'desc'") + + query = select(Order).where(Order.user_id == user_id) + + if status: + statuses = [s.strip() for s in status.split(",")] + query = query.where(Order.status.in_(statuses)) + + query = _apply_search_filter(query, search) + + # Total count (before pagination) + total = (await db.execute( + select(func.count()).select_from(query.subquery()) + )).scalar() or 0 + + # Sorting + pagination + column = _SORTABLE_COLUMNS[sort_by] + direction = asc if sort_dir == "asc" else desc + query = query.order_by(nullslast(direction(column))) + query = query.offset((page - 1) * per_page).limit(per_page) + + items = list((await db.execute(query)).scalars().all()) + return OrderListResult(items=items, total=total) + + +async def get_order_counts( + db: AsyncSession, + user_id: int, + search: Optional[str] = None, +) -> dict[str, int]: + """Return order counts grouped by status.""" + query = select(Order.status, func.count()).where(Order.user_id == user_id) + query = _apply_search_filter(query, search) + query = query.group_by(Order.status) + + counts = dict((await db.execute(query)).all()) + total = sum(counts.values()) + return { + "total": total, + "ordered": counts.get("ordered", 0), + "shipment_preparing": counts.get("shipment_preparing", 0), + "shipped": counts.get("shipped", 0), + "in_transit": counts.get("in_transit", 0), + "out_for_delivery": counts.get("out_for_delivery", 0), + "delivered": counts.get("delivered", 0), + } + + +async def get_order_detail( + db: AsyncSession, + user_id: int, + order_id: int, +) -> Order: + """Fetch a single order with its state history. + + Raises OrderNotFoundError when the order does not exist or belongs to + another user. + """ + result = await db.execute( + select(Order) + .where(Order.id == order_id, Order.user_id == user_id) + .options(selectinload(Order.states)) + ) + order = result.scalar_one_or_none() + if not order: + raise OrderNotFoundError + return order + + +async def update_order( + db: AsyncSession, + user_id: int, + order_id: int, + data: UpdateOrderRequest, +) -> Order: + """Apply a partial update to an order. + + Creates an OrderState entry when the status changes. + Raises OrderNotFoundError when the order does not exist or belongs to + another user. + """ + order = await db.get(Order, order_id) + if not order or order.user_id != user_id: + raise OrderNotFoundError + + old_status = order.status + for field, value in data.model_dump(exclude_unset=True).items(): + setattr(order, field, value) + + if data.status and data.status != old_status: + db.add(OrderState( + order_id=order.id, + status=data.status, + source_type="manual", + )) + + await db.commit() + await db.refresh(order) + return order + + +async def link_orders( + db: AsyncSession, + user_id: int, + source_id: int, + target_id: int, +) -> Order: + """Merge *target* order into *source* order. + + Copies missing tracking/carrier info, moves all state history, then + deletes the target order. + Raises OrderNotFoundError when either order does not exist or belongs + to another user. + """ + source = await db.get(Order, source_id) + target = await db.get(Order, target_id) + if ( + not source or source.user_id != user_id + or not target or target.user_id != user_id + ): + raise OrderNotFoundError + + if target.tracking_number and not source.tracking_number: + source.tracking_number = target.tracking_number + if target.carrier and not source.carrier: + source.carrier = target.carrier + if target.status and target.status != "ordered": + source.status = target.status + + # Move states from target to source + result = await db.execute( + select(OrderState).where(OrderState.order_id == target.id) + ) + for state in result.scalars().all(): + state.order_id = source.id + + await db.delete(target) + await db.commit() + await db.refresh(source) + return source + + +async def delete_order( + db: AsyncSession, + user_id: int, + order_id: int, +) -> None: + """Delete an order. + + Raises OrderNotFoundError when the order does not exist or belongs to + another user. + """ + order = await db.get(Order, order_id) + if not order or order.user_id != user_id: + raise OrderNotFoundError + await db.delete(order) + await db.commit() + + +# --------------------------------------------------------------------------- +# Service function – email-pipeline / automated operations +# --------------------------------------------------------------------------- + async def create_or_update_order( analysis: EmailAnalysis, user_id: int, From e4ac0d9cfe987e1337cbf46c56e5976a3c70845d Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Mon, 16 Feb 2026 21:49:44 +0100 Subject: [PATCH 2/6] refactor: extract shared utilities for API error handling and formatting Replace 43 duplicated unsafe error-handling blocks (using TypeScript `as` casts) across 19 files with type-safe getApiErrorMessage/getApiErrorStatus utilities that use axios.isAxiosError(). Extract 5 duplicated formatDate, 3 formatAmount, and formatTimeAgo/formatTimeUntil functions into shared utils/format.ts module. Co-Authored-By: Claude Opus 4.6 --- frontend/src/components/OrderFormModal.vue | 9 +-- .../analysers/llm/AdminLLMConfigView.vue | 10 ++- .../notify-email/UserNotifyEmailView.vue | 15 ++-- .../notify-webhook/UserNotifyWebhookView.vue | 18 ++--- .../email-global/AdminGlobalMailView.vue | 10 ++- .../email-global/UserForwardingView.vue | 7 +- .../email-user/AdminImapSettingsView.vue | 7 +- .../email-user/UserImapAccountsView.vue | 27 +++---- frontend/src/utils/api-error.ts | 22 ++++++ frontend/src/utils/format.ts | 71 +++++++++++++++++++ frontend/src/views/DashboardView.vue | 12 +--- frontend/src/views/HistoryView.vue | 12 +--- frontend/src/views/LoginView.vue | 7 +- frontend/src/views/OrderDetailView.vue | 27 +------ frontend/src/views/OrdersView.vue | 12 +--- frontend/src/views/ProfileView.vue | 16 ++--- frontend/src/views/VerifyEmailView.vue | 4 +- .../src/views/admin/AdminSmtpSettingsView.vue | 12 ++-- .../src/views/admin/QueueSettingsView.vue | 7 +- frontend/src/views/admin/StatusView.vue | 40 +---------- frontend/src/views/admin/UsersView.vue | 13 ++-- 21 files changed, 166 insertions(+), 192 deletions(-) create mode 100644 frontend/src/utils/api-error.ts create mode 100644 frontend/src/utils/format.ts diff --git a/frontend/src/components/OrderFormModal.vue b/frontend/src/components/OrderFormModal.vue index 501a394..a5fc500 100644 --- a/frontend/src/components/OrderFormModal.vue +++ b/frontend/src/components/OrderFormModal.vue @@ -270,6 +270,7 @@ import { ref } from 'vue' import { useI18n } from 'vue-i18n' import { useOrdersStore, type OrderDetail, type CreateOrderData } from '@/stores/orders' +import { getApiErrorMessage } from '@/utils/api-error' const props = defineProps<{ mode: 'create' | 'edit' @@ -402,10 +403,10 @@ async function submit() { emit('saved', o.id) } } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = - err.response?.data?.detail || - t(props.mode === 'create' ? 'orders.createFailed' : 'orderDetail.saveFailed') + error.value = getApiErrorMessage( + e, + t(props.mode === 'create' ? 'orders.createFailed' : 'orderDetail.saveFailed'), + ) } finally { submitting.value = false } diff --git a/frontend/src/modules/analysers/llm/AdminLLMConfigView.vue b/frontend/src/modules/analysers/llm/AdminLLMConfigView.vue index 9e65850..370f7c7 100644 --- a/frontend/src/modules/analysers/llm/AdminLLMConfigView.vue +++ b/frontend/src/modules/analysers/llm/AdminLLMConfigView.vue @@ -192,6 +192,7 @@ import { ref, computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' import ModuleHeader from '@/components/ModuleHeader.vue' import { useModulesStore } from '@/stores/modules' @@ -296,8 +297,7 @@ async function fetchConfig() { } } } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - loadError.value = err.response?.data?.detail || t('llm.loadFailed') + loadError.value = getApiErrorMessage(e, t('llm.loadFailed')) } finally { loading.value = false } @@ -327,8 +327,7 @@ async function handleSave() { saveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - saveError.value = err.response?.data?.detail || t('llm.saveFailed') + saveError.value = getApiErrorMessage(e, t('llm.saveFailed')) } finally { saving.value = false } @@ -341,10 +340,9 @@ async function handleTest() { const res = await api.post('/modules/analysers/llm/test') testResult.value = res.data } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } testResult.value = { success: false, - message: err.response?.data?.detail || t('llm.testFailed'), + message: getApiErrorMessage(e, t('llm.testFailed')), } } finally { testing.value = false diff --git a/frontend/src/modules/notifiers/notify-email/UserNotifyEmailView.vue b/frontend/src/modules/notifiers/notify-email/UserNotifyEmailView.vue index d1468a2..d86b001 100644 --- a/frontend/src/modules/notifiers/notify-email/UserNotifyEmailView.vue +++ b/frontend/src/modules/notifiers/notify-email/UserNotifyEmailView.vue @@ -189,6 +189,7 @@ import { ref, computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage, getApiErrorStatus } from '@/utils/api-error' const { t } = useI18n() @@ -233,9 +234,8 @@ async function fetchConfig() { events.value.tracking_update = eventList.includes('tracking_update') events.value.package_delivered = eventList.includes('package_delivered') } catch (e: unknown) { - const err = e as { response?: { status?: number; data?: { detail?: string } } } - if (err.response?.status !== 404) { - loadError.value = err.response?.data?.detail || t('notifications.saveFailed') + if (getApiErrorStatus(e) !== 404) { + loadError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } } finally { loading.value = false @@ -255,8 +255,7 @@ async function handleSaveEmail() { verificationSent.value = false }, 5000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - emailError.value = err.response?.data?.detail || t('notifications.saveFailed') + emailError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } finally { sendingVerification.value = false } @@ -269,8 +268,7 @@ async function handleToggle() { await api.put('/notifiers/notify-email/config/toggle', { enabled: newVal }) config.value.enabled = newVal } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - loadError.value = err.response?.data?.detail || t('notifications.saveFailed') + loadError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } finally { togglingEnabled.value = false } @@ -288,8 +286,7 @@ async function handleSaveEvents() { eventsSaveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - eventsError.value = err.response?.data?.detail || t('notifications.saveFailed') + eventsError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } finally { savingEvents.value = false } diff --git a/frontend/src/modules/notifiers/notify-webhook/UserNotifyWebhookView.vue b/frontend/src/modules/notifiers/notify-webhook/UserNotifyWebhookView.vue index 9beba91..333b488 100644 --- a/frontend/src/modules/notifiers/notify-webhook/UserNotifyWebhookView.vue +++ b/frontend/src/modules/notifiers/notify-webhook/UserNotifyWebhookView.vue @@ -216,6 +216,7 @@ import { ref, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage, getApiErrorStatus } from '@/utils/api-error' const { t } = useI18n() @@ -264,9 +265,8 @@ async function fetchConfig() { events.value.tracking_update = eventList.includes('tracking_update') events.value.package_delivered = eventList.includes('package_delivered') } catch (e: unknown) { - const err = e as { response?: { status?: number; data?: { detail?: string } } } - if (err.response?.status !== 404) { - loadError.value = err.response?.data?.detail || t('notifications.saveFailed') + if (getApiErrorStatus(e) !== 404) { + loadError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } } finally { loading.value = false @@ -280,8 +280,7 @@ async function handleToggle() { await api.put('/notifiers/notify-webhook/config/toggle', { enabled: newVal }) config.value.enabled = newVal } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - loadError.value = err.response?.data?.detail || t('notifications.saveFailed') + loadError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } finally { togglingEnabled.value = false } @@ -304,8 +303,7 @@ async function handleSaveWebhook() { saveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - saveError.value = err.response?.data?.detail || t('notifications.saveFailed') + saveError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } finally { savingWebhook.value = false } @@ -323,8 +321,7 @@ async function handleSaveEvents() { eventsSaveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - eventsError.value = err.response?.data?.detail || t('notifications.saveFailed') + eventsError.value = getApiErrorMessage(e, t('notifications.saveFailed')) } finally { savingEvents.value = false } @@ -344,8 +341,7 @@ async function handleTest() { testSuccess.value = false }, 5000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - testError.value = err.response?.data?.detail || t('modules.notify-webhook.testFailed') + testError.value = getApiErrorMessage(e, t('modules.notify-webhook.testFailed')) } finally { testing.value = false } diff --git a/frontend/src/modules/providers/email-global/AdminGlobalMailView.vue b/frontend/src/modules/providers/email-global/AdminGlobalMailView.vue index 56f549d..815d707 100644 --- a/frontend/src/modules/providers/email-global/AdminGlobalMailView.vue +++ b/frontend/src/modules/providers/email-global/AdminGlobalMailView.vue @@ -233,6 +233,7 @@ import { ref, computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' import ModuleHeader from '@/components/ModuleHeader.vue' import { useModulesStore } from '@/stores/modules' @@ -295,8 +296,7 @@ async function fetchSettings() { idleSupported.value = res.data.idle_supported ?? null } } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - loadError.value = err.response?.data?.detail || t('globalMail.saveFailed') + loadError.value = getApiErrorMessage(e, t('globalMail.saveFailed')) } finally { loading.value = false } @@ -338,8 +338,7 @@ async function handleSaveConnection() { // Fetch folders after saving connection await fetchFolders() } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - connectionSaveError.value = err.response?.data?.detail || t('globalMail.saveFailed') + connectionSaveError.value = getApiErrorMessage(e, t('globalMail.saveFailed')) } finally { savingConnection.value = false } @@ -365,8 +364,7 @@ async function handleSaveSettings() { settingsSaveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - settingsSaveError.value = err.response?.data?.detail || t('globalMail.saveFailed') + settingsSaveError.value = getApiErrorMessage(e, t('globalMail.saveFailed')) } finally { savingSettings.value = false } diff --git a/frontend/src/modules/providers/email-global/UserForwardingView.vue b/frontend/src/modules/providers/email-global/UserForwardingView.vue index e6091f3..65a0822 100644 --- a/frontend/src/modules/providers/email-global/UserForwardingView.vue +++ b/frontend/src/modules/providers/email-global/UserForwardingView.vue @@ -70,6 +70,7 @@ import { ref, reactive, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' import { useSenderAddressesStore } from './store' const { t } = useI18n() @@ -98,8 +99,7 @@ async function handleAdd() { await store.addAddress(newEmail.value) newEmail.value = '' } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('forwarding.addFailed') + error.value = getApiErrorMessage(e, t('forwarding.addFailed')) } finally { adding.value = false } @@ -110,8 +110,7 @@ async function handleDelete(id: number) { try { await store.removeAddress(id) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('forwarding.deleteFailed') + error.value = getApiErrorMessage(e, t('forwarding.deleteFailed')) } finally { deletingId.value = null } diff --git a/frontend/src/modules/providers/email-user/AdminImapSettingsView.vue b/frontend/src/modules/providers/email-user/AdminImapSettingsView.vue index 648f313..79a7b80 100644 --- a/frontend/src/modules/providers/email-user/AdminImapSettingsView.vue +++ b/frontend/src/modules/providers/email-user/AdminImapSettingsView.vue @@ -94,6 +94,7 @@ import { ref, computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' import ModuleHeader from '@/components/ModuleHeader.vue' import { useModulesStore } from '@/stores/modules' @@ -127,8 +128,7 @@ async function fetchSettings() { form.value.max_email_age_days = res.data.max_email_age_days form.value.check_uidvalidity = res.data.check_uidvalidity } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - loadError.value = err.response?.data?.detail || t('imap.loadFailed') + loadError.value = getApiErrorMessage(e, t('imap.loadFailed')) } finally { loading.value = false } @@ -145,8 +145,7 @@ async function handleSave() { saveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - saveError.value = err.response?.data?.detail || t('imap.saveFailed') + saveError.value = getApiErrorMessage(e, t('imap.saveFailed')) } finally { saving.value = false } diff --git a/frontend/src/modules/providers/email-user/UserImapAccountsView.vue b/frontend/src/modules/providers/email-user/UserImapAccountsView.vue index 7b9c8b8..e1b8c64 100644 --- a/frontend/src/modules/providers/email-user/UserImapAccountsView.vue +++ b/frontend/src/modules/providers/email-user/UserImapAccountsView.vue @@ -565,6 +565,7 @@ import { ref, computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import { useAccountsStore, type EmailAccount, type IMAPFolder, type WatchedFolder } from './store' +import { getApiErrorMessage, getApiErrorStatus } from '@/utils/api-error' const { t } = useI18n() const accountsStore = useAccountsStore() @@ -672,8 +673,7 @@ async function handleSubmit() { } closeForm() } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - formError.value = err.response?.data?.detail || t('accounts.saveFailed') + formError.value = getApiErrorMessage(e, t('accounts.saveFailed')) } finally { formSaving.value = false } @@ -691,10 +691,9 @@ async function handleTest(id: number) { : t('accounts.connectionTestFailed'), } } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } testResults.value[id] = { success: false, - message: err.response?.data?.detail || t('accounts.connectionTestFailed'), + message: getApiErrorMessage(e, t('accounts.connectionTestFailed')), } } finally { testingId.value = null @@ -717,8 +716,7 @@ async function confirmDelete() { expandedId.value = null } } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - formError.value = err.response?.data?.detail || t('accounts.deleteFailed') + formError.value = getApiErrorMessage(e, t('accounts.deleteFailed')) } finally { deletingId.value = null } @@ -747,8 +745,7 @@ async function loadFolders(id: number) { availableFolders.value = folders watchedFolders.value = watched } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - folderError.value = err.response?.data?.detail || t('accounts.loadFoldersFailed') + folderError.value = getApiErrorMessage(e, t('accounts.loadFoldersFailed')) } finally { foldersLoading.value = false } @@ -777,8 +774,7 @@ async function handleOverrideChange( const idx = watchedFolders.value.findIndex((f) => f.id === wf.id) if (idx !== -1) watchedFolders.value[idx] = updated } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - folderError.value = err.response?.data?.detail || t('accounts.overrideUpdateFailed') + folderError.value = getApiErrorMessage(e, t('accounts.overrideUpdateFailed')) } } @@ -787,8 +783,7 @@ async function handleAddWatched(accountId: number, folderName: string) { const wf = await accountsStore.addWatchedFolder(accountId, folderName) watchedFolders.value.push(wf) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - folderError.value = err.response?.data?.detail || t('accounts.addWatchedFailed') + folderError.value = getApiErrorMessage(e, t('accounts.addWatchedFailed')) } } @@ -797,8 +792,7 @@ async function handleRemoveWatched(accountId: number, folderId: number) { await accountsStore.removeWatchedFolder(accountId, folderId) watchedFolders.value = watchedFolders.value.filter((wf) => wf.id !== folderId) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - folderError.value = err.response?.data?.detail || t('accounts.removeWatchedFailed') + folderError.value = getApiErrorMessage(e, t('accounts.removeWatchedFailed')) } } @@ -815,11 +809,10 @@ async function handleScan(accountId: number, folderId: number) { await accountsStore.scanFolder(accountId, folderId) folderError.value = '' } catch (e: unknown) { - const err = e as { response?: { status?: number; data?: { detail?: string } } } - if (err.response?.status === 409) { + if (getApiErrorStatus(e) === 409) { folderError.value = t('accounts.scanAlreadyRunning') } else { - folderError.value = err.response?.data?.detail || t('accounts.scanTriggered') + folderError.value = getApiErrorMessage(e, t('accounts.scanTriggered')) } } finally { scanningFolderId.value = null diff --git a/frontend/src/utils/api-error.ts b/frontend/src/utils/api-error.ts new file mode 100644 index 0000000..ca09417 --- /dev/null +++ b/frontend/src/utils/api-error.ts @@ -0,0 +1,22 @@ +import axios from 'axios' + +/** + * Extract the error message from an API error response. + * Uses axios.isAxiosError() for proper type narrowing instead of unsafe `as` casts. + */ +export function getApiErrorMessage(error: unknown, fallback: string): string { + if (axios.isAxiosError(error)) { + return error.response?.data?.detail || fallback + } + return fallback +} + +/** + * Extract the HTTP status code from an API error response. + */ +export function getApiErrorStatus(error: unknown): number | undefined { + if (axios.isAxiosError(error)) { + return error.response?.status + } + return undefined +} diff --git a/frontend/src/utils/format.ts b/frontend/src/utils/format.ts new file mode 100644 index 0000000..184e2a3 --- /dev/null +++ b/frontend/src/utils/format.ts @@ -0,0 +1,71 @@ +/** + * Format a date string as a short date (e.g. "Feb 16, 2026"). + */ +export function formatDate(dateStr: string): string { + const date = new Date(dateStr) + return date.toLocaleDateString(undefined, { month: 'short', day: 'numeric', year: 'numeric' }) +} + +/** + * Format a date string as date + time (e.g. "Feb 16, 2026, 2:30 PM"). + */ +export function formatDateTime(dateStr: string): string { + const date = new Date(dateStr) + return date.toLocaleDateString(undefined, { + month: 'short', + day: 'numeric', + year: 'numeric', + hour: 'numeric', + minute: '2-digit', + }) +} + +/** + * Format a monetary amount with currency symbol (e.g. "$12.99"). + */ +export function formatAmount(amount: number | null, currency: string | null): string { + if (amount === null) return '-' + const curr = currency || 'USD' + return new Intl.NumberFormat(undefined, { style: 'currency', currency: curr }).format(amount) +} + +/** + * Format an ISO date string as a relative time in the past (e.g. "5m ago"). + */ +export function formatTimeAgo(isoString: string | null): string { + if (!isoString) return '-' + const now = Date.now() + const then = new Date(isoString).getTime() + const diffMs = now - then + if (diffMs < 0) return 'just now' + + const diffSec = Math.floor(diffMs / 1000) + if (diffSec < 60) return 'just now' + + const diffMin = Math.floor(diffSec / 60) + if (diffMin < 60) return `${diffMin}m ago` + + const diffHour = Math.floor(diffMin / 60) + if (diffHour < 24) return `${diffHour}h ago` + + const diffDay = Math.floor(diffHour / 24) + return `${diffDay}d ago` +} + +/** + * Format an ISO date string as a relative time in the future (e.g. "2m 30s"). + */ +export function formatTimeUntil(isoString: string | null): string { + if (!isoString) return '-' + const now = Date.now() + const target = new Date(isoString).getTime() + const diffMs = target - now + if (diffMs <= 0) return 'now' + + const diffSec = Math.floor(diffMs / 1000) + const minutes = Math.floor(diffSec / 60) + const seconds = diffSec % 60 + + if (minutes > 0) return `${minutes}m ${seconds}s` + return `${seconds}s` +} diff --git a/frontend/src/views/DashboardView.vue b/frontend/src/views/DashboardView.vue index dd74e2f..ae72698 100644 --- a/frontend/src/views/DashboardView.vue +++ b/frontend/src/views/DashboardView.vue @@ -216,6 +216,7 @@ import { computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import { useOrdersStore } from '@/stores/orders' +import { formatDate, formatAmount } from '@/utils/format' const { t, te } = useI18n() const ordersStore = useOrdersStore() @@ -256,17 +257,6 @@ function formatStatus(status: string): string { return status.replace(/_/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase()) } -function formatDate(dateStr: string): string { - const date = new Date(dateStr) - return date.toLocaleDateString('en-US', { month: 'short', day: 'numeric', year: 'numeric' }) -} - -function formatAmount(amount: number | null, currency: string | null): string { - if (amount === null) return '-' - const curr = currency || 'USD' - return new Intl.NumberFormat('en-US', { style: 'currency', currency: curr }).format(amount) -} - onMounted(() => { ordersStore.fetchOrders() }) diff --git a/frontend/src/views/HistoryView.vue b/frontend/src/views/HistoryView.vue index 17844a6..d50dcdd 100644 --- a/frontend/src/views/HistoryView.vue +++ b/frontend/src/views/HistoryView.vue @@ -424,6 +424,7 @@ import { ref, computed, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import { useQueueStore, type QueueItem, type QueueItemSummary } from '@/stores/queue' +import { formatDateTime as formatDate } from '@/utils/format' const { t } = useI18n() const queueStore = useQueueStore() @@ -553,17 +554,6 @@ function statusLabel(status: string): string { return labels[status] || status } -// --- Formatting --- - -function formatDate(dateStr: string): string { - const date = new Date(dateStr) - return ( - date.toLocaleDateString(undefined, { month: 'short', day: 'numeric', year: 'numeric' }) + - ' ' + - date.toLocaleTimeString(undefined, { hour: '2-digit', minute: '2-digit' }) - ) -} - // --- Lifecycle --- onMounted(() => { diff --git a/frontend/src/views/LoginView.vue b/frontend/src/views/LoginView.vue index c105402..246a925 100644 --- a/frontend/src/views/LoginView.vue +++ b/frontend/src/views/LoginView.vue @@ -161,6 +161,7 @@ import { ref } from 'vue' import { useRouter } from 'vue-router' import { useI18n } from 'vue-i18n' import { useAuthStore } from '@/stores/auth' +import { getApiErrorMessage } from '@/utils/api-error' const { t } = useI18n() const router = useRouter() @@ -191,8 +192,7 @@ async function handleSetup() { auth.setupCompleted = true router.push('/dashboard') } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('login.setupFailed') + error.value = getApiErrorMessage(e, t('login.setupFailed')) } finally { loading.value = false } @@ -205,8 +205,7 @@ async function handleLogin() { await auth.login(username.value, password.value) router.push('/dashboard') } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('login.loginFailed') + error.value = getApiErrorMessage(e, t('login.loginFailed')) } finally { loading.value = false } diff --git a/frontend/src/views/OrderDetailView.vue b/frontend/src/views/OrderDetailView.vue index a63d4ff..fdb38a9 100644 --- a/frontend/src/views/OrderDetailView.vue +++ b/frontend/src/views/OrderDetailView.vue @@ -291,6 +291,8 @@ import { useRoute, useRouter } from 'vue-router' import { useOrdersStore, type OrderDetail } from '@/stores/orders' import StatusBadge from '@/components/StatusBadge.vue' import OrderFormModal from '@/components/OrderFormModal.vue' +import { getApiErrorMessage } from '@/utils/api-error' +import { formatDate, formatDateTime, formatAmount } from '@/utils/format' const { t } = useI18n() const route = useRoute() @@ -323,36 +325,13 @@ async function confirmDelete() { await ordersStore.deleteOrder(order.value.id) router.push('/orders') } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('orderDetail.deleteFailed') + error.value = getApiErrorMessage(e, t('orderDetail.deleteFailed')) showDeleteConfirm.value = false } finally { deleting.value = false } } -function formatDate(dateStr: string): string { - const date = new Date(dateStr) - return date.toLocaleDateString('en-US', { month: 'short', day: 'numeric', year: 'numeric' }) -} - -function formatDateTime(dateStr: string): string { - const date = new Date(dateStr) - return date.toLocaleDateString('en-US', { - month: 'short', - day: 'numeric', - year: 'numeric', - hour: 'numeric', - minute: '2-digit', - }) -} - -function formatAmount(amount: number | null, currency: string | null): string { - if (amount === null) return '-' - const curr = currency || 'USD' - return new Intl.NumberFormat('en-US', { style: 'currency', currency: curr }).format(amount) -} - onMounted(async () => { const id = Number(route.params.id) if (isNaN(id)) { diff --git a/frontend/src/views/OrdersView.vue b/frontend/src/views/OrdersView.vue index a5ef3cd..38c4e8f 100644 --- a/frontend/src/views/OrdersView.vue +++ b/frontend/src/views/OrdersView.vue @@ -220,6 +220,7 @@ import { useI18n } from 'vue-i18n' import { useOrdersStore } from '@/stores/orders' import StatusBadge from '@/components/StatusBadge.vue' import OrderFormModal from '@/components/OrderFormModal.vue' +import { formatDate, formatAmount } from '@/utils/format' const { t } = useI18n() const ordersStore = useOrdersStore() @@ -335,17 +336,6 @@ function onOrderCreated(_id: number) { loadCounts() } -function formatDate(dateStr: string): string { - const date = new Date(dateStr) - return date.toLocaleDateString('en-US', { month: 'short', day: 'numeric', year: 'numeric' }) -} - -function formatAmount(amount: number | null, currency: string | null): string { - if (amount === null) return '-' - const curr = currency || 'USD' - return new Intl.NumberFormat('en-US', { style: 'currency', currency: curr }).format(amount) -} - onMounted(() => { loadOrders() loadCounts() diff --git a/frontend/src/views/ProfileView.vue b/frontend/src/views/ProfileView.vue index 3a9c83b..d0bce0a 100644 --- a/frontend/src/views/ProfileView.vue +++ b/frontend/src/views/ProfileView.vue @@ -260,6 +260,8 @@ import { useI18n } from 'vue-i18n' import { useAuthStore } from '@/stores/auth' import api from '@/api/client' import ThemeToggle from '@/components/ThemeToggle.vue' +import { getApiErrorMessage } from '@/utils/api-error' +import { formatDate } from '@/utils/format' const { t, locale } = useI18n() const auth = useAuthStore() @@ -300,8 +302,7 @@ async function handleChangePassword() { pwSuccess.value = '' }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - pwError.value = err.response?.data?.detail || t('profile.updateFailed') + pwError.value = getApiErrorMessage(e, t('profile.updateFailed')) } finally { pwSaving.value = false } @@ -348,8 +349,7 @@ async function handleCreateKey() { showCreateForm.value = false await fetchKeys() } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - keyError.value = err.response?.data?.detail || t('profile.createKeyFailed') + keyError.value = getApiErrorMessage(e, t('profile.createKeyFailed')) } finally { keyCreating.value = false } @@ -381,13 +381,5 @@ function copyKey() { }, 2000) } -function formatDate(iso: string): string { - return new Date(iso).toLocaleDateString(undefined, { - year: 'numeric', - month: 'short', - day: 'numeric', - }) -} - onMounted(fetchKeys) diff --git a/frontend/src/views/VerifyEmailView.vue b/frontend/src/views/VerifyEmailView.vue index 9d6ea6f..fcef335 100644 --- a/frontend/src/views/VerifyEmailView.vue +++ b/frontend/src/views/VerifyEmailView.vue @@ -25,6 +25,7 @@ import { ref, onMounted } from 'vue' import { useRoute } from 'vue-router' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' const route = useRoute() const { t } = useI18n() @@ -37,9 +38,8 @@ onMounted(async () => { await api.post(`/notifiers/notify-email/verify/${route.params.token}`) success.value = true } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } error.value = - err.response?.data?.detail === 'Verification link expired' + getApiErrorMessage(e, '') === 'Verification link expired' ? t('modules.notify-email.verifyExpired') : t('modules.notify-email.verifyFailed') } finally { diff --git a/frontend/src/views/admin/AdminSmtpSettingsView.vue b/frontend/src/views/admin/AdminSmtpSettingsView.vue index abee6db..71c02b2 100644 --- a/frontend/src/views/admin/AdminSmtpSettingsView.vue +++ b/frontend/src/views/admin/AdminSmtpSettingsView.vue @@ -201,6 +201,7 @@ import { ref, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage, getApiErrorStatus } from '@/utils/api-error' const { t } = useI18n() @@ -243,9 +244,8 @@ async function fetchConfig() { form.value.sender_name = res.data.sender_name || '' } } catch (e: unknown) { - const err = e as { response?: { status?: number; data?: { detail?: string } } } - if (err.response?.status !== 404) { - loadError.value = err.response?.data?.detail || t('smtp.loadFailed') + if (getApiErrorStatus(e) !== 404) { + loadError.value = getApiErrorMessage(e, t('smtp.loadFailed')) } } finally { loading.value = false @@ -268,8 +268,7 @@ async function handleSave() { saveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - saveError.value = err.response?.data?.detail || t('smtp.saveFailed') + saveError.value = getApiErrorMessage(e, t('smtp.saveFailed')) } finally { saving.value = false } @@ -286,8 +285,7 @@ async function handleTest() { testSuccess.value = false }, 5000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - testError.value = err.response?.data?.detail || t('smtp.testFailed') + testError.value = getApiErrorMessage(e, t('smtp.testFailed')) } finally { testing.value = false } diff --git a/frontend/src/views/admin/QueueSettingsView.vue b/frontend/src/views/admin/QueueSettingsView.vue index 136d6f1..f267e24 100644 --- a/frontend/src/views/admin/QueueSettingsView.vue +++ b/frontend/src/views/admin/QueueSettingsView.vue @@ -80,6 +80,7 @@ import { ref, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' const { t } = useI18n() @@ -102,8 +103,7 @@ async function fetchSettings() { form.value.max_age_days = res.data.max_age_days form.value.max_per_user = res.data.max_per_user } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - loadError.value = err.response?.data?.detail || t('queue.loadFailed') + loadError.value = getApiErrorMessage(e, t('queue.loadFailed')) } finally { loading.value = false } @@ -120,8 +120,7 @@ async function handleSave() { saveSuccess.value = false }, 3000) } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - saveError.value = err.response?.data?.detail || t('queue.saveFailed') + saveError.value = getApiErrorMessage(e, t('queue.saveFailed')) } finally { saving.value = false } diff --git a/frontend/src/views/admin/StatusView.vue b/frontend/src/views/admin/StatusView.vue index db13163..7d18c47 100644 --- a/frontend/src/views/admin/StatusView.vue +++ b/frontend/src/views/admin/StatusView.vue @@ -615,6 +615,8 @@ import { ref, computed, onMounted, onUnmounted } from 'vue' import { useI18n } from 'vue-i18n' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' +import { formatTimeAgo, formatTimeUntil } from '@/utils/format' const { t } = useI18n() @@ -742,41 +744,6 @@ let tickInterval: ReturnType | null = null // --- Helper Functions --- -function formatTimeAgo(isoString: string | null): string { - if (!isoString) return '-' - const now = Date.now() - const then = new Date(isoString).getTime() - const diffMs = now - then - if (diffMs < 0) return 'just now' - - const diffSec = Math.floor(diffMs / 1000) - if (diffSec < 60) return 'just now' - - const diffMin = Math.floor(diffSec / 60) - if (diffMin < 60) return `${diffMin}m ago` - - const diffHour = Math.floor(diffMin / 60) - if (diffHour < 24) return `${diffHour}h ago` - - const diffDay = Math.floor(diffHour / 24) - return `${diffDay}d ago` -} - -function formatTimeUntil(isoString: string | null): string { - if (!isoString) return '-' - const now = Date.now() - const target = new Date(isoString).getTime() - const diffMs = target - now - if (diffMs <= 0) return 'now' - - const diffSec = Math.floor(diffMs / 1000) - const minutes = Math.floor(diffSec / 60) - const seconds = diffSec % 60 - - if (minutes > 0) return `${minutes}m ${seconds}s` - return `${seconds}s` -} - function modeLabel(mode: string, folder?: FolderStatus): string { const labels: Record = { idle: t('system.modeIdle'), @@ -900,8 +867,7 @@ async function fetchStatus() { lastRefreshedAt.value = new Date() secondsSinceRefresh.value = 0 } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('system.loadFailed') + error.value = getApiErrorMessage(e, t('system.loadFailed')) } finally { loading.value = false } diff --git a/frontend/src/views/admin/UsersView.vue b/frontend/src/views/admin/UsersView.vue index 6e855c5..72fdb49 100644 --- a/frontend/src/views/admin/UsersView.vue +++ b/frontend/src/views/admin/UsersView.vue @@ -220,6 +220,7 @@ import { ref, onMounted } from 'vue' import { useI18n } from 'vue-i18n' import { useAuthStore } from '@/stores/auth' import api from '@/api/client' +import { getApiErrorMessage } from '@/utils/api-error' const { t } = useI18n() @@ -257,8 +258,7 @@ async function fetchUsers() { const res = await api.get('/users') users.value = res.data } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('users.loadFailed') + error.value = getApiErrorMessage(e, t('users.loadFailed')) } finally { loading.value = false } @@ -278,8 +278,7 @@ async function handleCreateUser() { users.value.push(res.data) closeForm() } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - formError.value = err.response?.data?.detail || t('users.createFailed') + formError.value = getApiErrorMessage(e, t('users.createFailed')) } finally { formSaving.value = false } @@ -291,8 +290,7 @@ async function toggleAdmin(user: User) { const idx = users.value.findIndex((u) => u.id === user.id) if (idx !== -1) users.value[idx] = res.data } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('users.updateFailed') + error.value = getApiErrorMessage(e, t('users.updateFailed')) } } @@ -310,8 +308,7 @@ async function confirmDelete() { showDeleteConfirm.value = false deleteTarget.value = null } catch (e: unknown) { - const err = e as { response?: { data?: { detail?: string } } } - error.value = err.response?.data?.detail || t('users.deleteFailed') + error.value = getApiErrorMessage(e, t('users.deleteFailed')) showDeleteConfirm.value = false } finally { deleting.value = false From e4a90f39afc290e50c653fd1ddb89dd5128c2b58 Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Mon, 16 Feb 2026 22:05:13 +0100 Subject: [PATCH 3/6] refactor: replace blocking imaplib with async aioimaplib in route handlers The IMAP test_connection and list_folders endpoints used synchronous imaplib, which blocked the entire asyncio event loop during network I/O. Extract shared async IMAP utilities (connect, test, list folders, parse) into _shared/email/imap_utils.py using aioimaplib, eliminating both the event loop blocking bug and the duplicated IMAP logic between email-user and email-global routers. Co-Authored-By: Claude Opus 4.6 --- .../app/modules/_shared/email/imap_utils.py | 86 +++++++++++++++++++ .../modules/providers/email_global/router.py | 37 ++------ .../providers/email_user/user_router.py | 51 ++++------- backend/tests/test_global_mail_folders.py | 24 +++--- 4 files changed, 127 insertions(+), 71 deletions(-) create mode 100644 backend/app/modules/_shared/email/imap_utils.py diff --git a/backend/app/modules/_shared/email/imap_utils.py b/backend/app/modules/_shared/email/imap_utils.py new file mode 100644 index 0000000..f3a4a61 --- /dev/null +++ b/backend/app/modules/_shared/email/imap_utils.py @@ -0,0 +1,86 @@ +import logging +from dataclasses import dataclass + +from aioimaplib import IMAP4, IMAP4_SSL + +logger = logging.getLogger(__name__) + + +@dataclass +class ImapTestResult: + success: bool + message: str + idle_supported: bool | None + + +@dataclass +class ImapFoldersResult: + folders: list[str] + idle_supported: bool + + +async def _connect( + host: str, port: int, user: str, password: str, use_ssl: bool, +) -> IMAP4_SSL | IMAP4: + """Connect and authenticate to an IMAP server.""" + if use_ssl: + imap = IMAP4_SSL(host=host, port=port) + else: + imap = IMAP4(host=host, port=port) + await imap.wait_hello_from_server() + await imap.login(user, password) + return imap + + +def _parse_folder_list(raw_folders: list[bytes | str]) -> list[str]: + """Parse IMAP LIST response lines into folder name strings. + + aioimaplib returns response lines as bytes. Handles formats like: + b'(\\HasNoChildren) "/" "INBOX"' + b'(\\HasNoChildren) "." INBOX' + """ + folders = [] + for item in raw_folders: + line = item.decode() if isinstance(item, bytes) else item + parts = line.rsplit('" "', 1) + if len(parts) == 2: + folders.append(parts[1].strip('"')) + else: + parts = line.rsplit(" ", 1) + folders.append(parts[-1].strip('"')) + return folders + + +async def test_imap_connection( + host: str, port: int, user: str, password: str, use_ssl: bool, +) -> ImapTestResult: + """Test IMAP connection and return whether IDLE is supported.""" + try: + imap = await _connect(host, port, user, password, use_ssl) + idle_supported = imap.has_capability("IDLE") + await imap.logout() + return ImapTestResult( + success=True, + message="Connection successful", + idle_supported=idle_supported, + ) + except Exception as e: + return ImapTestResult(success=False, message=str(e), idle_supported=None) + + +async def list_imap_folders( + host: str, port: int, user: str, password: str, use_ssl: bool, +) -> ImapFoldersResult: + """List available IMAP folders and check IDLE capability.""" + imap = await _connect(host, port, user, password, use_ssl) + try: + idle_supported = imap.has_capability("IDLE") + result = await imap.list('""', "*") + raw_folders = result.lines[:-1] if result.lines else [] + folders = _parse_folder_list(raw_folders) + return ImapFoldersResult(folders=folders, idle_supported=idle_supported) + finally: + try: + await imap.logout() + except Exception: + pass diff --git a/backend/app/modules/providers/email_global/router.py b/backend/app/modules/providers/email_global/router.py index 655b4d7..7485194 100644 --- a/backend/app/modules/providers/email_global/router.py +++ b/backend/app/modules/providers/email_global/router.py @@ -1,5 +1,3 @@ -import imaplib - from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -7,6 +5,7 @@ from app.api.deps import get_admin_user from app.core.encryption import decrypt_value, encrypt_value from app.database import get_db +from app.modules._shared.email.imap_utils import list_imap_folders from app.modules.providers.email_global.models import GlobalMailConfig from app.modules.providers.email_global.schemas import ( GlobalMailConfigRequest, @@ -71,32 +70,14 @@ async def list_global_mail_folders( raise HTTPException(status_code=404, detail="Global mail not configured") try: password = decrypt_value(config.imap_password_encrypted) - if config.use_ssl: - mail = imaplib.IMAP4_SSL(config.imap_host, config.imap_port) - else: - mail = imaplib.IMAP4(config.imap_host, config.imap_port) - mail.login(config.imap_user, password) - try: - _, caps = mail.capability() - capabilities = caps[0].decode().upper().split() if caps else [] - idle_supported = "IDLE" in capabilities - - _, folder_list = mail.list() - finally: - try: - mail.logout() - except Exception: - pass - folders = [] - for item in folder_list: - decoded = item.decode() if isinstance(item, bytes) else item - parts = decoded.rsplit('" "', 1) - if len(parts) == 2: - folders.append(parts[1].strip('"')) - else: - parts = decoded.rsplit(" ", 1) - folders.append(parts[-1].strip('"')) - return GlobalMailFoldersResponse(folders=folders, idle_supported=idle_supported) + result = await list_imap_folders( + host=config.imap_host, + port=config.imap_port, + user=config.imap_user, + password=password, + use_ssl=config.use_ssl, + ) + return GlobalMailFoldersResponse(folders=result.folders, idle_supported=result.idle_supported) except HTTPException: raise except Exception as e: diff --git a/backend/app/modules/providers/email_user/user_router.py b/backend/app/modules/providers/email_user/user_router.py index 29f5380..9d848aa 100644 --- a/backend/app/modules/providers/email_user/user_router.py +++ b/backend/app/modules/providers/email_user/user_router.py @@ -1,5 +1,3 @@ -import imaplib - from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -7,6 +5,7 @@ from app.core.encryption import encrypt_value, decrypt_value from app.database import get_db from app.models.user import User +from app.modules._shared.email.imap_utils import test_imap_connection, list_imap_folders from app.modules.providers.email_user.models import EmailAccount, WatchedFolder from app.modules.providers.email_user.schemas import ( CreateAccountRequest, UpdateAccountRequest, AccountResponse, @@ -77,20 +76,15 @@ async def test_connection(account_id: int, user: User = Depends(get_current_user account = await db.get(EmailAccount, account_id) if not account or account.user_id != user.id: raise HTTPException(status_code=404, detail="Account not found") - try: - password = decrypt_value(account.imap_password_encrypted) - if account.use_ssl: - mail = imaplib.IMAP4_SSL(account.imap_host, account.imap_port) - else: - mail = imaplib.IMAP4(account.imap_host, account.imap_port) - mail.login(account.imap_user, password) - _, caps = mail.capability() - capabilities = caps[0].decode().upper().split() if caps else [] - idle_supported = "IDLE" in capabilities - mail.logout() - return {"success": True, "message": "Connection successful", "idle_supported": idle_supported} - except Exception as e: - return {"success": False, "message": str(e), "idle_supported": None} + password = decrypt_value(account.imap_password_encrypted) + result = await test_imap_connection( + host=account.imap_host, + port=account.imap_port, + user=account.imap_user, + password=password, + use_ssl=account.use_ssl, + ) + return {"success": result.success, "message": result.message, "idle_supported": result.idle_supported} @user_router.get("/accounts/{account_id}/folders", response_model=list[str]) @@ -100,23 +94,14 @@ async def list_folders(account_id: int, user: User = Depends(get_current_user), raise HTTPException(status_code=404, detail="Account not found") try: password = decrypt_value(account.imap_password_encrypted) - if account.use_ssl: - mail = imaplib.IMAP4_SSL(account.imap_host, account.imap_port) - else: - mail = imaplib.IMAP4(account.imap_host, account.imap_port) - mail.login(account.imap_user, password) - _, folder_list = mail.list() - mail.logout() - folders = [] - for item in folder_list: - decoded = item.decode() if isinstance(item, bytes) else item - parts = decoded.rsplit('" "', 1) - if len(parts) == 2: - folders.append(parts[1].strip('"')) - else: - parts = decoded.rsplit(" ", 1) - folders.append(parts[-1].strip('"')) - return folders + result = await list_imap_folders( + host=account.imap_host, + port=account.imap_port, + user=account.imap_user, + password=password, + use_ssl=account.use_ssl, + ) + return result.folders except Exception as e: raise HTTPException(status_code=400, detail=f"Failed to list folders: {e}") diff --git a/backend/tests/test_global_mail_folders.py b/backend/tests/test_global_mail_folders.py index 61e0594..45daee5 100644 --- a/backend/tests/test_global_mail_folders.py +++ b/backend/tests/test_global_mail_folders.py @@ -1,6 +1,8 @@ -from unittest.mock import patch, MagicMock +from unittest.mock import patch, AsyncMock import pytest +from app.modules._shared.email.imap_utils import ImapFoldersResult + @pytest.fixture async def admin_token(client): @@ -31,14 +33,13 @@ async def test_folders_returns_list(client, admin_token): headers=auth(admin_token), ) - mock_mail = MagicMock() - mock_mail.login.return_value = ("OK", []) - mock_mail.capability.return_value = ("OK", [b"IMAP4rev1 IDLE"]) - mock_mail.list.return_value = ("OK", [b'(\\HasNoChildren) "/" "INBOX"', b'(\\HasNoChildren) "/" "Sent"']) - mock_mail.logout.return_value = ("BYE", []) + mock_result = ImapFoldersResult(folders=["INBOX", "Sent"], idle_supported=True) - with patch("app.modules.providers.email_global.router.imaplib") as mock_imaplib: - mock_imaplib.IMAP4_SSL.return_value = mock_mail + with patch( + "app.modules.providers.email_global.router.list_imap_folders", + new_callable=AsyncMock, + return_value=mock_result, + ): resp = await client.get("/api/v1/modules/providers/email-global/folders", headers=auth(admin_token)) assert resp.status_code == 200 @@ -59,8 +60,11 @@ async def test_folders_connection_failure(client, admin_token): headers=auth(admin_token), ) - with patch("app.modules.providers.email_global.router.imaplib") as mock_imaplib: - mock_imaplib.IMAP4_SSL.side_effect = Exception("Connection refused") + with patch( + "app.modules.providers.email_global.router.list_imap_folders", + new_callable=AsyncMock, + side_effect=Exception("Connection refused"), + ): resp = await client.get("/api/v1/modules/providers/email-global/folders", headers=auth(admin_token)) assert resp.status_code == 400 From c9c1b0c80d7144bef6d6744e02a4d08361fe3e91 Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Tue, 17 Feb 2026 18:10:15 +0100 Subject: [PATCH 4/6] refactor: extract shared IMAP watch loop logic into imap_watch_loop.py Eliminate ~350 lines of duplicated IMAP watching logic between email_user and email_global services by extracting generic watch/idle/poll/fetch functions that accept provider-specific behavior via a callbacks dataclass. Co-Authored-By: Claude Opus 4.6 --- .../modules/_shared/email/imap_watch_loop.py | 326 ++++++++++++++++ .../modules/providers/email_global/service.py | 353 +++++------------ .../modules/providers/email_user/service.py | 356 +++++------------- backend/tests/test_imap_worker.py | 11 +- 4 files changed, 517 insertions(+), 529 deletions(-) create mode 100644 backend/app/modules/_shared/email/imap_watch_loop.py diff --git a/backend/app/modules/_shared/email/imap_watch_loop.py b/backend/app/modules/_shared/email/imap_watch_loop.py new file mode 100644 index 0000000..ae0ee65 --- /dev/null +++ b/backend/app/modules/_shared/email/imap_watch_loop.py @@ -0,0 +1,326 @@ +"""Shared IMAP watch loop logic used by both user and global email watchers. + +Provides generic async functions for IDLE/poll loops, email fetching, and +reconnection with exponential backoff. Provider-specific behavior is injected +via the ImapWatcherCallbacks dataclass. +""" + +import asyncio +import email as email_mod +import hashlib +import logging +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Awaitable, Callable + +from aioimaplib import IMAP4_SSL, STOP_WAIT_SERVER_PUSH +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import async_session +from app.modules._shared.email.imap_client import decode_header_value, extract_body +from app.modules._shared.email.imap_watcher import ( + IDLE_TIMEOUT_SEC, + MAX_BACKOFF_SEC, + WorkerMode, + WorkerState, +) +from app.modules._shared.email.email_fetcher import check_dedup_and_enqueue + +logger = logging.getLogger(__name__) + + +def generate_fallback_message_id( + account_id: int, folder_path: str, uidvalidity: int | None, uid: int, +) -> str: + """Generate a deterministic fallback Message-ID for emails missing one.""" + uidvalidity_part = str(uidvalidity) if uidvalidity is not None else "no-uidvalidity" + folder_hash = hashlib.sha256(folder_path.encode()).hexdigest()[:16] + return f"fallback:{account_id}:{folder_hash}:{uidvalidity_part}:{uid}" + + +@dataclass +class ConnectResult: + """Result from a successful connect callback.""" + imap: IMAP4_SSL + idle_supported: bool + use_polling: bool + polling_interval_sec: int + + +@dataclass +class FetchContext: + """Per-cycle context for email fetching, loaded fresh each cycle.""" + last_seen_uid: int + folder_path: str + uidvalidity: int | None + max_email_age_days: int + source_info: str + source_label: str + account_id: int | None + + +# Type aliases for callbacks +RouteResult = tuple[int, str] | None # (user_id, source) or None to skip + + +@dataclass +class ImapWatcherCallbacks: + """Provider-specific behavior injected into the generic watch loop. + + connect: Open a DB session, validate liveness, connect to IMAP, + return ConnectResult or None to stop. + load_fetch_context: Given a DB session, return FetchContext for the current cycle. + route_email: Given sender email, DB session, return (user_id, source) + or None to skip the email. + save_uid: Persist the new last_seen_uid after processing an email. + log_label: Human-readable label for log messages (e.g. "folder 5"). + """ + connect: Callable[[AsyncSession], Awaitable[ConnectResult | None]] + load_fetch_context: Callable[[AsyncSession], Awaitable[FetchContext | None]] + route_email: Callable[[str, AsyncSession], Awaitable[RouteResult]] + save_uid: Callable[[int, AsyncSession], Awaitable[None]] + log_label: str + + +async def fetch_new_emails( + imap: IMAP4_SSL, + ctx: FetchContext, + callbacks: ImapWatcherCallbacks, + db: AsyncSession, + state: WorkerState | None, +) -> None: + """UID-search for new emails, process and enqueue them.""" + since_date = ( + datetime.now(timezone.utc) - timedelta(days=ctx.max_email_age_days) + ).strftime("%d-%b-%Y") + search_criteria = f"UID {ctx.last_seen_uid + 1}:* SINCE {since_date}" + _, data = await imap.uid_search(search_criteria) + uids = data[0].split() if data[0] else [] + + if state: + if uids: + state.mode = WorkerMode.PROCESSING + state.queue_total = len(uids) + state.last_activity_at = datetime.now(timezone.utc) + + for i, uid_bytes in enumerate(uids): + uid = int(uid_bytes) + if uid <= ctx.last_seen_uid: + continue + + _, msg_data = await imap.uid("fetch", str(uid), "(RFC822)") + if not msg_data or not msg_data[0]: + continue + + raw_email = None + for part in msg_data: + if isinstance(part, bytearray): + raw_email = bytes(part) + break + if raw_email is None: + continue + msg = email_mod.message_from_bytes(raw_email) + + subject = decode_header_value(msg.get("Subject", "")) + sender = decode_header_value(msg.get("From", "")) + message_id = msg.get("Message-ID", "") + if not message_id or not message_id.strip(): + message_id = generate_fallback_message_id( + ctx.account_id or 0, ctx.folder_path, ctx.uidvalidity, uid, + ) + body = extract_body(msg) + + email_date = None + try: + date_str = msg.get("Date", "") + if date_str: + from email.utils import parsedate_to_datetime + email_date = parsedate_to_datetime(date_str) + except Exception: + pass + + # Route: determine user_id + source, or skip + route = await callbacks.route_email(sender, db) + if route is None: + await callbacks.save_uid(uid, db) + continue + user_id, source = route + + if state: + state.queue_position = i + 1 + state.current_email_subject = subject + state.current_email_sender = sender + state.last_activity_at = datetime.now(timezone.utc) + + await check_dedup_and_enqueue( + message_id=message_id, + subject=subject, + sender=sender, + body=body, + email_date=email_date, + email_uid=uid, + user_id=user_id, + source_info=ctx.source_info, + account_id=ctx.account_id, + folder_path=ctx.folder_path, + source=source, + db=db, + ) + + await callbacks.save_uid(uid, db) + + if state: + state.last_scan_at = datetime.now(timezone.utc) + state.clear_queue() + + +async def idle_loop( + imap: IMAP4_SSL, + ctx: FetchContext, + callbacks: ImapWatcherCallbacks, + db: AsyncSession, + state: WorkerState | None, +) -> None: + """Persistent IDLE loop. Returns only on connection error (to trigger reconnect).""" + while True: + if state: + state.mode = WorkerMode.IDLE + state.next_scan_at = None + state.last_activity_at = datetime.now(timezone.utc) + + try: + idle_task = await imap.idle_start(timeout=IDLE_TIMEOUT_SEC) + server_msg = await imap.wait_server_push() + + if server_msg == STOP_WAIT_SERVER_PUSH: + imap.idle_done() + await asyncio.wait_for(idle_task, timeout=5) + continue + + imap.idle_done() + await asyncio.wait_for(idle_task, timeout=5) + + has_new = False + if isinstance(server_msg, list): + for line in server_msg: + line_str = line.decode() if isinstance(line, bytes) else str(line) + if "EXISTS" in line_str: + has_new = True + break + + if has_new: + await fetch_new_emails(imap, ctx, callbacks, db, state) + + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"IDLE loop error for {callbacks.log_label}: {e}") + try: + imap.idle_done() + except Exception: + pass + return + + +async def poll_loop( + callbacks: ImapWatcherCallbacks, + polling_interval_sec: int, + state: WorkerState | None, +) -> None: + """Polling loop. Disconnects between cycles.""" + interval = polling_interval_sec + while True: + if state: + state.mode = WorkerMode.POLLING + state.next_scan_at = datetime.now(timezone.utc) + timedelta(seconds=interval) + + await asyncio.sleep(interval) + + try: + async with async_session() as db: + connect_result = await callbacks.connect(db) + if connect_result is None: + return + + ctx = await callbacks.load_fetch_context(db) + if ctx is None: + return + + await fetch_new_emails( + connect_result.imap, ctx, callbacks, db, state, + ) + try: + await connect_result.imap.logout() + except Exception: + pass + + interval = connect_result.polling_interval_sec + + except asyncio.CancelledError: + raise + except Exception as e: + logger.warning(f"Poll cycle error for {callbacks.log_label}: {e}") + return + + +async def watch_loop( + callbacks: ImapWatcherCallbacks, + state: WorkerState | None, +) -> None: + """Top-level watch loop with exponential backoff on errors.""" + backoff = 30 + + while True: + if state: + state.mode = WorkerMode.CONNECTING + state.next_scan_at = None + state.clear_queue() + state.error = None + + try: + async with async_session() as db: + connect_result = await callbacks.connect(db) + if connect_result is None: + logger.info( + f"Stopping watcher for {callbacks.log_label}: " + "inactive or removed" + ) + return + + ctx = await callbacks.load_fetch_context(db) + if ctx is None: + return + + await fetch_new_emails( + connect_result.imap, ctx, callbacks, db, state, + ) + + backoff = 30 + + if not connect_result.use_polling and connect_result.idle_supported: + await idle_loop( + connect_result.imap, ctx, callbacks, db, state, + ) + else: + try: + await connect_result.imap.logout() + except Exception: + pass + await poll_loop( + callbacks, connect_result.polling_interval_sec, state, + ) + + except asyncio.CancelledError: + logger.info(f"Watcher cancelled for {callbacks.log_label}") + return + except Exception as e: + logger.error(f"Error watching {callbacks.log_label}: {e}") + if state: + state.mode = WorkerMode.ERROR_BACKOFF + state.error = str(e) + state.next_scan_at = datetime.now(timezone.utc) + timedelta( + seconds=backoff + ) + + await asyncio.sleep(backoff) + backoff = min(backoff * 2, MAX_BACKOFF_SEC) diff --git a/backend/app/modules/providers/email_global/service.py b/backend/app/modules/providers/email_global/service.py index a728752..87a9491 100644 --- a/backend/app/modules/providers/email_global/service.py +++ b/backend/app/modules/providers/email_global/service.py @@ -1,19 +1,20 @@ import asyncio -import email -import hashlib import logging -from datetime import datetime, timedelta, timezone -from aioimaplib import IMAP4_SSL, STOP_WAIT_SERVER_PUSH from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.database import async_session from app.models.imap_settings import ImapSettings from app.models.module_config import ModuleConfig -from app.modules._shared.email.imap_client import decode_header_value, extract_email_from_header, extract_body -from app.modules._shared.email.imap_watcher import WorkerMode, WorkerState, IDLE_TIMEOUT_SEC, MAX_BACKOFF_SEC -from app.modules._shared.email.email_fetcher import check_dedup_and_enqueue +from app.modules._shared.email.imap_client import extract_email_from_header +from app.modules._shared.email.imap_watcher import WorkerMode, WorkerState +from app.modules._shared.email.imap_watch_loop import ( + ConnectResult, + FetchContext, + ImapWatcherCallbacks, + watch_loop, +) from app.modules.providers.email_global.models import GlobalMailConfig, UserSenderAddress logger = logging.getLogger(__name__) @@ -26,272 +27,101 @@ def get_global_state() -> WorkerState | None: return _global_state -async def _fetch_global_emails( - imap: IMAP4_SSL, config: GlobalMailConfig, db, state: WorkerState | None, -) -> None: - """Fetch new emails from global inbox, gate on sender address.""" - max_age = 7 - result = await db.execute(select(ImapSettings)) - global_settings = result.scalar_one_or_none() - if global_settings: - max_age = global_settings.max_email_age_days +def _build_global_callbacks() -> ImapWatcherCallbacks: + """Build provider-specific callbacks for the global mail watcher.""" - since_date = (datetime.now(timezone.utc) - timedelta(days=max_age)).strftime("%d-%b-%Y") - search_criteria = f"UID {config.last_seen_uid + 1}:* SINCE {since_date}" - _, data = await imap.uid_search(search_criteria) - uids = data[0].split() if data[0] else [] + async def connect(db: AsyncSession) -> ConnectResult | None: + from aioimaplib import IMAP4_SSL + from app.core.encryption import decrypt_value - if state: - if uids: - state.mode = WorkerMode.PROCESSING - state.queue_total = len(uids) - state.last_activity_at = datetime.now(timezone.utc) - - for i, uid_bytes in enumerate(uids): - uid = int(uid_bytes) - if uid <= config.last_seen_uid: - continue - - _, msg_data = await imap.uid("fetch", str(uid), "(RFC822)") - if not msg_data or not msg_data[0]: - continue - - raw_email = None - for part in msg_data: - if isinstance(part, bytearray): - raw_email = bytes(part) - break - if raw_email is None: - continue - msg = email.message_from_bytes(raw_email) - - sender = decode_header_value(msg.get("From", "")) - sender_email = extract_email_from_header(sender) - - # Sender gate: look up in UserSenderAddress - result = await db.execute( - select(UserSenderAddress).where(UserSenderAddress.email_address == sender_email) + mod_result = await db.execute( + select(ModuleConfig).where(ModuleConfig.module_key == "email-global") ) - sender_addr = result.scalar_one_or_none() - if not sender_addr: - logger.info(f"Global mail: discarding email from unregistered sender: {sender_email}") - config.last_seen_uid = uid + module = mod_result.scalar_one_or_none() + if not module or not module.enabled: + logger.info("Stopping global mail watcher: module disabled") + return None + + result = await db.execute(select(GlobalMailConfig)) + config = result.scalar_one_or_none() + if not config: + logger.info("Stopping global mail watcher: inactive or removed") + return None + + password = decrypt_value(config.imap_password_encrypted) + imap = IMAP4_SSL(host=config.imap_host, port=config.imap_port) + await imap.wait_hello_from_server() + await imap.login(config.imap_user, password) + + idle_supported = imap.has_capability("IDLE") + if config.idle_supported != idle_supported: + config.idle_supported = idle_supported + if not idle_supported and not config.use_polling: + config.use_polling = True + logger.info("Global mail: IDLE not supported, forcing polling mode") await db.commit() - continue + await db.refresh(config) - subject = decode_header_value(msg.get("Subject", "")) - message_id = msg.get("Message-ID", "") - if not message_id or not message_id.strip(): - uidvalidity_part = str(config.uidvalidity) if config.uidvalidity is not None else "no-uidvalidity" - folder_hash = hashlib.sha256(config.watched_folder_path.encode()).hexdigest()[:16] - message_id = f"fallback:{config.id}:{folder_hash}:{uidvalidity_part}:{uid}" - body = extract_body(msg) + await imap.select(config.watched_folder_path) - email_date = None - try: - date_str = msg.get("Date", "") - if date_str: - from email.utils import parsedate_to_datetime - email_date = parsedate_to_datetime(date_str) - except Exception: - pass - - if state: - state.queue_position = i + 1 - state.current_email_subject = subject - state.current_email_sender = sender - state.last_activity_at = datetime.now(timezone.utc) - - enqueued = await check_dedup_and_enqueue( - message_id=message_id, - subject=subject, - sender=sender, - body=body, - email_date=email_date, - email_uid=uid, - user_id=sender_addr.user_id, - source_info=f"global / {config.watched_folder_path}", - account_id=None, - folder_path=config.watched_folder_path, - source="global_mail", - db=db, + return ConnectResult( + imap=imap, + idle_supported=idle_supported, + use_polling=config.use_polling, + polling_interval_sec=config.polling_interval_sec, ) - config.last_seen_uid = uid - await db.commit() - - if state: - state.last_scan_at = datetime.now(timezone.utc) - state.clear_queue() - - -async def _global_idle_loop( - imap: IMAP4_SSL, config: GlobalMailConfig, db, state: WorkerState | None, -) -> None: - """Persistent IDLE loop for global mail.""" - while True: - if state: - state.mode = WorkerMode.IDLE - state.next_scan_at = None - state.last_activity_at = datetime.now(timezone.utc) - - try: - idle_task = await imap.idle_start(timeout=IDLE_TIMEOUT_SEC) - - msg = await imap.wait_server_push() - - if msg == STOP_WAIT_SERVER_PUSH: - imap.idle_done() - await asyncio.wait_for(idle_task, timeout=5) - continue - - imap.idle_done() - await asyncio.wait_for(idle_task, timeout=5) - - has_new = False - if isinstance(msg, list): - for line in msg: - line_str = line.decode() if isinstance(line, bytes) else str(line) - if "EXISTS" in line_str: - has_new = True - break - - if has_new: - await _fetch_global_emails(imap, config, db, state) - - except asyncio.CancelledError: - raise - except Exception as e: - logger.warning(f"Global IDLE loop error: {e}") - try: - imap.idle_done() - except Exception: - pass - return - - -async def _global_poll_loop( - polling_interval_sec: int, state: WorkerState | None, -) -> None: - """Polling loop for global mail.""" - from app.core.encryption import decrypt_value - - interval = polling_interval_sec - while True: - if state: - state.mode = WorkerMode.POLLING - state.next_scan_at = datetime.now(timezone.utc) + timedelta(seconds=interval) - - await asyncio.sleep(interval) - - try: - async with async_session() as db: - mod_result = await db.execute( - select(ModuleConfig).where(ModuleConfig.module_key == "email-global") - ) - module = mod_result.scalar_one_or_none() - if not module or not module.enabled: - logger.info("Stopping global poll loop: module disabled") - return - - result = await db.execute(select(GlobalMailConfig)) - config = result.scalar_one_or_none() - if not config: - return - - password = decrypt_value(config.imap_password_encrypted) - imap = IMAP4_SSL(host=config.imap_host, port=config.imap_port) - await imap.wait_hello_from_server() - await imap.login(config.imap_user, password) - await imap.select(config.watched_folder_path) - - await _fetch_global_emails(imap, config, db, state) - try: - await imap.logout() - except Exception: - pass - - interval = config.polling_interval_sec - - except asyncio.CancelledError: - raise - except Exception as e: - logger.warning(f"Global poll cycle error: {e}") - return - - -async def _watch_global_folder(config: GlobalMailConfig) -> None: - """Watch the global IMAP folder using IDLE or polling.""" - from app.core.encryption import decrypt_value + async def load_fetch_context(db: AsyncSession) -> FetchContext | None: + result = await db.execute(select(GlobalMailConfig)) + config = result.scalar_one_or_none() + if not config: + return None - global _global_state - backoff = 30 + max_age = 7 + settings_result = await db.execute(select(ImapSettings)) + global_settings = settings_result.scalar_one_or_none() + if global_settings: + max_age = global_settings.max_email_age_days - while True: - state = _global_state - if state: - state.mode = WorkerMode.CONNECTING - state.next_scan_at = None - state.clear_queue() - state.error = None + return FetchContext( + last_seen_uid=config.last_seen_uid, + folder_path=config.watched_folder_path, + uidvalidity=config.uidvalidity, + max_email_age_days=max_age, + source_info=f"global / {config.watched_folder_path}", + source_label="global mail", + account_id=None, + ) - try: - async with async_session() as db: - mod_result = await db.execute( - select(ModuleConfig).where(ModuleConfig.module_key == "email-global") - ) - module = mod_result.scalar_one_or_none() - if not module or not module.enabled: - logger.info("Stopping global mail watcher: module disabled") - return - - result = await db.execute(select(GlobalMailConfig)) - config = result.scalar_one_or_none() - if not config: - logger.info("Stopping global mail watcher: inactive or removed") - return - - password = decrypt_value(config.imap_password_encrypted) - imap = IMAP4_SSL(host=config.imap_host, port=config.imap_port) - await imap.wait_hello_from_server() - await imap.login(config.imap_user, password) - - idle_supported = imap.has_capability("IDLE") - if config.idle_supported != idle_supported: - config.idle_supported = idle_supported - if not idle_supported and not config.use_polling: - config.use_polling = True - logger.info("Global mail: IDLE not supported, forcing polling mode") - await db.commit() - await db.refresh(config) - - await imap.select(config.watched_folder_path) - - await _fetch_global_emails(imap, config, db, state) - - backoff = 30 - - if not config.use_polling and idle_supported: - await _global_idle_loop(imap, config, db, state) - else: - try: - await imap.logout() - except Exception: - pass - await _global_poll_loop(config.polling_interval_sec, state) + async def route_email(sender: str, db: AsyncSession): + sender_email = extract_email_from_header(sender) + result = await db.execute( + select(UserSenderAddress).where( + UserSenderAddress.email_address == sender_email + ) + ) + sender_addr = result.scalar_one_or_none() + if not sender_addr: + logger.info( + f"Global mail: discarding email from unregistered sender: {sender_email}" + ) + return None + return (sender_addr.user_id, "global_mail") - except asyncio.CancelledError: - logger.info("Global mail watcher cancelled") - return - except Exception as e: - logger.error(f"Error watching global mail folder: {e}") - if state: - state.mode = WorkerMode.ERROR_BACKOFF - state.error = str(e) - state.next_scan_at = datetime.now(timezone.utc) + timedelta(seconds=backoff) + async def save_uid(uid: int, db: AsyncSession) -> None: + result = await db.execute(select(GlobalMailConfig)) + config = result.scalar_one_or_none() + if config: + config.last_seen_uid = uid + await db.commit() - await asyncio.sleep(backoff) - backoff = min(backoff * 2, MAX_BACKOFF_SEC) + return ImapWatcherCallbacks( + connect=connect, + load_fetch_context=load_fetch_context, + route_email=route_email, + save_uid=save_uid, + log_label="global mail", + ) def _start_global_watcher(config: GlobalMailConfig) -> None: @@ -303,7 +133,8 @@ def _start_global_watcher(config: GlobalMailConfig) -> None: account_id=0, mode=WorkerMode.CONNECTING, ) - _global_task = asyncio.create_task(_watch_global_folder(config)) + callbacks = _build_global_callbacks() + _global_task = asyncio.create_task(watch_loop(callbacks, _global_state)) async def start_global_watcher(): diff --git a/backend/app/modules/providers/email_user/service.py b/backend/app/modules/providers/email_user/service.py index a3a3cf7..94ec92d 100644 --- a/backend/app/modules/providers/email_user/service.py +++ b/backend/app/modules/providers/email_user/service.py @@ -1,20 +1,21 @@ import asyncio -import email -import hashlib import logging import re -from datetime import datetime, timedelta, timezone +from datetime import datetime, timezone -from aioimaplib import IMAP4_SSL, STOP_WAIT_SERVER_PUSH from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.database import async_session from app.models.imap_settings import ImapSettings -from app.modules._shared.email.imap_client import decode_header_value, extract_body -from app.modules._shared.email.imap_watcher import WorkerMode, WorkerState, IDLE_TIMEOUT_SEC, MAX_BACKOFF_SEC -from app.modules._shared.email.email_fetcher import check_dedup_and_enqueue +from app.modules._shared.email.imap_watcher import WorkerMode, WorkerState +from app.modules._shared.email.imap_watch_loop import ( + ConnectResult, + FetchContext, + ImapWatcherCallbacks, + watch_loop, +) from app.modules.providers.email_user.models import EmailAccount, WatchedFolder logger = logging.getLogger(__name__) @@ -47,267 +48,104 @@ async def _get_effective_settings(db, folder: WatchedFolder) -> tuple[int, bool] return max_age, check_uid -async def _connect_and_select( - account, folder, db, -) -> tuple[IMAP4_SSL, bool]: - """Connect, login, check IDLE capability, select folder.""" - from app.core.encryption import decrypt_value - - password = decrypt_value(account.imap_password_encrypted) - imap = IMAP4_SSL(host=account.imap_host, port=account.imap_port) - await imap.wait_hello_from_server() - await imap.login(account.imap_user, password) - - idle_supported = imap.has_capability("IDLE") - - if account.idle_supported != idle_supported: - account.idle_supported = idle_supported - if not idle_supported and not account.use_polling: - account.use_polling = True - logger.info( - f"Account {account.id}: IDLE not supported, forcing polling mode" - ) - await db.commit() - await db.refresh(account) - - select_response = await imap.select(folder.folder_path) - - _, check_uid = await _get_effective_settings(db, folder) - if check_uid: - current_uidvalidity = None - if select_response and len(select_response) > 1: - for line in select_response[1]: - line_str = line.decode() if isinstance(line, bytes) else str(line) - match = re.search(r"UIDVALIDITY\s+(\d+)", line_str) - if match: - current_uidvalidity = int(match.group(1)) - break - - if current_uidvalidity is not None: - if folder.uidvalidity is None: - folder.uidvalidity = current_uidvalidity - await db.commit() - elif folder.uidvalidity != current_uidvalidity: - logger.warning( - f"UIDVALIDITY changed for folder {folder.id}: " - f"{folder.uidvalidity} -> {current_uidvalidity}. Resetting." - ) - folder.uidvalidity = current_uidvalidity - folder.last_seen_uid = 0 - await db.commit() - - return imap, idle_supported - - -async def _fetch_new_emails( - imap: IMAP4_SSL, account, folder, db, state: WorkerState | None, -) -> None: - """UID search for new emails, process and enqueue them.""" - max_age, _ = await _get_effective_settings(db, folder) - since_date = (datetime.now(timezone.utc) - timedelta(days=max_age)).strftime("%d-%b-%Y") - search_criteria = f"UID {folder.last_seen_uid + 1}:* SINCE {since_date}" - _, data = await imap.uid_search(search_criteria) - uids = data[0].split() if data[0] else [] - - if state: - if uids: - state.mode = WorkerMode.PROCESSING - state.queue_total = len(uids) - state.last_activity_at = datetime.now(timezone.utc) - - for i, uid_bytes in enumerate(uids): - uid = int(uid_bytes) - if uid <= folder.last_seen_uid: - continue - - _, msg_data = await imap.uid("fetch", str(uid), "(RFC822)") - if not msg_data or not msg_data[0]: - continue - - raw_email = None - for part in msg_data: - if isinstance(part, bytearray): - raw_email = bytes(part) - break - if raw_email is None: - continue - msg = email.message_from_bytes(raw_email) - - subject = decode_header_value(msg.get("Subject", "")) - sender = decode_header_value(msg.get("From", "")) - message_id = msg.get("Message-ID", "") - if not message_id or not message_id.strip(): - uidvalidity_part = str(folder.uidvalidity) if folder.uidvalidity is not None else "no-uidvalidity" - folder_hash = hashlib.sha256(folder.folder_path.encode()).hexdigest()[:16] - message_id = f"fallback:{account.id}:{folder_hash}:{uidvalidity_part}:{uid}" - body = extract_body(msg) - - email_date = None - try: - date_str = msg.get("Date", "") - if date_str: - from email.utils import parsedate_to_datetime - email_date = parsedate_to_datetime(date_str) - except Exception: - pass - - if state: - state.queue_position = i + 1 - state.current_email_subject = subject - state.current_email_sender = sender - state.last_activity_at = datetime.now(timezone.utc) - - enqueued = await check_dedup_and_enqueue( - message_id=message_id, - subject=subject, - sender=sender, - body=body, - email_date=email_date, - email_uid=uid, - user_id=account.user_id, - source_info=f"{account.imap_user} / {folder.folder_path}", - account_id=account.id, - folder_path=folder.folder_path, - source="user_account", - db=db, - ) - - folder.last_seen_uid = uid - await db.commit() - - if state: - state.last_scan_at = datetime.now(timezone.utc) - state.clear_queue() - - -async def _idle_loop( - imap: IMAP4_SSL, account, folder, db, state: WorkerState | None, -) -> None: - """Persistent IDLE loop. Returns only on connection error (to trigger reconnect).""" - while True: - if state: - state.mode = WorkerMode.IDLE - state.next_scan_at = None - state.last_activity_at = datetime.now(timezone.utc) - - try: - idle_task = await imap.idle_start(timeout=IDLE_TIMEOUT_SEC) +def _build_callbacks(account_id: int, folder_id: int) -> ImapWatcherCallbacks: + """Build provider-specific callbacks for a user email account folder.""" - msg = await imap.wait_server_push() + async def connect(db: AsyncSession) -> ConnectResult | None: + from aioimaplib import IMAP4_SSL + from app.core.encryption import decrypt_value - if msg == STOP_WAIT_SERVER_PUSH: - imap.idle_done() - await asyncio.wait_for(idle_task, timeout=5) - continue + account = await db.get(EmailAccount, account_id) + folder = await db.get(WatchedFolder, folder_id) + if not account or not folder or not account.is_active: + return None + + password = decrypt_value(account.imap_password_encrypted) + imap = IMAP4_SSL(host=account.imap_host, port=account.imap_port) + await imap.wait_hello_from_server() + await imap.login(account.imap_user, password) + + idle_supported = imap.has_capability("IDLE") + if account.idle_supported != idle_supported: + account.idle_supported = idle_supported + if not idle_supported and not account.use_polling: + account.use_polling = True + logger.info( + f"Account {account.id}: IDLE not supported, forcing polling mode" + ) + await db.commit() + await db.refresh(account) - imap.idle_done() - await asyncio.wait_for(idle_task, timeout=5) + select_response = await imap.select(folder.folder_path) - has_new = False - if isinstance(msg, list): - for line in msg: + _, check_uid = await _get_effective_settings(db, folder) + if check_uid: + current_uidvalidity = None + if select_response and len(select_response) > 1: + for line in select_response[1]: line_str = line.decode() if isinstance(line, bytes) else str(line) - if "EXISTS" in line_str: - has_new = True + match = re.search(r"UIDVALIDITY\s+(\d+)", line_str) + if match: + current_uidvalidity = int(match.group(1)) break - if has_new: - await _fetch_new_emails(imap, account, folder, db, state) - - except asyncio.CancelledError: - raise - except Exception as e: - logger.warning(f"IDLE loop error for folder {folder.id}: {e}") - try: - imap.idle_done() - except Exception: - pass - return - - -async def _poll_loop( - account_id: int, folder_id: int, polling_interval_sec: int, - state: WorkerState | None, -) -> None: - """Polling loop. Disconnects between cycles.""" - interval = polling_interval_sec - while True: - if state: - state.mode = WorkerMode.POLLING - state.next_scan_at = datetime.now(timezone.utc) + timedelta(seconds=interval) - - await asyncio.sleep(interval) - - try: - async with async_session() as db: - account = await db.get(EmailAccount, account_id) - folder = await db.get(WatchedFolder, folder_id) - if not account or not folder or not account.is_active: - return - - imap, _ = await _connect_and_select(account, folder, db) - await _fetch_new_emails(imap, account, folder, db, state) - try: - await imap.logout() - except Exception: - pass - - interval = account.polling_interval_sec - - except asyncio.CancelledError: - raise - except Exception as e: - logger.warning(f"Poll cycle error for folder {folder_id}: {e}") - return - - -async def _watch_folder(account_id: int, folder_id: int): - """Watch a single IMAP folder using IDLE (persistent) or polling (periodic).""" - backoff = 30 - - while True: - state = _worker_state.get(folder_id) - if state: - state.mode = WorkerMode.CONNECTING - state.next_scan_at = None - state.clear_queue() - state.error = None - - try: - async with async_session() as db: - account = await db.get(EmailAccount, account_id) - folder = await db.get(WatchedFolder, folder_id) - if not account or not folder or not account.is_active: - logger.info(f"Stopping watcher for folder {folder_id}: inactive or removed") - return - - imap, idle_supported = await _connect_and_select(account, folder, db) + if current_uidvalidity is not None: + if folder.uidvalidity is None: + folder.uidvalidity = current_uidvalidity + await db.commit() + elif folder.uidvalidity != current_uidvalidity: + logger.warning( + f"UIDVALIDITY changed for folder {folder.id}: " + f"{folder.uidvalidity} -> {current_uidvalidity}. Resetting." + ) + folder.uidvalidity = current_uidvalidity + folder.last_seen_uid = 0 + await db.commit() + + return ConnectResult( + imap=imap, + idle_supported=idle_supported, + use_polling=account.use_polling, + polling_interval_sec=account.polling_interval_sec, + ) - await _fetch_new_emails(imap, account, folder, db, state) + async def load_fetch_context(db: AsyncSession) -> FetchContext | None: + account = await db.get(EmailAccount, account_id) + folder = await db.get(WatchedFolder, folder_id) + if not account or not folder: + return None - backoff = 30 + max_age, _ = await _get_effective_settings(db, folder) - if not account.use_polling and idle_supported: - await _idle_loop(imap, account, folder, db, state) - else: - try: - await imap.logout() - except Exception: - pass - await _poll_loop(account_id, folder_id, account.polling_interval_sec, state) + return FetchContext( + last_seen_uid=folder.last_seen_uid, + folder_path=folder.folder_path, + uidvalidity=folder.uidvalidity, + max_email_age_days=max_age, + source_info=f"{account.imap_user} / {folder.folder_path}", + source_label=f"folder {folder_id}", + account_id=account.id, + ) - except asyncio.CancelledError: - logger.info(f"Watcher cancelled for folder {folder_id}") - return - except Exception as e: - logger.error(f"Error watching folder {folder_id}: {e}") - if state: - state.mode = WorkerMode.ERROR_BACKOFF - state.error = str(e) - state.next_scan_at = datetime.now(timezone.utc) + timedelta(seconds=backoff) + async def route_email(sender: str, db: AsyncSession): + account = await db.get(EmailAccount, account_id) + if not account: + return None + return (account.user_id, "user_account") - await asyncio.sleep(backoff) - backoff = min(backoff * 2, MAX_BACKOFF_SEC) + async def save_uid(uid: int, db: AsyncSession) -> None: + folder = await db.get(WatchedFolder, folder_id) + if folder: + folder.last_seen_uid = uid + await db.commit() + + return ImapWatcherCallbacks( + connect=connect, + load_fetch_context=load_fetch_context, + route_email=route_email, + save_uid=save_uid, + log_label=f"folder {folder_id}", + ) async def start_all_watchers(): @@ -323,7 +161,8 @@ async def start_all_watchers(): key = folder.id if key not in _running_tasks: _worker_state[key] = WorkerState(folder_id=folder.id, account_id=folder.account_id) - task = asyncio.create_task(_watch_folder(folder.account_id, folder.id)) + callbacks = _build_callbacks(folder.account_id, folder.id) + task = asyncio.create_task(watch_loop(callbacks, _worker_state[key])) _running_tasks[key] = task logger.info(f"Started watcher for folder {folder.id} (account {folder.account_id})") @@ -361,7 +200,8 @@ async def restart_single_watcher(folder_id: int): if not account or not account.is_active: return _worker_state[folder_id] = WorkerState(folder_id=folder_id, account_id=folder.account_id) - task = asyncio.create_task(_watch_folder(folder.account_id, folder_id)) + callbacks = _build_callbacks(folder.account_id, folder_id) + task = asyncio.create_task(watch_loop(callbacks, _worker_state[folder_id])) _running_tasks[folder_id] = task logger.info(f"Restarted watcher for folder {folder_id} (manual scan)") diff --git a/backend/tests/test_imap_worker.py b/backend/tests/test_imap_worker.py index 076e099..c433e53 100644 --- a/backend/tests/test_imap_worker.py +++ b/backend/tests/test_imap_worker.py @@ -8,6 +8,7 @@ from app.core.encryption import encrypt_value from app.modules.providers.email_user.models import EmailAccount, WatchedFolder from app.modules._shared.email.models import ProcessedEmail +from app.modules._shared.email.imap_watch_loop import generate_fallback_message_id from app.models.user import User @@ -54,16 +55,6 @@ async def test_folder(db_session, test_account): return folder -def generate_fallback_message_id(account_id: int, folder_path: str, uidvalidity: int | None, uid: int) -> str: - """ - Generate fallback message_id using the same logic as imap_worker.py. - This mirrors the implementation to test the format. - """ - uidvalidity_part = str(uidvalidity) if uidvalidity is not None else "no-uidvalidity" - folder_hash = hashlib.sha256(folder_path.encode()).hexdigest()[:16] - return f"fallback:{account_id}:{folder_hash}:{uidvalidity_part}:{uid}" - - @pytest.mark.asyncio async def test_fallback_message_id_format(test_account, test_folder): """Test that fallback message_id format is correct.""" From caf1c69f93ca0e7091c3d8142d5c00a6197846b6 Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Tue, 17 Feb 2026 18:25:34 +0100 Subject: [PATCH 5/6] refactor: decouple core pipeline from LLM analyser module Move EmailAnalysis/EmailItem to shared schemas so the core service layer no longer depends on a specific plugin module. Analysis is now routed through the module registry via a new `analyze` hook on ModuleInfo, consistent with how `notify` already works for notifiers. Co-Authored-By: Claude Opus 4.6 --- backend/app/core/module_base.py | 1 + backend/app/core/module_registry.py | 21 ++++++---- backend/app/modules/analysers/llm/__init__.py | 3 +- backend/app/modules/analysers/llm/service.py | 26 +------------ backend/app/schemas/email_analysis.py | 24 ++++++++++++ backend/app/services/orders/order_matcher.py | 2 +- backend/app/services/orders/order_service.py | 2 +- backend/app/services/queue/queue_worker.py | 6 +-- backend/tests/test_order_matcher.py | 2 +- backend/tests/test_queue_worker.py | 38 +++++++++---------- 10 files changed, 65 insertions(+), 60 deletions(-) create mode 100644 backend/app/schemas/email_analysis.py diff --git a/backend/app/core/module_base.py b/backend/app/core/module_base.py index dd6e5d2..1e88578 100644 --- a/backend/app/core/module_base.py +++ b/backend/app/core/module_base.py @@ -20,3 +20,4 @@ class ModuleInfo: is_configured: Callable[[], Awaitable[bool]] | None = None status: Callable[[AsyncSession], Awaitable[dict | None]] | None = None notify: Callable[[int, str, dict, dict | None, AsyncSession], Awaitable[None]] | None = None + analyze: Callable[[dict, AsyncSession], Awaitable[tuple]] | None = None diff --git a/backend/app/core/module_registry.py b/backend/app/core/module_registry.py index 12edd99..2db3c9c 100644 --- a/backend/app/core/module_registry.py +++ b/backend/app/core/module_registry.py @@ -54,11 +54,14 @@ def discover_modules() -> dict[str, ModuleInfo]: return _registered_modules -async def has_available_analyser() -> bool: - """Return True if at least one analyser module is enabled and configured.""" +async def get_active_analyser(): + """Return the analyze callable from the first enabled and configured analyser module. + + Returns None if no analyser is available. + """ analyser_modules = get_modules_by_type("analyser") if not analyser_modules: - return False + return None async with async_session() as db: result = await db.execute( @@ -71,17 +74,19 @@ async def has_available_analyser() -> bool: for config in enabled_configs: info = analyser_modules.get(config.module_key) - if info and info.is_configured: + if not info or not info.analyze: + continue + if info.is_configured: try: if await info.is_configured(): - return True + return info.analyze except Exception: continue - elif info: + else: # Module has no is_configured hook — treat as configured - return True + return info.analyze - return False + return None async def sync_module_configs() -> None: diff --git a/backend/app/modules/analysers/llm/__init__.py b/backend/app/modules/analysers/llm/__init__.py index 06061ed..344f8ec 100644 --- a/backend/app/modules/analysers/llm/__init__.py +++ b/backend/app/modules/analysers/llm/__init__.py @@ -1,7 +1,7 @@ from app.core.module_base import ModuleInfo from app.modules.analysers.llm.router import router from app.modules.analysers.llm.models import LLMConfig -from app.modules.analysers.llm.service import get_status, check_configured +from app.modules.analysers.llm.service import get_status, check_configured, analyze MODULE_INFO = ModuleInfo( key="llm", @@ -13,4 +13,5 @@ models=[LLMConfig], status=get_status, is_configured=check_configured, + analyze=analyze, ) diff --git a/backend/app/modules/analysers/llm/service.py b/backend/app/modules/analysers/llm/service.py index 3d121f8..55d9309 100644 --- a/backend/app/modules/analysers/llm/service.py +++ b/backend/app/modules/analysers/llm/service.py @@ -1,12 +1,12 @@ import json import litellm -from pydantic import BaseModel, ValidationError -from typing import Optional +from pydantic import ValidationError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.encryption import decrypt_value from app.modules.analysers.llm.models import LLMConfig +from app.schemas.email_analysis import EmailAnalysis, EmailItem # noqa: F401 _active_requests: int = 0 @@ -45,28 +45,6 @@ async def call_llm(config: LLMConfig, api_key: str | None, messages: list[dict], return response.choices[0].message.content -class EmailItem(BaseModel): - name: str - quantity: int = 1 - price: Optional[float] = None - - -class EmailAnalysis(BaseModel): - is_relevant: bool - email_type: Optional[str] = None - order_number: Optional[str] = None - tracking_number: Optional[str] = None - carrier: Optional[str] = None - vendor_name: Optional[str] = None - vendor_domain: Optional[str] = None - status: Optional[str] = None - order_date: Optional[str] = None - estimated_delivery: Optional[str] = None - total_amount: Optional[float] = None - currency: Optional[str] = None - items: Optional[list[EmailItem]] = None - - SYSTEM_PROMPT = """You are an email analysis assistant. Analyze the provided email and extract purchase/shipping information. Return ONLY valid JSON matching this schema: diff --git a/backend/app/schemas/email_analysis.py b/backend/app/schemas/email_analysis.py new file mode 100644 index 0000000..9a2104a --- /dev/null +++ b/backend/app/schemas/email_analysis.py @@ -0,0 +1,24 @@ +from pydantic import BaseModel +from typing import Optional + + +class EmailItem(BaseModel): + name: str + quantity: int = 1 + price: Optional[float] = None + + +class EmailAnalysis(BaseModel): + is_relevant: bool + email_type: Optional[str] = None + order_number: Optional[str] = None + tracking_number: Optional[str] = None + carrier: Optional[str] = None + vendor_name: Optional[str] = None + vendor_domain: Optional[str] = None + status: Optional[str] = None + order_date: Optional[str] = None + estimated_delivery: Optional[str] = None + total_amount: Optional[float] = None + currency: Optional[str] = None + items: Optional[list[EmailItem]] = None diff --git a/backend/app/services/orders/order_matcher.py b/backend/app/services/orders/order_matcher.py index 8bc3283..b242604 100644 --- a/backend/app/services/orders/order_matcher.py +++ b/backend/app/services/orders/order_matcher.py @@ -4,7 +4,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.models.order import Order -from app.modules.analysers.llm.service import EmailAnalysis +from app.schemas.email_analysis import EmailAnalysis class OrderMatcherProtocol(Protocol): diff --git a/backend/app/services/orders/order_service.py b/backend/app/services/orders/order_service.py index 8a1cbf7..b48d117 100644 --- a/backend/app/services/orders/order_service.py +++ b/backend/app/services/orders/order_service.py @@ -8,7 +8,7 @@ from app.models.order import Order from app.models.order_state import OrderState -from app.modules.analysers.llm.service import EmailAnalysis +from app.schemas.email_analysis import EmailAnalysis from app.schemas.order import CreateOrderRequest, UpdateOrderRequest diff --git a/backend/app/services/queue/queue_worker.py b/backend/app/services/queue/queue_worker.py index 91a5f76..909040b 100644 --- a/backend/app/services/queue/queue_worker.py +++ b/backend/app/services/queue/queue_worker.py @@ -4,10 +4,9 @@ from app.database import async_session from app.models.queue_item import QueueItem -from app.modules.analysers.llm.service import analyze from app.services.orders.order_matcher import DefaultOrderMatcher from app.services.orders.order_service import create_or_update_order -from app.core.module_registry import has_available_analyser +from app.core.module_registry import get_active_analyser from app.services.notification_service import notify_user, NotificationEvent logger = logging.getLogger(__name__) @@ -20,7 +19,8 @@ async def process_next_item() -> None: """Pick one queued item and process it. Called by the scheduler every 5s.""" global _no_analyser_warned - if not await has_available_analyser(): + analyze = await get_active_analyser() + if analyze is None: if not _no_analyser_warned: logger.warning("No analyser module is enabled and configured — queue processing paused") _no_analyser_warned = True diff --git a/backend/tests/test_order_matcher.py b/backend/tests/test_order_matcher.py index 97c70c2..76494dd 100644 --- a/backend/tests/test_order_matcher.py +++ b/backend/tests/test_order_matcher.py @@ -2,7 +2,7 @@ from app.models.user import User from app.models.order import Order -from app.modules.analysers.llm.service import EmailAnalysis, EmailItem +from app.schemas.email_analysis import EmailAnalysis, EmailItem from app.services.orders.order_matcher import DefaultOrderMatcher from app.core.auth import hash_password diff --git a/backend/tests/test_queue_worker.py b/backend/tests/test_queue_worker.py index 350772f..7838e8f 100644 --- a/backend/tests/test_queue_worker.py +++ b/backend/tests/test_queue_worker.py @@ -14,7 +14,7 @@ from app.models.order_state import OrderState from app.models.queue_item import QueueItem from app.models.user import User -from app.modules.analysers.llm.service import EmailAnalysis, EmailItem +from app.schemas.email_analysis import EmailAnalysis, EmailItem from app.services.orders.order_service import create_or_update_order from app.services.orders.order_matcher import DefaultOrderMatcher @@ -229,10 +229,8 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.has_available_analyser", new_callable=AsyncMock) as mock_check, + patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=None), ): - mock_check.return_value = False - from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -268,13 +266,12 @@ async def test_process_queued_item_creates_order(db_session, test_user): async def mock_async_session(): yield db_session + mock_analyze = AsyncMock(return_value=(analysis, raw_response)) + with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.analyze", new_callable=AsyncMock) as mock_analyze, - patch("app.services.queue.queue_worker.has_available_analyser", new_callable=AsyncMock, return_value=True), + patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), ): - mock_analyze.return_value = (analysis, raw_response) - from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -322,13 +319,12 @@ async def test_process_irrelevant_item(db_session, test_user): async def mock_async_session(): yield db_session + mock_analyze = AsyncMock(return_value=(analysis, raw_response)) + with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.analyze", new_callable=AsyncMock) as mock_analyze, - patch("app.services.queue.queue_worker.has_available_analyser", new_callable=AsyncMock, return_value=True), + patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), ): - mock_analyze.return_value = (analysis, raw_response) - from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -381,13 +377,12 @@ async def test_process_updates_existing_order(db_session, test_user): async def mock_async_session(): yield db_session + mock_analyze = AsyncMock(return_value=(analysis, raw_response)) + with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.analyze", new_callable=AsyncMock) as mock_analyze, - patch("app.services.queue.queue_worker.has_available_analyser", new_callable=AsyncMock, return_value=True), + patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), ): - mock_analyze.return_value = (analysis, raw_response) - from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -413,13 +408,12 @@ async def test_process_failed_llm(db_session, test_user): async def mock_async_session(): yield db_session + mock_analyze = AsyncMock(side_effect=RuntimeError("LLM unavailable")) + with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.analyze", new_callable=AsyncMock) as mock_analyze, - patch("app.services.queue.queue_worker.has_available_analyser", new_callable=AsyncMock, return_value=True), + patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), ): - mock_analyze.side_effect = RuntimeError("LLM unavailable") - from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -440,9 +434,11 @@ async def test_no_items_to_process(db_session): async def mock_async_session(): yield db_session + mock_analyze = AsyncMock() + with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.has_available_analyser", new_callable=AsyncMock, return_value=True), + patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), ): from app.services.queue.queue_worker import process_next_item await process_next_item() # Should not raise From 42546ffe0cb3b2be3095ba38a1e85cb4ab5b5644 Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Tue, 17 Feb 2026 18:39:39 +0100 Subject: [PATCH 6/6] refactor: rename email-specific types to generic, add multi-analyser fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename EmailAnalysis→AnalysisResult, EmailItem→ExtractedItem, email_type→document_type across all files - Rename email_analysis.py→analysis.py (shared schema) - Change get_active_analyser() to get_active_analysers() returning all enabled analysers in priority order (ordered by ModuleConfig.priority) - Queue worker now tries each analyser in priority order, falling back to the next on failure - Update SYSTEM_PROMPT to use generic "data" language instead of "email" - Clean up email_item_names→extracted_item_names variable naming Co-Authored-By: Claude Opus 4.6 --- backend/app/core/module_registry.py | 21 ++++++----- backend/app/modules/analysers/llm/__init__.py | 2 +- backend/app/modules/analysers/llm/service.py | 18 +++++----- .../{email_analysis.py => analysis.py} | 8 ++--- backend/app/services/orders/order_matcher.py | 10 +++--- backend/app/services/orders/order_service.py | 4 +-- backend/app/services/queue/queue_worker.py | 20 ++++++++--- backend/tests/test_llm_service.py | 6 ++-- backend/tests/test_order_matcher.py | 22 ++++++------ backend/tests/test_queue_worker.py | 36 +++++++++---------- 10 files changed, 82 insertions(+), 65 deletions(-) rename backend/app/schemas/{email_analysis.py => analysis.py} (77%) diff --git a/backend/app/core/module_registry.py b/backend/app/core/module_registry.py index 2db3c9c..d4286f2 100644 --- a/backend/app/core/module_registry.py +++ b/backend/app/core/module_registry.py @@ -54,24 +54,29 @@ def discover_modules() -> dict[str, ModuleInfo]: return _registered_modules -async def get_active_analyser(): - """Return the analyze callable from the first enabled and configured analyser module. +async def get_active_analysers() -> list[tuple[str, callable]]: + """Return all enabled and configured analyser modules in priority order. - Returns None if no analyser is available. + Returns a list of (module_key, analyze_callable) tuples, ordered by the + module priority field. The caller can iterate through them to implement + fallback (try the first analyser, fall back to the next on failure, etc.). """ analyser_modules = get_modules_by_type("analyser") if not analyser_modules: - return None + return [] async with async_session() as db: result = await db.execute( - select(ModuleConfig).where( + select(ModuleConfig) + .where( ModuleConfig.module_key.in_(analyser_modules.keys()), ModuleConfig.enabled == True, ) + .order_by(ModuleConfig.priority, ModuleConfig.module_key) ) enabled_configs = result.scalars().all() + active = [] for config in enabled_configs: info = analyser_modules.get(config.module_key) if not info or not info.analyze: @@ -79,14 +84,14 @@ async def get_active_analyser(): if info.is_configured: try: if await info.is_configured(): - return info.analyze + active.append((config.module_key, info.analyze)) except Exception: continue else: # Module has no is_configured hook — treat as configured - return info.analyze + active.append((config.module_key, info.analyze)) - return None + return active async def sync_module_configs() -> None: diff --git a/backend/app/modules/analysers/llm/__init__.py b/backend/app/modules/analysers/llm/__init__.py index 344f8ec..611926f 100644 --- a/backend/app/modules/analysers/llm/__init__.py +++ b/backend/app/modules/analysers/llm/__init__.py @@ -8,7 +8,7 @@ name="LLM Analyser", type="analyser", version="1.0.0", - description="Analyse emails using LLM (via LiteLLM) to extract order information", + description="Analyse data using LLM (via LiteLLM) to extract order information", router=router, models=[LLMConfig], status=get_status, diff --git a/backend/app/modules/analysers/llm/service.py b/backend/app/modules/analysers/llm/service.py index 55d9309..a2537d4 100644 --- a/backend/app/modules/analysers/llm/service.py +++ b/backend/app/modules/analysers/llm/service.py @@ -6,7 +6,7 @@ from app.core.encryption import decrypt_value from app.modules.analysers.llm.models import LLMConfig -from app.schemas.email_analysis import EmailAnalysis, EmailItem # noqa: F401 +from app.schemas.analysis import AnalysisResult, ExtractedItem # noqa: F401 _active_requests: int = 0 @@ -45,12 +45,12 @@ async def call_llm(config: LLMConfig, api_key: str | None, messages: list[dict], return response.choices[0].message.content -SYSTEM_PROMPT = """You are an email analysis assistant. Analyze the provided email and extract purchase/shipping information. +SYSTEM_PROMPT = """You are a data analysis assistant. Analyze the provided data and extract purchase/shipping information. Return ONLY valid JSON matching this schema: { "is_relevant": true/false, - "email_type": "order_confirmation" | "shipment_confirmation" | "shipment_update" | "delivery_confirmation", + "document_type": "order_confirmation" | "shipment_confirmation" | "shipment_update" | "delivery_confirmation", "order_number": "string or null", "tracking_number": "string or null", "carrier": "string or null", @@ -65,16 +65,16 @@ async def call_llm(config: LLMConfig, api_key: str | None, messages: list[dict], } Rules: -- An email is ONLY relevant if at least an order_number OR a tracking_number can be extracted. If neither is present, return {"is_relevant": false}. +- The data is ONLY relevant if at least an order_number OR a tracking_number can be extracted. If neither is present, return {"is_relevant": false}. - For marketplace platforms (eBay, Amazon Marketplace, Etsy, etc.), include the seller/shop name in vendor_name, e.g. "eBay - elektro-computershop", "Amazon - TechStore GmbH". Use the format "Platform - Seller". -- Always extract the estimated delivery date when mentioned in the email (e.g. "voraussichtliche Lieferung", "estimated delivery", "Zustellung bis", "Lieferung zwischen"). Many order confirmations (especially eBay) include this directly. -- For order_date: extract the order/purchase date from the email body. If no explicit date is found in the body, use the Date header from the email metadata as fallback. -- If the email is NOT related to a purchase order or shipment, return: {"is_relevant": false} +- Always extract the estimated delivery date when mentioned (e.g. "voraussichtliche Lieferung", "estimated delivery", "Zustellung bis", "Lieferung zwischen"). Many order confirmations (especially eBay) include this directly. +- For order_date: extract the order/purchase date from the content. If no explicit date is found in the body, use the Date header from metadata as fallback. +- If the data is NOT related to a purchase order or shipment, return: {"is_relevant": false} Do not include any text outside the JSON object.""" -async def analyze(raw_data: dict, db: AsyncSession) -> tuple[EmailAnalysis | None, dict]: +async def analyze(raw_data: dict, db: AsyncSession) -> tuple[AnalysisResult | None, dict]: """Analyze raw input data using the configured LLM. Returns (parsed_result, raw_response_dict).""" result = await db.execute(select(LLMConfig).where(LLMConfig.is_active == True)) config = result.scalar_one_or_none() @@ -98,7 +98,7 @@ async def analyze(raw_data: dict, db: AsyncSession) -> tuple[EmailAnalysis | Non try: raw_text = await call_llm(config, api_key, messages, max_tokens=2048) raw_dict = json.loads(raw_text) - parsed = EmailAnalysis.model_validate(raw_dict) + parsed = AnalysisResult.model_validate(raw_dict) return parsed, raw_dict except (json.JSONDecodeError, ValidationError): if attempt == 0: diff --git a/backend/app/schemas/email_analysis.py b/backend/app/schemas/analysis.py similarity index 77% rename from backend/app/schemas/email_analysis.py rename to backend/app/schemas/analysis.py index 9a2104a..cfc1592 100644 --- a/backend/app/schemas/email_analysis.py +++ b/backend/app/schemas/analysis.py @@ -2,15 +2,15 @@ from typing import Optional -class EmailItem(BaseModel): +class ExtractedItem(BaseModel): name: str quantity: int = 1 price: Optional[float] = None -class EmailAnalysis(BaseModel): +class AnalysisResult(BaseModel): is_relevant: bool - email_type: Optional[str] = None + document_type: Optional[str] = None order_number: Optional[str] = None tracking_number: Optional[str] = None carrier: Optional[str] = None @@ -21,4 +21,4 @@ class EmailAnalysis(BaseModel): estimated_delivery: Optional[str] = None total_amount: Optional[float] = None currency: Optional[str] = None - items: Optional[list[EmailItem]] = None + items: Optional[list[ExtractedItem]] = None diff --git a/backend/app/services/orders/order_matcher.py b/backend/app/services/orders/order_matcher.py index b242604..4d5244b 100644 --- a/backend/app/services/orders/order_matcher.py +++ b/backend/app/services/orders/order_matcher.py @@ -4,12 +4,12 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.models.order import Order -from app.schemas.email_analysis import EmailAnalysis +from app.schemas.analysis import AnalysisResult class OrderMatcherProtocol(Protocol): async def find_match( - self, analysis: EmailAnalysis, user_id: int, db: AsyncSession + self, analysis: AnalysisResult, user_id: int, db: AsyncSession ) -> Order | None: ... @@ -17,7 +17,7 @@ class DefaultOrderMatcher: """3-tier matching: exact order_number → exact tracking_number → fuzzy vendor+items.""" async def find_match( - self, analysis: EmailAnalysis, user_id: int, db: AsyncSession + self, analysis: AnalysisResult, user_id: int, db: AsyncSession ) -> Order | None: # Priority 1: exact order_number match if analysis.order_number: @@ -57,7 +57,7 @@ async def find_match( candidates = result.scalars().all() if analysis.items and candidates: - email_item_names = { + extracted_item_names = { item.name.lower() for item in analysis.items if item.name } for candidate in candidates: @@ -65,7 +65,7 @@ async def find_match( order_item_names = { item.get("name", "").lower() for item in candidate.items } - if email_item_names & order_item_names: + if extracted_item_names & order_item_names: return candidate return None diff --git a/backend/app/services/orders/order_service.py b/backend/app/services/orders/order_service.py index b48d117..c9ef6ad 100644 --- a/backend/app/services/orders/order_service.py +++ b/backend/app/services/orders/order_service.py @@ -8,7 +8,7 @@ from app.models.order import Order from app.models.order_state import OrderState -from app.schemas.email_analysis import EmailAnalysis +from app.schemas.analysis import AnalysisResult from app.schemas.order import CreateOrderRequest, UpdateOrderRequest @@ -287,7 +287,7 @@ async def delete_order( # --------------------------------------------------------------------------- async def create_or_update_order( - analysis: EmailAnalysis, + analysis: AnalysisResult, user_id: int, existing_order: Order | None, *, diff --git a/backend/app/services/queue/queue_worker.py b/backend/app/services/queue/queue_worker.py index 909040b..39cbaef 100644 --- a/backend/app/services/queue/queue_worker.py +++ b/backend/app/services/queue/queue_worker.py @@ -6,7 +6,7 @@ from app.models.queue_item import QueueItem from app.services.orders.order_matcher import DefaultOrderMatcher from app.services.orders.order_service import create_or_update_order -from app.core.module_registry import get_active_analyser +from app.core.module_registry import get_active_analysers from app.services.notification_service import notify_user, NotificationEvent logger = logging.getLogger(__name__) @@ -15,12 +15,24 @@ _no_analyser_warned = False +async def _run_analysis(raw_data: dict, db, analysers: list[tuple[str, callable]]): + """Try each analyser in priority order, falling back to the next on failure.""" + last_error = None + for module_key, analyze in analysers: + try: + return await analyze(raw_data, db) + except Exception as e: + logger.warning(f"Analyser {module_key} failed: {e}, trying next") + last_error = e + raise last_error or RuntimeError("No analysers available") + + async def process_next_item() -> None: """Pick one queued item and process it. Called by the scheduler every 5s.""" global _no_analyser_warned - analyze = await get_active_analyser() - if analyze is None: + analysers = await get_active_analysers() + if not analysers: if not _no_analyser_warned: logger.warning("No analyser module is enabled and configured — queue processing paused") _no_analyser_warned = True @@ -45,7 +57,7 @@ async def process_next_item() -> None: await db.commit() try: - analysis, raw_response = await analyze(item.raw_data, db) + analysis, raw_response = await _run_analysis(item.raw_data, db, analysers) item.extracted_data = raw_response diff --git a/backend/tests/test_llm_service.py b/backend/tests/test_llm_service.py index 39bdf7f..47addc3 100644 --- a/backend/tests/test_llm_service.py +++ b/backend/tests/test_llm_service.py @@ -4,7 +4,7 @@ from app.modules.analysers.llm.models import LLMConfig from app.core.encryption import encrypt_value -from app.modules.analysers.llm.service import analyze, EmailAnalysis +from app.modules.analysers.llm.service import analyze, AnalysisResult def _make_llm_response(content: str): @@ -38,7 +38,7 @@ async def test_analyze_relevant_order_email(db_session, llm_config): """Test that a relevant order confirmation email is parsed correctly.""" raw = { "is_relevant": True, - "email_type": "order_confirmation", + "document_type": "order_confirmation", "order_number": "ORD-12345", "tracking_number": None, "carrier": None, @@ -97,7 +97,7 @@ async def test_analyze_malformed_json_retry(db_session, llm_config): """Test that malformed JSON on first attempt triggers a retry.""" good_response = { "is_relevant": True, - "email_type": "shipment_confirmation", + "document_type": "shipment_confirmation", "order_number": "ORD-999", "tracking_number": "1Z999AA10123456784", "carrier": "UPS", diff --git a/backend/tests/test_order_matcher.py b/backend/tests/test_order_matcher.py index 76494dd..08d78dc 100644 --- a/backend/tests/test_order_matcher.py +++ b/backend/tests/test_order_matcher.py @@ -2,7 +2,7 @@ from app.models.user import User from app.models.order import Order -from app.schemas.email_analysis import EmailAnalysis, EmailItem +from app.schemas.analysis import AnalysisResult, ExtractedItem from app.services.orders.order_matcher import DefaultOrderMatcher from app.core.auth import hash_password @@ -60,7 +60,7 @@ def matcher(): @pytest.mark.asyncio async def test_match_by_order_number(db_session, test_user, order_with_order_number, matcher): """Priority 1: exact order_number match.""" - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, order_number="ORD-100", vendor_domain="amazon.com", @@ -74,7 +74,7 @@ async def test_match_by_order_number(db_session, test_user, order_with_order_num @pytest.mark.asyncio async def test_match_by_tracking_number(db_session, test_user, order_with_tracking, matcher): """Priority 2: exact tracking_number match.""" - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, tracking_number="1Z999AA10123456784", ) @@ -87,10 +87,10 @@ async def test_match_by_tracking_number(db_session, test_user, order_with_tracki @pytest.mark.asyncio async def test_match_by_vendor_and_items(db_session, test_user, order_with_order_number, matcher): """Priority 3: fuzzy match by vendor_domain + item name overlap.""" - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, vendor_domain="amazon.com", - items=[EmailItem(name="Wireless Mouse", quantity=1)], + items=[ExtractedItem(name="Wireless Mouse", quantity=1)], ) matched = await matcher.find_match(analysis, test_user.id, db_session) assert matched is not None @@ -100,11 +100,11 @@ async def test_match_by_vendor_and_items(db_session, test_user, order_with_order @pytest.mark.asyncio async def test_no_match_returns_none(db_session, test_user, order_with_order_number, matcher): """When nothing matches, return None.""" - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, order_number="ORD-NONEXISTENT", vendor_domain="bestbuy.com", - items=[EmailItem(name="Laptop Stand", quantity=1)], + items=[ExtractedItem(name="Laptop Stand", quantity=1)], ) matched = await matcher.find_match(analysis, test_user.id, db_session) assert matched is None @@ -118,7 +118,7 @@ async def test_no_match_different_user(db_session, test_user, order_with_order_n await db_session.commit() await db_session.refresh(other_user) - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, order_number="ORD-100", ) @@ -129,10 +129,10 @@ async def test_no_match_different_user(db_session, test_user, order_with_order_n @pytest.mark.asyncio async def test_fuzzy_match_case_insensitive(db_session, test_user, order_with_order_number, matcher): """Fuzzy item name matching should be case-insensitive.""" - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, vendor_domain="amazon.com", - items=[EmailItem(name="wireless mouse", quantity=1)], + items=[ExtractedItem(name="wireless mouse", quantity=1)], ) matched = await matcher.find_match(analysis, test_user.id, db_session) assert matched is not None @@ -142,7 +142,7 @@ async def test_fuzzy_match_case_insensitive(db_session, test_user, order_with_or @pytest.mark.asyncio async def test_order_number_takes_priority_over_tracking(db_session, test_user, order_with_order_number, order_with_tracking, matcher): """Order number match should take priority even if tracking also matches a different order.""" - analysis = EmailAnalysis( + analysis = AnalysisResult( is_relevant=True, order_number="ORD-100", tracking_number="1Z999AA10123456784", diff --git a/backend/tests/test_queue_worker.py b/backend/tests/test_queue_worker.py index 7838e8f..c5b63e4 100644 --- a/backend/tests/test_queue_worker.py +++ b/backend/tests/test_queue_worker.py @@ -14,7 +14,7 @@ from app.models.order_state import OrderState from app.models.queue_item import QueueItem from app.models.user import User -from app.schemas.email_analysis import EmailAnalysis, EmailItem +from app.schemas.analysis import AnalysisResult, ExtractedItem from app.services.orders.order_service import create_or_update_order from app.services.orders.order_matcher import DefaultOrderMatcher @@ -62,11 +62,11 @@ def _make_queue_item(user_id: int, **kwargs) -> QueueItem: return QueueItem(**defaults) -def _make_analysis(**kwargs) -> EmailAnalysis: - """Helper to create an EmailAnalysis with defaults.""" +def _make_analysis(**kwargs) -> AnalysisResult: + """Helper to create an AnalysisResult with defaults.""" defaults = {"is_relevant": True} defaults.update(kwargs) - return EmailAnalysis(**defaults) + return AnalysisResult(**defaults) # --------------------------------------------------------------------------- @@ -85,7 +85,7 @@ async def test_create_order_from_analysis(db_session, test_user): order_date="2025-01-15", total_amount=59.99, currency="USD", - items=[EmailItem(name="Keyboard", quantity=1, price=59.99)], + items=[ExtractedItem(name="Keyboard", quantity=1, price=59.99)], ) order = await create_or_update_order( @@ -212,7 +212,7 @@ async def test_irrelevant_analysis_creates_no_order(db_session, test_user): # --------------------------------------------------------------------------- -# Tests for process_next_item (integration, mocked session + LLM) +# Tests for process_next_item (integration, mocked session + analysers) # --------------------------------------------------------------------------- @@ -229,7 +229,7 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=None), + patch("app.services.queue.queue_worker.get_active_analysers", new_callable=AsyncMock, return_value=[]), ): from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -253,12 +253,12 @@ async def test_process_queued_item_creates_order(db_session, test_user): await db_session.commit() analysis = _make_analysis( - email_type="order_confirmation", + document_type="order_confirmation", order_number="ORD-500", vendor_name="Amazon", vendor_domain="amazon.com", status="ordered", - items=[EmailItem(name="Keyboard", quantity=1, price=59.99)], + items=[ExtractedItem(name="Keyboard", quantity=1, price=59.99)], ) raw_response = {"is_relevant": True, "order_number": "ORD-500"} @@ -270,7 +270,7 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), + patch("app.services.queue.queue_worker.get_active_analysers", new_callable=AsyncMock, return_value=[("llm", mock_analyze)]), ): from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -300,7 +300,7 @@ async def mock_async_session(): @pytest.mark.asyncio async def test_process_irrelevant_item(db_session, test_user): - """Processing an irrelevant email should complete the item without creating an order.""" + """Processing an irrelevant item should complete without creating an order.""" item = _make_queue_item( test_user.id, raw_data={ @@ -312,7 +312,7 @@ async def test_process_irrelevant_item(db_session, test_user): db_session.add(item) await db_session.commit() - analysis = EmailAnalysis(is_relevant=False) + analysis = AnalysisResult(is_relevant=False) raw_response = {"is_relevant": False} @asynccontextmanager @@ -323,7 +323,7 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), + patch("app.services.queue.queue_worker.get_active_analysers", new_callable=AsyncMock, return_value=[("llm", mock_analyze)]), ): from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -365,7 +365,7 @@ async def test_process_updates_existing_order(db_session, test_user): await db_session.commit() analysis = _make_analysis( - email_type="shipment_confirmation", + document_type="shipment_confirmation", order_number="ORD-600", tracking_number="1Z999AA10123456784", carrier="UPS", @@ -381,7 +381,7 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), + patch("app.services.queue.queue_worker.get_active_analysers", new_callable=AsyncMock, return_value=[("llm", mock_analyze)]), ): from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -398,7 +398,7 @@ async def mock_async_session(): @pytest.mark.asyncio async def test_process_failed_llm(db_session, test_user): - """Mock LLM failure -> QueueItem status=failed, error_message set.""" + """All analysers fail -> QueueItem status=failed, error_message set.""" item = _make_queue_item(test_user.id) db_session.add(item) await db_session.commit() @@ -412,7 +412,7 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), + patch("app.services.queue.queue_worker.get_active_analysers", new_callable=AsyncMock, return_value=[("llm", mock_analyze)]), ): from app.services.queue.queue_worker import process_next_item await process_next_item() @@ -438,7 +438,7 @@ async def mock_async_session(): with ( patch("app.services.queue.queue_worker.async_session", mock_async_session), - patch("app.services.queue.queue_worker.get_active_analyser", new_callable=AsyncMock, return_value=mock_analyze), + patch("app.services.queue.queue_worker.get_active_analysers", new_callable=AsyncMock, return_value=[("llm", mock_analyze)]), ): from app.services.queue.queue_worker import process_next_item await process_next_item() # Should not raise