From 46263bd8f8d6d18ae8f4450db68a400469a0e03a Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Fri, 22 Sep 2023 14:48:01 +0100 Subject: [PATCH 1/7] feat: add trusted proxy support to XFF check Signed-off-by: Elias Van Ootegem --- CHANGELOG.md | 1 + datanode/api/config.go | 2 ++ datanode/api/errors.go | 2 ++ datanode/api/server.go | 64 ++++++++++++++++++++++++---------- datanode/gateway/middleware.go | 7 ++-- 5 files changed, 56 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b91c1396edf..ba08a338f9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,6 +107,7 @@ - [8764](https://github.com/vegaprotocol/vega/issues/8764) - Include funding payment in margin and liquidation price estimates for `PERPS`. - [9519](https://github.com/vegaprotocol/vega/issues/9519) - Fix `oracle_specs` data in the `database` that was inadvertently removed during an earlier database migration - [9478](https://github.com/vegaprotocol/vega/issues/8764) - Add SLA statistics to market data and liquidity provision APIs. +- [8979](https://github.com/vegaprotocol/vega/issues/8979) - Add trusted proxy config and verification for `XFF` header. ### 🐛 Fixes diff --git a/datanode/api/config.go b/datanode/api/config.go index 24a4cc7d046..913465087cb 100644 --- a/datanode/api/config.go +++ b/datanode/api/config.go @@ -38,6 +38,7 @@ type Config struct { CoreNodeGRPCPort int `long:"core-node-grpc-port"` RateLimit ratelimit.Config `group:"rate-limits"` MaxSubscriptionPerClient uint32 `long:"max-subscription-per-client"` + TrustedProxies []string `long:"trusted-proxy"` } // NewDefaultConfig creates an instance of the package specific configuration, given a @@ -57,5 +58,6 @@ func NewDefaultConfig() Config { CoreNodeGRPCPort: 3002, RateLimit: ratelimit.NewDefaultConfig(), MaxSubscriptionPerClient: 250, + TrustedProxies: []string{}, } } diff --git a/datanode/api/errors.go b/datanode/api/errors.go index dbbd70d34ed..fd50da4c1d9 100644 --- a/datanode/api/errors.go +++ b/datanode/api/errors.go @@ -23,6 +23,8 @@ import ( // API Errors and descriptions. var ( + // ErrNoTrustedProxy indactes a forwarded request that did not pass through a trusted proxy + ErrNoTrustedProxy = errors.New("forwarded requests need to pass through a trusted proxy") // ErrChannelClosed signals that the channel streaming data is closed. ErrChannelClosed = errors.New("channel closed") // ErrNotAValidVegaID signals an invalid id. diff --git a/datanode/api/server.go b/datanode/api/server.go index 905b07a82d3..dc76fdca87b 100644 --- a/datanode/api/server.go +++ b/datanode/api/server.go @@ -41,9 +41,11 @@ import ( "github.com/fullstorydev/grpcui/standalone" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" ) // EventService ... @@ -136,6 +138,8 @@ type GRPCServer struct { // used in order to gracefully close streams ctx context.Context cfunc context.CancelFunc + + trustedProxies map[string]struct{} } // NewGRPCServer create a new instance of the GPRC api for the vega node. @@ -192,6 +196,10 @@ func NewGRPCServer( log = log.Named(namedLogger) log.SetLevel(config.Level.Get()) ctx, cfunc := context.WithCancel(context.Background()) + tps := make(map[string]struct{}, len(config.TrustedProxies)) + for _, ip := range config.TrustedProxies { + tps[ip] = struct{}{} + } return &GRPCServer{ log: log, @@ -247,8 +255,9 @@ func NewGRPCServer( eventService: eventService, Config: config, }, - ctx: ctx, - cfunc: cfunc, + ctx: ctx, + cfunc: cfunc, + trustedProxies: tps, } } @@ -262,24 +271,40 @@ func (g *GRPCServer) ReloadConf(cfg Config) { ) g.log.SetLevel(cfg.Level.Get()) } + tps := make(map[string]struct{}, len(cfg.TrustedProxies)) + for _, ip := range cfg.TrustedProxies { + tps[ip] = struct{}{} + } // TODO(): not updating the actual server for now, may need to look at this later // e.g restart the http server on another port or whatever g.Config = cfg + g.trustedProxies = tps } -func ipFromContext(ctx context.Context, method string, log *logging.Logger) string { +func (g *GRPCServer) ipFromContext(ctx context.Context, method string, log *logging.Logger) (string, error) { // first check if the request is forwarded from our restproxy // get the metadata - md, ok := metadata.FromIncomingContext(ctx) - if ok { - forwardedFor, ok := md["x-forwarded-for"] - if ok && len(forwardedFor) > 0 { - log.Debug("grpc request x-forwarded-for", - logging.String("method", method), - logging.String("remote-ip-addr", forwardedFor[0]), - ) - return forwardedFor[0] + tps := g.trustedProxies + if len(tps) > 0 { + if md, ok := metadata.FromIncomingContext(ctx); ok { + if forwardedFor, ok := md["x-forwarded-for"]; ok { + if len(forwardedFor) < 2 { + // this should return an error + return "", ErrNoTrustedProxy + } + // check the proxies for trusted + for _, pip := range forwardedFor[1:] { + // trusted proxy found, return + if _, ok := tps[pip]; ok { + log.Debug("grpc request x-forwarded-for", + logging.String("method", method), + logging.String("remote-ip-addr", forwardedFor[0]), + ) + return forwardedFor[0], nil + } + } + } } } @@ -289,20 +314,23 @@ func ipFromContext(ctx context.Context, method string, log *logging.Logger) stri log.Debug("grpc peer client request", logging.String("method", method), logging.String("remote-ip-addr", p.Addr.String())) - return p.Addr.String() + return p.Addr.String(), nil } - return "" + return "", nil } -func remoteAddrInterceptor(log *logging.Logger) grpc.UnaryServerInterceptor { +func (g *GRPCServer) remoteAddrInterceptor(log *logging.Logger) grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (resp interface{}, err error) { - ip := ipFromContext(ctx, info.FullMethod, log) + ip, err := g.ipFromContext(ctx, info.FullMethod, log) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } ctx = contextutil.WithRemoteIPAddr(ctx, ip) @@ -385,12 +413,12 @@ func (g *GRPCServer) Start(ctx context.Context, lis net.Listener) error { rateLimit := ratelimit.NewFromConfig(&g.RateLimit, g.log) intercept := grpc.ChainUnaryInterceptor( - remoteAddrInterceptor(g.log), + g.remoteAddrInterceptor(g.log), headersInterceptor(g.blockService.GetLastBlock, g.log), rateLimit.GRPCInterceptor, ) - streamIntercept := grpc.StreamInterceptor(subscriptionRateLimiter.WithGrpcInterceptor(ipFromContext)) + streamIntercept := grpc.StreamInterceptor(subscriptionRateLimiter.WithGrpcInterceptor(g.ipFromContext)) g.srv = grpc.NewServer(intercept, streamIntercept) diff --git a/datanode/gateway/middleware.go b/datanode/gateway/middleware.go index 647482c0bd5..1da06e0a7bb 100644 --- a/datanode/gateway/middleware.go +++ b/datanode/gateway/middleware.go @@ -192,11 +192,14 @@ func (s *SubscriptionRateLimiter) WithSubscriptionRateLimiter(next http.Handler) }) } -type ipGetter func(ctx context.Context, method string, log *logging.Logger) string +type ipGetter func(ctx context.Context, method string, log *logging.Logger) (string, error) func (s *SubscriptionRateLimiter) WithGrpcInterceptor(ipGetterFunc ipGetter) grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - addr := ipGetterFunc(ss.Context(), info.FullMethod, s.log) + addr, err := ipGetterFunc(ss.Context(), info.FullMethod, s.log) + if err != nil { + return status.Error(codes.PermissionDenied, err.Error()) + } if addr == "" { // If we don't have an IP we can't rate limit return handler(srv, ss) From 5d97a226b4cbdeeabff3cd0ca11d36c3ad0ac13c Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Thu, 21 Sep 2023 12:35:28 +0100 Subject: [PATCH 2/7] feat: linter issue Signed-off-by: Elias Van Ootegem --- datanode/api/errors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datanode/api/errors.go b/datanode/api/errors.go index fd50da4c1d9..0966fb87f43 100644 --- a/datanode/api/errors.go +++ b/datanode/api/errors.go @@ -23,7 +23,7 @@ import ( // API Errors and descriptions. var ( - // ErrNoTrustedProxy indactes a forwarded request that did not pass through a trusted proxy + // ErrNoTrustedProxy indactes a forwarded request that did not pass through a trusted proxy. ErrNoTrustedProxy = errors.New("forwarded requests need to pass through a trusted proxy") // ErrChannelClosed signals that the channel streaming data is closed. ErrChannelClosed = errors.New("channel closed") From 8479f5d64404e77d63441d0f9231f83b4b4ddbba Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Fri, 22 Sep 2023 15:28:35 +0100 Subject: [PATCH 3/7] feat: move changelog to breaking changes, default to localhost Signed-off-by: Elias Van Ootegem --- CHANGELOG.md | 2 +- datanode/api/config.go | 2 +- datanode/api/server.go | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba08a338f9b..eebd6924d9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [8679](https://github.com/vegaprotocol/vega/issues/8679) - Snapshot configuration `load-from-block-height` no longer accept `-1` as value. To reload from the latest snapshot, it should be valued `0`. - [8679](https://github.com/vegaprotocol/vega/issues/8679) - Snapshot configuration `snapshot-keep-recent` only accept values from `1` (included) to `10` (included) . - [8944](https://github.com/vegaprotocol/vega/issues/8944) - Asset ID field on the `ExportLedgerEntriesRequest gRPC API` for exporting ledger entries has changed type to make it optional. +- [8979](https://github.com/vegaprotocol/vega/issues/8979) - Add trusted proxy config and verification for `XFF` header. ### 🗑️ Deprecation @@ -107,7 +108,6 @@ - [8764](https://github.com/vegaprotocol/vega/issues/8764) - Include funding payment in margin and liquidation price estimates for `PERPS`. - [9519](https://github.com/vegaprotocol/vega/issues/9519) - Fix `oracle_specs` data in the `database` that was inadvertently removed during an earlier database migration - [9478](https://github.com/vegaprotocol/vega/issues/8764) - Add SLA statistics to market data and liquidity provision APIs. -- [8979](https://github.com/vegaprotocol/vega/issues/8979) - Add trusted proxy config and verification for `XFF` header. ### 🐛 Fixes diff --git a/datanode/api/config.go b/datanode/api/config.go index 913465087cb..5e7c1af1f52 100644 --- a/datanode/api/config.go +++ b/datanode/api/config.go @@ -58,6 +58,6 @@ func NewDefaultConfig() Config { CoreNodeGRPCPort: 3002, RateLimit: ratelimit.NewDefaultConfig(), MaxSubscriptionPerClient: 250, - TrustedProxies: []string{}, + TrustedProxies: []string{"127.0.0.1"}, } } diff --git a/datanode/api/server.go b/datanode/api/server.go index dc76fdca87b..d0a5590c99b 100644 --- a/datanode/api/server.go +++ b/datanode/api/server.go @@ -284,13 +284,12 @@ func (g *GRPCServer) ReloadConf(cfg Config) { func (g *GRPCServer) ipFromContext(ctx context.Context, method string, log *logging.Logger) (string, error) { // first check if the request is forwarded from our restproxy - // get the metadata tps := g.trustedProxies if len(tps) > 0 { + // get the metadata if md, ok := metadata.FromIncomingContext(ctx); ok { if forwardedFor, ok := md["x-forwarded-for"]; ok { if len(forwardedFor) < 2 { - // this should return an error return "", ErrNoTrustedProxy } // check the proxies for trusted From a0741e568c34e0d82ca2ae632c1310bd61c9a534 Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Fri, 22 Sep 2023 20:23:59 +0100 Subject: [PATCH 4/7] feat: clear trusted proxies for integration tests Signed-off-by: Elias Van Ootegem --- datanode/integration/integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/datanode/integration/integration_test.go b/datanode/integration/integration_test.go index dcc294c9d49..0e4f23b7fe5 100644 --- a/datanode/integration/integration_test.go +++ b/datanode/integration/integration_test.go @@ -251,6 +251,7 @@ func newTestConfig(postgresRuntimePath string) (*config.Config, error) { } cfg := config.NewDefaultConfig() + cfg.API.TrustedProxies = []string{} cfg.Broker.UseEventFile = true cfg.Broker.PanicOnError = true cfg.Broker.FileEventSourceConfig.Directory = filepath.Join(cwd, eventsDir) From d3fa322eca4feff9a2dae74d213f545546dcece2 Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Mon, 25 Sep 2023 10:07:11 +0100 Subject: [PATCH 5/7] feat: move trusted proxies to rate-limit config Signed-off-by: Elias Van Ootegem --- datanode/api/config.go | 2 -- datanode/api/server.go | 8 ++++---- datanode/integration/integration_test.go | 2 +- datanode/ratelimit/config.go | 22 ++++++++++++---------- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/datanode/api/config.go b/datanode/api/config.go index 5e7c1af1f52..24a4cc7d046 100644 --- a/datanode/api/config.go +++ b/datanode/api/config.go @@ -38,7 +38,6 @@ type Config struct { CoreNodeGRPCPort int `long:"core-node-grpc-port"` RateLimit ratelimit.Config `group:"rate-limits"` MaxSubscriptionPerClient uint32 `long:"max-subscription-per-client"` - TrustedProxies []string `long:"trusted-proxy"` } // NewDefaultConfig creates an instance of the package specific configuration, given a @@ -58,6 +57,5 @@ func NewDefaultConfig() Config { CoreNodeGRPCPort: 3002, RateLimit: ratelimit.NewDefaultConfig(), MaxSubscriptionPerClient: 250, - TrustedProxies: []string{"127.0.0.1"}, } } diff --git a/datanode/api/server.go b/datanode/api/server.go index d0a5590c99b..a83ed0e1ecd 100644 --- a/datanode/api/server.go +++ b/datanode/api/server.go @@ -196,8 +196,8 @@ func NewGRPCServer( log = log.Named(namedLogger) log.SetLevel(config.Level.Get()) ctx, cfunc := context.WithCancel(context.Background()) - tps := make(map[string]struct{}, len(config.TrustedProxies)) - for _, ip := range config.TrustedProxies { + tps := make(map[string]struct{}, len(config.RateLimit.TrustedProxies)) + for _, ip := range config.RateLimit.TrustedProxies { tps[ip] = struct{}{} } @@ -271,8 +271,8 @@ func (g *GRPCServer) ReloadConf(cfg Config) { ) g.log.SetLevel(cfg.Level.Get()) } - tps := make(map[string]struct{}, len(cfg.TrustedProxies)) - for _, ip := range cfg.TrustedProxies { + tps := make(map[string]struct{}, len(cfg.RateLimit.TrustedProxies)) + for _, ip := range cfg.RateLimit.TrustedProxies { tps[ip] = struct{}{} } diff --git a/datanode/integration/integration_test.go b/datanode/integration/integration_test.go index 0e4f23b7fe5..a8e297478ff 100644 --- a/datanode/integration/integration_test.go +++ b/datanode/integration/integration_test.go @@ -251,7 +251,7 @@ func newTestConfig(postgresRuntimePath string) (*config.Config, error) { } cfg := config.NewDefaultConfig() - cfg.API.TrustedProxies = []string{} + cfg.API.RateLimit.TrustedProxies = []string{} cfg.Broker.UseEventFile = true cfg.Broker.PanicOnError = true cfg.Broker.FileEventSourceConfig.Directory = filepath.Join(cwd, eventsDir) diff --git a/datanode/ratelimit/config.go b/datanode/ratelimit/config.go index fa0784913d8..2084f1a2868 100644 --- a/datanode/ratelimit/config.go +++ b/datanode/ratelimit/config.go @@ -7,19 +7,21 @@ import ( ) type Config struct { - Enabled bool `description:"Enable rate limit of API requests per IP address. Based on a 'token bucket' algorithm" long:"enabled"` - Rate float64 `description:"Refill rate of token bucket; maximum average request rate" long:"rate"` - Burst int `description:"Size of token bucket; maximum number of requests in short time window" long:"burst"` - TTL encoding.Duration `description:"Time after which inactive token buckets are reset" long:"ttl"` - BanFor encoding.Duration `description:"If IP continues to make requests after passing rate limit threshold, ban for this duration. Setting to 0 seconds disables banning." long:"banfor"` + Enabled bool `description:"Enable rate limit of API requests per IP address. Based on a 'token bucket' algorithm" long:"enabled"` + TrustedProxies []string `description:"specify a trusted proxy for forwarded requests" long:"trusted-proxy"` + Rate float64 `description:"Refill rate of token bucket; maximum average request rate" long:"rate"` + Burst int `description:"Size of token bucket; maximum number of requests in short time window" long:"burst"` + TTL encoding.Duration `description:"Time after which inactive token buckets are reset" long:"ttl"` + BanFor encoding.Duration `description:"If IP continues to make requests after passing rate limit threshold, ban for this duration. Setting to 0 seconds disables banning." long:"banfor"` } func NewDefaultConfig() Config { return Config{ - Enabled: true, - Rate: 20, - Burst: 100, - TTL: encoding.Duration{Duration: time.Hour}, - BanFor: encoding.Duration{Duration: 10 * time.Minute}, + Enabled: true, + TrustedProxies: []string{"127.0.0.1"}, + Rate: 20, + Burst: 100, + TTL: encoding.Duration{Duration: time.Hour}, + BanFor: encoding.Duration{Duration: 10 * time.Minute}, } } From 147d449632d65b39e1c6ff42b2fecafb36c0db88 Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Mon, 25 Sep 2023 11:28:15 +0100 Subject: [PATCH 6/7] fix: do not reject request based on XFF payload alone Signed-off-by: Elias Van Ootegem --- datanode/api/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datanode/api/server.go b/datanode/api/server.go index a83ed0e1ecd..4f1d64a599f 100644 --- a/datanode/api/server.go +++ b/datanode/api/server.go @@ -288,10 +288,10 @@ func (g *GRPCServer) ipFromContext(ctx context.Context, method string, log *logg if len(tps) > 0 { // get the metadata if md, ok := metadata.FromIncomingContext(ctx); ok { - if forwardedFor, ok := md["x-forwarded-for"]; ok { - if len(forwardedFor) < 2 { - return "", ErrNoTrustedProxy - } + // if trusted proxies are specified, the XFF header will be used to rate-limit the IP + // for which the request is forwarded. If no proxies are specified, or no trusted proxies + // are found, the peer is rate limited. + if forwardedFor, ok := md["x-forwarded-for"]; ok && len(forwardedFor) >= 2 { // check the proxies for trusted for _, pip := range forwardedFor[1:] { // trusted proxy found, return From 190168e43fbd648cfa7a620e0c87b678b320e566 Mon Sep 17 00:00:00 2001 From: Elias Van Ootegem Date: Mon, 25 Sep 2023 11:44:36 +0100 Subject: [PATCH 7/7] fix: remove breaking flag, no need for XFF changes to integration tests Signed-off-by: Elias Van Ootegem --- CHANGELOG.md | 2 +- datanode/integration/integration_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eebd6924d9d..d3fac70bc62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,6 @@ - [8679](https://github.com/vegaprotocol/vega/issues/8679) - Snapshot configuration `load-from-block-height` no longer accept `-1` as value. To reload from the latest snapshot, it should be valued `0`. - [8679](https://github.com/vegaprotocol/vega/issues/8679) - Snapshot configuration `snapshot-keep-recent` only accept values from `1` (included) to `10` (included) . - [8944](https://github.com/vegaprotocol/vega/issues/8944) - Asset ID field on the `ExportLedgerEntriesRequest gRPC API` for exporting ledger entries has changed type to make it optional. -- [8979](https://github.com/vegaprotocol/vega/issues/8979) - Add trusted proxy config and verification for `XFF` header. ### 🗑️ Deprecation @@ -216,6 +215,7 @@ - [9489](https://github.com/vegaprotocol/vega/issues/9489) - A referrer cannot join another team. - [9074](https://github.com/vegaprotocol/vega/issues/9074) - Fix error response for `CSV` exports. - [9512](https://github.com/vegaprotocol/vega/issues/9512) - Allow hysteresis period to be set to 0. +- [8979](https://github.com/vegaprotocol/vega/issues/8979) - Add trusted proxy config and verification for `XFF` header. ## 0.72.1 diff --git a/datanode/integration/integration_test.go b/datanode/integration/integration_test.go index a8e297478ff..9afe0a458e3 100644 --- a/datanode/integration/integration_test.go +++ b/datanode/integration/integration_test.go @@ -251,7 +251,7 @@ func newTestConfig(postgresRuntimePath string) (*config.Config, error) { } cfg := config.NewDefaultConfig() - cfg.API.RateLimit.TrustedProxies = []string{} + // cfg.API.RateLimit.TrustedProxies = []string{} cfg.Broker.UseEventFile = true cfg.Broker.PanicOnError = true cfg.Broker.FileEventSourceConfig.Directory = filepath.Join(cwd, eventsDir)