Skip to content

Commit

Permalink
refactor: move everything to the schema package
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-forbes committed Aug 6, 2023
1 parent 07b839f commit b918d57
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 96 deletions.
25 changes: 12 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ var (

minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200

// DefaultInfluxTables is a list of tables that are used for storing traces.
// This global var is filled by an init function in the schema package. This
// allows for the schema package to contain all the relevant logic while
// avoiding import cycles.
DefaultInfluxTables = []string{}
)

// Config defines the top level configuration for a CometBFT node
Expand Down Expand Up @@ -1191,10 +1197,9 @@ type InstrumentationConfig struct {
// InfluxBatchSize is the number of points to write in a single batch.
InfluxBatchSize int `mapstructure:"influx_batch_size"`

// TracingProfiles is a list of profile types to be traced with influxdb.
// All available profiles are: mempool_tx, mempool_state, consensus_state,
// votes, and p2p.
TracingTables []string `mapstructure:"tracing_profiles"`
// InfluxTables is the list of tables that will be traced. See the
// pkg/trace/schema for a complete list of tables.
InfluxTables []string `mapstructure:"influx_tables"`

// PyroscopeURL is the pyroscope url used to establish a connection with a
// pyroscope continuous profiling server.
Expand Down Expand Up @@ -1222,15 +1227,9 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
InfluxOrg: "celestia",
InfluxBucket: "e2e",
InfluxBatchSize: 20,
TracingTables: []string{
"mempool_tx",
"mempool_state",
"consensus_state",
"votes",
"p2p",
},
PyroscopeURL: "",
PyroscopeTrace: false,
InfluxTables: DefaultInfluxTables,
PyroscopeURL: "",
PyroscopeTrace: false,
PyroscopeProfileTypes: []string{
"cpu",
"alloc_objects",
Expand Down
4 changes: 4 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ influx_org = "{{ .Instrumentation.InfluxOrg }}"
# The size of the batches that are sent to the database.
influx_batch_size = {{ .Instrumentation.InfluxBatchSize }}
# The list of tables that are updated when tracing. All available tables and
# their schema can be found in the pkg/trace/schema package.
influx_tables = [{{ range .Instrumentation.InfluxTables }}{{ printf "%q, " . }}{{end}}]
# The URL of the pyroscope instance to use for continuous profiling.
# If empty, continuous profiling is disabled.
pyroscope_url = "{{ .Instrumentation.PyroscopeURL }}"
Expand Down
13 changes: 8 additions & 5 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cmtsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -142,7 +143,7 @@ type State struct {
// for reporting metrics
metrics *Metrics

eventCollector *trace.Client
traceClient *trace.Client
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -173,7 +174,7 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
eventCollector: &trace.Client{},
traceClient: &trace.Client{},
}

// set function defaults (may be overwritten before calling Start)
Expand Down Expand Up @@ -215,9 +216,9 @@ func StateMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}

// SetEventCollector sets the remote event collector.
func SetEventCollector(ec *trace.Client) StateOption {
return func(cs *State) { cs.eventCollector = ec }
// SetTraceClient sets the remote event collector.
func SetTraceClient(ec *trace.Client) StateOption {
return func(cs *State) { cs.traceClient = ec }
}

// String returns a string.
Expand Down Expand Up @@ -703,6 +704,8 @@ func (cs *State) newStep() {

cs.nSteps++

schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, cs.Step)

// newStep is called by updateToState in NewState before the eventBus is set!
if cs.eventBus != nil {
if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil {
Expand Down
25 changes: 13 additions & 12 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -209,7 +210,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// flooded the network with transactions.
case *protomem.Txs:
for _, tx := range msg.Txs {
mempool.WriteTxTracingPoint(memR.evCollector, e.Src.ID(), tx, mempool.TransferTypeDownload, mempool.CatVersionFieldValue)
schema.WriteMempoolTx(memR.evCollector, e.Src.ID(), tx, schema.TransferTypeDownload, schema.CatVersionFieldValue)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
Expand Down Expand Up @@ -253,12 +254,12 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// 3. If we recently evicted the tx and still don't have space for it, we do nothing.
// 4. Else, we request the transaction from that peer.
case *protomem.SeenTx:
mempool.WriteStateTracingPoint(
schema.WriteMempoolPeerState(
memR.evCollector,
e.Src.ID(),
mempool.SeenTxStateUpdateFieldValue,
mempool.TransferTypeDownload,
mempool.CatVersionFieldValue,
schema.SeenTxStateUpdateFieldValue,
schema.TransferTypeDownload,
schema.CatVersionFieldValue,
)
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
Expand Down Expand Up @@ -287,12 +288,12 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// A peer is requesting a transaction that we have claimed to have. Find the specified
// transaction and broadcast it to the peer. We may no longer have the transaction
case *protomem.WantTx:
mempool.WriteStateTracingPoint(
schema.WriteMempoolPeerState(
memR.evCollector,
e.Src.ID(),
mempool.WantTxStateUpdateFieldValue,
mempool.TransferTypeDownload,
mempool.CatVersionFieldValue,
schema.WantTxStateUpdateFieldValue,
schema.TransferTypeDownload,
schema.CatVersionFieldValue,
)
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
Expand All @@ -303,12 +304,12 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
tx, has := memR.mempool.Get(txKey)
if has && !memR.opts.ListenOnly {
peerID := memR.ids.GetIDForPeer(e.Src.ID())
mempool.WriteTxTracingPoint(
schema.WriteMempoolTx(
memR.evCollector,
e.Src.ID(),
msg.TxKey,
mempool.TransferTypeUpload,
mempool.CatVersionFieldValue,
schema.TransferTypeUpload,
schema.CatVersionFieldValue,
)
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)
if p2p.SendEnvelopeShim(e.Src, p2p.Envelope{ //nolint:staticcheck
Expand Down
13 changes: 7 additions & 6 deletions mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)
Expand Down Expand Up @@ -164,12 +165,12 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
switch msg := e.Message.(type) {
case *protomem.Txs:
for _, tx := range msg.Txs {
mempool.WriteTxTracingPoint(
schema.WriteMempoolTx(
memR.evCollector,
e.Src.ID(),
tx,
mempool.TransferTypeDownload,
mempool.V1VersionFieldValue,
schema.TransferTypeDownload,
schema.V1VersionFieldValue,
)
}
protoTxs := msg.GetTxs()
Expand Down Expand Up @@ -282,12 +283,12 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
}
mempool.WriteTxTracingPoint(
schema.WriteMempoolTx(
memR.evCollector,
peer.ID(),
memTx.tx,
mempool.TransferTypeUpload,
mempool.V1VersionFieldValue,
schema.TransferTypeUpload,
schema.V1VersionFieldValue,
)
}

Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func createConsensusReactor(config *cfg.Config,
mempool,
evidencePool,
cs.StateMetrics(csMetrics),
cs.SetEventCollector(evCollector),
cs.SetTraceClient(evCollector),
)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
Expand Down
61 changes: 51 additions & 10 deletions pkg/trace/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ example, we're pushing a point in the consensus reactor to measure exactly when
each step of consensus is reached for each node.

```go
if cs.eventCollector.IsCollecting() {
cs.eventCollector.WritePoint("consensus", map[string]interface{}{
"roundData": []interface{}{rs.Height, rs.Round, rs.Step},
})
}
client.WritePoint(RoundStateTable, map[string]interface{}{
HeightFieldKey: height,
RoundFieldKey: round,
StepFieldKey: step.String(),
})
```

Using this method enforces the typical schema, where we are tagging (aka
indexing) each point by the chain-id and the node-id, then adding the local time
of the creation of the event. If you need to push a custom point, you can use
the underlying client directly. See influxdb2.WriteAPI for more details.
the underlying client directly. See `influxdb2.WriteAPI` for more details.

### Schema

Expand All @@ -40,19 +40,54 @@ node.
from(bucket: "e2e")
|> range(start: -1h)
|> filter(
fn: (r) => r["_measurement"] == "consensus"
fn: (r) => r["_measurement"] == "consensus_round_state"
and r.chain_id == "ci-YREG8X"
and r.node_id == "0b529c309608172a29c49979394734260b42acfb"
)
```

We can easily retrieve all fields in a relatively standard table format by using
the pivot `fluxQL` command.

```flux
from(bucket: "mocha")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "consensus_round_state")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
```

### Querying Data Using Python

Python can be used to quickly search for and isolate specific patterns.

```python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://your-influx-url:8086/", token="your-influx-token", org="celestia")

query_api = client.query_api()

def create_flux_table_query(start, bucket, measurement, filter_clause):
flux_table_query = f'''
from(bucket: "{bucket}")
|> range(start: {start})
|> filter(fn: (r) => r._measurement == "{measurement}")
{filter_clause}
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
return flux_table_query

query = create_flux_table_query("-1h", "mocha", "consenus_round_state", "")
result = query_api.query(query=query)
```

### Running a node with remote tracing on

Tracing will only occur if an influxdb URL in specified either directly in the
`config.toml` or as flags provided to the start sub command.

configure in the config.toml
#### Configure in the `config.toml`

```toml
#######################################################
Expand All @@ -62,7 +97,7 @@ configure in the config.toml

...

# The URL of the influxdb instance to use for remote event
# The URL of the influxdb instance to use for remote event
# collection. If empty, remote event collection is disabled.
influx_url = "http://your-influx-ip:8086/"

Expand All @@ -77,9 +112,15 @@ influx_org = "celestia"

# The size of the batches that are sent to the database.
influx_batch_size = 20

# The list of tables that are updated when tracing. All available tables and
# their schema can be found in the pkg/trace/schema package.
influx_tables = ["consensus_round_state", "mempool_tx", ]

```

or
or

```sh
celestia-appd start --influxdb-url=http://your-influx-ip:8086/ --influxdb-token="your-token"
```
Expand Down
16 changes: 3 additions & 13 deletions pkg/trace/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ const (
ChainIDTag = "chain_id"
)

// EventCollectorConfig is the influxdb client configuration used for
// ClientConfigConfig is the influxdb client configuration used for
// collecting events.
type EventCollectorConfig struct {
type ClientConfigConfig struct {
// URL is the influxdb url.
URL string `mapstructure:"influx_url"`
// Token is the influxdb token.
Expand All @@ -31,16 +31,6 @@ type EventCollectorConfig struct {
BatchSize int `mapstructure:"influx_batch_size"`
}

// DefaultEventCollectorConfig returns the default configuration.
func DefaultEventCollectorConfig() EventCollectorConfig {
return EventCollectorConfig{
URL: "",
Org: "celestia",
Bucket: "e2e",
BatchSize: 10,
}
}

// Client is an influxdb client that can be used to push events to influxdb. It
// is used to collect trace data from many different nodes in a network. If
// there is no URL in the config.toml, then the underlying client is nil and no
Expand Down Expand Up @@ -91,7 +81,7 @@ func NewClient(cfg *config.InstrumentationConfig, logger log.Logger, chainID, no
cancel: cancel,
chainID: chainID,
nodeID: nodeID,
tables: sliceToMap(cfg.TracingTables),
tables: sliceToMap(cfg.InfluxTables),
}
if cfg == nil || cfg.InfluxURL == "" {
return cli, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/trace/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ each step of consensus is reached for each node.
```go
if cs.eventCollector.IsCollecting() {
cs.eventCollector.WritePoint("consensus", map[string]interface{}{
if cs.traceClient.IsCollecting() {
cs.traceClient.WritePoint("consensus", map[string]interface{}{
"roundData": []interface{}{rs.Height, rs.Round, rs.Step},
})
}
Expand Down
Loading

0 comments on commit b918d57

Please sign in to comment.