Skip to content

Commit

Permalink
omg im so dumb
Browse files Browse the repository at this point in the history
  • Loading branch information
ocnc2 committed Aug 16, 2024
1 parent 2488a15 commit 7c4a05d
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 39 deletions.
2 changes: 1 addition & 1 deletion mod/async/pkg/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (p *Publisher[T]) start(ctx context.Context) {
func (p *Publisher[T]) Publish(msg types.BaseEvent) error {
typedMsg, err := ensureType[T](msg)
if err != nil {
return errIncompatibleAssignee(msg, *new(T))
return err
}
ctx := msg.Context()
select {
Expand Down
6 changes: 3 additions & 3 deletions mod/async/pkg/types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ type Event[DataT any] interface {
BaseEvent
Data() DataT
Error() error
Is(messageType EventID) bool
Is(id EventID) bool
}

// NewEvent creates a new Event with the given context and beacon event.
func NewEvent[
DataT any,
](
ctx context.Context, messageType EventID, data DataT, errs ...error,
ctx context.Context, id EventID, data DataT, errs ...error,
) Event[DataT] {
return &event[DataT]{
ctx: ctx,
id: messageType,
id: id,
data: data,
err: errors.Join(errs...),
}
Expand Down
3 changes: 0 additions & 3 deletions mod/async/pkg/types/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package types

import (
"context"
"fmt"
)

// Subscription is a channel that receives events.
Expand Down Expand Up @@ -63,10 +62,8 @@ func (s Subscription[T]) Listen(
// Await will block until an event is received from the subscription or the
// context is canceled.
func (s Subscription[T]) Await(ctx context.Context) (T, error) {
fmt.Println("AWAITING EVENT")
select {
case event := <-s:
fmt.Println("GOT EVENT")
return event, nil
case <-ctx.Done():
return *new(T), ctx.Err()
Expand Down
3 changes: 0 additions & 3 deletions mod/beacon/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package blockchain

import (
"context"
"fmt"
"sync"

async "github.com/berachain/beacon-kit/mod/async/pkg/types"
Expand Down Expand Up @@ -227,7 +226,6 @@ func (s *Service[
if genesisErr != nil {
s.logger.Error("Failed to process genesis data", "error", genesisErr)
}
fmt.Println("GEN DATA PROCESSED")

// Emit the event containing the validator updates.
if err := s.dispatcher.PublishEvent(
Expand All @@ -244,7 +242,6 @@ func (s *Service[
)
panic(err)
}
fmt.Println("EMITTED EVENT")
}

func (s *Service[
Expand Down
2 changes: 2 additions & 0 deletions mod/node-core/pkg/components/service_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ServiceRegistryInput[
LoggerT log.AdvancedLogger[any, LoggerT],
] struct {
depinject.In
Middleware *ABCIMiddleware
BlockStoreService *BlockStoreService
ChainService *ChainService
DAService *DAService
Expand All @@ -54,6 +55,7 @@ func ProvideServiceRegistry[
) *service.Registry {
return service.NewRegistry(
service.WithLogger(in.Logger),
service.WithService(in.Middleware),
service.WithService(in.Dispatcher),
service.WithService(in.ValidatorService),
service.WithService(in.BlockStoreService),
Expand Down
46 changes: 26 additions & 20 deletions mod/runtime/pkg/middleware/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package middleware

import (
"context"
"fmt"
"time"

async "github.com/berachain/beacon-kit/mod/async/pkg/types"
Expand All @@ -48,9 +47,11 @@ func (h *ABCIMiddleware[
bz []byte,
) (transition.ValidatorUpdates, error) {
var (
err error
gdpEvent async.Event[transition.ValidatorUpdates]
err error
gdpEvent async.Event[transition.ValidatorUpdates]
awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout)
)
defer cancel()
// in theory this channel should already be empty, but we clear it anyways
h.subGenDataProcessed.Clear()

Expand All @@ -66,11 +67,11 @@ func (h *ABCIMiddleware[
return nil, err
}

gdpEvent, err = h.subGenDataProcessed.Await(ctx)
gdpEvent, err = h.subGenDataProcessed.Await(awaitCtx)
if err != nil {
return nil, err
}
fmt.Println("GOT IT BACK")

return gdpEvent.Data(), gdpEvent.Error()
}

Expand All @@ -86,11 +87,13 @@ func (h *ABCIMiddleware[
slotData SlotDataT,
) ([]byte, []byte, error) {
var (
err error
builtBBEvent async.Event[BeaconBlockT]
builtSCEvent async.Event[BlobSidecarsT]
startTime = time.Now()
err error
builtBBEvent async.Event[BeaconBlockT]
builtSCEvent async.Event[BlobSidecarsT]
startTime = time.Now()
awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout)
)
defer cancel()
defer h.metrics.measurePrepareProposalDuration(startTime)
// in theory these subs should already be empty, but we clear them anyways
h.subBuiltBeaconBlock.Clear()
Expand All @@ -105,7 +108,7 @@ func (h *ABCIMiddleware[
}

// wait for built beacon block
builtBBEvent, err = h.subBuiltBeaconBlock.Await(ctx)
builtBBEvent, err = h.subBuiltBeaconBlock.Await(awaitCtx)
if err != nil {
return nil, nil, err
}
Expand All @@ -114,7 +117,7 @@ func (h *ABCIMiddleware[
}

// wait for built sidecars
builtSCEvent, err = h.subBuiltSidecars.Await(ctx)
builtSCEvent, err = h.subBuiltSidecars.Await(awaitCtx)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -165,12 +168,14 @@ func (h *ABCIMiddleware[
req proto.Message,
) (proto.Message, error) {
var (
err error
startTime = time.Now()
blk BeaconBlockT
sidecars BlobSidecarsT
err error
startTime = time.Now()
blk BeaconBlockT
sidecars BlobSidecarsT
awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout)
)
// in theory these subs should already be empty, but we clear them anyways
defer cancel()
// in theory these subs should already be empty, probably redundant
h.subBBVerified.Clear()
h.subSCVerified.Clear()
abciReq, ok := req.(*cmtabci.ProcessProposalRequest)
Expand All @@ -185,7 +190,6 @@ func (h *ABCIMiddleware[
return h.createProcessProposalResponse(errors.WrapNonFatal(err))
}

// TODO: implement service
// notify that the beacon block has been received.
if err = h.dispatcher.PublishEvent(
async.NewEvent(ctx, events.BeaconBlockReceived, blk),
Expand All @@ -206,11 +210,11 @@ func (h *ABCIMiddleware[
}

// err if the built beacon block or sidecars failed verification.
_, err = h.subBBVerified.Await(ctx)
_, err = h.subBBVerified.Await(awaitCtx)
if err != nil {
return h.createProcessProposalResponse(err)
}
_, err = h.subSCVerified.Await(ctx)
_, err = h.subSCVerified.Await(awaitCtx)
if err != nil {
return h.createProcessProposalResponse(err)
}
Expand Down Expand Up @@ -245,7 +249,9 @@ func (h *ABCIMiddleware[
blk BeaconBlockT
blobs BlobSidecarsT
finalValUpdatesEvent async.Event[transition.ValidatorUpdates]
awaitCtx, cancel = context.WithTimeout(ctx, AwaitTimeout)
)
defer cancel()
// in theory this sub should already be empty, but we clear them anyways
h.subFinalValidatorUpdates.Clear()
abciReq, ok := req.(*cmtabci.FinalizeBlockRequest)
Expand Down Expand Up @@ -281,7 +287,7 @@ func (h *ABCIMiddleware[
}

// wait for the final validator updates.
finalValUpdatesEvent, err = h.subFinalValidatorUpdates.Await(ctx)
finalValUpdatesEvent, err = h.subFinalValidatorUpdates.Await(awaitCtx)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions mod/runtime/pkg/middleware/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

package middleware

import "time"

const (
// BeaconBlockTxIndex represents the index of the beacon block transaction.
// It is the first transaction in the tx list.
BeaconBlockTxIndex uint = iota
// BlobSidecarsTxIndex represents the index of the blob sidecar transaction.
// It follows the beacon block transaction in the tx list.
BlobSidecarsTxIndex
// AwaitTimeout is the timeout for awaiting events.
AwaitTimeout = 2 * time.Second
)
17 changes: 9 additions & 8 deletions mod/runtime/pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package middleware

import (
"context"
"encoding/json"

"github.com/berachain/beacon-kit/mod/async/pkg/types"
Expand Down Expand Up @@ -133,34 +134,34 @@ func NewABCIMiddleware[
func (am *ABCIMiddleware[
AvailabilityStoreT, BeaconBlockT, BeaconBlockBundleT, BlobSidecarsT,
DepositT, ExecutionPayloadT, GenesisT, SlotDataT,
]) Start() error {
// subGenDat
if err := am.dispatcher.Subscribe(
]) Start(_ context.Context) error {
var err error
if err = am.dispatcher.Subscribe(
events.GenesisDataProcessed, am.subGenDataProcessed,
); err != nil {
return err
}
if err := am.dispatcher.Subscribe(
if err = am.dispatcher.Subscribe(
events.BuiltBeaconBlock, am.subBuiltBeaconBlock,
); err != nil {
return err
}
if err := am.dispatcher.Subscribe(
if err = am.dispatcher.Subscribe(
events.BuiltSidecars, am.subBuiltSidecars,
); err != nil {
return err
}
if err := am.dispatcher.Subscribe(
if err = am.dispatcher.Subscribe(
events.BeaconBlockVerified, am.subBBVerified,
); err != nil {
return err
}
if err := am.dispatcher.Subscribe(
if err = am.dispatcher.Subscribe(
events.SidecarsVerified, am.subSCVerified,
); err != nil {
return err
}
if err := am.dispatcher.Subscribe(
if err = am.dispatcher.Subscribe(
events.FinalValidatorUpdatesProcessed, am.subFinalValidatorUpdates,
); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion mod/storage/pkg/pruner/mocks/block_event.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7c4a05d

Please sign in to comment.