From 820f78dccbb019009a406ab65189c931d570ac74 Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Wed, 15 May 2024 17:48:51 -0700 Subject: [PATCH] feat(op-plasma-eigenda): Metrics server --- cmd/daserver/entrypoint.go | 31 ++- cmd/daserver/flags.go | 4 + cmd/daserver/main.go | 8 + damgr.go | 425 ------------------------------------- damgr_test.go | 366 -------------------------------- damock.go | 99 --------- daserver.go | 11 +- dastate.go | 228 -------------------- metrics.go | 78 ------- metrics/metrics.go | 120 +++++++++++ 10 files changed, 170 insertions(+), 1200 deletions(-) delete mode 100644 damgr.go delete mode 100644 damgr_test.go delete mode 100644 damock.go delete mode 100644 dastate.go delete mode 100644 metrics.go create mode 100644 metrics/metrics.go diff --git a/cmd/daserver/entrypoint.go b/cmd/daserver/entrypoint.go index a454ed8b..d06fd5b7 100644 --- a/cmd/daserver/entrypoint.go +++ b/cmd/daserver/entrypoint.go @@ -1,8 +1,11 @@ package main import ( + "context" "fmt" + "net/http" + "github.com/Layr-Labs/op-plasma-eigenda/metrics" "github.com/urfave/cli/v2" plasma "github.com/Layr-Labs/op-plasma-eigenda" @@ -13,15 +16,23 @@ import ( "github.com/ethereum-optimism/optimism/op-service/opio" ) +type App struct { + DAServer *http.Server + MetricsSvr *http.Server +} + func StartDAServer(cliCtx *cli.Context) error { + println("CHECKING") if err := CheckRequired(cliCtx); err != nil { return err } - + println("Reading CLI CFG") cfg := ReadCLIConfig(cliCtx) if err := cfg.Check(); err != nil { return err } + println("HERERERER") + m := metrics.NewMetrics("default") log := oplog.NewLogger(oplog.AppOut(cliCtx), oplog.ReadCLIConfig(cliCtx)).New("role", "eigenda_plasma_server") oplog.SetGlobalLogHandler(log.Handler()) @@ -61,8 +72,7 @@ func StartDAServer(cliCtx *cli.Context) error { } store = eigenda } - - server := plasma.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, log) + server := 
plasma.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, log, m) if err := server.Start(); err != nil { return fmt.Errorf("failed to start the DA server") @@ -76,6 +86,21 @@ func StartDAServer(cliCtx *cli.Context) error { } }() + if cfg.MetricsCfg.Enabled { + log.Debug("starting metrics server", "addr", cfg.MetricsCfg.ListenAddr, "port", cfg.MetricsCfg.ListenPort) + metricsSrv, err := m.StartServer(cfg.MetricsCfg.ListenAddr, cfg.MetricsCfg.ListenPort) + if err != nil { + return fmt.Errorf("failed to start metrics server: %w", err) + } + defer func() { + if err := metricsSrv.Stop(context.Background()); err != nil { + log.Error("failed to stop metrics server", "err", err) + } + }() + log.Info("started metrics server", "addr", metricsSrv.Addr()) + m.RecordUp() + } + opio.BlockOnInterrupts() return nil diff --git a/cmd/daserver/flags.go b/cmd/daserver/flags.go index ac8cdfa5..1807e4fe 100644 --- a/cmd/daserver/flags.go +++ b/cmd/daserver/flags.go @@ -8,6 +8,7 @@ import ( "github.com/Layr-Labs/op-plasma-eigenda/eigenda" opservice "github.com/ethereum-optimism/optimism/op-service" oplog "github.com/ethereum-optimism/optimism/op-service/log" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" ) const ( @@ -61,6 +62,7 @@ var optionalFlags = []cli.Flag{ func init() { optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...) optionalFlags = append(optionalFlags, eigenda.CLIFlags(EnvVarPrefix)...) + optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...) Flags = append(requiredFlags, optionalFlags...) 
} @@ -71,6 +73,7 @@ type CLIConfig struct { FileStoreDirPath string S3Bucket string EigenDAConfig eigenda.Config + MetricsCfg opmetrics.CLIConfig } func ReadCLIConfig(ctx *cli.Context) CLIConfig { @@ -78,6 +81,7 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig { FileStoreDirPath: ctx.String(FileStorePathFlagName), S3Bucket: ctx.String(S3BucketFlagName), EigenDAConfig: eigenda.ReadConfig(ctx), + MetricsCfg: opmetrics.ReadCLIConfig(ctx), } } diff --git a/cmd/daserver/main.go b/cmd/daserver/main.go index 33f6b4e6..137f599a 100644 --- a/cmd/daserver/main.go +++ b/cmd/daserver/main.go @@ -7,9 +7,11 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" + "github.com/ethereum-optimism/optimism/op-node/metrics" opservice "github.com/ethereum-optimism/optimism/op-service" "github.com/ethereum-optimism/optimism/op-service/cliapp" oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum-optimism/optimism/op-service/metrics/doc" "github.com/ethereum-optimism/optimism/op-service/opio" ) @@ -25,6 +27,12 @@ func main() { app.Usage = "EigenDA Plasma DA Storage Service" app.Description = "Service for storing plasma DA inputs" app.Action = StartDAServer + app.Commands = []*cli.Command{ + { + Name: "doc", + Subcommands: doc.NewSubcommands(metrics.NewMetrics("default")), + }, + } ctx := opio.WithInterruptBlocker(context.Background()) err := app.RunContext(ctx, os.Args) diff --git a/damgr.go b/damgr.go deleted file mode 100644 index 4f7f83f8..00000000 --- a/damgr.go +++ /dev/null @@ -1,425 +0,0 @@ -package plasma - -import ( - "context" - "errors" - "fmt" - "io" - - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - - "github.com/ethereum-optimism/optimism/op-bindings/bindings" - "github.com/ethereum-optimism/optimism/op-service/eth" -) - -// ErrPendingChallenge is 
returned when data is not available but can still be challenged/resolved -// so derivation should halt temporarily. -var ErrPendingChallenge = errors.New("not found, pending challenge") - -// ErrExpiredChallenge is returned when a challenge was not resolved and derivation should skip this input. -var ErrExpiredChallenge = errors.New("challenge expired") - -// ErrMissingPastWindow is returned when the input data is MIA and cannot be challenged. -// This is a protocol fatal error. -var ErrMissingPastWindow = errors.New("data missing past window") - -// ErrInvalidChallenge is returned when a challenge event does is decoded but does not -// relate to the actual chain commitments. -var ErrInvalidChallenge = errors.New("invalid challenge") - -// L1Fetcher is the required interface for syncing the DA challenge contract state. -type L1Fetcher interface { - InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) - FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) - L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) -} - -// DAStorage interface for calling the DA storage server. -type DAStorage interface { - GetInput(ctx context.Context, key EigenDACommitment) ([]byte, error) - SetInput(ctx context.Context, img []byte) (EigenDACommitment, error) -} - -// HeadSignalFn is the callback function to accept head-signals without a context. -type HeadSignalFn func(eth.L1BlockRef) - -// Config is the relevant subset of rollup config for plasma DA. -type Config struct { - // Required for filtering contract events - DAChallengeContractAddress common.Address - // The number of l1 blocks after the input is committed during which one can challenge. - ChallengeWindow uint64 - // The number of l1 blocks after a commitment is challenged during which one can resolve. 
- ResolveWindow uint64 -} - -type DA struct { - log log.Logger - cfg Config - metrics Metricer - - storage DAStorage - - // the DA state keeps track of all the commitments and their challenge status. - state *State - - // the latest l1 block we synced challenge contract events from - origin eth.BlockID - // the latest recorded finalized head as per the challenge contract - finalizedHead eth.L1BlockRef - // the latest recorded finalized head as per the l1 finalization signal - l1FinalizedHead eth.L1BlockRef - // flag the reset function we are resetting because of an expired challenge - resetting bool - - finalizedHeadSignalFunc HeadSignalFn -} - -// NewPlasmaDA creates a new PlasmaDA instance with the given log and CLIConfig. -func NewPlasmaDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA { - return NewPlasmaDAWithStorage(log, cfg, cli.NewDAClient(), metrics) -} - -// NewPlasmaDAWithStorage creates a new PlasmaDA instance with the given log and DAStorage interface. -func NewPlasmaDAWithStorage(log log.Logger, cfg Config, storage DAStorage, metrics Metricer) *DA { - return &DA{ - log: log, - cfg: cfg, - storage: storage, - metrics: metrics, - state: NewState(log, metrics), - } -} - -// NewPlasmaWithState creates a plasma storage from initial state used for testing in isolation. -// We pass the L1Fetcher to each method so it is kept in sync with the conf depth of the pipeline. -func NewPlasmaDAWithState(log log.Logger, cfg Config, storage DAStorage, metrics Metricer, state *State) *DA { - return &DA{ - log: log, - cfg: cfg, - storage: storage, - metrics: metrics, - state: state, - } -} - -// OnFinalizedHeadSignal sets the callback function to be called when the finalized head is updated. -// This will signal to the engine queue that will set the proper L2 block as finalized. 
-func (d *DA) OnFinalizedHeadSignal(f HeadSignalFn) { - d.finalizedHeadSignalFunc = f -} - -// Finalize takes the L1 finality signal, compares the plasma finalized block and forwards the finality -// signal to the engine queue based on whichever is most behind. -func (d *DA) Finalize(l1Finalized eth.L1BlockRef) { - ref := d.finalizedHead - d.log.Info("received l1 finalized signal, forwarding to engine queue", "l1", l1Finalized, "plasma", ref) - // if the l1 finalized head is behind it is the finalized head - if l1Finalized.Number < d.finalizedHead.Number { - ref = l1Finalized - } - // prune finalized state - d.state.Prune(ref.Number) - - if d.finalizedHeadSignalFunc == nil { - d.log.Warn("finalized head signal function not set") - return - } - - // signal the engine queue - d.finalizedHeadSignalFunc(ref) -} - -// LookAhead increments the challenges origin and process the new block if it exists. -// It is used when the derivation pipeline stalls due to missing data and we need to continue -// syncing challenge events until the challenge is resolved or expires. -func (d *DA) LookAhead(ctx context.Context, l1 L1Fetcher) error { - blkRef, err := l1.L1BlockRefByNumber(ctx, d.origin.Number+1) - // temporary error, will do a backoff - if err != nil { - return err - } - return d.AdvanceL1Origin(ctx, l1, blkRef.ID()) -} - -// Reset the challenge event derivation origin in case of L1 reorg -func (d *DA) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error { - // resetting due to expired challenge, do not clear state. - // If the DA source returns ErrReset, the pipeline is forced to reset by the rollup driver. - // In that case the Reset function will be called immediately, BEFORE the pipeline can - // call any further stage to step. Thus the state will NOT be cleared if the reset originates - // from this stage of the pipeline. 
- if d.resetting { - d.resetting = false - } else { - // resetting due to L1 reorg, clear state - d.origin = base.ID() - d.state.Reset() - } - return io.EOF -} - -// GetInput returns the input data for the given commitment bytes. blockNumber is required to lookup -// the challenge status in the DataAvailabilityChallenge L1 contract. -func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm EigenDACommitment, blockId eth.BlockID) (eth.Data, error) { - // If the challenge head is ahead in the case of a pipeline reset or stall, we might have synced a - // challenge event for this commitment. Otherwise we mark the commitment as part of the canonical - // chain so potential future challenge events can be selected. - ch := d.state.GetOrTrackChallenge(comm.Encode(), blockId.Number, d.cfg.ChallengeWindow) - - // Fetch the input from the DA storage. - data, err := d.storage.GetInput(ctx, comm) - - // data is not found in storage but may be available if the challenge was resolved. - notFound := errors.Is(ErrNotFound, err) - - if err != nil && !notFound { - d.log.Error("failed to get preimage", "err", err) - // the storage client request failed for some other reason - // in which case derivation pipeline should be retried - return nil, err - } - - switch ch.challengeStatus { - case ChallengeActive: - if d.isExpired(ch.expiresAt) { - // this challenge has expired, this input must be skipped - return nil, ErrExpiredChallenge - } else if notFound { - // data is missing and a challenge is active, we must wait for the challenge to resolve - // hence we continue syncing new origins to sync the new challenge events. 
- if err := d.LookAhead(ctx, l1); err != nil { - return nil, err - } - return nil, ErrPendingChallenge - } - case ChallengeExpired: - // challenge was marked as expired, skip - return nil, ErrExpiredChallenge - case ChallengeResolved: - // challenge was resolved, data is available in storage, return directly - if !notFound { - return data, nil - } - // data not found in storage, return from challenge resolved input - resolvedInput, err := d.state.GetResolvedInput(comm.Encode()) - if err != nil { - return nil, err - } - return resolvedInput, nil - default: - if notFound { - if d.isExpired(ch.expiresAt) { - // we're past the challenge window and the data is not available - return nil, ErrMissingPastWindow - } else { - // continue syncing challenges hoping it eventually is challenged and resolved - if err := d.LookAhead(ctx, l1); err != nil { - return nil, err - } - return nil, ErrPendingChallenge - } - } - } - - return data, nil -} - -// isExpired returns whether the given expiration block number is lower or equal to the current head -func (d *DA) isExpired(bn uint64) bool { - return d.origin.Number >= bn -} - -// AdvanceL1Origin syncs any challenge events included in the l1 block, expires any active challenges -// after the new resolveWindow, computes and signals the new finalized head and sets the l1 block -// as the new head for tracking challenges. If forwards an error if any new challenge have expired to -// trigger a derivation reset. 
-func (d *DA) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { - // do not repeat for the same origin - if block.Number <= d.origin.Number { - return nil - } - // sync challenges for the given block ID - if err := d.LoadChallengeEvents(ctx, l1, block); err != nil { - return err - } - // advance challenge window, computing the finalized head - bn, err := d.state.ExpireChallenges(block.Number) - if err != nil { - // warn the reset function not to clear the state - d.resetting = true - return err - } - - // finalized head signal is called only when the finalized head number increases - // and the l1 finalized head ahead of the DA finalized head. - if bn > d.finalizedHead.Number { - ref, err := l1.L1BlockRefByNumber(ctx, bn) - if err != nil { - return err - } - d.metrics.RecordChallengesHead("finalized", bn) - - // keep track of finalized had so it can be picked up by the - // l1 finalization signal - d.finalizedHead = ref - } - d.origin = block - d.metrics.RecordChallengesHead("latest", d.origin.Number) - - d.log.Info("processed plasma l1 origin", "origin", block, "next-finalized", bn, "finalized", d.finalizedHead.Number, "l1-finalize", d.l1FinalizedHead.Number) - return nil -} - -// LoadChallengeEvents fetches the l1 block receipts and updates the challenge status -func (d *DA) LoadChallengeEvents(ctx context.Context, l1 L1Fetcher, block eth.BlockID) error { - // filter any challenge event logs in the block - logs, err := d.fetchChallengeLogs(ctx, l1, block) - if err != nil { - return err - } - - for _, log := range logs { - i := log.TxIndex - status, comm, err := d.decodeChallengeStatus(log) - if err != nil { - d.log.Error("failed to decode challenge event", "block", block.Number, "tx", i, "log", log.Index, "err", err) - continue - } - switch status { - case ChallengeResolved: - // cached with input resolution call so not expensive - _, txs, err := l1.InfoAndTxsByHash(ctx, block.Hash) - if err != nil { - d.log.Error("failed to fetch l1 
block", "block", block.Number, "err", err) - continue - } - // avoid panic in black swan case of faulty rpc - if uint(len(txs)) <= i { - d.log.Error("tx/receipt mismatch in InfoAndTxsByHash") - continue - } - // select the transaction corresponding to the receipt - tx := txs[i] - // txs and receipts must be in the same order - if tx.Hash() != log.TxHash { - d.log.Error("tx hash mismatch", "block", block.Number, "txIdx", i, "log", log.Index, "txHash", tx.Hash(), "receiptTxHash", log.TxHash) - continue - } - // Decode the input from resolver tx calldata - input, err := DecodeResolvedInput(tx.Data()) - if err != nil { - d.log.Error("failed to decode resolved input", "block", block.Number, "txIdx", i, "err", err) - continue - } - if err := comm.Verify(input); err != nil { - d.log.Error("failed to verify commitment", "block", block.Number, "txIdx", i, "err", err) - continue - } - d.log.Debug("challenge resolved", "block", block, "txIdx", i) - d.state.SetResolvedChallenge(comm.Encode(), input, log.BlockNumber) - case ChallengeActive: - d.log.Info("detected new active challenge", "block", block) - d.state.SetActiveChallenge(comm.Encode(), log.BlockNumber, d.cfg.ResolveWindow) - default: - d.log.Warn("skipping unknown challenge status", "block", block.Number, "tx", i, "log", log.Index, "status", status) - } - } - return nil -} - -// fetchChallengeLogs returns logs for challenge events if any for the given block -func (d *DA) fetchChallengeLogs(ctx context.Context, l1 L1Fetcher, block eth.BlockID) ([]*types.Log, error) { //cached with deposits events call so not expensive - var logs []*types.Log - _, receipts, err := l1.FetchReceipts(ctx, block.Hash) - if err != nil { - return nil, err - } - d.log.Info("loading challenges", "epoch", block.Number, "numReceipts", len(receipts)) - for _, rec := range receipts { - // skip error logs - if rec.Status != types.ReceiptStatusSuccessful { - continue - } - for _, log := range rec.Logs { - if log.Address == 
d.cfg.DAChallengeContractAddress && len(log.Topics) > 0 && log.Topics[0] == ChallengeStatusEventABIHash { - logs = append(logs, log) - } - } - } - - return logs, nil -} - -// decodeChallengeStatus decodes and validates a challenge event from a transaction log, returning the associated commitment bytes. -func (d *DA) decodeChallengeStatus(log *types.Log) (ChallengeStatus, Keccak256Commitment, error) { - event, err := DecodeChallengeStatusEvent(log) - if err != nil { - return 0, nil, err - } - comm, err := DecodeKeccak256(event.ChallengedCommitment) - if err != nil { - return 0, nil, err - } - d.log.Debug("decoded challenge status event", "log", log, "event", event, "comm", fmt.Sprintf("%x", comm.Encode())) - - bn := event.ChallengedBlockNumber.Uint64() - // IsTracking just validates whether the commitment was challenged for the correct block number - // if it has been loaded from the batcher inbox before. Spam commitments will be tracked but - // ignored and evicted unless derivation encounters the commitment. - if !d.state.IsTracking(comm.Encode(), bn) { - return 0, nil, fmt.Errorf("%w: %x at block %d", ErrInvalidChallenge, comm.Encode(), bn) - } - return ChallengeStatus(event.Status), comm, nil -} - -var ( - ChallengeStatusEventName = "ChallengeStatusChanged" - ChallengeStatusEventABI = "ChallengeStatusChanged(uint256,bytes,uint8)" - ChallengeStatusEventABIHash = crypto.Keccak256Hash([]byte(ChallengeStatusEventABI)) -) - -// DecodeChallengeStatusEvent decodes the challenge status event from the log data and the indexed challenged -// hash and block number from the topics. 
-func DecodeChallengeStatusEvent(log *types.Log) (*bindings.DataAvailabilityChallengeChallengeStatusChanged, error) { - // abi lazy loaded, cached after decoded once - dacAbi, err := bindings.DataAvailabilityChallengeMetaData.GetAbi() - if err != nil { - return nil, err - } - var event bindings.DataAvailabilityChallengeChallengeStatusChanged - err = dacAbi.UnpackIntoInterface(&event, ChallengeStatusEventName, log.Data) - if err != nil { - return nil, err - } - var indexed abi.Arguments - for _, arg := range dacAbi.Events[ChallengeStatusEventName].Inputs { - if arg.Indexed { - indexed = append(indexed, arg) - } - } - if err := abi.ParseTopics(&event, indexed, log.Topics[1:]); err != nil { - return nil, err - } - return &event, nil -} - -// DecodeResolvedInput decodes the preimage bytes from the tx input data. -func DecodeResolvedInput(data []byte) ([]byte, error) { - dacAbi, _ := bindings.DataAvailabilityChallengeMetaData.GetAbi() - - args := make(map[string]interface{}) - err := dacAbi.Methods["resolve"].Inputs.UnpackIntoMap(args, data[4:]) - if err != nil { - return nil, err - } - rd, ok := args["resolveData"].([]byte) - if !ok || len(rd) == 0 { - return nil, fmt.Errorf("invalid resolve data") - } - return rd, nil -} diff --git a/damgr_test.go b/damgr_test.go deleted file mode 100644 index 5a37badc..00000000 --- a/damgr_test.go +++ /dev/null @@ -1,366 +0,0 @@ -package plasma - -import ( - "context" - "math/big" - "math/rand" - "testing" - - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func RandomData(rng *rand.Rand, size int) []byte { - out := make([]byte, size) - rng.Read(out) - return out -} - -// TestDAChallengeState is a simple test with small values to verify the finalized head 
logic -func TestDAChallengeState(t *testing.T) { - logger := testlog.Logger(t, log.LvlDebug) - - rng := rand.New(rand.NewSource(1234)) - state := NewState(logger, &NoopMetrics{}) - - i := uint64(1) - - challengeWindow := uint64(6) - resolveWindow := uint64(6) - - // track commitments in the first 10 blocks - for ; i < 10; i++ { - // this is akin to stepping the derivation pipeline through a range a blocks each with a commitment - state.SetInputCommitment(RandomData(rng, 32), i, challengeWindow) - } - - // blocks are finalized after the challenge window expires - bn, err := state.ExpireChallenges(10) - require.NoError(t, err) - // finalized head = 10 - 6 = 4 - require.Equal(t, uint64(4), bn) - - // track the next commitment and mark it as challenged - c := RandomData(rng, 32) - // add input commitment at block i = 10 - state.SetInputCommitment(c, 10, challengeWindow) - // i+4 is the block at which it was challenged - state.SetActiveChallenge(c, 14, resolveWindow) - - for j := i + 1; j < 18; j++ { - // continue walking the pipeline through some more blocks with commitments - state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) - } - - // finalized l1 origin should not extend past the resolve window - bn, err = state.ExpireChallenges(18) - require.NoError(t, err) - // finalized is active_challenge_block - 1 = 10 - 1 and cannot move until the challenge expires - require.Equal(t, uint64(9), bn) - - // walk past the resolve window - for j := uint64(18); j < 22; j++ { - state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) - } - - // no more active challenges, the finalized head can catch up to the challenge window - bn, err = state.ExpireChallenges(22) - require.ErrorIs(t, err, ErrReorgRequired) - // finalized head is now 22 - 6 = 16 - require.Equal(t, uint64(16), bn) - - // cleanup state we don't need anymore - state.Prune(22) - // now if we expire the challenges again, it won't request a reorg again - bn, err = state.ExpireChallenges(22) - 
require.NoError(t, err) - // finalized head hasn't moved - require.Equal(t, uint64(16), bn) - - // add one more commitment and challenge it - c = RandomData(rng, 32) - state.SetInputCommitment(c, 22, challengeWindow) - // challenge 3 blocks after - state.SetActiveChallenge(c, 25, resolveWindow) - - // exceed the challenge window with more commitments - for j := uint64(23); j < 30; j++ { - state.SetInputCommitment(RandomData(rng, 32), j, challengeWindow) - } - - // finalized head should not extend past the resolve window - bn, err = state.ExpireChallenges(30) - require.NoError(t, err) - // finalized head is stuck waiting for resolve window - require.Equal(t, uint64(21), bn) - - input := RandomData(rng, 100) - // resolve the challenge - state.SetResolvedChallenge(c, input, 30) - - // finalized head catches up - bn, err = state.ExpireChallenges(31) - require.NoError(t, err) - // finalized head is now 31 - 6 = 25 - require.Equal(t, uint64(25), bn) - - // the resolved input is also stored - storedInput, err := state.GetResolvedInput(c) - require.NoError(t, err) - require.Equal(t, input, storedInput) -} - -// TestExpireChallenges expires challenges and prunes the state for longer windows -// with commitments every 6 blocks. 
-func TestExpireChallenges(t *testing.T) { - logger := testlog.Logger(t, log.LvlDebug) - - rng := rand.New(rand.NewSource(1234)) - state := NewState(logger, &NoopMetrics{}) - - comms := make(map[uint64][]byte) - - i := uint64(3713854) - - var finalized uint64 - - challengeWindow := uint64(90) - resolveWindow := uint64(90) - - // increment new commitments every 6 blocks - for ; i < 3713948; i += 6 { - comm := RandomData(rng, 32) - comms[i] = comm - logger.Info("set commitment", "block", i) - cm := state.GetOrTrackChallenge(comm, i, challengeWindow) - require.NotNil(t, cm) - - bn, err := state.ExpireChallenges(i) - logger.Info("expire challenges", "finalized head", bn, "err", err) - - // only update finalized head if it has moved - if bn > finalized { - finalized = bn - // prune unused state - state.Prune(bn) - } - } - - // activate a couple of subsequent challenges - state.SetActiveChallenge(comms[3713926], 3713948, resolveWindow) - - state.SetActiveChallenge(comms[3713932], 3713950, resolveWindow) - - // continue incrementing commitments - for ; i < 3714038; i += 6 { - comm := RandomData(rng, 32) - comms[i] = comm - logger.Info("set commitment", "block", i) - cm := state.GetOrTrackChallenge(comm, i, challengeWindow) - require.NotNil(t, cm) - - bn, err := state.ExpireChallenges(i) - logger.Info("expire challenges", "expired", bn, "err", err) - - if bn > finalized { - finalized = bn - state.Prune(bn) - } - - } - - // finalized head does not move as it expires previously seen blocks - bn, err := state.ExpireChallenges(3714034) - require.NoError(t, err) - require.Equal(t, uint64(3713920), bn) - - bn, err = state.ExpireChallenges(3714035) - require.NoError(t, err) - require.Equal(t, uint64(3713920), bn) - - bn, err = state.ExpireChallenges(3714036) - require.NoError(t, err) - require.Equal(t, uint64(3713920), bn) - - bn, err = state.ExpireChallenges(3714037) - require.NoError(t, err) - require.Equal(t, uint64(3713920), bn) - - // lastly we get to the resolve window and 
trigger a reorg - _, err = state.ExpireChallenges(3714038) - require.ErrorIs(t, err, ErrReorgRequired) - - // this is simulating a pipeline reset where it walks back challenge + resolve window - for i := uint64(3713854); i < 3714044; i += 6 { - cm := state.GetOrTrackChallenge(comms[i], i, challengeWindow) - require.NotNil(t, cm) - - // check that the challenge status was updated to expired - if i == 3713926 { - require.Equal(t, ChallengeExpired, cm.challengeStatus) - } - } - - bn, err = state.ExpireChallenges(3714038) - require.NoError(t, err) - - // finalized at last - require.Equal(t, uint64(3713926), bn) -} - -func TestDAChallengeDetached(t *testing.T) { - logger := testlog.Logger(t, log.LvlDebug) - - rng := rand.New(rand.NewSource(1234)) - state := NewState(logger, &NoopMetrics{}) - - challengeWindow := uint64(6) - resolveWindow := uint64(6) - - c1 := RandomData(rng, 32) - c2 := RandomData(rng, 32) - - // c1 at bn1 is missing, pipeline stalls - state.GetOrTrackChallenge(c1, 1, challengeWindow) - - // c2 at bn2 is challenged at bn3 - require.True(t, state.IsTracking(c2, 2)) - state.SetActiveChallenge(c2, 3, resolveWindow) - - // c1 is finally challenged at bn5 - state.SetActiveChallenge(c1, 5, resolveWindow) - - // c2 expires but should not trigger a reset because we don't know if it's valid yet - bn, err := state.ExpireChallenges(10) - require.NoError(t, err) - require.Equal(t, uint64(0), bn) - - // c1 expires finally - bn, err = state.ExpireChallenges(11) - require.ErrorIs(t, err, ErrReorgRequired) - require.Equal(t, uint64(1), bn) - - // pruning finalized block is safe - state.Prune(bn) - - // pipeline discovers c2 - comm := state.GetOrTrackChallenge(c2, 2, challengeWindow) - // it is already marked as expired so it will be skipped without needing a reorg - require.Equal(t, ChallengeExpired, comm.challengeStatus) - - // later when we get to finalizing block 10 + margin, the pending challenge is safely pruned - state.Prune(210) - require.Equal(t, 0, 
len(state.expiredComms)) -} - -// cannot import from testutils at this time because of import cycle -type mockL1Fetcher struct { - mock.Mock -} - -func (m *mockL1Fetcher) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) { - out := m.Mock.Called(hash) - return out.Get(0).(eth.BlockInfo), out.Get(1).(types.Transactions), out.Error(2) -} - -func (m *mockL1Fetcher) ExpectInfoAndTxsByHash(hash common.Hash, info eth.BlockInfo, transactions types.Transactions, err error) { - m.Mock.On("InfoAndTxsByHash", hash).Once().Return(info, transactions, err) -} - -func (m *mockL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { - out := m.Mock.Called(blockHash) - return *out.Get(0).(*eth.BlockInfo), out.Get(1).(types.Receipts), out.Error(2) -} - -func (m *mockL1Fetcher) ExpectFetchReceipts(hash common.Hash, info eth.BlockInfo, receipts types.Receipts, err error) { - m.Mock.On("FetchReceipts", hash).Once().Return(&info, receipts, err) -} - -func (m *mockL1Fetcher) L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) { - out := m.Mock.Called(num) - return out.Get(0).(eth.L1BlockRef), out.Error(1) -} - -func (m *mockL1Fetcher) ExpectL1BlockRefByNumber(num uint64, ref eth.L1BlockRef, err error) { - m.Mock.On("L1BlockRefByNumber", num).Once().Return(ref, err) -} - -func TestFilterInvalidBlockNumber(t *testing.T) { - logger := testlog.Logger(t, log.LevelDebug) - ctx := context.Background() - - l1F := &mockL1Fetcher{} - - storage := NewMockDAClient(logger) - - daddr := common.HexToAddress("0x978e3286eb805934215a88694d80b09aded68d90") - pcfg := Config{ - ChallengeWindow: 90, ResolveWindow: 90, DAChallengeContractAddress: daddr, - } - - bn := uint64(19) - bhash := common.HexToHash("0xd438144ffab918b1349e7cd06889c26800c26d8edc34d64f750e3e097166a09c") - - state := NewState(logger, &NoopMetrics{}) - - da := NewPlasmaDAWithState(logger, pcfg, storage, 
&NoopMetrics{}, state) - - receipts := types.Receipts{&types.Receipt{ - Type: 2, - Status: 1, - Logs: []*types.Log{ - { - BlockNumber: bn, - Address: daddr, - Topics: []common.Hash{ - common.HexToHash("0xa448afda7ea1e3a7a10fcab0c29fe9a9dd85791503bf0171f281521551c7ec05"), - }, - }, - { - BlockNumber: bn, - Address: daddr, - Topics: []common.Hash{ - common.HexToHash("0xc5d8c630ba2fdacb1db24c4599df78c7fb8cf97b5aecde34939597f6697bb1ad"), - common.HexToHash("0x000000000000000000000000000000000000000000000000000000000000000e"), - }, - Data: common.FromHex("0x00000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002100eed82c1026bdd0f23461dd6ca515ef677624e63e6fc0ff91e3672af8eddf579d00000000000000000000000000000000000000000000000000000000000000"), - }, - }, - BlockNumber: big.NewInt(int64(bn)), - }} - id := eth.BlockID{ - Number: bn, - Hash: bhash, - } - l1F.ExpectFetchReceipts(bhash, nil, receipts, nil) - - // we get 1 log successfully filtered as valid status updated contract event - logs, err := da.fetchChallengeLogs(ctx, l1F, id) - require.NoError(t, err) - require.Equal(t, len(logs), 1) - - // commitment is tracked but not canonical - status, comm, err := da.decodeChallengeStatus(logs[0]) - require.NoError(t, err) - - c, has := state.commsByKey[string(comm.Encode())] - require.True(t, has) - require.False(t, c.canonical) - - require.Equal(t, ChallengeActive, status) - // once tracked, set as active based on decoded status - state.SetActiveChallenge(comm.Encode(), bn, pcfg.ResolveWindow) - - // once we request it during derivation it becomes canonical - tracked := state.GetOrTrackChallenge(comm.Encode(), 14, pcfg.ChallengeWindow) - require.True(t, tracked.canonical) - - require.Equal(t, ChallengeActive, tracked.challengeStatus) - require.Equal(t, uint64(14), tracked.blockNumber) - require.Equal(t, bn+pcfg.ResolveWindow, tracked.expiresAt) 
-} diff --git a/damock.go b/damock.go deleted file mode 100644 index 72b4a5be..00000000 --- a/damock.go +++ /dev/null @@ -1,99 +0,0 @@ -package plasma - -import ( - "context" - "errors" - "io" - - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/ethdb/memorydb" - "github.com/ethereum/go-ethereum/log" -) - -// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server -// in unit tests. -type MockDAClient struct { - store ethdb.KeyValueStore - log log.Logger -} - -func NewMockDAClient(log log.Logger) *MockDAClient { - return &MockDAClient{ - store: memorydb.New(), - log: log, - } -} - -func (c *MockDAClient) GetInput(ctx context.Context, key EigenDACommitment) ([]byte, error) { - bytes, err := c.store.Get(key.Encode()) - if err != nil { - return nil, ErrNotFound - } - return bytes, nil -} - -func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (EigenDACommitment, error) { - key := EigenDACommitment(data) - return key, c.store.Put(key.Encode(), data) -} - -func (c *MockDAClient) DeleteData(key []byte) error { - return c.store.Delete(key) -} - -type DAErrFaker struct { - Client *MockDAClient - - getInputErr error - setInputErr error -} - -func (f *DAErrFaker) GetInput(ctx context.Context, key EigenDACommitment) ([]byte, error) { - if err := f.getInputErr; err != nil { - f.getInputErr = nil - return nil, err - } - return f.Client.GetInput(ctx, key) -} - -func (f *DAErrFaker) SetInput(ctx context.Context, data []byte) (EigenDACommitment, error) { - if err := f.setInputErr; err != nil { - f.setInputErr = nil - return nil, err - } - return f.Client.SetInput(ctx, data) -} - -func (f *DAErrFaker) ActGetPreImageFail() { - f.getInputErr = errors.New("get input failed") -} - -func (f *DAErrFaker) ActSetPreImageFail() { - f.setInputErr = errors.New("set input failed") -} - -var Disabled = &PlasmaDisabled{} - -var ErrNotEnabled = errors.New("plasma not enabled") - -// 
PlasmaDisabled is a noop plasma DA implementation for stubbing. -type PlasmaDisabled struct{} - -func (d *PlasmaDisabled) GetInput(ctx context.Context, l1 L1Fetcher, commitment Keccak256Commitment, blockId eth.BlockID) (eth.Data, error) { - return nil, ErrNotEnabled -} - -func (d *PlasmaDisabled) Reset(ctx context.Context, base eth.L1BlockRef, baseCfg eth.SystemConfig) error { - return io.EOF -} - -func (d *PlasmaDisabled) Finalize(ref eth.L1BlockRef) { -} - -func (d *PlasmaDisabled) OnFinalizedHeadSignal(f HeadSignalFn) { -} - -func (d *PlasmaDisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, blockId eth.BlockID) error { - return ErrNotEnabled -} diff --git a/daserver.go b/daserver.go index 9c67ea23..275f0a3a 100644 --- a/daserver.go +++ b/daserver.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + "github.com/Layr-Labs/op-plasma-eigenda/metrics" "github.com/ethereum-optimism/optimism/op-service/rpc" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" @@ -29,14 +30,16 @@ type DAServer struct { log log.Logger endpoint string store PlasmaStore + m *metrics.Metrics tls *rpc.ServerTLSConfig httpServer *http.Server listener net.Listener } -func NewDAServer(host string, port int, store PlasmaStore, log log.Logger) *DAServer { +func NewDAServer(host string, port int, store PlasmaStore, log log.Logger, m *metrics.Metrics) *DAServer { endpoint := net.JoinHostPort(host, strconv.Itoa(port)) return &DAServer{ + m: m, log: log, endpoint: endpoint, store: store, @@ -94,12 +97,16 @@ func (d *DAServer) Start() error { func (d *DAServer) Health(w http.ResponseWriter, r *http.Request) { d.log.Info("GET", "url", r.URL) + recordDur := d.m.RecordRPCServerRequest("health") + defer recordDur() w.WriteHeader(http.StatusOK) } func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) { d.log.Info("GET", "url", r.URL) + recordDur := d.m.RecordRPCServerRequest("get") + defer recordDur() route := path.Dir(r.URL.Path) if route != "/get" { 
@@ -143,6 +150,8 @@ func (d *DAServer) HandleGet(w http.ResponseWriter, r *http.Request) { func (d *DAServer) HandlePut(w http.ResponseWriter, r *http.Request) { d.log.Info("PUT", "url", r.URL) + recordDur := d.m.RecordRPCServerRequest("put") + defer recordDur() route := path.Dir(r.URL.Path) if route != "/put" { diff --git a/dastate.go b/dastate.go deleted file mode 100644 index c6fee480..00000000 --- a/dastate.go +++ /dev/null @@ -1,228 +0,0 @@ -package plasma - -import ( - "container/heap" - "errors" - "fmt" - - "github.com/ethereum/go-ethereum/log" -) - -// ErrReorgRequired is returned when a commitment was derived but for which the challenge expired. -// This requires a reorg to rederive without the input even if the input was previously available. -var ErrReorgRequired = errors.New("reorg required") - -type ChallengeStatus uint8 - -const ( - ChallengeUninitialized ChallengeStatus = iota - ChallengeActive - ChallengeResolved - ChallengeExpired -) - -// Commitment keeps track of the onchain state of an input commitment. -type Commitment struct { - key []byte // the encoded commitment - input []byte // the input itself if it was resolved onchain - expiresAt uint64 // represents the block number after which the commitment can no longer be challenged or if challenged no longer be resolved. - blockNumber uint64 // block where the commitment is included as calldata to the batcher inbox - challengeStatus ChallengeStatus // latest known challenge status - canonical bool // whether the commitment was derived as part of the canonical chain if canonical it will be in comms queue if not in the pendingComms queue. -} - -// CommQueue is a priority queue of commitments ordered by block number. 
-type CommQueue []*Commitment - -var _ heap.Interface = (*CommQueue)(nil) - -func (c CommQueue) Len() int { return len(c) } - -// we want the first item in the queue to have the lowest block number -func (c CommQueue) Less(i, j int) bool { - return c[i].blockNumber < c[j].blockNumber -} - -func (c CommQueue) Swap(i, j int) { - c[i], c[j] = c[j], c[i] -} - -func (c *CommQueue) Push(x any) { - *c = append(*c, x.(*Commitment)) -} - -func (c *CommQueue) Pop() any { - old := *c - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - *c = old[0 : n-1] - return item -} - -// State tracks the commitment and their challenges in order of l1 inclusion. -type State struct { - activeComms CommQueue - expiredComms CommQueue - commsByKey map[string]*Commitment - log log.Logger - metrics Metricer - finalized uint64 -} - -func NewState(log log.Logger, m Metricer) *State { - return &State{ - activeComms: make(CommQueue, 0), - expiredComms: make(CommQueue, 0), - commsByKey: make(map[string]*Commitment), - log: log, - metrics: m, - } -} - -// IsTracking returns whether we currently have a commitment for the given key. -// if the block number is mismatched we return false to ignore the challenge. -func (s *State) IsTracking(key []byte, bn uint64) bool { - if c, ok := s.commsByKey[string(key)]; ok { - return c.blockNumber == bn - } - // track the commitment knowing we may be in detached head and not have seen - // the commitment in the inbox yet. - s.TrackDetachedCommitment(key, bn) - return true -} - -// TrackDetachedCommitment is used for indexing challenges for commitments that have not yet -// been derived due to the derivation pipeline being stalled pending a commitment to be challenged. -// Memory usage is bound to L1 block space during the DA windows, so it is hard and expensive to spam. -// Note that the challenge status and expiration is updated separately after it is tracked. 
-func (s *State) TrackDetachedCommitment(key []byte, bn uint64) { - c := &Commitment{ - key: key, - expiresAt: bn, - blockNumber: bn, - canonical: false, - } - s.log.Debug("tracking detached commitment", "blockNumber", c.blockNumber, "commitment", fmt.Sprintf("%x", key)) - heap.Push(&s.activeComms, c) - s.commsByKey[string(key)] = c -} - -// SetActiveChallenge switches the state of a given commitment to active challenge. Noop if -// the commitment is not tracked as we don't want to track challenges for invalid commitments. -func (s *State) SetActiveChallenge(key []byte, challengedAt uint64, resolveWindow uint64) { - if c, ok := s.commsByKey[string(key)]; ok { - c.expiresAt = challengedAt + resolveWindow - c.challengeStatus = ChallengeActive - s.metrics.RecordActiveChallenge(c.blockNumber, challengedAt, key) - } -} - -// SetResolvedChallenge switches the state of a given commitment to resolved. Noop if -// the commitment is not tracked as we don't want to track challenges for invalid commitments. -// The input posted onchain is stored in the state for later retrieval. -func (s *State) SetResolvedChallenge(key []byte, input []byte, resolvedAt uint64) { - if c, ok := s.commsByKey[string(key)]; ok { - c.challengeStatus = ChallengeResolved - c.expiresAt = resolvedAt - c.input = input - s.metrics.RecordResolvedChallenge(key) - } -} - -// SetInputCommitment initializes a new commitment and adds it to the state. -// This is called when we see a commitment during derivation so we can refer to it later in -// challenges. 
-func (s *State) SetInputCommitment(key []byte, committedAt uint64, challengeWindow uint64) *Commitment { - c := &Commitment{ - key: key, - expiresAt: committedAt + challengeWindow, - blockNumber: committedAt, - canonical: true, - } - s.log.Debug("append commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) - heap.Push(&s.activeComms, c) - s.commsByKey[string(key)] = c - - return c -} - -// GetOrTrackChallenge returns the commitment for the given key if it is already tracked, or -// initializes a new commitment and adds it to the state. -func (s *State) GetOrTrackChallenge(key []byte, bn uint64, challengeWindow uint64) *Commitment { - if c, ok := s.commsByKey[string(key)]; ok { - // commitments previously introduced by challenge events are marked as canonical - c.canonical = true - return c - } - return s.SetInputCommitment(key, bn, challengeWindow) -} - -// GetResolvedInput returns the input bytes if the commitment was resolved onchain. -func (s *State) GetResolvedInput(key []byte) ([]byte, error) { - if c, ok := s.commsByKey[string(key)]; ok { - return c.input, nil - } - return nil, errors.New("commitment not found") -} - -// ExpireChallenges walks back from the oldest commitment to find the latest l1 origin -// for which input data can no longer be challenged. It also marks any active challenges -// as expired based on the new latest l1 origin. If any active challenges are expired -// it returns an error to signal that a derivation pipeline reset is required. 
-func (s *State) ExpireChallenges(bn uint64) (uint64, error) { - var err error - for s.activeComms.Len() > 0 && s.activeComms[0].expiresAt <= bn && s.activeComms[0].blockNumber > s.finalized { - // move from the active to the expired queue - c := heap.Pop(&s.activeComms).(*Commitment) - heap.Push(&s.expiredComms, c) - - if c.canonical { - // advance finalized head only if the commitment was derived as part of the canonical chain - s.finalized = c.blockNumber - } - - // active mark as expired so it is skipped in the derivation pipeline - if c.challengeStatus == ChallengeActive { - c.challengeStatus = ChallengeExpired - - // only reorg if canonical. If the pipeline is behind, it will just - // get skipped once it catches up. If it is spam, it will be pruned - // with no effect. - if c.canonical { - err = ErrReorgRequired - s.metrics.RecordExpiredChallenge(c.key) - } - } - } - - return s.finalized, err -} - -// safely prune in case reset is deeper than the finalized l1 -const commPruneMargin = 200 - -// Prune removes commitments once they can no longer be challenged or resolved. -// the finalized head block number is passed so we can safely remove any commitments -// with finalized block numbers. -func (s *State) Prune(bn uint64) { - if bn > commPruneMargin { - bn -= commPruneMargin - } else { - bn = 0 - } - for s.expiredComms.Len() > 0 && s.expiredComms[0].blockNumber < bn { - c := heap.Pop(&s.expiredComms).(*Commitment) - s.log.Debug("prune commitment", "expiresAt", c.expiresAt, "blockNumber", c.blockNumber) - delete(s.commsByKey, string(c.key)) - } -} - -// In case of L1 reorg, state should be cleared so we can sync all the challenge events -// from scratch. 
-func (s *State) Reset() { - s.activeComms = s.activeComms[:0] - s.expiredComms = s.expiredComms[:0] - s.finalized = 0 - clear(s.commsByKey) -} diff --git a/metrics.go b/metrics.go deleted file mode 100644 index dfde9b81..00000000 --- a/metrics.go +++ /dev/null @@ -1,78 +0,0 @@ -package plasma - -import ( - "github.com/ethereum-optimism/optimism/op-service/metrics" - "github.com/prometheus/client_golang/prometheus" -) - -/* - TODO: Add telemetry specific to EigenDA interactions and - request/response latency -*/ - -type Metricer interface { - RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) - RecordResolvedChallenge(hash []byte) - RecordExpiredChallenge(hash []byte) - RecordChallengesHead(name string, num uint64) - RecordStorageError() -} - -type Metrics struct { - ChallengesStatus *prometheus.GaugeVec - ChallengesHead *prometheus.GaugeVec - - StorageErrors *metrics.Event -} - -var _ Metricer = (*Metrics)(nil) - -func MakeMetrics(ns string, factory metrics.Factory) *Metrics { - return &Metrics{ - ChallengesStatus: factory.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: ns, - Name: "challenges_status", - Help: "Gauge representing the status of challenges synced", - }, []string{"status"}), - ChallengesHead: factory.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: ns, - Name: "challenges_head", - Help: "Gauge representing the l1 heads of challenges synced", - }, []string{"type"}), - StorageErrors: metrics.NewEvent(factory, ns, "", "storage_errors", "errors when fetching or uploading to storage service"), - } -} - -func (m *Metrics) RecordChallenge(status string) { - m.ChallengesStatus.WithLabelValues(status).Inc() -} - -// RecordActiveChallenge records when a commitment is challenged including the block where the commitment -// is included, the block where the commitment was challenged and the commitment hash. 
-func (m *Metrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) { - m.RecordChallenge("active") -} - -func (m *Metrics) RecordResolvedChallenge(hash []byte) { - m.RecordChallenge("resolved") -} - -func (m *Metrics) RecordExpiredChallenge(hash []byte) { - m.RecordChallenge("expired") -} - -func (m *Metrics) RecordStorageError() { - m.StorageErrors.Record() -} - -func (m *Metrics) RecordChallengesHead(name string, num uint64) { - m.ChallengesHead.WithLabelValues(name).Set(float64(num)) -} - -type NoopMetrics struct{} - -func (m *NoopMetrics) RecordActiveChallenge(commBlock uint64, startBlock uint64, hash []byte) {} -func (m *NoopMetrics) RecordResolvedChallenge(hash []byte) {} -func (m *NoopMetrics) RecordExpiredChallenge(hash []byte) {} -func (m *NoopMetrics) RecordChallengesHead(name string, num uint64) {} -func (m *NoopMetrics) RecordStorageError() {} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..f1392107 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,120 @@ +package metrics + +import ( + "net" + "strconv" + + ophttp "github.com/ethereum-optimism/optimism/op-service/httputil" + + "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + Namespace = "plasma_eigenda" +) + +// Config ... Metrics server configuration +type Config struct { + Host string + Port int + Enabled bool + ReadHeaderTimeout int +} + +// Metricer ... Interface for metrics +type Metricer interface { + RecordInfo(version string) + RecordUp() + RecordRPCServerRequest(method string) func() + RecordRPCClientResponse(method string, err error) + + Document() []metrics.DocumentedMetric +} + +// Metrics ... 
Metrics struct +type Metrics struct { + Info *prometheus.GaugeVec + Up prometheus.Gauge + + metrics.RPCMetrics + + registry *prometheus.Registry + factory metrics.Factory +} + +var _ Metricer = (*Metrics)(nil) + +func NewMetrics(procName string) *Metrics { + if procName == "" { + procName = "default" + } + ns := Namespace + "_" + procName + + registry := prometheus.NewRegistry() + registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + registry.MustRegister(collectors.NewGoCollector()) + factory := metrics.With(registry) + + return &Metrics{ + Up: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "up", + Help: "1 if the plasma server has finished starting up", + }), + Info: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "info", + Help: "Pseudo-metric tracking version and config info", + }, []string{ + "version", + }), + RPCMetrics: metrics.MakeRPCMetrics(ns, factory), + registry: registry, + factory: factory, + } + +} + +// RecordInfo sets a pseudo-metric that contains versioning and +// config info for the plasma DA node. +func (m *Metrics) RecordInfo(version string) { + m.Info.WithLabelValues(version).Set(1) +} + +// RecordUp sets the up metric to 1. +func (m *Metrics) RecordUp() { + // Up is created through the registry-backed factory in NewMetrics; nothing to register on the default registry. + m.Up.Set(1) +} + +// StartServer starts the metrics server on the given hostname and port. 
+func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) { + addr := net.JoinHostPort(hostname, strconv.Itoa(port)) + h := promhttp.InstrumentMetricHandler( + m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), + ) + return ophttp.StartHTTPServer(addr, h) +} + +func (m *Metrics) Document() []metrics.DocumentedMetric { + return m.factory.Document() +} + +type noopMetricer struct { + metrics.NoopRPCMetrics +} + +var NoopMetrics Metricer = new(noopMetricer) + +func (n *noopMetricer) Document() []metrics.DocumentedMetric { + return nil +} + +func (n *noopMetricer) RecordInfo(version string) { +} + +func (n *noopMetricer) RecordUp() { +}