diff --git a/app.py b/app.py index 1f1b150..d567085 100644 --- a/app.py +++ b/app.py @@ -1,7 +1,8 @@ -APP_VERSION = "v4.1.0" +# app.py — merged version (features from both sources) +APP_VERSION = "v4.2.0" APP_RELEASE_DATE = "2025-10-25" -from flask import Flask, render_template, request, redirect, url_for, flash, session +from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify, abort from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash import sqlite3 @@ -12,11 +13,20 @@ import datetime import requests import xml.etree.ElementTree as ET -from urllib.parse import urlparse # added -import socket # added -import ipaddress # added +from urllib.parse import urlparse, urljoin +import socket +import ipaddress +import logging +import subprocess from datetime import datetime, timezone, timedelta +# Optional helper module for vlc (try import; fall back to subprocess-based helpers) +try: + import vlc_control +except Exception: + vlc_control = None + logging.exception("vlc_control import failed (this is optional): %s", sys.exc_info()[0]) + APP_START_TIME = datetime.now() # ------------------- Config ------------------- @@ -24,7 +34,6 @@ app.secret_key = os.urandom(24) # replace with a fixed key in production app.config['REMEMBER_COOKIE_DURATION'] = timedelta(days=30) - DATABASE = 'users.db' TUNER_DB = 'tuners.db' @@ -214,7 +223,7 @@ def parse_m3u(m3u_url): lines = r.text.splitlines() except: return channels - + for i, line in enumerate(lines): if line.startswith('#EXTINF:'): info = line.strip() @@ -235,11 +244,11 @@ def parse_m3u(m3u_url): # ------------------- XMLTV EPG Parsing ------------------- def parse_epg(xml_url): programs = {} - - # ✅ Handle when user pastes same .m3u for XML + + # handle when user pastes same .m3u for XML if xml_url.lower().endswith(('.m3u', '.m3u8')): return programs # empty, fallback will fill it later - + try: r = requests.get(xml_url, timeout=15) r.raise_for_status() @@ -292,6 +301,67 @@ def apply_epg_fallback(channels, epg): }] return epg +# ------------------- Helper: safe redirect ------------------- +def is_safe_url(target): + try: + ref_url = urlparse(request.host_url) + test_url = urlparse(urljoin(request.host_url, target)) + return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc + except Exception: + return False + +# ------------------- Remote / Playback Helpers ------------------- +PLAY_SCRIPT = "/usr/local/bin/vlc-play.sh" +STOP_SCRIPT = "/usr/local/bin/vlc-stop.sh" +VLC_LOG = "/var/log/vlc-play.log" + +def run_play_script(url, instance="main"): + """Run the configured helper script via sudo if vlc_control isn't available.""" + if vlc_control: + try: + return vlc_control.start(url, instance) + except Exception: + logging.exception("vlc_control.start failed") + # fallthrough to subprocess + try: + subprocess.run(["sudo", PLAY_SCRIPT, url, instance], check=True, timeout=30) + return True + except Exception: + logging.exception("Failed to run play script") + return False + +def run_stop_script(instance="main"): + if vlc_control: + try: + return vlc_control.stop(instance) + except Exception: + logging.exception("vlc_control.stop failed") + try: + subprocess.run(["sudo", STOP_SCRIPT, instance], check=True, timeout=20) + return True + except Exception: + logging.exception("Failed to run stop script") + return False + +def tail_file(path, lines=200): + try: + with open(path, 'rb') as f: + f.seek(0, os.SEEK_END) + size = f.tell() + block = 1024 + data = b'' + while size > 0 and lines > 0: + size -= block + if size < 0: + block += size + size = 0 + f.seek(size) + chunk = f.read(block) + data = chunk + data + lines -= chunk.count(b'\n') + return data.decode(errors='ignore').splitlines()[-200:] + except Exception: + return [] # ------------------- Routes ------------------- @app.route('/') @@ -303,11 +373,10 @@ def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] - remember = 'remember' in request.form # ✅ new: detect Save Session checkbox + remember = 'remember' in request.form user = get_user(username) if user and check_password_hash(user.password_hash, password): - login_user(user, remember=remember) # ✅ persist login if checked - login_user(user) + login_user(user, remember=remember) log_event(username, "Logged in") tuners = get_tuners() current_tuner = get_current_tuner() @@ -316,8 +385,11 @@ def login(): global cached_channels, cached_epg cached_channels = parse_m3u(m3u_url) cached_epg = parse_epg(xml_url) - # ✅ Apply “No Guide Data Available” fallback cached_epg = apply_epg_fallback(cached_channels, cached_epg) + # safe redirect handling for optional 'next' param + next_url = request.args.get('next') or request.form.get('next') + if next_url and is_safe_url(next_url): + return redirect(next_url) return redirect(url_for('guide')) else: log_event(username if username else "unknown", "Failed login attempt") @@ -332,8 +404,6 @@ def logout(): return redirect(url_for('login')) def revoke_user_sessions(username): - # Placeholder: later this can use a session-tracking table or Redis - # For now, it clears any "remember" cookie or stored flag session_key = f"user_session_{username}" if session_key in session: session.pop(session_key, None) @@ -417,18 +487,14 @@ def delete_user(): def manage_users(): ua = request.headers.get('User-Agent', '').lower() - # Detect Android / Fire / Google TV browsers tv_patterns = ['silk', 'aft', 'android tv', 'googletv', 'mibox', 'bravia', 'shield', 'tcl', 'hisense', 'puffin', 'tv bro'] is_tv = any(p in ua for p in tv_patterns) - # Restrict access if current_user.username != 'admin' or is_tv: - # Log unauthorized or TV-based attempt log_event(current_user.username, f"Unauthorized attempt to access /manage_users from UA: {ua}") flash("Unauthorized access.") return redirect(url_for('guide')) - # ---- Normal admin logic below ---- with sqlite3.connect(DATABASE, timeout=10) as conn: c = conn.cursor() c.execute('SELECT username FROM users WHERE username != "admin"') @@ -474,9 +540,8 @@ def manage_users(): return render_template('manage_users.html', users=users, current_tuner=get_current_tuner()) - @app.route("/about") -@login_required # optional +@login_required def about(): python_version = sys.version.split()[0] os_info = platform.platform() @@ -484,7 +549,6 @@ def about(): db_path = os.path.join(install_path, "app.db") log_path = "/var/log/iptv" if os.name != "nt" else os.path.join(install_path, "logs") - # calculate uptime uptime_delta = datetime.now() - APP_START_TIME days, seconds = uptime_delta.days, uptime_delta.seconds hours = seconds // 3600 @@ -504,7 +568,6 @@ def about(): } return render_template("about.html", info=info) - @app.route('/change_tuner', methods=['GET', 'POST']) @login_required def change_tuner(): @@ -521,14 +584,12 @@ def change_tuner(): log_event(current_user.username, f"Switched active tuner to {new_tuner}") flash(f"Active tuner switched to {new_tuner}") - # ✅ Refresh cached guide data immediately global cached_channels, cached_epg tuners = get_tuners() m3u_url = tuners[new_tuner]["m3u"] xml_url = tuners[new_tuner]["xml"] cached_channels = parse_m3u(m3u_url) cached_epg = parse_epg(xml_url) - # ✅ Apply “No Guide Data Available” fallback cached_epg = apply_epg_fallback(cached_channels, cached_epg) elif action == "update_urls": @@ -536,12 +597,10 @@ def change_tuner(): xml_url = request.form["xml_url"] m3u_url = request.form["m3u_url"] - # update DB update_tuner_urls(tuner, xml_url, m3u_url) log_event(current_user.username, f"Updated URLs for tuner {tuner}") flash(f"Updated URLs for tuner {tuner}") - # ✅ Validate inputs if xml_url: validate_tuner_url(xml_url, label=f"{tuner} XML") if m3u_url: @@ -558,7 +617,7 @@ def change_tuner(): flash(f"Tuner {tuner} deleted.") elif action == "rename_tuner": - old_name = request.form["tuner"] # matches HTML