diff --git a/telemetry/client.go b/telemetry/client.go index 7194c70c8..b2e48d746 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -16,9 +16,9 @@ import ( "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/wakuv2" - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" - v1protocol "github.com/status-im/status-go/protocol/v1" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) type TelemetryType string @@ -32,8 +32,9 @@ const ( ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" PeerCountMetric TelemetryType = "PeerCount" PeerConnFailuresMetric TelemetryType = "PeerConnFailure" - - MaxRetryCache = 5000 + PeerCountByShardMetric TelemetryType = "PeerCountByShard" + PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" + MaxRetryCache = 5000 ) type TelemetryRequest struct { @@ -79,6 +80,18 @@ func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[ } } +func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) { + for shard, count := range peerCountByShard { + c.processAndPushTelemetry(ctx, PeerCountByShard{Shard: shard, Count: count}) + } +} + +func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) { + for origin, count := range peerCountByOrigin { + c.processAndPushTelemetry(ctx, PeerCountByOrigin{Origin: origin, Count: count}) + } +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -94,6 +107,16 @@ type PeerConnFailure struct { FailureCount int } +type PeerCountByShard struct { + Shard uint16 + Count uint +} + +type PeerCountByOrigin struct { + Origin wps.Origin + Count uint +} + type Client struct { serverURL string httpClient *http.Client @@ -246,6 +269,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: PeerConnFailuresMetric, TelemetryData: c.ProcessPeerConnFailure(v), } + case PeerCountByShard: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: PeerCountByShardMetric, + TelemetryData: c.ProcessPeerCountByShard(v), + } + case PeerCountByOrigin: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: PeerCountByOriginMetric, + TelemetryData: c.ProcessPeerCountByOrigin(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -383,6 +418,24 @@ func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.R return &jsonRawMessage } +func (c *Client) ProcessPeerCountByShard(peerCountByShard PeerCountByShard) *json.RawMessage { + postBody := c.commonPostBody() + postBody["shard"] = peerCountByShard.Shard + postBody["count"] = peerCountByShard.Count + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + +func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *json.RawMessage { + postBody := c.commonPostBody() + postBody["origin"] = peerCountByOrigin.Origin + postBody["count"] = peerCountByOrigin.Count + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 574381b5e..f29b0b9a6 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -109,6 +109,8 @@ type ITelemetryClient interface { PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) PushPeerCount(ctx context.Context, peerCount int) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) + PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) + PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) } // Waku represents a dark communication interface through the Ethereum @@ -1092,6 +1094,22 @@ func (w *Waku) Start() error { } }() + if w.cfg.TelemetryServerURL != "" { + go func() { + peerTelemetryTicker := time.NewTicker(10 * time.Second) + defer peerTelemetryTicker.Stop() + + for { + select { + case <-w.ctx.Done(): + return + case <-peerTelemetryTicker.C: + w.reportPeerMetrics() + } + } + }() + } + go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) //TODO: commenting for now so that only fleet nodes are used. //Need to uncomment once filter peer scoring etc is implemented. @@ -1187,18 +1205,54 @@ func (w *Waku) checkForConnectionChanges() { w.onPeerStats(latestConnStatus) } - if w.statusTelemetryClient != nil { - connFailures := FormatPeerConnFailures(w.node) - w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount()) - w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures) - } - w.ConnectionChanged(connection.State{ Type: w.state.Type, //setting state type as previous one since there won't be a change here Offline: !latestConnStatus.IsOnline, }) } +func (w *Waku) reportPeerMetrics() { + if w.statusTelemetryClient != nil { + connFailures := FormatPeerConnFailures(w.node) + w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount()) + w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures) + + peerCountByOrigin := make(map[wps.Origin]uint) + peerCountByShard := make(map[uint16]uint) + for _, peerID := range w.node.Host().Network().Peers() { + wakuPeerStore := w.node.Host().Peerstore().(wps.WakuPeerstore) + + origin, err := wakuPeerStore.Origin(peerID) + if err != nil { + continue + } + + peerCountByOrigin[origin]++ + pubsubTopics, err := wakuPeerStore.PubSubTopics(peerID) + if err != nil { + continue + } + + keys := make([]string, 0, len(pubsubTopics)) + for k := range pubsubTopics { + keys = append(keys, k) + } + relayShards, err := protocol.TopicsToRelayShards(keys...) + if err != nil { + continue + } + + for _, shards := range relayShards { + for _, shard := range shards.ShardIDs { + peerCountByShard[shard]++ + } + } + } + w.statusTelemetryClient.PushPeerCountByShard(w.ctx, peerCountByShard) + w.statusTelemetryClient.PushPeerCountByOrigin(w.ctx, peerCountByOrigin) + } +} + func (w *Waku) startMessageSender() error { publishMethod := publish.Relay if w.cfg.LightClient {