-
Notifications
You must be signed in to change notification settings - Fork 0
/
validate_bags.py
96 lines (72 loc) · 2.23 KB
/
validate_bags.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python
import multiprocessing
import os
import subprocess
import yaml
from typing import List
from aido_utils import get_device_list, show_status
# Directory that is scanned for bag files. copy_bags_main overwrites this
# before validation starts.
OUTPUT_DIR = ""

# Original spelling of the shared bag list. The functions in this file use
# ``bag_files`` instead, so this name is kept only in case external code
# refers to it.
bagfiles = None

# Paths of the regular files found in OUTPUT_DIR. check_all_bags assigns it
# and check_bag reads it (both through ``global bag_files``); defining it here
# means the name exists even before check_all_bags has run.
bag_files = None
def find_bag(device, bag_files):
    """Return the first entry of ``bag_files`` that contains ``device``.

    The check is ``device in entry``, so for string entries it is a substring
    match against the whole entry.

    Args:
        device: Value looked for inside each entry.
        bag_files: Iterable of candidate entries, scanned in order.

    Returns:
        The first matching entry, or the string "Not found" if none matches.
    """
    matches = (candidate for candidate in bag_files if device in candidate)
    return next(matches, "Not found")
def check_bag(device):
    """Validate the bag belonging to ``device`` and return a status string.

    The bag path is looked up with ``find_bag`` in the module-level
    ``bag_files`` list (assigned by ``check_all_bags``). If ``rosbag info``
    fails on the bag, ``rosbag reindex`` is run once before the bag is
    inspected.

    Args:
        device: Device name used to locate the bag in ``bag_files``.

    Returns:
        One of the following strings:

        * ``"Missing"`` - no entry of ``bag_files`` matches ``device``.
        * ``"Reindex error"`` - ``rosbag info`` failed and so did
          ``rosbag reindex``.
        * ``"Empty"`` - the YAML info is empty or lacks an expected key.
        * ``"T:<duration>, R:<rate>"`` - bag duration, and the message count
          of the first topic whose name contains "compressed" divided by
          that duration (0.0 when the duration is zero).

    Raises:
        subprocess.CalledProcessError: If the final ``rosbag info --yaml``
            call fails.
        OSError: If the reindexed file cannot be renamed or no reindex backup
            can be removed.
    """
    bag = find_bag(device, bag_files)
    if bag == "Not found":
        return "Missing"

    # Argument lists (no shell) keep paths with spaces or shell
    # metacharacters intact.
    try:
        subprocess.check_output(['rosbag', 'info', bag])
    except (subprocess.CalledProcessError, OSError):
        try:
            subprocess.check_output(['rosbag', 'reindex', bag])
        except (subprocess.CalledProcessError, OSError):
            return "Reindex error"

        bag_root, extension = os.path.splitext(bag)
        # Only the file extension decides whether this was an unfinished
        # recording; a directory or file name that merely contains "active"
        # must not trigger the rename.
        if extension == ".active":
            os.rename(bag, bag_root)
            # Continue with the renamed file; the old path no longer exists.
            bag = bag_root

        # Remove the backup left behind by the reindex. Its name depends on
        # whether the source was a ".active" or a ".bag" file, so try both.
        try:
            os.unlink('%s.orig.active' % bag_root)
        except OSError:
            os.unlink('%s.orig.bag' % bag_root)

    raw_info = subprocess.check_output(['rosbag', 'info', '--yaml', bag])
    # NOTE(review): yaml.FullLoader is kept from the original; the input is
    # the output of the local rosbag tool, not external data.
    info_dict = yaml.load(raw_info.rstrip().decode("utf-8"),
                          Loader=yaml.FullLoader)
    if not isinstance(info_dict, dict):
        # No YAML document at all: treat like a bag without content.
        return "Empty"

    try:
        duration = info_dict['duration']
        count = 0
        for topic in info_dict['topics']:
            # presumably the compressed camera image topic - confirm
            if "compressed" in topic['topic']:
                count = topic['messages']
                break
    except KeyError:
        return "Empty"

    # A zero-length bag would otherwise raise ZeroDivisionError.
    rate = count / duration if duration else 0.0
    return "T:%.1f, R:%.1f" % (duration, rate)
def check_all_bags(device_list):
    """Check the bag of every device in parallel and report the results.

    Lists the regular files directly inside ``OUTPUT_DIR``, stores their paths
    in the module-level ``bag_files``, prints that list, runs ``check_bag``
    for each device in a pool of 20 worker processes and passes the devices
    together with the resulting status strings to ``show_status``.

    Args:
        device_list: Iterable of device names; one ``check_bag`` call is made
            per element.

    Raises:
        OSError: If ``OUTPUT_DIR`` cannot be listed.
    """
    global bag_files
    bag_files = [
        os.path.join(OUTPUT_DIR, entry)
        for entry in os.listdir(OUTPUT_DIR)
        if os.path.isfile(os.path.join(OUTPUT_DIR, entry))
    ]
    print(bag_files)

    # NOTE(review): check_bag reads ``bag_files`` as a module global inside
    # the workers, so the workers must inherit it from this process (true for
    # the "fork" start method) - confirm for other platforms.
    pool = multiprocessing.Pool(processes=20)
    try:
        results = pool.map(check_bag, device_list)
    finally:
        # Always release the worker processes, even when a check raised.
        pool.close()
        pool.join()

    show_status(device_list, results)
def copy_bags_main(device_list, outputdir=None):
    """Validate the bags of ``device_list`` found in ``outputdir``.

    Stores the directory in the module-level ``OUTPUT_DIR``, prints a header
    line and delegates to ``check_all_bags``.

    Args:
        device_list: Iterable of device names, forwarded to
            ``check_all_bags``.
        outputdir: Directory containing the bag files. ``None`` (the default)
            selects '/data/bags'.

    Returns:
        None.
    """
    global OUTPUT_DIR
    # Identity comparison is the correct test for the None sentinel.
    OUTPUT_DIR = '/data/bags' if outputdir is None else outputdir
    print('Validating bags:')
    check_all_bags(device_list)
if __name__ == "__main__":
    # Run the validation only when executed as a script. Without this guard
    # the check would also start on every import of the module, including
    # re-imports performed by multiprocessing worker processes.
    # copy_bags_main returns nothing, so its result is not printed.
    copy_bags_main({"watchtower22"}, None)