feat(telemetry)_: send connection failure metric (#5518)

This commit is contained in:
Arseniy Klempner 2024-08-05 11:44:57 -07:00 committed by GitHub
parent 551af54fda
commit 39497c2bff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 92 additions and 30 deletions

View File

@ -31,6 +31,7 @@ const (
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
PeerCountMetric TelemetryType = "PeerCount"
PeerConnFailuresMetric TelemetryType = "PeerConnFailure"
MaxRetryCache = 5000
)
@ -58,7 +59,22 @@ func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendi
}
func (c *Client) PushPeerCount(peerCount int) {
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
if peerCount != c.lastPeerCount {
c.lastPeerCount = peerCount
c.processAndPushTelemetry(PeerCount{PeerCount: peerCount})
}
}
func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) {
for peerID, failures := range peerConnFailures {
if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists {
if failures == lastFailures {
continue
}
}
c.lastPeerConnFailures[peerID] = failures
c.processAndPushTelemetry(PeerConnFailure{FailedPeerId: peerID, FailureCount: failures})
}
}
type ReceivedMessages struct {
@ -71,21 +87,28 @@ type PeerCount struct {
PeerCount int
}
type PeerConnFailure struct {
FailedPeerId string
FailureCount int
}
type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
telemetryRetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int
sendPeriod time.Duration
lastPeerCount int
lastPeerConnFailures map[string]int
}
type TelemetryClientOption func(*Client)
@ -105,19 +128,21 @@ func WithPeerID(peerId string) TelemetryClientOption {
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
version: version,
telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
telemetryRetryCache: make([]TelemetryRequest, 0),
nextId: 0,
nextIdLock: sync.Mutex{},
sendPeriod: 10 * time.Second, // default value
lastPeerCount: 0,
lastPeerConnFailures: make(map[string]int),
}
for _, opt := range opts {
@ -207,6 +232,12 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: PeerCountMetric,
TelemetryData: c.ProcessPeerCount(v),
}
case PeerConnFailure:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: PeerConnFailuresMetric,
TelemetryData: c.ProcessPeerConnFailure(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
@ -340,6 +371,21 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
return &jsonRawMessage
}
func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage {
postBody := map[string]interface{}{
"failedPeerId": peerConnFailure.FailedPeerId,
"failureCount": peerConnFailure.FailureCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)

View File

@ -109,8 +109,6 @@ func (w *Waku) broadcast() {
}
}
fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
@ -125,6 +123,9 @@ func (w *Waku) broadcast() {
}
}
// Wraps the publish function with rate limiter
fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger)
}

View File

@ -112,6 +112,7 @@ type ITelemetryClient interface {
PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
PushPeerCount(peerCount int)
PushPeerConnFailures(peerConnFailures map[string]int)
}
// Waku represents a dark communication interface through the Ethereum
@ -1275,7 +1276,9 @@ func (w *Waku) Start() error {
}
if w.statusTelemetryClient != nil {
connFailures := FormatPeerConnFailures(w.node)
w.statusTelemetryClient.PushPeerCount(w.PeerCount())
w.statusTelemetryClient.PushPeerConnFailures(connFailures)
}
w.ConnectionChanged(connection.State{
@ -1953,6 +1956,18 @@ func (w *Waku) StoreNode() *store.WakuStore {
return w.node.Store()
}
func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
p := make(map[string]int)
for _, peerID := range wakuNode.Host().Network().Peers() {
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
connFailures := wakuNode.Host().Peerstore().(wps.WakuPeerstore).ConnFailures(peerInfo)
if connFailures > 0 {
p[peerID.String()] = connFailures
}
}
return p
}
func (w *Waku) LegacyStoreNode() legacy_store.Store {
return w.node.LegacyStore()
}