From 59342332661317a2390efb69657b651f0a9d458d Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Fri, 28 Jun 2024 03:24:04 -0700 Subject: [PATCH] feat_: call telemetry upon error pushing envelope (#5430) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat_: call telemetry upon error pushing envelope * feat_: call telemetry upon error pushing envelope --------- Co-authored-by: Václav Pavlín --- cmd/status-cli/main.go | 7 + cmd/status-cli/message.go | 39 ++++- cmd/status-cli/simulate.go | 5 +- multiaccounts/settings/structs.go | 1 + protocol/messenger_config.go | 8 +- services/ext/service.go | 2 +- telemetry/client.go | 115 ++++++++++----- telemetry/client_test.go | 238 ++++++++++++++++++------------ wakuv2/config.go | 1 + wakuv2/waku.go | 13 +- 10 files changed, 288 insertions(+), 141 deletions(-) diff --git a/cmd/status-cli/main.go b/cmd/status-cli/main.go index 48a9b5490..f094031fd 100644 --- a/cmd/status-cli/main.go +++ b/cmd/status-cli/main.go @@ -22,6 +22,7 @@ const APIModulesFlag = "api-modules" const TelemetryServerURLFlag = "telemetry-server-url" const KeyUIDFlag = "key-uid" const DebugLevel = "debug" +const MessageFailureFlag = "fail" const RetrieveInterval = 300 * time.Millisecond const SendInterval = 1 * time.Second @@ -64,6 +65,12 @@ var SimulateFlags = append([]cli.Flag{ Value: 1, Usage: "How many messages to sent from each user", }, + &cli.BoolFlag{ + Name: MessageFailureFlag, + Aliases: []string{"f"}, + Usage: "Causes messages to fail about 25% of the time", + Value: false, + }, }, CommonFlags...) var ServeFlags = append([]cli.Flag{ diff --git a/cmd/status-cli/message.go b/cmd/status-cli/message.go index 005edff40..de695a9a9 100644 --- a/cmd/status-cli/message.go +++ b/cmd/status-cli/message.go @@ -3,7 +3,9 @@ package main import ( "bufio" "context" + "crypto/rand" "log/slog" + "math/big" "os" "strings" "sync" @@ -41,7 +43,30 @@ func (cli *StatusCLI) sendContactRequestAcceptance(ctx context.Context, msgID st return nil } -func (cli *StatusCLI) sendDirectMessage(ctx context.Context, text string) error { +func (cli *StatusCLI) randomFailure() func() { + nBig, err := rand.Int(rand.Reader, big.NewInt(100)) + if err != nil { + cli.logger.Error("failed to generate random number", "err", err) + return nil + } + n := nBig.Int64() + if n >= 40 { + return nil + } + + cli.backend.StatusNode().WakuV2Service().SkipPublishToTopic(true) + + return func() { + cli.backend.StatusNode().WakuV2Service().SkipPublishToTopic(false) + } +} + +func (cli *StatusCLI) sendDirectMessage(ctx context.Context, text string, options ...bool) error { + randomFailure := false + if len(options) > 0 { + randomFailure = options[0] + } + if len(cli.messenger.MutualContacts()) == 0 { return nil } @@ -58,8 +83,20 @@ func (cli *StatusCLI) sendDirectMessage(ctx context.Context, text string) error inputMessage.ContentType = protobuf.ChatMessage_TEXT_PLAIN inputMessage.Text = text + shouldFail := false + if randomFailure { + if postFailure := cli.randomFailure(); postFailure != nil { + defer postFailure() + shouldFail = true + } + } resp, err := cli.messenger.SendChatMessage(ctx, inputMessage) if err != nil { + if shouldFail { + cli.logger.Info("simulating message failure") + cli.logger.Error("error sending message", "err", err) + return nil + } return err } diff --git a/cmd/status-cli/simulate.go b/cmd/status-cli/simulate.go index b50d0c2ed..d7f703888 100644 --- a/cmd/status-cli/simulate.go +++ b/cmd/status-cli/simulate.go @@ -33,6 +33,7 @@ func simulate(cCtx *cli.Context) error { // Start messengers apiModules := cCtx.String(APIModulesFlag) telemetryUrl := cCtx.String(TelemetryServerURLFlag) + failMessages := cCtx.Bool(MessageFailureFlag) alice, err := start("Alice", 0, apiModules, telemetryUrl, "", logger.Named("alice")) if err != nil { @@ -76,13 +77,13 @@ func simulate(cCtx *cli.Context) error { interactiveSendMessageLoop(ctx, alice, charlie) } else { for i := 0; i < cCtx.Int(CountFlag); i++ { - err = alice.sendDirectMessage(ctx, fmt.Sprintf("message from alice, number: %d", i+1)) + err = alice.sendDirectMessage(ctx, fmt.Sprintf("message from alice, number: %d", i+1), failMessages) if err != nil { return err } time.Sleep(WaitingInterval) - err = charlie.sendDirectMessage(ctx, fmt.Sprintf("message from charlie, number: %d", i+1)) + err = charlie.sendDirectMessage(ctx, fmt.Sprintf("message from charlie, number: %d", i+1), failMessages) if err != nil { return err } diff --git a/multiaccounts/settings/structs.go b/multiaccounts/settings/structs.go index 81e306def..916f13975 100644 --- a/multiaccounts/settings/structs.go +++ b/multiaccounts/settings/structs.go @@ -208,6 +208,7 @@ type Settings struct { GifFavorites *json.RawMessage `json:"gifs/favorite-gifs"` OpenseaEnabled bool `json:"opensea-enabled?,omitempty"` TelemetryServerURL string `json:"telemetry-server-url,omitempty"` + TelemetrySendPeriodMs int `json:"telemetry-send-period-ms,omitempty"` LastBackup uint64 `json:"last-backup,omitempty"` BackupEnabled bool `json:"backup-enabled?,omitempty"` AutoMessageEnabled bool `json:"auto-message-enabled?,omitempty"` diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index 6763daeb6..d98d9edea 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -112,8 +112,9 @@ type config struct { messengerSignalsHandler MessengerSignalsHandler - telemetryServerURL string - wakuService *wakuv2.Waku + telemetryServerURL string + telemetrySendPeriod time.Duration + wakuService *wakuv2.Waku messageResendMinDelay time.Duration messageResendMaxCount int @@ -258,9 +259,10 @@ func WithAnonMetricsServerConfig(anonMetricsServerConfig *anonmetrics.ServerConf } } -func WithTelemetry(serverURL string) Option { +func WithTelemetry(serverURL string, sendPeriod time.Duration) Option { return func(c *config) error { c.telemetryServerURL = serverURL + c.telemetrySendPeriod = sendPeriod return nil } } diff --git a/services/ext/service.go b/services/ext/service.go index fcb364c68..95d1b49c7 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -464,7 +464,7 @@ func buildMessengerOptions( } if settings.TelemetryServerURL != "" { - options = append(options, protocol.WithTelemetry(settings.TelemetryServerURL)) + options = append(options, protocol.WithTelemetry(settings.TelemetryServerURL, time.Duration(settings.TelemetrySendPeriodMs)*time.Millisecond)) } if settings.PushNotificationsServerEnabled { diff --git a/telemetry/client.go b/telemetry/client.go index 155f140a1..dae442ef5 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "net/http" + "sync" "time" "go.uber.org/zap" @@ -22,11 +23,12 @@ import ( type TelemetryType string const ( - ProtocolStatsMetric TelemetryType = "ProtocolStats" - ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" - SentEnvelopeMetric TelemetryType = "SentEnvelope" - UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" - ReceivedMessagesMetric TelemetryType = "ReceivedMessages" + ProtocolStatsMetric TelemetryType = "ProtocolStats" + ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" + ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" ) type TelemetryRequest struct { @@ -36,15 +38,19 @@ type TelemetryRequest struct { } func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { - c.receivedMessagesCh <- receivedMessages + c.processAndPushTelemetry(receivedMessages) } func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { - c.sentEnvelopeCh <- sentEnvelope + c.processAndPushTelemetry(sentEnvelope) } func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) { - c.receivedEnvelopeCh <- receivedEnvelope + c.processAndPushTelemetry(receivedEnvelope) +} + +func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { + c.processAndPushTelemetry(errorSendingEnvelope) } type ReceivedMessages struct { @@ -60,50 +66,59 @@ type Client struct { keyUID string nodeName string version string - receivedMessagesCh chan ReceivedMessages - receivedEnvelopeCh chan *v2protocol.Envelope - sentEnvelopeCh chan wakuv2.SentEnvelope telemetryCh chan TelemetryRequest + telemetryCacheLock sync.Mutex + telemetryCache []TelemetryRequest + nextIdLock sync.Mutex nextId int sendPeriod time.Duration } -func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client { - return &Client{ +type TelemetryClientOption func(*Client) + +func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption { + return func(c *Client) { + c.sendPeriod = sendPeriod + } +} + +func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { + client := &Client{ serverURL: serverURL, httpClient: &http.Client{Timeout: time.Minute}, logger: logger, keyUID: keyUID, nodeName: nodeName, version: version, - receivedMessagesCh: make(chan ReceivedMessages), - receivedEnvelopeCh: make(chan *v2protocol.Envelope), - sentEnvelopeCh: make(chan wakuv2.SentEnvelope), telemetryCh: make(chan TelemetryRequest), + telemetryCacheLock: sync.Mutex{}, + telemetryCache: make([]TelemetryRequest, 0), nextId: 0, - sendPeriod: 10 * time.Second, + nextIdLock: sync.Mutex{}, + sendPeriod: 10 * time.Second, // default value } + + for _, opt := range opts { + opt(client) + } + + return client } -func (c *Client) CollectAndProcessTelemetry(ctx context.Context) { +func (c *Client) Start(ctx context.Context) { go func() { + for { select { - case receivedMessages := <-c.receivedMessagesCh: - c.processAndPushTelemetry(receivedMessages) - case receivedEnvelope := <-c.receivedEnvelopeCh: - c.processAndPushTelemetry(receivedEnvelope) - case sentEnvelope := <-c.sentEnvelopeCh: - c.processAndPushTelemetry(sentEnvelope) + case telemetryRequest := <-c.telemetryCh: + c.telemetryCacheLock.Lock() + c.telemetryCache = append(c.telemetryCache, telemetryRequest) + c.telemetryCacheLock.Unlock() case <-ctx.Done(): return } } }() -} - -func (c *Client) Start(ctx context.Context) { - go c.CollectAndProcessTelemetry(ctx) go func() { ticker := time.NewTicker(c.sendPeriod) defer ticker.Stop() @@ -111,16 +126,12 @@ func (c *Client) Start(ctx context.Context) { for { select { case <-ticker.C: - var telemetryRequests []TelemetryRequest - collecting := true - for collecting { - select { - case telemetryRequest := <-c.telemetryCh: - telemetryRequests = append(telemetryRequests, telemetryRequest) - default: - collecting = false - } - } + c.telemetryCacheLock.Lock() + telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache)) + copy(telemetryRequests, c.telemetryCache) + c.telemetryCache = nil + c.telemetryCacheLock.Unlock() + if len(telemetryRequests) > 0 { c.pushTelemetryRequest(telemetryRequests) } @@ -128,6 +139,7 @@ func (c *Client) Start(ctx context.Context) { return } } + }() } @@ -152,13 +164,21 @@ func (c *Client) processAndPushTelemetry(data interface{}) { TelemetryType: SentEnvelopeMetric, TelemetryData: c.ProcessSentEnvelope(v), } + case wakuv2.ErrorSendingEnvelope: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: ErrorSendingEnvelopeMetric, + TelemetryData: c.ProcessErrorSendingEnvelope(v), + } default: c.logger.Error("Unknown telemetry data type") return } - c.nextId++ c.telemetryCh <- telemetryRequest + c.nextIdLock.Lock() + c.nextId++ + c.nextIdLock.Unlock() } func (c *Client) pushTelemetryRequest(request []TelemetryRequest) { @@ -224,6 +244,23 @@ func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.Raw return &jsonRawMessage } +func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage { + postBody := map[string]interface{}{ + "messageHash": errorSendingEnvelope.SentEnvelope.Envelope.Hash().String(), + "sentAt": uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), + "pubsubTopic": errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), + "topic": errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, + "senderKeyUID": c.keyUID, + "nodeName": c.nodeName, + "publishMethod": errorSendingEnvelope.SentEnvelope.PublishMethod.String(), + "statusVersion": c.version, + "error": errorSendingEnvelope.Error.Error(), + } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) url := fmt.Sprintf("%s/update-envelope", c.serverURL) diff --git a/telemetry/client_test.go b/telemetry/client_test.go index 6e2c13ee3..c4afcf993 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -1,9 +1,12 @@ package telemetry import ( + "context" "encoding/json" "net/http" "net/http/httptest" + "os" + "sync" "testing" "time" @@ -13,14 +16,22 @@ import ( v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/stretchr/testify/require" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/wakuv2" ) -func createMockServer(t *testing.T) *httptest.Server { +var ( + testContentTopic = "/waku/1/0x12345679/rfc26" +) + +func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer wg.Done() // Signal that a request was received + if r.Method != "POST" { t.Errorf("Expected 'POST' request, got '%s'", r.Method) } @@ -38,121 +49,166 @@ func createMockServer(t *testing.T) *httptest.Server { if len(received) != 1 { t.Errorf("Unexpected data received: %+v", received) } else { - // If the data is as expected, respond with success - t.Log("Responding with success") - w.WriteHeader(http.StatusOK) + if received[0].TelemetryType != expectedType { + t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType) + } else { + // If the data is as expected, respond with success + t.Log("Responding with success") + w.WriteHeader(http.StatusOK) + } } })) } -func TestClient_ProcessReceivedMessages(t *testing.T) { - // Setup a mock server to handle post requests - mockServer := createMockServer(t) - defer mockServer.Close() - - // Create a client with the mock server URL +func createClient(t *testing.T, mockServerURL string) *Client { config := zap.NewDevelopmentConfig() config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) logger, err := config.Build() if err != nil { t.Fatalf("Failed to create logger: %v", err) } - client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(500*time.Millisecond)) +} - // Create a telemetry request to send - data := ReceivedMessages{ - Filter: transport.Filter{ - ChatID: "testChat", - PubsubTopic: "testTopic", - ContentTopic: types.StringToTopic("testContentTopic"), - }, - SSHMessage: &types.Message{ - Hash: []byte("hash"), - Timestamp: uint32(time.Now().Unix()), - }, - Messages: []*v1protocol.StatusMessage{ - { - ApplicationLayer: v1protocol.ApplicationLayer{ - ID: types.HexBytes("123"), - Type: 1, +func withMockServer(t *testing.T, expectedType TelemetryType, testFunc func(t *testing.T, client *Client, wg *sync.WaitGroup)) { + var wg sync.WaitGroup + wg.Add(1) // Expecting one request + + mockServer := createMockServer(t, &wg, expectedType) + defer mockServer.Close() + + client := createClient(t, mockServer.URL) + + testFunc(t, client, &wg) + + // Wait for the request to be received + wg.Wait() +} + +func TestClient_ProcessReceivedMessages(t *testing.T) { + withMockServer(t, ReceivedMessagesMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + // Create a telemetry request to send + data := ReceivedMessages{ + Filter: transport.Filter{ + ChatID: "testChat", + PubsubTopic: "testTopic", + ContentTopic: types.StringToTopic(testContentTopic), + }, + SSHMessage: &types.Message{ + Hash: []byte("hash"), + Timestamp: uint32(time.Now().Unix()), + }, + Messages: []*v1protocol.StatusMessage{ + { + ApplicationLayer: v1protocol.ApplicationLayer{ + ID: types.HexBytes("123"), + Type: 1, + }, }, }, - }, - } - telemetryData := client.ProcessReceivedMessages(data) - telemetryRequest := TelemetryRequest{ - Id: 1, - TelemetryType: ReceivedMessagesMetric, - TelemetryData: telemetryData, - } + } + telemetryData := client.ProcessReceivedMessages(data) + telemetryRequest := TelemetryRequest{ + Id: 1, + TelemetryType: ReceivedMessagesMetric, + TelemetryData: telemetryData, + } - // Send the telemetry request - client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + }) } func TestClient_ProcessReceivedEnvelope(t *testing.T) { - // Setup a mock server to handle post requests - mockServer := createMockServer(t) - defer mockServer.Close() + withMockServer(t, ReceivedEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + // Create a telemetry request to send + envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: testContentTopic, + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, "") + telemetryData := client.ProcessReceivedEnvelope(envelope) + telemetryRequest := TelemetryRequest{ + Id: 2, + TelemetryType: ReceivedEnvelopeMetric, + TelemetryData: telemetryData, + } - // Create a client with the mock server URL - config := zap.NewDevelopmentConfig() - config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - logger, err := config.Build() - if err != nil { - t.Fatalf("Failed to create logger: %v", err) - } - client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") - - // Create a telemetry request to send - envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: "testContentTopic", - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().Unix()), - }, 0, "") - telemetryData := client.ProcessReceivedEnvelope(envelope) - telemetryRequest := TelemetryRequest{ - Id: 2, - TelemetryType: ReceivedEnvelopeMetric, - TelemetryData: telemetryData, - } - - // Send the telemetry request - client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + }) } func TestClient_ProcessSentEnvelope(t *testing.T) { - // Setup a mock server to handle post requests - mockServer := createMockServer(t) - defer mockServer.Close() + withMockServer(t, SentEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + // Create a telemetry request to send + sentEnvelope := wakuv2.SentEnvelope{ + Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: testContentTopic, + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, ""), + PublishMethod: wakuv2.LightPush, + } + telemetryData := client.ProcessSentEnvelope(sentEnvelope) + telemetryRequest := TelemetryRequest{ + Id: 3, + TelemetryType: SentEnvelopeMetric, + TelemetryData: telemetryData, + } - // Create a client with the mock server URL - config := zap.NewDevelopmentConfig() - config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - logger, err := config.Build() - if err != nil { - t.Fatalf("Failed to create logger: %v", err) - } - client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + }) +} - // Create a telemetry request to send - sentEnvelope := wakuv2.SentEnvelope{ - Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ +var ( + testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im" +) + +func TestTelemetryUponPublishError(t *testing.T) { + withMockServer(t, ErrorSendingEnvelopeMetric, func(t *testing.T, client *Client, wg *sync.WaitGroup) { + enrTreeAddress := testENRBootstrap + envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") + if envEnrTreeAddress != "" { + enrTreeAddress = envEnrTreeAddress + } + + wakuConfig := &wakuv2.Config{} + wakuConfig.Port = 0 + wakuConfig.EnablePeerExchangeClient = true + wakuConfig.LightClient = true + wakuConfig.EnableDiscV5 = false + wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress} + wakuConfig.DiscoveryLimit = 20 + wakuConfig.UseShardAsDefaultTopic = true + wakuConfig.ClusterID = 16 + wakuConfig.WakuNodes = []string{enrTreeAddress} + wakuConfig.TelemetryServerURL = client.serverURL + wakuConfig.TelemetrySendPeriodMs = 500 + w, err := wakuv2.New(nil, "", wakuConfig, nil, nil, nil, nil, nil) + require.NoError(t, err) + + client.Start(context.Background()) + w.SetStatusTelemetryClient(client) + + // Setting this forces the publish function to fail when sending a message + w.SkipPublishToTopic(true) + + err = w.Start() + require.NoError(t, err) + + msg := &pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: "testContentTopic", + ContentTopic: testContentTopic, Version: proto.Uint32(0), Timestamp: proto.Int64(time.Now().Unix()), - }, 0, ""), - PublishMethod: wakuv2.LightPush, - } - telemetryData := client.ProcessSentEnvelope(sentEnvelope) - telemetryRequest := TelemetryRequest{ - Id: 3, - TelemetryType: SentEnvelopeMetric, - TelemetryData: telemetryData, - } + } - // Send the telemetry request - client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) + // This should result in a single request sent by the telemetry client + _, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg) + require.NoError(t, err) + }) } diff --git a/wakuv2/config.go b/wakuv2/config.go index e84adfaa7..9785bad66 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -61,6 +61,7 @@ type Config struct { StoreCapacity int `toml:",omitempty"` StoreSeconds int `toml:",omitempty"` TelemetryServerURL string `toml:",omitempty"` + TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not) DefaultShardedPubsubTopics []string `toml:", omitempty"` UseShardAsDefaultTopic bool `toml:",omitempty"` diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 99c38f047..cf2cd1dc1 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -95,9 +95,15 @@ type SentEnvelope struct { PublishMethod PublishMethod } +type ErrorSendingEnvelope struct { + Error error + SentEnvelope SentEnvelope +} + type ITelemetryClient interface { PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) PushSentEnvelope(sentEnvelope SentEnvelope) + PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) } // Waku represents a dark communication interface through the Ethereum @@ -993,6 +999,7 @@ func (w *Waku) broadcast() { var publishMethod PublishMethod if w.cfg.SkipPublishToTopic { // For now only used in testing to simulate going offline + publishMethod = LightPush fn = func(env *protocol.Envelope, logger *zap.Logger) error { return errors.New("test send failure") } @@ -1020,11 +1027,9 @@ func (w *Waku) broadcast() { err := sendFn(env, logger) if err == nil { w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) + } else { + w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) } - // else { - // TODO: send error from Relay or LightPush to Telemetry - // w.statusTelemetryClient.PushError(err) - // } return err } }