From cdf80b53005e8cb1edb33c52f8ea92af43034211 Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Tue, 21 May 2024 19:40:37 +0200 Subject: [PATCH] feat(wallet)_: added group tag for RPC chain client It is needed to be able to set common limits for chain client Added a test for group tag limiter Added a mutex to RPC limiter to change counters atomically Replaced isConnected with atomic.Bool and made it a pointer to be shared across client instances --- rpc/chain/client.go | 54 ++++++++++++------ rpc/chain/rpc_limiter.go | 5 +- .../wallet/transfer/commands_sequential.go | 13 +++-- .../transfer/commands_sequential_test.go | 56 ++++++++++++++++++- 4 files changed, 104 insertions(+), 24 deletions(-) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index fa807bc7a..29ebf8f0b 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -5,7 +5,7 @@ import ( "fmt" "math/big" "strings" - "sync" + "sync/atomic" "time" "github.com/afex/hystrix-go/hystrix" @@ -62,6 +62,8 @@ type ClientInterface interface { type Tagger interface { Tag() string SetTag(tag string) + GroupTag() string + SetGroupTag(tag string) DeepCopyTag() Tagger } @@ -69,12 +71,15 @@ func DeepCopyTagger(t Tagger) Tagger { return t.DeepCopyTag() } -// Shallow copy of the client with a deep copy of tag -func ClientWithTag(chainClient ClientInterface, tag string) ClientInterface { +// Shallow copy of the client with a deep copy of tag and group tag +// To avoid passing tags as parameter to every chain call, it is sufficient for now +// to set the tag and group tag once on the client +func ClientWithTag(chainClient ClientInterface, tag, groupTag string) ClientInterface { newClient := chainClient if tagIface, ok := chainClient.(Tagger); ok { tagIface = DeepCopyTagger(tagIface) tagIface.SetTag(tag) + tagIface.SetGroupTag(tag) newClient = tagIface.(ClientInterface) } @@ -94,12 +99,12 @@ type ClientWithFallback struct { WalletNotifier func(chainId uint64, message string) - isConnected bool - isConnectedLock sync.RWMutex - LastCheckedAt int64 + isConnected *atomic.Bool + LastCheckedAt int64 circuitBreakerCmdName string - tag string + tag string // tag for the limiter + groupTag string // tag for the limiter group } // Don't mark connection as failed if we get one of these errors @@ -137,6 +142,8 @@ func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint6 ErrorPercentThreshold: 25, }) + isConnected := &atomic.Bool{} + isConnected.Store(true) return &ClientWithFallback{ ChainID: chainID, main: ethclient.NewClient(main), @@ -145,7 +152,7 @@ func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint6 fallbackLimiter: nil, mainRPC: main, fallbackRPC: nil, - isConnected: true, + isConnected: isConnected, LastCheckedAt: time.Now().Unix(), circuitBreakerCmdName: circuitBreakerCmdName, } @@ -164,6 +171,9 @@ func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RP if fallback != nil { fallbackEthClient = ethclient.NewClient(fallback) } + isConnected := &atomic.Bool{} + isConnected.Store(true) + return &ClientWithFallback{ ChainID: chainID, main: ethclient.NewClient(main), @@ -172,7 +182,7 @@ func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RP fallbackLimiter: fallbackLimiter, mainRPC: main, fallbackRPC: fallback, - isConnected: true, + isConnected: isConnected, LastCheckedAt: time.Now().Unix(), circuitBreakerCmdName: circuitBreakerCmdName, } @@ -208,20 +218,18 @@ func isRPSLimitError(err error) bool { } func (c *ClientWithFallback) SetIsConnected(value bool) { - c.isConnectedLock.Lock() - defer c.isConnectedLock.Unlock() c.LastCheckedAt = time.Now().Unix() if !value { - if c.isConnected { + if c.isConnected.Load() { if c.WalletNotifier != nil { c.WalletNotifier(c.ChainID, "down") } - c.isConnected = false + c.isConnected.Store(false) } } else { - if !c.isConnected { - c.isConnected = true + if !c.isConnected.Load() { + c.isConnected.Store(true) if c.WalletNotifier != nil { c.WalletNotifier(c.ChainID, "up") } @@ -230,9 +238,7 @@ func (c *ClientWithFallback) SetIsConnected(value bool) { } func (c *ClientWithFallback) IsConnected() bool { - c.isConnectedLock.RLock() - defer c.isConnectedLock.RUnlock() - return c.isConnected + return c.isConnected.Load() } func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) { @@ -240,6 +246,10 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, e if allow, err := c.commonLimiter.Allow(c.tag); !allow { return nil, fmt.Errorf("tag=%s, %w", c.tag, err) } + + if allow, err := c.commonLimiter.Allow(c.groupTag); !allow { + return nil, fmt.Errorf("groupTag=%s, %w", c.groupTag, err) + } } resultChan := make(chan CommandResult, 1) @@ -1010,6 +1020,14 @@ func (c *ClientWithFallback) SetTag(tag string) { c.tag = tag } +func (c *ClientWithFallback) GroupTag() string { + return c.groupTag +} + +func (c *ClientWithFallback) SetGroupTag(tag string) { + c.groupTag = tag +} + func (c *ClientWithFallback) DeepCopyTag() Tagger { copy := *c return © diff --git a/rpc/chain/rpc_limiter.go b/rpc/chain/rpc_limiter.go index 1c5cabeae..f08a0ff89 100644 --- a/rpc/chain/rpc_limiter.go +++ b/rpc/chain/rpc_limiter.go @@ -70,6 +70,7 @@ type RequestLimiter interface { type RPCRequestLimiter struct { storage RequestsStorage + mu sync.Mutex } func NewRequestLimiter(storage RequestsStorage) *RPCRequestLimiter { @@ -116,8 +117,10 @@ func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval } func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { + rl.mu.Lock() + defer rl.mu.Unlock() + data, err := rl.storage.Get(tag) - log.Info("Allow", "data", data) if err != nil { return true, err } diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 70842339b..9ea06c059 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -29,8 +29,9 @@ const ( transferHistoryTag = "transfer_history" newTransferHistoryTag = "new_transfer_history" - transferHistoryLimit = 10000 - transferHistoryLimitPeriod = 24 * time.Hour + transferHistoryLimit = 10000 + transferHistoryLimitPerAccount = 5000 + transferHistoryLimitPeriod = 24 * time.Hour ) type nonceInfo struct { @@ -1121,8 +1122,12 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn for _, rangeItem := range ranges { log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) - chainClient := chain.ClientWithTag(c.chainClient, transferHistoryTag) - limiter := chain.NewRequestLimiter(chain.NewInMemRequestsMapStorage()) + // Each account has its own limit and a global limit for all accounts + accountTag := transferHistoryTag + "_" + account.String() + chainClient := chain.ClientWithTag(c.chainClient, accountTag, transferHistoryTag) + storage := chain.NewInMemRequestsMapStorage() + limiter := chain.NewRequestLimiter(storage) + limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, transferHistoryLimitPeriod) limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) chainClient.SetLimiter(limiter) diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 7b1e47893..d999a1389 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -65,6 +65,8 @@ type TestClient struct { callsCounter map[string]int currentBlock uint64 limiter chain.RequestLimiter + tag string + groupTag string } var countAndlog = func(tc *TestClient, method string, params ...interface{}) error { @@ -1047,10 +1049,14 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo // Reimplement the common function that is called from every method to check for the limit countAndlog = func(tc *TestClient, method string, params ...interface{}) error { if tc.GetLimiter() != nil { - if allow, _ := tc.GetLimiter().Allow(transferHistoryTag); !allow { + if allow, _ := tc.GetLimiter().Allow(tc.tag); !allow { t.Log("ERROR: requests over limit") return chain.ErrRequestsOverLimit } + if allow, _ := tc.GetLimiter().Allow(tc.groupTag); !allow { + t.Log("ERROR: requests over limit for group tag") + return chain.ErrRequestsOverLimit + } } tc.incCounter(method) @@ -1224,6 +1230,54 @@ func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) { } } +func TestFindBlocksCommandWithLimiterForMultipleAccountsSameGroup(t *testing.T) { + rangeSize := 20 + maxRequestsTotal := 5 + limit1 := 3 + limit2 := 3 + account1 := common.HexToAddress("0x1234") + account2 := common.HexToAddress("0x5678") + balances := map[common.Address][][]int{account1: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}, account2: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}} + outgoingERC20Transfers := map[common.Address][]testERC20Transfer{account1: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}} + incomingERC20Transfers := map[common.Address][]testERC20Transfer{account2: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}} + + // Limiters share the same storage + storage := chain.NewInMemRequestsMapStorage() + + // Set up the first account + fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, account1, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, nil, nil, nil) + tc.tag = transferHistoryTag + account1.String() + tc.groupTag = transferHistoryTag + + limiter1 := chain.NewRequestLimiter(storage) + limiter1.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour) + limiter1.SetLimit(transferHistoryTag+account1.String(), limit1, time.Hour) + tc.SetLimiter(limiter1) + + // Set up the second account + fbc2, tc2, _, _ := setupFindBlocksCommand(t, account2, big.NewInt(0), big.NewInt(20), rangeSize, balances, nil, incomingERC20Transfers, nil, nil) + tc2.tag = transferHistoryTag + account2.String() + tc2.groupTag = transferHistoryTag + limiter2 := chain.NewRequestLimiter(storage) + limiter2.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour) + limiter2.SetLimit(transferHistoryTag+account2.String(), limit2, time.Hour) + tc2.SetLimiter(limiter2) + fbc2.blocksLoadedCh = blockChannel + + ctx := context.Background() + group := async.NewGroup(ctx) + group.Add(fbc.Command(1 * time.Millisecond)) + group.Add(fbc2.Command(1 * time.Millisecond)) + + select { + case <-ctx.Done(): + t.Log("ERROR") + case <-group.WaitAsync(): + close(blockChannel) + require.LessOrEqual(t, tc.getCounter(), maxRequestsTotal) + } +} + type MockETHClient struct { mock.Mock }