Skip to content

Commit

Permalink
fix node aggregator, move to 0.3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
nguu0123 committed Sep 10, 2024
1 parent 9ebfb2e commit 79b9e69
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ include = ["src/qoa4ml"]

[project]
name = "qoa4ml"
version = "0.3.4"
version = "0.3.5"
description = "Quality of Analysis for Machine Learning"
readme = "README.md"

Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
# last locked with the following flags:
# pre: false
# features: []
# features: ["test,dev"]
# all-features: false
# with-sources: false
# generate-hashes: false
Expand Down
2 changes: 1 addition & 1 deletion requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
# last locked with the following flags:
# pre: false
# features: []
# features: ["test,dev"]
# all-features: false
# with-sources: false
# generate-hashes: false
Expand Down
52 changes: 29 additions & 23 deletions src/qoa4ml/observability/odop_obs/node_aggregator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import socket
from datetime import datetime
Expand Down Expand Up @@ -53,42 +54,47 @@ def __init__(self, config: NodeAggregatorConfig, odop_path: str):
methods=[self.config.query_method],
)

def process_report(self, report):
def process_report(self, report: str):
report_dict = json.loads(report)
if self.environment == EnvironmentEnum.hpc:
if report["type"] == "system":
del report["type"]
if report_dict["type"] == "system":
del report_dict["type"]
metadata = flatten(
{"metadata": report["metadata"]}, self.config.data_separator
{"metadata": report_dict["metadata"]}, self.config.data_separator
)
timestamp = report_dict["timestamp"]
del report_dict["metadata"], report_dict["timestamp"]
fields = self.convert_unit(
flatten(report_dict, self.config.data_separator)
)
timestamp = report["timestamp"]
del report["metadata"], report["timestamp"]
fields = self.convert_unit(flatten(report, self.config.data_separator))
self.embedded_database.insert(
timestamp,
{"type": "node", **metadata},
fields,
)
elif report["type"] == "process":
del report["type"]
elif report_dict["type"] == "process":
del report_dict["type"]
metadata = flatten(
{"metadata": report["metadata"]}, self.config.data_separator
{"metadata": report_dict["metadata"]}, self.config.data_separator
)
timestamp = report_dict["timestamp"]
del report_dict["metadata"], report_dict["timestamp"]
fields = self.convert_unit(
flatten(report_dict, self.config.data_separator)
)
timestamp = report["timestamp"]
del report["metadata"], report["timestamp"]
fields = self.convert_unit(flatten(report, self.config.data_separator))
self.embedded_database.insert(
timestamp,
{"type": "process", **metadata},
fields,
)
else:
logging.error("Value Error: Unknown report type")
elif isinstance(report, SystemReport):
node_name = report.metadata.node_name
timestamp = report.timestamp
del report.metadata, report.timestamp
elif isinstance(report_dict, SystemReport):
node_name = report_dict.metadata.node_name
timestamp = report_dict.timestamp
del report_dict.metadata, report_dict.timestamp
fields = self.convert_unit(
flatten(report.dict(exclude_none=True), self.config.data_separator)
flatten(report_dict.dict(exclude_none=True), self.config.data_separator)
)
self.embedded_database.insert(
timestamp,
Expand All @@ -98,14 +104,14 @@ def process_report(self, report):
},
fields,
)
elif isinstance(report, ProcessReport):
elif isinstance(report_dict, ProcessReport):
metadata = flatten(
{"metadata": report.metadata.dict()}, self.config.data_separator
{"metadata": report_dict.metadata.dict()}, self.config.data_separator
)
timestamp = report.timestamp
del report.metadata, report.timestamp
timestamp = report_dict.timestamp
del report_dict.metadata, report_dict.timestamp
fields = self.convert_unit(
flatten(report.dict(exclude_none=True), self.config.data_separator)
flatten(report_dict.dict(exclude_none=True), self.config.data_separator)
)
self.embedded_database.insert(
timestamp, {"type": "process", **metadata}, fields
Expand Down

0 comments on commit 79b9e69

Please sign in to comment.