feat(telemetry)_: track peers by shard and origin

This commit is contained in:
Arseniy Klempner 2024-09-09 16:34:31 -07:00
parent e3dd2b2377
commit 00a926759b
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
2 changed files with 117 additions and 10 deletions

View File

@ -16,9 +16,9 @@ import (
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
) )
type TelemetryType string type TelemetryType string
@ -32,8 +32,9 @@ const (
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount" PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure" PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
MaxRetryCache = 5000 PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MaxRetryCache = 5000
) )
type TelemetryRequest struct { type TelemetryRequest struct {
@ -79,6 +80,18 @@ func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[
} }
} }
func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) {
for shard, count := range peerCountByShard {
c.processAndPushTelemetry(ctx, PeerCountByShard{Shard: shard, Count: count})
}
}
func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) {
for origin, count := range peerCountByOrigin {
c.processAndPushTelemetry(ctx, PeerCountByOrigin{Origin: origin, Count: count})
}
}
type ReceivedMessages struct { type ReceivedMessages struct {
Filter transport.Filter Filter transport.Filter
SSHMessage *types.Message SSHMessage *types.Message
@ -94,6 +107,16 @@ type PeerConnFailure struct {
FailureCount int FailureCount int
} }
type PeerCountByShard struct {
Shard uint16
Count uint
}
type PeerCountByOrigin struct {
Origin wps.Origin
Count uint
}
type Client struct { type Client struct {
serverURL string serverURL string
httpClient *http.Client httpClient *http.Client
@ -246,6 +269,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerConnFailuresMetric, TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v), TelemetryData: c.ProcessPeerConnFailure(v),
} }
case PeerCountByShard:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountByShardMetric,
TelemetryData: c.ProcessPeerCountByShard(v),
}
case PeerCountByOrigin:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
@ -383,6 +418,24 @@ func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.R
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessPeerCountByShard(peerCountByShard PeerCountByShard) *json.RawMessage {
postBody := c.commonPostBody()
postBody["shard"] = peerCountByShard.Shard
postBody["count"] = peerCountByShard.Count
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *json.RawMessage {
postBody := c.commonPostBody()
postBody["origin"] = peerCountByOrigin.Origin
postBody["count"] = peerCountByOrigin.Count
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL) url := fmt.Sprintf("%s/update-envelope", c.serverURL)

View File

@ -109,6 +109,8 @@ type ITelemetryClient interface {
PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(ctx context.Context, peerCount int) PushPeerCount(ctx context.Context, peerCount int)
PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -1092,6 +1094,22 @@ func (w *Waku) Start() error {
} }
}() }()
if w.cfg.TelemetryServerURL != "" {
go func() {
peerTelemetryTicker := time.NewTicker(10 * time.Second)
defer peerTelemetryTicker.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-peerTelemetryTicker.C:
w.reportPeerMetrics()
}
}
}()
}
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
//TODO: commenting for now so that only fleet nodes are used. //TODO: commenting for now so that only fleet nodes are used.
//Need to uncomment once filter peer scoring etc is implemented. //Need to uncomment once filter peer scoring etc is implemented.
@ -1187,18 +1205,54 @@ func (w *Waku) checkForConnectionChanges() {
w.onPeerStats(latestConnStatus) w.onPeerStats(latestConnStatus)
} }
if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures)
}
w.ConnectionChanged(connection.State{ w.ConnectionChanged(connection.State{
Type: w.state.Type, //setting state type as previous one since there won't be a change here Type: w.state.Type, //setting state type as previous one since there won't be a change here
Offline: !latestConnStatus.IsOnline, Offline: !latestConnStatus.IsOnline,
}) })
} }
func (w *Waku) reportPeerMetrics() {
if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures)
peerCountByOrigin := make(map[wps.Origin]uint)
peerCountByShard := make(map[uint16]uint)
for _, peerID := range w.node.Host().Network().Peers() {
wakuPeerStore := w.node.Host().Peerstore().(wps.WakuPeerstore)
origin, err := wakuPeerStore.Origin(peerID)
if err != nil {
continue
}
peerCountByOrigin[origin]++
pubsubTopics, err := wakuPeerStore.PubSubTopics(peerID)
if err != nil {
continue
}
keys := make([]string, 0, len(pubsubTopics))
for k := range pubsubTopics {
keys = append(keys, k)
}
relayShards, err := protocol.TopicsToRelayShards(keys...)
if err != nil {
continue
}
for _, shards := range relayShards {
for _, shard := range shards.ShardIDs {
peerCountByShard[shard]++
}
}
}
w.statusTelemetryClient.PushPeerCountByShard(w.ctx, peerCountByShard)
w.statusTelemetryClient.PushPeerCountByOrigin(w.ctx, peerCountByOrigin)
}
}
func (w *Waku) startMessageSender() error { func (w *Waku) startMessageSender() error {
publishMethod := publish.Relay publishMethod := publish.Relay
if w.cfg.LightClient { if w.cfg.LightClient {