Skip to content

Commit

Permalink
improve static typing
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaspie committed Mar 8, 2024
1 parent 2ebd168 commit 8d4e029
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 76 deletions.
99 changes: 55 additions & 44 deletions pynxtools_xps/vms/vamas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import re
from copy import deepcopy
import datetime
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Union

from itertools import groupby
import xarray as xr
import numpy as np
Expand Down Expand Up @@ -50,10 +52,10 @@ class VamasMapper(XPSMapper):
config_file = "config_vms.json"

def __init__(self):
self.file = None
self.parsers = []
self.file: Union[str, Path] = ""
self.parsers: List[Any] = []

self.units = {
self.units: dict = {
"instrument/sample_normal_polarangle_tilt": "degree ",
"instrument/sample_tilt_azimuth": "degree",
"instrument/sample_rotation_angle": "degree",
Expand Down Expand Up @@ -93,7 +95,7 @@ def construct_data(self):
# pylint: disable=duplicate-code
spectra = deepcopy(self.raw_data)

self._xps_dict["data"]: dict = {}
self._xps_dict["data"]: Dict[str, Any] = {}

key_map = {
"user": [],
Expand Down Expand Up @@ -158,15 +160,17 @@ def construct_data(self):
for spectrum in spectra:
self._update_xps_dict_with_spectrum(spectrum, key_map)

def _update_xps_dict_with_spectrum(self, spectrum, key_map):
def _update_xps_dict_with_spectrum(
self, spectrum: Dict[str, Any], key_map: Dict[str, str]
):
"""Map one spectrum from raw data to NXmpes-ready dict."""
# pylint: disable=too-many-locals,duplicate-code
group_parent = f'{self._root_path}/Group_{spectrum["group_name"]}'
region_parent = f'{group_parent}/regions/Region_{spectrum["spectrum_type"]}'
instrument_parent = f"{region_parent}/instrument"
analyser_parent = f"{instrument_parent}/analyser"

path_map = {
path_map: Dict[str, str] = {
"user": f"{region_parent}/user",
"instrument": f"{instrument_parent}",
"source": f"{instrument_parent}/source",
Expand Down Expand Up @@ -197,14 +201,14 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map):
self._xps_dict[f"{root}/{mpes_key}/@units"] = units

# Write process data
process_key_map = {
process_key_map: Dict[str, List[str]] = {
"energy_referencing": ["alignments"],
"peak_fitting": ["regions", "components"],
}

for grouping, spectrum_keys in process_key_map.items():
for grouping, process_key_list in process_key_map.items():
root = path_map[str(grouping)]
for spectrum_key in spectrum_keys:
for spectrum_key in process_key_list:
try:
processes = spectrum[spectrum_key]
for i, process in enumerate(processes):
Expand Down Expand Up @@ -265,7 +269,7 @@ def _update_xps_dict_with_spectrum(self, spectrum, key_map):
if spectrum_key not in used_keys:
self._xps_dict[f"{region_parent}/{spectrum_key}"] = value

def _get_units_for_key(self, unit_key):
def _get_units_for_key(self, unit_key: str):
"""
Get correct units for a given key.
Expand Down Expand Up @@ -302,10 +306,10 @@ def __init__(self):
vamas attribute keys, which are used, depending on how the
vamas file is formatted.
"""
self.data = []
self.data: List[str] = []

self.header = VamasHeader()
self.blocks = []
self.blocks: List[VamasBlock] = []

self.attrs = {
"common_header": [
Expand Down Expand Up @@ -472,7 +476,7 @@ def __init__(self):
],
}

def parse_file(self, file):
def parse_file(self, file: Union[str, Path]):
"""Parse the vamas file into a list of dictionaries.
Parameters
Expand All @@ -485,7 +489,7 @@ def parse_file(self, file):
self._parse_blocks()
return self.build_list()

def _read_lines(self, file):
def _read_lines(self, file: Union[str, Path]):
"""Read in vamas text file."""
with open(file, "rb") as vms_file:
for line in vms_file:
Expand Down Expand Up @@ -521,7 +525,10 @@ def _parse_header(self):
if attr == "nr_exp_var":
self._add_exp_var()

self.header.validate_types()

def _add_exp_var(self):
"""Add experimental variable to header."""
for _ in range(int(self.header.nr_exp_var)):
for attr in self.attrs["exp_var"]:
setattr(self.header, attr, self.data.pop(0).strip())
Expand Down Expand Up @@ -570,20 +577,20 @@ def _parse_norm_block(self):
block.unknown_1 = self.data.pop(0).strip()
block.unknown_2 = self.data.pop(0).strip()
block.unknown_3 = self.data.pop(0).strip()
block.source_analyzer_angle = self.data.pop(0).strip()
block.source_analyzer_angle = float(self.data.pop(0).strip())
block.unknown_4 = self.data.pop(0).strip()
block.analyzer_mode = self.data.pop(0).strip()
block.resolution = float(self.data.pop(0).strip())
block.magnification = self.data.pop(0).strip()
block.work_function = float(self.data.pop(0).strip())
block.target_bias = float(self.data.pop(0).strip())
block.analyzer_width_x = self.data.pop(0).strip()
block.analyzer_width_y = self.data.pop(0).strip()
block.analyzer_take_off_polar_angle = self.data.pop(0).strip()
block.analyzer_azimuth = self.data.pop(0).strip()
block.analyzer_width_x = float(self.data.pop(0).strip())
block.analyzer_width_y = float(self.data.pop(0).strip())
block.analyzer_take_off_polar_angle = float(self.data.pop(0).strip())
block.analyzer_azimuth = float(self.data.pop(0).strip())
block.species_label = self.data.pop(0).strip()
block.transition_label = self.data.pop(0).strip()
block.particle_charge = self.data.pop(0).strip()
block.particle_charge = int(self.data.pop(0).strip())

if self.header.scan_mode == "REGULAR":
block.abscissa_label = self.data.pop(0).strip()
Expand Down Expand Up @@ -633,9 +640,11 @@ def _parse_norm_block(self):
setattr(block, name, float(self.data.pop(0).strip()))

self._add_data_values(block)

block.validate_types()
return block

def _parse_map_block(self, regular: bool = True):
def _parse_map_block(self):
"""
Use this method when the MAP keyword is present.
Expand Down Expand Up @@ -671,20 +680,20 @@ def _parse_map_block(self, regular: bool = True):
block.unknown_3 = self.data.pop(0).strip()
block.fov_x = self.data.pop(0).strip()
block.fov_y = self.data.pop(0).strip()
block.source_analyzer_angle = self.data.pop(0).strip()
block.source_analyzer_angle = float(self.data.pop(0).strip())
block.unknown_4 = self.data.pop(0).strip()
block.analyzer_mode = self.data.pop(0).strip()
block.resolution = float(self.data.pop(0).strip())
block.magnification = self.data.pop(0).strip()
block.work_function = float(self.data.pop(0).strip())
block.target_bias = float(self.data.pop(0).strip())
block.analyzer_width_x = self.data.pop(0).strip()
block.analyzer_width_y = self.data.pop(0).strip()
block.analyzer_take_off_polar_angle = self.data.pop(0).strip()
block.analyzer_azimuth = self.data.pop(0).strip()
block.analyzer_width_x = float(self.data.pop(0).strip())
block.analyzer_width_y = float(self.data.pop(0).strip())
block.analyzer_take_off_polar_angle = float(self.data.pop(0).strip())
block.analyzer_azimuth = float(self.data.pop(0).strip())
block.species_label = self.data.pop(0).strip()
block.transition_label = self.data.pop(0).strip()
block.particle_charge = self.data.pop(0).strip()
block.particle_charge = int(self.data.pop(0).strip())

if self.header.scan_mode == "REGULAR":
block.abscissa_label = self.data.pop(0).strip()
Expand Down Expand Up @@ -737,16 +746,17 @@ def _parse_map_block(self, regular: bool = True):

return block

def _add_data_values(self, block):
def _add_data_values(self, block: VamasBlock):
    """Parse and attach the ordinate data of one Vamas block.

    Dispatches to the regular or irregular parser depending on the
    scan mode declared in the file header; any other mode is a no-op.
    """
    scan_mode = self.header.scan_mode
    if scan_mode == "REGULAR":
        self._add_regular_data(block)
    elif scan_mode == "IRREGULAR":
        self._add_irregular_data(block)

def _add_regular_data(self, block):
def _add_regular_data(self, block: VamasBlock):
"""Parse data with regularly spaced energy axis."""
data_dict = {}
data_dict: Dict[str, List] = {}

start = float(block.abscissa_start)
step = float(block.abscissa_step)
num = int(block.num_ord_values / block.no_variables)
Expand Down Expand Up @@ -778,19 +788,19 @@ def _add_regular_data(self, block):
data_dict[name] = data_array_slice
setattr(block, name, data_dict[name])

def _add_irregular_data(self, block):
def _add_irregular_data(self, block: VamasBlock):
"""Parse data with irregularly spaced energy axis."""
data_dict = {}
data_dict: Dict[str, List] = {}

block_data = list(np.array(self.data[: block.num_ord_values], dtype=float))

energy = block_data[:: block.no_variables + 1]
if block.abscissa_label == "binding energy":
energy.reverse()
block.x = energy
block.abscissa_start = min(energy)
block.abscissa_stop = max(energy)
block.abscissa_step = get_minimal_step(energy)

setattr(block, "x", energy)
block.abscissa_start = float(min(energy))
block.abscissa_step = float(get_minimal_step(energy))

for var in range(block.no_variables):
if var == 0:
Expand All @@ -810,10 +820,11 @@ def _add_irregular_data(self, block):

self.data = self.data[block.num_ord_values + block.no_variables :]

def _get_scan_numbers_for_spectra(self, spectra):
def _get_scan_numbers_for_spectra(self, spectra: List[Dict]):
"""
For a flat list of spectra, groupby group name and spectrum
type and iteratively give them scan numbers.
For a flat list of spectra dictionaries, group the spectra
by group name and spectrum type and iteratively give them
scan numbers.
Parameters
----------
Expand Down Expand Up @@ -845,7 +856,7 @@ def _get_scan_numbers_for_spectra(self, spectra):

return flattened_spectra

def handle_header_comments(self, comment_list):
def handle_header_comments(self, comment_list: List[str]):
"""Handle comments (incl. Casa info) for the header."""
comments = {}

Expand All @@ -871,7 +882,7 @@ def handle_header_comments(self, comment_list):
end_index = [
i for i, line in enumerate(comment_list) if "EOFH" in line
][0]
special_comments = comment_list[index : end_index + 1]
special_comments = comment_list[index : end_index + 1] # type: ignore[assignment]
del comment_list[index : end_index + 1]

comments.update(handle_func(special_comments))
Expand Down Expand Up @@ -900,7 +911,7 @@ def _handle_prodigy_header(self, comment_line: str):
return {"prodigy_version": comment_line.split("Version")[1].strip()}

# =============================================================================
# def _handle_phi_header(self, comment_list):
# def _handle_phi_header(self, comment_list: List[str]):
# """Get metadata from Phi system."""
# phi_parser = PhiParser()
# phi_parser.parse_header_into_metadata(comment_list)
Expand All @@ -919,7 +930,7 @@ def _handle_prodigy_header(self, comment_line: str):
# return phi_comments
# =============================================================================

def handle_block_comments(self, comment_list):
def handle_block_comments(self, comment_list: List[str]):
"""Handle comments (incl. Casa fitting) for each block."""
comments = {}

Expand Down
64 changes: 32 additions & 32 deletions pynxtools_xps/vms/vamas_data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,72 +57,72 @@ class VamasBlock(XpsDataclass):

block_id: str = ""
sample_id: str = ""
year: str = ""
month: str = ""
day: str = ""
hour: str = ""
minute: str = ""
second: str = ""
no_hrs_in_advance_of_gmt: str = "0"
no_comment_lines: str = ""
year: int = 0
month: int = 0
day: int = 0
hour: int = 0
minute: int = 0
second: int = 0
no_hrs_in_advance_of_gmt: int = 0
no_comment_lines: int = 0
# This list should contain one element per for each
# line in the comment block
comment_lines: list = field(default_factory=list)
technique: str = ""
exp_var_value: str = ""
source_label: str = ""
source_energy: str = ""
source_energy: float = 0.0
unknown_1: str = "0"
unknown_2: str = "0"
unknown_3: str = "0"
source_analyzer_angle: str = ""
source_analyzer_angle: float = 0.0
unknown_4: str = "180"
analyzer_mode: str = ""
resolution: str = ""
resolution: float = 0.0
magnification: str = "1"
work_function: str = ""
target_bias: str = "0"
work_function: float = 0.0
target_bias: float = 0.0
# analyser slit length divided by the magnification
# of the analyser transfer lens
analyzer_width_x: str = "0"
analyzer_width_y: str = "0"
analyzer_width_x: float = 0.0
analyzer_width_y: float = 0.0
# degrees from upward z-direction,
# defined by the sample stage
analyzer_take_off_polar_angle: str = "0"
analyzer_azimuth: str = "0"
analyzer_take_off_polar_angle: float = 0.0
analyzer_azimuth: float = 0.0
species_label: str = ""
transition_label: str = ""
particle_charge: str = "-1"
particle_charge: int = -1
abscissa_label: str = "kinetic energy"
abscissa_units: str = "eV"
abscissa_start: str = ""
abscissa_step: str = ""
no_variables: str = "2"
abscissa_start: float = 0.0
abscissa_step: float = 0.0
no_variables: int = 2
variable_label_1: str = "counts"
variable_units_1: str = "d"
variable_label_2: str = "Transmission"
variable_units_2: str = "d"
signal_mode: str = "pulse counting"
dwell_time: str = ""
no_scans: str = ""
dwell_time: float = 0.0
no_scans: int = 0
time_correction: str = "0"
# degrees from upward z-direction,
# defined by the sample stage
sample_angle_tilt: str = "0"
sample_angle_tilt: float = 0.0
# degrees clockwise from the y-direction towards the
# operator, defined by the sample stage
sample_tilt_azimuth: str = "0"
sample_rotation: str = "0"
no_additional_params: str = "2"
sample_tilt_azimuth: float = 0.0
sample_rotation: float = 0.0
no_additional_params: int = 2
param_label_1: str = "ESCAPE DEPTH TYPE"
param_unit_1: str = "d"
param_value_1: str = "0"
param_label_2: str = "MFP Exponent"
param_unit_2: str = "d"
param_value_2: str = "0"
num_ord_values: str = ""
min_ord_value_1: str = ""
max_ord_value_1: str = ""
min_ord_value_2: str = ""
max_ord_value_2: str = ""
num_ord_values: int = 0
min_ord_value_1: float = 0.0
max_ord_value_1: float = 0.0
min_ord_value_2: float = 0.0
max_ord_value_2: float = 0.0
data_string: str = ""

0 comments on commit 8d4e029

Please sign in to comment.