diff --git a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py index 59a604f1a7..d491e26e00 100644 --- a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py +++ b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py @@ -22,7 +22,7 @@ import dlio_workload sys.path.append("../") -from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed +from utils.utils import get_memory, get_cpu, unix_to_timestamp, standard_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS _LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs" @@ -36,6 +36,8 @@ "train_throughput_samples_per_second": 0, "train_throughput_mb_per_second": 0, "throughput_over_local_ssd": 0, + "start_epoch": "", + "end_epoch": "", "start": "", "end": "", "highest_memory": 0, @@ -161,24 +163,51 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict: * int(output[key]["mean_file_size"]) / (1024**2) ) + r["start_epoch"] = timestamp_to_epoch( + per_epoch_stats_data[str(i + 1)]["start"] + ) + r["end_epoch"] = timestamp_to_epoch( + per_epoch_stats_data[str(i + 1)]["end"] + ) r["start"] = standard_timestamp( per_epoch_stats_data[str(i + 1)]["start"] ) r["end"] = standard_timestamp(per_epoch_stats_data[str(i + 1)]["end"]) - if r["scenario"] != "local-ssd" and mash_installed: - r["lowest_memory"], r["highest_memory"] = get_memory( - r["pod_name"], - r["start"], - r["end"], - project_number=args.project_number, - ) - r["lowest_cpu"], r["highest_cpu"] = get_cpu( - r["pod_name"], - r["start"], - r["end"], - project_number=args.project_number, - ) - pass + + if r["scenario"] != "local-ssd": + if mash_installed: + r["lowest_memory"], r["highest_memory"] = get_memory( + r["pod_name"], + r["start"], + r["end"], + project_number=args.project_number, + ) + r["lowest_cpu"], r["highest_cpu"] = get_cpu( + r["pod_name"], + r["start"], + r["end"], + project_number=args.project_number, + ) + else: + r["lowest_memory"], r["highest_memory"] = ( + get_memory_from_monitoring_api( + pod_name=r["pod_name"], + start_epoch=r["start_epoch"], + end_epoch=r["end_epoch"], + project_id=args.project_id, + cluster_name=args.cluster_name, + namespace_name=args.namespace_name, + ) + ) + r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api( + pod_name=r["pod_name"], + start_epoch=r["start_epoch"], + end_epoch=r["end_epoch"], + project_id=args.project_id, + cluster_name=args.cluster_name, + namespace_name=args.namespace_name, + ) + pass r["gcsfuse_mount_options"] = gcsfuse_mount_options diff --git a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py index a1aaeafbb3..8073f77de0 100644 --- a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py +++ b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py @@ -22,7 +22,7 @@ import fio_workload sys.path.append("../") -from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed +from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS _LOCAL_LOGS_LOCATION = "../../bin/fio-logs" @@ -35,6 +35,8 @@ "IOPS": 0, "throughput_mb_per_second": 0, "throughput_over_local_ssd": 0, + "start_epoch": "", + "end_epoch": "", "start": "", "end": "", "highest_memory": 0, @@ -203,24 +205,48 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict: r["throughput_mb_per_second"] = int( per_epoch_output_data["jobs"][0]["read"]["bw_bytes"] / (1024**2) ) + r["start_epoch"] = per_epoch_output_data["jobs"][0]["job_start"] // 1000 + r["end_epoch"] = per_epoch_output_data["timestamp_ms"] // 1000 r["start"] = unix_to_timestamp( per_epoch_output_data["jobs"][0]["job_start"] ) r["end"] = unix_to_timestamp(per_epoch_output_data["timestamp_ms"]) - if r["scenario"] != "local-ssd" and mash_installed: - r["lowest_memory"], r["highest_memory"] = get_memory( - r["pod_name"], - r["start"], - r["end"], - project_number=args.project_number, - ) - r["lowest_cpu"], r["highest_cpu"] = get_cpu( - r["pod_name"], - r["start"], - r["end"], - project_number=args.project_number, - ) + + if r["scenario"] != "local-ssd": + if mash_installed: + r["lowest_memory"], r["highest_memory"] = get_memory( + r["pod_name"], + r["start"], + r["end"], + project_number=args.project_number, + ) + r["lowest_cpu"], r["highest_cpu"] = get_cpu( + r["pod_name"], + r["start"], + r["end"], + project_number=args.project_number, + ) + else: + r["lowest_memory"], r["highest_memory"] = ( + get_memory_from_monitoring_api( + pod_name=r["pod_name"], + start_epoch=r["start_epoch"], + end_epoch=r["end_epoch"], + project_id=args.project_id, + cluster_name=args.cluster_name, + namespace_name=args.namespace_name, + ) + ) + r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api( + pod_name=r["pod_name"], + start_epoch=r["start_epoch"], + end_epoch=r["end_epoch"], + project_id=args.project_id, + cluster_name=args.cluster_name, + namespace_name=args.namespace_name, + ) pass + r["gcsfuse_mount_options"] = gcsfuse_mount_options r["blockSize"] = bs r["filesPerThread"] = nrfiles diff --git a/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh b/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh index 8fed0c3600..af07c24651 100755 --- a/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh +++ b/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh @@ -241,7 +241,16 @@ function installDependencies() { sudo apt install docker-ce -y fi # Ensure that mash is installed. - which mash || (sudo apt-get install -y monarch-tools) + if ! which mash ; then + if ! sudo apt-get install -y monarch-tools; then + # Ensure that gcloud monitoring tools are installed. This is alternative to + # mash on gce vm. + # pip install --upgrade google-cloud-storage + # pip install --ignore-installed --upgrade google-api-python-client + # pip install --ignore-installed --upgrade google-cloud + pip install --upgrade google-cloud-monitoring + fi + fi } # Make sure you have access to the necessary GCP resources. The easiest way to enable it is to use @google.com as active auth. @@ -529,14 +538,14 @@ function waitTillAllPodsComplete() { function fetchAndParseFioOutputs() { echo "Fetching and parsing fio outputs ..." cd "${gke_testing_dir}"/examples/fio - python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/fio/output.csv + python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/fio/output.csv --project-id=${project_id} --cluster-name=${cluster_name} --namespace-name=${appnamespace} cd - } function fetchAndParseDlioOutputs() { echo "Fetching and parsing dlio outputs ..." cd "${gke_testing_dir}"/examples/dlio - python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/dlio/output.csv + python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/dlio/output.csv --project-id=${project_id} --cluster-name=${cluster_name} --namespace-name=${appnamespace} cd - } diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py b/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py index 9baf5c4c2e..91f19a745c 100644 --- a/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py +++ b/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py @@ -74,8 +74,18 @@ def parseLogParserArguments() -> object: ), required=True, ) + parser.add_argument( + "--project-id", + metavar="GCP Project ID/name", + help=( + "project-id (e.g. gcs-fuse-test) is needed to fetch the cpu/memory" + " utilization data from GCP." + ), + required=True, + ) parser.add_argument( "--project-number", + metavar="GCP Project Number", help=( "project-number (e.g. 93817472919) is needed to fetch the cpu/memory" " utilization data from GCP." @@ -87,6 +97,16 @@ def parseLogParserArguments() -> object: help="unique string ID for current test-run", required=True, ) + parser.add_argument( + "--cluster-name", + help="Name of GKE cluster where the current test was run", + required=True, + ) + parser.add_argument( + "--namespace-name", + help="kubernestes namespace used for the current test-run", + required=True, + ) parser.add_argument( "-o", "--output-file", diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py b/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py index a7a2a41829..0bb5fccb2b 100644 --- a/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py +++ b/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py @@ -16,7 +16,10 @@ # limitations under the License. import datetime, subprocess +import math +import time from typing import Tuple +from google.cloud import monitoring_v3 def is_mash_installed() -> bool: @@ -138,10 +141,20 @@ def unix_to_timestamp(unix_timestamp: int) -> str: return utc_timestamp_string -def standard_timestamp(timestamp: int) -> str: +def standard_timestamp(timestamp: str) -> str: return timestamp.split(".")[0].replace("T", " ") + " UTC" +def timestamp_to_epoch(timestamp: str) -> int: + return int( + time.mktime( + time.strptime( + timestamp.split(".")[0].replace("T", " "), "%Y-%m-%d %H:%M:%S" + ) + ) + ) + + class UnknownMachineTypeError(Exception): """Defines custom exception for unknown machine-type scenario. @@ -167,3 +180,183 @@ def resource_limits(nodeType: str) -> Tuple[dict, dict]: " resource-limits for it.", nodeType, ) + + +def isRelevantMonitoringResult( + result, + cluster_name: str, + pod_name: str, + # container_name: str, + namespace_name: str, +) -> bool: + return ( + True + if ( + hasattr(result, "resource") + and hasattr(result.resource, "type") + and result.resource.type == "k8s_container" + and hasattr(result.resource, "labels") + # and "cluster_name" in result.resource.labels + # and result.resource.labels["cluster_name"] == cluster_name + # and "pod_name" in result.resource.labels + # and result.resource.labels["pod_name"] == pod_name + # and "container_name" in result.resource.labels + # and result.resource.labels["container_name"] == container_name + # and "namespace_name" in result.resource.labels + # and result.resource.labels["namespace_name"] == namespace_name + and hasattr(result, "points") + ) + else False + ) + + +def get_memory_from_monitoring_api( + project_id: str, + cluster_name: str, + pod_name: str, + # container_name: str, + namespace_name: str, + start_epoch: int, + end_epoch: int, +) -> Tuple[int, int]: + """Returns min,max memory usage of the given gke-cluster/namespace/pod/container/start/end scenario in MiB .""" + client = monitoring_v3.MetricServiceClient() + project_name = f"projects/{project_id}" + + interval = monitoring_v3.TimeInterval({ + "start_time": {"seconds": start_epoch, "nanos": 0}, + "end_time": {"seconds": end_epoch, "nanos": 0}, + }) + aggregation = monitoring_v3.Aggregation({ + "alignment_period": {"seconds": 60}, # 1 minute + "per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_MAX, + }) + + results = client.list_time_series( + request={ + "name": project_name, + "filter": ( + 'metric.type = "kubernetes.io/container/memory/used_bytes"' + # ' AND metric.memory_type = "non-evictable"' # for some reason, + # this throws error, so commented it out. + f" AND resource.labels.cluster_name = {cluster_name}" + f" AND resource.labels.pod_name = {pod_name}" + # f" AND resource.labels.container_name = {container_name}" + f" AND resource.labels.namespace_name = {namespace_name}" + ), + "interval": interval, + "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + "aggregation": aggregation, + } + ) + + relevant_results = [ + result + for result in results + if isRelevantMonitoringResult( + result, + cluster_name, + pod_name, + # container_name, + namespace_name, + ) + ] + return round( + min( + min( + (point.value.int64_value if point.value.int64_value >= 0 else 0) + for point in result.points + ) + for result in relevant_results + ) + / 2**20, # convert to MiB/s + 0, # round to integer. + ), round( + max( + max( + (point.value.int64_value if point.value.int64_value > 0 else 0) + for point in result.points + ) + for result in relevant_results + ) + / 2**20, # convert to MiB/s + 0, # round to integer. + ) + + +def get_cpu_from_monitoring_api( + project_id: str, + cluster_name: str, + pod_name: str, + # container_name: str, + namespace_name: str, + start_epoch: int, + end_epoch: int, +) -> Tuple[float, float]: + """Returns min,max cpu usage of the given gke-cluster/namespace/pod/container/start/end scenario.""" + client = monitoring_v3.MetricServiceClient() + project_name = f"projects/{project_id}" + + interval = monitoring_v3.TimeInterval({ + "start_time": {"seconds": start_epoch, "nanos": 0}, + "end_time": {"seconds": end_epoch, "nanos": 0}, + }) + aggregation = monitoring_v3.Aggregation({ + "alignment_period": {"seconds": 60}, # 1 minute + "per_series_aligner": monitoring_v3.Aggregation.Aligner.ALIGN_RATE, + }) + + results = client.list_time_series( + request={ + "name": project_name, + "filter": ( + 'metric.type = "kubernetes.io/container/cpu/core_usage_time"' + f" AND resource.labels.cluster_name = {cluster_name}" + f" AND resource.labels.pod_name = {pod_name}" + # f" AND resource.labels.container_name = {container_name}" + f" AND resource.labels.namespace_name = {namespace_name}" + ), + "interval": interval, + "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + "aggregation": aggregation, + } + ) + + relevant_results = [ + result + for result in results + if isRelevantMonitoringResult( + result, + cluster_name, + pod_name, + # container_name, + namespace_name, + ) + ] + return round( + min( + min( + ( + point.value.double_value + if point.value.double_value != math.nan + else 0 + ) + for point in result.points + ) + for result in relevant_results + ), + 5, # round up to 5 decimal places. + ), round( + max( + max( + ( + point.value.double_value + if point.value.double_value != math.nan + else 0 + ) + for point in result.points + ) + for result in relevant_results + ), + 5, # round up to 5 decimal places. + ) diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py b/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py new file mode 100644 index 0000000000..df91d0257f --- /dev/null +++ b/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py @@ -0,0 +1,54 @@ +"""This file defines unit tests for functionalities in utils.py""" + +import unittest +import utils +from utils import get_cpu_from_monitoring_api, get_memory_from_monitoring_api, timestamp_to_epoch + + +class UtilsTest(unittest.TestCase): + + @classmethod + def setUpClass(self): + self.project_id = "gcs-fuse-test" + self.cluster_name = "gargnitin-dryrun-us-west1-6" + self.pod_name = "fio-tester-gcsfuse-rr-64k-1670041227260535313" + # self.container_name = "fio-tester" + self.namespace_name = "default" + self.start_epoch = 1724233283 + self.end_epoch = 1724233442 + + def test_get_memory_from_monitoring_api(self): + print( + get_memory_from_monitoring_api( + self.project_id, + self.cluster_name, + self.pod_name, + # self.container_name, + self.namespace_name, + self.start_epoch, + self.end_epoch, + ) + ) + + def test_get_cpu_from_monitoring_api(self): + print( + get_cpu_from_monitoring_api( + self.project_id, + self.cluster_name, + self.pod_name, + # self.container_name, + self.namespace_name, + self.start_epoch, + self.end_epoch, + ) + ) + + def test_timestamp_to_epoch(self): + timestamp = "2024-08-21T19:20:25" + expected_epoch = 1724268025 + self.assertEqual(timestamp_to_epoch(timestamp), expected_epoch) + pass + + +if __name__ == "__main__": + unittest.main()