diff --git a/src/pynxtools_xps/config/config_txt_vamas_export.json b/src/pynxtools_xps/config/config_txt_vamas_export.json deleted file mode 100644 index 68e77bf6..00000000 --- a/src/pynxtools_xps/config/config_txt_vamas_export.json +++ /dev/null @@ -1,281 +0,0 @@ -{ - "/ENTRY":{ - "@default":"data", - "title":"@eln", - "start_time":"@eln", - "end_time":"@eln", - "experiment_institution":"@eln", - "experiment_facility":"@eln", - "experiment_laboratory":"@eln", - "entry_identifier":"@eln", - "duration":"@eln", - "duration/@units":"@eln", - "method":"@eln", - "program_name":"@eln" - }, - "/ENTRY/USER[user]":{ - "name":"@eln", - "affiliation":"@eln", - "address":"@eln", - "email":"@eln", - "IDENTIFIER[orcid]":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - }, - "/ENTRY/INSTRUMENT[instrument]":{ - "@default":"electronanalyser", - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - }, - "energy_resolution":{ - "physical_quantity":"energy", - "type":"@eln", - "resolution":"!@eln", - "resolution/@units":"@eln" - }, - "sourceTYPE[source_xray]":{ - "type":"@eln", - "name":"@eln", - "probe":"photon", - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - }, - "associated_beam":"/entry/instrument/beam_xray" - }, - "beamTYPE[beam_xray]":{ - "distance":"@eln", - "distance/@units":"@eln", - "incident_energy":"@attrs:beam_xray/excitation_energy", - "incident_energy/@units":"@attrs:beam_xray/excitation_energy/@units", - "incident_energy_spread":null, - "incident_energy_spread/@units":null, - "incident_polarization":null, - "incident_polarization/@units":null, - "extent":null, - "associated_source":"/entry/instrument/source_xray" - }, - "ELECTRONANALYSER[electronanalyser]":{ - "@default":"detector", - "name":"@eln", - "description":"@eln", - 
"work_function":"@eln", - "work_function/@units":"@eln", - "fast_axes":null, - "slow_axes":"energy", - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - }, - "energy_resolution":{ - "physical_quantity":"energy", - "type":"estimated", - "resolution":"@attrs:data/step_size", - "resolution/@units":"@attrs:data/step_size/@units" - }, - "transmission_function":null, - "COLLECTIONCOLUMN[collectioncolumn]":{ - "scheme":"@eln", - "lens_mode":"@eln", - "projection":null, - "angular_acceptance":null, - "spatial_acceptance":null, - "field_aperture":null, - "contrast_aperture":null, - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - } - }, - "ENERGYDISPERSION[energydispersion]":{ - "scheme":"@eln", - "pass_energy":"@eln", - "pass_energy/@units":"@eln", - "energy_scan_mode":"@eln", - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - } - }, - "DETECTOR[detector]":{ - "@default":"raw_data", - "amplifier_type":"@eln", - "detector_type":"@eln", - "count_time":"@attrs:detector/dwell_time", - "count_time/@units":"@attrs:detector/dwell_time/@units", - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - }, - "raw_data":{ - "@signal":"raw", - "raw":"@data:raw_data", - "raw/@units":"counts", - "DATA[*]":"@data:*.channels", - "DATA[*]/@units":"counts" - } - } - }, - "MANIPULATOR[manipulator]":{ - "device_information":{ - "vendor":"@eln", - "model":"@eln", - "identifier":{ - "service":"@eln", - "identifier":"@eln", - "is_persistent":"@eln" - } - }, - "temperature_sensor":{ - "name":"@eln", - "measurement":"@eln", - "type":"@eln", - "attached_to":"@eln", - "value":"@eln", - "value/@units":"@eln" 
- }, - "sample_heater":{ - "name":"@eln", - "physical_quantity":"@eln", - "type":"@eln", - "heater_power":"@eln", - "heater_power/@units":"@eln", - "PID[pid]/setpoint":"@eln", - "PID[pid]/setpoint/@units":"@eln" - }, - "cryostat":null, - "drain_current_amperemeter":{ - "name":"@eln", - "measurement":"@eln", - "type":"@eln", - "value":"@eln", - "value/@units":"@eln" - }, - "sample_bias_voltmeter":{ - "name":"@eln", - "measurement":"@eln", - "type":"@eln", - "value":"@eln", - "value/@units":"@eln" - }, - "sample_bias_potentiostat":{ - "name":"@eln", - "physical_quantity":"@eln", - "type":"@eln", - "PID[pid]/setpoint":"@eln", - "PID[pid]/setpoint/@units":"@eln" - } - }, - "pressure_gauge":{ - "name":"@eln", - "measurement":"@eln", - "type":"@eln", - "value":"@eln", - "value/@units":"@eln" - }, - "flood_gun":{ - "name":"@eln", - "physical_quantity":"@eln", - "type":"@eln", - "current":"@eln", - "current/@units":"@eln" - } - }, - "/ENTRY/PROCESS_MPES[process]":{ - "energy_calibration":{ - "calibrated_axis":"@data:energy", - "calibrated_axis/@units":"eV" - }, - "energy_referencing":null - }, - "/ENTRY/SAMPLE[sample]":{ - "name":"@eln", - "identifier":{ - "identifier":"@eln" - }, - "atom_types":"@eln", - "physical_form":"@eln", - "situation":"@eln", - "SUBSTANCE[substance]":{ - "name":"@eln", - "molecular_mass":"@eln", - "cas_number":"@eln", - "molecular_formula_hill":"@eln" - }, - "history":{ - "sample_preparation":{ - "start_time":"@eln", - "end_time":"@eln", - "description":"@eln", - "method":"@eln" - } - }, - "temperature_env":{ - "temperature_sensor":"@link:/entry/instrument/manipulator/temperature_sensor", - "sample_heater":"@link:/entry/instrument/manipulator/sample_heater", - "cryostat":null - }, - "gas_pressure_env":{ - "pressure_gauge":"@link:/entry/instrument/pressure_gauge" - }, - "bias_env":{ - "potentiostat":"@link:/entry/instrument/manipulator/sample_bias_potentiostat", - "voltmeter":"@link:/entry/instrument/manipulator/sample_bias_voltmeter" - }, - 
"drain_current_env":{ - "amperemeter":"@link:/entry/instrument/manipulator/drain_current_amperemeter" - }, - "flood_gun_current_env":{ - "flood_gun":"@link:/entry/instrument/flood_gun" - } - }, - "/ENTRY/data":{ - "@axes":["energy"], - "@signal":"data", - "data":"@data:average", - "data_errors":"@data:errors", - "data/@units":"@attrs:data/y_units", - "DATA[*]":"@data:*.scans", - "DATA[*]/@units":"@attrs:data/y_units", - "energy":"@data:energy", - "energy/@type":"@attrs:data/energy_type", - "energy/@units":"eV", - "energy/@reference":null, - "@energy_indices":0 - } - } - - \ No newline at end of file diff --git a/src/pynxtools_xps/config/config_vms.json b/src/pynxtools_xps/config/config_vms.json index 1ac5baed..210f12a2 100644 --- a/src/pynxtools_xps/config/config_vms.json +++ b/src/pynxtools_xps/config/config_vms.json @@ -2,16 +2,18 @@ "/ENTRY":{ "@default":"data", "title":"@eln", - "start_time":"@attrs:time_stamp", + "start_time":"['@attrs:time_stamp', '@eln']", "end_time":"@eln", "experiment_institution":"@attrs:institute_id", - "experiment_identifier":"@attrs:experiment_id", + "experiment_identifier":{ + "identifier":"['@attrs:experiment_id', '@eln']" + }, "experiment_facility":"@eln", "experiment_laboratory":"@eln", "entry_identifier":"@eln", - "duration":null, + "duration":"@eln", "duration/@units":"s", - "method":"@attrs:region/analysis_method", + "method":"['@attrs:analysis_method', '@eln']", "program_name":"@eln" }, "/ENTRY/geometries":{ @@ -27,7 +29,7 @@ "z":[0, 0 , 1], "z/@units":"m", "coordinate_system_transformations":{ - "AXISNAME[z_rotation]":"@attrs:source_azimuth_angle", + "AXISNAME[z_rotation]":"!@attrs:source_azimuth_angle", "AXISNAME[z_rotation]/@units":"@attrs:source_azimuth_angle/@units", "AXISNAME[z_rotation]/@transformation_type":"rotation", "AXISNAME[z_rotation]/@vector":[0, 0, 1], @@ -39,7 +41,7 @@ "AXISNAME[y_flip]/@vector":[0, 1, 0], "AXISNAME[y_flip]/@depends_on":"y_rotation", - "AXISNAME[y_rotation]":"@attrs:source_polar_angle", + 
"AXISNAME[y_rotation]":"!@attrs:source_polar_angle", "AXISNAME[y_rotation]/@units":"@attrs:source_polar_angle/@units", "AXISNAME[y_rotation]/@transformation_type":"rotation", "AXISNAME[y_rotation]/@vector":[0, -1, 0], @@ -72,12 +74,12 @@ "energy_resolution":{ "physical_quantity":"energy", "type":"derived", - "resolution":"@attrs:step_size", - "resolution/@units":"@attrs:step_size/@units" + "resolution":"!['@attrs:step_size', '@eln']", + "resolution/@units":"['@attrs:step_size/@units', '@eln']" }, "sourceTYPE[source_xray]":{ "type":"@eln", - "name":"@attrs:source_label", + "name":"['@attrs:source_label', '@eln']", "probe":"photon", "device_information":{ "vendor":"@eln", @@ -95,8 +97,8 @@ "distance/@units":"@eln", "extent":"@attrs:extent", "extent/@units":"@attrs:extent/@units", - "incident_energy":"@attrs:excitation_energy", - "incident_energy/@units":"eV", + "incident_energy":"['@attrs:excitation_energy', '@attrs:characteristic_energy']", + "incident_energy/@units":"['@attrs:excitation_energy/@units', '@attrs:characteristic_energy/@units', 'eV']", "incident_energy_spread":null, "incident_energy_spread/@units":null, "incident_polarization":null, @@ -104,13 +106,13 @@ "associated_source":"/entry/instrument/source_xray", "depends_on":"/entry/instrument/beam_xray/transformations/beam_polar_angle_of_incidence", "transformations":{ - "beam_polar_angle_of_incidence":"@attrs:source_polar_angle", + "beam_polar_angle_of_incidence":"!@attrs:source_polar_angle", "beam_polar_angle_of_incidence/@units":"@attrs:source_polar_angle/@units", "beam_polar_angle_of_incidence/@transformation_type":"rotation", "beam_polar_angle_of_incidence/@vector":[0, -1, 0], "beam_polar_angle_of_incidence/@depends_on":"beam_azimuth_angle", - "beam_azimuth_angle":"@attrs:source_azimuth_angle", + "beam_azimuth_angle":"!@attrs:source_azimuth_angle", "beam_azimuth_angle/@units":"@attrs:source_azimuth_angle/@units", "beam_azimuth_angle/@transformation_type":"rotation", "beam_azimuth_angle/@vector":[0, 0, 
-1], @@ -121,7 +123,7 @@ "@default":"detector", "name":"@eln", "description":"@eln", - "work_function":"@attrs:work_function", + "work_function":"['@attrs:work_function', '@eln']", "work_function/@units":"eV", "fast_axes":null, "slow_axes":"energy", @@ -139,7 +141,7 @@ "energy_resolution":{ "physical_quantity":"energy", "type":"estimated", - "resolution":"@attrs:step_size", + "resolution":"!@attrs:step_size", "resolution/@units":"eV" }, "transmission_function":null, @@ -164,9 +166,9 @@ }, "ENERGYDISPERSION[energydispersion]":{ "scheme":"@eln", - "pass_energy":"@attrs:pass_energy", - "pass_energy/@units":"@attrs:pass_energy/@units", - "energy_scan_mode":"@attrs:scan_mode", + "pass_energy":"['@attrs:pass_energy', '@eln']", + "pass_energy/@units":"['@attrs:pass_energy/@units', '@eln']", + "energy_scan_mode":"['@attrs:scan_mode', '@eln']", "radius":"@eln", "radius/@units":"@eln", "device_information":{ @@ -185,8 +187,8 @@ "detector_type":"@eln", "detector_voltage":"@eln", "detector_voltage/@units":"@eln", - "count_time":"@attrs:dwell_time", - "count_time/@units":"@attrs:dwell_time/@units", + "count_time":"['@attrs:dwell_time', '@attrs:acquisition_time']", + "count_time/@units":"['@attrs:dwell_time/@units', '@attrs:acquisition_time/@units']", "acquisition_mode":"@attrs:signal_mode", "device_information":{ "vendor":"@eln", @@ -207,13 +209,13 @@ }, "depends_on":"/entry/instrument/electronanalyser/transformations/analyser_take_off_polar_angle", "transformations":{ - "analyser_take_off_polar_angle":"@attrs:analyser_take_off_polar_angle", + "analyser_take_off_polar_angle":"!@attrs:analyser_take_off_polar_angle", "analyser_take_off_polar_angle/@units":"@attrs:analyser_take_off_polar_angle/@units", "analyser_take_off_polar_angle/@transformation_type":"rotation", "analyser_take_off_polar_angle/@vector":[0, -1, 0], "analyser_take_off_polar_angle/@depends_on":"analyser_take_off_azimuth_angle", - "analyser_take_off_azimuth_angle":"@attrs:analyser_take_off_azimuth_angle", + 
"analyser_take_off_azimuth_angle":"!@attrs:analyser_take_off_azimuth_angle", "analyser_take_off_azimuth_angle/@units":"@attrs:analyser_take_off_azimuth_angle/@units", "analyser_take_off_azimuth_angle/@transformation_type":"rotation", "analyser_take_off_azimuth_angle/@vector":[0, 0, -1], @@ -307,8 +309,90 @@ } } }, + "/ENTRY/FIT[fit]":{ + "@default":"data", + "label":"!@attrs:fit_label", + "data":{ + "@axes":["input_independent"], + "@signal":"input_dependent", + "@auxiliary_signals":["envelope"], + "input_independent":"@link:/entry/data/energy", + "input_dependent":"@link:/entry/data/data", + "envelope":"['@attrs:fit_envelope_cps/data', '@attrs:fit_envelope_cps/data']", + "envelope/@units":"counts_per_second", + "residual": null, + "residual/@units":"counts_per_second" + }, + "peakPEAK[peak*]":{ + "@default":"data", + "label":"@attrs:component*/name", + "data":{ + "@axes":["position"], + "@signal":"intensity", + "intensity":"['@attrs:component*/data_cps', '@attrs:component*/data']", + "intensity/@units":"counts_per_second", + "position":"@data:energy", + "position/@units":"@attrs:energy_units" + }, + "function":{ + "description":"@attrs:component*/lineshape", + "formula":"@attrs:component*/formula", + "position":{ + "value":"@attrs:component*/position", + "value/@units":"@attrs:component*/position/@units", + "min_value":"@attrs:component*/position_min", + "min_value/@units":"@attrs:component*/position_min/@units", + "max_value":"@attrs:component*/position_max", + "max_value/@units":"@attrs:component*/position_max/@units" + }, + "width":{ + "value":"@attrs:component*/width", + "value/@units":"@attrs:component*/width/@units", + "min_value":"@attrs:component*/width_min", + "min_value/@units":"@attrs:component*/width_min/@units", + "max_value":"@attrs:component*/width_max", + "max_value/@units":"@attrs:component*/width_max/@units" + }, + "area":{ + "value":"@attrs:component*/area", + "min_value":"@attrs:component*/area_min", + "max_value":"@attrs:component*/area_max" + 
} + }, + "total_area":"@attrs:component*/area", + "relative_sensitivity_factor":"@attrs:component*/rsf", + "relative_atomic_concentration":"@attrs:component*/atomic_concentration" + }, + "backgroundBACKGROUND[background*]":{ + "@default":"data", + "label":"@attrs:region*/name", + "data":{ + "@axes":["position"], + "@signal":"intensity", + "intensity":"['@attrs:background_intensity/data_cps', '@attrs:background_intensity/data', '@attrs:region*/data_cps', '@attrs:region*/data']", + "intensity/@units":"counts_per_second", + "position":"@data:energy", + "position/@units":"@attrs:energy_units" + }, + "function":{ + "description":"@attrs:region*/bg_type", + "formula":"@attrs:region*/formula" + } + }, + "error_function":{ + "description": null, + "formula": null + }, + "global_fit_function":{ + "description": null, + "formula": null + }, + "figure_of_meritMETRIC[figure_of_merit]": null, + "figure_of_meritMETRIC[figure_of_merit]/@metric": null, + "figure_of_meritMETRIC[figure_of_merit]/@units": null + }, "/ENTRY/SAMPLE[sample]":{ - "name":"@attrs:sample_name", + "name":"['@attrs:sample_name', '@eln']", "identifier":{ "identifier":"@eln" }, @@ -330,22 +414,22 @@ } }, "temperature_env":{ - "temperature_sensor":"@link:/entry/instrument/manipulator/temperature_sensor", - "sample_heater":"@link:/entry/instrument/manipulator/sample_heater", + "temperature_sensor":"!@link:/entry/instrument/manipulator/temperature_sensor", + "sample_heater":"!@link:/entry/instrument/manipulator/sample_heater", "cryostat":null }, "gas_pressure_env":{ - "pressure_gauge":"@link:/entry/instrument/pressure_gauge" + "pressure_gauge":"!@link:/entry/instrument/pressure_gauge" }, "bias_env":{ - "potentiostat":"@link:/entry/instrument/manipulator/sample_bias_potentiostat", - "voltmeter":"@link:/entry/instrument/manipulator/sample_bias_voltmeter" + "potentiostat":"!@link:/entry/instrument/manipulator/sample_bias_potentiostat", + "voltmeter":"!@link:/entry/instrument/manipulator/sample_bias_voltmeter" }, 
"drain_current_env":{ - "amperemeter":"@link:/entry/instrument/manipulator/drain_current_amperemeter" + "amperemeter":"!@link:/entry/instrument/manipulator/drain_current_amperemeter" }, "flood_gun_current_env":{ - "flood_gun":"@link:/entry/instrument/flood_gun" + "flood_gun":"!@link:/entry/instrument/flood_gun" }, "depends_on":"/entry/sample/transformations/sample_rotation_angle", "transformations":{ @@ -355,14 +439,14 @@ "sample_rotation_angle/@vector":[0, 0, -1], "sample_rotation_angle/@depends_on":"sample_normal_tilt_azimuth_angle", - "sample_normal_polar_angle_of_tilt":"@attrs:sample_normal_polar_angle_of_tilt", + "sample_normal_polar_angle_of_tilt":"!@attrs:sample_normal_polar_angle_of_tilt", "sample_normal_polar_angle_of_tilt/@units":"@attrs:sample_normal_polar_angle_of_tilt/@units", "sample_normal_polar_angle_of_tilt/@transformation_type":"rotation", "sample_normal_polar_angle_of_tilt/@vector":[0, -1, 0], "sample_normal_polar_angle_of_tilt/@depends_on":"sample_normal_tilt_azimuth_angle", - "sample_normal_tilt_azimuth_angle":"@attrs:sample/sample_normal_tilt_azimuth_angle", - "sample_normal_tilt_azimuth_angle/@units":"@attrs:sample/sample_normal_tilt_azimuth_angle/@units", + "sample_normal_tilt_azimuth_angle":"!@attrs:sample_normal_tilt_azimuth_angle", + "sample_normal_tilt_azimuth_angle/@units":"@attrs:sample_normal_tilt_azimuth_angle/@units", "sample_normal_tilt_azimuth_angle/@transformation_type":"rotation", "sample_normal_tilt_azimuth_angle/@vector":[0, 0, -1], "sample_normal_tilt_azimuth_angle/@depends_on":"/entry/geometries/xps_coordinate_system/coordinate_transformations/z_rotation" @@ -373,12 +457,12 @@ "@signal":"data", "data":"@data:average", "data_errors":"@data:errors", - "data/@units":"@attrs:y_units_1", + "data/@units":"['@attrs:y_units_1','@attrs:y_units', 'counts_per_second']", "DATA[*]":"@data:*.scans", - "DATA[*]/@units":"@attrs:y_units_1", + "DATA[*]/@units":"['@attrs:y_units_1','@attrs:y_units', 'counts_per_second']", 
"energy":"@data:energy", "energy/@type":"@attrs:energy_label", - "energy/@units":"@attrs:energy_units", + "energy/@units":"['@attrs:energy_units', 'eV']", "energy/@reference":null, "@energy_indices":0 } diff --git a/src/pynxtools_xps/models/__init__.py b/src/pynxtools_xps/models/__init__.py index 7adc2107..eab8c4dc 100644 --- a/src/pynxtools_xps/models/__init__.py +++ b/src/pynxtools_xps/models/__init__.py @@ -1,18 +1,18 @@ -# -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - +# +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + diff --git a/src/pynxtools_xps/models/backgrounds.py b/src/pynxtools_xps/models/backgrounds.py index d1f0d98a..401051fd 100644 --- a/src/pynxtools_xps/models/backgrounds.py +++ b/src/pynxtools_xps/models/backgrounds.py @@ -1,329 +1,413 @@ -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -Backgrounds for peak fitting. -""" - -from typing import Optional, final -import numpy as np - - -class LinearBackground: - """Linear background model for XPS spectra that connects the first and last points in the y array.""" - - def __init__(self): - """ - Initialize the linear background model without predefined parameters. - The slope and intercept will be calculated based on the input data. - """ - self.slope = None - self.intercept = None - - @final - def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: - """ - Calculate the linear background based on the first and last points of the y array. - - Parameters: - x (np.ndarray): The energy values at which to evaluate the background. - y (np.ndarray): The observed intensity values at the corresponding energy points in x. - - Returns: - np.ndarray: The background intensity at each energy point. 
- """ - if len(x) != len(y): - raise ValueError("x and y arrays must have the same length.") - - # Calculate slope and intercept using the first and last points - self.slope = (y[-1] - y[0]) / (x[-1] - x[0]) - self.intercept = y[0] - self.slope * x[0] - - # Return the background values using the linear equation - return self.slope * x + self.intercept - - @final - def formula(self) -> str: - """ - Returns the formula used for the linear background model. - - Returns: - str: The formula for the linear background. - """ - return ( - "Linear Background Formula:\n" - "B(x) = slope * x + intercept\n" - "Where:\n" - " slope: (y[-1] - y[0]) / (x[-1] - x[0]) # Calculated from the first and last points of y\n" - " intercept: y[0] - slope * x[0] # Calculated from the first point of y\n" - " x: Energy values\n" - " y: Intensity values\n" - ) - - -class Shirley: - """ - Shirley background subtraction using the Sherwood method. - - The Shirley method is used to compute the background of an x-ray or - spectroscopy data set by iteratively subtracting the baseline until - convergence. This class implements this method with options for - tolerance and maximum iterations. - """ - - def __init__(self) -> None: - """Initialize the Shirley background subtraction object.""" - pass - - @final - def calc_background( - self, x: np.ndarray, y: np.ndarray, tol: float = 1e-5, maxit: int = 15 - ) -> np.ndarray: - """ - Calculate the Shirley background using the Sherwood method. - - Parameters: - - x (np.ndarray): The x-axis data (energy or time, etc.). - - y (np.ndarray): The y-axis data (intensity or counts, etc.). - - tol (float, optional): The tolerance for the convergence criterion. Defaults to 1e-5. - - maxit (int, optional): The maximum number of iterations to prevent infinite loops. Defaults to 15. - - Raises: - - ValueError: If input arrays have mismatched dimensions, incorrect types, or the fit does not converge. - - Returns: - - np.ndarray: The calculated Shirley background. 
- """ - - # Validate input - if not isinstance(x, np.ndarray) or not isinstance(y, np.ndarray): - raise ValueError( - f"Parameters x and y must be numpy arrays, not {type(x)} and {type(y)}" - ) - - if len(x) != len(y): - raise ValueError("x and y arrays must have the same length.") - - if x.size == 0 or y.size == 0: - raise ValueError("x or y array is empty.") - - if x.ndim > 1 or y.ndim > 1: - raise ValueError( - f"Data arrays must be one-dimensional. Found shapes x: {x.shape}, y: {y.shape}." - ) - - # Reverse the data if it is in ascending order - is_reversed = False - if x[0] < x[-1]: - is_reversed = True - x = np.flip(x) - y = np.flip(y) - - # Initialize background arrays - background = np.zeros_like(x) - background_next = np.zeros_like(x) - - # Iterative loop to compute Shirley background - for iters in range(maxit): - k = (y[0] - y[-1]) / np.trapz(y - background, x=x) - - # Update the background using trapezoidal integration - for energy in range(len(x)): - background_next[energy] = k * np.trapz( - y[energy:] - background[energy:], x=x[energy:] - ) - - # Check for convergence - diff = np.linalg.norm(background_next - background) - background = np.copy(background_next) - if diff < tol: - break - else: - # Raise an error if the maximum iterations are reached - raise ValueError( - "Maximum number of iterations exceeded before convergence." - ) - - # Reverse back the result if the data was originally reversed - if is_reversed: - return np.flip(y[-1] + background) - return y[-1] + background - - def formula(self) -> str: - """ - Returns the iterative formula used in the Shirley background subtraction. - - Returns: - - str: The formula used for the Shirley background subtraction. - """ - return ( - "Shirley Background Subtraction Formula:\n" - "1. Initial background B_i = 0 for all i.\n" - "2. In each iteration, calculate:\n" - " k = (y_0 - y_n) / integral(x_0, x_n, (y_j - B_j) dx_j)\n" - "3. 
Update the background using the formula:\n" - " B_i = k * integral(x_i, x_n, (y_j - B_j) dx_j)\n" - "4. Repeat steps 2 and 3 until convergence (|B_next - B| < tol).\n" - "5. If convergence fails after maxit iterations, raise an error.\n" - "Where:\n" - " B_i: Background at point i\n" - " k: Scaling factor based on the integral\n" - " y_i: Original data at point i\n" - " x_i: X-axis values\n" - " B: Current background estimate\n" - " tol: Convergence tolerance\n" - " maxit: Maximum number of iterations" - ) - - -class TougaardU3: - """U3 Tougaard background model for XPS spectra. - - Parameters: - E0 (float): Energy onset for the background, typically near the core-level peak energy. - A (float): Scaling factor for the background intensity. - C (float): Shape parameter that determines the background curvature. - """ - - def __init__(self, E0: float, A: float, C: float): - """ - Initialize the U3 Tougaard model with the required parameters. - - Parameters: - E0 (float): Energy onset for the background, typically near the core-level peak energy. - A (float): Scaling factor for the background intensity. - C (float): Shape parameter that determines the background curvature. - """ - self.E0 = E0 - self.A = A - self.C = C - - @final - def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: - """Calculate the Tougaard background at each energy point. - - Args: - x (np.ndarray): Array of energy values. - - Returns: - np.ndarray: Tougaard background values corresponding to each energy value in x. - """ - return self.A / ((x - self.E0) ** 2 + self.C) - - @final - def formula(self) -> str: - """ - Returns the formula used for the Tougaard background model. - - Returns: - str: The formula for the Tougaard background. 
- """ - return ( - "Tougaard U3 Background Formula:\n" - "B(x) = A / ((x - E0)^2 + C)\n" - "Where:\n" - " B(x): Background at energy x\n" - " A: Scaling factor\n" - " E0: Energy onset (near core-level peak energy)\n" - " C: Shape parameter (determines background curvature)\n" - ) - - @final - def __repr__(self) -> str: - """ - Returns a string representation of the GaussianLorentzianSum object, including details - for position, width, intensity, and fraction_gauss. - - Returns: - - str: The string representation of the object. - """ - return f"TougaardU3(E0={self.E0}, A={self.A}, " f"C={self.C})" - - -class TougaardU4: - """U4 Tougaard background model for XPS spectra.""" - - def __init__(self, B: float, C: float, D: float, Eg: float, temp: float = 300.0): - """ - Initialize the U4 Tougaard model with the required parameters. - - Parameters: - B (float): A scaling factor that adjusts the overall amplitude of the background. - C (float): A parameter influencing the background's shape, specifically the width of the energy region where the background decays. - D (float): Another shape parameter that controls the fall-off of the background, influencing the tail of the background. - Eg (float): The energy onset for the background, typically near the core-level peak energy, controlling where the background starts. - temp (float): Temperature in Kelvin (default is 300 K). - """ - self.B = B - self.C = C - self.D = D - self.Eg = Eg - self.temp = temp - - @final - def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: - """ - Calculate the modified Tougaard background at each energy point using a modified expression. - - Parameters: - x (np.ndarray or float): The energy values at which to evaluate the background. - - Returns: - np.ndarray or float: The background intensity at each energy point. 
- """ - kb = 0.000086 # Boltzmann constant in eV/K - - return ( - (self.B * x) - / ((self.C - x**2) ** 2 + self.D * x**2) - * 1 - / (np.exp((self.Eg - x) / (self.temp * kb)) + 1) - ) - - @final - def formula(self) -> str: - """ - Returns the formula used for the U4 Tougaard background model. - - Returns: - str: The formula for the U4 Tougaard background. - """ - return ( - "U4 Tougaard Background Formula:\n" - "B(x) = (B * x) / ((C - x^2)^2 + D * x^2) * 1 / (exp((Eg - x) / (kB * T)) + 1)\n" - "Where:\n" - " B: Scaling factor\n" - " C: Shape parameter (affects width of energy decay region)\n" - " D: Shape parameter (controls the tail of the background)\n" - " Eg: Energy onset (near core-level peak energy)\n" - " kB: Boltzmann constant (0.000086 eV/K)\n" - " T: Temperature (default 300 K, can be modified)\n" - " x: Energy values\n" - ) - - @final - def __repr__(self) -> str: - """ - Returns a string representation of the GaussianLorentzianSum object, including details - for position, width, intensity, and fraction_gauss. - - Returns: - - str: The string representation of the object. - """ - return ( - f"TougaardU4(B={self.B}, C={self.C}, D={self.D}, " - f"Eg={self.Eg}, temp={self.temp})" - ) +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Backgrounds for peak fitting. 
+""" + +from typing import Optional +import numpy as np +import scipy + + +class LinearBackground: + """Linear background model for XPS spectra that connects the first and last points in the y array.""" + + def __init__(self): + """ + Initialize the linear background model without predefined parameters. + The slope and intercept will be calculated based on the input data. + """ + self.slope = None + self.intercept = None + + def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: + """ + Calculate the linear background based on the first and last points of the y array. + + Parameters: + x (np.ndarray): The energy values at which to evaluate the background. + y (np.ndarray): The observed intensity values at the corresponding energy points in x. + + Returns: + np.ndarray: The background intensity at each energy point. + """ + if len(x) != len(y): + raise ValueError("x and y arrays must have the same length.") + + # Calculate slope and intercept using the first and last points + self.slope = (y[-1] - y[0]) / (x[-1] - x[0]) + self.intercept = y[0] - self.slope * x[0] + + # Return the background values using the linear equation + return self.slope * x + self.intercept + + def formula(self) -> str: + """ + Returns the formula used for the linear background model. + + Returns: + str: The formula for the linear background. + """ + return ( + "Linear Background Formula:\n" + "B(x) = slope * x + intercept\n" + "Where:\n" + " slope: (y[-1] - y[0]) / (x[-1] - x[0]) # Calculated from the first and last points of y\n" + " intercept: y[0] - slope * x[0] # Calculated from the first point of y\n" + " x: Energy values\n" + " y: Intensity values\n" + ) + + +class StepUp: + """Step Up background model for XPS spectra, based on the complementary error function.""" + + def __init__(self, a0: float, a1: float, a2: float, a3: float): + """ + Initialize the Step Up background model without predefined parameters. 
+ """ + self.a0 = a0 # Step magnitude + self.a1 = a1 # Edge position + self.a2 = a2 # FWHM-related parameter + self.a3 = a3 # Constant offset + + def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: + """ + Calculate the Step Up background based on the complementary error function. + + Parameters: + x (np.ndarray): The energy values at which to evaluate the background. + a0 (float): Magnitude of the step. + a1 (float): Location of the step (edge position). + a2 (float): Parameter related to FWHM (2 * sqrt(ln(2)) * a2 = FWHM). + a3 (float): Constant offset. + + Returns: + np.ndarray: The background intensity at each energy point. + """ + return (self.a0 / 2) * scipy.special.erfc((self.a1 - x) / self.a2) + self.a3 + + def formula(self) -> str: + """ + Returns the formula used for the Step Up background model. + + Returns: + str: The formula for the Step Up background. + """ + return ( + "Step Up Background Formula:\n" + "B(x; a0, a1, a2, a3) = (a0 / 2) * erfc((a1 - x) / a2) + a3\n" + "Where:\n" + " a0: Magnitude of the step\n" + " a1: Edge position\n" + " a2: FWHM-related parameter (FWHM = 2 * sqrt(ln(2)) * a2)\n" + " a3: Constant offset\n" + ) + + +class StepDown: + """Step Down background model for XPS spectra, the reflection of Step Up around the edge position.""" + + def __init__(self, a0: float, a1: float, a2: float, a3: float): + """ + Initialize the Step Up background model without predefined parameters. + """ + self.a0 = a0 # Step magnitude + self.a1 = a1 # Edge position + self.a2 = a2 # FWHM-related parameter + self.a3 = a3 # Constant offset + + def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: + """ + Calculate the Step Down background as the reflection of the Step Up background. + + Parameters: + x (np.ndarray): 1D array of energy values at which to evaluate the background. + a0 (float): Magnitude of the step. + a1 (float): Location of the step (edge position). 
+ a2 (float): Parameter related to FWHM (2 * sqrt(ln(2)) * a2 = FWHM). + a3 (float): Constant offset. + + Returns: + np.ndarray: 1D array of background intensity at each energy point. + """ + + return (self.a0 / 2) * scipy.special.erfc((x - self.a1) / self.a2) + self.a3 + + def formula(self) -> str: + """ + Returns the formula used for the Step Down background model. + + Returns: + str: The formula for the Step Down background. + """ + return ( + "Step Down Background Formula:\n" + "B(x) = (a0 / 2) * erfc((x - a1) / a2) + a3\n" + "Where:\n" + " a0: Magnitude of the step\n" + " a1: Edge position\n" + " a2: FWHM-related parameter (FWHM = 2 * sqrt(ln(2)) * a2)\n" + " a3: Constant offset\n" + ) + + +class Shirley: + """ + Shirley background subtraction using the Sherwood method. + + The Shirley method is used to compute the background of an x-ray or + spectroscopy data set by iteratively subtracting the baseline until + convergence. This class implements this method with options for + tolerance and maximum iterations. + """ + + def __init__(self) -> None: + """Initialize the Shirley background subtraction object.""" + pass + + def calc_background( + self, x: np.ndarray, y: np.ndarray, tol: float = 1e-5, maxit: int = 15 + ) -> np.ndarray: + """ + Calculate the Shirley background using the Sherwood method. + + Parameters: + - x (np.ndarray): The x-axis data (energy or time, etc.). + - y (np.ndarray): The y-axis data (intensity or counts, etc.). + - tol (float, optional): The tolerance for the convergence criterion. Defaults to 1e-5. + - maxit (int, optional): The maximum number of iterations to prevent infinite loops. Defaults to 15. + + Raises: + - ValueError: If input arrays have mismatched dimensions, incorrect types, or the fit does not converge. + + Returns: + - np.ndarray: The calculated Shirley background. 
+ """ + # Validate input + if not isinstance(x, np.ndarray) or not isinstance(y, np.ndarray): + raise ValueError( + f"Parameters x and y must be numpy arrays, not {type(x)} and {type(y)}" + ) + + if len(x) != len(y): + raise ValueError("x and y arrays must have the same length.") + + if x.size == 0 or y.size == 0: + raise ValueError("x or y array is empty.") + + if x.ndim > 1 or y.ndim > 1: + raise ValueError( + f"Data arrays must be one-dimensional. Found shapes x: {x.shape}, y: {y.shape}." + ) + + # Reverse the data if it is in ascending order + is_reversed = False + if x[0] < x[-1]: + is_reversed = True + x = np.flip(x) + y = np.flip(y) + + # Initialize background arrays + background = np.zeros_like(x) + background_next = np.zeros_like(x) + + # Iterative loop to compute Shirley background + for iters in range(maxit): + k = (y[0] - y[-1]) / np.trapz(y - background, x=x) + + # Update the background using trapezoidal integration + for energy in range(len(x)): + background_next[energy] = k * np.trapz( + y[energy:] - background[energy:], x=x[energy:] + ) + + # Check for convergence + diff = np.linalg.norm(background_next - background) + background = np.copy(background_next) + if diff < tol: + break + else: + # Raise an error if the maximum iterations are reached + raise ValueError( + "Maximum number of iterations exceeded before convergence." + ) + + # Reverse back the result if the data was originally reversed + if is_reversed: + return np.flip(y[-1] + background) + return y[-1] + background + + def formula(self) -> str: + """ + Returns the iterative formula used in the Shirley background subtraction. + + Returns: + - str: The formula used for the Shirley background subtraction. + """ + return ( + "Shirley Background Subtraction Formula:\n" + "1. Initial background B_i = 0 for all i.\n" + "2. In each iteration, calculate:\n" + " k = (y_0 - y_n) / integral(x_0, x_n, (y_j - B_j) dx_j)\n" + "3. 
Update the background using the formula:\n" + " B_i = k * integral(x_i, x_n, (y_j - B_j) dx_j)\n" + "4. Repeat steps 2 and 3 until convergence (|B_next - B| < tol).\n" + "5. If convergence fails after maxit iterations, raise an error.\n" + "Where:\n" + " B_i: Background at point i\n" + " k: Scaling factor based on the integral\n" + " y_i: Original data at point i\n" + " x_i: X-axis values\n" + " B: Current background estimate\n" + " tol: Convergence tolerance\n" + " maxit: Maximum number of iterations" + ) + + +class TougaardU3: + """U3 Tougaard background model for XPS spectra. + + Parameters: + E0 (float): Energy onset for the background, typically near the core-level peak energy. + A (float): Scaling factor for the background intensity. + C (float): Shape parameter that determines the background curvature. + """ + + def __init__(self, E0: float, A: float, C: float): + """ + Initialize the U3 Tougaard model with the required parameters. + + Parameters: + E0 (float): Energy onset for the background, typically near the core-level peak energy. + A (float): Scaling factor for the background intensity. + C (float): Shape parameter that determines the background curvature. + """ + self.E0 = E0 + self.A = A + self.C = C + + def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Calculate the Tougaard background at each energy point. + + Args: + x (np.ndarray): Array of energy values. + + Returns: + np.ndarray: Tougaard background values corresponding to each energy value in x. + """ + return self.A / ((x - self.E0) ** 2 + self.C) + + def formula(self) -> str: + """ + Returns the formula used for the Tougaard background model. + + Returns: + str: The formula for the Tougaard background. 
+ """ + return ( + "Tougaard U3 Background Formula:\n" + "B(x; A, E0, C) = A / ((x - E0)^2 + C)\n" + "Where:\n" + " B(x): Background at energy x\n" + " A: Scaling factor\n" + " E0: Energy onset (near core-level peak energy)\n" + " C: Shape parameter (determines background curvature)\n" + ) + + def __repr__(self) -> str: + """ + Returns a string representation of the GaussianLorentzianSum object, including details + for position, width, intensity, and fraction_gauss. + + Returns: + - str: The string representation of the object. + """ + return f"TougaardU3(E0={self.E0}, A={self.A}, " f"C={self.C})" + + +class TougaardU4: + """U4 Tougaard background model for XPS spectra.""" + + def __init__(self, B: float, C: float, D: float, Eg: float, temp: float = 300.0): + """ + Initialize the U4 Tougaard model with the required parameters. + + Parameters: + B (float): A scaling factor that adjusts the overall amplitude of the background. + C (float): A parameter influencing the background's shape, specifically the width of the energy region where the background decays. + D (float): Another shape parameter that controls the fall-off of the background, influencing the tail of the background. + Eg (float): The energy onset for the background, typically near the core-level peak energy, controlling where the background starts. + temp (float): Temperature in Kelvin (default is 300 K). + """ + self.B = B + self.C = C + self.D = D + self.Eg = Eg + self.temp = temp + + def calc_background(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: + """ + Calculate the modified Tougaard background at each energy point using a modified expression. + + Parameters: + x (np.ndarray or float): The energy values at which to evaluate the background. + + Returns: + np.ndarray or float: The background intensity at each energy point. 
+ """ + kb = 0.000086 # Boltzmann constant in eV/K + + return ( + (self.B * x) + / ((self.C - x**2) ** 2 + self.D * x**2) + * 1 + / (np.exp((self.Eg - x) / (self.temp * kb)) + 1) + ) + + def formula(self) -> str: + """ + Returns the formula used for the U4 Tougaard background model. + + Returns: + str: The formula for the U4 Tougaard background. + """ + return ( + "U4 Tougaard Background Formula:\n" + "B(x; B, C, D, E_g, T) = (B * x) / ((C - x^2)^2 + D * x^2) * 1 / (exp((Eg - x) / (kB * T)) + 1)\n" + "Where:\n" + " B: Scaling factor\n" + " C: Shape parameter (affects width of energy decay region)\n" + " D: Shape parameter (controls the tail of the background)\n" + " Eg: Energy onset (near core-level peak energy)\n" + " kB: Boltzmann constant (0.000086 eV/K)\n" + " T: Temperature (default 300 K, can be modified)\n" + " x: Energy values\n" + ) + + def __repr__(self) -> str: + """ + Returns a string representation of the GaussianLorentzianSum object, including details + for position, width, intensity, and fraction_gauss. + + Returns: + - str: The string representation of the object. + """ + return ( + f"TougaardU4(B={self.B}, C={self.C}, D={self.D}, " + f"Eg={self.Eg}, temp={self.temp})" + ) diff --git a/src/pynxtools_xps/models/lineshapes.py b/src/pynxtools_xps/models/lineshapes.py index b6bd4827..31af3a7d 100644 --- a/src/pynxtools_xps/models/lineshapes.py +++ b/src/pynxtools_xps/models/lineshapes.py @@ -1,470 +1,454 @@ -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -Line shapes for peak fitting. -""" - -from typing import Optional, final -import numpy as np - - -class Peak: - def __init__(self, position: float, width: float, area: float) -> None: - """ - Base class for a peak with position, width, and intensity. - - Parameters: - - position (float): Position of the peak on the energy axis. - - width (float): Full width at half maximum (FWHM) of the peak. - - area (float): Area of the peak. - """ - if width <= 0: - raise ValueError("Width must be positive.") - if area <= 0: - raise ValueError("Intensity must be positive.") - self.position = position - self.width = width - self.area = area - - -class Gaussian(Peak): - """ - Gaussian peak with specified position, width, and area. - """ - - @final - def calc_lineshape(self, x: float) -> Optional[float]: - """ - Calculate the Gaussian lineshape at a given energy value x. - - Parameters: - - x (float): Energy value on the energy axis. - - Returns: - - float: Value of the Gaussian lineshape at x, or None if width is zero. - """ - if self.width != 0: - # Calculate intensity from area - intensity = self.area / (np.sqrt(2 * np.pi) * self.width) - exponent = -4 * np.log(2) * (x - self.position) ** 2 / self.width**2 - return intensity * np.exp(exponent) - return None - - @final - def formula(self) -> str: - """ - Returns a string representation of the Gaussian formula. - - Returns: - - str: The formula used for the Gaussian lineshape. 
- """ - return "G(x) = (area / (sqrt(2 * pi) * width)) * exp[-(4 * ln(2) * (x - position)^2) / width^2]" - - @final - def __repr__(self) -> str: - return ( - f"Gaussian(position={self.position}, width={self.width}, area={self.area})" - ) - - -class Lorentzian(Peak): - """Lorentzian peak with specified position, width, and area.""" - - @final - def calc_lineshape(self, x: float) -> Optional[float]: - """ - Calculate the Lorentzian lineshape at a given energy value x. - - Parameters: - - x (float): Energy value on the energy axis. - - Returns: - - float: Value of the Lorentzian lineshape at x, or None if width is zero. - """ - if self.width != 0: - # Calculate intensity from area - intensity = self.area / (np.pi * self.width) - return intensity / (1 + (4 * (x - self.position) ** 2) / self.width**2) - return None - - @final - def formula(self) -> str: - """ - Returns a string representation of the Lorentzian formula. - - Returns: - - str: The formula used for the Lorentzian lineshape. - """ - return "L(x) = (area / (pi * width)) / (1 + (4 * (x - position)^2) / width^2)" - - @final - def __repr__(self) -> str: - return f"Lorentzian(position={self.position}, width={self.width}, area={self.area})" - - -class LorentzianAsymmetric: - def __init__( - self, position: float, width: float, area: float, alpha: float, beta: float - ): - """ - Initialize the LorentzianAsymmetric profile with the required parameters. - - Parameters: - position (float): The peak position (center). - width (float): Full width at half maximum (FWHM) of the peak. - area (float): Area under the peak. - alpha (float): Scaling factor for width on the right. - beta (float): Scaling factor for width on the left. - """ - self.position = position - self.width = width - self.area = area - self.alpha = alpha - self.beta = beta - - @final - def calc_lineshape(self, x: np.ndarray) -> np.ndarray: - """ - Calculate the asymmetric Lorentzian lineshape for an array of energy values x. 
- - Parameters: - - x (np.ndarray): Array of energy values on the energy axis. - - Returns: - - np.ndarray: Array of values of the Lorentzian lineshape for each x. - """ - # Calculate intensity from area - intensity = self.area / (np.pi * self.width) - - # Calculate the width based on the position and scaling factors alpha and beta - width = np.where( - x < self.position, self.width / self.alpha, self.width / self.beta - ) - - # Calculate the Lorentzian lineshape for each value in x - return intensity / (1 + (4 * (x - self.position) ** 2) / width**2) - - @final - def formula(self) -> str: - """ - Returns a string representation of the asymmetric Lorentzian formula. - - Returns: - - str: The formula used for the asymmetric Lorentzian lineshape. - """ - return ( - "L(x) = (area / (pi * width)) / (1 + ((x - position) / (width / alpha))^2) for x <= position, " - "(area / (pi * width)) / (1 + ((x - position) / (width / beta))^2) for x > position" - ) - - @final - def __repr__(self) -> str: - """ - Return a string representation of the LorentzianAsymmetric object. - """ - return ( - f"LorentzianAsymmetric(position={self.position}, width={self.width}, " - f"area={self.area}, alpha={self.alpha}, beta={self.beta})" - ) - - -class LorentzianFinite(LorentzianAsymmetric): - """Finite Lorentzian peak with specified position, width, area, asymmetry parameters, and damping.""" - - def __init__( - self, - position: float, - width: float, - area: float, - alpha: float, - beta: float, - w: float, - gauss_contribution: float, - no_of_convolutions: int, - ): - super().__init__(position, width, area, alpha, beta) - self.w = w - self.gauss_contribution = gauss_contribution - self.no_of_convolutions = no_of_convolutions - - @final - def calc_lineshape(self, x: np.ndarray) -> Optional[np.ndarray]: - """ - Calculate the finite Lorentzian lineshape with damping for an array of energy values x. - - Parameters: - - x (np.ndarray): Array of energy values on the energy axis. 
- - Returns: - - np.ndarray: Array of finite Lorentzian lineshape values for each x. - """ - # Get the Lorentzian lineshape for the array of x - lorentzian = super().calc_lineshape(x) - - # If lorentzian is not None, apply damping - if lorentzian is not None: - damping_factor = 1 / (1 + 4 * ((x - self.position) / self.w) ** 2) - return lorentzian * damping_factor - - # Return None if lorentzian is None - return None - - @final - def formula(self) -> str: - """ - Returns a string representation of the finite Lorentzian formula with damping. - - Returns: - - str: The formula used for the finite Lorentzian lineshape. - """ - return ( - "L(x) = (area / (pi * width)) / (1 + (4 * (x - position)^2) / width^2) * " - "(1 / (1 + 4 * ((x - position) / w)^2))" - ) - - @final - def __repr__(self) -> str: - """ - Return a string representation of the LorentzianFinite object. - """ - return ( - f"LorentzianFinite(position={self.position}, width={self.width}, " - f"area={self.area}, alpha={self.alpha}, beta={self.beta}, w={self.w})" - ) - - -class GaussianLorentzianSum(Peak): - """Combined Gaussian and Lorentzian peak using the existing Gaussian and Lorentzian classes.""" - - @final - def __init__( - self, position: float, width: float, area: float, fraction_gauss: float = 0.5 - ) -> None: - """ - Combined Gaussian and Lorentzian sum peak. - - Parameters: - - position (float): Position of the peak. - - width (float): Width of the peak. - - area (float): Area of the peak (instead of intensity). - - fraction_gauss (float): Fraction of the Gaussian contribution (between 0 and 1). - """ - super().__init__(position, width, area) - self.fraction_gauss = fraction_gauss - - @final - def calc_lineshape(self, x: float) -> Optional[float]: - """ - Calculate the combined lineshape of the Gaussian and Lorentzian at x. - - Parameters: - - x (float): Energy value on the energy axis. - - Returns: - - float: Combined lineshape value at x, or None if width is zero. 
- """ - if self.width != 0: - # Calculate intensity from area for Gaussian and Lorentzian - intensity = self.area / (np.pi * self.width) - - # Gaussian part (1 - fraction_gauss) contribution - gauss_part = (1 - self.fraction_gauss) * Gaussian( - self.position, self.width, intensity - ).calc_lineshape(x) - # Lorentzian part (fraction_gauss) contribution - lorentz_part = self.fraction_gauss * Lorentzian( - self.position, self.width, intensity - ).calc_lineshape(x) - return gauss_part + lorentz_part - return None - - @final - def formula(self) -> str: - """ - Returns a detailed string representation of the combined Gaussian-Lorentzian formula. - - Returns: - - str: The formula used for the combined lineshape. - """ - # Using the formula for both Gaussian and Lorentzian - gauss_formula = "G(x) = (area / (pi * width)) * exp[-(4 * ln(2) * (x - position)^2) / width^2]" - lorentz_formula = ( - "L(x) = area / (pi * width) / (1 + (4 * (x - position)^2) / width^2)" - ) - combined_formula = ( - f"SGL(x): G(x) + L(x) = (1 - fraction_gauss) * ({gauss_formula}) + " - f"fraction_gauss * ({lorentz_formula})" - ) - return combined_formula - - @final - def __repr__(self) -> str: - """ - Returns a string representation of the GaussianLorentzianSum object, including details - for position, width, area, and fraction_gauss. - - Returns: - - str: The string representation of the object. - """ - return ( - f"GaussianLorentzianSum(position={self.position}, width={self.width}, " - f"area={self.area}, fraction_gauss={self.fraction_gauss})" - ) - - -class GaussianLorentzianProduct(Peak): - def __init__( - self, position: float, width: float, area: float, fraction_gauss: float = 0.5 - ) -> None: - """ - Combined Gaussian and Lorentzian product peak. - - Parameters: - - position (float): Position of the peak. - - width (float): Width of the peak. - - area (float): Area of the peak (instead of intensity). - - fraction_gauss (float): Fraction of the Gaussian contribution. 
- """ - super().__init__(position, width, area) - self.fraction_gauss = fraction_gauss - - @final - def calc_lineshape(self, x: float) -> Optional[float]: - """ - Calculate the combined lineshape of the Gaussian and Lorentzian product at x. - - Parameters: - - x (float): Energy value on the energy axis. - - Returns: - - float: Combined lineshape value at x, or None if width is zero. - """ - if self.width != 0: - # Calculate intensity from area for Gaussian and Lorentzian - intensity = self.area / (np.pi * self.width) - - # Gaussian part (1 - fraction_gauss) contribution - gauss_part = (1 - self.fraction_gauss) * Gaussian( - self.position, self.width, intensity - ).calc_lineshape(x) - # Lorentzian part (fraction_gauss) contribution - lorentz_part = self.fraction_gauss * Lorentzian( - self.position, self.width, intensity - ).calc_lineshape(x) - return gauss_part * lorentz_part - return None - - @final - def formula(self) -> str: - """ - Returns a detailed string representation of the combined Gaussian-Lorentzian formula. - - Returns: - - str: The formula used for the combined lineshape. - """ - # Using the formula for both Gaussian and Lorentzian - gauss_formula = "G(x) = (area / (pi * width)) * exp[-(4 * ln(2) * (x - position)^2) / width^2]" - lorentz_formula = ( - "L(x) = area / (pi * width) / (1 + (4 * (x - position)^2) / width^2)" - ) - combined_formula = ( - f"GL(x): G(x) * L(x) = (1 - fraction_gauss) * ({gauss_formula}) * " - f"fraction_gauss * ({lorentz_formula})" - ) - return combined_formula - - @final - def __repr__(self) -> str: - """ - Returns a string representation of the GaussianLorentzianProduct object, including details - for position, width, area, and fraction_gauss. - - Returns: - - str: The string representation of the object. 
- """ - return ( - f"GaussianLorentzianProduct(position={self.position}, width={self.width}, " - f"area={self.area}, fraction_gauss={self.fraction_gauss})" - ) - - -class DoniachSunjic(Peak): - """Doniach-Sunjic profile for XPS peaks with asymmetry.""" - - def __init__(self, position: float, width: float, area: float, beta: float) -> None: - """ - Initialize the Doniach-Sunjic profile with the required parameters. - - Parameters: - position (float): The peak position (center). - width (float): Full width at half maximum (FWHM) of the peak. - area (float): Area under the peak (instead of intensity). - beta (float): Asymmetry parameter (1 for symmetric Lorentzian, <1 for left skew, >1 for right skew). - """ - # Initialize the parent Peak class with area - super().__init__(position, width, area) - self.beta = beta - - @final - def calc_lineshape(self, x: np.ndarray) -> np.ndarray: - """ - Calculate the Doniach-Sunjic profile at each energy point. - - Parameters: - x (np.ndarray or float): The energy values at which to evaluate the profile. - - Returns: - np.ndarray or float: The intensity values at each energy point. - """ - # Calculate the intensity from area for normalization - intensity = self.area / (np.pi * self.width) - - # Calculate the Doniach-Sunjic profile - return intensity / ((1 + ((x - self.position) / self.width) ** 2) ** self.beta) - - @final - def formula(self) -> str: - """ - Returns the formula used for the Doniach-Sunjic profile. - - Returns: - str: The formula for the Doniach-Sunjic profile. 
- """ - return ( - "Doniach-Sunjic Profile Formula:\n" - "f(x) = area / (pi * Gamma) * ((1 + ((x - x0) / Gamma)^2)^beta)\n" - "Where:\n" - " area: Area under the peak\n" - " x0: Peak position (center)\n" - " Gamma: FWHM (full width at half maximum)\n" - " beta: Asymmetry parameter (1 for symmetric Lorentzian)\n" - " x: Energy values\n" - ) - - @final - def __repr__(self) -> str: - """ - Returns a string representation of the DoniachSunjic object, including details - for position, width, area, and beta. - - Returns: - - str: The string representation of the object. - """ - return ( - f"DoniachSunjic(position={self.position}, width={self.width}, " - f"area={self.area}, beta={self.beta})" - ) +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Line shapes for peak fitting. +""" + +from typing import Optional +import numpy as np + + +class Peak: + def __init__(self, position: float, width: float, area: float) -> None: + """ + Base class for a peak with position, width, and intensity. + + Parameters: + - position (float): Position of the peak on the energy axis. + - width (float): Full width at half maximum (FWHM) of the peak. + - area (float): Area of the peak. 
+ """ + if width <= 0: + raise ValueError("Width must be positive.") + if area <= 0: + raise ValueError("Intensity must be positive.") + self.position = position + self.width = width + self.area = area + + +class Gaussian(Peak): + """ + Gaussian peak with specified position, width, and area. + """ + + def calc_lineshape(self, x: float) -> Optional[float]: + """ + Calculate the Gaussian lineshape at a given energy value x. + + Parameters: + - x (float): Energy value on the energy axis. + + Returns: + - float: Value of the Gaussian lineshape at x, or None if width is zero. + """ + if self.width != 0: + # Calculate intensity from area + intensity = self.area / (np.sqrt(2 * np.pi) * self.width) + exponent = -4 * np.log(2) * (x - self.position) ** 2 / self.width**2 + return intensity * np.exp(exponent) + return None + + def formula(self) -> str: + """ + Returns a string representation of the Gaussian formula. + + Returns: + - str: The formula used for the Gaussian lineshape. + """ + return "G(x; position, width, area) = (area / (sqrt(2 * pi) * width)) * exp[-(4 * ln(2) * (x - position)^2) / width^2]" + + def __repr__(self) -> str: + return ( + f"Gaussian(position={self.position}, width={self.width}, area={self.area})" + ) + + +class Lorentzian(Peak): + """Lorentzian peak with specified position, width, and area.""" + + def calc_lineshape(self, x: float) -> Optional[float]: + """ + Calculate the Lorentzian lineshape at a given energy value x. + + Parameters: + - x (float): Energy value on the energy axis. + + Returns: + - float: Value of the Lorentzian lineshape at x, or None if width is zero. + """ + if self.width != 0: + # Calculate intensity from area + intensity = self.area / (np.pi * self.width) + return intensity / (1 + (4 * (x - self.position) ** 2) / self.width**2) + return None + + def formula(self) -> str: + """ + Returns a string representation of the Lorentzian formula. + + Returns: + - str: The formula used for the Lorentzian lineshape. 
+ """ + return "L(x; position, width, area) = (area / (pi * width)) / (1 + (4 * (x - position)^2) / width^2)" + + def __repr__(self) -> str: + return f"Lorentzian(position={self.position}, width={self.width}, area={self.area})" + + +class LorentzianAsymmetric: + def __init__( + self, position: float, width: float, area: float, alpha: float, beta: float + ): + """ + Initialize the LorentzianAsymmetric profile with the required parameters. + + Parameters: + position (float): The peak position (center). + width (float): Full width at half maximum (FWHM) of the peak. + area (float): Area under the peak. + alpha (float): Scaling factor for width on the right. + beta (float): Scaling factor for width on the left. + """ + self.position = position + self.width = width + self.area = area + self.alpha = alpha + self.beta = beta + + def calc_lineshape(self, x: np.ndarray) -> np.ndarray: + """ + Calculate the asymmetric Lorentzian lineshape for an array of energy values x. + + Parameters: + - x (np.ndarray): Array of energy values on the energy axis. + + Returns: + - np.ndarray: Array of values of the Lorentzian lineshape for each x. + """ + # Calculate intensity from area + intensity = self.area / (np.pi * self.width) + + # Calculate the width based on the position and scaling factors alpha and beta + width = np.where( + x < self.position, self.width / self.alpha, self.width / self.beta + ) + + # Calculate the Lorentzian lineshape for each value in x + return intensity / (1 + (4 * (x - self.position) ** 2) / width**2) + + def formula(self) -> str: + """ + Returns a string representation of the asymmetric Lorentzian formula. + + Returns: + - str: The formula used for the asymmetric Lorentzian lineshape. 
+ """ + return ( + "LA(x; position, width, area, alpha, beta) = (area / (pi * width)) / (1 + ((x - position) / (width / alpha))^2) for x <= position, " + "(area / (pi * width)) / (1 + ((x - position) / (width / beta))^2) for x > position" + ) + + def __repr__(self) -> str: + """ + Return a string representation of the LorentzianAsymmetric object. + """ + return ( + f"LorentzianAsymmetric(position={self.position}, width={self.width}, " + f"area={self.area}, alpha={self.alpha}, beta={self.beta})" + ) + + +class LorentzianFinite(LorentzianAsymmetric): + """Finite Lorentzian peak with specified position, width, area, asymmetry parameters, and damping.""" + + def __init__( + self, + position: float, + width: float, + area: float, + alpha: float, + beta: float, + w: float, + gauss_contribution: float, + no_of_convolutions: int, + ): + super().__init__(position, width, area, alpha, beta) + self.w = w + self.gauss_contribution = gauss_contribution + self.no_of_convolutions = no_of_convolutions + + def calc_lineshape(self, x: np.ndarray) -> Optional[np.ndarray]: + """ + Calculate the finite Lorentzian lineshape with damping for an array of energy values x. + + Parameters: + - x (np.ndarray): Array of energy values on the energy axis. + + Returns: + - np.ndarray: Array of finite Lorentzian lineshape values for each x. + """ + # Get the Lorentzian lineshape for the array of x + lorentzian = super().calc_lineshape(x) + + # If lorentzian is not None, apply damping + if lorentzian is not None: + damping_factor = 1 / (1 + 4 * ((x - self.position) / self.w) ** 2) + return lorentzian * damping_factor + + # Return None if lorentzian is None + return None + + def formula(self) -> str: + """ + Returns a string representation of the finite Lorentzian formula with damping. + + Returns: + - str: The formula used for the finite Lorentzian lineshape. 
+ """ + return ( + "LF(x; position, width, w) = (area / (pi * width)) / (1 + (4 * (x - position)^2) / width^2) * " + "(1 / (1 + 4 * ((x - position) / w)^2))" + ) + + def __repr__(self) -> str: + """ + Return a string representation of the LorentzianFinite object. + """ + return ( + f"LorentzianFinite(position={self.position}, width={self.width}, " + f"area={self.area}, alpha={self.alpha}, beta={self.beta}, w={self.w})" + ) + + +class GaussianLorentzianSum(Peak): + """Combined Gaussian and Lorentzian peak using the existing Gaussian and Lorentzian classes.""" + + def __init__( + self, position: float, width: float, area: float, fraction_gauss: float = 0.5 + ) -> None: + """ + Combined Gaussian and Lorentzian sum peak. + + Parameters: + - position (float): Position of the peak. + - width (float): Width of the peak. + - area (float): Area of the peak (instead of intensity). + - fraction_gauss (float): Fraction of the Gaussian contribution (between 0 and 1). + """ + super().__init__(position, width, area) + + if fraction_gauss > 1.0: + fraction_gauss /= 100 + self.fraction_gauss = fraction_gauss + + def calc_lineshape(self, x: float) -> Optional[float]: + """ + Calculate the combined lineshape of the Gaussian and Lorentzian at x. + + Parameters: + - x (float): Energy value on the energy axis. + + Returns: + - float: Combined lineshape value at x, or None if width is zero. 
+ """ + if self.width != 0: + # Calculate intensity from area for Gaussian and Lorentzian + intensity = self.area / (np.pi * self.width) + + # Gaussian part (1 - fraction_gauss) contribution + gauss_part = (1 - self.fraction_gauss) * Gaussian( + self.position, self.width, intensity + ).calc_lineshape(x) + # Lorentzian part (fraction_gauss) contribution + lorentz_part = self.fraction_gauss * Lorentzian( + self.position, self.width, intensity + ).calc_lineshape(x) + return gauss_part + lorentz_part + return None + + def formula(self) -> str: + """ + Returns a detailed string representation of the combined Gaussian-Lorentzian formula. + + Returns: + - str: The formula used for the combined lineshape. + """ + # Using the formula for both Gaussian and Lorentzian + gauss_formula = ( + "(area / (pi * width)) * exp[-(4 * ln(2) * (x - position)^2) / width^2]" + ) + lorentz_formula = "area / (pi * width) / (1 + (4 * (x - position)^2) / width^2)" + combined_formula = ( + f"SGL(x; position, width, area, fraction_gauss): fraction_gauss * G(x) + (1 - fraction_gauss) * L(x) = fraction_gauss * ({gauss_formula}) + " + f"(1 - fraction_gauss) * ({lorentz_formula})" + ) + return combined_formula + + def __repr__(self) -> str: + """ + Returns a string representation of the GaussianLorentzianSum object, including details + for position, width, area, and fraction_gauss. + + Returns: + - str: The string representation of the object. + """ + return ( + f"GaussianLorentzianSum(position={self.position}, width={self.width}, " + f"area={self.area}, fraction_gauss={self.fraction_gauss})" + ) + + +class GaussianLorentzianProduct(Peak): + def __init__( + self, position: float, width: float, area: float, fraction_gauss: float = 0.5 + ) -> None: + """ + Combined Gaussian and Lorentzian product peak. + + Parameters: + - position (float): Position of the peak. + - width (float): Width of the peak. + - area (float): Area of the peak (instead of intensity). 
+        - fraction_gauss (float): Fraction of the Gaussian contribution.
+        """
+        super().__init__(position, width, area)
+
+        if fraction_gauss > 1.0:
+            fraction_gauss /= 100
+        self.fraction_gauss = fraction_gauss
+
+    def calc_lineshape(self, x: float) -> Optional[float]:
+        """
+        Calculate the combined lineshape of the Gaussian and Lorentzian product at x.
+
+        Parameters:
+        - x (float): Energy value on the energy axis.
+
+        Returns:
+        - float: Combined lineshape value at x, or None if width is zero.
+        """
+        if self.width != 0:
+            # Calculate intensity from area for Gaussian and Lorentzian
+            intensity = self.area / (np.pi * self.width)
+
+            # Gaussian part (fraction_gauss) contribution
+            gauss_part = self.fraction_gauss * Gaussian(
+                self.position, self.width, intensity
+            ).calc_lineshape(x)
+            # Lorentzian part (1 - fraction_gauss) contribution
+            lorentz_part = (1 - self.fraction_gauss) * Lorentzian(
+                self.position, self.width, intensity
+            ).calc_lineshape(x)
+            return gauss_part * lorentz_part
+        return None
+
+    def formula(self) -> str:
+        """
+        Returns a detailed string representation of the combined Gaussian-Lorentzian formula.
+
+        Returns:
+        - str: The formula used for the combined lineshape.
+        """
+        # Using the formula for both Gaussian and Lorentzian
+        gauss_formula = (
+            "(area / (pi * width)) * exp[-(4 * ln(2) * (x - position)^2) / width^2]"
+        )
+        lorentz_formula = "area / (pi * width) / (1 + (4 * (x - position)^2) / width^2)"
+        combined_formula = (
+            f"GL(x; position, width, area, fraction_gauss): fraction_gauss * G(x) * (1 - fraction_gauss) * L(x) = fraction_gauss * ({gauss_formula}) * "
+            f"(1 - fraction_gauss) * ({lorentz_formula})"
+        )
+        return combined_formula
+
+    def __repr__(self) -> str:
+        """
+        Returns a string representation of the GaussianLorentzianProduct object, including details
+        for position, width, area, and fraction_gauss.
+
+        Returns:
+        - str: The string representation of the object.
+ """ + return ( + f"GaussianLorentzianProduct(position={self.position}, width={self.width}, " + f"area={self.area}, fraction_gauss={self.fraction_gauss})" + ) + + +class DoniachSunjic(Peak): + """Doniach-Sunjic profile for XPS peaks with asymmetry.""" + + def __init__(self, position: float, width: float, area: float, beta: float) -> None: + """ + Initialize the Doniach-Sunjic profile with the required parameters. + + Parameters: + position (float): The peak position (center). + width (float): Full width at half maximum (FWHM) of the peak. + area (float): Area under the peak (instead of intensity). + beta (float): Asymmetry parameter (1 for symmetric Lorentzian, <1 for left skew, >1 for right skew). + """ + # Initialize the parent Peak class with area + super().__init__(position, width, area) + self.beta = beta + + def calc_lineshape(self, x: np.ndarray) -> np.ndarray: + """ + Calculate the Doniach-Sunjic profile at each energy point. + + Parameters: + x (np.ndarray or float): The energy values at which to evaluate the profile. + + Returns: + np.ndarray or float: The intensity values at each energy point. + """ + # Calculate the intensity from area for normalization + intensity = self.area / (np.pi * self.width) + + # Calculate the Doniach-Sunjic profile + return intensity / ((1 + ((x - self.position) / self.width) ** 2) ** self.beta) + + def formula(self) -> str: + """ + Returns the formula used for the Doniach-Sunjic profile. + + Returns: + str: The formula for the Doniach-Sunjic profile. 
+ """ + return ( + "Doniach-Sunjic Profile Formula:\n" + "f(x; area, position, width, beta, fraction_gauss)) = area / (pi * width) * ((1 + ((x - position) / width)^2)^beta)\n" + "Where:\n" + " area: Area under the peak\n" + " position: Peak position (center)\n" + " width: FWHM (full width at half maximum)\n" + " beta: Asymmetry parameter (1 for symmetric Lorentzian)\n" + " x: Energy values\n" + ) + + def __repr__(self) -> str: + """ + Returns a string representation of the DoniachSunjic object, including details + for position, width, area, and beta. + + Returns: + - str: The string representation of the object. + """ + return ( + f"DoniachSunjic(position={self.position}, width={self.width}, " + f"area={self.area}, beta={self.beta})" + ) diff --git a/src/pynxtools_xps/reader.py b/src/pynxtools_xps/reader.py index 56d1ae71..6fe0dae0 100644 --- a/src/pynxtools_xps/reader.py +++ b/src/pynxtools_xps/reader.py @@ -42,7 +42,7 @@ from pynxtools_xps.specs.sle.sle_specs import SleMapperSpecs from pynxtools_xps.specs.xml.xml_specs import XmlMapperSpecs from pynxtools_xps.specs.xy.xy_specs import XyMapperSpecs -from pynxtools_xps.vms.txt_vamas_export import TxtMapperVamasExport +from pynxtools_xps.vms.vamas_export import TxtMapperVamasExport, CsvMapperVamasResult from pynxtools_xps.vms.vamas import VamasMapper logger = logging.getLogger(__name__) @@ -109,6 +109,29 @@ def concatenate_values(value1, value2): return concatenated +def _check_multiple_extensions(file_paths: Tuple[str] = None) -> bool: + """ + Determines if a list of file paths contains more than one unique file extension. + + This method accepts a list of file paths (as strings or `Path` objects) and checks + if there are multiple unique file extensions present in the list. A file extension + is identified as the substring after the last period (`.`) in the file name. + + Parameters: + file_paths (Tuple[str]): A tuple of file paths, which can be strings or + `Path` objects. Defaults to None. 
+ + Returns: + bool: True if more than one unique file extension is found, False otherwise. + + Raises: + TypeError: If `file_paths` is not a tuple of strings or `Path` objects. + """ + extensions = {str(path).split(".")[-1] for path in file_paths if "." in str(path)} + + return len(extensions) > 1 + + # pylint: disable=too-few-public-methods class XPSReader(MultiFormatReader): """Reader for XPS.""" @@ -130,13 +153,17 @@ class XPSReader(MultiFormatReader): ".spe", ".sle", ".slh", - ".txt", ".vms", ".xml", ".xy", + ".txt", # This is last because of the processing_order ] + + __prmt_metadata_file_ext__ = {".csv": ".txt"} + __vendors__ = ["kratos", "phi", "scienta", "specs", "unkwown"] __prmt_vndr_cls: Dict[str, Dict] = { + ".csv": {"unknown": CsvMapperVamasResult}, ".ibw": {"scienta": MapperScienta}, ".npl": {"unkwown": VamasMapper}, ".pro": {"phi": MapperPhi}, @@ -153,7 +180,7 @@ class XPSReader(MultiFormatReader): __file_err_msg__ = ( "Need an XPS data file with one of the following extensions: " - f"{__prmt_file_ext__}" + f"data files: {__prmt_file_ext__}, metadata files: {__prmt_metadata_file_ext__}." 
) __vndr_err_msg__ = ( @@ -173,7 +200,15 @@ def __init__(self, *args, **kwargs): ".json": self.set_config_file, } - for ext in XPSReader.__prmt_file_ext__: + self.processing_order = ( + XPSReader.__prmt_file_ext__ + + list(XPSReader.__prmt_metadata_file_ext__.keys()) + + list(self.extensions.keys()) + ) + + for ext in XPSReader.__prmt_file_ext__ + list( + XPSReader.__prmt_metadata_file_ext__.keys() + ): self.extensions[ext] = self.handle_data_file def set_config_file(self, file_path: str) -> Dict[str, Any]: @@ -266,7 +301,7 @@ def _check_for_vendors(file_path: str) -> str: return list(vendor_dict.keys())[0] if file_ext == ".txt": return _check_for_vendors_txt(file_path) - return None + raise ValueError(XPSReader.__vndr_err_msg__) def _check_for_vendors_txt(file_path: str) -> str: """ @@ -294,7 +329,7 @@ def _check_for_vendors_txt(file_path: str) -> str: if any(vendor_opt in contents for vendor_opt in vendor_options): return vendor if contents[:6] == "[Info]": - # This is for picking the Scienta reader is "scienta" + # This is for picking the Scienta reader if "scienta" # is not in the file return vendor return "unknown" @@ -303,23 +338,29 @@ def _check_for_vendors_txt(file_path: str) -> str: if file_ext in XPSReader.__prmt_file_ext__: vendor = _check_for_vendors(file_path) - try: - parser = XPSReader.__prmt_vndr_cls[file_ext][vendor]() - - parser.parse_file(file_path, **self.kwargs) - self.config_file = XPSReader.reader_dir.joinpath( - "config", parser.config_file - ) - data_dict = parser.data_dict - - except ValueError as val_err: - raise ValueError(XPSReader.__vndr_err_msg__) from val_err - except KeyError as key_err: - raise KeyError(XPSReader.__vndr_err_msg__) from key_err - else: - raise ValueError(XPSReader.__file_err_msg__) - self.xps_data_dicts += [data_dict] + parser = XPSReader.__prmt_vndr_cls[file_ext][vendor]() + parser.parse_file(file_path, **self.kwargs) + data_dict = parser.data_dict + + self.config_file = XPSReader.reader_dir.joinpath( + "config", 
parser.config_file + ) + self.xps_data_dicts += [data_dict] + + elif file_ext in XPSReader.__prmt_metadata_file_ext__: + vendor = _check_for_vendors(file_path) + + metadata_parser = XPSReader.__prmt_vndr_cls[file_ext][vendor]() + metadata_parser.parse_file(file_path, **self.kwargs) + + main_file_ext = XPSReader.__prmt_metadata_file_ext__[file_ext] + + main_file_dicts = [ + d for d in self.xps_data_dicts if d.get("file_ext") == main_file_ext + ] + + metadata_parser.update_main_file_dict(main_file_dicts) return {} @@ -422,23 +463,28 @@ def check_for_same_entries( common_entries, dict_indices = check_for_same_entries(self.xps_data_dicts) if common_entries: - for entry, indices in zip(common_entries, dict_indices): - dicts_with_common_entries = [self.xps_data_dicts[i] for i in indices] - - for i, data_dict in enumerate(dicts_with_common_entries): - for key, value in data_dict.copy().items(): - new_key = key.replace(f"/ENTRY[{entry}]", f"/ENTRY[{entry}{i}]") - if key == "data": - for entry_name, xarr in value.copy().items(): - if entry_name == entry: - new_entry_name = entry_name.replace( - f"{entry}", f"{entry}{i}" - ) - value[new_entry_name] = xarr - del value[entry_name] - if new_key != key: - data_dict[new_key] = value - del data_dict[key] + if not self.overwrite_keys: + for entry, indices in zip(common_entries, dict_indices): + dicts_with_common_entries = [ + self.xps_data_dicts[i] for i in indices + ] + + for i, data_dict in enumerate(dicts_with_common_entries): + for key, value in data_dict.copy().items(): + new_key = key.replace( + f"/ENTRY[{entry}]", f"/ENTRY[{entry}{i}]" + ) + if key == "data": + for entry_name, xarr in value.copy().items(): + if entry_name == entry: + new_entry_name = entry_name.replace( + f"{entry}", f"{entry}{i}" + ) + value[new_entry_name] = xarr + del value[entry_name] + if new_key != key: + data_dict[new_key] = value + del data_dict[key] for data_dict in self.xps_data_dicts: # If there are multiple input data files of the same type, @@ 
-449,8 +495,9 @@ def check_for_same_entries( ] self.xps_data = {**self.xps_data, **data_dict} - for key, value1, value2 in existing: - self.xps_data[key] = concatenate_values(value1, value2) + if not self.overwrite_keys: + for key, value1, value2 in existing: + self.xps_data[key] = concatenate_values(value1, value2) def _get_analyser_names(self) -> List[str]: """ @@ -627,11 +674,28 @@ def get_signals(key: str) -> List[str]: return list(map(str, data_vars)) - if path.startswith("@data:*"): - return get_signals(key=path.split(":*.")[-1]) + def get_processes(process_key: str) -> List[str]: + # pattern = re.compile(rf"/ENTRY\[{self.callbacks.entry_name}]/({process_key}\d+)/") + pattern = re.compile( + rf"/ENTRY\[{self.callbacks.entry_name}]\.*/{process_key}([a-zA-Z0-9_]+)" + ) + + process_names = { + match for key in self.xps_data for match in pattern.findall(key) + } + + return sorted(process_names) + + patterns: Dict[str, Any] = { + r"data/DATA": lambda: get_signals(path.split(":*.")[-1]), + r"DETECTOR\[[a-zA-Z0-9_]+\]/raw_data": lambda: get_signals("channels"), + "peak": lambda: get_processes("component"), + "background": lambda: get_processes("region"), + } - if any(x in path for x in ["counts", "raw/@units"]): - return get_signals(key="channels") + for pattern, func in patterns.items(): + if re.search(pattern, key): + return func() return get_signals(key="scans") @@ -711,6 +775,8 @@ def read( objects: Tuple[Any] = None, **kwargs, ) -> dict: + self.overwrite_keys = _check_multiple_extensions(file_paths) + template = super().read(template, file_paths, objects, suppress_warning=True) self.set_root_default(template) diff --git a/src/pynxtools_xps/reader_utils.py b/src/pynxtools_xps/reader_utils.py index 79f394c6..0311aa1f 100644 --- a/src/pynxtools_xps/reader_utils.py +++ b/src/pynxtools_xps/reader_utils.py @@ -18,6 +18,7 @@ Helper functions for populating NXmpes template """ +import os import logging import re from abc import ABC, abstractmethod @@ -62,7 +63,6 @@ def 
__init__(self): self.file: Union[str, Path] = "" self.raw_data: List[str] = [] self._xps_dict: Dict[str, Any] = {} - self._root_path = "" self.parser = None @@ -93,8 +93,8 @@ def parse_file(self, file, **kwargs): self.parser = self._select_parser() self.raw_data = self.parser.parse_file(file, **kwargs) - file_key = f"{self._root_path}/File" - self._xps_dict[file_key] = file + self._xps_dict["File"] = file + self._xps_dict["file_ext"] = os.path.splitext(file)[1] self.construct_data() @@ -437,6 +437,7 @@ def align_name_part(name_part: str): "-": "_", ":": "_", "+": "_", + "/": "_", } ) diff --git a/src/pynxtools_xps/vms/casa_data_model.py b/src/pynxtools_xps/vms/casa_data_model.py index c976aeab..d260428f 100644 --- a/src/pynxtools_xps/vms/casa_data_model.py +++ b/src/pynxtools_xps/vms/casa_data_model.py @@ -39,10 +39,13 @@ from pynxtools_xps.models.backgrounds import ( LinearBackground, Shirley, + StepUp, + StepDown, TougaardU3, TougaardU4, ) + logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -114,21 +117,23 @@ def process_comments(self, comment_list: List[str]): def flatten_metadata(self): # Write process data - process_key_map: Dict[str, List[str]] = { - "process": ["energy_calibrations", "intensity_calibrations", "smoothings"], - "peak_fitting": ["regions", "components"], - } + process_keys: List[str] = [ + "energy_calibrations", + "intensity_calibrations", + "smoothings", + "regions", + "components", + ] flattened_dict: Dict[str, Any] = {} - for grouping, process_key_list in process_key_map.items(): - for spectrum_key in process_key_list: - processes = self.casa_data[spectrum_key] - for i, process in enumerate(processes): - process_key = f"{spectrum_key}/{spectrum_key.rstrip('s')}{i}" - for key, value in process.dict().items(): - key = key.replace("_units", "/@units") - flattened_dict[f"{process_key}/{key}"] = value + for process_key in process_keys: + processes = self.casa_data[process_key] + for i, process in enumerate(processes): + 
spectrum_key = f"{process_key.rstrip('s')}{i}" + for key, value in process.dict().items(): + key = key.replace("_units", "/@units") + flattened_dict[f"{spectrum_key}/{key}"] = value return flattened_dict @@ -329,9 +334,19 @@ class CasaEnergyCalibration(XpsDataclass): measured_energies_units: str = "eV" aligned_energy: float = 0.0 aligned_energy_units: str = "eV" - operation: str = "eV" + operation: str = "ADD" range_calibration: bool = False + def apply_energy_shift(self, x: float): + if self.operation == "addition" and self.energy_type == "binding": + return x - self.energy_offset + + elif self.operation == "addition" and self.energy_type == "kinetic": + return x + self.energy_offset + + if self.range_calibration: + pass # ToDo: apply range calibration + @dataclass class CasaIntensityCalibration(XpsDataclass): @@ -356,31 +371,57 @@ class CasaRegion(XpsDataclass): av_width: float = 0.0 av_width_units: str = "eV" start_offset: float = 0.0 - start_offset_units: str = "counts_per_second" + start_offset_units: str = "" end_offset: float = 0.0 - end_offset_units: str = "counts_per_second" + end_offset_units: str = "" cross_section: list = field(default_factory=list) tag: str = "" unknown_0: float = 0.0 unknown_1: float = 0.0 rsf_effective: float = 0.0 - def calculate_background(self, x: np.array, y: np.array): + def calculate_background(self, x: np.ndarray, y: np.ndarray): backgrounds: Dict[str, Any] = { + "Linear": LinearBackground, "Shirley": Shirley, - # "Step Down": StepDown, + "Step Up": StepUp, + "Step Down": StepDown, "U 3 Tougaard": TougaardU3, "U 4 Tougaard": TougaardU4, } - leading_letters, background_parameters = split_after_letters(self.bg_type) + leading_letters, _ = split_after_letters(self.bg_type) + background_params = [float(param) for param in self.cross_section] try: background_class = backgrounds[leading_letters] - background = background_class(*background_parameters) + try: + background = background_class(*background_params) + except TypeError: + 
background = background_class() + + if self.end > self.start: + min_x = self.start + max_x = self.end + else: + min_x = self.end + max_x = self.start + + region = np.argwhere((x >= min_x) & (x <= max_x)) + fit_region = slice(region[0, 0], region[-1, 0], 1) + + self.start_offset = 100 + + y_start_offset = y[0] * (self.start_offset / 100.0) + y_end_offset = y[-1] * (self.end_offset / 100.0) + y[0] -= y_start_offset + y[-1] -= y_end_offset + + x, y = x[fit_region], y[fit_region] + + self.data = background.calc_background(x, y) - self.lineshape = background.calc_background(x, y) self.formula = background.formula() self.description = str(background) @@ -428,7 +469,9 @@ class CasaComponent(XpsDataclass): tag: str = "" const: str = "" # CONST - def calculate_lineshape(self, x: np.array): + atomic_concentration: float = 0.0 + + def calculate_lineshape(self, x: np.ndarray): lineshapes: Dict[str, Any] = { "GL": GaussianLorentzianProduct, "SGL": GaussianLorentzianSum, @@ -446,7 +489,7 @@ def calculate_lineshape(self, x: np.array): peak = peak_class(*peak_parameters) - self.lineshape = peak.calc_lineshape(x) + self.data = peak.calc_lineshape(x) self.formula = peak.formula() self.description = str(peak) diff --git a/src/pynxtools_xps/vms/txt_vamas_export.py b/src/pynxtools_xps/vms/txt_vamas_export.py deleted file mode 100644 index a7203f18..00000000 --- a/src/pynxtools_xps/vms/txt_vamas_export.py +++ /dev/null @@ -1,594 +0,0 @@ -# Copyright The NOMAD Authors. -# -# This file is part of NOMAD. See https://nomad-lab.eu for further info. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# -# pylint: disable=too-many-lines,too-few-public-methods -""" -Classes for reading XPS files from TXT export of CasaXPS. -""" - -import itertools -import operator -import copy -import warnings -from typing import Any, Dict, List -from abc import ABC, abstractmethod -import xarray as xr -import numpy as np - -from pynxtools_xps.reader_utils import ( - XPSMapper, - check_uniform_step_width, - get_minimal_step, - interpolate_arrays, - construct_entry_name, - construct_data_key, -) -from pynxtools_xps.value_mappers import get_units_for_key, convert_units - -UNITS: dict = { - "data/step_size": "eV", -} - - -class TxtMapperVamasExport(XPSMapper): - """ - Class for restructuring .txt data file from - Casa TXT export (from Vamas) into python dictionary. - """ - - config_file = "config_txt_vamas_export.json" - - def __init__(self): - self.parser_map = { - "rows_of_tables": TextParserRows, - "columns_of_tables": TextParserColumns, - } - super().__init__() - - def _get_file_type(self, file): - """ - Check which export option was used in CasaXPS. - - Parameters - ---------- - file : str - XPS data filepath. - - Returns - ------- - str - Either columns_of_tables or rows_of_tables. - - """ - with open(file, encoding="utf-8") as txt_file: - first_line = txt_file.readline() - if first_line.startswith("Cycle"): - return "columns_of_tables" - return "rows_of_tables" - - def _select_parser(self): - """ - Select parser based on the structure of the text file - - Returns - ------- - TextParser - TextParser for CasaXPS export from Vamas files. 
- - """ - return self.parser_map[self._get_file_type(self.file)]() - - def construct_data(self): - """Map TXT format to NXmpes-ready dict.""" - # pylint: disable=duplicate-code - spectra = copy.deepcopy(self.raw_data) - - self._xps_dict["data"]: Dict[str, Any] = {} - - template_key_map = { - "file_info": [], - "user": [], - "instrument": [], - "source_xray": [], - "beam_xray": [ - "excitation_energy", - "excitation_energy/@units", - ], - "analyser": [], - "collectioncolumn": [], - "energydispersion": [], - "detector": [ - "dwell_time", - "dwell_time/@units", - ], - "manipulator": [], - "calibration": [], - "sample": [], - "data": [ - "energy_type", - "y_units", - "start_energy", - "stop_energy", - "step_size", - ], - } - - for spectrum in spectra: - self._update_xps_dict_with_spectrum(spectrum, template_key_map) - - def _update_xps_dict_with_spectrum( - self, spectrum: Dict[str, Any], template_key_map: Dict[str, str] - ): - """ - Map one spectrum from raw data to NXmpes-ready dict. - - """ - # pylint: disable=too-many-locals,duplicate-code - entry_parts = [] - for part in ["group_name", "spectrum_type"]: - val = spectrum.get(part, None) - if val: - entry_parts += [val] - - entry = construct_entry_name(entry_parts) - entry_parent = f"/ENTRY[{entry}]" - - file_parent = f"{entry_parent}/file_info" - instrument_parent = f"{entry_parent}/instrument" - analyser_parent = f"{instrument_parent}/analyser" - - path_map = { - "file_info": f"{file_parent}", - "user": f"{entry_parent}/user", - "instrument": f"{instrument_parent}", - "source_xray": f"{instrument_parent}/source_xray", - "beam_xray": f"{instrument_parent}/beam_xray", - "analyser": f"{analyser_parent}", - "collectioncolumn": f"{analyser_parent}/collectioncolumn", - "energydispersion": f"{analyser_parent}/energydispersion", - "detector": f"{analyser_parent}/detector", - "manipulator": f"{instrument_parent}/manipulator", - "calibration": f"{instrument_parent}/calibration", - "sample": f"{entry_parent}/sample", - 
"data": f"{entry_parent}/data", - } - - for grouping, spectrum_keys in template_key_map.items(): - root = path_map[str(grouping)] - for spectrum_key in spectrum_keys: - try: - mpes_key = spectrum_key - self._xps_dict[f"{root}/{mpes_key}"] = spectrum[spectrum_key] - except KeyError: - pass - - unit_key = f"{grouping}/{spectrum_key}" - units = get_units_for_key(unit_key, UNITS) - if units is not None: - self._xps_dict[f"{root}/{mpes_key}/@units"] = units - - # Create key for writing to data. - scan_key = construct_data_key(spectrum) - energy = np.array(spectrum["data"]["binding_energy"]) - intensity = np.array(spectrum["data"]["intensity"]) - - # If multiple spectra exist to entry, only create a new - # xr.Dataset if the entry occurs for the first time. - if entry not in self._xps_dict["data"]: - self._xps_dict["data"][entry] = xr.Dataset() - - # Write averaged cycle data to 'data'. - all_scan_data = [ - value - for key, value in self._xps_dict["data"][entry].items() - if scan_key.split("_")[0] in key - ] - with warnings.catch_warnings(): - warnings.simplefilter("ignore", category=RuntimeWarning) - averaged_scans = np.mean(all_scan_data, axis=0) - - if averaged_scans.size == 1: - # on first scan in cycle - averaged_scans = intensity - - self._xps_dict["data"][entry][scan_key.split("_")[0]] = xr.DataArray( - data=averaged_scans, - coords={"energy": energy}, - ) - - self._xps_dict["data"][entry][scan_key] = xr.DataArray( - data=intensity, coords={"energy": energy} - ) - - -class TextParser(ABC): # pylint: disable=too-few-public-methods - """ - Parser for ASCI files exported from CasaXPS. - """ - - def __init__(self): - self.lines: List[str] = [] - self.n_headerlines: int = 7 - self.uniform_energy_steps: bool = True - - def parse_file(self, file, uniform_energy_steps=True, **kwargs): - """ - Parse the file into a list of dictionaries. - - Parsed data stored in the attribute 'self.data'. - - Parameters - ---------- - file : str - XPS data filepath. 
- uniform_energy_steps : bool, optional - If true, the spectra are interpolate to have uniform - energy steps. The default is True. - **kwargs : dict - n_headerlines: number of header_lines in each data block. - - Returns - ------- - dict - DESCRIPTION. - - """ - if "n_headerlines" in kwargs: - self.n_headerlines = kwargs["n_headerlines"] - - self.uniform_energy_steps = uniform_energy_steps - - self._read_lines(file) - blocks = self._parse_blocks() - return self._build_list_of_dicts(blocks) - - def _read_lines(self, file): - """ - Read in all lines from the file as a list of strings. - - Parameters - ---------- - file : str - XPS data filepath. - - Returns - ------- - None. - - """ - with open(file, encoding="utf-8") as txt_file: - for line in txt_file: - self.lines += [line] - - @abstractmethod - def _parse_blocks(self): - """ - Extract spectrum blocks from full data string. - - This method has to be implemented in the inherited parsers. - - Returns - ------- - blocks : list - List of strings, with each string containing one spectrum's - data and metadata. - - """ - blocks = [] - - return blocks - - @abstractmethod - def _build_list_of_dicts(self, blocks): - """ - Build list of dictionaries, with each dict containing data - and metadata of one spectrum (block). - - This method has to be implemented in the inherited parsers. - - Parameters - ---------- - blocks : list - List of data blocks containing one spectrum each. - - Returns - ------- - spectra : list - List of dicts with spectrum data and metadata. - - """ - spectra = [] - - return spectra - - def _separate_header_and_data(self, block): - """ - Separate header (with metadata) from data for one measurement - block - - Returns - ------- - None. - - """ - header = block[: self.n_headerlines] - data = block[self.n_headerlines :] - - return header, data - - -class TextParserRows(TextParser): - """ - Parser for ASCI files exported from CasaXPS using the - 'Rows of Tables' option. 
- """ - - def __init__(self): - super().__init__() - self.n_headerlines = 7 - - def _parse_blocks(self): - """ - With the 'Rows of Tables' option, there is only one block - with common metadata. - - """ - return self.lines - - def _build_list_of_dicts(self, blocks): - """ - Build list of dictionaries, with each dict containing data - and metadata of one spectrum. - - Parameters - ---------- - blocks : list - List of data blocks containing one spectrum each. - - Returns - ------- - spectra : list - List of dicts with spectrum data and metadata. - - """ - spectra = [] - - header, data_lines = self._separate_header_and_data(blocks) - settings = self._parse_header(header) - data = self._parse_data(data_lines) - for spec_settings, spec_data in zip(settings, data): - spectra += [{**spec_settings, **spec_data}] - - return spectra - - def _parse_header(self, header): - """ - Parse header into metadata dictionary. - - Parameters - ---------- - header : str - Header data for one spectrum as a String. - - Returns - ------- - settings : list - List of dicts with measurement settings for - one spectrum each. - - """ - settings = [] - for spec_header in header[-1].split("\t")[1::3]: - try: - group_name = spec_header.split(":")[1] - region = spec_header.split(":")[2] - y_units = convert_units(spec_header.split(":")[-1]) - except IndexError: - group_name = spec_header if spec_header.strip() else "group" - region = spec_header if spec_header.strip() else "region" - y_units = "counts_per_second" - - spectrum_settings = { - "group_name": group_name, - "spectrum_type": region, - "energy_type": "binding", - "y_units": y_units, - } - settings += [spectrum_settings] - - return settings - - def _parse_data(self, data_lines): - """ - Extract energy and intensity data. - - Parameters - ---------- - data_lines : list - List of lines with measurement data. - - Returns - ------- - list - List of dicts containing the binding energy - and the intensity axes of one spectrum each. 
- - """ - data_lines = [x.split("\t") for x in data_lines] - for line in data_lines: - del line[2::3] - del line[-1] - - lines = [[] for _ in range(max(len(line) for line in data_lines))] - - for line in data_lines: - for i, data_point in enumerate(line): - try: - lines[i].append(float(data_point.strip("\n"))) - except ValueError: - pass - - data = [] - - for x_bin, intensity in zip(lines[::2], lines[1::2]): - x_bin, intensity = np.array(x_bin), np.array(intensity) - - if self.uniform_energy_steps and not check_uniform_step_width(x_bin): - x_bin, intensity = interpolate_arrays(x_bin, intensity) - - spectrum = { - "data": { - "binding_energy": np.array(x_bin), - "intensity": np.array(intensity).squeeze(), - }, - "start_energy": x_bin[0], - "stop_energy": x_bin[-1], - "energy_type": "binding", - "y_units": "counts_per_second", - } - - if check_uniform_step_width(x_bin): - spectrum["step_size"] = get_minimal_step(x_bin) - - data += [spectrum] - - return data - - -class TextParserColumns(TextParser): - """ - Parser for ASCI files exported from CasaXPS using the - 'Columns of Tables' option. - """ - - def __init__(self): - super().__init__() - self.n_headerlines = 8 - - def _parse_blocks(self): - """ - Extract spectrum blocks from full data string. - - Returns - ------- - blocks : list - List of strings, with each string containing one spectrum's - data and metadata. - - """ - blocks = [ - list(g) for _, g in itertools.groupby(self.lines, lambda i: "Cycle " in i) - ] - blocks = [operator.add(*blocks[i : i + 2]) for i in range(0, len(blocks), 2)] - - return blocks - - def _parse_block_header(self, header): - """ - Parse spectrum block header into metadata dictionary. - - Parameters - ---------- - header : str - Header data for one spectrum as a String. - - Returns - ------- - settings : dictf - Dict of measurement settings for one spectrum. 
- - """ - group_name = header[0].split(":")[1] - region = header[0].split(":")[2].split("\n")[0] - settings = { - "group_name": group_name, - "spectrum_type": region, - "excitation_energy": float(header[1].split("\t")[2]), - "excitation_energy/@units": header[1].split("\t")[1].split(" ")[-1], - "dwell_time": float(header[1].split("\t")[4].strip()), - "dwell_time/@units": header[1].split("\t")[3].split(" ")[-1], - } - - return settings - - def _parse_block_data(self, block_data): - """ - Extract energy and intensity data from one data block. - - Parameters - ---------- - block_data : list - List of lines with measurement data. - - Returns - ------- - dict - Dict containing the kinetic/binding energy - and the intensity axes. - - """ - lines = np.array([[float(i) for i in d.split()] for d in block_data]) - - x_kin = lines[:, 0] - x_bin = lines[:, 2] - intensity = lines[:, 1] - - if self.uniform_energy_steps and not check_uniform_step_width(x_kin): - x_kin, (x_bin, intensity) = interpolate_arrays(x_kin, [x_bin, intensity]) - - return { - "kinetic_energy": np.array(x_kin), - "binding_energy": np.array(x_bin), - "intensity": np.array(intensity).squeeze(), - } - - def _build_list_of_dicts(self, blocks): - """ - Build list of dictionaries, with each dict containing data - and metadata of one spectrum (block). - - - Parameters - ---------- - blocks : list - List of data blocks containing one spectrum each. - - Returns - ------- - spectra : list - List of dicts with spectrum data and metadata. 
- - """ - spectra = [] - for block in blocks: - header, block_data_lines = self._separate_header_and_data(block) - block_settings = self._parse_block_header(header) - block_data = {"data": self._parse_block_data(block_data_lines)} - kinetic_energy = block_data["data"]["kinetic_energy"] - block_settings.update( - { - "start_energy": kinetic_energy[0], - "stop_energy": kinetic_energy[-1], - "energy_type": "binding", - "y_units": "counts_per_second", - } - ) - if check_uniform_step_width(kinetic_energy): - block_settings["step_size"] = get_minimal_step(kinetic_energy) - - spectra += [{**block_settings, **block_data}] - - return spectra diff --git a/src/pynxtools_xps/vms/vamas.py b/src/pynxtools_xps/vms/vamas.py index 000151e0..587137e6 100644 --- a/src/pynxtools_xps/vms/vamas.py +++ b/src/pynxtools_xps/vms/vamas.py @@ -134,6 +134,12 @@ class VamasMapper(XPSMapper): config_file = "config_vms.json" + def __init__(self): + self.multiple_spectra_groups: bool = True + self.same_spectrum_names: bool = False + + super().__init__() + def _select_parser(self): """ Select parser based on the structure of the Vamas file, @@ -149,11 +155,30 @@ def _select_parser(self): def construct_data(self): """Map VMS format to NXmpes-ready dict.""" - # pylint: disable=duplicate-code + + def has_duplicate_spectrum_type(spectra: list[dict]) -> bool: + """ + Check if any two or more spectra in the list have the same 'spectrum_type'. 
+ """ + seen = set() + return any( + spectrum.get("spectrum_type") in seen + or seen.add(spectrum["spectrum_type"]) + for spectrum in spectra + if spectrum.get("spectrum_type") + ) + spectra = deepcopy(self.raw_data) self._xps_dict["data"]: Dict[str, Any] = {} + if len({spectrum.get("group_name") for spectrum in spectra}) == 1: + self.multiple_spectra_groups = False + + if not self.multiple_spectra_groups: + if has_duplicate_spectrum_type(spectra): + self.same_spectrum_names = True + for spectrum in spectra: self._update_xps_dict_with_spectrum(spectrum) @@ -163,12 +188,17 @@ def _update_xps_dict_with_spectrum(self, spectrum: Dict[str, Any]): """ entry_parts = [] - for part in ["group_name", "spectrum_type"]: + + parts_to_use = ["group_name"] * bool(self.multiple_spectra_groups) + [ + "spectrum_type" + ] + + for part in parts_to_use: val = spectrum.get(part, None) if val: entry_parts += [val] - if len(entry_parts) == 1: + if len(entry_parts) == 1 and self.same_spectrum_names: entry_parts += [spectrum["time_stamp"]] entry = construct_entry_name(entry_parts) @@ -745,14 +775,22 @@ def build_list(self): if "casa" in comment_dict: casa_process = comment_dict["casa"] + for energy_calibration in casa_process.casa_data["energy_calibrations"]: + block.x = energy_calibration.apply_energy_shift(block.x) + for region in casa_process.casa_data["regions"]: region.calculate_background(block.x, block.y) + region.data_cps = region.data / block.dwell_time for component in casa_process.casa_data["components"]: component.calculate_lineshape(block.x) + component.data_cps = component.data / block.dwell_time flattened_casa_data = casa_process.flatten_metadata() + if casa_process.casa_data["components"]: + flattened_casa_data["fit_label"] = spectrum_type + comment_dict.update(flattened_casa_data) del comment_dict["casa"] diff --git a/src/pynxtools_xps/vms/vamas_data_model.py b/src/pynxtools_xps/vms/vamas_data_model.py index 035c3599..d444d9dd 100644 --- 
a/src/pynxtools_xps/vms/vamas_data_model.py +++ b/src/pynxtools_xps/vms/vamas_data_model.py @@ -143,6 +143,17 @@ class VamasBlock(XpsDataclass): num_ord_values: int = 0 future_upgrade_block_entries: list = field(default_factory=list) + def convert_to_binding_energy_scale(self): + """ + Convert from kinetic to binding energy. + + ToDo: check that components are also converted. + """ + self.abscissa_label = "binding energy" + + self.x = self.source_energy - self.x + self.abscissa_start = float(min(self.x)) + @dataclass class ExpVariable(XpsDataclass): diff --git a/src/pynxtools_xps/vms/vamas_export.py b/src/pynxtools_xps/vms/vamas_export.py new file mode 100644 index 00000000..6d55e2b5 --- /dev/null +++ b/src/pynxtools_xps/vms/vamas_export.py @@ -0,0 +1,836 @@ +# Copyright The NOMAD Authors. +# +# This file is part of NOMAD. See https://nomad-lab.eu for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pylint: disable=too-many-lines,too-few-public-methods +""" +Classes for reading XPS files from TXT export of CasaXPS. 
+""" + +import matplotlib.pyplot as plt + + +import re +import itertools +import operator +import copy +import warnings +from typing import Any, Dict, List +from collections import Counter +from abc import ABC, abstractmethod +import csv +import xarray as xr +import numpy as np + +from pynxtools_xps.reader_utils import ( + XPSMapper, + check_uniform_step_width, + get_minimal_step, + interpolate_arrays, + construct_entry_name, + construct_data_key, + convert_pascal_to_snake, + _format_value, +) +from pynxtools_xps.value_mappers import get_units_for_key, convert_units + +UNITS: Dict[str, str] = { + "step_size": "eV", +} + +KEY_MAP: Dict[str, str] = { + "K.E.": "kinetic_energy", + "B.E.": "binding_energy", + "Counts": "counts", + "CPS": "counts_per_second", + "Background": "background_intensity", + "Background CPS": "background_intensity_cps", + "Envelope": "fit_envelope", + "Envelope CPS": "fit_envelope_cps", + "%At Conc": "atomic_concentration", +} + + +def handle_repetitions(input_list: List[str]) -> List[str]: + """ + Process a list of strings to handle repeated items by appending a suffix + to each duplicate item. The suffix is in the format '_n', where 'n' is the + occurrence number of that item in the list. + + Parameters: + - input_list (List[str]): A list of strings where repeated items are + identified and renamed with a suffix. + + Returns: + - List[str]: A new list where repeated items are modified by appending + a suffix to make them unique. 
+ """ + counts = Counter(input_list) + result = [] + occurrences = {} + + for item in input_list: + if counts[item] > 1: + # If the item has been seen before, add a suffix + if item not in occurrences: + occurrences[item] = 0 + occurrences[item] += 1 + result.append(f"{item}_{occurrences[item]}") + else: + result.append(item) + + return result + + +def select_from_list(input_list: List[str], skip: int, keep_middle: int) -> List[str]: + """ + Select items from a list according to the specified pattern: + - Extract the first (2 + count + keep_middle) items, + - Skip the next 'count' number of items, + - Extract everything after the skipped items. + + Parameters: + - input_list (List[str]): The list of strings to process. + - skip (int): The number of items to skip after the initial selection. + - keep_middle (int): The number of items to keep in the middle. + + Returns: + - List[str]: The processed list after extracting and skipping items. + """ + first_part = input_list[: (2 + skip + keep_middle)] + skip_part = input_list[(2 + skip + keep_middle) : (2 + skip + keep_middle + skip)] + remaining_part = input_list[(2 + skip + keep_middle + skip) :] + + return first_part + remaining_part + + +def get_dict_keys(header_lines: List[str]) -> List[str]: + """ + Maps a list of header strings to their corresponding keys based on a predefined mapping. + + Args: + header_lines (List[str]): A list of header strings to be mapped. + + Returns: + List[str]: A list of keys, where each header is replaced by its mapped value + or left unchanged if no mapping is found. + """ + return [KEY_MAP.get(header, header) for header in header_lines if header] + + +class TxtMapperVamasExport(XPSMapper): + """ + Class for restructuring .txt data file from + Casa TXT export (from Vamas) into python dictionary. 
+ """ + + config_file = "config_vms.json" + + def __init__(self): + self.parser_map = { + "rows_of_tables": TextParserRows, + "columns_of_tables": TextParserColumns, + } + super().__init__() + + def _get_file_type(self, file): + """ + Check which export option was used in CasaXPS. + + Parameters + ---------- + file : str + XPS data filepath. + + Returns + ------- + str + Either columns_of_tables or rows_of_tables. + + """ + with open(file, encoding="utf-8") as txt_file: + first_line = txt_file.readline() + if first_line.startswith("Cycle"): + return "columns_of_tables" + return "rows_of_tables" + + def _select_parser(self): + """ + Select parser based on the structure of the text file + + Returns + ------- + TextParser + TextParser for CasaXPS export from Vamas files. + + """ + return self.parser_map[self._get_file_type(self.file)]() + + def construct_data(self): + """Map TXT format to NXmpes-ready dict.""" + spectra = copy.deepcopy(self.raw_data) + + self._xps_dict["data"]: Dict[str, Any] = {} + + for spectrum in spectra: + self._update_xps_dict_with_spectrum(spectrum) + + def _update_xps_dict_with_spectrum(self, spectrum: Dict[str, Any]): + """ + Map one spectrum from raw data to NXmpes-ready dict. + + """ + # pylint: disable=too-many-locals,duplicate-code + entry_parts = [] + for part in ["group_name", "spectrum_type"]: + val = spectrum.get(part, None) + if val: + entry_parts += [val] + + entry = construct_entry_name(entry_parts) + entry_parent = f"/ENTRY[{entry}]" + + entry_parent = f"/ENTRY[{entry}]" + + for key, value in spectrum.items(): + if key.startswith("entry"): + entry_parent = "/ENTRY[entry]" + key = key.replace("entry/", "", 1) + mpes_key = f"{entry_parent}/{key}" + self._xps_dict[mpes_key] = value + + units = get_units_for_key(key, UNITS) + if units is not None: + self._xps_dict[f"{mpes_key}/@units"] = units + + # Create key for writing to data. 
+ scan_key = construct_data_key(spectrum) + # energy = np.array(spectrum["binding_energy/data"]) + # intensity = np.array(spectrum["counts_per_second/data"]) + + energy = np.array(spectrum["kinetic_energy/data"]) + intensity = np.array(spectrum["counts/data"]) + + # If multiple spectra exist to entry, only create a new + # xr.Dataset if the entry occurs for the first time. + if entry not in self._xps_dict["data"]: + self._xps_dict["data"][entry] = xr.Dataset() + + # Write averaged cycle data to 'data'. + all_scan_data = [ + value + for key, value in self._xps_dict["data"][entry].items() + if scan_key.split("_")[0] in key + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore", category=RuntimeWarning) + averaged_scans = np.mean(all_scan_data, axis=0) + + if averaged_scans.size == 1: + # on first scan in cycle + averaged_scans = intensity + + self._xps_dict["data"][entry][scan_key.split("_")[0]] = xr.DataArray( + data=averaged_scans, + coords={"energy": energy}, + ) + + self._xps_dict["data"][entry][scan_key] = xr.DataArray( + data=intensity, coords={"energy": energy} + ) + + +class TextParser(ABC): # pylint: disable=too-few-public-methods + """ + Parser for ASCI files exported from CasaXPS. + """ + + def __init__(self): + self.lines: List[str] = [] + self.n_headerlines: int = 7 + self.uniform_energy_steps: bool = True + + def parse_file(self, file, uniform_energy_steps=True, **kwargs): + """ + Parse the file into a list of dictionaries. + + Parsed data stored in the attribute 'self.data'. + + Parameters + ---------- + file : str + XPS data filepath. + uniform_energy_steps : bool, optional + If true, the spectra are interpolate to have uniform + energy steps. The default is True. + **kwargs : dict + n_headerlines: number of header_lines in each data block. + + Returns + ------- + dict + DESCRIPTION. 
+ + """ + if "n_headerlines" in kwargs: + self.n_headerlines = kwargs["n_headerlines"] + + self.uniform_energy_steps = uniform_energy_steps + + self._read_lines(file) + blocks = self._parse_blocks() + return self._build_list_of_dicts(blocks) + + def _read_lines(self, file): + """ + Read in all lines from the file as a list of strings. + + Parameters + ---------- + file : str + XPS data filepath. + + Returns + ------- + None. + + """ + with open(file, encoding="utf-8") as txt_file: + for line in txt_file: + self.lines += [line] + + @abstractmethod + def _parse_blocks(self): + """ + Extract spectrum blocks from full data string. + + This method has to be implemented in the inherited parsers. + + Returns + ------- + blocks : list + List of strings, with each string containing one spectrum's + data and metadata. + + """ + blocks = [] + + return blocks + + @abstractmethod + def _build_list_of_dicts(self, blocks): + """ + Build list of dictionaries, with each dict containing data + and metadata of one spectrum (block). + + This method has to be implemented in the inherited parsers. + + Parameters + ---------- + blocks : list + List of data blocks containing one spectrum each. + + Returns + ------- + spectra : list + List of dicts with spectrum data and metadata. + + """ + spectra = [] + + return spectra + + def _separate_header_and_data(self, block): + """ + Separate header (with metadata) from data for one measurement + block + + Returns + ------- + None. + + """ + header = block[: self.n_headerlines] + data = block[self.n_headerlines :] + + return header, data + + +class TextParserRows(TextParser): + """ + Parser for ASCI files exported from CasaXPS using the + 'Rows of Tables' option. + """ + + def __init__(self): + super().__init__() + self.n_headerlines: int = 7 + + def _parse_blocks(self): + """ + With the 'Rows of Tables' option, there is only one block + with common metadata. 
+ + """ + return self.lines + + def _build_list_of_dicts(self, blocks): + """ + Build list of dictionaries, with each dict containing data + and metadata of one spectrum. + + Parameters + ---------- + blocks : list + List of data blocks containing one spectrum each. + + Returns + ------- + spectra : list + List of dicts with spectrum data and metadata. + + """ + spectra = [] + + header, data_lines = self._separate_header_and_data(blocks) + settings = self._parse_header(header) + data = self._parse_data(data_lines) + for spec_settings, spec_data in zip(settings, data): + spectra += [{**spec_settings, **spec_data}] + + return spectra + + def _parse_header(self, header): + """ + Parse header into metadata dictionary. + + Parameters + ---------- + header : str + Header data for one spectrum as a String. + + Returns + ------- + settings : list + List of dicts with measurement settings for + one spectrum each. + + """ + settings = [] + for spec_header in header[-1].split("\t")[1::3]: + try: + group_name = spec_header.split(":")[1] + region = spec_header.split(":")[2] + y_units = convert_units(spec_header.split(":")[-1]) + except IndexError: + group_name = spec_header if spec_header.strip() else "group" + region = spec_header if spec_header.strip() else "region" + y_units = "counts_per_second" + + spectrum_settings = { + "group_name": group_name, + "spectrum_type": region, + "energy_type": "binding", + "y_units": y_units, + } + settings += [spectrum_settings] + + return settings + + def _parse_data(self, data_lines): + """ + Extract energy and intensity data. + + Parameters + ---------- + data_lines : list + List of lines with measurement data. + + Returns + ------- + list + List of dicts containing the binding energy + and the intensity axes of one spectrum each. 
+ + """ + data_lines = [x.split("\t") for x in data_lines] + for line in data_lines: + del line[2::3] + del line[-1] + + lines = [[] for _ in range(max(len(line) for line in data_lines))] + + for line in data_lines: + for i, data_point in enumerate(line): + try: + lines[i].append(float(data_point.strip("\n"))) + except ValueError: + pass + + data = [] + + for x_bin, intensity in zip(lines[::2], lines[1::2]): + x_bin, intensity = np.array(x_bin), np.array(intensity) + + if self.uniform_energy_steps and not check_uniform_step_width(x_bin): + x_bin, intensity = interpolate_arrays(x_bin, intensity) + + spectrum = { + "data": { + "binding_energy": np.array(x_bin), + "intensity": np.array(intensity).squeeze(), + }, + "start_energy": x_bin[0], + "stop_energy": x_bin[-1], + "energy_type": "binding", + "y_units": "counts_per_second", + } + + if check_uniform_step_width(x_bin): + spectrum["step_size"] = get_minimal_step(x_bin) + + data += [spectrum] + + return data + + +class TextParserColumns(TextParser): + """ + Parser for ASCI files exported from CasaXPS using the + 'Columns of Tables' option. + """ + + def __init__(self): + super().__init__() + self.n_headerlines = 8 + + def _parse_blocks(self): + """ + Extract spectrum blocks from full data string. + + Returns + ------- + blocks : list + List of strings, with each string containing one spectrum's + data and metadata. + + """ + blocks = [ + list(g) for _, g in itertools.groupby(self.lines, lambda i: "Cycle " in i) + ] + blocks = [operator.add(*blocks[i : i + 2]) for i in range(0, len(blocks), 2)] + + return blocks + + def _parse_block_data(self, block_lines): + """ + Parses a block of spectral data into metadata and a DataFrame of measurements. + + Args: + block (list of str): The raw lines of the spectral data block. + + Returns: + dict: A dictionary with metadata and a DataFrame of measurements. 
+ """ + + metadata = {} + data = {} + fit_data = {} + + in_data_section = False + + for line in block_lines: + line = line.strip() + if line.startswith("Cycle"): + # Extract cycle and scan type + metadata["cycle"], metadata["source"], metadata["spectrum_type"] = map( + str.strip, line.split(":") + ) + + elif line.startswith("Characteristic Energy"): + # Parse characteristic energy and acquisition time + metadata_match = re.findall(r"([\w\s]+)\t([\deE\+\-.]+)", line) + matches = {key.strip(): float(value) for key, value in metadata_match} + for key, value in matches.items(): + key, unit = key.rsplit(" ", 1) + key = convert_pascal_to_snake(key) + metadata.update({key: value, f"{key}/@units": unit}) + + elif line.startswith("Name"): + comp_names = handle_repetitions(get_dict_keys(line.split("\t")[1:])) + for comp_index, comp_name in enumerate(comp_names): + data[f"component{comp_index}"] = { + "name": comp_name, + "data": [], + "data_cps": [], + } + n_components = len(comp_names) + + elif line.startswith(("Area", "Width", "Position", "data")): + line_split = line.split("\t") + fit_data[line_split[0]] = [_format_value(val) for val in line_split[1:]] + + elif "K.E." in line and "Counts" in line: + # Parse column headers + all_names = get_dict_keys(line.split("\t")) + + keep_middle = 2 + + for name in ["background_intensity", "fit_envelope"]: + if name in all_names: + keep_middle += 1 + + names = select_from_list( + all_names, skip=n_components, keep_middle=keep_middle + ) + + names = handle_repetitions(names) + + non_comp_names = [] + + for name in names: + if ( + not any( + subdict.get("name") == name for subdict in data.values() + ) + and "CPS" not in name + ): + data[name] = {"name": name, "data": [], "data_cps": []} + + non_comp_names += [name] + + # Circumvents the problem that there are two columns for + # each component, but two components can also have the + # same name. 
+ new_names = ( + non_comp_names[:2] + + comp_names + + non_comp_names[2 : 2 + keep_middle] + + comp_names + + non_comp_names[2 + keep_middle :] + ) + + in_data_section = True + + elif in_data_section: + values = [val for val in line.split("\t") if val] + + assert len(values) == len(new_names), f"{new_names}" + + lineshape_in = [] + for name, value in zip(new_names, values): + matching_key = name + for key, subdict in data.items(): + if subdict.get("name") == name: + matching_key = key + break + + if name not in lineshape_in: + data[matching_key]["data"].append(_format_value(value)) + lineshape_in += [name] + else: + data[matching_key]["data_cps"].append(_format_value(value)) + flattened = {} + for i, (supkey, subdict) in enumerate(data.items()): + for subkey, value in subdict.items(): + if supkey == value: + continue + if value and any(str(val).strip() for val in value): + if "data" in subkey: + value = np.array(value) + flattened[f"{supkey}/{subkey}"] = value + + for param in ("Area", "FWHM", "Position", "data"): + if param in fit_data: + if param in fit_data and i < len(fit_data[param]): + param_value = _format_value(fit_data[param][i]) + if param_value: + flattened[f"{supkey}/{param.lower()}"] = param_value + + if self.uniform_energy_steps: + uniform = False + + try: + x_arr = flattened["kinetic_energy/data"] + uniform = check_uniform_step_width(x_arr) + except KeyError: + x_arr = flattened["binding_energy/data"] + uniform = check_uniform_step_width(x_arr) + + if not uniform: + return {**metadata, **flattened} + else: + uniform_dict = {} + all_arrays = {} + + for key, value in flattened.copy().items(): + if isinstance(value, np.ndarray): + all_arrays[key] = value + else: + uniform_dict[key] = value + + x_arr, resampled_arrays = interpolate_arrays( + x_arr, list(all_arrays.values()) + ) + + for i, key in enumerate(all_arrays): + uniform_dict[key] = resampled_arrays[i] + + return {**metadata, **uniform_dict} + + return {**metadata, **flattened} + + def 
_build_list_of_dicts(self, blocks): + """ + Build list of dictionaries, with each dict containing data + and metadata of one spectrum (block). + + + Parameters + ---------- + blocks : list + List of data blocks containing one spectrum each. + + Returns + ------- + spectra : list + List of dicts with spectrum data and metadata. + + """ + spectra = [] + for block in blocks: + parsed_data = self._parse_block_data(block) + + if "binding_energy/data" not in parsed_data: + parsed_data["binding_energy/data"] = ( + parsed_data["characteristic_energy"] + - parsed_data["kinetic_energy/data"] + ) + + if "counts_per_second/data" not in parsed_data: + parsed_data["counts_per_second/data"] = ( + parsed_data["counts"] / parsed_data["acquisition_time"] + ) + + if check_uniform_step_width(parsed_data["kinetic_energy/data"]): + parsed_data["step_size"] = get_minimal_step( + parsed_data["kinetic_energy/data"] + ) + + parsed_data["energy_label"] = "binding" + + spectra += [parsed_data] + + plt.show() + + return spectra + + +class CsvMapperVamasResult(XPSMapper): + """ + Class for restructuring .csv result files from + Casa report export (from Vamas) into python dictionary. + """ + + config_file = "config_vms.json" + + def __init__(self): + super().__init__() + + def _select_parser(self): + """ + Select parser based on the structure of the text file + + Returns + ------- + TextParser + TextParser for CasaXPS export from Vamas files. + + """ + return CsvResultParser() + + def construct_data(self): + self._xps_dict = self.raw_data + + def update_main_file_dict(self, main_file_dicts: List[Dict[str, Any]]): + """ + Update the dictionaries returned by the main files with specific keys from self.data_dict. + + Args: + main_file_dicts (List[Dict[str, Any]]): List of dictionaries to update. 
+ """ + pattern = re.compile(r"(component\d+/)name") + update_with = { + "Area/(RSF*T*MFP)", + "atomic_concentration", + } # Use a set for faster lookups + + for existing_dict in main_file_dicts: + filtered_keys = { + key: match.group(1) + for key in existing_dict + if (match := pattern.search(key)) + } + + for key, prefix in filtered_keys.items(): + value = existing_dict[key] + if value in self.data_dict: + subdict = self.data_dict[value] + for subkey in update_with & subdict.keys(): + new_key = f"{key.rsplit('name', 1)[0]}{subkey}" + existing_dict[new_key] = subdict[subkey] + + +class CsvResultParser: + def parse_file(self, file: str, **kwargs): + """ + Parse only the first table from the input file, + + Args: + file_path (str): Path to the .vms file. + + Returns: + dict: Parsed data including the file path, header, and rows. + """ + table_data = {} + headers = [] + reading_table = False + + with open(file, "r") as f: + reader = csv.reader(f, delimiter="\t") + + for row in reader: + if not row: + if reading_table: + break + continue + + # Detect header row + if row[0].startswith("Name") and not reading_table: + headers = get_dict_keys(row)[1:] + reading_table = True + continue + + # Process rows of the table + if reading_table: + table_data[row[0]] = {} + for i, (header, value) in enumerate(zip(headers, row[1:])): + if value: + value = _format_value(value) + if header == "atomic_concentration": + value /= 100 + table_data[row[0]].update({header: value}) + + return table_data