fix(telemetry)_: properly handle the size of the retryCache slice (#5510)

This commit is contained in:
Vaclav Pavlin 2024-07-12 03:07:23 +02:00 committed by GitHub
parent c477a3845f
commit 5f0d344b73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 3 deletions

View File

@ -140,7 +140,7 @@ func (c *Client) Start(ctx context.Context) {
if len(telemetryRequests) > 0 {
err := c.pushTelemetryRequest(telemetryRequests)
if err != nil {
if sendPeriod < 60 { //Stop the growing if the timer is > 60s to at least retry every minute
if sendPeriod < 60*time.Second { //Stop the growing if the timer is > 60s to at least retry every minute
sendPeriod = sendPeriod * 2
}
} else {
@ -196,8 +196,8 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache`
func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error {
if len(c.telemetryRetryCache)+len(request) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time
removeNum := len(c.telemetryRetryCache) + len(request) - MaxRetryCache
if len(c.telemetryRetryCache) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time
removeNum := len(c.telemetryRetryCache) - MaxRetryCache
c.telemetryRetryCache = c.telemetryRetryCache[removeNum:]
}
c.telemetryRetryCache = append(c.telemetryRetryCache, request...)

View File

@ -266,3 +266,32 @@ func TestRetryCache(t *testing.T) {
require.Equal(t, 0, len(client.telemetryRetryCache))
}
func TestRetryCacheCleanup(t *testing.T) {
client := createClient(t, "")
client.Start(context.Background())
for i := 0; i < 6000; i++ {
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic,
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""))
}
time.Sleep(110 * time.Millisecond)
require.Equal(t, 6000, len(client.telemetryRetryCache))
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic,
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""))
time.Sleep(210 * time.Millisecond)
require.Equal(t, 5001, len(client.telemetryRetryCache))
}