Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions tests/multi_extmod/machine_i2c_target_irq.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,34 @@
print("SKIP")
raise SystemExit

from target_wiring import i2c_args, i2c_kwargs

ADDR = 67
clock_stretch_us = 200

# Configure pins based on the target.
if sys.platform == "alif":
i2c_args = (1,) # pins P3_7/P3_6
i2c_kwargs = {}
elif sys.platform == "mimxrt":
i2c_args = (0,) # pins 19/18 on Teensy 4.x
i2c_kwargs = {}
clock_stretch_us = 50 # mimxrt cannot delay too long in the IRQ handler
elif sys.platform == "rp2":
i2c_args = (0,)
i2c_kwargs = {"scl": 9, "sda": 8}
elif sys.platform == "pyboard":
i2c_args = ("Y",)
i2c_kwargs = {}
elif sys.platform == "samd":
i2c_args = () # pins SCL/SDA
i2c_kwargs = {}
elif "zephyr-rpi_pico" in sys.implementation._machine:
i2c_args = ("i2c1",) # on gpio7/gpio6
i2c_kwargs = {}
else:
print("Please add support for this test on this platform.")
raise SystemExit
# if sys.platform == "alif":
# i2c_args = (1,) # pins P3_7/P3_6
# i2c_kwargs = {}
# elif sys.platform == "mimxrt":
# i2c_args = (0,) # pins 19/18 on Teensy 4.x
# i2c_kwargs = {}
# clock_stretch_us = 50 # mimxrt cannot delay too long in the IRQ handler
# elif sys.platform == "rp2":
# i2c_args = (0,)
# i2c_kwargs = {"scl": 9, "sda": 8}
# elif sys.platform == "pyboard":
# i2c_args = ("Y",)
# i2c_kwargs = {}
# elif sys.platform == "samd":
# i2c_args = () # pins SCL/SDA
# i2c_kwargs = {}
# elif "zephyr-rpi_pico" in sys.implementation._machine:
# i2c_args = ("i2c1",) # on gpio7/gpio6
# i2c_kwargs = {}
# else:
# print("Please add support for this test on this platform.")
# raise SystemExit


def simple_irq(i2c_target):
Expand Down
52 changes: 27 additions & 25 deletions tests/multi_extmod/machine_i2c_target_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,35 @@
import sys
from machine import I2C, I2CTarget

from target_wiring import i2c_args, i2c_kwargs

ADDR = 67

# Configure pins based on the target.
if sys.platform == "alif":
i2c_args = (1,) # pins P3_7/P3_6
i2c_kwargs = {}
elif sys.platform == "esp32":
i2c_args = (1,) # on pins 9/8
i2c_kwargs = {}
elif sys.platform == "mimxrt":
i2c_args = (0,) # pins 19/18 on Teensy 4.x
i2c_kwargs = {}
elif sys.platform == "rp2":
i2c_args = (0,)
i2c_kwargs = {"scl": 9, "sda": 8}
elif sys.platform == "pyboard":
i2c_args = ("Y",)
i2c_kwargs = {}
elif sys.platform == "samd":
i2c_args = () # pins SCL/SDA
i2c_kwargs = {}
elif "zephyr-rpi_pico" in sys.implementation._machine:
i2c_args = ("i2c1",) # on gpio7/gpio6
i2c_kwargs = {}
else:
print("Please add support for this test on this platform.")
raise SystemExit
# # Configure pins based on the target.
# if sys.platform == "alif":
# i2c_args = (1,) # pins P3_7/P3_6
# i2c_kwargs = {}
# elif sys.platform == "esp32":
# i2c_args = (1,) # on pins 9/8
# i2c_kwargs = {}
# elif sys.platform == "mimxrt":
# i2c_args = (0,) # pins 19/18 on Teensy 4.x
# i2c_kwargs = {}
# elif sys.platform == "rp2":
# i2c_args = (0,)
# i2c_kwargs = {"scl": 9, "sda": 8}
# elif sys.platform == "pyboard":
# i2c_args = ("Y",)
# i2c_kwargs = {}
# elif sys.platform == "samd":
# i2c_args = () # pins SCL/SDA
# i2c_kwargs = {}
# elif "zephyr-rpi_pico" in sys.implementation._machine:
# i2c_args = ("i2c1",) # on gpio7/gpio6
# i2c_kwargs = {}
# else:
# print("Please add support for this test on this platform.")
# raise SystemExit


def simple_irq(i2c_target):
Expand Down
81 changes: 64 additions & 17 deletions tests/run-multitests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
# Runs a test suite that relies on two micropython instances/devices
# interacting in some way. Typically used to test networking / bluetooth etc.


from __future__ import annotations
import sys, os, time, re, select
import argparse
import itertools
import subprocess
import tempfile
import dataclasses

run_tests_module = __import__("run-tests")

Expand Down Expand Up @@ -264,6 +265,9 @@ def run_script(self, script):

def start_script(self, script):
self.pyb.enter_raw_repl()
if hasattr(self, 'target_wiring_script'):
cmd = "import sys;sys.modules['target_wiring']=__build_class__(lambda:exec(" + repr(self.target_wiring_script) + "),'target_wiring')"
self.pyb.exec_(cmd)
self.pyb.exec_raw_no_follow(script)
self.finished = False

Expand Down Expand Up @@ -318,7 +322,7 @@ def trace_instance_output(instance_idx, line):
sys.stdout.flush()


def run_test_on_instances(test_file, num_instances, instances):
def run_test_on_instances(test_file, num_instances, instances: tuple[TestInstance, ...]):
global trace_t0
trace_t0 = time.time()

Expand All @@ -336,12 +340,12 @@ def run_test_on_instances(test_file, num_instances, instances):
injected_globals += "HOST_IP = '" + get_host_ip() + "'\n"

if cmd_args.trace_output:
print("TRACE {}:".format("|".join(str(i) for i in instances)))
print("TRACE {}:".format("|".join(str(i.pyinstance) for i in instances)))

# Start all instances running, in order, waiting until they signal they are ready
for idx in range(num_instances):
append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx)
instance = instances[idx]
instance = instances[idx].pyinstance
instance.start_file(test_file, append=append_code)
last_read_time = time.time()
while True:
Expand Down Expand Up @@ -382,7 +386,7 @@ def run_test_on_instances(test_file, num_instances, instances):
num_running = 0
num_output = 0
for idx in range(num_instances):
instance = instances[idx]
instance = instances[idx].pyinstance
if instance.is_finished():
continue
num_running += 1
Expand Down Expand Up @@ -412,7 +416,7 @@ def run_test_on_instances(test_file, num_instances, instances):
last_read_time[idx] = time.time()

if out.startswith("BROADCAST "):
for instance2 in instances:
for instance2 in [i.pyinstance for i in instances]:
if instance2 is not instance:
instance2.write(bytes(out, "ascii") + b"\r\n")
elif out.startswith("OUTPUT_METRIC "):
Expand All @@ -431,7 +435,7 @@ def run_test_on_instances(test_file, num_instances, instances):

# Stop all instances
for idx in range(num_instances):
instances[idx].stop()
instances[idx].pyinstance.stop()

output_str = ""
for idx, lines in enumerate(output):
Expand Down Expand Up @@ -485,11 +489,11 @@ def print_diff(a, b):
os.unlink(b_path)


def run_tests(test_files, instances_truth, instances_test):
def run_tests(test_files, instances_truth, instances_test: tuple[TestInstance, ...]):
test_results = []

for test_file, num_instances in test_files:
instances_str = "|".join(str(instances_test[i]) for i in range(num_instances))
instances_str = "|".join(str(instances_test[i].pyinstance) for i in range(num_instances))
print("{} on {}: ".format(test_file, instances_str), end="")
if cmd_args.show_output or cmd_args.trace_output:
print()
Expand Down Expand Up @@ -548,6 +552,20 @@ def run_tests(test_files, instances_truth, instances_test):
return test_results


@dataclasses.dataclass
class TestInstance:
pyinstance: PyInstance
tw_source: str
"The pathname to a file in tests/target_wiring"

@property
def target_wiring_specified(self) -> bool:
return self.tw_source != ""

def __repr__(self):
return f"{self.pyinstance.device}({self.tw_source})"


def main():
global cmd_args

Expand Down Expand Up @@ -585,6 +603,12 @@ def main():
default=run_tests_module.base_path("results"),
help="directory for test results",
)
cmd_parser.add_argument(
"--target-wiring",
action="append",
default=[],
help="force the given script to be used as target_wiring.py",
)
cmd_parser.add_argument("files", nargs="+", help="input test files")
cmd_args = cmd_parser.parse_args()

Expand All @@ -596,27 +620,50 @@ def main():

instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)]

instances_test = []
for i in cmd_args.test_instance:
# Make sure the count matches: test_instance, target_wiring, instances_test
if len(cmd_args.target_wiring) == 0:
cmd_args.target_wiring = [""] * len(cmd_args.test_instance)

if len(cmd_args.test_instance) != len(cmd_args.target_wiring):
print(
"The argument count of '--target-wiring' must match '--target-instance'",
file=sys.stderr,
)
sys.exit(2)

instances_test: list[TestInstance] = []
for i, w in zip(cmd_args.test_instance, cmd_args.target_wiring, strict=True):
# Each instance arg is <cmd>,ENV=VAR,ENV=VAR...
i = i.split(",")
cmd = i[0]
env = i[1:]
if cmd.startswith("exec:"):
instances_test.append(PyInstanceSubProcess([cmd[len("exec:") :]], env))
pyinstance = PyInstanceSubProcess([cmd[len("exec:") :]], env)
elif cmd == "unix":
instances_test.append(PyInstanceSubProcess([MICROPYTHON], env))
pyinstance = PyInstanceSubProcess([MICROPYTHON], env)
elif cmd == "cpython":
instances_test.append(PyInstanceSubProcess([CPYTHON3], env))
pyinstance = PyInstanceSubProcess([CPYTHON3], env)
elif cmd == "webassembly" or cmd.startswith("execpty:"):
print("unsupported instance string: {}".format(cmd), file=sys.stderr)
sys.exit(2)
else:
device = run_tests_module.convert_device_shortcut_to_real_device(cmd)
instances_test.append(PyInstancePyboard(device))
pyinstance = PyInstancePyboard(device)

instances_test.append(TestInstance(pyinstance=pyinstance, tw_source=w))

for _ in range(max_instances - len(instances_test)):
instances_test.append(PyInstanceSubProcess([MICROPYTHON]))
instances_test.append(
TestInstance(pyinstance=PyInstanceSubProcess([MICROPYTHON]), tw_source="")
)

for instance in instances_test:
run_tests_module.detect_target_wiring_script2(
pyb=instance.pyinstance,
target_wiring=instance.tw_source,
platform=None, # TODO: correct parameter
build="", # TODO: correct parameter
)

os.makedirs(cmd_args.result_dir, exist_ok=True)
all_pass = True
Expand All @@ -632,7 +679,7 @@ def main():
for i in instances_truth:
i.close()
for i in instances_test:
i.close()
i.pyinstance.close()

if not all_pass:
sys.exit(1)
Expand Down
17 changes: 13 additions & 4 deletions tests/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,19 +359,27 @@ def detect_test_platform(pyb, args):


def detect_target_wiring_script(pyb, args):
# TODO: How to detect 'platform' and 'build'
detect_target_wiring_script2(
pyb, target_wiring=args.target_wiring, platform=args.platform, build=args.build
)


def detect_target_wiring_script2(pyb, target_wiring, platform, build):
tw_data = b""
tw_source = None
if args.target_wiring:
if target_wiring:
# A target_wiring path is explicitly provided, so use that.
tw_source = args.target_wiring
tw_source = target_wiring
with open(tw_source, "rb") as f:
tw_data = f.read()

elif hasattr(pyb, "exec_raw") and pyb.exec_raw("import target_wiring") == (b"", b""):
# The board already has a target_wiring module available, so use that.
tw_source = "on-device"
else:
port = platform_to_port(args.platform)
build = args.build
port = platform_to_port(platform)
build = build
tw_board_exact = None
tw_board_partial = None
tw_port = None
Expand Down Expand Up @@ -1232,6 +1240,7 @@ def to_json(obj):
},
f,
default=to_json,
indent=2,
)

# Return True only if all tests succeeded.
Expand Down
4 changes: 4 additions & 0 deletions tests/target_wiring/rp2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@

uart_loopback_args = (0,)
uart_loopback_kwargs = {"tx": "GPIO0", "rx": "GPIO1"}

i2c_args = (0,)
i2c_kwargs = {"scl": 9, "sda": 8}

5 changes: 5 additions & 0 deletions tests/target_wiring/rp2_octoprobe_infra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Target wiring for rp2 on octoprobe tentacle.
#

i2c_args = (1,)
i2c_kwargs = {"scl": 11, "sda": 10}