forked from sparky8512/starlink-grpc-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dish_obstruction_map.py
215 lines (188 loc) · 8.44 KB
/
dish_obstruction_map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
#!/usr/bin/python3
"""Write a PNG image representing Starlink obstruction map data.
This scripts queries obstruction map data from the Starlink user terminal
(dish) reachable on the local network and writes a PNG image based on that
data.
Each pixel in the image represents the signal quality in a particular
direction, as observed by the dish. If the dish has not communicated with
satellites located in that direction, the pixel will be the "no data" color;
otherwise, it will be a color in the range from the "obstructed" color (no
signal at all) to the "unobstructed" color (sufficient signal quality for full
signal).
The coordinates of the pixels are the altitude and azimuth angles from the
horizontal coordinate system representation of the sky, converted to Cartesian
(rectangular) coordinates. The conversion is done in a way that maps all valid
directions into a circle that touches the edges of the image. Pixels outside
that circle will show up as "no data".
Azimuth is represented as angle from a line drawn from the center of the image
to the center of the top edge of the image, where center-top is 0 degrees
(North), the center of the right edge is 90 degrees (East), etc.
Altitude (elevation) is represented as distance from the center of the image,
where the center of the image represents vertical up from the point of view of
an observer located at the dish (zenith, which is usually not the physical
direction the dish is pointing) and the further away from the center a pixel
is, the closer to the horizon it is, down to a minimum altitude angle at the
edge of the circle.
"""
import argparse
from datetime import datetime
import logging
import os
import png
import sys
import time
import starlink_grpc
DEFAULT_OBSTRUCTED_COLOR = "FFFF0000"
DEFAULT_UNOBSTRUCTED_COLOR = "FFFFFFFF"
DEFAULT_NO_DATA_COLOR = "00000000"
DEFAULT_OBSTRUCTED_GREYSCALE = "FF00"
DEFAULT_UNOBSTRUCTED_GREYSCALE = "FFFF"
DEFAULT_NO_DATA_GREYSCALE = "0000"
LOOP_TIME_DEFAULT = 0
def loop_body(opts, context):
try:
snr_data = starlink_grpc.obstruction_map(context)
except starlink_grpc.GrpcError as e:
logging.error("Failed getting obstruction map data: %s", str(e))
return 1
def pixel_bytes(row):
for point in row:
if point > 1.0:
# shouldn't happen, but just in case...
point = 1.0
if point >= 0.0:
if opts.greyscale:
yield round(point * opts.unobstructed_color_g +
(1.0-point) * opts.obstructed_color_g)
else:
yield round(point * opts.unobstructed_color_r +
(1.0-point) * opts.obstructed_color_r)
yield round(point * opts.unobstructed_color_g +
(1.0-point) * opts.obstructed_color_g)
yield round(point * opts.unobstructed_color_b +
(1.0-point) * opts.obstructed_color_b)
if not opts.no_alpha:
yield round(point * opts.unobstructed_color_a +
(1.0-point) * opts.obstructed_color_a)
else:
if opts.greyscale:
yield opts.no_data_color_g
else:
yield opts.no_data_color_r
yield opts.no_data_color_g
yield opts.no_data_color_b
if not opts.no_alpha:
yield opts.no_data_color_a
if opts.filename == "-":
# Open new stdout file to get binary mode
out_file = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
else:
now = int(time.time())
filename = opts.filename.replace("%u", str(now))
filename = filename.replace("%d",
datetime.utcfromtimestamp(now).strftime("%Y_%m_%d_%H_%M_%S"))
filename = filename.replace("%s", str(opts.sequence))
out_file = open(filename, "wb")
if not snr_data or not snr_data[0]:
logging.error("Invalid SNR map data: Zero-length")
return 1
writer = png.Writer(len(snr_data[0]),
len(snr_data),
alpha=(not opts.no_alpha),
greyscale=opts.greyscale)
writer.write(out_file, (bytes(pixel_bytes(row)) for row in snr_data))
out_file.close()
opts.sequence += 1
return 0
def parse_args():
parser = argparse.ArgumentParser(
description="Collect directional obstruction map data from a Starlink user terminal and "
"emit it as a PNG image")
parser.add_argument(
"filename",
help="The image file to write, or - to write to stdout; may be a template with the "
"following to be filled in per loop iteration: %%s for sequence number, %%d for UTC date "
"and time, %%u for seconds since Unix epoch.")
parser.add_argument(
"-o",
"--obstructed-color",
help="Color of obstructed areas, in RGB, ARGB, L, or AL hex notation, default: " +
DEFAULT_OBSTRUCTED_COLOR + " or " + DEFAULT_OBSTRUCTED_GREYSCALE)
parser.add_argument(
"-u",
"--unobstructed-color",
help="Color of unobstructed areas, in RGB, ARGB, L, or AL hex notation, default: " +
DEFAULT_UNOBSTRUCTED_COLOR + " or " + DEFAULT_UNOBSTRUCTED_GREYSCALE)
parser.add_argument(
"-n",
"--no-data-color",
help="Color of areas with no data, in RGB, ARGB, L, or AL hex notation, default: " +
DEFAULT_NO_DATA_COLOR + " or " + DEFAULT_NO_DATA_GREYSCALE)
parser.add_argument(
"-g",
"--greyscale",
action="store_true",
help="Emit a greyscale image instead of the default full color image; greyscale images "
"use L or AL hex notation for the color options")
parser.add_argument(
"-z",
"--no-alpha",
action="store_true",
help="Emit an image without alpha (transparency) channel instead of the default that "
"includes alpha channel")
parser.add_argument("-e",
"--target",
help="host:port of dish to query, default is the standard IP address "
"and port (192.168.100.1:9200)")
parser.add_argument("-t",
"--loop-interval",
type=float,
default=float(LOOP_TIME_DEFAULT),
help="Loop interval in seconds or 0 for no loop, default: " +
str(LOOP_TIME_DEFAULT))
parser.add_argument("-s",
"--sequence",
type=int,
default=1,
help="Starting sequence number for templatized filenames, default: 1")
opts = parser.parse_args()
if opts.obstructed_color is None:
opts.obstructed_color = DEFAULT_OBSTRUCTED_GREYSCALE if opts.greyscale else DEFAULT_OBSTRUCTED_COLOR
if opts.unobstructed_color is None:
opts.unobstructed_color = DEFAULT_UNOBSTRUCTED_GREYSCALE if opts.greyscale else DEFAULT_UNOBSTRUCTED_COLOR
if opts.no_data_color is None:
opts.no_data_color = DEFAULT_NO_DATA_GREYSCALE if opts.greyscale else DEFAULT_NO_DATA_COLOR
for option in ("obstructed_color", "unobstructed_color", "no_data_color"):
try:
color = int(getattr(opts, option), 16)
if opts.greyscale:
setattr(opts, option + "_a", (color >> 8) & 255)
setattr(opts, option + "_g", color & 255)
else:
setattr(opts, option + "_a", (color >> 24) & 255)
setattr(opts, option + "_r", (color >> 16) & 255)
setattr(opts, option + "_g", (color >> 8) & 255)
setattr(opts, option + "_b", color & 255)
except ValueError:
logging.error("Invalid hex number for %s", option)
sys.exit(1)
return opts
def main():
opts = parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s")
context = starlink_grpc.ChannelContext(target=opts.target)
try:
next_loop = time.monotonic()
while True:
rc = loop_body(opts, context)
if opts.loop_interval > 0.0:
now = time.monotonic()
next_loop = max(next_loop + opts.loop_interval, now)
time.sleep(next_loop - now)
else:
break
finally:
context.close()
sys.exit(rc)
if __name__ == "__main__":
main()