From ce806826f5f947281d515eafb52d074b3bf48f25 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 19 Sep 2024 16:58:18 -0700 Subject: [PATCH] feat(telemetry)_: track dial errors --- telemetry/client.go | 32 ++++++++++++ wakuv2/common/helpers.go | 104 +++++++++++++++++++++++++++++++++++++++ wakuv2/waku.go | 13 +++++ 3 files changed, 149 insertions(+) diff --git a/telemetry/client.go b/telemetry/client.go index 43c9b7a5270..00683ecba2c 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -16,6 +16,7 @@ import ( "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/wakuv2" + "github.com/status-im/status-go/wakuv2/common" wps "github.com/waku-org/go-waku/waku/v2/peerstore" v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" @@ -37,6 +38,7 @@ const ( MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" PeerCountByShardMetric TelemetryType = "PeerCountByShard" PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" + DialFailureMetric TelemetryType = "DialFailure" MaxRetryCache = 5000 ) @@ -103,6 +105,14 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma } } +func (c *Client) PushDialFailure(ctx context.Context, dialFailure common.DialError) { + var errorMessage string = "" + if dialFailure.ErrType == common.ErrorUnknown { + errorMessage = dialFailure.ErrMsg + } + c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -136,6 +146,12 @@ type PeerCountByOrigin struct { Count uint } +type DialFailure struct { + ErrorType common.DialErrorType + ErrorMsg string + Protocols string +} + type Client struct { serverURL string httpClient *http.Client @@ -312,6 +328,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: PeerCountByOriginMetric, TelemetryData: c.ProcessPeerCountByOrigin(v), } + case DialFailure: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: DialFailureMetric, + TelemetryData: c.ProcessDialFailure(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -483,6 +505,16 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) * return &jsonRawMessage } +func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage { + postBody := c.commonPostBody() + postBody["errorType"] = dialFailure.ErrorType + postBody["errorMsg"] = dialFailure.ErrorMsg + postBody["protocols"] = dialFailure.Protocols + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/common/helpers.go b/wakuv2/common/helpers.go index 887eeab9edc..2affba10d48 100644 --- a/wakuv2/common/helpers.go +++ b/wakuv2/common/helpers.go @@ -6,8 +6,11 @@ import ( "errors" "fmt" mrand "math/rand" + "regexp" + "strings" "github.com/ethereum/go-ethereum/common" + "github.com/multiformats/go-multiaddr" ) // IsPubKeyEqual checks that two public keys are equal @@ -110,3 +113,104 @@ func ValidateDataIntegrity(k []byte, expectedSize int) bool { } return true } + +func ParseDialErrors(errMsg string) []DialError { + // Regular expression to match the array of failed dial attempts + re := regexp.MustCompile(`all dials failed\n((?:\s*\*\s*\[.*\].*\n?)+)`) + + match := re.FindStringSubmatch(errMsg) + if len(match) < 2 { + return nil + } + + // Split the matched string into individual dial attempts + dialAttempts := strings.Split(strings.TrimSpace(match[1]), "\n") + + // Regular expression to extract multiaddr and error message + reAttempt := regexp.MustCompile(`\[(.*?)\]\s*(.*)`) + + var dialErrors []DialError + for _, attempt := range dialAttempts { + attempt = strings.TrimSpace(strings.Trim(attempt, "* ")) + matches := reAttempt.FindStringSubmatch(attempt) + if len(matches) == 3 { + errMsg := strings.TrimSpace(matches[2]) + ma, err := multiaddr.NewMultiaddr(matches[1]) + if err != nil { + continue + } + protocols := ma.Protocols() + protocolsStr := "/" + for i, protocol := range protocols { + protocolsStr += fmt.Sprintf("%s", protocol.Name) + if i < len(protocols)-1 { + protocolsStr += "/" + } + } + dialErrors = append(dialErrors, DialError{ + Protocols: protocolsStr, + MultiAddr: matches[1], + ErrMsg: errMsg, + ErrType: CategorizeDialError(errMsg), + }) + } + } + + return dialErrors +} + +// DialErrorType represents the type of dial error +type DialErrorType int + +const ( + ErrorUnknown DialErrorType = iota + ErrorIOTimeout + ErrorConnectionRefused + ErrorRelayCircuitFailed + ErrorRelayNoReservation + ErrorSecurityNegotiationFailed + ErrorConcurrentDialSucceeded + ErrorConcurrentDialFailed +) + +func (det DialErrorType) String() string { + return [...]string{ + "Unknown", + "I/O Timeout", + "Connection Refused", + "Relay Circuit Failed", + "Relay No Reservation", + "Security Negotiation Failed", + "Concurrent Dial Succeeded", + "Concurrent Dial Failed", + }[det] +} + +func CategorizeDialError(errMsg string) DialErrorType { + switch { + case strings.Contains(errMsg, "i/o timeout"): + return ErrorIOTimeout + case strings.Contains(errMsg, "connect: connection refused"): + return ErrorConnectionRefused + case strings.Contains(errMsg, "error opening relay circuit: CONNECTION_FAILED"): + return ErrorRelayCircuitFailed + case strings.Contains(errMsg, "error opening relay circuit: NO_RESERVATION"): + return ErrorRelayNoReservation + case strings.Contains(errMsg, "failed to negotiate security protocol"): + return ErrorSecurityNegotiationFailed + case strings.Contains(errMsg, "concurrent active dial succeeded"): + return ErrorConcurrentDialSucceeded + case strings.Contains(errMsg, "concurrent active dial through the same relay failed"): + return ErrorConcurrentDialFailed + default: + return ErrorUnknown + } +} + +// DialError represents a single dial error with its multiaddr and error message +type DialError struct { + MultiAddr string + ErrMsg string + ErrType DialErrorType + Protocols string +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 63a0f083003..24483163c08 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -113,6 +113,7 @@ type ITelemetryClient interface { PushMessageCheckFailure(ctx context.Context, messageHash string) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) + PushDialFailure(ctx context.Context, dialFailure common.DialError) } // Waku represents a dark communication interface through the Ethereum @@ -1105,12 +1106,24 @@ func (w *Waku) Start() error { peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval) defer peerTelemetryTicker.Stop() + sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError)) + if err != nil { + w.logger.Error("failed to subscribe to dial errors", zap.Error(err)) + return + } + defer sub.Close() + for { select { case <-w.ctx.Done(): return case <-peerTelemetryTicker.C: w.reportPeerMetrics() + case dialErr := <-sub.Out(): + errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error()) + for _, dialError := range errors { + w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols}) + } } } }()