diff --git a/go.mod b/go.mod
index 810b3dd..13bcac8 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.18
 
 require (
 	github.com/HdrHistogram/hdrhistogram-go v1.1.0
+	github.com/armon/go-metrics v0.3.10
 	github.com/benbjohnson/immutable v0.4.0
 	github.com/benmathews/bench v0.0.0-20210120214102-f7c75b9ef6e7
 	github.com/benmathews/hdrhistogram-writer v0.0.0-20210120211942-3cb1c7c33f95
@@ -13,13 +14,13 @@ require (
 	github.com/hashicorp/raft v1.3.10
 	github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0
 	github.com/hashicorp/raft-boltdb/v2 v2.2.2
+	github.com/ryboe/q v1.0.18
 	github.com/segmentio/fasthash v1.0.3
 	github.com/stretchr/testify v1.8.0
 	go.etcd.io/bbolt v1.3.6
 )
 
 require (
-	github.com/armon/go-metrics v0.3.10 // indirect
 	github.com/boltdb/bolt v1.3.1 // indirect
 	github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
 	github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf // indirect
@@ -28,9 +29,12 @@ require (
 	github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
 	github.com/hashicorp/go-msgpack v1.1.5 // indirect
 	github.com/hashicorp/golang-lru v0.5.4 // indirect
+	github.com/kr/pretty v0.3.1 // indirect
+	github.com/kr/text v0.2.0 // indirect
 	github.com/mattn/go-colorable v0.1.6 // indirect
 	github.com/mattn/go-isatty v0.0.12 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
 	golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect
 	golang.org/x/time v0.1.0 // indirect
diff --git a/go.sum b/go.sum
index dd9cea7..3e091bf 100644
--- a/go.sum
+++ b/go.sum
@@ -124,6 +124,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -158,6 +160,7 @@ github.com/nsqio/go-nsq v1.0.8/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQT
 github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
 github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -178,6 +181,10 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
 github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
 github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/ryboe/q v1.0.18 h1:uTonPt1eZjy7GSpB0XpYpsCvX+Yf9f+M4CUKuH2r+vg=
+github.com/ryboe/q v1.0.18/go.mod h1:elqvVf/GBuZHvZ9gvHv4MKM6NZAMz2rFajnTgQZ46wU=
 github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
 github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
diff --git a/metrics.go b/metrics.go
index 2f42838..a037b12 100644
--- a/metrics.go
+++ b/metrics.go
@@ -4,102 +4,75 @@ package wal
 
 import (
-	"sync/atomic"
+	"github.com/hashicorp/raft-wal/metrics"
 )
 
 var (
-	// All metrics are held in an array of uint64s accessed atomically. The names
-	// are defined in the list below. The order doesn't really matter for
-	// backwards compatibility since it is not persisted state although it might
-	// impact the order we print things. Why not a struct? This makes it easier to
-	// iterate them without reflection and stops us hard coding the same list in
-	// several places.
-	metrics = []string{
-		// COUNTERS
-
-		// LogEntryBytesWritten counts the bytes of log entry after encoding with
-		// Codec. Actual bytes written to disk might be slightly higher as it includes
-		// headers and index entries.
-		"log_entry_bytes_written",
-
-		// log_entries_written counts the number of entries written.
-		"log_entries_written",
-
-		// log_appends counts the number of calls to StoreLog(s) i.e. number of
-		// batches of entries appended.
-		"log_appends",
-
-		// log_entry_bytes_read counts the bytes of log entry read from segments before
-		// decoding. actual bytes read from disk might be higher as it includes
-		// headers and index entries and possible secondary reads for large entries
-		// that don't fit in buffers.
-		"log_entry_bytes_read",
-
-		// log_entries_read counts the number of calls to get_log.
-		"log_entries_read",
-
-		// segment_rotations counts how many times we move to a new segment file.
-		"segment_rotations",
-
-		// head_truncations counts how many log entries have been truncated from the
-		// head - i.e. the oldest entries. by graphing the rate of change over time
-		// you can see individual truncate calls as spikes.
-		"head_truncations",
-
-		// tail_truncations counts how many log entries have been truncated from the
-		// head - i.e. the newest entries. by graphing the rate of change over time
-		// you can see individual truncate calls as spikes.
-		"tail_truncations",
-
-		"stable_gets",
-		"stable_sets",
-
-		// gauges
-
-		// last_segment_age_seconds is a gauge that is set each time we rotate a segment
-		// and describes the number of seconds between when that segment file was
-		// first created and when it was sealed. this gives a rough estimate how
-		// quickly writes are filling the disk.
-		"last_segment_age_seconds",
+	// MetricDefinitions describes the metrics emitted by this library via the
+	// provided metrics.Collector implementation. It's public so that these can be
+	// registered during init with metrics clients that support pre-defining
+	// metrics.
+	MetricDefinitions = metrics.Definitions{
+		Counters: []metrics.Descriptor{
+			{
+				Name: "log_entry_bytes_written",
+				Desc: "log_entry_bytes_written counts the bytes of log entry after encoding" +
+					" with Codec. Actual bytes written to disk might be slightly higher as it" +
+					" includes headers and index entries.",
+			},
+			{
+				Name: "log_entries_written",
+				Desc: "log_entries_written counts the number of entries written.",
+			},
+			{
+				Name: "log_appends",
+				Desc: "log_appends counts the number of calls to StoreLog(s) i.e." +
+					" the number of batches of entries appended.",
+			},
+			{
+				Name: "log_entry_bytes_read",
+				Desc: "log_entry_bytes_read counts the bytes of log entry read from" +
+					" segments before decoding. Actual bytes read from disk might be higher" +
+					" as it includes headers and index entries and possible secondary reads" +
+					" for large entries that don't fit in buffers.",
+			},
+			{
+				Name: "log_entries_read",
+				Desc: "log_entries_read counts the number of calls to GetLog.",
+			},
+			{
+				Name: "segment_rotations",
+				Desc: "segment_rotations counts how many times we move to a new segment file.",
+			},
+			{
+				Name: "head_truncations",
+				Desc: "head_truncations counts how many log entries have been truncated" +
+					" from the head - i.e. the oldest entries. By graphing the rate of" +
+					" change over time you can see individual truncate calls as spikes.",
+			},
+			{
+				Name: "tail_truncations",
+				Desc: "tail_truncations counts how many log entries have been truncated" +
+					" from the tail - i.e. the newest entries. By graphing the rate of" +
+					" change over time you can see individual truncate calls as spikes.",
+			},
+			{
+				Name: "stable_gets",
+				Desc: "stable_gets counts the number of calls to StableStore.Get or GetUint64.",
+			},
+			{
+				Name: "stable_sets",
+				Desc: "stable_sets counts the number of calls to StableStore.Set or SetUint64.",
+			},
+		},
+		Gauges: []metrics.Descriptor{
+			{
+				Name: "last_segment_age_seconds",
+				Desc: "last_segment_age_seconds is a gauge that is set each time we" +
+					" rotate a segment and describes the number of seconds between when" +
+					" that segment file was first created and when it was sealed. This" +
+					" gives a rough estimate of how quickly writes are filling the disk.",
+			},
+		},
 	}
-
-	metricIDs  map[string]int
-	numMetrics int
 )
-
-func init() {
-	numMetrics = len(metrics)
-	metricIDs = make(map[string]int, numMetrics)
-	for i, n := range metrics {
-		metricIDs[n] = i
-	}
-}
-
-func (w *WAL) incr(name string, delta uint64) uint64 {
-	id, ok := metricIDs[name]
-	if !ok {
-		panic("invalid metric name: " + name)
-	}
-	return atomic.AddUint64(&w.metrics[id], delta)
-}
-
-func (w *WAL) setGauge(name string, val uint64) {
-	id, ok := metricIDs[name]
-	if !ok {
-		panic("invalid metric name: " + name)
-	}
-	atomic.StoreUint64(&w.metrics[id], val)
-}
-
-// Metrics returns a summary of the performance counters for the WAL since
-// startup. This is intentionally agnostic to any metrics collector library.
-// Package users should be able to poll this and report counter, gauge or timing
-// information in their native format from this.
-func (w *WAL) Metrics() map[string]uint64 {
-	// Copy the fields out with atomic reads
-	m := make(map[string]uint64)
-	for i, n := range metrics {
-		m[n] = atomic.LoadUint64(&w.metrics[i])
-	}
-	return m
-}
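Reviewer note: `MetricDefinitions` is exported specifically so that metric names can be pre-registered with clients that support it. A minimal sketch of what that could look like; `registerCounter` and `registerGauge` are hypothetical stand-ins for whatever registration API a given metrics client exposes — only `MetricDefinitions`, `Definitions`, and `Descriptor` come from this change.

```go
package main

import (
	"fmt"

	wal "github.com/hashicorp/raft-wal"
)

// registerCounter and registerGauge stand in for a real metrics client's
// pre-registration API.
func registerCounter(name, help string) { fmt.Println("counter:", name) }
func registerGauge(name, help string)   { fmt.Println("gauge:", name) }

func main() {
	// Walk the exported definitions once at startup.
	for _, d := range wal.MetricDefinitions.Counters {
		registerCounter(d.Name, d.Desc)
	}
	for _, d := range wal.MetricDefinitions.Gauges {
		registerGauge(d.Name, d.Desc)
	}
}
```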
diff --git a/metrics/atomic_collector.go b/metrics/atomic_collector.go
new file mode 100644
index 0000000..b17daf1
--- /dev/null
+++ b/metrics/atomic_collector.go
@@ -0,0 +1,87 @@
+package metrics
+
+import "sync/atomic"
+
+var (
+	_ Collector = &AtomicCollector{}
+)
+
+// AtomicCollector is a simple Collector that atomically stores
+// counters and gauges in memory.
+type AtomicCollector struct {
+	counters []uint64
+	gauges   []uint64
+
+	counterIndex, gaugeIndex map[string]int
+}
+
+// NewAtomicCollector creates a collector for the given set of Definitions.
+func NewAtomicCollector(defs Definitions) *AtomicCollector {
+	c := &AtomicCollector{
+		counters:     make([]uint64, len(defs.Counters)),
+		gauges:       make([]uint64, len(defs.Gauges)),
+		counterIndex: make(map[string]int),
+		gaugeIndex:   make(map[string]int),
+	}
+	for i, d := range defs.Counters {
+		if _, ok := c.counterIndex[d.Name]; ok {
+			panic("duplicate metric named " + d.Name)
+		}
+		c.counterIndex[d.Name] = i
+	}
+	for i, d := range defs.Gauges {
+		// Gauge names must not collide with counter names either.
+		if _, ok := c.counterIndex[d.Name]; ok {
+			panic("duplicate metric named " + d.Name)
+		}
+		if _, ok := c.gaugeIndex[d.Name]; ok {
+			panic("duplicate metric named " + d.Name)
+		}
+		c.gaugeIndex[d.Name] = i
+	}
+	return c
+}
+
+// IncrementCounter records delta occurrences of the named event. Names
+// follow Prometheus conventions with lower_case_and_underscores. We don't
+// need any additional labels currently.
+func (c *AtomicCollector) IncrementCounter(name string, delta uint64) {
+	id, ok := c.counterIndex[name]
+	if !ok {
+		panic("invalid metric name: " + name)
+	}
+	atomic.AddUint64(&c.counters[id], delta)
+}
+
+// SetGauge sets the value of the named gauge, overriding any previous value.
+func (c *AtomicCollector) SetGauge(name string, val uint64) {
+	id, ok := c.gaugeIndex[name]
+	if !ok {
+		panic("invalid metric name: " + name)
+	}
+	atomic.StoreUint64(&c.gauges[id], val)
+}
+
+// Summary returns a summary of the metrics since startup. Each value is
+// atomically loaded, but the set is not atomic overall and may represent an
+// inconsistent snapshot, e.g. with some metrics reflecting the most recent
+// operation while others don't.
+func (c *AtomicCollector) Summary() Summary {
+	s := Summary{
+		Counters: make(map[string]uint64, len(c.counters)),
+		Gauges:   make(map[string]uint64, len(c.gauges)),
+	}
+	for name, id := range c.counterIndex {
+		s.Counters[name] = atomic.LoadUint64(&c.counters[id])
+	}
+	for name, id := range c.gaugeIndex {
+		s.Gauges[name] = atomic.LoadUint64(&c.gauges[id])
+	}
+	return s
+}
+
+// Summary is a copy of the values recorded so far for each metric.
+type Summary struct {
+	Counters map[string]uint64
+	Gauges   map[string]uint64
+}
diff --git a/metrics/atomic_collector_test.go b/metrics/atomic_collector_test.go
new file mode 100644
index 0000000..14814b2
--- /dev/null
+++ b/metrics/atomic_collector_test.go
@@ -0,0 +1,58 @@
+package metrics
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestAtomicCollector(t *testing.T) {
+	defs := Definitions{
+		Counters: []Descriptor{
+			{
+				Name: "c1",
+				Desc: "counter one.",
+			},
+			{
+				Name: "c2",
+				Desc: "counter two.",
+			},
+		},
+		Gauges: []Descriptor{
+			{
+				Name: "g1",
+				Desc: "gauge one.",
+			},
+			{
+				Name: "g2",
+				Desc: "gauge two.",
+			},
+		},
+	}
+
+	c := NewAtomicCollector(defs)
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 10; j++ {
+				c.IncrementCounter("c1", 1)
+				c.IncrementCounter("c2", 2)
+				c.SetGauge("g1", uint64(j))
+				c.SetGauge("g2", uint64(j*2))
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	s := c.Summary()
+	require.Equal(t, 100, int(s.Counters["c1"]))
+	require.Equal(t, 200, int(s.Counters["c2"]))
+	require.Equal(t, 9, int(s.Gauges["g1"]))
+	require.Equal(t, 18, int(s.Gauges["g2"]))
+}
diff --git a/metrics/gometrics_collector.go b/metrics/gometrics_collector.go
new file mode 100644
index 0000000..2c1948a
--- /dev/null
+++ b/metrics/gometrics_collector.go
@@ -0,0 +1,48 @@
+package metrics
+
+import gometrics "github.com/armon/go-metrics"
+
+// GoMetricsCollector implements a Collector that passes through observations
+// to a go-metrics instance. The zero value works, writing metrics to the
+// default global instance; however, to set a prefix or a static set of labels
+// to add to each metric observed, or to use a non-global metrics instance,
+// use NewGoMetricsCollector.
+type GoMetricsCollector struct {
+	gm     *gometrics.Metrics
+	prefix []string
+	labels []gometrics.Label
+}
+
+// NewGoMetricsCollector returns a GoMetricsCollector that will attach the
+// specified name prefix and/or labels to each observation. If gm is nil the
+// global metrics instance is used.
+func NewGoMetricsCollector(prefix []string, labels []gometrics.Label, gm *gometrics.Metrics) *GoMetricsCollector {
+	if gm == nil {
+		gm = gometrics.Default()
+	}
+	return &GoMetricsCollector{
+		gm:     gm,
+		prefix: prefix,
+		labels: labels,
+	}
+}
+
+// IncrementCounter records delta occurrences of the named event. Names
+// follow Prometheus conventions with lower_case_and_underscores. We don't
+// need any additional labels currently.
+func (c *GoMetricsCollector) IncrementCounter(name string, delta uint64) {
+	c.gm.IncrCounterWithLabels(c.name(name), float32(delta), c.labels)
+}
+
+// SetGauge sets the value of the named gauge, overriding any previous value.
+func (c *GoMetricsCollector) SetGauge(name string, val uint64) {
+	c.gm.SetGaugeWithLabels(c.name(name), float32(val), c.labels)
+}
+
+// name returns the metric name as a slice. We don't want to risk modifying
+// the backing array of the prefix slice since this might be called
+// concurrently, so we always allocate a new slice.
+func (c *GoMetricsCollector) name(name string) []string {
+	var ss []string
+	return append(append(ss, c.prefix...), name)
+}
diff --git a/metrics/gometrics_collector_test.go b/metrics/gometrics_collector_test.go
new file mode 100644
index 0000000..862cedf
--- /dev/null
+++ b/metrics/gometrics_collector_test.go
@@ -0,0 +1,58 @@
+package metrics
+
+import (
+	"testing"
+	"time"
+
+	gometrics "github.com/armon/go-metrics"
+	"github.com/stretchr/testify/require"
+)
+
+func TestGoMetricsCollector(t *testing.T) {
+	cfg := &gometrics.Config{
+		EnableHostname:       false,
+		EnableRuntimeMetrics: false,
+		// FilterDefault is super weird and backwards but "true" means "don't
+		// filter"!
+		FilterDefault: true,
+	}
+	sink := gometrics.NewInmemSink(1*time.Second, 10*time.Second)
+	gm, err := gometrics.New(cfg, sink)
+	require.NoError(t, err)
+
+	c := NewGoMetricsCollector(
+		[]string{"myapp", "wal"},
+		[]gometrics.Label{{Name: "label", Value: "foo"}},
+		gm,
+	)
+
+	c.IncrementCounter("counter_one", 1)
+	c.IncrementCounter("counter_one", 1)
+	c.IncrementCounter("counter_two", 10)
+
+	c.SetGauge("g1", 12345)
+
+	summary := flattenData(sink.Data())
+
+	require.Equal(t, 2, int(summary.Counters["myapp.wal.counter_one;label=foo"]))
+	require.Equal(t, 10, int(summary.Counters["myapp.wal.counter_two;label=foo"]))
+
+	require.Equal(t, 12345, int(summary.Gauges["myapp.wal.g1;label=foo"]))
+}
+
+func flattenData(ivs []*gometrics.IntervalMetrics) Summary {
+	s := Summary{
+		Counters: make(map[string]uint64),
+		Gauges:   make(map[string]uint64),
+	}
+	for _, iv := range ivs {
+		for name, v := range iv.Counters {
+			s.Counters[name] += uint64(v.Sum)
+		}
+		for name, v := range iv.Gauges {
+			s.Gauges[name] = uint64(v.Value)
+		}
+	}
+	return s
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
new file mode 100644
index 0000000..4a93ec2
--- /dev/null
+++ b/metrics/metrics.go
@@ -0,0 +1,39 @@
+package metrics
+
+// Collector provides a simple abstraction for counter and gauge metrics that
+// the WAL and log verifier can use without depending on a specific metrics
+// collector implementation.
+type Collector interface {
+	// IncrementCounter records delta occurrences of the named event. Names
+	// follow Prometheus conventions with lower_case_and_underscores. We don't
+	// need any additional labels currently.
+	IncrementCounter(name string, delta uint64)
+
+	// SetGauge sets the value of the named gauge, overriding any previous value.
+	SetGauge(name string, val uint64)
+}
+
+// Definitions provides a simple description of a set of scalar metrics.
+type Definitions struct {
+	Counters []Descriptor
+	Gauges   []Descriptor
+}
+
+// Descriptor describes a specific metric.
+type Descriptor struct {
+	Name string
+	Desc string
+}
+
+var _ Collector = &NoOpCollector{}
+
+// NoOpCollector is a Collector that does nothing.
+type NoOpCollector struct{}
+
+// IncrementCounter records delta occurrences of the named event. Names
+// follow Prometheus conventions with lower_case_and_underscores. We don't
+// need any additional labels currently.
+func (c *NoOpCollector) IncrementCounter(name string, delta uint64) {}
+
+// SetGauge sets the value of the named gauge, overriding any previous value.
+func (c *NoOpCollector) SetGauge(name string, val uint64) {}
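Because `Collector` is a two-method interface, applications aren't limited to the `AtomicCollector` or `GoMetricsCollector` shipped here. A sketch of a custom implementation; the `logCollector` type is hypothetical and not part of this change:

```go
package custommetrics

import (
	"log"

	walmetrics "github.com/hashicorp/raft-wal/metrics"
)

// logCollector is a hypothetical Collector that just logs observations.
type logCollector struct{ l *log.Logger }

func (c *logCollector) IncrementCounter(name string, delta uint64) {
	c.l.Printf("counter %s += %d", name, delta)
}

func (c *logCollector) SetGauge(name string, val uint64) {
	c.l.Printf("gauge %s = %d", name, val)
}

// Compile-time check that logCollector satisfies the new interface.
var _ walmetrics.Collector = (*logCollector)(nil)
```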
diff --git a/options.go b/options.go
index 146a103..743e1b8 100644
--- a/options.go
+++ b/options.go
@@ -9,6 +9,7 @@ import (
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/raft-wal/fs"
 	"github.com/hashicorp/raft-wal/metadb"
+	"github.com/hashicorp/raft-wal/metrics"
 	"github.com/hashicorp/raft-wal/segment"
 	"github.com/hashicorp/raft-wal/types"
 )
@@ -52,6 +53,13 @@ func WithSegmentSize(size int) walOpt {
 	}
 }
 
+// WithMetricsCollector is an option that allows a custom metrics.Collector to be set.
+func WithMetricsCollector(c metrics.Collector) walOpt {
+	return func(w *WAL) {
+		w.metrics = c
+	}
+}
+
 func (w *WAL) applyDefaultsAndValidate() error {
 	// Check if an external codec has been used that it's not using a reserved ID.
 	if w.codec != nil && w.codec.ID() < FirstExternalCodecID {
@@ -71,6 +79,9 @@ func (w *WAL) applyDefaultsAndValidate() error {
 		vfs := fs.New()
 		w.sf = segment.NewFiler(w.dir, vfs)
 	}
+	if w.metrics == nil {
+		w.metrics = &metrics.NoOpCollector{}
+	}
 	if w.metaDB == nil {
 		w.metaDB = &metadb.BoltMetaDB{}
 	}
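For reference, this is how the new option composes with `wal.Open` — a minimal sketch assuming only the APIs added or already present in this diff:

```go
package walexample

import (
	wal "github.com/hashicorp/raft-wal"
	walmetrics "github.com/hashicorp/raft-wal/metrics"
)

// openWithMetrics opens a WAL with the in-memory collector attached. If the
// option is omitted, applyDefaultsAndValidate falls back to NoOpCollector.
func openWithMetrics(dir string) (*wal.WAL, *walmetrics.AtomicCollector, error) {
	c := walmetrics.NewAtomicCollector(wal.MetricDefinitions)
	w, err := wal.Open(dir, wal.WithMetricsCollector(c))
	return w, c, err
}
```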
diff --git a/verifier/metrics.go b/verifier/metrics.go
index 3ca8903..269f640 100644
--- a/verifier/metrics.go
+++ b/verifier/metrics.go
@@ -3,13 +3,47 @@
 
 package verifier
 
-// Metrics summarises a set of counters maintained by the verifying LogCache
-type Metrics struct {
-	// COUNTERS
+import (
+	"github.com/hashicorp/raft-wal/metrics"
+)
 
-	CheckpointsWritten    uint64
-	RangesVerified        uint64
-	ReadChecksumFailures  uint64
-	WriteChecksumFailures uint64
-	DroppedReports        uint64
-}
+var (
+	// MetricDefinitions describes the metrics emitted by this library via the
+	// provided metrics.Collector implementation. It's public so that these can be
+	// registered during init with metrics clients that support pre-defining
+	// metrics.
+	MetricDefinitions = metrics.Definitions{
+		Counters: []metrics.Descriptor{
+			{
+				Name: "checkpoints_written",
+				Desc: "checkpoints_written counts the number of checkpoint entries" +
+					" written to the LogStore.",
+			},
+			{
+				Name: "ranges_verified",
+				Desc: "ranges_verified counts the number of log ranges for which a" +
+					" verification report has been completed.",
+			},
+			{
+				Name: "read_checksum_failures",
+				Desc: "read_checksum_failures counts the number of times a range of" +
+					" logs between two checkpoints contained at least one corruption.",
+			},
+			{
+				Name: "write_checksum_failures",
+				Desc: "write_checksum_failures counts the number of times a follower" +
+					" has a different checksum from the leader at the point where it" +
+					" writes to the log. This could be caused either by disk corruption" +
+					" on the leader (unlikely) or by some other corruption of the log" +
+					" entries in flight.",
+			},
+			{
+				Name: "dropped_reports",
+				Desc: "dropped_reports counts how many times the verifier routine was" +
+					" still busy when the next checksum came in and so verification for" +
+					" a range was skipped. If you see this happen, consider increasing" +
+					" the interval between checkpoints.",
+			},
+		},
+	}
+)
diff --git a/verifier/store.go b/verifier/store.go
index 07427dc..fe6a502 100644
--- a/verifier/store.go
+++ b/verifier/store.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/raft"
+	"github.com/hashicorp/raft-wal/metrics"
 )
 
 // LogStore is a raft.LogStore that acts as middleware around an underlying
@@ -45,7 +46,7 @@ type LogStore struct {
 
 	s raft.LogStore
 
-	metrics *Metrics
+	metrics metrics.Collector
 	log     hclog.Logger
 
 	verifyCh chan VerificationReport
@@ -58,10 +59,10 @@ type LogStore struct {
 // set on the returned store _before_ it is passed to Raft, or may be left as
 // nil to bypass verification. Close must be called when the log store is no
 // longer useful to cleanup background verification.
-func NewLogStore(store raft.LogStore, checkpointFn IsCheckpointFn, reportFn ReportFn) *LogStore {
+func NewLogStore(store raft.LogStore, checkpointFn IsCheckpointFn, reportFn ReportFn, mc metrics.Collector) *LogStore {
 	c := &LogStore{
 		s:            store,
-		metrics:      &Metrics{},
+		metrics:      mc,
 		verifyCh:     make(chan VerificationReport, 1),
 		checkpointFn: checkpointFn,
 		reportFn:     reportFn,
@@ -198,7 +199,7 @@ func (s *LogStore) StoreLogs(logs []*raft.Log) error {
 	atomic.StoreUint64(&s.checksum, cs)
 	atomic.StoreUint64(&s.sumStartIdx, startIdx)
 	if len(triggeredReports) > 0 {
-		atomic.StoreUint64(&s.metrics.CheckpointsWritten, uint64(len(triggeredReports)))
+		s.metrics.IncrementCounter("checkpoints_written", uint64(len(triggeredReports)))
 	}
 
 	for _, r := range triggeredReports {
@@ -215,7 +216,7 @@ func (s *LogStore) triggerVerify(r VerificationReport) {
 	select {
 	case s.verifyCh <- r:
 	default:
-		atomic.AddUint64(&s.metrics.DroppedReports, 1)
+		s.metrics.IncrementCounter("dropped_reports", 1)
 	}
 }
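The signature change to `NewLogStore` means callers now supply the collector rather than having one allocated internally. A sketch of the wiring using the verifier package's own `MetricDefinitions`; the `wrapWithVerifier` helper is hypothetical:

```go
package walexample

import (
	"github.com/hashicorp/raft"
	walmetrics "github.com/hashicorp/raft-wal/metrics"
	"github.com/hashicorp/raft-wal/verifier"
)

// wrapWithVerifier shows the extra argument: the metrics.Collector is now
// injected by the caller instead of allocated inside NewLogStore.
func wrapWithVerifier(store raft.LogStore, cp verifier.IsCheckpointFn, rep verifier.ReportFn) *verifier.LogStore {
	mc := walmetrics.NewAtomicCollector(verifier.MetricDefinitions)
	return verifier.NewLogStore(store, cp, rep, mc)
}
```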
diff --git a/verifier/store_test.go b/verifier/store_test.go
index e2d030b..c798547 100644
--- a/verifier/store_test.go
+++ b/verifier/store_test.go
@@ -7,11 +7,11 @@ import (
 	"bytes"
 	"fmt"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/raft"
+	"github.com/hashicorp/raft-wal/metrics"
 	"github.com/stretchr/testify/require"
 )
 
@@ -54,10 +54,10 @@ func TestStore(t *testing.T) {
 				ReplicateTo("f1", 1239, 1235).
 				// Follower should report and detect in-flight corruption
 				AssertReport("f1", LogRange{1234, 1234 + 5}, "in-flight corruption").
-				AssertMetrics("f1", func(t *testing.T, m *Metrics) {
-					require.Equal(t, 1, int(atomic.LoadUint64(&m.CheckpointsWritten)))
-					require.Equal(t, 1, int(atomic.LoadUint64(&m.WriteChecksumFailures)))
-					require.Equal(t, 0, int(atomic.LoadUint64(&m.ReadChecksumFailures)))
+				AssertMetrics("f1", func(t *testing.T, s metrics.Summary) {
+					require.Equal(t, 1, int(s.Counters["checkpoints_written"]))
+					require.Equal(t, 1, int(s.Counters["write_checksum_failures"]))
+					require.Equal(t, 0, int(s.Counters["read_checksum_failures"]))
 				}).
 				Steps(),
 		},
@@ -76,10 +76,10 @@ func TestStore(t *testing.T) {
 				ReplicateTo("f1", 1239, 0).
 				// Follower should report and detect storage corruption
 				AssertReport("f1", LogRange{1234, 1234 + 5}, "storage corruption").
-				AssertMetrics("f1", func(t *testing.T, m *Metrics) {
-					require.Equal(t, 1, int(atomic.LoadUint64(&m.CheckpointsWritten)))
-					require.Equal(t, 0, int(atomic.LoadUint64(&m.WriteChecksumFailures)))
-					require.Equal(t, 1, int(atomic.LoadUint64(&m.ReadChecksumFailures)))
+				AssertMetrics("f1", func(t *testing.T, s metrics.Summary) {
+					require.Equal(t, 1, int(s.Counters["checkpoints_written"]))
+					require.Equal(t, 0, int(s.Counters["write_checksum_failures"]))
+					require.Equal(t, 1, int(s.Counters["read_checksum_failures"]))
 				}).
 				Steps(),
 		},
@@ -131,9 +131,9 @@ func TestStore(t *testing.T) {
 				AssertReport("leader", LogRange{18, 24}, "", func(t *testing.T, r *VerificationReport) {
 					require.Equal(t, &LogRange{12, 18}, r.SkippedRange)
 				}).
-				AssertMetrics("leader", func(t *testing.T, m *Metrics) {
-					require.Equal(t, 1, int(atomic.LoadUint64(&m.DroppedReports)))
-					require.Equal(t, 3, int(atomic.LoadUint64(&m.RangesVerified)))
+				AssertMetrics("leader", func(t *testing.T, s metrics.Summary) {
+					require.Equal(t, 1, int(s.Counters["dropped_reports"]))
+					require.Equal(t, 3, int(s.Counters["ranges_verified"]))
 				}).
 				Steps(),
 		},
@@ -188,8 +188,8 @@ func TestStore(t *testing.T) {
 				peers.unblockReportFn(step.targetNode)
 
 			case step.assertMetrics != nil:
-				ls := peers.logStore(step.targetNode)
-				step.assertMetrics(t, ls.metrics)
+				ms := peers.metrics(step.targetNode)
+				step.assertMetrics(t, ms)
 
 			default:
 				t.Fatalf("invalid testStep: %#v", step)
@@ -205,6 +205,7 @@ type peerSet struct {
 	tss    map[string]*testStore
 	chs    map[string]chan VerificationReport
 	blocks map[string]sync.Locker
+	mcs    map[string]*metrics.AtomicCollector
 }
 
 func newPeerSet() *peerSet {
@@ -213,6 +214,7 @@ func newPeerSet() *peerSet {
 		tss:    make(map[string]*testStore),
 		chs:    make(map[string]chan VerificationReport),
 		blocks: make(map[string]sync.Locker),
+		mcs:    make(map[string]*metrics.AtomicCollector),
 	}
}
 
@@ -222,6 +224,7 @@ func (s *peerSet) Close() error {
 		delete(s.lss, node)
 		delete(s.tss, node)
 		delete(s.chs, node)
+		delete(s.mcs, node)
 		// Don't close chans as it causes panics.
 	}
 	return nil
 }
 
@@ -231,7 +234,7 @@ func cpFn(l *raft.Log) (bool, error) {
 	return bytes.Equal(l.Data, []byte("CHECKPOINT")), nil
 }
 
-func (s *peerSet) init(node string) (*LogStore, *testStore, chan VerificationReport) {
+func (s *peerSet) init(node string) (*LogStore, *testStore, chan VerificationReport, *metrics.AtomicCollector) {
 	ts := &testStore{}
 	ch := make(chan VerificationReport, 20)
@@ -243,19 +246,21 @@ func (s *peerSet) init(node string) (*LogStore, *testStore, chan VerificationRep
 		ch <- vr
 	}
 
-	ls := NewLogStore(ts, cpFn, reportFn)
+	mc := metrics.NewAtomicCollector(MetricDefinitions)
+	ls := NewLogStore(ts, cpFn, reportFn, mc)
 	s.lss[node] = ls
 	s.tss[node] = ts
 	s.chs[node] = ch
+	s.mcs[node] = mc
 	s.blocks[node] = &block
 
-	return ls, ts, ch
+	return ls, ts, ch, mc
 }
 
 func (s *peerSet) logStore(node string) *LogStore {
 	ls, ok := s.lss[node]
 	if !ok {
-		ls, _, _ = s.init(node)
+		ls, _, _, _ = s.init(node)
 	}
 	return ls
 }
@@ -263,7 +268,7 @@ func (s *peerSet) logStore(node string) *LogStore {
 func (s *peerSet) testStore(node string) *testStore {
 	ts, ok := s.tss[node]
 	if !ok {
-		_, ts, _ = s.init(node)
+		_, ts, _, _ = s.init(node)
 	}
 	return ts
 }
@@ -271,11 +276,19 @@ func (s *peerSet) testStore(node string) *testStore {
 func (s *peerSet) reportCh(node string) chan VerificationReport {
 	ch, ok := s.chs[node]
 	if !ok {
-		_, _, ch = s.init(node)
+		_, _, ch, _ = s.init(node)
 	}
 	return ch
 }
 
+func (s *peerSet) metrics(node string) metrics.Summary {
+	mc, ok := s.mcs[node]
+	if !ok {
+		_, _, _, mc = s.init(node)
+	}
+	return mc.Summary()
+}
+
 func (s *peerSet) blockReportFn(node string) {
 	s.blocks[node].Lock()
 }
@@ -290,7 +303,7 @@ func assertReportDelivered(t *testing.T, ch <-chan VerificationReport) *Verifica
 	case r := <-ch:
 		return &r
 
-	case <-time.After(time.Second):
-		t.Fatalf("didn't get report after a second!")
+	case <-time.After(5 * time.Second):
+		t.Fatalf("didn't get report after 5 seconds!")
 	}
 	return nil
@@ -364,7 +377,7 @@ type testStep struct {
 	blockReporting   bool
 	unblockReporting bool
 
-	assertMetrics func(t *testing.T, m *Metrics)
+	assertMetrics func(t *testing.T, s metrics.Summary)
 }
 
 func (s testStep) String() string {
@@ -399,7 +412,7 @@ func (s testStep) String() string {
 		return fmt.Sprintf("blockReporting(%s)", s.targetNode)
 
 	case s.unblockReporting:
-		return fmt.Sprintf("blockReporting(%s)", s.targetNode)
+		return fmt.Sprintf("unblockReporting(%s)", s.targetNode)
 
 	case s.assertMetrics != nil:
 		return fmt.Sprintf("assertMetrics(%s)", s.targetNode)
@@ -520,7 +533,7 @@ func (b *testBuilder) UnblockReporting(node string) *testBuilder {
 	return b
 }
 
-func (b *testBuilder) AssertMetrics(node string, fn func(t *testing.T, m *Metrics)) *testBuilder {
+func (b *testBuilder) AssertMetrics(node string, fn func(t *testing.T, m metrics.Summary)) *testBuilder {
 	step := testStep{
 		targetNode:    node,
 		assertMetrics: fn,
diff --git a/verifier/verifier.go b/verifier/verifier.go
index b955bad..96bc245 100644
--- a/verifier/verifier.go
+++ b/verifier/verifier.go
@@ -6,7 +6,6 @@ package verifier
 import (
 	"errors"
 	"fmt"
-	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/raft"
@@ -155,7 +154,7 @@ func (s *LogStore) runVerifier() {
 		// Whatever state report ended up in, deliver it!
 		report.Elapsed = time.Since(st)
 		s.reportFn(report)
-		atomic.AddUint64(&s.metrics.RangesVerified, 1)
+		s.metrics.IncrementCounter("ranges_verified", 1)
 	}
 }
 
@@ -167,7 +166,7 @@ func (s *LogStore) verify(report *VerificationReport) {
 	// leader in this range then there's not much point verifying that we read it
 	// back OK.
 	if report.WrittenSum != 0 && report.WrittenSum != report.ExpectedSum {
-		atomic.AddUint64(&s.metrics.WriteChecksumFailures, 1)
+		s.metrics.IncrementCounter("write_checksum_failures", 1)
 		report.Err = ErrChecksumMismatch(fmt.Sprintf("log verification failed for range %s: "+
 			"in-flight corruption: follower wrote checksum=%08x, leader wrote checksum=%08x",
 			report.Range, report.WrittenSum, report.ExpectedSum))
@@ -202,7 +201,7 @@ func (s *LogStore) verify(report *VerificationReport) {
 	report.ReadSum = sum
 
 	if report.ReadSum != report.ExpectedSum {
-		atomic.AddUint64(&s.metrics.ReadChecksumFailures, 1)
+		s.metrics.IncrementCounter("read_checksum_failures", 1)
 		report.Err = ErrChecksumMismatch(fmt.Sprintf("log verification failed for range %s: "+
 			"storage corruption: node read checksum=%08x, leader wrote checksum=%08x",
 			report.Range, report.ReadSum, report.ExpectedSum))
diff --git a/wal.go b/wal.go
index d72bb68..3d0cdbe 100644
--- a/wal.go
+++ b/wal.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/raft"
+	"github.com/hashicorp/raft-wal/metrics"
 	"github.com/hashicorp/raft-wal/types"
 )
 
@@ -41,13 +42,10 @@ type WAL struct {
 	codec       Codec
 	sf          types.SegmentFiler
 	metaDB      types.MetaStore
+	metrics     metrics.Collector
 	log         hclog.Logger
 	segmentSize int
 
-	// metrics is allocated once on creation then it's elements are accessed
-	// atomically after that.
-	metrics []uint64
-
 	// s is the current state of the WAL files. It is an immutable snapshot that
 	// can be accessed without a lock when reading. We only support a single
 	// writer so all methods that mutate either the WAL state or append to the
@@ -90,7 +88,6 @@ type walOpt func(*WAL)
 func Open(dir string, opts ...walOpt) (*WAL, error) {
 	w := &WAL{
 		dir:           dir,
-		metrics:       make([]uint64, numMetrics),
 		triggerRotate: make(chan uint64, 1),
 	}
 	// Apply options
@@ -333,13 +330,13 @@ func (w *WAL) GetLog(index uint64, log *raft.Log) error {
 	}
 	s, release := w.acquireState()
 	defer release()
-	w.incr("log_entries_read", 1)
+	w.metrics.IncrementCounter("log_entries_read", 1)
 
 	raw, err := s.getLog(index)
 	if err != nil {
 		return err
 	}
-	w.incr("log_entry_bytes_read", uint64(len(raw.Bs)))
+	w.metrics.IncrementCounter("log_entry_bytes_read", uint64(len(raw.Bs)))
 	defer raw.Close()
 
 	// Decode the log
@@ -428,9 +425,9 @@ func (w *WAL) StoreLogs(logs []*raft.Log) error {
 	if err := s.tail.Append(encoded); err != nil {
 		return err
 	}
-	w.incr("log_appends", 1)
-	w.incr("log_entries_written", uint64(len(encoded)))
-	w.incr("log_entry_bytes_written", nBytes)
+	w.metrics.IncrementCounter("log_appends", 1)
+	w.metrics.IncrementCounter("log_entries_written", uint64(len(encoded)))
+	w.metrics.IncrementCounter("log_entry_bytes_written", nBytes)
 
 	// Check if we need to roll logs
 	sealed, indexStart, err := s.tail.Sealed()
@@ -508,7 +505,7 @@ func (w *WAL) Set(key []byte, val []byte) error {
 	if err := w.checkClosed(); err != nil {
 		return err
 	}
-	w.incr("stable_sets", 1)
+	w.metrics.IncrementCounter("stable_sets", 1)
 	return w.metaDB.SetStable(key, val)
 }
 
@@ -517,7 +514,7 @@ func (w *WAL) Get(key []byte) ([]byte, error) {
 	if err := w.checkClosed(); err != nil {
 		return nil, err
 	}
-	w.incr("stable_gets", 1)
+	w.metrics.IncrementCounter("stable_gets", 1)
 	return w.metaDB.GetStable(key)
 }
 
@@ -605,7 +602,7 @@ func (w *WAL) rotateSegmentLocked(indexStart uint64) error {
 		tail.SealTime = time.Now()
 		tail.MaxIndex = newState.tail.LastIndex()
 		tail.IndexStart = indexStart
-		w.setGauge("last_segment_age_seconds", uint64(tail.SealTime.Sub(tail.CreateTime).Seconds()))
+		w.metrics.SetGauge("last_segment_age_seconds", uint64(tail.SealTime.Sub(tail.CreateTime).Seconds()))
 
 		// Update the old tail with the seal time etc.
 		newState.segments = newState.segments.Set(tail.BaseIndex, *tail)
@@ -613,7 +610,7 @@ func (w *WAL) rotateSegmentLocked(indexStart uint64) error {
 		post, err := w.createNextSegment(newState)
 		return nil, post, err
 	}
-	w.incr("segment_rotations", 1)
+	w.metrics.IncrementCounter("segment_rotations", 1)
 
 	return w.mutateStateLocked(txn)
 }
@@ -715,7 +712,7 @@ func (w *WAL) truncateHeadLocked(newMin uint64) error {
 		}
 		postCommit = pc
 	}
-	w.incr("head_truncations", nTruncated)
+	w.metrics.IncrementCounter("head_truncations", nTruncated)
 
 	// Return a finalizer that will be called when all readers are done with the
 	// segments in the current state to close and delete old segments.
@@ -781,7 +778,7 @@ func (w *WAL) truncateTailLocked(newMax uint64) error {
 		if err != nil {
 			return nil, nil, err
 		}
-		w.incr("tail_truncations", nTruncated)
+		w.metrics.IncrementCounter("tail_truncations", nTruncated)
 
 		// Return a finalizer that will be called when all readers are done with the
 		// segments in the current state to close and delete old segments.
diff --git a/wal_test.go b/wal_test.go
index fb7c180..a3a20ae 100644
--- a/wal_test.go
+++ b/wal_test.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/raft"
+	"github.com/hashicorp/raft-wal/metrics"
 	"github.com/stretchr/testify/require"
 )
 
@@ -599,7 +600,13 @@ func TestDeleteRange(t *testing.T) {
 	for _, tc := range cases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			ts, w, err := testOpenWAL(t, tc.tsOpts, tc.walOpts, false)
+			opts := tc.walOpts
+
+			// Add our own metrics collector so we can assert on counter values below.
+			m := metrics.NewAtomicCollector(MetricDefinitions)
+			opts = append(opts, WithMetricsCollector(m))
+
+			ts, w, err := testOpenWAL(t, tc.tsOpts, opts, false)
 			require.NoError(t, err)
 
 			err = w.DeleteRange(tc.deleteMin, tc.deleteMax)
@@ -659,9 +666,9 @@ func TestDeleteRange(t *testing.T) {
 			validateLogEntry(t, &log)
 
 			// Verify the metrics recorded what we expected!
-			metrics := w.Metrics()
-			require.Equal(t, int(tc.expectNTailTruncations), int(metrics["tail_truncations"]))
-			require.Equal(t, int(tc.expectNHeadTruncations), int(metrics["head_truncations"]))
+			summary := m.Summary()
+			require.Equal(t, int(tc.expectNTailTruncations), int(summary.Counters["tail_truncations"]))
+			require.Equal(t, int(tc.expectNHeadTruncations), int(summary.Counters["head_truncations"]))
 		})
 	}
 }
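End-to-end, the two built-in collectors replace the old `WAL.Metrics()` map. A sketch of both paths, assuming the WAL's existing `Close` method and a placeholder directory:

```go
package main

import (
	"fmt"
	"time"

	gometrics "github.com/armon/go-metrics"
	wal "github.com/hashicorp/raft-wal"
	walmetrics "github.com/hashicorp/raft-wal/metrics"
)

func main() {
	// Path A: poll counters in-process, replacing the old WAL.Metrics() map.
	mc := walmetrics.NewAtomicCollector(wal.MetricDefinitions)
	w, err := wal.Open("/tmp/wal-demo", wal.WithMetricsCollector(mc))
	if err != nil {
		panic(err)
	}
	defer w.Close()

	// ... drive w through the raft.LogStore / raft.StableStore interfaces ...

	s := mc.Summary()
	fmt.Println("log_appends:", s.Counters["log_appends"])

	// Path B: stream the same observations into a go-metrics sink instead.
	sink := gometrics.NewInmemSink(time.Second, time.Minute)
	gm, err := gometrics.New(gometrics.DefaultConfig("myapp"), sink)
	if err != nil {
		panic(err)
	}
	_ = wal.WithMetricsCollector(walmetrics.NewGoMetricsCollector([]string{"wal"}, nil, gm))
}
```

The design choice here is that the library never depends on a concrete metrics client: the WAL takes the two-method `Collector` interface, and the go-metrics dependency is isolated in the optional `GoMetricsCollector`.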