forked from semuconsulting/pynmeagps
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnmeapoller.py
151 lines (124 loc) · 4.28 KB
/
nmeapoller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
nmeapoller.py
This example illustrates how to read, write and display NMEA messages
"concurrently" using threads and queues. This represents a useful
generic pattern for many end user applications.
It implements two threads which run concurrently:
1) an I/O thread which continuously reads NMEA data from the
receiver and sends any queued outbound command or poll messages.
2) a process thread which processes parsed NMEA data - in this example
it simply prints the parsed data to the terminal.
NMEA data is passed between threads using queues.
Press CTRL-C to terminate.
FYI: Since Python implements a Global Interpreter Lock (GIL),
threads are not strictly concurrent, though this is of minor
practical consequence here.
Created on 07 Aug 2021
:author: semuadmin
:copyright: SEMU Consulting © 2021
:license: BSD 3-Clause
"""
# pylint: disable=invalid-name
from queue import Queue
from sys import platform
from threading import Event, Lock, Thread
from time import sleep
from serial import Serial
from pynmeagps import NMEA_MSGIDS, POLL, NMEAMessage, NMEAReader
def io_data(
stream: object,
nmr: NMEAReader,
readqueue: Queue,
sendqueue: Queue,
stop: Event,
):
"""
THREADED
Read and parse inbound NMEA data and place
raw and parsed data on queue.
Send any queued outbound messages to receiver.
"""
# pylint: disable=broad-exception-caught
while not stop.is_set():
try:
if stream.in_waiting:
(raw_data, parsed_data) = nmr.read()
if parsed_data:
readqueue.put((raw_data, parsed_data))
if not sendqueue.empty():
data = sendqueue.get(False)
if data is not None:
nmr.datastream.write(data.serialize())
sendqueue.task_done()
except Exception as err:
print(f"\n\nSomething went wrong {err}\n\n")
continue
def process_data(queue: Queue, stop: Event):
"""
THREADED
Get NMEA data from queue and display.
"""
while not stop.is_set():
if queue.empty() is False:
(_, parsed) = queue.get()
print(parsed)
queue.task_done()
if __name__ == "__main__":
# set port, baudrate and timeout to suit your device configuration
if platform == "win32": # Windows
port = "COM13"
elif platform == "darwin": # MacOS
port = "/dev/tty.usbmodem1101"
else: # Linux
port = "/dev/ttyACM1"
baudrate = 38400
timeout = 0.1
with Serial(port, baudrate, timeout=timeout) as serial_stream:
nmeareader = NMEAReader(serial_stream)
serial_lock = Lock()
read_queue = Queue()
send_queue = Queue()
stop_event = Event()
io_thread = Thread(
target=io_data,
args=(
serial_stream,
nmeareader,
read_queue,
send_queue,
stop_event,
),
)
process_thread = Thread(
target=process_data,
args=(
read_queue,
stop_event,
),
)
print("\nStarting handler threads. Press Ctrl-C to terminate...")
io_thread.start()
process_thread.start()
# loop until user presses Ctrl-C
while not stop_event.is_set():
try:
# DO STUFF IN THE BACKGROUND...
# Poll for each NMEA sentence type.
# NB: Your receiver may not support all types. It will return a
# GNTXT "NMEA unknown msg" response for any types it doesn't support.
for msgid in NMEA_MSGIDS:
print(
f"\nSending a GNQ message to poll for an {msgid} response...\n"
)
msg = NMEAMessage("EI", "GNQ", POLL, msgId=msgid)
send_queue.put(msg)
sleep(1)
sleep(3)
stop_event.set()
except KeyboardInterrupt: # capture Ctrl-C
print("\n\nTerminated by user.")
stop_event.set()
print("\nStop signal set. Waiting for threads to complete...")
io_thread.join()
process_thread.join()
print("\nProcessing complete")