From 11efa7d60c4fba234b4ddde3392610fc059d700e Mon Sep 17 00:00:00 2001 From: Frode Helgetun Krogh <70878501+frodehk@users.noreply.github.com> Date: Thu, 23 Jan 2025 14:00:49 +0100 Subject: [PATCH] chore: emission intensity and results --- src/libecalc/application/energy/emitter.py | 8 +- src/libecalc/application/energy_calculator.py | 7 +- .../common/utils/emission_intensity.py | 138 ++++++++++ src/libecalc/common/utils/rates.py | 75 ++++++ src/libecalc/common/variables.py | 5 + .../electricity_consumer.py | 12 +- .../fuel_consumer/fuel_consumer.py | 22 +- .../generator_set/generator_set_dto.py | 24 +- .../installation/installation.py | 209 ++++++++++++++ .../presentation/json_result/mapper.py | 143 ++++------ .../json_result/result/emission.py | 9 + .../json_result/result/results.py | 2 + .../yaml/mappers/component_mapper.py | 23 +- src/libecalc/presentation/yaml/model.py | 1 + .../emitters/yaml_venting_emitter.py | 67 +++-- .../test_compute_emission_intensity_yearly.py | 254 ++++++++++++++++++ .../presentation/exporter/test_ltp.py | 4 +- 17 files changed, 864 insertions(+), 139 deletions(-) create mode 100644 src/libecalc/common/utils/emission_intensity.py diff --git a/src/libecalc/application/energy/emitter.py b/src/libecalc/application/energy/emitter.py index f3a70caa8e..668cc7b1ac 100644 --- a/src/libecalc/application/energy/emitter.py +++ b/src/libecalc/application/energy/emitter.py @@ -1,6 +1,8 @@ import abc from typing import Optional +from pydantic import Field + from libecalc.application.energy.component_energy_context import ComponentEnergyContext from libecalc.application.energy.energy_model import EnergyModel from libecalc.common.variables import ExpressionEvaluator @@ -12,6 +14,8 @@ class Emitter(abc.ABC): Something that emits something. """ + expression_evaluator: Optional[ExpressionEvaluator] = Field(default=None, exclude=True) + @property @abc.abstractmethod def id(self) -> str: ... @@ -21,5 +25,7 @@ def evaluate_emissions( self, energy_context: ComponentEnergyContext, energy_model: EnergyModel, - expression_evaluator: ExpressionEvaluator, ) -> Optional[dict[str, EmissionResult]]: ... + + def set_expression_evaluator(self, expression_evaluator: ExpressionEvaluator): + self.expression_evaluator = expression_evaluator diff --git a/src/libecalc/application/energy_calculator.py b/src/libecalc/application/energy_calculator.py index e11e9c00b1..784c8431b9 100644 --- a/src/libecalc/application/energy_calculator.py +++ b/src/libecalc/application/energy_calculator.py @@ -70,11 +70,7 @@ def evaluate_energy_usage(self) -> dict[str, EcalcModelResult]: for energy_component in energy_components: if hasattr(energy_component, "evaluate_energy_usage"): context = self._get_context(energy_component.id) - self._consumer_results.update( - energy_component.evaluate_energy_usage( - context=context, expression_evaluator=self._expression_evaluator - ) - ) + self._consumer_results.update(energy_component.evaluate_energy_usage(context=context)) self._consumer_results = Numbers.format_results_to_precision(self._consumer_results, precision=6) return self._consumer_results @@ -91,7 +87,6 @@ def evaluate_emissions(self) -> dict[str, dict[str, EmissionResult]]: emission_result = energy_component.evaluate_emissions( energy_context=self._get_context(energy_component.id), energy_model=self._energy_model, - expression_evaluator=self._expression_evaluator, ) if emission_result is not None: diff --git a/src/libecalc/common/utils/emission_intensity.py b/src/libecalc/common/utils/emission_intensity.py new file mode 100644 index 0000000000..a6f110de8d --- /dev/null +++ b/src/libecalc/common/utils/emission_intensity.py @@ -0,0 +1,138 @@ +from datetime import datetime +from typing import Optional + +import numpy as np +import pandas as pd + +from libecalc.common.time_utils import Period, Periods +from libecalc.common.units import Unit +from libecalc.common.utils.rates import ( + TimeSeriesCalendarDayRate, + TimeSeriesIntensity, + TimeSeriesVolumesCumulative, +) + + +class EmissionIntensity: + def __init__( + self, + emission_cumulative: TimeSeriesVolumesCumulative, + hydrocarbon_export_cumulative: TimeSeriesVolumesCumulative, + unit: Unit = Unit.KG_SM3, + ): + self.emission_cumulative = emission_cumulative + self.hydrocarbon_export_cumulative = hydrocarbon_export_cumulative + self.unit = unit + self.periods = emission_cumulative.periods + self.yearly_periods = self._create_yearly_periods() + self.time_vector = self.periods.all_dates + self.start_years = [period.start.year for period in self.periods] + + def _create_yearly_periods(self) -> Periods: + yearly_periods = [] + added_periods = set() + + for period in self.periods: + for year in range(period.start.year, period.end.year + 1): + start_date = datetime(year, 1, 1) + end_date = datetime(year + 1, 1, 1) + period_tuple = (start_date, end_date) + + if period_tuple not in added_periods: + yearly_periods.append(Period(start=start_date, end=end_date)) + added_periods.add(period_tuple) + + return Periods(yearly_periods) + + def _calculate_yearly_old(self) -> list[float]: + """Standard emission intensity at time k, is the sum of emissions from startup until time k + divided by the sum of export from startup until time k. + Thus, intensity_k = ( sum_t=1:k emission(t) ) / ( sum_t=1:k export(t) ). + The yearly emission intensity for year k is the sum of emissions in year k divided by + the sum of export in year k (and thus independent of the years before year k) + I.e. intensity_yearly_k = emission_year_k / export_year_k + emission_year_k may be computed as emission_cumulative(1. january year=k+1) - emission_cumulative(1. january year=k) + hcexport_year_k may be computed as hcexport_cumulative(1. january year=k+1) - hcexport_cumulative(1. january year=k) + To be able to evaluate cumulative emission and hydrocarbon_export at 1. january each year, a linear interpolation function + is created between the time vector and the cumulative function. To be able to treat time as the x-variable, this is + first converted to number of seconds from the beginning of the time vector + """ + + df = pd.DataFrame( + data={ + "emission_cumulative": self.emission_cumulative.values, + "hydrocarbon_cumulative": self.hydrocarbon_export_cumulative.values, + }, + index=self.emission_cumulative.end_dates, # Assuming dates are aligned with cumulative values + ) + + # Reindex the DataFrame to match the time_vector and fill missing values + df = df.reindex(self.time_vector).ffill().fillna(0) + + # df = pd.DataFrame( + # index=self.time_vector, + # data=list(zip(self.emission_cumulative.values, self.hydrocarbon_export_cumulative.values)), + # columns=["emission_cumulative", "hydrocarbon_cumulative"], + # ) + + # Extending the time vector back and forth 1 year used as padding when calculating yearly buckets. + time_vector_interpolated = pd.date_range( + start=datetime(self.time_vector[0].year - 1, 1, 1), + end=datetime(self.time_vector[-1].year + 1, 1, 1), + freq="YS", + ) + + # Linearly interpolating by time using the built-in functionality in Pandas. + cumulative_interpolated: Optional[pd.DataFrame] = df.reindex( + sorted(set(self.periods.all_dates).union(time_vector_interpolated)) + ).interpolate("time") + + if cumulative_interpolated is None: + raise ValueError("Time interpolation of cumulative yearly emission intensity failed") + + cumulative_yearly = cumulative_interpolated.bfill().loc[time_vector_interpolated] + + # Remove the extrapolated timesteps + emissions_per_year = np.diff(cumulative_yearly.emission_cumulative[1:]) + hcexport_per_year = np.diff(cumulative_yearly.hydrocarbon_cumulative[1:]) + + yearly_emission_intensity = np.divide( + emissions_per_year, + hcexport_per_year, + out=np.full_like(emissions_per_year, fill_value=np.nan), + where=hcexport_per_year != 0, + ) + + return yearly_emission_intensity.tolist() + + def calculate_old(self) -> TimeSeriesCalendarDayRate: + """Legacy code that computes yearly intensity and casts the results back to the original time-vector.""" + yearly_buckets = range(self.time_vector[0].year, self.time_vector[-1].year + 1) + yearly_intensity = self._calculate_yearly() + return TimeSeriesCalendarDayRate( + periods=self.periods, + values=[yearly_intensity[yearly_buckets.index(period.start.year)] for period in self.periods], + unit=Unit.KG_SM3, + ) + + def calculate_periods(self): + emission_volumes = self.emission_cumulative.to_volumes() + hydrocarbon_export_volumes = self.hydrocarbon_export_cumulative.to_volumes() + + intensity = emission_volumes / hydrocarbon_export_volumes + + return TimeSeriesIntensity( + periods=self.periods, + values=intensity.values, + unit=self.unit, + ) + + def calculate(self) -> TimeSeriesIntensity: + """Write description here""" + intensity = self.emission_cumulative / self.hydrocarbon_export_cumulative + + return TimeSeriesIntensity( + periods=self.periods, + values=intensity.values, + unit=self.unit, + ) diff --git a/src/libecalc/common/utils/rates.py b/src/libecalc/common/utils/rates.py index 44f9d70d19..0985fd8ed8 100644 --- a/src/libecalc/common/utils/rates.py +++ b/src/libecalc/common/utils/rates.py @@ -665,6 +665,27 @@ def to_rate(self, regularity: Optional[list[float]] = None) -> TimeSeriesRate: rate_type=RateType.CALENDAR_DAY, ) + def __truediv__(self, other: object) -> TimeSeriesCalendarDayRate: + if not isinstance(other, TimeSeriesVolumes): + raise TypeError(f"Dividing TimeSeriesVolumes by '{str(other.__class__)}' is not supported.") + + if self.unit == Unit.KILO and other.unit == Unit.STANDARD_CUBIC_METER: + unit = Unit.KG_SM3 + else: + raise ProgrammingError( + f"Unable to divide unit '{self.unit}' by unit '{other.unit}'. Please add unit conversion." + ) + return TimeSeriesCalendarDayRate( + periods=self.periods, + values=np.divide( + self.values, + other.values, + out=np.full_like(self.values, fill_value=np.nan), + where=np.asarray(other.values) != 0.0, + ).tolist(), + unit=unit, + ) + class TimeSeriesStreamDayRate(TimeSeriesFloat): """ @@ -1061,3 +1082,57 @@ def to_stream_day_timeseries(self) -> TimeSeriesStreamDayRate: return TimeSeriesStreamDayRate( periods=stream_day_rate.periods, values=stream_day_rate.values, unit=stream_day_rate.unit ) + + +class TimeSeriesIntensity(TimeSeries[float]): + @field_validator("values", mode="before") + @classmethod + def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]: + if isinstance(v, list): + # convert None to nan + return [i if i is not None else math.nan for i in v] + return v + + def resample( + self, freq: Frequency, include_start_date: bool = True, include_end_date: bool = True + ) -> TimeSeriesIntensity: + """ + Resample emission intensity according to given frequency. + Slinear is used in order to only interpolate, not extrapolate. + Args: + freq: The frequency the time series should be resampled to + Returns: + TimeSeriesIntensity resampled to the given frequency + """ + if freq is Frequency.NONE: + return self.model_copy() + + ds = pd.Series(index=self.all_dates, data=[0] + self.values) + + new_periods = resample_periods( + self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date + ) + + new_dates = new_periods.all_dates + if ds.index[-1] not in new_dates: + logger.warning( + f"The final date in the rate input ({ds.index[-1].strftime('%m/%d/%Y')}) does not " + f"correspond to the end of a period with the requested output frequency. There is a " + f"possibility that the resampling will drop intensities." + ) + ds_interpolated = ds.reindex(ds.index.union(new_dates)).interpolate("time") + + # New resampled pd.Series + resampled: list[float] = ds_interpolated.reindex(new_dates).values.tolist() + + if not include_start_date: + dropped_intensity = resampled[0] + resampled = [value - dropped_intensity for value in resampled[1:]] + else: + resampled = resampled[1:] + + return TimeSeriesIntensity( + periods=new_periods, + values=resampled, + unit=self.unit, + ) diff --git a/src/libecalc/common/variables.py b/src/libecalc/common/variables.py index ffd491d9b8..f8fdcda644 100644 --- a/src/libecalc/common/variables.py +++ b/src/libecalc/common/variables.py @@ -7,6 +7,7 @@ import numpy as np from numpy.typing import NDArray from pydantic import BaseModel, ConfigDict, Field, model_validator +from pydantic_core import CoreSchema, core_schema from libecalc.common.errors.exceptions import ProgrammingError from libecalc.common.temporal_model import TemporalModel @@ -176,3 +177,7 @@ def get_subset_for_period(self, period: Period) -> ExpressionEvaluator: ... def evaluate( self, expression: Union[Expression, TemporalModel, dict[Period, Expression]] ) -> NDArray[np.float64]: ... + + @classmethod + def __get_pydantic_core_schema__(cls, source, handler) -> CoreSchema: + return core_schema.any_schema() diff --git a/src/libecalc/domain/infrastructure/energy_components/electricity_consumer/electricity_consumer.py b/src/libecalc/domain/infrastructure/energy_components/electricity_consumer/electricity_consumer.py index 032cfbe4c2..1840c3f477 100644 --- a/src/libecalc/domain/infrastructure/energy_components/electricity_consumer/electricity_consumer.py +++ b/src/libecalc/domain/infrastructure/energy_components/electricity_consumer/electricity_consumer.py @@ -38,6 +38,7 @@ def __init__( ComponentType.COMPRESSOR_SYSTEM, ], energy_usage_model: dict[Period, ElectricEnergyUsageModel], + expression_evaluator: ExpressionEvaluator, consumes: Literal[ConsumptionType.ELECTRICITY] = ConsumptionType.ELECTRICITY, ): self.name = name @@ -47,6 +48,7 @@ def __init__( self.energy_usage_model = self.check_energy_usage_model(energy_usage_model) self._validate_el_consumer_temporal_model(self.energy_usage_model) self._check_model_energy_usage(self.energy_usage_model) + self.expression_evaluator = expression_evaluator self.consumes = consumes self.component_type = component_type @@ -78,9 +80,10 @@ def get_component_process_type(self) -> ComponentType: def get_name(self) -> str: return self.name - def evaluate_energy_usage( - self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext - ) -> dict[str, EcalcModelResult]: + def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]): + self.consumer_results = consumer_results + + def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]: consumer_results: dict[str, EcalcModelResult] = {} consumer = ConsumerEnergyComponent( id=self.id, @@ -95,7 +98,8 @@ def evaluate_energy_usage( } ), ) - consumer_results[self.id] = consumer.evaluate(expression_evaluator=expression_evaluator) + consumer_results[self.id] = consumer.evaluate(expression_evaluator=self.expression_evaluator) + self.set_consumer_results(consumer_results) return consumer_results diff --git a/src/libecalc/domain/infrastructure/energy_components/fuel_consumer/fuel_consumer.py b/src/libecalc/domain/infrastructure/energy_components/fuel_consumer/fuel_consumer.py index 6ea681db11..c67c4aa0b0 100644 --- a/src/libecalc/domain/infrastructure/energy_components/fuel_consumer/fuel_consumer.py +++ b/src/libecalc/domain/infrastructure/energy_components/fuel_consumer/fuel_consumer.py @@ -47,6 +47,7 @@ def __init__( ], fuel: dict[Period, FuelType], energy_usage_model: dict[Period, FuelEnergyUsageModel], + expression_evaluator: ExpressionEvaluator, consumes: Literal[ConsumptionType.FUEL] = ConsumptionType.FUEL, ): self.name = name @@ -54,6 +55,7 @@ def __init__( validate_temporal_model(self.regularity) self.user_defined_category = user_defined_category self.energy_usage_model = self.check_energy_usage_model(energy_usage_model) + self.expression_evaluator = expression_evaluator self.fuel = self.validate_fuel_exist(name=self.name, fuel=fuel, consumes=consumes) self._validate_fuel_consumer_temporal_models(self.energy_usage_model, self.fuel) self._check_model_energy_usage(self.energy_usage_model) @@ -88,9 +90,13 @@ def get_component_process_type(self) -> ComponentType: def get_name(self) -> str: return self.name - def evaluate_energy_usage( - self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext - ) -> dict[str, EcalcModelResult]: + def set_emission_results(self, emission_results: dict[str, EmissionResult]): + self.emission_results = emission_results + + def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]): + self.consumer_results = consumer_results + + def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]: consumer_results: dict[str, EcalcModelResult] = {} consumer = ConsumerEnergyComponent( id=self.id, @@ -105,7 +111,8 @@ def evaluate_energy_usage( } ), ) - consumer_results[self.id] = consumer.evaluate(expression_evaluator=expression_evaluator) + consumer_results[self.id] = consumer.evaluate(expression_evaluator=self.expression_evaluator) + self.set_consumer_results(consumer_results) return consumer_results @@ -113,17 +120,18 @@ def evaluate_emissions( self, energy_context: ComponentEnergyContext, energy_model: EnergyModel, - expression_evaluator: ExpressionEvaluator, ) -> Optional[dict[str, EmissionResult]]: fuel_model = FuelModel(self.fuel) fuel_usage = energy_context.get_fuel_usage() assert fuel_usage is not None - return fuel_model.evaluate_emissions( - expression_evaluator=expression_evaluator, + emissions = fuel_model.evaluate_emissions( + expression_evaluator=self.expression_evaluator, fuel_rate=fuel_usage.values, ) + self.set_emission_results(emissions) + return emissions @staticmethod def check_energy_usage_model(energy_usage_model: dict[Period, FuelEnergyUsageModel]): diff --git a/src/libecalc/domain/infrastructure/energy_components/generator_set/generator_set_dto.py b/src/libecalc/domain/infrastructure/energy_components/generator_set/generator_set_dto.py index 21c0dde1bb..d8bf6f203e 100644 --- a/src/libecalc/domain/infrastructure/energy_components/generator_set/generator_set_dto.py +++ b/src/libecalc/domain/infrastructure/energy_components/generator_set/generator_set_dto.py @@ -43,6 +43,7 @@ def __init__( user_defined_category: dict[Period, ConsumerUserDefinedCategoryType], generator_set_model: dict[Period, GeneratorSetSampled], regularity: dict[Period, Expression], + expression_evaluator: ExpressionEvaluator, consumers: list[Union[ElectricityConsumer, ConsumerSystem]] = None, fuel: dict[Period, FuelType] = None, cable_loss: Optional[ExpressionType] = None, @@ -52,6 +53,7 @@ def __init__( self.name = name self.user_defined_category = user_defined_category self.regularity = self.check_regularity(regularity) + self.expression_evaluator = expression_evaluator validate_temporal_model(self.regularity) self.generator_set_model = self.check_generator_set_model(generator_set_model) self.fuel = self.check_fuel(fuel) @@ -90,9 +92,13 @@ def get_component_process_type(self) -> ComponentType: def get_name(self) -> str: return self.name - def evaluate_energy_usage( - self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext - ) -> dict[str, EcalcModelResult]: + def set_emission_results(self, emission_results: dict[str, EmissionResult]): + self.emission_results = emission_results + + def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]): + self.consumer_results = consumer_results + + def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]: consumer_results: dict[str, EcalcModelResult] = {} fuel_consumer = Genset( id=self.id, @@ -112,12 +118,13 @@ def evaluate_energy_usage( consumer_results[self.id] = EcalcModelResult( component_result=fuel_consumer.evaluate( - expression_evaluator=expression_evaluator, + expression_evaluator=self.expression_evaluator, power_requirement=context.get_power_requirement(), ), models=[], sub_components=[], ) + self.set_consumer_results(consumer_results) return consumer_results @@ -125,17 +132,18 @@ def evaluate_emissions( self, energy_context: ComponentEnergyContext, energy_model: EnergyModel, - expression_evaluator: ExpressionEvaluator, ) -> Optional[dict[str, EmissionResult]]: fuel_model = FuelModel(self.fuel) fuel_usage = energy_context.get_fuel_usage() assert fuel_usage is not None - - return fuel_model.evaluate_emissions( - expression_evaluator=expression_evaluator, + emissions = fuel_model.evaluate_emissions( + expression_evaluator=self.expression_evaluator, fuel_rate=fuel_usage.values, ) + self.set_emission_results(emissions) + + return emissions @staticmethod def _validate_genset_temporal_models( diff --git a/src/libecalc/domain/infrastructure/energy_components/installation/installation.py b/src/libecalc/domain/infrastructure/energy_components/installation/installation.py index 42bdbb777f..092d8fee2f 100644 --- a/src/libecalc/domain/infrastructure/energy_components/installation/installation.py +++ b/src/libecalc/domain/infrastructure/energy_components/installation/installation.py @@ -1,12 +1,24 @@ +import operator +from collections import defaultdict +from functools import reduce from typing import Optional, Union +import libecalc +from libecalc.application.energy.emitter import Emitter from libecalc.application.energy.energy_component import EnergyComponent +from libecalc.common.component_info.component_level import ComponentLevel from libecalc.common.component_type import ComponentType from libecalc.common.string.string_utils import generate_id +from libecalc.common.temporal_model import TemporalModel from libecalc.common.time_utils import Period +from libecalc.common.units import Unit +from libecalc.common.utils.rates import RateType, TimeSeriesBoolean, TimeSeriesFloat, TimeSeriesRate +from libecalc.common.variables import ExpressionEvaluator +from libecalc.core.result.emission import EmissionResult from libecalc.domain.infrastructure.energy_components.consumer_system.consumer_system_dto import ConsumerSystem from libecalc.domain.infrastructure.energy_components.fuel_consumer.fuel_consumer import FuelConsumer from libecalc.domain.infrastructure.energy_components.generator_set.generator_set_dto import GeneratorSet +from libecalc.domain.infrastructure.energy_components.legacy_consumer.component import ConsumerResult from libecalc.dto.component_graph import ComponentGraph from libecalc.dto.types import InstallationUserDefinedCategoryType from libecalc.dto.utils.validators import ( @@ -14,6 +26,11 @@ validate_temporal_model, ) from libecalc.expression import Expression +from libecalc.presentation.json_result.aggregators import ( + aggregate_emissions, + aggregate_is_valid, +) +from libecalc.presentation.json_result.result.emission import PartialEmissionResult from libecalc.presentation.yaml.yaml_types.emitters.yaml_venting_emitter import ( YamlVentingEmitter, ) @@ -26,6 +43,7 @@ def __init__( regularity: dict[Period, Expression], hydrocarbon_export: dict[Period, Expression], fuel_consumers: list[Union[GeneratorSet, FuelConsumer, ConsumerSystem]], + expression_evaluator: ExpressionEvaluator, venting_emitters: Optional[list[YamlVentingEmitter]] = None, user_defined_category: Optional[InstallationUserDefinedCategoryType] = None, ): @@ -33,8 +51,10 @@ def __init__( self.hydrocarbon_export = self.convert_expression_installation(hydrocarbon_export) self.regularity = self.convert_expression_installation(regularity) self.fuel_consumers = fuel_consumers + self.expression_evaluator = expression_evaluator self.user_defined_category = user_defined_category self.component_type = ComponentType.INSTALLATION + self.component_level = ComponentLevel.INSTALLATION self.validate_installation_temporal_model() if venting_emitters is None: @@ -83,3 +103,192 @@ def get_graph(self) -> ComponentGraph: graph.add_edge(self.id, component.id) return graph + + def evaluate_regularity(self) -> TimeSeriesFloat: + return TimeSeriesFloat( + periods=self.expression_evaluator.get_periods(), + values=self.expression_evaluator.evaluate(expression=TemporalModel(self.regularity)).tolist(), + unit=Unit.NONE, + ) + + def get_hydrocarbon_export_rate(self) -> TimeSeriesRate: + hydrocarbon_export_rate = self.expression_evaluator.evaluate(expression=TemporalModel(self.hydrocarbon_export)) + regularity = self.evaluate_regularity() + + hydrocarbon_export_rate = TimeSeriesRate( + periods=self.expression_evaluator.get_periods(), + values=hydrocarbon_export_rate.tolist(), + unit=Unit.STANDARD_CUBIC_METER_PER_DAY, + rate_type=RateType.CALENDAR_DAY, + regularity=regularity.values, + ) + return hydrocarbon_export_rate + + def get_sub_component_results(self) -> list[libecalc.presentation.json_result.result.EcalcModelResult]: + sub_component_results: list[libecalc.presentation.json_result.result.EcalcModelResult] = [] + for fuel_consumer in self.fuel_consumers: + sub_component_results.extend(result.component_result for result in self._collect_all_results(fuel_consumer)) + return sub_component_results + + def get_consumer_results(self) -> dict[str, ConsumerResult]: + consumer_results = {} + for fuel_consumer in self.fuel_consumers: + consumer_results.update( + {result.component_result.id: result for result in self._collect_all_results(fuel_consumer)} + ) + return consumer_results + + def get_emission_results(self) -> dict[str, dict[str, EmissionResult]]: + """ + Retrieve the emission results for all components in the installation. + + Returns: a mapping from component_id to emissions + """ + emission_results: dict[str, dict[str, EmissionResult]] = {} + for component in self.fuel_consumers: + if isinstance(component, Emitter): + emission_result = component.emission_results # Assuming emission_results is already set + if emission_result is not None: + emission_results[component.id] = emission_result + + return emission_results + + def get_aggregated_emissions(self) -> dict[str, EmissionResult]: + emission_dto_results = self._get_emission_dto_results() + successors = [component.id for component in self.fuel_consumers] + + aggregated_emissions = aggregate_emissions( + [emission_dto_results[fuel_consumer_id] for fuel_consumer_id in successors] + ) + + return { + key: libecalc.presentation.json_result.result.EmissionResult( + name=key, + periods=aggregated_emissions[key].periods, + rate=aggregated_emissions[key].rate, + cumulative=aggregated_emissions[key].rate.to_volumes().cumulative(), + ) + for key in aggregated_emissions + } + + def get_power_component_results(self) -> list[libecalc.presentation.json_result.result.EcalcModelResult]: + return [ + self.get_consumer_results()[consumer.id].component_result + for consumer in self.fuel_consumers + if consumer.id in self.get_consumer_results() + ] + + def get_aggregated_electrical_power_results(self) -> TimeSeriesRate: + return self._compute_aggregated_power( + power_components=self._get_electrical_component_results(), + regularity=self.evaluate_regularity(), + ) + + def get_aggregated_mechanical_power_results(self) -> TimeSeriesRate: + return self._compute_aggregated_power( + power_components=self._get_fuel_component_results(), + regularity=self.evaluate_regularity(), + ) + + def get_energy_usage(self) -> TimeSeriesRate: + regularity = self.evaluate_regularity() + sub_components = self.get_sub_component_results() + return ( + reduce( + operator.add, + [ + TimeSeriesRate.from_timeseries_stream_day_rate(component.energy_usage, regularity=regularity) + for component in sub_components + if component.energy_usage.unit == Unit.STANDARD_CUBIC_METER_PER_DAY + ], + ) + if sub_components + else TimeSeriesRate( + values=[0.0] * self.expression_evaluator.number_of_periods, + periods=self.expression_evaluator.get_periods(), + unit=Unit.STANDARD_CUBIC_METER_PER_DAY, + rate_type=RateType.STREAM_DAY, + regularity=regularity.values, + ) + ) + + def get_is_valid(self) -> TimeSeriesBoolean: + sub_components = self.get_sub_component_results() + return TimeSeriesBoolean( + periods=self.expression_evaluator.get_periods(), + values=aggregate_is_valid([component for component in sub_components if hasattr(component, "is_valid")]), + unit=Unit.NONE, + ) + + def _collect_all_results(self, consumer): + results = [] + if hasattr(consumer, "consumer_results"): + results.extend(consumer.consumer_results.values()) + for sub_consumer in getattr(consumer, "consumers", []): + results.extend(self._collect_all_results(sub_consumer)) + return results + + def _get_electrical_component_results(self) -> list: + electrical_components = [] + for fuel_consumer in self.fuel_consumers: + if fuel_consumer.component_type == ComponentType.GENERATOR_SET: + electrical_components.append(self.get_consumer_results()[fuel_consumer.id].component_result) + return electrical_components + + def _get_fuel_component_results(self) -> list: + fuel_components = [] + for fuel_consumer in self.fuel_consumers: + if fuel_consumer.component_type != ComponentType.GENERATOR_SET: + fuel_components.append(self.get_consumer_results()[fuel_consumer.id].component_result) + return fuel_components + + def _compute_aggregated_power( + self, + regularity: TimeSeriesFloat, + power_components: list, + ): + return reduce( + operator.add, + [ + TimeSeriesRate.from_timeseries_stream_day_rate(component.power, regularity=regularity) + for component in power_components + if component.power is not None + ], + TimeSeriesRate( + values=[0.0] * self.expression_evaluator.number_of_periods, + periods=self.expression_evaluator.get_periods(), + unit=Unit.MEGA_WATT, + rate_type=RateType.STREAM_DAY, + regularity=regularity.values, + ), # Initial value, handle no power output from components + ) + + def _get_emission_dto_results(self) -> dict[str, dict[str, PartialEmissionResult]]: + emission_core_results = self.get_emission_results() + return self._convert_to_timeseries( + emission_core_results=emission_core_results, + regularities=self.evaluate_regularity(), + ) + + def _convert_to_timeseries( + self, + emission_core_results: dict[str, dict[str, EmissionResult]], + regularities: Union[TimeSeriesFloat, dict[str, TimeSeriesFloat]], + ) -> dict[str, dict[str, PartialEmissionResult]]: + dto_result: dict[str, dict[str, PartialEmissionResult]] = {} + + for consumer_id, emissions in emission_core_results.items(): + installation_id = self.id + dto_result[consumer_id] = defaultdict() + + if isinstance(regularities, dict): + regularity = regularities[installation_id] + else: + regularity = regularities + + for emission_name, emission_result in emissions.items(): + dto_result[consumer_id][emission_name] = PartialEmissionResult.from_emission_core_result( + emission_result, regularity=regularity + ) + + return dto_result diff --git a/src/libecalc/presentation/json_result/mapper.py b/src/libecalc/presentation/json_result/mapper.py index abd9507060..54e48314af 100644 --- a/src/libecalc/presentation/json_result/mapper.py +++ b/src/libecalc/presentation/json_result/mapper.py @@ -17,6 +17,7 @@ from libecalc.common.temporal_model import TemporalModel from libecalc.common.time_utils import Period, Periods from libecalc.common.units import Unit +from libecalc.common.utils.emission_intensity import EmissionIntensity from libecalc.common.utils.rates import ( RateType, TimeSeriesBoolean, @@ -26,7 +27,7 @@ ) from libecalc.common.variables import ExpressionEvaluator from libecalc.core.result.emission import EmissionResult -from libecalc.domain.infrastructure import Asset +from libecalc.domain.infrastructure import Asset, Installation from libecalc.dto import CompressorSystemConsumerFunction from libecalc.expression import Expression from libecalc.presentation.json_result.aggregators import ( @@ -34,6 +35,7 @@ aggregate_is_valid, ) from libecalc.presentation.json_result.result.emission import ( + EmissionIntensityResult, PartialEmissionResult, ) from libecalc.presentation.json_result.result.results import ( @@ -67,7 +69,7 @@ def get_requested_compressor_pressures( The pressures are the actual pressures defined by user in input. Args: - energy_usage_model (Dict[Period, Any]): Temporal energy model. + energy_usage_model (dict[Period, Any]): Temporal energy model. pressure_type (CompressorPressureType): Compressor pressure type, inlet- or outlet. name (str): Name of compressor. model_periods (Periods): Actual periods in the model. @@ -131,6 +133,34 @@ def get_requested_compressor_pressures( return TemporalModel(evaluated_temporal_energy_usage_models) +def _compute_intensity( + hydrocarbon_export_rate: TimeSeriesRate, + emissions: dict[str, PartialEmissionResult], +) -> list[EmissionIntensityResult]: + hydrocarbon_export_cumulative = hydrocarbon_export_rate.to_volumes().cumulative() + emission_intensities = [] + for key in emissions: + cumulative_rate_kg = emissions[key].rate.to_volumes().to_unit(Unit.KILO).cumulative() + intensity = EmissionIntensity( + emission_cumulative=cumulative_rate_kg, + hydrocarbon_export_cumulative=hydrocarbon_export_cumulative, + ) + intensity_sm3 = intensity.calculate() + intensity_yearly_sm3 = intensity.calculate_periods() + + emission_intensities.append( + EmissionIntensityResult( + name=emissions[key].name, + periods=emissions[key].periods, + intensity_sm3=intensity_sm3, + intensity_boe=intensity_sm3.to_unit(Unit.KG_BOE), + intensity_yearly_sm3=intensity_yearly_sm3, + intensity_yearly_boe=intensity_yearly_sm3.to_unit(Unit.KG_BOE), + ) + ) + return emission_intensities + + def _to_full_result( emissions: dict[str, PartialEmissionResult], ) -> dict[str, libecalc.presentation.json_result.result.EmissionResult]: @@ -217,7 +247,7 @@ def _parse_emissions( def _compute_aggregated_power( - graph_result: GraphResult, + installation: Installation, regularity: TimeSeriesFloat, power_components: list, ): @@ -229,8 +259,8 @@ def _compute_aggregated_power( if component.power is not None ], TimeSeriesRate( - values=[0.0] * graph_result.variables_map.number_of_periods, - periods=graph_result.variables_map.periods, + values=[0.0] * installation.expression_evaluator.number_of_periods, + periods=installation.expression_evaluator.get_periods(), unit=Unit.MEGA_WATT, rate_type=RateType.STREAM_DAY, regularity=regularity.values, @@ -248,98 +278,29 @@ def _evaluate_installations( asset = graph_result.graph.get_node(asset_id) installation_results = [] for installation in asset.installations: - regularity = TimeSeriesFloat( - periods=expression_evaluator.get_periods(), - values=expression_evaluator.evaluate(expression=TemporalModel(installation.regularity)), - unit=Unit.NONE, - ) - hydrocarbon_export_rate = expression_evaluator.evaluate( - expression=TemporalModel(installation.hydrocarbon_export) - ) + regularity = installation.evaluate_regularity() - hydrocarbon_export_rate = TimeSeriesRate( - periods=expression_evaluator.get_periods(), - values=hydrocarbon_export_rate, - unit=Unit.STANDARD_CUBIC_METER_PER_DAY, - rate_type=RateType.CALENDAR_DAY, - regularity=regularity.values, - ) + hydrocarbon_export_rate = installation.get_hydrocarbon_export_rate() - sub_components = [ - graph_result.consumer_results[component_id].component_result - for component_id in graph_result.graph.get_successors(installation.id, recursively=True) - if component_id in graph_result.consumer_results - ] - - power_components = [ - graph_result.consumer_results[component_id].component_result - for component_id in graph_result.graph.get_successors(installation.id, recursively=False) - if component_id in graph_result.consumer_results - ] - - electrical_components = [] - fuel_components = [] - - for component in power_components: - if graph_result.graph.get_node_info(component.id).component_type == ComponentType.GENERATOR_SET: - electrical_components.append(component) - else: - fuel_components.append(component) - - installation_node_info = graph_result.graph.get_node_info(installation.id) - - power_electrical = _compute_aggregated_power( - graph_result=graph_result, - power_components=electrical_components, - regularity=regularity, - ) - - power_mechanical = _compute_aggregated_power( - graph_result=graph_result, - power_components=fuel_components, - regularity=regularity, - ) + sub_components = installation.get_sub_component_results() + power_electrical = installation.get_aggregated_electrical_power_results() + power_mechanical = installation.get_aggregated_mechanical_power_results() power = power_electrical + power_mechanical - energy_usage = ( - reduce( - operator.add, - [ - TimeSeriesRate.from_timeseries_stream_day_rate(component.energy_usage, regularity=regularity) - for component in sub_components - if component.energy_usage.unit == Unit.STANDARD_CUBIC_METER_PER_DAY - ], - ) - if sub_components - else 0 - ) + energy_usage = installation.get_energy_usage() - emission_dto_results = _convert_to_timeseries( - graph_result, - graph_result.emission_results, - regularity, - ) - aggregated_emissions = aggregate_emissions( - [ - emission_dto_results[fuel_consumer_id] - for fuel_consumer_id in graph_result.graph.get_successors(installation.id) - ] - ) + aggregated_emissions = installation.get_aggregated_emissions() installation_results.append( libecalc.presentation.json_result.result.InstallationResult( id=installation.id, - name=installation_node_info.name, + name=installation.name, parent=asset.id, - component_level=installation_node_info.component_level, - componentType=installation_node_info.component_type.value, + component_level=installation.component_level, + componentType=installation.component_type.value, periods=expression_evaluator.get_periods(), - is_valid=TimeSeriesBoolean( - periods=expression_evaluator.get_periods(), - values=aggregate_is_valid(sub_components), - unit=Unit.NONE, - ), + is_valid=installation.get_is_valid(), power=power, power_cumulative=power.to_volumes().to_unit(Unit.GIGA_WATT_HOURS).cumulative(), power_electrical=power_electrical, @@ -351,7 +312,11 @@ def _evaluate_installations( else energy_usage.to_stream_day(), energy_usage_cumulative=energy_usage.to_volumes().cumulative(), hydrocarbon_export_rate=hydrocarbon_export_rate, - emissions=_to_full_result(aggregated_emissions), + emissions=aggregated_emissions, + emission_intensities=_compute_intensity( + hydrocarbon_export_rate=hydrocarbon_export_rate, + emissions=aggregated_emissions, + ), regularity=TimeSeriesFloat( periods=expression_evaluator.get_periods(), values=regularity.values, @@ -1425,6 +1390,12 @@ def get_asset_result(graph_result: GraphResult) -> libecalc.presentation.json_re energy_usage_cumulative=asset_energy_usage_cumulative, hydrocarbon_export_rate=asset_hydrocarbon_export_rate_core, emissions=_to_full_result(asset_aggregated_emissions), + emission_intensities=_compute_intensity( + hydrocarbon_export_rate=asset_hydrocarbon_export_rate_core, + emissions=asset_aggregated_emissions, + ) + if installation_results + else [], ) return Numbers.format_results_to_precision( diff --git a/src/libecalc/presentation/json_result/result/emission.py b/src/libecalc/presentation/json_result/result/emission.py index aaf24ae108..d53879f515 100644 --- a/src/libecalc/presentation/json_result/result/emission.py +++ b/src/libecalc/presentation/json_result/result/emission.py @@ -6,6 +6,7 @@ from libecalc.common.units import Unit from libecalc.common.utils.rates import ( TimeSeriesFloat, + TimeSeriesIntensity, TimeSeriesRate, TimeSeriesVolumesCumulative, ) @@ -62,3 +63,11 @@ def from_emission_core_result(cls, emission_result: EmissionCoreResult, regulari periods=emission_result.periods, rate=TimeSeriesRate.from_timeseries_stream_day_rate(emission_result.rate, regularity), ) + + +class EmissionIntensityResult(TabularTimeSeries): + name: str + intensity_sm3: TimeSeriesIntensity + intensity_boe: TimeSeriesIntensity + intensity_yearly_sm3: TimeSeriesIntensity + intensity_yearly_boe: TimeSeriesIntensity diff --git a/src/libecalc/presentation/json_result/result/results.py b/src/libecalc/presentation/json_result/result/results.py index c57564ecc5..2083d670c4 100644 --- a/src/libecalc/presentation/json_result/result/results.py +++ b/src/libecalc/presentation/json_result/result/results.py @@ -26,6 +26,7 @@ ) from libecalc.presentation.json_result.result.base import EcalcResultBaseModel from libecalc.presentation.json_result.result.emission import ( + EmissionIntensityResult, EmissionResult, ) from libecalc.presentation.json_result.result.tabular_time_series import ( @@ -72,6 +73,7 @@ class AssetResult(ComponentResultBase): power_electrical_cumulative: Optional[TimeSeriesVolumesCumulative] = None power_mechanical: Optional[TimeSeriesRate] = None power_mechanical_cumulative: Optional[TimeSeriesVolumesCumulative] = None + emission_intensities: Optional[list[EmissionIntensityResult]] = None class InstallationResult(AssetResult): diff --git a/src/libecalc/presentation/yaml/mappers/component_mapper.py b/src/libecalc/presentation/yaml/mappers/component_mapper.py index 8a4578e0dc..7ebb2deb9b 100644 --- a/src/libecalc/presentation/yaml/mappers/component_mapper.py +++ b/src/libecalc/presentation/yaml/mappers/component_mapper.py @@ -8,6 +8,7 @@ from libecalc.common.energy_model_type import EnergyModelType from libecalc.common.logger import logger from libecalc.common.time_utils import Period, define_time_model_for_period +from libecalc.common.variables import ExpressionEvaluator from libecalc.domain.infrastructure import ( Asset, Installation, @@ -101,6 +102,7 @@ def from_yaml_to_dto( data: Union[YamlFuelConsumer, YamlElectricityConsumer, YamlConsumerSystem], regularity: dict[Period, Expression], consumes: ConsumptionType, + expression_evaluator: ExpressionEvaluator, default_fuel: Optional[str] = None, ) -> Consumer: component_type = data.component_type @@ -143,6 +145,7 @@ def from_yaml_to_dto( if consumes == ConsumptionType.FUEL: try: fuel_consumer_name = data.name + return FuelConsumer( name=fuel_consumer_name, user_defined_category=define_time_model_for_period( @@ -152,6 +155,7 @@ def from_yaml_to_dto( fuel=fuel, energy_usage_model=energy_usage_model, component_type=_get_component_type(energy_usage_model), + expression_evaluator=expression_evaluator, consumes=consumes, ) except ValidationError as e: @@ -166,6 +170,7 @@ def from_yaml_to_dto( data.category, target_period=self._target_period ), energy_usage_model=energy_usage_model, + expression_evaluator=expression_evaluator, component_type=_get_component_type(energy_usage_model), consumes=consumes, ) @@ -183,6 +188,7 @@ def from_yaml_to_dto( self, data: YamlGeneratorSet, regularity: dict[Period, Expression], + expression_evaluator: ExpressionEvaluator, default_fuel: Optional[str] = None, ) -> GeneratorSet: try: @@ -208,6 +214,7 @@ def from_yaml_to_dto( self.__consumer_mapper.from_yaml_to_dto( consumer, regularity=regularity, + expression_evaluator=expression_evaluator, consumes=ConsumptionType.ELECTRICITY, ) for consumer in data.consumers or [] @@ -223,6 +230,7 @@ def from_yaml_to_dto( name=generator_set_name, fuel=fuel, regularity=regularity, + expression_evaluator=expression_evaluator, generator_set_model=generator_set_model, consumers=consumers, user_defined_category=user_defined_category, @@ -241,7 +249,7 @@ def __init__(self, references: ReferenceService, target_period: Period): self.__generator_set_mapper = GeneratorSetMapper(references=references, target_period=target_period) self.__consumer_mapper = ConsumerMapper(references=references, target_period=target_period) - def from_yaml_to_dto(self, data: YamlInstallation) -> Installation: + def from_yaml_to_dto(self, data: YamlInstallation, expression_evaluator: ExpressionEvaluator) -> Installation: fuel_data = data.fuel regularity = define_time_model_for_period( convert_expression(data.regularity or 1), target_period=self._target_period @@ -254,6 +262,7 @@ def from_yaml_to_dto(self, data: YamlInstallation) -> Installation: generator_set, regularity=regularity, default_fuel=fuel_data, + expression_evaluator=expression_evaluator, ) for generator_set in data.generator_sets or [] ] @@ -262,6 +271,7 @@ def from_yaml_to_dto(self, data: YamlInstallation) -> Installation: fuel_consumer, regularity=regularity, consumes=ConsumptionType.FUEL, + expression_evaluator=expression_evaluator, default_fuel=fuel_data, ) for fuel_consumer in data.fuel_consumers or [] @@ -271,6 +281,10 @@ def from_yaml_to_dto(self, data: YamlInstallation) -> Installation: data.hydrocarbon_export or Expression.setup_from_expression(0), target_period=self._target_period, ) + venting_emitters = [ + venting_emitter.set_expression_evaluator(expression_evaluator) or venting_emitter + for venting_emitter in data.venting_emitters or [] + ] try: return Installation( @@ -278,7 +292,8 @@ def from_yaml_to_dto(self, data: YamlInstallation) -> Installation: regularity=regularity, hydrocarbon_export=hydrocarbon_export, fuel_consumers=[*generator_sets, *fuel_consumers], - venting_emitters=data.venting_emitters or [], + expression_evaluator=expression_evaluator, + venting_emitters=venting_emitters, user_defined_category=data.category, ) except ValidationError as e: @@ -290,16 +305,18 @@ def __init__( self, references: ReferenceService, target_period: Period, + expression_evaluator: ExpressionEvaluator, ): self.__references = references self.__installation_mapper = InstallationMapper(references=references, target_period=target_period) + self.__expression_evaluator = expression_evaluator def from_yaml_to_dto(self, configuration: YamlValidator) -> Asset: try: ecalc_model = Asset( name=configuration.name, installations=[ - self.__installation_mapper.from_yaml_to_dto(installation) + self.__installation_mapper.from_yaml_to_dto(installation, self.__expression_evaluator) for installation in configuration.installations ], ) diff --git a/src/libecalc/presentation/yaml/model.py b/src/libecalc/presentation/yaml/model.py index fd32cf1ab2..24276022b8 100644 --- a/src/libecalc/presentation/yaml/model.py +++ b/src/libecalc/presentation/yaml/model.py @@ -110,6 +110,7 @@ def dto(self): model_mapper = EcalcModelMapper( references=self._get_reference_service(), target_period=self.period, + expression_evaluator=self.variables, ) return model_mapper.from_yaml_to_dto(configuration=self._configuration) diff --git a/src/libecalc/presentation/yaml/yaml_types/emitters/yaml_venting_emitter.py b/src/libecalc/presentation/yaml/yaml_types/emitters/yaml_venting_emitter.py index adba2336a1..29c30af94b 100644 --- a/src/libecalc/presentation/yaml/yaml_types/emitters/yaml_venting_emitter.py +++ b/src/libecalc/presentation/yaml/yaml_types/emitters/yaml_venting_emitter.py @@ -85,7 +85,9 @@ def check_name(cls, name, info: ValidationInfo): class YamlDirectTypeEmitter(YamlBase, Emitter, EnergyComponent): - model_config = ConfigDict(title="VentingEmitter") + model_config = ConfigDict( + title="VentingEmitter", + ) @property def component_type(self): @@ -121,6 +123,16 @@ def id(self) -> str: description="The emissions for the emitter of type DIRECT_EMISSION", ) + expression_evaluator: Optional[ExpressionEvaluator] = None + + emission_results: Optional[Optional[dict[str, EmissionResult]]] = None + + def set_expression_evaluator(self, expression_evaluator: ExpressionEvaluator): + self.expression_evaluator = expression_evaluator + + def set_emission_results(self, emission_results: Optional[dict[str, EmissionResult]]): + self.emission_results = emission_results + def is_fuel_consumer(self) -> bool: return False @@ -143,33 +155,33 @@ def evaluate_emissions( self, energy_context: ComponentEnergyContext, energy_model: EnergyModel, - expression_evaluator: ExpressionEvaluator, ) -> Optional[dict[str, EmissionResult]]: venting_emitter_results = {} emission_rates = self.get_emissions( - expression_evaluator=expression_evaluator, regularity=energy_model.get_regularity(self.id), ) for emission_name, emission_rate in emission_rates.items(): emission_result = EmissionResult( name=emission_name, - periods=expression_evaluator.get_periods(), + periods=self.expression_evaluator.get_periods(), rate=emission_rate, ) venting_emitter_results[emission_name] = emission_result + + self.set_emission_results(venting_emitter_results) return venting_emitter_results - def get_emissions( - self, expression_evaluator: ExpressionEvaluator, regularity: dict[datetime, Expression] - ) -> dict[str, TimeSeriesStreamDayRate]: - regularity_evaluated = expression_evaluator.evaluate( + def get_emissions(self, regularity: dict[datetime, Expression]) -> dict[str, TimeSeriesStreamDayRate]: + regularity_evaluated = self.expression_evaluator.evaluate( expression=TemporalModel(regularity), ) emissions = {} for emission in self.emissions: - emission_rate = expression_evaluator.evaluate(Expression.setup_from_expression(value=emission.rate.value)) + emission_rate = self.expression_evaluator.evaluate( + Expression.setup_from_expression(value=emission.rate.value) + ) if emission.rate.type == RateType.CALENDAR_DAY: emission_rate = Rates.to_stream_day( calendar_day_rates=np.asarray(emission_rate), regularity=regularity_evaluated @@ -178,7 +190,7 @@ def get_emissions( emission_rate = unit.to(Unit.TONS_PER_DAY)(emission_rate) emissions[emission.name] = TimeSeriesStreamDayRate( - periods=expression_evaluator.get_periods(), + periods=self.expression_evaluator.get_periods(), values=emission_rate, unit=Unit.TONS_PER_DAY, ) @@ -206,7 +218,9 @@ def check_user_defined_category(cls, category, info: ValidationInfo): class YamlOilTypeEmitter(YamlBase, Emitter, EnergyComponent): - model_config = ConfigDict(title="VentingEmitter") + model_config = ConfigDict( + title="VentingEmitter", + ) @property def component_type(self): @@ -242,6 +256,16 @@ def id(self) -> str: description="The volume rate and emissions for the emitter of type OIL_VOLUME", ) + expression_evaluator: Optional[ExpressionEvaluator] = None + + emission_results: Optional[Optional[dict[str, EmissionResult]]] = None + + def set_expression_evaluator(self, expression_evaluator: ExpressionEvaluator): + self.expression_evaluator = expression_evaluator + + def set_emission_results(self, emission_results: Optional[dict[str, EmissionResult]]): + self.emission_results = emission_results + def is_fuel_consumer(self) -> bool: return False @@ -264,31 +288,29 @@ def evaluate_emissions( self, energy_context: ComponentEnergyContext, energy_model: EnergyModel, - expression_evaluator: ExpressionEvaluator, ) -> Optional[dict[str, EmissionResult]]: venting_emitter_results = {} emission_rates = self.get_emissions( - expression_evaluator=expression_evaluator, regularity=energy_model.get_regularity(self.id), ) for emission_name, emission_rate in emission_rates.items(): emission_result = EmissionResult( name=emission_name, - periods=expression_evaluator.get_periods(), + periods=self.expression_evaluator.get_periods(), rate=emission_rate, ) venting_emitter_results[emission_name] = emission_result + self.set_emission_results(venting_emitter_results) return venting_emitter_results def get_emissions( self, - expression_evaluator: ExpressionEvaluator, regularity: dict[datetime, Expression], ) -> dict[str, TimeSeriesStreamDayRate]: - regularity_evaluated = expression_evaluator.evaluate(expression=TemporalModel(regularity)) + regularity_evaluated = self.expression_evaluator.evaluate(expression=TemporalModel(regularity)) - oil_rates = expression_evaluator.evaluate( + oil_rates = self.expression_evaluator.evaluate( expression=Expression.setup_from_expression(value=self.volume.rate.value) ) @@ -300,7 +322,9 @@ def get_emissions( emissions = {} for emission in self.volume.emissions: - factors = expression_evaluator.evaluate(Expression.setup_from_expression(value=emission.emission_factor)) + factors = self.expression_evaluator.evaluate( + Expression.setup_from_expression(value=emission.emission_factor) + ) unit = self.volume.rate.unit.to_unit() oil_rates = unit.to(Unit.STANDARD_CUBIC_METER_PER_DAY)(oil_rates) @@ -310,7 +334,7 @@ def get_emissions( emission_rate = Unit.KILO_PER_DAY.to(Unit.TONS_PER_DAY)(emission_rate) emissions[emission.name] = TimeSeriesStreamDayRate( - periods=expression_evaluator.get_periods(), + periods=self.expression_evaluator.get_periods(), values=emission_rate, unit=Unit.TONS_PER_DAY, ) @@ -318,10 +342,9 @@ def get_emissions( def get_oil_rates( self, - expression_evaluator: ExpressionEvaluator, regularity: TimeSeriesFloat, ) -> TimeSeriesStreamDayRate: - oil_rates = expression_evaluator.evaluate(expression=convert_expression(self.volume.rate.value)) + oil_rates = self.expression_evaluator.evaluate(expression=convert_expression(self.volume.rate.value)) if self.volume.rate.type == RateType.CALENDAR_DAY: oil_rates = Rates.to_stream_day( @@ -333,7 +356,7 @@ def get_oil_rates( oil_rates = unit.to(Unit.STANDARD_CUBIC_METER_PER_DAY)(oil_rates) return TimeSeriesStreamDayRate( - periods=expression_evaluator.get_periods(), + periods=self.expression_evaluator.get_periods(), values=oil_rates, unit=Unit.STANDARD_CUBIC_METER_PER_DAY, ) diff --git a/tests/libecalc/common/utils/test_compute_emission_intensity_yearly.py b/tests/libecalc/common/utils/test_compute_emission_intensity_yearly.py index e69de29bb2..9ae681f30d 100644 --- a/tests/libecalc/common/utils/test_compute_emission_intensity_yearly.py +++ b/tests/libecalc/common/utils/test_compute_emission_intensity_yearly.py @@ -0,0 +1,254 @@ +import math +from datetime import datetime +from typing import List, Tuple + +import numpy as np + +from libecalc.common.time_utils import Periods, Period, Frequency +from libecalc.common.units import Unit +from libecalc.common.utils.emission_intensity import EmissionIntensity +from libecalc.common.utils.rates import ( + Rates, + RateType, + TimeSeriesRate, + TimeSeriesVolumes, + TimeSeriesVolumesCumulative, +) + + +def _setup_intensity_testcase( + periods: Periods, +) -> Tuple[TimeSeriesRate, TimeSeriesVolumes, TimeSeriesRate, TimeSeriesVolumes]: + number_of_periods = len(periods) + emission_rate = np.full(shape=number_of_periods, fill_value=1.0) + hcexport_rate = np.asarray(list(range(number_of_periods, 0, -1))) + + emission_cumulative = Rates.compute_cumulative_volumes_from_daily_rates(rates=emission_rate, periods=periods) + hcexport_cumulative = Rates.compute_cumulative_volumes_from_daily_rates(rates=hcexport_rate, periods=periods) + + return ( + TimeSeriesRate( + values=list(emission_rate), + periods=periods, + unit=Unit.KILO_PER_DAY, + rate_type=RateType.STREAM_DAY, + regularity=[1.0] * number_of_periods, + ), + TimeSeriesVolumesCumulative(values=list(emission_cumulative), periods=periods, unit=Unit.KILO), + TimeSeriesRate( + values=list(hcexport_rate), + periods=periods, + unit=Unit.STANDARD_CUBIC_METER_PER_DAY, + rate_type=RateType.STREAM_DAY, + regularity=[1.0] * number_of_periods, + ), + TimeSeriesVolumesCumulative(values=list(hcexport_cumulative), periods=periods, unit=Unit.STANDARD_CUBIC_METER), + ) + + +class TestEmissionIntensityByPeriods: + def test_emission_intensity_single_year(self): + periods = Periods.create_periods( + [datetime(2000, 7, 1), datetime(2001, 7, 1)], + include_before=False, + include_after=False, + ) + emission_intensity = EmissionIntensity( + emission_cumulative=TimeSeriesVolumes(values=[0.0], periods=periods, unit=Unit.KILO).cumulative(), + hydrocarbon_export_cumulative=TimeSeriesVolumes( + values=[0.0], periods=periods, unit=Unit.STANDARD_CUBIC_METER + ).cumulative(), + ).calculate() + assert len(emission_intensity) == 1 + assert math.isnan(emission_intensity.values[0]) + + def test_emission_intensity(self): + # Test where time vector starts at 1.1 and all year starts are in time vector + periods = Periods( + [ + Period(start=datetime(year=2000, month=1, day=1), end=datetime(2000, 7, 2)), + Period(start=datetime(year=2000, month=7, day=2), end=datetime(2001, 1, 1)), + Period(start=datetime(year=2001, month=1, day=1), end=datetime(2001, 7, 2)), + ] + ) + + ( + emission_rate, + emission_cumulative, + hcexport_rate, + hcexport_cumulative, + ) = _setup_intensity_testcase(periods=periods) + + # Yearly emission intensities + emission_period_1 = emission_cumulative.values[0] + emission_period_2 = emission_cumulative.values[1] - emission_cumulative.values[0] + emission_period_3 = emission_cumulative.values[2] - emission_cumulative.values[1] + hcexport_period_1 = hcexport_cumulative.values[0] + hcexport_period_2 = hcexport_cumulative.values[1] - hcexport_cumulative.values[0] + hcexport_period_3 = hcexport_cumulative.values[2] - hcexport_cumulative.values[1] + intensity_period_1 = emission_period_1 / hcexport_period_1 + intensity_period_2 = emission_period_2 / hcexport_period_2 + intensity_period_3 = emission_period_3 / hcexport_period_3 + + emission_intensity_yearly = EmissionIntensity( + emission_cumulative=emission_cumulative, + hydrocarbon_export_cumulative=hcexport_cumulative, + ).calculate_periods() + # TODO: why did this fail? + assert np.all(emission_intensity_yearly.values[0] == intensity_period_1) + assert np.all(emission_intensity_yearly.values[1] == intensity_period_2) + assert np.all(emission_intensity_yearly.values[2] == intensity_period_3) + + def test_emission_intensity_without_start_of_the_year_in_time_vector(self): + # Test where the dates in time_vector are not at year start + periods = Periods( + [ + Period( + start=datetime(year=2000, month=7, day=2, hour=12), end=datetime(year=2001, month=7, day=2, hour=12) + ), + Period( + start=datetime(year=2001, month=7, day=2, hour=12), end=datetime(year=2002, month=7, day=2, hour=12) + ), + ] + ) + ( + emission_rate, + emission_cumulative, + hcexport_rate, + hcexport_cumulative, + ) = _setup_intensity_testcase(periods=periods) + + emission_period_1 = 365 * emission_rate.values[0] + emission_period_2 = 365 * emission_rate.values[1] + hcexport_period_1 = 365 * hcexport_rate.values[0] + hcexport_period_2 = 365 * hcexport_rate.values[1] + intensity_period_1 = emission_period_1 / hcexport_period_1 + intensity_period_2 = emission_period_2 / hcexport_period_2 + + emission_intensity_yearly = EmissionIntensity( + emission_cumulative=emission_cumulative, + hydrocarbon_export_cumulative=hcexport_cumulative, + ).calculate_periods() + + assert emission_intensity_yearly.values[0] == intensity_period_1 + assert emission_intensity_yearly.values[1] == intensity_period_2 + + def test_emission_intensity_year_not_present_in_time_vector(self): + # Test where some years are not present in time_vector + periods = Periods( + [ + Period( + start=datetime(year=2000, month=7, day=2, hour=12), end=datetime(year=2002, month=7, day=2, hour=12) + ), + Period( + start=datetime(year=2002, month=7, day=2, hour=12), end=datetime(year=2003, month=7, day=2, hour=12) + ), + ] + ) + ( + emission_rate, + emission_cumulative, + hcexport_rate, + hcexport_cumulative, + ) = _setup_intensity_testcase(periods=periods) + emission_period_1 = 365 * 2 * emission_rate.values[0] + emission_period_2 = 365 * emission_rate.values[1] + hcexport_period_1 = 365 * 2 * hcexport_rate.values[0] + hcexport_period_2 = 365 * hcexport_rate.values[1] + intensity_period_1 = emission_period_1 / hcexport_period_1 + intensity_period_2 = emission_period_2 / hcexport_period_2 + emission_intensity_yearly = EmissionIntensity( + emission_cumulative=emission_cumulative, + hydrocarbon_export_cumulative=hcexport_cumulative, + ).calculate_periods() + + assert emission_intensity_yearly.values[0] == intensity_period_1 + assert emission_intensity_yearly.values[1] == intensity_period_2 + + def test_emission_resample_by_year(self): + periods = Periods( + [ + Period(start=datetime(year=2000, month=7, day=1), end=datetime(2001, 7, 1)), + Period(start=datetime(year=2001, month=7, day=1), end=datetime(2002, 7, 1)), + ] + ) + + ( + emission_rate, + emission_cumulative, + hcexport_rate, + hcexport_cumulative, + ) = _setup_intensity_testcase(periods=periods) + + emission_intensity_yearly = EmissionIntensity( + emission_cumulative=emission_cumulative, hydrocarbon_export_cumulative=hcexport_cumulative + ).calculate_periods() + + resampled_intensity = emission_intensity_yearly.resample(Frequency.YEAR) + + # Calculate expected intensities + expected_emission_2000 = emission_rate.values[0] * 365 / 2 + expected_emission_2001 = emission_rate.values[0] * 365 / 2 + emission_rate.values[1] * 365 / 2 + expected_hcexport_2000 = hcexport_rate.values[0] * 365 / 2 + expected_hcexport_2001 = hcexport_rate.values[0] * 365 / 2 + hcexport_rate.values[1] * 365 / 2 + + expected_intensity_2000 = expected_emission_2000 / expected_hcexport_2000 + expected_intensity_2001 = expected_emission_2001 / expected_hcexport_2001 + + assert resampled_intensity.values[0] == expected_intensity_2000 + assert resampled_intensity.values[1] == expected_intensity_2001 + + +class TestEmissionIntensity: + def test_emission_intensity_single_year(self): + periods = Periods.create_periods( + [datetime(2000, 7, 1), datetime(2000, 12, 31, 23, 59, 59)], + include_before=False, + include_after=False, + ) + + emission_cumulative_single_year = TimeSeriesVolumesCumulative( + periods=periods, + values=[0.0], + unit=Unit.KILO, + ) + hydrocarbon_export_cumulative_single_year = TimeSeriesVolumesCumulative( + periods=periods, + values=[0.0], + unit=Unit.STANDARD_CUBIC_METER, + ) + + emission_intensity = EmissionIntensity( + emission_cumulative=emission_cumulative_single_year, + hydrocarbon_export_cumulative=hydrocarbon_export_cumulative_single_year, + ).calculate() + + assert len(emission_intensity) == 1 + assert math.isnan(emission_intensity.values[0]) + + def test_emission_intensity(self): + periods = Periods( + [ + Period(start=datetime(year=2000, month=1, day=1), end=datetime(year=2000, month=7, day=2)), + Period(start=datetime(year=2000, month=7, day=2), end=datetime(year=2001, month=1, day=1)), + Period(start=datetime(year=2001, month=1, day=1), end=datetime(year=2001, month=7, day=2)), + ] + ) + + ( + emission_rate, + emission_cumulative, + hcexport_rate, + hcexport_cumulative, + ) = _setup_intensity_testcase(periods=periods) + + # Yearly emission intensities + emission = emission_cumulative.values[-1] + hcexport = hcexport_cumulative.values[-1] + intensity = emission / hcexport + + emission_intensity = EmissionIntensity( + emission_cumulative=emission_cumulative, hydrocarbon_export_cumulative=hcexport_cumulative + ).calculate() + + assert emission_intensity.values[-1] == intensity diff --git a/tests/libecalc/presentation/exporter/test_ltp.py b/tests/libecalc/presentation/exporter/test_ltp.py index 906c0ff5f1..fd9731abda 100644 --- a/tests/libecalc/presentation/exporter/test_ltp.py +++ b/tests/libecalc/presentation/exporter/test_ltp.py @@ -905,7 +905,7 @@ def test_electrical_and_mechanical_power_installation( ): """Check that new total power includes the sum of electrical- and mechanical power at installation level""" variables = ltp_test_helper.create_variables_map(ltp_test_helper.time_vector_installation) - + regularity = [ltp_test_helper.regularity_installation] * variables.number_of_periods fuel = fuel_gas_factory(["co2"], [ltp_test_helper.co2_factor]) generator_set = ltp_test_helper.generator_set(request) @@ -943,7 +943,7 @@ def test_electrical_and_mechanical_power_installation( asset = ltp_test_helper.get_yaml_model(request, asset=asset, resources=resources, frequency=Frequency.YEAR) - asset_result = ltp_test_helper.calculate_asset_result(model=asset, variables=variables) + asset_result = ltp_test_helper.calculate_asset_result(model=asset, variables=asset.variables) power_fuel_driven_compressor = asset_result.get_component_by_name("compressor").power_cumulative.values[-1] power_generator_set = asset_result.get_component_by_name("generator_set").power_cumulative.values[-1]