Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pacifist monitoring #40

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions aiida_aurora/monitors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

from json import load
from tempfile import NamedTemporaryFile
from typing import Optional

from aiida.common.log import LOG_LEVEL_REPORT
from aiida.orm import CalcJobNode
from aiida.transports import Transport

Expand All @@ -13,7 +15,7 @@ def monitor_capacity_threshold(
transport: Transport,
settings: dict,
filename="snapshot.json",
) -> Optional[str]:
) -> str | None:
"""Retrieve and inspect snapshot to determine if capacity has
fallen below threshold for several consecutive cycles.

Expand Down Expand Up @@ -48,7 +50,6 @@ def monitor_capacity_threshold(
"""

analyzer = CapacityAnalyzer(**settings)
analyzer.set_logger(node.logger)

try:

Expand All @@ -72,7 +73,26 @@ def monitor_capacity_threshold(
if not snapshot:
raise ValueError

return analyzer.analyze(snapshot)
analyzer.analyze(snapshot)

node.base.extras.set_many({
"status": analyzer.status,
"snapshot": analyzer.snapshot,
})

node.logger.log(LOG_LEVEL_REPORT, analyzer.report)

if node.base.extras.get("marked_for_death", False):

node.base.extras.set("flag", "☠️")

if "snapshot" in node.base.extras:
node.base.extras.delete("snapshot")

return "Job terminated by monitor per user request"

if analyzer.flag:
node.base.extras.set("flag", f"🍅{analyzer.flag}")

except TypeError:
node.logger.error(f"'{filename}' not in dictionary format")
Expand Down
156 changes: 76 additions & 80 deletions aiida_aurora/utils/analyzers.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,14 @@
from itertools import groupby
from logging import LoggerAdapter
from typing import Optional
from __future__ import annotations

from aiida.common.log import AIIDA_LOGGER, LOG_LEVEL_REPORT
import numpy as np

from .parsers import get_data_from_raw


class Analyzer:
"""Base class for all analyzers.
"""Base class for all analyzers."""

Attributes
==========
`logger` : `Union[AiidaLoggerType, LoggerAdapter]`
The associated logger.
"""

logger = AIIDA_LOGGER.getChild("monitor")

def set_logger(self, logger: LoggerAdapter) -> None:
"""Set the analyzer logger.

Parameters
----------
`logger` : `LoggerAdapter`
The logger of the analyzed calculation node.
"""
self.logger = logger

def analyze(self, snapshot: dict) -> Optional[str]:
def analyze(self, snapshot: dict) -> None:
"""Analyze the experiment snapshot against a condition.

Condition is defined in subclass analyzers.
Expand All @@ -37,12 +17,6 @@ def analyze(self, snapshot: dict) -> Optional[str]:
----------
`snapshot` : `dict`
The loaded snapshot dictionary.

Returns
-------
`Optional[str]`
A string if a defined condition has been met,
`None` otherwise.
"""
raise NotImplementedError

Expand All @@ -67,6 +41,7 @@ def __init__(
check_type="discharge_capacity",
threshold=0.8,
consecutive_cycles=2,
keep_last=10,
) -> None:
"""`CapacityAnalyzer` constructor.

Expand All @@ -80,6 +55,8 @@ def __init__(
`consecutive_cycles` : `int`
The number of required consecutive cycles,
`2` by default.
`keep_last` : `int`
The number of cycles to keep in snapshot.

Raises
------
Expand All @@ -93,8 +70,13 @@ def __init__(
self.threshold = threshold
self.consecutive = consecutive_cycles
self.is_discharge = check_type == "discharge_capacity"
self.keep_last = keep_last

self.flag = ""
self.status = ""
self.report = ""

def analyze(self, snapshot: dict) -> Optional[str]:
def analyze(self, snapshot: dict) -> None:
"""Analyze the snapshot.

Check if capacity has fallen below threshold for required
Expand All @@ -104,84 +86,98 @@ def analyze(self, snapshot: dict) -> Optional[str]:
----------
`snapshot` : `dict`
The loaded snapshot dictionary.

Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
"""
self.capacities = self._get_capacities(snapshot)
self.cycles = len(self.capacities)
return None if self.cycles < 1 else self._check_capacity()
self._extract_capacities(snapshot)
self._check_capacity()
self._truncate_snapshot()

###########
# PRIVATE #
###########

def _get_capacities(self, snapshot: dict):
def _extract_capacities(self, snapshot: dict) -> None:
"""Post-process the snapshot to extract capacities.

Parameters
----------
`snapshot` : `dict`
The loaded snapshot dictionary.

Returns
-------
`_type_`
A `numpy` array of capacities (in mAh), or empty list
if failed to process snapshot.
"""
try:
data = get_data_from_raw(snapshot)
return data['Qd'] if self.is_discharge else data['Qc']
self.snapshot = get_data_from_raw(snapshot)
self.capacities = self.snapshot["Qd"] \
if self.is_discharge \
else self.snapshot["Qc"]
except KeyError as err:
self.logger.error(f"missing '{str(err)}' in snapshot")
return []
self.report = f"missing '{str(err)}' in snapshot"
self.snapshot = {}
self.capacities = []

def _check_capacity(self) -> Optional[str]:
def _check_capacity(self) -> None:
"""Check if capacity has fallen below threshold for required
consecutive cycles.
consecutive cycles."""

Returns
-------
`Optional[str]`
If condition is met, an exit message, `None` otherwise.
"""
if (n := len(self.capacities)) < 2:
self.report = "need at least two complete cycles"
return

n = self.cycles
Qs = self.capacities[0]
Q = self.capacities[-1]
Q = self.capacities[-2]
Qt = self.threshold * Qs
C_per = Q / Qs * 100

message = f"cycle #{n} : {Q = :.2f} mAh ({Q / Qs * 100:.1f}%)"
self.report = f"cycle #{n} : {Q = :.2f} mAh ({C_per:.1f}%)"
self.status = f"(cycle #{n} : C @ {C_per:.1f}%)"

if Q < Qt:
message += f" : {(Qt - Q) / Qt * 100:.1f}% below threshold"
self.report += f" - {(Qt - Q) / Qt * 100:.1f}% below threshold"

below_threshold = np.where(self.capacities < Qt)[0] + 1
consecutively_below = self._filter_consecutive(below_threshold)

self.logger.log(LOG_LEVEL_REPORT, message)
if len(consecutively_below):

below_threshold = self._count_cycles_below_threshold()
if below_threshold >= self.consecutive:
return f"Capacity below threshold ({Qt:.2f} mAh) " \
f"for {below_threshold} cycles!"
cycles_str = str(consecutively_below).replace("'", "")
self.report += f" - cycles below threshold: {cycles_str}"

return None
if consecutively_below[-1] == n:
self.flag = "🔴"
else:
self.flag = "🟡"

def _count_cycles_below_threshold(self) -> int:
"""Count the number of consecutive cycles below threshold.
def _filter_consecutive(self, cycles: list[int]) -> list[int]:
"""Return cycles below threshold for `x` consecutive cycles.

Parameters
----------
`cycles` : `list[int]`
The cycles below threshold.

Returns
-------
`int`
The number of consecutive cycles below threshold.
`list[int]`
The cycles below threshold for `x` consecutive cycles.
"""
Qt = self.threshold * self.capacities[0]
return next(
(
len(list(group)) # cycle-count of first below-threshold group
for below, group in groupby(self.capacities < Qt)
if below
),
0,
)
return [
cycle for i, cycle in enumerate(cycles)
if i >= self.consecutive - 1 and \
all(cycles[i - j] == cycle - j for j in range(1, self.consecutive))
]

def _truncate_snapshot(self) -> None:
    """Truncate the snapshot to the last `keep_last` cycles.

    Per-cycle series ("cycle-number", "Qc", "Qd", "Ec", "Ed") are cut
    to their last `size` entries, where `size` is `keep_last` capped
    by the number of entries in "cycle-number". Time-resolved series
    ("time", "I", "Ewe", "Q") are cut from the offset read from
    "cycle-index" for the first retained cycle. Every other key
    (including "cycle-index" itself) is dropped from the snapshot.

    An empty snapshot is left untouched instead of raising `KeyError`,
    and when there is no cycle to keep (`keep_last <= 0` or no entry
    in "cycle-number") every retained series is emptied.
    """
    series_keys = ("time", "I", "Ewe", "Q")
    cycle_keys = ("cycle-number", "Qc", "Qd", "Ec", "Ed")

    # Nothing to truncate (e.g. the snapshot could not be processed).
    if not self.snapshot:
        return

    size = max(0, min(self.keep_last, len(self.snapshot["cycle-number"])))

    if size == 0:
        # `value[-0:]` is `value[0:]`, i.e. it would keep everything,
        # and `cycle-index[-0]` would read the first (or a missing)
        # entry, so the "keep nothing" case is handled explicitly.
        self.snapshot = {
            key: value[:0]
            for key, value in self.snapshot.items()
            if key in series_keys or key in cycle_keys
        }
        return

    truncated = {}

    # Looked up once, and only if a time-resolved series is present.
    # NOTE(review): assumes "cycle-index" stores, per cycle, the offset
    # of its first sample in the time-resolved series - confirm against
    # `get_data_from_raw`.
    start = None

    for key, value in self.snapshot.items():

        if key in series_keys:
            if start is None:
                start = self.snapshot["cycle-index"][-size]
            truncated[key] = value[start:]

        elif key in cycle_keys:
            truncated[key] = value[-size:]

    self.snapshot = truncated
31 changes: 19 additions & 12 deletions aiida_aurora/utils/cycling_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json

import numpy as np
from pandas import DataFrame
from pandas.io.formats.style import Styler

Expand Down Expand Up @@ -278,9 +279,6 @@ def process_data(node: CalcJobNode) -> tuple[dict, str, Styler | str]:
Post-processed data, warning, and analysis | error message.
"""

if node.process_state and "finished" not in node.process_state.value:
return {}, f"Job terminated with message '{node.process_status}'", ""

warning = ""

if node.exit_status:
Expand All @@ -289,16 +287,19 @@ def process_data(node: CalcJobNode) -> tuple[dict, str, Styler | str]:
warning += f"{node.exit_message}" if node.exit_message else generic
warning += "\n\n"

if "results" in node.outputs:
data = get_data_from_results(node.outputs.results)
elif "raw_data" in node.outputs:
data = get_data_from_file(node.outputs.raw_data)
elif "retrieved" in node.outputs:
data = get_data_from_file(node.outputs.retrieved)
elif "remote_folder" in node.outputs:
data = get_data_from_remote(node.outputs.remote_folder)
if node.exit_status is None:
data = get_data_from_snapshot(node.base.extras.get("snapshot", {}))
else:
data = {}
if "results" in node.outputs:
data = get_data_from_results(node.outputs.results)
elif "raw_data" in node.outputs:
data = get_data_from_file(node.outputs.raw_data)
elif "retrieved" in node.outputs:
data = get_data_from_file(node.outputs.retrieved)
elif "remote_folder" in node.outputs:
data = get_data_from_remote(node.outputs.remote_folder)
else:
data = {}

return data, warning, add_analysis(data)

Expand Down Expand Up @@ -346,6 +347,11 @@ def get_data_from_remote(source: RemoteData) -> dict:
return {}


def get_data_from_snapshot(snapshot: dict) -> dict:
    """Convert a snapshot dictionary into a dictionary of arrays.

    Parameters
    ----------
    `snapshot` : `dict`
        The snapshot dictionary, mapping series names to sequences
        of values. A missing (`None`) snapshot is treated as empty.

    Returns
    -------
    `dict`
        The same keys, each value converted to a `numpy` array.
        Empty if the snapshot is empty or missing.
    """
    return {key: np.array(value) for key, value in (snapshot or {}).items()}


def add_analysis(data: dict) -> Styler | str:
"""Return analysis details.

Expand Down Expand Up @@ -381,4 +387,5 @@ def add_analysis(data: dict) -> Styler | str:
]).hide(axis="index")

else:

return "ERROR! Failed to find or parse output"
Loading
Loading