# *************************************************************************** #
# ********************* Sharif University of Technology ********************* #
# ****************** Department of Electrical Engineering ******************* #
# **************************** Deep Learning Lab **************************** #
# ************************ Video Synopsis Version 1.0 *********************** #
# *************** Authors: Ramtin Malekpoor - Mehrdad Morsali *************** #
# ********** ramtin.malekpour3@gmail.com - mehrdadmorsali@gmail.com ********* #
# *************************************************************************** #
# *************************************************************************** #
# ************************** Packages and Libraries ************************* #
# *************************************************************************** #
import argparse
import os
import shutil
import time
import cv2
import tracemalloc
import numpy as np
from SFSORT import SFSORT
from ultralytics import YOLO
from ultralytics.utils.torch_utils import select_device
from background import extract_background
from background import extract_synopsis_background
from tube import Tube, interpolate_missed_boxes, segment_objects
from synopsis import generate_synopsis_video
# *************************************************************************** #
# ******************************** Functions ******************************** #
# *************************************************************************** #
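# Helper for boolean command-line flags: argparse's type=bool treats any
# non-empty string (including 'False') as True, so parse the text explicitly
def str_to_bool(value):
    """Converts a command-line string such as 'True'/'False' to a boolean"""
    if isinstance(value, bool):
        return value
    return value.lower() in ('true', '1', 'yes')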
def parse_arguments():
"""Takes input arguments from the interface running main.py"""
# Create a parser
parser = argparse.ArgumentParser()
# Add arguments to the parser
    parser.add_argument('--InPath', help='path to the input file',
                        required=True, type=str)
    parser.add_argument('--IDPath', help='path to the app files',
                        required=True, type=str)
    parser.add_argument('--Model', help='path to model.pt file',
                        required=True, type=str)
parser.add_argument('--ConfTH', help='detection score threshold',
default=0.1, type=float)
parser.add_argument('--IoUTH', help='IOU threshold for NMS',
default=0.45, type=float)
    parser.add_argument('--Half', help='half precision inference',
                        default=False, type=str_to_bool)
parser.add_argument('--Device', help='cuda device: 0/0,1,2,3/cpu',
default='cpu', type=str)
parser.add_argument('--MaxDet', help='maximum detections count',
default=30, type=int)
    parser.add_argument('--Class', help='detection classes: 0/0 2 3',
                        nargs='+', default=[0], type=int)
    parser.add_argument('--UseStride', help='object detection with stride 3',
                        default=False, type=str_to_bool)
parser.add_argument('--InterpolateRange', help='Frames to interpolate missed boxes (0 disables interpolation)',
default=22, type=int)
    parser.add_argument('--Dynamic_tuning', help='enable tracker dynamic tuning',
                        default=True, type=str_to_bool)
    parser.add_argument('--Cth', help='tracker dynamic-tuning confidence threshold',
                        default=0.7, type=float)
    parser.add_argument('--HighTH_m', help='dynamic-tuning margin for HighTH',
                        default=0.1, type=float)
    parser.add_argument('--MatchTH1_m', help='dynamic-tuning margin for MatchTH1',
                        default=0.05, type=float)
    parser.add_argument('--NewTH_m', help='dynamic-tuning margin for NewTH',
                        default=0.1, type=float)
parser.add_argument('--HighTH', help='threshold for valid detection',
default=0.82, type=float)
parser.add_argument('--LowTH', help='threshold for possible detection',
default=0.3, type=float)
parser.add_argument('--MatchTH1', help='threshold for first association',
default=0.5, type=float)
parser.add_argument('--MatchTH2', help='threshold for second association',
default=0.1, type=float)
parser.add_argument('--NewTH', help='new track threshold',
default=0.7, type=float)
parser.add_argument('--MarginTimeout', help='marginal lost track timeout',
default=20, type=int)
parser.add_argument('--CenterTimeout', help='central lost track timeout',
default=30, type=int)
parser.add_argument('--HorizontalMargin', help='horizontal margin',
default=10, type=int)
parser.add_argument('--VerticalMargin', help='vertical margin',
default=30, type=int)
parser.add_argument('--FrameWidth', help='width of video frames',
default=1440, type=int)
parser.add_argument('--FrameHeight', help='height of video frames',
default=800, type=int)
    parser.add_argument('--Test', help='run the app in the test mode',
                        default=False, type=str_to_bool)
# Return the received arguments
return vars(parser.parse_args())
def make_temporary_directories():
"""Creates temporary directories for the app"""
# Remove directories if they already exist
if os.path.exists(samples_path):
shutil.rmtree(samples_path)
# Make new directories
os.mkdir(samples_path)
return 0
def TubeID(input_tube):
return input_tube.id
def save_frame_as_background_sample(image, ID):
"""Saves an image in the specified path under the name ID"""
filename = str(ID) + '.npy'
np.save(os.path.join(samples_path, filename), image)
return 0
def standardize_box(box, margin):
"""Converts a box's coordinates into standard values while adding margins
to the box"""
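    # For example, with a 1920x1080 frame, standardize_box((-5, 10, 2000, 500), 0)
    # returns (0, 10, 1920, 500): every coordinate is clipped to the frame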
# x1, y1 is the coordinate of the upper-left side of the box
# x2, y2 is the coordinate of the bottom-right side of the box
x1 = int(box[0] - margin)
y1 = int(box[1] - margin)
x2 = int(box[2] + margin)
y2 = int(box[3] + margin)
# All coordinates should be a positive value
x1 = max(0, x1)
y1 = max(0, y1)
x2 = max(0, x2)
y2 = max(0, y2)
# x-coordinates should be less than the frame's width
x1 = min(frame_width, x1)
x2 = min(frame_width, x2)
# y-coordinates should be less than the frame's height
y1 = min(frame_height, y1)
y2 = min(frame_height, y2)
return x1, y1, x2, y2
def remove_foreground(frame, box_list):
"""removes object boxes from an image"""
background = frame.copy()
background = np.array(background, dtype = np.float64)
# remove object boxes from the background
for box in box_list:
# amend the possible coordinate errors in the box
x1, y1, x2, y2 = standardize_box(box, 0)
# replace the bounding box area with an empty area
background[y1:y2, x1:x2, :] = np.nan
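        # NaN marks the removed pixels; extract_synopsis_background is assumed
        # to merge samples with a NaN-aware reduction such as np.nanmedian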
return background
def interpolate_box(box_2, box_1):
"""Bounding box interpolation for stride object detection"""
z1 = (box_2[0] + 2 * box_1[0]) // 3
z2 = (box_2[1] + 2 * box_1[1]) // 3
z3 = (box_2[2] + 2 * box_1[2]) // 3
z4 = (box_2[3] + 2 * box_1[3]) // 3
q1 = (2 * box_2[0] + box_1[0]) // 3
q2 = (2 * box_2[1] + box_1[1]) // 3
q3 = (2 * box_2[2] + box_1[2]) // 3
q4 = (2 * box_2[3] + box_1[3]) // 3
return [z1, z2, z3, z4], [q1, q2, q3, q4]
def crop_image(image, box):
image_crop = image[box[1]:box[3], box[0]:box[2], :]
return np.array(image_crop, dtype=np.uint8)
# *************************************************************************** #
# ********************************* Classes ********************************* #
# *************************************************************************** #
class DotAccess(dict):
"""Provides dot.notation access to dictionary attributes"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
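# Example: cfg = DotAccess({'high_th': 0.82}) allows cfg.high_th; missing
# attributes return None (via dict.get) instead of raising AttributeError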
class BackgroundFIFO:
"""Provides a FIFO of background for the low-precision detector"""
def __init__(self, size):
# Determine the FIFO size
self.size = size
# Create a list with the user-defined size
self.background_list = [None] * self.size
# Reset the pointer to the list's new entry index
self.replacement_candidate = 0
def initialize(self, initial_image):
""" Clears and initializes the FIFO"""
# Initialize all FIFO blocks with the first background
self.background_list = [initial_image] * self.size
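        # Every slot references the same image object here; this is harmless
        # because add() only ever replaces slots wholesale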
# Reset the pointer to the list's new entry index
self.replacement_candidate = 0
return True
def add(self, image):
""" Adds an image to the FIFO"""
# Put the image in the address pointed by replacement_candidate
self.background_list[self.replacement_candidate] = image
# Update the pointer
self.replacement_candidate += 1
if self.replacement_candidate == self.size:
self.replacement_candidate = 0
return True
def read(self):
""" Returns all images added to the FIFO"""
return self.background_list
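# Illustrative behavior for size 3: initialize(a) yields [a, a, a]; a
# subsequent add(b) overwrites the slot at the write pointer, giving
# [b, a, a], and the pointer wraps around after every three adds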
# *************************************************************************** #
# **************************** Hyper-parameters ***************************** #
# *************************************************************************** #
''' ******************* Video Synopsis Hyper-parameters ******************* '''
# The number of background samples
BACKGROUND_SAMPLES_COUNT = 50
# Minimum tube length for valid tubes
MIN_TUBE_LENGTH = 100
# Bounding box cropping margin for foreground extraction
CROP_MARGIN = 40
# Distance threshold for conjunct tubes
DISTANCE_TH = 0.017
# Collision threshold for conjunct tubes
COLLISION_TH = 75
# Small area boxes overlooking threshold for tube grouping
AREA_TH = 600
# Small area boxes overlooking threshold for group arrangement
BOX_AREA_TH = 0
# Group count for group arrangement
GROUP_COUNT = 25
# Decay rate for group arrangement
DECAY_RATE = 0.97
# Conflict cost threshold for group arrangement
CONFLICT_TH = 0.04
''' ************** The emptyframe detector Hyper-parameters ************** '''
# Use BACKGROUND_FIFO_SIZE of background samples to generate background
BACKGROUND_FIFO_SIZE = 10
# Background sampling occurs every BACKGROUND_REFRESH_TIME frames
BACKGROUND_REFRESH_TIME = 1000
# Static detector kernels
KERNEL_ERODE = np.ones((3, 3), np.uint8)
KERNEL_DIL = np.ones((9, 9), np.uint8)
# The minimum area of a box containing human
HUMAN_AREA_L = 30
# The maximum area of a box containing human
HUMAN_AREA_U = 1000
# The aspect ratio of a box containing human
HUMAN_ASPECT_RATIO = 1.2
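# Note: the human area and aspect-ratio thresholds above are measured on
# frames downsampled by DOWN_RATE (defined below), not at full resolution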
# Determine the maximum period that a generated background is valid to use
VALID_BACKGROUND_PERIOD = 500
# Define a reduced size for the background image in the empty frame detector
# to improve computational efficiency.
DOWN_RATE = 4
# Minimum interval between saved frames (ensures background samples come from
# different time periods)
MIN_SAVE_INTERVAL = 300
''' ************* Tube objects' segmentation Hyper-parameters ************* '''
# The extra padding to avoid computational errors
EXTRA_PAD = 20
# The threshold value for segmentation masks
MASK_THRESHOLD = 40
# The area ratio for valid foreground mask identification
FOREGROUND_RATIO = 0.45
# The height ratio for valid foreground mask identification
HEIGHT_RATIO = 0.9
# *************************************************************************** #
# ******************************** Variables ******************************** #
# *************************************************************************** #
# The initial state of the object detector
emptyframe_detection = False
# The report file handle in the Test mode
report_file = None
''' ********************** Tube generation variables ********************** '''
# List of tubes
tube_list = []
# Skipped frames if stride is used
skipped_frame_1 = None
skipped_frame_2 = None
''' ****************** The emptyframe detector variables ***************** '''
# A FIFO to keep background samples
background_FIFO = BackgroundFIFO(BACKGROUND_FIFO_SIZE)
# A counter to indicate the time for background sampling
background_utilization_time = 0
# Frame number of the last background that was generated.
last_generated_background = -VALID_BACKGROUND_PERIOD
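# (set one full period in the past so the first run of the empty-frame
# detector generates a fresh background)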
# Frame number of the last empty-frame saved for background generation
last_saved_emptyframe = 0
# Flag to check the background FIFO is initialized.
background_fifo_initialized = False
''' ************************* Synopsis variables ************************** '''
# Synopsis background sampling period
sampling_period = 0
# Synopsis background sampling counter
background_counter = 0
# *************************************************************************** #
# *********************************** Main ********************************** #
# *************************************************************************** #
# Parse arguments
args = parse_arguments()
# Determine paths to folders used by the app
samples_path = os.path.join(args['IDPath'], 'samples')
# Make folders used by the app
make_temporary_directories()
# Load the input video
video_capture = cv2.VideoCapture(args['InPath'])
# Extract the video metadata
frame_rate = video_capture.get(cv2.CAP_PROP_FPS)
frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
frames_count = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
# Compute the reduced background size used by the empty-frame detector
DOWNSIZED_BACKGROUND_WIDTH = frame_width // DOWN_RATE
DOWNSIZED_BACKGROUND_HEIGHT = frame_height // DOWN_RATE
# Specify the address to the output video
output_path = os.path.join(args['IDPath'], 'Synopsis.mp4')
# Initialize the output video generator
video_synopsis = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'),
frame_rate, (frame_width, frame_height))
# Instantiate an object detector
model = YOLO(args['Model'], 'detect')
# Check for GPU availability
device = select_device(args['Device'])
# Devolve everything to selected devices
model.to(device)
# Package SFSORT arguments into the standard form
tracker_arguments = {"dynamic_tuning": args['Dynamic_tuning'], "cth": args['Cth'],
                     "high_th": args['HighTH'], "high_th_m": args['HighTH_m'],
                     "match_th_first": args['MatchTH1'],
                     "match_th_first_m": args['MatchTH1_m'],
                     "match_th_second": args['MatchTH2'], "low_th": args['LowTH'],
                     "new_track_th": args['NewTH'], "new_track_th_m": args['NewTH_m'],
                     "marginal_timeout": args['MarginTimeout'],
                     "central_timeout": args['CenterTimeout'],
                     "horizontal_margin": args['HorizontalMargin'],
                     "vertical_margin": args['VerticalMargin'],
                     "frame_width": args['FrameWidth'],
                     "frame_height": args['FrameHeight']}
# Enable dot-notation access to the tracker arguments
tracker_arguments = DotAccess(tracker_arguments)
# Instantiate a tracker
tracker = SFSORT(tracker_arguments)
# Report preparation for the Test mode
if args['Test']:
# Specify the address to the report file
report_path = os.path.join(args['IDPath'], 'report.txt')
# Open the report file in the write mode
report_file = open(report_path, "w")
# Start Monitoring the RAM usage
tracemalloc.start()
# Register the current time for execution time measurement
current_moment = "Algorithm execution started at = " + str(time.time())
report_file.write(current_moment)
# Determine the sampling period for video synopsis background generation
sampling_period = frames_count // BACKGROUND_SAMPLES_COUNT
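# For example, a 10000-frame video with BACKGROUND_SAMPLES_COUNT = 50 yields
# one background sample every 200 frames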
# Initialize runtimes for each component
yolo_runtime = 0
emptyframe_runtime = 0
tracking_runtime = 0
if args['InterpolateRange'] > 0:
# Initialize the frame buffer with shape [InterpolateRange, frame_height, frame_width, 3]
Frame_buffer = np.zeros((args['InterpolateRange'], frame_height, frame_width, 3), dtype=np.uint8)
start = time.time()
# Run the algorithm
for frame_number in range(frames_count):
# Read a frame
success, frame = video_capture.read()
# Print processing information every 100 frames
if frame_number % 100 == 0:
print(f"Processing frame {frame_number} of {frames_count}")
# Generate the video synopsis if everything is done
    if not success or frame_number == frames_count - 1:
# Register the current time for execution time measurement
tube_generation_finishing_moment = time.time()
        # Eliminate short tubes (filtering a copy; removing items while
        # iterating over the same list would skip elements)
        tube_list = [tube for tube in tube_list
                     if len(tube.images) >= MIN_TUBE_LENGTH]
# Sort tubes
tube_list.sort(key=TubeID)
# Generate the background
background = extract_synopsis_background(samples_path)
# Fill the missed tube boxes with zero
tube_list = interpolate_missed_boxes(tube_list)
# Register the current time for execution time measurement
        segmentation_start = time.time()
# Extract the objects of each tube by segmentation
tube_list = segment_objects(tube_list, background, EXTRA_PAD,
MASK_THRESHOLD, FOREGROUND_RATIO,
HEIGHT_RATIO)
# Register the current time for execution time measurement
        segmentation_end = time.time()
# Register the current time for execution time measurement
synopsis_starting_moment = time.time()
# Generate the synopsis video
generate_synopsis_video(tube_list, frame_rate, DISTANCE_TH,
COLLISION_TH, AREA_TH, GROUP_COUNT, DECAY_RATE,
CONFLICT_TH, BOX_AREA_TH, background,
video_synopsis)
# Register the current time for execution time measurement
synopsis_finishing_moment = time.time()
# Report the results for the Test mode
if args['Test']:
# Register times for execution time measurement
moment = "\nTube generation finishing moment = "
moment += str(tube_generation_finishing_moment)
report_file.write(moment)
moment = "\nVideo Synopsis starting moment = "
moment += str(synopsis_starting_moment)
report_file.write(moment)
moment = "\nVideo Synopsis finishing moment = "
moment += str(synopsis_finishing_moment)
report_file.write(moment)
# Register the RAM usage
ram_usage = "\nRAM Usage = " + str(tracemalloc.get_traced_memory())
report_file.write(ram_usage)
break
# Save skipped frames in detection with strides
if args['UseStride']:
if frame_number % 3 == 1:
skipped_frame_1 = np.array(frame)
continue
if frame_number % 3 == 2:
skipped_frame_2 = np.array(frame)
continue
if args['InterpolateRange'] > 0:
# Shift all existing frames one step to the left
Frame_buffer = np.roll(Frame_buffer, shift=-1, axis=0)
# Insert the new frame at the last position
Frame_buffer[-1] = frame
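        # np.roll copies the whole buffer on every frame; a circular write
        # index would avoid the copy, but rolling keeps the indexing simple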
# Use the emptyframe object detector if the scene is empty of people
if emptyframe_detection:
# Register the current time for execution time measurement
start_emptyframe = time.time()
# Update the background if it has been a long time since the last update.
if frame_number > last_generated_background + VALID_BACKGROUND_PERIOD:
# Read all Samples
backgrounds = background_FIFO.read()
# Generate a new background
background_small = extract_background(backgrounds)
# Update background frame time
last_generated_background = frame_number
# Decrease the frame size to save computational resources
        # cv2.resize expects dsize as (width, height)
        frame_small = cv2.resize(frame, (DOWNSIZED_BACKGROUND_WIDTH,
                                         DOWNSIZED_BACKGROUND_HEIGHT),
                                 interpolation=cv2.INTER_NEAREST)
# Find the foreground using absolute difference between images
foreground = cv2.absdiff(background_small, frame_small)
# Convert the foreground's color system to the grayscale
foreground = cv2.cvtColor(foreground, cv2.COLOR_BGR2GRAY)
# Convert the foreground to a binary image
_, foreground_binary = cv2.threshold(foreground, 15, 255,
cv2.THRESH_BINARY)
# Do morphological operations on the foreground
foreground_erosion = cv2.erode(foreground_binary, KERNEL_ERODE,
iterations=1)
foreground_binary = cv2.dilate(foreground_erosion, KERNEL_DIL,
iterations=1)
# Find foreground contours
contours, hierarchy = cv2.findContours(foreground_binary,
cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_TC89_L1)
# Contours with specific area and aspect ratio contain human
# Check for possible human presence in all contours
for contour in contours:
# Calculate the contour's area
area = cv2.contourArea(contour)
# Check area to detect human contour
if HUMAN_AREA_L < area < HUMAN_AREA_U:
# Convert the contour to an approximate bounding box
x, y, w, h = cv2.boundingRect(contour)
# Calculate the object's aspect ratio
aspect_ratio = w / h
# Check the aspect ratio to make sure of human presence
if aspect_ratio < HUMAN_ASPECT_RATIO:
# Start using high-precision detector
emptyframe_detection = False
break
# Periodically, add the current human-free sample to the FIFO as
# a new background sample. Also, save the frame as a sample for
# background extraction in the video synopsis
if background_utilization_time == BACKGROUND_REFRESH_TIME:
# Add the current frame to the backgrounds FIFO
            frame_small = cv2.resize(frame, (DOWNSIZED_BACKGROUND_WIDTH,
                                             DOWNSIZED_BACKGROUND_HEIGHT),
                                     interpolation=cv2.INTER_NEAREST)
background_FIFO.add(frame_small.copy())
# Reset the sampling counter
background_utilization_time = 0
# Save the frame for video synopsis background extraction
save_frame_as_background_sample(frame, frame_number)
else:
# Increase the sampling counter's value
background_utilization_time += 1
# Record the current time for execution measurement
end_emptyframe = time.time()
# Accumulate emptyframe detector runtime
emptyframe_runtime += end_emptyframe - start_emptyframe
if not emptyframe_detection:
        # The object detector requires image dimensions to be multiples
        # of 32, so zero-pad the frame for compatibility
# Right padding to reach the nearest multiple of 32 width
padding_right = 0
if frame_width % 32 != 0:
padding_right = ((frame_width // 32) + 1) * 32 - frame_width
# Bottom padding to reach the nearest multiple of 32 height
padding_bottom = 0
if frame_height % 32 != 0:
padding_bottom = ((frame_height // 32) + 1) * 32 - frame_height
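        # For example, a 1920x1080 frame needs no right padding (1920 = 60*32)
        # but 8 pixels of bottom padding (1088 - 1080)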
# Zero-pad the frame
frame_padded = cv2.copyMakeBorder(frame, 0, padding_bottom, 0,
padding_right, cv2.BORDER_CONSTANT)
# Record the start time for YOLO prediction
start_yolo = time.time()
# Run the YOLO model to find objects in the padded frame
        prediction = model.predict(frame_padded,
                                   imgsz=(frame_padded.shape[0],
                                          frame_padded.shape[1]),
                                   conf=args['ConfTH'], iou=args['IoUTH'],
                                   half=args['Half'], device=device,
                                   max_det=args['MaxDet'],
                                   classes=args['Class'])
# Record the end time for YOLO prediction
end_yolo = time.time()
# Accumulate YOLO runtime
yolo_runtime += end_yolo - start_yolo
# Exclude extra info from the predictions
prediction_results = prediction[0].boxes.cpu().numpy()
box_list = prediction_results.xyxy
score_list = prediction_results.conf
# Skip further analysis if detector didn't find anyone
if len(prediction_results) == 0:
# Reset the background counter value
background_counter = 0
# Save the frame for video synopsis background extraction
if frame_number > last_saved_emptyframe + MIN_SAVE_INTERVAL:
save_frame_as_background_sample(frame, frame_number)
last_saved_emptyframe = frame_number
# Prepare everything for the emptyframe detector
            frame_small = cv2.resize(frame, (DOWNSIZED_BACKGROUND_WIDTH,
                                             DOWNSIZED_BACKGROUND_HEIGHT),
                                     interpolation=cv2.INTER_NEAREST)
            # Add the empty frame to the background FIFO, initializing the
            # FIFO on first use
if background_fifo_initialized:
background_FIFO.add(frame_small.copy())
else:
background_FIFO.initialize(frame_small.copy())
background_fifo_initialized = True
# Reset the background sampling counter
background_utilization_time = 0
# Use the emptyframe detector for further frames
emptyframe_detection = True
continue
        # Periodically save the frame's background as a video synopsis
        # background sample
if background_counter == sampling_period:
# Extract a background sample by removing objects from the frame
background = remove_foreground(frame, box_list)
# Save the frame for video synopsis background extraction
save_frame_as_background_sample(background, frame_number)
# Reset the counter value
background_counter = 0
else:
# Increase the counter value
background_counter += 1
# Record the start time for tracking
start_tracking = time.time()
# Pass the predictions to the tracker for updating tracks
tracks = tracker.update(box_list, score_list)
# Record the end time for tracking
end_tracking = time.time()
# Accumulate the total runtime for tracking
tracking_runtime += end_tracking - start_tracking
# Skip further analysis if the tracker is not tracking anyone
if len(tracks) == 0:
continue
# Add detections of the current frame to the tubes
for track in tracks:
# Convert the bounding box into the standard format
bbox = standardize_box(track[0], 0)
# Enlarge the box for motion estimation in segmentation
bbox_pad = standardize_box(track[0], CROP_MARGIN)
        # The tube ID is the same as the track ID
tube_id = track[1]
# Gather all tube IDs
tube_id_list = [tube.id for tube in tube_list]
# If the tube_list contains the track, update tubes
if tube_id in tube_id_list:
# Pop the tube to update it
tube = tube_list.pop(tube_id_list.index(tube_id))
# Append missed info in the case of using strides
if args['UseStride']:
# Interpolate boxes from skipped frames
box1, box2 = interpolate_box(bbox, tube.boxes[-1])
# Add boxes to the tube
tube.add_box(box1)
tube.add_box(box2)
# Interpolate padded boxes
box1_pad, box2_pad = interpolate_box(bbox_pad,
tube.boxes_pad[-1])
# Add padded boxes to the tube
tube.add_box_pad(box1_pad)
tube.add_box_pad(box2_pad)
# Save frame number for interpolated boxes
tube.add_frame_number(frame_number - 1)
tube.add_frame_number(frame_number)
# Save the object's image
object_image = crop_image(skipped_frame_1, box1_pad)
tube.add_image(object_image)
object_image = crop_image(skipped_frame_2, box2_pad)
tube.add_image(object_image)
# Check if interpolation is enabled
if args['InterpolateRange'] > 0:
# Check if the interval for the missing object is less than InterpolateRange
if frame_number - args['InterpolateRange'] in tube.frame_number:
# Find the index of the corresponding frame in the tube's frame numbers
index = tube.frame_number.index(frame_number - args['InterpolateRange'])
# Loop through the active interpolation range
for i in range(frame_number - args['InterpolateRange'], frame_number):
# Check if the object was missed
if i not in tube.frame_number:
# Interpolate the coordinates of the missing box
x1 = np.interp(i, tube.frame_number, np.array(tube.boxes)[:, 0])
y1 = np.interp(i, tube.frame_number, np.array(tube.boxes)[:, 1])
x2 = np.interp(i, tube.frame_number, np.array(tube.boxes)[:, 2])
y2 = np.interp(i, tube.frame_number, np.array(tube.boxes)[:, 3])
missed_box = standardize_box((x1, y1, x2, y2), 0)
# Enlarge the box for motion estimation in segmentation
missed_box_pad = standardize_box(missed_box, CROP_MARGIN)
# Insert the new frame number and corresponding box into the tube's frame number list
tube.frame_number.insert(index, i)
# Insert the interpolated box into the tube's boxes list
tube.boxes.insert(index, missed_box)
# Insert the interpolated padded box into the tube's padded boxes list
tube.boxes_pad.insert(index, missed_box_pad)
# Calculate the index of the corresponding frame in the Frame_buffer
buffer_index = args['InterpolateRange'] - 1 - (frame_number - i)
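                            # (the newest frame occupies the last buffer slot,
                            # so frame i sits frame_number - i slots before it)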
# Crop the image of the object from the Frame_buffer
object_image = crop_image(Frame_buffer[buffer_index], missed_box_pad)
# Insert the object's image into the tube
tube.images.insert(index, object_image)
# Increment the index for the next insertion
index += 1
else:
# Create new tube
tube = Tube(frame_number, tube_id)
# Register the tube's bounding box
tube.add_box(bbox)
tube.add_box_pad(bbox_pad)
# Register the last frame of the tube
tube.set_last_frame(frame_number)
        # Save the frame number (tube frame numbers are stored with a +1
        # offset, matching the stride-interpolation branch above)
        tube.add_frame_number(frame_number + 1)
# Save the object's image
object_image = crop_image(frame, bbox_pad)
tube.add_image(object_image)
# Update Tube List
tube_list.append(tube)
# Terminate the test mode
if args['Test']:
# Close the report file
report_file.close()
# Stop Monitoring the RAM usage
tracemalloc.stop()
# Close the input file
video_capture.release()
# Close the output file
video_synopsis.release()
# Print the runtime of each component
print("Yolo_Runtime:", yolo_runtime)
print("EmptyFrameDetector_Runtime:", emptyframe_runtime)
print("Tracking_Runtime:", tracking_runtime)
print("Segmention_Runtime:", segmention_end - segmention_start)
print("Synopsis_Runtime:", synopsis_finishing_moment - synopsis_starting_moment)
print("Total_Runtime:", synopsis_finishing_moment - start)