-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathvideo_input.py
76 lines (61 loc) · 2.44 KB
/
video_input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from abc import ABC, abstractmethod
from threading import Thread, Event
import logging
import numpy as np
logger = logging.getLogger(__name__)
class VideoInput(ABC, Thread):
"""
Wrapper for all the possible video inputs of the pipeline.
The frame acquisition process by the app runs its own thread in order not to interfere add any delay in the main loop of the program.
"""
def __init__(self):
super().__init__()
self.configure()
self.frame = None
self.stop_event = Event()
self.frame_available = Event()
@abstractmethod
def configure(self) -> None:
"""
Allows to setup some settings or parameters specific to the video input.
As it's called during the initialization and most of the time needs the camera to be instanced before configuring it, the super().__init__() must be called *at the end* of the __init__ function of a subclass
In the case of camera input, it can correspond to the camera settings or any parameter setup to trigger it.
It can also theoretically handle videos, not only live stream devices.
"""
pass
@abstractmethod
def read_frame(self) -> np.ndarray:
"""
Grabs a frame from the video input
"""
pass
@abstractmethod
def cleanup(self) -> None:
"""
Cleanup the current context.
For example, the code in charge of the camera unbinding should be written in this function
It's called in the `stop` function
"""
pass
def run(self):
"""
Abstract method implementation coming from threading.Thread
It contains the main loop of this thread.
At each iteration, it grabs a new frame and it signals that the image is available via the frame_available threading.Event
The while loop stops when the stop_event evnet is triggered. See: function`stop`
"""
while not self.stop_event.is_set():
self.frame = self.read_frame()
self.frame_available.set()
def stop(self):
"""
Abstract method implementation coming from threading.Thread
When called, it triggers the stop_event event and calls the cleanup method
"""
logger.info('Stopping video stream')
self.stop_event.set()
self.cleanup()
def get_frame(self):
self.frame_available.wait()
self.frame_available.clear()
return self.frame