feat(telemetry)_: send connection failure metric

This commit is contained in:
Arseniy Klempner 2024-07-12 20:33:08 -07:00
parent 21101c9444
commit 570ef17c77
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
2 changed files with 83 additions and 27 deletions

View File

@ -30,6 +30,7 @@ const (
ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount" PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailures"
MaxRetryCache = 5000 MaxRetryCache = 5000
) )
@ -57,7 +58,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi
} }
func (c *Client) PushPeerCount(peerCount int) { func (c *Client) PushPeerCount(peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) if peerCount != c.lastPeerCount {
c.lastPeerCount = peerCount
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
}
}
func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
for peerID, failures := range peerConnFailures {
if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
if failures == lastFailures {
continue
}
}
c.lastPeerConnFailures[peerID] = failures
c.processAndPushTelemetry(PeerConnFailure{PeerID: peerID})
}
} }
type ReceivedMessages struct { type ReceivedMessages struct {
@ -70,20 +86,26 @@ type PeerCount struct {
PeerCount int PeerCount int
} }
type PeerConnFailure struct {
PeerID string
}
type Client struct { type Client struct {
serverURL string serverURL string
httpClient *http.Client httpClient *http.Client
logger *zap.Logger logger *zap.Logger
keyUID string keyUID string
nodeName string nodeName string
version string version string
telemetryCh chan TelemetryRequest telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex nextIdLock sync.Mutex
nextId int nextId int
sendPeriod time.Duration sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
} }
type TelemetryClientOption func(*Client) type TelemetryClientOption func(*Client)
@ -96,19 +118,21 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
client := &Client{ client := &Client{
serverURL: serverURL, serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute}, httpClient: &http.Client{Timeout: time.Minute},
logger: logger, logger: logger,
keyUID: keyUID, keyUID: keyUID,
nodeName: nodeName, nodeName: nodeName,
version: version, version: version,
telemetryCh: make(chan TelemetryRequest), telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{}, telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0), telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0), telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0, nextId: 0,
nextIdLock: sync.Mutex{}, nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value sendPeriod: 10 * time.Second, // default value
lastPeerCount: 0,
lastPeerConnFailures: make(map[string]int),
} }
for _, opt := range opts { for _, opt := range opts {
@ -198,6 +222,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: PeerCountMetric, TelemetryType: PeerCountMetric,
TelemetryData: c.ProcessPeerCount(v), TelemetryData: c.ProcessPeerCount(v),
} }
case PeerConnFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
@ -326,6 +356,19 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"peerID": peerConnFailure.PeerID,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL) url := fmt.Sprintf("%s/update-envelope", c.serverURL)

View File

@ -107,6 +107,7 @@ type ITelemetryClient interface {
PushSentEnvelope(sentEnvelope SentEnvelope) PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int) PushPeerCount(peerCount int)
PushPeerConnFailures(peerConnFailures map[string]int)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -1384,7 +1385,9 @@ func (w *Waku) Start() error {
} }
if w.statusTelemetryClient != nil { if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.PeerCount()) w.statusTelemetryClient.PushPeerCount(w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(connFailures)
} }
//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
@ -2037,6 +2040,16 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer {
return p return p
} }
func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
p := make(map[string]int)
for _, peerID := range wakuNode.Host().Network().Peers() {
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo)
p[peerID.String()] = connFailures
}
return p
}
func (w *Waku) StoreNode() legacy_store.Store { func (w *Waku) StoreNode() legacy_store.Store {
return w.node.LegacyStore() return w.node.LegacyStore()
} }