Skip to content

Commit b3379e2

Browse files
authored
[v17] Periodically check connectivity between peer proxies (#48921)
* Add ping RPC to grpc proxy peering * Periodically check connectivity between peer proxies * Replace deprecated jitter functions
1 parent 8dbee6d commit b3379e2

File tree

8 files changed

+549
-48
lines changed

8 files changed

+549
-48
lines changed

api/client/proto/proxyservice.pb.go

Lines changed: 330 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/proto/teleport/legacy/client/proto/proxyservice.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ option go_package = "github.com/gravitational/teleport/api/client/proto";
2424
service ProxyService {
2525
// DialNode opens a bidrectional stream to the requested node.
2626
rpc DialNode(stream Frame) returns (stream Frame);
27+
28+
// Ping checks if the peer is reachable and responsive.
29+
rpc Ping(ProxyServicePingRequest) returns (ProxyServicePingResponse);
2730
}
2831

2932
// Frame wraps different message types to be sent over a stream.
@@ -63,3 +66,7 @@ message Data {
6366

6467
// ConnectionEstablished signals to the client a connection to the node has been established.
6568
message ConnectionEstablished {}
69+
70+
message ProxyServicePingRequest {}
71+
72+
message ProxyServicePingResponse {}

lib/proxy/peer/client.go

Lines changed: 73 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,15 +169,19 @@ type grpcClientConn struct {
169169
cc *grpc.ClientConn
170170
metrics *clientMetrics
171171

172-
id string
173-
addr string
172+
id string
173+
addr string
174+
host string
175+
group string
174176

175177
// if closing is set, count is not allowed to increase from zero; upon
176178
// reaching zero, cond should be broadcast
177179
mu sync.Mutex
178180
cond sync.Cond
179181
closing bool
180182
count int
183+
184+
pingCancel context.CancelFunc
181185
}
182186

183187
var _ internal.ClientConn = (*grpcClientConn)(nil)
@@ -211,7 +215,7 @@ func (c *grpcClientConn) maybeAcquire() (release func()) {
211215

212216
// Shutdown implements [internal.ClientConn].
213217
func (c *grpcClientConn) Shutdown(ctx context.Context) {
214-
defer c.cc.Close()
218+
defer c.Close()
215219

216220
c.mu.Lock()
217221
defer c.mu.Unlock()
@@ -232,9 +236,25 @@ func (c *grpcClientConn) Shutdown(ctx context.Context) {
232236

233237
// Close implements [internal.ClientConn].
234238
func (c *grpcClientConn) Close() error {
239+
c.pingCancel()
235240
return c.cc.Close()
236241
}
237242

243+
// Ping implements [internal.ClientConn].
244+
func (c *grpcClientConn) Ping(ctx context.Context) error {
245+
release := c.maybeAcquire()
246+
if release == nil {
247+
return trace.ConnectionProblem(nil, "error starting stream: connection is shutting down")
248+
}
249+
defer release()
250+
251+
_, err := clientapi.NewProxyServiceClient(c.cc).Ping(ctx, new(clientapi.ProxyServicePingRequest))
252+
if trace.IsNotImplemented(err) {
253+
err = nil
254+
}
255+
return trace.Wrap(err)
256+
}
257+
238258
// Dial implements [internal.ClientConn].
239259
func (c *grpcClientConn) Dial(
240260
nodeID string,
@@ -461,7 +481,14 @@ func (c *Client) updateConnections(proxies []types.Server) error {
461481

462482
// establish new connections
463483
supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel)
464-
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes")
484+
proxyGroup, _ := proxy.GetLabel(types.ProxyGroupIDLabel)
485+
conn, err := c.connect(connectParams{
486+
peerID: id,
487+
peerAddr: proxy.GetPeerAddr(),
488+
peerHost: proxy.GetHostname(),
489+
peerGroup: proxyGroup,
490+
supportsQUIC: supportsQUIC == "yes",
491+
})
465492
if err != nil {
466493
c.metrics.reportTunnelError(errorProxyPeerTunnelDial)
467494
c.config.Log.DebugContext(c.ctx, "error dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
@@ -662,7 +689,14 @@ func (c *Client) getConnections(proxyIDs []string) ([]internal.ClientConn, bool,
662689
}
663690

664691
supportsQUIC, _ := proxy.GetLabel(types.UnstableProxyPeerQUICLabel)
665-
conn, err := c.connect(id, proxy.GetPeerAddr(), supportsQUIC == "yes")
692+
proxyGroup, _ := proxy.GetLabel(types.ProxyGroupIDLabel)
693+
conn, err := c.connect(connectParams{
694+
peerID: id,
695+
peerAddr: proxy.GetPeerAddr(),
696+
peerHost: proxy.GetHostname(),
697+
peerGroup: proxyGroup,
698+
supportsQUIC: supportsQUIC == "yes",
699+
})
666700
if err != nil {
667701
c.metrics.reportTunnelError(errorProxyPeerTunnelDirectDial)
668702
c.config.Log.DebugContext(c.ctx, "error direct dialing peer proxy", "peer_id", id, "peer_addr", proxy.GetPeerAddr())
@@ -689,9 +723,17 @@ func (c *Client) getConnections(proxyIDs []string) ([]internal.ClientConn, bool,
689723
return conns, false, nil
690724
}
691725

692-
// connect dials a new connection to proxyAddr.
693-
func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (internal.ClientConn, error) {
694-
if supportsQUIC && c.config.QUICTransport != nil {
726+
type connectParams struct {
727+
peerID string
728+
peerAddr string
729+
peerHost string
730+
peerGroup string
731+
supportsQUIC bool
732+
}
733+
734+
// connect dials a new connection to a peer proxy with the given ID and address.
735+
func (c *Client) connect(params connectParams) (internal.ClientConn, error) {
736+
if params.supportsQUIC && c.config.QUICTransport != nil {
695737
panic("QUIC proxy peering is not implemented")
696738
}
697739
tlsConfig := utils.TLSConfig(c.config.TLSCipherSuites)
@@ -706,11 +748,11 @@ func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (int
706748
tlsConfig.InsecureSkipVerify = true
707749
tlsConfig.VerifyConnection = utils.VerifyConnectionWithRoots(c.config.GetTLSRoots)
708750

709-
expectedPeer := authclient.HostFQDN(peerID, c.config.ClusterName)
751+
expectedPeer := authclient.HostFQDN(params.peerID, c.config.ClusterName)
710752

711753
conn, err := grpc.Dial(
712-
peerAddr,
713-
grpc.WithTransportCredentials(newClientCredentials(expectedPeer, peerAddr, c.config.Log, credentials.NewTLS(tlsConfig))),
754+
params.peerAddr,
755+
grpc.WithTransportCredentials(newClientCredentials(expectedPeer, params.peerAddr, c.config.Log, credentials.NewTLS(tlsConfig))),
714756
grpc.WithStatsHandler(newStatsHandler(c.reporter)),
715757
grpc.WithChainStreamInterceptor(metadata.StreamClientInterceptor, interceptors.GRPCClientStreamErrorInterceptor),
716758
grpc.WithKeepaliveParams(keepalive.ClientParameters{
@@ -721,14 +763,29 @@ func (c *Client) connect(peerID string, peerAddr string, supportsQUIC bool) (int
721763
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
722764
)
723765
if err != nil {
724-
return nil, trace.Wrap(err, "Error dialing proxy %q", peerID)
766+
return nil, trace.Wrap(err, "Error dialing proxy %q", params.peerID)
725767
}
726768

727-
return &grpcClientConn{
769+
pingCtx, pingCancel := context.WithCancel(context.Background())
770+
cc := &grpcClientConn{
728771
cc: conn,
729772
metrics: c.metrics,
730773

731-
id: peerID,
732-
addr: peerAddr,
733-
}, nil
774+
id: params.peerID,
775+
addr: params.peerAddr,
776+
host: params.peerHost,
777+
group: params.peerGroup,
778+
779+
pingCancel: pingCancel,
780+
}
781+
782+
pings, pingFailures := internal.ClientPingsMetrics(internal.ClientPingsMetricsParams{
783+
LocalID: c.config.ID,
784+
PeerID: params.peerID,
785+
PeerHost: params.peerHost,
786+
PeerGroup: params.peerGroup,
787+
})
788+
go internal.RunClientPing(pingCtx, cc, pings, pingFailures)
789+
790+
return cc, nil
734791
}

lib/proxy/peer/client_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,13 @@ func TestCAChange(t *testing.T) {
145145

146146
// dial server and send a test data frame
147147
const supportsQUICFalse = false
148-
conn, err := client.connect("s1", ts.GetPeerAddr(), supportsQUICFalse)
148+
conn, err := client.connect(connectParams{
149+
peerID: "s1",
150+
peerAddr: ts.GetPeerAddr(),
151+
peerHost: "s1",
152+
peerGroup: "",
153+
supportsQUIC: supportsQUICFalse,
154+
})
149155
require.NoError(t, err)
150156
require.NotNil(t, conn)
151157
require.IsType(t, (*grpcClientConn)(nil), conn)
@@ -163,7 +169,13 @@ func TestCAChange(t *testing.T) {
163169

164170
// new connection should fail because client tls config still references old
165171
// RootCAs.
166-
conn, err = client.connect("s1", ts.GetPeerAddr(), supportsQUICFalse)
172+
conn, err = client.connect(connectParams{
173+
peerID: "s1",
174+
peerAddr: ts.GetPeerAddr(),
175+
peerHost: "s1",
176+
peerGroup: "",
177+
supportsQUIC: supportsQUICFalse,
178+
})
167179
require.NoError(t, err)
168180
require.NotNil(t, conn)
169181
require.IsType(t, (*grpcClientConn)(nil), conn)
@@ -175,7 +187,13 @@ func TestCAChange(t *testing.T) {
175187
// RootCAs.
176188
currentServerCA.Store(newServerCA)
177189

178-
conn, err = client.connect("s1", ts.GetPeerAddr(), supportsQUICFalse)
190+
conn, err = client.connect(connectParams{
191+
peerID: "s1",
192+
peerAddr: ts.GetPeerAddr(),
193+
peerHost: "s1",
194+
peerGroup: "",
195+
supportsQUIC: supportsQUICFalse,
196+
})
179197
require.NoError(t, err)
180198
require.NotNil(t, conn)
181199
require.IsType(t, (*grpcClientConn)(nil), conn)

lib/proxy/peer/helpers_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type mockProxyAccessPoint struct {
5757
}
5858

5959
type mockProxyService struct {
60+
clientapi.UnimplementedProxyServiceServer
6061
mockDialNode func(stream clientapi.ProxyService_DialNodeServer) error
6162
}
6263

lib/proxy/peer/internal/clientconn.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type ClientConn interface {
4040
tunnelType types.TunnelType,
4141
) (net.Conn, error)
4242

43+
// Ping checks if the peer is reachable and responsive.
44+
Ping(context.Context) error
45+
4346
// Close closes all connections and releases any background resources
4447
// immediately.
4548
Close() error

lib/proxy/peer/internal/metrics.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Teleport
2+
// Copyright (C) 2024 Gravitational, Inc.
3+
//
4+
// This program is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// This program is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"sync"
22+
"time"
23+
24+
"github.com/prometheus/client_golang/prometheus"
25+
"github.com/prometheus/client_golang/prometheus/promauto"
26+
27+
"github.com/gravitational/teleport"
28+
"github.com/gravitational/teleport/api/utils/retryutils"
29+
"github.com/gravitational/teleport/lib/utils/interval"
30+
)
31+
32+
var (
33+
clientPingInitOnce sync.Once
34+
35+
clientPingsTotal *prometheus.CounterVec
36+
clientFailedPingsTotal *prometheus.CounterVec
37+
)
38+
39+
func clientPingInit() {
40+
clientPingsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
41+
Namespace: teleport.MetricNamespace,
42+
Subsystem: "proxy_peer_client",
43+
Name: "pings_total",
44+
Help: "Total number of proxy peering client pings per peer proxy, both successful and failed.",
45+
}, []string{"local_id", "peer_id", "peer_host", "peer_group"})
46+
47+
clientFailedPingsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
48+
Namespace: teleport.MetricNamespace,
49+
Subsystem: "proxy_peer_client",
50+
Name: "failed_pings_total",
51+
Help: "Total number of failed proxy peering client pings per peer proxy.",
52+
}, []string{"local_id", "peer_id", "peer_host", "peer_group"})
53+
}
54+
55+
// ClientPingsMetricsParams contains the parameters for [ClientPingsMetrics].
56+
type ClientPingsMetricsParams struct {
57+
// LocalID is the host ID of the current proxy.
58+
LocalID string
59+
// PeerID is the host ID of the peer proxy.
60+
PeerID string
61+
// PeerHost is the hostname of the peer proxy.
62+
PeerHost string
63+
// PeerGroup is the peer group ID of the peer proxy. Can be blank.
64+
PeerGroup string
65+
}
66+
67+
// ClientPingsMetrics returns the Prometheus metrics for a given peer proxy,
68+
// given host ID, hostname and (optionally) peer group.
69+
func ClientPingsMetrics(params ClientPingsMetricsParams) (pings, failedPings prometheus.Counter) {
70+
clientPingInitOnce.Do(clientPingInit)
71+
72+
pings = clientPingsTotal.WithLabelValues(params.LocalID, params.PeerID, params.PeerHost, params.PeerGroup)
73+
failedPings = clientFailedPingsTotal.WithLabelValues(params.LocalID, params.PeerID, params.PeerHost, params.PeerGroup)
74+
75+
return pings, failedPings
76+
}
77+
78+
// RunClientPing periodically pings the peer proxy reachable through the given
79+
// [ClientConn], accumulating counts in the given Prometheus metrics. Returns
80+
// when the context is canceled.
81+
func RunClientPing(ctx context.Context, cc ClientConn, pings, failedPings prometheus.Counter) {
82+
const pingInterval = time.Minute
83+
ivl := interval.New(interval.Config{
84+
Duration: pingInterval * 14 / 13,
85+
FirstDuration: retryutils.HalfJitter(pingInterval),
86+
Jitter: retryutils.SeventhJitter,
87+
})
88+
defer ivl.Stop()
89+
90+
for ctx.Err() == nil {
91+
select {
92+
case <-ivl.Next():
93+
func() {
94+
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
95+
defer cancel()
96+
97+
err := cc.Ping(timeoutCtx)
98+
if err != nil {
99+
if ctx.Err() != nil {
100+
return
101+
}
102+
failedPings.Inc()
103+
}
104+
pings.Inc()
105+
}()
106+
case <-ctx.Done():
107+
}
108+
}
109+
}

lib/proxy/peer/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package peer
2020

2121
import (
22+
"context"
2223
"log/slog"
2324
"strings"
2425

@@ -106,6 +107,10 @@ func (s *proxyService) DialNode(stream proto.ProxyService_DialNodeServer) error
106107
return trace.Wrap(err)
107108
}
108109

110+
func (s *proxyService) Ping(ctx context.Context, _ *proto.ProxyServicePingRequest) (*proto.ProxyServicePingResponse, error) {
111+
return new(proto.ProxyServicePingResponse), nil
112+
}
113+
109114
// splitServerID splits a server id in to a node id and cluster name.
110115
func splitServerID(address string) (string, string, error) {
111116
split := strings.Split(address, ".")

0 commit comments

Comments
 (0)