diff --git a/pynxtools_xps/vms/vamas.py b/pynxtools_xps/vms/vamas.py
index b3c1409e..92b16da5 100644
--- a/pynxtools_xps/vms/vamas.py
+++ b/pynxtools_xps/vms/vamas.py
@@ -22,7 +22,9 @@
 import re
 from copy import deepcopy
 import datetime
-from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Dict, List, Union
+
 from itertools import groupby
 import xarray as xr
 import numpy as np
@@ -50,10 +52,10 @@ class VamasMapper(XPSMapper):
     config_file = "config_vms.json"

     def __init__(self):
-        self.file = None
-        self.parsers = []
+        self.file: Union[str, Path] = ""
+        self.parsers: List[Any] = []

-        self.units = {
+        self.units: dict = {
             "instrument/sample_normal_polarangle_tilt": "degree ",
             "instrument/sample_tilt_azimuth": "degree",
             "instrument/sample_rotation_angle": "degree",
@@ -93,7 +95,7 @@ def construct_data(self):
         # pylint: disable=duplicate-code
         spectra = deepcopy(self.raw_data)

-        self._xps_dict["data"]: dict = {}
+        self._xps_dict["data"]: Dict[str, Any] = {}

         key_map = {
             "user": [],
@@ -158,7 +160,9 @@
         for spectrum in spectra:
             self._update_xps_dict_with_spectrum(spectrum, key_map)

-    def _update_xps_dict_with_spectrum(self, spectrum, key_map):
+    def _update_xps_dict_with_spectrum(
+        self, spectrum: Dict[str, Any], key_map: Dict[str, str]
+    ):
         """Map one spectrum from raw data to NXmpes-ready dict."""
         # pylint: disable=too-many-locals,duplicate-code
         group_parent = f'{self._root_path}/Group_{spectrum["group_name"]}'
@@ -166,7 +170,7 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map):
         instrument_parent = f"{region_parent}/instrument"
         analyser_parent = f"{instrument_parent}/analyser"

-        path_map = {
+        path_map: Dict[str, str] = {
             "user": f"{region_parent}/user",
             "instrument": f"{instrument_parent}",
             "source": f"{instrument_parent}/source",
@@ -197,14 +201,14 @@
             self._xps_dict[f"{root}/{mpes_key}/@units"] = units

         # Write process data
-        process_key_map = {
+        process_key_map: Dict[str, List[str]] = {
             "energy_referencing": ["alignments"],
             "peak_fitting": ["regions", "components"],
         }

-        for grouping, spectrum_keys in process_key_map.items():
+        for grouping, process_key_list in process_key_map.items():
             root = path_map[str(grouping)]
-            for spectrum_key in spectrum_keys:
+            for spectrum_key in process_key_list:
                 try:
                     processes = spectrum[spectrum_key]
                     for i, process in enumerate(processes):
@@ -265,7 +269,7 @@
             if spectrum_key not in used_keys:
                 self._xps_dict[f"{region_parent}/{spectrum_key}"] = value

-    def _get_units_for_key(self, unit_key):
+    def _get_units_for_key(self, unit_key: str):
         """
         Get correct units for a given key.

@@ -302,10 +306,10 @@ def __init__(self):
         vamas attribute keys, which are used, depending on how
         the vamas file is formatted.
         """
-        self.data = []
+        self.data: List[str] = []

         self.header = VamasHeader()
-        self.blocks = []
+        self.blocks: List[VamasBlock] = []

         self.attrs = {
             "common_header": [
@@ -472,7 +476,7 @@
             ],
         }

-    def parse_file(self, file):
+    def parse_file(self, file: Union[str, Path]):
         """Parse the vamas file into a list of dictionaries.

         Parameters
@@ -485,7 +489,7 @@ def parse_file(self, file):
         self._parse_blocks()
         return self.build_list()

-    def _read_lines(self, file):
+    def _read_lines(self, file: Union[str, Path]):
         """Read in vamas text file."""
         with open(file, "rb") as vms_file:
             for line in vms_file:
@@ -521,7 +525,10 @@
             if attr == "nr_exp_var":
                 self._add_exp_var()

+        self.header.validate_types()
+
     def _add_exp_var(self):
+        """Add experimental variable to header."""
         for _ in range(int(self.header.nr_exp_var)):
             for attr in self.attrs["exp_var"]:
                 setattr(self.header, attr, self.data.pop(0).strip())
@@ -570,20 +577,20 @@ def _parse_norm_block(self):
         block.unknown_1 = self.data.pop(0).strip()
         block.unknown_2 = self.data.pop(0).strip()
         block.unknown_3 = self.data.pop(0).strip()
-        block.source_analyzer_angle = self.data.pop(0).strip()
+        block.source_analyzer_angle = float(self.data.pop(0).strip())
         block.unknown_4 = self.data.pop(0).strip()
         block.analyzer_mode = self.data.pop(0).strip()
         block.resolution = float(self.data.pop(0).strip())
         block.magnification = self.data.pop(0).strip()
         block.work_function = float(self.data.pop(0).strip())
         block.target_bias = float(self.data.pop(0).strip())
-        block.analyzer_width_x = self.data.pop(0).strip()
-        block.analyzer_width_y = self.data.pop(0).strip()
-        block.analyzer_take_off_polar_angle = self.data.pop(0).strip()
-        block.analyzer_azimuth = self.data.pop(0).strip()
+        block.analyzer_width_x = float(self.data.pop(0).strip())
+        block.analyzer_width_y = float(self.data.pop(0).strip())
+        block.analyzer_take_off_polar_angle = float(self.data.pop(0).strip())
+        block.analyzer_azimuth = float(self.data.pop(0).strip())
         block.species_label = self.data.pop(0).strip()
         block.transition_label = self.data.pop(0).strip()
-        block.particle_charge = self.data.pop(0).strip()
+        block.particle_charge = int(self.data.pop(0).strip())

         if self.header.scan_mode == "REGULAR":
             block.abscissa_label = self.data.pop(0).strip()
@@ -633,9 +640,11 @@
                 setattr(block, name, float(self.data.pop(0).strip()))

         self._add_data_values(block)
+
+        block.validate_types()
         return block

-    def _parse_map_block(self, regular: bool = True):
+    def _parse_map_block(self):
         """
         Use this method when the MAP keyword is present.

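
# --- Illustrative sketch (not part of the patch) -----------------------------
# The block-parsing hunks above repeat one pattern: pop a whitespace-stripped
# token from the flat VAMAS line buffer and coerce it to the type that
# VamasBlock now expects (float, int, or str). A hypothetical helper capturing
# that pattern could look like this; `pop_token` is an illustration only and
# does not exist in pynxtools_xps.
from typing import Callable, List, TypeVar

T = TypeVar("T")


def pop_token(data: List[str], convert: Callable[[str], T]) -> T:
    """Pop the next VAMAS token and convert it, e.g. to float or int."""
    return convert(data.pop(0).strip())


# Assumed usage inside a block parser, for illustration:
#     block.source_analyzer_angle = pop_token(self.data, float)
#     block.particle_charge = pop_token(self.data, int)
#     block.species_label = pop_token(self.data, str)
# ------------------------------------------------------------------------------
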
@@ -671,20 +680,20 @@
         block.unknown_3 = self.data.pop(0).strip()
         block.fov_x = self.data.pop(0).strip()
         block.fov_y = self.data.pop(0).strip()
-        block.source_analyzer_angle = self.data.pop(0).strip()
+        block.source_analyzer_angle = float(self.data.pop(0).strip())
         block.unknown_4 = self.data.pop(0).strip()
         block.analyzer_mode = self.data.pop(0).strip()
         block.resolution = float(self.data.pop(0).strip())
         block.magnification = self.data.pop(0).strip()
         block.work_function = float(self.data.pop(0).strip())
         block.target_bias = float(self.data.pop(0).strip())
-        block.analyzer_width_x = self.data.pop(0).strip()
-        block.analyzer_width_y = self.data.pop(0).strip()
-        block.analyzer_take_off_polar_angle = self.data.pop(0).strip()
-        block.analyzer_azimuth = self.data.pop(0).strip()
+        block.analyzer_width_x = float(self.data.pop(0).strip())
+        block.analyzer_width_y = float(self.data.pop(0).strip())
+        block.analyzer_take_off_polar_angle = float(self.data.pop(0).strip())
+        block.analyzer_azimuth = float(self.data.pop(0).strip())
         block.species_label = self.data.pop(0).strip()
         block.transition_label = self.data.pop(0).strip()
-        block.particle_charge = self.data.pop(0).strip()
+        block.particle_charge = int(self.data.pop(0).strip())

         if self.header.scan_mode == "REGULAR":
             block.abscissa_label = self.data.pop(0).strip()
@@ -737,16 +746,17 @@

         return block

-    def _add_data_values(self, block):
+    def _add_data_values(self, block: VamasBlock):
         """Add data values to a Vamas data block."""
         if self.header.scan_mode == "REGULAR":
             self._add_regular_data(block)
         elif self.header.scan_mode == "IRREGULAR":
             self._add_irregular_data(block)

-    def _add_regular_data(self, block):
+    def _add_regular_data(self, block: VamasBlock):
         """Parse data with regularly spaced energy axis."""
-        data_dict = {}
+        data_dict: Dict[str, List] = {}
+
         start = float(block.abscissa_start)
         step = float(block.abscissa_step)
         num = int(block.num_ord_values / block.no_variables)
@@ -778,19 +788,19 @@
             data_dict[name] = data_array_slice
             setattr(block, name, data_dict[name])

-    def _add_irregular_data(self, block):
-        """Parse data with regularly spaced energy axis."""
-        data_dict = {}
+    def _add_irregular_data(self, block: VamasBlock):
+        """Parse data with irregularly spaced energy axis."""
+        data_dict: Dict[str, List] = {}

         block_data = list(np.array(self.data[: block.num_ord_values], dtype=float))

         energy = block_data[:: block.no_variables + 1]
         if block.abscissa_label == "binding energy":
             energy.reverse()
-        block.x = energy
-        block.abscissa_start = min(energy)
-        block.abscissa_stop = max(energy)
-        block.abscissa_step = get_minimal_step(energy)
+
+        setattr(block, "x", energy)
+        block.abscissa_start = float(min(energy))
+        block.abscissa_step = float(get_minimal_step(energy))

         for var in range(block.no_variables):
             if var == 0:
@@ -810,10 +820,11 @@

         self.data = self.data[block.num_ord_values + block.no_variables :]

-    def _get_scan_numbers_for_spectra(self, spectra):
+    def _get_scan_numbers_for_spectra(self, spectra: List[Dict]):
         """
-        For a flat list of spectra, groupby group name and spectrum
-        type and iteratively give them scan numbers.
+        For a flat list of spectra dictionaries, group the spectra
+        by group name and spectrum type and iteratively give them
+        scan numbers.

         Parameters
         ----------
@@ -845,7 +856,7 @@

         return flattened_spectra

-    def handle_header_comments(self, comment_list):
+    def handle_header_comments(self, comment_list: List[str]):
         """Handle comments (incl. Casa info) for the header."""
         comments = {}

@@ -871,7 +882,7 @@
                 end_index = [
                     i for i, line in enumerate(comment_list) if "EOFH" in line
                 ][0]
-                special_comments = comment_list[index : end_index + 1]
+                special_comments = comment_list[index : end_index + 1]  # type: ignore[assignment]
                 del comment_list[index : end_index + 1]
                 comments.update(handle_func(special_comments))

@@ -900,7 +911,7 @@ def _handle_prodigy_header(self, comment_line: str):
         return {"prodigy_version": comment_line.split("Version")[1].strip()}

 # =============================================================================
-#     def _handle_phi_header(self, comment_list):
-#         """Get metadta from Phi system."""
+#     def _handle_phi_header(self, comment_list: List[str]):
+#         """Get metadata from Phi system."""
 #         phi_parser = PhiParser()
 #         phi_parser.parse_header_into_metadata(comment_list)
@@ -919,7 +930,7 @@
 #         return phi_comments
 # =============================================================================

-    def handle_block_comments(self, comment_list):
+    def handle_block_comments(self, comment_list: List[str]):
         """Handle comments (incl. Casa fitting) for each block."""
         comments = {}

diff --git a/pynxtools_xps/vms/vamas_data_model.py b/pynxtools_xps/vms/vamas_data_model.py
index e62a69ab..75d7adf3 100644
--- a/pynxtools_xps/vms/vamas_data_model.py
+++ b/pynxtools_xps/vms/vamas_data_model.py
@@ -57,72 +57,72 @@ class VamasBlock(XpsDataclass):

     block_id: str = ""
     sample_id: str = ""
-    year: str = ""
-    month: str = ""
-    day: str = ""
-    hour: str = ""
-    minute: str = ""
-    second: str = ""
-    no_hrs_in_advance_of_gmt: str = "0"
-    no_comment_lines: str = ""
-    # This list should contain one element per for each
+    year: int = 0
+    month: int = 0
+    day: int = 0
+    hour: int = 0
+    minute: int = 0
+    second: int = 0
+    no_hrs_in_advance_of_gmt: int = 0
+    no_comment_lines: int = 0
+    # This list should contain one element for each
     # line in the comment block
     comment_lines: list = field(default_factory=list)
     technique: str = ""
     exp_var_value: str = ""
     source_label: str = ""
-    source_energy: str = ""
+    source_energy: float = 0.0
     unknown_1: str = "0"
     unknown_2: str = "0"
     unknown_3: str = "0"
-    source_analyzer_angle: str = ""
+    source_analyzer_angle: float = 0.0
     unknown_4: str = "180"
     analyzer_mode: str = ""
-    resolution: str = ""
+    resolution: float = 0.0
     magnification: str = "1"
-    work_function: str = ""
-    target_bias: str = "0"
+    work_function: float = 0.0
+    target_bias: float = 0.0
     # analyser slit length divided by the magnification
     # of the analyser transfer lens
-    analyzer_width_x: str = "0"
-    analyzer_width_y: str = "0"
+    analyzer_width_x: float = 0.0
+    analyzer_width_y: float = 0.0
     # degrees from upward z-direction,
     # defined by the sample stage
-    analyzer_take_off_polar_angle: str = "0"
-    analyzer_azimuth: str = "0"
+    analyzer_take_off_polar_angle: float = 0.0
+    analyzer_azimuth: float = 0.0
     species_label: str = ""
     transition_label: str = ""
-    particle_charge: str = "-1"
+    particle_charge: int = -1
     abscissa_label: str = "kinetic energy"
     abscissa_units: str = "eV"
-    abscissa_start: str = ""
-    abscissa_step: str = ""
-    no_variables: str = "2"
+    abscissa_start: float = 0.0
+    abscissa_step: float = 0.0
+    no_variables: int = 2
     variable_label_1: str = "counts"
     variable_units_1: str = "d"
     variable_label_2: str = "Transmission"
     variable_units_2: str = "d"
     signal_mode: str = "pulse counting"
-    dwell_time: str = ""
-    no_scans: str = ""
+    dwell_time: float = 0.0
+    no_scans: int = 0
     time_correction: str = "0"
     # degrees from upward z-direction,
     # defined by the sample stage
-    sample_angle_tilt: str = "0"
+    sample_angle_tilt: float = 0.0
     # degrees clockwise from the y-direction towards the
     # operator, defined by the sample stage
-    sample_tilt_azimuth: str = "0"
-    sample_rotation: str = "0"
-    no_additional_params: str = "2"
+    sample_tilt_azimuth: float = 0.0
+    sample_rotation: float = 0.0
+    no_additional_params: int = 2
     param_label_1: str = "ESCAPE DEPTH TYPE"
     param_unit_1: str = "d"
     param_value_1: str = "0"
     param_label_2: str = "MFP Exponent"
     param_unit_2: str = "d"
     param_value_2: str = "0"
-    num_ord_values: str = ""
-    min_ord_value_1: str = ""
-    max_ord_value_1: str = ""
-    min_ord_value_2: str = ""
-    max_ord_value_2: str = ""
+    num_ord_values: int = 0
+    min_ord_value_1: float = 0.0
+    max_ord_value_1: float = 0.0
+    min_ord_value_2: float = 0.0
+    max_ord_value_2: float = 0.0
     data_string: str = ""
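
# --- Illustrative sketch (not part of the patch) -----------------------------
# With the VamasBlock fields typed as int/float instead of str, downstream code
# can use them numerically without ad-hoc casts. The snippet below shows one
# plausible way a regularly spaced energy axis follows from the
# abscissa_start/abscissa_step/num_ord_values/no_variables fields that
# _add_regular_data reads; the example values and the axis construction are
# assumptions for illustration, not the library's actual implementation. It
# assumes pynxtools_xps is installed and that VamasBlock keeps a default value
# for every field.
import numpy as np

from pynxtools_xps.vms.vamas_data_model import VamasBlock

block = VamasBlock()
block.abscissa_start = 280.0   # eV
block.abscissa_step = 0.1      # eV
block.num_ord_values = 200     # 100 energy points x 2 variables (counts, transmission)
block.no_variables = 2

num_points = block.num_ord_values // block.no_variables
energy = block.abscissa_start + block.abscissa_step * np.arange(num_points)
print(energy.shape)  # (100,)
# ------------------------------------------------------------------------------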