Skip to content

Commit

Permalink
Alerts Page: bulk save alerts & URL; search params for both Alerts an…
Browse files Browse the repository at this point in the history
…d Archive Pages (#541)

* add url search params to both alerts and archive pages, allow bulk saving alerts just like archival sources, clean up the archive page's code

* improvements to the archive (DR) handler to search by specific LC ID

* post_alert adds photometry from all programids, since they are restricted by the user streams later on anyway

* pin skyportal to 26937771b319c4c709910b9bd047a29d8d208135
  • Loading branch information
Theodlz authored May 17, 2024
1 parent ed24eb1 commit 24225e4
Show file tree
Hide file tree
Showing 9 changed files with 1,559 additions and 1,034 deletions.
150 changes: 75 additions & 75 deletions extensions/skyportal/skyportal/handlers/api/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

from ...models import (
Group,
GroupStream,
Instrument,
Obj,
Source,
Expand Down Expand Up @@ -171,20 +170,21 @@ def post_alert(
Object ID to save to. Defaults to object_id if None. Useful to import ZTF alerts data to other sources.
"""

photometry_ids = []
user = session.scalar(sa.select(User).where(User.id == user_id))

if program_id_selector is None:
# allow access to public data only by default
program_id_selector = {1}
if user.is_admin:
program_id_selector = [1, 2, 3]
else:
# allow access to public data only by default
program_id_selector = {1}

# using self.Session() should attach the
# associated_user_object to the current session
# so it can lazy load things like streams
for stream in user.streams:
if "ztf" in stream.name.lower():
program_id_selector.update(set(stream.altdata.get("selector", [])))
for stream in user.streams:
if "ztf" in stream.name.lower():
program_id_selector.update(set(stream.altdata.get("selector", [])))

program_id_selector = list(program_id_selector)
program_id_selector = list(program_id_selector)

save_to_diff_object = False
if obj_id_to_save is None:
Expand Down Expand Up @@ -220,7 +220,12 @@ def post_alert(
"cond": {
"$in": [
"$$item.programid",
program_id_selector,
[
1,
2,
3,
], # fetch photometry from all datastreams
# the data access will be enforced using the user's streams
]
},
}
Expand Down Expand Up @@ -444,71 +449,49 @@ def post_alert(
mask_good_diffmaglim = df["diffmaglim"] > 0
df = df.loc[mask_good_diffmaglim]

# get group stream access and map it to ZTF alert program ids
group_stream_access = []
for group in groups:
group_stream_subquery = (
GroupStream.select(user)
.where(GroupStream.group_id == group.id)
.subquery()
)
group_streams = session.scalars(
Stream.select(user).join(
group_stream_subquery,
Stream.id == group_stream_subquery.c.stream_id,
)
).all()
if group_streams is None:
group_streams = []

group_stream_selector = {1}

for stream in group_streams:
if "ztf" in stream.name.lower():
group_stream_selector.update(
set(stream.altdata.get("selector", []))
)
photometry = {
"obj_id": obj_id_to_save,
"instrument_id": instrument.id,
"mjd": df["mjd"].tolist(),
"mag": df["magpsf"].tolist(),
"magerr": df["sigmapsf"].tolist(),
"limiting_mag": df["diffmaglim"].tolist(),
"magsys": df["magsys"].tolist(),
"filter": df["ztf_filter"].tolist(),
"ra": df["ra"].tolist(),
"dec": df["dec"].tolist(),
}

group_stream_access.append(
{"group_id": group.id, "permissions": list(group_stream_selector)}
)
ztf_program_id_to_stream_id = dict()
streams = session.scalars(Stream.select(session.user_or_token)).all()
if streams is None:
raise ValueError("Failed to get programid to stream_id mapping")
for stream in streams:
if stream.name == "ZTF Public":
ztf_program_id_to_stream_id[1] = stream.id
if stream.name == "ZTF Public+Partnership":
ztf_program_id_to_stream_id[2] = stream.id
if stream.name == "ZTF Public+Partnership+Caltech":
# programid=0 is engineering data
ztf_program_id_to_stream_id[0] = stream.id
ztf_program_id_to_stream_id[3] = stream.id

df["stream_ids"] = df["programid"].apply(
lambda x: ztf_program_id_to_stream_id[x]
)
photometry["stream_ids"] = df["stream_ids"].tolist()

# post data from different program_id's
photometry_ids = []
for pid in set(df.programid.unique()):
group_ids = [
gsa.get("group_id")
for gsa in group_stream_access
if pid in gsa.get("permissions", [1])
]

if len(group_ids) > 0:
pid_mask = df.programid == int(pid)

photometry = {
"obj_id": obj_id_to_save,
"group_ids": group_ids,
"instrument_id": instrument.id,
"mjd": df.loc[pid_mask, "mjd"].tolist(),
"mag": df.loc[pid_mask, "magpsf"].tolist(),
"magerr": df.loc[pid_mask, "sigmapsf"].tolist(),
"limiting_mag": df.loc[pid_mask, "diffmaglim"].tolist(),
"magsys": df.loc[pid_mask, "magsys"].tolist(),
"filter": df.loc[pid_mask, "ztf_filter"].tolist(),
"ra": df.loc[pid_mask, "ra"].tolist(),
"dec": df.loc[pid_mask, "dec"].tolist(),
}
# remove the datapoints where the stream_id is not defined
mask_good_stream_id = df["stream_ids"].apply(lambda x: x is not None)
df = df.loc[mask_good_stream_id]

if len(photometry.get("mag", ())) > 0:
try:
photometry_ids_tmp, _ = add_external_photometry(
photometry, user, duplicates="update"
)
photometry_ids = photometry_ids + photometry_ids_tmp
except Exception:
log(
f"Failed to post photometry of {object_id} to group_ids {group_ids}"
)
if len(photometry.get("mag", ())) > 0:
try:
photometry_ids, _ = add_external_photometry(photometry, user)
except Exception as e:
log(
f"Failed to post photometry of {object_id} to group_ids {group_ids}: {e}",
)
# post cutouts
used_latest = False
thumbnail_ids = []
Expand Down Expand Up @@ -552,10 +535,14 @@ def post_alert(
"query": {
"catalog": "ZTF_alerts",
"filter": {
"candid": candid,
"candidate.programid": {"$in": program_id_selector},
},
"projection": {"_id": 0, "objectId": 1, f"cutout{ztftype}": 1},
"projection": {
"_id": 0,
"objectId": 1,
"candidate.jd": 1,
f"cutout{ztftype}": 1,
},
},
"kwargs": {
"limit": 1,
Expand All @@ -565,13 +552,26 @@ def post_alert(
if response.get("default").get("status", "error") == "success":
cutout = response.get("default").get("data", list(dict()))
if len(cutout) > 0:
cutout = sorted(
cutout, key=lambda x: x["candidate"]["jd"], reverse=True
)
cutout = cutout[0]
used_latest = True
else:
cutout = dict()
else:
cutout = dict()

if len(cutout.keys()) == 0:
log(
f"Failed to fetch cutout for {object_id} | {candid} | {ttype} | {ztftype}"
)
if thumbnails_only:
raise ValueError(
f"Failed to fetch cutout for {object_id} | {candid} | {ttype} | {ztftype}"
)
continue

try:
thumb = make_thumbnail(cutout, ttype, ztftype)
if thumb is not None:
Expand Down
Loading

0 comments on commit 24225e4

Please sign in to comment.