feat: event on error

This commit is contained in:
Anthony Laibe 2023-03-20 14:02:09 +01:00 committed by Anthony Laibe
parent 4e222cc404
commit ba75bda39e
10 changed files with 141 additions and 124 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"strings"
"sync"
"time"
"github.com/afex/hystrix-go/hystrix"
@ -31,8 +32,11 @@ type ClientWithFallback struct {
mainRPC *rpc.Client
fallbackRPC *rpc.Client
IsConnected bool
LastCheckedAt int64
WalletNotifier func(chainId uint64, message string)
IsConnected bool
IsConnectedLock sync.RWMutex
LastCheckedAt int64
}
var vmErrors = []error{
@ -118,6 +122,22 @@ func isVMError(err error) bool {
return false
}
func (c *ClientWithFallback) setIsConnected(value bool) {
c.IsConnectedLock.Lock()
defer c.IsConnectedLock.Unlock()
c.LastCheckedAt = time.Now().Unix()
if value != c.IsConnected {
message := "down"
if value {
message = "up"
}
if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, message)
}
}
c.IsConnected = value
}
func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() error) error {
resultChan := make(chan CommandResult, 1)
c.LastCheckedAt = time.Now().Unix()
@ -130,7 +150,7 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func()
}
return err
}
c.IsConnected = true
c.setIsConnected(true)
resultChan <- CommandResult{}
return nil
}, func(err error) error {
@ -144,10 +164,9 @@ func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func()
resultChan <- CommandResult{vmError: err}
return nil
}
c.IsConnected = false
c.setIsConnected(false)
return err
}
c.IsConnected = true
resultChan <- CommandResult{}
return nil
})
@ -175,7 +194,7 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall
}
return err
}
c.IsConnected = true
c.setIsConnected(true)
resultChan <- CommandResult{res1: res}
return nil
}, func(err error) error {
@ -189,10 +208,10 @@ func (c *ClientWithFallback) makeCallSingleReturn(main func() (any, error), fall
resultChan <- CommandResult{vmError: err}
return nil
}
c.IsConnected = false
c.setIsConnected(false)
return err
}
c.IsConnected = true
c.setIsConnected(true)
resultChan <- CommandResult{res1: res}
return nil
})
@ -220,7 +239,7 @@ func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error),
}
return err
}
c.IsConnected = true
c.setIsConnected(true)
resultChan <- CommandResult{res1: a, res2: b}
return nil
}, func(err error) error {
@ -234,10 +253,10 @@ func (c *ClientWithFallback) makeCallDoubleReturn(main func() (any, any, error),
resultChan <- CommandResult{vmError: err}
return nil
}
c.IsConnected = false
c.setIsConnected(false)
return err
}
c.IsConnected = true
c.setIsConnected(true)
resultChan <- CommandResult{res1: a, res2: b}
return nil
})

View File

@ -52,6 +52,8 @@ type Client struct {
handlersMx sync.RWMutex // mx guards handlers
handlers map[string]Handler // locally registered handlers
log log.Logger
walletNotifier func(chainID uint64, message string)
}
// NewClient initializes Client and tries to connect to both,
@ -93,8 +95,15 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
return &c, nil
}
func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)) {
c.walletNotifier = notifier
}
func (c *Client) getClientUsingCache(chainID uint64) (*chain.ClientWithFallback, error) {
if rpcClient, ok := c.rpcClients[chainID]; ok {
if rpcClient.WalletNotifier == nil {
rpcClient.WalletNotifier = c.walletNotifier
}
return rpcClient, nil
}
@ -120,6 +129,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (*chain.ClientWithFallback,
}
client := chain.NewClient(rpcClient, rpcFallbackClient, chainID)
client.WalletNotifier = c.walletNotifier
c.rpcClients[chainID] = client
return client, nil
}

View File

@ -37,10 +37,6 @@ func (api *API) StartWallet(ctx context.Context) error {
return api.reader.Start()
}
func (api *API) CheckConnected(ctx context.Context) *ConnectedResult {
return api.s.CheckConnected(ctx)
}
func (api *API) StopWallet(ctx context.Context) error {
return api.s.Stop()
}

View File

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/contracts/collectibles"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/thirdparty"
@ -22,19 +23,21 @@ type Manager struct {
openseaAPIKey string
nftCache map[uint64]map[string]opensea.Asset
nftCacheLock sync.RWMutex
walletFeed *event.Feed
}
func NewManager(rpcClient *rpc.Client, metadataProvider thirdparty.NFTMetadataProvider, openseaAPIKey string) *Manager {
func NewManager(rpcClient *rpc.Client, metadataProvider thirdparty.NFTMetadataProvider, openseaAPIKey string, walletFeed *event.Feed) *Manager {
return &Manager{
rpcClient: rpcClient,
metadataProvider: metadataProvider,
openseaAPIKey: openseaAPIKey,
nftCache: make(map[uint64]map[string]opensea.Asset),
walletFeed: walletFeed,
}
}
func (o *Manager) FetchAllCollectionsByOwner(chainID uint64, owner common.Address) ([]opensea.OwnedCollection, error) {
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey)
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed)
if err != nil {
return nil, err
}
@ -42,7 +45,7 @@ func (o *Manager) FetchAllCollectionsByOwner(chainID uint64, owner common.Addres
}
func (o *Manager) FetchAllAssetsByOwnerAndCollection(chainID uint64, owner common.Address, collectionSlug string, cursor string, limit int) (*opensea.AssetContainer, error) {
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey)
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed)
if err != nil {
return nil, err
}
@ -61,7 +64,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndCollection(chainID uint64, owner commo
}
func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(chainID uint64, owner common.Address, contractAddresses []common.Address, cursor string, limit int) (*opensea.AssetContainer, error) {
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey)
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed)
if err != nil {
return nil, err
}
@ -80,7 +83,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(chainID uint64, owner
}
func (o *Manager) FetchAllAssetsByOwner(chainID uint64, owner common.Address, cursor string, limit int) (*opensea.AssetContainer, error) {
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey)
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed)
if err != nil {
return nil, err
}
@ -103,7 +106,7 @@ func (o *Manager) FetchAssetsByNFTUniqueID(chainID uint64, uniqueIDs []thirdpart
idsToFetch := o.getIDsNotInCache(chainID, uniqueIDs)
if len(idsToFetch) > 0 {
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey)
client, err := opensea.NewOpenseaClient(chainID, o.openseaAPIKey, o.walletFeed)
if err != nil {
return nil, err
}

View File

@ -5,8 +5,15 @@ import (
"time"
"github.com/afex/hystrix-go/hystrix"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed"
)
type DataPoint struct {
@ -17,15 +24,17 @@ type DataPoint struct {
type DataPerTokenAndCurrency = map[string]map[string]DataPoint
type Manager struct {
main thirdparty.MarketDataProvider
fallback thirdparty.MarketDataProvider
priceCache DataPerTokenAndCurrency
IsConnected bool
LastCheckedAt int64
priceCacheLock sync.RWMutex
main thirdparty.MarketDataProvider
fallback thirdparty.MarketDataProvider
feed *event.Feed
priceCache DataPerTokenAndCurrency
priceCacheLock sync.RWMutex
IsConnected bool
LastCheckedAt int64
IsConnectedLock sync.RWMutex
}
func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDataProvider) *Manager {
func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDataProvider, feed *event.Feed) *Manager {
hystrix.ConfigureCommand("marketClient", hystrix.CommandConfig{
Timeout: 10000,
MaxConcurrentRequests: 100,
@ -36,21 +45,40 @@ func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDa
return &Manager{
main: main,
fallback: fallback,
feed: feed,
priceCache: make(DataPerTokenAndCurrency),
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
}
}
func (pm *Manager) setIsConnected(value bool) {
pm.IsConnectedLock.Lock()
defer pm.IsConnectedLock.Unlock()
pm.LastCheckedAt = time.Now().Unix()
if value != pm.IsConnected {
message := "down"
if value {
message = "up"
}
pm.feed.Send(walletevent.Event{
Type: EventMarketStatusChanged,
Accounts: []common.Address{},
Message: message,
At: time.Now().Unix(),
})
}
pm.IsConnected = value
}
func (pm *Manager) makeCall(main func() (any, error), fallback func() (any, error)) (any, error) {
resultChan := make(chan any, 1)
pm.LastCheckedAt = time.Now().Unix()
errChan := hystrix.Go("marketClient", func() error {
res, err := main()
if err != nil {
return err
}
pm.IsConnected = true
pm.setIsConnected(true)
resultChan <- res
return nil
}, func(err error) error {
@ -60,10 +88,10 @@ func (pm *Manager) makeCall(main func() (any, error), fallback func() (any, erro
res, err := fallback()
if err != nil {
pm.IsConnected = false
pm.setIsConnected(false)
return err
}
pm.IsConnected = true
pm.setIsConnected(true)
resultChan <- res
return nil
})

View File

@ -110,21 +110,6 @@ func (r *Reader) Start() error {
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.walletFeed.Send(walletevent.Event{
Type: EventWalletTickCheckConnected,
})
}
}
}()
go func() {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()

View File

@ -1,10 +1,10 @@
package wallet
import (
"context"
"database/sql"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
@ -23,23 +23,15 @@ import (
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/thirdparty/coingecko"
"github.com/status-im/status-go/services/wallet/thirdparty/cryptocompare"
"github.com/status-im/status-go/services/wallet/thirdparty/opensea"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
type Connection struct {
Up bool `json:"up"`
LastCheckedAt int64 `json:"lastCheckedAt"`
}
type ConnectedResult struct {
Blockchains map[uint64]Connection `json:"blockchains"`
Market Connection `json:"market"`
Collectibles map[uint64]Connection `json:"collectibles"`
}
const (
EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed"
)
// NewService initializes service instance.
func NewService(
@ -62,17 +54,26 @@ func NewService(
signals := &walletevent.SignalsTransmitter{
Publisher: walletFeed,
}
rpcClient.SetWalletNotifier(func(chainID uint64, message string) {
walletFeed.Send(walletevent.Event{
Type: EventBlockchainStatusChanged,
Accounts: []common.Address{},
Message: message,
At: time.Now().Unix(),
ChainID: chainID,
})
})
tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager)
savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager)
cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient()
marketManager := market.NewManager(cryptoCompare, coingecko)
marketManager := market.NewManager(cryptoCompare, coingecko, walletFeed)
reader := NewReader(rpcClient, tokenManager, marketManager, accountsDB, walletFeed)
history := history.NewService(db, walletFeed, rpcClient, tokenManager, marketManager)
currency := currency.NewService(db, walletFeed, tokenManager, marketManager)
collectiblesManager := collectibles.NewManager(rpcClient, nftMetadataProvider, openseaAPIKey)
collectiblesManager := collectibles.NewManager(rpcClient, nftMetadataProvider, openseaAPIKey, walletFeed)
return &Service{
db: db,
accountsDB: accountsDB,
@ -170,39 +171,3 @@ func (s *Service) Protocols() []p2p.Protocol {
func (s *Service) IsStarted() bool {
return s.started
}
func (s *Service) CheckConnected(ctx context.Context) *ConnectedResult {
networks, err := s.rpcClient.NetworkManager.Get(false)
blockchains := make(map[uint64]Connection)
if err == nil {
for _, network := range networks {
ethClient, err := s.rpcClient.EthClient(network.ChainID)
if err != nil {
blockchains[network.ChainID] = Connection{
Up: true,
LastCheckedAt: time.Now().Unix(),
}
}
blockchains[network.ChainID] = Connection{
Up: ethClient.IsConnected,
LastCheckedAt: ethClient.LastCheckedAt,
}
}
}
collectibles := make(map[uint64]Connection)
for chainID, client := range opensea.OpenseaClientInstances {
collectibles[chainID] = Connection{
Up: client.IsConnected,
LastCheckedAt: client.LastCheckedAt,
}
}
return &ConnectedResult{
Blockchains: blockchains,
Collectibles: collectibles,
Market: Connection{
Up: s.marketManager.IsConnected,
LastCheckedAt: s.marketManager.LastCheckedAt,
},
}
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
"time"
@ -111,7 +110,6 @@ func mapTokensToSymbols(tokens []GeckoToken, tokenMap map[string]GeckoToken) {
}
func (c *Client) getTokens() (map[string]GeckoToken, error) {
c.fetchTokensMutex.Lock()
defer c.fetchTokensMutex.Unlock()
@ -226,16 +224,8 @@ func (c *Client) FetchTokenMarketValues(symbols []string, currency string) (map[
if err != nil {
return nil, err
}
queryParams := url.Values{
"ids": {strings.Join(ids, ",")},
"vs_currency": {currency},
"order": {"market_cap_desc"},
"per_page": {"250"},
"page": {"1"},
"sparkline": {"false"},
"price_change_percentage": {"1h,24h"},
}
url := baseURL + "coins/markets" + queryParams.Encode()
url := fmt.Sprintf("%scoins/markets?ids=%s&vs_currency=%s&order=market_cap_desc&per_page=250&page=1&sparkline=false&price_change_percentage=1h%2C24h", baseURL, strings.Join(ids, ","), currency)
resp, err := c.DoQuery(url)
if err != nil {
return nil, err

View File

@ -12,10 +12,16 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/walletevent"
)
const (
EventCollectibleStatusChanged walletevent.EventType = "wallet-collectible-status-changed"
)
const AssetLimit = 200
@ -216,10 +222,11 @@ type Client struct {
IsConnected bool
LastCheckedAt int64
IsConnectedLock sync.RWMutex
feed *event.Feed
}
// new opensea client.
func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) {
func NewOpenseaClient(chainID uint64, apiKey string, feed *event.Feed) (*Client, error) {
if OpenseaHTTPClient == nil {
OpenseaHTTPClient = newHTTPClient()
}
@ -228,16 +235,16 @@ func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) {
if chainID == ChainIDRequiringAPIKey {
tmpAPIKey = apiKey
}
if client, ok := OpenseaClientInstances[chainID]; ok {
if client.apiKey == tmpAPIKey {
return client, nil
}
}
baseURL, err := getbaseURL(chainID)
if err != nil {
return nil, err
}
if client, ok := OpenseaClientInstances[chainID]; ok {
if client.apiKey == tmpAPIKey {
return client, nil
}
}
openseaClient := &Client{
client: OpenseaHTTPClient,
@ -245,18 +252,30 @@ func NewOpenseaClient(chainID uint64, apiKey string) (*Client, error) {
apiKey: tmpAPIKey,
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
feed: feed,
}
OpenseaClientInstances[chainID] = openseaClient
return openseaClient, nil
}
func (o *Client) setConnected(value bool) {
func (o *Client) setIsConnected(value bool) {
o.IsConnectedLock.Lock()
defer o.IsConnectedLock.Unlock()
o.IsConnected = value
o.LastCheckedAt = time.Now().Unix()
if value != o.IsConnected {
message := "down"
if value {
message = "up"
}
o.feed.Send(walletevent.Event{
Type: EventCollectibleStatusChanged,
Accounts: []common.Address{},
Message: message,
At: time.Now().Unix(),
})
}
o.IsConnected = value
}
func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollection, error) {
offset := 0
var collections []OwnedCollection
@ -264,7 +283,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec
url := fmt.Sprintf("%s/collections?asset_owner=%s&offset=%d&limit=%d", o.url, owner, offset, CollectionLimit)
body, err := o.client.doGetRequest(url, o.apiKey)
if err != nil {
o.setConnected(false)
o.setIsConnected(false)
return nil, err
}
@ -276,7 +295,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec
var tmp []OwnedCollection
err = json.Unmarshal(body, &tmp)
if err != nil {
o.setConnected(false)
o.setIsConnected(false)
return nil, err
}
@ -286,7 +305,7 @@ func (o *Client) FetchAllCollectionsByOwner(owner common.Address) ([]OwnedCollec
break
}
}
o.setConnected(true)
o.setIsConnected(true)
return collections, nil
}
@ -360,7 +379,7 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer
body, err := o.client.doGetRequest(url, o.apiKey)
if err != nil {
o.setConnected(false)
o.setIsConnected(false)
return nil, err
}
@ -372,7 +391,7 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer
container := AssetContainer{}
err = json.Unmarshal(body, &container)
if err != nil {
o.setConnected(false)
o.setIsConnected(false)
return nil, err
}
@ -396,6 +415,6 @@ func (o *Client) fetchAssets(queryParams url.Values, limit int) (*AssetContainer
}
}
o.setConnected(true)
o.setIsConnected(true)
return assets, nil
}

View File

@ -15,4 +15,6 @@ type Event struct {
BlockNumber *big.Int `json:"blockNumber"`
Accounts []common.Address `json:"accounts"`
Message string `json:"message"`
At int64 `json:"at"`
ChainID uint64 `json:"chainId"`
}