Skip to content

Commit

Permalink
app: generic consensus debugger (#3301)
Browse files Browse the repository at this point in the history
As part of the larger consensus abstraction work, the QBFT messages debugger has been reworked into a generic consensus messages debugger (except for the wire representation). This eliminates QBFT-specific details that leaked out of the consensus/qbft packages.

Breaking change: the debug endpoint `/debug/qbft` has been renamed to `/debug/consensus`, and the name of the downloaded file has changed from `qbft_messages.pb.gz` to `consensus_messages.pb.gz`.
We are not aware of any systems or users actively using or relying on that endpoint in production.

category: refactor
ticket: #3299
  • Loading branch information
pinebit authored Sep 25, 2024
1 parent 11f64c1 commit 8876c3c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 37 deletions.
10 changes: 5 additions & 5 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func Run(ctx context.Context, conf Config) (err error) {

wirePeerInfo(life, tcpNode, peerIDs, cluster.GetInitialMutationHash(), sender, conf.BuilderAPI)

qbftDebug := newQBFTDebugger()
consensusDebugger := consensus.NewDebugger()

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)
Expand All @@ -282,10 +282,10 @@ func Run(ctx context.Context, conf Config) (err error) {
}

wireMonitoringAPI(ctx, life, conf.MonitoringAddr, conf.DebugAddr, tcpNode, eth2Cl, peerIDs,
promRegistry, qbftDebug, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))

err = wireCoreWorkflow(ctx, life, conf, cluster, nodeIdx, tcpNode, p2pKey, eth2Cl, subEth2Cl,
peerIDs, sender, qbftDebug.AddInstance, seenPubkeysFunc, vapiCallsFunc)
peerIDs, sender, consensusDebugger, seenPubkeysFunc, vapiCallsFunc)
if err != nil {
return err
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
cluster *manifestpb.Cluster, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *k1.PrivateKey,
eth2Cl, submissionEth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender,
qbftSniffer func(*pbv1.SniffedConsensusInstance), seenPubkeys func(core.PubKey),
consensusDebugger consensus.Debugger, seenPubkeys func(core.PubKey),
vapiCalls func(),
) error {
// Convert and prep public keys and public shares
Expand Down Expand Up @@ -530,7 +530,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
retryer := retry.New[core.Duty](deadlineFunc)

cons, startCons, err := newConsensus(cluster, tcpNode, p2pKey, sender,
deadlinerFunc("consensus"), gaterFunc, qbftSniffer)
deadlinerFunc("consensus"), gaterFunc, consensusDebugger.AddInstance)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions app/monitoringapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
// It serves prometheus metrics, pprof profiling and the runtime enr.
func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, promAddr, debugAddr string,
tcpNode host.Host, eth2Cl eth2wrap.Client,
peerIDs []peer.ID, registry *prometheus.Registry, qbftDebug http.Handler,
peerIDs []peer.ID, registry *prometheus.Registry, consensusDebugger http.Handler,
pubkeys []core.PubKey, seenPubkeys <-chan core.PubKey, vapiCalls <-chan struct{},
numValidators int,
) {
Expand Down Expand Up @@ -93,8 +93,8 @@ func wireMonitoringAPI(ctx context.Context, life *lifecycle.Manager, promAddr, d
if debugAddr != "" {
debugMux := http.NewServeMux()

// Serve sniffed qbft instances messages in gzipped protobuf format.
debugMux.Handle("/debug/qbft", qbftDebug)
// Serve sniffed consensus instances messages in gzipped protobuf format.
debugMux.Handle("/debug/consensus", consensusDebugger)

// Copied from net/http/pprof/pprof.go
debugMux.HandleFunc("/debug/pprof/", pprof.Index)
Expand Down
50 changes: 27 additions & 23 deletions app/qbftdebug.go → core/consensus/debugger.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © 2022-2024 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package app
package consensus

import (
"bytes"
Expand All @@ -16,77 +16,81 @@ import (
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
)

const maxQBFTDebugger = 50 * (1 << 20) // 50 MB.
const maxDebuggerBuffer = 50 * (1 << 20) // 50 MB.

// newQBFTDebugger returns a new qbftDebugger.
func newQBFTDebugger() *qbftDebugger {
// Debugger is an interface for debugging consensus messages.
// It collects sniffed consensus instances and exposes them over HTTP.
type Debugger interface {
// Embedding http.Handler lets a Debugger be registered directly on an HTTP mux;
// the implementation in this file responds with a gzipped protobuf download.
http.Handler

// AddInstance hands a sniffed consensus instance to the debugger for later serving.
AddInstance(instance *pbv1.SniffedConsensusInstance)
}

// NewDebugger returns a new debugger.
func NewDebugger() Debugger {
gitHash, _ := version.GitCommit()

return &qbftDebugger{
return &debugger{
gitHash: gitHash,
}
}

// qbftDebugger buffers up to 2MB worth of sniffed qbft messages in a fifo buffer serving them as a gzipped
// debugger buffers sniffed consensus messages in a fifo buffer serving them as a gzipped
// *pbv1.SniffedConsensusInstances protobuf on request.
type qbftDebugger struct {
type debugger struct {
gitHash string

mu sync.Mutex
totalSize int
sets []*pbv1.SniffedConsensusInstance
}

// AddInstance adds the instance to the fifo buffer, removing older messages if the max size is exceeded.
func (d *qbftDebugger) AddInstance(instance *pbv1.SniffedConsensusInstance) {
// AddInstance adds the instance to the fifo buffer, removing older messages if the capacity is exceeded.
func (d *debugger) AddInstance(instance *pbv1.SniffedConsensusInstance) {
d.mu.Lock()
defer d.mu.Unlock()

// getSize returns the size of the proto or false.
getSize := func(instance *pbv1.SniffedConsensusInstance) (int, bool) {
b, err := proto.Marshal(instance)
return len(b), err == nil
getSize := func(instance *pbv1.SniffedConsensusInstance) int {
return proto.Size(instance)
}

size, ok := getSize(instance)
if !ok {
return // Just drop this if we cannot calculate the size
}
size := getSize(instance)

d.totalSize += size
d.sets = append(d.sets, instance)

for d.totalSize > maxQBFTDebugger {
dropped, _ := getSize(d.sets[0]) // Ignoring ok is ok here since we got the size when we added it.
for d.totalSize > maxDebuggerBuffer {
dropped := getSize(d.sets[0])
d.totalSize -= dropped
d.sets = d.sets[1:]
}
}

// ServeHTTP serves sniffed qbft messages in a fifo buffer as a gzipped
// ServeHTTP serves sniffed consensus messages in a fifo buffer as a gzipped
// *pbv1.SniffedConsensusInstances protobuf.
func (d *qbftDebugger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (d *debugger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
b, err := d.getZippedProto()
if err != nil {
log.Warn(r.Context(), "Error serving qbft debug", err)
log.Warn(r.Context(), "Error serving consensus debug", err)
http.Error(w, "something went wrong, see logs", http.StatusInternalServerError)

return
}

w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", `attachment; filename="qbft_messages.pb.gz"`)
w.Header().Set("Content-Disposition", `attachment; filename="consensus_messages.pb.gz"`)
_, _ = w.Write(b)
}

// getZippedProto returns a gzipped serialised *pbv1.SniffedConsensusInstances protobuf of the fifo buffer.
func (d *qbftDebugger) getZippedProto() ([]byte, error) {
func (d *debugger) getZippedProto() ([]byte, error) {
d.mu.Lock()
b, err := proto.Marshal(&pbv1.SniffedConsensusInstances{
Instances: d.sets,
GitHash: d.gitHash,
})
d.mu.Unlock()

if err != nil {
return nil, errors.Wrap(err, "marshal proto")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © 2022-2024 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package app
package consensus

import (
"compress/gzip"
Expand All @@ -17,20 +17,21 @@ import (
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
)

func TestQBFTDebugger(t *testing.T) {
func TestDebugger(t *testing.T) {
var (
instances []*pbv1.SniffedConsensusInstance
debug = new(qbftDebugger)
debug = new(debugger)
)

for range 10 {
instance := &pbv1.SniffedConsensusInstance{
Msgs: []*pbv1.SniffedConsensusMsg{
{
Timestamp: timestamppb.Now(),
// Eventually the ConsensusMsg will be replaced by a more generic message type.
Msg: &pbv1.ConsensusMsg{
Msg: randomQBFTMessage(),
Justification: []*pbv1.QBFTMsg{randomQBFTMessage(), randomQBFTMessage()},
Msg: randomQBFTMsg(),
Justification: []*pbv1.QBFTMsg{randomQBFTMsg(), randomQBFTMsg()},
},
},
},
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestQBFTDebugger(t *testing.T) {
require.True(t, proto.Equal(&pbv1.SniffedConsensusInstances{Instances: instances}, resp))
}

func randomQBFTMessage() *pbv1.QBFTMsg {
func randomQBFTMsg() *pbv1.QBFTMsg {
return &pbv1.QBFTMsg{
Type: rand.Int63(),
Duty: &pbv1.Duty{Slot: rand.Uint64()},
Expand Down

0 comments on commit 8876c3c

Please sign in to comment.