-
Notifications
You must be signed in to change notification settings - Fork 0
/
logdata
executable file
·234 lines (204 loc) · 7.1 KB
/
logdata
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/python3
from __future__ import division, absolute_import, print_function, unicode_literals
import argparse
import sys, termios, tty
from curses import ascii
from datetime import datetime, timezone, timedelta
import time
import json
import os, io
import subprocess
import collections, re
data_dir = os.path.join(os.path.expanduser("~"), "logdata")
tracks_dir = os.path.join(data_dir, "tracks")
suggestions = None
prompt_str = ""
def get_datetime(ns=True):
if ns:
return datetime.now(timezone(timedelta(seconds=-time.altzone)))
else:
return datetime.fromtimestamp(int(time.time()), timezone(timedelta(seconds=-time.altzone)))
def get_data_file(dt, mkdir=False):
path = os.path.join(data_dir, "logdata-" + str(dt.year) + str(dt.month).zfill(2) + ".txt")
if mkdir:
try: os.mkdir(os.path.dirname(path))
except FileExistsError: pass
return path
def get_event_stats():
last_events = []
events_by_frequency = {}
re_event = re.compile("^([^ ]+) EVENT ([\w-]+)$")
dt = get_datetime()
for df in [
get_data_file(datetime(dt.year, dt.month, 1) - timedelta(1)),
get_data_file(dt)
]:
if os.path.isfile(df):
with io.open(df) as fp:
for line in fp:
line = line.strip()
matches = re_event.match(line)
if matches:
ev_name = matches.group(2)
last_events.append(line)
if len(last_events) > 5:
last_events.pop(0)
if ev_name not in events_by_frequency:
events_by_frequency[ev_name] = 1
else:
events_by_frequency[ev_name] += 1
events_by_frequency = collections.OrderedDict(sorted(events_by_frequency.items(), key=lambda x: x[1], reverse=True))
def get_sugestions(suffix):
nonlocal events_by_frequency
suggestions = []
re_sugg = None
if suffix and suffix.find("-") == -1:
re_sugg = re.compile("^" + "\w*-".join(suffix) + "\w*")
for ev_name in events_by_frequency.keys():
if ev_name.startswith(suffix) or (re_sugg and re_sugg.match(ev_name)):
suggestions.append(ev_name)
return suggestions
return last_events, get_sugestions
def clear():
print("\x1b[H\x1b[2J", end="")
def alternate_buffer(on=True):
if on:
print("\x1b[?1049h", end="")
else:
print("\x1b[?1049l", end="")
def print_prompt(token=False):
global prompt_str
print("> " + prompt_str, end="")
sys.stdout.flush()
c = sys.stdin.read(1)
if c == "\x7f": # backspace
if prompt_str:
prompt_str = prompt_str[:-1]
else:
return -2
elif c == "\x0a": # enter
return -1
elif not token or ascii.isalpha(c) or c == "-":
prompt_str += c
elif ascii.isdigit(c):
return ord(c) - 48
return None
def confirm_prompt():
while True:
c = sys.stdin.read(1)
if c == "\x7f": # backspace
return False
elif c == "\x0a": # enter
return True
def start_location_track(name):
clear()
t_start = time.time_ns()
path = os.path.join(tracks_dir, name + "-" + str(t_start) + ".txt")
try: os.mkdir(os.path.dirname(tracks_dir))
except FileExistsError: pass
try: os.mkdir(os.path.dirname(path))
except FileExistsError: pass
clear()
print("start (" + name + ")")
head = json.dumps({
"version": 1,
"type": "termux-gps",
"time": t_start,
"interval": 5
})
with io.open(path, "w", encoding="utf-8") as fp:
fp.write(head + "\n")
fp.flush()
while True:
try:
print("next")
t_before = time.time_ns()
out = subprocess.check_output(["termux-location", "-p", "gps", "-r", "once"])
t_after = time.time_ns()
print(str((t_after - t_before)/1e9))
result = {
"t_before": t_before,
"t_after": t_after,
"data": json.loads(out)
}
print("lat: " + str(result["data"]["latitude"]) + ", log: " + str(result["data"]["longitude"]) + ", alt: " + str(result["data"]["altitude"]))
print("acc: " + str(result["data"]["accuracy"]))
fp.write(json.dumps(result) + "\n")
fp.flush()
time.sleep(5)
except KeyboardInterrupt:
print("end")
break
def save_data(data, data_type="TEXT", ns=True):
dt = get_datetime(ns)
data_file = get_data_file(dt, True)
entry = dt.isoformat() + " " + data_type + " " + data
clear()
print("Data Logger (" + data_type + ")")
print()
print(entry)
print()
print("[Backspace] to cancel, [Enter] to save")
if confirm_prompt():
with io.open(data_file, "a+", encoding="utf-8") as fp:
fp.write(entry + "\n")
return True
return False
def main(args):
data_type = "EVENT" if args.event else "TEXT"
if args.event:
last_events, get_sugestions = get_event_stats()
while True:
clear()
if args.track:
print("Track")
else:
print("Data Logger (" + data_type + ")")
print()
if args.event:
if last_events:
for s in last_events:
print(s)
print()
suges = get_sugestions(prompt_str)
if suges:
i = 1
for ev_name in suges:
print(str(i) + ". " + ev_name)
if i == 0:
break
i = (i + 1)%10
print()
n = print_prompt(args.event or args.track)
if n is not None:
if n == -2:
break
if n == -1 and prompt_str:
if args.track:
start_location_track(prompt_str)
break
elif save_data(prompt_str, data_type, args.ns):
break
elif args.event and (n > 0 and n <= len(suges) or n == 0 and len(suges) >= 10):
if save_data(suges[9 if n == 0 else n - 1], data_type, args.ns):
break
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="logdata")
group = parser.add_mutually_exclusive_group()
group.add_argument('--event', action="store_true")
group.add_argument('--track', action="store_true")
parser.add_argument('--ns', action="store_true")
args = parser.parse_args()
alternate_buffer()
attr = termios.tcgetattr(sys.stdin.fileno())
exit_message = None
try:
tty.setcbreak(sys.stdin.fileno(), termios.TCSADRAIN)
main(args)
except KeyboardInterrupt:
exit_message = "KeyboardInterrupt"
finally:
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, attr)
alternate_buffer(False)
if exit_message:
print(exit_message)