Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion mfd_host/feature/cpu/esxi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@

import logging
import re
from time import sleep

from mfd_common_libs import add_logging_level, log_levels, TimeoutCounter
from mfd_connect.process.rpyc import RPyCProcess

from mfd_common_libs import add_logging_level, log_levels
from mfd_host.exceptions import CPUFeatureExecutionError, CPUFeatureException
from mfd_host.feature.cpu.base import BaseFeatureCPU
from mfd_network_adapter.data_structures import State

logger = logging.getLogger(__name__)
add_logging_level("MODULE_DEBUG", log_levels.MODULE_DEBUG)

ESXTOP_FILE_LOCATION = "/tmp/esxtop_cpu.log"

class ESXiCPU(BaseFeatureCPU):
"""ESXi class for CPU feature."""
Expand Down Expand Up @@ -62,3 +66,75 @@ def set_numa_affinity(self, numa_state: State) -> None:
f'esxcli system settings advanced set {value} -o "/Numa/LocalityWeightActionAffinity"',
custom_exception=CPUFeatureExecutionError,
)

def start_cpu_measurement(self) -> RPyCProcess:
"""
Start CPU measurement on SUT host.

:return: Handle to process
"""
logger.debug(msg="Start CPU measurement on SUT Host")
return self._connection.start_process(
command="esxtop -b -n 8 -d3", log_file=True, output_file=ESXTOP_FILE_LOCATION, shell=True
)

def stop_cpu_measurement(self, process: RPyCProcess, vm_name: str) -> int:
"""
Stop CPU measurement process.

:param process: Process handle
:param vm_name: VM name to filter CPU usage
:return: Average CPU usage percentage
"""
logger.debug(msg="Stop CPU measurement on SUT Host")
if process.running:
process.stop()
timeout = TimeoutCounter(5)
while not timeout:
if not process.running:
break
sleep(1)
else:
process.kill()
timeout = TimeoutCounter(5)
while not timeout:
if not process.running:
break
sleep(1)
if process.running:
raise RuntimeError("CPU measurement process is still running after stop and kill.")

return self.parse_cpu_usage(vm_name=vm_name, process=process)

def parse_cpu_usage(self, vm_name: str, process: RPyCProcess) -> int:
"""
Parse CPU usage from esxtop output.

:param vm_name: VM name to filter CPU usage
:param process: Process handle
:return: average CPU usage percentage
"""
parsed_file_path = "/tmp/parsed_output.txt"
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded path '/tmp/parsed_output.txt' creates a security risk as it could be exploited through race conditions or predictable file paths. Consider using tempfile.NamedTemporaryFile() or generating unique filenames with timestamps or UUIDs to prevent potential file overwrites or unauthorized access.

Suggested change
parsed_file_path = "/tmp/parsed_output.txt"
# Use remote tempfile to generate a unique temporary file path
remote_tempfile = self._connection.modules().tempfile
temp_file = remote_tempfile.NamedTemporaryFile(delete=False)
parsed_file_path = temp_file.name
temp_file.close()

Copilot uses AI. Check for mistakes.
command = (
f"cut -d, -f`awk -F, '{{for (i=1;i<=NF;i++){{if ($i ~/Group Cpu.*{vm_name}).*Used/) "
f"{{print i}}}}}}' {process.log_path}` {process.log_path}>{parsed_file_path}"
)
Comment on lines +118 to +121
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The vm_name parameter is directly interpolated into a shell command without sanitization, creating a command injection vulnerability. Validate or escape vm_name before using it in the command, or use a safer approach to construct the command.

Copilot uses AI. Check for mistakes.
self._connection.execute_command(command=command, shell=True)
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using shell=True with command string interpolation can create command injection vulnerabilities. The vm_name parameter is directly interpolated into the command without sanitization. Consider validating/sanitizing vm_name or using a safer approach that doesn't rely on shell=True with user-provided input.

Copilot uses AI. Check for mistakes.
p = self._connection.path(process.log_path)
p.unlink()
Comment on lines +122 to +124
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original log file is deleted before attempting to read the parsed output. If the execute_command fails or the parsed file is not created, there's no way to debug or recover. Consider deleting the original log file after successfully reading the parsed output, or handle the case where the parsed file might not exist.

Copilot uses AI. Check for mistakes.
try:
with self._connection.modules().builtins.open(parsed_file_path, "r") as f:
file_content = f.read()
cpu_list = []
for line in file_content.splitlines()[1:]:
try:
cpu_list.append(float(line.strip('"')))
except ValueError:
continue

except Exception as e:
raise RuntimeError(f"Failed to read parsed CPU usage output file due to - {e}.")

p = self._connection.path(parsed_file_path)
p.unlink()
return round(sum(cpu_list) / len(cpu_list)) if cpu_list else 0
92 changes: 92 additions & 0 deletions tests/unit/test_mfd_host/test_feature/test_cpu/test_esxi.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from mfd_network_adapter.data_structures import State
from mfd_typing import OSName

from mfd_host.feature.cpu import ESXiCPU
from mfd_host.feature.cpu.esxi import ESXTOP_FILE_LOCATION


class TestESXiCPU:
cpu_attribute_output = dedent(
Expand Down Expand Up @@ -78,3 +81,92 @@ def test_set_numa_affinity_disabled(self, host, mocker):
cmd = 'esxcli system settings advanced set -i 0 -o "/Numa/LocalityWeightActionAffinity"'
assert host.cpu.set_numa_affinity(State.DISABLED) is None
host.connection.execute_command.assert_called_once_with(cmd, custom_exception=CPUFeatureExecutionError)

def test_start_cpu_measurement_returns_process_handle(self, host, mocker):
mock_connection = mocker.Mock()
mock_process = mocker.Mock()
mock_connection.start_process.return_value = mock_process
host.cpu._connection = mock_connection

result = host.cpu.start_cpu_measurement()

assert result == mock_process
mock_connection.start_process.assert_called_once_with(
command="esxtop -b -n 8 -d3", log_file=True, output_file=ESXTOP_FILE_LOCATION, shell=True
)

def test_logs_debug_message_when_starting_cpu_measurement(self, host, mocker):
mock_logger = mocker.patch("mfd_host.feature.cpu.esxi.logger")

host.cpu.start_cpu_measurement()

mock_logger.debug.assert_called_once_with(
msg="Start CPU measurement on SUT Host"
)

def test_parses_average_cpu_usage(self, host, mocker):
mock_connection = mocker.MagicMock()
mock_process = mocker.MagicMock()
mock_process.log_path = "/tmp/esxtop.log"
mock_connection.modules().builtins.open.return_value.read.return_value = (
'Header\n"10"\n"20"\n"30"\n"40"\n"50"\n'
)
mock_connection.modules().builtins.open.return_value.__enter__.return_value = (
mock_connection.modules().builtins.open.return_value
)
host.cpu._connection = mock_connection

result = host.cpu.parse_cpu_usage("vm1", mock_process)
assert result == 30 # (10+20+30+40+50)//5

def test_returns_zero_when_no_cpu_samples_found(self, host, mocker):
mock_connection = mocker.MagicMock()
mock_process = mocker.MagicMock()
mock_process.log_path = "/tmp/esxtop.log"
mock_connection.modules().builtins.open.return_value.read.return_value = (
"Header\n"
)
mock_connection.modules().builtins.open.return_value.__enter__.return_value = (
mock_connection.modules().builtins.open.return_value
)
host.cpu._connection = mock_connection

result = host.cpu.parse_cpu_usage("vm1", mock_process)
assert result == 0

def test_skips_lines_that_cannot_be_converted_to_float(self, host, mocker):
mock_connection = mocker.MagicMock()
mock_process = mocker.MagicMock()
mock_process.log_path = "/tmp/esxtop.log"
mock_connection.modules().builtins.open.return_value.read.return_value = (
'Header\n"10"\n"invalid"\n"30"\n"40"\n"50"\n'
)
mock_connection.modules().builtins.open.return_value.__enter__.return_value = (
mock_connection.modules().builtins.open.return_value
)
host.cpu._connection = mock_connection

result = host.cpu.parse_cpu_usage("vm1", mock_process)
assert result == 32 # (10+30+40+50)//4

def test_raises_runtime_error_when_file_read_fails(self, host, mocker):
mock_connection = mocker.MagicMock()
mock_process = mocker.MagicMock()
mock_process.log_path = "/tmp/esxtop.log"
mock_connection.modules().builtins.open.side_effect = Exception("File error")
host.cpu._connection = mock_connection

with pytest.raises(
RuntimeError,
match="Failed to read parsed CPU usage output file due to - File error.",
):
host.cpu.parse_cpu_usage("vm1", mock_process)

def test_parses_cpu_usage_when_process_not_running(self, host, mocker):
mock_process = mocker.Mock()
mock_process.running = False
mock_parse = mocker.patch.object(ESXiCPU, "parse_cpu_usage", return_value=7)
host.cpu._connection = mocker.Mock()
result = host.cpu.stop_cpu_measurement(mock_process, "vm4")
assert result == 7
mock_parse.assert_called_once_with(vm_name="vm4", process=mock_process)