diff --git a/app.py b/app.py index 86ecfe3..91dd708 100644 --- a/app.py +++ b/app.py @@ -204,7 +204,60 @@ def update_tuner_urls(name, xml_url, m3u_url): conn.commit() def add_tuner(name, xml_url, m3u_url): - """Insert a new tuner into DB.""" + """Insert a new tuner into DB with validation.""" + # Check for duplicate name + tuners = get_tuners() + if name in tuners: + raise ValueError(f"Tuner '{name}' already exists") + + # Validate M3U URL + if not m3u_url or not m3u_url.strip(): + raise ValueError("M3U URL is required") + if not m3u_url.startswith(('http://', 'https://')): + raise ValueError("M3U URL must start with http:// or https://") + + # Validate XML URL if provided + if xml_url and xml_url.strip(): + if not xml_url.startswith(('http://', 'https://')): + raise ValueError("XML URL must start with http:// or https://") + + # Optional: Check URL reachability with SSRF protection + try: + # Parse the URL to validate the hostname + parsed_url = urlparse(m3u_url) + hostname = parsed_url.hostname + + if not hostname: + raise ValueError("M3U URL must have a valid hostname") + + # Block localhost to prevent SSRF attacks on local services + try: + # Resolve hostname to IP address + ip_addr = socket.gethostbyname(hostname) + ip_obj = ipaddress.ip_address(ip_addr) + + # Block localhost (127.0.0.0/8) to prevent SSRF + if ip_obj.is_loopback: + raise ValueError("M3U URL cannot point to localhost (127.0.0.0/8)") + # Block link-local addresses (169.254.0.0/16) which could be cloud metadata + if ip_obj.is_link_local: + raise ValueError("M3U URL cannot point to link-local addresses (169.254.0.0/16)") + except socket.gaierror: + # If hostname can't be resolved, it will fail in the requests call anyway + pass + + # Make the request with security restrictions + r = requests.head(m3u_url, timeout=5, allow_redirects=True) + r.raise_for_status() + except requests.RequestException as e: + raise ValueError(f"M3U URL unreachable: {str(e)}") + except ValueError: + # Re-raise ValueError from our validation + raise + except Exception as e: + raise ValueError(f"M3U URL validation failed: {str(e)}") + + # Insert into database with sqlite3.connect(TUNER_DB, timeout=10) as conn: c = conn.cursor() c.execute( @@ -272,6 +325,38 @@ def parse_m3u(m3u_url): except: return channels + # Filter out empty lines and comments (except #EXTINF) + non_empty_lines = [line.strip() for line in lines if line.strip()] + + # Check if this is a single-channel playlist (no #EXTINF tags) + has_extinf = any(line.startswith('#EXTINF:') for line in non_empty_lines) + + if not has_extinf: + # Look for a single stream URL + stream_urls = [line for line in non_empty_lines + if line.startswith(('http://', 'https://')) + and not line.startswith('#')] + + if len(stream_urls) == 1: + url = stream_urls[0] + # Extract a channel name from the URL or use default + try: + parsed = urlparse(url) + name = parsed.path.split('/')[-1].replace('.m3u8', '').replace('_', ' ').title() + if not name: + name = 'Live Stream' + except Exception: + name = 'Live Stream' + + channels.append({ + 'name': name, + 'logo': '', + 'url': url, + 'tvg_id': 'stream_1' + }) + return channels + + # Existing multi-channel parsing logic for i, line in enumerate(lines): if line.startswith('#EXTINF:'): info = line.strip() @@ -749,12 +834,14 @@ def change_tuner(): if not name: flash("Tuner name cannot be empty.", "warning") - elif name in get_tuners(): - flash(f"Tuner {name} already exists.", "warning") else: - add_tuner(name, xml_url, m3u_url) - log_event(current_user.username, f"Added tuner {name}") - flash(f"Tuner {name} added successfully.") + try: + add_tuner(name, xml_url, m3u_url) + log_event(current_user.username, f"Added tuner {name}") + flash(f"Tuner {name} added successfully.") + except ValueError as e: + flash(str(e), "warning") + log_event(current_user.username, f"Failed to add tuner {name}: {str(e)}") elif action == "update_auto_refresh": # Expect form fields: auto_refresh_enabled ('0' or '1') and auto_refresh_interval_hours (2/4/6/12/24) diff --git a/tests/test_tuner_validation.py b/tests/test_tuner_validation.py new file mode 100644 index 0000000..527d12b --- /dev/null +++ b/tests/test_tuner_validation.py @@ -0,0 +1,257 @@ +# tests/test_tuner_validation.py +# Tests for tuner validation and M3U parsing improvements + +import pytest +import sys +import os +import sqlite3 +import tempfile +from unittest.mock import Mock, patch, MagicMock + +# Add the parent directory to the path to import app +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import app as app_module + + +class TestAddTunerValidation: + """Test validation logic in add_tuner() function.""" + + def setup_method(self): + """Set up test database before each test.""" + self.temp_db = tempfile.NamedTemporaryFile(delete=False, suffix='.db') + self.temp_db.close() + app_module.TUNER_DB = self.temp_db.name + + # Initialize database + with sqlite3.connect(self.temp_db.name, timeout=10) as conn: + c = conn.cursor() + c.execute('''CREATE TABLE IF NOT EXISTS tuners + (name TEXT PRIMARY KEY, xml TEXT, m3u TEXT)''') + c.execute('''CREATE TABLE IF NOT EXISTS settings + (key TEXT PRIMARY KEY, value TEXT)''') + conn.commit() + + def teardown_method(self): + """Clean up test database after each test.""" + try: + os.unlink(self.temp_db.name) + except OSError: + pass + + def test_duplicate_name_prevention(self): + """Test that duplicate tuner names are rejected.""" + # Add a tuner first + with patch('app.requests.head') as mock_head: + mock_response = Mock() + mock_response.raise_for_status = Mock() + mock_head.return_value = mock_response + + app_module.add_tuner("TestTuner", "http://example.com/epg.xml", "http://example.com/playlist.m3u") + + # Try to add duplicate + with pytest.raises(ValueError, match="Tuner 'TestTuner' already exists"): + with patch('app.requests.head') as mock_head: + mock_response = Mock() + mock_response.raise_for_status = Mock() + mock_head.return_value = mock_response + + app_module.add_tuner("TestTuner", "http://example.com/epg2.xml", "http://example.com/playlist2.m3u") + + def test_m3u_url_required(self): + """Test that M3U URL is required.""" + with pytest.raises(ValueError, match="M3U URL is required"): + app_module.add_tuner("TestTuner", "http://example.com/epg.xml", "") + + with pytest.raises(ValueError, match="M3U URL is required"): + app_module.add_tuner("TestTuner", "http://example.com/epg.xml", " ") + + def test_m3u_url_must_be_http_or_https(self): + """Test that M3U URL must start with http:// or https://.""" + with pytest.raises(ValueError, match="M3U URL must start with http:// or https://"): + app_module.add_tuner("TestTuner", "http://example.com/epg.xml", "ftp://example.com/playlist.m3u") + + with pytest.raises(ValueError, match="M3U URL must start with http:// or https://"): + app_module.add_tuner("TestTuner", "http://example.com/epg.xml", "/local/playlist.m3u") + + def test_xml_url_must_be_http_or_https_if_provided(self): + """Test that XML URL must start with http:// or https:// if provided.""" + with pytest.raises(ValueError, match="XML URL must start with http:// or https://"): + with patch('app.requests.head') as mock_head: + mock_response = Mock() + mock_response.raise_for_status = Mock() + mock_head.return_value = mock_response + + app_module.add_tuner("TestTuner", "ftp://example.com/epg.xml", "http://example.com/playlist.m3u") + + def test_xml_url_can_be_empty(self): + """Test that XML URL can be empty or whitespace.""" + with patch('app.requests.head') as mock_head: + mock_response = Mock() + mock_response.raise_for_status = Mock() + mock_head.return_value = mock_response + + # Should not raise an error + app_module.add_tuner("TestTuner1", "", "http://example.com/playlist.m3u") + app_module.add_tuner("TestTuner2", " ", "http://example.com/playlist2.m3u") + + def test_url_reachability_check(self): + """Test that unreachable URLs are rejected.""" + import requests + with patch('app.requests.head') as mock_head: + mock_head.side_effect = requests.RequestException("Connection refused") + + with pytest.raises(ValueError, match="M3U URL unreachable"): + app_module.add_tuner("TestTuner", "http://example.com/epg.xml", "http://unreachable.example.com/playlist.m3u") + + def test_localhost_blocked(self): + """Test that localhost URLs are blocked to prevent SSRF.""" + with pytest.raises(ValueError, match="M3U URL cannot point to localhost"): + app_module.add_tuner("LocalhostTuner", "http://example.com/epg.xml", "http://localhost:8080/playlist.m3u") + + with pytest.raises(ValueError, match="M3U URL cannot point to localhost"): + app_module.add_tuner("LocalhostTuner", "http://example.com/epg.xml", "http://127.0.0.1:8080/playlist.m3u") + + def test_link_local_blocked(self): + """Test that link-local addresses are blocked to prevent SSRF.""" + with pytest.raises(ValueError, match="M3U URL cannot point to link-local"): + app_module.add_tuner("LinkLocalTuner", "http://example.com/epg.xml", "http://169.254.169.254/latest/meta-data/") + + def test_successful_tuner_addition(self): + """Test successful tuner addition with valid inputs.""" + with patch('app.requests.head') as mock_head: + mock_response = Mock() + mock_response.raise_for_status = Mock() + mock_head.return_value = mock_response + + app_module.add_tuner("ValidTuner", "http://example.com/epg.xml", "http://example.com/playlist.m3u") + + # Verify tuner was added + tuners = app_module.get_tuners() + assert "ValidTuner" in tuners + assert tuners["ValidTuner"]["xml"] == "http://example.com/epg.xml" + assert tuners["ValidTuner"]["m3u"] == "http://example.com/playlist.m3u" + + +class TestSingleChannelM3U8: + """Test single-channel M3U8 playlist parsing.""" + + def test_single_channel_m3u8_with_simple_url(self): + """Test parsing a simple M3U8 with just one stream URL.""" + m3u_content = """#EXTM3U +https://example.com/live/stream.m3u8""" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/single.m3u8") + + assert len(channels) == 1 + assert channels[0]['url'] == "https://example.com/live/stream.m3u8" + assert channels[0]['tvg_id'] == 'stream_1' + assert channels[0]['logo'] == '' + # Name should be extracted from URL + assert channels[0]['name'] == 'Stream' + + def test_single_channel_m3u8_with_descriptive_name(self): + """Test that channel name is extracted from URL path.""" + m3u_content = """#EXTM3U +https://example.com/live/my_awesome_channel.m3u8""" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/single.m3u8") + + assert len(channels) == 1 + assert channels[0]['name'] == 'My Awesome Channel' + + def test_single_channel_m3u8_with_no_extinf(self): + """Test M3U8 without any #EXTINF tags.""" + m3u_content = """https://example.com/live/stream.m3u8""" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/single.m3u8") + + assert len(channels) == 1 + assert channels[0]['url'] == "https://example.com/live/stream.m3u8" + + def test_single_channel_default_name(self): + """Test default name when URL doesn't provide a good name.""" + m3u_content = """https://example.com/""" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/single.m3u8") + + assert len(channels) == 1 + assert channels[0]['name'] == 'Live Stream' + + def test_multiple_urls_not_single_channel(self): + """Test that multiple URLs without #EXTINF don't parse as single channel.""" + m3u_content = """https://example.com/stream1.m3u8 +https://example.com/stream2.m3u8""" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/multi.m3u8") + + # Should return empty since it's not a valid single-channel playlist + assert len(channels) == 0 + + def test_multi_channel_m3u_still_works(self): + """Test that existing multi-channel M3U parsing still works.""" + m3u_content = """#EXTM3U +#EXTINF:-1 tvg-id="ch1" tvg-logo="http://example.com/logo1.png",Channel 1 +http://example.com/stream1.m3u8 +#EXTINF:-1 tvg-id="ch2" tvg-logo="http://example.com/logo2.png",Channel 2 +http://example.com/stream2.m3u8""" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/multi.m3u8") + + assert len(channels) == 2 + assert channels[0]['name'] == 'Channel 1' + assert channels[0]['tvg_id'] == 'ch1' + assert channels[0]['logo'] == 'http://example.com/logo1.png' + assert channels[0]['url'] == 'http://example.com/stream1.m3u8' + assert channels[1]['name'] == 'Channel 2' + assert channels[1]['tvg_id'] == 'ch2' + + def test_empty_m3u_returns_empty_channels(self): + """Test that an empty M3U returns empty channel list.""" + m3u_content = "" + + with patch('app.requests.get') as mock_get: + mock_response = Mock() + mock_response.text = m3u_content + mock_response.raise_for_status = Mock() + mock_get.return_value = mock_response + + channels = app_module.parse_m3u("http://example.com/empty.m3u8") + + assert len(channels) == 0