From 4fb7d23602fd2bd57866c217aba7e54a3fe08e78 Mon Sep 17 00:00:00 2001
From: Mort Yao
Date: Sun, 27 Apr 2025 01:58:30 +0200
Subject: [PATCH 1/7] [common] fix a long-standing bug that causes infinite downloading when content-length is missing

---
 src/you_get/common.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/you_get/common.py b/src/you_get/common.py
index 0b307ddee8..c99e13da60 100755
--- a/src/you_get/common.py
+++ b/src/you_get/common.py
@@ -807,6 +807,8 @@ def numreturn(a):
             except socket.timeout:
                 pass
             if not buffer:
+                if file_size == float('+inf'):  # Prevent infinite downloading
+                    break
                 if is_chunked and received_chunk == range_length:
                     break
                 elif not is_chunked and received == file_size:  # Download finished
@@ -827,9 +829,10 @@ def numreturn(a):
         received, os.path.getsize(temp_filepath), temp_filepath
     )
 
-    if os.access(filepath, os.W_OK):
+    if os.access(filepath, os.W_OK) and file_size != float('inf'):
         # on Windows rename could fail if destination filepath exists
-        os.remove(filepath)
+        # we should simply choose a new name instead of brutal os.remove(filepath)
+        filepath = filepath + " (2)"
     os.rename(temp_filepath, filepath)

From 57cf717bd5be00c23cce50a136253577bd66763b Mon Sep 17 00:00:00 2001
From: Mort Yao
Date: Sun, 27 Apr 2025 14:55:05 +0200
Subject: [PATCH 2/7] python-package.yml: disable the new flake8 F824 check

---
 .github/workflows/python-package.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 70fcc335e6..17198de58a 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -31,7 +31,7 @@ jobs:
     - name: Lint with flake8
       run: |
         # stop the build if there are Python syntax errors or undefined names
-        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+        flake8 . --count --select=E9,F63,F7,F82,F824 --show-source --statistics
         # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
     - name: Test with unittest

From c7e7525c9aec969984ac487982ba8f1876f68fd0 Mon Sep 17 00:00:00 2001
From: Mort Yao
Date: Sun, 27 Apr 2025 15:02:18 +0200
Subject: [PATCH 3/7] python-package.yml: disable the new flake8 F824 check

---
 .github/workflows/python-package.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 17198de58a..fc24cda3d6 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -31,7 +31,7 @@ jobs:
     - name: Lint with flake8
       run: |
         # stop the build if there are Python syntax errors or undefined names
-        flake8 . --count --select=E9,F63,F7,F82,F824 --show-source --statistics
+        flake8 . --count --select=E9,F63,F7,F82 --ignore=F824 --show-source --statistics
         # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
         flake8 . 
--count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Test with unittest From 1af8b714cdfed08523895dbb67de9bce631cf94e Mon Sep 17 00:00:00 2001 From: Mort Yao Date: Sun, 27 Apr 2025 16:05:03 +0200 Subject: [PATCH 4/7] python-package.yml: remove python 3.7 (no longer available in Ubuntu 24.04) --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index fc24cda3d6..be2e88842c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -15,7 +15,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [3.7, 3.8, 3.9, '3.10', '3.11', '3.12', '3.13', pypy-3.8, pypy-3.9, pypy-3.10] + python-version: [3.8, 3.9, '3.10', '3.11', '3.12', '3.13', pypy-3.8, pypy-3.9, pypy-3.10] steps: - uses: actions/checkout@v4 From 049548f3f3f35e67ba8d3181c71fdc71d11cf260 Mon Sep 17 00:00:00 2001 From: Mort Yao Date: Sun, 27 Apr 2025 17:32:00 +0200 Subject: [PATCH 5/7] README.md: add --force-reinstall to pip because it is now necessary for upgrading from a VCS URL to work when the package version is unchanged (https://github.com/pypa/pip/issues/9397, https://github.com/pypa/pip/issues/5780) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a3cb7cea84..178ae0ac74 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,7 @@ $ you-get https://github.com/soimort/you-get/archive/master.zip In order to get the latest ```develop``` branch without messing up the PIP, you can try: ``` -$ pip install --upgrade git+https://github.com/soimort/you-get@develop +$ pip install --upgrade --force-reinstall git+https://github.com/soimort/you-get@develop ``` ## Getting Started From 84bb3cac683cb7b38f216cb67a18d5adf87241f8 Mon Sep 17 00:00:00 2001 From: michaelos443 Date: Mon, 8 Dec 2025 17:06:53 +0000 Subject: [PATCH 6/7] refactor(extractor): reduce code duplication and improve PEP8 compliance --- src/you_get/extractor.py | 396 +++++++++++++++++++++++++++------------ 1 file changed, 274 insertions(+), 122 deletions(-) diff --git a/src/you_get/extractor.py b/src/you_get/extractor.py index bd71717e72..d3a0bd4117 100644 --- a/src/you_get/extractor.py +++ b/src/you_get/extractor.py @@ -1,31 +1,88 @@ #!/usr/bin/env python +"""Base extractor classes for video downloading. -from .common import match1, maybe_print, download_urls, get_filename, parse_host, set_proxy, unset_proxy, get_content, dry_run, player +This module provides: + - Extractor: Minimal base class for simple extractors. + - VideoExtractor: Full-featured base class with stream handling. +""" + +import os +import sys + +from .common import ( + maybe_print, download_urls, get_filename, parse_host, + set_proxy, unset_proxy, dry_run, player +) from .common import print_more_compatible as print from .util import log from . import json_output -import os -import sys + + +def _init_base_attributes(obj, url=None): + """Initialize common attributes for extractor classes. + + Args: + obj: The extractor instance to initialize. + url: Optional URL to set. + """ + obj.url = url + obj.title = None + obj.vid = None + obj.streams = {} + obj.streams_sorted = [] + class Extractor(): + """Minimal base class for simple extractors. + + Attributes: + url: The source URL. + title: The content title. + vid: The video ID. + streams: Dict of available streams. + streams_sorted: List of streams sorted by quality. 
+ """ + def __init__(self, *args): - self.url = None - self.title = None - self.vid = None - self.streams = {} - self.streams_sorted = [] + """Initialize the extractor. + + Args: + *args: Optional URL as first argument. + """ + _init_base_attributes(self, args[0] if args else None) - if args: - self.url = args[0] class VideoExtractor(): + """Full-featured base class for video extractors. + + Provides stream management, download handling, and output formatting. + + Attributes: + url: The source URL. + title: The content title. + vid: The video ID. + m3u8_url: HLS manifest URL if applicable. + streams: Dict of available streams. + streams_sorted: List of streams sorted by quality. + audiolang: Available audio languages. + password_protected: Whether content requires password. + dash_streams: Dict of DASH streams. + caption_tracks: Dict of caption tracks by language. + out: Flag indicating early exit. + ua: Custom User-Agent header. + referer: Custom Referer header. + danmaku: Danmaku/comments data. + lyrics: Lyrics data. + """ + def __init__(self, *args): - self.url = None - self.title = None - self.vid = None + """Initialize the video extractor. + + Args: + *args: Optional URL as first argument. + """ + _init_base_attributes(self, args[0] if args else None) self.m3u8_url = None - self.streams = {} - self.streams_sorted = [] self.audiolang = None self.password_protected = False self.dash_streams = {} @@ -36,67 +93,113 @@ def __init__(self, *args): self.danmaku = None self.lyrics = None - if args: - self.url = args[0] - - def download_by_url(self, url, **kwargs): - self.url = url - self.vid = None + def _sort_streams(self): + """Sort streams by quality based on stream_types ordering.""" + try: + self.streams_sorted = [ + dict([('id', st['id'])] + + list(self.streams[st['id']].items())) + for st in self.__class__.stream_types + if st['id'] in self.streams + ] + except KeyError: + self.streams_sorted = [ + dict([('itag', st['itag'])] + + list(self.streams[st['itag']].items())) + for st in self.__class__.stream_types + if st['itag'] in self.streams + ] + + def _run_download(self, **kwargs): + """Execute the prepare, extract, and download pipeline. + + Args: + **kwargs: Download options passed to each step. + """ + extractor_proxy = kwargs.get('extractor_proxy') + if extractor_proxy: + set_proxy(parse_host(extractor_proxy)) - if 'extractor_proxy' in kwargs and kwargs['extractor_proxy']: - set_proxy(parse_host(kwargs['extractor_proxy'])) self.prepare(**kwargs) - if self.out: - return - if 'extractor_proxy' in kwargs and kwargs['extractor_proxy']: + + if extractor_proxy: unset_proxy() - try: - self.streams_sorted = [dict([('id', stream_type['id'])] + list(self.streams[stream_type['id']].items())) for stream_type in self.__class__.stream_types if stream_type['id'] in self.streams] - except: - self.streams_sorted = [dict([('itag', stream_type['itag'])] + list(self.streams[stream_type['itag']].items())) for stream_type in self.__class__.stream_types if stream_type['itag'] in self.streams] + if self.out: + return + self._sort_streams() self.extract(**kwargs) - self.download(**kwargs) - def download_by_vid(self, vid, **kwargs): - self.url = None - self.vid = vid - - if 'extractor_proxy' in kwargs and kwargs['extractor_proxy']: - set_proxy(parse_host(kwargs['extractor_proxy'])) - self.prepare(**kwargs) - if 'extractor_proxy' in kwargs and kwargs['extractor_proxy']: - unset_proxy() + def download_by_url(self, url, **kwargs): + """Download video by URL. 
- try: - self.streams_sorted = [dict([('id', stream_type['id'])] + list(self.streams[stream_type['id']].items())) for stream_type in self.__class__.stream_types if stream_type['id'] in self.streams] - except: - self.streams_sorted = [dict([('itag', stream_type['itag'])] + list(self.streams[stream_type['itag']].items())) for stream_type in self.__class__.stream_types if stream_type['itag'] in self.streams] + Args: + url: The video URL. + **kwargs: Download options. + """ + self.url = url + self.vid = None + self._run_download(**kwargs) - self.extract(**kwargs) + def download_by_vid(self, vid, **kwargs): + """Download video by video ID. - self.download(**kwargs) + Args: + vid: The video ID. + **kwargs: Download options. + """ + self.url = None + self.vid = vid + self._run_download(**kwargs) def prepare(self, **kwargs): + """Prepare for download. Override in subclasses.""" pass - #raise NotImplementedError() def extract(self, **kwargs): + """Extract stream information. Override in subclasses.""" pass - #raise NotImplementedError() - def p_stream(self, stream_id): + def _get_stream(self, stream_id): + """Get stream info from streams or dash_streams. + + Args: + stream_id: The stream identifier. + + Returns: + The stream dictionary. + """ if stream_id in self.streams: - stream = self.streams[stream_id] - else: - stream = self.dash_streams[stream_id] + return self.streams[stream_id] + return self.dash_streams[stream_id] + + def _get_stream_id_key(self, stream): + """Get the stream ID key ('id' or 'itag'). + + Args: + stream: The stream dictionary. + + Returns: + The stream ID value. + """ + return stream['id'] if 'id' in stream else stream['itag'] + + def p_stream(self, stream_id): + """Print stream information. + + Args: + stream_id: The stream identifier to print. + """ + stream = self._get_stream(stream_id) if 'itag' in stream: - print(" - itag: %s" % log.sprint(stream_id, log.NEGATIVE)) + print(" - itag: %s" % + log.sprint(stream_id, log.NEGATIVE)) else: - print(" - format: %s" % log.sprint(stream_id, log.NEGATIVE)) + print(" - format: %s" % + log.sprint(stream_id, log.NEGATIVE)) if 'container' in stream: print(" container: %s" % stream['container']) @@ -107,61 +210,84 @@ def p_stream(self, stream_id): if 'quality' in stream: print(" quality: %s" % stream['quality']) - if 'size' in stream and 'container' in stream and stream['container'].lower() != 'm3u8': - if stream['size'] != float('inf') and stream['size'] != 0: - print(" size: %s MiB (%s bytes)" % (round(stream['size'] / 1048576, 1), stream['size'])) + if 'size' in stream and 'container' in stream: + if stream['container'].lower() != 'm3u8': + if stream['size'] != float('inf') and stream['size'] != 0: + size_mib = round(stream['size'] / 1048576, 1) + print(" size: %s MiB (%s bytes)" % + (size_mib, stream['size'])) if 'm3u8_url' in stream: print(" m3u8_url: {}".format(stream['m3u8_url'])) if 'itag' in stream: - print(" # download-with: %s" % log.sprint("you-get --itag=%s [URL]" % stream_id, log.UNDERLINE)) + cmd = "you-get --itag=%s [URL]" % stream_id + print(" # download-with: %s" % + log.sprint(cmd, log.UNDERLINE)) else: - print(" # download-with: %s" % log.sprint("you-get --format=%s [URL]" % stream_id, log.UNDERLINE)) + cmd = "you-get --format=%s [URL]" % stream_id + print(" # download-with: %s" % + log.sprint(cmd, log.UNDERLINE)) print() def p_i(self, stream_id): - if stream_id in self.streams: - stream = self.streams[stream_id] - else: - stream = self.dash_streams[stream_id] + """Print minimal stream info (index mode). 
+ + Args: + stream_id: The stream identifier. + """ + stream = self._get_stream(stream_id) + size_mib = round(stream['size'] / 1048576, 1) maybe_print(" - title: %s" % self.title) - print(" size: %s MiB (%s bytes)" % (round(stream['size'] / 1048576, 1), stream['size'])) + print(" size: %s MiB (%s bytes)" % + (size_mib, stream['size'])) print(" url: %s" % self.url) print() sys.stdout.flush() + def _select_best_stream_id(self): + """Select the best stream ID from sorted streams. + + Returns: + The best stream ID. + """ + return self._get_stream_id_key(self.streams_sorted[0]) + def p(self, stream_id=None): + """Print video information. + + Args: + stream_id: Stream to print. None for best, [] for all. + """ maybe_print("site: %s" % self.__class__.name) maybe_print("title: %s" % self.title) + if stream_id: - # Print the stream print("stream:") self.p_stream(stream_id) elif stream_id is None: - # Print stream with best quality print("stream: # Best quality") - stream_id = self.streams_sorted[0]['id'] if 'id' in self.streams_sorted[0] else self.streams_sorted[0]['itag'] + stream_id = self._select_best_stream_id() self.p_stream(stream_id) elif stream_id == []: print("streams: # Available quality and codecs") - # Print DASH streams if self.dash_streams: print(" [ DASH ] %s" % ('_' * 36)) - itags = sorted(self.dash_streams, - key=lambda i: -self.dash_streams[i]['size']) + itags = sorted( + self.dash_streams, + key=lambda i: -self.dash_streams[i]['size'] + ) for stream in itags: self.p_stream(stream) - # Print all other available streams if self.streams_sorted: print(" [ DEFAULT ] %s" % ('_' * 33)) for stream in self.streams_sorted: - self.p_stream(stream['id'] if 'id' in stream else stream['itag']) + self.p_stream(self._get_stream_id_key(stream)) if self.audiolang: print("audio-languages:") @@ -172,76 +298,115 @@ def p(self, stream_id=None): sys.stdout.flush() def p_playlist(self, stream_id=None): + """Print playlist information. + + Args: + stream_id: Optional stream identifier (unused). + """ maybe_print("site: %s" % self.__class__.name) print("playlist: %s" % self.title) print("videos:") + def _save_auxiliary_files(self, output_dir): + """Save danmaku and lyrics files if available. + + Args: + output_dir: The output directory path. + """ + if self.danmaku is not None and not dry_run: + filename = '{}.cmt.xml'.format(get_filename(self.title)) + print('Downloading {} ...\n'.format(filename)) + filepath = os.path.join(output_dir, filename) + with open(filepath, 'w', encoding='utf8') as fp: + fp.write(self.danmaku) + + if self.lyrics is not None and not dry_run: + filename = '{}.lrc'.format(get_filename(self.title)) + print('Downloading {} ...\n'.format(filename)) + filepath = os.path.join(output_dir, filename) + with open(filepath, 'w', encoding='utf8') as fp: + fp.write(self.lyrics) + def download(self, **kwargs): - if 'json_output' in kwargs and kwargs['json_output']: + """Download the video or display information. + + Args: + **kwargs: Download options including: + json_output: Output as JSON. + info_only: Only display info, don't download. + stream_id: Specific stream to download. + index: Use index mode display. + output_dir: Output directory. + merge: Merge video parts. + caption: Download captions. 
+ """ + if kwargs.get('json_output'): json_output.output(self) - elif 'info_only' in kwargs and kwargs['info_only']: - if 'stream_id' in kwargs and kwargs['stream_id']: - # Display the stream - stream_id = kwargs['stream_id'] + elif kwargs.get('info_only'): + stream_id = kwargs.get('stream_id') + if stream_id: if 'index' not in kwargs: self.p(stream_id) else: self.p_i(stream_id) else: - # Display all available streams if 'index' not in kwargs: self.p([]) else: - stream_id = self.streams_sorted[0]['id'] if 'id' in self.streams_sorted[0] else self.streams_sorted[0]['itag'] + stream_id = self._select_best_stream_id() self.p_i(stream_id) else: - if 'stream_id' in kwargs and kwargs['stream_id']: - # Download the stream - stream_id = kwargs['stream_id'] - else: + stream_id = kwargs.get('stream_id') + if not stream_id: # Download stream with the best quality from .processor.ffmpeg import has_ffmpeg_installed - if has_ffmpeg_installed() and player is None and self.dash_streams or not self.streams_sorted: - #stream_id = list(self.dash_streams)[-1] - itags = sorted(self.dash_streams, - key=lambda i: -self.dash_streams[i]['size']) + use_dash = ( + has_ffmpeg_installed() and + player is None and + self.dash_streams + ) or not self.streams_sorted + + if use_dash: + itags = sorted( + self.dash_streams, + key=lambda i: -self.dash_streams[i]['size'] + ) stream_id = itags[0] else: - stream_id = self.streams_sorted[0]['id'] if 'id' in self.streams_sorted[0] else self.streams_sorted[0]['itag'] + stream_id = self._select_best_stream_id() if 'index' not in kwargs: self.p(stream_id) else: self.p_i(stream_id) - if stream_id in self.streams: - urls = self.streams[stream_id]['src'] - ext = self.streams[stream_id]['container'] - total_size = self.streams[stream_id]['size'] - else: - urls = self.dash_streams[stream_id]['src'] - ext = self.dash_streams[stream_id]['container'] - total_size = self.dash_streams[stream_id]['size'] + stream = self._get_stream(stream_id) + urls = stream['src'] + ext = stream['container'] + total_size = stream['size'] - if ext == 'm3u8' or ext == 'm4a': + if ext in ('m3u8', 'm4a'): ext = 'mp4' if not urls: log.wtf('[Failed] Cannot extract video source.') - # For legacy main() + headers = {} if self.ua is not None: headers['User-Agent'] = self.ua if self.referer is not None: headers['Referer'] = self.referer - download_urls(urls, self.title, ext, total_size, headers=headers, - output_dir=kwargs['output_dir'], - merge=kwargs['merge'], - av=stream_id in self.dash_streams, - vid=self.vid) - if 'caption' not in kwargs or not kwargs['caption']: + download_urls( + urls, self.title, ext, total_size, headers=headers, + output_dir=kwargs['output_dir'], + merge=kwargs['merge'], + av=stream_id in self.dash_streams, + vid=self.vid + ) + + if not kwargs.get('caption'): print('Skipping captions or danmaku.') return @@ -249,25 +414,12 @@ def download(self, **kwargs): filename = '%s.%s.srt' % (get_filename(self.title), lang) print('Saving %s ... 
' % filename, end="", flush=True) srt = self.caption_tracks[lang] - with open(os.path.join(kwargs['output_dir'], filename), - 'w', encoding='utf-8') as x: + filepath = os.path.join(kwargs['output_dir'], filename) + with open(filepath, 'w', encoding='utf-8') as x: x.write(srt) print('Done.') - if self.danmaku is not None and not dry_run: - filename = '{}.cmt.xml'.format(get_filename(self.title)) - print('Downloading {} ...\n'.format(filename)) - with open(os.path.join(kwargs['output_dir'], filename), 'w', encoding='utf8') as fp: - fp.write(self.danmaku) - - if self.lyrics is not None and not dry_run: - filename = '{}.lrc'.format(get_filename(self.title)) - print('Downloading {} ...\n'.format(filename)) - with open(os.path.join(kwargs['output_dir'], filename), 'w', encoding='utf8') as fp: - fp.write(self.lyrics) - - # For main_dev() - #download_urls(urls, self.title, self.streams[stream_id]['container'], self.streams[stream_id]['size']) - keep_obj = kwargs.get('keep_obj', False) - if not keep_obj: + self._save_auxiliary_files(kwargs['output_dir']) + + if not kwargs.get('keep_obj', False): self.__init__() From 73834b102b25430dbfa03c4fb6f476fb4b3c6000 Mon Sep 17 00:00:00 2001 From: michaelos443 Date: Tue, 9 Dec 2025 16:33:55 +0000 Subject: [PATCH 7/7] Add type hints and NumPy-style docstrings to join_flv.py - Add module-level docstring describing the FLV joining functionality - Add type hints to all functions, methods, and class attributes - Add NumPy-style docstrings to all public functions and the ECMAObject class - Add FLVTag type alias for FLV tag tuple structure - Import typing annotations (Any, BinaryIO, Callable) - Fix PEP8 issues (blank lines, loop variable naming) --- src/you_get/processor/join_flv.py | 699 +++++++++++++++++++++++++----- 1 file changed, 596 insertions(+), 103 deletions(-) diff --git a/src/you_get/processor/join_flv.py b/src/you_get/processor/join_flv.py index 4ac7aadb2b..2579c07981 100755 --- a/src/you_get/processor/join_flv.py +++ b/src/you_get/processor/join_flv.py @@ -1,44 +1,123 @@ #!/usr/bin/env python +""" +FLV file joining module. + +This module provides functionality for reading, writing, and concatenating +FLV (Flash Video) files. It includes support for parsing AMF0 (Action Message +Format) data structures commonly used in FLV metadata. + +Notes +----- +FLV files contain audio and video data with metadata encoded in AMF0 format. +This module handles the low-level parsing and writing of these structures +to enable joining multiple FLV segments into a single file. 
+""" + +from __future__ import annotations import struct from io import BytesIO +from typing import Any, BinaryIO, Callable -TAG_TYPE_METADATA = 18 +TAG_TYPE_METADATA: int = 18 ################################################## # AMF0 ################################################## -AMF_TYPE_NUMBER = 0x00 -AMF_TYPE_BOOLEAN = 0x01 -AMF_TYPE_STRING = 0x02 -AMF_TYPE_OBJECT = 0x03 -AMF_TYPE_MOVIECLIP = 0x04 -AMF_TYPE_NULL = 0x05 -AMF_TYPE_UNDEFINED = 0x06 -AMF_TYPE_REFERENCE = 0x07 -AMF_TYPE_MIXED_ARRAY = 0x08 -AMF_TYPE_END_OF_OBJECT = 0x09 -AMF_TYPE_ARRAY = 0x0A -AMF_TYPE_DATE = 0x0B -AMF_TYPE_LONG_STRING = 0x0C -AMF_TYPE_UNSUPPORTED = 0x0D -AMF_TYPE_RECORDSET = 0x0E -AMF_TYPE_XML = 0x0F -AMF_TYPE_CLASS_OBJECT = 0x10 -AMF_TYPE_AMF3_OBJECT = 0x11 +AMF_TYPE_NUMBER: int = 0x00 +AMF_TYPE_BOOLEAN: int = 0x01 +AMF_TYPE_STRING: int = 0x02 +AMF_TYPE_OBJECT: int = 0x03 +AMF_TYPE_MOVIECLIP: int = 0x04 +AMF_TYPE_NULL: int = 0x05 +AMF_TYPE_UNDEFINED: int = 0x06 +AMF_TYPE_REFERENCE: int = 0x07 +AMF_TYPE_MIXED_ARRAY: int = 0x08 +AMF_TYPE_END_OF_OBJECT: int = 0x09 +AMF_TYPE_ARRAY: int = 0x0A +AMF_TYPE_DATE: int = 0x0B +AMF_TYPE_LONG_STRING: int = 0x0C +AMF_TYPE_UNSUPPORTED: int = 0x0D +AMF_TYPE_RECORDSET: int = 0x0E +AMF_TYPE_XML: int = 0x0F +AMF_TYPE_CLASS_OBJECT: int = 0x10 +AMF_TYPE_AMF3_OBJECT: int = 0x11 + class ECMAObject: - def __init__(self, max_number): - self.max_number = max_number - self.data = [] - self.map = {} - def put(self, k, v): + """ + ECMA Array object for AMF0 data structures. + + An ordered dictionary-like structure that maintains both insertion order + and key-value mapping, used in AMF0 mixed arrays. + + Parameters + ---------- + max_number : int + The maximum number of elements expected in the array. + + Attributes + ---------- + max_number : int + The maximum number of elements in the array. + data : list[tuple[str, Any]] + Ordered list of key-value pairs. + map : dict[str, Any] + Dictionary mapping keys to values for fast lookup. + """ + + def __init__(self, max_number: int) -> None: + self.max_number: int = max_number + self.data: list[tuple[str, Any]] = [] + self.map: dict[str, Any] = {} + + def put(self, k: str, v: Any) -> None: + """ + Add a key-value pair to the object. + + Parameters + ---------- + k : str + The key to add. + v : Any + The value associated with the key. + """ self.data.append((k, v)) self.map[k] = v - def get(self, k): + + def get(self, k: str) -> Any: + """ + Get a value by key. + + Parameters + ---------- + k : str + The key to look up. + + Returns + ------- + Any + The value associated with the key. + """ return self.map[k] - def set(self, k, v): + + def set(self, k: str, v: Any) -> None: + """ + Set an existing key to a new value. + + Parameters + ---------- + k : str + The key to update. + v : Any + The new value. + + Raises + ------ + KeyError + If the key does not exist in the object. + """ for i in range(len(self.data)): if self.data[i][0] == k: self.data[i] = (k, v) @@ -46,22 +125,80 @@ def set(self, k, v): else: raise KeyError(k) self.map[k] = v - def keys(self): - return self.map.keys() - def __str__(self): + + def keys(self) -> list[str]: + """ + Get all keys in the object. + + Returns + ------- + list[str] + A list of all keys. 
+ """ + return list(self.map.keys()) + + def __str__(self) -> str: + """Return a string representation of the ECMAObject.""" return 'ECMAObject<' + repr(self.map) + '>' - def __eq__(self, other): + + def __eq__(self, other: object) -> bool: + """Check equality with another ECMAObject.""" + if not isinstance(other, ECMAObject): + return NotImplemented return self.max_number == other.max_number and self.data == other.data -def read_amf_number(stream): + +def read_amf_number(stream: BinaryIO) -> float: + """ + Read an AMF0 number (64-bit IEEE 754 double) from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + float + The decoded number value. + """ return struct.unpack('>d', stream.read(8))[0] -def read_amf_boolean(stream): + +def read_amf_boolean(stream: BinaryIO) -> bool: + """ + Read an AMF0 boolean value from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + bool + The decoded boolean value. + """ b = read_byte(stream) assert b in (0, 1) return bool(b) -def read_amf_string(stream): + +def read_amf_string(stream: BinaryIO) -> str | None: + """ + Read an AMF0 string from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + str or None + The decoded UTF-8 string, or None if the stream is empty + (dirty fix for invalid Qiyi FLV files). + """ xx = stream.read(2) if xx == b'': # dirty fix for the invalid Qiyi flv @@ -71,8 +208,22 @@ def read_amf_string(stream): assert len(s) == n return s.decode('utf-8') -def read_amf_object(stream): - obj = {} + +def read_amf_object(stream: BinaryIO) -> dict[str, Any]: + """ + Read an AMF0 object from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + dict[str, Any] + A dictionary containing the object's key-value pairs. + """ + obj: dict[str, Any] = {} while True: k = read_amf_string(stream) if not k: @@ -82,7 +233,21 @@ def read_amf_object(stream): obj[k] = v return obj -def read_amf_mixed_array(stream): + +def read_amf_mixed_array(stream: BinaryIO) -> ECMAObject: + """ + Read an AMF0 mixed array (ECMA array) from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + ECMAObject + An ECMAObject containing the mixed array data. + """ max_number = read_uint(stream) mixed_results = ECMAObject(max_number) while True: @@ -98,14 +263,29 @@ def read_amf_mixed_array(stream): assert len(mixed_results.data) == max_number return mixed_results -def read_amf_array(stream): + +def read_amf_array(stream: BinaryIO) -> list[Any]: + """ + Read an AMF0 strict array from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + list[Any] + A list containing the array elements. + """ n = read_uint(stream) - v = [] - for i in range(n): + v: list[Any] = [] + for _ in range(n): v.append(read_amf(stream)) return v -amf_readers = { + +amf_readers: dict[int, Callable[[BinaryIO], Any]] = { AMF_TYPE_NUMBER: read_amf_number, AMF_TYPE_BOOLEAN: read_amf_boolean, AMF_TYPE_STRING: read_amf_string, @@ -114,31 +294,102 @@ def read_amf_array(stream): AMF_TYPE_ARRAY: read_amf_array, } -def read_amf(stream): + +def read_amf(stream: BinaryIO) -> Any: + """ + Read an AMF0 value from a stream. + + Reads the type marker byte and dispatches to the appropriate reader. 
+ + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + Any + The decoded AMF0 value. + """ return amf_readers[read_byte(stream)](stream) -def write_amf_number(stream, v): + +def write_amf_number(stream: BinaryIO, v: float) -> None: + """ + Write an AMF0 number to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + v : float + The number value to write. + """ stream.write(struct.pack('>d', v)) -def write_amf_boolean(stream, v): + +def write_amf_boolean(stream: BinaryIO, v: bool) -> None: + """ + Write an AMF0 boolean to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + v : bool + The boolean value to write. + """ if v: stream.write(b'\x01') else: stream.write(b'\x00') -def write_amf_string(stream, s): - s = s.encode('utf-8') - stream.write(struct.pack('>H', len(s))) - stream.write(s) -def write_amf_object(stream, o): +def write_amf_string(stream: BinaryIO, s: str) -> None: + """ + Write an AMF0 string to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + s : str + The string to write. + """ + encoded = s.encode('utf-8') + stream.write(struct.pack('>H', len(encoded))) + stream.write(encoded) + + +def write_amf_object(stream: BinaryIO, o: dict[str, Any]) -> None: + """ + Write an AMF0 object to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + o : dict[str, Any] + The dictionary object to write. + """ for k in o: write_amf_string(stream, k) write_amf(stream, o[k]) write_amf_string(stream, '') write_byte(stream, AMF_TYPE_END_OF_OBJECT) -def write_amf_mixed_array(stream, o): + +def write_amf_mixed_array(stream: BinaryIO, o: ECMAObject) -> None: + """ + Write an AMF0 mixed array to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + o : ECMAObject + The ECMAObject to write. + """ write_uint(stream, o.max_number) for k, v in o.data: write_amf_string(stream, k) @@ -146,12 +397,24 @@ def write_amf_mixed_array(stream, o): write_amf_string(stream, '') write_byte(stream, AMF_TYPE_END_OF_OBJECT) -def write_amf_array(stream, o): + +def write_amf_array(stream: BinaryIO, o: list[Any]) -> None: + """ + Write an AMF0 strict array to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + o : list[Any] + The list to write. + """ write_uint(stream, len(o)) for v in o: write_amf(stream, v) -amf_writers_tags = { + +amf_writers_tags: dict[type, int] = { float: AMF_TYPE_NUMBER, bool: AMF_TYPE_BOOLEAN, str: AMF_TYPE_STRING, @@ -160,7 +423,7 @@ def write_amf_array(stream, o): list: AMF_TYPE_ARRAY, } -amf_writers = { +amf_writers: dict[int, Callable[[BinaryIO, Any], None]] = { AMF_TYPE_NUMBER: write_amf_number, AMF_TYPE_BOOLEAN: write_amf_boolean, AMF_TYPE_STRING: write_amf_string, @@ -169,7 +432,20 @@ def write_amf_array(stream, o): AMF_TYPE_ARRAY: write_amf_array, } -def write_amf(stream, v): + +def write_amf(stream: BinaryIO, v: Any) -> None: + """ + Write an AMF0 value to a stream. + + Determines the appropriate type tag and writer based on the value type. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + v : Any + The value to write. 
+ """ if isinstance(v, ECMAObject): tag = amf_writers_tags[ECMAObject] else: @@ -181,30 +457,132 @@ def write_amf(stream, v): # FLV ################################################## -def read_int(stream): + +# Type alias for FLV tags +FLVTag = tuple[int, int, int, bytes, int] + + +def read_int(stream: BinaryIO) -> int: + """ + Read a signed 32-bit big-endian integer from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + int + The decoded signed integer. + """ return struct.unpack('>i', stream.read(4))[0] -def read_uint(stream): + +def read_uint(stream: BinaryIO) -> int: + """ + Read an unsigned 32-bit big-endian integer from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + int + The decoded unsigned integer. + """ return struct.unpack('>I', stream.read(4))[0] -def write_uint(stream, n): + +def write_uint(stream: BinaryIO, n: int) -> None: + """ + Write an unsigned 32-bit big-endian integer to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + n : int + The unsigned integer to write. + """ stream.write(struct.pack('>I', n)) -def read_byte(stream): + +def read_byte(stream: BinaryIO) -> int: + """ + Read a single byte from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + int + The byte value (0-255). + """ return ord(stream.read(1)) -def write_byte(stream, b): + +def write_byte(stream: BinaryIO, b: int) -> None: + """ + Write a single byte to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + b : int + The byte value to write (0-255). + """ stream.write(bytes([b])) -def read_unsigned_medium_int(stream): + +def read_unsigned_medium_int(stream: BinaryIO) -> int: + """ + Read an unsigned 24-bit big-endian integer from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + int + The decoded 24-bit unsigned integer. + """ x1, x2, x3 = struct.unpack('BBB', stream.read(3)) return (x1 << 16) | (x2 << 8) | x3 -def read_tag(stream): + +def read_tag(stream: BinaryIO) -> FLVTag | None: + """ + Read an FLV tag from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + FLVTag or None + A tuple of (data_type, timestamp, body_size, body, previous_tag_size), + or None if end of stream is reached. + + Raises + ------ + AssertionError + If the tag body size exceeds 128MB or if the stream ID is non-zero. + """ # header size: 15 bytes header = stream.read(15) if len(header) == 4: - return + return None x = struct.unpack('>IBBBBBBBBBBB', header) previous_tag_size = x[0] data_type = x[1] @@ -215,31 +593,47 @@ def read_tag(stream): assert x[9:] == (0, 0, 0) body = stream.read(body_size) return (data_type, timestamp, body_size, body, previous_tag_size) - #previous_tag_size = read_uint(stream) - #data_type = read_byte(stream) - #body_size = read_unsigned_medium_int(stream) - #assert body_size < 1024*1024*128, 'tag body size too big (> 128MB)' - #timestamp = read_unsigned_medium_int(stream) - #timestamp += read_byte(stream) << 24 - #assert read_unsigned_medium_int(stream) == 0 - #body = stream.read(body_size) - #return (data_type, timestamp, body_size, body, previous_tag_size) - -def write_tag(stream, tag): + + +def write_tag(stream: BinaryIO, tag: FLVTag) -> None: + """ + Write an FLV tag to a stream. 
+ + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + tag : FLVTag + A tuple of (data_type, timestamp, body_size, body, previous_tag_size). + """ data_type, timestamp, body_size, body, previous_tag_size = tag write_uint(stream, previous_tag_size) write_byte(stream, data_type) - write_byte(stream, body_size>>16 & 0xff) - write_byte(stream, body_size>>8 & 0xff) - write_byte(stream, body_size & 0xff) - write_byte(stream, timestamp>>16 & 0xff) - write_byte(stream, timestamp>>8 & 0xff) - write_byte(stream, timestamp & 0xff) - write_byte(stream, timestamp>>24 & 0xff) + write_byte(stream, body_size >> 16 & 0xff) + write_byte(stream, body_size >> 8 & 0xff) + write_byte(stream, body_size & 0xff) + write_byte(stream, timestamp >> 16 & 0xff) + write_byte(stream, timestamp >> 8 & 0xff) + write_byte(stream, timestamp & 0xff) + write_byte(stream, timestamp >> 24 & 0xff) stream.write(b'\0\0\0') stream.write(body) -def read_flv_header(stream): + +def read_flv_header(stream: BinaryIO) -> None: + """ + Read and validate an FLV file header. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Raises + ------ + AssertionError + If the header is invalid or has unexpected values. + """ assert stream.read(3) == b'FLV' header_version = read_byte(stream) assert header_version == 1 @@ -248,30 +642,80 @@ def read_flv_header(stream): data_offset = read_uint(stream) assert data_offset == 9 -def write_flv_header(stream): + +def write_flv_header(stream: BinaryIO) -> None: + """ + Write an FLV file header to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + """ stream.write(b'FLV') write_byte(stream, 1) write_byte(stream, 5) write_uint(stream, 9) -def read_meta_data(stream): + +def read_meta_data(stream: BinaryIO) -> tuple[Any, Any]: + """ + Read FLV metadata from a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to read from. + + Returns + ------- + tuple[Any, Any] + A tuple of (meta_type, meta_data). + """ meta_type = read_amf(stream) meta = read_amf(stream) return meta_type, meta -def read_meta_tag(tag): + +def read_meta_tag(tag: FLVTag) -> tuple[Any, Any]: + """ + Parse metadata from an FLV tag. + + Parameters + ---------- + tag : FLVTag + The FLV tag containing metadata. + + Returns + ------- + tuple[Any, Any] + A tuple of (meta_type, meta_data). + + Raises + ------ + AssertionError + If the tag is not a metadata tag or has unexpected values. + """ data_type, timestamp, body_size, body, previous_tag_size = tag assert data_type == TAG_TYPE_METADATA assert timestamp == 0 assert previous_tag_size == 0 return read_meta_data(BytesIO(body)) -#def write_meta_data(stream, meta_type, meta_data): -# assert isinstance(meta_type, basesting) -# write_amf(meta_type) -# write_amf(meta_data) -def write_meta_tag(stream, meta_type, meta_data): +def write_meta_tag(stream: BinaryIO, meta_type: Any, meta_data: Any) -> None: + """ + Write a metadata tag to a stream. + + Parameters + ---------- + stream : BinaryIO + The binary stream to write to. + meta_type : Any + The metadata type identifier. + meta_data : Any + The metadata content. + """ buffer = BytesIO() write_amf(buffer, meta_type) write_amf(buffer, meta_data) @@ -283,23 +727,62 @@ def write_meta_tag(stream, meta_type, meta_data): # main ################################################## -def guess_output(inputs): + +def guess_output(inputs: list[str]) -> str: + """ + Guess an output filename based on common prefix of input filenames. 
+ + Parameters + ---------- + inputs : list[str] + List of input file paths. + + Returns + ------- + str + A suggested output filename with '.flv' extension. + """ import os.path - inputs = map(os.path.basename, inputs) - n = min(map(len, inputs)) + basenames = list(map(os.path.basename, inputs)) + n = min(map(len, basenames)) for i in reversed(range(1, n)): - if len(set(s[:i] for s in inputs)) == 1: - return inputs[0][:i] + '.flv' + if len(set(s[:i] for s in basenames)) == 1: + return basenames[0][:i] + '.flv' return 'output.flv' -def concat_flv(flvs, output = None): + +def concat_flv(flvs: list[str], output: str | None = None) -> str: + """ + Concatenate multiple FLV files into a single file. + + Reads multiple FLV files, merges their metadata (updating total duration), + and writes all tags to a single output file with adjusted timestamps. + + Parameters + ---------- + flvs : list[str] + List of input FLV file paths. + output : str or None, optional + Output file path. If None, a filename is guessed from inputs. + If a directory, the guessed filename is placed in that directory. + + Returns + ------- + str + The path to the output file. + + Raises + ------ + AssertionError + If no FLV files are provided or if metadata types don't match. + """ assert flvs, 'no flv file found' import os.path if not output: output = guess_output(flvs) elif os.path.isdir(output): output = os.path.join(output, guess_output(flvs)) - + print('Merging video parts...') ins = [open(flv, 'rb') for flv in flvs] for stream in ins: @@ -309,13 +792,13 @@ def concat_flv(flvs, output = None): meta_types, metas = zip(*metas) assert len(set(meta_types)) == 1 meta_type = meta_types[0] - + # must merge fields: duration # TODO: check other meta info, update other meta info total_duration = sum(meta.get('duration') for meta in metas) meta_data = metas[0] meta_data.set('duration', total_duration) - + out = open(output, 'wb') write_flv_header(out) write_meta_tag(out, meta_type, meta_data) @@ -332,17 +815,26 @@ def concat_flv(flvs, output = None): break timestamp_start = timestamp write_uint(out, previous_tag_size) - + return output -def usage(): + +def usage() -> None: + """Print usage information for the command-line interface.""" print('Usage: [python3] join_flv.py --output TARGET.flv flv...') -def main(): - import sys, getopt + +def main() -> None: + """ + Main entry point for the FLV joining command-line tool. + + Parses command-line arguments and invokes the FLV concatenation. + """ + import getopt + import sys try: opts, args = getopt.getopt(sys.argv[1:], "ho:", ["help", "output="]) - except getopt.GetoptError as err: + except getopt.GetoptError: usage() sys.exit(1) output = None @@ -358,8 +850,9 @@ def main(): if not args: usage() sys.exit(1) - + concat_flv(args, output) + if __name__ == '__main__': main()
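
A usage note on the joiner added in PATCH 7 (illustrative only, not part of the patch series): besides the CLI entry point shown in usage() (python3 join_flv.py --output TARGET.flv flv...), the module can be driven programmatically through concat_flv(), which sums the duration metadata of the inputs and re-bases tag timestamps so the merged segments play back continuously. A minimal sketch, with hypothetical segment file names:

    # Illustrative sketch only; the segment file names are hypothetical.
    from you_get.processor.join_flv import concat_flv

    # concat_flv() returns the output path; if output is a directory,
    # a file name is guessed from the common prefix of the inputs.
    merged = concat_flv(['clip[00].flv', 'clip[01].flv'], output='clip.flv')
    print('Merged into:', merged)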