feat(telemetry)_: add metric for confirmed delivery

This commit is contained in:
Arseniy Klempner 2024-09-09 14:54:20 -07:00
parent 5d9680253b
commit 973dee9003
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
2 changed files with 48 additions and 20 deletions

View File

@ -12,12 +12,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
wakuv2common "github.com/status-im/status-go/wakuv2/common"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
@ -28,21 +27,22 @@ import (
type TelemetryType string type TelemetryType string
const ( const (
ProtocolStatsMetric TelemetryType = "ProtocolStats" ProtocolStatsMetric TelemetryType = "ProtocolStats"
SentEnvelopeMetric TelemetryType = "SentEnvelope" SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount" PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure" PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
PeerCountByShardMetric TelemetryType = "PeerCountByShard" PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
DialFailureMetric TelemetryType = "DialFailure" DialFailureMetric TelemetryType = "DialFailure"
MissedMessageMetric TelemetryType = "MissedMessage" MissedMessageMetric TelemetryType = "MissedMessage"
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage" MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
MaxRetryCache = 5000 MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
MaxRetryCache = 5000
) )
type TelemetryRequest struct { type TelemetryRequest struct {
@ -108,9 +108,9 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
} }
} }
func (c *Client) PushDialFailure(ctx context.Context, dialFailure wakuv2common.DialError) { func (c *Client) PushDialFailure(ctx context.Context, dialFailure v2common.DialError) {
var errorMessage string = "" var errorMessage string = ""
if dialFailure.ErrType == wakuv2common.ErrorUnknown { if dialFailure.ErrType == v2common.ErrorUnknown {
errorMessage = dialFailure.ErrMsg errorMessage = dialFailure.ErrMsg
} }
c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols}) c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols})
@ -124,6 +124,10 @@ func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage}) c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
} }
func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) {
c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash})
}
type ReceivedMessages struct { type ReceivedMessages struct {
Filter transport.Filter Filter transport.Filter
SSHMessage *types.Message SSHMessage *types.Message
@ -158,7 +162,7 @@ type PeerCountByOrigin struct {
} }
type DialFailure struct { type DialFailure struct {
ErrorType wakuv2common.DialErrorType ErrorType v2common.DialErrorType
ErrorMsg string ErrorMsg string
Protocols string Protocols string
} }
@ -171,6 +175,10 @@ type MissedRelevantMessage struct {
ReceivedMessage *v2common.ReceivedMessage ReceivedMessage *v2common.ReceivedMessage
} }
type MessageDeliveryConfirmed struct {
MessageHash string
}
type Client struct { type Client struct {
serverURL string serverURL string
httpClient *http.Client httpClient *http.Client
@ -361,6 +369,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: MissedRelevantMessageMetric, TelemetryType: MissedRelevantMessageMetric,
TelemetryData: c.ProcessMissedRelevantMessage(v), TelemetryData: c.ProcessMissedRelevantMessage(v),
} }
case MessageDeliveryConfirmed:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MessageDeliveryConfirmedMetric,
TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
@ -552,6 +566,14 @@ func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessag
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = messageDeliveryConfirmed.MessageHash
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
defer common.LogOnPanic() defer common.LogOnPanic()
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))

View File

@ -116,6 +116,7 @@ type ITelemetryClient interface {
PushDialFailure(ctx context.Context, dialFailure common.DialError) PushDialFailure(ctx context.Context, dialFailure common.DialError)
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope) PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage) PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
PushMessageDeliveryConfirmed(ctx context.Context, messageHash string)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -993,6 +994,11 @@ func (w *Waku) SkipPublishToTopic(value bool) {
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
w.messageSender.MessagesDelivered(hashes) w.messageSender.MessagesDelivered(hashes)
if w.statusTelemetryClient != nil {
for _, hash := range hashes {
w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, hash.String())
}
}
} }
func (w *Waku) SetStorePeerID(peerID peer.ID) { func (w *Waku) SetStorePeerID(peerID peer.ID) {