Skip to content

Commit

Permalink
refactor: collect results in priority optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
jsolaas committed Nov 16, 2023
1 parent e8ef9b9 commit 16b9ccc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 45 deletions.
55 changes: 52 additions & 3 deletions src/libecalc/common/priority_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,68 @@
TResult = TypeVar("TResult")
TPriority = TypeVar("TPriority")

ComponentID = str


@dataclass
class PriorityOptimizerResult(Generic[TResult]):
priorities_used: TimeSeriesString
priority_results: Dict[datetime, Dict[PriorityID, Dict[str, TResult]]]
priority_results: List[typing.Any] # TODO: typing. This is the consumer results merged based on priorities used


@dataclass
class EvaluatorResult(Generic[TResult]):
id: str
id: ComponentID
result: TResult
is_valid: TimeSeriesBoolean


class PriorityOptimizer(Generic[TResult, TPriority]):
@staticmethod
def get_component_ids(
priority_results: Dict[datetime, Dict[PriorityID, Dict[ComponentID, TResult]]]
) -> List[ComponentID]:
component_ids = []
for timestep in priority_results:
for priority in priority_results[timestep]:
for component_id in priority_results[timestep][priority].keys():
if component_id not in component_ids:
component_ids.append(component_id)
return component_ids

def collect_consumer_results(
self,
priorities_used: TimeSeriesString,
priority_results: Dict[datetime, Dict[PriorityID, Dict[ComponentID, TResult]]],
) -> List[typing.Any]: # TODO: Any type since we don't have access to component_result within TResult
"""
Merge consumer results into a single result per consumer based on the operational settings used. I.e. pick results
from the correct operational setting result and merge into a single result per consumer.
Args:
priorities_used:
priority_results:
Returns: List of merged consumer results
"""
component_ids = self.get_component_ids(priority_results)

consumer_results: Dict[ComponentID, typing.Any] = {}
for component_id in component_ids:
for timestep_index, timestep in enumerate(priorities_used.timesteps):
priority_used = priorities_used.values[timestep_index]
prev_result = consumer_results.get(component_id)
consumer_result_subset = priority_results[timestep][priority_used][
component_id
].component_result # TODO: Accessing something the type does not make clear exists

if prev_result is None:
consumer_results[component_id] = consumer_result_subset
else:
consumer_results[component_id] = prev_result.merge(consumer_result_subset)

return list(consumer_results.values())

def optimize(
self,
timesteps: List[datetime],
Expand Down Expand Up @@ -93,5 +140,7 @@ def optimize(
break
return PriorityOptimizerResult(
priorities_used=priorities_used,
priority_results=dict(priority_results),
priority_results=self.collect_consumer_results(
priorities_used=priorities_used, priority_results=priority_results
),
)
44 changes: 3 additions & 41 deletions src/libecalc/core/consumers/consumer_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,12 @@
from libecalc.common.stream_conditions import StreamConditions
from libecalc.common.utils.rates import (
TimeSeriesInt,
TimeSeriesString,
)
from libecalc.core.consumers.base import BaseConsumer
from libecalc.core.consumers.compressor import Compressor
from libecalc.core.consumers.factory import create_consumer
from libecalc.core.consumers.pump import Pump
from libecalc.core.result import ConsumerSystemResult, EcalcModelResult
from libecalc.core.result.results import (
CompressorResult,
PumpResult,
)
from libecalc.dto import VariablesMap
from libecalc.dto.components import ConsumerComponent

Consumer = TypeVar("Consumer", bound=Union[Compressor, Pump])
Expand Down Expand Up @@ -108,7 +102,7 @@ def _get_stream_conditions_adjusted_for_crossover(

def evaluate(
self,
variables_map: VariablesMap,
timesteps: List[datetime],
system_stream_conditions_priorities: Priorities[Dict[str, List[StreamConditions]]],
) -> EcalcModelResult:
"""
Expand Down Expand Up @@ -147,13 +141,10 @@ def evaluator(
]

optimizer_result = optimizer.optimize(
timesteps=variables_map.time_vector, priorities=system_stream_conditions_priorities, evaluator=evaluator
timesteps=timesteps, priorities=system_stream_conditions_priorities, evaluator=evaluator
)

consumer_results = self.collect_consumer_results(
priorities_used=optimizer_result.priorities_used,
priority_results=optimizer_result.priority_results,
)
consumer_results = optimizer_result.priority_results

# Convert to legacy compatible operational_settings_used
priorities_to_int_map = {
Expand Down Expand Up @@ -190,35 +181,6 @@ def evaluator(
models=[],
)

def collect_consumer_results(
self,
priorities_used: TimeSeriesString,
priority_results: Dict[datetime, Dict[str, Dict[str, EcalcModelResult]]],
) -> List[Union[CompressorResult, PumpResult]]:
"""
Merge consumer results into a single result per consumer based on the operational settings used. I.e. pick results
from the correct operational setting result and merge into a single result per consumer.
Args:
priorities_used:
priority_results:
Returns:
"""
consumer_results: Dict[str, Union[CompressorResult, PumpResult]] = {}
for consumer in self._consumers:
for timestep_index, timestep in enumerate(priorities_used.timesteps):
priority_used = priorities_used.values[timestep_index]
prev_result = consumer_results.get(consumer.id)
consumer_result_subset = priority_results[timestep][priority_used][consumer.id].component_result

if prev_result is None:
consumer_results[consumer.id] = consumer_result_subset
else:
consumer_results[consumer.id] = prev_result.merge(consumer_result_subset)

return list(consumer_results.values())

@staticmethod
def _topologically_sort_consumers_by_crossover(
crossover: List[Crossover], consumers: List[Consumer]
Expand Down
2 changes: 1 addition & 1 deletion src/libecalc/core/ecalc.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def evaluate_energy_usage(self, variables_map: dto.VariablesMap) -> Dict[str, Ec
variables_map=variables_map,
)
system_result = consumer_system.evaluate(
variables_map=variables_map, system_stream_conditions_priorities=evaluated_stream_conditions
timesteps=variables_map.time_vector, system_stream_conditions_priorities=evaluated_stream_conditions
)
consumer_results[component_dto.id] = system_result
for consumer_result in system_result.sub_components:
Expand Down

0 comments on commit 16b9ccc

Please sign in to comment.