-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrun_boat.py
160 lines (119 loc) · 5.19 KB
/
run_boat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from utils.file_io import read_netcdf
from utils.comms import Comms
from module_control_unit.nav import ModuleNavigation
from module_control_unit.driver import ModuleDrivers
from module_control_unit.telemetry import ModuleTelemetry
from threading import Thread
import yaml
import sys
import time
import warnings
import logging
# Set logger
logger = logging.getLogger('boat_logger')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('./logs/boat.log')
formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
logger.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
def main(config):
# Read the initial currents from file
latlons, currents = read_netcdf(config['init_map'])
# Initialize command modules
nav = ModuleNavigation(config['nav_timestep'], latlons, currents)
telem = ModuleTelemetry()
drivers = ModuleDrivers()
drivers.set_propulsion(config['init_propulstion'])
# Initialize communications
ground_station = config['gs_addr']
port_list = config['ports']
path_comms = Comms(ground_station, port_list['nav_path'])
telem_comms = Comms(ground_station, port_list['telem'])
gps_comms = Comms(ground_station, port_list['gps'])
instr_az_comms = Comms(ground_station, port_list['manual_az'])
instr_prop_comms = Comms(ground_station, port_list['manual_prop'])
new_map_comms = Comms(ground_station, port_list['maps'])
# Start autonomous navigation
Thread(target=auto_pilot, args=(config['init_pos'],
config['destination'],
nav,
telem,
drivers,
path_comms,
gps_comms,
config['nav_timestep'],
config['sim_speedup_factor'])).start()
# Send telemetry and gps
Thread(target=telemetry_reports, args=(telem, telem_comms,
config['telem_freq'], config['sim_speedup_factor'])).start()
# Listen for new maps and instructions
Thread(target=recieve_manual_directions, args=(drivers, instr_az_comms)).start()
Thread(target=recieve_manual_propulsion, args=(drivers, instr_prop_comms)).start()
Thread(target=recieve_new_maps, args=(nav, new_map_comms)).start()
# Logging
logging.info("")
def auto_pilot(init_pos, dest, nav, telem, drivers, path_trans, gps_trans, timestep, sim_speedup_factor=1):
global logger
pos = init_pos
while pos != dest:
# Calculate the shortest path given our position and destination
az, path = nav.get_next_azimuth(pos, dest)
logger.info(f"Path recalculated.")
# Set the bearing of the craft
bearing = drivers.set_azimuth(az)
logger.info(f"Azimuth set: {bearing:.2f}")
# Get the ocean current at our position
current = nav.get_current(pos)
logger.info(f"Ocean current at position acquired.")
# Get module's propulsion
prop = drivers.get_propulsion()
logger.info(f"Propulsion: {prop}")
# Next position is determined by our position, speed, dir, ocean current, and time elapsed
pos = telem.get_position(pos, current, az, prop, timestep)
logger.info(f"GPS updated. GPS transmitted: {pos[0]:.4f} N {pos[1]:.4f} E")
# Transmit data
gps_trans.send(pos)
path_trans.send(path)
logger.info("Path and GPS sent.")
time.sleep(timestep / sim_speedup_factor)
def telemetry_reports(telem, comms, freq, sim_speedup_factor):
global logger
while True:
# Sample the temperature of the module (simulated)
data = telem.get_temp()
# Report to ground station
comms.send(data)
logger.info(f"Telemetry sent: {data:.4f}")
time.sleep(freq / sim_speedup_factor)
def recieve_manual_directions(drivers, comms):
global logger
while True:
# Wait for manual directions from the ground station
packet = comms.recv()
az, duration = packet[0], packet[1]
# Force set drivers for manual direction
drivers.set_azimuth(az, force_duration=duration)
logger.info(f"Manual direction command received. Azimuth set to {az} for duration {duration}.")
def recieve_manual_propulsion(drivers, comms):
global logger
while True:
# Wait for manual propulsion from the ground station
packet = comms.recv()
speed, duration = packet[0], packet[1]
# Force set drivers for manual propulsion
drivers.set_propulsion(speed, force_duration=duration)
logger.info(f"Manual propulsion command received. Speed set to {speed} for duration {duration}.")
def recieve_new_maps(nav, comms):
while True:
# Wait for new currents map from ground station
latlons, currents = comms.recv()
logger.info(f"New current map receieved.")
# Update currents map
nav.set_currents_map(latlons, currents)
if __name__ == '__main__':
warnings.filterwarnings("ignore")
# Load configuration YAML
with open(sys.argv[1]) as f:
config = yaml.load(f)
main(config)