diff --git a/telemetry/client.go b/telemetry/client.go index 544083ae7..587e290fb 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -29,6 +29,7 @@ const ( UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" + PeerCountMetric TelemetryType = "PeerCount" MaxRetryCache = 5000 ) @@ -55,12 +56,20 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi c.processAndPushTelemetry(errorSendingEnvelope) } +func (c *Client) PushPeerCount(peerCount int) { + c.processAndPushTelemetry(PeerCount{PeerCount: peerCount}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message Messages []*v1protocol.StatusMessage } +type PeerCount struct { + PeerCount int +} + type Client struct { serverURL string httpClient *http.Client @@ -183,6 +192,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) { TelemetryType: ErrorSendingEnvelopeMetric, TelemetryData: c.ProcessErrorSendingEnvelope(v), } + case PeerCount: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: PeerCountMetric, + TelemetryData: c.ProcessPeerCount(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -213,9 +228,15 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { c.logger.Error("Error sending telemetry data", zap.Error(err)) return err } - if res.StatusCode != http.StatusOK { - c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode)) - return fmt.Errorf("status code %d", res.StatusCode) + defer res.Body.Close() + var responseBody []map[string]interface{} + if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil { + c.logger.Error("Error decoding response body", zap.Error(err)) + return err + } + if res.StatusCode != http.StatusCreated { + c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode), zap.Any("responseBody", responseBody)) + return fmt.Errorf("status code %d, response body: %v", res.StatusCode, responseBody) } c.telemetryRetryCache = nil @@ -292,6 +313,19 @@ func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSe return &jsonRawMessage } +func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { + postBody := map[string]interface{}{ + "peerCount": peerCount.PeerCount, + "nodeName": c.nodeName, + "nodeKeyUID": c.keyUID, + "statusVersion": c.version, + "timestamp": time.Now().Unix(), + } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/telemetry/client_test.go b/telemetry/client_test.go index e476b0144..c2e335dde 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -3,9 +3,11 @@ package telemetry import ( "context" "encoding/json" + "errors" "net/http" "net/http/httptest" "os" + "slices" "sync" "testing" "time" @@ -20,6 +22,7 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" + "github.com/status-im/status-go/protocol/tt" v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/wakuv2" ) @@ -28,7 +31,7 @@ var ( testContentTopic = "/waku/1/0x12345679/rfc26" ) -func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType) *httptest.Server { +func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType, expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { t.Errorf("Expected 'POST' request, got '%s'", r.Method) @@ -44,18 +47,41 @@ func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryTy t.Fatal(err) } - if len(received) != 1 { - t.Errorf("Unexpected data received: %+v", received) - } else { - if received[0].TelemetryType != expectedType { - t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType) - } else { - // If the data is as expected, respond with success - t.Log("Responding with success") + if expectedCondition != nil { + shouldSucceed, shouldFail := expectedCondition(received) + if shouldFail { + w.WriteHeader(http.StatusInternalServerError) + t.Fail() + return + } + if !shouldSucceed { w.WriteHeader(http.StatusOK) - wg.Done() + return + } + } else { + if len(received) != 1 { + t.Errorf("Unexpected data received: %+v", received) + } else { + if received[0].TelemetryType != expectedType { + t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType) + } } } + // If the data is as expected, respond with success + t.Log("Responding with success") + responseBody := []map[string]interface{}{ + {"status": "created"}, + } + body, err := json.Marshal(responseBody) + if err != nil { + t.Fatalf("Failed to marshal response body: %v", err) + } + w.WriteHeader(http.StatusCreated) + _, err = w.Write(body) + if err != nil { + t.Fatalf("Failed to write response body: %v", err) + } + wg.Done() })) } @@ -69,11 +95,13 @@ func createClient(t *testing.T, mockServerURL string) *Client { return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond)) } -func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { +type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) + +func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { var wg sync.WaitGroup wg.Add(1) // Expecting one request - mockServer := createMockServer(t, &wg, expectedType) + mockServer := createMockServer(t, &wg, expectedType, expectedCondition) defer mockServer.Close() client := createClient(t, mockServer.URL) @@ -88,7 +116,7 @@ func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(ctx } func TestClient_ProcessReceivedMessages(t *testing.T) { - withMockServer(t, ReceivedMessagesMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send data := ReceivedMessages{ Filter: transport.Filter{ @@ -117,7 +145,7 @@ func TestClient_ProcessReceivedMessages(t *testing.T) { } func TestClient_ProcessReceivedEnvelope(t *testing.T) { - withMockServer(t, ReceivedEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, ReceivedEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, @@ -133,7 +161,7 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) { } func TestClient_ProcessSentEnvelope(t *testing.T) { - withMockServer(t, SentEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send sentEnvelope := wakuv2.SentEnvelope{ Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ @@ -156,7 +184,7 @@ var ( ) func TestTelemetryUponPublishError(t *testing.T) { - withMockServer(t, ErrorSendingEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { enrTreeAddress := testENRBootstrap envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") if envEnrTreeAddress != "" { @@ -228,7 +256,19 @@ func TestRetryCache(t *testing.T) { } else { t.Log("Counter reached, responding with success") if len(received) == 4 { - w.WriteHeader(http.StatusOK) + w.WriteHeader(http.StatusCreated) + responseBody := []map[string]interface{}{ + {"status": "created"}, + } + body, err := json.Marshal(responseBody) + if err != nil { + t.Fatalf("Failed to marshal response body: %v", err) + } + w.WriteHeader(http.StatusCreated) + _, err = w.Write(body) + if err != nil { + t.Fatalf("Failed to write response body: %v", err) + } wg.Done() } else { t.Fatalf("Expected 4 metrics, got %d", len(received)-1) @@ -295,3 +335,54 @@ func TestRetryCacheCleanup(t *testing.T) { require.Equal(t, 5001, len(client.telemetryRetryCache)) } +func setDefaultConfig(config *wakuv2.Config, lightMode bool) { + config.ClusterID = 16 + config.UseShardAsDefaultTopic = true + + if lightMode { + config.EnablePeerExchangeClient = true + config.LightClient = true + config.EnableDiscV5 = false + } else { + config.EnableDiscV5 = true + config.EnablePeerExchangeServer = true + config.LightClient = false + config.EnablePeerExchangeClient = false + } +} + +var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im" + +func TestPeerCount(t *testing.T) { + expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { + found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { + t.Log(req) + return req.TelemetryType == PeerCountMetric + }) + return found, false + } + withMockServer(t, PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { + config := &wakuv2.Config{} + setDefaultConfig(config, false) + config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} + config.DiscoveryLimit = 20 + w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) + require.NoError(t, err) + + w.SetStatusTelemetryClient(client) + client.Start(ctx) + + require.NoError(t, w.Start()) + + err = tt.RetryWithBackOff(func() error { + if len(w.Peers()) == 0 { + return errors.New("no peers discovered") + } + return nil + }) + + require.NoError(t, err) + + require.NotEqual(t, 0, len(w.Peers())) + }) +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index ac4e965a0..807ec0f50 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -106,6 +106,7 @@ type ITelemetryClient interface { PushReceivedEnvelope(receivedEnvelope *protocol.Envelope) PushSentEnvelope(sentEnvelope SentEnvelope) PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) + PushPeerCount(peerCount int) } // Waku represents a dark communication interface through the Ethereum @@ -1382,6 +1383,10 @@ func (w *Waku) Start() error { w.onPeerStats(latestConnStatus) } + if w.statusTelemetryClient != nil { + w.statusTelemetryClient.PushPeerCount(w.PeerCount()) + } + //TODO: analyze if we need to discover and connect to peers with peerExchange loop enabled. if !w.onlineChecker.IsOnline() && isOnline { if err := w.discoverAndConnectPeers(); err != nil {