Skip to content

Commit

Permalink
add mmcv/test/test_video folder
Browse files Browse the repository at this point in the history
  • Loading branch information
seungwoo-ji-03 committed Oct 8, 2021
1 parent 4c25553 commit 5637ff8
Show file tree
Hide file tree
Showing 3 changed files with 520 additions and 0 deletions.
257 changes: 257 additions & 0 deletions mmcv/test/test_video/test_optflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# Copyright (c) Open-MMLab. All rights reserved.
import os
import os.path as osp
import tempfile

import numpy as np
import pytest
from numpy.testing import assert_array_almost_equal, assert_array_equal

import mmcv


def test_flowread():
data_dir = osp.join(osp.dirname(__file__), '../data')
flow_shape = (60, 80, 2)

# read .flo file
flow = mmcv.flowread(osp.join(data_dir, 'optflow.flo'))
assert flow.shape == flow_shape

# pseudo read
flow_same = mmcv.flowread(flow)
assert_array_equal(flow, flow_same)

# read quantized flow concatenated vertically
flow = mmcv.flowread(
osp.join(data_dir, 'optflow_concat0.jpg'), quantize=True, denorm=True)
assert flow.shape == flow_shape

# read quantized flow concatenated horizontally
flow = mmcv.flowread(
osp.join(data_dir, 'optflow_concat1.jpg'),
quantize=True,
concat_axis=1,
denorm=True)
assert flow.shape == flow_shape

# test exceptions
notflow_file = osp.join(data_dir, 'color.jpg')
with pytest.raises(TypeError):
mmcv.flowread(1)
with pytest.raises(IOError):
mmcv.flowread(notflow_file)
with pytest.raises(IOError):
mmcv.flowread(notflow_file, quantize=True)
with pytest.raises(ValueError):
mmcv.flowread(np.zeros((100, 100, 1)))


def test_flowwrite():
flow = np.random.rand(100, 100, 2).astype(np.float32)

# write to a .flo file
_, filename = tempfile.mkstemp()
mmcv.flowwrite(flow, filename)
flow_from_file = mmcv.flowread(filename)
assert_array_equal(flow, flow_from_file)
os.remove(filename)

# write to two .jpg files
tmp_filename = osp.join(tempfile.gettempdir(), 'mmcv_test_flow.jpg')
for concat_axis in range(2):
mmcv.flowwrite(
flow, tmp_filename, quantize=True, concat_axis=concat_axis)
shape = (200, 100) if concat_axis == 0 else (100, 200)
assert osp.isfile(tmp_filename)
assert mmcv.imread(tmp_filename, flag='unchanged').shape == shape
os.remove(tmp_filename)

# test exceptions
with pytest.raises(AssertionError):
mmcv.flowwrite(flow, tmp_filename, quantize=True, concat_axis=2)


def test_quantize_flow():
flow = (np.random.rand(10, 8, 2).astype(np.float32) - 0.5) * 15
max_val = 5.0
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=False)
ref = np.zeros_like(flow, dtype=np.uint8)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
for k in range(ref.shape[2]):
val = flow[i, j, k] + max_val
val = min(max(val, 0), 2 * max_val)
ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254)
assert_array_equal(dx, ref[..., 0])
assert_array_equal(dy, ref[..., 1])
max_val = 0.5
dx, dy = mmcv.quantize_flow(flow, max_val=max_val, norm=True)
ref = np.zeros_like(flow, dtype=np.uint8)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
for k in range(ref.shape[2]):
scale = flow.shape[1] if k == 0 else flow.shape[0]
val = flow[i, j, k] / scale + max_val
val = min(max(val, 0), 2 * max_val)
ref[i, j, k] = min(np.floor(255 * val / (2 * max_val)), 254)
assert_array_equal(dx, ref[..., 0])
assert_array_equal(dy, ref[..., 1])


def test_dequantize_flow():
dx = np.random.randint(256, size=(10, 8), dtype=np.uint8)
dy = np.random.randint(256, size=(10, 8), dtype=np.uint8)
max_val = 5.0
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=False)
ref = np.zeros_like(flow, dtype=np.float32)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
ref[i, j, 0] = float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val
ref[i, j, 1] = float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val
assert_array_almost_equal(flow, ref)
max_val = 0.5
flow = mmcv.dequantize_flow(dx, dy, max_val=max_val, denorm=True)
h, w = dx.shape
ref = np.zeros_like(flow, dtype=np.float32)
for i in range(ref.shape[0]):
for j in range(ref.shape[1]):
ref[i, j,
0] = (float(dx[i, j] + 0.5) * 2 * max_val / 255 - max_val) * w
ref[i, j,
1] = (float(dy[i, j] + 0.5) * 2 * max_val / 255 - max_val) * h
assert_array_almost_equal(flow, ref)


def test_flow2rgb():
flow = np.array([[[0, 0], [0.5, 0.5], [1, 1], [2, 1], [3, np.inf]]],
dtype=np.float32)
flow_img = mmcv.flow2rgb(flow)
# yapf: disable
assert_array_almost_equal(
flow_img,
np.array([[[1., 1., 1.],
[1., 0.826074731, 0.683772236],
[1., 0.652149462, 0.367544472],
[1., 0.265650552, 5.96046448e-08],
[0., 0., 0.]]],
dtype=np.float32))
# yapf: enable


def test_flow_warp():

def np_flow_warp(flow, img):
output = np.zeros_like(img, dtype=img.dtype)
height = flow.shape[0]
width = flow.shape[1]

grid = np.indices((height, width)).swapaxes(0, 1).swapaxes(1, 2)
dx = grid[:, :, 0] + flow[:, :, 1]
dy = grid[:, :, 1] + flow[:, :, 0]
sx = np.floor(dx).astype(int)
sy = np.floor(dy).astype(int)
valid = (sx >= 0) & (sx < height - 1) & (sy >= 0) & (sy < width - 1)

output[valid, :] = img[dx[valid].round().astype(int),
dy[valid].round().astype(int), :]

return output

dim = 500
a = np.random.randn(dim, dim, 3) * 10 + 125
b = np.random.randn(dim, dim, 2) + 2 + 0.2

c = mmcv.flow_warp(a, b, interpolate_mode='nearest')

d = np_flow_warp(b, a)

simple_a = np.zeros((5, 5, 3))
simple_a[2, 2, 0] = 1
simple_b = np.ones((5, 5, 2))

simple_res_c = np.zeros((5, 5, 3))
simple_res_c[1, 1, 0] = 1

res_c = mmcv.flow_warp(simple_a, simple_b, interpolate_mode='bilinear')

assert_array_equal(c, d)
assert_array_equal(res_c, simple_res_c)


def test_make_color_wheel():
default_color_wheel = mmcv.make_color_wheel()
color_wheel = mmcv.make_color_wheel([2, 2, 2, 2, 2, 2])
# yapf: disable
assert_array_equal(default_color_wheel, np.array(
[[1. , 0. , 0. ], # noqa
[1. , 0.06666667, 0. ], # noqa
[1. , 0.13333334, 0. ], # noqa
[1. , 0.2 , 0. ], # noqa
[1. , 0.26666668, 0. ], # noqa
[1. , 0.33333334, 0. ], # noqa
[1. , 0.4 , 0. ], # noqa
[1. , 0.46666667, 0. ], # noqa
[1. , 0.53333336, 0. ], # noqa
[1. , 0.6 , 0. ], # noqa
[1. , 0.6666667 , 0. ], # noqa
[1. , 0.73333335, 0. ], # noqa
[1. , 0.8 , 0. ], # noqa
[1. , 0.8666667 , 0. ], # noqa
[1. , 0.93333334, 0. ], # noqa
[1. , 1. , 0. ], # noqa
[0.8333333 , 1. , 0. ], # noqa
[0.6666667 , 1. , 0. ], # noqa
[0.5 , 1. , 0. ], # noqa
[0.33333334, 1. , 0. ], # noqa
[0.16666667, 1. , 0. ], # noqa
[0. , 1. , 0. ], # noqa
[0. , 1. , 0.25 ], # noqa
[0. , 1. , 0.5 ], # noqa
[0. , 1. , 0.75 ], # noqa
[0. , 1. , 1. ], # noqa
[0. , 0.90909094, 1. ], # noqa
[0. , 0.8181818 , 1. ], # noqa
[0. , 0.72727275, 1. ], # noqa
[0. , 0.6363636 , 1. ], # noqa
[0. , 0.54545456, 1. ], # noqa
[0. , 0.45454547, 1. ], # noqa
[0. , 0.36363637, 1. ], # noqa
[0. , 0.27272728, 1. ], # noqa
[0. , 0.18181819, 1. ], # noqa
[0. , 0.09090909, 1. ], # noqa
[0. , 0. , 1. ], # noqa
[0.07692308, 0. , 1. ], # noqa
[0.15384616, 0. , 1. ], # noqa
[0.23076923, 0. , 1. ], # noqa
[0.30769232, 0. , 1. ], # noqa
[0.3846154 , 0. , 1. ], # noqa
[0.46153846, 0. , 1. ], # noqa
[0.53846157, 0. , 1. ], # noqa
[0.61538464, 0. , 1. ], # noqa
[0.6923077 , 0. , 1. ], # noqa
[0.7692308 , 0. , 1. ], # noqa
[0.84615386, 0. , 1. ], # noqa
[0.9230769 , 0. , 1. ], # noqa
[1. , 0. , 1. ], # noqa
[1. , 0. , 0.8333333 ], # noqa
[1. , 0. , 0.6666667 ], # noqa
[1. , 0. , 0.5 ], # noqa
[1. , 0. , 0.33333334], # noqa
[1. , 0. , 0.16666667]], dtype=np.float32)) # noqa

assert_array_equal(
color_wheel,
np.array([[1., 0. , 0. ], # noqa
[1. , 0.5, 0. ], # noqa
[1. , 1. , 0. ], # noqa
[0.5, 1. , 0. ], # noqa
[0. , 1. , 0. ], # noqa
[0. , 1. , 0.5], # noqa
[0. , 1. , 1. ], # noqa
[0. , 0.5, 1. ], # noqa
[0. , 0. , 1. ], # noqa
[0.5, 0. , 1. ], # noqa
[1. , 0. , 1. ], # noqa
[1. , 0. , 0.5]], dtype=np.float32)) # noqa
# yapf: enable
53 changes: 53 additions & 0 deletions mmcv/test/test_video/test_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (c) Open-MMLab. All rights reserved.
import os
import os.path as osp
import tempfile

import mmcv


class TestVideoEditor:

@classmethod
def setup_class(cls):
cls.video_path = osp.join(osp.dirname(__file__), '../data/test.mp4')
cls.num_frames = 168

def test_cut_concat_video(self):
part1_file = osp.join(tempfile.gettempdir(), '.mmcv_test1.mp4')
part2_file = osp.join(tempfile.gettempdir(), '.mmcv_test2.mp4')
mmcv.cut_video(self.video_path, part1_file, end=3, vcodec='h264')
mmcv.cut_video(self.video_path, part2_file, start=3, vcodec='h264')
v1 = mmcv.VideoReader(part1_file)
v2 = mmcv.VideoReader(part2_file)
assert len(v1) == 75
assert len(v2) == self.num_frames - 75

out_file = osp.join(tempfile.gettempdir(), '.mmcv_test.mp4')
mmcv.concat_video([part1_file, part2_file], out_file)
v = mmcv.VideoReader(out_file)
assert len(v) == self.num_frames
os.remove(part1_file)
os.remove(part2_file)
os.remove(out_file)

def test_resize_video(self):
out_file = osp.join(tempfile.gettempdir(), '.mmcv_test.mp4')
mmcv.resize_video(
self.video_path, out_file, (200, 100), log_level='panic')
v = mmcv.VideoReader(out_file)
assert v.resolution == (200, 100)
os.remove(out_file)
mmcv.resize_video(self.video_path, out_file, ratio=2)
v = mmcv.VideoReader(out_file)
assert v.resolution == (294 * 2, 240 * 2)
os.remove(out_file)
mmcv.resize_video(self.video_path, out_file, (1000, 480), keep_ar=True)
v = mmcv.VideoReader(out_file)
assert v.resolution == (294 * 2, 240 * 2)
os.remove(out_file)
mmcv.resize_video(
self.video_path, out_file, ratio=(2, 1.5), keep_ar=True)
v = mmcv.VideoReader(out_file)
assert v.resolution == (294 * 2, 360)
os.remove(out_file)
Loading

0 comments on commit 5637ff8

Please sign in to comment.