SST1RSoXSDB data browser widget and associated tools #137

Draft
wants to merge 7 commits into base: main
245 changes: 222 additions & 23 deletions src/PyHyperScattering/SST1RSoXSDB.py
@@ -53,6 +53,7 @@ class SST1RSoXSDB:
}
md_secondary_lookup = {
"energy": "en_monoen_setpoint",
"exposure": "Small Angle CCD Detector_cam_acquire_time",
}

def __init__(
@@ -117,6 +118,8 @@ def __init__(
self.exposure_offset = exposure_offset
self.use_precise_positions = use_precise_positions
self.suppress_time_dimension = suppress_time_dimension
self.catalog_df = None
self.catalog_df_kwargs = None

# def loadFileSeries(self,basepath):
# try:
@@ -162,6 +165,85 @@ def summarize_run(self, *args, **kwargs):
)
return self.searchCatalog(*args, **kwargs)

def browseCatalog(self, force_refresh=False, **kwargs):
    """
    Browse the catalog interactively using a sortable, filterable grid widget.

    Args:
        force_refresh (bool, optional): if True, re-run the underlying search even if a
            cached dataframe from an identical set of kwargs exists. Defaults to False.
        **kwargs: passed through to searchCatalog; bounds the set of runs fetched/displayed.

    Returns:
        result (obj): an ipyaggrid Grid instance for browsing the catalog.
    """
from ipyaggrid import Grid
if self.catalog_df is None or self.catalog_df_kwargs != kwargs or force_refresh:
self.catalog_df = self.searchCatalog(**kwargs)
self.catalog_df_kwargs = kwargs
else:
print(f'Reusing cached catalog dataframe for kwargs {kwargs}; pass force_refresh=True to re-query.')
column_names = []

pretty_names = {
"scan_id": "Scan ID",
"start_time": "Start Time",
"cycle": "Cycle",
"institution": "Institution",
"project": "Project",
"sample_name": "Sample Name",
"sample_id": "Sample ID",
"plan": "Plan Name",
"detector": "Detector",
"polarization": "Polarization",
"exit_status": "Exit Status",
"num_Images": "# Imgs/Pts",
}
additional_options = {
    "scan_id": {"width": 150, "sortable": True, "sort": "desc", "lockPosition": "left"},
    "institution": {"width": 125},
    # single "project" entry: a duplicate dict key would silently discard the width setting
    "project": {"width": 125, "filter": "agMultiColumnFilter", "filters": ["textFilter", "setFilter"]},
    "cycle": {"width": 125},
    "exit_status": {"width": 125},
    "num_Images": {"width": 125},
    "plan": {"filter": "agMultiColumnFilter", "filters": ["textFilter", "setFilter"]},
    "sample_name": {"filter": "agMultiColumnFilter", "filters": ["textFilter", "setFilter"]},
}

for field in self.catalog_df.columns:
col_config = {'field': field}
if field in pretty_names:
col_config['headerName'] = pretty_names[field]
if field in additional_options:
col_config.update(additional_options[field])
column_names.append(col_config)
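# For example, the loop above turns the "scan_id" column into:
#   {'field': 'scan_id', 'headerName': 'Scan ID', 'width': 150,
#    'sortable': True, 'sort': 'desc', 'lockPosition': 'left'}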


grid = Grid(
    grid_data=self.catalog_df,
    grid_options={
        "columnDefs": column_names,
        "enableSorting": True,
        "enableFilter": True,
        "enableColResize": True,
        "enableRangeSelection": True,
        "rowSelection": "multiple",
        # "pagination": True,
        # "paginationPageSize": 100,
        "defaultColDef": {
            "sortable": True,
            "resizable": True,
            "floatingFilter": True,
        },
        "autoSizeStrategy": {"type": "fitCellContents"},
    },
    quick_filter=True,
    theme="ag-theme-bootstrap",
)


return grid
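
# A minimal usage sketch (assumes a Jupyter session with ipyaggrid installed;
# the constructor argument and search kwargs below are illustrative):
#
#     db = SST1RSoXSDB(corr_mode="none")
#     grid = db.browseCatalog(institution="NIST", cycle="2023-2")  # kwargs forwarded to searchCatalog
#     grid  # display the widget; repeating the call with identical kwargs reuses the cached dataframe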

def searchCatalog(
self,
outputType: str = "default",
@@ -174,8 +256,10 @@
sample: str = None,
sampleID: str = None,
plan: str = None,
scan_id: int = None,
userOutputs: list = [],
debugWarnings: bool = False,
existingCatalog: pd.DataFrame = None,
**kwargs,
) -> pd.DataFrame:
"""Search the Bluesky catalog for scans matching all provided keywords and return metadata as a dataframe.
@@ -218,6 +302,7 @@
plan (str, optional): Measurement Plan, case-insensitive, regex search,
e.g., "Full" matches "full_carbon_scan_nd", "full_fluorine_scan_nd"
e.g., "carbon|oxygen|fluorine" matches carbon OR oxygen OR fluorine scans
scan_id (int, optional): Scan ID, exact numeric match, e.g., 12345
**kwargs: Additional search terms can be provided as keyword args and will further filter
the catalog. Valid input follows metadataLabel='searchTerm' or metadataLabel=['searchTerm','matchType'].
Metadata labels must match an entry in the 'start' dictionary of the catalog. Supported match types are
@@ -237,6 +322,7 @@
r'catalog.stop["num_events"]']
e.g., userOutputs = [["Exposure Multiplier","exptime", r'catalog.start'], ["Stop Time","time",r'catalog.stop']]
debugWarnings (bool, optional): if True, raises a warning with debugging information whenever a key can't be found.
existingCatalog (pd.DataFrame, optional): if provided, runs whose uid already appears in this dataframe are skipped rather than re-downloaded.
Returns:
Pandas dataframe containing the results of the search, or an empty dataframe if the search fails
"""
@@ -256,6 +342,7 @@
["sample_name", sample, "case-insensitive"],
["sample_id", sampleID, "case-insensitive"],
["plan_name", plan, "case-insensitive"],
["scan_id", scan_id, "numeric"],
]

# Pull any user-provided search terms
@@ -277,22 +364,20 @@
# combine the lists of lists
fullSearchList = defaultSearchDetails + userSearchList

df_SearchDet = pd.DataFrame(
fullSearchList, columns=["Metadata field:", "User input:", "Search scheme:"]
)

# Iterate through search terms sequentially, reducing the size of the catalog based on successful matches

reducedCatalog = bsCatalog
for _, searchSeries in tqdm(
df_SearchDet.iterrows(), total=df_SearchDet.shape[0], desc="Running catalog search..."
):
for searchSeries in fullSearchList:
# Skip arguments with value None, and stop matching once the catalog has been reduced to 0 elements
if (searchSeries[1] is not None) and (len(reducedCatalog) > 0):
# For numeric entries, do Key equality
if "numeric" in str(searchSeries.iloc[2]):
if "numeric" in str(searchSeries[2]):
reducedCatalog = reducedCatalog.search(
Key(searchSeries.iloc[0]) == float(searchSeries.iloc[1])
Key(searchSeries[0]) == float(searchSeries[1])
)

else: # Build regex search string
@@ -302,16 +387,16 @@
# Regex cheatsheet:
# (?i) is case insensitive
# ^_$ forces exact match to _, ^ anchors the start, $ anchors the end
if "case-insensitive" in str(searchSeries.iloc[2]):
if "case-insensitive" in str(searchSeries[2]):
reg_prefix += "(?i)"
if "exact" in searchSeries.iloc[2]:
if "exact" in searchSeries[2]:
reg_prefix += "^"
reg_postfix += "$"

regexString = reg_prefix + str(searchSeries.iloc[1]) + reg_postfix
regexString = reg_prefix + str(searchSeries[1]) + reg_postfix

# Search/reduce the catalog
reducedCatalog = reducedCatalog.search(Regex(searchSeries.iloc[0], regexString))
reducedCatalog = reducedCatalog.search(Regex(searchSeries[0], regexString))
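# Example: plan="Full" with scheme "case-insensitive" builds regexString "(?i)Full",
# so Regex("plan_name", "(?i)Full") matches runs like "full_carbon_scan_nd".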

# If a match fails, notify the user which search parameter yielded 0 results
if len(reducedCatalog) == 0:
@@ -337,7 +422,6 @@
# List elements are [Output Column Title, Bluesky Metadata Code, Metadata Source location, Applicable Output flag]
outputValueLibrary = [
["scan_id", "scan_id", r"catalog.start", "default"],
["uid", "uid", r"catalog.start", "ext_bio"],
["start_time", "time", r"catalog.start", "default"],
["cycle", "cycle", r"catalog.start", "default"],
["saf", "SAF", r"catalog.start", "ext_bio"],
@@ -353,6 +437,7 @@
["sample_rotation", "angle", r"catalog.start", "ext_msmt"],
["exit_status", "exit_status", r"catalog.stop", "default"],
["num_Images", "primary", r'catalog.stop["num_events"]', "default"],
["uid", "uid", r"catalog.start", "default"],
]

# Subset the library based on the output flag selected
@@ -397,11 +482,18 @@

# Build output dataframe as a list of lists
outputList = []

# Outer loop: Catalog entries
for scanEntry in tqdm(reducedCatalog.values(), desc="Retrieving results..."):
for scanEntry in tqdm(reducedCatalog.items(), desc="Retrieving results"):
singleScanOutput = []

if existingCatalog is not None and scanEntry[0] in existingCatalog.uid.values:
    # this run's uid is already in existingCatalog; skip re-downloading it
    continue

scanEntry = scanEntry[1]

# Pull the start and stop docs once

currentCatalogStart = scanEntry.start
@@ -470,6 +562,107 @@
)
return pd.DataFrame(outputList, columns=activeOutputLabels)
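
# Usage sketch for the new search arguments (identifiers below are hypothetical):
#
#     cat = db.searchCatalog(sample="bilayer", plan="carbon")  # regex, case-insensitive
#     run = db.searchCatalog(scan_id=12345)                    # exact numeric match
#     new_runs = db.searchCatalog(sample="bilayer", existingCatalog=cat)  # skips uids already in cat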


def findAppropriateDiodes(self, run_to_find, cat_diodes=None, diode_name='diode', same_cycle=True, time_cutoff_days=3.0):
    '''
    Finds appropriate diode calibration scans for a given run.
    "Appropriate" is tuned by the kwargs to this function, but in general means:
    - same detector
    - same cycle (if same_cycle is True)
    - same edge
    - within time_cutoff_days of the run

    Args:
        run_to_find (pd.DataFrame or numeric): a dataframe with a single row for the run you are trying to find a diode for, or its scan_id
            (if a scan_id is given, the run is loaded using searchCatalog)
        cat_diodes (pd.DataFrame): a dataframe of diode scans to search through. If None, will search the catalog for diode scans.
        diode_name (str): the sample_name of the diode to use in the search. Default is 'diode'.
        same_cycle (bool): if True, only searches for diodes in the same cycle as run_to_find.
        time_cutoff_days (float): the maximum time difference, in days, between the run and a diode scan for the scan to be considered "relevant".
    Returns:
        pd.DataFrame: a dataframe of relevant diode scans, ordered by distance in time from your run.
            Warns if the closest diode is more than 1 day away.
            To get *a* singular "best diode", just take the first row of the returned dataframe, i.e.:
            best_diode = db.findAppropriateDiodes(run_to_find).iloc[0]
    '''
    import pandas as pd
    import warnings
    pd.options.mode.copy_on_write = True
    time_cutoff = pd.Timedelta(time_cutoff_days, 'day')  # honor the time_cutoff_days kwarg

if not isinstance(run_to_find,pd.DataFrame):
run_to_find = self.searchCatalog(scan_id=run_to_find)

if cat_diodes is None:
kwargs = {'sample':diode_name}
if same_cycle:
kwargs['cycle'] = run_to_find['cycle'].iloc[0]
cat_diodes = self.searchCatalog(**kwargs)

def _plan_to_edge_name(cat_diodes):
    # strip plan-name boilerplate tokens, leaving (approximately) the edge name
    cat_diodes['edge_name'] = cat_diodes['plan']
    for token in ('nexafs', 'rsoxs', 'full', 'short', 'very', 'scan', 'nd', '_'):
        cat_diodes['edge_name'] = cat_diodes['edge_name'].str.replace(token, '')

# expand short edge abbreviations
try:
from rsoxs_scans.defaults import edge_names
except ImportError:
edge_names = {
"c": "carbon",
"carbon": "carbon",
"carbonk": "carbon",
"ck": "carbon",
"n": "nitrogen",
"nitrogen": "nitrogen",
"nitrogenk": "nitrogen",
"nk": "nitrogen",
"f": "fluorine",
"fluorine": "fluorine",
"fluorinek": "fluorine",
"fk": "fluorine",
"o": "oxygen",
"oxygen": "oxygen",
"oxygenk": "oxygen",
"ok": "oxygen",
"ca": "calcium",
"calcium": "calcium",
"calciumk": "calcium",
"cak": "calcium",
'al': 'aluminium',
'aluminum': 'aluminium',
}
for k,v in edge_names.items():
cat_diodes['edge_name'] = cat_diodes['edge_name'].replace(k,v)
return cat_diodes
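# Worked example: "full_carbon_scan_nd" -> strip "full", "scan", "nd", "_" -> "carbon";
# an abbreviated remainder such as "ck" maps to "carbon" via the edge_names lookup.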

run_to_find = _plan_to_edge_name(run_to_find)
cat_diodes['time_proximity'] = cat_diodes['start_time'] - run_to_find['start_time'].iloc[0]
cat_diodes['abs_time_proximity'] = np.abs(cat_diodes['start_time'] - run_to_find['start_time'].iloc[0])
cat_diodes['same_scan'] = cat_diodes['plan'] == run_to_find['plan'].iloc[0]
cat_diodes['same_detector'] = cat_diodes['detector'] == run_to_find['detector'].iloc[0]
cat_diodes['is_older'] = cat_diodes['time_proximity'] < pd.Timedelta(0)
cat_diodes = _plan_to_edge_name(cat_diodes)
cat_diodes['same_edge'] = cat_diodes['edge_name'] == run_to_find['edge_name'].iloc[0]

relevant_diodes = cat_diodes[cat_diodes.same_edge]
relevant_diodes = relevant_diodes[relevant_diodes.same_detector]
relevant_diodes = relevant_diodes[relevant_diodes.abs_time_proximity < time_cutoff]
relevant_diodes = relevant_diodes.sort_values(by='abs_time_proximity')

if (relevant_diodes['abs_time_proximity'].min()) > pd.Timedelta(1,unit='day'):
warnings.warn(f"Stale diode! The closest relevant diode scan to the requested scan is {relevant_diodes['abs_time_proximity'].min()} from the measurement.")
return relevant_diodes
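
# Usage sketch (the scan_id is made up; db is an SST1RSoXSDB instance):
#
#     diodes = db.findAppropriateDiodes(12345)  # or pass a one-row dataframe from searchCatalog
#     best_diode = diodes.iloc[0]               # closest-in-time diode on the same edge and detector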


def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
@@ -850,23 +1043,28 @@ def loadMonitors(

monitors = None

monitor_accumulator = []

# Iterate through the list of streams held by the Bluesky document 'entry'
for stream_name in list(entry.keys()):
# Add monitor streams to the output xr.Dataset
if "monitor" in stream_name:
if monitors is None: # First one
# incantation to extract the dataset from the bluesky stream
monitors = entry[stream_name].data.read()
else: # merge into the to existing output xarray
monitors = xr.merge((monitors, entry[stream_name].data.read()))
# incantation to extract the dataset from the bluesky stream
monitor_accumulator.append(entry[stream_name].data.read())
# if there are no monitors, return an empty xarray Dataset
if len(monitor_accumulator) == 0:
return xr.Dataset()

monitors = xr.merge(monitor_accumulator)

# At this stage monitors has dimension time and all streams as data variables
# the time dimension inherited all time values from all streams
# the data variables (Mesh current, sample current etc.) are all sparse, with lots of nans

# if there are no monitors, return an empty xarray Dataset
if monitors is None:
return xr.Dataset()

# For each nan value, replace with the closest value ahead of it in time
# For remaining nans, replace with closest value behind it in time
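# Toy sketch of this merge-then-fill pattern (synthetic values, not beamline
# streams; Dataset.ffill/bfill need the optional bottleneck dependency):
#
#     a = xr.Dataset({"mesh_current": ("time", [1.0, 2.0])}, coords={"time": [0.0, 2.0]})
#     b = xr.Dataset({"sample_current": ("time", [5.0])}, coords={"time": [1.0]})
#     merged = xr.merge([a, b])                    # union of time points; gaps become NaN
#     filled = merged.ffill("time").bfill("time")  # fill each variable from its neighbors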
@@ -1003,6 +1201,7 @@ def loadMd(self, run):
stacklevel=2,
)


if md["rsoxs_config"] == "saxs":
md["detector"] = "Small Angle CCD Detector"
elif md["rsoxs_config"] == "waxs":