-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwatchdog.py
executable file
·197 lines (169 loc) · 6.97 KB
/
watchdog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#! /usr/bin/python3 -u
################################################################################
#
# WatchDog.py - Rev 1.0
# Copyright (C) 2021-4 by Joseph B. Attili, aa2il AT arrl DOT net
#
# Watchdog timer for pyKeyer.
#
################################################################################
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
################################################################################
import threading
from utilities import freq2band,error_trap,show_hex,show_ascii
from tcp_server import open_udp_client,BANDMAP_UDP_PORT
from udp import UDP_msg_handler
from rig_io import SetSubDial
from tkinter import END
import time
################################################################################
VERBOSITY=0
################################################################################
class WatchDog:
def __init__(self,P,msec):
print('Watch Dog Starting ....')
self.P = P
self.dt =.001*msec
P.SHUTDOWN = False
# Kick off watchdog monito
P.Timer = threading.Timer(self.dt, self.Monitor)
P.Timer.daemon=True # This prevents timer thread from blocking shutdown
P.Timer.start()
# Function to monitor udp connections
def check_udp_clients(P):
print('CHECK UDP CLIENTS:',P.UDP_SERVER,P.udp_client,P.udp_ntries)
if P.UDP_SERVER != None:
# Client to BANDMAP
if not P.udp_client:
P.udp_ntries+=1
if P.udp_ntries<=1000:
P.udp_client=open_udp_client(P,BANDMAP_UDP_PORT,
UDP_msg_handler)
if P.udp_client:
print('CHECK UDP CLIENTS: Opened connection to BANDMAP - port=',
BANDMAP_UDP_PORT)
P.udp_ntries=0
else:
print('CHECK UDP CLIENTS: Unable to open UDP client - too many attempts',
P.udp_ntries)
def Monitor(self):
P=self.P
# Check if another thread shut down - this isn't complete yet
if P.SHUTDOWN:
if P.CAPTURE:
P.rec.stop_recording()
P.rec.close()
if P.Timer:
print('WatchDog - Cancelling timer ...')
P.Timer.cancel()
P.WATCHDOG = False
P.Timer=None
print('WatchDog - Shut down.')
# Monitor memory usage
if P.MEM:
P.MEM.take_snapshot()
# Check if we are transmitting
if P.DIGI and P.sock_xml.tx_evt.is_set():
print('Watch Dog - PTT is on ...', P.sock.connection)
try:
state=P.sock_xml.ptt(-1)
print('state=',state)
except:
error_trap('WATCHDOG: Unable to retrieve PTT state',1)
# Read radio status
# NEED TO MOVE ANYTHING THAT MESSES WITH THE GUI TO gui.gui_updater!!!!!!!!!!!!!!
if P.sock.connection!='NONE' and not P.sock.tx_evt.is_set():
print('Watch Dog - reading radio status ...', P.sock.connection)
P.FREQ = P.sock.get_freq()
P.MODE = P.sock.get_mode()
P.BAND = freq2band(1e-6*P.FREQ)
# Let user adjust WPM from radio knob
if P.MODE in ['CW']:
if VERBOSITY>0:
print("WatchDog - Checking WPM ...")
wpm = P.sock.read_speed()
if wpm!=P.WPM and wpm>0:
print("WatchDog - Changing WPM to",wpm)
P.keyer.set_wpm(wpm)
P.WPM2 = wpm
# Keep an eye on the small knob
#if not (P.SO2V or P.SPLIT_VFOs):
#if P.sock.rig_type=='FLRIG' and not (P.SO2V or P.SPLIT_VFOs):
# SetSubDial(P.gui,'CLAR')
if P.SO2V or P.SPLIT_VFOs:
print('WATCHDOG: Need to check subdial ...')
P.gui.CHECK_DIAL = 2
elif P.gui.CHECK_DIAL>0:
print('WATCHDOG: Subdial checked ...')
SetSubDial(P.gui,'CLAR')
P.gui.CHECK_DIAL-=1
# Save program state to disk
#print("WatchDog - Dirty Dozen ...")
if P.DIRTY:
P.gui.SaveState()
# Compute QSO Rate
if VERBOSITY>0:
print("WatchDog - QSO Rate ...")
P.gui.qso_rate()
# Send out heart beat message - Cmd:Source:Msg
if P.UDP_SERVER and False:
print('WATCHDOG - Broadcasting heart beat ...')
P.udp_server.Broadcast('HeartBeat:Keyer:Thump Thump - kerr chunk')
# Open clients to BANDMAP and SDR
#check_udp_clients(P)
# Check if anything is going on with FLDIGI
if P.DIGI and not P.sock_xml.tx_evt.is_set():
print('WATCHDOG - Checking RX buff ...')
# Get any new decoded text from RX box and put it in the big text box
try:
txt=P.sock_xml.get_rx_buff()
except:
error_trap('WATCHDOG: Unable to get rx buffer',1)
txt=''
if len(txt)>0:
print('txt=',txt,'\tlen=',len(txt))
#print('\t',show_hex(txt))
print('\t',show_ascii(txt))
P.gui.txt.insert(END, txt)
P.gui.txt.see(END)
P.gui.root.update_idletasks()
# Read rotor position
if P.sock_rotor.connection!='NONE' or False:
gui2=P.gui.rotor_ctrl
pos=P.sock_rotor.get_position()
#print('pos:',pos)
if pos[0]==None:
pos[0]=gui2.azlcd1.get() # Temp
if pos[0]!=None:
if pos[0]>180:
pos[0]-=360
if pos[0]<-180:
pos[0]+=360
gui2.azlcd1.set(pos[0])
gui2.ellcd1.set(pos[1])
gui2.nominalBearing()
# Reset timer
if VERBOSITY>0:
print("WatchDog - Timer ...")
if not P.gui.Done and not P.SHUTDOWN and not P.Stopper.isSet():
if P.DIGI:
dt=0.2
else:
dt=1.0
P.Timer = threading.Timer(self.dt, self.Monitor)
P.Timer.setDaemon(True) # This prevents timer thread from blocking shutdown
P.Timer.start()
else:
P.Timer=None
print('... Watch Dog quit')
P.WATCHDOG = False