diff --git a/services/wallet/transfer/block_ranges_sequential_dao.go b/services/wallet/transfer/block_ranges_sequential_dao.go index 2e94bd49c..4be143b0d 100644 --- a/services/wallet/transfer/block_ranges_sequential_dao.go +++ b/services/wallet/transfer/block_ranges_sequential_dao.go @@ -120,7 +120,7 @@ func (b *BlockRangeSequentialDAO) upsertEthRange(chainID uint64, account common. return err } -func (b *BlockRangeSequentialDAO) upsertTokenRange(chainID uint64, account common.Address, +func (b *BlockRangeSequentialDAO) updateTokenRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error) { ethTokensBlockRange, err := b.getBlockRange(chainID, account) @@ -133,13 +133,12 @@ func (b *BlockRangeSequentialDAO) upsertTokenRange(chainID uint64, account commo log.Debug("update tokens blocks range", "account", account, "chainID", chainID, "start", blockRange.Start, "first", blockRange.FirstKnown, "last", blockRange.LastKnown) - upsert, err := b.db.Prepare(`REPLACE INTO blocks_ranges_sequential - (network_id, address, token_blk_start, token_blk_first, token_blk_last) VALUES (?, ?, ?, ?, ?)`) + update, err := b.db.Prepare(`UPDATE blocks_ranges_sequential SET token_blk_start = ?, token_blk_first = ?, token_blk_last = ? WHERE network_id = ? AND address = ?`) if err != nil { return err } - _, err = upsert.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown), + _, err = update.Exec(chainID, account, (*bigint.SQLBigInt)(blockRange.Start), (*bigint.SQLBigInt)(blockRange.FirstKnown), (*bigint.SQLBigInt)(blockRange.LastKnown)) return err diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 495d8bbeb..8e296c73f 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -126,7 +126,7 @@ func (c *findNewBlocksCommand) markTokenBlockRangeChecked(accounts []common.Addr log.Debug("markTokenBlockRangeChecked", "chain", c.chainClient.NetworkID(), "from", from.Uint64(), "to", to.Uint64()) for _, account := range accounts { - err := c.blockRangeDAO.upsertTokenRange(c.chainClient.NetworkID(), account, &BlockRange{LastKnown: to}) + err := c.blockRangeDAO.updateTokenRange(c.chainClient.NetworkID(), account, &BlockRange{LastKnown: to}) if err != nil { c.error = err log.Error("findNewBlocksCommand upsertTokenRange", "error", err) diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 7a7a3539d..7053c74f6 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -87,6 +87,12 @@ func (tc *TestClient) printCounter() { tc.t.Log("=========================================") } +func (tc *TestClient) resetCounter() { + tc.rw.Lock() + defer tc.rw.Unlock() + tc.callsCounter = map[string]int{} +} + func (tc *TestClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { if tc.traceAPICalls { tc.t.Log("BatchCallContext") @@ -1109,3 +1115,225 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { require.Equal(t, 1, tc.getCounter()) } } + +func getNewBlocksCases() []findBlockCase { + cases := []findBlockCase{ + findBlockCase{ + balanceChanges: [][]int{ + {20, 1, 0}, + }, + fromBlock: 0, + toBlock: 10, + expectedBlocksFound: 0, + label: "single block, but not in range", + }, + findBlockCase{ + balanceChanges: [][]int{ + {20, 1, 0}, + }, + fromBlock: 10, + toBlock: 20, + expectedBlocksFound: 1, + label: "single block in range", + }, + } + + return cases +} + +func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) { + appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + + db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) + require.NoError(t, err) + tm := &TransactionManager{db, nil, nil, nil, nil, nil, nil, nil, nil, nil} + + wdb := NewDB(db) + blockChannel := make(chan []*DBHeader, 10) + + address := common.HexToAddress("0x1234") + accDB, err := accounts.NewDB(wdb.client) + require.NoError(t, err) + + for idx, testCase := range getNewBlocksCases() { + t.Log("case #", idx+1) + tc := &TestClient{ + t: t, + balances: testCase.balanceChanges, + outgoingERC20Transfers: []testERC20Transfer{}, + incomingERC20Transfers: []testERC20Transfer{}, + callsCounter: map[string]int{}, + currentBlock: 100, + } + + client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db) + client.SetClient(tc.NetworkID(), tc) + tokenManager := token.NewTokenManager(db, client, network.NewManager(appdb)) + + tokenManager.SetTokens([]*token.Token{ + { + Address: tokenTXXAddress, + Symbol: "TXX", + Decimals: 18, + ChainID: tc.NetworkID(), + Name: "Test Token 1", + Verified: true, + }, + { + Address: tokenTXYAddress, + Symbol: "TXY", + Decimals: 18, + ChainID: tc.NetworkID(), + Name: "Test Token 2", + Verified: true, + }, + }) + + cmd := &findNewBlocksCommand{ + findBlocksCommand: &findBlocksCommand{ + accounts: []common.Address{address}, + db: wdb, + accountsDB: accDB, + blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, + chainClient: tc, + balanceCacher: balance.NewCacherWithTTL(5 * time.Minute), + feed: &event.Feed{}, + noLimit: false, + transactionManager: tm, + tokenManager: tokenManager, + blocksLoadedCh: blockChannel, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + }, + } + tc.prepareBalanceHistory(int(tc.currentBlock)) + tc.prepareTokenBalanceHistory(int(tc.currentBlock)) + + ctx := context.Background() + blocks, _, err := cmd.findBlocksWithEthTransfers(ctx, address, big.NewInt(testCase.fromBlock), big.NewInt(testCase.toBlock)) + require.NoError(t, err) + require.Equal(t, testCase.expectedBlocksFound, len(blocks), fmt.Sprintf("case %d: %s, blocks from %d to %d", idx+1, testCase.label, testCase.fromBlock, testCase.toBlock)) + } +} + +func TestFetchNewBlocksCommand(t *testing.T) { + appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + + db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) + require.NoError(t, err) + tm := &TransactionManager{db, nil, nil, nil, nil, nil, nil, nil, nil, nil} + + wdb := NewDB(db) + blockChannel := make(chan []*DBHeader, 10) + + address1 := common.HexToAddress("0x1234") + address2 := common.HexToAddress("0x5678") + accDB, err := accounts.NewDB(wdb.client) + require.NoError(t, err) + + tc := &TestClient{ + t: t, + balances: [][]int{}, + outgoingERC20Transfers: []testERC20Transfer{}, + incomingERC20Transfers: []testERC20Transfer{}, + callsCounter: map[string]int{}, + currentBlock: 1, + } + + client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db) + client.SetClient(tc.NetworkID(), tc) + tokenManager := token.NewTokenManager(db, client, network.NewManager(appdb)) + + tokenManager.SetTokens([]*token.Token{ + { + Address: tokenTXXAddress, + Symbol: "TXX", + Decimals: 18, + ChainID: tc.NetworkID(), + Name: "Test Token 1", + Verified: true, + }, + { + Address: tokenTXYAddress, + Symbol: "TXY", + Decimals: 18, + ChainID: tc.NetworkID(), + Name: "Test Token 2", + Verified: true, + }, + }) + + cmd := &findNewBlocksCommand{ + findBlocksCommand: &findBlocksCommand{ + accounts: []common.Address{address1, address2}, + db: wdb, + accountsDB: accDB, + blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, + chainClient: tc, + balanceCacher: balance.NewCacherWithTTL(5 * time.Minute), + feed: &event.Feed{}, + noLimit: false, + fromBlockNumber: big.NewInt(int64(tc.currentBlock)), + transactionManager: tm, + tokenManager: tokenManager, + blocksLoadedCh: blockChannel, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, + }, + } + + ctx := context.Background() + + // I don't prepare lots of data and a loop, as I just need to verify a few cases + + // Verify that cmd.fromBlockNumber stays the same + tc.prepareBalanceHistory(int(tc.currentBlock)) + tc.prepareTokenBalanceHistory(int(tc.currentBlock)) + err = cmd.Run(ctx) + require.NoError(t, err) + require.Equal(t, uint64(1), cmd.fromBlockNumber.Uint64()) + + // Verify that cmd.fromBlockNumber is incremented, equal to the head block number + tc.currentBlock = 2 // this is the head block number that will be returned by the mock client + tc.prepareBalanceHistory(int(tc.currentBlock)) + tc.prepareTokenBalanceHistory(int(tc.currentBlock)) + err = cmd.Run(ctx) + require.NoError(t, err) + require.Equal(t, tc.currentBlock, cmd.fromBlockNumber.Uint64()) + + // Verify that blocks are found and cmd.fromBlockNumber is incremented + tc.resetCounter() + tc.currentBlock = 3 + tc.balances = [][]int{ + {3, 1, 0}, + } + tc.incomingERC20Transfers = []testERC20Transfer{ + {big.NewInt(3), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, + } + tc.prepareBalanceHistory(int(tc.currentBlock)) + tc.prepareTokenBalanceHistory(int(tc.currentBlock)) + + group := async.NewGroup(ctx) + group.Add(cmd.Command()) // This is an infinite command, I can't use WaitAsync() here to wait for it to finish + + expectedBlocksNumber := 3 // ETH block is found twice for each account as we don't handle addresses in MockClient. A block with ERC20 transfer is found once + blocksFound := 0 + stop := false + for stop == false { + select { + case <-ctx.Done(): + require.Fail(t, "context done") + stop = true + case <-blockChannel: + blocksFound++ + case <-time.After(100 * time.Millisecond): + stop = true + } + } + group.Stop() + group.Wait() + require.Equal(t, expectedBlocksNumber, blocksFound) + require.Equal(t, tc.currentBlock, cmd.fromBlockNumber.Uint64()) + // We must check all the logs for all accounts with a single iteration of eth_getLogs call + require.Equal(t, 3, tc.callsCounter["FilterLogs"], "calls to FilterLogs") +}