feat(telemetry)_: include device type in metrics (#5669)

* feat(telemetry)_: include device type in metrics

* chore(telemetry)_: refactor common fields used across metrics
This commit is contained in:
Arseniy Klempner 2024-08-30 08:59:03 -07:00 committed by GitHub
parent 0531535ef5
commit 27d02d5fc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 66 additions and 65 deletions

View File

@ -554,7 +554,10 @@ func NewMessenger(
var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, telemetry.WithPeerID(peerId.String()))
options := []telemetry.TelemetryClientOption{
telemetry.WithPeerID(peerId.String()),
}
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...)
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}

View File

@ -415,6 +415,13 @@ func (m *Messenger) InitInstallations() error {
return err
}
if m.telemetryClient != nil {
installation, ok := m.allInstallations.Load(m.installationID)
if ok {
m.telemetryClient.SetDeviceType(installation.InstallationMetadata.DeviceType)
}
}
return nil
}

View File

@ -109,6 +109,7 @@ type Client struct {
sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
deviceType string
}
type TelemetryClientOption func(*Client)
@ -152,6 +153,10 @@ func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName str
return client
}
func (c *Client) SetDeviceType(deviceType string) {
c.deviceType = deviceType
}
func (c *Client) Start(ctx context.Context) {
go func() {
for {
@ -287,23 +292,30 @@ func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error {
return nil
}
func (c *Client) commonPostBody() map[string]interface{} {
return map[string]interface{}{
"nodeName": c.nodeName,
"peerId": c.peerId,
"statusVersion": c.version,
"deviceType": c.deviceType,
"timestamp": time.Now().Unix(),
}
}
func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage {
var postBody []map[string]interface{}
for _, message := range receivedMessages.Messages {
postBody = append(postBody, map[string]interface{}{
"chatId": receivedMessages.Filter.ChatID,
"messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash),
"messageId": message.ApplicationLayer.ID,
"sentAt": receivedMessages.SSHMessage.Timestamp,
"pubsubTopic": receivedMessages.Filter.PubsubTopic,
"topic": receivedMessages.Filter.ContentTopic.String(),
"messageType": message.ApplicationLayer.Type.String(),
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"messageSize": len(receivedMessages.SSHMessage.Payload),
"statusVersion": c.version,
})
messageBody := c.commonPostBody()
messageBody["chatId"] = receivedMessages.Filter.ChatID
messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash)
messageBody["messageId"] = message.ApplicationLayer.ID
messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp
messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic
messageBody["topic"] = receivedMessages.Filter.ContentTopic.String()
messageBody["messageType"] = message.ApplicationLayer.Type.String()
messageBody["receiverKeyUID"] = c.keyUID
messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload)
postBody = append(postBody, messageBody)
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
@ -311,80 +323,57 @@ func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *jso
}
func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": envelope.Hash().String(),
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"statusVersion": c.version,
}
postBody := c.commonPostBody()
postBody["messageHash"] = envelope.Hash().String()
postBody["sentAt"] = uint32(envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = envelope.PubsubTopic()
postBody["topic"] = envelope.Message().ContentTopic
postBody["receiverKeyUID"] = c.keyUID
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": sentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": sentEnvelope.Envelope.PubsubTopic(),
"topic": sentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": sentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
}
postBody := c.commonPostBody()
postBody["messageHash"] = sentEnvelope.Envelope.Hash().String()
postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic()
postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic
postBody["senderKeyUID"] = c.keyUID
postBody["publishMethod"] = sentEnvelope.PublishMethod.String()
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
"topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"error": errorSendingEnvelope.Error.Error(),
}
postBody := c.commonPostBody()
postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String()
postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic()
postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic
postBody["senderKeyUID"] = c.keyUID
postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String()
postBody["error"] = errorSendingEnvelope.Error.Error()
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
postBody := map[string]interface{}{
"peerCount": peerCount.PeerCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
postBody := c.commonPostBody()
postBody["peerCount"] = peerCount.PeerCount
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"failedPeerId": peerConnFailure.FailedPeerId,
"failureCount": peerConnFailure.FailureCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
postBody := c.commonPostBody()
postBody["failedPeerId"] = peerConnFailure.FailedPeerId
postBody["failureCount"] = peerConnFailure.FailureCount
postBody["nodeKeyUID"] = c.keyUID
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
@ -406,6 +395,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
"peerId": c.peerId,
"nodeName": c.nodeName,
"processingError": errorString,
"deviceType": c.deviceType,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))

View File

@ -102,6 +102,7 @@ type ErrorSendingEnvelope struct {
}
type ITelemetryClient interface {
SetDeviceType(deviceType string)
PushReceivedEnvelope(ctx context.Context, receivedEnvelope *protocol.Envelope)
PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope)