diff --git a/contrib/app/SpotAnalysis/PeakFlux.py b/contrib/app/SpotAnalysis/PeakFlux.py
index 4aec600c8..5da36d3bc 100644
--- a/contrib/app/SpotAnalysis/PeakFlux.py
+++ b/contrib/app/SpotAnalysis/PeakFlux.py
@@ -3,6 +3,7 @@
 import numpy as np
 
+from contrib.app.SpotAnalysis.PeakFluxSettings import PeakFluxSettings
 import opencsp.common.lib.cv.SpotAnalysis as sa
 from opencsp.common.lib.cv.annotations.HotspotAnnotation import HotspotAnnotation
 from opencsp.common.lib.cv.fiducials.BcsFiducial import BcsFiducial
@@ -11,8 +12,8 @@
 import opencsp.common.lib.cv.spot_analysis.SpotAnalysisOperableAttributeParser as saoap
 from opencsp.common.lib.cv.spot_analysis.image_processor import *
 import opencsp.common.lib.tool.file_tools as ft
+import opencsp.common.lib.tool.image_tools as it
 import opencsp.common.lib.tool.log_tools as lt
-import opencsp.common.lib.tool.time_date_tools as tdt
 
 
 class PeakFlux:
@@ -34,138 +35,177 @@ class PeakFlux:
     - Per-heliostat peak flux identification
     """
 
-    def __init__(self, indir: str, outdir: str, experiment_name: str, settings_path_name_ext: str):
-        self.indir = indir
+    def __init__(self, outdir: str, experiment_name: str, settings: PeakFluxSettings):
         self.outdir = outdir
         self.experiment_name = experiment_name
-        self.settings_path_name_ext = settings_path_name_ext
+        self.settings = settings
 
-        settings_path, settings_name, settings_ext = ft.path_components(self.settings_path_name_ext)
-        settings_dict = ft.read_json("PeakFlux settings", settings_path, settings_name + settings_ext)
-        self.crop_box: list[int] = settings_dict['crop_box']
-        self.bcs_pixel: list[int] = settings_dict['bcs_pixel_location']
-        self.heliostate_name_pattern = re.compile(settings_dict['heliostat_name_pattern'])
-
-        group_assigner = AverageByGroupImageProcessor.group_by_name(re.compile(r"(_off)?( Raw)"))
+        group_assigner = AverageByGroupImageProcessor.group_by_name(re.compile(r"([0-9]{1,2}[EW][0-9]{1,2})"))
         group_trigger = AverageByGroupImageProcessor.group_trigger_on_change()
         supporting_images_map = {
             ImageType.PRIMARY: lambda operable, operables: "off" not in operable.get_primary_path_nameext(),
             ImageType.NULL: lambda operable, operables: "off" in operable.get_primary_path_nameext(),
         }
 
-        self.image_processors: list[AbstractSpotAnalysisImageProcessor] = [
-            CroppingImageProcessor(*self.crop_box),
-            AverageByGroupImageProcessor(group_assigner, group_trigger),
-            EchoImageProcessor(),
-            SupportingImagesCollectorImageProcessor(supporting_images_map),
-            NullImageSubtractionImageProcessor(),
-            ConvolutionImageProcessor(kernel="box", diameter=3),
-            BcsLocatorImageProcessor(),
-            View3dImageProcessor(crop_to_threshold=20, max_resolution=(100, 100), interactive=False),
-            HotspotImageProcessor(desired_shape=21, draw_debug_view=False),
-            ViewCrossSectionImageProcessor(
-                self.get_bcs_origin, 'BCS', single_plot=False, crop_to_threshold=20, interactive=True
-            ),
+        self.image_processors: dict[str, AbstractSpotAnalysisImageProcessor] = {
+            'Crop': CroppingImageProcessor(*self.settings.crop_box),
+            'AvgG': AverageByGroupImageProcessor(group_assigner, group_trigger),
+            'Echo': EchoImageProcessor(),
+            'Coll': SupportingImagesCollectorImageProcessor(supporting_images_map),
+            'Null': NullImageSubtractionImageProcessor(),
+            'Noiz': ConvolutionImageProcessor(kernel="box", diameter=3),
+            'Targ': TargetBoardLocatorImageProcessor(
+                None,
+                None,
+                settings.target_board_size_wh[0],
+                settings.target_board_size_wh[1],
+                settings.target_canny_gradients[0],
+                settings.target_canny_gradients[1],
+                edge_coarse_width=30,
+                canny_test_gradients=settings.target_canny_test_gradients,
+            ),
+            'Fill': InpaintImageProcessor(settings.infill_mask_file, settings.tmpdir),
+            'Save': SaveToFileImageProcessor(ft.join(settings.outdir, "filled_targets")),
+            'Ve3d': View3dImageProcessor(crop_to_threshold=20, max_resolution=(100, 100)),
+            'HotL': HotspotImageProcessor(
+                desired_shape=49, draw_debug_view=False, record_visualization=False, record_debug_view=6
+            ),
-            ViewCrossSectionImageProcessor(
-                self.get_peak_origin, 'Hotspot', single_plot=False, crop_to_threshold=20, interactive=True
-            ),
+            'Vcx2': ViewCrossSectionImageProcessor(
+                self.get_peak_origin, 'Hotspot', single_plot=False, crop_to_threshold=20
+            ),
-            PopulationStatisticsImageProcessor(initial_min=0, initial_max=255),
-            FalseColorImageProcessor(),
-            AnnotationImageProcessor(),
-        ]
+            'Stat': PopulationStatisticsImageProcessor(min_pop_size=1, initial_min=0, initial_max=255),
+            'Fclr': ViewFalseColorImageProcessor(),
+            'Anno': ViewAnnotationsImageProcessor(),
+        }
+        self.pptx_processor = PowerpointImageProcessor(
+            save_dir=outdir,
+            save_name="processing_pipeline",
+            overwrite=True,
+            processors_per_slide=[
+                [self.image_processors['Noiz'], self.image_processors['Targ']],
+                [self.image_processors['Fill']],
+                [self.image_processors['Ve3d']],
+                [self.image_processors['HotL']],
+                [self.image_processors['Vcx2']],
+                [self.image_processors['Fclr'], self.image_processors['Anno']],
+            ],
+        )
+        image_processor_list = list(self.image_processors.values()) + [self.pptx_processor]
+        self.image_processors_list: list[AbstractSpotAnalysisImageProcessor] = image_processor_list
+
         self.spot_analysis = sa.SpotAnalysis(
-            experiment_name, self.image_processors, save_dir=outdir, save_overwrite=True
+            experiment_name, self.image_processors_list, save_dir=outdir, save_overwrite=True
         )
 
-        filenames = ft.files_in_directory_by_extension(self.indir, [".jpg"])[".jpg"]
-        source_path_name_exts = [os.path.join(self.indir, filename) for filename in filenames]
-        self.spot_analysis.set_primary_images(source_path_name_exts)
-
-    def run(self):
-        # process all images from indir
-        for result in self.spot_analysis:
-            # save the processed image
-            save_path = self.spot_analysis.save_image(
-                result, self.outdir, save_ext="png", also_save_supporting_images=False, also_save_attributes_file=True
-            )
-            if save_path is None:
-                lt.warn(
-                    f"Warning in PeakFlux.run(): failed to save image. "
-                    + "Maybe SpotAnalaysis.save_overwrite is False? ({self.spot_analysis.save_overwrite=})"
-                )
+    def assign_inputs(self, dirs_or_files: str | list[str]):
+        """
+        Assigns the given image files or the image files in the given
+        directories as input to this instance's SpotAnalysis. Must be done
+        before iterating through this instance's SpotAnalysis.
+
+        Parameters
+        ----------
+        dirs_or_files : str | list[str]
+            The image files to be analyzed, or directories that contain image
+            files. Can be mixed files and directories.
+ """ + file_path_name_exts: list[str] = [] + + for dir_or_file in dirs_or_files: + if os.path.isfile(dir_or_file): + file_path_name_exts.append(dir_or_file) + elif os.path.isdir(dir_or_file): + dir_files = it.image_files_in_directory(dir_or_file) + file_path_name_exts += [os.path.join(dir_or_file, filename) for filename in dir_files] else: - lt.info(f"Saved image to {save_path}") + lt.error_and_raise( + FileNotFoundError, + "Error in PeakFlux.assign_inputs(): " + + f"given dir_or_file {dir_or_file} is neither a directory nor a file.", + ) - # Get the attributes of the processed image, to save the results we're most interested in into a single - # condensed csv file. - parser = saoap.SpotAnalysisOperableAttributeParser(result, self.spot_analysis) + self.spot_analysis.set_primary_images(file_path_name_exts) - def get_bcs_origin(self, operable: SpotAnalysisOperable) -> tuple[int, int]: + def assign_target_board_reference_image(self, reference_image_dir_or_file: str): """ - Returns the origin pixel location of the BCS fiducial assigned to this operable. + Assigns the given image file or directory containing image files as the + reference for the TargetBoardLocatorImageProcessor. Parameters ---------- - operable : SpotAnalysisOperable - An operable that has resulted from a BcsLocatorImageProcessor and - has an assigned BCS fiducial. - - Returns - ------- - bcs_origin: tuple[int, int] - The origin of the BCS fiducial. For circular BCS systems, this will - be the center point of the circle. + reference_image_dir_or_file : str + The image file, or the directory containing image files. """ + targ_processor: TargetBoardLocatorImageProcessor = self.image_processors['Targ'] + targ_processor.set_reference_images(reference_image_dir_or_file) + + def assign_inputs_from_settings(self): + """ + Use the values from the PeakFluxSettings instance to find input images + and reference images, and assign them to this PeakFlux instance. + """ + # Build the filters for the directories + is_reference_dir = lambda dn: any([rdn in dn for rdn in self.settings.target_reference_images_dirnames]) + is_missed_helio_dir = lambda dn: self.get_helio_name(dn) in self.settings.missed_heliostats + is_issue_helio_dir = lambda dn: self.get_helio_name(dn) in self.settings.issue_heliostats + has_helio_name = lambda dn: self.get_helio_name(dn) is not None + + # Get the target reference directories. These should be directories that + # contain images without any sun on them. + dirnames = ft.files_in_directory(self.settings.indir, files_only=False) + reference_dirnames = list(filter(lambda dn: is_reference_dir(dn), dirnames)) + target_reference_path = ft.norm_path(os.path.join(self.settings.indir, reference_dirnames[0], "Raw Images")) + + # Get the heliostat image directories. Since the directories names start + # with a datetime string, sorting will put them all into collection order. 
+        heliostat_dirnames = list(filter(lambda dn: not is_reference_dir(dn), dirnames))
+        heliostat_dirnames = list(filter(lambda dn: not is_missed_helio_dir(dn), heliostat_dirnames))
+        heliostat_dirnames = list(filter(lambda dn: not is_issue_helio_dir(dn), heliostat_dirnames))
+        heliostat_dirnames = list(filter(lambda dn: has_helio_name(dn), heliostat_dirnames))
+        # heliostat_dirnames = list(filter(lambda dn: "07E01" in dn, heliostat_dirnames))
+        if self.settings.limit_num_heliostats >= 0:
+            heliostat_dirnames = heliostat_dirnames[
+                : np.min([self.settings.limit_num_heliostats, len(heliostat_dirnames)])
+            ]
+        heliostat_dirs = [ft.norm_path(os.path.join(self.settings.indir, dn)) for dn in heliostat_dirnames]
+
+        # Compile a list of images from the image directories.
+        source_images_path_name_exts: list[str] = []
+        for dirname in heliostat_dirs:
+            raw_images_path = ft.norm_path(os.path.join(self.settings.indir, dirname, "Raw Images"))
+            if not ft.directory_exists(raw_images_path):
+                lt.error_and_raise(
+                    FileNotFoundError,
+                    "Error in peak_flux: " + f"raw images directory \"{raw_images_path}\" does not exist",
+                )
+            for file_name_ext in ft.files_in_directory(raw_images_path, files_only=True):
+                source_images_path_name_exts.append(ft.norm_path(os.path.join(raw_images_path, file_name_ext)))
+
+        self.assign_inputs(source_images_path_name_exts)
+        self.assign_target_board_reference_image(target_reference_path)
+
+    def get_helio_name(self, dirname: str) -> str | None:
+        """Uses the heliostat_name_pattern from the peak flux settings to parse
+        the heliostat name out of the given directory name."""
+        helio_names = self.settings.heliostat_name_pattern.findall(dirname)
+        if len(helio_names) == 0:
+            return None
+        return helio_names[0]
+
+    def get_bcs_origin(self, operable: SpotAnalysisOperable) -> tuple[int, int] | None:
         fiducials = operable.get_fiducials_by_type(BcsFiducial)
+        if len(fiducials) == 0:
+            return None
         fiducial = fiducials[0]
         origin_fx, origin_fy = fiducial.origin.astuple()
         origin_ix, origin_iy = int(np.round(origin_fx)), int(np.round(origin_fy))
         return origin_ix, origin_iy
 
-    def get_peak_origin(self, operable: SpotAnalysisOperable) -> tuple[int, int]:
-        """
-        Get the peak pixel location of the hotspot for the given operable.
-
-        Parameters
-        ----------
-        operable : SpotAnalysisOperable
-            An operable that has resulted from a HotspotImageProcessor and has
-            an assigned hotspot annotation.
-
-        Returns
-        -------
-        peak_origin: tuple[int, int]
-            The origin of the hotspot annotation.
-        """
+    def get_peak_origin(self, operable: SpotAnalysisOperable) -> tuple[int, int] | None:
         fiducials = operable.get_fiducials_by_type(HotspotAnnotation)
+        if len(fiducials) == 0:
+            return None
         fiducial = fiducials[0]
         origin_fx, origin_fy = fiducial.origin.astuple()
         origin_ix, origin_iy = int(np.round(origin_fx)), int(np.round(origin_fy))
         return origin_ix, origin_iy
-
-
-if __name__ == "__main__":
-    import argparse
-
-    parser = argparse.ArgumentParser(
-        prog=__file__.rstrip(".py"), description='Processes images to find the point of peak flux.'
-    )
-    parser.add_argument('indir', type=str, help="Directory with images to be processed.")
-    parser.add_argument('outdir', type=str, help="Directory for where to put processed images and computed results.")
-    parser.add_argument('experiment_name', type=str, help="A description of the current data collection.")
-    parser.add_argument('settings_file', type=str, help="Path to the settings JSON file for this PeakFlux evaluation.")
-    args = parser.parse_args()
-
-    # create the output directory
-    ft.create_directories_if_necessary(args.outdir)
-    ft.delete_files_in_directory(args.outdir, "*")
-
-    # create the log file
-    log_path_name_ext = os.path.join(args.outdir, "PeakFlux_" + tdt.current_date_time_string_forfile() + ".log")
-    lt.logger(log_path_name_ext)
-
-    # validate the rest of the inputs
-    if not ft.directory_exists(args.indir):
-        lt.error_and_raise(FileNotFoundError, f"Error in PeakFlux.py: input directory '{args.indir}' does not exist!")
-
-    PeakFlux(args.indir, args.outdir, args.experiment_name, args.settings_file).run()
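With the argparse `__main__` block removed, PeakFlux is now driven by constructing it directly with a PeakFluxSettings instance. A minimal sketch of the new entry point (the settings construction below is hypothetical; see PeakFluxSettings for its actual fields):

    import opencsp.common.lib.tool.file_tools as ft
    import opencsp.common.lib.tool.log_tools as lt
    from contrib.app.SpotAnalysis.PeakFlux import PeakFlux
    from contrib.app.SpotAnalysis.PeakFluxSettings import PeakFluxSettings

    settings = PeakFluxSettings()  # hypothetical construction; populate indir, outdir, crop_box, etc.
    ft.create_directories_if_necessary(settings.outdir)
    lt.logger(ft.join(settings.outdir, "PeakFlux.log"))

    peak_flux = PeakFlux(settings.outdir, "experiment name", settings)
    peak_flux.assign_inputs_from_settings()
    for result in peak_flux.spot_analysis:
        pass  # the image processors save visualizations and the PowerPoint as side effects
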
diff --git a/contrib/experiments/WIP/1_AssignImageGains.py b/contrib/experiments/WIP/1_AssignImageGains.py
new file mode 100644
index 000000000..7239dd209
--- /dev/null
+++ b/contrib/experiments/WIP/1_AssignImageGains.py
@@ -0,0 +1,356 @@
+import shutil
+import multiprocessing
+
+import exiftool
+import cv2 as cv
+import numpy as np
+from PIL import Image
+import pytesseract
+
+from opencsp import opencsp_settings
+import opencsp.common.lib.render.figure_management as fm
+import opencsp.common.lib.render.view_spec as vs
+import opencsp.common.lib.render_control.RenderControlAxis as rca
+import opencsp.common.lib.render_control.RenderControlFigure as rcfg
+import opencsp.common.lib.render_control.RenderControlFigureRecord as rcfr
+import opencsp.common.lib.render_control.RenderControlPointSeq as rcps
+import opencsp.common.lib.tool.file_tools as ft
+import opencsp.common.lib.tool.image_tools as it
+import opencsp.common.lib.tool.log_tools as lt
+
+
+def get_matching_bcs_images(image_path_name_exts: list[str]) -> list[tuple[str, str | None, str | None]]:
+    """Find the matching Raw and PixelData images for the processed images.
+
+    Performs the following steps:
+    1. Find all "(.*) Processed.JPG" images
+       (for example "20241004_140500.32 hourly_1400 Processed.JPG")
+    2. Find all images in the same directory as the processed image, or a
+       sister directory of the processed image, that match the first part of
+       the processed image name
+    3. Collect these images together in a tuple
+    4. Return a list of all gathered tuples
+
+    Parameters
+    ----------
+    image_path_name_exts: list[str]
+        The images path/name.ext for all the images to be matched, such as from
+        image_tools.image_files_in_directory().
+
+    Returns
+    -------
+    matched_images: list[tuple[str, str|None, str|None]]
+        The processed images path/name.ext, and potentially the matching Raw
+        and PixelData images.
+    """
+    # Index image base directories
+    base_dirs_to_images: dict[str, list[tuple[str, str, str]]] = {}
+    for image_path_name_ext in image_path_name_exts:
+        pne = ft.path_components(image_path_name_ext)
+        path, name, ext = pne
+        base_dir, _, _ = ft.path_components(path)
+
+        # Add the base directory
+        if base_dir not in base_dirs_to_images:
+            base_dirs_to_images[base_dir] = []
+        base_dirs_to_images[base_dir].append(pne)
+
+        # Add the leaf directory
+        if path not in base_dirs_to_images:
+            base_dirs_to_images[path] = []
+        base_dirs_to_images[path].append(pne)
+
+    # Find matches
+    matched_images = []
+    for image_path_name_ext in image_path_name_exts:
+        path, name, ext = ft.path_components(image_path_name_ext)
+
+        # Check if the image is a processed image
+        if name.endswith(' Processed') and ext.upper() == '.JPG':
+            base_name = name.split(' Processed')[0]
+
+            # Get the images that are in the same directory or a sister directory
+            base_dir, _, _ = ft.path_components(path)
+            sister_image_path_name_exts = list(set(base_dirs_to_images[path] + base_dirs_to_images[base_dir]))
+
+            # Find matching Raw image
+            raw_image = None
+            for other_path, other_name, other_ext in sister_image_path_name_exts:
+                if other_name == base_name + " Raw":
+                    assert raw_image is None
+                    raw_image = ft.join(other_path, other_name + other_ext)
+
+            # Find matching PixelData image
+            pixel_data_image = None
+            for other_path, other_name, other_ext in sister_image_path_name_exts:
+                if (other_name == base_name + " PixelData") or (other_name == base_name + "_nosun PixelData"):
+                    assert pixel_data_image is None
+                    pixel_data_image = ft.join(other_path, other_name + other_ext)
+
+            # Collect the images together in a tuple
+            matched_images.append((image_path_name_ext, raw_image, pixel_data_image))
+
+    # Sanity check: no image gets matched multiple times
+    # (skip the None placeholders, which may legitimately repeat)
+    all_matching_images: list[str] = []
+    for processed, raw, pixeldata in matched_images:
+        assert processed not in all_matching_images
+        all_matching_images.append(processed)
+        if raw is not None:
+            assert raw not in all_matching_images
+            all_matching_images.append(raw)
+        if pixeldata is not None:
+            assert pixeldata not in all_matching_images
+            all_matching_images.append(pixeldata)
+
+    return matched_images
+ """ + # Index image base directories + base_dirs_to_images: dict[str, list[tuple[str, str, str]]] = {} + for image_path_name_ext in image_path_name_exts: + pne = ft.path_components(image_path_name_ext) + path, name, ext = pne + base_dir, _, _ = ft.path_components(path) + + # Add the base directory + if base_dir not in base_dirs_to_images: + base_dirs_to_images[base_dir] = [] + base_dirs_to_images[base_dir].append(pne) + + # Add the leaf directory + if path not in base_dirs_to_images: + base_dirs_to_images[path] = [] + base_dirs_to_images[path].append(pne) + + # Find matches + matched_images = [] + for image_path_name_ext in image_path_name_exts: + path, name, ext = ft.path_components(image_path_name_ext) + + # Check if the image is a processed image + if name.endswith(' Processed') and ext.upper() == '.JPG': + base_name = name.split(' Processed')[0] + + # Get the images that are in the same directory or a sister directory + base_dir, _, _ = ft.path_components(path) + sister_image_path_name_exts = list(set(base_dirs_to_images[path] + base_dirs_to_images[base_dir])) + + # Find matching Raw image + raw_image = None + for other_path, other_name, other_ext in sister_image_path_name_exts: + if other_name == base_name + " Raw": + assert raw_image is None + raw_image = ft.join(other_path, other_name + other_ext) + + # Find matching PixelData image + pixel_data_image = None + for other_path, other_name, other_ext in sister_image_path_name_exts: + if (other_name == base_name + " PixelData") or (other_name == base_name + "_nosun PixelData"): + assert pixel_data_image is None + pixel_data_image = ft.join(other_path, other_name + other_ext) + + # Collect the images together in a tuple + matched_images.append((image_path_name_ext, raw_image, pixel_data_image)) + + # Sanity check: no image gets matched multiple times + all_matching_images: list[str] = [] + for processed, raw, pixeldata in matched_images: + assert processed not in all_matching_images + all_matching_images.append(processed) + assert raw not in all_matching_images + all_matching_images.append(raw) + assert pixeldata not in all_matching_images + all_matching_images.append(pixeldata) + + return matched_images + + +def prepare_for_tesseract(data_dir: str, processed_image_path_name_ext: str) -> tuple[np.ndarray, np.ndarray]: + """Get the region of image for the given processed image, and just the red + channel for the same region of interest.""" + roi = {'left': 514, 'top': 98, 'right': 514 + 84, 'bottom': 98 + 41} + + processed_image = np.array(Image.open(ft.join(data_dir, processed_image_path_name_ext))) + processed_image_roi = processed_image[roi['top'] : roi['bottom'], roi['left'] : roi['right']] + processed_image_red = processed_image_roi[:, :, 0] + + return processed_image_roi, processed_image_red + + +def load_gain_values( + data_dir: str, processed_raw_pixeldata: list[tuple[str, str, str]] +) -> list[tuple[str, str, str, int | None]]: + """Returns the ISO Exif information on the given images, if they have such + information. 
If not, then None is returned for that set of images.""" + processed_images = [ft.join(data_dir, processed) for processed, raw, pixeldata in processed_raw_pixeldata] + raw_images = [ft.join(data_dir, raw) for processed, raw, pixeldata in processed_raw_pixeldata] + pixeldata_images = [ft.join(data_dir, pixeldata) for processed, raw, pixeldata in processed_raw_pixeldata] + + with exiftool.ExifToolHelper() as et: + processed_tags = et.get_tags(processed_images, tags=["EXIF:ISO"]) + raw_tags = et.get_tags(raw_images, tags=["EXIF:ISO"]) + pixeldata_tags = et.get_tags(pixeldata_images, tags=["EXIF:ISO"]) + + has_processed_gain = ['EXIF:ISO' in tags for tags in processed_tags] + has_raw_gain = ['EXIF:ISO' in tags for tags in raw_tags] + has_pixeldata_gain = ['EXIF:ISO' in tags for tags in pixeldata_tags] + + gain_exif_values: list[int | None] = [] + for i in range(len(has_processed_gain)): + if has_processed_gain[i] and has_raw_gain[i] and has_pixeldata_gain[i]: + g1 = int(processed_tags[i]['EXIF:ISO']) + g2 = int(raw_tags[i]['EXIF:ISO']) + g3 = int(pixeldata_tags[i]['EXIF:ISO']) + if g1 == g2 and g1 == g3: + gain_exif_values.append(g1) + else: + gain_exif_values.append(None) + else: + gain_exif_values.append(None) + + ret: list[tuple[str, str, str, int | None]] = [] + for i in range(len(has_processed_gain)): + processed, raw, pixeldata = processed_raw_pixeldata[i] + ret.append((processed, raw, pixeldata, gain_exif_values[i])) + + return ret + + +def tesseract_read_gain_values( + data_dir: str, processed: str, raw: str, pixeldata: str +) -> tuple[str, str, str, int | None]: + """ + Use OCR to read the gain value from the processed version of the given image + + Parameters + ---------- + data_dir : str + The top level directory containing images. The image paths will be + relative to this directory. + processed : str + The relative path/name.ext of the processed image to read the gain value from. + raw : str + The relative path/name.ext of the raw image that matches the processed image. + pixeldata : str + The relative path/name.ext of the pixeldata image that matches the processed image. + + Returns + ------- + tuple[str, str, str, int | None] + The [processed, raw, pixeldata, gain] tuple for the image. 
+ """ + gain: int = None + + # prepare the image + processed_image_roi, processed_image_red = prepare_for_tesseract(data_dir, processed) + + # Try from the standard RGB image + gain_str = pytesseract.image_to_string(processed_image_roi, lang='eng') + try: + gain = int(gain_str) + except Exception: + gain_str = "" + + # Try to read the gain value from just the red channel + if gain is None: + gain_str = pytesseract.image_to_string(processed_image_red, lang='eng') + try: + gain = int(gain_str) + except Exception: + gain_str = "" + + # Increase contrast and try again + if gain is None: + processed_image_red = cv.convertScaleAbs(processed_image_red, alpha=2.0, beta=0) + gain_str = pytesseract.image_to_string(processed_image_red, lang='eng') + try: + gain = int(gain_str) + except Exception: + gain_str = "" + + return processed, raw, pixeldata, gain + + +def tesseract_read_gain_values_as_necessary( + data_dir: str, processed: str, raw: str, pixeldata: str, gain: int | None +) -> tuple[str, str, str, int | None]: + if gain is not None: + return processed, raw, pixeldata, gain + return tesseract_read_gain_values(data_dir, processed, raw, pixeldata) + + +def ask_user_for_gain(data_dir: str, processed: str, raw: str, pixeldata: str) -> int: + # prepare the image + processed_image_roi, processed_image_red = prepare_for_tesseract(data_dir, processed) + + # draw the image + axis_control = rca.image(grid=False) + figure_control = rcfg.RenderControlFigure() + view_spec_2d = vs.view_spec_im() + fig_record1 = fm.setup_figure( + figure_control, axis_control, view_spec_2d, title=processed + " ROI", code_tag=f"{__file__}", equal=False + ) + fig_record2 = fm.setup_figure( + figure_control, axis_control, view_spec_2d, title=processed + " Red", code_tag=f"{__file__}", equal=False + ) + fig_record1.view.imshow(processed_image_roi) + fig_record1.view.show(block=False) + fig_record2.view.imshow(processed_image_red) + fig_record2.view.show(block=False) + + # ask the user for input + gain_str = input("What is the gain for this image? 
") + gain = int(gain_str) + + fig_record1.close() + fig_record2.close() + + return gain + + +def ask_user_for_gain_as_necessary(data_dir: str, processed: str, raw: str, pixeldata: str, gain: int | None) -> int: + if gain is not None: + return gain + return ask_user_for_gain(data_dir, processed, raw, pixeldata) + + +def write_gain_value(image_path_name_exts: list[str], gain: int): + with exiftool.ExifToolHelper() as et: + try: + et.set_tags(image_path_name_exts, tags={"EXIF:ISO": str(gain)}, params=["-P", "-overwrite_original"]) + except exiftool.exceptions.ExifToolExecuteError: + + # The image may have been poorly formatted the first time around + for image_pne in image_path_name_exts: + + # Save the image to a new file with a trusted program (Pillow) + p, n, e = ft.path_components(image_pne) + rewrite = ft.join(p, n + " - rewrite" + e) + Image.open(image_pne).save(rewrite) + shutil.copystat(image_pne, rewrite) + ft.delete_file(image_pne) + ft.rename_file(rewrite, image_pne) + + # Try to set the gain EXIF information again + et.set_tags(image_pne, tags={"EXIF:ISO": str(gain)}, params=["-P", "-overwrite_original"]) + + +if __name__ == "__main__": + data_dir = ft.join(opencsp_settings["opencsp_root_path"]["experiment_dir"], "2_Data\\BCS_Data") + image_path_name_exts = it.image_files_in_directory(data_dir, recursive=True) + matching_bcs_images = get_matching_bcs_images(image_path_name_exts) + + # Sanity check: there are no images that aren't matched + all_matching_images: list[str] = [] + unmatched_images: list[str] = [] + for processed, raw, pixeldata in matching_bcs_images: + all_matching_images.append(processed) + all_matching_images.append(raw) + all_matching_images.append(pixeldata) + for image_path_name_ext in image_path_name_exts: + if image_path_name_ext not in all_matching_images: + unmatched_images.append(image_path_name_ext) + assert len(unmatched_images) == 0 + + # Read the already assigned gain values, in case some images already have this value + print("Loading gain values...", end="") + bcs_images_gains = load_gain_values(data_dir, matching_bcs_images) + print("[done]") + + # How many images need gains? + missing_bcs_images_gains: list[tuple[str, str, str, int | None]] = [] + for i in range(len(bcs_images_gains)): + processed, raw, pixeldata, gain = bcs_images_gains[i] + if gain is None: + missing_bcs_images_gains.append(bcs_images_gains[i]) + print(f"Missing {len(missing_bcs_images_gains)} gains") + + # Use OCR to read gain values + print("Reading gain values...") + tesseract_bcs_images_gains: list[tuple[str, str, str, int | None]] = [] + with multiprocessing.Pool() as pool: + chunk_size = pool._processes * 10 + for chunk_start in range(0, len(missing_bcs_images_gains), chunk_size): + chunk_stop = min(len(missing_bcs_images_gains), chunk_start + chunk_size) + chunk = missing_bcs_images_gains[chunk_start:chunk_stop] + print(f"processing {chunk_start}:{chunk_stop}/{len(missing_bcs_images_gains)}") + tesseract_bcs_images_gains += pool.starmap( + tesseract_read_gain_values_as_necessary, [(data_dir, *matches) for matches in chunk] + ) + + # How many more gains did we identify? 
+ new_tesseract_bcs_images_gains: list[tuple[str, str, str, int | None]] = [] + for i in range(len(tesseract_bcs_images_gains)): + processed, raw, pixeldata, gain = missing_bcs_images_gains[i] + _, _, _, updated_gain = tesseract_bcs_images_gains[i] + if updated_gain is not None: + if (gain is None) or (updated_gain != gain): + new_tesseract_bcs_images_gains.append(tesseract_bcs_images_gains[i]) + print(f"Found {len(new_tesseract_bcs_images_gains)} new gain values") + + # Write gain values + print("Write gain values...", end="") + for i in range(len(new_tesseract_bcs_images_gains)): + print(f"\rWrite gain values...{i}", end="") + processed, raw, pixeldata, gain = new_tesseract_bcs_images_gains[i] + to_set_images = [ft.join(data_dir, pne) for pne in [processed, raw, pixeldata]] + write_gain_value(to_set_images, gain) + print(" [done]") + + # Populate unknown gain values from the user + nvalues_from_user = len(filter(lambda prpg: prpg[3] is None, tesseract_bcs_images_gains)) + print(f"Gathering {nvalues_from_user} gain values from the user") + manual_bcs_images_gains: list[tuple[str, str, str, int | None]] = [] + for i in range(len(tesseract_bcs_images_gains)): + processed, raw, pixeldata, gain = tesseract_bcs_images_gains[i] + if gain is None: + gain = ask_user_for_gain_as_necessary(data_dir, processed, raw, pixeldata, gain) + if gain is not None: + manual_bcs_images_gains += [(processed, raw, pixeldata, gain)] + + # Write gain values + print("Write gain values...", end="") + for i in range(len(manual_bcs_images_gains)): + print(f"\rWrite gain values...{i}", end="") + processed, raw, pixeldata, gain = manual_bcs_images_gains[i] + to_set_images = [ft.join(data_dir, pne) for pne in [processed, raw, pixeldata]] + write_gain_value(to_set_images, gain) + print(" [done]") diff --git a/contrib/experiments/WIP/2_AddGainToImagename.py b/contrib/experiments/WIP/2_AddGainToImagename.py new file mode 100644 index 000000000..1e63c9cb1 --- /dev/null +++ b/contrib/experiments/WIP/2_AddGainToImagename.py @@ -0,0 +1,39 @@ +import shutil +import multiprocessing + +import cv2 as cv +import numpy as np +from PIL import Image +import pytesseract + +from opencsp import opencsp_settings +import opencsp.common.lib.render.figure_management as fm +import opencsp.common.lib.render.view_spec as vs +import opencsp.common.lib.render_control.RenderControlAxis as rca +import opencsp.common.lib.render_control.RenderControlFigure as rcfg +import opencsp.common.lib.render_control.RenderControlFigureRecord as rcfr +import opencsp.common.lib.render_control.RenderControlPointSeq as rcps +import opencsp.common.lib.tool.file_tools as ft +import opencsp.common.lib.tool.image_tools as it +import opencsp.common.lib.tool.log_tools as lt + +from contrib.experiments.WIP.AssignImageGains import get_matching_bcs_images + + +def get_parent_dir(image_path_name_ext: str): + image_path, image_name, image_ext = ft.path_components(image_path_name_ext) + parent_path, image_subdir, _ = ft.path_components(image_path) + return parent_path + + +if __name__ == "__main__": + data_dir = ft.join(opencsp_settings["opencsp_root_path"]["experiment_dir"], "2_Data") + image_path_name_exts = it.image_files_in_directory(data_dir, recursive=True) + + gains = it.get_exif_value(data_dir, image_path_name_exts) + for gain, image_path_name_ext in zip(gains, image_path_name_exts): + if gain is None: + lt.error(f"Image {image_path_name_ext} has no gain value set") + else: + p, n, e = ft.path_components(image_path_name_ext) + ft.rename_file(ft.join(data_dir, 
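Two notes on the script above. First, the name-matching convention implemented by get_matching_bcs_images can be illustrated with hypothetical file names (invented for illustration):

    images = [
        "scan1/Raw Images/20241004_140500.32 hourly_1400 Processed.JPG",
        "scan1/Raw Images/20241004_140500.32 hourly_1400 Raw.JPG",
        "scan1/Raw Images/20241004_140500.32 hourly_1400 PixelData.JPG",
        "scan2/Raw Images/20241004_150000.00 hourly_1500 Processed.JPG",
    ]
    matches = get_matching_bcs_images(images)
    # [('... hourly_1400 Processed.JPG', '... hourly_1400 Raw.JPG', '... hourly_1400 PixelData.JPG'),
    #  ('... hourly_1500 Processed.JPG', None, None)]

Second, the OCR fallbacks in tesseract_read_gain_values all run Tesseract with its defaults. Since the ROI is known to hold a bare number, constraining the engine to digits often helps; a hedged variant (the config string is a suggestion, not part of this change):

    # Treat the ROI as one line of text and only consider digit characters.
    gain_str = pytesseract.image_to_string(
        processed_image_red, lang='eng', config='--psm 7 -c tessedit_char_whitelist=0123456789'
    )
    try:
        gain = int(gain_str.strip())
    except ValueError:
        gain = None
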
diff --git a/contrib/experiments/WIP/2_AddGainToImagename.py b/contrib/experiments/WIP/2_AddGainToImagename.py
new file mode 100644
index 000000000..1e63c9cb1
--- /dev/null
+++ b/contrib/experiments/WIP/2_AddGainToImagename.py
@@ -0,0 +1,39 @@
+import shutil
+import multiprocessing
+
+import cv2 as cv
+import numpy as np
+from PIL import Image
+import pytesseract
+
+from opencsp import opencsp_settings
+import opencsp.common.lib.render.figure_management as fm
+import opencsp.common.lib.render.view_spec as vs
+import opencsp.common.lib.render_control.RenderControlAxis as rca
+import opencsp.common.lib.render_control.RenderControlFigure as rcfg
+import opencsp.common.lib.render_control.RenderControlFigureRecord as rcfr
+import opencsp.common.lib.render_control.RenderControlPointSeq as rcps
+import opencsp.common.lib.tool.file_tools as ft
+import opencsp.common.lib.tool.image_tools as it
+import opencsp.common.lib.tool.log_tools as lt
+
+from contrib.experiments.WIP.AssignImageGains import get_matching_bcs_images
+
+
+def get_parent_dir(image_path_name_ext: str):
+    image_path, image_name, image_ext = ft.path_components(image_path_name_ext)
+    parent_path, image_subdir, _ = ft.path_components(image_path)
+    return parent_path
+
+
+if __name__ == "__main__":
+    data_dir = ft.join(opencsp_settings["opencsp_root_path"]["experiment_dir"], "2_Data")
+    image_path_name_exts = it.image_files_in_directory(data_dir, recursive=True)
+
+    gains = it.get_exif_value(data_dir, image_path_name_exts)
+    for gain, image_path_name_ext in zip(gains, image_path_name_exts):
+        if gain is None:
+            lt.error(f"Image {image_path_name_ext} has no gain value set")
+        else:
+            p, n, e = ft.path_components(image_path_name_ext)
+            ft.rename_file(ft.join(data_dir, image_path_name_ext), ft.join(data_dir, p, n + f"_g{gain}" + e))
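2_AddGainToImagename.py embeds the gain in each file name as a `_g{gain}` suffix. Downstream code that wants the value back without re-reading EXIF data can parse it out again; a small sketch (the helper is hypothetical, not part of this change):

    import re

    def gain_from_image_name(name: str) -> int | None:
        # Matches the "_g{gain}" suffix added by 2_AddGainToImagename.py,
        # where `name` is the file name without its extension.
        match = re.search(r"_g([0-9]+)$", name)
        return int(match.group(1)) if match is not None else None

    # gain_from_image_name("20241004_140500.32 hourly_1400 Raw_g400")  -> 400
    # gain_from_image_name("20241004_140500.32 hourly_1400 Raw")       -> None
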
+ """ + image_path, image_name, image_ext = ft.path_components(image_path_name_ext) + parent_path, image_subdir, _ = ft.path_components(image_path) + return parent_path + + +def crop_images(to_crop_dir: str, left_upper_right_lower: tuple[int, int, int, int]): + for image_name_ext in it.image_files_in_directory(to_crop_dir): + _, n, e = ft.path_components(image_name_ext) + cropped_image_name_ext = n + "_cropped" + e + img = Image.open(ft.join(to_crop_dir, image_name_ext)) + left, upper, right, lower = 965, 0, 965 + 661, 0 + 526 + img = img.crop((left, upper, right, lower)) + img.save(ft.join(to_crop_dir, cropped_image_name_ext)) + gain = it.get_exif_value(to_crop_dir, cropped_image_name_ext) + if gain is None: + gain = it.get_exif_value(to_crop_dir, image_name_ext) + it.set_exif_value(to_crop_dir, cropped_image_name_ext, str(gain)) + gain = it.get_exif_value(to_crop_dir, cropped_image_name_ext) + if gain is None: + lt.error_and_raise(RuntimeError, cropped_image_name_ext) + else: + ft.delete_file(ft.join(to_crop_dir, image_name_ext)) + else: + ft.delete_file(ft.join(to_crop_dir, image_name_ext)) + + +def get_time_from_image_name(bcs_image_name: str): + """Example name '20241004_150547.27 hourly_1500_images.png'""" + _, bcs_image_name, _ = ft.path_components(bcs_image_name) + time_pattern = re.compile(r"^([0-9]{8})_([0-9]{6})\.([0-9]{2}) .*$") + + # verify the format + match = time_pattern.match(bcs_image_name.strip()) + if match is None: + lt.error_and_raise( + ValueError, "Error in get_time_from_image_name: " + f"unexpected format for image name {bcs_image_name}" + ) + + # parse out the time + ymd = match.groups()[0] + hms = match.groups()[1] + centi = match.groups()[2] + year, month, day = int(ymd[:4]), int(ymd[4:6]), int(ymd[6:]) + hour, minute, second = int(hms[:2]), int(hms[2:4]), int(hms[4:]) + centisecond = int(centi) + + # create the time instance + dt = datetime.datetime(year, month, day, hour, minute, second, centisecond * 10 * 1000) + + return dt + + +def preprocess_images(images_dir: str, image_names_exts: list[str]) -> CacheableImage: + image_processors = { + "AvgG": AverageByGroupImageProcessor(lambda o: 0, lambda l: None), + "Conv": ConvolutionImageProcessor(), + } + sa = SpotAnalysis("preprocess_images", image_processors=list(image_processors.values())) + sa.set_primary_images([ft.join(images_dir, fn) for fn in image_names_exts]) + for result in sa: + pass + + return result.primary_image + + +def process_images( + on_sun_dir: str, + no_sun_dir: str | None, + results_dir: str, + on_sun_images: list[str] = None, + no_sun_images: list[str] = None, + save_name_prefix: str = "", +) -> tuple[str, str, str]: + """ + Find the hotspot for the given on sun images, and plot the cross section + (including the no-sun cross section). The resulting visualization images are + saved in the results_dir. + + Parameters + ---------- + on_sun_dir : str + Directory containing the on-sun images for some number of heliostats. + no_sun_dir : str | None + Directory containing the corresponding no-sun images. None if there are + no matching no-sun images. + results_dir : str + Where to save the output visualization images to. + + Returns + ------- + hotspot_vis: str + The path/name.ext of the saved hotspot visualization image. + crosssection_vis_1: str + The path/name.ext of the saved cross section visualization image. + crosssection_vis_2: str + The path/name.ext of the saved cross section visualization image. 
+ """ + # load the no-sun images + no_sun_images = None + if no_sun_dir is not None: + if no_sun_images is None: + no_sun_images = it.image_files_in_directory(no_sun_dir) + no_sun_images = [ft.join(no_sun_dir, image_name_ext) for image_name_ext in no_sun_images] + + # load the heliostat images + if on_sun_images is None: + on_sun_images = it.image_files_in_directory(on_sun_dir) + on_sun_images = [ft.join(on_sun_dir, image_name_ext) for image_name_ext in on_sun_images] + _, on_sun_name, on_sun_ext = ft.path_components(on_sun_images[0]) + original_image = CacheableImage.from_single_source(on_sun_images[0]) + + def hotspot_pixel_locator(operable: SpotAnalysisOperable) -> tuple[int, int]: + """Returns the x/y pixel location of the hotspot center""" + hotspots = filter(lambda a: isinstance(a, HotspotAnnotation), operable.annotations) + hs_annotation = list(hotspots)[0] + ret = hs_annotation.origin.astuple() + return (int(ret[0]), int(ret[1])) + + # build the list of image processors for the primary image + remove_leftover_noise = BackgroundColorSubtractionImageProcessor('constant', [2]) + image_processors = { + "EchoEcho": EchoImageProcessor(), + "AvgGroup": AverageByGroupImageProcessor(), + "BlurGaus": ConvolutionImageProcessor(), + "NullSubt": NullImageSubtractionImageProcessor(), + "ConstSub": remove_leftover_noise, + "PopStats": PopulationStatisticsImageProcessor(), + "SaveImag": SaveToFileImageProcessor(results_dir, prefix=on_sun_name, suffix="_null_image_subtraction"), + "Centroid": MomentsImageProcessor( + include_visualization=True, centroid_style=rcps.default(color=color.cyan(), markersize=20) + ), + "HotSpots": HotspotImageProcessor(15, record_visualization=True), + "SpotSize": SpotWidthImageProcessor(spot_width_technique="fwhm"), + "VFalseCl": ViewFalseColorImageProcessor(), + "VOverExp": ViewHighlightImageProcessor(base_image_selector='visualization', black_highlight_color=(70, 0, 70)), + "VAnnotat": ViewAnnotationsImageProcessor(base_image_selector='visualization'), + "_VFalse2": ViewFalseColorImageProcessor(), + "_VOverE2": ViewHighlightImageProcessor(base_image_selector='visualization', black_highlight_color=(70, 0, 70)), + "VCrosSec": ViewCrossSectionImageProcessor( + hotspot_pixel_locator, single_plot=False, y_range=(0, 255), base_image_selector='visualization' + ), + "EnclEnrg": EnclosedEnergyImageProcessor("hotspot", percentages_of_interest=[0.85], plot_x_limit_pixels=600), + } + _p = image_processors + processors_per_slide = [ + [ + original_image, + (_p["AvgGroup"], "Average"), + (_p["BlurGaus"], "Blur"), + (_p["NullSubt"], "Subtract No-Sun"), + (_p["ConstSub"], "Threshold Subtraction", ImageType.PRIMARY), + ], + [(_p["Centroid"], "Centroid & Principle Axis"), (_p["HotSpots"], "Max Intensity"), _p["SpotSize"]], + [_p["VAnnotat"]], + [_p["VAnnotat"], (_p["EnclEnrg"], "Encircled Energy", ImageType.VISUALIZATION), _p["VCrosSec"]], + ] + image_processors["PowerPnt"] = PowerpointImageProcessor( + results_dir, save_name_prefix, overwrite=True, processors_per_slide=processors_per_slide + ) + no_sun_image_processors = { + "EchoEcho": EchoImageProcessor(), + "AvgGroup": AverageByGroupImageProcessor(), + "BlurGaus": ConvolutionImageProcessor(), + } + + # process the on-sun and no-sun images + + # Find the hotspot and visualize the vertical/horizontal cross-sections + if no_sun_images is not None: + no_sun_spot_analysis = SpotAnalysis( + "NoSunSpotAnalysis", image_processors=list(no_sun_image_processors.values()) + ) + no_sun_spot_analysis.set_primary_images(no_sun_images) + 
lt.info("Processing no-sun images") + no_sun_image = next(iter(no_sun_spot_analysis)).primary_image + spot_analysis = SpotAnalysis("SpotAnalysis", image_processors=list(image_processors.values())) + spot_analysis.set_primary_images(on_sun_images) + spot_analysis.set_default_support_images({ImageType.NULL: no_sun_image}) + lt.info("Processing on-sun images") + result = next(iter(spot_analysis)) + + # Save the visualization images to the results directory + background_sub_algo_images = result.algorithm_images[image_processors["ConstSub"]] + centroid_vis_images = result.visualization_images[image_processors["Centroid"]] + false_color_vis_image = result.visualization_images[image_processors["VFalseCl"]][-1] + false_color_highlights_vis_image = result.visualization_images[image_processors["VOverExp"]][-1] + hotspot_vis_image = result.visualization_images[image_processors["VAnnotat"]][-1] + crosssec_vis_images = result.visualization_images[image_processors["VCrosSec"]] + enclosed_energy_vis_images = result.visualization_images[image_processors["EnclEnrg"]] + + ft.copy_file(on_sun_images[0], results_dir, f"0_{save_name_prefix}_original.png") + no_sun_image.to_image().save(ft.join(results_dir, f"1_{save_name_prefix}_no_sun_avg.png")) + for i, image in enumerate(background_sub_algo_images): + titles = ["color", "result"] + image.to_image().save(ft.join(results_dir, f"2_{save_name_prefix}_background_subtraction_{titles[i]}.png")) + false_color_vis_image.to_image().save(ft.join(results_dir, f"3_{save_name_prefix}_" + "VFalseCl.png")) + false_color_highlights_vis_image.to_image().save(ft.join(results_dir, f"4_{save_name_prefix}_" + "VOverExp.png")) + hotspot_vis_image.to_image().save(ft.join(results_dir, f"5_{save_name_prefix}_" + "VHotSpt_Centroid.png")) + for i, image in enumerate(crosssec_vis_images): + titles = ["vis", "horizontal", "vertical"] + image.to_image().save(ft.join(results_dir, f"6_{save_name_prefix}_crosssection_{titles[i]}.png")) + enclosed_energy_vis_images[0].to_image().save(ft.join(results_dir, f"7_{save_name_prefix}_enclosed_energy.png")) + + +if __name__ == "__main__": + data_dir = ft.join(opencsp_settings["opencsp_root_path"]["experiment_dir"], "2_Data", "BCS_Data_sorted") + process_dir = ft.join(opencsp_settings["opencsp_root_path"]["experiment_dir"], "3_Process", "BCS_Data_sorted") + image_path_name_exts = it.image_files_in_directory(data_dir, recursive=True) + time_dirs = sorted(list(set([get_parent_dir(image_path_name_ext) for image_path_name_ext in image_path_name_exts]))) + + doi = opencsp_settings["opencsp_root_path"]["data_of_interest"] + cross_section_dir = ft.join(process_dir, f"{doi}_Cross_Sections") + if ft.directory_exists(cross_section_dir): + for file in ft.files_in_directory(cross_section_dir, sort=False, files_only=True, recursive=True): + ft.delete_file(ft.join(cross_section_dir, file)) + for file in ft.files_in_directory(cross_section_dir, sort=False, files_only=False, recursive=True): + with et.ignored(Exception): + shutil.rmtree(file) + for time_dirname in time_dirs: + contained_dirs = ft.directories_in_directory(ft.join(data_dir, time_dirname)) + + # make sure we have data of interest + on_sun_dir = ft.join(time_dirname, doi) + if doi not in contained_dirs: + if f"{doi}" in contained_dirs: + ft.rename_directory(ft.join(data_dir, time_dirname, f"{doi}"), ft.join(data_dir, time_dirname, doi)) + else: + lt.info(f"{time_dirname=} doesn't have any {doi} data. 
+
+        # limit on-sun images to those with a consistent gain
+        inconsistent_gain_images: list[str] = []
+        on_sun_gains = it.get_exif_value(on_sun_dir, on_sun_images)
+        target_on_sun_gain = on_sun_gains[-1]
+        for on_sun_image, on_sun_gain in list(zip(on_sun_images, on_sun_gains)):
+            if np.abs(on_sun_gain - target_on_sun_gain) > 10:
+                on_sun_images.remove(on_sun_image)
+                inconsistent_gain_images.append(on_sun_image)
+
+        # Limit on-sun images to those within 2 seconds of each other, to reduce
+        # blur caused by the motion of the sun.
+        outside_time_range_images: list[str] = []
+        target_time = get_time_from_image_name(on_sun_images[0])
+        for on_sun_image in copy.copy(on_sun_images):
+            image_time = get_time_from_image_name(on_sun_image)
+            if (image_time - target_time).total_seconds() >= 2:
+                on_sun_images.remove(on_sun_image)
+                outside_time_range_images.append(on_sun_image)
+
+        # limit the no-sun images to those with a gain near the on-sun data
+        no_sun_images = None
+        if no_sun_dir is not None:
+            no_sun_images = it.image_files_in_directory(no_sun_dir)
+            no_sun_gains = it.get_exif_value(no_sun_dir, no_sun_images)
+            target_no_sun_gain = no_sun_gains[-1]
+            for no_sun_image, no_sun_gain in list(zip(no_sun_images, no_sun_gains)):
+                if np.abs(no_sun_gain - target_no_sun_gain) > 10:
+                    no_sun_images.remove(no_sun_image)
+        if no_sun_images is None or len(no_sun_images) == 0:
+            lt.info(f"{time_dirname=} has an empty no-sun directory or has no no-sun directory")
+            no_sun_dir = None
+
+        # build the results for this directory
+        lt.info(f"Processing images for {time_dirname}")
+        lt.info(
+            f"\ton_sun_images: {len(on_sun_images)}, "
+            + f"off_sun_images: {0 if no_sun_images is None else len(no_sun_images)}, "
+            + f"gain images rejected: {len(inconsistent_gain_images)}, "
+            + f">2s images rejected: {len(outside_time_range_images)}"
+        )
+        results_dir = ft.join(cross_section_dir, time_dirname)
+        ft.create_directories_if_necessary(results_dir)
+        process_images(
+            on_sun_dir,
+            no_sun_dir,
+            results_dir,
+            on_sun_images=on_sun_images,
+            no_sun_images=no_sun_images,
+            save_name_prefix=f"{doi}_{time_dirname}",
+        )
+
+        break
+
+    # build the powerpoint
+    ppt = rcpp.RenderControlPowerpointPresentation()
+    for time_dirname in ft.directories_in_directory(cross_section_dir):
+        dir = ft.join(cross_section_dir, time_dirname)
+        result_image_name_exts = it.image_files_in_directory(dir)
+
+        slide = ps.PowerpointSlide.template_content_grid(2, 2)
+        slide.set_title(time_dirname)
+        slide.add_image(
+            ppi.PowerpointImage(
+                ft.join(dir, list(filter(lambda f: "_original" in f, result_image_name_exts))[0]), caption="Original"
+            )
+        )
+        slide.add_image(
+            ppi.PowerpointImage(
+                ft.join(dir, list(filter(lambda f: "_null_image_subtraction" in f, result_image_name_exts))[0]),
+                caption="After subtracting NoSun image",
+            )
+        )
+        slide.add_image(
+            ppi.PowerpointImage(
+                ft.join(dir, list(filter(lambda f: "_VHotSpt_Centroid" in f, result_image_name_exts))[0]),
+                caption="Hotspot, Centroid",
+            )
+        )
+        slide.add_image(
+            ppi.PowerpointImage(
+                ft.join(dir, list(filter(lambda f: "_enclosed_energy" in f, result_image_name_exts))[0]),
+                caption="Enclosed Energy",
+            )
+        )
+        slide.save_and_bake()
+        ppt.add_slide(slide)
+
+        slide = ps.PowerpointSlide.template_content_grid(2, 2)
+        slide.set_title(time_dirname)
+        slide.add_image(
+            ppi.PowerpointImage(
+                ft.join(dir, list(filter(lambda f: "crosssection_vis" in f, result_image_name_exts))[0]),
+                caption="Hotspot",
+            ),
+            index=0,
+        )
+        for result_image_name_ext in result_image_name_exts:
+            if "no_sun_avg" in result_image_name_ext:
+                ppt_img = ppi.PowerpointImage(ft.join(dir, result_image_name_ext), caption=result_image_name_ext)
+                slide.add_image(ppt_img, index=1)
+        slide.add_image(ft.join(dir, list(filter(lambda f: "_horizontal" in f, result_image_name_exts))[0]), index=2)
+        slide.add_image(ft.join(dir, list(filter(lambda f: "_vertical" in f, result_image_name_exts))[0]), index=3)
+        slide.save_and_bake()
+        ppt.add_slide(slide)
+
+    ppt.save(ft.join(process_dir, f"{doi}_Cross_Sections.pptx"), overwrite=True)
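As a worked example of get_time_from_image_name in 3_AnalyzeHeliostats.py: the sample name from its docstring parses to 15:05:47.27 on 2024-10-04, with the centiseconds converted to microseconds by the centisecond * 10 * 1000 term:

    dt = get_time_from_image_name("20241004_150547.27 hourly_1500_images.png")
    # datetime.datetime(2024, 10, 4, 15, 5, 47, 270000)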