feat(telemetry)_: track dial errors

This commit is contained in:
Arseniy Klempner 2024-09-19 16:58:18 -07:00
parent d541df24cb
commit 1c0b830ebd
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
3 changed files with 149 additions and 0 deletions

View File

@ -17,6 +17,7 @@ import (
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
"github.com/status-im/status-go/wakuv2/common"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
@ -37,6 +38,7 @@ const (
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard" PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
DialFailureMetric TelemetryType = "DialFailure"
MaxRetryCache = 5000 MaxRetryCache = 5000
) )
@ -103,6 +105,14 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
} }
} }
func (c *Client) PushDialFailure(ctx context.Context, dialFailure common.DialError) {
var errorMessage string = ""
if dialFailure.ErrType == common.ErrorUnknown {
errorMessage = dialFailure.ErrMsg
}
c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols})
}
type ReceivedMessages struct { type ReceivedMessages struct {
Filter transport.Filter Filter transport.Filter
SSHMessage *types.Message SSHMessage *types.Message
@ -136,6 +146,12 @@ type PeerCountByOrigin struct {
Count uint Count uint
} }
type DialFailure struct {
ErrorType common.DialErrorType
ErrorMsg string
Protocols string
}
type Client struct { type Client struct {
serverURL string serverURL string
httpClient *http.Client httpClient *http.Client
@ -308,6 +324,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerCountByOriginMetric, TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v), TelemetryData: c.ProcessPeerCountByOrigin(v),
} }
case DialFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: DialFailureMetric,
TelemetryData: c.ProcessDialFailure(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
@ -467,6 +489,16 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage {
postBody := c.commonPostBody()
postBody["errorType"] = dialFailure.ErrorType
postBody["errorMsg"] = dialFailure.ErrorMsg
postBody["protocols"] = dialFailure.Protocols
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
defer common.LogOnPanic() defer common.LogOnPanic()
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))

View File

@ -6,8 +6,11 @@ import (
"errors" "errors"
"fmt" "fmt"
mrand "math/rand" mrand "math/rand"
"regexp"
"strings"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/multiformats/go-multiaddr"
) )
// IsPubKeyEqual checks that two public keys are equal // IsPubKeyEqual checks that two public keys are equal
@ -110,3 +113,104 @@ func ValidateDataIntegrity(k []byte, expectedSize int) bool {
} }
return true return true
} }
func ParseDialErrors(errMsg string) []DialError {
// Regular expression to match the array of failed dial attempts
re := regexp.MustCompile(`all dials failed\n((?:\s*\*\s*\[.*\].*\n?)+)`)
match := re.FindStringSubmatch(errMsg)
if len(match) < 2 {
return nil
}
// Split the matched string into individual dial attempts
dialAttempts := strings.Split(strings.TrimSpace(match[1]), "\n")
// Regular expression to extract multiaddr and error message
reAttempt := regexp.MustCompile(`\[(.*?)\]\s*(.*)`)
var dialErrors []DialError
for _, attempt := range dialAttempts {
attempt = strings.TrimSpace(strings.Trim(attempt, "* "))
matches := reAttempt.FindStringSubmatch(attempt)
if len(matches) == 3 {
errMsg := strings.TrimSpace(matches[2])
ma, err := multiaddr.NewMultiaddr(matches[1])
if err != nil {
continue
}
protocols := ma.Protocols()
protocolsStr := "/"
for i, protocol := range protocols {
protocolsStr += fmt.Sprintf("%s", protocol.Name)
if i < len(protocols)-1 {
protocolsStr += "/"
}
}
dialErrors = append(dialErrors, DialError{
Protocols: protocolsStr,
MultiAddr: matches[1],
ErrMsg: errMsg,
ErrType: CategorizeDialError(errMsg),
})
}
}
return dialErrors
}
// DialErrorType represents the type of dial error
type DialErrorType int
const (
ErrorUnknown DialErrorType = iota
ErrorIOTimeout
ErrorConnectionRefused
ErrorRelayCircuitFailed
ErrorRelayNoReservation
ErrorSecurityNegotiationFailed
ErrorConcurrentDialSucceeded
ErrorConcurrentDialFailed
)
func (det DialErrorType) String() string {
return [...]string{
"Unknown",
"I/O Timeout",
"Connection Refused",
"Relay Circuit Failed",
"Relay No Reservation",
"Security Negotiation Failed",
"Concurrent Dial Succeeded",
"Concurrent Dial Failed",
}[det]
}
func CategorizeDialError(errMsg string) DialErrorType {
switch {
case strings.Contains(errMsg, "i/o timeout"):
return ErrorIOTimeout
case strings.Contains(errMsg, "connect: connection refused"):
return ErrorConnectionRefused
case strings.Contains(errMsg, "error opening relay circuit: CONNECTION_FAILED"):
return ErrorRelayCircuitFailed
case strings.Contains(errMsg, "error opening relay circuit: NO_RESERVATION"):
return ErrorRelayNoReservation
case strings.Contains(errMsg, "failed to negotiate security protocol"):
return ErrorSecurityNegotiationFailed
case strings.Contains(errMsg, "concurrent active dial succeeded"):
return ErrorConcurrentDialSucceeded
case strings.Contains(errMsg, "concurrent active dial through the same relay failed"):
return ErrorConcurrentDialFailed
default:
return ErrorUnknown
}
}
// DialError represents a single dial error with its multiaddr and error message
type DialError struct {
MultiAddr string
ErrMsg string
ErrType DialErrorType
Protocols string
}

View File

@ -113,6 +113,7 @@ type ITelemetryClient interface {
PushMessageCheckFailure(ctx context.Context, messageHash string) PushMessageCheckFailure(ctx context.Context, messageHash string)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
PushDialFailure(ctx context.Context, dialFailure common.DialError)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -1113,12 +1114,24 @@ func (w *Waku) Start() error {
peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval) peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
defer peerTelemetryTicker.Stop() defer peerTelemetryTicker.Stop()
sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
if err != nil {
w.logger.Error("failed to subscribe to dial errors", zap.Error(err))
return
}
defer sub.Close()
for { for {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
return return
case <-peerTelemetryTicker.C: case <-peerTelemetryTicker.C:
w.reportPeerMetrics() w.reportPeerMetrics()
case dialErr := <-sub.Out():
errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error())
for _, dialError := range errors {
w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols})
}
} }
} }
}() }()