gRPC: reject request if clock skew is too large (#7686)
Have our gRPC server interceptor check for excessive clock skew between
its own clock and gRPC client clocks. Do this by taking advantage of the
client request timestamp that most clients already supply for the
purpose of measuring cross-service latency. If the included timestamp is
more than 10 minutes from the gRPC server's local time, immediately
error out.
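
In short, the server interceptor parses the client-supplied Unix-nanosecond timestamp, measures how far it is from its own clock, and refuses the request when the gap exceeds ten minutes. Below is a minimal standalone sketch of that check, condensed from the diff further down; the checkSkew helper name and packaging are illustrative, not the actual Boulder code.

package main

import (
	"fmt"
	"strconv"
	"time"
)

// checkSkew is an illustrative version of the check the interceptor performs.
// clientReqTime is the gRPC metadata value the client sent: its local time at
// send, encoded as Unix nanoseconds in a decimal string.
func checkSkew(now time.Time, clientReqTime string) error {
	reqTimeUnixNanos, err := strconv.ParseInt(clientReqTime, 10, 64)
	if err != nil {
		return fmt.Errorf("parsing client request time: %w", err)
	}
	reqTime := time.Unix(0, reqTimeUnixNanos)
	elapsed := now.Sub(reqTime)
	// More than ten minutes in either direction is treated as clock skew
	// rather than network latency, and the request is rejected.
	if elapsed > 10*time.Minute || elapsed < -10*time.Minute {
		return fmt.Errorf("client time %s vs server time %s", reqTime, now)
	}
	return nil
}

func main() {
	// A client whose clock is an hour ahead of the server gets rejected.
	skewed := strconv.FormatInt(time.Now().Add(time.Hour).UnixNano(), 10)
	fmt.Println(checkSkew(time.Now(), skewed))
}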

To keep the integration tests -- which heavily rely on clock
manipulation -- working, use build tags to disable this behavior during
integration testing.
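
Concretely, a default build compiles grpc/skew.go (the real ten-minute check), while a build with the integration tag compiles grpc/skew_integration.go, whose tooSkewed always returns false. A rough illustration of how the two variants are selected; the exact flags Boulder's test harness uses are an assumption here, not part of this commit:

# Default build: grpc/skew.go is compiled, so heavily skewed requests are rejected.
go build ./...

# With the integration tag (assumed invocation), grpc/skew_integration.go is
# compiled instead, so the fake-clock integration tests keep working.
go test -tags integration ./...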

Fixes #7684
aarongable committed Aug 29, 2024
1 parent da7865c commit e5731a4
Showing 5 changed files with 115 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cmd/clock_integration.go
@@ -24,7 +24,7 @@ func Clock() clock.Clock {

cl := clock.NewFake()
cl.Set(targetTime)
blog.Get().Infof("Time was set to %v via FAKECLOCK", targetTime)
blog.Get().Debugf("Time was set to %v via FAKECLOCK", targetTime)
return cl
}

46 changes: 33 additions & 13 deletions grpc/interceptors.go
@@ -78,11 +78,12 @@ func (smi *serverMetadataInterceptor) Unary(
return nil, berrors.InternalServerError("passed nil *grpc.UnaryServerInfo")
}

// Extract the grpc metadata from the context. If the context has
// a `clientRequestTimeKey` field, and it has a value, then observe the RPC
// latency with Prometheus.
if md, ok := metadata.FromIncomingContext(ctx); ok && len(md[clientRequestTimeKey]) > 0 {
err := smi.observeLatency(md[clientRequestTimeKey][0])
// Extract the grpc metadata from the context, and handle the client request
// timestamp embedded in it. It's okay if the timestamp is missing, since some
// clients (like nomad's health-checker) don't set it.
md, ok := metadata.FromIncomingContext(ctx)
if ok && len(md[clientRequestTimeKey]) > 0 {
err := smi.checkLatency(md[clientRequestTimeKey][0])
if err != nil {
return nil, err
}
@@ -96,6 +97,9 @@ func (smi *serverMetadataInterceptor) Unary(
// opposed to "RA.NewCertificate timed out" (causing a 500).
// Once we've shaved the deadline, we ensure we have at least another
// 100ms left to do work; otherwise we abort early.
// Note that these computations use the global clock (time.Now) instead of
// the local clock (smi.clk.Now) because context.WithTimeout also uses the
// global clock.
deadline, ok := ctx.Deadline()
// Should never happen: there was no deadline.
if !ok {
@@ -137,11 +141,12 @@ func (smi *serverMetadataInterceptor) Stream(
handler grpc.StreamHandler) error {
ctx := ss.Context()

// Extract the grpc metadata from the context. If the context has
// a `clientRequestTimeKey` field, and it has a value, then observe the RPC
// latency with Prometheus.
if md, ok := metadata.FromIncomingContext(ctx); ok && len(md[clientRequestTimeKey]) > 0 {
err := smi.observeLatency(md[clientRequestTimeKey][0])
// Extract the grpc metadata from the context, and handle the client request
// timestamp embedded in it. It's okay if the timestamp is missing, since some
// clients (like nomad's health-checker) don't set it.
md, ok := metadata.FromIncomingContext(ctx)
if ok && len(md[clientRequestTimeKey]) > 0 {
err := smi.checkLatency(md[clientRequestTimeKey][0])
if err != nil {
return err
}
@@ -155,6 +160,9 @@ func (smi *serverMetadataInterceptor) Stream(
// opposed to "RA.NewCertificate timed out" (causing a 500).
// Once we've shaved the deadline, we ensure we have at least another
// 100ms left to do work; otherwise we abort early.
// Note that these computations use the global clock (time.Now) instead of
// the local clock (smi.clk.Now) because context.WithTimeout also uses the
// global clock.
deadline, ok := ctx.Deadline()
// Should never happen: there was no deadline.
if !ok {
@@ -190,12 +198,13 @@ func splitMethodName(fullMethodName string) (string, string) {
return "unknown", "unknown"
}

// observeLatency is called with the `clientRequestTimeKey` value from
// checkLatency is called with the `clientRequestTimeKey` value from
// a request's gRPC metadata. This string value is converted to a timestamp and
// used to calculate the latency between send and receive time. The latency is
// published to the server interceptor's rpcLag prometheus histogram. An error
// is returned if the `clientReqTime` string is not a valid timestamp.
func (smi *serverMetadataInterceptor) observeLatency(clientReqTime string) error {
// is returned if the `clientReqTime` string is not a valid timestamp, or if
// the latency is so large that it indicates dangerous levels of clock skew.
func (smi *serverMetadataInterceptor) checkLatency(clientReqTime string) error {
// Convert the metadata request time into an int64
reqTimeUnixNanos, err := strconv.ParseInt(clientReqTime, 10, 64)
if err != nil {
@@ -205,6 +214,17 @@ func (smi *serverMetadataInterceptor) observeLatency(clientReqTime string) error
// Calculate the elapsed time since the client sent the RPC
reqTime := time.Unix(0, reqTimeUnixNanos)
elapsed := smi.clk.Since(reqTime)

// If the elapsed time is very large, that indicates it is probably due to
// clock skew rather than simple latency. Refuse to handle the request, since
// accurate timekeeping is critical to CA operations and large skew indicates
// something has gone very wrong.
if tooSkewed(elapsed) {
return fmt.Errorf(
"gRPC client reported a very different time: %s (client) vs %s (this server)",
reqTime, smi.clk.Now())
}

// Publish an RPC latency observation to the histogram
smi.metrics.rpcLag.Observe(elapsed.Seconds())
return nil
56 changes: 56 additions & 0 deletions grpc/interceptors_test.go
@@ -290,6 +290,62 @@ func TestRequestTimeTagging(t *testing.T) {
test.AssertMetricWithLabelsEquals(t, si.metrics.rpcLag, prometheus.Labels{}, 1)
}

func TestClockSkew(t *testing.T) {
// Create two separate clocks for the client and server
serverClk := clock.NewFake()
serverClk.Set(time.Now())
clientClk := clock.NewFake()
clientClk.Set(time.Now())

// Listen for TCP requests on a random system assigned port number
lis, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
port := lis.Addr().(*net.TCPAddr).Port

// Start a gRPC server listening on that port
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating server metrics")
si := newServerMetadataInterceptor(serverMetrics, serverClk)
s := grpc.NewServer(grpc.UnaryInterceptor(si.Unary))
test_proto.RegisterChillerServer(s, &testServer{})
go func() { _ = s.Serve(lis) }()
defer s.Stop()

// Start a gRPC client talking to the server
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
test.AssertNotError(t, err, "creating client metrics")
ci := &clientMetadataInterceptor{metrics: clientMetrics, clk: clientClk, timeout: time.Second}
conn, err := grpc.NewClient(
net.JoinHostPort("localhost", strconv.Itoa(port)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ci.Unary),
)
test.AssertNotError(t, err, "creating test client")
client := test_proto.NewChillerClient(conn)

// Create a context with plenty of timeout
ctx, cancel := context.WithDeadline(context.Background(), clientClk.Now().Add(10*time.Second))
defer cancel()

// Attempt a gRPC request which should succeed
_, err = client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(100 * time.Millisecond)})
test.AssertNotError(t, err, "should succeed with no skew")

// Skew the client clock forward and the request should fail due to skew
clientClk.Add(time.Hour)
_, err = client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(100 * time.Millisecond)})
test.AssertError(t, err, "should fail with positive client skew")
test.AssertContains(t, err.Error(), "very different time")

// Skew the server clock forward and the request should fail due to skew
serverClk.Add(2 * time.Hour)
_, err = client.Chill(ctx, &test_proto.Time{Duration: durationpb.New(100 * time.Millisecond)})
test.AssertError(t, err, "should fail with negative client skew")
test.AssertContains(t, err.Error(), "very different time")
}

// blockedServer implements a ChillerServer with a Chill method that:
// 1. Calls Done() on the received waitgroup when receiving an RPC
// 2. Blocks the RPC on the roadblock waitgroup
13 changes: 13 additions & 0 deletions grpc/skew.go
@@ -0,0 +1,13 @@
//go:build !integration

package grpc

import "time"

// tooSkewed returns true if the absolute value of the input duration is more
// than ten minutes. We break this out into a separate function so that it can
// be disabled in the integration tests, which make extensive use of fake
// clocks.
func tooSkewed(skew time.Duration) bool {
return skew > 10*time.Minute || skew < -10*time.Minute
}
12 changes: 12 additions & 0 deletions grpc/skew_integration.go
@@ -0,0 +1,12 @@
//go:build integration

package grpc

import "time"

// tooSkewed always returns false, but is only built when the integration build
// flag is set. We use this to replace the real tooSkewed function in the
// integration tests, which make extensive use of fake clocks.
func tooSkewed(_ time.Duration) bool {
return false
}
