feat!: implement new activityV2 filter (#6102)
* feat!: implement new activityV2 filter * chore_: pr comments
This commit is contained in:
parent
ad28f15531
commit
3466ac2661
|
@ -40,7 +40,7 @@ var (
|
||||||
ZeroAddress = eth.Address{}
|
ZeroAddress = eth.Address{}
|
||||||
)
|
)
|
||||||
|
|
||||||
type TransferType = int
|
type TransferType = int64
|
||||||
|
|
||||||
const (
|
const (
|
||||||
TransferTypeEth TransferType = iota + 1
|
TransferTypeEth TransferType = iota + 1
|
||||||
|
@ -49,51 +49,66 @@ const (
|
||||||
TransferTypeErc1155
|
TransferTypeErc1155
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
L1FinalizationDuration = 960 // A block on layer 1 is every 12s, finalization require 64 blocks. A buffer of 16 blocks is added to not create false positives.
|
||||||
|
L2FinalizationDuration = 648000 // 7.5 days in seconds for layer 2 finalization. 0.5 day is buffer to not create false positive.
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
NoLimit = 0
|
||||||
|
)
|
||||||
|
|
||||||
type Entry struct {
|
type Entry struct {
|
||||||
payloadType PayloadType
|
payloadType PayloadType
|
||||||
transaction *transfer.TransactionIdentity
|
transaction *transfer.TransactionIdentity // ID for SimpleTransactionPT and PendingTransactionPT. Origin transaction for MultiTransactionPT
|
||||||
id common.MultiTransactionIDType
|
id common.MultiTransactionIDType // ID for MultiTransactionPT
|
||||||
timestamp int64
|
transactions []*transfer.TransactionIdentity // List of transactions for MultiTransactionPT
|
||||||
activityType Type
|
timestamp int64
|
||||||
activityStatus Status
|
activityType Type
|
||||||
amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT
|
activityStatus Status
|
||||||
amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT
|
amountOut *hexutil.Big // Used for activityType SendAT, SwapAT, BridgeAT
|
||||||
tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT
|
amountIn *hexutil.Big // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT
|
||||||
tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT
|
tokenOut *Token // Used for activityType SendAT, SwapAT, BridgeAT
|
||||||
symbolOut *string
|
tokenIn *Token // Used for activityType ReceiveAT, BuyAT, SwapAT, BridgeAT, ApproveAT
|
||||||
symbolIn *string
|
symbolOut *string
|
||||||
sender *eth.Address
|
symbolIn *string
|
||||||
recipient *eth.Address
|
sender *eth.Address
|
||||||
chainIDOut *common.ChainID
|
recipient *eth.Address
|
||||||
chainIDIn *common.ChainID
|
chainIDOut *common.ChainID
|
||||||
transferType *TransferType
|
chainIDIn *common.ChainID
|
||||||
contractAddress *eth.Address
|
transferType *TransferType
|
||||||
communityID *string
|
contractAddress *eth.Address // Used for contract deployment
|
||||||
|
communityID *string
|
||||||
|
interactedContractAddress *eth.Address
|
||||||
|
approvalSpender *eth.Address
|
||||||
|
|
||||||
isNew bool // isNew is used to indicate if the entry is newer than session start (changed state also)
|
isNew bool // isNew is used to indicate if the entry is newer than session start (changed state also)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only used for JSON marshalling
|
// Only used for JSON marshalling
|
||||||
type EntryData struct {
|
type EntryData struct {
|
||||||
PayloadType PayloadType `json:"payloadType"`
|
PayloadType PayloadType `json:"payloadType"`
|
||||||
Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"`
|
Transaction *transfer.TransactionIdentity `json:"transaction,omitempty"`
|
||||||
ID *common.MultiTransactionIDType `json:"id,omitempty"`
|
ID *common.MultiTransactionIDType `json:"id,omitempty"`
|
||||||
Timestamp *int64 `json:"timestamp,omitempty"`
|
Transactions []*transfer.TransactionIdentity `json:"transactions,omitempty"`
|
||||||
ActivityType *Type `json:"activityType,omitempty"`
|
Timestamp *int64 `json:"timestamp,omitempty"`
|
||||||
ActivityStatus *Status `json:"activityStatus,omitempty"`
|
ActivityType *Type `json:"activityType,omitempty"`
|
||||||
AmountOut *hexutil.Big `json:"amountOut,omitempty"`
|
ActivityStatus *Status `json:"activityStatus,omitempty"`
|
||||||
AmountIn *hexutil.Big `json:"amountIn,omitempty"`
|
AmountOut *hexutil.Big `json:"amountOut,omitempty"`
|
||||||
TokenOut *Token `json:"tokenOut,omitempty"`
|
AmountIn *hexutil.Big `json:"amountIn,omitempty"`
|
||||||
TokenIn *Token `json:"tokenIn,omitempty"`
|
TokenOut *Token `json:"tokenOut,omitempty"`
|
||||||
SymbolOut *string `json:"symbolOut,omitempty"`
|
TokenIn *Token `json:"tokenIn,omitempty"`
|
||||||
SymbolIn *string `json:"symbolIn,omitempty"`
|
SymbolOut *string `json:"symbolOut,omitempty"`
|
||||||
Sender *eth.Address `json:"sender,omitempty"`
|
SymbolIn *string `json:"symbolIn,omitempty"`
|
||||||
Recipient *eth.Address `json:"recipient,omitempty"`
|
Sender *eth.Address `json:"sender,omitempty"`
|
||||||
ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"`
|
Recipient *eth.Address `json:"recipient,omitempty"`
|
||||||
ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"`
|
ChainIDOut *common.ChainID `json:"chainIdOut,omitempty"`
|
||||||
TransferType *TransferType `json:"transferType,omitempty"`
|
ChainIDIn *common.ChainID `json:"chainIdIn,omitempty"`
|
||||||
ContractAddress *eth.Address `json:"contractAddress,omitempty"`
|
TransferType *TransferType `json:"transferType,omitempty"`
|
||||||
CommunityID *string `json:"communityId,omitempty"`
|
ContractAddress *eth.Address `json:"contractAddress,omitempty"`
|
||||||
|
CommunityID *string `json:"communityId,omitempty"`
|
||||||
|
InteractedContractAddress *eth.Address `json:"interactedContractAddress,omitempty"`
|
||||||
|
ApprovalSpender *eth.Address `json:"approvalSpender,omitempty"`
|
||||||
|
|
||||||
IsNew *bool `json:"isNew,omitempty"`
|
IsNew *bool `json:"isNew,omitempty"`
|
||||||
|
|
||||||
|
@ -103,26 +118,29 @@ type EntryData struct {
|
||||||
|
|
||||||
func (e *Entry) MarshalJSON() ([]byte, error) {
|
func (e *Entry) MarshalJSON() ([]byte, error) {
|
||||||
data := EntryData{
|
data := EntryData{
|
||||||
Timestamp: &e.timestamp,
|
Timestamp: &e.timestamp,
|
||||||
ActivityType: &e.activityType,
|
ActivityType: &e.activityType,
|
||||||
ActivityStatus: &e.activityStatus,
|
ActivityStatus: &e.activityStatus,
|
||||||
AmountOut: e.amountOut,
|
AmountOut: e.amountOut,
|
||||||
AmountIn: e.amountIn,
|
AmountIn: e.amountIn,
|
||||||
TokenOut: e.tokenOut,
|
TokenOut: e.tokenOut,
|
||||||
TokenIn: e.tokenIn,
|
TokenIn: e.tokenIn,
|
||||||
SymbolOut: e.symbolOut,
|
SymbolOut: e.symbolOut,
|
||||||
SymbolIn: e.symbolIn,
|
SymbolIn: e.symbolIn,
|
||||||
Sender: e.sender,
|
Sender: e.sender,
|
||||||
Recipient: e.recipient,
|
Recipient: e.recipient,
|
||||||
ChainIDOut: e.chainIDOut,
|
ChainIDOut: e.chainIDOut,
|
||||||
ChainIDIn: e.chainIDIn,
|
ChainIDIn: e.chainIDIn,
|
||||||
TransferType: e.transferType,
|
TransferType: e.transferType,
|
||||||
ContractAddress: e.contractAddress,
|
ContractAddress: e.contractAddress,
|
||||||
CommunityID: e.communityID,
|
CommunityID: e.communityID,
|
||||||
|
InteractedContractAddress: e.interactedContractAddress,
|
||||||
|
ApprovalSpender: e.approvalSpender,
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.payloadType == MultiTransactionPT {
|
if e.payloadType == MultiTransactionPT {
|
||||||
data.ID = common.NewAndSet(e.id)
|
data.ID = common.NewAndSet(e.id)
|
||||||
|
data.Transactions = e.transactions
|
||||||
} else {
|
} else {
|
||||||
data.Transaction = e.transaction
|
data.Transaction = e.transaction
|
||||||
}
|
}
|
||||||
|
@ -145,6 +163,7 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
|
||||||
if aux.ID != nil {
|
if aux.ID != nil {
|
||||||
e.id = *aux.ID
|
e.id = *aux.ID
|
||||||
}
|
}
|
||||||
|
e.transactions = aux.Transactions
|
||||||
if aux.Timestamp != nil {
|
if aux.Timestamp != nil {
|
||||||
e.timestamp = *aux.Timestamp
|
e.timestamp = *aux.Timestamp
|
||||||
}
|
}
|
||||||
|
@ -166,6 +185,8 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
|
||||||
e.chainIDIn = aux.ChainIDIn
|
e.chainIDIn = aux.ChainIDIn
|
||||||
e.transferType = aux.TransferType
|
e.transferType = aux.TransferType
|
||||||
e.communityID = aux.CommunityID
|
e.communityID = aux.CommunityID
|
||||||
|
e.interactedContractAddress = aux.InteractedContractAddress
|
||||||
|
e.approvalSpender = aux.ApprovalSpender
|
||||||
|
|
||||||
e.isNew = aux.IsNew != nil && *aux.IsNew
|
e.isNew = aux.IsNew != nil && *aux.IsNew
|
||||||
|
|
||||||
|
@ -496,8 +517,8 @@ func getActivityEntries(ctx context.Context, deps FilterDependencies, addresses
|
||||||
includeAllNetworks,
|
includeAllNetworks,
|
||||||
transactions.Pending,
|
transactions.Pending,
|
||||||
deps.currentTimestamp(),
|
deps.currentTimestamp(),
|
||||||
648000, // 7.5 days in seconds for layer 2 finalization. 0.5 day is buffer to not create false positive.
|
L2FinalizationDuration,
|
||||||
960, // A block on layer 1 is every 12s, finalization require 64 blocks. A buffer of 16 blocks is added to not create false positives.
|
L1FinalizationDuration,
|
||||||
limit, offset)
|
limit, offset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -0,0 +1,323 @@
|
||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
sq "github.com/Masterminds/squirrel"
|
||||||
|
|
||||||
|
eth "github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
|
ethTypes "github.com/ethereum/go-ethereum/core/types"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/status-im/status-go/logutils"
|
||||||
|
wCommon "github.com/status-im/status-go/services/wallet/common"
|
||||||
|
"github.com/status-im/status-go/services/wallet/requests"
|
||||||
|
pathProcessorCommon "github.com/status-im/status-go/services/wallet/router/pathprocessor/common"
|
||||||
|
"github.com/status-im/status-go/services/wallet/router/routes"
|
||||||
|
"github.com/status-im/status-go/services/wallet/token"
|
||||||
|
"github.com/status-im/status-go/services/wallet/transfer"
|
||||||
|
"github.com/status-im/status-go/services/wallet/wallettypes"
|
||||||
|
"github.com/status-im/status-go/sqlite"
|
||||||
|
"github.com/status-im/status-go/transactions"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getActivityEntriesV2 queries the route_* and tracked_transactions based on filter parameters and arguments
|
||||||
|
// it returns metadata for all entries ordered by timestamp column
|
||||||
|
func getActivityEntriesV2(ctx context.Context, deps FilterDependencies, addresses []eth.Address, allAddresses bool, chainIDs []wCommon.ChainID, filter Filter, offset int, limit int) ([]Entry, error) {
|
||||||
|
if len(addresses) == 0 {
|
||||||
|
return nil, ErrNoAddressesProvided
|
||||||
|
}
|
||||||
|
if len(chainIDs) == 0 {
|
||||||
|
return nil, ErrNoChainIDsProvided
|
||||||
|
}
|
||||||
|
|
||||||
|
q := sq.Select(`
|
||||||
|
st.tx_json,
|
||||||
|
rpt.tx_args_json,
|
||||||
|
rpt.is_approval,
|
||||||
|
rp.path_json,
|
||||||
|
rip.route_input_params_json,
|
||||||
|
rbtp.route_build_tx_params_json,
|
||||||
|
tt.tx_status,
|
||||||
|
tt.timestamp
|
||||||
|
`).Distinct()
|
||||||
|
q = q.From("sent_transactions st").
|
||||||
|
LeftJoin(`route_path_transactions rpt ON
|
||||||
|
st.chain_id = rpt.chain_id AND
|
||||||
|
st.tx_hash = rpt.tx_hash`).
|
||||||
|
LeftJoin(`tracked_transactions tt ON
|
||||||
|
st.chain_id = tt.chain_id AND
|
||||||
|
st.tx_hash = tt.tx_hash`).
|
||||||
|
LeftJoin(`route_paths rp ON
|
||||||
|
rpt.uuid = rp.uuid AND
|
||||||
|
rpt.path_idx = rp.path_idx`).
|
||||||
|
LeftJoin(`route_build_tx_parameters rbtp ON
|
||||||
|
rpt.uuid = rbtp.uuid`).
|
||||||
|
LeftJoin(`route_input_parameters rip ON
|
||||||
|
rpt.uuid = rip.uuid`)
|
||||||
|
q = q.OrderBy("tt.timestamp DESC")
|
||||||
|
|
||||||
|
qConditions := sq.And{}
|
||||||
|
|
||||||
|
qConditions = append(qConditions, sq.Eq{"rpt.chain_id": chainIDs})
|
||||||
|
qConditions = append(qConditions, sq.Eq{"rip.from_address": addresses})
|
||||||
|
|
||||||
|
q = q.Where(qConditions)
|
||||||
|
|
||||||
|
if limit != NoLimit {
|
||||||
|
q = q.Limit(uint64(limit))
|
||||||
|
q = q.Offset(uint64(offset))
|
||||||
|
}
|
||||||
|
|
||||||
|
query, args, err := q.ToSql()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := deps.db.Prepare(query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer stmt.Close()
|
||||||
|
|
||||||
|
rows, err := stmt.QueryContext(ctx, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
data, err := rowsToDataV2(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return dataToEntriesV2(deps, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
type entryDataV2 struct {
|
||||||
|
TxArgs *wallettypes.SendTxArgs
|
||||||
|
Tx *ethTypes.Transaction
|
||||||
|
IsApproval bool
|
||||||
|
Status transactions.TxStatus
|
||||||
|
Timestamp int64
|
||||||
|
Path *routes.Path
|
||||||
|
RouteInputParams *requests.RouteInputParams
|
||||||
|
BuildInputParams *requests.RouterBuildTransactionsParams
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEntryDataV2() *entryDataV2 {
|
||||||
|
return &entryDataV2{
|
||||||
|
TxArgs: new(wallettypes.SendTxArgs),
|
||||||
|
Tx: new(ethTypes.Transaction),
|
||||||
|
Path: new(routes.Path),
|
||||||
|
RouteInputParams: new(requests.RouteInputParams),
|
||||||
|
BuildInputParams: new(requests.RouterBuildTransactionsParams),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowsToDataV2(rows *sql.Rows) ([]*entryDataV2, error) {
|
||||||
|
var ret []*entryDataV2
|
||||||
|
for rows.Next() {
|
||||||
|
data := newEntryDataV2()
|
||||||
|
|
||||||
|
nullableTx := sqlite.JSONBlob{Data: data.Tx}
|
||||||
|
nullableTxArgs := sqlite.JSONBlob{Data: data.TxArgs}
|
||||||
|
nullableIsApproval := sql.NullBool{}
|
||||||
|
nullablePath := sqlite.JSONBlob{Data: data.Path}
|
||||||
|
nullableRouteInputParams := sqlite.JSONBlob{Data: data.RouteInputParams}
|
||||||
|
nullableBuildInputParams := sqlite.JSONBlob{Data: data.BuildInputParams}
|
||||||
|
nullableStatus := sql.NullString{}
|
||||||
|
nullableTimestamp := sql.NullInt64{}
|
||||||
|
|
||||||
|
err := rows.Scan(
|
||||||
|
&nullableTx,
|
||||||
|
&nullableTxArgs,
|
||||||
|
&nullableIsApproval,
|
||||||
|
&nullablePath,
|
||||||
|
&nullableRouteInputParams,
|
||||||
|
&nullableBuildInputParams,
|
||||||
|
&nullableStatus,
|
||||||
|
&nullableTimestamp,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check all necessary fields are not null
|
||||||
|
if !nullableTxArgs.Valid ||
|
||||||
|
!nullableTx.Valid ||
|
||||||
|
!nullableIsApproval.Valid ||
|
||||||
|
!nullableStatus.Valid ||
|
||||||
|
!nullableTimestamp.Valid ||
|
||||||
|
!nullablePath.Valid ||
|
||||||
|
!nullableRouteInputParams.Valid ||
|
||||||
|
!nullableBuildInputParams.Valid {
|
||||||
|
logutils.ZapLogger().Warn("some fields missing in entryData")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
data.IsApproval = nullableIsApproval.Bool
|
||||||
|
data.Status = nullableStatus.String
|
||||||
|
data.Timestamp = nullableTimestamp.Int64
|
||||||
|
|
||||||
|
ret = append(ret, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func dataToEntriesV2(deps FilterDependencies, data []*entryDataV2) ([]Entry, error) {
|
||||||
|
var ret []Entry
|
||||||
|
|
||||||
|
now := time.Now().Unix()
|
||||||
|
|
||||||
|
for _, d := range data {
|
||||||
|
chainID := wCommon.ChainID(d.Path.FromChain.ChainID)
|
||||||
|
|
||||||
|
entry := Entry{
|
||||||
|
payloadType: MultiTransactionPT, // Temporary, to keep compatibility with clients
|
||||||
|
id: d.TxArgs.MultiTransactionID,
|
||||||
|
transactions: []*transfer.TransactionIdentity{
|
||||||
|
{
|
||||||
|
ChainID: chainID,
|
||||||
|
Hash: d.Tx.Hash(),
|
||||||
|
Address: d.RouteInputParams.AddrFrom,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
timestamp: d.Timestamp,
|
||||||
|
activityType: getActivityTypeV2(d.Path.ProcessorName, d.IsApproval),
|
||||||
|
activityStatus: getActivityStatusV2(d.Status, d.Timestamp, now, getFinalizationPeriod(chainID)),
|
||||||
|
amountOut: d.Path.AmountIn, // Path and Activity have inverse perspective for amountIn and amountOut
|
||||||
|
amountIn: d.Path.AmountOut, // Path has the Tx perspective, Activity has the Account perspective
|
||||||
|
tokenOut: getToken(d.Path.FromToken, d.Path.ProcessorName),
|
||||||
|
tokenIn: getToken(d.Path.ToToken, d.Path.ProcessorName),
|
||||||
|
sender: &d.RouteInputParams.AddrFrom,
|
||||||
|
recipient: &d.RouteInputParams.AddrTo,
|
||||||
|
transferType: getTransferType(d.Path.FromToken, d.Path.ProcessorName),
|
||||||
|
//contractAddress: // TODO: Handle community contract deployment
|
||||||
|
//communityID:
|
||||||
|
}
|
||||||
|
|
||||||
|
if d.Path.FromChain != nil {
|
||||||
|
chainID := wCommon.ChainID(d.Path.FromChain.ChainID)
|
||||||
|
entry.chainIDOut = &chainID
|
||||||
|
}
|
||||||
|
if d.Path.ToChain != nil {
|
||||||
|
chainID := wCommon.ChainID(d.Path.ToChain.ChainID)
|
||||||
|
entry.chainIDIn = &chainID
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.symbolOut, entry.symbolIn = lookupAndFillInTokens(deps, entry.tokenOut, entry.tokenIn)
|
||||||
|
|
||||||
|
if entry.transferType == nil || TokenType(*entry.transferType) != Native {
|
||||||
|
interactedAddress := eth.BytesToAddress(d.Tx.To().Bytes())
|
||||||
|
entry.interactedContractAddress = &interactedAddress
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.activityType == ApproveAT {
|
||||||
|
entry.approvalSpender = d.Path.ApprovalContractAddress
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = append(ret, entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getActivityTypeV2(processorName string, isApproval bool) Type {
|
||||||
|
if isApproval {
|
||||||
|
return ApproveAT
|
||||||
|
}
|
||||||
|
|
||||||
|
switch processorName {
|
||||||
|
case pathProcessorCommon.ProcessorTransferName, pathProcessorCommon.ProcessorERC721Name, pathProcessorCommon.ProcessorERC1155Name:
|
||||||
|
return SendAT
|
||||||
|
case pathProcessorCommon.ProcessorBridgeHopName, pathProcessorCommon.ProcessorBridgeCelerName:
|
||||||
|
return BridgeAT
|
||||||
|
case pathProcessorCommon.ProcessorSwapParaswapName:
|
||||||
|
return SwapAT
|
||||||
|
}
|
||||||
|
return UnknownAT
|
||||||
|
}
|
||||||
|
|
||||||
|
func getActivityStatusV2(status transactions.TxStatus, timestamp int64, now int64, finalizationDuration int64) Status {
|
||||||
|
switch status {
|
||||||
|
case transactions.Pending:
|
||||||
|
return PendingAS
|
||||||
|
case transactions.Success:
|
||||||
|
if timestamp+finalizationDuration > now {
|
||||||
|
return FinalizedAS
|
||||||
|
}
|
||||||
|
return CompleteAS
|
||||||
|
case transactions.Failed:
|
||||||
|
return FailedAS
|
||||||
|
}
|
||||||
|
|
||||||
|
logutils.ZapLogger().Error("unhandled transaction status value")
|
||||||
|
return FailedAS
|
||||||
|
}
|
||||||
|
|
||||||
|
func getFinalizationPeriod(chainID wCommon.ChainID) int64 {
|
||||||
|
switch uint64(chainID) {
|
||||||
|
case wCommon.EthereumMainnet, wCommon.EthereumSepolia:
|
||||||
|
return L1FinalizationDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
return L2FinalizationDuration
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTransferType(fromToken *token.Token, processorName string) *TransferType {
|
||||||
|
ret := new(TransferType)
|
||||||
|
|
||||||
|
switch processorName {
|
||||||
|
case pathProcessorCommon.ProcessorTransferName:
|
||||||
|
if fromToken.IsNative() {
|
||||||
|
*ret = TransferTypeEth
|
||||||
|
break
|
||||||
|
}
|
||||||
|
*ret = TransferTypeErc20
|
||||||
|
case pathProcessorCommon.ProcessorERC721Name:
|
||||||
|
*ret = TransferTypeErc721
|
||||||
|
case pathProcessorCommon.ProcessorERC1155Name:
|
||||||
|
*ret = TransferTypeErc1155
|
||||||
|
default:
|
||||||
|
ret = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func getToken(token *token.Token, processorName string) *Token {
|
||||||
|
if token == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := new(Token)
|
||||||
|
ret.ChainID = wCommon.ChainID(token.ChainID)
|
||||||
|
if token.IsNative() {
|
||||||
|
ret.TokenType = Native
|
||||||
|
} else {
|
||||||
|
ret.Address = token.Address
|
||||||
|
switch processorName {
|
||||||
|
case pathProcessorCommon.ProcessorERC721Name, pathProcessorCommon.ProcessorERC1155Name:
|
||||||
|
id, err := wCommon.GetTokenIdFromSymbol(token.Symbol)
|
||||||
|
if err != nil {
|
||||||
|
logutils.ZapLogger().Warn("malformed token symbol", zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ret.TokenID = (*hexutil.Big)(id)
|
||||||
|
if processorName == pathProcessorCommon.ProcessorERC721Name {
|
||||||
|
ret.TokenType = Erc721
|
||||||
|
} else {
|
||||||
|
ret.TokenType = Erc1155
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
ret.TokenType = Erc20
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
package activity
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrNoAddressesProvided = errors.New("no addresses provided")
|
||||||
|
ErrNoChainIDsProvided = errors.New("no chainIDs provided")
|
||||||
|
ErrSessionNotFound = errors.New("session not found")
|
||||||
|
)
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
|
|
||||||
// used for embedding the sql query in the binary
|
// used for embedding the sql query in the binary
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
@ -31,7 +32,7 @@ type Period struct {
|
||||||
EndTimestamp int64 `json:"endTimestamp"`
|
EndTimestamp int64 `json:"endTimestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Type int
|
type Type uint64
|
||||||
|
|
||||||
const (
|
const (
|
||||||
SendAT Type = iota
|
SendAT Type = iota
|
||||||
|
@ -42,6 +43,7 @@ const (
|
||||||
ContractDeploymentAT
|
ContractDeploymentAT
|
||||||
MintAT
|
MintAT
|
||||||
ApproveAT
|
ApproveAT
|
||||||
|
UnknownAT = math.MaxUint64
|
||||||
)
|
)
|
||||||
|
|
||||||
func allActivityTypesFilter() []Type {
|
func allActivityTypesFilter() []Type {
|
||||||
|
|
|
@ -67,10 +67,11 @@ type Service struct {
|
||||||
|
|
||||||
scheduler *async.MultiClientScheduler
|
scheduler *async.MultiClientScheduler
|
||||||
|
|
||||||
sessions map[SessionID]*Session
|
sessions map[SessionID]*Session
|
||||||
lastSessionID atomic.Int32
|
lastSessionID atomic.Int32
|
||||||
subscriptions event.Subscription
|
subscriptions event.Subscription
|
||||||
ch chan walletevent.Event
|
subscriptionsCancelFn context.CancelFunc
|
||||||
|
ch chan walletevent.Event
|
||||||
// sessionsRWMutex is used to protect all sessions related members
|
// sessionsRWMutex is used to protect all sessions related members
|
||||||
sessionsRWMutex sync.RWMutex
|
sessionsRWMutex sync.RWMutex
|
||||||
debounceDuration time.Duration
|
debounceDuration time.Duration
|
||||||
|
|
|
@ -413,284 +413,3 @@ func validateFilteringDone(t *testing.T, ch chan walletevent.Event, resCount int
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestService_IncrementalUpdateOnTop(t *testing.T) {
|
|
||||||
state := setupTestService(t)
|
|
||||||
defer state.close()
|
|
||||||
|
|
||||||
transactionCount := 2
|
|
||||||
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5)
|
|
||||||
require.Greater(t, sessionID, SessionID(0))
|
|
||||||
defer state.service.StopFilterSession(sessionID)
|
|
||||||
|
|
||||||
filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil)
|
|
||||||
|
|
||||||
exp := pendings[0]
|
|
||||||
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
|
||||||
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
|
||||||
|
|
||||||
err = state.service.ResetFilterSession(sessionID, 5)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Validate the reset data
|
|
||||||
eventActivityDoneCount := validateFilteringDone(t, ch, 3, func(payload FilterResponse) {
|
|
||||||
require.True(t, payload.Activities[0].isNew)
|
|
||||||
require.False(t, payload.Activities[1].isNew)
|
|
||||||
require.False(t, payload.Activities[2].isNew)
|
|
||||||
|
|
||||||
// Check the new transaction data
|
|
||||||
newTx := payload.Activities[0]
|
|
||||||
require.Equal(t, PendingTransactionPT, newTx.payloadType)
|
|
||||||
// We don't keep type in the DB
|
|
||||||
require.Equal(t, (*int)(nil), newTx.transferType)
|
|
||||||
require.Equal(t, SendAT, newTx.activityType)
|
|
||||||
require.Equal(t, PendingAS, newTx.activityStatus)
|
|
||||||
require.Equal(t, exp.ChainID, newTx.transaction.ChainID)
|
|
||||||
require.Equal(t, exp.ChainID, *newTx.chainIDOut)
|
|
||||||
require.Equal(t, (*common.ChainID)(nil), newTx.chainIDIn)
|
|
||||||
require.Equal(t, exp.Hash, newTx.transaction.Hash)
|
|
||||||
// Pending doesn't have address as part of identity
|
|
||||||
require.Equal(t, eth.Address{}, newTx.transaction.Address)
|
|
||||||
require.Equal(t, exp.From, *newTx.sender)
|
|
||||||
require.Equal(t, exp.To, *newTx.recipient)
|
|
||||||
require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(newTx.amountOut)))
|
|
||||||
require.Equal(t, exp.Timestamp, uint64(newTx.timestamp))
|
|
||||||
require.Equal(t, exp.Symbol, *newTx.symbolOut)
|
|
||||||
require.Equal(t, (*string)(nil), newTx.symbolIn)
|
|
||||||
require.Equal(t, &Token{
|
|
||||||
TokenType: Native,
|
|
||||||
ChainID: 5,
|
|
||||||
}, newTx.tokenOut)
|
|
||||||
require.Equal(t, (*Token)(nil), newTx.tokenIn)
|
|
||||||
require.Equal(t, (*eth.Address)(nil), newTx.contractAddress)
|
|
||||||
|
|
||||||
// Check the order of the following transaction data
|
|
||||||
require.Equal(t, SimpleTransactionPT, payload.Activities[1].payloadType)
|
|
||||||
require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp)
|
|
||||||
require.Equal(t, SimpleTransactionPT, payload.Activities[2].payloadType)
|
|
||||||
require.Equal(t, int64(transactionCount-1), payload.Activities[2].timestamp)
|
|
||||||
}, nil)
|
|
||||||
|
|
||||||
require.Equal(t, 1, pendingTransactionUpdate)
|
|
||||||
require.Equal(t, 1, filterResponseCount)
|
|
||||||
require.Equal(t, 1, sessionUpdatesCount)
|
|
||||||
require.Equal(t, 1, eventActivityDoneCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestService_IncrementalUpdateMixed(t *testing.T) {
|
|
||||||
state := setupTestService(t)
|
|
||||||
defer state.close()
|
|
||||||
|
|
||||||
transactionCount := 5
|
|
||||||
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount,
|
|
||||||
[]transactions.TestTxSummary{
|
|
||||||
{DontConfirm: true, Timestamp: 2},
|
|
||||||
{DontConfirm: true, Timestamp: 4},
|
|
||||||
{DontConfirm: true, Timestamp: 6},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5)
|
|
||||||
require.Greater(t, sessionID, SessionID(0))
|
|
||||||
defer state.service.StopFilterSession(sessionID)
|
|
||||||
|
|
||||||
filterResponseCount := validateFilteringDone(t, ch, 5, nil, nil)
|
|
||||||
|
|
||||||
for i := range pendings {
|
|
||||||
err := state.pendingTracker.StoreAndTrackPendingTx(&pendings[i])
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 2, func(payload SessionUpdate) bool {
|
|
||||||
require.Nil(t, payload.HasNewOnTop)
|
|
||||||
require.NotEmpty(t, payload.New)
|
|
||||||
for _, update := range payload.New {
|
|
||||||
require.True(t, update.Entry.isNew)
|
|
||||||
foundIdx := -1
|
|
||||||
for i, pTx := range pendings {
|
|
||||||
if pTx.Hash == update.Entry.transaction.Hash && pTx.ChainID == update.Entry.transaction.ChainID {
|
|
||||||
foundIdx = i
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
require.Greater(t, foundIdx, -1, "the updated transaction should be found in the pending list")
|
|
||||||
pendings = append(pendings[:foundIdx], pendings[foundIdx+1:]...)
|
|
||||||
}
|
|
||||||
return len(pendings) == 1
|
|
||||||
})
|
|
||||||
|
|
||||||
// Validate that the last one (oldest) is out of the window
|
|
||||||
require.Equal(t, 1, len(pendings))
|
|
||||||
require.Equal(t, uint64(2), pendings[0].Timestamp)
|
|
||||||
|
|
||||||
require.Equal(t, 3, pendingTransactionUpdate)
|
|
||||||
require.LessOrEqual(t, sessionUpdatesCount, 3)
|
|
||||||
require.Equal(t, 1, filterResponseCount)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestService_IncrementalUpdateFetchWindow(t *testing.T) {
|
|
||||||
state := setupTestService(t)
|
|
||||||
defer state.close()
|
|
||||||
|
|
||||||
transactionCount := 5
|
|
||||||
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2)
|
|
||||||
require.Greater(t, sessionID, SessionID(0))
|
|
||||||
defer state.service.StopFilterSession(sessionID)
|
|
||||||
|
|
||||||
filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil)
|
|
||||||
|
|
||||||
exp := pendings[0]
|
|
||||||
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
|
||||||
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
|
||||||
|
|
||||||
err = state.service.ResetFilterSession(sessionID, 2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Validate the reset data
|
|
||||||
eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
|
||||||
require.True(t, payload.Activities[0].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp)
|
|
||||||
require.False(t, payload.Activities[1].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp)
|
|
||||||
}, nil)
|
|
||||||
|
|
||||||
require.Equal(t, 1, pendingTransactionUpdate)
|
|
||||||
require.Equal(t, 1, filterResponseCount)
|
|
||||||
require.Equal(t, 1, sessionUpdatesCount)
|
|
||||||
require.Equal(t, 1, eventActivityDoneCount)
|
|
||||||
|
|
||||||
err = state.service.GetMoreForFilterSession(sessionID, 2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
eventActivityDoneCount = validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
|
||||||
require.False(t, payload.Activities[0].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount-1), payload.Activities[0].timestamp)
|
|
||||||
require.False(t, payload.Activities[1].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount-2), payload.Activities[1].timestamp)
|
|
||||||
}, common.NewAndSet(extraExpect{common.NewAndSet(2), nil}))
|
|
||||||
require.Equal(t, 1, eventActivityDoneCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestService_IncrementalUpdateFetchWindowNoReset(t *testing.T) {
|
|
||||||
state := setupTestService(t)
|
|
||||||
defer state.close()
|
|
||||||
|
|
||||||
transactionCount := 5
|
|
||||||
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2)
|
|
||||||
require.Greater(t, sessionID, SessionID(0))
|
|
||||||
defer state.service.StopFilterSession(sessionID)
|
|
||||||
|
|
||||||
filterResponseCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
|
||||||
require.Equal(t, int64(transactionCount), payload.Activities[0].timestamp)
|
|
||||||
require.Equal(t, int64(transactionCount-1), payload.Activities[1].timestamp)
|
|
||||||
}, nil)
|
|
||||||
|
|
||||||
exp := pendings[0]
|
|
||||||
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
|
||||||
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
|
||||||
require.Equal(t, 1, pendingTransactionUpdate)
|
|
||||||
require.Equal(t, 1, filterResponseCount)
|
|
||||||
require.Equal(t, 1, sessionUpdatesCount)
|
|
||||||
|
|
||||||
err = state.service.GetMoreForFilterSession(sessionID, 2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Validate that client continue loading the next window without being affected by the internal state of new
|
|
||||||
eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
|
||||||
require.False(t, payload.Activities[0].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount-2), payload.Activities[0].timestamp)
|
|
||||||
require.False(t, payload.Activities[1].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount-3), payload.Activities[1].timestamp)
|
|
||||||
}, common.NewAndSet(extraExpect{common.NewAndSet(2), nil}))
|
|
||||||
require.Equal(t, 1, eventActivityDoneCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate and validate a multi-step user flow that was also a regression in the original implementation
|
|
||||||
func TestService_FilteredIncrementalUpdateResetAndClear(t *testing.T) {
|
|
||||||
state := setupTestService(t)
|
|
||||||
defer state.close()
|
|
||||||
|
|
||||||
transactionCount := 5
|
|
||||||
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
|
||||||
defer cleanup()
|
|
||||||
|
|
||||||
// Generate new transaction for step 5
|
|
||||||
newOffset := transactionCount + 2
|
|
||||||
newTxs, newFromTrs, newToTrs := transfer.GenerateTestTransfers(t, state.service.db, newOffset, 1)
|
|
||||||
allAddresses = append(append(allAddresses, newFromTrs...), newToTrs...)
|
|
||||||
|
|
||||||
// 1. User visualizes transactions for the first time
|
|
||||||
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 4)
|
|
||||||
require.Greater(t, sessionID, SessionID(0))
|
|
||||||
defer state.service.StopFilterSession(sessionID)
|
|
||||||
|
|
||||||
validateFilteringDone(t, ch, 4, nil, nil)
|
|
||||||
|
|
||||||
// 2. User applies a filter for pending transactions
|
|
||||||
err := state.service.UpdateFilterForSession(sessionID, Filter{Statuses: []Status{PendingAS}}, 4)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
filterResponseCount := validateFilteringDone(t, ch, 0, nil, nil)
|
|
||||||
|
|
||||||
// 3. A pending transaction is added
|
|
||||||
exp := pendings[0]
|
|
||||||
err = state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
|
||||||
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
|
||||||
|
|
||||||
// 4. User resets the view and the new pending transaction has the new flag
|
|
||||||
err = state.service.ResetFilterSession(sessionID, 2)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Validate the reset data
|
|
||||||
eventActivityDoneCount := validateFilteringDone(t, ch, 1, func(payload FilterResponse) {
|
|
||||||
require.True(t, payload.Activities[0].isNew)
|
|
||||||
require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp)
|
|
||||||
}, nil)
|
|
||||||
|
|
||||||
require.Equal(t, 1, pendingTransactionUpdate)
|
|
||||||
require.Equal(t, 1, filterResponseCount)
|
|
||||||
require.Equal(t, 1, sessionUpdatesCount)
|
|
||||||
require.Equal(t, 1, eventActivityDoneCount)
|
|
||||||
|
|
||||||
// 5. A new transaction is downloaded
|
|
||||||
transfer.InsertTestTransfer(t, state.service.db, newTxs[0].To, &newTxs[0])
|
|
||||||
|
|
||||||
// 6. User clears the filter and only the new transaction should have the new flag
|
|
||||||
err = state.service.UpdateFilterForSession(sessionID, Filter{}, 4)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
eventActivityDoneCount = validateFilteringDone(t, ch, 4, func(payload FilterResponse) {
|
|
||||||
require.True(t, payload.Activities[0].isNew)
|
|
||||||
require.Equal(t, int64(newOffset), payload.Activities[0].timestamp)
|
|
||||||
require.False(t, payload.Activities[1].isNew)
|
|
||||||
require.Equal(t, int64(newOffset-1), payload.Activities[1].timestamp)
|
|
||||||
require.False(t, payload.Activities[2].isNew)
|
|
||||||
require.Equal(t, int64(newOffset-2), payload.Activities[2].timestamp)
|
|
||||||
require.False(t, payload.Activities[3].isNew)
|
|
||||||
require.Equal(t, int64(newOffset-3), payload.Activities[3].timestamp)
|
|
||||||
}, nil)
|
|
||||||
require.Equal(t, 1, eventActivityDoneCount)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,22 +1,12 @@
|
||||||
package activity
|
package activity
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"sync"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
eth "github.com/ethereum/go-ethereum/common"
|
eth "github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
|
||||||
gocommon "github.com/status-im/status-go/common"
|
|
||||||
"github.com/status-im/status-go/logutils"
|
|
||||||
"github.com/status-im/status-go/services/wallet/async"
|
|
||||||
"github.com/status-im/status-go/services/wallet/common"
|
"github.com/status-im/status-go/services/wallet/common"
|
||||||
"github.com/status-im/status-go/services/wallet/transfer"
|
"github.com/status-im/status-go/services/wallet/transfer"
|
||||||
"github.com/status-im/status-go/services/wallet/walletevent"
|
|
||||||
"github.com/status-im/status-go/transactions"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const nilStr = "nil"
|
const nilStr = "nil"
|
||||||
|
@ -47,14 +37,9 @@ func (e EntryIdentity) key() string {
|
||||||
type SessionID int32
|
type SessionID int32
|
||||||
|
|
||||||
// Session stores state related to a filter session
|
// Session stores state related to a filter session
|
||||||
// The user happy flow is:
|
|
||||||
// 1. StartFilterSession to get a new SessionID and client be notified by the current state
|
|
||||||
// 2. GetMoreForFilterSession anytime to get more entries after the first page
|
|
||||||
// 3. UpdateFilterForSession to update the filter and get the new state or clean the filter and get the newer entries
|
|
||||||
// 4. ResetFilterSession in case client receives SessionUpdate with HasNewOnTop = true to get the latest state
|
|
||||||
// 5. StopFilterSession to stop the session when no used (user changed from activity screens or changed addresses and chains)
|
|
||||||
type Session struct {
|
type Session struct {
|
||||||
id SessionID
|
id SessionID
|
||||||
|
version Version
|
||||||
|
|
||||||
// Filter info
|
// Filter info
|
||||||
//
|
//
|
||||||
|
@ -68,462 +53,16 @@ type Session struct {
|
||||||
noFilterModel map[string]EntryIdentity
|
noFilterModel map[string]EntryIdentity
|
||||||
// new holds the new entries until user requests update by calling ResetFilterSession
|
// new holds the new entries until user requests update by calling ResetFilterSession
|
||||||
new []EntryIdentity
|
new []EntryIdentity
|
||||||
|
|
||||||
|
mu *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type EntryUpdate struct {
|
func (s *Session) getFullFilterParams() fullFilterParams {
|
||||||
Pos int `json:"pos"`
|
return fullFilterParams{
|
||||||
Entry *Entry `json:"entry"`
|
sessionID: s.id,
|
||||||
}
|
version: s.version,
|
||||||
|
addresses: s.addresses,
|
||||||
// SessionUpdate payload for EventActivitySessionUpdated
|
chainIDs: s.chainIDs,
|
||||||
type SessionUpdate struct {
|
filter: s.filter,
|
||||||
HasNewOnTop *bool `json:"hasNewOnTop,omitempty"`
|
|
||||||
New []*EntryUpdate `json:"new,omitempty"`
|
|
||||||
Removed []EntryIdentity `json:"removed,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type fullFilterParams struct {
|
|
||||||
sessionID SessionID
|
|
||||||
addresses []eth.Address
|
|
||||||
chainIDs []common.ChainID
|
|
||||||
filter Filter
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry) (offsetOverride int)) {
|
|
||||||
s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) {
|
|
||||||
allAddresses := s.areAllAddresses(f.addresses)
|
|
||||||
activities, err := getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count)
|
|
||||||
return activities, err
|
|
||||||
}, func(result interface{}, taskType async.TaskType, err error) {
|
|
||||||
res := FilterResponse{
|
|
||||||
ErrorCode: ErrorCodeFailed,
|
|
||||||
}
|
|
||||||
|
|
||||||
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
|
|
||||||
res.ErrorCode = ErrorCodeTaskCanceled
|
|
||||||
} else if err == nil {
|
|
||||||
activities := result.([]Entry)
|
|
||||||
res.Activities = activities
|
|
||||||
res.HasMore = len(activities) == count
|
|
||||||
res.ErrorCode = ErrorCodeSuccess
|
|
||||||
|
|
||||||
res.Offset = processResults(activities)
|
|
||||||
}
|
|
||||||
|
|
||||||
int32SessionID := int32(f.sessionID)
|
|
||||||
sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err)
|
|
||||||
|
|
||||||
s.getActivityDetailsAsync(int32SessionID, res.Activities)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// mirrorIdentities for update use
|
|
||||||
func mirrorIdentities(entries []Entry) []EntryIdentity {
|
|
||||||
model := make([]EntryIdentity, 0, len(entries))
|
|
||||||
for _, a := range entries {
|
|
||||||
model = append(model, EntryIdentity{
|
|
||||||
payloadType: a.payloadType,
|
|
||||||
transaction: a.transaction,
|
|
||||||
id: a.id,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return model
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) internalFilterForSession(session *Session, firstPageCount int) {
|
|
||||||
s.internalFilter(
|
|
||||||
fullFilterParams{
|
|
||||||
sessionID: session.id,
|
|
||||||
addresses: session.addresses,
|
|
||||||
chainIDs: session.chainIDs,
|
|
||||||
filter: session.filter,
|
|
||||||
},
|
|
||||||
0,
|
|
||||||
firstPageCount,
|
|
||||||
func(entries []Entry) (offset int) {
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
defer s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
session.model = mirrorIdentities(entries)
|
|
||||||
|
|
||||||
return 0
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int) SessionID {
|
|
||||||
sessionID := s.nextSessionID()
|
|
||||||
|
|
||||||
session := &Session{
|
|
||||||
id: sessionID,
|
|
||||||
|
|
||||||
addresses: addresses,
|
|
||||||
chainIDs: chainIDs,
|
|
||||||
filter: filter,
|
|
||||||
|
|
||||||
model: make([]EntryIdentity, 0, firstPageCount),
|
|
||||||
}
|
|
||||||
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
subscribeToEvents := len(s.sessions) == 0
|
|
||||||
|
|
||||||
s.sessions[sessionID] = session
|
|
||||||
|
|
||||||
if subscribeToEvents {
|
|
||||||
s.subscribeToEvents()
|
|
||||||
}
|
|
||||||
s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
s.internalFilterForSession(session, firstPageCount)
|
|
||||||
|
|
||||||
return sessionID
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateFilterForSession is to be called for updating the filter of a specific session
|
|
||||||
// After calling this method to set a filter all the incoming changes will be reported with
|
|
||||||
// Entry.isNew = true when filter is reset to empty
|
|
||||||
func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageCount int) error {
|
|
||||||
s.sessionsRWMutex.RLock()
|
|
||||||
session, found := s.sessions[id]
|
|
||||||
if !found {
|
|
||||||
s.sessionsRWMutex.RUnlock()
|
|
||||||
return errors.New("session not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
prevFilterEmpty := session.filter.IsEmpty()
|
|
||||||
newFilerEmpty := filter.IsEmpty()
|
|
||||||
s.sessionsRWMutex.RUnlock()
|
|
||||||
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
|
|
||||||
session.new = nil
|
|
||||||
|
|
||||||
session.filter = filter
|
|
||||||
|
|
||||||
if prevFilterEmpty && !newFilerEmpty {
|
|
||||||
// Session is moving from empty to non-empty filter
|
|
||||||
// Take a snapshot of the current model
|
|
||||||
session.noFilterModel = entryIdsToMap(session.model)
|
|
||||||
|
|
||||||
session.model = make([]EntryIdentity, 0, firstPageCount)
|
|
||||||
|
|
||||||
// In this case there is nothing to flag so we request the first page
|
|
||||||
s.internalFilterForSession(session, firstPageCount)
|
|
||||||
} else if !prevFilterEmpty && newFilerEmpty {
|
|
||||||
// Session is moving from non-empty to empty filter
|
|
||||||
// In this case we need to flag all the new entries that are not in the noFilterModel
|
|
||||||
s.internalFilter(
|
|
||||||
fullFilterParams{
|
|
||||||
sessionID: session.id,
|
|
||||||
addresses: session.addresses,
|
|
||||||
chainIDs: session.chainIDs,
|
|
||||||
filter: session.filter,
|
|
||||||
},
|
|
||||||
0,
|
|
||||||
firstPageCount,
|
|
||||||
func(entries []Entry) (offset int) {
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
defer s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
// Mark new entries
|
|
||||||
for i, a := range entries {
|
|
||||||
_, found := session.noFilterModel[a.getIdentity().key()]
|
|
||||||
entries[i].isNew = !found
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mirror identities for update use
|
|
||||||
session.model = mirrorIdentities(entries)
|
|
||||||
session.noFilterModel = nil
|
|
||||||
return 0
|
|
||||||
},
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
// Else act as a normal filter update
|
|
||||||
s.internalFilterForSession(session, firstPageCount)
|
|
||||||
}
|
|
||||||
s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResetFilterSession is to be called when SessionUpdate.HasNewOnTop == true to
|
|
||||||
// update client with the latest state including new on top entries
|
|
||||||
func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error {
|
|
||||||
session, found := s.sessions[id]
|
|
||||||
if !found {
|
|
||||||
return errors.New("session not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
s.internalFilter(
|
|
||||||
fullFilterParams{
|
|
||||||
sessionID: id,
|
|
||||||
addresses: session.addresses,
|
|
||||||
chainIDs: session.chainIDs,
|
|
||||||
filter: session.filter,
|
|
||||||
},
|
|
||||||
0,
|
|
||||||
firstPageCount,
|
|
||||||
func(entries []Entry) (offset int) {
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
defer s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
// Mark new entries
|
|
||||||
newMap := entryIdsToMap(session.new)
|
|
||||||
for i, a := range entries {
|
|
||||||
_, isNew := newMap[a.getIdentity().key()]
|
|
||||||
entries[i].isNew = isNew
|
|
||||||
}
|
|
||||||
session.new = nil
|
|
||||||
|
|
||||||
if session.noFilterModel != nil {
|
|
||||||
// Add reported new entries to mark them as seen
|
|
||||||
for _, a := range newMap {
|
|
||||||
session.noFilterModel[a.key()] = a
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mirror client identities for checking updates
|
|
||||||
session.model = mirrorIdentities(entries)
|
|
||||||
|
|
||||||
return 0
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error {
|
|
||||||
session, found := s.sessions[id]
|
|
||||||
if !found {
|
|
||||||
return errors.New("session not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
prevModelLen := len(session.model)
|
|
||||||
s.internalFilter(
|
|
||||||
fullFilterParams{
|
|
||||||
sessionID: id,
|
|
||||||
addresses: session.addresses,
|
|
||||||
chainIDs: session.chainIDs,
|
|
||||||
filter: session.filter,
|
|
||||||
},
|
|
||||||
prevModelLen+len(session.new),
|
|
||||||
pageCount,
|
|
||||||
func(entries []Entry) (offset int) {
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
defer s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
// Mirror client identities for checking updates
|
|
||||||
for _, a := range entries {
|
|
||||||
session.model = append(session.model, EntryIdentity{
|
|
||||||
payloadType: a.payloadType,
|
|
||||||
transaction: a.transaction,
|
|
||||||
id: a.id,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Overwrite the offset to account for new entries
|
|
||||||
return prevModelLen
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// subscribeToEvents should be called with sessionsRWMutex locked for writing
|
|
||||||
func (s *Service) subscribeToEvents() {
|
|
||||||
s.ch = make(chan walletevent.Event, 100)
|
|
||||||
s.subscriptions = s.eventFeed.Subscribe(s.ch)
|
|
||||||
go s.processEvents()
|
|
||||||
}
|
|
||||||
|
|
||||||
// processEvents runs only if more than one session is active
|
|
||||||
func (s *Service) processEvents() {
|
|
||||||
defer gocommon.LogOnPanic()
|
|
||||||
eventCount := 0
|
|
||||||
lastUpdate := time.Now().UnixMilli()
|
|
||||||
for event := range s.ch {
|
|
||||||
if event.Type == transactions.EventPendingTransactionUpdate ||
|
|
||||||
event.Type == transactions.EventPendingTransactionStatusChanged ||
|
|
||||||
event.Type == transfer.EventNewTransfers {
|
|
||||||
eventCount++
|
|
||||||
}
|
|
||||||
// debounce events updates
|
|
||||||
if eventCount > 0 &&
|
|
||||||
(time.Duration(time.Now().UnixMilli()-lastUpdate)*time.Millisecond) >= s.debounceDuration {
|
|
||||||
s.detectNew(eventCount)
|
|
||||||
eventCount = 0
|
|
||||||
lastUpdate = time.Now().UnixMilli()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) detectNew(changeCount int) {
|
|
||||||
for sessionID := range s.sessions {
|
|
||||||
session := s.sessions[sessionID]
|
|
||||||
|
|
||||||
fetchLen := len(session.model) + changeCount
|
|
||||||
allAddresses := s.areAllAddresses(session.addresses)
|
|
||||||
activities, err := getActivityEntries(context.Background(), s.getDeps(), session.addresses, allAddresses, session.chainIDs, session.filter, 0, fetchLen)
|
|
||||||
if err != nil {
|
|
||||||
logutils.ZapLogger().Error("Error getting activity entries", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
s.sessionsRWMutex.RLock()
|
|
||||||
allData := append(session.new, session.model...)
|
|
||||||
new, _ /*removed*/ := findUpdates(allData, activities)
|
|
||||||
s.sessionsRWMutex.RUnlock()
|
|
||||||
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
lastProcessed := -1
|
|
||||||
onTop := true
|
|
||||||
var mixed []*EntryUpdate
|
|
||||||
for i, idRes := range new {
|
|
||||||
// Detect on top
|
|
||||||
if onTop {
|
|
||||||
// mixedIdentityResult.newPos includes session.new, therefore compensate for it
|
|
||||||
if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 {
|
|
||||||
// From now on the events are not on top and continuous but mixed between existing entries
|
|
||||||
onTop = false
|
|
||||||
mixed = make([]*EntryUpdate, 0, len(new)-i)
|
|
||||||
}
|
|
||||||
lastProcessed = idRes.newPos
|
|
||||||
}
|
|
||||||
|
|
||||||
if onTop {
|
|
||||||
if session.new == nil {
|
|
||||||
session.new = make([]EntryIdentity, 0, len(new))
|
|
||||||
}
|
|
||||||
session.new = append(session.new, idRes.id)
|
|
||||||
} else {
|
|
||||||
modelPos := idRes.newPos - len(session.new)
|
|
||||||
entry := activities[idRes.newPos]
|
|
||||||
entry.isNew = true
|
|
||||||
mixed = append(mixed, &EntryUpdate{
|
|
||||||
Pos: modelPos,
|
|
||||||
Entry: &entry,
|
|
||||||
})
|
|
||||||
// Insert in session model at modelPos index
|
|
||||||
session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
if len(session.new) > 0 || len(mixed) > 0 {
|
|
||||||
go notify(s.eventFeed, sessionID, len(session.new) > 0, mixed)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*EntryUpdate) {
|
|
||||||
defer gocommon.LogOnPanic()
|
|
||||||
payload := SessionUpdate{
|
|
||||||
New: mixed,
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasNewOnTop {
|
|
||||||
payload.HasNewOnTop = &hasNewOnTop
|
|
||||||
}
|
|
||||||
|
|
||||||
sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing
|
|
||||||
func (s *Service) unsubscribeFromEvents() {
|
|
||||||
s.subscriptions.Unsubscribe()
|
|
||||||
close(s.ch)
|
|
||||||
s.ch = nil
|
|
||||||
s.subscriptions = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) StopFilterSession(id SessionID) {
|
|
||||||
s.sessionsRWMutex.Lock()
|
|
||||||
delete(s.sessions, id)
|
|
||||||
if len(s.sessions) == 0 {
|
|
||||||
s.unsubscribeFromEvents()
|
|
||||||
}
|
|
||||||
s.sessionsRWMutex.Unlock()
|
|
||||||
|
|
||||||
// Cancel any pending or ongoing task
|
|
||||||
s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) {
|
|
||||||
return nil, nil
|
|
||||||
}, func(result interface{}, taskType async.TaskType, err error) {})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) {
|
|
||||||
if len(entries) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
defer gocommon.LogOnPanic()
|
|
||||||
activityData, err := s.getActivityDetails(ctx, entries)
|
|
||||||
if len(activityData) != 0 {
|
|
||||||
sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, activityData, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
type mixedIdentityResult struct {
|
|
||||||
newPos int
|
|
||||||
id EntryIdentity
|
|
||||||
}
|
|
||||||
|
|
||||||
func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity {
|
|
||||||
idsMap := make(map[string]EntryIdentity, len(ids))
|
|
||||||
for _, id := range ids {
|
|
||||||
idsMap[id.key()] = id
|
|
||||||
}
|
|
||||||
return idsMap
|
|
||||||
}
|
|
||||||
|
|
||||||
func entriesToMap(entries []Entry) map[string]Entry {
|
|
||||||
entryMap := make(map[string]Entry, len(entries))
|
|
||||||
for _, entry := range entries {
|
|
||||||
updatedIdentity := entry.getIdentity()
|
|
||||||
entryMap[updatedIdentity.key()] = entry
|
|
||||||
}
|
|
||||||
return entryMap
|
|
||||||
}
|
|
||||||
|
|
||||||
// FindUpdates returns changes in updated entries compared to the identities
|
|
||||||
//
|
|
||||||
// expects identities and entries to be sorted by timestamp
|
|
||||||
//
|
|
||||||
// the returned newer are entries that are newer than the first identity
|
|
||||||
// the returned mixed are entries that are older than the first identity (sorted by timestamp)
|
|
||||||
// the returned removed are identities that are not present in the updated entries (sorted by timestamp)
|
|
||||||
//
|
|
||||||
// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed.
|
|
||||||
func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) {
|
|
||||||
if len(updated) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
idsMap := entryIdsToMap(identities)
|
|
||||||
updatedMap := entriesToMap(updated)
|
|
||||||
|
|
||||||
for newIndex, entry := range updated {
|
|
||||||
id := entry.getIdentity()
|
|
||||||
if _, found := idsMap[id.key()]; !found {
|
|
||||||
new = append(new, mixedIdentityResult{
|
|
||||||
newPos: newIndex,
|
|
||||||
id: id,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(identities) > 0 && entry.getIdentity().same(identities[len(identities)-1]) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Account for new entries
|
|
||||||
for i := 0; i < len(identities); i++ {
|
|
||||||
id := identities[i]
|
|
||||||
if _, found := updatedMap[id.key()]; !found {
|
|
||||||
removed = append(removed, id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,623 @@
|
||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
eth "github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/event"
|
||||||
|
|
||||||
|
gocommon "github.com/status-im/status-go/common"
|
||||||
|
"github.com/status-im/status-go/logutils"
|
||||||
|
"github.com/status-im/status-go/services/wallet/async"
|
||||||
|
"github.com/status-im/status-go/services/wallet/common"
|
||||||
|
"github.com/status-im/status-go/services/wallet/responses"
|
||||||
|
"github.com/status-im/status-go/services/wallet/routeexecution"
|
||||||
|
"github.com/status-im/status-go/services/wallet/transfer"
|
||||||
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
||||||
|
"github.com/status-im/status-go/transactions"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Version string
|
||||||
|
|
||||||
|
const (
|
||||||
|
V1 Version = "v1"
|
||||||
|
V2 Version = "v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TransactionID struct {
|
||||||
|
ChainID common.ChainID
|
||||||
|
Hash eth.Hash
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t TransactionID) key() string {
|
||||||
|
return strconv.FormatUint(uint64(t.ChainID), 10) + t.Hash.Hex()
|
||||||
|
}
|
||||||
|
|
||||||
|
type EntryUpdate struct {
|
||||||
|
Pos int `json:"pos"`
|
||||||
|
Entry *Entry `json:"entry"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SessionUpdate payload for EventActivitySessionUpdated
|
||||||
|
type SessionUpdate struct {
|
||||||
|
HasNewOnTop *bool `json:"hasNewOnTop,omitempty"`
|
||||||
|
New []*EntryUpdate `json:"new,omitempty"`
|
||||||
|
Removed []EntryIdentity `json:"removed,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type fullFilterParams struct {
|
||||||
|
sessionID SessionID
|
||||||
|
version Version
|
||||||
|
addresses []eth.Address
|
||||||
|
chainIDs []common.ChainID
|
||||||
|
filter Filter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getActivityEntries(ctx context.Context, f fullFilterParams, offset int, count int) ([]Entry, error) {
|
||||||
|
allAddresses := s.areAllAddresses(f.addresses)
|
||||||
|
if f.version == V1 {
|
||||||
|
return getActivityEntries(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count)
|
||||||
|
}
|
||||||
|
return getActivityEntriesV2(ctx, s.getDeps(), f.addresses, allAddresses, f.chainIDs, f.filter, offset, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) internalFilter(f fullFilterParams, offset int, count int, processResults func(entries []Entry) (offsetOverride int)) {
|
||||||
|
s.scheduler.Enqueue(int32(f.sessionID), filterTask, func(ctx context.Context) (interface{}, error) {
|
||||||
|
return s.getActivityEntries(ctx, f, offset, count)
|
||||||
|
}, func(result interface{}, taskType async.TaskType, err error) {
|
||||||
|
res := FilterResponse{
|
||||||
|
ErrorCode: ErrorCodeFailed,
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.Is(err, context.Canceled) || errors.Is(err, async.ErrTaskOverwritten) {
|
||||||
|
res.ErrorCode = ErrorCodeTaskCanceled
|
||||||
|
} else if err == nil {
|
||||||
|
activities := result.([]Entry)
|
||||||
|
res.Activities = activities
|
||||||
|
res.HasMore = len(activities) == count
|
||||||
|
res.ErrorCode = ErrorCodeSuccess
|
||||||
|
|
||||||
|
res.Offset = processResults(activities)
|
||||||
|
}
|
||||||
|
|
||||||
|
int32SessionID := int32(f.sessionID)
|
||||||
|
sendResponseEvent(s.eventFeed, &int32SessionID, EventActivityFilteringDone, res, err)
|
||||||
|
|
||||||
|
s.getActivityDetailsAsync(int32SessionID, res.Activities)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// mirrorIdentities for update use
|
||||||
|
func mirrorIdentities(entries []Entry) []EntryIdentity {
|
||||||
|
model := make([]EntryIdentity, 0, len(entries))
|
||||||
|
for _, a := range entries {
|
||||||
|
model = append(model, EntryIdentity{
|
||||||
|
payloadType: a.payloadType,
|
||||||
|
transaction: a.transaction,
|
||||||
|
id: a.id,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return model
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) internalFilterForSession(session *Session, firstPageCount int) {
|
||||||
|
s.internalFilter(
|
||||||
|
session.getFullFilterParams(),
|
||||||
|
0,
|
||||||
|
firstPageCount,
|
||||||
|
func(entries []Entry) (offset int) {
|
||||||
|
session.model = mirrorIdentities(entries)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) StartFilterSession(addresses []eth.Address, chainIDs []common.ChainID, filter Filter, firstPageCount int, version Version) SessionID {
|
||||||
|
sessionID := s.nextSessionID()
|
||||||
|
|
||||||
|
session := &Session{
|
||||||
|
id: sessionID,
|
||||||
|
version: version,
|
||||||
|
|
||||||
|
addresses: addresses,
|
||||||
|
chainIDs: chainIDs,
|
||||||
|
filter: filter,
|
||||||
|
|
||||||
|
model: make([]EntryIdentity, 0, firstPageCount),
|
||||||
|
|
||||||
|
mu: &sync.RWMutex{},
|
||||||
|
}
|
||||||
|
|
||||||
|
s.addSession(session)
|
||||||
|
|
||||||
|
session.mu.Lock()
|
||||||
|
defer session.mu.Unlock()
|
||||||
|
s.internalFilterForSession(session, firstPageCount)
|
||||||
|
|
||||||
|
return sessionID
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateFilterForSession is to be called for updating the filter of a specific session
|
||||||
|
// After calling this method to set a filter all the incoming changes will be reported with
|
||||||
|
// Entry.isNew = true when filter is reset to empty
|
||||||
|
func (s *Service) UpdateFilterForSession(id SessionID, filter Filter, firstPageCount int) error {
|
||||||
|
session := s.getSession(id)
|
||||||
|
if session == nil {
|
||||||
|
return ErrSessionNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
session.mu.Lock()
|
||||||
|
defer session.mu.Unlock()
|
||||||
|
|
||||||
|
prevFilterEmpty := session.filter.IsEmpty()
|
||||||
|
newFilerEmpty := filter.IsEmpty()
|
||||||
|
|
||||||
|
session.new = nil
|
||||||
|
|
||||||
|
session.filter = filter
|
||||||
|
|
||||||
|
if prevFilterEmpty && !newFilerEmpty {
|
||||||
|
// Session is moving from empty to non-empty filter
|
||||||
|
// Take a snapshot of the current model
|
||||||
|
session.noFilterModel = entryIdsToMap(session.model)
|
||||||
|
|
||||||
|
session.model = make([]EntryIdentity, 0, firstPageCount)
|
||||||
|
|
||||||
|
// In this case there is nothing to flag so we request the first page
|
||||||
|
s.internalFilterForSession(session, firstPageCount)
|
||||||
|
} else if !prevFilterEmpty && newFilerEmpty {
|
||||||
|
// Session is moving from non-empty to empty filter
|
||||||
|
// In this case we need to flag all the new entries that are not in the noFilterModel
|
||||||
|
s.internalFilter(
|
||||||
|
session.getFullFilterParams(),
|
||||||
|
0,
|
||||||
|
firstPageCount,
|
||||||
|
func(entries []Entry) (offset int) {
|
||||||
|
// Mark new entries
|
||||||
|
for i, a := range entries {
|
||||||
|
_, found := session.noFilterModel[a.getIdentity().key()]
|
||||||
|
entries[i].isNew = !found
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mirror identities for update use
|
||||||
|
session.model = mirrorIdentities(entries)
|
||||||
|
session.noFilterModel = nil
|
||||||
|
return 0
|
||||||
|
},
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
// Else act as a normal filter update
|
||||||
|
s.internalFilterForSession(session, firstPageCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetFilterSession is to be called when SessionUpdate.HasNewOnTop == true to
|
||||||
|
// update client with the latest state including new on top entries
|
||||||
|
func (s *Service) ResetFilterSession(id SessionID, firstPageCount int) error {
|
||||||
|
session := s.getSession(id)
|
||||||
|
if session == nil {
|
||||||
|
return ErrSessionNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
session.mu.Lock()
|
||||||
|
defer session.mu.Unlock()
|
||||||
|
|
||||||
|
s.internalFilter(
|
||||||
|
session.getFullFilterParams(),
|
||||||
|
0,
|
||||||
|
firstPageCount,
|
||||||
|
func(entries []Entry) (offset int) {
|
||||||
|
// Mark new entries
|
||||||
|
newMap := entryIdsToMap(session.new)
|
||||||
|
for i, a := range entries {
|
||||||
|
_, isNew := newMap[a.getIdentity().key()]
|
||||||
|
entries[i].isNew = isNew
|
||||||
|
}
|
||||||
|
session.new = nil
|
||||||
|
|
||||||
|
if session.noFilterModel != nil {
|
||||||
|
// Add reported new entries to mark them as seen
|
||||||
|
for _, a := range newMap {
|
||||||
|
session.noFilterModel[a.key()] = a
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mirror client identities for checking updates
|
||||||
|
session.model = mirrorIdentities(entries)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) GetMoreForFilterSession(id SessionID, pageCount int) error {
|
||||||
|
session := s.getSession(id)
|
||||||
|
if session == nil {
|
||||||
|
return ErrSessionNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
session.mu.Lock()
|
||||||
|
defer session.mu.Unlock()
|
||||||
|
|
||||||
|
prevModelLen := len(session.model)
|
||||||
|
s.internalFilter(
|
||||||
|
session.getFullFilterParams(),
|
||||||
|
prevModelLen+len(session.new),
|
||||||
|
pageCount,
|
||||||
|
func(entries []Entry) (offset int) {
|
||||||
|
// Mirror client identities for checking updates
|
||||||
|
for _, a := range entries {
|
||||||
|
session.model = append(session.model, EntryIdentity{
|
||||||
|
payloadType: a.payloadType,
|
||||||
|
transaction: a.transaction,
|
||||||
|
id: a.id,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Overwrite the offset to account for new entries
|
||||||
|
return prevModelLen
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscribeToEvents should be called with sessionsRWMutex locked for writing
|
||||||
|
func (s *Service) subscribeToEvents() {
|
||||||
|
s.ch = make(chan walletevent.Event, 100)
|
||||||
|
s.subscriptions = s.eventFeed.Subscribe(s.ch)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
s.subscriptionsCancelFn = cancel
|
||||||
|
go s.processEvents(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processEvents runs only if more than one session is active
|
||||||
|
func (s *Service) processEvents(ctx context.Context) {
|
||||||
|
defer gocommon.LogOnPanic()
|
||||||
|
eventCount := 0
|
||||||
|
changedTxs := make([]TransactionID, 0)
|
||||||
|
newTxs := false
|
||||||
|
|
||||||
|
var debounceTimer *time.Timer
|
||||||
|
debouncerCh := make(chan struct{})
|
||||||
|
debounceProcessChangesFn := func() {
|
||||||
|
if debounceTimer == nil {
|
||||||
|
debounceTimer = time.AfterFunc(s.debounceDuration, func() {
|
||||||
|
debouncerCh <- struct{}{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-s.ch:
|
||||||
|
switch event.Type {
|
||||||
|
case transactions.EventPendingTransactionUpdate:
|
||||||
|
eventCount++
|
||||||
|
var payload transactions.PendingTxUpdatePayload
|
||||||
|
if err := json.Unmarshal([]byte(event.Message), &payload); err != nil {
|
||||||
|
logutils.ZapLogger().Error("Error unmarshalling PendingTxUpdatePayload", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
changedTxs = append(changedTxs, TransactionID{
|
||||||
|
ChainID: payload.ChainID,
|
||||||
|
Hash: payload.Hash,
|
||||||
|
})
|
||||||
|
debounceProcessChangesFn()
|
||||||
|
case transactions.EventPendingTransactionStatusChanged:
|
||||||
|
eventCount++
|
||||||
|
var payload transactions.StatusChangedPayload
|
||||||
|
if err := json.Unmarshal([]byte(event.Message), &payload); err != nil {
|
||||||
|
logutils.ZapLogger().Error("Error unmarshalling StatusChangedPayload", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
changedTxs = append(changedTxs, TransactionID{
|
||||||
|
ChainID: payload.ChainID,
|
||||||
|
Hash: payload.Hash,
|
||||||
|
})
|
||||||
|
debounceProcessChangesFn()
|
||||||
|
case transfer.EventNewTransfers:
|
||||||
|
eventCount++
|
||||||
|
// No updates here, these are detected with their final state, just trigger
|
||||||
|
// the detection of new entries
|
||||||
|
newTxs = true
|
||||||
|
debounceProcessChangesFn()
|
||||||
|
case routeexecution.EventRouteExecutionTransactionSent:
|
||||||
|
sentTxs, ok := event.EventParams.(*responses.RouterSentTransactions)
|
||||||
|
if ok && sentTxs != nil {
|
||||||
|
for _, tx := range sentTxs.SentTransactions {
|
||||||
|
changedTxs = append(changedTxs, TransactionID{
|
||||||
|
ChainID: common.ChainID(tx.FromChain),
|
||||||
|
Hash: eth.Hash(tx.Hash),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debounceProcessChangesFn()
|
||||||
|
}
|
||||||
|
case <-debouncerCh:
|
||||||
|
if eventCount > 0 || newTxs || len(changedTxs) > 0 {
|
||||||
|
s.processChanges(eventCount, changedTxs)
|
||||||
|
eventCount = 0
|
||||||
|
newTxs = false
|
||||||
|
changedTxs = nil
|
||||||
|
debounceTimer = nil
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) processChangesForSession(session *Session, eventCount int, changedTxs []TransactionID) {
|
||||||
|
session.mu.Lock()
|
||||||
|
defer session.mu.Unlock()
|
||||||
|
|
||||||
|
f := session.getFullFilterParams()
|
||||||
|
limit := NoLimit
|
||||||
|
if session.version == V1 {
|
||||||
|
limit = len(session.model) + eventCount
|
||||||
|
}
|
||||||
|
activities, err := s.getActivityEntries(context.Background(), f, 0, limit)
|
||||||
|
if err != nil {
|
||||||
|
logutils.ZapLogger().Error("Error getting activity entries", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if session.version != V1 {
|
||||||
|
s.processEntryDataUpdates(session.id, activities, changedTxs)
|
||||||
|
}
|
||||||
|
|
||||||
|
allData := append(session.new, session.model...)
|
||||||
|
new, _ /*removed*/ := findUpdates(allData, activities)
|
||||||
|
|
||||||
|
lastProcessed := -1
|
||||||
|
onTop := true
|
||||||
|
var mixed []*EntryUpdate
|
||||||
|
for i, idRes := range new {
|
||||||
|
// Detect on top
|
||||||
|
if onTop {
|
||||||
|
// mixedIdentityResult.newPos includes session.new, therefore compensate for it
|
||||||
|
if ((idRes.newPos - len(session.new)) - lastProcessed) > 1 {
|
||||||
|
// From now on the events are not on top and continuous but mixed between existing entries
|
||||||
|
onTop = false
|
||||||
|
mixed = make([]*EntryUpdate, 0, len(new)-i)
|
||||||
|
}
|
||||||
|
lastProcessed = idRes.newPos
|
||||||
|
}
|
||||||
|
|
||||||
|
if onTop {
|
||||||
|
if session.new == nil {
|
||||||
|
session.new = make([]EntryIdentity, 0, len(new))
|
||||||
|
}
|
||||||
|
session.new = append(session.new, idRes.id)
|
||||||
|
} else {
|
||||||
|
modelPos := idRes.newPos - len(session.new)
|
||||||
|
entry := activities[idRes.newPos]
|
||||||
|
entry.isNew = true
|
||||||
|
mixed = append(mixed, &EntryUpdate{
|
||||||
|
Pos: modelPos,
|
||||||
|
Entry: &entry,
|
||||||
|
})
|
||||||
|
// Insert in session model at modelPos index
|
||||||
|
session.model = append(session.model[:modelPos], append([]EntryIdentity{{payloadType: entry.payloadType, transaction: entry.transaction, id: entry.id}}, session.model[modelPos:]...)...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(session.new) > 0 || len(mixed) > 0 {
|
||||||
|
go notify(s.eventFeed, session.id, len(session.new) > 0, mixed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) processChanges(eventCount int, changedTxs []TransactionID) {
|
||||||
|
sessions := s.getAllSessions()
|
||||||
|
for _, session := range sessions {
|
||||||
|
s.processChangesForSession(session, eventCount, changedTxs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) processEntryDataUpdates(sessionID SessionID, entries []Entry, changedTxs []TransactionID) {
|
||||||
|
updateData := make([]*EntryData, 0, len(changedTxs))
|
||||||
|
|
||||||
|
entriesMap := make(map[string]Entry, len(entries))
|
||||||
|
for _, e := range entries {
|
||||||
|
if e.payloadType == MultiTransactionPT {
|
||||||
|
if e.id != common.NoMultiTransactionID {
|
||||||
|
for _, tx := range e.transactions {
|
||||||
|
id := TransactionID{
|
||||||
|
ChainID: tx.ChainID,
|
||||||
|
Hash: tx.Hash,
|
||||||
|
}
|
||||||
|
entriesMap[id.key()] = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if e.transaction != nil {
|
||||||
|
id := TransactionID{
|
||||||
|
ChainID: e.transaction.ChainID,
|
||||||
|
Hash: e.transaction.Hash,
|
||||||
|
}
|
||||||
|
entriesMap[id.key()] = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tx := range changedTxs {
|
||||||
|
e, found := entriesMap[tx.key()]
|
||||||
|
if !found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
data := &EntryData{
|
||||||
|
ActivityStatus: &e.activityStatus,
|
||||||
|
}
|
||||||
|
if e.payloadType == MultiTransactionPT {
|
||||||
|
data.ID = common.NewAndSet(e.id)
|
||||||
|
} else {
|
||||||
|
data.Transaction = e.transaction
|
||||||
|
}
|
||||||
|
data.PayloadType = e.payloadType
|
||||||
|
|
||||||
|
updateData = append(updateData, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(updateData) > 0 {
|
||||||
|
requestID := int32(sessionID)
|
||||||
|
sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, updateData, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func notify(eventFeed *event.Feed, id SessionID, hasNewOnTop bool, mixed []*EntryUpdate) {
|
||||||
|
defer gocommon.LogOnPanic()
|
||||||
|
payload := SessionUpdate{
|
||||||
|
New: mixed,
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasNewOnTop {
|
||||||
|
payload.HasNewOnTop = &hasNewOnTop
|
||||||
|
}
|
||||||
|
|
||||||
|
sendResponseEvent(eventFeed, (*int32)(&id), EventActivitySessionUpdated, payload, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsubscribeFromEvents should be called with sessionsRWMutex locked for writing
|
||||||
|
func (s *Service) unsubscribeFromEvents() {
|
||||||
|
s.subscriptionsCancelFn()
|
||||||
|
s.subscriptionsCancelFn = nil
|
||||||
|
s.subscriptions.Unsubscribe()
|
||||||
|
close(s.ch)
|
||||||
|
s.ch = nil
|
||||||
|
s.subscriptions = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) StopFilterSession(id SessionID) {
|
||||||
|
s.removeSesssion(id)
|
||||||
|
|
||||||
|
// Cancel any pending or ongoing task
|
||||||
|
s.scheduler.Enqueue(int32(id), filterTask, func(ctx context.Context) (interface{}, error) {
|
||||||
|
return nil, nil
|
||||||
|
}, func(result interface{}, taskType async.TaskType, err error) {})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getActivityDetailsAsync(requestID int32, entries []Entry) {
|
||||||
|
if len(entries) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer gocommon.LogOnPanic()
|
||||||
|
activityData, err := s.getActivityDetails(ctx, entries)
|
||||||
|
if len(activityData) != 0 {
|
||||||
|
sendResponseEvent(s.eventFeed, &requestID, EventActivityFilteringUpdate, activityData, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
type mixedIdentityResult struct {
|
||||||
|
newPos int
|
||||||
|
id EntryIdentity
|
||||||
|
}
|
||||||
|
|
||||||
|
func entryIdsToMap(ids []EntryIdentity) map[string]EntryIdentity {
|
||||||
|
idsMap := make(map[string]EntryIdentity, len(ids))
|
||||||
|
for _, id := range ids {
|
||||||
|
idsMap[id.key()] = id
|
||||||
|
}
|
||||||
|
return idsMap
|
||||||
|
}
|
||||||
|
|
||||||
|
func entriesToMap(entries []Entry) map[string]Entry {
|
||||||
|
entryMap := make(map[string]Entry, len(entries))
|
||||||
|
for _, entry := range entries {
|
||||||
|
updatedIdentity := entry.getIdentity()
|
||||||
|
entryMap[updatedIdentity.key()] = entry
|
||||||
|
}
|
||||||
|
return entryMap
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindUpdates returns changes in updated entries compared to the identities
|
||||||
|
//
|
||||||
|
// expects identities and entries to be sorted by timestamp
|
||||||
|
//
|
||||||
|
// the returned newer are entries that are newer than the first identity
|
||||||
|
// the returned mixed are entries that are older than the first identity (sorted by timestamp)
|
||||||
|
// the returned removed are identities that are not present in the updated entries (sorted by timestamp)
|
||||||
|
//
|
||||||
|
// implementation assumes the order of each identity doesn't change from old state (identities) and new state (updated); we have either add or removed.
|
||||||
|
func findUpdates(identities []EntryIdentity, updated []Entry) (new []mixedIdentityResult, removed []EntryIdentity) {
|
||||||
|
if len(updated) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
idsMap := entryIdsToMap(identities)
|
||||||
|
updatedMap := entriesToMap(updated)
|
||||||
|
|
||||||
|
for newIndex, entry := range updated {
|
||||||
|
id := entry.getIdentity()
|
||||||
|
if _, found := idsMap[id.key()]; !found {
|
||||||
|
new = append(new, mixedIdentityResult{
|
||||||
|
newPos: newIndex,
|
||||||
|
id: id,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(identities) > 0 && entry.getIdentity().same(identities[len(identities)-1]) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Account for new entries
|
||||||
|
for i := 0; i < len(identities); i++ {
|
||||||
|
id := identities[i]
|
||||||
|
if _, found := updatedMap[id.key()]; !found {
|
||||||
|
removed = append(removed, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) addSession(session *Session) {
|
||||||
|
s.sessionsRWMutex.Lock()
|
||||||
|
defer s.sessionsRWMutex.Unlock()
|
||||||
|
subscribeToEvents := len(s.sessions) == 0
|
||||||
|
|
||||||
|
s.sessions[session.id] = session
|
||||||
|
|
||||||
|
if subscribeToEvents {
|
||||||
|
s.subscribeToEvents()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) removeSesssion(id SessionID) {
|
||||||
|
s.sessionsRWMutex.Lock()
|
||||||
|
defer s.sessionsRWMutex.Unlock()
|
||||||
|
delete(s.sessions, id)
|
||||||
|
if len(s.sessions) == 0 {
|
||||||
|
s.unsubscribeFromEvents()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getSession(id SessionID) *Session {
|
||||||
|
s.sessionsRWMutex.RLock()
|
||||||
|
defer s.sessionsRWMutex.RUnlock()
|
||||||
|
return s.sessions[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) getAllSessions() []*Session {
|
||||||
|
s.sessionsRWMutex.RLock()
|
||||||
|
defer s.sessionsRWMutex.RUnlock()
|
||||||
|
sessions := make([]*Session, 0, len(s.sessions))
|
||||||
|
for _, session := range s.sessions {
|
||||||
|
sessions = append(sessions, session)
|
||||||
|
}
|
||||||
|
return sessions
|
||||||
|
}
|
|
@ -0,0 +1,339 @@
|
||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
eth "github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/status-im/status-go/services/wallet/common"
|
||||||
|
"github.com/status-im/status-go/services/wallet/transfer"
|
||||||
|
"github.com/status-im/status-go/transactions"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestService_IncrementalUpdateOnTop(t *testing.T) {
|
||||||
|
state := setupTestService(t)
|
||||||
|
defer state.close()
|
||||||
|
|
||||||
|
transactionCount := 2
|
||||||
|
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5, V1)
|
||||||
|
require.Greater(t, sessionID, SessionID(0))
|
||||||
|
defer state.service.StopFilterSession(sessionID)
|
||||||
|
|
||||||
|
filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil)
|
||||||
|
|
||||||
|
exp := pendings[0]
|
||||||
|
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
||||||
|
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
||||||
|
|
||||||
|
err = state.service.ResetFilterSession(sessionID, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Validate the reset data
|
||||||
|
eventActivityDoneCount := validateFilteringDone(t, ch, 3, func(payload FilterResponse) {
|
||||||
|
require.True(t, payload.Activities[0].isNew)
|
||||||
|
require.False(t, payload.Activities[1].isNew)
|
||||||
|
require.False(t, payload.Activities[2].isNew)
|
||||||
|
|
||||||
|
// Check the new transaction data
|
||||||
|
newTx := payload.Activities[0]
|
||||||
|
require.Equal(t, PendingTransactionPT, newTx.payloadType)
|
||||||
|
// We don't keep type in the DB
|
||||||
|
require.Equal(t, (*int64)(nil), newTx.transferType)
|
||||||
|
require.Equal(t, SendAT, newTx.activityType)
|
||||||
|
require.Equal(t, PendingAS, newTx.activityStatus)
|
||||||
|
require.Equal(t, exp.ChainID, newTx.transaction.ChainID)
|
||||||
|
require.Equal(t, exp.ChainID, *newTx.chainIDOut)
|
||||||
|
require.Equal(t, (*common.ChainID)(nil), newTx.chainIDIn)
|
||||||
|
require.Equal(t, exp.Hash, newTx.transaction.Hash)
|
||||||
|
// Pending doesn't have address as part of identity
|
||||||
|
require.Equal(t, eth.Address{}, newTx.transaction.Address)
|
||||||
|
require.Equal(t, exp.From, *newTx.sender)
|
||||||
|
require.Equal(t, exp.To, *newTx.recipient)
|
||||||
|
require.Equal(t, 0, exp.Value.Int.Cmp((*big.Int)(newTx.amountOut)))
|
||||||
|
require.Equal(t, exp.Timestamp, uint64(newTx.timestamp))
|
||||||
|
require.Equal(t, exp.Symbol, *newTx.symbolOut)
|
||||||
|
require.Equal(t, (*string)(nil), newTx.symbolIn)
|
||||||
|
require.Equal(t, &Token{
|
||||||
|
TokenType: Native,
|
||||||
|
ChainID: 5,
|
||||||
|
}, newTx.tokenOut)
|
||||||
|
require.Equal(t, (*Token)(nil), newTx.tokenIn)
|
||||||
|
require.Equal(t, (*eth.Address)(nil), newTx.contractAddress)
|
||||||
|
|
||||||
|
// Check the order of the following transaction data
|
||||||
|
require.Equal(t, SimpleTransactionPT, payload.Activities[1].payloadType)
|
||||||
|
require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp)
|
||||||
|
require.Equal(t, SimpleTransactionPT, payload.Activities[2].payloadType)
|
||||||
|
require.Equal(t, int64(transactionCount-1), payload.Activities[2].timestamp)
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
require.Equal(t, 1, pendingTransactionUpdate)
|
||||||
|
require.Equal(t, 1, filterResponseCount)
|
||||||
|
require.Equal(t, 1, sessionUpdatesCount)
|
||||||
|
require.Equal(t, 1, eventActivityDoneCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestService_IncrementalUpdateMixed(t *testing.T) {
|
||||||
|
state := setupTestService(t)
|
||||||
|
defer state.close()
|
||||||
|
|
||||||
|
transactionCount := 5
|
||||||
|
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount,
|
||||||
|
[]transactions.TestTxSummary{
|
||||||
|
{DontConfirm: true, Timestamp: 2},
|
||||||
|
{DontConfirm: true, Timestamp: 4},
|
||||||
|
{DontConfirm: true, Timestamp: 6},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 5, V1)
|
||||||
|
require.Greater(t, sessionID, SessionID(0))
|
||||||
|
defer state.service.StopFilterSession(sessionID)
|
||||||
|
|
||||||
|
filterResponseCount := validateFilteringDone(t, ch, 5, nil, nil)
|
||||||
|
|
||||||
|
for i := range pendings {
|
||||||
|
err := state.pendingTracker.StoreAndTrackPendingTx(&pendings[i])
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 2, func(payload SessionUpdate) bool {
|
||||||
|
require.Nil(t, payload.HasNewOnTop)
|
||||||
|
require.NotEmpty(t, payload.New)
|
||||||
|
for _, update := range payload.New {
|
||||||
|
require.True(t, update.Entry.isNew)
|
||||||
|
foundIdx := -1
|
||||||
|
for i, pTx := range pendings {
|
||||||
|
if pTx.Hash == update.Entry.transaction.Hash && pTx.ChainID == update.Entry.transaction.ChainID {
|
||||||
|
foundIdx = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.Greater(t, foundIdx, -1, "the updated transaction should be found in the pending list")
|
||||||
|
pendings = append(pendings[:foundIdx], pendings[foundIdx+1:]...)
|
||||||
|
}
|
||||||
|
return len(pendings) == 1
|
||||||
|
})
|
||||||
|
|
||||||
|
// Validate that the last one (oldest) is out of the window
|
||||||
|
require.Equal(t, 1, len(pendings))
|
||||||
|
require.Equal(t, uint64(2), pendings[0].Timestamp)
|
||||||
|
|
||||||
|
require.Equal(t, 3, pendingTransactionUpdate)
|
||||||
|
require.LessOrEqual(t, sessionUpdatesCount, 3)
|
||||||
|
require.Equal(t, 1, filterResponseCount)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestService_IncrementalUpdateFetchWindow(t *testing.T) {
|
||||||
|
state := setupTestService(t)
|
||||||
|
defer state.close()
|
||||||
|
|
||||||
|
transactionCount := 5
|
||||||
|
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2, V1)
|
||||||
|
require.Greater(t, sessionID, SessionID(0))
|
||||||
|
defer state.service.StopFilterSession(sessionID)
|
||||||
|
|
||||||
|
filterResponseCount := validateFilteringDone(t, ch, 2, nil, nil)
|
||||||
|
|
||||||
|
exp := pendings[0]
|
||||||
|
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
||||||
|
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
||||||
|
|
||||||
|
err = state.service.ResetFilterSession(sessionID, 2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Validate the reset data
|
||||||
|
eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
||||||
|
require.True(t, payload.Activities[0].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp)
|
||||||
|
require.False(t, payload.Activities[1].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount), payload.Activities[1].timestamp)
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
require.Equal(t, 1, pendingTransactionUpdate)
|
||||||
|
require.Equal(t, 1, filterResponseCount)
|
||||||
|
require.Equal(t, 1, sessionUpdatesCount)
|
||||||
|
require.Equal(t, 1, eventActivityDoneCount)
|
||||||
|
|
||||||
|
err = state.service.GetMoreForFilterSession(sessionID, 2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
eventActivityDoneCount = validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
||||||
|
require.False(t, payload.Activities[0].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount-1), payload.Activities[0].timestamp)
|
||||||
|
require.False(t, payload.Activities[1].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount-2), payload.Activities[1].timestamp)
|
||||||
|
}, common.NewAndSet(extraExpect{common.NewAndSet(2), nil}))
|
||||||
|
require.Equal(t, 1, eventActivityDoneCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestService_IncrementalUpdateFetchWindowNoReset(t *testing.T) {
|
||||||
|
state := setupTestService(t)
|
||||||
|
defer state.close()
|
||||||
|
|
||||||
|
transactionCount := 5
|
||||||
|
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 2, V1)
|
||||||
|
require.Greater(t, sessionID, SessionID(0))
|
||||||
|
defer state.service.StopFilterSession(sessionID)
|
||||||
|
|
||||||
|
filterResponseCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
||||||
|
require.Equal(t, int64(transactionCount), payload.Activities[0].timestamp)
|
||||||
|
require.Equal(t, int64(transactionCount-1), payload.Activities[1].timestamp)
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
exp := pendings[0]
|
||||||
|
err := state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
||||||
|
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
||||||
|
require.Equal(t, 1, pendingTransactionUpdate)
|
||||||
|
require.Equal(t, 1, filterResponseCount)
|
||||||
|
require.Equal(t, 1, sessionUpdatesCount)
|
||||||
|
|
||||||
|
err = state.service.GetMoreForFilterSession(sessionID, 2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Validate that client continue loading the next window without being affected by the internal state of new
|
||||||
|
eventActivityDoneCount := validateFilteringDone(t, ch, 2, func(payload FilterResponse) {
|
||||||
|
require.False(t, payload.Activities[0].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount-2), payload.Activities[0].timestamp)
|
||||||
|
require.False(t, payload.Activities[1].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount-3), payload.Activities[1].timestamp)
|
||||||
|
}, common.NewAndSet(extraExpect{common.NewAndSet(2), nil}))
|
||||||
|
require.Equal(t, 1, eventActivityDoneCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simulate and validate a multi-step user flow that was also a regression in the original implementation
|
||||||
|
func TestService_FilteredIncrementalUpdateResetAndClear(t *testing.T) {
|
||||||
|
state := setupTestService(t)
|
||||||
|
defer state.close()
|
||||||
|
|
||||||
|
transactionCount := 5
|
||||||
|
allAddresses, pendings, ch, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Generate new transaction for step 5
|
||||||
|
newOffset := transactionCount + 2
|
||||||
|
newTxs, newFromTrs, newToTrs := transfer.GenerateTestTransfers(t, state.service.db, newOffset, 1)
|
||||||
|
allAddresses = append(append(allAddresses, newFromTrs...), newToTrs...)
|
||||||
|
|
||||||
|
// 1. User visualizes transactions for the first time
|
||||||
|
sessionID := state.service.StartFilterSession(allAddresses, allNetworksFilter(), Filter{}, 4, V1)
|
||||||
|
require.Greater(t, sessionID, SessionID(0))
|
||||||
|
defer state.service.StopFilterSession(sessionID)
|
||||||
|
|
||||||
|
validateFilteringDone(t, ch, 4, nil, nil)
|
||||||
|
|
||||||
|
// 2. User applies a filter for pending transactions
|
||||||
|
err := state.service.UpdateFilterForSession(sessionID, Filter{Statuses: []Status{PendingAS}}, 4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
filterResponseCount := validateFilteringDone(t, ch, 0, nil, nil)
|
||||||
|
|
||||||
|
// 3. A pending transaction is added
|
||||||
|
exp := pendings[0]
|
||||||
|
err = state.pendingTracker.StoreAndTrackPendingTx(&exp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
vFn := getValidateSessionUpdateHasNewOnTopFn(t)
|
||||||
|
pendingTransactionUpdate, sessionUpdatesCount := validateSessionUpdateEvent(t, ch, &filterResponseCount, 1, vFn)
|
||||||
|
|
||||||
|
// 4. User resets the view and the new pending transaction has the new flag
|
||||||
|
err = state.service.ResetFilterSession(sessionID, 2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Validate the reset data
|
||||||
|
eventActivityDoneCount := validateFilteringDone(t, ch, 1, func(payload FilterResponse) {
|
||||||
|
require.True(t, payload.Activities[0].isNew)
|
||||||
|
require.Equal(t, int64(transactionCount+1), payload.Activities[0].timestamp)
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
require.Equal(t, 1, pendingTransactionUpdate)
|
||||||
|
require.Equal(t, 1, filterResponseCount)
|
||||||
|
require.Equal(t, 1, sessionUpdatesCount)
|
||||||
|
require.Equal(t, 1, eventActivityDoneCount)
|
||||||
|
|
||||||
|
// 5. A new transaction is downloaded
|
||||||
|
transfer.InsertTestTransfer(t, state.service.db, newTxs[0].To, &newTxs[0])
|
||||||
|
|
||||||
|
// 6. User clears the filter and only the new transaction should have the new flag
|
||||||
|
err = state.service.UpdateFilterForSession(sessionID, Filter{}, 4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
eventActivityDoneCount = validateFilteringDone(t, ch, 4, func(payload FilterResponse) {
|
||||||
|
require.True(t, payload.Activities[0].isNew)
|
||||||
|
require.Equal(t, int64(newOffset), payload.Activities[0].timestamp)
|
||||||
|
require.False(t, payload.Activities[1].isNew)
|
||||||
|
require.Equal(t, int64(newOffset-1), payload.Activities[1].timestamp)
|
||||||
|
require.False(t, payload.Activities[2].isNew)
|
||||||
|
require.Equal(t, int64(newOffset-2), payload.Activities[2].timestamp)
|
||||||
|
require.False(t, payload.Activities[3].isNew)
|
||||||
|
require.Equal(t, int64(newOffset-3), payload.Activities[3].timestamp)
|
||||||
|
}, nil)
|
||||||
|
require.Equal(t, 1, eventActivityDoneCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the different session-related endpoints in a multi-threaded environment
|
||||||
|
func TestService_MultiThread(t *testing.T) {
|
||||||
|
state := setupTestService(t)
|
||||||
|
defer state.close()
|
||||||
|
|
||||||
|
const n = 3
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
sessionID := state.service.StartFilterSession([]eth.Address{}, allNetworksFilter(), Filter{}, 5, V2)
|
||||||
|
require.Greater(t, sessionID, SessionID(0))
|
||||||
|
|
||||||
|
transactionCount := 5
|
||||||
|
_, _, _, cleanup := setupTransactions(t, state, transactionCount, []transactions.TestTxSummary{{DontConfirm: true, Timestamp: transactionCount + 1}})
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
const m = 10
|
||||||
|
var subwg sync.WaitGroup
|
||||||
|
subwg.Add(m)
|
||||||
|
for j := 0; j < m; j++ {
|
||||||
|
go func() {
|
||||||
|
defer subwg.Done()
|
||||||
|
var suberr error
|
||||||
|
|
||||||
|
suberr = state.service.UpdateFilterForSession(sessionID, Filter{}, 5)
|
||||||
|
require.NoError(t, suberr)
|
||||||
|
|
||||||
|
suberr = state.service.ResetFilterSession(sessionID, 5)
|
||||||
|
require.NoError(t, suberr)
|
||||||
|
|
||||||
|
suberr = state.service.GetMoreForFilterSession(sessionID, 5)
|
||||||
|
require.NoError(t, suberr)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
subwg.Wait()
|
||||||
|
|
||||||
|
state.service.StopFilterSession(sessionID)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
|
@ -814,7 +814,17 @@ func (api *API) StartActivityFilterSession(addresses []common.Address, chainIDs
|
||||||
zap.Int("firstPageCount", firstPageCount),
|
zap.Int("firstPageCount", firstPageCount),
|
||||||
)
|
)
|
||||||
|
|
||||||
return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount), nil
|
return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount, activity.V1), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *API) StartActivityFilterSessionV2(addresses []common.Address, chainIDs []wcommon.ChainID, filter activity.Filter, firstPageCount int) (activity.SessionID, error) {
|
||||||
|
logutils.ZapLogger().Debug("wallet.api.StartActivityFilterSessionV2",
|
||||||
|
zap.Int("addr.count", len(addresses)),
|
||||||
|
zap.Int("chainIDs.count", len(chainIDs)),
|
||||||
|
zap.Int("firstPageCount", firstPageCount),
|
||||||
|
)
|
||||||
|
|
||||||
|
return api.s.activity.StartFilterSession(addresses, chainIDs, filter, firstPageCount, activity.V2), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) UpdateActivityFilterForSession(sessionID activity.SessionID, filter activity.Filter, firstPageCount int) error {
|
func (api *API) UpdateActivityFilterForSession(sessionID activity.SessionID, filter activity.Filter, firstPageCount int) error {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
|
||||||
"github.com/status-im/status-go/logutils"
|
"github.com/status-im/status-go/logutils"
|
||||||
|
@ -21,26 +22,33 @@ import (
|
||||||
pathProcessorCommon "github.com/status-im/status-go/services/wallet/router/pathprocessor/common"
|
pathProcessorCommon "github.com/status-im/status-go/services/wallet/router/pathprocessor/common"
|
||||||
"github.com/status-im/status-go/services/wallet/router/sendtype"
|
"github.com/status-im/status-go/services/wallet/router/sendtype"
|
||||||
"github.com/status-im/status-go/services/wallet/transfer"
|
"github.com/status-im/status-go/services/wallet/transfer"
|
||||||
|
"github.com/status-im/status-go/services/wallet/walletevent"
|
||||||
"github.com/status-im/status-go/services/wallet/wallettypes"
|
"github.com/status-im/status-go/services/wallet/wallettypes"
|
||||||
"github.com/status-im/status-go/signal"
|
"github.com/status-im/status-go/signal"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventRouteExecutionTransactionSent walletevent.EventType = walletevent.InternalEventTypePrefix + "wallet-route-execution-transaction-sent"
|
||||||
|
)
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
router *router.Router
|
router *router.Router
|
||||||
transactionManager *transfer.TransactionManager
|
transactionManager *transfer.TransactionManager
|
||||||
transferController *transfer.Controller
|
transferController *transfer.Controller
|
||||||
db *storage.DB
|
db *storage.DB
|
||||||
|
eventFeed *event.Feed
|
||||||
|
|
||||||
// Local data used for storage purposes
|
// Local data used for storage purposes
|
||||||
buildInputParams *requests.RouterBuildTransactionsParams
|
buildInputParams *requests.RouterBuildTransactionsParams
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewManager(walletDB *sql.DB, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager {
|
func NewManager(walletDB *sql.DB, eventFeed *event.Feed, router *router.Router, transactionManager *transfer.TransactionManager, transferController *transfer.Controller) *Manager {
|
||||||
return &Manager{
|
return &Manager{
|
||||||
router: router,
|
router: router,
|
||||||
transactionManager: transactionManager,
|
transactionManager: transactionManager,
|
||||||
transferController: transferController,
|
transferController: transferController,
|
||||||
db: storage.NewDB(walletDB),
|
db: storage.NewDB(walletDB),
|
||||||
|
eventFeed: eventFeed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,6 +142,12 @@ func (m *Manager) SendRouterTransactionsWithSignatures(ctx context.Context, send
|
||||||
response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse)
|
response.SendDetails.ErrorResponse = err.(*statusErrors.ErrorResponse)
|
||||||
}
|
}
|
||||||
signal.SendWalletEvent(signal.RouterTransactionsSent, response)
|
signal.SendWalletEvent(signal.RouterTransactionsSent, response)
|
||||||
|
|
||||||
|
event := walletevent.Event{
|
||||||
|
Type: EventRouteExecutionTransactionSent,
|
||||||
|
EventParams: response,
|
||||||
|
}
|
||||||
|
m.eventFeed.Send(event)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, routeInputParams = m.router.GetBestRouteAndAssociatedInputParams()
|
_, routeInputParams = m.router.GetBestRouteAndAssociatedInputParams()
|
||||||
|
|
|
@ -204,7 +204,7 @@ func NewService(
|
||||||
router.AddPathProcessor(processor)
|
router.AddPathProcessor(processor)
|
||||||
}
|
}
|
||||||
|
|
||||||
routeExecutionManager := routeexecution.NewManager(db, router, transactionManager, transferController)
|
routeExecutionManager := routeexecution.NewManager(db, feed, router, transactionManager, transferController)
|
||||||
|
|
||||||
return &Service{
|
return &Service{
|
||||||
db: db,
|
db: db,
|
||||||
|
|
|
@ -350,6 +350,8 @@ func addSignatureAndSendTransaction(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
txData.TxArgs.MultiTransactionID = multiTransactionID
|
||||||
|
|
||||||
return responses.NewRouterSentTransaction(txData.TxArgs, txData.SentHash, isApproval), nil
|
return responses.NewRouterSentTransaction(txData.TxArgs, txData.SentHash, isApproval), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,23 +12,58 @@ class SignalClient:
|
||||||
|
|
||||||
self.await_signals = await_signals
|
self.await_signals = await_signals
|
||||||
self.received_signals = {
|
self.received_signals = {
|
||||||
signal: [] for signal in self.await_signals
|
# For each signal type, store:
|
||||||
|
# - list of received signals
|
||||||
|
# - expected received event delta count (resets to 1 after each wait_for_event call)
|
||||||
|
# - expected received event count
|
||||||
|
# - a function that takes the received signal as an argument and returns True if the signal is accepted (counted) or discarded
|
||||||
|
signal: {
|
||||||
|
"received": [],
|
||||||
|
"delta_count": 1,
|
||||||
|
"expected_count": 1,
|
||||||
|
"accept_fn": None
|
||||||
|
} for signal in self.await_signals
|
||||||
}
|
}
|
||||||
|
|
||||||
def on_message(self, ws, signal):
|
def on_message(self, ws, signal):
|
||||||
signal = json.loads(signal)
|
signal = json.loads(signal)
|
||||||
if signal.get("type") in self.await_signals:
|
signal_type = signal.get("type")
|
||||||
self.received_signals[signal["type"]].append(signal)
|
if signal_type in self.await_signals:
|
||||||
|
accept_fn = self.received_signals[signal_type]["accept_fn"]
|
||||||
|
if not accept_fn or accept_fn(signal):
|
||||||
|
self.received_signals[signal_type]["received"].append(signal)
|
||||||
|
|
||||||
|
def check_signal_type(self, signal_type):
|
||||||
|
if signal_type not in self.await_signals:
|
||||||
|
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
|
||||||
|
|
||||||
|
# Used to set up how many instances of a signal to wait for, before triggering the actions
|
||||||
|
# that cause them to be emitted.
|
||||||
|
def prepare_wait_for_signal(self, signal_type, delta_count, accept_fn=None):
|
||||||
|
self.check_signal_type(signal_type)
|
||||||
|
|
||||||
|
if delta_count < 1:
|
||||||
|
raise ValueError("delta_count must be greater than 0")
|
||||||
|
self.received_signals[signal_type]["delta_count"] = delta_count
|
||||||
|
self.received_signals[signal_type]["expected_count"] = len(self.received_signals[signal_type]["received"]) + delta_count
|
||||||
|
self.received_signals[signal_type]["accept_fn"] = accept_fn
|
||||||
|
|
||||||
def wait_for_signal(self, signal_type, timeout=20):
|
def wait_for_signal(self, signal_type, timeout=20):
|
||||||
|
self.check_signal_type(signal_type)
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
while not self.received_signals.get(signal_type):
|
received_signals = self.received_signals.get(signal_type)
|
||||||
|
while (not received_signals) or len(received_signals["received"]) < received_signals["expected_count"]:
|
||||||
if time.time() - start_time >= timeout:
|
if time.time() - start_time >= timeout:
|
||||||
raise TimeoutError(
|
raise TimeoutError(
|
||||||
f"Signal {signal_type} is not received in {timeout} seconds")
|
f"Signal {signal_type} is not received in {timeout} seconds")
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds")
|
logging.debug(f"Signal {signal_type} is received in {round(time.time() - start_time)} seconds")
|
||||||
return self.received_signals[signal_type][0]
|
delta_count = received_signals["delta_count"]
|
||||||
|
self.prepare_wait_for_signal(signal_type, 1)
|
||||||
|
if delta_count == 1:
|
||||||
|
return self.received_signals[signal_type]["received"][-1]
|
||||||
|
return self.received_signals[signal_type]["received"][-delta_count:]
|
||||||
|
|
||||||
def _on_error(self, ws, error):
|
def _on_error(self, ws, error):
|
||||||
logging.error(f"Error: {error}")
|
logging.error(f"Error: {error}")
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
import json
|
||||||
|
import random
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from constants import user_1
|
||||||
|
from test_cases import SignalTestCase
|
||||||
|
|
||||||
|
import wallet_utils
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
EventActivityFilteringDone = "wallet-activity-filtering-done"
|
||||||
|
EventActivityFilteringUpdate = "wallet-activity-filtering-entries-updated"
|
||||||
|
EventActivitySessionUpdated = "wallet-activity-session-updated"
|
||||||
|
|
||||||
|
def validate_entry(entry, tx_data):
|
||||||
|
assert entry["transactions"][0]["chainId"] == tx_data["tx_status"]["chainId"]
|
||||||
|
assert entry["transactions"][0]["hash"] == tx_data["tx_status"]["hash"]
|
||||||
|
|
||||||
|
@pytest.mark.wallet
|
||||||
|
@pytest.mark.rpc
|
||||||
|
class TestWalletActivitySession(SignalTestCase):
|
||||||
|
await_signals = ["wallet",
|
||||||
|
"wallet.suggested.routes",
|
||||||
|
"wallet.router.sign-transactions",
|
||||||
|
"wallet.router.sending-transactions-started",
|
||||||
|
"wallet.transaction.status-changed",
|
||||||
|
"wallet.router.transactions-sent"]
|
||||||
|
|
||||||
|
def setup_method(self):
|
||||||
|
super().setup_method()
|
||||||
|
self.request_id = str(random.randint(1, 8888))
|
||||||
|
|
||||||
|
def test_wallet_start_activity_filter_session(self):
|
||||||
|
tx_data = [] # (routes, build_tx, tx_signatures, tx_status)
|
||||||
|
# Set up a transactions for account before starting session
|
||||||
|
tx_data.append(wallet_utils.send_router_transaction(self.rpc_client, self.signal_client))
|
||||||
|
|
||||||
|
# Start activity session
|
||||||
|
method = "wallet_startActivityFilterSessionV2"
|
||||||
|
params = [[user_1.address], [self.network_id],
|
||||||
|
{"period": {"startTimestamp": 0, "endTimestamp": 0}, "types": [], "statuses": [],
|
||||||
|
"counterpartyAddresses": [], "assets": [], "collectibles": [], "filterOutAssets": False,
|
||||||
|
"filterOutCollectibles": False}, 10]
|
||||||
|
self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivityFilteringDone)
|
||||||
|
response = self.rpc_client.rpc_valid_request(method, params, self.request_id)
|
||||||
|
event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event']
|
||||||
|
|
||||||
|
# Check response
|
||||||
|
sessionID = int(response.json()["result"])
|
||||||
|
assert sessionID > 0
|
||||||
|
|
||||||
|
# Check response event
|
||||||
|
assert int(event_response['requestId']) == sessionID
|
||||||
|
message = json.loads(event_response['message'].replace("'", "\""))
|
||||||
|
assert int(message['errorCode']) == 1
|
||||||
|
assert len(message['activities']) > 0 # Should have at least 1 entry
|
||||||
|
# First activity entry should match last sent transaction
|
||||||
|
validate_entry(message['activities'][0], tx_data[-1])
|
||||||
|
|
||||||
|
# Trigger new transaction
|
||||||
|
self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivitySessionUpdated and signal['event']['requestId'] == sessionID)
|
||||||
|
tx_data.append(wallet_utils.send_router_transaction(self.rpc_client, self.signal_client))
|
||||||
|
print(tx_data[-1])
|
||||||
|
event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event']
|
||||||
|
|
||||||
|
# Check response event
|
||||||
|
assert int(event_response['requestId']) == sessionID
|
||||||
|
message = json.loads(event_response['message'].replace("'", "\""))
|
||||||
|
assert message['hasNewOnTop'] # New entries reported
|
||||||
|
|
||||||
|
# Reset activity session
|
||||||
|
method = "wallet_resetActivityFilterSession"
|
||||||
|
params = [sessionID, 10]
|
||||||
|
self.signal_client.prepare_wait_for_signal("wallet", 1, lambda signal : signal["event"]["type"] == EventActivityFilteringDone and signal['event']['requestId'] == sessionID)
|
||||||
|
response = self.rpc_client.rpc_valid_request(method, params, self.request_id)
|
||||||
|
event_response = self.signal_client.wait_for_signal("wallet", timeout=10)['event']
|
||||||
|
|
||||||
|
# Check response event
|
||||||
|
assert int(event_response['requestId']) == sessionID
|
||||||
|
message = json.loads(event_response['message'].replace("'", "\""))
|
||||||
|
assert int(message['errorCode']) == 1
|
||||||
|
assert len(message['activities']) > 1 # Should have at least 2 entries
|
||||||
|
|
||||||
|
# First activity entry should match last sent transaction
|
||||||
|
validate_entry(message['activities'][0], tx_data[-1])
|
||||||
|
|
||||||
|
# Second activity entry should match second to last sent transaction
|
||||||
|
validate_entry(message['activities'][1], tx_data[-2])
|
|
@ -0,0 +1,131 @@
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import jsonschema
|
||||||
|
import uuid
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from conftest import option
|
||||||
|
from constants import user_1, user_2
|
||||||
|
|
||||||
|
from clients.signals import SignalClient
|
||||||
|
|
||||||
|
def verify_json_schema(response, method):
|
||||||
|
with open(f"{option.base_dir}/schemas/{method}", "r") as schema:
|
||||||
|
jsonschema.validate(instance=response,
|
||||||
|
schema=json.load(schema))
|
||||||
|
|
||||||
|
def get_suggested_routes(rpc_client, signal_client, **kwargs):
|
||||||
|
_uuid = str(uuid.uuid4())
|
||||||
|
amount_in = "0xde0b6b3a7640000"
|
||||||
|
|
||||||
|
method = "wallet_getSuggestedRoutesAsync"
|
||||||
|
input_params = {
|
||||||
|
"uuid": _uuid,
|
||||||
|
"sendType": 0,
|
||||||
|
"addrFrom": user_1.address,
|
||||||
|
"addrTo": user_2.address,
|
||||||
|
"amountIn": amount_in,
|
||||||
|
"amountOut": "0x0",
|
||||||
|
"tokenID": "ETH",
|
||||||
|
"tokenIDIsOwnerToken": False,
|
||||||
|
"toTokenID": "",
|
||||||
|
"disabledFromChainIDs": [10, 42161],
|
||||||
|
"disabledToChainIDs": [10, 42161],
|
||||||
|
"gasFeeMode": 1,
|
||||||
|
"fromLockedAmount": {}
|
||||||
|
}
|
||||||
|
for key, new_value in kwargs.items():
|
||||||
|
if key in input_params:
|
||||||
|
input_params[key] = new_value
|
||||||
|
else:
|
||||||
|
logging.info(
|
||||||
|
f"Warning: The key '{key}' does not exist in the input_params parameters and will be ignored.")
|
||||||
|
params = [input_params]
|
||||||
|
|
||||||
|
signal_client.prepare_wait_for_signal("wallet.suggested.routes", 1)
|
||||||
|
_ = rpc_client.rpc_valid_request(method, params)
|
||||||
|
|
||||||
|
routes = signal_client.wait_for_signal("wallet.suggested.routes")
|
||||||
|
assert routes['event']['Uuid'] == _uuid
|
||||||
|
|
||||||
|
return routes['event']
|
||||||
|
|
||||||
|
def build_transactions_from_route(rpc_client, signal_client, uuid, **kwargs):
|
||||||
|
method = "wallet_buildTransactionsFromRoute"
|
||||||
|
build_tx_params = {
|
||||||
|
"uuid": uuid,
|
||||||
|
"slippagePercentage": 0
|
||||||
|
}
|
||||||
|
for key, new_value in kwargs.items():
|
||||||
|
if key in build_tx_params:
|
||||||
|
build_tx_params[key] = new_value
|
||||||
|
else:
|
||||||
|
logging.info(
|
||||||
|
f"Warning: The key '{key}' does not exist in the build_tx_params parameters and will be ignored.")
|
||||||
|
params = [build_tx_params]
|
||||||
|
_ = rpc_client.rpc_valid_request(method, params)
|
||||||
|
|
||||||
|
wallet_router_sign_transactions = signal_client.wait_for_signal("wallet.router.sign-transactions")
|
||||||
|
|
||||||
|
assert wallet_router_sign_transactions['event']['signingDetails']['signOnKeycard'] == False
|
||||||
|
transaction_hashes = wallet_router_sign_transactions['event']['signingDetails']['hashes']
|
||||||
|
|
||||||
|
assert transaction_hashes, "Transaction hashes are empty!"
|
||||||
|
|
||||||
|
return wallet_router_sign_transactions['event']
|
||||||
|
|
||||||
|
def sign_messages(rpc_client, hashes):
|
||||||
|
tx_signatures = {}
|
||||||
|
|
||||||
|
for hash in hashes:
|
||||||
|
|
||||||
|
method = "wallet_signMessage"
|
||||||
|
params = [
|
||||||
|
hash,
|
||||||
|
user_1.address,
|
||||||
|
option.password
|
||||||
|
]
|
||||||
|
|
||||||
|
response = rpc_client.rpc_valid_request(method, params)
|
||||||
|
|
||||||
|
if response.json()["result"].startswith("0x"):
|
||||||
|
tx_signature = response.json()["result"][2:]
|
||||||
|
|
||||||
|
signature = {
|
||||||
|
"r": tx_signature[:64],
|
||||||
|
"s": tx_signature[64:128],
|
||||||
|
"v": tx_signature[128:]
|
||||||
|
}
|
||||||
|
|
||||||
|
tx_signatures[hash] = signature
|
||||||
|
return tx_signatures
|
||||||
|
|
||||||
|
def send_router_transactions_with_signatures(rpc_client, signal_client, uuid, tx_signatures):
|
||||||
|
method = "wallet_sendRouterTransactionsWithSignatures"
|
||||||
|
params = [
|
||||||
|
{
|
||||||
|
"uuid": uuid,
|
||||||
|
"Signatures": tx_signatures
|
||||||
|
}
|
||||||
|
]
|
||||||
|
_ = rpc_client.rpc_valid_request(method, params)
|
||||||
|
|
||||||
|
tx_status = signal_client.wait_for_signal(
|
||||||
|
"wallet.transaction.status-changed")
|
||||||
|
|
||||||
|
assert tx_status["event"]["status"] == "Success"
|
||||||
|
|
||||||
|
return tx_status["event"]
|
||||||
|
|
||||||
|
def send_router_transaction(rpc_client, signal_client, **kwargs):
|
||||||
|
routes = get_suggested_routes(rpc_client, signal_client, **kwargs)
|
||||||
|
build_tx = build_transactions_from_route(rpc_client, signal_client, routes['Uuid'])
|
||||||
|
tx_signatures = sign_messages(rpc_client, build_tx['signingDetails']['hashes'])
|
||||||
|
tx_status = send_router_transactions_with_signatures(rpc_client, signal_client, routes['Uuid'], tx_signatures)
|
||||||
|
return {
|
||||||
|
"routes": routes,
|
||||||
|
"build_tx": build_tx,
|
||||||
|
"tx_signatures": tx_signatures,
|
||||||
|
"tx_status": tx_status
|
||||||
|
}
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
eth "github.com/ethereum/go-ethereum/common"
|
eth "github.com/ethereum/go-ethereum/common"
|
||||||
|
@ -35,6 +36,7 @@ type MockChainClient struct {
|
||||||
mock_client.MockClientInterface
|
mock_client.MockClientInterface
|
||||||
|
|
||||||
Clients map[common.ChainID]*MockETHClient
|
Clients map[common.ChainID]*MockETHClient
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ chain.ClientInterface = (*MockChainClient)(nil)
|
var _ chain.ClientInterface = (*MockChainClient)(nil)
|
||||||
|
@ -46,6 +48,8 @@ func NewMockChainClient() *MockChainClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockChainClient) SetAvailableClients(chainIDs []common.ChainID) *MockChainClient {
|
func (m *MockChainClient) SetAvailableClients(chainIDs []common.ChainID) *MockChainClient {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
for _, chainID := range chainIDs {
|
for _, chainID := range chainIDs {
|
||||||
if _, ok := m.Clients[chainID]; !ok {
|
if _, ok := m.Clients[chainID]; !ok {
|
||||||
m.Clients[chainID] = new(MockETHClient)
|
m.Clients[chainID] = new(MockETHClient)
|
||||||
|
@ -55,6 +59,8 @@ func (m *MockChainClient) SetAvailableClients(chainIDs []common.ChainID) *MockCh
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockChainClient) AbstractEthClient(chainID common.ChainID) (ethclient.BatchCallClient, error) {
|
func (m *MockChainClient) AbstractEthClient(chainID common.ChainID) (ethclient.BatchCallClient, error) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
if _, ok := m.Clients[chainID]; !ok {
|
if _, ok := m.Clients[chainID]; !ok {
|
||||||
panic(fmt.Sprintf("no mock client for chainID %d", chainID))
|
panic(fmt.Sprintf("no mock client for chainID %d", chainID))
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
ALTER TABLE route_input_parameters ADD COLUMN from_address BLOB NOT NULL AS (unhex(substr(json_extract(route_input_params_json, '$.addrFrom'),3)));
|
||||||
|
ALTER TABLE route_input_parameters ADD COLUMN to_address BLOB NOT NULL AS (unhex(substr(json_extract(route_input_params_json, '$.addrTo'),3)));
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_route_input_parameters_per_from_address ON route_input_parameters (from_address);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_route_input_parameters_per_to_address ON route_input_parameters (to_address);
|
Loading…
Reference in New Issue