
Commit

Merge branch 'main' into add-healthy-instances-in-zone-count-to-lifecycler
JordanRushing committed Jun 25, 2024
2 parents 3884aa5 + b3bde8c commit 3afb61c
Showing 57 changed files with 531 additions and 258 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
@@ -7,6 +7,7 @@ linters:
- gofmt
- misspell
- revive
- loggercheck

linters-settings:
errcheck:
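The loggercheck linter added here enforces that go-kit-style logging calls receive an even number of key/value arguments. As a rough illustration (not part of this commit, and assuming the github.com/go-kit/log packages this repository already uses), the sketch below shows the kind of call it flags and the corrected form; the grpcutil/dns_resolver.go change later in this commit is exactly this sort of fix.

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// loggercheck would flag this call: three arguments, so keys and values
	// don't pair up ("failed IP parsing" becomes a key with no value).
	//   level.Error(logger).Log("failed IP parsing", "err", "lookup failed")

	// Accepted: every value has a key, with the message under an explicit "msg" key.
	_ = level.Error(logger).Log("msg", "failed IP parsing", "err", "lookup failed")
}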
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -209,6 +209,10 @@
* `operation_duration_seconds`
* [ENHANCEMENT] Add `outcome` label to `gate_duration_seconds` metric. Possible values are `rejected_canceled`, `rejected_deadline_exceeded`, `rejected_other`, and `permitted`. #512
* [ENHANCEMENT] Expose `InstancesWithTokensCount` and `InstancesWithTokensInZoneCount` in `ring.ReadRing` interface. #516
* [ENHANCEMENT] Middleware: determine route name in a single place, and add `middleware.ExtractRouteName()` method to allow consuming applications to retrieve the route name. #527
* [ENHANCEMENT] SpanProfiler: do less work on unsampled traces. #528
* [ENHANCEMENT] Log Middleware: if the trace is not sampled, log its ID as `trace_id_unsampled` instead of `trace_id`. #529
* [ENHANCEMENT] httpgrpc: the httpgrpc Server can now use the error message from a special HTTP header when converting an HTTP response to an error. This is useful when the HTTP response body contains binary data that is not a valid UTF-8 string, which would otherwise cause gRPC to fail to marshal the returned error. #531
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
@@ -240,3 +244,4 @@
* [BUGFIX] ring: don't mark trace spans as failed if `DoUntilQuorum` terminates due to cancellation. #449
* [BUGFIX] middleware: fix issue where applications that used the httpgrpc tracing middleware would generate duplicate spans for incoming HTTP requests. #451
* [BUGFIX] httpgrpc: store headers in canonical form when converting from gRPC to HTTP. #518
* [BUGFIX] Memcached: Don't truncate sub-second TTLs to 0 which results in them being cached forever. #530
4 changes: 2 additions & 2 deletions Makefile
@@ -79,10 +79,10 @@ check-protos: clean-protos protos ## Re-generates protos and git diffs them
GOPATH=$(CURDIR)/.tools go install github.com/client9/misspell/cmd/misspell@v0.3.4

.tools/bin/faillint: .tools
GOPATH=$(CURDIR)/.tools go install github.com/fatih/faillint@v1.11.0
GOPATH=$(CURDIR)/.tools go install github.com/fatih/faillint@v1.13.0

.tools/bin/golangci-lint: .tools
GOPATH=$(CURDIR)/.tools go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.53.2
GOPATH=$(CURDIR)/.tools go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.59.1

.tools/bin/protoc: .tools
ifeq ("$(wildcard .tools/protoc/bin/protoc)","")
31 changes: 18 additions & 13 deletions cache/memcached_client.go
@@ -315,22 +315,27 @@ func (c *MemcachedClient) Name() string {
}

func (c *MemcachedClient) SetMultiAsync(data map[string][]byte, ttl time.Duration) {
	c.setMultiAsync(data, ttl, func(key string, buf []byte, ttl time.Duration) error {
		return c.client.Set(&memcache.Item{
			Key:        key,
			Value:      buf,
			Expiration: int32(ttl.Seconds()),
		})
	})
	c.setMultiAsync(data, ttl, c.setSingleItem)
}

func (c *MemcachedClient) SetAsync(key string, value []byte, ttl time.Duration) {
	c.setAsync(key, value, ttl, func(key string, buf []byte, ttl time.Duration) error {
		return c.client.Set(&memcache.Item{
			Key:        key,
			Value:      buf,
			Expiration: int32(ttl.Seconds()),
		})
	c.setAsync(key, value, ttl, c.setSingleItem)
}

func (c *MemcachedClient) setSingleItem(key string, value []byte, ttl time.Duration) error {
	ttlSeconds := int32(ttl.Seconds())
	// If a TTL of exactly 0 is passed, we honor it and pass it to Memcached which will
	// interpret it as an infinite TTL. However, if we get a non-zero TTL that is truncated
	// to 0 seconds, we discard the update since the caller didn't intend to set an infinite
	// TTL.
	if ttl != 0 && ttlSeconds <= 0 {
		return nil
	}

	return c.client.Set(&memcache.Item{
		Key:        key,
		Value:      value,
		Expiration: ttlSeconds,
	})
}

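For context on the guard above: memcached expirations are whole seconds, so converting a sub-second time.Duration to seconds and casting to int32 yields 0, which memcached interprets as "never expire". A standalone sketch of that arithmetic follows; the shouldStore helper is hypothetical and only mirrors the check in setSingleItem.

package main

import (
	"fmt"
	"time"
)

// shouldStore mirrors setSingleItem's decision: a TTL of exactly 0 means
// "no expiration" and is honored, but a positive TTL that truncates to 0
// whole seconds would silently become "no expiration", so the write is skipped.
func shouldStore(ttl time.Duration) (int32, bool) {
	ttlSeconds := int32(ttl.Seconds())
	if ttl != 0 && ttlSeconds <= 0 {
		return 0, false
	}
	return ttlSeconds, true
}

func main() {
	for _, ttl := range []time.Duration{0, 100 * time.Millisecond, 10 * time.Second} {
		secs, ok := shouldStore(ttl)
		fmt.Printf("ttl=%v -> seconds=%d store=%v\n", ttl, secs, ok)
	}
}

Running this prints store=true for 0 (an explicit infinite TTL), store=false for 100ms (the truncated update is discarded), and store=true for 10 seconds — the three cases the new tests below exercise.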
41 changes: 37 additions & 4 deletions cache/memcached_client_test.go
@@ -61,11 +61,44 @@ func TestMemcachedClientConfig_Validate(t *testing.T) {
}
}

func TestMemcachedClient_GetMulti(t *testing.T) {
setup := setupDefaultMemcachedClient
func TestMemcachedClient_SetAsync(t *testing.T) {
t.Run("with non-zero TTL", func(t *testing.T) {
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
client.SetAsync("foo", []byte("bar"), 10*time.Second)
require.NoError(t, client.wait())

ctx := context.Background()
res := client.GetMulti(ctx, []string{"foo"})
require.Equal(t, map[string][]byte{"foo": []byte("bar")}, res)
})

t.Run("with truncated zero TTL", func(t *testing.T) {
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
client.SetAsync("foo", []byte("bar"), 100*time.Millisecond)
require.NoError(t, client.wait())

ctx := context.Background()
res := client.GetMulti(ctx, []string{"foo"})
require.Empty(t, res)
})

t.Run("with zero TTL", func(t *testing.T) {
client, _, err := setupDefaultMemcachedClient()
require.NoError(t, err)
client.SetAsync("foo", []byte("bar"), 0)
require.NoError(t, client.wait())

ctx := context.Background()
res := client.GetMulti(ctx, []string{"foo"})
require.Equal(t, map[string][]byte{"foo": []byte("bar")}, res)
})
}

func TestMemcachedClient_GetMulti(t *testing.T) {
t.Run("no allocator", func(t *testing.T) {
client, backend, err := setup()
client, backend, err := setupDefaultMemcachedClient()
require.NoError(t, err)
client.SetAsync("foo", []byte("bar"), 10*time.Second)
require.NoError(t, client.wait())
@@ -77,7 +110,7 @@ func TestMemcachedClient_GetMulti(t *testing.T) {
})

t.Run("with allocator", func(t *testing.T) {
client, backend, err := setup()
client, backend, err := setupDefaultMemcachedClient()
require.NoError(t, err)
client.SetAsync("foo", []byte("bar"), 10*time.Second)
require.NoError(t, client.wait())
14 changes: 7 additions & 7 deletions concurrency/limited_concurrency_singleflight_test.go
@@ -34,7 +34,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_ConcurrencyLimit(t *t
require.NoError(t, sf.ForEachNotInFlight(ctx, tokens, f))
}

busyWorker := func(ctx context.Context, s string) error {
busyWorker := func(context.Context, string) error {
workersToStart.Done()
<-workersWait
return nil
@@ -49,7 +49,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_ConcurrencyLimit(t *t
workersToStart.Wait()

extraWorkerInvoked := make(chan struct{})
go forEachNotInFlight(func(ctx context.Context, s string) error {
go forEachNotInFlight(func(context.Context, string) error {
close(extraWorkerInvoked)
return nil
}, "10")
@@ -77,7 +77,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_ReturnsWhenAllTokensA

workersToStart.Add(1)
go func() {
require.NoError(t, sf.ForEachNotInFlight(ctx, []string{token}, func(ctx context.Context, s string) error {
require.NoError(t, sf.ForEachNotInFlight(ctx, []string{token}, func(context.Context, string) error {
workersToStart.Done()
<-workersWait
return nil
@@ -87,7 +87,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_ReturnsWhenAllTokensA
workersToStart.Wait()

duplicatedTokenInvoked := false
require.NoError(t, sf.ForEachNotInFlight(ctx, []string{token}, func(ctx context.Context, s string) error {
require.NoError(t, sf.ForEachNotInFlight(ctx, []string{token}, func(context.Context, string) error {
duplicatedTokenInvoked = true
return nil
}))
@@ -114,7 +114,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_CallsOnlyNotInFlightT

workersToStart.Add(1)
go func() {
require.NoError(t, sf.ForEachNotInFlight(ctx, []string{tokenA}, func(ctx context.Context, s string) error {
require.NoError(t, sf.ForEachNotInFlight(ctx, []string{tokenA}, func(context.Context, string) error {
workersToStart.Done()
<-workersWait
return nil
@@ -123,7 +123,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_CallsOnlyNotInFlightT

workersToStart.Wait()
var invocations atomic.Int64
assert.NoError(t, sf.ForEachNotInFlight(ctx, []string{tokenA, tokenB}, func(ctx context.Context, s string) error {
assert.NoError(t, sf.ForEachNotInFlight(ctx, []string{tokenA, tokenB}, func(_ context.Context, s string) error {
assert.Equal(t, tokenB, s)
invocations.Inc()
return nil
@@ -136,7 +136,7 @@ func TestLimitedConcurrencySingleFlight_ForEachNotInFlight_ReturnsWhenTokensAreE
t.Parallel()

var invocations atomic.Int64
assert.NoError(t, NewLimitedConcurrencySingleFlight(10).ForEachNotInFlight(context.Background(), []string{}, func(ctx context.Context, s string) error {
assert.NoError(t, NewLimitedConcurrencySingleFlight(10).ForEachNotInFlight(context.Background(), []string{}, func(context.Context, string) error {
invocations.Inc()
return nil
}))
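Most of the signature changes in this file, and in several files below, are the same mechanical cleanup: callbacks that ignore some or all of their arguments drop the parameter names (or blank them with an underscore), presumably so the golangci-lint version bumped earlier in this commit stops reporting them as unused. A small sketch of the idiom, independent of the dskit code:

package main

import (
	"context"
	"fmt"
)

type callback func(ctx context.Context, token string) error

func run(cb callback) error {
	return cb(context.Background(), "token-a")
}

func main() {
	// Both parameters used: keep the names.
	_ = run(func(ctx context.Context, token string) error {
		fmt.Println(ctx.Err(), token)
		return nil
	})

	// Only the token is used: blank out the unused context parameter.
	_ = run(func(_ context.Context, token string) error {
		fmt.Println(token)
		return nil
	})

	// Nothing is used: the parameter names can be omitted entirely.
	_ = run(func(context.Context, string) error {
		return nil
	})
}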
28 changes: 14 additions & 14 deletions concurrency/runner_test.go
@@ -23,7 +23,7 @@ func TestForEachUser(t *testing.T) {

input := []string{"a", "b", "c"}

err := ForEachUser(context.Background(), input, 2, func(ctx context.Context, user string) error {
err := ForEachUser(context.Background(), input, 2, func(_ context.Context, user string) error {
processedMx.Lock()
defer processedMx.Unlock()
processed = append(processed, user)
@@ -40,7 +40,7 @@ func TestForEachUser_ShouldContinueOnErrorButReturnIt(t *testing.T) {

input := []string{"a", "b", "c"}

err := ForEachUser(context.Background(), input, 2, func(ctx context.Context, user string) error {
err := ForEachUser(context.Background(), input, 2, func(ctx context.Context, _ string) error {
if processed.CompareAndSwap(0, 1) {
return errors.New("the first request is failing")
}
@@ -63,7 +63,7 @@ func TestForEachUser_ShouldContinueOnErrorButReturnIt(t *testing.T) {
}

func TestForEachUser_ShouldReturnImmediatelyOnNoUsersProvided(t *testing.T) {
require.NoError(t, ForEachUser(context.Background(), nil, 2, func(ctx context.Context, user string) error {
require.NoError(t, ForEachUser(context.Background(), nil, 2, func(context.Context, string) error {
return nil
}))
}
@@ -72,7 +72,7 @@ func TestForEachJob(t *testing.T) {
jobs := []string{"a", "b", "c"}
processed := make([]string, len(jobs))

err := ForEachJob(context.Background(), len(jobs), 2, func(ctx context.Context, idx int) error {
err := ForEachJob(context.Background(), len(jobs), 2, func(_ context.Context, idx int) error {
processed[idx] = jobs[idx]
return nil
})
@@ -85,7 +85,7 @@ func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationHandled(t *testin
// Keep the processed jobs count.
var processed atomic.Int32

err := ForEachJob(context.Background(), 3, 2, func(ctx context.Context, idx int) error {
err := ForEachJob(context.Background(), 3, 2, func(ctx context.Context, _ int) error {
if processed.CompareAndSwap(0, 1) {
return errors.New("the first request is failing")
}
@@ -116,7 +116,7 @@ func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *test
var wg sync.WaitGroup
wg.Add(2)

err := ForEachJob(context.Background(), 3, 2, func(ctx context.Context, idx int) error {
err := ForEachJob(context.Background(), 3, 2, func(ctx context.Context, _ int) error {
wg.Done()

if processed.CompareAndSwap(0, 1) {
@@ -143,7 +143,7 @@ func TestForEachJob_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *test
func TestForEachJob_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) {
// Keep the processed jobs count.
var processed atomic.Int32
require.NoError(t, ForEachJob(context.Background(), 0, 2, func(ctx context.Context, idx int) error {
require.NoError(t, ForEachJob(context.Background(), 0, 2, func(context.Context, int) error {
processed.Inc()
return nil
}))
@@ -161,9 +161,9 @@ func TestForEachJob_ShouldCancelContextPassedToCallbackOnceDone(t *testing.T) {
contexts []context.Context
)

jobFunc := func(ctx context.Context, idx int) error {
jobFunc := func(ctx context.Context, _ int) error {
// Context should not be cancelled.
assert.Nil(t, ctx.Err())
require.NoError(t, ctx.Err())

contextsMx.Lock()
contexts = append(contexts, ctx)
@@ -194,7 +194,7 @@ func TestForEach(t *testing.T) {

jobs := []string{"a", "b", "c"}

err := ForEach(context.Background(), CreateJobsFromStrings(jobs), 2, func(ctx context.Context, job interface{}) error {
err := ForEach(context.Background(), CreateJobsFromStrings(jobs), 2, func(_ context.Context, job interface{}) error {
processedMx.Lock()
defer processedMx.Unlock()
processed = append(processed, job.(string))
@@ -213,7 +213,7 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T
processed atomic.Int32
)

err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error {
err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, _ interface{}) error {
if processed.CompareAndSwap(0, 1) {
return errors.New("the first request is failing")
}
@@ -244,7 +244,7 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing
var wg sync.WaitGroup
wg.Add(2)

err := ForEach(context.Background(), []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error {
err := ForEach(context.Background(), []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, _ interface{}) error {
wg.Done()

if processed.CompareAndSwap(0, 1) {
@@ -269,7 +269,7 @@ func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing
}

func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) {
require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error {
require.NoError(t, ForEach(context.Background(), nil, 2, func(context.Context, interface{}) error {
return nil
}))
}
@@ -391,7 +391,7 @@ func TestForEachJobMergeResults(t *testing.T) {
close(waitBeforeReturningError)
}()

_, err := ForEachJobMergeResults[[]string, string](context.Background(), jobs, 0, func(ctx context.Context, job []string) ([]string, error) {
_, err := ForEachJobMergeResults[[]string, string](context.Background(), jobs, 0, func(context.Context, []string) ([]string, error) {
callbacksStarted.Done()
<-waitBeforeReturningError

2 changes: 1 addition & 1 deletion crypto/tls/test/tls_integration_test.go
@@ -102,7 +102,7 @@ func newIntegrationClientServer(
serv, err := server.New(cfg)
require.NoError(t, err)

serv.HTTP.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
serv.HTTP.HandleFunc("/hello", func(w http.ResponseWriter, _ *http.Request) {
fmt.Fprintf(w, "OK")
})

2 changes: 1 addition & 1 deletion grpcclient/ratelimit_test.go
@@ -18,7 +18,7 @@ func TestRateLimiterFailureResultsInResourceExhaustedError(t *testing.T) {
RateLimit: 0,
}
conn := grpc.ClientConn{}
invoker := func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
invoker := func(context.Context, string, interface{}, interface{}, *grpc.ClientConn, ...grpc.CallOption) error {
return nil
}

2 changes: 1 addition & 1 deletion grpcutil/dns_resolver.go
@@ -208,7 +208,7 @@ func (w *dnsWatcher) lookupSRV() map[string]*Update {
for _, a := range addrs {
a, ok := formatIP(a)
if !ok {
level.Error(w.logger).Log("failed IP parsing", "err", err)
level.Error(w.logger).Log("msg", "failed IP parsing", "err", err)
continue
}
addr := a + ":" + strconv.Itoa(int(s.Port))
4 changes: 2 additions & 2 deletions grpcutil/health_check.go
@@ -16,7 +16,7 @@ type Check func(ctx context.Context) bool

// WithManager returns a new Check that tests if the managed services are healthy.
func WithManager(manager *services.Manager) Check {
return func(ctx context.Context) bool {
return func(context.Context) bool {
states := manager.ServicesByState()

// Given this is a health check endpoint for the whole instance, we should consider
@@ -33,7 +33,7 @@ func WithManager(manager *services.Manager) Check {

// WithShutdownRequested returns a new Check that returns false when shutting down.
func WithShutdownRequested(requested *atomic.Bool) Check {
return func(ctx context.Context) bool {
return func(context.Context) bool {
return !requested.Load()
}
}
4 changes: 2 additions & 2 deletions hedging/hedging_test.go
@@ -35,7 +35,7 @@ func TestHedging(t *testing.T) {
}
count := atomic.NewInt32(0)
client, err := Client(cfg, &http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
Transport: RoundTripperFunc(func(*http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
@@ -70,7 +70,7 @@ func TestHedgingRateLimit(t *testing.T) {
}
count := atomic.NewInt32(0)
client, err := Client(cfg, &http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
Transport: RoundTripperFunc(func(*http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
