From 79b9e692516544bb10350e31f0f66633ff55533e Mon Sep 17 00:00:00 2001 From: nguu0123 Date: Tue, 10 Sep 2024 11:15:05 +0300 Subject: [PATCH] fix node aggregator, move to 0.3.5 --- pyproject.toml | 2 +- requirements-dev.lock | 2 +- requirements.lock | 2 +- .../observability/odop_obs/node_aggregator.py | 52 +++++++++++-------- 4 files changed, 32 insertions(+), 26 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6e36a1b..43fb036 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ include = ["src/qoa4ml"] [project] name = "qoa4ml" -version = "0.3.4" +version = "0.3.5" description = "Quality of Analysis for Machine Learning" readme = "README.md" diff --git a/requirements-dev.lock b/requirements-dev.lock index 0be5a29..0ac5ce9 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -3,7 +3,7 @@ # # last locked with the following flags: # pre: false -# features: [] +# features: ["test,dev"] # all-features: false # with-sources: false # generate-hashes: false diff --git a/requirements.lock b/requirements.lock index 13f0156..bddc916 100644 --- a/requirements.lock +++ b/requirements.lock @@ -3,7 +3,7 @@ # # last locked with the following flags: # pre: false -# features: [] +# features: ["test,dev"] # all-features: false # with-sources: false # generate-hashes: false diff --git a/src/qoa4ml/observability/odop_obs/node_aggregator.py b/src/qoa4ml/observability/odop_obs/node_aggregator.py index 6c153e1..8e2eb86 100644 --- a/src/qoa4ml/observability/odop_obs/node_aggregator.py +++ b/src/qoa4ml/observability/odop_obs/node_aggregator.py @@ -1,3 +1,4 @@ +import json import logging import socket from datetime import datetime @@ -53,29 +54,34 @@ def __init__(self, config: NodeAggregatorConfig, odop_path: str): methods=[self.config.query_method], ) - def process_report(self, report): + def process_report(self, report: str): + report_dict = json.loads(report) if self.environment == EnvironmentEnum.hpc: - if report["type"] == "system": - del report["type"] + if report_dict["type"] == "system": + del report_dict["type"] metadata = flatten( - {"metadata": report["metadata"]}, self.config.data_separator + {"metadata": report_dict["metadata"]}, self.config.data_separator + ) + timestamp = report_dict["timestamp"] + del report_dict["metadata"], report_dict["timestamp"] + fields = self.convert_unit( + flatten(report_dict, self.config.data_separator) ) - timestamp = report["timestamp"] - del report["metadata"], report["timestamp"] - fields = self.convert_unit(flatten(report, self.config.data_separator)) self.embedded_database.insert( timestamp, {"type": "node", **metadata}, fields, ) - elif report["type"] == "process": - del report["type"] + elif report_dict["type"] == "process": + del report_dict["type"] metadata = flatten( - {"metadata": report["metadata"]}, self.config.data_separator + {"metadata": report_dict["metadata"]}, self.config.data_separator + ) + timestamp = report_dict["timestamp"] + del report_dict["metadata"], report_dict["timestamp"] + fields = self.convert_unit( + flatten(report_dict, self.config.data_separator) ) - timestamp = report["timestamp"] - del report["metadata"], report["timestamp"] - fields = self.convert_unit(flatten(report, self.config.data_separator)) self.embedded_database.insert( timestamp, {"type": "process", **metadata}, @@ -83,12 +89,12 @@ def process_report(self, report): ) else: logging.error("Value Error: Unknown report type") - elif isinstance(report, SystemReport): - node_name = report.metadata.node_name - timestamp = report.timestamp - del report.metadata, report.timestamp + elif isinstance(report_dict, SystemReport): + node_name = report_dict.metadata.node_name + timestamp = report_dict.timestamp + del report_dict.metadata, report_dict.timestamp fields = self.convert_unit( - flatten(report.dict(exclude_none=True), self.config.data_separator) + flatten(report_dict.dict(exclude_none=True), self.config.data_separator) ) self.embedded_database.insert( timestamp, @@ -98,14 +104,14 @@ def process_report(self, report): }, fields, ) - elif isinstance(report, ProcessReport): + elif isinstance(report_dict, ProcessReport): metadata = flatten( - {"metadata": report.metadata.dict()}, self.config.data_separator + {"metadata": report_dict.metadata.dict()}, self.config.data_separator ) - timestamp = report.timestamp - del report.metadata, report.timestamp + timestamp = report_dict.timestamp + del report_dict.metadata, report_dict.timestamp fields = self.convert_unit( - flatten(report.dict(exclude_none=True), self.config.data_separator) + flatten(report_dict.dict(exclude_none=True), self.config.data_separator) ) self.embedded_database.insert( timestamp, {"type": "process", **metadata}, fields