From 16c4efe5490776f301b1e2cad7c98c33ab540f40 Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 20 Apr 2025 15:03:57 -0500 Subject: [PATCH 1/5] add metrics tracing script --- scripts/trace_metrics.py | 222 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 scripts/trace_metrics.py diff --git a/scripts/trace_metrics.py b/scripts/trace_metrics.py new file mode 100644 index 00000000..7b2db324 --- /dev/null +++ b/scripts/trace_metrics.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +metrics_trace.py – simplified metrics tracing, logging, and optional plotting. +Direct, precise, minimal. +""" + +import argparse, subprocess, os, signal, time, json, logging, tempfile +from collections import deque +from datetime import datetime +from pathlib import Path +from typing import Deque, TextIO, Any, Dict, List + +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt + +# ───────────────────────── Configuration ────────────────────────── +REFRESH_SEC = 1 +HISTORY_SECONDS = 3600 +PANELS = [ + ("CPU %", ["cpu_pct"]), + ("Memory %", ["mem_pct"]), + ("Disk MB/s", ["disk_read_mb", "disk_write_mb"]), + ("GPU %", ["gpu_util"]), +] +# scratch file for Glances JSON +GLANCES_TMP = Path(tempfile.gettempdir()) / "system_metrics.tmp.json" +_MB = 1_048_576.0 + +# ────────────────────── Metric Extraction ────────────────────────── +def extract(j: Dict[str, Any]) -> Dict[str, float]: + cpu = j.get("cpu", {}) + mem = j.get("mem", {}) + # Disk + diskio = j.get("diskio", []) + read = write = 0.0 + if isinstance(diskio, list): + for d in diskio: + if not isinstance(d, dict): continue + read += d.get("rate/read_bytes", d.get("read_bytes", 0)) + write += d.get("rate/write_bytes", d.get("write_bytes", 0)) + elif isinstance(diskio, dict): + vals = diskio.values() + # nested dicts + if any(isinstance(v, dict) for v in vals): + for v in vals: + if not isinstance(v, dict): continue + read += v.get("rate/read_bytes", v.get("read_bytes", 0)) + write += v.get("rate/write_bytes", v.get("write_bytes", 0)) + else: + # flat mapping: keys like 'disk0.read_bytes' + for k, v in diskio.items(): + if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): read += v + if k.endswith(".write_bytes") or k.endswith(".write_bytes_rate_per_sec"): write += v + read /= _MB; write /= _MB + # GPU + gpu_src = j.get("gpu") + if isinstance(gpu_src, list) and gpu_src: + gpu0 = gpu_src[0] + elif isinstance(gpu_src, dict) and gpu_src: + gpu0 = next(iter(gpu_src.values())) + else: + gpu0 = {} + gpu_util = gpu0.get("proc") or gpu0.get("utilization") or np.nan + return { + "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), + "mem_pct": mem.get("percent", np.nan), + "disk_read_mb": read, + "disk_write_mb": write, + "gpu_util": gpu_util, + } + +# ───────────────────────── Utilities ────────────────────────────── +def start_glances() -> subprocess.Popen: + cmd = [ + "glances", "-q", f"-t{REFRESH_SEC}", + "--export", "json", "--export-json-file", str(GLANCES_TMP), + "--disable-plugin", "all", "--enable-plugin", "cpu,mem,diskio,gpu", + ] + logging.info("Starting Glances: %s", " ".join(cmd)) + p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, + text=True, preexec_fn=os.setsid) + time.sleep(1.5) + if p.poll() is not None: + raise RuntimeError(p.stderr.read()) + return p + + +def stop_glances(p: subprocess.Popen | None): + if p and p.poll() is None: + os.killpg(p.pid, signal.SIGTERM) + try: p.wait(3) + except subprocess.TimeoutExpired: os.killpg(p.pid, signal.SIGKILL) + + +def read_all(path: Path) -> List[dict]: + txt = path.read_text(encoding="utf-8").strip() + if not txt: return [] + try: + data = json.loads(txt) + return data if isinstance(data, list) else [data] + except json.JSONDecodeError: + out = [] + for line in txt.splitlines(): + line = line.strip() + if not line: continue + try: out.append(json.loads(line)) + except json.JSONDecodeError: pass + return out + + +def parse_timestamp(r: dict) -> datetime: + raw = r.get("timestamp") or r.get("now") or r.get("ts") + if isinstance(raw, (int, float)): + return pd.to_datetime(raw, unit="s", utc=True).to_pydatetime() + dt = pd.to_datetime(raw, utc=True, errors="coerce") + return dt.to_pydatetime() if not pd.isna(dt) else datetime.utcnow() + + +def new_fig(n: int): + cols = 2 if n > 2 else 1 + rows = (n + cols - 1) // cols + fig, axes = plt.subplots(rows, cols, figsize=(6*cols, 3*rows), sharex=True) + return fig, axes.flatten() if n>1 else [axes] + + +def redraw(fig, axes, df: pd.DataFrame): + for ax in axes: ax.clear() + for ax, (title, cols) in zip(axes, PANELS): + if set(cols).issubset(df.columns): + df[cols].plot(ax=ax, lw=1.2) + ax.set_title(title, fontsize=9); ax.grid(True, ls="--", alpha=0.5) + fig.tight_layout(); fig.canvas.draw_idle() + +# ───────────────────────── Loops ────────────────────────────────── +def live_loop_plot(log_fp: TextIO, history: Deque[dict]): + fig, axes = new_fig(len(PANELS)) + plt.ion(); fig.show() + last_ts = None + try: + while True: + recs = read_all(GLANCES_TMP) + updated = False + for r in recs: + ts = parse_timestamp(r) + if last_ts is None or ts>last_ts: + row = {"ts": ts} | extract(r) + history.append(row); last_ts=ts; updated=True + log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n"); log_fp.flush() + if updated: + df = pd.DataFrame(history).set_index("ts") + redraw(fig, axes, df) + plt.pause(0.1) + except KeyboardInterrupt: + pass + finally: + plt.ioff(); plt.close('all') + + +def live_loop_log(log_fp: TextIO, history: Deque[dict]): + try: + while True: + recs = read_all(GLANCES_TMP) + for r in recs: + ts = parse_timestamp(r) + row = {"ts": ts} | extract(r) + history.append(row) + log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n"); log_fp.flush() + time.sleep(REFRESH_SEC) + except KeyboardInterrupt: + pass + + +def load_static(logfile: Path, history: Deque[dict]): + recs = read_all(logfile) + history.clear(); last_ts=None + for r in recs: + ts = parse_timestamp(r) + if last_ts is None or ts>last_ts: + if "cpu_pct" in r and "disk_read_mb" in r: + row={"ts":ts} + row.update({k:r.get(k, np.nan) for _,cols in PANELS for k in cols}) + else: + row={"ts":ts}|extract(r) + history.append(row); last_ts=ts + +# ───────────────────────── Main ──────────────────────────────────── +def main(): + ap = argparse.ArgumentParser(description="Simplified metrics tracing & logging") + ap.add_argument("-f","--file", default="outputs/metrics.log.ndjson", + help="ND-JSON log (live or replay)") + ap.add_argument("--live", action="store_true", help="Run live logging") + ap.add_argument("--plot", action="store_true", help="Enable plotting in live mode") + ap.add_argument("--debug",action="store_true",help="Verbose logging") + args=ap.parse_args() + + lvl = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=lvl,format="%(asctime)s %(levelname)s %(message)s",datefmt="%H:%M:%S") + + logfile = Path(args.file).expanduser() + logfile.parent.mkdir(parents=True,exist_ok=True) + history:Deque[dict] = deque(maxlen=HISTORY_SECONDS//REFRESH_SEC) + proc=None + try: + if args.live: + proc=start_glances() + with logfile.open("a",encoding="utf-8") as log_fp: + if args.plot: + live_loop_plot(log_fp,history) + else: + live_loop_log(log_fp,history) + else: + load_static(logfile,history) + df=pd.DataFrame(history).set_index("ts") + fig,axes=new_fig(len(PANELS)) + redraw(fig,axes,df) + plt.show() + finally: + stop_glances(proc) + +if __name__=="__main__": + main() From 7a9213feb1256888f15b67e71046633a17bdca8b Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 20 Apr 2025 15:07:08 -0500 Subject: [PATCH 2/5] add metrics tracing script --- scripts/trace_metrics.py | 108 ++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 47 deletions(-) diff --git a/scripts/trace_metrics.py b/scripts/trace_metrics.py index 7b2db324..744151b7 100644 --- a/scripts/trace_metrics.py +++ b/scripts/trace_metrics.py @@ -6,7 +6,7 @@ import argparse, subprocess, os, signal, time, json, logging, tempfile from collections import deque -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Deque, TextIO, Any, Dict, List @@ -24,11 +24,12 @@ ("GPU %", ["gpu_util"]), ] # scratch file for Glances JSON -GLANCES_TMP = Path(tempfile.gettempdir()) / "system_metrics.tmp.json" +glances_tmp = Path(tempfile.gettempdir()) / "system_metrics.tmp.json" _MB = 1_048_576.0 # ────────────────────── Metric Extraction ────────────────────────── def extract(j: Dict[str, Any]) -> Dict[str, float]: + """Flatten JSON record to the metrics we plot.""" cpu = j.get("cpu", {}) mem = j.get("mem", {}) # Disk @@ -36,32 +37,33 @@ def extract(j: Dict[str, Any]) -> Dict[str, float]: read = write = 0.0 if isinstance(diskio, list): for d in diskio: - if not isinstance(d, dict): continue + if not isinstance(d, dict): + continue read += d.get("rate/read_bytes", d.get("read_bytes", 0)) write += d.get("rate/write_bytes", d.get("write_bytes", 0)) elif isinstance(diskio, dict): vals = diskio.values() - # nested dicts if any(isinstance(v, dict) for v in vals): for v in vals: if not isinstance(v, dict): continue read += v.get("rate/read_bytes", v.get("read_bytes", 0)) write += v.get("rate/write_bytes", v.get("write_bytes", 0)) else: - # flat mapping: keys like 'disk0.read_bytes' for k, v in diskio.items(): if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): read += v if k.endswith(".write_bytes") or k.endswith(".write_bytes_rate_per_sec"): write += v read /= _MB; write /= _MB - # GPU - gpu_src = j.get("gpu") - if isinstance(gpu_src, list) and gpu_src: - gpu0 = gpu_src[0] - elif isinstance(gpu_src, dict) and gpu_src: - gpu0 = next(iter(gpu_src.values())) - else: - gpu0 = {} + # GPU: find first dict in list or dict values + gpu_src = j.get("gpu") or [] + gpu_list = [] + if isinstance(gpu_src, dict): + gpu_list = list(gpu_src.values()) + elif isinstance(gpu_src, list): + gpu_list = gpu_src + # first dict entry or fallback {} + gpu0 = next((x for x in gpu_list if isinstance(x, dict)), {}) gpu_util = gpu0.get("proc") or gpu0.get("utilization") or np.nan + return { "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), "mem_pct": mem.get("percent", np.nan), @@ -74,7 +76,7 @@ def extract(j: Dict[str, Any]) -> Dict[str, float]: def start_glances() -> subprocess.Popen: cmd = [ "glances", "-q", f"-t{REFRESH_SEC}", - "--export", "json", "--export-json-file", str(GLANCES_TMP), + "--export", "json", "--export-json-file", str(glances_tmp), "--disable-plugin", "all", "--enable-plugin", "cpu,mem,diskio,gpu", ] logging.info("Starting Glances: %s", " ".join(cmd)) @@ -95,7 +97,8 @@ def stop_glances(p: subprocess.Popen | None): def read_all(path: Path) -> List[dict]: txt = path.read_text(encoding="utf-8").strip() - if not txt: return [] + if not txt: + return [] try: data = json.loads(txt) return data if isinstance(data, list) else [data] @@ -103,9 +106,12 @@ def read_all(path: Path) -> List[dict]: out = [] for line in txt.splitlines(): line = line.strip() - if not line: continue - try: out.append(json.loads(line)) - except json.JSONDecodeError: pass + if not line: + continue + try: + out.append(json.loads(line)) + except json.JSONDecodeError: + pass return out @@ -114,7 +120,7 @@ def parse_timestamp(r: dict) -> datetime: if isinstance(raw, (int, float)): return pd.to_datetime(raw, unit="s", utc=True).to_pydatetime() dt = pd.to_datetime(raw, utc=True, errors="coerce") - return dt.to_pydatetime() if not pd.isna(dt) else datetime.utcnow() + return dt.to_pydatetime() if not pd.isna(dt) else datetime.now(timezone.utc) def new_fig(n: int): @@ -125,12 +131,15 @@ def new_fig(n: int): def redraw(fig, axes, df: pd.DataFrame): - for ax in axes: ax.clear() + for ax in axes: + ax.clear() for ax, (title, cols) in zip(axes, PANELS): if set(cols).issubset(df.columns): df[cols].plot(ax=ax, lw=1.2) - ax.set_title(title, fontsize=9); ax.grid(True, ls="--", alpha=0.5) - fig.tight_layout(); fig.canvas.draw_idle() + ax.set_title(title, fontsize=9) + ax.grid(True, ls="--", alpha=0.5) + fig.tight_layout() + fig.canvas.draw_idle() # ───────────────────────── Loops ────────────────────────────────── def live_loop_plot(log_fp: TextIO, history: Deque[dict]): @@ -139,14 +148,17 @@ def live_loop_plot(log_fp: TextIO, history: Deque[dict]): last_ts = None try: while True: - recs = read_all(GLANCES_TMP) + recs = read_all(glances_tmp) updated = False for r in recs: ts = parse_timestamp(r) if last_ts is None or ts>last_ts: row = {"ts": ts} | extract(r) - history.append(row); last_ts=ts; updated=True - log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n"); log_fp.flush() + history.append(row) + last_ts = ts + updated = True + log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n") + log_fp.flush() if updated: df = pd.DataFrame(history).set_index("ts") redraw(fig, axes, df) @@ -160,12 +172,13 @@ def live_loop_plot(log_fp: TextIO, history: Deque[dict]): def live_loop_log(log_fp: TextIO, history: Deque[dict]): try: while True: - recs = read_all(GLANCES_TMP) + recs = read_all(glances_tmp) for r in recs: ts = parse_timestamp(r) row = {"ts": ts} | extract(r) history.append(row) - log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n"); log_fp.flush() + log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n") + log_fp.flush() time.sleep(REFRESH_SEC) except KeyboardInterrupt: pass @@ -173,47 +186,48 @@ def live_loop_log(log_fp: TextIO, history: Deque[dict]): def load_static(logfile: Path, history: Deque[dict]): recs = read_all(logfile) - history.clear(); last_ts=None + history.clear() + last_ts = None for r in recs: ts = parse_timestamp(r) if last_ts is None or ts>last_ts: if "cpu_pct" in r and "disk_read_mb" in r: row={"ts":ts} - row.update({k:r.get(k, np.nan) for _,cols in PANELS for k in cols}) + row.update({k: r.get(k, np.nan) for _,cols in PANELS for k in cols}) else: row={"ts":ts}|extract(r) - history.append(row); last_ts=ts + history.append(row) + last_ts = ts # ───────────────────────── Main ──────────────────────────────────── def main(): ap = argparse.ArgumentParser(description="Simplified metrics tracing & logging") - ap.add_argument("-f","--file", default="outputs/metrics.log.ndjson", - help="ND-JSON log (live or replay)") + ap.add_argument("-f","--file", default="outputs/metrics.log.ndjson", help="ND-JSON log (live or replay)") ap.add_argument("--live", action="store_true", help="Run live logging") - ap.add_argument("--plot", action="store_true", help="Enable plotting in live mode") - ap.add_argument("--debug",action="store_true",help="Verbose logging") - args=ap.parse_args() + ap.add_argument("--plot", action="store_true", help="Enable plotting") + ap.add_argument("--debug", action="store_true", help="Verbose logging") + args = ap.parse_args() lvl = logging.DEBUG if args.debug else logging.INFO - logging.basicConfig(level=lvl,format="%(asctime)s %(levelname)s %(message)s",datefmt="%H:%M:%S") + logging.basicConfig(level=lvl, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S") logfile = Path(args.file).expanduser() - logfile.parent.mkdir(parents=True,exist_ok=True) - history:Deque[dict] = deque(maxlen=HISTORY_SECONDS//REFRESH_SEC) - proc=None + logfile.parent.mkdir(parents=True, exist_ok=True) + history: Deque[dict] = deque(maxlen=HISTORY_SECONDS//REFRESH_SEC) + proc = None try: if args.live: - proc=start_glances() - with logfile.open("a",encoding="utf-8") as log_fp: + proc = start_glances() + with logfile.open("a", encoding="utf-8") as log_fp: if args.plot: - live_loop_plot(log_fp,history) + live_loop_plot(log_fp, history) else: - live_loop_log(log_fp,history) + live_loop_log(log_fp, history) else: - load_static(logfile,history) - df=pd.DataFrame(history).set_index("ts") - fig,axes=new_fig(len(PANELS)) - redraw(fig,axes,df) + load_static(logfile, history) + df = pd.DataFrame(history).set_index("ts") + fig, axes = new_fig(len(PANELS)) + redraw(fig, axes, df) plt.show() finally: stop_glances(proc) From b118f98dead43a2bdff11f8b0fde2a07f83605da Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 20 Apr 2025 15:09:28 -0500 Subject: [PATCH 3/5] add metrics tracing script --- scripts/trace_metrics.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/scripts/trace_metrics.py b/scripts/trace_metrics.py index 744151b7..07008dd3 100644 --- a/scripts/trace_metrics.py +++ b/scripts/trace_metrics.py @@ -45,25 +45,37 @@ def extract(j: Dict[str, Any]) -> Dict[str, float]: vals = diskio.values() if any(isinstance(v, dict) for v in vals): for v in vals: - if not isinstance(v, dict): continue + if not isinstance(v, dict): + continue read += v.get("rate/read_bytes", v.get("read_bytes", 0)) write += v.get("rate/write_bytes", v.get("write_bytes", 0)) else: for k, v in diskio.items(): if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): read += v if k.endswith(".write_bytes") or k.endswith(".write_bytes_rate_per_sec"): write += v - read /= _MB; write /= _MB - # GPU: find first dict in list or dict values - gpu_src = j.get("gpu") or [] - gpu_list = [] - if isinstance(gpu_src, dict): - gpu_list = list(gpu_src.values()) - elif isinstance(gpu_src, list): - gpu_list = gpu_src - # first dict entry or fallback {} - gpu0 = next((x for x in gpu_list if isinstance(x, dict)), {}) - gpu_util = gpu0.get("proc") or gpu0.get("utilization") or np.nan - + read /= _MB + write /= _MB + # GPU + gpu_src = j.get("gpu") or {} + gpu_util: float = np.nan + # List of per-GPU dicts + if isinstance(gpu_src, list) and gpu_src: + first = gpu_src[0] + if isinstance(first, dict): + gpu_util = first.get("proc") or first.get("utilization") or np.nan + # Nested dicts under keys + elif isinstance(gpu_src, dict): + # Check for nested dict values + nested = [v for v in gpu_src.values() if isinstance(v, dict)] + if nested: + gpu0 = nested[0] + gpu_util = gpu0.get("proc") or gpu0.get("utilization") or np.nan + else: + # Flat mapping: look for .proc or .gpu_proc keys + for k, v in gpu_src.items(): + if k.endswith(".proc") or k.endswith(".gpu_proc"): + gpu_util = v + break return { "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), "mem_pct": mem.get("percent", np.nan), From b19ff5ef8e433cff608b55a5ec643ef7122bf1b1 Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 20 Apr 2025 15:15:17 -0500 Subject: [PATCH 4/5] add metrics tracing script --- scripts/trace_metrics.py | 184 ++++++++++++++++----------------------- 1 file changed, 76 insertions(+), 108 deletions(-) diff --git a/scripts/trace_metrics.py b/scripts/trace_metrics.py index 07008dd3..eecec6d0 100644 --- a/scripts/trace_metrics.py +++ b/scripts/trace_metrics.py @@ -1,14 +1,14 @@ #!/usr/bin/env python3 """ -metrics_trace.py – simplified metrics tracing, logging, and optional plotting. -Direct, precise, minimal. +metrics_trace.py – metrics tracing, logging, and optional plotting/saving. +Direct, precise, minimal; consolidated output directory. """ -import argparse, subprocess, os, signal, time, json, logging, tempfile +import argparse, subprocess, os, signal, time, json, logging from collections import deque from datetime import datetime, timezone from pathlib import Path -from typing import Deque, TextIO, Any, Dict, List +from typing import Deque, TextIO, Any, Dict, List, Optional import numpy as np import pandas as pd @@ -23,72 +23,57 @@ ("Disk MB/s", ["disk_read_mb", "disk_write_mb"]), ("GPU %", ["gpu_util"]), ] -# scratch file for Glances JSON -glances_tmp = Path(tempfile.gettempdir()) / "system_metrics.tmp.json" _MB = 1_048_576.0 # ────────────────────── Metric Extraction ────────────────────────── def extract(j: Dict[str, Any]) -> Dict[str, float]: - """Flatten JSON record to the metrics we plot.""" cpu = j.get("cpu", {}) mem = j.get("mem", {}) - # Disk + # Disk IO diskio = j.get("diskio", []) read = write = 0.0 if isinstance(diskio, list): for d in diskio: - if not isinstance(d, dict): - continue - read += d.get("rate/read_bytes", d.get("read_bytes", 0)) + if not isinstance(d, dict): continue + read += d.get("rate/read_bytes", d.get("read_bytes", 0)) write += d.get("rate/write_bytes", d.get("write_bytes", 0)) elif isinstance(diskio, dict): vals = diskio.values() if any(isinstance(v, dict) for v in vals): for v in vals: - if not isinstance(v, dict): - continue - read += v.get("rate/read_bytes", v.get("read_bytes", 0)) + if not isinstance(v, dict): continue + read += v.get("rate/read_bytes", v.get("read_bytes", 0)) write += v.get("rate/write_bytes", v.get("write_bytes", 0)) else: for k, v in diskio.items(): - if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): read += v + if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): read += v if k.endswith(".write_bytes") or k.endswith(".write_bytes_rate_per_sec"): write += v - read /= _MB - write /= _MB - # GPU + read /= _MB; write /= _MB + # GPU utilization gpu_src = j.get("gpu") or {} - gpu_util: float = np.nan - # List of per-GPU dicts - if isinstance(gpu_src, list) and gpu_src: - first = gpu_src[0] - if isinstance(first, dict): - gpu_util = first.get("proc") or first.get("utilization") or np.nan - # Nested dicts under keys + gpu_util = np.nan + if isinstance(gpu_src, list) and gpu_src and isinstance(gpu_src[0], dict): + gpu_util = gpu_src[0].get("proc") or gpu_src[0].get("utilization") or np.nan elif isinstance(gpu_src, dict): - # Check for nested dict values nested = [v for v in gpu_src.values() if isinstance(v, dict)] if nested: - gpu0 = nested[0] - gpu_util = gpu0.get("proc") or gpu0.get("utilization") or np.nan + gpu_util = nested[0].get("proc") or nested[0].get("utilization") or np.nan else: - # Flat mapping: look for .proc or .gpu_proc keys for k, v in gpu_src.items(): - if k.endswith(".proc") or k.endswith(".gpu_proc"): - gpu_util = v - break + if k.endswith(".proc") or k.endswith(".gpu_proc"): gpu_util = v; break return { - "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), - "mem_pct": mem.get("percent", np.nan), - "disk_read_mb": read, + "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), + "mem_pct": mem.get("percent", np.nan), + "disk_read_mb": read, "disk_write_mb": write, - "gpu_util": gpu_util, + "gpu_util": gpu_util, } # ───────────────────────── Utilities ────────────────────────────── -def start_glances() -> subprocess.Popen: +def start_glances(tmp_json: Path) -> subprocess.Popen: cmd = [ "glances", "-q", f"-t{REFRESH_SEC}", - "--export", "json", "--export-json-file", str(glances_tmp), + "--export", "json", "--export-json-file", str(tmp_json), "--disable-plugin", "all", "--enable-plugin", "cpu,mem,diskio,gpu", ] logging.info("Starting Glances: %s", " ".join(cmd)) @@ -108,22 +93,16 @@ def stop_glances(p: subprocess.Popen | None): def read_all(path: Path) -> List[dict]: - txt = path.read_text(encoding="utf-8").strip() - if not txt: - return [] + text = path.read_text(encoding="utf-8").strip() + if not text: return [] try: - data = json.loads(txt) + data = json.loads(text) return data if isinstance(data, list) else [data] except json.JSONDecodeError: out = [] - for line in txt.splitlines(): - line = line.strip() - if not line: - continue - try: - out.append(json.loads(line)) - except json.JSONDecodeError: - pass + for line in text.splitlines(): + try: out.append(json.loads(line)) + except: pass return out @@ -136,44 +115,36 @@ def parse_timestamp(r: dict) -> datetime: def new_fig(n: int): - cols = 2 if n > 2 else 1 - rows = (n + cols - 1) // cols + cols = 2 if n>2 else 1; rows = (n+cols-1)//cols fig, axes = plt.subplots(rows, cols, figsize=(6*cols, 3*rows), sharex=True) return fig, axes.flatten() if n>1 else [axes] def redraw(fig, axes, df: pd.DataFrame): - for ax in axes: - ax.clear() + for ax in axes: ax.clear() for ax, (title, cols) in zip(axes, PANELS): if set(cols).issubset(df.columns): df[cols].plot(ax=ax, lw=1.2) - ax.set_title(title, fontsize=9) - ax.grid(True, ls="--", alpha=0.5) - fig.tight_layout() - fig.canvas.draw_idle() + ax.set_title(title, fontsize=9); ax.grid(True, ls="--", alpha=0.5) + fig.tight_layout(); fig.canvas.draw_idle() # ───────────────────────── Loops ────────────────────────────────── -def live_loop_plot(log_fp: TextIO, history: Deque[dict]): - fig, axes = new_fig(len(PANELS)) - plt.ion(); fig.show() +def live_loop_plot(tmp_json: Path, log_fp: TextIO, history: Deque[dict], out_dir: Path): + fig, axes = new_fig(len(PANELS)); plt.ion(); fig.show() last_ts = None try: while True: - recs = read_all(glances_tmp) - updated = False + recs = read_all(tmp_json); updated=False for r in recs: ts = parse_timestamp(r) if last_ts is None or ts>last_ts: - row = {"ts": ts} | extract(r) - history.append(row) - last_ts = ts - updated = True - log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n") - log_fp.flush() + row={"ts":ts}|extract(r) + history.append(row); last_ts=ts; updated=True + log_fp.write(json.dumps({"ts":str(ts),**extract(r)})+"\n"); log_fp.flush() if updated: - df = pd.DataFrame(history).set_index("ts") - redraw(fig, axes, df) + df=pd.DataFrame(history).set_index("ts"); redraw(fig,axes,df) + if out_dir: + fig.savefig(out_dir/"live.png") plt.pause(0.1) except KeyboardInterrupt: pass @@ -181,68 +152,65 @@ def live_loop_plot(log_fp: TextIO, history: Deque[dict]): plt.ioff(); plt.close('all') -def live_loop_log(log_fp: TextIO, history: Deque[dict]): +def live_loop_log(tmp_json: Path, log_fp: TextIO, history: Deque[dict]): try: while True: - recs = read_all(glances_tmp) - for r in recs: - ts = parse_timestamp(r) - row = {"ts": ts} | extract(r) + for r in read_all(tmp_json): + ts=parse_timestamp(r) + row={"ts":ts}|extract(r) history.append(row) - log_fp.write(json.dumps({"ts": str(ts), **extract(r)})+"\n") - log_fp.flush() + log_fp.write(json.dumps({"ts":str(ts),**extract(r)})+"\n"); log_fp.flush() time.sleep(REFRESH_SEC) except KeyboardInterrupt: pass -def load_static(logfile: Path, history: Deque[dict]): - recs = read_all(logfile) - history.clear() - last_ts = None - for r in recs: - ts = parse_timestamp(r) - if last_ts is None or ts>last_ts: +def load_static(logfile: Path, history: Deque[dict], out_dir: Path, plot: bool): + for r in read_all(logfile): + ts=parse_timestamp(r) + if not history or ts>history[-1]["ts"]: if "cpu_pct" in r and "disk_read_mb" in r: row={"ts":ts} - row.update({k: r.get(k, np.nan) for _,cols in PANELS for k in cols}) + row.update({k:r.get(k,0) for _,cols in PANELS for k in cols}) else: row={"ts":ts}|extract(r) history.append(row) - last_ts = ts + if plot: + df=pd.DataFrame(history).set_index("ts") + fig,axes=new_fig(len(PANELS)); redraw(fig,axes,df) + fig.savefig(out_dir/"static.png"); plt.show() # ───────────────────────── Main ──────────────────────────────────── def main(): - ap = argparse.ArgumentParser(description="Simplified metrics tracing & logging") - ap.add_argument("-f","--file", default="outputs/metrics.log.ndjson", help="ND-JSON log (live or replay)") + ap=argparse.ArgumentParser(description="Metrics tracing with consolidated out-dir") + ap.add_argument("-o","--out-dir", default="outputs", type=Path, + help="Directory for logs, glances JSON, and plots") ap.add_argument("--live", action="store_true", help="Run live logging") ap.add_argument("--plot", action="store_true", help="Enable plotting") - ap.add_argument("--debug", action="store_true", help="Verbose logging") - args = ap.parse_args() + ap.add_argument("--debug",action="store_true",help="Verbose logging") + args=ap.parse_args() + + lvl=logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=lvl,format="%(asctime)s %(levelname)s %(message)s",datefmt="%H:%M:%S") - lvl = logging.DEBUG if args.debug else logging.INFO - logging.basicConfig(level=lvl, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S") + out_dir=args.out_dir.expanduser() + out_dir.mkdir(parents=True,exist_ok=True) + tmp_json=out_dir/"system_metrics.tmp.json" + logfile=out_dir/"metrics.log.ndjson" - logfile = Path(args.file).expanduser() - logfile.parent.mkdir(parents=True, exist_ok=True) - history: Deque[dict] = deque(maxlen=HISTORY_SECONDS//REFRESH_SEC) - proc = None + history:Deque[dict]=deque(maxlen=HISTORY_SECONDS//REFRESH_SEC) + proc=None try: if args.live: - proc = start_glances() - with logfile.open("a", encoding="utf-8") as log_fp: + proc=start_glances(tmp_json) + with logfile.open("a",encoding="utf-8") as log_fp: if args.plot: - live_loop_plot(log_fp, history) + live_loop_plot(tmp_json,log_fp,history,out_dir) else: - live_loop_log(log_fp, history) + live_loop_log(tmp_json,log_fp,history) else: - load_static(logfile, history) - df = pd.DataFrame(history).set_index("ts") - fig, axes = new_fig(len(PANELS)) - redraw(fig, axes, df) - plt.show() + load_static(logfile, history, out_dir, args.plot) finally: stop_glances(proc) -if __name__=="__main__": - main() +if __name__=="__main__": main() From 2d73d9a548b18c9fdf79c30fad06a9341df18cbc Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 20 Apr 2025 15:42:19 -0500 Subject: [PATCH 5/5] link checks --- pyproject.toml | 2 +- scripts/trace_metrics.py | 187 ++++++++++++++++++++++++--------------- 2 files changed, 117 insertions(+), 72 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 520eddf9..d2373c2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ exclude = ''' [tool.isort] profile = "black" -skip = ["src/cpp/*", "build/*", "dist/*"] +skip = ["/Users/jason/projects/quake/src/cpp/third_party/", "build/", "dist/"] line_length = 120 [project.optional-dependencies] diff --git a/scripts/trace_metrics.py b/scripts/trace_metrics.py index eecec6d0..7be99409 100644 --- a/scripts/trace_metrics.py +++ b/scripts/trace_metrics.py @@ -1,21 +1,26 @@ #!/usr/bin/env python3 """ metrics_trace.py – metrics tracing, logging, and optional plotting/saving. -Direct, precise, minimal; consolidated output directory. """ -import argparse, subprocess, os, signal, time, json, logging +import argparse +import json +import logging +import os +import signal +import subprocess +import time from collections import deque from datetime import datetime, timezone from pathlib import Path -from typing import Deque, TextIO, Any, Dict, List, Optional +from typing import Any, Deque, Dict, List, TextIO +import matplotlib.pyplot as plt import numpy as np import pandas as pd -import matplotlib.pyplot as plt # ───────────────────────── Configuration ────────────────────────── -REFRESH_SEC = 1 +REFRESH_SEC = 1 HISTORY_SECONDS = 3600 PANELS = [ ("CPU %", ["cpu_pct"]), @@ -25,6 +30,7 @@ ] _MB = 1_048_576.0 + # ────────────────────── Metric Extraction ────────────────────────── def extract(j: Dict[str, Any]) -> Dict[str, float]: cpu = j.get("cpu", {}) @@ -34,21 +40,26 @@ def extract(j: Dict[str, Any]) -> Dict[str, float]: read = write = 0.0 if isinstance(diskio, list): for d in diskio: - if not isinstance(d, dict): continue - read += d.get("rate/read_bytes", d.get("read_bytes", 0)) + if not isinstance(d, dict): + continue + read += d.get("rate/read_bytes", d.get("read_bytes", 0)) write += d.get("rate/write_bytes", d.get("write_bytes", 0)) elif isinstance(diskio, dict): vals = diskio.values() if any(isinstance(v, dict) for v in vals): for v in vals: - if not isinstance(v, dict): continue - read += v.get("rate/read_bytes", v.get("read_bytes", 0)) + if not isinstance(v, dict): + continue + read += v.get("rate/read_bytes", v.get("read_bytes", 0)) write += v.get("rate/write_bytes", v.get("write_bytes", 0)) else: for k, v in diskio.items(): - if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): read += v - if k.endswith(".write_bytes") or k.endswith(".write_bytes_rate_per_sec"): write += v - read /= _MB; write /= _MB + if k.endswith(".read_bytes") or k.endswith(".read_bytes_rate_per_sec"): + read += v + if k.endswith(".write_bytes") or k.endswith(".write_bytes_rate_per_sec"): + write += v + read /= _MB + write /= _MB # GPU utilization gpu_src = j.get("gpu") or {} gpu_util = np.nan @@ -60,25 +71,35 @@ def extract(j: Dict[str, Any]) -> Dict[str, float]: gpu_util = nested[0].get("proc") or nested[0].get("utilization") or np.nan else: for k, v in gpu_src.items(): - if k.endswith(".proc") or k.endswith(".gpu_proc"): gpu_util = v; break + if k.endswith(".proc") or k.endswith(".gpu_proc"): + gpu_util = v + break return { - "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), - "mem_pct": mem.get("percent", np.nan), - "disk_read_mb": read, + "cpu_pct": cpu.get("user", 0) + cpu.get("system", 0), + "mem_pct": mem.get("percent", np.nan), + "disk_read_mb": read, "disk_write_mb": write, - "gpu_util": gpu_util, + "gpu_util": gpu_util, } + # ───────────────────────── Utilities ────────────────────────────── def start_glances(tmp_json: Path) -> subprocess.Popen: cmd = [ - "glances", "-q", f"-t{REFRESH_SEC}", - "--export", "json", "--export-json-file", str(tmp_json), - "--disable-plugin", "all", "--enable-plugin", "cpu,mem,diskio,gpu", + "glances", + "-q", + f"-t{REFRESH_SEC}", + "--export", + "json", + "--export-json-file", + str(tmp_json), + "--disable-plugin", + "all", + "--enable-plugin", + "cpu,mem,diskio,gpu", ] logging.info("Starting Glances: %s", " ".join(cmd)) - p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, - text=True, preexec_fn=os.setsid) + p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True, preexec_fn=os.setsid) time.sleep(1.5) if p.poll() is not None: raise RuntimeError(p.stderr.read()) @@ -88,21 +109,27 @@ def start_glances(tmp_json: Path) -> subprocess.Popen: def stop_glances(p: subprocess.Popen | None): if p and p.poll() is None: os.killpg(p.pid, signal.SIGTERM) - try: p.wait(3) - except subprocess.TimeoutExpired: os.killpg(p.pid, signal.SIGKILL) + try: + p.wait(3) + except subprocess.TimeoutExpired: + os.killpg(p.pid, signal.SIGKILL) def read_all(path: Path) -> List[dict]: text = path.read_text(encoding="utf-8").strip() - if not text: return [] + if not text: + return [] try: data = json.loads(text) return data if isinstance(data, list) else [data] except json.JSONDecodeError: out = [] for line in text.splitlines(): - try: out.append(json.loads(line)) - except: pass + try: + out.append(json.loads(line)) + except Exception as e: + logging.error("Error parsing JSON line: %s", line) + logging.exception(e) return out @@ -115,51 +142,65 @@ def parse_timestamp(r: dict) -> datetime: def new_fig(n: int): - cols = 2 if n>2 else 1; rows = (n+cols-1)//cols - fig, axes = plt.subplots(rows, cols, figsize=(6*cols, 3*rows), sharex=True) - return fig, axes.flatten() if n>1 else [axes] + cols = 2 if n > 2 else 1 + rows = (n + cols - 1) // cols + fig, axes = plt.subplots(rows, cols, figsize=(6 * cols, 3 * rows), sharex=True) + return fig, axes.flatten() if n > 1 else [axes] def redraw(fig, axes, df: pd.DataFrame): - for ax in axes: ax.clear() + for ax in axes: + ax.clear() for ax, (title, cols) in zip(axes, PANELS): if set(cols).issubset(df.columns): df[cols].plot(ax=ax, lw=1.2) - ax.set_title(title, fontsize=9); ax.grid(True, ls="--", alpha=0.5) - fig.tight_layout(); fig.canvas.draw_idle() + ax.set_title(title, fontsize=9) + ax.grid(True, ls="--", alpha=0.5) + fig.tight_layout() + fig.canvas.draw_idle() + # ───────────────────────── Loops ────────────────────────────────── def live_loop_plot(tmp_json: Path, log_fp: TextIO, history: Deque[dict], out_dir: Path): - fig, axes = new_fig(len(PANELS)); plt.ion(); fig.show() + fig, axes = new_fig(len(PANELS)) + plt.ion() + fig.show() last_ts = None try: while True: - recs = read_all(tmp_json); updated=False + recs = read_all(tmp_json) + updated = False for r in recs: ts = parse_timestamp(r) - if last_ts is None or ts>last_ts: - row={"ts":ts}|extract(r) - history.append(row); last_ts=ts; updated=True - log_fp.write(json.dumps({"ts":str(ts),**extract(r)})+"\n"); log_fp.flush() + if last_ts is None or ts > last_ts: + row = {"ts": ts} | extract(r) + history.append(row) + last_ts = ts + updated = True + log_fp.write(json.dumps({"ts": str(ts), **extract(r)}) + "\n") + log_fp.flush() if updated: - df=pd.DataFrame(history).set_index("ts"); redraw(fig,axes,df) + df = pd.DataFrame(history).set_index("ts") + redraw(fig, axes, df) if out_dir: - fig.savefig(out_dir/"live.png") + fig.savefig(out_dir / "live.png") plt.pause(0.1) except KeyboardInterrupt: pass finally: - plt.ioff(); plt.close('all') + plt.ioff() + plt.close("all") def live_loop_log(tmp_json: Path, log_fp: TextIO, history: Deque[dict]): try: while True: for r in read_all(tmp_json): - ts=parse_timestamp(r) - row={"ts":ts}|extract(r) + ts = parse_timestamp(r) + row = {"ts": ts} | extract(r) history.append(row) - log_fp.write(json.dumps({"ts":str(ts),**extract(r)})+"\n"); log_fp.flush() + log_fp.write(json.dumps({"ts": str(ts), **extract(r)}) + "\n") + log_fp.flush() time.sleep(REFRESH_SEC) except KeyboardInterrupt: pass @@ -167,50 +208,54 @@ def live_loop_log(tmp_json: Path, log_fp: TextIO, history: Deque[dict]): def load_static(logfile: Path, history: Deque[dict], out_dir: Path, plot: bool): for r in read_all(logfile): - ts=parse_timestamp(r) - if not history or ts>history[-1]["ts"]: + ts = parse_timestamp(r) + if not history or ts > history[-1]["ts"]: if "cpu_pct" in r and "disk_read_mb" in r: - row={"ts":ts} - row.update({k:r.get(k,0) for _,cols in PANELS for k in cols}) + row = {"ts": ts} + row.update({k: r.get(k, 0) for _, cols in PANELS for k in cols}) else: - row={"ts":ts}|extract(r) + row = {"ts": ts} | extract(r) history.append(row) if plot: - df=pd.DataFrame(history).set_index("ts") - fig,axes=new_fig(len(PANELS)); redraw(fig,axes,df) - fig.savefig(out_dir/"static.png"); plt.show() + df = pd.DataFrame(history).set_index("ts") + fig, axes = new_fig(len(PANELS)) + redraw(fig, axes, df) + fig.savefig(out_dir / "static.png") + plt.show() + # ───────────────────────── Main ──────────────────────────────────── def main(): - ap=argparse.ArgumentParser(description="Metrics tracing with consolidated out-dir") - ap.add_argument("-o","--out-dir", default="outputs", type=Path, - help="Directory for logs, glances JSON, and plots") + ap = argparse.ArgumentParser(description="Metrics tracing with consolidated out-dir") + ap.add_argument("-o", "--out-dir", default="outputs", type=Path, help="Directory for logs, glances JSON, and plots") ap.add_argument("--live", action="store_true", help="Run live logging") ap.add_argument("--plot", action="store_true", help="Enable plotting") - ap.add_argument("--debug",action="store_true",help="Verbose logging") - args=ap.parse_args() + ap.add_argument("--debug", action="store_true", help="Verbose logging") + args = ap.parse_args() - lvl=logging.DEBUG if args.debug else logging.INFO - logging.basicConfig(level=lvl,format="%(asctime)s %(levelname)s %(message)s",datefmt="%H:%M:%S") + lvl = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=lvl, format="%(asctime)s %(levelname)s %(message)s", datefmt="%H:%M:%S") - out_dir=args.out_dir.expanduser() - out_dir.mkdir(parents=True,exist_ok=True) - tmp_json=out_dir/"system_metrics.tmp.json" - logfile=out_dir/"metrics.log.ndjson" + out_dir = args.out_dir.expanduser() + out_dir.mkdir(parents=True, exist_ok=True) + tmp_json = out_dir / "system_metrics.tmp.json" + logfile = out_dir / "metrics.log.ndjson" - history:Deque[dict]=deque(maxlen=HISTORY_SECONDS//REFRESH_SEC) - proc=None + history: Deque[dict] = deque(maxlen=HISTORY_SECONDS // REFRESH_SEC) + proc = None try: if args.live: - proc=start_glances(tmp_json) - with logfile.open("a",encoding="utf-8") as log_fp: + proc = start_glances(tmp_json) + with logfile.open("a", encoding="utf-8") as log_fp: if args.plot: - live_loop_plot(tmp_json,log_fp,history,out_dir) + live_loop_plot(tmp_json, log_fp, history, out_dir) else: - live_loop_log(tmp_json,log_fp,history) + live_loop_log(tmp_json, log_fp, history) else: load_static(logfile, history, out_dir, args.plot) finally: stop_glances(proc) -if __name__=="__main__": main() + +if __name__ == "__main__": + main()