Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Run bsim tests with twister #85610

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion scripts/pylib/twister/twisterlib/config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TwisterConfigParser:
"extra_dtc_overlay_files": {"type": "list", "default": []},
"required_snippets": {"type": "list"},
"build_only": {"type": "bool", "default": False},
"no_build": {"type": "bool", "default": False},
"build_on_all": {"type": "bool", "default": False},
"skip": {"type": "bool", "default": False},
"slow": {"type": "bool", "default": False},
Expand Down Expand Up @@ -176,7 +177,7 @@ def get_scenario(self, name):
if k == "filter":
d[k] = f"({d[k]}) and ({v})"
elif k not in ("extra_conf_files", "extra_overlay_confs",
"extra_dtc_overlay_files"):
"extra_dtc_overlay_files", "harness_config"):
if isinstance(d[k], str) and isinstance(v, list):
d[k] = [d[k]] + v
elif isinstance(d[k], list) and isinstance(v, str):
Expand All @@ -198,6 +199,25 @@ def get_scenario(self, name):
else:
d[k] = v

harness_config = copy.deepcopy(self.common.get("harness_config", {}))
if "harness_config" in self.scenarios[name]:
if harness_config:
for k, v in self.scenarios[name]["harness_config"].items():
if k in harness_config:
if isinstance(harness_config[k], list):
if d["harness"] == "bsim":
harness_config[k] += v if isinstance(v, list) else [v]
else:
harness_config[k] = v if isinstance(v, list) else [v]
else:
harness_config[k] = v
else:
harness_config[k] = v
else:
harness_config = self.scenarios[name]["harness_config"]

d["harness_config"] = harness_config

# Compile conf files in to a single list. The order to apply them is:
# (1) CONF_FILEs extracted from common['extra_args']
# (2) common['extra_conf_files']
Expand Down
227 changes: 210 additions & 17 deletions scripts/pylib/twister/twisterlib/harness.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import glob
import json
import logging
import os
import platform
import random
import re
import shlex
import shutil
Expand Down Expand Up @@ -41,6 +44,8 @@ class Harness:
RUN_FAILED = "PROJECT EXECUTION FAILED"
run_id_pattern = r"RunID: (?P<run_id>.*)"

cacheable = False

def __init__(self):
self._status = TwisterStatus.NONE
self.reason = None
Expand Down Expand Up @@ -964,12 +969,72 @@ class Ztest(Test):

class Bsim(Harness):

DEFAULT_VERBOSITY = 2
DEFAULT_SIM_LENGTH = 60e6

BSIM_READY_TIMEOUT_S = 60

cacheable = True

def __init__(self):
super().__init__()
self._bsim_out_path = os.getenv('BSIM_OUT_PATH', '')
if self._bsim_out_path:
self._bsim_out_path = os.path.join(self._bsim_out_path, 'bin')
self._exe_paths = []
self._tc_output = []
self._start_time = 0

def _set_start_time(self):
self._start_time = time.time()

def _get_exe_path(self, index):
return self._exe_paths[index if index < len(self._exe_paths) else 0]

def configure(self, instance):
def replacer(exe_name):
return exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')

super().configure(instance)

if not self._bsim_out_path:
raise Exception('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')

exe_names = []
for exe_name in self.instance.testsuite.harness_config.get('bsim_exe_name', []):
new_exe_name = f'bs_{self.instance.platform.name}_{exe_name}'
exe_names.append(replacer(new_exe_name))

if not exe_names:
exe_names = [f'bs_{replacer(self.instance.name)}']

self._exe_paths = \
[os.path.join(self._bsim_out_path, exe_name) for exe_name in exe_names]

def clean_exes(self):
self._set_start_time()

try:
for exe_path in [self._get_exe_path(i) for i in range(len(self._exe_paths))]:
if os.path.exists(exe_path):
os.remove(exe_path)
except Exception as e:
logger.warning(f'Failed to clean up bsim exes: {e}')

def wait_bsim_ready(self):
start_time = time.time()
while time.time() - start_time < Bsim.BSIM_READY_TIMEOUT_S:
if all([os.path.exists(self._get_exe_path(i)) for i in range(len(self._exe_paths))]):
return True
time.sleep(0.1)

return False

def build(self):
"""
Copying the application executable to BabbleSim's bin directory enables
running multidevice bsim tests after twister has built them.
"""

if self.instance is None:
return

Expand All @@ -978,23 +1043,125 @@ def build(self):
logger.warning('Cannot copy bsim exe - cannot find original executable.')
return

bsim_out_path: str = os.getenv('BSIM_OUT_PATH', '')
if not bsim_out_path:
logger.warning('Cannot copy bsim exe - BSIM_OUT_PATH not provided.')
return
try:
new_exe_path = self._get_exe_path(0)
logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
shutil.copy(original_exe_path, new_exe_path)
self.status = TwisterStatus.PASS
except Exception as e:
logger.error(f'Failed to copy bsim exe: {e}')
self.status = TwisterStatus.ERROR
finally:
self.instance.execution_time = time.time() - self._start_time

def _run_cmd(self, cmd, timeout):
logger.debug(' '.join(cmd))
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
cwd=self._bsim_out_path) as proc:
try:
reader_t = threading.Thread(target=self._output_reader, args=(proc,), daemon=True)
reader_t.start()
reader_t.join(timeout)
if reader_t.is_alive():
terminate_process(proc)
logger.warning('Timeout has occurred. Can be extended in testspec file. '
f'Currently set to {timeout} seconds.')
self.status = TwisterStatus.ERROR
proc.wait(timeout)
except subprocess.TimeoutExpired:
self.status = TwisterStatus.ERROR
proc.kill()

new_exe_name: str = self.instance.testsuite.harness_config.get('bsim_exe_name', '')
if new_exe_name:
new_exe_name = f'bs_{self.instance.platform.name}_{new_exe_name}'
if proc.returncode != 0:
self.status = TwisterStatus.ERROR
self.instance.reason = f'Bsim error - return code {proc.returncode}'
else:
new_exe_name = self.instance.name
new_exe_name = f'bs_{new_exe_name}'
self.status = TwisterStatus.PASS if self.status == TwisterStatus.NONE else self.status

def _output_reader(self, proc):
while proc.stdout.readable() and proc.poll() is None:
line = proc.stdout.readline().decode().strip()
if not line:
continue
logger.debug(line)
self._tc_output.append(line)
proc.communicate()

def _generate_commands(self):
def rs():
return f'-rs={random.randint(0, 2**10 - 1)}'

def expand_args(dev_id):
try:
args = [str(eval(v)[dev_id]) if v.startswith('[') else v for v in extra_args]
return [arg for arg in args if args if arg]
except Exception as e:
logger.warning(f'Unable to expand extra arguments set {extra_args}: {e}')
return extra_args

bsim_phy_path = os.path.join(self._bsim_out_path, 'bs_2G4_phy_v1')
suite_id = f'-s={self.instance.name.split(os.path.sep)[-1].replace(".", "_")}'

cfg = self.instance.testsuite.harness_config
verbosity = f'-v={cfg.get("bsim_verbosity", self.DEFAULT_VERBOSITY)}'
sim_length = f'-sim_length={cfg.get("bsim_sim_length", self.DEFAULT_SIM_LENGTH)}'
extra_args = cfg.get('bsim_options', [])
phy_extra_args = cfg.get('bsim_phy_options', [])
test_ids = cfg.get('bsim_test_ids', [])
if not test_ids:
logger.error('No test ids specified for bsim test')
self.status = TwisterStatus.ERROR
return []

cmds = [[self._get_exe_path(i), verbosity, suite_id, f'-d={i}', f'-testid={t_id}', rs()]
+ expand_args(i) for i, t_id in enumerate(test_ids)]
cmds.append([bsim_phy_path, verbosity, suite_id, f'-D={len(test_ids)}', sim_length]
+ phy_extra_args)

new_exe_name = new_exe_name.replace(os.path.sep, '_').replace('.', '_').replace('@', '_')
return cmds

def _clean_up_files(self):
# Clean-up any log files that the test may have generated
files = glob.glob(os.path.join(self._bsim_out_path, '*.log'))
# files += glob.glob(os.path.join(self._bsim_out_path, '*.bin'))
try:
for file in [f for f in files if os.path.getctime(f) > self._start_time]:
os.remove(file)
except Exception as e:
logger.warning(f'Failed to clean up bsim log files: {e}')

def bsim_run(self, timeout):
try:
self._set_start_time()

threads = []
for cmd in self._generate_commands():
t = threading.Thread(target=lambda c=cmd: self._run_cmd(c, timeout))
threads.append(t)
t.start()

for t in threads:
t.join(timeout=timeout)
except Exception as e:
logger.error(f'BSIM test failed: {e}')
self.status = TwisterStatus.ERROR
finally:
self._update_test_status()
self._clean_up_files()

def _update_test_status(self):
self.instance.execution_time += time.time() - self._start_time
if not self.instance.testcases:
self.instance.init_cases()

# currently there is always one testcase per bsim suite
self.instance.testcases[0].status = self.status if self.status != TwisterStatus.NONE else \
TwisterStatus.FAIL
self.instance.status = self.instance.testcases[0].status
if self.instance.status in [TwisterStatus.ERROR, TwisterStatus.FAIL]:
self.instance.reason = self.instance.reason or 'Bsim test failed'
self.instance.testcases[0].output = '\n'.join(self._tc_output)

new_exe_path: str = os.path.join(bsim_out_path, 'bin', new_exe_name)
logger.debug(f'Copying executable from {original_exe_path} to {new_exe_path}')
shutil.copy(original_exe_path, new_exe_path)

class Ctest(Harness):
def configure(self, instance: TestInstance):
Expand Down Expand Up @@ -1135,15 +1302,41 @@ def _parse_report_file(self, report):

class HarnessImporter:

cache = {}
cache_lock = threading.Lock()

@staticmethod
def get_harness(harness_name):
def get_harness(instance: TestInstance):
harness_class = HarnessImporter._get_harness_class(instance.testsuite.harness)
if not harness_class:
return None

harness = None
with HarnessImporter.cache_lock:
if harness_class.cacheable and instance.name in HarnessImporter.cache:
harness = HarnessImporter.cache[instance.name]
else:
harness = harness_class()
if harness_class.cacheable:
harness.configure(instance)
HarnessImporter.cache[instance.name] = harness

return harness

@staticmethod
def _get_harness_class(harness_name: str):
thismodule = sys.modules[__name__]
try:
if harness_name:
harness_class = getattr(thismodule, harness_name)
harness_class = getattr(thismodule, harness_name.capitalize())
else:
harness_class = thismodule.Test
return harness_class()
return harness_class
except AttributeError as e:
logger.debug(f"harness {harness_name} not implemented: {e}")
return None

@staticmethod
def get_harness_by_name(harness_name: str):
harness_class = HarnessImporter._get_harness_class(harness_name)
return harness_class() if harness_class else None
Loading
Loading