Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

232 update PeakFlux.py, add new code for testing spot analysis #233

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 143 additions & 103 deletions contrib/app/SpotAnalysis/PeakFlux.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import numpy as np

from contrib.app.SpotAnalysis.PeakFluxSettings import PeakFluxSettings
import opencsp.common.lib.cv.SpotAnalysis as sa
from opencsp.common.lib.cv.annotations.HotspotAnnotation import HotspotAnnotation
from opencsp.common.lib.cv.fiducials.BcsFiducial import BcsFiducial
Expand All @@ -11,8 +12,8 @@
import opencsp.common.lib.cv.spot_analysis.SpotAnalysisOperableAttributeParser as saoap
from opencsp.common.lib.cv.spot_analysis.image_processor import *
import opencsp.common.lib.tool.file_tools as ft
import opencsp.common.lib.tool.image_tools as it
import opencsp.common.lib.tool.log_tools as lt
import opencsp.common.lib.tool.time_date_tools as tdt


class PeakFlux:
Expand All @@ -34,138 +35,177 @@ class PeakFlux:
- Per-heliostat peak flux identification
"""

def __init__(self, indir: str, outdir: str, experiment_name: str, settings_path_name_ext: str):
self.indir = indir
def __init__(self, outdir: str, experiment_name: str, settings: PeakFluxSettings):
self.outdir = outdir
self.experiment_name = experiment_name
self.settings_path_name_ext = settings_path_name_ext
self.settings = settings

settings_path, settings_name, settings_ext = ft.path_components(self.settings_path_name_ext)
settings_dict = ft.read_json("PeakFlux settings", settings_path, settings_name + settings_ext)
self.crop_box: list[int] = settings_dict['crop_box']
self.bcs_pixel: list[int] = settings_dict['bcs_pixel_location']
self.heliostate_name_pattern = re.compile(settings_dict['heliostat_name_pattern'])

group_assigner = AverageByGroupImageProcessor.group_by_name(re.compile(r"(_off)?( Raw)"))
group_assigner = AverageByGroupImageProcessor.group_by_name(re.compile(r"([0-9]{1,2}[EW][0-9]{1,2})"))
group_trigger = AverageByGroupImageProcessor.group_trigger_on_change()
supporting_images_map = {
ImageType.PRIMARY: lambda operable, operables: "off" not in operable.get_primary_path_nameext(),
ImageType.NULL: lambda operable, operables: "off" in operable.get_primary_path_nameext(),
}

self.image_processors: list[AbstractSpotAnalysisImageProcessor] = [
CroppingImageProcessor(*self.crop_box),
AverageByGroupImageProcessor(group_assigner, group_trigger),
EchoImageProcessor(),
SupportingImagesCollectorImageProcessor(supporting_images_map),
NullImageSubtractionImageProcessor(),
ConvolutionImageProcessor(kernel="box", diameter=3),
BcsLocatorImageProcessor(),
View3dImageProcessor(crop_to_threshold=20, max_resolution=(100, 100), interactive=False),
HotspotImageProcessor(desired_shape=21, draw_debug_view=False),
ViewCrossSectionImageProcessor(
self.get_bcs_origin, 'BCS', single_plot=False, crop_to_threshold=20, interactive=True
self.image_processors: list[AbstractSpotAnalysisImageProcessor] = {
'Crop': CroppingImageProcessor(*self.crop_box),
'AvgG': AverageByGroupImageProcessor(group_assigner, group_trigger),
'Echo': EchoImageProcessor(),
'Coll': SupportingImagesCollectorImageProcessor(supporting_images_map),
'Null': NullImageSubtractionImageProcessor(),
'Noiz': ConvolutionImageProcessor(kernel="box", diameter=3),
'Targ': TargetBoardLocatorImageProcessor(
None,
None,
settings.target_board_size_wh[0],
settings.target_board_size_wh[1],
settings.target_canny_gradients[0],
settings.target_canny_gradients[1],
edge_coarse_width=30,
canny_test_gradients=settings.target_canny_test_gradients,
),
'Fill': InpaintImageProcessor(settings.infill_mask_file, settings.tmpdir),
'Save': SaveToFileImageProcessor(ft.join(settings.outdir, "filled_targets")),
'Ve3d': View3dImageProcessor(crop_to_threshold=20, max_resolution=(100, 100)),
'HotL': HotspotImageProcessor(
desired_shape=49, draw_debug_view=False, record_visualization=False, record_debug_view=6
),
ViewCrossSectionImageProcessor(
self.get_peak_origin, 'Hotspot', single_plot=False, crop_to_threshold=20, interactive=True
'Vcx2': ViewCrossSectionImageProcessor(
self.get_peak_origin, 'Hotspot', single_plot=False, crop_to_threshold=20
),
PopulationStatisticsImageProcessor(initial_min=0, initial_max=255),
FalseColorImageProcessor(),
AnnotationImageProcessor(),
]
'Stat': PopulationStatisticsImageProcessor(min_pop_size=1, initial_min=0, initial_max=255),
'Fclr': ViewFalseColorImageProcessor(),
'Anno': ViewAnnotationsImageProcessor(),
}
self.pptx_processor = PowerpointImageProcessor(
save_dir=outdir,
save_name="processing_pipeline",
overwrite=True,
processors_per_slide=[
[self.image_processors['Noiz'], self.image_processors['Targ']],
[self.image_processors['Fill']],
[self.image_processors['Ve3d']],
[self.image_processors['HotL']],
[self.image_processors['Vcx2']],
[self.image_processors['Fclr'], self.image_processors['Anno']],
],
)
image_processor_list = list(self.image_processors.values()) + [self.pptx_processor]
self.image_processors_list: list[AbstractSpotAnalysisImageProcessor] = image_processor_list

self.spot_analysis = sa.SpotAnalysis(
experiment_name, self.image_processors, save_dir=outdir, save_overwrite=True
experiment_name, self.image_processors_list, save_dir=outdir, save_overwrite=True
)

filenames = ft.files_in_directory_by_extension(self.indir, [".jpg"])[".jpg"]
source_path_name_exts = [os.path.join(self.indir, filename) for filename in filenames]
self.spot_analysis.set_primary_images(source_path_name_exts)

def run(self):
# process all images from indir
for result in self.spot_analysis:
# save the processed image
save_path = self.spot_analysis.save_image(
result, self.outdir, save_ext="png", also_save_supporting_images=False, also_save_attributes_file=True
)
if save_path is None:
lt.warn(
f"Warning in PeakFlux.run(): failed to save image. "
+ "Maybe SpotAnalaysis.save_overwrite is False? ({self.spot_analysis.save_overwrite=})"
)
def assign_inputs(self, dirs_or_files: str | list[str]):
"""
Assigns the given image files or the image files in the given
directories as input to this instance's SpotAnalysis. Must be done
before iterating through this instance's SpotAnalysis.

Parameters
----------
dirs_or_files : str | list[str]
The image files to be analyzed, or directories that contain image
files. Can be mixed files and directores.
"""
file_path_name_exts: list[str] = []

for dir_or_file in dirs_or_files:
if os.path.isfile(dir_or_file):
file_path_name_exts.append(dir_or_file)
elif os.path.isdir(dir_or_file):
dir_files = it.image_files_in_directory(dir_or_file)
file_path_name_exts += [os.path.join(dir_or_file, filename) for filename in dir_files]
else:
lt.info(f"Saved image to {save_path}")
lt.error_and_raise(
FileNotFoundError,
"Error in PeakFlux.assign_inputs(): "
+ f"given dir_or_file {dir_or_file} is neither a directory nor a file.",
)

# Get the attributes of the processed image, to save the results we're most interested in into a single
# condensed csv file.
parser = saoap.SpotAnalysisOperableAttributeParser(result, self.spot_analysis)
self.spot_analysis.set_primary_images(file_path_name_exts)

def get_bcs_origin(self, operable: SpotAnalysisOperable) -> tuple[int, int]:
def assign_target_board_reference_image(self, reference_image_dir_or_file: str):
"""
Returns the origin pixel location of the BCS fiducial assigned to this operable.
Assigns the given image file or directory containing image files as the
reference for the TargetBoardLocatorImageProcessor.

Parameters
----------
operable : SpotAnalysisOperable
An operable that has resulted from a BcsLocatorImageProcessor and
has an assigned BCS fiducial.

Returns
-------
bcs_origin: tuple[int, int]
The origin of the BCS fiducial. For circular BCS systems, this will
be the center point of the circle.
reference_image_dir_or_file : str
The image file, or the directory containing image files.
"""
targ_processor: TargetBoardLocatorImageProcessor = self.image_processors['Targ']
targ_processor.set_reference_images(reference_image_dir_or_file)

def assign_inputs_from_settings(self):
"""
Use the values from the PeakFluxSettings instance to find input images
and reference images, and assign them to this PeakFlux instance.
"""
# Build the filters for the directories
is_reference_dir = lambda dn: any([rdn in dn for rdn in self.settings.target_reference_images_dirnames])
is_missed_helio_dir = lambda dn: self.get_helio_name(dn) in self.settings.missed_heliostats
is_issue_helio_dir = lambda dn: self.get_helio_name(dn) in self.settings.issue_heliostats
has_helio_name = lambda dn: self.get_helio_name(dn) is not None

# Get the target reference directories. These should be directories that
# contain images without any sun on them.
dirnames = ft.files_in_directory(self.settings.indir, files_only=False)
reference_dirnames = list(filter(lambda dn: is_reference_dir(dn), dirnames))
target_reference_path = ft.norm_path(os.path.join(self.settings.indir, reference_dirnames[0], "Raw Images"))

# Get the heliostat image directories. Since the directories names start
# with a datetime string, sorting will put them all into collection order.
heliostat_dirnames = list(filter(lambda dn: not is_reference_dir(dn), dirnames))
heliostat_dirnames = list(filter(lambda dn: not is_missed_helio_dir(dn), heliostat_dirnames))
heliostat_dirnames = list(filter(lambda dn: not is_issue_helio_dir(dn), heliostat_dirnames))
heliostat_dirnames = list(filter(lambda dn: has_helio_name(dn), heliostat_dirnames))
# heliostat_dirnames = list(filter(lambda dn: "07E01" in dn, heliostat_dirnames))
if self.settings.limit_num_heliostats >= 0:
heliostat_dirnames = heliostat_dirnames[
: np.min([self.settings.limit_num_heliostats, len(heliostat_dirnames)])
]
heliostat_dirs = [ft.norm_path(os.path.join(self.settings.indir, dn)) for dn in heliostat_dirnames]

# Compile a list of images from the image directories.
source_images_path_name_exts: list[str] = []
for dirname in heliostat_dirs:
raw_images_path = ft.norm_path(os.path.join(self.settings.indir, dirname, "Raw Images"))
if not ft.directory_exists(raw_images_path):
lt.error_and_raise(
FileNotFoundError,
"Error in peak_flux: " + f"raw images directory \"{raw_images_path}\" does not exist",
)
for file_name_ext in ft.files_in_directory(raw_images_path, files_only=True):
source_images_path_name_exts.append(ft.norm_path(os.path.join(raw_images_path, file_name_ext)))

self.assign_inputs(source_images_path_name_exts)
self.assign_target_board_reference_image(target_reference_path)

def get_helio_name(self, dirname: str) -> str | None:
"""Uses the heliostat_name_pattern from the peak flux settings to parse
the heliostat name out of the given directory name."""
helio_names = self.settings.heliostat_name_pattern.findall(dirname)
if len(helio_names) == 0:
return None
return helio_names[0]

def get_bcs_origin(self, operable: SpotAnalysisOperable):
fiducials = operable.get_fiducials_by_type(BcsFiducial)
if len(fiducials) == 0:
return None
fiducial = fiducials[0]
origin_fx, origin_fy = fiducial.origin.astuple()
origin_ix, origin_iy = int(np.round(origin_fx)), int(np.round(origin_fy))
return origin_ix, origin_iy

def get_peak_origin(self, operable: SpotAnalysisOperable) -> tuple[int, int]:
"""
Get the peak pixel location of the hotspot for the given operable.

Parameters
----------
operable : SpotAnalysisOperable
An operable that has resulted from a HotspotImageProcessor and has
an assigned hotspot annotation.

Returns
-------
peak_origin: tuple[int, int]
The origin of the hotspot annotation.
"""
def get_peak_origin(self, operable: SpotAnalysisOperable):
fiducials = operable.get_fiducials_by_type(HotspotAnnotation)
if len(fiducials) == 0:
return None
fiducial = fiducials[0]
origin_fx, origin_fy = fiducial.origin.astuple()
origin_ix, origin_iy = int(np.round(origin_fx)), int(np.round(origin_fy))
return origin_ix, origin_iy


if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(
prog=__file__.rstrip(".py"), description='Processes images to find the point of peak flux.'
)
parser.add_argument('indir', type=str, help="Directory with images to be processed.")
parser.add_argument('outdir', type=str, help="Directory for where to put processed images and computed results.")
parser.add_argument('experiment_name', type=str, help="A description of the current data collection.")
parser.add_argument('settings_file', type=str, help="Path to the settings JSON file for this PeakFlux evaluation.")
args = parser.parse_args()

# create the output directory
ft.create_directories_if_necessary(args.outdir)
ft.delete_files_in_directory(args.outdir, "*")

# create the log file
log_path_name_ext = os.path.join(args.outdir, "PeakFlux_" + tdt.current_date_time_string_forfile() + ".log")
lt.logger(log_path_name_ext)

# validate the rest of the inputs
if not ft.directory_exists(args.indir):
lt.error_and_raise(FileNotFoundError, f"Error in PeakFlux.py: input directory '{args.indir}' does not exist!")

PeakFlux(args.indir, args.outdir, args.experiment_name, args.settings_file).run()
Loading