2021-11-03 12:38:37 +00:00
package telemetry
import (
"bytes"
2024-06-13 22:31:09 +00:00
"context"
2021-11-03 12:38:37 +00:00
"encoding/json"
"fmt"
"net/http"
2024-06-28 10:24:04 +00:00
"sync"
2021-11-03 12:38:37 +00:00
"time"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
2024-06-13 22:31:09 +00:00
"github.com/status-im/status-go/wakuv2"
2023-10-30 14:51:57 +00:00
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
2024-06-19 10:40:52 +00:00
v1protocol "github.com/status-im/status-go/protocol/v1"
2021-11-03 12:38:37 +00:00
)
2024-06-13 22:31:09 +00:00
// TelemetryType names the kind of metric carried in the TelemetryType field
// of a TelemetryRequest.
type TelemetryType string
const (
2024-06-28 10:24:04 +00:00
ProtocolStatsMetric TelemetryType = "ProtocolStats"
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
2024-07-12 20:37:55 +00:00
PeerCountMetric TelemetryType = "PeerCount"
2024-07-01 18:08:54 +00:00
MaxRetryCache = 5000
2024-06-13 22:31:09 +00:00
)
// TelemetryRequest is one telemetry record. Batches of these are JSON-encoded
// and posted to the telemetry server.
type TelemetryRequest struct {
// Id is taken from the client's nextId counter when the request is built.
Id int ` json:"id" `
// TelemetryType tells which metric TelemetryData holds.
TelemetryType TelemetryType ` json:"telemetry_type" `
// TelemetryData is the already-encoded JSON payload for this metric.
TelemetryData * json . RawMessage ` json:"telemetry_data" `
}
2024-08-13 19:13:15 +00:00
func ( c * Client ) PushReceivedMessages ( ctx context . Context , receivedMessages ReceivedMessages ) {
c . processAndPushTelemetry ( ctx , receivedMessages )
2024-06-13 22:31:09 +00:00
}
2024-08-13 19:13:15 +00:00
func ( c * Client ) PushSentEnvelope ( ctx context . Context , sentEnvelope wakuv2 . SentEnvelope ) {
c . processAndPushTelemetry ( ctx , sentEnvelope )
2024-06-13 22:31:09 +00:00
}
2024-08-13 19:13:15 +00:00
func ( c * Client ) PushReceivedEnvelope ( ctx context . Context , receivedEnvelope * v2protocol . Envelope ) {
c . processAndPushTelemetry ( ctx , receivedEnvelope )
2024-06-28 10:24:04 +00:00
}
2024-08-13 19:13:15 +00:00
func ( c * Client ) PushErrorSendingEnvelope ( ctx context . Context , errorSendingEnvelope wakuv2 . ErrorSendingEnvelope ) {
c . processAndPushTelemetry ( ctx , errorSendingEnvelope )
2024-06-13 22:31:09 +00:00
}
2024-08-13 19:13:15 +00:00
func ( c * Client ) PushPeerCount ( ctx context . Context , peerCount int ) {
c . processAndPushTelemetry ( ctx , PeerCount { PeerCount : peerCount } )
2024-07-12 20:37:55 +00:00
}
2024-06-13 22:31:09 +00:00
// ReceivedMessages is the input for the ReceivedMessages metric: one record is
// produced per entry in Messages, each sharing the Filter and SSHMessage data.
type ReceivedMessages struct {
// Filter supplies the chat ID, pubsub topic and content topic of the records.
Filter transport . Filter
// SSHMessage supplies the hash, timestamp and payload size of the records.
SSHMessage * types . Message
// Messages supplies the per-record application-layer ID and type.
Messages [ ] * v1protocol . StatusMessage
}
2024-07-12 20:37:55 +00:00
// PeerCount is the input for the PeerCount metric.
type PeerCount struct {
// PeerCount is reported under the "peerCount" key of the metric payload.
PeerCount int
}
2021-11-03 12:38:37 +00:00
type Client struct {
2024-07-01 18:08:54 +00:00
serverURL string
httpClient * http . Client
logger * zap . Logger
keyUID string
nodeName string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync . Mutex
telemetryCache [ ] TelemetryRequest
telemetryRetryCache [ ] TelemetryRequest
nextIdLock sync . Mutex
nextId int
sendPeriod time . Duration
2021-11-03 12:38:37 +00:00
}
2024-06-28 10:24:04 +00:00
// TelemetryClientOption customises a Client; NewClient applies each option
// after setting the defaults.
type TelemetryClientOption func ( * Client )
// WithSendPeriod returns an option that sets the client's base send period
// to sendPeriod.
func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {
	apply := func(c *Client) {
		c.sendPeriod = sendPeriod
	}
	return apply
}
func NewClient ( logger * zap . Logger , serverURL string , keyUID string , nodeName string , version string , opts ... TelemetryClientOption ) * Client {
client := & Client {
2024-07-01 18:08:54 +00:00
serverURL : serverURL ,
httpClient : & http . Client { Timeout : time . Minute } ,
logger : logger ,
keyUID : keyUID ,
nodeName : nodeName ,
version : version ,
telemetryCh : make ( chan TelemetryRequest ) ,
telemetryCacheLock : sync . Mutex { } ,
telemetryCache : make ( [ ] TelemetryRequest , 0 ) ,
telemetryRetryCache : make ( [ ] TelemetryRequest , 0 ) ,
nextId : 0 ,
nextIdLock : sync . Mutex { } ,
sendPeriod : 10 * time . Second , // default value
2024-06-28 10:24:04 +00:00
}
for _ , opt := range opts {
opt ( client )
2024-06-13 22:31:09 +00:00
}
2024-06-28 10:24:04 +00:00
return client
2024-06-13 22:31:09 +00:00
}
2024-06-28 10:24:04 +00:00
func ( c * Client ) Start ( ctx context . Context ) {
2024-06-13 22:31:09 +00:00
go func ( ) {
for {
select {
2024-06-28 10:24:04 +00:00
case telemetryRequest := <- c . telemetryCh :
c . telemetryCacheLock . Lock ( )
c . telemetryCache = append ( c . telemetryCache , telemetryRequest )
c . telemetryCacheLock . Unlock ( )
2024-06-13 22:31:09 +00:00
case <- ctx . Done ( ) :
return
}
}
} ( )
go func ( ) {
2024-07-01 18:08:54 +00:00
sendPeriod := c . sendPeriod
timer := time . NewTimer ( sendPeriod )
defer timer . Stop ( )
2024-06-13 22:31:09 +00:00
for {
select {
2024-07-01 18:08:54 +00:00
case <- timer . C :
2024-06-28 10:24:04 +00:00
c . telemetryCacheLock . Lock ( )
telemetryRequests := make ( [ ] TelemetryRequest , len ( c . telemetryCache ) )
copy ( telemetryRequests , c . telemetryCache )
c . telemetryCache = nil
c . telemetryCacheLock . Unlock ( )
2024-06-13 22:31:09 +00:00
if len ( telemetryRequests ) > 0 {
2024-07-01 18:08:54 +00:00
err := c . pushTelemetryRequest ( telemetryRequests )
if err != nil {
2024-07-12 01:07:23 +00:00
if sendPeriod < 60 * time . Second { //Stop the growing if the timer is > 60s to at least retry every minute
2024-07-01 18:08:54 +00:00
sendPeriod = sendPeriod * 2
}
} else {
sendPeriod = c . sendPeriod
}
2024-06-13 22:31:09 +00:00
}
2024-07-01 18:08:54 +00:00
timer . Reset ( sendPeriod )
2024-06-13 22:31:09 +00:00
case <- ctx . Done ( ) :
return
}
}
2024-06-28 10:24:04 +00:00
2024-06-13 22:31:09 +00:00
} ( )
}
2024-08-13 19:13:15 +00:00
func ( c * Client ) processAndPushTelemetry ( ctx context . Context , data interface { } ) {
2024-06-13 22:31:09 +00:00
var telemetryRequest TelemetryRequest
switch v := data . ( type ) {
case ReceivedMessages :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ReceivedMessagesMetric ,
TelemetryData : c . ProcessReceivedMessages ( v ) ,
}
case * v2protocol . Envelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ReceivedEnvelopeMetric ,
TelemetryData : c . ProcessReceivedEnvelope ( v ) ,
}
case wakuv2 . SentEnvelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : SentEnvelopeMetric ,
TelemetryData : c . ProcessSentEnvelope ( v ) ,
}
2024-06-28 10:24:04 +00:00
case wakuv2 . ErrorSendingEnvelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ErrorSendingEnvelopeMetric ,
TelemetryData : c . ProcessErrorSendingEnvelope ( v ) ,
}
2024-07-12 20:37:55 +00:00
case PeerCount :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerCountMetric ,
TelemetryData : c . ProcessPeerCount ( v ) ,
}
2024-06-13 22:31:09 +00:00
default :
c . logger . Error ( "Unknown telemetry data type" )
return
}
2024-08-13 19:13:15 +00:00
select {
case <- ctx . Done ( ) :
return
case c . telemetryCh <- telemetryRequest :
}
2024-06-28 10:24:04 +00:00
c . nextIdLock . Lock ( )
c . nextId ++
c . nextIdLock . Unlock ( )
2024-06-13 22:31:09 +00:00
}
2024-07-01 18:08:54 +00:00
// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache`
func ( c * Client ) pushTelemetryRequest ( request [ ] TelemetryRequest ) error {
2024-07-12 01:07:23 +00:00
if len ( c . telemetryRetryCache ) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time
removeNum := len ( c . telemetryRetryCache ) - MaxRetryCache
2024-07-01 18:08:54 +00:00
c . telemetryRetryCache = c . telemetryRetryCache [ removeNum : ]
}
c . telemetryRetryCache = append ( c . telemetryRetryCache , request ... )
2024-06-13 22:31:09 +00:00
url := fmt . Sprintf ( "%s/record-metrics" , c . serverURL )
2024-07-01 18:08:54 +00:00
body , err := json . Marshal ( c . telemetryRetryCache )
if err != nil {
c . logger . Error ( "Error marshaling telemetry data" , zap . Error ( err ) )
return err
}
res , err := c . httpClient . Post ( url , "application/json" , bytes . NewBuffer ( body ) )
2024-06-13 22:31:09 +00:00
if err != nil {
c . logger . Error ( "Error sending telemetry data" , zap . Error ( err ) )
2024-07-01 18:08:54 +00:00
return err
}
2024-07-12 20:37:55 +00:00
defer res . Body . Close ( )
var responseBody [ ] map [ string ] interface { }
if err := json . NewDecoder ( res . Body ) . Decode ( & responseBody ) ; err != nil {
c . logger . Error ( "Error decoding response body" , zap . Error ( err ) )
return err
}
if res . StatusCode != http . StatusCreated {
c . logger . Error ( "Error sending telemetry data" , zap . Int ( "statusCode" , res . StatusCode ) , zap . Any ( "responseBody" , responseBody ) )
return fmt . Errorf ( "status code %d, response body: %v" , res . StatusCode , responseBody )
2021-11-03 12:38:37 +00:00
}
2024-07-01 18:08:54 +00:00
c . telemetryRetryCache = nil
return nil
2021-11-03 12:38:37 +00:00
}
2024-06-13 22:31:09 +00:00
func ( c * Client ) ProcessReceivedMessages ( receivedMessages ReceivedMessages ) * json . RawMessage {
2021-11-03 12:38:37 +00:00
var postBody [ ] map [ string ] interface { }
2024-06-13 22:31:09 +00:00
for _ , message := range receivedMessages . Messages {
2021-11-03 12:38:37 +00:00
postBody = append ( postBody , map [ string ] interface { } {
2024-06-13 22:31:09 +00:00
"chatId" : receivedMessages . Filter . ChatID ,
"messageHash" : types . EncodeHex ( receivedMessages . SSHMessage . Hash ) ,
2023-11-08 18:05:33 +00:00
"messageId" : message . ApplicationLayer . ID ,
2024-06-13 22:31:09 +00:00
"sentAt" : receivedMessages . SSHMessage . Timestamp ,
"pubsubTopic" : receivedMessages . Filter . PubsubTopic ,
"topic" : receivedMessages . Filter . ContentTopic . String ( ) ,
2023-11-08 18:05:33 +00:00
"messageType" : message . ApplicationLayer . Type . String ( ) ,
2021-11-03 12:38:37 +00:00
"receiverKeyUID" : c . keyUID ,
"nodeName" : c . nodeName ,
2024-06-13 22:31:09 +00:00
"messageSize" : len ( receivedMessages . SSHMessage . Payload ) ,
"statusVersion" : c . version ,
2021-11-03 12:38:37 +00:00
} )
}
body , _ := json . Marshal ( postBody )
2024-06-13 22:31:09 +00:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
2021-11-03 12:38:37 +00:00
}
2023-10-30 14:51:57 +00:00
2024-06-13 22:31:09 +00:00
func ( c * Client ) ProcessReceivedEnvelope ( envelope * v2protocol . Envelope ) * json . RawMessage {
2023-10-30 14:51:57 +00:00
postBody := map [ string ] interface { } {
2024-05-15 23:15:00 +00:00
"messageHash" : envelope . Hash ( ) . String ( ) ,
2023-12-05 04:29:27 +00:00
"sentAt" : uint32 ( envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) ) ,
2023-10-30 14:51:57 +00:00
"pubsubTopic" : envelope . PubsubTopic ( ) ,
"topic" : envelope . Message ( ) . ContentTopic ,
"receiverKeyUID" : c . keyUID ,
"nodeName" : c . nodeName ,
2024-06-13 22:31:09 +00:00
"statusVersion" : c . version ,
2023-10-30 14:51:57 +00:00
}
body , _ := json . Marshal ( postBody )
2024-06-13 22:31:09 +00:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
}
func ( c * Client ) ProcessSentEnvelope ( sentEnvelope wakuv2 . SentEnvelope ) * json . RawMessage {
postBody := map [ string ] interface { } {
"messageHash" : sentEnvelope . Envelope . Hash ( ) . String ( ) ,
"sentAt" : uint32 ( sentEnvelope . Envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) ) ,
"pubsubTopic" : sentEnvelope . Envelope . PubsubTopic ( ) ,
"topic" : sentEnvelope . Envelope . Message ( ) . ContentTopic ,
"senderKeyUID" : c . keyUID ,
"nodeName" : c . nodeName ,
"publishMethod" : sentEnvelope . PublishMethod . String ( ) ,
"statusVersion" : c . version ,
2023-10-30 14:51:57 +00:00
}
2024-06-13 22:31:09 +00:00
body , _ := json . Marshal ( postBody )
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
2023-10-30 14:51:57 +00:00
}
2024-06-28 10:24:04 +00:00
// ProcessErrorSendingEnvelope builds the JSON payload for the
// ErrorSendingEnvelope metric: the same fields as a sent-envelope record plus
// the text of the sending error under "error". A marshalling error is ignored.
func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage {
	fields := make(map[string]interface{}, 9)
	fields["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String()
	fields["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
	fields["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic()
	fields["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic
	fields["senderKeyUID"] = c.keyUID
	fields["nodeName"] = c.nodeName
	fields["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String()
	fields["statusVersion"] = c.version
	fields["error"] = errorSendingEnvelope.Error.Error()

	encoded, _ := json.Marshal(fields)
	raw := json.RawMessage(encoded)
	return &raw
}
2024-07-12 20:37:55 +00:00
// ProcessPeerCount builds the JSON payload for the PeerCount metric: the peer
// count, the client's identity fields and the current Unix time in seconds
// under "timestamp". A marshalling error is ignored.
func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
	fields := make(map[string]interface{}, 5)
	fields["peerCount"] = peerCount.PeerCount
	fields["nodeName"] = c.nodeName
	fields["nodeKeyUID"] = c.keyUID
	fields["statusVersion"] = c.version
	fields["timestamp"] = time.Now().Unix()

	encoded, _ := json.Marshal(fields)
	raw := json.RawMessage(encoded)
	return &raw
}
2023-10-30 14:51:57 +00:00
// UpdateEnvelopeProcessingError posts an envelope update for shhMessage to
// "<serverURL>/update-envelope". The "processingError" field carries the text
// of processingError, or an empty string when it is nil.
//
// The call is best-effort: marshalling and transport failures are logged and
// not returned. The response body is always closed so the underlying
// connection is released.
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
	messageHash := types.EncodeHex(shhMessage.Hash)
	c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", messageHash))

	endpoint := fmt.Sprintf("%s/update-envelope", c.serverURL)

	var errorString string
	if processingError != nil {
		errorString = processingError.Error()
	}

	postBody := map[string]interface{}{
		"messageHash":     messageHash,
		"sentAt":          shhMessage.Timestamp,
		"pubsubTopic":     shhMessage.PubsubTopic,
		"topic":           shhMessage.Topic,
		"receiverKeyUID":  c.keyUID,
		"nodeName":        c.nodeName,
		"processingError": errorString,
	}
	body, err := json.Marshal(postBody)
	if err != nil {
		c.logger.Error("Error marshaling envelope update", zap.Error(err))
		return
	}

	res, err := c.httpClient.Post(endpoint, "application/json", bytes.NewBuffer(body))
	if err != nil {
		c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err))
		return
	}
	// The caller owns the response body and must close it, even when unused.
	defer res.Body.Close()
}