chore(telemetry)_: add telemetry publish retry (#5442)

* chore(telemetry)_: add telemetry publish retry

* fix(telemetry)_: return error when server does not respond with 200

---------

Co-authored-by: Arseniy Klempner <arseniyk@status.im>
This commit is contained in:
Vaclav Pavlin 2024-07-01 20:08:54 +02:00 committed by GitHub
parent 5fd1c06bce
commit 131cfe7b3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 148 additions and 63 deletions

View File

@ -29,6 +29,8 @@ const (
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
MaxRetryCache = 5000
)
type TelemetryRequest struct {
@ -60,18 +62,19 @@ type ReceivedMessages struct {
}
type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
}
type TelemetryClientOption func(*Client)
@ -84,18 +87,19 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
client := &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
}
for _, opt := range opts {
@ -120,12 +124,13 @@ func (c *Client) Start(ctx context.Context) {
}
}()
go func() {
ticker := time.NewTicker(c.sendPeriod)
defer ticker.Stop()
sendPeriod := c.sendPeriod
timer := time.NewTimer(sendPeriod)
defer timer.Stop()
for {
select {
case <-ticker.C:
case <-timer.C:
c.telemetryCacheLock.Lock()
telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache))
copy(telemetryRequests, c.telemetryCache)
@ -133,8 +138,16 @@ func (c *Client) Start(ctx context.Context) {
c.telemetryCacheLock.Unlock()
if len(telemetryRequests) > 0 {
c.pushTelemetryRequest(telemetryRequests)
err := c.pushTelemetryRequest(telemetryRequests)
if err != nil {
if sendPeriod < 60 { //Stop the growing if the timer is > 60s to at least retry every minute
sendPeriod = sendPeriod * 2
}
} else {
sendPeriod = c.sendPeriod
}
}
timer.Reset(sendPeriod)
case <-ctx.Done():
return
}
@ -181,17 +194,35 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
c.nextIdLock.Unlock()
}
func (c *Client) pushTelemetryRequest(request []TelemetryRequest) {
// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache`
func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error {
if len(c.telemetryRetryCache)+len(request) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time
removeNum := len(c.telemetryRetryCache) + len(request) - MaxRetryCache
c.telemetryRetryCache = c.telemetryRetryCache[removeNum:]
}
c.telemetryRetryCache = append(c.telemetryRetryCache, request...)
url := fmt.Sprintf("%s/record-metrics", c.serverURL)
body, _ := json.Marshal(request)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
body, err := json.Marshal(c.telemetryRetryCache)
if err != nil {
c.logger.Error("Error marshaling telemetry data", zap.Error(err))
return err
}
res, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending telemetry data", zap.Error(err))
return err
}
if res.StatusCode != http.StatusOK {
c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode))
return fmt.Errorf("status code %d", res.StatusCode)
}
c.telemetryRetryCache = nil
return nil
}
func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage {
c.logger.Debug("Pushing received messages to telemetry server")
var postBody []map[string]interface{}
for _, message := range receivedMessages.Messages {
postBody = append(postBody, map[string]interface{}{

View File

@ -30,8 +30,6 @@ var (
func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done() // Signal that a request was received
if r.Method != "POST" {
t.Errorf("Expected 'POST' request, got '%s'", r.Method)
}
@ -55,6 +53,7 @@ func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryTy
// If the data is as expected, respond with success
t.Log("Responding with success")
w.WriteHeader(http.StatusOK)
wg.Done()
}
}
}))
@ -67,10 +66,10 @@ func createClient(t *testing.T, mockServerURL string) *Client {
if err != nil {
t.Fatalf("Failed to create logger: %v", err)
}
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(500*time.Millisecond))
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond))
}
func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(t *testing.T, client *Client, wg *sync.WaitGroup)) {
func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) {
var wg sync.WaitGroup
wg.Add(1) // Expecting one request
@ -79,14 +78,17 @@ func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(t *t
client := createClient(t, mockServer.URL)
testFunc(t, client, &wg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testFunc(ctx, t, client, &wg)
// Wait for the request to be received
wg.Wait()
}
func TestClient_ProcessReceivedMessages(t *testing.T) {
withMockServer(t, ReceivedMessagesMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
withMockServer(t, ReceivedMessagesMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send
data := ReceivedMessages{
Filter: transport.Filter{
@ -107,20 +109,15 @@ func TestClient_ProcessReceivedMessages(t *testing.T) {
},
},
}
telemetryData := client.ProcessReceivedMessages(data)
telemetryRequest := TelemetryRequest{
Id: 1,
TelemetryType: ReceivedMessagesMetric,
TelemetryData: telemetryData,
}
// Send the telemetry request
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
client.Start(ctx)
client.PushReceivedMessages(data)
})
}
func TestClient_ProcessReceivedEnvelope(t *testing.T) {
withMockServer(t, ReceivedEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
withMockServer(t, ReceivedEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send
envelope := v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
@ -128,20 +125,15 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) {
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, "")
telemetryData := client.ProcessReceivedEnvelope(envelope)
telemetryRequest := TelemetryRequest{
Id: 2,
TelemetryType: ReceivedEnvelopeMetric,
TelemetryData: telemetryData,
}
// Send the telemetry request
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
client.Start(ctx)
client.PushReceivedEnvelope(envelope)
})
}
func TestClient_ProcessSentEnvelope(t *testing.T) {
withMockServer(t, SentEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
withMockServer(t, SentEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send
sentEnvelope := wakuv2.SentEnvelope{
Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{
@ -152,15 +144,10 @@ func TestClient_ProcessSentEnvelope(t *testing.T) {
}, 0, ""),
PublishMethod: wakuv2.LightPush,
}
telemetryData := client.ProcessSentEnvelope(sentEnvelope)
telemetryRequest := TelemetryRequest{
Id: 3,
TelemetryType: SentEnvelopeMetric,
TelemetryData: telemetryData,
}
// Send the telemetry request
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
client.Start(ctx)
client.PushSentEnvelope(sentEnvelope)
})
}
@ -169,7 +156,7 @@ var (
)
func TestTelemetryUponPublishError(t *testing.T) {
withMockServer(t, ErrorSendingEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
withMockServer(t, ErrorSendingEnvelopeMetric, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" {
@ -191,7 +178,7 @@ func TestTelemetryUponPublishError(t *testing.T) {
w, err := wakuv2.New(nil, "", wakuConfig, nil, nil, nil, nil, nil)
require.NoError(t, err)
client.Start(context.Background())
client.Start(ctx)
w.SetStatusTelemetryClient(client)
// Setting this forces the publish function to fail when sending a message
@ -212,3 +199,70 @@ func TestTelemetryUponPublishError(t *testing.T) {
require.NoError(t, err)
})
}
func TestRetryCache(t *testing.T) {
counter := 0
var wg sync.WaitGroup
wg.Add(2)
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("Expected 'POST' request, got '%s'", r.Method)
}
if r.URL.EscapedPath() != "/record-metrics" {
t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath())
}
// Check the request body is as expected
var received []TelemetryRequest
err := json.NewDecoder(r.Body).Decode(&received)
if err != nil {
t.Fatal(err)
}
// Fail for the first request to make telemetry cache grow
if counter < 1 {
counter++
w.WriteHeader(http.StatusInternalServerError)
wg.Done()
} else {
t.Log("Counter reached, responding with success")
if len(received) == 4 {
w.WriteHeader(http.StatusOK)
wg.Done()
} else {
t.Fatalf("Expected 4 metrics, got %d", len(received)-1)
}
}
}))
defer mockServer.Close()
client := createClient(t, mockServer.URL)
client.Start(context.Background())
for i := 0; i < 3; i++ {
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic,
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""))
}
time.Sleep(110 * time.Millisecond)
require.Equal(t, 3, len(client.telemetryRetryCache))
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic,
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""))
wg.Wait()
time.Sleep(100 * time.Millisecond)
require.Equal(t, 0, len(client.telemetryRetryCache))
}