feat(telemetry)_: send peer count metric (#5460)

This commit is contained in:
Arseniy Klempner 2024-07-12 13:37:55 -07:00 committed by GitHub
parent 194d9444a5
commit a006d80acf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 150 additions and 20 deletions

View File

@ -29,6 +29,7 @@ const (
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
MaxRetryCache = 5000 MaxRetryCache = 5000
) )
@ -55,12 +56,20 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi
c.processAndPushTelemetry(errorSendingEnvelope) c.processAndPushTelemetry(errorSendingEnvelope)
} }
func (c *Client) PushPeerCount(peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
}
type ReceivedMessages struct { type ReceivedMessages struct {
Filter transport.Filter Filter transport.Filter
SSHMessage *types.Message SSHMessage *types.Message
Messages []*v1protocol.StatusMessage Messages []*v1protocol.StatusMessage
} }
type PeerCount struct {
PeerCount int
}
type Client struct { type Client struct {
serverURL string serverURL string
httpClient *http.Client httpClient *http.Client
@ -183,6 +192,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: ErrorSendingEnvelopeMetric, TelemetryType: ErrorSendingEnvelopeMetric,
TelemetryData: c.ProcessErrorSendingEnvelope(v), TelemetryData: c.ProcessErrorSendingEnvelope(v),
} }
case PeerCount:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerCountMetric,
TelemetryData: c.ProcessPeerCount(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
@ -213,9 +228,15 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error {
c.logger.Error("Error sending telemetry data", zap.Error(err)) c.logger.Error("Error sending telemetry data", zap.Error(err))
return err return err
} }
if res.StatusCode != http.StatusOK { defer res.Body.Close()
c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode)) var responseBody []map[string]interface{}
return fmt.Errorf("status code %d", res.StatusCode) if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil {
c.logger.Error("Error decoding response body", zap.Error(err))
return err
}
if res.StatusCode != http.StatusCreated {
c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode), zap.Any("responseBody", responseBody))
return fmt.Errorf("status code %d, response body: %v", res.StatusCode, responseBody)
} }
c.telemetryRetryCache = nil c.telemetryRetryCache = nil
@ -292,6 +313,19 @@ func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSe
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
postBody := map[string]interface{}{
"peerCount": peerCount.PeerCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL) url := fmt.Sprintf("%s/update-envelope", c.serverURL)

View File

@ -3,9 +3,11 @@ package telemetry
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"slices"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -20,6 +22,7 @@ import (
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/tt"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
) )
@ -28,7 +31,7 @@ var (
testContentTopic = "/waku/1/0x12345679/rfc26" testContentTopic = "/waku/1/0x12345679/rfc26"
) )
func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType) *httptest.Server { func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType, expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" { if r.Method != "POST" {
t.Errorf("Expected 'POST' request, got '%s'", r.Method) t.Errorf("Expected 'POST' request, got '%s'", r.Method)
@ -44,18 +47,41 @@ func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryTy
t.Fatal(err) t.Fatal(err)
} }
if len(received) != 1 { if expectedCondition != nil {
t.Errorf("Unexpected data received: %+v", received) shouldSucceed, shouldFail := expectedCondition(received)
} else { if shouldFail {
if received[0].TelemetryType != expectedType { w.WriteHeader(http.StatusInternalServerError)
t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType) t.Fail()
} else { return
// If the data is as expected, respond with success }
t.Log("Responding with success") if !shouldSucceed {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
wg.Done() return
}
} else {
if len(received) != 1 {
t.Errorf("Unexpected data received: %+v", received)
} else {
if received[0].TelemetryType != expectedType {
t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType)
}
} }
} }
// If the data is as expected, respond with success
t.Log("Responding with success")
responseBody := []map[string]interface{}{
{"status": "created"},
}
body, err := json.Marshal(responseBody)
if err != nil {
t.Fatalf("Failed to marshal response body: %v", err)
}
w.WriteHeader(http.StatusCreated)
_, err = w.Write(body)
if err != nil {
t.Fatalf("Failed to write response body: %v", err)
}
wg.Done()
})) }))
} }
@ -69,11 +95,13 @@ func createClient(t *testing.T, mockServerURL string) *Client {
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond)) return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond))
} }
func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)
func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) // Expecting one request wg.Add(1) // Expecting one request
mockServer := createMockServer(t, &wg, expectedType) mockServer := createMockServer(t, &wg, expectedType, expectedCondition)
defer mockServer.Close() defer mockServer.Close()
client := createClient(t, mockServer.URL) client := createClient(t, mockServer.URL)
@ -88,7 +116,7 @@ func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(ctx
} }
func TestClient_ProcessReceivedMessages(t *testing.T) { func TestClient_ProcessReceivedMessages(t *testing.T) {
withMockServer(t, ReceivedMessagesMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { withMockServer(t, ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send // Create a telemetry request to send
data := ReceivedMessages{ data := ReceivedMessages{
Filter: transport.Filter{ Filter: transport.Filter{
@ -117,7 +145,7 @@ func TestClient_ProcessReceivedMessages(t *testing.T) {
} }
func TestClient_ProcessReceivedEnvelope(t *testing.T) { func TestClient_ProcessReceivedEnvelope(t *testing.T) {
withMockServer(t, ReceivedEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { withMockServer(t, ReceivedEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send // Create a telemetry request to send
envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ envelope := v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
@ -133,7 +161,7 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) {
} }
func TestClient_ProcessSentEnvelope(t *testing.T) { func TestClient_ProcessSentEnvelope(t *testing.T) {
withMockServer(t, SentEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { withMockServer(t, SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send // Create a telemetry request to send
sentEnvelope := wakuv2.SentEnvelope{ sentEnvelope := wakuv2.SentEnvelope{
Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{
@ -156,7 +184,7 @@ var (
) )
func TestTelemetryUponPublishError(t *testing.T) { func TestTelemetryUponPublishError(t *testing.T) {
withMockServer(t, ErrorSendingEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { withMockServer(t, ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
enrTreeAddress := testENRBootstrap enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" { if envEnrTreeAddress != "" {
@ -228,7 +256,19 @@ func TestRetryCache(t *testing.T) {
} else { } else {
t.Log("Counter reached, responding with success") t.Log("Counter reached, responding with success")
if len(received) == 4 { if len(received) == 4 {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusCreated)
responseBody := []map[string]interface{}{
{"status": "created"},
}
body, err := json.Marshal(responseBody)
if err != nil {
t.Fatalf("Failed to marshal response body: %v", err)
}
w.WriteHeader(http.StatusCreated)
_, err = w.Write(body)
if err != nil {
t.Fatalf("Failed to write response body: %v", err)
}
wg.Done() wg.Done()
} else { } else {
t.Fatalf("Expected 4 metrics, got %d", len(received)-1) t.Fatalf("Expected 4 metrics, got %d", len(received)-1)
@ -295,3 +335,54 @@ func TestRetryCacheCleanup(t *testing.T) {
require.Equal(t, 5001, len(client.telemetryRetryCache)) require.Equal(t, 5001, len(client.telemetryRetryCache))
} }
func setDefaultConfig(config *wakuv2.Config, lightMode bool) {
config.ClusterID = 16
config.UseShardAsDefaultTopic = true
if lightMode {
config.EnablePeerExchangeClient = true
config.LightClient = true
config.EnableDiscV5 = false
} else {
config.EnableDiscV5 = true
config.EnablePeerExchangeServer = true
config.LightClient = false
config.EnablePeerExchangeClient = false
}
}
var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im"
func TestPeerCount(t *testing.T) {
expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) {
found := slices.ContainsFunc(received, func(req TelemetryRequest) bool {
t.Log(req)
return req.TelemetryType == PeerCountMetric
})
return found, false
}
withMockServer(t, PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
config := &wakuv2.Config{}
setDefaultConfig(config, false)
config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
config.DiscoveryLimit = 20
w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil)
require.NoError(t, err)
w.SetStatusTelemetryClient(client)
client.Start(ctx)
require.NoError(t, w.Start())
err = tt.RetryWithBackOff(func() error {
if len(w.Peers()) == 0 {
return errors.New("no peers discovered")
}
return nil
})
require.NoError(t, err)
require.NotEqual(t, 0, len(w.Peers()))
})
}

View File

@ -106,6 +106,7 @@ type ITelemetryClient interface {
PushReceivedEnvelope(receivedEnvelope *protocol.Envelope) PushReceivedEnvelope(receivedEnvelope *protocol.Envelope)
PushSentEnvelope(sentEnvelope SentEnvelope) PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -1382,6 +1383,10 @@ func (w *Waku) Start() error {
w.onPeerStats(latestConnStatus) w.onPeerStats(latestConnStatus)
} }
if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushPeerCount(w.PeerCount())
}
//TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled.
if !w.onlineChecker.IsOnline() && isOnline { if !w.onlineChecker.IsOnline() && isOnline {
if err := w.discoverAndConnectPeers(); err != nil { if err := w.discoverAndConnectPeers(); err != nil {