2021-11-03 13:38:37 +01:00
package telemetry
import (
"bytes"
2024-06-13 15:31:09 -07:00
"context"
2021-11-03 13:38:37 +01:00
"encoding/json"
"fmt"
"net/http"
2024-08-01 05:27:43 +02:00
"strings"
2024-06-28 03:24:04 -07:00
"sync"
2021-11-03 13:38:37 +01:00
"time"
"go.uber.org/zap"
2024-09-27 06:37:32 +08:00
"github.com/status-im/status-go/common"
2021-11-03 13:38:37 +01:00
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
2024-06-13 15:31:09 -07:00
"github.com/status-im/status-go/wakuv2"
2023-10-30 15:51:57 +01:00
2024-09-18 21:43:04 -07:00
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
2024-09-24 14:52:29 +01:00
2024-11-03 00:47:15 +00:00
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
2024-09-24 14:52:29 +01:00
v1protocol "github.com/status-im/status-go/protocol/v1"
2024-10-28 15:01:18 -07:00
v2common "github.com/status-im/status-go/wakuv2/common"
2021-11-03 13:38:37 +01:00
)
2024-06-13 15:31:09 -07:00
type TelemetryType string
const (
2024-10-28 15:01:18 -07:00
// Bandwidth as reported by libp2p
ProtocolStatsMetric TelemetryType = "ProtocolStats"
// Envelopes sent by this node
SentEnvelopeMetric TelemetryType = "SentEnvelope"
// Change in status of a sent envelope (usually processing errors)
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
// Messages received by this node
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
// Errors encountered when sending envelopes
2024-06-28 03:24:04 -07:00
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
2024-10-28 15:01:18 -07:00
// Total connections for this node at a given time
PeerCountMetric TelemetryType = "PeerCount"
// Number of failed peer connections for this node at a given time
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
// Store confirmation for a sent message successful
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
// Store confirmation for a sent message failed
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
// Total connections for this node per shard at a given time
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
// Total connections for this node per discovery origin at a given time
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
// Error encountered when attempting to dial a peer
DialFailureMetric TelemetryType = "DialFailure"
// Missed message as detected by periodic store query
MissedMessageMetric TelemetryType = "MissedMessages"
// Missed message with a relevant filter
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages"
// MVDS ack received for a sent message
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
2024-12-06 10:47:51 -08:00
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
2024-06-13 15:31:09 -07:00
)
2024-10-28 15:01:18 -07:00
const MaxRetryCache = 5000
2024-06-13 15:31:09 -07:00
type TelemetryRequest struct {
Id int ` json:"id" `
TelemetryType TelemetryType ` json:"telemetry_type" `
TelemetryData * json . RawMessage ` json:"telemetry_data" `
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) PushReceivedMessages ( ctx context . Context , receivedMessages ReceivedMessages ) {
c . processAndPushTelemetry ( ctx , receivedMessages )
2024-06-13 15:31:09 -07:00
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) PushSentEnvelope ( ctx context . Context , sentEnvelope wakuv2 . SentEnvelope ) {
c . processAndPushTelemetry ( ctx , sentEnvelope )
2024-06-13 15:31:09 -07:00
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) PushReceivedEnvelope ( ctx context . Context , receivedEnvelope * v2protocol . Envelope ) {
c . processAndPushTelemetry ( ctx , receivedEnvelope )
2024-06-28 03:24:04 -07:00
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) PushErrorSendingEnvelope ( ctx context . Context , errorSendingEnvelope wakuv2 . ErrorSendingEnvelope ) {
c . processAndPushTelemetry ( ctx , errorSendingEnvelope )
2024-06-13 15:31:09 -07:00
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) PushPeerCount ( ctx context . Context , peerCount int ) {
2024-08-30 09:49:31 -07:00
now := time . Now ( )
if peerCount != c . lastPeerCount && now . Sub ( c . lastPeerCountTime ) > 1 * time . Second {
2024-08-05 11:44:57 -07:00
c . lastPeerCount = peerCount
2024-08-30 09:49:31 -07:00
c . lastPeerCountTime = now
2024-08-12 22:30:13 +01:00
c . processAndPushTelemetry ( ctx , PeerCount { PeerCount : peerCount } )
2024-08-05 11:44:57 -07:00
}
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) PushPeerConnFailures ( ctx context . Context , peerConnFailures map [ string ] int ) {
2024-08-05 11:44:57 -07:00
for peerID , failures := range peerConnFailures {
if lastFailures , exists := c . lastPeerConnFailures [ peerID ] ; exists {
if failures == lastFailures {
continue
}
}
c . lastPeerConnFailures [ peerID ] = failures
2024-08-12 22:30:13 +01:00
c . processAndPushTelemetry ( ctx , PeerConnFailure { FailedPeerId : peerID , FailureCount : failures } )
2024-08-05 11:44:57 -07:00
}
2024-07-12 13:37:55 -07:00
}
2024-09-18 21:43:04 -07:00
func ( c * Client ) PushMessageCheckSuccess ( ctx context . Context , messageHash string ) {
c . processAndPushTelemetry ( ctx , MessageCheckSuccess { MessageHash : messageHash } )
}
func ( c * Client ) PushMessageCheckFailure ( ctx context . Context , messageHash string ) {
c . processAndPushTelemetry ( ctx , MessageCheckFailure { MessageHash : messageHash } )
}
func ( c * Client ) PushPeerCountByShard ( ctx context . Context , peerCountByShard map [ uint16 ] uint ) {
for shard , count := range peerCountByShard {
c . processAndPushTelemetry ( ctx , PeerCountByShard { Shard : shard , Count : count } )
}
}
func ( c * Client ) PushPeerCountByOrigin ( ctx context . Context , peerCountByOrigin map [ wps . Origin ] uint ) {
for origin , count := range peerCountByOrigin {
c . processAndPushTelemetry ( ctx , PeerCountByOrigin { Origin : origin , Count : count } )
}
}
2024-10-28 15:01:18 -07:00
func ( c * Client ) PushDialFailure ( ctx context . Context , dialFailure v2common . DialError ) {
var errorMessage string = ""
if dialFailure . ErrType == v2common . ErrorUnknown {
errorMessage = dialFailure . ErrMsg
}
c . processAndPushTelemetry ( ctx , DialFailure { ErrorType : dialFailure . ErrType , ErrorMsg : errorMessage , Protocols : dialFailure . Protocols } )
}
func ( c * Client ) PushMissedMessage ( ctx context . Context , envelope * v2protocol . Envelope ) {
c . processAndPushTelemetry ( ctx , MissedMessage { Envelope : envelope } )
}
func ( c * Client ) PushMissedRelevantMessage ( ctx context . Context , receivedMessage * v2common . ReceivedMessage ) {
c . processAndPushTelemetry ( ctx , MissedRelevantMessage { ReceivedMessage : receivedMessage } )
}
func ( c * Client ) PushMessageDeliveryConfirmed ( ctx context . Context , messageHash string ) {
c . processAndPushTelemetry ( ctx , MessageDeliveryConfirmed { MessageHash : messageHash } )
}
2024-12-06 10:47:51 -08:00
func ( c * Client ) PushSentMessageTotal ( ctx context . Context , messageSize uint32 ) {
c . processAndPushTelemetry ( ctx , SentMessageTotal { Size : messageSize } )
}
2024-06-13 15:31:09 -07:00
type ReceivedMessages struct {
Filter transport . Filter
SSHMessage * types . Message
Messages [ ] * v1protocol . StatusMessage
}
2024-07-12 13:37:55 -07:00
type PeerCount struct {
PeerCount int
}
2024-08-05 11:44:57 -07:00
type PeerConnFailure struct {
FailedPeerId string
FailureCount int
}
2024-09-18 21:43:04 -07:00
type MessageCheckSuccess struct {
MessageHash string
}
type MessageCheckFailure struct {
MessageHash string
}
type PeerCountByShard struct {
Shard uint16
Count uint
}
type PeerCountByOrigin struct {
Origin wps . Origin
Count uint
}
2024-10-28 15:01:18 -07:00
type DialFailure struct {
ErrorType v2common . DialErrorType
ErrorMsg string
Protocols string
}
type MissedMessage struct {
Envelope * v2protocol . Envelope
}
type MissedRelevantMessage struct {
ReceivedMessage * v2common . ReceivedMessage
}
type MessageDeliveryConfirmed struct {
MessageHash string
}
2024-12-06 10:47:51 -08:00
type SentMessageTotal struct {
Size uint32
}
2021-11-03 13:38:37 +01:00
type Client struct {
2024-08-05 11:44:57 -07:00
serverURL string
httpClient * http . Client
logger * zap . Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync . Mutex
telemetryCache [ ] TelemetryRequest
telemetryRetryCache [ ] TelemetryRequest
nextIdLock sync . Mutex
nextId int
sendPeriod time . Duration
lastPeerCount int
2024-08-30 09:49:31 -07:00
lastPeerCountTime time . Time
2024-08-05 11:44:57 -07:00
lastPeerConnFailures map [ string ] int
2024-08-30 08:59:03 -07:00
deviceType string
2021-11-03 13:38:37 +01:00
}
2024-06-28 03:24:04 -07:00
type TelemetryClientOption func ( * Client )
func WithSendPeriod ( sendPeriod time . Duration ) TelemetryClientOption {
return func ( c * Client ) {
c . sendPeriod = sendPeriod
}
}
2024-08-01 05:27:43 +02:00
func WithPeerID ( peerId string ) TelemetryClientOption {
return func ( c * Client ) {
c . peerId = peerId
}
}
2024-06-28 03:24:04 -07:00
func NewClient ( logger * zap . Logger , serverURL string , keyUID string , nodeName string , version string , opts ... TelemetryClientOption ) * Client {
2024-08-01 05:27:43 +02:00
serverURL = strings . TrimRight ( serverURL , "/" )
2024-06-28 03:24:04 -07:00
client := & Client {
2024-08-05 11:44:57 -07:00
serverURL : serverURL ,
httpClient : & http . Client { Timeout : time . Minute } ,
logger : logger ,
keyUID : keyUID ,
nodeName : nodeName ,
version : version ,
telemetryCh : make ( chan TelemetryRequest ) ,
telemetryCacheLock : sync . Mutex { } ,
telemetryCache : make ( [ ] TelemetryRequest , 0 ) ,
telemetryRetryCache : make ( [ ] TelemetryRequest , 0 ) ,
nextId : 0 ,
nextIdLock : sync . Mutex { } ,
sendPeriod : 10 * time . Second , // default value
lastPeerCount : 0 ,
2024-08-30 09:49:31 -07:00
lastPeerCountTime : time . Time { } ,
2024-08-05 11:44:57 -07:00
lastPeerConnFailures : make ( map [ string ] int ) ,
2024-06-28 03:24:04 -07:00
}
for _ , opt := range opts {
opt ( client )
2024-06-13 15:31:09 -07:00
}
2024-06-28 03:24:04 -07:00
return client
2024-06-13 15:31:09 -07:00
}
2024-08-30 08:59:03 -07:00
func ( c * Client ) SetDeviceType ( deviceType string ) {
c . deviceType = deviceType
}
2024-06-28 03:24:04 -07:00
func ( c * Client ) Start ( ctx context . Context ) {
2024-06-13 15:31:09 -07:00
go func ( ) {
2024-09-27 06:37:32 +08:00
defer common . LogOnPanic ( )
2024-06-13 15:31:09 -07:00
for {
select {
2024-06-28 03:24:04 -07:00
case telemetryRequest := <- c . telemetryCh :
c . telemetryCacheLock . Lock ( )
c . telemetryCache = append ( c . telemetryCache , telemetryRequest )
c . telemetryCacheLock . Unlock ( )
2024-06-13 15:31:09 -07:00
case <- ctx . Done ( ) :
return
}
}
} ( )
go func ( ) {
2024-09-27 06:37:32 +08:00
defer common . LogOnPanic ( )
2024-07-01 20:08:54 +02:00
sendPeriod := c . sendPeriod
timer := time . NewTimer ( sendPeriod )
defer timer . Stop ( )
2024-06-13 15:31:09 -07:00
for {
select {
2024-07-01 20:08:54 +02:00
case <- timer . C :
2024-06-28 03:24:04 -07:00
c . telemetryCacheLock . Lock ( )
telemetryRequests := make ( [ ] TelemetryRequest , len ( c . telemetryCache ) )
copy ( telemetryRequests , c . telemetryCache )
c . telemetryCache = nil
c . telemetryCacheLock . Unlock ( )
2024-06-13 15:31:09 -07:00
if len ( telemetryRequests ) > 0 {
2024-07-01 20:08:54 +02:00
err := c . pushTelemetryRequest ( telemetryRequests )
if err != nil {
2024-07-12 03:07:23 +02:00
if sendPeriod < 60 * time . Second { //Stop the growing if the timer is > 60s to at least retry every minute
2024-07-01 20:08:54 +02:00
sendPeriod = sendPeriod * 2
}
} else {
sendPeriod = c . sendPeriod
}
2024-06-13 15:31:09 -07:00
}
2024-07-01 20:08:54 +02:00
timer . Reset ( sendPeriod )
2024-06-13 15:31:09 -07:00
case <- ctx . Done ( ) :
return
}
}
2024-06-28 03:24:04 -07:00
2024-06-13 15:31:09 -07:00
} ( )
}
2024-08-12 22:30:13 +01:00
func ( c * Client ) processAndPushTelemetry ( ctx context . Context , data interface { } ) {
2024-06-13 15:31:09 -07:00
var telemetryRequest TelemetryRequest
switch v := data . ( type ) {
case ReceivedMessages :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ReceivedMessagesMetric ,
TelemetryData : c . ProcessReceivedMessages ( v ) ,
}
case wakuv2 . SentEnvelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : SentEnvelopeMetric ,
TelemetryData : c . ProcessSentEnvelope ( v ) ,
}
2024-06-28 03:24:04 -07:00
case wakuv2 . ErrorSendingEnvelope :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : ErrorSendingEnvelopeMetric ,
TelemetryData : c . ProcessErrorSendingEnvelope ( v ) ,
}
2024-07-12 13:37:55 -07:00
case PeerCount :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerCountMetric ,
TelemetryData : c . ProcessPeerCount ( v ) ,
}
2024-08-05 11:44:57 -07:00
case PeerConnFailure :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerConnFailuresMetric ,
TelemetryData : c . ProcessPeerConnFailure ( v ) ,
}
2024-09-18 21:43:04 -07:00
case MessageCheckSuccess :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : MessageCheckSuccessMetric ,
TelemetryData : c . ProcessMessageCheckSuccess ( v ) ,
}
case MessageCheckFailure :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : MessageCheckFailureMetric ,
TelemetryData : c . ProcessMessageCheckFailure ( v ) ,
}
case PeerCountByShard :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerCountByShardMetric ,
TelemetryData : c . ProcessPeerCountByShard ( v ) ,
}
case PeerCountByOrigin :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : PeerCountByOriginMetric ,
TelemetryData : c . ProcessPeerCountByOrigin ( v ) ,
}
2024-10-28 15:01:18 -07:00
case DialFailure :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : DialFailureMetric ,
TelemetryData : c . ProcessDialFailure ( v ) ,
}
case MissedMessage :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : MissedMessageMetric ,
TelemetryData : c . ProcessMissedMessage ( v ) ,
}
case MissedRelevantMessage :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : MissedRelevantMessageMetric ,
TelemetryData : c . ProcessMissedRelevantMessage ( v ) ,
}
case MessageDeliveryConfirmed :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : MessageDeliveryConfirmedMetric ,
TelemetryData : c . ProcessMessageDeliveryConfirmed ( v ) ,
}
2024-12-06 10:47:51 -08:00
case SentMessageTotal :
telemetryRequest = TelemetryRequest {
Id : c . nextId ,
TelemetryType : SentMessageTotalMetric ,
TelemetryData : c . ProcessSentMessageTotal ( v ) ,
}
2024-06-13 15:31:09 -07:00
default :
c . logger . Error ( "Unknown telemetry data type" )
return
}
2024-08-12 22:30:13 +01:00
select {
case <- ctx . Done ( ) :
return
case c . telemetryCh <- telemetryRequest :
}
2024-06-28 03:24:04 -07:00
c . nextIdLock . Lock ( )
c . nextId ++
c . nextIdLock . Unlock ( )
2024-06-13 15:31:09 -07:00
}
2024-07-01 20:08:54 +02:00
// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache`
func ( c * Client ) pushTelemetryRequest ( request [ ] TelemetryRequest ) error {
2024-07-12 03:07:23 +02:00
if len ( c . telemetryRetryCache ) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time
removeNum := len ( c . telemetryRetryCache ) - MaxRetryCache
2024-07-01 20:08:54 +02:00
c . telemetryRetryCache = c . telemetryRetryCache [ removeNum : ]
}
c . telemetryRetryCache = append ( c . telemetryRetryCache , request ... )
2024-06-13 15:31:09 -07:00
url := fmt . Sprintf ( "%s/record-metrics" , c . serverURL )
2024-07-01 20:08:54 +02:00
body , err := json . Marshal ( c . telemetryRetryCache )
if err != nil {
c . logger . Error ( "Error marshaling telemetry data" , zap . Error ( err ) )
return err
}
res , err := c . httpClient . Post ( url , "application/json" , bytes . NewBuffer ( body ) )
2024-06-13 15:31:09 -07:00
if err != nil {
c . logger . Error ( "Error sending telemetry data" , zap . Error ( err ) )
2024-07-01 20:08:54 +02:00
return err
}
2024-07-12 13:37:55 -07:00
defer res . Body . Close ( )
var responseBody [ ] map [ string ] interface { }
if err := json . NewDecoder ( res . Body ) . Decode ( & responseBody ) ; err != nil {
c . logger . Error ( "Error decoding response body" , zap . Error ( err ) )
return err
}
if res . StatusCode != http . StatusCreated {
c . logger . Error ( "Error sending telemetry data" , zap . Int ( "statusCode" , res . StatusCode ) , zap . Any ( "responseBody" , responseBody ) )
return fmt . Errorf ( "status code %d, response body: %v" , res . StatusCode , responseBody )
2021-11-03 13:38:37 +01:00
}
2024-07-01 20:08:54 +02:00
c . telemetryRetryCache = nil
return nil
2021-11-03 13:38:37 +01:00
}
2024-08-30 08:59:03 -07:00
func ( c * Client ) commonPostBody ( ) map [ string ] interface { } {
return map [ string ] interface { } {
"nodeName" : c . nodeName ,
"peerId" : c . peerId ,
"statusVersion" : c . version ,
"deviceType" : c . deviceType ,
"timestamp" : time . Now ( ) . Unix ( ) ,
}
}
2024-06-13 15:31:09 -07:00
func ( c * Client ) ProcessReceivedMessages ( receivedMessages ReceivedMessages ) * json . RawMessage {
2021-11-03 13:38:37 +01:00
var postBody [ ] map [ string ] interface { }
2024-06-13 15:31:09 -07:00
for _ , message := range receivedMessages . Messages {
2024-08-30 08:59:03 -07:00
messageBody := c . commonPostBody ( )
messageBody [ "chatId" ] = receivedMessages . Filter . ChatID
messageBody [ "messageHash" ] = types . EncodeHex ( receivedMessages . SSHMessage . Hash )
messageBody [ "messageId" ] = message . ApplicationLayer . ID
messageBody [ "sentAt" ] = receivedMessages . SSHMessage . Timestamp
messageBody [ "pubsubTopic" ] = receivedMessages . Filter . PubsubTopic
messageBody [ "topic" ] = receivedMessages . Filter . ContentTopic . String ( )
messageBody [ "messageType" ] = message . ApplicationLayer . Type . String ( )
messageBody [ "receiverKeyUID" ] = c . keyUID
messageBody [ "messageSize" ] = len ( receivedMessages . SSHMessage . Payload )
postBody = append ( postBody , messageBody )
2021-11-03 13:38:37 +01:00
}
body , _ := json . Marshal ( postBody )
2024-06-13 15:31:09 -07:00
jsonRawMessage := json . RawMessage ( body )
return & jsonRawMessage
2021-11-03 13:38:37 +01:00
}
2023-10-30 15:51:57 +01:00
2024-06-13 15:31:09 -07:00
func ( c * Client ) ProcessSentEnvelope ( sentEnvelope wakuv2 . SentEnvelope ) * json . RawMessage {
2024-08-30 08:59:03 -07:00
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = sentEnvelope . Envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = uint32 ( sentEnvelope . Envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) )
postBody [ "pubsubTopic" ] = sentEnvelope . Envelope . PubsubTopic ( )
postBody [ "topic" ] = sentEnvelope . Envelope . Message ( ) . ContentTopic
postBody [ "senderKeyUID" ] = c . keyUID
postBody [ "publishMethod" ] = sentEnvelope . PublishMethod . String ( )
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2023-10-30 15:51:57 +01:00
}
2024-06-28 03:24:04 -07:00
func ( c * Client ) ProcessErrorSendingEnvelope ( errorSendingEnvelope wakuv2 . ErrorSendingEnvelope ) * json . RawMessage {
2024-08-30 08:59:03 -07:00
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = errorSendingEnvelope . SentEnvelope . Envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = uint32 ( errorSendingEnvelope . SentEnvelope . Envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) )
postBody [ "pubsubTopic" ] = errorSendingEnvelope . SentEnvelope . Envelope . PubsubTopic ( )
postBody [ "topic" ] = errorSendingEnvelope . SentEnvelope . Envelope . Message ( ) . ContentTopic
postBody [ "senderKeyUID" ] = c . keyUID
postBody [ "publishMethod" ] = errorSendingEnvelope . SentEnvelope . PublishMethod . String ( )
postBody [ "error" ] = errorSendingEnvelope . Error . Error ( )
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2024-06-28 03:24:04 -07:00
}
2024-07-12 13:37:55 -07:00
func ( c * Client ) ProcessPeerCount ( peerCount PeerCount ) * json . RawMessage {
2024-08-30 08:59:03 -07:00
postBody := c . commonPostBody ( )
postBody [ "peerCount" ] = peerCount . PeerCount
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2024-08-05 11:44:57 -07:00
}
func ( c * Client ) ProcessPeerConnFailure ( peerConnFailure PeerConnFailure ) * json . RawMessage {
2024-08-30 08:59:03 -07:00
postBody := c . commonPostBody ( )
postBody [ "failedPeerId" ] = peerConnFailure . FailedPeerId
postBody [ "failureCount" ] = peerConnFailure . FailureCount
postBody [ "nodeKeyUID" ] = c . keyUID
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2024-09-18 21:43:04 -07:00
}
func ( c * Client ) ProcessMessageCheckSuccess ( messageCheckSuccess MessageCheckSuccess ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = messageCheckSuccess . MessageHash
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2024-09-18 21:43:04 -07:00
}
func ( c * Client ) ProcessPeerCountByShard ( peerCountByShard PeerCountByShard ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "shard" ] = peerCountByShard . Shard
postBody [ "count" ] = peerCountByShard . Count
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2024-09-18 21:43:04 -07:00
}
func ( c * Client ) ProcessMessageCheckFailure ( messageCheckFailure MessageCheckFailure ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = messageCheckFailure . MessageHash
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
2024-09-18 21:43:04 -07:00
}
func ( c * Client ) ProcessPeerCountByOrigin ( peerCountByOrigin PeerCountByOrigin ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "origin" ] = peerCountByOrigin . Origin
postBody [ "count" ] = peerCountByOrigin . Count
2024-10-28 15:01:18 -07:00
return c . marshalPostBody ( postBody )
}
func ( c * Client ) ProcessDialFailure ( dialFailure DialFailure ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "errorType" ] = dialFailure . ErrorType
postBody [ "errorMsg" ] = dialFailure . ErrorMsg
postBody [ "protocols" ] = dialFailure . Protocols
return c . marshalPostBody ( postBody )
}
func ( c * Client ) ProcessMissedMessage ( missedMessage MissedMessage ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = missedMessage . Envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = uint32 ( missedMessage . Envelope . Message ( ) . GetTimestamp ( ) / int64 ( time . Second ) )
postBody [ "pubsubTopic" ] = missedMessage . Envelope . PubsubTopic ( )
postBody [ "contentTopic" ] = missedMessage . Envelope . Message ( ) . ContentTopic
return c . marshalPostBody ( postBody )
}
func ( c * Client ) ProcessMissedRelevantMessage ( missedMessage MissedRelevantMessage ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = missedMessage . ReceivedMessage . Envelope . Hash ( ) . String ( )
postBody [ "sentAt" ] = missedMessage . ReceivedMessage . Sent
postBody [ "pubsubTopic" ] = missedMessage . ReceivedMessage . PubsubTopic
postBody [ "contentTopic" ] = missedMessage . ReceivedMessage . ContentTopic
return c . marshalPostBody ( postBody )
}
func ( c * Client ) ProcessMessageDeliveryConfirmed ( messageDeliveryConfirmed MessageDeliveryConfirmed ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "messageHash" ] = messageDeliveryConfirmed . MessageHash
return c . marshalPostBody ( postBody )
}
2024-12-06 10:47:51 -08:00
func ( c * Client ) ProcessSentMessageTotal ( sentMessageTotal SentMessageTotal ) * json . RawMessage {
postBody := c . commonPostBody ( )
postBody [ "size" ] = sentMessageTotal . Size
return c . marshalPostBody ( postBody )
}
2024-10-28 15:01:18 -07:00
// Helper function to marshal post body and handle errors
func ( c * Client ) marshalPostBody ( postBody map [ string ] interface { } ) * json . RawMessage {
body , err := json . Marshal ( postBody )
if err != nil {
c . logger . Error ( "Error marshaling post body" , zap . Error ( err ) )
return nil
}
2024-09-18 21:43:04 -07:00
jsonRawMessage := json . RawMessage ( body )
2024-07-12 13:37:55 -07:00
return & jsonRawMessage
}
2023-10-30 15:51:57 +01:00
func ( c * Client ) UpdateEnvelopeProcessingError ( shhMessage * types . Message , processingError error ) {
2024-09-27 06:37:32 +08:00
defer common . LogOnPanic ( )
2023-10-30 15:51:57 +01:00
c . logger . Debug ( "Pushing envelope update to telemetry server" , zap . String ( "hash" , types . EncodeHex ( shhMessage . Hash ) ) )
url := fmt . Sprintf ( "%s/update-envelope" , c . serverURL )
var errorString = ""
if processingError != nil {
errorString = processingError . Error ( )
}
postBody := map [ string ] interface { } {
"messageHash" : types . EncodeHex ( shhMessage . Hash ) ,
"sentAt" : shhMessage . Timestamp ,
"pubsubTopic" : shhMessage . PubsubTopic ,
"topic" : shhMessage . Topic ,
"receiverKeyUID" : c . keyUID ,
2024-08-01 05:27:43 +02:00
"peerId" : c . peerId ,
2023-10-30 15:51:57 +01:00
"nodeName" : c . nodeName ,
"processingError" : errorString ,
2024-08-30 08:59:03 -07:00
"deviceType" : c . deviceType ,
2023-10-30 15:51:57 +01:00
}
body , _ := json . Marshal ( postBody )
_ , err := c . httpClient . Post ( url , "application/json" , bytes . NewBuffer ( body ) )
if err != nil {
c . logger . Error ( "Error sending envelope update to telemetry server" , zap . Error ( err ) )
}
}