@@ -44,6 +44,7 @@ import (
44
44
streamutils "github.com/gravitational/teleport/api/utils/grpc/stream"
45
45
"github.com/gravitational/teleport/lib/auth/authclient"
46
46
"github.com/gravitational/teleport/lib/defaults"
47
+ "github.com/gravitational/teleport/lib/proxy/peer/internal"
47
48
"github.com/gravitational/teleport/lib/services"
48
49
"github.com/gravitational/teleport/lib/utils"
49
50
)
@@ -94,11 +95,11 @@ type ClientConfig struct {
94
95
}
95
96
96
97
// connShuffler shuffles the order of client connections.
97
- type connShuffler func ([]clientConn )
98
+ type connShuffler func ([]internal. ClientConn )
98
99
99
100
// randomConnShuffler returns a conn shuffler that randomizes the order of connections.
100
101
func randomConnShuffler () connShuffler {
101
- return func (conns []clientConn ) {
102
+ return func (conns []internal. ClientConn ) {
102
103
rand .Shuffle (len (conns ), func (i , j int ) {
103
104
conns [i ], conns [j ] = conns [j ], conns [i ]
104
105
})
@@ -107,7 +108,7 @@ func randomConnShuffler() connShuffler {
107
108
108
109
// noopConnShutffler returns a conn shuffler that keeps the original connection ordering.
109
110
func noopConnShuffler () connShuffler {
110
- return func ([]clientConn ) {}
111
+ return func ([]internal. ClientConn ) {}
111
112
}
112
113
113
114
// checkAndSetDefaults checks and sets default values
@@ -163,32 +164,6 @@ func (c *ClientConfig) checkAndSetDefaults() error {
163
164
return nil
164
165
}
165
166
166
- // clientConn manages client connections to a specific peer proxy (with a fixed
167
- // host ID and address).
168
- type clientConn interface {
169
- // peerID returns the host ID of the peer proxy.
170
- peerID () string
171
- // peerAddr returns the address of the peer proxy.
172
- peerAddr () string
173
-
174
- // dial opens a connection of a given tunnel type to a node with the given
175
- // ID through the peer proxy managed by the clientConn.
176
- dial (
177
- nodeID string ,
178
- src net.Addr ,
179
- dst net.Addr ,
180
- tunnelType types.TunnelType ,
181
- ) (net.Conn , error )
182
-
183
- // close closes all connections and releases any background resources
184
- // immediately.
185
- close () error
186
-
187
- // shutdown waits until all connections are closed or the context is done,
188
- // then acts like close.
189
- shutdown (context.Context )
190
- }
191
-
192
167
// grpcClientConn manages client connections to a specific peer proxy over gRPC.
193
168
type grpcClientConn struct {
194
169
cc * grpc.ClientConn
@@ -205,13 +180,13 @@ type grpcClientConn struct {
205
180
count int
206
181
}
207
182
208
- var _ clientConn = (* grpcClientConn )(nil )
183
+ var _ internal. ClientConn = (* grpcClientConn )(nil )
209
184
210
- // peerID implements [clientConn ].
211
- func (c * grpcClientConn ) peerID () string { return c .id }
185
+ // PeerID implements [internal.ClientConn ].
186
+ func (c * grpcClientConn ) PeerID () string { return c .id }
212
187
213
- // peerAddr implements [clientConn ].
214
- func (c * grpcClientConn ) peerAddr () string { return c .addr }
188
+ // PeerAddr implements [internal.ClientConn ].
189
+ func (c * grpcClientConn ) PeerAddr () string { return c .addr }
215
190
216
191
// maybeAcquire returns a non-nil release func if the grpcClientConn is
217
192
// currently allowed to open connections; i.e., if it hasn't fully shut down.
@@ -234,8 +209,8 @@ func (c *grpcClientConn) maybeAcquire() (release func()) {
234
209
})
235
210
}
236
211
237
- // shutdown implements [clientConn ].
238
- func (c * grpcClientConn ) shutdown (ctx context.Context ) {
212
+ // Shutdown implements [internal.ClientConn ].
213
+ func (c * grpcClientConn ) Shutdown (ctx context.Context ) {
239
214
defer c .cc .Close ()
240
215
241
216
c .mu .Lock ()
@@ -255,13 +230,13 @@ func (c *grpcClientConn) shutdown(ctx context.Context) {
255
230
}
256
231
}
257
232
258
- // close implements [clientConn ].
259
- func (c * grpcClientConn ) close () error {
233
+ // Close implements [internal.ClientConn ].
234
+ func (c * grpcClientConn ) Close () error {
260
235
return c .cc .Close ()
261
236
}
262
237
263
- // dial implements [clientConn ].
264
- func (c * grpcClientConn ) dial (
238
+ // Dial implements [internal.ClientConn ].
239
+ func (c * grpcClientConn ) Dial (
265
240
nodeID string ,
266
241
src net.Addr ,
267
242
dst net.Addr ,
@@ -335,7 +310,7 @@ type Client struct {
335
310
cancel context.CancelFunc
336
311
337
312
config ClientConfig
338
- conns map [string ]clientConn
313
+ conns map [string ]internal. ClientConn
339
314
metrics * clientMetrics
340
315
reporter * reporter
341
316
}
@@ -360,7 +335,7 @@ func NewClient(config ClientConfig) (*Client, error) {
360
335
config : config ,
361
336
ctx : closeContext ,
362
337
cancel : cancel ,
363
- conns : make (map [string ]clientConn ),
338
+ conns : make (map [string ]internal. ClientConn ),
364
339
metrics : metrics ,
365
340
reporter : reporter ,
366
341
}
@@ -453,7 +428,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
453
428
}
454
429
455
430
var toDelete []string
456
- toKeep := make (map [string ]clientConn )
431
+ toKeep := make (map [string ]internal. ClientConn )
457
432
for id , conn := range c .conns {
458
433
proxy , ok := toDial [id ]
459
434
@@ -464,7 +439,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
464
439
}
465
440
466
441
// peer address changed
467
- if conn .peerAddr () != proxy .GetPeerAddr () {
442
+ if conn .PeerAddr () != proxy .GetPeerAddr () {
468
443
toDelete = append (toDelete , id )
469
444
continue
470
445
}
@@ -485,8 +460,8 @@ func (c *Client) updateConnections(proxies []types.Server) error {
485
460
}
486
461
487
462
// establish new connections
488
- _ , supportsQuic := proxy .GetLabel (types .ProxyPeerQUICLabel )
489
- conn , err := c .connect (id , proxy .GetPeerAddr (), supportsQuic )
463
+ supportsQUIC , _ := proxy .GetLabel (types .UnstableProxyPeerQUICLabel )
464
+ conn , err := c .connect (id , proxy .GetPeerAddr (), supportsQUIC == "yes" )
490
465
if err != nil {
491
466
c .metrics .reportTunnelError (errorProxyPeerTunnelDial )
492
467
c .config .Log .DebugContext (c .ctx , "error dialing peer proxy" , "peer_id" , id , "peer_addr" , proxy .GetPeerAddr ())
@@ -503,7 +478,7 @@ func (c *Client) updateConnections(proxies []types.Server) error {
503
478
504
479
for _ , id := range toDelete {
505
480
if conn , ok := c .conns [id ]; ok {
506
- go conn .shutdown (c .ctx )
481
+ go conn .Shutdown (c .ctx )
507
482
}
508
483
}
509
484
c .conns = toKeep
@@ -556,9 +531,9 @@ func (c *Client) Shutdown(ctx context.Context) {
556
531
var wg sync.WaitGroup
557
532
for _ , conn := range c .conns {
558
533
wg .Add (1 )
559
- go func (conn clientConn ) {
534
+ go func (conn internal. ClientConn ) {
560
535
defer wg .Done ()
561
- conn .shutdown (ctx )
536
+ conn .Shutdown (ctx )
562
537
}(conn )
563
538
}
564
539
wg .Wait ()
@@ -572,7 +547,7 @@ func (c *Client) Stop() error {
572
547
573
548
var errs []error
574
549
for _ , conn := range c .conns {
575
- if err := conn .close (); err != nil {
550
+ if err := conn .Close (); err != nil {
576
551
errs = append (errs , err )
577
552
}
578
553
}
@@ -627,7 +602,7 @@ func (c *Client) dial(
627
602
628
603
var errs []error
629
604
for _ , clientConn := range conns {
630
- conn , err := clientConn .dial (nodeID , src , dst , tunnelType )
605
+ conn , err := clientConn .Dial (nodeID , src , dst , tunnelType )
631
606
if err != nil {
632
607
errs = append (errs , trace .Wrap (err ))
633
608
continue
@@ -643,13 +618,13 @@ func (c *Client) dial(
643
618
// otherwise.
644
619
// The boolean returned in the second argument is intended for testing purposes,
645
620
// to indicates whether the connection was cached or newly established.
646
- func (c * Client ) getConnections (proxyIDs []string ) ([]clientConn , bool , error ) {
621
+ func (c * Client ) getConnections (proxyIDs []string ) ([]internal. ClientConn , bool , error ) {
647
622
if len (proxyIDs ) == 0 {
648
623
return nil , false , trace .BadParameter ("failed to dial: no proxy ids given" )
649
624
}
650
625
651
626
ids := make (map [string ]struct {})
652
- var conns []clientConn
627
+ var conns []internal. ClientConn
653
628
654
629
// look for existing matching connections.
655
630
c .RLock ()
@@ -686,8 +661,8 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
686
661
continue
687
662
}
688
663
689
- _ , supportsQuic := proxy .GetLabel (types .ProxyPeerQUICLabel )
690
- conn , err := c .connect (id , proxy .GetPeerAddr (), supportsQuic )
664
+ supportsQUIC , _ := proxy .GetLabel (types .UnstableProxyPeerQUICLabel )
665
+ conn , err := c .connect (id , proxy .GetPeerAddr (), supportsQUIC == "yes" )
691
666
if err != nil {
692
667
c .metrics .reportTunnelError (errorProxyPeerTunnelDirectDial )
693
668
c .config .Log .DebugContext (c .ctx , "error direct dialing peer proxy" , "peer_id" , id , "peer_addr" , proxy .GetPeerAddr ())
@@ -707,15 +682,15 @@ func (c *Client) getConnections(proxyIDs []string) ([]clientConn, bool, error) {
707
682
defer c .Unlock ()
708
683
709
684
for _ , conn := range conns {
710
- c .conns [conn .peerID ()] = conn
685
+ c .conns [conn .PeerID ()] = conn
711
686
}
712
687
713
688
c .config .connShuffler (conns )
714
689
return conns , false , nil
715
690
}
716
691
717
692
// connect dials a new connection to proxyAddr.
718
- func (c * Client ) connect (peerID string , peerAddr string , supportsQUIC bool ) (clientConn , error ) {
693
+ func (c * Client ) connect (peerID string , peerAddr string , supportsQUIC bool ) (internal. ClientConn , error ) {
719
694
if supportsQUIC && c .config .QUICTransport != nil {
720
695
panic ("QUIC proxy peering is not implemented" )
721
696
}
0 commit comments