-
Notifications
You must be signed in to change notification settings - Fork 69
/
Copy pathdish_common.py
445 lines (380 loc) · 18.8 KB
/
dish_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
"""Shared code among the dish_grpc_* commands
Note:
This module is not intended to be generically useful or to export a stable
interface. Rather, it should be considered an implementation detail of the
other scripts, and will change as needed.
For a module that exports an interface intended for general use, see
starlink_grpc.
"""
import argparse
from datetime import datetime
from datetime import timezone
import logging
import re
import time
from typing import List
import grpc
import starlink_grpc
BRACKETS_RE = re.compile(r"([^[]*)(\[((\d+),|)(\d*)\]|)$")
LOOP_TIME_DEFAULT = 0
STATUS_MODES: List[str] = ["status", "obstruction_detail", "alert_detail", "location"]
HISTORY_STATS_MODES: List[str] = [
"ping_drop", "ping_run_length", "ping_latency", "ping_loaded_latency", "usage", "power"
]
UNGROUPED_MODES: List[str] = []
def create_arg_parser(output_description, bulk_history=True):
"""Create an argparse parser and add the common command line options."""
parser = argparse.ArgumentParser(
description="Collect status and/or history data from a Starlink user terminal and " +
output_description,
epilog="Additional arguments can be read from a file by including @FILENAME as an "
"option, where FILENAME is a path to a file that contains arguments, one per line.",
fromfile_prefix_chars="@",
add_help=False)
# need to remember this for later
parser.bulk_history = bulk_history
group = parser.add_argument_group(title="General options")
group.add_argument("-g",
"--target",
help="host:port of dish to query, default is the standard IP address "
"and port (192.168.100.1:9200)")
group.add_argument("-h", "--help", action="help", help="Be helpful")
group.add_argument("-N",
"--numeric",
action="store_true",
help="Record boolean values as 1 and 0 instead of True and False")
group.add_argument("-t",
"--loop-interval",
type=float,
default=float(LOOP_TIME_DEFAULT),
help="Loop interval in seconds or 0 for no loop, default: " +
str(LOOP_TIME_DEFAULT))
group.add_argument("-v", "--verbose", action="store_true", help="Be verbose")
group = parser.add_argument_group(title="History mode options")
group.add_argument("-a",
"--all-samples",
action="store_const",
const=-1,
dest="samples",
help="Parse all valid samples")
group.add_argument("-o",
"--poll-loops",
type=int,
help="Poll history for N loops and aggregate data before computing history "
"stats; this allows for a smaller loop interval with less loss of data "
"when the dish reboots",
metavar="N")
if bulk_history:
sample_help = ("Number of data samples to parse; normally applies to first loop "
"iteration only, default: all in bulk mode, loop interval if loop "
"interval set, else all available samples")
no_counter_help = ("Don't track sample counter across loop iterations in non-bulk "
"modes; keep using samples option value instead")
else:
sample_help = ("Number of data samples to parse; normally applies to first loop "
"iteration only, default: loop interval, if set, else all available " +
"samples")
no_counter_help = ("Don't track sample counter across loop iterations; keep using "
"samples option value instead")
group.add_argument("-s", "--samples", type=int, help=sample_help)
group.add_argument("-j", "--no-counter", action="store_true", help=no_counter_help)
return parser
def run_arg_parser(parser, need_id=False, no_stdout_errors=False, modes=None):
"""Run parse_args on a parser previously created with create_arg_parser
Args:
need_id (bool): A flag to set in options to indicate whether or not to
set dish_id on the global state object; see get_data for more
detail.
no_stdout_errors (bool): A flag set in options to protect stdout from
error messages, in case that's where the data output is going, so
may be being redirected to a file.
modes (list[str]): Optionally provide the subset of data group modes
to allow.
Returns:
An argparse Namespace object with the parsed options set as attributes.
"""
if modes is None:
modes = STATUS_MODES + HISTORY_STATS_MODES + UNGROUPED_MODES
if parser.bulk_history:
modes.append("bulk_history")
parser.add_argument("mode",
nargs="+",
choices=modes,
help="The data group to record, one or more of: " + ", ".join(modes),
metavar="mode")
opts = parser.parse_args()
if opts.loop_interval <= 0.0 or opts.poll_loops is None:
opts.poll_loops = 1
elif opts.poll_loops < 2:
parser.error("Poll loops arg must be 2 or greater to be meaningful")
# for convenience, set flags for whether any mode in a group is selected
status_set = set(STATUS_MODES)
opts.status_mode = bool(status_set.intersection(opts.mode))
status_set.remove("location")
# special group for any status mode other than location
opts.pure_status_mode = bool(status_set.intersection(opts.mode))
opts.history_stats_mode = bool(set(HISTORY_STATS_MODES).intersection(opts.mode))
opts.bulk_mode = "bulk_history" in opts.mode
if opts.samples is None:
opts.samples = int(opts.loop_interval) if opts.loop_interval >= 1.0 else -1
opts.bulk_samples = -1
else:
# for scripts that query starting history counter, skip it if samples
# was explicitly set
opts.skip_query = True
opts.bulk_samples = opts.samples
opts.no_stdout_errors = no_stdout_errors
opts.need_id = need_id
return opts
def conn_error(opts, msg, *args):
"""Indicate an error in an appropriate way."""
# Connection errors that happen in an interval loop are not critical
# failures, but are interesting enough to print in non-verbose mode.
if opts.loop_interval > 0.0 and not opts.no_stdout_errors:
print(msg % args)
else:
logging.error(msg, *args)
class GlobalState:
"""A class for keeping state across loop iterations."""
def __init__(self, target=None):
# counter, timestamp for bulk_history:
self.counter = None
self.timestamp = None
# counter, timestamp for history stats:
self.counter_stats = None
self.timestamp_stats = None
self.dish_id = None
self.context = starlink_grpc.ChannelContext(target=target)
self.poll_count = 0
self.accum_history = None
self.first_poll = True
self.warn_once_location = True
def shutdown(self):
self.context.close()
def get_data(opts, gstate, add_item, add_sequence, add_bulk=None, flush_history=False):
"""Fetch data from the dish, pull it apart and call back with the pieces.
This function uses call backs to return the useful data. If need_id is set
in opts, then it is guaranteed that dish_id will have been set in gstate
prior to any of the call backs being invoked.
Args:
opts (object): The options object returned from run_arg_parser.
gstate (GlobalState): An object for keeping track of state across
multiple calls.
add_item (function): Call back for non-sequence data, with prototype:
add_item(name, value, category)
add_sequence (function): Call back for sequence data, with prototype:
add_sequence(name, value, category, start_index_label)
add_bulk (function): Optional. Call back for bulk history data, with
prototype:
add_bulk(bulk_data, count, start_timestamp, start_counter)
flush_history (bool): Optional. If true, run in a special mode that
emits (only) history stats for already polled data, if any,
regardless of --poll-loops state. Intended for script shutdown
operation, in order to flush stats for polled history data which
would otherwise be lost on script restart.
Returns:
Tuple with 3 values. The first value is 1 if there were any failures
getting data from the dish, otherwise 0. The second value is an int
timestamp for status data (data with category "status"), or None if
no status data was reported. The third value is an int timestamp for
history stats data (non-bulk data with category other than "status"),
or None if no history stats data was reported.
"""
if flush_history and opts.poll_loops < 2:
return 0, None, None
rc = 0
status_ts = None
hist_ts = None
if not flush_history:
rc, status_ts = get_status_data(opts, gstate, add_item, add_sequence)
if opts.history_stats_mode and (not rc or opts.poll_loops > 1):
hist_rc, hist_ts = get_history_stats(opts, gstate, add_item, add_sequence, flush_history)
if not rc:
rc = hist_rc
if not flush_history and opts.bulk_mode and add_bulk and not rc:
rc = get_bulk_data(opts, gstate, add_bulk)
return rc, status_ts, hist_ts
def add_data_normal(data, category, add_item, add_sequence):
for key, val in data.items():
name, start, seq = BRACKETS_RE.match(key).group(1, 4, 5)
if seq is None:
add_item(name, val, category)
else:
add_sequence(name, val, category, int(start) if start else 0)
def add_data_numeric(data, category, add_item, add_sequence):
for key, val in data.items():
name, start, seq = BRACKETS_RE.match(key).group(1, 4, 5)
if seq is None:
add_item(name, int(val) if isinstance(val, int) else val, category)
else:
add_sequence(name,
[int(subval) if isinstance(subval, int) else subval for subval in val],
category,
int(start) if start else 0)
def get_status_data(opts, gstate, add_item, add_sequence):
if opts.status_mode:
timestamp = int(time.time())
add_data = add_data_numeric if opts.numeric else add_data_normal
if opts.pure_status_mode or opts.need_id and gstate.dish_id is None:
try:
groups = starlink_grpc.status_data(context=gstate.context)
status_data, obstruct_detail, alert_detail = groups[0:3]
except starlink_grpc.GrpcError as e:
if "status" in opts.mode:
if opts.need_id and gstate.dish_id is None:
conn_error(opts, "Dish unreachable and ID unknown, so not recording state")
return 1, None
if opts.verbose:
print("Dish unreachable")
add_item("state", "DISH_UNREACHABLE", "status")
return 0, timestamp
conn_error(opts, "Failure getting status: %s", str(e))
return 1, None
if opts.need_id:
gstate.dish_id = status_data["id"]
del status_data["id"]
if "status" in opts.mode:
add_data(status_data, "status", add_item, add_sequence)
if "obstruction_detail" in opts.mode:
add_data(obstruct_detail, "status", add_item, add_sequence)
if "alert_detail" in opts.mode:
add_data(alert_detail, "status", add_item, add_sequence)
if "location" in opts.mode:
try:
location = starlink_grpc.location_data(context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting location: %s", str(e))
return 1, None
if location["latitude"] is None and gstate.warn_once_location:
logging.warning("Location data not enabled. See README for more details.")
gstate.warn_once_location = False
add_data(location, "status", add_item, add_sequence)
return 0, timestamp
elif opts.need_id and gstate.dish_id is None:
try:
gstate.dish_id = starlink_grpc.get_id(context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting dish ID: %s", str(e))
return 1, None
if opts.verbose:
print("Using dish ID: " + gstate.dish_id)
return 0, None
def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
"""Fetch history stats. See `get_data` for details."""
if flush_history or (opts.need_id and gstate.dish_id is None):
history = None
else:
try:
timestamp = int(time.time())
history = starlink_grpc.get_history(context=gstate.context)
gstate.timestamp_stats = timestamp
except (AttributeError, ValueError, grpc.RpcError) as e:
conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
history = None
parse_samples = opts.samples if gstate.counter_stats is None else -1
start = gstate.counter_stats if gstate.counter_stats else None
# Accumulate polled history data into gstate.accum_history, even if there
# was a dish reboot.
if gstate.accum_history:
if history is not None:
gstate.accum_history = starlink_grpc.concatenate_history(gstate.accum_history,
history,
samples1=parse_samples,
start1=start,
verbose=opts.verbose)
# Counter tracking gets too complicated to handle across reboots
# once the data has been accumulated, so just have concatenate
# handle it on the first polled loop and use a value of 0 to
# remember it was done (as opposed to None, which is used for a
# different purpose).
if not opts.no_counter:
gstate.counter_stats = 0
else:
gstate.accum_history = history
# When resuming from prior count with --poll-loops set, advance the loop
# count by however many loops worth of data was caught up on. This helps
# avoid abnormally large sample counts in the first set of output data.
if gstate.first_poll and gstate.accum_history:
if opts.poll_loops > 1 and gstate.counter_stats:
new_samples = gstate.accum_history.current - gstate.counter_stats
if new_samples < 0:
new_samples = gstate.accum_history.current
if new_samples > len(gstate.accum_history.pop_ping_drop_rate):
new_samples = len(gstate.accum_history.pop_ping_drop_rate)
gstate.poll_count = max(gstate.poll_count, int((new_samples-1) / opts.loop_interval))
gstate.first_poll = False
if gstate.poll_count < opts.poll_loops - 1 and not flush_history:
gstate.poll_count += 1
return 0, None
gstate.poll_count = 0
if gstate.accum_history is None:
return (0, None) if flush_history else (1, None)
groups = starlink_grpc.history_stats(parse_samples,
start=start,
verbose=opts.verbose,
history=gstate.accum_history)
general, ping, runlen, latency, loaded, usage, power = groups[0:7]
add_data = add_data_numeric if opts.numeric else add_data_normal
add_data(general, "ping_stats", add_item, add_sequence)
if "ping_drop" in opts.mode:
add_data(ping, "ping_stats", add_item, add_sequence)
if "ping_run_length" in opts.mode:
add_data(runlen, "ping_stats", add_item, add_sequence)
if "ping_latency" in opts.mode:
add_data(latency, "ping_stats", add_item, add_sequence)
if "ping_loaded_latency" in opts.mode:
add_data(loaded, "ping_stats", add_item, add_sequence)
if "usage" in opts.mode:
add_data(usage, "usage", add_item, add_sequence)
if "power" in opts.mode:
add_data(power, "power", add_item, add_sequence)
if not opts.no_counter:
gstate.counter_stats = general["end_counter"]
timestamp = gstate.timestamp_stats
gstate.timestamp_stats = None
gstate.accum_history = None
return 0, timestamp
def get_bulk_data(opts, gstate, add_bulk):
"""Fetch bulk data. See `get_data` for details."""
before = time.time()
start = gstate.counter
parse_samples = opts.bulk_samples if start is None else -1
try:
general, bulk = starlink_grpc.history_bulk_data(parse_samples,
start=start,
verbose=opts.verbose,
context=gstate.context)
except starlink_grpc.GrpcError as e:
conn_error(opts, "Failure getting history: %s", str(e))
return 1
after = time.time()
parsed_samples = general["samples"]
new_counter = general["end_counter"]
timestamp = gstate.timestamp
# check this first, so it doesn't report as lost time sync
if gstate.counter is not None and new_counter != gstate.counter + parsed_samples:
timestamp = None
# Allow up to 2 seconds of time drift before forcibly re-syncing, since
# +/- 1 second can happen just due to scheduler timing.
if timestamp is not None and not before - 2.0 <= timestamp + parsed_samples <= after + 2.0:
if opts.verbose:
print("Lost sample time sync at: " +
str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc)))
timestamp = None
if timestamp is None:
timestamp = int(before)
if opts.verbose:
print("Establishing new time base: {0} -> {1}".format(
new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc)))
timestamp -= parsed_samples
if opts.numeric:
add_bulk(
{
k: [int(subv) if isinstance(subv, int) else subv for subv in v]
for k, v in bulk.items()
}, parsed_samples, timestamp, new_counter - parsed_samples)
else:
add_bulk(bulk, parsed_samples, timestamp, new_counter - parsed_samples)
gstate.counter = new_counter
gstate.timestamp = timestamp + parsed_samples
return 0