feat(telemetry)_: add metric for confirmed delivery
This commit is contained in:
parent
d1887cd501
commit
2749da2a5e
|
@ -12,7 +12,6 @@ import (
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
"github.com/status-im/status-go/protocol/transport"
|
"github.com/status-im/status-go/protocol/transport"
|
||||||
"github.com/status-im/status-go/wakuv2"
|
"github.com/status-im/status-go/wakuv2"
|
||||||
|
@ -27,21 +26,21 @@ import (
|
||||||
type TelemetryType string
|
type TelemetryType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ProtocolStatsMetric TelemetryType = "ProtocolStats"
|
ProtocolStatsMetric TelemetryType = "ProtocolStats"
|
||||||
SentEnvelopeMetric TelemetryType = "SentEnvelope"
|
SentEnvelopeMetric TelemetryType = "SentEnvelope"
|
||||||
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
|
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
|
||||||
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
|
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
|
||||||
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
|
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
|
||||||
PeerCountMetric TelemetryType = "PeerCount"
|
PeerCountMetric TelemetryType = "PeerCount"
|
||||||
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
|
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
|
||||||
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
|
MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess"
|
||||||
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
|
MessageCheckFailureMetric TelemetryType = "MessageCheckFailure"
|
||||||
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
|
PeerCountByShardMetric TelemetryType = "PeerCountByShard"
|
||||||
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
|
PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin"
|
||||||
StoreConfirmationFailedMetric TelemetryType = "StoreConfirmationFailed"
|
MissedMessageMetric TelemetryType = "MissedMessage"
|
||||||
MissedMessageMetric TelemetryType = "MissedMessage"
|
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
|
||||||
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessage"
|
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
|
||||||
MaxRetryCache = 5000
|
MaxRetryCache = 5000
|
||||||
)
|
)
|
||||||
|
|
||||||
type TelemetryRequest struct {
|
type TelemetryRequest struct {
|
||||||
|
@ -107,10 +106,6 @@ func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin ma
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) PushStoreConfirmationFailed(ctx context.Context, msgHash common.Hash) {
|
|
||||||
c.processAndPushTelemetry(ctx, StoreConfirmationFailed{MessageHash: msgHash.String()})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
|
func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) {
|
||||||
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
|
c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope})
|
||||||
}
|
}
|
||||||
|
@ -119,6 +114,10 @@ func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage
|
||||||
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
|
c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, numMessages int) {
|
||||||
|
c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{NumMessages: numMessages})
|
||||||
|
}
|
||||||
|
|
||||||
type ReceivedMessages struct {
|
type ReceivedMessages struct {
|
||||||
Filter transport.Filter
|
Filter transport.Filter
|
||||||
SSHMessage *types.Message
|
SSHMessage *types.Message
|
||||||
|
@ -164,6 +163,10 @@ type MissedRelevantMessage struct {
|
||||||
ReceivedMessage *v2common.ReceivedMessage
|
ReceivedMessage *v2common.ReceivedMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MessageDeliveryConfirmed struct {
|
||||||
|
NumMessages int
|
||||||
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
serverURL string
|
serverURL string
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
@ -334,12 +337,6 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
|
||||||
TelemetryType: PeerCountByOriginMetric,
|
TelemetryType: PeerCountByOriginMetric,
|
||||||
TelemetryData: c.ProcessPeerCountByOrigin(v),
|
TelemetryData: c.ProcessPeerCountByOrigin(v),
|
||||||
}
|
}
|
||||||
case StoreConfirmationFailed:
|
|
||||||
telemetryRequest = TelemetryRequest{
|
|
||||||
Id: c.nextId,
|
|
||||||
TelemetryType: StoreConfirmationFailedMetric,
|
|
||||||
TelemetryData: c.ProcessStoreConfirmationFailed(v),
|
|
||||||
}
|
|
||||||
case MissedMessage:
|
case MissedMessage:
|
||||||
telemetryRequest = TelemetryRequest{
|
telemetryRequest = TelemetryRequest{
|
||||||
Id: c.nextId,
|
Id: c.nextId,
|
||||||
|
@ -352,6 +349,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
|
||||||
TelemetryType: MissedRelevantMessageMetric,
|
TelemetryType: MissedRelevantMessageMetric,
|
||||||
TelemetryData: c.ProcessMissedRelevantMessage(v),
|
TelemetryData: c.ProcessMissedRelevantMessage(v),
|
||||||
}
|
}
|
||||||
|
case MessageDeliveryConfirmed:
|
||||||
|
telemetryRequest = TelemetryRequest{
|
||||||
|
Id: c.nextId,
|
||||||
|
TelemetryType: MessageDeliveryConfirmedMetric,
|
||||||
|
TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
c.logger.Error("Unknown telemetry data type")
|
c.logger.Error("Unknown telemetry data type")
|
||||||
return
|
return
|
||||||
|
@ -511,14 +514,6 @@ func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *
|
||||||
return &jsonRawMessage
|
return &jsonRawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) ProcessStoreConfirmationFailed(storeConfirmationFailed StoreConfirmationFailed) *json.RawMessage {
|
|
||||||
postBody := c.commonPostBody()
|
|
||||||
postBody["messageHash"] = storeConfirmationFailed.MessageHash
|
|
||||||
body, _ := json.Marshal(postBody)
|
|
||||||
jsonRawMessage := json.RawMessage(body)
|
|
||||||
return &jsonRawMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
|
func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage {
|
||||||
postBody := c.commonPostBody()
|
postBody := c.commonPostBody()
|
||||||
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
|
postBody["messageHash"] = missedMessage.Envelope.Hash().String()
|
||||||
|
@ -541,6 +536,14 @@ func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessag
|
||||||
return &jsonRawMessage
|
return &jsonRawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage {
|
||||||
|
postBody := c.commonPostBody()
|
||||||
|
postBody["numMessages"] = messageDeliveryConfirmed.NumMessages
|
||||||
|
body, _ := json.Marshal(postBody)
|
||||||
|
jsonRawMessage := json.RawMessage(body)
|
||||||
|
return &jsonRawMessage
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
|
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
|
||||||
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
|
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
|
||||||
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
|
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
|
||||||
|
|
|
@ -112,9 +112,9 @@ type ITelemetryClient interface {
|
||||||
PushMessageCheckFailure(ctx context.Context, messageHash string)
|
PushMessageCheckFailure(ctx context.Context, messageHash string)
|
||||||
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
|
PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint)
|
||||||
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
|
PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint)
|
||||||
PushStoreConfirmationFailed(ctx context.Context, msgHash gethcommon.Hash)
|
|
||||||
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
|
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
|
||||||
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
|
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
|
||||||
|
PushMessageDeliveryConfirmed(ctx context.Context, numMessages int)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Waku represents a dark communication interface through the Ethereum
|
// Waku represents a dark communication interface through the Ethereum
|
||||||
|
@ -986,6 +986,9 @@ func (w *Waku) SkipPublishToTopic(value bool) {
|
||||||
|
|
||||||
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
|
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
|
||||||
w.messageSender.MessagesDelivered(hashes)
|
w.messageSender.MessagesDelivered(hashes)
|
||||||
|
if w.statusTelemetryClient != nil {
|
||||||
|
w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, len(hashes))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Waku) SetStorePeerID(peerID peer.ID) {
|
func (w *Waku) SetStorePeerID(peerID peer.ID) {
|
||||||
|
|
Loading…
Reference in New Issue