-
Notifications
You must be signed in to change notification settings - Fork 0
/
depthqueueing.py
127 lines (93 loc) · 4.1 KB
/
depthqueueing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from collections import deque
class DepthQueueModifier:
def __init__(self, maxlen=5, breakfree=0.1):
# queue consists of [object depth, boundingbox]
self.queue = deque(maxlen=maxlen)
self.breakfree = breakfree
self.currentmean = None
self.queue.extend([[1,1]])
def update_calc_average(self):
if not self.queue:
self.currentmean = None
else:
# Extract depths from queue
depths = [item[0] for item in self.queue]
self.currentmean = sum(depths) / len(depths)
def add_item_to_queue(self, objectdepth, boundingbox):
self.queue.appendleft([objectdepth, boundingbox])
def return_currentmean(self):
return self.currentmean
def check_breakfree(self):
threshold = self.currentmean * self.breakfree
if self.currentmean - threshold > newvalue or newvalue > self.currentmean + threshold:
return True
else:
return False
'''
def run(self):
self.update_calc_average()
self.queue.appendleft([newvalue])
self.currentmean = DepthQueueModifier.calc_average(self.queue)
print(f"after:{self.queue}")
def run(self, newvalue):
if self.queue:
print(f"before:{self.queue}")
print(f"mean:{DepthQueueModifier.calc_average(self.queue)}")
self.currentmean = DepthQueueModifier.calc_average(self.queue)
threshold = self.currentmean * self.breakfree
if self.currentmean - threshold > newvalue or newvalue > self.currentmean + threshold:
self.queue.appendleft([newvalue])
self.currentmean = DepthQueueModifier.calc_average(self.queue)
print("triggered")
print(f"currentmean:{self.currentmean}")
print(f"current queue {self.queue}")
return self.currentmean
else:
return self.currentmean
else:
self.queue.append(newvalue)
print(f"after:{self.queue}")
'''
class QueueManager:
def __init__(self, IoU=0.3):
self.suitable_object_found = False
self.IoU = IoU
self.currentmeandepth = None
def check_objects_and_return_depth(self, objectdepth, roi, objectlist):
self.suitable_object_found = False
for object in objectlist:
if QueueManager.bb_intersection_over_union(roi, object.queue[0][1]) > self.IoU:
object.queue.appendleft([objectdepth, roi])
object.update_calc_average()
self.suitable_object_found = True
print('triggered')
return object.return_currentmean()
if self.suitable_object_found == False:
newobject = DepthQueueModifier()
newobject.add_item_to_queue(objectdepth, roi)
objectlist.append(newobject)
return objectdepth
#else:
#return objectdepth
def bb_intersection_over_union(boxA, boxB):
# Example bounding box coordinates: (x1, y1, x2, y2)
# determine the (x, y)-coordinates of the intersection rectangle
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2])
yB = min(boxA[3], boxB[3])
# compute the area of intersection rectangle
interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
# compute the area of both the prediction and ground-truth rectangles
boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
# compute the intersection over union by taking the intersection area
# and dividing it by the sum of prediction + ground-truth
# areas - the intersection area
iou = interArea / float(boxAArea + boxBArea - interArea)
return iou
if __name__ == '__main__':
d = DepthQueueModifier()
print(d.run(4))
print(d.run(5))
print(d.run(6))