Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More missed stuff #3

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[project]
name = "transactron"
version = "2024.11.19.dev0"
version = "2024.11.20.dev0"
dependencies = [
"amaranth == 0.5.3",
"amaranth-stubs @ git+https://github.com/kuznia-rdzeni/amaranth-stubs.git@edb302b001433edf4c8568190adc9bd0c0039f45",
"dataclasses-json == 0.6.3"
"dataclasses-json == 0.6.3",
"tabulate == 0.9.0"
]
requires-python = ">=3.11"
classifiers = [
Expand Down
1 change: 0 additions & 1 deletion test/testing/test_validate_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def elaborate(self, platform):
class TestValidateArguments(TestCaseWithSimulator):
def control_caller(self, caller: TestbenchIO, method: TestbenchIO):
async def process(sim: TestbenchContext):
await sim.tick()
await sim.tick()
for _ in range(100):
val = random.randrange(2)
Expand Down
46 changes: 18 additions & 28 deletions transactron/testing/method_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ async def output_process(
async def validate_arguments_process(self, sim: SimulatorContext) -> None:
assert self.validate_arguments is not None
sync = sim._design.lookup_domain("sync", None) # type: ignore
async for *args, clk in sim.changed(*(a for a, _ in self.adapter.validators)).edge(sync.clk, 1):
async for *args, clk, _ in (
sim.changed(*(a for a, _ in self.adapter.validators)).edge(sync.clk, 1).edge(self.adapter.en, 1)
):
assert len(args) == len(self.adapter.validators) # TODO: remove later
if clk:
self._freeze = True
Expand All @@ -74,27 +76,24 @@ async def validate_arguments_process(self, sim: SimulatorContext) -> None:
sim.set(r, async_mock_def_helper(self, self.validate_arguments, arg))

async def effect_process(self, sim: SimulatorContext) -> None:
sim.set(self.adapter.en, self.enable())
async for *_, done in sim.tick().sample(self.adapter.done):
# Disabling the method on each cycle forces an edge when it is reenabled again.
# The method body won't be executed until the effects are done.
sim.set(self.adapter.en, False)

# First, perform pending effects, updating internal state.
with sim.critical():
if done:
for eff in self._effects:
eff()

# Ensure that the effects of all mocks are applied
await sim.delay(1e-12)
# Ensure that the effects of all mocks are applied. Delay 0 also does this!
await sim.delay(self.delay)

# Next, update combinational signals taking the new state into account.
# In case the input signals get updated later, the other processes will perform the update again.
# Next, enable the method. The output will be updated by a combinational process.
self._effects = []
self._freeze = False
if self.validate_arguments is not None:
for a, r in self.adapter.validators:
sim.set(r, async_mock_def_helper(self, self.validate_arguments, sim.get(a)))
with self._context():
ret = async_mock_def_helper(self, self.function, sim.get(self.adapter.data_out))
sim.set(self.adapter.data_in, ret)
sim.set(self.adapter.en, self.enable())


Expand All @@ -112,9 +111,12 @@ def def_method_mock(
which correspond to named arguments of the method.

This decorator can be applied to function definitions or method definitions.
When applied to a method definition, lambdas passed to `async_def_method_mock`
When applied to a method definition, lambdas passed to `def_method_mock`
need to take a `self` argument, which should be the first.

Mocks defined at class level or at test level are automatically discovered and
don't need to be manually added to the simulation.

Any side effects (state modification, assertions, etc.) need to be guarded
using the `MethodMock.effect` decorator.

Expand All @@ -137,25 +139,13 @@ def def_method_mock(
Example
-------
```
m = TestCircuit()
def target_process(k: int):
@def_method_mock(lambda: m.target[k])
def process(arg):
return {"data": arg["data"] + k}
return process
```
or equivalently
```
m = TestCircuit()
def target_process(k: int):
@def_method_mock(lambda: m.target[k], settle=1, enable=False)
def process(data):
return {"data": data + k}
return process
@def_method_mock(lambda: m.target[k])
def process(arg):
return {"data": arg["data"] + k}
```
or for class methods
```
@def_method_mock(lambda self: self.target[k], settle=1, enable=False)
@def_method_mock(lambda self: self.target[k])
def process(self, data):
return {"data": data + k}
```
Expand Down
56 changes: 51 additions & 5 deletions transactron/testing/testbenchio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,41 @@


class CallTrigger:
"""A trigger which allows to call multiple methods and sample signals.

The `call()` and `call_try()` methods on a `TestbenchIO` always wait at least one clock cycle. It follows
that these methods can't be used to perform calls to multiple methods in a single clock cycle. Usually
this is not a problem, as different methods can be called from different simulation processes. But in cases
when more control over the time when different calls happen is needed, this trigger class allows to call
many methods in a single clock cycle.
"""

def __init__(
self,
sim: SimulatorContext,
calls: Iterable[ValueLike | tuple["TestbenchIO", Optional[dict[str, Any]]]] = (),
_calls: Iterable[ValueLike | tuple["TestbenchIO", Optional[dict[str, Any]]]] = (),
):
"""
Parameters
----------
sim: SimulatorContext
Amaranth simulator context.
"""
self.sim = sim
self.calls_and_values: list[ValueLike | tuple[TestbenchIO, Optional[dict[str, Any]]]] = list(calls)
self.calls_and_values: list[ValueLike | tuple[TestbenchIO, Optional[dict[str, Any]]]] = list(_calls)

def sample(self, *values: "ValueLike | TestbenchIO"):
"""Sample a signal or a method result on a clock edge.

Values are sampled like in standard Amaranth `TickTrigger`. Sampling a method result works like `call()`,
but the method is not called - another process can do that instead. If the method was not called, the
sampled value is `None`.

Parameters
----------
*values: ValueLike | TestbenchIO
Value or method to sample.
"""
new_calls_and_values: list[ValueLike | tuple["TestbenchIO", None]] = []
for value in values:
if isinstance(value, TestbenchIO):
Expand All @@ -31,14 +57,34 @@ def sample(self, *values: "ValueLike | TestbenchIO"):
return CallTrigger(self.sim, (*self.calls_and_values, *new_calls_and_values))

def call(self, tbio: "TestbenchIO", data: dict[str, Any] = {}, /, **kwdata):
"""Call a method and sample its result.

Adds a method call to the trigger. The method result is sampled on a clock edge. If the call did not
succeed, the sampled value is `None`.

Parameters
----------
tbio: TestbenchIO
The method to call.
data: dict[str, Any]
Method call arguments stored in a dict.
**kwdata: Any
Method call arguments passed as keyword arguments. If keyword arguments are used,
the `data` argument should not be provided.
"""
if data and kwdata:
raise TypeError("call() takes either a single dict or keyword arguments")
return CallTrigger(self.sim, (*self.calls_and_values, (tbio, data or kwdata)))

async def until_done(self) -> Any:
call = self.__aiter__()
while True:
results = await call.__anext__()
"""Wait until at least one of the calls succeeds.

The `CallTrigger` normally acts like `TickTrigger`, e.g. awaiting on it advances the clock to the next
clock edge. It is possible that none of the calls could not be performed, for example because the called
methods were not enabled. In case we only want to focus on the cycles when one of the calls succeeded,
`until_done` can be used. This works like `until()` in `TickTrigger`.
"""
async for results in self:
if any(res is not None for res in results):
return results

Expand Down