Skip to content

Commit e07f602

Browse files
committed
#None: Update v0.4.20
1 parent 2918f8c commit e07f602

File tree

26 files changed

+571
-158
lines changed

26 files changed

+571
-158
lines changed

RELEASE_NOTES

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,19 @@ in previous functions. This helps with allocating time slots. It is garantied
189189
that all the targets in the list can be observed. However, the time slots are currently
190190
not send with observation programs.
191191

192+
## 0.4.20
193+
194+
- Added comment to `CfgOption`
195+
- Added comments for all options
196+
- Added QueueHub to allow use of the Telegram bot and the sockets at the same time
197+
- Added semaphore to limit number of consumer tasks
198+
- Added test for EP alert parser
199+
- Fixed /visplot command of the Telegram bot
200+
- Fixed issue when a current configuration file update changes values of already
201+
- Improved format for distance in LVK alerts
202+
- Save configuration file with comments
203+
- Updated EP alert parser to use the id when possible
204+
existing options
192205

193206

194207
# v0.3.0

aware/__init__.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@
1111
# ----------------------------------------------------------------------------
1212
"""
1313

14+
import asyncio
1415
import collections
1516
import traceback
1617
from types import ModuleType
18+
from ruamel.yaml import YAML
19+
20+
import uvloop
1721

1822
__modules__ = [
1923
"config",
@@ -119,6 +123,11 @@ def deep_update(src: dict, updates: dict):
119123
# src[k] is not a dict, nothing to merge, so just set it,
120124
# regardless if src[k] *was* a dict
121125
src[k] = v
126+
if src.get(k):
127+
if isinstance(src[k], tuple):
128+
src[k][1] = v[1]
129+
else:
130+
src[k] = v
122131
else:
123132
# note: updates[k] is a dict
124133
if k not in src:
@@ -127,7 +136,11 @@ def deep_update(src: dict, updates: dict):
127136
elif not isinstance(src[k], dict):
128137
# src[k] is not a dict, so just set it to updates[k],
129138
# overriding whatever it was
130-
src[k] = v
139+
if src.get(k):
140+
if isinstance(src[k], tuple):
141+
src[k][1] = v[1]
142+
else:
143+
src[k] = v
131144
else:
132145
# both src[k] and updates[k] are dicts, push them on the stack
133146
# to merge
@@ -140,3 +153,6 @@ def deep_update(src: dict, updates: dict):
140153
# # It is save to remove configuration here, because it is presented it config module
141154
# del cfg
142155
# del yaml_cfg
156+
157+
158+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

aware/__main__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ def goodbye(start_time: float):
4747
"environment variable)",
4848
)
4949
def main(telegram: bool, sockserver: bool, mode: str, threads: int, dump: str):
50-
if sockserver and telegram:
51-
log.error(
52-
"Simultaneous execution of Telegram bot and socket server not supported"
53-
)
54-
return 1
50+
# if sockserver and telegram:
51+
# log.error(
52+
# "Simultaneous execution of Telegram bot and socket server not supported"
53+
# )
54+
# return 1
5555

5656
if dump:
5757
if not os.getenv("AWARE_CONFIG_FILE", ""):

aware/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
__version__ = (0, 4, 19)
1+
__version__ = (0, 4, 20)
22
__strversion__ = "{}.{}.{}".format(__version__)

aware/alert/crossmatch.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919

2020

2121
# Maximal date difference to match a pair of alerts
22-
max_date_diff_min = CfgOption("max_date_diff_min", 5.0, float)
22+
max_date_diff_min = CfgOption(
23+
"max_date_diff_min",
24+
5.0,
25+
float,
26+
comment="Maximal date difference [min] to match a pair of alerts",
27+
)
2328

2429

2530
def crossmatch_alerts(

aware/alert/plugins/ep.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"""
88

99
from io import BytesIO, StringIO
10+
import numpy as np
1011
import orjson
1112
from astropy.time import Time
1213
from aware.logger import log
@@ -28,15 +29,26 @@ def parse_alert(
2829
except orjson.JSONDecodeError as e:
2930
log.error("Could not parse alert due to incorrect JSON encoding: %s", e)
3031
info = None
31-
else:
32+
33+
try:
3234
ra = msg_content["ra"]
3335
dec = msg_content["dec"]
3436
radius = msg_content["ra_dec_error"]
37+
snr = msg_content["image_snr"]
38+
39+
# Normalized so, at S/N = 5, importance is 90%
40+
importance = np.tanh(snr / 3.35)
41+
3542
trigger_date = Time(
3643
msg_content["trigger_time"], format="isot"
3744
).to_datetime()
38-
importance = 0.9
39-
event = f"J{ra:.3f}{dec:+.3f}"
45+
46+
# If there is no trigger id, then event name is derived from coordinates
47+
if "id" in msg_content:
48+
event = np.atleast_1d(msg_content["id"])[0]
49+
else:
50+
event = f"J{ra:.3f}{dec:+.3f}"
51+
4052
localization = CircularSkyMap(ra_center=ra, dec_center=dec, radius=radius)
4153
info = TargetInfo(
4254
localization=localization,
@@ -46,4 +58,10 @@ def parse_alert(
4658
trigger_date=trigger_date,
4759
importance=importance,
4860
)
61+
except Exception as e:
62+
if not isinstance(e, KeyboardInterrupt):
63+
raise
64+
else:
65+
log.error("Failed to parse EP alert: %s", e, exc_info=e)
66+
4967
return info

aware/alert/plugins/gbm.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818

1919

2020
FERMI_GBM = "Fermi (GBM)"
21-
fermi_gbm_sys_error = CfgOption("fermi_gbm_sys_error", 2.0, float) # degrees
21+
fermi_gbm_sys_error = CfgOption(
22+
"fermi_gbm_sys_error",
23+
2.0,
24+
float,
25+
comment="Systematic error [deg] to be added to statistical one in quadrature",
26+
) # degrees
2227

2328

2429
def parse_fermi_alert(

aware/alert/plugins/lvc.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,24 @@
3232
]
3333

3434

35-
has_ns_prob_thresh = CfgOption("has_ns_prob_thresh", 0.667, float)
36-
bbh_skymap_max_area = CfgOption("bbh_skymap_max_area", 30, float)
37-
far_threshold_global = CfgOption("far_threshold_global", 10, float)
35+
has_ns_prob_thresh = CfgOption(
36+
"has_ns_prob_thresh",
37+
0.667,
38+
float,
39+
comment="The probability threshold to consider LVK event to be either from NSBH or BNS merger",
40+
)
41+
bbh_skymap_max_area = CfgOption(
42+
"bbh_skymap_max_area",
43+
30,
44+
float,
45+
comment="The maximum area of BBH merger to be considered for planning",
46+
)
47+
far_threshold_global = CfgOption(
48+
"far_threshold_global",
49+
10,
50+
float,
51+
comment="The FAR threshold for an event to be considered as not a real event",
52+
)
3853

3954

4055
HZ_to_reciprocal_yr = 31557600
@@ -260,7 +275,9 @@ def parse_lvc_alert(
260275
if meta:
261276
# distmu_str = render_number_as_rich_utf8(distmu.value, 3)
262277
# distsigma_str = render_number_as_rich_utf8(2 * distsigma.value, 3)
263-
description += f"Distance: {distmu.value:.1f} \u00B1 {distsigma.value:.1f} Mpc (2\u03C3)"
278+
description += (
279+
f"Distance: {distmu.value:.1f} \u00B1 {distsigma.value:.1f} Mpc (2\u03C3)"
280+
)
264281
else:
265282
description += "Skymap is not available at the moment."
266283

aware/app.py

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,80 @@
88
import matplotlib as mpl
99
import uvloop
1010

11+
from aware.logger import log
1112
from aware.service import ServiceFactory
13+
from aiomisc import Service
1214

1315
from .config import dev
1416
from .sql.models import create_session
1517
from .sql.util import create_alert_tables
18+
from aware.data import AlertMessage, DataPackage
19+
from collections import deque
1620

1721

18-
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
22+
class QueueHub(Service):
23+
"""
24+
QueueHub is performing distribution of items from a main queue between
25+
queues owned by other clients. It is useful because when a multiple clients use
26+
the same queue, the one which gets an item first, actually removes it from the
27+
queue.
28+
"""
29+
30+
def __init__(
31+
self,
32+
master_queue: asyncio.Queue[AlertMessage | DataPackage],
33+
queues: list[asyncio.Queue[AlertMessage | DataPackage]] = None,
34+
**kwargs,
35+
):
36+
self.master_queue = master_queue
37+
self.queues = deque(queues or [])
38+
39+
async def distribute(self):
40+
"""
41+
Perform actual distribution of items between master queue and other queues
42+
43+
Raises
44+
------
45+
ValueError
46+
if the queue is empty
47+
"""
48+
if self.queues is None:
49+
raise ValueError(
50+
"No queues to distribute; use add_queue method to add queues"
51+
)
52+
53+
while self.queues:
54+
try:
55+
item = await self.master_queue.get()
56+
except asyncio.QueueEmpty as e:
57+
log.error(
58+
"Error occurred while sending item to hub memebers", exc_info=e
59+
)
60+
else:
61+
for q in self.queues:
62+
await q.put(item)
63+
64+
async def start(self):
65+
"""Start the hub."""
66+
await self.distribute()
67+
68+
async def add_queue(self, q: asyncio.Queue):
69+
"""Add a queue to the hub
70+
71+
Parameters
72+
----------
73+
q : asyncio.Queue
74+
a queue to add
75+
"""
76+
async with asyncio.Lock():
77+
self.queues.append(q)
78+
79+
async def cleanup(self):
80+
"""
81+
Cleanup the hub
82+
"""
83+
async with asyncio.Lock():
84+
self.queues.clear()
1985

2086

2187
class Application:
@@ -63,16 +129,37 @@ def _create_services(self):
63129
self.services = []
64130
factory = ServiceFactory(self._que)
65131

132+
# if self.mode == "test":
133+
# self.services.append(factory.create_test_consumer())
134+
# elif self.mode == "prod":
135+
# self.services.append(factory.create_prod_consumer())
136+
137+
# if self.run_telegram:
138+
# self.services.append(factory.create_telegram_bot())
139+
140+
# if self.run_socket_server:
141+
# self.services.append(factory.create_socket_server())
142+
143+
queues = []
144+
145+
# Consumer fills the master queue with data
66146
if self.mode == "test":
67-
self.services.append(factory.create_test_consumer())
147+
self.services.append(factory.create_test_consumer(queue=self._que))
68148
elif self.mode == "prod":
69-
self.services.append(factory.create_prod_consumer())
149+
self.services.append(factory.create_prod_consumer(queue=self._que))
70150

71151
if self.run_telegram:
72-
self.services.append(factory.create_telegram_bot())
152+
tg_que = asyncio.Queue()
153+
self.services.append(factory.create_telegram_bot(queue=tg_que))
154+
queues.append(tg_que)
73155

74156
if self.run_socket_server:
75-
self.services.append(factory.create_socket_server())
157+
socket_que = asyncio.Queue()
158+
self.services.append(factory.create_socket_server(queue=socket_que))
159+
queues.append(socket_que)
160+
161+
hub = QueueHub(self._que, queues)
162+
self.services.insert(0, hub)
76163

77164
def run(self) -> float:
78165
engine, _ = create_session()

aware/cache.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818
__all__ = ["cache_value"]
1919

2020

21-
cache_dir = CfgOption("cache_dir", ".cache", str)
22-
timeout = CfgOption("timeout", 60, float)
23-
expire = CfgOption("expire", 86400, float)
21+
cache_dir = CfgOption("cache_dir", ".cache", str, comment="Cache directory")
22+
timeout = CfgOption(
23+
"timeout",
24+
60,
25+
float,
26+
comment="Maximum number of seconds to wait for retrieval of a cached value",
27+
)
28+
expire = CfgOption(
29+
"expire", 86400, float, comment="Cached value expiration time in seconds"
30+
)
2431

2532

2633
def __access_cache():
@@ -40,10 +47,11 @@ def set(self, *args, **kwargs): ...
4047

4148
def get(self, *args, **kwargs): ...
4249

43-
def __enter__(self): return self
50+
def __enter__(self):
51+
return self
4452

4553
def __exit__(self, *args, **kwargs): ...
46-
54+
4755
cache = EmptyCache()
4856

4957
return cache

0 commit comments

Comments
 (0)