From eaa5e662ff5e8138c05e9004918c55d741776419 Mon Sep 17 00:00:00 2001 From: Mohsen Date: Mon, 4 Nov 2024 21:01:14 +0300 Subject: [PATCH] feat_: use pubsub to send dapp changes to activity center notification --- api/geth_backend.go | 4 +- node/get_status_node.go | 9 +- node/status_node_services.go | 20 ++++- protocol/activity_center_persistence.go | 58 ++++++++++-- protocol/messenger_communities.go | 13 +++ protocol/messenger_walletconnect.go | 29 ------ protocol/migrations/migrations.go | 0 services/activitycenter/controller.go | 89 +++++++++++++++++++ services/activitycenter/service.go | 50 +++++++++++ services/wallet/api.go | 24 +++-- services/wallet/api_test.go | 2 +- services/wallet/keycard_pairings_test.go | 2 +- services/wallet/service.go | 3 + .../walletconnectevent/events.go | 19 ++++ .../walletconnectevent/watcher.go | 73 +++++++++++++++ 15 files changed, 343 insertions(+), 52 deletions(-) delete mode 100644 protocol/migrations/migrations.go create mode 100644 services/activitycenter/controller.go create mode 100644 services/activitycenter/service.go create mode 100644 services/wallet/walletconnect/walletconnectevent/events.go create mode 100644 services/wallet/walletconnect/walletconnectevent/watcher.go diff --git a/api/geth_backend.go b/api/geth_backend.go index d3d80f272..76b6dd773 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -2707,9 +2707,7 @@ func (b *GethStatusBackend) injectAccountsIntoWakuService(w types.WakuKeyManager b.statusNode.EnsService().Init(messenger.SyncEnsNamesWithDispatchMessage) b.statusNode.CommunityTokensService().Init(messenger) - if walletService := b.statusNode.WalletService(); walletService != nil { - walletService.InjectMessenger(messenger) - } + b.statusNode.ActivityCenterService().Init(messenger) } return nil diff --git a/node/get_status_node.go b/node/get_status_node.go index 0ec00dd85..6b8dcb5ab 100644 --- a/node/get_status_node.go +++ b/node/get_status_node.go @@ -11,6 +11,8 @@ import ( "reflect" "sync" + "github.com/status-im/status-go/services/activitycenter" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" @@ -135,9 +137,11 @@ type StatusNode struct { connectorSrvc *connector.Service appGeneralSrvc *appgeneral.Service ethSrvc *eth.Service + activityCenterSrvc *activitycenter.Service - accountsFeed event.Feed - walletFeed event.Feed + accountsFeed event.Feed + walletFeed event.Feed + walletConnectFeed event.Feed } // New makes new instance of StatusNode. @@ -509,6 +513,7 @@ func (n *StatusNode) stop() error { n.publicMethods = make(map[string]bool) n.pendingTracker = nil n.appGeneralSrvc = nil + n.activityCenterSrvc = nil n.logger.Debug("status node stopped") return nil } diff --git a/node/status_node_services.go b/node/status_node_services.go index ee76a8354..dedcf17b1 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -10,6 +10,8 @@ import ( "reflect" "time" + "github.com/status-im/status-go/services/activitycenter" + "go.uber.org/zap" "github.com/status-im/status-go/protocol/common/shard" @@ -107,11 +109,12 @@ func (b *StatusNode) initServices(config *params.NodeConfig, mediaServer *server services = appendIf(config.ConnectorConfig.Enabled, services, b.connectorService()) services = append(services, b.gifService(accDB)) services = append(services, b.ChatService(accDB)) + services = append(services, b.activityCenterService(&b.walletConnectFeed)) // Wallet Service is used by wakuExtSrvc/wakuV2ExtSrvc // Keep this initialization before the other two if config.WalletConfig.Enabled { - walletService := b.walletService(accDB, b.appDB, &b.accountsFeed, settingsFeed, &b.walletFeed, config.WalletConfig.StatusProxyStageName) + walletService := b.walletService(accDB, b.appDB, &b.accountsFeed, settingsFeed, &b.walletFeed, &b.walletConnectFeed, config.WalletConfig.StatusProxyStageName) services = append(services, walletService) } @@ -589,13 +592,14 @@ func (b *StatusNode) SetWalletCommunityInfoProvider(provider thirdparty.Communit } } -func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, accountsFeed *event.Feed, settingsFeed *event.Feed, walletFeed *event.Feed, statusProxyStageName string) *wallet.Service { +func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, accountsFeed *event.Feed, settingsFeed *event.Feed, walletFeed *event.Feed, walletConnectFeed *event.Feed, statusProxyStageName string) *wallet.Service { if b.walletSrvc == nil { b.walletSrvc = wallet.NewService( b.walletDB, accountsDB, appDB, b.rpcClient, accountsFeed, settingsFeed, b.gethAccountManager, b.transactor, b.config, b.ensService(b.timeSourceNow()), b.pendingTracker, walletFeed, + walletConnectFeed, b.httpServer, statusProxyStageName, ) @@ -789,3 +793,15 @@ func (b *StatusNode) CallRPC(inputJSON string) (string, error) { return b.rpcClient.CallRaw(inputJSON), nil } + +func (b *StatusNode) activityCenterService(walletConnectFeed *event.Feed) *activitycenter.Service { + b.logger.Debug("Initializing activityCenterService activity center service") + if b.activityCenterSrvc == nil { + b.activityCenterSrvc = activitycenter.NewService(walletConnectFeed) + } + return b.activityCenterSrvc +} + +func (b *StatusNode) ActivityCenterService() *activitycenter.Service { + return b.activityCenterSrvc +} diff --git a/protocol/activity_center_persistence.go b/protocol/activity_center_persistence.go index eb8e32cba..72a19e03c 100644 --- a/protocol/activity_center_persistence.go +++ b/protocol/activity_center_persistence.go @@ -284,6 +284,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRow(row *sql.Row) var name sql.NullString var author sql.NullString var installationID sql.NullString + var walletProviderSessionTopic sql.NullString + var dAppURL sql.NullString + var dAppName sql.NullString + var dAppIconURL sql.NullString notification := &ActivityCenterNotification{} err := row.Scan( ¬ification.ID, @@ -303,12 +307,12 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRow(row *sql.Row) &name, &author, &tokenDataBytes, - ¬ification.WalletProviderSessionTopic, - ¬ification.DAppURL, - ¬ification.DAppName, - ¬ification.DAppIconURL, + &walletProviderSessionTopic, + &dAppURL, + &dAppName, + &dAppIconURL, ¬ification.UpdatedAt, - &installationID) + &installationID) if err != nil { return nil, err @@ -334,6 +338,22 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRow(row *sql.Row) notification.InstallationID = installationID.String } + if walletProviderSessionTopic.Valid { + notification.WalletProviderSessionTopic = walletProviderSessionTopic.String + } + + if dAppURL.Valid { + notification.DAppURL = dAppURL.String + } + + if dAppName.Valid { + notification.DAppName = dAppName.String + } + + if dAppIconURL.Valid { + notification.DAppIconURL = dAppIconURL.String + } + if len(tokenDataBytes) > 0 { err = json.Unmarshal(tokenDataBytes, ¬ification.TokenData) if err != nil { @@ -384,6 +404,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRows(rows *sql.Ro var name sql.NullString var author sql.NullString var installationID sql.NullString + var walletProviderSessionTopic sql.NullString + var dAppURL sql.NullString + var dAppName sql.NullString + var dAppIconURL sql.NullString notification := &ActivityCenterNotification{} err := rows.Scan( ¬ification.ID, @@ -402,10 +426,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRows(rows *sql.Ro &name, &author, &tokenDataBytes, - ¬ification.WalletProviderSessionTopic, - ¬ification.DAppURL, - ¬ification.DAppName, - ¬ification.DAppIconURL, + &walletProviderSessionTopic, + &dAppURL, + &dAppName, + &dAppIconURL, &latestCursor, ¬ification.UpdatedAt, &installationID, @@ -434,6 +458,22 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRows(rows *sql.Ro notification.InstallationID = installationID.String } + if walletProviderSessionTopic.Valid { + notification.WalletProviderSessionTopic = walletProviderSessionTopic.String + } + + if dAppURL.Valid { + notification.DAppURL = dAppURL.String + } + + if dAppName.Valid { + notification.DAppName = dAppName.String + } + + if dAppIconURL.Valid { + notification.DAppIconURL = dAppIconURL.String + } + if len(tokenDataBytes) > 0 { tokenData := &ActivityTokenData{} if err = json.Unmarshal(tokenDataBytes, &tokenData); err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index ad7038858..07e844c5b 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -4932,6 +4932,19 @@ func (m *Messenger) AddActivityCenterNotificationToResponse(communityID string, } } +func (m *Messenger) AddNotificationToActivityCenter(notification *ActivityCenterNotification) error { + response := &MessengerResponse{} + err := m.addActivityCenterNotification(response, notification, nil) + if err != nil { + return err + } + + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.MessengerResponse(response) + } + return nil +} + func (m *Messenger) leaveCommunityDueToKickOrBan(changes *communities.CommunityChanges, acType ActivityCenterType, stateResponse *MessengerResponse) { response, err := m.kickedOutOfCommunity(changes.Community.ID(), false) if err != nil { diff --git a/protocol/messenger_walletconnect.go b/protocol/messenger_walletconnect.go index 721d65cee..7ac4fb0bc 100644 --- a/protocol/messenger_walletconnect.go +++ b/protocol/messenger_walletconnect.go @@ -1,9 +1,7 @@ package protocol import ( - "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/requests" - "github.com/status-im/status-go/services/wallet/walletconnect" ) type WalletConnectSession struct { @@ -32,33 +30,6 @@ func (m *Messenger) AddWalletConnectSession(request *requests.AddWalletConnectSe return m.persistence.InsertWalletConnectSession(session) } -func (m *Messenger) NewWalletConnectV2SessionCreatedNotification(session walletconnect.Session) error { - now := m.GetCurrentTimeInMillis() - - notification := &ActivityCenterNotification{ - ID: types.FromHex(string(session.Topic) + "_dapp_connected"), - Type: ActivityCenterNotificationTypeDAppConnected, - DAppURL: session.Peer.Metadata.URL, - DAppName: session.Peer.Metadata.Name, - WalletProviderSessionTopic: string(session.Topic), - Timestamp: now, - UpdatedAt: now, - } - - if len(session.Peer.Metadata.Icons) > 0 { - notification.DAppIconURL = session.Peer.Metadata.Icons[0] - } - - response := &MessengerResponse{} - err := m.addActivityCenterNotification(response, notification, nil) - - if m.config.messengerSignalsHandler != nil { - m.config.messengerSignalsHandler.MessengerResponse(response) - } - - return err -} - func (m *Messenger) GetWalletConnectSession() ([]WalletConnectSession, error) { return m.getWalletConnectSession() diff --git a/protocol/migrations/migrations.go b/protocol/migrations/migrations.go deleted file mode 100644 index e69de29bb..000000000 diff --git a/services/activitycenter/controller.go b/services/activitycenter/controller.go new file mode 100644 index 000000000..458144b6d --- /dev/null +++ b/services/activitycenter/controller.go @@ -0,0 +1,89 @@ +package activitycenter + +import ( + "sync" + + "go.uber.org/zap" + + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/protocol" + "github.com/status-im/status-go/services/wallet/walletconnect" + "github.com/status-im/status-go/services/wallet/walletconnect/walletconnectevent" +) + +type Controller struct { + messenger *protocol.Messenger + walletConnectsFeed *event.Feed + walletConnectWatcher *walletconnectevent.Watcher + commandsLock sync.RWMutex +} + +func NewController(walletConnectsFeed *event.Feed) *Controller { + return &Controller{ + walletConnectsFeed: walletConnectsFeed, + } +} + +func (c *Controller) Start(messenger *protocol.Messenger) { + c.messenger = messenger + c.startWalletConnectConnectionWatcher() +} + +func (c *Controller) Stop() { + c.stopWalletConnectConnectionWatcher() +} + +func (c *Controller) startWalletConnectConnectionWatcher() { + logutils.ZapLogger().Debug("Starting wallet connect connection watcher") + if c.walletConnectWatcher != nil { + return + } + + walletConnectChangeCb := func(eventType walletconnectevent.EventType, session walletconnect.Session) { + c.commandsLock.Lock() + defer c.commandsLock.Unlock() + // Whenever an dApp gets added, add it to activity center + if eventType == walletconnectevent.EventTypeAdded { + err := c.createNewSessionActivityCenterNotification(&session) + if err != nil { + logutils.ZapLogger().Error("Failed to create new session activity center notification", zap.Error(err)) + } + + } + } + + c.walletConnectWatcher = walletconnectevent.NewWatcher(c.walletConnectsFeed, walletConnectChangeCb) + + c.walletConnectWatcher.Start() +} + +func (c *Controller) stopWalletConnectConnectionWatcher() { + if c.walletConnectWatcher != nil { + c.walletConnectWatcher.Stop() + c.walletConnectWatcher = nil + } +} + +func (c *Controller) createNewSessionActivityCenterNotification(session *walletconnect.Session) error { + now := c.messenger.GetCurrentTimeInMillis() + + logutils.ZapLogger().Info("Creating new session activity center notification", zap.Any("session", session)) + + notification := &protocol.ActivityCenterNotification{ + ID: types.FromHex(string(session.Topic) + "_dapp_connected"), + Type: protocol.ActivityCenterNotificationTypeDAppConnected, + DAppURL: session.Peer.Metadata.URL, + DAppName: session.Peer.Metadata.Name, + WalletProviderSessionTopic: string(session.Topic), + Timestamp: now, + UpdatedAt: now, + } + + if len(session.Peer.Metadata.Icons) > 0 { + notification.DAppIconURL = session.Peer.Metadata.Icons[0] + } + + return c.messenger.AddNotificationToActivityCenter(notification) +} diff --git a/services/activitycenter/service.go b/services/activitycenter/service.go new file mode 100644 index 000000000..198518aec --- /dev/null +++ b/services/activitycenter/service.go @@ -0,0 +1,50 @@ +package activitycenter + +import ( + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/protocol" +) + +type Service struct { + messenger *protocol.Messenger + walletConnectsFeed *event.Feed + controller *Controller +} + +func NewService(walletConnectFeed *event.Feed) *Service { + return &Service{ + walletConnectsFeed: walletConnectFeed, + controller: NewController(walletConnectFeed), + } +} + +func (s *Service) Init(messenger *protocol.Messenger) { + logutils.ZapLogger().Debug("Initializing activity center service") + s.messenger = messenger +} + +func (s *Service) Start() error { + if s.messenger != nil { + logutils.ZapLogger().Debug("Starting activity center service") + s.controller.Start(s.messenger) + } else { + logutils.ZapLogger().Error("Activity center service not started, messenger is nil") + } + return nil +} + +func (s *Service) Stop() error { + s.controller.Stop() + return nil +} + +func (s *Service) Protocols() []p2p.Protocol { + return nil +} + +func (s *Service) APIs() []rpc.API { + return nil +} diff --git a/services/wallet/api.go b/services/wallet/api.go index 03c822c22..d967d4f56 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -15,6 +15,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/event" gethrpc "github.com/ethereum/go-ethereum/rpc" signercore "github.com/ethereum/go-ethereum/signer/core/apitypes" abi_spec "github.com/status-im/status-go/abi-spec" @@ -39,17 +40,20 @@ import ( "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/walletconnect" + "github.com/status-im/status-go/services/wallet/walletconnect/walletconnectevent" "github.com/status-im/status-go/transactions" ) func NewAPI(s *Service) *API { - return &API{s, s.reader} + return &API{s, s.reader, s.feed, s.walletConnectFeed} } // API is class with methods available over RPC. type API struct { - s *Service - reader *Reader + s *Service + reader *Reader + feed *event.Feed + walletConnectFeed *event.Feed } func (api *API) StartWallet(ctx context.Context) error { @@ -926,9 +930,19 @@ func (api *API) getVerifiedWalletAccount(address, password string) (*account.Sel } // AddWalletConnectSession adds or updates a session wallet connect session -func (api *API) AddWalletConnectSession(ctx context.Context, session_json string) error { +func (api *API) AddWalletConnectSession(ctx context.Context, session_json string) (walletconnect.Session, error) { logutils.ZapLogger().Debug("wallet.api.AddWalletConnectSession", zap.Int("rpcURL", len(session_json))) - return walletconnect.AddSession(api.s.db, api.s.config.Networks, session_json) + session, err := walletconnect.AddSession(api.s.db, api.s.config.Networks, session_json) + if err != nil { + logutils.ZapLogger().Error("wallet.api.AddWalletConnectSession", zap.Error(err)) + } + + api.walletConnectFeed.Send(walletconnectevent.Event{ + Type: walletconnectevent.EventTypeAdded, + Session: session, + }) + + return session, err } // DisconnectWalletConnectSession removes a wallet connect session diff --git a/services/wallet/api_test.go b/services/wallet/api_test.go index dd41ce9a5..bea442534 100644 --- a/services/wallet/api_test.go +++ b/services/wallet/api_test.go @@ -159,7 +159,7 @@ func TestAPI_GetAddressDetails(t *testing.T) { chainClient.SetWalletNotifier(func(chainID uint64, message string) {}) c.SetWalletNotifier(func(chainID uint64, message string) {}) - service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, ¶ms.NodeConfig{}, nil, nil, nil, nil, "") + service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, ¶ms.NodeConfig{}, nil, nil, nil, nil, nil, "") api := &API{ s: service, diff --git a/services/wallet/keycard_pairings_test.go b/services/wallet/keycard_pairings_test.go index 44466e4cc..df1b200ff 100644 --- a/services/wallet/keycard_pairings_test.go +++ b/services/wallet/keycard_pairings_test.go @@ -29,7 +29,7 @@ func TestKeycardPairingsFile(t *testing.T) { accountFeed := &event.Feed{} - service := NewService(db, accountsDb, appDB, &rpc.Client{NetworkManager: network.NewManager(db)}, accountFeed, nil, nil, nil, ¶ms.NodeConfig{}, nil, nil, nil, nil, "") + service := NewService(db, accountsDb, appDB, &rpc.Client{NetworkManager: network.NewManager(db)}, accountFeed, nil, nil, nil, ¶ms.NodeConfig{}, nil, nil, nil, nil, nil, "") data, err := service.KeycardPairings().GetPairingsJSONFileContent() require.NoError(t, err) diff --git a/services/wallet/service.go b/services/wallet/service.go index 0148a04f5..1fa4b036c 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -62,6 +62,7 @@ func NewService( ens *ens.Service, pendingTxManager *transactions.PendingTxTracker, feed *event.Feed, + walletConnectFeed *event.Feed, mediaServer *server.MediaServer, statusProxyStageName string, ) *Service { @@ -216,6 +217,7 @@ func NewService( transactor: transactor, ens: ens, feed: feed, + walletConnectFeed: walletConnectFeed, signals: signals, reader: reader, history: history, @@ -296,6 +298,7 @@ type Service struct { transactor *transactions.Transactor ens *ens.Service feed *event.Feed + walletConnectFeed *event.Feed signals *walletevent.SignalsTransmitter reader *Reader history *history.Service diff --git a/services/wallet/walletconnect/walletconnectevent/events.go b/services/wallet/walletconnect/walletconnectevent/events.go new file mode 100644 index 000000000..0d35946c3 --- /dev/null +++ b/services/wallet/walletconnect/walletconnectevent/events.go @@ -0,0 +1,19 @@ +package walletconnectevent + +import ( + "github.com/status-im/status-go/services/wallet/walletconnect" +) + +// EventType type for event types. +type EventType string + +// Event is a type for walletConnect events. +type Event struct { + Type EventType `json:"type"` + Session walletconnect.Session `json:"session"` +} + +const ( + // EventTypeAdded is emitted when a new session is added. + EventTypeAdded EventType = "added" +) diff --git a/services/wallet/walletconnect/walletconnectevent/watcher.go b/services/wallet/walletconnect/walletconnectevent/watcher.go new file mode 100644 index 000000000..87c1b1700 --- /dev/null +++ b/services/wallet/walletconnect/walletconnectevent/watcher.go @@ -0,0 +1,73 @@ +package walletconnectevent + +import ( + "context" + + "go.uber.org/zap" + + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/services/wallet/async" + "github.com/status-im/status-go/services/wallet/walletconnect" +) + +type WalletConnectsChangeCb func(eventType EventType, Session walletconnect.Session) + +// Watcher executes a given callback whenever an walletConnect gets added/removed +type Watcher struct { + walletConnectFeed *event.Feed + group *async.Group + callback WalletConnectsChangeCb +} + +func NewWatcher(walletConnectFeed *event.Feed, callback WalletConnectsChangeCb) *Watcher { + return &Watcher{ + walletConnectFeed: walletConnectFeed, + callback: callback, + } +} + +func (w *Watcher) Start() { + logutils.ZapLogger().Debug("Starting wallet connect connection watcher") + if w.group != nil { + return + } + + w.group = async.NewGroup(context.Background()) + w.group.Add(func(ctx context.Context) error { + return watch(ctx, w.walletConnectFeed, w.callback) + }) +} + +func (w *Watcher) Stop() { + if w.group != nil { + w.group.Stop() + w.group.Wait() + w.group = nil + } +} + +func onWalletConnectSessionChange(callback WalletConnectsChangeCb, session walletconnect.Session, eventType EventType) { + if callback != nil { + callback(eventType, session) + } +} + +func watch(ctx context.Context, walletConnectFeed *event.Feed, callback WalletConnectsChangeCb) error { + ch := make(chan Event, 1) + sub := walletConnectFeed.Subscribe(ch) + defer sub.Unsubscribe() + + for { + select { + case <-ctx.Done(): + return nil + case err := <-sub.Err(): + if err != nil { + logutils.ZapLogger().Error("accounts watcher subscription failed", zap.Error(err)) + } + case ev := <-ch: + onWalletConnectSessionChange(callback, ev.Session, ev.Type) + } + } +}