Skip to content

Commit

Permalink
feat(telemetry)_: track dial errors
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Sep 24, 2024
1 parent d794e43 commit ce80682
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 0 deletions.
32 changes: 32 additions & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"

"github.com/status-im/status-go/wakuv2/common"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"

Expand All @@ -37,6 +38,7 @@ const (
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
DialFailureMetric TelemetryType = "DialFailure"
MaxRetryCache = 5000
)

Expand Down Expand Up @@ -103,6 +105,14 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
}
}

func (c *Client) PushDialFailure(ctx context.Context, dialFailure common.DialError) {
var errorMessage string = ""
if dialFailure.ErrType == common.ErrorUnknown {
errorMessage = dialFailure.ErrMsg
}
c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -136,6 +146,12 @@ type PeerCountByOrigin struct {
Count uint
}

type DialFailure struct {
ErrorType common.DialErrorType
ErrorMsg string
Protocols string
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -312,6 +328,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v),
}
case DialFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: DialFailureMetric,
TelemetryData: c.ProcessDialFailure(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -483,6 +505,16 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *
return &jsonRawMessage
}

func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage {
postBody := c.commonPostBody()
postBody["errorType"] = dialFailure.ErrorType
postBody["errorMsg"] = dialFailure.ErrorMsg
postBody["protocols"] = dialFailure.Protocols
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}

func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
Expand Down
104 changes: 104 additions & 0 deletions wakuv2/common/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"errors"
"fmt"
mrand "math/rand"
"regexp"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/multiformats/go-multiaddr"
)

// IsPubKeyEqual checks that two public keys are equal
Expand Down Expand Up @@ -110,3 +113,104 @@ func ValidateDataIntegrity(k []byte, expectedSize int) bool {
}
return true
}

func ParseDialErrors(errMsg string) []DialError {
// Regular expression to match the array of failed dial attempts
re := regexp.MustCompile(`all dials failed\n((?:\s*\*\s*\[.*\].*\n?)+)`)

match := re.FindStringSubmatch(errMsg)
if len(match) < 2 {
return nil
}

// Split the matched string into individual dial attempts
dialAttempts := strings.Split(strings.TrimSpace(match[1]), "\n")

// Regular expression to extract multiaddr and error message
reAttempt := regexp.MustCompile(`\[(.*?)\]\s*(.*)`)

var dialErrors []DialError
for _, attempt := range dialAttempts {
attempt = strings.TrimSpace(strings.Trim(attempt, "* "))
matches := reAttempt.FindStringSubmatch(attempt)
if len(matches) == 3 {
errMsg := strings.TrimSpace(matches[2])
ma, err := multiaddr.NewMultiaddr(matches[1])
if err != nil {
continue
}
protocols := ma.Protocols()
protocolsStr := "/"
for i, protocol := range protocols {
protocolsStr += fmt.Sprintf("%s", protocol.Name)
if i < len(protocols)-1 {
protocolsStr += "/"
}
}
dialErrors = append(dialErrors, DialError{
Protocols: protocolsStr,
MultiAddr: matches[1],
ErrMsg: errMsg,
ErrType: CategorizeDialError(errMsg),
})
}
}

return dialErrors
}

// DialErrorType represents the type of dial error
type DialErrorType int

const (
ErrorUnknown DialErrorType = iota
ErrorIOTimeout
ErrorConnectionRefused
ErrorRelayCircuitFailed
ErrorRelayNoReservation
ErrorSecurityNegotiationFailed
ErrorConcurrentDialSucceeded
ErrorConcurrentDialFailed
)

func (det DialErrorType) String() string {
return [...]string{
"Unknown",
"I/O Timeout",
"Connection Refused",
"Relay Circuit Failed",
"Relay No Reservation",
"Security Negotiation Failed",
"Concurrent Dial Succeeded",
"Concurrent Dial Failed",
}[det]
}

func CategorizeDialError(errMsg string) DialErrorType {
switch {
case strings.Contains(errMsg, "i/o timeout"):
return ErrorIOTimeout
case strings.Contains(errMsg, "connect: connection refused"):
return ErrorConnectionRefused
case strings.Contains(errMsg, "error opening relay circuit: CONNECTION_FAILED"):
return ErrorRelayCircuitFailed
case strings.Contains(errMsg, "error opening relay circuit: NO_RESERVATION"):
return ErrorRelayNoReservation
case strings.Contains(errMsg, "failed to negotiate security protocol"):
return ErrorSecurityNegotiationFailed
case strings.Contains(errMsg, "concurrent active dial succeeded"):
return ErrorConcurrentDialSucceeded
case strings.Contains(errMsg, "concurrent active dial through the same relay failed"):
return ErrorConcurrentDialFailed
default:
return ErrorUnknown
}
}

// DialError represents a single dial error with its multiaddr and error message
type DialError struct {
MultiAddr string
ErrMsg string
ErrType DialErrorType
Protocols string
}
13 changes: 13 additions & 0 deletions wakuv2/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type ITelemetryClient interface {
PushMessageCheckFailure(ctx context.Context, messageHash string)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
PushDialFailure(ctx context.Context, dialFailure common.DialError)
}

// Waku represents a dark communication interface through the Ethereum
Expand Down Expand Up @@ -1105,12 +1106,24 @@ func (w *Waku) Start() error {
peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
defer peerTelemetryTicker.Stop()

sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
if err != nil {
w.logger.Error("failed to subscribe to dial errors", zap.Error(err))
return
}
defer sub.Close()

for {
select {
case <-w.ctx.Done():
return
case <-peerTelemetryTicker.C:
w.reportPeerMetrics()
case dialErr := <-sub.Out():
errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error())
for _, dialError := range errors {
w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols})
}
}
}
}()
Expand Down

0 comments on commit ce80682

Please sign in to comment.