2021-11-03 12:38:37 +00:00
package telemetry
import (
"bytes"
2024-06-13 22:31:09 +00:00
"context"
2021-11-03 12:38:37 +00:00
"encoding/json"
"fmt"
"net/http"
2024-08-01 03:27:43 +00:00
"strings"
2024-06-28 10:24:04 +00:00
"sync"
2021-11-03 12:38:37 +00:00
"time"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
2024-06-13 22:31:09 +00:00
"github.com/status-im/status-go/wakuv2"
2023-10-30 14:51:57 +00:00
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
2024-06-19 10:40:52 +00:00
v1protocol "github.com/status-im/status-go/protocol/v1"
2024-09-20 12:24:34 +00:00
"github.com/status-im/status-go/utils"
2021-11-03 12:38:37 +00:00
)
2024-06-13 22:31:09 +00:00
// TelemetryType names the kind of metric carried by a TelemetryRequest. Its
// string value is serialized as the request's "telemetry_type" field.
type TelemetryType string

// Metric identifiers sent to the telemetry server. The string values end up in
// the JSON payload, so changing them changes the wire format.
const (
	ProtocolStatsMetric        TelemetryType = "ProtocolStats"
	ReceivedEnvelopeMetric     TelemetryType = "ReceivedEnvelope"
	SentEnvelopeMetric         TelemetryType = "SentEnvelope"
	UpdateEnvelopeMetric       TelemetryType = "UpdateEnvelope"
	ReceivedMessagesMetric     TelemetryType = "ReceivedMessages"
	ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
	PeerCountMetric            TelemetryType = "PeerCount"
	PeerConnFailuresMetric     TelemetryType = "PeerConnFailure"
)

// MaxRetryCache is the limit used to trim the cache of telemetry requests that
// are kept for re-sending after a failed push. It is an untyped constant.
const MaxRetryCache = 5000

// TelemetryRequest is one telemetry record as posted to the server.
// TelemetryData holds the already-encoded, metric-specific JSON payload.
type TelemetryRequest struct {
	Id            int              `json:"id"`
	TelemetryType TelemetryType    `json:"telemetry_type"`
	TelemetryData *json.RawMessage `json:"telemetry_data"`
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) PushReceivedMessages ( ctx context . Context , receivedMessages ReceivedMessages ) {
c . processAndPushTelemetry ( ctx , receivedMessages )
2024-06-13 22:31:09 +00:00
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) PushSentEnvelope ( ctx context . Context , sentEnvelope wakuv2 . SentEnvelope ) {
c . processAndPushTelemetry ( ctx , sentEnvelope )
2024-06-13 22:31:09 +00:00
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) PushReceivedEnvelope ( ctx context . Context , receivedEnvelope * v2protocol . Envelope ) {
c . processAndPushTelemetry ( ctx , receivedEnvelope )
2024-06-28 10:24:04 +00:00
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) PushErrorSendingEnvelope ( ctx context . Context , errorSendingEnvelope wakuv2 . ErrorSendingEnvelope ) {
c . processAndPushTelemetry ( ctx , errorSendingEnvelope )
2024-06-13 22:31:09 +00:00
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) PushPeerCount ( ctx context . Context , peerCount int ) {
2024-08-30 16:49:31 +00:00
now := time . Now ( )
if peerCount != c . lastPeerCount && now . Sub ( c . lastPeerCountTime ) > 1 * time . Second {
2024-08-05 18:44:57 +00:00
c . lastPeerCount = peerCount
2024-08-30 16:49:31 +00:00
c . lastPeerCountTime = now
2024-08-12 21:30:13 +00:00
c . processAndPushTelemetry ( ctx , PeerCount { PeerCount : peerCount } )
2024-08-05 18:44:57 +00:00
}
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) PushPeerConnFailures ( ctx context . Context , peerConnFailures map [ string ] int ) {
2024-08-05 18:44:57 +00:00
for peerID , failures := range peerConnFailures {
if lastFailures , exists := c . lastPeerConnFailures [ peerID ] ; exists {
if failures == lastFailures {
continue
}
}
c . lastPeerConnFailures [ peerID ] = failures
2024-08-12 21:30:13 +00:00
c . processAndPushTelemetry ( ctx , PeerConnFailure { FailedPeerId : peerID , FailureCount : failures } )
2024-08-05 18:44:57 +00:00
}
2024-07-12 20:37:55 +00:00
}
2024-06-13 22:31:09 +00:00
// ReceivedMessages is the input of PushReceivedMessages: a received message,
// the filter it was matched by, and the application messages to report.
type ReceivedMessages struct {
	// Filter supplies the chat ID, pubsub topic and content topic of each record.
	Filter transport.Filter
	// SSHMessage supplies the hash, timestamp and payload size of each record.
	SSHMessage *types.Message
	// Messages yields one telemetry record per entry (its ID and type are reported).
	Messages []*v1protocol.StatusMessage
}
2024-07-12 20:37:55 +00:00
// PeerCount is the payload of a PeerCountMetric record.
type PeerCount struct {
	// PeerCount is the number of connected peers being reported.
	PeerCount int
}

// PeerConnFailure is the payload of a PeerConnFailuresMetric record.
type PeerConnFailure struct {
	// FailedPeerId identifies the peer the failures refer to.
	FailedPeerId string
	// FailureCount is the failure count reported for that peer.
	FailureCount int
}
2021-11-03 12:38:37 +00:00
type Client struct {
2024-08-05 18:44:57 +00:00
serverURL string
httpClient * http . Client
logger * zap . Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync . Mutex
telemetryCache [ ] TelemetryRequest
telemetryRetryCache [ ] TelemetryRequest
nextIdLock sync . Mutex
nextId int
sendPeriod time . Duration
lastPeerCount int
2024-08-30 16:49:31 +00:00
lastPeerCountTime time . Time
2024-08-05 18:44:57 +00:00
lastPeerConnFailures map [ string ] int
2024-08-30 15:59:03 +00:00
deviceType string
2021-11-03 12:38:37 +00:00
}
2024-06-28 10:24:04 +00:00
// TelemetryClientOption customises a Client; options are applied by NewClient
// after its defaults are set.
type TelemetryClientOption func(*Client)

// WithSendPeriod sets the base interval at which buffered telemetry is pushed
// to the server.
//
// Non-positive durations are ignored, so the client keeps the value it already
// has (NewClient's default). The send loop uses this value as its timer period
// and doubles it after failures. Zero or a negative value would therefore make
// the loop fire continuously without pause.
func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {
	return func(c *Client) {
		if sendPeriod <= 0 {
			return
		}
		c.sendPeriod = sendPeriod
	}
}
2024-08-01 03:27:43 +00:00
// WithPeerID sets the peer ID that is included as "peerId" in the telemetry
// payloads built by the client.
func WithPeerID(peerId string) TelemetryClientOption {
	setPeerID := func(c *Client) { c.peerId = peerId }
	return setPeerID
}
2024-06-28 10:24:04 +00:00
func NewClient ( logger * zap . Logger , serverURL string , keyUID string , nodeName string , version string , opts ... TelemetryClientOption ) * Client {
2024-08-01 03:27:43 +00:00
serverURL = strings . TrimRight ( serverURL , "/" )
2024-06-28 10:24:04 +00:00
client := & Client {
2024-08-05 18:44:57 +00:00
serverURL : serverURL ,
httpClient : & http . Client { Timeout : time . Minute } ,
logger : logger ,
keyUID : keyUID ,
nodeName : nodeName ,
version : version ,
telemetryCh : make ( chan TelemetryRequest ) ,
telemetryCacheLock : sync . Mutex { } ,
telemetryCache : make ( [ ] TelemetryRequest , 0 ) ,
telemetryRetryCache : make ( [ ] TelemetryRequest , 0 ) ,
nextId : 0 ,
nextIdLock : sync . Mutex { } ,
sendPeriod : 10 * time . Second , // default value
lastPeerCount : 0 ,
2024-08-30 16:49:31 +00:00
lastPeerCountTime : time . Time { } ,
2024-08-05 18:44:57 +00:00
lastPeerConnFailures : make ( map [ string ] int ) ,
2024-06-28 10:24:04 +00:00
}
for _ , opt := range opts {
opt ( client )
2024-06-13 22:31:09 +00:00
}
2024-06-28 10:24:04 +00:00
return client
2024-06-13 22:31:09 +00:00
}
2024-08-30 15:59:03 +00:00
// SetDeviceType records the device type that is included as "deviceType" in
// every telemetry payload built afterwards.
// NOTE(review): the write is unsynchronized; presumably called during setup,
// before Push* is used concurrently — confirm.
func (c *Client) SetDeviceType(deviceType string) {
	c.deviceType = deviceType
}
2024-09-20 12:24:34 +00:00
// Foo does nothing.
// NOTE(review): looks like a placeholder; confirm whether it is still needed.
func (c *Client) Foo() {}

// Bar does nothing beyond deferring utils.LogOnPanic in the calling goroutine.
// NOTE(review): looks like a placeholder; confirm whether it is still needed.
func Bar() {
	defer utils.LogOnPanic()
}
2024-06-28 10:24:04 +00:00
func ( c * Client ) Start ( ctx context . Context ) {
2024-09-20 12:24:34 +00:00
go c . Foo ( )
go Bar ( )
2024-06-13 22:31:09 +00:00
go func ( ) {
2024-09-20 12:24:34 +00:00
defer utils . LogOnPanic ( )
2024-06-13 22:31:09 +00:00
for {
select {
2024-06-28 10:24:04 +00:00
case telemetryRequest := <- c . telemetryCh :
c . telemetryCacheLock . Lock ( )
c . telemetryCache = append ( c . telemetryCache , telemetryRequest )
c . telemetryCacheLock . Unlock ( )
2024-06-13 22:31:09 +00:00
case <- ctx . Done ( ) :
return
}
}
} ( )
go func ( ) {
2024-07-01 18:08:54 +00:00
sendPeriod := c . sendPeriod
timer := time . NewTimer ( sendPeriod )
defer timer . Stop ( )
2024-06-13 22:31:09 +00:00
for {
select {
2024-07-01 18:08:54 +00:00
case <- timer . C :
2024-06-28 10:24:04 +00:00
c . telemetryCacheLock . Lock ( )
telemetryRequests := make ( [ ] TelemetryRequest , len ( c . telemetryCache ) )
copy ( telemetryRequests , c . telemetryCache )
c . telemetryCache = nil
c . telemetryCacheLock . Unlock ( )
2024-06-13 22:31:09 +00:00
if len ( telemetryRequests ) > 0 {
2024-07-01 18:08:54 +00:00
err := c . pushTelemetryRequest ( telemetryRequests )
if err != nil {
2024-07-12 01:07:23 +00:00
if sendPeriod < 60 * time . Second { //Stop the growing if the timer is > 60s to at least retry every minute
2024-07-01 18:08:54 +00:00
sendPeriod = sendPeriod * 2
}
} else {
sendPeriod = c . sendPeriod
}
2024-06-13 22:31:09 +00:00
}
2024-07-01 18:08:54 +00:00
timer . Reset ( sendPeriod )
2024-06-13 22:31:09 +00:00
case <- ctx . Done ( ) :
return
}
}
2024-06-28 10:24:04 +00:00
2024-06-13 22:31:09 +00:00
} ( )
}
2024-08-12 21:30:13 +00:00
func ( c * Client ) processAndPushTelemetry ( ctx context . Context , data interface { } ) {
2024-06-13 22:31:09 +00:00
var telemetryRequest TelemetryRequest
switch v := data . ( type ) {
case ReceivedMessages :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ReceivedMessagesMetric ,
TelemetryData : c . ProcessReceivedMessages ( v ) ,
}
case * v2protocol . Envelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ReceivedEnvelopeMetric ,
TelemetryData : c . ProcessReceivedEnvelope ( v ) ,
}
case wakuv2 . SentEnvelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : SentEnvelopeMetric ,
TelemetryData : c . ProcessSentEnvelope ( v ) ,
}
2024-06-28 10:24:04 +00:00
case wakuv2 . ErrorSendingEnvelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ErrorSendingEnvelopeMetric ,
TelemetryData : c . ProcessErrorSendingEnvelope ( v ) ,
}
2024-07-12 20:37:55 +00:00
case PeerCount :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerCountMetric ,
TelemetryData : c . ProcessPeerCount ( v ) ,
}
2024-08-05 18:44:57 +00:00
case PeerConnFailure :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerConnFailuresMetric ,
TelemetryData : c . ProcessPeerConnFailure ( v ) ,
}
2024-06-13 22:31:09 +00:00
default :
c . logger . Error ( "Unknown telemetry data type" )
return
}
2024-08-12 21:30:13 +00:00
select {
case <- ctx . Done ( ) :
return
case c . telemetryCh <- telemetryRequest :
}
2024-06-28 10:24:04 +00:00
c . nextIdLock . Lock ( )
c . nextId ++
c . nextIdLock . Unlock ( )
2024-06-13 22:31:09 +00:00
}
2024-07-01 18:08:54 +00:00
// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache`
func ( c * Client ) pushTelemetryRequest ( request [ ] TelemetryRequest ) error {
2024-07-12 01:07:23 +00:00
if len ( c . telemetryRetryCache ) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time
removeNum := len ( c . telemetryRetryCache ) - MaxRetryCache
2024-07-01 18:08:54 +00:00
c . telemetryRetryCache = c . telemetryRetryCache [ removeNum : ]
}
c . telemetryRetryCache = append ( c . telemetryRetryCache , request ... )
2024-06-13 22:31:09 +00:00
url := fmt . Sprintf ( "%s/record-metrics" , c . serverURL )
2024-07-01 18:08:54 +00:00
body , err := json . Marshal ( c . telemetryRetryCache )
if err != nil {
c . logger . Error ( "Error marshaling telemetry data" , zap . Error ( err ) )
return err
}
res , err := c . httpClient . Post ( url , "application/json" , bytes . NewBuffer ( body ) )
2024-06-13 22:31:09 +00:00
if err != nil {
c . logger . Error ( "Error sending telemetry data" , zap . Error ( err ) )
2024-07-01 18:08:54 +00:00
return err
}
2024-07-12 20:37:55 +00:00
defer res . Body . Close ( )
var responseBody [ ] map [ string ] interface { }
if err := json . NewDecoder ( res . Body ) . Decode ( & responseBody ) ; err != nil {
c . logger . Error ( "Error decoding response body" , zap . Error ( err ) )
return err
}
if res . StatusCode != http . StatusCreated {
c . logger . Error ( "Error sending telemetry data" , zap . Int ( "statusCode" , res . StatusCode ) , zap . Any ( "responseBody" , responseBody ) )
return fmt . Errorf ( "status code %d, response body: %v" , res . StatusCode , responseBody )
2021-11-03 12:38:37 +00:00
}
2024-07-01 18:08:54 +00:00
c . telemetryRetryCache = nil
return nil
2021-11-03 12:38:37 +00:00
}
2024-08-30 15:59:03 +00:00
// commonPostBody returns a fresh map with the fields shared by every telemetry
// payload:
//   - node name, peer ID and client version ("statusVersion");
//   - device type;
//   - the current Unix time in seconds.
//
// Callers add their metric-specific keys to it.
func (c *Client) commonPostBody() map[string]interface{} {
	fields := make(map[string]interface{}, 5)
	fields["nodeName"] = c.nodeName
	fields["peerId"] = c.peerId
	fields["statusVersion"] = c.version
	fields["deviceType"] = c.deviceType
	fields["timestamp"] = time.Now().Unix()
	return fields
}
2024-06-13 22:31:09 +00:00
func ( c * Client ) ProcessReceivedMessages ( receivedMessages ReceivedMessages ) * json . RawMessage {
2021-11-03 12:38:37 +00:00
var postBody [ ] map [ string ] interface { }
2024-06-13 22:31:09 +00:00
for _ , message := range receivedMessages . Messages {
2024-08-30 15:59:03 +00:00
messageBody := c . commonPostBody ( )
messageBody [ "chatId" ] = receivedMessages . Filter . ChatID
messageBody [ "messageHash" ] = types . EncodeHex ( receivedMessages . SSHMessage . Hash )
messageBody [ "messageId" ] = message . ApplicationLayer . ID
messageBody [ "sentAt" ] = receivedMessages . SSHMessage . Timestamp
messageBody [ "pubsubTopic" ] = receivedMessages . Filter . PubsubTopic
messageBody [ "topic" ] = receivedMessages . Filter . ContentTopic . String ( )
messageBody [ "messageType" ] = message . ApplicationLayer . Type . String ( )
messageBody [ "receiverKeyUID" ] = c . keyUID
messageBody [ "messageSize" ] = len ( receivedMessages . SSHMessage . Payload )
postBody = append ( postBody , messageBody )
2021-11-03 12:38:37 +00:00
}
body , _ := json . Marshal ( postBody )
2024-06-13 22:31:09 +00:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
2021-11-03 12:38:37 +00:00
}
2023-10-30 14:51:57 +00:00
2024-06-13 22:31:09 +00:00
func ( c * Client ) ProcessReceivedEnvelope ( envelope * v2protocol . Envelope ) * json . RawMessage {
2024-08-30 15:59:03 +00:00
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = uint32 ( envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) )
postBody [ "pubsubTopic" ] = envelope . PubsubTopic ( )
postBody [ "topic" ] = envelope . Message ( ) . ContentTopic
postBody [ "receiverKeyUID" ] = c . keyUID
2023-10-30 14:51:57 +00:00
body , _ := json . Marshal ( postBody )
2024-06-13 22:31:09 +00:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
}
func ( c * Client ) ProcessSentEnvelope ( sentEnvelope wakuv2 . SentEnvelope ) * json . RawMessage {
2024-08-30 15:59:03 +00:00
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = sentEnvelope . Envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = uint32 ( sentEnvelope . Envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) )
postBody [ "pubsubTopic" ] = sentEnvelope . Envelope . PubsubTopic ( )
postBody [ "topic" ] = sentEnvelope . Envelope . Message ( ) . ContentTopic
postBody [ "senderKeyUID" ] = c . keyUID
postBody [ "publishMethod" ] = sentEnvelope . PublishMethod . String ( )
2024-06-13 22:31:09 +00:00
body , _ := json . Marshal ( postBody )
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
2023-10-30 14:51:57 +00:00
}
2024-06-28 10:24:04 +00:00
func ( c * Client ) ProcessErrorSendingEnvelope ( errorSendingEnvelope wakuv2 . ErrorSendingEnvelope ) * json . RawMessage {
2024-08-30 15:59:03 +00:00
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = errorSendingEnvelope . SentEnvelope . Envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = uint32 ( errorSendingEnvelope . SentEnvelope . Envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) )
postBody [ "pubsubTopic" ] = errorSendingEnvelope . SentEnvelope . Envelope . PubsubTopic ( )
postBody [ "topic" ] = errorSendingEnvelope . SentEnvelope . Envelope . Message ( ) . ContentTopic
postBody [ "senderKeyUID" ] = c . keyUID
postBody [ "publishMethod" ] = errorSendingEnvelope . SentEnvelope . PublishMethod . String ( )
postBody [ "error" ] = errorSendingEnvelope . Error . Error ( )
2024-06-28 10:24:04 +00:00
body , _ := json . Marshal ( postBody )
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
}
2024-07-12 20:37:55 +00:00
func ( c * Client ) ProcessPeerCount ( peerCount PeerCount ) * json . RawMessage {
2024-08-30 15:59:03 +00:00
postBody := c . commonPostBody ( )
postBody [ "peerCount" ] = peerCount . PeerCount
2024-07-12 20:37:55 +00:00
body , _ := json . Marshal ( postBody )
2024-08-05 18:44:57 +00:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
}
func ( c * Client ) ProcessPeerConnFailure ( peerConnFailure PeerConnFailure ) * json . RawMessage {
2024-08-30 15:59:03 +00:00
postBody := c . commonPostBody ( )
postBody [ "failedPeerId" ] = peerConnFailure . FailedPeerId
postBody [ "failureCount" ] = peerConnFailure . FailureCount
postBody [ "nodeKeyUID" ] = c . keyUID
2024-08-05 18:44:57 +00:00
body , _ := json . Marshal ( postBody )
2024-07-12 20:37:55 +00:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
}
2023-10-30 14:51:57 +00:00
func ( c * Client ) UpdateEnvelopeProcessingError ( shhMessage * types . Message , processingError error ) {
c . logger . Debug ( "Pushing envelope update to telemetry server" , zap . String ( "hash" , types . EncodeHex ( shhMessage . Hash ) ) )
url := fmt . Sprintf ( "%s/update-envelope" , c . serverURL )
var errorString = ""
if processingError != nil {
errorString = processingError . Error ( )
}
postBody := map [ string ] interface { } {
"messageHash" : types . EncodeHex ( shhMessage . Hash ) ,
"sentAt" : shhMessage . Timestamp ,
"pubsubTopic" : shhMessage . PubsubTopic ,
"topic" : shhMessage . Topic ,
"receiverKeyUID" : c . keyUID ,
2024-08-01 03:27:43 +00:00
"peerId" : c . peerId ,
2023-10-30 14:51:57 +00:00
"nodeName" : c . nodeName ,
"processingError" : errorString ,
2024-08-30 15:59:03 +00:00
"deviceType" : c . deviceType ,
2023-10-30 14:51:57 +00:00
}
body , _ := json . Marshal ( postBody )
_ , err := c . httpClient . Post ( url , "application/json" , bytes . NewBuffer ( body ) )
if err != nil {
c . logger . Error ( "Error sending envelope update to telemetry server" , zap . Error ( err ) )
}
}