feat_: call telemetry upon error pushing envelope (#5430)

* feat_: call telemetry upon error pushing envelope

* feat_: call telemetry upon error pushing envelope

---------

Co-authored-by: Václav Pavlín <vaclav.pavlin@gmail.com>
This commit is contained in:
Arseniy Klempner 2024-06-28 03:24:04 -07:00 committed by GitHub
parent bd8aa0dac7
commit 5934233266
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 288 additions and 141 deletions

View File

@ -22,6 +22,7 @@ const APIModulesFlag = "api-modules"
const TelemetryServerURLFlag = "telemetry-server-url" const TelemetryServerURLFlag = "telemetry-server-url"
const KeyUIDFlag = "key-uid" const KeyUIDFlag = "key-uid"
const DebugLevel = "debug" const DebugLevel = "debug"
const MessageFailureFlag = "fail"
const RetrieveInterval = 300 * time.Millisecond const RetrieveInterval = 300 * time.Millisecond
const SendInterval = 1 * time.Second const SendInterval = 1 * time.Second
@ -64,6 +65,12 @@ var SimulateFlags = append([]cli.Flag{
Value: 1, Value: 1,
Usage: "How many messages to sent from each user", Usage: "How many messages to sent from each user",
}, },
&cli.BoolFlag{
Name: MessageFailureFlag,
Aliases: []string{"f"},
Usage: "Causes messages to fail about 25% of the time",
Value: false,
},
}, CommonFlags...) }, CommonFlags...)
var ServeFlags = append([]cli.Flag{ var ServeFlags = append([]cli.Flag{

View File

@ -3,7 +3,9 @@ package main
import ( import (
"bufio" "bufio"
"context" "context"
"crypto/rand"
"log/slog" "log/slog"
"math/big"
"os" "os"
"strings" "strings"
"sync" "sync"
@ -41,7 +43,30 @@ func (cli *StatusCLI) sendContactRequestAcceptance(ctx context.Context, msgID st
return nil return nil
} }
func (cli *StatusCLI) sendDirectMessage(ctx context.Context, text string) error { func (cli *StatusCLI) randomFailure() func() {
nBig, err := rand.Int(rand.Reader, big.NewInt(100))
if err != nil {
cli.logger.Error("failed to generate random number", "err", err)
return nil
}
n := nBig.Int64()
if n >= 40 {
return nil
}
cli.backend.StatusNode().WakuV2Service().SkipPublishToTopic(true)
return func() {
cli.backend.StatusNode().WakuV2Service().SkipPublishToTopic(false)
}
}
func (cli *StatusCLI) sendDirectMessage(ctx context.Context, text string, options ...bool) error {
randomFailure := false
if len(options) > 0 {
randomFailure = options[0]
}
if len(cli.messenger.MutualContacts()) == 0 { if len(cli.messenger.MutualContacts()) == 0 {
return nil return nil
} }
@ -58,8 +83,20 @@ func (cli *StatusCLI) sendDirectMessage(ctx context.Context, text string) error
inputMessage.ContentType = protobuf.ChatMessage_TEXT_PLAIN inputMessage.ContentType = protobuf.ChatMessage_TEXT_PLAIN
inputMessage.Text = text inputMessage.Text = text
shouldFail := false
if randomFailure {
if postFailure := cli.randomFailure(); postFailure != nil {
defer postFailure()
shouldFail = true
}
}
resp, err := cli.messenger.SendChatMessage(ctx, inputMessage) resp, err := cli.messenger.SendChatMessage(ctx, inputMessage)
if err != nil { if err != nil {
if shouldFail {
cli.logger.Info("simulating message failure")
cli.logger.Error("error sending message", "err", err)
return nil
}
return err return err
} }

View File

@ -33,6 +33,7 @@ func simulate(cCtx *cli.Context) error {
// Start messengers // Start messengers
apiModules := cCtx.String(APIModulesFlag) apiModules := cCtx.String(APIModulesFlag)
telemetryUrl := cCtx.String(TelemetryServerURLFlag) telemetryUrl := cCtx.String(TelemetryServerURLFlag)
failMessages := cCtx.Bool(MessageFailureFlag)
alice, err := start("Alice", 0, apiModules, telemetryUrl, "", logger.Named("alice")) alice, err := start("Alice", 0, apiModules, telemetryUrl, "", logger.Named("alice"))
if err != nil { if err != nil {
@ -76,13 +77,13 @@ func simulate(cCtx *cli.Context) error {
interactiveSendMessageLoop(ctx, alice, charlie) interactiveSendMessageLoop(ctx, alice, charlie)
} else { } else {
for i := 0; i < cCtx.Int(CountFlag); i++ { for i := 0; i < cCtx.Int(CountFlag); i++ {
err = alice.sendDirectMessage(ctx, fmt.Sprintf("message from alice, number: %d", i+1)) err = alice.sendDirectMessage(ctx, fmt.Sprintf("message from alice, number: %d", i+1), failMessages)
if err != nil { if err != nil {
return err return err
} }
time.Sleep(WaitingInterval) time.Sleep(WaitingInterval)
err = charlie.sendDirectMessage(ctx, fmt.Sprintf("message from charlie, number: %d", i+1)) err = charlie.sendDirectMessage(ctx, fmt.Sprintf("message from charlie, number: %d", i+1), failMessages)
if err != nil { if err != nil {
return err return err
} }

View File

@ -208,6 +208,7 @@ type Settings struct {
GifFavorites *json.RawMessage `json:"gifs/favorite-gifs"` GifFavorites *json.RawMessage `json:"gifs/favorite-gifs"`
OpenseaEnabled bool `json:"opensea-enabled?,omitempty"` OpenseaEnabled bool `json:"opensea-enabled?,omitempty"`
TelemetryServerURL string `json:"telemetry-server-url,omitempty"` TelemetryServerURL string `json:"telemetry-server-url,omitempty"`
TelemetrySendPeriodMs int `json:"telemetry-send-period-ms,omitempty"`
LastBackup uint64 `json:"last-backup,omitempty"` LastBackup uint64 `json:"last-backup,omitempty"`
BackupEnabled bool `json:"backup-enabled?,omitempty"` BackupEnabled bool `json:"backup-enabled?,omitempty"`
AutoMessageEnabled bool `json:"auto-message-enabled?,omitempty"` AutoMessageEnabled bool `json:"auto-message-enabled?,omitempty"`

View File

@ -113,6 +113,7 @@ type config struct {
messengerSignalsHandler MessengerSignalsHandler messengerSignalsHandler MessengerSignalsHandler
telemetryServerURL string telemetryServerURL string
telemetrySendPeriod time.Duration
wakuService *wakuv2.Waku wakuService *wakuv2.Waku
messageResendMinDelay time.Duration messageResendMinDelay time.Duration
@ -258,9 +259,10 @@ func WithAnonMetricsServerConfig(anonMetricsServerConfig *anonmetrics.ServerConf
} }
} }
func WithTelemetry(serverURL string) Option { func WithTelemetry(serverURL string, sendPeriod time.Duration) Option {
return func(c *config) error { return func(c *config) error {
c.telemetryServerURL = serverURL c.telemetryServerURL = serverURL
c.telemetrySendPeriod = sendPeriod
return nil return nil
} }
} }

View File

@ -464,7 +464,7 @@ func buildMessengerOptions(
} }
if settings.TelemetryServerURL != "" { if settings.TelemetryServerURL != "" {
options = append(options, protocol.WithTelemetry(settings.TelemetryServerURL)) options = append(options, protocol.WithTelemetry(settings.TelemetryServerURL, time.Duration(settings.TelemetrySendPeriodMs)*time.Millisecond))
} }
if settings.PushNotificationsServerEnabled { if settings.PushNotificationsServerEnabled {

View File

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@ -27,6 +28,7 @@ const (
SentEnvelopeMetric TelemetryType = "SentEnvelope" SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages" ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope"
) )
type TelemetryRequest struct { type TelemetryRequest struct {
@ -36,15 +38,19 @@ type TelemetryRequest struct {
} }
func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) {
c.receivedMessagesCh <- receivedMessages c.processAndPushTelemetry(receivedMessages)
} }
func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) {
c.sentEnvelopeCh <- sentEnvelope c.processAndPushTelemetry(sentEnvelope)
} }
func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) { func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) {
c.receivedEnvelopeCh <- receivedEnvelope c.processAndPushTelemetry(receivedEnvelope)
}
func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) {
c.processAndPushTelemetry(errorSendingEnvelope)
} }
type ReceivedMessages struct { type ReceivedMessages struct {
@ -60,50 +66,59 @@ type Client struct {
keyUID string keyUID string
nodeName string nodeName string
version string version string
receivedMessagesCh chan ReceivedMessages
receivedEnvelopeCh chan *v2protocol.Envelope
sentEnvelopeCh chan wakuv2.SentEnvelope
telemetryCh chan TelemetryRequest telemetryCh chan TelemetryRequest
telemetryCacheLock sync.Mutex
telemetryCache []TelemetryRequest
nextIdLock sync.Mutex
nextId int nextId int
sendPeriod time.Duration sendPeriod time.Duration
} }
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client { type TelemetryClientOption func(*Client)
return &Client{
func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption {
return func(c *Client) {
c.sendPeriod = sendPeriod
}
}
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client {
client := &Client{
serverURL: serverURL, serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute}, httpClient: &http.Client{Timeout: time.Minute},
logger: logger, logger: logger,
keyUID: keyUID, keyUID: keyUID,
nodeName: nodeName, nodeName: nodeName,
version: version, version: version,
receivedMessagesCh: make(chan ReceivedMessages),
receivedEnvelopeCh: make(chan *v2protocol.Envelope),
sentEnvelopeCh: make(chan wakuv2.SentEnvelope),
telemetryCh: make(chan TelemetryRequest), telemetryCh: make(chan TelemetryRequest),
telemetryCacheLock: sync.Mutex{},
telemetryCache: make([]TelemetryRequest, 0),
nextId: 0, nextId: 0,
sendPeriod: 10 * time.Second, nextIdLock: sync.Mutex{},
} sendPeriod: 10 * time.Second, // default value
} }
func (c *Client) CollectAndProcessTelemetry(ctx context.Context) { for _, opt := range opts {
opt(client)
}
return client
}
func (c *Client) Start(ctx context.Context) {
go func() { go func() {
for { for {
select { select {
case receivedMessages := <-c.receivedMessagesCh: case telemetryRequest := <-c.telemetryCh:
c.processAndPushTelemetry(receivedMessages) c.telemetryCacheLock.Lock()
case receivedEnvelope := <-c.receivedEnvelopeCh: c.telemetryCache = append(c.telemetryCache, telemetryRequest)
c.processAndPushTelemetry(receivedEnvelope) c.telemetryCacheLock.Unlock()
case sentEnvelope := <-c.sentEnvelopeCh:
c.processAndPushTelemetry(sentEnvelope)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
}() }()
}
func (c *Client) Start(ctx context.Context) {
go c.CollectAndProcessTelemetry(ctx)
go func() { go func() {
ticker := time.NewTicker(c.sendPeriod) ticker := time.NewTicker(c.sendPeriod)
defer ticker.Stop() defer ticker.Stop()
@ -111,16 +126,12 @@ func (c *Client) Start(ctx context.Context) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
var telemetryRequests []TelemetryRequest c.telemetryCacheLock.Lock()
collecting := true telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache))
for collecting { copy(telemetryRequests, c.telemetryCache)
select { c.telemetryCache = nil
case telemetryRequest := <-c.telemetryCh: c.telemetryCacheLock.Unlock()
telemetryRequests = append(telemetryRequests, telemetryRequest)
default:
collecting = false
}
}
if len(telemetryRequests) > 0 { if len(telemetryRequests) > 0 {
c.pushTelemetryRequest(telemetryRequests) c.pushTelemetryRequest(telemetryRequests)
} }
@ -128,6 +139,7 @@ func (c *Client) Start(ctx context.Context) {
return return
} }
} }
}() }()
} }
@ -152,13 +164,21 @@ func (c *Client) processAndPushTelemetry(data interface{}) {
TelemetryType: SentEnvelopeMetric, TelemetryType: SentEnvelopeMetric,
TelemetryData: c.ProcessSentEnvelope(v), TelemetryData: c.ProcessSentEnvelope(v),
} }
case wakuv2.ErrorSendingEnvelope:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: ErrorSendingEnvelopeMetric,
TelemetryData: c.ProcessErrorSendingEnvelope(v),
}
default: default:
c.logger.Error("Unknown telemetry data type") c.logger.Error("Unknown telemetry data type")
return return
} }
c.nextId++
c.telemetryCh <- telemetryRequest c.telemetryCh <- telemetryRequest
c.nextIdLock.Lock()
c.nextId++
c.nextIdLock.Unlock()
} }
func (c *Client) pushTelemetryRequest(request []TelemetryRequest) { func (c *Client) pushTelemetryRequest(request []TelemetryRequest) {
@ -224,6 +244,23 @@ func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.Raw
return &jsonRawMessage return &jsonRawMessage
} }
func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage {
postBody := map[string]interface{}{
"messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(),
"sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
"pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(),
"topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic,
"senderKeyUID": c.keyUID,
"nodeName": c.nodeName,
"publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(),
"statusVersion": c.version,
"error": errorSendingEnvelope.Error.Error(),
}
body, _ := json.Marshal(postBody)
jsonRawMessage := json.RawMessage(body)
return &jsonRawMessage
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL) url := fmt.Sprintf("%s/update-envelope", c.serverURL)

View File

@ -1,9 +1,12 @@
package telemetry package telemetry
import ( import (
"context"
"encoding/json" "encoding/json"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"sync"
"testing" "testing"
"time" "time"
@ -13,14 +16,22 @@ import (
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/stretchr/testify/require"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
) )
func createMockServer(t *testing.T) *httptest.Server { var (
testContentTopic = "/waku/1/0x12345679/rfc26"
)
func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done() // Signal that a request was received
if r.Method != "POST" { if r.Method != "POST" {
t.Errorf("Expected 'POST' request, got '%s'", r.Method) t.Errorf("Expected 'POST' request, got '%s'", r.Method)
} }
@ -37,34 +48,51 @@ func createMockServer(t *testing.T) *httptest.Server {
if len(received) != 1 { if len(received) != 1 {
t.Errorf("Unexpected data received: %+v", received) t.Errorf("Unexpected data received: %+v", received)
} else {
if received[0].TelemetryType != expectedType {
t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType)
} else { } else {
// If the data is as expected, respond with success // If the data is as expected, respond with success
t.Log("Responding with success") t.Log("Responding with success")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
}
})) }))
} }
func TestClient_ProcessReceivedMessages(t *testing.T) { func createClient(t *testing.T, mockServerURL string) *Client {
// Setup a mock server to handle post requests
mockServer := createMockServer(t)
defer mockServer.Close()
// Create a client with the mock server URL
config := zap.NewDevelopmentConfig() config := zap.NewDevelopmentConfig()
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, err := config.Build() logger, err := config.Build()
if err != nil { if err != nil {
t.Fatalf("Failed to create logger: %v", err) t.Fatalf("Failed to create logger: %v", err)
} }
client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(500*time.Millisecond))
}
func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(t *testing.T, client *Client, wg *sync.WaitGroup)) {
var wg sync.WaitGroup
wg.Add(1) // Expecting one request
mockServer := createMockServer(t, &wg, expectedType)
defer mockServer.Close()
client := createClient(t, mockServer.URL)
testFunc(t, client, &wg)
// Wait for the request to be received
wg.Wait()
}
func TestClient_ProcessReceivedMessages(t *testing.T) {
withMockServer(t, ReceivedMessagesMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
// Create a telemetry request to send // Create a telemetry request to send
data := ReceivedMessages{ data := ReceivedMessages{
Filter: transport.Filter{ Filter: transport.Filter{
ChatID: "testChat", ChatID: "testChat",
PubsubTopic: "testTopic", PubsubTopic: "testTopic",
ContentTopic: types.StringToTopic("testContentTopic"), ContentTopic: types.StringToTopic(testContentTopic),
}, },
SSHMessage: &types.Message{ SSHMessage: &types.Message{
Hash: []byte("hash"), Hash: []byte("hash"),
@ -88,26 +116,15 @@ func TestClient_ProcessReceivedMessages(t *testing.T) {
// Send the telemetry request // Send the telemetry request
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
})
} }
func TestClient_ProcessReceivedEnvelope(t *testing.T) { func TestClient_ProcessReceivedEnvelope(t *testing.T) {
// Setup a mock server to handle post requests withMockServer(t, ReceivedEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
mockServer := createMockServer(t)
defer mockServer.Close()
// Create a client with the mock server URL
config := zap.NewDevelopmentConfig()
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, err := config.Build()
if err != nil {
t.Fatalf("Failed to create logger: %v", err)
}
client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0")
// Create a telemetry request to send // Create a telemetry request to send
envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ envelope := v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: "testContentTopic", ContentTopic: testContentTopic,
Version: proto.Uint32(0), Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()), Timestamp: proto.Int64(time.Now().Unix()),
}, 0, "") }, 0, "")
@ -120,27 +137,16 @@ func TestClient_ProcessReceivedEnvelope(t *testing.T) {
// Send the telemetry request // Send the telemetry request
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
})
} }
func TestClient_ProcessSentEnvelope(t *testing.T) { func TestClient_ProcessSentEnvelope(t *testing.T) {
// Setup a mock server to handle post requests withMockServer(t, SentEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
mockServer := createMockServer(t)
defer mockServer.Close()
// Create a client with the mock server URL
config := zap.NewDevelopmentConfig()
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
logger, err := config.Build()
if err != nil {
t.Fatalf("Failed to create logger: %v", err)
}
client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0")
// Create a telemetry request to send // Create a telemetry request to send
sentEnvelope := wakuv2.SentEnvelope{ sentEnvelope := wakuv2.SentEnvelope{
Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: "testContentTopic", ContentTopic: testContentTopic,
Version: proto.Uint32(0), Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()), Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""), }, 0, ""),
@ -155,4 +161,54 @@ func TestClient_ProcessSentEnvelope(t *testing.T) {
// Send the telemetry request // Send the telemetry request
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
})
}
var (
testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im"
)
func TestTelemetryUponPublishError(t *testing.T) {
withMockServer(t, ErrorSendingEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) {
enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" {
enrTreeAddress = envEnrTreeAddress
}
wakuConfig := &wakuv2.Config{}
wakuConfig.Port = 0
wakuConfig.EnablePeerExchangeClient = true
wakuConfig.LightClient = true
wakuConfig.EnableDiscV5 = false
wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress}
wakuConfig.DiscoveryLimit = 20
wakuConfig.UseShardAsDefaultTopic = true
wakuConfig.ClusterID = 16
wakuConfig.WakuNodes = []string{enrTreeAddress}
wakuConfig.TelemetryServerURL = client.serverURL
wakuConfig.TelemetrySendPeriodMs = 500
w, err := wakuv2.New(nil, "", wakuConfig, nil, nil, nil, nil, nil)
require.NoError(t, err)
client.Start(context.Background())
w.SetStatusTelemetryClient(client)
// Setting this forces the publish function to fail when sending a message
w.SkipPublishToTopic(true)
err = w.Start()
require.NoError(t, err)
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: testContentTopic,
Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()),
}
// This should result in a single request sent by the telemetry client
_, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg)
require.NoError(t, err)
})
} }

View File

@ -61,6 +61,7 @@ type Config struct {
StoreCapacity int `toml:",omitempty"` StoreCapacity int `toml:",omitempty"`
StoreSeconds int `toml:",omitempty"` StoreSeconds int `toml:",omitempty"`
TelemetryServerURL string `toml:",omitempty"` TelemetryServerURL string `toml:",omitempty"`
TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not) DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
DefaultShardedPubsubTopics []string `toml:", omitempty"` DefaultShardedPubsubTopics []string `toml:", omitempty"`
UseShardAsDefaultTopic bool `toml:",omitempty"` UseShardAsDefaultTopic bool `toml:",omitempty"`

View File

@ -95,9 +95,15 @@ type SentEnvelope struct {
PublishMethod PublishMethod PublishMethod PublishMethod
} }
type ErrorSendingEnvelope struct {
Error error
SentEnvelope SentEnvelope
}
type ITelemetryClient interface { type ITelemetryClient interface {
PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope)
PushSentEnvelope(sentEnvelope SentEnvelope) PushSentEnvelope(sentEnvelope SentEnvelope)
PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope)
} }
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
@ -993,6 +999,7 @@ func (w *Waku) broadcast() {
var publishMethod PublishMethod var publishMethod PublishMethod
if w.cfg.SkipPublishToTopic { if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline // For now only used in testing to simulate going offline
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error { fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure") return errors.New("test send failure")
} }
@ -1020,11 +1027,9 @@ func (w *Waku) broadcast() {
err := sendFn(env, logger) err := sendFn(env, logger)
if err == nil { if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
} }
// else {
// TODO: send error from Relay or LightPush to Telemetry
// w.statusTelemetryClient.PushError(err)
// }
return err return err
} }
} }