diff --git a/compose/backend.yml b/compose/backend.yml index ad34cf25..0252f115 100644 --- a/compose/backend.yml +++ b/compose/backend.yml @@ -26,8 +26,8 @@ services: - COMPOSE_PROJECT_NAME=${COMPOSE_PROJECT_NAME:-ushadow} # Environment name for unode display - ENV_NAME=${ENV_NAME:-} - # Host machine hostname - uses shell's HOSTNAME at compose startup - - HOST_HOSTNAME=${HOST_HOSTNAME:-${HOSTNAME}} + # Host machine hostname for Tailscale peer discovery + - HOST_HOSTNAME=${HOST_HOSTNAME} # Config directory location - CONFIG_DIR=/config - MONGODB_DATABASE=${MONGODB_DATABASE:-ushadow} diff --git a/config/feature_flags.yaml b/config/feature_flags.yaml index 4a2f4193..49984d4e 100644 --- a/config/feature_flags.yaml +++ b/config/feature_flags.yaml @@ -108,6 +108,13 @@ flags: configuration" type: release + # Social Feed - Personalized fediverse content curation + social_feed: + enabled: true + description: "Social Feed - Curated fediverse posts ranked by your OpenMemory + interest graph" + type: release + # Add your feature flags here following this format: # my_feature_name: # enabled: false diff --git a/docs/INTEREST_EXTRACTION.md b/docs/INTEREST_EXTRACTION.md new file mode 100644 index 00000000..be78121f --- /dev/null +++ b/docs/INTEREST_EXTRACTION.md @@ -0,0 +1,539 @@ +# Interest Extraction for Personalized Feeds + +## Problem + +You want to build a personalized social media/YouTube feed based on user interests, but: +- Querying graph at read-time is slow for feed ranking +- Need fast access to user interests for real-time recommendations +- Want to track interest intensity and evolution + +## Solution: Hybrid Approach + +### Write-Time: Extract Interests with Custom Prompts +Extract and store interests during memory creation for fast retrieval. + +### Read-Time: Enrich with Graph Relationships +Use graph to find related topics and deeper context. 
+ +--- + +## Implementation + +### Step 1: Add Interest Extraction Prompt + +Create a custom prompt that extracts interests during storage: + +```python +# In api/seed_prompts.py or via UI + +INTEREST_EXTRACTION_PROMPT = """ +You are analyzing user messages to extract their interests and preferences for content recommendations. + +Extract the following from the user's message: + +1. **Primary Interests**: Main topics they care about (technology, cooking, sports, etc.) +2. **Content Types**: Preferred content formats (videos, articles, tutorials, etc.) +3. **Specific Topics**: Detailed interests (Python programming, Italian cooking, soccer, etc.) +4. **Sentiment**: How they feel about each topic (loves, likes, dislikes, curious about) +5. **Intensity**: How strong their interest is (casual, moderate, passionate) + +Format as structured metadata: +{ + "interests": { + "primary": ["technology", "cooking"], + "specific": ["Python programming", "Italian recipes", "AI/ML"], + "content_types": ["tutorial videos", "technical articles"], + "sentiment": { + "Python programming": "passionate", + "Italian recipes": "curious" + }, + "intensity": { + "Python programming": "high", + "Italian recipes": "moderate" + } + } +} + +User message: {text} + +Extract interests as structured JSON. 
+""" +``` + +### Step 2: Store Interests in Metadata + +When creating memories, interests are automatically extracted: + +```python +# Example: User says "I've been learning Python lately, love building AI apps" + +# mem0 processes with custom prompt and stores: +{ + "id": "mem_123", + "memory": "User is learning Python and enjoys building AI applications", + "metadata_": { + "interests": { + "primary": ["technology", "programming"], + "specific": ["Python", "AI/ML", "app development"], + "content_types": ["tutorials", "documentation", "projects"], + "sentiment": { + "Python": "passionate", + "AI/ML": "passionate" + }, + "intensity": { + "Python": "high", + "AI/ML": "high" + } + }, + "timestamp": "2024-01-15T10:30:00Z" + } +} +``` + +### Step 3: Fast Interest Queries + +Query interests directly from metadata for feed building: + +```python +import requests +from collections import Counter +from datetime import datetime, timedelta + +def get_user_interests(user_id: str, days_recent: int = 30): + """ + Get user interests for feed personalization + + Returns: + { + "interests": ["Python", "AI/ML", "cooking"], + "intensity": {"Python": "high", "AI/ML": "high"}, + "trending": ["AI/ML"], # interests mentioned more recently + "content_types": ["tutorial videos", "articles"] + } + """ + # Query recent memories + cutoff_date = int((datetime.now() - timedelta(days=days_recent)).timestamp()) + + response = requests.post( + "http://localhost:8765/api/v1/memories/filter", + json={ + "user_id": user_id, + "from_date": cutoff_date, + "page": 1, + "size": 100 + } + ) + + memories = response.json()["items"] + + # Aggregate interests + all_interests = [] + interest_sentiments = {} + interest_intensities = {} + content_types = set() + interest_timestamps = {} + + for memory in memories: + metadata = memory.get("metadata_", {}) + interests = metadata.get("interests", {}) + timestamp = memory.get("created_at") + + # Collect interests + for interest in interests.get("specific", []): + 
all_interests.append(interest) + + # Track when interest was mentioned + if interest not in interest_timestamps: + interest_timestamps[interest] = [] + interest_timestamps[interest].append(timestamp) + + # Collect sentiment + for topic, sentiment in interests.get("sentiment", {}).items(): + interest_sentiments[topic] = sentiment + + # Collect intensity + for topic, intensity in interests.get("intensity", {}).items(): + interest_intensities[topic] = intensity + + # Collect content types + content_types.update(interests.get("content_types", [])) + + # Count frequency + interest_counts = Counter(all_interests) + top_interests = [interest for interest, _ in interest_counts.most_common(10)] + + # Calculate trending (mentioned more in recent half vs older half) + mid_point = cutoff_date + ((int(datetime.now().timestamp()) - cutoff_date) / 2) + trending = [] + + for interest, timestamps in interest_timestamps.items(): + recent_mentions = sum(1 for ts in timestamps if ts > mid_point) + older_mentions = sum(1 for ts in timestamps if ts <= mid_point) + + if recent_mentions > older_mentions * 1.5: # 50% more mentions recently + trending.append(interest) + + return { + "interests": top_interests, + "sentiment": interest_sentiments, + "intensity": interest_intensities, + "trending": trending, + "content_types": list(content_types), + "interest_counts": dict(interest_counts) + } + + +# Usage +interests = get_user_interests("user123", days_recent=30) +print(f"Top interests: {interests['interests']}") +print(f"Trending: {interests['trending']}") +print(f"Content types: {interests['content_types']}") +``` + +### Step 4: Build Feed Rankings + +Use extracted interests to rank feed items: + +```python +def rank_feed_items(user_interests: dict, feed_items: list) -> list: + """ + Rank feed items based on user interests + + Args: + user_interests: Output from get_user_interests() + feed_items: List of content items with topics/tags + + Returns: + Ranked list of feed items with scores + 
""" + scored_items = [] + + for item in feed_items: + score = 0 + matched_interests = [] + + # Match item topics with user interests + item_topics = set(item.get("topics", [])) + user_interest_set = set(user_interests["interests"]) + + # Base score: topic match + matches = item_topics & user_interest_set + score += len(matches) * 10 + matched_interests.extend(matches) + + # Bonus: high intensity interests + for interest in matches: + intensity = user_interests["intensity"].get(interest) + if intensity == "high": + score += 15 + elif intensity == "moderate": + score += 5 + + # Bonus: positive sentiment + for interest in matches: + sentiment = user_interests["sentiment"].get(interest) + if sentiment == "passionate": + score += 20 + elif sentiment == "likes": + score += 10 + + # Bonus: trending topics + trending_matches = item_topics & set(user_interests["trending"]) + score += len(trending_matches) * 25 # Big boost for trending + + # Bonus: preferred content type + if item.get("content_type") in user_interests["content_types"]: + score += 10 + + scored_items.append({ + **item, + "score": score, + "matched_interests": matched_interests, + "is_trending": bool(trending_matches) + }) + + # Sort by score + scored_items.sort(key=lambda x: x["score"], reverse=True) + + return scored_items + + +# Example feed items +feed_items = [ + { + "id": "video_1", + "title": "Python AI Tutorial: Build a Chatbot", + "topics": ["Python", "AI/ML", "tutorials"], + "content_type": "tutorial video" + }, + { + "id": "video_2", + "title": "Italian Pasta Recipes", + "topics": ["cooking", "Italian recipes"], + "content_type": "recipe video" + }, + { + "id": "article_1", + "title": "Advanced Python Patterns", + "topics": ["Python", "software engineering"], + "content_type": "article" + } +] + +interests = get_user_interests("user123") +ranked_feed = rank_feed_items(interests, feed_items) + +for item in ranked_feed: + print(f"{item['title']}: {item['score']} points") + print(f" Matched: 
{item['matched_interests']}") + print(f" Trending: {item['is_trending']}") + print() +``` + +--- + +## Step 5: Enhance with Graph Enrichment (Optional) + +For **deeper personalization**, use graph enrichment to find related topics: + +```python +def get_related_interests(user_id: str, primary_interest: str): + """ + Use graph to find related topics and implicit interests + + Example: User likes "Python" -> Find they also interact with + "FastAPI", "Django", "data science" through relationships + """ + response = requests.get( + f"http://localhost:8765/api/v1/memories/entity/{primary_interest}", + params={"user_id": user_id} + ) + + entity_context = response.json() + + # Extract related entities from relationships + related = [] + for rel in entity_context.get("relationships", []): + if rel["relation"] in ["RELATED_TO", "USES", "INTERESTED_IN"]: + related.append(rel["related_entity"]) + + return related + + +# Expand user interests with graph relationships +def expand_interests_with_graph(user_interests: dict, user_id: str): + """ + Enhance interests with graph-discovered relationships + """ + expanded = user_interests.copy() + expanded["related_topics"] = {} + + for interest in user_interests["interests"][:5]: # Top 5 interests + related = get_related_interests(user_id, interest) + if related: + expanded["related_topics"][interest] = related + + return expanded + + +# Usage +interests = get_user_interests("user123") +expanded_interests = expand_interests_with_graph(interests, "user123") + +# Now include related topics in feed ranking +# Example: User likes "Python" -> also show "FastAPI", "Django" content +``` + +--- + +## Comparison: Write-Time vs Read-Time + +### Write-Time (Custom Prompt) βœ… **Recommended for Feeds** + +**Pros:** +- ⚑ **Fast queries** - interests pre-computed +- πŸ“Š **Easy aggregation** - simple metadata filtering +- 🎯 **Consistent format** - structured interest data +- πŸ”„ **Real-time updates** - interests extracted immediately +- πŸ’° 
**Cost-effective** - query once, use many times + +**Cons:** +- πŸ”’ Limited to what prompt extracts +- πŸ”„ Need to reprocess if extraction logic changes + +**Best for:** +- Feed ranking +- Real-time recommendations +- Dashboard analytics +- Quick interest queries + +### Read-Time (Graph Enrichment) + +**Pros:** +- πŸ•ΈοΈ **Rich relationships** - discover implicit interests +- πŸ” **Deep context** - multi-hop reasoning +- 🎨 **Flexible** - can query different aspects + +**Cons:** +- 🐌 **Slower** - graph queries + aggregation +- πŸ’Έ **More expensive** - queries on every read +- 🧩 **Complex** - need to aggregate from many memories + +**Best for:** +- Deep personalization +- Discovery ("users who like X also like Y") +- Interest evolution tracking +- Related topic suggestions + +--- + +## Architecture Diagram + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ User Conversation β”‚ +β”‚ "I've been learning Python, love AI apps" β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ POST /api/v1/memories/ β”‚ +β”‚ (with custom interest extraction prompt) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ β”‚ + β–Ό β–Ό +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” 
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Vector Store β”‚ β”‚ Relational DB β”‚ +β”‚ (Qdrant) β”‚ β”‚ (SQLite/Postgres) β”‚ +β”‚ β”‚ β”‚ β”‚ +β”‚ Embeddings β”‚ β”‚ Metadata: β”‚ +β”‚ for semantic β”‚ β”‚ { β”‚ +β”‚ search β”‚ β”‚ "interests": { β”‚ +β”‚ β”‚ β”‚ "specific": [ β”‚ +β”‚ β”‚ β”‚ "Python", β”‚ +β”‚ β”‚ β”‚ "AI/ML" β”‚ +β”‚ β”‚ β”‚ ], β”‚ +β”‚ β”‚ β”‚ "intensity": { β”‚ +β”‚ β”‚ β”‚ "Python": "high"β”‚ +β”‚ β”‚ β”‚ } β”‚ +β”‚ β”‚ β”‚ } β”‚ +β”‚ β”‚ β”‚ } β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”‚ FAST QUERY ⚑ + β–Ό + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Feed Builder β”‚ + β”‚ β”‚ + β”‚ 1. Get interests β”‚ + β”‚ 2. Rank content β”‚ + β”‚ 3. Return feed β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β–Ό + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Personalized Feed β”‚ + β”‚ β”‚ + β”‚ - Python tutorials β”‚ + β”‚ - AI/ML articles β”‚ + β”‚ - Trending tech β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + + Optional: Enhance with Graph + β”‚ + β–Ό + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Neo4j Graph β”‚ + β”‚ β”‚ + β”‚ Python -[USES]-> β”‚ + β”‚ FastAPI β”‚ + β”‚ β”‚ + β”‚ Python -[USED_IN]->β”‚ + β”‚ AI/ML β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”‚ DEEP QUERY πŸ•ΈοΈ + β–Ό + Expand interests with + related topics +``` + +--- + +## Recommended Implementation + +### Phase 1: Write-Time Extraction (Start Here) + +```python +# 1. Create custom interest extraction prompt in UI or database +# 2. Memories automatically extract interests during storage +# 3. Query metadata for fast feed building +``` + +### Phase 2: Add Graph Enhancement (Later) + +```python +# 1. 
Use graph to discover related topics +# 2. Find implicit interests from relationships +# 3. Enhance feed with "users who like X also like Y" +``` + +--- + +## Example End-to-End Flow + +```python +# User conversation +user_message = "I've been watching Python tutorials on YouTube, especially AI stuff" + +# 1. Store memory (interests auto-extracted) +memory = create_memory( + user_id="user123", + text=user_message, + # Custom prompt extracts: ["Python", "AI/ML", "tutorials", "YouTube"] +) + +# 2. Build personalized feed +interests = get_user_interests("user123") +# Returns: { +# "interests": ["Python", "AI/ML", "tutorials"], +# "content_types": ["tutorial videos"], +# "intensity": {"Python": "high", "AI/ML": "high"} +# } + +# 3. Rank feed content +feed = rank_feed_items(interests, available_content) +# Returns prioritized content matching user interests + +# 4. (Optional) Expand with graph +expanded = expand_interests_with_graph(interests, "user123") +# Discovers: User also interacts with "FastAPI", "data science" +``` + +--- + +## Performance Comparison + +| Approach | Query Time | Best Use Case | +|----------|-----------|---------------| +| **Metadata query** | ~10ms | Feed ranking, real-time | +| **Graph enrichment** | ~100ms | Deep personalization | +| **Hybrid** | ~50ms | Best of both worlds | + +--- + +## Conclusion + +βœ… **Recommended: Write-Time + Metadata Queries** +- Fast, efficient, perfect for feed building +- Store interests in metadata during memory creation +- Query metadata for real-time feed ranking + +🎨 **Optional: Add Graph for Deep Personalization** +- Use when you need related topics +- Great for "discover" features +- Enhances recommendations with implicit interests + +The hybrid approach gives you **speed when you need it** (feed ranking) and **depth when you want it** (discovery). 
diff --git a/ushadow/backend/main.py b/ushadow/backend/main.py index 37c4f811..102421f2 100644 --- a/ushadow/backend/main.py +++ b/ushadow/backend/main.py @@ -20,11 +20,13 @@ from src.models.user import User # Beanie document model from src.models.share import ShareToken # Beanie document model +from src.models.feed import PostSource, Post # Beanie document model from src.routers import health, wizard, chronicle, auth, feature_flags from src.routers import services, deployments, providers, service_configs, chat from src.routers import kubernetes, tailscale, unodes, docker, sse from src.routers import github_import, audio_relay, memories, share, keycloak_admin, dashboard +from src.routers import feed from src.routers import settings as settings_api from src.middleware import setup_middleware from src.services.unode_manager import init_unode_manager, get_unode_manager @@ -128,7 +130,7 @@ def send_telemetry(): app.state.db = db # Initialize Beanie ODM with document models - await init_beanie(database=db, document_models=[User, ShareToken]) + await init_beanie(database=db, document_models=[User, ShareToken, PostSource, Post]) logger.info("βœ“ Beanie ODM initialized") # Create admin user if explicitly configured in secrets.yaml @@ -204,6 +206,7 @@ def send_telemetry(): app.include_router(share.router, tags=["sharing"]) app.include_router(keycloak_admin.router, prefix="/api/keycloak", tags=["keycloak-admin"]) app.include_router(dashboard.router, prefix="/api/dashboard", tags=["dashboard"]) +app.include_router(feed.router, tags=["feed"]) # Setup MCP server for LLM tool access setup_mcp_server(app) diff --git a/ushadow/backend/src/models/feed.py b/ushadow/backend/src/models/feed.py new file mode 100644 index 00000000..02026537 --- /dev/null +++ b/ushadow/backend/src/models/feed.py @@ -0,0 +1,169 @@ +"""Feed models for multi-platform content curation. + +PostSource: a content platform to fetch posts from (Mastodon, YouTube, etc.). 
+Post: a single content item, scored against the user's interests. +Interest: a topic/entity derived from the user's stored memories (not persisted). +""" + +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional +from uuid import uuid4 + +from beanie import Document +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Beanie Documents (MongoDB collections) +# ============================================================================= + + +class PostSource(Document): + """A content platform to fetch posts from (Mastodon, YouTube, etc.).""" + + source_id: str = Field( + default_factory=lambda: str(uuid4()), + description="Unique source identifier", + ) + user_id: str = Field(..., description="Owner email") + name: str = Field(..., min_length=1, max_length=200, description="Display name") + platform_type: str = Field( + default="mastodon", description="mastodon | youtube" + ) + instance_url: Optional[str] = Field( + default=None, description="Server URL (required for mastodon)" + ) + api_key: Optional[str] = Field( + default=None, description="API key (required for youtube)" + ) + enabled: bool = Field(default=True) + + created_at: datetime = Field(default_factory=datetime.utcnow) + + class Settings: + name = "feed_sources" + indexes = [ + "user_id", + "source_id", + [("user_id", 1), ("source_id", 1)], + ] + + +class Post(Document): + """A content item from any platform, scored against the user's interests.""" + + post_id: str = Field( + default_factory=lambda: str(uuid4()), + description="Internal post identifier", + ) + user_id: str = Field(..., description="Owner who fetched this post") + source_id: str = Field(..., description="PostSource this came from") + external_id: str = Field(..., description="Platform-specific ID (for dedup)") + platform_type: str = Field( + default="mastodon", description="mastodon | 
youtube" + ) + + # Author + author_handle: str = Field(..., description="e.g., @user@mastodon.social") + author_display_name: str = Field(default="") + author_avatar: Optional[str] = Field(default=None) + + # Content (shared across platforms) + content: str = Field(..., description="HTML content or description text") + url: str = Field(..., description="Link to original post/video") + published_at: datetime = Field(..., description="When the author posted it") + hashtags: List[str] = Field(default_factory=list) + language: Optional[str] = Field(default=None) + + # Mastodon engagement (optional β€” only set for mastodon) + boosts_count: Optional[int] = Field(default=None) + favourites_count: Optional[int] = Field(default=None) + replies_count: Optional[int] = Field(default=None) + + # YouTube-specific (optional β€” only set for youtube) + thumbnail_url: Optional[str] = Field(default=None) + video_id: Optional[str] = Field(default=None) + channel_title: Optional[str] = Field(default=None) + view_count: Optional[int] = Field(default=None) + like_count: Optional[int] = Field(default=None) + duration: Optional[str] = Field(default=None, description="ISO 8601 or HH:MM:SS") + + # Scoring + relevance_score: float = Field(default=0.0, description="Computed by PostScorer") + matched_interests: List[str] = Field( + default_factory=list, description="Interest names that matched this post" + ) + + # User interaction + seen: bool = Field(default=False) + bookmarked: bool = Field(default=False) + + # Metadata + fetched_at: datetime = Field(default_factory=datetime.utcnow) + + class Settings: + name = "feed_posts" + indexes = [ + "user_id", + "post_id", + "external_id", + [("user_id", 1), ("relevance_score", -1)], # Feed query: ranked + [("user_id", 1), ("platform_type", 1), ("relevance_score", -1)], + [("user_id", 1), ("bookmarked", 1)], # Bookmarked posts + [("user_id", 1), ("external_id", 1)], # Dedup check + ] + + +# 
============================================================================= +# Pydantic models (not persisted β€” derived or request/response) +# ============================================================================= + + +class Interest(BaseModel): + """A topic/entity derived from the user's stored memories. + + Not persisted β€” computed on each refresh by aggregating memory categories + and entities, weighted by mention count and recency. + """ + + name: str = Field(..., description="Interest name (e.g., 'kubernetes', 'Mac mini')") + node_id: str = Field(..., description="Deterministic ID (md5 hash of name)") + labels: List[str] = Field( + default_factory=list, description="Source type: ['category'] or ['entity']" + ) + relationship_count: int = Field( + default=0, description="Weighted score (mention_count Γ— recency Γ— source_bonus)" + ) + last_active: Optional[datetime] = Field( + default=None, description="Most recent memory timestamp for this interest" + ) + hashtags: List[str] = Field( + default_factory=list, description="Derived hashtags for fediverse search" + ) + + +class SourceCreate(BaseModel): + """Request model for adding a post source.""" + + name: str = Field(..., min_length=1, max_length=200) + platform_type: str = Field(default="mastodon", description="mastodon | youtube") + instance_url: Optional[str] = Field( + default=None, description="Server URL (required for mastodon)" + ) + api_key: Optional[str] = Field( + default=None, description="API key (required for youtube)" + ) + + model_config = {"extra": "forbid"} + + +class PostUpdate(BaseModel): + """Request model for updating a post (seen/bookmark).""" + + seen: Optional[bool] = None + bookmarked: Optional[bool] = None + + model_config = {"extra": "forbid"} diff --git a/ushadow/backend/src/routers/feed.py b/ushadow/backend/src/routers/feed.py new file mode 100644 index 00000000..16ffaef7 --- /dev/null +++ b/ushadow/backend/src/routers/feed.py @@ -0,0 +1,184 @@ +"""Feed Router - API 
endpoints for the personalized fediverse feed. + +Thin HTTP adapter: parses requests, calls FeedService, returns responses. +""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from motor.motor_asyncio import AsyncIOMotorDatabase + +from src.database import get_database +from src.models.feed import SourceCreate +from src.services.auth import get_current_user +from src.services.feed_service import FeedService +from src.utils.auth_helpers import get_user_email + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/feed", tags=["feed"]) + + +def get_feed_service( + db: AsyncIOMotorDatabase = Depends(get_database), +) -> FeedService: + return FeedService(db) + + +# ========================================================================= +# Sources +# ========================================================================= + + +@router.get("/sources") +async def list_sources( + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """List configured post sources.""" + user_id = get_user_email(current_user) + sources = await service.list_sources(user_id) + return {"sources": sources} + + +@router.post("/sources", status_code=201) +async def add_source( + data: SourceCreate, + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Add a content source (Mastodon instance or YouTube API key).""" + # Validate platform-specific required fields + if data.platform_type == "mastodon" and not data.instance_url: + raise HTTPException( + status_code=422, detail="instance_url is required for mastodon sources" + ) + if data.platform_type == "youtube" and not data.api_key: + raise HTTPException( + status_code=422, detail="api_key is required for youtube sources" + ) + if data.platform_type not in ("mastodon", "youtube"): + raise HTTPException( + status_code=422, detail=f"Unknown platform_type: 
{data.platform_type}" + ) + + user_id = get_user_email(current_user) + source = await service.add_source(user_id, data) + return source + + +@router.delete("/sources/{source_id}") +async def remove_source( + source_id: str, + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Remove a post source.""" + user_id = get_user_email(current_user) + removed = await service.remove_source(user_id, source_id) + if not removed: + raise HTTPException(status_code=404, detail="Source not found") + return {"status": "removed"} + + +# ========================================================================= +# Interests (read-only, derived from stored memories) +# ========================================================================= + + +@router.get("/interests") +async def get_interests( + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """View interests extracted from your stored memories.""" + user_id = get_user_email(current_user) + interests = await service.get_interests(user_id) + return {"interests": [i.model_dump() for i in interests]} + + +# ========================================================================= +# Feed +# ========================================================================= + + +@router.get("/posts") +async def get_feed( + page: int = Query(default=1, ge=1), + page_size: int = Query(default=20, ge=1, le=100), + interest: Optional[str] = Query(default=None, description="Filter by interest name"), + show_seen: bool = Query(default=True), + platform_type: Optional[str] = Query( + default=None, description="Filter: mastodon | youtube" + ), + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Get ranked feed of posts, sorted by relevance to your interests.""" + user_id = get_user_email(current_user) + return await service.get_feed( + user_id, page, page_size, interest, show_seen, platform_type + ) + + 
+@router.post("/refresh") +async def refresh_feed( + platform_type: Optional[str] = Query( + default=None, description="Refresh only this platform: mastodon | youtube" + ), + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Trigger a feed refresh, optionally scoped to one platform.""" + user_id = get_user_email(current_user) + result = await service.refresh(user_id, platform_type) + return result + + +# ========================================================================= +# Post Actions +# ========================================================================= + + +@router.post("/posts/{post_id}/seen") +async def mark_post_seen( + post_id: str, + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Mark a specific post as seen.""" + user_id = get_user_email(current_user) + ok = await service.mark_post_seen(user_id, post_id) + if not ok: + raise HTTPException(status_code=404, detail="Post not found") + return {"status": "seen"} + + +@router.post("/posts/{post_id}/bookmark") +async def bookmark_post( + post_id: str, + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Toggle bookmark on a specific post.""" + user_id = get_user_email(current_user) + ok = await service.bookmark_post(user_id, post_id) + if not ok: + raise HTTPException(status_code=404, detail="Post not found") + return {"status": "toggled"} + + +# ========================================================================= +# Stats +# ========================================================================= + + +@router.get("/stats") +async def get_stats( + service: FeedService = Depends(get_feed_service), + current_user=Depends(get_current_user), +): + """Get feed statistics.""" + user_id = get_user_email(current_user) + return await service.get_stats(user_id) diff --git a/ushadow/backend/src/routers/memories.py 
b/ushadow/backend/src/routers/memories.py index 14f19a48..93753ab8 100644 --- a/ushadow/backend/src/routers/memories.py +++ b/ushadow/backend/src/routers/memories.py @@ -9,7 +9,8 @@ The routing is source-aware and queries the appropriate backend(s). """ import logging -from typing import List, Literal, Optional +from typing import List, Literal, Optional, Dict, Any +from datetime import datetime, timedelta import httpx from fastapi import APIRouter, HTTPException, Depends, Query @@ -45,6 +46,39 @@ class ConversationMemoriesResponse(BaseModel): sources_queried: List[str] # Which memory systems were checked +class UserInterestsResponse(BaseModel): + """Response for user interests query""" + user_id: str + interests: List[str] # Top interests + sentiment: Dict[str, str] # Interest -> sentiment mapping + intensity: Dict[str, str] # Interest -> intensity mapping + trending: List[str] # Interests mentioned more recently + content_types: List[str] # Preferred content formats + interest_counts: Dict[str, int] # Interest -> count mapping + days_analyzed: int # Number of days analyzed + + +class MemoriesFilterRequest(BaseModel): + """Request for filtering memories by metadata""" + user_id: Optional[str] = None + from_date: Optional[int] = None # Unix timestamp + to_date: Optional[int] = None # Unix timestamp + search_query: Optional[str] = None + app_ids: Optional[List[str]] = None + category_ids: Optional[List[str]] = None + page: int = 1 + size: int = 100 + + +class MemoriesFilterResponse(BaseModel): + """Response for filtered memories query""" + items: List[MemoryItem] + total: int + page: int + size: int + pages: int + + @router.get("/{memory_id}") async def get_memory_by_id( memory_id: str, @@ -83,7 +117,7 @@ async def get_memory_by_id( # Get specific memory by ID response = await client.get( f"{openmemory_url}/api/v1/memories/{memory_id}", - params={"user_id": get_user_email(current_user)} + params={"user_id": get_user_email(current_user), "output_format": "v1.1"} ) if 
response.status_code == 200: @@ -254,6 +288,234 @@ async def get_memories_by_conversation( ) +@router.post("/filter") +async def filter_memories( + filter_request: MemoriesFilterRequest, + current_user: User = Depends(get_current_user) +) -> MemoriesFilterResponse: + """ + Filter memories by metadata with advanced search capabilities. + + Supports filtering by: + - Date range (from_date, to_date as Unix timestamps) + - Search query (semantic search) + - App IDs (app_ids filter) + - Category IDs (category_ids filter) + - Pagination (page, size) + + This endpoint leverages OpenMemory's v1.1 output format for enhanced metadata. + + Args: + filter_request: Filter criteria + current_user: Authenticated user + + Returns: + Paginated list of memories matching filter criteria + + Access Control: + - Regular users: Only their own memories + - Admins: Can query all users if user_id specified + """ + try: + openmemory_url = get_localhost_proxy_url("mem0") + user_email = filter_request.user_id or get_user_email(current_user) + + logger.info(f"[MEMORIES] Filtering memories for user: {user_email}") + + async with httpx.AsyncClient() as client: + # Build filter request for OpenMemory + payload = { + "user_id": user_email, + "page": filter_request.page, + "size": filter_request.size, + "output_format": "v1.1" + } + + # Add optional filters + if filter_request.from_date: + payload["from_date"] = filter_request.from_date + if filter_request.to_date: + payload["to_date"] = filter_request.to_date + if filter_request.search_query: + payload["search_query"] = filter_request.search_query + if filter_request.app_ids: + payload["app_ids"] = filter_request.app_ids + if filter_request.category_ids: + payload["category_ids"] = filter_request.category_ids + + response = await client.post( + f"{openmemory_url}/api/v1/memories/filter", + json=payload + ) + response.raise_for_status() + data = response.json() + + # Convert to unified format + memories = [] + for item in data.get("items", []): + 
metadata = item.get("metadata_", {})
+                content = item.get("text") or item.get("content", "")
+
+                # Include categories in metadata if they exist
+                if "categories" in item and item["categories"]:
+                    metadata["categories"] = item["categories"]
+
+                memories.append(MemoryItem(
+                    id=str(item.get("id")),
+                    content=content,
+                    created_at=str(item.get("created_at", "")),
+                    metadata=metadata,
+                    source="openmemory",
+                    score=None  # metadata filter query; no similarity score available
+                ))
+
+            return MemoriesFilterResponse(
+                items=memories,
+                total=data.get("total", len(memories)),
+                page=filter_request.page,
+                size=filter_request.size,
+                pages=data.get("pages", 1)
+            )
+
+    except Exception as e:
+        logger.error(f"[MEMORIES] Filter query failed: {e}", exc_info=True)
+        raise HTTPException(
+            status_code=500,
+            detail=f"Failed to filter memories: {str(e)}"
+        )
+
+
+# NOTE(review): FastAPI matches routes in registration order, and this module
+# registers GET /{memory_id} earlier in the file. A request to GET /interests
+# therefore resolves to get_memory_by_id with memory_id="interests" and never
+# reaches this handler. This route must be registered BEFORE the /{memory_id}
+# route (or given a non-colliding path) to be reachable -- verify against a
+# running app.
+@router.get("/interests")
+async def get_user_interests(
+    days_recent: int = Query(30, description="Number of recent days to analyze"),
+    current_user: User = Depends(get_current_user)
+) -> UserInterestsResponse:
+    """
+    Extract and aggregate user interests from recent memories.
+
+    This endpoint analyzes recent memories to build a user interest profile for:
+    - Personalized feed ranking
+    - Content recommendations
+    - Trending topic detection
+
+    Based on the interest extraction pattern from docs/INTEREST_EXTRACTION.md,
+    this aggregates interests stored in memory metadata during write-time.
+ + Args: + days_recent: Number of days to look back (default: 30) + current_user: Authenticated user + + Returns: + Aggregated user interests with intensity, sentiment, and trending indicators + + Access Control: + - Regular users: Only their own interests + - Admins: Can query specific user if user_id provided + """ + try: + openmemory_url = get_localhost_proxy_url("mem0") + user_email = get_user_email(current_user) + + logger.info(f"[MEMORIES] Extracting interests for user: {user_email} (last {days_recent} days)") + + # Calculate date range + cutoff_date = int((datetime.now() - timedelta(days=days_recent)).timestamp()) + mid_point_date = int((datetime.now() - timedelta(days=days_recent // 2)).timestamp()) + + async with httpx.AsyncClient() as client: + # Query recent memories + response = await client.post( + f"{openmemory_url}/api/v1/memories/filter", + json={ + "user_id": user_email, + "from_date": cutoff_date, + "page": 1, + "size": 100, + "output_format": "v1.1" + } + ) + response.raise_for_status() + data = response.json() + memories = data.get("items", []) + + logger.info(f"[MEMORIES] Analyzing {len(memories)} memories for interests") + + # Aggregate interests from metadata + from collections import Counter + + all_interests = [] + interest_sentiments = {} + interest_intensities = {} + content_types = set() + interest_timestamps: Dict[str, List[int]] = {} + + for memory in memories: + metadata = memory.get("metadata_", {}) + interests = metadata.get("interests", {}) + timestamp = memory.get("created_at", cutoff_date) + + # Convert timestamp to int if it's a string + if isinstance(timestamp, str): + try: + timestamp = int(datetime.fromisoformat(timestamp.replace('Z', '+00:00')).timestamp()) + except: + timestamp = cutoff_date + + # Collect specific interests + for interest in interests.get("specific", []): + all_interests.append(interest) + + # Track when interest was mentioned + if interest not in interest_timestamps: + interest_timestamps[interest] = [] + 
interest_timestamps[interest].append(timestamp) + + # Collect sentiment + for topic, sentiment in interests.get("sentiment", {}).items(): + interest_sentiments[topic] = sentiment + + # Collect intensity + for topic, intensity in interests.get("intensity", {}).items(): + interest_intensities[topic] = intensity + + # Collect content types + content_types.update(interests.get("content_types", [])) + + # Count frequency + interest_counts = Counter(all_interests) + top_interests = [interest for interest, _ in interest_counts.most_common(10)] + + # Calculate trending (mentioned more in recent half vs older half) + trending = [] + for interest, timestamps in interest_timestamps.items(): + recent_mentions = sum(1 for ts in timestamps if ts > mid_point_date) + older_mentions = sum(1 for ts in timestamps if ts <= mid_point_date) + + # 50% more mentions recently indicates trending + if recent_mentions > older_mentions * 1.5 and older_mentions > 0: + trending.append(interest) + + logger.info(f"[MEMORIES] Extracted {len(top_interests)} top interests, {len(trending)} trending") + + return UserInterestsResponse( + user_id=user_email, + interests=top_interests, + sentiment=interest_sentiments, + intensity=interest_intensities, + trending=trending, + content_types=list(content_types), + interest_counts=dict(interest_counts), + days_analyzed=days_recent + ) + + except Exception as e: + logger.error(f"[MEMORIES] Interest extraction failed: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"Failed to extract user interests: {str(e)}" + ) + + async def _query_openmemory_by_source_id( openmemory_url: str, source_id: str, @@ -271,7 +533,7 @@ async def _query_openmemory_by_source_id( async with httpx.AsyncClient() as client: # Query all memories for user query_url = f"{openmemory_url}/api/v1/memories/" - params = {"user_id": user_email, "limit": 100} + params = {"user_id": user_email, "limit": 100, "output_format": "v1.1"} logger.info(f"[MEMORIES] Querying: 
{query_url} with params: {params}") response = await client.get(query_url, params=params) diff --git a/ushadow/backend/src/services/feed_service.py b/ushadow/backend/src/services/feed_service.py new file mode 100644 index 00000000..d6ad6aad --- /dev/null +++ b/ushadow/backend/src/services/feed_service.py @@ -0,0 +1,255 @@ +"""Feed Service - Orchestrates interest extraction, post fetching, scoring, and storage. + +Business logic layer for the personalized multi-platform feed feature. +Router -> FeedService -> InterestExtractor / PostFetcher / PostScorer / MongoDB +""" + +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional + +from motor.motor_asyncio import AsyncIOMotorDatabase + +from src.models.feed import ( + Interest, + Post, + PostSource, + SourceCreate, +) +from src.services.interest_extractor import InterestExtractor +from src.services.post_fetcher import PostFetcher +from src.services.post_scorer import PostScorer + +logger = logging.getLogger(__name__) + + +class FeedService: + """Orchestrates the personalized feed pipeline.""" + + def __init__(self, db: AsyncIOMotorDatabase): + self.db = db + self._extractor = InterestExtractor() + self._fetcher = PostFetcher() + self._scorer = PostScorer() + + # ========================================================================= + # Sources + # ========================================================================= + + async def add_source(self, user_id: str, data: SourceCreate) -> PostSource: + """Add a content source (Mastodon instance or YouTube API key).""" + source = PostSource( + user_id=user_id, + name=data.name, + platform_type=data.platform_type, + instance_url=data.instance_url.rstrip("/") if data.instance_url else None, + api_key=data.api_key, + ) + await source.insert() + logger.info( + f"Added {data.platform_type} source '{data.name}' for user {user_id}" + ) + return source + + async def list_sources(self, user_id: str) -> List[PostSource]: + """List all 
configured post sources for a user.""" + return await PostSource.find(PostSource.user_id == user_id).to_list() + + async def remove_source(self, user_id: str, source_id: str) -> bool: + """Remove a post source.""" + source = await PostSource.find_one( + PostSource.user_id == user_id, + PostSource.source_id == source_id, + ) + if not source: + return False + await source.delete() + logger.info(f"Removed source '{source.name}' for user {user_id}") + return True + + # ========================================================================= + # Interests (read-only, derived from OpenMemory graph) + # ========================================================================= + + async def get_interests(self, user_id: str) -> List[Interest]: + """Extract and return current interests from the user's knowledge graph.""" + return await self._extractor.extract_interests(user_id) + + # ========================================================================= + # Feed Refresh Pipeline + # ========================================================================= + + async def refresh( + self, user_id: str, platform_type: Optional[str] = None + ) -> Dict[str, Any]: + """Full pipeline: extract interests -> fetch posts -> score -> save. + + Args: + user_id: Owner email. + platform_type: If set, only refresh sources of this platform. + + Returns summary of what was fetched and stored. + """ + # 1. Clear cache and extract fresh interests from memories + self._extractor.clear_cache(user_id) + interests = await self._extractor.extract_interests(user_id) + if not interests: + return { + "status": "no_interests", + "message": "No interests found in your knowledge graph. " + "Add more memories to build your interest profile.", + "interests_count": 0, + "posts_fetched": 0, + "posts_new": 0, + } + + # 2. 
Get configured sources (optionally filtered by platform) + sources = await self.list_sources(user_id) + if platform_type: + sources = [s for s in sources if s.platform_type == platform_type] + if not sources: + return { + "status": "no_sources", + "message": f"No {platform_type or 'post'} sources configured.", + "interests_count": len(interests), + "posts_fetched": 0, + "posts_new": 0, + } + + # 3. Fetch posts from all platforms (returns List[Post]) + posts = await self._fetcher.fetch_for_interests( + sources, interests, user_id + ) + + # 4. Score posts against interests + scored_posts = self._scorer.score_posts(posts, interests) + + # 5. Save new posts to DB (skip duplicates) + new_count = 0 + for post in scored_posts: + # Check for existing by external_id + existing = await Post.find_one( + Post.user_id == user_id, + Post.external_id == post.external_id, + ) + if existing: + # Update score if post already exists (interests may have changed) + existing.relevance_score = post.relevance_score + existing.matched_interests = post.matched_interests + existing.fetched_at = datetime.utcnow() + await existing.save() + else: + await post.insert() + new_count += 1 + + logger.info( + f"Feed refresh for {user_id}: {len(interests)} interests, " + f"{len(posts)} fetched, {new_count} new posts saved" + ) + + return { + "status": "ok", + "interests_count": len(interests), + "interests_used": [ + {"name": i.name, "hashtags": i.hashtags, "weight": i.relationship_count} + for i in interests[:10] + ], + "posts_fetched": len(posts), + "posts_scored": len(scored_posts), + "posts_new": new_count, + } + + # ========================================================================= + # Feed Read + # ========================================================================= + + async def get_feed( + self, + user_id: str, + page: int = 1, + page_size: int = 20, + filter_interest: Optional[str] = None, + show_seen: bool = True, + platform_type: Optional[str] = None, + ) -> Dict[str, Any]: + 
"""Get the ranked feed of posts for a user. + + Returns paginated posts sorted by relevance_score descending. + Optional platform_type filter for tab-based UI (social vs videos). + """ + filters: Dict[str, Any] = {"user_id": user_id} + + if not show_seen: + filters["seen"] = False + + if filter_interest: + filters["matched_interests"] = filter_interest + + if platform_type: + filters["platform_type"] = platform_type + + query = Post.find(filters) + + total = await query.count() + posts = ( + await query.sort(-Post.relevance_score) + .skip((page - 1) * page_size) + .limit(page_size) + .to_list() + ) + + return { + "posts": posts, + "total": total, + "page": page, + "page_size": page_size, + "total_pages": max(1, -(-total // page_size)), # ceil division + } + + # ========================================================================= + # Post Actions (per-post) + # ========================================================================= + + async def mark_post_seen(self, user_id: str, post_id: str) -> bool: + """Mark a specific post as seen.""" + post = await Post.find_one( + Post.user_id == user_id, Post.post_id == post_id + ) + if not post: + return False + post.seen = True + await post.save() + return True + + async def bookmark_post(self, user_id: str, post_id: str) -> bool: + """Toggle bookmark on a specific post.""" + post = await Post.find_one( + Post.user_id == user_id, Post.post_id == post_id + ) + if not post: + return False + post.bookmarked = not post.bookmarked + await post.save() + return True + + # ========================================================================= + # Stats + # ========================================================================= + + async def get_stats(self, user_id: str) -> Dict[str, Any]: + """Get feed statistics for the user.""" + total = await Post.find(Post.user_id == user_id).count() + unseen = await Post.find( + Post.user_id == user_id, Post.seen == False # noqa: E712 + ).count() + bookmarked = await Post.find( + 
Post.user_id == user_id, Post.bookmarked == True # noqa: E712 + ).count() + sources = await PostSource.find(PostSource.user_id == user_id).count() + + return { + "total_posts": total, + "unseen_posts": unseen, + "bookmarked_posts": bookmarked, + "sources_count": sources, + } diff --git a/ushadow/backend/src/services/interest_extractor.py b/ushadow/backend/src/services/interest_extractor.py new file mode 100644 index 00000000..00a3aa31 --- /dev/null +++ b/ushadow/backend/src/services/interest_extractor.py @@ -0,0 +1,521 @@ +"""Interest Extractor - Derives user interests from OpenMemory's stored memories. + +Fetches user facts via mem0's /api/v1/memories/filter/enriched endpoint, +which returns graph-enriched data with entity types (PERSON, LOCATION, etc.) +and relationships. Uses entity types to filter out private/irrelevant entities. + +Two layers of signal: + - Categories (broad): "ai, ml & technology" β†’ #ai #ml #technology + - Entities (specific, type-filtered): "Mac mini" β†’ #macmini #apple +""" + +import hashlib +import logging +import re +import time +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set, Tuple + +import httpx + +from src.config import get_localhost_proxy_url +from src.models.feed import Interest + +logger = logging.getLogger(__name__) + +# Categories too personal/broad to produce useful fediverse search results +EXCLUDED_CATEGORIES: Set[str] = { + # Too personal + "personal", "relationships", "health", "finance", + # Too broad β€” no useful fediverse hashtag signal + "preferences", "work", "daily life", "communication", + "lifestyle", "general", "other", "miscellaneous", + "activities", "hobbies", "interests", "entertainment", + "education", "learning", "professional", "career", + "social", "culture", "news", "media", + "goals", "projects", "products", "shopping", + "home", "organization", "company affiliation", + "technical support", "customer support", +} + +# Entity types from Neo4j graph that are 
NOT useful for fediverse content discovery +EXCLUDED_ENTITY_TYPES: Set[str] = { + "PERSON", "DATE", "EVENT", "ADDRESS", + "__USER__", "USER", +} + +# Simple in-memory cache: user_id β†’ (timestamp, interests) +_interest_cache: Dict[str, Tuple[float, List[Interest]]] = {} +CACHE_TTL_SECONDS = 300 # 5 minutes + +# Maximum number of interests to return (drops low-signal tail) +MAX_INTERESTS = 25 + +# Known product/brand β†’ hashtag expansions (poor man's LLM) +PRODUCT_HASHTAGS: Dict[str, List[str]] = { + "strix halo": ["strixhalo", "amd", "ryzen"], + "strix halo box": ["strixhalo", "amd", "ryzen"], + "mac mini": ["macmini", "apple", "homelab"], + "raspberry pi": ["raspberrypi", "homelab", "sbc"], + "home assistant": ["homeassistant", "smarthome", "iot"], +} + +# Common abbreviation expansions +ABBREVIATIONS: Dict[str, str] = { + "artificial intelligence": "ai", + "machine learning": "ml", + "deep learning": "dl", + "reinforcement learning": "rl", + "natural language processing": "nlp", + "large language model": "llm", + "large language models": "llm", + "language model": "llm", + "language models": "llm", + "lms": "llm", + "kubernetes": "k8s", + "javascript": "js", + "typescript": "ts", + "open source": "opensource", + "self hosted": "selfhosted", + "home lab": "homelab", + "home server": "homeserver", + "mac mini": "macmini", + "raspberry pi": "raspberrypi", +} + +# Words too generic to be useful as standalone hashtags +# (only filtered when splitting multi-word names into individual words) +GENERIC_SUBWORDS: Set[str] = { + "box", "the", "and", "for", "old", "new", "big", "set", + "pro", "max", "mini", "road", "street", "house", "office", + "post", "party", "blue", "red", "green", "white", "black", +} + + +class InterestExtractor: + """Extracts user interests from OpenMemory's stored memories.""" + + async def extract_interests( + self, user_id: str, limit: int = 100 + ) -> List[Interest]: + """Extract interests from the user's stored memories. + + 1. 
Fetch recent active memories from mem0 + 2. Aggregate categories (breadth) and entities (specificity) + 3. Compute weighted scores based on mention count + recency + 4. Derive hashtags from interest names + 5. Return sorted by weight descending + """ + # Check cache first + now = time.time() + cached = _interest_cache.get(user_id) + if cached and (now - cached[0]) < CACHE_TTL_SECONDS: + logger.debug(f"Returning {len(cached[1])} cached interests for {user_id}") + return cached[1] + + memories = await self._fetch_memories(user_id, limit) + if not memories: + logger.warning("No memories returned from OpenMemory") + return [] + + # Two aggregation passes + category_interests = self._aggregate_categories(memories) + entity_interests = self._aggregate_entities(memories) + + # Merge: entities override categories if same name + merged: Dict[str, Interest] = {} + for interest in category_interests + entity_interests: + key = interest.name.lower() + existing = merged.get(key) + if existing is None or interest.relationship_count > existing.relationship_count: + merged[key] = interest + + interests = sorted( + merged.values(), + key=lambda i: i.relationship_count, + reverse=True, + )[:MAX_INTERESTS] + + logger.info( + f"Extracted {len(interests)} interests from {len(memories)} memories " + f"({len(category_interests)} categories, {len(entity_interests)} entities, " + f"capped at {MAX_INTERESTS})" + ) + + # Update cache + _interest_cache[user_id] = (now, interests) + return interests + + def clear_cache(self, user_id: str) -> None: + """Clear cached interests for a user (e.g., on refresh).""" + _interest_cache.pop(user_id, None) + + # ------------------------------------------------------------------ + # Data fetching + # ------------------------------------------------------------------ + + async def _fetch_memories( + self, user_id: str, limit: int + ) -> List[Dict[str, Any]]: + """Fetch user memories from mem0 via the backend proxy. 
+ + Uses the /filter/enriched endpoint when available, which returns + graph-enriched data with entity types and relationships. + Falls back to /filter if enrichment fails. + """ + proxy_url = get_localhost_proxy_url("mem0") + # mem0's Params model enforces size <= 100 + body = { + "user_id": user_id, + "size": min(limit, 100), + "sort_column": "created_at", + "sort_direction": "desc", + } + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + # Try enriched endpoint first (includes entity types from graph) + resp = await client.post( + f"{proxy_url}/api/v1/memories/filter/enriched", + json=body, + ) + resp.raise_for_status() + data = resp.json() + items = data.get("items", []) + if items: + enriched_count = sum( + 1 for m in items if m.get("graph_enriched") + ) + logger.info( + f"Fetched {len(items)} memories " + f"({enriched_count} graph-enriched)" + ) + return items + except httpx.HTTPError as e: + logger.warning(f"Enriched endpoint failed, falling back: {e}") + + # Fallback to plain filter + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{proxy_url}/api/v1/memories/filter", + json=body, + ) + resp.raise_for_status() + data = resp.json() + return data.get("items", []) + except httpx.HTTPError as e: + logger.error(f"Failed to fetch memories: {e}") + return [] + + # ------------------------------------------------------------------ + # Aggregation + # ------------------------------------------------------------------ + + def _aggregate_categories( + self, memories: List[Dict[str, Any]] + ) -> List[Interest]: + """Aggregate memory categories into broad interests.""" + # category_name β†’ {count, latest_timestamp} + agg: Dict[str, Dict[str, Any]] = {} + + for mem in memories: + categories = mem.get("categories", []) + ts = _parse_created_at(mem.get("created_at")) + + for raw_cat in categories: + cat = raw_cat.strip().lower() + if not cat or cat in EXCLUDED_CATEGORIES: + continue + + if cat not in agg: + 
agg[cat] = {"count": 0, "latest": ts} + agg[cat]["count"] += 1 + if ts and (agg[cat]["latest"] is None or ts > agg[cat]["latest"]): + agg[cat]["latest"] = ts + + interests = [] + for name, info in agg.items(): + weight = _compute_weight(info["count"], info["latest"], is_entity=False) + if weight <= 0: + continue + + hashtags = self._category_to_hashtags(name) + if not hashtags: + continue + + interests.append( + Interest( + name=name, + node_id=_deterministic_id(name), + labels=["category"], + relationship_count=int(round(weight)), + last_active=info["latest"], + hashtags=hashtags, + ) + ) + + return interests + + def _aggregate_entities( + self, memories: List[Dict[str, Any]] + ) -> List[Interest]: + """Aggregate entities into specific interests. + + Uses two sources of entity data: + 1. Graph-enriched entities (from /filter/enriched) β€” have type info + 2. Flat metadata entities (fallback) β€” no type info, heuristic filter + """ + agg: Dict[str, Dict[str, Any]] = {} + + # Build a type lookup from graph-enriched entity data + entity_types: Dict[str, str] = {} # name_lower β†’ type (e.g. 
"PERSON") + for mem in memories: + for ent in mem.get("entities", []): + name = ent.get("name", "").strip() + etype = ent.get("type", "ENTITY").upper() + if name: + entity_types[name.lower()] = etype + + for mem in memories: + ts = _parse_created_at(mem.get("created_at")) + + # Prefer graph-enriched entities (have type info) + graph_entities = mem.get("entities", []) + if graph_entities: + for ent in graph_entities: + name = ent.get("name", "").strip() + etype = ent.get("type", "ENTITY").upper() + if not name or len(name) < 2: + continue + if etype in EXCLUDED_ENTITY_TYPES: + continue + key = name.lower() + if key not in agg: + agg[key] = { + "count": 0, "latest": ts, + "original": name, "type": etype, + } + agg[key]["count"] += 1 + if ts and (agg[key]["latest"] is None or ts > agg[key]["latest"]): + agg[key]["latest"] = ts + else: + # Fallback: flat metadata entities (no type info) + metadata = mem.get("metadata_", {}) or {} + entities = metadata.get("entities", []) + + if isinstance(entities, list): + entity_list = entities + elif isinstance(entities, dict): + entity_list = [] + for names in entities.values(): + if isinstance(names, list): + entity_list.extend(names) + else: + continue + + for entity in entity_list: + if not isinstance(entity, str) or len(entity.strip()) < 2: + continue + name = entity.strip() + key = name.lower() + + # Use graph type lookup if available + etype = entity_types.get(key, "ENTITY") + if etype in EXCLUDED_ENTITY_TYPES: + continue + + # Heuristic filter for un-typed entities + if etype == "ENTITY" and _is_likely_private(name): + continue + + if key not in agg: + agg[key] = { + "count": 0, "latest": ts, + "original": name, "type": etype, + } + agg[key]["count"] += 1 + if ts and (agg[key]["latest"] is None or ts > agg[key]["latest"]): + agg[key]["latest"] = ts + + interests = [] + for key, info in agg.items(): + weight = _compute_weight(info["count"], info["latest"], is_entity=True) + if weight <= 0: + continue + + hashtags = 
self._name_to_hashtags(info["original"]) + if not hashtags: + continue + + interests.append( + Interest( + name=info["original"], + node_id=_deterministic_id(key), + labels=["entity", info.get("type", "ENTITY").lower()], + relationship_count=int(round(weight)), + last_active=info["latest"], + hashtags=hashtags, + ) + ) + + return interests + + # ------------------------------------------------------------------ + # Hashtag derivation + # ------------------------------------------------------------------ + + @staticmethod + def _category_to_hashtags(category: str) -> List[str]: + """Convert a category string to hashtags. + + 'ai, ml & technology' β†’ ['ai', 'ml', 'technology'] + """ + # Split on commas, ampersands, 'and' + parts = re.split(r"[,&]+|\band\b", category) + hashtags: List[str] = [] + + for part in parts: + clean = re.sub(r"[^a-zA-Z0-9\s]", "", part).strip().lower() + if not clean: + continue + + joined = clean.replace(" ", "") + if len(joined) >= 2 and joined not in hashtags: + hashtags.append(joined) + + # Check abbreviations for the sub-part + abbrev = ABBREVIATIONS.get(clean) + if abbrev and abbrev not in hashtags: + hashtags.append(abbrev) + + return hashtags + + @staticmethod + def _name_to_hashtags(name: str) -> List[str]: + """Convert an entity/interest name to fediverse hashtags. 
+ + 'Mac mini' β†’ ['macmini', 'apple', 'homelab'] + 'LMs' β†’ ['lms', 'llm'] + 'Kubernetes' β†’ ['kubernetes', 'k8s'] + 'Strix Halo box' β†’ ['strixhalobox', 'strixhalo', 'amd', 'ryzen'] + """ + clean = re.sub(r"[^a-zA-Z0-9\s]", "", name).strip().lower() + joined = clean.replace(" ", "") + + hashtags: List[str] = [] + if joined and len(joined) >= 2: + hashtags.append(joined) + + # Individual words for multi-word names (skip generic subwords) + words = clean.split() + if len(words) > 1: + for word in words: + if ( + len(word) >= 3 + and word not in hashtags + and word not in GENERIC_SUBWORDS + ): + hashtags.append(word) + + # Common abbreviations + abbrev = ABBREVIATIONS.get(clean) + if abbrev and abbrev not in hashtags: + hashtags.append(abbrev) + + # Known product/brand expansions β€” try full name then partial matches + product_tags = PRODUCT_HASHTAGS.get(clean, []) + if not product_tags: + # Try matching a known product prefix (e.g. "strix halo box" β†’ "strix halo") + for product_key, tags in PRODUCT_HASHTAGS.items(): + if clean.startswith(product_key) or product_key.startswith(clean): + product_tags = tags + break + for tag in product_tags: + if tag not in hashtags: + hashtags.append(tag) + + return hashtags + + +# ====================================================================== +# Module-level helpers +# ====================================================================== + + +def _compute_weight( + mention_count: int, + latest: Optional[datetime], + is_entity: bool, +) -> float: + """Compute interest weight from mention count, recency, and source type. 
+
+    weight = mention_count x recency_multiplier x source_bonus
+    """
+    if mention_count <= 0:
+        return 0.0
+
+    # Recency multiplier based on how recent the latest memory is:
+    # <=7 days doubles the weight, decaying to 0.5 past 90 days.
+    recency = 1.0
+    if latest:
+        try:
+            now = datetime.now(timezone.utc)
+            if latest.tzinfo is None:
+                latest = latest.replace(tzinfo=timezone.utc)
+            age_days = (now - latest).total_seconds() / 86400
+            if age_days <= 7:
+                recency = 2.0
+            elif age_days <= 30:
+                recency = 1.5
+            elif age_days <= 90:
+                recency = 1.0
+            else:
+                recency = 0.5
+        except (TypeError, ValueError):
+            recency = 1.0
+
+    # Specific entities outrank broad categories.
+    source_bonus = 1.5 if is_entity else 1.0
+
+    return mention_count * recency * source_bonus
+
+
+def _deterministic_id(name: str) -> str:
+    """Generate a stable short ID from a name string."""
+    # md5 is used as a stable non-cryptographic fingerprint, not for security.
+    return hashlib.md5(name.lower().encode()).hexdigest()[:12]
+
+
+# Street/address words and one-off life events -- entity names matching these
+# are treated as private rather than as content interests.
+_PRIVATE_PATTERNS = re.compile(
+    r"\b(road|street|avenue|lane|drive|court|place|blvd|way)\b"
+    r"|\b(party|birthday|wedding|anniversary|funeral)\b",
+    re.IGNORECASE,
+)
+
+
+def _is_likely_private(name: str) -> bool:
+    """Heuristic: is this entity likely a private person/place/event?
+
+    Used as fallback when graph entity types are not available.
+ """ + # Very short single-word names are often first names (ambiguous) + if len(name) <= 3 and " " not in name: + return True + # Address/event patterns + if _PRIVATE_PATTERNS.search(name): + return True + return False + + +def _parse_created_at(value: Any) -> Optional[datetime]: + """Parse a created_at value (unix timestamp or ISO string).""" + if value is None: + return None + try: + if isinstance(value, datetime): + return value + if isinstance(value, (int, float)): + return datetime.fromtimestamp(value, tz=timezone.utc) + if isinstance(value, str): + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, TypeError, OSError): + pass + return None diff --git a/ushadow/backend/src/services/platforms/__init__.py b/ushadow/backend/src/services/platforms/__init__.py new file mode 100644 index 00000000..74884299 --- /dev/null +++ b/ushadow/backend/src/services/platforms/__init__.py @@ -0,0 +1,54 @@ +"""Platform strategies for multi-source content fetching. + +Each platform (Mastodon, YouTube, etc.) implements PlatformFetcher to handle +its own API calls and data transformation. The generic PostScorer then ranks +all posts uniformly regardless of source platform. +""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, List + +from src.models.feed import Interest, Post + + +class PlatformFetcher(ABC): + """Abstract base for platform-specific content fetchers. + + Implementors handle: + - Fetching raw content from the platform API + - Transforming platform-specific JSON into Post objects + """ + + @abstractmethod + async def fetch_for_interests( + self, interests: List[Interest], config: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """Fetch raw content items matching user interests. + + Args: + interests: User interests with hashtags/keywords. + config: Platform-specific config (instance_url, api_key, etc.) + + Returns: + List of raw platform-specific dicts (to be transformed by to_post). 
+ """ + + @abstractmethod + def to_post( + self, + raw: Dict[str, Any], + source_id: str, + user_id: str, + interests: List[Interest], + ) -> Post | None: + """Transform a raw platform item into a Post document. + + Args: + raw: Raw API response item. + source_id: The PostSource.source_id this came from. + user_id: The user who owns this feed. + interests: Used for initial relevance scoring. + + Returns: + Post object, or None if the item can't be parsed. + """ diff --git a/ushadow/backend/src/services/platforms/mastodon.py b/ushadow/backend/src/services/platforms/mastodon.py new file mode 100644 index 00000000..bf0d8af3 --- /dev/null +++ b/ushadow/backend/src/services/platforms/mastodon.py @@ -0,0 +1,169 @@ +"""Mastodon platform strategy β€” fetches posts from hashtag timelines. + +Uses the public Mastodon API: + GET /api/v1/timelines/tag/{hashtag}?limit=40 +No authentication required for public timelines. +""" + +import asyncio +import logging +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Set + +import httpx + +from src.models.feed import Interest, Post +from src.services.platforms import PlatformFetcher + +logger = logging.getLogger(__name__) + +DEFAULT_LIMIT = 40 +MAX_CONCURRENT = 5 +MAX_HASHTAGS = 20 + + +class MastodonFetcher(PlatformFetcher): + """Fetches posts from Mastodon-compatible hashtag timelines.""" + + async def fetch_for_interests( + self, interests: List[Interest], config: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """Fetch posts for all interest hashtags from a Mastodon instance. + + Args: + interests: User interests with derived hashtags. + config: Must contain 'instance_url'. 
+ """ + instance_url = config["instance_url"] + + hashtags = _collect_hashtags(interests, MAX_HASHTAGS) + if not hashtags: + return [] + + semaphore = asyncio.Semaphore(MAX_CONCURRENT) + + async def _bounded_fetch(hashtag: str) -> List[Dict[str, Any]]: + async with semaphore: + return await _fetch_hashtag_timeline( + instance_url, hashtag + ) + + tasks = [_bounded_fetch(tag) for tag in hashtags] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Flatten and deduplicate by Mastodon URI + seen_ids: Set[str] = set() + posts: List[Dict[str, Any]] = [] + source_id = config.get("source_id", "") + + for result in results: + if isinstance(result, Exception): + logger.warning(f"Mastodon fetch failed: {result}") + continue + for status in result: + ext_id = status.get("uri") or status.get("id", "") + if ext_id and ext_id not in seen_ids: + seen_ids.add(ext_id) + status["_source_id"] = source_id + status["_source_instance"] = instance_url + posts.append(status) + + logger.info( + f"Fetched {len(posts)} unique posts from {instance_url} " + f"({len(tasks)} requests)" + ) + return posts + + def to_post( + self, + raw: Dict[str, Any], + source_id: str, + user_id: str, + interests: List[Interest], + ) -> Post | None: + """Transform a Mastodon Status JSON into a Post document.""" + try: + account = raw.get("account", {}) + tags = raw.get("tags", []) + + acct = account.get("acct", "unknown") + if "@" not in acct: + instance_url = raw.get("_source_instance", "") + domain = ( + instance_url.replace("https://", "") + .replace("http://", "") + .rstrip("/") + ) + acct = f"{acct}@{domain}" if domain else acct + + published_at = _parse_datetime(raw.get("created_at", "")) + + return Post( + user_id=user_id, + source_id=source_id, + external_id=raw.get("uri") or raw.get("id", ""), + platform_type="mastodon", + author_handle=f"@{acct}", + author_display_name=account.get("display_name", ""), + author_avatar=account.get("avatar"), + content=raw.get("content", ""), + 
url=raw.get("url") or raw.get("uri", ""), + published_at=published_at, + hashtags=[t.get("name", "") for t in tags if t.get("name")], + language=raw.get("language"), + boosts_count=raw.get("reblogs_count", 0), + favourites_count=raw.get("favourites_count", 0), + replies_count=raw.get("replies_count", 0), + ) + except Exception as e: + logger.warning(f"Failed to parse Mastodon status: {e}") + return None + + +# ====================================================================== +# Module-level helpers +# ====================================================================== + + +def _collect_hashtags(interests: List[Interest], max_count: int) -> List[str]: + """Collect unique hashtags from interests, ordered by interest weight.""" + hashtags: List[str] = [] + seen: Set[str] = set() + for interest in interests: + for tag in interest.hashtags: + if tag not in seen: + seen.add(tag) + hashtags.append(tag) + if len(hashtags) >= max_count: + return hashtags + return hashtags + + +async def _fetch_hashtag_timeline( + instance_url: str, hashtag: str +) -> List[Dict[str, Any]]: + """Fetch public posts for a hashtag from a Mastodon instance.""" + url = f"{instance_url.rstrip('/')}/api/v1/timelines/tag/{hashtag}" + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get(url, params={"limit": DEFAULT_LIMIT}) + resp.raise_for_status() + statuses = resp.json() + logger.debug( + f"Fetched {len(statuses)} posts for #{hashtag} " + f"from {instance_url}" + ) + return statuses + except httpx.HTTPError as e: + logger.warning( + f"Failed to fetch #{hashtag} from {instance_url}: {e}" + ) + return [] + + +def _parse_datetime(value: str) -> datetime: + """Parse ISO datetime string, falling back to now(UTC).""" + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return datetime.now(timezone.utc) diff --git a/ushadow/backend/src/services/platforms/youtube.py 
b/ushadow/backend/src/services/platforms/youtube.py new file mode 100644 index 00000000..387a1528 --- /dev/null +++ b/ushadow/backend/src/services/platforms/youtube.py @@ -0,0 +1,281 @@ +"""YouTube platform strategy β€” fetches videos via YouTube Data API v3. + +Uses two API endpoints: + - search.list (100 quota units each) β€” finds video IDs matching interests + - videos.list (1 quota unit per 50 videos) β€” fetches details (thumbnails, stats) + +Quota budget: 5 searches Γ— 100 = 500 + 1 details call = 501 units per refresh. +Free tier is 10,000 units/day β†’ ~19 refreshes/day. +""" + +import asyncio +import logging +import re +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Set + +import httpx + +from src.models.feed import Interest, Post +from src.services.platforms import PlatformFetcher + +logger = logging.getLogger(__name__) + +SEARCH_URL = "https://www.googleapis.com/youtube/v3/search" +VIDEOS_URL = "https://www.googleapis.com/youtube/v3/videos" + +MAX_QUERIES = 5 +MAX_RESULTS_PER_QUERY = 10 +MAX_CONCURRENT = 3 +PUBLISHED_AFTER_DAYS = 30 + + +class YouTubeFetcher(PlatformFetcher): + """Fetches videos from YouTube Data API v3.""" + + async def fetch_for_interests( + self, interests: List[Interest], config: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """Search YouTube for videos matching user interests. + + 1. Convert top interests β†’ search query strings + 2. Run searches concurrently (bounded) + 3. Batch-fetch video details (thumbnails, stats, duration) + 4. 
Deduplicate by video ID + """ + api_key = config.get("api_key", "") + if not api_key: + logger.warning("YouTube source has no API key") + return [] + + queries = _interests_to_queries(interests, MAX_QUERIES) + if not queries: + return [] + + # Phase 1: Search for video IDs + semaphore = asyncio.Semaphore(MAX_CONCURRENT) + published_after = ( + datetime.now(timezone.utc) - timedelta(days=PUBLISHED_AFTER_DAYS) + ).isoformat() + + async def _bounded_search(query: str) -> List[str]: + async with semaphore: + return await _search_videos(query, api_key, published_after) + + tasks = [_bounded_search(q) for q in queries] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Collect unique video IDs + seen_ids: Set[str] = set() + video_ids: List[str] = [] + for result in results: + if isinstance(result, Exception): + logger.warning(f"YouTube search failed: {result}") + continue + for vid_id in result: + if vid_id not in seen_ids: + seen_ids.add(vid_id) + video_ids.append(vid_id) + + if not video_ids: + return [] + + # Phase 2: Batch-fetch video details + videos = await _get_video_details(video_ids, api_key) + + logger.info( + f"Fetched {len(videos)} YouTube videos " + f"({len(queries)} queries, {len(video_ids)} unique IDs)" + ) + + # Tag each video with source metadata + source_id = config.get("source_id", "") + for video in videos: + video["_source_id"] = source_id + + return videos + + def to_post( + self, + raw: Dict[str, Any], + source_id: str, + user_id: str, + interests: List[Interest], + ) -> Post | None: + """Transform a YouTube video JSON into a Post document.""" + try: + snippet = raw.get("snippet", {}) + stats = raw.get("statistics", {}) + content_details = raw.get("contentDetails", {}) + video_id = raw.get("id", "") + + published_at = _parse_datetime(snippet.get("publishedAt", "")) + title = snippet.get("title", "") + description = snippet.get("description", "") + + # Use highest-quality thumbnail available + thumbnails = 
snippet.get("thumbnails", {}) + thumbnail_url = ( + thumbnails.get("high", {}).get("url") + or thumbnails.get("medium", {}).get("url") + or thumbnails.get("default", {}).get("url") + ) + + # Extract hashtags from title + description + hashtags = _extract_hashtags(f"{title} {description}") + + channel_title = snippet.get("channelTitle", "") + + return Post( + user_id=user_id, + source_id=source_id, + external_id=f"yt:{video_id}", + platform_type="youtube", + author_handle=channel_title, + author_display_name=channel_title, + author_avatar=None, + content=f"{title}
{description[:500]}", + url=f"https://www.youtube.com/watch?v={video_id}", + published_at=published_at, + hashtags=hashtags, + language=snippet.get("defaultAudioLanguage"), + # YouTube-specific fields + thumbnail_url=thumbnail_url, + video_id=video_id, + channel_title=channel_title, + view_count=_safe_int(stats.get("viewCount")), + like_count=_safe_int(stats.get("likeCount")), + duration=_format_duration(content_details.get("duration", "")), + ) + except Exception as e: + logger.warning(f"Failed to parse YouTube video: {e}") + return None + + +# ====================================================================== +# Module-level helpers +# ====================================================================== + + +def _interests_to_queries( + interests: List[Interest], max_queries: int +) -> List[str]: + """Convert top interests into YouTube search queries. + + Joins the top 2-3 hashtags per interest into a search string. + Example: Interest(hashtags=["kubernetes", "k8s"]) β†’ "kubernetes k8s" + """ + queries: List[str] = [] + for interest in interests[:max_queries]: + keywords = " ".join(interest.hashtags[:3]) + if keywords.strip(): + queries.append(keywords) + return queries + + +async def _search_videos( + query: str, api_key: str, published_after: str +) -> List[str]: + """Search YouTube for video IDs matching a query string.""" + params = { + "part": "id", + "q": query, + "type": "video", + "maxResults": MAX_RESULTS_PER_QUERY, + "order": "relevance", + "publishedAfter": published_after, + "key": api_key, + } + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get(SEARCH_URL, params=params) + resp.raise_for_status() + data = resp.json() + return [ + item["id"]["videoId"] + for item in data.get("items", []) + if item.get("id", {}).get("videoId") + ] + except httpx.HTTPError as e: + logger.warning(f"YouTube search failed for '{query}': {e}") + return [] + + +async def _get_video_details( + video_ids: List[str], api_key: str +) 
-> List[Dict[str, Any]]: + """Batch-fetch video details (snippet, stats, contentDetails). + + YouTube allows up to 50 IDs per request (1 quota unit). + """ + all_videos: List[Dict[str, Any]] = [] + + # Process in batches of 50 + for i in range(0, len(video_ids), 50): + batch = video_ids[i : i + 50] + params = { + "part": "snippet,statistics,contentDetails", + "id": ",".join(batch), + "key": api_key, + } + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.get(VIDEOS_URL, params=params) + resp.raise_for_status() + data = resp.json() + all_videos.extend(data.get("items", [])) + except httpx.HTTPError as e: + logger.warning(f"YouTube video details failed: {e}") + + return all_videos + + +def _extract_hashtags(text: str) -> List[str]: + """Extract #hashtags from YouTube title/description.""" + tags = re.findall(r"#(\w{2,})", text.lower()) + # Deduplicate while preserving order + seen: Set[str] = set() + result: List[str] = [] + for tag in tags: + if tag not in seen: + seen.add(tag) + result.append(tag) + return result[:10] + + +def _parse_datetime(value: str) -> datetime: + """Parse ISO datetime string, falling back to now(UTC).""" + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, AttributeError): + return datetime.now(timezone.utc) + + +def _safe_int(value: Optional[str]) -> Optional[int]: + """Convert string numeric value to int, or None.""" + if value is None: + return None + try: + return int(value) + except (ValueError, TypeError): + return None + + +_ISO_DURATION_RE = re.compile( + r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?" 
+) + + +def _format_duration(iso_duration: str) -> Optional[str]: + """Convert ISO 8601 duration (PT1H2M30S) to human-readable (1:02:30).""" + match = _ISO_DURATION_RE.match(iso_duration) + if not match: + return None + + hours = int(match.group(1) or 0) + minutes = int(match.group(2) or 0) + seconds = int(match.group(3) or 0) + + if hours: + return f"{hours}:{minutes:02d}:{seconds:02d}" + return f"{minutes}:{seconds:02d}" diff --git a/ushadow/backend/src/services/post_fetcher.py b/ushadow/backend/src/services/post_fetcher.py new file mode 100644 index 00000000..c5cd459c --- /dev/null +++ b/ushadow/backend/src/services/post_fetcher.py @@ -0,0 +1,85 @@ +"""Post Fetcher - Dispatches content fetching to platform strategies. + +Groups sources by platform_type and delegates to the appropriate +PlatformFetcher implementation (MastodonFetcher, YouTubeFetcher, etc.). +""" + +import logging +from typing import Any, Dict, List, Type + +from src.models.feed import Interest, Post, PostSource +from src.services.platforms import PlatformFetcher +from src.services.platforms.mastodon import MastodonFetcher +from src.services.platforms.youtube import YouTubeFetcher + +logger = logging.getLogger(__name__) + +# Registry: platform_type β†’ fetcher class +_STRATEGIES: Dict[str, Type[PlatformFetcher]] = { + "mastodon": MastodonFetcher, + "youtube": YouTubeFetcher, +} + + +def register_platform(name: str, cls: Type[PlatformFetcher]) -> None: + """Register a new platform fetcher (called at import time).""" + _STRATEGIES[name] = cls + + +class PostFetcher: + """Dispatches content fetching to platform-specific strategies.""" + + async def fetch_for_interests( + self, + sources: List[PostSource], + interests: List[Interest], + user_id: str, + ) -> List[Post]: + """Fetch and transform posts from all active sources. + + Groups sources by platform_type, dispatches to the registered + strategy, transforms raw items to Post objects via to_post(). 
+ + Returns: + List of Post objects (not yet scored). + """ + active = [s for s in sources if s.enabled] + if not active: + logger.info("No active sources configured") + return [] + + all_posts: List[Post] = [] + + for source in active: + strategy_cls = _STRATEGIES.get(source.platform_type) + if not strategy_cls: + logger.warning( + f"No strategy for platform '{source.platform_type}'" + ) + continue + + strategy = strategy_cls() + config = _source_to_config(source) + + raw_items = await strategy.fetch_for_interests(interests, config) + for raw in raw_items: + post = strategy.to_post( + raw, source.source_id, user_id, interests + ) + if post: + all_posts.append(post) + + logger.info( + f"Fetched {len(all_posts)} posts from {len(active)} sources" + ) + return all_posts + + +def _source_to_config(source: PostSource) -> Dict[str, Any]: + """Convert a PostSource document to a strategy config dict.""" + return { + "source_id": source.source_id, + "instance_url": source.instance_url or "", + "api_key": source.api_key or "", + "platform_type": source.platform_type, + } diff --git a/ushadow/backend/src/services/post_scorer.py b/ushadow/backend/src/services/post_scorer.py new file mode 100644 index 00000000..8ad2c01e --- /dev/null +++ b/ushadow/backend/src/services/post_scorer.py @@ -0,0 +1,142 @@ +"""Post Scorer - Ranks posts by relevance to the user's interest graph. + +Platform-agnostic scoring: works on Post objects regardless of whether +they came from Mastodon, YouTube, or any future platform. 
+ +Scoring signals: +- Hashtag overlap with interest keywords (direct match) +- Interest weight (more connected interests rank higher) +- Interest recency (recently-active interests boost more) +- Post recency (newer posts get a time decay boost) +- Content keyword matching (post text contains interest terms) +""" + +import logging +import math +import re +from datetime import datetime, timezone +from typing import Dict, List, Set + +from src.models.feed import Interest, Post + +logger = logging.getLogger(__name__) + +_HTML_TAG_RE = re.compile(r"<[^>]+>") + + +class PostScorer: + """Scores posts against the user's interest graph.""" + + def score_posts( + self, + posts: List[Post], + interests: List[Interest], + ) -> List[Post]: + """Score pre-transformed Post objects against user interests. + + For each post: + 1. Find which interests match (hashtag overlap + content keywords) + 2. Compute relevance_score from matched interest weights + 3. Add recency boost + 4. Return sorted by relevance_score descending + """ + if not interests: + logger.info("No interests to score against") + return posts + + # Build lookups + tag_to_interests = _build_tag_lookup(interests) + kw_to_interests = _build_keyword_lookup(interests) + + now = datetime.now(timezone.utc) + + for post in posts: + matched: Set[str] = set() + score = 0.0 + + # 1. Hashtag matching + for tag in post.hashtags: + for interest in tag_to_interests.get(tag.lower(), []): + if interest.name not in matched: + matched.add(interest.name) + score += _interest_score(interest, now) + + # 2. Content keyword matching (weaker signal) + plain = _strip_html(post.content).lower() + for keyword, kw_interests in kw_to_interests.items(): + if keyword in plain: + for interest in kw_interests: + if interest.name not in matched: + matched.add(interest.name) + score += _interest_score(interest, now) * 0.5 + + # 3. 
Post recency boost + score += _recency_boost(post.published_at, now) + + post.relevance_score = round(score, 3) + post.matched_interests = sorted(matched) + + posts.sort(key=lambda p: p.relevance_score, reverse=True) + + logger.info( + f"Scored {len(posts)} posts, " + f"top score: {posts[0].relevance_score if posts else 0}" + ) + return posts + + +# ====================================================================== +# Helpers +# ====================================================================== + + +def _build_tag_lookup( + interests: List[Interest], +) -> Dict[str, List[Interest]]: + """Map hashtag β†’ list of interests that use it.""" + lookup: Dict[str, List[Interest]] = {} + for interest in interests: + for tag in interest.hashtags: + lookup.setdefault(tag.lower(), []).append(interest) + return lookup + + +def _build_keyword_lookup( + interests: List[Interest], +) -> Dict[str, List[Interest]]: + """Map interest name words β†’ list of interests (for text matching).""" + lookup: Dict[str, List[Interest]] = {} + for interest in interests: + for word in interest.name.lower().split(): + if len(word) >= 3: + lookup.setdefault(word, []).append(interest) + return lookup + + +def _interest_score(interest: Interest, now: datetime) -> float: + """Score contribution from a single matched interest. + + log2(relationship_count + 1) + recency bonus if active recently. 
+ """ + base = math.log(interest.relationship_count + 1, 2) + + recency_bonus = 0.0 + if interest.last_active: + days_since = (now - interest.last_active).total_seconds() / 86400 + if days_since < 7: + recency_bonus = 2.0 * (1.0 - days_since / 7.0) + + return base + recency_bonus + + +def _recency_boost(published_at: datetime, now: datetime) -> float: + """Boost for recent posts β€” decays logarithmically over hours.""" + hours_old = max((now - published_at).total_seconds() / 3600, 0) + if hours_old < 1: + return 1.5 + return 1.0 / math.log2(hours_old + 1) + + +def _strip_html(html: str) -> str: + """Remove HTML tags for plain text matching.""" + return _HTML_TAG_RE.sub(" ", html).strip() diff --git a/ushadow/frontend/src/App.tsx b/ushadow/frontend/src/App.tsx index c9b83d95..41347219 100644 --- a/ushadow/frontend/src/App.tsx +++ b/ushadow/frontend/src/App.tsx @@ -50,6 +50,7 @@ import ClusterPage from './pages/ClusterPage' import SpeakerRecognitionPage from './pages/SpeakerRecognitionPage' import ChatPage from './pages/ChatPage' import TimelinePage from './pages/TimelinePage' +import FeedPage from './pages/FeedPage' // Wizards (all use WizardShell pattern) import { @@ -141,6 +142,7 @@ function AppContent() { } /> } /> } /> + } /> } /> } /> } /> diff --git a/ushadow/frontend/src/components/feed/AddSourceModal.tsx b/ushadow/frontend/src/components/feed/AddSourceModal.tsx new file mode 100644 index 00000000..10716c9b --- /dev/null +++ b/ushadow/frontend/src/components/feed/AddSourceModal.tsx @@ -0,0 +1,219 @@ +/** + * AddSourceModal β€” form to add a content source (Mastodon instance or YouTube API key). + * + * Platform selector at the top switches between Mastodon fields (instance URL) + * and YouTube fields (API key via SecretInput). 
+ */ + +import { useState, useEffect } from 'react' +import { Radio, Loader2, MessageSquare, Play } from 'lucide-react' +import Modal from '../Modal' +import { SecretInput } from '../settings/SecretInput' +import type { SourceCreateData } from '../../services/feedApi' + +type PlatformType = 'mastodon' | 'youtube' + +interface AddSourceModalProps { + isOpen: boolean + onClose: () => void + onAdd: (data: SourceCreateData) => Promise + isAdding: boolean + defaultPlatform?: PlatformType +} + +export default function AddSourceModal({ isOpen, onClose, onAdd, isAdding, defaultPlatform }: AddSourceModalProps) { + const [platformType, setPlatformType] = useState(defaultPlatform ?? 'mastodon') + const [name, setName] = useState('') + const [instanceUrl, setInstanceUrl] = useState('') + const [apiKey, setApiKey] = useState('') + const [error, setError] = useState(null) + + // Sync platform when modal opens with a new default + useEffect(() => { + if (isOpen && defaultPlatform) { + setPlatformType(defaultPlatform) + } + }, [isOpen, defaultPlatform]) + + const resetForm = () => { + setName('') + setInstanceUrl('') + setApiKey('') + setError(null) + } + + const handlePlatformChange = (type: PlatformType) => { + setPlatformType(type) + resetForm() + } + + const isValid = platformType === 'mastodon' + ? 
instanceUrl.trim().length > 0 + : apiKey.trim().length > 0 + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault() + setError(null) + + try { + if (platformType === 'mastodon') { + let url = instanceUrl.trim() + if (!url.startsWith('http')) url = `https://${url}` + url = url.replace(/\/+$/, '') + + await onAdd({ + name: name.trim() || url.replace(/^https?:\/\//, ''), + platform_type: 'mastodon', + instance_url: url, + }) + } else { + await onAdd({ + name: name.trim() || 'YouTube', + platform_type: 'youtube', + api_key: apiKey.trim(), + }) + } + + resetForm() + onClose() + } catch (err: any) { + setError(err?.response?.data?.detail || 'Failed to add source') + } + } + + return ( + } + maxWidth="sm" + testId="add-source-modal" + > +
+ {/* Platform selector */} +
+ +
+ + +
+
+ + {/* Platform-specific fields */} + {platformType === 'mastodon' ? ( +
+ + setInstanceUrl(e.target.value)} + required + className="w-full px-3 py-2 rounded-lg border border-neutral-300 dark:border-neutral-600 bg-white dark:bg-neutral-700 text-neutral-900 dark:text-neutral-100 text-sm focus:ring-2 focus:ring-primary-500 focus:outline-none" + data-testid="add-source-url-input" + /> +

+ Any Mastodon-compatible server (e.g., mastodon.social, fosstodon.org) +

+
+ ) : ( +
+ + +

+ Get one from{' '} + + Google Cloud Console + + {' '}→ YouTube Data API v3 +

+
+ )} + + {/* Display name (shared) */} +
+ + setName(e.target.value)} + className="w-full px-3 py-2 rounded-lg border border-neutral-300 dark:border-neutral-600 bg-white dark:bg-neutral-700 text-neutral-900 dark:text-neutral-100 text-sm focus:ring-2 focus:ring-primary-500 focus:outline-none" + data-testid="add-source-name-input" + /> +
+ + {error && ( +

{error}

+ )} + +
+ + +
+
+
+ ) +} diff --git a/ushadow/frontend/src/components/feed/FeedEmptyState.tsx b/ushadow/frontend/src/components/feed/FeedEmptyState.tsx new file mode 100644 index 00000000..cee33417 --- /dev/null +++ b/ushadow/frontend/src/components/feed/FeedEmptyState.tsx @@ -0,0 +1,79 @@ +/** + * FeedEmptyState β€” shown when no sources are configured or no posts are available. + * Platform-aware: messaging adapts to whether the user is on Social or Videos tab. + */ + +import { Radio, Plus, Loader2, MessageSquare, Play } from 'lucide-react' + +const PLATFORM_LABELS: Record = { + mastodon: { + name: 'Mastodon', + guidance: 'Add a Mastodon-compatible server to start curating your personalized feed based on your knowledge graph interests.', + }, + youtube: { + name: 'YouTube', + guidance: 'Add a YouTube API key to discover videos ranked by your knowledge graph interests.', + }, +} + +interface FeedEmptyStateProps { + hasSources: boolean + platformType?: string + onAddSource: () => void + onRefresh: () => void + isRefreshing: boolean +} + +export default function FeedEmptyState({ + hasSources, + platformType, + onAddSource, + onRefresh, + isRefreshing, +}: FeedEmptyStateProps) { + const label = platformType ? PLATFORM_LABELS[platformType] : undefined + const Icon = platformType === 'youtube' ? Play : MessageSquare + + if (!hasSources) { + return ( +
+ +

+ No {label?.name ?? ''} sources configured +

+

+ {label?.guidance ?? 'Add a content source to get started.'} +

+ +
+ ) + } + + return ( +
+ +

+ No {label?.name ? `${label.name} ` : ''}posts yet +

+

+ Hit refresh to fetch posts from your sources and score them against your interests. +

+ +
+ ) +} diff --git a/ushadow/frontend/src/components/feed/InterestChip.tsx b/ushadow/frontend/src/components/feed/InterestChip.tsx new file mode 100644 index 00000000..b22d7552 --- /dev/null +++ b/ushadow/frontend/src/components/feed/InterestChip.tsx @@ -0,0 +1,28 @@ +/** + * InterestChip β€” clickable filter chip for a knowledge-graph interest. + */ + +import type { FeedInterest } from '../../services/feedApi' + +interface InterestChipProps { + interest: FeedInterest + active: boolean + onClick: () => void +} + +export default function InterestChip({ interest, active, onClick }: InterestChipProps) { + return ( + + ) +} diff --git a/ushadow/frontend/src/components/feed/PostCard.tsx b/ushadow/frontend/src/components/feed/PostCard.tsx new file mode 100644 index 00000000..1423697d --- /dev/null +++ b/ushadow/frontend/src/components/feed/PostCard.tsx @@ -0,0 +1,170 @@ +/** + * PostCard β€” renders a single fediverse post with actions and relevance info. + */ + +import { Bookmark, Eye, ExternalLink, Clock } from 'lucide-react' +import type { FeedPost } from '../../services/feedApi' + +interface PostCardProps { + post: FeedPost + onBookmark: (postId: string) => void + onMarkSeen: (postId: string) => void +} + +function timeAgo(dateStr: string): string { + const seconds = Math.floor((Date.now() - new Date(dateStr).getTime()) / 1000) + if (seconds < 60) return 'just now' + const minutes = Math.floor(seconds / 60) + if (minutes < 60) return `${minutes}m` + const hours = Math.floor(minutes / 60) + if (hours < 24) return `${hours}h` + const days = Math.floor(hours / 24) + return `${days}d` +} + +/** Strip HTML tags for a plain-text excerpt using regex (no DOM / no innerHTML). 
*/ +function stripHtml(html: string): string { + return html + .replace(//gi, '\n') + .replace(/<\/p>/gi, '\n') + .replace(/<[^>]+>/g, '') + .replace(/&/g, '&') + .replace(/</g, '<') + .replace(/>/g, '>') + .replace(/"/g, '"') + .replace(/'/g, "'") + .replace(/\n{3,}/g, '\n\n') + .trim() +} + +export default function PostCard({ post, onBookmark, onMarkSeen }: PostCardProps) { + const plainText = stripHtml(post.content) + + return ( +
+ {/* Header: author info */} +
+ {post.author_avatar ? ( + {post.author_display_name} + ) : ( +
+ {(post.author_display_name || post.author_handle).charAt(0).toUpperCase()} +
+ )} +
+
+ + {post.author_display_name || post.author_handle} + + + {post.author_handle} + +
+
+ + {timeAgo(post.published_at)} +
+
+ {/* Relevance score pill */} + {post.relevance_score > 0 && ( + + {post.relevance_score.toFixed(1)} + + )} +
+ + {/* Content */} +

+ {plainText} +

+ + {/* Matched interests */} + {post.matched_interests.length > 0 && ( +
+ Matched: + {post.matched_interests.map((name) => ( + + {name} + + ))} +
+ )} + + {/* Hashtags */} + {post.hashtags.length > 0 && ( +
+ {post.hashtags.slice(0, 5).map((tag) => ( + + #{tag} + + ))} +
+ )} + + {/* Actions */} +
+
+ {/* Bookmark */} + + + {/* Mark seen */} + {!post.seen && ( + + )} +
+ + {/* Link to original */} + + + Original + +
+
+ ) +} diff --git a/ushadow/frontend/src/components/feed/YouTubePostCard.tsx b/ushadow/frontend/src/components/feed/YouTubePostCard.tsx new file mode 100644 index 00000000..5ef1e19d --- /dev/null +++ b/ushadow/frontend/src/components/feed/YouTubePostCard.tsx @@ -0,0 +1,216 @@ +/** + * YouTubePostCard β€” renders a YouTube video card with thumbnail, stats, and actions. + * + * Horizontal layout: thumbnail left, metadata right. + * Reuses bookmark/seen actions from PostCard pattern. + */ + +import { Bookmark, Eye, ExternalLink, Clock, ThumbsUp, Play } from 'lucide-react' +import type { FeedPost } from '../../services/feedApi' + +interface YouTubePostCardProps { + post: FeedPost + onBookmark: (postId: string) => void + onMarkSeen: (postId: string) => void +} + +function timeAgo(dateStr: string): string { + const seconds = Math.floor((Date.now() - new Date(dateStr).getTime()) / 1000) + if (seconds < 60) return 'just now' + const minutes = Math.floor(seconds / 60) + if (minutes < 60) return `${minutes}m` + const hours = Math.floor(minutes / 60) + if (hours < 24) return `${hours}h` + const days = Math.floor(hours / 24) + return `${days}d` +} + +function formatCount(n: number | null | undefined): string { + if (n == null) return 'β€”' + if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M` + if (n >= 1_000) return `${(n / 1_000).toFixed(1)}K` + return String(n) +} + +/** Extract title from content (format: title
description). */ +function extractTitle(content: string): string { + const match = content.match(/(.*?)<\/b>/) + return match ? match[1] : content.replace(/<[^>]+>/g, '').slice(0, 100) +} + +/** Extract description from content (after
). */ +function extractDescription(content: string): string { + const idx = content.indexOf('
') + if (idx === -1) return '' + return content + .slice(idx + 5) + .replace(/<[^>]+>/g, '') + .trim() +} + +export default function YouTubePostCard({ post, onBookmark, onMarkSeen }: YouTubePostCardProps) { + const title = extractTitle(post.content) + const description = extractDescription(post.content) + + return ( +
+
+ {/* Thumbnail */} + + {post.thumbnail_url ? ( + {title} + ) : ( +
+ +
+ )} + {/* Duration overlay */} + {post.duration && ( + + {post.duration} + + )} + {/* Hover play icon */} +
+ +
+
+ + {/* Metadata */} +
+ {/* Title */} + + {title} + + + {/* Channel + time */} +
+ {post.channel_title && ( + + {post.channel_title} + + )} + + + {timeAgo(post.published_at)} + +
+ + {/* Stats row */} +
+ {post.view_count != null && ( + + + {formatCount(post.view_count)} views + + )} + {post.like_count != null && ( + + + {formatCount(post.like_count)} + + )} +
+ + {/* Description snippet */} + {description && ( +

+ {description} +

+ )} + + {/* Matched interests */} + {post.matched_interests.length > 0 && ( +
+ {post.matched_interests.map((name) => ( + + {name} + + ))} +
+ )} + + {/* Relevance + actions */} +
+
+ {/* Relevance score pill */} + {post.relevance_score > 0 && ( + + {post.relevance_score.toFixed(1)} + + )} + + {/* Bookmark */} + + + {/* Mark seen */} + {!post.seen && ( + + )} +
+ + {/* Link to YouTube */} + + + Watch + +
+
+
+
+ ) +} diff --git a/ushadow/frontend/src/components/layout/Layout.tsx b/ushadow/frontend/src/components/layout/Layout.tsx index 32a0caa2..2e2eaa0c 100644 --- a/ushadow/frontend/src/components/layout/Layout.tsx +++ b/ushadow/frontend/src/components/layout/Layout.tsx @@ -128,6 +128,7 @@ export default function Layout() { { path: '/memories', label: 'Memories', icon: Brain }, ] : []), { path: '/timeline', label: 'Timeline', icon: Calendar, featureFlag: 'timeline' }, + { path: '/feed', label: 'Feed', icon: Radio, featureFlag: 'social_feed' }, { path: '/cluster', label: 'Cluster', icon: Network, badgeVariant: 'beta' }, { path: '/kubernetes', label: 'Kubernetes', icon: Cloud }, { path: '/settings', label: 'Settings', icon: Settings }, diff --git a/ushadow/frontend/src/hooks/useFeed.ts b/ushadow/frontend/src/hooks/useFeed.ts new file mode 100644 index 00000000..6eb71b2f --- /dev/null +++ b/ushadow/frontend/src/hooks/useFeed.ts @@ -0,0 +1,206 @@ +/** + * useFeed Hooks + * + * React Query hooks for the personalized multi-platform feed feature. + * Provides hooks for posts, interests, sources, refresh, and post actions. + */ + +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query' +import { feedApi, type FeedPost, type SourceCreateData } from '../services/feedApi' +import { useAuth } from '../contexts/AuthContext' +import { useKeycloakAuth } from '../contexts/KeycloakAuthContext' + +/** Resolve user email from whichever auth provider is active. */ +function useUserId(): { userId: string; isLoadingUser: boolean } { + const { user: legacyUser, isLoading: legacyLoading } = useAuth() + const { isAuthenticated: kcAuthenticated, user: kcUser, isLoading: kcLoading } = useKeycloakAuth() + + const user = kcAuthenticated && kcUser ? 
kcUser : legacyUser + return { + userId: user?.email || 'ushadow', + isLoadingUser: legacyLoading || kcLoading, + } +} + +// --------------------------------------------------------------------------- +// Feed Posts +// --------------------------------------------------------------------------- + +export function useFeedPosts( + page: number = 1, + pageSize: number = 20, + interest?: string, + showSeen: boolean = true, + platformType?: string, +) { + const { userId, isLoadingUser } = useUserId() + const queryClient = useQueryClient() + + const postsQuery = useQuery({ + queryKey: ['feedPosts', userId, page, pageSize, interest, showSeen, platformType], + queryFn: () => + feedApi.getPosts({ + page, + page_size: pageSize, + interest, + show_seen: showSeen, + platform_type: platformType, + }).then(r => r.data), + staleTime: 60_000, + enabled: !isLoadingUser, + }) + + const markSeenMutation = useMutation({ + mutationFn: (postId: string) => feedApi.markSeen(postId), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['feedPosts'] }) + queryClient.invalidateQueries({ queryKey: ['feedStats'] }) + }, + }) + + const bookmarkMutation = useMutation({ + mutationFn: (postId: string) => feedApi.bookmarkPost(postId), + onMutate: async (postId: string) => { + await queryClient.cancelQueries({ queryKey: ['feedPosts'] }) + queryClient.setQueriesData<{ posts: FeedPost[] }>( + { queryKey: ['feedPosts'] }, + (old) => { + if (!old) return old + return { + ...old, + posts: old.posts.map((p) => + p.post_id === postId ? { ...p, bookmarked: !p.bookmarked } : p, + ), + } + }, + ) + }, + onSettled: () => { + queryClient.invalidateQueries({ queryKey: ['feedPosts'] }) + queryClient.invalidateQueries({ queryKey: ['feedStats'] }) + }, + }) + + return { + posts: postsQuery.data?.posts ?? [], + total: postsQuery.data?.total ?? 0, + totalPages: postsQuery.data?.total_pages ?? 
1, + isLoading: postsQuery.isLoading, + isFetching: postsQuery.isFetching, + error: postsQuery.error, + refetch: postsQuery.refetch, + + markSeen: markSeenMutation.mutate, + toggleBookmark: bookmarkMutation.mutate, + } +} + +// --------------------------------------------------------------------------- +// Interests +// --------------------------------------------------------------------------- + +export function useFeedInterests() { + const { userId, isLoadingUser } = useUserId() + + const query = useQuery({ + queryKey: ['feedInterests', userId], + queryFn: () => feedApi.getInterests().then(r => r.data), + staleTime: 120_000, + enabled: !isLoadingUser, + }) + + return { + interests: query.data?.interests ?? [], + isLoading: query.isLoading, + error: query.error, + } +} + +// --------------------------------------------------------------------------- +// Sources +// --------------------------------------------------------------------------- + +export function useFeedSources() { + const { userId, isLoadingUser } = useUserId() + const queryClient = useQueryClient() + + const sourcesQuery = useQuery({ + queryKey: ['feedSources', userId], + queryFn: () => feedApi.getSources().then(r => r.data), + staleTime: 120_000, + enabled: !isLoadingUser, + }) + + const addMutation = useMutation({ + mutationFn: (data: SourceCreateData) => + feedApi.addSource(data), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['feedSources'] }) + queryClient.invalidateQueries({ queryKey: ['feedStats'] }) + }, + }) + + const removeMutation = useMutation({ + mutationFn: (sourceId: string) => feedApi.removeSource(sourceId), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['feedSources'] }) + queryClient.invalidateQueries({ queryKey: ['feedStats'] }) + }, + }) + + return { + sources: sourcesQuery.data?.sources ?? 
[], + isLoading: sourcesQuery.isLoading, + error: sourcesQuery.error, + + addSource: addMutation.mutateAsync, + isAdding: addMutation.isPending, + + removeSource: removeMutation.mutateAsync, + isRemoving: removeMutation.isPending, + } +} + +// --------------------------------------------------------------------------- +// Refresh +// --------------------------------------------------------------------------- + +export function useRefreshFeed(platformType?: string) { + const queryClient = useQueryClient() + + const mutation = useMutation({ + mutationFn: () => feedApi.refresh(platformType).then(r => r.data), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['feedPosts'] }) + queryClient.invalidateQueries({ queryKey: ['feedInterests'] }) + queryClient.invalidateQueries({ queryKey: ['feedStats'] }) + }, + }) + + return { + refresh: mutation.mutateAsync, + isRefreshing: mutation.isPending, + lastResult: mutation.data ?? null, + error: mutation.error, + } +} + +// --------------------------------------------------------------------------- +// Stats +// --------------------------------------------------------------------------- + +export function useFeedStats() { + const { userId, isLoadingUser } = useUserId() + + const query = useQuery({ + queryKey: ['feedStats', userId], + queryFn: () => feedApi.getStats().then(r => r.data), + staleTime: 30_000, + enabled: !isLoadingUser, + }) + + return { + stats: query.data ?? 
null, + isLoading: query.isLoading, + } +} diff --git a/ushadow/frontend/src/pages/ChatPage.tsx b/ushadow/frontend/src/pages/ChatPage.tsx index ba7aa6ff..41737492 100644 --- a/ushadow/frontend/src/pages/ChatPage.tsx +++ b/ushadow/frontend/src/pages/ChatPage.tsx @@ -4,6 +4,7 @@ import { useNavigate } from 'react-router-dom' import { useTheme } from '../contexts/ThemeContext' import { chatApi, BACKEND_URL } from '../services/api' import type { ChatMessage, ChatStatus } from '../services/api' +import { getStorageKey } from '../utils/storage' interface Message extends ChatMessage { id: string @@ -97,12 +98,23 @@ export default function ChatPage() { content: m.content, })) + // Get token from storage (Keycloak in sessionStorage, or legacy in localStorage) + const kcToken = sessionStorage.getItem('kc_access_token') + const legacyToken = localStorage.getItem(getStorageKey('token')) + const token = kcToken || legacyToken + + // Build headers - only include Authorization if we have a valid token + const headers: Record = { + 'Content-Type': 'application/json', + } + + if (token && token !== 'null' && token !== 'undefined') { + headers['Authorization'] = `Bearer ${token}` + } + const response = await fetch(`${BACKEND_URL}/api/chat`, { method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${localStorage.getItem('ushadow_token')}`, - }, + headers, body: JSON.stringify({ messages: allMessages, use_memory: useMemory, diff --git a/ushadow/frontend/src/pages/FeedPage.tsx b/ushadow/frontend/src/pages/FeedPage.tsx new file mode 100644 index 00000000..968fbfce --- /dev/null +++ b/ushadow/frontend/src/pages/FeedPage.tsx @@ -0,0 +1,341 @@ +/** + * FeedPage + * + * Multi-platform feed ranked by your OpenMemory knowledge graph interests. + * Features: Social/Videos tabs, interest filter chips, ranked post cards, + * source management, refresh. 
+ */ + +import { useState } from 'react' +import { + Radio, + RefreshCw, + Plus, + ChevronLeft, + ChevronRight, + Loader2, + AlertCircle, + Trash2, + Eye, + EyeOff, + MessageSquare, + Play, +} from 'lucide-react' +import PostCard from '../components/feed/PostCard' +import YouTubePostCard from '../components/feed/YouTubePostCard' +import InterestChip from '../components/feed/InterestChip' +import AddSourceModal from '../components/feed/AddSourceModal' +import FeedEmptyState from '../components/feed/FeedEmptyState' +import { + useFeedPosts, + useFeedInterests, + useFeedSources, + useRefreshFeed, + useFeedStats, +} from '../hooks/useFeed' + +type FeedTab = 'social' | 'videos' + +const TAB_TO_PLATFORM: Record = { + social: 'mastodon', + videos: 'youtube', +} + +export default function FeedPage() { + const [activeTab, setActiveTab] = useState('social') + const [page, setPage] = useState(1) + const [selectedInterest, setSelectedInterest] = useState() + const [showSeen, setShowSeen] = useState(true) + const [showAddSource, setShowAddSource] = useState(false) + + const platformType = TAB_TO_PLATFORM[activeTab] + + const { posts, total, totalPages, isLoading, isFetching, error, markSeen, toggleBookmark } = + useFeedPosts(page, 20, selectedInterest, showSeen, platformType) + const { interests } = useFeedInterests() + const { sources, addSource, isAdding, removeSource, isRemoving } = useFeedSources() + const { refresh, isRefreshing, lastResult } = useRefreshFeed(platformType) + const { stats } = useFeedStats() + + // Filter sources for the active tab + const tabSources = sources.filter((s) => s.platform_type === platformType) + const hasTabSources = tabSources.length > 0 + + const handleTabChange = (tab: FeedTab) => { + setActiveTab(tab) + setPage(1) + setSelectedInterest(undefined) + } + + const handleRefresh = async () => { + try { + await refresh() + setPage(1) + } catch { + // error is available via useRefreshFeed().error + } + } + + const handleInterestClick = (name: 
string) => { + setSelectedInterest((prev) => (prev === name ? undefined : name)) + setPage(1) + } + + return ( +
+ {/* Page header */} +
+
+ +
+

Feed

+

+ Content ranked by your knowledge graph interests +

+
+
+ +
+ {/* Stats pills */} + {stats && ( +
+ {stats.total_posts} posts + | + {stats.unseen_posts} unseen + | + {stats.sources_count} sources +
+ )} + + {/* Toggle seen */} + + + {/* Add source */} + + + {/* Refresh β€” scoped to active tab's platform */} + +
+
+ + {/* Tab bar */} +
+ + +
+ + {/* Refresh result banner */} + {lastResult && ( +
+ Fetched {lastResult.posts_fetched} posts, {lastResult.posts_new} new ·{' '} + {lastResult.interests_count} interests used +
+ )} + + {/* Error */} + {error && ( +
+ + {(error as Error).message || 'Failed to load feed'} +
+ )} + + {/* Interest chips */} + {interests.length > 0 && ( +
+ + {interests.map((interest) => ( + handleInterestClick(interest.name)} + /> + ))} +
+ )} + + {/* Sources list β€” scoped to active tab */} +
+ Sources: + {tabSources.map((s) => ( + + {s.platform_type === 'youtube' && } + {s.name} + + + ))} + +
+ + {/* Loading state */} + {isLoading && ( +
+ +
+ )} + + {/* Empty state */} + {!isLoading && posts.length === 0 && ( + setShowAddSource(true)} + onRefresh={handleRefresh} + isRefreshing={isRefreshing} + /> + )} + + {/* Post list β€” conditional card type */} + {!isLoading && posts.length > 0 && ( +
+ {posts.map((post) => + post.platform_type === 'youtube' ? ( + + ) : ( + + ), + )} +
+ )} + + {/* Pagination */} + {totalPages > 1 && ( +
+ + + Page {page} of {totalPages} · {total} posts + + +
+ )} + + {/* Fetching indicator (background refetch) */} + {isFetching && !isLoading && ( +
+ + Updating... +
+ )} + + {/* Add source modal */} + setShowAddSource(false)} + onAdd={addSource} + isAdding={isAdding} + defaultPlatform={platformType as 'mastodon' | 'youtube'} + /> +
+ ) +} diff --git a/ushadow/frontend/src/services/api.ts b/ushadow/frontend/src/services/api.ts index 725ad882..571df424 100644 --- a/ushadow/frontend/src/services/api.ts +++ b/ushadow/frontend/src/services/api.ts @@ -71,7 +71,8 @@ api.interceptors.request.use((config) => { // Prefer Keycloak token if both are present const token = kcToken || legacyToken - if (token) { + // Only add Authorization header if we have a valid token (not null, not empty) + if (token && token !== 'null' && token !== 'undefined') { config.headers.Authorization = `Bearer ${token}` } return config diff --git a/ushadow/frontend/src/services/feedApi.ts b/ushadow/frontend/src/services/feedApi.ts new file mode 100644 index 00000000..5dd43abf --- /dev/null +++ b/ushadow/frontend/src/services/feedApi.ts @@ -0,0 +1,130 @@ +/** + * Feed API Client + * + * HTTP functions for the personalized multi-platform feed feature. + * Uses the shared `api` axios instance (includes JWT auth automatically). + */ + +import { api } from './api' + +export interface FeedPost { + post_id: string + user_id: string + source_id: string + external_id: string + platform_type: string // 'mastodon' | 'youtube' + author_handle: string + author_display_name: string + author_avatar: string | null + content: string + url: string + published_at: string + hashtags: string[] + language: string | null + // Mastodon engagement (optional β€” null for non-mastodon) + boosts_count: number | null + favourites_count: number | null + replies_count: number | null + // YouTube-specific (optional β€” null for non-youtube) + thumbnail_url?: string | null + video_id?: string | null + channel_title?: string | null + view_count?: number | null + like_count?: number | null + duration?: string | null + // Scoring & interaction + relevance_score: number + matched_interests: string[] + seen: boolean + bookmarked: boolean + fetched_at: string +} + +export interface FeedInterest { + name: string + node_id: string + labels: string[] + 
relationship_count: number + last_active: string | null + hashtags: string[] +} + +export interface FeedSource { + source_id: string + user_id: string + name: string + platform_type: string + instance_url: string | null + api_key: string | null + enabled: boolean + created_at: string +} + +export interface FeedResponse { + posts: FeedPost[] + total: number + page: number + page_size: number + total_pages: number +} + +export interface RefreshResult { + status: string + interests_count: number + interests_used?: Array<{ name: string; hashtags: string[]; weight: number }> + posts_fetched: number + posts_scored?: number + posts_new: number + message?: string +} + +export interface SourceCreateData { + name: string + platform_type: string + instance_url?: string + api_key?: string +} + +export const feedApi = { + // Posts + getPosts: (params: { + page?: number + page_size?: number + interest?: string + show_seen?: boolean + platform_type?: string + }) => api.get('/api/feed/posts', { params }), + + // Refresh (optionally scoped to one platform) + refresh: (platformType?: string) => + api.post('/api/feed/refresh', null, { + params: platformType ? { platform_type: platformType } : undefined, + }), + + // Interests + getInterests: () => + api.get<{ interests: FeedInterest[] }>('/api/feed/interests'), + + // Sources + getSources: () => + api.get<{ sources: FeedSource[] }>('/api/feed/sources'), + + addSource: (data: SourceCreateData) => + api.post('/api/feed/sources', data), + + removeSource: (sourceId: string) => + api.delete(`/api/feed/sources/${sourceId}`), + + // Post actions + markSeen: (postId: string) => + api.post(`/api/feed/posts/${postId}/seen`), + + bookmarkPost: (postId: string) => + api.post(`/api/feed/posts/${postId}/bookmark`), + + // Stats + getStats: () => + api.get<{ total_posts: number; unseen_posts: number; bookmarked_posts: number; sources_count: number }>( + '/api/feed/stats' + ), +}