Skip to content

Commit

Permalink
ACU: Add feeds for Sun info and UDP stream quality (#570)
Browse files Browse the repository at this point in the history
* ACU sun: add a feed with sun info (position, state of escape)

* ACU: feed for UDP data health variable

* ACU: remove useless "from_reactor" args
  • Loading branch information
mhasself authored Dec 4, 2023
1 parent 0fd9ec9 commit b851382
Showing 1 changed file with 92 additions and 8 deletions.
100 changes: 92 additions & 8 deletions socs/agents/acu/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import yaml
from autobahn.twisted.util import sleep as dsleep
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock
from ocs.ocs_twisted import Pacemaker, TimeoutLock
from soaculib.twisted_backend import TwistedHttpBackend
from twisted.internet import protocol, reactor, threads
from twisted.internet.defer import DeferredList, inlineCallbacks
Expand Down Expand Up @@ -176,6 +176,14 @@ def __init__(self, agent, acu_config='guess', exercise_plan=None,

tclient._HTTP11ClientFactory.noisy = False

# Structure for the broadcast process to communicate state to
# the monitor process, for a data quality feed.
self._broadcast_qual = {
'timestamp': time.time(),
'active': False,
'time_offset': 0,
}

self.acu_control = aculib.AcuControl(
acu_config, backend=TwistedHttpBackend(persistent=False))
self.acu_read = aculib.AcuControl(
Expand Down Expand Up @@ -243,6 +251,14 @@ def __init__(self, agent, acu_config='guess', exercise_plan=None,
record=True,
agg_params=basic_agg_params,
buffer_time=1)
self.agent.register_feed('sun',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
self.agent.register_feed('data_qual',
record=True,
agg_params=basic_agg_params,
buffer_time=0)
agent.register_task('go_to',
self.go_to,
blocking=False,
Expand Down Expand Up @@ -496,6 +512,8 @@ def monitor(self, session, params):
'Status3rdAxis': j2,
'StatusResponseRate': n_ok / (query_t - report_t)})

qual_pacer = Pacemaker(.1)

was_remote = False
last_resp_rate = None
data_blocks = {}
Expand All @@ -518,6 +536,25 @@ def monitor(self, session, params):
n_ok = 0
session.data.update({'StatusResponseRate': resp_rate})

if qual_pacer.next_sample <= time.time():
# Publish UDP data health feed
qual_pacer.sleep() # should be instantaneous, just update counters
bq = self._broadcast_qual
bq_offset = bq['time_offset']
if bq_offset is None:
bq_offset = 0.
bq_ok = (bq['active'] and (now - bq['timestamp'] < 5)
and abs(bq_offset) < 1.)
block = {
'timestamp': time.time(),
'block_name': 'qual0',
'data': {
'Broadcast_stream_ok': int(bq_ok),
'Broadcast_recv_offset': bq_offset,
}
}
self.agent.publish_to_feed('data_qual', block)

try:
j = yield self.acu_read.http.Values(self.acu8100)
if self.acu3rdaxis:
Expand Down Expand Up @@ -739,6 +776,10 @@ def broadcast(self, session, params):
FMT = self.udp_schema['format']
FMT_LEN = struct.calcsize(FMT)
UDP_PORT = self.udp['port']

# The udp_data list is used as a queue; it contains
# struct-unpacked samples from the UDP stream in the form
# (time_received, data).
udp_data = []
fields = self.udp_schema['fields']
session.data = {}
Expand All @@ -748,11 +789,12 @@ def broadcast(self, session, params):

class MonitorUDP(protocol.DatagramProtocol):
def datagramReceived(self, data, src_addr):
now = time.time()
host, port = src_addr
offset = 0
while len(data) - offset >= FMT_LEN:
d = struct.unpack(FMT, data[offset:offset + FMT_LEN])
udp_data.append(d)
udp_data.append((now, d))
offset += FMT_LEN

handler = reactor.listenUDP(int(UDP_PORT), MonitorUDP())
Expand All @@ -761,21 +803,28 @@ def datagramReceived(self, data, src_addr):
for i in range(2, len(fields)):
influx_data[fields[i].replace(' ', '_') + '_bcast_influx'] = []

best_dt = None

active = True
last_packet_time = time.time()

while session.status in ['running']:
now = time.time()

if len(udp_data) >= 200:
if not active:
self.log.info('UDP packets are being received.')
active = True
last_packet_time = now
best_dt = None

process_data = udp_data[:200]
udp_data = udp_data[200:]
for d in process_data:
for recv_time, d in process_data:
data_ctime = sh.timecode(d[0] + d[1] / sh.DAY)
if best_dt is None or abs(recv_time - data_ctime) < best_dt:
best_dt = recv_time - data_ctime

self.data['broadcast']['Time'] = data_ctime
influx_data['Time_bcast_influx'].append(data_ctime)
for i in range(2, len(d)):
Expand All @@ -785,8 +834,7 @@ def datagramReceived(self, data, src_addr):
'block_name': 'ACU_broadcast',
'data': self.data['broadcast']
}
self.agent.publish_to_feed('acu_udp_stream',
acu_udp_stream, from_reactor=True)
self.agent.publish_to_feed('acu_udp_stream', acu_udp_stream)
influx_means = {}
for key in influx_data.keys():
influx_means[key] = np.mean(influx_data[key])
Expand All @@ -795,7 +843,7 @@ def datagramReceived(self, data, src_addr):
'block_name': 'ACU_bcast_influx',
'data': influx_means,
}
self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx, from_reactor=True)
self.agent.publish_to_feed('acu_broadcast_influx', acu_broadcast_influx)
sd = {}
for ky in influx_means:
sd[ky.split('_bcast_influx')[0]] = influx_means[ky]
Expand All @@ -813,9 +861,13 @@ def datagramReceived(self, data, src_addr):
except Exception as err:
self.log.info('Exception while trying to enable stream: {err}', err=err)
next_reconfig += 60
yield dsleep(1)

yield dsleep(0.005)
self._broadcast_qual = {
'timestamp': now,
'active': active,
'time_offset': best_dt,
}
yield dsleep(.01)

handler.stopListening()
return True, 'Acquisition exited cleanly.'
Expand Down Expand Up @@ -2021,6 +2073,28 @@ def _notify_recomputed(result):
self.sun = new_sun
req_out = False

def lookup(keys, tree):
if isinstance(keys, str):
keys = [keys]
if len(keys) == 0:
if isinstance(tree, (bool, np.bool_)):
return int(tree)
return tree
return lookup(keys[1:], tree[keys[0]])

# Feed -- unpack some elements of session.data
feed_keys = {
'sun_avoidance': ('active_avoidance', int),
'sun_az': (('sun_pos', 'sun_azel', 0), float),
'sun_el': (('sun_pos', 'sun_azel', 1), float),
'sun_dist': (('sun_pos', 'sun_dist'), float),
'sun_safe_time': (('sun_pos', 'sun_safe_time'), float),
}
for k in ['warning_zone', 'danger_zone',
'escape_triggered', 'escape_active']:
feed_keys[f'sun_{k}'] = (('avoidance', k), int)
feed_pacer = Pacemaker(.1)

req_out = False
self.sun = None
last_panic = 0
Expand Down Expand Up @@ -2115,6 +2189,16 @@ def _notify_recomputed(result):
# Update session.
session.data.update(new_data)

# Publish -- only if we have the sun pos though..
if sun_is_real and safety_known and feed_pacer.next_sample <= time.time():
feed_pacer.sleep() # should be instantaneous, just update counters
block = {'timestamp': time.time(),
'block_name': 'sun0',
'data': {}}
for kshort, (keys, cast) in feed_keys.items():
block['data'][kshort] = cast(lookup(keys, new_data))
self.agent.publish_to_feed('sun', block)

yield dsleep(1)

@ocs_agent.param('reset', type=bool, default=None)
Expand Down

0 comments on commit b851382

Please sign in to comment.