diff --git a/telemetry/client.go b/telemetry/client.go index dae442ef5..41c018c5a 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -29,6 +29,8 @@ const ( UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" + + MaxRetryCache = 5000 ) type TelemetryRequest struct { @@ -60,18 +62,19 @@ type ReceivedMessages struct { } type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string - version string - telemetryCh chan TelemetryRequest - telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - nextIdLock sync.Mutex - nextId int - sendPeriod time.Duration + serverURL string + httpClient *http.Client + logger *zap.Logger + keyUID string + nodeName string + version string + telemetryCh chan TelemetryRequest + telemetryCacheLock sync.Mutex + telemetryCache []TelemetryRequest + telemetryRetryCache []TelemetryRequest + nextIdLock sync.Mutex + nextId int + sendPeriod time.Duration } type TelemetryClientOption func(*Client) @@ -84,18 +87,19 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption { func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { client := &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, - version: version, - telemetryCh: make(chan TelemetryRequest), - telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - nextId: 0, - nextIdLock: sync.Mutex{}, - sendPeriod: 10 * time.Second, // default value + serverURL: serverURL, + httpClient: &http.Client{Timeout: time.Minute}, + logger: logger, + keyUID: keyUID, + nodeName: nodeName, + version: version, + telemetryCh: make(chan TelemetryRequest), + telemetryCacheLock: sync.Mutex{}, + telemetryCache: make([]TelemetryRequest, 0), + telemetryRetryCache: make([]TelemetryRequest, 0), + nextId: 0, + nextIdLock: sync.Mutex{}, + sendPeriod: 10 * time.Second, // default value } for _, opt := range opts { @@ -120,12 +124,13 @@ func (c *Client) Start(ctx context.Context) { } }() go func() { - ticker := time.NewTicker(c.sendPeriod) - defer ticker.Stop() + sendPeriod := c.sendPeriod + timer := time.NewTimer(sendPeriod) + defer timer.Stop() for { select { - case <-ticker.C: + case <-timer.C: c.telemetryCacheLock.Lock() telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache)) copy(telemetryRequests, c.telemetryCache) @@ -133,8 +138,16 @@ func (c *Client) Start(ctx context.Context) { c.telemetryCacheLock.Unlock() if len(telemetryRequests) > 0 { - c.pushTelemetryRequest(telemetryRequests) + err := c.pushTelemetryRequest(telemetryRequests) + if err != nil { + if sendPeriod < 60 { //Stop the growing if the timer is > 60s to at least retry every minute + sendPeriod = sendPeriod * 2 + } + } else { + sendPeriod = c.sendPeriod + } } + timer.Reset(sendPeriod) case <-ctx.Done(): return } @@ -181,17 +194,35 @@ func (c *Client) processAndPushTelemetry(data interface{}) { c.nextIdLock.Unlock() } -func (c *Client) pushTelemetryRequest(request []TelemetryRequest) { +// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache` +func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { + if len(c.telemetryRetryCache)+len(request) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time + removeNum := len(c.telemetryRetryCache) + len(request) - MaxRetryCache + c.telemetryRetryCache = c.telemetryRetryCache[removeNum:] + } + c.telemetryRetryCache = append(c.telemetryRetryCache, request...) + url := fmt.Sprintf("%s/record-metrics", c.serverURL) - body, _ := json.Marshal(request) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) + body, err := json.Marshal(c.telemetryRetryCache) + if err != nil { + c.logger.Error("Error marshaling telemetry data", zap.Error(err)) + return err + } + res, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) if err != nil { c.logger.Error("Error sending telemetry data", zap.Error(err)) + return err } + if res.StatusCode != http.StatusOK { + c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode)) + return fmt.Errorf("status code %d", res.StatusCode) + } + + c.telemetryRetryCache = nil + return nil } func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { - c.logger.Debug("Pushing received messages to telemetry server") var postBody []map[string]interface{} for _, message := range receivedMessages.Messages { postBody = append(postBody, map[string]interface{}{ diff --git a/telemetry/client_test.go b/telemetry/client_test.go index c4afcf993..1a1865bea 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -30,8 +30,6 @@ var ( func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer wg.Done() // Signal that a request was received - if r.Method != "POST" { t.Errorf("Expected 'POST' request, got '%s'", r.Method) } @@ -55,6 +53,7 @@ func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryTy // If the data is as expected, respond with success t.Log("Responding with success") w.WriteHeader(http.StatusOK) + wg.Done() } } })) @@ -67,10 +66,10 @@ func createClient(t *testing.T, mockServerURL string) *Client { if err != nil { t.Fatalf("Failed to create logger: %v", err) } - return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(500*time.Millisecond)) + return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond)) } -func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(t *testing.T, client *Client, wg *sync.WaitGroup)) { +func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { var wg sync.WaitGroup wg.Add(1) // Expecting one request @@ -79,14 +78,17 @@ func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(t *t client := createClient(t, mockServer.URL) - testFunc(t, client, &wg) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testFunc(ctx, t, client, &wg) // Wait for the request to be received wg.Wait() } func TestClient_ProcessReceivedMessages(t *testing.T) { - withMockServer(t, ReceivedMessagesMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, ReceivedMessagesMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send data := ReceivedMessages{ Filter: transport.Filter{ @@ -107,20 +109,15 @@ func TestClient_ProcessReceivedMessages(t *testing.T) { }, }, } - telemetryData := client.ProcessReceivedMessages(data) - telemetryRequest := TelemetryRequest{ - Id: 1, - TelemetryType: ReceivedMessagesMetric, - TelemetryData: telemetryData, - } // Send the telemetry request - client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + client.Start(ctx) + client.PushReceivedMessages(data) }) } func TestClient_ProcessReceivedEnvelope(t *testing.T) { - withMockServer(t, ReceivedEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, ReceivedEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, @@ -128,20 +125,15 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) { Version: proto.Uint32(0), Timestamp: proto.Int64(time.Now().Unix()), }, 0, "") - telemetryData := client.ProcessReceivedEnvelope(envelope) - telemetryRequest := TelemetryRequest{ - Id: 2, - TelemetryType: ReceivedEnvelopeMetric, - TelemetryData: telemetryData, - } // Send the telemetry request - client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + client.Start(ctx) + client.PushReceivedEnvelope(envelope) }) } func TestClient_ProcessSentEnvelope(t *testing.T) { - withMockServer(t, SentEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, SentEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { // Create a telemetry request to send sentEnvelope := wakuv2.SentEnvelope{ Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ @@ -152,15 +144,10 @@ func TestClient_ProcessSentEnvelope(t *testing.T) { }, 0, ""), PublishMethod: wakuv2.LightPush, } - telemetryData := client.ProcessSentEnvelope(sentEnvelope) - telemetryRequest := TelemetryRequest{ - Id: 3, - TelemetryType: SentEnvelopeMetric, - TelemetryData: telemetryData, - } // Send the telemetry request - client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + client.Start(ctx) + client.PushSentEnvelope(sentEnvelope) }) } @@ -169,7 +156,7 @@ var ( ) func TestTelemetryUponPublishError(t *testing.T) { - withMockServer(t, ErrorSendingEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + withMockServer(t, ErrorSendingEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { enrTreeAddress := testENRBootstrap envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") if envEnrTreeAddress != "" { @@ -191,7 +178,7 @@ func TestTelemetryUponPublishError(t *testing.T) { w, err := wakuv2.New(nil, "", wakuConfig, nil, nil, nil, nil, nil) require.NoError(t, err) - client.Start(context.Background()) + client.Start(ctx) w.SetStatusTelemetryClient(client) // Setting this forces the publish function to fail when sending a message @@ -212,3 +199,70 @@ func TestTelemetryUponPublishError(t *testing.T) { require.NoError(t, err) }) } + +func TestRetryCache(t *testing.T) { + counter := 0 + var wg sync.WaitGroup + wg.Add(2) + + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("Expected 'POST' request, got '%s'", r.Method) + } + if r.URL.EscapedPath() != "/record-metrics" { + t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) + } + + // Check the request body is as expected + var received []TelemetryRequest + err := json.NewDecoder(r.Body).Decode(&received) + if err != nil { + t.Fatal(err) + } + + // Fail for the first request to make telemetry cache grow + if counter < 1 { + counter++ + w.WriteHeader(http.StatusInternalServerError) + wg.Done() + } else { + t.Log("Counter reached, responding with success") + if len(received) == 4 { + w.WriteHeader(http.StatusOK) + wg.Done() + } else { + t.Fatalf("Expected 4 metrics, got %d", len(received)-1) + } + } + })) + defer mockServer.Close() + + client := createClient(t, mockServer.URL) + client.Start(context.Background()) + + for i := 0; i < 3; i++ { + client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: testContentTopic, + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, "")) + } + + time.Sleep(110 * time.Millisecond) + + require.Equal(t, 3, len(client.telemetryRetryCache)) + + client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: testContentTopic, + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, "")) + + wg.Wait() + + time.Sleep(100 * time.Millisecond) + + require.Equal(t, 0, len(client.telemetryRetryCache)) +}