
Commit

Merge branch 'main' into 56-integratesingleimage-indexing-error
pbeaucage authored Aug 2, 2023
2 parents 4892704 + 6018306 commit 9b0ddb3
Showing 3 changed files with 84 additions and 76 deletions.
36 changes: 30 additions & 6 deletions src/PyHyperScattering/PFEnergySeriesIntegrator.py
@@ -4,7 +4,6 @@
import warnings
import xarray as xr
import numpy as np
import pandas as pd
import math
import pandas as pd
from tqdm.auto import tqdm
@@ -40,12 +39,36 @@ def integrateSingleImage(self,img):
# get the energy and locate the matching integrator
# use that integrator to reduce
# return single reduced frame


if type(img.energy) != float:

#print(f' img.energy is a {type(img.energy)}, len = {len(img.energy)} and is {img.energy}')
#print(f' img.system is a {type(img.system)}, len = {len(img.energy)} and is {img.system}')
#print(f' img.system.levels: {img.indexes["system"].names}')
#print(f' img.indexes: {img.indexes}')
#print(f' type of system value: {type(getattr(img,"system").values)}')
#print(f' shape of system value: {getattr(img,"system").values.shape}')
en = None
if 'energy' not in img.indexes and type(img.energy) != float:
try:
multiindex_name = None
for idx in img.indexes:
if type(img.indexes[idx]) == pd.core.indexes.multi.MultiIndex:
multiindex_name = idx
break
if 'energy' in img.indexes[multiindex_name].names:
for i,n in enumerate(img.indexes[multiindex_name].names):
if n == 'energy':
idx_of_energy = i
en = float(getattr(img,multiindex_name).values[idx_of_energy][0])
except KeyError:
pass
if en is not None:
pass
elif type(img.energy) == float:
en = img.energy
else:
try:
en = img.energy.values[0]
if len(img.energy)>1:
if len(img.energy.values)>1:
warnings.warn(f'Using the first energy value of {img.energy.values}, check that this is correct.',stacklevel=2)
except IndexError:
en = float(img.energy)
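
The new block above walks the DataArray's indexes looking for a pandas MultiIndex that carries an 'energy' level. For reference, the same lookup can be exercised on a toy array using the public pandas API; a minimal sketch (the 'system'/'exposure' names and values are hypothetical, chosen to mimic a stacked fly-scan index):

    import numpy as np
    import pandas as pd
    import xarray as xr

    # A 'system' dimension whose MultiIndex carries an 'energy' level.
    idx = pd.MultiIndex.from_tuples([(270.0, 0), (270.0, 1)], names=['energy', 'exposure'])
    img = xr.DataArray(np.zeros((2, 4)), dims=['system', 'pix_x'], coords={'system': idx})

    en = None
    for name, index in img.indexes.items():
        if isinstance(index, pd.MultiIndex) and 'energy' in index.names:
            # get_level_values sidesteps manual tuple indexing into .values
            en = float(index.get_level_values('energy')[0])
            break
    assert en == 270.0
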
@@ -58,6 +81,7 @@ def integrateSingleImage(self,img):

else:
en = img.energy

try:
self.integrator = self.integrator_stack[en]
except KeyError:
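
The cache lookup above is the classic try/except memoization idiom: one integrator per energy, constructed on the first miss and reused afterwards. A generic sketch (the `create` factory argument is a stand-in, not the class's actual method):

    integrator_stack = {}

    def integrator_for(en, create):
        # Return the cached integrator for this energy, building it on first use.
        try:
            return integrator_stack[en]
        except KeyError:
            integrator_stack[en] = create(en)
            return integrator_stack[en]

    azi = integrator_for(270.0, lambda e: f"integrator@{e}")  # stand-in factory
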
@@ -169,7 +193,7 @@ def integrateImageStack_legacy(self,img_stack):

# the following section attempts to shape the array that we have so that it meets the requirements for the xarray GroupBy/map
# paradigm; specifically, it needs one index that's not pixel x/y. We either identify if that is the case, or we make it the
# case. however, there are probably edge cases not handled here.
# case. however, there are probably edge cases not handled here

if len(indexes) == 1:
if img_stack.__getattr__(indexes[0]).to_pandas().drop_duplicates().shape[0] != img_stack.__getattr__(indexes[0]).shape[0]:
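
The shape requirement described in the comment — exactly one non-pixel index — is what lets xarray's GroupBy/map drive the frame-by-frame reduction. A minimal sketch of that paradigm, with mean() standing in for the real per-frame integration:

    import numpy as np
    import xarray as xr

    stack = xr.DataArray(
        np.random.rand(3, 8, 8),
        dims=['energy', 'pix_x', 'pix_y'],
        coords={'energy': [270.0, 280.0, 285.0]},
    )
    # One group per energy; map applies the per-frame reduction and re-concatenates.
    reduced = stack.groupby('energy').map(lambda frame: frame.mean())
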
120 changes: 52 additions & 68 deletions src/PyHyperScattering/SST1RSoXSDB.py
@@ -19,7 +19,8 @@
from httpx import HTTPStatusError
import tiled
import dask
from databroker.queries import RawMongo, Key, FullText, Contains, Regex
from databroker.queries import RawMongo, Key, FullText, Contains, Regex, ScanIDRange
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
except:
print(
"Imports failed. Are you running on a machine with proper libraries for databroker, tiled, etc.?"
@@ -140,9 +141,10 @@ def summarize_run(
sampleID: str = None,
plan: str = None,
userOutputs: list = [],
debugWarnings: bool = False,
**kwargs,
) -> pd.DataFrame:
"""Search the databroker.client.CatalogOfBlueskyRuns for scans matching all provided keywords and return metadata as a dataframe.
"""Search the Bluesky catalog for scans matching all provided keywords and return metadata as a dataframe.
Matches are made based on the values in the top level of the 'start' dict within the metadata of each
entry in the Bluesky Catalog (databroker.client.CatalogOfBlueskyRuns). Based on the search arguments provided,
@@ -200,9 +202,9 @@ def summarize_run(
Valid options for the Metadata Source are any of [r'catalog.start', r'catalog.start["plan_args"], r'catalog.stop',
r'catalog.stop["num_events"]']
e.g., userOutputs = [["Exposure Multiplier","exptime", r'catalog.start'], ["Stop Time","time",r'catalog.stop']]
debugWarnings (bool, optional): if True, emits a warning with debugging information whenever a metadata key can't be found.
Returns:
pd.Dataframe containing the results of the search, or an empty dataframe if the search fails
Pandas dataframe containing the results of the search, or an empty dataframe if the search fails
"""

# Pull in the reference to the databroker.client.CatalogOfBlueskyRuns attribute
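
A hypothetical call against this docstring (argument values are made up; `db_loader` stands in for an SST1RSoXSDB instance; the column names follow the default output library below):

    df = db_loader.summarize_run(
        sampleID='BT_sample4',           # hypothetical sample ID
        plan='full_carbon_scan_nd',
        debugWarnings=True,
    )
    print(df[['scan_id', 'start_time', 'sample_id']].head())
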
@@ -233,11 +235,8 @@ def summarize_run(
elif isinstance(value, list) and len(value) == 2:
userSearchList.append([userLabel, value[0], value[1]])
else: # bad user input
warnString = (
"Error parsing a keyword search term, check the format.\nSkipped argument: "
+ str(value)
)
warnings.warn(warnString, stacklevel=2)
raise ValueError(
f"Error parsing a keyword search term, check the format. Offending argument: {value}")

# combine the lists of lists
fullSearchList = defaultSearchDetails + userSearchList
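
Per the parsing branches above, each user keyword may be a bare string (matched case-insensitively) or a two-element [value, mode] list. A sketch of both forms (the kwarg names and the mode string are illustrative assumptions, not an exhaustive list of supported modes):

    db_loader.summarize_run(
        plan='full_carbon_scan_nd',              # bare string form
        institution=['NIST', 'case-sensitive'],  # hypothetical [value, mode] form
    )
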
@@ -249,10 +248,7 @@ def summarize_run(
# Iterate through search terms sequentially, reducing the size of the catalog based on successful matches

reducedCatalog = bsCatalog
loopDesc = "Searching by keyword arguments"
for index, searchSeries in tqdm(
df_SearchDet.iterrows(), total=df_SearchDet.shape[0], desc=loopDesc
):
for _,searchSeries in tqdm(df_SearchDet.iterrows(),total = df_SearchDet.shape[0], desc = "Running catalog search..."):

# Skip arguments with value None, and quits if the catalog was reduced to 0 elements
if (searchSeries[1] is not None) and (len(reducedCatalog) > 0):
@@ -286,34 +282,29 @@ def summarize_run(
# If a match fails, notify the user which search parameter yielded 0 results
if len(reducedCatalog) == 0:
warnString = (
"Catalog reduced to zero when attempting to match the following condition:\n"
+ searchSeries.to_string()
+ "\n If this is a user-provided search parameter, check spelling/syntax.\n"
f"Catalog reduced to zero when attempting to match {searchSeries}\n"
+f"If this is a user-provided search parameter, check spelling/syntax."
)
warnings.warn(warnString, stacklevel=2)
return pd.DataFrame()

### Part 2: Build and return output dataframe

if (
outputType == "scans"
): # Branch 2.1, if only scan IDs needed, build and return a 1-column dataframe
if (outputType == "scans"):
# Branch 2.1, if only scan IDs needed, build and return a 1-column dataframe
scan_ids = []
loopDesc = "Building scan list"
for index, scanEntry in tqdm(
(enumerate(reducedCatalog)), total=len(reducedCatalog), desc=loopDesc
):
scan_ids.append(reducedCatalog[scanEntry].start["scan_id"])
for scanEntry in tqdm(reducedCatalog.values(), desc = "Building scan list"):
scan_ids.append(scanEntry.start["scan_id"])
return pd.DataFrame(scan_ids, columns=["Scan ID"])

else: # Branch 2.2, Output metadata from a variety of sources within each the catalog entry

missesDuringLoad = False
# Store details of output values as a list of lists
# List elements are [Output Column Title, Bluesky Metadata Code, Metadata Source location, Applicable Output flag]
outputValueLibrary = [
["scan_id", "scan_id", r"catalog.start", "default"],
["uid", "uid", r"catalog.start", "ext_bio"],
["start time", "time", r"catalog.start", "default"],
["start_time", "time", r"catalog.start", "default"],
["cycle", "cycle", r"catalog.start", "default"],
["saf", "SAF", r"catalog.start", "ext_bio"],
["user_name", "user_name", r"catalog.start", "ext_bio"],
@@ -323,7 +314,7 @@ def summarize_run(
["sample_id", "sample_id", r"catalog.start", "default"],
["bar_spot", "bar_spot", r"catalog.start", "ext_msmt"],
["plan", "plan_name", r"catalog.start", "default"],
["detector", "RSoXS_Main_DET", r"catalog.start", "default"],
["detector", "RSoXS_Main_DET", r"catalog.start", "default"],
["polarization", "pol", r'catalog.start["plan_args"]', "default"],
["sample_rotation", "angle", r"catalog.start", "ext_msmt"],
["exit_status", "exit_status", r"catalog.stop", "default"],
@@ -350,11 +341,7 @@ def summarize_run(
activeOutputValues.append(userOutEntry)
activeOutputLabels.append(userOutEntry[0])
else: # bad user input
warnString = (
"Error parsing user-provided output request, check the format.\nSkipped: "
+ str(userOutEntry)
)
warnings.warn(warnString, stacklevel=2)
raise ValueError(f"Error parsing user-provided output request {userOutEntry}, check the format.", stacklevel=2)

# Add any user-provided search terms
for userSearchEntry in userSearchList:
@@ -371,28 +358,28 @@ def summarize_run(
# Build output dataframe as a list of lists
outputList = []

# Outer loop: Catalog entries
loopDesc = "Building output dataframe"
for index, scanEntry in tqdm(
(enumerate(reducedCatalog)), total=len(reducedCatalog), desc=loopDesc
):

# Outer loop: Catalog entries
for scanEntry in tqdm(reducedCatalog.values(),desc = "Retrieving results..."):
singleScanOutput = []

# Pull the start and stop docs once
currentCatalogStart = reducedCatalog[scanEntry].start
currentCatalogStop = reducedCatalog[scanEntry].stop


currentCatalogStart = scanEntry.start
currentCatalogStop = scanEntry.stop

currentScanID = currentCatalogStart["scan_id"]

# Inner loop: append output values
for outputEntry in activeOutputValues:
outputVariableName = outputEntry[0]
metaDataLabel = outputEntry[1]
metaDataSource = outputEntry[2]

try: # Add the metadata value depending on where it is located
if metaDataSource == r"catalog.start":
if metaDataLabel == 'time':
singleScanOutput.append(datetime.datetime.fromtimestamp(currentCatalogStart['time']))
# see Zen of Python # 9,8 for justification
elif metaDataSource == r"catalog.start":
singleScanOutput.append(currentCatalogStart[metaDataLabel])
elif metaDataSource == r'catalog.start["plan_args"]':
singleScanOutput.append(
@@ -405,36 +392,29 @@ def summarize_run(
currentCatalogStop["num_events"][metaDataLabel]
)
else:
warnString = (
"Scan: > "
+ str(currentScanID)
+ " < Failed to locate metaData entry for > "
+ str(outputVariableName)
+ " <\n Tried looking for label: > "
+ str(metaDataLabel)
+ " < in: "
+ str(metaDataSource)
)
warnings.warn(warnString, stacklevel=2)
if debugWarnings:
warnings.warn(
f'Failed to locate metadata for {outputVariableName} in scan {currentScanID}.',
stacklevel=2)
missesDuringLoad = True

except (KeyError, TypeError):
warnString = (
"Scan: > "
+ str(currentScanID)
+ " < Failed to locate metaData entry for > "
+ str(outputVariableName)
+ " <\n Tried looking for label: > "
+ str(metaDataLabel)
+ " < in: "
+ str(metaDataSource)
)
warnings.warn(warnString, stacklevel=2)
if debugWarnings:
warnings.warn(
f'Failed to locate metadata for {outputVariableName} in scan {currentScanID}.',
stacklevel=2)
missesDuringLoad = True
singleScanOutput.append("N/A")

# Append to the filled output list for this entry to the list of lists
outputList.append(singleScanOutput)



# Convert to dataframe for export
if missesDuringLoad:
warnings.warn(
f'One or more missing field(s) during this load were replaced with "N/A". Re-run with debugWarnings=True to see details.',
stacklevel=2)
return pd.DataFrame(outputList, columns=activeOutputLabels)
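
One detail worth noting from this hunk: Bluesky start documents store 'time' as a POSIX timestamp (float seconds since the epoch), which is why the new branch converts it before it lands in the dataframe. A self-contained sketch of that conversion:

    import datetime

    # 'time' in catalog.start is a float POSIX timestamp; convert to a readable datetime.
    start_time = datetime.datetime.fromtimestamp(1690992000.0)
    print(start_time.isoformat())
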

def background(f):
@@ -781,7 +761,7 @@ def loadMonitors(
Presently bins are averaged between measurements intervals.
useShutterThinning : bool, optional
Whether or not to attempt to thin (filter) the raw time streams to remove data collected during shutter opening/closing, by default False
As of 230209 at NSLS2 SST1, using useShutterThinning= True for exposure times of < 0.5s is
As of 9 Feb 2023 at NSLS2 SST1, using useShutterThinning= True for exposure times of < 0.5s is
not recommended because the shutter data is unreliable and too many points will be culled
n_thinning_iters : int, optional
how many iterations of thinning to perform, by default 5
@@ -807,6 +787,10 @@ def loadMonitors(
# At this stage monitors has dimension time and all streams as data variables
# the time dimension inherited all time values from all streams
# the data variables (Mesh current, sample current etc.) are all sparse, with lots of nans

# if there are no monitors, return an empty xarray Dataset
if monitors is None:
return xr.Dataset()

# For each nan value, replace with the closest value ahead of it in time
# For remaining nans, replace with closest value behind it in time
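
In xarray terms, the two comments above describe a forward fill followed by a backward fill along time. A sketch, assuming `monitors` is the Dataset built above (Dataset.ffill/bfill require the bottleneck package):

    # Propagate the last reading forward in time, then fill anything left
    # (values before the first reading) from the first reading behind it.
    monitors = monitors.ffill('time').bfill('time')
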
@@ -873,7 +857,7 @@ def loadMonitors(
except Exception as e:
# raise e # for testing
warnings.warn(
"Error while time-integrating onto images. Check data.",
"Error while time-integrating monitors onto images. Usually, this indicates a problem with the monitors, this is a critical error if doing normalization otherwise fine to ignore.",
stacklevel=2,
)
return monitors
4 changes: 2 additions & 2 deletions src/PyHyperScattering/WPIntegrator.py
@@ -112,9 +112,9 @@ def integrateSingleImage(self,img):
pass

if self.MACHINE_HAS_CUDA:
TwoD = self.warp_polar_gpu(img_to_integ,center=(center_x,center_y), radius = np.nanmax(img_to_integ.shape))
TwoD = self.warp_polar_gpu(img_to_integ,center=(center_x,center_y), radius = np.sqrt((img_to_integ.shape[0] - center_x)**2 + (img_to_integ.shape[1] - center_y)**2))
else:
TwoD = skimage.transform.warp_polar(img_to_integ,center=(center_x,center_y), radius = np.nanmax(img_to_integ.shape))
TwoD = skimage.transform.warp_polar(img_to_integ,center=(center_x,center_y), radius = np.sqrt((img_to_integ.shape[0] - center_x)**2 + (img_to_integ.shape[1] - center_y)**2))


qx = img.qx
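
The radius change replaces np.nanmax(img_to_integ.shape) — which under-covers the frame whenever the beam center is off-center — with the distance from the center to the far corner. A general version would take the maximum over all four corners; a hypothetical helper, not part of this commit:

    import numpy as np

    def polar_radius(shape, center):
        # Largest distance from the beam center to any of the four detector corners.
        corners = np.array([(0, 0), (0, shape[1]), (shape[0], 0), (shape[0], shape[1])], dtype=float)
        return float(np.max(np.hypot(corners[:, 0] - center[0], corners[:, 1] - center[1])))
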
