Skip to content

Commit

Permalink
feat: Add standard tracing points to portions of the mempool and cons…
Browse files Browse the repository at this point in the history
…ensus (#1055)

## Description

This PR updates the trace client with a slight refactor (renaming to be
more consistent) and then adds a few standard tables to store traced
information. We are also able to select which tables we wish to update
(and therefore which information to trace) in the config.

closes #978

---------

Co-authored-by: Rootul P <rootulp@gmail.com>
  • Loading branch information
evan-forbes and rootulp committed Aug 8, 2023
1 parent 518e0e1 commit 2f93fc8
Show file tree
Hide file tree
Showing 15 changed files with 444 additions and 60 deletions.
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ var (

minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200

// DefaultInfluxTables is a list of tables that are used for storing traces.
// This global var is filled by an init function in the schema package. This
// allows for the schema package to contain all the relevant logic while
// avoiding import cycles.
DefaultInfluxTables = []string{}
)

// Config defines the top level configuration for a CometBFT node
Expand Down Expand Up @@ -1191,6 +1197,10 @@ type InstrumentationConfig struct {
// InfluxBatchSize is the number of points to write in a single batch.
InfluxBatchSize int `mapstructure:"influx_batch_size"`

// InfluxTables is the list of tables that will be traced. See the
// pkg/trace/schema for a complete list of tables.
InfluxTables []string `mapstructure:"influx_tables"`

// PyroscopeURL is the pyroscope url used to establish a connection with a
// pyroscope continuous profiling server.
PyroscopeURL string `mapstructure:"pyroscope_url"`
Expand All @@ -1217,6 +1227,7 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
InfluxOrg: "celestia",
InfluxBucket: "e2e",
InfluxBatchSize: 20,
InfluxTables: DefaultInfluxTables,
PyroscopeURL: "",
PyroscopeTrace: false,
PyroscopeProfileTypes: []string{
Expand Down
4 changes: 4 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ influx_org = "{{ .Instrumentation.InfluxOrg }}"
# The size of the batches that are sent to the database.
influx_batch_size = {{ .Instrumentation.InfluxBatchSize }}
# The list of tables that are updated when tracing. All available tables and
# their schema can be found in the pkg/trace/schema package.
influx_tables = [{{ range .Instrumentation.InfluxTables }}{{ printf "%q, " . }}{{end}}]
# The URL of the pyroscope instance to use for continuous profiling.
# If empty, continuous profiling is disabled.
pyroscope_url = "{{ .Instrumentation.PyroscopeURL }}"
Expand Down
20 changes: 15 additions & 5 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
cmtsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
cmtcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
Expand Down Expand Up @@ -47,7 +49,8 @@ type Reactor struct {
eventBus *types.EventBus
rs *cstypes.RoundState

Metrics *Metrics
Metrics *Metrics
traceClient *trace.Client
}

type ReactorOption func(*Reactor)
Expand All @@ -56,10 +59,11 @@ type ReactorOption func(*Reactor)
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
traceClient: &trace.Client{},
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

Expand Down Expand Up @@ -334,6 +338,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand Down Expand Up @@ -590,6 +595,7 @@ OUTER_LOOP:
Part: *parts,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
Expand Down Expand Up @@ -1021,6 +1027,10 @@ func ReactorMetrics(metrics *Metrics) ReactorOption {
return func(conR *Reactor) { conR.Metrics = metrics }
}

func ReactorTracing(traceClient *trace.Client) ReactorOption {
return func(conR *Reactor) { conR.traceClient = traceClient }
}

//-----------------------------------------------------------------------------

var (
Expand Down
13 changes: 8 additions & 5 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cmtsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -142,7 +143,7 @@ type State struct {
// for reporting metrics
metrics *Metrics

eventCollector *trace.Client
traceClient *trace.Client
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -173,7 +174,7 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
eventCollector: &trace.Client{},
traceClient: &trace.Client{},
}

// set function defaults (may be overwritten before calling Start)
Expand Down Expand Up @@ -215,9 +216,9 @@ func StateMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}

// SetEventCollector sets the remote event collector.
func SetEventCollector(ec *trace.Client) StateOption {
return func(cs *State) { cs.eventCollector = ec }
// SetTraceClient sets the remote event collector.
func SetTraceClient(ec *trace.Client) StateOption {
return func(cs *State) { cs.traceClient = ec }
}

// String returns a string.
Expand Down Expand Up @@ -703,6 +704,8 @@ func (cs *State) newStep() {

cs.nSteps++

schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, cs.Step)

// newStep is called by updateToState in NewState before the eventBus is set!
if cs.eventBus != nil {
if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil {
Expand Down
47 changes: 39 additions & 8 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)
Expand All @@ -35,10 +37,11 @@ const (
// spec under /.spec.md
type Reactor struct {
p2p.BaseReactor
opts *ReactorOptions
mempool *TxPool
ids *mempoolIDs
requests *requestScheduler
opts *ReactorOptions
mempool *TxPool
ids *mempoolIDs
requests *requestScheduler
traceClient *trace.Client
}

type ReactorOptions struct {
Expand All @@ -52,6 +55,9 @@ type ReactorOptions struct {
// MaxGossipDelay is the maximum allotted time that the reactor expects a transaction to
// arrive before issuing a new request to a different peer
MaxGossipDelay time.Duration

// TraceClient is the trace client for collecting trace level events
TraceClient *trace.Client
}

func (opts *ReactorOptions) VerifyAndComplete() error {
Expand Down Expand Up @@ -81,10 +87,11 @@ func NewReactor(mempool *TxPool, opts *ReactorOptions) (*Reactor, error) {
return nil, err
}
memR := &Reactor{
opts: opts,
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
opts: opts,
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
traceClient: &trace.Client{},
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR, nil
Expand Down Expand Up @@ -203,6 +210,9 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// NOTE: This setup also means that we can support older mempool implementations that simply
// flooded the network with transactions.
case *protomem.Txs:
for _, tx := range msg.Txs {
schema.WriteMempoolTx(memR.traceClient, e.Src.ID(), tx, schema.TransferTypeDownload, schema.CatVersionFieldValue)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
Expand Down Expand Up @@ -245,6 +255,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// 3. If we recently evicted the tx and still don't have space for it, we do nothing.
// 4. Else, we request the transaction from that peer.
case *protomem.SeenTx:
schema.WriteMempoolPeerState(
memR.traceClient,
e.Src.ID(),
schema.SeenTxStateUpdateFieldValue,
schema.TransferTypeDownload,
schema.CatVersionFieldValue,
)
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
memR.Logger.Error("peer sent SeenTx with incorrect tx key", "err", err)
Expand Down Expand Up @@ -272,6 +289,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// A peer is requesting a transaction that we have claimed to have. Find the specified
// transaction and broadcast it to the peer. We may no longer have the transaction
case *protomem.WantTx:
schema.WriteMempoolPeerState(
memR.traceClient,
e.Src.ID(),
schema.WantTxStateUpdateFieldValue,
schema.TransferTypeDownload,
schema.CatVersionFieldValue,
)
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
memR.Logger.Error("peer sent WantTx with incorrect tx key", "err", err)
Expand All @@ -281,6 +305,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
tx, has := memR.mempool.Get(txKey)
if has && !memR.opts.ListenOnly {
peerID := memR.ids.GetIDForPeer(e.Src.ID())
schema.WriteMempoolTx(
memR.traceClient,
e.Src.ID(),
msg.TxKey,
schema.TransferTypeUpload,
schema.CatVersionFieldValue,
)
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)
if p2p.SendEnvelopeShim(e.Src, p2p.Envelope{ //nolint:staticcheck
ChannelID: mempool.MempoolChannel,
Expand Down
34 changes: 27 additions & 7 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
cmtsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)
Expand All @@ -22,9 +24,10 @@ import (
// peers you received it from.
type Reactor struct {
p2p.BaseReactor
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs
config *cfg.MempoolConfig
mempool *TxMempool
ids *mempoolIDs
traceClient *trace.Client
}

type mempoolIDs struct {
Expand Down Expand Up @@ -91,11 +94,12 @@ func newMempoolIDs() *mempoolIDs {
}

// NewReactor returns a new Reactor with the given config and mempool.
func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor {
func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool, traceClient *trace.Client) *Reactor {
memR := &Reactor{
config: config,
mempool: mempool,
ids: newMempoolIDs(),
config: config,
mempool: mempool,
ids: newMempoolIDs(),
traceClient: traceClient,
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR
Expand Down Expand Up @@ -160,6 +164,15 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
for _, tx := range msg.Txs {
schema.WriteMempoolTx(
memR.traceClient,
e.Src.ID(),
tx,
schema.TransferTypeDownload,
schema.V1VersionFieldValue,
)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
Expand Down Expand Up @@ -270,6 +283,13 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}
schema.WriteMempoolTx(
memR.traceClient,
peer.ID(),
memTx.tx,
schema.TransferTypeUpload,
schema.V1VersionFieldValue,
)
}

select {
Expand Down
3 changes: 2 additions & 1 deletion mempool/v1/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/pkg/trace"

cfg "github.com/tendermint/tendermint/config"

Expand Down Expand Up @@ -133,7 +134,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor {
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states
reactors[i] = NewReactor(config.Mempool, mempool, &trace.Client{}) // so we dont start the consensus states
reactors[i].SetLogger(logger.With("validator", i))
}

Expand Down
Loading

0 comments on commit 2f93fc8

Please sign in to comment.