feat: Send envelopes to telemetry service

Issue #12430
This commit is contained in:
Michal Iskierko 2023-10-30 15:51:57 +01:00 committed by Michał Iskierko
parent 752d1a47f6
commit 02e4cc6e1f
7 changed files with 79 additions and 5 deletions

View File

@ -2299,7 +2299,7 @@ func (b *GethStatusBackend) injectAccountsIntoWakuService(w types.WakuKeyManager
} }
if st != nil { if st != nil {
if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.walletDB, b.statusNode.HTTPServer(), b.multiaccountsDB, acc, b.accountManager, b.statusNode.RPCClient(), b.statusNode.WalletService(), b.statusNode.CommunityTokensService(), logutils.ZapLogger()); err != nil { if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.walletDB, b.statusNode.HTTPServer(), b.multiaccountsDB, acc, b.accountManager, b.statusNode.RPCClient(), b.statusNode.WalletService(), b.statusNode.CommunityTokensService(), b.statusNode.WakuV2Service(), logutils.ZapLogger()); err != nil {
return err return err
} }
// Set initial connection state // Set initial connection state

View File

@ -412,6 +412,9 @@ func NewMessenger(
var telemetryClient *telemetry.Client var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" { if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName) telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName)
if c.wakuService != nil {
c.wakuService.SetStatusTelemetryClient(telemetryClient)
}
} }
// Initialize push notification server // Initialize push notification server
@ -3625,6 +3628,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
statusMessages, acks, err := m.sender.HandleMessages(shhMessage) statusMessages, acks, err := m.sender.HandleMessages(shhMessage)
if err != nil { if err != nil {
if m.telemetryClient != nil {
go m.telemetryClient.UpdateEnvelopeProcessingError(shhMessage, err)
}
logger.Info("failed to decode messages", zap.Error(err)) logger.Info("failed to decode messages", zap.Error(err))
continue continue
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/status-im/status-go/server" "github.com/status-im/status-go/server"
"github.com/status-im/status-go/services/browsers" "github.com/status-im/status-go/services/browsers"
"github.com/status-im/status-go/services/communitytokens" "github.com/status-im/status-go/services/communitytokens"
"github.com/status-im/status-go/wakuv2"
"go.uber.org/zap" "go.uber.org/zap"
@ -112,6 +113,7 @@ type config struct {
messengerSignalsHandler MessengerSignalsHandler messengerSignalsHandler MessengerSignalsHandler
telemetryServerURL string telemetryServerURL string
wakuService *wakuv2.Waku
} }
type Option func(*config) error type Option func(*config) error
@ -346,6 +348,13 @@ func WithCommunityTokensService(s communitytokens.ServiceInterface) Option {
} }
} }
func WithWakuService(s *wakuv2.Waku) Option {
return func(c *config) error {
c.wakuService = s
return nil
}
}
func WithTokenManager(tokenManager communities.TokenManager) Option { func WithTokenManager(tokenManager communities.TokenManager) Option {
return func(c *config) error { return func(c *config) error {
c.tokenManager = tokenManager c.tokenManager = tokenManager

View File

@ -55,6 +55,7 @@ import (
"github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/services/wallet"
w_common "github.com/status-im/status-go/services/wallet/common" w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/wakuv2"
) )
const infinityString = "∞" const infinityString = "∞"
@ -121,7 +122,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) {
return enode.ParseV4(rawURL) return enode.ParseV4(rawURL)
} }
func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appDb, walletDb *sql.DB, httpServer *server.MediaServer, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, accountManager *account.GethManager, rpcClient *rpc.Client, walletService *wallet.Service, communityTokensService *communitytokens.Service, logger *zap.Logger) error { func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appDb, walletDb *sql.DB, httpServer *server.MediaServer, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, accountManager *account.GethManager, rpcClient *rpc.Client, walletService *wallet.Service, communityTokensService *communitytokens.Service, wakuService *wakuv2.Waku, logger *zap.Logger) error {
var err error var err error
if !s.config.ShhextConfig.PFSEnabled { if !s.config.ShhextConfig.PFSEnabled {
return nil return nil
@ -160,7 +161,7 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appD
s.multiAccountsDB = multiAccountDb s.multiAccountsDB = multiAccountDb
s.account = acc s.account = acc
options, err := buildMessengerOptions(s.config, identity, appDb, walletDb, httpServer, s.rpcClient, s.multiAccountsDB, acc, envelopesMonitorConfig, s.accountsDB, walletService, communityTokensService, logger, &MessengerSignalsHandler{}) options, err := buildMessengerOptions(s.config, identity, appDb, walletDb, httpServer, s.rpcClient, s.multiAccountsDB, acc, envelopesMonitorConfig, s.accountsDB, walletService, communityTokensService, wakuService, logger, &MessengerSignalsHandler{})
if err != nil { if err != nil {
return err return err
} }
@ -419,6 +420,7 @@ func buildMessengerOptions(
accountsDB *accounts.Database, accountsDB *accounts.Database,
walletService *wallet.Service, walletService *wallet.Service,
communityTokensService *communitytokens.Service, communityTokensService *communitytokens.Service,
wakuService *wakuv2.Waku,
logger *zap.Logger, logger *zap.Logger,
messengerSignalsHandler protocol.MessengerSignalsHandler, messengerSignalsHandler protocol.MessengerSignalsHandler,
) ([]protocol.Option, error) { ) ([]protocol.Option, error) {
@ -442,6 +444,7 @@ func buildMessengerOptions(
protocol.WithWalletConfig(&config.WalletConfig), protocol.WithWalletConfig(&config.WalletConfig),
protocol.WithWalletService(walletService), protocol.WithWalletService(walletService),
protocol.WithCommunityTokensService(communityTokensService), protocol.WithCommunityTokensService(communityTokensService),
protocol.WithWakuService(wakuService),
} }
if config.ShhextConfig.DataSyncEnabled { if config.ShhextConfig.DataSyncEnabled {

View File

@ -136,7 +136,7 @@ func TestInitProtocol(t *testing.T) {
defer func() { require.NoError(t, cleanupWalletDB()) }() defer func() { require.NoError(t, cleanupWalletDB()) }()
require.NoError(t, err) require.NoError(t, err)
err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, zap.NewNop()) err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop())
require.NoError(t, err) require.NoError(t, err)
} }
@ -207,7 +207,7 @@ func (s *ShhExtSuite) createAndAddNode() {
walletDB, err := helpers.SetupTestMemorySQLDB(&walletdatabase.DbInitializer{}) walletDB, err := helpers.SetupTestMemorySQLDB(&walletdatabase.DbInitializer{})
s.Require().NoError(err) s.Require().NoError(err)
err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, zap.NewNop()) err = service.InitProtocol("Test", privateKey, appDB, walletDB, nil, multiAccounts, acc, nil, nil, nil, nil, nil, zap.NewNop())
s.NoError(err) s.NoError(err)
stack.RegisterLifecycle(service) stack.RegisterLifecycle(service)

View File

@ -12,6 +12,8 @@ import (
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
) )
type Client struct { type Client struct {
@ -56,3 +58,43 @@ func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types
c.logger.Error("Error sending message to telemetry server", zap.Error(err)) c.logger.Error("Error sending message to telemetry server", zap.Error(err))
} }
} }
func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) {
url := fmt.Sprintf("%s/received-envelope", c.serverURL)
postBody := map[string]interface{}{
"messageHash": types.EncodeHex(envelope.Hash()),
"sentAt": uint32(envelope.Message().Timestamp / int64(time.Second)),
"pubsubTopic": envelope.PubsubTopic(),
"topic": envelope.Message().ContentTopic,
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending envelope to telemetry server", zap.Error(err))
}
}
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash)))
url := fmt.Sprintf("%s/update-envelope", c.serverURL)
var errorString = ""
if processingError != nil {
errorString = processingError.Error()
}
postBody := map[string]interface{}{
"messageHash": types.EncodeHex(shhMessage.Hash),
"sentAt": shhMessage.Timestamp,
"pubsubTopic": shhMessage.PubsubTopic,
"topic": shhMessage.Topic,
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,
"processingError": errorString,
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err))
}
}

View File

@ -90,6 +90,10 @@ type settings struct {
Options []node.WakuNodeOption Options []node.WakuNodeOption
} }
type ITelemetryClient interface {
PushReceivedEnvelope(*protocol.Envelope)
}
// Waku represents a dark communication interface through the Ethereum // Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer. // network, using its very own P2P communication layer.
type Waku struct { type Waku struct {
@ -155,6 +159,12 @@ type Waku struct {
onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onHistoricMessagesRequestFailed func([]byte, peer.ID, error)
onPeerStats func(types.ConnStatus) onPeerStats func(types.ConnStatus)
statusTelemetryClient ITelemetryClient
}
func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) {
w.statusTelemetryClient = client
} }
func getUsableUDPPort() (int, error) { func getUsableUDPPort() (int, error) {
@ -1290,6 +1300,10 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
return nil return nil
} }
if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushReceivedEnvelope(envelope)
}
logger := w.logger.With(zap.String("hash", recvMessage.Hash().Hex())) logger := w.logger.With(zap.String("hash", recvMessage.Hash().Hex()))
logger.Debug("received new envelope") logger.Debug("received new envelope")