feat(wallet) deliver pending changed events to clients

Add wallet events feed to TransactionManager and send pending changed
events on add and delete

Also

- Remove hardcoded values in the filter query
- Small improvement to query

Updates status-desktop #11233
This commit is contained in:
Stefan 2023-07-05 13:36:23 +01:00 committed by Stefan Dunca
parent cd1ba1abaa
commit 9a30674590
4 changed files with 71 additions and 15 deletions

View File

@ -322,8 +322,8 @@ const (
WHEN to_join.address IS NOT NULL AND from_join.address IS NULL THEN toTrType
WHEN from_join.address IS NOT NULL AND to_join.address IS NOT NULL THEN
CASE
WHEN from_join.address < to_join.address THEN 1
ELSE 2
WHEN from_join.address < to_join.address THEN fromTrType
ELSE toTrType
END
ELSE NULL
END as tr_type,
@ -391,12 +391,12 @@ const (
NULL AS mt_type,
CASE
WHEN from_join.address IS NOT NULL AND to_join.address IS NULL THEN 1
WHEN to_join.address IS NOT NULL AND from_join.address IS NULL THEN 2
WHEN from_join.address IS NOT NULL AND to_join.address IS NULL THEN fromTrType
WHEN to_join.address IS NOT NULL AND from_join.address IS NULL THEN toTrType
WHEN from_join.address IS NOT NULL AND to_join.address IS NOT NULL THEN
CASE
WHEN from_join.address < to_join.address THEN 1
ELSE 2
WHEN from_join.address < to_join.address THEN fromTrType
ELSE toTrType
END
ELSE NULL
END as tr_type,
@ -454,7 +454,7 @@ const (
multi_transactions.to_amount AS mt_to_amount,
CASE
WHEN tr_status.min_status = 1 AND pending_status.count IS NULL THEN statusSuccess
WHEN tr_status.min_status = 1 AND COALESCE(pending_status.count, 0) = 0 THEN statusSuccess
WHEN tr_status.min_status = 0 THEN statusFailed
ELSE statusPending
END AS agg_status,
@ -655,10 +655,13 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses
}
entry = newActivityEntryWithSimpleTransaction(
&transfer.TransactionIdentity{ChainID: common.ChainID(chainID.Int64), Hash: eth.BytesToHash(transferHash), Address: ownerAddress},
&transfer.TransactionIdentity{ChainID: common.ChainID(chainID.Int64),
Hash: eth.BytesToHash(transferHash),
Address: ownerAddress,
},
timestamp, activityType, activityStatus, inAmount, outAmount, tokenOut, tokenIn)
} else if pendingHash != nil && chainID.Valid {
// Extract activity type: PendingAT
// Extract activity type: SendAT/ReceiveAT
activityType, _ := getActivityType(dbTrType)
inAmount, outAmount := getTrInAndOutAmounts(activityType, dbTrAmount)
@ -669,7 +672,10 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses
tokenOut = deps.tokenFromSymbol(&cID, tokenCode.String)
}
entry = newActivityEntryWithPendingTransaction(&transfer.TransactionIdentity{ChainID: common.ChainID(chainID.Int64), Hash: eth.BytesToHash(pendingHash)},
entry = newActivityEntryWithPendingTransaction(
&transfer.TransactionIdentity{ChainID: common.ChainID(chainID.Int64),
Hash: eth.BytesToHash(pendingHash),
},
timestamp, activityType, activityStatus, inAmount, outAmount, tokenOut, tokenIn)
} else if multiTxID.Valid {
mtInAmount, mtOutAmount := getMtInAndOutAmounts(dbMtFromAmount, dbMtToAmount)

View File

@ -91,7 +91,7 @@ func NewService(
})
tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager)
savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB)
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, walletFeed)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, walletFeed, transactionManager, tokenManager, config.WalletConfig.LoadAllTransfers)
cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient()

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/account"
"github.com/status-im/status-go/eth-node/types"
@ -21,31 +22,38 @@ import (
"github.com/status-im/status-go/services/wallet/bigint"
"github.com/status-im/status-go/services/wallet/bridge"
wallet_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
)
const (
// PendingTransactionUpdate is emitted when a pending transaction is updated (added or deleted)
EventPendingTransactionUpdate walletevent.EventType = "pending-transaction-update"
)
type TransactionManager struct {
db *sql.DB
gethManager *account.GethManager
transactor *transactions.Transactor
config *params.NodeConfig
accountsDB *accounts.Database
eventFeed *event.Feed
}
func NewTransactionManager(db *sql.DB, gethManager *account.GethManager, transactor *transactions.Transactor,
config *params.NodeConfig, accountsDB *accounts.Database) *TransactionManager {
config *params.NodeConfig, accountsDB *accounts.Database, eventFeed *event.Feed) *TransactionManager {
return &TransactionManager{
db: db,
gethManager: gethManager,
transactor: transactor,
config: config,
accountsDB: accountsDB,
eventFeed: eventFeed,
}
}
type MultiTransactionType uint8
// TODO: extend with know types
const (
MultiTransactionSend = iota
MultiTransactionSwap
@ -252,11 +260,52 @@ func (tm *TransactionManager) AddPending(transaction PendingTransaction) error {
transaction.AdditionalData,
transaction.MultiTransactionID,
)
// Notify listeners of new pending transaction (used in activity history)
if err == nil {
tm.notifyPendingTransactionListeners(transaction.ChainID, []common.Address{transaction.From, transaction.To}, transaction.Timestamp)
}
return err
}
func (tm *TransactionManager) notifyPendingTransactionListeners(chainID uint64, addresses []common.Address, timestamp uint64) {
if tm.eventFeed != nil {
tm.eventFeed.Send(walletevent.Event{
Type: EventPendingTransactionUpdate,
ChainID: chainID,
Accounts: addresses,
At: int64(timestamp),
})
}
}
func (tm *TransactionManager) DeletePending(chainID uint64, hash common.Hash) error {
_, err := tm.db.Exec(`DELETE FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash)
tx, err := tm.db.BeginTx(context.Background(), nil)
if err != nil {
return err
}
defer func() {
if err != nil {
_ = tx.Rollback()
}
}()
row := tx.QueryRow(`SELECT from_address, to_address, timestamp FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash)
var from, to common.Address
var timestamp uint64
err = row.Scan(&from, &to, &timestamp)
if err != nil {
return err
}
_, err = tx.Exec(`DELETE FROM pending_transactions WHERE network_id = ? AND hash = ?`, chainID, hash)
if err != nil {
return err
}
err = tx.Commit()
if err == nil {
tm.notifyPendingTransactionListeners(chainID, []common.Address{from, to}, timestamp)
}
return err
}

View File

@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/services/wallet/bigint"
@ -17,7 +18,7 @@ import (
func setupTestTransactionDB(t *testing.T) (*TransactionManager, func()) {
db, err := appdatabase.SetupTestMemorySQLDB("wallet-transfer-transaction-tests")
require.NoError(t, err)
return &TransactionManager{db, nil, nil, nil, nil}, func() {
return &TransactionManager{db, nil, nil, nil, nil, &event.Feed{}}, func() {
require.NoError(t, db.Close())
}
}