-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDetector.py
152 lines (126 loc) · 4.11 KB
/
Detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from csv import reader
import yagmail
from os import environ
from time import sleep
from time import time
import cv2
import torch
import torchvision.transforms as transforms
import requests
import json
# GLOBAL VARS
CHECK_PERIOD = 7 # checks for a pet every X seconds
MAIL_SEND_TIMEOUT = 30 # waits for X seconds after sending out all the emails
INCLUDE_IMAGE = True
REPEAT_DETECT = 2
MAX_SEND_ROW = 4 # How many times to send the notification before long timeout
LONG_TIMEOUT = 180
CROP_X = 700
CROP_Y = 420
CROP_SIZE = 200
# Set up for notifications
serverToken = environ.get("FIREBASE_PET_TOKEN")
deviceToken = '/topics/all'
headers = {
'Content-Type': 'application/json',
'Authorization': 'key=' + serverToken,
}
body = {
'notification': {'title': 'Pes / Kočka je u dveří',
'body': 'Detekuji kočku nebo psa'},
'to':deviceToken,
'priority': 'high',
# 'data': dataPayLoad,
}
# Set up for emails
yagmail.register(environ.get("DOG_MAIL"), environ.get("DOG_MAIL_PASS"))
SENDER_EMAIL = environ.get("DOG_MAIL")
RECIEVER_EMAIL = []
with open("emails.csv") as f:
_emails = reader(f)
for row in _emails:
RECIEVER_EMAIL.append(row)
BODY = """
Pes nebo kocka je u dveri!!
"""
# Create a device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Create the model
model = torch.load("2Epochs0005lrv2.pth",map_location=device)
model.eval()
def detect(model):
cap = cv2.VideoCapture(environ.get("CAMERA_2_ADDR"))
_, frame = cap.read()
# Crop the image
frame = frame[CROP_Y:CROP_Y + CROP_SIZE, CROP_X:CROP_X + CROP_SIZE]
# Torch transforms
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Grayscale(),
transforms.Resize((100, 100))
])
image_data = frame
image_data = transform(image_data)
image_data = torch.unsqueeze(image_data, 0).float()
pred = model(image_data.to(device))
pred = (pred >= 0.5).item()
print("[ LOG ] Pet present: ",pred)
cap.release()
return pred
print("[ READY ] Loading has finished!")
def send_notification(body):
response = requests.post("https://fcm.googleapis.com/fcm/send",headers = headers, data=json.dumps(body))
print(f"[ SEND ] notification sent. Got back: {response.status_code}")
def send_email(sender_email,reciever_email,body):
print("[ SEND ] sending email")
yag = yagmail.SMTP(sender_email)
for person in reciever_email:
yag.send(
to=person,
subject="Pes nebo kocka cekaji!",
contents=body,
#attachments="Data/Dog/cat915.jpg",
)
print("[ SEND ] done")
RUNNING = True
LAST_SENT_TIME = None
NUM_SENT = 0
while RUNNING:
try:
prediction = detect(model)
if prediction == 1:
if LAST_SENT_TIME is None:
_ok = True
# Repeat for n times to make sure that it wasnt a false positive
for repeat in range(REPEAT_DETECT):
pred = detect(model)
if pred == 1:
continue
else:
_ok = False
if _ok:
LAST_SENT_TIME = time()
send_notification(body)
send_email(SENDER_EMAIL,RECIEVER_EMAIL,BODY)
NUM_SENT += 1
else:
now = time()
if (now - LAST_SENT_TIME) >= MAIL_SEND_TIMEOUT:
if NUM_SENT < MAX_SEND_ROW:
send_notification(body)
send_email(SENDER_EMAIL,RECIEVER_EMAIL,BODY)
LAST_SENT_TIME = None
NUM_SENT += 1
elif (now - LAST_SENT_TIME) >= LONG_TIMEOUT:
NUM_SENT = 0
else:
if LAST_SENT_TIME is not None:
LAST_SENT_TIME = None
NUM_SENT = 0
sleep(CHECK_PERIOD)
except Exception as e:
print(e)
RUNNING = False
except KeyboardInterrupt:
RUNNING = False
print("Shutting down!")