diff --git a/.github/workflows/publish_antithesis_images.yml b/.github/workflows/publish_antithesis_images.yml index 0932c2f971b7..bdc8ce087344 100644 --- a/.github/workflows/publish_antithesis_images.yml +++ b/.github/workflows/publish_antithesis_images.yml @@ -30,7 +30,9 @@ jobs: password: ${{ secrets.ANTITHESIS_GAR_JSON_KEY }} - name: Set the Go version in the environment - uses: ./.github/actions/set-go-version-in-env + # Need an exact version vs the range (~x.x.x) provided by set-go-version-in-env action + run: echo GO_VERSION="$(go list -m -f '{{.GoVersion}}')" >> $GITHUB_ENV + shell: bash - name: Build node id: build-node-image diff --git a/database/meterdb/db.go b/database/meterdb/db.go index fd3b3b77d7a8..fe3165637494 100644 --- a/database/meterdb/db.go +++ b/database/meterdb/db.go @@ -5,88 +5,191 @@ package meterdb import ( "context" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/ava-labs/avalanchego/database" - "github.com/ava-labs/avalanchego/utils/timer/mockable" + "github.com/ava-labs/avalanchego/utils" ) +const methodLabel = "method" + var ( _ database.Database = (*Database)(nil) _ database.Batch = (*batch)(nil) _ database.Iterator = (*iterator)(nil) + + methodLabels = []string{methodLabel} + hasLabel = prometheus.Labels{ + methodLabel: "has", + } + getLabel = prometheus.Labels{ + methodLabel: "get", + } + putLabel = prometheus.Labels{ + methodLabel: "put", + } + deleteLabel = prometheus.Labels{ + methodLabel: "delete", + } + newBatchLabel = prometheus.Labels{ + methodLabel: "new_batch", + } + newIteratorLabel = prometheus.Labels{ + methodLabel: "new_iterator", + } + compactLabel = prometheus.Labels{ + methodLabel: "compact", + } + closeLabel = prometheus.Labels{ + methodLabel: "close", + } + healthCheckLabel = prometheus.Labels{ + methodLabel: "health_check", + } + batchPutLabel = prometheus.Labels{ + methodLabel: "batch_put", + } + batchDeleteLabel = prometheus.Labels{ + methodLabel: "batch_delete", + } + batchSizeLabel = prometheus.Labels{ + methodLabel: "batch_size", + } + batchWriteLabel = prometheus.Labels{ + methodLabel: "batch_write", + } + batchResetLabel = prometheus.Labels{ + methodLabel: "batch_reset", + } + batchReplayLabel = prometheus.Labels{ + methodLabel: "batch_replay", + } + batchInnerLabel = prometheus.Labels{ + methodLabel: "batch_inner", + } + iteratorNextLabel = prometheus.Labels{ + methodLabel: "iterator_next", + } + iteratorErrorLabel = prometheus.Labels{ + methodLabel: "iterator_error", + } + iteratorKeyLabel = prometheus.Labels{ + methodLabel: "iterator_key", + } + iteratorValueLabel = prometheus.Labels{ + methodLabel: "iterator_value", + } + iteratorReleaseLabel = prometheus.Labels{ + methodLabel: "iterator_release", + } ) // Database tracks the amount of time each operation takes and how many bytes // are read/written to the underlying database instance. type Database struct { - metrics - db database.Database - clock mockable.Clock + db database.Database + + calls *prometheus.CounterVec + duration *prometheus.CounterVec + size *prometheus.CounterVec } // New returns a new database with added metrics func New( namespace string, - registerer prometheus.Registerer, + reg prometheus.Registerer, db database.Database, ) (*Database, error) { - metrics, err := newMetrics(namespace, registerer) - return &Database{ - metrics: metrics, - db: db, - }, err + meterDB := &Database{ + db: db, + calls: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "calls", + Help: "number of calls to the database", + }, + methodLabels, + ), + duration: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "duration", + Help: "time spent in database calls (ns)", + }, + methodLabels, + ), + size: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "size", + Help: "size of data passed in database calls", + }, + methodLabels, + ), + } + return meterDB, utils.Err( + reg.Register(meterDB.calls), + reg.Register(meterDB.duration), + reg.Register(meterDB.size), + ) } func (db *Database) Has(key []byte) (bool, error) { - start := db.clock.Time() + start := time.Now() has, err := db.db.Has(key) - end := db.clock.Time() - db.readSize.Observe(float64(len(key))) - db.has.Observe(float64(end.Sub(start))) - db.hasSize.Observe(float64(len(key))) + duration := time.Since(start) + + db.calls.With(hasLabel).Inc() + db.duration.With(hasLabel).Add(float64(duration)) + db.size.With(hasLabel).Add(float64(len(key))) return has, err } func (db *Database) Get(key []byte) ([]byte, error) { - start := db.clock.Time() + start := time.Now() value, err := db.db.Get(key) - end := db.clock.Time() - db.readSize.Observe(float64(len(key) + len(value))) - db.get.Observe(float64(end.Sub(start))) - db.getSize.Observe(float64(len(key) + len(value))) + duration := time.Since(start) + + db.calls.With(getLabel).Inc() + db.duration.With(getLabel).Add(float64(duration)) + db.size.With(getLabel).Add(float64(len(key) + len(value))) return value, err } func (db *Database) Put(key, value []byte) error { - start := db.clock.Time() + start := time.Now() err := db.db.Put(key, value) - end := db.clock.Time() - db.writeSize.Observe(float64(len(key) + len(value))) - db.put.Observe(float64(end.Sub(start))) - db.putSize.Observe(float64(len(key) + len(value))) + duration := time.Since(start) + + db.calls.With(putLabel).Inc() + db.duration.With(putLabel).Add(float64(duration)) + db.size.With(putLabel).Add(float64(len(key) + len(value))) return err } func (db *Database) Delete(key []byte) error { - start := db.clock.Time() + start := time.Now() err := db.db.Delete(key) - end := db.clock.Time() - db.writeSize.Observe(float64(len(key))) - db.delete.Observe(float64(end.Sub(start))) - db.deleteSize.Observe(float64(len(key))) + duration := time.Since(start) + + db.calls.With(deleteLabel).Inc() + db.duration.With(deleteLabel).Add(float64(duration)) + db.size.With(deleteLabel).Add(float64(len(key))) return err } func (db *Database) NewBatch() database.Batch { - start := db.clock.Time() + start := time.Now() b := &batch{ batch: db.db.NewBatch(), db: db, } - end := db.clock.Time() - db.newBatch.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + db.calls.With(newBatchLabel).Inc() + db.duration.With(newBatchLabel).Add(float64(duration)) return b } @@ -106,37 +209,45 @@ func (db *Database) NewIteratorWithStartAndPrefix( start, prefix []byte, ) database.Iterator { - startTime := db.clock.Time() + startTime := time.Now() it := &iterator{ iterator: db.db.NewIteratorWithStartAndPrefix(start, prefix), db: db, } - end := db.clock.Time() - db.newIterator.Observe(float64(end.Sub(startTime))) + duration := time.Since(startTime) + + db.calls.With(newIteratorLabel).Inc() + db.duration.With(newIteratorLabel).Add(float64(duration)) return it } func (db *Database) Compact(start, limit []byte) error { - startTime := db.clock.Time() + startTime := time.Now() err := db.db.Compact(start, limit) - end := db.clock.Time() - db.compact.Observe(float64(end.Sub(startTime))) + duration := time.Since(startTime) + + db.calls.With(compactLabel).Inc() + db.duration.With(compactLabel).Add(float64(duration)) return err } func (db *Database) Close() error { - start := db.clock.Time() + start := time.Now() err := db.db.Close() - end := db.clock.Time() - db.close.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + db.calls.With(closeLabel).Inc() + db.duration.With(closeLabel).Add(float64(duration)) return err } func (db *Database) HealthCheck(ctx context.Context) (interface{}, error) { - start := db.clock.Time() + start := time.Now() result, err := db.db.HealthCheck(ctx) - end := db.clock.Time() - db.healthCheck.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + db.calls.With(healthCheckLabel).Inc() + db.duration.With(healthCheckLabel).Add(float64(duration)) return result, err } @@ -146,62 +257,75 @@ type batch struct { } func (b *batch) Put(key, value []byte) error { - start := b.db.clock.Time() + start := time.Now() err := b.batch.Put(key, value) - end := b.db.clock.Time() - b.db.bPut.Observe(float64(end.Sub(start))) - b.db.bPutSize.Observe(float64(len(key) + len(value))) + duration := time.Since(start) + + b.db.calls.With(batchPutLabel).Inc() + b.db.duration.With(batchPutLabel).Add(float64(duration)) + b.db.size.With(batchPutLabel).Add(float64(len(key) + len(value))) return err } func (b *batch) Delete(key []byte) error { - start := b.db.clock.Time() + start := time.Now() err := b.batch.Delete(key) - end := b.db.clock.Time() - b.db.bDelete.Observe(float64(end.Sub(start))) - b.db.bDeleteSize.Observe(float64(len(key))) + duration := time.Since(start) + + b.db.calls.With(batchDeleteLabel).Inc() + b.db.duration.With(batchDeleteLabel).Add(float64(duration)) + b.db.size.With(batchDeleteLabel).Add(float64(len(key))) return err } func (b *batch) Size() int { - start := b.db.clock.Time() + start := time.Now() size := b.batch.Size() - end := b.db.clock.Time() - b.db.bSize.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + b.db.calls.With(batchSizeLabel).Inc() + b.db.duration.With(batchSizeLabel).Add(float64(duration)) return size } func (b *batch) Write() error { - start := b.db.clock.Time() + start := time.Now() err := b.batch.Write() - end := b.db.clock.Time() - batchSize := float64(b.batch.Size()) - b.db.writeSize.Observe(batchSize) - b.db.bWrite.Observe(float64(end.Sub(start))) - b.db.bWriteSize.Observe(batchSize) + duration := time.Since(start) + size := b.batch.Size() + + b.db.calls.With(batchWriteLabel).Inc() + b.db.duration.With(batchWriteLabel).Add(float64(duration)) + b.db.size.With(batchWriteLabel).Add(float64(size)) return err } func (b *batch) Reset() { - start := b.db.clock.Time() + start := time.Now() b.batch.Reset() - end := b.db.clock.Time() - b.db.bReset.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + b.db.calls.With(batchResetLabel).Inc() + b.db.duration.With(batchResetLabel).Add(float64(duration)) } func (b *batch) Replay(w database.KeyValueWriterDeleter) error { - start := b.db.clock.Time() + start := time.Now() err := b.batch.Replay(w) - end := b.db.clock.Time() - b.db.bReplay.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + b.db.calls.With(batchReplayLabel).Inc() + b.db.duration.With(batchReplayLabel).Add(float64(duration)) return err } func (b *batch) Inner() database.Batch { - start := b.db.clock.Time() + start := time.Now() inner := b.batch.Inner() - end := b.db.clock.Time() - b.db.bInner.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + b.db.calls.With(batchInnerLabel).Inc() + b.db.duration.With(batchInnerLabel).Add(float64(duration)) return inner } @@ -211,43 +335,52 @@ type iterator struct { } func (it *iterator) Next() bool { - start := it.db.clock.Time() + start := time.Now() next := it.iterator.Next() - end := it.db.clock.Time() - it.db.iNext.Observe(float64(end.Sub(start))) - size := float64(len(it.iterator.Key()) + len(it.iterator.Value())) - it.db.readSize.Observe(size) - it.db.iNextSize.Observe(size) + duration := time.Since(start) + size := len(it.iterator.Key()) + len(it.iterator.Value()) + + it.db.calls.With(iteratorNextLabel).Inc() + it.db.duration.With(iteratorNextLabel).Add(float64(duration)) + it.db.size.With(iteratorNextLabel).Add(float64(size)) return next } func (it *iterator) Error() error { - start := it.db.clock.Time() + start := time.Now() err := it.iterator.Error() - end := it.db.clock.Time() - it.db.iError.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + it.db.calls.With(iteratorErrorLabel).Inc() + it.db.duration.With(iteratorErrorLabel).Add(float64(duration)) return err } func (it *iterator) Key() []byte { - start := it.db.clock.Time() + start := time.Now() key := it.iterator.Key() - end := it.db.clock.Time() - it.db.iKey.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + it.db.calls.With(iteratorKeyLabel).Inc() + it.db.duration.With(iteratorKeyLabel).Add(float64(duration)) return key } func (it *iterator) Value() []byte { - start := it.db.clock.Time() + start := time.Now() value := it.iterator.Value() - end := it.db.clock.Time() - it.db.iValue.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + it.db.calls.With(iteratorValueLabel).Inc() + it.db.duration.With(iteratorValueLabel).Add(float64(duration)) return value } func (it *iterator) Release() { - start := it.db.clock.Time() + start := time.Now() it.iterator.Release() - end := it.db.clock.Time() - it.db.iRelease.Observe(float64(end.Sub(start))) + duration := time.Since(start) + + it.db.calls.With(iteratorReleaseLabel).Inc() + it.db.duration.With(iteratorReleaseLabel).Add(float64(duration)) } diff --git a/database/meterdb/metrics.go b/database/meterdb/metrics.go deleted file mode 100644 index f311607cd467..000000000000 --- a/database/meterdb/metrics.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package meterdb - -import ( - "fmt" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/ava-labs/avalanchego/utils/metric" - "github.com/ava-labs/avalanchego/utils/wrappers" -) - -func newSizeMetric(namespace, name string, reg prometheus.Registerer, errs *wrappers.Errs) metric.Averager { - return metric.NewAveragerWithErrs( - namespace, - name+"_size", - fmt.Sprintf("bytes passed in a %s call", name), - reg, - errs, - ) -} - -func newTimeMetric(namespace, name string, reg prometheus.Registerer, errs *wrappers.Errs) metric.Averager { - return metric.NewAveragerWithErrs( - namespace, - name, - "time (in ns) of a "+name, - reg, - errs, - ) -} - -type metrics struct { - readSize, - writeSize, - has, hasSize, - get, getSize, - put, putSize, - delete, deleteSize, - newBatch, - newIterator, - compact, - close, - healthCheck, - bPut, bPutSize, - bDelete, bDeleteSize, - bSize, - bWrite, bWriteSize, - bReset, - bReplay, - bInner, - iNext, iNextSize, - iError, - iKey, - iValue, - iRelease metric.Averager -} - -func newMetrics(namespace string, reg prometheus.Registerer) (metrics, error) { - errs := wrappers.Errs{} - return metrics{ - readSize: newSizeMetric(namespace, "read", reg, &errs), - writeSize: newSizeMetric(namespace, "write", reg, &errs), - has: newTimeMetric(namespace, "has", reg, &errs), - hasSize: newSizeMetric(namespace, "has", reg, &errs), - get: newTimeMetric(namespace, "get", reg, &errs), - getSize: newSizeMetric(namespace, "get", reg, &errs), - put: newTimeMetric(namespace, "put", reg, &errs), - putSize: newSizeMetric(namespace, "put", reg, &errs), - delete: newTimeMetric(namespace, "delete", reg, &errs), - deleteSize: newSizeMetric(namespace, "delete", reg, &errs), - newBatch: newTimeMetric(namespace, "new_batch", reg, &errs), - newIterator: newTimeMetric(namespace, "new_iterator", reg, &errs), - compact: newTimeMetric(namespace, "compact", reg, &errs), - close: newTimeMetric(namespace, "close", reg, &errs), - healthCheck: newTimeMetric(namespace, "health_check", reg, &errs), - bPut: newTimeMetric(namespace, "batch_put", reg, &errs), - bPutSize: newSizeMetric(namespace, "batch_put", reg, &errs), - bDelete: newTimeMetric(namespace, "batch_delete", reg, &errs), - bDeleteSize: newSizeMetric(namespace, "batch_delete", reg, &errs), - bSize: newTimeMetric(namespace, "batch_size", reg, &errs), - bWrite: newTimeMetric(namespace, "batch_write", reg, &errs), - bWriteSize: newSizeMetric(namespace, "batch_write", reg, &errs), - bReset: newTimeMetric(namespace, "batch_reset", reg, &errs), - bReplay: newTimeMetric(namespace, "batch_replay", reg, &errs), - bInner: newTimeMetric(namespace, "batch_inner", reg, &errs), - iNext: newTimeMetric(namespace, "iterator_next", reg, &errs), - iNextSize: newSizeMetric(namespace, "iterator_next", reg, &errs), - iError: newTimeMetric(namespace, "iterator_error", reg, &errs), - iKey: newTimeMetric(namespace, "iterator_key", reg, &errs), - iValue: newTimeMetric(namespace, "iterator_value", reg, &errs), - iRelease: newTimeMetric(namespace, "iterator_release", reg, &errs), - }, errs.Err -} diff --git a/database/prefixdb/db.go b/database/prefixdb/db.go index 0e203653acc1..b3082d9e986e 100644 --- a/database/prefixdb/db.go +++ b/database/prefixdb/db.go @@ -23,7 +23,9 @@ var ( // a unique value. type Database struct { // All keys in this db begin with this byte slice - dbPrefix []byte + dbPrefix []byte + // Lexically one greater than dbPrefix, defining the end of this db's key range + dbLimit []byte bufferPool *utils.BytesPool // lock needs to be held during Close to guarantee db will not be set to nil @@ -37,11 +39,25 @@ type Database struct { func newDB(prefix []byte, db database.Database) *Database { return &Database{ dbPrefix: prefix, + dbLimit: incrementByteSlice(prefix), db: db, bufferPool: utils.NewBytesPool(), } } +func incrementByteSlice(orig []byte) []byte { + n := len(orig) + buf := make([]byte, n) + copy(buf, orig) + for i := n - 1; i >= 0; i-- { + buf[i]++ + if buf[i] != 0 { + break + } + } + return buf +} + // New returns a new prefixed database func New(prefix []byte, db database.Database) *Database { if prefixDB, ok := db.(*Database); ok { @@ -189,6 +205,9 @@ func (db *Database) Compact(start, limit []byte) error { prefixedStart := db.prefix(start) defer db.bufferPool.Put(prefixedStart) + if limit == nil { + return db.db.Compact(*prefixedStart, db.dbLimit) + } prefixedLimit := db.prefix(limit) defer db.bufferPool.Put(prefixedLimit) diff --git a/database/prefixdb/db_test.go b/database/prefixdb/db_test.go index f928d2f635a4..82b801f22e8f 100644 --- a/database/prefixdb/db_test.go +++ b/database/prefixdb/db_test.go @@ -7,6 +7,8 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/require" + "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" ) @@ -25,6 +27,15 @@ func TestInterface(t *testing.T) { } } +func TestPrefixLimit(t *testing.T) { + testString := []string{"hello", "world", "a\xff", "\x01\xff\xff\xff\xff"} + expected := []string{"hellp", "worle", "b\x00", "\x02\x00\x00\x00\x00"} + for i, str := range testString { + db := newDB([]byte(str), nil) + require.Equal(t, db.dbLimit, []byte(expected[i])) + } +} + func FuzzKeyValue(f *testing.F) { database.FuzzKeyValue(f, New([]byte(""), memdb.New())) } diff --git a/go.mod b/go.mod index 6babb1067ba9..173663337957 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ go 1.21.9 require ( github.com/DataDog/zstd v1.5.2 github.com/NYTimes/gziphandler v1.1.1 - github.com/ava-labs/coreth v0.13.3-rc.2 + github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557 github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cockroachdb/pebble v0.0.0-20230906160148-46873a6a7a06 diff --git a/go.sum b/go.sum index ee53bd2b070c..41df2f526573 100644 --- a/go.sum +++ b/go.sum @@ -60,8 +60,8 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/coreth v0.13.3-rc.2 h1:lhyQwln6at1DTs1O586dMSAtGtSfQWlt2WH+Z2kgYdQ= -github.com/ava-labs/coreth v0.13.3-rc.2/go.mod h1:4l15XGak3FklhIb7CtlC/1YVwGAfMl83R2zd2N0hNE0= +github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557 h1:92JWd4u2pqpO551gXUIZ/qDZu3l7vn8jIxX2qRyyFwM= +github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557/go.mod h1:yMIxezDyB/5moKt8LlATlfwR/Z5cmipY3gUQ1SqHvQ0= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= diff --git a/message/messages.go b/message/messages.go index a1d0601d4988..05220222dda7 100644 --- a/message/messages.go +++ b/message/messages.go @@ -9,23 +9,32 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/proto/pb/p2p" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/compression" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/timer/mockable" - "github.com/ava-labs/avalanchego/utils/wrappers" +) + +const ( + typeLabel = "type" + opLabel = "op" + directionLabel = "direction" + + compressionLabel = "compression" + decompressionLabel = "decompression" ) var ( _ InboundMessage = (*inboundMessage)(nil) _ OutboundMessage = (*outboundMessage)(nil) + metricLabels = []string{typeLabel, opLabel, directionLabel} + errUnknownCompressionType = errors.New("message is compressed with an unknown compression type") ) @@ -131,9 +140,9 @@ func (m *outboundMessage) BytesSavedCompression() int { type msgBuilder struct { log logging.Logger - zstdCompressor compression.Compressor - zstdCompressTimeMetrics map[Op]metric.Averager - zstdDecompressTimeMetrics map[Op]metric.Averager + zstdCompressor compression.Compressor + count *prometheus.CounterVec // type + op + direction + duration *prometheus.CounterVec // type + op + direction maxMessageTimeout time.Duration } @@ -152,31 +161,30 @@ func newMsgBuilder( mb := &msgBuilder{ log: log, - zstdCompressor: zstdCompressor, - zstdCompressTimeMetrics: make(map[Op]metric.Averager, len(ExternalOps)), - zstdDecompressTimeMetrics: make(map[Op]metric.Averager, len(ExternalOps)), + zstdCompressor: zstdCompressor, + count: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "compressed_count", + Help: "number of compressed messages", + }, + metricLabels, + ), + duration: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "compressed_duration", + Help: "time spent handling compressed messages", + }, + metricLabels, + ), maxMessageTimeout: maxMessageTimeout, } - - errs := wrappers.Errs{} - for _, op := range ExternalOps { - mb.zstdCompressTimeMetrics[op] = metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("zstd_%s_compress_time", op), - fmt.Sprintf("time (in ns) to compress %s messages with zstd", op), - metrics, - &errs, - ) - mb.zstdDecompressTimeMetrics[op] = metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("zstd_%s_decompress_time", op), - fmt.Sprintf("time (in ns) to decompress %s messages with zstd", op), - metrics, - &errs, - ) - } - return mb, errs.Err + return mb, utils.Err( + metrics.Register(mb.count), + metrics.Register(mb.duration), + ) } func (mb *msgBuilder) marshal( @@ -200,9 +208,8 @@ func (mb *msgBuilder) marshal( // This recursive packing allows us to avoid an extra compression on/off // field in the message. var ( - startTime = time.Now() - compressedMsg p2p.Message - opToCompressTimeMetrics map[Op]metric.Averager + startTime = time.Now() + compressedMsg p2p.Message ) switch compressionType { case compression.TypeNone: @@ -217,7 +224,6 @@ func (mb *msgBuilder) marshal( CompressedZstd: compressedBytes, }, } - opToCompressTimeMetrics = mb.zstdCompressTimeMetrics default: return nil, 0, 0, errUnknownCompressionType } @@ -228,15 +234,13 @@ func (mb *msgBuilder) marshal( } compressTook := time.Since(startTime) - if compressTimeMetric, ok := opToCompressTimeMetrics[op]; ok { - compressTimeMetric.Observe(float64(compressTook)) - } else { - // Should never happen - mb.log.Warn("no compression metric found for op", - zap.Stringer("op", op), - zap.Stringer("compressionType", compressionType), - ) + labels := prometheus.Labels{ + typeLabel: compressionType.String(), + opLabel: op.String(), + directionLabel: compressionLabel, } + mb.count.With(labels).Inc() + mb.duration.With(labels).Add(float64(compressTook)) bytesSaved := len(uncompressedMsgBytes) - len(compressedMsgBytes) return compressedMsgBytes, bytesSaved, op, nil @@ -250,14 +254,12 @@ func (mb *msgBuilder) unmarshal(b []byte) (*p2p.Message, int, Op, error) { // Figure out what compression type, if any, was used to compress the message. var ( - opToDecompressTimeMetrics map[Op]metric.Averager - compressor compression.Compressor - compressedBytes []byte - zstdCompressed = m.GetCompressedZstd() + compressor compression.Compressor + compressedBytes []byte + zstdCompressed = m.GetCompressedZstd() ) switch { case len(zstdCompressed) > 0: - opToDecompressTimeMetrics = mb.zstdDecompressTimeMetrics compressor = mb.zstdCompressor compressedBytes = zstdCompressed default: @@ -284,14 +286,14 @@ func (mb *msgBuilder) unmarshal(b []byte) (*p2p.Message, int, Op, error) { if err != nil { return nil, 0, 0, err } - if decompressTimeMetric, ok := opToDecompressTimeMetrics[op]; ok { - decompressTimeMetric.Observe(float64(decompressTook)) - } else { - // Should never happen - mb.log.Warn("no decompression metric found for op", - zap.Stringer("op", op), - ) + + labels := prometheus.Labels{ + typeLabel: compression.TypeZstd.String(), + opLabel: op.String(), + directionLabel: decompressionLabel, } + mb.count.With(labels).Inc() + mb.duration.With(labels).Add(float64(decompressTook)) return m, bytesSavedCompression, op, nil } diff --git a/vms/components/avax/metadata.go b/vms/components/avax/metadata.go deleted file mode 100644 index 1630484131a8..000000000000 --- a/vms/components/avax/metadata.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package avax - -import ( - "errors" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/hashing" - "github.com/ava-labs/avalanchego/vms/components/verify" -) - -var ( - errNilMetadata = errors.New("nil metadata is not valid") - errMetadataNotInitialize = errors.New("metadata was never initialized and is not valid") - - _ verify.Verifiable = (*Metadata)(nil) -) - -// TODO: Delete this once the downstream dependencies have been updated. -type Metadata struct { - id ids.ID // The ID of this data - unsignedBytes []byte // Unsigned byte representation of this data - bytes []byte // Byte representation of this data -} - -// Initialize set the bytes and ID -func (md *Metadata) Initialize(unsignedBytes, bytes []byte) { - md.id = hashing.ComputeHash256Array(bytes) - md.unsignedBytes = unsignedBytes - md.bytes = bytes -} - -// ID returns the unique ID of this data -func (md *Metadata) ID() ids.ID { - return md.id -} - -// UnsignedBytes returns the unsigned binary representation of this data -func (md *Metadata) Bytes() []byte { - return md.unsignedBytes -} - -// Bytes returns the binary representation of this data -func (md *Metadata) SignedBytes() []byte { - return md.bytes -} - -func (md *Metadata) Verify() error { - switch { - case md == nil: - return errNilMetadata - case md.id == ids.Empty: - return errMetadataNotInitialize - default: - return nil - } -} diff --git a/vms/components/avax/metadata_test.go b/vms/components/avax/metadata_test.go deleted file mode 100644 index 9569e3e3a465..000000000000 --- a/vms/components/avax/metadata_test.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package avax - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestMetaDataVerifyNil(t *testing.T) { - md := (*Metadata)(nil) - err := md.Verify() - require.ErrorIs(t, err, errNilMetadata) -} - -func TestMetaDataVerifyUninitialized(t *testing.T) { - md := &Metadata{} - err := md.Verify() - require.ErrorIs(t, err, errMetadataNotInitialize) -}