From 7c4a05d685aac67e2c7f5f22e8ccd1dc586695ad Mon Sep 17 00:00:00 2001 From: ocnc-two Date: Fri, 16 Aug 2024 03:38:52 -0400 Subject: [PATCH] omg im so dumb --- mod/async/pkg/publisher/publisher.go | 2 +- mod/async/pkg/types/event.go | 6 +-- mod/async/pkg/types/subscription.go | 3 -- mod/beacon/blockchain/service.go | 3 -- .../pkg/components/service_registry.go | 2 + mod/runtime/pkg/middleware/abci.go | 46 +++++++++++-------- mod/runtime/pkg/middleware/constants.go | 4 ++ mod/runtime/pkg/middleware/middleware.go | 17 +++---- .../pkg/pruner/mocks/block_event.mock.go | 2 +- 9 files changed, 46 insertions(+), 39 deletions(-) diff --git a/mod/async/pkg/publisher/publisher.go b/mod/async/pkg/publisher/publisher.go index 1984b801dd..5c45dee49d 100644 --- a/mod/async/pkg/publisher/publisher.go +++ b/mod/async/pkg/publisher/publisher.go @@ -84,7 +84,7 @@ func (p *Publisher[T]) start(ctx context.Context) { func (p *Publisher[T]) Publish(msg types.BaseEvent) error { typedMsg, err := ensureType[T](msg) if err != nil { - return errIncompatibleAssignee(msg, *new(T)) + return err } ctx := msg.Context() select { diff --git a/mod/async/pkg/types/event.go b/mod/async/pkg/types/event.go index 6ec58679f2..d5d2c03c60 100644 --- a/mod/async/pkg/types/event.go +++ b/mod/async/pkg/types/event.go @@ -41,18 +41,18 @@ type Event[DataT any] interface { BaseEvent Data() DataT Error() error - Is(messageType EventID) bool + Is(id EventID) bool } // NewEvent creates a new Event with the given context and beacon event. func NewEvent[ DataT any, ]( - ctx context.Context, messageType EventID, data DataT, errs ...error, + ctx context.Context, id EventID, data DataT, errs ...error, ) Event[DataT] { return &event[DataT]{ ctx: ctx, - id: messageType, + id: id, data: data, err: errors.Join(errs...), } diff --git a/mod/async/pkg/types/subscription.go b/mod/async/pkg/types/subscription.go index 7f6b3fe8a3..0720babc15 100644 --- a/mod/async/pkg/types/subscription.go +++ b/mod/async/pkg/types/subscription.go @@ -22,7 +22,6 @@ package types import ( "context" - "fmt" ) // Subscription is a channel that receives events. @@ -63,10 +62,8 @@ func (s Subscription[T]) Listen( // Await will block until an event is received from the subscription or the // context is canceled. func (s Subscription[T]) Await(ctx context.Context) (T, error) { - fmt.Println("AWAITING EVENT") select { case event := <-s: - fmt.Println("GOT EVENT") return event, nil case <-ctx.Done(): return *new(T), ctx.Err() diff --git a/mod/beacon/blockchain/service.go b/mod/beacon/blockchain/service.go index 1fde0d2be9..07fc0a14cd 100644 --- a/mod/beacon/blockchain/service.go +++ b/mod/beacon/blockchain/service.go @@ -22,7 +22,6 @@ package blockchain import ( "context" - "fmt" "sync" async "github.com/berachain/beacon-kit/mod/async/pkg/types" @@ -227,7 +226,6 @@ func (s *Service[ if genesisErr != nil { s.logger.Error("Failed to process genesis data", "error", genesisErr) } - fmt.Println("GEN DATA PROCESSED") // Emit the event containing the validator updates. if err := s.dispatcher.PublishEvent( @@ -244,7 +242,6 @@ func (s *Service[ ) panic(err) } - fmt.Println("EMITTED EVENT") } func (s *Service[ diff --git a/mod/node-core/pkg/components/service_registry.go b/mod/node-core/pkg/components/service_registry.go index e922543d8a..9294add654 100644 --- a/mod/node-core/pkg/components/service_registry.go +++ b/mod/node-core/pkg/components/service_registry.go @@ -32,6 +32,7 @@ type ServiceRegistryInput[ LoggerT log.AdvancedLogger[any, LoggerT], ] struct { depinject.In + Middleware *ABCIMiddleware BlockStoreService *BlockStoreService ChainService *ChainService DAService *DAService @@ -54,6 +55,7 @@ func ProvideServiceRegistry[ ) *service.Registry { return service.NewRegistry( service.WithLogger(in.Logger), + service.WithService(in.Middleware), service.WithService(in.Dispatcher), service.WithService(in.ValidatorService), service.WithService(in.BlockStoreService), diff --git a/mod/runtime/pkg/middleware/abci.go b/mod/runtime/pkg/middleware/abci.go index 1de13f1a19..2c16619ae2 100644 --- a/mod/runtime/pkg/middleware/abci.go +++ b/mod/runtime/pkg/middleware/abci.go @@ -22,7 +22,6 @@ package middleware import ( "context" - "fmt" "time" async "github.com/berachain/beacon-kit/mod/async/pkg/types" @@ -48,9 +47,11 @@ func (h *ABCIMiddleware[ bz []byte, ) (transition.ValidatorUpdates, error) { var ( - err error - gdpEvent async.Event[transition.ValidatorUpdates] + err error + gdpEvent async.Event[transition.ValidatorUpdates] + awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout) ) + defer cancel() // in theory this channel should already be empty, but we clear it anyways h.subGenDataProcessed.Clear() @@ -66,11 +67,11 @@ func (h *ABCIMiddleware[ return nil, err } - gdpEvent, err = h.subGenDataProcessed.Await(ctx) + gdpEvent, err = h.subGenDataProcessed.Await(awaitCtx) if err != nil { return nil, err } - fmt.Println("GOT IT BACK") + return gdpEvent.Data(), gdpEvent.Error() } @@ -86,11 +87,13 @@ func (h *ABCIMiddleware[ slotData SlotDataT, ) ([]byte, []byte, error) { var ( - err error - builtBBEvent async.Event[BeaconBlockT] - builtSCEvent async.Event[BlobSidecarsT] - startTime = time.Now() + err error + builtBBEvent async.Event[BeaconBlockT] + builtSCEvent async.Event[BlobSidecarsT] + startTime = time.Now() + awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout) ) + defer cancel() defer h.metrics.measurePrepareProposalDuration(startTime) // in theory these subs should already be empty, but we clear them anyways h.subBuiltBeaconBlock.Clear() @@ -105,7 +108,7 @@ func (h *ABCIMiddleware[ } // wait for built beacon block - builtBBEvent, err = h.subBuiltBeaconBlock.Await(ctx) + builtBBEvent, err = h.subBuiltBeaconBlock.Await(awaitCtx) if err != nil { return nil, nil, err } @@ -114,7 +117,7 @@ func (h *ABCIMiddleware[ } // wait for built sidecars - builtSCEvent, err = h.subBuiltSidecars.Await(ctx) + builtSCEvent, err = h.subBuiltSidecars.Await(awaitCtx) if err != nil { return nil, nil, err } @@ -165,12 +168,14 @@ func (h *ABCIMiddleware[ req proto.Message, ) (proto.Message, error) { var ( - err error - startTime = time.Now() - blk BeaconBlockT - sidecars BlobSidecarsT + err error + startTime = time.Now() + blk BeaconBlockT + sidecars BlobSidecarsT + awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout) ) - // in theory these subs should already be empty, but we clear them anyways + defer cancel() + // in theory these subs should already be empty, probably redundant h.subBBVerified.Clear() h.subSCVerified.Clear() abciReq, ok := req.(*cmtabci.ProcessProposalRequest) @@ -185,7 +190,6 @@ func (h *ABCIMiddleware[ return h.createProcessProposalResponse(errors.WrapNonFatal(err)) } - // TODO: implement service // notify that the beacon block has been received. if err = h.dispatcher.PublishEvent( async.NewEvent(ctx, events.BeaconBlockReceived, blk), @@ -206,11 +210,11 @@ func (h *ABCIMiddleware[ } // err if the built beacon block or sidecars failed verification. - _, err = h.subBBVerified.Await(ctx) + _, err = h.subBBVerified.Await(awaitCtx) if err != nil { return h.createProcessProposalResponse(err) } - _, err = h.subSCVerified.Await(ctx) + _, err = h.subSCVerified.Await(awaitCtx) if err != nil { return h.createProcessProposalResponse(err) } @@ -245,7 +249,9 @@ func (h *ABCIMiddleware[ blk BeaconBlockT blobs BlobSidecarsT finalValUpdatesEvent async.Event[transition.ValidatorUpdates] + awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout) ) + defer cancel() // in theory this sub should already be empty, but we clear them anyways h.subFinalValidatorUpdates.Clear() abciReq, ok := req.(*cmtabci.FinalizeBlockRequest) @@ -281,7 +287,7 @@ func (h *ABCIMiddleware[ } // wait for the final validator updates. - finalValUpdatesEvent, err = h.subFinalValidatorUpdates.Await(ctx) + finalValUpdatesEvent, err = h.subFinalValidatorUpdates.Await(awaitCtx) if err != nil { return nil, err } diff --git a/mod/runtime/pkg/middleware/constants.go b/mod/runtime/pkg/middleware/constants.go index 4862a9a5b9..283b75aa59 100644 --- a/mod/runtime/pkg/middleware/constants.go +++ b/mod/runtime/pkg/middleware/constants.go @@ -20,6 +20,8 @@ package middleware +import "time" + const ( // BeaconBlockTxIndex represents the index of the beacon block transaction. // It is the first transaction in the tx list. @@ -27,4 +29,6 @@ const ( // BlobSidecarsTxIndex represents the index of the blob sidecar transaction. // It follows the beacon block transaction in the tx list. BlobSidecarsTxIndex + // AwaitTimeout is the timeout for awaiting events. + AwaitTimeout = 2 * time.Second ) diff --git a/mod/runtime/pkg/middleware/middleware.go b/mod/runtime/pkg/middleware/middleware.go index 098d179089..9d2d86f0b1 100644 --- a/mod/runtime/pkg/middleware/middleware.go +++ b/mod/runtime/pkg/middleware/middleware.go @@ -21,6 +21,7 @@ package middleware import ( + "context" "encoding/json" "github.com/berachain/beacon-kit/mod/async/pkg/types" @@ -133,34 +134,34 @@ func NewABCIMiddleware[ func (am *ABCIMiddleware[ AvailabilityStoreT, BeaconBlockT, BeaconBlockBundleT, BlobSidecarsT, DepositT, ExecutionPayloadT, GenesisT, SlotDataT, -]) Start() error { - // subGenDat - if err := am.dispatcher.Subscribe( +]) Start(_ context.Context) error { + var err error + if err = am.dispatcher.Subscribe( events.GenesisDataProcessed, am.subGenDataProcessed, ); err != nil { return err } - if err := am.dispatcher.Subscribe( + if err = am.dispatcher.Subscribe( events.BuiltBeaconBlock, am.subBuiltBeaconBlock, ); err != nil { return err } - if err := am.dispatcher.Subscribe( + if err = am.dispatcher.Subscribe( events.BuiltSidecars, am.subBuiltSidecars, ); err != nil { return err } - if err := am.dispatcher.Subscribe( + if err = am.dispatcher.Subscribe( events.BeaconBlockVerified, am.subBBVerified, ); err != nil { return err } - if err := am.dispatcher.Subscribe( + if err = am.dispatcher.Subscribe( events.SidecarsVerified, am.subSCVerified, ); err != nil { return err } - if err := am.dispatcher.Subscribe( + if err = am.dispatcher.Subscribe( events.FinalValidatorUpdatesProcessed, am.subFinalValidatorUpdates, ); err != nil { return err diff --git a/mod/storage/pkg/pruner/mocks/block_event.mock.go b/mod/storage/pkg/pruner/mocks/block_event.mock.go index d39df6b022..5a23af48d4 100644 --- a/mod/storage/pkg/pruner/mocks/block_event.mock.go +++ b/mod/storage/pkg/pruner/mocks/block_event.mock.go @@ -185,7 +185,7 @@ type BlockEvent_Is_Call[BeaconBlockT pruner.BeaconBlock] struct { } // Is is a helper method to define mock.On call -// - _a0 types.MessageID +// - _a0 types.EventID func (_e *BlockEvent_Expecter[BeaconBlockT]) Is(_a0 interface{}) *BlockEvent_Is_Call[BeaconBlockT] { return &BlockEvent_Is_Call[BeaconBlockT]{Call: _e.mock.On("Is", _a0)} }