diff --git a/testflows/github/hetzner/runners/estimate.py b/testflows/github/hetzner/runners/estimate.py index 1bd1464..1647ce5 100644 --- a/testflows/github/hetzner/runners/estimate.py +++ b/testflows/github/hetzner/runners/estimate.py @@ -12,7 +12,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import sys import math +import yaml import types import github @@ -20,6 +22,7 @@ from .config import Config from .config import check_prices from .scale_up import get_runner_server_type_and_location +from .streamingyaml import StreamingYAMLWriter from datetime import timedelta from hcloud import Client @@ -143,7 +146,12 @@ def login_and_get_prices( def get_estimate_for_jobs( - action, jobs, server_prices, ipv4_price, ipv6_price, indent=" " + writer: StreamingYAMLWriter, + jobs: list[WorkflowJob], + server_prices: dict[str, dict[str, float]], + ipv4_price: float, + ipv6_price: float, + indent: int = 2, ): """Collect estimate for the given jobs.""" @@ -156,14 +164,27 @@ def get_estimate_for_jobs( duration = job.completed_at - job.started_at runner_name = job.raw_data["runner_name"] - action.note( - f"{indent}Job {job.id} {job.name} {job.status} {duration} {runner_name}" - ) + job_entry = { + "name": job.name, + "id": job.id, + "status": job.status, + "duration": str(duration), + "url": job.url, + "run_id": job.run_id, + "run_url": job.run_url, + "runner_id": job.raw_data["runner_id"], + "runner_name": runner_name, + "runner_group_id": job.raw_data["runner_group_id"], + "runner_group_name": job.raw_data["runner_group_name"], + "workflow_name": job.raw_data["workflow_name"], + } price = get_runner_server_price_per_second( server_prices, runner_name, ipv4_price, ipv6_price ) + job_entry["estimate"] = {"worst": None, "best": None} + if price is not None: job_best_estimate = duration.total_seconds() * price job_worst_estimate = ( @@ -173,16 +194,19 @@ def get_estimate_for_jobs( best_estimate += job_best_estimate worst_estimate += job_worst_estimate - action.note(f"{indent} €{job_worst_estimate:.6f}-€{job_best_estimate:.6f}") + job_entry["estimate"] = { + "worst": job_worst_estimate, + "best": job_best_estimate, + } else: unknown_jobs += 1 unknown_duration += duration - action.note(f"{indent} unknown price") - total_duration += duration + writer.add_list_element(value=job_entry) + return ( i, total_duration, @@ -199,9 +223,12 @@ def workflow_run( repo: Repository = None, workflow_run: WorkflowRun = None, server_prices=None, + writer=None, ): """Estimate cost for a given workflow run.""" run_attempt = None + if writer is None: + writer = StreamingYAMLWriter(stream=sys.stdout, indent=0) if workflow_run is None: run_id = args.id @@ -216,15 +243,18 @@ def workflow_run( workflow_run: WorkflowRun = repo.get_workflow_run(run_id) workflow_run = extend_workflow_run(workflow_run) - action.note(f"Workflow name: {workflow_run.name}") + workflow_entry = {"name": workflow_run.name, "id": workflow_run.id} if run_attempt is None: - action.note(f"Run attempt: {workflow_run.run_attempt}") + workflow_entry["attempt"] = workflow_run.run_attempt jobs = workflow_run.jobs() else: - action.note(f"Run attempt: {run_attempt}") + workflow_entry["attempt"] = run_attempt jobs = workflow_run.attempt_jobs(run_attempt) + _, list_value_writer = writer.add_list_element(workflow_entry) + jobs_writer = list_value_writer.add_key("jobs") + ( count, total_duration, @@ -233,15 +263,18 @@ def workflow_run( worst_estimate, best_estimate, ) = get_estimate_for_jobs( - action, jobs, server_prices, args.ipv4_price, args.ipv6_price + jobs_writer, jobs, server_prices, args.ipv4_price, args.ipv6_price, indent=2 ) - action.note(f"Total jobs: {count}") - action.note(f"Total duration: {total_duration}") - action.note(f"Unknown jobs: {unknown_jobs}") - action.note(f"Unknown duration: {unknown_duration}") - action.note(f"Worst estimate: €{worst_estimate:.6f}") - action.note(f"Best estimate: €{best_estimate:.6f}") + workflow_totals = {} + workflow_totals["total_jobs"] = count + workflow_totals["total_duration"] = str(total_duration) + workflow_totals["unknown_jobs"] = unknown_jobs + workflow_totals["unknown_duration"] = str(unknown_duration) + workflow_totals["worst_estimate"] = worst_estimate + workflow_totals["best_estimate"] = best_estimate + + list_value_writer.add_value(workflow_totals) def workflow_runs(args, config: Config): @@ -265,12 +298,14 @@ def workflow_runs(args, config: Config): with Action(f"Getting workflow runs") as action: for run in repo.get_workflow_runs(**runs_args): run = extend_workflow_run(run) + writer = StreamingYAMLWriter(stream=sys.stdout, indent=0) workflow_run( args=args, config=config, repo=repo, workflow_run=run, server_prices=server_prices, + writer=writer, ) try: input("✋ Press any key to continue (Ctrl-D to abort)...") @@ -284,10 +319,11 @@ def workflow_job(args, config: Config): repo, server_prices = login_and_get_prices(args, config) repo = extend_repository(repo) + writer = StreamingYAMLWriter(stream=sys.stdout, indent=0) with Action(f"Getting workflow job id {args.id}") as action: workflow_job = repo.get_workflow_job(args.id) get_estimate_for_jobs( - action, [workflow_job], server_prices, args.ipv4_price, args.ipv6_price + writer, [workflow_job], server_prices, args.ipv4_price, args.ipv6_price ) diff --git a/testflows/github/hetzner/runners/scale_down.py b/testflows/github/hetzner/runners/scale_down.py index 81036b3..6249ec7 100644 --- a/testflows/github/hetzner/runners/scale_down.py +++ b/testflows/github/hetzner/runners/scale_down.py @@ -308,9 +308,9 @@ def scale_down( observed_interval=current_interval, ) powered_off_servers[server.name].server = server - powered_off_servers[ - server.name - ].observed_interval = current_interval + powered_off_servers[server.name].observed_interval = ( + current_interval + ) elif server.status == server.STATUS_RUNNING: if not any( @@ -332,9 +332,9 @@ def scale_down( observed_interval=current_interval, ) zombie_servers[server.name].server = server - zombie_servers[ - server.name - ].observed_interval = current_interval + zombie_servers[server.name].observed_interval = ( + current_interval + ) else: zombie_servers.pop(server.name, None) @@ -376,9 +376,9 @@ def scale_down( observed_interval=current_interval, ) unused_runners[runner.name].runner = runner - unused_runners[ - runner.name - ].observed_interval = current_interval + unused_runners[runner.name].observed_interval = ( + current_interval + ) with Action( "Checking for scale up failures", level=logging.DEBUG, interval=interval @@ -395,20 +395,20 @@ def scale_down( server_name=scaleup_failure.server_name, interval=interval, ): - scaleup_failures[ - scaleup_failure.server_name - ] = ScaleUpFailure( - time=scaleup_failure.time, - labels=scaleup_failure.labels, - server_name=scaleup_failure.server_name, - exception=scaleup_failure.exception, - count=1, - observed_interval=current_interval, + scaleup_failures[scaleup_failure.server_name] = ( + ScaleUpFailure( + time=scaleup_failure.time, + labels=scaleup_failure.labels, + server_name=scaleup_failure.server_name, + exception=scaleup_failure.exception, + count=1, + observed_interval=current_interval, + ) ) else: - scaleup_failures[ - scaleup_failure.server_name - ].exception = scaleup_failure.exception + scaleup_failures[scaleup_failure.server_name].exception = ( + scaleup_failure.exception + ) scaleup_failures[scaleup_failure.server_name].count += 1 scaleup_failures[ scaleup_failure.server_name diff --git a/testflows/github/hetzner/runners/streamingyaml.py b/testflows/github/hetzner/runners/streamingyaml.py new file mode 100644 index 0000000..d397cfd --- /dev/null +++ b/testflows/github/hetzner/runners/streamingyaml.py @@ -0,0 +1,83 @@ +# Copyright 2023 Katteli Inc. +# TestFlows.com Open-Source Software Testing Framework (http://testflows.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import yaml +import textwrap + + +def float_to_str(f, precision): + """ + Convert the given float to a string, + without resorting to scientific notation + """ + return format(f, f".{precision}f") + + +class Dumper(yaml.SafeDumper): + """Dumper with custom float precision.""" + + def represent_float(self, data, precision=6): + if data != data or (data == 0.0 and data == 1.0): + value = ".nan" + elif data == self.inf_value: + value = ".inf" + elif data == -self.inf_value: + value = "-.inf" + else: + value = float_to_str(data, precision=precision).lower() + return self.represent_scalar("tag:yaml.org,2002:float", value) + + +Dumper.add_representer(float, Dumper.represent_float) + + +class StreamingYAMLWriter: + """Streaming YAML writer.""" + + def __init__(self, stream, indent=0): + self.stream = stream + self.indent = indent + + def _write(self, value): + """Dump value to stream.""" + s = yaml.dump( + value, sort_keys=False, indent=2, Dumper=Dumper, allow_unicode=True + ) + s = textwrap.indent(s, prefix=" " * self.indent) + self.stream.write(s) + self.stream.flush() + + def add_value(self, value): + """Add '{value}\n'.""" + self._write(value) + return self + + def add_key_value(self, key, value): + """Add '{key}: {value}\n'.""" + self._write({key: value}) + return self + + def add_list_element(self, value): + """Add '- {value}\n'.""" + self._write([value]) + return self, StreamingYAMLWriter(self.stream, indent=self.indent + 2) + + def add_key(self, key): + """Add key '{key}:\n'.""" + s = yaml.dump({key: None}, sort_keys=False, Dumper=Dumper, allow_unicode=True) + s = s.rsplit(": null\n", 1)[0] + ":\n" + s = textwrap.indent(s, prefix=" " * self.indent) + self.stream.write(s) + self.stream.flush() + return StreamingYAMLWriter(stream=self.stream, indent=self.indent + 2)