
Commit 0a64eaf

Add SimaPro LCIA extractor
1 parent 5afeb88 commit 0a64eaf

3 files changed: +381 -1 lines changed


bw2io/extractors/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -6,3 +6,4 @@
from .exiobase import Exiobase3MonetaryDataExtractor
from .simapro_csv import SimaProCSVExtractor
from .simapro_lcia_csv import SimaProLCIACSVExtractor
+from .simapro_lcia_95project_csv import SimaProLCIA95ProjectCSVExtractor
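With this re-export in place, the new extractor can be imported directly from the package namespace, e.g.:

from bw2io.extractors import SimaProLCIA95ProjectCSVExtractor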
bw2io/extractors/simapro_lcia_95project_csv.py

Lines changed: 379 additions & 0 deletions
@@ -0,0 +1,379 @@
import csv
from pathlib import Path

from bw2data.logs import close_log, get_io_logger
from bw2io.utils import standardize_method_to_len_3
from stats_arrays import *

# SKIPPABLE_SECTIONS = {
#     "Airborne emissions",
#     "Economic issues",
#     "Emissions to soil",
#     "Final waste flows",
#     "Quantities",
#     "Raw materials",
#     "Units",
#     "Waterborne emissions",
# }


class EndOfDatasets(Exception):
    pass


class SimaProLCIA95ProjectCSVExtractor:
    """
    Extract data from SimaPro LCIA 9.5 Project CSV file format.

    Differs from `SimaProLCIACSVExtractor` in that this format seems not to use
    `End` at the end of sections.

    Parameters
    ----------
    filepath: str
        Filepath of the SimaPro LCIACSV file.
    delimiter: str, optional (default: ";")
        Delimiter used in the SimaPro LCIACSV file.
    encoding: str, optional (default: "cp1252")
        Encoding of the SimaPro LCIACSV file.

    Raises
    ------
    AssertionError
        If the filepath does not exist or the file is not a valid SimaPro
        export file.

    Returns
    -------
    list
        List of impact categories extracted from the SimaPro file.
    """

    @classmethod
    def extract(cls, filepath: Path, delimiter: str=";", encoding: str="cp1252"):
        filepath = Path(filepath)
        assert filepath.is_file(), f"Can't find file {filepath}"
        log, logfile = get_io_logger("SimaPro-LCIA-extractor")

        log.info(f"""Starting SimaPro import:
    Filepath: {filepath}
    Delimiter: {delimiter}""")

        strip_delete = lambda obj: obj.strip().replace("\x7f", "") if isinstance(obj, str) else obj
        empty_lines = lambda line: line if any(line) else None

        with open(filepath, "r", encoding=encoding) as csv_file:
            reader = csv.reader(csv_file, delimiter=delimiter)
            lines = [
                [strip_delete(elem) for elem in line]
                for line in reader
            ]

        # Check if valid SimaPro file
        assert "SimaPro" in lines[0][0], "File is not valid SimaPro export"

        impact_categories, context = [], {}
        sections = cls.clean_sections(cls.split_into_sections(lines))

        for section in sections:
            if section[0][0].startswith("SimaPro"):
                context["simapro version"] = section[0][1]
            elif section[0][0] == 'Name':
                context["method"] = section[0][1]
            elif section[0][0] == 'Comment':
                context["comment"] = "\n".join([line[1] for line in section])
            elif section[0][0].startswith("Use"):
                context["configuration"] = dict(section)
            elif section[0][0] == 'Impact category':
                impact_categories.append({
                    'impact category': section[0][1],
                    'unit': section[0][2],
                    'cfs': [cls.parse_cf(line) for line in section[1:]],
                    **context
                })
            elif section[0][0] == 'Normalization-Weighting set':
                continue
            elif section[0][0] == 'Normalization':
                pass
            elif section[0][0] == 'Weighting':
                pass

        close_log(log)
        return impact_categories

    @classmethod
    def clean_sections(cls, sections: list) -> list:
        """Remove empty sections, and empty lines from sections"""
        return [
            [line for line in section if line != []]
            for section in sections
            if section != [[]]
        ]

    @classmethod
    def split_into_sections(cls, data: list) -> list:
        """Split the SimaPro file into sections using the blank line pattern"""
        split_locations = [2]

        for index, line in enumerate(data):
            if line == []:
                split_locations.append(index + 1)

        sections = (
            [data[:split_locations[0]]]
            + [data[split_locations[index]:split_locations[index + 1]] for index in range(len(split_locations) - 1)]
            + [data[split_locations[-1]:]]
        )

        return sections

    @classmethod
    def parse_cf(cls, line):
        """Parse line in `Substances` section.

        0. category
        1. subcategory
        2. flow
        3. CAS number
        4. CF
        5. unit
        6. damage rate

        """
        return {
            "categories": (line[0], line[1]),
            "name": line[2],
            "CAS number": line[3],
            "amount": float(line[4].replace(",", ".")),
            "unit": line[5],
            "damage_rate": line[6] if len(line) >= 7 else None,
        }
    # @classmethod
    # def read_method_data_set(cls, data, index, filepath):
    #     """
    #     Read method data set from `data` starting at `index`.

    #     Parameters
    #     ----------
    #     data : list
    #         A list of lists containing the data to be processed.
    #     index : int
    #         The starting index to read method data set from.
    #     filepath : str
    #         The file path of the method data set.

    #     Returns
    #     -------
    #     list
    #         A list of completed method data sets.
    #     int
    #         The index where the method data set reading ended.

    #     Raises
    #     ------
    #     ValueError

    #     """
    #     metadata, index = cls.read_metadata(data, index)
    #     method_root_name = metadata.pop("Name")
    #     description = metadata.pop("Comment")
    #     category_data, nw_data, damage_category_data, completed_data = [], [], [], []

    #     # `index` is now the `Impact category` line
    #     while not data[index] or data[index][0] != "End":
    #         if not data[index] or not data[index][0]:
    #             index += 1
    #         elif data[index][0] == "Impact category":
    #             catdata, index = cls.get_category_data(data, index + 1)
    #             category_data.append(catdata)
    #         elif data[index][0] == "Normalization-Weighting set":
    #             nw_dataset, index = cls.get_normalization_weighting_data(
    #                 data, index + 1
    #             )
    #             nw_data.append(nw_dataset)
    #         elif data[index][0] == "Damage category":
    #             catdata, index = cls.get_damage_category_data(data, index + 1)
    #             damage_category_data.append(catdata)
    #         else:
    #             raise ValueError

    #     for ds in category_data:
    #         completed_data.append(
    #             {
    #                 "description": description,
    #                 "name": (method_root_name, ds[0]),
    #                 "unit": ds[1],
    #                 "filename": filepath,
    #                 "exchanges": ds[2],
    #             }
    #         )

    #     for ds in nw_data:
    #         completed_data.append(
    #             {
    #                 "description": description,
    #                 "name": (method_root_name, ds[0]),
    #                 "unit": metadata["Weighting unit"],
    #                 "filename": filepath,
    #                 "exchanges": cls.get_all_cfs(ds[1], category_data),
    #             }
    #         )

    #     for ds in damage_category_data:
    #         completed_data.append(
    #             {
    #                 "description": description,
    #                 "name": (method_root_name, ds[0]),
    #                 "unit": ds[1],
    #                 "filename": filepath,
    #                 "exchanges": cls.get_damage_exchanges(ds[2], category_data),
    #             }
    #         )

    #     return completed_data, index

    # @classmethod
    # def get_all_cfs(cls, nw_data, category_data):
    #     """
    #     Get all CFs from `nw_data` and `category_data`.

    #     Parameters
    #     ----------
    #     nw_data : list
    #         A list of tuples containing normalization-weighting (NW) set names and scales.
    #     category_data : list
    #         A list of tuples containing impact category names, units, and CF data.
    #     Returns
    #     -------
    #     list
    #         A list of all CFs.
    #     """

    #     def rescale(cf, scale):
    #         cf["amount"] *= scale
    #         return cf

    #     cfs = []
    #     for nw_name, scale in nw_data:
    #         for cat_name, _, cf_data in category_data:
    #             if cat_name == nw_name:
    #                 cfs.extend([rescale(cf, scale) for cf in cf_data])
    #     return cfs

    # @classmethod
    # def get_damage_exchanges(cls, damage_data, category_data):
    #     """
    #     Calculate the damage exchanges based on damage data and category data.

    #     Parameters
    #     ----------
    #     damage_data : list of tuples
    #         A list of tuples containing the name and scale of the damage
    #     category_data : list of tuples
    #         A list of tuples containing the name, unit, and data of each impact category

    #     Returns
    #     -------
    #     list of dictionaries
    #         A list of dictionaries with the calculated damage exchanges of each impact category
    #     """

    #     def rescale(cf, scale):
    #         cf["amount"] *= scale
    #         return cf

    #     cfs = []
    #     for damage_name, scale in damage_data:
    #         for cat_name, _, cf_data in category_data:
    #             if cat_name == damage_name:
    #                 # Multiple impact categories might use the same exchanges
    #                 # So scale and increment the amount if it exists, scale and append if it doesn't
    #                 for cf in cf_data:
    #                     c_name, c_categories = cf["name"], cf["categories"]
    #                     found_cf = False
    #                     for existing_cf in cfs:
    #                         if (
    #                             existing_cf["name"] == c_name
    #                             and existing_cf["categories"] == c_categories
    #                         ):
    #                             existing_cf["amount"] += cf["amount"] * scale
    #                             found_cf = True
    #                             continue
    #                     if found_cf:
    #                         continue
    #                     cfs.extend([rescale(cf, scale) for cf in cf_data])
    #     return cfs

    # @classmethod
    # def get_category_data(cls, data, index):
    #     """
    #     Parse impact category data and return its name, unit, and data.

    #     Parameters
    #     ----------
    #     data : list of lists
    #         A list of lists with the data for all categories
    #     index : int
    #         The index of the current impact category in the list

    #     Returns
    #     -------
    #     tuple
    #         A tuple with the name, unit, and data for the impact category
    #     """
    #     cf_data = []
    #     # First line is name and unit
    #     name, unit = data[index][:2]
    #     index += 2
    #     assert data[index][0] == "Substances"
    #     index += 1
    #     while data[index]:
    #         cf_data.append(cls.parse_cf(data[index]))
    #         index += 1
    #     return (name, unit, cf_data), index

    # @classmethod
    # def get_damage_category_data(cls, data, index):
    #     """
    #     Parse damage category data and return the name, unit, and data of the category.

    #     Parameters
    #     ----------
    #     data : list of lists
    #         A list of lists with the data of the damage categories
    #     index : int
    #         The index of the current damage category in the list

    #     Returns
    #     -------
    #     tuple
    #         A tuple with the name, unit, and data for the damage category
    #     """
    #     damage_data = []
    #     # First line is name and unit
    #     name, unit = data[index][:2]
    #     index += 2
    #     assert data[index][0] == "Impact categories"
    #     index += 1
    #     while data[index]:
    #         method, scalar = data[index][:2]
    #         damage_data.append((method, float(scalar.replace(",", "."))))
    #         index += 1
    #     return (name, unit, damage_data), index

    # @classmethod
    # def get_normalization_weighting_data(cls, data, index):
    #     # TODO: Only works for weighting data, no addition or normalization
    #     nw_data = []
    #     name = data[index][0]
    #     index += 2
    #     assert data[index][0] == "Weighting"
    #     index += 1
    #     while data[index]:
    #         cat, weight = data[index][:2]
    #         index += 1
    #         if weight == "0":
    #             continue
    #         nw_data.append((cat, float(weight.replace(",", "."))))
    #     return (name, nw_data), index
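For orientation, a minimal usage sketch of the new extractor. The filename below is hypothetical, and the printed keys simply follow the dictionaries assembled in `extract()` and `parse_cf()` above:

from pathlib import Path

from bw2io.extractors.simapro_lcia_95project_csv import SimaProLCIA95ProjectCSVExtractor

# Hypothetical path; point this at a real SimaPro 9.5 "project" LCIA CSV export.
filepath = Path("ef_31_project_lcia.csv")

impact_categories = SimaProLCIA95ProjectCSVExtractor.extract(
    filepath, delimiter=";", encoding="cp1252"
)

for method in impact_categories:
    # Each entry carries the shared file context plus the category-specific CFs.
    print(method.get("method"), method["impact category"], method["unit"], len(method["cfs"]))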

bw2io/extractors/simapro_lcia_csv.py

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ def get_next_method_index(cls, data, index):
            try:
                if data[index] and data[index][0] in SKIPPABLE_SECTIONS:
                    index = cls.skip_to_section_end(data, index)
-               elif data[index] and data[index][0] == "Method":
+               elif data[index] and data[index][0] in ("Method", "Impact category"):
                    return index + 1
            except IndexError:
                # File ends without extra metadata

0 commit comments