diff --git a/musicalgestures/_360video.py b/musicalgestures/_360video.py
new file mode 100644
index 0000000..cee1394
--- /dev/null
+++ b/musicalgestures/_360video.py
@@ -0,0 +1,192 @@
+import os
+from enum import Enum
+from functools import partial
+from musicalgestures._video import MgVideo
+from musicalgestures._utils import ffmpeg_cmd, get_length, generate_outfilename
+
+
+class Projection(Enum):
+    """
+    Projection types supported by FFmpeg's v360 filter.
+    See https://ffmpeg.org/ffmpeg-filters.html#v360.
+    """
+
+    e = 0
+    equirect = 1
+    c3x2 = 2
+    c6x1 = 3
+    c1x6 = 4
+    eac = 5  # Equi-Angular Cubemap.
+    flat = 6
+    gnomonic = 7
+    rectilinear = 8  # Regular video.
+    dfisheye = 9  # Dual fisheye.
+    barrel = 10
+    fb = 11
+    barrelsplit = 12  # Facebook’s 360 formats.
+    sg = 13  # Stereographic format.
+    mercator = 14  # Mercator format.
+    ball = 15  # Ball format, gives significant distortion toward the back.
+    hammer = 16  # Hammer-Aitoff map projection format.
+    sinusoidal = 17  # Sinusoidal map projection format.
+    fisheye = 18  # Fisheye projection.
+    pannini = 19  # Pannini projection.
+    cylindrical = 20  # Cylindrical projection.
+    perspective = 21  # Perspective projection. (output only)
+    tetrahedron = 22  # Tetrahedron projection.
+    tsp = 23  # Truncated square pyramid projection.
+    he = 24  # Alias for hequirect.
+    hequirect = 25  # Half equirectangular projection.
+    equisolid = 26  # Equisolid format.
+    og = 27  # Orthographic format.
+    octahedron = 28  # Octahedron projection.
+    cylindricalea = 29
+
+    equirectangular = 30  # Extra alias for equirect.
+    erp = 31
+
+    def __str__(self):
+        # Collapse all aliases of the equirectangular projection.
+        if self.name in ["equirectangular", "erp", "e"]:
+            return "equirect"
+        else:
+            return self.name
+
+    def __eq__(self, other):
+        if not isinstance(other, Projection):
+            return NotImplemented
+        # Collapse all aliases of the equirectangular projection.
+        aliases = ["equirectangular", "erp", "e", "equirect"]
+        if self.name in aliases and other.name in aliases:
+            return True
+        return self.name == other.name
+
+    # Defining __eq__ disables the inherited __hash__, which would make the
+    # members unhashable; restore a hash consistent with the equality above.
+    def __hash__(self):
+        return hash(str(self))
+
+
+# TODO: add settings for cameras and files
+CAMERA = {
+    "gopro max": {
+        "ext": "360",
+        "projection": Projection.eac,
+    },
+    "insta360 x3": {
+        "ext": "insv",
+        "projection": Projection.fisheye,
+    },
+    "garmin virb 360": {
+        "ext": "MP4",
+        "projection": Projection.erp,
+    },
+    "ricoh theta xs00": {
+        "ext": "MP4",
+        "projection": Projection.erp,
+    },
+}
+
+
+class Mg360Video(MgVideo):
+    """
+    Class for working with 360-degree videos.
+    """
+
+    def __init__(
+        self,
+        filename: str,
+        projection: str | Projection,
+        camera: str = None,
+        **kwargs,
+    ):
+        """
+        Args:
+            filename (str): Path to the video file.
+            projection (str | Projection): Projection type.
+            camera (str, optional): Camera model; one of the keys in `CAMERA`. Defaults to None.
+        """
+        super().__init__(filename, **kwargs)
+        self.filename = os.path.abspath(self.filename)
+        self.projection = self._parse_projection(projection)
+
+        if camera is None:
+            self.camera = None
+        elif camera.lower() in CAMERA:
+            self.camera = CAMERA[camera.lower()]
+        else:
+            raise ValueError(
+                f"Camera type '{camera}' not recognized. Available options: {', '.join(CAMERA)}."
+            )
+
+        # Override self.show() with the extra IPython kwarg embed=True.
+        self.show = partial(self.show, embed=True)
+
+    def convert_projection(
+        self,
+        target_projection: Projection | str,
+        options: dict[str, str] = None,
+        print_cmd: bool = False,
+    ):
+        """
+        Convert the video to a different projection.
+
+        Args:
+            target_projection (Projection | str): Target projection.
+            options (dict[str, str], optional): Additional v360 filter options as key/value pairs. Defaults to None.
+            print_cmd (bool, optional): Print the ffmpeg command. Defaults to False.
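+
+        Example:
+            A minimal sketch of a typical call (the file name `sample.360` is
+            hypothetical; `yaw` is one of the v360 filter options):
+
+                from musicalgestures import Mg360Video
+                v = Mg360Video('sample.360', projection='eac', camera='gopro max')
+                v.convert_projection('equirect', options={'yaw': '90'})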
+ """ + target_projection = self._parse_projection(target_projection) + + if target_projection == self.projection: + print(f"{self} is already in target projection {target_projection}.") + return + else: + output_name = generate_outfilename( + f"{self.filename.split('.')[0]}_{target_projection}.mp4" + ) + + # parse options + if options: + options = "".join([f"{k}={options[k]}:" for k in options])[:-1] + cmds = [ + "ffmpeg", + "-i", + self.filename, + "-vf", + f"v360={self.projection}:{target_projection}:{options}", + output_name, + ] + else: + cmds = [ + "ffmpeg", + "-i", + self.filename, + "-vf", + f"v360={self.projection}:{target_projection}", + output_name, + ] + + # execute conversion + ffmpeg_cmd( + cmds, + get_length(self.filename), + pb_prefix=f"Converting projection to {target_projection}:", + print_cmd=print_cmd, + ) + self.filename = output_name + self.projection = target_projection + + def _parse_projection(self, projection: str | Projection): + """ + Parse projection type. + Args: + projection (str): Projection type. + """ + if isinstance(projection, str): + try: + return Projection[projection.lower()] + except KeyError: + raise ValueError( + f"Projection type '{projection}' not recognized. See `Projection` for available options." + ) + elif isinstance(projection, Projection): + return projection + else: + raise TypeError(f"Unsupported projection type: '{type(projection)}'.") diff --git a/musicalgestures/__init__.py b/musicalgestures/__init__.py index aad03ef..5bfb03c 100644 --- a/musicalgestures/__init__.py +++ b/musicalgestures/__init__.py @@ -1,235 +1,30 @@ import os -import numpy as np from musicalgestures._input_test import mg_input_test from musicalgestures._videoreader import mg_videoreader from musicalgestures._flow import Flow -from musicalgestures._audio import MgAudio -from musicalgestures._utils import MgFigure, MgImage, convert, convert_to_mp4, get_framecount, ffmpeg_cmd +from musicalgestures._audio import MgAudio +from musicalgestures._video import MgVideo +from musicalgestures._360video import Mg360Video +from musicalgestures._utils import ( + MgFigure, + MgImage, + convert, + convert_to_mp4, + get_framecount, + ffmpeg_cmd, + get_length, + generate_outfilename, +) from musicalgestures._mglist import MgList -class MgVideo(MgAudio): - """ - This is the class for working with video files in the Musical Gestures Toolbox. It inherites from the class MgAudio for working with audio files as well. - There is a set of preprocessing tools you can use when you load a video, such as: - - trimming: to extract a section of the video, - - skipping: to shrink the video by skipping N frames after keeping one, - - rotating: to rotate the video by N degrees, - - applying brightness and contrast - - cropping: to crop the video either automatically (by assessing the area of motion) or manually with a pop-up user interface, - - converting to grayscale - - These preprocesses will apply upon creating the MgVideo. Further processes are available as class methods. - """ - - def __init__( - self, - filename, - array=None, - fps=None, - path=None, - # Video parameters - filtertype='Regular', - thresh=0.05, - starttime=0, - endtime=0, - blur='None', - skip=0, - frames=0, - rotate=0, - color=True, - contrast=0, - brightness=0, - crop='None', - keep_all=False, - returned_by_process=False, - # Audio parameters - sr=22050, - n_fft=2048, - hop_length=512, - ): - """ - Initializes Musical Gestures data structure from a video file, and applies preprocesses if desired. 
- - Args: - filename (str): Path to the video file. - array (np.ndarray, optional): Generates an MgVideo object from a video array. Defauts to None. - fps (float, optional): The frequency at which consecutive images from the video array are captured or displayed. Defauts to None. - path (str, optional): Path to save the output video file generated from a video array. Defaults to None. - filtertype (str, optional): The `filtertype` parameter for the `motion()` method. `Regular` turns all values below `thresh` to 0. `Binary` turns all values below `thresh` to 0, above `thresh` to 1. `Blob` removes individual pixels with erosion method. Defaults to 'Regular'. - thresh (float, optional): The `thresh` parameter for the `motion()` method. Eliminates pixel values less than given threshold. A number in the range of 0 to 1. Defaults to 0.05. - starttime (int or float, optional): Trims the video from this start time (s). Defaults to 0. - endtime (int or float, optional): Trims the video until this end time (s). Defaults to 0 (which means the full length). - blur (str, optional): The `blur` parameter for the `motion()` method. 'Average' to apply a 10px * 10px blurring filter, 'None' otherwise. Defaults to 'None'. - skip (int, optional): Time-shrinks the video by skipping (discarding) every n frames determined by `skip`. Defaults to 0. - frames (int, optional): Specify a fixed target number of frames to extract from the video. Defaults to 0. - rotate (int, optional): Rotates the video by a `rotate` degrees. Defaults to 0. - color (bool, optional): If False, converts the video to grayscale and sets every method in grayscale mode. Defaults to True. - contrast (int, optional): Applies +/- 100 contrast to video. Defaults to 0. - brightness (int, optional): Applies +/- 100 brightness to video. Defaults to 0. - crop (str, optional): If 'manual', opens a window displaying the first frame of the input video file, where the user can draw a rectangle to which cropping is applied. If 'auto' the cropping function attempts to determine the area of significant motion and applies the cropping to that area. Defaults to 'None'. - keep_all (bool, optional): If True, preserves an output video file after each used preprocessing stage. Defaults to False. - returned_by_process (bool, optional): This parameter is only for internal use, do not use it. Defaults to False. - - sr (int, optional): Sampling rate of the audio file. Defaults to 22050. - n_fft (int, optional): Length of the FFT window. Defaults to 2048. - hop_length (int, optional): Number of samples between successive frames. Defaults to 512. 
- """ - - self.filename = filename - self.array = array - self.fps = fps - self.path = path - # Name of file without extension (only-filename) - self.of = os.path.splitext(self.filename)[0] - self.fex = os.path.splitext(self.filename)[1] - # Video parameters - self.color = color - self.starttime = starttime - self.endtime = endtime - self.skip = skip - self.frames = frames - self.filtertype = filtertype - self.thresh = thresh - self.blur = blur - self.contrast = contrast - self.brightness = brightness - self.crop = crop - self.rotate = rotate - self.keep_all = keep_all - self.has_audio = None - self.returned_by_process = returned_by_process - # Audio parameters - self.sr = sr - self.n_fft = n_fft - self.hop_length = hop_length - - # Check input and if FFmpeg is properly installed - self.test_input() - - if all(arg is not None for arg in [self.array, self.fps]): - self.from_numpy(self.array, self.fps) - - self.get_video() - self.flow = Flow(self, self.filename, self.color, self.has_audio) - - from musicalgestures._motionvideo import mg_motion as motion - from musicalgestures._motionvideo import mg_motiongrams as motiongrams - from musicalgestures._motionvideo import mg_motiondata as motiondata - from musicalgestures._motionvideo import mg_motionplots as motionplots - from musicalgestures._motionvideo import mg_motionvideo as motionvideo - from musicalgestures._motionvideo import mg_motionscore as motionscore - from musicalgestures._motionvideo_mp_run import mg_motion_mp as motion_mp - from musicalgestures._subtract import mg_subtract as subtract - from musicalgestures._ssm import mg_ssm as ssm - from musicalgestures._videograms import videograms_ffmpeg as videograms - from musicalgestures._directograms import mg_directograms as directograms - from musicalgestures._warp import mg_warp_audiovisual_beats as warp_audiovisual_beats - from musicalgestures._blurfaces import mg_blurfaces as blur_faces - from musicalgestures._impacts import mg_impacts as impacts - from musicalgestures._grid import mg_grid as grid - from musicalgestures._motionvideo import save_analysis - # from musicalgestures._cropvideo import mg_cropvideo, find_motion_box, find_total_motion_box - from musicalgestures._show import mg_show as show - from musicalgestures._info import mg_info as info - from musicalgestures._history import history_ffmpeg as history - from musicalgestures._history import history_cv2 - from musicalgestures._blend import mg_blend_image as blend - from musicalgestures._pose import pose - - def test_input(self): - """Gives feedback to user if initialization from input went wrong.""" - mg_input_test(self.filename, self.array, self.fps, self.filtertype, self.thresh, self.starttime, self.endtime, self.blur, self.skip, self.frames) - - def get_video(self): - """Creates a video attribute to the Musical Gestures object with the given correct settings.""" - self.length, self.width, self.height, self.fps, self.endtime, self.of, self.fex, self.has_audio = mg_videoreader( - filename=self.filename, - starttime=self.starttime, - endtime=self.endtime, - skip=self.skip, - frames=self.frames, - rotate=self.rotate, - contrast=self.contrast, - brightness=self.brightness, - crop=self.crop, - color=self.color, - returned_by_process=self.returned_by_process, - keep_all=self.keep_all) - - # Convert eventual low-resolution video or image - video_formats = ['.avi', '.mp4', '.mov', '.mkv', '.mpg', '.mpeg', '.webm', '.ogg', '.ts', '.wmv', '.3gp'] - if self.fex not in video_formats: - # Check if it is an image file - if 
get_framecount(self.filename) == 1: - image_formats = ['.gif', '.jpeg', '.jpg', '.jfif', '.pjpeg', '.png', '.svg', '.webp', '.avif', '.apng'] - if self.fex not in image_formats: - # Create one converted version and register it to the MgVideo - filename = convert(self.of + self.fex, self.of + self.fex + '.png', overwrite=True) - # point of and fex to the png version - self.of, self.fex = os.path.splitext(filename) - else: - # update filename after the processes - self.filename = self.of + self.fex - else: - # Create one converted version and register it to the MgVideo - filename = convert_to_mp4(self.of + self.fex, overwrite=True) - # point of and fex to the mp4 version - self.of, self.fex = os.path.splitext(filename) - else: - # Update filename after the processes - self.filename = self.of + self.fex - - # Check if there is audio in the video file - if self.has_audio: - self.audio = MgAudio(self.filename, self.sr, self.n_fft, self.hop_length) - else: - self.audio = None - - def __repr__(self): - return f"MgVideo('{self.filename}')" - - def numpy(self): - "Pipe all video frames from FFmpeg to numpy array" - - # Define ffmpeg command and load all the video frames in memory - cmd = ['ffmpeg', '-y', '-i', self.filename] - process = ffmpeg_cmd(cmd, total_time=self.length, pipe='load') - # Convert bytes to numpy array - array = np.frombuffer(process.stdout, dtype=np.uint8).reshape(-1, self.height, self.width, 3) - - return array, self.fps - - def from_numpy(self, array, fps, target_name=None): - if target_name is not None: - self.filename = os.path.splitext(target_name)[0] + self.fex - - if self.path is not None: - target_name = os.path.join(self.path, self.filename) - else: - target_name = self.filename - - process = None - for frame in array: - if process is None: - cmd =['ffmpeg', '-y', '-s', '{}x{}'.format(frame.shape[1], frame.shape[0]), - '-r', str(fps), '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-vcodec', 'rawvideo', - '-i', '-', '-vcodec', 'libx264', '-pix_fmt', 'yuv420p', target_name] - process = ffmpeg_cmd(cmd, total_time=array.shape[0], pipe='write') - process.stdin.write(frame.astype(np.uint8)) - process.stdin.close() - process.wait() - - return - - class Examples: def __init__(self): - module_path = os.path.realpath( - os.path.dirname(__file__)).replace("\\", "/") + module_path = os.path.realpath(os.path.dirname(__file__)).replace("\\", "/") # module_path = os.path.abspath(os.path.dirname(__file__)) self.dance = module_path + "/examples/dancer.avi" self.pianist = module_path + "/examples/pianist.avi" self.notebook = module_path + "/MusicalGesturesToolbox.ipynb" -examples = Examples() \ No newline at end of file + +examples = Examples() diff --git a/musicalgestures/_show.py b/musicalgestures/_show.py index 11b6e79..a64f532 100644 --- a/musicalgestures/_show.py +++ b/musicalgestures/_show.py @@ -11,7 +11,7 @@ import musicalgestures -def mg_show(self, filename=None, key=None, mode='windowed', window_width=640, window_height=480, window_title=None): +def mg_show(self, filename=None, key=None, mode='windowed', window_width=640, window_height=480, window_title=None, **ipython_kwargs): # def mg_show(self, filename=None, mode='windowed', window_width=640, window_height=480, window_title=None): """ General method to show an image or video file either in a window, or inline in a jupyter notebook. @@ -23,9 +23,10 @@ def mg_show(self, filename=None, key=None, mode='windowed', window_width=640, wi window_width (int, optional): The width of the window. Defaults to 640. 
window_height (int, optional): The height of the window. Defaults to 480. window_title (str, optional): The title of the window. If None, the title of the window will be the file name. Defaults to None. + ipython_kwargs (dict, optional): Additional arguments for IPython.display.Image or IPython.display.Video. Defaults to None. """ - def show(file, width=640, height=480, mode='windowed', title='Untitled', parent=None): + def show(file, width=640, height=480, mode='windowed', title='Untitled', parent=None, **ipython_kwargs): """ Helper function which actually does the "showing". @@ -35,11 +36,14 @@ def show(file, width=640, height=480, mode='windowed', title='Untitled', parent= height (int, optional): The height of the window. Defaults to 480. mode (str, optional): 'windowed' will use ffplay (in a separate window), while 'notebook' will use Image or Video from IPython.display. Defaults to 'windowed'. title (str, optional): The title of the window. Defaults to 'Untitled'. + ipython_kwargs (dict, optional): Additional arguments for IPython.display.Image or IPython.display.Video. Defaults to None. """ # Check's if the environment is a Google Colab document if musicalgestures._utils.in_colab(): mode = 'notebook' + elif musicalgestures._utils.in_ipynb(): + mode = 'notebook' if mode.lower() == 'windowed': # from musicalgestures._utils import wrap_str @@ -47,13 +51,14 @@ def show(file, width=640, height=480, mode='windowed', title='Untitled', parent= video_to_display = os.path.realpath(file) cmd = ' '.join(map(str, ['ffplay', video_to_display, '-window_title', title, '-x', width, '-y', height])) - show_in_new_process(cmd) + show_in_new_process(cmd) elif mode.lower() == 'notebook': - video_formats = ['.avi', '.mp4', '.mov', '.mkv', '.mpg', '.mpeg', '.webm', '.ogg', '.ts', '.wmv', '.3gp'] + video_formats = ['.avi', '.mp4', '.mov', '.mkv', '.mpg', '.mpeg', '.webm', '.ogg', '.ts', '.wmv', '.3gp', '.lrv', '.insv', '.360', '.glv'] image_formats = ['.jpg', '.png', '.jpeg', '.tiff', '.gif', '.bmp'] of, file_extension = os.path.splitext(file) + file_extension = file_extension.lower() if file_extension in video_formats: file_type = 'video' @@ -85,7 +90,7 @@ def show(file, width=640, height=480, mode='windowed', title='Untitled', parent= # and if it is somewhere else, we need to embed it to make it work (neither absolute nor relative paths seem to work without embedding) cwd = os.getcwd().replace('\\', '/') file_dir = os.path.dirname(video_to_display).replace('\\', '/') - + def colab_display(video_to_display, video_width, video_height): video_file = open(video_to_display, "r+b").read() video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}" @@ -97,26 +102,26 @@ def colab_display(video_to_display, video_width, video_height): if musicalgestures._utils.in_colab(): display(colab_display(video_to_display, video_width, video_height)) else: - display(Video(video_to_display,width=video_width, height=video_height)) + display(Video(video_to_display,width=video_width, height=video_height, **ipython_kwargs)) except ValueError: video_to_display = os.path.abspath(video_to_display, os.getcwd()).replace('\\', '/') if musicalgestures._utils.in_colab(): display(colab_display(video_to_display, video_width, video_height)) else: - display(Video(video_to_display, width=video_width, height=video_height)) + display(Video(video_to_display, width=video_width, height=video_height, **ipython_kwargs)) else: try: video_to_display = os.path.relpath(video_to_display, os.getcwd()).replace('\\', '/') if 
musicalgestures._utils.in_colab(): display(colab_display(video_to_display, video_width, video_height)) else: - display(Video(video_to_display, width=video_width, height=video_height)) + display(Video(video_to_display, width=video_width, height=video_height, **ipython_kwargs)) except ValueError: video_to_display = os.path.abspath(video_to_display, os.getcwd()).replace('\\', '/') if musicalgestures._utils.in_colab(): display(colab_display(video_to_display, video_width, video_height)) else: - display(Video(video_to_display, width=video_width,height=video_height)) + display(Video(video_to_display, width=video_width,height=video_height, **ipython_kwargs)) else: print(f'Unrecognized mode: "{mode}". Try "windowed" or "notebook".') @@ -129,13 +134,13 @@ def colab_display(video_to_display, video_width, video_height): if key == None: filename = self.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=window_title, parent=self) + height=window_height, mode=mode, title=window_title, parent=self, **ipython_kwargs) elif key.lower() == 'mgx': if "motiongram_x" in keys: filename = self.motiongram_x.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Horizontal Motiongram | {filename}', parent=self) + height=window_height, mode=mode, title=f'Horizontal Motiongram | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known horizontal motiongram for this file.") @@ -144,7 +149,7 @@ def colab_display(video_to_display, video_width, video_height): if "motiongram_y" in keys: filename = self.motiongram_y.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Vertical Motiongram | {filename}', parent=self) + height=window_height, mode=mode, title=f'Vertical Motiongram | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known vertical motiongram for this file.") @@ -153,7 +158,7 @@ def colab_display(video_to_display, video_width, video_height): if "videogram_x" in keys: filename = self.videogram_x.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Horizontal Videogram | {filename}', parent=self) + height=window_height, mode=mode, title=f'Horizontal Videogram | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known horizontal videogram for this file.") @@ -162,7 +167,7 @@ def colab_display(video_to_display, video_width, video_height): if "videogram_y" in keys: filename = self.videogram_y.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Vertical Videogram | {filename}', parent=self) + height=window_height, mode=mode, title=f'Vertical Videogram | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known vertical videogram for this file.") @@ -171,10 +176,10 @@ def colab_display(video_to_display, video_width, video_height): if "ssm_fig" in keys: filename = self.ssm_fig.image if len(filename) == 2: - show(file=filename[0], width=window_width, height=window_height, mode=mode, title=f'Horizontal SSM | {filename}', parent=self) - show(file=filename[1], width=window_width, height=window_height, mode=mode, title=f'Vertical SSM | {filename}', parent=self) + show(file=filename[0], width=window_width, height=window_height, mode=mode, title=f'Horizontal SSM | {filename}', parent=self, **ipython_kwargs) + show(file=filename[1], width=window_width, height=window_height, mode=mode, 
title=f'Vertical SSM | {filename}', parent=self, **ipython_kwargs) else: - show(file=filename, width=window_width, height=window_height, mode=mode, title=f'Self-Similarity Matrix | {filename}', parent=self) + show(file=filename, width=window_width, height=window_height, mode=mode, title=f'Self-Similarity Matrix | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known self-smilarity matrix for this file.") @@ -183,7 +188,7 @@ def colab_display(video_to_display, video_width, video_height): if "blend_image" in keys: filename = self.blend_image.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Blended Image | {filename}', parent=self) + height=window_height, mode=mode, title=f'Blended Image | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known blended image for this file.") @@ -193,7 +198,7 @@ def colab_display(video_to_display, video_width, video_height): if "motion_plot" in keys: filename = self.motion_plot.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Centroid and Quantity of Motion | {filename}', parent=self) + height=window_height, mode=mode, title=f'Centroid and Quantity of Motion | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known motion plot for this file.") @@ -202,7 +207,7 @@ def colab_display(video_to_display, video_width, video_height): if "motion_video" in keys: filename = self.motion_video.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Motion Video | {filename}', parent=self) + height=window_height, mode=mode, title=f'Motion Video | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known motion video for this file.") @@ -211,7 +216,7 @@ def colab_display(video_to_display, video_width, video_height): if "history_video" in keys: filename = self.history_video.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'History Video | {filename}', parent=self) + height=window_height, mode=mode, title=f'History Video | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known history video for this file.") @@ -222,7 +227,7 @@ def colab_display(video_to_display, video_width, video_height): if "history_video" in motion_video_keys: filename = self.motion_vide.history_video.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Motion History Video | {filename}', parent=self) + height=window_height, mode=mode, title=f'Motion History Video | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known motion history video for this file.") @@ -234,7 +239,7 @@ def colab_display(video_to_display, video_width, video_height): if "flow_sparse_video" in keys: filename = self.flow_sparse_video.filename show(file=filename, width=window_width, - height=window_height, mode=mode, title=f'Sparse Optical Flow Video | {filename}', parent=self) + height=window_height, mode=mode, title=f'Sparse Optical Flow Video | {filename}', parent=self, **ipython_kwargs) else: raise FileNotFoundError( "There is no known sparse optial flow video for this file.") @@ -243,7 +248,7 @@ def colab_display(video_to_display, video_width, video_height): if "flow_dense_video" in keys: filename = self.flow_dense_video.filename show(file=filename, width=window_width, - height=window_height, mode=mode, 
title=f'Dense Optical Flow Video | {filename}', parent=self)
+                 height=window_height, mode=mode, title=f'Dense Optical Flow Video | {filename}', parent=self, **ipython_kwargs)
         else:
             raise FileNotFoundError(
                 "There is no known dense optial flow video for this file.")
@@ -252,7 +257,7 @@ def colab_display(video_to_display, video_width, video_height):
         if "pose_video" in keys:
             filename = self.pose_video.filename
             show(file=filename, width=window_width,
-                 height=window_height, mode=mode, title=f'Pose Video | {filename}', parent=self)
+                 height=window_height, mode=mode, title=f'Pose Video | {filename}', parent=self, **ipython_kwargs)
         else:
             raise FileNotFoundError(
                 "There is no known pose video for this file.")
@@ -261,7 +266,7 @@ def colab_display(video_to_display, video_width, video_height):
         if "warp_audiovisual_beats" in keys:
             filename = self.warp_audiovisual_beats.filename
             show(file=filename, width=window_width,
-                 height=window_height, mode=mode, title=f'Warp Audiovisual Video | {filename}', parent=self)
+                 height=window_height, mode=mode, title=f'Warp Audiovisual Video | {filename}', parent=self, **ipython_kwargs)
         else:
             raise FileNotFoundError(
                 "There is no known warp audiovisual beats video for this file.")
@@ -270,13 +275,13 @@ def colab_display(video_to_display, video_width, video_height):
         if "blur_faces" in keys:
             filename = self.blur_faces.filename
             show(file=filename, width=window_width,
-                 height=window_height, mode=mode, title=f'Blur Faces Video | {filename}', parent=self)
+                 height=window_height, mode=mode, title=f'Blur Faces Video | {filename}', parent=self, **ipython_kwargs)
 
     elif key.lower() == 'subtract':
         if "subtract" in keys:
             filename = self.subtract.filename
             show(file=filename, width=window_width,
-                 height=window_height, mode=mode, title=f'Background Subtraction Video | {filename}', parent=self)
+                 height=window_height, mode=mode, title=f'Background Subtraction Video | {filename}', parent=self, **ipython_kwargs)
         else:
             raise FileNotFoundError("There is no known subtract video for this file.")
@@ -287,7 +292,7 @@ def colab_display(video_to_display, video_width, video_height):
 
     else:
         show(file=filename, width=window_width,
-             height=window_height, mode=mode, title=window_title, parent=self)
+             height=window_height, mode=mode, title=window_title, parent=self, **ipython_kwargs)
         # show(file=filename, width=window_width, height=window_height, mode=mode, title=window_title)
 
     return self
diff --git a/musicalgestures/_utils.py b/musicalgestures/_utils.py
index c452981..93e25e9 100644
--- a/musicalgestures/_utils.py
+++ b/musicalgestures/_utils.py
@@ -568,6 +568,62 @@ def cast_into_avi(filename, target_name=None, overwrite=False):
     return target_name
 
 
+def extract_frame(
+    filename: str,
+    frame: int = None,
+    time: str | float = None,
+    target_name: str = None,
+    overwrite: bool = False,
+) -> str:
+    """
+    Extracts a single frame from a video using ffmpeg.
+
+    Args:
+        filename (str): Path to the input video file.
+        frame (int, optional): The frame number to extract. Defaults to None.
+        time (str | float, optional): The time in HH:MM:ss.ms to extract the frame from. If float, it is interpreted as seconds from the start of the video. Defaults to None.
+        target_name (str, optional): The name for the output file. If None, the name is derived from the input name and the requested frame number or time (e.g. `<input>_frame_<n>.png`). Defaults to None.
+        overwrite (bool, optional): Whether to allow overwriting existing files or to automatically increment target filename to avoid overwriting. Defaults to False.
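+
+    Returns:
+        str: Path to the extracted frame image.
+
+    Example:
+        A minimal sketch (the file name `dance.avi` is hypothetical):
+
+            from musicalgestures._utils import extract_frame
+            extract_frame('dance.avi', frame=100)  # -> 'dance_frame_100.png'
+            extract_frame('dance.avi', time=3.5)   # -> 'dance_time_00-00-03.500000.png'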
+ """ + + import os + import datetime + if frame is not None and time is not None: + raise ValueError("frame and time cannot be both not None.") + if frame is None and time is None: + raise ValueError("frame and time cannot be both None.") + + name, ext = os.path.splitext(filename) + if not target_name: + if frame is not None: + target_name = f"{name}_frame_{str(frame)}.png" + elif time is not None: + time = time if isinstance(time, str) else datetime.datetime.fromtimestamp(time-3600).strftime('%H:%M:%S.%f') + target_name = f"{name}_time_{time}.png" + if not overwrite: + target_name = generate_outfilename(target_name) + + if frame is not None: + cmds = ['ffmpeg', + '-y' if overwrite else "-n", + '-i', filename, + "-vf", f"select='eq(n\,{frame})'", + "-vsync", "0", + # "-vframes", "1", + target_name] + elif time is not None: + cmds = ['ffmpeg', + '-y' if overwrite else "-n", + '-i', filename, + "-vf", f"select='eq(t\,{time})'", + "-vsync", "0", + # "-vframes", "1", + target_name] + ffmpeg_cmd(cmds, get_length(filename), pb_prefix='Extracting frame:') + + return target_name + + def extract_subclip(filename, t1, t2, target_name=None, overwrite=False): """ Extracts a section of the video using ffmpeg. @@ -1004,7 +1060,7 @@ def ffprobe(filename): else: return out -def get_widthheight(filename): +def get_widthheight(filename: str) -> tuple[int, int]: """ Gets the width and height of a video using FFprobe. @@ -1071,7 +1127,7 @@ def has_audio(filename): return True -def get_length(filename): +def get_length(filename: str) -> float: """ Gets the length (in seconds) of a video using FFprobe. @@ -1388,7 +1444,7 @@ def ffmpeg_cmd(command, total_time, pb_prefix='Progress', print_cmd=False, strea command = ['ffmpeg', '-hide_banner', '-loglevel', 'quiet'] + command[1:] if print_cmd: - if type(command) == list: + if isinstance(command, list): print(' '.join(command)) else: print(command) @@ -1442,7 +1498,7 @@ def ffmpeg_cmd(command, total_time, pb_prefix='Progress', print_cmd=False, strea if returncode in [None, 0]: pb.progress(total_time) else: - raise FFmpegError(all_out) + raise FFmpegError(f"return code: {returncode}"+all_out) except KeyboardInterrupt: try: @@ -1519,3 +1575,100 @@ def in_colab(): except NameError: result = False return result + + +def in_ipynb(): + """ + Check if the environment is a Jupyter notebook. + Taken from https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook. + + Returns: + bool: True if the environment is a Jupyter notebook, otherwise False. + """ + try: + shell = get_ipython().__class__.__name__ + if shell == 'ZMQInteractiveShell': + return True # Jupyter notebook or qtconsole + elif shell == 'TerminalInteractiveShell': + return False # Terminal running IPython + else: + return False # Other type (?) + except NameError: + return False # Probably standard Python interpreter + + +class FilesNotMatchError(Exception): + def __init__(self, message): + self.message = message + + +def merge_videos( + media_paths: list, target_name: str = None, overwrite: bool = False, print_cmd: bool = False +) -> str: + """ + Merges a list of video files into a single video file using ffmpeg. + + Args: + media_paths (list): List of paths to the video files to merge. + target_name (str, optional): The name of the output video. Defaults to None (which assumes that the input filename with the suffix "_merged" should be used). 
+        overwrite (bool, optional): Whether to allow overwriting existing files or to automatically increment target filename to avoid overwriting. Defaults to False.
+
+    Returns:
+        str: Path to the output video.
+    """
+
+    if len(media_paths) == 0:
+        raise ValueError("The list of media paths is empty.")
+    elif len(media_paths) == 1:
+        return media_paths[0]
+
+    import os
+    from musicalgestures._utils import generate_outfilename
+
+    # Check that all media files share the same container, resolution and fps.
+    try:
+        for media in media_paths:
+            pass_if_containers_match(media, media_paths[0])
+            assert get_widthheight(media) == get_widthheight(media_paths[0])
+            assert get_fps(media) == get_fps(media_paths[0])
+    except WrongContainer:
+        raise FilesNotMatchError("All media files must be in the same container.")
+    except AssertionError:
+        raise FilesNotMatchError("All media files must have the same resolution and fps.")
+
+    # Split the first media path into base name and extension.
+    of, fex = os.path.splitext(media_paths[0])
+    of = os.path.abspath(of)
+    # Create a tmp .txt file listing the inputs for ffmpeg's concat demuxer.
+    txt_path = os.path.join(os.path.dirname(media_paths[0]), "tmp.txt")
+    with open(txt_path, "w") as f:
+        for media in media_paths:
+            f.write(f"file '{os.path.abspath(media)}'\n")
+
+    # Keep the container if it is a common one; otherwise fall back to .mkv.
+    if fex.lower() not in [".mp4", ".mov", ".avi"]:
+        fex = ".mkv"
+    # Set target name, a new file in the same directory as the first media file.
+    if target_name is None:
+        target_name = of + "_merged" + fex.lower()
+    if not overwrite:
+        target_name = generate_outfilename(target_name)
+
+    total_length = sum(get_length(media) for media in media_paths)
+
+    cmd = [
+        "ffmpeg",
+        "-y" if overwrite else "-n",
+        "-f", "concat",
+        "-safe", "0",
+        "-i", txt_path,
+        "-c", "copy",
+        target_name,
+    ]
+    ffmpeg_cmd(cmd, total_length, pb_prefix="Merging videos:", print_cmd=print_cmd)
+
+    # Remove the temporary concat list.
+    os.remove(txt_path)
+
+    return target_name
diff --git a/musicalgestures/_video.py b/musicalgestures/_video.py
new file mode 100644
index 0000000..62988d1
--- /dev/null
+++ b/musicalgestures/_video.py
@@ -0,0 +1,321 @@
+import os
+import glob
+import numpy as np
+from musicalgestures._input_test import mg_input_test
+from musicalgestures._videoreader import mg_videoreader
+from musicalgestures._flow import Flow
+from musicalgestures._audio import MgAudio
+from musicalgestures._utils import (
+    convert,
+    convert_to_mp4,
+    get_framecount,
+    ffmpeg_cmd,
+    merge_videos,
+    extract_frame,
+    MgImage,
+)
+
+
+class MgVideo(MgAudio):
+    """
+    This is the class for working with video files in the Musical Gestures Toolbox. It inherits from the MgAudio class, so the audio tools are available as well.
+    There is a set of preprocessing tools you can use when you load a video, such as:
+    - trimming: to extract a section of the video,
+    - skipping: to shrink the video by skipping N frames after keeping one,
+    - rotating: to rotate the video by N degrees,
+    - applying brightness and contrast,
+    - cropping: to crop the video either automatically (by assessing the area of motion) or manually with a pop-up user interface,
+    - converting to grayscale.
+
+    These preprocesses are applied when the MgVideo is created. Further processes are available as class methods.
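+
+    Example:
+        A minimal sketch of typical use, with the dancer example video that
+        ships with the toolbox:
+
+            import musicalgestures
+            mv = musicalgestures.MgVideo(musicalgestures.examples.dance, starttime=2, endtime=20)
+            mv.motion()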
+ """ + + def __init__( + self, + filename: str|list[str], + array=None, + fps=None, + path=None, + # Video parameters + filtertype="Regular", + thresh=0.05, + starttime=0, + endtime=0, + blur="None", + skip=0, + frames=0, + rotate=0, + color=True, + contrast=0, + brightness=0, + crop="None", + keep_all=False, + returned_by_process=False, + # Audio parameters + sr=22050, + n_fft=2048, + hop_length=512, + ): + """ + Initializes Musical Gestures data structure from a video file, and applies preprocesses if desired. + + Args: + filename (str|list[str]): Path to the video file. If input is a list, will merge all videos into one. + array (np.ndarray, optional): Generates an MgVideo object from a video array. Defauts to None. + fps (float, optional): The frequency at which consecutive images from the video array are captured or displayed. Defauts to None. + path (str, optional): Path to save the output video file generated from a video array. Defaults to None. + filtertype (str, optional): The `filtertype` parameter for the `motion()` method. `Regular` turns all values below `thresh` to 0. `Binary` turns all values below `thresh` to 0, above `thresh` to 1. `Blob` removes individual pixels with erosion method. Defaults to 'Regular'. + thresh (float, optional): The `thresh` parameter for the `motion()` method. Eliminates pixel values less than given threshold. A number in the range of 0 to 1. Defaults to 0.05. + starttime (int or float, optional): Trims the video from this start time (s). Defaults to 0. + endtime (int or float, optional): Trims the video until this end time (s). Defaults to 0 (which means the full length). + blur (str, optional): The `blur` parameter for the `motion()` method. 'Average' to apply a 10px * 10px blurring filter, 'None' otherwise. Defaults to 'None'. + skip (int, optional): Time-shrinks the video by skipping (discarding) every n frames determined by `skip`. Defaults to 0. + frames (int, optional): Specify a fixed target number of frames to extract from the video. Defaults to 0. + rotate (int, optional): Rotates the video by a `rotate` degrees. Defaults to 0. + color (bool, optional): If False, converts the video to grayscale and sets every method in grayscale mode. Defaults to True. + contrast (int, optional): Applies +/- 100 contrast to video. Defaults to 0. + brightness (int, optional): Applies +/- 100 brightness to video. Defaults to 0. + crop (str, optional): If 'manual', opens a window displaying the first frame of the input video file, where the user can draw a rectangle to which cropping is applied. If 'auto' the cropping function attempts to determine the area of significant motion and applies the cropping to that area. Defaults to 'None'. + keep_all (bool, optional): If True, preserves an output video file after each used preprocessing stage. Defaults to False. + returned_by_process (bool, optional): This parameter is only for internal use, do not use it. Defaults to False. + + sr (int, optional): Sampling rate of the audio file. Defaults to 22050. + n_fft (int, optional): Length of the FFT window. Defaults to 2048. + hop_length (int, optional): Number of samples between successive frames. Defaults to 512. 
+ """ + + # if filename is a list, merge all videos into one + if isinstance(filename, list): + self.filename = merge_videos(filename) + else: + self.filename = filename + + self.array = array + self.fps = fps + self.path = path + # Name of file without extension (only-filename) + self.of = os.path.splitext(self.filename)[0] + self.fex = os.path.splitext(self.filename)[1] + # Video parameters + self.color = color + self.starttime = starttime + self.endtime = endtime + self.skip = skip + self.frames = frames + self.filtertype = filtertype + self.thresh = thresh + self.blur = blur + self.contrast = contrast + self.brightness = brightness + self.crop = crop + self.rotate = rotate + self.keep_all = keep_all + self.has_audio = None + self.returned_by_process = returned_by_process + # Audio parameters + self.sr = sr + self.n_fft = n_fft + self.hop_length = hop_length + + # Check input and if FFmpeg is properly installed + self.test_input() + + if all(arg is not None for arg in [self.array, self.fps]): + self.from_numpy(self.array, self.fps) + + self.get_video() + self.flow = Flow(self, self.filename, self.color, self.has_audio) + + from musicalgestures._motionvideo import mg_motion as motion + from musicalgestures._motionvideo import mg_motiongrams as motiongrams + from musicalgestures._motionvideo import mg_motiondata as motiondata + from musicalgestures._motionvideo import mg_motionplots as motionplots + from musicalgestures._motionvideo import mg_motionvideo as motionvideo + from musicalgestures._motionvideo import mg_motionscore as motionscore + from musicalgestures._motionvideo_mp_run import mg_motion_mp as motion_mp + from musicalgestures._subtract import mg_subtract as subtract + from musicalgestures._ssm import mg_ssm as ssm + from musicalgestures._videograms import videograms_ffmpeg as videograms + from musicalgestures._directograms import mg_directograms as directograms + from musicalgestures._warp import ( + mg_warp_audiovisual_beats as warp_audiovisual_beats, + ) + from musicalgestures._blurfaces import mg_blurfaces as blur_faces + from musicalgestures._impacts import mg_impacts as impacts + from musicalgestures._grid import mg_grid as grid + from musicalgestures._motionvideo import save_analysis + + # from musicalgestures._cropvideo import mg_cropvideo, find_motion_box, find_total_motion_box + from musicalgestures._show import mg_show as show + from musicalgestures._info import mg_info as info + from musicalgestures._history import history_ffmpeg as history + from musicalgestures._history import history_cv2 + from musicalgestures._blend import mg_blend_image as blend + from musicalgestures._pose import pose + + def test_input(self): + """Gives feedback to user if initialization from input went wrong.""" + mg_input_test( + self.filename, + self.array, + self.fps, + self.filtertype, + self.thresh, + self.starttime, + self.endtime, + self.blur, + self.skip, + self.frames, + ) + + def get_video(self): + """Creates a video attribute to the Musical Gestures object with the given correct settings.""" + ( + self.length, + self.width, + self.height, + self.fps, + self.endtime, + self.of, + self.fex, + self.has_audio, + ) = mg_videoreader( + filename=self.filename, + starttime=self.starttime, + endtime=self.endtime, + skip=self.skip, + frames=self.frames, + rotate=self.rotate, + contrast=self.contrast, + brightness=self.brightness, + crop=self.crop, + color=self.color, + returned_by_process=self.returned_by_process, + keep_all=self.keep_all, + ) + + # Convert eventual low-resolution video or 
image
+        video_formats = [
+            ".avi",
+            ".mp4",
+            ".mov",
+            ".mkv",
+            ".mpg",
+            ".mpeg",
+            ".webm",
+            ".ogg",
+            ".ts",
+            ".wmv",
+            ".3gp",
+        ]
+        if self.fex not in video_formats:
+            # Check if it is an image file
+            if get_framecount(self.filename) == 1:
+                image_formats = [
+                    ".gif",
+                    ".jpeg",
+                    ".jpg",
+                    ".jfif",
+                    ".pjpeg",
+                    ".png",
+                    ".svg",
+                    ".webp",
+                    ".avif",
+                    ".apng",
+                ]
+                if self.fex not in image_formats:
+                    # Create one converted version and register it to the MgVideo
+                    filename = convert(
+                        self.of + self.fex, self.of + self.fex + ".png", overwrite=True
+                    )
+                    # point of and fex to the png version
+                    self.of, self.fex = os.path.splitext(filename)
+                else:
+                    # update filename after the processes
+                    self.filename = self.of + self.fex
+            else:
+                # Create one converted version and register it to the MgVideo
+                filename = convert_to_mp4(self.of + self.fex, overwrite=True)
+                # point of and fex to the mp4 version
+                self.of, self.fex = os.path.splitext(filename)
+        else:
+            # Update filename after the processes
+            self.filename = self.of + self.fex
+
+        # Check if there is audio in the video file
+        if self.has_audio:
+            self.audio = MgAudio(self.filename, self.sr, self.n_fft, self.hop_length)
+        else:
+            self.audio = None
+
+    def __repr__(self):
+        return f"MgVideo('{self.filename}')"
+
+    def numpy(self):
+        """Pipe all video frames from FFmpeg to a numpy array."""
+
+        # Define ffmpeg command and load all the video frames in memory
+        cmd = ["ffmpeg", "-y", "-i", self.filename]
+        process = ffmpeg_cmd(cmd, total_time=self.length, pipe="load")
+        # Convert bytes to numpy array
+        array = np.frombuffer(process.stdout, dtype=np.uint8).reshape(
+            -1, self.height, self.width, 3
+        )
+
+        return array, self.fps
+
+    def from_numpy(self, array, fps, target_name=None):
+        if target_name is not None:
+            self.filename = os.path.splitext(target_name)[0] + self.fex
+
+        if self.path is not None:
+            target_name = os.path.join(self.path, self.filename)
+        else:
+            target_name = self.filename
+
+        process = None
+        for frame in array:
+            if process is None:
+                cmd = [
+                    "ffmpeg",
+                    "-y",
+                    "-s",
+                    "{}x{}".format(frame.shape[1], frame.shape[0]),
+                    "-r",
+                    str(fps),
+                    "-f",
+                    "rawvideo",
+                    "-pix_fmt",
+                    "bgr24",
+                    "-vcodec",
+                    "rawvideo",
+                    "-i",
+                    "-",
+                    "-vcodec",
+                    "libx264",
+                    "-pix_fmt",
+                    "yuv420p",
+                    target_name,
+                ]
+                process = ffmpeg_cmd(cmd, total_time=array.shape[0], pipe="write")
+            process.stdin.write(frame.astype(np.uint8))
+        process.stdin.close()
+        process.wait()
+
+        return
+
+    def extract_frame(self, **kwargs):
+        """
+        Extracts a frame from the video at a given frame number or time.
+        See `musicalgestures._utils.extract_frame` for details.
+
+        Args:
+            frame (int): The frame number to extract.
+            time (str | float): The time in HH:MM:ss.ms to extract the frame from.
+            target_name (str, optional): The name for the output file. If None, the name is derived from the input name and the requested frame number or time. Defaults to None.
+            overwrite (bool, optional): Whether to allow overwriting existing files or to automatically increment target filename to avoid overwriting. Defaults to False.
+
+        Returns:
+            MgImage: An MgImage object referring to the extracted frame.
+        """
+        return MgImage(extract_frame(self.filename, **kwargs))