diff --git a/pyproject.toml b/pyproject.toml index af5684e..193a3b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,11 @@ [project] name = "transactron" -version = "2024.11.19.dev0" +version = "2024.11.20.dev0" dependencies = [ "amaranth == 0.5.3", "amaranth-stubs @ git+https://github.com/kuznia-rdzeni/amaranth-stubs.git@edb302b001433edf4c8568190adc9bd0c0039f45", - "dataclasses-json == 0.6.3" + "dataclasses-json == 0.6.3", + "tabulate == 0.9.0" ] requires-python = ">=3.11" classifiers = [ diff --git a/test/testing/test_validate_arguments.py b/test/testing/test_validate_arguments.py index 3695915..7e70369 100644 --- a/test/testing/test_validate_arguments.py +++ b/test/testing/test_validate_arguments.py @@ -26,7 +26,6 @@ def elaborate(self, platform): class TestValidateArguments(TestCaseWithSimulator): def control_caller(self, caller: TestbenchIO, method: TestbenchIO): async def process(sim: TestbenchContext): - await sim.tick() await sim.tick() for _ in range(100): val = random.randrange(2) diff --git a/transactron/testing/method_mock.py b/transactron/testing/method_mock.py index 78f7589..9587ae1 100644 --- a/transactron/testing/method_mock.py +++ b/transactron/testing/method_mock.py @@ -64,7 +64,9 @@ async def output_process( async def validate_arguments_process(self, sim: SimulatorContext) -> None: assert self.validate_arguments is not None sync = sim._design.lookup_domain("sync", None) # type: ignore - async for *args, clk in sim.changed(*(a for a, _ in self.adapter.validators)).edge(sync.clk, 1): + async for *args, clk, _ in ( + sim.changed(*(a for a, _ in self.adapter.validators)).edge(sync.clk, 1).edge(self.adapter.en, 1) + ): assert len(args) == len(self.adapter.validators) # TODO: remove later if clk: self._freeze = True @@ -74,27 +76,24 @@ async def validate_arguments_process(self, sim: SimulatorContext) -> None: sim.set(r, async_mock_def_helper(self, self.validate_arguments, arg)) async def effect_process(self, sim: SimulatorContext) -> None: + sim.set(self.adapter.en, self.enable()) async for *_, done in sim.tick().sample(self.adapter.done): + # Disabling the method on each cycle forces an edge when it is reenabled again. + # The method body won't be executed until the effects are done. + sim.set(self.adapter.en, False) + # First, perform pending effects, updating internal state. with sim.critical(): if done: for eff in self._effects: eff() - # Ensure that the effects of all mocks are applied - await sim.delay(1e-12) + # Ensure that the effects of all mocks are applied. Delay 0 also does this! await sim.delay(self.delay) - # Next, update combinational signals taking the new state into account. - # In case the input signals get updated later, the other processes will perform the update again. + # Next, enable the method. The output will be updated by a combinational process. self._effects = [] self._freeze = False - if self.validate_arguments is not None: - for a, r in self.adapter.validators: - sim.set(r, async_mock_def_helper(self, self.validate_arguments, sim.get(a))) - with self._context(): - ret = async_mock_def_helper(self, self.function, sim.get(self.adapter.data_out)) - sim.set(self.adapter.data_in, ret) sim.set(self.adapter.en, self.enable()) @@ -112,9 +111,12 @@ def def_method_mock( which correspond to named arguments of the method. This decorator can be applied to function definitions or method definitions. - When applied to a method definition, lambdas passed to `async_def_method_mock` + When applied to a method definition, lambdas passed to `def_method_mock` need to take a `self` argument, which should be the first. + Mocks defined at class level or at test level are automatically discovered and + don't need to be manually added to the simulation. + Any side effects (state modification, assertions, etc.) need to be guarded using the `MethodMock.effect` decorator. @@ -137,25 +139,13 @@ def def_method_mock( Example ------- ``` - m = TestCircuit() - def target_process(k: int): - @def_method_mock(lambda: m.target[k]) - def process(arg): - return {"data": arg["data"] + k} - return process - ``` - or equivalently - ``` - m = TestCircuit() - def target_process(k: int): - @def_method_mock(lambda: m.target[k], settle=1, enable=False) - def process(data): - return {"data": data + k} - return process + @def_method_mock(lambda: m.target[k]) + def process(arg): + return {"data": arg["data"] + k} ``` or for class methods ``` - @def_method_mock(lambda self: self.target[k], settle=1, enable=False) + @def_method_mock(lambda self: self.target[k]) def process(self, data): return {"data": data + k} ``` diff --git a/transactron/testing/testbenchio.py b/transactron/testing/testbenchio.py index 478428d..0553184 100644 --- a/transactron/testing/testbenchio.py +++ b/transactron/testing/testbenchio.py @@ -13,15 +13,41 @@ class CallTrigger: + """A trigger which allows to call multiple methods and sample signals. + + The `call()` and `call_try()` methods on a `TestbenchIO` always wait at least one clock cycle. It follows + that these methods can't be used to perform calls to multiple methods in a single clock cycle. Usually + this is not a problem, as different methods can be called from different simulation processes. But in cases + when more control over the time when different calls happen is needed, this trigger class allows to call + many methods in a single clock cycle. + """ + def __init__( self, sim: SimulatorContext, - calls: Iterable[ValueLike | tuple["TestbenchIO", Optional[dict[str, Any]]]] = (), + _calls: Iterable[ValueLike | tuple["TestbenchIO", Optional[dict[str, Any]]]] = (), ): + """ + Parameters + ---------- + sim: SimulatorContext + Amaranth simulator context. + """ self.sim = sim - self.calls_and_values: list[ValueLike | tuple[TestbenchIO, Optional[dict[str, Any]]]] = list(calls) + self.calls_and_values: list[ValueLike | tuple[TestbenchIO, Optional[dict[str, Any]]]] = list(_calls) def sample(self, *values: "ValueLike | TestbenchIO"): + """Sample a signal or a method result on a clock edge. + + Values are sampled like in standard Amaranth `TickTrigger`. Sampling a method result works like `call()`, + but the method is not called - another process can do that instead. If the method was not called, the + sampled value is `None`. + + Parameters + ---------- + *values: ValueLike | TestbenchIO + Value or method to sample. + """ new_calls_and_values: list[ValueLike | tuple["TestbenchIO", None]] = [] for value in values: if isinstance(value, TestbenchIO): @@ -31,14 +57,34 @@ def sample(self, *values: "ValueLike | TestbenchIO"): return CallTrigger(self.sim, (*self.calls_and_values, *new_calls_and_values)) def call(self, tbio: "TestbenchIO", data: dict[str, Any] = {}, /, **kwdata): + """Call a method and sample its result. + + Adds a method call to the trigger. The method result is sampled on a clock edge. If the call did not + succeed, the sampled value is `None`. + + Parameters + ---------- + tbio: TestbenchIO + The method to call. + data: dict[str, Any] + Method call arguments stored in a dict. + **kwdata: Any + Method call arguments passed as keyword arguments. If keyword arguments are used, + the `data` argument should not be provided. + """ if data and kwdata: raise TypeError("call() takes either a single dict or keyword arguments") return CallTrigger(self.sim, (*self.calls_and_values, (tbio, data or kwdata))) async def until_done(self) -> Any: - call = self.__aiter__() - while True: - results = await call.__anext__() + """Wait until at least one of the calls succeeds. + + The `CallTrigger` normally acts like `TickTrigger`, e.g. awaiting on it advances the clock to the next + clock edge. It is possible that none of the calls could not be performed, for example because the called + methods were not enabled. In case we only want to focus on the cycles when one of the calls succeeded, + `until_done` can be used. This works like `until()` in `TickTrigger`. + """ + async for results in self: if any(res is not None for res in results): return results