feat_: use pubsub to send dapp changes to activity center notification

This commit is contained in:
Mohsen 2024-11-04 21:01:14 +03:00
parent 831414d64f
commit eaa5e662ff
No known key found for this signature in database
GPG Key ID: 20BACCB8426033CE
15 changed files with 343 additions and 52 deletions

View File

@ -2707,9 +2707,7 @@ func (b *GethStatusBackend) injectAccountsIntoWakuService(w types.WakuKeyManager
b.statusNode.EnsService().Init(messenger.SyncEnsNamesWithDispatchMessage) b.statusNode.EnsService().Init(messenger.SyncEnsNamesWithDispatchMessage)
b.statusNode.CommunityTokensService().Init(messenger) b.statusNode.CommunityTokensService().Init(messenger)
if walletService := b.statusNode.WalletService(); walletService != nil { b.statusNode.ActivityCenterService().Init(messenger)
walletService.InjectMessenger(messenger)
}
} }
return nil return nil

View File

@ -11,6 +11,8 @@ import (
"reflect" "reflect"
"sync" "sync"
"github.com/status-im/status-go/services/activitycenter"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap" "go.uber.org/zap"
@ -135,9 +137,11 @@ type StatusNode struct {
connectorSrvc *connector.Service connectorSrvc *connector.Service
appGeneralSrvc *appgeneral.Service appGeneralSrvc *appgeneral.Service
ethSrvc *eth.Service ethSrvc *eth.Service
activityCenterSrvc *activitycenter.Service
accountsFeed event.Feed accountsFeed event.Feed
walletFeed event.Feed walletFeed event.Feed
walletConnectFeed event.Feed
} }
// New makes new instance of StatusNode. // New makes new instance of StatusNode.
@ -509,6 +513,7 @@ func (n *StatusNode) stop() error {
n.publicMethods = make(map[string]bool) n.publicMethods = make(map[string]bool)
n.pendingTracker = nil n.pendingTracker = nil
n.appGeneralSrvc = nil n.appGeneralSrvc = nil
n.activityCenterSrvc = nil
n.logger.Debug("status node stopped") n.logger.Debug("status node stopped")
return nil return nil
} }

View File

@ -10,6 +10,8 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/status-im/status-go/services/activitycenter"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/common/shard"
@ -107,11 +109,12 @@ func (b *StatusNode) initServices(config *params.NodeConfig, mediaServer *server
services = appendIf(config.ConnectorConfig.Enabled, services, b.connectorService()) services = appendIf(config.ConnectorConfig.Enabled, services, b.connectorService())
services = append(services, b.gifService(accDB)) services = append(services, b.gifService(accDB))
services = append(services, b.ChatService(accDB)) services = append(services, b.ChatService(accDB))
services = append(services, b.activityCenterService(&b.walletConnectFeed))
// Wallet Service is used by wakuExtSrvc/wakuV2ExtSrvc // Wallet Service is used by wakuExtSrvc/wakuV2ExtSrvc
// Keep this initialization before the other two // Keep this initialization before the other two
if config.WalletConfig.Enabled { if config.WalletConfig.Enabled {
walletService := b.walletService(accDB, b.appDB, &b.accountsFeed, settingsFeed, &b.walletFeed, config.WalletConfig.StatusProxyStageName) walletService := b.walletService(accDB, b.appDB, &b.accountsFeed, settingsFeed, &b.walletFeed, &b.walletConnectFeed, config.WalletConfig.StatusProxyStageName)
services = append(services, walletService) services = append(services, walletService)
} }
@ -589,13 +592,14 @@ func (b *StatusNode) SetWalletCommunityInfoProvider(provider thirdparty.Communit
} }
} }
func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, accountsFeed *event.Feed, settingsFeed *event.Feed, walletFeed *event.Feed, statusProxyStageName string) *wallet.Service { func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, accountsFeed *event.Feed, settingsFeed *event.Feed, walletFeed *event.Feed, walletConnectFeed *event.Feed, statusProxyStageName string) *wallet.Service {
if b.walletSrvc == nil { if b.walletSrvc == nil {
b.walletSrvc = wallet.NewService( b.walletSrvc = wallet.NewService(
b.walletDB, accountsDB, appDB, b.rpcClient, accountsFeed, settingsFeed, b.gethAccountManager, b.transactor, b.config, b.walletDB, accountsDB, appDB, b.rpcClient, accountsFeed, settingsFeed, b.gethAccountManager, b.transactor, b.config,
b.ensService(b.timeSourceNow()), b.ensService(b.timeSourceNow()),
b.pendingTracker, b.pendingTracker,
walletFeed, walletFeed,
walletConnectFeed,
b.httpServer, b.httpServer,
statusProxyStageName, statusProxyStageName,
) )
@ -789,3 +793,15 @@ func (b *StatusNode) CallRPC(inputJSON string) (string, error) {
return b.rpcClient.CallRaw(inputJSON), nil return b.rpcClient.CallRaw(inputJSON), nil
} }
func (b *StatusNode) activityCenterService(walletConnectFeed *event.Feed) *activitycenter.Service {
b.logger.Debug("Initializing activityCenterService activity center service")
if b.activityCenterSrvc == nil {
b.activityCenterSrvc = activitycenter.NewService(walletConnectFeed)
}
return b.activityCenterSrvc
}
func (b *StatusNode) ActivityCenterService() *activitycenter.Service {
return b.activityCenterSrvc
}

View File

@ -284,6 +284,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRow(row *sql.Row)
var name sql.NullString var name sql.NullString
var author sql.NullString var author sql.NullString
var installationID sql.NullString var installationID sql.NullString
var walletProviderSessionTopic sql.NullString
var dAppURL sql.NullString
var dAppName sql.NullString
var dAppIconURL sql.NullString
notification := &ActivityCenterNotification{} notification := &ActivityCenterNotification{}
err := row.Scan( err := row.Scan(
&notification.ID, &notification.ID,
@ -303,10 +307,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRow(row *sql.Row)
&name, &name,
&author, &author,
&tokenDataBytes, &tokenDataBytes,
&notification.WalletProviderSessionTopic, &walletProviderSessionTopic,
&notification.DAppURL, &dAppURL,
&notification.DAppName, &dAppName,
&notification.DAppIconURL, &dAppIconURL,
&notification.UpdatedAt, &notification.UpdatedAt,
&installationID) &installationID)
@ -334,6 +338,22 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRow(row *sql.Row)
notification.InstallationID = installationID.String notification.InstallationID = installationID.String
} }
if walletProviderSessionTopic.Valid {
notification.WalletProviderSessionTopic = walletProviderSessionTopic.String
}
if dAppURL.Valid {
notification.DAppURL = dAppURL.String
}
if dAppName.Valid {
notification.DAppName = dAppName.String
}
if dAppIconURL.Valid {
notification.DAppIconURL = dAppIconURL.String
}
if len(tokenDataBytes) > 0 { if len(tokenDataBytes) > 0 {
err = json.Unmarshal(tokenDataBytes, &notification.TokenData) err = json.Unmarshal(tokenDataBytes, &notification.TokenData)
if err != nil { if err != nil {
@ -384,6 +404,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRows(rows *sql.Ro
var name sql.NullString var name sql.NullString
var author sql.NullString var author sql.NullString
var installationID sql.NullString var installationID sql.NullString
var walletProviderSessionTopic sql.NullString
var dAppURL sql.NullString
var dAppName sql.NullString
var dAppIconURL sql.NullString
notification := &ActivityCenterNotification{} notification := &ActivityCenterNotification{}
err := rows.Scan( err := rows.Scan(
&notification.ID, &notification.ID,
@ -402,10 +426,10 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRows(rows *sql.Ro
&name, &name,
&author, &author,
&tokenDataBytes, &tokenDataBytes,
&notification.WalletProviderSessionTopic, &walletProviderSessionTopic,
&notification.DAppURL, &dAppURL,
&notification.DAppName, &dAppName,
&notification.DAppIconURL, &dAppIconURL,
&latestCursor, &latestCursor,
&notification.UpdatedAt, &notification.UpdatedAt,
&installationID, &installationID,
@ -434,6 +458,22 @@ func (db sqlitePersistence) unmarshalActivityCenterNotificationRows(rows *sql.Ro
notification.InstallationID = installationID.String notification.InstallationID = installationID.String
} }
if walletProviderSessionTopic.Valid {
notification.WalletProviderSessionTopic = walletProviderSessionTopic.String
}
if dAppURL.Valid {
notification.DAppURL = dAppURL.String
}
if dAppName.Valid {
notification.DAppName = dAppName.String
}
if dAppIconURL.Valid {
notification.DAppIconURL = dAppIconURL.String
}
if len(tokenDataBytes) > 0 { if len(tokenDataBytes) > 0 {
tokenData := &ActivityTokenData{} tokenData := &ActivityTokenData{}
if err = json.Unmarshal(tokenDataBytes, &tokenData); err != nil { if err = json.Unmarshal(tokenDataBytes, &tokenData); err != nil {

View File

@ -4932,6 +4932,19 @@ func (m *Messenger) AddActivityCenterNotificationToResponse(communityID string,
} }
} }
func (m *Messenger) AddNotificationToActivityCenter(notification *ActivityCenterNotification) error {
response := &MessengerResponse{}
err := m.addActivityCenterNotification(response, notification, nil)
if err != nil {
return err
}
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.MessengerResponse(response)
}
return nil
}
func (m *Messenger) leaveCommunityDueToKickOrBan(changes *communities.CommunityChanges, acType ActivityCenterType, stateResponse *MessengerResponse) { func (m *Messenger) leaveCommunityDueToKickOrBan(changes *communities.CommunityChanges, acType ActivityCenterType, stateResponse *MessengerResponse) {
response, err := m.kickedOutOfCommunity(changes.Community.ID(), false) response, err := m.kickedOutOfCommunity(changes.Community.ID(), false)
if err != nil { if err != nil {

View File

@ -1,9 +1,7 @@
package protocol package protocol
import ( import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/services/wallet/walletconnect"
) )
type WalletConnectSession struct { type WalletConnectSession struct {
@ -32,33 +30,6 @@ func (m *Messenger) AddWalletConnectSession(request *requests.AddWalletConnectSe
return m.persistence.InsertWalletConnectSession(session) return m.persistence.InsertWalletConnectSession(session)
} }
func (m *Messenger) NewWalletConnectV2SessionCreatedNotification(session walletconnect.Session) error {
now := m.GetCurrentTimeInMillis()
notification := &ActivityCenterNotification{
ID: types.FromHex(string(session.Topic) + "_dapp_connected"),
Type: ActivityCenterNotificationTypeDAppConnected,
DAppURL: session.Peer.Metadata.URL,
DAppName: session.Peer.Metadata.Name,
WalletProviderSessionTopic: string(session.Topic),
Timestamp: now,
UpdatedAt: now,
}
if len(session.Peer.Metadata.Icons) > 0 {
notification.DAppIconURL = session.Peer.Metadata.Icons[0]
}
response := &MessengerResponse{}
err := m.addActivityCenterNotification(response, notification, nil)
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.MessengerResponse(response)
}
return err
}
func (m *Messenger) GetWalletConnectSession() ([]WalletConnectSession, error) { func (m *Messenger) GetWalletConnectSession() ([]WalletConnectSession, error) {
return m.getWalletConnectSession() return m.getWalletConnectSession()

View File

@ -0,0 +1,89 @@
package activitycenter
import (
"sync"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/services/wallet/walletconnect"
"github.com/status-im/status-go/services/wallet/walletconnect/walletconnectevent"
)
type Controller struct {
messenger *protocol.Messenger
walletConnectsFeed *event.Feed
walletConnectWatcher *walletconnectevent.Watcher
commandsLock sync.RWMutex
}
func NewController(walletConnectsFeed *event.Feed) *Controller {
return &Controller{
walletConnectsFeed: walletConnectsFeed,
}
}
func (c *Controller) Start(messenger *protocol.Messenger) {
c.messenger = messenger
c.startWalletConnectConnectionWatcher()
}
func (c *Controller) Stop() {
c.stopWalletConnectConnectionWatcher()
}
func (c *Controller) startWalletConnectConnectionWatcher() {
logutils.ZapLogger().Debug("Starting wallet connect connection watcher")
if c.walletConnectWatcher != nil {
return
}
walletConnectChangeCb := func(eventType walletconnectevent.EventType, session walletconnect.Session) {
c.commandsLock.Lock()
defer c.commandsLock.Unlock()
// Whenever an dApp gets added, add it to activity center
if eventType == walletconnectevent.EventTypeAdded {
err := c.createNewSessionActivityCenterNotification(&session)
if err != nil {
logutils.ZapLogger().Error("Failed to create new session activity center notification", zap.Error(err))
}
}
}
c.walletConnectWatcher = walletconnectevent.NewWatcher(c.walletConnectsFeed, walletConnectChangeCb)
c.walletConnectWatcher.Start()
}
func (c *Controller) stopWalletConnectConnectionWatcher() {
if c.walletConnectWatcher != nil {
c.walletConnectWatcher.Stop()
c.walletConnectWatcher = nil
}
}
func (c *Controller) createNewSessionActivityCenterNotification(session *walletconnect.Session) error {
now := c.messenger.GetCurrentTimeInMillis()
logutils.ZapLogger().Info("Creating new session activity center notification", zap.Any("session", session))
notification := &protocol.ActivityCenterNotification{
ID: types.FromHex(string(session.Topic) + "_dapp_connected"),
Type: protocol.ActivityCenterNotificationTypeDAppConnected,
DAppURL: session.Peer.Metadata.URL,
DAppName: session.Peer.Metadata.Name,
WalletProviderSessionTopic: string(session.Topic),
Timestamp: now,
UpdatedAt: now,
}
if len(session.Peer.Metadata.Icons) > 0 {
notification.DAppIconURL = session.Peer.Metadata.Icons[0]
}
return c.messenger.AddNotificationToActivityCenter(notification)
}

View File

@ -0,0 +1,50 @@
package activitycenter
import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/protocol"
)
type Service struct {
messenger *protocol.Messenger
walletConnectsFeed *event.Feed
controller *Controller
}
func NewService(walletConnectFeed *event.Feed) *Service {
return &Service{
walletConnectsFeed: walletConnectFeed,
controller: NewController(walletConnectFeed),
}
}
func (s *Service) Init(messenger *protocol.Messenger) {
logutils.ZapLogger().Debug("Initializing activity center service")
s.messenger = messenger
}
func (s *Service) Start() error {
if s.messenger != nil {
logutils.ZapLogger().Debug("Starting activity center service")
s.controller.Start(s.messenger)
} else {
logutils.ZapLogger().Error("Activity center service not started, messenger is nil")
}
return nil
}
func (s *Service) Stop() error {
s.controller.Stop()
return nil
}
func (s *Service) Protocols() []p2p.Protocol {
return nil
}
func (s *Service) APIs() []rpc.API {
return nil
}

View File

@ -15,6 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
gethrpc "github.com/ethereum/go-ethereum/rpc" gethrpc "github.com/ethereum/go-ethereum/rpc"
signercore "github.com/ethereum/go-ethereum/signer/core/apitypes" signercore "github.com/ethereum/go-ethereum/signer/core/apitypes"
abi_spec "github.com/status-im/status-go/abi-spec" abi_spec "github.com/status-im/status-go/abi-spec"
@ -39,17 +40,20 @@ import (
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletconnect" "github.com/status-im/status-go/services/wallet/walletconnect"
"github.com/status-im/status-go/services/wallet/walletconnect/walletconnectevent"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
) )
func NewAPI(s *Service) *API { func NewAPI(s *Service) *API {
return &API{s, s.reader} return &API{s, s.reader, s.feed, s.walletConnectFeed}
} }
// API is class with methods available over RPC. // API is class with methods available over RPC.
type API struct { type API struct {
s *Service s *Service
reader *Reader reader *Reader
feed *event.Feed
walletConnectFeed *event.Feed
} }
func (api *API) StartWallet(ctx context.Context) error { func (api *API) StartWallet(ctx context.Context) error {
@ -926,9 +930,19 @@ func (api *API) getVerifiedWalletAccount(address, password string) (*account.Sel
} }
// AddWalletConnectSession adds or updates a session wallet connect session // AddWalletConnectSession adds or updates a session wallet connect session
func (api *API) AddWalletConnectSession(ctx context.Context, session_json string) error { func (api *API) AddWalletConnectSession(ctx context.Context, session_json string) (walletconnect.Session, error) {
logutils.ZapLogger().Debug("wallet.api.AddWalletConnectSession", zap.Int("rpcURL", len(session_json))) logutils.ZapLogger().Debug("wallet.api.AddWalletConnectSession", zap.Int("rpcURL", len(session_json)))
return walletconnect.AddSession(api.s.db, api.s.config.Networks, session_json) session, err := walletconnect.AddSession(api.s.db, api.s.config.Networks, session_json)
if err != nil {
logutils.ZapLogger().Error("wallet.api.AddWalletConnectSession", zap.Error(err))
}
api.walletConnectFeed.Send(walletconnectevent.Event{
Type: walletconnectevent.EventTypeAdded,
Session: session,
})
return session, err
} }
// DisconnectWalletConnectSession removes a wallet connect session // DisconnectWalletConnectSession removes a wallet connect session

View File

@ -159,7 +159,7 @@ func TestAPI_GetAddressDetails(t *testing.T) {
chainClient.SetWalletNotifier(func(chainID uint64, message string) {}) chainClient.SetWalletNotifier(func(chainID uint64, message string) {})
c.SetWalletNotifier(func(chainID uint64, message string) {}) c.SetWalletNotifier(func(chainID uint64, message string) {})
service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, "") service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, nil, "")
api := &API{ api := &API{
s: service, s: service,

View File

@ -29,7 +29,7 @@ func TestKeycardPairingsFile(t *testing.T) {
accountFeed := &event.Feed{} accountFeed := &event.Feed{}
service := NewService(db, accountsDb, appDB, &rpc.Client{NetworkManager: network.NewManager(db)}, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, "") service := NewService(db, accountsDb, appDB, &rpc.Client{NetworkManager: network.NewManager(db)}, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, nil, "")
data, err := service.KeycardPairings().GetPairingsJSONFileContent() data, err := service.KeycardPairings().GetPairingsJSONFileContent()
require.NoError(t, err) require.NoError(t, err)

View File

@ -62,6 +62,7 @@ func NewService(
ens *ens.Service, ens *ens.Service,
pendingTxManager *transactions.PendingTxTracker, pendingTxManager *transactions.PendingTxTracker,
feed *event.Feed, feed *event.Feed,
walletConnectFeed *event.Feed,
mediaServer *server.MediaServer, mediaServer *server.MediaServer,
statusProxyStageName string, statusProxyStageName string,
) *Service { ) *Service {
@ -216,6 +217,7 @@ func NewService(
transactor: transactor, transactor: transactor,
ens: ens, ens: ens,
feed: feed, feed: feed,
walletConnectFeed: walletConnectFeed,
signals: signals, signals: signals,
reader: reader, reader: reader,
history: history, history: history,
@ -296,6 +298,7 @@ type Service struct {
transactor *transactions.Transactor transactor *transactions.Transactor
ens *ens.Service ens *ens.Service
feed *event.Feed feed *event.Feed
walletConnectFeed *event.Feed
signals *walletevent.SignalsTransmitter signals *walletevent.SignalsTransmitter
reader *Reader reader *Reader
history *history.Service history *history.Service

View File

@ -0,0 +1,19 @@
package walletconnectevent
import (
"github.com/status-im/status-go/services/wallet/walletconnect"
)
// EventType type for event types.
type EventType string
// Event is a type for walletConnect events.
type Event struct {
Type EventType `json:"type"`
Session walletconnect.Session `json:"session"`
}
const (
// EventTypeAdded is emitted when a new session is added.
EventTypeAdded EventType = "added"
)

View File

@ -0,0 +1,73 @@
package walletconnectevent
import (
"context"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/walletconnect"
)
type WalletConnectsChangeCb func(eventType EventType, Session walletconnect.Session)
// Watcher executes a given callback whenever an walletConnect gets added/removed
type Watcher struct {
walletConnectFeed *event.Feed
group *async.Group
callback WalletConnectsChangeCb
}
func NewWatcher(walletConnectFeed *event.Feed, callback WalletConnectsChangeCb) *Watcher {
return &Watcher{
walletConnectFeed: walletConnectFeed,
callback: callback,
}
}
func (w *Watcher) Start() {
logutils.ZapLogger().Debug("Starting wallet connect connection watcher")
if w.group != nil {
return
}
w.group = async.NewGroup(context.Background())
w.group.Add(func(ctx context.Context) error {
return watch(ctx, w.walletConnectFeed, w.callback)
})
}
func (w *Watcher) Stop() {
if w.group != nil {
w.group.Stop()
w.group.Wait()
w.group = nil
}
}
func onWalletConnectSessionChange(callback WalletConnectsChangeCb, session walletconnect.Session, eventType EventType) {
if callback != nil {
callback(eventType, session)
}
}
func watch(ctx context.Context, walletConnectFeed *event.Feed, callback WalletConnectsChangeCb) error {
ch := make(chan Event, 1)
sub := walletConnectFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
return nil
case err := <-sub.Err():
if err != nil {
logutils.ZapLogger().Error("accounts watcher subscription failed", zap.Error(err))
}
case ev := <-ch:
onWalletConnectSessionChange(callback, ev.Session, ev.Type)
}
}
}