Skip to content

Commit

Permalink
Updating estimate to output data in YAML format.
Browse files Browse the repository at this point in the history
  • Loading branch information
Vitaliy Zakaznikov committed Mar 16, 2024
1 parent f36538e commit 2373a11
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 39 deletions.
72 changes: 54 additions & 18 deletions testflows/github/hetzner/runners/estimate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import math
import yaml
import types
import github

from .actions import Action
from .config import Config
from .config import check_prices
from .scale_up import get_runner_server_type_and_location
from .streamingyaml import StreamingYAMLWriter

from datetime import timedelta
from hcloud import Client
Expand Down Expand Up @@ -143,7 +146,12 @@ def login_and_get_prices(


def get_estimate_for_jobs(
action, jobs, server_prices, ipv4_price, ipv6_price, indent=" "
writer: StreamingYAMLWriter,
jobs: list[WorkflowJob],
server_prices: dict[str, dict[str, float]],
ipv4_price: float,
ipv6_price: float,
indent: int = 2,
):
"""Collect estimate for the given jobs."""

Expand All @@ -156,14 +164,27 @@ def get_estimate_for_jobs(
duration = job.completed_at - job.started_at
runner_name = job.raw_data["runner_name"]

action.note(
f"{indent}Job {job.id} {job.name} {job.status} {duration} {runner_name}"
)
job_entry = {
"name": job.name,
"id": job.id,
"status": job.status,
"duration": str(duration),
"url": job.url,
"run_id": job.run_id,
"run_url": job.run_url,
"runner_id": job.raw_data["runner_id"],
"runner_name": runner_name,
"runner_group_id": job.raw_data["runner_group_id"],
"runner_group_name": job.raw_data["runner_group_name"],
"workflow_name": job.raw_data["workflow_name"],
}

price = get_runner_server_price_per_second(
server_prices, runner_name, ipv4_price, ipv6_price
)

job_entry["estimate"] = {"worst": None, "best": None}

if price is not None:
job_best_estimate = duration.total_seconds() * price
job_worst_estimate = (
Expand All @@ -173,16 +194,19 @@ def get_estimate_for_jobs(
best_estimate += job_best_estimate
worst_estimate += job_worst_estimate

action.note(f"{indent}{job_worst_estimate:.6f}-€{job_best_estimate:.6f}")
job_entry["estimate"] = {
"worst": job_worst_estimate,
"best": job_best_estimate,
}

else:
unknown_jobs += 1
unknown_duration += duration

action.note(f"{indent} unknown price")

total_duration += duration

writer.add_list_element(value=job_entry)

return (
i,
total_duration,
Expand All @@ -199,9 +223,12 @@ def workflow_run(
repo: Repository = None,
workflow_run: WorkflowRun = None,
server_prices=None,
writer=None,
):
"""Estimate cost for a given workflow run."""
run_attempt = None
if writer is None:
writer = StreamingYAMLWriter(stream=sys.stdout, indent=0)

if workflow_run is None:
run_id = args.id
Expand All @@ -216,15 +243,18 @@ def workflow_run(
workflow_run: WorkflowRun = repo.get_workflow_run(run_id)
workflow_run = extend_workflow_run(workflow_run)

action.note(f"Workflow name: {workflow_run.name}")
workflow_entry = {"name": workflow_run.name, "id": workflow_run.id}

if run_attempt is None:
action.note(f"Run attempt: {workflow_run.run_attempt}")
workflow_entry["attempt"] = workflow_run.run_attempt
jobs = workflow_run.jobs()
else:
action.note(f"Run attempt: {run_attempt}")
workflow_entry["attempt"] = run_attempt
jobs = workflow_run.attempt_jobs(run_attempt)

_, list_value_writer = writer.add_list_element(workflow_entry)
jobs_writer = list_value_writer.add_key("jobs")

(
count,
total_duration,
Expand All @@ -233,15 +263,18 @@ def workflow_run(
worst_estimate,
best_estimate,
) = get_estimate_for_jobs(
action, jobs, server_prices, args.ipv4_price, args.ipv6_price
jobs_writer, jobs, server_prices, args.ipv4_price, args.ipv6_price, indent=2
)

action.note(f"Total jobs: {count}")
action.note(f"Total duration: {total_duration}")
action.note(f"Unknown jobs: {unknown_jobs}")
action.note(f"Unknown duration: {unknown_duration}")
action.note(f"Worst estimate: €{worst_estimate:.6f}")
action.note(f"Best estimate: €{best_estimate:.6f}")
workflow_totals = {}
workflow_totals["total_jobs"] = count
workflow_totals["total_duration"] = str(total_duration)
workflow_totals["unknown_jobs"] = unknown_jobs
workflow_totals["unknown_duration"] = str(unknown_duration)
workflow_totals["worst_estimate"] = worst_estimate
workflow_totals["best_estimate"] = best_estimate

list_value_writer.add_value(workflow_totals)


def workflow_runs(args, config: Config):
Expand All @@ -265,12 +298,14 @@ def workflow_runs(args, config: Config):
with Action(f"Getting workflow runs") as action:
for run in repo.get_workflow_runs(**runs_args):
run = extend_workflow_run(run)
writer = StreamingYAMLWriter(stream=sys.stdout, indent=0)
workflow_run(
args=args,
config=config,
repo=repo,
workflow_run=run,
server_prices=server_prices,
writer=writer,
)
try:
input("✋ Press any key to continue (Ctrl-D to abort)...")
Expand All @@ -284,10 +319,11 @@ def workflow_job(args, config: Config):

repo, server_prices = login_and_get_prices(args, config)
repo = extend_repository(repo)
writer = StreamingYAMLWriter(stream=sys.stdout, indent=0)

with Action(f"Getting workflow job id {args.id}") as action:
workflow_job = repo.get_workflow_job(args.id)

get_estimate_for_jobs(
action, [workflow_job], server_prices, args.ipv4_price, args.ipv6_price
writer, [workflow_job], server_prices, args.ipv4_price, args.ipv6_price
)
42 changes: 21 additions & 21 deletions testflows/github/hetzner/runners/scale_down.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ def scale_down(
observed_interval=current_interval,
)
powered_off_servers[server.name].server = server
powered_off_servers[
server.name
].observed_interval = current_interval
powered_off_servers[server.name].observed_interval = (
current_interval
)

elif server.status == server.STATUS_RUNNING:
if not any(
Expand All @@ -332,9 +332,9 @@ def scale_down(
observed_interval=current_interval,
)
zombie_servers[server.name].server = server
zombie_servers[
server.name
].observed_interval = current_interval
zombie_servers[server.name].observed_interval = (
current_interval
)

else:
zombie_servers.pop(server.name, None)
Expand Down Expand Up @@ -376,9 +376,9 @@ def scale_down(
observed_interval=current_interval,
)
unused_runners[runner.name].runner = runner
unused_runners[
runner.name
].observed_interval = current_interval
unused_runners[runner.name].observed_interval = (
current_interval
)

with Action(
"Checking for scale up failures", level=logging.DEBUG, interval=interval
Expand All @@ -395,20 +395,20 @@ def scale_down(
server_name=scaleup_failure.server_name,
interval=interval,
):
scaleup_failures[
scaleup_failure.server_name
] = ScaleUpFailure(
time=scaleup_failure.time,
labels=scaleup_failure.labels,
server_name=scaleup_failure.server_name,
exception=scaleup_failure.exception,
count=1,
observed_interval=current_interval,
scaleup_failures[scaleup_failure.server_name] = (
ScaleUpFailure(
time=scaleup_failure.time,
labels=scaleup_failure.labels,
server_name=scaleup_failure.server_name,
exception=scaleup_failure.exception,
count=1,
observed_interval=current_interval,
)
)
else:
scaleup_failures[
scaleup_failure.server_name
].exception = scaleup_failure.exception
scaleup_failures[scaleup_failure.server_name].exception = (
scaleup_failure.exception
)
scaleup_failures[scaleup_failure.server_name].count += 1
scaleup_failures[
scaleup_failure.server_name
Expand Down
83 changes: 83 additions & 0 deletions testflows/github/hetzner/runners/streamingyaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2023 Katteli Inc.
# TestFlows.com Open-Source Software Testing Framework (http://testflows.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import textwrap


def float_to_str(f, precision):
"""
Convert the given float to a string,
without resorting to scientific notation
"""
return format(f, f".{precision}f")


class Dumper(yaml.SafeDumper):
"""Dumper with custom float precision."""

def represent_float(self, data, precision=6):
if data != data or (data == 0.0 and data == 1.0):
value = ".nan"
elif data == self.inf_value:
value = ".inf"
elif data == -self.inf_value:
value = "-.inf"
else:
value = float_to_str(data, precision=precision).lower()
return self.represent_scalar("tag:yaml.org,2002:float", value)


Dumper.add_representer(float, Dumper.represent_float)


class StreamingYAMLWriter:
"""Streaming YAML writer."""

def __init__(self, stream, indent=0):
self.stream = stream
self.indent = indent

def _write(self, value):
"""Dump value to stream."""
s = yaml.dump(
value, sort_keys=False, indent=2, Dumper=Dumper, allow_unicode=True
)
s = textwrap.indent(s, prefix=" " * self.indent)
self.stream.write(s)
self.stream.flush()

def add_value(self, value):
"""Add '{value}\n'."""
self._write(value)
return self

def add_key_value(self, key, value):
"""Add '{key}: {value}\n'."""
self._write({key: value})
return self

def add_list_element(self, value):
"""Add '- {value}\n'."""
self._write([value])
return self, StreamingYAMLWriter(self.stream, indent=self.indent + 2)

def add_key(self, key):
"""Add key '{key}:\n'."""
s = yaml.dump({key: None}, sort_keys=False, Dumper=Dumper, allow_unicode=True)
s = s.rsplit(": null\n", 1)[0] + ":\n"
s = textwrap.indent(s, prefix=" " * self.indent)
self.stream.write(s)
self.stream.flush()
return StreamingYAMLWriter(stream=self.stream, indent=self.indent + 2)

0 comments on commit 2373a11

Please sign in to comment.