feat_: add lightpush and filter bandwidth usage to telemetry (#5547)

This commit is contained in:
richΛrd 2024-07-24 14:28:38 -04:00 committed by GitHub
parent 7c4b43b4d9
commit 5b36cf8267
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 58 additions and 20 deletions

View File

@ -9,7 +9,13 @@ import (
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type BandwidthTelemetryClient struct {
@ -28,25 +34,29 @@ func NewBandwidthTelemetryClient(logger *zap.Logger, serverURL string) *Bandwidt
}
}
func (c *BandwidthTelemetryClient) PushProtocolStats(relayStats metrics.Stats, storeStats metrics.Stats) {
url := fmt.Sprintf("%s/protocol-stats", c.serverURL)
postBody := map[string]interface{}{
"hostID": c.hostID,
"relay": map[string]interface{}{
"rateIn": relayStats.RateIn,
"rateOut": relayStats.RateOut,
"totalIn": relayStats.TotalIn,
"totalOut": relayStats.TotalOut,
},
"store": map[string]interface{}{
"rateIn": storeStats.RateIn,
"rateOut": storeStats.RateOut,
"totalIn": storeStats.TotalIn,
"totalOut": storeStats.TotalOut,
},
func getStatsPerProtocol(protocolID protocol.ID, stats map[protocol.ID]metrics.Stats) map[string]interface{} {
return map[string]interface{}{
"rateIn": stats[protocolID].RateIn,
"rateOut": stats[protocolID].RateOut,
"totalIn": stats[protocolID].TotalIn,
"totalOut": stats[protocolID].TotalOut,
}
}
body, _ := json.Marshal(postBody)
func (c *BandwidthTelemetryClient) getTelemetryRequestBody(stats map[protocol.ID]metrics.Stats) map[string]interface{} {
return map[string]interface{}{
"hostID": c.hostID,
"relay": getStatsPerProtocol(relay.WakuRelayID_v200, stats),
"store": getStatsPerProtocol(legacy_store.StoreID_v20beta4, stats),
"filter-push": getStatsPerProtocol(filter.FilterPushID_v20beta1, stats),
"filter-subscribe": getStatsPerProtocol(filter.FilterSubscribeID_v20beta1, stats),
"lightpush": getStatsPerProtocol(lightpush.LightPushID_v20beta1, stats),
}
}
func (c *BandwidthTelemetryClient) PushProtocolStats(stats map[protocol.ID]metrics.Stats) {
url := fmt.Sprintf("%s/protocol-stats", c.serverURL)
body, _ := json.Marshal(c.getTelemetryRequestBody(stats))
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending message to telemetry server", zap.Error(err))

View File

@ -529,9 +529,7 @@ func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
w.bandwidthCounter.Reset()
}
storeStats := w.bandwidthCounter.GetBandwidthForProtocol(legacy_store.StoreID_v20beta4)
relayStats := w.bandwidthCounter.GetBandwidthForProtocol(relay.WakuRelayID_v200)
go telemetry.PushProtocolStats(relayStats, storeStats)
go telemetry.PushProtocolStats(w.bandwidthCounter.GetBandwidthByProtocol())
}
}
}

View File

@ -3,6 +3,7 @@ package wakuv2
import (
"context"
"crypto/rand"
"encoding/json"
"errors"
"math/big"
"os"
@ -13,6 +14,8 @@ import (
"go.uber.org/zap"
"github.com/cenkalti/backoff/v3"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/protocol"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -28,7 +31,9 @@ import (
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/connection"
@ -785,3 +790,28 @@ func TestLightpushRateLimit(t *testing.T) {
require.Len(t, messages, 2)
}
func TestTelemetryFormat(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
tc := NewBandwidthTelemetryClient(logger, "#")
s := metrics.Stats{
TotalIn: 10,
TotalOut: 20,
RateIn: 30,
RateOut: 40,
}
m := make(map[protocol.ID]metrics.Stats)
m[relay.WakuRelayID_v200] = s
m[filter.FilterPushID_v20beta1] = s
m[filter.FilterSubscribeID_v20beta1] = s
m[legacy_store.StoreID_v20beta4] = s
m[lightpush.LightPushID_v20beta1] = s
requestBody := tc.getTelemetryRequestBody(m)
_, err = json.Marshal(requestBody)
require.NoError(t, err)
}