From 8c5eb416bc833bbcffa857db9fd77e290e83d877 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 10:53:31 +0100 Subject: [PATCH 1/7] Adding grpc_cluster client and server middleware Signed-off-by: Yuri Nikolic --- middleware/grpc_cluster.go | 53 ++++++++++++++++ middleware/grpc_cluster_test.go | 103 ++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 middleware/grpc_cluster.go create mode 100644 middleware/grpc_cluster_test.go diff --git a/middleware/grpc_cluster.go b/middleware/grpc_cluster.go new file mode 100644 index 000000000..984c79685 --- /dev/null +++ b/middleware/grpc_cluster.go @@ -0,0 +1,53 @@ +package middleware + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gogo/status" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +const ( + MetadataClusterKey = "x-cluster" +) + +// ClusterUnaryClientInterceptor propagates the given cluster info to gRPC metadata. +func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if cluster != "" { + ctx = metadata.AppendToOutgoingContext(ctx, MetadataClusterKey, cluster) + } + + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +// ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contain any cluster information and if so, +// checks if the latter corresponds to the given info. If it is the case, the request is further propagated. +// Otherwise, an error is returned. +func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + reqCluster, ok := getClusterFromIncomingContext(ctx) + if ok { + if reqCluster != cluster { + msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) + level.Error(logger).Log("msg", msg) + return nil, status.Error(codes.FailedPrecondition, msg) + } + } + return handler(ctx, req) + } +} + +func getClusterFromIncomingContext(ctx context.Context) (string, bool) { + clusterIDs := metadata.ValueFromIncomingContext(ctx, MetadataClusterKey) + if len(clusterIDs) != 1 { + return "", false + } + return clusterIDs[0], true +} diff --git a/middleware/grpc_cluster_test.go b/middleware/grpc_cluster_test.go new file mode 100644 index 000000000..762f4a34b --- /dev/null +++ b/middleware/grpc_cluster_test.go @@ -0,0 +1,103 @@ +package middleware + +import ( + "context" + "net/http" + "os" + "strings" + "testing" + + "github.com/go-kit/log" + "github.com/gogo/status" + "github.com/grafana/dskit/httpgrpc" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +func TestClusterUnaryClientInterceptor(t *testing.T) { + testCases := map[string]struct { + cluster string + expectedClusterFromContext string + }{ + "no cluster info sets no cluster info in context": { + cluster: "", + expectedClusterFromContext: "", + }, + "if cluster info is set, it should be propagated to invoker": { + cluster: "cluster", + expectedClusterFromContext: "cluster", + }, + } + verify := func(ctx context.Context, expectedCluster string) { + md, ok := metadata.FromOutgoingContext(ctx) + require.True(t, ok) + clusterIDs, ok := md[MetadataClusterKey] + require.True(t, ok) + require.Len(t, clusterIDs, 1) + require.Equal(t, expectedCluster, clusterIDs[0]) + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + interceptor := ClusterUnaryClientInterceptor(testCase.cluster) + invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + if testCase.expectedClusterFromContext != "" { + verify(ctx, testCase.expectedClusterFromContext) + } + return nil + } + + interceptor(context.Background(), "GET", createRequest(t), nil, nil, invoker) + }) + } +} + +func TestClusterUnaryServerInterceptor(t *testing.T) { + testCases := map[string]struct { + requestCluster string + serverCluster string + expectedError error + }{ + "equal request and server clusters give no error": { + requestCluster: "cluster", + serverCluster: "cluster", + expectedError: nil, + }, + "different request and server clusters give rise to an error": { + requestCluster: "wrong-cluster", + serverCluster: "cluster", + expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"wrong-cluster\" - this is cluster \"cluster\""), + }, + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdin) + interceptor := ClusterUnaryServerInterceptor(testCase.serverCluster, logger) + handler := func(context.Context, interface{}) (interface{}, error) { + return nil, nil + } + + md := map[string][]string{ + MetadataClusterKey: {testCase.requestCluster}, + } + ctx := metadata.NewIncomingContext(context.Background(), md) + info := &grpc.UnaryServerInfo{FullMethod: "/Test/Me"} + req := createRequest(t) + _, err := interceptor(ctx, req, info, handler) + if testCase.expectedError == nil { + require.NoError(t, err) + } else { + require.Equal(t, testCase.expectedError, err) + } + }) + } +} + +func createRequest(t *testing.T) *httpgrpc.HTTPRequest { + r, err := http.NewRequest("POST", "/i/am/calling/you", strings.NewReader("some body")) + require.NoError(t, err) + req, err := httpgrpc.FromHTTPRequest(r) + require.NoError(t, err) + return req +} From 7c3cf0a745cb34b991b9e083b4739b3e30ee4fe4 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 11:09:06 +0100 Subject: [PATCH 2/7] Updating CHANGELOG Signed-off-by: Yuri Nikolic --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb4029f73..ad9a1aad7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,8 @@ * [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477 * [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486 * [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495 +* [FEATURE] Add `middleware.ClusterUnaryClientInterceptor`, a `grpc.UnaryClientInterceptor` that propagates a cluster info to the outgoing gRPC metadata. #640 +* [FEATURE] Add `middleware.ClusterUnaryServerInterceptor`, a `grpc.UnaryServerInterceptor` that checks if the incoming gRPC metadata contains a correct cluster info, and returns an error if it is not the case. #640 * [ENHANCEMENT] Add option to hide token information in ring status page #633 * [ENHANCEMENT] Display token information in partition ring status page #631 * [ENHANCEMENT] Add ability to log all source hosts from http header instead of only the first one. #444 From 10043acf216f68bdfdd0c217878bd3a8036bcbd3 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 11:21:47 +0100 Subject: [PATCH 3/7] Fixing lint issues Signed-off-by: Yuri Nikolic --- middleware/grpc_cluster.go | 2 +- middleware/grpc_cluster_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/middleware/grpc_cluster.go b/middleware/grpc_cluster.go index 984c79685..c39cd9a82 100644 --- a/middleware/grpc_cluster.go +++ b/middleware/grpc_cluster.go @@ -31,7 +31,7 @@ func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { // checks if the latter corresponds to the given info. If it is the case, the request is further propagated. // Otherwise, an error is returned. func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { reqCluster, ok := getClusterFromIncomingContext(ctx) if ok { if reqCluster != cluster { diff --git a/middleware/grpc_cluster_test.go b/middleware/grpc_cluster_test.go index 762f4a34b..88d9b5ab9 100644 --- a/middleware/grpc_cluster_test.go +++ b/middleware/grpc_cluster_test.go @@ -41,14 +41,15 @@ func TestClusterUnaryClientInterceptor(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { interceptor := ClusterUnaryClientInterceptor(testCase.cluster) - invoker := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + invoker := func(ctx context.Context, _ string, req, _ any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { if testCase.expectedClusterFromContext != "" { verify(ctx, testCase.expectedClusterFromContext) } return nil } - interceptor(context.Background(), "GET", createRequest(t), nil, nil, invoker) + err := interceptor(context.Background(), "GET", createRequest(t), nil, nil, invoker) + require.NoError(t, err) }) } } From b5ceef973b07105ef9e9f7e1650cca3025aaa9e5 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 11:33:41 +0100 Subject: [PATCH 4/7] Fixing lint issues Signed-off-by: Yuri Nikolic --- middleware/grpc_cluster_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/middleware/grpc_cluster_test.go b/middleware/grpc_cluster_test.go index 88d9b5ab9..c45eecca2 100644 --- a/middleware/grpc_cluster_test.go +++ b/middleware/grpc_cluster_test.go @@ -9,7 +9,9 @@ import ( "github.com/go-kit/log" "github.com/gogo/status" + "github.com/grafana/dskit/httpgrpc" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -41,7 +43,7 @@ func TestClusterUnaryClientInterceptor(t *testing.T) { for testName, testCase := range testCases { t.Run(testName, func(t *testing.T) { interceptor := ClusterUnaryClientInterceptor(testCase.cluster) - invoker := func(ctx context.Context, _ string, req, _ any, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + invoker := func(ctx context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error { if testCase.expectedClusterFromContext != "" { verify(ctx, testCase.expectedClusterFromContext) } From 679b917072fd504ff639fd4c6421cef36d747553 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 15:34:54 +0100 Subject: [PATCH 5/7] Fixing review findings Signed-off-by: Yuri Nikolic --- middleware/grpc_cluster.go | 11 +++--- middleware/grpc_cluster_test.go | 63 +++++++++++++++++++++++++-------- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/middleware/grpc_cluster.go b/middleware/grpc_cluster.go index c39cd9a82..71ac46280 100644 --- a/middleware/grpc_cluster.go +++ b/middleware/grpc_cluster.go @@ -27,16 +27,17 @@ func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { } } -// ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contain any cluster information and if so, -// checks if the latter corresponds to the given info. If it is the case, the request is further propagated. +// ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contains any cluster information and if so, +// checks if the latter corresponds to the given cluster. If it is the case, the request is further propagated. // Otherwise, an error is returned. func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - reqCluster, ok := getClusterFromIncomingContext(ctx) - if ok { + reqCluster, requestClusterFound := getClusterFromIncomingContext(ctx) + clustersConsistent := (cluster == "" && !requestClusterFound) || cluster == reqCluster + if !clustersConsistent { if reqCluster != cluster { msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) - level.Error(logger).Log("msg", msg) + level.Warn(logger).Log("msg", msg) return nil, status.Error(codes.FailedPrecondition, msg) } } diff --git a/middleware/grpc_cluster_test.go b/middleware/grpc_cluster_test.go index c45eecca2..51a219291 100644 --- a/middleware/grpc_cluster_test.go +++ b/middleware/grpc_cluster_test.go @@ -58,19 +58,46 @@ func TestClusterUnaryClientInterceptor(t *testing.T) { func TestClusterUnaryServerInterceptor(t *testing.T) { testCases := map[string]struct { - requestCluster string - serverCluster string - expectedError error + incomingContext context.Context + requestCluster string + serverCluster string + expectedError error }{ "equal request and server clusters give no error": { - requestCluster: "cluster", - serverCluster: "cluster", - expectedError: nil, + incomingContext: createIncomingContext(true, "cluster"), + requestCluster: "cluster", + serverCluster: "cluster", + expectedError: nil, }, "different request and server clusters give rise to an error": { - requestCluster: "wrong-cluster", - serverCluster: "cluster", - expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"wrong-cluster\" - this is cluster \"cluster\""), + incomingContext: createIncomingContext(true, "wrong-cluster"), + requestCluster: "wrong-cluster", + serverCluster: "cluster", + expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"wrong-cluster\" - this is cluster \"cluster\""), + }, + "empty request cluster and non-empty server custer give rise to an error": { + incomingContext: createIncomingContext(true, ""), + requestCluster: "", + serverCluster: "cluster", + expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"\" - this is cluster \"cluster\""), + }, + "no request cluster and non-empty server custer give rise to an error": { + incomingContext: createIncomingContext(false, ""), + requestCluster: "", + serverCluster: "cluster", + expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"\" - this is cluster \"cluster\""), + }, + "empty request cluster and empty server custer give no error": { + incomingContext: createIncomingContext(true, ""), + requestCluster: "", + serverCluster: "", + expectedError: nil, + }, + "no request cluster and empty server custer give no error": { + incomingContext: createIncomingContext(false, ""), + requestCluster: "", + serverCluster: "", + expectedError: nil, }, } for testName, testCase := range testCases { @@ -80,14 +107,9 @@ func TestClusterUnaryServerInterceptor(t *testing.T) { handler := func(context.Context, interface{}) (interface{}, error) { return nil, nil } - - md := map[string][]string{ - MetadataClusterKey: {testCase.requestCluster}, - } - ctx := metadata.NewIncomingContext(context.Background(), md) info := &grpc.UnaryServerInfo{FullMethod: "/Test/Me"} req := createRequest(t) - _, err := interceptor(ctx, req, info, handler) + _, err := interceptor(testCase.incomingContext, req, info, handler) if testCase.expectedError == nil { require.NoError(t, err) } else { @@ -97,6 +119,17 @@ func TestClusterUnaryServerInterceptor(t *testing.T) { } } +func createIncomingContext(containsRequestCluster bool, requestCluster string) context.Context { + ctx := context.Background() + if !containsRequestCluster { + return ctx + } + md := map[string][]string{ + MetadataClusterKey: {requestCluster}, + } + return metadata.NewIncomingContext(ctx, md) +} + func createRequest(t *testing.T) *httpgrpc.HTTPRequest { r, err := http.NewRequest("POST", "/i/am/calling/you", strings.NewReader("some body")) require.NoError(t, err) From 0c7f04a17c0f1cda887f4103e346bc8a9f9c2756 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 16:52:16 +0100 Subject: [PATCH 6/7] Fixing reveiw findings Signed-off-by: Yuri Nikolic --- middleware/grpc_cluster.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/middleware/grpc_cluster.go b/middleware/grpc_cluster.go index 71ac46280..034056c57 100644 --- a/middleware/grpc_cluster.go +++ b/middleware/grpc_cluster.go @@ -32,9 +32,8 @@ func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { // Otherwise, an error is returned. func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - reqCluster, requestClusterFound := getClusterFromIncomingContext(ctx) - clustersConsistent := (cluster == "" && !requestClusterFound) || cluster == reqCluster - if !clustersConsistent { + reqCluster := getClusterFromIncomingContext(ctx) + if cluster != reqCluster { if reqCluster != cluster { msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) level.Warn(logger).Log("msg", msg) @@ -45,10 +44,10 @@ func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.Unary } } -func getClusterFromIncomingContext(ctx context.Context) (string, bool) { +func getClusterFromIncomingContext(ctx context.Context) string { clusterIDs := metadata.ValueFromIncomingContext(ctx, MetadataClusterKey) if len(clusterIDs) != 1 { - return "", false + return "" } - return clusterIDs[0], true + return clusterIDs[0] } From 24629fd3598afc3517b895085823e7048b2dae1a Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Tue, 28 Jan 2025 18:30:12 +0100 Subject: [PATCH 7/7] Fixing review findings Signed-off-by: Yuri Nikolic --- middleware/grpc_cluster.go | 14 +++++++------- middleware/grpc_cluster_test.go | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/middleware/grpc_cluster.go b/middleware/grpc_cluster.go index 034056c57..a705e7550 100644 --- a/middleware/grpc_cluster.go +++ b/middleware/grpc_cluster.go @@ -32,21 +32,21 @@ func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { // Otherwise, an error is returned. func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - reqCluster := getClusterFromIncomingContext(ctx) + reqCluster := getClusterFromIncomingContext(ctx, logger) if cluster != reqCluster { - if reqCluster != cluster { - msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) - level.Warn(logger).Log("msg", msg) - return nil, status.Error(codes.FailedPrecondition, msg) - } + msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) + level.Warn(logger).Log("msg", msg) + return nil, status.Error(codes.FailedPrecondition, msg) } return handler(ctx, req) } } -func getClusterFromIncomingContext(ctx context.Context) string { +func getClusterFromIncomingContext(ctx context.Context, logger log.Logger) string { clusterIDs := metadata.ValueFromIncomingContext(ctx, MetadataClusterKey) if len(clusterIDs) != 1 { + msg := fmt.Sprintf("gRPC metadata should contain exactly 1 value for key \"%s\", but the current set of values is %v. Returning an empty string.", MetadataClusterKey, clusterIDs) + level.Warn(logger).Log("msg", msg) return "" } return clusterIDs[0] diff --git a/middleware/grpc_cluster_test.go b/middleware/grpc_cluster_test.go index 51a219291..436bc2698 100644 --- a/middleware/grpc_cluster_test.go +++ b/middleware/grpc_cluster_test.go @@ -75,25 +75,25 @@ func TestClusterUnaryServerInterceptor(t *testing.T) { serverCluster: "cluster", expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"wrong-cluster\" - this is cluster \"cluster\""), }, - "empty request cluster and non-empty server custer give rise to an error": { + "empty request cluster and non-empty server cluster give rise to an error": { incomingContext: createIncomingContext(true, ""), requestCluster: "", serverCluster: "cluster", expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"\" - this is cluster \"cluster\""), }, - "no request cluster and non-empty server custer give rise to an error": { + "no request cluster and non-empty server cluster give rise to an error": { incomingContext: createIncomingContext(false, ""), requestCluster: "", serverCluster: "cluster", expectedError: status.Error(codes.FailedPrecondition, "request intended for cluster \"\" - this is cluster \"cluster\""), }, - "empty request cluster and empty server custer give no error": { + "empty request cluster and empty server cluster give no error": { incomingContext: createIncomingContext(true, ""), requestCluster: "", serverCluster: "", expectedError: nil, }, - "no request cluster and empty server custer give no error": { + "no request cluster and empty server cluster give no error": { incomingContext: createIncomingContext(false, ""), requestCluster: "", serverCluster: "",