feat(wallet)_: added group tag for RPC chain client

It is needed to be able to set common limits for chain client
Added a test for group tag limiter
Added a mutex to RPC limiter to change counters atomically
Replaced isConnected with atomic.Bool and made it a pointer to
be shared across client instances
This commit is contained in:
Ivan Belyakov 2024-05-21 19:40:37 +02:00 committed by IvanBelyakoff
parent bf7aabfa3e
commit cdf80b5300
4 changed files with 104 additions and 24 deletions

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"strings" "strings"
"sync" "sync/atomic"
"time" "time"
"github.com/afex/hystrix-go/hystrix" "github.com/afex/hystrix-go/hystrix"
@ -62,6 +62,8 @@ type ClientInterface interface {
type Tagger interface { type Tagger interface {
Tag() string Tag() string
SetTag(tag string) SetTag(tag string)
GroupTag() string
SetGroupTag(tag string)
DeepCopyTag() Tagger DeepCopyTag() Tagger
} }
@ -69,12 +71,15 @@ func DeepCopyTagger(t Tagger) Tagger {
return t.DeepCopyTag() return t.DeepCopyTag()
} }
// Shallow copy of the client with a deep copy of tag // Shallow copy of the client with a deep copy of tag and group tag
func ClientWithTag(chainClient ClientInterface, tag string) ClientInterface { // To avoid passing tags as parameter to every chain call, it is sufficient for now
// to set the tag and group tag once on the client
func ClientWithTag(chainClient ClientInterface, tag, groupTag string) ClientInterface {
newClient := chainClient newClient := chainClient
if tagIface, ok := chainClient.(Tagger); ok { if tagIface, ok := chainClient.(Tagger); ok {
tagIface = DeepCopyTagger(tagIface) tagIface = DeepCopyTagger(tagIface)
tagIface.SetTag(tag) tagIface.SetTag(tag)
tagIface.SetGroupTag(tag)
newClient = tagIface.(ClientInterface) newClient = tagIface.(ClientInterface)
} }
@ -94,12 +99,12 @@ type ClientWithFallback struct {
WalletNotifier func(chainId uint64, message string) WalletNotifier func(chainId uint64, message string)
isConnected bool isConnected *atomic.Bool
isConnectedLock sync.RWMutex
LastCheckedAt int64 LastCheckedAt int64
circuitBreakerCmdName string circuitBreakerCmdName string
tag string tag string // tag for the limiter
groupTag string // tag for the limiter group
} }
// Don't mark connection as failed if we get one of these errors // Don't mark connection as failed if we get one of these errors
@ -137,6 +142,8 @@ func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint6
ErrorPercentThreshold: 25, ErrorPercentThreshold: 25,
}) })
isConnected := &atomic.Bool{}
isConnected.Store(true)
return &ClientWithFallback{ return &ClientWithFallback{
ChainID: chainID, ChainID: chainID,
main: ethclient.NewClient(main), main: ethclient.NewClient(main),
@ -145,7 +152,7 @@ func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint6
fallbackLimiter: nil, fallbackLimiter: nil,
mainRPC: main, mainRPC: main,
fallbackRPC: nil, fallbackRPC: nil,
isConnected: true, isConnected: isConnected,
LastCheckedAt: time.Now().Unix(), LastCheckedAt: time.Now().Unix(),
circuitBreakerCmdName: circuitBreakerCmdName, circuitBreakerCmdName: circuitBreakerCmdName,
} }
@ -164,6 +171,9 @@ func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RP
if fallback != nil { if fallback != nil {
fallbackEthClient = ethclient.NewClient(fallback) fallbackEthClient = ethclient.NewClient(fallback)
} }
isConnected := &atomic.Bool{}
isConnected.Store(true)
return &ClientWithFallback{ return &ClientWithFallback{
ChainID: chainID, ChainID: chainID,
main: ethclient.NewClient(main), main: ethclient.NewClient(main),
@ -172,7 +182,7 @@ func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RP
fallbackLimiter: fallbackLimiter, fallbackLimiter: fallbackLimiter,
mainRPC: main, mainRPC: main,
fallbackRPC: fallback, fallbackRPC: fallback,
isConnected: true, isConnected: isConnected,
LastCheckedAt: time.Now().Unix(), LastCheckedAt: time.Now().Unix(),
circuitBreakerCmdName: circuitBreakerCmdName, circuitBreakerCmdName: circuitBreakerCmdName,
} }
@ -208,20 +218,18 @@ func isRPSLimitError(err error) bool {
} }
func (c *ClientWithFallback) SetIsConnected(value bool) { func (c *ClientWithFallback) SetIsConnected(value bool) {
c.isConnectedLock.Lock()
defer c.isConnectedLock.Unlock()
c.LastCheckedAt = time.Now().Unix() c.LastCheckedAt = time.Now().Unix()
if !value { if !value {
if c.isConnected { if c.isConnected.Load() {
if c.WalletNotifier != nil { if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, "down") c.WalletNotifier(c.ChainID, "down")
} }
c.isConnected = false c.isConnected.Store(false)
} }
} else { } else {
if !c.isConnected { if !c.isConnected.Load() {
c.isConnected = true c.isConnected.Store(true)
if c.WalletNotifier != nil { if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, "up") c.WalletNotifier(c.ChainID, "up")
} }
@ -230,9 +238,7 @@ func (c *ClientWithFallback) SetIsConnected(value bool) {
} }
func (c *ClientWithFallback) IsConnected() bool { func (c *ClientWithFallback) IsConnected() bool {
c.isConnectedLock.RLock() return c.isConnected.Load()
defer c.isConnectedLock.RUnlock()
return c.isConnected
} }
func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) { func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) {
@ -240,6 +246,10 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, e
if allow, err := c.commonLimiter.Allow(c.tag); !allow { if allow, err := c.commonLimiter.Allow(c.tag); !allow {
return nil, fmt.Errorf("tag=%s, %w", c.tag, err) return nil, fmt.Errorf("tag=%s, %w", c.tag, err)
} }
if allow, err := c.commonLimiter.Allow(c.groupTag); !allow {
return nil, fmt.Errorf("groupTag=%s, %w", c.groupTag, err)
}
} }
resultChan := make(chan CommandResult, 1) resultChan := make(chan CommandResult, 1)
@ -1010,6 +1020,14 @@ func (c *ClientWithFallback) SetTag(tag string) {
c.tag = tag c.tag = tag
} }
func (c *ClientWithFallback) GroupTag() string {
return c.groupTag
}
func (c *ClientWithFallback) SetGroupTag(tag string) {
c.groupTag = tag
}
func (c *ClientWithFallback) DeepCopyTag() Tagger { func (c *ClientWithFallback) DeepCopyTag() Tagger {
copy := *c copy := *c
return &copy return &copy

View File

@ -70,6 +70,7 @@ type RequestLimiter interface {
type RPCRequestLimiter struct { type RPCRequestLimiter struct {
storage RequestsStorage storage RequestsStorage
mu sync.Mutex
} }
func NewRequestLimiter(storage RequestsStorage) *RPCRequestLimiter { func NewRequestLimiter(storage RequestsStorage) *RPCRequestLimiter {
@ -116,8 +117,10 @@ func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval
} }
func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) {
rl.mu.Lock()
defer rl.mu.Unlock()
data, err := rl.storage.Get(tag) data, err := rl.storage.Get(tag)
log.Info("Allow", "data", data)
if err != nil { if err != nil {
return true, err return true, err
} }

View File

@ -30,6 +30,7 @@ const (
newTransferHistoryTag = "new_transfer_history" newTransferHistoryTag = "new_transfer_history"
transferHistoryLimit = 10000 transferHistoryLimit = 10000
transferHistoryLimitPerAccount = 5000
transferHistoryLimitPeriod = 24 * time.Hour transferHistoryLimitPeriod = 24 * time.Hour
) )
@ -1121,8 +1122,12 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
for _, rangeItem := range ranges { for _, rangeItem := range ranges {
log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account)
chainClient := chain.ClientWithTag(c.chainClient, transferHistoryTag) // Each account has its own limit and a global limit for all accounts
limiter := chain.NewRequestLimiter(chain.NewInMemRequestsMapStorage()) accountTag := transferHistoryTag + "_" + account.String()
chainClient := chain.ClientWithTag(c.chainClient, accountTag, transferHistoryTag)
storage := chain.NewInMemRequestsMapStorage()
limiter := chain.NewRequestLimiter(storage)
limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, transferHistoryLimitPeriod)
limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod)
chainClient.SetLimiter(limiter) chainClient.SetLimiter(limiter)

View File

@ -65,6 +65,8 @@ type TestClient struct {
callsCounter map[string]int callsCounter map[string]int
currentBlock uint64 currentBlock uint64
limiter chain.RequestLimiter limiter chain.RequestLimiter
tag string
groupTag string
} }
var countAndlog = func(tc *TestClient, method string, params ...interface{}) error { var countAndlog = func(tc *TestClient, method string, params ...interface{}) error {
@ -1047,10 +1049,14 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo
// Reimplement the common function that is called from every method to check for the limit // Reimplement the common function that is called from every method to check for the limit
countAndlog = func(tc *TestClient, method string, params ...interface{}) error { countAndlog = func(tc *TestClient, method string, params ...interface{}) error {
if tc.GetLimiter() != nil { if tc.GetLimiter() != nil {
if allow, _ := tc.GetLimiter().Allow(transferHistoryTag); !allow { if allow, _ := tc.GetLimiter().Allow(tc.tag); !allow {
t.Log("ERROR: requests over limit") t.Log("ERROR: requests over limit")
return chain.ErrRequestsOverLimit return chain.ErrRequestsOverLimit
} }
if allow, _ := tc.GetLimiter().Allow(tc.groupTag); !allow {
t.Log("ERROR: requests over limit for group tag")
return chain.ErrRequestsOverLimit
}
} }
tc.incCounter(method) tc.incCounter(method)
@ -1224,6 +1230,54 @@ func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) {
} }
} }
func TestFindBlocksCommandWithLimiterForMultipleAccountsSameGroup(t *testing.T) {
rangeSize := 20
maxRequestsTotal := 5
limit1 := 3
limit2 := 3
account1 := common.HexToAddress("0x1234")
account2 := common.HexToAddress("0x5678")
balances := map[common.Address][][]int{account1: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}, account2: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}}
outgoingERC20Transfers := map[common.Address][]testERC20Transfer{account1: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}}
incomingERC20Transfers := map[common.Address][]testERC20Transfer{account2: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}}
// Limiters share the same storage
storage := chain.NewInMemRequestsMapStorage()
// Set up the first account
fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, account1, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, nil, nil, nil)
tc.tag = transferHistoryTag + account1.String()
tc.groupTag = transferHistoryTag
limiter1 := chain.NewRequestLimiter(storage)
limiter1.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour)
limiter1.SetLimit(transferHistoryTag+account1.String(), limit1, time.Hour)
tc.SetLimiter(limiter1)
// Set up the second account
fbc2, tc2, _, _ := setupFindBlocksCommand(t, account2, big.NewInt(0), big.NewInt(20), rangeSize, balances, nil, incomingERC20Transfers, nil, nil)
tc2.tag = transferHistoryTag + account2.String()
tc2.groupTag = transferHistoryTag
limiter2 := chain.NewRequestLimiter(storage)
limiter2.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour)
limiter2.SetLimit(transferHistoryTag+account2.String(), limit2, time.Hour)
tc2.SetLimiter(limiter2)
fbc2.blocksLoadedCh = blockChannel
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(fbc.Command(1 * time.Millisecond))
group.Add(fbc2.Command(1 * time.Millisecond))
select {
case <-ctx.Done():
t.Log("ERROR")
case <-group.WaitAsync():
close(blockChannel)
require.LessOrEqual(t, tc.getCounter(), maxRequestsTotal)
}
}
type MockETHClient struct { type MockETHClient struct {
mock.Mock mock.Mock
} }