Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
name: CI

on:
push:
branches:
- main
pull_request:
workflow_dispatch:

jobs:
lint:
name: Lint Python
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install black flake8

- name: Check formatting with black
run: black --check --verbose --line-length 79 bin/bin/tsd-plot.py src/tsd_plot tests python

- name: Lint with flake8
run: |
flake8 --tee --output-file flake8.report bin/bin/tsd-plot.py src/tsd_plot tests python

- name: Upload flake8 report
if: always()
uses: actions/upload-artifact@v4
with:
name: flake8-report
path: flake8.report

tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest

- name: Run pytest
run: pytest
85 changes: 67 additions & 18 deletions python/bandwidth_tool/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Utilities for inspecting recorded bandwidth measurements."""

from __future__ import annotations

import socket
Expand Down Expand Up @@ -79,7 +80,9 @@ class Measurement:
ssid: Optional[str] = None

def as_row(self) -> Sequence[str]:
dt = datetime.fromtimestamp(self.timestamp).strftime("%Y-%m-%d %H:%M:%S")
dt = datetime.fromtimestamp(self.timestamp).strftime(
"%Y-%m-%d %H:%M:%S"
)
return (
dt,
format_number(self.upload),
Expand Down Expand Up @@ -149,7 +152,9 @@ def load_measurements(directory: Path) -> List[Measurement]:
pings = _parse_numeric_file(paths["ping"])
ssids = _parse_text_file(paths["ssid"])

timestamps = sorted({*uploads.keys(), *downloads.keys(), *pings.keys(), *ssids.keys()})
timestamps = sorted(
{*uploads.keys(), *downloads.keys(), *pings.keys(), *ssids.keys()}
)
measurements: List[Measurement] = []
for ts in timestamps:
measurements.append(
Expand All @@ -164,7 +169,9 @@ def load_measurements(directory: Path) -> List[Measurement]:
return measurements


def limit_measurements(measurements: Sequence[Measurement], limit: int) -> List[Measurement]:
def limit_measurements(
measurements: Sequence[Measurement], limit: int
) -> List[Measurement]:
if limit <= 0:
return []
return list(measurements[-limit:])
Expand All @@ -173,10 +180,14 @@ def limit_measurements(measurements: Sequence[Measurement], limit: int) -> List[
def render_table(measurements: Iterable[Measurement]) -> str:
rows = [measurement.as_row() for measurement in measurements]
headers = ("datetime", "upload MiBps", "download MiBps", "ping ms", "ssid")
return format_table(headers, rows, colalign=("left", "right", "right", "right", "left"))
return format_table(
headers, rows, colalign=("left", "right", "right", "right", "left")
)


def _collect_values(measurements: Iterable[Measurement], attribute: str) -> List[float]:
def _collect_values(
measurements: Iterable[Measurement], attribute: str
) -> List[float]:
values: List[float] = []
for measurement in measurements:
value = getattr(measurement, attribute)
Expand All @@ -202,7 +213,9 @@ def _determine_edges(values: Sequence[float], bins: int = 10) -> List[float]:
return edges


def _histogram_from_edges(values: Sequence[float], edges: Sequence[float]) -> List[int]:
def _histogram_from_edges(
values: Sequence[float], edges: Sequence[float]
) -> List[int]:
counts = [0 for _ in range(len(edges) - 1)]
if not values:
return counts
Expand Down Expand Up @@ -231,7 +244,9 @@ def _format_range(start: float, end: float, is_last: bool) -> str:
return f"[{start:7.2f}, {end:7.2f}{right}"


def _bar(count: int, max_count: int, width: int, *, reverse: bool = False) -> str:
def _bar(
count: int, max_count: int, width: int, *, reverse: bool = False
) -> str:
if max_count <= 0 or count <= 0:
bar = ""
else:
Expand All @@ -242,7 +257,11 @@ def _bar(count: int, max_count: int, width: int, *, reverse: bool = False) -> st
return bar.ljust(width)


def _render_violin_text(edges: Sequence[float], upload_counts: Sequence[int], download_counts: Sequence[int]) -> List[str]:
def _render_violin_text(
edges: Sequence[float],
upload_counts: Sequence[int],
download_counts: Sequence[int],
) -> List[str]:
width = 16
lines = ["Upload/Download speeds (MiBps)"]
lines.append("upload".rjust(width) + " │ " + "download".ljust(width))
Expand All @@ -256,7 +275,9 @@ def _render_violin_text(edges: Sequence[float], upload_counts: Sequence[int], do
return lines


def _render_ping_text(edges: Sequence[float], counts: Sequence[int]) -> List[str]:
def _render_ping_text(
edges: Sequence[float], counts: Sequence[int]
) -> List[str]:
width = 32
lines = ["Ping times (ms)"]
max_count = max([*counts, 0])
Expand All @@ -268,7 +289,9 @@ def _render_ping_text(edges: Sequence[float], counts: Sequence[int]) -> List[str
return lines


def render_stats_text(measurements: Sequence[Measurement], bins: int = 10) -> str:
def render_stats_text(
measurements: Sequence[Measurement], bins: int = 10
) -> str:
uploads = _collect_values(measurements, "upload")
downloads = _collect_values(measurements, "download")
pings = _collect_values(measurements, "ping")
Expand All @@ -279,7 +302,9 @@ def render_stats_text(measurements: Sequence[Measurement], bins: int = 10) -> st
edges = _determine_edges([*uploads, *downloads], bins=bins)
upload_counts = _histogram_from_edges(uploads, edges)
download_counts = _histogram_from_edges(downloads, edges)
lines.extend(_render_violin_text(edges, upload_counts, download_counts))
lines.extend(
_render_violin_text(edges, upload_counts, download_counts)
)
else:
lines.append("No upload/download data available.")

Expand All @@ -295,11 +320,17 @@ def render_stats_text(measurements: Sequence[Measurement], bins: int = 10) -> st
return "\n".join(lines)


def render_stats_graphical(measurements: Sequence[Measurement], bins: int = 10) -> None:
def render_stats_graphical(
measurements: Sequence[Measurement], bins: int = 10
) -> None:
try:
import matplotlib.pyplot as plt # type: ignore
except ImportError as exc: # pragma: no cover - depends on optional dependency
raise RuntimeError("Matplotlib is required for graphical statistics") from exc
except (
ImportError
) as exc: # pragma: no cover - depends on optional dependency
raise RuntimeError(
"Matplotlib is required for graphical statistics"
) from exc

uploads = _collect_values(measurements, "upload")
downloads = _collect_values(measurements, "download")
Expand All @@ -313,8 +344,22 @@ def render_stats_graphical(measurements: Sequence[Measurement], bins: int = 10)
centers = _bin_centers(edges)
heights = [edges[i + 1] - edges[i] for i in range(len(edges) - 1)]

ax_speed.barh(centers, upload_counts, height=heights, align="center", color="tab:blue", label="Upload")
ax_speed.barh(centers, [-count for count in download_counts], height=heights, align="center", color="tab:orange", label="Download")
ax_speed.barh(
centers,
upload_counts,
height=heights,
align="center",
color="tab:blue",
label="Upload",
)
ax_speed.barh(
centers,
[-count for count in download_counts],
height=heights,
align="center",
color="tab:orange",
label="Download",
)
ax_speed.axvline(0, color="black", linewidth=0.8)
ax_speed.set_xlabel("Sample count")
ax_speed.set_ylabel("MiBps")
Expand All @@ -324,7 +369,9 @@ def render_stats_graphical(measurements: Sequence[Measurement], bins: int = 10)
ping_edges = _determine_edges(pings, bins=bins)
ping_counts = _histogram_from_edges(pings, ping_edges)
ping_centers = _bin_centers(ping_edges)
widths = [ping_edges[i + 1] - ping_edges[i] for i in range(len(ping_edges) - 1)]
widths = [
ping_edges[i + 1] - ping_edges[i] for i in range(len(ping_edges) - 1)
]

ax_ping.bar(ping_centers, ping_counts, width=widths, color="tab:green")
ax_ping.set_xlabel("Ping (ms)")
Expand All @@ -335,7 +382,9 @@ def render_stats_graphical(measurements: Sequence[Measurement], bins: int = 10)
plt.show()


def render_stats(measurements: Sequence[Measurement], *, text: bool, bins: int = 10) -> Optional[str]:
def render_stats(
measurements: Sequence[Measurement], *, text: bool, bins: int = 10
) -> Optional[str]:
if text:
return render_stats_text(measurements, bins=bins)
render_stats_graphical(measurements, bins=bins)
Expand Down
20 changes: 16 additions & 4 deletions python/bandwidth_tool/tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
or test time. The :func:`format_table` helper aligns text in a way that is
compatible with traditional command line utilities such as ``column``.
"""

from __future__ import annotations

from typing import Iterable, List, Sequence
Expand All @@ -22,7 +23,12 @@ def _align_cell(text: str, width: int, align: str) -> str:
return text.ljust(width)


def format_table(headers: Sequence[str], rows: Iterable[Row], *, colalign: Alignment | None = None) -> str:
def format_table(
headers: Sequence[str],
rows: Iterable[Row],
*,
colalign: Alignment | None = None,
) -> str:
"""Return a string representing a table with aligned columns.

Parameters
Expand All @@ -49,17 +55,23 @@ def format_table(headers: Sequence[str], rows: Iterable[Row], *, colalign: Align
widths = [len(headers[i]) for i in range(num_cols)]
for row in row_list:
if len(row) != num_cols:
raise ValueError("row has different number of columns than headers")
raise ValueError(
"row has different number of columns than headers"
)
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(cell))

header_line = " ".join(
_align_cell(headers[i], widths[i], colalign[i]) for i in range(num_cols)
_align_cell(headers[i], widths[i], colalign[i])
for i in range(num_cols)
)
divider = " ".join("-" * widths[i] for i in range(num_cols))

body_lines = [
" ".join(_align_cell(cell, widths[i], colalign[i]) for i, cell in enumerate(row))
" ".join(
_align_cell(cell, widths[i], colalign[i])
for i, cell in enumerate(row)
)
for row in row_list
]

Expand Down