From 2749da2a5edf8358bd369d3f1840004b11caca1d Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Mon, 9 Sep 2024 14:54:20 -0700 Subject: [PATCH] feat(telemetry)_: add metric for confirmed delivery --- telemetry/client.go | 71 +++++++++++++++++++++++---------------------- wakuv2/waku.go | 5 +++- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/telemetry/client.go b/telemetry/client.go index 54f257ccc..c6fea3fe0 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -12,7 +12,6 @@ import ( "go.uber.org/zap" - "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/wakuv2" @@ -27,21 +26,21 @@ import ( type TelemetryType string const ( - ProtocolStatsMetric TelemetryType = "ProtocolStats" - SentEnvelopeMetric TelemetryType = "SentEnvelope" - UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" - ReceivedMessagesMetric TelemetryType = "ReceivedMessages" - ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" - PeerCountMetric TelemetryType = "PeerCount" - PeerConnFailuresMetric TelemetryType = "PeerConnFailure" - MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" - MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" - PeerCountByShardMetric TelemetryType = "PeerCountByShard" - PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" - StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed" - MissedMessageMetric TelemetryType = "MissedMessage" - MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage" - MaxRetryCache = 5000 + ProtocolStatsMetric TelemetryType = "ProtocolStats" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" + ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" + PeerCountMetric TelemetryType = "PeerCount" + PeerConnFailuresMetric TelemetryType = "PeerConnFailure" + MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" + MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" + PeerCountByShardMetric TelemetryType = "PeerCountByShard" + PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" + MissedMessageMetric TelemetryType = "MissedMessage" + MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage" + MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" + MaxRetryCache = 5000 ) type TelemetryRequest struct { @@ -107,10 +106,6 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma } } -func (c *Client) PushStoreConfirmationFailed(ctx context.Context, msgHash common.Hash) { - c.processAndPushTelemetry(ctx, StoreConfirmationFailed{MessageHash: msgHash.String()}) -} - func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) { c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope}) } @@ -119,6 +114,10 @@ func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage}) } +func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, numMessages int) { + c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{NumMessages: numMessages}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -164,6 +163,10 @@ type MissedRelevantMessage struct { ReceivedMessage *v2common.ReceivedMessage } +type MessageDeliveryConfirmed struct { + NumMessages int +} + type Client struct { serverURL string httpClient *http.Client @@ -334,12 +337,6 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: PeerCountByOriginMetric, TelemetryData: c.ProcessPeerCountByOrigin(v), } - case StoreConfirmationFailed: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: StoreConfirmationFailedMetric, - TelemetryData: c.ProcessStoreConfirmationFailed(v), - } case MissedMessage: telemetryRequest = TelemetryRequest{ Id: c.nextId, @@ -352,6 +349,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: MissedRelevantMessageMetric, TelemetryData: c.ProcessMissedRelevantMessage(v), } + case MessageDeliveryConfirmed: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: MessageDeliveryConfirmedMetric, + TelemetryData: c.ProcessMessageDeliveryConfirmed(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -511,14 +514,6 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) * return &jsonRawMessage } -func (c *Client) ProcessStoreConfirmationFailed(storeConfirmationFailed StoreConfirmationFailed) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = storeConfirmationFailed.MessageHash - body, _ := json.Marshal(postBody) - jsonRawMessage := json.RawMessage(body) - return &jsonRawMessage -} - func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage { postBody := c.commonPostBody() postBody["messageHash"] = missedMessage.Envelope.Hash().String() @@ -541,6 +536,14 @@ func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessag return &jsonRawMessage } +func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage { + postBody := c.commonPostBody() + postBody["numMessages"] = messageDeliveryConfirmed.NumMessages + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 7001b6a36..8829294a0 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -112,9 +112,9 @@ type ITelemetryClient interface { PushMessageCheckFailure(ctx context.Context, messageHash string) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) - PushStoreConfirmationFailed(ctx context.Context, msgHash gethcommon.Hash) PushMissedMessage(ctx context.Context, envelope *protocol.Envelope) PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage) + PushMessageDeliveryConfirmed(ctx context.Context, numMessages int) } // Waku represents a dark communication interface through the Ethereum @@ -986,6 +986,9 @@ func (w *Waku) SkipPublishToTopic(value bool) { func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { w.messageSender.MessagesDelivered(hashes) + if w.statusTelemetryClient != nil { + w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, len(hashes)) + } } func (w *Waku) SetStorePeerID(peerID peer.ID) {