From ba75bda39e2d0210daaa77758193cff9ebc3626a Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Mon, 20 Mar 2023 14:02:09 +0100 Subject: [PATCH] feat: event on error --- rpc/chain/client.go | 41 ++++++++---- rpc/client.go | 10 +++ services/wallet/api.go | 4 -- services/wallet/collectibles/collectibles.go | 15 +++-- services/wallet/market/market.go | 50 ++++++++++---- services/wallet/reader.go | 15 ----- services/wallet/service.go | 65 +++++-------------- .../wallet/thirdparty/coingecko/client.go | 14 +--- services/wallet/thirdparty/opensea/client.go | 49 +++++++++----- services/wallet/walletevent/events.go | 2 + 10 files changed, 141 insertions(+), 124 deletions(-) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 573271120..2fa224771 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "strings" + "sync" "time" "github.com/afex/hystrix-go/hystrix" @@ -31,8 +32,11 @@ type ClientWithFallback struct { mainRPC *rpc.Client fallbackRPC *rpc.Client - IsConnected bool - LastCheckedAt int64 + WalletNotifier func(chainId uint64, message string) + + IsConnected bool + IsConnectedLock sync.RWMutex + LastCheckedAt int64 } var vmErrors = []error{ @@ -118,6 +122,22 @@ func isVMError(err error) bool { return false } +func (c *ClientWithFallback) setIsConnected(value bool) { + c.IsConnectedLock.Lock() + defer c.IsConnectedLock.Unlock() + c.LastCheckedAt = time.Now().Unix() + if value != c.IsConnected { + message := "down" + if value { + message = "up" + } + if c.WalletNotifier != nil { + c.WalletNotifier(c.ChainID, message) + } + } + c.IsConnected = value +} + func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() error) error { resultChan := make(chan CommandResult, 1) c.LastCheckedAt = time.Now().Unix() @@ -130,7 +150,7 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() } return err } - c.IsConnected = true + c.setIsConnected(true) resultChan <- CommandResult{} return nil }, func(err error) error { @@ -144,10 +164,9 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() resultChan <- CommandResult{vmError: err} return nil } - c.IsConnected = false + c.setIsConnected(false) return err } - c.IsConnected = true resultChan <- CommandResult{} return nil }) @@ -175,7 +194,7 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall } return err } - c.IsConnected = true + c.setIsConnected(true) resultChan <- CommandResult{res1: res} return nil }, func(err error) error { @@ -189,10 +208,10 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall resultChan <- CommandResult{vmError: err} return nil } - c.IsConnected = false + c.setIsConnected(false) return err } - c.IsConnected = true + c.setIsConnected(true) resultChan <- CommandResult{res1: res} return nil }) @@ -220,7 +239,7 @@ func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error), } return err } - c.IsConnected = true + c.setIsConnected(true) resultChan <- CommandResult{res1: a, res2: b} return nil }, func(err error) error { @@ -234,10 +253,10 @@ func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error), resultChan <- CommandResult{vmError: err} return nil } - c.IsConnected = false + c.setIsConnected(false) return err } - c.IsConnected = true + c.setIsConnected(true) resultChan <- CommandResult{res1: a, res2: b} return nil }) diff --git a/rpc/client.go b/rpc/client.go index 53ab52cb7..c908b24e5 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -52,6 +52,8 @@ type Client struct { handlersMx sync.RWMutex // mx guards handlers handlers map[string]Handler // locally registered handlers log log.Logger + + walletNotifier func(chainID uint64, message string) } // NewClient initializes Client and tries to connect to both, @@ -93,8 +95,15 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U return &c, nil } +func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)) { + c.walletNotifier = notifier +} + func (c *Client) getClientUsingCache(chainID uint64) (*chain.ClientWithFallback, error) { if rpcClient, ok := c.rpcClients[chainID]; ok { + if rpcClient.WalletNotifier == nil { + rpcClient.WalletNotifier = c.walletNotifier + } return rpcClient, nil } @@ -120,6 +129,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (*chain.ClientWithFallback, } client := chain.NewClient(rpcClient, rpcFallbackClient, chainID) + client.WalletNotifier = c.walletNotifier c.rpcClients[chainID] = client return client, nil } diff --git a/services/wallet/api.go b/services/wallet/api.go index 5505da39b..a22b97b62 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -37,10 +37,6 @@ func (api *API) StartWallet(ctx context.Context) error { return api.reader.Start() } -func (api *API) CheckConnected(ctx context.Context) *ConnectedResult { - return api.s.CheckConnected(ctx) -} - func (api *API) StopWallet(ctx context.Context) error { return api.s.Stop() } diff --git a/services/wallet/collectibles/collectibles.go b/services/wallet/collectibles/collectibles.go index b343df48b..f24165797 100644 --- a/services/wallet/collectibles/collectibles.go +++ b/services/wallet/collectibles/collectibles.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/status-im/status-go/contracts/collectibles" "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/wallet/thirdparty" @@ -22,19 +23,21 @@ type Manager struct { openseaAPIKey string nftCache map[uint64]map[string]opensea.Asset nftCacheLock sync.RWMutex + walletFeed *event.Feed } -func NewManager(rpcClient *rpc.Client, metadataProvider thirdparty.NFTMetadataProvider, openseaAPIKey string) *Manager { +func NewManager(rpcClient *rpc.Client, metadataProvider thirdparty.NFTMetadataProvider, openseaAPIKey string, walletFeed *event.Feed) *Manager { return &Manager{ rpcClient: rpcClient, metadataProvider: metadataProvider, openseaAPIKey: openseaAPIKey, nftCache: make(map[uint64]map[string]opensea.Asset), + walletFeed: walletFeed, } } func (o *Manager) FetchAllCollectionsByOwner(chainID uint64, owner common.Address) ([]opensea.OwnedCollection, error) { - client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey) + client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed) if err != nil { return nil, err } @@ -42,7 +45,7 @@ func (o *Manager) FetchAllCollectionsByOwner(chainID uint64, owner common.Addres } func (o *Manager) FetchAllAssetsByOwnerAndCollection(chainID uint64, owner common.Address, collectionSlug string, cursor string, limit int) (*opensea.AssetContainer, error) { - client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey) + client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed) if err != nil { return nil, err } @@ -61,7 +64,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndCollection(chainID uint64, owner commo } func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(chainID uint64, owner common.Address, contractAddresses []common.Address, cursor string, limit int) (*opensea.AssetContainer, error) { - client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey) + client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed) if err != nil { return nil, err } @@ -80,7 +83,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(chainID uint64, owner } func (o *Manager) FetchAllAssetsByOwner(chainID uint64, owner common.Address, cursor string, limit int) (*opensea.AssetContainer, error) { - client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey) + client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed) if err != nil { return nil, err } @@ -103,7 +106,7 @@ func (o *Manager) FetchAssetsByNFTUniqueID(chainID uint64, uniqueIDs []thirdpart idsToFetch := o.getIDsNotInCache(chainID, uniqueIDs) if len(idsToFetch) > 0 { - client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey) + client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed) if err != nil { return nil, err } diff --git a/services/wallet/market/market.go b/services/wallet/market/market.go index 90e96be26..af669fbbe 100644 --- a/services/wallet/market/market.go +++ b/services/wallet/market/market.go @@ -5,8 +5,15 @@ import ( "time" "github.com/afex/hystrix-go/hystrix" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/status-im/status-go/services/wallet/thirdparty" + "github.com/status-im/status-go/services/wallet/walletevent" +) + +const ( + EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" ) type DataPoint struct { @@ -17,15 +24,17 @@ type DataPoint struct { type DataPerTokenAndCurrency = map[string]map[string]DataPoint type Manager struct { - main thirdparty.MarketDataProvider - fallback thirdparty.MarketDataProvider - priceCache DataPerTokenAndCurrency - IsConnected bool - LastCheckedAt int64 - priceCacheLock sync.RWMutex + main thirdparty.MarketDataProvider + fallback thirdparty.MarketDataProvider + feed *event.Feed + priceCache DataPerTokenAndCurrency + priceCacheLock sync.RWMutex + IsConnected bool + LastCheckedAt int64 + IsConnectedLock sync.RWMutex } -func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDataProvider) *Manager { +func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDataProvider, feed *event.Feed) *Manager { hystrix.ConfigureCommand("marketClient", hystrix.CommandConfig{ Timeout: 10000, MaxConcurrentRequests: 100, @@ -36,21 +45,40 @@ func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDa return &Manager{ main: main, fallback: fallback, + feed: feed, priceCache: make(DataPerTokenAndCurrency), IsConnected: true, LastCheckedAt: time.Now().Unix(), } } +func (pm *Manager) setIsConnected(value bool) { + pm.IsConnectedLock.Lock() + defer pm.IsConnectedLock.Unlock() + pm.LastCheckedAt = time.Now().Unix() + if value != pm.IsConnected { + message := "down" + if value { + message = "up" + } + pm.feed.Send(walletevent.Event{ + Type: EventMarketStatusChanged, + Accounts: []common.Address{}, + Message: message, + At: time.Now().Unix(), + }) + } + pm.IsConnected = value +} + func (pm *Manager) makeCall(main func() (any, error), fallback func() (any, error)) (any, error) { resultChan := make(chan any, 1) - pm.LastCheckedAt = time.Now().Unix() errChan := hystrix.Go("marketClient", func() error { res, err := main() if err != nil { return err } - pm.IsConnected = true + pm.setIsConnected(true) resultChan <- res return nil }, func(err error) error { @@ -60,10 +88,10 @@ func (pm *Manager) makeCall(main func() (any, error), fallback func() (any, erro res, err := fallback() if err != nil { - pm.IsConnected = false + pm.setIsConnected(false) return err } - pm.IsConnected = true + pm.setIsConnected(true) resultChan <- res return nil }) diff --git a/services/wallet/reader.go b/services/wallet/reader.go index e0d1417cb..bc22744c6 100644 --- a/services/wallet/reader.go +++ b/services/wallet/reader.go @@ -110,21 +110,6 @@ func (r *Reader) Start() error { ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel - go func() { - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - r.walletFeed.Send(walletevent.Event{ - Type: EventWalletTickCheckConnected, - }) - } - } - }() - go func() { ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() diff --git a/services/wallet/service.go b/services/wallet/service.go index d1167ebe5..7dea6b64c 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -1,10 +1,10 @@ package wallet import ( - "context" "database/sql" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -23,23 +23,15 @@ import ( "github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/thirdparty/coingecko" "github.com/status-im/status-go/services/wallet/thirdparty/cryptocompare" - "github.com/status-im/status-go/services/wallet/thirdparty/opensea" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/transactions" ) -type Connection struct { - Up bool `json:"up"` - LastCheckedAt int64 `json:"lastCheckedAt"` -} - -type ConnectedResult struct { - Blockchains map[uint64]Connection `json:"blockchains"` - Market Connection `json:"market"` - Collectibles map[uint64]Connection `json:"collectibles"` -} +const ( + EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed" +) // NewService initializes service instance. func NewService( @@ -62,17 +54,26 @@ func NewService( signals := &walletevent.SignalsTransmitter{ Publisher: walletFeed, } + rpcClient.SetWalletNotifier(func(chainID uint64, message string) { + walletFeed.Send(walletevent.Event{ + Type: EventBlockchainStatusChanged, + Accounts: []common.Address{}, + Message: message, + At: time.Now().Unix(), + ChainID: chainID, + }) + }) tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager) savedAddressesManager := &SavedAddressesManager{db: db} transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB) transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager) cryptoCompare := cryptocompare.NewClient() coingecko := coingecko.NewClient() - marketManager := market.NewManager(cryptoCompare, coingecko) + marketManager := market.NewManager(cryptoCompare, coingecko, walletFeed) reader := NewReader(rpcClient, tokenManager, marketManager, accountsDB, walletFeed) history := history.NewService(db, walletFeed, rpcClient, tokenManager, marketManager) currency := currency.NewService(db, walletFeed, tokenManager, marketManager) - collectiblesManager := collectibles.NewManager(rpcClient, nftMetadataProvider, openseaAPIKey) + collectiblesManager := collectibles.NewManager(rpcClient, nftMetadataProvider, openseaAPIKey, walletFeed) return &Service{ db: db, accountsDB: accountsDB, @@ -170,39 +171,3 @@ func (s *Service) Protocols() []p2p.Protocol { func (s *Service) IsStarted() bool { return s.started } - -func (s *Service) CheckConnected(ctx context.Context) *ConnectedResult { - networks, err := s.rpcClient.NetworkManager.Get(false) - blockchains := make(map[uint64]Connection) - if err == nil { - for _, network := range networks { - ethClient, err := s.rpcClient.EthClient(network.ChainID) - if err != nil { - blockchains[network.ChainID] = Connection{ - Up: true, - LastCheckedAt: time.Now().Unix(), - } - } - blockchains[network.ChainID] = Connection{ - Up: ethClient.IsConnected, - LastCheckedAt: ethClient.LastCheckedAt, - } - } - } - - collectibles := make(map[uint64]Connection) - for chainID, client := range opensea.OpenseaClientInstances { - collectibles[chainID] = Connection{ - Up: client.IsConnected, - LastCheckedAt: client.LastCheckedAt, - } - } - return &ConnectedResult{ - Blockchains: blockchains, - Collectibles: collectibles, - Market: Connection{ - Up: s.marketManager.IsConnected, - LastCheckedAt: s.marketManager.LastCheckedAt, - }, - } -} diff --git a/services/wallet/thirdparty/coingecko/client.go b/services/wallet/thirdparty/coingecko/client.go index 0bd0f66d9..57d50c940 100644 --- a/services/wallet/thirdparty/coingecko/client.go +++ b/services/wallet/thirdparty/coingecko/client.go @@ -5,7 +5,6 @@ import ( "fmt" "io/ioutil" "net/http" - "net/url" "strings" "sync" "time" @@ -111,7 +110,6 @@ func mapTokensToSymbols(tokens []GeckoToken, tokenMap map[string]GeckoToken) { } func (c *Client) getTokens() (map[string]GeckoToken, error) { - c.fetchTokensMutex.Lock() defer c.fetchTokensMutex.Unlock() @@ -226,16 +224,8 @@ func (c *Client) FetchTokenMarketValues(symbols []string, currency string) (map[ if err != nil { return nil, err } - queryParams := url.Values{ - "ids": {strings.Join(ids, ",")}, - "vs_currency": {currency}, - "order": {"market_cap_desc"}, - "per_page": {"250"}, - "page": {"1"}, - "sparkline": {"false"}, - "price_change_percentage": {"1h,24h"}, - } - url := baseURL + "coins/markets" + queryParams.Encode() + url := fmt.Sprintf("%scoins/markets?ids=%s&vs_currency=%s&order=market_cap_desc&per_page=250&page=1&sparkline=false&price_change_percentage=1h%2C24h", baseURL, strings.Join(ids, ","), currency) + resp, err := c.DoQuery(url) if err != nil { return nil, err diff --git a/services/wallet/thirdparty/opensea/client.go b/services/wallet/thirdparty/opensea/client.go index ca7081c8c..470addfa5 100644 --- a/services/wallet/thirdparty/opensea/client.go +++ b/services/wallet/thirdparty/opensea/client.go @@ -12,10 +12,16 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/thirdparty" + "github.com/status-im/status-go/services/wallet/walletevent" +) + +const ( + EventCollectibleStatusChanged walletevent.EventType = "wallet-collectible-status-changed" ) const AssetLimit = 200 @@ -216,10 +222,11 @@ type Client struct { IsConnected bool LastCheckedAt int64 IsConnectedLock sync.RWMutex + feed *event.Feed } // new opensea client. -func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) { +func NewOpenseaClient(chainID uint64, apiKey string, feed *event.Feed) (*Client, error) { if OpenseaHTTPClient == nil { OpenseaHTTPClient = newHTTPClient() } @@ -228,16 +235,16 @@ func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) { if chainID == ChainIDRequiringAPIKey { tmpAPIKey = apiKey } - if client, ok := OpenseaClientInstances[chainID]; ok { - if client.apiKey == tmpAPIKey { - return client, nil - } - } baseURL, err := getbaseURL(chainID) if err != nil { return nil, err } + if client, ok := OpenseaClientInstances[chainID]; ok { + if client.apiKey == tmpAPIKey { + return client, nil + } + } openseaClient := &Client{ client: OpenseaHTTPClient, @@ -245,18 +252,30 @@ func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) { apiKey: tmpAPIKey, IsConnected: true, LastCheckedAt: time.Now().Unix(), + feed: feed, } OpenseaClientInstances[chainID] = openseaClient return openseaClient, nil } -func (o *Client) setConnected(value bool) { +func (o *Client) setIsConnected(value bool) { o.IsConnectedLock.Lock() defer o.IsConnectedLock.Unlock() - o.IsConnected = value o.LastCheckedAt = time.Now().Unix() + if value != o.IsConnected { + message := "down" + if value { + message = "up" + } + o.feed.Send(walletevent.Event{ + Type: EventCollectibleStatusChanged, + Accounts: []common.Address{}, + Message: message, + At: time.Now().Unix(), + }) + } + o.IsConnected = value } - func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollection, error) { offset := 0 var collections []OwnedCollection @@ -264,7 +283,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec url := fmt.Sprintf("%s/collections?asset_owner=%s&offset=%d&limit=%d", o.url, owner, offset, CollectionLimit) body, err := o.client.doGetRequest(url, o.apiKey) if err != nil { - o.setConnected(false) + o.setIsConnected(false) return nil, err } @@ -276,7 +295,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec var tmp []OwnedCollection err = json.Unmarshal(body, &tmp) if err != nil { - o.setConnected(false) + o.setIsConnected(false) return nil, err } @@ -286,7 +305,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec break } } - o.setConnected(true) + o.setIsConnected(true) return collections, nil } @@ -360,7 +379,7 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer body, err := o.client.doGetRequest(url, o.apiKey) if err != nil { - o.setConnected(false) + o.setIsConnected(false) return nil, err } @@ -372,7 +391,7 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer container := AssetContainer{} err = json.Unmarshal(body, &container) if err != nil { - o.setConnected(false) + o.setIsConnected(false) return nil, err } @@ -396,6 +415,6 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer } } - o.setConnected(true) + o.setIsConnected(true) return assets, nil } diff --git a/services/wallet/walletevent/events.go b/services/wallet/walletevent/events.go index 277b120d6..72cb5b0c5 100644 --- a/services/wallet/walletevent/events.go +++ b/services/wallet/walletevent/events.go @@ -15,4 +15,6 @@ type Event struct { BlockNumber *big.Int `json:"blockNumber"` Accounts []common.Address `json:"accounts"` Message string `json:"message"` + At int64 `json:"at"` + ChainID uint64 `json:"chainId"` }