diff --git a/src/PyHyperScattering/PFEnergySeriesIntegrator.py b/src/PyHyperScattering/PFEnergySeriesIntegrator.py
index 35843a60..e8b60ff7 100755
--- a/src/PyHyperScattering/PFEnergySeriesIntegrator.py
+++ b/src/PyHyperScattering/PFEnergySeriesIntegrator.py
@@ -4,7 +4,6 @@
 import warnings
 import xarray as xr
 import numpy as np
-import pandas as pd
 import math
 import pandas as pd
 from tqdm.auto import tqdm
@@ -40,12 +39,36 @@ def integrateSingleImage(self,img):
         # get the energy and locate the matching integrator
         # use that integrator to reduce
         # return single reduced frame
-
-
-        if type(img.energy) != float:
+
+        #print(f'  img.energy is a {type(img.energy)}, len = {len(img.energy)} and is {img.energy}')
+        #print(f'  img.system is a {type(img.system)}, len = {len(img.energy)} and is {img.system}')
+        #print(f'  img.system.levels: {img.indexes["system"].names}')
+        #print(f'  img.indexes: {img.indexes}')
+        #print(f'  type of system value: {type(getattr(img,"system").values)}')
+        #print(f'  shape of system value: {getattr(img,"system").values.shape}')
+        en = None
+        if 'energy' not in img.indexes and type(img.energy) != float:
+            try:
+                multiindex_name = None
+                for idx in img.indexes:
+                    if type(img.indexes[idx]) == pd.core.indexes.multi.MultiIndex:
+                        multiindex_name = idx
+                        break
+                if 'energy' in img.indexes[multiindex_name].names:
+                    for i,n in enumerate(img.indexes[multiindex_name].names):
+                        if n == 'energy':
+                            idx_of_energy = i
+                    en = float(getattr(img,multiindex_name).values[idx_of_energy][0])
+            except KeyError:
+                pass
+        if en is not None:
+            pass
+        elif type(img.energy) == float:
+            en = img.energy
+        else:
             try:
                 en = img.energy.values[0]
-                if len(img.energy)>1:
+                if len(img.energy.values)>1:
                     warnings.warn(f'Using the first energy value of {img.energy.values}, check that this is correct.',stacklevel=2)
             except IndexError:
                 en = float(img.energy)
@@ -58,6 +81,7 @@ def integrateSingleImage(self,img):
         else:
             en = img.energy
 
+
         try:
             self.integrator = self.integrator_stack[en]
         except KeyError:
@@ -169,7 +193,7 @@ def integrateImageStack_legacy(self,img_stack):
 
         # the following section attempts to shape the array that we have so that it meets the requirements for the xarray GroupBy/map
         # paradigm; specifically, it needs one index that's not pixel x/y.  We either identify if that is the case, or we make it the
-        # case.  however, there are probably edge cases not handled here.
+        # case.  however, there are probably edge cases not handled here
 
         if len(indexes) == 1:
             if img_stack.__getattr__(indexes[0]).to_pandas().drop_duplicates().shape[0] != img_stack.__getattr__(indexes[0]).shape[0]:
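Note on the integrateSingleImage() change above: when the incoming DataArray carries its energy inside a pandas MultiIndex (for example a stacked 'system' coordinate) rather than as a plain scalar or 1-D coordinate, the new code scans img.indexes for a MultiIndex and reads the 'energy' level out of it. The following is a minimal, standalone sketch of that idea only; the dimension and level names ('system', 'energy', 'polarization') and the toy data are illustrative, not the library's API, and the exact element indexing in the patch may differ from this sketch.

    import numpy as np
    import pandas as pd
    import xarray as xr

    # toy stack: 2 energies x 2 polarizations of a 3-pixel signal, stacked into one 'system' dim
    da = xr.DataArray(
        np.zeros((2, 2, 3)),
        dims=['energy', 'polarization', 'pix_x'],
        coords={'energy': [250.0, 270.0], 'polarization': [0, 90]},
    ).stack(system=('energy', 'polarization'))

    # the patch discovers the MultiIndex by scanning img.indexes; here the coordinate name is known
    mi = da.indexes['system']            # a pandas MultiIndex
    level = mi.names.index('energy')     # which level of the MultiIndex holds energy
    en = float(mi[0][level])             # first entry is a tuple like (250.0, 0)
    print(en)  # 250.0
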
+ """Search the Bluesky catalog for scans matching all provided keywords and return metadata as a dataframe. Matches are made based on the values in the top level of the 'start' dict within the metadata of each entry in the Bluesky Catalog (databroker.client.CatalogOfBlueskyRuns). Based on the search arguments provided, @@ -200,9 +202,9 @@ def summarize_run( Valid options for the Metadata Source are any of [r'catalog.start', r'catalog.start["plan_args"], r'catalog.stop', r'catalog.stop["num_events"]'] e.g., userOutputs = [["Exposure Multiplier","exptime", r'catalog.start'], ["Stop Time","time",r'catalog.stop']] - + debugWarnings (bool, optional): if True, raises a warning with debugging information whenever a key can't be found. Returns: - pd.Dataframe containing the results of the search, or an empty dataframe if the search fails + Pandas dataframe containing the results of the search, or an empty dataframe if the search fails """ # Pull in the reference to the databroker.client.CatalogOfBlueskyRuns attribute @@ -233,11 +235,8 @@ def summarize_run( elif isinstance(value, list) and len(value) == 2: userSearchList.append([userLabel, value[0], value[1]]) else: # bad user input - warnString = ( - "Error parsing a keyword search term, check the format.\nSkipped argument: " - + str(value) - ) - warnings.warn(warnString, stacklevel=2) + raise ValueError( + f"Error parsing a keyword search term, check the format. Skipped argument: {value} ") # combine the lists of lists fullSearchList = defaultSearchDetails + userSearchList @@ -249,10 +248,7 @@ def summarize_run( # Iterate through search terms sequentially, reducing the size of the catalog based on successful matches reducedCatalog = bsCatalog - loopDesc = "Searching by keyword arguments" - for index, searchSeries in tqdm( - df_SearchDet.iterrows(), total=df_SearchDet.shape[0], desc=loopDesc - ): + for _,searchSeries in tqdm(df_SearchDet.iterrows(),total = df_SearchDet.shape[0], desc = "Running catalog search..."): # Skip arguments with value None, and quits if the catalog was reduced to 0 elements if (searchSeries[1] is not None) and (len(reducedCatalog) > 0): @@ -286,34 +282,29 @@ def summarize_run( # If a match fails, notify the user which search parameter yielded 0 results if len(reducedCatalog) == 0: warnString = ( - "Catalog reduced to zero when attempting to match the following condition:\n" - + searchSeries.to_string() - + "\n If this is a user-provided search parameter, check spelling/syntax.\n" + f"Catalog reduced to zero when attempting to match {searchSeries}\n" + +f"If this is a user-provided search parameter, check spelling/syntax." 
@@ -286,34 +282,29 @@ def summarize_run(
                 # If a match fails, notify the user which search parameter yielded 0 results
                 if len(reducedCatalog) == 0:
                     warnString = (
-                        "Catalog reduced to zero when attempting to match the following condition:\n"
-                        + searchSeries.to_string()
-                        + "\n If this is a user-provided search parameter, check spelling/syntax.\n"
+                        f"Catalog reduced to zero when attempting to match {searchSeries}\n"
+                        +f"If this is a user-provided search parameter, check spelling/syntax."
                     )
                     warnings.warn(warnString, stacklevel=2)
                     return pd.DataFrame()
 
         ### Part 2: Build and return output dataframe
-        if (
-            outputType == "scans"
-        ):  # Branch 2.1, if only scan IDs needed, build and return a 1-column dataframe
+        if (outputType == "scans"):
+            # Branch 2.1, if only scan IDs needed, build and return a 1-column dataframe
             scan_ids = []
-            loopDesc = "Building scan list"
-            for index, scanEntry in tqdm(
-                (enumerate(reducedCatalog)), total=len(reducedCatalog), desc=loopDesc
-            ):
-                scan_ids.append(reducedCatalog[scanEntry].start["scan_id"])
+            for scanEntry in tqdm(reducedCatalog.values(), desc = "Building scan list"):
+                scan_ids.append(scanEntry.start["scan_id"])
 
             return pd.DataFrame(scan_ids, columns=["Scan ID"])
 
         else:  # Branch 2.2, Output metadata from a variety of sources within each the catalog entry
-
+            missesDuringLoad = False
             # Store details of output values as a list of lists
             # List elements are [Output Column Title, Bluesky Metadata Code, Metadata Source location, Applicable Output flag]
             outputValueLibrary = [
                 ["scan_id", "scan_id", r"catalog.start", "default"],
                 ["uid", "uid", r"catalog.start", "ext_bio"],
-                ["start time", "time", r"catalog.start", "default"],
+                ["start_time", "time", r"catalog.start", "default"],
                 ["cycle", "cycle", r"catalog.start", "default"],
                 ["saf", "SAF", r"catalog.start", "ext_bio"],
                 ["user_name", "user_name", r"catalog.start", "ext_bio"],
@@ -323,7 +314,7 @@ def summarize_run(
                 ["sample_id", "sample_id", r"catalog.start", "default"],
                 ["bar_spot", "bar_spot", r"catalog.start", "ext_msmt"],
                 ["plan", "plan_name", r"catalog.start", "default"],
-                ["detector", "RSoXS_Main_DET", r"catalog.start", "default"], 
+                ["detector", "RSoXS_Main_DET", r"catalog.start", "default"],
                 ["polarization", "pol", r'catalog.start["plan_args"]', "default"],
                 ["sample_rotation", "angle", r"catalog.start", "ext_msmt"],
                 ["exit_status", "exit_status", r"catalog.stop", "default"],
@@ -350,11 +341,7 @@ def summarize_run(
                     activeOutputValues.append(userOutEntry)
                     activeOutputLabels.append(userOutEntry[0])
                 else:  # bad user input
-                    warnString = (
-                        "Error parsing user-provided output request, check the format.\nSkipped: "
-                        + str(userOutEntry)
-                    )
-                    warnings.warn(warnString, stacklevel=2)
+                    raise ValueError(f"Error parsing user-provided output request {userOutEntry}, check the format.")
 
             # Add any user-provided search terms
             for userSearchEntry in userSearchList:
@@ -371,20 +358,17 @@ def summarize_run(
             # Build output dataframe as a list of lists
             outputList = []
 
-            # Outer loop: Catalog entries
-            loopDesc = "Building output dataframe"
-            for index, scanEntry in tqdm(
-                (enumerate(reducedCatalog)), total=len(reducedCatalog), desc=loopDesc
-            ):
-
+            # Outer loop: Catalog entries
+            for scanEntry in tqdm(reducedCatalog.values(),desc = "Retrieving results..."):
                singleScanOutput = []

                # Pull the start and stop docs once
-               currentCatalogStart = reducedCatalog[scanEntry].start
-               currentCatalogStop = reducedCatalog[scanEntry].stop
-
+
+               currentCatalogStart = scanEntry.start
+               currentCatalogStop = scanEntry.stop
+
                currentScanID = currentCatalogStart["scan_id"]
-
+
                # Inner loop: append output values
                for outputEntry in activeOutputValues:
                    outputVariableName = outputEntry[0]
@@ -392,7 +376,10 @@ def summarize_run(
                    metaDataSource = outputEntry[2]
                    try:
                        # Add the metadata value depending on where it is located
-                       if metaDataSource == r"catalog.start":
+                       if metaDataLabel == 'time':
+                           singleScanOutput.append(datetime.datetime.fromtimestamp(currentCatalogStart['time']))
+                           # see Zen of Python #9 and #8 for justification
+                       elif metaDataSource == r"catalog.start":
                            singleScanOutput.append(currentCatalogStart[metaDataLabel])
                        elif metaDataSource == r'catalog.start["plan_args"]':
                            singleScanOutput.append(
@@ -405,36 +392,29 @@ def summarize_run(
                                currentCatalogStop["num_events"][metaDataLabel]
                            )
                        else:
-                           warnString = (
-                               "Scan: > "
-                               + str(currentScanID)
-                               + " < Failed to locate metaData entry for > "
-                               + str(outputVariableName)
-                               + " <\n Tried looking for label: > "
-                               + str(metaDataLabel)
-                               + " < in: "
-                               + str(metaDataSource)
-                           )
-                           warnings.warn(warnString, stacklevel=2)
+                           if debugWarnings:
+                               warnings.warn(
+                                   f'Failed to locate metadata for {outputVariableName} in scan {currentScanID}.',
+                                   stacklevel=2)
+                           missesDuringLoad = True
                    except (KeyError, TypeError):
-                       warnString = (
-                           "Scan: > "
-                           + str(currentScanID)
-                           + " < Failed to locate metaData entry for > "
-                           + str(outputVariableName)
-                           + " <\n Tried looking for label: > "
-                           + str(metaDataLabel)
-                           + " < in: "
-                           + str(metaDataSource)
-                       )
-                       warnings.warn(warnString, stacklevel=2)
+                       if debugWarnings:
+                           warnings.warn(
+                               f'Failed to locate metadata for {outputVariableName} in scan {currentScanID}.',
+                               stacklevel=2)
+                       missesDuringLoad = True
                        singleScanOutput.append("N/A")

                # Append to the filled output list for this entry to the list of lists
                outputList.append(singleScanOutput)
-
+
+            # Convert to dataframe for export
+            if missesDuringLoad:
+                warnings.warn(
+                    f'One or more missing field(s) during this load were replaced with "N/A". Re-run with debugWarnings=True to see details.',
+                    stacklevel=2)
        return pd.DataFrame(outputList, columns=activeOutputLabels)

    def background(f):
@@ -781,7 +761,7 @@ def loadMonitors(
            Presently bins are averaged between measurements intervals.
        useShutterThinning : bool, optional
            Whether or not to attempt to thin (filter) the raw time streams to remove data collected during shutter opening/closing, by default False
-           As of 230209 at NSLS2 SST1, using useShutterThinning= True for exposure times of < 0.5s is
+           As of 9 Feb 2023 at NSLS2 SST1, using useShutterThinning= True for exposure times of < 0.5s is
            not recommended because the shutter data is unreliable and too many points will be culled
        n_thinning_iters : int, optional
            how many iterations of thinning to perform, by default 5
@@ -807,6 +787,10 @@ def loadMonitors(
        # At this stage monitors has dimension time and all streams as data variables
        # the time dimension inherited all time values from all streams
        # the data variables (Mesh current, sample current etc.) are all sparse, with lots of nans
+
+       # if there are no monitors, return an empty xarray Dataset
+       if monitors is None:
+           return xr.Dataset()

        # For each nan value, replace with the closest value ahead of it in time
        # For remaining nans, replace with closest value behind it in time
@@ -873,7 +857,7 @@ def loadMonitors(
        except Exception as e:
            # raise e  # for testing
            warnings.warn(
-               "Error while time-integrating onto images.  Check data.",
+               "Error while time-integrating monitors onto images.  Usually this indicates a problem with the monitors; this is a critical error if doing normalization, but otherwise fine to ignore.",
                stacklevel=2,
            )
        return monitors
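Note on the loadMonitors() changes above: the added guard returns an empty xr.Dataset() when no monitor streams were collected, and the surrounding (unchanged) code then fills each sparse stream along the merged time axis, as described in the context comments. Below is a minimal sketch of that fill step; the stream names and values are made up, the fill order shown (forward then backward) is only illustrative, and xarray's ffill/bfill rely on the optional bottleneck package.

    import numpy as np
    import xarray as xr

    # two monitor streams recorded on different clocks, merged onto one sparse time axis
    monitors = xr.Dataset(
        {
            'mesh_current':   ('time', [1.0, np.nan, np.nan, 4.0, np.nan, 6.0]),
            'sample_current': ('time', [np.nan, 2.0, np.nan, np.nan, 5.0, np.nan]),
        },
        coords={'time': np.arange(6.0)},
    )

    # fill gaps along time: carry the last value forward, then fill any leading NaNs backward
    # (the patch itself only adds the early return of an empty Dataset when monitors is None)
    filled = monitors.ffill('time').bfill('time')
    print(filled['mesh_current'].values)  # [1. 1. 1. 4. 4. 6.]
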
diff --git a/src/PyHyperScattering/WPIntegrator.py b/src/PyHyperScattering/WPIntegrator.py
index 9dd7041f..68f59707 100644
--- a/src/PyHyperScattering/WPIntegrator.py
+++ b/src/PyHyperScattering/WPIntegrator.py
@@ -112,9 +112,9 @@ def integrateSingleImage(self,img):
             pass
 
         if self.MACHINE_HAS_CUDA:
-            TwoD = self.warp_polar_gpu(img_to_integ,center=(center_x,center_y), radius = np.nanmax(img_to_integ.shape))
+            TwoD = self.warp_polar_gpu(img_to_integ,center=(center_x,center_y), radius = np.sqrt((img_to_integ.shape[0] - center_x)**2 + (img_to_integ.shape[1] - center_y)**2))
         else:
-            TwoD = skimage.transform.warp_polar(img_to_integ,center=(center_x,center_y), radius = np.nanmax(img_to_integ.shape))
+            TwoD = skimage.transform.warp_polar(img_to_integ,center=(center_x,center_y), radius = np.sqrt((img_to_integ.shape[0] - center_x)**2 + (img_to_integ.shape[1] - center_y)**2))
 
         qx = img.qx
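Note on the WPIntegrator.py change above: the radius passed to the polar unwrap is now computed from the beam center instead of simply the longest image dimension, so with an off-center beam the unwrapped image extends from the center out to the (shape[0], shape[1]) corner of the detector. A small numeric sketch comparing the two expressions follows; the values are made up for illustration and are not part of the patch.

    import numpy as np

    shape = (1024, 1024)               # img_to_integ.shape (rows, cols)
    center_x, center_y = 200.0, 150.0  # beam center used for the unwrap (made-up values)

    r_old = np.nanmax(shape)                                              # old radius: longest image dimension
    r_new = np.sqrt((shape[0] - center_x)**2 + (shape[1] - center_y)**2)  # new radius: center to far corner

    print(r_old, round(r_new, 1))  # 1024 1201.2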