eval_tartan.py
import argparse
from collections import defaultdict
from pathlib import Path
from pprint import pprint

import cv2
import evo.main_ape as main_ape
import gin
import numpy as np
import torch
from einops import *
from evo.core.metrics import PoseRelation
from evo.core.trajectory import PoseTrajectory3D
from imageio.v3 import imread
from tqdm import tqdm

from multi_slam.dpvo.data_readers.tartan import test_split as val_split
from multi_slam.fullsystem import FullSystem
from multi_slam.locnet import LocNet

# plot_trajectory is called in validate() but was not imported in this
# listing; DPVO ships it in plot_utils, so the module path here is an assumption.
from multi_slam.dpvo.plot_utils import plot_trajectory
# TartanAir pinhole intrinsics for 640x480 images
fx, fy, cx, cy = [320, 320, 320, 240]

STRIDE = 1      # temporal subsampling of the input frames
MAX_LEN = 1000  # cap on the number of frames per sequence

# Evaluate on a fixed random subset of 15 validation scenes
np.random.RandomState(0).shuffle(val_split)
val_split = val_split[:15]
def show_image(image, t=0):
    image = image.permute(1, 2, 0).cpu().numpy()
    cv2.imshow('image', image[..., [2, 1, 0]] / 255.0)  # RGB -> BGR for OpenCV
    cv2.waitKey(t)
def video_iterator(imagedir):
    # imfiles is already sorted, so no second sort is needed in the loop below
    imfiles = sorted((Path(imagedir) / "image_left").glob("*.png"))[:MAX_LEN]
    assert len(imfiles) >= 20, f"expected at least 20 frames in {imagedir}"

    data_list = []
    for imfile in imfiles[::STRIDE]:
        img = imread(imfile)
        image = torch.from_numpy(np.copy(img)).permute(2, 0, 1)  # HWC -> CHW
        intrinsics = torch.as_tensor([fx, fy, cx, cy])
        data_list.append((image, intrinsics))
    return data_list
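
# Expected on-disk layout, inferred from the glob above and the pose file
# read in validate() (the standard TartanAir directory structure):
#
#   datasets/TartanAir/<scene>/image_left/*.png
#   datasets/TartanAir/<scene>/pose_left.txt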
@torch.no_grad()
def run(imagedir, network):
    model = FullSystem(network, network)
    scene_name = Path(imagedir).parts[2]  # e.g. datasets/TartanAir/<scene>/...
    data_list = video_iterator(imagedir)
    model.add_new_video(scene_name, len(data_list), (480, 640))

    for t, (image, intrinsics) in enumerate(tqdm(data_list, desc=f'Running on {imagedir}')):
        image = image.cuda()
        intrinsics = intrinsics.cuda()
        model.insert_frame(image, intrinsics, t)

    model.complete_video()
    preds = model.terminate()
    _, tstamps, poses = zip(*preds)
    poses = np.stack(poses)   # (N, 7): xyz position + quaternion
    tstamps = np.array(tstamps)
    return tstamps, poses
def ate(traj_ref, traj_est, timestamps):
    traj_est = PoseTrajectory3D(
        positions_xyz=traj_est[:, :3],
        orientations_quat_wxyz=traj_est[:, 3:],
        timestamps=timestamps)

    traj_ref = PoseTrajectory3D(
        positions_xyz=traj_ref[:, :3],
        orientations_quat_wxyz=traj_ref[:, 3:],
        timestamps=timestamps)

    # Sim(3)-aligned absolute trajectory error, translation RMSE.
    # (Alignment and translation-only APE use positions, so the quaternion
    # column layout does not affect the score.)
    result = main_ape.ape(traj_ref, traj_est, est_name='traj',
                          pose_relation=PoseRelation.translation_part,
                          align=True, correct_scale=True)
    return result.stats["rmse"]
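
# A minimal, self-contained sanity check for ate(); this helper is not part
# of the original file, just a sketch of how the metric behaves. Two identical
# trajectories with non-degenerate positions should score an RMSE near zero
# (collinear points would make evo's Umeyama alignment degenerate).
def _ate_sanity_check():
    n = 10
    tstamps = np.arange(n, dtype=np.float64)
    rng = np.random.default_rng(0)
    traj = np.zeros((n, 7))
    traj[:, :3] = rng.normal(size=(n, 3))  # random, full-rank point cloud
    traj[:, 3] = 1.0                       # identity quaternion (w first)
    assert ate(traj, traj.copy(), tstamps) < 1e-6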
def validate(network, plot=False):
    results = defaultdict(list)
    for scene in val_split:
        scene_dir = Path("datasets/TartanAir") / scene
        traj_ref = scene_dir / "pose_left.txt"
        assert traj_ref.exists()

        tstamps, traj_est = run(scene_dir, network)

        # TartanAir ground truth is stored in NED coordinates; permute the
        # position and quaternion columns into the xyz convention used above.
        PERM = [1, 2, 0, 4, 5, 3, 6]  # ned -> xyz
        traj_ref = np.loadtxt(traj_ref, delimiter=" ")[::STRIDE, PERM][:MAX_LEN]

        ate_score = ate(traj_ref, traj_est, tstamps)
        results[scene].append(ate_score)

        if plot:
            scene_name = '_'.join(scene.split('/')[1:]).title()
            Path("trajectory_plots").mkdir(exist_ok=True)
            j = 0  # single trial
            plot_trajectory((traj_est, tstamps), (traj_ref, tstamps),
                            f"TartanAir {scene_name.replace('_', ' ')} Trial #{j+1} (ATE: {ate_score:.03f})",
                            f"trajectory_plots/TartanAir_{scene_name}_Trial{j+1:02d}.pdf",
                            align=True, correct_scale=True)

    results_dict = {f"Tartan/{k}": np.median(v) for k, v in results.items()}
    pprint(results_dict)
    return results_dict