Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions packages/testing/src/consensus_testing/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,6 @@ def get_public_key(self, idx: ValidatorIndex) -> PublicKey:
"""Get a validator's public key."""
return self[idx].public

def get_all_public_keys(self) -> dict[ValidatorIndex, PublicKey]:
"""Get all public keys (from base keys, not advanced state)."""
return {idx: kp.public for idx, kp in self.keys.items()}

def sign_attestation_data(
self,
validator_id: ValidatorIndex,
Expand Down
62 changes: 14 additions & 48 deletions packages/testing/src/framework/pytest_plugins/filler.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,13 @@ def pytest_collection_modifyitems(config: pytest.Config, items: List[pytest.Item
config.hook.pytest_deselected(items=deselected)


def _is_test_item_valid_for_fork(item: pytest.Item, fork_class: Any, get_fork_by_name: Any) -> bool:
"""Check if a test item is valid for the given fork based on validity markers."""
markers = list(item.iter_markers())
def _check_markers_valid_for_fork(
markers: list[Any], fork_class: Any, get_fork_by_name: Any
) -> bool:
"""Check if test markers indicate validity for the given fork.

Shared logic for both collection-time and parametrization-time fork filtering.
"""
has_valid_from = False
has_valid_until = False
has_valid_at = False
Expand Down Expand Up @@ -354,6 +357,11 @@ def _is_test_item_valid_for_fork(item: pytest.Item, fork_class: Any, get_fork_by
return from_valid and until_valid


def _is_test_item_valid_for_fork(item: pytest.Item, fork_class: Any, get_fork_by_name: Any) -> bool:
"""Check if a test item is valid for the given fork based on validity markers."""
return _check_markers_valid_for_fork(list(item.iter_markers()), fork_class, get_fork_by_name)


def pytest_sessionfinish(session: pytest.Session, exitstatus: int) -> None:
"""Write all collected fixtures at the end of the session."""
if hasattr(session.config, "fixture_collector"):
Expand Down Expand Up @@ -527,51 +535,9 @@ def _is_test_valid_for_fork(
metafunc: pytest.Metafunc, fork_class: Any, get_fork_by_name: Any
) -> bool:
"""Check if a test is valid for the given fork based on validity markers."""
markers = list(metafunc.definition.iter_markers())

has_valid_from = False
has_valid_until = False
has_valid_at = False

valid_from_forks = []
valid_until_forks = []
valid_at_forks = []

for marker in markers:
if marker.name == "valid_from":
has_valid_from = True
for fork_name in marker.args:
target_fork = get_fork_by_name(fork_name)
if target_fork:
valid_from_forks.append(target_fork)
elif marker.name == "valid_until":
has_valid_until = True
for fork_name in marker.args:
target_fork = get_fork_by_name(fork_name)
if target_fork:
valid_until_forks.append(target_fork)
elif marker.name == "valid_at":
has_valid_at = True
for fork_name in marker.args:
target_fork = get_fork_by_name(fork_name)
if target_fork:
valid_at_forks.append(target_fork)

if not (has_valid_from or has_valid_until or has_valid_at):
return True

if has_valid_at:
return fork_class in valid_at_forks

from_valid = True
if has_valid_from:
from_valid = any(fork_class >= from_fork for from_fork in valid_from_forks)

until_valid = True
if has_valid_until:
until_valid = any(fork_class <= until_fork for until_fork in valid_until_forks)

return from_valid and until_valid
return _check_markers_valid_for_fork(
list(metafunc.definition.iter_markers()), fork_class, get_fork_by_name
)


def _register_layer_fixtures(config: pytest.Config, layer: str) -> None:
Expand Down
4 changes: 1 addition & 3 deletions src/lean_spec/subspecs/koalabear/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
"""Specifications for the KoalaBear finite field."""

from .field import P_BITS, P_BYTES, TWO_ADICITY, Fp, P
from .field import P_BYTES, Fp, P

__all__ = [
"P",
"P_BITS",
"P_BYTES",
"TWO_ADICITY",
"Fp",
]
56 changes: 0 additions & 56 deletions src/lean_spec/subspecs/koalabear/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,59 +191,3 @@ def __bytes__(self) -> bytes:
4-byte little-endian representation of the field element.
"""
return self.encode_bytes()

@classmethod
def from_bytes(cls, data: bytes) -> Self:
"""
Deserialize a field element from bytes.

This is the inverse of `__bytes__()` and follows Python's standard
deserialization pattern.

Args:
data: 4-byte little-endian representation of a field element.

Returns:
Deserialized field element.

Raises:
ValueError: If data has incorrect length or represents an invalid field value.
"""
return cls.decode_bytes(data)

@classmethod
def serialize_list(cls, elements: list[Self]) -> bytes:
"""
Serialize a list of field elements to bytes.

This is a convenience method for serializing multiple field elements
at once, useful for container serialization.

Args:
elements: List of field elements to serialize.

Returns:
Concatenated bytes of all field elements.
"""
return b"".join(bytes(elem) for elem in elements)

@classmethod
def deserialize_list(cls, data: bytes, count: int) -> list[Self]:
"""
Deserialize a fixed number of field elements from bytes.

Args:
data: Raw bytes to deserialize.
count: Expected number of field elements.

Returns:
List of deserialized field elements.

Raises:
ValueError: If data length doesn't match expected count.
"""
expected_len = count * P_BYTES
if len(data) != expected_len:
raise ValueError(f"Expected {expected_len} bytes for {count} elements, got {len(data)}")

return [cls.from_bytes(data[i : i + P_BYTES]) for i in range(0, len(data), P_BYTES)]
148 changes: 0 additions & 148 deletions src/lean_spec/subspecs/networking/client/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@
from lean_spec.subspecs.networking.varint import (
VarintError,
decode_varint,
encode_varint,
)
from lean_spec.types.exceptions import SSZSerializationError

Expand Down Expand Up @@ -1345,119 +1344,6 @@ async def setup_outbound_with_delay() -> None:
# The connection will be cleaned up elsewhere.
logger.warning("Stream acceptor error for %s: %s", peer_id, e)

async def _handle_gossip_stream(self, peer_id: PeerId, stream: InboundStreamProtocol) -> None:
"""
Handle an incoming gossip stream.

Reads the gossip message, decodes it, and emits the appropriate event.

Args:
peer_id: Peer that sent the message.
stream: QUIC stream containing the gossip message.


COMPLETE FLOW
-------------
A gossip message goes through these stages:

1. Read raw bytes from QUIC stream.
2. Parse topic string and data length (varints).
3. Decompress Snappy-framed data.
4. Decode SSZ bytes into typed object.
5. Emit event to the sync layer.

Any stage can fail. Failures are logged but don't crash the handler.


ERROR HANDLING STRATEGY
-----------------------
Gossip is best-effort. A single bad message should not:

- Crash the node.
- Disconnect the peer.
- Block other messages.

We log errors and continue. Peer scoring (not implemented here)
would track repeated failures for reputation management.


RESOURCE CLEANUP
----------------
The stream MUST be closed in finally, even if errors occur.
Unclosed streams leak QUIC resources and can cause deadlocks.
"""
try:
# Step 1: Read the gossip message from the stream.
#
# This parses the varint-prefixed topic and data fields.
# May fail if the message is truncated or malformed.
topic_str, compressed_data = await read_gossip_message(stream)

# Step 2: Decode the message.
#
# This performs:
# - Topic validation (correct prefix, encoding, fork).
# - Snappy decompression with CRC verification.
# - SSZ decoding into the appropriate type.
message = self._gossip_handler.decode_message(topic_str, compressed_data)
topic = self._gossip_handler.get_topic(topic_str)

# Step 3: Emit the appropriate event based on message type.
#
# The topic determines the expected message type.
# We verify the decoded type matches to catch bugs.
match topic.kind:
case TopicKind.BLOCK:
if isinstance(message, SignedBlockWithAttestation):
await self._emit_gossip_block(message, peer_id)
else:
# Type mismatch indicates a bug in decode_message.
logger.warning("Block topic but got %s", type(message).__name__)

case TopicKind.ATTESTATION_SUBNET:
if isinstance(message, SignedAttestation):
await self._emit_gossip_attestation(message, peer_id)
else:
# Type mismatch indicates a bug in decode_message.
logger.warning("Attestation topic but got %s", type(message).__name__)

case TopicKind.AGGREGATED_ATTESTATION:
if isinstance(message, SignedAggregatedAttestation):
await self._emit_gossip_aggregated_attestation(message, peer_id)
else:
logger.warning(
"Aggregated attestation topic but got %s",
type(message).__name__,
)

logger.debug("Received gossip %s from %s", topic.kind.value, peer_id)

except GossipMessageError as e:
# Expected error: malformed message, decompression failure, etc.
#
# This is not necessarily a bug. The peer may be misbehaving
# or there may be network corruption. Log and continue.
logger.warning("Gossip message error from %s: %s", peer_id, e)

except Exception as e:
# Unexpected error: likely a bug in our code.
#
# Log as warning to aid debugging. Don't crash.
logger.warning("Unexpected error handling gossip from %s: %s", peer_id, e)

finally:
# Always close the stream to release QUIC resources.
#
# Unclosed streams cause resource leaks and can deadlock
# the connection if too many accumulate.
#
# The try/except suppresses close errors. The stream may
# already be closed if the connection dropped.
try:
await stream.close()
except Exception:
pass

async def publish(self, topic: str, data: bytes) -> None:
"""
Broadcast a message to all connected peers on a topic.
Expand All @@ -1482,37 +1368,3 @@ async def publish(self, topic: str, data: bytes) -> None:
logger.debug("Published message to gossipsub topic %s", topic)
except Exception as e:
logger.warning("Failed to publish to gossipsub: %s", e)

async def _send_gossip_message(
self,
conn: QuicConnection,
topic: str,
data: bytes,
) -> None:
"""
Send a gossip message to a peer.

Opens a new stream for the gossip message and sends the data.

Args:
conn: QuicConnection to the peer.
topic: Topic string for the message.
data: Message bytes to send.
"""
# Open a new outbound stream for gossip protocol.
stream = await conn.open_stream(GOSSIPSUB_DEFAULT_PROTOCOL_ID)

try:
# Format: topic length (varint) + topic + data length (varint) + data
topic_bytes = topic.encode("utf-8")

# Write topic length and topic.
await stream.write(encode_varint(len(topic_bytes)))
await stream.write(topic_bytes)

# Write data length and data.
await stream.write(encode_varint(len(data)))
await stream.write(data)

finally:
await stream.close()
9 changes: 0 additions & 9 deletions src/lean_spec/subspecs/networking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@

# --- Gossipsub Protocol IDs ---

GOSSIPSUB_PROTOCOL_ID_V10: Final[str] = "/meshsub/1.0.0"
"""Gossipsub v1.0 protocol ID - basic mesh pubsub."""

GOSSIPSUB_PROTOCOL_ID_V11: Final[str] = "/meshsub/1.1.0"
"""Gossipsub v1.1 protocol ID - peer scoring, extended validators.

Expand All @@ -75,11 +72,5 @@
before attempting to re-graft. This prevents rapid mesh churn.
"""

MESSAGE_ID_SIZE: Final[int] = 20
"""Size of gossipsub message IDs in bytes.

Per Ethereum spec, message IDs are the first 20 bytes of SHA256(domain + topic_len + topic + data).
"""

MAX_ERROR_MESSAGE_SIZE: Final[int] = 256
"""Maximum error message size in bytes per Ethereum P2P spec (ErrorMessage: List[byte, 256])."""
2 changes: 0 additions & 2 deletions src/lean_spec/subspecs/networking/gossipsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
ForkMismatchError,
GossipTopic,
TopicKind,
format_topic_string,
parse_topic_string,
)
from .types import (
Expand All @@ -59,7 +58,6 @@
# Topic (commonly needed for Ethereum)
"GossipTopic",
"TopicKind",
"format_topic_string",
"parse_topic_string",
"ForkMismatchError",
# Types
Expand Down
8 changes: 0 additions & 8 deletions src/lean_spec/subspecs/networking/gossipsub/mcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,6 @@ def __len__(self) -> int:
"""Return the total number of cached messages."""
return len(self._by_id)

def __contains__(self, msg_id: MessageId) -> bool:
"""Check if a message ID is in the cache."""
return msg_id in self._by_id


@dataclass(slots=True)
class SeenCache:
Expand Down Expand Up @@ -328,7 +324,3 @@ def clear(self) -> None:
def __len__(self) -> int:
"""Return the number of seen message IDs."""
return len(self._timestamps)

def __contains__(self, msg_id: MessageId) -> bool:
"""Check if a message ID has been seen."""
return msg_id in self._timestamps
Loading
Loading