feat(telemetry)_: publish peerId (#5534)

* feat(telemetry)_: publish peerId

* fix_: fix goimports lint error in waku

* fix_: trim extra slash, use the right Waku service

* fix_: update protocol/messenger.go

Co-authored-by: richΛrd <info@richardramos.me>

* fix_: panic when PeerID is called on wakuv1

* fix_: use option instead of additional param for PeerID

---------

Co-authored-by: Arseniy Klempner <arseniyk@status.im>
Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Vaclav Pavlin 2024-08-01 05:27:43 +02:00 committed by GitHub
parent 8787da1b89
commit 780e3e55f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 69 additions and 7 deletions

View File

@ -75,10 +75,9 @@ var SimulateFlags = append([]cli.Flag{
Usage: "How many messages to sent from each user",
},
&cli.BoolFlag{
Name: MessageFailureFlag,
Aliases: []string{"f"},
Usage: "Causes messages to fail about 25% of the time",
Value: false,
Name: MessageFailureFlag,
Usage: "Causes messages to fail about 25% of the time",
Value: false,
},
}, CommonFlags...)

View File

@ -79,7 +79,8 @@ func start(p StartParams, logger *zap.SugaredLogger) (*StatusCLI, error) {
if err != nil {
return nil, err
}
telemetryClient := telemetry.NewClient(telemetryLogger, p.TelemetryURL, backend.SelectedAccountKeyID(), p.Name, "cli")
waku := backend.StatusNode().WakuV2Service()
telemetryClient := telemetry.NewClient(telemetryLogger, p.TelemetryURL, backend.SelectedAccountKeyID(), p.Name, "cli", telemetry.WithPeerID(waku.PeerID().String()))
go telemetryClient.Start(context.Background())
backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient)
}

View File

@ -333,3 +333,7 @@ func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
}
func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not implemented")
}

View File

@ -350,3 +350,7 @@ func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
w.waku.SetStorePeerID(peerID)
}
func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

View File

@ -194,4 +194,7 @@ type Waku interface {
// SetStorePeerID updates the peer id of store node
SetStorePeerID(peerID peer.ID)
// PeerID returns node's PeerID
PeerID() peer.ID
}

View File

@ -17,6 +17,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/time/rate"
@ -359,6 +360,7 @@ func NewMessenger(
// Initialize transport layer.
var transp *transport.Transport
var peerId peer.ID
if waku, err := node.GetWaku(nil); err == nil && waku != nil {
transp, err = transport.NewTransport(
@ -379,6 +381,7 @@ func NewMessenger(
if err != nil || wakuV2 == nil {
return nil, errors.Wrap(err, "failed to find Whisper and Waku V1/V2 services")
}
peerId = wakuV2.PeerID()
transp, err = transport.NewTransport(
wakuV2,
identity,
@ -552,7 +555,7 @@ func NewMessenger(
var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version)
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, telemetry.WithPeerID(peerId.String()))
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
@ -76,6 +77,7 @@ type Client struct {
logger *zap.Logger
keyUID string
nodeName string
peerId string
version string
telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
@ -94,7 +96,14 @@ func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {
}
}
func WithPeerID(peerId string) TelemetryClientOption {
return func(c *Client) {
c.peerId = peerId
}
}
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
serverURL = strings.TrimRight(serverURL, "/")
client := &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
@ -255,6 +264,7 @@ func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *jso
"topic": receivedMessages.Filter.ContentTopic.String(),
"messageType": message.ApplicationLayer.Type.String(),
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"messageSize": len(receivedMessages.SSHMessage.Payload),
"statusVersion": c.version,
@ -272,6 +282,7 @@ func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.Ra
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"statusVersion": c.version,
}
@ -287,6 +298,7 @@ func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.Raw
"pubsubTopic": sentEnvelope.Envelope.PubsubTopic(),
"topic": sentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": sentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
@ -303,6 +315,7 @@ func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSe
"pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
"topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
@ -318,6 +331,7 @@ func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage {
"peerCount": peerCount.PeerCount,
"nodeName": c.nodeName,
"nodeKeyUID": c.keyUID,
"peerId": c.peerId,
"statusVersion": c.version,
"timestamp": time.Now().Unix(),
}
@ -339,6 +353,7 @@ func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, proces
"pubsubTopic": shhMessage.PubsubTopic,
"topic": shhMessage.Topic,
"receiverKeyUID": c.keyUID,
"peerId": c.peerId,
"nodeName": c.nodeName,
"processingError": errorString,
}

View File

@ -92,7 +92,7 @@ func createClient(t *testing.T, mockServerURL string) *Client {
if err != nil {
t.Fatalf("Failed to create logger: %v", err)
}
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond))
return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond), WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm"))
}
type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)
@ -386,3 +386,30 @@ func TestPeerCount(t *testing.T) {
require.NotEqual(t, 0, len(w.Peers()))
})
}
func TestPeerId(t *testing.T) {
expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) {
var data map[string]interface{}
err := json.Unmarshal(*received[0].TelemetryData, &data)
if err != nil {
return false, true
}
_, ok := data["peerId"]
require.True(t, ok)
return ok, false
}
withMockServer(t, ReceivedEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) {
client.Start(ctx)
client.PushReceivedEnvelope(v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic,
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""))
})
}

View File

@ -28,6 +28,8 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
@ -1597,6 +1599,10 @@ func (w *Waku) Clean() error {
return nil
}
func (w *Waku) PeerID() peer.ID {
panic("not implemented")
}
// validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool {
if k == nil || k.D == nil || k.D.Sign() == 0 {