status-go/telemetry/client_test.go

496 lines
14 KiB
Go

package telemetry
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"os"
"slices"
"sync"
"testing"
"time"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/api/publish"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/stretchr/testify/require"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/tt"
v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2"
)
// testContentTopic is the content topic used by the Waku messages and
// transport filters built in these tests.
var testContentTopic = "/waku/1/0x12345679/rfc26"
// createMockServer starts an httptest server that stands in for the telemetry
// backend's "/record-metrics" endpoint.
//
// Every request is expected to be a POST to "/record-metrics" carrying a JSON
// array of TelemetryRequest. When expectedCondition is nil the payload must
// hold exactly one entry of type expectedType. Otherwise expectedCondition
// decides the outcome of the request:
//   - shouldFail marks the test as failed and answers 500;
//   - !shouldSucceed answers 200 without signalling wg, so the server keeps
//     waiting for a later, matching request;
//   - otherwise the request is treated as the expected one.
//
// An accepted request is answered with 201 and wg.Done is called, at most
// once for the lifetime of the server.
func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType, expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server {
	// More than one matching request can reach the server before it is closed;
	// calling wg.Done for each of them would drive the counter negative and panic.
	var signalDone sync.Once

	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler runs on the server's goroutines, not on the test
		// goroutine, so failures are reported with t.Error*/t.Fail only:
		// t.Fatal* must not be called from here.
		if r.Method != http.MethodPost {
			t.Errorf("Expected 'POST' request, got '%s'", r.Method)
		}
		if r.URL.EscapedPath() != "/record-metrics" {
			t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath())
		}

		// Check the request body is as expected
		var received []TelemetryRequest
		if err := json.NewDecoder(r.Body).Decode(&received); err != nil {
			t.Errorf("Failed to decode request body: %v", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		if expectedCondition != nil {
			shouldSucceed, shouldFail := expectedCondition(received)
			if shouldFail {
				w.WriteHeader(http.StatusInternalServerError)
				t.Fail()
				return
			}
			if !shouldSucceed {
				// Not the request the test is waiting for: acknowledge it
				// and keep waiting.
				w.WriteHeader(http.StatusOK)
				return
			}
		} else if len(received) != 1 {
			t.Errorf("Unexpected data received: %+v", received)
		} else if received[0].TelemetryType != expectedType {
			t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType)
		}

		// If the data is as expected, respond with success
		t.Log("Responding with success")
		responseBody := []map[string]interface{}{
			{"status": "created"},
		}
		body, err := json.Marshal(responseBody)
		if err != nil {
			t.Errorf("Failed to marshal response body: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusCreated)
		if _, err = w.Write(body); err != nil {
			t.Errorf("Failed to write response body: %v", err)
			return
		}
		signalDone.Do(wg.Done)
	}))
}
// createClient returns a telemetry Client that reports to mockServerURL. It
// logs at debug level through a zap development logger, uses a 100ms send
// period and carries a fixed peer ID. The test is aborted if the logger cannot
// be built.
func createClient(t *testing.T, mockServerURL string) *Client {
	logCfg := zap.NewDevelopmentConfig()
	logCfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)

	logger, buildErr := logCfg.Build()
	if buildErr != nil {
		t.Fatalf("Failed to create logger: %v", buildErr)
	}

	return NewClient(
		logger,
		mockServerURL,
		"testUID",
		"testNode",
		"1.0",
		WithSendPeriod(100*time.Millisecond),
		WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm"),
	)
}
// expectedCondition inspects the telemetry requests decoded from a single HTTP
// call to the mock server and reports how that call should be treated:
// shouldFail fails the test, shouldSucceed marks the call as the one the test
// is waiting for, and neither means "keep waiting for a later call".
type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)
// withMockServer runs testFunc against a fresh mock telemetry server and a
// client pointed at it, then blocks until the server has accepted one request.
// The context handed to testFunc is cancelled and the server is closed when
// withMockServer returns.
func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) {
	// A single accepted request is awaited per test.
	pending := new(sync.WaitGroup)
	pending.Add(1)

	server := createMockServer(t, pending, expectedType, expectedCondition)
	defer server.Close()

	telemetryClient := createClient(t, server.URL)

	testCtx, stop := context.WithCancel(context.Background())
	defer stop()

	testFunc(testCtx, t, telemetryClient, pending)

	// Hold the server open until the awaited request has arrived.
	pending.Wait()
}
// TestClient_ProcessReceivedMessages pushes one ReceivedMessages record and
// expects the mock server to receive a single ReceivedMessagesMetric request.
func TestClient_ProcessReceivedMessages(t *testing.T) {
	withMockServer(t, ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		// Assemble the pieces of the record that will be reported.
		filter := transport.Filter{
			ChatID:       "testChat",
			PubsubTopic:  "testTopic",
			ContentTopic: types.StringToTopic(testContentTopic),
		}
		sshMessage := &types.Message{
			Hash:      []byte("hash"),
			Timestamp: uint32(time.Now().Unix()),
		}
		statusMessage := &v1protocol.StatusMessage{
			ApplicationLayer: v1protocol.ApplicationLayer{
				ID:   types.HexBytes("123"),
				Type: 1,
			},
		}
		record := ReceivedMessages{
			Filter:     filter,
			SSHMessage: sshMessage,
			Messages:   []*v1protocol.StatusMessage{statusMessage},
		}

		// Start the client, then hand it the record.
		client.Start(ctx)
		client.PushReceivedMessages(ctx, record)
	})
}
// TestClient_ProcessReceivedEnvelope pushes one received envelope and expects
// the mock server to receive a single ReceivedEnvelopeMetric request.
func TestClient_ProcessReceivedEnvelope(t *testing.T) {
	withMockServer(t, ReceivedEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		// Build the envelope to report.
		wakuMsg := &pb.WakuMessage{
			Payload:      []byte{1, 2, 3, 4, 5},
			ContentTopic: testContentTopic,
			Version:      proto.Uint32(0),
			Timestamp:    proto.Int64(time.Now().Unix()),
		}
		received := v2protocol.NewEnvelope(wakuMsg, 0, "")

		// Start the client, then hand it the envelope.
		client.Start(ctx)
		client.PushReceivedEnvelope(ctx, received)
	})
}
// TestClient_ProcessSentEnvelope pushes one light-push sent envelope and
// expects the mock server to receive a single SentEnvelopeMetric request.
func TestClient_ProcessSentEnvelope(t *testing.T) {
	withMockServer(t, SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		// Build the sent-envelope record to report.
		wakuMsg := &pb.WakuMessage{
			Payload:      []byte{1, 2, 3, 4, 5},
			ContentTopic: testContentTopic,
			Version:      proto.Uint32(0),
			Timestamp:    proto.Int64(time.Now().Unix()),
		}
		sent := wakuv2.SentEnvelope{
			Envelope:      v2protocol.NewEnvelope(wakuMsg, 0, ""),
			PublishMethod: publish.LightPush,
		}

		// Start the client, then hand it the record.
		client.Start(ctx)
		client.PushSentEnvelope(ctx, sent)
	})
}
// testENRBootstrap is the ENR tree address TestTelemetryUponPublishError uses
// when the ENRTREE_ADDRESS environment variable is not set.
var testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im"
// TestTelemetryUponPublishError starts a light-client Waku node whose publish
// step is forced to fail and expects the telemetry client to report a single
// ErrorSendingEnvelopeMetric request to the mock server.
func TestTelemetryUponPublishError(t *testing.T) {
	withMockServer(t, ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		// ENRTREE_ADDRESS, when set, replaces the default bootstrap tree.
		bootstrap := testENRBootstrap
		if override := os.Getenv("ENRTREE_ADDRESS"); override != "" {
			bootstrap = override
		}

		cfg := new(wakuv2.Config)
		cfg.Port = 0
		cfg.EnablePeerExchangeClient = true
		cfg.LightClient = true
		cfg.EnableDiscV5 = false
		cfg.DiscV5BootstrapNodes = []string{bootstrap}
		cfg.DiscoveryLimit = 20
		cfg.ClusterID = 16
		cfg.WakuNodes = []string{bootstrap}
		cfg.TelemetryServerURL = client.serverURL
		cfg.TelemetrySendPeriodMs = 500

		node, err := wakuv2.New(nil, "", cfg, nil, nil, nil, nil, nil)
		require.NoError(t, err)

		client.Start(ctx)
		node.SetStatusTelemetryClient(client)

		// Setting this forces the publish function to fail when sending a message
		node.SkipPublishToTopic(true)

		require.NoError(t, node.Start())

		// This should result in a single request sent by the telemetry client
		_, err = node.Send(cfg.DefaultShardPubsubTopic, &pb.WakuMessage{
			Payload:      []byte{1, 2, 3, 4, 5},
			ContentTopic: testContentTopic,
			Version:      proto.Uint32(0),
			Timestamp:    proto.Int64(time.Now().Unix()),
		}, nil)
		require.NoError(t, err)
	})
}
// TestRetryCache checks that metrics rejected by the server stay in the
// client's retry cache and are re-sent, together with a newer metric, once the
// server starts accepting requests.
//
// The mock server rejects the first request with 500 and expects the second
// one to carry 4 metrics (3 retried + 1 new).
func TestRetryCache(t *testing.T) {
	counter := 0
	var wg sync.WaitGroup
	wg.Add(2) // one rejected request followed by one accepted request

	mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler runs on the server's goroutines, so failures are
		// reported with t.Errorf only: t.Fatal* must not be called from here.
		if r.Method != http.MethodPost {
			t.Errorf("Expected 'POST' request, got '%s'", r.Method)
		}
		if r.URL.EscapedPath() != "/record-metrics" {
			t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath())
		}

		// Check the request body is as expected
		var received []TelemetryRequest
		if err := json.NewDecoder(r.Body).Decode(&received); err != nil {
			t.Errorf("Failed to decode request body: %v", err)
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		// Fail for the first request to make telemetry cache grow
		if counter < 1 {
			counter++
			w.WriteHeader(http.StatusInternalServerError)
			wg.Done()
			return
		}

		t.Log("Counter reached, responding with success")
		if len(received) != 4 {
			t.Errorf("Expected 4 metrics, got %d", len(received))
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		responseBody := []map[string]interface{}{
			{"status": "created"},
		}
		body, err := json.Marshal(responseBody)
		if err != nil {
			t.Errorf("Failed to marshal response body: %v", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		// The status line is written exactly once per response.
		w.WriteHeader(http.StatusCreated)
		if _, err = w.Write(body); err != nil {
			t.Errorf("Failed to write response body: %v", err)
			return
		}
		wg.Done()
	}))
	defer mockServer.Close()

	// Cancelling the context on return keeps the client from running past
	// the end of the test.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := createClient(t, mockServer.URL)
	client.Start(ctx)

	pushEnvelope := func() {
		client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
			Payload:      []byte{1, 2, 3, 4, 5},
			ContentTopic: testContentTopic,
			Version:      proto.Uint32(0),
			Timestamp:    proto.Int64(time.Now().Unix()),
		}, 0, ""))
	}

	for i := 0; i < 3; i++ {
		pushEnvelope()
	}

	// Slightly longer than the client's 100ms send period, so the first
	// (rejected) send has happened and the metrics sit in the retry cache.
	time.Sleep(110 * time.Millisecond)

	require.Equal(t, 3, len(client.telemetryRetryCache))

	pushEnvelope()

	wg.Wait()

	// Give the client time to process the successful response.
	time.Sleep(100 * time.Millisecond)

	require.Equal(t, 0, len(client.telemetryRetryCache))
}
// TestRetryCacheCleanup checks that the retry cache is trimmed once it has
// grown large. The client is given an empty server URL, so every metric it is
// handed ends up in the retry cache.
func TestRetryCacheCleanup(t *testing.T) {
	// Cancelling the context on return keeps the client from running (and
	// retrying against the empty URL) past the end of the test.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := createClient(t, "")
	client.Start(ctx)

	pushEnvelope := func() {
		client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
			Payload:      []byte{1, 2, 3, 4, 5},
			ContentTopic: testContentTopic,
			Version:      proto.Uint32(0),
			Timestamp:    proto.Int64(time.Now().Unix()),
		}, 0, ""))
	}

	for i := 0; i < 6000; i++ {
		pushEnvelope()
	}

	// Slightly longer than the client's 100ms send period.
	time.Sleep(110 * time.Millisecond)

	require.Equal(t, 6000, len(client.telemetryRetryCache))

	pushEnvelope()

	time.Sleep(210 * time.Millisecond)

	// NOTE(review): 5001 presumably means the client trims the cache to 5000
	// entries before adding the new metric; confirm against the client code.
	require.Equal(t, 5001, len(client.telemetryRetryCache))
}
// setDefaultConfig fills config with the settings shared by the Waku tests in
// this file. The cluster ID is always 16. In light mode the node is a light
// client using the peer-exchange client with discv5 off; otherwise it is a
// full node with discv5 and the peer-exchange server on.
func setDefaultConfig(config *wakuv2.Config, lightMode bool) {
	config.ClusterID = 16

	// These three settings are direct functions of lightMode.
	config.LightClient = lightMode
	config.EnablePeerExchangeClient = lightMode
	config.EnableDiscV5 = !lightMode

	// The peer-exchange server flag is only touched for full nodes.
	if !lightMode {
		config.EnablePeerExchangeServer = true
	}
}
// testStoreENRBootstrap is the ENR tree address the peer-count tests pass as
// their discv5 bootstrap node.
var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im"
// TestPeerCount starts a full Waku node wired to the telemetry client and
// waits until the mock server receives a request containing a PeerCountMetric
// entry.
func TestPeerCount(t *testing.T) {
	// t.Skip("flaky test")

	// Logs every inspected request and matches the peer-count metric.
	isPeerCount := func(req TelemetryRequest) bool {
		t.Log(req)
		return req.TelemetryType == PeerCountMetric
	}
	// Requests without a peer-count entry are ignored, never failed.
	expectedCondition := func(received []TelemetryRequest) (bool, bool) {
		return slices.ContainsFunc(received, isPeerCount), false
	}

	withMockServer(t, PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		cfg := new(wakuv2.Config)
		setDefaultConfig(cfg, false)
		cfg.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
		cfg.DiscoveryLimit = 20
		cfg.TelemetryServerURL = client.serverURL
		cfg.TelemetrySendPeriodMs = 1500
		cfg.TelemetryPeerCountSendPeriod = 1500

		node, err := wakuv2.New(nil, "shards.staging", cfg, nil, nil, nil, nil, nil)
		require.NoError(t, err)

		node.SetStatusTelemetryClient(client)
		client.Start(ctx)

		require.NoError(t, node.Start())

		// Retry until at least one peer has been discovered.
		peersFound := func() error {
			if len(node.Peers()) == 0 {
				return errors.New("no peers discovered")
			}
			return nil
		}
		err = tt.RetryWithBackOff(peersFound)
		require.NoError(t, err)

		require.NotEqual(t, 0, len(node.Peers()))
	})
}
// TestPeerId checks that the telemetry data reported for a received envelope
// carries a "peerId" field.
func TestPeerId(t *testing.T) {
	// expectedCondition runs on the mock server's handler goroutine, so it
	// reports problems through shouldFail (which makes the server call
	// t.Fail) instead of require.*, whose FailNow must only be called from
	// the test goroutine.
	expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) {
		if len(received) == 0 {
			// Nothing to inspect yet; keep waiting for a request with data.
			return false, false
		}
		if received[0].TelemetryData == nil {
			t.Errorf("telemetry request carries no data: %+v", received[0])
			return false, true
		}

		var data map[string]interface{}
		if err := json.Unmarshal(*received[0].TelemetryData, &data); err != nil {
			return false, true
		}

		if _, ok := data["peerId"]; !ok {
			t.Errorf("telemetry data has no peerId field: %v", data)
			return false, true
		}
		return true, false
	}

	withMockServer(t, ReceivedEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		client.Start(ctx)

		client.PushReceivedEnvelope(ctx, v2protocol.NewEnvelope(&pb.WakuMessage{
			Payload:      []byte{1, 2, 3, 4, 5},
			ContentTopic: testContentTopic,
			Version:      proto.Uint32(0),
			Timestamp:    proto.Int64(time.Now().Unix()),
		}, 0, ""))
	})
}
// TestPeerCountByShard starts a full Waku node wired to the telemetry client
// and waits until the mock server receives a request containing a
// PeerCountByShardMetric entry.
func TestPeerCountByShard(t *testing.T) {
	isPeerCountByShard := func(req TelemetryRequest) bool {
		return req.TelemetryType == PeerCountByShardMetric
	}
	// Requests without a matching entry are ignored, never failed.
	expectedCondition := func(received []TelemetryRequest) (bool, bool) {
		return slices.ContainsFunc(received, isPeerCountByShard), false
	}

	withMockServer(t, PeerCountByShardMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		cfg := new(wakuv2.Config)
		setDefaultConfig(cfg, false)
		cfg.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
		cfg.DiscoveryLimit = 20
		cfg.TelemetryServerURL = client.serverURL
		cfg.TelemetryPeerCountSendPeriod = 1500
		cfg.TelemetrySendPeriodMs = 1500

		node, err := wakuv2.New(nil, "shards.staging", cfg, nil, nil, nil, nil, nil)
		require.NoError(t, err)

		node.SetStatusTelemetryClient(client)
		client.Start(ctx)

		require.NoError(t, node.Start())

		// Retry until at least one peer has been discovered.
		peersFound := func() error {
			if len(node.Peers()) == 0 {
				return errors.New("no peers discovered")
			}
			return nil
		}
		err = tt.RetryWithBackOff(peersFound)
		require.NoError(t, err)

		require.NotEqual(t, 0, len(node.Peers()))
	})
}
// TestPeerCountByOrigin starts a full Waku node wired to the telemetry client
// and waits until the mock server receives a request containing a
// PeerCountByOriginMetric entry.
func TestPeerCountByOrigin(t *testing.T) {
	isPeerCountByOrigin := func(req TelemetryRequest) bool {
		return req.TelemetryType == PeerCountByOriginMetric
	}
	// Requests without a matching entry are ignored, never failed.
	expectedCondition := func(received []TelemetryRequest) (bool, bool) {
		return slices.ContainsFunc(received, isPeerCountByOrigin), false
	}

	withMockServer(t, PeerCountByOriginMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
		cfg := new(wakuv2.Config)
		setDefaultConfig(cfg, false)
		cfg.DiscV5BootstrapNodes = []string{testStoreENRBootstrap}
		cfg.DiscoveryLimit = 20
		cfg.TelemetryServerURL = client.serverURL
		cfg.TelemetryPeerCountSendPeriod = 1500
		cfg.TelemetrySendPeriodMs = 1500

		node, err := wakuv2.New(nil, "shards.staging", cfg, nil, nil, nil, nil, nil)
		require.NoError(t, err)

		node.SetStatusTelemetryClient(client)
		client.Start(ctx)

		require.NoError(t, node.Start())

		// Retry until at least one peer has been discovered.
		peersFound := func() error {
			if len(node.Peers()) == 0 {
				return errors.New("no peers discovered")
			}
			return nil
		}
		err = tt.RetryWithBackOff(peersFound)
		require.NoError(t, err)

		require.NotEqual(t, 0, len(node.Peers()))
	})
}