diff --git a/api/geth_backend.go b/api/geth_backend.go index bf3d33b17..c29477955 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -2299,7 +2299,7 @@ func (b *GethStatusBackend) injectAccountsIntoWakuService(w types.WakuKeyManager } if st != nil { - if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.walletDB, b.statusNode.HTTPServer(), b.multiaccountsDB, acc, b.accountManager, b.statusNode.RPCClient(), b.statusNode.WalletService(), b.statusNode.CommunityTokensService(), logutils.ZapLogger()); err != nil { + if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.walletDB, b.statusNode.HTTPServer(), b.multiaccountsDB, acc, b.accountManager, b.statusNode.RPCClient(), b.statusNode.WalletService(), b.statusNode.CommunityTokensService(), b.statusNode.WakuV2Service(), logutils.ZapLogger()); err != nil { return err } // Set initial connection state diff --git a/protocol/messenger.go b/protocol/messenger.go index efd097255..ef1e3ce1e 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -412,6 +412,9 @@ func NewMessenger( var telemetryClient *telemetry.Client if c.telemetryServerURL != "" { telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName) + if c.wakuService != nil { + c.wakuService.SetStatusTelemetryClient(telemetryClient) + } } // Initialize push notification server @@ -3625,6 +3628,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte statusMessages, acks, err := m.sender.HandleMessages(shhMessage) if err != nil { + if m.telemetryClient != nil { + go m.telemetryClient.UpdateEnvelopeProcessingError(shhMessage, err) + } logger.Info("failed to decode messages", zap.Error(err)) continue } diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index e63e6aee8..6e9919476 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -8,6 +8,7 @@ import ( "github.com/status-im/status-go/server" "github.com/status-im/status-go/services/browsers" "github.com/status-im/status-go/services/communitytokens" + "github.com/status-im/status-go/wakuv2" "go.uber.org/zap" @@ -112,6 +113,7 @@ type config struct { messengerSignalsHandler MessengerSignalsHandler telemetryServerURL string + wakuService *wakuv2.Waku } type Option func(*config) error @@ -346,6 +348,13 @@ func WithCommunityTokensService(s communitytokens.ServiceInterface) Option { } } +func WithWakuService(s *wakuv2.Waku) Option { + return func(c *config) error { + c.wakuService = s + return nil + } +} + func WithTokenManager(tokenManager communities.TokenManager) Option { return func(c *config) error { c.tokenManager = tokenManager diff --git a/services/ext/service.go b/services/ext/service.go index 238f2cec4..374df01a2 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -55,6 +55,7 @@ import ( "github.com/status-im/status-go/services/wallet" w_common "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/thirdparty" + "github.com/status-im/status-go/wakuv2" ) const infinityString = "∞" @@ -121,7 +122,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) { return enode.ParseV4(rawURL) } -func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appDb, walletDb *sql.DB, httpServer *server.MediaServer, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, accountManager *account.GethManager, rpcClient *rpc.Client, walletService *wallet.Service, communityTokensService *communitytokens.Service, logger *zap.Logger) error { +func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appDb, walletDb *sql.DB, httpServer *server.MediaServer, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, accountManager *account.GethManager, rpcClient *rpc.Client, walletService *wallet.Service, communityTokensService *communitytokens.Service, wakuService *wakuv2.Waku, logger *zap.Logger) error { var err error if !s.config.ShhextConfig.PFSEnabled { return nil @@ -160,7 +161,7 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appD s.multiAccountsDB = multiAccountDb s.account = acc - options, err := buildMessengerOptions(s.config, identity, appDb, walletDb, httpServer, s.rpcClient, s.multiAccountsDB, acc, envelopesMonitorConfig, s.accountsDB, walletService, communityTokensService, logger, &MessengerSignalsHandler{}) + options, err := buildMessengerOptions(s.config, identity, appDb, walletDb, httpServer, s.rpcClient, s.multiAccountsDB, acc, envelopesMonitorConfig, s.accountsDB, walletService, communityTokensService, wakuService, logger, &MessengerSignalsHandler{}) if err != nil { return err } @@ -419,6 +420,7 @@ func buildMessengerOptions( accountsDB *accounts.Database, walletService *wallet.Service, communityTokensService *communitytokens.Service, + wakuService *wakuv2.Waku, logger *zap.Logger, messengerSignalsHandler protocol.MessengerSignalsHandler, ) ([]protocol.Option, error) { @@ -442,6 +444,7 @@ func buildMessengerOptions( protocol.WithWalletConfig(&config.WalletConfig), protocol.WithWalletService(walletService), protocol.WithCommunityTokensService(communityTokensService), + protocol.WithWakuService(wakuService), } if config.ShhextConfig.DataSyncEnabled { diff --git a/services/wakuext/api_test.go b/services/wakuext/api_test.go index e76d0354d..d20ac095d 100644 --- a/services/wakuext/api_test.go +++ b/services/wakuext/api_test.go @@ -136,7 +136,7 @@ func TestInitProtocol(t *testing.T) { defer func() { require.NoError(t, cleanupWalletDB()) }() require.NoError(t, err) - err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, zap.NewNop()) + err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop()) require.NoError(t, err) } @@ -207,7 +207,7 @@ func (s *ShhExtSuite) createAndAddNode() { walletDB, err := helpers.SetupTestMemorySQLDB(&walletdatabase.DbInitializer{}) s.Require().NoError(err) - err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, zap.NewNop()) + err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop()) s.NoError(err) stack.RegisterLifecycle(service) diff --git a/telemetry/client.go b/telemetry/client.go index c934d8f14..c06b61a90 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -12,6 +12,8 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" + + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) type Client struct { @@ -56,3 +58,43 @@ func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types c.logger.Error("Error sending message to telemetry server", zap.Error(err)) } } + +func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) { + url := fmt.Sprintf("%s/received-envelope", c.serverURL) + postBody := map[string]interface{}{ + "messageHash": types.EncodeHex(envelope.Hash()), + "sentAt": uint32(envelope.Message().Timestamp / int64(time.Second)), + "pubsubTopic": envelope.PubsubTopic(), + "topic": envelope.Message().ContentTopic, + "receiverKeyUID": c.keyUID, + "nodeName": c.nodeName, + } + body, _ := json.Marshal(postBody) + _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) + if err != nil { + c.logger.Error("Error sending envelope to telemetry server", zap.Error(err)) + } +} + +func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { + c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) + url := fmt.Sprintf("%s/update-envelope", c.serverURL) + var errorString = "" + if processingError != nil { + errorString = processingError.Error() + } + postBody := map[string]interface{}{ + "messageHash": types.EncodeHex(shhMessage.Hash), + "sentAt": shhMessage.Timestamp, + "pubsubTopic": shhMessage.PubsubTopic, + "topic": shhMessage.Topic, + "receiverKeyUID": c.keyUID, + "nodeName": c.nodeName, + "processingError": errorString, + } + body, _ := json.Marshal(postBody) + _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) + if err != nil { + c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err)) + } +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index f16974cb7..be09fd390 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -90,6 +90,10 @@ type settings struct { Options []node.WakuNodeOption } +type ITelemetryClient interface { + PushReceivedEnvelope(*protocol.Envelope) +} + // Waku represents a dark communication interface through the Ethereum // network, using its very own P2P communication layer. type Waku struct { @@ -155,6 +159,12 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) + + statusTelemetryClient ITelemetryClient +} + +func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { + w.statusTelemetryClient = client } func getUsableUDPPort() (int, error) { @@ -1290,6 +1300,10 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } + if w.statusTelemetryClient != nil { + w.statusTelemetryClient.PushReceivedEnvelope(envelope) + } + logger := w.logger.With(zap.String("hash", recvMessage.Hash().Hex())) logger.Debug("received new envelope")