diff --git a/streamtv/api/auth.py b/streamtv/api/auth.py
index 75aa40f..c234f4b 100644
--- a/streamtv/api/auth.py
+++ b/streamtv/api/auth.py
@@ -201,21 +201,35 @@ async def youtube_set_cookies(request: Request, file: UploadFile = File(...)):
     cookies_dir = Path("data/cookies")
     cookies_dir.mkdir(parents=True, exist_ok=True)

-    # Save uploaded file
-    cookies_path = cookies_dir / "youtube_cookies.txt"
+    # Save uploaded file with site name format (youtube.com_cookies.txt or www.youtube.com_cookies.txt)
+    # Try both formats - prefer youtube.com (without www.) as it's more common
+    cookies_path_new1 = cookies_dir / "youtube.com_cookies.txt"
+    cookies_path_new2 = cookies_dir / "www.youtube.com_cookies.txt"
+    cookies_path_old = cookies_dir / "youtube_cookies.txt"
+    cookies_path = cookies_path_new1  # Use youtube.com format by default

     try:
         with open(cookies_path, "wb") as buffer:
             shutil.copyfileobj(file.file, buffer)

+        # Also save to old format for backward compatibility
+        try:
+            with open(cookies_path_old, "wb") as buffer:
+                file.file.seek(0)  # Reset file pointer
+                shutil.copyfileobj(file.file, buffer)
+        except Exception:
+            pass  # Non-critical if old format save fails
+
         # Validate it's a valid cookies file (basic check)
         with open(cookies_path, "r") as f:
             content = f.read()
             if "youtube.com" not in content.lower() and "# Netscape" not in content:
                 cookies_path.unlink()  # Delete invalid file
+                if cookies_path_old.exists():
+                    cookies_path_old.unlink()
                 raise HTTPException(status_code=400, detail="Invalid cookies file format")

-        # Update config and persist to file
+        # Update config and persist to file (use new format)
         config.update_section("youtube", {
             "cookies_file": str(cookies_path.absolute()),
             "use_authentication": True
@@ -233,6 +247,8 @@ async def youtube_set_cookies(request: Request, file: UploadFile = File(...)):
         logger.error(f"Error uploading cookies file: {e}")
         if cookies_path.exists():
             cookies_path.unlink()
+        if cookies_path_old.exists():
+            cookies_path_old.unlink()
         raise HTTPException(status_code=500, detail=str(e))


@@ -258,21 +274,34 @@ async def archive_set_cookies(request: Request, file: UploadFile = File(...)):
     cookies_dir = Path("data/cookies")
     cookies_dir.mkdir(parents=True, exist_ok=True)

-    # Save uploaded file
-    cookies_path = cookies_dir / "archive_cookies.txt"
+    # Save uploaded file with site name format (archive.org_cookies.txt)
+    # Also keep old format for backward compatibility
+    cookies_path_new = cookies_dir / "archive.org_cookies.txt"
+    cookies_path_old = cookies_dir / "archive_cookies.txt"
+    cookies_path = cookies_path_new  # Use new format by default

     try:
         with open(cookies_path, "wb") as buffer:
             shutil.copyfileobj(file.file, buffer)

+        # Also save to old format for backward compatibility
+        try:
+            with open(cookies_path_old, "wb") as buffer:
+                file.file.seek(0)  # Reset file pointer
+                shutil.copyfileobj(file.file, buffer)
+        except Exception:
+            pass  # Non-critical if old format save fails
+
         # Validate it's a valid cookies file (basic check)
         with open(cookies_path, "r") as f:
             content = f.read()
             if "archive.org" not in content.lower() and "# Netscape" not in content:
                 cookies_path.unlink()  # Delete invalid file
+                if cookies_path_old.exists():
+                    cookies_path_old.unlink()
                 raise HTTPException(status_code=400, detail="Invalid cookies file format. Must contain archive.org cookies.")

-        # Update config and persist to file
+        # Update config and persist to file (use new format)
         config.update_section("archive_org", {
             "cookies_file": str(cookies_path.absolute()),
             "use_authentication": True
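Both endpoints above repeat the same two-step write (new site-named file, then the legacy name, rewinding the upload in between). A minimal sketch of a helper that would factor this out — the name `save_cookie_copies` is illustrative, not part of the patch:

```python
# Hypothetical helper (not in the patch): write one uploaded file to several
# target paths, rewinding the source before each copy.
import shutil
from pathlib import Path
from typing import BinaryIO, Iterable

def save_cookie_copies(src: BinaryIO, paths: Iterable[Path]) -> None:
    for path in paths:
        src.seek(0)  # reset the upload's file pointer before each copy
        with open(path, "wb") as buffer:
            shutil.copyfileobj(src, buffer)

# Usage: save_cookie_copies(file.file, [cookies_path, cookies_path_old])
```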
diff --git a/streamtv/api/channels.py b/streamtv/api/channels.py
index ff8bd46..8cebef1 100644
--- a/streamtv/api/channels.py
+++ b/streamtv/api/channels.py
@@ -368,22 +368,33 @@ async def upload_channel_icon(
     icons_dir = project_root / "data" / "channel_icons"
     icons_dir.mkdir(parents=True, exist_ok=True)

-    # Generate filename: channel_{channel_id}.png
-    icon_filename = f"channel_{channel_id}.png"
+    # Generate filename using channel NUMBER (not database ID) for consistency with XMLTV/HDHomeRun
+    # This ensures icons match channel numbers used in lineup.json and XMLTV channel IDs
+    icon_filename = f"channel_{channel.number}.png"
     icon_path = icons_dir / icon_filename

+    # If there's an old icon file using the database ID, remove it
+    old_icon_filename = f"channel_{channel_id}.png"
+    old_icon_path = icons_dir / old_icon_filename
+    if old_icon_path.exists() and old_icon_path != icon_path:
+        try:
+            old_icon_path.unlink()
+            logger.info(f"Removed old icon file using database ID: {old_icon_filename}")
+        except Exception as e:
+            logger.warning(f"Could not remove old icon file {old_icon_filename}: {e}")
+
     try:
         # Save the uploaded file
         with open(icon_path, "wb") as buffer:
             shutil.copyfileobj(file.file, buffer)

-        # Update channel logo_path to point to the static file
+        # Update channel logo_path to point to the static file (using channel number)
         logo_url = f"/static/channel_icons/{icon_filename}"
         channel.logo_path = logo_url
         db.commit()
         db.refresh(channel)

-        logger.info(f"Uploaded icon for channel {channel_id} ({channel.name}): {icon_path}")
+        logger.info(f"Uploaded icon for channel {channel.number} (ID: {channel_id}, {channel.name}): {icon_path}")
         return channel

     except Exception as e:
@@ -410,8 +421,14 @@ def delete_channel_icon(
     # Determine icons directory
     project_root = Path(__file__).parent.parent.parent
     icons_dir = project_root / "data" / "channel_icons"
-    icon_filename = f"channel_{channel_id}.png"
+
+    # Try channel number first (preferred), then fall back to database ID for backward compatibility
+    icon_filename = f"channel_{channel.number}.png"
     icon_path = icons_dir / icon_filename
+    if not icon_path.exists():
+        # Fall back to database ID for old icons
+        icon_filename = f"channel_{channel_id}.png"
+        icon_path = icons_dir / icon_filename

     # Delete the file if it exists
     if icon_path.exists():
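The icon endpoints now agree on one lookup order: the channel-number filename first, then the database-ID filename kept for backward compatibility. A standalone sketch of that order (function and variable names are illustrative, not part of the patch):

```python
from pathlib import Path
from typing import Optional

def resolve_icon_path(icons_dir: Path, number: str, channel_id: int) -> Optional[Path]:
    """Prefer channel_{number}.png; fall back to the legacy channel_{id}.png."""
    for name in (f"channel_{number}.png", f"channel_{channel_id}.png"):
        candidate = icons_dir / name
        if candidate.exists():
            return candidate
    return None

# Usage: resolve_icon_path(Path("data/channel_icons"), "2000", 42)
```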
""" logo_path = channel.logo_path if logo_path: + # If it's an external URL, use it directly if logo_path.startswith('http'): return logo_path - if logo_path.startswith('/'): - return f"{base_url}{logo_path}" - return f"{base_url}/{logo_path}" + + # Check if logo_path contains a channel number that matches this channel + # Extract any number from the logo_path filename + import re + logo_filename = logo_path.split('/')[-1] # Get just the filename + logo_match = re.search(r'channel_(\d+)\.png', logo_filename) + + if logo_match: + logo_number = logo_match.group(1) + channel_number_str = str(channel.number) + # If the logo path number matches the channel number, use it + if logo_number == channel_number_str: + if logo_path.startswith('/'): + return f"{base_url}{logo_path}" + return f"{base_url}/{logo_path}" + else: + # Logo path has wrong number (likely database ID), use fallback + logger.debug(f"Channel {channel.number}: logo_path '{logo_path}' has number {logo_number} (doesn't match channel number), using fallback") + else: + # No number found in logo_path, might be a custom path - use it if it's a static path + if '/static/channel_icons/' in logo_path or '/channel_icons/' in logo_path: + if logo_path.startswith('/'): + return f"{base_url}{logo_path}" + return f"{base_url}/{logo_path}" + # Custom path not in channel_icons - use it as-is + if logo_path.startswith('/'): + return f"{base_url}{logo_path}" + return f"{base_url}/{logo_path}" + # Default fallback based on channel number icon return f"{base_url}/static/channel_icons/channel_{channel.number}.png" @@ -308,9 +338,25 @@ async def get_epg( plex_client = None # Channel definitions - ensure Plex-compatible format + # #region agent log + import json + try: + with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f: + f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"api/iptv.py:311","message":"XMLTV: Starting channel definitions","data":{"channel_count":len(channels),"base_url":base_url},"timestamp":int(__import__('time').time()*1000)})+'\n') + except: pass + # #endregion + for channel in channels: # Use channel number as ID (Plex expects numeric or alphanumeric IDs) channel_id = str(channel.number).strip() + + # #region agent log + try: + with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f: + f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"api/iptv.py:316","message":"XMLTV: Channel ID generated","data":{"channel_number":channel.number,"channel_id":channel_id,"channel_id_type":type(channel_id).__name__,"channel_id_repr":repr(channel_id),"channel_name":channel.name},"timestamp":int(__import__('time').time()*1000)})+'\n') + except: pass + # #endregion + xml_content += f' \n' # Primary display name (required) @@ -325,6 +371,14 @@ async def get_epg( # Logo/icon (Plex expects absolute URLs). Fall back to default icon by number. 
logo_url = _resolve_logo_url(channel, base_url) + + # #region agent log + try: + with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f: + f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"D","location":"api/iptv.py:332","message":"XMLTV: Icon URL resolved","data":{"channel_number":channel.number,"logo_url":logo_url,"logo_path":getattr(channel,'logo_path',None)},"timestamp":int(__import__('time').time()*1000)})+'\n') + except: pass + # #endregion + if logo_url: xml_content += f' \n' @@ -884,6 +938,14 @@ async def get_epg( generation_time = time.time() - perf_start_time logger.info(f"XMLTV EPG generated in {generation_time:.2f}s ({len(xml_content)} bytes)") + # #region agent log + try: + import json + with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f: + f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"B","location":"api/iptv.py:887","message":"XMLTV: Response prepared","data":{"content_length":len(xml_content),"media_type":"application/xml; charset=utf-8","generation_time":generation_time},"timestamp":int(__import__('time').time()*1000)})+'\n') + except: pass + # #endregion + return Response( content=xml_content, media_type="application/xml; charset=utf-8", @@ -1573,3 +1635,4 @@ async def generate(): except Exception as e: logger.error(f"Unexpected error streaming media {media_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") + diff --git a/streamtv/hdhomerun/api.py b/streamtv/hdhomerun/api.py index 18c50ea..94406bd 100644 --- a/streamtv/hdhomerun/api.py +++ b/streamtv/hdhomerun/api.py @@ -7,7 +7,6 @@ import json import logging import asyncio -import re from datetime import datetime from ..database import get_db, Channel, Playlist, PlaylistItem, MediaItem @@ -331,43 +330,40 @@ async def lineup(request: Request, db: Session = Depends(get_db)): lineup_data = [] + # #region agent log + import json + try: + with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f: + f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"hdhomerun/api.py:333","message":"lineup.json: Starting channel iteration","data":{"channel_count":len(channels)},"timestamp":int(__import__('time').time()*1000)})+'\n') + except: pass + # #endregion + for channel in channels: # HDHomeRun expects GuideNumber, GuideName, URL, and optionally HD # We'll use the channel number as GuideNumber guide_number = channel.number - # Strip channel number prefix from GuideName to avoid duplication in Plex - # Plex displays channels as "GuideNumber GuideName", so if name already - # starts with the number, it gets doubled (e.g., "2000 2000's Movies") + # Use the full channel name as GuideName + # Plex matches channels primarily by GuideNumber (channel ID), but GuideName + # should match the primary display-name in XMLTV for proper metadata association + # (icons, descriptions, etc.). Using the full name ensures proper matching. 
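The validation `_resolve_logo_url` performs can be isolated for testing; a sketch under the same `channel_<number>.png` naming assumption (the function name is illustrative):

```python
import re
from typing import Optional

def logo_number_matches(logo_path: str, channel_number) -> Optional[bool]:
    """True/False when the filename embeds a number; None when it embeds none."""
    match = re.search(r'channel_(\d+)\.png', logo_path.split('/')[-1])
    if match is None:
        return None
    return match.group(1) == str(channel_number)

assert logo_number_matches("/static/channel_icons/channel_2000.png", 2000) is True
assert logo_number_matches("/static/channel_icons/channel_42.png", 2000) is False  # stale DB-id name
assert logo_number_matches("/static/channel_icons/custom.png", 2000) is None
```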
diff --git a/streamtv/hdhomerun/api.py b/streamtv/hdhomerun/api.py
index 18c50ea..94406bd 100644
--- a/streamtv/hdhomerun/api.py
+++ b/streamtv/hdhomerun/api.py
@@ -7,7 +7,6 @@
 import json
 import logging
 import asyncio
-import re
 from datetime import datetime

 from ..database import get_db, Channel, Playlist, PlaylistItem, MediaItem
@@ -331,43 +330,40 @@ async def lineup(request: Request, db: Session = Depends(get_db)):

     lineup_data = []

+    # #region agent log
+    import json
+    try:
+        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"hdhomerun/api.py:333","message":"lineup.json: Starting channel iteration","data":{"channel_count":len(channels)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+    except: pass
+    # #endregion
+
     for channel in channels:
         # HDHomeRun expects GuideNumber, GuideName, URL, and optionally HD
         # We'll use the channel number as GuideNumber
         guide_number = channel.number

-        # Strip channel number prefix from GuideName to avoid duplication in Plex
-        # Plex displays channels as "GuideNumber GuideName", so if name already
-        # starts with the number, it gets doubled (e.g., "2000 2000's Movies")
+        # Use the full channel name as GuideName
+        # Plex matches channels primarily by GuideNumber (channel ID), but GuideName
+        # should match the primary display-name in XMLTV for proper metadata association
+        # (icons, descriptions, etc.). Using the full name ensures proper matching.
         guide_name = channel.name
-        if guide_name and guide_number:
-            # Check if name starts with the channel number
-            name_stripped = guide_name.strip()
-            number_str = str(guide_number).strip()
-
-            if name_stripped.startswith(number_str):
-                # Remove the number prefix
-                remaining = name_stripped[len(number_str):].strip()
-
-                # Remove common patterns after the number (e.g., "'s ", " - ", " ", "-", "'s")
-                # Handle patterns in order of specificity (longer patterns first)
-                patterns_to_remove = [
-                    r"^'s\s+",  # "'s " (apostrophe-s-space)
-                    r"^[\s\-\.\_]+",  # Any combination of spaces, dashes, dots, underscores
-                ]
-                for pattern in patterns_to_remove:
-                    remaining = re.sub(pattern, '', remaining)
-
-                # Only use cleaned name if there's content left, otherwise keep original
-                if remaining:
-                    guide_name = remaining

         # Create stream URL - HDHomeRun expects MPEG-TS, but we'll use HLS
         # Plex/Emby/Jellyfin can handle HLS
         stream_url = f"{base_url}/hdhomerun/auto/v{channel.number}"

+        guide_number_str = str(guide_number)
+
+        # #region agent log
+        try:
+            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"hdhomerun/api.py:355","message":"lineup.json: Channel entry created","data":{"channel_number":channel.number,"guide_number_type":type(guide_number).__name__,"guide_number_str":guide_number_str,"guide_number_repr":repr(guide_number_str),"guide_name":guide_name,"channel_id_in_db":channel.id},"timestamp":int(__import__('time').time()*1000)})+'\n')
+        except: pass
+        # #endregion
+
         channel_entry = {
-            "GuideNumber": str(guide_number),
+            "GuideNumber": guide_number_str,
             "GuideName": guide_name,
             "URL": stream_url,
             "HD": 1 if "HD" in channel.name.upper() else 0
@@ -375,6 +371,13 @@ async def lineup(request: Request, db: Session = Depends(get_db)):

         lineup_data.append(channel_entry)

+    # #region agent log
+    try:
+        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"A","location":"hdhomerun/api.py:375","message":"lineup.json: Returning lineup data","data":{"total_channels":len(lineup_data),"sample_guide_numbers":[e["GuideNumber"] for e in lineup_data[:3]]},"timestamp":int(__import__('time').time()*1000)})+'\n')
+    except: pass
+    # #endregion
+
     return lineup_data


@@ -398,10 +401,74 @@ async def stream_channel(
     """Stream a channel (HDHomeRun format) - Returns MPEG-TS for Plex compatibility"""
     logger.info(f"HDHomeRun stream request for channel {channel_number} from {request.client.host if request else 'unknown'}")

-    channel = db.query(Channel).filter(
-        Channel.number == channel_number,
-        Channel.enabled == True
-    ).first()
+    # #region agent log
+    import json
+    try:
+        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:378","message":"stream_channel: Request received","data":{"channel_number":channel_number,"channel_number_type":type(channel_number).__name__,"client_host":request.client.host if request else None},"timestamp":int(__import__('time').time()*1000)})+'\n')
+    except: pass
+    # #endregion
+
+    # Query channel - handle both string and numeric channel numbers
+    # Plex may send the channel number as an integer, but the database stores it as a string
+    # Also handle enum validation errors with a fallback
+    channel = None
+    try:
+        channel = db.query(Channel).filter(
+            Channel.number == str(channel_number),
+            Channel.enabled == True
+        ).first()
+    except Exception as query_error:
+        # Handle SQLAlchemy enum validation errors
+        error_str = str(query_error)
+        if isinstance(query_error, LookupError) or "is not among the defined enum values" in error_str:
+            logger.warning(f"SQLAlchemy enum validation error when querying channel {channel_number} for HDHomeRun stream: {query_error}")
+            # Query using raw SQL
+            from sqlalchemy import text
+            raw_result = db.execute(text("""
+                SELECT * FROM channels WHERE number = :number AND enabled = 1
+            """), {"number": str(channel_number)}).fetchone()

+            if raw_result:
+                # Construct Channel object from raw result
+                from ..database.models import (
+                    PlayoutMode, StreamingMode, ChannelTranscodeMode, ChannelSubtitleMode,
+                    ChannelStreamSelectorMode, ChannelMusicVideoCreditsMode, ChannelSongVideoMode,
+                    ChannelIdleBehavior, ChannelPlayoutSource
+                )
+                channel = Channel()
+                for key, value in raw_result._mapping.items():
+                    if value is None:
+                        setattr(channel, key, None)
+                    elif key in ['playout_mode', 'streaming_mode', 'transcode_mode', 'subtitle_mode',
+                                 'stream_selector_mode', 'music_video_credits_mode', 'song_video_mode',
+                                 'idle_behavior', 'playout_source'] and isinstance(value, str):
+                        # These will be handled by @reconstructor, just set as string for now
+                        setattr(channel, key, value)
+                    else:
+                        setattr(channel, key, value)
+                # Trigger @reconstructor to convert enums
+                channel._on_load()
+        else:
+            # Re-raise if it's a different error
+            raise
+
+    # If still not found, try without string conversion
+    if not channel:
+        try:
+            channel = db.query(Channel).filter(
+                Channel.number == channel_number,
+                Channel.enabled == True
+            ).first()
+        except Exception:
+            pass
+
+    # #region agent log
+    try:
+        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:386","message":"stream_channel: Channel query result","data":{"channel_found":channel is not None,"channel_number":channel_number,"channel_id":channel.id if channel else None,"channel_name":channel.name if channel else None},"timestamp":int(__import__('time').time()*1000)})+'\n')
+    except: pass
+    # #endregion

     if not channel:
         logger.warning(f"Channel {channel_number} not found or not enabled")
@@ -413,9 +480,23 @@ async def stream_channel(
     # Get ChannelManager from app state
     channel_manager = None
     if request:
-        app = request.app
-        if hasattr(app, 'state'):
-            channel_manager = getattr(app.state, 'channel_manager', None)
+        try:
+            app = request.app
+            if hasattr(app, 'state'):
+                channel_manager = getattr(app.state, 'channel_manager', None)
+            # If channel_manager is None, try to get it from the app directly
+            if channel_manager is None and hasattr(app, 'channel_manager'):
+                channel_manager = app.channel_manager
+        except Exception as e:
+            logger.warning(f"Error accessing app.state for ChannelManager: {e}")
+            channel_manager = None
+
+    # #region agent log
+    try:
+        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"E","location":"hdhomerun/api.py:400","message":"stream_channel: ChannelManager check","data":{"channel_manager_available":channel_manager is not None,"has_request":request is not None,"has_app_state":request and hasattr(request.app, 'state') if request else False},"timestamp":int(__import__('time').time()*1000)})+'\n')
+    except: pass
+    # #endregion

     if channel_manager:
         # Use the continuous stream from ChannelManager
@@ -429,47 +510,142 @@ async def generate():
             import time
             start_time = time.time()

-            async for chunk in channel_manager.get_channel_stream(channel_number):
-                if chunk_count == 0:
-                    first_chunk_time = time.time()
-                    elapsed = first_chunk_time - start_time
-                    logger.info(f"HDHomeRun: First chunk received for channel {channel_number} after {elapsed:.2f}s ({len(chunk)} bytes)")
+            # #region agent log
+            try:
+                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:449","message":"stream_channel: Starting ChannelManager stream","data":{"channel_number":channel_number},"timestamp":int(time.time()*1000)})+'\n')
+            except: pass
+            # #endregion
+
+            try:
+                async for chunk in channel_manager.get_channel_stream(channel_number):
+                    if chunk_count == 0:
+                        first_chunk_time = time.time()
+                        elapsed = first_chunk_time - start_time
+                        logger.info(f"HDHomeRun: First chunk received for channel {channel_number} after {elapsed:.2f}s ({len(chunk)} bytes)")
+
+                        # #region agent log
+                        try:
+                            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:463","message":"stream_channel: First chunk yielded","data":{"channel_number":channel_number,"elapsed_seconds":elapsed,"chunk_size":len(chunk),"channel_id":channel.id if channel else None,"channel_name":channel.name if channel else None},"timestamp":int(time.time()*1000)})+'\n')
+                        except: pass
+                        # #endregion
+
+                    chunk_count += 1
+                    yield chunk
+
+                    # Log after first few chunks to verify stream continues
+                    if chunk_count <= 10:
+                        # #region agent log
+                        try:
+                            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:534","message":"stream_channel: Chunk yielded","data":{"channel_number":channel_number,"chunk_count":chunk_count,"chunk_size":len(chunk)},"timestamp":int(time.time()*1000)})+'\n')
+                        except: pass
+                        # #endregion
+
+                    # Log periodically for long-running streams
+                    if chunk_count % 1000 == 0:
+                        logger.debug(f"HDHomeRun: Streamed {chunk_count} chunks for channel {channel_number}")

-                chunk_count += 1
-                yield chunk
+                # Stream ended normally (generator exhausted)
+                logger.info(f"HDHomeRun: Stream generation completed for channel {channel_number} ({chunk_count} chunks total)")

-                # Log periodically for long-running streams
-                if chunk_count % 1000 == 0:
-                    logger.debug(f"HDHomeRun: Streamed {chunk_count} chunks for channel {channel_number}")
-
-            logger.info(f"HDHomeRun: Stream generation completed for channel {channel_number} ({chunk_count} chunks total)")
+                # #region agent log
+                try:
+                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:540","message":"stream_channel: Stream generator exhausted (normal end)","data":{"channel_number":channel_number,"chunk_count":chunk_count},"timestamp":int(time.time()*1000)})+'\n')
+                except: pass
+                # #endregion
+            except StopAsyncIteration:
+                # Generator exhausted normally
+                logger.info(f"HDHomeRun: Stream generator exhausted for channel {channel_number} ({chunk_count} chunks)")
+
+                # #region agent log
+                try:
+                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:548","message":"stream_channel: StopAsyncIteration (generator exhausted)","data":{"channel_number":channel_number,"chunk_count":chunk_count},"timestamp":int(time.time()*1000)})+'\n')
+                except: pass
+                # #endregion
+                return
+            except Exception as stream_error:
+                # Error in the async for loop itself (not in generator)
+                logger.error(f"HDHomeRun: Error iterating stream for channel {channel_number}: {stream_error}", exc_info=True)
+
+                # #region agent log
+                try:
+                    import traceback
+                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:556","message":"stream_channel: Error in async for loop","data":{"channel_number":channel_number,"error_type":type(stream_error).__name__,"error_message":str(stream_error),"error_traceback":traceback.format_exc()[:500],"chunk_count":chunk_count},"timestamp":int(time.time()*1000)})+'\n')
+                except: pass
+                # #endregion
+                # Don't raise - let the client handle the connection error gracefully
+                return
         except asyncio.CancelledError:
             # Client disconnected - this is normal, don't log as error
             logger.info(f"HDHomeRun: Stream cancelled for channel {channel_number} (client disconnected) after {chunk_count} chunks")
+
+            # #region agent log
+            try:
+                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:487","message":"stream_channel: Stream cancelled (client disconnect)","data":{"channel_number":channel_number,"chunk_count":chunk_count},"timestamp":int(time.time()*1000)})+'\n')
+            except: pass
+            # #endregion
             return
         except Exception as e:
             logger.error(f"HDHomeRun: Error in continuous stream for channel {channel_number}: {e}", exc_info=True)
+
+            # #region agent log
+            try:
+                import traceback
+                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:495","message":"stream_channel: Stream generation error","data":{"channel_number":channel_number,"error_type":type(e).__name__,"error_message":str(e),"error_traceback":traceback.format_exc()[:500],"chunk_count":chunk_count},"timestamp":int(time.time()*1000)})+'\n')
+            except: pass
+            # #endregion
+
             # Don't raise - let the client handle the connection error gracefully
             # Raising here causes Plex to show "Error tuning channel"
             return

+        # HDHomeRun/Plex expects specific headers for MPEG-TS streams
+        # Plex may time out if headers are missing or incorrect
+        headers = {
+            "Content-Type": "video/mp2t",  # MPEG-TS MIME type (explicit, required by Plex)
+            "Access-Control-Allow-Origin": "*",
+            "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
+            "Cache-Control": "no-cache, no-store, must-revalidate, private",
+            "Connection": "keep-alive",
+            "X-Accel-Buffering": "no",  # Disable buffering (nginx)
+            "Transfer-Encoding": "chunked",  # Chunked transfer for streaming (required for live streams)
+            # HDHomeRun-specific headers that Plex may check
+            "Server": "HDHomeRun/1.0",
+        }
+
+        # #region agent log
+        try:
+            import json
+            import time
+            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"K","location":"hdhomerun/api.py:610","message":"stream_channel: Creating StreamingResponse","data":{"channel_number":channel_number,"media_type":"video/mp2t","headers_count":len(headers)},"timestamp":int(time.time()*1000)})+'\n')
+        except: pass
+        # #endregion
+
         return StreamingResponse(
             generate(),
             media_type="video/mp2t",  # MPEG-TS MIME type (required by Plex)
-            headers={
-                "Access-Control-Allow-Origin": "*",
-                "Access-Control-Allow-Methods": "GET, HEAD, OPTIONS",
-                "Cache-Control": "no-cache, no-store, must-revalidate, private",
-                "Connection": "keep-alive",
-                "X-Accel-Buffering": "no",  # Disable buffering (nginx)
-                "Transfer-Encoding": "chunked",  # Chunked transfer for streaming (required for live streams)
-            }
+            headers=headers
         )
     else:
         # ChannelManager not available - fallback to on-demand streaming
         logger.warning(f"HDHomeRun: ChannelManager not available for channel {channel_number}, using on-demand streaming fallback")
         logger.warning(f"HDHomeRun: This may cause tuning delays. Ensure ChannelManager is initialized at startup.")

+        # #region agent log
+        try:
+            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"E","location":"hdhomerun/api.py:520","message":"stream_channel: Using fallback (ChannelManager unavailable)","data":{"channel_number":channel_number},"timestamp":int(__import__('time').time()*1000)})+'\n')
+        except: pass
+        # #endregion
+
         # Fallback: create stream on-demand
         from ..streaming.mpegts_streamer import MPEGTSStreamer
@@ -546,6 +722,15 @@ async def generate():
         raise
     except Exception as e:
         logger.error(f"Error streaming channel {channel_number} via HDHomeRun: {e}", exc_info=True)
+
+        # #region agent log
+        try:
+            import traceback
+            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"C","location":"hdhomerun/api.py:603","message":"stream_channel: Top-level exception","data":{"channel_number":channel_number,"error_type":type(e).__name__,"error_message":str(e)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+        except: pass
+        # #endregion
+
         raise HTTPException(status_code=500, detail=f"Error streaming channel: {str(e)}")
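The enum-fallback query above is the interesting pattern in this file: when a stored string no longer matches the Python Enum, SQLAlchemy's result processor raises `LookupError` ("... is not among the defined enum values"), so the row is re-read with raw SQL and hydrated by hand. A stripped-down sketch of the same idea (assumes the `Channel` model from this codebase; error handling simplified):

```python
from sqlalchemy import text

def load_channel_tolerant(db, number: str):
    """ORM query first; on a stale-enum LookupError, rehydrate from raw SQL."""
    try:
        return db.query(Channel).filter(Channel.number == number).first()
    except LookupError:
        row = db.execute(
            text("SELECT * FROM channels WHERE number = :number"),
            {"number": number},
        ).fetchone()
        if row is None:
            return None
        channel = Channel()
        for key, value in row._mapping.items():
            setattr(channel, key, value)  # enum columns stay raw strings here
        return channel
```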
diff --git a/streamtv/streaming/channel_manager.py b/streamtv/streaming/channel_manager.py
index 62b0787..4baa9bc 100644
--- a/streamtv/streaming/channel_manager.py
+++ b/streamtv/streaming/channel_manager.py
@@ -11,6 +11,7 @@ from streamtv.database.models import ChannelPlaybackPosition, PlayoutMode
 from sqlalchemy.orm import Session

 from streamtv.streaming.mpegts_streamer import MPEGTSStreamer
+from streamtv.streaming.stream_prewarmer import StreamPrewarmer

 logger = logging.getLogger(__name__)

@@ -38,6 +39,9 @@ def __init__(self, channel: Channel, db_session_factory):
         self._current_item_index = 0
         self._current_item_start_time: Optional[datetime] = None
         self._timeline_lock = asyncio.Lock()
+
+        # Stream pre-warmer for fast startup
+        self._prewarmer = StreamPrewarmer(max_buffer_size=5 * 1024 * 1024, max_chunks=20)  # 5MB, 20 chunks

     async def start(self):
         """Start the continuous stream in the background"""
@@ -349,37 +353,266 @@ async def get_stream(self) -> AsyncIterator[bytes]:
         # No explicit return needed - async generators automatically stop when function completes

         # CONTINUOUS mode: use broadcast queue (existing logic)
-        # Create a queue for this client
-        client_queue = asyncio.Queue(maxsize=10)
+        # Create a queue for this client (larger size to prevent blocking)
+        client_queue = asyncio.Queue(maxsize=50)  # Increased from 10 to 50 to prevent queue full errors

         async with self._lock:
             # If stream is not running, start it
             if not self._is_running:
+                # #region agent log
+                try:
+                    import json
+                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"channel_manager.py:357","message":"CONTINUOUS: Starting stream (not running)","data":{"channel_number":self.channel_number},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                except: pass
+                # #endregion
                 await self.start()
                 # Wait for stream to initialize
                 await asyncio.sleep(0.5)
+            else:
+                # #region agent log
+                try:
+                    import json
+                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"channel_manager.py:365","message":"CONTINUOUS: Stream already running","data":{"channel_number":self.channel_number,"existing_clients":self._client_count},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                except: pass
+                # #endregion

             # Register this client
             self._client_queues.append(client_queue)
             self._client_count += 1

+            # #region agent log
+            try:
+                import json
+                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"channel_manager.py:373","message":"CONTINUOUS: Client queue registered","data":{"channel_number":self.channel_number,"total_clients":self._client_count,"queue_size":client_queue.qsize()},"timestamp":int(__import__('time').time()*1000)})+'\n')
+            except: pass
+            # #endregion
+
             # Continuous: calculate position in playout timeline using system time (ErsatzTV-style)
             current_position = await self._get_current_position()
             now = datetime.utcnow()
             elapsed_hours = int(current_position['elapsed_seconds'] // 3600)
             elapsed_minutes = int((current_position['elapsed_seconds'] % 3600) // 60)
             logger.info(f"Client connected to channel {self.channel_number} (CONTINUOUS mode) at {now} - position {current_position['item_index']}/{len(self._schedule_items)} ({elapsed_hours}h {elapsed_minutes}m from midnight, total clients: {self._client_count})")
+
+            # Pre-warm current item if no buffer exists and stream is running
+            # This ensures fast response even when connecting mid-stream
+            if self._is_running and len(self._schedule_items) > 0:
+                current_idx = current_position.get('item_index', 0)
+                if current_idx < len(self._schedule_items):
+                    current_item = self._schedule_items[current_idx]
+                    current_media = current_item.get('media_item')
+                    if current_media and not ('PLACEHOLDER' in current_media.url.upper()):
+                        # Check if buffer already exists
+                        buffer_info = await self._prewarmer.get_buffer_info(self.channel_number)
+                        if not buffer_info.get("has_buffer"):
+                            # Start pre-warming current item in background
+                            try:
+                                # Ensure streamer exists (it should be created in _run_continuous_stream)
+                                if not self.streamer:
+                                    # Create a temporary streamer for pre-warming
+                                    db_temp = self.db_session_factory()
+                                    try:
+                                        from streamtv.streaming.mpegts_streamer import MPEGTSStreamer
+                                        temp_streamer = MPEGTSStreamer(db_temp)
+                                    except Exception as e:
+                                        logger.warning(f"Could not create streamer for pre-warming: {e}")
+                                        temp_streamer = None
+                                else:
+                                    temp_streamer = self.streamer
+                                    db_temp = None
+
+                                if temp_streamer:
+                                    async def current_item_generator():
+                                        try:
+                                            async for chunk in temp_streamer._stream_single_item(current_media, self.channel_number, skip_codec_detection=True):
+                                                yield chunk
+                                        finally:
+                                            if db_temp:
+                                                try:
+                                                    db_temp.close()
+                                                except:
+                                                    pass
+
+                                    asyncio.create_task(
+                                        self._prewarmer.prewarm_stream(self.channel_number, current_item_generator())
+                                    )
+                                    logger.info(f"Started pre-warming current item (index {current_idx}) for channel {self.channel_number} on client connect")
+                                    # #region agent log
+                                    try:
+                                        import json
+                                        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"I","location":"channel_manager.py:420","message":"CONTINUOUS: Pre-warming current item on client connect","data":{"channel_number":self.channel_number,"item_index":current_idx,"item_title":current_media.title[:60] if current_media else None,"has_streamer":self.streamer is not None},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                                    except: pass
+                                    # #endregion
+                                else:
+                                    logger.warning(f"Could not pre-warm current item for channel {self.channel_number}: streamer not available")
+                                    # #region agent log
+                                    try:
+                                        import json
+                                        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"I","location":"channel_manager.py:435","message":"CONTINUOUS: Pre-warming skipped (streamer unavailable)","data":{"channel_number":self.channel_number,"item_index":current_idx,"has_streamer":self.streamer is not None},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                                    except: pass
+                                    # #endregion
+                            except Exception as e:
+                                logger.warning(f"Failed to pre-warm current item for channel {self.channel_number}: {e}", exc_info=True)
+                                # #region agent log
+                                try:
+                                    import json
+                                    import traceback
+                                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"I","location":"channel_manager.py:443","message":"CONTINUOUS: Pre-warming error","data":{"channel_number":self.channel_number,"item_index":current_idx,"error_type":type(e).__name__,"error_message":str(e),"traceback":traceback.format_exc()[:300]},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                                except: pass
+                                # #endregion
+                        else:
+                            # #region agent log
+                            try:
+                                import json
+                                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"I","location":"channel_manager.py:450","message":"CONTINUOUS: Pre-warming skipped (buffer already exists)","data":{"channel_number":self.channel_number,"item_index":current_idx,"buffer_chunks":buffer_info.get("chunk_count",0)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                            except: pass
+                            # #endregion
+                else:
+                    # #region agent log
+                    try:
+                        import json
+                        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"I","location":"channel_manager.py:457","message":"CONTINUOUS: Pre-warming skipped (invalid index)","data":{"channel_number":self.channel_number,"item_index":current_idx,"schedule_items_count":len(self._schedule_items)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                    except: pass
+                    # #endregion
+
+        # Stream from the broadcast queue (with pre-warmed buffer support)
+        chunk_count = 0
+        timeout_count = 0
+
+        # Wait a short time for initial chunks to arrive in the queue
+        # This handles the case where client connects right as a new item is starting
+        initial_wait_time = 0.1  # 100ms
+        initial_chunks_received = False
+
+        # Try to get first chunk quickly (with short timeout)
+        # #region agent log
+        try:
+            import json
+            import time
+            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"J","location":"channel_manager.py:496","message":"CONTINUOUS: Attempting immediate chunk check","data":{"channel_number":self.channel_number,"wait_time_ms":initial_wait_time*1000,"queue_size":client_queue.qsize()},"timestamp":int(time.time()*1000)})+'\n')
+        except: pass
+        # #endregion
+
+        try:
+            first_chunk = await asyncio.wait_for(client_queue.get(), timeout=initial_wait_time)
+            chunk_count += 1
+            initial_chunks_received = True
+
+            # #region agent log
+            try:
+                import json
+                import time
+                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"J","location":"channel_manager.py:505","message":"CONTINUOUS: First chunk received immediately","data":{"channel_number":self.channel_number,"chunk_count":chunk_count,"chunk_size":len(first_chunk),"wait_time_ms":initial_wait_time*1000},"timestamp":int(time.time()*1000)})+'\n')
+            except: pass
+            # #endregion
+
+            yield first_chunk
+        except asyncio.TimeoutError:
+            # No chunks available immediately - check for pre-warmed buffer
+            # #region agent log
+            try:
+                import json
+                import time
+                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"J","location":"channel_manager.py:518","message":"CONTINUOUS: No immediate chunks, checking pre-warmed buffer","data":{"channel_number":self.channel_number,"queue_size":client_queue.qsize()},"timestamp":int(time.time()*1000)})+'\n')
+            except: pass
+            # #endregion
+
+            # Check if we have pre-warmed chunks and serve them first (if we didn't get immediate chunks)
+            if not initial_chunks_received:
+                # Give pre-warming a moment to start filling (if it just started)
+                # Check buffer, and if empty, wait a short time for pre-warming to fill
+                buffer_info = await self._prewarmer.get_buffer_info(self.channel_number)
+                if not buffer_info.get("has_buffer") or buffer_info.get("chunk_count", 0) == 0:
+                    # Wait a short time for pre-warming to start filling (if it's running)
+                    await asyncio.sleep(0.2)  # 200ms - enough for FFmpeg to start and produce first chunk
+                    buffer_info = await self._prewarmer.get_buffer_info(self.channel_number)
+
+                if buffer_info.get("has_buffer") and buffer_info.get("chunk_count", 0) > 0:
+                    # Serve pre-warmed chunks immediately
+                    logger.info(f"Serving {buffer_info['chunk_count']} pre-warmed chunks for channel {self.channel_number}")
+                    # #region agent log
+                    try:
+                        import json
+                        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"H","location":"channel_manager.py:405","message":"CONTINUOUS: Serving pre-warmed chunks","data":{"channel_number":self.channel_number,"chunk_count":buffer_info['chunk_count'],"buffer_size":buffer_info['buffer_size_bytes']},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                    except: pass
+                    # #endregion
+
+                    # Get buffered chunks (this will clear the buffer)
+                    async def get_live_stream():
+                        while self._is_running:
+                            try:
+                                chunk = await asyncio.wait_for(client_queue.get(), timeout=2.0)
+                                yield chunk
+                            except asyncio.TimeoutError:
+                                if not self._is_running:
+                                    break
+                                continue
+
+                    # Use pre-warmer's get_buffered_stream to serve buffer first, then live
+                    async for chunk in self._prewarmer.get_buffered_stream(self.channel_number, get_live_stream()):
+                        chunk_count += 1
+                        if chunk_count <= 10:
+                            # #region agent log
+                            try:
+                                import json
+                                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"H","location":"channel_manager.py:425","message":"CONTINUOUS: Chunk yielded (pre-warmed or live)","data":{"channel_number":self.channel_number,"chunk_count":chunk_count,"chunk_size":len(chunk)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                            except: pass
+                            # #endregion
+                        yield chunk
+                    return  # Exit after buffered stream completes

-        # Stream from the broadcast queue
+        # Fallback: stream from queue normally (no pre-warm buffer)
         try:
             while self._is_running:
                 try:
                     chunk = await asyncio.wait_for(client_queue.get(), timeout=2.0)
+                    chunk_count += 1
+                    timeout_count = 0  # Reset timeout counter on successful chunk
+
+                    # Log first few chunks to verify stream is working
+                    if chunk_count <= 10:
+                        # #region agent log
+                        try:
+                            import json
+                            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"channel_manager.py:445","message":"CONTINUOUS: Chunk received from client queue (no pre-warm)","data":{"channel_number":self.channel_number,"chunk_count":chunk_count,"chunk_size":len(chunk),"queue_size":client_queue.qsize()},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                        except: pass
+                        # #endregion
+
                     yield chunk
                 except asyncio.TimeoutError:
+                    timeout_count += 1
+                    # #region agent log
+                    try:
+                        import json
+                        with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                            f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"channel_manager.py:390","message":"CONTINUOUS: Timeout waiting for chunk","data":{"channel_number":self.channel_number,"timeout_count":timeout_count,"is_running":self._is_running,"queue_size":client_queue.qsize()},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                    except: pass
+                    # #endregion
+
                     # Check if still running
                     if not self._is_running:
+                        logger.debug(f"CONTINUOUS: Stream stopped for channel {self.channel_number}, exiting loop")
                         break
+
+                    # If too many timeouts, log warning but continue
+                    if timeout_count >= 5:
+                        logger.warning(f"CONTINUOUS: Multiple timeouts ({timeout_count}) for channel {self.channel_number}, but continuing...")
+                        timeout_count = 0  # Reset to avoid spam
+
                     # Continue waiting
                     continue
         finally:
@@ -388,7 +621,15 @@ async def get_stream(self) -> AsyncIterator[bytes]:
             if client_queue in self._client_queues:
                 self._client_queues.remove(client_queue)
                 self._client_count -= 1
-                logger.debug(f"Client disconnected from channel {self.channel_number} (remaining clients: {self._client_count})")
+                logger.debug(f"Client disconnected from channel {self.channel_number} (remaining clients: {self._client_count}, chunks sent: {chunk_count})")
+
+                # #region agent log
+                try:
+                    import json
+                    with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                        f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"F","location":"channel_manager.py:410","message":"CONTINUOUS: Client disconnected","data":{"channel_number":self.channel_number,"chunk_count":chunk_count,"remaining_clients":self._client_count},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                except: pass
+                # #endregion

     def _save_playback_position(
         self,
@@ -625,10 +866,32 @@ async def _run_continuous_stream(self):
         # Stream continuously, starting from calculated position
         # After first loop, always start from 0
         first_loop = True
+        first_item_prewarmed = False
+
         while self._is_running:
             # Determine starting index for this loop
             loop_start = start_index if first_loop else 0

+            # Pre-warm first item for fast client response (only on first loop, before streaming)
+            if first_loop and not first_item_prewarmed and loop_start < len(self._schedule_items):
+                first_item = self._schedule_items[loop_start]
+                first_media = first_item.get('media_item')
+                if first_media and not ('PLACEHOLDER' in first_media.url.upper()):
+                    try:
+                        # Create a generator for the first item (skip codec detection for speed)
+                        async def first_item_generator():
+                            async for chunk in self.streamer._stream_single_item(first_media, self.channel_number, skip_codec_detection=True):
+                                yield chunk
+
+                        # Start pre-warming in background (don't await - let it run)
+                        asyncio.create_task(
+                            self._prewarmer.prewarm_stream(self.channel_number, first_item_generator())
+                        )
+                        logger.info(f"Started pre-warming first item for channel {self.channel_number} (item {loop_start})")
+                        first_item_prewarmed = True
+                    except Exception as e:
+                        logger.warning(f"Failed to pre-warm first item for channel {self.channel_number}: {e}")
+
             # Loop through schedule items starting from calculated position
             for idx in range(loop_start, len(self._schedule_items)):
                 if not self._is_running:
                     break
@@ -666,6 +929,32 @@ async def _run_continuous_stream(self):
                 self._current_item_index = idx
                 self._current_item_start_time = datetime.utcnow()  # Use system time

+                # Pre-warm current item for fast client response (start before streaming)
+                # This ensures buffer is ready when clients connect mid-item
+                buffer_info = await self._prewarmer.get_buffer_info(self.channel_number)
+                if not buffer_info.get("has_buffer") or buffer_info.get("chunk_count", 0) < 5:
+                    # Start pre-warming current item in background
+                    try:
+                        async def current_item_generator():
+                            async for chunk in self.streamer._stream_single_item(media_item, self.channel_number, skip_codec_detection=True):
+                                yield chunk
+
+                        # Start pre-warming in background (don't await - let it run)
+                        asyncio.create_task(
+                            self._prewarmer.prewarm_stream(self.channel_number, current_item_generator())
+                        )
+                        logger.debug(f"Started pre-warming current item (index {idx}) for channel {self.channel_number}")
+                        # #region agent log
+                        try:
+                            import json
+                            import time
+                            with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"L","location":"channel_manager.py:930","message":"CONTINUOUS: Pre-warming current item before streaming","data":{"channel_number":self.channel_number,"item_index":idx,"item_title":media_item.title[:60] if media_item else None},"timestamp":int(time.time()*1000)})+'\n')
+                        except: pass
+                        # #endregion
+                    except Exception as e:
+                        logger.warning(f"Failed to pre-warm current item for channel {self.channel_number}: {e}")
+
                 # Periodically save position (every 5 items or every 30 minutes)
                 if idx % 5 == 0 or (self._current_item_start_time and (datetime.utcnow() - self._current_item_start_time).total_seconds() > 1800):
                     try:
@@ -694,21 +983,48 @@ async def _run_continuous_stream(self):
                         db.rollback()

                 try:
-                    # Stream this item
-                    async for chunk in self.streamer._stream_single_item(media_item, self.channel_number):
+                    # Stream this item (skip codec detection for first item if pre-warming)
+                    skip_codec = (first_loop and idx == loop_start and not first_item_prewarmed)
+                    chunk_count_for_item = 0
+                    async for chunk in self.streamer._stream_single_item(media_item, self.channel_number, skip_codec_detection=skip_codec):
                         if not self._is_running:
                             break
+                        chunk_count_for_item += 1
+
+                        # Log first chunk of item
+                        if chunk_count_for_item == 1:
+                            # #region agent log
+                            try:
+                                import json
+                                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"G","location":"channel_manager.py:737","message":"CONTINUOUS: First chunk from item","data":{"channel_number":self.channel_number,"item_index":idx,"item_title":media_item.title[:60] if media_item else None,"client_count":len(self._client_queues)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                            except: pass
+                            # #endregion
+
                         # Broadcast to all connected clients
                         disconnected_clients = []
+                        clients_received = 0
                         for queue in self._client_queues:
                             try:
                                 queue.put_nowait(chunk)
+                                clients_received += 1
                             except asyncio.QueueFull:
                                 disconnected_clients.append(queue)
-                            except Exception:
+                            except Exception as e:
+                                logger.debug(f"Error putting chunk in client queue: {e}")
                                 disconnected_clients.append(queue)

+                        # Log if no clients received chunk
+                        if chunk_count_for_item <= 5 and clients_received == 0:
+                            # #region agent log
+                            try:
+                                import json
+                                with open('/Users/roto1231/Documents/XCode Projects/StreamTV/.cursor/debug.log', 'a') as f:
+                                    f.write(json.dumps({"sessionId":"debug-session","runId":"run1","hypothesisId":"G","location":"channel_manager.py:757","message":"CONTINUOUS: Chunk broadcasted but no clients","data":{"channel_number":self.channel_number,"item_index":idx,"chunk_count":chunk_count_for_item,"client_queues_count":len(self._client_queues)},"timestamp":int(__import__('time').time()*1000)})+'\n')
+                            except: pass
+                            # #endregion
+
                         # Remove disconnected clients
                         for queue in disconnected_clients:
                             if queue in self._client_queues:
diff --git a/streamtv/streaming/mpegts_streamer.py b/streamtv/streaming/mpegts_streamer.py
index 4cc3fe4..96f8ee0 100644
--- a/streamtv/streaming/mpegts_streamer.py
+++ b/streamtv/streaming/mpegts_streamer.py
@@ -266,7 +266,8 @@ async def _get_schedule_items(self, channel: Channel) -> List[Dict[str, Any]]:
     async def _stream_single_item(
         self,
         media_item: MediaItem,
-        channel_number: str
+        channel_number: str,
+        skip_codec_detection: bool = False
     ) -> AsyncIterator[bytes]:
         """Stream a single media item as MPEG-TS"""
         # Skip placeholder URLs
@@ -311,15 +312,20 @@ async def _stream_single_item(
             logger.info(f"Stream cancelled before transcoding {media_item.title}")
             raise

-        # Detect input codec for smart transcoding
-        input_codec_info = await self._detect_input_codec(stream_url)
-
-        # Check for cancellation again after codec detection (may take time)
-        try:
-            await asyncio.sleep(0)
-        except asyncio.CancelledError:
-            logger.info(f"Stream cancelled after codec detection for {media_item.title}")
-            raise
+        # Detect input codec for smart transcoding (skip for pre-warming to reduce delay)
+        if skip_codec_detection:
+            # Use default codec info for faster startup
+            input_codec_info = None
+            logger.debug(f"Skipping codec detection for {media_item.title} (pre-warming)")
+        else:
+            input_codec_info = await self._detect_input_codec(stream_url)
+
+            # Check for cancellation again after codec detection (may take time)
+            try:
+                await asyncio.sleep(0)
+            except asyncio.CancelledError:
+                logger.info(f"Stream cancelled after codec detection for {media_item.title}")
+                raise

         # Transcode to MPEG-TS (with smart codec detection)
         async for chunk in self._transcode_to_mpegts(stream_url, input_codec_info, source=detected_source):
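The new `skip_codec_detection` flag trades transcoding accuracy for startup latency: probing the input can take seconds, so pre-warm paths pass `None` codec info and let the transcoder fall back to defaults. A condensed sketch of that control flow, reusing the private methods shown in the hunk above (the `source` keyword argument is omitted for brevity):

```python
async def stream_item(streamer, stream_url: str, prewarm: bool = False):
    # Pre-warming skips the probe entirely; normal playback pays for accuracy.
    codec_info = None if prewarm else await streamer._detect_input_codec(stream_url)
    async for chunk in streamer._transcode_to_mpegts(stream_url, codec_info):
        yield chunk
```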
diff --git a/streamtv/streaming/stream_manager.py b/streamtv/streaming/stream_manager.py
index d1e8943..ba73406 100644
--- a/streamtv/streaming/stream_manager.py
+++ b/streamtv/streaming/stream_manager.py
@@ -6,6 +6,7 @@
 from enum import Enum
 import json
 from datetime import datetime
+from pathlib import Path

 from .youtube_adapter import YouTubeAdapter
 from .archive_org_adapter import ArchiveOrgAdapter
@@ -37,6 +38,56 @@ def _debug_log(location: str, message: str, data: dict, hypothesis_id: str):
     # #endregion


+def find_cookies_file(site_name: str, cookies_dir: str = "data/cookies") -> Optional[str]:
+    """
+    Find cookies file for a site, checking both new format (site_name_cookies.txt)
+    and old format (youtube_cookies.txt, archive_cookies.txt, etc.)
+
+    Args:
+        site_name: Site name to look for (e.g., "www.youtube.com", "youtube.com", "archive.org")
+        cookies_dir: Directory to search for cookies files
+
+    Returns:
+        Path to cookies file if found, None otherwise
+    """
+    from pathlib import Path
+
+    cookies_path = Path(cookies_dir)
+    if not cookies_path.exists():
+        return None
+
+    # Normalize site name (remove www. prefix for matching)
+    normalized_name = site_name.replace("www.", "") if site_name.startswith("www.") else site_name
+
+    # Try new format variations:
+    # 1. Exact site_name_cookies.txt (e.g., www.youtube.com_cookies.txt)
+    new_format_exact = cookies_path / f"{site_name}_cookies.txt"
+    if new_format_exact.exists():
+        return str(new_format_exact.absolute())
+
+    # 2. Normalized site_name_cookies.txt (e.g., youtube.com_cookies.txt)
+    new_format_normalized = cookies_path / f"{normalized_name}_cookies.txt"
+    if new_format_normalized.exists():
+        return str(new_format_normalized.absolute())
+
+    # Fall back to old format based on site name
+    old_formats = {
+        "youtube.com": ["youtube_cookies.txt"],
+        "archive.org": ["archive_cookies.txt", "archive.org_cookies.txt"],
+        "pbs.org": ["pbs_cookies.txt", "pbs.org_cookies.txt"],
+    }
+
+    # Try to find matching old format
+    for key, formats in old_formats.items():
+        if key in normalized_name or normalized_name in key:
+            for fmt in formats:
+                old_format = cookies_path / fmt
+                if old_format.exists():
+                    return str(old_format.absolute())
+
+    return None
+
+
 class StreamSource(Enum):
     YOUTUBE = "youtube"
     ARCHIVE_ORG = "archive_org"
@@ -49,10 +100,19 @@ class StreamManager:
     """Manages streaming from different sources"""

     def __init__(self):
+        # Find YouTube cookies file (try new format first, then fall back to config/old format)
+        youtube_cookies = config.youtube.cookies_file
+        if not youtube_cookies or not Path(youtube_cookies).exists():
+            # Try to find cookies file with site name (try both www. and non-www. variants)
+            found_cookies = find_cookies_file("www.youtube.com") or find_cookies_file("youtube.com")
+            if found_cookies:
+                youtube_cookies = found_cookies
+                logger.info(f"Found YouTube cookies file: {youtube_cookies}")
+
         self.youtube_adapter = YouTubeAdapter(
             quality=config.youtube.quality,
             extract_audio=config.youtube.extract_audio,
-            cookies_file=config.youtube.cookies_file,
+            cookies_file=youtube_cookies,
             api_key=config.youtube.api_key  # YouTube Data API v3 key for validation
         ) if config.youtube.enabled else None

@@ -71,12 +131,21 @@ def __init__(self):
         # This is the secure configuration
         logger.debug("Archive.org username in config, password should be in Keychain")

+        # Find Archive.org cookies file (try new format first, then fall back to config/old format)
+        archive_cookies = config.archive_org.cookies_file
+        if not archive_cookies or not Path(archive_cookies).exists():
+            # Try to find cookies file with site name
+            found_cookies = find_cookies_file("archive.org")
+            if found_cookies:
+                archive_cookies = found_cookies
+                logger.info(f"Found Archive.org cookies file: {archive_cookies}")
+
         self.archive_org_adapter = ArchiveOrgAdapter(
             preferred_format=config.archive_org.preferred_format,
             username=archive_username,
             password=archive_password,
-            use_authentication=config.archive_org.use_authentication and (bool(archive_username and archive_password) or bool(config.archive_org.cookies_file)),
-            cookies_file=config.archive_org.cookies_file
+            use_authentication=config.archive_org.use_authentication and (bool(archive_username and archive_password) or bool(archive_cookies)),
+            cookies_file=archive_cookies
         ) if config.archive_org.enabled else None

         # PBS adapter - load credentials from Keychain first, then config
@@ -91,11 +160,20 @@ def __init__(self):
         elif config.pbs.username and not config.pbs.password:
             logger.debug("PBS username in config, password should be in Keychain")

+        # Find PBS cookies file (try new format first, then fall back to config/old format)
+        pbs_cookies = config.pbs.cookies_file
+        if not pbs_cookies or not Path(pbs_cookies).exists():
+            # Try to find cookies file with site name
+            found_cookies = find_cookies_file("pbs.org")
+            if found_cookies:
+                pbs_cookies = found_cookies
+                logger.info(f"Found PBS cookies file: {pbs_cookies}")
+
         self.pbs_adapter = PBSAdapter(
             username=pbs_username,
             password=pbs_password,
-            use_authentication=config.pbs.use_authentication and (bool(pbs_username and pbs_password) or bool(config.pbs.cookies_file)),
-            cookies_file=config.pbs.cookies_file,
+            use_authentication=config.pbs.use_authentication and (bool(pbs_username and pbs_password) or bool(pbs_cookies)),
+            cookies_file=pbs_cookies,
             use_headless_browser=config.pbs.use_headless_browser
         ) if config.pbs.enabled else None
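A usage sketch for the helper above — it resolves, in order: the exact site-named file, the `www.`-stripped variant, then the legacy names (assumes `data/cookies` exists relative to the working directory):

```python
from streamtv.streaming.stream_manager import find_cookies_file

for site in ("www.youtube.com", "youtube.com", "archive.org", "pbs.org"):
    path = find_cookies_file(site)
    print(f"{site}: {path or 'no cookies file found'}")
```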
use_authentication=config.pbs.use_authentication and (bool(pbs_username and pbs_password) or bool(config.pbs.cookies_file)), - cookies_file=config.pbs.cookies_file, + use_authentication=config.pbs.use_authentication and (bool(pbs_username and pbs_password) or bool(pbs_cookies)), + cookies_file=pbs_cookies, use_headless_browser=config.pbs.use_headless_browser ) if config.pbs.enabled else None diff --git a/streamtv/streaming/stream_prewarmer.py b/streamtv/streaming/stream_prewarmer.py new file mode 100644 index 0000000..f21e8e2 --- /dev/null +++ b/streamtv/streaming/stream_prewarmer.py @@ -0,0 +1,199 @@ +"""Stream pre-warming module to reduce startup delay for Plex compatibility + +This module pre-starts FFmpeg streams and buffers initial chunks to ensure +sub-second response times when clients connect, preventing Plex "cannot tune channel" errors. +""" + +import asyncio +import logging +from typing import Optional, Deque, Dict, AsyncIterator +from collections import deque +from datetime import datetime, timedelta + +logger = logging.getLogger(__name__) + + +class StreamPrewarmer: + """Pre-warms streams by starting FFmpeg early and buffering initial chunks""" + + def __init__(self, max_buffer_size: int = 10 * 1024 * 1024, max_chunks: int = 50): + """ + Initialize pre-warmer + + Args: + max_buffer_size: Maximum buffer size in bytes (default: 10MB) + max_chunks: Maximum number of chunks to buffer (default: 50) + """ + self.max_buffer_size = max_buffer_size + self.max_chunks = max_chunks + self._buffers: Dict[str, Deque[bytes]] = {} # channel_number -> deque of chunks + self._buffer_sizes: Dict[str, int] = {} # channel_number -> total buffer size in bytes + self._prewarm_tasks: Dict[str, asyncio.Task] = {} # channel_number -> prewarm task + self._lock = asyncio.Lock() + + async def prewarm_stream( + self, + channel_number: str, + stream_generator: AsyncIterator[bytes] + ) -> None: + """ + Pre-warm a stream by buffering initial chunks + + Args: + channel_number: Channel number to pre-warm + stream_generator: Async generator that yields chunks + """ + async with self._lock: + # Clear existing buffer for this channel + if channel_number in self._buffers: + self._buffers[channel_number].clear() + self._buffer_sizes[channel_number] = 0 + + # Create new buffer + self._buffers[channel_number] = deque(maxlen=self.max_chunks) + self._buffer_sizes[channel_number] = 0 + + try: + chunk_count = 0 + async for chunk in stream_generator: + async with self._lock: + if channel_number not in self._buffers: + # Buffer was cleared (channel stopped) + break + + # Check if buffer is full + if len(self._buffers[channel_number]) >= self.max_chunks: + # Buffer is full, stop pre-warming + logger.debug(f"Pre-warm buffer full for channel {channel_number} ({chunk_count} chunks)") + break + + # Check if buffer size limit reached + chunk_size = len(chunk) + if self._buffer_sizes[channel_number] + chunk_size > self.max_buffer_size: + logger.debug(f"Pre-warm buffer size limit reached for channel {channel_number} ({self._buffer_sizes[channel_number]} bytes)") + break + + # Add chunk to buffer + self._buffers[channel_number].append(chunk) + self._buffer_sizes[channel_number] += chunk_size + chunk_count += 1 + + # Log first chunk (important milestone) + if chunk_count == 1: + logger.info(f"Pre-warmed first chunk for channel {channel_number} ({chunk_size} bytes)") + + # Stop after we have enough chunks (typically 5-10 chunks is enough for sub-second response) + if chunk_count >= 10: + logger.debug(f"Pre-warmed {chunk_count} chunks 
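
For reference, the lookup order find_cookies_file implements (exact new-format name, www.-normalized name, then legacy names) can be exercised directly. A minimal sketch, assuming the function is importable from streamtv.streaming.stream_manager and using an illustrative scratch layout:

    # Illustrative only: exercise the lookup order against a scratch directory.
    from pathlib import Path
    from streamtv.streaming.stream_manager import find_cookies_file

    scratch = Path("data/cookies")
    scratch.mkdir(parents=True, exist_ok=True)
    (scratch / "youtube.com_cookies.txt").write_text("# Netscape HTTP Cookie File\n")

    # The exact name is tried first, then the www.-normalized name, then legacy names.
    print(find_cookies_file("www.youtube.com"))  # -> .../data/cookies/youtube.com_cookies.txt
    print(find_cookies_file("pbs.org"))          # -> None (no PBS cookies present)
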
diff --git a/streamtv/streaming/stream_prewarmer.py b/streamtv/streaming/stream_prewarmer.py
new file mode 100644
index 0000000..f21e8e2
--- /dev/null
+++ b/streamtv/streaming/stream_prewarmer.py
@@ -0,0 +1,199 @@
+"""Stream pre-warming module to reduce startup delay for Plex compatibility
+
+This module pre-starts FFmpeg streams and buffers initial chunks to ensure
+sub-second response times when clients connect, preventing Plex "cannot tune channel" errors.
+"""
+
+import asyncio
+import logging
+from typing import Deque, Dict, AsyncIterator
+from collections import deque
+
+logger = logging.getLogger(__name__)
+
+
+class StreamPrewarmer:
+    """Pre-warms streams by starting FFmpeg early and buffering initial chunks"""
+
+    def __init__(self, max_buffer_size: int = 10 * 1024 * 1024, max_chunks: int = 50):
+        """
+        Initialize the pre-warmer
+
+        Args:
+            max_buffer_size: Maximum buffer size in bytes (default: 10MB)
+            max_chunks: Maximum number of chunks to buffer (default: 50)
+        """
+        self.max_buffer_size = max_buffer_size
+        self.max_chunks = max_chunks
+        self._buffers: Dict[str, Deque[bytes]] = {}  # channel_number -> deque of chunks
+        self._buffer_sizes: Dict[str, int] = {}  # channel_number -> total buffer size in bytes
+        self._prewarm_tasks: Dict[str, asyncio.Task] = {}  # channel_number -> prewarm task
+        self._lock = asyncio.Lock()
+
+    async def prewarm_stream(
+        self,
+        channel_number: str,
+        stream_generator: AsyncIterator[bytes]
+    ) -> None:
+        """
+        Pre-warm a stream by buffering its initial chunks
+
+        Args:
+            channel_number: Channel number to pre-warm
+            stream_generator: Async generator that yields chunks
+        """
+        async with self._lock:
+            # Clear any existing buffer for this channel
+            if channel_number in self._buffers:
+                self._buffers[channel_number].clear()
+                self._buffer_sizes[channel_number] = 0
+
+            # Create a new buffer
+            self._buffers[channel_number] = deque(maxlen=self.max_chunks)
+            self._buffer_sizes[channel_number] = 0
+
+        try:
+            chunk_count = 0
+            async for chunk in stream_generator:
+                async with self._lock:
+                    if channel_number not in self._buffers:
+                        # Buffer was cleared (channel stopped)
+                        break
+
+                    # Stop if the chunk-count limit is reached
+                    if len(self._buffers[channel_number]) >= self.max_chunks:
+                        logger.debug(f"Pre-warm buffer full for channel {channel_number} ({chunk_count} chunks)")
+                        break
+
+                    # Stop if the byte-size limit would be exceeded
+                    chunk_size = len(chunk)
+                    if self._buffer_sizes[channel_number] + chunk_size > self.max_buffer_size:
+                        logger.debug(f"Pre-warm buffer size limit reached for channel {channel_number} ({self._buffer_sizes[channel_number]} bytes)")
+                        break
+
+                    # Add the chunk to the buffer
+                    self._buffers[channel_number].append(chunk)
+                    self._buffer_sizes[channel_number] += chunk_size
+                    chunk_count += 1
+
+                    # Log the first chunk (important milestone)
+                    if chunk_count == 1:
+                        logger.info(f"Pre-warmed first chunk for channel {channel_number} ({chunk_size} bytes)")
+
+                    # Stop once we have enough chunks (5-10 is typically enough for a sub-second response)
+                    if chunk_count >= 10:
+                        logger.debug(f"Pre-warmed {chunk_count} chunks for channel {channel_number} ({self._buffer_sizes[channel_number]} bytes)")
+                        break
+        except asyncio.CancelledError:
+            logger.debug(f"Pre-warm cancelled for channel {channel_number}")
+            raise
+        except Exception as e:
+            logger.warning(f"Error pre-warming stream for channel {channel_number}: {e}")
+        finally:
+            async with self._lock:
+                # Only unregister if the registered task is still this one
+                if self._prewarm_tasks.get(channel_number) is asyncio.current_task():
+                    del self._prewarm_tasks[channel_number]
+
+    async def get_buffered_stream(
+        self,
+        channel_number: str,
+        stream_generator: AsyncIterator[bytes]
+    ) -> AsyncIterator[bytes]:
+        """
+        Get a stream that serves buffered chunks first, then continues from the generator
+
+        Args:
+            channel_number: Channel number
+            stream_generator: Async generator that yields chunks (continues after the buffer)
+
+        Yields:
+            Chunks from the buffer first, then from the generator
+        """
+        # First, collect all buffered chunks
+        async with self._lock:
+            if channel_number in self._buffers and self._buffers[channel_number]:
+                buffer = list(self._buffers[channel_number])
+                buffer_size = self._buffer_sizes.get(channel_number, 0)
+                logger.info(f"Serving {len(buffer)} pre-warmed chunks for channel {channel_number} ({buffer_size} bytes)")
+
+                # Clear the buffer after serving (don't serve the same chunks twice)
+                self._buffers[channel_number].clear()
+                self._buffer_sizes[channel_number] = 0
+            else:
+                buffer = []
+                logger.debug(f"No pre-warmed buffer for channel {channel_number}, starting fresh")
+
+        # Yield buffered chunks immediately
+        for chunk in buffer:
+            yield chunk
+
+        # Continue from the live stream
+        async for chunk in stream_generator:
+            yield chunk
+
+    async def clear_buffer(self, channel_number: str) -> None:
+        """Clear the buffer for a channel"""
+        async with self._lock:
+            if channel_number in self._buffers:
+                self._buffers[channel_number].clear()
+                self._buffer_sizes[channel_number] = 0
+                logger.debug(f"Cleared pre-warm buffer for channel {channel_number}")
+
+    async def start_prewarm_task(
+        self,
+        channel_number: str,
+        stream_generator: AsyncIterator[bytes]
+    ) -> None:
+        """Start a background task to pre-warm a stream"""
+        # Cancel any existing pre-warm task first. The task is awaited outside
+        # the lock: its finally block also acquires the lock, so awaiting it
+        # while holding the lock would deadlock.
+        async with self._lock:
+            old_task = self._prewarm_tasks.pop(channel_number, None)
+        if old_task is not None and not old_task.done():
+            old_task.cancel()
+            try:
+                await old_task
+            except asyncio.CancelledError:
+                pass
+
+        async with self._lock:
+            # Start the new pre-warm task
+            task = asyncio.create_task(
+                self.prewarm_stream(channel_number, stream_generator)
+            )
+            self._prewarm_tasks[channel_number] = task
+            logger.debug(f"Started pre-warm task for channel {channel_number}")
+
+    async def stop_prewarm_task(self, channel_number: str) -> None:
+        """Stop the pre-warm task for a channel"""
+        async with self._lock:
+            task = self._prewarm_tasks.pop(channel_number, None)
+        # Await cancellation outside the lock for the same deadlock reason as above
+        if task is not None and not task.done():
+            task.cancel()
+            try:
+                await task
+            except asyncio.CancelledError:
+                pass
+
+        # Clear the buffer
+        await self.clear_buffer(channel_number)
+
+    async def get_buffer_info(self, channel_number: str) -> Dict[str, int]:
+        """Get buffer information for a channel"""
+        async with self._lock:
+            if channel_number in self._buffers:
+                return {
+                    "chunk_count": len(self._buffers[channel_number]),
+                    "buffer_size_bytes": self._buffer_sizes.get(channel_number, 0),
+                    "has_buffer": True
+                }
+            return {
+                "chunk_count": 0,
+                "buffer_size_bytes": 0,
+                "has_buffer": False
+            }
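
How the pre-warmer is driven is implied rather than shown in this patch; a hypothetical wiring (the stream_manager.stream_channel call and both coroutines below are assumptions, not part of the diff) would pre-warm on tune and drain the buffer ahead of the live generator:

    # Hypothetical wiring - names are illustrative, not part of the patch.
    prewarmer = StreamPrewarmer()

    async def on_tune(channel_number: str):
        # Start FFmpeg early and buffer the first ~10 chunks in the background.
        gen = stream_manager.stream_channel(channel_number)  # assumed API
        await prewarmer.start_prewarm_task(channel_number, gen)

    async def serve(channel_number: str):
        # Serve buffered chunks first for a sub-second response, then go live.
        live = stream_manager.stream_channel(channel_number)  # assumed API
        async for chunk in prewarmer.get_buffered_stream(channel_number, live):
            yield chunk
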
diff --git a/streamtv/streaming/youtube_adapter.py b/streamtv/streaming/youtube_adapter.py
index 347c0a6..f1814d8 100644
--- a/streamtv/streaming/youtube_adapter.py
+++ b/streamtv/streaming/youtube_adapter.py
@@ -56,6 +56,44 @@ def __init__(self, quality: str = "best", extract_audio: bool = False, cookies_f
         if cookies_file:
             self._ydl_opts['cookiefile'] = cookies_file
             logger.info(f"Using YouTube cookies file: {cookies_file}")
+            # Validate that the cookies file contains the required cookies
+            self._validate_cookies_file(cookies_file)
+
+    def _validate_cookies_file(self, cookies_file: str):
+        """Validate that a cookies file contains the required authentication cookies"""
+        try:
+            from pathlib import Path
+            cookies_path = Path(cookies_file)
+            if not cookies_path.exists():
+                logger.warning(f"YouTube cookies file not found: {cookies_file}")
+                return
+
+            # Cookies required for YouTube authentication
+            required_cookies = ['LOGIN_INFO', 'SID', 'HSID', 'SSID', 'APISID', 'SAPISID', '__Secure-1PSID', '__Secure-3PSID']
+            found_cookies = set()
+
+            with open(cookies_path, 'r', encoding='utf-8', errors='ignore') as f:
+                for line in f:
+                    line = line.strip()
+                    if not line or line.startswith('#'):
+                        continue
+                    # Netscape cookie format: domain, flag, path, secure, expiration, name, value
+                    parts = line.split('\t')
+                    if len(parts) >= 6 and parts[5] in required_cookies:
+                        found_cookies.add(parts[5])
+
+            missing_cookies = set(required_cookies) - found_cookies
+            if missing_cookies:
+                logger.warning(f"YouTube cookies file missing required cookies: {', '.join(sorted(missing_cookies))}")
+                logger.warning(f"Cookies file: {cookies_file}")
+                logger.warning("To fix: export a complete cookies file from your browser after logging into YouTube.")
+                logger.warning("See: https://github.com/yt-dlp/yt-dlp/wiki/Extractors#exporting-youtube-cookies")
+            else:
+                logger.debug("YouTube cookies file validated: all required cookies present")
+        except Exception as e:
+            logger.debug(f"Error validating cookies file: {e}")
 
     def __del__(self):
         """Cleanup thread pool executor"""
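
The validator relies on the Netscape cookie-file layout: seven tab-separated fields per line, with the cookie name in the sixth field. A standalone sketch of that check (the sample line is illustrative):

    # Illustrative: field 6 (index 5) of a Netscape cookie line is the cookie name.
    sample = ".youtube.com\tTRUE\t/\tTRUE\t1767225600\tSID\tabc123"
    domain, subdomains, path, secure, expiry, name, value = sample.split("\t")
    assert name == "SID"
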
@@ -255,7 +293,10 @@ def _get_stream_url_sync(self, url: str, format_id: Optional[str] = None) -> str
             if 'height<=' in requested_format:
                 # Try without height restriction
                 format_selectors.append('best')
-            format_selectors.append('best')  # Final fallback
+            format_selectors.append('best')  # Standard final fallback
+            # More permissive fallbacks, tried only after 'best' has failed
+            format_selectors.append('worst/best')  # Accept worst if best is not available
+            format_selectors.append('worst')  # Last resort: worst quality
 
         # Ensure cookies file is explicitly set (in case it wasn't copied properly)
         cookies_file = self.cookies_file
@@ -265,13 +306,17 @@ def _get_stream_url_sync(self, url: str, format_id: Optional[str] = None) -> str
         info = None
         for fmt_selector in format_selectors:
             try:
+                # Create fresh options, explicitly excluding any format from the base options
                 ydl_opts = {
-                    **self._ydl_opts,
-                    'format': fmt_selector,
-                    'noplaylist': True,
-                    # CRITICAL: Ensure no downloading happens
+                    'quiet': True,
+                    'no_warnings': True,
+                    'extract_flat': False,
+                    'geo_bypass': True,
+                    'geo_bypass_country': 'US',
                     'download': False,
+                    'noplaylist': True,
                     'skip_download': True,
+                    'format': fmt_selector,  # Explicitly set the format for this attempt
                 }
 
                 if cookies_file:
@@ -311,43 +356,72 @@ def _get_stream_url_sync(self, url: str, format_id: Optional[str] = None) -> str
                     logger.debug(f"Non-format error with format '{fmt_selector}' for {url}: {error_msg_clean[:150]}")
                     raise
 
-        # If we exhausted all format selectors, try one more time with no format restriction
+        # If we exhausted all format selectors, retry with very permissive format selectors
         if info is None:
-            logger.warning(f"All format selectors failed for {url}, trying with no format restriction (let yt-dlp auto-select)...")
-            try:
-                # Create fresh options without any format specification
-                ydl_opts = {
-                    'quiet': True,
-                    'no_warnings': True,
-                    'extract_flat': False,
-                    'geo_bypass': True,
-                    'geo_bypass_country': 'US',
-                    'download': False,
-                    'noplaylist': True,
-                    'skip_download': True,
-                    # Explicitly do NOT set 'format' - let yt-dlp auto-select
-                }
-
-                if cookies_file:
-                    ydl_opts['cookiefile'] = cookies_file
-
-                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
-                    info = ydl.extract_info(url, download=False)
-                    logger.info(f"Successfully extracted info with auto-selected format for {url}")
-            except Exception as final_error:
-                error_msg_final = str(final_error)
-                # Remove ANSI codes
-                error_msg_final_clean = error_msg_final.replace('\x1b[0;31m', '').replace('\x1b[0m', '').replace('[0;31m', '').replace('[0m', '')
-                logger.error(f"Final fallback (auto-select format) also failed for {url}: {error_msg_final_clean[:200]}")
-
-                # If it's still a format error, the video might truly have no available formats
-                if 'format' in error_msg_final_clean.lower() and 'not available' in error_msg_final_clean.lower():
-                    logger.error(f"Video {url} appears to have no streamable formats available. This may be a restricted or unavailable video.")
-                    raise ValueError(f"YouTube video has no available formats: {url}. The video may be restricted, region-locked, or unavailable.")
-
-                if last_error:
-                    raise last_error
-                raise ValueError(f"Failed to extract video info: {error_msg_final_clean[:200]}")
+            logger.warning(f"All format selectors failed for {url}, retrying with very permissive format selectors...")
+
+            # Try progressively more permissive formats
+            permissive_formats = [
+                'worst/best',  # Accept worst quality if best is not available
+                'worst',       # Just worst quality
+                None,          # No format restriction - let yt-dlp auto-select
+            ]
+
+            for permissive_fmt in permissive_formats:
+                try:
+                    # Create fresh options
+                    ydl_opts = {
+                        'quiet': True,
+                        'no_warnings': True,
+                        'extract_flat': False,
+                        'geo_bypass': True,
+                        'geo_bypass_country': 'US',
+                        'download': False,
+                        'noplaylist': True,
+                        'skip_download': True,
+                    }
+
+                    # Only set a format when one is specified (None means auto-select)
+                    if permissive_fmt is not None:
+                        ydl_opts['format'] = permissive_fmt
+
+                    if cookies_file:
+                        ydl_opts['cookiefile'] = cookies_file
+
+                    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+                        info = ydl.extract_info(url, download=False)
+                        logger.info(f"Successfully extracted info with format '{permissive_fmt or 'auto-select'}' for {url}")
+                    break  # Success, exit the loop
+                except Exception as fmt_error:
+                    last_error = fmt_error  # Remember for the final error message
+                    error_msg_fmt = str(fmt_error)
+                    error_msg_fmt_clean = error_msg_fmt.replace('\x1b[0;31m', '').replace('\x1b[0m', '').replace('[0;31m', '').replace('[0m', '')
+
+                    # Check whether it is still a format error
+                    is_format_error = (
+                        'Requested format is not available' in error_msg_fmt_clean or
+                        'format is not available' in error_msg_fmt_clean.lower() or
+                        'list-formats' in error_msg_fmt_clean.lower()
+                    )
+
+                    if is_format_error and permissive_fmt is not permissive_formats[-1]:
+                        # Try the next permissive format
+                        logger.debug(f"Format '{permissive_fmt or 'auto-select'}' failed, trying next...")
+                        continue
+                    elif is_format_error:
+                        # The last format also failed - the video likely has no formats at all
+                        logger.error(f"All format attempts failed for {url}. The video may have no available formats.")
+                        logger.error(f"Last error: {error_msg_fmt_clean[:200]}")
+                        raise ValueError(f"YouTube video has no available formats: {url}. The video may be restricted, region-locked, or unavailable.")
+                    else:
+                        # Non-format error, re-raise
+                        raise
+
+        # If we still have no info after all permissive formats, raise an error
+        if info is None:
+            error_msg_final = str(last_error) if last_error else "Unknown error"
+            error_msg_final_clean = error_msg_final.replace('\x1b[0;31m', '').replace('\x1b[0m', '').replace('[0;31m', '').replace('[0m', '')
+            logger.error(f"All format attempts (including permissive) failed for {url}: {error_msg_final_clean[:200]}")
+            raise ValueError(f"YouTube video has no available formats: {url}. The video may be restricted, region-locked, or unavailable.")
 
         # Get the best available format URL
         if 'url' in info:
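
The escalating-fallback pattern above generalizes to a compact helper; a minimal sketch, assuming only that yt-dlp is installed (the selector list and function name are illustrative):

    # Illustrative sketch of the escalating format-selector fallback.
    import yt_dlp

    def extract_with_fallbacks(url, selectors=('best', 'worst/best', 'worst', None)):
        last_error = None
        for fmt in selectors:
            opts = {'quiet': True, 'skip_download': True, 'noplaylist': True}
            if fmt is not None:      # None lets yt-dlp auto-select
                opts['format'] = fmt
            try:
                with yt_dlp.YoutubeDL(opts) as ydl:
                    return ydl.extract_info(url, download=False)
            except Exception as e:   # keep trying more permissive selectors
                last_error = e
        raise ValueError(f"No extractable formats for {url}") from last_error
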