diff --git a/mod/beacon/block_store/service.go b/mod/beacon/block_store/service.go index 14a73eff40..9b2ffae020 100644 --- a/mod/beacon/block_store/service.go +++ b/mod/beacon/block_store/service.go @@ -23,9 +23,9 @@ package blockstore import ( "context" - async "github.com/berachain/beacon-kit/mod/async/pkg/types" + asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types" "github.com/berachain/beacon-kit/mod/log" - async1 "github.com/berachain/beacon-kit/mod/primitives/pkg/async" + "github.com/berachain/beacon-kit/mod/primitives/pkg/async" ) // Service is a Service that listens for blocks and stores them in a KVStore. @@ -38,11 +38,11 @@ type Service[ // logger is used for logging information and errors. logger log.Logger[any] // dispatcher is the dispatcher for the service. - dispatcher async.EventDispatcher + dispatcher asynctypes.EventDispatcher // store is the block store for the service. store BlockStoreT // subFinalizedBlkEvents is a channel for receiving finalized block events. - subFinalizedBlkEvents chan async1.Event[BeaconBlockT] + subFinalizedBlkEvents chan async.Event[BeaconBlockT] } // NewService creates a new block service. @@ -52,7 +52,7 @@ func NewService[ ]( config Config, logger log.Logger[any], - dispatcher async.EventDispatcher, + dispatcher asynctypes.EventDispatcher, store BlockStoreT, ) *Service[BeaconBlockT, BlockStoreT] { return &Service[BeaconBlockT, BlockStoreT]{ @@ -60,7 +60,7 @@ func NewService[ logger: logger, dispatcher: dispatcher, store: store, - subFinalizedBlkEvents: make(chan async1.Event[BeaconBlockT]), + subFinalizedBlkEvents: make(chan async.Event[BeaconBlockT]), } } @@ -78,7 +78,7 @@ func (s *Service[BeaconBlockT, _]) Start(ctx context.Context) error { // subscribe a channel to the finalized block events. if err := s.dispatcher.Subscribe( - async1.BeaconBlockFinalizedEvent, s.subFinalizedBlkEvents, + async.BeaconBlockFinalizedEvent, s.subFinalizedBlkEvents, ); err != nil { s.logger.Error("failed to subscribe to block events", "error", err) return err @@ -102,7 +102,7 @@ func (s *Service[BeaconBlockT, BlockStoreT]) eventLoop(ctx context.Context) { // onFinalizeBlock is triggered when a finalized block event is received. // It stores the block in the KVStore. func (s *Service[BeaconBlockT, _]) onFinalizeBlock( - event async1.Event[BeaconBlockT], + event async.Event[BeaconBlockT], ) { slot := event.Data().GetSlot() if err := s.store.Set(event.Data()); err != nil { diff --git a/mod/beacon/blockchain/service.go b/mod/beacon/blockchain/service.go index 6022e6e6de..99a0ec7960 100644 --- a/mod/beacon/blockchain/service.go +++ b/mod/beacon/blockchain/service.go @@ -24,9 +24,9 @@ import ( "context" "sync" - async "github.com/berachain/beacon-kit/mod/async/pkg/types" + asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types" "github.com/berachain/beacon-kit/mod/log" - async1 "github.com/berachain/beacon-kit/mod/primitives/pkg/async" + "github.com/berachain/beacon-kit/mod/primitives/pkg/async" "github.com/berachain/beacon-kit/mod/primitives/pkg/common" "github.com/berachain/beacon-kit/mod/primitives/pkg/transition" ) @@ -57,7 +57,7 @@ type Service[ // chainSpec holds the chain specifications. chainSpec common.ChainSpec // dispatcher is the dispatcher for the service. - dispatcher async.Dispatcher + dispatcher asynctypes.Dispatcher // executionEngine is the execution engine responsible for processing // // execution payloads. @@ -82,12 +82,12 @@ type Service[ // subFinalBlkReceived is a channel for receiving finalize beacon block // requests. - subFinalBlkReceived chan async1.Event[BeaconBlockT] + subFinalBlkReceived chan async.Event[BeaconBlockT] // subBlockReceived is a channel for receiving verify beacon block requests. - subBlockReceived chan async1.Event[BeaconBlockT] + subBlockReceived chan async.Event[BeaconBlockT] // subGenDataReceived is a subscription for receiving genesis data // received events. - subGenDataReceived chan async1.Event[GenesisT] + subGenDataReceived chan async.Event[GenesisT] } // NewService creates a new validator service. @@ -112,7 +112,7 @@ func NewService[ ], logger log.Logger[any], chainSpec common.ChainSpec, - dispatcher async.Dispatcher, + dispatcher asynctypes.Dispatcher, executionEngine ExecutionEngine[PayloadAttributesT], localBuilder LocalBuilder[BeaconStateT], stateProcessor StateProcessor[ @@ -144,9 +144,9 @@ func NewService[ metrics: newChainMetrics(telemetrySink), optimisticPayloadBuilds: optimisticPayloadBuilds, forceStartupSyncOnce: new(sync.Once), - subFinalBlkReceived: make(chan async1.Event[BeaconBlockT]), - subBlockReceived: make(chan async1.Event[BeaconBlockT]), - subGenDataReceived: make(chan async1.Event[GenesisT]), + subFinalBlkReceived: make(chan async.Event[BeaconBlockT]), + subBlockReceived: make(chan async.Event[BeaconBlockT]), + subGenDataReceived: make(chan async.Event[GenesisT]), } } @@ -164,19 +164,19 @@ func (s *Service[ _, BeaconBlockT, _, _, _, _, _, _, GenesisT, _, ]) Start(ctx context.Context) error { if err := s.dispatcher.Subscribe( - async1.GenesisDataReceived, s.subGenDataReceived, + async.GenesisDataReceived, s.subGenDataReceived, ); err != nil { return err } if err := s.dispatcher.Subscribe( - async1.BeaconBlockReceived, s.subBlockReceived, + async.BeaconBlockReceived, s.subBlockReceived, ); err != nil { return err } if err := s.dispatcher.Subscribe( - async1.FinalBeaconBlockReceived, s.subFinalBlkReceived, + async.FinalBeaconBlockReceived, s.subFinalBlkReceived, ); err != nil { return err } @@ -210,7 +210,7 @@ func (s *Service[ func (s *Service[ _, _, _, _, _, _, _, _, GenesisT, _, -]) handleGenDataReceived(msg async1.Event[GenesisT]) { +]) handleGenDataReceived(msg async.Event[GenesisT]) { var ( valUpdates transition.ValidatorUpdates genesisErr error @@ -227,9 +227,9 @@ func (s *Service[ // Emit the event containing the validator updates. if err := s.dispatcher.Publish( - async1.NewEvent( + async.NewEvent( msg.Context(), - async1.GenesisDataProcessed, + async.GenesisDataProcessed, valUpdates, genesisErr, ), @@ -245,7 +245,7 @@ func (s *Service[ func (s *Service[ _, BeaconBlockT, _, _, _, _, _, _, _, _, ]) handleBeaconBlockReceived( - msg async1.Event[BeaconBlockT], + msg async.Event[BeaconBlockT], ) { // If the block is nil, exit early. if msg.Error() != nil { @@ -256,9 +256,9 @@ func (s *Service[ // emit a BeaconBlockVerified event with the error result from \ // VerifyIncomingBlock if err := s.dispatcher.Publish( - async1.NewEvent( + async.NewEvent( msg.Context(), - async1.BeaconBlockVerified, + async.BeaconBlockVerified, msg.Data(), s.VerifyIncomingBlock(msg.Context(), msg.Data()), ), @@ -273,7 +273,7 @@ func (s *Service[ func (s *Service[ _, BeaconBlockT, _, _, _, _, _, _, _, _, ]) handleBeaconBlockFinalization( - msg async1.Event[BeaconBlockT], + msg async.Event[BeaconBlockT], ) { var ( valUpdates transition.ValidatorUpdates @@ -295,9 +295,9 @@ func (s *Service[ // Emit the event containing the validator updates. if err := s.dispatcher.Publish( - async1.NewEvent( + async.NewEvent( msg.Context(), - async1.FinalValidatorUpdatesProcessed, + async.FinalValidatorUpdatesProcessed, valUpdates, finalizeErr, ), diff --git a/mod/beacon/validator/service.go b/mod/beacon/validator/service.go index 309a7ea0f1..e2314aa116 100644 --- a/mod/beacon/validator/service.go +++ b/mod/beacon/validator/service.go @@ -23,9 +23,9 @@ package validator import ( "context" - async "github.com/berachain/beacon-kit/mod/async/pkg/types" + asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types" "github.com/berachain/beacon-kit/mod/log" - async1 "github.com/berachain/beacon-kit/mod/primitives/pkg/async" + "github.com/berachain/beacon-kit/mod/primitives/pkg/async" "github.com/berachain/beacon-kit/mod/primitives/pkg/common" "github.com/berachain/beacon-kit/mod/primitives/pkg/crypto" "github.com/berachain/beacon-kit/mod/primitives/pkg/transition" @@ -62,7 +62,7 @@ type Service[ // sb is the beacon state backend. sb StorageBackend[BeaconStateT, DepositStoreT] // dispatcher is the dispatcher. - dispatcher async.EventDispatcher + dispatcher asynctypes.EventDispatcher // stateProcessor is responsible for processing the state. stateProcessor StateProcessor[ BeaconBlockT, @@ -81,7 +81,7 @@ type Service[ // metrics is a metrics collector. metrics *validatorMetrics // subNewSlot is a channel for new slot events. - subNewSlot chan async1.Event[SlotDataT] + subNewSlot chan async.Event[SlotDataT] } // NewService creates a new validator service. @@ -117,7 +117,7 @@ func NewService[ localPayloadBuilder PayloadBuilder[BeaconStateT, ExecutionPayloadT], remotePayloadBuilders []PayloadBuilder[BeaconStateT, ExecutionPayloadT], ts TelemetrySink, - dispatcher async.EventDispatcher, + dispatcher asynctypes.EventDispatcher, ) *Service[ AttestationDataT, BeaconBlockT, BeaconBlockBodyT, BeaconStateT, BlobSidecarsT, DepositT, DepositStoreT, Eth1DataT, ExecutionPayloadT, @@ -140,7 +140,7 @@ func NewService[ remotePayloadBuilders: remotePayloadBuilders, metrics: newValidatorMetrics(ts), dispatcher: dispatcher, - subNewSlot: make(chan async1.Event[SlotDataT]), + subNewSlot: make(chan async.Event[SlotDataT]), } } @@ -159,7 +159,7 @@ func (s *Service[ ctx context.Context, ) error { // subscribe to new slot events - err := s.dispatcher.Subscribe(async1.NewSlot, s.subNewSlot) + err := s.dispatcher.Subscribe(async.NewSlot, s.subNewSlot) if err != nil { return err } @@ -185,7 +185,7 @@ func (s *Service[_, _, _, _, _, _, _, _, _, _, _, _, SlotDataT]) eventLoop( // slot data and emits an event containing the built block and sidecars. func (s *Service[ _, BeaconBlockT, _, _, BlobSidecarsT, _, _, _, _, _, _, _, SlotDataT, -]) handleNewSlot(req async1.Event[SlotDataT]) { +]) handleNewSlot(req async.Event[SlotDataT]) { var ( blk BeaconBlockT sidecars BlobSidecarsT @@ -201,14 +201,14 @@ func (s *Service[ // emit a built block event with the built block and the error if bbErr := s.dispatcher.Publish( - async1.NewEvent(req.Context(), async1.BuiltBeaconBlock, blk, err), + async.NewEvent(req.Context(), async.BuiltBeaconBlock, blk, err), ); bbErr != nil { s.logger.Error("failed to dispatch built block", "err", err) } // emit a built sidecars event with the built sidecars and the error if scErr := s.dispatcher.Publish( - async1.NewEvent(req.Context(), async1.BuiltSidecars, sidecars, err), + async.NewEvent(req.Context(), async.BuiltSidecars, sidecars, err), ); scErr != nil { s.logger.Error("failed to dispatch built sidecars", "err", err) } diff --git a/mod/da/pkg/da/service.go b/mod/da/pkg/da/service.go index 61029db397..df849fdbdc 100644 --- a/mod/da/pkg/da/service.go +++ b/mod/da/pkg/da/service.go @@ -23,9 +23,9 @@ package da import ( "context" - async "github.com/berachain/beacon-kit/mod/async/pkg/types" + asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types" "github.com/berachain/beacon-kit/mod/log" - async1 "github.com/berachain/beacon-kit/mod/primitives/pkg/async" + "github.com/berachain/beacon-kit/mod/primitives/pkg/async" ) // The Data Availability service is responsible for verifying and processing @@ -41,10 +41,10 @@ type Service[ AvailabilityStoreT, BlobSidecarsT, ] - dispatcher async.EventDispatcher + dispatcher asynctypes.EventDispatcher logger log.Logger[any] - subSidecarsReceived chan async1.Event[BlobSidecarsT] - subFinalBlobSidecars chan async1.Event[BlobSidecarsT] + subSidecarsReceived chan async.Event[BlobSidecarsT] + subFinalBlobSidecars chan async.Event[BlobSidecarsT] } // NewService returns a new DA service. @@ -56,7 +56,7 @@ func NewService[ bp BlobProcessor[ AvailabilityStoreT, BlobSidecarsT, ], - dispatcher async.EventDispatcher, + dispatcher asynctypes.EventDispatcher, logger log.Logger[any], ) *Service[ AvailabilityStoreT, BlobSidecarsT, @@ -68,8 +68,8 @@ func NewService[ bp: bp, dispatcher: dispatcher, logger: logger, - subSidecarsReceived: make(chan async1.Event[BlobSidecarsT]), - subFinalBlobSidecars: make(chan async1.Event[BlobSidecarsT]), + subSidecarsReceived: make(chan async.Event[BlobSidecarsT]), + subFinalBlobSidecars: make(chan async.Event[BlobSidecarsT]), } } @@ -85,14 +85,14 @@ func (s *Service[_, BlobSidecarsT]) Start(ctx context.Context) error { // subscribe to SidecarsReceived events if err = s.dispatcher.Subscribe( - async1.SidecarsReceived, s.subSidecarsReceived, + async.SidecarsReceived, s.subSidecarsReceived, ); err != nil { return err } // subscribe to FinalSidecarsReceived events if err = s.dispatcher.Subscribe( - async1.FinalSidecarsReceived, s.subFinalBlobSidecars, + async.FinalSidecarsReceived, s.subFinalBlobSidecars, ); err != nil { return err } @@ -123,7 +123,7 @@ func (s *Service[_, BlobSidecarsT]) eventLoop(ctx context.Context) { // event. // It processes the sidecars and publishes a BlobSidecarsProcessed event. func (s *Service[_, BlobSidecarsT]) handleFinalSidecarsReceived( - msg async1.Event[BlobSidecarsT], + msg async.Event[BlobSidecarsT], ) { if err := s.processSidecars(msg.Context(), msg.Data()); err != nil { s.logger.Error( @@ -137,7 +137,7 @@ func (s *Service[_, BlobSidecarsT]) handleFinalSidecarsReceived( // handleSidecarsReceived handles the SidecarsVerifyRequest event. // It verifies the sidecars and publishes a SidecarsVerified event. func (s *Service[_, BlobSidecarsT]) handleSidecarsReceived( - msg async1.Event[BlobSidecarsT], + msg async.Event[BlobSidecarsT], ) { var sidecarsErr error // verify the sidecars. @@ -151,8 +151,8 @@ func (s *Service[_, BlobSidecarsT]) handleSidecarsReceived( // emit the sidecars verification event with error from verifySidecars if err := s.dispatcher.Publish( - async1.NewEvent( - msg.Context(), async1.SidecarsVerified, msg.Data(), sidecarsErr, + async.NewEvent( + msg.Context(), async.SidecarsVerified, msg.Data(), sidecarsErr, ), ); err != nil { s.logger.Error("failed to publish event", "err", err) diff --git a/mod/execution/pkg/deposit/service.go b/mod/execution/pkg/deposit/service.go index 934b5716a6..cef0c0c101 100644 --- a/mod/execution/pkg/deposit/service.go +++ b/mod/execution/pkg/deposit/service.go @@ -23,9 +23,9 @@ package deposit import ( "context" - async "github.com/berachain/beacon-kit/mod/async/pkg/types" + asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types" "github.com/berachain/beacon-kit/mod/log" - async1 "github.com/berachain/beacon-kit/mod/primitives/pkg/async" + "github.com/berachain/beacon-kit/mod/primitives/pkg/async" "github.com/berachain/beacon-kit/mod/primitives/pkg/math" ) @@ -46,9 +46,9 @@ type Service[ // ds is the deposit store that stores deposits. ds Store[DepositT] // dispatcher is the dispatcher for the service. - dispatcher async.EventDispatcher + dispatcher asynctypes.EventDispatcher // subFinalizedBlockEvents is the channel that provides finalized block events. - subFinalizedBlockEvents chan async1.Event[BeaconBlockT] + subFinalizedBlockEvents chan async.Event[BeaconBlockT] // metrics is the metrics for the deposit service. metrics *metrics // failedBlocks is a map of blocks that failed to be processed to be @@ -69,7 +69,7 @@ func NewService[ telemetrySink TelemetrySink, ds Store[DepositT], dc Contract[DepositT], - dispatcher async.EventDispatcher, + dispatcher asynctypes.EventDispatcher, ) *Service[ BeaconBlockT, BeaconBlockBodyT, DepositT, ExecutionPayloadT, WithdrawalCredentialsT, @@ -83,7 +83,7 @@ func NewService[ ds: ds, eth1FollowDistance: eth1FollowDistance, failedBlocks: make(map[math.Slot]struct{}), - subFinalizedBlockEvents: make(chan async1.Event[BeaconBlockT]), + subFinalizedBlockEvents: make(chan async.Event[BeaconBlockT]), logger: logger, metrics: newMetrics(telemetrySink), } @@ -94,10 +94,10 @@ func (s *Service[ _, _, _, _, _, ]) Start(ctx context.Context) error { if err := s.dispatcher.Subscribe( - async1.BeaconBlockFinalizedEvent, s.subFinalizedBlockEvents, + async.BeaconBlockFinalizedEvent, s.subFinalizedBlockEvents, ); err != nil { s.logger.Error("failed to subscribe to event", "event", - async1.BeaconBlockFinalizedEvent, "err", err) + async.BeaconBlockFinalizedEvent, "err", err) return err }