diff --git a/geth/api/api.go b/geth/api/api.go index 625dbb757..33693ef98 100644 --- a/geth/api/api.go +++ b/geth/api/api.go @@ -12,6 +12,7 @@ import ( "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" ) // StatusAPI provides API to access Status related functionality. @@ -49,9 +50,14 @@ func (api *StatusAPI) JailManager() jail.Manager { return api.b.JailManager() } -// TxQueueManager returns reference to account manager -func (api *StatusAPI) TxQueueManager() *transactions.Manager { - return api.b.TxQueueManager() +// Transactor returns reference to a status transactor +func (api *StatusAPI) Transactor() *transactions.Transactor { + return api.b.Transactor() +} + +// PendingSignRequests returns reference to a list of current sign requests +func (api *StatusAPI) PendingSignRequests() *sign.PendingRequests { + return api.b.PendingSignRequests() } // StartNode start Status node, fails if node is already started @@ -156,7 +162,7 @@ func (api *StatusAPI) CompleteTransaction(id string, password string) (gethcommo } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (api *StatusAPI) CompleteTransactions(ids []string, password string) map[string]transactions.Result { +func (api *StatusAPI) CompleteTransactions(ids []string, password string) map[string]sign.Result { return api.b.CompleteTransactions(ids, password) } diff --git a/geth/api/backend.go b/geth/api/backend.go index 78d43317c..aafbecedb 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -16,6 +16,7 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" ) const ( @@ -32,14 +33,15 @@ var ( // StatusBackend implements Status.im service type StatusBackend struct { - mu sync.Mutex - statusNode *node.StatusNode - accountManager *account.Manager - txQueueManager *transactions.Manager - jailManager jail.Manager - newNotification fcm.NotificationConstructor - connectionState ConnectionState - log log.Logger + mu sync.Mutex + statusNode *node.StatusNode + pendingSignRequests *sign.PendingRequests + accountManager *account.Manager + transactor *transactions.Transactor + jailManager jail.Manager + newNotification fcm.NotificationConstructor + connectionState ConnectionState + log log.Logger } // NewStatusBackend create a new NewStatusBackend instance @@ -47,18 +49,20 @@ func NewStatusBackend() *StatusBackend { defer log.Info("Status backend initialized") statusNode := node.New() + pendingSignRequests := sign.NewPendingRequests() accountManager := account.NewManager(statusNode) - txQueueManager := transactions.NewManager(statusNode) + transactor := transactions.NewTransactor(pendingSignRequests) jailManager := jail.New(statusNode) notificationManager := fcm.NewNotification(fcmServerKey) return &StatusBackend{ - statusNode: statusNode, - accountManager: accountManager, - jailManager: jailManager, - txQueueManager: txQueueManager, - newNotification: notificationManager, - log: log.New("package", "status-go/geth/api.StatusBackend"), + pendingSignRequests: pendingSignRequests, + statusNode: statusNode, + accountManager: accountManager, + jailManager: jailManager, + transactor: transactor, + newNotification: notificationManager, + log: log.New("package", "status-go/geth/api.StatusBackend"), } } @@ -77,9 +81,14 @@ func (b *StatusBackend) JailManager() jail.Manager { return b.jailManager } -// TxQueueManager returns reference to transactions manager -func (b *StatusBackend) TxQueueManager() *transactions.Manager { - return b.txQueueManager +// Transactor returns reference to a status transactor +func (b *StatusBackend) Transactor() *transactions.Transactor { + return b.transactor +} + +// PendingSignRequests returns reference to a list of current sign requests +func (b *StatusBackend) PendingSignRequests() *sign.PendingRequests { + return b.pendingSignRequests } // IsNodeRunning confirm that node is running @@ -117,9 +126,9 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) { return err } signal.Send(signal.Envelope{Type: signal.EventNodeStarted}) - // tx queue manager should be started after node is started, it depends - // on rpc client being created - b.txQueueManager.Start(config.NetworkID) + + b.transactor.SetNetworkID(config.NetworkID) + b.transactor.SetRPCClient(b.statusNode.RPCClient()) if err := b.registerHandlers(); err != nil { b.log.Error("Handler registration failed", "err", err) } @@ -142,7 +151,6 @@ func (b *StatusBackend) stopNode() error { if !b.IsNodeRunning() { return node.ErrNoRunningNode } - b.txQueueManager.Stop() b.jailManager.Stop() defer signal.Send(signal.Envelope{Type: signal.EventNodeStopped}) return b.statusNode.Stop() @@ -193,18 +201,7 @@ func (b *StatusBackend) CallRPC(inputJSON string) string { // SendTransaction creates a new transaction and waits until it's complete. func (b *StatusBackend) SendTransaction(ctx context.Context, args transactions.SendTxArgs) (hash gethcommon.Hash, err error) { - if ctx == nil { - ctx = context.Background() - } - tx := transactions.Create(ctx, args) - if err = b.txQueueManager.QueueTransaction(tx); err != nil { - return hash, err - } - rst := b.txQueueManager.WaitForTransaction(tx) - if rst.Error != nil { - return hash, rst.Error - } - return rst.Hash, nil + return b.transactor.SendTransaction(ctx, args) } func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedExtKey, error) { @@ -227,21 +224,15 @@ func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedEx // CompleteTransaction instructs backend to complete sending of a given transaction func (b *StatusBackend) CompleteTransaction(id string, password string) (hash gethcommon.Hash, err error) { - selectedAccount, err := b.getVerifiedAccount(password) - if err != nil { - _ = b.txQueueManager.NotifyErrored(id, err) - return hash, err - } - - return b.txQueueManager.CompleteTransaction(id, selectedAccount) + return b.pendingSignRequests.Approve(id, password, b.getVerifiedAccount) } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[string]transactions.Result { - results := make(map[string]transactions.Result) +func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[string]sign.Result { + results := make(map[string]sign.Result) for _, txID := range ids { txHash, txErr := b.CompleteTransaction(txID, password) - results[txID] = transactions.Result{ + results[txID] = sign.Result{ Hash: txHash, Error: txErr, } @@ -251,7 +242,7 @@ func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[ // DiscardTransaction discards a given transaction from transaction queue func (b *StatusBackend) DiscardTransaction(id string) error { - return b.txQueueManager.DiscardTransaction(id) + return b.pendingSignRequests.Discard(id) } // DiscardTransactions discards given multiple transactions from transaction queue @@ -273,13 +264,30 @@ func (b *StatusBackend) registerHandlers() error { if rpcClient == nil { return node.ErrRPCClient } - rpcClient.RegisterHandler("eth_accounts", func(context.Context, ...interface{}) (interface{}, error) { + + rpcClient.RegisterHandler(params.AccountsMethodName, func(context.Context, ...interface{}) (interface{}, error) { return b.AccountManager().Accounts() }) - rpcClient.RegisterHandler("eth_sendTransaction", b.txQueueManager.SendTransactionRPCHandler) + + rpcClient.RegisterHandler(params.SendTransactionMethodName, func(ctx context.Context, rpcParams ...interface{}) (interface{}, error) { + txArgs, err := transactions.RPCCalltoSendTxArgs(rpcParams...) + if err != nil { + return nil, err + } + + hash, err := b.SendTransaction(ctx, txArgs) + if err != nil { + return nil, err + } + + return hash.Hex(), err + }) + return nil } +// + // ConnectionChange handles network state changes logic. func (b *StatusBackend) ConnectionChange(state ConnectionState) { b.log.Info("Network state change", "old", b.connectionState, "new", state) diff --git a/geth/params/defaults.go b/geth/params/defaults.go index e138c8e40..656da7862 100644 --- a/geth/params/defaults.go +++ b/geth/params/defaults.go @@ -34,6 +34,10 @@ const ( // SendTransactionMethodName defines the name for a giving transaction. SendTransactionMethodName = "eth_sendTransaction" + // AccountsMethodName defines the name for listing the currently signed accounts. + AccountsMethodName = "eth_accounts" + // PersonalSignMethodName defines the name for `personal.sign` API. + PersonalSignMethodName = "personal_sign" // WSPort is a WS-RPC port (replaced in unit tests) WSPort = 8546 diff --git a/geth/transactions/errors.go b/geth/transactions/errors.go deleted file mode 100644 index 7eb38081e..000000000 --- a/geth/transactions/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package transactions - -import "errors" - -var ( - //ErrQueuedTxTimedOut - error transaction sending timed out - ErrQueuedTxTimedOut = errors.New("transaction sending timed out") - //ErrQueuedTxDiscarded - error transaction discarded - ErrQueuedTxDiscarded = errors.New("transaction has been discarded") -) diff --git a/geth/transactions/queue.go b/geth/transactions/queue.go deleted file mode 100644 index b8f24a410..000000000 --- a/geth/transactions/queue.go +++ /dev/null @@ -1,234 +0,0 @@ -package transactions - -import ( - "errors" - "sync" - "time" - - "github.com/ethereum/go-ethereum/accounts/keystore" - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/geth/account" -) - -const ( - // DefaultTxQueueCap defines how many items can be queued. - DefaultTxQueueCap = int(35) -) - -var ( - // ErrQueuedTxExist - transaction was already enqueued - ErrQueuedTxExist = errors.New("transaction already exist in queue") - //ErrQueuedTxIDNotFound - error transaction hash not found - ErrQueuedTxIDNotFound = errors.New("transaction hash not found") - //ErrQueuedTxInProgress - error transaction is in progress - ErrQueuedTxInProgress = errors.New("transaction is in progress") - //ErrInvalidCompleteTxSender - error transaction with invalid sender - ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") -) - -// remove from queue on any error (except for transient ones) and propagate -// defined as map[string]bool because errors from ethclient returned wrapped as jsonError -var transientErrs = map[string]bool{ - keystore.ErrDecrypt.Error(): true, // wrong password - ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account - account.ErrNoAccountSelected.Error(): true, // account not selected -} - -type empty struct{} - -// TxQueue is capped container that holds pending transactions -type TxQueue struct { - mu sync.RWMutex // to guard transactions map - transactions map[string]*QueuedTx - inprogress map[string]empty - - // TODO(dshulyak) research why eviction is done in separate goroutine - evictableIDs chan string - enqueueTicker chan struct{} - - // when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc) - stopped chan struct{} - stoppedGroup sync.WaitGroup // to make sure that all routines are stopped - log log.Logger -} - -// newQueue creates a transaction queue. -func newQueue() *TxQueue { - - logger := log.New("package", "status-go/geth/transactions.TxQueue") - - logger.Info("initializing transaction queue") - return &TxQueue{ - transactions: make(map[string]*QueuedTx), - inprogress: make(map[string]empty), - evictableIDs: make(chan string, DefaultTxQueueCap), // will be used to evict in FIFO - enqueueTicker: make(chan struct{}), - log: logger, - } -} - -// Start starts enqueue and eviction loops -func (q *TxQueue) Start() { - q.log.Info("starting transaction queue") - - if q.stopped != nil { - return - } - - q.stopped = make(chan struct{}) - q.stoppedGroup.Add(1) - go q.evictionLoop() -} - -// Stop stops transaction enqueue and eviction loops -func (q *TxQueue) Stop() { - q.log.Info("stopping transaction queue") - - if q.stopped == nil { - return - } - - close(q.stopped) // stops all processing loops (enqueue, eviction etc) - q.stoppedGroup.Wait() - q.stopped = nil - - q.log.Info("finally stopped transaction queue") -} - -// evictionLoop frees up queue to accommodate another transaction item -func (q *TxQueue) evictionLoop() { - defer haltOnPanic() - evict := func() { - if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item - q.Remove(<-q.evictableIDs) - } - } - - for { - select { - case <-time.After(250 * time.Millisecond): // do not wait for manual ticks, check queue regularly - evict() - case <-q.enqueueTicker: // when manually requested - evict() - case <-q.stopped: - q.log.Info("transaction queue's eviction loop stopped") - q.stoppedGroup.Done() - return - } - } -} - -// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one -func (q *TxQueue) Reset() { - q.mu.Lock() - defer q.mu.Unlock() - - q.transactions = make(map[string]*QueuedTx) - q.evictableIDs = make(chan string, DefaultTxQueueCap) - q.inprogress = make(map[string]empty) -} - -// Enqueue enqueues incoming transaction -func (q *TxQueue) Enqueue(tx *QueuedTx) error { - q.log.Info("enqueue transaction", "ID", tx.ID) - q.mu.RLock() - if _, ok := q.transactions[tx.ID]; ok { - q.mu.RUnlock() - return ErrQueuedTxExist - } - q.mu.RUnlock() - - // we can't hold a lock in this part - q.log.Debug("notifying eviction loop") - q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item - q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap - q.log.Debug("notified eviction loop") - - q.mu.Lock() - q.transactions[tx.ID] = tx - q.mu.Unlock() - - // notify handler - q.log.Info("calling txEnqueueHandler") - return nil -} - -// Get returns transaction by transaction identifier -func (q *TxQueue) Get(id string) (*QueuedTx, error) { - q.mu.RLock() - defer q.mu.RUnlock() - - if tx, ok := q.transactions[id]; ok { - return tx, nil - } - return nil, ErrQueuedTxIDNotFound -} - -// LockInprogress returns error if transaction is already inprogress. -func (q *TxQueue) LockInprogress(id string) error { - q.mu.Lock() - defer q.mu.Unlock() - if _, ok := q.transactions[id]; ok { - if _, inprogress := q.inprogress[id]; inprogress { - return ErrQueuedTxInProgress - } - q.inprogress[id] = empty{} - return nil - } - return ErrQueuedTxIDNotFound -} - -// Remove removes transaction by transaction identifier -func (q *TxQueue) Remove(id string) { - q.mu.Lock() - defer q.mu.Unlock() - q.remove(id) -} - -func (q *TxQueue) remove(id string) { - delete(q.transactions, id) - delete(q.inprogress, id) -} - -// Done removes transaction from queue if no error or error is not transient -// and notify subscribers -func (q *TxQueue) Done(id string, hash gethcommon.Hash, err error) error { - q.mu.Lock() - defer q.mu.Unlock() - tx, ok := q.transactions[id] - if !ok { - return ErrQueuedTxIDNotFound - } - q.done(tx, hash, err) - return nil -} - -func (q *TxQueue) done(tx *QueuedTx, hash gethcommon.Hash, err error) { - delete(q.inprogress, tx.ID) - // hash is updated only if err is nil, but transaction is not removed from a queue - if err == nil { - q.transactions[tx.ID].Result <- Result{Hash: hash, Error: err} - q.remove(tx.ID) - return - } - if _, transient := transientErrs[err.Error()]; !transient { - q.transactions[tx.ID].Result <- Result{Error: err} - q.remove(tx.ID) - } -} - -// Count returns number of currently queued transactions -func (q *TxQueue) Count() int { - q.mu.RLock() - defer q.mu.RUnlock() - return len(q.transactions) -} - -// Has checks whether transaction with a given identifier exists in queue -func (q *TxQueue) Has(id string) bool { - q.mu.RLock() - defer q.mu.RUnlock() - _, ok := q.transactions[id] - return ok -} diff --git a/geth/transactions/queue_test.go b/geth/transactions/queue_test.go deleted file mode 100644 index e2e847a6e..000000000 --- a/geth/transactions/queue_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package transactions - -import ( - "context" - "errors" - "testing" - - "github.com/ethereum/go-ethereum/accounts/keystore" - gethcommon "github.com/ethereum/go-ethereum/common" - - "github.com/stretchr/testify/suite" -) - -func TestQueueTestSuite(t *testing.T) { - suite.Run(t, new(QueueTestSuite)) -} - -type QueueTestSuite struct { - suite.Suite - queue *TxQueue -} - -func (s *QueueTestSuite) SetupTest() { - s.queue = newQueue() - s.queue.Start() -} - -func (s *QueueTestSuite) TearDownTest() { - s.queue.Stop() -} - -func (s *QueueTestSuite) TestLockInprogressTransaction() { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - enquedTx, err := s.queue.Get(tx.ID) - s.NoError(err) - s.NoError(s.queue.LockInprogress(tx.ID)) - s.Equal(tx, enquedTx) - - // verify that tx was marked as being inprogress - s.Equal(ErrQueuedTxInProgress, s.queue.LockInprogress(tx.ID)) -} - -func (s *QueueTestSuite) TestGetTransaction() { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - for i := 2; i > 0; i-- { - enquedTx, err := s.queue.Get(tx.ID) - s.NoError(err) - s.Equal(tx, enquedTx) - } -} - -func (s *QueueTestSuite) TestAlreadyEnqueued() { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - s.Equal(ErrQueuedTxExist, s.queue.Enqueue(tx)) - // try to enqueue another tx to double check locking - tx = Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) -} - -func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *QueuedTx { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - s.NoError(s.queue.Done(tx.ID, hash, err)) - return tx -} - -func (s *QueueTestSuite) TestDoneSuccess() { - hash := gethcommon.Hash{1} - tx := s.testDone(hash, nil) - // event is sent only if transaction was removed from a queue - select { - case rst := <-tx.Result: - s.NoError(rst.Error) - s.Equal(hash, rst.Hash) - s.False(s.queue.Has(tx.ID)) - default: - s.Fail("No event was sent to Done channel") - } -} - -func (s *QueueTestSuite) TestDoneTransientError() { - hash := gethcommon.Hash{1} - err := keystore.ErrDecrypt - tx := s.testDone(hash, err) - s.True(s.queue.Has(tx.ID)) - _, inp := s.queue.inprogress[tx.ID] - s.False(inp) -} - -func (s *QueueTestSuite) TestDoneError() { - hash := gethcommon.Hash{1} - err := errors.New("test") - tx := s.testDone(hash, err) - // event is sent only if transaction was removed from a queue - select { - case rst := <-tx.Result: - s.Equal(err, rst.Error) - s.NotEqual(hash, rst.Hash) - s.Equal(gethcommon.Hash{}, rst.Hash) - s.False(s.queue.Has(tx.ID)) - default: - s.Fail("No event was sent to Done channel") - } -} - -func (s QueueTestSuite) TestMultipleDone() { - hash := gethcommon.Hash{1} - err := keystore.ErrDecrypt - tx := s.testDone(hash, err) - s.NoError(s.queue.Done(tx.ID, hash, nil)) - s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout"))) -} - -func (s *QueueTestSuite) TestEviction() { - var first *QueuedTx - for i := 0; i < DefaultTxQueueCap; i++ { - tx := Create(context.Background(), SendTxArgs{}) - if first == nil { - first = tx - } - s.NoError(s.queue.Enqueue(tx)) - } - s.Equal(DefaultTxQueueCap, s.queue.Count()) - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - s.Equal(DefaultTxQueueCap, s.queue.Count()) - s.False(s.queue.Has(first.ID)) -} diff --git a/geth/transactions/ethtxclient.go b/geth/transactions/rpc_wrapper.go similarity index 61% rename from geth/transactions/ethtxclient.go rename to geth/transactions/rpc_wrapper.go index dff5be2df..eb35c8ab7 100644 --- a/geth/transactions/ethtxclient.go +++ b/geth/transactions/rpc_wrapper.go @@ -9,40 +9,32 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" + "github.com/status-im/status-go/geth/rpc" ) -// EthTransactor provides methods to create transactions for ethereum network. -type EthTransactor interface { - PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) - ethereum.GasEstimator - ethereum.GasPricer - ethereum.TransactionSender +// rpcWrapper wraps provides convenient interface for ethereum RPC APIs we need for sending transactions +type rpcWrapper struct { + rpcClient *rpc.Client } -// EthTxClient wraps common API methods that are used to send transaction. -type EthTxClient struct { - c *rpc.Client -} - -// NewEthTxClient returns a new EthTxClient for client -func NewEthTxClient(client *rpc.Client) *EthTxClient { - return &EthTxClient{c: client} +func newRPCWrapper(client *rpc.Client) *rpcWrapper { + return &rpcWrapper{rpcClient: client} } // PendingNonceAt returns the account nonce of the given account in the pending state. // This is the nonce that should be used for the next transaction. -func (ec *EthTxClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { +func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { var result hexutil.Uint64 - err := ec.c.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending") + err := w.rpcClient.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending") return uint64(result), err } // SuggestGasPrice retrieves the currently suggested gas price to allow a timely // execution of a transaction. -func (ec *EthTxClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { +func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) { var hex hexutil.Big - if err := ec.c.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { + if err := w.rpcClient.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { return nil, err } return (*big.Int)(&hex), nil @@ -52,9 +44,9 @@ func (ec *EthTxClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { // the current pending state of the backend blockchain. There is no guarantee that this is // the true gas limit requirement as other transactions may be added or removed by miners, // but it should provide a basis for setting a reasonable default. -func (ec *EthTxClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { +func (w *rpcWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { var hex hexutil.Uint64 - err := ec.c.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg)) + err := w.rpcClient.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg)) if err != nil { return 0, err } @@ -65,12 +57,12 @@ func (ec *EthTxClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (u // // If the transaction was a contract creation use the TransactionReceipt method to get the // contract address after the transaction has been mined. -func (ec *EthTxClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { +func (w *rpcWrapper) SendTransaction(ctx context.Context, tx *types.Transaction) error { data, err := rlp.EncodeToBytes(tx) if err != nil { return err } - return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data)) + return w.rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data)) } func toCallArg(msg ethereum.CallMsg) interface{} { diff --git a/geth/transactions/rpcclient_mock.go b/geth/transactions/rpcclient_mock.go deleted file mode 100644 index 187668fc6..000000000 --- a/geth/transactions/rpcclient_mock.go +++ /dev/null @@ -1,47 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: geth/transactions/txqueue_manager_test.go - -// Package transactions is a generated GoMock package. -package transactions - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - rpc "github.com/status-im/status-go/geth/rpc" -) - -// MocktestRPCClientProvider is a mock of testRPCClientProvider interface -type MocktestRPCClientProvider struct { - ctrl *gomock.Controller - recorder *MocktestRPCClientProviderMockRecorder -} - -// MocktestRPCClientProviderMockRecorder is the mock recorder for MocktestRPCClientProvider -type MocktestRPCClientProviderMockRecorder struct { - mock *MocktestRPCClientProvider -} - -// NewMocktestRPCClientProvider creates a new mock instance -func NewMocktestRPCClientProvider(ctrl *gomock.Controller) *MocktestRPCClientProvider { - mock := &MocktestRPCClientProvider{ctrl: ctrl} - mock.recorder = &MocktestRPCClientProviderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MocktestRPCClientProvider) EXPECT() *MocktestRPCClientProviderMockRecorder { - return m.recorder -} - -// RPCClient mocks base method -func (m *MocktestRPCClientProvider) RPCClient() *rpc.Client { - ret := m.ctrl.Call(m, "RPCClient") - ret0, _ := ret[0].(*rpc.Client) - return ret0 -} - -// RPCClient indicates an expected call of RPCClient -func (mr *MocktestRPCClientProviderMockRecorder) RPCClient() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RPCClient", reflect.TypeOf((*MocktestRPCClientProvider)(nil).RPCClient)) -} diff --git a/geth/transactions/transactor.go b/geth/transactions/transactor.go new file mode 100644 index 000000000..fb5952f66 --- /dev/null +++ b/geth/transactions/transactor.go @@ -0,0 +1,205 @@ +package transactions + +import ( + "context" + "math/big" + "sync" + "time" + + ethereum "github.com/ethereum/go-ethereum" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + + "github.com/status-im/status-go/geth/account" + "github.com/status-im/status-go/geth/rpc" + "github.com/status-im/status-go/sign" +) + +const ( + // sendTxTimeout defines how many seconds to wait before returning result in sentTransaction(). + sendTxTimeout = 300 * time.Second + rpcCallTimeout = time.Minute + + defaultGas = 90000 +) + +// Transactor validates, signs transactions. +// It uses upstream to propagate transactions to the Ethereum network. +type Transactor struct { + pendingSignRequests *sign.PendingRequests + sender ethereum.TransactionSender + pendingNonceProvider PendingNonceProvider + gasCalculator GasCalculator + sendTxTimeout time.Duration + rpcCallTimeout time.Duration + networkID uint64 + + addrLock *AddrLocker + localNonce sync.Map + log log.Logger +} + +// NewTransactor returns a new Manager. +func NewTransactor(signRequests *sign.PendingRequests) *Transactor { + return &Transactor{ + pendingSignRequests: signRequests, + addrLock: &AddrLocker{}, + sendTxTimeout: sendTxTimeout, + rpcCallTimeout: rpcCallTimeout, + localNonce: sync.Map{}, + log: log.New("package", "status-go/geth/transactions.Manager"), + } +} + +// SetNetworkID selects a correct network. +func (t *Transactor) SetNetworkID(networkID uint64) { + t.networkID = networkID +} + +// SetRPCClient an RPC client. +func (t *Transactor) SetRPCClient(rpcClient *rpc.Client) { + rpcWrapper := newRPCWrapper(rpcClient) + t.sender = rpcWrapper + t.pendingNonceProvider = rpcWrapper + t.gasCalculator = rpcWrapper +} + +// SendTransaction is an implementation of eth_sendTransaction. It queues the tx to the sign queue. +func (t *Transactor) SendTransaction(ctx context.Context, args SendTxArgs) (gethcommon.Hash, error) { + if ctx == nil { + ctx = context.Background() + } + + completeFunc := func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + return t.validateAndPropagate(acc, args) + } + + request, err := t.pendingSignRequests.Add(ctx, args, completeFunc) + + if err != nil { + return gethcommon.Hash{}, err + } + + rst := t.pendingSignRequests.Wait(request.ID, t.sendTxTimeout) + return rst.Hash, rst.Error +} + +// make sure that only account which created the tx can complete it +func (t *Transactor) validateAccount(args SendTxArgs, selectedAccount *account.SelectedExtKey) error { + if selectedAccount == nil { + return account.ErrNoAccountSelected + } + + if args.From.Hex() != selectedAccount.Address.Hex() { + err := sign.ErrInvalidCompleteTxSender + t.log.Error("queued transaction does not belong to the selected account", "err", err) + return err + } + + return nil +} + +func (t *Transactor) validateAndPropagate(selectedAccount *account.SelectedExtKey, args SendTxArgs) (hash gethcommon.Hash, err error) { + // TODO (mandrigin): Send sign request ID as a parameter to this function and uncoment the log message + // m.log.Info("complete transaction", "id", queuedTx.ID) + if err := t.validateAccount(args, selectedAccount); err != nil { + return hash, err + } + if !args.Valid() { + return hash, ErrInvalidSendTxArgs + } + t.addrLock.LockAddr(args.From) + var localNonce uint64 + if val, ok := t.localNonce.Load(args.From); ok { + localNonce = val.(uint64) + } + var nonce uint64 + defer func() { + // nonce should be incremented only if tx completed without error + // if upstream node returned nonce higher than ours we will stick to it + if err == nil { + t.localNonce.Store(args.From, nonce+1) + } + t.addrLock.UnlockAddr(args.From) + + }() + ctx, cancel := context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + nonce, err = t.pendingNonceProvider.PendingNonceAt(ctx, args.From) + if err != nil { + return hash, err + } + // if upstream node returned nonce higher than ours we will use it, as it probably means + // that another client was used for sending transactions + if localNonce > nonce { + nonce = localNonce + } + gasPrice := (*big.Int)(args.GasPrice) + if args.GasPrice == nil { + ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + gasPrice, err = t.gasCalculator.SuggestGasPrice(ctx) + if err != nil { + return hash, err + } + } + + chainID := big.NewInt(int64(t.networkID)) + value := (*big.Int)(args.Value) + + var gas uint64 + if args.Gas == nil { + ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + gas, err = t.gasCalculator.EstimateGas(ctx, ethereum.CallMsg{ + From: args.From, + To: args.To, + GasPrice: gasPrice, + Value: value, + Data: args.GetInput(), + }) + if err != nil { + return hash, err + } + if gas < defaultGas { + t.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) + gas = defaultGas + } + } else { + gas = uint64(*args.Gas) + } + + var tx *types.Transaction + if args.To != nil { + t.log.Info("New transaction", + "From", args.From, + "To", *args.To, + "Gas", gas, + "GasPrice", gasPrice, + "Value", value, + ) + tx = types.NewTransaction(nonce, *args.To, value, gas, gasPrice, args.GetInput()) + } else { + // contract creation is rare enough to log an expected address + t.log.Info("New contract", + "From", args.From, + "Gas", gas, + "GasPrice", gasPrice, + "Value", value, + "Contract address", crypto.CreateAddress(args.From, nonce), + ) + tx = types.NewContractCreation(nonce, value, gas, gasPrice, args.GetInput()) + } + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) + if err != nil { + return hash, err + } + ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + if err := t.sender.SendTransaction(ctx, signedTx); err != nil { + return hash, err + } + return signedTx.Hash(), nil +} diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/transactor_test.go similarity index 56% rename from geth/transactions/txqueue_manager_test.go rename to geth/transactions/transactor_test.go index 98a2ac913..581106a92 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/transactor_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "math/big" - "sync" "testing" "time" @@ -26,52 +25,52 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/rpc" "github.com/status-im/status-go/geth/transactions/fake" + "github.com/status-im/status-go/sign" + . "github.com/status-im/status-go/t/utils" ) +func simpleVerifyFunc(acc *account.SelectedExtKey) func(string) (*account.SelectedExtKey, error) { + return func(string) (*account.SelectedExtKey, error) { + return acc, nil + } +} + func TestTxQueueTestSuite(t *testing.T) { suite.Run(t, new(TxQueueTestSuite)) } type TxQueueTestSuite struct { suite.Suite - rpcClientMockCtrl *gomock.Controller - rpcClientMock *MocktestRPCClientProvider server *gethrpc.Server client *gethrpc.Client txServiceMockCtrl *gomock.Controller txServiceMock *fake.MockPublicTransactionPoolAPI nodeConfig *params.NodeConfig - manager *Manager + manager *Transactor } func (s *TxQueueTestSuite) SetupTest() { - s.rpcClientMockCtrl = gomock.NewController(s.T()) s.txServiceMockCtrl = gomock.NewController(s.T()) - s.rpcClientMock = NewMocktestRPCClientProvider(s.rpcClientMockCtrl) - s.server, s.txServiceMock = fake.NewTestServer(s.txServiceMockCtrl) s.client = gethrpc.DialInProc(s.server) - rpclient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{}) - s.rpcClientMock.EXPECT().RPCClient().Return(rpclient) + rpcClient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{}) // expected by simulated backend chainID := gethparams.AllEthashProtocolChanges.ChainId.Uint64() nodeConfig, err := params.NewNodeConfig("/tmp", "", chainID, true) s.Require().NoError(err) s.nodeConfig = nodeConfig - s.manager = NewManager(s.rpcClientMock) - s.manager.DisableNotificactions() - s.manager.completionTimeout = time.Second + s.manager = NewTransactor(sign.NewPendingRequests()) + s.manager.sendTxTimeout = time.Second s.manager.rpcCallTimeout = time.Second - s.manager.Start(chainID) + s.manager.SetNetworkID(chainID) + s.manager.SetRPCClient(rpcClient) } func (s *TxQueueTestSuite) TearDownTest() { - s.manager.Stop() - s.rpcClientMockCtrl.Finish() s.txServiceMockCtrl.Finish() s.server.Stop() s.client.Close() @@ -83,38 +82,44 @@ var ( testNonce = hexutil.Uint64(10) ) -func (s *TxQueueTestSuite) setupTransactionPoolAPI(tx *QueuedTx, returnNonce, resultNonce hexutil.Uint64, account *account.SelectedExtKey, txErr error) { +func (s *TxQueueTestSuite) completeFunc(args SendTxArgs) func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + return s.manager.validateAndPropagate(acc, args) + } +} + +func (s *TxQueueTestSuite) setupTransactionPoolAPI(args SendTxArgs, returnNonce, resultNonce hexutil.Uint64, account *account.SelectedExtKey, txErr error) { // Expect calls to gas functions only if there are no user defined values. // And also set the expected gas and gas price for RLP encoding the expected tx. var usedGas hexutil.Uint64 var usedGasPrice *big.Int s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&returnNonce, nil) - if tx.Args.GasPrice == nil { + if args.GasPrice == nil { usedGasPrice = (*big.Int)(testGasPrice) s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(usedGasPrice, nil) } else { - usedGasPrice = (*big.Int)(tx.Args.GasPrice) + usedGasPrice = (*big.Int)(args.GasPrice) } - if tx.Args.Gas == nil { + if args.Gas == nil { s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(testGas, nil) usedGas = testGas } else { - usedGas = *tx.Args.Gas + usedGas = *args.Gas } // Prepare the transaction anD RLP encode it. - data := s.rlpEncodeTx(tx, s.nodeConfig, account, &resultNonce, usedGas, usedGasPrice) + data := s.rlpEncodeTx(args, s.nodeConfig, account, &resultNonce, usedGas, usedGasPrice) // Expect the RLP encoded transaction. s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), data).Return(gethcommon.Hash{}, txErr) } -func (s *TxQueueTestSuite) rlpEncodeTx(tx *QueuedTx, config *params.NodeConfig, account *account.SelectedExtKey, nonce *hexutil.Uint64, gas hexutil.Uint64, gasPrice *big.Int) hexutil.Bytes { +func (s *TxQueueTestSuite) rlpEncodeTx(args SendTxArgs, config *params.NodeConfig, account *account.SelectedExtKey, nonce *hexutil.Uint64, gas hexutil.Uint64, gasPrice *big.Int) hexutil.Bytes { newTx := types.NewTransaction( uint64(*nonce), - gethcommon.Address(*tx.Args.To), - tx.Args.Value.ToInt(), + gethcommon.Address(*args.To), + args.Value.ToInt(), uint64(gas), gasPrice, - []byte(tx.Args.Input), + []byte(args.Input), ) chainID := big.NewInt(int64(config.NetworkID)) signedTx, err := types.SignTx(newTx, types.NewEIP155Signer(chainID), account.AccountKey.PrivateKey) @@ -160,141 +165,103 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { for _, testCase := range testCases { s.T().Run(testCase.name, func(t *testing.T) { s.SetupTest() - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), Gas: testCase.gas, GasPrice: testCase.gasPrice, - }) - s.setupTransactionPoolAPI(tx, testNonce, testNonce, selectedAccount, nil) + } + s.setupTransactionPoolAPI(args, testNonce, testNonce, selectedAccount, nil) - s.NoError(s.manager.QueueTransaction(tx)) w := make(chan struct{}) var ( - hash gethcommon.Hash - err error + sendHash gethcommon.Hash + err error ) go func() { - hash, err = s.manager.CompleteTransaction(tx.ID, selectedAccount) - s.NoError(err) + var sendErr error + sendHash, sendErr = s.manager.SendTransaction(context.Background(), args) + s.NoError(sendErr) close(w) }() - rst := s.manager.WaitForTransaction(tx) - // Check that error is assigned to the transaction. - s.NoError(rst.Error) - // Transaction should be already removed from the queue. - s.False(s.manager.TransactionQueue().Has(tx.ID)) + for i := 10; i > 0; i-- { + if s.manager.pendingSignRequests.Count() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + req := s.manager.pendingSignRequests.First() + s.NotNil(req) + approveHash, err := s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) + s.NoError(err) s.NoError(WaitClosed(w, time.Second)) - s.Equal(hash, rst.Hash) + + // Transaction should be already removed from the queue. + s.False(s.manager.pendingSignRequests.Has(req.ID)) + s.Equal(sendHash, approveHash) }) } } -func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { - key, _ := crypto.GenerateKey() - selectedAccount := &account.SelectedExtKey{ - Address: account.FromAddress(TestConfig.Account1.Address), - AccountKey: &keystore.Key{PrivateKey: key}, - } - - tx := Create(context.Background(), SendTxArgs{ - From: account.FromAddress(TestConfig.Account1.Address), - To: account.ToAddress(TestConfig.Account2.Address), - }) - - s.setupTransactionPoolAPI(tx, testNonce, testNonce, selectedAccount, nil) - - err := s.manager.QueueTransaction(tx) - s.NoError(err) - - var ( - wg sync.WaitGroup - mu sync.Mutex - completedTx int - inprogressTx int - txCount = 3 - ) - for i := 0; i < txCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - _, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - mu.Lock() - defer mu.Unlock() - if err == nil { - completedTx++ - } else if err == ErrQueuedTxInProgress { - inprogressTx++ - } else { - s.Fail("tx failed with unexpected error: ", err.Error()) - } - }() - } - - rst := s.manager.WaitForTransaction(tx) - // Check that error is assigned to the transaction. - s.NoError(rst.Error) - // Transaction should be already removed from the queue. - s.False(s.manager.TransactionQueue().Has(tx.ID)) - - // Wait for all CompleteTransaction calls. - wg.Wait() - s.Equal(1, completedTx, "only 1 tx expected to be completed") - s.Equal(txCount-1, inprogressTx, "txs expected to be reported as inprogress") -} - func (s *TxQueueTestSuite) TestAccountMismatch() { selectedAccount := &account.SelectedExtKey{ Address: account.FromAddress(TestConfig.Account2.Address), } - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) + } - s.NoError(s.manager.QueueTransaction(tx)) + go func() { + s.manager.SendTransaction(context.Background(), args) // nolint: errcheck + }() - _, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - s.Equal(err, ErrInvalidCompleteTxSender) + for i := 10; i > 0; i-- { + if s.manager.pendingSignRequests.Count() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + req := s.manager.pendingSignRequests.First() + s.NotNil(req) + _, err := s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) + s.Equal(err, sign.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts // is a recoverable error. - s.True(s.manager.TransactionQueue().Has(tx.ID)) + s.True(s.manager.pendingSignRequests.Has(req.ID)) } func (s *TxQueueTestSuite) TestDiscardTransaction() { - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - - s.NoError(s.manager.QueueTransaction(tx)) + } w := make(chan struct{}) go func() { - s.NoError(s.manager.DiscardTransaction(tx.ID)) + _, err := s.manager.SendTransaction(context.Background(), args) + s.Equal(sign.ErrSignReqDiscarded, err) close(w) }() - rst := s.manager.WaitForTransaction(tx) - s.Equal(ErrQueuedTxDiscarded, rst.Error) - // Transaction should be already removed from the queue. - s.False(s.manager.TransactionQueue().Has(tx.ID)) + for i := 10; i > 0; i-- { + if s.manager.pendingSignRequests.Count() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + req := s.manager.pendingSignRequests.First() + s.NotNil(req) + err := s.manager.pendingSignRequests.Discard(req.ID) + s.NoError(err) s.NoError(WaitClosed(w, time.Second)) } -func (s *TxQueueTestSuite) TestCompletionTimedOut() { - tx := Create(context.Background(), SendTxArgs{ - From: account.FromAddress(TestConfig.Account1.Address), - To: account.ToAddress(TestConfig.Account2.Address), - }) - - s.NoError(s.manager.QueueTransaction(tx)) - rst := s.manager.WaitForTransaction(tx) - s.Equal(ErrQueuedTxTimedOut, rst.Error) -} - // TestLocalNonce verifies that local nonce will be used unless // upstream nonce is updated and higher than a local // in test we will run 3 transaction with nonce zero returned by upstream @@ -310,49 +277,60 @@ func (s *TxQueueTestSuite) TestLocalNonce() { AccountKey: &keystore.Key{PrivateKey: key}, } nonce := hexutil.Uint64(0) + + go func() { + approved := 0 + for { + // 3 in a cycle, then 2 + if approved >= txCount+2 { + return + } + req := s.manager.pendingSignRequests.First() + if req == nil { + time.Sleep(time.Millisecond) + } else { + s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) // nolint: errcheck + } + } + }() + for i := 0; i < txCount; i++ { - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - s.setupTransactionPoolAPI(tx, nonce, hexutil.Uint64(i), selectedAccount, nil) - s.NoError(s.manager.QueueTransaction(tx)) - hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - rst := s.manager.WaitForTransaction(tx) - // simple sanity checks + } + s.setupTransactionPoolAPI(args, nonce, hexutil.Uint64(i), selectedAccount, nil) + + _, err := s.manager.SendTransaction(context.Background(), args) s.NoError(err) - s.NoError(rst.Error) - s.Equal(rst.Hash, hash) - resultNonce, _ := s.manager.localNonce.Load(tx.Args.From) + resultNonce, _ := s.manager.localNonce.Load(args.From) s.Equal(uint64(i)+1, resultNonce.(uint64)) } + nonce = hexutil.Uint64(5) - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - s.setupTransactionPoolAPI(tx, nonce, nonce, selectedAccount, nil) - s.NoError(s.manager.QueueTransaction(tx)) - hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - rst := s.manager.WaitForTransaction(tx) + } + + s.setupTransactionPoolAPI(args, nonce, nonce, selectedAccount, nil) + + _, err := s.manager.SendTransaction(context.Background(), args) s.NoError(err) - s.NoError(rst.Error) - s.Equal(rst.Hash, hash) - resultNonce, _ := s.manager.localNonce.Load(tx.Args.From) + + resultNonce, _ := s.manager.localNonce.Load(args.From) s.Equal(uint64(nonce)+1, resultNonce.(uint64)) testErr := errors.New("test") s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), selectedAccount.Address, gethrpc.PendingBlockNumber).Return(nil, testErr) - tx = Create(context.Background(), SendTxArgs{ + args = SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - s.NoError(s.manager.QueueTransaction(tx)) - _, err = s.manager.CompleteTransaction(tx.ID, selectedAccount) - rst = s.manager.WaitForTransaction(tx) + } + + _, err = s.manager.SendTransaction(context.Background(), args) s.EqualError(testErr, err.Error()) - s.EqualError(testErr, rst.Error.Error()) - resultNonce, _ = s.manager.localNonce.Load(tx.Args.From) + resultNonce, _ = s.manager.localNonce.Load(args.From) s.Equal(uint64(nonce)+1, resultNonce.(uint64)) } @@ -367,13 +345,27 @@ func (s *TxQueueTestSuite) TestContractCreation() { Address: testaddr, AccountKey: &keystore.Key{PrivateKey: key}, } - s.manager.ethTxClient = backend - tx := Create(context.Background(), SendTxArgs{ + s.manager.sender = backend + s.manager.gasCalculator = backend + s.manager.pendingNonceProvider = backend + tx := SendTxArgs{ From: testaddr, Input: hexutil.Bytes(gethcommon.FromHex(contract.ENSBin)), - }) - s.NoError(s.manager.QueueTransaction(tx)) - hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) + } + + go func() { + for i := 1000; i > 0; i-- { + req := s.manager.pendingSignRequests.First() + if req == nil { + time.Sleep(time.Millisecond) + } else { + s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) // nolint: errcheck + break + } + } + }() + + hash, err := s.manager.SendTransaction(context.Background(), tx) s.NoError(err) backend.Commit() receipt, err := backend.TransactionReceipt(context.TODO(), hash) diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go deleted file mode 100644 index e5168506b..000000000 --- a/geth/transactions/txqueue_manager.go +++ /dev/null @@ -1,333 +0,0 @@ -package transactions - -import ( - "context" - "encoding/json" - "errors" - "math/big" - "sync" - "time" - - ethereum "github.com/ethereum/go-ethereum" - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/status-im/status-go/geth/account" - - "github.com/status-im/status-go/geth/rpc" -) - -const ( - // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. - SendTxDefaultErrorCode = SendTransactionDefaultErrorCode - // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). - DefaultTxSendCompletionTimeout = 300 * time.Second - - defaultGas = 90000 - defaultTimeout = time.Minute -) - -var ( - // ErrUnexpectedArgs returned when args are of unexpected length. - ErrUnexpectedArgs = errors.New("unexpected args") -) - -// RPCClientProvider is an interface that provides a way -// to obtain an rpc.Client. -type RPCClientProvider interface { - RPCClient() *rpc.Client -} - -// Manager provides means to manage internal Status Backend (injected into LES) -type Manager struct { - rpcClientProvider RPCClientProvider - txQueue *TxQueue - ethTxClient EthTransactor - notify bool - completionTimeout time.Duration - rpcCallTimeout time.Duration - networkID uint64 - - addrLock *AddrLocker - localNonce sync.Map - log log.Logger -} - -// NewManager returns a new Manager. -func NewManager(rpcClientProvider RPCClientProvider) *Manager { - return &Manager{ - rpcClientProvider: rpcClientProvider, - txQueue: newQueue(), - addrLock: &AddrLocker{}, - notify: true, - completionTimeout: DefaultTxSendCompletionTimeout, - rpcCallTimeout: defaultTimeout, - localNonce: sync.Map{}, - log: log.New("package", "status-go/geth/transactions.Manager"), - } -} - -// DisableNotificactions turns off notifications on enqueue and return of tx. -// It is not thread safe and must be called only before manager is started. -func (m *Manager) DisableNotificactions() { - m.notify = false -} - -// Start starts accepting new transactions into the queue. -func (m *Manager) Start(networkID uint64) { - m.log.Info("start Manager") - m.networkID = networkID - m.ethTxClient = NewEthTxClient(m.rpcClientProvider.RPCClient()) - m.txQueue.Start() -} - -// Stop stops accepting new transactions into the queue. -func (m *Manager) Stop() { - m.log.Info("stop Manager") - m.txQueue.Stop() -} - -// TransactionQueue returns a reference to the queue. -func (m *Manager) TransactionQueue() *TxQueue { - return m.txQueue -} - -// QueueTransaction puts a transaction into the queue. -func (m *Manager) QueueTransaction(tx *QueuedTx) error { - if !tx.Args.Valid() { - return ErrInvalidSendTxArgs - } - to := "" - if tx.Args.To != nil { - to = tx.Args.To.Hex() - } - m.log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - if err := m.txQueue.Enqueue(tx); err != nil { - return err - } - if m.notify { - NotifyOnEnqueue(tx) - } - return nil -} - -func (m *Manager) txDone(tx *QueuedTx, hash gethcommon.Hash, err error) { - if err := m.txQueue.Done(tx.ID, hash, err); err == ErrQueuedTxIDNotFound { - m.log.Warn("transaction is already removed from a queue", "ID", tx.ID) - return - } - if m.notify { - NotifyOnReturn(tx, err) - } -} - -// WaitForTransaction adds a transaction to the queue and blocks -// until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *QueuedTx) Result { - m.log.Info("wait for transaction", "id", tx.ID) - // now wait up until transaction is: - // - completed (via CompleteQueuedTransaction), - // - discarded (via DiscardQueuedTransaction) - // - or times out - for { - select { - case rst := <-tx.Result: - return rst - case <-time.After(m.completionTimeout): - m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut) - } - } -} - -// NotifyErrored sends a notification for the given transaction -func (m *Manager) NotifyErrored(id string, inputError error) error { - tx, err := m.txQueue.Get(id) - if err != nil { - m.log.Warn("error getting a queued transaction", "err", err) - return err - } - - if m.notify { - NotifyOnReturn(tx, inputError) - } - - return nil -} - -// CompleteTransaction instructs backend to complete sending of a given transaction. -func (m *Manager) CompleteTransaction(id string, account *account.SelectedExtKey) (hash gethcommon.Hash, err error) { - m.log.Info("complete transaction", "id", id) - tx, err := m.txQueue.Get(id) - if err != nil { - m.log.Warn("error getting a queued transaction", "err", err) - return hash, err - } - if err := m.txQueue.LockInprogress(id); err != nil { - m.log.Warn("can't process transaction", "err", err) - return hash, err - } - - if err := m.validateAccount(tx, account); err != nil { - m.txDone(tx, hash, err) - return hash, err - } - hash, err = m.completeTransaction(account, tx) - m.log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err) - m.txDone(tx, hash, err) - return hash, err -} - -// make sure that only account which created the tx can complete it -func (m *Manager) validateAccount(tx *QueuedTx, selectedAccount *account.SelectedExtKey) error { - if selectedAccount == nil { - return account.ErrNoAccountSelected - } - - // make sure that only account which created the tx can complete it - if tx.Args.From.Hex() != selectedAccount.Address.Hex() { - m.log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender) - return ErrInvalidCompleteTxSender - } - - return nil -} - -func (m *Manager) completeTransaction(selectedAccount *account.SelectedExtKey, queuedTx *QueuedTx) (hash gethcommon.Hash, err error) { - m.log.Info("complete transaction", "id", queuedTx.ID) - m.addrLock.LockAddr(queuedTx.Args.From) - var localNonce uint64 - if val, ok := m.localNonce.Load(queuedTx.Args.From); ok { - localNonce = val.(uint64) - } - var nonce uint64 - defer func() { - // nonce should be incremented only if tx completed without error - // if upstream node returned nonce higher than ours we will stick to it - if err == nil { - m.localNonce.Store(queuedTx.Args.From, nonce+1) - } - m.addrLock.UnlockAddr(queuedTx.Args.From) - - }() - ctx, cancel := context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - nonce, err = m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) - if err != nil { - return hash, err - } - // if upstream node returned nonce higher than ours we will use it, as it probably means - // that another client was used for sending transactions - if localNonce > nonce { - nonce = localNonce - } - args := queuedTx.Args - if !args.Valid() { - return hash, ErrInvalidSendTxArgs - } - gasPrice := (*big.Int)(args.GasPrice) - if args.GasPrice == nil { - ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) - if err != nil { - return hash, err - } - } - - chainID := big.NewInt(int64(m.networkID)) - value := (*big.Int)(args.Value) - - var gas uint64 - if args.Gas == nil { - ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - gas, err = m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{ - From: args.From, - To: args.To, - GasPrice: gasPrice, - Value: value, - Data: args.GetInput(), - }) - if err != nil { - return hash, err - } - if gas < defaultGas { - m.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) - gas = defaultGas - } - } else { - gas = uint64(*args.Gas) - } - var tx *types.Transaction - if args.To != nil { - m.log.Info("New transaction", - "From", args.From, - "To", *args.To, - "Gas", gas, - "GasPrice", gasPrice, - "Value", value, - ) - tx = types.NewTransaction(nonce, *args.To, value, gas, gasPrice, args.GetInput()) - } else { - // contract creation is rare enough to log an expected address - m.log.Info("New contract", - "From", args.From, - "Gas", gas, - "GasPrice", gasPrice, - "Value", value, - "Contract address", crypto.CreateAddress(args.From, nonce), - ) - tx = types.NewContractCreation(nonce, value, gas, gasPrice, args.GetInput()) - } - signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) - if err != nil { - return hash, err - } - ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { - return hash, err - } - return signedTx.Hash(), nil -} - -// DiscardTransaction discards a given transaction from transaction queue -func (m *Manager) DiscardTransaction(id string) error { - tx, err := m.txQueue.Get(id) - if err != nil { - return err - } - err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded) - if m.notify { - NotifyOnReturn(tx, ErrQueuedTxDiscarded) - } - return err -} - -// SendTransactionRPCHandler is a handler for eth_sendTransaction method. -// It accepts one param which is a slice with a map of transaction params. -func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { - m.log.Debug("SendTransactionRPCHandler called", "ARGS", args) - if len(args) != 1 { - return nil, ErrUnexpectedArgs - } - data, err := json.Marshal(args[0]) - if err != nil { - return nil, err - } - var txArgs SendTxArgs - if err := json.Unmarshal(data, &txArgs); err != nil { - return nil, err - } - tx := Create(ctx, txArgs) - if err := m.QueueTransaction(tx); err != nil { - return nil, err - } - rst := m.WaitForTransaction(tx) - if rst.Error != nil { - return nil, rst.Error - } - return rst.Hash.Hex(), nil -} diff --git a/geth/transactions/types.go b/geth/transactions/types.go index 7cb9ef993..f1f02af4b 100644 --- a/geth/transactions/types.go +++ b/geth/transactions/types.go @@ -3,29 +3,30 @@ package transactions import ( "bytes" "context" + "encoding/json" "errors" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" ) -// errors var ( + // ErrInvalidSendTxArgs is returned when the structure of SendTxArgs is ambigious. ErrInvalidSendTxArgs = errors.New("Transaction arguments are invalid (are both 'input' and 'data' fields used?)") + // ErrUnexpectedArgs returned when args are of unexpected length. + ErrUnexpectedArgs = errors.New("unexpected args") ) -// Result is a JSON returned from transaction complete function (used internally) -type Result struct { - Hash common.Hash - Error error +// PendingNonceProvider provides information about nonces. +type PendingNonceProvider interface { + PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) } -// QueuedTx holds enough information to complete the queued transaction. -type QueuedTx struct { - ID string - Context context.Context - Args SendTxArgs - Result chan Result +// GasCalculator provides methods for estimating and pricing gas. +type GasCalculator interface { + ethereum.GasEstimator + ethereum.GasPricer } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -68,3 +69,20 @@ func (args SendTxArgs) GetInput() hexutil.Bytes { func isNilOrEmpty(bytes hexutil.Bytes) bool { return bytes == nil || len(bytes) == 0 } + +// RPCCalltoSendTxArgs creates SendTxArgs based on RPC parameters +func RPCCalltoSendTxArgs(args ...interface{}) (SendTxArgs, error) { + var txArgs SendTxArgs + if len(args) != 1 { + return txArgs, ErrUnexpectedArgs + } + data, err := json.Marshal(args[0]) + if err != nil { + return txArgs, err + } + if err := json.Unmarshal(data, &txArgs); err != nil { + return txArgs, err + } + + return txArgs, nil +} diff --git a/geth/transactions/utils.go b/geth/transactions/utils.go deleted file mode 100644 index 939bc2cbb..000000000 --- a/geth/transactions/utils.go +++ /dev/null @@ -1,89 +0,0 @@ -package transactions - -import ( - "context" - "errors" - "fmt" - "io" - "os" - "reflect" - "runtime/debug" - - "github.com/pborman/uuid" - "github.com/status-im/status-go/geth/signal" -) - -const ( - // MessageIDKey is a key for message ID - // This ID is required to track from which chat a given send transaction request is coming. - MessageIDKey = contextKey("message_id") -) - -type contextKey string // in order to make sure that our context key does not collide with keys from other packages - -//ErrTxQueueRunFailure - error running transaction queue -var ErrTxQueueRunFailure = errors.New("error running transaction queue") - -// haltOnPanic recovers from panic, logs issue, sends upward notification, and exits -func haltOnPanic() { - if r := recover(); r != nil { - err := fmt.Errorf("%v: %v", ErrTxQueueRunFailure, r) - - // send signal up to native app - signal.Send(signal.Envelope{ - Type: signal.EventNodeCrashed, - Event: signal.NodeCrashEvent{ - Error: err, - }, - }) - - fatalf(err) // os.exit(1) is called internally - } -} - -// messageIDFromContext returns message id from context (if exists) -func messageIDFromContext(ctx context.Context) string { - if ctx == nil { - return "" - } - if messageID, ok := ctx.Value(MessageIDKey).(string); ok { - return messageID - } - - return "" -} - -// fatalf is used to halt the execution. -// When called the function prints stack end exits. -// Failure is logged into both StdErr and StdOut. -func fatalf(reason interface{}, args ...interface{}) { - // decide on output stream - w := io.MultiWriter(os.Stdout, os.Stderr) - outf, _ := os.Stdout.Stat() // nolint: gas - errf, _ := os.Stderr.Stat() // nolint: gas - if outf != nil && errf != nil && os.SameFile(outf, errf) { - w = os.Stderr - } - - // find out whether error or string has been passed as a reason - r := reflect.ValueOf(reason) - if r.Kind() == reflect.String { - fmt.Fprintf(w, "Fatal Failure: %v\n%v\n", reason.(string), args) - } else { - fmt.Fprintf(w, "Fatal Failure: %v\n", reason.(error)) - } - - debug.PrintStack() - - os.Exit(1) -} - -// Create returns a transaction object. -func Create(ctx context.Context, args SendTxArgs) *QueuedTx { - return &QueuedTx{ - ID: uuid.New(), - Context: ctx, - Args: args, - Result: make(chan Result, 1), - } -} diff --git a/lib/library_test_utils.go b/lib/library_test_utils.go index 5f94e044c..0ddc45b44 100644 --- a/lib/library_test_utils.go +++ b/lib/library_test_utils.go @@ -34,6 +34,7 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" "github.com/status-im/status-go/static" . "github.com/status-im/status-go/t/utils" //nolint: golint ) @@ -767,10 +768,8 @@ func testAccountLogout(t *testing.T) bool { } func testCompleteTransaction(t *testing.T) bool { - txQueueManager := statusAPI.TxQueueManager() - txQueue := txQueueManager.TransactionQueue() + signRequests := statusAPI.PendingSignRequests() - txQueue.Reset() EnsureNodeSync(statusAPI.StatusNode().EnsureSync) // log into account from which transactions will be sent @@ -792,7 +791,7 @@ func testCompleteTransaction(t *testing.T) bool { t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) @@ -839,7 +838,7 @@ func testCompleteTransaction(t *testing.T) bool { return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -848,8 +847,7 @@ func testCompleteTransaction(t *testing.T) bool { } func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyclo - txQueue := statusAPI.TxQueueManager().TransactionQueue() - txQueue.Reset() + signRequests := statusAPI.PendingSignRequests() // log into account from which transactions will be sent if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { @@ -870,7 +868,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID) @@ -917,7 +915,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc } results := resultsStruct.Results - if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != transactions.ErrQueuedTxIDNotFound.Error() { + if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != sign.ErrSignReqNotFound.Error() { t.Errorf("cannot complete txs: %v", results) return } @@ -942,7 +940,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc time.Sleep(1 * time.Second) // make sure that tx complete signal propagates for _, txID := range parsedIDs { - if txQueue.Has(string(txID)) { + if signRequests.Has(string(txID)) { t.Errorf("txqueue should not have test tx at this point (it should be completed): %s", txID) return } @@ -972,7 +970,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -981,8 +979,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc } func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo - txQueue := statusAPI.TxQueueManager().TransactionQueue() - txQueue.Reset() + signRequests := statusAPI.PendingSignRequests() // log into account from which transactions will be sent if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { @@ -1003,12 +1000,12 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) - if !txQueue.Has(string(txID)) { + if !signRequests.Has(string(txID)) { t.Errorf("txqueue should still have test tx: %s", txID) return } @@ -1028,13 +1025,12 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo // try completing discarded transaction _, err := statusAPI.CompleteTransaction(string(txID), TestConfig.Account1.Password) - if err != transactions.ErrQueuedTxIDNotFound { + if err != sign.ErrSignReqNotFound { t.Error("expects tx not found, but call to CompleteTransaction succeeded") return } - time.Sleep(1 * time.Second) // make sure that tx complete signal propagates - if txQueue.Has(string(txID)) { + if signRequests.Has(string(txID)) { t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txID) return } @@ -1042,19 +1038,19 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo completeQueuedTransaction <- struct{}{} // so that timeout is aborted } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { + if receivedErrCode != strconv.Itoa(sign.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1069,7 +1065,8 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != transactions.ErrQueuedTxDiscarded { + time.Sleep(1 * time.Second) + if err != sign.ErrSignReqDiscarded { t.Errorf("expected error not thrown: %v", err) return false } @@ -1079,7 +1076,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -1093,8 +1090,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo } func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyclo - txQueue := statusAPI.TxQueueManager().TransactionQueue() - txQueue.Reset() + signRequests := statusAPI.PendingSignRequests() // log into account from which transactions will be sent if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { @@ -1116,12 +1112,12 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) - if !txQueue.Has(string(txID)) { + if !signRequests.Has(string(txID)) { t.Errorf("txqueue should still have test tx: %s", txID) return } @@ -1129,19 +1125,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl txIDs <- txID } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { + if receivedErrCode != strconv.Itoa(sign.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1160,7 +1156,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != transactions.ErrQueuedTxDiscarded { + if err != sign.ErrSignReqDiscarded { t.Errorf("expected error not thrown: %v", err) return } @@ -1191,7 +1187,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl } discardResults := discardResultsStruct.Results - if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != transactions.ErrQueuedTxIDNotFound.Error() { + if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != sign.ErrSignReqNotFound.Error() { t.Errorf("cannot discard txs: %v", discardResults) return } @@ -1213,7 +1209,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("tx id not set in result: expected id is %s", txID) return } - if txResult.Error != transactions.ErrQueuedTxIDNotFound.Error() { + if txResult.Error != sign.ErrSignReqNotFound.Error() { t.Errorf("invalid error for %s", txResult.Hash) return } @@ -1225,7 +1221,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl time.Sleep(1 * time.Second) // make sure that tx complete signal propagates for _, txID := range parsedIDs { - if txQueue.Has(string(txID)) { + if signRequests.Has(string(txID)) { t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txID) return } @@ -1254,7 +1250,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -1430,7 +1426,7 @@ func startTestNode(t *testing.T) <-chan struct{} { return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { } if envelope.Type == signal.EventNodeStarted { t.Log("Node started, but we wait till it be ready") diff --git a/sign/README.md b/sign/README.md new file mode 100644 index 000000000..64009f8ab --- /dev/null +++ b/sign/README.md @@ -0,0 +1,13 @@ +# sign +`sign` package represents the API and signals for sending and receiving +signature request to and from our API user. + +When a method is called that requires an additional signature confirmation from +a user (like, a transaction), it gets it's sign request. + +Client of the API is then nofified of the sign request. + +Client has a chance to approve the sign request (by providing a valid password) +or to discard it. When the request is approved, the locked functinality is +executed. + diff --git a/sign/errors.go b/sign/errors.go new file mode 100644 index 000000000..b49e31462 --- /dev/null +++ b/sign/errors.go @@ -0,0 +1,36 @@ +package sign + +import ( + "errors" + + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/status-im/status-go/geth/account" +) + +// TODO (mandrigin): Change values of these errors when API change is made. +var ( + //ErrSignReqNotFound - error transaction hash not found + ErrSignReqNotFound = errors.New("transaction hash not found") + //ErrSignReqInProgress - error transaction is in progress + ErrSignReqInProgress = errors.New("transaction is in progress") + // TODO (mandrigin): to be moved to `transactions` package + //ErrInvalidCompleteTxSender - error transaction with invalid sender + ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") + //ErrSignReqTimedOut - error transaction sending timed out + ErrSignReqTimedOut = errors.New("transaction sending timed out") + //ErrSignReqDiscarded - error transaction discarded + ErrSignReqDiscarded = errors.New("transaction has been discarded") +) + +// remove from queue on any error (except for transient ones) and propagate +// defined as map[string]bool because errors from ethclient returned wrapped as jsonError +var transientErrs = map[string]bool{ + keystore.ErrDecrypt.Error(): true, // wrong password + ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account + account.ErrNoAccountSelected.Error(): true, // account not selected +} + +func isTransient(err error) bool { + _, transient := transientErrs[err.Error()] + return transient +} diff --git a/geth/transactions/notifications.go b/sign/notifications.go similarity index 55% rename from geth/transactions/notifications.go rename to sign/notifications.go index 4434b3e0f..f6214629e 100644 --- a/geth/transactions/notifications.go +++ b/sign/notifications.go @@ -1,6 +1,8 @@ -package transactions +package sign import ( + "context" + "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/status-im/status-go/geth/signal" ) @@ -25,57 +27,73 @@ const ( SendTransactionDiscardedErrorCode ) +const ( + // MessageIDKey is a key for message ID + // This ID is required to track from which chat a given send transaction request is coming. + MessageIDKey = contextKey("message_id") +) + +type contextKey string // in order to make sure that our context key does not collide with keys from other packages + +// messageIDFromContext returns message id from context (if exists) +func messageIDFromContext(ctx context.Context) string { + if ctx == nil { + return "" + } + if messageID, ok := ctx.Value(MessageIDKey).(string); ok { + return messageID + } + + return "" +} + var txReturnCodes = map[error]int{ - nil: SendTransactionNoErrorCode, - keystore.ErrDecrypt: SendTransactionPasswordErrorCode, - ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, - ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, + nil: SendTransactionNoErrorCode, + keystore.ErrDecrypt: SendTransactionPasswordErrorCode, + ErrSignReqTimedOut: SendTransactionTimeoutErrorCode, + ErrSignReqDiscarded: SendTransactionDiscardedErrorCode, } // SendTransactionEvent is a signal sent on a send transaction request type SendTransactionEvent struct { - ID string `json:"id"` - Args SendTxArgs `json:"args"` - MessageID string `json:"message_id"` + ID string `json:"id"` + Args interface{} `json:"args"` + MessageID string `json:"message_id"` } // NotifyOnEnqueue returns handler that processes incoming tx queue requests -func NotifyOnEnqueue(queuedTx *QueuedTx) { +func NotifyOnEnqueue(request *Request) { signal.Send(signal.Envelope{ Type: EventTransactionQueued, Event: SendTransactionEvent{ - ID: queuedTx.ID, - Args: queuedTx.Args, - MessageID: messageIDFromContext(queuedTx.Context), + ID: request.ID, + Args: request.Meta, + MessageID: messageIDFromContext(request.context), }, }) } // ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned type ReturnSendTransactionEvent struct { - ID string `json:"id"` - Args SendTxArgs `json:"args"` - MessageID string `json:"message_id"` - ErrorMessage string `json:"error_message"` - ErrorCode int `json:"error_code,string"` + ID string `json:"id"` + Args interface{} `json:"args"` + MessageID string `json:"message_id"` + ErrorMessage string `json:"error_message"` + ErrorCode int `json:"error_code,string"` } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *QueuedTx, err error) { +func NotifyOnReturn(request *Request, err error) { // we don't want to notify a user if tx was sent successfully if err == nil { return } - // discard notifications with empty tx - if queuedTx == nil { - return - } signal.Send(signal.Envelope{ Type: EventTransactionFailed, Event: ReturnSendTransactionEvent{ - ID: queuedTx.ID, - Args: queuedTx.Args, - MessageID: messageIDFromContext(queuedTx.Context), + ID: request.ID, + Args: request.Meta, + MessageID: messageIDFromContext(request.context), ErrorMessage: err.Error(), ErrorCode: sendTransactionErrorCode(err), }, @@ -86,5 +104,5 @@ func sendTransactionErrorCode(err error) int { if code, ok := txReturnCodes[err]; ok { return code } - return SendTxDefaultErrorCode + return SendTransactionDefaultErrorCode } diff --git a/sign/pending_requests.go b/sign/pending_requests.go new file mode 100644 index 000000000..969d65a49 --- /dev/null +++ b/sign/pending_requests.go @@ -0,0 +1,172 @@ +package sign + +import ( + "context" + "sync" + "time" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/geth/account" +) + +type verifyFunc func(string) (*account.SelectedExtKey, error) + +// PendingRequests is a capped container that holds pending signing requests. +type PendingRequests struct { + mu sync.RWMutex // to guard transactions map + requests map[string]*Request + + log log.Logger +} + +// NewPendingRequests creates a new requests list +func NewPendingRequests() *PendingRequests { + logger := log.New("package", "status-go/sign.PendingRequests") + + return &PendingRequests{ + requests: make(map[string]*Request), + log: logger, + } +} + +// Add a new signing request. +func (rs *PendingRequests) Add(ctx context.Context, meta Meta, completeFunc completeFunc) (*Request, error) { + rs.mu.Lock() + defer rs.mu.Unlock() + + request := newRequest(ctx, meta, completeFunc) + rs.requests[request.ID] = request + rs.log.Info("signing request is created", "ID", request.ID) + + go NotifyOnEnqueue(request) + + return request, nil +} + +// Get returns a signing request by it's ID. +func (rs *PendingRequests) Get(id string) (*Request, error) { + rs.mu.RLock() + defer rs.mu.RUnlock() + + if request, ok := rs.requests[id]; ok { + return request, nil + } + return nil, ErrSignReqNotFound +} + +// First returns a first signing request (if exists, nil otherwise). +func (rs *PendingRequests) First() *Request { + rs.mu.RLock() + defer rs.mu.RUnlock() + + for _, req := range rs.requests { + return req + } + + return nil +} + +// Approve a signing request by it's ID. Requires a valid password and a verification function. +func (rs *PendingRequests) Approve(id string, password string, verify verifyFunc) (hash gethcommon.Hash, err error) { + rs.log.Info("complete transaction", "id", id) + request, err := rs.tryLock(id) + if err != nil { + rs.log.Warn("can't process transaction", "err", err) + return hash, err + } + + selectedAccount, err := verify(password) + if err != nil { + rs.complete(request, hash, err) + return hash, err + } + + hash, err = request.completeFunc(selectedAccount) + rs.log.Info("finally completed transaction", "id", request.ID, "hash", hash, "err", err) + + rs.complete(request, hash, err) + + return hash, err +} + +// Discard remove a signing request from the list of pending requests. +func (rs *PendingRequests) Discard(id string) error { + request, err := rs.Get(id) + if err != nil { + return err + } + + rs.complete(request, gethcommon.Hash{}, ErrSignReqDiscarded) + return nil +} + +// Wait blocks until a request with a specified ID is completed (approved or discarded) +func (rs *PendingRequests) Wait(id string, timeout time.Duration) Result { + request, err := rs.Get(id) + if err != nil { + return Result{Error: err} + } + for { + select { + case rst := <-request.result: + return rst + case <-time.After(timeout): + rs.complete(request, gethcommon.Hash{}, ErrSignReqTimedOut) + } + } +} + +// Count returns number of currently pending requests +func (rs *PendingRequests) Count() int { + rs.mu.RLock() + defer rs.mu.RUnlock() + return len(rs.requests) +} + +// Has checks whether a pending request with a given identifier exists in the list +func (rs *PendingRequests) Has(id string) bool { + rs.mu.RLock() + defer rs.mu.RUnlock() + _, ok := rs.requests[id] + return ok +} + +// tryLock is used to avoid double-completion of the same request. +// it returns a request instance if it isn't processing yet, returns an error otherwise. +func (rs *PendingRequests) tryLock(id string) (*Request, error) { + rs.mu.Lock() + defer rs.mu.Unlock() + if tx, ok := rs.requests[id]; ok { + if tx.locked { + return nil, ErrSignReqInProgress + } + tx.locked = true + return tx, nil + } + return nil, ErrSignReqNotFound +} + +// complete removes the request from the list if there is no error or an error is non-transient +func (rs *PendingRequests) complete(request *Request, hash gethcommon.Hash, err error) { + rs.mu.Lock() + defer rs.mu.Unlock() + + request.locked = false + + go NotifyOnReturn(request, err) + + if err != nil && isTransient(err) { + return + } + + delete(rs.requests, request.ID) + + // hash is updated only if err is nil, but transaction is not removed from a queue + result := Result{Error: err} + if err == nil { + result.Hash = hash + } + + request.result <- result +} diff --git a/sign/pending_requests_test.go b/sign/pending_requests_test.go new file mode 100644 index 000000000..40492e352 --- /dev/null +++ b/sign/pending_requests_test.go @@ -0,0 +1,216 @@ +package sign + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/keystore" + gethcommon "github.com/ethereum/go-ethereum/common" + + "github.com/status-im/status-go/geth/account" + "github.com/stretchr/testify/suite" +) + +const ( + correctPassword = "password-correct" + wrongPassword = "password-wrong" +) + +func testVerifyFunc(password string) (*account.SelectedExtKey, error) { + if password == correctPassword { + return nil, nil + } + + return nil, keystore.ErrDecrypt +} + +func TestPendingRequestsSuite(t *testing.T) { + suite.Run(t, new(PendingRequestsSuite)) +} + +type PendingRequestsSuite struct { + suite.Suite + pendingRequests *PendingRequests +} + +func (s *PendingRequestsSuite) SetupTest() { + s.pendingRequests = NewPendingRequests() +} + +func (s *PendingRequestsSuite) defaultCompleteFunc() completeFunc { + hash := gethcommon.Hash{1} + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + s.Nil(acc, "account should be `nil`") + return hash, nil + } +} + +func (s *PendingRequestsSuite) delayedCompleteFunc() completeFunc { + hash := gethcommon.Hash{1} + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + time.Sleep(10 * time.Millisecond) + s.Nil(acc, "account should be `nil`") + return hash, nil + } +} + +func (s *PendingRequestsSuite) errorCompleteFunc(err error) completeFunc { + hash := gethcommon.Hash{1} + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + s.Nil(acc, "account should be `nil`") + return hash, err + } +} + +func (s *PendingRequestsSuite) TestGet() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc()) + s.NoError(err) + for i := 2; i > 0; i-- { + actualRequest, err := s.pendingRequests.Get(req.ID) + s.NoError(err) + s.Equal(req, actualRequest) + } +} + +func (s *PendingRequestsSuite) testComplete(password string, hash gethcommon.Hash, completeFunc completeFunc) (string, error) { + req, err := s.pendingRequests.Add(context.Background(), nil, completeFunc) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + hash2, err := s.pendingRequests.Approve(req.ID, password, testVerifyFunc) + s.Equal(hash, hash2, "hashes should match") + + return req.ID, err +} + +func (s *PendingRequestsSuite) TestCompleteSuccess() { + id, err := s.testComplete(correctPassword, gethcommon.Hash{1}, s.defaultCompleteFunc()) + s.NoError(err, "no errors should be there") + + s.False(s.pendingRequests.Has(id), "sign request should not exist") +} + +func (s *PendingRequestsSuite) TestCompleteTransientError() { + hash := gethcommon.Hash{} + id, err := s.testComplete(wrongPassword, hash, s.errorCompleteFunc(keystore.ErrDecrypt)) + s.Equal(keystore.ErrDecrypt, err, "error value should be preserved") + + s.True(s.pendingRequests.Has(id)) + // verify that you are able to re-approve it after a transient error + _, err = s.pendingRequests.tryLock(id) + s.NoError(err) +} + +func (s *PendingRequestsSuite) TestCompleteError() { + hash := gethcommon.Hash{1} + expectedError := errors.New("test") + + id, err := s.testComplete(correctPassword, hash, s.errorCompleteFunc(expectedError)) + + s.Equal(expectedError, err, "error value should be preserved") + + s.False(s.pendingRequests.Has(id)) +} + +func (s PendingRequestsSuite) TestMultipleComplete() { + id, err := s.testComplete(correctPassword, gethcommon.Hash{1}, s.defaultCompleteFunc()) + s.NoError(err, "no errors should be there") + + _, err = s.pendingRequests.Approve(id, correctPassword, testVerifyFunc) + + s.Equal(ErrSignReqNotFound, err) +} + +func (s PendingRequestsSuite) TestConcurrentComplete() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.delayedCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + approved := 0 + tried := 0 + + for i := 10; i > 0; i-- { + go func() { + _, err = s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + if err == nil { + approved++ + } + tried++ + }() + } + + s.pendingRequests.Wait(req.ID, 10*time.Second) + + s.False(s.pendingRequests.Has(req.ID), "sign request should exist") + + s.Equal(approved, 1, "request should be approved only once") + s.Equal(tried, 10, "request should be tried to approve 10 times") +} + +func (s PendingRequestsSuite) TestWaitSuccess() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + go func() { + _, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + s.NoError(err) + }() + + result := s.pendingRequests.Wait(req.ID, 1*time.Second) + s.NoError(result.Error) +} + +func (s PendingRequestsSuite) TestDiscard() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + s.Equal(ErrSignReqNotFound, s.pendingRequests.Discard("")) + + go func() { + // enough to make it be called after Wait + time.Sleep(time.Millisecond) + s.NoError(s.pendingRequests.Discard(req.ID)) + }() + + result := s.pendingRequests.Wait(req.ID, 1*time.Second) + s.Equal(ErrSignReqDiscarded, result.Error) +} + +func (s PendingRequestsSuite) TestWaitFail() { + expectedError := errors.New("test-wait-fail") + req, err := s.pendingRequests.Add(context.Background(), nil, s.errorCompleteFunc(expectedError)) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + go func() { + _, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + s.Equal(expectedError, err) + }() + + result := s.pendingRequests.Wait(req.ID, 1*time.Second) + s.Equal(expectedError, result.Error) +} + +func (s PendingRequestsSuite) TestWaitTimeout() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.delayedCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + go func() { + _, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + s.NoError(err) + }() + + result := s.pendingRequests.Wait(req.ID, 0*time.Second) + s.Equal(result.Error, ErrSignReqTimedOut) +} diff --git a/sign/request.go b/sign/request.go new file mode 100644 index 000000000..e3cfcf8a1 --- /dev/null +++ b/sign/request.go @@ -0,0 +1,36 @@ +package sign + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/pborman/uuid" + "github.com/status-im/status-go/geth/account" +) + +type completeFunc func(*account.SelectedExtKey) (common.Hash, error) + +// Meta represents any metadata that could be attached to a signing request. +// It will be JSON-serialized and used in notifications to the API consumer. +type Meta interface{} + +// Request is a single signing request. +type Request struct { + ID string + Meta Meta + context context.Context + locked bool + completeFunc completeFunc + result chan Result +} + +func newRequest(ctx context.Context, meta Meta, completeFunc completeFunc) *Request { + return &Request{ + ID: uuid.New(), + Meta: meta, + context: ctx, + locked: false, + completeFunc: completeFunc, + result: make(chan Result, 1), + } +} diff --git a/sign/result.go b/sign/result.go new file mode 100644 index 000000000..eaa09db28 --- /dev/null +++ b/sign/result.go @@ -0,0 +1,9 @@ +package sign + +import "github.com/ethereum/go-ethereum/common" + +// Result is a result of a signing request, error or successful +type Result struct { + Hash common.Hash + Error error +} diff --git a/t/e2e/jail/jail_rpc_test.go b/t/e2e/jail/jail_rpc_test.go index e513be281..37b1e333a 100644 --- a/t/e2e/jail/jail_rpc_test.go +++ b/t/e2e/jail/jail_rpc_test.go @@ -13,7 +13,7 @@ import ( "github.com/status-im/status-go/geth/jail" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" e2e "github.com/status-im/status-go/t/e2e" . "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() { unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"]) @@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() { s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) diff --git a/t/e2e/suites.go b/t/e2e/suites.go index 376ea5395..e8a8883ed 100644 --- a/t/e2e/suites.go +++ b/t/e2e/suites.go @@ -5,6 +5,7 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" "github.com/status-im/status-go/geth/api" + "github.com/status-im/status-go/sign" "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/signal" @@ -123,9 +124,14 @@ func (s *BackendTestSuite) WhisperService() *whisper.Whisper { return whisperService } -// TxQueueManager returns a reference to the TxQueueManager. -func (s *BackendTestSuite) TxQueueManager() *transactions.Manager { - return s.Backend.TxQueueManager() +// Transactor returns a reference to the Transactor. +func (s *BackendTestSuite) Transactor() *transactions.Transactor { + return s.Backend.Transactor() +} + +// PendingSignRequests returns a reference to PendingSignRequests. +func (s *BackendTestSuite) PendingSignRequests() *sign.PendingRequests { + return s.Backend.PendingSignRequests() } func importTestAccounts(keyStoreDir string) (err error) { diff --git a/t/e2e/transactions/transactions_test.go b/t/e2e/transactions/transactions_test.go index 8ee495c5d..eeae4f84a 100644 --- a/t/e2e/transactions/transactions_test.go +++ b/t/e2e/transactions/transactions_test.go @@ -6,7 +6,6 @@ import ( "fmt" "math/big" "reflect" - "sync" "testing" "time" @@ -18,6 +17,7 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" e2e "github.com/status-im/status-go/t/e2e" . "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -50,12 +50,11 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() { err := json.Unmarshal([]byte(rawSignal), &sg) s.NoError(err) - if sg.Type == transactions.EventTransactionQueued { + if sg.Type == sign.EventTransactionQueued { event := sg.Event.(map[string]interface{}) txID := event["id"].(string) txHash, err = s.Backend.CompleteTransaction(string(txID), TestConfig.Account1.Password) s.NoError(err, "cannot complete queued transaction %s", txID) - close(transactionCompleted) } }) @@ -102,7 +101,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() { err := json.Unmarshal([]byte(rawSignal), &signalEnvelope) s.NoError(err) - if signalEnvelope.Type == transactions.EventTransactionQueued { + if signalEnvelope.Type == sign.EventTransactionQueued { event := signalEnvelope.Event.(map[string]interface{}) txID := event["id"].(string) @@ -155,11 +154,12 @@ func (s *TransactionsTestSuite) TestEmptyToFieldPreserved() { } err := json.Unmarshal([]byte(rawSignal), &sg) s.NoError(err) - if sg.Type == transactions.EventTransactionQueued { - var event transactions.SendTransactionEvent + if sg.Type == sign.EventTransactionQueued { + var event sign.SendTransactionEvent s.NoError(json.Unmarshal(sg.Event, &event)) - s.NotNil(event.Args.From) - s.Nil(event.Args.To) + args := event.Args.(map[string]interface{}) + s.NotNil(args["from"]) + s.Nil(args["to"]) _, err := s.Backend.CompleteTransaction(event.ID, TestConfig.Account1.Password) s.NoError(err) close(transactionCompleted) @@ -203,6 +203,8 @@ func (s *TransactionsTestSuite) TestSendContractTxCollision() { } s.testSendContractTx(initFunc, nil, "") + s.NoError(s.Backend.AccountManager().Logout()) + // Scenario 2: Both fields are filled with different values, expect an error inverted := func(source []byte) []byte { inverse := make([]byte, len(source)) @@ -245,7 +247,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc ) s.EqualError( err, - transactions.ErrInvalidCompleteTxSender.Error(), + sign.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -282,7 +284,11 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc string(event["id"].(string)), TestConfig.Account1.Password, ) - s.NoError(err, fmt.Sprintf("cannot complete queued transaction[%v]", event["id"])) + if expectedError != nil { + s.Equal(expectedError, err) + } else { + s.NoError(err, fmt.Sprintf("cannot complete queued transaction[%v]", event["id"])) + } log.Info("contract transaction complete", "URL", "https://ropsten.etherscan.io/tx/"+txHash.Hex()) close(completeQueuedTransaction) @@ -319,7 +325,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid") s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed") - s.Zero(s.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestSendEther() { @@ -341,7 +347,7 @@ func (s *TransactionsTestSuite) TestSendEther() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -365,7 +371,7 @@ func (s *TransactionsTestSuite) TestSendEther() { string(event["id"].(string)), TestConfig.Account1.Password) s.EqualError( err, - transactions.ErrInvalidCompleteTxSender.Error(), + sign.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -399,7 +405,7 @@ func (s *TransactionsTestSuite) TestSendEther() { s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid") s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { @@ -424,7 +430,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -456,7 +462,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { } s.Equal(txHash.Hex(), txHashCheck.Hex(), "transaction hash returned from SendTransaction is invalid") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { @@ -478,7 +484,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be failed and completed on the second call)", "id", txID) @@ -488,7 +494,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { _, err = s.Backend.CompleteTransaction(txID, TestConfig.Account1.Password+"wrong") s.EqualError(err, keystore.ErrDecrypt.Error()) - s.Equal(1, s.TxQueueManager().TransactionQueue().Count(), "txqueue cannot be empty, as tx has failed") + s.Equal(1, s.PendingSignRequests().Count(), "txqueue cannot be empty, as tx has failed") // now try to complete transaction, but with the correct password txHash, err = s.Backend.CompleteTransaction(txID, TestConfig.Account1.Password) @@ -498,7 +504,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { close(completeQueuedTransaction) } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) @@ -529,7 +535,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid") s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") s.True(txFailedEventCalled, "expected tx failure signal is not received") } @@ -539,9 +545,6 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { EnsureNodeSync(s.Backend.StatusNode().EnsureSync) - // reset queue - s.Backend.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) @@ -554,12 +557,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) - s.True(s.Backend.TxQueueManager().TransactionQueue().Has(txID), "txqueue should still have test tx") + s.True(s.Backend.PendingSignRequests().Has(txID), "txqueue should still have test tx") // discard err := s.Backend.DiscardTransaction(txID) @@ -570,18 +573,18 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { s.EqualError(err, "transaction hash not found", "expects tx not found, but call to CompleteTransaction succeeded") time.Sleep(1 * time.Second) // make sure that tx complete signal propagates - s.False(s.Backend.TxQueueManager().TransactionQueue().Has(txID), + s.False(s.Backend.PendingSignRequests().Has(txID), fmt.Sprintf("txqueue should not have test tx at this point (it should be discarded): %s", txID)) close(completeQueuedTransaction) } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -597,16 +600,16 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") + s.EqualError(err, sign.ErrSignReqDiscarded.Error(), "transaction is expected to be discarded") select { case <-completeQueuedTransaction: - case <-time.After(time.Minute): + case <-time.After(10 * time.Second): s.FailNow("test timed out") } s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") s.True(txFailedEventCalled, "expected tx failure signal is not received") } @@ -614,8 +617,6 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() { s.setupLocalNode() defer s.StopTestBackend() - s.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password) s.NoError(err) @@ -629,9 +630,6 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { EnsureNodeSync(s.Backend.StatusNode().EnsureSync) - // reset queue - s.Backend.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) @@ -645,22 +643,22 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { var envelope signal.Envelope err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) - s.True(s.Backend.TxQueueManager().TransactionQueue().Has(txID), + s.True(s.Backend.PendingSignRequests().Has(txID), "txqueue should still have test tx") txIDs <- txID } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -682,11 +680,11 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error()) + require.EqualError(err, sign.ErrSignReqDiscarded.Error()) require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't") } - txQueueManager := s.Backend.TxQueueManager() + signRequests := s.Backend.PendingSignRequests() // wait for transactions, and discard immediately discardTxs := func(txIDs []string) { @@ -710,7 +708,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { for _, txID := range txIDs { require.False( - txQueueManager.TransactionQueue().Has(txID), + signRequests.Has(txID), "txqueue should not have test tx at this point (it should be discarded): %s", txID, ) @@ -735,8 +733,9 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { case <-time.After(1 * time.Minute): s.FailNow("test timed out") } + time.Sleep(5 * time.Second) - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() { @@ -752,68 +751,13 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() { // try completing non-existing transaction _, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password) s.Error(err, "error expected and not received") - s.EqualError(err, transactions.ErrQueuedTxIDNotFound.Error()) -} - -func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { - s.StartTestBackend() - defer s.StopTestBackend() - - var m sync.Mutex - txCount := 0 - txIDs := [transactions.DefaultTxQueueCap + 5 + 10]string{} - - signal.SetDefaultNodeNotificationHandler(func(rawSignal string) { - var sg signal.Envelope - err := json.Unmarshal([]byte(rawSignal), &sg) - s.NoError(err) - - if sg.Type == transactions.EventTransactionQueued { - event := sg.Event.(map[string]interface{}) - txID := event["id"].(string) - m.Lock() - txIDs[txCount] = string(txID) - txCount++ - m.Unlock() - } - }) - - // reset queue - s.Backend.TxQueueManager().TransactionQueue().Reset() - - // log into account from which transactions will be sent - s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) - - txQueue := s.Backend.TxQueueManager().TransactionQueue() - s.Zero(txQueue.Count(), "transaction count should be zero") - - for j := 0; j < 10; j++ { - go s.Backend.SendTransaction(context.TODO(), transactions.SendTxArgs{}) // nolint: errcheck - } - time.Sleep(2 * time.Second) - s.Equal(10, txQueue.Count(), "transaction count should be 10") - - for i := 0; i < transactions.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines - go s.Backend.SendTransaction(context.TODO(), transactions.SendTxArgs{}) // nolint: errcheck - } - time.Sleep(5 * time.Second) - - s.True(txQueue.Count() <= transactions.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", transactions.DefaultTxQueueCap, transactions.DefaultTxQueueCap-1, txQueue.Count()) - - m.Lock() - for _, txID := range txIDs { - txQueue.Remove(txID) - } - m.Unlock() - s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count()) + s.EqualError(err, sign.ErrSignReqNotFound.Error()) } func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactionsUpstream() { s.setupUpstreamNode() defer s.StopTestBackend() - s.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password) s.NoError(err) @@ -849,7 +793,7 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) { err := json.Unmarshal([]byte(jsonEvent), &envelope) require.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID) @@ -892,7 +836,7 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) { for _, txID := range txIDs { s.False( - s.Backend.TxQueueManager().TransactionQueue().Has(txID), + s.Backend.PendingSignRequests().Has(txID), "txqueue should not have test tx at this point (it should be completed)", ) } @@ -918,5 +862,5 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) { s.FailNow("test timed out") } - s.Zero(s.TxQueueManager().TransactionQueue().Count(), "queue should be empty") + s.Zero(s.PendingSignRequests().Count(), "queue should be empty") }