-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathtrack.py
151 lines (122 loc) · 4.81 KB
/
track.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import sys
from collections import Counter
from glob import glob
from tqdm import tqdm
import numpy as np
import os.path as osp
from im_utils import *
from scipy.misc import imsave
from copy import deepcopy
from particleFilter import RoIPF
class Track(object):
"""This class contains all necessary for every individual track."""
def __init__(self, track_id, time_stamp, pos,
count_inactive, inactive_patience,
im_shape, hist_vec, max_particles=100):
"""
id : ID number through the process. Unique for each det
time_stamp : Current Timestamp(Frame number)
pos : Position in the frame(X_min, Y_min, X_max, Y_max)
score : detection confidences
count_inactive : Nos of time_stamp for which this frame has been inactive
inactive_patience : Nos of time_stamps for which this track is allowed to be
inactive
im_shape : Shape of the incoming frame. (H, W, 3)
last_pos : `pos` at time_stamp-N(hopefully N > 1)
"""
self.id = track_id
self.time_stamp = time_stamp
self.pos = pos
self.p_noise = min(self.pos[3] - self.pos[1],
self.pos[2] - self.pos[0]) / 4
self.count_inactive = count_inactive
self.inactive_patience = inactive_patience
self.im_shape = im_shape
self.last_pos = []
self.max_particles = max_particles
# Initialise Particle Filter
self.init_filter()
# Appearance
self.hist_vec = hist_vec
self.past_hist = [hist_vec]
self.hist_count = 0
# Attributes
self.kill_flag = False
self.exit_thresh = 50 # Image border
def init_filter(self):
self.roiPF = RoIPF(self.max_particles, self.pos)
def avg_hist(self):
return np.mean(self.past_hist, axis=0)
def predict_particles(self):
return self.roiPF.predict()
def get_n_particles(self):
return self.roiPF.created_particles
def get_particles(self):
return self.roiPF.get_all_particles()
def align_particles(self, aligned_particles):
self.roiPF.align_particles(aligned_particles)
def update_position(self, best_pos, all_scores=None, all_pos=None):
"""
When rematching, all_scores is None and hence particles need to be
resampled.
"""
self.pos = best_pos
self._update_lastpos()
if all_scores is not None and all_pos is not None:
self.score = np.max(all_scores)
self.roiPF.update(best_pos, all_scores, all_pos)
if self.roiPF.neff() < self.roiPF.created_particles // 3:
self.roiPF.resample_particles(self.pos)
def update_track(self, time_stamp, hist_vec, rematch=False):
"""
time_stamp : Current Timestamp(Frame number)
pos : Position in the frame(X_min, Y_min, X_max, Y_max)
"""
if rematch:
self.last_pos = []
self.init_filter()
self.time_stamp = time_stamp
self.past_hist.append(hist_vec)
self.hist_vec = hist_vec
self.reset_count_inactive()
# self.last_pos_copy = deepcopy(self.last_pos)
def _update_lastpos(self):
if isinstance(self.last_pos, list):
self.last_pos.append(self.pos)
self.last_pos = np.array(self.last_pos)
else:
self.last_pos = np.concatenate([self.last_pos, np.array([self.pos])])
def step(self):
self.count_inactive += 1
self.time_stamp += 1
if not self._does_it_exit():
self._update_lastpos()
self.pos = self._predict_position()
def get_velocity(self):
return self.roiPF.velocity
def _does_it_exit(self):
if len(self.last_pos) < 1 :
self.inactive_patience = 5
return False
cur_centroid = compute_centroid(self.pos)
cur_vel = self.get_velocity()
cur_wh = self.pos[2:4] - self.pos[0:2]
# Check if it's leaving the frame
# TODO : Keep the inactive patience equally high rather than killing it right away.
eventual_centroid = cur_centroid + (self.inactive_patience * cur_vel)
eventual_box = box_from_centroid(np.r_[eventual_centroid, cur_wh])
if is_outside(eventual_box, self.im_shape, thresh=20):
self.kill_flag = True
self.count_inactive += 30
return True
return False
def reset_count_inactive(self):
self.count_inactive = 0
def _predict_position(self):
cur_state = self.pos
cur_centre = compute_centroid(cur_state)
cur_wh = self.roiPF.get_mean_wh()
new_centre = cur_centre + self.get_velocity()
new_pos = box_from_centroid(np.r_[new_centre, cur_wh])
return new_pos