diff --git a/app.py b/app.py index d567085..4bc2b84 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,6 @@ # app.py — merged version (features from both sources) APP_VERSION = "v4.2.0" -APP_RELEASE_DATE = "2025-10-25" +APP_RELEASE_DATE = "2025-11-06" from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify, abort from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user @@ -20,12 +20,13 @@ import subprocess from datetime import datetime, timezone, timedelta -# Optional helper module for vlc (try import; fall back to subprocess-based helpers) +# New import: vlc control helper (optional - keep existing integration compatibility) try: import vlc_control -except Exception: +except Exception as e: vlc_control = None - logging.exception("vlc_control import failed (this is optional): %s", sys.exc_info()[0]) + # Log the import failure so we can see why it failed when the app starts + logging.exception("Failed to import vlc_control: %s", e) APP_START_TIME = datetime.now() @@ -34,6 +35,7 @@ app.secret_key = os.urandom(24) # replace with a fixed key in production app.config['REMEMBER_COOKIE_DURATION'] = timedelta(days=30) + DATABASE = 'users.db' TUNER_DB = 'tuners.db' @@ -200,7 +202,7 @@ def delete_tuner(name): """Delete a tuner from DB (except current one).""" with sqlite3.connect(TUNER_DB, timeout=10) as conn: c = conn.cursor() - c.execute("DELETE FROM tuners WHERE name=?", (name,)) + c.execute('DELETE FROM tuners WHERE name=?', (name,)) conn.commit() def rename_tuner(old_name, new_name): @@ -214,6 +216,9 @@ def rename_tuner(old_name, new_name): cached_channels = [] cached_epg = {} +# Track currently playing marker (server-side) +CURRENTLY_PLAYING = None + # ------------------- M3U Parsing ------------------- def parse_m3u(m3u_url): channels = [] @@ -223,7 +228,7 @@ def parse_m3u(m3u_url): lines = r.text.splitlines() except: return channels - + for i, line in enumerate(lines): if line.startswith('#EXTINF:'): info = line.strip() @@ -244,11 +249,11 @@ def parse_m3u(m3u_url): # ------------------- XMLTV EPG Parsing ------------------- def parse_epg(xml_url): programs = {} - - # handle when user pastes same .m3u for XML + + # ✅ Handle when user pastes same .m3u for XML if xml_url.lower().endswith(('.m3u', '.m3u8')): return programs # empty, fallback will fill it later - + try: r = requests.get(xml_url, timeout=15) r.raise_for_status() @@ -301,68 +306,19 @@ def apply_epg_fallback(channels, epg): }] return epg -# ------------------- Helper: safe redirect ------------------- + +# ------------------- Safe redirect helper ------------------- def is_safe_url(target): + """ + Return True if the target is a safe local URL (same host). Prevent open redirect. + """ try: ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) - return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc - except Exception: - return False - -# ------------------- Remote / Playback Helpers ------------------- -PLAY_SCRIPT = "/usr/local/bin/vlc-play.sh" -STOP_SCRIPT = "/usr/local/bin/vlc-stop.sh" -VLC_LOG = "/var/log/vlc-play.log" - -def run_play_script(url, instance="main"): - """Run the configured helper script via sudo if vlc_control isn't available.""" - if vlc_control: - try: - return vlc_control.start(url, instance) - except Exception: - logging.exception("vlc_control.start failed") - # fallthrough to subprocess - try: - subprocess.run(["sudo", PLAY_SCRIPT, url, instance], check=True, timeout=30) - return True + return (test_url.scheme in ("http", "https") and ref_url.netloc == test_url.netloc) except Exception: - logging.exception("Failed to run play script") return False -def run_stop_script(instance="main"): - if vlc_control: - try: - return vlc_control.stop(instance) - except Exception: - logging.exception("vlc_control.stop failed") - try: - subprocess.run(["sudo", STOP_SCRIPT, instance], check=True, timeout=20) - return True - except Exception: - logging.exception("Failed to run stop script") - return False - -def tail_file(path, lines=200): - try: - with open(path, 'rb') as f: - f.seek(0, os.SEEK_END) - size = f.tell() - block = 1024 - data = b'' - while size > 0 and lines > 0: - size -= block - if size < 0: - block += size - size = 0 - f.seek(size) - chunk = f.read(block) - data = chunk + data - lines -= chunk.count(b'\n') - return data.decode(errors='ignore').splitlines()[-200:] - except Exception: - return [] - # ------------------- Routes ------------------- @app.route('/') def home(): @@ -370,31 +326,86 @@ def home(): @app.route('/login', methods=['GET','POST']) def login(): + """ + Login view updated to preserve a safe 'next' redirect. If a user supplies ?next=/remote + (e.g. from the QR), after successful login they will be redirected there automatically. + """ + # If already authenticated, redirect to next or guide + if getattr(current_user, "is_authenticated", False): + next_url = request.args.get('next') or request.form.get('next') or url_for('guide') + if next_url and not is_safe_url(next_url): + return abort(400) + return redirect(next_url) + if request.method == 'POST': - username = request.form['username'] - password = request.form['password'] - remember = 'remember' in request.form + username = request.form.get('username', '') + password = request.form.get('password', '') + remember = request.form.get('remember') == 'on' or request.form.get('remember') == 'true' or 'remember' in request.form user = get_user(username) if user and check_password_hash(user.password_hash, password): login_user(user, remember=remember) log_event(username, "Logged in") + + # reload cached guide after login tuners = get_tuners() current_tuner = get_current_tuner() - m3u_url = tuners[current_tuner]["m3u"] - xml_url = tuners[current_tuner]["xml"] + m3u_url = tuners[current_tuner]["m3u"] if current_tuner and current_tuner in tuners else None + xml_url = tuners[current_tuner]["xml"] if current_tuner and current_tuner in tuners else None global cached_channels, cached_epg - cached_channels = parse_m3u(m3u_url) - cached_epg = parse_epg(xml_url) + if m3u_url: + cached_channels = parse_m3u(m3u_url) + if xml_url: + cached_epg = parse_epg(xml_url) cached_epg = apply_epg_fallback(cached_channels, cached_epg) - # safe redirect handling for optional 'next' param - next_url = request.args.get('next') or request.form.get('next') - if next_url and is_safe_url(next_url): - return redirect(next_url) - return redirect(url_for('guide')) + + # Determine next redirect target (prefer POSTed next, then query param) + next_url = request.form.get('next') or request.args.get('next') or url_for('guide') + if next_url and not is_safe_url(next_url): + return abort(400) + return redirect(next_url) else: log_event(username if username else "unknown", "Failed login attempt") - return render_template('login.html', error='Invalid username or password') - return render_template('login.html') + error = 'Invalid username or password' + next_url = request.form.get('next') or request.args.get('next') or '' + return render_template('login.html', error=error, next=next_url), 401 + + # GET: render login form; preserve ?next=... into the form + next_url = request.args.get('next') or '' + return render_template('login.html', next=next_url) + +@app.route('/_debug/vlcinfo', methods=['GET']) +def _debug_vlcinfo(): + """ + Debug helper: returns last launch args and running vlc/cvlc processes. + This is safe to keep but can be removed once debugging is done. + """ + info = {} + try: + info['last_launch'] = vlc_control.last_launch_info() if vlc_control else None + except Exception as e: + info['last_launch_error'] = str(e) + try: + # list vlc/cvlc processes (ps output) + out = subprocess.check_output(['ps','-o','pid,cmd','-C','cvlc','-C','vlc'], stderr=subprocess.DEVNULL).decode(errors='ignore') + info['processes'] = out.strip() + except Exception as e: + info['processes_error'] = str(e) + return jsonify(info) + +@app.route('/_debug/current', methods=['GET']) +@login_required +def _debug_current(): + """ + Debug helper: returns the server-side CURRENTLY_PLAYING value so you can verify what the server thinks is playing. + """ + return jsonify({ + "CURRENTLY_PLAYING": CURRENTLY_PLAYING, + "cached_channels_sample": [{ + "tvg_id": ch.get('tvg_id'), + "url": ch.get('url'), + "name": ch.get('name') + } for ch in cached_channels[:10]] + }) @app.route('/logout') @login_required @@ -404,6 +415,8 @@ def logout(): return redirect(url_for('login')) def revoke_user_sessions(username): + # Placeholder: later this can use a session-tracking table or Redis + # For now, it clears any "remember" cookie or stored flag session_key = f"user_session_{username}" if session_key in session: session.pop(session_key, None) @@ -487,14 +500,18 @@ def delete_user(): def manage_users(): ua = request.headers.get('User-Agent', '').lower() + # Detect Android / Fire / Google TV browsers tv_patterns = ['silk', 'aft', 'android tv', 'googletv', 'mibox', 'bravia', 'shield', 'tcl', 'hisense', 'puffin', 'tv bro'] is_tv = any(p in ua for p in tv_patterns) + # Restrict access if current_user.username != 'admin' or is_tv: + # Log unauthorized or TV-based attempt log_event(current_user.username, f"Unauthorized attempt to access /manage_users from UA: {ua}") flash("Unauthorized access.") return redirect(url_for('guide')) + # ---- Normal admin logic below ---- with sqlite3.connect(DATABASE, timeout=10) as conn: c = conn.cursor() c.execute('SELECT username FROM users WHERE username != "admin"') @@ -540,8 +557,9 @@ def manage_users(): return render_template('manage_users.html', users=users, current_tuner=get_current_tuner()) + @app.route("/about") -@login_required +@login_required # optional def about(): python_version = sys.version.split()[0] os_info = platform.platform() @@ -549,6 +567,7 @@ def about(): db_path = os.path.join(install_path, "app.db") log_path = "/var/log/iptv" if os.name != "nt" else os.path.join(install_path, "logs") + # calculate uptime uptime_delta = datetime.now() - APP_START_TIME days, seconds = uptime_delta.days, uptime_delta.seconds hours = seconds // 3600 @@ -568,6 +587,7 @@ def about(): } return render_template("about.html", info=info) + @app.route('/change_tuner', methods=['GET', 'POST']) @login_required def change_tuner(): @@ -584,12 +604,14 @@ def change_tuner(): log_event(current_user.username, f"Switched active tuner to {new_tuner}") flash(f"Active tuner switched to {new_tuner}") + # ✅ Refresh cached guide data immediately global cached_channels, cached_epg tuners = get_tuners() m3u_url = tuners[new_tuner]["m3u"] xml_url = tuners[new_tuner]["xml"] cached_channels = parse_m3u(m3u_url) cached_epg = parse_epg(xml_url) + # ✅ Apply “No Guide Data Available” fallback cached_epg = apply_epg_fallback(cached_channels, cached_epg) elif action == "update_urls": @@ -597,10 +619,12 @@ def change_tuner(): xml_url = request.form["xml_url"] m3u_url = request.form["m3u_url"] + # update DB update_tuner_urls(tuner, xml_url, m3u_url) log_event(current_user.username, f"Updated URLs for tuner {tuner}") flash(f"Updated URLs for tuner {tuner}") + # ✅ Validate inputs if xml_url: validate_tuner_url(xml_url, label=f"{tuner} XML") if m3u_url: @@ -617,7 +641,7 @@ def change_tuner(): flash(f"Tuner {tuner} deleted.") elif action == "rename_tuner": - old_name = request.form["tuner"] + old_name = request.form["tuner"] # matches HTML