diff --git a/telemetry/client.go b/telemetry/client.go index 587e290fb..fcd7f087c 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -30,6 +30,7 @@ const ( ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" PeerCountMetric TelemetryType = "PeerCount" + PeerConnFailuresMetric TelemetryType = "PeerConnFailures" MaxRetryCache = 5000 ) @@ -57,7 +58,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi } func (c *Client) PushPeerCount(peerCount int) { - c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) + if peerCount != c.lastPeerCount { + c.lastPeerCount = peerCount + c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) + } +} + +func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) { + for peerID, failures := range peerConnFailures { + if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { + if failures == lastFailures { + continue + } + } + c.lastPeerConnFailures[peerID] = failures + c.processAndPushTelemetry(PeerConnFailure{PeerID: peerID}) + } } type ReceivedMessages struct { @@ -70,20 +86,26 @@ type PeerCount struct { PeerCount int } +type PeerConnFailure struct { + PeerID string +} + type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string - version string - telemetryCh chan TelemetryRequest - telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - telemetryRetryCache []TelemetryRequest - nextIdLock sync.Mutex - nextId int - sendPeriod time.Duration + serverURL string + httpClient *http.Client + logger *zap.Logger + keyUID string + nodeName string + version string + telemetryCh chan TelemetryRequest + telemetryCacheLock sync.Mutex + telemetryCache []TelemetryRequest + telemetryRetryCache []TelemetryRequest + nextIdLock sync.Mutex + nextId int + sendPeriod time.Duration + lastPeerCount int + lastPeerConnFailures map[string]int } type TelemetryClientOption func(*Client) @@ -96,19 +118,21 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption { func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { client := &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, - version: version, - telemetryCh: make(chan TelemetryRequest), - telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - telemetryRetryCache: make([]TelemetryRequest, 0), - nextId: 0, - nextIdLock: sync.Mutex{}, - sendPeriod: 10 * time.Second, // default value + serverURL: serverURL, + httpClient: &http.Client{Timeout: time.Minute}, + logger: logger, + keyUID: keyUID, + nodeName: nodeName, + version: version, + telemetryCh: make(chan TelemetryRequest), + telemetryCacheLock: sync.Mutex{}, + telemetryCache: make([]TelemetryRequest, 0), + telemetryRetryCache: make([]TelemetryRequest, 0), + nextId: 0, + nextIdLock: sync.Mutex{}, + sendPeriod: 10 * time.Second, // default value + lastPeerCount: 0, + lastPeerConnFailures: make(map[string]int), } for _, opt := range opts { @@ -198,6 +222,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) { TelemetryType: PeerCountMetric, TelemetryData: c.ProcessPeerCount(v), } + case PeerConnFailure: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: PeerConnFailuresMetric, + TelemetryData: c.ProcessPeerConnFailure(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -326,6 +356,19 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { return &jsonRawMessage } +func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { + postBody := map[string]interface{}{ + "peerID": peerConnFailure.PeerID, + "nodeName": c.nodeName, + "nodeKeyUID": c.keyUID, + "statusVersion": c.version, + "timestamp": time.Now().Unix(), + } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 807ec0f50..9e5544134 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -107,6 +107,7 @@ type ITelemetryClient interface { PushSentEnvelope(sentEnvelope SentEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) PushPeerCount(peerCount int) + PushPeerConnFailures(peerConnFailures map[string]int) } // Waku represents a dark communication interface through the Ethereum @@ -1384,7 +1385,9 @@ func (w *Waku) Start() error { } if w.statusTelemetryClient != nil { + connFailures := FormatPeerConnFailures(w.node) w.statusTelemetryClient.PushPeerCount(w.PeerCount()) + w.statusTelemetryClient.PushPeerConnFailures(connFailures) } //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. @@ -2037,6 +2040,16 @@ func FormatPeerStats(wakuNode *node.WakuNode) map[string]types.WakuV2Peer { return p } +func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { + p := make(map[string]int) + for _, peerID := range wakuNode.Host().Network().Peers() { + peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) + connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo) + p[peerID.String()] = connFailures + } + return p +} + func (w *Waku) StoreNode() legacy_store.Store { return w.node.LegacyStore() }