feat(telemetry)_: track missed messages

This commit is contained in:
Arseniy Klempner 2024-09-06 19:26:28 -07:00
parent 916376d3b3
commit 5d9680253b
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
2 changed files with 78 additions and 15 deletions

View File

@ -19,9 +19,10 @@ import (
wakuv2common "github.com/status-im/status-go/wakuv2/common" wakuv2common "github.com/status-im/status-go/wakuv2/common"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
v2common "github.com/status-im/status-go/wakuv2/common"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
) )
type TelemetryType string type TelemetryType string
@ -39,6 +40,8 @@ const (
PeerCountByShardMetric TelemetryType = "PeerCountByShard" PeerCountByShardMetric TelemetryType = "PeerCountByShard"
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
DialFailureMetric TelemetryType = "DialFailure" DialFailureMetric TelemetryType = "DialFailure"
MissedMessageMetric TelemetryType = "MissedMessage"
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
MaxRetryCache = 5000 MaxRetryCache = 5000
) )
@ -113,6 +116,14 @@ func (c *Client) PushDialFailure(ctx context.Context, dialFailure wakuv2common.D
c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols}) c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols})
} }
func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
}
func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) {
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
}
type ReceivedMessages struct { type ReceivedMessages struct {
Filter transport.Filter Filter transport.Filter
SSHMessage *types.Message SSHMessage *types.Message
@ -152,6 +163,14 @@ type DialFailure struct {
Protocols string Protocols string
} }
type MissedMessage struct {
Envelope *v2protocol.Envelope
}
type MissedRelevantMessage struct {
ReceivedMessage *v2common.ReceivedMessage
}
type Client struct { type Client struct {
serverURL string serverURL string
httpClient *http.Client httpClient *http.Client
@ -330,6 +349,18 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: DialFailureMetric, TelemetryType: DialFailureMetric,
TelemetryData: c.ProcessDialFailure(v), TelemetryData: c.ProcessDialFailure(v),
} }
case MissedMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MissedMessageMetric,
TelemetryData: c.ProcessMissedMessage(v),
}
case MissedRelevantMessage:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: MissedRelevantMessageMetric,
TelemetryData: c.ProcessMissedRelevantMessage(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
@ -499,6 +530,28 @@ func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage {
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second))
postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic()
postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String()
postBody["sentAt"] = missedMessage.ReceivedMessage.Sent
postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic
postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
defer common.LogOnPanic() defer common.LogOnPanic()
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))

View File

@ -114,6 +114,8 @@ type ITelemetryClient interface {
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
PushDialFailure(ctx context.Context, dialFailure common.DialError) PushDialFailure(ctx context.Context, dialFailure common.DialError)
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -1146,7 +1148,6 @@ func (w *Waku) Start() error {
go w.runPeerExchangeLoop() go w.runPeerExchangeLoop()
if w.cfg.EnableMissingMessageVerification { if w.cfg.EnableMissingMessageVerification {
w.missingMsgVerifier = missing.NewMissingMessageVerifier( w.missingMsgVerifier = missing.NewMissingMessageVerifier(
w.node.Store(), w.node.Store(),
w, w,
@ -1424,6 +1425,12 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
return nil return nil
} }
if w.statusTelemetryClient != nil {
if msgType == common.MissingMessageType {
w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope)
}
}
logger := w.logger.With( logger := w.logger.With(
zap.String("messageType", msgType), zap.String("messageType", msgType),
zap.Stringer("envelopeHash", envelope.Hash()), zap.Stringer("envelopeHash", envelope.Hash()),
@ -1542,6 +1549,9 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
w.storeMsgIDsMu.Unlock() w.storeMsgIDsMu.Unlock()
} else { } else {
logger.Debug("filters did match") logger.Debug("filters did match")
if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType {
w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e)
}
e.Processed.Store(true) e.Processed.Store(true)
} }