diff --git a/telemetry/client.go b/telemetry/client.go index a9b55f2b5..54f257ccc 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -18,9 +18,10 @@ import ( "github.com/status-im/status-go/wakuv2" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" v1protocol "github.com/status-im/status-go/protocol/v1" + v2common "github.com/status-im/status-go/wakuv2/common" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) type TelemetryType string @@ -38,6 +39,8 @@ const ( PeerCountByShardMetric TelemetryType = "PeerCountByShard" PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed" + MissedMessageMetric TelemetryType = "MissedMessage" + MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage" MaxRetryCache = 5000 ) @@ -108,6 +111,14 @@ func (c *Client) PushStoreConfirmationFailed(ctx context.Context, msgHash common c.processAndPushTelemetry(ctx, StoreConfirmationFailed{MessageHash: msgHash.String()}) } +func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) { + c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope}) +} + +func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) { + c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -145,6 +156,14 @@ type StoreConfirmationFailed struct { MessageHash string } +type MissedMessage struct { + Envelope *v2protocol.Envelope +} + +type MissedRelevantMessage struct { + ReceivedMessage *v2common.ReceivedMessage +} + type Client struct { serverURL string httpClient *http.Client @@ -321,6 +340,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: StoreConfirmationFailedMetric, TelemetryData: c.ProcessStoreConfirmationFailed(v), } + case MissedMessage: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: MissedMessageMetric, + TelemetryData: c.ProcessMissedMessage(v), + } + case MissedRelevantMessage: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: MissedRelevantMessageMetric, + TelemetryData: c.ProcessMissedRelevantMessage(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -488,6 +519,28 @@ func (c *Client) ProcessStoreConfirmationFailed(storeConfirmationFailed StoreCon return &jsonRawMessage } +func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage { + postBody := c.commonPostBody() + postBody["messageHash"] = missedMessage.Envelope.Hash().String() + postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second)) + postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic() + postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + +func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage { + postBody := c.commonPostBody() + postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String() + postBody["sentAt"] = missedMessage.ReceivedMessage.Sent + postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic + postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 6e01926d2..7001b6a36 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -113,6 +113,8 @@ type ITelemetryClient interface { PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) PushStoreConfirmationFailed(ctx context.Context, msgHash gethcommon.Hash) + PushMissedMessage(ctx context.Context, envelope *protocol.Envelope) + PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage) } // Waku represents a dark communication interface through the Ethereum @@ -1125,7 +1127,6 @@ func (w *Waku) Start() error { go w.runPeerExchangeLoop() if w.cfg.EnableMissingMessageVerification { - w.missingMsgVerifier = missing.NewMissingMessageVerifier( w.node.Store(), w, @@ -1401,6 +1402,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } + if w.statusTelemetryClient != nil { + if msgType == common.MissingMessageType { + w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope) + } + } + logger := w.logger.With( zap.String("messageType", msgType), zap.Stringer("envelopeHash", envelope.Hash()), @@ -1518,6 +1525,9 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } else { logger.Debug("filters did match") + if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType { + w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e) + } e.Processed.Store(true) }