From 5b36cf8267d3e65d2dc6d247e7d88ecb37ad1568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 24 Jul 2024 14:28:38 -0400 Subject: [PATCH] feat_: add lightpush and filter bandwidth usage to telemetry (#5547) --- wakuv2/telemetry.go | 44 +++++++++++++++++++++++++++----------------- wakuv2/waku.go | 4 +--- wakuv2/waku_test.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/wakuv2/telemetry.go b/wakuv2/telemetry.go index 88eaaea52..581225d52 100644 --- a/wakuv2/telemetry.go +++ b/wakuv2/telemetry.go @@ -9,7 +9,13 @@ import ( "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap" + + "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) type BandwidthTelemetryClient struct { @@ -28,25 +34,29 @@ func NewBandwidthTelemetryClient(logger *zap.Logger, serverURL string) *Bandwidt } } -func (c *BandwidthTelemetryClient) PushProtocolStats(relayStats metrics.Stats, storeStats metrics.Stats) { - url := fmt.Sprintf("%s/protocol-stats", c.serverURL) - postBody := map[string]interface{}{ - "hostID": c.hostID, - "relay": map[string]interface{}{ - "rateIn": relayStats.RateIn, - "rateOut": relayStats.RateOut, - "totalIn": relayStats.TotalIn, - "totalOut": relayStats.TotalOut, - }, - "store": map[string]interface{}{ - "rateIn": storeStats.RateIn, - "rateOut": storeStats.RateOut, - "totalIn": storeStats.TotalIn, - "totalOut": storeStats.TotalOut, - }, +func getStatsPerProtocol(protocolID protocol.ID, stats map[protocol.ID]metrics.Stats) map[string]interface{} { + return map[string]interface{}{ + "rateIn": stats[protocolID].RateIn, + "rateOut": stats[protocolID].RateOut, + "totalIn": stats[protocolID].TotalIn, + "totalOut": stats[protocolID].TotalOut, } +} - body, _ := json.Marshal(postBody) +func (c *BandwidthTelemetryClient) getTelemetryRequestBody(stats map[protocol.ID]metrics.Stats) map[string]interface{} { + return map[string]interface{}{ + "hostID": c.hostID, + "relay": getStatsPerProtocol(relay.WakuRelayID_v200, stats), + "store": getStatsPerProtocol(legacy_store.StoreID_v20beta4, stats), + "filter-push": getStatsPerProtocol(filter.FilterPushID_v20beta1, stats), + "filter-subscribe": getStatsPerProtocol(filter.FilterSubscribeID_v20beta1, stats), + "lightpush": getStatsPerProtocol(lightpush.LightPushID_v20beta1, stats), + } +} + +func (c *BandwidthTelemetryClient) PushProtocolStats(stats map[protocol.ID]metrics.Stats) { + url := fmt.Sprintf("%s/protocol-stats", c.serverURL) + body, _ := json.Marshal(c.getTelemetryRequestBody(stats)) _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) if err != nil { c.logger.Error("Error sending message to telemetry server", zap.Error(err)) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index efc0cc9ae..149379805 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -529,9 +529,7 @@ func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { w.bandwidthCounter.Reset() } - storeStats := w.bandwidthCounter.GetBandwidthForProtocol(legacy_store.StoreID_v20beta4) - relayStats := w.bandwidthCounter.GetBandwidthForProtocol(relay.WakuRelayID_v200) - go telemetry.PushProtocolStats(relayStats, storeStats) + go telemetry.PushProtocolStats(w.bandwidthCounter.GetBandwidthByProtocol()) } } } diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 4e6651458..6a03eaaf1 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -3,6 +3,7 @@ package wakuv2 import ( "context" "crypto/rand" + "encoding/json" "errors" "math/big" "os" @@ -13,6 +14,8 @@ import ( "go.uber.org/zap" "github.com/cenkalti/backoff/v3" + "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/protocol" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -28,7 +31,9 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/connection" @@ -785,3 +790,28 @@ func TestLightpushRateLimit(t *testing.T) { require.Len(t, messages, 2) } + +func TestTelemetryFormat(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + tc := NewBandwidthTelemetryClient(logger, "#") + + s := metrics.Stats{ + TotalIn: 10, + TotalOut: 20, + RateIn: 30, + RateOut: 40, + } + + m := make(map[protocol.ID]metrics.Stats) + m[relay.WakuRelayID_v200] = s + m[filter.FilterPushID_v20beta1] = s + m[filter.FilterSubscribeID_v20beta1] = s + m[legacy_store.StoreID_v20beta4] = s + m[lightpush.LightPushID_v20beta1] = s + + requestBody := tc.getTelemetryRequestBody(m) + _, err = json.Marshal(requestBody) + require.NoError(t, err) +}