-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsave_data.py
111 lines (98 loc) · 3.57 KB
/
save_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import cv2
import numpy as np
import json, yaml
import datetime as dt
"""
Runs as a thread and writes live payload data under ./liveDataLog:
/camera_data: video frames as JPEG; filename = publish timestamp with ":" and "." replaced by "-"
/radcam_log.json: one record per received message:
camera: {topic, sub_ts, pub_ts (timestamp the frame's filename in ./camera_data is derived from), frame_id}
radar: {topic, sub_ts, radar_payload (decoded radar JSON, e.g. sensor id, x, y, z, tlv, tid, frame)}
"""
# Topics whose messages save_data knows how to persist.
CAMERA_TOPIC = "data/livecamera"
RADAR_TOPIC = "data/liveradar"
# Output locations, relative to the current working directory.
# IMG_PATH is used as a string prefix, so it must keep its trailing slash.
IMG_PATH = "./liveDataLog/camera_data/"
RADCAM_PATH = "./liveDataLog/radcam_log.json"
# Disabled sample-frame fixture, kept for reference:
# SAMPLE_IMG = "./data/sample_frame.png"
# data_array = cv2.imread(SAMPLE_IMG)
# Load the runtime configuration once, at import time, from the current
# working directory. save_data reads config["LiveData"]["camera"]["height"]
# and ["width"] from it to reshape incoming camera frames.
# The encoding is explicit so parsing does not depend on the platform's
# default locale encoding.
with open("config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)
def save_data(topic: str, payload: bytes | str) -> dict | None:
    """
    Save one live camera frame or radar message and append its metadata
    record to the JSON log at RADCAM_PATH.

    Camera messages (topic == CAMERA_TOPIC): the payload is raw uint8 pixel
    data followed by a 26-byte UTF-8 publish timestamp. The pixels are
    reshaped to (height, width, 3) using config["LiveData"]["camera"] and
    written as a JPEG under IMG_PATH; the file name is the publish timestamp
    with ":" and "." replaced by "-".

    Radar messages (topic == RADAR_TOPIC): the payload is decoded as UTF-8
    and parsed as JSON; if it is not valid JSON the decoded text is logged
    as-is.

    The log is written as an open JSON array: "[" precedes the first record
    and "," precedes every later one. The closing "]" is not written here.

    Args:
        topic: Topic the message arrived on.
        payload: Message body. Camera payloads must be bytes; radar payloads
            may be bytes or an already-decoded str.

    Returns:
        The record appended to the log, or None when the topic is neither
        CAMERA_TOPIC nor RADAR_TOPIC (in which case nothing is written).

    Raises:
        ValueError: If a camera payload does not hold exactly
            height * width * 3 pixel bytes (raised by the reshape).
        UnicodeDecodeError: If the timestamp or a radar payload is not UTF-8.
    """
    # Local import so this function does not depend on the file's import block.
    import os

    # Function attribute used as a persistent counter of camera messages.
    if not hasattr(save_data, "frame_id"):
        save_data.frame_id = 0

    sub_ts = dt.datetime.now().isoformat()

    if topic == CAMERA_TOPIC:
        save_data.frame_id += 1
        # The last 26 bytes are the publisher's timestamp; the rest is pixels.
        cam_ts = payload[-26:].decode("utf-8")
        camera_cfg = config["LiveData"]["camera"]
        frame = np.frombuffer(payload[:-26], dtype=np.uint8).reshape(
            (camera_cfg["height"], camera_cfg["width"], 3)
        )
        # ":" and "." are replaced so the timestamp is usable as a file name.
        ts_filename = cam_ts.replace(":", "-").replace(".", "-")
        # cv2.imwrite cannot create directories, so make sure the target exists.
        os.makedirs(IMG_PATH, exist_ok=True)
        cv2.imwrite(IMG_PATH + ts_filename + ".jpg", frame)
        record = {
            "topic": topic,
            "sub_ts": sub_ts,
            "pub_ts": cam_ts,
            "frame_id": save_data.frame_id,
        }
    elif topic == RADAR_TOPIC:
        # The signature allows str, so only decode when we actually got bytes.
        if isinstance(payload, (bytes, bytearray)):
            text = payload.decode("utf-8")
        else:
            text = payload
        try:
            radar_payload = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON: keep the raw text rather than dropping the message.
            radar_payload = text
        record = {
            "topic": topic,
            "sub_ts": sub_ts,
            "radar_payload": radar_payload,
        }
    else:
        # Unknown topic: write nothing, so the log never gets a dangling
        # "[" or "," with no record after it.
        return None

    os.makedirs(os.path.dirname(RADCAM_PATH) or ".", exist_ok=True)
    with open(RADCAM_PATH, "a", encoding="utf-8") as log_file:
        # An empty file means this is the first record of the array.
        separator = "[" if log_file.tell() == 0 else ","
        # Separator and record go out in one write so they stay together.
        log_file.write(separator + json.dumps(record))
    return record
# Disabled draft of class-based payload records (CamPayload / RadarPayload).
# It sits inside a bare string literal, so none of it is executed.
# CamPayload.to_json() has the same keys as the camera record save_data writes;
# RadarPayload.to_json() uses pub_ts / radar_json, which save_data does not write.
"""
class CamPayload:
def __init__(self, sub_ts, pub_ts, frame_id) -> None:
self.topic = CAMERA_TOPIC
self.sub_ts = sub_ts
self.pub_ts = pub_ts
self.frame_id = frame_id
def __repr__(self) -> str:
return f"CamPayload(topic={self.topic}, sub_ts={self.sub_ts}, pub_ts={self.pub_ts}, frame_id={self.frame_id})"
def to_json(self) -> dict:
return {
"topic": self.topic,
"sub_ts": self.sub_ts,
"pub_ts": self.pub_ts,
"frame_id": self.frame_id
}
class RadarPayload:
def __init__(self, sub_ts, pub_ts, radar_json) -> None:
self.topic = RADAR_TOPIC
self.sub_ts = sub_ts
self.pub_ts = pub_ts
self.radar_json = radar_json
def __repr__(self) -> str:
return f"RadarPayload(topic={self.topic}, sub_ts={self.sub_ts}, pub_ts={self.pub_ts}, radar_json={self.radar_json})"
def to_json(self) -> dict:
return {
"topic": self.topic,
"sub_ts": self.sub_ts,
"pub_ts": self.pub_ts,
"radar_json": self.radar_json
}
"""