Skip to content

Commit

Permalink
Merge branch 'main' into aldernero/memberlist-kv-delete
Browse files Browse the repository at this point in the history
  • Loading branch information
aldernero authored Jan 16, 2025
2 parents fda64ea + 441a90a commit ebd0c81
Show file tree
Hide file tree
Showing 35 changed files with 840 additions and 85 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Changelog

* [CHANGE] Add new metric `slow_request_server_throughput` to track the throughput of slow queries. #619
* [CHANGE] Log middleware updated to honor `logRequestHeaders` in all logging scenarios. #615
* [CHANGE] Roll back the gRPC dependency to v1.65.0 to allow downstream projects to avoid a performance regression and maybe a bug in v1.66.0. #581
* [CHANGE] Update the gRPC dependency to v1.66.0 and deprecate the `grpc_server_recv_buffer_pools_enabled` option that is no longer supported by it. #580
* [CHANGE] `ring.DoBatchWithOptions` (and `ring.DoBatch`) reports the cancelation cause when the context is canceled instead of `context.Canceled`.
Expand Down Expand Up @@ -97,6 +99,8 @@
* [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477
* [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486
* [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495
* [ENHANCEMENT] Add option to hide token information in ring status page #633
* [ENHANCEMENT] Display token information in partition ring status page #631
* [ENHANCEMENT] Add ability to log all source hosts from http header instead of only the first one. #444
* [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients.
* [ENHANCEMENT] Use `SecretReader` interface to fetch secrets when configuring TLS. #274
Expand Down Expand Up @@ -237,6 +241,7 @@
* [ENHANCEMENT] Memberlist: Add concurrency to the transport's WriteTo method. #525
* [ENHANCEMENT] Memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storms in large clusters. #592
* [ENHANCEMENT] Memberlist: Implemented the `Delete` operation in the memberlist backed KV store. How frequently deleted entries are cleaned up is specified by the `-memberlist.obsolete-entries-timeout` flag. #612
* [ENHANCEMENT] KV: Add `MockCountingClient`, which wraps the `kv.client` and can be used in order to count calls at specific functions of the interface. #618
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
6 changes: 4 additions & 2 deletions kv/consul/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/grafana/dskit/kv/codec"
)

// The max wait time allowed for mockKV operations, in order to have faster tests.
const maxWaitTime = 100 * time.Millisecond

type mockKV struct {
mtx sync.Mutex
cond *sync.Cond
Expand Down Expand Up @@ -87,7 +90,7 @@ func (m *mockKV) loop() {
select {
case <-m.close:
return
case <-time.After(time.Second):
case <-time.After(maxWaitTime):
m.mtx.Lock()
m.cond.Broadcast()
m.mtx.Unlock()
Expand Down Expand Up @@ -258,7 +261,6 @@ func (m *mockKV) ResetIndexForKey(key string) {
// mockedMaxWaitTime returns the minimum duration between the input duration
// and the max wait time allowed in this mock, in order to have faster tests.
func mockedMaxWaitTime(queryWaitTime time.Duration) time.Duration {
const maxWaitTime = time.Second
if queryWaitTime > maxWaitTime {
return maxWaitTime
}
Expand Down
61 changes: 61 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"go.uber.org/atomic"
)

// The mockClient does not anything.
Expand Down Expand Up @@ -37,3 +38,63 @@ func (m mockClient) WatchKey(_ context.Context, _ string, _ func(interface{}) bo

func (m mockClient) WatchPrefix(_ context.Context, _ string, _ func(string, interface{}) bool) {
}

// MockCountingClient is a wrapper around the Client interface that counts the number of times its functions are called.
// This is used for testing only.
type MockCountingClient struct {
client Client

ListCalls *atomic.Uint32
GetCalls *atomic.Uint32
DeleteCalls *atomic.Uint32
CASCalls *atomic.Uint32
WatchKeyCalls *atomic.Uint32
WatchPrefixCalls *atomic.Uint32
}

func NewMockCountingClient(client Client) *MockCountingClient {
return &MockCountingClient{
client: client,
ListCalls: atomic.NewUint32(0),
GetCalls: atomic.NewUint32(0),
DeleteCalls: atomic.NewUint32(0),
CASCalls: atomic.NewUint32(0),
WatchKeyCalls: atomic.NewUint32(0),
WatchPrefixCalls: atomic.NewUint32(0),
}
}

func (mc *MockCountingClient) List(ctx context.Context, prefix string) ([]string, error) {
mc.ListCalls.Inc()

return mc.client.List(ctx, prefix)
}
func (mc *MockCountingClient) Get(ctx context.Context, key string) (interface{}, error) {
mc.GetCalls.Inc()

return mc.client.Get(ctx, key)
}

func (mc *MockCountingClient) Delete(ctx context.Context, key string) error {
mc.DeleteCalls.Inc()

return mc.client.Delete(ctx, key)
}

func (mc *MockCountingClient) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
mc.CASCalls.Inc()

return mc.client.CAS(ctx, key, f)
}

func (mc *MockCountingClient) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
mc.WatchKeyCalls.Inc()

mc.client.WatchKey(ctx, key, f)
}

func (mc *MockCountingClient) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) {
mc.WatchPrefixCalls.Inc()

mc.client.WatchPrefix(ctx, key, f)
}
8 changes: 4 additions & 4 deletions loser/loser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

package loser

import "golang.org/x/exp/constraints"
import "cmp"

func New[E constraints.Ordered](lists [][]E, maxVal E) *Tree[E] {
func New[E cmp.Ordered](lists [][]E, maxVal E) *Tree[E] {
nLists := len(lists)
t := Tree[E]{
maxVal: maxVal,
Expand All @@ -23,12 +23,12 @@ func New[E constraints.Ordered](lists [][]E, maxVal E) *Tree[E] {
// A loser tree is a binary tree laid out such that nodes N and N+1 have parent N/2.
// We store M leaf nodes in positions M...2M-1, and M-1 internal nodes in positions 1..M-1.
// Node 0 is a special node, containing the winner of the contest.
type Tree[E constraints.Ordered] struct {
type Tree[E cmp.Ordered] struct {
maxVal E
nodes []node[E]
}

type node[E constraints.Ordered] struct {
type node[E cmp.Ordered] struct {
index int // This is the loser for all nodes except the 0th, where it is the winner.
value E // Value copied from the loser node, or winner for node 0.
items []E // Only populated for leaf nodes.
Expand Down
6 changes: 3 additions & 3 deletions loser/loser_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package loser_test

import (
"cmp"
"fmt"
"math"
"math/rand"
"slices"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

"github.com/grafana/dskit/loser"
)

func checkTreeEqual[E constraints.Ordered](t *testing.T, tree *loser.Tree[E], expected []E, msg ...interface{}) {
func checkTreeEqual[E cmp.Ordered](t *testing.T, tree *loser.Tree[E], expected []E, msg ...interface{}) {
t.Helper()
actual := []E{}

Expand Down
5 changes: 5 additions & 0 deletions middleware/http_tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func (t Tracer) Wrap(next http.Handler) http.Handler {
sp.SetTag("http.user_agent", userAgent)
}

// add the content type, useful when query requests are sent as POST
if ct := r.Header.Get("Content-Type"); ct != "" {
sp.SetTag("http.content_type", ct)
}

// add a tag with the client's sourceIPs to the span, if a
// SourceIPExtractor is given.
if t.SourceIPs != nil {
Expand Down
36 changes: 36 additions & 0 deletions middleware/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package middleware

import (
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

"github.com/felixge/httpsnoop"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -50,6 +52,9 @@ type Instrument struct {
RequestBodySize *prometheus.HistogramVec
ResponseBodySize *prometheus.HistogramVec
InflightRequests *prometheus.GaugeVec
LatencyCutoff time.Duration
ThroughputUnit string
RequestThroughput *prometheus.HistogramVec
}

// IsWSHandshakeRequest returns true if the given request is a websocket handshake request.
Expand Down Expand Up @@ -105,9 +110,40 @@ func (i Instrument) Wrap(next http.Handler) http.Handler {
labelValues = append(labelValues, tenantID)
instrument.ObserveWithExemplar(r.Context(), i.PerTenantDuration.WithLabelValues(labelValues...), respMetrics.Duration.Seconds())
}
if i.LatencyCutoff > 0 && respMetrics.Duration > i.LatencyCutoff {
volume, err := extractValueFromMultiValueHeader(w.Header().Get("Server-Timing"), i.ThroughputUnit, "val")
if err == nil {
instrument.ObserveWithExemplar(r.Context(), i.RequestThroughput.WithLabelValues(r.Method, route), volume/respMetrics.Duration.Seconds())
}
}
})
}

// Extracts a single value from a multi-value header, e.g. "name0;key0=0.0;key1=1.1, name1;key0=1.1"
func extractValueFromMultiValueHeader(h, name string, key string) (float64, error) {
parts := strings.Split(h, ", ")
if len(parts) == 0 {
return 0, fmt.Errorf("not a multi-value header")
}
for _, part := range parts {
if part, found := strings.CutPrefix(part, name); found {
for _, spart := range strings.Split(part, ";") {
if !strings.HasPrefix(spart, key) {
continue
}
var value float64
_, err := fmt.Sscanf(spart, key+"=%f", &value)
if err != nil {
return 0, fmt.Errorf("failed to parse value from header: %w", err)
}
return value, nil
}
}

}
return 0, fmt.Errorf("desired name not found in header")
}

// Return a name identifier for ths request. There are three options:
// 1. The request matches a gorilla mux route, with a name. Use that.
// 2. The request matches an unamed gorilla mux router. Munge the path
Expand Down
Loading

0 comments on commit ebd0c81

Please sign in to comment.