feat(telemetry)_: message check success and failure, peers by shard and origin (#5824)

* feat(telemetry)_: track message check success and failure

* feat(telemetry)_: track peers by shard and origin
This commit is contained in:
Arseniy Klempner 2024-09-18 21:43:04 -07:00 committed by GitHub
parent 946ee4e496
commit 11a27bb2bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 254 additions and 11 deletions

View File

@ -16,9 +16,9 @@ import (
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
v1protocol "github.com/status-im/status-go/protocol/v1"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
)
type TelemetryType string
@ -32,7 +32,10 @@ const (
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
MaxRetryCache = 5000
)
@ -79,6 +82,26 @@ func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[
}
}
func (c *Client) PushMessageCheckSuccess(ctx context.Context, messageHash string) {
c.processAndPushTelemetry(ctx, MessageCheckSuccess{MessageHash: messageHash})
}
func (c *Client) PushMessageCheckFailure(ctx context.Context, messageHash string) {
c.processAndPushTelemetry(ctx, MessageCheckFailure{MessageHash: messageHash})
}
func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) {
for shard, count := range peerCountByShard {
c.processAndPushTelemetry(ctx, PeerCountByShard{Shard: shard, Count: count})
}
}
func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) {
for origin, count := range peerCountByOrigin {
c.processAndPushTelemetry(ctx, PeerCountByOrigin{Origin: origin, Count: count})
}
}
type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
@ -94,6 +117,24 @@ type PeerConnFailure struct {
FailureCount int
}
type MessageCheckSuccess struct {
MessageHash string
}
type MessageCheckFailure struct {
MessageHash string
}
type PeerCountByShard struct {
Shard uint16
Count uint
}
type PeerCountByOrigin struct {
Origin wps.Origin
Count uint
}
type Client struct {
serverURL string
httpClient *http.Client
@ -246,6 +287,30 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
case MessageCheckSuccess:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MessageCheckSuccessMetric,
TelemetryData: c.ProcessMessageCheckSuccess(v),
}
case MessageCheckFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MessageCheckFailureMetric,
TelemetryData: c.ProcessMessageCheckFailure(v),
}
case PeerCountByShard:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountByShardMetric,
TelemetryData: c.ProcessPeerCountByShard(v),
}
case PeerCountByOrigin:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountByOriginMetric,
TelemetryData: c.ProcessPeerCountByOrigin(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
@ -383,6 +448,40 @@ func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.R
return &jsonRawMessage
}
func (c *Client) ProcessMessageCheckSuccess(messageCheckSuccess MessageCheckSuccess) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = messageCheckSuccess.MessageHash
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessPeerCountByShard(peerCountByShard PeerCountByShard) *json.RawMessage {
postBody := c.commonPostBody()
postBody["shard"] = peerCountByShard.Shard
postBody["count"] = peerCountByShard.Count
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessMessageCheckFailure(messageCheckFailure MessageCheckFailure) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = messageCheckFailure.MessageHash
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *json.RawMessage {
postBody := c.commonPostBody()
postBody["origin"] = peerCountByOrigin.Origin
postBody["count"] = peerCountByOrigin.Count
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)

View File

@ -357,7 +357,7 @@ func setDefaultConfig(config *wakuv2.Config, lightMode bool) {
var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im"
func TestPeerCount(t *testing.T) {
t.Skip("flaky test")
// t.Skip("flaky test")
expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) {
found := slices.ContainsFunc(received, func(req TelemetryRequest) bool {
@ -371,6 +371,9 @@ func TestPeerCount(t *testing.T) {
setDefaultConfig(config, false)
config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
config.DiscoveryLimit = 20
config.TelemetryServerURL = client.serverURL
config.TelemetrySendPeriodMs = 1500
config.TelemetryPeerCountSendPeriod = 1500
w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
@ -418,3 +421,75 @@ func TestPeerId(t *testing.T) {
})
}
func TestPeerCountByShard(t *testing.T) {
expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) {
found := slices.ContainsFunc(received, func(req TelemetryRequest) bool {
return req.TelemetryType == PeerCountByShardMetric
})
return found, false
}
withMockServer(t, PeerCountByShardMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
config := &wakuv2.Config{}
setDefaultConfig(config, false)
config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
config.DiscoveryLimit = 20
config.TelemetryServerURL = client.serverURL
config.TelemetryPeerCountSendPeriod = 1500
config.TelemetrySendPeriodMs = 1500
w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
w.SetStatusTelemetryClient(client)
client.Start(ctx)
require.NoError(t, w.Start())
err = tt.RetryWithBackOff(func() error {
if len(w.Peers()) == 0 {
return errors.New("no peers discovered")
}
return nil
})
require.NoError(t, err)
require.NotEqual(t, 0, len(w.Peers()))
})
}
func TestPeerCountByOrigin(t *testing.T) {
expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) {
found := slices.ContainsFunc(received, func(req TelemetryRequest) bool {
return req.TelemetryType == PeerCountByOriginMetric
})
return found, false
}
withMockServer(t, PeerCountByOriginMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
config := &wakuv2.Config{}
setDefaultConfig(config, false)
config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
config.DiscoveryLimit = 20
config.TelemetryServerURL = client.serverURL
config.TelemetryPeerCountSendPeriod = 1500
config.TelemetrySendPeriodMs = 1500
w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
w.SetStatusTelemetryClient(client)
client.Start(ctx)
require.NoError(t, w.Start())
err = tt.RetryWithBackOff(func() error {
if len(w.Peers()) == 0 {
return errors.New("no peers discovered")
}
return nil
})
require.NoError(t, err)
require.NotEqual(t, 0, len(w.Peers()))
})
}

View File

@ -58,6 +58,7 @@ type Config struct {
StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"`
TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service
TelemetryPeerCountSendPeriod int `toml:",omitempty"` // Number of milliseconds to wait between checking peer count
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
DefaultShardedPubsubTopics []string `toml:", omitempty"`
ClusterID uint16 `toml:",omitempty"`

View File

@ -109,6 +109,10 @@ type ITelemetryClient interface {
PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(ctx context.Context, peerCount int)
PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int)
PushMessageCheckSuccess(ctx context.Context, messageHash string)
PushMessageCheckFailure(ctx context.Context, messageHash string)
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
}
// Waku represents a dark communication interface through the Ethereum
@ -1090,6 +1094,28 @@ func (w *Waku) Start() error {
}
}()
if w.cfg.TelemetryServerURL != "" {
w.wg.Add(1)
go func() {
defer w.wg.Done()
peerTelemetryTickerInterval := time.Duration(w.cfg.TelemetryPeerCountSendPeriod) * time.Millisecond
if peerTelemetryTickerInterval == 0 {
peerTelemetryTickerInterval = 10 * time.Second
}
peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
defer peerTelemetryTicker.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-peerTelemetryTicker.C:
w.reportPeerMetrics()
}
}
}()
}
w.wg.Add(1)
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
//TODO: commenting for now so that only fleet nodes are used.
@ -1191,18 +1217,54 @@ func (w *Waku) checkForConnectionChanges() {
w.onPeerStats(latestConnStatus)
}
if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures)
}
w.ConnectionChanged(connection.State{
Type: w.state.Type, //setting state type as previous one since there won't be a change here
Offline: !latestConnStatus.IsOnline,
})
}
func (w *Waku) reportPeerMetrics() {
if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures)
peerCountByOrigin := make(map[wps.Origin]uint)
peerCountByShard := make(map[uint16]uint)
wakuPeerStore := w.node.Host().Peerstore().(wps.WakuPeerstore)
for _, peerID := range w.node.Host().Network().Peers() {
origin, err := wakuPeerStore.Origin(peerID)
if err != nil {
origin = wps.Unknown
}
peerCountByOrigin[origin]++
pubsubTopics, err := wakuPeerStore.PubSubTopics(peerID)
if err != nil {
continue
}
keys := make([]string, 0, len(pubsubTopics))
for k := range pubsubTopics {
keys = append(keys, k)
}
relayShards, err := protocol.TopicsToRelayShards(keys...)
if err != nil {
continue
}
for _, shards := range relayShards {
for _, shard := range shards.ShardIDs {
peerCountByShard[shard]++
}
}
}
w.statusTelemetryClient.PushPeerCountByShard(w.ctx, peerCountByShard)
w.statusTelemetryClient.PushPeerCountByOrigin(w.ctx, peerCountByOrigin)
}
}
func (w *Waku) startMessageSender() error {
publishMethod := publish.Relay
if w.cfg.LightClient {
@ -1233,11 +1295,17 @@ func (w *Waku) startMessageSender() error {
Hash: hash,
Event: common.EventEnvelopeSent,
})
if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushMessageCheckSuccess(w.ctx, hash.Hex())
}
case hash := <-msgExpiredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeExpired,
})
if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushMessageCheckFailure(w.ctx, hash.Hex())
}
}
}
}()