Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/lean_spec/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from lean_spec.subspecs.networking.enr import ENR
from lean_spec.subspecs.networking.gossipsub import GossipTopic
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.node import Node, NodeConfig, get_local_validator_id
from lean_spec.subspecs.node import Node, NodeConfig
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.subspecs.sync.checkpoint_sync import (
CheckpointSyncError,
Expand Down Expand Up @@ -281,7 +281,7 @@ async def _init_from_checkpoint(
#
# The store treats this as the new "genesis" for fork choice purposes.
# All blocks before the checkpoint are effectively pruned.
validator_id = get_local_validator_id(validator_registry)
validator_id = validator_registry.primary_index() if validator_registry else None
store = Store.get_forkchoice_store(state, anchor_block, validator_id)
logger.info(
"Initialized from checkpoint at slot %d (finalized=%s)",
Expand Down Expand Up @@ -487,7 +487,7 @@ async def run_node(
block_topic = str(GossipTopic.block(GOSSIP_FORK_DIGEST))
event_source.subscribe_gossip_topic(block_topic)
# Subscribe to attestation subnet topics based on local validator id.
validator_id = get_local_validator_id(validator_registry)
validator_id = validator_registry.primary_index() if validator_registry else None
if validator_id is None:
subnet_id = 0
logger.info("No local validator id; subscribing to attestation subnet %d", subnet_id)
Expand Down
3 changes: 2 additions & 1 deletion src/lean_spec/subspecs/networking/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
Bridges connection events to NetworkService events.
"""

from .event_source import LiveNetworkEventSource
from .event_source import EventSource, LiveNetworkEventSource
from .reqresp_client import ReqRespClient

__all__ = [
"EventSource",
"LiveNetworkEventSource",
"ReqRespClient",
]
22 changes: 22 additions & 0 deletions src/lean_spec/subspecs/networking/client/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Protocol, Self

from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
Expand Down Expand Up @@ -162,6 +163,27 @@
logger = logging.getLogger(__name__)


class EventSource(Protocol):
"""Protocol for network event sources.

Defines the minimal interface needed by NetworkService.
LiveNetworkEventSource satisfies this with real network I/O.
MockEventSource satisfies this for testing.
"""

def __aiter__(self) -> Self:
"""Return self as async iterator."""
...

async def __anext__(self) -> NetworkEvent:
"""Yield the next network event."""
...

async def publish(self, topic: str, data: bytes) -> None:
"""Broadcast a message to all peers on a topic."""
...


class GossipMessageError(Exception):
"""Raised when a gossip message cannot be processed."""

Expand Down
4 changes: 2 additions & 2 deletions src/lean_spec/subspecs/networking/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from lean_spec.snappy import frame_compress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource
from lean_spec.subspecs.networking.client.event_source import EventSource
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic
from lean_spec.subspecs.networking.peer import PeerInfo
from lean_spec.subspecs.networking.types import ConnectionState
Expand Down Expand Up @@ -70,7 +70,7 @@ class NetworkService:
sync_service: SyncService
"""Sync service that receives routed events."""

event_source: LiveNetworkEventSource
event_source: EventSource
"""Source of network events from the transport layer."""

fork_digest: str = field(default="0x00000000")
Expand Down
4 changes: 2 additions & 2 deletions src/lean_spec/subspecs/node/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Node orchestrator for the Lean Ethereum consensus client."""

from .node import Node, NodeConfig, get_local_validator_id
from .node import Node, NodeConfig

__all__ = ["Node", "NodeConfig", "get_local_validator_id"]
__all__ = ["Node", "NodeConfig"]
118 changes: 70 additions & 48 deletions src/lean_spec/subspecs/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

from lean_spec.subspecs.api import ApiServer, ApiServerConfig
from lean_spec.subspecs.chain import SlotClock
from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT
from lean_spec.subspecs.chain.config import (
ATTESTATION_COMMITTEE_COUNT,
INTERVALS_PER_SLOT,
SECONDS_PER_SLOT,
)
from lean_spec.subspecs.chain.service import ChainService
from lean_spec.subspecs.containers import Block, BlockBody, SignedBlockWithAttestation, State
from lean_spec.subspecs.containers.attestation import SignedAttestation
Expand All @@ -29,13 +33,16 @@
from lean_spec.subspecs.containers.validator import ValidatorIndex
from lean_spec.subspecs.forkchoice import Store
from lean_spec.subspecs.networking import NetworkService
from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource
from lean_spec.subspecs.networking.client.event_source import EventSource
from lean_spec.subspecs.ssz.hash import hash_tree_root
from lean_spec.subspecs.storage import Database, SQLiteDatabase
from lean_spec.subspecs.sync import BlockCache, NetworkRequester, PeerManager, SyncService
from lean_spec.subspecs.validator import ValidatorRegistry, ValidatorService
from lean_spec.types import Bytes32, Uint64

_ZERO_TIME = Uint64(0)
"""Default genesis time for database loading when no genesis time is available."""


@dataclass(frozen=True, slots=True)
class NodeConfig:
Expand All @@ -51,7 +58,7 @@ class NodeConfig:
validators: Validators
"""Initial validator set for genesis state."""

event_source: LiveNetworkEventSource
event_source: EventSource
"""Source of network events."""

network: NetworkRequester
Expand Down Expand Up @@ -95,6 +102,11 @@ class NodeConfig:
"""
Whether this node functions as an aggregator.

Aggregator selection is static (node-level flag), not VRF-based rotation.
The spec assumes at least one aggregator node exists in the network.

With ATTESTATION_COMMITTEE_COUNT = 1, all validators share subnet 0.

When True:
- The node performs attestation aggregation operations
- The ENR advertises aggregator capability to peers
Expand All @@ -104,20 +116,6 @@ class NodeConfig:
"""


def get_local_validator_id(registry: ValidatorRegistry | None) -> ValidatorIndex | None:
"""
Get the validator index for this node.

For now, returns None as a default for passive nodes or simple setups.
Future implementations will look up keys in the registry.
"""
if registry is None or len(registry) == 0:
return None

# For simplicity, use the first validator in the registry.
return registry.indices()[0]


@dataclass(slots=True)
class Node:
"""
Expand Down Expand Up @@ -148,6 +146,9 @@ class Node:
validator_service: ValidatorService | None = field(default=None)
"""Optional validator service for block/attestation production."""

database: Database | None = field(default=None)
"""Optional database reference for lifecycle management."""

_shutdown: asyncio.Event = field(default_factory=asyncio.Event)
"""Event signaling shutdown request."""

Expand All @@ -170,13 +171,17 @@ def from_genesis(cls, config: NodeConfig) -> Node:
# The database is optional - nodes can run without persistence.
database: Database | None = None
if config.database_path is not None:
database = cls._create_database(config.database_path)
database = SQLiteDatabase(config.database_path)

#
# If database contains valid state, resume from there.
# Otherwise, fall through to genesis initialization.
validator_id = get_local_validator_id(config.validator_registry)
store = cls._try_load_from_database(database, validator_id)
validator_id = (
config.validator_registry.primary_index() if config.validator_registry else None
)
store = cls._try_load_from_database(
database, validator_id, config.genesis_time, config.time_fn
)

if store is None:
# Generate genesis state from validators.
Expand Down Expand Up @@ -242,7 +247,7 @@ def from_genesis(cls, config: NodeConfig) -> Node:
#
# SyncService delegates aggregate publishing to NetworkService
# via a callback, avoiding a circular dependency.
sync_service._publish_agg_fn = network_service.publish_aggregated_attestation
sync_service.set_publish_agg_fn(network_service.publish_aggregated_attestation)

# Create API server if configured
api_server: ApiServer | None = None
Expand All @@ -261,17 +266,20 @@ def from_genesis(cls, config: NodeConfig) -> Node:
# Wire callbacks to publish produced blocks/attestations to the network.
validator_service: ValidatorService | None = None
if config.validator_registry is not None:
# Create a wrapper for publish_attestation that computes the subnet_id
# from the validator_id in the attestation
# These wrappers serve a dual purpose:
#
# 1. Publish to the network so peers receive the block/attestation.
# 2. Process locally so the node's own store reflects what it produced.
#
# Without local processing, the node would not see its own produced
# blocks/attestations in forkchoice until they arrived back via gossip.
async def publish_attestation_wrapper(attestation: SignedAttestation) -> None:
subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT)
await network_service.publish_attestation(attestation, subnet_id)
# Also route locally so we can aggregate our own attestation
await sync_service.on_gossip_attestation(attestation)

async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None:
await network_service.publish_block(block)
# Also route locally so we update our own store
await sync_service.on_gossip_block(block, peer_id=None)

validator_service = ValidatorService(
Expand All @@ -290,35 +298,32 @@ async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None:
network_service=network_service,
api_server=api_server,
validator_service=validator_service,
database=database,
)

@staticmethod
def _create_database(path: Path | str) -> Database:
"""
Create database instance from path.

Args:
path: Path to SQLite database file.

Returns:
Database instance ready for use.
"""
# SQLite handles its own caching at the filesystem level.
return SQLiteDatabase(path)

@staticmethod
def _try_load_from_database(
database: Database | None,
validator_id: ValidatorIndex | None,
genesis_time: Uint64 | None = None,
time_fn: Callable[[], float] = time.time,
) -> Store | None:
"""
Try to load forkchoice store from existing database state.

Returns None if database is empty or unavailable.

Uses wall-clock time to set the store's time field. This ensures that
after a restart, the store reflects actual elapsed time rather than just
the head block's proposal moment. Without this, the store would reject
valid attestations as "too far in future" until the chain service ticks
catch up.

Args:
database: Database to load from.
validator_id: Validator index for the store instance.
genesis_time: Unix timestamp of genesis (slot 0).
time_fn: Wall-clock time source.

Returns:
Loaded Store or None if no valid state exists.
Expand All @@ -345,12 +350,24 @@ def _try_load_from_database(
if justified is None or finalized is None:
return None

# Compute store time from wall clock to avoid post-restart drift.
#
# Using only the head block's slot would set the store time to the
# block's proposal moment. After a restart, this makes the store
# think it's in the past, rejecting valid attestations as "future".
# Instead, derive time from wall clock, floored by the block's slot.
gt = genesis_time if genesis_time is not None else _ZERO_TIME
elapsed_seconds = Uint64(max(0, int(time_fn()) - int(gt)))
wall_clock_intervals = elapsed_seconds * INTERVALS_PER_SLOT // SECONDS_PER_SLOT
block_intervals = head_block.slot * INTERVALS_PER_SLOT
store_time = max(wall_clock_intervals, block_intervals)

# Reconstruct minimal store from persisted data.
#
# The store starts with just the head block and state.
# Additional blocks can be loaded on demand or via sync.
return Store(
time=Uint64(head_block.slot * INTERVALS_PER_SLOT),
time=store_time,
config=head_state.config,
head=head_root,
safe_target=head_root,
Expand Down Expand Up @@ -383,14 +400,19 @@ async def run(self, *, install_signal_handlers: bool = True) -> None:
# A separate task monitors the shutdown signal.
# When triggered, it stops all services.
# Once services exit, execution completes.
async with asyncio.TaskGroup() as tg:
tg.create_task(self.chain_service.run())
tg.create_task(self.network_service.run())
if self.api_server is not None:
tg.create_task(self.api_server.run())
if self.validator_service is not None:
tg.create_task(self.validator_service.run())
tg.create_task(self._wait_shutdown())
# The finally block ensures the database is closed on shutdown.
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(self.chain_service.run())
tg.create_task(self.network_service.run())
if self.api_server is not None:
tg.create_task(self.api_server.run())
if self.validator_service is not None:
tg.create_task(self.validator_service.run())
tg.create_task(self._wait_shutdown())
finally:
if self.database is not None:
self.database.close()

def _install_signal_handlers(self) -> None:
"""
Expand Down
11 changes: 11 additions & 0 deletions src/lean_spec/subspecs/sync/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ class SyncService:
Same buffering strategy as individual attestations.
"""

def set_publish_agg_fn(
self, fn: Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]]
) -> None:
"""Wire the aggregated attestation publisher after construction.

Breaks circular dependency between SyncService and NetworkService.
NetworkService needs SyncService at construction, but SyncService
needs NetworkService's publish method. This setter resolves the cycle.
"""
self._publish_agg_fn = fn

def __post_init__(self) -> None:
"""Initialize sync components."""
self._init_components()
Expand Down
15 changes: 15 additions & 0 deletions src/lean_spec/subspecs/validator/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,21 @@ def indices(self) -> ValidatorIndices:
"""
return ValidatorIndices(data=list(self._validators.keys()))

def primary_index(self) -> ValidatorIndex | None:
"""
Get the primary validator index for store-level identity.

Returns the first validator index in the registry.
With ATTESTATION_COMMITTEE_COUNT = 1, all validators share subnet 0,
so a single ID suffices for store-level operations.

Returns:
First validator index, or None if registry is empty.
"""
if not self._validators:
return None
return next(iter(self._validators))

def __len__(self) -> int:
"""Number of validators in the registry."""
return len(self._validators)
Expand Down
Loading
Loading