-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtracker.py
160 lines (126 loc) · 6.41 KB
/
tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import uuid
import numpy as np
from boundingbox import BBox
from motpy import Detection, MultiObjectTracker
class TrackedObject():
    """One tracked object together with every bounding box recorded for it.

    Bounding boxes and the frame numbers they were observed in are kept in two
    parallel lists (`_bboxes` / `_frames`), appended to in lock-step.
    """

    def __init__(self, id: int, bbox: "BBox", frame: int, uuid=None, color=None):
        """Start a new track from its first bounding box.

        Args:
            id: Identifier of the track; also written to `bbox.object_id`.
            bbox: First bounding box of the object; its `label` becomes the
                label of the track.
            frame: Frame number in which `bbox` was observed.
            uuid: Optional external identifier. When omitted, a fresh
                `uuid.uuid4()` is generated for *each* instance.
            color: Optional display colour. When omitted, three random values
                in [0, 255) are drawn and stored as `np.uint8`.
        """
        if uuid is None:
            # The previous default (`uuid=uuid.uuid4()`) was evaluated once at
            # definition time, so every object shared the same identifier.
            # `uuid4` is imported by name because the `uuid` parameter shadows
            # the module inside this function.
            from uuid import uuid4
            uuid = uuid4()
        self.uuid = uuid
        self.id = id
        self.label = bbox.label
        # Test against None instead of truthiness: a multi-element NumPy array
        # raises ValueError when used in a boolean context.
        if color is None:
            color = np.uint8(np.random.uniform(0, 255, (3,)))
        self.color = color
        self.first_frame = frame
        self.last_frame = frame
        bbox.object_id = id
        self._bboxes = [bbox]
        self._frames = [frame]

    @property
    def last_bbox(self):
        """Most recently added bounding box."""
        return self._bboxes[-1]

    def add_bbox(self, bbox: "BBox", frame: int):
        """Append a new observation and mark `frame` as the last frame seen.

        The bounding box is tagged with this object's id; a truthy
        `bbox.label` replaces the current label.
        """
        bbox.object_id = self.id
        if bbox.label:
            self.label = bbox.label
        self._bboxes.append(bbox)
        self._frames.append(frame)
        self.last_frame = frame

    def repr(self):
        """Return a short description: label (or 'object'), id and last confidence as a percentage."""
        return f"{self.label if self.label else 'object'} {self.id}: {self.last_bbox.confidence:.2%}"

    def to_csv(self):
        """Return one CSV line per observation, joined with newlines.

        Each line is `frame,id,<bbox.as_list values>,confidence,-1,-1,-1`.
        NOTE(review): the trailing -1 fields look like MOTChallenge
        placeholders; confirm against the consumer of this output.
        """
        return "\n".join(
            f"{frame},{self.id},{','.join(str(p) for p in bbox.as_list)},{bbox.confidence},-1,-1,-1"
            for frame, bbox in zip(self._frames, self._bboxes)
        )
class ObjectTracker:
    """Common bookkeeping for trackers; subclasses supply the matching in `_track`.

    Keeps the list of all tracked objects, the current frame counter, the
    detections passed to the previous `track` call and a counter used to hand
    out object ids.
    """

    def __init__(self):
        self._frame_number = 0
        self._last_object_id = 0
        self._last_detections = None
        self.tracked_objects = []

    def get_new_id(self):
        """Advance the id counter and return the new value (first call returns 1)."""
        next_id = self._last_object_id + 1
        self._last_object_id = next_id
        return next_id

    @property
    def active_objects(self):
        """Tracked objects whose `last_frame` equals the current frame number."""
        current_frame = self._frame_number
        active = []
        for tracked in self.tracked_objects:
            if tracked.last_frame == current_frame:
                active.append(tracked)
        return active

    @property
    def tracked_objects_by_id(self):
        """Mapping `id -> object`, rebuilt from `tracked_objects` on every access."""
        index = {}
        for tracked in self.tracked_objects:
            index[tracked.id] = tracked
        return index

    @property
    def tracked_objects_by_uuid(self):
        """Mapping `uuid -> object`, rebuilt from `tracked_objects` on every access."""
        index = {}
        for tracked in self.tracked_objects:
            index[tracked.uuid] = tracked
        return index

    def track(self, detections: list):
        """Process the detections of the next frame and return the active objects.

        The frame counter is incremented before `_track` runs; the detections
        are remembered as `_last_detections` only after it returns.
        """
        self._frame_number += 1
        self._track(detections)
        self._last_detections = detections
        return self.active_objects

    def _track(self, detections: list):
        """Associate `detections` with tracked objects; must be overridden."""
        raise NotImplementedError("You must override the `_track` method in your child tracker class")

    def to_csv(self):
        """Concatenate the CSV output of every tracked object, newline-separated."""
        return "\n".join(tracked.to_csv() for tracked in self.tracked_objects)
class NaiveObjectTracker(ObjectTracker):
    """Tracker that greedily pairs each new detection with the closest previous one.

    Matching is done per label: within one label, the pair with the smallest
    `dist` is matched first, then removed, until no remaining pair is closer
    than `distance_threshold`.
    """

    def __init__(self, distance_threshold=np.inf):
        """
        distance_threshold: a new detection is only matched to a previous one
            when their distance is strictly below this value.
        """
        super().__init__()
        self.distance_threshold = distance_threshold

    def _track(self, detections: list):
        """Match `detections` against the previous frame's detections.

        On the first call every detection starts a new tracked object.
        Afterwards, matched detections extend the object their previous
        detection belonged to (looked up through its `object_id`), and
        unmatched detections start new objects.
        """
        if self._last_detections is None:
            self.tracked_objects = [
                TrackedObject(self.get_new_id(), detection, self._frame_number)
                for detection in detections
            ]
            return

        # Group by label in first-seen order. Iterating a set of labels made
        # the processing order (and therefore the ids handed to new objects)
        # depend on hash ordering, which varies between runs for strings.
        objects_by_class = {}
        for detection in detections:
            objects_by_class.setdefault(detection.label, []).append(detection)

        # Build the id index once per frame instead of once per match. Objects
        # created below are never looked up here: lookups only use ids of the
        # previous frame's detections.
        objects_by_id = self.tracked_objects_by_id

        for label, new_detections in objects_by_class.items():
            previous_detections = [d for d in self._last_detections if d.label == label]

            # Rows index new detections, columns index previous detections.
            bbox_distances = np.full((len(new_detections), len(previous_detections)), np.inf)
            for i, new_detection in enumerate(new_detections):
                for j, previous_detection in enumerate(previous_detections):
                    bbox_distances[i, j] = new_detection.dist(previous_detection)

            while np.any(bbox_distances < self.distance_threshold):
                new_idx, previous_idx = np.unravel_index(np.argmin(bbox_distances), bbox_distances.shape)
                # Drop the matched row/column and the matching list entries
                # together so matrix indices keep lining up with both lists.
                bbox_distances = np.delete(bbox_distances, new_idx, 0)
                bbox_distances = np.delete(bbox_distances, previous_idx, 1)
                matched_previous = previous_detections.pop(previous_idx)
                matched_new = new_detections.pop(new_idx)
                objects_by_id[matched_previous.object_id].add_bbox(matched_new, self._frame_number)

            # Whatever is left had no close-enough predecessor.
            for new_bbox in new_detections:
                self.tracked_objects.append(TrackedObject(self.get_new_id(), new_bbox, self._frame_number))
class KalmanObjectTracker(ObjectTracker):
    """Tracker that delegates association to motpy's `MultiObjectTracker`.

    Tracked objects are keyed by the id motpy assigns to each track, stored
    as the object's `uuid`.
    """

    def __init__(self, class_labels: list):
        """
        class_labels: sequence indexed by `class_id`, used to look up the
            label of each active track.
        """
        super().__init__()
        self._tracker = MultiObjectTracker(dt=0.1)
        self._class_labels = class_labels

    def _track(self, detections):
        """Feed `detections` to motpy and record a bounding box for every active track."""
        self._tracker.step(detections=[
            Detection(box=[b.p1.x, b.p1.y, b.p2.x, b.p2.y], score=b.confidence, class_id=b.class_id)
            for b in detections
        ])

        # Build the uuid index once per frame (it used to be rebuilt up to
        # twice per track) and keep it in sync as new objects are created.
        objects_by_uuid = self.tracked_objects_by_uuid

        for track in self._tracker.active_tracks():
            xmin, ymin, xmax, ymax = track.box
            bbox = BBox(
                pos=(xmin, ymin),
                size=(np.abs(xmax - xmin), np.abs(ymax - ymin)),
                confidence=track.score,
                class_id=track.class_id,
                label=self._class_labels[track.class_id]
            )
            if track.id in objects_by_uuid:
                objects_by_uuid[track.id].add_bbox(bbox, self._frame_number)
            else:
                tracked = TrackedObject(self.get_new_id(), bbox, self._frame_number, uuid=track.id)
                self.tracked_objects.append(tracked)
                objects_by_uuid[track.id] = tracked
class ByteTrack(ObjectTracker):
    """Tracker that delegates association to `BYTETracker`.

    Tracked objects are keyed by the `track_id` of each online target, stored
    as the object's `uuid`.
    """

    def __init__(self, track_thresh=0.5, match_thresh=0.8, track_buffer=30):
        """
        track_thresh: tracking confidence threshold
        track_buffer: number of frames for which lost tracks are kept
        match_thresh: matching threshold for tracking
        """
        super().__init__()
        # Imported here rather than at module level, so the ByteTrack package
        # is only needed when this tracker is actually instantiated.
        from trackers.ByteTrack.yolox.tracker.byte_tracker import BYTETracker
        from types import SimpleNamespace
        args = SimpleNamespace(track_thresh=track_thresh, match_thresh=match_thresh, track_buffer=track_buffer, mot20=None)
        self._tracker = BYTETracker(args)

    def _track(self, detections):
        """Feed `detections` to BYTETracker and record a bounding box for every returned target."""
        # One row per detection: x1, y1, x2, y2, confidence. The reshape keeps
        # the array two-dimensional even when there are no detections.
        dets = np.array([
            (bbox.p1.x, bbox.p1.y, bbox.p2.x, bbox.p2.y, bbox.confidence)
            for bbox in detections
        ]).reshape((len(detections), 5))
        # NOTE(review): the two [1, 1] arguments are presumably image info and
        # image size, chosen equal so boxes are not rescaled -- confirm
        # against BYTETracker.update.
        online_targets = self._tracker.update(dets, [1, 1], [1, 1])

        # Build the uuid index once per frame (it used to be rebuilt up to
        # twice per target) and keep it in sync as new objects are created.
        objects_by_uuid = self.tracked_objects_by_uuid

        for track in online_targets:
            left, top, w, h = track.tlwh
            bbox = BBox(
                pos=(left, top),
                size=(w, h),
                confidence=track.score,
                object_id=track.track_id
            )
            if track.track_id in objects_by_uuid:
                objects_by_uuid[track.track_id].add_bbox(bbox, self._frame_number)
            else:
                tracked = TrackedObject(self.get_new_id(), bbox, self._frame_number, uuid=track.track_id)
                self.tracked_objects.append(tracked)
                objects_by_uuid[track.track_id] = tracked