From 5637ff821c34ef5360e16b731a784d221d8f7678 Mon Sep 17 00:00:00 2001 From: seungw00-ji Date: Fri, 8 Oct 2021 11:09:42 +0900 Subject: [PATCH] add mmcv/test/test_video folder --- mmcv/test/test_video/test_optflow.py | 257 ++++++++++++++++++++++++ mmcv/test/test_video/test_processing.py | 53 +++++ mmcv/test/test_video/test_reader.py | 210 +++++++++++++++++++ 3 files changed, 520 insertions(+) create mode 100644 mmcv/test/test_video/test_optflow.py create mode 100644 mmcv/test/test_video/test_processing.py create mode 100644 mmcv/test/test_video/test_reader.py diff --git a/mmcv/test/test_video/test_optflow.py b/mmcv/test/test_video/test_optflow.py new file mode 100644 index 0000000..3eecbac --- /dev/null +++ b/mmcv/test/test_video/test_optflow.py @@ -0,0 +1,257 @@ +# Copyright (c) Open-MMLab. All rights reserved. +import os +import os.path as osp +import tempfile + +import numpy as np +import pytest +from numpy.testing import assert_array_almost_equal, assert_array_equal + +import mmcv + + +def test_flowread(): + data_dir = osp.join(osp.dirname(__file__), '../data') + flow_shape = (60, 80, 2) + + # read .flo file + flow = mmcv.flowread(osp.join(data_dir, 'optflow.flo')) + assert flow.shape == flow_shape + + # pseudo read + flow_same = mmcv.flowread(flow) + assert_array_equal(flow, flow_same) + + # read quantized flow concatenated vertically + flow = mmcv.flowread( + osp.join(data_dir, 'optflow_concat0.jpg'), quantize=True, denorm=True) + assert flow.shape == flow_shape + + # read quantized flow concatenated horizontally + flow = mmcv.flowread( + osp.join(data_dir, 'optflow_concat1.jpg'), + quantize=True, + concat_axis=1, + denorm=True) + assert flow.shape == flow_shape + + # test exceptions + notflow_file = osp.join(data_dir, 'color.jpg') + with pytest.raises(TypeError): + mmcv.flowread(1) + with pytest.raises(IOError): + mmcv.flowread(notflow_file) + with pytest.raises(IOError): + mmcv.flowread(notflow_file, quantize=True) + with pytest.raises(ValueError): + mmcv.flowread(np.zeros((100, 100, 1))) + + +def test_flowwrite(): + flow = np.random.rand(100, 100, 2).astype(np.float32) + + # write to a .flo file + _, filename = tempfile.mkstemp() + mmcv.flowwrite(flow, filename) + flow_from_file = mmcv.flowread(filename) + assert_array_equal(flow, flow_from_file) + os.remove(filename) + + # write to two .jpg files + tmp_filename = osp.join(tempfile.gettempdir(), 'mmcv_test_flow.jpg') + for concat_axis in range(2): + mmcv.flowwrite( + flow, tmp_filename, quantize=True, concat_axis=concat_axis) + shape = (200, 100) if concat_axis == 0 else (100, 200) + assert osp.isfile(tmp_filename) + assert mmcv.imread(tmp_filename, flag='unchanged').shape == shape + os.remove(tmp_filename) + + # test exceptions + with pytest.raises(AssertionError): + mmcv.flowwrite(flow, tmp_filename, quantize=True, concat_axis=2) + + +def test_quantize_flow(): + flow = (np.random.rand(10, 8, 2).astype(np.float32) - 0.5) * 15 + max_val = 5.0 + dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=False) + ref = np.zeros_like(flow, dtype=np.uint8) + for i in range(ref.shape[0]): + for j in range(ref.shape[1]): + for k in range(ref.shape[2]): + val = flow[i, j, k] + max_val + val = min(max(val, 0), 2 * max_val) + ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254) + assert_array_equal(dx, ref[..., 0]) + assert_array_equal(dy, ref[..., 1]) + max_val = 0.5 + dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=True) + ref = np.zeros_like(flow, dtype=np.uint8) + for i in range(ref.shape[0]): + for j in range(ref.shape[1]): + for k in range(ref.shape[2]): + scale = flow.shape[1] if k == 0 else flow.shape[0] + val = flow[i, j, k] / scale + max_val + val = min(max(val, 0), 2 * max_val) + ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254) + assert_array_equal(dx, ref[..., 0]) + assert_array_equal(dy, ref[..., 1]) + + +def test_dequantize_flow(): + dx = np.random.randint(256, size=(10, 8), dtype=np.uint8) + dy = np.random.randint(256, size=(10, 8), dtype=np.uint8) + max_val = 5.0 + flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=False) + ref = np.zeros_like(flow, dtype=np.float32) + for i in range(ref.shape[0]): + for j in range(ref.shape[1]): + ref[i, j, 0] = float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val + ref[i, j, 1] = float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val + assert_array_almost_equal(flow, ref) + max_val = 0.5 + flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=True) + h, w = dx.shape + ref = np.zeros_like(flow, dtype=np.float32) + for i in range(ref.shape[0]): + for j in range(ref.shape[1]): + ref[i, j, + 0] = (float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val) * w + ref[i, j, + 1] = (float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val) * h + assert_array_almost_equal(flow, ref) + + +def test_flow2rgb(): + flow = np.array([[[0, 0], [0.5, 0.5], [1, 1], [2, 1], [3, np.inf]]], + dtype=np.float32) + flow_img = mmcv.flow2rgb(flow) + # yapf: disable + assert_array_almost_equal( + flow_img, + np.array([[[1., 1., 1.], + [1., 0.826074731, 0.683772236], + [1., 0.652149462, 0.367544472], + [1., 0.265650552, 5.96046448e-08], + [0., 0., 0.]]], + dtype=np.float32)) + # yapf: enable + + +def test_flow_warp(): + + def np_flow_warp(flow, img): + output = np.zeros_like(img, dtype=img.dtype) + height = flow.shape[0] + width = flow.shape[1] + + grid = np.indices((height, width)).swapaxes(0, 1).swapaxes(1, 2) + dx = grid[:, :, 0] + flow[:, :, 1] + dy = grid[:, :, 1] + flow[:, :, 0] + sx = np.floor(dx).astype(int) + sy = np.floor(dy).astype(int) + valid = (sx >= 0) & (sx < height - 1) & (sy >= 0) & (sy < width - 1) + + output[valid, :] = img[dx[valid].round().astype(int), + dy[valid].round().astype(int), :] + + return output + + dim = 500 + a = np.random.randn(dim, dim, 3) * 10 + 125 + b = np.random.randn(dim, dim, 2) + 2 + 0.2 + + c = mmcv.flow_warp(a, b, interpolate_mode='nearest') + + d = np_flow_warp(b, a) + + simple_a = np.zeros((5, 5, 3)) + simple_a[2, 2, 0] = 1 + simple_b = np.ones((5, 5, 2)) + + simple_res_c = np.zeros((5, 5, 3)) + simple_res_c[1, 1, 0] = 1 + + res_c = mmcv.flow_warp(simple_a, simple_b, interpolate_mode='bilinear') + + assert_array_equal(c, d) + assert_array_equal(res_c, simple_res_c) + + +def test_make_color_wheel(): + default_color_wheel = mmcv.make_color_wheel() + color_wheel = mmcv.make_color_wheel([2, 2, 2, 2, 2, 2]) + # yapf: disable + assert_array_equal(default_color_wheel, np.array( + [[1. , 0. , 0. ], # noqa + [1. , 0.06666667, 0. ], # noqa + [1. , 0.13333334, 0. ], # noqa + [1. , 0.2 , 0. ], # noqa + [1. , 0.26666668, 0. ], # noqa + [1. , 0.33333334, 0. ], # noqa + [1. , 0.4 , 0. ], # noqa + [1. , 0.46666667, 0. ], # noqa + [1. , 0.53333336, 0. ], # noqa + [1. , 0.6 , 0. ], # noqa + [1. , 0.6666667 , 0. ], # noqa + [1. , 0.73333335, 0. ], # noqa + [1. , 0.8 , 0. ], # noqa + [1. , 0.8666667 , 0. ], # noqa + [1. , 0.93333334, 0. ], # noqa + [1. , 1. , 0. ], # noqa + [0.8333333 , 1. , 0. ], # noqa + [0.6666667 , 1. , 0. ], # noqa + [0.5 , 1. , 0. ], # noqa + [0.33333334, 1. , 0. ], # noqa + [0.16666667, 1. , 0. ], # noqa + [0. , 1. , 0. ], # noqa + [0. , 1. , 0.25 ], # noqa + [0. , 1. , 0.5 ], # noqa + [0. , 1. , 0.75 ], # noqa + [0. , 1. , 1. ], # noqa + [0. , 0.90909094, 1. ], # noqa + [0. , 0.8181818 , 1. ], # noqa + [0. , 0.72727275, 1. ], # noqa + [0. , 0.6363636 , 1. ], # noqa + [0. , 0.54545456, 1. ], # noqa + [0. , 0.45454547, 1. ], # noqa + [0. , 0.36363637, 1. ], # noqa + [0. , 0.27272728, 1. ], # noqa + [0. , 0.18181819, 1. ], # noqa + [0. , 0.09090909, 1. ], # noqa + [0. , 0. , 1. ], # noqa + [0.07692308, 0. , 1. ], # noqa + [0.15384616, 0. , 1. ], # noqa + [0.23076923, 0. , 1. ], # noqa + [0.30769232, 0. , 1. ], # noqa + [0.3846154 , 0. , 1. ], # noqa + [0.46153846, 0. , 1. ], # noqa + [0.53846157, 0. , 1. ], # noqa + [0.61538464, 0. , 1. ], # noqa + [0.6923077 , 0. , 1. ], # noqa + [0.7692308 , 0. , 1. ], # noqa + [0.84615386, 0. , 1. ], # noqa + [0.9230769 , 0. , 1. ], # noqa + [1. , 0. , 1. ], # noqa + [1. , 0. , 0.8333333 ], # noqa + [1. , 0. , 0.6666667 ], # noqa + [1. , 0. , 0.5 ], # noqa + [1. , 0. , 0.33333334], # noqa + [1. , 0. , 0.16666667]], dtype=np.float32)) # noqa + + assert_array_equal( + color_wheel, + np.array([[1., 0. , 0. ], # noqa + [1. , 0.5, 0. ], # noqa + [1. , 1. , 0. ], # noqa + [0.5, 1. , 0. ], # noqa + [0. , 1. , 0. ], # noqa + [0. , 1. , 0.5], # noqa + [0. , 1. , 1. ], # noqa + [0. , 0.5, 1. ], # noqa + [0. , 0. , 1. ], # noqa + [0.5, 0. , 1. ], # noqa + [1. , 0. , 1. ], # noqa + [1. , 0. , 0.5]], dtype=np.float32)) # noqa + # yapf: enable diff --git a/mmcv/test/test_video/test_processing.py b/mmcv/test/test_video/test_processing.py new file mode 100644 index 0000000..b430c08 --- /dev/null +++ b/mmcv/test/test_video/test_processing.py @@ -0,0 +1,53 @@ +# Copyright (c) Open-MMLab. All rights reserved. +import os +import os.path as osp +import tempfile + +import mmcv + + +class TestVideoEditor: + + @classmethod + def setup_class(cls): + cls.video_path = osp.join(osp.dirname(__file__), '../data/test.mp4') + cls.num_frames = 168 + + def test_cut_concat_video(self): + part1_file = osp.join(tempfile.gettempdir(), '.mmcv_test1.mp4') + part2_file = osp.join(tempfile.gettempdir(), '.mmcv_test2.mp4') + mmcv.cut_video(self.video_path, part1_file, end=3, vcodec='h264') + mmcv.cut_video(self.video_path, part2_file, start=3, vcodec='h264') + v1 = mmcv.VideoReader(part1_file) + v2 = mmcv.VideoReader(part2_file) + assert len(v1) == 75 + assert len(v2) == self.num_frames - 75 + + out_file = osp.join(tempfile.gettempdir(), '.mmcv_test.mp4') + mmcv.concat_video([part1_file, part2_file], out_file) + v = mmcv.VideoReader(out_file) + assert len(v) == self.num_frames + os.remove(part1_file) + os.remove(part2_file) + os.remove(out_file) + + def test_resize_video(self): + out_file = osp.join(tempfile.gettempdir(), '.mmcv_test.mp4') + mmcv.resize_video( + self.video_path, out_file, (200, 100), log_level='panic') + v = mmcv.VideoReader(out_file) + assert v.resolution == (200, 100) + os.remove(out_file) + mmcv.resize_video(self.video_path, out_file, ratio=2) + v = mmcv.VideoReader(out_file) + assert v.resolution == (294 * 2, 240 * 2) + os.remove(out_file) + mmcv.resize_video(self.video_path, out_file, (1000, 480), keep_ar=True) + v = mmcv.VideoReader(out_file) + assert v.resolution == (294 * 2, 240 * 2) + os.remove(out_file) + mmcv.resize_video( + self.video_path, out_file, ratio=(2, 1.5), keep_ar=True) + v = mmcv.VideoReader(out_file) + assert v.resolution == (294 * 2, 360) + os.remove(out_file) diff --git a/mmcv/test/test_video/test_reader.py b/mmcv/test/test_video/test_reader.py new file mode 100644 index 0000000..0a21a38 --- /dev/null +++ b/mmcv/test/test_video/test_reader.py @@ -0,0 +1,210 @@ +# Copyright (c) Open-MMLab. All rights reserved. +import os +import os.path as osp +import shutil +import tempfile +from collections import OrderedDict + +import pytest + +import mmcv + + +class TestCache: + + def test_init(self): + with pytest.raises(ValueError): + mmcv.Cache(0) + cache = mmcv.Cache(100) + assert cache.capacity == 100 + assert cache.size == 0 + + def test_put(self): + cache = mmcv.Cache(3) + for i in range(1, 4): + cache.put(f'k{i}', i) + assert cache.size == i + assert cache._cache == OrderedDict([('k1', 1), ('k2', 2), ('k3', 3)]) + cache.put('k4', 4) + assert cache.size == 3 + assert cache._cache == OrderedDict([('k2', 2), ('k3', 3), ('k4', 4)]) + cache.put('k2', 2) + assert cache._cache == OrderedDict([('k2', 2), ('k3', 3), ('k4', 4)]) + + def test_get(self): + cache = mmcv.Cache(3) + assert cache.get('key_none') is None + assert cache.get('key_none', 0) == 0 + cache.put('k1', 1) + assert cache.get('k1') == 1 + + +class TestVideoReader: + + @classmethod + def setup_class(cls): + cls.video_path = osp.join(osp.dirname(__file__), '../data/test.mp4') + cls.num_frames = 168 + cls.video_url = 'https://www.learningcontainer.com/wp-content/uploads/2020/05/sample-mp4-file.mp4' # noqa: E501 + + def test_load(self): + # read from video file + v = mmcv.VideoReader(self.video_path) + assert v.width == 294 + assert v.height == 240 + assert v.fps == 25 + assert v.frame_cnt == self.num_frames + assert len(v) == self.num_frames + assert v.opened + import cv2 + assert isinstance(v.vcap, type(cv2.VideoCapture())) + + # read from video url + v = mmcv.VideoReader(self.video_url) + assert v.width == 320 + assert v.height == 240 + assert v.fps == 15 + assert v.frame_cnt == 1889 + assert len(v) == 1889 + assert v.opened + assert isinstance(v.vcap, type(cv2.VideoCapture())) + + def test_read(self): + v = mmcv.VideoReader(self.video_path) + img = v.read() + assert int(round(img.mean())) == 94 + img = v.get_frame(63) + assert int(round(img.mean())) == 94 + img = v[64] + assert int(round(img.mean())) == 205 + img = v[-104] + assert int(round(img.mean())) == 205 + img = v[63] + assert int(round(img.mean())) == 94 + img = v[-105] + assert int(round(img.mean())) == 94 + img = v.read() + assert int(round(img.mean())) == 205 + with pytest.raises(IndexError): + v.get_frame(self.num_frames + 1) + with pytest.raises(IndexError): + v[-self.num_frames - 1] + + def test_slice(self): + v = mmcv.VideoReader(self.video_path) + imgs = v[-105:-103] + assert int(round(imgs[0].mean())) == 94 + assert int(round(imgs[1].mean())) == 205 + assert len(imgs) == 2 + imgs = v[63:65] + assert int(round(imgs[0].mean())) == 94 + assert int(round(imgs[1].mean())) == 205 + assert len(imgs) == 2 + imgs = v[64:62:-1] + assert int(round(imgs[0].mean())) == 205 + assert int(round(imgs[1].mean())) == 94 + assert len(imgs) == 2 + imgs = v[:5] + assert len(imgs) == 5 + for img in imgs: + assert int(round(img.mean())) == 94 + imgs = v[165:] + assert len(imgs) == 3 + for img in imgs: + assert int(round(img.mean())) == 0 + imgs = v[-3:] + assert len(imgs) == 3 + for img in imgs: + assert int(round(img.mean())) == 0 + + def test_current_frame(self): + v = mmcv.VideoReader(self.video_path) + assert v.current_frame() is None + v.read() + img = v.current_frame() + assert int(round(img.mean())) == 94 + + def test_position(self): + v = mmcv.VideoReader(self.video_path) + assert v.position == 0 + for _ in range(10): + v.read() + assert v.position == 10 + v.get_frame(99) + assert v.position == 100 + + def test_iterator(self): + cnt = 0 + for img in mmcv.VideoReader(self.video_path): + cnt += 1 + assert img.shape == (240, 294, 3) + assert cnt == self.num_frames + + def test_with(self): + with mmcv.VideoReader(self.video_path) as v: + assert v.opened + assert not v.opened + + def test_cvt2frames(self): + v = mmcv.VideoReader(self.video_path) + frame_dir = tempfile.mkdtemp() + v.cvt2frames(frame_dir) + assert osp.isdir(frame_dir) + for i in range(self.num_frames): + filename = f'{frame_dir}/{i:06d}.jpg' + assert osp.isfile(filename) + os.remove(filename) + + v = mmcv.VideoReader(self.video_path) + v.cvt2frames(frame_dir, show_progress=False) + assert osp.isdir(frame_dir) + for i in range(self.num_frames): + filename = f'{frame_dir}/{i:06d}.jpg' + assert osp.isfile(filename) + os.remove(filename) + + v = mmcv.VideoReader(self.video_path) + v.cvt2frames( + frame_dir, + file_start=100, + filename_tmpl='{:03d}.JPEG', + start=100, + max_num=20) + assert osp.isdir(frame_dir) + for i in range(100, 120): + filename = f'{frame_dir}/{i:03d}.JPEG' + assert osp.isfile(filename) + os.remove(filename) + shutil.rmtree(frame_dir) + + def test_frames2video(self): + v = mmcv.VideoReader(self.video_path) + frame_dir = tempfile.mkdtemp() + v.cvt2frames(frame_dir) + assert osp.isdir(frame_dir) + for i in range(self.num_frames): + filename = f'{frame_dir}/{i:06d}.jpg' + assert osp.isfile(filename) + + out_filename = osp.join(tempfile.gettempdir(), 'mmcv_test.avi') + mmcv.frames2video(frame_dir, out_filename) + v = mmcv.VideoReader(out_filename) + assert v.fps == 30 + assert len(v) == self.num_frames + + mmcv.frames2video( + frame_dir, + out_filename, + fps=25, + start=10, + end=50, + show_progress=False) + v = mmcv.VideoReader(out_filename) + assert v.fps == 25 + assert len(v) == 40 + + for i in range(self.num_frames): + filename = f'{frame_dir}/{i:06d}.jpg' + os.remove(filename) + shutil.rmtree(frame_dir) + os.remove(out_filename)