From a9eb5a7d2b4fe3457c843102fdefd1269cb2926f Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Mon, 9 Apr 2018 10:18:22 +0200 Subject: [PATCH] Generalize signing requests. We need to be able to sign more than just transactions to make DApps work properly. This change separates signing requests from the transactions and make it more general to prepare to intoduce different types of signing requests. This change is designed to preserve status APIs, so it is backward-comparible with the current API bindings. --- geth/api/api.go | 14 +- geth/api/backend.go | 102 +++--- geth/params/defaults.go | 4 + geth/transactions/errors.go | 10 - geth/transactions/queue.go | 234 ------------ geth/transactions/queue_test.go | 131 ------- .../{ethtxclient.go => rpc_wrapper.go} | 36 +- geth/transactions/rpcclient_mock.go | 47 --- geth/transactions/transactor.go | 205 +++++++++++ ...eue_manager_test.go => transactor_test.go} | 290 ++++++++------- geth/transactions/txqueue_manager.go | 333 ------------------ geth/transactions/types.go | 40 ++- geth/transactions/utils.go | 89 ----- lib/library_test_utils.go | 68 ++-- sign/README.md | 13 + sign/errors.go | 36 ++ {geth/transactions => sign}/notifications.go | 70 ++-- sign/pending_requests.go | 172 +++++++++ sign/pending_requests_test.go | 216 ++++++++++++ sign/request.go | 36 ++ sign/result.go | 9 + t/e2e/jail/jail_rpc_test.go | 6 +- t/e2e/suites.go | 12 +- t/e2e/transactions/transactions_test.go | 152 +++----- 24 files changed, 1076 insertions(+), 1249 deletions(-) delete mode 100644 geth/transactions/errors.go delete mode 100644 geth/transactions/queue.go delete mode 100644 geth/transactions/queue_test.go rename geth/transactions/{ethtxclient.go => rpc_wrapper.go} (61%) delete mode 100644 geth/transactions/rpcclient_mock.go create mode 100644 geth/transactions/transactor.go rename geth/transactions/{txqueue_manager_test.go => transactor_test.go} (56%) delete mode 100644 geth/transactions/txqueue_manager.go delete mode 100644 geth/transactions/utils.go create mode 100644 sign/README.md create mode 100644 sign/errors.go rename {geth/transactions => sign}/notifications.go (55%) create mode 100644 sign/pending_requests.go create mode 100644 sign/pending_requests_test.go create mode 100644 sign/request.go create mode 100644 sign/result.go diff --git a/geth/api/api.go b/geth/api/api.go index 625dbb757..33693ef98 100644 --- a/geth/api/api.go +++ b/geth/api/api.go @@ -12,6 +12,7 @@ import ( "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" ) // StatusAPI provides API to access Status related functionality. @@ -49,9 +50,14 @@ func (api *StatusAPI) JailManager() jail.Manager { return api.b.JailManager() } -// TxQueueManager returns reference to account manager -func (api *StatusAPI) TxQueueManager() *transactions.Manager { - return api.b.TxQueueManager() +// Transactor returns reference to a status transactor +func (api *StatusAPI) Transactor() *transactions.Transactor { + return api.b.Transactor() +} + +// PendingSignRequests returns reference to a list of current sign requests +func (api *StatusAPI) PendingSignRequests() *sign.PendingRequests { + return api.b.PendingSignRequests() } // StartNode start Status node, fails if node is already started @@ -156,7 +162,7 @@ func (api *StatusAPI) CompleteTransaction(id string, password string) (gethcommo } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (api *StatusAPI) CompleteTransactions(ids []string, password string) map[string]transactions.Result { +func (api *StatusAPI) CompleteTransactions(ids []string, password string) map[string]sign.Result { return api.b.CompleteTransactions(ids, password) } diff --git a/geth/api/backend.go b/geth/api/backend.go index 78d43317c..aafbecedb 100644 --- a/geth/api/backend.go +++ b/geth/api/backend.go @@ -16,6 +16,7 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" ) const ( @@ -32,14 +33,15 @@ var ( // StatusBackend implements Status.im service type StatusBackend struct { - mu sync.Mutex - statusNode *node.StatusNode - accountManager *account.Manager - txQueueManager *transactions.Manager - jailManager jail.Manager - newNotification fcm.NotificationConstructor - connectionState ConnectionState - log log.Logger + mu sync.Mutex + statusNode *node.StatusNode + pendingSignRequests *sign.PendingRequests + accountManager *account.Manager + transactor *transactions.Transactor + jailManager jail.Manager + newNotification fcm.NotificationConstructor + connectionState ConnectionState + log log.Logger } // NewStatusBackend create a new NewStatusBackend instance @@ -47,18 +49,20 @@ func NewStatusBackend() *StatusBackend { defer log.Info("Status backend initialized") statusNode := node.New() + pendingSignRequests := sign.NewPendingRequests() accountManager := account.NewManager(statusNode) - txQueueManager := transactions.NewManager(statusNode) + transactor := transactions.NewTransactor(pendingSignRequests) jailManager := jail.New(statusNode) notificationManager := fcm.NewNotification(fcmServerKey) return &StatusBackend{ - statusNode: statusNode, - accountManager: accountManager, - jailManager: jailManager, - txQueueManager: txQueueManager, - newNotification: notificationManager, - log: log.New("package", "status-go/geth/api.StatusBackend"), + pendingSignRequests: pendingSignRequests, + statusNode: statusNode, + accountManager: accountManager, + jailManager: jailManager, + transactor: transactor, + newNotification: notificationManager, + log: log.New("package", "status-go/geth/api.StatusBackend"), } } @@ -77,9 +81,14 @@ func (b *StatusBackend) JailManager() jail.Manager { return b.jailManager } -// TxQueueManager returns reference to transactions manager -func (b *StatusBackend) TxQueueManager() *transactions.Manager { - return b.txQueueManager +// Transactor returns reference to a status transactor +func (b *StatusBackend) Transactor() *transactions.Transactor { + return b.transactor +} + +// PendingSignRequests returns reference to a list of current sign requests +func (b *StatusBackend) PendingSignRequests() *sign.PendingRequests { + return b.pendingSignRequests } // IsNodeRunning confirm that node is running @@ -117,9 +126,9 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) { return err } signal.Send(signal.Envelope{Type: signal.EventNodeStarted}) - // tx queue manager should be started after node is started, it depends - // on rpc client being created - b.txQueueManager.Start(config.NetworkID) + + b.transactor.SetNetworkID(config.NetworkID) + b.transactor.SetRPCClient(b.statusNode.RPCClient()) if err := b.registerHandlers(); err != nil { b.log.Error("Handler registration failed", "err", err) } @@ -142,7 +151,6 @@ func (b *StatusBackend) stopNode() error { if !b.IsNodeRunning() { return node.ErrNoRunningNode } - b.txQueueManager.Stop() b.jailManager.Stop() defer signal.Send(signal.Envelope{Type: signal.EventNodeStopped}) return b.statusNode.Stop() @@ -193,18 +201,7 @@ func (b *StatusBackend) CallRPC(inputJSON string) string { // SendTransaction creates a new transaction and waits until it's complete. func (b *StatusBackend) SendTransaction(ctx context.Context, args transactions.SendTxArgs) (hash gethcommon.Hash, err error) { - if ctx == nil { - ctx = context.Background() - } - tx := transactions.Create(ctx, args) - if err = b.txQueueManager.QueueTransaction(tx); err != nil { - return hash, err - } - rst := b.txQueueManager.WaitForTransaction(tx) - if rst.Error != nil { - return hash, rst.Error - } - return rst.Hash, nil + return b.transactor.SendTransaction(ctx, args) } func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedExtKey, error) { @@ -227,21 +224,15 @@ func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedEx // CompleteTransaction instructs backend to complete sending of a given transaction func (b *StatusBackend) CompleteTransaction(id string, password string) (hash gethcommon.Hash, err error) { - selectedAccount, err := b.getVerifiedAccount(password) - if err != nil { - _ = b.txQueueManager.NotifyErrored(id, err) - return hash, err - } - - return b.txQueueManager.CompleteTransaction(id, selectedAccount) + return b.pendingSignRequests.Approve(id, password, b.getVerifiedAccount) } // CompleteTransactions instructs backend to complete sending of multiple transactions -func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[string]transactions.Result { - results := make(map[string]transactions.Result) +func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[string]sign.Result { + results := make(map[string]sign.Result) for _, txID := range ids { txHash, txErr := b.CompleteTransaction(txID, password) - results[txID] = transactions.Result{ + results[txID] = sign.Result{ Hash: txHash, Error: txErr, } @@ -251,7 +242,7 @@ func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[ // DiscardTransaction discards a given transaction from transaction queue func (b *StatusBackend) DiscardTransaction(id string) error { - return b.txQueueManager.DiscardTransaction(id) + return b.pendingSignRequests.Discard(id) } // DiscardTransactions discards given multiple transactions from transaction queue @@ -273,13 +264,30 @@ func (b *StatusBackend) registerHandlers() error { if rpcClient == nil { return node.ErrRPCClient } - rpcClient.RegisterHandler("eth_accounts", func(context.Context, ...interface{}) (interface{}, error) { + + rpcClient.RegisterHandler(params.AccountsMethodName, func(context.Context, ...interface{}) (interface{}, error) { return b.AccountManager().Accounts() }) - rpcClient.RegisterHandler("eth_sendTransaction", b.txQueueManager.SendTransactionRPCHandler) + + rpcClient.RegisterHandler(params.SendTransactionMethodName, func(ctx context.Context, rpcParams ...interface{}) (interface{}, error) { + txArgs, err := transactions.RPCCalltoSendTxArgs(rpcParams...) + if err != nil { + return nil, err + } + + hash, err := b.SendTransaction(ctx, txArgs) + if err != nil { + return nil, err + } + + return hash.Hex(), err + }) + return nil } +// + // ConnectionChange handles network state changes logic. func (b *StatusBackend) ConnectionChange(state ConnectionState) { b.log.Info("Network state change", "old", b.connectionState, "new", state) diff --git a/geth/params/defaults.go b/geth/params/defaults.go index e138c8e40..656da7862 100644 --- a/geth/params/defaults.go +++ b/geth/params/defaults.go @@ -34,6 +34,10 @@ const ( // SendTransactionMethodName defines the name for a giving transaction. SendTransactionMethodName = "eth_sendTransaction" + // AccountsMethodName defines the name for listing the currently signed accounts. + AccountsMethodName = "eth_accounts" + // PersonalSignMethodName defines the name for `personal.sign` API. + PersonalSignMethodName = "personal_sign" // WSPort is a WS-RPC port (replaced in unit tests) WSPort = 8546 diff --git a/geth/transactions/errors.go b/geth/transactions/errors.go deleted file mode 100644 index 7eb38081e..000000000 --- a/geth/transactions/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package transactions - -import "errors" - -var ( - //ErrQueuedTxTimedOut - error transaction sending timed out - ErrQueuedTxTimedOut = errors.New("transaction sending timed out") - //ErrQueuedTxDiscarded - error transaction discarded - ErrQueuedTxDiscarded = errors.New("transaction has been discarded") -) diff --git a/geth/transactions/queue.go b/geth/transactions/queue.go deleted file mode 100644 index b8f24a410..000000000 --- a/geth/transactions/queue.go +++ /dev/null @@ -1,234 +0,0 @@ -package transactions - -import ( - "errors" - "sync" - "time" - - "github.com/ethereum/go-ethereum/accounts/keystore" - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/status-im/status-go/geth/account" -) - -const ( - // DefaultTxQueueCap defines how many items can be queued. - DefaultTxQueueCap = int(35) -) - -var ( - // ErrQueuedTxExist - transaction was already enqueued - ErrQueuedTxExist = errors.New("transaction already exist in queue") - //ErrQueuedTxIDNotFound - error transaction hash not found - ErrQueuedTxIDNotFound = errors.New("transaction hash not found") - //ErrQueuedTxInProgress - error transaction is in progress - ErrQueuedTxInProgress = errors.New("transaction is in progress") - //ErrInvalidCompleteTxSender - error transaction with invalid sender - ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") -) - -// remove from queue on any error (except for transient ones) and propagate -// defined as map[string]bool because errors from ethclient returned wrapped as jsonError -var transientErrs = map[string]bool{ - keystore.ErrDecrypt.Error(): true, // wrong password - ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account - account.ErrNoAccountSelected.Error(): true, // account not selected -} - -type empty struct{} - -// TxQueue is capped container that holds pending transactions -type TxQueue struct { - mu sync.RWMutex // to guard transactions map - transactions map[string]*QueuedTx - inprogress map[string]empty - - // TODO(dshulyak) research why eviction is done in separate goroutine - evictableIDs chan string - enqueueTicker chan struct{} - - // when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc) - stopped chan struct{} - stoppedGroup sync.WaitGroup // to make sure that all routines are stopped - log log.Logger -} - -// newQueue creates a transaction queue. -func newQueue() *TxQueue { - - logger := log.New("package", "status-go/geth/transactions.TxQueue") - - logger.Info("initializing transaction queue") - return &TxQueue{ - transactions: make(map[string]*QueuedTx), - inprogress: make(map[string]empty), - evictableIDs: make(chan string, DefaultTxQueueCap), // will be used to evict in FIFO - enqueueTicker: make(chan struct{}), - log: logger, - } -} - -// Start starts enqueue and eviction loops -func (q *TxQueue) Start() { - q.log.Info("starting transaction queue") - - if q.stopped != nil { - return - } - - q.stopped = make(chan struct{}) - q.stoppedGroup.Add(1) - go q.evictionLoop() -} - -// Stop stops transaction enqueue and eviction loops -func (q *TxQueue) Stop() { - q.log.Info("stopping transaction queue") - - if q.stopped == nil { - return - } - - close(q.stopped) // stops all processing loops (enqueue, eviction etc) - q.stoppedGroup.Wait() - q.stopped = nil - - q.log.Info("finally stopped transaction queue") -} - -// evictionLoop frees up queue to accommodate another transaction item -func (q *TxQueue) evictionLoop() { - defer haltOnPanic() - evict := func() { - if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item - q.Remove(<-q.evictableIDs) - } - } - - for { - select { - case <-time.After(250 * time.Millisecond): // do not wait for manual ticks, check queue regularly - evict() - case <-q.enqueueTicker: // when manually requested - evict() - case <-q.stopped: - q.log.Info("transaction queue's eviction loop stopped") - q.stoppedGroup.Done() - return - } - } -} - -// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one -func (q *TxQueue) Reset() { - q.mu.Lock() - defer q.mu.Unlock() - - q.transactions = make(map[string]*QueuedTx) - q.evictableIDs = make(chan string, DefaultTxQueueCap) - q.inprogress = make(map[string]empty) -} - -// Enqueue enqueues incoming transaction -func (q *TxQueue) Enqueue(tx *QueuedTx) error { - q.log.Info("enqueue transaction", "ID", tx.ID) - q.mu.RLock() - if _, ok := q.transactions[tx.ID]; ok { - q.mu.RUnlock() - return ErrQueuedTxExist - } - q.mu.RUnlock() - - // we can't hold a lock in this part - q.log.Debug("notifying eviction loop") - q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item - q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap - q.log.Debug("notified eviction loop") - - q.mu.Lock() - q.transactions[tx.ID] = tx - q.mu.Unlock() - - // notify handler - q.log.Info("calling txEnqueueHandler") - return nil -} - -// Get returns transaction by transaction identifier -func (q *TxQueue) Get(id string) (*QueuedTx, error) { - q.mu.RLock() - defer q.mu.RUnlock() - - if tx, ok := q.transactions[id]; ok { - return tx, nil - } - return nil, ErrQueuedTxIDNotFound -} - -// LockInprogress returns error if transaction is already inprogress. -func (q *TxQueue) LockInprogress(id string) error { - q.mu.Lock() - defer q.mu.Unlock() - if _, ok := q.transactions[id]; ok { - if _, inprogress := q.inprogress[id]; inprogress { - return ErrQueuedTxInProgress - } - q.inprogress[id] = empty{} - return nil - } - return ErrQueuedTxIDNotFound -} - -// Remove removes transaction by transaction identifier -func (q *TxQueue) Remove(id string) { - q.mu.Lock() - defer q.mu.Unlock() - q.remove(id) -} - -func (q *TxQueue) remove(id string) { - delete(q.transactions, id) - delete(q.inprogress, id) -} - -// Done removes transaction from queue if no error or error is not transient -// and notify subscribers -func (q *TxQueue) Done(id string, hash gethcommon.Hash, err error) error { - q.mu.Lock() - defer q.mu.Unlock() - tx, ok := q.transactions[id] - if !ok { - return ErrQueuedTxIDNotFound - } - q.done(tx, hash, err) - return nil -} - -func (q *TxQueue) done(tx *QueuedTx, hash gethcommon.Hash, err error) { - delete(q.inprogress, tx.ID) - // hash is updated only if err is nil, but transaction is not removed from a queue - if err == nil { - q.transactions[tx.ID].Result <- Result{Hash: hash, Error: err} - q.remove(tx.ID) - return - } - if _, transient := transientErrs[err.Error()]; !transient { - q.transactions[tx.ID].Result <- Result{Error: err} - q.remove(tx.ID) - } -} - -// Count returns number of currently queued transactions -func (q *TxQueue) Count() int { - q.mu.RLock() - defer q.mu.RUnlock() - return len(q.transactions) -} - -// Has checks whether transaction with a given identifier exists in queue -func (q *TxQueue) Has(id string) bool { - q.mu.RLock() - defer q.mu.RUnlock() - _, ok := q.transactions[id] - return ok -} diff --git a/geth/transactions/queue_test.go b/geth/transactions/queue_test.go deleted file mode 100644 index e2e847a6e..000000000 --- a/geth/transactions/queue_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package transactions - -import ( - "context" - "errors" - "testing" - - "github.com/ethereum/go-ethereum/accounts/keystore" - gethcommon "github.com/ethereum/go-ethereum/common" - - "github.com/stretchr/testify/suite" -) - -func TestQueueTestSuite(t *testing.T) { - suite.Run(t, new(QueueTestSuite)) -} - -type QueueTestSuite struct { - suite.Suite - queue *TxQueue -} - -func (s *QueueTestSuite) SetupTest() { - s.queue = newQueue() - s.queue.Start() -} - -func (s *QueueTestSuite) TearDownTest() { - s.queue.Stop() -} - -func (s *QueueTestSuite) TestLockInprogressTransaction() { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - enquedTx, err := s.queue.Get(tx.ID) - s.NoError(err) - s.NoError(s.queue.LockInprogress(tx.ID)) - s.Equal(tx, enquedTx) - - // verify that tx was marked as being inprogress - s.Equal(ErrQueuedTxInProgress, s.queue.LockInprogress(tx.ID)) -} - -func (s *QueueTestSuite) TestGetTransaction() { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - for i := 2; i > 0; i-- { - enquedTx, err := s.queue.Get(tx.ID) - s.NoError(err) - s.Equal(tx, enquedTx) - } -} - -func (s *QueueTestSuite) TestAlreadyEnqueued() { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - s.Equal(ErrQueuedTxExist, s.queue.Enqueue(tx)) - // try to enqueue another tx to double check locking - tx = Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) -} - -func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *QueuedTx { - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - s.NoError(s.queue.Done(tx.ID, hash, err)) - return tx -} - -func (s *QueueTestSuite) TestDoneSuccess() { - hash := gethcommon.Hash{1} - tx := s.testDone(hash, nil) - // event is sent only if transaction was removed from a queue - select { - case rst := <-tx.Result: - s.NoError(rst.Error) - s.Equal(hash, rst.Hash) - s.False(s.queue.Has(tx.ID)) - default: - s.Fail("No event was sent to Done channel") - } -} - -func (s *QueueTestSuite) TestDoneTransientError() { - hash := gethcommon.Hash{1} - err := keystore.ErrDecrypt - tx := s.testDone(hash, err) - s.True(s.queue.Has(tx.ID)) - _, inp := s.queue.inprogress[tx.ID] - s.False(inp) -} - -func (s *QueueTestSuite) TestDoneError() { - hash := gethcommon.Hash{1} - err := errors.New("test") - tx := s.testDone(hash, err) - // event is sent only if transaction was removed from a queue - select { - case rst := <-tx.Result: - s.Equal(err, rst.Error) - s.NotEqual(hash, rst.Hash) - s.Equal(gethcommon.Hash{}, rst.Hash) - s.False(s.queue.Has(tx.ID)) - default: - s.Fail("No event was sent to Done channel") - } -} - -func (s QueueTestSuite) TestMultipleDone() { - hash := gethcommon.Hash{1} - err := keystore.ErrDecrypt - tx := s.testDone(hash, err) - s.NoError(s.queue.Done(tx.ID, hash, nil)) - s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout"))) -} - -func (s *QueueTestSuite) TestEviction() { - var first *QueuedTx - for i := 0; i < DefaultTxQueueCap; i++ { - tx := Create(context.Background(), SendTxArgs{}) - if first == nil { - first = tx - } - s.NoError(s.queue.Enqueue(tx)) - } - s.Equal(DefaultTxQueueCap, s.queue.Count()) - tx := Create(context.Background(), SendTxArgs{}) - s.NoError(s.queue.Enqueue(tx)) - s.Equal(DefaultTxQueueCap, s.queue.Count()) - s.False(s.queue.Has(first.ID)) -} diff --git a/geth/transactions/ethtxclient.go b/geth/transactions/rpc_wrapper.go similarity index 61% rename from geth/transactions/ethtxclient.go rename to geth/transactions/rpc_wrapper.go index dff5be2df..eb35c8ab7 100644 --- a/geth/transactions/ethtxclient.go +++ b/geth/transactions/rpc_wrapper.go @@ -9,40 +9,32 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" + "github.com/status-im/status-go/geth/rpc" ) -// EthTransactor provides methods to create transactions for ethereum network. -type EthTransactor interface { - PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) - ethereum.GasEstimator - ethereum.GasPricer - ethereum.TransactionSender +// rpcWrapper wraps provides convenient interface for ethereum RPC APIs we need for sending transactions +type rpcWrapper struct { + rpcClient *rpc.Client } -// EthTxClient wraps common API methods that are used to send transaction. -type EthTxClient struct { - c *rpc.Client -} - -// NewEthTxClient returns a new EthTxClient for client -func NewEthTxClient(client *rpc.Client) *EthTxClient { - return &EthTxClient{c: client} +func newRPCWrapper(client *rpc.Client) *rpcWrapper { + return &rpcWrapper{rpcClient: client} } // PendingNonceAt returns the account nonce of the given account in the pending state. // This is the nonce that should be used for the next transaction. -func (ec *EthTxClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { +func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { var result hexutil.Uint64 - err := ec.c.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending") + err := w.rpcClient.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending") return uint64(result), err } // SuggestGasPrice retrieves the currently suggested gas price to allow a timely // execution of a transaction. -func (ec *EthTxClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { +func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) { var hex hexutil.Big - if err := ec.c.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { + if err := w.rpcClient.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { return nil, err } return (*big.Int)(&hex), nil @@ -52,9 +44,9 @@ func (ec *EthTxClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { // the current pending state of the backend blockchain. There is no guarantee that this is // the true gas limit requirement as other transactions may be added or removed by miners, // but it should provide a basis for setting a reasonable default. -func (ec *EthTxClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { +func (w *rpcWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { var hex hexutil.Uint64 - err := ec.c.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg)) + err := w.rpcClient.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg)) if err != nil { return 0, err } @@ -65,12 +57,12 @@ func (ec *EthTxClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (u // // If the transaction was a contract creation use the TransactionReceipt method to get the // contract address after the transaction has been mined. -func (ec *EthTxClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { +func (w *rpcWrapper) SendTransaction(ctx context.Context, tx *types.Transaction) error { data, err := rlp.EncodeToBytes(tx) if err != nil { return err } - return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data)) + return w.rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data)) } func toCallArg(msg ethereum.CallMsg) interface{} { diff --git a/geth/transactions/rpcclient_mock.go b/geth/transactions/rpcclient_mock.go deleted file mode 100644 index 187668fc6..000000000 --- a/geth/transactions/rpcclient_mock.go +++ /dev/null @@ -1,47 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: geth/transactions/txqueue_manager_test.go - -// Package transactions is a generated GoMock package. -package transactions - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - rpc "github.com/status-im/status-go/geth/rpc" -) - -// MocktestRPCClientProvider is a mock of testRPCClientProvider interface -type MocktestRPCClientProvider struct { - ctrl *gomock.Controller - recorder *MocktestRPCClientProviderMockRecorder -} - -// MocktestRPCClientProviderMockRecorder is the mock recorder for MocktestRPCClientProvider -type MocktestRPCClientProviderMockRecorder struct { - mock *MocktestRPCClientProvider -} - -// NewMocktestRPCClientProvider creates a new mock instance -func NewMocktestRPCClientProvider(ctrl *gomock.Controller) *MocktestRPCClientProvider { - mock := &MocktestRPCClientProvider{ctrl: ctrl} - mock.recorder = &MocktestRPCClientProviderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MocktestRPCClientProvider) EXPECT() *MocktestRPCClientProviderMockRecorder { - return m.recorder -} - -// RPCClient mocks base method -func (m *MocktestRPCClientProvider) RPCClient() *rpc.Client { - ret := m.ctrl.Call(m, "RPCClient") - ret0, _ := ret[0].(*rpc.Client) - return ret0 -} - -// RPCClient indicates an expected call of RPCClient -func (mr *MocktestRPCClientProviderMockRecorder) RPCClient() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RPCClient", reflect.TypeOf((*MocktestRPCClientProvider)(nil).RPCClient)) -} diff --git a/geth/transactions/transactor.go b/geth/transactions/transactor.go new file mode 100644 index 000000000..fb5952f66 --- /dev/null +++ b/geth/transactions/transactor.go @@ -0,0 +1,205 @@ +package transactions + +import ( + "context" + "math/big" + "sync" + "time" + + ethereum "github.com/ethereum/go-ethereum" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + + "github.com/status-im/status-go/geth/account" + "github.com/status-im/status-go/geth/rpc" + "github.com/status-im/status-go/sign" +) + +const ( + // sendTxTimeout defines how many seconds to wait before returning result in sentTransaction(). + sendTxTimeout = 300 * time.Second + rpcCallTimeout = time.Minute + + defaultGas = 90000 +) + +// Transactor validates, signs transactions. +// It uses upstream to propagate transactions to the Ethereum network. +type Transactor struct { + pendingSignRequests *sign.PendingRequests + sender ethereum.TransactionSender + pendingNonceProvider PendingNonceProvider + gasCalculator GasCalculator + sendTxTimeout time.Duration + rpcCallTimeout time.Duration + networkID uint64 + + addrLock *AddrLocker + localNonce sync.Map + log log.Logger +} + +// NewTransactor returns a new Manager. +func NewTransactor(signRequests *sign.PendingRequests) *Transactor { + return &Transactor{ + pendingSignRequests: signRequests, + addrLock: &AddrLocker{}, + sendTxTimeout: sendTxTimeout, + rpcCallTimeout: rpcCallTimeout, + localNonce: sync.Map{}, + log: log.New("package", "status-go/geth/transactions.Manager"), + } +} + +// SetNetworkID selects a correct network. +func (t *Transactor) SetNetworkID(networkID uint64) { + t.networkID = networkID +} + +// SetRPCClient an RPC client. +func (t *Transactor) SetRPCClient(rpcClient *rpc.Client) { + rpcWrapper := newRPCWrapper(rpcClient) + t.sender = rpcWrapper + t.pendingNonceProvider = rpcWrapper + t.gasCalculator = rpcWrapper +} + +// SendTransaction is an implementation of eth_sendTransaction. It queues the tx to the sign queue. +func (t *Transactor) SendTransaction(ctx context.Context, args SendTxArgs) (gethcommon.Hash, error) { + if ctx == nil { + ctx = context.Background() + } + + completeFunc := func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + return t.validateAndPropagate(acc, args) + } + + request, err := t.pendingSignRequests.Add(ctx, args, completeFunc) + + if err != nil { + return gethcommon.Hash{}, err + } + + rst := t.pendingSignRequests.Wait(request.ID, t.sendTxTimeout) + return rst.Hash, rst.Error +} + +// make sure that only account which created the tx can complete it +func (t *Transactor) validateAccount(args SendTxArgs, selectedAccount *account.SelectedExtKey) error { + if selectedAccount == nil { + return account.ErrNoAccountSelected + } + + if args.From.Hex() != selectedAccount.Address.Hex() { + err := sign.ErrInvalidCompleteTxSender + t.log.Error("queued transaction does not belong to the selected account", "err", err) + return err + } + + return nil +} + +func (t *Transactor) validateAndPropagate(selectedAccount *account.SelectedExtKey, args SendTxArgs) (hash gethcommon.Hash, err error) { + // TODO (mandrigin): Send sign request ID as a parameter to this function and uncoment the log message + // m.log.Info("complete transaction", "id", queuedTx.ID) + if err := t.validateAccount(args, selectedAccount); err != nil { + return hash, err + } + if !args.Valid() { + return hash, ErrInvalidSendTxArgs + } + t.addrLock.LockAddr(args.From) + var localNonce uint64 + if val, ok := t.localNonce.Load(args.From); ok { + localNonce = val.(uint64) + } + var nonce uint64 + defer func() { + // nonce should be incremented only if tx completed without error + // if upstream node returned nonce higher than ours we will stick to it + if err == nil { + t.localNonce.Store(args.From, nonce+1) + } + t.addrLock.UnlockAddr(args.From) + + }() + ctx, cancel := context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + nonce, err = t.pendingNonceProvider.PendingNonceAt(ctx, args.From) + if err != nil { + return hash, err + } + // if upstream node returned nonce higher than ours we will use it, as it probably means + // that another client was used for sending transactions + if localNonce > nonce { + nonce = localNonce + } + gasPrice := (*big.Int)(args.GasPrice) + if args.GasPrice == nil { + ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + gasPrice, err = t.gasCalculator.SuggestGasPrice(ctx) + if err != nil { + return hash, err + } + } + + chainID := big.NewInt(int64(t.networkID)) + value := (*big.Int)(args.Value) + + var gas uint64 + if args.Gas == nil { + ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + gas, err = t.gasCalculator.EstimateGas(ctx, ethereum.CallMsg{ + From: args.From, + To: args.To, + GasPrice: gasPrice, + Value: value, + Data: args.GetInput(), + }) + if err != nil { + return hash, err + } + if gas < defaultGas { + t.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) + gas = defaultGas + } + } else { + gas = uint64(*args.Gas) + } + + var tx *types.Transaction + if args.To != nil { + t.log.Info("New transaction", + "From", args.From, + "To", *args.To, + "Gas", gas, + "GasPrice", gasPrice, + "Value", value, + ) + tx = types.NewTransaction(nonce, *args.To, value, gas, gasPrice, args.GetInput()) + } else { + // contract creation is rare enough to log an expected address + t.log.Info("New contract", + "From", args.From, + "Gas", gas, + "GasPrice", gasPrice, + "Value", value, + "Contract address", crypto.CreateAddress(args.From, nonce), + ) + tx = types.NewContractCreation(nonce, value, gas, gasPrice, args.GetInput()) + } + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) + if err != nil { + return hash, err + } + ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout) + defer cancel() + if err := t.sender.SendTransaction(ctx, signedTx); err != nil { + return hash, err + } + return signedTx.Hash(), nil +} diff --git a/geth/transactions/txqueue_manager_test.go b/geth/transactions/transactor_test.go similarity index 56% rename from geth/transactions/txqueue_manager_test.go rename to geth/transactions/transactor_test.go index 98a2ac913..581106a92 100644 --- a/geth/transactions/txqueue_manager_test.go +++ b/geth/transactions/transactor_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "math/big" - "sync" "testing" "time" @@ -26,52 +25,52 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/rpc" "github.com/status-im/status-go/geth/transactions/fake" + "github.com/status-im/status-go/sign" + . "github.com/status-im/status-go/t/utils" ) +func simpleVerifyFunc(acc *account.SelectedExtKey) func(string) (*account.SelectedExtKey, error) { + return func(string) (*account.SelectedExtKey, error) { + return acc, nil + } +} + func TestTxQueueTestSuite(t *testing.T) { suite.Run(t, new(TxQueueTestSuite)) } type TxQueueTestSuite struct { suite.Suite - rpcClientMockCtrl *gomock.Controller - rpcClientMock *MocktestRPCClientProvider server *gethrpc.Server client *gethrpc.Client txServiceMockCtrl *gomock.Controller txServiceMock *fake.MockPublicTransactionPoolAPI nodeConfig *params.NodeConfig - manager *Manager + manager *Transactor } func (s *TxQueueTestSuite) SetupTest() { - s.rpcClientMockCtrl = gomock.NewController(s.T()) s.txServiceMockCtrl = gomock.NewController(s.T()) - s.rpcClientMock = NewMocktestRPCClientProvider(s.rpcClientMockCtrl) - s.server, s.txServiceMock = fake.NewTestServer(s.txServiceMockCtrl) s.client = gethrpc.DialInProc(s.server) - rpclient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{}) - s.rpcClientMock.EXPECT().RPCClient().Return(rpclient) + rpcClient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{}) // expected by simulated backend chainID := gethparams.AllEthashProtocolChanges.ChainId.Uint64() nodeConfig, err := params.NewNodeConfig("/tmp", "", chainID, true) s.Require().NoError(err) s.nodeConfig = nodeConfig - s.manager = NewManager(s.rpcClientMock) - s.manager.DisableNotificactions() - s.manager.completionTimeout = time.Second + s.manager = NewTransactor(sign.NewPendingRequests()) + s.manager.sendTxTimeout = time.Second s.manager.rpcCallTimeout = time.Second - s.manager.Start(chainID) + s.manager.SetNetworkID(chainID) + s.manager.SetRPCClient(rpcClient) } func (s *TxQueueTestSuite) TearDownTest() { - s.manager.Stop() - s.rpcClientMockCtrl.Finish() s.txServiceMockCtrl.Finish() s.server.Stop() s.client.Close() @@ -83,38 +82,44 @@ var ( testNonce = hexutil.Uint64(10) ) -func (s *TxQueueTestSuite) setupTransactionPoolAPI(tx *QueuedTx, returnNonce, resultNonce hexutil.Uint64, account *account.SelectedExtKey, txErr error) { +func (s *TxQueueTestSuite) completeFunc(args SendTxArgs) func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + return s.manager.validateAndPropagate(acc, args) + } +} + +func (s *TxQueueTestSuite) setupTransactionPoolAPI(args SendTxArgs, returnNonce, resultNonce hexutil.Uint64, account *account.SelectedExtKey, txErr error) { // Expect calls to gas functions only if there are no user defined values. // And also set the expected gas and gas price for RLP encoding the expected tx. var usedGas hexutil.Uint64 var usedGasPrice *big.Int s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&returnNonce, nil) - if tx.Args.GasPrice == nil { + if args.GasPrice == nil { usedGasPrice = (*big.Int)(testGasPrice) s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(usedGasPrice, nil) } else { - usedGasPrice = (*big.Int)(tx.Args.GasPrice) + usedGasPrice = (*big.Int)(args.GasPrice) } - if tx.Args.Gas == nil { + if args.Gas == nil { s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(testGas, nil) usedGas = testGas } else { - usedGas = *tx.Args.Gas + usedGas = *args.Gas } // Prepare the transaction anD RLP encode it. - data := s.rlpEncodeTx(tx, s.nodeConfig, account, &resultNonce, usedGas, usedGasPrice) + data := s.rlpEncodeTx(args, s.nodeConfig, account, &resultNonce, usedGas, usedGasPrice) // Expect the RLP encoded transaction. s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), data).Return(gethcommon.Hash{}, txErr) } -func (s *TxQueueTestSuite) rlpEncodeTx(tx *QueuedTx, config *params.NodeConfig, account *account.SelectedExtKey, nonce *hexutil.Uint64, gas hexutil.Uint64, gasPrice *big.Int) hexutil.Bytes { +func (s *TxQueueTestSuite) rlpEncodeTx(args SendTxArgs, config *params.NodeConfig, account *account.SelectedExtKey, nonce *hexutil.Uint64, gas hexutil.Uint64, gasPrice *big.Int) hexutil.Bytes { newTx := types.NewTransaction( uint64(*nonce), - gethcommon.Address(*tx.Args.To), - tx.Args.Value.ToInt(), + gethcommon.Address(*args.To), + args.Value.ToInt(), uint64(gas), gasPrice, - []byte(tx.Args.Input), + []byte(args.Input), ) chainID := big.NewInt(int64(config.NetworkID)) signedTx, err := types.SignTx(newTx, types.NewEIP155Signer(chainID), account.AccountKey.PrivateKey) @@ -160,141 +165,103 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() { for _, testCase := range testCases { s.T().Run(testCase.name, func(t *testing.T) { s.SetupTest() - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), Gas: testCase.gas, GasPrice: testCase.gasPrice, - }) - s.setupTransactionPoolAPI(tx, testNonce, testNonce, selectedAccount, nil) + } + s.setupTransactionPoolAPI(args, testNonce, testNonce, selectedAccount, nil) - s.NoError(s.manager.QueueTransaction(tx)) w := make(chan struct{}) var ( - hash gethcommon.Hash - err error + sendHash gethcommon.Hash + err error ) go func() { - hash, err = s.manager.CompleteTransaction(tx.ID, selectedAccount) - s.NoError(err) + var sendErr error + sendHash, sendErr = s.manager.SendTransaction(context.Background(), args) + s.NoError(sendErr) close(w) }() - rst := s.manager.WaitForTransaction(tx) - // Check that error is assigned to the transaction. - s.NoError(rst.Error) - // Transaction should be already removed from the queue. - s.False(s.manager.TransactionQueue().Has(tx.ID)) + for i := 10; i > 0; i-- { + if s.manager.pendingSignRequests.Count() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + req := s.manager.pendingSignRequests.First() + s.NotNil(req) + approveHash, err := s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) + s.NoError(err) s.NoError(WaitClosed(w, time.Second)) - s.Equal(hash, rst.Hash) + + // Transaction should be already removed from the queue. + s.False(s.manager.pendingSignRequests.Has(req.ID)) + s.Equal(sendHash, approveHash) }) } } -func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() { - key, _ := crypto.GenerateKey() - selectedAccount := &account.SelectedExtKey{ - Address: account.FromAddress(TestConfig.Account1.Address), - AccountKey: &keystore.Key{PrivateKey: key}, - } - - tx := Create(context.Background(), SendTxArgs{ - From: account.FromAddress(TestConfig.Account1.Address), - To: account.ToAddress(TestConfig.Account2.Address), - }) - - s.setupTransactionPoolAPI(tx, testNonce, testNonce, selectedAccount, nil) - - err := s.manager.QueueTransaction(tx) - s.NoError(err) - - var ( - wg sync.WaitGroup - mu sync.Mutex - completedTx int - inprogressTx int - txCount = 3 - ) - for i := 0; i < txCount; i++ { - wg.Add(1) - go func() { - defer wg.Done() - _, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - mu.Lock() - defer mu.Unlock() - if err == nil { - completedTx++ - } else if err == ErrQueuedTxInProgress { - inprogressTx++ - } else { - s.Fail("tx failed with unexpected error: ", err.Error()) - } - }() - } - - rst := s.manager.WaitForTransaction(tx) - // Check that error is assigned to the transaction. - s.NoError(rst.Error) - // Transaction should be already removed from the queue. - s.False(s.manager.TransactionQueue().Has(tx.ID)) - - // Wait for all CompleteTransaction calls. - wg.Wait() - s.Equal(1, completedTx, "only 1 tx expected to be completed") - s.Equal(txCount-1, inprogressTx, "txs expected to be reported as inprogress") -} - func (s *TxQueueTestSuite) TestAccountMismatch() { selectedAccount := &account.SelectedExtKey{ Address: account.FromAddress(TestConfig.Account2.Address), } - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) + } - s.NoError(s.manager.QueueTransaction(tx)) + go func() { + s.manager.SendTransaction(context.Background(), args) // nolint: errcheck + }() - _, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - s.Equal(err, ErrInvalidCompleteTxSender) + for i := 10; i > 0; i-- { + if s.manager.pendingSignRequests.Count() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + req := s.manager.pendingSignRequests.First() + s.NotNil(req) + _, err := s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) + s.Equal(err, sign.ErrInvalidCompleteTxSender) // Transaction should stay in the queue as mismatched accounts // is a recoverable error. - s.True(s.manager.TransactionQueue().Has(tx.ID)) + s.True(s.manager.pendingSignRequests.Has(req.ID)) } func (s *TxQueueTestSuite) TestDiscardTransaction() { - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - - s.NoError(s.manager.QueueTransaction(tx)) + } w := make(chan struct{}) go func() { - s.NoError(s.manager.DiscardTransaction(tx.ID)) + _, err := s.manager.SendTransaction(context.Background(), args) + s.Equal(sign.ErrSignReqDiscarded, err) close(w) }() - rst := s.manager.WaitForTransaction(tx) - s.Equal(ErrQueuedTxDiscarded, rst.Error) - // Transaction should be already removed from the queue. - s.False(s.manager.TransactionQueue().Has(tx.ID)) + for i := 10; i > 0; i-- { + if s.manager.pendingSignRequests.Count() > 0 { + break + } + time.Sleep(time.Millisecond) + } + + req := s.manager.pendingSignRequests.First() + s.NotNil(req) + err := s.manager.pendingSignRequests.Discard(req.ID) + s.NoError(err) s.NoError(WaitClosed(w, time.Second)) } -func (s *TxQueueTestSuite) TestCompletionTimedOut() { - tx := Create(context.Background(), SendTxArgs{ - From: account.FromAddress(TestConfig.Account1.Address), - To: account.ToAddress(TestConfig.Account2.Address), - }) - - s.NoError(s.manager.QueueTransaction(tx)) - rst := s.manager.WaitForTransaction(tx) - s.Equal(ErrQueuedTxTimedOut, rst.Error) -} - // TestLocalNonce verifies that local nonce will be used unless // upstream nonce is updated and higher than a local // in test we will run 3 transaction with nonce zero returned by upstream @@ -310,49 +277,60 @@ func (s *TxQueueTestSuite) TestLocalNonce() { AccountKey: &keystore.Key{PrivateKey: key}, } nonce := hexutil.Uint64(0) + + go func() { + approved := 0 + for { + // 3 in a cycle, then 2 + if approved >= txCount+2 { + return + } + req := s.manager.pendingSignRequests.First() + if req == nil { + time.Sleep(time.Millisecond) + } else { + s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) // nolint: errcheck + } + } + }() + for i := 0; i < txCount; i++ { - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - s.setupTransactionPoolAPI(tx, nonce, hexutil.Uint64(i), selectedAccount, nil) - s.NoError(s.manager.QueueTransaction(tx)) - hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - rst := s.manager.WaitForTransaction(tx) - // simple sanity checks + } + s.setupTransactionPoolAPI(args, nonce, hexutil.Uint64(i), selectedAccount, nil) + + _, err := s.manager.SendTransaction(context.Background(), args) s.NoError(err) - s.NoError(rst.Error) - s.Equal(rst.Hash, hash) - resultNonce, _ := s.manager.localNonce.Load(tx.Args.From) + resultNonce, _ := s.manager.localNonce.Load(args.From) s.Equal(uint64(i)+1, resultNonce.(uint64)) } + nonce = hexutil.Uint64(5) - tx := Create(context.Background(), SendTxArgs{ + args := SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - s.setupTransactionPoolAPI(tx, nonce, nonce, selectedAccount, nil) - s.NoError(s.manager.QueueTransaction(tx)) - hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) - rst := s.manager.WaitForTransaction(tx) + } + + s.setupTransactionPoolAPI(args, nonce, nonce, selectedAccount, nil) + + _, err := s.manager.SendTransaction(context.Background(), args) s.NoError(err) - s.NoError(rst.Error) - s.Equal(rst.Hash, hash) - resultNonce, _ := s.manager.localNonce.Load(tx.Args.From) + + resultNonce, _ := s.manager.localNonce.Load(args.From) s.Equal(uint64(nonce)+1, resultNonce.(uint64)) testErr := errors.New("test") s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), selectedAccount.Address, gethrpc.PendingBlockNumber).Return(nil, testErr) - tx = Create(context.Background(), SendTxArgs{ + args = SendTxArgs{ From: account.FromAddress(TestConfig.Account1.Address), To: account.ToAddress(TestConfig.Account2.Address), - }) - s.NoError(s.manager.QueueTransaction(tx)) - _, err = s.manager.CompleteTransaction(tx.ID, selectedAccount) - rst = s.manager.WaitForTransaction(tx) + } + + _, err = s.manager.SendTransaction(context.Background(), args) s.EqualError(testErr, err.Error()) - s.EqualError(testErr, rst.Error.Error()) - resultNonce, _ = s.manager.localNonce.Load(tx.Args.From) + resultNonce, _ = s.manager.localNonce.Load(args.From) s.Equal(uint64(nonce)+1, resultNonce.(uint64)) } @@ -367,13 +345,27 @@ func (s *TxQueueTestSuite) TestContractCreation() { Address: testaddr, AccountKey: &keystore.Key{PrivateKey: key}, } - s.manager.ethTxClient = backend - tx := Create(context.Background(), SendTxArgs{ + s.manager.sender = backend + s.manager.gasCalculator = backend + s.manager.pendingNonceProvider = backend + tx := SendTxArgs{ From: testaddr, Input: hexutil.Bytes(gethcommon.FromHex(contract.ENSBin)), - }) - s.NoError(s.manager.QueueTransaction(tx)) - hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount) + } + + go func() { + for i := 1000; i > 0; i-- { + req := s.manager.pendingSignRequests.First() + if req == nil { + time.Sleep(time.Millisecond) + } else { + s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) // nolint: errcheck + break + } + } + }() + + hash, err := s.manager.SendTransaction(context.Background(), tx) s.NoError(err) backend.Commit() receipt, err := backend.TransactionReceipt(context.TODO(), hash) diff --git a/geth/transactions/txqueue_manager.go b/geth/transactions/txqueue_manager.go deleted file mode 100644 index e5168506b..000000000 --- a/geth/transactions/txqueue_manager.go +++ /dev/null @@ -1,333 +0,0 @@ -package transactions - -import ( - "context" - "encoding/json" - "errors" - "math/big" - "sync" - "time" - - ethereum "github.com/ethereum/go-ethereum" - gethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/status-im/status-go/geth/account" - - "github.com/status-im/status-go/geth/rpc" -) - -const ( - // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. - SendTxDefaultErrorCode = SendTransactionDefaultErrorCode - // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). - DefaultTxSendCompletionTimeout = 300 * time.Second - - defaultGas = 90000 - defaultTimeout = time.Minute -) - -var ( - // ErrUnexpectedArgs returned when args are of unexpected length. - ErrUnexpectedArgs = errors.New("unexpected args") -) - -// RPCClientProvider is an interface that provides a way -// to obtain an rpc.Client. -type RPCClientProvider interface { - RPCClient() *rpc.Client -} - -// Manager provides means to manage internal Status Backend (injected into LES) -type Manager struct { - rpcClientProvider RPCClientProvider - txQueue *TxQueue - ethTxClient EthTransactor - notify bool - completionTimeout time.Duration - rpcCallTimeout time.Duration - networkID uint64 - - addrLock *AddrLocker - localNonce sync.Map - log log.Logger -} - -// NewManager returns a new Manager. -func NewManager(rpcClientProvider RPCClientProvider) *Manager { - return &Manager{ - rpcClientProvider: rpcClientProvider, - txQueue: newQueue(), - addrLock: &AddrLocker{}, - notify: true, - completionTimeout: DefaultTxSendCompletionTimeout, - rpcCallTimeout: defaultTimeout, - localNonce: sync.Map{}, - log: log.New("package", "status-go/geth/transactions.Manager"), - } -} - -// DisableNotificactions turns off notifications on enqueue and return of tx. -// It is not thread safe and must be called only before manager is started. -func (m *Manager) DisableNotificactions() { - m.notify = false -} - -// Start starts accepting new transactions into the queue. -func (m *Manager) Start(networkID uint64) { - m.log.Info("start Manager") - m.networkID = networkID - m.ethTxClient = NewEthTxClient(m.rpcClientProvider.RPCClient()) - m.txQueue.Start() -} - -// Stop stops accepting new transactions into the queue. -func (m *Manager) Stop() { - m.log.Info("stop Manager") - m.txQueue.Stop() -} - -// TransactionQueue returns a reference to the queue. -func (m *Manager) TransactionQueue() *TxQueue { - return m.txQueue -} - -// QueueTransaction puts a transaction into the queue. -func (m *Manager) QueueTransaction(tx *QueuedTx) error { - if !tx.Args.Valid() { - return ErrInvalidSendTxArgs - } - to := "" - if tx.Args.To != nil { - to = tx.Args.To.Hex() - } - m.log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) - if err := m.txQueue.Enqueue(tx); err != nil { - return err - } - if m.notify { - NotifyOnEnqueue(tx) - } - return nil -} - -func (m *Manager) txDone(tx *QueuedTx, hash gethcommon.Hash, err error) { - if err := m.txQueue.Done(tx.ID, hash, err); err == ErrQueuedTxIDNotFound { - m.log.Warn("transaction is already removed from a queue", "ID", tx.ID) - return - } - if m.notify { - NotifyOnReturn(tx, err) - } -} - -// WaitForTransaction adds a transaction to the queue and blocks -// until it's completed, discarded or times out. -func (m *Manager) WaitForTransaction(tx *QueuedTx) Result { - m.log.Info("wait for transaction", "id", tx.ID) - // now wait up until transaction is: - // - completed (via CompleteQueuedTransaction), - // - discarded (via DiscardQueuedTransaction) - // - or times out - for { - select { - case rst := <-tx.Result: - return rst - case <-time.After(m.completionTimeout): - m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut) - } - } -} - -// NotifyErrored sends a notification for the given transaction -func (m *Manager) NotifyErrored(id string, inputError error) error { - tx, err := m.txQueue.Get(id) - if err != nil { - m.log.Warn("error getting a queued transaction", "err", err) - return err - } - - if m.notify { - NotifyOnReturn(tx, inputError) - } - - return nil -} - -// CompleteTransaction instructs backend to complete sending of a given transaction. -func (m *Manager) CompleteTransaction(id string, account *account.SelectedExtKey) (hash gethcommon.Hash, err error) { - m.log.Info("complete transaction", "id", id) - tx, err := m.txQueue.Get(id) - if err != nil { - m.log.Warn("error getting a queued transaction", "err", err) - return hash, err - } - if err := m.txQueue.LockInprogress(id); err != nil { - m.log.Warn("can't process transaction", "err", err) - return hash, err - } - - if err := m.validateAccount(tx, account); err != nil { - m.txDone(tx, hash, err) - return hash, err - } - hash, err = m.completeTransaction(account, tx) - m.log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err) - m.txDone(tx, hash, err) - return hash, err -} - -// make sure that only account which created the tx can complete it -func (m *Manager) validateAccount(tx *QueuedTx, selectedAccount *account.SelectedExtKey) error { - if selectedAccount == nil { - return account.ErrNoAccountSelected - } - - // make sure that only account which created the tx can complete it - if tx.Args.From.Hex() != selectedAccount.Address.Hex() { - m.log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender) - return ErrInvalidCompleteTxSender - } - - return nil -} - -func (m *Manager) completeTransaction(selectedAccount *account.SelectedExtKey, queuedTx *QueuedTx) (hash gethcommon.Hash, err error) { - m.log.Info("complete transaction", "id", queuedTx.ID) - m.addrLock.LockAddr(queuedTx.Args.From) - var localNonce uint64 - if val, ok := m.localNonce.Load(queuedTx.Args.From); ok { - localNonce = val.(uint64) - } - var nonce uint64 - defer func() { - // nonce should be incremented only if tx completed without error - // if upstream node returned nonce higher than ours we will stick to it - if err == nil { - m.localNonce.Store(queuedTx.Args.From, nonce+1) - } - m.addrLock.UnlockAddr(queuedTx.Args.From) - - }() - ctx, cancel := context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - nonce, err = m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) - if err != nil { - return hash, err - } - // if upstream node returned nonce higher than ours we will use it, as it probably means - // that another client was used for sending transactions - if localNonce > nonce { - nonce = localNonce - } - args := queuedTx.Args - if !args.Valid() { - return hash, ErrInvalidSendTxArgs - } - gasPrice := (*big.Int)(args.GasPrice) - if args.GasPrice == nil { - ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) - if err != nil { - return hash, err - } - } - - chainID := big.NewInt(int64(m.networkID)) - value := (*big.Int)(args.Value) - - var gas uint64 - if args.Gas == nil { - ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - gas, err = m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{ - From: args.From, - To: args.To, - GasPrice: gasPrice, - Value: value, - Data: args.GetInput(), - }) - if err != nil { - return hash, err - } - if gas < defaultGas { - m.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) - gas = defaultGas - } - } else { - gas = uint64(*args.Gas) - } - var tx *types.Transaction - if args.To != nil { - m.log.Info("New transaction", - "From", args.From, - "To", *args.To, - "Gas", gas, - "GasPrice", gasPrice, - "Value", value, - ) - tx = types.NewTransaction(nonce, *args.To, value, gas, gasPrice, args.GetInput()) - } else { - // contract creation is rare enough to log an expected address - m.log.Info("New contract", - "From", args.From, - "Gas", gas, - "GasPrice", gasPrice, - "Value", value, - "Contract address", crypto.CreateAddress(args.From, nonce), - ) - tx = types.NewContractCreation(nonce, value, gas, gasPrice, args.GetInput()) - } - signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) - if err != nil { - return hash, err - } - ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) - defer cancel() - if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { - return hash, err - } - return signedTx.Hash(), nil -} - -// DiscardTransaction discards a given transaction from transaction queue -func (m *Manager) DiscardTransaction(id string) error { - tx, err := m.txQueue.Get(id) - if err != nil { - return err - } - err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded) - if m.notify { - NotifyOnReturn(tx, ErrQueuedTxDiscarded) - } - return err -} - -// SendTransactionRPCHandler is a handler for eth_sendTransaction method. -// It accepts one param which is a slice with a map of transaction params. -func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { - m.log.Debug("SendTransactionRPCHandler called", "ARGS", args) - if len(args) != 1 { - return nil, ErrUnexpectedArgs - } - data, err := json.Marshal(args[0]) - if err != nil { - return nil, err - } - var txArgs SendTxArgs - if err := json.Unmarshal(data, &txArgs); err != nil { - return nil, err - } - tx := Create(ctx, txArgs) - if err := m.QueueTransaction(tx); err != nil { - return nil, err - } - rst := m.WaitForTransaction(tx) - if rst.Error != nil { - return nil, rst.Error - } - return rst.Hash.Hex(), nil -} diff --git a/geth/transactions/types.go b/geth/transactions/types.go index 7cb9ef993..f1f02af4b 100644 --- a/geth/transactions/types.go +++ b/geth/transactions/types.go @@ -3,29 +3,30 @@ package transactions import ( "bytes" "context" + "encoding/json" "errors" + ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" ) -// errors var ( + // ErrInvalidSendTxArgs is returned when the structure of SendTxArgs is ambigious. ErrInvalidSendTxArgs = errors.New("Transaction arguments are invalid (are both 'input' and 'data' fields used?)") + // ErrUnexpectedArgs returned when args are of unexpected length. + ErrUnexpectedArgs = errors.New("unexpected args") ) -// Result is a JSON returned from transaction complete function (used internally) -type Result struct { - Hash common.Hash - Error error +// PendingNonceProvider provides information about nonces. +type PendingNonceProvider interface { + PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) } -// QueuedTx holds enough information to complete the queued transaction. -type QueuedTx struct { - ID string - Context context.Context - Args SendTxArgs - Result chan Result +// GasCalculator provides methods for estimating and pricing gas. +type GasCalculator interface { + ethereum.GasEstimator + ethereum.GasPricer } // SendTxArgs represents the arguments to submit a new transaction into the transaction pool. @@ -68,3 +69,20 @@ func (args SendTxArgs) GetInput() hexutil.Bytes { func isNilOrEmpty(bytes hexutil.Bytes) bool { return bytes == nil || len(bytes) == 0 } + +// RPCCalltoSendTxArgs creates SendTxArgs based on RPC parameters +func RPCCalltoSendTxArgs(args ...interface{}) (SendTxArgs, error) { + var txArgs SendTxArgs + if len(args) != 1 { + return txArgs, ErrUnexpectedArgs + } + data, err := json.Marshal(args[0]) + if err != nil { + return txArgs, err + } + if err := json.Unmarshal(data, &txArgs); err != nil { + return txArgs, err + } + + return txArgs, nil +} diff --git a/geth/transactions/utils.go b/geth/transactions/utils.go deleted file mode 100644 index 939bc2cbb..000000000 --- a/geth/transactions/utils.go +++ /dev/null @@ -1,89 +0,0 @@ -package transactions - -import ( - "context" - "errors" - "fmt" - "io" - "os" - "reflect" - "runtime/debug" - - "github.com/pborman/uuid" - "github.com/status-im/status-go/geth/signal" -) - -const ( - // MessageIDKey is a key for message ID - // This ID is required to track from which chat a given send transaction request is coming. - MessageIDKey = contextKey("message_id") -) - -type contextKey string // in order to make sure that our context key does not collide with keys from other packages - -//ErrTxQueueRunFailure - error running transaction queue -var ErrTxQueueRunFailure = errors.New("error running transaction queue") - -// haltOnPanic recovers from panic, logs issue, sends upward notification, and exits -func haltOnPanic() { - if r := recover(); r != nil { - err := fmt.Errorf("%v: %v", ErrTxQueueRunFailure, r) - - // send signal up to native app - signal.Send(signal.Envelope{ - Type: signal.EventNodeCrashed, - Event: signal.NodeCrashEvent{ - Error: err, - }, - }) - - fatalf(err) // os.exit(1) is called internally - } -} - -// messageIDFromContext returns message id from context (if exists) -func messageIDFromContext(ctx context.Context) string { - if ctx == nil { - return "" - } - if messageID, ok := ctx.Value(MessageIDKey).(string); ok { - return messageID - } - - return "" -} - -// fatalf is used to halt the execution. -// When called the function prints stack end exits. -// Failure is logged into both StdErr and StdOut. -func fatalf(reason interface{}, args ...interface{}) { - // decide on output stream - w := io.MultiWriter(os.Stdout, os.Stderr) - outf, _ := os.Stdout.Stat() // nolint: gas - errf, _ := os.Stderr.Stat() // nolint: gas - if outf != nil && errf != nil && os.SameFile(outf, errf) { - w = os.Stderr - } - - // find out whether error or string has been passed as a reason - r := reflect.ValueOf(reason) - if r.Kind() == reflect.String { - fmt.Fprintf(w, "Fatal Failure: %v\n%v\n", reason.(string), args) - } else { - fmt.Fprintf(w, "Fatal Failure: %v\n", reason.(error)) - } - - debug.PrintStack() - - os.Exit(1) -} - -// Create returns a transaction object. -func Create(ctx context.Context, args SendTxArgs) *QueuedTx { - return &QueuedTx{ - ID: uuid.New(), - Context: ctx, - Args: args, - Result: make(chan Result, 1), - } -} diff --git a/lib/library_test_utils.go b/lib/library_test_utils.go index 5f94e044c..0ddc45b44 100644 --- a/lib/library_test_utils.go +++ b/lib/library_test_utils.go @@ -34,6 +34,7 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" "github.com/status-im/status-go/static" . "github.com/status-im/status-go/t/utils" //nolint: golint ) @@ -767,10 +768,8 @@ func testAccountLogout(t *testing.T) bool { } func testCompleteTransaction(t *testing.T) bool { - txQueueManager := statusAPI.TxQueueManager() - txQueue := txQueueManager.TransactionQueue() + signRequests := statusAPI.PendingSignRequests() - txQueue.Reset() EnsureNodeSync(statusAPI.StatusNode().EnsureSync) // log into account from which transactions will be sent @@ -792,7 +791,7 @@ func testCompleteTransaction(t *testing.T) bool { t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) @@ -839,7 +838,7 @@ func testCompleteTransaction(t *testing.T) bool { return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -848,8 +847,7 @@ func testCompleteTransaction(t *testing.T) bool { } func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyclo - txQueue := statusAPI.TxQueueManager().TransactionQueue() - txQueue.Reset() + signRequests := statusAPI.PendingSignRequests() // log into account from which transactions will be sent if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { @@ -870,7 +868,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID) @@ -917,7 +915,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc } results := resultsStruct.Results - if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != transactions.ErrQueuedTxIDNotFound.Error() { + if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != sign.ErrSignReqNotFound.Error() { t.Errorf("cannot complete txs: %v", results) return } @@ -942,7 +940,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc time.Sleep(1 * time.Second) // make sure that tx complete signal propagates for _, txID := range parsedIDs { - if txQueue.Has(string(txID)) { + if signRequests.Has(string(txID)) { t.Errorf("txqueue should not have test tx at this point (it should be completed): %s", txID) return } @@ -972,7 +970,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -981,8 +979,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc } func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo - txQueue := statusAPI.TxQueueManager().TransactionQueue() - txQueue.Reset() + signRequests := statusAPI.PendingSignRequests() // log into account from which transactions will be sent if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { @@ -1003,12 +1000,12 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) - if !txQueue.Has(string(txID)) { + if !signRequests.Has(string(txID)) { t.Errorf("txqueue should still have test tx: %s", txID) return } @@ -1028,13 +1025,12 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo // try completing discarded transaction _, err := statusAPI.CompleteTransaction(string(txID), TestConfig.Account1.Password) - if err != transactions.ErrQueuedTxIDNotFound { + if err != sign.ErrSignReqNotFound { t.Error("expects tx not found, but call to CompleteTransaction succeeded") return } - time.Sleep(1 * time.Second) // make sure that tx complete signal propagates - if txQueue.Has(string(txID)) { + if signRequests.Has(string(txID)) { t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txID) return } @@ -1042,19 +1038,19 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo completeQueuedTransaction <- struct{}{} // so that timeout is aborted } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { + if receivedErrCode != strconv.Itoa(sign.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1069,7 +1065,8 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != transactions.ErrQueuedTxDiscarded { + time.Sleep(1 * time.Second) + if err != sign.ErrSignReqDiscarded { t.Errorf("expected error not thrown: %v", err) return false } @@ -1079,7 +1076,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -1093,8 +1090,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo } func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyclo - txQueue := statusAPI.TxQueueManager().TransactionQueue() - txQueue.Reset() + signRequests := statusAPI.PendingSignRequests() // log into account from which transactions will be sent if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil { @@ -1116,12 +1112,12 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID = event["id"].(string) t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID) - if !txQueue.Has(string(txID)) { + if !signRequests.Has(string(txID)) { t.Errorf("txqueue should still have test tx: %s", txID) return } @@ -1129,19 +1125,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl txIDs <- txID } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) t.Logf("transaction return event received: {id: %s}\n", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() if receivedErrMessage != expectedErrMessage { t.Errorf("unexpected error message received: got %v", receivedErrMessage) return } receivedErrCode := event["error_code"].(string) - if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) { + if receivedErrCode != strconv.Itoa(sign.SendTransactionDiscardedErrorCode) { t.Errorf("unexpected error code received: got %v", receivedErrCode) return } @@ -1160,7 +1156,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - if err != transactions.ErrQueuedTxDiscarded { + if err != sign.ErrSignReqDiscarded { t.Errorf("expected error not thrown: %v", err) return } @@ -1191,7 +1187,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl } discardResults := discardResultsStruct.Results - if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != transactions.ErrQueuedTxIDNotFound.Error() { + if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != sign.ErrSignReqNotFound.Error() { t.Errorf("cannot discard txs: %v", discardResults) return } @@ -1213,7 +1209,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl t.Errorf("tx id not set in result: expected id is %s", txID) return } - if txResult.Error != transactions.ErrQueuedTxIDNotFound.Error() { + if txResult.Error != sign.ErrSignReqNotFound.Error() { t.Errorf("invalid error for %s", txResult.Hash) return } @@ -1225,7 +1221,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl time.Sleep(1 * time.Second) // make sure that tx complete signal propagates for _, txID := range parsedIDs { - if txQueue.Has(string(txID)) { + if signRequests.Has(string(txID)) { t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txID) return } @@ -1254,7 +1250,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl return false } - if txQueue.Count() != 0 { + if signRequests.Count() != 0 { t.Error("tx queue must be empty at this point") return false } @@ -1430,7 +1426,7 @@ func startTestNode(t *testing.T) <-chan struct{} { return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { } if envelope.Type == signal.EventNodeStarted { t.Log("Node started, but we wait till it be ready") diff --git a/sign/README.md b/sign/README.md new file mode 100644 index 000000000..64009f8ab --- /dev/null +++ b/sign/README.md @@ -0,0 +1,13 @@ +# sign +`sign` package represents the API and signals for sending and receiving +signature request to and from our API user. + +When a method is called that requires an additional signature confirmation from +a user (like, a transaction), it gets it's sign request. + +Client of the API is then nofified of the sign request. + +Client has a chance to approve the sign request (by providing a valid password) +or to discard it. When the request is approved, the locked functinality is +executed. + diff --git a/sign/errors.go b/sign/errors.go new file mode 100644 index 000000000..b49e31462 --- /dev/null +++ b/sign/errors.go @@ -0,0 +1,36 @@ +package sign + +import ( + "errors" + + "github.com/ethereum/go-ethereum/accounts/keystore" + "github.com/status-im/status-go/geth/account" +) + +// TODO (mandrigin): Change values of these errors when API change is made. +var ( + //ErrSignReqNotFound - error transaction hash not found + ErrSignReqNotFound = errors.New("transaction hash not found") + //ErrSignReqInProgress - error transaction is in progress + ErrSignReqInProgress = errors.New("transaction is in progress") + // TODO (mandrigin): to be moved to `transactions` package + //ErrInvalidCompleteTxSender - error transaction with invalid sender + ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it") + //ErrSignReqTimedOut - error transaction sending timed out + ErrSignReqTimedOut = errors.New("transaction sending timed out") + //ErrSignReqDiscarded - error transaction discarded + ErrSignReqDiscarded = errors.New("transaction has been discarded") +) + +// remove from queue on any error (except for transient ones) and propagate +// defined as map[string]bool because errors from ethclient returned wrapped as jsonError +var transientErrs = map[string]bool{ + keystore.ErrDecrypt.Error(): true, // wrong password + ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account + account.ErrNoAccountSelected.Error(): true, // account not selected +} + +func isTransient(err error) bool { + _, transient := transientErrs[err.Error()] + return transient +} diff --git a/geth/transactions/notifications.go b/sign/notifications.go similarity index 55% rename from geth/transactions/notifications.go rename to sign/notifications.go index 4434b3e0f..f6214629e 100644 --- a/geth/transactions/notifications.go +++ b/sign/notifications.go @@ -1,6 +1,8 @@ -package transactions +package sign import ( + "context" + "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/status-im/status-go/geth/signal" ) @@ -25,57 +27,73 @@ const ( SendTransactionDiscardedErrorCode ) +const ( + // MessageIDKey is a key for message ID + // This ID is required to track from which chat a given send transaction request is coming. + MessageIDKey = contextKey("message_id") +) + +type contextKey string // in order to make sure that our context key does not collide with keys from other packages + +// messageIDFromContext returns message id from context (if exists) +func messageIDFromContext(ctx context.Context) string { + if ctx == nil { + return "" + } + if messageID, ok := ctx.Value(MessageIDKey).(string); ok { + return messageID + } + + return "" +} + var txReturnCodes = map[error]int{ - nil: SendTransactionNoErrorCode, - keystore.ErrDecrypt: SendTransactionPasswordErrorCode, - ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode, - ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode, + nil: SendTransactionNoErrorCode, + keystore.ErrDecrypt: SendTransactionPasswordErrorCode, + ErrSignReqTimedOut: SendTransactionTimeoutErrorCode, + ErrSignReqDiscarded: SendTransactionDiscardedErrorCode, } // SendTransactionEvent is a signal sent on a send transaction request type SendTransactionEvent struct { - ID string `json:"id"` - Args SendTxArgs `json:"args"` - MessageID string `json:"message_id"` + ID string `json:"id"` + Args interface{} `json:"args"` + MessageID string `json:"message_id"` } // NotifyOnEnqueue returns handler that processes incoming tx queue requests -func NotifyOnEnqueue(queuedTx *QueuedTx) { +func NotifyOnEnqueue(request *Request) { signal.Send(signal.Envelope{ Type: EventTransactionQueued, Event: SendTransactionEvent{ - ID: queuedTx.ID, - Args: queuedTx.Args, - MessageID: messageIDFromContext(queuedTx.Context), + ID: request.ID, + Args: request.Meta, + MessageID: messageIDFromContext(request.context), }, }) } // ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned type ReturnSendTransactionEvent struct { - ID string `json:"id"` - Args SendTxArgs `json:"args"` - MessageID string `json:"message_id"` - ErrorMessage string `json:"error_message"` - ErrorCode int `json:"error_code,string"` + ID string `json:"id"` + Args interface{} `json:"args"` + MessageID string `json:"message_id"` + ErrorMessage string `json:"error_message"` + ErrorCode int `json:"error_code,string"` } // NotifyOnReturn returns handler that processes responses from internal tx manager -func NotifyOnReturn(queuedTx *QueuedTx, err error) { +func NotifyOnReturn(request *Request, err error) { // we don't want to notify a user if tx was sent successfully if err == nil { return } - // discard notifications with empty tx - if queuedTx == nil { - return - } signal.Send(signal.Envelope{ Type: EventTransactionFailed, Event: ReturnSendTransactionEvent{ - ID: queuedTx.ID, - Args: queuedTx.Args, - MessageID: messageIDFromContext(queuedTx.Context), + ID: request.ID, + Args: request.Meta, + MessageID: messageIDFromContext(request.context), ErrorMessage: err.Error(), ErrorCode: sendTransactionErrorCode(err), }, @@ -86,5 +104,5 @@ func sendTransactionErrorCode(err error) int { if code, ok := txReturnCodes[err]; ok { return code } - return SendTxDefaultErrorCode + return SendTransactionDefaultErrorCode } diff --git a/sign/pending_requests.go b/sign/pending_requests.go new file mode 100644 index 000000000..969d65a49 --- /dev/null +++ b/sign/pending_requests.go @@ -0,0 +1,172 @@ +package sign + +import ( + "context" + "sync" + "time" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/geth/account" +) + +type verifyFunc func(string) (*account.SelectedExtKey, error) + +// PendingRequests is a capped container that holds pending signing requests. +type PendingRequests struct { + mu sync.RWMutex // to guard transactions map + requests map[string]*Request + + log log.Logger +} + +// NewPendingRequests creates a new requests list +func NewPendingRequests() *PendingRequests { + logger := log.New("package", "status-go/sign.PendingRequests") + + return &PendingRequests{ + requests: make(map[string]*Request), + log: logger, + } +} + +// Add a new signing request. +func (rs *PendingRequests) Add(ctx context.Context, meta Meta, completeFunc completeFunc) (*Request, error) { + rs.mu.Lock() + defer rs.mu.Unlock() + + request := newRequest(ctx, meta, completeFunc) + rs.requests[request.ID] = request + rs.log.Info("signing request is created", "ID", request.ID) + + go NotifyOnEnqueue(request) + + return request, nil +} + +// Get returns a signing request by it's ID. +func (rs *PendingRequests) Get(id string) (*Request, error) { + rs.mu.RLock() + defer rs.mu.RUnlock() + + if request, ok := rs.requests[id]; ok { + return request, nil + } + return nil, ErrSignReqNotFound +} + +// First returns a first signing request (if exists, nil otherwise). +func (rs *PendingRequests) First() *Request { + rs.mu.RLock() + defer rs.mu.RUnlock() + + for _, req := range rs.requests { + return req + } + + return nil +} + +// Approve a signing request by it's ID. Requires a valid password and a verification function. +func (rs *PendingRequests) Approve(id string, password string, verify verifyFunc) (hash gethcommon.Hash, err error) { + rs.log.Info("complete transaction", "id", id) + request, err := rs.tryLock(id) + if err != nil { + rs.log.Warn("can't process transaction", "err", err) + return hash, err + } + + selectedAccount, err := verify(password) + if err != nil { + rs.complete(request, hash, err) + return hash, err + } + + hash, err = request.completeFunc(selectedAccount) + rs.log.Info("finally completed transaction", "id", request.ID, "hash", hash, "err", err) + + rs.complete(request, hash, err) + + return hash, err +} + +// Discard remove a signing request from the list of pending requests. +func (rs *PendingRequests) Discard(id string) error { + request, err := rs.Get(id) + if err != nil { + return err + } + + rs.complete(request, gethcommon.Hash{}, ErrSignReqDiscarded) + return nil +} + +// Wait blocks until a request with a specified ID is completed (approved or discarded) +func (rs *PendingRequests) Wait(id string, timeout time.Duration) Result { + request, err := rs.Get(id) + if err != nil { + return Result{Error: err} + } + for { + select { + case rst := <-request.result: + return rst + case <-time.After(timeout): + rs.complete(request, gethcommon.Hash{}, ErrSignReqTimedOut) + } + } +} + +// Count returns number of currently pending requests +func (rs *PendingRequests) Count() int { + rs.mu.RLock() + defer rs.mu.RUnlock() + return len(rs.requests) +} + +// Has checks whether a pending request with a given identifier exists in the list +func (rs *PendingRequests) Has(id string) bool { + rs.mu.RLock() + defer rs.mu.RUnlock() + _, ok := rs.requests[id] + return ok +} + +// tryLock is used to avoid double-completion of the same request. +// it returns a request instance if it isn't processing yet, returns an error otherwise. +func (rs *PendingRequests) tryLock(id string) (*Request, error) { + rs.mu.Lock() + defer rs.mu.Unlock() + if tx, ok := rs.requests[id]; ok { + if tx.locked { + return nil, ErrSignReqInProgress + } + tx.locked = true + return tx, nil + } + return nil, ErrSignReqNotFound +} + +// complete removes the request from the list if there is no error or an error is non-transient +func (rs *PendingRequests) complete(request *Request, hash gethcommon.Hash, err error) { + rs.mu.Lock() + defer rs.mu.Unlock() + + request.locked = false + + go NotifyOnReturn(request, err) + + if err != nil && isTransient(err) { + return + } + + delete(rs.requests, request.ID) + + // hash is updated only if err is nil, but transaction is not removed from a queue + result := Result{Error: err} + if err == nil { + result.Hash = hash + } + + request.result <- result +} diff --git a/sign/pending_requests_test.go b/sign/pending_requests_test.go new file mode 100644 index 000000000..40492e352 --- /dev/null +++ b/sign/pending_requests_test.go @@ -0,0 +1,216 @@ +package sign + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/keystore" + gethcommon "github.com/ethereum/go-ethereum/common" + + "github.com/status-im/status-go/geth/account" + "github.com/stretchr/testify/suite" +) + +const ( + correctPassword = "password-correct" + wrongPassword = "password-wrong" +) + +func testVerifyFunc(password string) (*account.SelectedExtKey, error) { + if password == correctPassword { + return nil, nil + } + + return nil, keystore.ErrDecrypt +} + +func TestPendingRequestsSuite(t *testing.T) { + suite.Run(t, new(PendingRequestsSuite)) +} + +type PendingRequestsSuite struct { + suite.Suite + pendingRequests *PendingRequests +} + +func (s *PendingRequestsSuite) SetupTest() { + s.pendingRequests = NewPendingRequests() +} + +func (s *PendingRequestsSuite) defaultCompleteFunc() completeFunc { + hash := gethcommon.Hash{1} + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + s.Nil(acc, "account should be `nil`") + return hash, nil + } +} + +func (s *PendingRequestsSuite) delayedCompleteFunc() completeFunc { + hash := gethcommon.Hash{1} + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + time.Sleep(10 * time.Millisecond) + s.Nil(acc, "account should be `nil`") + return hash, nil + } +} + +func (s *PendingRequestsSuite) errorCompleteFunc(err error) completeFunc { + hash := gethcommon.Hash{1} + return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) { + s.Nil(acc, "account should be `nil`") + return hash, err + } +} + +func (s *PendingRequestsSuite) TestGet() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc()) + s.NoError(err) + for i := 2; i > 0; i-- { + actualRequest, err := s.pendingRequests.Get(req.ID) + s.NoError(err) + s.Equal(req, actualRequest) + } +} + +func (s *PendingRequestsSuite) testComplete(password string, hash gethcommon.Hash, completeFunc completeFunc) (string, error) { + req, err := s.pendingRequests.Add(context.Background(), nil, completeFunc) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + hash2, err := s.pendingRequests.Approve(req.ID, password, testVerifyFunc) + s.Equal(hash, hash2, "hashes should match") + + return req.ID, err +} + +func (s *PendingRequestsSuite) TestCompleteSuccess() { + id, err := s.testComplete(correctPassword, gethcommon.Hash{1}, s.defaultCompleteFunc()) + s.NoError(err, "no errors should be there") + + s.False(s.pendingRequests.Has(id), "sign request should not exist") +} + +func (s *PendingRequestsSuite) TestCompleteTransientError() { + hash := gethcommon.Hash{} + id, err := s.testComplete(wrongPassword, hash, s.errorCompleteFunc(keystore.ErrDecrypt)) + s.Equal(keystore.ErrDecrypt, err, "error value should be preserved") + + s.True(s.pendingRequests.Has(id)) + // verify that you are able to re-approve it after a transient error + _, err = s.pendingRequests.tryLock(id) + s.NoError(err) +} + +func (s *PendingRequestsSuite) TestCompleteError() { + hash := gethcommon.Hash{1} + expectedError := errors.New("test") + + id, err := s.testComplete(correctPassword, hash, s.errorCompleteFunc(expectedError)) + + s.Equal(expectedError, err, "error value should be preserved") + + s.False(s.pendingRequests.Has(id)) +} + +func (s PendingRequestsSuite) TestMultipleComplete() { + id, err := s.testComplete(correctPassword, gethcommon.Hash{1}, s.defaultCompleteFunc()) + s.NoError(err, "no errors should be there") + + _, err = s.pendingRequests.Approve(id, correctPassword, testVerifyFunc) + + s.Equal(ErrSignReqNotFound, err) +} + +func (s PendingRequestsSuite) TestConcurrentComplete() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.delayedCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + approved := 0 + tried := 0 + + for i := 10; i > 0; i-- { + go func() { + _, err = s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + if err == nil { + approved++ + } + tried++ + }() + } + + s.pendingRequests.Wait(req.ID, 10*time.Second) + + s.False(s.pendingRequests.Has(req.ID), "sign request should exist") + + s.Equal(approved, 1, "request should be approved only once") + s.Equal(tried, 10, "request should be tried to approve 10 times") +} + +func (s PendingRequestsSuite) TestWaitSuccess() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + go func() { + _, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + s.NoError(err) + }() + + result := s.pendingRequests.Wait(req.ID, 1*time.Second) + s.NoError(result.Error) +} + +func (s PendingRequestsSuite) TestDiscard() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + s.Equal(ErrSignReqNotFound, s.pendingRequests.Discard("")) + + go func() { + // enough to make it be called after Wait + time.Sleep(time.Millisecond) + s.NoError(s.pendingRequests.Discard(req.ID)) + }() + + result := s.pendingRequests.Wait(req.ID, 1*time.Second) + s.Equal(ErrSignReqDiscarded, result.Error) +} + +func (s PendingRequestsSuite) TestWaitFail() { + expectedError := errors.New("test-wait-fail") + req, err := s.pendingRequests.Add(context.Background(), nil, s.errorCompleteFunc(expectedError)) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + go func() { + _, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + s.Equal(expectedError, err) + }() + + result := s.pendingRequests.Wait(req.ID, 1*time.Second) + s.Equal(expectedError, result.Error) +} + +func (s PendingRequestsSuite) TestWaitTimeout() { + req, err := s.pendingRequests.Add(context.Background(), nil, s.delayedCompleteFunc()) + s.NoError(err) + + s.True(s.pendingRequests.Has(req.ID), "sign request should exist") + + go func() { + _, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc) + s.NoError(err) + }() + + result := s.pendingRequests.Wait(req.ID, 0*time.Second) + s.Equal(result.Error, ErrSignReqTimedOut) +} diff --git a/sign/request.go b/sign/request.go new file mode 100644 index 000000000..e3cfcf8a1 --- /dev/null +++ b/sign/request.go @@ -0,0 +1,36 @@ +package sign + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/pborman/uuid" + "github.com/status-im/status-go/geth/account" +) + +type completeFunc func(*account.SelectedExtKey) (common.Hash, error) + +// Meta represents any metadata that could be attached to a signing request. +// It will be JSON-serialized and used in notifications to the API consumer. +type Meta interface{} + +// Request is a single signing request. +type Request struct { + ID string + Meta Meta + context context.Context + locked bool + completeFunc completeFunc + result chan Result +} + +func newRequest(ctx context.Context, meta Meta, completeFunc completeFunc) *Request { + return &Request{ + ID: uuid.New(), + Meta: meta, + context: ctx, + locked: false, + completeFunc: completeFunc, + result: make(chan Result, 1), + } +} diff --git a/sign/result.go b/sign/result.go new file mode 100644 index 000000000..eaa09db28 --- /dev/null +++ b/sign/result.go @@ -0,0 +1,9 @@ +package sign + +import "github.com/ethereum/go-ethereum/common" + +// Result is a result of a signing request, error or successful +type Result struct { + Hash common.Hash + Error error +} diff --git a/t/e2e/jail/jail_rpc_test.go b/t/e2e/jail/jail_rpc_test.go index e513be281..37b1e333a 100644 --- a/t/e2e/jail/jail_rpc_test.go +++ b/t/e2e/jail/jail_rpc_test.go @@ -13,7 +13,7 @@ import ( "github.com/status-im/status-go/geth/jail" "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" - "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" e2e "github.com/status-im/status-go/t/e2e" . "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() { unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"]) @@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() { s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent) return } - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string)) diff --git a/t/e2e/suites.go b/t/e2e/suites.go index 376ea5395..e8a8883ed 100644 --- a/t/e2e/suites.go +++ b/t/e2e/suites.go @@ -5,6 +5,7 @@ import ( whisper "github.com/ethereum/go-ethereum/whisper/whisperv6" "github.com/status-im/status-go/geth/api" + "github.com/status-im/status-go/sign" "github.com/status-im/status-go/geth/node" "github.com/status-im/status-go/geth/signal" @@ -123,9 +124,14 @@ func (s *BackendTestSuite) WhisperService() *whisper.Whisper { return whisperService } -// TxQueueManager returns a reference to the TxQueueManager. -func (s *BackendTestSuite) TxQueueManager() *transactions.Manager { - return s.Backend.TxQueueManager() +// Transactor returns a reference to the Transactor. +func (s *BackendTestSuite) Transactor() *transactions.Transactor { + return s.Backend.Transactor() +} + +// PendingSignRequests returns a reference to PendingSignRequests. +func (s *BackendTestSuite) PendingSignRequests() *sign.PendingRequests { + return s.Backend.PendingSignRequests() } func importTestAccounts(keyStoreDir string) (err error) { diff --git a/t/e2e/transactions/transactions_test.go b/t/e2e/transactions/transactions_test.go index 8ee495c5d..eeae4f84a 100644 --- a/t/e2e/transactions/transactions_test.go +++ b/t/e2e/transactions/transactions_test.go @@ -6,7 +6,6 @@ import ( "fmt" "math/big" "reflect" - "sync" "testing" "time" @@ -18,6 +17,7 @@ import ( "github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/signal" "github.com/status-im/status-go/geth/transactions" + "github.com/status-im/status-go/sign" e2e "github.com/status-im/status-go/t/e2e" . "github.com/status-im/status-go/t/utils" "github.com/stretchr/testify/suite" @@ -50,12 +50,11 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() { err := json.Unmarshal([]byte(rawSignal), &sg) s.NoError(err) - if sg.Type == transactions.EventTransactionQueued { + if sg.Type == sign.EventTransactionQueued { event := sg.Event.(map[string]interface{}) txID := event["id"].(string) txHash, err = s.Backend.CompleteTransaction(string(txID), TestConfig.Account1.Password) s.NoError(err, "cannot complete queued transaction %s", txID) - close(transactionCompleted) } }) @@ -102,7 +101,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() { err := json.Unmarshal([]byte(rawSignal), &signalEnvelope) s.NoError(err) - if signalEnvelope.Type == transactions.EventTransactionQueued { + if signalEnvelope.Type == sign.EventTransactionQueued { event := signalEnvelope.Event.(map[string]interface{}) txID := event["id"].(string) @@ -155,11 +154,12 @@ func (s *TransactionsTestSuite) TestEmptyToFieldPreserved() { } err := json.Unmarshal([]byte(rawSignal), &sg) s.NoError(err) - if sg.Type == transactions.EventTransactionQueued { - var event transactions.SendTransactionEvent + if sg.Type == sign.EventTransactionQueued { + var event sign.SendTransactionEvent s.NoError(json.Unmarshal(sg.Event, &event)) - s.NotNil(event.Args.From) - s.Nil(event.Args.To) + args := event.Args.(map[string]interface{}) + s.NotNil(args["from"]) + s.Nil(args["to"]) _, err := s.Backend.CompleteTransaction(event.ID, TestConfig.Account1.Password) s.NoError(err) close(transactionCompleted) @@ -203,6 +203,8 @@ func (s *TransactionsTestSuite) TestSendContractTxCollision() { } s.testSendContractTx(initFunc, nil, "") + s.NoError(s.Backend.AccountManager().Logout()) + // Scenario 2: Both fields are filled with different values, expect an error inverted := func(source []byte) []byte { inverse := make([]byte, len(source)) @@ -245,7 +247,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc ) s.EqualError( err, - transactions.ErrInvalidCompleteTxSender.Error(), + sign.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -282,7 +284,11 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc string(event["id"].(string)), TestConfig.Account1.Password, ) - s.NoError(err, fmt.Sprintf("cannot complete queued transaction[%v]", event["id"])) + if expectedError != nil { + s.Equal(expectedError, err) + } else { + s.NoError(err, fmt.Sprintf("cannot complete queued transaction[%v]", event["id"])) + } log.Info("contract transaction complete", "URL", "https://ropsten.etherscan.io/tx/"+txHash.Hex()) close(completeQueuedTransaction) @@ -319,7 +325,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid") s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed") - s.Zero(s.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestSendEther() { @@ -341,7 +347,7 @@ func (s *TransactionsTestSuite) TestSendEther() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -365,7 +371,7 @@ func (s *TransactionsTestSuite) TestSendEther() { string(event["id"].(string)), TestConfig.Account1.Password) s.EqualError( err, - transactions.ErrInvalidCompleteTxSender.Error(), + sign.ErrInvalidCompleteTxSender.Error(), fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]), ) @@ -399,7 +405,7 @@ func (s *TransactionsTestSuite) TestSendEther() { s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid") s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { @@ -424,7 +430,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { err = json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string)) @@ -456,7 +462,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() { } s.Equal(txHash.Hex(), txHashCheck.Hex(), "transaction hash returned from SendTransaction is invalid") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { @@ -478,7 +484,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be failed and completed on the second call)", "id", txID) @@ -488,7 +494,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { _, err = s.Backend.CompleteTransaction(txID, TestConfig.Account1.Password+"wrong") s.EqualError(err, keystore.ErrDecrypt.Error()) - s.Equal(1, s.TxQueueManager().TransactionQueue().Count(), "txqueue cannot be empty, as tx has failed") + s.Equal(1, s.PendingSignRequests().Count(), "txqueue cannot be empty, as tx has failed") // now try to complete transaction, but with the correct password txHash, err = s.Backend.CompleteTransaction(txID, TestConfig.Account1.Password) @@ -498,7 +504,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { close(completeQueuedTransaction) } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) @@ -529,7 +535,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() { s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid") s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") s.True(txFailedEventCalled, "expected tx failure signal is not received") } @@ -539,9 +545,6 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { EnsureNodeSync(s.Backend.StatusNode().EnsureSync) - // reset queue - s.Backend.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) @@ -554,12 +557,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) - s.True(s.Backend.TxQueueManager().TransactionQueue().Has(txID), "txqueue should still have test tx") + s.True(s.Backend.PendingSignRequests().Has(txID), "txqueue should still have test tx") // discard err := s.Backend.DiscardTransaction(txID) @@ -570,18 +573,18 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { s.EqualError(err, "transaction hash not found", "expects tx not found, but call to CompleteTransaction succeeded") time.Sleep(1 * time.Second) // make sure that tx complete signal propagates - s.False(s.Backend.TxQueueManager().TransactionQueue().Has(txID), + s.False(s.Backend.PendingSignRequests().Has(txID), fmt.Sprintf("txqueue should not have test tx at this point (it should be discarded): %s", txID)) close(completeQueuedTransaction) } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -597,16 +600,16 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() { To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded") + s.EqualError(err, sign.ErrSignReqDiscarded.Error(), "transaction is expected to be discarded") select { case <-completeQueuedTransaction: - case <-time.After(time.Minute): + case <-time.After(10 * time.Second): s.FailNow("test timed out") } s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't") - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") s.True(txFailedEventCalled, "expected tx failure signal is not received") } @@ -614,8 +617,6 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() { s.setupLocalNode() defer s.StopTestBackend() - s.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password) s.NoError(err) @@ -629,9 +630,6 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { EnsureNodeSync(s.Backend.StatusNode().EnsureSync) - // reset queue - s.Backend.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) @@ -645,22 +643,22 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { var envelope signal.Envelope err := json.Unmarshal([]byte(jsonEvent), &envelope) s.NoError(err) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be discarded soon)", "id", txID) - s.True(s.Backend.TxQueueManager().TransactionQueue().Has(txID), + s.True(s.Backend.PendingSignRequests().Has(txID), "txqueue should still have test tx") txIDs <- txID } - if envelope.Type == transactions.EventTransactionFailed { + if envelope.Type == sign.EventTransactionFailed { event := envelope.Event.(map[string]interface{}) log.Info("transaction return event received", "id", event["id"].(string)) receivedErrMessage := event["error_message"].(string) - expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error() + expectedErrMessage := sign.ErrSignReqDiscarded.Error() s.Equal(receivedErrMessage, expectedErrMessage) receivedErrCode := event["error_code"].(string) @@ -682,11 +680,11 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { To: account.ToAddress(TestConfig.Account2.Address), Value: (*hexutil.Big)(big.NewInt(1000000000000)), }) - require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error()) + require.EqualError(err, sign.ErrSignReqDiscarded.Error()) require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't") } - txQueueManager := s.Backend.TxQueueManager() + signRequests := s.Backend.PendingSignRequests() // wait for transactions, and discard immediately discardTxs := func(txIDs []string) { @@ -710,7 +708,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { for _, txID := range txIDs { require.False( - txQueueManager.TransactionQueue().Has(txID), + signRequests.Has(txID), "txqueue should not have test tx at this point (it should be discarded): %s", txID, ) @@ -735,8 +733,9 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() { case <-time.After(1 * time.Minute): s.FailNow("test timed out") } + time.Sleep(5 * time.Second) - s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point") + s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point") } func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() { @@ -752,68 +751,13 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() { // try completing non-existing transaction _, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password) s.Error(err, "error expected and not received") - s.EqualError(err, transactions.ErrQueuedTxIDNotFound.Error()) -} - -func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() { - s.StartTestBackend() - defer s.StopTestBackend() - - var m sync.Mutex - txCount := 0 - txIDs := [transactions.DefaultTxQueueCap + 5 + 10]string{} - - signal.SetDefaultNodeNotificationHandler(func(rawSignal string) { - var sg signal.Envelope - err := json.Unmarshal([]byte(rawSignal), &sg) - s.NoError(err) - - if sg.Type == transactions.EventTransactionQueued { - event := sg.Event.(map[string]interface{}) - txID := event["id"].(string) - m.Lock() - txIDs[txCount] = string(txID) - txCount++ - m.Unlock() - } - }) - - // reset queue - s.Backend.TxQueueManager().TransactionQueue().Reset() - - // log into account from which transactions will be sent - s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)) - - txQueue := s.Backend.TxQueueManager().TransactionQueue() - s.Zero(txQueue.Count(), "transaction count should be zero") - - for j := 0; j < 10; j++ { - go s.Backend.SendTransaction(context.TODO(), transactions.SendTxArgs{}) // nolint: errcheck - } - time.Sleep(2 * time.Second) - s.Equal(10, txQueue.Count(), "transaction count should be 10") - - for i := 0; i < transactions.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines - go s.Backend.SendTransaction(context.TODO(), transactions.SendTxArgs{}) // nolint: errcheck - } - time.Sleep(5 * time.Second) - - s.True(txQueue.Count() <= transactions.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", transactions.DefaultTxQueueCap, transactions.DefaultTxQueueCap-1, txQueue.Count()) - - m.Lock() - for _, txID := range txIDs { - txQueue.Remove(txID) - } - m.Unlock() - s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count()) + s.EqualError(err, sign.ErrSignReqNotFound.Error()) } func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactionsUpstream() { s.setupUpstreamNode() defer s.StopTestBackend() - s.TxQueueManager().TransactionQueue().Reset() - // log into account from which transactions will be sent err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password) s.NoError(err) @@ -849,7 +793,7 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) { err := json.Unmarshal([]byte(jsonEvent), &envelope) require.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent)) - if envelope.Type == transactions.EventTransactionQueued { + if envelope.Type == sign.EventTransactionQueued { event := envelope.Event.(map[string]interface{}) txID := string(event["id"].(string)) log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID) @@ -892,7 +836,7 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) { for _, txID := range txIDs { s.False( - s.Backend.TxQueueManager().TransactionQueue().Has(txID), + s.Backend.PendingSignRequests().Has(txID), "txqueue should not have test tx at this point (it should be completed)", ) } @@ -918,5 +862,5 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) { s.FailNow("test timed out") } - s.Zero(s.TxQueueManager().TransactionQueue().Count(), "queue should be empty") + s.Zero(s.PendingSignRequests().Count(), "queue should be empty") }