Commit

closes #47, flowsheet solve supports nodes of type PartitionOperation…
… (0.4.9)
elphick committed Nov 25, 2024
1 parent 4fcb68c commit 34a6c83
Showing 16 changed files with 354 additions and 70 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,12 @@
Geomet 0.4.9 (2024-11-25)
=========================

Feature
-------

- Flowsheet solve supports nodes of type PartitionOperation (#47)


Geomet 0.4.8 (2024-11-03)
=========================

36 changes: 18 additions & 18 deletions docs/source/sg_execution_times.rst
@@ -6,7 +6,7 @@

Computation times
=================
**00:14.610** total execution time for 17 files **from all galleries**:
**01:04.321** total execution time for 17 files **from all galleries**:

.. container::

@@ -32,8 +32,20 @@ Computation times
* - Example
- Time
- Mem (MB)
* - :ref:`sphx_glr_auto_examples_examples_02_interval_sample_04_partition_models.py` (``..\..\examples\02_interval_sample\04_partition_models.py``)
- 00:14.610
* - :ref:`sphx_glr_auto_examples_examples_04_block_model_02_create_block_model.py` (``..\..\examples\04_block_model\02_create_block_model.py``)
- 00:52.411
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_04_block_model_03_load_block_model.py` (``..\..\examples\04_block_model\03_load_block_model.py``)
- 00:08.409
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_03_flowsheet_02_flowsheet_from_file.py` (``..\..\examples\03_flowsheet\02_flowsheet_from_file.py``)
- 00:03.155
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_06_map_01_mapping.py` (``..\..\examples\06_map\01_mapping.py``)
- 00:00.196
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_05_mass_balance_01_mass_balance.py` (``..\..\examples\05_mass_balance\01_mass_balance.py``)
- 00:00.150
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_01_getting_started_01_create_sample.py` (``..\..\examples\01_getting_started\01_create_sample.py``)
- 00:00.000
@@ -56,13 +68,13 @@ Computation times
* - :ref:`sphx_glr_auto_examples_examples_02_interval_sample_03_incremental_separation.py` (``..\..\examples\02_interval_sample\03_incremental_separation.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_02_interval_sample_05_resampling_interval_data.py` (``..\..\examples\02_interval_sample\05_resampling_interval_data.py``)
* - :ref:`sphx_glr_auto_examples_examples_02_interval_sample_04_partition_models.py` (``..\..\examples\02_interval_sample\04_partition_models.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_03_flowsheet_01_flowsheet_basics.py` (``..\..\examples\03_flowsheet\01_flowsheet_basics.py``)
* - :ref:`sphx_glr_auto_examples_examples_02_interval_sample_05_resampling_interval_data.py` (``..\..\examples\02_interval_sample\05_resampling_interval_data.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_03_flowsheet_02_flowsheet_from_file.py` (``..\..\examples\03_flowsheet\02_flowsheet_from_file.py``)
* - :ref:`sphx_glr_auto_examples_examples_03_flowsheet_01_flowsheet_basics.py` (``..\..\examples\03_flowsheet\01_flowsheet_basics.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_03_flowsheet_03_filtering_flowsheet.py` (``..\..\examples\03_flowsheet\03_filtering_flowsheet.py``)
@@ -71,15 +83,3 @@ Computation times
* - :ref:`sphx_glr_auto_examples_examples_04_block_model_01_consuming_omf.py` (``..\..\examples\04_block_model\01_consuming_omf.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_04_block_model_02_create_block_model.py` (``..\..\examples\04_block_model\02_create_block_model.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_04_block_model_03_load_block_model.py` (``..\..\examples\04_block_model\03_load_block_model.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_05_mass_balance_01_mass_balance.py` (``..\..\examples\05_mass_balance\01_mass_balance.py``)
- 00:00.000
- 0.0
* - :ref:`sphx_glr_auto_examples_examples_06_map_01_mapping.py` (``..\..\examples\06_map\01_mapping.py``)
- 00:00.000
- 0.0
32 changes: 32 additions & 0 deletions elphick/geomet/config/flowsheet_example_partition.yaml
@@ -0,0 +1,32 @@
FLOWSHEET:
flowsheet:
name: Flowsheet
streams: # graph edges
Feed:
name: Feed
node_in: feed
node_out: screen
Coarse:
name: Coarse
node_in: screen
node_out: lump
Fine:
name: Fine
node_in: screen
node_out: fines
operations: # graph nodes
feed:
name: feed
screen:
name: screen
type: PartitionOperation
partition:
module: elphick.geomet.utils.partition
function: napier_munn_size_1mm
args: null # e.g. d50, ep if not defined in the (partial) function
output_stream: Lump
complement_stream: Fines
lump:
name: lump
fines:
name: fines
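The `type` key in the configuration above selects which node class is built for each operation. A minimal sketch of that dispatch follows, assuming stand-in dataclasses rather than the geomet classes; the `OPERATION_TYPES` registry and `build_operations` helper are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Operation:
    """Stand-in for a generic flowsheet node (not the geomet class)."""
    name: str

    @classmethod
    def from_dict(cls, config: dict) -> "Operation":
        return cls(name=config["name"])


@dataclass
class PartitionOperation(Operation):
    """Stand-in for a node that also carries a partition definition."""
    partition: Optional[dict] = None

    @classmethod
    def from_dict(cls, config: dict) -> "PartitionOperation":
        return cls(name=config["name"], partition=config.get("partition"))


# Hypothetical registry mapping the YAML `type` string to a class.
OPERATION_TYPES = {"Operation": Operation, "PartitionOperation": PartitionOperation}


def build_operations(operations_config: dict) -> dict:
    """Create one node object per entry, defaulting to Operation when `type` is absent."""
    built = {}
    for key, node_config in operations_config.items():
        node_cls = OPERATION_TYPES[node_config.get("type", "Operation")]
        built[key] = node_cls.from_dict(node_config)
    return built


operations = build_operations({
    "feed": {"name": "feed"},
    "screen": {
        "name": "screen",
        "type": "PartitionOperation",
        "partition": {
            "module": "elphick.geomet.utils.partition",
            "function": "napier_munn_size_1mm",
        },
    },
})
```

A registry keeps the lookup in one place; the commit itself uses an explicit `if operation_type == 'PartitionOperation'` branch, which behaves the same for the two types shown.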
76 changes: 62 additions & 14 deletions elphick/geomet/flowsheet/flowsheet.py
@@ -19,14 +19,14 @@
from elphick.geomet import Sample
from elphick.geomet.base import MC
from elphick.geomet.config.config_read import get_column_config
from elphick.geomet.flowsheet.operation import NodeType, OP
from elphick.geomet.flowsheet.operation import NodeType, OP, PartitionOperation, Operation
from elphick.geomet.plot import parallel_plot, comparison_plot
from elphick.geomet.utils.layout import digraph_linear_layout
from elphick.geomet.flowsheet.loader import streams_from_dataframe
from elphick.geomet.utils.sampling import random_int

if TYPE_CHECKING:
from elphick.geomet.flowsheet.stream import Stream
# if TYPE_CHECKING:
from elphick.geomet.flowsheet.stream import Stream

# generic type variable, used for type hinting that play nicely with subclasses
FS = TypeVar('FS', bound='Flowsheet')
@@ -112,7 +112,7 @@ def from_dataframe(cls, df: pd.DataFrame, name: Optional[str] = 'Flowsheet',
return cls().from_objects(objects=streams, name=name)

@classmethod
def from_dict(cls, config: dict) -> FS:
def from_dict_old(cls, config: dict) -> FS:
"""Create a flowsheet from a dictionary
Args:
@@ -139,7 +139,17 @@ def from_dict(cls, config: dict) -> FS:
graph.add_edges_from(bunch_of_edges)
operation_objects: dict = {}
for node in graph.nodes:
operation_objects[node] = Operation(name=node)
# create the correct type of node object
if node in flowsheet_config['operations']:
operation_type = flowsheet_config['operations'][node].get('type', 'Operation')
if operation_type == 'PartitionOperation':
# get the output stream names from the graph
output_stream_names = [d['name'] for u, v, d in graph.out_edges(node, data=True)]
node_config = flowsheet_config['operations'][node]
node_config['output_stream_names'] = output_stream_names
operation_objects[node] = PartitionOperation.from_dict(node_config)
else:
operation_objects[node] = Operation.from_dict(flowsheet_config['operations'][node])
nx.set_node_attributes(graph, operation_objects, 'mc')

graph = nx.convert_node_labels_to_integers(graph)
@@ -149,6 +159,26 @@ def from_dict(cls, config: dict) -> FS:

return obj

@classmethod
def from_dict(cls, config: dict) -> FS:
flowsheet = cls()

# Process streams
for stream_name, stream_data in config['FLOWSHEET']['streams'].items():
stream = Stream.from_dict(stream_data)
flowsheet.add_stream(stream)

# Process operations
for operation_name, operation_data in config['FLOWSHEET']['operations'].items():
operation_type = operation_data.get('type', 'Operation')
if operation_type == 'PartitionOperation':
operation = PartitionOperation.from_dict(operation_data)
else:
operation = Operation.from_dict(operation_data)
flowsheet.add_operation(operation)

return flowsheet

@classmethod
def from_yaml(cls, file_path: Path) -> FS:
"""Create a flowsheet from yaml
@@ -162,7 +192,7 @@ def from_yaml(cls, file_path: Path) -> FS:
with open(file_path, 'r') as file:
config = yaml.safe_load(file)

return cls.from_dict(config)
return cls.from_dict_old(config)

@classmethod
def from_json(cls, file_path: Path) -> FS:
@@ -179,6 +209,14 @@ def from_json(cls, file_path: Path) -> FS:

return cls.from_dict(config)

def add_stream(self, stream: 'Stream'):
"""Add a stream to the flowsheet."""
self.graph.add_edge(stream.nodes[0], stream.nodes[1], mc=stream, name=stream.name)

def add_operation(self, operation: 'Operation'):
"""Add an operation to the flowsheet."""
self.graph.add_node(operation.name, mc=operation)

def copy_without_stream_data(self):
"""Copy without stream data"""
new_flowsheet = Flowsheet(name=self.name)
@@ -218,13 +256,24 @@ def solve(self):
edge_data['mc'].name = edge_data['name']

if self.graph.nodes[node]['mc'].has_empty_output:
mc: MC = self.graph.nodes[node]['mc'].solve()
# copy the solved object to the empty output edges
for successor in self.graph.successors(node):
edge_data = self.graph.get_edge_data(node, successor)
if edge_data and edge_data['mc'] is None:
edge_data['mc'] = mc
edge_data['mc'].name = edge_data['name']
# There are two cases to be managed, 1. a single output missing,
# 2. a partition operation that returns two outputs
if isinstance(self.graph.nodes[node]['mc'], PartitionOperation):
mc1, mc2 = self.graph.nodes[node]['mc'].solve()
# copy the solved object to the empty output edges
for successor in self.graph.successors(node):
edge_data = self.graph.get_edge_data(node, successor)
if edge_data and edge_data['mc'] is None:
edge_data['mc'] = mc1 if edge_data['name'] == 'preferred' else mc2
edge_data['mc'].name = edge_data['name']
else:
mc: MC = self.graph.nodes[node]['mc'].solve()
# copy the solved object to the empty output edges
for successor in self.graph.successors(node):
edge_data = self.graph.get_edge_data(node, successor)
if edge_data and edge_data['mc'] is None:
edge_data['mc'] = mc
edge_data['mc'].name = edge_data['name']

missing_count: int = sum([1 for u, v, d in self.graph.edges(data=True) if d['mc'] is None])

@@ -1037,4 +1086,3 @@ def reset_stream_nodes(self, stream: Optional[str] = None):
mc: MC = self.get_edge_by_name(stream)
mc.set_nodes([random_int(), random_int()])
self._update_graph(mc)

51 changes: 47 additions & 4 deletions elphick/geomet/flowsheet/operation.py
@@ -6,7 +6,11 @@
import numpy as np
import pandas as pd

from elphick.geomet import IntervalSample
from elphick.geomet.base import MC
from elphick.geomet.flowsheet.stream import Stream
from elphick.geomet.utils.pandas import MeanIntervalIndex
from elphick.geomet.utils.partition import load_partition_function

# generic type variable, used for type hinting that play nicely with subclasses
OP = TypeVar('OP', bound='Operation')
@@ -180,6 +184,12 @@ def _get_object(self, name: Optional[str] = None) -> MC:
else:
return candidates[0]

@classmethod
def from_dict(cls, config: dict) -> 'Operation':
name = config.get('name')

return cls(name=name)


class Input(Operation):
def __init__(self, name):
@@ -196,8 +206,41 @@ def __init__(self, name):
super().__init__(name)


class UnitOperation(Operation):
def __init__(self, name, num_inputs, num_outputs):
class PartitionOperation(Operation):
"""An operation that partitions the input stream into multiple output streams based on a partition function.
The partition input is the mean of the fractions, or the geomean if the fractions are in the size dimension.
The partition function is typically a partial function, so that the partition is defined for all arguments
other than the input mean fraction values in one or two dimensions. The argument names must match the
index names in the IntervalSample.
"""

def __init__(self, name, partition=None):
super().__init__(name)
self.num_inputs = num_inputs
self.num_outputs = num_outputs
self.partition = partition
self.partition_function = None
if self.partition and 'module' in self.partition and 'function' in self.partition:
self.partition_function = load_partition_function(self.partition['module'], self.partition['function'])

def solve(self) -> [MC, MC]:
if self.partition_function:
self.apply_partition()
# update the balance related attributes
self.check_balance()
return self.outputs

def apply_partition(self):
if len(self.inputs) != 1:
raise ValueError("PartitionOperation must have exactly one input")
for input_sample in self.inputs:
input_sample: IntervalSample
if input_sample is not None:
output, complement = input_sample.split_by_partition(self.partition_function)
self.outputs = [output, complement]

@classmethod
def from_dict(cls, config: dict) -> 'PartitionOperation':
name = config.get('name')
partition = config.get('partition')
return cls(name=name, partition=partition)
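`PartitionOperation.__init__` resolves its partition function from a module path and a function name, but the body of `load_partition_function` is not part of this diff. One plausible way to do such a lookup is with `importlib`, sketched below; `load_function` is a hypothetical stand-in, and the real helper may validate or wrap the result differently.

```python
import importlib
from typing import Callable


def load_function(module_name: str, function_name: str) -> Callable:
    """Import `module_name` and return its attribute `function_name`.

    Hypothetical stand-in for `load_partition_function`; not the geomet implementation.
    """
    module = importlib.import_module(module_name)
    func = getattr(module, function_name)
    if not callable(func):
        raise TypeError(f"{module_name}.{function_name} is not callable")
    return func


# Demonstrated with a standard-library function so the sketch runs without geomet installed.
sqrt = load_function("math", "sqrt")
result = sqrt(16.0)
```

Because the module and function names come from a configuration file, a lookup like this raises `ModuleNotFoundError` or `AttributeError` on a misspelt entry, which surfaces configuration mistakes when the node is constructed rather than when the flowsheet is solved.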
7 changes: 7 additions & 0 deletions elphick/geomet/flowsheet/stream.py
@@ -36,3 +36,10 @@ def random_int():
# stream = cls(**filtered_kwargs)
# stream.__class__ = type(obj.__class__.__name__, (obj.__class__, cls), {})
# return stream

@classmethod
def from_dict(cls, config: dict) -> 'Stream':
name = config.get('name')
node_in = config.get('node_in')
node_out = config.get('node_out')
return cls(name=name).set_nodes([node_in, node_out])
46 changes: 31 additions & 15 deletions elphick/geomet/interval_sample.py
@@ -1,4 +1,6 @@
from __future__ import annotations

import functools
from pathlib import Path
from typing import Optional, Literal, Callable, Union, Iterable, TYPE_CHECKING

@@ -104,31 +106,45 @@ def split_by_partition(self, partition_definition, name_1: str = 'preferred', na
K = \\frac{{m_{preferred}}}{{m_{feed}}}
:param partition_definition: A function that takes a data frame and returns a boolean series with a
range [0, 1].
range [0, 1]. A 1D function must have an argument that matches the dimension of the interval index.
A 2D function must have two arguments that match the dimensions of the interval index.
:param name_1: The name of the first sample.
:param name_2: The name of the second sample.
:return: A tuple of two IntervalSamples.
"""
if not isinstance(partition_definition, Callable):
raise TypeError("The definition is not a callable function")
if 'dim' not in partition_definition.keywords.keys():
raise NotImplementedError("The callable function passed does not have a dim")

dim = partition_definition.keywords['dim']
partition_definition.keywords.pop('dim')

# get the mean of the intervals - the geomean if the interval is called size
index = self.mass_data.index.get_level_values(dim)
# check the index is an interval index
if not isinstance(index, pd.IntervalIndex):
raise ValueError(f"The index is not an IntervalIndex. The index is {type(index)}")
index = MeanIntervalIndex(index)
x = index.mean
# Check that the partition definition has the correct number of arguments and that the names match
if isinstance(self.mass_data.index, pd.MultiIndex):
interval_levels = [level for level in self.mass_data.index.levels if isinstance(level, pd.IntervalIndex)]
else:
interval_levels = [self.mass_data.index] if isinstance(self.mass_data.index, pd.IntervalIndex) else []

# Get the function from the partial object if necessary
partition_func = partition_definition.func if isinstance(partition_definition,
functools.partial) else partition_definition

# Check that the required argument names are present in the IntervalIndex levels
required_args = partition_func.__code__.co_varnames[:len(interval_levels)]
for arg, level in zip(required_args, interval_levels):
if arg != level.name:
raise ValueError(f"The partition definition argument name does not match the index name. "
f"Expected {level.name}, found {arg}")

fraction_means: dict = {}
# iterate the Index or MultiIndex
if isinstance(self.mass_data.index, pd.MultiIndex):
for idx in self.mass_data.index.levels[0]:
# get the mean of the fractions, by converting to a MeanIntervalIndex
fraction_means[idx] = MeanIntervalIndex(self.mass_data.index.get_loc_level(idx)).mean
else:
fraction_means[self.mass_data.index.name] = MeanIntervalIndex(self.mass_data.index).mean

self.to_stream()
self: Stream
self: 'Stream'

pn: pd.Series = pd.Series(partition_definition(x), name='K', index=index)
pn: pd.Series = pd.Series(partition_definition(**fraction_means), name='K', index=self._mass_data.index)
sample_1 = self.create_congruent_object(name=name_1).to_stream()
sample_1.mass_data = self.mass_data.copy().multiply(pn, axis=0)
sample_1.set_nodes([self.nodes[1], random_int()])
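The reworked `split_by_partition` expects a callable whose leading argument names match the interval index names, usually supplied as a `functools.partial` with the remaining parameters already bound. The sketch below illustrates that contract using only the standard library. The `logistic_partition` curve and the `check_argument_names` helper are hypothetical and are not the geomet functions; the logistic form is an assumed example of an S-shaped partition curve, with the constant 1.099 ≈ ln(3) chosen so that `ep` equals half the d75 to d25 spread.

```python
import functools
import math


def logistic_partition(size: float, d50: float, ep: float) -> float:
    """Illustrative partition curve: fraction of a size class reporting to the coarse product."""
    return 1.0 / (1.0 + math.exp(1.099 * (d50 - size) / ep))


def check_argument_names(partition_definition, level_names: list) -> None:
    """Raise if the leading positional argument names differ from the index level names."""
    # Unwrap a partial to reach the underlying function, as the commit does.
    func = (partition_definition.func
            if isinstance(partition_definition, functools.partial)
            else partition_definition)
    leading_args = func.__code__.co_varnames[:len(level_names)]
    for arg, level_name in zip(leading_args, level_names):
        if arg != level_name:
            raise ValueError(f"Expected {level_name}, found {arg}")


# Bind everything except the fraction mean, leaving `size` to be supplied by keyword.
partition_1mm = functools.partial(logistic_partition, d50=1.0, ep=0.1)
check_argument_names(partition_1mm, ["size"])

k_at_cut = partition_1mm(size=1.0)  # at size == d50 the exponent is zero, so K is 0.5
k_coarse = partition_1mm(size=2.0)  # well above d50, so K approaches 1
```

Passing the fraction means by keyword, as in `partition_definition(**fraction_means)`, is what makes the name check necessary: a function written with `x` instead of `size` would fail with an unexpected-keyword error at call time.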