feat: Push messages to telemetry server when receiving messages

This commit is contained in:
Anthony Laibe 2021-11-03 13:38:37 +01:00
parent 7440c184c0
commit c95c29bba5
13 changed files with 412 additions and 368 deletions

View File

@ -1 +1 @@
0.89.18 0.89.19

View File

@ -1116,7 +1116,7 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error {
st := b.statusNode.WakuExtService() st := b.statusNode.WakuExtService()
if st != nil { if st != nil {
if err := st.InitProtocol(identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil { if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil {
return err return err
} }
// Set initial connection state // Set initial connection state
@ -1141,7 +1141,7 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error {
} }
st := b.statusNode.WakuV2ExtService() st := b.statusNode.WakuV2ExtService()
if err := st.InitProtocol(identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil { if err := st.InitProtocol(b.statusNode.GethNode().Config().Name, identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil {
return err return err
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
ALTER TABLE settings ADD COLUMN telemetry_server_url VARCHAR NOT NULL DEFAULT "";
UPDATE settings SET telemetry_server_url = "";

View File

@ -211,7 +211,13 @@ func main() {
protocol.WithDatabase(db), protocol.WithDatabase(db),
} }
messenger, err := protocol.NewMessenger(identity, gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), installationID.String(), options...) messenger, err := protocol.NewMessenger(
config.Name,
identity,
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
installationID.String(),
options...,
)
if err != nil { if err != nil {
logger.Error("failed to create messenger", "error", err) logger.Error("failed to create messenger", "error", err)
return return

View File

@ -136,6 +136,7 @@ type Settings struct {
GifRecents *json.RawMessage `json:"gifs/recent-gifs"` GifRecents *json.RawMessage `json:"gifs/recent-gifs"`
GifFavorites *json.RawMessage `json:"gifs/favorite-gifs"` GifFavorites *json.RawMessage `json:"gifs/favorite-gifs"`
OpenseaEnabled bool `json:"opensea-enabled?,omitempty"` OpenseaEnabled bool `json:"opensea-enabled?,omitempty"`
TelemetryServerURL string `json:"telemetry-server-url,omitempty"`
} }
func NewDB(db *sql.DB) *Database { func NewDB(db *sql.DB) *Database {
@ -427,6 +428,8 @@ func (db *Database) SaveSetting(setting string, value interface{}) error {
return ErrInvalidConfig return ErrInvalidConfig
} }
update, err = db.db.Prepare("UPDATE settings SET opensea_enabled = ? WHERE synthetic_id = 'id'") update, err = db.db.Prepare("UPDATE settings SET opensea_enabled = ? WHERE synthetic_id = 'id'")
case "telemetry-server-url":
update, err = db.db.Prepare("UPDATE settings SET telemetry_server_url = ? WHERE synthetic_id = 'id'")
default: default:
return ErrInvalidConfig return ErrInvalidConfig
} }
@ -443,7 +446,7 @@ func (db *Database) GetNodeConfig(nodecfg interface{}) error {
func (db *Database) GetSettings() (Settings, error) { func (db *Database) GetSettings() (Settings, error) {
var s Settings var s Settings
err := db.db.QueryRow("SELECT address, anon_metrics_should_send, chaos_mode, currency, current_network, custom_bootnodes, custom_bootnodes_enabled, dapps_address, eip1581_address, fleet, hide_home_tooltip, installation_id, key_uid, keycard_instance_uid, keycard_paired_on, keycard_pairing, last_updated, latest_derived_path, link_preview_request_enabled, link_previews_enabled_sites, log_level, mnemonic, name, networks, notifications_enabled, push_notifications_server_enabled, push_notifications_from_contacts_only, remote_push_notifications_enabled, send_push_notifications, push_notifications_block_mentions, photo_path, pinned_mailservers, preferred_name, preview_privacy, public_key, remember_syncing_choice, signing_phrase, stickers_packs_installed, stickers_packs_pending, stickers_recent_stickers, syncing_on_mobile_network, default_sync_period, use_mailservers, messages_from_contacts_only, usernames, appearance, profile_pictures_show_to, profile_pictures_visibility, wallet_root_address, wallet_set_up_passed, wallet_visible_tokens, waku_bloom_filter_mode, webview_allow_permission_requests, current_user_status, send_status_updates, gif_recents, gif_favorites, opensea_enabled FROM settings WHERE synthetic_id = 'id'").Scan( err := db.db.QueryRow("SELECT address, anon_metrics_should_send, chaos_mode, currency, current_network, custom_bootnodes, custom_bootnodes_enabled, dapps_address, eip1581_address, fleet, hide_home_tooltip, installation_id, key_uid, keycard_instance_uid, keycard_paired_on, keycard_pairing, last_updated, latest_derived_path, link_preview_request_enabled, link_previews_enabled_sites, log_level, mnemonic, name, networks, notifications_enabled, push_notifications_server_enabled, push_notifications_from_contacts_only, remote_push_notifications_enabled, send_push_notifications, push_notifications_block_mentions, photo_path, pinned_mailservers, preferred_name, preview_privacy, public_key, remember_syncing_choice, signing_phrase, stickers_packs_installed, stickers_packs_pending, stickers_recent_stickers, syncing_on_mobile_network, default_sync_period, use_mailservers, messages_from_contacts_only, usernames, appearance, profile_pictures_show_to, profile_pictures_visibility, wallet_root_address, wallet_set_up_passed, wallet_visible_tokens, waku_bloom_filter_mode, webview_allow_permission_requests, current_user_status, send_status_updates, gif_recents, gif_favorites, opensea_enabled, telemetry_server_url FROM settings WHERE synthetic_id = 'id'").Scan(
&s.Address, &s.Address,
&s.AnonMetricsShouldSend, &s.AnonMetricsShouldSend,
&s.ChaosMode, &s.ChaosMode,
@ -502,6 +505,7 @@ func (db *Database) GetSettings() (Settings, error) {
&sqlite.JSONBlob{Data: &s.GifRecents}, &sqlite.JSONBlob{Data: &s.GifRecents},
&sqlite.JSONBlob{Data: &s.GifFavorites}, &sqlite.JSONBlob{Data: &s.GifFavorites},
&s.OpenseaEnabled, &s.OpenseaEnabled,
&s.TelemetryServerURL,
) )
return s, err return s, err
} }

View File

@ -68,6 +68,7 @@ func (s *MessengerCommunitiesSuite) TearDownTest() {
func (s *MessengerCommunitiesSuite) newMessengerWithOptions(shh types.Waku, privateKey *ecdsa.PrivateKey, options []Option) *Messenger { func (s *MessengerCommunitiesSuite) newMessengerWithOptions(shh types.Waku, privateKey *ecdsa.PrivateKey, options []Option) *Messenger {
m, err := NewMessenger( m, err := NewMessenger(
"Test",
privateKey, privateKey,
&testNode{shh: shh}, &testNode{shh: shh},
uuid.New().String(), uuid.New().String(),

View File

@ -47,6 +47,8 @@ import (
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/services/mailservers" "github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/telemetry"
) )
//todo: kozieiev: get rid of wakutransp word //todo: kozieiev: get rid of wakutransp word
@ -110,6 +112,7 @@ type Messenger struct {
quit chan struct{} quit chan struct{}
requestedCommunities map[string]*transport.Filter requestedCommunities map[string]*transport.Filter
connectionState connection.State connectionState connection.State
telemetryClient *telemetry.Client
// TODO(samyoul) Determine if/how the remaining usage of this mutex can be removed // TODO(samyoul) Determine if/how the remaining usage of this mutex can be removed
mutex sync.Mutex mutex sync.Mutex
@ -160,6 +163,7 @@ func (interceptor EnvelopeEventsInterceptor) MailServerRequestExpired(hash types
} }
func NewMessenger( func NewMessenger(
nodeName string,
identity *ecdsa.PrivateKey, identity *ecdsa.PrivateKey,
node types.Node, node types.Node,
installationID string, installationID string,
@ -299,6 +303,11 @@ func NewMessenger(
anonMetricsServer.Logger = logger anonMetricsServer.Logger = logger
} }
var telemetryClient *telemetry.Client
if c.telemetryServerURL != "" {
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName)
}
// Initialize push notification server // Initialize push notification server
var pushNotificationServer *pushnotificationserver.Server var pushNotificationServer *pushnotificationserver.Server
if c.pushNotificationServerConfig != nil && c.pushNotificationServerConfig.Enabled { if c.pushNotificationServerConfig != nil && c.pushNotificationServerConfig.Enabled {
@ -339,6 +348,7 @@ func NewMessenger(
sender: sender, sender: sender,
anonMetricsClient: anonMetricsClient, anonMetricsClient: anonMetricsClient,
anonMetricsServer: anonMetricsServer, anonMetricsServer: anonMetricsServer,
telemetryClient: telemetryClient,
pushNotificationClient: pushNotificationClient, pushNotificationClient: pushNotificationClient,
pushNotificationServer: pushNotificationServer, pushNotificationServer: pushNotificationServer,
communitiesManager: communitiesManager, communitiesManager: communitiesManager,
@ -2682,7 +2692,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger := m.logger.With(zap.String("site", "RetrieveAll")) logger := m.logger.With(zap.String("site", "RetrieveAll"))
for _, messages := range chatWithMessages { for filter, messages := range chatWithMessages {
var processedMessages []string var processedMessages []string
for _, shhMessage := range messages { for _, shhMessage := range messages {
logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash))) logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash)))
@ -2693,6 +2703,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Info("failed to decode messages", zap.Error(err)) logger.Info("failed to decode messages", zap.Error(err))
continue continue
} }
if m.telemetryClient != nil {
go m.telemetryClient.PushReceivedMessages(filter, shhMessage, statusMessages)
}
m.markDeliveredMessages(acks) m.markDeliveredMessages(acks)
logger.Debug("processing messages further", zap.Int("count", len(statusMessages))) logger.Debug("processing messages further", zap.Int("count", len(statusMessages)))

View File

@ -67,6 +67,8 @@ type config struct {
logger *zap.Logger logger *zap.Logger
messengerSignalsHandler MessengerSignalsHandler messengerSignalsHandler MessengerSignalsHandler
telemetryServerURL string
} }
type Option func(*config) error type Option func(*config) error
@ -171,6 +173,13 @@ func WithAnonMetricsServerConfig(anonMetricsServerConfig *anonmetrics.ServerConf
} }
} }
func WithTelemetry(serverURL string) Option {
return func(c *config) error {
c.telemetryServerURL = serverURL
return nil
}
}
func WithPushNotificationServerConfig(pushNotificationServerConfig *pushnotificationserver.Config) Option { func WithPushNotificationServerConfig(pushNotificationServerConfig *pushnotificationserver.Config) Option {
return func(c *config) error { return func(c *config) error {
c.pushNotificationServerConfig = pushNotificationServerConfig c.pushNotificationServerConfig = pushNotificationServerConfig

View File

@ -138,6 +138,7 @@ func newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey, logger *z
options = append(options, extraOptions...) options = append(options, extraOptions...)
m, err := NewMessenger( m, err := NewMessenger(
"Test",
privateKey, privateKey,
&testNode{shh: shh}, &testNode{shh: shh},
uuid.New().String(), uuid.New().String(),

View File

@ -102,7 +102,7 @@ func (s *Service) GetPeer(rawURL string) (*enode.Node, error) {
return enode.ParseV4(rawURL) return enode.ParseV4(rawURL)
} }
func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, logger *zap.Logger) error { func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db *sql.DB, multiAccountDb *multiaccounts.Database, acc *multiaccounts.Account, logger *zap.Logger) error {
if !s.config.PFSEnabled { if !s.config.PFSEnabled {
return nil return nil
} }
@ -143,6 +143,7 @@ func (s *Service) InitProtocol(identity *ecdsa.PrivateKey, db *sql.DB, multiAcco
} }
messenger, err := protocol.NewMessenger( messenger, err := protocol.NewMessenger(
nodeName,
identity, identity,
s.n, s.n,
s.config.InstallationID, s.config.InstallationID,
@ -454,6 +455,10 @@ func buildMessengerOptions(
options = append(options, protocol.WithAnonMetricsServerConfig(amsc)) options = append(options, protocol.WithAnonMetricsServerConfig(amsc))
} }
if settings.TelemetryServerURL != "" {
options = append(options, protocol.WithTelemetry(settings.TelemetryServerURL))
}
if settings.PushNotificationsServerEnabled { if settings.PushNotificationsServerEnabled {
config := &pushnotificationserver.Config{ config := &pushnotificationserver.Config{
Enabled: true, Enabled: true,

View File

@ -132,7 +132,7 @@ func TestInitProtocol(t *testing.T) {
acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"} acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"}
err = service.InitProtocol(privateKey, sqlDB, multiAccounts, acc, zap.NewNop()) err = service.InitProtocol("Test", privateKey, sqlDB, multiAccounts, acc, zap.NewNop())
require.NoError(t, err) require.NoError(t, err)
} }
@ -195,7 +195,7 @@ func (s *ShhExtSuite) createAndAddNode() {
acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"} acc := &multiaccounts.Account{KeyUID: "0xdeadbeef"}
err = service.InitProtocol(privateKey, sqlDB, multiAccounts, acc, zap.NewNop()) err = service.InitProtocol("Test", privateKey, sqlDB, multiAccounts, acc, zap.NewNop())
s.NoError(err) s.NoError(err)
stack.RegisterLifecycle(service) stack.RegisterLifecycle(service)

55
telemetry/client.go Normal file
View File

@ -0,0 +1,55 @@
package telemetry
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
)
type Client struct {
serverURL string
httpClient *http.Client
logger *zap.Logger
keyUID string
nodeName string
}
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string) *Client {
return &Client{
serverURL: serverURL,
httpClient: &http.Client{Timeout: time.Minute},
logger: logger,
keyUID: keyUID,
nodeName: nodeName,
}
}
func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types.Message, messages []*v1protocol.StatusMessage) {
c.logger.Debug("Pushing received messages to telemetry server")
url := fmt.Sprintf("%s/received-messages", c.serverURL)
var postBody []map[string]interface{}
for _, message := range messages {
postBody = append(postBody, map[string]interface{}{
"chatId": filter.ChatID,
"messageHash": types.EncodeHex(sshMessage.Hash),
"messageId": message.ID,
"sentAt": sshMessage.Timestamp,
"topic": filter.Topic.String(),
"receiverKeyUID": c.keyUID,
"nodeName": c.nodeName,
})
}
body, _ := json.Marshal(postBody)
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
if err != nil {
c.logger.Error("Error sending message to telemetry server", zap.Error(err))
}
}