feat(telemetry)_: track missed messages
This commit is contained in:
parent
55b0a8b898
commit
d1887cd501
|
@ -18,9 +18,10 @@ import (
|
|||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
||||
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||
v2common "github.com/status-im/status-go/wakuv2/common"
|
||||
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
type TelemetryType string
|
||||
|
@ -38,6 +39,8 @@ const (
|
|||
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
|
||||
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
|
||||
StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed"
|
||||
MissedMessageMetric TelemetryType = "MissedMessage"
|
||||
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
|
||||
MaxRetryCache = 5000
|
||||
)
|
||||
|
||||
|
@ -108,6 +111,14 @@ func (c *Client) PushStoreConfirmationFailed(ctx context.Context, msgHash common
|
|||
c.processAndPushTelemetry(ctx, StoreConfirmationFailed{MessageHash: msgHash.String()})
|
||||
}
|
||||
|
||||
func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
|
||||
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
|
||||
}
|
||||
|
||||
func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) {
|
||||
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
|
||||
}
|
||||
|
||||
type ReceivedMessages struct {
|
||||
Filter transport.Filter
|
||||
SSHMessage *types.Message
|
||||
|
@ -145,6 +156,14 @@ type StoreConfirmationFailed struct {
|
|||
MessageHash string
|
||||
}
|
||||
|
||||
type MissedMessage struct {
|
||||
Envelope *v2protocol.Envelope
|
||||
}
|
||||
|
||||
type MissedRelevantMessage struct {
|
||||
ReceivedMessage *v2common.ReceivedMessage
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
serverURL string
|
||||
httpClient *http.Client
|
||||
|
@ -321,6 +340,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
|
|||
TelemetryType: StoreConfirmationFailedMetric,
|
||||
TelemetryData: c.ProcessStoreConfirmationFailed(v),
|
||||
}
|
||||
case MissedMessage:
|
||||
telemetryRequest = TelemetryRequest{
|
||||
Id: c.nextId,
|
||||
TelemetryType: MissedMessageMetric,
|
||||
TelemetryData: c.ProcessMissedMessage(v),
|
||||
}
|
||||
case MissedRelevantMessage:
|
||||
telemetryRequest = TelemetryRequest{
|
||||
Id: c.nextId,
|
||||
TelemetryType: MissedRelevantMessageMetric,
|
||||
TelemetryData: c.ProcessMissedRelevantMessage(v),
|
||||
}
|
||||
default:
|
||||
c.logger.Error("Unknown telemetry data type")
|
||||
return
|
||||
|
@ -488,6 +519,28 @@ func (c *Client) ProcessStoreConfirmationFailed(storeConfirmationFailed StoreCon
|
|||
return &jsonRawMessage
|
||||
}
|
||||
|
||||
func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
|
||||
postBody := c.commonPostBody()
|
||||
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
|
||||
postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second))
|
||||
postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic()
|
||||
postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic
|
||||
body, _ := json.Marshal(postBody)
|
||||
jsonRawMessage := json.RawMessage(body)
|
||||
return &jsonRawMessage
|
||||
}
|
||||
|
||||
func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage {
|
||||
postBody := c.commonPostBody()
|
||||
postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String()
|
||||
postBody["sentAt"] = missedMessage.ReceivedMessage.Sent
|
||||
postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic
|
||||
postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic
|
||||
body, _ := json.Marshal(postBody)
|
||||
jsonRawMessage := json.RawMessage(body)
|
||||
return &jsonRawMessage
|
||||
}
|
||||
|
||||
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
|
||||
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
|
||||
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
|
||||
|
|
|
@ -113,6 +113,8 @@ type ITelemetryClient interface {
|
|||
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
|
||||
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
|
||||
PushStoreConfirmationFailed(ctx context.Context, msgHash gethcommon.Hash)
|
||||
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
|
||||
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
|
||||
}
|
||||
|
||||
// Waku represents a dark communication interface through the Ethereum
|
||||
|
@ -1125,7 +1127,6 @@ func (w *Waku) Start() error {
|
|||
go w.runPeerExchangeLoop()
|
||||
|
||||
if w.cfg.EnableMissingMessageVerification {
|
||||
|
||||
w.missingMsgVerifier = missing.NewMissingMessageVerifier(
|
||||
w.node.Store(),
|
||||
w,
|
||||
|
@ -1401,6 +1402,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
|
|||
return nil
|
||||
}
|
||||
|
||||
if w.statusTelemetryClient != nil {
|
||||
if msgType == common.MissingMessageType {
|
||||
w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope)
|
||||
}
|
||||
}
|
||||
|
||||
logger := w.logger.With(
|
||||
zap.String("messageType", msgType),
|
||||
zap.Stringer("envelopeHash", envelope.Hash()),
|
||||
|
@ -1518,6 +1525,9 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
|
|||
w.storeMsgIDsMu.Unlock()
|
||||
} else {
|
||||
logger.Debug("filters did match")
|
||||
if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType {
|
||||
w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e)
|
||||
}
|
||||
e.Processed.Store(true)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue