Skip to content

Commit

Permalink
🌭 clientgroups: implement availability probe
Browse files Browse the repository at this point in the history
  • Loading branch information
database64128 committed Feb 20, 2025
1 parent f0e74e5 commit f492e01
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 11 deletions.
41 changes: 32 additions & 9 deletions clientgroups/clientgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"

"github.com/database64128/shadowsocks-go/zerocopy"
"go.uber.org/zap"
)

// ClientSelectionPolicy is a client selection policy.
Expand Down Expand Up @@ -51,10 +52,21 @@ type ClientGroupConfig struct {

// UDPClients is the list of UDP clients in the group, represented by their names.
UDPClients []string `json:"udpClients"`

// TCPConnectivityProbe is the configuration for TCP connectivity probes.
TCPConnectivityProbe TCPConnectivityProbeConfig `json:"tcpConnectivityProbe"`

// UDPConnectivityProbe is the configuration for UDP connectivity probes.
UDPConnectivityProbe UDPConnectivityProbeConfig `json:"udpConnectivityProbe"`
}

// AddClientGroup creates a client group from the configuration and adds it to the client maps.
func (c *ClientGroupConfig) AddClientGroup(tcpClientByName map[string]zerocopy.TCPClient, udpClientByName map[string]zerocopy.UDPClient) error {
func (c *ClientGroupConfig) AddClientGroup(
logger *zap.Logger,
tcpClientByName map[string]zerocopy.TCPClient,
udpClientByName map[string]zerocopy.UDPClient,
addProbeService func(*ProbeService),
) error {
if len(c.TCPClients) == 0 && len(c.UDPClients) == 0 {
return errors.New("empty client group")
}
Expand All @@ -75,6 +87,10 @@ func (c *ClientGroupConfig) AddClientGroup(tcpClientByName map[string]zerocopy.T
group = newRoundRobinTCPClientGroup(clients)
case PolicyRandom:
group = newRandomTCPClientGroup(clients)
case PolicyAvailability:
var service *ProbeService
group, service = c.TCPConnectivityProbe.newAvailabilityClientGroup(c.Name, logger, clients)
addProbeService(service)
default:
return fmt.Errorf("unknown TCP client selection policy: %q", c.TCPPolicy)
}
Expand All @@ -99,6 +115,10 @@ func (c *ClientGroupConfig) AddClientGroup(tcpClientByName map[string]zerocopy.T
group = newRoundRobinUDPClientGroup(clients, info)
case PolicyRandom:
group = newRandomUDPClientGroup(clients, info)
case PolicyAvailability:
var service *ProbeService
group, service = c.UDPConnectivityProbe.newAvailabilityClientGroup(c.Name, logger, clients, info)
addProbeService(service)
default:
return fmt.Errorf("unknown UDP client selection policy: %q", c.UDPPolicy)
}
Expand All @@ -121,6 +141,11 @@ func newTCPClient(client zerocopy.TCPClient) tcpClient {
}
}

// NewDialer implements [zerocopy.TCPClient.NewDialer].
func (c tcpClient) NewDialer() (zerocopy.TCPDialer, zerocopy.TCPClientInfo) {
return c.dialer, c.info
}

// roundRobinClientSelector is a client selector that selects clients in a round-robin fashion.
type roundRobinClientSelector[C any] struct {
clients []C
Expand All @@ -137,9 +162,9 @@ func newRoundRobinClientSelector[C any](clients []C) *roundRobinClientSelector[C
}

// Select selects a client in a round-robin fashion.
func (g *roundRobinClientSelector[C]) Select() C {
func (s *roundRobinClientSelector[C]) Select() C {
const uintptrToNonNegativeInt = ^uintptr(0) >> 1
return g.clients[int(g.index.Add(1)&uintptrToNonNegativeInt)%len(g.clients)]
return s.clients[int(s.index.Add(1)&uintptrToNonNegativeInt)%len(s.clients)]
}

// roundRobinTCPClientGroup is a TCP client group that selects clients in a round-robin fashion.
Expand All @@ -158,8 +183,7 @@ func newRoundRobinTCPClientGroup(clients []tcpClient) *roundRobinTCPClientGroup

// NewDialer implements [zerocopy.TCPClient.NewDialer].
func (g *roundRobinTCPClientGroup) NewDialer() (zerocopy.TCPDialer, zerocopy.TCPClientInfo) {
client := g.selector.Select()
return client.dialer, client.info
return g.selector.Select().NewDialer()
}

// roundRobinUDPClientGroup is a UDP client group that selects clients in a round-robin fashion.
Expand Down Expand Up @@ -201,8 +225,8 @@ func newRandomClientSelector[C any](clients []C) *randomClientSelector[C] {
}

// Select selects a client randomly.
func (g *randomClientSelector[C]) Select() C {
return g.clients[rand.IntN(len(g.clients))]
func (s *randomClientSelector[C]) Select() C {
return s.clients[rand.IntN(len(s.clients))]
}

// randomTCPClientGroup is a TCP client group that selects clients randomly.
Expand All @@ -221,8 +245,7 @@ func newRandomTCPClientGroup(clients []tcpClient) *randomTCPClientGroup {

// NewDialer implements [zerocopy.TCPClient.NewDialer].
func (g *randomTCPClientGroup) NewDialer() (zerocopy.TCPDialer, zerocopy.TCPClientInfo) {
client := g.selector.Select()
return client.dialer, client.info
return g.selector.Select().NewDialer()
}

// randomUDPClientGroup is a UDP client group that selects clients randomly.
Expand Down
Loading

0 comments on commit f492e01

Please sign in to comment.