diff --git a/telemetry/client.go b/telemetry/client.go index 96d750516..1aa9f6ba5 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -31,6 +31,7 @@ const ( ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" PeerCountMetric TelemetryType = "PeerCount" + PeerConnFailuresMetric TelemetryType = "PeerConnFailure" MaxRetryCache = 5000 ) @@ -58,7 +59,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi } func (c *Client) PushPeerCount(peerCount int) { - c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) + if peerCount != c.lastPeerCount { + c.lastPeerCount = peerCount + c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) + } +} + +func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) { + for peerID, failures := range peerConnFailures { + if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { + if failures == lastFailures { + continue + } + } + c.lastPeerConnFailures[peerID] = failures + c.processAndPushTelemetry(PeerConnFailure{FailedPeerId: peerID, FailureCount: failures}) + } } type ReceivedMessages struct { @@ -71,21 +87,28 @@ type PeerCount struct { PeerCount int } +type PeerConnFailure struct { + FailedPeerId string + FailureCount int +} + type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string - peerId string - version string - telemetryCh chan TelemetryRequest - telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - telemetryRetryCache []TelemetryRequest - nextIdLock sync.Mutex - nextId int - sendPeriod time.Duration + serverURL string + httpClient *http.Client + logger *zap.Logger + keyUID string + nodeName string + peerId string + version string + telemetryCh chan TelemetryRequest + telemetryCacheLock sync.Mutex + telemetryCache []TelemetryRequest + telemetryRetryCache []TelemetryRequest + nextIdLock sync.Mutex + nextId int + sendPeriod time.Duration + lastPeerCount int + lastPeerConnFailures map[string]int } type TelemetryClientOption func(*Client) @@ -105,19 +128,21 @@ func WithPeerID(peerId string) TelemetryClientOption { func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { serverURL = strings.TrimRight(serverURL, "/") client := &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, - version: version, - telemetryCh: make(chan TelemetryRequest), - telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - telemetryRetryCache: make([]TelemetryRequest, 0), - nextId: 0, - nextIdLock: sync.Mutex{}, - sendPeriod: 10 * time.Second, // default value + serverURL: serverURL, + httpClient: &http.Client{Timeout: time.Minute}, + logger: logger, + keyUID: keyUID, + nodeName: nodeName, + version: version, + telemetryCh: make(chan TelemetryRequest), + telemetryCacheLock: sync.Mutex{}, + telemetryCache: make([]TelemetryRequest, 0), + telemetryRetryCache: make([]TelemetryRequest, 0), + nextId: 0, + nextIdLock: sync.Mutex{}, + sendPeriod: 10 * time.Second, // default value + lastPeerCount: 0, + lastPeerConnFailures: make(map[string]int), } for _, opt := range opts { @@ -207,6 +232,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) { TelemetryType: PeerCountMetric, TelemetryData: c.ProcessPeerCount(v), } + case PeerConnFailure: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: PeerConnFailuresMetric, + TelemetryData: c.ProcessPeerConnFailure(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -340,6 +371,21 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { return &jsonRawMessage } +func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { + postBody := map[string]interface{}{ + "failedPeerId": peerConnFailure.FailedPeerId, + "failureCount": peerConnFailure.FailureCount, + "nodeName": c.nodeName, + "nodeKeyUID": c.keyUID, + "peerId": c.peerId, + "statusVersion": c.version, + "timestamp": time.Now().Unix(), + } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 05fcce883..0e32d8ee0 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -109,8 +109,6 @@ func (w *Waku) broadcast() { } } - fn = w.limiter.ThrottlePublishFn(w.ctx, fn) - // Wraps the publish function with a call to the telemetry client if w.statusTelemetryClient != nil { sendFn := fn @@ -125,6 +123,9 @@ func (w *Waku) broadcast() { } } + // Wraps the publish function with rate limiter + fn = w.limiter.ThrottlePublishFn(w.ctx, fn) + w.wg.Add(1) go w.publishEnvelope(envelope, fn, logger) } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 97ce3506e..8fcff1fe9 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -112,6 +112,7 @@ type ITelemetryClient interface { PushSentEnvelope(sentEnvelope SentEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) PushPeerCount(peerCount int) + PushPeerConnFailures(peerConnFailures map[string]int) } // Waku represents a dark communication interface through the Ethereum @@ -1275,7 +1276,9 @@ func (w *Waku) Start() error { } if w.statusTelemetryClient != nil { + connFailures := FormatPeerConnFailures(w.node) w.statusTelemetryClient.PushPeerCount(w.PeerCount()) + w.statusTelemetryClient.PushPeerConnFailures(connFailures) } w.ConnectionChanged(connection.State{ @@ -1953,6 +1956,18 @@ func (w *Waku) StoreNode() *store.WakuStore { return w.node.Store() } +func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int { + p := make(map[string]int) + for _, peerID := range wakuNode.Host().Network().Peers() { + peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) + connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo) + if connFailures > 0 { + p[peerID.String()] = connFailures + } + } + return p +} + func (w *Waku) LegacyStoreNode() legacy_store.Store { return w.node.LegacyStore() }