Skip to content

Commit

Permalink
chore: emission intensity and results
Browse files Browse the repository at this point in the history
  • Loading branch information
frodehk committed Jan 23, 2025
1 parent 0537f20 commit 11efa7d
Show file tree
Hide file tree
Showing 17 changed files with 864 additions and 139 deletions.
8 changes: 7 additions & 1 deletion src/libecalc/application/energy/emitter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import abc
from typing import Optional

from pydantic import Field

from libecalc.application.energy.component_energy_context import ComponentEnergyContext
from libecalc.application.energy.energy_model import EnergyModel
from libecalc.common.variables import ExpressionEvaluator
Expand All @@ -12,6 +14,8 @@ class Emitter(abc.ABC):
Something that emits something.
"""

expression_evaluator: Optional[ExpressionEvaluator] = Field(default=None, exclude=True)

@property
@abc.abstractmethod
def id(self) -> str: ...
Expand All @@ -21,5 +25,7 @@ def evaluate_emissions(
self,
energy_context: ComponentEnergyContext,
energy_model: EnergyModel,
expression_evaluator: ExpressionEvaluator,
) -> Optional[dict[str, EmissionResult]]: ...

def set_expression_evaluator(self, expression_evaluator: ExpressionEvaluator):
self.expression_evaluator = expression_evaluator
7 changes: 1 addition & 6 deletions src/libecalc/application/energy_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ def evaluate_energy_usage(self) -> dict[str, EcalcModelResult]:
for energy_component in energy_components:
if hasattr(energy_component, "evaluate_energy_usage"):
context = self._get_context(energy_component.id)
self._consumer_results.update(
energy_component.evaluate_energy_usage(
context=context, expression_evaluator=self._expression_evaluator
)
)
self._consumer_results.update(energy_component.evaluate_energy_usage(context=context))

self._consumer_results = Numbers.format_results_to_precision(self._consumer_results, precision=6)
return self._consumer_results
Expand All @@ -91,7 +87,6 @@ def evaluate_emissions(self) -> dict[str, dict[str, EmissionResult]]:
emission_result = energy_component.evaluate_emissions(
energy_context=self._get_context(energy_component.id),
energy_model=self._energy_model,
expression_evaluator=self._expression_evaluator,
)

if emission_result is not None:
Expand Down
138 changes: 138 additions & 0 deletions src/libecalc/common/utils/emission_intensity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd

from libecalc.common.time_utils import Period, Periods
from libecalc.common.units import Unit
from libecalc.common.utils.rates import (
TimeSeriesCalendarDayRate,
TimeSeriesIntensity,
TimeSeriesVolumesCumulative,
)


class EmissionIntensity:
def __init__(
self,
emission_cumulative: TimeSeriesVolumesCumulative,
hydrocarbon_export_cumulative: TimeSeriesVolumesCumulative,
unit: Unit = Unit.KG_SM3,
):
self.emission_cumulative = emission_cumulative
self.hydrocarbon_export_cumulative = hydrocarbon_export_cumulative
self.unit = unit
self.periods = emission_cumulative.periods
self.yearly_periods = self._create_yearly_periods()
self.time_vector = self.periods.all_dates
self.start_years = [period.start.year for period in self.periods]

def _create_yearly_periods(self) -> Periods:
yearly_periods = []
added_periods = set()

for period in self.periods:
for year in range(period.start.year, period.end.year + 1):
start_date = datetime(year, 1, 1)
end_date = datetime(year + 1, 1, 1)
period_tuple = (start_date, end_date)

if period_tuple not in added_periods:
yearly_periods.append(Period(start=start_date, end=end_date))
added_periods.add(period_tuple)

return Periods(yearly_periods)

def _calculate_yearly_old(self) -> list[float]:
"""Standard emission intensity at time k, is the sum of emissions from startup until time k
divided by the sum of export from startup until time k.
Thus, intensity_k = ( sum_t=1:k emission(t) ) / ( sum_t=1:k export(t) ).
The yearly emission intensity for year k is the sum of emissions in year k divided by
the sum of export in year k (and thus independent of the years before year k)
I.e. intensity_yearly_k = emission_year_k / export_year_k
emission_year_k may be computed as emission_cumulative(1. january year=k+1) - emission_cumulative(1. january year=k)
hcexport_year_k may be computed as hcexport_cumulative(1. january year=k+1) - hcexport_cumulative(1. january year=k)
To be able to evaluate cumulative emission and hydrocarbon_export at 1. january each year, a linear interpolation function
is created between the time vector and the cumulative function. To be able to treat time as the x-variable, this is
first converted to number of seconds from the beginning of the time vector
"""

df = pd.DataFrame(
data={
"emission_cumulative": self.emission_cumulative.values,
"hydrocarbon_cumulative": self.hydrocarbon_export_cumulative.values,
},
index=self.emission_cumulative.end_dates, # Assuming dates are aligned with cumulative values
)

# Reindex the DataFrame to match the time_vector and fill missing values
df = df.reindex(self.time_vector).ffill().fillna(0)

# df = pd.DataFrame(
# index=self.time_vector,
# data=list(zip(self.emission_cumulative.values, self.hydrocarbon_export_cumulative.values)),
# columns=["emission_cumulative", "hydrocarbon_cumulative"],
# )

# Extending the time vector back and forth 1 year used as padding when calculating yearly buckets.
time_vector_interpolated = pd.date_range(
start=datetime(self.time_vector[0].year - 1, 1, 1),
end=datetime(self.time_vector[-1].year + 1, 1, 1),
freq="YS",
)

# Linearly interpolating by time using the built-in functionality in Pandas.
cumulative_interpolated: Optional[pd.DataFrame] = df.reindex(
sorted(set(self.periods.all_dates).union(time_vector_interpolated))
).interpolate("time")

if cumulative_interpolated is None:
raise ValueError("Time interpolation of cumulative yearly emission intensity failed")

cumulative_yearly = cumulative_interpolated.bfill().loc[time_vector_interpolated]

# Remove the extrapolated timesteps
emissions_per_year = np.diff(cumulative_yearly.emission_cumulative[1:])
hcexport_per_year = np.diff(cumulative_yearly.hydrocarbon_cumulative[1:])

yearly_emission_intensity = np.divide(
emissions_per_year,
hcexport_per_year,
out=np.full_like(emissions_per_year, fill_value=np.nan),
where=hcexport_per_year != 0,
)

return yearly_emission_intensity.tolist()

def calculate_old(self) -> TimeSeriesCalendarDayRate:
"""Legacy code that computes yearly intensity and casts the results back to the original time-vector."""
yearly_buckets = range(self.time_vector[0].year, self.time_vector[-1].year + 1)
yearly_intensity = self._calculate_yearly()
return TimeSeriesCalendarDayRate(
periods=self.periods,
values=[yearly_intensity[yearly_buckets.index(period.start.year)] for period in self.periods],
unit=Unit.KG_SM3,
)

def calculate_periods(self):
emission_volumes = self.emission_cumulative.to_volumes()
hydrocarbon_export_volumes = self.hydrocarbon_export_cumulative.to_volumes()

intensity = emission_volumes / hydrocarbon_export_volumes

return TimeSeriesIntensity(
periods=self.periods,
values=intensity.values,
unit=self.unit,
)

def calculate(self) -> TimeSeriesIntensity:
"""Write description here"""
intensity = self.emission_cumulative / self.hydrocarbon_export_cumulative

return TimeSeriesIntensity(
periods=self.periods,
values=intensity.values,
unit=self.unit,
)
75 changes: 75 additions & 0 deletions src/libecalc/common/utils/rates.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,27 @@ def to_rate(self, regularity: Optional[list[float]] = None) -> TimeSeriesRate:
rate_type=RateType.CALENDAR_DAY,
)

def __truediv__(self, other: object) -> TimeSeriesCalendarDayRate:
if not isinstance(other, TimeSeriesVolumes):
raise TypeError(f"Dividing TimeSeriesVolumes by '{str(other.__class__)}' is not supported.")

if self.unit == Unit.KILO and other.unit == Unit.STANDARD_CUBIC_METER:
unit = Unit.KG_SM3
else:
raise ProgrammingError(
f"Unable to divide unit '{self.unit}' by unit '{other.unit}'. Please add unit conversion."
)
return TimeSeriesCalendarDayRate(
periods=self.periods,
values=np.divide(
self.values,
other.values,
out=np.full_like(self.values, fill_value=np.nan),
where=np.asarray(other.values) != 0.0,
).tolist(),
unit=unit,
)


class TimeSeriesStreamDayRate(TimeSeriesFloat):
"""
Expand Down Expand Up @@ -1061,3 +1082,57 @@ def to_stream_day_timeseries(self) -> TimeSeriesStreamDayRate:
return TimeSeriesStreamDayRate(
periods=stream_day_rate.periods, values=stream_day_rate.values, unit=stream_day_rate.unit
)


class TimeSeriesIntensity(TimeSeries[float]):
@field_validator("values", mode="before")
@classmethod
def convert_none_to_nan(cls, v: Any) -> list[TimeSeriesValue]:
if isinstance(v, list):
# convert None to nan
return [i if i is not None else math.nan for i in v]
return v

def resample(
self, freq: Frequency, include_start_date: bool = True, include_end_date: bool = True
) -> TimeSeriesIntensity:
"""
Resample emission intensity according to given frequency.
Slinear is used in order to only interpolate, not extrapolate.
Args:
freq: The frequency the time series should be resampled to
Returns:
TimeSeriesIntensity resampled to the given frequency
"""
if freq is Frequency.NONE:
return self.model_copy()

ds = pd.Series(index=self.all_dates, data=[0] + self.values)

new_periods = resample_periods(
self.periods, frequency=freq, include_start_date=include_start_date, include_end_date=include_end_date
)

new_dates = new_periods.all_dates
if ds.index[-1] not in new_dates:
logger.warning(
f"The final date in the rate input ({ds.index[-1].strftime('%m/%d/%Y')}) does not "
f"correspond to the end of a period with the requested output frequency. There is a "
f"possibility that the resampling will drop intensities."
)
ds_interpolated = ds.reindex(ds.index.union(new_dates)).interpolate("time")

# New resampled pd.Series
resampled: list[float] = ds_interpolated.reindex(new_dates).values.tolist()

if not include_start_date:
dropped_intensity = resampled[0]
resampled = [value - dropped_intensity for value in resampled[1:]]
else:
resampled = resampled[1:]

return TimeSeriesIntensity(
periods=new_periods,
values=resampled,
unit=self.unit,
)
5 changes: 5 additions & 0 deletions src/libecalc/common/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import numpy as np
from numpy.typing import NDArray
from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic_core import CoreSchema, core_schema

from libecalc.common.errors.exceptions import ProgrammingError
from libecalc.common.temporal_model import TemporalModel
Expand Down Expand Up @@ -176,3 +177,7 @@ def get_subset_for_period(self, period: Period) -> ExpressionEvaluator: ...
def evaluate(
self, expression: Union[Expression, TemporalModel, dict[Period, Expression]]
) -> NDArray[np.float64]: ...

@classmethod
def __get_pydantic_core_schema__(cls, source, handler) -> CoreSchema:
return core_schema.any_schema()
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
ComponentType.COMPRESSOR_SYSTEM,
],
energy_usage_model: dict[Period, ElectricEnergyUsageModel],
expression_evaluator: ExpressionEvaluator,
consumes: Literal[ConsumptionType.ELECTRICITY] = ConsumptionType.ELECTRICITY,
):
self.name = name
Expand All @@ -47,6 +48,7 @@ def __init__(
self.energy_usage_model = self.check_energy_usage_model(energy_usage_model)
self._validate_el_consumer_temporal_model(self.energy_usage_model)
self._check_model_energy_usage(self.energy_usage_model)
self.expression_evaluator = expression_evaluator
self.consumes = consumes
self.component_type = component_type

Expand Down Expand Up @@ -78,9 +80,10 @@ def get_component_process_type(self) -> ComponentType:
def get_name(self) -> str:
return self.name

def evaluate_energy_usage(
self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext
) -> dict[str, EcalcModelResult]:
def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]):
self.consumer_results = consumer_results

def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]:
consumer_results: dict[str, EcalcModelResult] = {}
consumer = ConsumerEnergyComponent(
id=self.id,
Expand All @@ -95,7 +98,8 @@ def evaluate_energy_usage(
}
),
)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=expression_evaluator)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=self.expression_evaluator)
self.set_consumer_results(consumer_results)

return consumer_results

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ def __init__(
],
fuel: dict[Period, FuelType],
energy_usage_model: dict[Period, FuelEnergyUsageModel],
expression_evaluator: ExpressionEvaluator,
consumes: Literal[ConsumptionType.FUEL] = ConsumptionType.FUEL,
):
self.name = name
self.regularity = self.check_regularity(regularity)
validate_temporal_model(self.regularity)
self.user_defined_category = user_defined_category
self.energy_usage_model = self.check_energy_usage_model(energy_usage_model)
self.expression_evaluator = expression_evaluator
self.fuel = self.validate_fuel_exist(name=self.name, fuel=fuel, consumes=consumes)
self._validate_fuel_consumer_temporal_models(self.energy_usage_model, self.fuel)
self._check_model_energy_usage(self.energy_usage_model)
Expand Down Expand Up @@ -88,9 +90,13 @@ def get_component_process_type(self) -> ComponentType:
def get_name(self) -> str:
return self.name

def evaluate_energy_usage(
self, expression_evaluator: ExpressionEvaluator, context: ComponentEnergyContext
) -> dict[str, EcalcModelResult]:
def set_emission_results(self, emission_results: dict[str, EmissionResult]):
self.emission_results = emission_results

def set_consumer_results(self, consumer_results: dict[str, EcalcModelResult]):
self.consumer_results = consumer_results

def evaluate_energy_usage(self, context: ComponentEnergyContext) -> dict[str, EcalcModelResult]:
consumer_results: dict[str, EcalcModelResult] = {}
consumer = ConsumerEnergyComponent(
id=self.id,
Expand All @@ -105,25 +111,27 @@ def evaluate_energy_usage(
}
),
)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=expression_evaluator)
consumer_results[self.id] = consumer.evaluate(expression_evaluator=self.expression_evaluator)
self.set_consumer_results(consumer_results)

return consumer_results

def evaluate_emissions(
self,
energy_context: ComponentEnergyContext,
energy_model: EnergyModel,
expression_evaluator: ExpressionEvaluator,
) -> Optional[dict[str, EmissionResult]]:
fuel_model = FuelModel(self.fuel)
fuel_usage = energy_context.get_fuel_usage()

assert fuel_usage is not None

return fuel_model.evaluate_emissions(
expression_evaluator=expression_evaluator,
emissions = fuel_model.evaluate_emissions(
expression_evaluator=self.expression_evaluator,
fuel_rate=fuel_usage.values,
)
self.set_emission_results(emissions)
return emissions

@staticmethod
def check_energy_usage_model(energy_usage_model: dict[Period, FuelEnergyUsageModel]):
Expand Down
Loading

0 comments on commit 11efa7d

Please sign in to comment.