feat: protocol stats
This commit is contained in:
parent
9e43905821
commit
ab2ff4eeb1
|
@ -618,3 +618,7 @@ func (db *Database) GetTestNetworksEnabled() (result bool, err error) {
|
|||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (db *Database) GetTelemetryServerURL() (string, error) {
|
||||
return db.makeSelectString(TelemetryServerURL)
|
||||
}
|
||||
|
|
|
@ -107,7 +107,12 @@ func (b *StatusNode) initServices(config *params.NodeConfig, mediaServer *server
|
|||
}
|
||||
|
||||
if config.WakuV2Config.Enabled {
|
||||
waku2Service, err := b.wakuV2Service(config)
|
||||
telemetryServerURL, err := accDB.GetTelemetryServerURL()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waku2Service, err := b.wakuV2Service(config, telemetryServerURL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -278,7 +283,7 @@ func (b *StatusNode) wakuService(wakuCfg *params.WakuConfig, clusterCfg *params.
|
|||
|
||||
}
|
||||
|
||||
func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku, error) {
|
||||
func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig, telemetryServerURL string) (*wakuv2.Waku, error) {
|
||||
if b.wakuV2Srvc == nil {
|
||||
cfg := &wakuv2.Config{
|
||||
MaxMessageSize: wakucommon.DefaultMaxMessageSize,
|
||||
|
@ -298,6 +303,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
|
|||
EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5,
|
||||
UDPPort: nodeConfig.WakuV2Config.UDPPort,
|
||||
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
|
||||
TelemetryServerURL: telemetryServerURL,
|
||||
}
|
||||
|
||||
if nodeConfig.WakuV2Config.MaxMessageSize > 0 {
|
||||
|
|
|
@ -42,6 +42,7 @@ type Config struct {
|
|||
EnableStore bool `toml:",omitempty"`
|
||||
StoreCapacity int `toml:",omitempty"`
|
||||
StoreSeconds int `toml:",omitempty"`
|
||||
TelemetryServerURL string `toml:",omitempty"`
|
||||
}
|
||||
|
||||
var DefaultConfig = Config{
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
package wakuv2
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type BandwidthTelemetryClient struct {
|
||||
serverURL string
|
||||
httpClient *http.Client
|
||||
hostID string
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewBandwidthTelemetryClient(logger *zap.Logger, serverURL string, hostID string) *BandwidthTelemetryClient {
|
||||
return &BandwidthTelemetryClient{
|
||||
serverURL: serverURL,
|
||||
httpClient: &http.Client{Timeout: time.Minute},
|
||||
hostID: hostID,
|
||||
logger: logger.Named("bandwidth-telemetry"),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *BandwidthTelemetryClient) PushProtocolStats(relayStats metrics.Stats, storeStats metrics.Stats) {
|
||||
url := fmt.Sprintf("%s/protocol-stats", c.serverURL)
|
||||
postBody := map[string]interface{}{
|
||||
"hostID": c.hostID,
|
||||
"relay": map[string]interface{}{
|
||||
"rateIn": relayStats.RateIn,
|
||||
"rateOut": relayStats.RateOut,
|
||||
"totalIn": relayStats.TotalIn,
|
||||
"totalOut": relayStats.TotalOut,
|
||||
},
|
||||
"store": map[string]interface{}{
|
||||
"rateIn": storeStats.RateIn,
|
||||
"rateOut": storeStats.RateOut,
|
||||
"totalIn": storeStats.TotalIn,
|
||||
"totalOut": storeStats.TotalOut,
|
||||
},
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(postBody)
|
||||
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
c.logger.Error("Error sending message to telemetry server", zap.Error(err))
|
||||
}
|
||||
}
|
|
@ -368,6 +368,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
}
|
||||
}()
|
||||
|
||||
go waku.telemetryBandwidthStats(cfg.TelemetryServerURL)
|
||||
go waku.runFilterMsgLoop()
|
||||
go waku.runRelayMsgLoop()
|
||||
go waku.runPeerExchangeLoop()
|
||||
|
@ -552,6 +553,28 @@ func (w *Waku) identifyAndConnect(ctx context.Context, isLightClient bool, ma mu
|
|||
}
|
||||
}
|
||||
|
||||
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
|
||||
if telemetryServerURL == "" {
|
||||
return
|
||||
}
|
||||
|
||||
telemetry := NewBandwidthTelemetryClient(w.logger, telemetryServerURL, w.node.ID())
|
||||
|
||||
ticker := time.NewTicker(time.Second * 20)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.quit:
|
||||
return
|
||||
case <-ticker.C:
|
||||
storeStats := w.bandwidthCounter.GetBandwidthForProtocol(store.StoreID_v20beta4)
|
||||
relayStats := w.bandwidthCounter.GetBandwidthForProtocol(relay.WakuRelayID_v200)
|
||||
go telemetry.PushProtocolStats(relayStats, storeStats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Waku) GetStats() types.StatsSummary {
|
||||
stats := w.bandwidthCounter.GetBandwidthTotals()
|
||||
return types.StatsSummary{
|
||||
|
|
Loading…
Reference in New Issue