Skip to content

Commit

Permalink
Cluster peer discovery improvements and fixes (#1387)
Browse files Browse the repository at this point in the history
* Refactor cluster peer discovery

* Fix inconsistent behaviour

* Add A record discovery and tests

* Changelog and docs

* PR feedback, ty
  • Loading branch information
thampiotr authored Aug 9, 2024
1 parent 41a8310 commit 0499453
Show file tree
Hide file tree
Showing 9 changed files with 529 additions and 203 deletions.
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,30 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Enhancements

- Clustering peer resolution through the `--cluster.join-addresses` flag has been
improved with more consistent behaviour, better error handling and added
support for A/AAAA DNS records. If necessary, users can opt out of
this new behaviour with the `--cluster.use-discovery-v1` flag, but this can only be
used as a temporary measure, since this flag will be disabled in future
releases. (@thampiotr)

### Bugfixes

- Fixed an issue which caused loss of context data in Faro exception. (@codecapitano)

- Fixed an issue where providing multiple hostnames or IP addresses
via `--cluster.join-addresses` would only use the first provided value.
(@thampiotr)

- Fixed an issue where providing `<hostname>:<port>`
in `--cluster.join-addresses` would only resolve with DNS to a single address,
instead of using all the available records. (@thampiotr)

- Fixed an issue where clustering peer resolution via hostname in `--cluster.join-addresses`
would resolve to duplicate IP addresses when using SRV records. (@thampiotr)

v1.3.0
-----------------

Expand Down
6 changes: 3 additions & 3 deletions docs/sources/reference/cli/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ If `--cluster.advertise-interfaces` isn't explicitly set, {{< param "PRODUCT_NAM
{{< param "PRODUCT_NAME" >}} will fail to start if it can't determine the advertised address.
Since Windows doesn't use the interface names `eth0` or `en0`, Windows users must explicitly pass at least one valid network interface for `--cluster.advertise-interfaces` or a value for `--cluster.advertise-address`.

The comma-separated list of addresses provided in `--cluster.join-addresses` can either be IP addresses with an optional port, or DNS SRV records to lookup.
The ports on the list of addresses default to the port used for the HTTP listener if not explicitly provided.
We recommend that you align the port numbers on as many nodes as possible to simplify the deployment process.
The comma-separated list of addresses provided in `--cluster.join-addresses` can either be IP addresses or DNS names to look up (supports SRV and A/AAAA records).
In both cases, the port number can be specified with a `:<port>` suffix. If a port is not provided, the port used for the HTTP listener is used by default.
If you do not provide the port number explicitly, you must ensure that all instances use the same port for the HTTP listener.

The `--cluster.discover-peers` command-line flag expects a list of tuples in the form of `provider=XXX key=val key=val ...`.
Clustering uses the [go-discover] package to discover peers and fetch their IP addresses, based on the chosen provider and the filtering key-values it supports.
Expand Down
15 changes: 0 additions & 15 deletions internal/service/cluster/discovery/common.go

This file was deleted.

41 changes: 0 additions & 41 deletions internal/service/cluster/discovery/dynamic.go

This file was deleted.

72 changes: 72 additions & 0 deletions internal/service/cluster/discovery/go_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package discovery

import (
"context"
"fmt"
stdlog "log"
"net"
"strconv"

"github.com/go-kit/log"
"github.com/hashicorp/go-discover"
"github.com/hashicorp/go-discover/provider/k8s"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// newWithGoDiscovery builds a DiscoverFn backed by the github.com/hashicorp/go-discover library. The returned function
// queries the configured provider (opt.DiscoverPeers) on every call and returns the peer addresses it reports, each
// one carrying a port.
//
// An error is returned if the go-discover client cannot be constructed.
func newWithGoDiscovery(opt Options) (DiscoverFn, error) {
	// Start from the providers go-discover ships with and add the ones that are not enabled by default.
	providers := make(map[string]discover.Provider, len(discover.Providers)+1)
	for name, provider := range discover.Providers {
		providers[name] = provider
	}
	providers["k8s"] = &k8s.Provider{}

	// Fall back to discover.New unless a factory was supplied through the options.
	newDiscoverer := opt.goDiscoverFactory
	if newDiscoverer == nil {
		newDiscoverer = discover.New
	}

	discoverer, err := newDiscoverer(discover.WithProviders(providers))
	if err != nil {
		return nil, fmt.Errorf("bootstrapping peer discovery: %w", err)
	}

	return func() ([]string, error) {
		tracer := opt.Tracer.Tracer("")
		_, span := tracer.Start(
			context.Background(),
			"DiscoverClusterPeers",
			trace.WithSpanKind(trace.SpanKindInternal),
		)
		defer span.End()

		// go-discover expects a standard library logger, so adapt ours for it.
		adaptedLogger := stdlog.New(log.NewStdlibAdapter(opt.Logger), "", 0)
		peers, err := discoverer.Addrs(opt.DiscoverPeers, adaptedLogger)
		if err != nil {
			span.SetStatus(codes.Error, err.Error())
			return nil, fmt.Errorf("discovering peers: %w", err)
		}

		// Peers reported without a port get opt.DefaultPort.
		for i, peer := range peers {
			peers[i] = appendPortIfAbsent(peer, strconv.Itoa(opt.DefaultPort))
		}

		span.SetAttributes(attribute.Int("discovered_addresses_count", len(peers)))
		span.SetStatus(codes.Ok, "discovered peers")
		return peers, nil
	}, nil
}

// appendPortIfAbsent returns addr unchanged when it already carries a non-empty port, and otherwise returns addr
// joined with the given port.
//
// Bare IPv6 literals ("::1") are wrapped in brackets by net.JoinHostPort. Bracketed IPv6 literals without a port
// ("[::1]") have their brackets removed first, so the result is "[::1]:port" rather than "[[::1]]:port". An address
// with a trailing colon but no port ("host:") is treated as having no port.
func appendPortIfAbsent(addr string, port string) string {
	host, existingPort, err := net.SplitHostPort(addr)
	if err == nil && existingPort != "" {
		// A usable port is already present.
		return addr
	}
	if err != nil {
		// SplitHostPort could not split the value, so all of it is the host.
		host = addr
		// net.JoinHostPort adds brackets to any host containing a colon, so strip existing ones to avoid doubling.
		if n := len(host); n > 2 && host[0] == '[' && host[n-1] == ']' {
			host = host[1 : n-1]
		}
	}
	return net.JoinHostPort(host, port)
}
164 changes: 164 additions & 0 deletions internal/service/cluster/discovery/join_peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package discovery

import (
"context"
"errors"
"fmt"
"net"
"strconv"

"github.com/go-kit/log"
"github.com/samber/lo"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

// newWithJoinPeers returns a DiscoverFn that turns the statically configured opts.JoinPeers into a de-duplicated list
// of addresses usable for clustering. Resolution happens on every call of the returned function. See
// docs/sources/reference/cli/run.md and the tests for more information.
func newWithJoinPeers(opts Options) DiscoverFn {
	return func() ([]string, error) {
		tracer := opts.Tracer.Tracer("")
		ctx, span := tracer.Start(
			context.Background(),
			"ResolveClusterJoinAddresses",
			trace.WithSpanKind(trace.SpanKindInternal),
			trace.WithAttributes(attribute.Int("join_peers_count", len(opts.JoinPeers))),
		)
		defer span.End()

		// Resolvers are tried in this order for each join address: literal IP, then A/AAAA records, then SRV records.
		resolved, err := buildJoinAddresses(opts, []addressResolver{
			ipResolver(opts.Logger),
			dnsAResolver(opts, ctx),
			dnsSRVResolver(opts, ctx),
		})
		if err != nil {
			span.SetStatus(codes.Error, err.Error())
			return nil, fmt.Errorf("static peer discovery: %w", err)
		}

		// Different join addresses may resolve to the same peer, so drop duplicates.
		unique := lo.Uniq(resolved)
		span.SetAttributes(attribute.Int("resolved_addresses_count", len(unique)))
		span.SetStatus(codes.Ok, "resolved addresses")
		return unique, nil
	}
}

// buildJoinAddresses turns every entry of opts.JoinPeers into one or more "host:port" addresses.
//
// Each entry may carry an explicit ":<port>" suffix; when it does not (or the port is empty), opts.DefaultPort is
// used. The host part is handed to the resolvers in the given order, and the first resolver that yields at least one
// address wins for that entry. Entries that no resolver can handle are logged at warn level and skipped.
//
// An error is returned only when no address at all could be produced. It wraps every resolver error collected along
// the way, if there were any.
func buildJoinAddresses(opts Options, resolvers []addressResolver) ([]string, error) {
	var (
		result      []string
		deferredErr error
	)

	for _, addr := range opts.JoinPeers {
		// See if we have a port override, if not use the default port.
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			// SplitHostPort could not split the value, so all of it is the host.
			host = addr
			// A bracketed IPv6 literal without a port (e.g. "[::1]") must lose its brackets, otherwise neither
			// net.ParseIP nor a DNS lookup can make sense of it.
			if n := len(host); n > 2 && host[0] == '[' && host[n-1] == ']' {
				host = host[1 : n-1]
			}
		}
		// The port is empty both when splitting failed and when the value ended in a bare colon ("host:").
		if port == "" {
			port = strconv.Itoa(opts.DefaultPort)
		}

		atLeastOneSuccess := false
		for _, resolver := range resolvers {
			resolved, resolveErr := resolver(host)
			deferredErr = errors.Join(deferredErr, resolveErr)
			for _, foundAddr := range resolved {
				result = append(result, net.JoinHostPort(foundAddr, port))
			}
			// we stop once we find a resolver that succeeded for given address
			if len(resolved) > 0 {
				atLeastOneSuccess = true
				break
			}
		}

		if !atLeastOneSuccess {
			// It is still useful to know if the user provided an address that we could not resolve, even
			// if other addresses resolve successfully and we don't return an error. To keep things simple, we're
			// not including more detail as it's available through debug level.
			level.Warn(opts.Logger).Log("msg", "failed to resolve provided join address", "addr", addr)
		}
	}

	if len(result) == 0 {
		// Wrapping a nil error with %w would render as "%!w(<nil>)", so only wrap when there is something to report.
		if deferredErr == nil {
			return nil, errors.New("failed to find any valid join addresses")
		}
		return nil, fmt.Errorf("failed to find any valid join addresses: %w", deferredErr)
	}
	return result, nil
}

// addressResolver attempts to turn the host portion of a single join address into a list of addresses. Callers treat
// an empty result as "this resolver could not handle the host" and move on to the next resolver.
type addressResolver func(addr string) ([]string, error)

// ipResolver returns an addressResolver that accepts only literal IP addresses. A value that parses as an IP is
// returned as a single-element list in the textual form produced by net.IP.String; anything else yields an error.
func ipResolver(log log.Logger) addressResolver {
	return func(addr string) ([]string, error) {
		parsed := net.ParseIP(addr)
		if parsed != nil {
			level.Debug(log).Log("msg", "found an IP cluster join address", "addr", addr)
			return []string{parsed.String()}, nil
		}
		// Not an IP literal.
		return nil, fmt.Errorf("could not parse as an IP or IP:port address: %q", addr)
	}
}

// dnsAResolver returns an addressResolver that resolves a host name to IP addresses. It uses opts.lookupIPFn when
// set and net.LookupIP otherwise; by default, this will look up A/AAAA records. Tracing and logging are handled by
// dnsResolver.
func dnsAResolver(opts Options, ctx context.Context) addressResolver {
	lookup := opts.lookupIPFn
	if lookup == nil {
		lookup = net.LookupIP
	}

	// Converts the looked-up IPs to their string form, passing the lookup error through as-is.
	resolveIPs := func(host string) ([]string, error) {
		ips, err := lookup(host)
		asStrings := make([]string, len(ips))
		for i, ip := range ips {
			asStrings[i] = ip.String()
		}
		return asStrings, err
	}

	return dnsResolver(opts, ctx, "A/AAAA", resolveIPs)
}

// dnsSRVResolver returns an addressResolver that looks up SRV records for a name and returns the target of every
// record found. It uses opts.lookupSRVFn when set and net.LookupSRV otherwise. Tracing and logging are handled by
// dnsResolver.
func dnsSRVResolver(opts Options, ctx context.Context) addressResolver {
	lookup := opts.lookupSRVFn
	if lookup == nil {
		lookup = net.LookupSRV
	}

	// Collects the targets of the returned records, passing the lookup error through as-is.
	resolveTargets := func(name string) ([]string, error) {
		// Service and proto are left empty; with net.LookupSRV that means the name is queried directly.
		_, records, err := lookup("", "", name)
		targets := make([]string, len(records))
		for i, record := range records {
			targets[i] = record.Target
		}
		return targets, err
	}

	return dnsResolver(opts, ctx, "SRV", resolveTargets)
}

// dnsResolver wraps dnsLookupFn into an addressResolver that adds tracing and debug logging around every lookup.
// recordType is only used to label spans, log lines and error messages.
//
// When dnsLookupFn returns an error, any partial result is discarded and a wrapped error is returned instead.
func dnsResolver(opts Options, ctx context.Context, recordType string, dnsLookupFn func(string) ([]string, error)) addressResolver {
	return func(addr string) ([]string, error) {
		tracer := opts.Tracer.Tracer("")
		_, span := tracer.Start(
			ctx,
			"ClusterPeersDNSLookup",
			trace.WithSpanKind(trace.SpanKindInternal),
			trace.WithAttributes(
				attribute.String("addr", addr),
				attribute.String("record_type", recordType),
			),
		)
		defer span.End()

		records, err := dnsLookupFn(addr)
		if err == nil {
			level.Debug(opts.Logger).Log("msg", "received DNS query response", "addr", addr, "record_type", recordType, "records_count", len(records))
			span.SetAttributes(attribute.Int("resolved_addresses_count", len(records)))
			span.SetStatus(codes.Ok, "resolved addresses")
			return records, nil
		}

		// The lookup failed: record it on the span and in the debug log, then hand a wrapped error to the caller.
		level.Debug(opts.Logger).Log("msg", "failed to resolve DNS records", "addr", addr, "record_type", recordType, "err", err)
		span.SetStatus(codes.Error, err.Error())
		return nil, fmt.Errorf("failed to resolve %q records: %w", recordType, err)
	}
}
Loading

0 comments on commit 0499453

Please sign in to comment.